Source code for pygrass.rpc.base

"""
Fast and exit-safe interface to PyGRASS Raster and Vector layer
using multiprocessing

(C) 2015-2024 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.

:authors: Soeren Gebbert
"""

from __future__ import annotations

import logging
import sys
import threading
import time
from multiprocessing import Lock, Pipe, Process
from typing import TYPE_CHECKING, NoReturn

from grass.exceptions import FatalError

if TYPE_CHECKING:
    from multiprocessing.connection import Connection
    from multiprocessing.synchronize import _LockLike


logger: logging.Logger = logging.getLogger(__name__)


###############################################################################


[docs]def dummy_server(lock: _LockLike, conn: Connection) -> NoReturn: """Dummy server process :param lock: A multiprocessing.Lock :param conn: A multiprocessing.connection.Connection object obtained from multiprocessing.Pipe """ while True: # Avoid busy waiting conn.poll(None) data = conn.recv() with lock: if data[0] == 0: conn.close() sys.exit() if data[0] == 1: msg = "Server process intentionally killed by exception" raise Exception(msg)
[docs]class RPCServerBase: """This is the base class for send and receive RPC server It uses a Pipe for IPC. >>> import grass.script as gscript >>> from grass.pygrass.rpc.base import RPCServerBase >>> import time >>> provider = RPCServerBase() >>> provider.is_server_alive() True >>> provider.is_check_thread_alive() True >>> provider.stop() >>> time.sleep(1) >>> provider.is_server_alive() False >>> provider.is_check_thread_alive() False >>> provider = RPCServerBase() >>> provider.is_server_alive() True >>> provider.is_check_thread_alive() True Kill the server process with an exception, it should restart >>> provider.client_conn.send([1]) >>> provider.is_server_alive() True >>> provider.is_check_thread_alive() True """ def __init__(self) -> None: self.client_conn: Connection | None = None self.server_conn: Connection | None = None self.queue = None self.server = None self.checkThread: threading.Thread | None = None self.threadLock = threading.Lock() self.start_server() self.start_checker_thread() self.stopThread = False self.stopped = True # logging.basicConfig(level=logging.DEBUG)
[docs] def is_server_alive(self): return self.server.is_alive() if self.server is not None else False
[docs] def is_check_thread_alive(self): return self.checkThread.is_alive() if self.checkThread is not None else False
[docs] def start_checker_thread(self): if self.checkThread is not None and self.checkThread.is_alive(): self.stop_checker_thread() self.checkThread = threading.Thread(target=self.thread_checker) self.checkThread.daemon = True self.stopThread = False self.checkThread.start()
[docs] def stop_checker_thread(self): with self.threadLock: self.stopThread = True if self.checkThread is not None: self.checkThread.join(None)
[docs] def thread_checker(self): """Check every 200 micro seconds if the server process is alive""" while True: time.sleep(0.2) self._check_restart_server(caller="Server check thread") with self.threadLock: if self.stopThread is True: return
[docs] def start_server(self): """This function must be re-implemented in the subclasses""" logger.debug("Start the libgis server") self.client_conn, self.server_conn = Pipe(True) self.lock = Lock() self.server = Process(target=dummy_server, args=(self.lock, self.server_conn)) self.server.daemon = True self.server.start()
[docs] def check_server(self): self._check_restart_server()
def _check_restart_server(self, caller="main thread") -> None: """Restart the server if it was terminated""" logger.debug("Check libgis server restart") with self.threadLock: if self.server is not None and self.server.is_alive() is True: return if self.client_conn is not None: self.client_conn.close() if self.server_conn is not None: self.server_conn.close() self.start_server() if self.stopped is not True: logger.warning( "Needed to restart the libgis server, caller: %(caller)s", {"caller": caller}, ) self.stopped = False
[docs] def safe_receive(self, message): """Receive the data and throw a FatalError exception in case the server process was killed and the pipe was closed by the checker thread""" if logger.isEnabledFor(logging.DEBUG): logger.debug("Receive message: %s", message) try: ret = self.client_conn.recv() if isinstance(ret, FatalError): raise ret return ret except (EOFError, OSError, FatalError) as e: # The pipe was closed by the checker thread because # the server process was killed raise FatalError("Exception raised: " + str(e) + " Message: " + message)
[docs] def stop(self): """Stop the check thread, the libgis server and close the pipe This method should be called at exit using the package atexit """ logger.debug("Stop libgis server") self.stop_checker_thread() if self.server is not None and self.server.is_alive(): if self.client_conn is not None: self.client_conn.send( [ 0, ] ) self.server.terminate() if self.client_conn is not None: self.client_conn.close() self.stopped = True
if __name__ == "__main__": import doctest doctest.testmod()