import fnmatch
import itertools
import os
from sqlite3 import OperationalError
import grass.lib.gis as libgis
from grass.script import core as grasscore
from grass.script import utils as grassutils
# flake8: noqa: E402
libgis.G_gisinit("")
import grass.lib.raster as libraster
from grass.lib.ctypes_preamble import String
from grass.pygrass.errors import GrassError
# flake8: qa
# Names of the temporary vector and raster maps referenced by the doctests in
# this module; the ``__main__`` block creates them before running the doctests
# and removes them afterwards.
test_vector_name = "Utils_test_vector"
test_raster_name = "Utils_test_raster"
def looking(obj, filter_string):
    """Return the attribute names of ``obj`` matching a shell-style pattern.

    >>> import grass.lib.vector as libvect
    >>> sorted(looking(libvect, "*by_box*"))  # doctest: +NORMALIZE_WHITESPACE
    ['Vect_select_areas_by_box', 'Vect_select_isles_by_box',
     'Vect_select_lines_by_box', 'Vect_select_nodes_by_box']

    :param obj: object whose names, as listed by ``dir()``, are searched
    :param str filter_string: ``fnmatch``-style pattern
    :return: sorted list of the matching attribute names
    """
    names = sorted(dir(obj))
    return fnmatch.filter(names, filter_string)
def findfiles(dirpath, match=None):
    """Recursively list the entries found below ``dirpath``.

    Entries are visited in sorted order. The content of a sub-directory is
    listed before the sub-directory itself, and directories are part of the
    result just like regular files.

    :param str dirpath: directory to scan
    :param str match: optional ``fnmatch`` pattern tested against the joined
                      path of every entry; when empty or None every entry
                      is returned
    :return: list of paths built by joining ``dirpath`` with the entry names
    """
    found = []
    for entry in sorted(os.listdir(dirpath)):
        full_path = os.path.join(dirpath, entry)
        if os.path.isdir(full_path):
            # Descend first so nested entries precede their parent directory.
            found += findfiles(full_path, match)
        if not match or fnmatch.fnmatch(full_path, match):
            found.append(full_path)
    return found
def findmaps(type, pattern=None, mapset="", location="", gisdbase=""):
    """Return a list of tuples describing the maps that were found.

    Each tuple contains the names of the:

    * map,
    * mapset,
    * location,
    * gisdbase

    The search scope is chosen from the arguments that are set:

    * ``gisdbase``, ``location`` and ``mapset``: only that mapset;
    * ``gisdbase`` and ``location``: every mapset of that location;
    * ``gisdbase`` only (``mapset`` is ignored then): every location of it;
    * ``location`` only: every mapset of that location;
    * ``mapset`` only: only that mapset;
    * nothing: every location of the default ``Gisdbase()``.

    :param type: map type handed to ``Mapset.glist``
    :param pattern: name pattern handed to ``Mapset.glist``
    :param str mapset: mapset name
    :param str location: location name
    :param str gisdbase: path of the GRASS database
    """
    from grass.pygrass.gis import Gisdbase, Location, Mapset

    def maps_in_mapset(mset):
        # One result tuple per matching map of the given mapset.
        return [
            (name, mset.name, mset.location, mset.gisdbase)
            for name in mset.glist(type, pattern)
        ]

    def maps_in_location(loc):
        # Collect the matches of every mapset of the location.
        found = []
        for mapset_name in loc.mapsets():
            found += maps_in_mapset(Mapset(mapset_name, loc.name, loc.gisdbase))
        return found

    def maps_in_gisdbase(gis):
        # Collect the matches of every location of the database.
        found = []
        for location_name in gis.locations():
            found += maps_in_location(Location(location_name, gis.name))
        return found

    if gisdbase:
        if not location:
            return maps_in_gisdbase(Gisdbase(gisdbase))
        if not mapset:
            return maps_in_location(Location(location, gisdbase))
        return maps_in_mapset(Mapset(mapset, location, gisdbase))
    if location:
        return maps_in_location(Location(location))
    if mapset:
        return maps_in_mapset(Mapset(mapset))
    return maps_in_gisdbase(Gisdbase())
def remove(oldname, maptype):
    """Remove a map by running ``g.remove`` quietly with the ``f`` flag.

    :param str oldname: name of the map to remove
    :param str maptype: value passed as the ``type`` option of ``g.remove``
    """
    options = {"quiet": True, "flags": "f", "type": maptype, "name": oldname}
    grasscore.run_command("g.remove", **options)
def rename(oldname, newname, maptype, **kwargs):
    """Rename a map by running ``g.rename`` quietly.

    :param str oldname: current name of the map
    :param str newname: new name of the map
    :param str maptype: name of the ``g.rename`` option that receives the
                        ``old,new`` pair
    :param kwargs: further options forwarded to ``g.rename``
    """
    kwargs[maptype] = "{old},{new}".format(old=oldname, new=newname)
    grasscore.run_command("g.rename", quiet=True, **kwargs)
def copy(existingmap, newmap, maptype, **kwargs):
    """Copy a map by running ``g.copy`` quietly.

    >>> copy(test_vector_name, "mycensus", "vector")
    >>> rename("mycensus", "mynewcensus", "vector")
    >>> remove("mynewcensus", "vector")

    :param str existingmap: name of the map to copy
    :param str newmap: name of the copy
    :param str maptype: name of the ``g.copy`` option that receives the
                        ``existing,new`` pair
    :param kwargs: further options forwarded to ``g.copy``
    """
    kwargs[maptype] = "{old},{new}".format(old=existingmap, new=newmap)
    grasscore.run_command("g.copy", quiet=True, **kwargs)
def decode(obj, encoding=None):
    """Decode a string coming from C functions.

    The value can be a ctypes ``String``, ``bytes``, or anything else
    (e.g. None), which is returned unchanged.

    :param obj: value to decode
    :param encoding: encoding forwarded to ``grass.script.utils.decode``
    :return: the decoded value, or ``obj`` itself when it is neither a
             ``String`` nor ``bytes``
    """
    if isinstance(obj, String):
        return grassutils.decode(obj.data, encoding=encoding)
    if isinstance(obj, bytes):
        # Forward the requested encoding here as well; it used to be dropped
        # silently for plain bytes.
        return grassutils.decode(obj, encoding=encoding)
    # eg None
    return obj
def getenv(env):
    """Return the decoded value of a GRASS environment variable.

    The value is read with ``G_getenv_nofatal``.

    >>> from grass.script.core import gisenv
    >>> getenv("MAPSET") == gisenv()["MAPSET"]
    True

    :param str env: name of the variable
    """
    raw_value = libgis.G_getenv_nofatal(env)
    return decode(raw_value)
def get_mapset_raster(mapname, mapset=""):
    """Return the mapset of a raster map, decoded from ``G_find_raster2``.

    >>> get_mapset_raster(test_raster_name) == getenv("MAPSET")
    True

    :param str mapname: name of the raster map
    :param str mapset: mapset passed on to ``G_find_raster2``
    """
    found_in = libgis.G_find_raster2(mapname, mapset)
    return decode(found_in)
def get_mapset_vector(mapname, mapset=""):
    """Return the mapset of a vector map, decoded from ``G_find_vector2``.

    >>> get_mapset_vector(test_vector_name) == getenv("MAPSET")
    True

    :param str mapname: name of the vector map
    :param str mapset: mapset passed on to ``G_find_vector2``
    """
    found_in = libgis.G_find_vector2(mapname, mapset)
    return decode(found_in)
def is_clean_name(name) -> bool:
    """Return whether ``G_legal_filename`` accepts the name.

    >>> is_clean_name("census")
    True
    >>> is_clean_name("0census")
    True
    >>> is_clean_name("census?")
    True
    >>> is_clean_name("cénsus")
    False

    :param str name: name to check
    :return: True unless ``G_legal_filename`` returns a negative value
    """
    status = libgis.G_legal_filename(name)
    return status >= 0
def coor2pixel(coord, region):
    """Convert a coordinate pair into the row and column of a pixel.

    >>> from grass.pygrass.gis.region import Region
    >>> reg = Region()
    >>> coor2pixel((reg.west, reg.north), reg)
    (0.0, 0.0)
    >>> coor2pixel((reg.east, reg.south), reg) == (reg.rows, reg.cols)
    True

    :param coord: ``(east, north)`` pair
    :param region: region object providing ``byref()``
    :return: ``(row, col)`` tuple
    """
    east, north = coord
    row = libraster.Rast_northing_to_row(north, region.byref())
    col = libraster.Rast_easting_to_col(east, region.byref())
    return (row, col)
def pixel2coor(pixel, region):
    """Convert the column and row of a pixel into coordinates.

    >>> from grass.pygrass.gis.region import Region
    >>> reg = Region()
    >>> pixel2coor((0, 0), reg) == (reg.north, reg.west)
    True
    >>> pixel2coor((reg.cols, reg.rows), reg) == (reg.south, reg.east)
    True

    :param pixel: ``(col, row)`` pair
    :param region: region object providing ``byref()``
    :return: ``(north, east)`` tuple
    """
    col, row = pixel
    north = libraster.Rast_row_to_northing(row, region.byref())
    east = libraster.Rast_col_to_easting(col, region.byref())
    return (north, east)
def get_raster_for_points(poi_vector, raster, column=None, region=None):
    """Query a raster map for each point feature of a vector

    Example

    >>> from grass.pygrass.raster import RasterRow
    >>> from grass.pygrass.gis.region import Region
    >>> from grass.pygrass.vector import VectorTopo
    >>> from grass.pygrass.vector.geometry import Point

    Create a vector map

    >>> cols = [("cat", "INTEGER PRIMARY KEY"), ("value", "double precision")]
    >>> vect = VectorTopo("test_vect_2")
    >>> vect.open("w", tab_name="test_vect_2", tab_cols=cols)
    >>> vect.write(
    ...     Point(10, 6),
    ...     cat=1,
    ...     attrs=[
    ...         10,
    ...     ],
    ... )
    >>> vect.write(
    ...     Point(12, 6),
    ...     cat=2,
    ...     attrs=[
    ...         12,
    ...     ],
    ... )
    >>> vect.write(
    ...     Point(14, 6),
    ...     cat=3,
    ...     attrs=[
    ...         14,
    ...     ],
    ... )
    >>> vect.table.conn.commit()
    >>> vect.close()

    Setup the raster sampling

    >>> region = Region()
    >>> region.from_rast(test_raster_name)
    >>> region.set_raster_region()
    >>> ele = RasterRow(test_raster_name)

    Sample the raster layer at the given points, return a list of values

    >>> l = get_raster_for_points(vect, ele, region=region)
    >>> l[0]  # doctest: +ELLIPSIS
    (1, 10.0, 6.0, 1)
    >>> l[1]  # doctest: +ELLIPSIS
    (2, 12.0, 6.0, 1)

    Add a new column and sample again

    >>> vect.open("r")
    >>> vect.table.columns.add(test_raster_name, "double precision")
    >>> vect.table.conn.commit()
    >>> test_raster_name in vect.table.columns
    True
    >>> get_raster_for_points(vect, ele, column=test_raster_name, region=region)
    True
    >>> vect.table.filters.select("value", test_raster_name)
    Filters('SELECT value, Utils_test_raster FROM test_vect_2;')
    >>> cur = vect.table.execute()
    >>> r = cur.fetchall()
    >>> r[0]  # doctest: +ELLIPSIS
    (10.0, 1.0)
    >>> r[1]  # doctest: +ELLIPSIS
    (12.0, 1.0)
    >>> remove("test_vect_2", "vect")

    :param poi_vector: A VectorTopo object that contains points; it is opened
                       in read mode if it is not open yet
    :param raster: raster object; it is opened in read mode if it is not
                   open yet
    :param str column: column name to update in the attribute table,
                       if set to None a list of sampled values will be
                       returned
    :param region: The region to work with, if not set the current
                   computational region will be used
    :return: True in case of success and a specified column for update,
             if column name for update was not set a list of
             (id, x, y, value) is returned; value is None where the raster
             yields None or NaN
    :raises GrassError: if the vector map contains no point primitives
    """
    from math import isnan

    samples = []
    if region is None:
        from grass.pygrass.gis.region import Region

        region = Region()
    if not poi_vector.is_open():
        poi_vector.open("r")
    if not raster.is_open():
        raster.open("r")
    if poi_vector.num_primitive_of("point") == 0:
        raise GrassError(_("Vector doesn't contain points"))

    for point in poi_vector.viter("points"):
        value = raster.get_value(point, region)
        has_value = value is not None and not isnan(value)
        if not column:
            # List mode: points without a usable value are reported as None.
            samples.append((point.id, point.x, point.y, value if has_value else None))
        elif has_value:
            # Update mode: points without a usable value are left untouched.
            point.attrs[column] = value

    if column:
        # The commit is issued through the attributes of the last point
        # visited by the loop.
        point.attrs.commit()
        return True
    return samples
def r_export(rast, output="", fmt="png", **kargs):
    """Export a raster map with the ``r.out.<fmt>`` module.

    Existing output is overwritten.

    :param rast: raster object providing ``exist()``, ``fullname()``,
                 ``name`` and ``mapset``
    :param str output: output file name; when empty it defaults to
                       ``<name>_<mapset>.<fmt>``
    :param str fmt: suffix of the ``r.out.*`` module to run, also used as
                    the extension of the default output name
    :param kargs: further options forwarded to the module
    :return: the output file name that was used
    :raises ValueError: if the raster map does not exist
    """
    from grass.pygrass.modules import Module

    if not rast.exist():
        msg = "Raster map does not exist."
        raise ValueError(msg)

    target = output or "%s_%s.%s" % (rast.name, rast.mapset, fmt)
    Module(
        "r.out.%s" % fmt,
        input=rast.fullname(),
        output=target,
        overwrite=True,
        **kargs,
    )
    return target
def get_lib_path(modname, libname=None):
    """Return the path of the libname contained in the module.

    Thin wrapper that delegates to :func:`grass.script.utils.get_lib_path`.

    .. deprecated:: 7.1
        Use :func:`grass.script.utils.get_lib_path` instead.
    """
    from grass.script import utils as script_utils

    return script_utils.get_lib_path(modname=modname, libname=libname)
def set_path(modulename, dirname=None, path="."):
    """Set sys.path looking in the local directory and GRASS directories.

    Thin wrapper that delegates to :func:`grass.script.utils.set_path`.

    :param modulename: string with the name of the GRASS module
    :param dirname: string with the directory name containing the python
                    libraries, default None
    :param path: string with the path to reach the dirname locally.

    .. deprecated:: 7.1
        Use :func:`grass.script.utils.set_path` instead.
    """
    from grass.script import utils as script_utils

    return script_utils.set_path(modulename=modulename, dirname=dirname, path=path)
def split_in_chunk(iterable, length=10):
    """Yield the items of an iterable as tuples of at most ``length`` items.

    The last tuple holds the remaining items and may be shorter.

    >>> for chunk in split_in_chunk(range(25)):
    ...     print(chunk)
    ...
    (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    (10, 11, 12, 13, 14, 15, 16, 17, 18, 19)
    (20, 21, 22, 23, 24)
    >>> for chunk in split_in_chunk(range(25), 3):
    ...     print(chunk)
    ...
    (0, 1, 2)
    (3, 4, 5)
    (6, 7, 8)
    (9, 10, 11)
    (12, 13, 14)
    (15, 16, 17)
    (18, 19, 20)
    (21, 22, 23)
    (24,)

    :param iterable: any iterable
    :param int length: maximum number of items per chunk
    """
    source = iter(iterable)

    def next_chunk():
        return tuple(itertools.islice(source, length))

    # iter(callable, sentinel) keeps calling next_chunk() until it returns an
    # empty tuple, i.e. until the source is exhausted.
    yield from iter(next_chunk, ())
def table_exist(cursor, table_name):
    """Return True if the table exists, False otherwise.

    The SQLite catalogue (``sqlite_master``) is queried first. If that
    raises ``sqlite3.OperationalError``, the PostgreSQL-style
    ``information_schema.tables`` query is tried instead.

    The table name is always passed as a bound parameter, never pasted into
    the SQL text, so names containing quotes cannot alter the statement.

    :param cursor: DB-API cursor
    :param str table_name: name of the table to look for
    :return: True if the executed query reports the table, False otherwise
             (also when both queries raise ``OperationalError``)
    """
    try:
        # sqlite ("qmark" parameter style)
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?;",
            (table_name,),
        )
    except OperationalError:
        try:
            # pg ("format" parameter style; the driver quotes the value)
            # NOTE(review): only sqlite3.OperationalError is caught here, as
            # before; confirm which exception the PostgreSQL driver raises.
            cursor.execute(
                "SELECT EXISTS(SELECT * FROM "
                "information_schema.tables "
                "WHERE table_name=%s)",
                (table_name,),
            )
        except OperationalError:
            return False
    one = cursor.fetchone() if cursor else None
    return bool(one and one[0])
def create_test_vector_map(map_name="test_vector"):
    """Create a vector map layer for testing purposes.

    The layer holds points, lines, boundaries, centroids, areas, isles and
    attributes. It should be used in doc and unit tests to create a
    location/mapset independent vector map layer. The map includes
    3 points, 3 lines, 11 boundaries and 4 centroids. The attribute table
    contains cat, name and value columns.

    :param map_name: The vector map name that should be used

    ::

                                   P1 P2 P3
               6                    *  *  *
               5
               4    _______ ___ ___   L1 L2 L3
            Y  3   |A1___ *|  *|  *|   |  |  |
               2   | |A2*| |   |   |   |  |  |
               1   | |___| |A3 |A4 |   |  |  |
               0   |_______|___|___|   |  |  |
              -1
                -1 0 1 2 3 4 5 6 7 8 9 10 12 14
                               X
    """
    from grass.pygrass.vector import VectorTopo
    from grass.pygrass.vector.geometry import Boundary, Centroid, Line, Point

    cols = [
        ("cat", "INTEGER PRIMARY KEY"),
        ("name", "varchar(50)"),
        ("value", "double precision"),
    ]
    with VectorTopo(map_name, mode="w", tab_name=map_name, tab_cols=cols) as vect:
        # Write 3 points; they share cat 1, attributes are given only once
        vect.write(Point(10, 6), cat=1, attrs=("point", 1))
        vect.write(Point(12, 6), cat=1)
        vect.write(Point(14, 6), cat=1)
        # Write 3 lines; they share cat 2, attributes are given only once
        vect.write(Line([(10, 4), (10, 2), (10, 0)]), cat=2, attrs=("line", 2))
        vect.write(Line([(12, 4), (12, 2), (12, 0)]), cat=2)
        vect.write(Line([(14, 4), (14, 2), (14, 0)]), cat=2)
        # boundaries 1 - 4
        vect.write(Boundary(points=[(0, 0), (0, 4)]))
        vect.write(Boundary(points=[(0, 4), (4, 4)]))
        vect.write(Boundary(points=[(4, 4), (4, 0)]))
        vect.write(Boundary(points=[(4, 0), (0, 0)]))
        # 5. boundary (Isle)
        vect.write(Boundary(points=[(1, 1), (1, 3), (3, 3), (3, 1), (1, 1)]))
        # boundaries 6 - 8
        vect.write(Boundary(points=[(4, 4), (6, 4)]))
        vect.write(Boundary(points=[(6, 4), (6, 0)]))
        vect.write(Boundary(points=[(6, 0), (4, 0)]))
        # boundaries 9 - 11
        vect.write(Boundary(points=[(6, 4), (8, 4)]))
        vect.write(Boundary(points=[(8, 4), (8, 0)]))
        vect.write(Boundary(points=[(8, 0), (6, 0)]))
        # Centroids, all have the same cat and attribute
        vect.write(Centroid(x=3.5, y=3.5), cat=3, attrs=("centroid", 3))
        vect.write(Centroid(x=2.5, y=2.5), cat=3)
        vect.write(Centroid(x=5.5, y=3.5), cat=3)
        vect.write(Centroid(x=7.5, y=3.5), cat=3)

        # Map metadata; set once (it used to be assigned a second time, with
        # the very same values, right after the commit).
        vect.organization = "Thuenen Institut"
        vect.person = "Soeren Gebbert"
        vect.title = "Test dataset"
        vect.comment = "This is a comment"

        vect.table.conn.commit()

        # NOTE(review): presumably redundant with the exit of the ``with``
        # block; kept as in the original - confirm before removing.
        vect.close()
def create_test_stream_network_map(map_name="streams"):
    R"""Create test data

    This function creates a vector map layer with lines that represent
    a stream network with two different graphs. The first graph
    contains a loop, the second can be used as directed graph.

    This should be used in doc and unit tests to create location/mapset
    independent vector map layer.

    :param map_name: The vector map name that should be used

    ::

           1(0,2)  3(2,2)
            \     /
           1 \   / 2
              \ /
               2(1,1)
        6(0,1) ||  5(2,1)
           5 \ || / 4
              \||/
               4(1,0)
               |
               | 6
               |7(1,-1)

           7(0,-1) 8(2,-1)
            \     /
           8 \   / 9
              \ /
               9(1, -2)
               |
               | 10
               |
              10(1,-3)
    """
    from grass.pygrass.vector import VectorTopo
    from grass.pygrass.vector.geometry import Line

    cols = [("cat", "INTEGER PRIMARY KEY"), ("id", "INTEGER")]

    # Vertices of the stream segments in writing order; the n-th segment
    # (counting from 1) is written with cat=n and id=n.
    segments = [
        # First flow graph
        [(0, 2), (0.22, 1.75), (0.55, 1.5), (1, 1)],
        [(2, 2), (1, 1)],
        [(1, 1), (0.85, 0.5), (1, 0)],
        [(2, 1), (1, 0)],
        [(0, 1), (1, 0)],
        [(1, 0), (1, -1)],
        # Reverse line 3
        [(1, 0), (1.15, 0.5), (1, 1)],
        # second flow graph
        [(0, -1), (1, -2)],
        [(2, -1), (1, -2)],
        [(1, -2), (1, -3)],
    ]

    with VectorTopo(map_name, mode="w", tab_name=map_name, tab_cols=cols) as streams:
        for number, vertices in enumerate(segments, start=1):
            streams.write(Line(vertices), cat=number, attrs=(number,))

        streams.organization = "Thuenen Institut"
        streams.person = "Soeren Gebbert"
        streams.title = "Test dataset for stream networks"
        streams.comment = "This is a comment"

        streams.table.conn.commit()
        streams.close()
if __name__ == "__main__":
    import doctest

    from grass.script.core import run_command

    # Create the maps the doctests rely on.
    create_test_vector_map(test_vector_name)
    run_command("g.region", n=50, s=0, e=60, w=0, res=1)
    run_command("r.mapcalc", expression="%s = 1" % (test_raster_name), overwrite=True)

    doctest.testmod()

    # Remove the generated vector and raster maps, if they exist.
    generated_maps = (
        ("vector", test_vector_name, get_mapset_vector),
        ("raster", test_raster_name, get_mapset_raster),
    )
    for map_type, map_name, find_mapset in generated_maps:
        if find_mapset(map_name, mapset=""):
            run_command("g.remove", flags="f", type=map_type, name=map_name)