from __future__ import annotations
import time
from threading import RLock, Event
from .service import RemoteService
from .service import RemoteReturnTypes
# Alias for the identifier type of a task execution; currently a plain int.
RemoteTaskId = int
"Task execution ID type"
class RemoteTask:
    """
    This class tracks the execution of a remotely executed function.

    It stores the call description (service, target function, parameters),
    receives the result or an error via :meth:`assign_result` /
    :meth:`assign_error` and lets callers block in :meth:`wait` until one of
    the two has been assigned.
    """

    ERROR = "_error"  # Error identifier

    def __init__(self, task_id: RemoteTaskId, service: RemoteService,
                 target_function: str, parameters: dict,
                 timeout_s: float = -1.0):
        """
        :param task_id: The task's unique id
        :param service: The service which provides the function
        :param target_function: The function to execute
        :param parameters: The parameters to pass into the function
        :param timeout_s: The timeout in seconds after which this task
            automatically gets cancelled. -1.0 disables the deprecation time.
        """
        self._task_id = task_id
        "The task's unique id"
        self._service = service
        "The service which provides the function"
        self._target_function: str = target_function
        "The identifier of the function to call"
        self._access_lock = RLock()
        "Data access lock"
        self._event = Event()
        "The event to be triggered to wake up sleeping receivers"
        self._parameters: dict = parameters
        "The function's parameters"
        self._result: dict | None = None
        "The function's result"
        self._sleep_interval = 0.0
        "Interval of sleep (kept for compatibility; wait() blocks on the event instead of polling)"
        self.timeout_s = timeout_s
        "The timeout time"
        # Absolute wall-clock time (time.time() based) at which the task
        # becomes invalid, or -1.0 if no timeout was requested.
        self._deprecation_time = (
            -1.0 if self.timeout_s == -1.0 else time.time() + timeout_s
        )

    def get_service(self) -> RemoteService:
        """
        Returns the service being required for this task

        :return: The service
        """
        return self._service

    def get_id(self) -> RemoteTaskId:
        """
        Returns the task's unique ID

        :return: Unique task ID
        """
        return self._task_id

    def get_target_function(self) -> str:
        """
        Returns the target function

        :return: The function's identifier
        """
        return self._target_function

    def get_parameters(self) -> dict:
        """
        Returns the parameters passed into the function

        :return: The parameters
        """
        return self._parameters

    def get_result(self) -> dict | None:
        """
        Returns the task's result

        :return: The result data (if it's available already), otherwise None
        """
        with self._access_lock:
            return self._result

    def get_error(self) -> str | None:
        """
        Returns the error string if an error occurred

        :return: The error string. None if there is no result yet or the
            result does not contain an error entry.
        """
        with self._access_lock:
            if self._result is None:
                return None
            if self.ERROR in self._result:
                return self._result[self.ERROR]
            return None

    def get_deprecation_time(self) -> float:
        """
        Returns the time when this task becomes invalid

        :return: The deprecation time (see time.time()). If -1 the task does
            not deprecate.
        """
        return self._deprecation_time

    def assign_result(self, result):
        """
        Assigns the result to the task and wakes up all waiting receivers

        :param result: The task's result
        """
        with self._access_lock:
            self._result = result
            self._event.set()

    def assign_error(self, error: str):
        """
        Assigns an error to the result and wakes up all waiting receivers

        :param error: The error string
        """
        with self._access_lock:
            self._result = {self.ERROR: error}
            self._event.set()

    def wait(self, timeout_s=-1) -> bool:
        """
        Waits for the finishing of the execution up to a given timeout

        The call blocks on the internal event instead of spinning, and it may
        be invoked repeatedly (or from several threads): once a result has
        been assigned every call returns True immediately.

        :param timeout_s: The maximum waiting time in seconds. -1 waits
            without a time limit.
        :return: True if the data is available
        """
        # A monotonic clock keeps the timeout immune to system clock changes.
        deadline = None if timeout_s == -1 else time.monotonic() + timeout_s
        while True:
            with self._access_lock:
                if self._result is not None:
                    return True
                # No result yet. Reset the event while holding the lock so a
                # set() without usable data (e.g. assign_result(None)) cannot
                # turn the blocking wait below into a busy loop. The assign_*
                # methods set the event under the same lock, so no wake-up
                # can be lost between this check and the wait.
                self._event.clear()
            if deadline is None:
                self._event.wait()
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            self._event.wait(remaining)

    def unwrap(self) -> RemoteReturnTypes | None:
        """
        Unwraps the result dictionary to a single value if it just contains a
        single value

        :return: A dictionary in case of multiple return values otherwise the
            single value. None if no result is available yet.
        """
        with self._access_lock:
            if self._result is None:
                return None
            if len(self._result) == 1 and RemoteService.RESULT_VALUE in self._result:
                return self._result[RemoteService.RESULT_VALUE]
            return self._result