Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sources): add a source decorator #5

Merged
merged 1 commit into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,23 @@

nitpick_ignore = [
("py:class", "Processor"), # docs aren't published yet
("py:class", "Source"), # docs aren't published yet
("py:class", "TracebackType"), # Used as type annotation. Only available when type checking
("py:class", "concurrent.futures._base.Future"), # sphinx can't find it
("py:class", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:class", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:class", "sghi.etl.commons.sources._RDT"), # private type annotations
("py:class", "sghi.etl.commons.utils.result_gatherers._T"), # private type annotations
("py:class", "sghi.etl.core._RDT"), # private type annotations
("py:class", "sghi.etl.core._PDT"), # private type annotations
("py:class", "sghi.etl.core.Processor"), # docs aren't published yet
("py:class", "sghi.etl.core.Source"), # docs aren't published yet
("py:exc", "ResourceDisposedError"), # docs aren't published yet
("py:exc", "sghi.disposable.ResourceDisposedError"), # docs aren't published yet
("py:func", "sghi.disposable.not_disposed"), # docs aren't published yet
("py:obj", "sghi.etl.commons.processors._PDT"), # private type annotations
("py:obj", "sghi.etl.commons.processors._RDT"), # private type annotations
("py:obj", "sghi.etl.commons.sources._RDT"), # private type annotations
]

templates_path = ["templates"]
Expand Down
2 changes: 2 additions & 0 deletions src/sghi/etl/commons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Collection of utilities for working with SGHI ETL Workflows."""

from .processors import NOOPProcessor, processor
from .sources import source
from .utils import fail_fast, fail_fast_factory, ignored_failed

__all__ = [
Expand All @@ -9,4 +10,5 @@
"fail_fast_factory",
"ignored_failed",
"processor",
"source",
]
155 changes: 155 additions & 0 deletions src/sghi/etl/commons/sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""Common :class:`~sghi.etl.core.Source` implementations."""

from __future__ import annotations

import logging
from collections.abc import Callable
from functools import update_wrapper
from logging import Logger
from typing import Final, Generic, Self, TypeVar, final

from typing_extensions import override

from sghi.disposable import not_disposed
from sghi.etl.core import Source
from sghi.utils import ensure_callable, type_fqn

# =============================================================================
# TYPES
# =============================================================================


_RDT = TypeVar("_RDT")
"""Raw Data Type."""

_SourceCallable = Callable[[], _RDT]


# =============================================================================
# TYPES
# =============================================================================


_OF_CALLABLE_LOGGER_PREFIX: Final[str] = f"{__name__}.@source"


# =============================================================================
# DECORATORS
# =============================================================================


def source(f: Callable[[], _RDT]) -> Source[_RDT]:
"""Mark/decorate a ``Callable`` as a :class:`Source`.

The result is that the callable is converted into a ``Source`` instance.
When used as a decorator, invoking the decorated callable has the same
effect as invoking the ``draw`` method of the resulting ``Source``
instance.

.. important::

The decorated callable *MUST NOT* have any required arguments but MUST
return a value (the drawn data).

.. note::

The resulting values are true ``Source`` instances that can be
disposed. Once disposed, any attempts to invoke these instances will
result in an :exc:`ResourceDisposedError` being raised.

.. admonition:: Regarding retry safety
:class: tip

The resulting ``Source`` is safe to retry if and only if, the
decorated callable is safe to retry.

:param f: The callable to be decorated. The callable *MUST NOT* have any
required arguments but *MUST* return a value (the drawn data).

:return: A ``Source`` instance.

:raise ValueError: If the given value is NOT a ``Callable``.
"""
ensure_callable(f, message="A callable object is required.")

return _SourceOfCallable(delegate_to=f)


# =============================================================================
# SOURCE IMPLEMENTATIONS
# =============================================================================


@final
class _SourceOfCallable(Source[_RDT], Generic[_RDT]):
__slots__ = ("_delegate_to", "_is_disposed", "_logger")

def __init__(self, delegate_to: _SourceCallable[_RDT]) -> None:
super().__init__()
ensure_callable(
value=delegate_to,
message="'delegate_to' MUST be a callable object.",
)
self._delegate_to: _SourceCallable[_RDT] = delegate_to
self._is_disposed: bool = False
self._logger: Logger = logging.getLogger(
f"{_OF_CALLABLE_LOGGER_PREFIX}({type_fqn(self._delegate_to)})"
)
update_wrapper(self, self._delegate_to)

@not_disposed
@override
def __enter__(self) -> Self:
"""Return ``self`` upon entering the runtime context.

.. admonition:: Don't use after dispose
:class: error

Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.

:return: This instance.

:raise ResourceDisposedError: If this source has already been disposed.
"""
return super(Source, self).__enter__()

@property
@override
def is_disposed(self) -> bool:
return self._is_disposed

@not_disposed
@override
def draw(self) -> _RDT:
"""Delegate data retrival to the wrapped callable.

.. admonition:: Don't use after dispose
:class: error

Invoking this method on an instance that is disposed(i.e. the
:attr:`is_disposed` property on the instance is ``True``) will
result in a :exc:`ResourceDisposedError` being raised.

:return: The drawn, raw data as returned by the wrapped callable.

:raise ResourceDisposedError: If this source has already been disposed.
"""
self._logger.info("Delegating to '%s'.", type_fqn(self._delegate_to))
return self._delegate_to()

@override
def dispose(self) -> None:
self._is_disposed = True
self._logger.info("Disposal complete.")


# =============================================================================
# MODULE EXPORTS
# =============================================================================


__all__ = [
"source",
]
95 changes: 95 additions & 0 deletions test/sghi/etl/commons_tests/sources_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# ruff: noqa: D205
"""Tests for the :module:`sghi.etl.commons.processors` module."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pytest

from sghi.disposable import ResourceDisposedError
from sghi.etl.commons import source
from sghi.etl.core import Source

if TYPE_CHECKING:
from collections.abc import Iterable


def test_source_decorator_delegates_to_the_wrapped_callable() -> None:
""":func:`source` should delegate to the wrapped callable when invoked."""

def supply_ints(count: int = 4) -> Iterable[int]:
yield from range(count)

int_supplier_source: Source[Iterable[int]] = source(supply_ints)

assert list(int_supplier_source()) == list(supply_ints()) == [0, 1, 2, 3]


def test_source_decorator_fails_on_non_callable_input_value() -> None:
""":func:`source` should raise a :exc:`ValueError` when given a
non-callable` value.
"""
with pytest.raises(ValueError, match="callable object") as exc_info:
source("Not a function") # type: ignore

assert exc_info.value.args[0] == "A callable object is required."


def test_source_decorator_fails_on_a_none_input_value() -> None:
""":func:`source` should raise a :exc:`ValueError` when given a ``None``
value.
"""
with pytest.raises(ValueError, match="callable object") as exc_info:
source(None) # type: ignore

assert exc_info.value.args[0] == "A callable object is required."


def test_source_decorator_returns_expected_value() -> None:
""":func:`source` should return a ``Source`` instance."""

@source
def supply_ints(count: int = 5) -> Iterable[int]:
yield from range(count)

empty_string_supplier: Source[str] = source(str)

assert isinstance(supply_ints, Source)
assert isinstance(empty_string_supplier, Source)


def test_source_decorated_value_usage_as_a_context_manager() -> None:
""":func:`source` decorated callables are valid context managers and
should behave correctly when used as so.
"""

def supply_ints(count: int = 5) -> Iterable[int]:
yield from range(count)

with source(supply_ints) as int_supplier:
result: tuple[int, ...] = tuple(int_supplier())

assert result == (0, 1, 2, 3, 4)
assert int_supplier.is_disposed


def test_source_decorated_value_usage_when_is_disposed_fails() -> None:
"""Usage of a :func:`source` decorated callable should raise
:exc:`ResourceDisposedError` when invoked after being disposed.
"""

@source
def supply_ints(count: int = 5) -> Iterable[int]:
yield from range(count)

supply_ints.dispose()

with pytest.raises(ResourceDisposedError):
supply_ints()

with pytest.raises(ResourceDisposedError):
supply_ints.draw()

with pytest.raises(ResourceDisposedError):
supply_ints.__enter__()
Loading