# Source code for scistag.filestag.file_source

"""
Implements the :class:`FileSource` class and its essential helper classes.

The FileSource class lets you easily iterate through large numbers of files
stored locally on disk, in zip archives, in cloud storage and even in zip
archives stored in cloud storage.
"""

from __future__ import annotations

import os
from abc import abstractmethod
from collections import Counter
from dataclasses import dataclass
from fnmatch import fnmatch
from typing import Callable, Union, Any

import pandas as pd
from pydantic import BaseModel

from scistag.filestag.bundle import Bundle
from scistag.filestag.protocols import AZURE_PROTOCOL_HEADER
from scistag.filestag.file_stag import FileStag

#: Bundle key under which the user-defined version of an encoded file list
#: is stored (see FileSource.encode_file_list)
CACHE_VERSION: str = "cache_version"


class FileSourceElement:
    """
    Provides the data of a single file returned from a :class:`FileSource`
    """

    def __init__(self, data: bytes | None, name: str):
        """
        :param data: The file's content. None if the content was not loaded,
            e.g. because the file source was created with ``dont_load=True``
            or the file could not be read.
        :param name: The file's name. Usually relative to its search path
        """
        self.data = data
        "Holds the file's data (None if the content was not loaded)"
        self.name = name
        "The file's name. Usually relative to its search path"

    def __repr__(self) -> str:
        # Show the size rather than the raw content, which may be huge
        size = "None" if self.data is None else f"{len(self.data)} bytes"
        return f"{type(self).__name__}(name={self.name!r}, data={size})"
class FileListEntry(BaseModel):
    """
    A single entry of a file list, describing one file by name and size
    """

    #: The file's name
    filename: str
    #: The file's size in bytes. Entries created from plain file names by
    #: FileSource.set_file_list use -1 as size.
    file_size: int
class FileListModel(BaseModel):
    """
    Defines a list of files storable in a database
    """

    #: The user definable version number. If the version of the stored data
    #: does not match the one passed, the list is considered being invalid.
    #: Explicitly annotated: pydantic v2 rejects non-annotated attributes
    #: (pydantic v1 inferred exactly this ``int`` type from the default).
    user_version: int = 1
    #: The format version
    format_version: int = 1
    #: The file data
    files: list[FileListEntry]
# Type alias for a list of file entries, e.g. as returned by
# FileSource.get_file_list
FileList = list[FileListEntry]
class FileSourceIterator:
    """
    Iterator providing the data from a file source.

    The actual iteration logic is implemented by the source's
    ``handle_next`` method; this object keeps track of the iteration state.
    """

    def __init__(self, source: "FileSource"):
        """
        :param source: The file source to provide the data for
        """
        self.source = source
        "The FileSource which created this iterator"
        self.processing_data = {}
        """
        Additional, user defined parameters you can store here to make them
        accessible to your callback for example
        """
        self.file_index = 0
        "The index of all found files (including skipped ones)"
        self.processed_file_count = 0
        "The count of all really processed files"
        self.current_file_size = 0
        """
        The size of the file which is currently being handled. Not available
        for all file sources. (0 in that case)
        """

    def __iter__(self) -> "FileSourceIterator":
        """
        Returns the iterator itself as required by Python's iterator
        protocol, so it can be passed to ``iter()``, ``list()``, ``for``
        loops etc.

        :return: This iterator
        """
        return self

    def __next__(self) -> FileSourceElement:
        """
        Requests the next data from the file source

        :return: The data object
        :raises StopIteration: If no further file is available
        """
        result = self.source.handle_next(self)
        if result is None:
            raise StopIteration
        return result
@dataclass
class FileIterationData:
    """
    Describes a single file so that filters can decide whether it shall be
    processed
    """

    #: The :class:`FileSource` object for which the decision is made
    file_source: "FileSource"
    #: The file's index within the source's enumeration
    file_index: int
    #: The file's name
    filename: str
    #: The file's size
    file_size: int
FilterCallback = Callable[[FileIterationData], Union[bool, str, None]]
"""
Shall verify if a file shall be handled or ignored.

Parameters:

* The file iteration data describing the current file to handle. See
  :class:`FileIterationData`.

Return:

* True if the file shall be processed, False (or None) if not.
  Alternatively a string if the file shall be processed but renamed.
"""
class FileSource:
    """
    Base class for an iterable file source to batch process file lists such
    as directories, zip archives or cloud storages at small and large scale
    using a unified interface.
    """

    # noinspection PyUnusedLocal
    def __init__(self,
                 search_mask: str = "*",
                 search_path: str = "",
                 recursive: bool = True,
                 filter_callback: FilterCallback | None = None,
                 index_filter: tuple[int, int] | None = None,
                 fetch_file_list: bool = False,
                 max_file_count: int = -1,
                 file_list_name: str | tuple[str, int] | None = None,
                 max_web_cache_age: float = 0.0,
                 dont_load=False,
                 sorting_callback: Callable[
                     [FileListEntry], Any] | None = None):
        """
        For a detailed parameter description see :meth:`from_source`

        :raises ValueError: If a sorting_callback is passed without setting
            fetch_file_list to True.
        """
        # Validate the arguments before any state is assigned
        if sorting_callback is not None and not fetch_file_list:
            raise ValueError("Sorting is only supported in combination w/"
                             " fetch_file_list=True")
        self.search_mask = search_mask
        """
        The search mask to match the filenames against before they are
        returned
        """
        self.search_path = search_path
        "The path to search within, e.g. a file path"
        self.recursive = recursive
        "Defines if the search shall be executed recursive"
        self.filter_callback = filter_callback
        """
        The filter function which will be called for each file to verify if
        it shall be processed
        """
        self.user_data = {}
        "The user data for further customization, e.g. of the filter callback"
        self.index_filter: tuple[int, int] | None = index_filter
        """
        The index filter helps splitting a processing task to multiple
        threads, nodes and/or processes. See :meth:`from_source`.
        """
        self._file_list: FileList | None = None
        """
        A sorted list of all files (if available e.g. by setting
        fetch_file_list=True).

        Note that settings such as :attr:`index_filter` and
        :attr:`max_file_count` have no effect on the file list by default.
        You can though explicitly call the method :meth:`reduce_file_list`
        which will execute all filters in advance to provide you the final
        file list and will disable these filters afterwards.
        """
        self.file_set: set[str] | None = None
        """
        A set containing all known file names. Only valid if the file list
        is available too
        """
        self.output_filename_list: list[str] | None = None
        """
        If defined it provides the output filenames for every file in the
        file list (see :meth:`reduce_file_list`).
        """
        self.max_file_count = max_file_count
        """
        The maximum number of files to process (excluding the impact of
        :attr:`index_filter`). -1 means unlimited.
        """
        self.is_closed = False
        "Defines if this file source was closed"
        self._statistics: dict | None = None
        "The cached statistics, see :meth:`get_statistics`"
        self.dont_load = dont_load
        """
        If set to true the iterator ``for element in FileSource`` will not
        fetch the file's content but just iterate through its filenames
        """
        self._file_list_name: str | None = None
        "The name of the file from which the file list shall be loaded"
        self._file_list_version: int = -1
        """
        The version of the file list to assume. If it mismatches the stored
        version it will be replaced
        """
        self.sorting_callback = sorting_callback
        """
        A function to be called (and passed into sorted) to sort the file
        list before it's stored.
        """
        if file_list_name is not None:
            if isinstance(file_list_name, tuple):
                self._file_list_name, self._file_list_version = file_list_name
            else:
                self._file_list_name = file_list_name
        self.max_web_cache_age = max_web_cache_age
        """
        The count of seconds files read from this source may be served from
        the web cache. 0 disables the cache.
        """
[docs] @staticmethod def from_source(source: str | bytes, search_mask: str = "*", search_path: str = "", recursive: bool = True, filter_callback: FilterCallback | None = None, sorting_callback: \ Callable[[FileListEntry], Any] | None = None, index_filter: tuple[int, int] | None = None, fetch_file_list: bool = False, max_file_count: int = -1, file_list_name: str | tuple[str, int] | None = None, max_web_cache_age: float = 0.0, dont_load=False) -> FileSource | None: """ Auto-detects the required FileSource implementation for a given source path :param source: The path you would like to iterate. The following path types are currently supported: * /home/aDirectory: Will return a FileSourceDisk object to iterate through a directory's content * /home/myZipArchive.zip: Will return a FileSourceZip object to iterate through a zip archive * azure://DefaultEndpointsProtocol=https;AccountName=...;AccountKey=.../container/path: Will iterate to an Azure Blob Storage. * A bytes object: Detects the source type and opens it. At the moment only zip archive data ia supported. :param search_mask: The file name filter mask :param search_path: The search path, e.g. directory name or relative path to the zip root, storage root etc. :param recursive: Defines if the search shall be executed recursive. True by default. :param filter_callback: A callback function to call for each file to verify if it shall be processed or ignored. See :const:`FilterCallback` :param sorting_callback: A function to be called (and pass into sorted) to sort the file list before it is stored. Is called for every element and has to return the sorting value, either a string, float or another size comparable data type. Does only work in combination with fetch_file_list. :param index_filter: The index filter helps splitting a processing task to multiple, threads nodes and/or processes. The first tuple element defines the total worker count, the second tuple element the current worker index (0 .. worker_count-1). 
If you want to for example process a zip archive by 4 threads in parallel just spawn 4 threads and pass (4,0) to the first, (4,1) to the second (4,2) to the third and (4,3) to the third. All four threads can then work in parallel and store their processed data parallel into one or multiple FileSinks which are (at least in most cases) multi-thread safe. :param fetch_file_list: If set to true the FileSource will try to iterate all filenames in advance. This is recommended especially if you are using sources where it's not guaranteed that the file names will always be provided in the same order and you intend to share a task among multiple threads to guarantee a consistent behavior. :param file_list_name: If provided the the file list will be stored in given file so that the files do not need to be iterated over and over again each run (which can save a lot of time). You can either pass a string, just containing the file name or a tuple of (filename, version) so you can enforce replacing the list when ever you pass a new version number. :param max_file_count: The maximum number of files to process (excluding the index filter's impact) :param max_web_cache_age: The count of seconds for how long files from this source may be stored and received from the cache if this source is remote, e.g. Azure, AWS. :param dont_load: If set to true the iterator will not provide the file's content but just iterate the filenames. Helpful if the consumer for example requires a path to files stored on disk. :return: The FileSource implementation for your path. None if the path can not be identified. 
""" params = {"search_mask": search_mask, "search_path": search_path, "recursive": recursive, "filter_callback": filter_callback, "index_filter": index_filter, "fetch_file_list": fetch_file_list, "file_list_name": file_list_name, "max_web_cache_age": max_web_cache_age, "max_file_count": max_file_count, "sorting_callback": sorting_callback, "dont_load": dont_load} if isinstance(source, bytes): from scistag.filestag.file_source_zip import FileSourceZip return FileSourceZip(source=source, **params) if source.startswith(AZURE_PROTOCOL_HEADER): from scistag.filestag.azure.azure_storage_file_source import \ AzureStorageFileSource return AzureStorageFileSource(source=source, **params) if not (source.endswith("/") or source.endswith( "\\")) and source.endswith(".zip"): from scistag.filestag.file_source_zip import FileSourceZip return FileSourceZip(source=source, **params) if source.__contains__("://"): raise NotImplementedError("Unknown protocol for FileSource") from scistag.filestag.file_source_disk import FileSourceDisk return FileSourceDisk(path=source, **params)
[docs] @abstractmethod def _get_source_identifier(self) -> str: """ Has to return a unique identifier for this file source which identifies the name of this source in the cache database. Can for example be the search path and the search mask or parts of the connection string. :return: The unique identifier """ return ""
[docs] def get_file_list(self) -> FileList | None: """ Returns the file list (if available). Note that the file list is not available for all file sources. Pass fetch_file_list = true to the initializer of all supported FileSources to fetch the list in advance. :return: The list of filenames and their size (so far known). """ return self._file_list
[docs] def get_file_list_as_df(self) -> "pd.DataFrame": """ Returns the file list as dataframe :return: The file list """ file_list = [entry.dict() for entry in self._file_list] return pd.DataFrame(file_list)
[docs] def encode_file_list(self, version: int = -1) -> bytes: """ Encodes the file list so it can be stored on disk :param version: The user defined version number. It can be passed to enforce updating the list when ever this number is changed. If -1 is passed the version is ignored. :return: The encoded file list """ df = self.get_file_list_as_df() data = Bundle.bundle( {"version": 1, "data": df, CACHE_VERSION: version}, compression=0) return data
[docs] def load_file_list(self, source: bytes | str, version: int = -1) -> bool: """ Tries to load the file list from file :param source: The file list source. Any FileStag compatible data source. :param version: The user defined version number. It can be passed to enforce updating the list when ever this number is changed. If -1 is passed the version is ignored. :return: True if a valid list could be loaded. """ if not isinstance(source, bytes): source = FileStag.load(source) if source is None: return False data = Bundle.unpack(source) assert isinstance(data, dict) and data.get("version") == 1 if version != -1 and data.get(CACHE_VERSION, -1) != version: return False df: pd.DataFrame = data['data'] key_list = df.columns.to_list() self._file_list = [ FileListEntry.parse_obj(dict(zip(key_list, cur_element))) for cur_element in df.itertuples(index=False, name=None) ] return True
[docs] def save_file_list(self, target: str, version: int = -1): """ Saves the file list to a file so it can be quickly restored after a restart of the application. :param target: The FileStag compatible file target, e.g. a local file name :param version: The user defined version number. It can be passed to enforce updating the list when ever this number is changed. If -1 is passed the version is ignored. """ FileStag.save(target, self.encode_file_list(version=version))
[docs] def set_file_list(self, new_list: list[str] | list[FileListEntry]): """ Sets a custom file list provided by the user. Helpful for large jobs where the total file list is split into several working packages in advance and the shares need to be customized. :param new_list: The new list to be assigned. Either a list of "FileListEntry"s with all details or a list of filenames """ if len(new_list) and isinstance(new_list[0], str): lst = [FileListEntry(filename=element, file_size=-1) for element in new_list] else: lst = new_list self.update_file_list(lst)
[docs] def get_statistics(self) -> dict | None: """ Returns statistics about the file source if available. Requires a valid file list, see :meth:`get_file_list`. :return: Dictionary with statistics about file types, total size etc. """ if self._file_list is None: return None if self._statistics is not None: return self._statistics file_type_counter = Counter() size_by_filetype = Counter() dir_names = set() total_size = 0 for cur_file in self._file_list: cur_name = cur_file.filename dir_name = os.path.dirname(cur_name) extension = os.path.splitext(cur_name)[1] dir_names.add(dir_name) file_type_counter[extension] += 1 size_by_filetype[extension] += cur_file.file_size total_size += cur_file.file_size file_extension_list = sorted( list(file_type_counter.keys()), reverse=True, key=lambda x: file_type_counter[x]) sorted_keys = sorted(file_type_counter.keys(), reverse=True, key=lambda x: file_type_counter[x]) ext_details = { key: { "totalFileSizeMb": size_by_filetype[key] / 1000000.0, "totalFileCount": file_type_counter[key]} for key in sorted_keys} self._statistics = {"totalFileCount": len(self._file_list), "totalFileSizeMb": total_size / 1000000, "totalDirs": len(dir_names), "fileExtensions": file_extension_list, "extensionDetails": ext_details} return self._statistics
def __str__(self): result = self.__class__.__name__ + "\n" statistics = self.get_statistics() from scistag.common.dict_helper import dict_to_bullet_list if statistics is not None: result += dict_to_bullet_list(statistics) result += f"* search-mask: {self.search_mask}" return result def __iter__(self) -> FileSourceIterator: """ Returns an iterator object for this file source :return: The iterator """ return FileSourceIterator(self) def __enter__(self) -> "FileSource": """ Provides the FileSource context. Allows automated clean closing of a file source via ``with FileSource.from_path("./my_folder") as source`` as once the with with-block is left it will automatically call the source's :meth:`close` function. :return: The FileSource object """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ Closes the context (and in consequence open connections, archives etc.) """ if not self.is_closed: self.close()
[docs] @abstractmethod def _read_file_int(self, filename: str) -> bytes | None: """ Reads a file from this file source, identified by name. Note: Not all FileSources support direct file access by name, so you should always prefer to just iterate through a FileSource object rather than accessing single files if your FileSource can be freely configured. For example an :class:`ImageFileSource` pointing to a camera can only provide it's data frame by frame - and thus image file by image file - and not by name. :param filename: The name of the file to read :return: The file's content on success, None otherwise """ return None
[docs] def read_file(self, filename: str) -> bytes | None: """ Reads a file from this file source, identified by name. Note: Not all FileSources support direct file access by name, so you should always prefer to just iterate through a FileSource object rather than accessing single files if your FileSource can be freely configured. For example an :class:`ImageFileSource` pointing to a camera can only provide it's data frame by frame - and thus image file by image file - and not by name. :param filename: The name of the file to read :return: The file's content on success, None otherwise """ from scistag.webstag import WebCache if self.max_web_cache_age != 0: # try to fetch data if cache is on unique_name = self._get_source_identifier() + "/" + filename try: data = WebCache.fetch(unique_name, max_age=self.max_web_cache_age) except: data = None if data is not None: return data result = self._read_file_int(filename) if self.max_web_cache_age != 0: # store new data if cache is on unique_name = self._get_source_identifier() + "/" + filename WebCache.store(unique_name, result) return result
[docs] def exists(self, filename: str) -> bool: """ Verifies if a file exists. Note: This function may not be supported by all sources (such as streaming sources) :param filename: The file to look for :return: True if the file exists """ if self.file_set is not None: return filename in self.file_set raise NotImplementedError("Missing implementation for exists method")
[docs] def update_file_list(self, new_list: list[FileListEntry]): """ Call this function if you want to manually update the file list. Updates the internal search index and other helper variables. :param new_list: The new list """ self._file_list = new_list if self.sorting_callback is not None: # apply sorting self._file_list = sorted(self._file_list, key=self.sorting_callback) self.file_set = set([element.filename for element in new_list]) self._statistics = None
[docs] def reduce_file_list(self) -> list[FileListEntry] | None: """ Reduces the :attr:`file_list` by applying all filters (index_range, max_file_count, filter_callback) in advance. Requires the source being initialized with fetch_file_list in advance and thus requires a non-streaming file source where the full file list is known in advance. This way you know in advance which files (after all the filters) are really getting processed with your current filtering settings. So the filters are not applied twice this function also disables all callbacks and filter variables after it's execution. :return: Returns the reduced file list """ if self._file_list is None: return None output_filenames = [] cleaned_list = [] for index, element in enumerate(self._file_list): file_info = FileIterationData(self, index, element.filename, element.file_size) new_filename = self.handle_skip_check(file_info) if new_filename is None: continue cleaned_list.append(element) output_filenames.append(new_filename) if self.max_file_count != -1 and len( cleaned_list) > self.max_file_count: cleaned_list = cleaned_list[0:self.max_file_count] output_filenames = output_filenames[0:self.max_file_count] self.output_filename_list = output_filenames self.update_file_list(cleaned_list) self.max_file_count = -1 self.index_filter = None self.filter_callback = None return self._file_list
[docs] def handle_next(self, iterator: FileSourceIterator) -> FileSourceElement | None: """ Returns the next available element :param iterator: The iterator object which keeps track of the current processing :return: The next file object if available """ if (self.max_file_count != -1 and iterator.processed_file_count >= self.max_file_count): raise StopIteration while True: next_file = self.handle_get_next_filename(iterator) if next_file is None: # stop if no files are available anymore return None filename, filesize = next_file # was already filtered using reduce_file_list if self.output_filename_list is not None: target_name = self.output_filename_list[iterator.file_index - 1] else: target_name = self.handle_skip_check( FileIterationData(self, iterator.file_index - 1, filename, filesize)) # continue if just the current file is skipped if target_name is not None: break data = self.read_file(filename) if not self.dont_load else None return self.handle_provide_result(iterator, target_name, data)
[docs] def handle_get_next_filename(self, iterator: "FileSourceIterator") -> \ tuple[str, int] | None: """ Returns the filename and the file size of the next file to be processed. Overwrite this method for your own, custom File iterator. :param iterator: The file iterator object :return: Name and size of the next element as tuple """ index = iterator.file_index iterator.file_index += 1 if self._file_list is None: return None if index >= len(self._file_list): return None return self._file_list[index].filename, self._file_list[index].file_size
[docs] def handle_fetch_file_list(self, force: bool = False) -> None: """ Called when the file list shall be pre-fetched. If your custom FileSource is able to do so populate the self.file_list with a sorted list of all files available and instead of iterating the files live always access the matching file list entry using self.file_list[file_index] appropriately. :param force: Enforce an update of the file list, even if it was created before already """ pass
[docs] def handle_file_list_filter(self, filename: str) -> bool: """ Verifies if the file is valid and shall be processed by comparing it to the file mask, the index_filter etc. Increases the file_index upon failure. Does NOT increase it upon success (as :meth:`provide_result` will do so). :param filename: The file's name :return: A valid filename if the file shall be processed, None otherwise. """ if not fnmatch(os.path.basename(filename), self.search_mask): return False if len(self.search_path) > 0 and not filename.startswith( self.search_path): return False rest = filename[len(self.search_path):].lstrip("/").lstrip("\\") if not self.recursive: if "/" in rest or "\\" in rest: return False return True
[docs] def handle_skip_check(self, file_info: FileIterationData) -> str | None: """ Verifies if the file is valid and shall be processed by comparing it to the file mask, the index_filter etc. Increases the file_index upon failure. Does NOT increase it upon success (as :meth:`provide_result` will do so). :param file_info: Information about the current file :return: A valid filename if the file shall be processed, None otherwise. """ filename = file_info.filename if self.index_filter is not None: if (file_info.file_index % self.index_filter[0] != self.index_filter[1]): return None if self.filter_callback is not None: result = self.filter_callback(file_info) if result is None: return None if isinstance(result, bool): if not result: return None else: filename = result return filename
# noinspection PyMethodMayBeStatic
[docs] def handle_provide_result(self, iterator: FileSourceIterator, filename: str, data: bytes) -> FileSourceElement: """ Provides the file result for the current iterator index :param iterator: The iterator handle :param filename: The name of the file to be stored :param data: The file data """ iterator.processed_file_count += 1 return FileSourceElement(data=data, name=filename)
[docs] def close(self): """ Closes the current file source, e.g. zip archive, streaming connection etc. if applicable """ self.is_closed = True