GRASS 8 Programmer's Manual 8.6.0dev(2026)-ddeab64dbf
Loading...
Searching...
No Matches
embuffer.h
Go to the documentation of this file.
1/****************************************************************************
2 *
3 * MODULE: iostream
4 *
5
6 * COPYRIGHT (C) 2007 Laura Toma
7 *
8 *
9
10 * Iostream is a library that implements streams, external memory
11 * sorting on streams, and an external memory priority queue on
12 * streams. These are the fundamental components used in external
13 * memory algorithms.
14
15 * Credits: The library was developed by Laura Toma. The kernel of
16 * class STREAM is based on the similar class existent in the GPL TPIE
17 * project developed at Duke University. The sorting and priority
18 * queue have been developed by Laura Toma based on communications
19 * with Rajiv Wickremesinghe. The library was developed as part of
20 * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21 * Rajiv Wickremesinghe as part of the Terracost project.
22
23 *
24 * This program is free software; you can redistribute it and/or modify
25 * it under the terms of the GNU General Public License as published by
26 * the Free Software Foundation; either version 2 of the License, or
27 * (at your option) any later version.
28 *
29
30 * This program is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33 * General Public License for more details. *
34 * **************************************************************************/
35
36#ifndef __EMBUFFER_H
37#define __EMBUFFER_H
38
39#include <stdio.h>
40#include <assert.h>
41#include <stdlib.h>
42#include <math.h>
43
44#include "ami_config.h" //for SAVE_MEMORY
45#include "ami_stream.h"
46#include "mm.h"
47#include "mm_utils.h"
48#include "pqheap.h"
49
// Debug-logging hook used throughout this file. As defined here it expands
// to nothing (the trailing // comment is not part of the replacement), so
// every MY_LOG_DEBUG_ID(...) call is compiled out. Swap in the commented-out
// definition below to forward the calls to LOG_DEBUG_ID.
#define MY_LOG_DEBUG_ID(x) // inhibit debug printing
// #define MY_LOG_DEBUG_ID(x) LOG_DEBUG_ID(x)
52
53/**********************************************************
54 DEBUGGING FLAGS
55***********************************************************/
56
57// setting this enables checking that the streams/arrays inserted in
58// buffers are sorted in increasing order
59// #define EMBUF_CHECK_INSERT
60
61// enable checking that stream name is the same as the one stored in
62// the buffer name[]
63// #define EMBUF_CHECK_NAME
64
65// enable printing names as they are checked
66// #define EMBUF_CHECK_NAME_PRINT
67
68// enable printing when streams in a buffer are shifted left to
69// check that names are shifted accordingly
70// #define EMBUF_DELETE_STREAM_PRINT
71
72// enable printing the name of the stream which is inserted in buff
73// #define EMBUF_PRINT_INSERT
74
75// enable printing the stream names/sizes in cleanup()
76// #define EMBUF_CLEANUP_PRINT
77
78// enable printing when get/put_stream is called (for each stream)
79// #define EMBUF_PRINT_GETPUT_STREAM
80
81// enable printing when get/put_streams is called
82// #define EMBUF_PRINT_GETPUT_STREAMS
83
84/***********************************************************/
85
/*****************************************************************/
/* Encapsulation of a key together with the id of the stream it was
   read from; used during stream merging so that only keys (not whole
   items) are kept in the merge heap, which saves space.

   All comparison operators compare only the keys; str_id is ignored.
*/
template <class KEY>
class merge_key {
public:
    KEY k;               // the key (priority) of the item
    unsigned int str_id; // id of the stream where key comes from

public:
    // default: value-initialized key, stream id 0
    merge_key() : k(), str_id(0) {}

    merge_key(const KEY &x, const unsigned int sid) : k(x), str_id(sid) {}

    // overwrite both the key and the stream id
    void set(const KEY &x, const unsigned int sid)
    {
        k = x;
        str_id = sid;
    }
    KEY key() const { return k; }
    unsigned int stream_id() const { return str_id; }
    KEY getPriority() const { return k; }

    friend std::ostream &operator<<(std::ostream &s, const merge_key<KEY> &x)
    {
        return s << "<str_id=" << x.str_id << "> " << x.k << " ";
    }
    friend int operator<(const merge_key &x, const merge_key &y)
    {
        return (x.k < y.k);
    }
    friend int operator<=(const merge_key &x, const merge_key &y)
    {
        return (x.k <= y.k);
    }
    friend int operator>(const merge_key &x, const merge_key &y)
    {
        return (x.k > y.k);
    }
    friend int operator>=(const merge_key &x, const merge_key &y)
    {
        return (x.k >= y.k);
    }
    friend int operator!=(const merge_key &x, const merge_key &y)
    {
        return (x.k != y.k);
    }
    friend int operator==(const merge_key &x, const merge_key &y)
    {
        return (x.k == y.k);
    }
    // Adding two merge keys is meaningless; the operator only exists so
    // that code requiring operator+ compiles. It must never be called.
    friend merge_key operator+(const merge_key &x, const merge_key & /* y */)
    {
        assert(0);
        return x;
    }
};
149
150/*****************************************************************
151 *****************************************************************
152 *****************************************************************
153
154 external_memory buffer
155
156 Each level-i buffer can store up to <arity>^i * <basesize> items,
157 where typically <arity> is \theta(m) and <basesize> is \theta(M);
158 therefore log_m{n/m} buffers are needed to store N items, one
 buffer for each level 1..log_m{n/m}. All buffers must have the same
 values of <arity> and <basesize>.
161
162 Functionality:
163
164 A level-i on-disk buffer stores <arity>^i * <basesize> items of
165 data, organized in <arity> streams of <arity>^{i-1} items each;
166 <basesize> is same for all buffers and equal to the size of the
167 level 0 buffer (in memory buffer).
168
169 Invariant: all the <arity> streams of a level-i buffer are in
170 sorted order; in this way sorting the buffer is done by merging the
171 <arity> streams in linear time.
172
173 Items are inserted in level i-buffer only a whole stream at a time
174 (<arity>^{i-1}*<basesize> items). When all the <arity> streams of
175 the buffer are full, the buffer is sorted and emptied into a stream
176 of a level (i+1)-buffer.
177
 The <arity> streams of a buffer are allocated contiguously from left
 to right. The unused streams are NULL; the buffer keeps the index of
 the next available (first NULL) stream. When a buffer becomes full and
 is emptied, all its streams are set to NULL.
182
183 *****************************************************************
184 *****************************************************************
185 ***************************************************************** */
186
187/* T is a type with priority of type K and method getPriority() */
188template <class T, class Key>
190private:
191 // number of streams in a buffer;
192 unsigned int arity;
193
194 // level of buffer: between 1 and log_arity{n/arity}; (level-0 buffer
195 // has a slightly different behaviour so it is implemented as a
196 // different class <im_buffer>)
197 unsigned short level;
198
199 // level-i buffer contains m streams of data, each of size
200 // arity^{i-1}*basesize;
201 AMI_STREAM<T> **data;
202
203 // the buffers can be depleted to fill the internal pq;
204 // keep an array which counts, for each stream, how many elements
205 // have been deleted (implicitly from the beginning of stream)
206 long *deleted;
207
208 // nb of items in each substream; this can be found out by calling
209 // stream_len() on the stream, but it is more costly esp in the case
210 // when streams are on disk and must be moved in and out just to find
211 // stream length; streamsize is set only at stream creation, and the
212 // actual size must subtract the number of iteme deleted from the
213 // bos
214 unsigned long *streamsize;
215
216 // index of the next available(empty) stream (out of the total m
217 // streams in the buffer);
218 unsigned int index;
219
220 // nb of items in a stream of level_1 buffer
221 unsigned long basesize;
222
223public:
224 // create a level-i buffer of given basesize;
225 em_buffer(const unsigned short i, const unsigned long bs,
226 const unsigned int ar);
227
228 // copy constructor;
229 em_buffer(const em_buffer &buf);
230
231 // free the stream array and the streams pointers
232 ~em_buffer();
233
234 // return the level of the buffer;
235 unsigned short get_level() const { return level; }
236
237 // return the ith stream (load stream in memory)
238 AMI_STREAM<T> *get_stream(unsigned int i);
239
240 // return a pointer to the streams of the buffer (loads streams in
241 // memory)
243
244 // put the ith stream back to disk
245 void put_stream(unsigned int i);
246
247 // called in pair with get_streams to put all streams back to disk
248 void put_streams();
249
250 // return a pointer to the array of deletion count for each stream
251 long *get_bos() const { return deleted; }
252
253 // return the index of the last stream in buffer which contains data;
254 unsigned int laststream() const { return index - 1; }
255
256 // return the index of the next available stream in the buffer
257 unsigned int nextstream() const { return index; }
258
259 // increment the index of the next available stream in the buffer
260 void incr_nextstream() { ++index; }
261
262 // return nb of (non-empty) streams in buffer
263 unsigned int get_nbstreams() const { return index; }
264
265 // return arity
266 unsigned int get_arity() const { return arity; }
267
268 // return total nb of deleted elements in all active streams of the buffer
269 long total_deleted() const
270 {
271 long tot = 0;
272 for (unsigned int i = 0; i < index; i++) {
273 tot += deleted[i];
274 }
275 return tot;
276 }
277
278 // mark as deleted one more element from i'th stream
279 void incr_deleted(unsigned int i)
280 {
281 assert(i < index);
282 deleted[i]++;
283 }
284
285 // return the nominal size of a stream (nb of items):
286 // arity^{level-1}*basesize;
287 unsigned long get_stream_maxlen() const
288 {
289 return (unsigned long)pow((double)arity, (double)level - 1) * basesize;
290 }
291
292 // return the actual size of stream i; i must be the index of a valid
293 // stream
294 unsigned long get_stream_len(unsigned int i)
295 {
296 // assert(i>= 0 && i<index);
297 return streamsize[i] - deleted[i];
298 }
299
300 // return the total current size of the buffer; account for the
301 // deleted elements;
302 unsigned long get_buf_len()
303 {
304 unsigned long tot = 0;
305 for (unsigned int i = 0; i < index; i++) {
306 tot += get_stream_len(i);
307 }
308 return tot;
309 }
310
311 // return the total maximal capacity of the buffer
312 unsigned long get_buf_maxlen() { return arity * get_stream_maxlen(); }
313
314 // return true if buffer is empty (all streams are empty)
315 bool is_empty() { return ((nextstream() == 0) || (get_buf_len() == 0)); }
316
317 // return true if buffer is full(all streams are full)
318 bool is_full() const { return (nextstream() == arity); }
319
320 // reset
321 void reset();
322
323 // clean buffer: in case some streams have been emptied by deletion
324 // delete them and shift streams left;
325 void cleanup();
326
327 // create and return a stream which contains all elements of all
328 // streams of the buffer in sorted ascending order of their
329 // keys(priorities);
331
332 // insert an array into the buffer; can only insert one
333 // level-i-full-stream-len nb of items at a time; assume the length
334 // of the array is precisely the streamlen of level-i buffer n =
335 // (pow(arity,level-1)*basesize); assume array is sorted; return the
336 // number of items actually inserted
337 long insert(T *a, long n);
338
339 // insert a stream into the buffer; assume the length of the stream
340 // is precisely the streamlen of level-i buffer n =
341 // (pow(arity,level-1)*basesize); the <nextstream> pointer of buffer
342 // is set to point to the argument stream; (in this way no stream
343 // copying is done, just one pointer copy). The user should be aware
344 // the argument stream is 'lost' - that is a stream which cannot be
345 // inserted repeatedly into many buffers because this would lead to
346 // several buffers pointing to the same stream.
347
348 // stream is assumed sorted; bos = how many elements are deleted
349 // from the beginning of stream;
350
351 // return the number of items actually inserted
352 long insert(AMI_STREAM<T> *str, long bos = 0);
353
354 // print range of elements in buffer
355 void print_range();
356
357 // print all elements in buffer
358 void print();
359
360 // prints the sizes of the streams in the buffer
361 void print_stream_sizes();
362
363 // print the elements in the buffer
364 friend ostream &operator<<(ostream &s, em_buffer &b)
365 {
366 s << "BUFFER_" << b.level << ": ";
367 if (b.index == 0) {
368 s << "[]";
369 }
370 s << "\n";
371 b.get_streams();
372 for (unsigned int i = 0; i < b.index; i++) {
373 b.print_stream(s, i);
374 }
375 b.put_streams();
376 return s;
377 }
378
379private:
380 // merge the input streams; there are <arity> streams in total;
381 // write output in <outstream>; the input streams are assumed sorted
382 // in increasing order of their keys;
383 AMI_err substream_merge(AMI_STREAM<T> **instreams, unsigned int arity,
385
386 // print to stream the elements in i'th stream
387 void print_stream(ostream &s, unsigned int i);
388
389#ifdef SAVE_MEMORY
390 // array of names of streams;
391 char **name;
392
393 // return the designated name for stream i
394 char *get_stream_name(unsigned int i) const;
395
396 // print all stream names in buffer
397 void print_stream_names();
398
399 // checks that name[i] is the same as stream name; stream i must be in
400 // memory (by a previous get_stream call, for instance) in order to
401 // find its length
402 void check_name(unsigned int i);
403#endif
404};
405
406/************************************************************/
407// create a level-i buffer of given basesize;
408template <class T, class Key>
409em_buffer<T, Key>::em_buffer(const unsigned short i, const unsigned long bs,
410 const unsigned int ar)
411 : arity(ar), level(i), basesize(bs)
412{
413
414 assert(level >= 1);
415
416 char str[100];
417 snprintf(str, sizeof(str),
418 "em_buffer: allocate %d AMI_STREAM*, total %ld\n", arity,
419 (long)(arity * sizeof(AMI_STREAM<T> *)));
420 MEMORY_LOG(str);
421 // allocate STREAM* array
422 data = new AMI_STREAM<T> *[arity];
423
424 // allocate deleted array
425 snprintf(str, sizeof(str), "em_buffer: allocate deleted array: %ld\n",
426 (long)(arity * sizeof(long)));
427 MEMORY_LOG(str);
428 deleted = new long[arity];
429
430 // allocate streamsize array
431 snprintf(str, sizeof(str), "em_buffer: allocate streamsize array: %ld\n",
432 (long)(arity * sizeof(long)));
433 MEMORY_LOG(str);
434 streamsize = new unsigned long[arity];
435
436#ifdef SAVE_MEMORY
437 // allocate name array
438 snprintf(str, sizeof(str), "em_buffer: allocate name array: %ld\n",
439 (long)(arity * sizeof(char *)));
440 MEMORY_LOG(str);
441 name = new char *[arity];
442 assert(name);
443#endif
444
445 // assert data
446 if ((!data) || (!deleted) || (!streamsize)) {
447 cerr << "em_buffer: cannot allocate\n";
448 exit(1);
449 }
450
451 // initialize the <arity> streams to NULL, deleted[], streamsize[]
452 // and name[]
453 for (unsigned int ui = 0; ui < arity; ui++) {
454 data[ui] = NULL;
455 deleted[ui] = 0;
456 streamsize[ui] = 0;
457#ifdef SAVE_MEMORY
458 name[ui] = NULL;
459#endif
460 }
461 // set index
462 index = 0;
463
464#ifdef SAVE_MEMORY
465 // streams_in_memory = false;
466#endif
467}
468
469/************************************************************/
470// copy constructor;
471template <class T, class Key>
473 : level(buf.level), basesize(buf.basesize), index(buf.index),
474 arity(buf.arity)
475{
476
477 assert(0); // should not get called
478
479 MEMORY_LOG("em_buffer: copy constr start\n");
480 get_streams();
481 for (unsigned int i = 0; i < index; i++) {
482 assert(data[i]);
483 delete data[i]; // delete old stream if existing
484 data[i] = NULL;
485
486 // call copy constructor; i'm not sure that it actually duplicates
487 // the stream and copies the data; should that in the BTE
488 // sometimes..
489 data[i] = new AMI_STREAM<T>(*buf.data[i]);
490 deleted[i] = buf.deleted[i];
491 streamsize[i] = buf.streamsize[i];
492#ifdef SAVE_MEMORY
493 assert(name[i]);
494 delete name[i];
495 name[i] = NULL;
496 name[i] = buf.name[i];
497#endif
498 }
499 put_streams();
500 MEMORY_LOG("em_buffer: copy constr end\n");
501}
502
503/************************************************************/
504// free the stream array and the streams pointers
505template <class T, class Key>
507{
508
509 assert(data);
510 // delete the m streams in the buffer
511 get_streams();
512 for (unsigned int i = 0; i < index; i++) {
513 assert(data[i]);
514#ifdef SAVE_MEMORY
515 check_name(i);
516 delete name[i];
517#endif
518 delete data[i];
519 data[i] = NULL;
520 }
521
522 delete[] data;
523 delete[] deleted;
524 delete[] streamsize;
525#ifdef SAVE_MEMORY
526 delete[] name;
527#endif
528}
529
#ifdef SAVE_MEMORY
/************************************************************/
// Debug helper, active only when EMBUF_CHECK_NAME is defined. It asserts
// that the name recorded in name[i] equals the name reported by stream i.
// Stream i must already be in memory (e.g. after get_stream(i)).
template <class T, class Key>
void em_buffer<T, Key>::check_name(unsigned int i UNUSED)
{
#ifdef EMBUF_CHECK_NAME
    assert(i < index);
    assert(data[i]);

    char *streamname;
    data[i]->name(&streamname); // name() allocates the string
#ifdef EMBUF_CHECK_NAME_PRINT
    cout << "::check_name: checking stream [" << level << "," << i
         << "] name:" << streamname << endl;
    cout.flush();
#endif
    const bool same = (strcmp(name[i], streamname) == 0);
    if (!same) {
        cerr << "name[" << i << "]=" << name[i] << ", streamname=" << streamname
             << endl;
    }
    assert(same);
    // NOTE(review): released with scalar delete, as before; confirm this
    // matches how AMI_STREAM::name() allocates the string
    delete streamname;
#endif
}
#endif
559
560/************************************************************/
561// if SAVE_MEMORY flag is set, load the stream in memory; return the
562// ith stream
563template <class T, class Key>
565{
566
567 assert(i < index);
568
569#ifdef SAVE_MEMORY
570 MY_LOG_DEBUG_ID("em_buffer::get_stream");
572
573 if (data[i] == NULL) {
574
575 // stream is on disk, load it in memory
576 assert(name[i]);
577 MY_LOG_DEBUG_ID("load stream in memory");
579
580#ifdef EMBUF_PRINT_GETPUT_STREAM
581 cout << "get_stream:: name[" << i << "]=" << name[i] << " from disk\n";
582 cout.flush();
583#endif
584
585 // assert that file exists
586 FILE *fp;
587 if ((fp = fopen(name[i], "rb")) == NULL) {
588 cerr << "get_stream: checking that stream " << name[i]
589 << "exists\n";
590 perror(name[i]);
591 assert(0);
592 exit(1);
593 }
594 fclose(fp);
595
596 // create an AMI_STREAM from file
597 data[i] = new AMI_STREAM<T>(name[i]);
598 assert(data[i]);
599 }
600 else {
601
602 // if data[i] not NULL, stream must be already in memory
603 MY_LOG_DEBUG_ID("stream not NULL");
604 MY_LOG_DEBUG_ID(data[i]->sprint());
605 }
606#endif
607
608 // NOW STREAM IS IN MEMORY
609
610 // some assertion checks
611 assert(data[i]);
612 assert(data[i]->stream_len() == streamsize[i]);
613#ifdef SAVE_MEMORY
614 check_name(i);
615#endif
616
617 return data[i];
618}
619
620/************************************************************/
621// if SAVE_MEMORY flag is set, put the i'th stream back on disk
622template <class T, class Key>
624{
625
626 assert(i < index);
627
628#ifdef SAVE_MEMORY
629 MY_LOG_DEBUG_ID("em_buffer::put_stream");
631
632 if (data[i] != NULL) {
633
634 // stream is in memory, put it on disk
635 MY_LOG_DEBUG_ID("stream put to disk");
636 MY_LOG_DEBUG_ID(data[i]->sprint());
637
638 check_name(i);
639#ifdef EMBUF_PRINT_GETPUT_STREAM
640 cout << "put_stream:: name[" << i << "]=" << name[i] << " to disk\n";
641 cout.flush();
642#endif
643
644 // make stream persistent and delete it
645 data[i]->persist(PERSIST_PERSISTENT);
646 delete data[i];
647 data[i] = NULL;
648 }
649 else {
650
651 // data[i] is NULL, so stream must be already put on disk
652 MY_LOG_DEBUG_ID("stream is NULL");
653 }
654#endif
655}
656
657/************************************************************/
658// return a pointer to the streams of the buffer
659template <class T, class Key>
661{
662
663#ifdef SAVE_MEMORY
664 MY_LOG_DEBUG_ID("em_buffer::get_streams: reading streams from disk");
665#ifdef EMBUF_PRINT_GETPUT_STREAMS
666 cout << "em_buffer::get_streams (buffer " << level << ")";
667 cout << ": index = " << index << "(arity=" << arity << ")\n";
668 cout.flush();
669#endif
670
671 for (unsigned int i = 0; i < index; i++) {
672 get_stream(i);
673 assert(data[i]);
674 }
675
676#endif
677
678 return data;
679}
680
681/************************************************************/
682// called in pair with load_streams to store streams on disk
683// and release the memory
684template <class T, class Key>
686{
687
688#ifdef SAVE_MEMORY
689 MY_LOG_DEBUG_ID("em_buffer::put_streams: writing streams on disk");
690#ifdef EMBUF_PRINT_GETPUT_STREAMS
691 cout << "em_buffer::put_streams (buffer " << level << ")";
692 cout << ": index = " << index << "(arity=" << arity << ")\n";
693 cout.flush();
694#endif
695
696 for (unsigned int i = 0; i < index; i++) {
697 put_stream(i);
698 assert(data[i] == NULL);
699 }
700#endif
701}
702
#ifdef SAVE_MEMORY
/************************************************************/
// Return the file name recorded for the i'th stream. This is the stored
// pointer itself, not a copy; the buffer frees it in reset(), cleanup()
// and the destructor.
template <class T, class Key>
char *em_buffer<T, Key>::get_stream_name(unsigned int i) const
{
    assert(i < index);
    char *const recorded = name[i];
    assert(recorded);
    return recorded;
}
#endif
715
#ifdef SAVE_MEMORY
/************************************************************/
// print the recorded file name of every active stream to cout, one per line
template <class T, class Key>
void em_buffer<T, Key>::print_stream_names()
{
    for (unsigned int s = 0; s < index; ++s) {
        assert(name[s]);
        cout << "stream " << s << ": " << name[s] << endl;
    }
    cout.flush();
}
#endif
729
730/************************************************************/
731// clean buffer in case some streams have been emptied by deletion
732template <class T, class Key>
734{
735
736 MY_LOG_DEBUG_ID("em_buffer::cleanup()");
737#ifdef EMBUF_CLEANUP_PRINT
738#ifdef SAVE_MEMORY
739 if (index > 0) {
740 cout << "before cleanup:\n";
741 print_stream_names();
742 print_stream_sizes();
743 cout.flush();
744 }
745#endif
746#endif
747
748 // load all streams in memory
749 get_streams();
750
751 // count streams of size=0
752 unsigned int i, empty = 0;
753 for (i = 0; i < index; i++) {
754
755 if (get_stream_len(i) == 0) {
756 // printing..
757#ifdef EMBUF_DELETE_STREAM_PRINT
758 cout << "deleting stream [" << level << "," << i << "]:";
759#ifdef SAVE_MEMORY
760 cout << name[i];
761#endif
762 cout << endl;
763 cout.flush();
764#endif
765
766#ifdef SAVE_MEMORY
767 // stream is empty ==> delete its name
768 assert(name[i]);
769 delete name[i];
770 name[i] = NULL;
771#endif
772
773 // stream is empty ==> reset data
774 assert(data[i]);
775 // data[i]->persist(PERSIST_DELETE); //this is done automatically..
776 delete data[i];
777 data[i] = NULL;
778 deleted[i] = 0;
779 streamsize[i] = 0;
780 empty++;
781 }
782 }
783 // streams are in memory; all streams which are NULL must have been
784 // deleted
785
786 // shift streams to the left in case holes were introduced
787 unsigned int j = 0;
788 if (empty) {
789#ifdef EMBUF_DELETE_STREAM_PRINT
790 cout << "em_buffer::cleanup: shifting streams\n";
791 cout.flush();
792#endif
793 for (i = 0; i < index; i++) {
794 // if i'th stream is not empty, shift it left if necessary
795 if (data[i]) {
796 if (i != j) {
797 // set j'th stream to point to i'th stream
798 // cout << j << " set to " << i << endl; cout.flush();
799 data[j] = data[i];
800 deleted[j] = deleted[i];
801 streamsize[j] = streamsize[i];
802 // set i'th stream to point to NULL
803 data[i] = NULL;
804 deleted[i] = 0;
805 streamsize[i] = 0;
806#ifdef SAVE_MEMORY
807 // fix the names
808 /* already done assert(name[j]); */
809 /* delete name[j]; */
810 name[j] = name[i];
811 name[i] = NULL;
812 check_name(j);
813#endif
814 }
815 else {
816 // cout << i << " left the same" << endl;
817 }
818 j++;
819 } // if data[i] != NULL
820 } // for i
821
822 // set the index
823 assert(index == j + empty);
824 index = j;
825
826#ifdef EMBUF_DELETE_STREAM_PRINT
827 cout << "em_buffer::cleanup: index set to " << index << endl;
828 cout.flush();
829#endif
830 } // if empty
831
832 // put streams back to disk
833 put_streams();
834
835#ifdef EMBUF_CLEANUP_PRINT
836#ifdef SAVE_MEMORY
837 if (index > 0) {
838 cout << "after cleanup:\n";
839 print_stream_names();
840 print_stream_sizes();
841 cout.flush();
842 }
843#endif
844#endif
845}
846
847/************************************************************/
848// delete all streams
849template <class T, class Key>
851{
852
853 get_streams();
854
855 // make streams not-persistent and delete them
856 for (unsigned int i = 0; i < index; i++) {
857 assert(data[i]);
858 assert(streamsize[i] == data[i]->stream_len());
859#ifdef SAVE_MEMORY
860 check_name(i);
861 assert(name[i]);
862 delete name[i];
863 name[i] = NULL;
864#endif
865
866 data[i]->persist(PERSIST_DELETE);
867
868 delete data[i];
869 data[i] = NULL;
870 deleted[i] = 0;
871 streamsize[i] = 0;
872 }
873
874 index = 0;
875}
876
877/************************************************************/
878// create and return a stream which contains all elements of
879// all streams of the buffer in sorted ascending order of
880// their keys (priorities);
881template <class T, class Key>
883{
884
885 // create stream
886 MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
887
889 new AMI_STREAM<T>(); /* will be deleted in insert() */
891
892 // merge the streams into sorted stream
894 // Key dummykey;
895 // must modify this to seek after deleted[i] elements!!!!!!!!!!!!!
896 // aerr = MIAMI_single_merge_Key(data, arity, sorted_stream,
897 // 0, dummykey);
898 // could not use AMI_merge so i had to write my own..
899
900 get_streams();
901
902 aerr = substream_merge(data, arity, sorted_stream);
904
905 put_streams();
906
907 return sorted_stream;
908}
909
/************************************************************/
/* merge the input streams; there are <arity> streams in total; write
   output in <outstream>;

   the input streams are assumed sorted in increasing order of their
   keys;

   assumes the instreams are in memory (no need for get_streams()) */
//
// Outline of the merge:
//  - every one of the <arity> input streams must be non-NULL (asserted
//    below), so callers must pass only active streams;
//  - each input stream is positioned at deleted[i], i.e. the items
//    already deleted from its beginning are skipped;
//  - the first live item of each stream is read into in_objects[i]. Its
//    key, tagged with stream id i, goes into keys[]. If that read fails,
//    the stream is either recorded as empty (in_objects[i] = NULL) or the
//    merge is aborted with the stream error;
//  - a heap (mergeheap) over keys[] repeatedly yields the minimum key.
//    The matching item is written to <outstream> and replaced in the heap
//    by the next item of the same input stream. If that stream has no
//    more items, the minimum is removed with delete_min().
// Returns AMI_ERROR_NO_ERROR on success, otherwise the first error
// returned by seek/read_item/write_item.
//
// NOTE(review): the early "return ami_err;" paths in the initialization
// loop run before the heap is built. Per the comment at the end, keys[] is
// only freed by the heap destructor, so keys[] appears to leak on those
// paths. No line visible here frees in_objects either -- confirm.
template <class T, class Key>
                                                  unsigned int arity,
{

    unsigned int i, j;

    // some assertion checks
    for (i = 0; i < arity; i++) {
        assert(instreams[i]);
#ifdef SAVE_MEMORY
        check_name(i);
#endif
    }

        arity); // pointers to current leading elements of streams

    char str[200];
    snprintf(str, sizeof(str),
             "em_buffer::substream_merge: allocate keys array, total %ldB\n",
             (long)((long)arity * sizeof(merge_key<Key>)));
    MEMORY_LOG(str);

    // keys array is initialized with smallest key from each stream (only
    // non-null keys must be included)
    // merge_key<Key>* keys = new (merge_key<Key>)[arity];
    typedef merge_key<Key> footype;
    keys = new footype[arity];
    assert(keys);

    // count number of non-empty streams
    j = 0;
    // rewind and read the first item from every stream initializing
    // in_objects and keys
    for (i = 0; i < arity; i++) {
        assert(instreams[i]);
        // rewind stream (to just after its deleted prefix)
        if ((ami_err = instreams[i]->seek(deleted[i])) != AMI_ERROR_NO_ERROR) {
            return ami_err;
        }
        // read first item from stream
        if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
                in_objects[i] = NULL;
            }
            else {
                return ami_err;
            }
        }
        else {
            // include this key in the array of keys
            Key k = in_objects[i]->getPriority();
            keys[j].set(k, i);
            j++;
        }
    }
    unsigned int NonEmptyRuns = j;

    // build heap from the array of keys

    // repeatedly extract_min from heap, write it to output stream and
    // insert next element from same stream
    // rewind output buffer
    ami_err = outstream->seek(0);
    while (!mergeheap.empty()) {
        // find min key and id of the stream from where it comes
        mergeheap.min(minelt);
        i = minelt.stream_id();
        // write min item to output stream
        if ((ami_err = outstream->write_item(*in_objects[i])) !=
            return ami_err;
        }
        // read next item from same input stream
        if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
                return ami_err;
            }
        }
        // extract the min from the heap and insert next key from same stream
            mergeheap.delete_min();
        }
        else {
            Key k = in_objects[i]->getPriority();
            merge_key<Key> nextit(k, i);
            mergeheap.delete_min_and_insert(nextit);
        }
    } // while

    // delete [] keys;
    // Do NOT delete keys here: the array now belongs to mergeheap and is
    // freed by its destructor when this function returns. Deleting it
    // explicitly as well would free it twice.

    return AMI_ERROR_NO_ERROR;
}
1027
1028/************************************************************/
1029// insert an array into the buffer; assume array is sorted; return the
1030// number of items actually inserted; if SAVE_MEMORY FLAG is on, put
1031// stream on disk and release its memory
1032template <class T, class Key>
1034{
1035
1036 assert(a);
1037
1038 if (is_full()) {
1039 cout << "em_buffer::insert: buffer full\n";
1040 return 0;
1041 }
1042
1043 // can only insert one full stream at a time
1044 // relaxed..
1045 // assert(n == get_stream_maxlen());
1046
1047 // create the stream
1048 MEMORY_LOG("em_buffer::insert(from array): allocate AMI_STREAM\n");
1049 AMI_STREAM<T> *str = new AMI_STREAM<T>();
1050 assert(str);
1051
1052 // write the array to stream
1053 AMI_err ae;
1054 for (long i = 0; i < n; i++) {
1055 ae = str->write_item(a[i]);
1057 }
1058 assert(n == str->stream_len());
1059
1060 // insert the stream in the buffer
1061 return insert(str);
1062}
1063
1064/************************************************************/
1065/* insert a stream into the buffer; the next free entry in the buffer
1066 is set to point to the stream; if SAVE_MEMORY flag is on, the
1067 stream is put to disk;
1068
1069 the <nextstream> pointer of buffer is set to point to the argument
1070 stream; (in this way no stream copying is done, just one pointer
1071 copy). The user should be aware the argument stream is 'lost' -
1072 that is a stream which cannot be inserted repeatedly into many
1073 buffers because this would lead to several buffers pointing to the
1074 same stream.
1075
1076 stream is assume stream is sorted; bos = how many elements must be
1077 skipped (were deleted) from the beginning of stream;
1078
1079 return the number of items actually inserted */
1080template <class T, class Key>
1082{
1083
1084 assert(str);
1085
1086 if (is_full()) {
1087 cout << "em_buffer::insert: buffer full\n";
1088 return 0;
1089 }
1090
1091 // can only insert one level-i-full-stream at a time;
1092 // relaxed..can specify bos;
1093 // not only that, but the length of the stream can be smaller
1094 // than nominal length, because a stream is normally obtained by
1095 // merging streams which can be shorter;
1096 // assert(str->stream_len() == get_stream_len() - bos);
1097
1098#ifdef EMBUF_CHECK_INSERT
1099 // check that stream is sorted
1100 cout << "CHECK_INSERT: checking stream is sorted\n";
1101 AMI_err ae;
1102 str->seek(0);
1103 T *crt = NULL, *prev = NULL;
1104 while (str->read_item(&crt)) {
1106 if (prev)
1107 assert(*prev <= *crt);
1108 }
1109#endif
1110
1111 // nextstream must be empty
1112 assert(str);
1113 assert(data[nextstream()] == NULL);
1114 assert(deleted[nextstream()] == 0);
1115 assert(streamsize[nextstream()] == 0);
1116#ifdef SAVE_MEMORY
1117 assert(name[nextstream()] == NULL);
1118#endif
1119
1120 // set next entry i the buffer to point to this stream
1121 data[nextstream()] = str;
1122 deleted[nextstream()] = bos;
1123 streamsize[nextstream()] = str->stream_len();
1124#ifdef SAVE_MEMORY
1125 // set next name entry in buffer to point to this stream's name
1126 char *s;
1127 str->name(&s); // name() allocates the string
1128 name[nextstream()] = s;
1129
1130 // put stream on disk and release its memory
1132 delete str; // stream should be persistent; just delete it
1133 data[nextstream()] = NULL;
1134
1135#ifdef EMBUF_PRINT_INSERT
1136 cout << "insert stream " << s << " at buf [" << level << "," << nextstream()
1137 << "]" << endl;
1138#endif
1139#endif
1140
1141 // increment the index of next available stream in buffer
1142 incr_nextstream();
1143
1144#ifdef EMBUF_PRINT_INSERT
1145 print_stream_sizes();
1146 print_stream_names();
1147#endif
1148
1149#ifdef SAVE_MEMORY
1150 MY_LOG_DEBUG_ID("em_buffer::insert(): inserted stream ");
1151 MY_LOG_DEBUG_ID(name[nextstream() - 1]);
1152#endif
1153
1154 // return nb of items inserted
1155 return get_stream_len(nextstream() - 1);
1156}
1157
1158/************************************************************/
1159// print the elements of the i'th stream of the buffer to a stream;
1160// assumes stream is in memory;
1161template <class T, class Key>
1162void em_buffer<T, Key>::print_stream(ostream &s, unsigned int i)
1163{
1164
1165 assert(data[i]);
1166 assert((i >= 0) && (i < index));
1167
1168 AMI_err ae;
1169 T *x;
1170
1171 s << "STREAM " << i << ": [";
1172
1173 ae = data[i]->seek(deleted[i]);
1175
1176 for (long j = 0; j < get_stream_len(i); j++) {
1177 ae = data[i]->read_item(&x);
1179 s << *x << ",";
1180 }
1181 s << "]\n";
1182}
1183
1184/************************************************************/
1185// print elements range in buffer (read first and last element in each
1186// substream and find global min and max)
1187template <class T, class Key>
1189{
1190
1191 T *min, *max;
1192 AMI_err ae;
1193
1194 get_streams();
1195
1196 for (unsigned int i = 0; i < index; i++) {
1197 cout << "[";
1198 // read min element in substream i
1199 ae = data[i]->seek(deleted[i]);
1201 ae = data[i]->read_item(&min);
1203 cout << min->getPriority() << "..";
1204 // read max element in substream i
1205 ae = data[i]->seek(streamsize[i] - 1);
1207 ae = data[i]->read_item(&max);
1209 cout << max->getPriority() << " (sz=" << get_stream_len(i) << ")] ";
1210 }
1211 for (unsigned int i = index; i < arity; i++) {
1212 cout << "[] ";
1213 }
1214
1215 put_streams();
1216}
1217
1218/************************************************************/
1219// print all elements in buffer
1220template <class T, class Key>
1222{
1223
1224 T *x;
1225 AMI_err ae;
1226
1227 get_streams();
1228
1229 for (unsigned int i = 0; i < index; i++) {
1230 cout << " [";
1231 ae = data[i]->seek(deleted[i]);
1233 for (unsigned long j = 0; j < get_stream_len(i); j++) {
1234 ae = data[i]->read_item(&x);
1236 cout << x->getPriority() << ",";
1237 }
1238 cout << "]" << endl;
1239 }
1240 for (unsigned int i = index; i < arity; i++) {
1241 cout << "[] ";
1242 }
1243
1244 put_streams();
1245}
1246
1247/************************************************************/
1248// print the sizes of the substreams in the buffer
1249template <class T, class Key>
1251{
1252
1253 cout << "(streams=" << index << ") sizes=[";
1254 for (unsigned int i = 0; i < arity; i++) {
1255 cout << get_stream_len(i) << ",";
1256 }
1257 cout << "]" << endl;
1258 cout.flush();
1259}
1260
1261#endif
@ PERSIST_DELETE
Definition ami_stream.h:114
@ PERSIST_PERSISTENT
Definition ami_stream.h:116
AMI_err
Definition ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition ami_stream.h:86
@ AMI_ERROR_NO_ERROR
Definition ami_stream.h:84
#define NULL
Definition ccmath.h:32
char * sprint()
Definition ami_stream.h:652
AMI_err write_item(const T &elt)
Definition ami_stream.h:588
AMI_err name(char **stream_name)
Definition ami_stream.h:426
AMI_err seek(off_t offset)
Definition ami_stream.h:445
AMI_err read_item(T **elt)
Definition ami_stream.h:525
off_t stream_len(void)
Definition ami_stream.h:375
void persist(persistence p)
Definition ami_stream.h:639
const char * name() const
Definition ami_stream.h:437
unsigned int laststream() const
Definition embuffer.h:254
void cleanup()
Definition embuffer.h:733
void incr_deleted(unsigned int i)
Definition embuffer.h:279
long insert(T *a, long n)
Definition embuffer.h:1033
void print_stream_sizes()
Definition embuffer.h:1250
unsigned short get_level() const
Definition embuffer.h:235
unsigned int nextstream() const
Definition embuffer.h:257
void put_streams()
Definition embuffer.h:685
void print_range()
Definition embuffer.h:1188
em_buffer(const unsigned short i, const unsigned long bs, const unsigned int ar)
Definition embuffer.h:409
unsigned long get_stream_len(unsigned int i)
Definition embuffer.h:294
long total_deleted() const
Definition embuffer.h:269
AMI_STREAM< T > * sort()
Definition embuffer.h:882
void reset()
Definition embuffer.h:850
unsigned long get_buf_maxlen()
Definition embuffer.h:312
bool is_empty()
Definition embuffer.h:315
unsigned long get_buf_len()
Definition embuffer.h:302
unsigned long get_stream_maxlen() const
Definition embuffer.h:287
friend ostream & operator<<(ostream &s, em_buffer &b)
Definition embuffer.h:364
bool is_full() const
Definition embuffer.h:318
unsigned int get_nbstreams() const
Definition embuffer.h:263
void incr_nextstream()
Definition embuffer.h:260
AMI_STREAM< T > ** get_streams()
Definition embuffer.h:660
void print()
Definition embuffer.h:1221
void put_stream(unsigned int i)
Definition embuffer.h:623
unsigned int get_arity() const
Definition embuffer.h:266
AMI_STREAM< T > * get_stream(unsigned int i)
Definition embuffer.h:564
long * get_bos() const
Definition embuffer.h:251
friend int operator!=(const merge_key &x, const merge_key &y)
Definition embuffer.h:132
KEY key() const
Definition embuffer.h:108
friend int operator<=(const merge_key &x, const merge_key &y)
Definition embuffer.h:120
friend ostream & operator<<(ostream &s, const merge_key< KEY > &x)
Definition embuffer.h:112
merge_key()
Definition embuffer.h:97
friend merge_key operator+(const merge_key &x, const merge_key &y)
Definition embuffer.h:140
void set(const KEY &x, const unsigned int sid)
Definition embuffer.h:103
friend int operator==(const merge_key &x, const merge_key &y)
Definition embuffer.h:136
unsigned int stream_id() const
Definition embuffer.h:109
unsigned int str_id
Definition embuffer.h:94
KEY getPriority() const
Definition embuffer.h:110
friend int operator>=(const merge_key &x, const merge_key &y)
Definition embuffer.h:128
merge_key(const KEY &x, const unsigned int sid)
Definition embuffer.h:99
friend int operator<(const merge_key &x, const merge_key &y)
Definition embuffer.h:116
friend int operator>(const merge_key &x, const merge_key &y)
Definition embuffer.h:124
#define min(x, y)
Definition draw2.c:29
#define max(x, y)
Definition draw2.c:30
#define MY_LOG_DEBUG_ID(x)
Definition embuffer.h:50
#define UNUSED
A macro for an attribute, if attached to a variable, indicating that the variable is not used.
Definition gis.h:46
#define assert(condition)
Definition lz4.c:291
void MEMORY_LOG(const std::string &str)
Definition mm_utils.cpp:59
double b
Definition r_raster.c:39
#define x