From f974e254cb40e39e227382a15d49c9383d5a252b Mon Sep 17 00:00:00 2001 From: Kennedy Kori Date: Wed, 27 Mar 2024 13:49:05 +0300 Subject: [PATCH] feat: add common utils --- docs/conf.py | 37 +++ src/sghi/etl/commons/__init__.py | 26 ++ src/sghi/etl/commons/processors.py | 283 +++++++++++++++-- src/sghi/etl/commons/sinks.py | 336 ++++++++++++++++++++ src/sghi/etl/commons/sources.py | 96 +++++- src/sghi/etl/commons/workflow_builder.py | 285 ++++++++++++++++- src/sghi/etl/commons/workflow_definition.py | 100 ++++++ 7 files changed, 1110 insertions(+), 53 deletions(-) create mode 100644 src/sghi/etl/commons/sinks.py create mode 100644 src/sghi/etl/commons/workflow_definition.py diff --git a/docs/conf.py b/docs/conf.py index 4f1f7c8..33f4a1c 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -56,8 +56,45 @@ nitpicky = True nitpick_ignore = [ + ("py:class", "_PDT"), # private type annotations + ("py:class", "_RDT"), # private type annotations + ("py:class", "Processor"), # docs aren't published yet + ("py:class", "Sink"), # docs aren't published yet + ("py:class", "Source"), # docs aren't published yet + ("py:class", "TracebackType"), # Used as type annotation. Only available when type checking + ("py:class", "WorkflowDefinition"), # docs aren't published yet + ("py:class", "concurrent.futures._base.Executor"), # sphinx can't find it ("py:class", "concurrent.futures._base.Future"), # sphinx can't find it + ("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations + ("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations + ("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations + ("py:class", "sghi.etl.commons.sinks._RDT"), # private type annotations + ("py:class", "sghi.etl.commons.sources._PDT"), # private type annotations + ("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations ("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations + ("py:class", "sghi.etl.commons.workflow_builder._PDT"), # private type annotations + ("py:class", "sghi.etl.commons.workflow_builder._RDT"), # private type annotations + ("py:class", "sghi.etl.commons.workflow_definition._PDT"), # private type annotations + ("py:class", "sghi.etl.commons.workflow_definition._RDT"), # private type annotations + ("py:class", "sghi.etl.core._RDT"), # private type annotations + ("py:class", "sghi.etl.core._PDT"), # private type annotations + ("py:class", "sghi.etl.core.Processor"), # docs aren't published yet + ("py:class", "sghi.etl.core.Sink"), # docs aren't published yet + ("py:class", "sghi.etl.core.Source"), # docs aren't published yet + ("py:class", "sghi.etl.core.WorkflowDefinition"), # docs aren't published yet + ("py:class", "sghi.retry.Retry"), # docs aren't published yet + ("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet + ("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet + ("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations + ("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations + ("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations + ("py:obj", "sghi.etl.commons.sinks._RDT"), # private type annotations + ("py:obj", "sghi.etl.commons.sources._PDT"), # private type annotations + ("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations + ("py:obj", "sghi.etl.commons.workflow_builder._PDT"), # private type annotations + ("py:obj", "sghi.etl.commons.workflow_builder._RDT"), # private type 
annotations + ("py:obj", "sghi.etl.commons.workflow_definition._PDT"), # private type annotations + ("py:obj", "sghi.etl.commons.workflow_definition._RDT"), # private type annotations ] templates_path = ["templates"] diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py index b89986d..f9ec81d 100644 --- a/src/sghi/etl/commons/__init__.py +++ b/src/sghi/etl/commons/__init__.py @@ -1 +1,27 @@ """Collection of utilities for working with SGHI ETL Worflows.""" + +from .processors import ( + NoOpProcessor, + ProcessorPipe, + ProcessorSet, + ScatterProcessor, + processor, +) +from .sinks import NullSink, ScatterSink, SinkSet, sink +from .sources import SourceSet, source +from .workflow_builder import WorkflowBuilder + +__all__ = [ + "NoOpProcessor", + "NullSink", + "ProcessorPipe", + "ProcessorSet", + "ScatterProcessor", + "ScatterSink", + "SinkSet", + "SourceSet", + "WorkflowBuilder", + "processor", + "sink", + "source", +] diff --git a/src/sghi/etl/commons/processors.py b/src/sghi/etl/commons/processors.py index f18591a..2c3ec69 100644 --- a/src/sghi/etl/commons/processors.py +++ b/src/sghi/etl/commons/processors.py @@ -3,18 +3,22 @@ from __future__ import annotations import logging -from collections.abc import Callable, Sequence +from collections.abc import Callable, Iterable, Sequence +from concurrent.futures import Executor, Future, ThreadPoolExecutor from contextlib import ExitStack from logging import Logger -from typing import TYPE_CHECKING, Any, Generic, TypeVar +from typing import TYPE_CHECKING, Any, Final, Generic, TypeVar from attrs import define, field, validators from typing_extensions import override from sghi.disposable import not_disposed from sghi.etl.core import Processor -from sghi.task import Task, pipe -from sghi.utils import ensure_not_none, type_fqn +from sghi.retry import Retry, noop_retry +from sghi.task import ConcurrentExecutor, Task, pipe, task +from sghi.utils import ensure_not_none, ensure_predicate, type_fqn + +from .utils import fail_fast if TYPE_CHECKING: from typing import Self @@ -30,8 +34,20 @@ _RDT = TypeVar("_RDT") """Raw Data Type.""" +_T = TypeVar("_T") + _ProcessorCallable = Callable[[_RDT], _PDT] +_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]] + + +# ============================================================================= +# TYPES +# ============================================================================= + + +_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@processor" + # ============================================================================= # DECORATORS @@ -46,9 +62,10 @@ def processor(f: Callable[[_RDT], _PDT]) -> Processor[_RDT, _PDT]: :return: A ``Processor`` instance. - :raise ValueError: If ``f`` is ``None``. + :raise ValueError: If the given value is ``None`` or not a ``Callable``. 
""" - ensure_not_none(f, "'f' MUST not be None.") + ensure_not_none(f, message="The given callable MUST not be None.") + ensure_predicate(callable(f), message="A callable object is required.") return _ProcessorOfCallable(callable=f) @@ -58,6 +75,38 @@ def processor(f: Callable[[_RDT], _PDT]) -> Processor[_RDT, _PDT]: # ============================================================================= +@define +class NoOpProcessor(Processor[_RDT, _RDT], Generic[_RDT]): + """:class:`Processor` that returns received data as is.""" + + _is_disposed: bool = field(default=False, init=False) + _logger: Logger = field(init=False, repr=False) + + def __attrs_post_init__(self) -> None: # noqa: D105 + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + + @not_disposed + @override + def __enter__(self) -> Self: + return super(Processor, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def apply(self, raw_data: _RDT) -> _RDT: + self._logger.debug("Skipping data processing. Return raw data as is.") + return raw_data + + @override + def dispose(self) -> None: + self._is_disposed = True + self._logger.debug("Disposal complete.") + + @define class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]): """:class:`Processor` that pipes multiple ``Processors`` together.""" @@ -74,6 +123,12 @@ class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]): validators.min_len(1), ], ) + _retry_policy_factory: Callable[[], Retry] = field( + alias="retry_policy_factory", + default=noop_retry, + repr=False, + validator=validators.is_callable(), + ) _is_disposed: bool = field(default=False, init=False) _logger: Logger = field(init=False, repr=False) _exit_stack: ExitStack = field(factory=ExitStack, init=False, repr=False) @@ -85,8 +140,8 @@ class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]): def __attrs_post_init__(self) -> None: # noqa: D105 self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) - # Prepare processors for execution by ensuring that they are all - # disposed properly once this object is disposed. + # Prepare nested processors for execution by ensuring that they are + # all disposed of properly once this object is disposed. 
self._prepped_processors = [ self._processor_to_task(self._exit_stack.push(_processor)) for _processor in self._processors @@ -95,9 +150,7 @@ def __attrs_post_init__(self) -> None: # noqa: D105 @not_disposed @override def __enter__(self) -> Self: - super(Processor, self).__enter__() - self._exit_stack.__enter__() - return self + return super(Processor, self).__enter__() @property @override @@ -106,8 +159,8 @@ def is_disposed(self) -> bool: @not_disposed @override - def process(self, raw_data: _RDT) -> _PDT: - self._logger.info("Processing %s.", str(raw_data)) + def apply(self, raw_data: _RDT) -> _PDT: + self._logger.debug("Piping received data through all processors.") return pipe(*self._prepped_processors)(raw_data) @@ -117,25 +170,175 @@ def dispose(self) -> None: self._exit_stack.close() self._logger.debug("Disposal complete.") - @staticmethod - def _processor_to_task(p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]: - def do_process(raw_data: _RDT) -> _PDT: + def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]: + @task + def do_apply(raw_data: _RDT) -> _PDT: with p as _p: - return _p.process(raw_data) + apply = self._retry_policy_factory().retry(_p.apply) + return apply(raw_data) - return Task.of_callable(do_process) + return do_apply @define -class NoOpProcessor(Processor[_RDT, _RDT], Generic[_RDT]): - """:class:`Processor` that returns received data as is.""" +class ProcessorSet( + Processor[Sequence[_RDT], Sequence[_PDT]], + Generic[_RDT, _PDT], +): + """:class:`Processor` composed of other processors.""" + + _processors: Sequence[Processor[_RDT, _PDT]] = field( + alias="processor", + converter=tuple, + repr=False, + validator=[ + validators.deep_iterable( + iterable_validator=validators.instance_of(Sequence), + member_validator=validators.instance_of(Processor), + ), + validators.min_len(1), + ], + ) + _retry_policy_factory: Callable[[], Retry] = field( + alias="retry_policy_factory", + default=noop_retry, + repr=False, + validator=validators.is_callable(), + ) + _executor_factor: Callable[[], Executor] = field( + alias="executor_factory", + default=ThreadPoolExecutor, + repr=False, + validator=validators.is_callable(), + ) + _result_gatherer: _ResultGatherer[_PDT] = field( + alias="result_gatherer", + default=fail_fast, + init=False, + repr=False, + validator=validators.is_callable(), + ) + _is_disposed: bool = field(default=False, init=False) + _logger: Logger = field(init=False, repr=False) + _executor: ConcurrentExecutor[Sequence[_RDT], _PDT] = field( + init=False, + repr=False, + ) + _exit_stack: ExitStack = field(factory=ExitStack, init=False, repr=False) + + def __attrs_post_init__(self) -> None: # noqa: D105 + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + # Prepare nested processors for execution by ensuring that they are + # all disposed of properly once this object is disposed. + prepped_processors = ( + self._processor_to_task(self._exit_stack.push(_processor), _i) + for _i, _processor in enumerate(self._processors) + ) + # Schedule the processors for concurrent execution later. 
+ self._executor = ConcurrentExecutor( + *prepped_processors, + executor=self._executor_factor(), + ) + + @not_disposed + @override + def __enter__(self) -> Self: + return super(Processor, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @override + def apply(self, raw_data: Sequence[_RDT]) -> Sequence[_PDT]: + self._logger.debug( + "Distributing processing to all available processors.", + ) + + assert len(raw_data) == len(self._processors) + with self._executor as executor: + futures = executor.execute(raw_data) + + return tuple(self._result_gatherer(futures)) + + @override + def dispose(self) -> None: + self._is_disposed = True + self._exit_stack.close() + self._executor.dispose() + self._logger.debug("Disposal complete.") + + def _processor_to_task( + self, + p: Processor[_RDT, _PDT], + i: int, + ) -> Task[Sequence[_RDT], _PDT]: + @task + def do_apply(raw_data: Sequence[_RDT]) -> _PDT: + with p as _p: + apply = self._retry_policy_factory().retry(_p.apply) + return apply(raw_data[i]) + + return do_apply + + +@define +class ScatterProcessor(Processor[_RDT, Sequence[_PDT]], Generic[_RDT, _PDT]): + """:class:`Processor` that 'fans-out' its input to other processors.""" + + _processors: Sequence[Processor[_RDT, _PDT]] = field( + alias="processors", + converter=tuple, + repr=False, + validator=[ + validators.deep_iterable( + iterable_validator=validators.instance_of(Sequence), + member_validator=validators.instance_of(Processor), + ), + validators.min_len(1), + ], + ) + _retry_policy_factory: Callable[[], Retry] = field( + alias="retry_policy_factory", + default=noop_retry, + repr=False, + validator=validators.is_callable(), + ) + _executor_factor: Callable[[], Executor] = field( + alias="executor_factory", + default=ThreadPoolExecutor, + repr=False, + validator=validators.is_callable(), + ) + _result_gatherer: _ResultGatherer[_PDT] = field( + alias="result_gatherer", + default=fail_fast, + init=False, + repr=False, + validator=validators.is_callable(), + ) _is_disposed: bool = field(default=False, init=False) _logger: Logger = field(init=False, repr=False) + _executor: ConcurrentExecutor[_RDT, _PDT] = field(init=False, repr=False) + _exit_stack: ExitStack = field(factory=ExitStack, init=False, repr=False) def __attrs_post_init__(self) -> None: # noqa: D105 self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + # Prepare nested processors for execution by ensuring that they are + # all disposed of properly once this object is disposed. + prepped_processors = ( + self._processor_to_task(self._exit_stack.push(_processor)) + for _processor in self._processors + ) + # Schedule the processors for concurrent execution later. + self._executor = ConcurrentExecutor( + *prepped_processors, + executor=self._executor_factor(), + ) + @not_disposed @override def __enter__(self) -> Self: @@ -148,15 +351,32 @@ def is_disposed(self) -> bool: @not_disposed @override - def process(self, raw_data: _RDT) -> _RDT: - self._logger.debug("Skipping data processing. Return raw data as is.") - return raw_data + def apply(self, raw_data: _RDT) -> Sequence[_PDT]: + self._logger.debug( + "Scattering received data to all available processors." 
+ ) + + with self._executor as executor: + futures = executor.execute(raw_data) + + return tuple(self._result_gatherer(futures)) @override def dispose(self) -> None: self._is_disposed = True + self._exit_stack.close() + self._executor.dispose() self._logger.debug("Disposal complete.") + def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]: + @task + def do_apply(raw_data: _RDT) -> _PDT: + with p as _p: + apply = self._retry_policy_factory().retry(_p.apply) + return apply(raw_data) + + return do_apply + @define class _ProcessorOfCallable(Processor[_RDT, _PDT], Generic[_RDT, _PDT]): @@ -166,9 +386,16 @@ class _ProcessorOfCallable(Processor[_RDT, _PDT], Generic[_RDT, _PDT]): ) _is_disposed: bool = field(default=False, init=False) _logger: Logger = field(init=False, repr=False) + __wrapped__: _ProcessorCallable[_RDT, _PDT] = field( + init=False, + repr=False, + ) def __attrs_post_init__(self) -> None: - self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + self._logger: Logger = logging.getLogger( + f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(self._callable)})" + ) + self.__wrapped__ = self._callable @not_disposed @override @@ -182,12 +409,8 @@ def is_disposed(self) -> bool: @not_disposed @override - def process(self, raw_data: _RDT) -> _PDT: - self._logger.debug( - "Invoking '%s' to process %s.", - type_fqn(self._callable), - str(raw_data), - ) + def apply(self, raw_data: _RDT) -> _PDT: + self._logger.debug("Delegating to '%s'.", type_fqn(self._callable)) return self._callable(raw_data) @override diff --git a/src/sghi/etl/commons/sinks.py b/src/sghi/etl/commons/sinks.py new file mode 100644 index 0000000..c1e9d7c --- /dev/null +++ b/src/sghi/etl/commons/sinks.py @@ -0,0 +1,336 @@ +"""Common :class:`~sghi.etl.core.Sink` implementations.""" + +from __future__ import annotations + +import logging +from collections.abc import Callable, Iterable, Sequence +from concurrent.futures import Executor, Future, ThreadPoolExecutor +from contextlib import ExitStack +from logging import Logger +from typing import TYPE_CHECKING, Final, Generic, TypeVar + +from attrs import define, field, validators +from typing_extensions import override + +from sghi.disposable import not_disposed +from sghi.etl.core import Sink +from sghi.retry import Retry, noop_retry +from sghi.task import ConcurrentExecutor, Task, task +from sghi.utils import ensure_not_none, ensure_predicate, type_fqn + +from .utils import fail_fast + +if TYPE_CHECKING: + from typing import Self + +# ============================================================================= +# TYPES +# ============================================================================= + + +_PDT = TypeVar("_PDT") +"""Processed Data Type.""" + +_T = TypeVar("_T") + +_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]] + +_SinkCallable = Callable[[_PDT], None] + + +# ============================================================================= +# TYPES +# ============================================================================= + + +_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@sink" + + +# ============================================================================= +# DECORATORS +# ============================================================================= + + +def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]: + """Mark/Decorate a ``Callable`` as a :class:`Sink`. + + :param f: The callable to be decorated. The callable *MUST* have at *MOST* + one required argument (the processed data to be consumed). 
+ + :return: A ``Sink`` instance. + + :raise ValueError: If the given value is ``None`` or not a ``Callable``. + """ + ensure_not_none(f, message="The given callable MUST not be None.") + ensure_predicate(callable(f), message="A callable object is required.") + + return _SinkOfCallable(callable=f) + + +# ============================================================================= +# SINK IMPLEMENTATIONS +# ============================================================================= + + +@define +class NullSink(Sink[_PDT], Generic[_PDT]): + """:class:`Sink` that discards all the data it receives.""" + + _is_disposed: bool = field(default=False, init=False) + _logger: Logger = field(init=False, repr=False) + + def __attrs_post_init__(self) -> None: # noqa: D105 + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + + @not_disposed + @override + def __enter__(self) -> Self: + return super(Sink, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def drain(self, processed_data: _PDT) -> None: + self._logger.debug("Discarding all received data.") + # Do nothing, discard all received data. + ... + + @override + def dispose(self) -> None: + self._is_disposed = True + self._logger.debug("Disposal complete.") + + +@define +class ScatterSink(Sink[_PDT], Generic[_PDT]): + """:class:`Sink` that 'fans-out' its input to other sinks.""" + + _sinks: Sequence[Sink[_PDT]] = field( + alias="sinks", + converter=tuple, + repr=False, + validator=[ + validators.deep_iterable( + iterable_validator=validators.instance_of(Sequence), + member_validator=validators.instance_of(Sink), + ), + validators.min_len(1), + ], + ) + _retry_policy_factory: Callable[[], Retry] = field( + alias="retry_policy_factory", + default=noop_retry, + repr=False, + validator=validators.is_callable(), + ) + _executor_factor: Callable[[], Executor] = field( + alias="executor_factory", + default=ThreadPoolExecutor, + repr=False, + validator=validators.is_callable(), + ) + _result_gatherer: _ResultGatherer[None] = field( + alias="result_gatherer", + default=fail_fast, + init=False, + repr=False, + validator=validators.is_callable(), + ) + _is_disposed: bool = field(default=False, init=False) + _logger: Logger = field(init=False, repr=False) + _executor: ConcurrentExecutor[_PDT, None] = field(init=False, repr=False) + _exit_stack: ExitStack = field(factory=ExitStack, init=False, repr=False) + + def __attrs_post_init__(self) -> None: # noqa: D105 + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + + # Prepare nested sinks for execution by ensuring that they are all + # disposed of properly once this object is disposed. + prepped_sinks = ( + self._sink_to_task(self._exit_stack.push(_sink)) + for _sink in self._sinks + ) + # Schedule the sinks for concurrent execution later. 
+        self._executor = ConcurrentExecutor(
+            *prepped_sinks,
+            executor=self._executor_factor(),
+        )
+
+    @not_disposed
+    @override
+    def __enter__(self) -> Self:
+        return super(Sink, self).__enter__()
+
+    @property
+    @override
+    def is_disposed(self) -> bool:
+        return self._is_disposed
+
+    @override
+    def drain(self, processed_data: _PDT) -> None:
+        self._logger.debug("Scattering received data to all available sinks.")
+
+        with self._executor as executor:
+            futures = executor.execute(processed_data)
+
+        self._result_gatherer(futures)
+
+    @override
+    def dispose(self) -> None:
+        self._is_disposed = True
+        self._exit_stack.close()
+        self._executor.dispose()
+        self._logger.debug("Disposal complete.")
+
+    def _sink_to_task(self, s: Sink[_PDT]) -> Task[_PDT, None]:
+        @task
+        def do_drain(processed_data: _PDT) -> None:
+            with s as _s:
+                drain = self._retry_policy_factory().retry(_s.drain)
+                return drain(processed_data)
+
+        return do_drain
+
+
+@define
+class SinkSet(Sink[Sequence[_PDT]], Generic[_PDT]):
+    """:class:`Sink` composed of other sinks."""
+
+    _sinks: Sequence[Sink[_PDT]] = field(
+        alias="sinks",
+        converter=tuple,
+        repr=False,
+        validator=[
+            validators.deep_iterable(
+                iterable_validator=validators.instance_of(Sequence),
+                member_validator=validators.instance_of(Sink),
+            ),
+            validators.min_len(1),
+        ],
+    )
+    _retry_policy_factory: Callable[[], Retry] = field(
+        alias="retry_policy_factory",
+        default=noop_retry,
+        repr=False,
+        validator=validators.is_callable(),
+    )
+    _executor_factor: Callable[[], Executor] = field(
+        alias="executor_factory",
+        default=ThreadPoolExecutor,
+        repr=False,
+        validator=validators.is_callable(),
+    )
+    _result_gatherer: _ResultGatherer[None] = field(
+        alias="result_gatherer",
+        default=fail_fast,
+        init=False,
+        repr=False,
+        validator=validators.is_callable(),
+    )
+    _is_disposed: bool = field(default=False, init=False)
+    _logger: Logger = field(init=False, repr=False)
+    _executor: ConcurrentExecutor[Sequence[_PDT], None] = field(
+        init=False,
+        repr=False,
+    )
+    _exit_stack: ExitStack = field(factory=ExitStack, init=False, repr=False)
+
+    def __attrs_post_init__(self) -> None:  # noqa: D105
+        self._logger: Logger = logging.getLogger(type_fqn(self.__class__))
+
+        # Prepare nested sinks for execution by ensuring that they are all
+        # disposed of properly once this object is disposed.
+        prepped_sinks = (
+            self._sink_to_task(self._exit_stack.push(_sink), _i)
+            for _i, _sink in enumerate(self._sinks)
+        )
+        # Schedule the sinks for concurrent execution later.
+        self._executor = ConcurrentExecutor(
+            *prepped_sinks,
+            executor=self._executor_factor(),
+        )
+
+    @not_disposed
+    @override
+    def __enter__(self) -> Self:
+        return super(Sink, self).__enter__()
+
+    @property
+    @override
+    def is_disposed(self) -> bool:
+        return self._is_disposed
+
+    @override
+    def drain(self, processed_data: Sequence[_PDT]) -> None:
+        self._logger.debug("Distributing consumption to all available sinks.")
+
+        assert len(processed_data) == len(self._sinks)
+
+        with self._executor as executor:
+            futures = executor.execute(processed_data)
+
+        self._result_gatherer(futures)
+
+    @override
+    def dispose(self) -> None:
+        self._is_disposed = True
+        self._exit_stack.close()
+        self._executor.dispose()
+        self._logger.debug("Disposal complete.")
+
+    def _sink_to_task(
+        self,
+        s: Sink[_PDT],
+        i: int,
+    ) -> Task[Sequence[_PDT], None]:
+        @task
+        def do_drain(processed_data: Sequence[_PDT]) -> None:
+            with s as _s:
+                drain = self._retry_policy_factory().retry(_s.drain)
+                return drain(processed_data[i])
+
+        return do_drain
+
+
+@define
+class _SinkOfCallable(Sink[_PDT], Generic[_PDT]):
+    _callable: _SinkCallable[_PDT] = field(
+        alias="callable",
+        validator=validators.is_callable(),
+    )
+    _is_disposed: bool = field(default=False, init=False)
+    _logger: Logger = field(init=False, repr=False)
+    __wrapped__: _SinkCallable[_PDT] = field(init=False, repr=False)
+
+    def __attrs_post_init__(self) -> None:
+        self._logger: Logger = logging.getLogger(
+            f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(self._callable)})"
+        )
+        self.__wrapped__ = self._callable
+
+    @not_disposed
+    @override
+    def __enter__(self) -> Self:
+        return super(Sink, self).__enter__()
+
+    @property
+    @override
+    def is_disposed(self) -> bool:
+        return self._is_disposed
+
+    @not_disposed
+    @override
+    def drain(self, processed_data: _PDT) -> None:
+        self._logger.debug("Delegating to '%s'.", type_fqn(self._callable))
+        self._callable(processed_data)
+
+    @override
+    def dispose(self) -> None:
+        self._is_disposed = True
+        self._logger.debug("Disposal complete.")
diff --git a/src/sghi/etl/commons/sources.py b/src/sghi/etl/commons/sources.py
index a2d2cf3..4dbbdc7 100644
--- a/src/sghi/etl/commons/sources.py
+++ b/src/sghi/etl/commons/sources.py
@@ -3,17 +3,22 @@
 from __future__ import annotations
 
 import logging
-from collections.abc import Callable, Sequence
+from collections.abc import Callable, Iterable, Sequence
+from concurrent.futures import Executor, Future, ThreadPoolExecutor
 from contextlib import ExitStack
 from logging import Logger
-from typing import TYPE_CHECKING, Generic, TypeVar
+from typing import TYPE_CHECKING, Final, Generic, TypeVar
 
 from attrs import define, field, validators
 from typing_extensions import override
 
 from sghi.disposable import not_disposed
 from sghi.etl.core import Source
-from sghi.utils import ensure_not_none, type_fqn
+from sghi.retry import Retry, noop_retry
+from sghi.task import ConcurrentExecutor, Task, task
+from sghi.utils import ensure_callable, type_fqn
+
+from .utils import fail_fast
 
 if TYPE_CHECKING:
     from typing import Self
@@ -26,24 +31,37 @@
 _RDT = TypeVar("_RDT")
 """Raw Data Type."""
 
+_T = TypeVar("_T")
+
+_ResultGatherer = Callable[[Iterable[Future[_T]]], Iterable[_T]]
+
 _SourceCallable = Callable[[], _RDT]
 
 
+# =============================================================================
+# TYPES
+# =============================================================================
+
+
+_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@source"
+
+
 # 
============================================================================= # DECORATORS # ============================================================================= -def data_source(f: Callable[[], _RDT]) -> Source[_RDT]: - """Mark a ``Callable`` as a :class:`Source`. +def source(f: Callable[[], _RDT]) -> Source[_RDT]: + """Mark/Decorate a ``Callable`` as a :class:`Source`. :param f: The callable to be decorated. The callable *MUST* not have any required arguments but *MUST* return a value (the drawn data). - :return: A ``DataSource`` instance. - :raise ValueError: If ``f`` is ``None``. + :return: A ``Source`` instance. + + :raise ValueError: If the given value is ``None`` or not a ``Callable``. """ - ensure_not_none(f, "'f' MUST not be None.") + ensure_callable(f, message="A callable object is required.") return _SourceOfCallable(callable=f) @@ -69,19 +87,49 @@ class SourceSet(Source[Sequence[_RDT]], Generic[_RDT]): validators.min_len(1), ], ) + _retry_policy_factory: Callable[[], Retry] = field( + alias="retry_policy_factory", + default=noop_retry, + repr=False, + validator=validators.is_callable(), + ) + _executor_factor: Callable[[], Executor] = field( + alias="executor_factory", + default=ThreadPoolExecutor, + repr=False, + validator=validators.is_callable(), + ) + _result_gatherer: _ResultGatherer[_RDT] = field( + alias="result_gatherer", + default=fail_fast, + init=False, + repr=False, + validator=validators.is_callable(), + ) _is_disposed: bool = field(default=False, init=False) _logger: Logger = field(init=False, repr=False) + _executor: ConcurrentExecutor[None, _RDT] = field(init=False, repr=False) _exit_stack: ExitStack = field(factory=ExitStack, init=False, repr=False) def __attrs_post_init__(self) -> None: # noqa: D105 self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + # Prepare sources for execution by ensuring that they are all + # disposed properly once this object is disposed. + prepped_sources = ( + self._source_to_task(self._exit_stack.push(_source)) + for _source in self._sources + ) + # Schedule the sources for concurrent execution later. + self._executor = ConcurrentExecutor( + *prepped_sources, + executor=self._executor_factor(), + ) + @not_disposed @override def __enter__(self) -> Self: - super(Source, self).__enter__() - self._exit_stack.__enter__() - return self + return super(Source, self).__enter__() @property @override @@ -90,14 +138,30 @@ def is_disposed(self) -> bool: @not_disposed @override - def draw(self) -> Sequence[_RDT]: ... 
+ def draw(self) -> Sequence[_RDT]: + self._logger.debug("Aggregating data from all available sources.") + + with self._executor as executor: + futures = executor.execute(None) + + return tuple(self._result_gatherer(futures)) @override def dispose(self) -> None: self._is_disposed = True self._exit_stack.close() + self._executor.dispose() self._logger.debug("Disposal complete.") + def _source_to_task(self, s: Source[_RDT]) -> Task[None, _RDT]: + @task + def do_draw(_: None) -> _RDT: + with s as _s: + draw = self._retry_policy_factory().retry(_s.draw) + return draw() + + return do_draw + @define class _SourceOfCallable(Source[_RDT], Generic[_RDT]): @@ -107,9 +171,13 @@ class _SourceOfCallable(Source[_RDT], Generic[_RDT]): ) _is_disposed: bool = field(default=False, init=False) _logger: Logger = field(init=False, repr=False) + __wrapped__: _SourceCallable[_RDT] = field(init=False, repr=False) def __attrs_post_init__(self) -> None: - self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + self._logger: Logger = logging.getLogger( + f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(self._callable)})" + ) + self.__wrapped__ = self._callable @not_disposed @override @@ -124,7 +192,7 @@ def is_disposed(self) -> bool: @not_disposed @override def draw(self) -> _RDT: - self._logger.debug("Drawing from '%s'.", type_fqn(self._callable)) + self._logger.debug("Delegating to '%s'.", type_fqn(self._callable)) return self._callable() @override diff --git a/src/sghi/etl/commons/workflow_builder.py b/src/sghi/etl/commons/workflow_builder.py index d7ec53a..1c9a262 100644 --- a/src/sghi/etl/commons/workflow_builder.py +++ b/src/sghi/etl/commons/workflow_builder.py @@ -1,11 +1,22 @@ +"""A :class:`WorkflowDefinition` builder class.""" + from __future__ import annotations -from typing import TYPE_CHECKING, Generic, TypeVar +from collections.abc import Callable, Sequence +from typing import TYPE_CHECKING, Any, Generic, TypeVar + +from attrs import define, field, validators + +from sghi.etl.core import Processor, Sink, Source +from sghi.utils import ensure_not_none, ensure_predicate -from attrs import define +from .processors import NoOpProcessor, ScatterProcessor +from .sinks import NullSink, ScatterSink +from .sources import SourceSet +from .workflow_definition import SimpleWorkflowDefinitions if TYPE_CHECKING: - from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition + from sghi.etl.core import WorkflowDefinition # ============================================================================= # TYPES @@ -18,6 +29,21 @@ _RDT = TypeVar("_RDT") """Raw Data Type.""" +_CompositeProcessorFactory = Callable[ + [Sequence[Processor[Any, Any]]], + Processor[_RDT, _PDT], +] + +_CompositeSourceFactory = Callable[[Sequence[Source[Any]]], Source[_RDT]] + +_CompositeSinkFactory = Callable[[Sequence[Sink[Any]]], Sink[_PDT]] + +_ProcessorFactory = Callable[[], Processor[_RDT, _PDT]] + +_SinkFactory = Callable[[], Sink[_PDT]] + +_SourceFactory = Callable[[], Source[_RDT]] + # ============================================================================= # WORKFLOW BUILDER @@ -26,17 +52,258 @@ @define class WorkflowBuilder(Generic[_RDT, _PDT]): + """A builder of :class:`workflow definitions `.""" + + id: str = field( + validator=[validators.instance_of(str), validators.min_len(2)], + ) + name: str = field(validator=validators.instance_of(str)) + description: str | None = field( + default=None, + kw_only=True, + validator=validators.optional(validator=validators.instance_of(str)), + ) + source_factories: 
Sequence[_SourceFactory[_RDT]] | None = field(
+        default=None,
+        kw_only=True,
+        repr=False,
+        validator=validators.optional(
+            validators.deep_iterable(
+                iterable_validator=validators.instance_of(Sequence),
+                member_validator=validators.is_callable(),
+            ),
+        ),
+    )
+    processor_factories: Sequence[_ProcessorFactory[_RDT, _PDT]] | None = (
+        field(
+            default=None,
+            kw_only=True,
+            repr=False,
+            validator=validators.optional(
+                validators.deep_iterable(
+                    iterable_validator=validators.instance_of(Sequence),
+                    member_validator=validators.is_callable(),
+                ),
+            ),
+        )
+    )
+    sink_factories: Sequence[_SinkFactory[_PDT]] | None = field(
+        default=None,
+        kw_only=True,
+        repr=True,
+        validator=validators.optional(
+            validators.deep_iterable(
+                iterable_validator=validators.instance_of(Sequence),
+                member_validator=validators.is_callable(),
+            ),
+        ),
+    )
+    default_processor_factory: _ProcessorFactory[_RDT, _PDT] = field(
+        default=NoOpProcessor,
+        kw_only=True,
+        repr=True,
+        validator=validators.is_callable(),
+    )
+    default_sink_factory: _SinkFactory[_PDT] = field(
+        default=NullSink,
+        kw_only=True,
+        repr=True,
+        validator=validators.is_callable(),
+    )
+    composite_source_factory: _CompositeSourceFactory[_RDT] = field(
+        default=SourceSet,
+        kw_only=True,
+        repr=True,
+        validator=validators.is_callable(),
+    )
+    composite_processor_factory: _CompositeProcessorFactory[_RDT, _PDT] = (
+        field(
+            default=ScatterProcessor,
+            kw_only=True,
+            repr=True,
+            validator=validators.is_callable(),
+        )
+    )
+    composite_sink_factory: _CompositeSinkFactory[_PDT] = field(
+        default=ScatterSink,
+        kw_only=True,
+        repr=True,
+        validator=validators.is_callable(),
+    )
+    _source_factories: list[_SourceFactory[_RDT]] = field(
+        factory=list, init=False, repr=False
+    )
+    _processor_factories: list[_ProcessorFactory[_RDT, _PDT]] = field(
+        factory=list,
+        init=False,
+        repr=False,
+    )
+    _sink_factories: list[_SinkFactory[_PDT]] = field(
+        factory=list,
+        init=False,
+        repr=False,
+    )
+
+    def __attrs_post_init__(self) -> None:  # noqa: D105
+        self._source_factories.extend(self.source_factories or ())
+        self._processor_factories.extend(self.processor_factories or ())
+        self._sink_factories.extend(self.sink_factories or ())
 
     def __call__(self) -> WorkflowDefinition[_RDT, _PDT]:
+        """Create a :class:`WorkflowDefinition` instance.
+
+        Delegates the actual call to :meth:`build`.
+
+        :return: A new ``WorkflowDefinition`` instance.
+        """
         return self.build()
 
-    def build(self) -> WorkflowDefinition[_RDT, _PDT]: ...
+    def build(self) -> WorkflowDefinition[_RDT, _PDT]:
+        """Create a :class:`WorkflowDefinition` instance.
+
+        :return: A new ``WorkflowDefinition`` instance.
+        """
+        return SimpleWorkflowDefinitions(
+            id=self.id,
+            name=self.name,
+            description=self.description,
+            source_factory=self._build_source_factory(),
+            processor_factory=self._build_processor_factory(),
+            sink_factory=self._build_sink_factory(),
+        )
+
+    def draw_from(
+        self,
+        source: Source[_RDT] | _SourceFactory[_RDT],
+    ) -> Source[_RDT] | _SourceFactory[_RDT]:
+        """Add a new :class:`Source` or ``Source`` factory to draw from.
+
+        :param source: A ``Source`` instance or factory function that returns
+            a ``Source`` instance to draw from.
+
+        :return: The given ``source`` value.
+        """
+        ensure_not_none(source, "'source' MUST not be None.")
+
+        match source:
+            case Source():
+                self._source_factories.append(lambda: source)
+            case _ if callable(source):
+                self._source_factories.append(source)
+            case _:
+                _err_msg: str = (
+                    "'source' MUST be a 'sghi.etl.core.Source' instance or a "
+                    "factory function that returns an instance of the same "
+                    "type."
+                )
+                raise ValueError(_err_msg)
 
-    def draw_from(self, source: Source[_RDT]) -> Source[_RDT]: ...
+        return source
 
-    def drain_to(self, sink: Sink[_PDT]) -> Sink[_PDT]: ...
+    def drain_to(
+        self,
+        sink: Sink[_PDT] | _SinkFactory[_PDT],
+    ) -> Sink[_PDT] | _SinkFactory[_PDT]:
+        """Add a new :class:`Sink` or ``Sink`` factory to drain to.
+
+        :param sink: A ``Sink`` instance or factory function that returns a
+            ``Sink`` instance to drain to.
+
+        :return: The given ``sink`` value.
+        """
+        ensure_not_none(sink, "'sink' MUST not be None.")
+
+        match sink:
+            case Sink():
+                self._sink_factories.append(lambda: sink)
+            case _ if callable(sink):
+                self._sink_factories.append(sink)
+            case _:
+                _err_msg: str = (
+                    "'sink' MUST be a 'sghi.etl.core.Sink' instance or a "
+                    "factory function that returns an instance of the same "
+                    "type."
+                )
+                raise ValueError(_err_msg)
+
+        return sink
 
-    def process_using(
+    def apply_processor(
         self,
-        processor: Processor[_RDT, _PDT],
-    ) -> Processor[_RDT, _PDT]: ...
+        processor: Processor[_RDT, _PDT] | _ProcessorFactory[_RDT, _PDT],
+    ) -> Processor[_RDT, _PDT] | _ProcessorFactory[_RDT, _PDT]:
+        """Add a new ``Processor`` or ``Processor`` factory to process using.
+
+        :param processor: A ``Processor`` instance or factory function that
+            returns a ``Processor`` instance to use when processing the
+            extracted data.
+
+        :return: The given ``processor`` value.
+        """
+        ensure_not_none(processor, "'processor' MUST not be None.")
+
+        match processor:
+            case Processor():
+                self._processor_factories.append(lambda: processor)
+            case _ if callable(processor):
+                self._processor_factories.append(processor)
+            case _:
+                _err_msg: str = (
+                    "'processor' MUST be a 'sghi.etl.core.Processor' instance "
+                    "or a factory function that returns an instance of the "
+                    "same type."
+                )
+                raise ValueError(_err_msg)
+
+        return processor
+
+    def _build_source_factory(self) -> _SourceFactory[_RDT]:
+        ensure_predicate(
+            bool(self._source_factories),
+            message=(
+                "No sources available. At least one source MUST be provided."
+ ), + exc_factory=RuntimeError, + ) + + match self._source_factories: + case (_, _, *_): + + def _factory() -> Source[_RDT]: + return self.composite_source_factory( + [_sf() for _sf in self._source_factories] + ) + + return _factory + case _: + return self._source_factories[0] + + def _build_processor_factory(self) -> _ProcessorFactory[_RDT, _PDT]: + match self._processor_factories: + case (_, _, *_): + + def _factory() -> Processor[_RDT, _PDT]: + return self.composite_processor_factory( + [_pf() for _pf in self._processor_factories] + ) + + return _factory + case (entry, *_): + return entry + case _: + return self.default_processor_factory + + def _build_sink_factory(self) -> _SinkFactory[_PDT]: + match self._sink_factories: + case (_, _, *_): + + def _factory() -> Sink[_PDT]: + return self.composite_sink_factory( + [_sf() for _sf in self._sink_factories] + ) + + return _factory + case (entry, *_): + return entry + case _: + return self.default_sink_factory diff --git a/src/sghi/etl/commons/workflow_definition.py b/src/sghi/etl/commons/workflow_definition.py new file mode 100644 index 0000000..fba2b90 --- /dev/null +++ b/src/sghi/etl/commons/workflow_definition.py @@ -0,0 +1,100 @@ +"""Common :class:`sghi.etl.core.WorkflowDefinition` implementations.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Generic, TypeVar, override + +from attrs import field, frozen, validators + +from sghi.etl.core import Processor, Sink, Source, WorkflowDefinition + +from .processors import NoOpProcessor +from .sinks import NullSink + +if TYPE_CHECKING: + from collections.abc import Callable + +# ============================================================================= +# TYPES +# ============================================================================= + + +_PDT = TypeVar("_PDT") +"""Processed Data Type.""" + +_RDT = TypeVar("_RDT") +"""Raw Data Type.""" + + +# ============================================================================= +# SPEC IMPLEMENTATIONS +# ============================================================================= + + +@frozen +class SimpleWorkflowDefinitions( + WorkflowDefinition[_RDT, _PDT], + Generic[_RDT, _PDT], +): + """A simple :class:`WorkflowDefinition` implementation.""" + + _id: str = field( + alias="id", + validator=[validators.instance_of(str), validators.min_len(2)], + ) + _name: str = field(alias="name", validator=validators.instance_of(str)) + _source_factory: Callable[[], Source[_RDT]] = field( + alias="source_factory", + repr=False, + validator=validators.is_callable(), + ) + _description: str | None = field( + alias="description", + default=None, + kw_only=True, + validator=validators.optional(validator=validators.instance_of(str)), + ) + _processor_factory: Callable[[], Processor[_RDT, _PDT]] = field( + alias="processor_factory", + default=NoOpProcessor, + kw_only=True, + repr=False, + validator=validators.is_callable(), + ) + _sink_factory: Callable[[], Sink[_PDT]] = field( + alias="sink_factory", + default=NullSink, + kw_only=True, + repr=False, + validator=validators.is_callable(), + ) + + @property + @override + def id(self) -> str: + return self._id + + @property + @override + def name(self) -> str: + return self._name + + @property + @override + def description(self) -> str | None: + return self._description + + @property + @override + def source_factory(self) -> Callable[[], Source[_RDT]]: + return self._source_factory + + @property + @override + def processor_factory(self) -> Callable[[], Processor[_RDT, _PDT]]: + 
return self._processor_factory + + @property + @override + def sink_factory(self) -> Callable[[], Sink[_PDT]]: + return self._sink_factory
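
A minimal usage sketch of the utilities added by this patch, showing how the @source, @processor and @sink decorators compose with WorkflowBuilder. The component names (say_hello, shout, echo) and the manual draw -> apply -> drain run at the end are illustrative assumptions; the patch itself does not ship a workflow runner.

from sghi.etl.commons import WorkflowBuilder, processor, sink, source


@source
def say_hello() -> str:
    # Illustrative source: draws a single string.
    return "Hello, World!"


@processor
def shout(raw_data: str) -> str:
    # Illustrative processor: transforms the drawn data.
    return raw_data.upper()


@sink
def echo(processed_data: str) -> None:
    # Illustrative sink: consumes the processed data.
    print(processed_data)


wb: WorkflowBuilder[str, str] = WorkflowBuilder(
    id="say_hello",
    name="Say Hello",
)
wb.draw_from(say_hello)      # a Source instance or a Source factory
wb.apply_processor(shout)    # a Processor instance or a Processor factory
wb.drain_to(echo)            # a Sink instance or a Sink factory
workflow = wb.build()        # -> SimpleWorkflowDefinitions

# Assumed manual run (no runner is included in this patch):
# draw -> apply -> drain, with each component disposed of on exit.
with (
    workflow.source_factory() as src,
    workflow.processor_factory() as prc,
    workflow.sink_factory() as snk,
):
    snk.drain(prc.apply(src.draw()))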