From e0d8f67b24fac4ed5bcb6af3070f4362d84cc8e8 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 9 Dec 2025 12:38:15 +0100 Subject: [PATCH 1/5] Re-emit track_published events for already published tracks after connecting to SFU --- getstream/video/rtc/connection_manager.py | 27 +++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 26e6d0de..5a075297 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -322,6 +322,9 @@ async def _connect_internal( # Connect subscriber offer event to handle SDP negotiation self._ws_client.on_event("subscriber_offer", self._on_subscriber_offer) + # Re-emit the events so they can be subscribed to on the ConnectionManager + self._ws_client.on_wildcard("*", self.emit) + if hasattr(sfu_event, "join_response"): logger.debug(f"sfu join response: {sfu_event.join_response}") # Populate participants state with existing participants @@ -378,6 +381,10 @@ def _on_coordinator_task_done(task: asyncio.Task): self._coordinator_task.add_done_callback(_on_coordinator_task_done) await self._connect_internal() + # Re-publish the already published tracks because + # SFU doesn't send events for them. + await self._republish_existing_tracks() + async def wait(self): """ Wait until the connection is over. @@ -530,3 +537,23 @@ async def _restore_published_tracks(self): await self._peer_manager.restore_published_tracks() except Exception as e: logger.error("Failed to restore published tracks", exc_info=e) + + async def _republish_existing_tracks(self) -> None: + """ + Use the participants info from the SFU to re-emit the "track_published" + events for the already published tracks. + + It's needed because SFU does not send the events for those when the + agent joins after the user. + """ + + participants = self.participants_state.get_participants() + for participant in participants: + for track_type_int in participant.published_tracks: + event = events_pb2.TrackPublished( + user_id=participant.user_id, + session_id=participant.session_id, + participant=participant, + type=track_type_int, + ) + self._ws_client.emit("track_published", event) From d0ad24436cbdd3a5f6e243b56489f463285726fe Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 9 Dec 2025 12:57:02 +0100 Subject: [PATCH 2/5] Skip republishing for agent's tracks, error handling --- getstream/video/rtc/connection_manager.py | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 5a075297..88337c9b 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -542,13 +542,21 @@ async def _republish_existing_tracks(self) -> None: """ Use the participants info from the SFU to re-emit the "track_published" events for the already published tracks. - - It's needed because SFU does not send the events for those when the + + It's needed because SFU does not send the events for those when the agent joins after the user. """ + if not self._ws_client: + return None + participants = self.participants_state.get_participants() + for participant in participants: + # Skip the tracks belonging to this connection + if participant.session_id == self.session_id: + continue + for track_type_int in participant.published_tracks: event = events_pb2.TrackPublished( user_id=participant.user_id, @@ -556,4 +564,13 @@ async def _republish_existing_tracks(self) -> None: participant=participant, type=track_type_int, ) - self._ws_client.emit("track_published", event) + try: + self._ws_client.emit("track_published", event) + except Exception: + logger.exception( + f"Failed to emit track_published event " + f"for the already published " + f"track {participant.user_id}:{participant.session_id}:{track_type_int}" + ) + + return None From 2111c61b80996e60e93acbadddced9bd3baad904 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 9 Dec 2025 16:18:19 +0100 Subject: [PATCH 3/5] Emit event on ConnectionManager instead of `self._ws_client` --- getstream/video/rtc/connection_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 88337c9b..db32c5e7 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -565,7 +565,10 @@ async def _republish_existing_tracks(self) -> None: type=track_type_int, ) try: - self._ws_client.emit("track_published", event) + # Update track subscriptions first + await self._subscription_manager.handle_track_published(event) + # Emit the event downstream + self.emit(event) except Exception: logger.exception( f"Failed to emit track_published event " From d0439bba09adfcdbea6e3c30ada69caf124e5c16 Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 9 Dec 2025 16:32:33 +0100 Subject: [PATCH 4/5] Fix signature --- getstream/video/rtc/connection_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index db32c5e7..4c929d3a 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -568,7 +568,7 @@ async def _republish_existing_tracks(self) -> None: # Update track subscriptions first await self._subscription_manager.handle_track_published(event) # Emit the event downstream - self.emit(event) + self.emit("track_published", event) except Exception: logger.exception( f"Failed to emit track_published event " From e4c5bcc9cf39c909c94662c27df14967bcf403bb Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Wed, 10 Dec 2025 14:30:33 +0100 Subject: [PATCH 5/5] Move republishing logic to `ConnectionManager.republish_tracks` --- getstream/video/rtc/connection_manager.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 4c929d3a..6d6c9756 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -381,10 +381,6 @@ def _on_coordinator_task_done(task: asyncio.Task): self._coordinator_task.add_done_callback(_on_coordinator_task_done) await self._connect_internal() - # Re-publish the already published tracks because - # SFU doesn't send events for them. - await self._republish_existing_tracks() - async def wait(self): """ Wait until the connection is over. @@ -538,12 +534,12 @@ async def _restore_published_tracks(self): except Exception as e: logger.error("Failed to restore published tracks", exc_info=e) - async def _republish_existing_tracks(self) -> None: + async def republish_tracks(self) -> None: """ Use the participants info from the SFU to re-emit the "track_published" events for the already published tracks. - It's needed because SFU does not send the events for those when the + It's needed because SFU does not send the events for the already present tracks when the agent joins after the user. """