diff --git a/docs/conf.py b/docs/conf.py index fd31148..9952602 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -57,22 +57,26 @@ nitpick_ignore = [ ("py:class", "Processor"), # docs aren't published yet + ("py:class", "Sink"), # docs aren't published yet ("py:class", "Source"), # docs aren't published yet ("py:class", "TracebackType"), # Used as type annotation. Only available when type checking ("py:class", "concurrent.futures._base.Future"), # sphinx can't find it ("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations ("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations + ("py:class", "sghi.etl.commons.sinks._PDT"), # private type annotations ("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations ("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations ("py:class", "sghi.etl.core._RDT"), # private type annotations ("py:class", "sghi.etl.core._PDT"), # private type annotations ("py:class", "sghi.etl.core.Processor"), # docs aren't published yet + ("py:class", "sghi.etl.core.Sink"), # docs aren't published yet ("py:class", "sghi.etl.core.Source"), # docs aren't published yet ("py:exc", "ResourceDisposedError"), # docs aren't published yet ("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet ("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet ("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations ("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations + ("py:obj", "sghi.etl.commons.sinks._PDT"), # private type annotations ("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations ] diff --git a/src/sghi/etl/commons/__init__.py b/src/sghi/etl/commons/__init__.py index 9ba11ea..1e2e842 100644 --- a/src/sghi/etl/commons/__init__.py +++ b/src/sghi/etl/commons/__init__.py @@ -1,6 +1,7 @@ """Collection of utilities for working with SGHI ETL Workflows.""" from .processors import NOOPProcessor, processor +from .sinks import sink from .sources import source from .utils import fail_fast, fail_fast_factory, ignored_failed @@ -10,5 +11,6 @@ "fail_fast_factory", "ignored_failed", "processor", + "sink", "source", ] diff --git a/src/sghi/etl/commons/sinks.py b/src/sghi/etl/commons/sinks.py new file mode 100644 index 0000000..f58439e --- /dev/null +++ b/src/sghi/etl/commons/sinks.py @@ -0,0 +1,156 @@ +"""Common :class:`~sghi.etl.core.Sink` implementations.""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from functools import update_wrapper +from logging import Logger +from typing import Final, Generic, Self, TypeVar, final + +from typing_extensions import override + +from sghi.disposable import not_disposed +from sghi.etl.core import Sink +from sghi.utils import ensure_callable, type_fqn + +# ============================================================================= +# TYPES +# ============================================================================= + + +_PDT = TypeVar("_PDT") +""""Type variable representing the data type after processing.""" + +_SinkCallable = Callable[[_PDT], None] + + +# ============================================================================= +# TYPES +# ============================================================================= + + +_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@sink" + + +# ============================================================================= +# DECORATORS +# ============================================================================= + + +def sink(f: Callable[[_PDT], None]) -> Sink[_PDT]: + """Mark/decorate a ``Callable`` as a :class:`Sink`. + + The result is that the callable is converted into a ``Sink`` instance. + When used as a decorator, invoking the decorated callable has the same + effect as invoking the ``drain`` method of the resulting ``Sink`` instance. + + .. important:: + + The decorated callable *MUST* accept at least one argument but have at + *MOST* one required argument (the processed data to drain/consume). + + .. note:: + + The resulting values are true ``Sink`` instances that can be disposed. + Once disposed, any attempts to invoke these instances will + result in an :exc:`ResourceDisposedError` being raised. + + .. admonition:: Regarding retry safety + :class: tip + + The resulting ``Sink`` is safe to retry if and only if, the decorated + callable is safe to retry. + + :param f: The callable to be decorated. The callable *MUST* have at *MOST* + one required argument (the processed data to drain/consume). + + :return: A ``Sink`` instance. + + :raise ValueError: If the given value is NOT a ``Callable``. + """ + ensure_callable(f, message="A callable object is required.") + + return _SourceOfCallable(delegate_to=f) + + +# ============================================================================= +# SINK IMPLEMENTATIONS +# ============================================================================= + + +@final +class _SourceOfCallable(Sink[_PDT], Generic[_PDT]): + __slots__ = ("_delegate_to", "_is_disposed", "_logger") + + def __init__(self, delegate_to: _SinkCallable[_PDT]) -> None: + super().__init__() + ensure_callable( + value=delegate_to, + message="'delegate_to' MUST be a callable object.", + ) + self._delegate_to: _SinkCallable[_PDT] = delegate_to + self._is_disposed: bool = False + self._logger: Logger = logging.getLogger( + f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(self._delegate_to)})" + ) + update_wrapper(self, self._delegate_to) + + @not_disposed + @override + def __enter__(self) -> Self: + """Return ``self`` upon entering the runtime context. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :return: This instance. + + :raise ResourceDisposedError: If this sink has already been disposed. + """ + return super(Sink, self).__enter__() + + @property + @override + def is_disposed(self) -> bool: + return self._is_disposed + + @not_disposed + @override + def drain(self, processed_data: _PDT) -> None: + """Delegate consumption of the processed data to the wrapped callable. + + .. admonition:: Don't use after dispose + :class: error + + Invoking this method on an instance that is disposed(i.e. the + :attr:`is_disposed` property on the instance is ``True``) will + result in a :exc:`ResourceDisposedError` being raised. + + :param processed_data: The processed data to consume/drain. + + :return: None. + + :raise ResourceDisposedError: If this sink has already been disposed. + """ + self._logger.info("Delegating to '%s'.", type_fqn(self._delegate_to)) + self._delegate_to(processed_data) + + @override + def dispose(self) -> None: + self._is_disposed = True + self._logger.info("Disposal complete.") + + +# ============================================================================= +# MODULE EXPORTS +# ============================================================================= + + +__all__ = [ + "sink", +] diff --git a/test/sghi/etl/commons_tests/sinks_tests.py b/test/sghi/etl/commons_tests/sinks_tests.py new file mode 100644 index 0000000..069bb4b --- /dev/null +++ b/test/sghi/etl/commons_tests/sinks_tests.py @@ -0,0 +1,100 @@ +# ruff: noqa: D205 +"""Tests for the :module:`sghi.etl.commons.sinks` module.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from sghi.disposable import ResourceDisposedError +from sghi.etl.commons import sink +from sghi.etl.core import Sink + +if TYPE_CHECKING: + from collections.abc import Iterable, MutableSequence + + +def test_sink_decorator_delegates_to_the_wrapped_callable() -> None: + """:func:`sink` should delegate to the wrapped callable when invoked.""" + repository: MutableSequence[int] = [] + + def save_ints(values: Iterable[int]) -> None: + repository.extend(values) + + ints_consumer: Sink[Iterable[int]] = sink(save_ints) + ints_consumer(range(5)) + + assert repository == [0, 1, 2, 3, 4] + + +def test_sink_decorator_fails_on_non_callable_input_value() -> None: + """:func:`sink` should raise a :exc:`ValueError` when given a + non-callable` value. + """ + with pytest.raises(ValueError, match="callable object") as exc_info: + sink("Not a function") # type: ignore + + assert exc_info.value.args[0] == "A callable object is required." + + +def test_sink_decorator_fails_on_a_none_input_value() -> None: + """:func:`sink` should raise a :exc:`ValueError` when given a ``None`` + value. + """ + with pytest.raises(ValueError, match="callable object") as exc_info: + sink(None) # type: ignore + + assert exc_info.value.args[0] == "A callable object is required." + + +def test_sink_decorator_returns_expected_value() -> None: + """:func:`sink` should return a ``Sink`` instance.""" + repository: MutableSequence[int] = [] + + @sink + def save_ints(values: Iterable[int]) -> None: + repository.extend(values) + + print_all: Sink[str] = sink(print) + + assert isinstance(save_ints, Sink) + assert isinstance(print_all, Sink) + + +def test_sink_decorated_value_usage_as_a_context_manager() -> None: + """:func:`sink` decorated callables are valid context managers and + should behave correctly when used as so. + """ + repository: MutableSequence[int] = [] + + def save_ints(values: Iterable[int]) -> None: + repository.extend(values) + + with sink(save_ints) as ints_consumer: + ints_consumer(range(5)) + + assert repository == [0, 1, 2, 3, 4] + assert ints_consumer.is_disposed + + +def test_sink_decorated_value_usage_when_is_disposed_fails() -> None: + """Usage of a :func:`sink` decorated callable should raise + :exc:`ResourceDisposedError` when invoked after being disposed. + """ + repository: MutableSequence[int] = [] + + @sink + def save_ints(values: Iterable[int]) -> None: + repository.extend(values) + + save_ints.dispose() + + with pytest.raises(ResourceDisposedError): + save_ints(range(5)) + + with pytest.raises(ResourceDisposedError): + save_ints.drain(range(5)) + + with pytest.raises(ResourceDisposedError): + save_ints.__enter__() diff --git a/test/sghi/etl/commons_tests/sources_tests.py b/test/sghi/etl/commons_tests/sources_tests.py index 393de7b..8de7ee6 100644 --- a/test/sghi/etl/commons_tests/sources_tests.py +++ b/test/sghi/etl/commons_tests/sources_tests.py @@ -1,5 +1,5 @@ # ruff: noqa: D205 -"""Tests for the :module:`sghi.etl.commons.processors` module.""" +"""Tests for the :module:`sghi.etl.commons.sources` module.""" from __future__ import annotations