GRASS GIS 7 Programmer's Manual  7.5.svn(2017)-r71942
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ami_sort_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 
37 
38 #ifndef AMI_SORT_IMPL_H
39 #define AMI_SORT_IMPL_H
40 
41 #include "ami_stream.h"
42 #include "mem_stream.h"
43 #include "mm.h"
44 #include "quicksort.h"
45 #include "queue.h"
46 #include "replacementHeap.h"
47 #include "replacementHeapBlock.h"
48 
49 #define SDEBUG if(0)
50 
51 
52 /* if this flag is defined, a run will be split into blocks, each
53  block sorted and then all blocks merged */
54 #define BLOCKED_RUN
55 
56 
57 /* ---------------------------------------------------------------------- */
58 //set run_size, last_run_size and nb_runs depending on how much memory
59 //is available
60 template<class T>
61 static void
62 initializeRunFormation(AMI_STREAM<T> *instream,
63  size_t &run_size, size_t &last_run_size,
64  unsigned int &nb_runs) {
65 
66  size_t mm_avail = MM_manager.memory_available();
67  off_t strlen;
68 
69 #ifdef BLOCKED_RUN
70  // not in place, can only use half memory
71  mm_avail = mm_avail/2;
72 #endif
73  run_size = mm_avail/sizeof(T);
74 
75 
76  strlen = instream->stream_len();
77  if (strlen == 0) {
78  nb_runs = last_run_size = 0;
79  } else {
80  if (strlen % run_size == 0) {
81  nb_runs = strlen/run_size;
82  last_run_size = run_size;
83  } else {
84  nb_runs = strlen/run_size + 1;
85  last_run_size = strlen % run_size;
86  }
87  }
88 
89  SDEBUG cout << "nb_runs=" << nb_runs
90  << ", run_size=" << run_size
91  << ", last_run_size=" << last_run_size
92  << "\n";
93 }
94 
95 
96 
97 /* ---------------------------------------------------------------------- */
98 /* data is allocated; read run_size elements from stream into data and
99  sort them using quicksort */
100 template<class T, class Compare>
101 size_t makeRun_Block(AMI_STREAM<T> *instream, T* data,
102  unsigned int run_size, Compare *cmp) {
103  AMI_err err;
104  off_t new_run_size;
105 
106  //read next run from input stream
107  err = instream->read_array(data, run_size, &new_run_size);
108  assert(err == AMI_ERROR_NO_ERROR || err == AMI_ERROR_END_OF_STREAM);
109 
110  //sort it in memory in place
111  quicksort(data, new_run_size, *cmp);
112 
113  return new_run_size;
114 }
115 
116 
117 /* ---------------------------------------------------------------------- */
118 /* data is allocated; read run_size elements from stream into data and
119  sort them using quicksort; instead of reading the whole chunk at
120  once, it reads it in blocks, sorts each block and then merges the
121  blocks together. Note: it is not in place! it allocates another
122  array of same size as data, writes the sorted run into it and
123  deteles data, and replaces data with outdata */
124 template<class T, class Compare>
125 void makeRun(AMI_STREAM<T> *instream, T* &data,
126  int run_size, Compare *cmp) {
127 
128  unsigned int nblocks, last_block_size, crt_block_size, block_size;
129 
130 
131  block_size = STREAM_BUFFER_SIZE;
132 
133  if (run_size % block_size == 0) {
134  nblocks = run_size / block_size;
135  last_block_size = block_size;
136  } else {
137  nblocks = run_size / block_size + 1;
138  last_block_size = run_size % block_size;
139  }
140 
141  //create queue of blocks waiting to be merged
142  queue<MEM_STREAM<T> *> *blockList;
143  MEM_STREAM<T>* str;
144  blockList = new queue<MEM_STREAM<T> *>(nblocks);
145  for (unsigned int i=0; i < nblocks; i++) {
146  crt_block_size = (i == nblocks-1) ? last_block_size: block_size;
147  makeRun_Block(instream, &(data[i*block_size]), crt_block_size, cmp);
148  str = new MEM_STREAM<T>( &(data[i*block_size]), crt_block_size);
149  blockList->enqueue(str);
150  }
151  assert(blockList->length() == nblocks);
152 
153  //now data consists of sorted blocks: merge them
154  ReplacementHeapBlock<T,Compare> rheap(blockList);
155  SDEBUG rheap.print(cerr);
156  int i = 0;
157  T elt;
158  T* outdata = new T [run_size];
159  while (!rheap.empty()) {
160  elt = rheap.extract_min();
161  outdata[i] = elt;
162  //SDEBUG cerr << "makeRun: written " << elt << endl;
163  i++;
164  }
165  assert( i == run_size && blockList->length() == 0);
166  delete blockList;
167 
168  T* tmp = data;
169  delete [] tmp;
170  data = outdata;
171 }
172 
173 
174 
175 /* ---------------------------------------------------------------------- */
176 
177 //partition instream in streams that fit in main memory, sort each
178 //stream, remember its name, make it persistent and store it on
179 //disk. if entire stream fits in memory, sort it and store it and
180 //return it.
181 
182 //assume instream is allocated prior to the call.
183 // set nb_runs and allocate runNames.
184 
185 //The comparison object "cmp", of (user-defined) class represented by
186 //Compare, must have a member function called "compare" which is used
187 //for sorting the input stream.
188 
189 template<class T, class Compare>
191 runFormation(AMI_STREAM<T> *instream, Compare *cmp) {
192 
193  size_t run_size,last_run_size, crt_run_size;
194  unsigned int nb_runs;
195  queue<char*>* runList;
196  T* data;
197  AMI_STREAM<T>* str;
198  char* strname;
199 
200  assert(instream && cmp);
201  SDEBUG cout << "runFormation: ";
203 
204  /* leave this in for now, in case some file-based implementations do
205  anything funny... -RW */
206  //rewind file
207  instream->seek(0); //should check error xxx
208 
209  //estimate run_size, last_run_size and nb_runs
210  initializeRunFormation(instream, run_size, last_run_size, nb_runs);
211 
212  //create runList (if 0 size, queue uses default)
213  runList = new queue<char*>(nb_runs);
214 
215  /* allocate space for a run */
216  if (nb_runs <= 1) {
217  //don't waste space if input stream is smaller than run_size
218  data = new T[last_run_size];
219  } else {
220  data = new T[run_size];
221  }
223 
224  for (size_t i=0; i< nb_runs; i++) {
225  //while(!instream->eof()) {
226  crt_run_size = (i == nb_runs-1) ? last_run_size: run_size;
227 
228  SDEBUG cout << "i=" << i << ": runsize=" << crt_run_size << ", ";
229 
230  //crt_run_size = makeRun_Block(instream, data, run_size, cmp);
231 #ifdef BLOCKED_RUN
232  makeRun(instream, data, crt_run_size, cmp);
233 #else
234  makeRun_Block(instream, data, crt_run_size, cmp);
235 #endif
236 
238 
239  //read next run from input stream
240  //err = instream->read_array(data, crt_run_size);
241  //assert(err == AMI_ERROR_NO_ERROR);
242  //sort it in memory in place
243  //quicksort(data, crt_run_size, *cmp);
244 
245  if(crt_run_size > 0) {
246  //create a new stream to hold this run
247  str = new AMI_STREAM<T>();
248  str->write_array(data, crt_run_size);
249  assert(str->stream_len() == crt_run_size);
250 
251  //remember this run's name
252  str->name(&strname); /* deleted after we dequeue */
253  runList->enqueue(strname);
254  //delete the stream -- should not keep too many streams open
256  delete str;
257  }
258 
259  };
261  //release the run memory!
262  delete [] data;
263 
264  SDEBUG cout << "runFormation: done.\n";
266 
267  return runList;
268 };
269 
270 
271 
272 
273 
274 
275 /* ---------------------------------------------------------------------- */
276 
277 //this is one pass of merge; estimate max possible merge arity <ar>
278 //and merge the first <ar> streams from runList ; create and return
279 //the resulting stream (does not add it to the queue -- the calling
280 //function will do that)
281 
282 //input streams are assumed to be sorted, and are not necessarily of
283 //the same length.
284 
285 //streamList does not contains streams, but names of streams, which
286 //must be opened in order to be merged
287 
288 //The comparison object "cmp", of (user-defined) class represented by
289 //Compare, must have a member function called "compare" which is used
290 //for sorting the input stream.
291 
292 
293 template<class T, class Compare>
295 singleMerge(queue<char*>* streamList, Compare *cmp)
296 {
297  AMI_STREAM<T>* mergedStr;
298  size_t mm_avail, blocksize;
299  unsigned int arity, max_arity;
300  T elt;
301 
302  assert(streamList && cmp);
303 
304  SDEBUG cout << "singleMerge: ";
305 
306  //estimate max possible merge arity with available memory (approx M/B)
307  mm_avail = MM_manager.memory_available();
308  //blocksize = getpagesize();
309  //should use AMI function, but there's no stream at this point
310  //now use static mtd -RW 5/05
312  max_arity = mm_avail / blocksize;
313  if(max_arity < 2) {
314  cerr << __FILE__ ":" << __LINE__ << ": OUT OF MEMORY in singleMerge (going over limit)" << endl;
315  max_arity = 2;
316  } else if(max_arity > MAX_STREAMS_OPEN) {
317  max_arity = MAX_STREAMS_OPEN;
318  }
319  arity = (streamList->length() < max_arity) ?
320  streamList->length() : max_arity;
321 
322  SDEBUG cout << "arity=" << arity << " (max_arity=" <<max_arity<< ")\n";
323 
324  /* create the output stream. if this is a complete merge, use finalpath */
325  //create output stream
326  mergedStr = new AMI_STREAM<T>();
327 
328  ReplacementHeap<T,Compare> rheap(arity, streamList);
329  SDEBUG rheap.print(cerr);
330 
331  int i = 0;
332  while (!rheap.empty()) {
333  //mergedStr->write_item( rheap.extract_min() );
334  //xxx should check error here
335  elt = rheap.extract_min();
336  mergedStr->write_item(elt);
337  //SDEBUG cerr << "smerge: written " << elt << endl;
338  i++;
339  }
340 
341  SDEBUG cout << "..done\n";
342 
343  return mergedStr;
344 }
345 
346 
347 
348 
349 /* ---------------------------------------------------------------------- */
350 
351 //merge runs whose names are given by runList; this may entail
352 //multiple passes of singleMerge();
353 
354 //return the resulting output stream
355 
356 //input streams are assumed to be sorted, and are not necessarily of
357 //the same length.
358 
359 //The comparison object "cmp", of (user-defined) class represented by
360 //Compare, must have a member function called "compare" which is used
361 //for sorting the input stream.
362 
363 
364 template<class T, class Compare>
366 multiMerge(queue<char*>* runList, Compare *cmp)
367 {
368  AMI_STREAM<T> * mergedStr= NULL;
369  char* path;
370 
371  assert(runList && runList->length() > 1 && cmp);
372 
373  SDEBUG cout << "multiMerge: " << runList->length() << " runs" << endl;
374 
375  while (runList->length() > 1) {
376 
377  //merge streams from streamlist into mergedStr
378  mergedStr = singleMerge<T,Compare>(runList, cmp);
379  //i thought the templates are not needed in the call, but seems to
380  //help the compiler..laura
381  assert(mergedStr);
382 
383  //if more runs exist, delete this stream and add it to list
384  if (runList->length() > 0) {
385  mergedStr->name(&path);
386  runList->enqueue(path);
387  mergedStr->persist(PERSIST_PERSISTENT);
388  delete mergedStr;
389  }
390  }
391 
392  assert(runList->length() == 0);
393  assert(mergedStr);
394  return mergedStr;
395 }
396 
397 
398 
399 
400 #endif
401 
AMI_STREAM< T > * multiMerge(queue< char * > *runList, Compare *cmp)
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition: ami_stream.h:494
#define STREAM_BUFFER_SIZE
Definition: ami_stream.h:80
unsigned int length() const
Definition: queue.h:59
void print()
Definition: mm.cpp:86
AMI_err read_array(T *data, off_t len, off_t *lenp=NULL)
Definition: ami_stream.h:573
AMI_err write_item(const T &elt)
Definition: ami_stream.h:606
size_t makeRun_Block(AMI_STREAM< T > *instream, T *data, unsigned int run_size, Compare *cmp)
MM_register MM_manager
Definition: mm.cpp:487
#define NULL
Definition: ccmath.h:32
#define MAX_STREAMS_OPEN
Definition: ami_stream.h:65
void makeRun(AMI_STREAM< T > *instream, T *&data, int run_size, Compare *cmp)
AMI_err seek(off_t offset)
Definition: ami_stream.h:460
SYMBOL * err(FILE *fp, SYMBOL *s, char *msg)
Definition: symbol/read.c:220
Definition: queue.h:43
queue< char * > * runFormation(AMI_STREAM< T > *instream, Compare *cmp)
AMI_err write_array(const T *data, off_t len)
Definition: ami_stream.h:631
void quicksort(T *data, size_t n, CMPR &cmp, size_t min_len=20)
Definition: quicksort.h:119
bool enqueue(T &)
Definition: queue.h:83
#define SDEBUG
Definition: ami_sort_impl.h:49
AMI_err name(char **stream_name)
Definition: ami_stream.h:440
Definition: path.h:16
void persist(persistence p)
Definition: ami_stream.h:657
AMI_err
Definition: ami_stream.h:86
AMI_STREAM< T > * singleMerge(queue< char * > *streamList, Compare *cmp)
off_t stream_len(void)
Definition: ami_stream.h:388
size_t memory_available()
Definition: mm.cpp:200