Skip to content

Commit 40d04b1

Browse files
authored
Merge branch 'main' into issue969
2 parents 508f8f3 + 30cb1c0 commit 40d04b1

File tree

7 files changed

+130
-29
lines changed

7 files changed

+130
-29
lines changed

CHANGES.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
2.1.0 (TBD)
22

33
Added:
4-
- Support for catalog source publishing stages (#977) and time range types ()
5-
have been added to subscription_request.catalog_source.
4+
- Support for catalog source publishing stages (#977) and time range types
5+
(#978) have been added to subscription_request.catalog_source.
6+
- Add the option to get Planetary Variable subscription results as a CSV file
7+
(#981).
68
- A subscription_request.planetary_variable_source function has been added
79
(#976).
810
- The subscription_request.build_request function has a new option to clip to

planet/cli/subscriptions.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,13 @@ async def get_subscription_cmd(ctx, subscription_id, pretty):
162162
"success"]),
163163
multiple=True,
164164
default=None,
165-
callback=lambda ctx,
166-
param,
167-
value: set(value),
165+
callback=(lambda ctx, param, value: set(value)),
168166
help="Select subscription results in one or more states. Default: all.")
167+
@click.option('--csv',
168+
'csv_flag',
169+
is_flag=True,
170+
default=False,
171+
help="Get subscription results as comma-separated fields.")
169172
@limit
170173
# TODO: the following 3 options.
171174
# –created: timestamp instant or range.
@@ -178,13 +181,39 @@ async def list_subscription_results_cmd(ctx,
178181
subscription_id,
179182
pretty,
180183
status,
184+
csv_flag,
181185
limit):
182-
"""Gets results of a subscription and prints the API response."""
186+
"""Print the results of a subscription to stdout.
187+
188+
The output of this command is a sequence of JSON objects (the
189+
default) or a sequence of comma-separated fields (when the --csv
190+
option is used), one result per line.
191+
192+
Examples:
193+
194+
\b
195+
planet subscriptions results SUBSCRIPTION_ID --status=success --limit 10
196+
197+
Where SUBSCRIPTION_ID is the unique identifier for a subscription,
198+
this prints the last 10 successfully delivered results for that
199+
subscription as JSON objects.
200+
201+
\b
202+
planet subscriptions results SUBSCRIPTION_ID --limit 0 --csv > results.csv
203+
204+
Prints all results for a subscription and saves them to a CSV file.
205+
"""
183206
async with subscriptions_client(ctx) as client:
184-
async for result in client.get_results(subscription_id,
185-
status=status,
186-
limit=limit):
187-
echo_json(result, pretty)
207+
if csv_flag:
208+
async for result in client.get_results_csv(subscription_id,
209+
status=status,
210+
limit=limit):
211+
click.echo(result)
212+
else:
213+
async for result in client.get_results(subscription_id,
214+
status=status,
215+
limit=limit):
216+
echo_json(result, pretty)
188217

189218

190219
@subscriptions.command() # type: ignore

planet/clients/subscriptions.py

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Planet Subscriptions API Python client."""
22

33
import logging
4-
from typing import AsyncIterator, Optional, Set
4+
from typing import AsyncIterator, Literal, Optional, Sequence
55

66
from planet.exceptions import APIError, ClientError
77
from planet.http import Session
@@ -58,7 +58,7 @@ def __init__(self,
5858
self._base_url = self._base_url[:-1]
5959

6060
async def list_subscriptions(self,
61-
status: Optional[Set[str]] = None,
61+
status: Optional[Sequence[str]] = None,
6262
limit: int = 100) -> AsyncIterator[dict]:
6363
"""Iterate over list of account subscriptions with optional filtering.
6464
@@ -216,16 +216,21 @@ async def get_subscription(self, subscription_id: str) -> dict:
216216

217217
async def get_results(self,
218218
subscription_id: str,
219-
status: Optional[Set[str]] = None,
219+
status: Optional[Sequence[Literal[
220+
"created",
221+
"queued",
222+
"processing",
223+
"failed",
224+
"success"]]] = None,
220225
limit: int = 100) -> AsyncIterator[dict]:
221226
"""Iterate over results of a Subscription.
222227
223-
Note:
228+
Notes:
224229
The name of this method is based on the API's method name. This
225230
method provides iteration over results, it does not get a
226231
single result description or return a list of descriptions.
227232
228-
Args:
233+
Parameters:
229234
subscription_id (str): id of a subscription.
230235
status (Set[str]): pass result with status in this set,
231236
filter out results with status not in this set.
@@ -252,7 +257,6 @@ class _ResultsPager(Paged):
252257
resp = await self._session.request(method='GET',
253258
url=url,
254259
params=params)
255-
256260
async for sub in _ResultsPager(resp,
257261
self._session.request,
258262
limit=limit):
@@ -263,3 +267,41 @@ class _ResultsPager(Paged):
263267
raise
264268
except ClientError: # pragma: no cover
265269
raise
270+
271+
async def get_results_csv(
272+
self,
273+
subscription_id: str,
274+
status: Optional[Sequence[Literal["created",
275+
"queued",
276+
"processing",
277+
"failed",
278+
"success"]]] = None,
279+
limit: int = 100,
280+
) -> AsyncIterator[str]:
281+
"""Iterate over rows of results CSV for a Subscription.
282+
283+
Parameters:
284+
subscription_id (str): id of a subscription.
285+
status (Set[str]): pass result with status in this set,
286+
filter out results with status not in this set.
287+
TODO: created, updated, completed, user_id
288+
289+
Yields:
290+
str: a row from a CSV file.
291+
292+
Raises:
293+
APIError: on an API server error.
294+
ClientError: on a client error.
295+
"""
296+
url = f'{self._base_url}/{subscription_id}/results'
297+
params = {'status': [val for val in status or {}], 'format': 'csv'}
298+
299+
# Note: retries are not implemented yet. This project has
300+
# retry logic for HTTP requests, but does not handle errors
301+
# during streaming. We may want to consider a retry decorator
302+
# for this entire method a la stamina:
303+
# https://github.com/hynek/stamina.
304+
async with self._session._client.stream('GET', url,
305+
params=params) as response:
306+
async for line in response.aiter_lines():
307+
yield line

planet/subscription_request.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,8 +297,8 @@ def planetary_variable_source(
297297
configured.
298298
299299
Examples:
300-
```pycon
301-
>>> source = planetary_variables_source(
300+
```python
301+
>>> source = planetary_variable_source(
302302
... "soil_water_content",
303303
... "SWC-AMSR2-C_V1.0_100",
304304
... geometry={

tests/integration/test_subscriptions_api.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,25 @@ def result_pages(status=None, size=40):
128128
# must disable the default.
129129
res_api_mock = respx.mock(assert_all_called=False)
130130

131-
# 1. Request for status: created. Response has three pages.
131+
# 1. CSV results
132+
res_api_mock.route(
133+
M(url__startswith=TEST_URL), M(params__contains={'format': 'csv'})).mock(
134+
side_effect=[Response(200, text="id,status\n1234-abcd,SUCCESS\n")])
135+
136+
# 2. Request for status: created. Response has three pages.
132137
res_api_mock.route(
133138
M(url__startswith=TEST_URL),
134139
M(params__contains={'status': 'created'})).mock(side_effect=[
135140
Response(200, json=page)
136141
for page in result_pages(status={'created'}, size=40)
137142
])
138143

139-
# 2. Request for status: queued. Response has a single empty page.
144+
# 3. Request for status: queued. Response has a single empty page.
140145
res_api_mock.route(M(url__startswith=TEST_URL),
141146
M(params__contains={'status': 'queued'})).mock(
142147
side_effect=[Response(200, json={'results': []})])
143148

144-
# 3. No status requested. Response is the same as for 1.
149+
# 4. No status requested. Response is the same as for 1.
145150
res_api_mock.route(M(url__startswith=TEST_URL)).mock(
146151
side_effect=[Response(200, json=page) for page in result_pages(size=40)])
147152

@@ -276,6 +281,18 @@ async def test_get_results_success():
276281
assert len(results) == 100
277282

278283

284+
@pytest.mark.anyio
285+
@res_api_mock
286+
async def test_get_results_csv():
287+
"""Subscription CSV fetched, has the expected items."""
288+
async with Session() as session:
289+
client = SubscriptionsClient(session, base_url=TEST_URL)
290+
results = [res async for res in client.get_results_csv("42")]
291+
import csv
292+
rows = list(csv.reader(results))
293+
assert rows == [['id', 'status'], ['1234-abcd', 'SUCCESS']]
294+
295+
279296
paging_cycle_api_mock = respx.mock()
280297

281298
# Identical next links is a hangup we want to avoid.

tests/integration/test_subscriptions_cli.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,3 +306,11 @@ def test_request_catalog_success(invoke, geom_geojson):
306306
])
307307
assert json.loads(result.output) == source
308308
assert result.exit_code == 0 # success.
309+
310+
311+
@res_api_mock
312+
def test_subscriptions_results_csv(invoke):
313+
"""Get results as CSV."""
314+
result = invoke(['results', 'test', '--csv'])
315+
assert result.exit_code == 0 # success.
316+
assert result.output.splitlines() == ['id,status', '1234-abcd,SUCCESS']

tests/unit/test_subscription_request.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -349,22 +349,25 @@ def test_toar_tool_success():
349349
assert res == expected
350350

351351

352-
def test_pv_source_success(geom_geojson):
352+
@pytest.mark.parametrize(
353+
"var_type, var_id",
354+
[
355+
("biomass_proxy", "BIOMASS-PROXY_V3.0_10"), # actual real type and id.
356+
("var1", "VAR1-ABCD"), # nonsense type and id
357+
])
358+
def test_pv_source_success(geom_geojson, var_type, var_id):
353359
"""Configure a planetary variable subscription source."""
354-
# NOTE: this function does not yet validate type and id.
355-
# The nonsense values are intended to fail when the function does
356-
# add validation.
357360
source = subscription_request.planetary_variable_source(
358-
"var1",
359-
"VAR1-abcd",
361+
var_type,
362+
var_id,
360363
geometry=geom_geojson,
361364
start_time=datetime(2021, 3, 1),
362365
end_time=datetime(2021, 3, 2),
363366
)
364367

365-
assert source["type"] == "var1"
368+
assert source["type"] == var_type
366369
params = source["parameters"]
367-
assert params["id"] == "VAR1-abcd"
370+
assert params["id"] == var_id
368371
assert params["geometry"] == geom_geojson
369372
assert params["start_time"].startswith("2021-03-01")
370373

0 commit comments

Comments
 (0)