diff --git a/docs/conf.py b/docs/conf.py index 9952602..d8aad8d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -71,6 +71,7 @@ ("py:class", "sghi.etl.core.Processor"), # docs aren't published yet ("py:class", "sghi.etl.core.Sink"), # docs aren't published yet ("py:class", "sghi.etl.core.Source"), # docs aren't published yet + ("py:class", "sghi.retry.Retry"), # docs aren't published yet ("py:exc", "ResourceDisposedError"), # docs aren't published yet ("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet ("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py index 494eac8..c43e42d 100644 --- a/src/sghi/etl/commons/__init__.py +++ b/src/sghi/etl/commons/__init__.py @@ -1,6 +1,11 @@ """Collection of utilities for working with SGHI ETL Workflows.""" -from .processors import NOOPProcessor, processor +from .processors import ( + NOOPProcessor, + ProcessorPipe, + pipe_processors, + processor, +) from .sinks import NullSink, sink from .sources import source from .utils import fail_fast, fail_fast_factory, ignored_failed @@ -8,9 +13,11 @@ __all__ = [ "NOOPProcessor", "NullSink", + "ProcessorPipe", "fail_fast", "fail_fast_factory", "ignored_failed", + "pipe_processors", "processor", "sink", "source", diff --git a/src/sghi/etl/commons/processors.py b/src/sghi/etl/commons/processors.py index 98add64..5240bc1 100644 --- a/src/sghi/etl/commons/processors.py +++ b/src/sghi/etl/commons/processors.py @@ -3,16 +3,19 @@ from __future__ import annotations import logging -from collections.abc import Callable +from collections.abc import Callable, Sequence +from contextlib import ExitStack from functools import update_wrapper from logging import Logger -from typing import Final, Generic, Self, TypeVar, final +from typing import Any, Final, Generic, Self, TypeVar, final from typing_extensions import override from sghi.disposable import not_disposed from sghi.etl.core import Processor -from sghi.utils import ensure_callable, type_fqn +from sghi.retry import Retry, noop_retry +from sghi.task import Task, pipe, task +from sghi.utils import ensure_callable, ensure_not_none_nor_empty, type_fqn # ============================================================================= # TYPES @@ -161,6 +164,170 @@ def dispose(self) -> None: self._logger.info("Disposal complete.") +@final +class ProcessorPipe(Processor[_RDT, _PDT], Generic[_RDT, _PDT]): + """A :class:`Processor` that pipes raw data to other embedded processors. + + This ``Processor`` pipes the raw data applied to it through a series of + other ``Processor`` instances, passing the output of one ``Processor`` as + the input to the next. If an unhandled error occurs in one of the embedded + processors, the entire pipeline fails and propagates the error to the + caller. + + Instances of this class are **NOT SAFE** to retry and **SHOULD NEVER** be + retried. However, they do support retrying their embedded processors. This + is disabled by default but can be enabled by providing a suitable value to + the ``retry_policy_factory`` constructor parameter when creating new + instances. When enabled, each embedded processor will be retried + individually per the specified retry policy in case it fails. + + Disposing instances of this class also disposes of their embedded + processors. + + .. admonition:: Regarding retry safety + :class: tip + + Instances of this ``Processor`` are **NOT SAFE** to retry. + """ + + __slots__ = ( + "_processors", + "_retry_policy_factory", + "_is_disposed", + "_logger", + "_exit_stack", + "_prepped_processors", + ) + + def __init__( + self, + processors: Sequence[Processor[Any, Any]], + retry_policy_factory: Callable[[], Retry] = noop_retry, + ) -> None: + """Create a new ``ProcessorPipe`` instance with the given properties. + + :param processors: A ``Sequence`` of processors to pipe the raw data + applied to this processor. This *MUST NOT* be empty. + :param retry_policy_factory: A function that supplies retry policy + instance(s) to apply to each embedded processor. This MUST be a + callable object. Defaults to a factory that returns retry policies + that do nothing. + + :raise ValueError: If ``processors`` is ``None`` or empty, or if + ``retry_policy_factory`` is NOT a callable object. + """ + super().__init__() + ensure_not_none_nor_empty( + value=processors, + message="'processors' MUST NOT be None or empty.", + ) + self._processors: Sequence[Processor[Any, Any]] + self._processors = tuple(processors) + self._retry_policy_factory: Callable[[], Retry] = ensure_callable( + value=retry_policy_factory, + message="'retry_policy_factory' MUST be a callable.", + ) + self._is_disposed: bool = False + self._logger: Logger = logging.getLogger(type_fqn(self.__class__)) + self._exit_stack: ExitStack = ExitStack() + + # Prepare embedded processors for execution by ensuring that they are + # all disposed of properly once this object is disposed. + self._prepped_processors: Sequence[Task[Any, Any]] = tuple( + self._processor_to_task(self._exit_stack.push(_processor)) + for _processor in self._processors + ) + + @not_disposed + @override + def __enter__(self) -> Self: + """Return ``self`` upon entering the runtime context. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :return: This instance. + + :raise ResourceDisposedError: If this processor has already been + disposed. + """ + return super(Processor, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def apply(self, raw_data: _RDT) -> _PDT: + """Pipe the given raw data through all the embedded processors. + + The output of each embedded ``Processor`` becomes the input to the next + one. The result of the final ``Processor`` is the output of this apply + operation. If an unhandled error occurs in one of the embedded + processors, the entire operation fails and propagates the error to the + caller. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :param raw_data: The data to be processed. + + :return: The processed data after being piped through the embedded + processors. + + :raise ResourceDisposedError: If this processor has already been + disposed. + """ + self._logger.info("Piping received data through all processors.") + return pipe(*self._prepped_processors).execute(raw_data) + + @override + def dispose(self) -> None: + """Release any underlying resources contained by this processor. + + All embedded processors are also disposed. After this method returns + successfully, the :attr:`is_disposed` property should return ``True``. + + .. note:: + Unless otherwise specified, trying to use methods of a + ``Disposable`` instance decorated with the + :func:`~sghi.disposable.not_disposed` decorator after this method + returns should generally be considered a programming error and + should result in a :exc:`~sghi.disposable.ResourceDisposedError` + being raised. + + This method should be idempotent allowing it to be called more + than once; only the first call, however, should have an effect. + + :return: None. + """ + self._is_disposed = True + self._exit_stack.close() + self._logger.info("Disposal complete.") + + def _processor_to_task(self, p: Processor[_RDT, _PDT]) -> Task[_RDT, _PDT]: + @task + def do_apply(raw_data: _RDT) -> _PDT: + with p as _p: + apply = self._retry_policy_factory().retry(_p.apply) + return apply(raw_data) + + return do_apply + + +pipe_processors = ProcessorPipe + + @final class _ProcessorOfCallable(Processor[_RDT, _PDT], Generic[_RDT, _PDT]): __slots__ = ("_delegate_to", "_is_disposed", "_logger") @@ -237,5 +404,7 @@ def dispose(self) -> None: __all__ = [ "NOOPProcessor", + "ProcessorPipe", + "pipe_processors", "processor", ] diff --git a/test/sghi/etl/commons_tests/processors_tests.py b/test/sghi/etl/commons_tests/processors_tests.py index ada3059..bde0dd1 100644 --- a/test/sghi/etl/commons_tests/processors_tests.py +++ b/test/sghi/etl/commons_tests/processors_tests.py @@ -3,15 +3,20 @@ from __future__ import annotations +from typing import TYPE_CHECKING from unittest import TestCase import pytest +from typing_extensions import override from sghi.disposable import ResourceDisposedError -from sghi.etl.commons import NOOPProcessor, processor +from sghi.etl.commons import NOOPProcessor, ProcessorPipe, processor from sghi.etl.core import Processor from sghi.task import task +if TYPE_CHECKING: + from collections.abc import Iterable, Sequence + def test_processor_decorator_delegates_to_the_wrapped_callable() -> None: """:func:`processor` should delegate to the wrapped callable when @@ -138,7 +143,7 @@ def test_dispose_has_the_intended_side_effects(self) -> None: assert instance.is_disposed def test_multiple_dispose_invocations_is_okay(self) -> None: - """Calling :meth:`NOOPProcessor.dispose` should be okay. + """Calling :meth:`NOOPProcessor.dispose` multiple times should be okay. No errors should be raised and the object should remain disposed. """ @@ -185,3 +190,125 @@ def test_usage_when_is_disposed_fails(self) -> None: with pytest.raises(ResourceDisposedError): instance.__enter__() + + +class TestProcessorPipe(TestCase): + """Tests for the :class:`sghi.etl.commons.ProcessorPipe` class.""" + + @override + def setUp(self) -> None: + super().setUp() + + @processor + def add_65(ints: Iterable[int]) -> Iterable[int]: + yield from (v + 65 for v in ints) + + @processor + def ints_to_chars(ints: Iterable[int]) -> Iterable[str]: + yield from map(chr, ints) + + @processor + def join_chars(values: Iterable[str]) -> str: + return "".join(list(values)) + + self._embedded_processors: Sequence[Processor] = [ + add_65, + ints_to_chars, + join_chars, + ] + self._instance: Processor[Iterable[int], str] = ProcessorPipe( + processors=self._embedded_processors, + ) + + @override + def tearDown(self) -> None: + super().tearDown() + self._instance.dispose() + + def test_apply_returns_the_expected_value(self) -> None: + """:meth:`ProcessorPipe.apply` should return the result after applying + the given raw data through its embedded processors. + """ + assert self._instance.apply(range(10)) == "ABCDEFGHIJ" + + def test_instantiation_fails_on_none_processors_argument(self) -> None: + """Instantiating a :class:`ProcessorPipe` with a ``None`` + ``processors`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="None or empty") as exp_info: + ProcessorPipe(processors=None) # type: ignore + + assert ( + exp_info.value.args[0] == "'processors' MUST NOT be None or empty." + ) + + def test_instantiation_fails_on_an_empty_processors_argument(self) -> None: + """Instantiating a :class:`ProcessorPipe` with an empty + ``processors`` argument should raise a :exc:`ValueError`. + """ + with pytest.raises(ValueError, match="None or empty") as exp_info: + ProcessorPipe(processors=[]) + + assert ( + exp_info.value.args[0] == "'processors' MUST NOT be None or empty." + ) + + def test_dispose_has_the_intended_side_effects(self) -> None: + """Calling :meth:`ProcessorPipe.dispose` should result in the + :attr:`ProcessorPipe.is_disposed` property being set to ``True``. + + Each embedded ``Processor`` should also be disposed. + """ + self._instance.dispose() + + assert self._instance.is_disposed + for _processor in self._embedded_processors: + assert _processor.is_disposed + + def test_multiple_dispose_invocations_is_okay(self) -> None: + """Calling :meth:`ProcessorPipe.dispose` multiple times should be okay. + + No errors should be raised and the object should remain disposed. + """ + for _ in range(10): + try: + self._instance.dispose() + except Exception as exc: # noqa: BLE001 + fail_reason: str = ( + "Calling 'ProcessorPipe.dispose()' multiple times should " + f"be okay. But the following error was raised: {exc!s}" + ) + pytest.fail(fail_reason) + + assert self._instance.is_disposed + for _processor in self._embedded_processors: + assert _processor.is_disposed + + def test_usage_as_a_context_manager_behaves_as_expected(self) -> None: + """:class:`ProcessorPipe` instances are valid context managers and + should behave correctly when used as so. + """ + with self._instance: + assert self._instance.apply(range(5, 10)) == "FGHIJ" + + assert self._instance.is_disposed + for _processor in self._embedded_processors: + assert _processor.is_disposed + + def test_usage_when_is_disposed_fails(self) -> None: + """Invoking "resource-aware" methods of a disposed instance should + result in an :exc:`ResourceDisposedError` being raised. + + Specifically, invoking the following two methods on a disposed instance + should fail: + + - :meth:`ProcessorPipe.__enter__` + - :meth:`ProcessorPipe.apply` + """ + self._instance.dispose() + + with pytest.raises(ResourceDisposedError): + self._instance.apply(range(5)) + + with pytest.raises(ResourceDisposedError): + self._instance.__enter__() diff --git a/test/sghi/etl/commons_tests/sinks_tests.py b/test/sghi/etl/commons_tests/sinks_tests.py index 1a5d22f..e5e6035 100644 --- a/test/sghi/etl/commons_tests/sinks_tests.py +++ b/test/sghi/etl/commons_tests/sinks_tests.py @@ -114,7 +114,7 @@ def test_dispose_has_the_intended_side_effects(self) -> None: assert instance.is_disposed def test_multiple_dispose_invocations_is_okay(self) -> None: - """Calling :meth:`NullSink.dispose` should be okay. + """Calling :meth:`NullSink.dispose` multiple times should be okay. No errors should be raised and the object should remain disposed. """