Source code for pygrass.tests.benchmark

"""
Created on Sat Jun 16 20:24:56 2012

@author: soeren
"""

import optparse

# import numpy as np
import time
import collections
import copy
import cProfile
import sys
from jinja2 import Template
from pathlib import Path


sys.path.append(str(Path.cwd()))
sys.path.append("%s/.." % (str(Path.cwd())))


# flake8: noqa: E402
import grass.lib.gis as libgis
import grass.lib.raster as libraster
import grass.script as gs
import ctypes

# flake8: qa


[docs]def test__RasterSegment_value_access__if(): test_a = pygrass.RasterSegment(name="test_a") test_a.open(mode="r") test_c = pygrass.RasterSegment(name="test_c") test_c.open(mode="w", mtype="CELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) for col in range(test_a.cols): test_c.put(row, col, buff_a[col] > 50) test_a.close() test_c.close()
[docs]def test__RasterSegment_value_access__add(): test_a = pygrass.RasterSegment(name="test_a") test_a.open(mode="r") test_b = pygrass.RasterSegment(name="test_b") test_b.open(mode="r") test_c = pygrass.RasterSegment(name="test_c") test_c.open(mode="w", mtype="DCELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) buff_b = pygrass.Buffer(test_b.cols, test_b.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) test_b.get_row(row, buff_b) for col in range(test_a.cols): test_c.put(row, col, buff_a[col] + buff_b[col]) test_a.close() test_b.close() test_c.close()
[docs]def test__RasterSegment_row_access__if(): test_a = pygrass.RasterSegment(name="test_a") test_a.open(mode="r") test_c = pygrass.RasterSegment(name="test_c") test_c.open(mode="w", mtype="CELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) test_c.put_row(row, buff_a > 50) test_a.close() test_c.close()
[docs]def test__RasterSegment_row_access__add(): test_a = pygrass.RasterSegment(name="test_a") test_a.open(mode="r") test_b = pygrass.RasterSegment(name="test_b") test_b.open(mode="r") test_c = pygrass.RasterSegment(name="test_c") test_c.open(mode="w", mtype="DCELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) buff_b = pygrass.Buffer(test_b.cols, test_b.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) test_b.get_row(row, buff_b) test_c.put_row(row, buff_a + buff_b) test_a.close() test_b.close() test_c.close()
[docs]def test__RasterRow_value_access__add(): test_a = pygrass.RasterRow(name="test_a") test_a.open(mode="r") test_b = pygrass.RasterRow(name="test_b") test_b.open(mode="r") test_c = pygrass.RasterRow(name="test_c") test_c.open(mode="w", mtype="FCELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) buff_b = pygrass.Buffer(test_b.cols, test_b.mtype) buff_c = pygrass.Buffer(test_b.cols, test_b.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) test_b.get_row(row, buff_b) for col in range(test_a.cols): buff_c[col] = buff_a[col] + buff_b[col] test_c.put_row(buff_c) test_a.close() test_b.close() test_c.close()
[docs]def test__RasterRow_value_access__if(): test_a = pygrass.RasterRow(name="test_a") test_a.open(mode="r") test_c = pygrass.RasterRow(name="test_c") test_c.open(mode="w", mtype="CELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) buff_c = pygrass.Buffer(test_a.cols, test_a.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) for col in range(test_a.cols): buff_c[col] = buff_a[col] > 50 test_c.put_row(buff_c) test_a.close() test_c.close()
[docs]def test__RasterRowIO_row_access__add(): test_a = pygrass.RasterRowIO(name="test_a") test_a.open(mode="r") test_b = pygrass.RasterRowIO(name="test_b") test_b.open(mode="r") test_c = pygrass.RasterRowIO(name="test_c") test_c.open(mode="w", mtype="FCELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) buff_b = pygrass.Buffer(test_b.cols, test_b.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) test_b.get_row(row, buff_b) test_c.put_row(buff_a + buff_b) test_a.close() test_b.close() test_c.close()
[docs]def test__RasterRowIO_row_access__if(): test_a = pygrass.RasterRowIO(name="test_a") test_a.open(mode="r") test_c = pygrass.RasterRowIO(name="test_c") test_c.open(mode="w", mtype="CELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) test_c.put_row(buff_a > 50) test_a.close() test_c.close()
[docs]def test__RasterRow_row_access__add(): test_a = pygrass.RasterRow(name="test_a") test_a.open(mode="r") test_b = pygrass.RasterRow(name="test_b") test_b.open(mode="r") test_c = pygrass.RasterRow(name="test_c") test_c.open(mode="w", mtype="FCELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) buff_b = pygrass.Buffer(test_b.cols, test_b.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) test_b.get_row(row, buff_b) test_c.put_row(buff_a + buff_b) test_a.close() test_b.close() test_c.close()
[docs]def test__RasterRow_row_access__if(): test_a = pygrass.RasterRow(name="test_a") test_a.open(mode="r") test_c = pygrass.RasterRow(name="test_c") test_c.open(mode="w", mtype="CELL", overwrite=True) buff_a = pygrass.Buffer(test_a.cols, test_a.mtype) for row in range(test_a.rows): test_a.get_row(row, buff_a) test_c.put_row(buff_a > 50) test_a.close() test_c.close()
[docs]def test__mapcalc__add(): gs.mapcalc("test_c = test_a + test_b", quite=True, overwrite=True)
[docs]def test__mapcalc__if(): gs.mapcalc("test_c = if(test_a > 50, 1, 0)", quite=True, overwrite=True)
[docs]def mytimer(func, runs=1): times = [] t = 0.0 for _ in range(runs): start = time.time() func() end = time.time() times.append(end - start) t = t + end - start return t / runs, times
[docs]def run_benchmark(resolution_list, runs, testdict, profile): regions = [] for resolution in resolution_list: gs.use_temp_region() gs.run_command("g.region", e=50, w=-50, n=50, s=-50, res=resolution, flags="p") # Adjust the computational region for this process region = libgis.Cell_head() libraster.Rast_get_window(ctypes.byref(region)) region.e = 50 region.w = -50 region.n = 50 region.s = -50 region.ew_res = resolution region.ns_res = resolution libgis.G_adjust_Cell_head(ctypes.byref(region), 0, 0) libraster.Rast_set_window(ctypes.byref(region)) libgis.G_set_window(ctypes.byref(region)) # Create two raster maps with random numbers gs.mapcalc("test_a = rand(0, 100)", quite=True, overwrite=True) gs.mapcalc("test_b = rand(0.0, 1.0)", quite=True, overwrite=True) result = collections.OrderedDict() result["res"] = resolution result["cols"] = region.cols result["rows"] = region.rows result["cells"] = region.rows * region.cols result["results"] = copy.deepcopy(testdict) for execmode, operation in result["results"].items(): print(execmode) for oper, operdict in operation.items(): operdict["time"], operdict["times"] = mytimer(operdict["func"], runs) if profile: filename = "{0}_{1}_{2}".format(execmode, oper, profile) cProfile.runctx( operdict["func"].__name__ + "()", globals(), locals(), filename=filename, ) print(" {0}: {1: 40.6f}s".format(oper, operdict["time"])) del operdict["func"] regions.append(result) gs.del_temp_region() return regions
[docs]def get_testlist(loc): testlist = [test for test in list(loc.keys()) if "test" in test[:5]] testlist.sort() return testlist
[docs]def get_testdict(testlist): testdict = collections.OrderedDict() for testfunc in testlist: # import pdb; pdb.set_trace() dummy, execmode, operation = testfunc.split("__") if execmode in list(testdict.keys()): testdict[execmode][operation] = collections.OrderedDict() testdict[execmode][operation]["func"] = loc[testfunc] else: testdict[execmode] = collections.OrderedDict() testdict[execmode][operation] = collections.OrderedDict() testdict[execmode][operation]["func"] = loc[testfunc] return testdict
TXT = """ {% for region in regions %} {{ '#'*60 }} # Benchmark cols = {{ region.cols }} rows = {{ region.rows }} cells = {{ region.cells }} {{ '#'*60 }} # equation: c = a + b {% for execmode, operation in region.results.iteritems() %} {{ "%-30s - %5s % 12.6fs"|format(execmode, 'add', operation.add.time) }} {%- endfor %} # equation: c = if a > 50 then 1 else 0 {% for execmode, operation in region.results.iteritems() %} {{ "%-30s - %5s % 12.6fs"|format(execmode, 'if', operation.if.time) }} {%- endfor %} {%- endfor %} """ CSV = """Class; Mode; Operation; """ RST = """ """ # >>> txt = Template(TxT) # >>> txt.render(name='John Doe')
[docs]def get_txt(results): txt = Template(TXT) return txt.render(regions=results)
# classes for required options strREQUIRED = "required"
[docs]class OptionWithDefault(optparse.Option): ATTRS = optparse.Option.ATTRS + [strREQUIRED] def __init__(self, *opts, **attrs): if attrs.get(strREQUIRED): attrs["help"] = "(Required) " + attrs.get("help", "") optparse.Option.__init__(self, *opts, **attrs)
[docs]class OptionParser(optparse.OptionParser): def __init__(self, **kwargs): kwargs["option_class"] = OptionWithDefault optparse.OptionParser.__init__(self, **kwargs)
[docs] def check_values(self, values, args): for option in self.option_list: if hasattr(option, strREQUIRED) and option.required: if not getattr(values, option.dest): self.error("option {} is required".format(str(option))) return optparse.OptionParser.check_values(self, values, args)
[docs]def main(testdict): """Main function""" # usage usage = "usage: %prog [options] raster_map" parser = OptionParser(usage=usage) # ntime parser.add_option( "-n", "--ntimes", dest="ntime", default=5, type="int", help="Number of run for each test.", ) # res parser.add_option( "-r", "--resolution", action="store", type="string", dest="res", default="1,0.25", help="Resolution list separate by comma.", ) # fmt parser.add_option( "-f", "--fmt", action="store", type="string", dest="fmt", default="txt", help="Choose the output format: 'txt', 'csv', 'rst'.", ) # output parser.add_option( "-o", "--output", action="store", type="string", dest="output", help="The output filename.", ) # store parser.add_option( "-s", "--store", action="store", type="string", dest="store", help="The filename of pickle obj.", ) # profile parser.add_option( "-p", "--profile", action="store", type="string", dest="profile", help="The filename of the profile results.", ) # return options and argument options, args = parser.parse_args() res = [float(r) for r in options.res.split(",")] # res = [1, 0.25, 0.1, 0.05] results = run_benchmark(res, options.ntime, testdict, options.profile) if options.store: import pickle output = open(options.store, "wb") pickle.dump(results, output) output.close() # import pdb; pdb.set_trace() print(get_txt(results))
# add options if __name__ == "__main__": # import pdb; pdb.set_trace() loc = locals() testlist = get_testlist(loc) testdict = get_testdict(testlist) # print_test(testdict) # import pdb; pdb.set_trace() main(testdict)