Source code for pygrass.vector.table

"""
Created on Wed Aug  8 15:29:21 2012

@author: pietro
"""
from __future__ import (
    nested_scopes,
    generators,
    division,
    absolute_import,
    with_statement,
    print_function,
    unicode_literals,
)

import os
import sys

import ctypes
import numpy as np
from sqlite3 import OperationalError

try:
    from collections import OrderedDict
except ImportError:
    from grass.pygrass.orderdict import OrderedDict

import grass.lib.vector as libvect
from grass.pygrass.gis import Mapset
from grass.exceptions import DBError
from grass.pygrass.utils import table_exist, decode
from grass.script.db import db_table_in_vector
from grass.script.core import warning

from grass.pygrass.vector import sql
from grass.lib.ctypes_preamble import ReturnString


if sys.version_info.major >= 3:
    long = int
    unicode = str


# For test purposes
test_vector_name = "table_doctest_map"

DRIVERS = ("sqlite", "pg")


[docs]def get_path(path, vect_name=None): """Return the full path to the database; replacing environment variable with real values :param path: The path with substitutional parameter :param vect_name: The name of the vector map >>> from grass.script.core import gisenv >>> import os >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> new_path = get_path(path) >>> new_path2 = os.path.join(gisenv()['GISDBASE'], gisenv()['LOCATION_NAME'], ... gisenv()['MAPSET'], 'sqlite', 'sqlite.db') >>> new_path.replace("//","/") == new_path2.replace("//","/") True >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/vector/$MAP/sqlite.db' >>> new_path = get_path(path, "test") >>> new_path2 = os.path.join(gisenv()['GISDBASE'], gisenv()['LOCATION_NAME'], ... gisenv()['MAPSET'], 'vector', 'test', 'sqlite.db') >>> new_path.replace("//","/") == new_path2.replace("//","/") True """ if "$" not in path: return path else: mapset = Mapset() path = path.replace("$GISDBASE", mapset.gisdbase) path = path.replace("$LOCATION_NAME", mapset.location) path = path.replace("$MAPSET", mapset.name) if vect_name is not None: path = path.replace("$MAP", vect_name) return path
[docs]class Filters(object): """Help user to build a simple sql query. >>> filter = Filters('table') >>> filter.get_sql() 'SELECT * FROM table;' >>> filter.where("area<10000").get_sql() 'SELECT * FROM table WHERE area<10000;' >>> filter.select("cat", "area").get_sql() 'SELECT cat, area FROM table WHERE area<10000;' >>> filter.order_by("area").limit(10).get_sql() 'SELECT cat, area FROM table WHERE area<10000 ORDER BY area LIMIT 10;' """ def __init__(self, tname): self.tname = tname self._select = None self._where = None self._orderby = None self._limit = None self._groupby = None def __repr__(self): return "Filters(%r)" % self.get_sql()
[docs] def select(self, *args): """Create the select query""" cols = ", ".join(args) if args else "*" select = sql.SELECT[:-1] self._select = select.format(cols=cols, tname=self.tname) return self
[docs] def where(self, condition): """Create the where condition :param condition: the condition of where statement, for example `cat = 1` :type condition: str """ self._where = "WHERE {condition}".format(condition=condition) return self
[docs] def order_by(self, *orderby): """Create the order by condition :param orderby: the name of column/s to order the result :type orderby: str """ self._orderby = "ORDER BY {orderby}".format(orderby=", ".join(orderby)) return self
[docs] def limit(self, number): """Create the limit condition :param number: the number to limit the result :type number: int """ if not isinstance(number, int): raise ValueError("Must be an integer.") else: self._limit = "LIMIT {number}".format(number=number) return self
[docs] def group_by(self, *groupby): """Create the group by condition :param groupby: the name of column/s to group the result :type groupby: str, list """ self._groupby = "GROUP BY {groupby}".format(groupby=", ".join(groupby)) return self
[docs] def get_sql(self): """Return the SQL query""" sql_list = list() if self._select is None: self.select() sql_list.append(self._select) if self._where is not None: sql_list.append(self._where) if self._groupby is not None: sql_list.append(self._groupby) if self._orderby is not None: sql_list.append(self._orderby) if self._limit is not None: sql_list.append(self._limit) return "%s;" % " ".join(sql_list)
[docs] def reset(self): """Clean internal variables""" self._select = None self._where = None self._orderby = None self._limit = None self._groupby = None
[docs]class Columns(object): """Object to work with columns table. It is possible to instantiate a Columns object given the table name and the database connection. For a sqlite table: >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> cols_sqlite = Columns(test_vector_name, ... sqlite3.connect(get_path(path))) >>> cols_sqlite.tname 'table_doctest_map' For a postgreSQL table: >>> import psycopg2 as pg #doctest: +SKIP >>> cols_pg = Columns(test_vector_name, ... pg.connect('host=localhost dbname=grassdb')) #doctest: +SKIP >>> cols_pg.tname #doctest: +SKIP 'table_doctest_map' #doctest: +SKIP """ def __init__(self, tname, connection, key="cat"): self.tname = tname self.conn = connection self.key = key self.odict = None self.update_odict() def __contains__(self, item): return item in self.names() def __repr__(self): return "Columns(%r)" % list(self.items()) def __getitem__(self, key): return self.odict[key] def __setitem__(self, name, new_type): self.cast(name, new_type) self.update_odict(self) def __iter__(self): return self.odict.__iter__() def __len__(self): return self.odict.__len__() def __eq__(self, obj): """Return True if two table have the same columns. >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> connection = sqlite3.connect(get_path(path)) >>> cols0 = Columns(test_vector_name, connection) >>> cols1 = Columns(test_vector_name, connection) >>> cols0 == cols1 True """ return obj.tname == self.tname and obj.odict == self.odict def __ne__(self, other): return not self == other # Restore Python 2 hashing beaviour on Python 3 __hash__ = object.__hash__
[docs] def is_pg(self): """Return True if is a psycopg connection. >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> cols_sqlite = Columns(test_vector_name, ... sqlite3.connect(get_path(path))) >>> cols_sqlite.is_pg() False >>> import psycopg2 as pg #doctest: +SKIP >>> cols_pg = Columns(test_vector_name, ... pg.connect('host=localhost dbname=grassdb')) #doctest: +SKIP >>> cols_pg.is_pg() #doctest: +SKIP True """ return hasattr(self.conn, "xid")
[docs] def update_odict(self): """Read columns name and types from table and update the odict attribute. """ if self.is_pg(): # is a postgres connection cur = self.conn.cursor() cur.execute("SELECT oid,typname FROM pg_type") diz = dict(cur.fetchall()) odict = OrderedDict() import psycopg2 as pg try: cur.execute(sql.SELECT.format(cols="*", tname=self.tname)) descr = cur.description for column in descr: name, ctype = column[:2] odict[name] = diz[ctype] except pg.ProgrammingError: pass self.odict = odict else: # is a sqlite connection cur = self.conn.cursor() cur.execute(sql.PRAGMA.format(tname=self.tname)) descr = cur.fetchall() odict = OrderedDict() for column in descr: name, ctype = column[1:3] odict[name] = ctype self.odict = odict values = ",".join( [ "?", ] * self.__len__() ) kv = ",".join(["%s=?" % k for k in self.odict.keys() if k != self.key]) where = "%s=?" % self.key self.insert_str = sql.INSERT.format(tname=self.tname, values=values) self.update_str = sql.UPDATE_WHERE.format( tname=self.tname, values=kv, condition=where )
[docs] def sql_descr(self, remove=None): """Return a string with description of columns. Remove it is used to remove a columns. >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> cols_sqlite = Columns(test_vector_name, ... sqlite3.connect(get_path(path))) >>> cols_sqlite.sql_descr() # doctest: +ELLIPSIS 'cat INTEGER, name varchar(50), value double precision' >>> import psycopg2 as pg # doctest: +SKIP >>> cols_pg = Columns(test_vector_name, ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP >>> cols_pg.sql_descr() # doctest: +ELLIPSIS +SKIP 'cat INTEGER, name varchar(50), value double precision' """ if remove: return ", ".join( ["%s %s" % (key, val) for key, val in self.items() if key != remove] ) else: return ", ".join(["%s %s" % (key, val) for key, val in self.items()])
[docs] def types(self): """Return a list with the column types. >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> cols_sqlite = Columns(test_vector_name, ... sqlite3.connect(get_path(path))) >>> cols_sqlite.types() # doctest: +ELLIPSIS ['INTEGER', 'varchar(50)', 'double precision'] >>> import psycopg2 as pg # doctest: +SKIP >>> cols_pg = Columns(test_vector_name, ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP >>> cols_pg.types() # doctest: +ELLIPSIS +SKIP ['INTEGER', 'varchar(50)', 'double precision'] """ return list(self.odict.values())
[docs] def names(self, remove=None, unicod=True): """Return a list with the column names. Remove it is used to remove a columns. >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> cols_sqlite = Columns(test_vector_name, ... sqlite3.connect(get_path(path))) >>> cols_sqlite.names() # doctest: +ELLIPSIS ['cat', 'name', 'value'] >>> import psycopg2 as pg # doctest: +SKIP >>> cols_pg = Columns(test_vector_name, # doctest: +SKIP ... pg.connect('host=localhost dbname=grassdb')) >>> cols_pg.names() # doctest: +ELLIPSIS +SKIP ['cat', 'name', 'value'] """ if remove: nams = list(self.odict.keys()) nams.remove(remove) else: nams = list(self.odict.keys()) if unicod: return nams else: return [str(name) for name in nams]
[docs] def items(self): """Return a list of tuple with column name and column type. >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> cols_sqlite = Columns(test_vector_name, ... sqlite3.connect(get_path(path))) >>> cols_sqlite.items() # doctest: +ELLIPSIS [('cat', 'INTEGER'), ('name', 'varchar(50)'), ('value', 'double precision')] >>> import psycopg2 as pg # doctest: +SKIP >>> cols_pg = Columns(test_vector_name, ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP >>> cols_pg.items() # doctest: +ELLIPSIS +SKIP [('cat', 'INTEGER'), ('name', 'varchar(50)'), ('value', 'double precision')] """ return list(self.odict.items())
[docs] def add(self, col_name, col_type): """Add a new column to the table. :param col_name: the name of column to add :type col_name: str :param col_type: the tipe of column to add :type col_type: str >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> from grass.pygrass.utils import copy, remove >>> copy(test_vector_name,'mycensus','vect') >>> cols_sqlite = Columns('mycensus', ... sqlite3.connect(get_path(path))) >>> cols_sqlite.add(['n_pizza'], ['INT']) >>> 'n_pizza' in cols_sqlite True >>> import psycopg2 as pg # doctest: +SKIP >>> cols_pg = Columns('boundary_municp_pg', ... pg.connect('host=localhost dbname=grassdb')) #doctest: +SKIP >>> cols_pg.add('n_pizza', 'INT') # doctest: +SKIP >>> 'n_pizza' in cols_pg # doctest: +SKIP True >>> remove('mycensus', 'vect') """ def check(col_type): """Check the column type if it is supported by GRASS :param col_type: the type of column :type col_type: str """ valid_type = ( "DOUBLE PRECISION", "DOUBLE", "INT", "INTEGER", "DATE", "VARCHAR", ) col = col_type.upper() valid = [col.startswith(tp) for tp in valid_type] if not any(valid): str_err = "Type: %r is not supported." "\nSupported types are: %s" raise TypeError(str_err % (col_type, ", ".join(valid_type))) return col_type col_type = ( [ check(col_type), ] if isinstance(col_type, (str, unicode)) else [check(col) for col in col_type] ) col_name = ( [ col_name, ] if isinstance(col_name, (str, unicode)) else col_name ) sqlcode = [ sql.ADD_COL.format(tname=self.tname, cname=cn, ctype=ct) for cn, ct in zip(col_name, col_type) ] cur = self.conn.cursor() cur.executescript("\n".join(sqlcode)) self.conn.commit() cur.close() self.update_odict()
[docs] def rename(self, old_name, new_name): """Rename a column of the table. :param old_name: the name of existing column :type old_name: str :param new_name: the name of new column :type new_name: str >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> from grass.pygrass.utils import copy, remove >>> copy(test_vector_name,'mycensus','vect') >>> cols_sqlite = Columns('mycensus', ... sqlite3.connect(get_path(path))) >>> cols_sqlite.add(['n_pizza'], ['INT']) >>> 'n_pizza' in cols_sqlite True >>> cols_sqlite.rename('n_pizza', 'n_pizzas') # doctest: +ELLIPSIS >>> 'n_pizza' in cols_sqlite False >>> 'n_pizzas' in cols_sqlite True >>> import psycopg2 as pg # doctest: +SKIP >>> cols_pg = Columns(test_vector_name, ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP >>> cols_pg.rename('n_pizza', 'n_pizzas') # doctest: +SKIP >>> 'n_pizza' in cols_pg # doctest: +SKIP False >>> 'n_pizzas' in cols_pg # doctest: +SKIP True >>> remove('mycensus', 'vect') """ cur = self.conn.cursor() if self.is_pg(): cur.execute( sql.RENAME_COL.format( tname=self.tname, old_name=old_name, new_name=new_name ) ) self.conn.commit() cur.close() self.update_odict() else: cur.execute( sql.ADD_COL.format( tname=self.tname, cname=new_name, ctype=str(self.odict[old_name]) ) ) cur.execute( sql.UPDATE.format(tname=self.tname, new_col=new_name, old_col=old_name) ) self.conn.commit() cur.close() self.update_odict() self.drop(old_name)
[docs] def cast(self, col_name, new_type): """Change the column type. :param col_name: the name of column :type col_name: str :param new_type: the new type of column :type new_type: str >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> from grass.pygrass.utils import copy, remove >>> copy(test_vector_name,'mycensus','vect') >>> cols_sqlite = Columns('mycensus', ... sqlite3.connect(get_path(path))) >>> cols_sqlite.add(['n_pizzas'], ['INT']) >>> cols_sqlite.cast('n_pizzas', 'float8') # doctest: +ELLIPSIS Traceback (most recent call last): ... grass.exceptions.DBError: SQLite does not support to cast columns. >>> import psycopg2 as pg # doctest: +SKIP >>> cols_pg = Columns(test_vector_name, ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP >>> cols_pg.cast('n_pizzas', 'float8') # doctest: +SKIP >>> cols_pg['n_pizzas'] # doctest: +SKIP 'float8' >>> remove('mycensus', 'vect') .. warning :: It is not possible to cast a column with sqlite """ if self.is_pg(): cur = self.conn.cursor() cur.execute( sql.CAST_COL.format(tname=self.tname, col=col_name, ctype=new_type) ) self.conn.commit() cur.close() self.update_odict() else: # sqlite does not support rename columns: raise DBError("SQLite does not support to cast columns.")
[docs] def drop(self, col_name): """Drop a column from the table. :param col_name: the name of column to remove :type col_name: str >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> from grass.pygrass.utils import copy, remove >>> copy(test_vector_name,'mycensus','vect') >>> cols_sqlite = Columns('mycensus', ... sqlite3.connect(get_path(path))) >>> cols_sqlite.drop('name') # doctest: +ELLIPSIS >>> 'name' in cols_sqlite False >>> import psycopg2 as pg # doctest: +SKIP >>> cols_pg = Columns(test_vector_name, ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP >>> cols_pg.drop('name') # doctest: +SKIP >>> 'name' in cols_pg # doctest: +SKIP False >>> remove('mycensus','vect') """ cur = self.conn.cursor() if self.is_pg(): cur.execute(sql.DROP_COL.format(tname=self.tname, cname=col_name)) else: desc = str(self.sql_descr(remove=col_name)) names = ", ".join(self.names(remove=col_name, unicod=False)) queries = sql.DROP_COL_SQLITE.format( tname=self.tname, keycol=self.key, coldef=desc, colnames=names ).split("\n") for query in queries: cur.execute(query) self.conn.commit() cur.close() self.update_odict()
[docs]class Table(object): """ >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/PERMANENT/sqlite/sqlite.db' >>> tab_sqlite = Table(name=test_vector_name, ... connection=sqlite3.connect(get_path(path))) >>> tab_sqlite.name 'table_doctest_map' >>> import psycopg2 # doctest: +SKIP >>> tab_pg = Table(test_vector_name, ... psycopg2.connect('host=localhost dbname=grassdb', ... 'pg')) # doctest: +SKIP >>> tab_pg.columns # doctest: +ELLIPSIS +SKIP Columns([('cat', 'int4'), ...]) """ def _get_name(self): """Private method to return the name of table""" return self._name def _set_name(self, new_name): """Private method to set the name of table :param new_name: the new name of table :type new_name: str """ old_name = self._name cur = self.conn.cursor() cur.execute(sql.RENAME_TAB.format(old_name=old_name, new_name=new_name)) self.conn.commit() cur.close() name = property(fget=_get_name, fset=_set_name, doc="Set and obtain table name") def __init__(self, name, connection, key="cat"): self._name = name self.conn = connection self.key = key self.columns = Columns(self.name, self.conn, self.key) self.filters = Filters(self.name) def __repr__(self): """ >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/PERMANENT/sqlite/sqlite.db' >>> tab_sqlite = Table(name=test_vector_name, ... connection=sqlite3.connect(get_path(path))) >>> tab_sqlite Table('table_doctest_map') """ return "Table(%r)" % (self.name) def __iter__(self): cur = self.execute() return (cur.fetchone() for _ in range(self.__len__())) def __len__(self): """Return the number of rows""" return self.n_rows()
[docs] def drop(self, cursor=None, force=False): """Method to drop table from database :param cursor: the cursor to connect, if None it use the cursor of connection table object :type cursor: Cursor object :param force: True to remove the table, by default False to print advice :type force: bool """ cur = cursor if cursor else self.conn.cursor() if self.exist(cursor=cur): used = db_table_in_vector(self.name) if used is not None and len(used) > 0 and not force: print( _("Deleting table <%s> which is attached" " to following map(s):") % self.name ) for vect in used: warning("%s" % vect) print( _("You must use the force flag to actually" " remove it. Exiting.") ) else: cur.execute(sql.DROP_TAB.format(tname=self.name))
[docs] def n_rows(self): """Return the number of rows >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> tab_sqlite = Table(name=test_vector_name, ... connection=sqlite3.connect(get_path(path))) >>> tab_sqlite.n_rows() 3 """ cur = self.conn.cursor() cur.execute(sql.SELECT.format(cols="Count(*)", tname=self.name)) number = cur.fetchone()[0] cur.close() return number
[docs] def execute(self, sql_code=None, cursor=None, many=False, values=None): """Execute SQL code from a given string or build with filters and return a cursor object. :param sql_code: the SQL code to execute, if not pass it use filters variable :type sql_code: str :param cursor: the cursor to connect, if None it use the cursor of connection table object :type cursor: Cursor object :param many: True to run executemany function :type many: bool :param values: The values to substitute into sql_code string :type values: list of tuple >>> import sqlite3 >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db' >>> tab_sqlite = Table(name=test_vector_name, ... connection=sqlite3.connect(get_path(path))) >>> tab_sqlite.filters.select('cat', 'name').order_by('value') Filters('SELECT cat, name FROM table_doctest_map ORDER BY value;') >>> cur = tab_sqlite.execute() >>> cur.fetchone() #doctest: +ELLIPSIS +NORMALIZE_WHITESPACE (1, 'point') """ try: sqlc = sql_code if sql_code else self.filters.get_sql() cur = cursor if cursor else self.conn.cursor() if many and values: return cur.executemany(sqlc, values) return cur.execute(sqlc, values) if values else cur.execute(sqlc) except Exception as exc: raise ValueError( "The SQL statement is not correct:\n%r,\n" "values: %r,\n" "SQL error: %s" % (sqlc, values, str(exc)) )
[docs] def exist(self, cursor=None): """Return True if the table already exist in the DB, False otherwise :param cursor: the cursor to connect, if None it use the cursor of connection table object """ cur = cursor if cursor else self.conn.cursor() return table_exist(cur, self.name)
[docs] def insert(self, values, cursor=None, many=False): """Insert a new row :param values: a tuple of values to insert, it is possible to insert more rows using a list of tuple and parameter `many` :type values: tuple :param cursor: the cursor to connect, if None it use the cursor of connection table object :type cursor: Cursor object :param many: True to run executemany function :type many: bool """ cur = cursor if cursor else self.conn.cursor() if many: return cur.executemany(self.columns.insert_str, values) return cur.execute(self.columns.insert_str, values)
[docs] def update(self, key, values, cursor=None): """Update a table row :param key: the rowid :type key: int :param values: the values to insert without row id. For example if we have a table with four columns: cat, c0, c1, c2 the values list should containing only c0, c1, c2 values. :type values: list :param cursor: the cursor to connect, if None it use the cursor of connection table object :type cursor: Cursor object """ cur = cursor if cursor else self.conn.cursor() vals = list(values) + [ key, ] return cur.execute(self.columns.update_str, vals)
[docs] def create(self, cols, name=None, overwrite=False, cursor=None): """Create a new table :param cols: :type cols: :param name: the name of table to create, None for the name of Table object :type name: str :param overwrite: overwrite existing table :type overwrite: bool :param cursor: the cursor to connect, if None it use the cursor of connection table object :type cursor: Cursor object """ cur = cursor if cursor else self.conn.cursor() coldef = ",\n".join(["%s %s" % col for col in cols]) if name: newname = name else: newname = self.name try: cur.execute(sql.CREATE_TAB.format(tname=newname, coldef=coldef)) self.conn.commit() except OperationalError: # OperationalError if overwrite: self.drop(force=True) cur.execute(sql.CREATE_TAB.format(tname=newname, coldef=coldef)) self.conn.commit() else: print("The table: %s already exist." % self.name) cur.close() self.columns.update_odict()
if __name__ == "__main__": import doctest from grass.pygrass import utils utils.create_test_vector_map(test_vector_name) doctest.testmod() """Remove the generated vector map, if exist""" from grass.pygrass.utils import get_mapset_vector from grass.script.core import run_command mset = get_mapset_vector(test_vector_name, mapset="") if mset: run_command("g.remove", flags="f", type="vector", name=test_vector_name)