GRASS 8 Programmer's Manual 8.6.0dev(2026)-5f4f7ad06c
Loading...
Searching...
No Matches
ami_stream.h
Go to the documentation of this file.
1/****************************************************************************
2 *
3 * MODULE: iostream
4 *
5
6 * COPYRIGHT (C) 2007 Laura Toma
7 *
8 *
9
10 * Iostream is a library that implements streams, external memory
11 * sorting on streams, and an external memory priority queue on
12 * streams. These are the fundamental components used in external
13 * memory algorithms.
14
15 * Credits: The library was developed by Laura Toma. The kernel of
16 * class STREAM is based on the similar class existent in the GPL TPIE
17 * project developed at Duke University. The sorting and priority
18 * queue have been developed by Laura Toma based on communications
19 * with Rajiv Wickremesinghe. The library was developed as part of
20 * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21 * Rajiv Wickremesinghe as part of the Terracost project.
22
23 *
24 * This program is free software; you can redistribute it and/or modify
25 * it under the terms of the GNU General Public License as published by
26 * the Free Software Foundation; either version 2 of the License, or
27 * (at your option) any later version.
28 *
29
30 * This program is distributed in the hope that it will be useful,
31 * but WITHOUT ANY WARRANTY; without even the implied warranty of
32 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33 * General Public License for more details. *
34 * **************************************************************************/
35
36#ifndef _AMI_STREAM_H
37#define _AMI_STREAM_H
38
39#include <grass/config.h>
40
41#include <sys/types.h>
42#include <sys/stat.h>
43#include <stdio.h>
44#include <stdlib.h>
45#include <assert.h>
46#include <fcntl.h>
47#include <errno.h>
48#include <unistd.h>
49
50#include <cstring>
51#include <iostream>
52using std::cerr;
53using std::cout;
54using std::endl;
55using std::istream;
56using std::ofstream;
57using std::ostream;
58
59extern "C" {
60#include <grass/gis.h>
61}
62
63#define MAX_STREAMS_OPEN 200
64
65#include "mm.h" // Get the memory manager.
66
67#define DEBUG_DELETE if (0)
68#define DEBUG_STREAM_LEN if (0)
69#define DEBUG_ASSERT if (0)
70
71// The name of the environment variable which keeps the name of the
72// directory where streams are stored
73#define STREAM_TMPDIR "STREAM_DIR"
74
75// All streams will be named STREAM_*****
76#define BASE_NAME "STREAM"
77
78#define STREAM_BUFFER_SIZE (1 << 18)
79
80//
81// AMI error codes are returned using the AMI_err type.
82//
98
99extern const char *ami_str_error[];
100
101//
102// AMI stream types passed to constructors
103//
105 AMI_READ_STREAM = 1, // Open existing stream for reading
106 AMI_WRITE_STREAM, // Open for writing. Create if non-existent
107 AMI_APPEND_STREAM, // Open for writing at end. Create if needed.
108 AMI_READ_WRITE_STREAM, // Open to read and write.
109 AMI_APPEND_WRITE_STREAM // Open for writing at end (write only mode).
111
113 // Delete the stream from the disk when it is destructed.
115 // Do not delete the stream from the disk when it is destructed.
117 // Delete each block of data from the disk as it is read.
120
121/* an un-templated version makes for easier debugging */
123protected:
125 int fildes; // descriptor of file
129
130 // 0 for streams, positive for substreams
131 unsigned int substream_level;
132
133 // If this stream is actually a substream, these will be set to
134 // indicate the portion of the file that is part of this stream. If
135 // the stream is the whole file, they will be set to -1. Both are in
136 // T units.
139
140 // stream buffer passed in the call to setvbuf when file is opened
141 char *buf;
143
144public:
// Report the stream block length. This is the compile-time constant
// STREAM_BUFFER_SIZE (1 << 18), the same value used to size the buffer
// handed to setvbuf when a stream is opened; the page-size variant below
// is disabled.
145 static unsigned int get_block_length()
146 {
147 return STREAM_BUFFER_SIZE;
148 // return getpagesize();
149 };
150};
151
152template <class T>
153class AMI_STREAM : public UntypedStream {
154
155protected:
156 T read_tmp; /* this is ugly... RW */
157
158public:
159 // An AMI_stream with default name
160 AMI_STREAM();
161
162 // An AMI stream based on a specific path name.
163 AMI_STREAM(const char *path_name,
165
166 // convenience function with split path_name
167 // AMI_STREAM(const char *dir_name, const char *file_name, AMI_stream_type
168 // st);
169
170 // A pseudo-constructor for substreams.
173
174 // Destructor
175 ~AMI_STREAM(void);
176
177 // Read and write elements.
178 AMI_err read_item(T **elt);
179 AMI_err write_item(const T &elt);
180 AMI_err read_array(T *data, off_t len, off_t *lenp = NULL);
181 AMI_err write_array(const T *data, off_t len);
182
183 // Return the number of items in the stream.
184 off_t stream_len(void);
185
186 // Return the path name of this stream.
187 AMI_err name(char **stream_name);
188 const char *name() const;
189
190 // Move to a specific item in the stream.
191 AMI_err seek(off_t offset);
192
193 // Query memory usage
194 static AMI_err
195 main_memory_usage(size_t *usage,
197
198 void persist(persistence p);
199
200 char *sprint();
201
202 // have we hit the end of the stream
203 int eof();
204};
205
206/**********************************************************************/
207
208/**********************************************************************/
209/* creates a random file name, opens the file for reading and writing
210 and returns a file descriptor */
211/* int ami_single_temp_name(char *base, char* tmp_path); */
212/* fix from Andy Danner */
213int ami_single_temp_name(const std::string &base, char *tmp_path);
214
215/**********************************************************************/
216/* given fd=file descriptor, associates with it a stream opened in
217 access_mode and returns it */
218FILE *open_stream(int fd, AMI_stream_type st);
219
220/**********************************************************************/
221/* open the file whose name is pathname in access mode */
223
224/********************************************************************/
225// An AMI stream with default name.
226template <class T>
228{
229
232 fildes = fd;
234
235 /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
236 buf = new char[STREAM_BUFFER_SIZE];
237 if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
238 cerr << "ERROR: setvbuf failed (stream " << path
239 << ") with: " << strerror(errno) << endl;
240 exit(1);
241 }
242
243 // By default, all streams are deleted at destruction time.
245
246 // Not a substream.
247 substream_level = 0;
249
250 // why is this here in the first place?? -RW
251 seek(0);
252
253 eof_reached = 0;
254
255 // Register memory usage before returning.
256 // size_t usage;
257 // main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
258 // MM_manager.register_allocation(usage);
259}
260
261/**********************************************************************/
262// An AMI stream based on a specific path name.
263template <class T>
265{
266
267 access_mode = st;
268
269 if (path_name == NULL) {
271 fildes = fd;
273 }
274 else {
276 fp = open_stream(path, st);
277 fildes = -1;
278 }
279
280 /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
281 buf = new char[STREAM_BUFFER_SIZE];
282 if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
283 cerr << "ERROR: setvbuf failed (stream " << path
284 << ") with: " << strerror(errno) << endl;
285 exit(1);
286 }
287
288 eof_reached = 0;
289
290 // By default, all streams are deleted at destruction time.
291 if (st == AMI_READ_STREAM) {
293 }
294 else {
296 }
297
298 // Not a substream.
299 substream_level = 0;
301
302 seek(0);
303
304 // Register memory usage before returning.
305 // size_t usage;
306 // main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
307 // MM_manager.register_allocation(usage);
308}
309
310/**********************************************************************/
311// A pseudo-constructor for substreams.
312template <class T>
315{
316
317 // assume this for now
319
320#ifdef __MINGW32__
321 /* MINGW32: reopen file here for stream_len() below */
322 // reopen the file
324#endif
325
326 // check range
327 if (substream_level) {
328 if ((sub_begin >= (logical_eos - logical_bos)) ||
330
332 }
333 }
334 else {
335 off_t len = stream_len();
336 if (sub_begin > len || sub_end > len) {
337
339 }
340 }
341
342#ifndef __MINGW32__
343 // reopen the file
345#endif
346
347 // Set up the beginning and end positions.
348 if (substream_level) {
351 }
352 else {
355 }
356
357 // Set the current position.
358 substr->seek(0);
359
360 substr->eof_reached = 0;
361
362 // set substream level
364
365 substr->per = per; // set persistence
366
367 //*sub_stream = (AMI_base_stream < T > *)substr;
369 return AMI_ERROR_NO_ERROR;
370}
371
372/**********************************************************************/
373// Return the number of items in the stream.
374template <class T>
376{
377
378 fflush(fp);
379
380#ifdef __MINGW32__
381 // stat() fails on MS Windows if the file is open, so try G_ftell() instead.
382 // FIXME: not 64bit safe, but WinGrass isn't either right now.
384
386 if (posn_save == -1) {
387 perror("ERROR: AMI_STREAM::stream_len(): ftell(fp) failed ");
388 perror(path);
389 exit(1);
390 }
391
392 G_fseek(fp, 0, SEEK_END);
393 st_size = G_ftell(fp);
394 if (st_size == -1) {
395 perror("ERROR: AMI_STREAM::stream_len(): ftell[SEEK_END] failed ");
396 perror(path);
397 exit(1);
398 }
399
401
402 // debug stream_len:
403 DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%d\n", path,
404 st_size, sizeof(T));
405
406 return (st_size / sizeof(T));
407#else
408 struct stat statbuf;
409 if (stat(path, &statbuf) == -1) {
410 perror("AMI_STREAM::stream_len(): fstat failed ");
412 exit(1);
413 }
414
415 // debug stream_len:
416 DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%zud\n",
417 path, (long long int)statbuf.st_size, sizeof(T));
418
419 return (statbuf.st_size / sizeof(T));
420#endif
421}
422
423/**********************************************************************/
424// Return the path name of this stream.
425template <class T>
427{
428
429 *stream_name = new char[strlen(path) + 1];
431
432 return AMI_ERROR_NO_ERROR;
433}
434
435// Return the path name of this stream.
// This const overload allocates nothing: it hands back a pointer to the
// stream's own path member. Contrast with name(char **), which allocates
// a new char array for the caller. The returned pointer must not be freed.
436template <class T>
437const char *AMI_STREAM<T>::name() const
438{
439 return path; // the member buffer itself, not a copy
440}
441
442/**********************************************************************/
443// Move to a specific offset within the (sub)stream.
444template <class T>
446{
447
449
450 if (substream_level) { // substream
451 if (offset > (unsigned)(logical_eos - logical_bos)) {
452 // offset out of range
453 cerr << "ERROR: AMI_STREAM::seek bos=" << logical_bos
454 << ", eos=" << logical_eos << ", offset " << offset
455 << " out of range.\n";
457 exit(1);
458 }
459 else {
460 // offset in range
461 seek_offset = (logical_bos + offset) * sizeof(T);
462 }
463 }
464 else {
465 // not a substream
466 seek_offset = offset * sizeof(T);
467 }
468
470
471 return AMI_ERROR_NO_ERROR;
472}
473
474/**********************************************************************/
475// Query memory usage
476template <class T>
479{
480
481 switch (usage_type) {
483 *usage = sizeof(AMI_STREAM<T>);
484 break;
486 // *usage = get_block_length();
487 *usage = STREAM_BUFFER_SIZE * sizeof(char);
488 break;
491 // *usage = sizeof (*this) + get_block_length();
492 *usage = sizeof(AMI_STREAM<T>) + STREAM_BUFFER_SIZE * sizeof(char);
493 break;
494 }
495 return AMI_ERROR_NO_ERROR;
496}
497
498/**********************************************************************/
499template <class T>
501{
502
503 DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
504 assert(fp);
505 fclose(fp);
506 delete buf;
507
508 // Get rid of the file if not persistent and if not substream.
509 if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
510 if (unlink(path) == -1) {
511 cerr << "ERROR: AMI_STREAM: failed to unlink " << path << endl;
512 perror("cannot unlink: ");
514 exit(1);
515 }
516 }
517 // Register memory deallocation before returning.
518 // size_t usage;
519 // main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
520 // MM_manager.register_deallocation(usage);
521}
522
523/**********************************************************************/
524template <class T>
526{
527
528 assert(fp);
529
530 // if we go past substream range
531 if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
533 }
534 else {
535 if (fread((char *)(&read_tmp), sizeof(T), 1, fp) < 1) {
536 if (feof(fp)) {
537 eof_reached = 1;
539 }
540 else {
541 cerr << "ERROR: file=" << path << ":";
542 perror("cannot read!");
543 return AMI_ERROR_IO_ERROR;
544 }
545 }
546
547 *elt = &read_tmp;
548 return AMI_ERROR_NO_ERROR;
549 }
550}
551
552/**********************************************************************/
553template <class T>
555{
556 size_t nobj;
557 assert(fp);
558
559 // if we go past substream range
560 if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
561 eof_reached = 1;
563 }
564 else {
565 nobj = fread((void *)data, sizeof(T), len, fp);
566
567 if (nobj < len) { /* some kind of error */
568 if (feof(fp)) {
569 if (lenp)
570 *lenp = nobj;
571 eof_reached = 1;
573 }
574 else {
575 cerr << "ERROR: file=" << path << ":";
576 perror("cannot read!");
577 return AMI_ERROR_IO_ERROR;
578 }
579 }
580 if (lenp)
581 *lenp = nobj;
582 return AMI_ERROR_NO_ERROR;
583 }
584}
585
586/**********************************************************************/
587template <class T>
589{
590
591 assert(fp);
592 // if we go past substream range
593 if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
595 }
596 else {
597 if (fwrite((char *)(&elt), sizeof(T), 1, fp) < 1) {
598 cerr << "ERROR: AMI_STREAM::write_item failed.\n";
599 if (*path)
600 perror(path);
601 else
602 perror("AMI_STREAM::write_item: ");
604 exit(1);
605 }
606
607 return AMI_ERROR_NO_ERROR;
608 }
609}
610
611/**********************************************************************/
612template <class T>
614{
615 size_t nobj;
616
617 assert(fp);
618 // if we go past substream range
619 if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
621 }
622 else {
623 nobj = fwrite(data, sizeof(T), len, fp);
624 if (nobj < len) {
625 cerr << "ERROR: AMI_STREAM::write_array failed.\n";
626 if (*path)
627 perror(path);
628 else
629 perror("AMI_STREAM::write_array: ");
631 exit(1);
632 }
633 return AMI_ERROR_NO_ERROR;
634 }
635}
636
637/**********************************************************************/
638template <class T>
640{
641 per = p;
642}
643
644/**********************************************************************/
645// sprint()
646// Return a string describing the stream
647//
648// This function gives easy access to the file name, length.
649// It is not reentrant, but this should not be too much of a problem
650// if you are careful.
651template <class T>
653{
654 static char desc[BUFSIZ + 256];
655 snprintf(desc, sizeof(desc), "[AMI_STREAM %s %ld]", path,
656 (long)stream_len());
657 return desc;
658}
659
660/**********************************************************************/
661template <class T>
663{
664 return eof_reached;
665}
666
667#endif // _AMI_STREAM_H
#define STREAM_BUFFER_SIZE
Definition ami_stream.h:78
#define DEBUG_STREAM_LEN
Definition ami_stream.h:68
#define DEBUG_DELETE
Definition ami_stream.h:67
FILE * open_stream(int fd, AMI_stream_type st)
AMI_stream_type
Definition ami_stream.h:104
@ AMI_APPEND_STREAM
Definition ami_stream.h:107
@ AMI_WRITE_STREAM
Definition ami_stream.h:106
@ AMI_READ_STREAM
Definition ami_stream.h:105
@ AMI_READ_WRITE_STREAM
Definition ami_stream.h:108
@ AMI_APPEND_WRITE_STREAM
Definition ami_stream.h:109
persistence
Definition ami_stream.h:112
@ PERSIST_READ_ONCE
Definition ami_stream.h:118
@ PERSIST_DELETE
Definition ami_stream.h:114
@ PERSIST_PERSISTENT
Definition ami_stream.h:116
#define BASE_NAME
Definition ami_stream.h:76
const char * ami_str_error[]
#define DEBUG_ASSERT
Definition ami_stream.h:69
int ami_single_temp_name(const std::string &base, char *tmp_path)
AMI_err
Definition ami_stream.h:83
@ AMI_ERROR_END_OF_STREAM
Definition ami_stream.h:86
@ AMI_ERROR_IO_ERROR
Definition ami_stream.h:85
@ AMI_ERROR_MM_ERROR
Definition ami_stream.h:90
@ AMI_ERROR_PERMISSION_DENIED
Definition ami_stream.h:92
@ AMI_ERROR_NO_MAIN_MEMORY_OPERATION
Definition ami_stream.h:96
@ AMI_ERROR_INSUFFICIENT_AVAILABLE_STREAMS
Definition ami_stream.h:94
@ AMI_ERROR_OS_ERROR
Definition ami_stream.h:89
@ AMI_ERROR_OBJECT_INITIALIZATION
Definition ami_stream.h:91
@ AMI_ERROR_INSUFFICIENT_MAIN_MEMORY
Definition ami_stream.h:93
@ AMI_ERROR_READ_ONLY
Definition ami_stream.h:88
@ AMI_ERROR_NO_ERROR
Definition ami_stream.h:84
@ AMI_ERROR_OUT_OF_RANGE
Definition ami_stream.h:87
@ AMI_ERROR_ENV_UNDEFINED
Definition ami_stream.h:95
#define NULL
Definition ccmath.h:32
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition ami_stream.h:477
char * sprint()
Definition ami_stream.h:652
AMI_err write_item(const T &elt)
Definition ami_stream.h:588
AMI_err write_array(const T *data, off_t len)
Definition ami_stream.h:613
AMI_err read_array(T *data, off_t len, off_t *lenp=NULL)
Definition ami_stream.h:554
AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end, AMI_STREAM< T > **sub_stream)
Definition ami_stream.h:313
AMI_err seek(off_t offset)
Definition ami_stream.h:445
AMI_err read_item(T **elt)
Definition ami_stream.h:525
off_t stream_len(void)
Definition ami_stream.h:375
~AMI_STREAM(void)
Definition ami_stream.h:500
void persist(persistence p)
Definition ami_stream.h:639
const char * name() const
Definition ami_stream.h:437
char path[BUFSIZ]
Definition ami_stream.h:127
off_t logical_bos
Definition ami_stream.h:137
off_t logical_eos
Definition ami_stream.h:138
persistence per
Definition ami_stream.h:128
AMI_stream_type access_mode
Definition ami_stream.h:126
static unsigned int get_block_length()
Definition ami_stream.h:145
unsigned int substream_level
Definition ami_stream.h:131
void G_fseek(FILE *, off_t, int)
Change the file position of the stream.
Definition gis/seek.c:50
off_t G_ftell(FILE *)
Get the current file position of the stream.
Definition gis/seek.c:29
Header file for msvc/fcntl.c.
#define assert(condition)
Definition lz4.c:291
MM_stream_usage
Definition mm.h:68
@ MM_STREAM_USAGE_OVERHEAD
Definition mm.h:70
@ MM_STREAM_USAGE_MAXIMUM
Definition mm.h:79
@ MM_STREAM_USAGE_BUFFER
Definition mm.h:73
@ MM_STREAM_USAGE_CURRENT
Definition mm.h:76
struct state * st
Definition parser.c:104
#define strcpy
Definition parson.c:66
Definition path.h:15
#define unlink
Definition unistd.h:11