GRASS 8 Programmer's Manual 8.6.0dev(2026)-ddeab64dbf
Loading...
Searching...
No Matches
ami_sort_impl.h
Go to the documentation of this file.
1/****************************************************************************
2 *
3 * MODULE: iostream
4 *
5
6 * COPYRIGHT (C) 2007 Laura Toma
7 *
8 *
9
10 * Iostream is a library that implements streams, external memory
11 * sorting on streams, and an external memory priority queue on
12 * streams. These are the fundamental components used in external
13 * memory algorithms.
14
15 * Credits: The library was developed by Laura Toma. The kernel of
16 * class STREAM is based on the similar class existent in the GPL TPIE
17 * project developed at Duke University. The sorting and priority
18 * queue have been developed by Laura Toma based on communications
19 * with Rajiv Wickremesinghe. The library was developed as part of
20 * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21 * Rajiv Wickremesinghe as part of the Terracost project.
22
23 *
24 * This program is free software; you can redistribute it and/or modify
25 * it under the terms of the GNU General Public License as published by
26 * the Free Software Foundation; either version 2 of the License, or
27 * (at your option) any later version.
28 *
29
30 * This program is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33 * General Public License for more details. *
34 * **************************************************************************/
35
36#ifndef AMI_SORT_IMPL_H
37#define AMI_SORT_IMPL_H
38
39#include "ami_stream.h"
40#include "mem_stream.h"
41#include "mm.h"
42#include "quicksort.h"
43#include "queue.h"
44#include "replacementHeap.h"
46
47#define SDEBUG if (0)
48
49/* if this flag is defined, a run will be split into blocks, each
50 block sorted and then all blocks merged */
51#define BLOCKED_RUN
52
53/* ---------------------------------------------------------------------- */
54// set run_size, last_run_size and nb_runs depending on how much memory
55// is available
56template <class T>
57static void initializeRunFormation(AMI_STREAM<T> *instream, size_t &run_size,
58 size_t &last_run_size, unsigned int &nb_runs)
59{
60
63
64#ifdef BLOCKED_RUN
65 // not in place, can only use half memory
66 mm_avail = mm_avail / 2;
67#endif
68 run_size = mm_avail / sizeof(T);
69
71 if (strlen == 0) {
73 }
74 else {
75 if (strlen % run_size == 0) {
78 }
79 else {
80 nb_runs = strlen / run_size + 1;
82 }
83 }
84
85 SDEBUG cout << "nb_runs=" << nb_runs << ", run_size=" << run_size
86 << ", last_run_size=" << last_run_size << "\n";
87}
88
89/* ---------------------------------------------------------------------- */
90/* data is allocated; read run_size elements from stream into data and
91 sort them using quicksort */
92template <class T, class Compare>
93size_t makeRun_Block(AMI_STREAM<T> *instream, T *data, unsigned int run_size,
94 Compare *cmp)
95{
98
99 // read next run from input stream
102
103 // sort it in memory in place
104 quicksort(data, new_run_size, *cmp);
105
106 return new_run_size;
107}
108
109/* ---------------------------------------------------------------------- */
110/* data is allocated; read run_size elements from stream into data and
111 sort them using quicksort; instead of reading the whole chunk at
112 once, it reads it in blocks, sorts each block and then merges the
113 blocks together. Note: it is not in place! it allocates another
114 array of same size as data, writes the sorted run into it and
115 deletes data, and replaces data with outdata */
116template <class T, class Compare>
117void makeRun(AMI_STREAM<T> *instream, T *&data, int run_size, Compare *cmp)
118{
119
121
123
124 if (run_size % block_size == 0) {
127 }
128 else {
131 }
132
133 // create queue of blocks waiting to be merged
135 MEM_STREAM<T> *str;
137 for (unsigned int i = 0; i < nblocks; i++) {
139 makeRun_Block(instream, &(data[i * block_size]), crt_block_size, cmp);
140 str = new MEM_STREAM<T>(&(data[i * block_size]), crt_block_size);
141 blockList->enqueue(str);
142 }
143 assert(blockList->length() == nblocks);
144
145 // now data consists of sorted blocks: merge them
147 SDEBUG rheap.print(cerr);
148 int i = 0;
149 T elt;
150 T *outdata = new T[run_size];
151 while (!rheap.empty()) {
152 elt = rheap.extract_min();
153 outdata[i] = elt;
154 // SDEBUG cerr << "makeRun: written " << elt << endl;
155 i++;
156 }
157 assert(i == run_size && blockList->length() == 0);
158 delete blockList;
159
160 T *tmp = data;
161 delete[] tmp;
162 data = outdata;
163}
164
165/* ---------------------------------------------------------------------- */
166
167// partition instream in streams that fit in main memory, sort each
168// stream, remember its name, make it persistent and store it on
169// disk. if entire stream fits in memory, sort it and store it and
170// return it.
171
172// assume instream is allocated prior to the call.
173// set nb_runs and allocate runNames.
174
175// The comparison object "cmp", of (user-defined) class represented by
176// Compare, must have a member function called "compare" which is used
177// for sorting the input stream.
178
179template <class T, class Compare>
181{
182
184 unsigned int nb_runs;
186 T *data;
187 AMI_STREAM<T> *str;
188 char *strname;
189
190 assert(instream && cmp);
191 SDEBUG cout << "runFormation: ";
193
194 /* leave this in for now, in case some file-based implementations do
195 anything funny... -RW */
196 // rewind file
197 instream->seek(0); // should check error xxx
198
199 // estimate run_size, last_run_size and nb_runs
200 initializeRunFormation(instream, run_size, last_run_size, nb_runs);
201
202 // create runList (if 0 size, queue uses default)
204
205 /* allocate space for a run */
206 if (nb_runs <= 1) {
207 // don't waste space if input stream is smaller than run_size
208 data = new T[last_run_size];
209 }
210 else {
211 data = new T[run_size];
212 }
214
215 for (size_t i = 0; i < nb_runs; i++) {
216 // while(!instream->eof()) {
218
219 SDEBUG cout << "i=" << i << ": runsize=" << crt_run_size << ", ";
220
221 // crt_run_size = makeRun_Block(instream, data, run_size, cmp);
222#ifdef BLOCKED_RUN
223 makeRun(instream, data, crt_run_size, cmp);
224#else
226#endif
227
229
230 // read next run from input stream
231 // err = instream->read_array(data, crt_run_size);
232 // assert(err == AMI_ERROR_NO_ERROR);
233 // sort it in memory in place
234 // quicksort(data, crt_run_size, *cmp);
235
236 if (crt_run_size > 0) {
237 // create a new stream to hold this run
238 str = new AMI_STREAM<T>();
239 str->write_array(data, crt_run_size);
240 assert(str->stream_len() == crt_run_size);
241
242 // remember this run's name
243 str->name(&strname); /* deleted after we dequeue */
244 runList->enqueue(strname);
245 // delete the stream -- should not keep too many streams open
247 delete str;
248 }
249 }
251 // release the run memory!
252 delete[] data;
253
254 SDEBUG cout << "runFormation: done.\n";
256
257 return runList;
258}
259
260/* ---------------------------------------------------------------------- */
261
262// this is one pass of merge; estimate max possible merge arity <ar>
263// and merge the first <ar> streams from runList ; create and return
264// the resulting stream (does not add it to the queue -- the calling
265// function will do that)
266
267// input streams are assumed to be sorted, and are not necessarily of
268// the same length.
269
270// streamList does not contains streams, but names of streams, which
271// must be opened in order to be merged
272
273// The comparison object "cmp", of (user-defined) class represented by
274// Compare, must have a member function called "compare" which is used
275// for sorting the input stream.
276
277template <class T, class Compare>
279{
281 size_t mm_avail, blocksize;
282 unsigned int arity, max_arity;
283 T elt;
284
285 assert(streamList && cmp);
286
287 SDEBUG cout << "singleMerge: ";
288
289 // estimate max possible merge arity with available memory (approx M/B)
291 // blocksize = getpagesize();
292 // should use AMI function, but there's no stream at this point
293 // now use static mtd -RW 5/05
296 if (max_arity < 2) {
297 cerr << __FILE__ ":" << __LINE__
298 << ": OUT OF MEMORY in singleMerge (going over limit)" << endl;
299 max_arity = 2;
300 }
301 else if (max_arity > MAX_STREAMS_OPEN) {
303 }
304 arity =
305 (streamList->length() < max_arity) ? streamList->length() : max_arity;
306
307 SDEBUG cout << "arity=" << arity << " (max_arity=" << max_arity << ")\n";
308
309 /* create the output stream. if this is a complete merge, use finalpath */
310 // create output stream
311 mergedStr = new AMI_STREAM<T>();
312
314 SDEBUG rheap.print(cerr);
315
316 while (!rheap.empty()) {
317 // mergedStr->write_item( rheap.extract_min() );
318 // xxx should check error here
319 elt = rheap.extract_min();
320 mergedStr->write_item(elt);
321 // SDEBUG cerr << "smerge: written " << elt << endl;
322 }
323
324 SDEBUG cout << "..done\n";
325
326 return mergedStr;
327}
328
329/* ---------------------------------------------------------------------- */
330
331// merge runs whose names are given by runList; this may entail
332// multiple passes of singleMerge();
333
334// return the resulting output stream
335
336// input streams are assumed to be sorted, and are not necessarily of
337// the same length.
338
339// The comparison object "cmp", of (user-defined) class represented by
340// Compare, must have a member function called "compare" which is used
341// for sorting the input stream.
342
343template <class T, class Compare>
345{
347 char *path;
348
349 assert(runList && runList->length() > 1 && cmp);
350
351 SDEBUG cout << "multiMerge: " << runList->length() << " runs" << endl;
352
353 while (runList->length() > 1) {
354
355 // merge streams from streamlist into mergedStr
357 // i thought the templates are not needed in the call, but seems to
358 // help the compiler..laura
360
361 // if more runs exist, delete this stream and add it to list
362 if (runList->length() > 0) {
364 runList->enqueue(path);
366 delete mergedStr;
367 }
368 }
369
370 assert(runList->length() == 0);
372 return mergedStr;
373}
374
375#endif
AMI_STREAM< T > * singleMerge(queue< char * > *streamList, Compare *cmp)
size_t makeRun_Block(AMI_STREAM< T > *instream, T *data, unsigned int run_size, Compare *cmp)
#define SDEBUG
void makeRun(AMI_STREAM< T > *instream, T *&data, int run_size, Compare *cmp)
AMI_STREAM< T > * multiMerge(queue< char * > *runList, Compare *cmp)
queue< char * > * runFormation(AMI_STREAM< T > *instream, Compare *cmp)
#define STREAM_BUFFER_SIZE
Definition ami_stream.h:78
#define MAX_STREAMS_OPEN
Definition ami_stream.h:63
@ PERSIST_PERSISTENT
Definition ami_stream.h:116
AMI_err
Definition ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition ami_stream.h:86
@ AMI_ERROR_NO_ERROR
Definition ami_stream.h:84
#define NULL
Definition ccmath.h:32
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition ami_stream.h:477
AMI_err write_item(const T &elt)
Definition ami_stream.h:588
AMI_err write_array(const T *data, off_t len)
Definition ami_stream.h:613
AMI_err name(char **stream_name)
Definition ami_stream.h:426
AMI_err read_array(T *data, off_t len, off_t *lenp=NULL)
Definition ami_stream.h:554
AMI_err seek(off_t offset)
Definition ami_stream.h:445
off_t stream_len(void)
Definition ami_stream.h:375
void persist(persistence p)
Definition ami_stream.h:639
size_t memory_available()
Definition mm.cpp:194
void print()
Definition mm.cpp:81
#define assert(condition)
Definition lz4.c:291
@ MM_STREAM_USAGE_MAXIMUM
Definition mm.h:79
MM_register MM_manager
Definition mm.cpp:475
void quicksort(T *data, size_t n, CMPR &cmp, size_t min_len=20)
Definition quicksort.h:111
Definition path.h:15
SYMBOL * err(FILE *fp, SYMBOL *s, char *msg)