Source code for grass.pygrass.rpc.base
"""
Fast and exit-safe interface to PyGRASS Raster and Vector layer
using multiprocessing
(C) 2015 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.
:authors: Soeren Gebbert
"""
from grass.exceptions import FatalError
import time
import threading
import sys
from multiprocessing import Process, Lock, Pipe
import logging
###############################################################################
[docs]def dummy_server(lock, conn):
"""Dummy server process
:param lock: A multiprocessing.Lock
:param conn: A multiprocessing.Pipe
"""
while True:
# Avoid busy waiting
conn.poll(None)
data = conn.recv()
lock.acquire()
if data[0] == 0:
conn.close()
lock.release()
sys.exit()
if data[0] == 1:
raise Exception("Server process intentionally killed by exception")
lock.release()
[docs]class RPCServerBase:
"""This is the base class for send and receive RPC server
It uses a Pipe for IPC.
>>> import grass.script as gscript
>>> from grass.pygrass.rpc.base import RPCServerBase
>>> import time
>>> provider = RPCServerBase()
>>> provider.is_server_alive()
True
>>> provider.is_check_thread_alive()
True
>>> provider.stop()
>>> time.sleep(1)
>>> provider.is_server_alive()
False
>>> provider.is_check_thread_alive()
False
>>> provider = RPCServerBase()
>>> provider.is_server_alive()
True
>>> provider.is_check_thread_alive()
True
Kill the server process with an exception, it should restart
>>> provider.client_conn.send([1])
>>> provider.is_server_alive()
True
>>> provider.is_check_thread_alive()
True
"""
def __init__(self):
self.client_conn = None
self.server_conn = None
self.queue = None
self.server = None
self.checkThread = None
self.threadLock = threading.Lock()
self.start_server()
self.start_checker_thread()
self.stopThread = False
self.stopped = True
# logging.basicConfig(level=logging.DEBUG)
[docs] def is_server_alive(self):
return self.server.is_alive()
[docs] def is_check_thread_alive(self):
return self.checkThread.is_alive()
[docs] def start_checker_thread(self):
if self.checkThread is not None and self.checkThread.is_alive():
self.stop_checker_thread()
self.checkThread = threading.Thread(target=self.thread_checker)
self.checkThread.daemon = True
self.stopThread = False
self.checkThread.start()
[docs] def stop_checker_thread(self):
self.threadLock.acquire()
self.stopThread = True
self.threadLock.release()
self.checkThread.join(None)
[docs] def thread_checker(self):
"""Check every 200 micro seconds if the server process is alive"""
while True:
time.sleep(0.2)
self._check_restart_server(caller="Server check thread")
self.threadLock.acquire()
if self.stopThread is True:
self.threadLock.release()
return
self.threadLock.release()
[docs] def start_server(self):
"""This function must be re-implemented in the subclasses"""
logging.debug("Start the libgis server")
self.client_conn, self.server_conn = Pipe(True)
self.lock = Lock()
self.server = Process(target=dummy_server, args=(self.lock, self.server_conn))
self.server.daemon = True
self.server.start()
[docs] def check_server(self):
self._check_restart_server()
def _check_restart_server(self, caller="main thread"):
"""Restart the server if it was terminated"""
logging.debug("Check libgis server restart")
self.threadLock.acquire()
if self.server.is_alive() is True:
self.threadLock.release()
return
self.client_conn.close()
self.server_conn.close()
self.start_server()
if self.stopped is not True:
logging.warning(
"Needed to restart the libgis server, caller: %s" % (caller)
)
self.threadLock.release()
self.stopped = False
[docs] def safe_receive(self, message):
"""Receive the data and throw a FatalError exception in case the server
process was killed and the pipe was closed by the checker thread"""
logging.debug("Receive message: {message}")
try:
ret = self.client_conn.recv()
if isinstance(ret, FatalError):
raise ret
return ret
except (EOFError, OSError, FatalError) as e:
# The pipe was closed by the checker thread because
# the server process was killed
raise FatalError("Exception raised: " + str(e) + " Message: " + message)
[docs] def stop(self):
"""Stop the check thread, the libgis server and close the pipe
This method should be called at exit using the package atexit
"""
logging.debug("Stop libgis server")
self.stop_checker_thread()
if self.server is not None and self.server.is_alive():
self.client_conn.send(
[
0,
]
)
self.server.terminate()
if self.client_conn is not None:
self.client_conn.close()
self.stopped = True
if __name__ == "__main__":
import doctest
doctest.testmod()