from __future__ import annotations
import io
import os
import zipfile
from multiprocessing import RLock
import fnmatch
from scistag.filestag.protocols import ZIP_SOURCE_PROTOCOL
class SharedArchive:
    """
    A zip archive which can be registered once under an identifier and then
    be shared by multiple users, e.g. classes which want to load their data
    quickly from a compact archive.

    Usage:

        SharedArchive.register("sharedData.zip", "sharedData")

    Afterwards data can be loaded flexibly via FileStag, independent of
    whether it is located in the web, in a zip archive or in a simple local
    file:

        FileStag.load_file("zip://@sharedData/testFile.zip")
        FileStag.load_file("local_file.txt")
        FileStag.load_file("https://www....")

    Note: Registered zip files are addressed with an @ in front of their
    identifier. Without the @ the part up to the first ".zip/" is treated as
    the path of a zip file on disk.
    """

    access_lock = RLock()
    "Class-wide lock guarding the :attr:`archives` registry"
    archives: dict[str, "SharedArchive"] = {}
    "Dictionary of the loaded archives, identifier: SharedArchive"

    def __init__(self, source: str | bytes, identifier: str, cache=False):
        """
        Initializer

        :param source: The source, either a filename or a bytes object
        :param identifier: The identifier via which this object can be accessed
        :param cache: Defines if this archive shall be cached in memory. Only
            has an effect if ``source`` is a filename.
        """
        self.identifier = identifier
        "The archive's unique identifier"
        self.access_lock = RLock()
        "Per-archive access lock, guards all accesses to :attr:`zip_file`"
        self.filename = ""
        "The archive's normalized filename (if loaded from a file), else empty"
        if isinstance(source, str):
            self.filename = os.path.normpath(source)
            if cache:
                # read the whole archive once so no file handle stays open
                with open(source, "rb") as source_file:
                    source = io.BytesIO(source_file.read())
        elif isinstance(source, bytes):
            source = io.BytesIO(source)
        self.zip_file: zipfile.ZipFile | None = zipfile.ZipFile(source)
        "The opened zip file. None once the archive was closed"

    def close(self):
        """
        Closes the archive to release its resources without having to wait
        for the garbage collector. Calling it more than once is harmless.
        """
        with self.access_lock:
            if self.zip_file is not None:
                self.zip_file.close()
                self.zip_file = None

    def find_files(self, name_filter: str = "*") -> list[str]:
        """
        Lists all elements of the archive matching the given filter

        :param name_filter: The filter, an fnmatch style pattern
        :return: The list of found elements. Empty if the archive was closed.
        """
        with self.access_lock:
            if self.zip_file is None:
                return []
            return [element.filename for element in self.zip_file.filelist
                    if fnmatch.fnmatch(element.filename, name_filter)]

    def exists(self, name: str) -> bool:
        """
        Returns if the file exists

        :param name: The file's name
        :return: True if it exists. False if it does not exist or the archive
            was closed.
        """
        with self.access_lock:
            if self.zip_file is None:
                return False
            try:
                # dictionary lookup instead of building the full name list
                self.zip_file.getinfo(name)
            except KeyError:
                return False
            return True

    def read_file(self, name: str) -> bytes | None:
        """
        Loads the data of the given file to memory

        :param name: The name of the file to load
        :return: The file's data. None if the file could not be found or the
            archive was closed.
        """
        with self.access_lock:
            if self.zip_file is None:
                return None
            try:
                # ZipFile.read closes the member handle it opens internally
                return self.zip_file.read(name)
            except KeyError:
                return None

    @classmethod
    def register(cls, source: str | bytes, identifier: str,
                 cache=False) -> "SharedArchive":
        """
        Registers a new archive.

        If an archive is already registered under ``identifier`` it is
        returned unchanged and ``source`` is ignored.

        :param source: The source, either a filename or a bytes object
        :param identifier: The identifier via which this object can be accessed
        :param cache: Defines if this archive shall be cached in memory
        :return: The archive
        :raises ValueError: If the identifier is empty
        """
        if not identifier:
            raise ValueError("The identifier must not be empty")
        with cls.access_lock:
            if identifier in cls.archives:
                return cls.archives[identifier]
            new_archive = SharedArchive(source, identifier, cache)
            cls.archives[identifier] = new_archive
            return new_archive

    @classmethod
    def exists_at_source(cls, identifier: str,
                         filename: str | None = None) -> bool:
        """
        Returns if the given file exists

        :param identifier: The archive identifier. Alternate: a full identifier
            in the form zip://@identifier/filename or a path to a zipfile,
            e.g. zip://filename.zip/filename
        :param filename: The name of the file to look for. Ignored if a full
            identifier is passed.
        :return: True if the file exists
        """
        if identifier.startswith(ZIP_SOURCE_PROTOCOL):
            identifier, filename = cls._split_identifier_and_filename(
                identifier)
            if identifier.endswith(".zip"):
                return cls.check_in_zip_direct(identifier, filename)
        with cls.access_lock:
            archive = cls.archives.get(identifier)
        if archive is None:
            return False
        return archive.exists(filename)

    @classmethod
    def load_file(cls, identifier: str,
                  filename: str | None = None) -> bytes | None:
        """
        Loads a file by filename

        :param identifier: The archive identifier. Alternate: a full identifier
            in the form zip://@identifier/filename or a path to a zipfile,
            e.g. zip://filename.zip/filename
        :param filename: The name of the file to load. Ignored if a full
            identifier is passed.
        :return: The data if the file could be found, otherwise None
        """
        if identifier.startswith(ZIP_SOURCE_PROTOCOL):
            identifier, filename = cls._split_identifier_and_filename(
                identifier)
            if identifier.endswith(".zip"):
                return cls.load_file_from_zip_direct(identifier, filename)
        with cls.access_lock:
            archive = cls.archives.get(identifier)
        if archive is None:
            return None
        return archive.read_file(filename)

    @classmethod
    def scan(cls, identifier: str, name_filter: str = "*",
             long_identifier=True) -> list[str]:
        """
        Scans an archive for a given file mask to search for files of a
        specific type

        :param identifier: The archive identifier
        :param name_filter: The name mask to search for
        :param long_identifier: Defines if the scan shall return long
            identifiers (zip://@identifier/filename) so the results can be
            used for FileStag.load_file. True by default.
        :return: All files in the given archive matching the mask. Empty if
            no archive is registered under the identifier.
        """
        if identifier.startswith(ZIP_SOURCE_PROTOCOL):
            identifier = identifier[len(ZIP_SOURCE_PROTOCOL):]
            identifier = identifier.lstrip("@").rstrip("/")
        with cls.access_lock:
            archive = cls.archives.get(identifier)
        if archive is None:
            return []
        results = archive.find_files(name_filter)
        if long_identifier:
            results = [f"{ZIP_SOURCE_PROTOCOL}@{identifier}/{element}" for
                       element in results]
        return results

    @staticmethod
    def _normalize_filename(filename):
        """
        Brings a filename into the form stored in :attr:`filename`

        :param filename: The filename as passed by the caller
        :return: The normalized filename. None if no usable name was passed.
        """
        if not filename or not isinstance(filename, str):
            return None
        return os.path.normpath(filename)

    @classmethod
    def is_loaded(cls, filename) -> bool:
        """
        Returns if a given zip file was registered

        :param filename: The name of the zip file to look for
        :return: True if an archive loaded from this file is registered
        """
        # archives store a normalized path, so compare in the same form
        filename = cls._normalize_filename(filename)
        if filename is None:
            return False
        with cls.access_lock:
            return any(archive.filename == filename
                       for archive in cls.archives.values())

    @classmethod
    def unload(cls, filename: str | None = None,
               identifier: str | None = None) -> bool:
        """
        Unloads a zip file, e.g. if it's uninstalled

        The first registered archive matching either criterion is closed and
        removed from the registry.

        :param filename: The zip file to be removed
        :param identifier: The identifier of the archive to unload
        :return: True if an archive with given filename or identifier could
            be found and removed
        """
        filename = cls._normalize_filename(filename)
        with cls.access_lock:
            for cur_identifier, archive in cls.archives.items():
                matches_file = (filename is not None
                                and filename == archive.filename)
                matches_id = (identifier is not None
                              and cur_identifier == identifier)
                if matches_file or matches_id:
                    archive.close()
                    # safe: the loop is left right after the deletion
                    del cls.archives[cur_identifier]
                    return True
        return False

    @staticmethod
    def load_file_from_zip_direct(zip_filename, filename) -> bytes | None:
        """
        Loads a file directly from a zip archive

        :param zip_filename: The name of the zip file
        :param filename: The filename within the zip file
        :return: The file's data if it could be found. None otherwise.
        """
        with zipfile.ZipFile(zip_filename, "r") as zip_file:
            try:
                return zip_file.read(filename)
            except KeyError:  # no such member in the archive
                return None

    @staticmethod
    def check_in_zip_direct(zip_filename, filename) -> bool:
        """
        Verifies if a file exists within a zip file

        :param zip_filename: The name of the zip file
        :param filename: The filename within the zip file
        :return: Returns if the file exists
        """
        with zipfile.ZipFile(zip_filename, "r") as zip_file:
            return filename in zip_file.namelist()

    @classmethod
    def _split_identifier_and_filename(cls, identifier) -> tuple[str, str]:
        """
        Returns the registered zip archive and the filename within the archive

        :param identifier: The provided identifier, starting with the zip
            protocol prefix
        :return: identifier or filename of zip file, filename within the
            archive
        :raises ValueError: If no filename within the archive was provided
        """
        identifier = identifier[len(ZIP_SOURCE_PROTOCOL):]
        if not identifier.startswith("@"):
            # split at the first ".zip/" only, so the path inside the
            # archive may itself contain ".zip/"
            archive_name, separator, filename_in_zip = \
                identifier.partition(".zip/")
            if not separator:
                raise ValueError(
                    "You need to pass the zip's filename followed by a slash "
                    "and the name of the "
                    "file within the zip archive.")
            return archive_name + ".zip", filename_in_zip
        if "/" not in identifier:
            raise ValueError(
                "No filename provided. Form: zip://@identifier/filename")
        slash_index = identifier.index("/")
        filename = identifier[slash_index + 1:].lstrip("/")
        identifier = identifier[1:slash_index]  # drop the leading @
        return identifier, filename
__all__ = ["SharedArchive"]