1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -35,6 +35,7 @@ SCMData
scmdata.ops
scmdata.plotting
scmdata.processing
   scmdata.remote
scmdata.run
scmdata.testing
scmdata.time
6 changes: 6 additions & 0 deletions docs/source/scmdata.remote.rst
@@ -0,0 +1,6 @@
.. _scmdata.remote:

scmdata.remote
---------------

.. automodule:: scmdata.remote
1,383 changes: 1,383 additions & 0 deletions notebooks/remote-datasets.ipynb

Large diffs are not rendered by default.
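
The notebook itself is not rendered, but, as a rough sketch, the workflow it presumably demonstrates looks like the following (the URL is the NDCs API used in the integration tests below; results depend on the live service):

    from scmdata.remote import RemoteDataset

    # Point at a scmdata-compatible API
    ds = RemoteDataset("https://api.climateresource.com.au/ndcs/v1")

    # Filters accumulate locally; nothing is fetched yet
    ds = ds.filter(variable="Emissions|Total *", category="Current")

    # query() performs the HTTP request and returns a scmdata.ScmRun
    run = ds.query()
    print(run.get_unique_meta("region"))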

9 changes: 5 additions & 4 deletions setup.cfg
@@ -63,10 +63,11 @@ plotting =
seaborn

notebooks =
-    %(plotting)s
-    %(optional)s
-    notebook
-    ipywidgets
+    %(plotting)s
+    %(optional)s
+    notebook
+    ipywidgets
+    pooch

tests =
codecov
1 change: 1 addition & 0 deletions src/scmdata/__init__.py
@@ -17,3 +17,4 @@

from scmdata.run import ScmRun, run_append # noqa: F401, E402
from scmdata.database import ScmDatabase # noqa: F401, E402
from scmdata.remote import RemoteDataset # noqa: F401, E402
2 changes: 1 addition & 1 deletion src/scmdata/database/_database.py
@@ -24,7 +24,7 @@ class ScmDatabase:

def __init__(
self,
-        root_dir,
+        root_dir=None,
levels=("climate_model", "variable", "region", "scenario"),
backend="netcdf",
backend_config=None,
12 changes: 12 additions & 0 deletions src/scmdata/errors.py
@@ -2,6 +2,7 @@
Custom errors and exceptions used by scmdata
"""
import pandas as pd
import requests.exceptions


class NonUniqueMetadataError(ValueError):
@@ -63,3 +64,14 @@ class InsufficientDataError(Exception):
"""
Insufficient data is available to interpolate/extrapolate
"""


class RemoteQueryError(Exception):
"""
Something went wrong when fetching data from a remote source
"""

def __init__(self, msg: str, error: requests.exceptions.RequestException):
msg = f"{msg}: {str(error)}"
super().__init__(msg)
self.error = error
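
For illustration (not part of the diff): since ``RemoteQueryError`` keeps the underlying ``requests`` exception on its ``error`` attribute, callers can inspect both:

    import requests.exceptions

    from scmdata.errors import RemoteQueryError
    from scmdata.remote import RemoteDataset

    try:
        run = RemoteDataset("https://api.climateresource.com.au/ndcs/v1").query()
    except RemoteQueryError as exc:
        # the original requests exception is preserved on exc.error
        assert isinstance(exc.error, requests.exceptions.RequestException)
        print(exc)
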
252 changes: 252 additions & 0 deletions src/scmdata/remote.py
@@ -0,0 +1,252 @@
import io
import logging
import urllib.parse
from functools import lru_cache
from typing import List, Optional

import pandas as pd
import requests

import scmdata
from scmdata import ScmRun
from scmdata.errors import RemoteQueryError

logger = logging.getLogger(__name__)


# How many requests are kept in memory
CACHE_SIZE = 32


def _make_request(method, url, params) -> requests.Response:
try:
resp = requests.request(method, url, params=params)
resp.raise_for_status()

return resp
except requests.exceptions.ConnectionError as err:
# connection failure or DNS error
raise RemoteQueryError("Failed to connect", error=err)
except requests.exceptions.Timeout as err:
# Failed to get a response from the API
raise RemoteQueryError("Connection timeout", error=err)
except requests.exceptions.HTTPError as err:
# Handles non-200 status codes
raise RemoteQueryError("Client error", error=err)
except requests.exceptions.RequestException as err:
raise RemoteQueryError("Unknown error occurred when fetching data", error=err)


@lru_cache(CACHE_SIZE)
def _read_api_timeseries(url: str, **filters):
timeseries_url = urllib.parse.urljoin(url, "timeseries")
filters["format"] = "csv" # CSV format is faster to parse compared to json

resp = _make_request("get", timeseries_url, filters)

df = pd.read_csv(io.StringIO(resp.text))
return ScmRun(df)


@lru_cache(CACHE_SIZE)
def _read_api_facets(url, **filters):
timeseries_url = urllib.parse.urljoin(url, "facets")

resp = _make_request("get", timeseries_url, filters)

data = resp.json()
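    # Expected response shape: {"<facet name>": [{"value": ..., "count": ...}, ...]}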
items = []
for name in data:
for item in data[name]:
items.append({"name": name, **item})
if len(items) == 0:
return pd.DataFrame(columns=["name", "value", "count"])
return pd.DataFrame(items)[["name", "value", "count"]]


@lru_cache(CACHE_SIZE)
def _read_api_meta(url, **filters):
timeseries_url = urllib.parse.urljoin(url, "meta")

resp = _make_request("get", timeseries_url, filters)

data = resp.json()
return pd.DataFrame(data["meta"])


class RemoteDataset:
def __init__(self, base_url: str, filters=None):
"""
        Initialise a dataset backed by a remote scmdata-compatible API

        Parameters
        ----------
        base_url
            URL of the API server

        filters
            Default filters

            Shorthand for calling ``RemoteDataset(base_url).filter(**filters)``
"""
# Ensure the url is terminated with a '/'
self.base_url = base_url.rstrip("/") + "/"
self.filters = filters or {}
self._meta_cols = None

def _read_api_info(self):
facets = _read_api_facets(self.base_url)
self._meta_cols = facets["name"].unique().tolist()

def __getattr__(self, item: str):
# Proxy ScmRun functions
if hasattr(ScmRun, item):
return getattr(self.query(), item)
raise AttributeError(
f"'{self.__class__.__name__}' object has no attribute '{item}'"
)

def __len__(self):
return len(self.meta())

def __repr__(self) -> str:
def _indent(s):
_lines = ["\t" + line for line in s.split("\n")]
return "\n".join(_lines)

lines = [
f"<{self.__class__.__name__}> (timeseries: {len(self)})",
f"URL: {self.url()}",
"Filters",
_indent(str(self.filters)),
"Meta",
_indent(repr(self.meta())),
]

return "\n".join(lines)

def url(self) -> str:
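        """
        Get the URL used to fetch the filtered timeseries (as CSV) from the API
        """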
opts = self.filter_options()
filters = {k: self.filters[k] for k in self.filters.keys() if k in opts}
filters["format"] = "csv"
query_params = "?" + urllib.parse.urlencode(filters)

return urllib.parse.urljoin(self.base_url, "timeseries") + query_params

def meta(self) -> pd.DataFrame:
"""
        Fetch metadata about the filtered dataset from the API

        Returns
        -------
        The metadata for each row. This is equivalent to :attr:`scmdata.ScmRun.meta`
"""
logger.info(
f"Fetching remote meta from {self.base_url} matching {self.filters}"
)
return _read_api_meta(self.base_url, **self.filters)

def get_unique_meta(
self,
col: str,
no_duplicates: Optional[bool] = False,
) -> List:
"""
Get unique values in a metadata column.

This performs a remote query to the API server

Parameters
----------
col
Column to retrieve metadata for

        no_duplicates
Should I raise an error if there is more than one unique value in the
metadata column?

Raises
------
ValueError
There is more than one unique value in the metadata column and
``no_duplicates`` is ``True``.

KeyError
If a ``meta`` column does not exist in the run's metadata

RemoteQueryError
Something went wrong when querying the API

Returns
-------
[List[Any], Any]
List of unique metadata values. If ``no_duplicates`` is ``True`` the
metadata value will be returned (rather than a list).

"""
vals = self.meta()[col].unique().tolist()
if no_duplicates:
if len(vals) != 1:
raise ValueError(
"`{}` column is not unique (found values: {})".format(col, vals)
)

return vals[0]
return vals

def filter_options(self) -> List[str]:
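        """
        Get the filter keys accepted by the API

        These are the dataset's metadata columns, as reported by the API's
        ``facets`` endpoint, plus the ``year.min``/``year.max`` range filters.
        """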
if self._meta_cols is None:
self._read_api_info()

extra_filters = ["year.min", "year.max"]
return [*self._meta_cols, *extra_filters]

def query(self, raise_on_error: bool = False) -> scmdata.ScmRun:
"""
Fetch timeseries from the API

        The resulting data will follow any applied filters (see :meth:`filter`).

        Parameters
        ----------
        raise_on_error
            If ``True``, raise a ``ValueError`` when the dataset cannot be
            filtered by one of the requested keys, instead of ignoring the
            unknown keys with a warning

Raises
------
RemoteQueryError
Something went wrong when querying the API

Returns
-------
:class:`scmdata.ScmRun`
"""
logger.info(
f"Fetching remote timeseries from {self.base_url} matching {self.filters}"
)

opts = self.filter_options()
filter_keys = self.filters.keys()
filters = {k: self.filters[k] for k in filter_keys if k in opts}

        extra_filters = [k for k in filter_keys if k not in opts]
        if extra_filters:
            msg = f"Could not filter dataset by {extra_filters}"
            if raise_on_error:
                raise ValueError(msg)
            logger.warning(msg + ". Ignoring")

run = _read_api_timeseries(self.base_url, **filters)

run.metadata["source"] = self

return run

def filter(self, **filters):
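        """
        Return a new :class:`RemoteDataset` with additional filters applied

        Filters are evaluated lazily, so no data is fetched until
        :meth:`query` is called. Filtering again by an existing key raises a
        ``ValueError``. As a special case, passing ``keep=False`` fetches the
        data and performs the filtering locally, returning a
        :class:`scmdata.ScmRun`.
        """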
if not filters.get("keep", True):
logger.warning(
"'keep' is not handled by the API. Querying data and performing filtering locally"
)
return self.query().filter(**filters)

new_filters = {**self.filters}
for k in filters:
if k in self.filters:
raise ValueError(f"Already filtering by {k}")
new_filters[k] = filters[k]

return self.__class__(base_url=self.base_url, filters=new_filters)
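
Taken together, ``RemoteDataset`` behaves like a lazily evaluated ``ScmRun``: filters accumulate locally and unknown attributes are resolved by fetching the data first. A minimal sketch, again assuming the NDCs API from the integration tests is reachable:

    from scmdata.remote import RemoteDataset

    ds = RemoteDataset("https://api.climateresource.com.au/ndcs/v1")
    print(ds.filter_options())      # metadata columns plus "year.min" / "year.max"

    aus = ds.filter(region="AUS")   # new RemoteDataset; no HTTP request yet
    print(aus.get_unique_meta("category"))

    # Unknown attributes are proxied to ScmRun, triggering a full query()
    df = aus.timeseries()
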
Empty file added tests/__init__.py
Empty file added tests/integration/__init__.py
72 changes: 72 additions & 0 deletions tests/integration/test_remote.py
@@ -0,0 +1,72 @@
import pandas as pd

import scmdata
from scmdata.remote import (
RemoteDataset,
_read_api_facets,
_read_api_meta,
_read_api_timeseries,
)

NDCS_URL = "https://api.climateresource.com.au/ndcs/v1"


def test_read_ndcs():
res = _read_api_timeseries(
NDCS_URL,
**{
"version": "14Feb2022b_CR",
"variable": "Emissions|Total *",
"hot_air": "exclude",
"category": "Current",
},
)

assert len(res)
assert isinstance(res, scmdata.ScmRun)


def test_read_facets():
res = _read_api_facets(
NDCS_URL,
**{
"version": "14Feb2022b_CR",
"variable": "Emissions|Total *",
"hot_air": "exclude",
"category": "Current",
},
)

assert (res.columns == ["name", "value", "count"]).all()
assert len(res)


def test_read_meta():
res = _read_api_meta(
NDCS_URL,
version="14Feb2022b_CR",
variable="Emissions|Total *",
)

assert isinstance(res, pd.DataFrame)
assert "category" in res
assert "Emissions|Total GHG excl. LULUCF" in res["variable"].tolist()


def test_remote_dataset_real():
ds = RemoteDataset(NDCS_URL)

assert "USA" in ds.get_unique_meta("region")

ds = ds.filter(region="AUS")
assert "USA" not in ds.get_unique_meta("region")
assert "AUS" in ds.get_unique_meta("region")
ds_meta = ds.meta()

run = ds.query()
assert isinstance(run, scmdata.ScmRun)
pd.testing.assert_frame_equal(run.meta, ds_meta)

# We should be able to use other ScmRun funcs
res = ds.process_over("variable", "mean")
assert isinstance(res, pd.DataFrame)