GRASS 8 Programmer's Manual 8.6.0dev(2026)-ddeab64dbf
Loading...
Searching...
No Matches
empq_adaptive_impl.h
Go to the documentation of this file.
1/****************************************************************************
2 *
3 * MODULE: iostream
4 *
5
6 * COPYRIGHT (C) 2007 Laura Toma
7 *
8 *
9
10 * Iostream is a library that implements streams, external memory
11 * sorting on streams, and an external memory priority queue on
12 * streams. These are the fundamental components used in external
13 * memory algorithms.
14
15 * Credits: The library was developed by Laura Toma. The kernel of
16 * class STREAM is based on the similar class existent in the GPL TPIE
17 * project developed at Duke University. The sorting and priority
18 * queue have been developed by Laura Toma based on communications
19 * with Rajiv Wickremesinghe. The library was developed as part of
20 * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21 * Rajiv Wickremesinghe as part of the Terracost project.
22
23 *
24 * This program is free software; you can redistribute it and/or modify
25 * it under the terms of the GNU General Public License as published by
26 * the Free Software Foundation; either version 2 of the License, or
27 * (at your option) any later version.
28 *
29
30 * This program is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33 * General Public License for more details. *
34 * **************************************************************************/
35
36#ifndef __EMPQ_ADAPTIVE_IMPL_H
37#define __EMPQ_ADAPTIVE_IMPL_H
38
39#include <stdio.h>
40#include <assert.h>
41
42#include "ami_config.h"
43#include "ami_stream.h"
44#include "mm.h"
45#include "mm_utils.h"
46#include "empq_adaptive.h"
47
48#include "ami_sort.h"
49
50// EMPQAD_DEBUG is defined in "empq_adaptive.h"
51
52//------------------------------------------------------------
53// allocate an internal pqueue of size precisely twice
54// the size of the pqueue within the em_pqueue;
55//
56// This constructor uses a user defined amount of memory
57
// Memory-bounded setup: marks the queue as in-memory (regim = INMEM) and
// passes the caller-supplied budget inMem straight to initPQ().
// The debug trace divides inMem by 2^20 and labels the result "MB".
// NOTE(review): the declaration line of this definition (listing line 59)
// is absent from this listing; inMem is presumably its parameter -- confirm.
58template <class T, class Key>
60{
61 regim = INMEM;
62 EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue" << endl;
63
64 //------------------------------------------------------------
65 // set the size precisely as in empq constructor since we cannot
66 // really call the em__pqueue constructor, because we don't want
67 // the space allocated; we just want the sizes;
68 // AMI_err ae;
69 EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: "
70 << ((float)inMem / (1 << 20)) << "MB" << endl;
71
 // all sizing and allocation happens in initPQ()
72 initPQ(inMem);
73}
74
75//------------------------------------------------------------
76// This more resembles the original constructor which is greedy
// "Greedy" setup: marks the queue as in-memory (regim = INMEM) and sizes it
// from mm_avail via initPQ().
// NOTE(review): mm_avail is not declared in the visible lines (listing
// lines 78 and 87 are absent); it is presumably the amount of memory the
// memory manager reports as available -- confirm against the full file.
77template <class T, class Key>
79{
80 regim = INMEM;
81 EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: starting in-memory pqueue" << endl;
82
83 //------------------------------------------------------------
84 // set the size precisely as in empq constructor since we cannot
85 // really call the em__pqueue constructor, because we don't want
86 // the space allocated; we just want the sizes;
88 EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: available memory: "
89 << ((float)mm_avail / (1 << 20)) << "MB" << endl;
90
 // all sizing and allocation happens in initPQ()
91 initPQ(mm_avail);
92}
93
94//------------------------------------------------------------
95// This method initializes the PQ based on the amount of memory
96// passed into it
// Sizes and allocates the in-memory heap `im` from the budget initMem and
// sets `em` to NULL.
//
// Steps visible below:
//   1. estimate the overhead an external queue would need (stream usage,
//      merge keys, buffers), multiplied by 8 as a deliberate overestimate;
//   2. exit(1) if that overhead exceeds initMem;
//   3. allocate a MinMaxHeap<T> with room for initMem / sizeof(T) elements.
//
// NOTE(review): the declaration line (listing line 98) is absent from this
// listing; initMem is presumably the only parameter -- confirm.
97template <class T, class Key>
99{
100 AMI_err ae;
101 EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: desired memory: "
102 << ((float)initMem / (1 << 20)) << "MB" << endl;
103
104 /* same calculations as empq constructor in order to estimate
105 overhead memory; this is because we want to allocate a pqueue of
106 size exactly double the size of the pqueue inside the empq;
107 switching from this pqueue to empq when the memory fills up will
108 be simple */
109 //------------------------------------------------------------
110 // AMI_STREAM memory usage
111 size_t sz_stream;
112 AMI_STREAM<T> dummy;
 // NOTE(review): listing lines 113-114 are absent; they presumably query
 // the stream memory usage into sz_stream and open the error branch that
 // is closed below -- sz_stream is not assigned anywhere in the visible
 // lines, so confirm against the full file.
115 cerr << "EMPQueueAdaptive constr: failing to get stream_usage\n";
116 exit(1);
117 }
118
119 // account for temporary memory usage
120 unsigned short max_nbuf = 2;
 // one stream per merge input, capped by the open-stream limit
121 unsigned int buf_arity = initMem / (2 * sz_stream);
122 if (buf_arity > MAX_STREAMS_OPEN)
123 buf_arity = MAX_STREAMS_OPEN;
124 unsigned long mm_overhead = buf_arity * sizeof(merge_key<Key>) +
125 max_nbuf * sizeof(em_buffer<T, Key>) +
126 2 * sz_stream + max_nbuf * sz_stream;
127 mm_overhead *= 8; // overestimate..this should be fixed with
128 // a precise accounting of the extra memory required
129
130 EMPQAD_DEBUG cout << "sz_stream: " << sz_stream
131 << " buf_arity: " << buf_arity
132 << " mm_overhead: " << mm_overhead
133 << " mm_avail: " << initMem << ".\n";
134
135 EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: memory overhead set to "
136 << ((float)mm_overhead / (1 << 20)) << "MB" << endl;
 // the budget cannot even cover the bookkeeping: give up
137 if (mm_overhead > initMem) {
138 cerr << "overhead bigger than available memory (" << initMem << "); "
139 << "increase -m and try again\n";
140 exit(1);
141 }
 // NOTE(review): listing line 142 is absent. In the visible lines pqsize
 // is derived from initMem as passed in; if the missing line does not
 // subtract mm_overhead, the heap is sized from the whole budget -- confirm.
143 //------------------------------------------------------------
144
145 long pqsize = initMem / sizeof(T);
146 EMPQAD_DEBUG cout << "EMPQUEUEADAPTIVE: pqsize set to " << pqsize << endl;
147
148 // initialize in memory pqueue and set em to NULL
149 im = new MinMaxHeap<T>(pqsize);
 // NOTE(review): a plain new-expression throws std::bad_alloc on failure
 // instead of returning null, so this assert cannot catch an allocation
 // failure.
150 assert(im);
151 em = NULL;
152}
153
// Releases whichever queue objects the current regime uses:
//   INMEM        -> im
//   EXTMEM       -> em
//   EXTMEM_DEBUG -> dim and em
// NOTE(review): the declaration line (listing line 155) is absent from this
// listing; this is presumably the destructor -- confirm.
154template <class T, class Key>
156{
157 switch (regim) {
158 case INMEM:
159 delete im;
160 break;
161 case EXTMEM:
162 delete em;
163 break;
164 case EXTMEM_DEBUG:
165 delete dim;
166 delete em;
167 break;
168 }
169}
170
171// return the maximum nb of elts that can fit
// Returns the capacity reported by the active queue: im->get_maxsize() in
// INMEM, em->maxlen() in EXTMEM and EXTMEM_DEBUG. The result stays at -1
// only if regim matches none of the three cases.
// NOTE(review): the declaration line (listing line 173) is absent from this
// listing.
172template <class T, class Key>
174{
175 long m = -1;
176 switch (regim) {
177 case INMEM:
178 assert(im);
179 m = im->get_maxsize();
180 break;
181 case EXTMEM:
182 assert(em);
183 m = em->maxlen();
184 break;
185 case EXTMEM_DEBUG:
 // NOTE(review): unlike the EXTMEM case, em is not asserted non-null here
186 m = em->maxlen();
187 break;
188 }
189 return m;
190}
191
192// return true if empty
// Reports emptiness of the active queue: im->empty() in INMEM,
// em->is_empty() in EXTMEM and EXTMEM_DEBUG. In EXTMEM_DEBUG the shadow heap
// dim is additionally asserted to agree with em.
// NOTE(review): the declaration line (listing line 194) is absent from this
// listing.
193template <class T, class Key>
195{
196 bool v = false;
197 switch (regim) {
198 case INMEM:
199 assert(im);
200 v = im->empty();
201 break;
202 case EXTMEM:
203 assert(em);
204 v = em->is_empty();
205 break;
206 case EXTMEM_DEBUG:
 // cross-check: both structures must agree on emptiness
207 assert(dim->empty() == em->is_empty());
208 v = em->is_empty();
209 break;
210 }
211 return v;
212}
213
214// return true if full
// Not implemented: prints a message to cerr, trips assert(0) and, if
// assertions are compiled out, terminates the process with exit(1).
// There is no return statement; control never leaves this body normally.
// NOTE(review): the declaration line (listing line 216) is absent from this
// listing.
215template <class T, class Key>
217{
218 cerr << "EMPQueueAdaptive::is_full(): sorry not implemented\n";
219 assert(0);
220 exit(1);
221}
222
223// return the element with minimum priority in the structure
// Stores the minimum element of the active queue in elt and returns the
// status reported by the underlying min() call.
// In EXTMEM_DEBUG the same query is issued to both dim and em; on a mismatch
// diagnostic state is dumped to cerr before the asserts fire.
// NOTE(review): the declaration line (listing line 225) is absent from this
// listing; elt is presumably a T& output parameter -- confirm.
224template <class T, class Key>
226{
227 bool v = false, v1;
228 T tmp;
229 switch (regim) {
230 case INMEM:
231 assert(im);
232 v = im->min(elt);
233 break;
234 case EXTMEM:
235 assert(em);
236 v = em->min(elt);
237 break;
238 case EXTMEM_DEBUG:
 // query the shadow heap first, then the external queue
239 v1 = dim->min(tmp);
240 v = em->min(elt);
241 // dim->verify();
242 if (!(tmp == elt)) {
243 cerr << "------------------------------" << endl;
 // NOTE(review): dim is used with -> elsewhere, so this streams the
 // pointer itself unless an overload for the pointer type exists;
 // *dim may have been intended -- confirm.
244 cerr << dim << endl;
245 cerr << "------------------------------" << endl;
246 em->print();
247 cerr << "------------------------------" << endl;
248 cerr << "tmp=" << tmp << endl;
249 cerr << "elt=" << elt << endl;
250 cerr << "------------------------------" << endl;
251 dim->destructiveVerify();
252 }
253 assert(v == v1);
254 assert(tmp == elt);
255 break;
256 }
257 return v;
258}
259
260/* call clear() on the queue selected by the current regime */
// Calls clear() on one structure chosen by regim: im in INMEM, em in EXTMEM,
// dim in EXTMEM_DEBUG. No null checks are made on the pointers.
// NOTE(review): the declaration line (listing line 262) is absent from this
// listing.
261template <class T, class Key>
263{
264 switch (regim) {
265 case INMEM:
266 im->clear();
267 break;
268 case EXTMEM:
269 em->clear();
270 break;
271 case EXTMEM_DEBUG:
 // NOTE(review): only the shadow heap is cleared here; em is left
 // untouched, although every other EXTMEM_DEBUG branch in this file
 // operates on both dim and em -- confirm whether em->clear() is
 // missing.
272 dim->clear();
273 break;
274 }
275}
276
// Runs the heap self-check: im->verify() in INMEM, dim->verify() in
// EXTMEM_DEBUG. Nothing is checked in EXTMEM.
// NOTE(review): the declaration line (listing line 278) is absent from this
// listing.
277template <class T, class Key>
279{
280 switch (regim) {
281 case INMEM:
282 im->verify();
283 break;
284 case EXTMEM:
 // no verification is performed on em
285 break;
286 case EXTMEM_DEBUG:
287 dim->verify();
288 break;
289 }
290}
291
292// extract all elts with min key, add them and return their sum
// Forwards extract_all_min(elt) to the active queue and returns its status.
// In EXTMEM_DEBUG the call is made on both dim and em, and the resulting
// sizes, statuses and elements are asserted to match.
// NOTE(review): the declaration line (listing line 294) is absent from this
// listing; elt is presumably a T& output parameter -- confirm.
293template <class T, class Key>
295{
296 bool v = false, v1;
297 T tmp;
298 switch (regim) {
299 case INMEM:
300 assert(im);
301 v = im->extract_all_min(elt);
302 break;
303 case EXTMEM:
304 assert(em);
305 v = em->extract_all_min(elt);
306 break;
307 case EXTMEM_DEBUG:
 // run on the shadow heap and on em, then compare the outcomes
308 v1 = dim->extract_all_min(tmp);
309 v = em->extract_all_min(elt);
310 assert(dim->size() == em->size());
311 assert(v == v1);
312 assert(tmp == elt);
313 break;
314 }
315 return v;
316}
317
318// return the nb of elements in the structure
// Returns the element count of the active queue: im->size() in INMEM,
// em->size() otherwise. In EXTMEM_DEBUG the count is asserted to equal
// dim->size(). The result stays 0 if regim matches none of the cases.
// NOTE(review): the declaration line (listing line 320) is absent from this
// listing.
319template <class T, class Key>
321{
322 long v = 0, v1;
323 switch (regim) {
324 case INMEM:
325 assert(im);
326 v = im->size();
327 break;
328 case EXTMEM:
329 assert(em);
330 v = em->size();
331 break;
332 case EXTMEM_DEBUG:
 // shadow heap and external queue must hold the same number of items
333 v1 = dim->size();
334 v = em->size();
335 assert(v == v1);
336 break;
337 }
338 return v;
339}
340
341// ----------------------------------------------------------------------
// Forwards extract_min(elt) to the active queue and returns its status.
// In EXTMEM_DEBUG the call is made on both dim and em, and the statuses,
// elements and resulting sizes are asserted to match.
// NOTE(review): the declaration line (listing line 343) is absent from this
// listing; elt is presumably a T& output parameter -- confirm.
342template <class T, class Key>
344{
345 bool v = false, v1;
346 T tmp;
347 switch (regim) {
348 case INMEM:
349 assert(im);
350 v = im->extract_min(elt);
351 break;
352 case EXTMEM:
353 assert(em);
354 v = em->extract_min(elt);
355 break;
356 case EXTMEM_DEBUG:
 // run on the shadow heap and on em, then compare the outcomes
357 v1 = dim->extract_min(tmp);
358 v = em->extract_min(elt);
359 assert(v == v1);
360 assert(tmp == elt);
361 assert(dim->size() == em->size());
362 break;
363 }
364 return v;
365}
366
367//------------------------------------------------------------
368/* insert an element; if regim == INMEM, try to insert it into im, and
369 if im is full, extract_max pqsize/2 elements of im into a stream,
370 switch to EXTMEM and insert the stream into em; if regim is
371 EXTMEM, insert into em; if regim is EXTMEM_DEBUG, insert into both
 dim and em; */
// Inserts elt into the active queue and returns whether the insert was
// accepted. In INMEM the result is true when im had room; when im is full
// the queue is first converted with makeExternal() and the element then
// goes to em, whose insert() result is returned.
// NOTE(review): the declaration line (listing line 373) is absent from this
// listing.
372template <class T, class Key>
374{
375 bool v = false;
376 switch (regim) {
377 case INMEM:
378 if (!im->full()) {
379 im->insert(elt);
380 v = true;
381 }
382 else {
 // in-memory heap exhausted: convert, then insert externally
383 makeExternal();
384 v = em->insert(elt); // insert the element
385 }
386 break;
387 case EXTMEM:
388 v = em->insert(elt);
389 break;
390 case EXTMEM_DEBUG:
 // keep the shadow heap in step with em
391 dim->insert(elt);
392 v = em->insert(elt);
393 assert(dim->size() == em->size());
394 break;
395 }
396 return v;
397}
398
// Switches to the EXTMEM_DEBUG regime, in which an UnboundedMinMaxHeap<T>
// (dim) is kept alongside em. Asserts that the queue is empty on entry.
// From INMEM the queue is first converted with makeExternal(); being in
// EXTMEM_DEBUG already trips assert(0).
// NOTE(review): the declaration line (listing line 400) is absent from this
// listing.
399template <class T, class Key>
401{
402 assert(size() == 0);
403 switch (regim) {
404 case INMEM:
405 makeExternal();
406 break;
407 case EXTMEM:
 // already external: only the shadow heap below is needed
408 break;
409 case EXTMEM_DEBUG:
410 assert(0);
411 break;
412 }
413 dim = new UnboundedMinMaxHeap<T>();
414 regim = EXTMEM_DEBUG;
415}
416
// Comparator type exposing a static three-way compare() built from
// operator< and operator> of T.
// NOTE(review): the class header line (listing line 418) is absent from
// this listing, so the type name is not visible here.
417template <class T>
419public:
 // Returns -1 if x < y, 1 if x > y, and 0 otherwise.
420 static int compare(const T &x, const T &y)
421 {
422 return (x < y ? -1 : (x > y ? 1 : 0));
423 }
424};
425
426/* switch over to using an external priority queue */
// Converts the queue from INMEM to EXTMEM:
//   1. moves pqsize / 2 elements out of im with extract_max() into the
//      stream amis0;
//   2. sorts amis0 into amis1 with AMI_sort() and deletes amis0;
//   3. builds em from the remaining heap im plus amis1 and sets im to NULL.
// With assertions enabled, the total element count before and after the
// switch is checked to be equal.
// NOTE(review): several listing lines are absent here (428, 444-445, 454,
// 457, 465, 468, 472, 478, 484, 489, 494); amis0, amis1 and fun are
// declared on lines that are not visible.
427template <class T, class Key>
429{
430 AMI_err ae;
431#ifndef NDEBUG
432 long sizeCheck;
 // taken while regim is still INMEM
433 sizeCheck = size();
434#endif
435
436 assert(regim == INMEM);
437 regim = EXTMEM;
438
439 EMPQAD_DEBUG cout << endl
440 << "EMPQUEUEADAPTIVE: memory full: "
441 << "switching to external-memory pqueue " << endl;
442
443 // create an AMI_stream and write into it the biggest half of the elts of im;
446 assert(amis0);
447 unsigned long pqsize = im->size();
448 // assert(im->size() == im->get_maxsize());
449 T x;
450 for (unsigned long i = 0; i < pqsize / 2; i++) {
451 int z = im->extract_max(x);
452 assert(z);
 // NOTE(review): no check of ae is visible (listing line 454 is absent)
453 ae = amis0->write_item(x);
455 }
456 assert(amis0->stream_len() == pqsize / 2);
458 {
459 cout << "written " << pqsize / 2 << " elts to stream\n";
460 cout.flush();
461 }
462
 // the other half (plus the odd element, if any) must still be in im
463 assert(im->size() == pqsize / 2 + (pqsize % 2));
464
466
467 // sort the stream
469 AMI_sort(amis0, &amis1, &fun); // XXX laura: replaced this to use a cmp obj
470 assert(amis1);
 // the unsorted input stream is released here by the caller
471 delete amis0;
473 {
474 cout << "sorted the stream\n";
475 cout.flush();
476 }
477
479
480 // set im to NULL and initialize em from im and amis1
 // NOTE(review): im is handed to the em_pqueue constructor and then
 // dropped here without delete; presumably em takes ownership -- confirm.
481 em = new em_pqueue<T, Key>(im, amis1);
482 im = NULL;
483 assert(em);
485 {
486 cout << "empq initialized from im\n";
487 cout.flush();
488 }
490 {
491 em->print_size();
492 }
493
495#ifndef NDEBUG
 // size() now goes through em, since regim was set to EXTMEM above
496 assert(sizeCheck == size());
497#endif
498}
499
500#endif
AMI_err AMI_sort(AMI_STREAM< T > *instream, AMI_STREAM< T > **outstream, Compare *cmp, int deleteInputStream=0)
Definition ami_sort.h:85
#define MAX_STREAMS_OPEN
Definition ami_stream.h:63
AMI_err
Definition ami_stream.h:83
@ AMI_ERROR_NO_ERROR
Definition ami_stream.h:84
#define NULL
Definition ccmath.h:32
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition ami_stream.h:477
AMI_err write_item(const T &elt)
Definition ami_stream.h:588
off_t stream_len(void)
Definition ami_stream.h:375
bool extract_all_min(T &elt)
bool insert(const T &elt)
static int compare(const T &x, const T &y)
@ EXTMEM_DEBUG
@ EXTMEM
@ INMEM
#define EMPQAD_DEBUG
#define assert(condition)
Definition lz4.c:291
@ MM_STREAM_USAGE_MAXIMUM
Definition mm.h:79
void LOG_avail_memo()
Definition mm_utils.cpp:45
size_t getAvailableMemory()
Definition mm_utils.cpp:52
#define x