From 4e56b941c3b57fcf45f687852ff72b9eec813d94 Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Mon, 29 Sep 2025 14:46:23 +0200 Subject: [PATCH 01/12] add read_with_horizon method --- swvo/io/kp/ensemble.py | 159 +++++++++++++++++++++++++++++++++++------ 1 file changed, 138 insertions(+), 21 deletions(-) diff --git a/swvo/io/kp/ensemble.py b/swvo/io/kp/ensemble.py index f4bd61a..0625ba5 100755 --- a/swvo/io/kp/ensemble.py +++ b/swvo/io/kp/ensemble.py @@ -11,7 +11,7 @@ import warnings from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Optional +from typing import Iterable, Optional import numpy as np import pandas as pd @@ -91,29 +91,10 @@ def read(self, start_time: datetime, end_time: datetime) -> list: start_time = start_time.replace(microsecond=0, minute=0, second=0) str_date = start_time.strftime("%Y%m%dT%H0000") - file_list_old_name = sorted( - self.data_dir.glob(f"FORECAST_PAGER_SWIFT_swift_{str_date}_ensemble_*.csv"), - key=lambda x: int(x.stem.split("_")[-1]), - ) + file_list = self.ensemble_file_list(str_date) - file_list_new_name = sorted( - self.data_dir.glob(f"FORECAST_Kp_{str_date}_ensemble_*.csv"), - key=lambda x: int(x.stem.split("_")[-1]), - ) data = [] - if len(file_list_new_name) == 0 and len(file_list_old_name) == 0: - file_list = [] - elif len(file_list_new_name) > 0: - file_list = file_list_new_name - elif len(file_list_old_name) > 0: - warnings.warn( - "The use of FORECAST_PAGER_SWIFT_swift_* files is deprecated. However since we still have these files from the PAGER project with this prefix, this will be supported", - DeprecationWarning, - stacklevel=2, - ) - file_list = file_list_old_name - if len(file_list) == 0: msg = f"No ensemble files found for requested date {str_date}" warnings.warn(f"{msg}! Returning NaNs dataframe.") @@ -156,3 +137,139 @@ def read(self, start_time: datetime, end_time: datetime) -> list: data.append(df) return data + + def ensemble_file_list(self, str_date: str) -> list[Path]: + """Check for the existence of ensemble files for a given date. + + Parameters + ---------- + str_date : str + Date string in the format YYYYMMDDTHH0000. + + Returns + ------- + list[Path] + A list of file paths for the ensemble files. + + Warnings + -------- + DeprecationWarning + Warns if deprecated file naming convention is used. + """ + + file_list_old_name = sorted( + self.data_dir.glob(f"FORECAST_PAGER_SWIFT_swift_{str_date}_ensemble_*.csv"), + key=lambda x: int(x.stem.split("_")[-1]), + ) + + file_list_new_name = sorted( + self.data_dir.glob(f"FORECAST_Kp_{str_date}_ensemble_*.csv"), + key=lambda x: int(x.stem.split("_")[-1]), + ) + + file_list: list[Path] + + if len(file_list_new_name) == 0 and len(file_list_old_name) == 0: + file_list = [] + elif len(file_list_new_name) > 0: + file_list = file_list_new_name + elif len(file_list_old_name) > 0: + warnings.warn( + "The use of FORECAST_PAGER_SWIFT_swift_* files is deprecated. However since we still have these files from the PAGER project with this prefix, this will be supported", + DeprecationWarning, + stacklevel=2, + ) + file_list = file_list_old_name + return file_list + + def read_with_horizon(self, start: datetime, end: datetime, horizon: int) -> list[pd.DataFrame]: + """Read Ensemble Kp forecast data for a given time range and forecast horizon. + + Parameters + ---------- + start_time : datetime + Start time of the period for which to read the data. + end_time : datetime + End time of the period for which to read the data. 
+ horizon : int + Forecast horizon (in hours). + + Returns + ------- + list[:class:`pandas.DataFrame`] + A list of data frames containing ensemble data for the requested period. + """ + start = start.replace(tzinfo=timezone.utc) + end = end.replace(tzinfo=timezone.utc) + + align_start_to_3hr = start.replace(hour=(start.hour // 3) * 3, minute=0, second=0, microsecond=0) + align_end_to_3hr = end.replace(hour=(end.hour // 3) * 3, minute=0, second=0, microsecond=0) + + full_time_range = pd.date_range(align_start_to_3hr, align_end_to_3hr, freq="3h", tz=timezone.utc) + + file_offsets, time_indices = self._get_file_offsets_and_time_indices(full_time_range, horizon) + + max_ensembles = 30 # Maximum number of ensemble files to check + ensemble_dfs = [pd.DataFrame(index=full_time_range) for _ in range(max_ensembles)] + + for file_time, file_offset, iloc in zip(full_time_range, file_offsets, time_indices): + str_date = (file_time + timedelta(hours=file_offset)).strftime("%Y%m%dT%H0000") + file_list_new_name = sorted( + self.data_dir.glob(f"FORECAST_Kp_{str_date}_ensemble_*.csv"), + key=lambda x: int(x.stem.split("_")[-1]), + ) + + for ensemble_idx in range(max_ensembles): + df = ensemble_dfs[ensemble_idx] + if ensemble_idx < len(file_list_new_name): + data = pd.read_csv( + file_list_new_name[ensemble_idx], + names=["Forecast Time", "kp"], + parse_dates=["Forecast Time"], + ).iloc[iloc] + data["source"] = str_date + data["Forecast Time"] = data["Forecast Time"].tz_localize("UTC") + df.loc[file_time, "Forecast Time"] = data["Forecast Time"] + df.loc[file_time, "kp"] = data["kp"] + df.loc[file_time, "source"] = file_list_new_name[ensemble_idx].stem + else: + df.loc[file_time, "kp"] = np.nan + + for df in ensemble_dfs: + df["horizon"] = horizon + df.index.name = "Time" + ensemble_dfs = [df for df in ensemble_dfs if not df["kp"].isna().all()] + + return ensemble_dfs + + def _get_file_offsets_and_time_indices( + self, file_time_range: Iterable, forecast_horizon: int + ) -> tuple[list[int], list[int]]: + """ + Compute file offsets and time indices for a given forecast horizon. + + Parameters + ---------- + file_time_range : iterable + Available file time steps. + forecast_horizon : int + Forecast horizon (in hours or chosen unit). + + Returns + ------- + file_offsets : list[int] + Offsets from the given time steps indicating which files to read. + time_indices : list[int] + Time indices to use for each file. + """ + file_offsets = [] + time_indices = [] + + for _ in file_time_range: + file_offset = -(forecast_horizon % 3) + file_offsets.append(file_offset) + + time_index = (forecast_horizon + 2) // 3 + time_indices.append(time_index) + + return file_offsets, time_indices From 38f8122d8cfdaf46e164ef030b1df1d9671872be Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Mon, 29 Sep 2025 15:24:11 +0200 Subject: [PATCH 02/12] fix var names and docstrings --- swvo/io/kp/ensemble.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/swvo/io/kp/ensemble.py b/swvo/io/kp/ensemble.py index 0625ba5..01a1603 100755 --- a/swvo/io/kp/ensemble.py +++ b/swvo/io/kp/ensemble.py @@ -182,7 +182,7 @@ def ensemble_file_list(self, str_date: str) -> list[Path]: file_list = file_list_old_name return file_list - def read_with_horizon(self, start: datetime, end: datetime, horizon: int) -> list[pd.DataFrame]: + def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: int) -> list[pd.DataFrame]: """Read Ensemble Kp forecast data for a given time range and forecast horizon. 
Parameters @@ -199,11 +199,13 @@ def read_with_horizon(self, start: datetime, end: datetime, horizon: int) -> lis list[:class:`pandas.DataFrame`] A list of data frames containing ensemble data for the requested period. """ - start = start.replace(tzinfo=timezone.utc) - end = end.replace(tzinfo=timezone.utc) + if start_time is not None and not start_time.tzinfo: + start_time = start_time.replace(tzinfo=timezone.utc) + if end_time is not None and not end_time.tzinfo: + end_time = end_time.replace(tzinfo=timezone.utc) - align_start_to_3hr = start.replace(hour=(start.hour // 3) * 3, minute=0, second=0, microsecond=0) - align_end_to_3hr = end.replace(hour=(end.hour // 3) * 3, minute=0, second=0, microsecond=0) + align_start_to_3hr = start_time.replace(hour=(start_time.hour // 3) * 3, minute=0, second=0, microsecond=0) + align_end_to_3hr = end_time.replace(hour=(end_time.hour // 3) * 3, minute=0, second=0, microsecond=0) full_time_range = pd.date_range(align_start_to_3hr, align_end_to_3hr, freq="3h", tz=timezone.utc) From 06fd10055bd276615fcc88bb15bde54d168f2e7b Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Mon, 29 Sep 2025 15:24:24 +0200 Subject: [PATCH 03/12] add tests --- tests/io/kp/test_kp_ensemble.py | 119 ++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/tests/io/kp/test_kp_ensemble.py b/tests/io/kp/test_kp_ensemble.py index e035f6e..d055ce2 100644 --- a/tests/io/kp/test_kp_ensemble.py +++ b/tests/io/kp/test_kp_ensemble.py @@ -107,3 +107,122 @@ def test_read_with_default_times(self, kp_ensemble_instance): assert isinstance(data[0], pd.DataFrame) assert all("kp" in i.columns for i in data) assert not data[0].empty + + def make_csv_file(self, path, filename, times, values): + """Helper to create a CSV file with time-value pairs.""" + df = pd.DataFrame({"Forecast Time": times, "kp": values}) + file = path / filename + df.to_csv(file, header=False, index=False) + return file + + def test_read_with_horizon_single_file(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=3) + horizon = 3 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq="3h") + values = np.arange(10) + + self.make_csv_file( + tmp_path, + f"FORECAST_Kp_{str_date}_ensemble_0.csv", + times, + values, + ) + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + + assert isinstance(result, list) + assert all(isinstance(df, pd.DataFrame) for df in result) + assert not result[0]["kp"].isna().all() + assert (result[0]["horizon"] == horizon).all() + assert "Forecast Time" in result[0].columns + assert "kp" in result[0].columns + assert "source" in result[0].columns + + def test_read_with_horizon_multiple_ensembles(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=6) + horizon = 6 + + str_date = (start + timedelta(hours=-(horizon % 3))).strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq="3h") + values1 = np.arange(10) + values2 = np.arange(10, 20) + + self.make_csv_file(tmp_path, f"FORECAST_Kp_{str_date}_ensemble_0.csv", times, values1) + self.make_csv_file(tmp_path, f"FORECAST_Kp_{str_date}_ensemble_1.csv", times, values2) + + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + + assert len(result) == 2 + assert all("kp" 
in df.columns for df in result) + for df in result: + assert set(df.index) == set(pd.date_range(start, end, freq="3h", tz=timezone.utc)) + + def test_read_with_horizon_no_files(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=3) + horizon = 3 + + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + + assert result == [] + + def test_read_with_horizon_nan_fill_for_missing_files(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=6) + horizon = 3 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq="3h") + values = np.arange(10) + self.make_csv_file(tmp_path, f"FORECAST_Kp_{str_date}_ensemble_0.csv", times, values) + + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + + assert len(result) == 1 + assert not result[0]["kp"].isna().all() + assert (result[0]["horizon"] == horizon).all() + + def test_read_with_horizon_correct_horizon_selection(self, kp_ensemble_instance, tmp_path): + kp_ensemble_instance.data_dir = tmp_path + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=6) + + horizons_to_test = [3, 25, 35] + for horizon in horizons_to_test: + file_time = start + file_offset = -(horizon % 3) + str_date = (file_time + timedelta(hours=file_offset)).strftime("%Y%m%dT%H0000") + + times = pd.date_range(start.replace(tzinfo=None), periods=50, freq="3h") + values = np.arange(len(times)) + horizon + + self.make_csv_file( + tmp_path, + f"FORECAST_Kp_{str_date}_ensemble_0.csv", + times, + values, + ) + + result = kp_ensemble_instance.read_with_horizon(start, end, horizon) + assert len(result) >= 1 + df = result[0] + + file_index = (horizon + 2) // 3 + expected_value = values[file_index] + + actual_value = df.loc[start, "kp"] + assert actual_value == expected_value, ( + f"Expected {expected_value} but got {actual_value} for horizon {horizon}" + ) From a88f1cff8be61ddcc3d0def2f2faa7e0b9628139 Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Mon, 29 Sep 2025 15:29:29 +0200 Subject: [PATCH 04/12] temporary ignore OMNIHighRes tests due to OMNI Web downtime --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index f80a820..e6b4621 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -39,7 +39,7 @@ jobs: pip install coveralls coverage pytest-cov - name: Run tests - run: python -m coverage run --source swvo -m pytest tests/io + run: python -m coverage run --source swvo -m pytest tests/io --ignore tests/io/omni/test_omni_high_res.py --ignore tests/io/solar_wind/test_omni.py - name: Run Coveralls if: ${{ success() && matrix.python-version == '3.11' }} From 79eec4ac79bdb51be17792dcb4a99c58017ed978 Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Mon, 29 Sep 2025 15:59:39 +0200 Subject: [PATCH 05/12] kp horizon must be between 0 and 72 --- swvo/io/kp/ensemble.py | 8 ++++++++ tests/io/kp/test_kp_ensemble.py | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/swvo/io/kp/ensemble.py b/swvo/io/kp/ensemble.py index 01a1603..5883425 100755 --- a/swvo/io/kp/ensemble.py +++ b/swvo/io/kp/ensemble.py @@ -198,12 +198,20 @@ def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: i 
------- list[:class:`pandas.DataFrame`] A list of data frames containing ensemble data for the requested period. + + Raises + ------ + ValueError + Raises `ValueError` if the horizon is not between 0 and 72 hours. """ if start_time is not None and not start_time.tzinfo: start_time = start_time.replace(tzinfo=timezone.utc) if end_time is not None and not end_time.tzinfo: end_time = end_time.replace(tzinfo=timezone.utc) + if not (0 <= horizon <= 72): + raise ValueError("Horizon must be between 0 and 72 hours") + align_start_to_3hr = start_time.replace(hour=(start_time.hour // 3) * 3, minute=0, second=0, microsecond=0) align_end_to_3hr = end_time.replace(hour=(end_time.hour // 3) * 3, minute=0, second=0, microsecond=0) diff --git a/tests/io/kp/test_kp_ensemble.py b/tests/io/kp/test_kp_ensemble.py index d055ce2..75cd047 100644 --- a/tests/io/kp/test_kp_ensemble.py +++ b/tests/io/kp/test_kp_ensemble.py @@ -226,3 +226,12 @@ def test_read_with_horizon_correct_horizon_selection(self, kp_ensemble_instance, assert actual_value == expected_value, ( f"Expected {expected_value} but got {actual_value} for horizon {horizon}" ) + + def test_read_with_horizon_invalid_horizon(self, kp_ensemble_instance): + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=3) + invalid_horizons = [-1, 73, 100] + + for horizon in invalid_horizons: + with pytest.raises(ValueError, match="Horizon must be between 0 and 72 hours"): + kp_ensemble_instance.read_with_horizon(start, end, horizon) From fd089e5bdc383645749c03ac82776a095aa41229 Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Thu, 2 Oct 2025 12:21:35 +0200 Subject: [PATCH 06/12] add ensemble file list method --- swvo/io/kp/ensemble.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/swvo/io/kp/ensemble.py b/swvo/io/kp/ensemble.py index 5883425..872cac0 100755 --- a/swvo/io/kp/ensemble.py +++ b/swvo/io/kp/ensemble.py @@ -224,16 +224,13 @@ def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: i for file_time, file_offset, iloc in zip(full_time_range, file_offsets, time_indices): str_date = (file_time + timedelta(hours=file_offset)).strftime("%Y%m%dT%H0000") - file_list_new_name = sorted( - self.data_dir.glob(f"FORECAST_Kp_{str_date}_ensemble_*.csv"), - key=lambda x: int(x.stem.split("_")[-1]), - ) + file_list = self.ensemble_file_list(str_date) for ensemble_idx in range(max_ensembles): df = ensemble_dfs[ensemble_idx] - if ensemble_idx < len(file_list_new_name): + if ensemble_idx < len(file_list): data = pd.read_csv( - file_list_new_name[ensemble_idx], + file_list[ensemble_idx], names=["Forecast Time", "kp"], parse_dates=["Forecast Time"], ).iloc[iloc] @@ -241,7 +238,7 @@ def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: i data["Forecast Time"] = data["Forecast Time"].tz_localize("UTC") df.loc[file_time, "Forecast Time"] = data["Forecast Time"] df.loc[file_time, "kp"] = data["kp"] - df.loc[file_time, "source"] = file_list_new_name[ensemble_idx].stem + df.loc[file_time, "source"] = file_list[ensemble_idx].stem else: df.loc[file_time, "kp"] = np.nan From bae86a1c760db2d72ccb1c44d3bbf7b18d39e1f2 Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Thu, 2 Oct 2025 12:22:08 +0200 Subject: [PATCH 07/12] add read_with_horizon method and change HpEnsemble to ABC --- swvo/io/hp/ensemble.py | 244 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 224 insertions(+), 20 deletions(-) diff --git a/swvo/io/hp/ensemble.py b/swvo/io/hp/ensemble.py index 
af780d8..d51612b 100755 --- a/swvo/io/hp/ensemble.py +++ b/swvo/io/hp/ensemble.py @@ -7,14 +7,18 @@ import logging import os import warnings +from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Optional +from typing import Iterable, Optional, TypeVar +import numpy as np import pandas as pd +Number = TypeVar("Number", int, float) -class HpEnsemble: + +class HpEnsemble(ABC): """This is a base class for Hp ensemble data. Parameters @@ -96,16 +100,61 @@ def read(self, start_time: datetime, end_time: datetime) -> list[pd.DataFrame]: start_date = start_time.replace(microsecond=0, minute=0, second=0) str_date = start_date.strftime("%Y%m%dT%H0000") - file_list_new_name = sorted( + + file_list = self._ensemble_file_list(str_date) + data = [] + + if len(file_list) == 0: + msg = f"No {self.index} ensemble file found for requested date {start_date}" + logging.error(msg) + raise FileNotFoundError(msg) + + for file in file_list: + hp_df = pd.read_csv(file, names=["t", self.index]) + + hp_df["t"] = pd.to_datetime(hp_df["t"], utc=True) + hp_df.index = hp_df["t"] + hp_df = hp_df.drop(labels=["t"], axis=1) + + hp_df = hp_df.truncate( + before=start_time - timedelta(minutes=int(self.index_number) - 0.01), + after=end_time + timedelta(minutes=int(self.index_number) + 0.01), + ) + + data.append(hp_df) + + return data + + def _ensemble_file_list(self, str_date: str) -> list[Path]: + """Check for the existence of ensemble files for a given date. + + Parameters + ---------- + str_date : str + Date string in the format YYYYMMDDTHH0000. + + Returns + ------- + list[Path] + A list of file paths for the ensemble files. + + Warnings + -------- + DeprecationWarning + Warns if deprecated file naming convention is used. 
+ """ + + file_list_old_name = sorted( self.data_dir.glob(f"FORECAST_{self.index.title()}_{str_date}_ensemble_*.csv"), key=lambda x: int(x.stem.split("_")[-1]), ) - file_list_old_name = sorted( + file_list_new_name = sorted( self.data_dir.glob(f"FORECAST_{self.index.upper()}_SWIFT_DRIVEN_swift_{str_date}_ensemble_*.csv"), key=lambda x: int(x.stem.split("_")[-1]), ) - data = [] + + file_list: list[Path] if len(file_list_new_name) == 0 and len(file_list_old_name) == 0: file_list = [] @@ -118,26 +167,129 @@ def read(self, start_time: datetime, end_time: datetime) -> list[pd.DataFrame]: ) file_list = file_list_old_name - if len(file_list) == 0: - msg = f"No {self.index} ensemble file found for requested date {start_date}" + return file_list + + @abstractmethod + def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: Number) -> list[pd.DataFrame]: + if start_time is not None and not start_time.tzinfo: + start_time = start_time.replace(tzinfo=timezone.utc) + if end_time is not None and not end_time.tzinfo: + end_time = end_time.replace(tzinfo=timezone.utc) + + if not (0 <= horizon <= 72): + raise ValueError("Horizon must be between 0 and 72 hours") + + if self.index == "hp30": + freq = "0.5h" + if horizon % 0.5 != 0: + raise ValueError("Horizon for hp30 must be in 0.5 hour increments") + elif self.index == "hp60": + freq = "1h" + if horizon % 1 != 0: + raise ValueError("Horizon for hp60 must be in 1 hour increments") + + align_start_to_hp_hr = start_time.replace(hour=start_time.hour, minute=0, second=0, microsecond=0) + align_end_to_hp_hr = end_time.replace(hour=end_time.hour, minute=0, second=0, microsecond=0) + + full_time_range = pd.date_range(align_start_to_hp_hr, align_end_to_hp_hr, freq=freq, tz=timezone.utc) + + file_offsets, time_indices = self._get_file_offsets_and_time_indices(full_time_range, horizon) + + max_ensembles = 30 # Maximum number of ensemble files to check + ensemble_dfs = [pd.DataFrame(index=full_time_range) for _ in range(max_ensembles)] + + for file_time, file_offset, iloc in zip(full_time_range, file_offsets, time_indices): + str_date = (file_time + timedelta(hours=file_offset)).strftime("%Y%m%dT%H0000") + file_list = self._ensemble_file_list(str_date) + for ensemble_idx in range(max_ensembles): + df = ensemble_dfs[ensemble_idx] + if ensemble_idx < len(file_list): + data = pd.read_csv( + file_list[ensemble_idx], + names=["Forecast Time", self.index], + parse_dates=["Forecast Time"], + ).iloc[iloc] + data["source"] = str_date + data["Forecast Time"] = data["Forecast Time"].tz_localize("UTC") + df.loc[file_time, "Forecast Time"] = data["Forecast Time"] + df.loc[file_time, self.index] = data[self.index] + df.loc[file_time, "source"] = file_list[ensemble_idx].stem + else: + df.loc[file_time, self.index] = np.nan + + for df in ensemble_dfs: + df["horizon"] = horizon + df.index.name = "Time" + ensemble_dfs = [df for df in ensemble_dfs if not df[self.index].isna().all()] + + if len(ensemble_dfs) == 0: + msg = f"No ensemble data found for the requested period {start_time} to {end_time} and horizon {horizon} hours. Check the data directory {self.data_dir} for available data." 
logging.error(msg) raise FileNotFoundError(msg) - for file in file_list: - hp_df = pd.read_csv(file, names=["t", self.index]) + return ensemble_dfs - hp_df["t"] = pd.to_datetime(hp_df["t"], utc=True) - hp_df.index = hp_df["t"] - hp_df = hp_df.drop(labels=["t"], axis=1) + def _get_file_offsets_and_time_indices( + self, file_time_range: Iterable, forecast_horizon: float + ) -> tuple[list[int], list[int]]: + """ + Compute file offsets and time indices for a given forecast horizon. - hp_df = hp_df.truncate( - before=start_time - timedelta(minutes=int(self.index_number) - 0.01), - after=end_time + timedelta(minutes=int(self.index_number) + 0.01), - ) + Parameters + ---------- + file_time_range : iterable + Available file time steps. + forecast_horizon : float + Forecast horizon in hours (can be fractional, e.g., 0.5, 1.5, 72.0). - data.append(hp_df) + Returns + ------- + file_offsets : list[int] + Offsets from the given time steps indicating which files to read. + time_indices : list[int] + Time indices to use for each file. + """ + file_offsets = [] + time_indices = [] - return data + for current_time in file_time_range: + current_hour = current_time.hour + current_minute = current_time.minute + current_fractional_hour = current_hour + current_minute / 60.0 + + # Target forecast time (fractional hours from same midnight) + target_fractional_hour = current_fractional_hour + forecast_horizon + + # Find the file base hour (multiple of 3) that contains this target + # Files are grouped by 3s: [0,1,2] use base 0, [3,4,5] use base 3, etc. + # Each file covers 72 hours from its base hour + + # Determine which 3-hour group the target falls into + target_hour_int = int(target_fractional_hour) + target_base_hour = (target_hour_int // 3) * 3 + + if target_fractional_hour <= target_base_hour + 72: + file_base_hour = target_base_hour + else: + # use next group + file_base_hour = target_base_hour + 3 + + # Determine which file in the group (base, base+1, base+2) to use + # Use the file that was created at or before current_time + if current_hour >= file_base_hour: + # Use the latest file in the group that's <= current_hour + file_hour = min(current_hour, file_base_hour + 2) + else: + file_hour = file_base_hour + + file_offset = file_hour - current_hour + file_offsets.append(file_offset) + hours_from_file_start = target_fractional_hour - file_base_hour + resolution = 0.5 if self.index == "hp30" else 1.0 + time_index = int(hours_from_file_start / resolution) + time_indices.append(time_index) + + return file_offsets, time_indices class Hp30Ensemble(HpEnsemble): @@ -154,17 +306,69 @@ class Hp30Ensemble(HpEnsemble): def __init__(self, data_dir: Optional[Path] = None) -> None: super().__init__("hp30", data_dir) + def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: float) -> list[pd.DataFrame]: + """Read Ensemble Hp30 forecast data for a given time range and forecast horizon. + + Parameters + ---------- + start_time : datetime + Start time of the period for which to read the data. + end_time : datetime + End time of the period for which to read the data. + horizon : float + Forecast horizon (in hours). + + Returns + ------- + list[:class:`pandas.DataFrame`] + A list of data frames containing ensemble data for the requested period. + + Raises + ------ + ValueError + Raises `ValueError` if the horizon is not between 0 and 72 hours. + ValueError + Raises `ValueError` if the horizon is not in 0.5 hour increments. 
+ """ + return super().read_with_horizon(start_time, end_time, horizon) + class Hp60Ensemble(HpEnsemble): - """A class to handle Hp30 ensemble data. + """A class to handle Hp60 ensemble data. Parameters ---------- data_dir : str | Path, optional - Data directory for the Hp30 ensemble data. If not provided, it will be read from the environment variable + Data directory for the Hp60 ensemble data. If not provided, it will be read from the environment variable """ ENV_VAR_NAME = "HP60_ENSEMBLE_FORECAST_DIR" def __init__(self, data_dir: Optional[Path] = None) -> None: super().__init__("hp60", data_dir) + + def read_with_horizon(self, start_time: datetime, end_time: datetime, horizon: int) -> list[pd.DataFrame]: + """Read Ensemble Hp60 forecast data for a given time range and forecast horizon. + + Parameters + ---------- + start_time : datetime + Start time of the period for which to read the data. + end_time : datetime + End time of the period for which to read the data. + horizon : int + Forecast horizon (in hours). + + Returns + ------- + list[:class:`pandas.DataFrame`] + A list of data frames containing ensemble data for the requested period. + + Raises + ------ + ValueError + Raises `ValueError` if the horizon is not between 0 and 72 hours. + ValueError + Raises `ValueError` if the horizon is not in 1 hour increments. + """ + return super().read_with_horizon(start_time, end_time, horizon) From 1c684fc0f39b9239139a3edb12882742967400b5 Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Thu, 2 Oct 2025 12:22:24 +0200 Subject: [PATCH 08/12] change invalid index test --- tests/io/hp/test_hp_ensemble.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/io/hp/test_hp_ensemble.py b/tests/io/hp/test_hp_ensemble.py index 2e5130e..99c41de 100644 --- a/tests/io/hp/test_hp_ensemble.py +++ b/tests/io/hp/test_hp_ensemble.py @@ -56,8 +56,8 @@ def test_initialization_without_env_var(self): ): Hp30Ensemble() - def test_invalid_index(self): - with pytest.raises(ValueError, match="Encountered invalid index:.*"): + def test_abc_instantiation(self): + with pytest.raises(TypeError, match="Can't instantiate abstract class*"): HpEnsemble("hp45", data_dir=DATA_DIR) @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) From 88a5118330292744ebc7fc8630312117996df1af Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Thu, 2 Oct 2025 12:24:56 +0200 Subject: [PATCH 09/12] restart omni tests --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e6b4621..f80a820 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -39,7 +39,7 @@ jobs: pip install coveralls coverage pytest-cov - name: Run tests - run: python -m coverage run --source swvo -m pytest tests/io --ignore tests/io/omni/test_omni_high_res.py --ignore tests/io/solar_wind/test_omni.py + run: python -m coverage run --source swvo -m pytest tests/io - name: Run Coveralls if: ${{ success() && matrix.python-version == '3.11' }} From 2e60c0ebe4ae3983b49b0dfed545ce879f1049f7 Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Thu, 2 Oct 2025 12:39:30 +0200 Subject: [PATCH 10/12] change month in dstwdc test --- tests/io/dst/test_wdc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/io/dst/test_wdc.py b/tests/io/dst/test_wdc.py index c78cfaf..0c26bec 100644 --- a/tests/io/dst/test_wdc.py +++ b/tests/io/dst/test_wdc.py @@ -83,7 +83,7 @@ def 
test_get_processed_file_list(self, dst_instance): assert len(time_intervals) == 36 def test_download_and_process(self, dst_instance): - dst_instance.download_and_process(datetime(2025, 1, 1), datetime(2025, 2, 1)) + dst_instance.download_and_process(datetime(2025, 9, 1), datetime(2025, 9, 1)) expected_files = list(MOCK_DATA_PATH.glob("WDC_DST_*.csv")) From 70a40e142ef45bc53a65e616628b825611fa1a34 Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Thu, 2 Oct 2025 12:45:53 +0200 Subject: [PATCH 11/12] change month in dstwdc test --- tests/io/dst/test_wdc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/io/dst/test_wdc.py b/tests/io/dst/test_wdc.py index 0c26bec..15076b6 100644 --- a/tests/io/dst/test_wdc.py +++ b/tests/io/dst/test_wdc.py @@ -83,7 +83,7 @@ def test_get_processed_file_list(self, dst_instance): assert len(time_intervals) == 36 def test_download_and_process(self, dst_instance): - dst_instance.download_and_process(datetime(2025, 9, 1), datetime(2025, 9, 1)) + dst_instance.download_and_process(datetime(2025, 8, 1), datetime(2025, 9, 1)) expected_files = list(MOCK_DATA_PATH.glob("WDC_DST_*.csv")) From da7ff969f61929d3a33df8a97ac498e7725ed0ad Mon Sep 17 00:00:00 2001 From: Sahil Jhawar Date: Thu, 2 Oct 2025 12:46:17 +0200 Subject: [PATCH 12/12] add tests for Hp read_with_horizon --- tests/io/hp/test_hp_ensemble.py | 174 ++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) diff --git a/tests/io/hp/test_hp_ensemble.py b/tests/io/hp/test_hp_ensemble.py index 99c41de..7068779 100644 --- a/tests/io/hp/test_hp_ensemble.py +++ b/tests/io/hp/test_hp_ensemble.py @@ -116,3 +116,177 @@ def test_read_with_default_times(self, instance_type="hp30"): assert isinstance(data[0], pd.DataFrame) assert all("hp30" in i.columns for i in data) assert data[0].index.tz == timezone.utc + + def make_csv_file(self, path, filename, times, values, index_name): + """Helper to create a CSV file with time-value pairs.""" + df = pd.DataFrame({"Forecast Time": times, index_name: values}) + file = path / filename + df.to_csv(file, header=False, index=False) + return file + + @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) + def test_read_with_horizon_single_file(self, instance_type, index_name): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=1) + horizon = 1.0 if instance_type == "hp30" else 1 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq=f"{instance.index_number}min") + values = np.arange(10) + + self.make_csv_file( + ensemble_dir, + f"FORECAST_{index_name.title()}_{str_date}_ensemble_0.csv", + times, + values, + index_name, + ) + result = instance.read_with_horizon(start, end, horizon) + + assert isinstance(result, list) + assert all(isinstance(df, pd.DataFrame) for df in result) + assert not result[0][index_name].isna().all() + assert (result[0]["horizon"] == horizon).all() + assert "Forecast Time" in result[0].columns + assert index_name in result[0].columns + assert "source" in result[0].columns + + @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) + def test_read_with_horizon_multiple_ensembles(self, instance_type, index_name): + ensemble_dir = DATA_DIR / instance_type + 
ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=2) + horizon = 2.0 if instance_type == "hp30" else 2 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq=f"{instance.index_number}min") + values1 = np.arange(10) + values2 = np.arange(10, 20) + + self.make_csv_file( + ensemble_dir, f"FORECAST_{index_name.title()}_{str_date}_ensemble_0.csv", times, values1, index_name + ) + self.make_csv_file( + ensemble_dir, f"FORECAST_{index_name.title()}_{str_date}_ensemble_1.csv", times, values2, index_name + ) + + result = instance.read_with_horizon(start, end, horizon) + + assert len(result) == 2 + assert all(index_name in df.columns for df in result) + freq = "0.5h" if instance_type == "hp30" else "1h" + expected_range = pd.date_range(start, end, freq=freq, tz=timezone.utc) + for df in result: + assert set(df.index) == set(expected_range) + + @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) + def test_read_with_horizon_nan_fill_for_missing_files(self, instance_type, index_name): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=2) + horizon = 1.0 if instance_type == "hp30" else 1 + + str_date = start.strftime("%Y%m%dT%H0000") + times = pd.date_range(start.replace(tzinfo=None), periods=10, freq=f"{instance.index_number}min") + values = np.arange(10) + self.make_csv_file( + ensemble_dir, f"FORECAST_{index_name.title()}_{str_date}_ensemble_0.csv", times, values, index_name + ) + + result = instance.read_with_horizon(start, end, horizon) + + assert len(result) >= 1 + assert not result[0][index_name].isna().all() + assert (result[0]["horizon"] == horizon).all() + + @pytest.mark.parametrize("instance_type,index_name", [("hp30", "hp30"), ("hp60", "hp60")]) + def test_read_with_horizon_correct_horizon_selection(self, instance_type, index_name): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=2) + + horizons_to_test = [0.5, 1.0, 1.5] if instance_type == "hp30" else [1, 2] + for horizon in horizons_to_test: + for existing_file in ensemble_dir.glob("FORECAST_*.csv"): + existing_file.unlink() + + str_date = start.strftime("%Y%m%dT%H0000") + + times = pd.date_range(start.replace(tzinfo=None), periods=50, freq=f"{instance.index_number}min") + values = np.arange(len(times)) + horizon + + self.make_csv_file( + ensemble_dir, + f"FORECAST_{index_name.title()}_{str_date}_ensemble_0.csv", + times, + values, + index_name, + ) + + result = instance.read_with_horizon(start, end, horizon) + assert len(result) >= 1 + df = result[0] + + assert not df[index_name].isna().all() + assert (df["horizon"] == horizon).all() + + @pytest.mark.parametrize("instance_type", ["hp30", "hp60"]) + def test_read_with_horizon_invalid_horizon(self, instance_type): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + 
instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=1) + + invalid_horizons = [-1, 73, 100] + for horizon in invalid_horizons: + with pytest.raises(ValueError, match="Horizon must be between 0 and 72 hours"): + instance.read_with_horizon(start, end, horizon) + + if instance_type == "hp30": + invalid_increments = [0.3, 1.7, 2.2] + for horizon in invalid_increments: + with pytest.raises(ValueError, match="Horizon for hp30 must be in 0.5 hour increments"): + instance.read_with_horizon(start, end, horizon) + + if instance_type == "hp60": + invalid_increments = [0.5, 1.5, 2.3] + for horizon in invalid_increments: + with pytest.raises(ValueError, match="Horizon for hp60 must be in 1 hour increments"): + instance.read_with_horizon(start, end, horizon) + + @pytest.mark.parametrize("instance_type", ["hp30", "hp60"]) + def test_read_with_horizon_no_files(self, instance_type): + ensemble_dir = DATA_DIR / instance_type + ensemble_dir.mkdir(exist_ok=True) + instance_class = Hp30Ensemble if instance_type == "hp30" else Hp60Ensemble + instance = instance_class(data_dir=ensemble_dir) + + start = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc) + end = start + timedelta(hours=1) + horizon = 1.0 if instance_type == "hp30" else 1 + + for existing_file in ensemble_dir.glob("FORECAST_*.csv"): + existing_file.unlink() + + with pytest.raises(FileNotFoundError, match="No ensemble data found"): + instance.read_with_horizon(start, end, horizon)
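
A minimal usage sketch of the read_with_horizon API introduced by this series, shown for Hp30Ensemble, whose class name, constructor and validation rules are visible in PATCH 07. The import path and the data directory below are assumptions for illustration; in practice the directory comes from the HP30_ENSEMBLE_FORECAST_DIR environment variable or an explicit data_dir, and it must contain FORECAST_Hp30_<date>_ensemble_<n>.csv files.

from datetime import datetime, timedelta, timezone
from pathlib import Path

# Assumed import path, mirroring the file swvo/io/hp/ensemble.py touched in PATCH 07.
from swvo.io.hp.ensemble import Hp30Ensemble

start = datetime(2025, 1, 1, tzinfo=timezone.utc)
end = start + timedelta(hours=6)

# Hypothetical data directory; it is expected to hold FORECAST_Hp30_*_ensemble_*.csv files.
hp30 = Hp30Ensemble(data_dir=Path("/data/hp30_ensemble"))

# Horizon must lie in [0, 72] hours and, for hp30, be a multiple of 0.5 h;
# invalid values raise ValueError, and a period with no matching files raises FileNotFoundError.
members = hp30.read_with_horizon(start, end, horizon=1.5)

# Each ensemble member is a DataFrame indexed by "Time" with columns
# "Forecast Time", "hp30", "source" and "horizon".
for member in members:
    print(member[["Forecast Time", "hp30", "horizon"]].head())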