46#define MY_LOG_DEBUG_ID(x) \
47 if (GETOPT("debug")) \
48 cerr << __FILE__ << ":" << __LINE__ << " " << x << endl;
54#define MY_LOG_DEBUG_ID(x)
65template <
class T,
class Key>
70 unsigned short buf_id;
77 :
x(e), buf_id(
bid), str_id(
sid)
83 void set(T &e,
unsigned short bid,
unsigned int sid)
89 T
elt()
const {
return x; }
97 return s <<
"<buf_id=" <<
elt.buf_id <<
",str_id=" <<
elt.str_id <<
"> "
104 return (e1.getPriority() < e2.getPriority());
109 return (e1.getPriority() <= e2.getPriority());
114 return (e1.getPriority() > e2.getPriority());
119 return (e1.getPriority() >= e2.getPriority());
124 return (e1.getPriority() != e2.getPriority());
129 return (e1.getPriority() == e2.getPriority());
135template <
class T,
class Key>
146 printf(
"EM_PQUEUE:available memory before allocation: %.2fMB\n",
148 printf(
"EM_PQUEUE:available memory before allocation: %ldB\n",
mm_avail);
153 assert(pqsize > 0 && bufsize > 0);
155 MEMORY_LOG(
"em_pqueue: allocating int pqueue\n");
167 "em_pqueue: allocating array of %ld buff pointers\n",
174 for (
unsigned short i = 0; i < max_nbuf; i++) {
183 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
191 cout <<
"em_pqueue constructor: failing to get stream_usage\n";
194 cout <<
"EM_PQUEUE:AMI_stream memory usage: " <<
sz_stream << endl;
195 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
203 cout <<
"EM_PQUEUE: mm_overhead estimated as " <<
mm_overhead << endl;
205 cout <<
"overhead bigger than available memory"
206 <<
"increase -m and try again\n";
212 cout <<
"pqsize=" << pqsize <<
", bufsize=" << bufsize
215 cout <<
"sorry - empq exceeds memory limits\n";
216 cout <<
"try again decreasing arity or pqsize/bufsize\n";
223template <
class T,
class Key>
234 printf(
"EM_PQUEUE:available memory before allocation: %.2fMB\n",
243 cout <<
"em_pqueue constructor: failing to get main_memory_usage\n";
246 cout <<
"EM_PQUEUE:AMI_stream memory usage: " <<
sz_stream << endl;
247 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
259 cout <<
"EM_PQUEUE: mm_overhead estimated as " <<
mm_overhead << endl;
261 cout <<
"overhead bigger than available memory"
262 <<
"increase -m and try again\n";
269 pqsize =
mm_avail / (2 *
sizeof(T));
271 bufsize =
mm_avail / (2 *
sizeof(T));
274 pqsize =
mm_avail / (4 *
sizeof(T));
276 bufsize =
mm_avail / (4 *
sizeof(T));
279 cout <<
"EM_PQUEUE: pqsize set to " << pqsize << endl;
280 cout <<
"EM_PQUEUE: bufsize set to " << bufsize << endl;
281 cout <<
"EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
305 cout <<
"EM_PQUEUE: arity set to " << buf_arity << endl;
310 MEMORY_LOG(
"em_pqueue: allocating int pqueue\n");
322 "em_pqueue: allocating array of %ld buff pointers\n",
328 for (
unsigned short i = 0; i < max_nbuf; i++) {
333 cout <<
"EM_PQUEUE: maximum length is " << maxlen() <<
"\n";
340 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
352template <
class T,
class Key>
366 cout <<
"EMPQ: pq maxsize=" << pqsize
394 cout <<
"EM_PQUEUE: allocating im_buffer size=" << bufsize <<
" total "
395 << (
float)bufsize *
sizeof(T) / (1 << 20) <<
"MB\n";
399 cout <<
"EM_PQUEUE: allocating pq size=" << pqsize <<
" total "
423 cout <<
"em_pqueue constructor: failing to get main_memory_usage\n";
426 cout <<
"EM_PQUEUE: AMI_stream memory usage: " <<
sz_stream << endl;
427 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
432 if (buf_arity == 0) {
433 cout <<
"EM_PQUEUE: arity=0 (not enough memory..)\n";
451 "em_pqueue: allocating array of %ld buff pointers\n",
456 for (
unsigned short i = 0; i < max_nbuf; i++) {
461 cout <<
"EM_PQUEUE: new pqsize set to " <<
pqcapacity << endl;
462 cout <<
"EM_PQUEUE: bufsize set to " << bufsize << endl;
463 cout <<
"EM_PQUEUE: buf arity set to " << buf_arity << endl;
464 cout <<
"EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
465 cout <<
"EM_PQUEUE: maximum length is " << maxlen() <<
"\n";
470 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
478 MEMORY_LOG(
"em_pqueue::empty_buff_0: create new em_buffer\n");
480 buff[0]->insert(
amis);
489template <
class T,
class Key>
503 for (
unsigned short i = 0; i < crt_buf; i++) {
512template <
class T,
class Key>
517 printf(
"em_pqueue::max_len: level=%d exceeds capacity=%d\n", i,
522 return buff[i]->get_buf_maxlen();
527 cout <<
"em_pqueue::max_len: cannot allocate\n";
530 long len = tmp->get_buf_maxlen();
537template <
class T,
class Key>
541 for (
unsigned short i = 0; i < max_nbuf; i++) {
544 return len + buff_0->get_buf_maxlen();
549template <
class T,
class Key>
553 unsigned long elen = 0;
554 for (
unsigned short i = 0; i < crt_buf; i++) {
555 elen += buff[i]->get_buf_len();
557 return elen + pq->size() + buff_0->get_buf_len();
562template <
class T,
class Key>
568 return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) && (size() == 0));
573template <
class T,
class Key>
580 for (
unsigned short i = 0; i < crt_buf; i++) {
581 k |= buff[i]->get_buf_len();
584 cerr <<
"fillpq called with empty external buff!" << endl;
590#ifdef EMPQ_PQ_FILL_PRINT
591 cout <<
"filling pq\n";
594 XXX cerr <<
"filling pq" << endl;
601 "em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n",
610 for (
unsigned short i = 0; i < crt_buf; i++) {
613 assert(buff[i]->get_buf_len());
634 for (
int i = 0; i < crt_buf; i++) {
651 XXX cerr <<
"fillpq done" << endl;
657template <
class T,
class Key>
678 if (buff_0->is_empty()) {
683#ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
684 cout <<
"filling pq from B0\n";
688 long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
689 buff_0->reset(pqsize, n);
700 XXX cerr <<
"fillpq done; about to take min" << endl;
702 XXX cerr <<
"after taking min" << endl;
711template <
class T,
class Key>
726template <
class T,
class Key>
738 ok = pq->extract_min(elt);
744 MY_LOG_DEBUG_ID(
"internal pq empty: filling it up from external buffers");
751 if (buff_0->is_empty()) {
755#ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
756 cout <<
"filling pq from B0\n";
761 long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
762 buff_0->reset(pqsize, n);
763 ok = pq->extract_min(elt);
772#ifdef EMPQ_PRINT_SIZE
774 y =
x *
sizeof(T) >> 20;
775 cout <<
"pqsize:[" << active_streams() <<
" streams: ";
776 print_stream_sizes();
777 cout <<
" total " <<
x <<
"(" << y <<
"MB)]" << endl;
782 XXX cerr <<
"about to get the min" << endl;
784 ok = pq->extract_min(elt);
786 cout <<
"failing assertion: pq->extract_min == true\n";
801template <
class T,
class Key>
813 if (!extract_min(elt)) {
820 !(
next_elt.getPriority() == elt.getPriority())) {
833#ifdef EMPQ_PRINT_EXTRACTALL
834 cout <<
"EXTRACTED: " << elt << endl;
837#ifdef EMPQ_PRINT_EMPQ
850template <
class T,
class Key>
887 XXX cerr <<
"pqsize=" << pqsize << endl;
888 XXX if (
strEmpty) cerr <<
"stream is empty!!" << endl;
889 for (
unsigned int i = 0; i < pqsize; i++) {
937 buff_0->shift_left(
bufPos);
940#ifdef EMPQ_PQ_FILL_PRINT
941 cout <<
"merge_bufs2pq: pq filled; now cleaning\n";
953template <
class T,
class Key>
955 unsigned int stream_id)
960 assert(stream_id < buff[buf_id]->get_nbstreams());
962 buff[buf_id]->incr_deleted(stream_id);
967template <
class T,
class Key>
972#ifdef EMPQ_PQ_FILL_PRINT
973 cout <<
"em_pqueue: cleanup enter\n";
977 for (
unsigned short i = 0; i < crt_buf; i++) {
982 short i = crt_buf - 1;
983 while ((i >= 0) && buff[i]->is_empty()) {
988#ifdef EMPQ_PQ_FILL_PRINT
989 cout <<
"em_pqueue: cleanup done\n";
999template <
class T,
class Key>
1003#ifdef EMPQ_ASSERT_EXPENSIVE
1010 if ((crt_buf == 0) && (buff_0->is_empty())) {
1026#ifdef EMPQ_ASSERT_EXPENSIVE
1035 pq->extract_max(val);
1046#ifdef EMPQ_ASSERT_EXPENSIVE
1049 if (buff_0->is_full()) {
1050#ifdef EMPQ_PRINT_SIZE
1052 y =
x *
sizeof(T) >> 20;
1053 cout <<
"pqsize:[" << active_streams() <<
" streams: ";
1054 print_stream_sizes();
1055 cout <<
" total " <<
x <<
"(" << y <<
"MB)]" << endl;
1060#ifdef EMPQ_ASSERT_EXPENSIVE
1064 assert(!buff_0->is_full());
1066 ok = buff_0->insert(val);
1069#ifdef EMPQ_PRINT_INSERT
1070 cout <<
"INSERTED: " <<
x << endl;
1073#ifdef EMPQ_PRINT_EMPQ
1085template <
class T,
class Key>
1088#ifdef EMPQ_ASSERT_EXPENSIVE
1092#ifdef EMPQ_EMPTY_BUF_PRINT
1093 cout <<
"emptying buff_0\n";
1099 assert(buff_0->is_full());
1104#ifdef EMPQ_ASSERT_EXPENSIVE
1110 MEMORY_LOG(
"em_pqueue::empty_buff_0: create new em_buffer\n");
1114 assert(buff_0->get_buf_len() == buff[0]->get_stream_maxlen());
1126#ifdef EMPQ_ASSERT_EXPENSIVE
1131 if (buff[0]->is_full()) {
1142#ifdef EMPQ_ASSERT_EXPENSIVE
1154template <
class T,
class Key>
1158#ifdef EMPQ_ASSERT_EXPENSIVE
1161#ifdef EMPQ_EMPTY_BUF_PRINT
1162 cout <<
"emptying buffer_" << i <<
"\n";
1171 assert(buff[i]->is_full());
1174 if (i == max_nbuf - 1) {
1175 cerr <<
"empty_buff:: cannot empty further - structure is full..\n";
1177 cerr <<
"ext buff array should reallocate in a future version..\n";
1186 "em_pqueue::empty_buff( %hd ) allocate new em_buffer\n", i);
1203 <<
" , bufflen: " << buff[i]->get_buf_len() << endl;
1214#ifdef EMPQ_ASSERT_EXPENSIVE
1218#ifdef EMPQ_ASSERT_EXPENSIVE
1223#ifdef EMPQ_ASSERT_EXPENSIVE
1230 if (buff[i + 1]->is_full()) {
1236 if (crt_buf < i + 2)
1239#ifdef EMPQ_ASSERT_EXPENSIVE
1256template <
class T,
class Key>
1260 long *
bos =
buf->get_bos();
1262 unsigned short bufid =
buf->get_level() - 1;
1264 unsigned int arity =
buf->get_nbstreams();
1280 MEMORY_LOG(
"em_pqueue::merge_buffer: allocate keys array\n");
1286 for (i = 0; i < arity; i++) {
1290 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1308 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1331 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1351 cerr <<
"WARNING!!! early breakout!!!" << endl;
1379template <
class T,
class Key>
1381 unsigned short arity,
1397 MEMORY_LOG(
"em_pqueue::merge_streams: allocate keys array\n");
1403 for (
int i = 0; i < arity; i++) {
1481template <
class T,
class Key>
1487 for (
int i = 0; i < crt_buf; i++) {
1497template <
class T,
class Key>
1500 cout <<
"EM_PQ: [pq=" << pqsize <<
", b=" << bufsize
1501 <<
", bufs=" << max_nbuf <<
", ar=" << buf_arity <<
"]\n";
1513 for (
unsigned short i = 0; i < crt_buf; i++) {
1514 cout <<
"B" << i + 1 <<
": ";
1515 buff[i]->print_range();
1522template <
class T,
class Key>
1525 cout <<
"EM_PQ: [pq=" << pqsize <<
", b=" << bufsize
1526 <<
", bufs=" << max_nbuf <<
", ar=" << buf_arity <<
"]\n";
1536 for (
unsigned short i = 0; i < crt_buf; i++) {
1537 cout <<
"B" << i + 1 <<
": " << endl;
1545template <
class T,
class Key>
1550 cout <<
"EMPQ: pq=" << pq->size() <<
",B0=" << buff_0->get_buf_len()
1553 for (
unsigned short i = 0; i < crt_buf; i++) {
1555 cout <<
"B_" << i + 1 <<
":";
1557 buff[i]->print_stream_sizes();
1558 elen += buff[i]->get_buf_len();
1561 cout <<
"total: " <<
elen + pq->size() + buff_0->get_buf_len() << endl
1567template <
class T,
class Key>
1570 for (
unsigned short i = 0; i < crt_buf; i++) {
1572 buff[i]->print_stream_sizes();
@ AMI_ERROR_END_OF_STREAM
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
AMI_err write_item(const T &elt)
AMI_err seek(off_t offset)
AMI_err read_item(T **elt)
friend int operator<=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
friend ostream & operator<<(ostream &s, const ExtendedEltMergeType< T, Key > &elt)
unsigned int stream_id() const
friend int operator!=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
friend int operator>(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
friend int operator>=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid)
unsigned short buffer_id() const
friend int operator<(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
void set(T &e, unsigned short bid, unsigned int sid)
friend int operator==(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
void print_stream_sizes()
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
bool insert(const T &elt)
AMI_err merge_buffer(em_buffer< T, Key > *buf, AMI_STREAM< ExtendedEltMergeType< T, Key > > *outstr, long K)
AMI_err merge_streams(AMI_STREAM< ExtendedEltMergeType< T, Key > > **instr, unsigned short arity, AMI_STREAM< ExtendedEltMergeType< T, Key > > *outstr, long K)
void empty_buff(unsigned short i)
bool extract_all_min(T &elt)
void merge_bufs2pq(AMI_STREAM< ExtendedEltMergeType< T, Key > > *minstream)
#define MY_LOG_DEBUG_ID(x)
#define ExtendedMergeStream
#define assert(condition)
@ MM_STREAM_USAGE_MAXIMUM
void MEMORY_LOG(const std::string &str)
size_t getAvailableMemory()