Skip to content

Commit 612d11b

Browse files
authored
Merge branch 'main' into issue968bis
2 parents 5d87350 + 30cb1c0 commit 612d11b

File tree

5 files changed

+116
-18
lines changed

5 files changed

+116
-18
lines changed

CHANGES.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
Added:
44
- Support for catalog source publishing stages has been added to
5-
subscription_request.catalog_source ().
5+
subscription_request.catalog_source (#977).
6+
- Add the option to get Planetary Variable subscription results as a CSV file
7+
(#981).
68
- A subscription_request.planetary_variable_source function has been added
79
(#976).
810
- The subscription_request.build_request function has a new option to clip to

planet/cli/subscriptions.py

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,13 @@ async def get_subscription_cmd(ctx, subscription_id, pretty):
162162
"success"]),
163163
multiple=True,
164164
default=None,
165-
callback=lambda ctx,
166-
param,
167-
value: set(value),
165+
callback=(lambda ctx, param, value: set(value)),
168166
help="Select subscription results in one or more states. Default: all.")
167+
@click.option('--csv',
168+
'csv_flag',
169+
is_flag=True,
170+
default=False,
171+
help="Get subscription results as comma-separated fields.")
169172
@limit
170173
# TODO: the following 3 options.
171174
# –created: timestamp instant or range.
@@ -178,13 +181,39 @@ async def list_subscription_results_cmd(ctx,
178181
subscription_id,
179182
pretty,
180183
status,
184+
csv_flag,
181185
limit):
182-
"""Gets results of a subscription and prints the API response."""
186+
"""Print the results of a subscription to stdout.
187+
188+
The output of this command is a sequence of JSON objects (the
189+
default) or a sequence of comma-separated fields (when the --csv
190+
option is used), one result per line.
191+
192+
Examples:
193+
194+
\b
195+
planet subscriptions results SUBSCRIPTION_ID --status=success --limit 10
196+
197+
Where SUBSCRIPTION_ID is the unique identifier for a subscription,
198+
this prints the last 10 successfully delivered results for that
199+
subscription as JSON objects.
200+
201+
\b
202+
planet subscriptions results SUBSCRIPTION_ID --limit 0 --csv > results.csv
203+
204+
Prints all results for a subscription and saves them to a CSV file.
205+
"""
183206
async with subscriptions_client(ctx) as client:
184-
async for result in client.get_results(subscription_id,
185-
status=status,
186-
limit=limit):
187-
echo_json(result, pretty)
207+
if csv_flag:
208+
async for result in client.get_results_csv(subscription_id,
209+
status=status,
210+
limit=limit):
211+
click.echo(result)
212+
else:
213+
async for result in client.get_results(subscription_id,
214+
status=status,
215+
limit=limit):
216+
echo_json(result, pretty)
188217

189218

190219
@subscriptions.command() # type: ignore

planet/clients/subscriptions.py

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Planet Subscriptions API Python client."""
22

33
import logging
4-
from typing import AsyncIterator, Optional, Set
4+
from typing import AsyncIterator, Literal, Optional, Sequence
55

66
from planet.exceptions import APIError, ClientError
77
from planet.http import Session
@@ -58,7 +58,7 @@ def __init__(self,
5858
self._base_url = self._base_url[:-1]
5959

6060
async def list_subscriptions(self,
61-
status: Optional[Set[str]] = None,
61+
status: Optional[Sequence[str]] = None,
6262
limit: int = 100) -> AsyncIterator[dict]:
6363
"""Iterate over list of account subscriptions with optional filtering.
6464
@@ -216,16 +216,21 @@ async def get_subscription(self, subscription_id: str) -> dict:
216216

217217
async def get_results(self,
218218
subscription_id: str,
219-
status: Optional[Set[str]] = None,
219+
status: Optional[Sequence[Literal[
220+
"created",
221+
"queued",
222+
"processing",
223+
"failed",
224+
"success"]]] = None,
220225
limit: int = 100) -> AsyncIterator[dict]:
221226
"""Iterate over results of a Subscription.
222227
223-
Note:
228+
Notes:
224229
The name of this method is based on the API's method name. This
225230
method provides iteration over results, it does not get a
226231
single result description or return a list of descriptions.
227232
228-
Args:
233+
Parameters:
229234
subscription_id (str): id of a subscription.
230235
status (Set[str]): pass result with status in this set,
231236
filter out results with status not in this set.
@@ -252,7 +257,6 @@ class _ResultsPager(Paged):
252257
resp = await self._session.request(method='GET',
253258
url=url,
254259
params=params)
255-
256260
async for sub in _ResultsPager(resp,
257261
self._session.request,
258262
limit=limit):
@@ -263,3 +267,41 @@ class _ResultsPager(Paged):
263267
raise
264268
except ClientError: # pragma: no cover
265269
raise
270+
271+
async def get_results_csv(
272+
self,
273+
subscription_id: str,
274+
status: Optional[Sequence[Literal["created",
275+
"queued",
276+
"processing",
277+
"failed",
278+
"success"]]] = None,
279+
limit: int = 100,
280+
) -> AsyncIterator[str]:
281+
"""Iterate over rows of results CSV for a Subscription.
282+
283+
Parameters:
284+
subscription_id (str): id of a subscription.
285+
status (Set[str]): pass result with status in this set,
286+
filter out results with status not in this set.
287+
TODO: created, updated, completed, user_id
288+
289+
Yields:
290+
str: a row from a CSV file.
291+
292+
Raises:
293+
APIError: on an API server error.
294+
ClientError: on a client error.
295+
"""
296+
url = f'{self._base_url}/{subscription_id}/results'
297+
params = {'status': [val for val in status or {}], 'format': 'csv'}
298+
299+
# Note: retries are not implemented yet. This project has
300+
# retry logic for HTTP requests, but does not handle errors
301+
# during streaming. We may want to consider a retry decorator
302+
# for this entire method a la stamina:
303+
# https://github.com/hynek/stamina.
304+
async with self._session._client.stream('GET', url,
305+
params=params) as response:
306+
async for line in response.aiter_lines():
307+
yield line

tests/integration/test_subscriptions_api.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,25 @@ def result_pages(status=None, size=40):
128128
# must disable the default.
129129
res_api_mock = respx.mock(assert_all_called=False)
130130

131-
# 1. Request for status: created. Response has three pages.
131+
# 1. CSV results
132+
res_api_mock.route(
133+
M(url__startswith=TEST_URL), M(params__contains={'format': 'csv'})).mock(
134+
side_effect=[Response(200, text="id,status\n1234-abcd,SUCCESS\n")])
135+
136+
# 2. Request for status: created. Response has three pages.
132137
res_api_mock.route(
133138
M(url__startswith=TEST_URL),
134139
M(params__contains={'status': 'created'})).mock(side_effect=[
135140
Response(200, json=page)
136141
for page in result_pages(status={'created'}, size=40)
137142
])
138143

139-
# 2. Request for status: queued. Response has a single empty page.
144+
# 3. Request for status: queued. Response has a single empty page.
140145
res_api_mock.route(M(url__startswith=TEST_URL),
141146
M(params__contains={'status': 'queued'})).mock(
142147
side_effect=[Response(200, json={'results': []})])
143148

144-
# 3. No status requested. Response is the same as for 1.
149+
# 4. No status requested. Response is the same as for 1.
145150
res_api_mock.route(M(url__startswith=TEST_URL)).mock(
146151
side_effect=[Response(200, json=page) for page in result_pages(size=40)])
147152

@@ -276,6 +281,18 @@ async def test_get_results_success():
276281
assert len(results) == 100
277282

278283

284+
@pytest.mark.anyio
285+
@res_api_mock
286+
async def test_get_results_csv():
287+
"""Subscription CSV fetched, has the expected items."""
288+
async with Session() as session:
289+
client = SubscriptionsClient(session, base_url=TEST_URL)
290+
results = [res async for res in client.get_results_csv("42")]
291+
import csv
292+
rows = list(csv.reader(results))
293+
assert rows == [['id', 'status'], ['1234-abcd', 'SUCCESS']]
294+
295+
279296
paging_cycle_api_mock = respx.mock()
280297

281298
# Identical next links is a hangup we want to avoid.

tests/integration/test_subscriptions_cli.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,3 +306,11 @@ def test_request_catalog_success(invoke, geom_geojson):
306306
])
307307
assert json.loads(result.output) == source
308308
assert result.exit_code == 0 # success.
309+
310+
311+
@res_api_mock
312+
def test_subscriptions_results_csv(invoke):
313+
"""Get results as CSV."""
314+
result = invoke(['results', 'test', '--csv'])
315+
assert result.exit_code == 0 # success.
316+
assert result.output.splitlines() == ['id,status', '1234-abcd,SUCCESS']

0 commit comments

Comments
 (0)