Skip to content

Commit 4c3ec4e

Browse files
committed
WIP Quantity ABC
1 parent 5eb5aa6 commit 4c3ec4e

32 files changed

+809
-564
lines changed

src/frequenz/sdk/actor/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -592,13 +592,12 @@ async def main() -> None: # (6)!
592592
[_run]: #the-_run-method
593593
"""
594594

595-
from ..timeseries._resampling import ResamplerConfig
596595
from ._actor import Actor
597596
from ._background_service import BackgroundService
598597
from ._channel_registry import ChannelRegistry
599598
from ._config_managing import ConfigManagingActor
600599
from ._data_sourcing import ComponentMetricRequest, DataSourcingActor
601-
from ._resampling import ComponentMetricsResamplingActor
600+
from ._resampling import ComponentMetricsResamplingActor, ResamplingActorConfig
602601
from ._run_utils import run
603602

604603
__all__ = [
@@ -609,6 +608,6 @@ async def main() -> None: # (6)!
609608
"ComponentMetricsResamplingActor",
610609
"ConfigManagingActor",
611610
"DataSourcingActor",
612-
"ResamplerConfig",
611+
"ResamplingActorConfig",
613612
"run",
614613
]

src/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
MeterData,
2121
)
2222
from ...timeseries import Sample
23-
from ...timeseries._quantities import Quantity
2423
from .._channel_registry import ChannelRegistry
2524
from ._component_metric_request import ComponentMetricRequest
2625

@@ -318,7 +317,7 @@ def _get_metric_senders(
318317
self,
319318
category: ComponentCategory,
320319
requests: dict[ComponentMetricId, list[ComponentMetricRequest]],
321-
) -> list[tuple[Callable[[Any], float], list[Sender[Sample[Quantity]]]]]:
320+
) -> list[tuple[Callable[[Any], float], list[Sender[Sample[float]]]]]:
322321
"""Get channel senders from the channel registry for each requested metric.
323322
324323
Args:
@@ -335,7 +334,7 @@ def _get_metric_senders(
335334
self._get_data_extraction_method(category, metric),
336335
[
337336
self._registry.get_or_create(
338-
Sample[Quantity], request.get_channel_name()
337+
Sample[float], request.get_channel_name()
339338
).new_sender()
340339
for request in req_list
341340
],
@@ -353,6 +352,9 @@ async def _handle_data_stream(
353352
Args:
354353
comp_id: Id of the requested component.
355354
category: The category of the component.
355+
356+
Raises:
357+
Exception: if an error occurs while streaming data.
356358
"""
357359
try:
358360
stream_senders = []
@@ -373,9 +375,7 @@ def process_msg(data: Any) -> None:
373375
for extractor, senders in stream_senders:
374376
for sender in senders:
375377
tasks.append(
376-
sender.send(
377-
Sample(data.timestamp, Quantity(extractor(data)))
378-
)
378+
sender.send(Sample(data.timestamp, extractor(data)))
379379
)
380380
asyncio.gather(*tasks)
381381
nonlocal pending_messages

src/frequenz/sdk/actor/_resampling.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
import asyncio
88
import dataclasses
99
import logging
10+
from typing import Callable
1011

1112
from frequenz.channels import Receiver, Sender
1213

1314
from .._internal._asyncio import cancel_and_await
14-
from ..timeseries import Sample
15-
from ..timeseries._quantities import Quantity
15+
from ..timeseries._base_types import Sample
1616
from ..timeseries._resampling import Resampler, ResamplerConfig, ResamplingError
1717
from ._actor import Actor
1818
from ._channel_registry import ChannelRegistry
@@ -21,6 +21,17 @@
2121
_logger = logging.getLogger(__name__)
2222

2323

24+
# We need to use the dataclass decorator again because we are making a required
25+
# attribute optional, so we need the dataclass to re-generate the constructor with the
26+
# new signature.
27+
@dataclasses.dataclass(frozen=True)
28+
class ResamplingActorConfig(ResamplerConfig[float]):
29+
"""Configuration for the resampling actor."""
30+
31+
value_constructor: Callable[[float], float] = float
32+
"""The constructor to use to create new sample values."""
33+
34+
2435
class ComponentMetricsResamplingActor(Actor):
2536
"""An actor to resample microgrid component metrics."""
2637

@@ -30,7 +41,7 @@ def __init__( # pylint: disable=too-many-arguments
3041
channel_registry: ChannelRegistry,
3142
data_sourcing_request_sender: Sender[ComponentMetricRequest],
3243
resampling_request_receiver: Receiver[ComponentMetricRequest],
33-
config: ResamplerConfig,
44+
config: ResamplingActorConfig,
3445
name: str | None = None,
3546
) -> None:
3647
"""Initialize an instance.
@@ -49,13 +60,15 @@ def __init__( # pylint: disable=too-many-arguments
4960
"""
5061
super().__init__(name=name)
5162
self._channel_registry: ChannelRegistry = channel_registry
63+
5264
self._data_sourcing_request_sender: Sender[ComponentMetricRequest] = (
5365
data_sourcing_request_sender
5466
)
5567
self._resampling_request_receiver: Receiver[ComponentMetricRequest] = (
5668
resampling_request_receiver
5769
)
58-
self._resampler: Resampler = Resampler(config)
70+
self._resampler: Resampler[float] = Resampler(config)
71+
5972
self._active_req_channels: set[str] = set()
6073

6174
async def _subscribe(self, request: ComponentMetricRequest) -> None:
@@ -78,19 +91,16 @@ async def _subscribe(self, request: ComponentMetricRequest) -> None:
7891
data_source_channel_name = data_source_request.get_channel_name()
7992
await self._data_sourcing_request_sender.send(data_source_request)
8093
receiver = self._channel_registry.get_or_create(
81-
Sample[Quantity], data_source_channel_name
94+
Sample[float], data_source_channel_name
8295
).new_receiver()
8396

8497
# This is a temporary hack until the Sender implementation uses
8598
# exceptions to report errors.
8699
sender = self._channel_registry.get_or_create(
87-
Sample[Quantity], request_channel_name
100+
Sample[float], request_channel_name
88101
).new_sender()
89102

90-
async def sink_adapter(sample: Sample[Quantity]) -> None:
91-
await sender.send(sample)
92-
93-
self._resampler.add_timeseries(request_channel_name, receiver, sink_adapter)
103+
self._resampler.add_timeseries(request_channel_name, receiver, sender.send)
94104

95105
async def _process_resampling_requests(self) -> None:
96106
"""Process resampling data requests."""
@@ -113,8 +123,6 @@ async def _run(self) -> None:
113123
Raises:
114124
RuntimeError: If there is some unexpected error while resampling or
115125
handling requests.
116-
117-
[//]: # (# noqa: DAR401 error)
118126
"""
119127
tasks_to_cancel: set[asyncio.Task[None]] = set()
120128
try:

src/frequenz/sdk/microgrid/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,8 @@
121121
to limit the charge power of individual EV Chargers.
122122
""" # noqa: D205, D400
123123

124-
from ..actor import ResamplerConfig
124+
import typing
125+
125126
from . import _data_pipeline, client, component, connection_manager, metadata
126127
from ._data_pipeline import (
127128
battery_pool,
@@ -134,8 +135,13 @@
134135
voltage,
135136
)
136137

138+
if typing.TYPE_CHECKING:
139+
from ..actor._resampling import ResamplingActorConfig
140+
137141

138-
async def initialize(host: str, port: int, resampler_config: ResamplerConfig) -> None:
142+
async def initialize(
143+
host: str, port: int, resampler_config: "ResamplingActorConfig"
144+
) -> None:
139145
"""Initialize the microgrid connection manager and the data pipeline.
140146
141147
Args:

src/frequenz/sdk/microgrid/_data_pipeline.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
#
3636
# pylint: disable=import-outside-toplevel
3737
if typing.TYPE_CHECKING:
38-
from ..actor import ComponentMetricRequest, ResamplerConfig, _power_managing
38+
from ..actor import ComponentMetricRequest, ResamplingActorConfig, _power_managing
3939
from ..actor.power_distributing import ( # noqa: F401 (imports used by string type hints)
4040
ComponentPoolStatus,
4141
PowerDistributingActor,
@@ -81,7 +81,7 @@ class _DataPipeline: # pylint: disable=too-many-instance-attributes
8181

8282
def __init__(
8383
self,
84-
resampler_config: ResamplerConfig,
84+
resampler_config: ResamplingActorConfig,
8585
) -> None:
8686
"""Create a `DataPipeline` instance.
8787
@@ -90,7 +90,7 @@ def __init__(
9090
"""
9191
from ..actor import ChannelRegistry
9292

93-
self._resampler_config: ResamplerConfig = resampler_config
93+
self._resampler_config: ResamplingActorConfig = resampler_config
9494

9595
self._channel_registry: ChannelRegistry = ChannelRegistry(
9696
name="Data Pipeline Registry"
@@ -408,7 +408,7 @@ async def _stop(self) -> None:
408408
_DATA_PIPELINE: _DataPipeline | None = None
409409

410410

411-
async def initialize(resampler_config: ResamplerConfig) -> None:
411+
async def initialize(resampler_config: ResamplingActorConfig) -> None:
412412
"""Initialize a `DataPipeline` instance.
413413
414414
Args:

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,19 @@
99
from collections.abc import Callable, Iterator
1010
from dataclasses import dataclass
1111
from datetime import datetime, timezone
12-
from typing import Generic, Self, TypeVar, overload
12+
from typing import Generic, Self, SupportsFloat, TypeVar, overload
1313

14-
from ._quantities import Power, QuantityT
14+
from ._quantities import Power
15+
16+
SupportsFloatT = TypeVar("SupportsFloatT", bound=SupportsFloat)
17+
"""Type variable for types that support conversion to float."""
1518

1619
UNIX_EPOCH = datetime.fromtimestamp(0.0, tz=timezone.utc)
1720
"""The UNIX epoch (in UTC)."""
1821

1922

2023
@dataclass(frozen=True, order=True)
21-
class Sample(Generic[QuantityT]):
24+
class Sample(Generic[SupportsFloatT]):
2225
"""A measurement taken at a particular point in time.
2326
2427
The `value` could be `None` if a component is malfunctioning or data is
@@ -29,12 +32,12 @@ class Sample(Generic[QuantityT]):
2932
timestamp: datetime
3033
"""The time when this sample was generated."""
3134

32-
value: QuantityT | None = None
35+
value: SupportsFloatT | None = None
3336
"""The value of this sample."""
3437

3538

3639
@dataclass(frozen=True)
37-
class Sample3Phase(Generic[QuantityT]):
40+
class Sample3Phase(Generic[SupportsFloatT]):
3841
"""A 3-phase measurement made at a particular point in time.
3942
4043
Each of the `value` fields could be `None` if a component is malfunctioning
@@ -45,16 +48,16 @@ class Sample3Phase(Generic[QuantityT]):
4548

4649
timestamp: datetime
4750
"""The time when this sample was generated."""
48-
value_p1: QuantityT | None
51+
value_p1: SupportsFloatT | None
4952
"""The value of the 1st phase in this sample."""
5053

51-
value_p2: QuantityT | None
54+
value_p2: SupportsFloatT | None
5255
"""The value of the 2nd phase in this sample."""
5356

54-
value_p3: QuantityT | None
57+
value_p3: SupportsFloatT | None
5558
"""The value of the 3rd phase in this sample."""
5659

57-
def __iter__(self) -> Iterator[QuantityT | None]:
60+
def __iter__(self) -> Iterator[SupportsFloatT | None]:
5861
"""Return an iterator that yields values from each of the phases.
5962
6063
Yields:
@@ -65,12 +68,12 @@ def __iter__(self) -> Iterator[QuantityT | None]:
6568
yield self.value_p3
6669

6770
@overload
68-
def max(self, default: QuantityT) -> QuantityT: ...
71+
def max(self, default: SupportsFloatT) -> SupportsFloatT: ...
6972

7073
@overload
71-
def max(self, default: None = None) -> QuantityT | None: ...
74+
def max(self, default: None = None) -> SupportsFloatT | None: ...
7275

73-
def max(self, default: QuantityT | None = None) -> QuantityT | None:
76+
def max(self, default: SupportsFloatT | None = None) -> SupportsFloatT | None:
7477
"""Return the max value among all phases, or default if they are all `None`.
7578
7679
Args:
@@ -81,19 +84,19 @@ def max(self, default: QuantityT | None = None) -> QuantityT | None:
8184
"""
8285
if not any(self):
8386
return default
84-
value: QuantityT = functools.reduce(
85-
lambda x, y: x if x > y else y,
87+
value: SupportsFloatT = functools.reduce(
88+
lambda x, y: x if float(x) > float(y) else y,
8689
filter(None, self),
8790
)
8891
return value
8992

9093
@overload
91-
def min(self, default: QuantityT) -> QuantityT: ...
94+
def min(self, default: SupportsFloatT) -> SupportsFloatT: ...
9295

9396
@overload
94-
def min(self, default: None = None) -> QuantityT | None: ...
97+
def min(self, default: None = None) -> SupportsFloatT | None: ...
9598

96-
def min(self, default: QuantityT | None = None) -> QuantityT | None:
99+
def min(self, default: SupportsFloatT | None = None) -> SupportsFloatT | None:
97100
"""Return the min value among all phases, or default if they are all `None`.
98101
99102
Args:
@@ -104,16 +107,16 @@ def min(self, default: QuantityT | None = None) -> QuantityT | None:
104107
"""
105108
if not any(self):
106109
return default
107-
value: QuantityT = functools.reduce(
108-
lambda x, y: x if x < y else y,
110+
value: SupportsFloatT = functools.reduce(
111+
lambda x, y: x if float(x) < float(y) else y,
109112
filter(None, self),
110113
)
111114
return value
112115

113116
def map(
114117
self,
115-
function: Callable[[QuantityT], QuantityT],
116-
default: QuantityT | None = None,
118+
function: Callable[[SupportsFloatT], SupportsFloatT],
119+
default: SupportsFloatT | None = None,
117120
) -> Self:
118121
"""Apply the given function on each of the phase values and return the result.
119122

src/frequenz/sdk/timeseries/_grid_frequency.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import asyncio
99
import logging
10+
import math
1011
from typing import TYPE_CHECKING
1112

1213
from frequenz.channels import Receiver, Sender
@@ -15,7 +16,7 @@
1516
from ..microgrid import connection_manager
1617
from ..microgrid.component import Component, ComponentCategory, ComponentMetricId
1718
from ..timeseries._base_types import Sample
18-
from ..timeseries._quantities import Frequency, Quantity
19+
from ..timeseries._quantities import Frequency
1920

2021
if TYPE_CHECKING:
2122
# Imported here to avoid a circular import.
@@ -96,7 +97,7 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]:
9697
A receiver that will receive grid frequency samples.
9798
"""
9899
receiver = self._channel_registry.get_or_create(
99-
Sample[Quantity], self._component_metric_request.get_channel_name()
100+
Sample[float], self._component_metric_request.get_channel_name()
100101
).new_receiver()
101102

102103
if not self._task:
@@ -109,10 +110,8 @@ def new_receiver(self) -> Receiver[Sample[Frequency]]:
109110
return receiver.map(
110111
lambda sample: (
111112
Sample[Frequency](sample.timestamp, None)
112-
if sample.value is None or sample.value.isnan()
113-
else Sample(
114-
sample.timestamp, Frequency.from_hertz(sample.value.base_value)
115-
)
113+
if sample.value is None or math.isnan(sample.value)
114+
else Sample(sample.timestamp, Frequency.from_hertz(float(sample.value)))
116115
)
117116
)
118117

0 commit comments

Comments
 (0)