Source code for pygrass.utils

import fnmatch
import itertools
import os
from pathlib import Path
from sqlite3 import OperationalError

import grass.lib.gis as libgis
from grass.script import core as grasscore
from grass.script import utils as grassutils

# flake8: noqa: E402
libgis.G_gisinit("")

import grass.lib.raster as libraster
from grass.lib.ctypes_preamble import String
from grass.pygrass.errors import GrassError

# flake8: qa


test_vector_name = "Utils_test_vector"
test_raster_name = "Utils_test_raster"


[docs] def looking(obj, filter_string): """ >>> import grass.lib.vector as libvect >>> sorted(looking(libvect, "*by_box*")) # doctest: +NORMALIZE_WHITESPACE ['Vect_select_areas_by_box', 'Vect_select_isles_by_box', 'Vect_select_lines_by_box', 'Vect_select_nodes_by_box'] """ word_list = dir(obj) word_list.sort() return fnmatch.filter(word_list, filter_string)
[docs] def findfiles(dirpath, match=None): """Return a list of the files""" res = [] for f in sorted(os.listdir(dirpath)): abspath = os.path.join(dirpath, f) if Path(abspath).is_dir(): res.extend(findfiles(abspath, match)) if match: if fnmatch.fnmatch(abspath, match): res.append(abspath) else: res.append(abspath) return res
[docs] def findmaps(type, pattern=None, mapset="", location="", gisdbase=""): """Return a list of tuples containing the names of the: * map * mapset, * location, * gisdbase """ from grass.pygrass.gis import Gisdbase, Location, Mapset def find_in_location(type, pattern, location): res = [] for msetname in location.mapsets(): mset = Mapset(msetname, location.name, location.gisdbase) res.extend( [ (m, mset.name, mset.location, mset.gisdbase) for m in mset.glist(type, pattern) ] ) return res def find_in_gisdbase(type, pattern, gisdbase): return [ a for loc in gisdbase.locations() for a in find_in_location(type, pattern, Location(loc, gisdbase.name)) ] if gisdbase and location and mapset: mset = Mapset(mapset, location, gisdbase) return [ (m, mset.name, mset.location, mset.gisdbase) for m in mset.glist(type, pattern) ] if gisdbase and location: loc = Location(location, gisdbase) return find_in_location(type, pattern, loc) if gisdbase: gis = Gisdbase(gisdbase) return find_in_gisdbase(type, pattern, gis) if location: loc = Location(location) return find_in_location(type, pattern, loc) if mapset: mset = Mapset(mapset) return [ (m, mset.name, mset.location, mset.gisdbase) for m in mset.glist(type, pattern) ] gis = Gisdbase() return find_in_gisdbase(type, pattern, gis)
[docs] def remove(oldname, maptype): """Remove a map""" grasscore.run_command("g.remove", quiet=True, flags="f", type=maptype, name=oldname)
[docs] def rename(oldname, newname, maptype, **kwargs): """Rename a map""" kwargs.update( { maptype: "{old},{new}".format(old=oldname, new=newname), } ) grasscore.run_command("g.rename", quiet=True, **kwargs)
[docs] def copy(existingmap, newmap, maptype, **kwargs): """Copy a map >>> copy(test_vector_name, "mycensus", "vector") >>> rename("mycensus", "mynewcensus", "vector") >>> remove("mynewcensus", "vector") """ kwargs.update({maptype: "{old},{new}".format(old=existingmap, new=newmap)}) grasscore.run_command("g.copy", quiet=True, **kwargs)
[docs] def decode(obj, encoding=None): """Decode string coming from c functions, can be ctypes class String, bytes, or None """ if isinstance(obj, String): return grassutils.decode(obj.data, encoding=encoding) if isinstance(obj, bytes): return grassutils.decode(obj) # eg None return obj
[docs] def getenv(env): """Return the current grass environment variables >>> from grass.script.core import gisenv >>> getenv("MAPSET") == gisenv()["MAPSET"] True """ return decode(libgis.G_getenv_nofatal(env))
[docs] def get_mapset_raster(mapname, mapset=""): """Return the mapset of the raster map >>> get_mapset_raster(test_raster_name) == getenv("MAPSET") True """ return decode(libgis.G_find_raster2(mapname, mapset))
[docs] def get_mapset_vector(mapname, mapset=""): """Return the mapset of the vector map >>> get_mapset_vector(test_vector_name) == getenv("MAPSET") True """ return decode(libgis.G_find_vector2(mapname, mapset))
[docs] def is_clean_name(name) -> bool: """Return if the name is valid >>> is_clean_name("census") True >>> is_clean_name("0census") True >>> is_clean_name("census?") True >>> is_clean_name("cénsus") False """ return libgis.G_legal_filename(name) >= 0
[docs] def coor2pixel(coord, region): """Convert coordinates into a pixel row and col >>> from grass.pygrass.gis.region import Region >>> reg = Region() >>> coor2pixel((reg.west, reg.north), reg) (0.0, 0.0) >>> coor2pixel((reg.east, reg.south), reg) == (reg.rows, reg.cols) True """ (east, north) = coord return ( libraster.Rast_northing_to_row(north, region.byref()), libraster.Rast_easting_to_col(east, region.byref()), )
[docs] def pixel2coor(pixel, region): """Convert row and col of a pixel into a coordinates >>> from grass.pygrass.gis.region import Region >>> reg = Region() >>> pixel2coor((0, 0), reg) == (reg.north, reg.west) True >>> pixel2coor((reg.cols, reg.rows), reg) == (reg.south, reg.east) True """ (col, row) = pixel return ( libraster.Rast_row_to_northing(row, region.byref()), libraster.Rast_col_to_easting(col, region.byref()), )
[docs] def get_raster_for_points(poi_vector, raster, column=None, region=None): """Query a raster map for each point feature of a vector Example >>> from grass.pygrass.raster import RasterRow >>> from grass.pygrass.gis.region import Region >>> from grass.pygrass.vector import VectorTopo >>> from grass.pygrass.vector.geometry import Point Create a vector map >>> cols = [("cat", "INTEGER PRIMARY KEY"), ("value", "double precision")] >>> vect = VectorTopo("test_vect_2") >>> vect.open("w", tab_name="test_vect_2", tab_cols=cols) >>> vect.write( ... Point(10, 6), ... cat=1, ... attrs=[ ... 10, ... ], ... ) >>> vect.write( ... Point(12, 6), ... cat=2, ... attrs=[ ... 12, ... ], ... ) >>> vect.write( ... Point(14, 6), ... cat=3, ... attrs=[ ... 14, ... ], ... ) >>> vect.table.conn.commit() >>> vect.close() Setup the raster sampling >>> region = Region() >>> region.from_rast(test_raster_name) >>> region.set_raster_region() >>> ele = RasterRow(test_raster_name) Sample the raster layer at the given points, return a list of values >>> l = get_raster_for_points(vect, ele, region=region) >>> l[0] # doctest: +ELLIPSIS (1, 10.0, 6.0, 1) >>> l[1] # doctest: +ELLIPSIS (2, 12.0, 6.0, 1) Add a new column and sample again >>> vect.open("r") >>> vect.table.columns.add(test_raster_name, "double precision") >>> vect.table.conn.commit() >>> test_raster_name in vect.table.columns True >>> get_raster_for_points(vect, ele, column=test_raster_name, region=region) True >>> vect.table.filters.select("value", test_raster_name) Filters('SELECT value, Utils_test_raster FROM test_vect_2;') >>> cur = vect.table.execute() >>> r = cur.fetchall() >>> r[0] # doctest: +ELLIPSIS (10.0, 1.0) >>> r[1] # doctest: +ELLIPSIS (12.0, 1.0) >>> vect.close() >>> remove("test_vect_2", "vect") :param poi_vector: A VectorTopo object that contains points :param raster: raster object :param str column: column name to update in the attrinute table, if set to None a list of sampled values will be returned :param region: The region to work with, if not set the current computational region will be used :return: True in case of success and a specified column for update, if column name for update was not set a list of (id, x, y, value) is returned """ from math import isnan if not column: result = [] if region is None: from grass.pygrass.gis.region import Region region = Region() if not poi_vector.is_open(): poi_vector.open("r") if not raster.is_open(): raster.open("r") if poi_vector.num_primitive_of("point") == 0: raise GrassError(_("Vector doesn't contain points")) for poi in poi_vector.viter("points"): val = raster.get_value(poi, region) if column: if val is not None and not isnan(val): poi.attrs[column] = val else: # noqa: PLR5501 if val is not None and not isnan(val): result.append((poi.id, poi.x, poi.y, val)) else: result.append((poi.id, poi.x, poi.y, None)) if not column: return result poi.attrs.commit() return True
[docs] def r_export(rast, output="", fmt="png", **kargs): from grass.pygrass.modules import Module if rast.exist(): output = output or "%s_%s.%s" % (rast.name, rast.mapset, fmt) Module( "r.out.%s" % fmt, input=rast.fullname(), output=output, overwrite=True, **kargs, ) return output msg = "Raster map does not exist." raise ValueError(msg)
[docs] def get_lib_path(modname, libname=None): """Return the path of the libname contained in the module. .. deprecated:: 7.1 Use :func:`grass.script.utils.get_lib_path` instead. """ from grass.script.utils import get_lib_path return get_lib_path(modname=modname, libname=libname)
[docs] def set_path(modulename, dirname=None, path="."): """Set sys.path looking in the the local directory GRASS directories. :param modulename: string with the name of the GRASS module :param dirname: string with the directory name containing the python libraries, default None :param path: string with the path to reach the dirname locally. .. deprecated:: 7.1 Use :func:`grass.script.utils.set_path` instead. """ from grass.script.utils import set_path return set_path(modulename=modulename, dirname=dirname, path=path)
[docs] def split_in_chunk(iterable, length=10): """Split a list in chunk. >>> for chunk in split_in_chunk(range(25)): ... print(chunk) (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) (10, 11, 12, 13, 14, 15, 16, 17, 18, 19) (20, 21, 22, 23, 24) >>> for chunk in split_in_chunk(range(25), 3): ... print(chunk) (0, 1, 2) (3, 4, 5) (6, 7, 8) (9, 10, 11) (12, 13, 14) (15, 16, 17) (18, 19, 20) (21, 22, 23) (24,) """ it = iter(iterable) while True: chunk = tuple(itertools.islice(it, length)) if not chunk: return yield chunk
[docs] def table_exist(cursor, table_name): """Return True if the table exist False otherwise""" try: # sqlite cursor.execute( "SELECT name FROM sqlite_master" " WHERE type='table' AND name='%s';" % table_name ) except OperationalError: try: # pg cursor.execute( "SELECT EXISTS(SELECT * FROM " "information_schema.tables " "WHERE table_name=%s)" % table_name ) except OperationalError: return False one = cursor.fetchone() if cursor else None return bool(one and one[0])
[docs] def create_test_vector_map(map_name="test_vector"): """This functions creates a vector map layer with points, lines, boundaries, centroids, areas, isles and attributes for testing purposes This should be used in doc and unit tests to create location/mapset independent vector map layer. This map includes 3 points, 3 lines, 11 boundaries and 4 centroids. The attribute table contains cat, name and value columns. :param map_name: The vector map name that should be used .. code-block:: none P1 P2 P3 6 * * * 5 4 _______ ___ ___ L1 L2 L3 Y 3 |A1___ *| *| *| | | | 2 | |A2*| | | | | | | 1 | |___| |A3 |A4 | | | | 0 |_______|___|___| | | | -1 -1 0 1 2 3 4 5 6 7 8 9 10 12 14 X """ from grass.pygrass.vector import VectorTopo from grass.pygrass.vector.geometry import Boundary, Centroid, Line, Point cols = [ ("cat", "INTEGER PRIMARY KEY"), ("name", "varchar(50)"), ("value", "double precision"), ] with VectorTopo(map_name, mode="w", tab_name=map_name, tab_cols=cols) as vect: # Write 3 points vect.write(Point(10, 6), cat=1, attrs=("point", 1)) vect.write(Point(12, 6), cat=1) vect.write(Point(14, 6), cat=1) # Write 3 lines vect.write(Line([(10, 4), (10, 2), (10, 0)]), cat=2, attrs=("line", 2)) vect.write(Line([(12, 4), (12, 2), (12, 0)]), cat=2) vect.write(Line([(14, 4), (14, 2), (14, 0)]), cat=2) # boundaries 1 - 4 vect.write(Boundary(points=[(0, 0), (0, 4)])) vect.write(Boundary(points=[(0, 4), (4, 4)])) vect.write(Boundary(points=[(4, 4), (4, 0)])) vect.write(Boundary(points=[(4, 0), (0, 0)])) # 5. boundary (Isle) vect.write(Boundary(points=[(1, 1), (1, 3), (3, 3), (3, 1), (1, 1)])) # boundaries 6 - 8 vect.write(Boundary(points=[(4, 4), (6, 4)])) vect.write(Boundary(points=[(6, 4), (6, 0)])) vect.write(Boundary(points=[(6, 0), (4, 0)])) # boundaries 9 - 11 vect.write(Boundary(points=[(6, 4), (8, 4)])) vect.write(Boundary(points=[(8, 4), (8, 0)])) vect.write(Boundary(points=[(8, 0), (6, 0)])) # Centroids, all have the same cat and attribute vect.write(Centroid(x=3.5, y=3.5), cat=3, attrs=("centroid", 3)) vect.write(Centroid(x=2.5, y=2.5), cat=3) vect.write(Centroid(x=5.5, y=3.5), cat=3) vect.write(Centroid(x=7.5, y=3.5), cat=3) vect.organization = "Thuenen Institut" vect.person = "Soeren Gebbert" vect.title = "Test dataset" vect.comment = "This is a comment" vect.table.conn.commit() vect.organization = "Thuenen Institut" vect.person = "Soeren Gebbert" vect.title = "Test dataset" vect.comment = "This is a comment" vect.close()
[docs] def create_test_stream_network_map(map_name="streams"): R"""Create test data This functions creates a vector map layer with lines that represent a stream network with two different graphs. The first graph contains a loop, the second can be used as directed graph. This should be used in doc and unit tests to create location/mapset independent vector map layer. :param map_name: The vector map name that should be used .. code-block:: none 1(0,2) 3(2,2) \ / 1 \ / 2 \ / 2(1,1) 6(0,1) || 5(2,1) 5 \ || / 4 \||/ 4(1,0) | | 6 |7(1,-1) 7(0,-1) 8(2,-1) \ / 8 \ / 9 \ / 9(1, -2) | | 10 | 10(1,-3) """ from grass.pygrass.vector import VectorTopo from grass.pygrass.vector.geometry import Line cols = [("cat", "INTEGER PRIMARY KEY"), ("id", "INTEGER")] with VectorTopo(map_name, mode="w", tab_name=map_name, tab_cols=cols) as streams: # First flow graph line = Line([(0, 2), (0.22, 1.75), (0.55, 1.5), (1, 1)]) streams.write(line, cat=1, attrs=(1,)) line = Line([(2, 2), (1, 1)]) streams.write(line, cat=2, attrs=(2,)) line = Line([(1, 1), (0.85, 0.5), (1, 0)]) streams.write(line, cat=3, attrs=(3,)) line = Line([(2, 1), (1, 0)]) streams.write(line, cat=4, attrs=(4,)) line = Line([(0, 1), (1, 0)]) streams.write(line, cat=5, attrs=(5,)) line = Line([(1, 0), (1, -1)]) streams.write(line, cat=6, attrs=(6,)) # Reverse line 3 line = Line([(1, 0), (1.15, 0.5), (1, 1)]) streams.write(line, cat=7, attrs=(7,)) # second flow graph line = Line([(0, -1), (1, -2)]) streams.write(line, cat=8, attrs=(8,)) line = Line([(2, -1), (1, -2)]) streams.write(line, cat=9, attrs=(9,)) line = Line([(1, -2), (1, -3)]) streams.write(line, cat=10, attrs=(10,)) streams.organization = "Thuenen Institut" streams.person = "Soeren Gebbert" streams.title = "Test dataset for stream networks" streams.comment = "This is a comment" streams.table.conn.commit() streams.close()
if __name__ == "__main__": import doctest from grass.script.core import run_command create_test_vector_map(test_vector_name) run_command("g.region", n=50, s=0, e=60, w=0, res=1) run_command("r.mapcalc", expression="%s = 1" % (test_raster_name), overwrite=True) doctest.testmod() mset = get_mapset_vector(test_vector_name, mapset="") if mset: # Remove the generated vector map, if exists run_command("g.remove", flags="f", type="vector", name=test_vector_name) mset = get_mapset_raster(test_raster_name, mapset="") if mset: # Remove the generated raster map, if exists run_command("g.remove", flags="f", type="raster", name=test_raster_name)