Source code for scistag.remotestag.service_handler

from __future__ import annotations
from threading import RLock
from .service_worker import RemoteWorker
from .service import RemoteService, RemoteParameterTypes
from .service_task import RemoteTask


class RemoteServiceHandler:
    """
    Orchestrates the single services and their associated execution resources

    Services have to be registered via :meth:`register_service` before the
    handler is started. Tasks queued via :meth:`execute_async` are handed out
    through :meth:`get_task` and retired through :meth:`flag_as_done`.
    """

    default_handler: "RemoteServiceHandler | None" = None
    "The singleton default handler"

    def __init__(self):
        self._lock = RLock()
        "Guards the mutable state of the handler"
        self._workers: list[RemoteWorker] = []
        "The workers created by the last call to start"
        self._worker_count = 8
        "Count of workers shared by all multi-threaded services"
        self._services: dict[str, RemoteService] = {}
        "The registered services, stored by their identifier"
        self._started = False
        "Defines if the handler is currently running"
        self._call_counter = 0
        "Call id counter"
        self._todo = []
        "Tasks to be executed"
        self._in_progress = []
        "Tasks in progress"

    def register_service(self, service: RemoteService) -> bool:
        """
        Registers a new service

        :param service: The new service
        :return: True if the service was added, False if a service with the
            same identifier was already registered
        :raises Exception: If the handler was already started
        """
        identifier = service.get_identifier()
        with self._lock:
            if self._started:
                raise Exception(
                    "Can not register new services once the service handler was started.")
            if identifier in self._services:
                return False
            self._services[identifier] = service
            return True

    def start(self) -> bool:
        """
        Initiates the handler

        :return: True on success, False if the handler is already running
        """
        with self._lock:
            if self._started:
                return False
            self._started = True
            self.__start_workers()
            return True

    def stop(self) -> bool:
        """
        Stops the handler

        :return: True on success
        :raises Exception: If the handler was not started
        """
        with self._lock:
            if not self._started:
                raise Exception("RemoteServiceHandler not started yet")
            self._started = False
            stopping = self.__stop_workers()
        # Join only after the lock was released: the workers hold a reference
        # to this handler and get_task() / flag_as_done() acquire the same
        # lock, so waiting for them while holding it could block forever.
        for worker in stopping:
            worker.join()
        return True

    def get_todo(self) -> int:
        """
        Returns the count of tasks waiting on the to do list

        :return: The count of tasks which did not start yet
        """
        with self._lock:
            return len(self._todo)

    def get_in_progress(self) -> int:
        """
        Returns the count of tasks which are in progress

        :return: The count of tasks in progress
        """
        with self._lock:
            return len(self._in_progress)

    def execute_async(self, identifier: str,
                      parameters: RemoteParameterTypes,
                      timeout_s: float = -1.0) -> RemoteTask:
        """
        Initiates the asynchronous execution of a function

        :param identifier: The function's identifier
        :param parameters: The function parameters. A value which is not a
            dictionary is wrapped into one using the key
            ``RemoteService.INPUT_VALUE``.
        :param timeout_s: The timeout in seconds. Very recommended in case
            you access this service from the web.
        :return: The task to retrieve the result with
        :raises Exception: If no registered service provides the function
        """
        if not isinstance(parameters, dict):
            parameters = {RemoteService.INPUT_VALUE: parameters}
        # the first registered service which provides the function wins
        service_found = next(
            (service for service in self._services.values()
             if service.provides_function(identifier)),
            None)
        if service_found is None:
            raise Exception("Method not configured")
        with self._lock:
            self._call_counter += 1
            task = RemoteTask(self._call_counter,
                              service=service_found,
                              target_function=identifier,
                              parameters=parameters,
                              timeout_s=timeout_s)
            self._todo.append(task)
            # notify every worker which is responsible for the task's service
            service_identifier = service_found.get_identifier()
            for worker in self._workers:
                if worker.supports_identifier(service_identifier):
                    worker.wake_up()
        return task

    def get_task(self, identifier_set) -> RemoteTask | None:
        """
        Tries to find a suitable task for the list of supported services.

        Moves the task internally from to do to in progress

        :param identifier_set: The supported services
        :return: A new task if one is available, otherwise None
        """
        with self._lock:
            for task in self._todo:
                if task.get_service().get_identifier() in identifier_set:
                    # safe although we are iterating: we leave the loop
                    # right after modifying the list
                    self._todo.remove(task)
                    self._in_progress.append(task)
                    return task
        return None

    def flag_as_done(self, task: RemoteTask):
        """
        Flags a task as done

        Tasks which are not in progress are silently ignored.

        :param task: The task to flag as done
        """
        with self._lock:
            if task in self._in_progress:
                self._in_progress.remove(task)

    def __start_workers(self):
        """
        Winds up all worker threads

        Every single-threaded service receives one dedicated worker. All
        other services share a pool of ``_worker_count`` workers.
        """
        single_threaded = []
        multi_threaded = []
        for identifier, service in self._services.items():
            if service.get_single_threaded():
                single_threaded.append(identifier)
            else:
                multi_threaded.append(identifier)
        worker_scopes = [[identifier] for identifier in single_threaded]
        if multi_threaded:
            worker_scopes.extend(
                multi_threaded for _ in range(self._worker_count))
        for scope in worker_scopes:
            new_worker = RemoteWorker(self, scope)
            self._workers.append(new_worker)
            new_worker.start()

    def __stop_workers(self) -> list[RemoteWorker]:
        """
        Signals all workers to terminate and detaches them from the handler

        The worker list is reset so a later start does not accumulate (and
        keep waking up) workers which were already terminated.

        :return: The detached workers. The caller is responsible for joining
            them once it does not hold the handler's lock anymore.
        """
        stopping, self._workers = self._workers, []
        for worker in stopping:
            worker.terminate()
        return stopping

    @classmethod
    def get_default_handler(cls) -> RemoteServiceHandler | None:
        """
        Returns the default remote handler

        :return: The value of the class attribute ``default_handler``
        """
        return cls.default_handler
# Create the module-wide handler instance and publish it as the class level
# default in one go (targets are assigned from left to right).
remote_service_handler = RemoteServiceHandler.default_handler = \
    RemoteServiceHandler()