"""
@package grass.pygrass.messages
@brief PyGRASS message interface
Fast and exit-safe interface to GRASS C-library message functions
(C) 2013-2024 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.
@author Soeren Gebbert, Edouard Choinière
"""
from __future__ import annotations
import sys
from multiprocessing import Lock, Pipe, Process
from typing import TYPE_CHECKING, Literal, NoReturn
import grass.lib.gis as libgis
from grass.exceptions import FatalError
if TYPE_CHECKING:
from multiprocessing.connection import Connection
from multiprocessing.context import _LockLike
_MessagesLiteral = Literal[
"INFO", "IMPORTANT", "VERBOSE", "WARNING", "ERROR", "FATAL"
]
[docs]def message_server(lock: _LockLike, conn: Connection) -> NoReturn:
"""The GRASS message server function designed to be a target for
multiprocessing.Process
:param lock: A multiprocessing.Lock
:param conn: A multiprocessing.connection.Connection object obtained from
multiprocessing.Pipe
This function will use the G_* message C-functions from grass.lib.gis
to provide an interface to the GRASS C-library messaging system.
The data that is sent through the pipe must provide an
identifier string to specify which C-function should be called.
The following identifiers are supported:
- "INFO" Prints an info message, see G_message() for details
- "IMPORTANT" Prints an important info message,
see G_important_message() for details
- "VERBOSE" Prints a verbose message if the verbosity level is
set accordingly, see G_verbose_message() for details
- "WARNING" Prints a warning message, see G_warning() for details
- "ERROR" Prints a message with a leading "ERROR: " string,
see G_important_message() for details
- "PERCENT" Prints a percent value based on three integer values: n, d and s
see G_percent() for details
- "STOP" Stops the server function and closes the pipe
- "FATAL" Calls G_fatal_error(), this functions is only for
testing purpose
The data that is sent through the pipe must be a list of values:
- Messages: ["INFO|IMPORTANT|VERBOSE|WARNING|ERROR|FATAL", "MESSAGE"]
- Debug: ["DEBUG", level, "MESSAGE"]
- Percent: ["PERCENT", n, d, s]
"""
libgis.G_debug(1, "Start messenger server")
while True:
# Avoid busy waiting
conn.poll(None)
data = conn.recv()
message_type: Literal[_MessagesLiteral, "DEBUG", "PERCENT", "STOP"] = data[0]
# Only one process is allowed to write to stderr
with lock:
# Stop the pipe and the infinite loop
if message_type == "STOP":
conn.close()
libgis.G_debug(1, "Stop messenger server")
sys.exit()
if message_type == "PERCENT":
n = int(data[1])
d = int(data[2])
s = int(data[3])
libgis.G_percent(n, d, s)
continue
if message_type == "DEBUG":
level = int(data[1])
message_debug = data[2]
libgis.G_debug(level, message_debug)
continue
message: str = data[1]
if message_type == "VERBOSE":
libgis.G_verbose_message(message)
elif message_type == "INFO":
libgis.G_message(message)
elif message_type == "IMPORTANT":
libgis.G_important_message(message)
elif message_type == "WARNING":
libgis.G_warning(message)
elif message_type == "ERROR":
libgis.G_important_message("ERROR: %s" % message)
# This is for testing only
elif message_type == "FATAL":
libgis.G_fatal_error(message)
[docs]class Messenger:
"""Fast and exit-safe interface to GRASS C-library message functions
This class implements a fast and exit-safe interface to the GRASS
C-library message functions like: G_message(), G_warning(),
G_important_message(), G_verbose_message(), G_percent() and G_debug().
Note:
The C-library message functions a called via ctypes in a subprocess
using a pipe (multiprocessing.Pipe) to transfer the text messages.
Hence, the process that uses the Messenger interface will not be
exited, if a G_fatal_error() was invoked in the subprocess.
In this case the Messenger object will simply start a new subprocess
and restarts the pipeline.
Usage:
>>> msgr = Messenger()
>>> msgr.debug(0, "debug 0")
>>> msgr.verbose("verbose message")
>>> msgr.message("message")
>>> msgr.important("important message")
>>> msgr.percent(1, 1, 1)
>>> msgr.warning("Ohh")
>>> msgr.error("Ohh no")
>>> msgr = Messenger()
>>> msgr.fatal("Ohh no no no!")
Traceback (most recent call last):
File "__init__.py", line 239, in fatal
sys.exit(1)
SystemExit: 1
>>> msgr = Messenger(raise_on_error=True)
>>> msgr.fatal("Ohh no no no!")
Traceback (most recent call last):
File "__init__.py", line 241, in fatal
raise FatalError(message)
grass.exceptions.FatalError: Ohh no no no!
>>> msgr = Messenger(raise_on_error=True)
>>> msgr.set_raise_on_error(False)
>>> msgr.fatal("Ohh no no no!")
Traceback (most recent call last):
File "__init__.py", line 239, in fatal
sys.exit(1)
SystemExit: 1
>>> msgr = Messenger(raise_on_error=False)
>>> msgr.set_raise_on_error(True)
>>> msgr.fatal("Ohh no no no!")
Traceback (most recent call last):
File "__init__.py", line 241, in fatal
raise FatalError(message)
grass.exceptions.FatalError: Ohh no no no!
"""
client_conn: Connection
server_conn: Connection
server: Process
def __init__(self, raise_on_error: bool = False) -> None:
self.raise_on_error = raise_on_error
self.client_conn, self.server_conn = Pipe()
self.lock = Lock()
self.server = Process(target=message_server, args=(self.lock, self.server_conn))
self.server.daemon = True
self.server.start()
[docs] def start_server(self) -> None:
"""Start the messenger server and open the pipe"""
self.client_conn, self.server_conn = Pipe()
self.lock = Lock()
self.server = Process(target=message_server, args=(self.lock, self.server_conn))
self.server.daemon = True
self.server.start()
def _check_restart_server(self) -> None:
"""Restart the server if it was terminated"""
if self.server.is_alive() is True:
return
self.client_conn.close()
self.server_conn.close()
self.start_server()
self.warning("Needed to restart the messenger server")
[docs] def message(self, message: str) -> None:
"""Send a message to stderr
:param message: the text of message
G_message() will be called in the messenger server process
"""
self._check_restart_server()
self.client_conn.send(["INFO", message])
[docs] def verbose(self, message: str) -> None:
"""Send a verbose message to stderr
:param message: the text of message
G_verbose_message() will be called in the messenger server process
"""
self._check_restart_server()
self.client_conn.send(["VERBOSE", message])
[docs] def important(self, message: str) -> None:
"""Send an important message to stderr
:param message: the text of message
G_important_message() will be called in the messenger server process
"""
self._check_restart_server()
self.client_conn.send(["IMPORTANT", message])
[docs] def warning(self, message: str) -> None:
"""Send a warning message to stderr
:param message: the text of message
G_warning() will be called in the messenger server process
"""
self._check_restart_server()
self.client_conn.send(["WARNING", message])
[docs] def error(self, message: str) -> None:
"""Send an error message to stderr
:param message: the text of message
G_important_message() with an additional "ERROR:" string at
the start will be called in the messenger server process
"""
self._check_restart_server()
self.client_conn.send(["ERROR", message])
[docs] def fatal(self, message: str) -> NoReturn:
"""Send an error message to stderr, call sys.exit(1) or raise FatalError
:param message: the text of message
This function emulates the behavior of G_fatal_error(). It prints
an error message to stderr and calls sys.exit(1). If raise_on_error
is set True while creating the messenger object, a FatalError
exception will be raised instead of calling sys.exit(1).
"""
self._check_restart_server()
self.client_conn.send(["ERROR", message])
self.stop()
if self.raise_on_error is True:
raise FatalError(message)
sys.exit(1)
[docs] def debug(self, level: int, message: str) -> None:
"""Send a debug message to stderr
:param message: the text of message
G_debug() will be called in the messenger server process
"""
self._check_restart_server()
self.client_conn.send(["DEBUG", level, message])
[docs] def percent(self, n: int, d: int, s: int) -> None:
"""Send a percentage to stderr
:param n: The current element
:param d: Total number of elements
:param s: Increment size
G_percent() will be called in the messenger server process
"""
self._check_restart_server()
self.client_conn.send(["PERCENT", n, d, s])
[docs] def stop(self) -> None:
"""Stop the messenger server and close the pipe"""
if self.server is not None and self.server.is_alive():
self.client_conn.send(
[
"STOP",
]
)
self.server.join(5)
self.server.terminate()
if self.client_conn is not None:
self.client_conn.close()
[docs] def set_raise_on_error(self, raise_on_error: bool = True) -> None:
"""Set the fatal error behavior
:param raise_on_error: if True a FatalError exception will be
raised instead of calling sys.exit(1)
- If raise_on_error == True, a FatalError exception will be raised
if fatal() is called
- If raise_on_error == False, sys.exit(1) will be invoked if
fatal() is called
"""
self.raise_on_error = raise_on_error
[docs] def get_raise_on_error(self) -> bool:
"""Get the fatal error behavior
:returns: True if a FatalError exception will be raised or False if
sys.exit(1) will be called in case of invoking fatal()
"""
return self.raise_on_error
[docs] def test_fatal_error(self, message: str) -> None:
"""Force the messenger server to call G_fatal_error()"""
import time
self._check_restart_server()
self.client_conn.send(["FATAL", message])
time.sleep(1)
[docs]def get_msgr(
instance=[
None,
],
*args,
**kwargs,
) -> Messenger:
"""Return a Messenger instance.
:returns: the Messenger instance.
>>> msgr0 = get_msgr()
>>> msgr1 = get_msgr()
>>> msgr2 = Messenger()
>>> msgr0 is msgr1
True
>>> msgr0 is msgr2
False
"""
if not instance[0]:
instance[0] = Messenger(*args, **kwargs)
return instance[0]
if __name__ == "__main__":
import doctest
doctest.testmod()