GRASS GIS 7 Programmer's Manual  7.9.dev(2021)-e5379bbd7
replacementHeapBlock.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 
37 #ifndef REPLACEMENT_HEAPBLOCK_H
38 #define REPLACEMENT_HEAPBLOCK_H
39 
40 #include <assert.h>
41 
42 #include "mem_stream.h"
43 #include "replacementHeap.h"
44 
45 
46 #define RBHEAP_DEBUG if(0)
47 
48 
49 
50 /*****************************************************************/
51 /* encapsulation of the element and the run it comes from;
52  */
53 template<class T>
55 public:
56  T value;
58 
59  BlockHeapElement(): run(NULL) {};
60 
61  friend ostream& operator << (ostream& s, const BlockHeapElement &p) {
62  return s << "[" << p.value << "]";
63  };
64 };
65 
66 
67 
68 
69 
70 
71 /*****************************************************************/
72 /*
73 This is a heap of BlockHeapElements, i.e. elements which come from streams;
74 when an element is consumed, the heap knows how to replace it with the
75 next element from the same stream.
76 
77 Compare is a class that has a member function called "compare" which
78 is used to compare two elements of type T
79 */
80 template<class T,class Compare>
82 private:
83  BlockHeapElement<T>* mergeHeap; //the heap;
84  size_t arity; //max size
85  size_t size; //represents actual size, i.e. the nb of (non-empty)
86 //runs; they are stored contiguously in the first
87  //<size> positions of mergeHeap; once a run becomes
88  //empty, it is deleted and size is decremented. size
89  //stores the next position where a HeapElement can be
90  //added.
91 
92 
93 protected:
94  void heapify(size_t i);
95 
96  void buildheap();
97 
98  /* for each run in the heap, read an element from the run into the heap */
99  void init();
100 
101  /* add a run; make sure total nb of runs does not exceed heap arity */
102  void addRun(MEM_STREAM<T> *run);
103 
104 /* delete the i-th run (and the element); that is, swap the ith run
105 with the last one, and decrement size; just like in a heap, but
106 no heapify. Note: this function messes up the heap order. A user
107 who wants to maintain the heap property should call heapify
108 explicitly.
109 */
110  void deleteRun(size_t i);
111 
112 public:
113  //allocate array mergeHeap, where the streams are stored in runList
115 
116  //delete array mergeHeap
118 
119  //is heap empty?
120  int empty() const {
121  return (size == 0);
122  }
123 
124  //delete mergeHeap[0].value, replace it with the next element from
125  //the same stream, and re-heapify
126  T extract_min();
127 
128 
129  ostream & print(ostream& s) const {
130  s << "ReplacementheapBlock " << this << ": " << size << " runs";
131 #if(0)
132  char* runname;
133  off_t runlen;
134  for(size_t i=0; i<size; i++) {
135  s << endl << " <- i=" << i<< ": " << mergeHeap[i].run;
136  assert(mergeHeap[i].run);
137  mergeHeap[i].run->name(&runname);
138  runlen = mergeHeap[i].run->stream_len();
139  s << ", " << runname << ", len=" << runlen;
140  delete runname; //this should be safe
141  }
142 #endif
143  s << endl;
144  return s;
145  }
146 
147 };
148 
149 
150 
151 
152 /*****************************************************************/
153 //allocate array mergeHeap, where the streams are stored in runList
154 template<class T,class Compare>
157 
158  RBHEAP_DEBUG cerr << "ReplacementHeapBlock " << endl;
159 
160  arity = runList->length();
161 
162  size = 0; //no run yet
163 
164  MEM_STREAM<T>* str;
165  mergeHeap = new BlockHeapElement<T>[arity];
166  for (unsigned int i=0; i< arity; i++) {
167  //pop a stream from the list and add it to heap
168  str = NULL;
169  runList->dequeue(&str);
170  assert(str);
171  addRun(str);
172  }
173  init();
174 }
175 
176 
177 /*****************************************************************/
178 template<class T,class Compare>
179 ReplacementHeapBlock<T,Compare>::~ReplacementHeapBlock<T,Compare>() {
180 
181  if (!empty()) {
182  cerr << "warning: ~ReplacementHeapBlock: heap not empty!\n";
183  }
184  //delete the runs first
185  for(size_t i=0; i<size; i++) {
186  if (mergeHeap[i].run)
187  delete mergeHeap[i].run;
188  }
189  delete [] mergeHeap;
190 }
191 
192 
193 
194 /*****************************************************************/
195 /* add a run; make sure total nb of runs does not exceed heap arity
196  */
197 template<class T,class Compare>
198 void
200 
201  assert(r);
202 
203  if(size == arity) {
204  cerr << "ReplacementHeapBlockBlock::addRun size =" << size << ",arity=" << arity
205  << " full, cannot add another run.\n";
206  assert(0);
207  exit(1);
208  }
209  assert(size < arity);
210 
211  mergeHeap[size].run = r;
212  size++;
213 
214  RBHEAP_DEBUG
215  {char* strname;
216  r->name(&strname);
217  cerr << "ReplacementHeapBlock::addRun added run " << strname
218  << " (rheap size=" << size << ")" << endl;
219  delete strname;
220  cerr.flush();
221  }
222 }
223 
224 
225 
226 
227 
228 /*****************************************************************/
229 /* delete the i-th run (and the value); that is, swap the ith element with
230 the last one, and decrement size; just like in a heap, but no
231 heapify. Note: this function messes up the heap order. A user who
232 wants to maintain the heap property should call heapify explicitly.
233 */
234 template<class T,class Compare>
235 void
237 
238  assert(i >= 0 && i < size && mergeHeap[i].run);
239 
240  RBHEAP_DEBUG
241  {
242  cerr << "ReplacementHeapBlock::deleteRun deleting run " << i << ", "
243  << mergeHeap[i].run << endl;
244  print(cerr);
245  }
246 
247 
248  //delete it
249  delete mergeHeap[i].run;
250  //and replace it with
251  if (size > 1) {
252  mergeHeap[i].value = mergeHeap[size-1].value;
253  mergeHeap[i].run = mergeHeap[size-1].run;
254  }
255  size--;
256 }
257 
258 
259 
260 
261 /*****************************************************************/
262 /* for each run in the heap, read an element from the run into the
263  heap; if ith run is empty, delete it
264 */
265 template<class T,class Compare>
266 void
268  AMI_err err;
269  T* elt;
270  size_t i;
271 
272  RBHEAP_DEBUG cerr << "ReplacementHeapBlock::init " ;
273 
274  i=0;
275  while (i<size) {
276 
277  assert(mergeHeap[i].run);
278 
279  // Rewind run i
280  err = mergeHeap[i].run->seek(0);
281  if (err != AMI_ERROR_NO_ERROR) {
282  cerr << "ReplacementHeapBlock::Init(): cannot seek run " << i << "\n";
283  assert(0);
284  exit(1);
285  }
286  //read first item from run i
287  err = mergeHeap[i].run->read_item(&elt);
288  if (err != AMI_ERROR_NO_ERROR) {
289  if (err == AMI_ERROR_END_OF_STREAM) {
290  deleteRun(i);
291  //need to iterate one more time with same i;
292  } else {
293  cerr << "ReplacementHeapBlock::Init(): cannot read run " << i << "\n";
294  assert(0);
295  exit(1);
296  }
297  } else {
298  //copy.... can this be avoided? xxx
299  mergeHeap[i].value = *elt;
300 
301  i++;
302  }
303  }
304  buildheap();
305 }
306 
307 
308 
309 
310 /*****************************************************************/
311 template<class T,class Compare>
312 void
314  size_t min_index = i;
315  size_t lc = rheap_lchild(i);
316  size_t rc = rheap_rchild(i);
317 
318  Compare cmpobj;
319  assert(i >= 0 && i < size);
320  if ((lc < size) &&
321  (cmpobj.compare(mergeHeap[lc].value, mergeHeap[min_index].value) == -1)) {
322  min_index = lc;
323  }
324  if ((rc < size) &&
325  (cmpobj.compare(mergeHeap[rc].value, mergeHeap[min_index].value) == -1)) {
326  min_index = rc;
327  }
328 
329  if (min_index != i) {
330  BlockHeapElement<T> tmp = mergeHeap[min_index];
331 
332  mergeHeap[min_index] = mergeHeap[i];
333  mergeHeap[i] = tmp;
334 
335 
336  heapify(min_index);
337  }
338 
339  return;
340 }
341 
342 /*****************************************************************/
343 template<class T,class Compare>
344 void
346 
347  if (size > 1) {
348  for (int i = rheap_parent(size-1); i>=0; i--) {
349  heapify(i);
350  }
351  }
352  RBHEAP_DEBUG cerr << "Buildheap done\n";
353  return;
354 }
355 
356 
357 /*****************************************************************/
358 template<class T,class Compare>
359 T
361  T *elt, min;
362  AMI_err err;
363 
364  assert(!empty()); //user's job to check first if it's empty
365  min = mergeHeap[0].value;
366 
367 
368  //read a new element from the same run
369  assert(mergeHeap[0].run);
370  err = mergeHeap[0].run->read_item(&elt);
371  if (err != AMI_ERROR_NO_ERROR) {
372  //if run is empty, delete it
373  if (err == AMI_ERROR_END_OF_STREAM) {
374  RBHEAP_DEBUG cerr << "rheap extract_min: run " << mergeHeap[0].run
375  << " empty. deleting\n ";
376  deleteRun(0);
377  } else {
378  cerr << "ReplacementHeapBlock::extract_min: cannot read\n";
379  assert(0);
380  exit(1);
381  }
382  } else {
383  //copy...can this be avoided?
384  mergeHeap[0].value = *elt;
385  }
386 
387  //restore heap
388  if (size > 0) heapify(0);
389 
390  return min;
391 }
392 
393 
394 
395 #endif
#define RBHEAP_DEBUG
#define min(x, y)
Definition: draw2.c:31
MEM_STREAM< T > * run
AMI_err name(char **stream_name)
Definition: mem_stream.h:103
#define NULL
Definition: ccmath.h:32
SYMBOL * err(FILE *fp, SYMBOL *s, char *msg)
Definition: symbol/read.c:220
Definition: queue.h:43
#define assert(condition)
Definition: lz4.c:324
ostream & print(ostream &s) const
ReplacementHeapBlock(queue< MEM_STREAM< T > *> *runList)
AMI_err
Definition: ami_stream.h:86
friend ostream & operator<<(ostream &s, const BlockHeapElement &p)
void addRun(MEM_STREAM< T > *run)
double r
Definition: r_raster.c:39
void init(double work[])
Definition: as177.c:65