Source code for temporal.stds_import

"""
Space time dataset import functions

Usage:

.. code-block:: python

    import grass.temporal as tgis

    input="/tmp/temp_1950_2012.tar.gz"
    output="temp_1950_2012"
    directory="/tmp"
    title="My new dataset"
    descr="May new shiny dataset"
    location=None
    link=True
    exp=True
    overr=False
    create=False
    tgis.import_stds(input, output, directory, title, descr, location,
                    link, exp, overr, create, "strds")


(C) 2012-2013 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.

:authors: Soeren Gebbert
"""

import os
import os.path
import tarfile
from pathlib import Path

import grass.script as gs
from grass.exceptions import CalledModuleError

from .core import get_current_mapset, get_tgis_message_interface
from .factory import dataset_factory
from .register import register_maps_in_space_time_dataset

proj_file_name = "proj.txt"
init_file_name = "init.txt"
list_file_name = "list.txt"

# This global variable is for unique vector map export,
# since single vector maps may have several layer
# and therefore several attribute tables
imported_maps = {}

############################################################################


def _import_raster_maps_from_gdal(
    maplist,
    overr,
    exp,
    location,
    link,
    format_,
    set_current_region: bool = False,
    memory=300,
) -> None:
    impflags = ""
    if overr:
        impflags += "o"
    if exp or location:
        impflags += "e"
    for row in maplist:
        name = row["name"]
        if format_ == "GTiff":
            filename = row["filename"] + ".tif"
        elif format_ == "AAIGrid":
            filename = row["filename"] + ".asc"
            if not overr:
                impflags += "o"

        try:
            if link:
                gs.run_command(
                    "r.external",
                    input=filename,
                    output=name,
                    flags=impflags,
                    overwrite=gs.overwrite(),
                )
            else:
                gs.run_command(
                    "r.in.gdal",
                    input=filename,
                    output=name,
                    memory=memory,
                    flags=impflags,
                    overwrite=gs.overwrite(),
                )

        except CalledModuleError:
            gs.fatal(
                _("Unable to import/link raster map <%s> from file %s.")
                % (name, filename)
            )

        # Set the color rules if present
        filename = row["filename"] + ".color"
        if os.path.isfile(filename):
            try:
                gs.run_command(
                    "r.colors", map=name, rules=filename, overwrite=gs.overwrite()
                )
            except CalledModuleError:
                gs.fatal(_("Unable to set the color rules for raster map <%s>.") % name)

    # Set the computational region from the last map imported
    if set_current_region is True:
        gs.run_command("g.region", raster=name)


############################################################################


def _import_raster_maps(maplist, set_current_region: bool = False) -> None:
    # We need to disable the projection check because of its
    # simple implementation
    impflags = "o"
    for row in maplist:
        name = row["name"]
        filename = row["filename"] + ".pack"
        try:
            gs.run_command(
                "r.unpack",
                input=filename,
                output=name,
                flags=impflags,
                overwrite=gs.overwrite(),
                verbose=True,
            )

        except CalledModuleError:
            gs.fatal(
                _("Unable to unpack raster map <%s> from file %s.") % (name, filename)
            )

    # Set the computational region from the last map imported
    if set_current_region is True:
        gs.run_command("g.region", raster=name)


############################################################################


def _import_vector_maps_from_gml(maplist, overr, exp, location, link) -> None:
    impflags = "o"
    if exp or location:
        impflags += "e"
    for row in maplist:
        name = row["name"]
        filename = row["filename"] + ".xml"

        try:
            gs.run_command(
                "v.in.ogr",
                input=filename,
                output=name,
                flags=impflags,
                overwrite=gs.overwrite(),
            )

        except CalledModuleError:
            gs.fatal(
                _("Unable to import vector map <%s> from file %s.") % (name, filename)
            )


############################################################################


def _import_vector_maps(maplist) -> None:
    # We need to disable the projection check because of its
    # simple implementation
    impflags = "o"
    for row in maplist:
        # Separate the name from the layer
        name = row["name"].split(":")[0]
        # Import only unique maps
        if name in imported_maps:
            continue
        filename = row["filename"] + ".pack"
        try:
            gs.run_command(
                "v.unpack",
                input=filename,
                output=name,
                flags=impflags,
                overwrite=gs.overwrite(),
                verbose=True,
            )

        except CalledModuleError:
            gs.fatal(
                _("Unable to unpack vector map <%s> from file %s.") % (name, filename)
            )

        imported_maps[name] = name


############################################################################


[docs]def import_stds( input, output, directory, title=None, descr=None, location=None, link: bool = False, exp: bool = False, overr: bool = False, create: bool = False, stds_type="strds", base=None, set_current_region: bool = False, memory=300, ) -> None: """Import space time datasets of type raster and vector :param input: Name of the input archive file :param output: The name of the output space time dataset :param directory: The extraction directory :param title: The title of the new created space time dataset :param descr: The description of the new created space time dataset :param location: The name of the location that should be created, maps are imported into this location :param link: Switch to link raster maps instead importing them :param exp: Extend location extents based on new dataset :param overr: Override projection (use location's projection) :param create: Create the location specified by the "location" parameter and exit. Do not import the space time datasets. :param stds_type: The type of the space time dataset that should be imported :param base: The base name of the new imported maps, it will be extended using a numerical index. :param memory: Cache size for raster rows, used in r.in.gdal """ old_state = gs.get_raise_on_error() gs.set_raise_on_error(True) # Check if input file and extraction directory exits if not os.path.exists(input): gs.fatal(_("Space time raster dataset archive <%s> not found") % input) if not create and not os.path.exists(directory): gs.fatal(_("Extraction directory <%s> not found") % directory) tar = tarfile.open(name=input, mode="r") # Check for important files msgr = get_tgis_message_interface() msgr.message( _("Checking validity of input file (size: %0.1f MB). Make take a while...") % (os.path.getsize(input) / (1024 * 1024.0)) ) members = tar.getnames() # Make sure that the basenames of the files are used for comparison member_basenames = [os.path.basename(name) for name in members] if init_file_name not in member_basenames: gs.fatal(_("Unable to find init file <%s>") % init_file_name) if list_file_name not in member_basenames: gs.fatal(_("Unable to find list file <%s>") % list_file_name) if proj_file_name not in member_basenames: gs.fatal(_("Unable to find projection file <%s>") % proj_file_name) msgr.message(_("Extracting data...")) # Extraction filters were added in Python 3.12, # and backported to 3.8.17, 3.9.17, 3.10.12, and 3.11.4 # See https://docs.python.org/3.12/library/tarfile.html#tarfile-extraction-filter # and https://peps.python.org/pep-0706/ # In Python 3.12, using `filter=None` triggers a DepreciationWarning, # and in Python 3.14, `filter='data'` will be the default if hasattr(tarfile, "data_filter"): tar.extractall(path=directory, filter="data") else: # Remove this when no longer needed gs.warning(_("Extracting may be unsafe; consider updating Python")) tar.extractall(path=directory) tar.close() # We use a new list file name for map registration new_list_file_name = list_file_name + "_new" # Save current working directory path old_cwd = Path.cwd() # Switch into the data directory os.chdir(directory) # Check projection information if not location: temp_name = gs.tempfile() temp_file = open(temp_name, "w") proj_name = os.path.abspath(proj_file_name) # We need to convert projection strings generated # from other programs than g.proj into # new line format so that the grass file comparison function # can be used to compare the projections proj_content = Path(proj_name).read_text() proj_content = proj_content.replace(" +", "\n+") proj_content = proj_content.replace("\t+", "\n+") proj_name_tmp = f"{temp_name}_in_projection" Path(proj_name_tmp).write_text(proj_content) p = gs.start_command("g.proj", flags="j", stdout=temp_file) p.communicate() temp_file.close() if not gs.compare_key_value_text_files(temp_name, proj_name_tmp, sep="="): if overr: gs.warning(_("Projection information does not match. Proceeding...")) else: diff = "".join(gs.diff_files(temp_name, proj_name)) gs.warning( _( "Difference between PROJ_INFO file of " "imported map and of current location:" "\n{diff}" ).format(diff=diff) ) gs.fatal(_("Projection information does not match. Aborting.")) # Create a new location based on the projection information and switch # into it old_env = gs.gisenv() if location: try: proj4_string = Path(proj_file_name).read_text() gs.create_location( dbase=old_env["GISDBASE"], location=location, proj4=proj4_string ) # Just create a new location and return if create: os.chdir(old_cwd) return except Exception as e: gs.fatal( _("Unable to create location %(l)s. Reason: %(e)s") % {"l": location, "e": str(e)} ) # Switch to the new created location try: gs.run_command( "g.mapset", mapset="PERMANENT", project=location, dbase=old_env["GISDBASE"], ) except CalledModuleError: gs.fatal(_("Unable to switch to location %s") % location) # create default database connection try: gs.run_command("t.connect", flags="d") except CalledModuleError: gs.fatal( _("Unable to create default temporal database in new location %s") % location ) try: # Make sure the temporal database exists from .core import init init() fs = "|" maplist = [] mapset = get_current_mapset() list_file = open(list_file_name) new_list_file = open(new_list_file_name, "w") # get number of lines to correctly form the suffix max_count = -1 for max_count, l in enumerate(list_file): pass max_count += 1 list_file.seek(0) # Read the map list from file line_count = 0 while True: line = list_file.readline() if not line: break line_list = line.split(fs) # The filename is actually the base name of the map # that must be extended by the file suffix filename = line_list[0].strip().split(":")[0] if base: mapname = "%s_%s" % ( base, gs.get_num_suffix(line_count + 1, max_count), ) mapid = "%s@%s" % (mapname, mapset) else: mapname = filename mapid = mapname + "@" + mapset row = {} row["filename"] = filename row["name"] = mapname row["id"] = mapid row["start"] = line_list[1].strip() row["end"] = line_list[2].strip() row["semantic_label"] = line_list[3].strip() if len(line_list) == 4 else "" new_list_file.write( f"{mapname}{fs}{row['start']}{fs}{row['end']}" f"{fs}{row['semantic_label']}\n" ) maplist.append(row) line_count += 1 list_file.close() new_list_file.close() # Read the init file fs = "=" init = {} init_file = open(init_file_name) while True: line = init_file.readline() if not line: break kv = line.split(fs) init[kv[0]] = kv[1].strip() init_file.close() if ( "temporal_type" not in init or "semantic_type" not in init or "number_of_maps" not in init ): gs.fatal( _("Key words %(t)s, %(s)s or %(n)s not found in init file.") % {"t": "temporal_type", "s": "semantic_type", "n": "number_of_maps"} ) if line_count != int(init["number_of_maps"]): gs.fatal(_("Number of maps mismatch in init and list file.")) format_ = "GTiff" type_ = "strds" if "stds_type" in init: type_ = init["stds_type"] if "format" in init: format_ = init["format"] if stds_type != type_: gs.fatal(_("The archive file is of wrong space time dataset type")) # Check the existence of the files if format_ == "GTiff": for row in maplist: filename = row["filename"] + ".tif" if not os.path.exists(filename): gs.fatal( _("Unable to find GeoTIFF raster file <%s> in archive.") % filename ) elif format_ == "AAIGrid": for row in maplist: filename = row["filename"] + ".asc" if not os.path.exists(filename): gs.fatal( _("Unable to find AAIGrid raster file <%s> in archive.") % filename ) elif format_ == "GML": for row in maplist: filename = row["filename"] + ".xml" if not os.path.exists(filename): gs.fatal( _("Unable to find GML vector file <%s> in archive.") % filename ) elif format_ == "pack": for row in maplist: if type_ == "stvds": filename = str(row["filename"].split(":")[0]) + ".pack" else: filename = row["filename"] + ".pack" if not os.path.exists(filename): gs.fatal( _("Unable to find GRASS package file <%s> in archive.") % filename ) else: gs.fatal(_("Unsupported input format")) # Check the space time dataset id = output + "@" + mapset sp = dataset_factory(type_, id) if sp.is_in_db() and gs.overwrite() is False: gs.fatal( _( "Space time %(t)s dataset <%(sp)s> is already in" " the database. Use the overwrite flag." ) % {"t": type_, "sp": sp.get_id()} ) # Import the maps if type_ == "strds": if format_ in {"GTiff", "AAIGrid"}: _import_raster_maps_from_gdal( maplist, overr, exp, location, link, format_, set_current_region, memory, ) if format_ == "pack": _import_raster_maps(maplist, set_current_region) elif type_ == "stvds": if format_ == "GML": _import_vector_maps_from_gml(maplist, overr, exp, location, link) if format_ == "pack": _import_vector_maps(maplist) # Create the space time dataset if sp.is_in_db() and gs.overwrite() is True: gs.info( _( "Overwrite space time %(sp)s dataset " "<%(id)s> and unregister all maps." ) % {"sp": sp.get_new_map_instance(None).get_type(), "id": sp.get_id()} ) sp.delete() sp = sp.get_new_instance(id) temporal_type = init["temporal_type"] semantic_type = init["semantic_type"] relative_time_unit = None if temporal_type == "relative": if "relative_time_unit" not in init: gs.fatal( _("Key word %s not found in init file.") % ("relative_time_unit") ) relative_time_unit = init["relative_time_unit"] sp.set_relative_time_unit(relative_time_unit) gs.verbose( _("Create space time %s dataset.") % sp.get_new_map_instance(None).get_type() ) sp.set_initial_values( temporal_type=temporal_type, semantic_type=semantic_type, title=title, description=descr, ) sp.insert() # register the maps fs = "|" register_maps_in_space_time_dataset( type=sp.get_new_map_instance(None).get_type(), name=output, file=new_list_file_name, start="file", end="file", unit=relative_time_unit, dbif=None, fs=fs, update_cmd_list=False, ) os.chdir(old_cwd) # Make sure the location is switched back correctly finally: if location: # Switch to the old location try: gs.run_command( "g.mapset", mapset=old_env["MAPSET"], project=old_env["LOCATION_NAME"], gisdbase=old_env["GISDBASE"], ) except CalledModuleError: gs.warning(_("Switching to original location failed")) gs.set_raise_on_error(old_state)