"""
Functions to create space time dataset lists
Usage:
.. code-block:: python
import grass.temporal as tgis
tgis.register_maps_in_space_time_dataset(type, name, maps)
(C) 2012-2022 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS GIS
for details.
:authors: Soeren Gebbert
:authors: Vaclav Petras
"""
import os
import sys
from contextlib import contextmanager
import grass.script as gs
from .core import get_available_temporal_mapsets, get_tgis_message_interface, init_dbif
from .datetime_math import time_delta_to_relative_time
from .factory import dataset_factory
from .open_stds import open_old_stds
###############################################################################
[docs]def get_dataset_list(
type, temporal_type, columns=None, where=None, order=None, dbif=None
):
"""Return a list of time stamped maps or space time datasets of a specific
temporal type that are registered in the temporal database
This method returns a dictionary, the keys are the available mapsets,
the values are the rows from the SQL database query.
:param type: The type of the datasets (strds, str3ds, stvds, raster,
raster_3d, vector)
:param temporal_type: The temporal type of the datasets (absolute,
relative)
:param columns: A comma separated list of columns that will be selected
:param where: A where statement for selected listing without "WHERE"
:param order: A comma separated list of columns to order the
datasets by category
:param dbif: The database interface to be used
:return: A dictionary with the rows of the SQL query for each
available mapset
.. code-block:: python
>>> import grass.temporal as tgis
>>> tgis.core.init()
>>> name = "list_stds_test"
>>> sp = tgis.open_stds.open_new_stds(
... name=name,
... type="strds",
... temporaltype="absolute",
... title="title",
... descr="descr",
... semantic="mean",
... dbif=None,
... overwrite=True,
... )
>>> mapset = tgis.get_current_mapset()
>>> stds_list = tgis.list_stds.get_dataset_list(
... "strds", "absolute", columns="name"
... )
>>> rows = stds_list[mapset]
>>> for row in rows:
... if row["name"] == name:
... print(True)
...
True
>>> stds_list = tgis.list_stds.get_dataset_list(
... "strds",
... "absolute",
... columns="name,mapset",
... where="mapset = '%s'" % (mapset),
... )
>>> rows = stds_list[mapset]
>>> for row in rows:
... if row["name"] == name and row["mapset"] == mapset:
... print(True)
...
True
>>> check = sp.delete()
"""
id = None
sp = dataset_factory(type, id)
dbif, connection_state_changed = init_dbif(dbif)
mapsets = get_available_temporal_mapsets()
result = {}
for mapset in mapsets.keys():
if temporal_type == "absolute":
table = sp.get_type() + "_view_abs_time"
else:
table = sp.get_type() + "_view_rel_time"
if columns and columns.find("all") == -1:
sql = "SELECT " + str(columns) + " FROM " + table
else:
sql = "SELECT * FROM " + table
if where:
sql += " WHERE " + where
sql += " AND mapset = '%s'" % (mapset)
else:
sql += " WHERE mapset = '%s'" % (mapset)
if order:
sql += " ORDER BY " + order
dbif.execute(sql, mapset=mapset)
rows = dbif.fetchall(mapset=mapset)
if rows:
result[mapset] = rows
if connection_state_changed:
dbif.close()
return result
###############################################################################
@contextmanager
def _open_output_file(file, encoding="utf-8", **kwargs):
if not file:
yield sys.stdout
elif not isinstance(file, (str, os.PathLike)):
yield file
else:
with open(file, "w", encoding=encoding, **kwargs) as stream:
yield stream
def _write_line(items, separator, file) -> None:
if not separator:
separator = ","
output = separator.join([f"{item}" for item in items])
with _open_output_file(file) as stream:
print(f"{output}", file=stream)
def _write_plain(rows, header, separator, file) -> None:
def write_plain_row(items, separator, file) -> None:
output = separator.join([f"{item}" for item in items])
print(f"{output}", file=file)
with _open_output_file(file) as stream:
# Print the column names if requested
if header:
write_plain_row(items=header, separator=separator, file=stream)
for row in rows:
write_plain_row(items=row, separator=separator, file=stream)
def _write_json(rows, column_names, file) -> None:
# Lazy import output format-specific dependencies.
# pylint: disable=import-outside-toplevel
import datetime
import json
class ResultsEncoder(json.JSONEncoder):
"""Results encoder for JSON which handles SimpleNamespace objects"""
def default(self, o):
"""Handle additional types"""
if isinstance(o, datetime.datetime):
return f"{o}"
return super().default(o)
dict_rows = []
for row in rows:
new_row = {}
for key, value in zip(column_names, row):
new_row[key] = value
dict_rows.append(new_row)
meta = {"column_names": column_names}
with _open_output_file(file) as stream:
json.dump({"data": dict_rows, "metadata": meta}, stream, cls=ResultsEncoder)
def _write_yaml(rows, column_names, file=sys.stdout) -> None:
# Lazy import output format-specific dependencies.
# pylint: disable=import-outside-toplevel
import yaml
class NoAliasIndentListSafeDumper(yaml.SafeDumper):
"""YAML dumper class which does not create aliases and indents lists
This avoid dates being labeled with &id001 and referenced with *id001.
Instead, same dates are simply repeated.
Lists have their dash-space (- ) indented instead of considering the
dash and space to be a part of indentation. This might be better handled
when https://github.com/yaml/pyyaml/issues/234 is resolved.
"""
def ignore_aliases(self, data) -> bool:
return True
def increase_indent(self, flow: bool = False, indentless: bool = False):
return super().increase_indent(flow=flow, indentless=False)
dict_rows = []
for row in rows:
new_row = {}
for key, value in zip(column_names, row):
new_row[key] = value
dict_rows.append(new_row)
meta = {"column_names": column_names}
with _open_output_file(file) as stream:
print(
yaml.dump(
{"data": dict_rows, "metadata": meta},
Dumper=NoAliasIndentListSafeDumper,
default_flow_style=False,
),
end="",
file=stream,
)
def _write_csv(rows, column_names, separator, file=sys.stdout) -> None:
# Lazy import output format-specific dependencies.
# pylint: disable=import-outside-toplevel
import csv
# Newlines handled by the CSV writer. Set according to the package doc.
with _open_output_file(file, newline="") as stream:
spamwriter = csv.writer(
stream,
delimiter=separator,
quotechar='"',
doublequote=True,
quoting=csv.QUOTE_NONNUMERIC,
lineterminator="\n",
)
if column_names:
spamwriter.writerow(column_names)
for row in rows:
spamwriter.writerow(row)
def _write_table(rows, column_names, output_format, separator, file):
if output_format == "json":
_write_json(rows=rows, column_names=column_names, file=file)
elif output_format == "yaml":
_write_yaml(rows=rows, column_names=column_names, file=file)
elif output_format == "plain":
# No particular reason for this separator except that this is the original
# behavior.
if not separator:
separator = "\t"
_write_plain(rows=rows, header=column_names, separator=separator, file=file)
elif output_format == "csv":
if not separator:
separator = ","
_write_csv(rows=rows, column_names=column_names, separator=separator, file=file)
else:
msg = f"Unknown value '{output_format}' for output_format"
raise ValueError(msg)
def _get_get_registered_maps_as_objects_with_method(dataset, where, method, gran, dbif):
if method == "deltagaps":
return dataset.get_registered_maps_as_objects_with_gaps(where=where, dbif=dbif)
if method == "delta":
return dataset.get_registered_maps_as_objects(
where=where, order="start_time", dbif=dbif
)
if method == "gran":
if where:
msg = f"The where parameter is not supported with method={method}"
raise ValueError(msg)
if gran is not None and gran != "":
return dataset.get_registered_maps_as_objects_by_granularity(
gran=gran, dbif=dbif
)
return dataset.get_registered_maps_as_objects_by_granularity(dbif=dbif)
msg = f"Invalid method '{method}'"
raise ValueError(msg)
def _get_get_registered_maps_as_objects_delta_gran(
dataset, where, method, gran, dbif, msgr
):
maps = _get_get_registered_maps_as_objects_with_method(
dataset=dataset, where=where, method=method, gran=gran, dbif=dbif
)
if not maps:
return []
if isinstance(maps[0], list):
if len(maps[0]) > 0:
first_time, unused = maps[0][0].get_temporal_extent_as_tuple()
else:
msgr.warning(_("Empty map list"))
return []
else:
first_time, unused = maps[0].get_temporal_extent_as_tuple()
records = []
for map_object in maps:
if isinstance(map_object, list):
if len(map_object) > 0:
map_object = map_object[0]
else:
msgr.fatal(_("Empty entry in map list, this should not happen"))
start, end = map_object.get_temporal_extent_as_tuple()
delta = end - start if end else None
delta_first = start - first_time
if map_object.is_time_absolute():
if end:
delta = time_delta_to_relative_time(delta)
delta_first = time_delta_to_relative_time(delta_first)
records.append((map_object, start, end, delta, delta_first))
return records
def _get_list_of_maps_delta_gran(dataset, columns, where, method, gran, dbif, msgr):
maps = _get_get_registered_maps_as_objects_delta_gran(
dataset=dataset, where=where, method=method, gran=gran, dbif=dbif, msgr=msgr
)
rows = []
for map_object, start, end, delta, delta_first in maps:
row = []
# Here the names must be the same as in the database
# to make the interface consistent.
for column in columns:
if column == "id":
row.append(map_object.get_id())
elif column == "name":
row.append(map_object.get_name())
elif column == "layer":
row.append(map_object.get_layer())
elif column == "mapset":
row.append(map_object.get_mapset())
elif column == "start_time":
row.append(start)
elif column == "end_time":
row.append(end)
elif column == "interval_length":
row.append(delta)
elif column == "distance_from_begin":
row.append(delta_first)
else:
msg = f"Unsupported column '{column}'"
raise ValueError(msg)
rows.append(row)
return rows
def _get_list_of_maps_stds(
element_type,
name,
columns,
order,
where,
method,
output_format,
gran=None,
dbif=None,
):
dbif, connection_state_changed = init_dbif(dbif)
msgr = get_tgis_message_interface()
dataset = open_old_stds(name, element_type, dbif)
def check_columns(column_names, output_format, element_type):
if element_type != "stvds" and "layer" in columns:
msg = f"Column 'layer' is not allowed with temporal type '{element_type}'"
raise ValueError(msg)
if output_format == "line" and len(column_names) > 1:
msg = (
f"'{output_format}' output_format can have only 1 column, "
f"not {len(column_names)}"
)
raise ValueError(msg)
# This method expects a list of objects for gap detection
if method in {"delta", "deltagaps", "gran"}:
if not columns:
if output_format == "list":
# Only one column is needed.
columns = ["id"]
else:
columns = ["id", "name"]
if element_type == "stvds":
columns.append("layer")
columns.extend(
[
"mapset",
"start_time",
"end_time",
"interval_length",
"distance_from_begin",
]
)
check_columns(
column_names=columns,
output_format=output_format,
element_type=element_type,
)
rows = _get_list_of_maps_delta_gran(
dataset=dataset,
columns=columns,
where=where,
method=method,
gran=gran,
dbif=dbif,
msgr=msgr,
)
else:
if columns:
check_columns(
column_names=columns,
output_format=output_format,
element_type=element_type,
)
elif output_format == "line":
# For list of values, only one column is needed.
columns = ["id"]
else:
columns = ["name", "mapset", "start_time", "end_time"]
if not order:
order = "start_time"
rows = dataset.get_registered_maps(",".join(columns), where, order, dbif)
# End with error for the old, custom formats. Proper formats simply return
# empty result whatever empty is for each format (e.g., empty list for JSON).
if not rows and (output_format in {"plain", "line"}):
dbif.close()
gs.fatal(
_(
"Nothing found in the database for space time dataset <{name}> "
"(type: {element_type}): {detail}"
).format(
name=dataset.get_id(),
element_type=element_type,
detail=(
_(
"Dataset is empty or where clause is too constrained or "
"incorrect"
)
if where
else _("Dataset is empty")
),
)
)
if connection_state_changed:
dbif.close()
return rows, columns
# The code is compatible with pre-v8.2 versions, but for v9, it needs to be reviewed
# to remove the backwards compatibility which will clean it up.
[docs]def list_maps_of_stds(
type, # pylint: disable=redefined-builtin
input, # pylint: disable=redefined-builtin
columns,
order,
where,
separator,
method,
no_header: bool = False,
gran=None,
dbif=None,
outpath=None,
output_format=None,
) -> None:
"""List the maps of a space time dataset using different methods
:param type: The type of the maps raster, raster3d or vector
:param input: Name of a space time raster dataset
:param columns: A comma separated list of columns to be printed to stdout
:param order: A comma separated list of columns to order the
maps by category
:param where: A where statement for selected listing without "WHERE"
e.g: start_time < "2001-01-01" and end_time > "2001-01-01"
:param separator: The field separator character between the columns
:param method: String identifier to select a method out of cols,
comma,delta or deltagaps
:param dbif: The database interface to be used
- "cols" Print preselected columns specified by columns
- "comma" Print the map ids ("name@mapset") as comma separated string
- "delta" Print the map ids ("name@mapset") with start time,
end time, relative length of intervals and the relative
distance to the begin
- "deltagaps" Same as "delta" with additional listing of gaps.
Gaps can be easily identified as the id is "None"
- "gran" List map using the granularity of the space time dataset,
columns are identical to deltagaps
:param no_header: Suppress the printing of column names
:param gran: The user defined granule to be used if method=gran is
set, in case gran=None the granule of the space time
dataset is used
:param outpath: The path to file where to save output
"""
if not output_format:
if method == "comma":
output_format = "line"
output_format = "plain"
if columns:
if isinstance(columns, str):
columns = columns.split(",")
rows, columns = _get_list_of_maps_stds(
element_type=type,
name=input,
columns=columns,
order=order,
where=where,
method=method,
output_format=output_format,
gran=gran,
dbif=dbif,
)
if output_format == "line":
_write_line(
items=[row[0] for row in rows],
separator=separator,
file=outpath,
)
else:
_write_table(
rows=rows,
column_names=None if no_header else columns,
separator=separator,
output_format=output_format,
file=outpath,
)
###############################################################################
if __name__ == "__main__":
import doctest
doctest.testmod()