Source code for temporal.extract
"""
Extract functions for space time raster, 3d raster and vector datasets
(C) 2012-2013 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.
:authors: Soeren Gebbert
"""
import sys
from multiprocessing import Process
import grass.script as gs
from grass.exceptions import CalledModuleError
from .abstract_map_dataset import AbstractMapDataset
from .core import (
SQLDatabaseInterfaceConnection,
get_current_mapset,
get_tgis_message_interface,
)
from .datetime_math import (
create_numeric_suffix,
create_suffix_from_datetime,
create_time_suffix,
)
from .open_stds import check_new_stds, open_new_stds, open_old_stds
############################################################################
def _substitute_stds_name(expression, stds_id, stds_name, map_id):
    """Replace references to a space time dataset in an expression by a map id

    Both the fully qualified dataset id and the plain dataset name are
    replaced. The substitution is done in a single pass so that text which
    was just inserted is never scanned again; two chained ``str.replace``
    calls would corrupt the result whenever the dataset name is a substring
    of the inserted map id.

    :param expression: The expression provided by the user
    :param stds_id: The dataset id as returned by ``sp.base.get_map_id()``
    :param stds_name: The dataset name as returned by ``sp.base.get_name()``
    :param stds_name: The dataset name without mapset
    :param map_id: The id of the map that replaces the dataset reference
    :return: The expression with all dataset references replaced
    """
    import re

    # The id is listed first so that it wins over the (shorter) plain name
    pattern = "|".join(re.escape(token) for token in (stds_id, stds_name))
    # A callable replacement avoids backslash/group interpretation in map_id
    return re.sub(pattern, lambda match: map_id, expression)


def _join_processes(proc_list, dbif, msgr):
    """Wait for all processes in the list and abort if any of them failed

    :param proc_list: List of started multiprocessing.Process objects
    :param dbif: The database interface, closed before a fatal error is
                 reported
    :param msgr: The message interface used to report the fatal error
    """
    for proc in proc_list:
        proc.join()
    # Check every exit code on its own: summing them could hide a failure
    # when a negative (killed by signal) and a positive code cancel out
    if any(proc.exitcode != 0 for proc in proc_list):
        dbif.close()
        msgr.fatal(_("Error in computation process"))


def extract_dataset(
    input,
    output,
    type,
    where,
    expression,
    base,
    time_suffix,
    nprocs: int = 1,
    register_null: bool = False,
    layer: int = 1,
    vtype="point,line,boundary,centroid,area,face",
) -> None:
    """Extract a subset of a space time raster, raster3d or vector dataset

    A mapcalc expression can be provided to process the temporal extracted
    maps.
    Mapcalc expressions are supported for raster and raster3d maps.

    :param input: The name of the input space time raster/raster3d dataset
    :param output: The name of the extracted new space time raster/raster3d
                   dataset
    :param type: The type of the dataset: "raster", "raster3d" or vector
    :param where: The temporal SQL WHERE statement for subset extraction
    :param expression: The r(3).mapcalc expression or the v.extract where
                       statement
    :param base: The base name of the new created maps in case a mapcalc
                 expression is provided
    :param time_suffix: string to choose which suffix to use: gran, time, num%*
                        (where * are digits)
    :param nprocs: The number of parallel processes to be used for mapcalc
                   processing
    :param register_null: Set this to True to register empty maps
                          (only raster and raster3d maps)
    :param layer: The vector layer number to be used when no timestamped
                  layer is present, default is 1
    :param vtype: The feature type to be extracted for vector maps, default
                  is point,line,boundary,centroid,area and face
    """
    # Check the parameters
    msgr = get_tgis_message_interface()

    if expression and not base:
        msgr.fatal(_("You need to specify the base name of new created maps"))

    mapset = get_current_mapset()

    dbif = SQLDatabaseInterfaceConnection()
    dbif.connect()

    sp = open_old_stds(input, type, dbif)
    # Check the new stds
    new_sp = check_new_stds(output, type, dbif, gs.overwrite())
    if type == "vector":
        rows = sp.get_registered_maps("id,name,mapset,layer", where, "start_time", dbif)
    else:
        rows = sp.get_registered_maps("id", where, "start_time", dbif)

    # Maps created by the expression, keyed by the id of their source map
    new_maps = {}
    if rows:
        num_rows = len(rows)

        msgr.percent(0, num_rows, 1)

        # Run the mapcalc expression
        if expression:
            count = 0
            proc_count = 0
            proc_list = []
            is_absolute = sp.get_temporal_type() == "absolute"

            for row in rows:
                count += 1

                if count % 10 == 0:
                    msgr.percent(count, num_rows, 1)

                # Build the name of the new map from the requested suffix
                if is_absolute and time_suffix == "gran":
                    old_map = sp.get_new_map_instance(row["id"])
                    old_map.select(dbif)
                    suffix = create_suffix_from_datetime(
                        old_map.temporal_extent.get_start_time(), sp.get_granularity()
                    )
                    map_name = "{ba}_{su}".format(ba=base, su=suffix)
                elif is_absolute and time_suffix == "time":
                    old_map = sp.get_new_map_instance(row["id"])
                    old_map.select(dbif)
                    suffix = create_time_suffix(old_map)
                    map_name = "{ba}_{su}".format(ba=base, su=suffix)
                else:
                    map_name = create_numeric_suffix(base, count, time_suffix)

                # We need to modify the r(3).mapcalc expression
                if type != "vector":
                    expr = _substitute_stds_name(
                        expression,
                        sp.base.get_map_id(),
                        sp.base.get_name(),
                        row["id"],
                    )
                    expr = "%s = %s" % (map_name, expr)

                    # We need to build the id
                    map_id = AbstractMapDataset.build_id(map_name, mapset)
                else:
                    map_id = AbstractMapDataset.build_id(map_name, mapset, row["layer"])

                new_map = sp.get_new_map_instance(map_id)

                # Check if new map is in the temporal database
                if new_map.is_in_db(dbif):
                    if gs.overwrite():
                        # Remove the existing temporal database entry
                        new_map.delete(dbif)
                        new_map = sp.get_new_map_instance(map_id)
                    else:
                        msgr.error(
                            _(
                                "Map <%s> is already in temporal database"
                                ", use overwrite flag to overwrite"
                            )
                            % (new_map.get_map_id())
                        )
                        # Processes that are still pending are joined by a
                        # later iteration or by the join after the loop
                        continue

                # Add process to the process list
                if type == "raster":
                    msgr.verbose(_('Applying r.mapcalc expression: "%s"') % expr)
                    proc_list.append(Process(target=run_mapcalc2d, args=(expr,)))
                elif type == "raster3d":
                    msgr.verbose(_('Applying r3.mapcalc expression: "%s"') % expr)
                    proc_list.append(Process(target=run_mapcalc3d, args=(expr,)))
                elif type == "vector":
                    msgr.verbose(
                        _('Applying v.extract where statement: "%s"') % expression
                    )
                    proc_list.append(
                        Process(
                            target=run_vector_extraction,
                            args=(
                                row["name"] + "@" + row["mapset"],
                                map_name,
                                # Fall back to the default layer when the
                                # map has no timestamped layer
                                row["layer"] or layer,
                                vtype,
                                expression,
                            ),
                        )
                    )

                proc_list[proc_count].start()
                proc_count += 1

                # Join processes if the maximum number of processes are
                # reached or the end of the loop is reached
                if proc_count == nprocs or count == num_rows:
                    proc_count = 0
                    _join_processes(proc_list, dbif, msgr)
                    # Empty process list
                    proc_list = []

                # Store the new maps
                new_maps[row["id"]] = new_map

            # If the last row(s) were skipped above, the in-loop join was
            # bypassed: wait for the remaining processes before the maps
            # they create are loaded and registered
            if proc_list:
                _join_processes(proc_list, dbif, msgr)
                proc_list = []

        msgr.percent(0, num_rows, 1)

        temporal_type, semantic_type, title, description = sp.get_initial_values()
        new_sp = open_new_stds(
            output,
            type,
            sp.get_temporal_type(),
            title,
            description,
            semantic_type,
            dbif,
            gs.overwrite(),
        )

        # collect empty maps to remove them
        empty_maps = []

        # Register the maps in the database
        count = 0
        for row in rows:
            count += 1

            if count % 10 == 0:
                msgr.percent(count, num_rows, 1)

            old_map = sp.get_new_map_instance(row["id"])
            old_map.select(dbif)

            if expression:
                # Register the new maps
                if row["id"] in new_maps:
                    new_map = new_maps[row["id"]]

                    # Read the raster map data
                    new_map.load()

                    # In case of a empty map continue, do not register empty
                    # maps
                    if type in {"raster", "raster3d"}:
                        if (
                            new_map.metadata.get_min() is None
                            and new_map.metadata.get_max() is None
                        ):
                            if not register_null:
                                empty_maps.append(new_map)
                                continue
                    elif type == "vector":
                        if (
                            new_map.metadata.get_number_of_primitives() == 0
                            or new_map.metadata.get_number_of_primitives() is None
                        ):
                            if not register_null:
                                empty_maps.append(new_map)
                                continue

                    # Set the time stamp
                    new_map.set_temporal_extent(old_map.get_temporal_extent())

                    if type == "raster":
                        # Set the semantic label
                        semantic_label = old_map.metadata.get_semantic_label()
                        if semantic_label is not None:
                            new_map.set_semantic_label(semantic_label)

                    # Insert map in temporal database
                    new_map.insert(dbif)

                    new_sp.register_map(new_map, dbif)
            else:
                new_sp.register_map(old_map, dbif)

        # Update the spatio-temporal extent and the metadata table entries
        new_sp.update_from_registered_maps(dbif)

        msgr.percent(num_rows, num_rows, 1)

        # Remove empty maps
        if empty_maps:
            names = ",".join(empty_map.get_name() for empty_map in empty_maps)
            # g.remove uses "raster_3d" where this module uses "raster3d"
            remove_type = {
                "raster": "raster",
                "raster3d": "raster_3d",
                "vector": "vector",
            }.get(type)
            if remove_type is not None:
                gs.run_command(
                    "g.remove", flags="f", type=remove_type, name=names, quiet=True
                )

    dbif.close()
###############################################################################
def run_mapcalc2d(expr) -> None:
    """Run r.mapcalc with the given expression

    This function is used as the target of a multiprocessing.Process.

    :param expr: The r.mapcalc expression to be evaluated

    If the module call raises CalledModuleError, the interpreter is left
    via sys.exit(1).
    """
    try:
        module_options = {
            "expression": expr,
            "overwrite": gs.overwrite(),
            "quiet": True,
        }
        gs.run_command("r.mapcalc", **module_options)
    except CalledModuleError:
        sys.exit(1)
def run_mapcalc3d(expr) -> None:
    """Run r3.mapcalc with the given expression

    This function is used as the target of a multiprocessing.Process.

    :param expr: The r3.mapcalc expression to be evaluated

    If the module call raises CalledModuleError, the interpreter is left
    via sys.exit(1).
    """
    try:
        module_options = {
            "expression": expr,
            "overwrite": gs.overwrite(),
            "quiet": True,
        }
        gs.run_command("r3.mapcalc", **module_options)
    except CalledModuleError:
        sys.exit(1)
def run_vector_extraction(input, output, layer, type, where) -> None:
    """Run v.extract for a single vector map

    This function is used as the target of a multiprocessing.Process.

    :param input: Passed to v.extract as ``input``
    :param output: Passed to v.extract as ``output``
    :param layer: Passed to v.extract as ``layer``
    :param type: Passed to v.extract as ``type``
    :param where: Passed to v.extract as ``where``

    If the module call raises CalledModuleError, the interpreter is left
    via sys.exit(1).
    """
    try:
        module_options = {
            "input": input,
            "output": output,
            "layer": layer,
            "type": type,
            "where": where,
            "overwrite": gs.overwrite(),
            "quiet": True,
        }
        gs.run_command("v.extract", **module_options)
    except CalledModuleError:
        sys.exit(1)