GRASS 8 Programmer's Manual 8.6.0dev(2026)-ddeab64dbf
Loading...
Searching...
No Matches
empq_impl.h
Go to the documentation of this file.
1/****************************************************************************
2 *
3 * MODULE: iostream
4 *
5
6 * COPYRIGHT (C) 2007 Laura Toma
7 *
8 *
9
10 * Iostream is a library that implements streams, external memory
11 * sorting on streams, and an external memory priority queue on
12 * streams. These are the fundamental components used in external
13 * memory algorithms.
14
15 * Credits: The library was developed by Laura Toma. The kernel of
16 * class STREAM is based on the similar class existent in the GPL TPIE
17 * project developed at Duke University. The sorting and priority
18 * queue have been developed by Laura Toma based on communications
19 * with Rajiv Wickremesinghe. The library was developed as part of
20 * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21 * Rajiv Wickremesinghe as part of the Terracost project.
22
23 *
24 * This program is free software; you can redistribute it and/or modify
25 * it under the terms of the GNU General Public License as published by
26 * the Free Software Foundation; either version 2 of the License, or
27 * (at your option) any later version.
28 *
29
30 * This program is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33 * General Public License for more details. *
34 * **************************************************************************/
35
36#ifndef __EMPQ_IMPL_H
37#define __EMPQ_IMPL_H
38
39#include <ostream>
40#include <vector>
41
42#include "empq.h"
43
44#if (0)
45#include "option.H"
46#define MY_LOG_DEBUG_ID(x) \
47 if (GETOPT("debug")) \
48 cerr << __FILE__ << ":" << __LINE__ << " " << x << endl;
49#endif
50
51#undef XXX
52#define XXX if (0)
53
54#define MY_LOG_DEBUG_ID(x)
55
56/*****************************************************************/
57/* encapsulation of the element=<key/prio, data> together with <buffer_id>
58 and <stream_id>; used during stream merging to remember where each
59 key comes from;
60
61 assumes that class T implements: Key getPriority()
62
63 implements operators {<, <=, ...} such that a< b iff a.x.prio < b.x.prio
64*/
65template <class T, class Key>
67
68private:
69 T x;
70 unsigned short buf_id;
71 unsigned int str_id;
72
73public:
75
76 ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid)
77 : x(e), buf_id(bid), str_id(sid)
78 {
79 }
80
82
83 void set(T &e, unsigned short bid, unsigned int sid)
84 {
85 x = e;
86 buf_id = bid;
87 str_id = sid;
88 }
89 T elt() const { return x; }
90 unsigned short buffer_id() const { return buf_id; }
91 unsigned int stream_id() const { return str_id; }
92 Key getPriority() const { return x.getPriority(); }
93 // print
94 friend ostream &operator<<(ostream &s,
96 {
97 return s << "<buf_id=" << elt.buf_id << ",str_id=" << elt.str_id << "> "
98 << elt.x << " ";
99 }
100
103 {
104 return (e1.getPriority() < e2.getPriority());
105 }
108 {
109 return (e1.getPriority() <= e2.getPriority());
110 }
113 {
114 return (e1.getPriority() > e2.getPriority());
115 }
118 {
119 return (e1.getPriority() >= e2.getPriority());
120 }
123 {
124 return (e1.getPriority() != e2.getPriority());
125 }
128 {
129 return (e1.getPriority() == e2.getPriority());
130 }
131};
132
133//************************************************************/
134// create an em_pqueue
135template <class T, class Key>
137 unsigned int buf_ar)
138 : pqsize(pq_sz), bufsize(buf_sz), max_nbuf(nb_buf), crt_buf(0),
139 buf_arity(buf_ar)
140{
141
142 //____________________________________________________________
143 // ESTIMATE AVAILABLE MEMORY BEFORE ALLOCATION
144 AMI_err ae;
145 size_t mm_avail = getAvailableMemory();
146 printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
147 mm_avail / (float)(1 << 20));
148 printf("EM_PQUEUE:available memory before allocation: %ldB\n", mm_avail);
149
150 //____________________________________________________________
151 // ALLOCATE STRUCTURE
152 // some dummy checks..
153 assert(pqsize > 0 && bufsize > 0);
154
155 MEMORY_LOG("em_pqueue: allocating int pqueue\n");
156 // initialize in memory pqueue
157 pq = new MinMaxHeap<T>(pqsize);
158 assert(pq);
159
160 MEMORY_LOG("em_pqueue: allocating buff_0\n");
161 // initialize in memory buffer
162 buff_0 = new im_buffer<T>(bufsize);
163 assert(buff_0);
164
165 char str[200];
166 snprintf(str, sizeof(str),
167 "em_pqueue: allocating array of %ld buff pointers\n",
168 (long)max_nbuf);
169 MEMORY_LOG(str);
170
171 // allocate ext memory buffers array
172 buff = new em_buffer<T, Key> *[max_nbuf];
173 assert(buff);
174 for (unsigned short i = 0; i < max_nbuf; i++) {
175 buff[i] = NULL;
176 }
177
178 //____________________________________________________________
179 // some memory checks- make sure the empq fits in memory !!
180
181 // estimate available memory after allocation
183 printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
184 mm_avail / (float)(1 << 20));
185
186 // estimate AMI_STREAM memory usage
187 size_t sz_stream;
188 AMI_STREAM<T> dummy;
191 cout << "em_pqueue constructor: failing to get stream_usage\n";
192 exit(1);
193 }
194 cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
195 cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
196
197 // estimate memory overhead
198 long mm_overhead = buf_arity * sizeof(merge_key<Key>) +
199 max_nbuf * sizeof(em_buffer<T, Key>) + 2 * sz_stream +
200 max_nbuf * sz_stream;
201
202 mm_overhead *= 8; // overestimate
203 cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
204 if (mm_overhead > mm_avail) {
205 cout << "overhead bigger than available memory"
206 << "increase -m and try again\n";
207 exit(1);
208 }
210
211 // arity*sizeof(AMI_STREAM) < memory
212 cout << "pqsize=" << pqsize << ", bufsize=" << bufsize
213 << ", maximum allowed arity=" << mm_avail / sz_stream << endl;
214 if (buf_arity * sz_stream > mm_avail) {
215 cout << "sorry - empq exceeds memory limits\n";
216 cout << "try again decreasing arity or pqsize/bufsize\n";
217 cout.flush();
218 }
219}
220
221//************************************************************/
222// create an em_pqueue capable of storing <= N elements
223template <class T, class Key>
225{
226
227 MY_LOG_DEBUG_ID("em_pqueue constructor");
228
229 /************************************************************/
230 // available memory
231 AMI_err ae;
232 // available memory
233 size_t mm_avail = getAvailableMemory();
234 printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
235 mm_avail / (float)(1 << 20));
236 cout.flush();
237
238 // AMI_STREAM memory usage
239 size_t sz_stream;
240 AMI_STREAM<T> dummy;
243 cout << "em_pqueue constructor: failing to get main_memory_usage\n";
244 exit(1);
245 }
246 cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
247 cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
248 cout.flush();
249 // assume max_nbuf=2 suffices; check after arity is computed
250 max_nbuf = 2;
251
252 // account for temporary memory usage (set up a preliminary arity)
253 buf_arity = mm_avail / (2 * sz_stream);
254 long mm_overhead = buf_arity * sizeof(merge_key<Key>) +
255 max_nbuf * sizeof(em_buffer<T, Key>) + 2 * sz_stream +
256 max_nbuf * sz_stream;
257
258 mm_overhead *= 8; // overestimate
259 cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
260 if (mm_overhead > mm_avail) {
261 cout << "overhead bigger than available memory"
262 << "increase -m and try again\n";
263 exit(1);
264 }
266
267#ifdef SAVE_MEMORY
268 // assign M/2 to pq
269 pqsize = mm_avail / (2 * sizeof(T));
270 // assign M/2 to buff_0
271 bufsize = mm_avail / (2 * sizeof(T));
272#else
273 // assign M/4 to pq
274 pqsize = mm_avail / (4 * sizeof(T));
275 // assign M/4 to buff_0
276 bufsize = mm_avail / (4 * sizeof(T));
277#endif
278
279 cout << "EM_PQUEUE: pqsize set to " << pqsize << endl;
280 cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
281 cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
282
283 // assign M/2 to AMI_STREAMS and compute arity
284 /* arity is mainly constrained by the size of an AMI_STREAM; the
285 rest of the memory must accommodate for arity * max_nbuf
286 *sizeof(AMI_STREAM); there are some temporary stuff like arity *
287 sizeof(long) (the deleted array), arity * sizeof(T) (the array of
288 keys for merging) and so on, but the main factor is the
289 AMI_STREAM size which is roughly B * LBS * 2 (each AMI_STREAM
290 allocates 2 logical blocks) */
291#ifdef SAVE_MEMORY
292 buf_arity = mm_avail / (2 * sz_stream);
293#else
294 buf_arity = mm_avail / (2 * max_nbuf * sz_stream);
295#endif
296
297 // overestimate usage
298 if (buf_arity > 3) {
299 buf_arity -= 3;
300 }
301 else {
302 buf_arity = 1;
303 }
304
305 cout << "EM_PQUEUE: arity set to " << buf_arity << endl;
306
307 crt_buf = 0;
308
309 // initialize in memory pqueue
310 MEMORY_LOG("em_pqueue: allocating int pqueue\n");
311 pq = new MinMaxHeap<T>(pqsize);
312 assert(pq);
313
314 // initialize in memory buffer
315 MEMORY_LOG("em_pqueue: allocating buff_0\n");
316 buff_0 = new im_buffer<T>(bufsize);
317 assert(buff_0);
318
319 // allocate ext memory buffers array
320 char str[200];
321 snprintf(str, sizeof(str),
322 "em_pqueue: allocating array of %ld buff pointers\n",
323 (long)max_nbuf);
324 MEMORY_LOG(str);
325 // allocate ext memory buffers array
326 buff = new em_buffer<T, Key> *[max_nbuf];
327 assert(buff);
328 for (unsigned short i = 0; i < max_nbuf; i++) {
329 buff[i] = NULL;
330 }
331
332 // max nb of items the structure can accommodate (constrained by max_nbuf)
333 cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
334 cout.flush();
335
336 // check that structure can accommodate N elements
337 // assert(N < buf_arity * (buf_arity + 1) * bufsize);
338 // assert(N < maxlen());
340 printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
341 mm_avail / (float)(1 << 20));
342}
343
344#ifdef SAVE_MEMORY
345//************************************************************/
346// create an empq, initialize its pq with im and insert amis in
347// buff[0]; im should not be used/deleted after that outside empq;
348//
349// assumption: im was allocated such that maxsize = mm_avail/T;
350// when this constructor is called im is only half full, so we must
351// free half of its space and give to buff_0
352template <class T, class Key>
354{
355 AMI_err ae;
356 int pqcapacity; /* amount of memory we can use for each of new
357 minmaxheap, and em-buffer */
358 unsigned int pqcurrentsize; /* number of elements currently in im */
359 assert(im && amis);
360
361 pqcapacity = im->get_maxsize() / 2; // we think this memory is now available
362 pqsize = pqcapacity + 1; // truncate errors
363 pqcurrentsize = im->size();
364 // assert( pqcurrentsize <= pqsize);
365 if (!(pqcurrentsize <= pqsize)) {
366 cout << "EMPQ: pq maxsize=" << pqsize
367 << ", pq crtsize=" << pqcurrentsize << "\n";
368 assert(0);
369 exit(1);
370 }
371
373
374 /* at this point im is allocated all memory, but it is only at most
375 half full; we need to relocate im to half space and to allocate
376 buff_0 the other half; since we use new, there is no realloc, so
377 we will copy to a file...*/
378
379 {
380 // copy im to a stream and free its memory
381 T x;
383 for (unsigned int i = 0; i < pqcurrentsize; i++) {
384 im->extract_min(x);
387 }
388 delete im;
389 im = NULL;
391
392 // allocate pq and buff_0 half size
393 bufsize = pqcapacity;
394 cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize << " total "
395 << (float)bufsize * sizeof(T) / (1 << 20) << "MB\n";
396 cout.flush();
397 buff_0 = new im_buffer<T>(bufsize);
398 assert(buff_0);
399 cout << "EM_PQUEUE: allocating pq size=" << pqsize << " total "
400 << (float)pqcapacity * sizeof(T) / (1 << 20) << "MB\n";
401 cout.flush();
402 pq = new MinMaxHeap<T>(pqsize);
403 assert(pq);
404
405 // fill pq from tmp stream
406 ae = tmpstr.seek(0);
408 T *elt;
409 for (unsigned int i = 0; i < pqcurrentsize; i++) {
410 ae = tmpstr.read_item(&elt);
412 pq->insert(*elt);
413 }
414 assert(pq->size() == pqcurrentsize);
415 }
416
417 // estimate buf_arity
418 // AMI_STREAM memory usage
419 size_t sz_stream;
420 AMI_STREAM<T> dummy;
423 cout << "em_pqueue constructor: failing to get main_memory_usage\n";
424 exit(1);
425 }
426 cout << "EM_PQUEUE: AMI_stream memory usage: " << sz_stream << endl;
427 cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
428 // assume max_nbuf=2 suffices; check after arity is computed
429 max_nbuf = 2;
430 buf_arity = pqcapacity * sizeof(T) / sz_stream;
431 // should account for some overhead
432 if (buf_arity == 0) {
433 cout << "EM_PQUEUE: arity=0 (not enough memory..)\n";
434 exit(1);
435 }
436 if (buf_arity > 3) {
437 buf_arity -= 3;
438 }
439 else {
440 buf_arity = 1;
441 }
442
443 // added on 05/16/2005 by Laura
444 if (buf_arity > MAX_STREAMS_OPEN) {
445 buf_arity = MAX_STREAMS_OPEN;
446 }
447
448 // allocate ext memory buffer array
449 char str[200];
450 snprintf(str, sizeof(str),
451 "em_pqueue: allocating array of %ld buff pointers\n",
452 (long)max_nbuf);
453 MEMORY_LOG(str);
454 buff = new em_buffer<T, Key> *[max_nbuf];
455 assert(buff);
456 for (unsigned short i = 0; i < max_nbuf; i++) {
457 buff[i] = NULL;
458 }
459 crt_buf = 0;
460
461 cout << "EM_PQUEUE: new pqsize set to " << pqcapacity << endl;
462 cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
463 cout << "EM_PQUEUE: buf arity set to " << buf_arity << endl;
464 cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
465 cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
466 cout.flush();
467
468 // estimate available remaining memory
469 size_t mm_avail = getAvailableMemory();
470 printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
471 mm_avail / (float)(1 << 20));
472
473 // last thing: insert the input stream in external buffers
474 // allocate buffer if necessary
475 // assert(crt_buf==0 && !buff[0]);// given
476 if (amis->stream_len()) {
477 // create buff[0] as a level1 buffer
478 MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
479 buff[0] = new em_buffer<T, Key>(1, bufsize, buf_arity);
480 buff[0]->insert(amis);
481 crt_buf = 1;
482 }
483}
484
485#endif
486
487//************************************************************/
488// free space
489template <class T, class Key>
491{
492 // delete in memory pqueue
493 if (pq) {
494 delete pq;
495 pq = NULL;
496 }
497 // delete in memory buffer
498 if (buff_0) {
499 delete buff_0;
500 buff_0 = NULL;
501 }
502 // delete ext memory buffers
503 for (unsigned short i = 0; i < crt_buf; i++) {
504 if (buff[i])
505 delete buff[i];
506 }
507 delete[] buff;
508}
509
510//************************************************************/
511// return maximum capacity of i-th external buffer
//
// i: index into the buff[] array of external buffers.
// Returns: the value reported by get_buf_maxlen() for that level, or 0 when
// i is not below max_nbuf (a message is printed in that case).
//
// For i < crt_buf the answer is taken from buff[i] directly. For any other
// valid i a temporary em_buffer is constructed with (i + 1, bufsize,
// buf_arity), asked for its get_buf_maxlen(), and deleted again before
// returning.
512template <class T, class Key>
513long em_pqueue<T, Key>::maxlen(unsigned short i)
514{
515
    // reject indices outside the buff[] array
516 if (i >= max_nbuf) {
517 printf("em_pqueue::max_len: level=%d exceeds capacity=%d\n", i,
518 max_nbuf);
519 return 0;
520 }
    // levels below crt_buf are read straight from buff[i]
521 if (i < crt_buf) {
522 return buff[i]->get_buf_maxlen();
523 }
524 // try allocating buffer
525 em_buffer<T, Key> *tmp = new em_buffer<T, Key>(i + 1, bufsize, buf_arity);
    // NOTE(review): a plain new-expression reports failure by throwing
    // std::bad_alloc rather than yielding a null pointer, so this branch is
    // presumably never taken - confirm against the build configuration.
526 if (!tmp) {
527 cout << "em_pqueue::max_len: cannot allocate\n";
528 return 0;
529 }
    // the temporary buffer exists only to answer this query
530 long len = tmp->get_buf_maxlen();
531 delete tmp;
532 return len;
533}
534
535//************************************************************/
536// return maximum capacity of em_pqueue
537template <class T, class Key>
539{
540 long len = 0;
541 for (unsigned short i = 0; i < max_nbuf; i++) {
542 len += maxlen(i);
543 }
544 return len + buff_0->get_buf_maxlen();
545}
546
547//************************************************************/
548// return the total nb of elements in the structure
549template <class T, class Key>
551{
552 // sum up the lengths(nb of elements) of the external buffers
553 unsigned long elen = 0;
554 for (unsigned short i = 0; i < crt_buf; i++) {
555 elen += buff[i]->get_buf_len();
556 }
557 return elen + pq->size() + buff_0->get_buf_len();
558}
559
560//************************************************************/
561// return true if empty
562template <class T, class Key>
564{
565
566 // return (size() == 0);
567 // more efficient?
568 return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) && (size() == 0));
569}
570
571//************************************************************/
572// called when pq must be filled from external buffers
573template <class T, class Key>
575{
576
577#ifndef NDEBUG
578 {
579 int k = 0;
580 for (unsigned short i = 0; i < crt_buf; i++) {
581 k |= buff[i]->get_buf_len();
582 }
583 if (!k) {
584 cerr << "fillpq called with empty external buff!" << endl;
585 }
586 assert(k);
587 }
588#endif
589
590#ifdef EMPQ_PQ_FILL_PRINT
591 cout << "filling pq\n";
592 cout.flush();
593#endif
594 XXX cerr << "filling pq" << endl;
595 MY_LOG_DEBUG_ID("fillpq");
596
597 AMI_err ae;
598 {
599 char str[200];
600 snprintf(str, sizeof(str),
601 "em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n",
602 crt_buf);
603 MEMORY_LOG(str);
604 }
605 // merge pqsize smallest elements from each buffer into a new stream
607 outstreams = new ExtendedMergeStream *[crt_buf];
608
609 /* gets stream of smallest pqsize elts from each level */
610 for (unsigned short i = 0; i < crt_buf; i++) {
611 MY_LOG_DEBUG_ID(crt_buf);
613 assert(buff[i]->get_buf_len());
614 ae = merge_buffer(buff[i], outstreams[i], pqsize);
617 // print_stream(outstreams[i]); cout.flush();
618 }
619
620 /* merge above streams into pqsize elts in minstream */
621 if (crt_buf == 1) {
622 // just one level; make common case faster :)
623 merge_bufs2pq(outstreams[0]);
624 delete outstreams[0];
625 delete[] outstreams;
626 }
627 else {
628 // merge the outstreams to get the global mins and delete them
629 // afterwards
631 // cout << "merging streams\n";
632 ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
634 for (int i = 0; i < crt_buf; i++) {
635 delete outstreams[i];
636 }
637 delete[] outstreams;
638
639 // copy the minstream in the internal pqueue while merging with buff_0;
640 // the smallest <pqsize> elements between minstream and buff_0 will be
641 // inserted in internal pqueue;
642 // also, the elements from minstram which are inserted in pqueue must be
643 // marked as deleted in the source streams;
644 merge_bufs2pq(minstream);
645 delete minstream;
646 minstream = NULL;
647 // cout << "after merge_bufs2pq: \n" << *this << "\n";
648 }
649
650 XXX assert(pq->size());
651 XXX cerr << "fillpq done" << endl;
652 return true;
653}
654
655//************************************************************/
656// return the element with minimum priority in the structure
657template <class T, class Key>
659{
660
661 bool ok;
662
663 MY_LOG_DEBUG_ID("em_pqueue::min");
664
665 // try first the internal pqueue
666 if (!pq->empty()) {
667 ok = pq->min(elt);
668 assert(ok);
669 return ok;
670 }
671
672 MY_LOG_DEBUG_ID("extract_min: reset pq");
673 pq->reset();
674
675 // if external buffers are empty
676 if (crt_buf == 0) {
677 // if internal buffer is empty too, then nothing to extract
678 if (buff_0->is_empty()) {
679 // cerr << "min called on empty empq" << endl;
680 return false;
681 }
682 else {
683#ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
684 cout << "filling pq from B0\n";
685 cout.flush();
686#endif
687 // ext. buffs empty; fill int pqueue from buff_0
688 long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
689 buff_0->reset(pqsize, n);
690 ok = pq->min(elt);
691 assert(ok);
692 return true;
693 }
694 }
695 else {
696 // external buffers are not empty;
697 // called when pq must be filled from external buffers
698 XXX print_size();
699 fillpq();
700 XXX cerr << "fillpq done; about to take min" << endl;
701 ok = pq->min(elt);
702 XXX cerr << "after taking min" << endl;
703 assert(ok);
704 return ok;
705 } // end of else (if ext buffers are not empty)
706
707 assert(0); // not reached
708}
709
710//************************************************************/
// Print every item of a merge stream to cout.
//
// str: the stream to dump; it is rewound with seek(0) first (the result of
// seek() is not checked).
// Items are fetched with read_item() and printed followed by ", " for as
// long as read_item() returns AMI_ERROR_NO_ERROR; a newline is written at
// the end.
// NOTE(review): the declaration of x is not present in this listing; from
// its use below (&x passed to read_item, *x printed) it is a pointer that
// read_item() fills in - confirm against the full source.
711template <class T, class Key>
712static void print_ExtendedMergeStream(ExtendedMergeStream &str)
713{
714
716 str.seek(0);
717 while (str.read_item(&x) == AMI_ERROR_NO_ERROR) {
718 cout << *x << ", ";
719 }
720 cout << "\n";
721}
722
723//************************************************************/
724// delete the element with minimum priority in the structure;
725// return false if pq is empty
726template <class T, class Key>
728{
729
730 bool ok;
731
732 MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_MIN");
733
734 // try first the internal pqueue
735 if (!pq->empty()) {
736 // cout << "extract from internal pq\n";
737 MY_LOG_DEBUG_ID("extract from internal pq");
738 ok = pq->extract_min(elt);
739 assert(ok);
740 return ok;
741 }
742
743 // if internal pqueue is empty, fill it up
744 MY_LOG_DEBUG_ID("internal pq empty: filling it up from external buffers");
745 MY_LOG_DEBUG_ID("extract_min: reset pq");
746 pq->reset();
747
748 // if external buffers are empty
749 if (crt_buf == 0) {
750 // if internal buffer is empty too, then nothing to extract
751 if (buff_0->is_empty()) {
752 return false;
753 }
754 else {
755#ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
756 cout << "filling pq from B0\n";
757 cout.flush();
758#endif
759 MY_LOG_DEBUG_ID("filling pq from buff_0");
760 // ext. buffs empty; fill int pqueue from buff_0
761 long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
762 buff_0->reset(pqsize, n);
763 ok = pq->extract_min(elt);
764 assert(ok);
765 return true;
766 }
767 }
768 else {
769 // external buffers are not empty;
770 // called when pq must be filled from external buffers
771 MY_LOG_DEBUG_ID("filling pq from buffers");
772#ifdef EMPQ_PRINT_SIZE
773 long x = size(), y;
774 y = x * sizeof(T) >> 20;
775 cout << "pqsize:[" << active_streams() << " streams: ";
776 print_stream_sizes();
777 cout << " total " << x << "(" << y << "MB)]" << endl;
778 cout.flush();
779#endif
780 fillpq();
781 MY_LOG_DEBUG_ID("pq filled");
782 XXX cerr << "about to get the min" << endl;
783 assert(pq);
784 ok = pq->extract_min(elt);
785 if (!ok) {
786 cout << "failing assertion: pq->extract_min == true\n";
787 this->print();
788 assert(ok);
789 }
790
791 return ok;
792 } // end of else (if ext buffers are not empty)
793
794 assert(0); // not reached
795}
796
797//************************************************************/
798// extract all elts with min key, add them and return their sum
799// delete the element with minimum priority in the structure;
800// return false if pq is empty
801template <class T, class Key>
803{
804 // cout << "em_pqueue::extract_min_all(T): sorry not implemented\n";
805 // exit(1);
806
807 T next_elt;
808 bool done = false;
809
810 MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_ALL_MIN");
811
812 // extract first elt
813 if (!extract_min(elt)) {
814 return false;
815 }
816 else {
817 while (!done) {
818 // peek at the next min elt to see if matches
819 if ((!min(next_elt)) ||
820 !(next_elt.getPriority() == elt.getPriority())) {
821 done = true;
822 }
823 else {
824 extract_min(next_elt);
825 elt = elt + next_elt;
826
827 MY_LOG_DEBUG_ID("EXTRACT_ALL_MIN: adding ");
828 MY_LOG_DEBUG_ID(elt);
829 }
830 }
831 }
832
833#ifdef EMPQ_PRINT_EXTRACTALL
834 cout << "EXTRACTED: " << elt << endl;
835 cout.flush();
836#endif
837#ifdef EMPQ_PRINT_EMPQ
838 this->print();
839 cout << endl;
840#endif
841 return true;
842}
843
844//************************************************************/
845// copy the minstream in the internal pqueue while merging with buff_0;
846// the smallest <pqsize> elements between minstream and buff_0 will be
847// inserted in internal pqueue;
848// also, the elements from minstream which are inserted in pqueue must be
849// marked as deleted in the source streams;
850template <class T, class Key>
852{
853
854 // cout << "bufs2pq: \nminstream: "; print_stream(minstream);
855 MY_LOG_DEBUG_ID("merge_bufs2pq: enter");
856
857 AMI_err ae;
858
859 // sort the internal buffer
860 buff_0->sort();
861 // cout << "bufs2pq: \nbuff0: " << *buff_0 << endl;
862
863 ae = minstream->seek(0); // rewind minstream
865
866 bool strEmpty = false, bufEmpty = false;
867
868 unsigned int bufPos = 0;
870 T bufElt, strElt;
871
874 strEmpty = true;
875 }
876 else {
878 }
879 if (bufPos < buff_0->get_buf_len()) {
880 bufElt = buff_0->get_item(bufPos);
881 }
882 else {
883 // cout << "buff0 empty\n";
884 bufEmpty = true;
885 }
886
887 XXX cerr << "pqsize=" << pqsize << endl;
888 XXX if (strEmpty) cerr << "stream is empty!!" << endl;
889 for (unsigned int i = 0; i < pqsize; i++) {
890
891 if (!bufEmpty) {
892 if ((!strEmpty) && (strElt = strItem->elt(),
893 bufElt.getPriority() > strElt.getPriority())) {
894 delete_str_elt(strItem->buffer_id(), strItem->stream_id());
895 pq->insert(strElt);
898 strEmpty = true;
899 }
900 else {
902 }
903 }
904 else {
905 bufPos++;
906 pq->insert(bufElt);
907 if (bufPos < buff_0->get_buf_len()) {
908 bufElt = buff_0->get_item(bufPos);
909 }
910 else {
911 bufEmpty = true;
912 }
913 }
914 }
915 else {
916 if (!strEmpty) { // stream not empty
917 strElt = strItem->elt();
918 // cout << "i=" << i << "str & buff empty\n";
919 delete_str_elt(strItem->buffer_id(), strItem->stream_id());
920 pq->insert(strElt);
921 // cout << "insert " << strElt << "\n";
924 strEmpty = true;
925 }
926 else {
928 }
929 }
930 else { // both empty: < pqsize items
931 break;
932 }
933 }
934 }
935
936 // shift left buff_0 in case elements were deleted from the beginning
937 buff_0->shift_left(bufPos);
938
939 MY_LOG_DEBUG_ID("pq filled");
940#ifdef EMPQ_PQ_FILL_PRINT
941 cout << "merge_bufs2pq: pq filled; now cleaning\n";
942 cout.flush();
943#endif
944 // this->print();
945 // clean buffers in case some streams have been emptied
946 cleanup();
947
948 MY_LOG_DEBUG_ID("merge_bufs2pq: done");
949}
950
951//************************************************************/
952// deletes one element from <buffer, stream>
//
// buf_id: index of the external buffer in buff[]; asserted to be < crt_buf.
// stream_id: index of the stream inside that buffer; asserted to be
// < buff[buf_id]->get_nbstreams().
// The deletion itself is delegated to buff[buf_id]->incr_deleted(stream_id).
// NOTE(review): both range checks are assert()s, so they are compiled out
// when NDEBUG is defined and the ids are then used unchecked.
953template <class T, class Key>
954void em_pqueue<T, Key>::delete_str_elt(unsigned short buf_id,
955 unsigned int stream_id)
956{
957
958 // check them
959 assert(buf_id < crt_buf);
960 assert(stream_id < buff[buf_id]->get_nbstreams());
961 // update;
962 buff[buf_id]->incr_deleted(stream_id);
963}
964
965//************************************************************/
966// clean buffers in case some streams have been emptied
967template <class T, class Key>
969{
970
971 MY_LOG_DEBUG_ID("em_pqueue::cleanup()");
972#ifdef EMPQ_PQ_FILL_PRINT
973 cout << "em_pqueue: cleanup enter\n";
974 cout.flush();
975#endif
976 // adjust buffers in case whole streams got deleted
977 for (unsigned short i = 0; i < crt_buf; i++) {
978 // cout << "clean buffer " << i << ": "; cout.flush();
979 buff[i]->cleanup();
980 }
981 if (crt_buf) {
982 short i = crt_buf - 1;
983 while ((i >= 0) && buff[i]->is_empty()) {
984 crt_buf--;
985 i--;
986 }
987 }
988#ifdef EMPQ_PQ_FILL_PRINT
989 cout << "em_pqueue: cleanup done\n";
990 cout.flush();
991#endif
992 // if a stream becomes too short move it on previous level
993 // to be added..
994 // cout <<"done cleaning up\n";
995}
996
997//************************************************************/
998// insert an element; return false if insertion fails
999template <class T, class Key>
1001{
1002 bool ok;
1003#ifdef EMPQ_ASSERT_EXPENSIVE
1004 long init_size = size();
1005#endif
1006 T val = x;
1007
1008 MY_LOG_DEBUG_ID("\nEM_PQUEUE::INSERT");
1009 // if structure is empty insert x in pq; not worth the trouble..
1010 if ((crt_buf == 0) && (buff_0->is_empty())) {
1011 if (!pq->full()) {
1012 MY_LOG_DEBUG_ID("insert in pq");
1013 pq->insert(x);
1014 return true;
1015 }
1016 }
1017 if (!pq->empty()) {
1018 T pqmax;
1019
1020 ok = pq->max(pqmax);
1021 assert(ok);
1022 // cout << "insert " << x << " max: " << pqmax << "\n";
1023 if (x <= pqmax) {
1024 // if x is smaller than pq_max and pq not full, insert x in pq
1025 if (!pq->full()) {
1026#ifdef EMPQ_ASSERT_EXPENSIVE
1027 assert(size() == init_size);
1028#endif
1029 pq->insert(x);
1030 return true;
1031 }
1032 else {
1033 // if x is smaller than pq_max and pq full, exchange x with
1034 // pq_max
1035 pq->extract_max(val);
1036 pq->insert(x);
1037 // cout << "max is: " << val << endl;
1038 }
1039 }
1040 }
1041 /* at this point, x >= pqmax.
1042 we need to insert val==x or val==old max.
1043 */
1044
1045 // if buff_0 full, empty it
1046#ifdef EMPQ_ASSERT_EXPENSIVE
1047 assert(size() == init_size);
1048#endif
1049 if (buff_0->is_full()) {
1050#ifdef EMPQ_PRINT_SIZE
1051 long x = size(), y;
1052 y = x * sizeof(T) >> 20;
1053 cout << "pqsize:[" << active_streams() << " streams: ";
1054 print_stream_sizes();
1055 cout << " total " << x << "(" << y << "MB)]" << endl;
1056 cout.flush();
1057#endif
1058 empty_buff_0();
1059 }
1060#ifdef EMPQ_ASSERT_EXPENSIVE
1061 assert(size() == init_size);
1062#endif
1063 // insert x in buff_0
1064 assert(!buff_0->is_full());
1065 MY_LOG_DEBUG_ID("insert in buff_0");
1066 ok = buff_0->insert(val);
1067 assert(ok);
1068
1069#ifdef EMPQ_PRINT_INSERT
1070 cout << "INSERTED: " << x << endl;
1071 cout.flush();
1072#endif
1073#ifdef EMPQ_PRINT_EMPQ
1074 this->print();
1075 cout << endl;
1076#endif
1077 MY_LOG_DEBUG_ID("EM_PQUEUE: INSERTED");
1078 return true;
1079}
1080
1081//************************************************************/
1082/* called when buff_0 is full to empty it on external level_1 buffer;
1083 can produce cascading emptying
1084*/
1085template <class T, class Key>
1087{
1088#ifdef EMPQ_ASSERT_EXPENSIVE
1089 long init_size = size();
1090#endif
1091
1092#ifdef EMPQ_EMPTY_BUF_PRINT
1093 cout << "emptying buff_0\n";
1094 cout.flush();
1095 print_size();
1096#endif
1097 MY_LOG_DEBUG_ID("empty buff 0");
1098
1099 assert(buff_0->is_full());
1100
1101 // sort the buffer
1102 buff_0->sort();
1103 // cout << "sorted buff_0: \n" << *buff_0 << "\n";
1104#ifdef EMPQ_ASSERT_EXPENSIVE
1105 assert(size() == init_size);
1106#endif
1107 // allocate buffer if necessary
1108 if (!buff[0]) { // XXX should check crt_buf
1109 // create buff[0] as a level1 buffer
1110 MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
1111 buff[0] = new em_buffer<T, Key>(1, bufsize, buf_arity);
1112 }
1113 // check that buff_0 fills exactly a stream of buff[0]
1114 assert(buff_0->get_buf_len() == buff[0]->get_stream_maxlen());
1115
1116 // save buff_0 to stream
1117 MY_LOG_DEBUG_ID("empty buff_0 to stream");
1118 AMI_STREAM<T> *buff_0_str = buff_0->save2str();
1120 // MY_LOG_DEBUG_ID("buff_0 emptied");
1121
1122 // reset buff_0
1123 buff_0->reset();
1124 MY_LOG_DEBUG_ID("buf_0 now reset");
1125
1126#ifdef EMPQ_ASSERT_EXPENSIVE
1127 assert(size() + buff_0->maxlen() == init_size);
1128#endif
1129
1130 // copy data from buff_0 to buff[0]
1131 if (buff[0]->is_full()) {
1132 // if buff[0] full, empty it recursively
1133 empty_buff(0);
1134 }
1135 buff[0]->insert(buff_0_str);
1136 MY_LOG_DEBUG_ID("stream inserted in buff[0]");
1137
1138 // update the crt_buf pointer if necessary
1139 if (crt_buf == 0)
1140 crt_buf = 1;
1141
1142#ifdef EMPQ_ASSERT_EXPENSIVE
1143 assert(size() == init_size);
1144#endif
1145
1146 return true;
1147}
1148
1149//************************************************************/
1150/* sort and empty buff[i] into buffer[i+1] recursively; called
1151 by empty_buff_0() to empty subsequent buffers; i must
1152 be a valid (i<crt_buf) full buffer;

 Visible steps: assert the preconditions, exit(1) if i is the last
 slot (max_nbuf - 1), allocate buff[i+1] on demand, sort buff[i] into
 a new stream, reset buff[i], recursively empty buff[i+1] if it is
 full, insert the sorted stream into buff[i+1], and raise crt_buf to
 i + 2 if it was smaller.

 NOTE(review): this listing is a text extract in which some lines were
 dropped (listing lines 1207 and 1209 in the debug branch below);
 confirm details against the real empq_impl.h before changing code.
1153*/
1154template <class T, class Key>
1155void em_pqueue<T, Key>::empty_buff(unsigned short i)
1156{
1157
1158#ifdef EMPQ_ASSERT_EXPENSIVE
1159 long init_size = size();
1160#endif
1161#ifdef EMPQ_EMPTY_BUF_PRINT
1162 cout << "emptying buffer_" << i << "\n";
1163 cout.flush();
1164 print_size();
1165#endif
1166 MY_LOG_DEBUG_ID("empty buff ");
1167 MY_LOG_DEBUG_ID(i);
1168
1169 // i must be a valid, full buffer
1170 assert(i < crt_buf);
1171 assert(buff[i]->is_full());
1172
1173 // check there is space to empty to; the last slot has no successor, so
// the process is terminated instead of growing the buffer array
1174 if (i == max_nbuf - 1) {
1175 cerr << "empty_buff:: cannot empty further - structure is full..\n";
1176 print_size();
1177 cerr << "ext buff array should reallocate in a future version..\n";
1178 exit(1);
1179 }
1180
1181 // create next buffer if necessary
1182 if (!buff[i + 1]) {
1183 // create buff[i+1] as a level-(i+2) buffer
// NOTE(review): i is an unsigned short but is formatted with %hd (signed
// short conversion); %hu would match the type - confirm and fix upstream.
1184 char str[200];
1185 snprintf(str, sizeof(str),
1186 "em_pqueue::empty_buff( %hd ) allocate new em_buffer\n", i);
1187 MEMORY_LOG(str);
1188 buff[i + 1] = new em_buffer<T, Key>(i + 2, bufsize, buf_arity);
1189 }
1190 assert(buff[i + 1]);
1191 // check that buff[i] fills exactly a stream of buff[i+1];
1192 // extraneous (it's checked in insert)
1193 // assert(buff[i]->len() == buff[i+1]->streamlen());
1194
1195 // sort the buffer into a new stream
1196 MY_LOG_DEBUG_ID("sort buffer ");
1197 AMI_STREAM<T> *sorted_buf = buff[i]->sort();
1198
1199 // assert(sorted_buf->stream_len() == buff[i]->len());
1200 // this is just for debugging: on a length mismatch, print both lengths
// and dump the sorted stream to cout
1201 if (sorted_buf->stream_len() != buff[i]->get_buf_len()) {
1202 cout << "sorted_stream_len: " << sorted_buf->stream_len()
1203 << " , bufflen: " << buff[i]->get_buf_len() << endl;
1204 cout.flush();
1205 AMI_err ae;
1206 ae = sorted_buf->seek(0);
// NOTE(review): listing line 1207 is missing here (presumably a check of
// ae) - confirm against the real source.
1208 T *x;
// NOTE(review): listing line 1209 is missing here; presumably the loop
// header that reads items into x and is closed by the brace at 1212.
1210 cout << *x << ", ";
1211 cout.flush();
1212 }
1213 cout << "\n";
// NOTE(review): this assert sits inside the mismatch branch and uses len()
// where the branch condition uses get_buf_len(); if the two are the same
// quantity it fails whenever it is reached - confirm the intent.
1214#ifdef EMPQ_ASSERT_EXPENSIVE
1215 assert(sorted_buf->stream_len() == buff[i]->len());
1216#endif
1217 }
1218#ifdef EMPQ_ASSERT_EXPENSIVE
1219 assert(size() == init_size);
1220#endif
1221 // reset buff[i] (delete all its streams )
1222 buff[i]->reset();
1223#ifdef EMPQ_ASSERT_EXPENSIVE
1224 assert(size() == init_size - sorted_buf->stream_len());
1225#endif
1226
1227 // link sorted buff[i] as a substream into buff[i+1];
1228 // sorted_buf is a new stream, so it starts out with 0 deleted elements;
1229 // of course, its length might be smaller than nominal;
// make room first: a full buff[i+1] is emptied recursively before the insert
1230 if (buff[i + 1]->is_full()) {
1231 empty_buff(i + 1);
1232 }
1233 buff[i + 1]->insert(sorted_buf, 0);
1234
1235 // update the crt_buf pointer if necessary (buff[i+1] is now in use)
1236 if (crt_buf < i + 2)
1237 crt_buf = i + 2;
1238
1239#ifdef EMPQ_ASSERT_EXPENSIVE
1240 assert(size() == init_size);
1241#endif
1242}
1243
1244//************************************************************/
1245/* merge the first <K> elements of the streams of input buffer,
1246 starting at position <buf.deleted[i]> in each stream; there are
1247 <buf.arity> streams in total; write output in <outstream>; the
1248 items written in outstream are of type <merge_output_type> which
1249 extends T with the stream nb and buffer nb the item comes from;
1250 this information is needed later to distribute items back; do not
1251 delete the K merged elements from the input streams; <bufid> is the
1252 id of the buffer whose streams are being merged;
1253
1254 the input streams are assumed sorted in increasing order of keys;

 Returns AMI_ERROR_NO_ERROR on success; any other AMI_err produced by a
 seek or (presumably) read/write is returned immediately after printing
 a warning to cerr.

 NOTE(review): this listing is a text extract with dropped lines. Missing
 listing lines include the signature (1257-1258), two declarations
 (1266-1267, presumably in_objects and ami_err), the read_item/write_item
 calls, some case labels and the mergeheap construction (1316). The index
 at the end of the page gives the signature as
 AMI_err merge_buffer(em_buffer<T, Key> *buf,
 AMI_STREAM<ExtendedEltMergeType<T, Key>> *outstr, long K).
 Confirm against the real empq_impl.h before changing code.
1255*/
1256template <class T, class Key>
1259{
1260 long *bos = buf->get_bos();
1261 /* buff[0] is a level-1 buffer and so on */
1262 unsigned short bufid = buf->get_level() - 1;
1263 /* Pointers to current leading elements of streams */
1264 unsigned int arity = buf->get_nbstreams();
1265 AMI_STREAM<T> **instreams = buf->get_streams();
1268 unsigned int i, j;
1269
1270 MY_LOG_DEBUG_ID("merge_buffer ");
1271 MY_LOG_DEBUG_ID(buf->get_level());
1272
1275 assert(buf->get_buf_len());
1276 assert(K > 0);
1277
1278 // array initialized with first key from each stream (only non-null keys
1279 // must be included)
// NOTE(review): as far as the visible lines show, the early returns inside
// the loop below happen before the heap that later owns keys exists, so
// keys would leak on those paths - confirm.
1280 MEMORY_LOG("em_pqueue::merge_buffer: allocate keys array\n");
1281 merge_key<Key> *keys = new merge_key<Key>[arity];
1282
1283 /* count number of non-empty runs */
1284 j = 0;
1285 /* rewind and read the first item from every stream */
1286 for (i = 0; i < arity; i++) {
1287 assert(instreams[i]);
1288 // rewind stream to its beginning-of-stream offset bos[i]
1289 if ((ami_err = instreams[i]->seek(bos[i])) != AMI_ERROR_NO_ERROR) {
1290 cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1291 return ami_err;
1292 }
1293 /* read first item */
1295 switch (ami_err) {
// (case label on listing line 1296 is missing; presumably end-of-stream)
1297 in_objects[i] = NULL;
1298 break;
1299 case AMI_ERROR_NO_ERROR:
1300 // cout << "stream " << i << " read " << *in_objects[i] << "\n";
1301 // cout.flush();
1302 // include this key in the array of keys
1303 keys[j] = merge_key<Key>(in_objects[i]->getPriority(), i);
1304 // cout << "key " << j << "set to " << keys[j] << "\n";
1305 j++;
1306 break;
1307 default:
1308 cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1309 return ami_err;
1310 }
1311 }
1312 unsigned int NonEmptyRuns = j;
1313 // cout << "nonempyruns = " << NonEmptyRuns << "\n";
1314
1315 // build heap from the array of keys
1317
1318 // cout << "heap is : " << mergeheap << "\n";
1319 // repeatedly extract_min from heap and insert next item from same stream
1320 long extracted = 0;
1321 // rewind output buffer
1322 ami_err = outstream->seek(0);
// stop when every stream is exhausted or K items have been extracted
1325 while (!mergeheap.empty() && (extracted < K)) {
1326 // find min key and id of stream it comes from
1327 i = mergeheap.min().stream_id();
1328 // write min item to output stream
1331 cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1332 return ami_err;
1333 }
1334 // cout << "wrote " << out << "\n";
1335 extracted++; // update nb of extracted elements
1336 // read next item from same input stream
1338 switch (ami_err) {
// (case label on listing line 1339 is missing; presumably end-of-stream:
// the stream has no more items, so its key just leaves the heap)
1340 mergeheap.delete_min();
1341 break;
1342 case AMI_ERROR_NO_ERROR:
1343 // extract the min from the heap and insert next key from the
1344 // same stream
1345 {
1346 Key k = in_objects[i]->getPriority();
1347 mergeheap.delete_min_and_insert(merge_key<Key>(k, i));
1348 }
1349 break;
1350 default:
1351 cerr << "WARNING!!! early breakout!!!" << endl;
1352 return ami_err;
1353 }
1354 // cout << "PQ: " << mergeheap << "\n";
1355 } // while
1356
1357 // delete [] keys;
1358 //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
1359 // DESTRUCTOR OF MERGEHEAP (CALLED AUTOMATICALLY ON FUNCTION EXIT)
1360 // IF I DELETE KEYS EXPLICITLY, THEY WILL BE DELETED AGAIN BY
1361 // DESTRUCTOR, AND EVERYTHING SCREWS UP..
1362
1363 buf->put_streams();
1364 MY_LOG_DEBUG_ID("merge_buffer: done");
1365 // cout << "done merging buffer\n";
1366
1368 assert(extracted); // something in, something out
1369 return AMI_ERROR_NO_ERROR;
1370}
1371
1372//************************************************************/
1373/* merge the first <K> elements of the input streams; there are <arity>
1374 streams in total; write output in <outstream>;
1375
1376 the input streams are assumed sorted in increasing order of their
1377 keys;

 Returns AMI_ERROR_NO_ERROR on success; any other AMI_err from a seek
 or (presumably) read/write is returned immediately.

 NOTE(review): this listing is a text extract with dropped lines. Missing
 listing lines include parts of the signature (1380, 1382), the
 in_objects/ami_err declarations (1389, 1391), the read_item/write_item
 calls, the end-of-stream case labels (1417, 1462) and the start of the
 mergeheap construction (1426). The index at the end of the page gives
 the signature as
 AMI_err merge_streams(AMI_STREAM<ExtendedEltMergeType<T, Key>> **instr,
 unsigned short arity,
 AMI_STREAM<ExtendedEltMergeType<T, Key>> *outstr, long K);
 the body refers to the streams as instreams/outstream.
 Confirm against the real empq_impl.h before changing code.
1378*/
1379template <class T, class Key>
1381 unsigned short arity,
1383{
1384
1385 MY_LOG_DEBUG_ID("enter merge_streams");
1386 assert(arity > 1);
1387
1388 // Pointers to current leading elements of streams
1390
1392 // unsigned int i;
1393 unsigned int nonEmptyRuns = 0; // count number of non-empty runs
1394
1395 // array initialized with first element from each stream (only non-null keys
1396 // must be included)
1397 MEMORY_LOG("em_pqueue::merge_streams: allocate keys array\n");
1398
// NOTE(review): a plain new[] reports failure by throwing, so the assert
// below cannot fire; also, as far as the visible lines show, the early
// returns in the loop below leave keys unfreed - confirm.
1399 merge_key<Key> *keys = new merge_key<Key>[arity];
1400 assert(keys);
1401
1402 // rewind and read the first item from every stream
1403 for (int i = 0; i < arity; i++) {
1404 // rewind stream
1405 if ((ami_err = instreams[i]->seek(0)) != AMI_ERROR_NO_ERROR) {
1406 return ami_err;
1407 }
1408 // read first item
1411 switch (ami_err) {
1412 case AMI_ERROR_NO_ERROR:
// keep a copy of the item and register its key under stream id i
1413 in_objects[i] = *objp;
1414 keys[nonEmptyRuns] = merge_key<Key>(in_objects[i].getPriority(), i);
1415 nonEmptyRuns++;
1416 break;
// (case label on listing line 1417 is missing; presumably end-of-stream,
// i.e. an empty input stream contributes no key)
1418 break;
1419 default:
1420 return ami_err;
1421 }
1422 }
1423 assert(nonEmptyRuns <= arity);
1424
1425 // build heap from the array of keys
1427 keys, nonEmptyRuns); /* takes ownership of keys */
1428
1429 // repeatedly extract_min from heap and insert next item from same stream
1430 long extracted = 0;
1431 // rewind output buffer
1432 ami_err = outstream->seek(0);
1434
1435 while (!mergeheap.empty() && (extracted < K)) {
1436 // find min key and id of stream it comes from
1437 int id = mergeheap.min().stream_id();
1438 // write min item to output stream
// NOTE(review): id is a stream index in [0, arity) while nonEmptyRuns counts
// streams that were non-empty at start, and the heap shrinks on delete_min();
// these three asserts look like they can fire on valid input (an initially
// empty stream, or after any stream runs out) - confirm.
1439 assert(id < nonEmptyRuns);
1440 assert(id >= 0);
1441 assert(mergeheap.size() == nonEmptyRuns);
1444 return ami_err;
1445 }
1446 // cout << "wrote " << *in_objects[i] << "\n";
1447
1448 // extract the min from the heap and insert next key from same stream
1449 assert(id < nonEmptyRuns);
1450 assert(id >= 0);
1453 switch (ami_err) {
1454 case AMI_ERROR_NO_ERROR: {
1455 in_objects[id] = *objp;
1456 merge_key<Key> tmp =
1457 merge_key<Key>(in_objects[id].getPriority(), id);
1458 mergeheap.delete_min_and_insert(tmp);
1459 }
// NOTE(review): extracted is only incremented on this branch; an item
// written just before its stream runs out (next case) is presumably not
// counted, unlike merge_buffer which counts right after the write - confirm.
1460 extracted++; // update nb of extracted elements
1461 break;
// (case label on listing line 1462 is missing; presumably end-of-stream)
1463 mergeheap.delete_min();
1464 break;
1465 default:
1466 return ami_err;
1467 }
1468 } // while
1469
1470 // delete [] keys;
1471 //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
1472 // DESTRUCTOR OF MERGEHEAP (CALLED AUTOMATICALLY ON FUNCTION EXIT)
1473 // IF I DELETE KEYS EXPLICITLY, THEY WILL BE DELETED AGAIN BY
1474 // DESTRUCTOR, AND EVERYTHING SCREWS UP..
1475
1476 MY_LOG_DEBUG_ID("merge_streams: done");
1477 return AMI_ERROR_NO_ERROR;
1478}
1479
1480//************************************************************/
1481template <class T, class Key>
// Empties the structure: clears pq and buff_0, deletes every allocated
// external buffer buff[0..crt_buf-1], nulls the slots and resets crt_buf
// to 0.
// NOTE(review): the signature line (listing line 1482) is missing from this
// extract; the index at the end of the page lists it as `void clear()`,
// presumably a member of em_pqueue<T, Key> - confirm.
1483{
1484 pq->clear();
1485 buff_0->clear();
1486
1487 for (int i = 0; i < crt_buf; i++) {
1488 if (buff[i]) {
1489 delete buff[i];
1490 buff[i] = NULL; // avoid leaving a dangling pointer in the slot
1491 }
1492 }
1493 crt_buf = 0;
1494}
1495
1496//************************************************************/
1497template <class T, class Key>
// Prints to cout the configuration (pqsize, bufsize, max_nbuf, buf_arity),
// the contents of pq and buff_0 via their print(), and for each external
// buffer in use (i < crt_buf) a "B<i+1>: " label followed by
// buff[i]->print_range().
// NOTE(review): the signature line (listing line 1498) is missing from this
// extract; the index at the end of the page lists it as `void print_range()`,
// presumably a member of em_pqueue<T, Key> - confirm.
1499{
1500 cout << "EM_PQ: [pq=" << pqsize << ", b=" << bufsize
1501 << ", bufs=" << max_nbuf << ", ar=" << buf_arity << "]\n";
1502
1503 cout << "PQ: ";
1504 // pq->print_range();
1505 pq->print();
1506 cout << endl;
1507
1508 cout << "B0: ";
1509 // buff_0->print_range();
1510 buff_0->print();
1511 cout << "\n";
1512
// buff[i] is dereferenced without a null check here
1513 for (unsigned short i = 0; i < crt_buf; i++) {
1514 cout << "B" << i + 1 << ": ";
1515 buff[i]->print_range();
1516 cout << endl;
1517 }
1518 cout.flush();
1519}
1520
1521//************************************************************/
1522template <class T, class Key>
// Prints to cout the configuration (pqsize, bufsize, max_nbuf, buf_arity),
// then pq, buff_0 and every external buffer in use (i < crt_buf) via their
// own print(); external buffers are labelled "B<i+1>: ".
// NOTE(review): the signature line (listing line 1523) is missing from this
// extract; the index at the end of the page lists it as `void print()`,
// presumably a member of em_pqueue<T, Key> - confirm.
1524{
1525 cout << "EM_PQ: [pq=" << pqsize << ", b=" << bufsize
1526 << ", bufs=" << max_nbuf << ", ar=" << buf_arity << "]\n";
1527
1528 cout << "PQ: ";
1529 pq->print();
1530 cout << endl;
1531
1532 cout << "B0: ";
1533 buff_0->print();
1534 cout << "\n";
1535
// buff[i] is dereferenced without a null check here
1536 for (unsigned short i = 0; i < crt_buf; i++) {
1537 cout << "B" << i + 1 << ": " << endl;
1538 buff[i]->print();
1539 cout << endl;
1540 }
1541 cout.flush();
1542}
1543
1544//************************************************************/
1545template <class T, class Key>
// Prints to cout the element counts of pq and buff_0, the per-stream sizes
// of every external buffer in use (i < crt_buf), and finally the total
// (external buffers + pq + buff_0).
// NOTE(review): the signature line (listing line 1546) is missing from this
// extract; the index at the end of the page lists it as `void print_size()`,
// presumably a member of em_pqueue<T, Key> - confirm.
1547{
1548 // sum up the lengths(nb of elements) of the external buffers
1549 long elen = 0;
1550 cout << "EMPQ: pq=" << pq->size() << ",B0=" << buff_0->get_buf_len()
1551 << endl;
1552 cout.flush();
1553 for (unsigned short i = 0; i < crt_buf; i++) {
1554 assert(buff[i]);
1555 cout << "B_" << i + 1 << ":";
1556 cout.flush();
1557 buff[i]->print_stream_sizes();
1558 elen += buff[i]->get_buf_len();
1559 // cout << endl; cout.flush();
1560 }
1561 cout << "total: " << elen + pq->size() + buff_0->get_buf_len() << endl
1562 << endl;
1563 cout.flush();
1564}
1565
1566/*****************************************************************/
1567template <class T, class Key>
// Prints to cout, for every external buffer in use (i < crt_buf), the output
// of buff[i]->print_stream_sizes() wrapped in square brackets, then flushes.
// NOTE(review): the signature line (listing line 1568) is missing from this
// extract; the index at the end of the page lists it as
// `void print_stream_sizes()`, presumably a member of em_pqueue<T, Key> -
// confirm.
1569{
// buff[i] is dereferenced without a null check here
1570 for (unsigned short i = 0; i < crt_buf; i++) {
1571 cout << "[";
1572 buff[i]->print_stream_sizes();
1573 cout << "]";
1574 }
1575 cout.flush();
1576}
1577
1578#undef XXX
1579
1580#endif
#define MAX_STREAMS_OPEN
Definition ami_stream.h:63
AMI_err
Definition ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition ami_stream.h:86
@ AMI_ERROR_NO_ERROR
Definition ami_stream.h:84
#define NULL
Definition ccmath.h:32
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition ami_stream.h:477
AMI_err write_item(const T &elt)
Definition ami_stream.h:588
AMI_err seek(off_t offset)
Definition ami_stream.h:445
AMI_err read_item(T **elt)
Definition ami_stream.h:525
off_t stream_len(void)
Definition ami_stream.h:375
friend int operator<=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition empq_impl.h:106
friend ostream & operator<<(ostream &s, const ExtendedEltMergeType< T, Key > &elt)
Definition empq_impl.h:94
unsigned int stream_id() const
Definition empq_impl.h:91
friend int operator!=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition empq_impl.h:121
friend int operator>(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition empq_impl.h:111
friend int operator>=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition empq_impl.h:116
ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid)
Definition empq_impl.h:76
unsigned short buffer_id() const
Definition empq_impl.h:90
friend int operator<(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition empq_impl.h:101
Key getPriority() const
Definition empq_impl.h:92
void set(T &e, unsigned short bid, unsigned int sid)
Definition empq_impl.h:83
friend int operator==(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition empq_impl.h:126
void clear()
Definition empq_impl.h:1482
void print_stream_sizes()
Definition empq_impl.h:1568
void print()
Definition empq_impl.h:1523
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
Definition empq_impl.h:954
bool fillpq()
Definition empq_impl.h:574
void print_range()
Definition empq_impl.h:1498
bool insert(const T &elt)
Definition empq_impl.h:1000
AMI_err merge_buffer(em_buffer< T, Key > *buf, AMI_STREAM< ExtendedEltMergeType< T, Key > > *outstr, long K)
Definition empq_impl.h:1257
bool extract_min(T &elt)
Definition empq_impl.h:727
bool min(T &elt)
Definition empq_impl.h:658
long maxlen()
Definition empq_impl.h:538
AMI_err merge_streams(AMI_STREAM< ExtendedEltMergeType< T, Key > > **instr, unsigned short arity, AMI_STREAM< ExtendedEltMergeType< T, Key > > *outstr, long K)
Definition empq_impl.h:1380
unsigned long size()
Definition empq_impl.h:550
void empty_buff(unsigned short i)
Definition empq_impl.h:1155
bool extract_all_min(T &elt)
Definition empq_impl.h:802
void cleanup()
Definition empq_impl.h:968
void merge_bufs2pq(AMI_STREAM< ExtendedEltMergeType< T, Key > > *minstream)
Definition empq_impl.h:851
bool is_empty()
Definition empq_impl.h:563
void print_size()
Definition empq_impl.h:1546
bool empty_buff_0()
Definition empq_impl.h:1086
#define min(x, y)
Definition draw2.c:29
#define MY_LOG_DEBUG_ID(x)
Definition embuffer.h:50
#define ExtendedMergeStream
Definition empq.h:53
#define XXX
Definition empq_impl.h:52
#define assert(condition)
Definition lz4.c:291
@ MM_STREAM_USAGE_MAXIMUM
Definition mm.h:79
void MEMORY_LOG(const std::string &str)
Definition mm_utils.cpp:59
void LOG_avail_memo()
Definition mm_utils.cpp:45
size_t getAvailableMemory()
Definition mm_utils.cpp:52
#define x