GRASS GIS 8 Programmer's Manual  8.5.0dev(2025)-c0b45cfe22
ami_stream.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 #ifndef _AMI_STREAM_H
37 #define _AMI_STREAM_H
38 
39 #include <grass/config.h>
40 
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <assert.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <unistd.h>
49 
50 #include <cstring>
51 #include <iostream>
52 using std::cerr;
53 using std::cout;
54 using std::endl;
55 using std::istream;
56 using std::ofstream;
57 using std::ostream;
58 
59 extern "C" {
60 #include <grass/gis.h>
61 }
62 
63 #define MAX_STREAMS_OPEN 200
64 
65 #include "mm.h" // Get the memory manager.
66 
67 #define DEBUG_DELETE if (0)
68 #define DEBUG_STREAM_LEN if (0)
69 #define DEBUG_ASSERT if (0)
70 
71 // The name of the environment variable which keeps the name of the
72 // directory where streams are stored
73 #define STREAM_TMPDIR "STREAM_DIR"
74 
75 // All streams will be names STREAM_*****
76 #define BASE_NAME "STREAM"
77 
78 #define STREAM_BUFFER_SIZE (1 << 18)
79 
80 //
81 // AMI error codes are returned using the AMI_err type.
82 //
83 enum AMI_err {
97 };
98 
99 extern const char *ami_str_error[];
100 
101 //
102 // AMI stream types passed to constructors
103 //
105  AMI_READ_STREAM = 1, // Open existing stream for reading
106  AMI_WRITE_STREAM, // Open for writing. Create if non-existent
107  AMI_APPEND_STREAM, // Open for writing at end. Create if needed.
108  AMI_READ_WRITE_STREAM, // Open to read and write.
109  AMI_APPEND_WRITE_STREAM // Open for writing at end (write only mode).
110 };
111 
113  // Delete the stream from the disk when it is destructed.
115  // Do not delete the stream from the disk when it is destructed.
117  // Delete each block of data from the disk as it is read.
119 };
120 
121 /* an un-templated version makes for easier debugging */
123 protected:
124  FILE *fp;
125  int fildes; // descriptor of file
127  char path[BUFSIZ];
129 
130  // 0 for streams, positive for substreams
131  unsigned int substream_level;
132 
133  // If this stream is actually a substream, these will be set to
134  // indicate the portion of the file that is part of this stream. If
135  // the stream is the whole file, they will be set to -1. Both are in
136  // T units.
137  off_t logical_bos;
138  off_t logical_eos;
139 
140  // stream buffer passed in the call to setvbuf when file is opened
141  char *buf;
143 
144 public:
145  static unsigned int get_block_length()
146  {
147  return STREAM_BUFFER_SIZE;
148  // return getpagesize();
149  };
150 };
151 
152 template <class T>
153 class AMI_STREAM : public UntypedStream {
154 
155 protected:
156  T read_tmp; /* this is ugly... RW */
157 
158 public:
159  // An AMI_stream with default name
160  AMI_STREAM();
161 
162  // An AMI stream based on a specific path name.
163  AMI_STREAM(const char *path_name,
165 
166  // convenience function with split path_name
167  // AMI_STREAM(const char *dir_name, const char *file_name, AMI_stream_type
168  // st);
169 
170  // A psuedo-constructor for substreams.
171  AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end,
172  AMI_STREAM<T> **sub_stream);
173 
174  // Destructor
175  ~AMI_STREAM(void);
176 
177  // Read and write elements.
178  AMI_err read_item(T **elt);
179  AMI_err write_item(const T &elt);
180  AMI_err read_array(T *data, off_t len, off_t *lenp = NULL);
181  AMI_err write_array(const T *data, off_t len);
182 
183  // Return the number of items in the stream.
184  off_t stream_len(void);
185 
186  // Return the path name of this stream.
187  AMI_err name(char **stream_name);
188  const char *name() const;
189 
190  // Move to a specific item in the stream.
191  AMI_err seek(off_t offset);
192 
193  // Query memory usage
194  static AMI_err
195  main_memory_usage(size_t *usage,
197 
198  void persist(persistence p);
199 
200  char *sprint();
201 
202  // have we hit the end of the stream
203  int eof();
204 };
205 
206 /**********************************************************************/
207 
208 /**********************************************************************/
209 /* creates a random file name, opens the file for reading and writing
210  and and returns a file descriptor */
211 /* int ami_single_temp_name(char *base, char* tmp_path); */
212 /* fix from Andy Danner */
213 int ami_single_temp_name(const std::string &base, char *tmp_path);
214 
215 /**********************************************************************/
216 /* given fd=fide descriptor, associates with it a stream aopened in
217  access_mode and returns it */
218 FILE *open_stream(int fd, AMI_stream_type st);
219 
220 /**********************************************************************/
221 /* open the file whose name is pathname in access mode */
222 FILE *open_stream(char *pathname, AMI_stream_type st);
223 
224 /********************************************************************/
225 // An AMI stream with default name.
226 template <class T>
228 {
229 
230  access_mode = AMI_READ_WRITE_STREAM;
232  fildes = fd;
233  fp = open_stream(fd, access_mode);
234 
235  /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
236  buf = new char[STREAM_BUFFER_SIZE];
237  if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
238  cerr << "ERROR: setvbuf failed (stream " << path
239  << ") with: " << strerror(errno) << endl;
240  exit(1);
241  }
242 
243  // By default, all streams are deleted at destruction time.
244  per = PERSIST_DELETE;
245 
246  // Not a substream.
247  substream_level = 0;
248  logical_bos = logical_eos = -1;
249 
250  // why is this here in the first place?? -RW
251  seek(0);
252 
253  eof_reached = 0;
254 
255  // Register memory usage before returning.
256  // size_t usage;
257  // main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
258  // MM_manager.register_allocation(usage);
259 }
260 
261 /**********************************************************************/
262 // An AMI stream based on a specific path name.
263 template <class T>
265 {
266 
267  access_mode = st;
268 
269  if (path_name == NULL) {
271  fildes = fd;
272  fp = open_stream(fd, access_mode);
273  }
274  else {
275  strcpy(path, path_name);
276  fp = open_stream(path, st);
277  fildes = -1;
278  }
279 
280  /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
281  buf = new char[STREAM_BUFFER_SIZE];
282  if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
283  cerr << "ERROR: setvbuf failed (stream " << path
284  << ") with: " << strerror(errno) << endl;
285  exit(1);
286  }
287 
288  eof_reached = 0;
289 
290  // By default, all streams are deleted at destruction time.
291  if (st == AMI_READ_STREAM) {
292  per = PERSIST_PERSISTENT;
293  }
294  else {
295  per = PERSIST_DELETE;
296  }
297 
298  // Not a substream.
299  substream_level = 0;
300  logical_bos = logical_eos = -1;
301 
302  seek(0);
303 
304  // Register memory usage before returning.
305  // size_t usage;
306  // main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
307  // MM_manager.register_allocation(usage);
308 }
309 
310 /**********************************************************************/
311 // A psuedo-constructor for substreams.
312 template <class T>
314  off_t sub_end, AMI_STREAM<T> **sub_stream)
315 {
316 
317  // assume this for now
319 
320 #ifdef __MINGW32__
321  /* MINGW32: reopen file here for stream_len() below */
322  // reopen the file
323  AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
324 #endif
325 
326  // check range
327  if (substream_level) {
328  if ((sub_begin >= (logical_eos - logical_bos)) ||
329  (sub_end >= (logical_eos - logical_bos))) {
330 
331  return AMI_ERROR_OUT_OF_RANGE;
332  }
333  }
334  else {
335  off_t len = stream_len();
336  if (sub_begin > len || sub_end > len) {
337 
338  return AMI_ERROR_OUT_OF_RANGE;
339  }
340  }
341 
342 #ifndef __MINGW32__
343  // reopen the file
344  AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
345 #endif
346 
347  // Set up the beginning and end positions.
348  if (substream_level) {
349  substr->logical_bos = logical_bos + sub_begin;
350  substr->logical_eos = logical_bos + sub_end + 1;
351  }
352  else {
353  substr->logical_bos = sub_begin;
354  substr->logical_eos = sub_end + 1;
355  }
356 
357  // Set the current position.
358  substr->seek(0);
359 
360  substr->eof_reached = 0;
361 
362  // set substream level
363  substr->substream_level = substream_level + 1;
364 
365  substr->per = per; // set persistence
366 
367  //*sub_stream = (AMI_base_stream < T > *)substr;
368  *sub_stream = substr;
369  return AMI_ERROR_NO_ERROR;
370 }
371 
372 /**********************************************************************/
373 // Return the number of items in the stream.
374 template <class T>
376 {
377 
378  fflush(fp);
379 
380 #ifdef __MINGW32__
381  // stat() fails on MS Windows if the file is open, so try G_ftell() instead.
382  // FIXME: not 64bit safe, but WinGrass isn't either right now.
383  off_t posn_save, st_size;
384 
385  posn_save = G_ftell(fp);
386  if (posn_save == -1) {
387  perror("ERROR: AMI_STREAM::stream_len(): ftell(fp) failed ");
388  perror(path);
389  exit(1);
390  }
391 
392  G_fseek(fp, 0, SEEK_END);
393  st_size = G_ftell(fp);
394  if (st_size == -1) {
395  perror("ERROR: AMI_STREAM::stream_len(): ftell[SEEK_END] failed ");
396  perror(path);
397  exit(1);
398  }
399 
400  G_fseek(fp, posn_save, SEEK_SET);
401 
402  // debug stream_len:
403  DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%d\n", path,
404  st_size, sizeof(T));
405 
406  return (st_size / sizeof(T));
407 #else
408  struct stat statbuf;
409  if (stat(path, &statbuf) == -1) {
410  perror("AMI_STREAM::stream_len(): fstat failed ");
411  DEBUG_ASSERT assert(0);
412  exit(1);
413  }
414 
415  // debug stream_len:
416  DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%lud\n",
417  path, (long long int)statbuf.st_size, sizeof(T));
418 
419  return (statbuf.st_size / sizeof(T));
420 #endif
421 }
422 
423 /**********************************************************************/
424 // Return the path name of this stream.
425 template <class T>
426 AMI_err AMI_STREAM<T>::name(char **stream_name)
427 {
428 
429  *stream_name = new char[strlen(path) + 1];
430  strcpy(*stream_name, path);
431 
432  return AMI_ERROR_NO_ERROR;
433 }
434 
435 // Return the path name of this stream.
436 template <class T>
437 const char *AMI_STREAM<T>::name() const
438 {
439  return path;
440 }
441 
442 /**********************************************************************/
443 // Move to a specific offset within the (sub)stream.
444 template <class T>
446 {
447 
448  off_t seek_offset;
449 
450  if (substream_level) { // substream
451  if (offset > (unsigned)(logical_eos - logical_bos)) {
452  // offset out of range
453  cerr << "ERROR: AMI_STREAM::seek bos=" << logical_bos
454  << ", eos=" << logical_eos << ", offset " << offset
455  << " out of range.\n";
456  DEBUG_ASSERT assert(0);
457  exit(1);
458  }
459  else {
460  // offset in range
461  seek_offset = (logical_bos + offset) * sizeof(T);
462  }
463  }
464  else {
465  // not a substream
466  seek_offset = offset * sizeof(T);
467  }
468 
469  G_fseek(fp, seek_offset, SEEK_SET);
470 
471  return AMI_ERROR_NO_ERROR;
472 }
473 
474 /**********************************************************************/
475 // Query memory usage
476 template <class T>
478  MM_stream_usage usage_type)
479 {
480 
481  switch (usage_type) {
483  *usage = sizeof(AMI_STREAM<T>);
484  break;
486  // *usage = get_block_length();
487  *usage = STREAM_BUFFER_SIZE * sizeof(char);
488  break;
491  // *usage = sizeof (*this) + get_block_length();
492  *usage = sizeof(AMI_STREAM<T>) + STREAM_BUFFER_SIZE * sizeof(char);
493  break;
494  }
495  return AMI_ERROR_NO_ERROR;
496 }
497 
498 /**********************************************************************/
499 template <class T>
501 {
502 
503  DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
504  assert(fp);
505  fclose(fp);
506  delete buf;
507 
508  // Get rid of the file if not persistent and if not substream.
509  if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
510  if (unlink(path) == -1) {
511  cerr << "ERROR: AMI_STREAM: failed to unlink " << path << endl;
512  perror("cannot unlink: ");
513  DEBUG_ASSERT assert(0);
514  exit(1);
515  }
516  }
517  // Register memory deallocation before returning.
518  // size_t usage;
519  // main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
520  // MM_manager.register_deallocation(usage);
521 }
522 
523 /**********************************************************************/
524 template <class T>
526 {
527 
528  assert(fp);
529 
530  // if we go past substream range
531  if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
533  }
534  else {
535  if (fread((char *)(&read_tmp), sizeof(T), 1, fp) < 1) {
536  if (feof(fp)) {
537  eof_reached = 1;
539  }
540  else {
541  cerr << "ERROR: file=" << path << ":";
542  perror("cannot read!");
543  return AMI_ERROR_IO_ERROR;
544  }
545  }
546 
547  *elt = &read_tmp;
548  return AMI_ERROR_NO_ERROR;
549  }
550 }
551 
552 /**********************************************************************/
553 template <class T>
554 AMI_err AMI_STREAM<T>::read_array(T *data, off_t len, off_t *lenp)
555 {
556  size_t nobj;
557  assert(fp);
558 
559  // if we go past substream range
560  if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
561  eof_reached = 1;
563  }
564  else {
565  nobj = fread((void *)data, sizeof(T), len, fp);
566 
567  if (nobj < len) { /* some kind of error */
568  if (feof(fp)) {
569  if (lenp)
570  *lenp = nobj;
571  eof_reached = 1;
573  }
574  else {
575  cerr << "ERROR: file=" << path << ":";
576  perror("cannot read!");
577  return AMI_ERROR_IO_ERROR;
578  }
579  }
580  if (lenp)
581  *lenp = nobj;
582  return AMI_ERROR_NO_ERROR;
583  }
584 }
585 
586 /**********************************************************************/
587 template <class T>
589 {
590 
591  assert(fp);
592  // if we go past substream range
593  if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
595  }
596  else {
597  if (fwrite((char *)(&elt), sizeof(T), 1, fp) < 1) {
598  cerr << "ERROR: AMI_STREAM::write_item failed.\n";
599  if (*path)
600  perror(path);
601  else
602  perror("AMI_STREAM::write_item: ");
603  DEBUG_ASSERT assert(0);
604  exit(1);
605  }
606 
607  return AMI_ERROR_NO_ERROR;
608  }
609 }
610 
611 /**********************************************************************/
612 template <class T>
613 AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len)
614 {
615  size_t nobj;
616 
617  assert(fp);
618  // if we go past substream range
619  if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
621  }
622  else {
623  nobj = fwrite(data, sizeof(T), len, fp);
624  if (nobj < len) {
625  cerr << "ERROR: AMI_STREAM::write_array failed.\n";
626  if (*path)
627  perror(path);
628  else
629  perror("AMI_STREAM::write_array: ");
630  DEBUG_ASSERT assert(0);
631  exit(1);
632  }
633  return AMI_ERROR_NO_ERROR;
634  }
635 }
636 
637 /**********************************************************************/
638 template <class T>
640 {
641  per = p;
642 }
643 
644 /**********************************************************************/
645 // sprint()
646 // Return a string describing the stream
647 //
648 // This function gives easy access to the file name, length.
649 // It is not reentrant, but this should not be too much of a problem
650 // if you are careful.
651 template <class T>
653 {
654  static char desc[BUFSIZ + 256];
655  snprintf(desc, sizeof(desc), "[AMI_STREAM %s %ld]", path,
656  (long)stream_len());
657  return desc;
658 }
659 
660 /**********************************************************************/
661 template <class T>
663 {
664  return eof_reached;
665 }
666 
667 #endif // _AMI_STREAM_H
#define STREAM_BUFFER_SIZE
Definition: ami_stream.h:78
#define DEBUG_STREAM_LEN
Definition: ami_stream.h:68
#define DEBUG_DELETE
Definition: ami_stream.h:67
AMI_stream_type
Definition: ami_stream.h:104
@ AMI_APPEND_STREAM
Definition: ami_stream.h:107
@ AMI_WRITE_STREAM
Definition: ami_stream.h:106
@ AMI_READ_STREAM
Definition: ami_stream.h:105
@ AMI_READ_WRITE_STREAM
Definition: ami_stream.h:108
@ AMI_APPEND_WRITE_STREAM
Definition: ami_stream.h:109
persistence
Definition: ami_stream.h:112
@ PERSIST_READ_ONCE
Definition: ami_stream.h:118
@ PERSIST_DELETE
Definition: ami_stream.h:114
@ PERSIST_PERSISTENT
Definition: ami_stream.h:116
#define BASE_NAME
Definition: ami_stream.h:76
const char * ami_str_error[]
Definition: ami_stream.cpp:52
#define DEBUG_ASSERT
Definition: ami_stream.h:69
int ami_single_temp_name(const std::string &base, char *tmp_path)
Definition: ami_stream.cpp:71
AMI_err
Definition: ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition: ami_stream.h:86
@ AMI_ERROR_IO_ERROR
Definition: ami_stream.h:85
@ AMI_ERROR_MM_ERROR
Definition: ami_stream.h:90
@ AMI_ERROR_PERMISSION_DENIED
Definition: ami_stream.h:92
@ AMI_ERROR_NO_MAIN_MEMORY_OPERATION
Definition: ami_stream.h:96
@ AMI_ERROR_INSUFFICIENT_AVAILABLE_STREAMS
Definition: ami_stream.h:94
@ AMI_ERROR_OS_ERROR
Definition: ami_stream.h:89
@ AMI_ERROR_OBJECT_INITIALIZATION
Definition: ami_stream.h:91
@ AMI_ERROR_INSUFFICIENT_MAIN_MEMORY
Definition: ami_stream.h:93
@ AMI_ERROR_READ_ONLY
Definition: ami_stream.h:88
@ AMI_ERROR_NO_ERROR
Definition: ami_stream.h:84
@ AMI_ERROR_OUT_OF_RANGE
Definition: ami_stream.h:87
@ AMI_ERROR_ENV_UNDEFINED
Definition: ami_stream.h:95
FILE * open_stream(int fd, AMI_stream_type st)
Definition: ami_stream.cpp:100
#define NULL
Definition: ccmath.h:32
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition: ami_stream.h:477
char * sprint()
Definition: ami_stream.h:652
AMI_err write_item(const T &elt)
Definition: ami_stream.h:588
AMI_err write_array(const T *data, off_t len)
Definition: ami_stream.h:613
AMI_err read_array(T *data, off_t len, off_t *lenp=NULL)
Definition: ami_stream.h:554
AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end, AMI_STREAM< T > **sub_stream)
Definition: ami_stream.h:313
AMI_err seek(off_t offset)
Definition: ami_stream.h:445
AMI_err read_item(T **elt)
Definition: ami_stream.h:525
off_t stream_len(void)
Definition: ami_stream.h:375
int eof()
Definition: ami_stream.h:662
~AMI_STREAM(void)
Definition: ami_stream.h:500
void persist(persistence p)
Definition: ami_stream.h:639
const char * name() const
Definition: ami_stream.h:437
off_t logical_bos
Definition: ami_stream.h:137
off_t logical_eos
Definition: ami_stream.h:138
persistence per
Definition: ami_stream.h:128
AMI_stream_type access_mode
Definition: ami_stream.h:126
static unsigned int get_block_length()
Definition: ami_stream.h:145
unsigned int substream_level
Definition: ami_stream.h:131
void G_fseek(FILE *, off_t, int)
Change the file position of the stream.
Definition: gis/seek.c:50
off_t G_ftell(FILE *)
Get the current file position of the stream.
Definition: gis/seek.c:29
#define assert(condition)
Definition: lz4.c:291
MM_stream_usage
Definition: mm.h:68
@ MM_STREAM_USAGE_OVERHEAD
Definition: mm.h:70
@ MM_STREAM_USAGE_MAXIMUM
Definition: mm.h:79
@ MM_STREAM_USAGE_BUFFER
Definition: mm.h:73
@ MM_STREAM_USAGE_CURRENT
Definition: mm.h:76
struct state * st
Definition: parser.c:104
#define strcpy
Definition: parson.c:62
Definition: path.h:15