Source code for scistag.addons.addon_manager

import copy
import json
import os.path
import hashlib
from fnmatch import fnmatch
from threading import RLock
from typing import Optional, Dict

import scistag.filestag.protocols
import scistag.filestag.shared_archive

FEATURE_FS_PATH = 'fspath'

ADDON_DATA_FILENAME = os.path.normpath(
    os.path.dirname(__file__) + "/../data/addon_packages.json")
ADDON_DATA_PATH = os.path.dirname(ADDON_DATA_FILENAME)

GROUP_INFO = "info"
GROUP_IGNORE_SET = {"info"}

"Feature name constants"
FEATURE_SIZE = "size"
FEATURE_MD5 = "md5"
FEATURE_INFO = "info"
FEATURE_REMOTE_FILENAMES = "remoteFilenames"
FEATURE_LOCAL_FILENAME = "localFilename"
FEATURE_TEST = "test"

# FileStag access constants
FS_IDENTIFIER_HEADING = "addons."
"""
The shared identifier heading with which all shared zip archives in
FileStag start with
"""
ADDON_FS_BASE_PATH = \
    scistag.filestag.protocols.ZIP_SOURCE_PROTOCOL + \
    f"@{FS_IDENTIFIER_HEADING}"
"""
The path under which the addon's data can be accessed using FileStag.load_file 
and FileStag.exists
"""

MAX_S_PER_MB = 4
"Maximum count of seconds we grant to download a single megabyte. 2 MBit."


[docs]class AddonManager: """ Manages the optional SciStag addons and their installation from the web. """ access_lock = RLock() "Data access lock" addon_data: Optional[Dict] = None "The addon main dictionary" all_addons: Dict = {} "A sorted dictionary of all addons"
[docs] @classmethod def get_addon_data(cls) -> Optional[Dict]: """ Returns the addon dictionary :return: The dictionary. See data/addon_packages.json for details. """ with cls.access_lock: if cls.addon_data is not None: return cls.addon_data cls.addon_data = json.load(open(ADDON_DATA_FILENAME, "r")) cls.build_helper_data() return cls.addon_data
[docs] @classmethod def get_groups(cls) -> Dict: """ Returns a dictionary containing all groups (and their elements) :return: The addon dictionary cleared from (future) features. A single element has the structure: ```javascript "groupName": { "elements" } ``` """ cls.get_addon_data() # ensure data is available filtered_dict = copy.deepcopy(cls.addon_data) return filtered_dict
[docs] @classmethod def get_all_addons(cls): """ Returns a list of all addon packages and their respective unique identifier. This identifier can then be used by install_addon and remove_addon to install or remove a feature. :return: A dictionary containing all installable addons """ cls.get_addon_data() # ensure data is available return cls.all_addons
[docs] @classmethod def get_installed_addons(cls): """ Returns a list of all installed addon packages and their respective unique identifier. This identifier can then be used by install_addon and remove_addon to install or remove a feature. :return: A dictionary containing all installable addons """ result = {} with cls.access_lock: for key, element in cls.get_all_addons().items(): if cls.get_addon_installed(key): result[key] = element return result
[docs] @classmethod def build_helper_data(cls) -> None: """ Fills helper variables such as the all_addons list for easier enumeration """ cls.all_addons = {} for g_key, group in cls.get_groups().items(): group: Dict for f_key, feature in group.items(): if f_key in GROUP_IGNORE_SET: continue cls.all_addons[g_key + "." + f_key] = feature
[docs] @classmethod def get_addon_installed(cls, feature_name) -> bool: """ Returns if an addon with given name is installed :param feature_name: The addon's name, see get_all_addons() :return: True if the addon exists. """ with cls.access_lock: if feature_name not in cls.get_all_addons(): return False feature = cls.all_addons[feature_name] local_path = ADDON_DATA_PATH + "/" + feature[FEATURE_LOCAL_FILENAME] return os.path.exists(local_path)
[docs] @classmethod def get_local_path(cls, feature_name): """ Returns the local path of a feature :param feature_name: The feature's name :return: The feature's path. Empty string if invalid """ if feature_name not in cls.all_addons: return "" feature = cls.all_addons[feature_name] return ADDON_DATA_PATH + "/" + feature[FEATURE_LOCAL_FILENAME]
[docs] @classmethod def get_addon_healthy(cls, feature_name) -> (bool, str): """ Returns if an addon with given name is installed and it's data file healthy :param feature_name: The addon's name, see get_all_addons() :return: (Health status (bool), Error_text) """ with cls.access_lock: if feature_name not in cls.all_addons: return False feature = cls.all_addons[feature_name] local_path = cls.get_local_path(feature_name) if not os.path.exists(local_path): return False, f'Could not find file "{local_path}"' filesize = os.path.getsize(local_path) if filesize != feature[FEATURE_SIZE]: return False, f'File size mismatch/ "{local_path}" ' \ f'should have the size {feature[FEATURE_SIZE]}"' \ f' but has the size {filesize}' with open(local_path, 'rb') as file_handle: md5_hash = hashlib.md5(file_handle.read()).hexdigest() if md5_hash != feature[FEATURE_MD5]: return False, \ f'MD% checksum mismatch/ "{local_path}" ' \ f'should have the MD5 checksum {feature[FEATURE_MD5]}"' \ f' but has the checksum {md5_hash}' return True, "Success"
[docs] @classmethod def install_addon(cls, feature_name, verbose_if_installed=True) -> bool: """ Installs given feature :param feature_name: The feature's key. As defined in :param verbose_if_installed: Defines if no message shall be printed if the addon is already installed :return: """ with cls.access_lock: if cls.get_addon_installed(feature_name): if not verbose_if_installed: print(f"\nAddon addon {feature_name} already installed.", flush=True) return False feature = cls.all_addons[feature_name] local_path = cls.get_local_path(feature_name) os.makedirs(os.path.dirname(local_path), exist_ok=True) main_url = feature[FEATURE_REMOTE_FILENAMES][0] mb_size = feature[FEATURE_SIZE] / (2 ** 20) print( f"\nInstalling addon {feature_name} from {main_url} " f"({mb_size:0.1f} MB)...", flush=True) from scistag.webstag import web_fetch web_fetch(url=main_url, filename=local_path, timeout_s=int(2 + mb_size * MAX_S_PER_MB)) healthy, error = cls.get_addon_healthy(feature_name) if healthy: print("Installation successful", flush=True) else: print(f"Installation failed: {error}", flush=True) return False return True
[docs] @classmethod def remove_addon(cls, feature_name) -> bool: """ Removes given addons, removes all associated files and unregisters in the shared archive registry. :param feature_name: The feature's key. As defined in :return: True if the addon existed and could successfully be removed """ with cls.access_lock: if not cls.get_addon_installed(feature_name): return False local_path = cls.get_local_path(feature_name) print(f"Uninstalling addon {feature_name}...", flush=True) # if the addon was loaded, unload it if FEATURE_FS_PATH in cls.all_addons[feature_name]: from scistag.filestag.shared_archive import SharedArchive SharedArchive.unload(filename=os.path.normpath(local_path)) os.remove(local_path) if os.path.exists(local_path): print( f"Error uninstalling {feature_name}. " f"Data could not be removed.", flush=True) return False else: print(f"Successfully uninstalled {feature_name}.", flush=True) return True
[docs] @classmethod def get_addons_paths(cls, filter_mask: str) -> Dict: """ Returns the addon paths for FileStag for all addons matching given filter_mask, e.g. emoji.* :param filter_mask: The addons to search for. Can contains * and ? placeholders. :return: A dictionary of the format str: str for every matched addon and it's corresponding FileStag path, e.g. `{"emojis.512": "zip://@addons.emojis.512/"}` """ with cls.access_lock: addons = cls.get_all_addons() valid_addons = set( [key for key in addons.keys() if fnmatch(key, filter_mask)]) result = {} for addon_name in valid_addons: if not cls.get_addon_installed(addon_name): continue addon = cls.all_addons[addon_name] fs_path = ADDON_FS_BASE_PATH + addon_name + "/" result[addon_name] = fs_path if len(addon.get(FEATURE_FS_PATH, "")) > 0: continue scistag.filestag.shared_archive.SharedArchive.register( source=cls.get_local_path(addon_name), identifier=FS_IDENTIFIER_HEADING + addon_name) addon[FEATURE_FS_PATH] = fs_path return result