@@ -354,47 +354,53 @@ async def _handle_data_stream(
354354 comp_id: Id of the requested component.
355355 category: The category of the component.
356356 """
357- stream_senders = []
358- if comp_id in self ._req_streaming_metrics :
359- await self ._check_requested_component_and_metrics (
360- comp_id , category , self ._req_streaming_metrics [comp_id ]
357+ try :
358+ stream_senders = []
359+ if comp_id in self ._req_streaming_metrics :
360+ await self ._check_requested_component_and_metrics (
361+ comp_id , category , self ._req_streaming_metrics [comp_id ]
362+ )
363+ stream_senders = self ._get_metric_senders (
364+ category , self ._req_streaming_metrics [comp_id ]
365+ )
366+ api_data_receiver : Receiver [Any ] = self .comp_data_receivers [comp_id ]
367+
368+ senders_done : asyncio .Event = asyncio .Event ()
369+ pending_messages = 0
370+
371+ def process_msg (data : Any ) -> None :
372+ tasks = []
373+ for extractor , senders in stream_senders :
374+ for sender in senders :
375+ tasks .append (
376+ sender .send (
377+ Sample (data .timestamp , Quantity (extractor (data )))
378+ )
379+ )
380+ asyncio .gather (* tasks )
381+ nonlocal pending_messages
382+ pending_messages -= 1
383+ if pending_messages == 0 :
384+ senders_done .set ()
385+
386+ async for data in api_data_receiver :
387+ pending_messages += 1
388+ senders_done .clear ()
389+ process_msg (data )
390+
391+ while pending_messages > 0 :
392+ await senders_done .wait ()
393+
394+ await asyncio .gather (
395+ * [
396+ self ._registry .close_and_remove (r .get_channel_name ())
397+ for requests in self ._req_streaming_metrics [comp_id ].values ()
398+ for r in requests
399+ ]
361400 )
362- stream_senders = self ._get_metric_senders (
363- category , self ._req_streaming_metrics [comp_id ]
364- )
365- api_data_receiver : Receiver [Any ] = self .comp_data_receivers [comp_id ]
366-
367- senders_done : asyncio .Event = asyncio .Event ()
368- pending_messages = 0
369-
370- def process_msg (data : Any ) -> None :
371- tasks = []
372- for extractor , senders in stream_senders :
373- for sender in senders :
374- tasks .append (
375- sender .send (Sample (data .timestamp , Quantity (extractor (data ))))
376- )
377- asyncio .gather (* tasks )
378- nonlocal pending_messages
379- pending_messages -= 1
380- if pending_messages == 0 :
381- senders_done .set ()
382-
383- async for data in api_data_receiver :
384- pending_messages += 1
385- senders_done .clear ()
386- process_msg (data )
387-
388- while pending_messages > 0 :
389- await senders_done .wait ()
390-
391- await asyncio .gather (
392- * [
393- self ._registry .close_and_remove (r .get_channel_name ())
394- for requests in self ._req_streaming_metrics [comp_id ].values ()
395- for r in requests
396- ]
397- )
401+ except Exception as exc :
402+ _logger .exception ("Error while streaming data for component %d" , comp_id )
403+ raise exc
398404
399405 async def _update_streams (
400406 self ,
0 commit comments