GRASS 8 Programmer's Manual 8.6.0dev(2026)-ddeab64dbf
Loading...
Searching...
No Matches
empq.h
Go to the documentation of this file.
1/****************************************************************************
2 *
3 * MODULE: iostream
4 *
5
6 * COPYRIGHT (C) 2007 Laura Toma
7 *
8 *
9
10 * Iostream is a library that implements streams, external memory
11 * sorting on streams, and an external memory priority queue on
12 * streams. These are the fundamental components used in external
13 * memory algorithms.
14
15 * Credits: The library was developed by Laura Toma. The kernel of
16 * class STREAM is based on the similar class existent in the GPL TPIE
17 * project developed at Duke University. The sorting and priority
18 * queue have been developed by Laura Toma based on communications
19 * with Rajiv Wickremesinghe. The library was developed as part of
20 * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21 * Rajiv Wickremesinghe as part of the Terracost project.
22
23 *
24 * This program is free software; you can redistribute it and/or modify
25 * it under the terms of the GNU General Public License as published by
26 * the Free Software Foundation; either version 2 of the License, or
27 * (at your option) any later version.
28 *
29
30 * This program is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33 * General Public License for more details. *
34 * **************************************************************************/
35
36#ifndef __EMPQ_H
37#define __EMPQ_H
38
39#include <stdio.h>
40#include <assert.h>
41
42#include "ami_config.h" //for SAVE_MEMORY
43#include "ami_stream.h"
44#include "mm.h"
45#include "mm_utils.h" //for MEMORY_LOG, getAvailableMemory
46#include "imbuffer.h"
47#include "embuffer.h"
48#include "pqheap.h"
49#include "minmaxheap.h"
50
51template <class T, class Key>
53#define ExtendedMergeStream AMI_STREAM<ExtendedEltMergeType<T, Key>>
54
55/**********************************************************
56 DEBUGGING FLAGS
57***********************************************************/
58
59// enables printing messages when buffers are emptied
60// #define EMPQ_EMPTY_BUF_PRINT
61
62// enables printing when pq gets filled from buffers
63// #define EMPQ_PQ_FILL_PRINT
64
65// enables printing inserts
66// #define EMPQ_PRINT_INSERT
67
68// enables printing deletes
69// #define EMPQ_PRINT_EXTRACTALL
70
71// enables printing the empq on insert/extract_all_min
72// #define EMPQ_PRINT_EMPQ
73
74// enable printing the size of the EMPQ and nb of active streams
75// on fillpq() and on empty_buff_0
76// #define EMPQ_PRINT_SIZE
77
78// enable printing 'fill pq from B0' in extract_min()
79// #define EMPQ_PRINT_FILLPQ_FROM_BUFF0
80
81// enable expensive size asserts
82// #define EMPQ_ASSERT_EXPENSIVE
83
84/**********************************************************/
85
86/* external memory priority queue
87
88 Functionality:
89
90 Keep a pqueue PQ of size \theta(M) in memory. Keep a buffer B0 of
91 size \theta(M) in memory. Keep an array of external-memory
92 buffers, one for each level 1..log_m{n/m} (where N is the maximum
93 number of items in pqueue at any time).
94
95 invariants:
96 1. PQ contains the smallest items in the structure.
97
98 2. each stream of any external memory buffers is sorted in
99 increasing order.
100
101 insert(x): if (x < maximum_item(PQ)) exchange x with
102 maximum_item(PQ); if buffer B0 is full, empty it; insert x in B0;
103
104 extract_min():
105
106 analysis:
107
108 1. inserts: once the buffer B0 is empty, the next sizeof(B0)
109 inserts are free; one insert can cause many I/Os if cascading
110 emptying of external buffers Bi occurs. Emptying level-i buffer
111 costs <arity>^i*sizeof(B0)/B I/Os and occurs every
112 N/<arity>^i*sizeof(B0) inserts (or less, if deletes too). It can be
113 proved that the amortized time of 1 insert is 1/B*maxnb_buffers.
114*/
115
116/*
117T is assumed to be a class for which getPriority() and getValue()
118are implemented; for simplicity it is assumed that the comparison
119operators have been overloaded on T such that
120x < y <==> x.getPriority() < y.getPriority()
121*/
122
123template <class T, class Key>
125
126private:
127 // in memory priority queue
128 MinMaxHeap<T> *pq;
129
130 // pqueue size
131 unsigned long pqsize;
132
133 // in-memory buffer
134 im_buffer<T> *buff_0;
135
136 // in-memory buffer size
137 unsigned long bufsize;
138
139 // external memory buffers
140 em_buffer<T, Key> **buff;
141
142 /* number of external memory buffers statically allocated in the
143 beginning; since the number of buffers needed is \log_m{n/m}, we
144 cannot know it in advance; estimate it roughly and then reallocate
145 it dynamically on request;
146
147 TO DO: dynamic reallocation with a bigger nb of external buffers
148 if structure becomes full */
149 unsigned short max_nbuf;
150
151 // index of next external buffer entry available for use (i.e. is NULL)
152 unsigned short crt_buf;
153
154 // external buffer arity
155 unsigned int buf_arity;
156
157public:
158 // create an em_pqueue of specified size
159 em_pqueue(long pq_sz, long buf_sz, unsigned short nb_buf,
160 unsigned int buf_ar);
161
162 // create an em_pqueue capable of storing <= N elements
163 em_pqueue();
164 em_pqueue(long N UNUSED) : em_pqueue() {} // N not used; delegates to the default ctor so this object (not a discarded temporary) gets initialized
165
166#ifdef SAVE_MEMORY
167 // create an empq, initialize its pq with im and insert amis in
168 // buff[0]; im should not be used/deleted after that outside empq
170#endif
171
172 // copy constructor
174
175 // clean up
176 ~em_pqueue();
177
178 // return the nb of elements in the structure
179 unsigned long size();
180
181 // return true if empty
182 bool is_empty();
183
184 // return true if full
185 bool is_full() // placeholder: not implemented; prints a notice and exits
186 {
187 cout << "em_pqueue::is_full(): sorry not implemented\n";
188 exit(1); // called unconditionally, so no bool value is ever returned
189 }
190
191 // return the element with minimum priority in the structure
192 bool min(T &elt);
193
194 // delete the element with minimum priority in the structure;
195 // return false if pq is empty
196 bool extract_min(T &elt);
197
198 // extract all elts with min key, add them and return their sum
199 bool extract_all_min(T &elt);
200
201 // insert an element; return false if insertion fails
202 bool insert(const T &elt);
203
204 // return maximum capacity of i-th external buffer
205 long maxlen(unsigned short i);
206
207 // return maximum capacity of em_pqueue
208 long maxlen();
209
210 // delete all the data in the pq; reset to empty but don't free memory
211 void clear();
212
213 // print structure
214 void print_range();
215
216 void print();
217
218 // print the detailed size of empq (pq, buf_0, buff[i])
219 void print_size();
220
221 friend ostream &operator<<(ostream &s, const em_pqueue &empq) // writes a summary line, then pq, buff_0 and the external buffers to s
222 {
223 s << "EM_PQ: pq size=" << empq.pqsize
224 << ", buff_0 size=" << empq.bufsize << ", ext_bufs=" << empq.crt_buf
225 << "(max " << empq.max_nbuf << ")\n";
226 s << "IN_MEMORY PQ: \n" << *(empq.pq) << "\n"; // dereferences pq without a null check
227 s << "IN_MEMORY BUFFER: \n" << *(empq.buff_0) << "\n"; // dereferences buff_0 without a null check
228 for (unsigned short i = 0; i < empq.crt_buf; i++) { // only entries below crt_buf are streamed
229 // s << "EM_BUFFER " << i << ":\n" ;
230 s << *(empq.buff[i]);
231 }
232 return s; // hand the stream back so insertions can be chained
233 }
234
235protected:
236 // return the nb of active streams in the buffer
238 {
239 int totstr = 0;
240 for (unsigned short i = 0; i < crt_buf; i++) {
241 totstr += buff[i]->get_nbstreams();
242 }
243 return totstr;
244 }
245
246 // called when buff_0 is full to empty it on external level_1 buffer;
247 // can produce cascading emptying
248 bool empty_buff_0();
249
250 // sort and empty buffer i into buffer (i+1) recursively;
251 // called recursively by empty_buff_0() to empty subsequent buffers
252 // i must be a valid (i<crt_buf) full buffer
253 void empty_buff(unsigned short i);
254
255 /* merge the first <K> elements of the streams of input buffer,
256 starting at position <buf.deleted[i]> in each stream; there are
257 <buf.arity> streams in total; write output in <outstream>; the
258 items written in outstream are of type <merge_output_type> which
259 extends T with the stream nb and buffer nb the item comes from;
260 this information is needed later to distribute items back; do not
261 delete the K merged elements from the input streams; <bufid> is the
262 id of the buffer whose streams are being merged;
263
264 the input streams are assumed sorted in increasing order of keys; */
266 long K);
267
268 /* merge the first <K> elements of the input streams; there are
269 <arity> streams in total; write output in <outstream>;
270
271 the input streams are assumed sorted in increasing order of their
272 keys; */
273 AMI_err merge_streams(ExtendedMergeStream **instr, unsigned short arity,
275
276 // deletes one element from <buffer, stream>
277 void delete_str_elt(unsigned short buf_id, unsigned int stream_id);
278
279 /* copy the minstream in the internal pqueue while merging with
280 buff_0; the smallest <pqsize> elements between minstream and
281 buff_0 will be inserted in internal pqueue; also, the elements
282 from minstream which are inserted in pqueue must be marked as
283 deleted in the source streams; */
285
286 // clean buffers in case some streams have been emptied
287 void cleanup();
288
289 // called when pq must be filled from external buffers
290 bool fillpq();
291
292 // print the nb of elements in each stream
293 void print_stream_sizes();
294};
295
296#endif
AMI_err
Definition ami_stream.h:83
void clear()
Definition empq_impl.h:1482
em_pqueue(const em_pqueue &ep)
void print_stream_sizes()
Definition empq_impl.h:1568
void print()
Definition empq_impl.h:1523
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
Definition empq_impl.h:954
bool fillpq()
Definition empq_impl.h:574
void print_range()
Definition empq_impl.h:1498
bool insert(const T &elt)
Definition empq_impl.h:1000
AMI_err merge_buffer(em_buffer< T, Key > *buf, AMI_STREAM< ExtendedEltMergeType< T, Key > > *outstr, long K)
Definition empq_impl.h:1257
bool extract_min(T &elt)
Definition empq_impl.h:727
bool is_full()
Definition empq.h:185
long maxlen()
Definition empq_impl.h:538
friend ostream & operator<<(ostream &s, const em_pqueue &empq)
Definition empq.h:221
AMI_err merge_streams(AMI_STREAM< ExtendedEltMergeType< T, Key > > **instr, unsigned short arity, AMI_STREAM< ExtendedEltMergeType< T, Key > > *outstr, long K)
Definition empq_impl.h:1380
unsigned long size()
Definition empq_impl.h:550
void empty_buff(unsigned short i)
Definition empq_impl.h:1155
bool extract_all_min(T &elt)
Definition empq_impl.h:802
void cleanup()
Definition empq_impl.h:968
void merge_bufs2pq(AMI_STREAM< ExtendedEltMergeType< T, Key > > *minstream)
Definition empq_impl.h:851
int active_streams()
Definition empq.h:237
bool is_empty()
Definition empq_impl.h:563
void print_size()
Definition empq_impl.h:1546
bool empty_buff_0()
Definition empq_impl.h:1086
em_pqueue(long N)
Definition empq.h:164
#define min(x, y)
Definition draw2.c:29
#define N
#define ExtendedMergeStream
Definition empq.h:53
#define UNUSED
A macro for an attribute, if attached to a variable, indicating that the variable is not used.
Definition gis.h:46