Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/openstef-models/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ classifiers = [
]

# Runtime dependencies of the openstef-models package.
dependencies = [
# presumably required by sktime's KalmanFilterTransformerFP (FilterPy backend) — confirm
"filterpy>=1.4.5",
"openstef-core",
"pycountry>=24.6.1",
# NOTE(review): pytest is usually a test-only dependency — confirm it is really needed at runtime.
"pytest>=8.4.2",
"sktime>=0.39.0",
]

optional-dependencies.all = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from datetime import timedelta
from decimal import Decimal
from typing import Literal
from typing import TYPE_CHECKING, Literal

from pydantic import Field
from pydantic_extra_types.coordinate import Coordinate, Latitude, Longitude
Expand All @@ -23,7 +23,7 @@
R2Provider,
)
from openstef_core.base_model import BaseConfig
from openstef_core.mixins import TransformPipeline
from openstef_core.mixins import Transform, TransformPipeline
from openstef_core.types import LeadTime, Q, Quantile, QuantileOrGlobal
from openstef_models.integrations.mlflow import MLFlowStorage, MLFlowStorageCallback
from openstef_models.mixins import ModelIdentifier
Expand All @@ -32,7 +32,13 @@
from openstef_models.models.forecasting.gblinear_forecaster import GBLinearForecaster
from openstef_models.models.forecasting.xgboost_forecaster import XGBoostForecaster
from openstef_models.transforms.energy_domain import WindPowerFeatureAdder
from openstef_models.transforms.general import Clipper, EmptyFeatureRemover, Imputer, SampleWeighter, Scaler
from openstef_models.transforms.general import (
Clipper,
EmptyFeatureRemover,
Imputer,
SampleWeighter,
Scaler,
)
from openstef_models.transforms.postprocessing import ConfidenceIntervalApplicator, QuantileSorter
from openstef_models.transforms.time_domain import (
CyclicFeaturesAdder,
Expand All @@ -49,6 +55,9 @@
from openstef_models.utils.feature_selection import Exclude, FeatureSelection, Include
from openstef_models.workflows.custom_forecasting_workflow import CustomForecastingWorkflow, ForecastingCallback

if TYPE_CHECKING:
from openstef_core.datasets import ForecastDataset


class LocationConfig(BaseConfig):
"""Configuration for location information in forecasting workflows."""
Expand Down Expand Up @@ -145,6 +154,31 @@ class ForecastingWorkflowConfig(BaseConfig): # PredictionJob
default=[],
description="If not None, rolling aggregate(s) of load will be used as features in the model.",
)
kalman_features: FeatureSelection = Field(
default=FeatureSelection(
include={
"temperature_2m",
"relative_humidity_2m",
"surface_pressure",
"cloud_cover",
"wind_speed_10m",
"wind_speed_80m",
"shortwave_radiation",
"direct_radiation",
"diffuse_radiation",
"direct_normal_irradiance",
"dni",
"gti",
"saturation_vapour_pressure",
"vapour_pressure",
"dewpoint",
"air_density",
"windspeed_hub_height",
},
exclude=None,
),
description="Feature selection for which features to apply Kalman smoothing.",
)
clip_features: FeatureSelection = Field(
default=FeatureSelection(include=None, exclude=None),
description="Feature selection for which features to clip.",
Expand All @@ -159,7 +193,6 @@ class ForecastingWorkflowConfig(BaseConfig): # PredictionJob
default=0.1,
description="Minimum weight value to ensure all samples contribute to training.",
)

# Data splitting strategy
data_splitter: DataSplitter = Field(
default_factory=DataSplitter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@
EmptyFeatureRemover,
)
from openstef_models.transforms.general.imputer import Imputer
from openstef_models.transforms.general.kalman_filter import (
BaseKalman,
KalmanPostprocessor,
KalmanPreprocessor,
)
from openstef_models.transforms.general.sample_weighter import SampleWeighter
from openstef_models.transforms.general.scaler import Scaler

# Names exported by a star-import of this package; keep the list in sync with the
# imports above.
__all__ = [
"BaseKalman",
"Clipper",
"DimensionalityReducer",
"EmptyFeatureRemover",
"Imputer",
"KalmanPostprocessor",
"KalmanPreprocessor",
"SampleWeighter",
"Scaler",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <short.term.energy.forecasts@alliander.com>
#
# SPDX-License-Identifier: MPL-2.0

"""Kalman Filter Transforms for Time Series Data pre- and post-processing.

This module provides implementations of Kalman Smoothing as both a preprocessor
and postprocessor for time series datasets. The Kalman Smoother helps reduce noise
and improve temporal consistency.
"""

from collections.abc import Iterable
from typing import override

import pandas as pd
from pydantic import Field
from sktime.transformations.series.kalman_filter import (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add this as a dependency? I don't think we have it already.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added as dependency of openstef models

KalmanFilterTransformerFP,
)

from openstef_core.base_model import BaseConfig
from openstef_core.datasets import ForecastDataset, TimeSeriesDataset
from openstef_core.mixins import Transform
from openstef_core.transforms import TimeSeriesTransform
from openstef_models.utils.feature_selection import FeatureSelection


class BaseKalman(BaseConfig):
    """Shared configuration and filtering helper for the Kalman transforms.

    Holds the feature selection and state-dimension settings and offers
    ``_run_kalman_filter``, the helper the concrete pre-/postprocessor
    transforms call to smooth a set of DataFrame columns.
    """

    selection: FeatureSelection = Field(default=FeatureSelection.ALL, description="Columns to smooth")
    state_dim: int = Field(
        default=1,
        description="Kalman filter state dimension (1 = per-column independent)",
    )
    # NOTE(review): ``state_dim`` is not read by ``_run_kalman_filter`` below, which sizes the
    # filter from the number of selected columns — confirm whether this field should be wired in.

    @staticmethod
    def _run_kalman_filter(df: pd.DataFrame, features: Iterable[str]) -> pd.DataFrame:
        """Run a Kalman filter over the given columns of ``df``.

        Args:
            df: Frame holding the columns to filter. It is never modified.
            features: Names of the columns to filter.

        Returns:
            ``df`` itself when ``features`` is empty; otherwise a deep copy of ``df``
            in which the selected columns hold the filter output.
        """
        columns = [*features]
        if len(columns) == 0:
            # Nothing to filter: hand back the caller's frame untouched (no copy).
            return df

        smoothed = df.copy(deep=True)
        # One state dimension per selected column.
        transformer = KalmanFilterTransformerFP(state_dim=len(columns))
        smoothed[columns] = transformer.fit_transform(X=df[columns])  # type: ignore[assignment]
        return smoothed


class KalmanPreprocessor(BaseKalman, TimeSeriesTransform):
    """Apply Kalman Smoothing to time series data to reduce noise and improve temporal consistency.

    The transform is stateless: ``fit`` does nothing and ``is_fitted`` is always ``True``.
    Only the numeric columns among the features resolved from ``selection`` are smoothed.

    The implementation follows the same structure as the other transforms; no external
    reference implementation was used.

    Example:
        >>> from datetime import timedelta
        >>> import pandas as pd
        >>> from openstef_core.testing import create_timeseries_dataset
        >>> from openstef_models.transforms.general import KalmanPreprocessor
        >>> dataset = create_timeseries_dataset(
        ...     index=pd.date_range("2025-01-01", periods=5, freq="1h"),
        ...     load=[10.0, 50.0, 100.0, 200.0, 150.0],
        ...     sample_interval=timedelta(hours=1),
        ... )
        >>> transform = KalmanPreprocessor()
        >>> result = transform.fit_transform(dataset)
        >>> result.data  # doctest: +NORMALIZE_WHITESPACE
                                   load
        timestamp
        2025-01-01 00:00:00    6.666667
        2025-01-01 01:00:00   33.750000
        2025-01-01 02:00:00   74.761905
        2025-01-01 03:00:00  152.181818
        2025-01-01 04:00:00  150.833333
    """

    @property
    @override
    def is_fitted(self) -> bool:
        """Always ``True``: the transform keeps no fitted state."""
        return True

    @override
    def fit(self, data: TimeSeriesDataset) -> None:
        """Do nothing; the transform is stateless."""
        return None

    @override
    def transform(self, data: TimeSeriesDataset) -> TimeSeriesDataset:
        """Smooth the selected numeric feature columns of ``data``.

        Args:
            data: Dataset whose features should be smoothed.

        Returns:
            ``data`` itself when no selected feature is numeric; otherwise a copy of the
            dataset whose selected numeric columns hold the Kalman filter output.
        """
        # Set membership keeps the numeric filter linear in the number of features.
        numeric_columns = set(data.data.select_dtypes(include=["number"]).columns)
        features = [name for name in self.selection.resolve(data.feature_names) if name in numeric_columns]
        if not features:
            return data

        # ``_run_kalman_filter`` works on its own deep copy when given a non-empty feature
        # list, so the frame is passed directly instead of being deep-copied a second time.
        smoothed = self._run_kalman_filter(data.data, features)
        return data.copy_with(data=smoothed, is_sorted=True)

    @override
    def features_added(self) -> list[str]:
        """Return an empty list: smoothing replaces values in place and adds no columns."""
        return []

class KalmanPostprocessor(BaseKalman, Transform[ForecastDataset, ForecastDataset]):
    """Apply Kalman Smoothing to quantile forecasts to reduce noise and improve temporal consistency.

    Every quantile column of the forecast is smoothed. When ``monotonic`` is enabled, the
    smoothed quantile columns are afterwards forced to be non-decreasing in quantile order,
    so quantiles cannot cross. The transform is stateless: ``fit`` does nothing and
    ``is_fitted`` is always ``True``.

    Example:
        >>> from datetime import timedelta
        >>> import pandas as pd
        >>> import numpy as np
        >>> from openstef_core.datasets.validated_datasets import ForecastDataset
        >>> from openstef_models.transforms.general import KalmanPostprocessor
        >>> forecast_data = pd.DataFrame({
        ...     'load': [100, np.nan],
        ...     'quantile_P10': [90, 95],
        ...     'quantile_P50': [100, 110],
        ...     'quantile_P90': [115, 125]
        ... }, index=pd.date_range('2025-01-01', periods=2, freq='h'))
        >>> dataset = ForecastDataset(forecast_data, timedelta(hours=1))
        >>> transform = KalmanPostprocessor()
        >>> result = transform.fit_transform(dataset)
        >>> result.data  # doctest: +NORMALIZE_WHITESPACE
                              load  quantile_P10  quantile_P50  quantile_P90
        timestamp
        2025-01-01 00:00:00  100.0        60.000     66.666667     76.666667
        2025-01-01 01:00:00    NaN        81.875     93.750000    106.875000
    """

    monotonic: bool = Field(
        default=True,
        description="Enforce non-crossing quantiles after smoothing",
    )

    @property
    @override
    def is_fitted(self) -> bool:
        """Always ``True``: the transform keeps no fitted state."""
        return True

    @override
    def fit(self, data: ForecastDataset) -> None:
        """Do nothing; the transform is stateless."""
        return None

    @override
    def transform(self, data: ForecastDataset) -> ForecastDataset:
        """Smooth the quantile columns of a forecast.

        Args:
            data: Forecast whose quantile columns should be smoothed.

        Returns:
            A new forecast dataset with smoothed quantile columns. With ``monotonic``
            enabled, each row's quantile values are non-decreasing in quantile order.
        """
        # Ascending quantile order; the monotonic step below relies on this ordering.
        quantile_columns = [q.format() for q in sorted(data.quantiles)]
        df = data.data.copy(deep=True)
        df_filtered = self._run_kalman_filter(df, quantile_columns)

        if self.monotonic and len(quantile_columns) > 1:
            # Row-wise running maximum over ascending quantiles removes any crossing that
            # smoothing introduced; NaN cells are skipped and stay NaN.
            df_filtered[quantile_columns] = df_filtered[quantile_columns].cummax(axis=1)

        return ForecastDataset.from_timeseries(data.copy_with(data=df_filtered, is_sorted=True))


# Public names of this module: the shared base class and the two Kalman transforms.
__all__ = ["BaseKalman", "KalmanPostprocessor", "KalmanPreprocessor"]
Loading
Loading