import contextlib
import os
import sys
import multiprocessing as mltp
import subprocess as sub
import shutil as sht
from math import ceil
from grass.script.setup import write_gisrc
from grass.script import append_node_pid, legalize_vector_name
from grass.pygrass.gis import Mapset, Location
from grass.pygrass.gis.region import Region
from grass.pygrass.modules import Module
from grass.pygrass.utils import get_mapset_raster, findmaps
from grass.pygrass.modules.grid.split import (
split_region_tiles,
split_region_in_overlapping_tiles,
)
from grass.pygrass.modules.grid.patch import rpatch_map, rpatch_map_r_patch_backend
[docs]def select(parms, ptype):
"""Select only a certain type of parameters.
:param parms: a DictType parameter with inputs or outputs of a Module class
:type parms: DictType parameters
:param ptype: String define the type of parameter that we want to select,
valid ptype are: 'raster', 'vector', 'group'
:type ptype: str
:returns: An iterator with the value of the parameter.
>>> slp = Module('r.slope.aspect',
... elevation='ele', slope='slp', aspect='asp',
... run_=False)
>>> for rast in select(slp.outputs, 'raster'):
... print(rast)
...
slp
asp
"""
for k in parms:
par = parms[k]
if par.type == ptype or (par.typedesc == ptype and par.value):
if par.multiple:
yield from par.value
else:
yield par.value
[docs]def copy_special_mapset_files(path_src, path_dst):
"""Copy all the special GRASS files that are contained in
a mapset to another mapset
:param path_src: the path to the original mapset
:type path_src: str
:param path_dst: the path to the new mapset
:type path_dst: str
"""
for fil in (fi for fi in os.listdir(path_src) if fi.isupper()):
sht.copy(os.path.join(path_src, fil), path_dst)
[docs]def copy_mapset(mapset, path):
"""Copy mapset to another place without copying raster and vector data.
:param mapset: a Mapset instance to copy
:type mapset: Mapset object
:param path: path where the new mapset must be copied
:type path: str
:returns: the instance of the new Mapset.
>>> from grass.script.core import gisenv
>>> mname = gisenv()['MAPSET']
>>> mset = Mapset()
>>> mset.name == mname
True
>>> import tempfile as tmp
>>> import os
>>> path = os.path.join(tmp.gettempdir(), 'my_loc', 'my_mset')
>>> copy_mapset(mset, path) # doctest: +ELLIPSIS
Mapset(...)
>>> sorted(os.listdir(path)) # doctest: +ELLIPSIS
[...'PERMANENT'...]
>>> sorted(os.listdir(os.path.join(path, 'PERMANENT')))
['DEFAULT_WIND', 'PROJ_INFO', 'PROJ_UNITS', 'VAR', 'WIND']
>>> sorted(os.listdir(os.path.join(path, mname))) # doctest: +ELLIPSIS
[...'WIND'...]
>>> import shutil
>>> shutil.rmtree(path)
"""
per_old = os.path.join(mapset.gisdbase, mapset.location, "PERMANENT")
per_new = os.path.join(path, "PERMANENT")
map_old = mapset.path()
map_new = os.path.join(path, mapset.name)
if not os.path.isdir(per_new):
os.makedirs(per_new)
if not os.path.isdir(map_new):
os.mkdir(map_new)
copy_special_mapset_files(per_old, per_new)
copy_special_mapset_files(map_old, map_new)
gisdbase, location = os.path.split(path)
return Mapset(mapset.name, location, gisdbase)
[docs]def read_gisrc(gisrc):
"""Read a GISRC file and return a tuple with the mapset, location
and gisdbase.
:param gisrc: the path to GISRC file
:type gisrc: str
:returns: a tuple with the mapset, location and gisdbase
>>> import os
>>> from grass.script.core import gisenv
>>> genv = gisenv()
>>> (read_gisrc(os.environ['GISRC']) == (genv['MAPSET'],
... genv['LOCATION_NAME'],
... genv['GISDBASE']))
True
"""
with open(gisrc) as gfile:
gis = {k.strip(): v.strip() for k, v in [row.split(":", 1) for row in gfile]}
return gis["MAPSET"], gis["LOCATION_NAME"], gis["GISDBASE"]
[docs]def get_mapset(gisrc_src, gisrc_dst):
"""Get mapset from a GISRC source to a GISRC destination.
:param gisrc_src: path to the GISRC source
:type gisrc_src: str
:param gisrc_dst: path to the GISRC destination
:type gisrc_dst: str
:returns: a tuple with Mapset(src), Mapset(dst)
"""
msrc, lsrc, gsrc = read_gisrc(gisrc_src)
mdst, ldst, gdst = read_gisrc(gisrc_dst)
path_src = os.path.join(gsrc, lsrc, msrc)
path_dst = os.path.join(gdst, ldst, mdst)
if not os.path.isdir(path_dst):
os.makedirs(path_dst)
copy_special_mapset_files(path_src, path_dst)
src = Mapset(msrc, lsrc, gsrc)
dst = Mapset(mdst, ldst, gdst)
visible = list(src.visible)
if src.name not in visible:
visible.append(src.name)
dst.visible.extend(visible)
return src, dst
[docs]def copy_groups(groups, gisrc_src, gisrc_dst, region=None):
"""Copy group from one mapset to another, crop the raster to the region
:param groups: a list of strings with the group that must be copied
from a master to another.
:type groups: list of strings
:param gisrc_src: path of the GISRC file from where we want to copy the groups
:type gisrc_src: str
:param gisrc_dst: path of the GISRC file where the groups will be created
:type gisrc_dst: str
:param region: a region like object or a dictionary with the region
parameters that will be used to crop the rasters of the groups
:type region: Region object or dictionary
:returns: None
"""
def rmloc(r):
return r.split("@")[0] if "@" in r else r
env = os.environ.copy()
# instantiate modules
get_grp = Module("i.group", flags="lg", stdout_=sub.PIPE, run_=False)
set_grp = Module("i.group")
get_grp.run_ = True
src = read_gisrc(gisrc_src)
dst = read_gisrc(gisrc_dst)
rm = src[2] != dst[2]
all_rasts = [r[0] for r in findmaps("raster", location=dst[1], gisdbase=dst[2])]
for grp in groups:
# change gisdbase to src
env["GISRC"] = gisrc_src
get_grp(group=grp, env_=env)
rasts = list(get_grp.outputs.stdout.split())
# change gisdbase to dst
env["GISRC"] = gisrc_dst
rast2cp = [r for r in rasts if rmloc(r) not in all_rasts]
if rast2cp:
copy_rasters(rast2cp, gisrc_src, gisrc_dst, region=region)
set_grp(
group=grp,
input=[rmloc(r) for r in rasts] if rast2cp or rm else rasts,
env_=env,
)
[docs]def set_region(region, gisrc_src, gisrc_dst, env):
"""Set a region into two different mapsets.
:param region: a region like object or a dictionary with the region
parameters that will be used to crop the rasters of the groups
:type region: Region object or dictionary
:param gisrc_src: path of the GISRC file from where we want to copy the groups
:type gisrc_src: str
:param gisrc_dst: path of the GISRC file where the groups will be created
:type gisrc_dst: str
:param env:
:type env:
:returns: None
"""
reg_str = (
"g.region n=%(north)r s=%(south)r "
"e=%(east)r w=%(west)r "
"nsres=%(nsres)r ewres=%(ewres)r"
)
reg_cmd = reg_str % dict(region.items())
env["GISRC"] = gisrc_src
sub.Popen(reg_cmd, shell=True, env=env)
env["GISRC"] = gisrc_dst
sub.Popen(reg_cmd, shell=True, env=env)
[docs]def copy_rasters(rasters, gisrc_src, gisrc_dst, region=None):
"""Copy rasters from one mapset to another, crop the raster to the region.
:param rasters: a list of strings with the raster map that must be copied
from a master to another.
:type rasters: list
:param gisrc_src: path of the GISRC file from where we want to copy the groups
:type gisrc_src: str
:param gisrc_dst: path of the GISRC file where the groups will be created
:type gisrc_dst: str
:param region: a region like object or a dictionary with the region
parameters that will be used to crop the rasters of the groups
:type region: Region object or dictionary
:returns: None
"""
env = os.environ.copy()
if region:
set_region(region, gisrc_src, gisrc_dst, env)
path_dst = os.path.join(*read_gisrc(gisrc_dst)[::-1])
nam = "copy%d__%s" % (id(gisrc_dst), "%s")
# instantiate modules
mpclc = Module("r.mapcalc")
rpck = Module("r.pack")
rupck = Module("r.unpack")
remove = Module("g.remove")
for rast in rasters:
rast_clean = rast.split("@")[0] if "@" in rast else rast
# change gisdbase to src
env["GISRC"] = gisrc_src
name = nam % rast_clean
mpclc(expression="%s=%s" % (name, rast), overwrite=True, env_=env)
file_dst = "%s.pack" % os.path.join(path_dst, name)
rpck(input=name, output=file_dst, overwrite=True, env_=env)
remove(flags="f", type="raster", name=name, env_=env)
# change gisdbase to dst
env["GISRC"] = gisrc_dst
rupck(input=file_dst, output=rast_clean, overwrite=True, env_=env)
os.remove(file_dst)
[docs]def copy_vectors(vectors, gisrc_src, gisrc_dst):
"""Copy vectors from one mapset to another, crop the raster to the region.
:param vectors: a list of strings with the vector map that must be copied
from a master to another.
:type vectors: list
:param gisrc_src: path of the GISRC file from where we want to copy the groups
:type gisrc_src: str
:param gisrc_dst: path of the GISRC file where the groups will be created
:type gisrc_dst: str
:returns: None
"""
env = os.environ.copy()
path_dst = os.path.join(*read_gisrc(gisrc_dst))
nam = "copy%d__%s" % (id(gisrc_dst), "%s")
# instantiate modules
vpck = Module("v.pack")
vupck = Module("v.unpack")
remove = Module("g.remove")
for vect in vectors:
# change gisdbase to src
env["GISRC"] = gisrc_src
name = nam % vect
file_dst = "%s.pack" % os.path.join(path_dst, name)
vpck(input=name, output=file_dst, overwrite=True, env_=env)
remove(flags="f", type="vector", name=name, env_=env)
# change gisdbase to dst
env["GISRC"] = gisrc_dst
vupck(input=file_dst, output=vect, overwrite=True, env_=env)
os.remove(file_dst)
[docs]def get_cmd(cmdd):
"""Transform a cmd dictionary to a list of parameters. It is useful to
pickle a Module class and cnvert into a string that can be used with
`Popen(get_cmd(cmdd), shell=True)`.
:param cmdd: a module dictionary with all the parameters
:type cmdd: dict
>>> slp = Module('r.slope.aspect',
... elevation='ele', slope='slp', aspect='asp',
... overwrite=True, run_=False)
>>> get_cmd(slp.get_dict()) # doctest: +ELLIPSIS
['r.slope.aspect', 'elevation=ele', 'format=degrees', ..., '--o']
"""
return [
cmdd["name"],
*("%s=%s" % (k, v) for k, v in cmdd["inputs"] if not isinstance(v, list)),
*(
"%s=%s"
% (
k,
",".join(vals if isinstance(vals[0], str) else [repr(v) for v in vals]),
)
for k, vals in cmdd["inputs"]
if isinstance(vals, list)
),
*("%s=%s" % (k, v) for k, v in cmdd["outputs"] if not isinstance(v, list)),
*(
"%s=%s" % (k, ",".join([repr(v) for v in vals]))
for k, vals in cmdd["outputs"]
if isinstance(vals, list)
),
*(f"-{flg}" for flg in cmdd["flags"] if len(flg) == 1),
*(f"--{flg[0]}" for flg in cmdd["flags"] if len(flg) > 1),
]
[docs]def cmd_exe(args):
"""Create a mapset, and execute a cmd inside.
:param args: is a tuple that contains several information see below
:type args: tuple
:returns: None
The puple has to contain:
- bbox (dict): a dict with the region parameters (n, s, e, w, etc.)
that we want to set before to apply the command.
- mapnames (dict): a dictionary to substitute the input if the domain has
been split in several tiles.
- gisrc_src (str): path of the GISRC file from where we want to copy the
groups.
- gisrc_dst (str): path of the GISRC file where the groups will be created.
- cmd (dict): a dictionary with all the parameter of a GRASS module.
- groups (list): a list of strings with the groups that we want to copy in
the mapset.
"""
bbox, mapnames, gisrc_src, gisrc_dst, cmd, groups = args
src, dst = get_mapset(gisrc_src, gisrc_dst)
env = os.environ.copy()
env["GISRC"] = gisrc_dst
shell = sys.platform == "win32"
if mapnames:
inputs = dict(cmd["inputs"])
# reset the inputs to
for key in mapnames:
inputs[key] = mapnames[key]
cmd["inputs"] = inputs.items()
# set the region to the tile
sub.Popen(["g.region", "raster=%s" % key], shell=shell, env=env).wait()
else:
# set the computational region
lcmd = ["g.region", *["%s=%s" % (k, v) for k, v in bbox.items()]]
sub.Popen(lcmd, shell=shell, env=env).wait()
if groups:
copy_groups(groups, gisrc_src, gisrc_dst)
# run the grass command
sub.Popen(get_cmd(cmd), shell=shell, env=env).wait()
# remove temp GISRC
os.remove(gisrc_dst)
[docs]class GridModule:
# TODO maybe also i.* could be supported easily
"""Run GRASS raster commands in a multiprocessing mode.
:param cmd: raster GRASS command, only command staring with r.* are valid.
:type cmd: str
:param width: width of the tile, in pixel
:type width: int
:param height: height of the tile, in pixel.
:type height: int
:param overlap: overlap between tiles, in pixel.
:type overlap: int
:param processes: number of threads, default value is equal to the number
of processors available.
:param split: if True use r.tile to split all the inputs.
:type split: bool
:param mapset_prefix: if specified created mapsets start with this prefix
:type mapset_prefix: str
:param patch_backend: "r.patch", "RasterRow", or None for for default
:type patch_backend: None or str
:param run_: if False only instantiate the object
:type run_: bool
:param args: give all the parameters to the command
:param kargs: give all the parameters to the command
When patch_backend is None, the RasterRow method is used for patching the result.
When patch_backend is "r.patch", r.patch is used with nprocs=processes.
r.patch can only be used when overlap is 0.
>>> grd = GridModule('r.slope.aspect',
... width=500, height=500, overlap=2,
... processes=None, split=False,
... elevation='elevation',
... slope='slope', aspect='aspect', overwrite=True)
>>> grd.run()
Temporary mapsets created start with a generated prefix which is unique for each
process (includes PID and node name). If more instances of this class are used in
parallel from one process with the same module, a custom *mapset_prefix* needs to
be provided.
"""
def __init__(
self,
cmd,
width=None,
height=None,
overlap=0,
processes=None,
split=False,
debug=False,
region=None,
move=None,
log=False,
start_row=0,
start_col=0,
out_prefix="",
mapset_prefix=None,
patch_backend=None,
*args,
**kargs,
):
kargs["run_"] = False
self.mset = Mapset()
self.module = Module(cmd, *args, **kargs)
self.width = width
self.height = height
self.overlap = overlap
self.processes = processes
self.region = region or Region()
self.start_row = start_row
self.start_col = start_col
self.out_prefix = out_prefix
self.log = log
self.move = move
# by default RasterRow is used as previously
# if overlap > 0, r.patch won't work properly
if not patch_backend:
self.patch_backend = "RasterRow"
elif patch_backend not in {"r.patch", "RasterRow"}:
raise RuntimeError(
_("Parameter patch_backend must be 'r.patch' or 'RasterRow'")
)
elif patch_backend == "r.patch" and self.overlap:
raise RuntimeError(
_("Patching backend 'r.patch' doesn't work for overlap > 0")
)
else:
self.patch_backend = patch_backend
self.gisrc_src = os.environ["GISRC"]
self.n_mset, self.gisrc_dst = None, None
self.estimate_tile_size()
if self.move:
self.n_mset = copy_mapset(self.mset, self.move)
self.gisrc_dst = write_gisrc(
self.n_mset.gisdbase, self.n_mset.location, self.n_mset.name
)
rasters = list(select(self.module.inputs, "raster"))
if rasters:
copy_rasters(
rasters, self.gisrc_src, self.gisrc_dst, region=self.region
)
vectors = list(select(self.module.inputs, "vector"))
if vectors:
copy_vectors(vectors, self.gisrc_src, self.gisrc_dst)
groups = list(select(self.module.inputs, "group"))
if groups:
copy_groups(groups, self.gisrc_src, self.gisrc_dst, region=self.region)
self.bboxes = split_region_in_overlapping_tiles(
region=region, width=self.width, height=self.height, overlap=overlap
)
if mapset_prefix:
self.mapset_prefix = mapset_prefix
else:
self.mapset_prefix = append_node_pid("grid_" + legalize_vector_name(cmd))
self.msetstr = self.mapset_prefix + "_%03d_%03d"
self.inlist = None
if split:
self.split()
self.debug = debug
def __del__(self):
if self.gisrc_dst:
# remove GISRC file
os.remove(self.gisrc_dst)
[docs] def clean_location(self, location=None):
"""Remove all created mapsets.
:param location: a Location instance where we are running the analysis
:type location: Location object
"""
if location is None:
if self.n_mset:
self.n_mset.current()
location = Location()
mapsets = location.mapsets(self.mapset_prefix + "_*")
for mset in mapsets:
Mapset(mset).delete()
if self.n_mset and self.n_mset.is_current():
self.mset.current()
[docs] def split(self):
"""Split all the raster inputs using r.tile"""
rtile = Module("r.tile")
inlist = {}
for inm in select(self.module.inputs, "raster"):
rtile(
input=inm.value,
output=inm.value,
width=self.width,
height=self.height,
overlap=self.overlap,
)
patt = "%s-*" % inm.value
inlist[inm.value] = sorted(self.mset.glist(type="raster", pattern=patt))
self.inlist = inlist
[docs] def estimate_tile_size(self):
"""Estimates tile width and height based on number of processes.
Keeps width and height if provided by user. If one dimension
is provided the other is computed as the minimum number of tiles
to keep all requested processes working (initially).
If no dimensions are provided, tiling is 1 column x N rows
which speeds up patching.
"""
region = Region()
if self.width and self.height:
return
if self.width:
n_tiles_x = ceil(region.cols / self.width)
n_tiles_y = ceil(self.processes / n_tiles_x)
self.height = ceil(region.rows / n_tiles_y)
elif self.height:
n_tiles_y = ceil(region.rows / self.height)
n_tiles_x = ceil(self.processes / n_tiles_y)
self.width = ceil(region.cols / n_tiles_x)
else:
self.width = region.cols
self.height = ceil(region.rows / self.processes)
[docs] def get_works(self):
"""Return a list of tuples with the parameters for cmd_exe function"""
works = []
reg = Region()
if self.move:
mdst, ldst, gdst = read_gisrc(self.gisrc_dst)
else:
ldst, gdst = self.mset.location, self.mset.gisdbase
cmd = self.module.get_dict()
groups = list(select(self.module.inputs, "group"))
for row, box_row in enumerate(self.bboxes):
for col, box in enumerate(box_row):
inms = None
if self.inlist:
inms = {}
cols = len(box_row)
indx = row * cols + col
for key in self.inlist:
inms[key] = "%s@%s" % (self.inlist[key][indx], self.mset.name)
# set the computational region, prepare the region parameters
bbox = {
**{k[0]: str(v) for k, v in box.items()[:-2]},
"nsres": "%f" % reg.nsres,
"ewres": "%f" % reg.ewres,
}
new_mset = (
self.msetstr % (self.start_row + row, self.start_col + col),
)
works.append(
(
bbox,
inms,
self.gisrc_src,
write_gisrc(gdst, ldst, new_mset),
cmd,
groups,
)
)
return works
[docs] def run(self, patch=True, clean=True):
"""Run the GRASS command
:param patch: set False if you does not want to patch the results
:type patch: bool
:param clean: set False if you does not want to remove all the stuff
created by GridModule
:type clean: bool
"""
with contextlib.ExitStack() as stack:
if clean:
stack.callback(self._clean)
self._actual_run(patch=patch)
def _actual_run(self, patch):
"""Run the GRASS command
:param patch: set False if you does not want to patch the results
"""
self.module.flags.overwrite = True
self.define_mapset_inputs()
if self.debug:
for wrk in self.get_works():
cmd_exe(wrk)
else:
pool = mltp.Pool(processes=self.processes)
result = pool.map_async(cmd_exe, self.get_works())
result.wait()
pool.close()
pool.join()
if not result.successful():
raise RuntimeError(_("Execution of subprocesses was not successful"))
if patch:
if self.move:
os.environ["GISRC"] = self.gisrc_dst
self.n_mset.current()
self.patch()
os.environ["GISRC"] = self.gisrc_src
self.mset.current()
# copy the outputs from dst => src
routputs = [
self.out_prefix + o for o in select(self.module.outputs, "raster")
]
copy_rasters(routputs, self.gisrc_dst, self.gisrc_src)
else:
self.patch()
if self.log:
# record in the temp directory
from grass.lib.gis import G_tempfile
tmp, dummy = os.path.split(G_tempfile())
tmpdir = os.path.join(tmp, self.module.name)
for k in self.module.outputs:
par = self.module.outputs[k]
if par.typedesc == "raster" and par.value:
dirpath = os.path.join(tmpdir, par.name)
if not os.path.isdir(dirpath):
os.makedirs(dirpath)
fil = open(os.path.join(dirpath, self.out_prefix + par.value), "w+")
fil.close()
def _clean(self):
"""Cleanup temporary data"""
self.clean_location()
self.rm_tiles()
if self.n_mset:
gisdbase, location = os.path.split(self.move)
self.clean_location(Location(location, gisdbase))
# rm temporary gis_rc
os.remove(self.gisrc_dst)
self.gisrc_dst = None
sht.rmtree(os.path.join(self.move, "PERMANENT"))
sht.rmtree(os.path.join(self.move, self.mset.name))
[docs] def patch(self):
"""Patch the final results."""
bboxes = split_region_tiles(width=self.width, height=self.height)
noutputs = 0
for otmap in self.module.outputs:
otm = self.module.outputs[otmap]
if otm.typedesc == "raster" and otm.value:
if self.patch_backend == "RasterRow":
rpatch_map(
raster=otm.value,
mapset=self.mset.name,
mset_str=self.msetstr,
bbox_list=bboxes,
overwrite=self.module.flags.overwrite,
start_row=self.start_row,
start_col=self.start_col,
prefix=self.out_prefix,
)
else:
rpatch_map_r_patch_backend(
raster=otm.value,
mset_str=self.msetstr,
bbox_list=bboxes,
overwrite=self.module.flags.overwrite,
start_row=self.start_row,
start_col=self.start_col,
prefix=self.out_prefix,
processes=self.processes,
)
noutputs += 1
if noutputs < 1:
msg = "No raster output option defined for <{}>".format(self.module.name)
if self.module.name == "r.mapcalc":
msg += ". Use <{}.simple> instead".format(self.module.name)
raise RuntimeError(msg)
[docs] def rm_tiles(self):
"""Remove all the tiles."""
# if split, remove tiles
if self.inlist:
grm = Module("g.remove")
for key in self.inlist:
grm(flags="f", type="raster", name=self.inlist[key])