Source code for scistag.remotestag.service

from __future__ import annotations
from typing import Callable, Union
from threading import RLock
from .service_function import RemoteFunction

RemoteCallback = Callable[[dict], dict]
"Defines a callable service function"
RemoteParameterTypes = Union[dict, str, float, bool, int, bytes]
"""
Defines the valid parameter types to be passed into a function.
Single parameters get wrapped into a dict with a single key named _value
"""
RemoteReturnTypes = Union[dict, str, float, bool, int, bytes]
"""
Defines the valid return types. Single values will be stored in the single 
key _resultValue in a dictionary
"""


[docs]class RemoteService: """ Defines a service hosted on this machine. A service consists of one or multiple services sharing the same data. """ INPUT_VALUE = "_value" "Key for single input value in the parameter dictionary" RESULT_VALUE = "_resultValue" "Key for the result value in the dictionary" def __init__(self, identifier: str, multithreading: bool = True): """ :param identifier: The service's unique identifier in the form com.company.ai.inference.yolov2. A service's name is not allowed to be contained within another service's name. :param multithreading: Defines if this data can be accessed from multiple threads or is using thread local data (such as many ML libraries). """ self._identifier = identifier "The service's identifier" self._multithreading = multithreading "Defines if the service is multithreading capable" self._functions: dict[str, Union[RemoteFunction, RemoteCallback]] = {} "Set of all registered functions" self._lock = RLock() "Data access lock" self._started = False "Defines the service was already stared"
[docs] def provides_function(self, identifier: str) -> bool: """ Returns if the function is known :param identifier: The function identifier :return: True on success """ return identifier in self._functions.keys()
[docs] def get_identifier(self) -> str: """ Returns the service's identifier :return: The identifier string """ return self._identifier
[docs] def get_single_threaded(self) -> bool: """ Returns True if this service does NOT support multithreading and needs it's own worker thread :return: True if not multithreading capable """ return not self._multithreading
[docs] def register_function(self, function: RemoteFunction) -> bool: """ Registers a new function. Functions can only be registered before the service was started. :param function: The function to register :return True on success: """ with self._lock: if self._started: raise Exception( "Service already started, can not register additional functions.") function_identifier = function.get_full_identifier() if function_identifier in self._functions: return False self._functions[function_identifier] = function return True
[docs] def register_callback(self, name: str, callback: RemoteCallback) -> bool: """ Registers a new function. Functions can only be registered before the service was started. :param name: The function's name :param callback: The callback function to be called :return True on success: """ with self._lock: if self._started: raise Exception( "Service already started, can not register additional functions.") name = f"{self._identifier}.{name}" if name in self._functions: return False self._functions[name] = callback return True
[docs] def run_task(self, function_name: str, parameters: RemoteParameterTypes, unwrap=False) -> RemoteReturnTypes: """ Executes a task :param function_name: The function's name :param parameters: The function's parameters. Either as base type for a single parameter or as dictionary :param unwrap: Defines if a single value result shall not be wrapped into a dictionary :return: The function's results """ if not isinstance(parameters, dict): parameters = {self.INPUT_VALUE: parameters} function = None with self._lock: if not self._started: self.initialize() self._started = True if self._identifier not in function_name: function_name = f"{self._identifier}.{function_name}" if function_name in self._functions: function = self._functions[function_name] if function is None: return {"error": f"Unknown function: {function_name}"} if isinstance(function, RemoteFunction): function: RemoteFunction result = function.run(parameters) else: function: RemoteCallback result = function(parameters) if not isinstance(result, dict): result = {"_resultValue": result} return result if not unwrap else self.unwrap(result)
[docs] @classmethod def unwrap(cls, result: dict) -> RemoteReturnTypes: """ Unwraps a potentially single value result :param result: The original function result :return: The single value if there is just one element, otherwise the dictionary """ if cls.RESULT_VALUE in result and len(result) == 1: return result[cls.RESULT_VALUE] return result
[docs] def initialize(self): """ Overwrite this method with your initialization code. If this service is thread bound it's guaranteed to be executed on the later execution thread. """ pass
[docs] def deinitialize(self): """ Overwrite this method with your deinitialization code. If this service is thread bound it's guaranteed to be executed on the execution thread. """ pass