GRASS GIS 8 Programmer's Manual  8.4.0dev(2024)-835afb4352
ami_sort_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 #ifndef AMI_SORT_IMPL_H
37 #define AMI_SORT_IMPL_H
38 
39 #include "ami_stream.h"
40 #include "mem_stream.h"
41 #include "mm.h"
42 #include "quicksort.h"
43 #include "queue.h"
44 #include "replacementHeap.h"
45 #include "replacementHeapBlock.h"
46 
47 #define SDEBUG if (0)
48 
49 /* if this flag is defined, a run will be split into blocks, each
50  block sorted and then all blocks merged */
51 #define BLOCKED_RUN
52 
53 /* ---------------------------------------------------------------------- */
54 // set run_size, last_run_size and nb_runs depending on how much memory
55 // is available
56 template <class T>
57 static void initializeRunFormation(AMI_STREAM<T> *instream, size_t &run_size,
58  size_t &last_run_size, unsigned int &nb_runs)
59 {
60 
61  size_t mm_avail = MM_manager.memory_available();
62  off_t strlen;
63 
64 #ifdef BLOCKED_RUN
65  // not in place, can only use half memory
66  mm_avail = mm_avail / 2;
67 #endif
68  run_size = mm_avail / sizeof(T);
69 
70  strlen = instream->stream_len();
71  if (strlen == 0) {
72  nb_runs = last_run_size = 0;
73  }
74  else {
75  if (strlen % run_size == 0) {
76  nb_runs = strlen / run_size;
77  last_run_size = run_size;
78  }
79  else {
80  nb_runs = strlen / run_size + 1;
81  last_run_size = strlen % run_size;
82  }
83  }
84 
85  SDEBUG cout << "nb_runs=" << nb_runs << ", run_size=" << run_size
86  << ", last_run_size=" << last_run_size << "\n";
87 }
88 
89 /* ---------------------------------------------------------------------- */
90 /* data is allocated; read run_size elements from stream into data and
91  sort them using quicksort */
92 template <class T, class Compare>
93 size_t makeRun_Block(AMI_STREAM<T> *instream, T *data, unsigned int run_size,
94  Compare *cmp)
95 {
96  AMI_err err;
97  off_t new_run_size = 0;
98 
99  // read next run from input stream
100  err = instream->read_array(data, run_size, &new_run_size);
102 
103  // sort it in memory in place
104  quicksort(data, new_run_size, *cmp);
105 
106  return new_run_size;
107 }
108 
109 /* ---------------------------------------------------------------------- */
110 /* data is allocated; read run_size elements from stream into data and
111  sort them using quicksort; instead of reading the whole chunk at
112  once, it reads it in blocks, sorts each block and then merges the
113  blocks together. Note: it is not in place! it allocates another
114  array of same size as data, writes the sorted run into it and
115  deteles data, and replaces data with outdata */
116 template <class T, class Compare>
117 void makeRun(AMI_STREAM<T> *instream, T *&data, int run_size, Compare *cmp)
118 {
119 
120  unsigned int nblocks, last_block_size, crt_block_size, block_size;
121 
122  block_size = STREAM_BUFFER_SIZE;
123 
124  if (run_size % block_size == 0) {
125  nblocks = run_size / block_size;
126  last_block_size = block_size;
127  }
128  else {
129  nblocks = run_size / block_size + 1;
130  last_block_size = run_size % block_size;
131  }
132 
133  // create queue of blocks waiting to be merged
134  queue<MEM_STREAM<T> *> *blockList;
135  MEM_STREAM<T> *str;
136  blockList = new queue<MEM_STREAM<T> *>(nblocks);
137  for (unsigned int i = 0; i < nblocks; i++) {
138  crt_block_size = (i == nblocks - 1) ? last_block_size : block_size;
139  makeRun_Block(instream, &(data[i * block_size]), crt_block_size, cmp);
140  str = new MEM_STREAM<T>(&(data[i * block_size]), crt_block_size);
141  blockList->enqueue(str);
142  }
143  assert(blockList->length() == nblocks);
144 
145  // now data consists of sorted blocks: merge them
146  ReplacementHeapBlock<T, Compare> rheap(blockList);
147  SDEBUG rheap.print(cerr);
148  int i = 0;
149  T elt;
150  T *outdata = new T[run_size];
151  while (!rheap.empty()) {
152  elt = rheap.extract_min();
153  outdata[i] = elt;
154  // SDEBUG cerr << "makeRun: written " << elt << endl;
155  i++;
156  }
157  assert(i == run_size && blockList->length() == 0);
158  delete blockList;
159 
160  T *tmp = data;
161  delete[] tmp;
162  data = outdata;
163 }
164 
165 /* ---------------------------------------------------------------------- */
166 
167 // partition instream in streams that fit in main memory, sort each
168 // stream, remember its name, make it persistent and store it on
169 // disk. if entire stream fits in memory, sort it and store it and
170 // return it.
171 
172 // assume instream is allocated prior to the call.
173 // set nb_runs and allocate runNames.
174 
175 // The comparison object "cmp", of (user-defined) class represented by
176 // Compare, must have a member function called "compare" which is used
177 // for sorting the input stream.
178 
179 template <class T, class Compare>
180 queue<char *> *runFormation(AMI_STREAM<T> *instream, Compare *cmp)
181 {
182 
183  size_t run_size, last_run_size, crt_run_size;
184  unsigned int nb_runs;
185  queue<char *> *runList;
186  T *data;
187  AMI_STREAM<T> *str;
188  char *strname;
189 
190  assert(instream && cmp);
191  SDEBUG cout << "runFormation: ";
193 
194  /* leave this in for now, in case some file-based implementations do
195  anything funny... -RW */
196  // rewind file
197  instream->seek(0); // should check error xxx
198 
199  // estimate run_size, last_run_size and nb_runs
200  initializeRunFormation(instream, run_size, last_run_size, nb_runs);
201 
202  // create runList (if 0 size, queue uses default)
203  runList = new queue<char *>(nb_runs);
204 
205  /* allocate space for a run */
206  if (nb_runs <= 1) {
207  // don't waste space if input stream is smaller than run_size
208  data = new T[last_run_size];
209  }
210  else {
211  data = new T[run_size];
212  }
214 
215  for (size_t i = 0; i < nb_runs; i++) {
216  // while(!instream->eof()) {
217  crt_run_size = (i == nb_runs - 1) ? last_run_size : run_size;
218 
219  SDEBUG cout << "i=" << i << ": runsize=" << crt_run_size << ", ";
220 
221  // crt_run_size = makeRun_Block(instream, data, run_size, cmp);
222 #ifdef BLOCKED_RUN
223  makeRun(instream, data, crt_run_size, cmp);
224 #else
225  makeRun_Block(instream, data, crt_run_size, cmp);
226 #endif
227 
229 
230  // read next run from input stream
231  // err = instream->read_array(data, crt_run_size);
232  // assert(err == AMI_ERROR_NO_ERROR);
233  // sort it in memory in place
234  // quicksort(data, crt_run_size, *cmp);
235 
236  if (crt_run_size > 0) {
237  // create a new stream to hold this run
238  str = new AMI_STREAM<T>();
239  str->write_array(data, crt_run_size);
240  assert(str->stream_len() == crt_run_size);
241 
242  // remember this run's name
243  str->name(&strname); /* deleted after we dequeue */
244  runList->enqueue(strname);
245  // delete the stream -- should not keep too many streams open
247  delete str;
248  }
249  }
251  // release the run memory!
252  delete[] data;
253 
254  SDEBUG cout << "runFormation: done.\n";
256 
257  return runList;
258 }
259 
260 /* ---------------------------------------------------------------------- */
261 
262 // this is one pass of merge; estimate max possible merge arity <ar>
263 // and merge the first <ar> streams from runList ; create and return
264 // the resulting stream (does not add it to the queue -- the calling
265 // function will do that)
266 
267 // input streams are assumed to be sorted, and are not necessarily of
268 // the same length.
269 
270 // streamList does not contains streams, but names of streams, which
271 // must be opened in order to be merged
272 
273 // The comparison object "cmp", of (user-defined) class represented by
274 // Compare, must have a member function called "compare" which is used
275 // for sorting the input stream.
276 
277 template <class T, class Compare>
278 AMI_STREAM<T> *singleMerge(queue<char *> *streamList, Compare *cmp)
279 {
280  AMI_STREAM<T> *mergedStr;
281  size_t mm_avail, blocksize;
282  unsigned int arity, max_arity;
283  T elt;
284 
285  assert(streamList && cmp);
286 
287  SDEBUG cout << "singleMerge: ";
288 
289  // estimate max possible merge arity with available memory (approx M/B)
290  mm_avail = MM_manager.memory_available();
291  // blocksize = getpagesize();
292  // should use AMI function, but there's no stream at this point
293  // now use static mtd -RW 5/05
295  max_arity = mm_avail / blocksize;
296  if (max_arity < 2) {
297  cerr << __FILE__ ":" << __LINE__
298  << ": OUT OF MEMORY in singleMerge (going over limit)" << endl;
299  max_arity = 2;
300  }
301  else if (max_arity > MAX_STREAMS_OPEN) {
302  max_arity = MAX_STREAMS_OPEN;
303  }
304  arity =
305  (streamList->length() < max_arity) ? streamList->length() : max_arity;
306 
307  SDEBUG cout << "arity=" << arity << " (max_arity=" << max_arity << ")\n";
308 
309  /* create the output stream. if this is a complete merge, use finalpath */
310  // create output stream
311  mergedStr = new AMI_STREAM<T>();
312 
313  ReplacementHeap<T, Compare> rheap(arity, streamList);
314  SDEBUG rheap.print(cerr);
315 
316  while (!rheap.empty()) {
317  // mergedStr->write_item( rheap.extract_min() );
318  // xxx should check error here
319  elt = rheap.extract_min();
320  mergedStr->write_item(elt);
321  // SDEBUG cerr << "smerge: written " << elt << endl;
322  }
323 
324  SDEBUG cout << "..done\n";
325 
326  return mergedStr;
327 }
328 
329 /* ---------------------------------------------------------------------- */
330 
331 // merge runs whose names are given by runList; this may entail
332 // multiple passes of singleMerge();
333 
334 // return the resulting output stream
335 
336 // input streams are assumed to be sorted, and are not necessarily of
337 // the same length.
338 
339 // The comparison object "cmp", of (user-defined) class represented by
340 // Compare, must have a member function called "compare" which is used
341 // for sorting the input stream.
342 
343 template <class T, class Compare>
344 AMI_STREAM<T> *multiMerge(queue<char *> *runList, Compare *cmp)
345 {
346  AMI_STREAM<T> *mergedStr = NULL;
347  char *path;
348 
349  assert(runList && runList->length() > 1 && cmp);
350 
351  SDEBUG cout << "multiMerge: " << runList->length() << " runs" << endl;
352 
353  while (runList->length() > 1) {
354 
355  // merge streams from streamlist into mergedStr
356  mergedStr = singleMerge<T, Compare>(runList, cmp);
357  // i thought the templates are not needed in the call, but seems to
358  // help the compiler..laura
359  assert(mergedStr);
360 
361  // if more runs exist, delete this stream and add it to list
362  if (runList->length() > 0) {
363  mergedStr->name(&path);
364  runList->enqueue(path);
365  mergedStr->persist(PERSIST_PERSISTENT);
366  delete mergedStr;
367  }
368  }
369 
370  assert(runList->length() == 0);
371  assert(mergedStr);
372  return mergedStr;
373 }
374 
375 #endif
AMI_STREAM< T > * singleMerge(queue< char * > *streamList, Compare *cmp)
size_t makeRun_Block(AMI_STREAM< T > *instream, T *data, unsigned int run_size, Compare *cmp)
Definition: ami_sort_impl.h:93
#define SDEBUG
Definition: ami_sort_impl.h:47
void makeRun(AMI_STREAM< T > *instream, T *&data, int run_size, Compare *cmp)
queue< char * > * runFormation(AMI_STREAM< T > *instream, Compare *cmp)
AMI_STREAM< T > * multiMerge(queue< char * > *runList, Compare *cmp)
#define STREAM_BUFFER_SIZE
Definition: ami_stream.h:78
#define MAX_STREAMS_OPEN
Definition: ami_stream.h:63
@ PERSIST_PERSISTENT
Definition: ami_stream.h:116
AMI_err
Definition: ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition: ami_stream.h:86
@ AMI_ERROR_NO_ERROR
Definition: ami_stream.h:84
#define NULL
Definition: ccmath.h:32
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition: ami_stream.h:477
AMI_err write_item(const T &elt)
Definition: ami_stream.h:588
AMI_err write_array(const T *data, off_t len)
Definition: ami_stream.h:613
AMI_err name(char **stream_name)
Definition: ami_stream.h:426
AMI_err read_array(T *data, off_t len, off_t *lenp=NULL)
Definition: ami_stream.h:554
AMI_err seek(off_t offset)
Definition: ami_stream.h:445
off_t stream_len(void)
Definition: ami_stream.h:375
void persist(persistence p)
Definition: ami_stream.h:639
size_t memory_available()
Definition: mm.cpp:194
void print()
Definition: mm.cpp:81
ostream & print(ostream &s) const
int empty() const
ostream & print(ostream &s) const
Definition: queue.h:43
unsigned int length() const
Definition: queue.h:60
bool enqueue(T &)
Definition: queue.h:83
#define assert(condition)
Definition: lz4.c:393
@ MM_STREAM_USAGE_MAXIMUM
Definition: mm.h:79
MM_register MM_manager
Definition: mm.cpp:475
void quicksort(T *data, size_t n, CMPR &cmp, size_t min_len=20)
Definition: quicksort.h:111
Definition: path.h:15
SYMBOL * err(FILE *fp, SYMBOL *s, char *msg)
Definition: symbol/read.c:216