# -*- coding: utf-8 -*-
from __future__ import (nested_scopes, generators, division, absolute_import,
with_statement, print_function, unicode_literals)
import contextlib
import os
import sys
import multiprocessing as mltp
import subprocess as sub
import shutil as sht
from grass.script.setup import write_gisrc
from grass.pygrass.gis import Mapset, Location
from grass.pygrass.gis.region import Region
from grass.pygrass.modules import Module
from grass.pygrass.utils import get_mapset_raster, findmaps
from grass.pygrass.modules.grid.split import split_region_tiles
from grass.pygrass.modules.grid.patch import rpatch_map
[docs]def select(parms, ptype):
"""Select only a certain type of parameters.
:param parms: a DictType parameter with inputs or outputs of a Module class
:type parms: DictType parameters
:param ptype: String define the type of parameter that we want to select,
valid ptype are: 'raster', 'vector', 'group'
:type ptype: str
:returns: An iterator with the value of the parameter.
>>> slp = Module('r.slope.aspect',
... elevation='ele', slope='slp', aspect='asp',
... run_=False)
>>> for rast in select(slp.outputs, 'raster'):
... print(rast)
...
slp
asp
"""
for k in parms:
par = parms[k]
if par.type == ptype or par.typedesc == ptype and par.value:
if par.multiple:
for val in par.value:
yield val
else:
yield par.value
[docs]def copy_special_mapset_files(path_src, path_dst):
"""Copy all the special GRASS files that are contained in
a mapset to another mapset
:param path_src: the path to the original mapset
:type path_src: str
:param path_dst: the path to the new mapset
:type path_dst: str
"""
for fil in (fi for fi in os.listdir(path_src) if fi.isupper()):
sht.copy(os.path.join(path_src, fil), path_dst)
[docs]def copy_mapset(mapset, path):
"""Copy mapset to another place without copying raster and vector data.
:param mapset: a Mapset instance to copy
:type mapset: Mapset object
:param path: path where the new mapset must be copied
:type path: str
:returns: the instance of the new Mapset.
>>> from grass.script.core import gisenv
>>> mname = gisenv()['MAPSET']
>>> mset = Mapset()
>>> mset.name == mname
True
>>> import tempfile as tmp
>>> import os
>>> path = os.path.join(tmp.gettempdir(), 'my_loc', 'my_mset')
>>> copy_mapset(mset, path) # doctest: +ELLIPSIS
Mapset(...)
>>> sorted(os.listdir(path)) # doctest: +ELLIPSIS
[...'PERMANENT'...]
>>> sorted(os.listdir(os.path.join(path, 'PERMANENT')))
['DEFAULT_WIND', 'PROJ_INFO', 'PROJ_UNITS', 'VAR', 'WIND']
>>> sorted(os.listdir(os.path.join(path, mname))) # doctest: +ELLIPSIS
[...'WIND'...]
>>> import shutil
>>> shutil.rmtree(path)
"""
per_old = os.path.join(mapset.gisdbase, mapset.location, 'PERMANENT')
per_new = os.path.join(path, 'PERMANENT')
map_old = mapset.path()
map_new = os.path.join(path, mapset.name)
if not os.path.isdir(per_new):
os.makedirs(per_new)
if not os.path.isdir(map_new):
os.mkdir(map_new)
copy_special_mapset_files(per_old, per_new)
copy_special_mapset_files(map_old, map_new)
gisdbase, location = os.path.split(path)
return Mapset(mapset.name, location, gisdbase)
[docs]def read_gisrc(gisrc):
"""Read a GISRC file and return a tuple with the mapset, location
and gisdbase.
:param gisrc: the path to GISRC file
:type gisrc: str
:returns: a tuple with the mapset, location and gisdbase
>>> import os
>>> from grass.script.core import gisenv
>>> genv = gisenv()
>>> (read_gisrc(os.environ['GISRC']) == (genv['MAPSET'],
... genv['LOCATION_NAME'],
... genv['GISDBASE']))
True
"""
with open(gisrc, 'r') as gfile:
gis = dict([(k.strip(), v.strip())
for k, v in [row.split(':', 1) for row in gfile]])
return gis['MAPSET'], gis['LOCATION_NAME'], gis['GISDBASE']
[docs]def get_mapset(gisrc_src, gisrc_dst):
"""Get mapset from a GISRC source to a GISRC destination.
:param gisrc_src: path to the GISRC source
:type gisrc_src: str
:param gisrc_dst: path to the GISRC destination
:type gisrc_dst: str
:returns: a tuple with Mapset(src), Mapset(dst)
"""
msrc, lsrc, gsrc = read_gisrc(gisrc_src)
mdst, ldst, gdst = read_gisrc(gisrc_dst)
path_src = os.path.join(gsrc, lsrc, msrc)
path_dst = os.path.join(gdst, ldst, mdst)
if not os.path.isdir(path_dst):
os.makedirs(path_dst)
copy_special_mapset_files(path_src, path_dst)
src = Mapset(msrc, lsrc, gsrc)
dst = Mapset(mdst, ldst, gdst)
visible = [m for m in src.visible]
if src.name not in visible:
visible.append(src.name)
dst.visible.extend(visible)
return src, dst
[docs]def copy_groups(groups, gisrc_src, gisrc_dst, region=None):
"""Copy group from one mapset to another, crop the raster to the region
:param groups: a list of strings with the group that must be copied
from a master to another.
:type groups: list of strings
:param gisrc_src: path of the GISRC file from where we want to copy the groups
:type gisrc_src: str
:param gisrc_dst: path of the GISRC file where the groups will be created
:type gisrc_dst: str
:param region: a region like object or a dictionary with the region
parameters that will be used to crop the rasters of the
groups
:type region: Region object or dictionary
:returns: None
"""
env = os.environ.copy()
# instantiate modules
get_grp = Module('i.group', flags='lg', stdout_=sub.PIPE, run_=False)
set_grp = Module('i.group')
get_grp.run_ = True
rmloc = lambda r: r.split('@')[0] if '@' in r else r
src = read_gisrc(gisrc_src)
dst = read_gisrc(gisrc_dst)
rm = True if src[2] != dst[2] else False
all_rasts = [r[0]
for r in findmaps('raster', location=dst[1], gisdbase=dst[2])]
for grp in groups:
# change gisdbase to src
env['GISRC'] = gisrc_src
get_grp(group=grp, env_=env)
rasts = [r for r in get_grp.outputs.stdout.split()]
# change gisdbase to dst
env['GISRC'] = gisrc_dst
rast2cp = [r for r in rasts if rmloc(r) not in all_rasts]
if rast2cp:
copy_rasters(rast2cp, gisrc_src, gisrc_dst, region=region)
set_grp(group=grp,
input=[rmloc(r) for r in rasts] if rast2cp or rm else rasts,
env_=env)
[docs]def set_region(region, gisrc_src, gisrc_dst, env):
"""Set a region into two different mapsets.
:param region: a region like object or a dictionary with the region
parameters that will be used to crop the rasters of the
groups
:type region: Region object or dictionary
:param gisrc_src: path of the GISRC file from where we want to copy the groups
:type gisrc_src: str
:param gisrc_dst: path of the GISRC file where the groups will be created
:type gisrc_dst: str
:param env:
:type env:
:returns: None
"""
reg_str = "g.region n=%(north)r s=%(south)r " \
"e=%(east)r w=%(west)r " \
"nsres=%(nsres)r ewres=%(ewres)r"
reg_cmd = reg_str % dict(region.items())
env['GISRC'] = gisrc_src
sub.Popen(reg_cmd, shell=True, env=env)
env['GISRC'] = gisrc_dst
sub.Popen(reg_cmd, shell=True, env=env)
[docs]def copy_rasters(rasters, gisrc_src, gisrc_dst, region=None):
"""Copy rasters from one mapset to another, crop the raster to the region.
:param rasters: a list of strings with the raster map that must be copied
from a master to another.
:type rasters: list
:param gisrc_src: path of the GISRC file from where we want to copy the groups
:type gisrc_src: str
:param gisrc_dst: path of the GISRC file where the groups will be created
:type gisrc_dst: str
:param region: a region like object or a dictionary with the region
parameters that will be used to crop the rasters of the
groups
:type region: Region object or dictionary
:returns: None
"""
env = os.environ.copy()
if region:
set_region(region, gisrc_src, gisrc_dst, env)
path_dst = os.path.join(*read_gisrc(gisrc_dst)[::-1])
nam = "copy%d__%s" % (id(gisrc_dst), '%s')
# instantiate modules
mpclc = Module('r.mapcalc')
rpck = Module('r.pack')
rupck = Module('r.unpack')
remove = Module('g.remove')
for rast in rasters:
rast_clean = rast.split('@')[0] if '@' in rast else rast
# change gisdbase to src
env['GISRC'] = gisrc_src
name = nam % rast_clean
mpclc(expression="%s=%s" % (name, rast), overwrite=True, env_=env)
file_dst = "%s.pack" % os.path.join(path_dst, name)
rpck(input=name, output=file_dst, overwrite=True, env_=env)
remove(flags='f', type='raster', name=name, env_=env)
# change gisdbase to dst
env['GISRC'] = gisrc_dst
rupck(input=file_dst, output=rast_clean, overwrite=True, env_=env)
os.remove(file_dst)
[docs]def copy_vectors(vectors, gisrc_src, gisrc_dst):
"""Copy vectors from one mapset to another, crop the raster to the region.
:param vectors: a list of strings with the vector map that must be copied
from a master to another.
:type vectors: list
:param gisrc_src: path of the GISRC file from where we want to copy the groups
:type gisrc_src: str
:param gisrc_dst: path of the GISRC file where the groups will be created
:type gisrc_dst: str
:returns: None
"""
env = os.environ.copy()
path_dst = os.path.join(*read_gisrc(gisrc_dst))
nam = "copy%d__%s" % (id(gisrc_dst), '%s')
# instantiate modules
vpck = Module('v.pack')
vupck = Module('v.unpack')
remove = Module('g.remove')
for vect in vectors:
# change gisdbase to src
env['GISRC'] = gisrc_src
name = nam % vect
file_dst = "%s.pack" % os.path.join(path_dst, name)
vpck(input=name, output=file_dst, overwrite=True, env_=env)
remove(flags='f', type='vector', name=name, env_=env)
# change gisdbase to dst
env['GISRC'] = gisrc_dst
vupck(input=file_dst, output=vect, overwrite=True, env_=env)
os.remove(file_dst)
[docs]def get_cmd(cmdd):
"""Transform a cmd dictionary to a list of parameters. It is useful to
pickle a Module class and cnvert into a string that can be used with
`Popen(get_cmd(cmdd), shell=True)`.
:param cmdd: a module dictionary with all the parameters
:type cmdd: dict
>>> slp = Module('r.slope.aspect',
... elevation='ele', slope='slp', aspect='asp',
... overwrite=True, run_=False)
>>> get_cmd(slp.get_dict()) # doctest: +ELLIPSIS
['r.slope.aspect', 'elevation=ele', 'format=degrees', ..., '--o']
"""
cmd = [cmdd['name'], ]
cmd.extend(("%s=%s" % (k, v) for k, v in cmdd['inputs']
if not isinstance(v, list)))
cmd.extend(("%s=%s" % (k, ','.join(vals if isinstance(vals[0], str)
else [repr(v) for v in vals]))
for k, vals in cmdd['inputs']
if isinstance(vals, list)))
cmd.extend(("%s=%s" % (k, v) for k, v in cmdd['outputs']
if not isinstance(v, list)))
cmd.extend(("%s=%s" % (k, ','.join([repr(v) for v in vals]))
for k, vals in cmdd['outputs'] if isinstance(vals, list)))
cmd.extend(("-%s" % (flg) for flg in cmdd['flags'] if len(flg) == 1))
cmd.extend(("--%s" % (flg[0]) for flg in cmdd['flags'] if len(flg) > 1))
return cmd
[docs]def cmd_exe(args):
"""Create a mapset, and execute a cmd inside.
:param args: is a tuple that contains several information see below
:type args: tuple
:returns: None
The puple has to contain:
- bbox (dict): a dict with the region parameters (n, s, e, w, etc.)
that we want to set before to apply the command.
- mapnames (dict): a dictionary to substitute the input if the domain has
been split in several tiles.
- gisrc_src (str): path of the GISRC file from where we want to copy the
groups.
- gisrc_dst (str): path of the GISRC file where the groups will be created.
- cmd (dict): a dictionary with all the parameter of a GRASS module.
- groups (list): a list of strings with the groups that we want to copy in
the mapset.
"""
bbox, mapnames, gisrc_src, gisrc_dst, cmd, groups = args
src, dst = get_mapset(gisrc_src, gisrc_dst)
env = os.environ.copy()
env['GISRC'] = gisrc_dst
shell = True if sys.platform == 'win32' else False
if mapnames:
inputs = dict(cmd['inputs'])
# reset the inputs to
for key in mapnames:
inputs[key] = mapnames[key]
cmd['inputs'] = inputs.items()
# set the region to the tile
sub.Popen(['g.region', 'raster=%s' % key], shell=shell, env=env).wait()
else:
# set the computational region
lcmd = ['g.region', ]
lcmd.extend(["%s=%s" % (k, v) for k, v in bbox.items()])
sub.Popen(lcmd, shell=shell, env=env).wait()
if groups:
copy_groups(groups, gisrc_src, gisrc_dst)
# run the grass command
sub.Popen(get_cmd(cmd), shell=shell, env=env).wait()
# remove temp GISRC
os.remove(gisrc_dst)
[docs]class GridModule(object):
# TODO maybe also i.* could be supported easily
"""Run GRASS raster commands in a multiprocessing mode.
:param cmd: raster GRASS command, only command staring with r.* are valid.
:type cmd: str
:param width: width of the tile, in pixel
:type width: int
:param height: height of the tile, in pixel.
:type height: int
:param overlap: overlap between tiles, in pixel.
:type overlap: int
:param processes: number of threads, default value is equal to the number
of processor available.
:param split: if True use r.tile to split all the inputs.
:type split: bool
:param mapset_prefix: if specified created mapsets start with this prefix
:type mapset_prefix: str
:param run_: if False only instantiate the object
:type run_: bool
:param args: give all the parameters to the command
:param kargs: give all the parameters to the command
>>> grd = GridModule('r.slope.aspect',
... width=500, height=500, overlap=2,
... processes=None, split=False,
... elevation='elevation',
... slope='slope', aspect='aspect', overwrite=True)
>>> grd.run()
"""
def __init__(self, cmd, width=None, height=None, overlap=0, processes=None,
split=False, debug=False, region=None, move=None, log=False,
start_row=0, start_col=0, out_prefix='', mapset_prefix=None,
*args, **kargs):
kargs['run_'] = False
self.mset = Mapset()
self.module = Module(cmd, *args, **kargs)
self.width = width
self.height = height
self.overlap = overlap
self.processes = processes
self.region = region if region else Region()
self.start_row = start_row
self.start_col = start_col
self.out_prefix = out_prefix
self.log = log
self.move = move
self.gisrc_src = os.environ['GISRC']
self.n_mset, self.gisrc_dst = None, None
if self.move:
self.n_mset = copy_mapset(self.mset, self.move)
self.gisrc_dst = write_gisrc(self.n_mset.gisdbase,
self.n_mset.location,
self.n_mset.name)
rasters = [r for r in select(self.module.inputs, 'raster')]
if rasters:
copy_rasters(rasters, self.gisrc_src, self.gisrc_dst,
region=self.region)
vectors = [v for v in select(self.module.inputs, 'vector')]
if vectors:
copy_vectors(vectors, self.gisrc_src, self.gisrc_dst)
groups = [g for g in select(self.module.inputs, 'group')]
if groups:
copy_groups(groups, self.gisrc_src, self.gisrc_dst,
region=self.region)
self.bboxes = split_region_tiles(region=region,
width=width, height=height,
overlap=overlap)
if mapset_prefix:
self.msetstr = mapset_prefix + "_%03d_%03d"
else:
self.msetstr = cmd.replace('.', '') + "_%03d_%03d"
self.inlist = None
if split:
self.split()
self.debug = debug
def __del__(self):
if self.gisrc_dst:
# remove GISRC file
os.remove(self.gisrc_dst)
[docs] def clean_location(self, location=None):
"""Remove all created mapsets.
:param location: a Location instance where we are running the analysis
:type location: Location object
"""
if location is None:
if self.n_mset:
self.n_mset.current()
location = Location()
mapsets = location.mapsets(self.msetstr.split('_')[0] + '_*')
for mset in mapsets:
Mapset(mset).delete()
if self.n_mset and self.n_mset.is_current():
self.mset.current()
[docs] def split(self):
"""Split all the raster inputs using r.tile"""
rtile = Module('r.tile')
inlist = {}
for inm in select(self.module.inputs, 'raster'):
rtile(input=inm.value, output=inm.value,
width=self.width, height=self.height,
overlap=self.overlap)
patt = '%s-*' % inm.value
inlist[inm.value] = sorted(self.mset.glist(type='raster',
pattern=patt))
self.inlist = inlist
[docs] def get_works(self):
"""Return a list of tuble with the parameters for cmd_exe function"""
works = []
reg = Region()
if self.move:
mdst, ldst, gdst = read_gisrc(self.gisrc_dst)
else:
ldst, gdst = self.mset.location, self.mset.gisdbase
cmd = self.module.get_dict()
groups = [g for g in select(self.module.inputs, 'group')]
for row, box_row in enumerate(self.bboxes):
for col, box in enumerate(box_row):
inms = None
if self.inlist:
inms = {}
cols = len(box_row)
for key in self.inlist:
indx = row * cols + col
inms[key] = "%s@%s" % (self.inlist[key][indx],
self.mset.name)
# set the computational region, prepare the region parameters
bbox = dict([(k[0], str(v)) for k, v in box.items()[:-2]])
bbox['nsres'] = '%f' % reg.nsres
bbox['ewres'] = '%f' % reg.ewres
new_mset = self.msetstr % (self.start_row + row,
self.start_col + col),
works.append((bbox, inms,
self.gisrc_src,
write_gisrc(gdst, ldst, new_mset),
cmd, groups))
return works
[docs] def run(self, patch=True, clean=True):
"""Run the GRASS command
:param patch: set False if you does not want to patch the results
:type patch: bool
:param clean: set False if you does not want to remove all the stuff
created by GridModule
:type clean: bool
"""
with contextlib.ExitStack() as stack:
if clean:
stack.callback(self._clean)
self._actual_run(patch=patch)
def _actual_run(self, patch):
"""Run the GRASS command
:param patch: set False if you does not want to patch the results
"""
self.module.flags.overwrite = True
self.define_mapset_inputs()
if self.debug:
for wrk in self.get_works():
cmd_exe(wrk)
else:
pool = mltp.Pool(processes=self.processes)
result = pool.map_async(cmd_exe, self.get_works())
result.wait()
pool.close()
pool.join()
if not result.successful():
raise RuntimeError(_("Execution of subprocesses was not successful"))
if patch:
if self.move:
os.environ['GISRC'] = self.gisrc_dst
self.n_mset.current()
self.patch()
os.environ['GISRC'] = self.gisrc_src
self.mset.current()
# copy the outputs from dst => src
routputs = [self.out_prefix + o
for o in select(self.module.outputs, 'raster')]
copy_rasters(routputs, self.gisrc_dst, self.gisrc_src)
else:
self.patch()
if self.log:
# record in the temp directory
from grass.lib.gis import G_tempfile
tmp, dummy = os.path.split(G_tempfile())
tmpdir = os.path.join(tmp, self.module.name)
for k in self.module.outputs:
par = self.module.outputs[k]
if par.typedesc == 'raster' and par.value:
dirpath = os.path.join(tmpdir, par.name)
if not os.path.isdir(dirpath):
os.makedirs(dirpath)
fil = open(os.path.join(dirpath,
self.out_prefix + par.value), 'w+')
fil.close()
def _clean(self):
"""Cleanup temporary data"""
self.clean_location()
self.rm_tiles()
if self.n_mset:
gisdbase, location = os.path.split(self.move)
self.clean_location(Location(location, gisdbase))
# rm temporary gis_rc
os.remove(self.gisrc_dst)
self.gisrc_dst = None
sht.rmtree(os.path.join(self.move, "PERMANENT"))
sht.rmtree(os.path.join(self.move, self.mset.name))
[docs] def patch(self):
"""Patch the final results."""
bboxes = split_region_tiles(width=self.width, height=self.height)
noutputs = 0
for otmap in self.module.outputs:
otm = self.module.outputs[otmap]
if otm.typedesc == 'raster' and otm.value:
rpatch_map(otm.value,
self.mset.name, self.msetstr, bboxes,
self.module.flags.overwrite,
self.start_row, self.start_col, self.out_prefix)
noutputs += 1
if noutputs < 1:
msg = 'No raster output option defined for <{}>'.format(self.module.name)
if self.module.name == 'r.mapcalc':
msg += '. Use <{}.simple> instead'.format(self.module.name)
raise RuntimeError(msg)
[docs] def rm_tiles(self):
"""Remove all the tiles."""
# if split, remove tiles
if self.inlist:
grm = Module('g.remove')
for key in self.inlist:
grm(flags='f', type='raster', name=self.inlist[key])