From b1489dbbb17f31f9185b66666702cc713c912eef Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 31 Oct 2025 10:00:25 +0100 Subject: [PATCH 1/7] feat: Added support for faststream >= 0.6.0 --- .github/workflows/ci.yml | 6 +- pyproject.toml | 2 +- .../test_faststream_di_pass_message.py | 4 +- that_depends/integrations/faststream.py | 177 +++++++++++++----- 4 files changed, 137 insertions(+), 52 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 49a8982b..4b793b76 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,6 +33,9 @@ jobs: - "3.12" - "3.13" - "3.14" + faststream-version: + - "0.5.48" + - "0.6.2" steps: - uses: actions/checkout@v5 - uses: extractions/setup-just@v3 @@ -41,7 +44,8 @@ jobs: cache-dependency-glob: "**/pyproject.toml" - run: uv python install ${{ matrix.python-version }} - run: just install - - run: just test . --cov=. --cov-report xml + - run: uv pip install "faststream==${{ matrix.faststream-version }}" + - run: pytest . --cov=. --cov-report xml - uses: codecov/codecov-action@v5.4.3 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/pyproject.toml b/pyproject.toml index c68b78a6..398e1a1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ fastapi = [ "fastapi", ] faststream = [ - "faststream<0.6.0" + "faststream" ] [project.urls] diff --git a/tests/integrations/faststream/test_faststream_di_pass_message.py b/tests/integrations/faststream/test_faststream_di_pass_message.py index cc84c69b..77905142 100644 --- a/tests/integrations/faststream/test_faststream_di_pass_message.py +++ b/tests/integrations/faststream/test_faststream_di_pass_message.py @@ -1,7 +1,7 @@ import typing from faststream import BaseMiddleware, Context, Depends -from faststream.broker.message import StreamMessage +from faststream.message import StreamMessage from faststream.nats import NatsBroker, TestNatsBroker from faststream.nats.message import NatsMessage @@ -18,7 +18,7 @@ async def consume_scope( return await call_next(msg) -broker = NatsBroker(middlewares=(ContextMiddleware,), validate=False) +broker = NatsBroker(middlewares=(ContextMiddleware,)) TEST_SUBJECT = "test" diff --git a/that_depends/integrations/faststream.py b/that_depends/integrations/faststream.py index e59ec3db..6d8ba36c 100644 --- a/that_depends/integrations/faststream.py +++ b/that_depends/integrations/faststream.py @@ -1,8 +1,9 @@ import typing +from importlib.metadata import version from types import TracebackType -from typing import Any, Optional +from typing import Any, Final, Optional -from faststream import BaseMiddleware +from packaging.version import Version from typing_extensions import override from that_depends import container_context @@ -10,49 +11,129 @@ from that_depends.utils import UNSET, Unset, is_set -class DIContextMiddleware(BaseMiddleware): - """Initializes the container context for faststream brokers.""" - - def __init__( - self, - *context_items: SupportsContext[Any], - global_context: dict[str, Any] | Unset = UNSET, - scope: ContextScope | Unset = UNSET, - ) -> None: - """Initialize the container context middleware. - - Args: - *context_items (SupportsContext[Any]): Context items to initialize. - global_context (dict[str, Any] | Unset): Global context to initialize the container. - scope (ContextScope | Unset): Context scope to initialize the container. - - """ - super().__init__() - self._context: container_context | None = None - self._context_items = set(context_items) - self._global_context = global_context - self._scope = scope - - @override - async def on_receive(self) -> None: - self._context = container_context( - *self._context_items, - scope=self._scope if is_set(self._scope) else None, - global_context=self._global_context if is_set(self._global_context) else None, - ) - await self._context.__aenter__() - - @override - async def after_processed( - self, - exc_type: type[BaseException] | None = None, - exc_val: BaseException | None = None, - exc_tb: Optional["TracebackType"] = None, - ) -> bool | None: - if self._context is not None: - await self._context.__aexit__(exc_type, exc_val, exc_tb) - return None - - def __call__(self, *args: typing.Any, **kwargs: typing.Any) -> "DIContextMiddleware": # noqa: ARG002, ANN401 - """Create an instance of DIContextMiddleware.""" - return DIContextMiddleware(*self._context_items, scope=self._scope, global_context=self._global_context) +_FASTSTREAM_MODULE_NAME: Final[str] = "faststream" +_FASTSTREAM_VERSION: Final[str] = version(_FASTSTREAM_MODULE_NAME) +if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): + from faststream import BaseMiddleware, ContextRepo + + class DIContextMiddleware(BaseMiddleware): + """Initializes the container context for faststream brokers.""" + + def __init__( + self, + *context_items: SupportsContext[Any], + msg: Any = None, # noqa: ANN401 + context: Optional["ContextRepo"] = None, + global_context: dict[str, Any] | Unset = UNSET, + scope: ContextScope | Unset = UNSET, + ) -> None: + """Initialize the container context middleware. + + Args: + *context_items (SupportsContext[Any]): Context items to initialize. + msg (Any): Message object (for faststream v0.6+ compatibility). + context (ContextRepo): Context repository (for faststream v0.6+ compatibility). + global_context (dict[str, Any] | Unset): Global context to initialize the container. + scope (ContextScope | Unset): Context scope to initialize the container. + + """ + super().__init__(msg, context=context) # type: ignore[arg-type] + self._context: container_context | None = None + self._context_items = set(context_items) + self._global_context = global_context + self._scope = scope + + @override + async def on_receive(self) -> None: + self._context = container_context( + *self._context_items, + scope=self._scope if is_set(self._scope) else None, + global_context=self._global_context if is_set(self._global_context) else None, + ) + await self._context.__aenter__() + + @override + async def after_processed( + self, + exc_type: type[BaseException] | None = None, + exc_val: BaseException | None = None, + exc_tb: Optional["TracebackType"] = None, + ) -> bool | None: + if self._context is not None: + await self._context.__aexit__(exc_type, exc_val, exc_tb) + return None + + def __call__(self, msg: Any = None, **kwargs: Any) -> "DIContextMiddleware": # noqa: ANN401 + """Create an instance of DIContextMiddleware. + + Args: + msg (Any): Message object (for faststream v0.6+ compatibility). + **kwargs: Additional keyword arguments (context for v0.6+). + + Returns: + DIContextMiddleware: A new instance of DIContextMiddleware. + + """ + # Extract context if present (v0.6+) + context = kwargs.get("context") + + return DIContextMiddleware( + *self._context_items, + msg=msg, + context=context, + scope=self._scope, + global_context=self._global_context, + ) +else: + from faststream import BaseMiddleware + + class DIContextMiddleware(BaseMiddleware): # type: ignore[no-redef] + """Initializes the container context for faststream brokers.""" + + def __init__( + self, + *context_items: SupportsContext[Any], + global_context: dict[str, Any] | Unset = UNSET, + scope: ContextScope | Unset = UNSET, + ) -> None: + """Initialize the container context middleware. + + Args: + *context_items (SupportsContext[Any]): Context items to initialize. + global_context (dict[str, Any] | Unset): Global context to initialize the container. + scope (ContextScope | Unset): Context scope to initialize the container. + + """ + super().__init__() # type: ignore[call-arg] + self._context: container_context | None = None + self._context_items = set(context_items) + self._global_context = global_context + self._scope = scope + + @override + async def on_receive(self) -> None: + self._context = container_context( + *self._context_items, + scope=self._scope if is_set(self._scope) else None, + global_context=self._global_context if is_set(self._global_context) else None, + ) + await self._context.__aenter__() + + @override + async def after_processed( + self, + exc_type: type[BaseException] | None = None, + exc_val: BaseException | None = None, + exc_tb: Optional["TracebackType"] = None, + ) -> bool | None: + if self._context is not None: + await self._context.__aexit__(exc_type, exc_val, exc_tb) + return None + + def __call__(self, *args: typing.Any, **kwargs: typing.Any) -> "DIContextMiddleware": # noqa: ARG002, ANN401 + """Create an instance of DIContextMiddleware.""" + return DIContextMiddleware(*self._context_items, scope=self._scope, global_context=self._global_context) + + +if __name__ == "__main__": + pass From 1c35a2cf58b43124b647468816bee5cd9d1cc4b6 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 31 Oct 2025 10:12:21 +0100 Subject: [PATCH 2/7] ci: Fix faststream installation. --- .github/workflows/ci.yml | 3 +-- that_depends/integrations/faststream.py | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4b793b76..b4fdbba3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,8 +44,7 @@ jobs: cache-dependency-glob: "**/pyproject.toml" - run: uv python install ${{ matrix.python-version }} - run: just install - - run: uv pip install "faststream==${{ matrix.faststream-version }}" - - run: pytest . --cov=. --cov-report xml + - run: uv run --with faststream==${{ matrix.faststream-version }} pytest . --cov=. --cov-report xml - uses: codecov/codecov-action@v5.4.3 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/that_depends/integrations/faststream.py b/that_depends/integrations/faststream.py index 6d8ba36c..7bbdd700 100644 --- a/that_depends/integrations/faststream.py +++ b/that_depends/integrations/faststream.py @@ -4,7 +4,7 @@ from typing import Any, Final, Optional from packaging.version import Version -from typing_extensions import override +from typing_extensions import deprecated, override from that_depends import container_context from that_depends.providers.context_resources import ContextScope, SupportsContext @@ -87,6 +87,7 @@ def __call__(self, msg: Any = None, **kwargs: Any) -> "DIContextMiddleware": # else: from faststream import BaseMiddleware + @deprecated("Will be removed with faststream v1") class DIContextMiddleware(BaseMiddleware): # type: ignore[no-redef] """Initializes the container context for faststream brokers.""" From af7c92a000bc1a66d33aa943204a7dee02e9e229 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 31 Oct 2025 10:19:31 +0100 Subject: [PATCH 3/7] test: Fix tests. --- .../faststream/test_faststream_di_pass_message.py | 9 ++++++++- that_depends/integrations/faststream.py | 13 ++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/integrations/faststream/test_faststream_di_pass_message.py b/tests/integrations/faststream/test_faststream_di_pass_message.py index 77905142..46f31f7f 100644 --- a/tests/integrations/faststream/test_faststream_di_pass_message.py +++ b/tests/integrations/faststream/test_faststream_di_pass_message.py @@ -1,11 +1,18 @@ import typing from faststream import BaseMiddleware, Context, Depends -from faststream.message import StreamMessage from faststream.nats import NatsBroker, TestNatsBroker from faststream.nats.message import NatsMessage +from packaging.version import Version from that_depends import BaseContainer, container_context, fetch_context_item, providers +from that_depends.integrations.faststream import _FASTSTREAM_VERSION + + +if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): + from faststream.message import StreamMessage +else: + from faststream.broker.message import StreamMessage # type: ignore[import-not-found, no-redef] class ContextMiddleware(BaseMiddleware): diff --git a/that_depends/integrations/faststream.py b/that_depends/integrations/faststream.py index 7bbdd700..a43537e5 100644 --- a/that_depends/integrations/faststream.py +++ b/that_depends/integrations/faststream.py @@ -31,8 +31,8 @@ def __init__( Args: *context_items (SupportsContext[Any]): Context items to initialize. - msg (Any): Message object (for faststream v0.6+ compatibility). - context (ContextRepo): Context repository (for faststream v0.6+ compatibility). + msg (Any): Message object. + context (ContextRepo): Context repository. global_context (dict[str, Any] | Unset): Global context to initialize the container. scope (ContextScope | Unset): Context scope to initialize the container. @@ -67,14 +67,13 @@ def __call__(self, msg: Any = None, **kwargs: Any) -> "DIContextMiddleware": # """Create an instance of DIContextMiddleware. Args: - msg (Any): Message object (for faststream v0.6+ compatibility). - **kwargs: Additional keyword arguments (context for v0.6+). + msg (Any): Message object. + **kwargs: Additional keyword arguments. Returns: DIContextMiddleware: A new instance of DIContextMiddleware. """ - # Extract context if present (v0.6+) context = kwargs.get("context") return DIContextMiddleware( @@ -134,7 +133,3 @@ async def after_processed( def __call__(self, *args: typing.Any, **kwargs: typing.Any) -> "DIContextMiddleware": # noqa: ARG002, ANN401 """Create an instance of DIContextMiddleware.""" return DIContextMiddleware(*self._context_items, scope=self._scope, global_context=self._global_context) - - -if __name__ == "__main__": - pass From 6f22f6c02cd36f8aef7dbd7d6fcec06786c6e0d3 Mon Sep 17 00:00:00 2001 From: Alexander Date: Fri, 31 Oct 2025 10:29:24 +0100 Subject: [PATCH 4/7] test: Fixed broker init. --- .github/workflows/ci.yml | 6 +++--- .../faststream/test_faststream_di_pass_message.py | 10 +++++++--- that_depends/integrations/faststream.py | 4 ++-- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b4fdbba3..39eb8cfa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,8 +34,8 @@ jobs: - "3.13" - "3.14" faststream-version: - - "0.5.48" - - "0.6.2" + - "<0.6.0" + - ">=0.6.0" steps: - uses: actions/checkout@v5 - uses: extractions/setup-just@v3 @@ -44,7 +44,7 @@ jobs: cache-dependency-glob: "**/pyproject.toml" - run: uv python install ${{ matrix.python-version }} - run: just install - - run: uv run --with faststream==${{ matrix.faststream-version }} pytest . --cov=. --cov-report xml + - run: uv run --with "faststream${{ matrix.faststream-version }}" pytest . --cov=. --cov-report xml - uses: codecov/codecov-action@v5.4.3 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/tests/integrations/faststream/test_faststream_di_pass_message.py b/tests/integrations/faststream/test_faststream_di_pass_message.py index 46f31f7f..0ca62d1a 100644 --- a/tests/integrations/faststream/test_faststream_di_pass_message.py +++ b/tests/integrations/faststream/test_faststream_di_pass_message.py @@ -9,9 +9,9 @@ from that_depends.integrations.faststream import _FASTSTREAM_VERSION -if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): +if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): # pragma: no cover from faststream.message import StreamMessage -else: +else: # pragma: no cover from faststream.broker.message import StreamMessage # type: ignore[import-not-found, no-redef] @@ -25,7 +25,11 @@ async def consume_scope( return await call_next(msg) -broker = NatsBroker(middlewares=(ContextMiddleware,)) +if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): # pragma: no cover + broker = NatsBroker(middlewares=(ContextMiddleware,)) + +else: # pragma: no cover + broker = NatsBroker(middlewares=(ContextMiddleware,), validate=False) # type: ignore[call-arg] TEST_SUBJECT = "test" diff --git a/that_depends/integrations/faststream.py b/that_depends/integrations/faststream.py index a43537e5..368792ec 100644 --- a/that_depends/integrations/faststream.py +++ b/that_depends/integrations/faststream.py @@ -13,7 +13,7 @@ _FASTSTREAM_MODULE_NAME: Final[str] = "faststream" _FASTSTREAM_VERSION: Final[str] = version(_FASTSTREAM_MODULE_NAME) -if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): +if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): # pragma: no cover from faststream import BaseMiddleware, ContextRepo class DIContextMiddleware(BaseMiddleware): @@ -83,7 +83,7 @@ def __call__(self, msg: Any = None, **kwargs: Any) -> "DIContextMiddleware": # scope=self._scope, global_context=self._global_context, ) -else: +else: # pragma: no cover from faststream import BaseMiddleware @deprecated("Will be removed with faststream v1") From fde4dfca753b667d681c6476b51a604bcc03ab2a Mon Sep 17 00:00:00 2001 From: Alex <60134319+xelandernt@users.noreply.github.com> Date: Tue, 4 Nov 2025 12:30:01 +0100 Subject: [PATCH 5/7] Update that_depends/integrations/faststream.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- that_depends/integrations/faststream.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/that_depends/integrations/faststream.py b/that_depends/integrations/faststream.py index 368792ec..db01a701 100644 --- a/that_depends/integrations/faststream.py +++ b/that_depends/integrations/faststream.py @@ -74,7 +74,14 @@ def __call__(self, msg: Any = None, **kwargs: Any) -> "DIContextMiddleware": # DIContextMiddleware: A new instance of DIContextMiddleware. """ - context = kwargs.get("context") + # Explicitly extract and validate 'context' + context = None + if "context" in kwargs: + context = kwargs.pop("context") + if context is not None and not isinstance(context, ContextRepo): + raise TypeError(f"'context' must be of type ContextRepo or None, got {type(context).__name__}") + if kwargs: + raise TypeError(f"Unexpected keyword arguments: {', '.join(kwargs.keys())}") return DIContextMiddleware( *self._context_items, From 267e3c526079972497e09697d68607599a96c219 Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 4 Nov 2025 12:34:32 +0100 Subject: [PATCH 6/7] Revert "Update that_depends/integrations/faststream.py" This reverts commit fde4dfca753b667d681c6476b51a604bcc03ab2a. --- that_depends/integrations/faststream.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/that_depends/integrations/faststream.py b/that_depends/integrations/faststream.py index db01a701..368792ec 100644 --- a/that_depends/integrations/faststream.py +++ b/that_depends/integrations/faststream.py @@ -74,14 +74,7 @@ def __call__(self, msg: Any = None, **kwargs: Any) -> "DIContextMiddleware": # DIContextMiddleware: A new instance of DIContextMiddleware. """ - # Explicitly extract and validate 'context' - context = None - if "context" in kwargs: - context = kwargs.pop("context") - if context is not None and not isinstance(context, ContextRepo): - raise TypeError(f"'context' must be of type ContextRepo or None, got {type(context).__name__}") - if kwargs: - raise TypeError(f"Unexpected keyword arguments: {', '.join(kwargs.keys())}") + context = kwargs.get("context") return DIContextMiddleware( *self._context_items, From 691032f6da1805fc9ad6c9da052e42f1f2ebe0bd Mon Sep 17 00:00:00 2001 From: alex Date: Tue, 4 Nov 2025 12:37:54 +0100 Subject: [PATCH 7/7] review: Change type hint for message. --- that_depends/integrations/faststream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/that_depends/integrations/faststream.py b/that_depends/integrations/faststream.py index 368792ec..67d9fc64 100644 --- a/that_depends/integrations/faststream.py +++ b/that_depends/integrations/faststream.py @@ -15,6 +15,7 @@ _FASTSTREAM_VERSION: Final[str] = version(_FASTSTREAM_MODULE_NAME) if Version(_FASTSTREAM_VERSION) >= Version("0.6.0"): # pragma: no cover from faststream import BaseMiddleware, ContextRepo + from faststream._internal.types import AnyMsg class DIContextMiddleware(BaseMiddleware): """Initializes the container context for faststream brokers.""" @@ -22,7 +23,7 @@ class DIContextMiddleware(BaseMiddleware): def __init__( self, *context_items: SupportsContext[Any], - msg: Any = None, # noqa: ANN401 + msg: AnyMsg | None = None, context: Optional["ContextRepo"] = None, global_context: dict[str, Any] | Unset = UNSET, scope: ContextScope | Unset = UNSET,