GRASS GIS 7 Programmer's Manual  7.9.dev(2021)-e5379bbd7
empq.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 
37 
38 #ifndef __EMPQ_H
39 #define __EMPQ_H
40 
41 #include <stdio.h>
42 #include <assert.h>
43 
44 
45 
46 #include "ami_config.h" //for SAVE_MEMORY
47 #include "ami_stream.h"
48 #include "mm.h"
49 #include "mm_utils.h" //for MEMORY_LOG, getAvailableMemory
50 #include "imbuffer.h"
51 #include "embuffer.h"
52 #include "pqheap.h"
53 #include "minmaxheap.h"
54 
55 
56 
57 template<class T,class Key> class ExtendedEltMergeType;
58 #define ExtendedMergeStream AMI_STREAM<ExtendedEltMergeType<T,Key> >
59 
60 
61 /**********************************************************
62  DEBUGGING FLAGS
63 ***********************************************************/
64 
65 //enables printing messages when buffers are emptied
66 //#define EMPQ_EMPTY_BUF_PRINT
67 
68 //enables printing when pq gets filled from buffers
69 //#define EMPQ_PQ_FILL_PRINT
70 
71 //enables printing inserts
72 //#define EMPQ_PRINT_INSERT
73 
74 //enables printing deletes
75 //#define EMPQ_PRINT_EXTRACTALL
76 
77 //enables printing the empq on insert/extract_all_min
78 //#define EMPQ_PRINT_EMPQ
79 
80 //enable printing the size of the EMPQ and nb of active streams
81 //on fillpq() and on empty_buff_0
82 //#define EMPQ_PRINT_SIZE
83 
84 //enable printing 'fill pq from B0' in extract_min()
85 //#define EMPQ_PRINT_FILLPQ_FROM_BUFF0
86 
87 //enable expensive size asserts
88 //#define EMPQ_ASSERT_EXPENSIVE
89 
90 
91 /**********************************************************/
92 
93 
94 
95 
96 
97 
98 /* external memory priority queue
99 
100  Functionality:
101 
102  Keep a pqueue PQ of size \theta(M) in memory. Keep a buffer B0 of
103  size \theta(M) in memory. keep an array of external-memory
104  buffers, one for each level 1..log_m{n/m} (where N is the maximum
105  number of items in pqueue at any time).
106 
107  invariants:
108  1. PQ contains the smallest items in the structure.
109 
110  2. each stream of any external memory buffers is sorted in
111  increasing order.
112 
113  insert(x): if (x < maximum_item(PQ)) exchange x with
114  maximum_item(PQ); if buffer B0 is full, empty it; insert x in B0;
115 
116  extract_min():
117 
118  analysis:
119 
120  1. inserts: once the buffer B0 is empty, the next sizeof(B0)
121  inserts are free; one insert can cause many I/Os if cascading
122  emptying of external buffers Bi occurs. Emptying level-i buffer
123  costs <arity>^i*sizeof(B0)/B I/Os and occurs every
124  N/<arity>^i*sizeof(B0) inserts (or less, if deletes too). It can be
125  proved that the amortized time of 1 insert is 1/B*maxnb_buffers.
126 */
127 
128 /*
129 T is assumed to be a class for which getPriority() and getValue()
130 are implemented; for simplicity it is assumed that the comparison
131 operators have been overloaded on T such that
132 x < y <==> x.getPriority() < y.getPriority()
133 */
134 
135 template<class T, class Key>
136 class em_pqueue {
137 
138 private:
139 
140  //in memory priority queue
141  MinMaxHeap<T> *pq;
142 
143  //pqueue size
144  unsigned long pqsize;
145 
146  //in-memory buffer
147  im_buffer<T> *buff_0;
148 
149  //in-memory buffer size
150  unsigned long bufsize;
151 
152  //external memory buffers
153  em_buffer<T,Key>** buff;
154 
155  /* number of external memory buffers statically allocated in the
156  beginning; since the number of buffers needed is \log_m{n/m}, we
157  cannot know it in advance; estimate it roughly and then reallocate
158  it dynamically on request;
159 
160  TO DO: dynamic reallocation with a bigger nb of external buffer
161  if structure becomes full */
162  unsigned short max_nbuf;
163 
164  //index of next external buffer entry available for use (i.e. is NULL)
165  unsigned short crt_buf;
166 
167  //external buffer arity
168  unsigned int buf_arity;
169 
170 
171 public:
172 
173  //create an em_pqueue of specified size
174  em_pqueue(long pq_sz, long buf_sz, unsigned short nb_buf,
175  unsigned int buf_ar);
176 
177  //create an em_pqueue capable of storing <= N elements
178  em_pqueue();
179  em_pqueue(long N) : em_pqueue() {} // N not used; delegate (C++11) so *this is initialized, not a discarded temporary
180 
181 #ifdef SAVE_MEMORY
182  // create an empq, initialize its pq with im and insert amis in
183  // buff[0]; im should not be used/deleted after that outside empq
185 #endif
186 
187  //copy constructor
188  em_pqueue(const em_pqueue &ep);
189 
190  //clean up
191  ~em_pqueue();
192 
193  //return the nb of elements in the structure
194  unsigned long size();
195 
196  //return true if empty
197  bool is_empty();
198 
199  //return true if full; NOT IMPLEMENTED: prints a message to cout and calls exit(1)
200  bool is_full() {
201  cout << "em_pqueue::is_full(): sorry not implemented\n";
202  exit(1);
203  }
204 
205  //return the element with minimum priority in the structure
206  bool min(T& elt);
207 
208  //delete the element with minimum priority in the structure;
209  //return false if pq is empty
210  bool extract_min(T& elt);
211 
212  //extract all elts with min key, add them and return their sum
213  bool extract_all_min(T& elt);
214 
215  //insert an element; return false if insertion fails
216  bool insert(const T& elt);
217 
218  //return maximum capacity of i-th external buffer
219  long maxlen(unsigned short i);
220 
221  //return maximum capacity of em_pqueue
222  long maxlen();
223 
224  // delete all the data in the pq; reset to empty but don't free memory
225  void clear();
226 
227  //print structure
228  void print_range();
229 
230  void print();
231 
232  //print the detailed size of empq (pq, buf_0, buff[i])
233  void print_size();
234 
235  friend ostream& operator<<(ostream& s, const em_pqueue &empq) { //dump the whole structure to s
236  s << "EM_PQ: pq size=" << empq.pqsize
237  << ", buff_0 size=" << empq.bufsize
238  << ", ext_bufs=" << empq.crt_buf
239  << "(max " << empq.max_nbuf << ")\n";
240  s << "IN_MEMORY PQ: \n" << *(empq.pq) << "\n";
241  s << "IN_MEMORY BUFFER: \n" << *(empq.buff_0) << "\n";
242  for (unsigned short i=0; i < empq.crt_buf; i++) { //external buffers 0..crt_buf-1
243  //s << "EM_BUFFER " << i << ":\n" ;
244  s << *(empq.buff[i]);
245  }
246  return s; //hand the stream back so calls can be chained
247  }
248 
249 
250 protected:
251  //return the total nb of active streams, summed over external buffers buff[0..crt_buf-1]
252  int active_streams() {
253  int totstr = 0;
254  for (unsigned short i = 0; i< crt_buf; i++) {
255  totstr+= buff[i]->get_nbstreams();
256  }
257  return totstr;
258  }
259 
260  //called when buff_0 is full to empty it on external level_1 buffer;
261  //can produce cascading emptying
262  bool empty_buff_0();
263 
264  //sort and empty buffer i into buffer (i+1) recursively;
265  //called recursively by empty_buff_0() to empty subsequent buffers
266  //i must be a valid (i<crt_buf) full buffer
267  void empty_buff(unsigned short i);
268 
269 
270  /* merge the first <K> elements of the streams of input buffer,
271  starting at position <buf.deleted[i]> in each stream; there are
272  <buf.arity> streams in total; write output in <outstream>; the
273  items written in outstream are of type <merge_output_type> which
274  extends T with the stream nb and buffer nb the item comes from;
275  this information is needed later to distribute items back; do not
276  delete the K merged elements from the input streams; <bufid> is the
277  id of the buffer whose streams are being merged;
278 
279  the input streams are assumed sorted in increasing order of keys; */
280  AMI_err merge_buffer(em_buffer<T,Key> *buf,
281  ExtendedMergeStream *outstr, long K);
282 
283 
284  /* merge the first <K> elements of the input streams; there are
285  <arity> streams in total; write output in <outstream>;
286 
287  the input streams are assumed sorted in increasing order of their
288  keys; */
289  AMI_err merge_streams(ExtendedMergeStream **instr,
290  unsigned short arity,
291  ExtendedMergeStream *outstr, long K);
292 
293  //deletes one element from <buffer, stream>
294  void delete_str_elt(unsigned short buf_id,
295  unsigned int stream_id);
296 
297  /* copy the minstream in the internal pqueue while merging with
298  buff_0; the smallest <pqsize> elements between minstream and
299  buff_0 will be inserted in internal pqueue; also, the elements
300  from minstream which are inserted in pqueue must be marked as
301  deleted in the source streams; */
302  void merge_bufs2pq(ExtendedMergeStream *minstream);
303 
304  //clean buffers in case some streams have been emptied
305  void cleanup();
306 
307  //called when pq must be filled from external buffers
308  bool fillpq();
309 
310  //print the nb of elements in each stream
311  void print_stream_sizes();
312 };
313 
314 
315 
316 #endif
unsigned long size()
Definition: empq_impl.h:558
bool insert(const T &elt)
Definition: empq_impl.h:994
AMI_err merge_buffer(em_buffer< T, Key > *buf, ExtendedMergeStream *outstr, long K)
Definition: empq_impl.h:1244
bool is_full()
Definition: empq.h:200
bool extract_min(T &elt)
Definition: empq_impl.h:731
void empty_buff(unsigned short i)
Definition: empq_impl.h:1145
#define N
Definition: e_intersect.c:923
void print_size()
Definition: empq_impl.h:1540
bool extract_all_min(T &elt)
Definition: empq_impl.h:804
void cleanup()
Definition: empq_impl.h:964
bool min(T &elt)
Definition: empq_impl.h:664
friend ostream & operator<<(ostream &s, const em_pqueue &empq)
Definition: empq.h:235
long maxlen()
Definition: empq_impl.h:545
#define ExtendedMergeStream
Definition: empq.h:58
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
Definition: empq_impl.h:949
bool is_empty()
Definition: empq_impl.h:571
void merge_bufs2pq(ExtendedMergeStream *minstream)
Definition: empq_impl.h:853
em_pqueue(long N)
Definition: empq.h:179
void clear()
Definition: empq_impl.h:1471
unsigned int get_nbstreams() const
Definition: embuffer.h:288
AMI_err
Definition: ami_stream.h:86
void print_stream_sizes()
Definition: empq_impl.h:1560
~em_pqueue()
Definition: empq_impl.h:500
bool fillpq()
Definition: empq_impl.h:583
AMI_err merge_streams(ExtendedMergeStream **instr, unsigned short arity, ExtendedMergeStream *outstr, long K)
Definition: empq_impl.h:1369
void print_range()
Definition: empq_impl.h:1487
bool empty_buff_0()
Definition: empq_impl.h:1077
void print()
Definition: empq_impl.h:1516
int active_streams()
Definition: empq.h:252