GRASS GIS 7 Programmer's Manual  7.9.dev(2021)-e5379bbd7
empq_adaptive_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 
37 #ifndef __EMPQ_ADAPTIVE_IMPL_H
38 #define __EMPQ_ADAPTIVE_IMPL_H
39 
40 #include <stdio.h>
41 #include <assert.h>
42 
43 #include "ami_config.h"
44 #include "ami_stream.h"
45 #include "mm.h"
46 #include "mm_utils.h"
47 #include "empq_adaptive.h"
48 
49 #include "ami_sort.h"
50 
51 
52 // EMPQAD_DEBUG defined in "empqAdaptive.H"
53 
54 
55 
56 
57 //------------------------------------------------------------
58 //allocate an internal pqueue of size precisely twice
59 //the size of the pqueue within the em_pqueue;
60 //
61 //This constructor uses a user defined amount of memory
62 
63 template<class T, class Key>
65  regim = INMEM;
66  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue"
67  << endl;
68 
69  //------------------------------------------------------------
70  //set the size precisely as in empq constructor since we cannot
71  //really call the em__pqueue constructor, because we don't want
72  //the space allocated; we just want the sizes;
73  //AMI_err ae;
74  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: "
75  << ( (float)inMem/ (1<< 20)) << "MB" << endl;
76 
77  initPQ(inMem);
78 }
79 
80 
81 //------------------------------------------------------------
82 // This more resembles the original constuctor which is greedy
83 template<class T, class Key>
85  regim = INMEM;
86  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue"
87  << endl;
88 
89  //------------------------------------------------------------
90  //set the size precisely as in empq constructor since we cannot
91  //really call the em__pqueue constructor, because we don't want
92  //the space allocated; we just want the sizes;
93  size_t mm_avail = getAvailableMemory();
94  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: available memory: "
95  << ( (float)mm_avail/ (1<< 20)) << "MB" << endl;
96 
97 
98  initPQ(mm_avail);
99 
100 }
101 
102 
103 //------------------------------------------------------------
104 // This metod initialized the PQ based on the memory passed
105 // into it
106 template<class T, class Key>
107 void
108 EMPQueueAdaptive<T,Key>::initPQ(size_t initMem) {
109  AMI_err ae;
110  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: "
111  << ( (float)initMem/ (1<< 20)) << "MB" << endl;
112 
113  /* same calculations as empq constructor in order to estimate
114  overhead memory; this is because we want to allocate a pqueue of
115  size exactly double the size of the pqueue inside the empq;
116  switching from this pqueue to empq when the memory fills up will
117  be simple */
118  //------------------------------------------------------------
119  //AMI_STREAM memory usage
120  size_t sz_stream;
121  AMI_STREAM<T> dummy;
122  if ((ae = dummy.main_memory_usage(&sz_stream,
125  cerr << "EMPQueueAdaptive constr: failing to get stream_usage\n";
126  exit(1);
127  }
128 
129 
130  //account for temporary memory usage
131  unsigned short max_nbuf = 2;
132  unsigned int buf_arity = initMem/(2 * sz_stream);
133  if (buf_arity > MAX_STREAMS_OPEN) buf_arity = MAX_STREAMS_OPEN;
134  unsigned long mm_overhead = buf_arity*sizeof(merge_key<Key>) +
135  max_nbuf * sizeof(em_buffer<T,Key>) +
136  2*sz_stream + max_nbuf*sz_stream;
137  mm_overhead *= 8; //overestimate..this should be fixed with
138  //a precise accounting of the extra memory required
139 
140  EMPQAD_DEBUG cout << "sz_stream: " << sz_stream << " buf_arity: " << buf_arity <<
141  " mm_overhead: " << mm_overhead << " mm_avail: " << initMem << ".\n";
142 
143 
144 
145  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: memory overhead set to "
146  << ((float)mm_overhead / (1 << 20)) << "MB" << endl;
147  if (mm_overhead > initMem) {
148  cerr << "overhead bigger than available memory ("<< initMem << "); "
149  << "increase -m and try again\n";
150  exit(1);
151  }
152  initMem -= mm_overhead;
153  //------------------------------------------------------------
154 
155 
156  long pqsize = initMem/sizeof(T);
157  EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: pqsize set to " << pqsize << endl;
158 
159  //initialize in memory pqueue and set em to NULL
160  im = new MinMaxHeap<T>(pqsize);
161  assert(im);
162  em = NULL;
163 }
164 
165 
166 template<class T, class Key>
168  switch(regim) {
169  case INMEM:
170  delete im;
171  break;
172  case EXTMEM:
173  delete em;
174  break;
175  case EXTMEM_DEBUG:
176  delete dim;
177  delete em;
178  break;
179  }
180 }
181 
182 
183 
184 //return the maximum nb of elts that can fit
185 template<class T, class Key>
186 long
188  long m=-1;
189  switch(regim) {
190  case INMEM:
191  assert(im);
192  m = im->get_maxsize();
193  break;
194  case EXTMEM:
195  assert(em);
196  m = em->maxlen();
197  break;
198  case EXTMEM_DEBUG:
199  m = em->maxlen();
200  break;
201  }
202  return m;
203 }
204 
205 
206 
207 
208 //return true if empty
209 template<class T, class Key>
210 bool
212  bool v = false;
213  switch(regim) {
214  case INMEM:
215  assert(im);
216  v = im->empty();
217  break;
218  case EXTMEM:
219  assert(em);
220  v = em->is_empty();
221  break;
222  case EXTMEM_DEBUG:
223  assert(dim->empty() == em->is_empty());
224  v = em->is_empty();
225  break;
226  }
227  return v;
228 }
229 
230 
231 //return true if full
232 template<class T, class Key>
233 bool
235  cerr << "EMPQueueAdaptive::is_full(): sorry not implemented\n";
236  assert(0);
237  exit(1);
238 }
239 
240 
241 //return the element with minimum priority in the structure
242 template<class T, class Key>
243 bool
245  bool v=false, v1;
246  T tmp;
247  switch(regim) {
248  case INMEM:
249  assert(im);
250  v = im->min(elt);
251  break;
252  case EXTMEM:
253  assert(em);
254  v = em->min(elt);
255  break;
256  case EXTMEM_DEBUG:
257  v1 = dim->min(tmp);
258  v = em->min(elt);
259  //dim->verify();
260  if(!(tmp==elt)) {
261  cerr << "------------------------------" << endl;
262  cerr << dim << endl;
263  cerr << "------------------------------" << endl;
264  em->print();
265  cerr << "------------------------------" << endl;
266  cerr << "tmp=" << tmp << endl;
267  cerr << "elt=" << elt << endl;
268  cerr << "------------------------------" << endl;
269  dim->destructiveVerify();
270  }
271  assert(v == v1);
272  assert(tmp == elt);
273  break;
274  }
275  return v;
276 }
277 
278 /* switch over to using an external priority queue */
279 template<class T, class Key>
280 void
282  switch(regim) {
283  case INMEM:
284  im->clear();
285  break;
286  case EXTMEM:
287  em->clear();
288  break;
289  case EXTMEM_DEBUG:
290  dim->clear();
291  break;
292  }
293 }
294 
295 
296 template<class T, class Key>
297 void
299  switch(regim) {
300  case INMEM:
301  im->verify();
302  break;
303  case EXTMEM:
304  break;
305  case EXTMEM_DEBUG:
306  dim->verify();
307  break;
308  }
309 }
310 
311 //extract all elts with min key, add them and return their sum
312 template<class T, class Key>
313 bool
315  bool v=false, v1;
316  T tmp;
317  switch(regim) {
318  case INMEM:
319  assert(im);
320  v = im->extract_all_min(elt);
321  break;
322  case EXTMEM:
323  assert(em);
324  v = em->extract_all_min(elt);
325  break;
326  case EXTMEM_DEBUG:
327  v1 = dim->extract_all_min(tmp);
328  v = em->extract_all_min(elt);
329  assert(dim->size() == em->size());
330  assert(v == v1);
331  assert(tmp == elt);
332  break;
333  }
334  return v;
335 }
336 
337 //return the nb of elements in the structure
338 template<class T, class Key>
339 long
341  long v=0, v1;
342  switch(regim) {
343  case INMEM:
344  assert(im);
345  v = im->size();
346  break;
347  case EXTMEM:
348  assert(em);
349  v = em->size();
350  break;
351  case EXTMEM_DEBUG:
352  v1 = dim->size();
353  v = em->size();
354  assert(v == v1);
355  break;
356  }
357  return v;
358 }
359 
360 
361 
362 
363 // ----------------------------------------------------------------------
364 template<class T, class Key>
365 bool
367  bool v=false, v1;
368  T tmp;
369  switch(regim) {
370  case INMEM:
371  assert(im);
372  v = im->extract_min(elt);
373  break;
374  case EXTMEM:
375  assert(em);
376  v = em->extract_min(elt);
377  break;
378  case EXTMEM_DEBUG:
379  v1 = dim->extract_min(tmp);
380  v = em->extract_min(elt);
381  assert(v == v1);
382  assert(tmp == elt);
383  assert(dim->size() == em->size());
384  break;
385  }
386  return v;
387 }
388 
389 
390 
391 
392 //------------------------------------------------------------
393  /* insert an element; if regim == INMEM, try insert it in im, and if
394  it is full, extract_max pqsize/2 elements of im into a stream,
395  switch to EXTMEM and insert the stream into em; if regim is
396  EXTMEM, insert in em; */
397 template<class T, class Key>
398 bool
400  bool v=false;
401  switch(regim) {
402  case INMEM:
403  if (!im->full()) {
404  im->insert(elt);
405  v = true;
406  } else {
407  makeExternal();
408  v = em->insert(elt); //insert the element
409  }
410  break;
411  case EXTMEM:
412  v = em->insert(elt);
413  break;
414  case EXTMEM_DEBUG:
415  dim->insert(elt);
416  v = em->insert(elt);
417  assert(dim->size() == em->size());
418  break;
419  }
420  return v;
421 }
422 
423 template<class T, class Key>
424 void
426  assert(size() == 0);
427  switch(regim) {
428  case INMEM:
429  makeExternal();
430  break;
431  case EXTMEM:
432  break;
433  case EXTMEM_DEBUG:
434  assert(0);
435  break;
436  }
437  dim = new UnboundedMinMaxHeap<T>();
438  regim = EXTMEM_DEBUG;
439 }
440 
441 
442 
443 template<class T>
444 class baseCmpType {
445 public:
446  static int compare(const T& x, const T& y) {
447  return (x < y ? -1 : (x > y ? 1 : 0));
448  }
449 
450 };
451 
452 /* switch over to using an external priority queue */
453 template<class T, class Key>
454 void
456  AMI_err ae;
457 #ifndef NDEBUG
458  long sizeCheck;
459  sizeCheck = size();
460 #endif
461 
462  assert(regim == INMEM);
463  regim = EXTMEM;
464 
465  EMPQAD_DEBUG cout << endl
466  << "EMPQUEUEADAPTIVE: memory full: "
467  << "switching to external-memory pqueue " << endl;
468 
469  //create an AMI_stream and write in it biggest half elts of im;
470  AMI_STREAM<T> *amis0 = new AMI_STREAM<T>();
471  AMI_STREAM<T> *amis1 = NULL;
472  assert(amis0);
473  unsigned long pqsize = im->size();
474  //assert(im->size() == im->get_maxsize());
475  T x;
476  for (unsigned long i=0; i< pqsize/2; i++) {
477  int z = im->extract_max(x);
478  assert(z);
479  ae = amis0->write_item(x);
480  assert(ae == AMI_ERROR_NO_ERROR);
481  }
482  assert(amis0->stream_len() == pqsize/2);
483  EMPQAD_DEBUG { cout << "written " << pqsize/2
484  << " elts to stream\n"; cout.flush(); }
485 
486  assert(im->size() == pqsize/2 + (pqsize % 2));
487 
489 
490  //sort the stream
491  baseCmpType<T> fun;
492  AMI_sort(amis0, &amis1, &fun); //XXX laura: replaced this to use a cmp obj
493  assert(amis1);
494  delete amis0;
495  EMPQAD_DEBUG { cout << "sorted the stream\n"; cout.flush(); }
496 
498 
499  //set im to NULL and initialize em from im and amis1
500  em = new em_pqueue<T,Key>(im, amis1);
501  im = NULL;
502  assert(em);
503  EMPQAD_DEBUG { cout << "empq initialized from im\n"; cout.flush(); }
504  EMPQAD_DEBUG {em->print_size();}
505 
507 #ifndef NDEBUG
508  assert(sizeCheck == size());
509 #endif
510 }
511 
512 #endif
static int compare(const T &x, const T &y)
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition: ami_stream.h:494
size_t getAvailableMemory()
Definition: mm_utils.cpp:54
bool extract_min(T &elt)
AMI_err write_item(const T &elt)
Definition: ami_stream.h:606
void LOG_avail_memo()
Definition: mm_utils.cpp:47
#define NULL
Definition: ccmath.h:32
#define x
#define MAX_STREAMS_OPEN
Definition: ami_stream.h:65
#define assert(condition)
Definition: lz4.c:324
bool insert(const T &elt)
bool extract_all_min(T &elt)
AMI_err
Definition: ami_stream.h:86
off_t stream_len(void)
Definition: ami_stream.h:388
AMI_err AMI_sort(AMI_STREAM< T > *instream, AMI_STREAM< T > **outstream, Compare *cmp, int deleteInputStream=0)
Definition: ami_sort.h:95
#define EMPQAD_DEBUG
Definition: empq_adaptive.h:47