From f920470a670b61889452422155625941d39c5b0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Fri, 7 Nov 2025 15:48:36 +0100 Subject: [PATCH 1/8] Working on porting test list endpoint --- .../services/ooniprobe/src/ooniprobe/prio.py | 61 ++++++++++- .../ooniprobe/routers/v1/probe_services.py | 102 +++++++++++++++++- 2 files changed, 158 insertions(+), 5 deletions(-) diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/prio.py b/ooniapi/services/ooniprobe/src/ooniprobe/prio.py index fce2e016..1ce1c752 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/prio.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/prio.py @@ -21,11 +21,17 @@ ``` """ -from typing import List, Tuple +from functools import lru_cache +import random +from typing import Annotated, Dict, List, Tuple import logging +from fastapi import Depends +from pydantic import BaseModel + from .common.clickhouse_utils import query_click from .common.metrics import timer +from .dependencies import ClickhouseDep from clickhouse_driver import Client as Clickhouse import sqlalchemy as sa @@ -155,3 +161,56 @@ def generate_test_list( if debug: return out, entries, prio_rules return out, (), () + + +class CTZ(BaseModel): + url: str + category_code: str + +def failover_fetch_citizenlab_data(clickhouse : Clickhouse) -> Dict[str, List[CTZ]]: + """ + Fetches the citizenlab table from the database. + Used only once at startime for failover. + """ + + log.info("Started failover_fetch_citizenlab_data") + + sql = """SELECT category_code, url + FROM citizenlab + WHERE cc = 'ZZ' + """ + + out: Dict[str, List[CTZ]] = {} + query = query_click(clickhouse, sql, {}, query_prio=1) + for e in query: + catcode = e["category_code"] + c = CTZ(url=e["url"], category_code=catcode) + out.setdefault(catcode, []).append(c) + + log.info("Fetch done: %d" % len(out)) + return out + +@lru_cache +def failover_test_lists_cache(clickhouse : ClickhouseDep): + return failover_fetch_citizenlab_data(clickhouse) + +FailoverTestListDep = Annotated[Dict[str, List[CTZ]], Depends(failover_test_lists_cache)] + +def failover_generate_test_list(failover_test_items: Dict[str, List[CTZ]], category_codes: tuple | None, limit: int): + if not category_codes: + category_codes = tuple(failover_test_items.keys()) + + candidates: List[CTZ] = [] + for catcode in category_codes: + if catcode not in failover_test_items: + continue + new = failover_test_items[catcode] + candidates.extend(new) + + limit = min(limit, len(candidates)) + selected = random.sample(candidates, k=limit) + out = [ + dict(category_code=entry.category_code, url=entry.url, country_code="XX") + for entry in selected + ] + return out diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index 8d4bc2e6..fcf8b80e 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -1,12 +1,14 @@ +from functools import lru_cache import logging from datetime import datetime, timezone, timedelta import time -from typing import List, Optional, Any, Dict, Tuple, Optional +from typing import Annotated, List, Optional, Any, Dict, Tuple, Optional import random +from fastapi.responses import JSONResponse import geoip2 import geoip2.errors -from fastapi import APIRouter, Depends, HTTPException, Response, Request +from fastapi import APIRouter, Depends, HTTPException, Query, Response, Request from prometheus_client import Counter, Info, Gauge from pydantic import Field @@ -21,8 +23,8 @@ from ...common.routers import BaseModel from ...common.auth import create_jwt, decode_jwt, jwt from ...common.config import Settings -from ...common.utils import setnocacheresponse -from ...prio import generate_test_list +from ...common.utils import setnocacheresponse, cachedjson, setcacheresponse +from ...prio import CTZ, FailoverTestListDep, failover_generate_test_list, generate_test_list, failover_fetch_citizenlab_data router = APIRouter(prefix="/v1") @@ -590,3 +592,95 @@ def random_web_test_helpers(th_list: List[str]) -> List[Dict]: for th_addr in th_list: out.append({"address": th_addr, "type": "https"}) return out + +class TestListUrlsMeta(BaseModel): + count: int + current_page: int + limit: int + next_url: str + pages: int + +class TestListUrlsResult(BaseModel): + category_code: str + country_code: str + url: str + +class TestListUrlsResponse(BaseModel): + """ + URL test list + """ + metadata: TestListUrlsMeta + results: List[TestListUrlsResult] + +@router.get("/test-list/urls") +def list_test_urls( + country_code : Annotated[ + str, + Query( + description="Two letter, uppercase country code", + min_length=2, max_length=2, + alias="probe_cc", + default="ZZ" + ) + ], + category_codes : Annotated[ + str, + Query( + description="Comma separated list of URL categories, all uppercase", + regex=r"[A-Z,]*" + ) + ], + limit: Annotated [ + int, + Query( + description="Maximum number of URLs to return", + default=-1, + le=9999 + ) + ], + debug: Annotated [ + bool, + Query( + description="Include measurement counts and priority", + default=False + ) + ], + clickhouse: ClickhouseDep, + failover_test_items: FailoverTestListDep, + response: Response +) -> TestListUrlsResponse | JSONResponse: + """ + Generate test URL list with prioritization + """ + try: + country_code = country_code.upper() + category_codes_list = category_codes.split(",") + if limit == -1: + limit = 9999 + except Exception as e: + log.error(e, exc_info=True) + return cachedjson("0s") + + try: + test_items, _1, _2 = generate_test_list(clickhouse, + country_code, category_codes_list, 0, limit, debug + ) + except Exception as e: + log.error(e, exc_info=True) + # failover_generate_test_list runs without any database interaction + test_items = failover_generate_test_list(failover_test_items, tuple(category_codes_list), limit) + + # TODO: remove current_page / next_url / pages ? + # metrics.gauge("test-list-urls-count", len(test_items)) + out = TestListUrlsResponse( + metadata=TestListUrlsMeta( + count = len(test_items), + current_page=-1, + limit=-1, + next_url="", + pages=1 + ), + results=test_items + ) + setcacheresponse("1s", response) + return out \ No newline at end of file From 1d779e05953bc951d60fa93d73319452b0138bdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Tue, 11 Nov 2025 10:20:36 +0100 Subject: [PATCH 2/8] Working on porting test-list/urls --- .../ooniprobe/src/ooniprobe/routers/v1/probe_services.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index fcf8b80e..24de8a1e 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -671,7 +671,7 @@ def list_test_urls( test_items = failover_generate_test_list(failover_test_items, tuple(category_codes_list), limit) # TODO: remove current_page / next_url / pages ? - # metrics.gauge("test-list-urls-count", len(test_items)) + # metrics.gauge("test-list-urls-count", len(test_items)) # TODO Add this metric out = TestListUrlsResponse( metadata=TestListUrlsMeta( count = len(test_items), @@ -680,7 +680,7 @@ def list_test_urls( next_url="", pages=1 ), - results=test_items + results=[TestListUrlsResult(**item) for item in test_items] ) setcacheresponse("1s", response) return out \ No newline at end of file From 0df216b04530f7281f1e9ecbf21b721298cf1634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Tue, 11 Nov 2025 13:39:21 +0100 Subject: [PATCH 3/8] Adding tests to test-list endpoint --- .../services/ooniprobe/src/ooniprobe/prio.py | 1 - .../ooniprobe/routers/v1/probe_services.py | 39 +++++++++---------- ooniapi/services/ooniprobe/tests/conftest.py | 27 +++++++++++-- .../tests/fixtures/initdb/02-fixtures.sql | 4 +- .../ooniprobe/tests/integ/test_checkin.py | 26 ------------- 5 files changed, 43 insertions(+), 54 deletions(-) diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/prio.py b/ooniapi/services/ooniprobe/src/ooniprobe/prio.py index 1ce1c752..09717877 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/prio.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/prio.py @@ -157,7 +157,6 @@ def generate_test_list( out.append(i) if len(out) >= limit: break - if debug: return out, entries, prio_rules return out, (), () diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index 24de8a1e..120e98c7 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -1,8 +1,7 @@ -from functools import lru_cache import logging from datetime import datetime, timezone, timedelta import time -from typing import Annotated, List, Optional, Any, Dict, Tuple, Optional +from typing import Annotated, List, Optional, Any, Dict, Tuple import random from fastapi.responses import JSONResponse @@ -24,7 +23,7 @@ from ...common.auth import create_jwt, decode_jwt, jwt from ...common.config import Settings from ...common.utils import setnocacheresponse, cachedjson, setcacheresponse -from ...prio import CTZ, FailoverTestListDep, failover_generate_test_list, generate_test_list, failover_fetch_citizenlab_data +from ...prio import FailoverTestListDep, failover_generate_test_list, generate_test_list router = APIRouter(prefix="/v1") @@ -614,41 +613,38 @@ class TestListUrlsResponse(BaseModel): @router.get("/test-list/urls") def list_test_urls( + clickhouse: ClickhouseDep, + failover_test_items: FailoverTestListDep, + response: Response, + category_codes : Annotated[ + str, + Query( + description="Comma separated list of URL categories, all uppercase", + pattern=r"[A-Z,]*" + ) + ], country_code : Annotated[ str, Query( description="Two letter, uppercase country code", min_length=2, max_length=2, alias="probe_cc", - default="ZZ" ) - ], - category_codes : Annotated[ - str, - Query( - description="Comma separated list of URL categories, all uppercase", - regex=r"[A-Z,]*" - ) - ], + ] = "ZZ", limit: Annotated [ int, Query( description="Maximum number of URLs to return", - default=-1, le=9999 ) - ], + ] = -1, debug: Annotated [ bool, Query( description="Include measurement counts and priority", - default=False ) - ], - clickhouse: ClickhouseDep, - failover_test_items: FailoverTestListDep, - response: Response -) -> TestListUrlsResponse | JSONResponse: + ] = False +) -> TestListUrlsResponse | Dict[str, Any]: """ Generate test URL list with prioritization """ @@ -659,7 +655,8 @@ def list_test_urls( limit = 9999 except Exception as e: log.error(e, exc_info=True) - return cachedjson("0s") + setnocacheresponse(response) + return {} try: test_items, _1, _2 = generate_test_list(clickhouse, diff --git a/ooniapi/services/ooniprobe/tests/conftest.py b/ooniapi/services/ooniprobe/tests/conftest.py index f1543893..0a382379 100644 --- a/ooniapi/services/ooniprobe/tests/conftest.py +++ b/ooniapi/services/ooniprobe/tests/conftest.py @@ -4,6 +4,7 @@ import pytest import shutil import os +import json from urllib.request import urlopen from fastapi.testclient import TestClient @@ -12,6 +13,7 @@ from clickhouse_driver import Client as ClickhouseClient from ooniprobe.common.config import Settings +from ooniprobe.common.clickhouse_utils import insert_click from ooniprobe.common.dependencies import get_settings from ooniprobe.dependencies import get_s3_client from ooniprobe.main import app @@ -78,7 +80,7 @@ def client_with_bad_settings(): @pytest.fixture(scope="session") def fixture_path(): """ - Directory for this fixtures used to store temporary data, will be + Directory for this fixtures used to store temporary data, will be deleted after the tests are finished """ FIXTURE_PATH = Path(os.path.dirname(os.path.realpath(__file__))) / "data" @@ -169,9 +171,26 @@ def fastpath_server(docker_ip, docker_services): ) yield url -def is_fastpath_running(url: str) -> bool: - try: +def is_fastpath_running(url: str) -> bool: + try: resp = urlopen(url) return resp.status == 200 except: - return False \ No newline at end of file + return False + +@pytest.fixture +def load_url_priorities(clickhouse_db): + path = Path("tests/fixtures/data") + filename = "url_priorities_us.json" + file = Path(path, filename) + + with file.open("r") as f: + j = json.load(f) + + # 'sign' is created with default value 0, causing a db error. + # use 1 to prevent it + for row in j: + row["sign"] = 1 + + query = "INSERT INTO url_priorities (sign, category_code, cc, domain, url, priority) VALUES" + insert_click(clickhouse_db, query, j) diff --git a/ooniapi/services/ooniprobe/tests/fixtures/initdb/02-fixtures.sql b/ooniapi/services/ooniprobe/tests/fixtures/initdb/02-fixtures.sql index 0adf8e8d..2cf48d4e 100644 --- a/ooniapi/services/ooniprobe/tests/fixtures/initdb/02-fixtures.sql +++ b/ooniapi/services/ooniprobe/tests/fixtures/initdb/02-fixtures.sql @@ -9,6 +9,7 @@ INSERT INTO citizenlab VALUES ('www.ushmm.org', 'https://www.ushmm.org/', 'ZZ', INSERT INTO citizenlab VALUES ('www.cabofrio.rj.gov.br', 'http://www.cabofrio.rj.gov.br/', 'BR', 'CULTR'); INSERT INTO citizenlab VALUES ('ncac.org', 'http://ncac.org/', 'ZZ', 'NEWS'); INSERT INTO citizenlab VALUES ('ncac.org', 'https://ncac.org/', 'ZZ', 'NEWS'); +INSERT INTO citizenlab VALUES ('ncacd.org', 'https://ncacd.org/', 'ZZ', 'NEWS'); INSERT INTO citizenlab VALUES ('www.facebook.com','http://www.facebook.com/saakashvilimikheil','ge','NEWS'); INSERT INTO citizenlab VALUES ('www.facebook.com','http://www.facebook.com/somsakjeam/videos/1283095981743678/','th','POLR'); INSERT INTO citizenlab VALUES ('www.facebook.com','https://www.facebook.com/','ZZ','GRP'); @@ -17,8 +18,7 @@ INSERT INTO citizenlab VALUES ('facebook.com','https://facebook.com/watch','jo', INSERT INTO citizenlab VALUES ('twitter.com','http://twitter.com/ghonim','kw','POLR'); INSERT INTO citizenlab VALUES ('twitter.com','http://twitter.com/ghonim','so','POLR'); INSERT INTO citizenlab VALUES ('twitter.com','https://twitter.com/','ZZ','GRP'); +INSERT INTO citizenlab VALUES ('twitter.com','https://twitter.com/funny','ZZ','HUMR'); -- get_measurement_meta integ tests INSERT INTO jsonl (report_id, input, s3path, linenum) VALUES ('20210709T004340Z_webconnectivity_MY_4818_n1_YCM7J9mGcEHds2K3', 'https://www.backtrack-linux.org/', 'raw/20210709/00/MY/webconnectivity/2021070900_MY_webconnectivity.n0.2.jsonl.gz', 35) - - diff --git a/ooniapi/services/ooniprobe/tests/integ/test_checkin.py b/ooniapi/services/ooniprobe/tests/integ/test_checkin.py index c2d6419f..02e17038 100644 --- a/ooniapi/services/ooniprobe/tests/integ/test_checkin.py +++ b/ooniapi/services/ooniprobe/tests/integ/test_checkin.py @@ -1,9 +1,3 @@ -from pathlib import Path -import json -import pytest - - -from ooniprobe.common.clickhouse_utils import insert_click def getjson(client, url): @@ -29,26 +23,6 @@ def postj(client, url, **kw): assert response.status_code == 200 return response.json() - -## Fixtures -@pytest.fixture -def load_url_priorities(clickhouse_db): - path = Path("tests/fixtures/data") - filename = "url_priorities_us.json" - file = Path(path, filename) - - with file.open("r") as f: - j = json.load(f) - - # 'sign' is created with default value 0, causing a db error. - # use 1 to prevent it - for row in j: - row["sign"] = 1 - - query = "INSERT INTO url_priorities (sign, category_code, cc, domain, url, priority) VALUES" - insert_click(clickhouse_db, query, j) - - ## Tests From 205e971558192bf19df9a35083e914ddc8b7b23f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Tue, 11 Nov 2025 16:23:25 +0100 Subject: [PATCH 4/8] Adding more tests --- ooniapi/services/ooniprobe/src/ooniprobe/prio.py | 6 +++--- .../ooniprobe/src/ooniprobe/routers/v1/probe_services.py | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/prio.py b/ooniapi/services/ooniprobe/src/ooniprobe/prio.py index 09717877..aff48cf9 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/prio.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/prio.py @@ -124,7 +124,7 @@ def fetch_prioritization_rules(clickhouse_db: Clickhouse, cc: str) -> tuple: def generate_test_list( clickhouse: Clickhouse, country_code: str, - category_codes: List, + category_codes: List[str] | None, probe_asn: int, limit: int, debug: bool, @@ -195,9 +195,9 @@ def failover_test_lists_cache(clickhouse : ClickhouseDep): FailoverTestListDep = Annotated[Dict[str, List[CTZ]], Depends(failover_test_lists_cache)] -def failover_generate_test_list(failover_test_items: Dict[str, List[CTZ]], category_codes: tuple | None, limit: int): +def failover_generate_test_list(failover_test_items: Dict[str, List[CTZ]], category_codes: List[str] | None, limit: int): if not category_codes: - category_codes = tuple(failover_test_items.keys()) + category_codes = list(failover_test_items.keys()) candidates: List[CTZ] = [] for catcode in category_codes: diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index 120e98c7..cc401c5f 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -617,12 +617,12 @@ def list_test_urls( failover_test_items: FailoverTestListDep, response: Response, category_codes : Annotated[ - str, + str | None, Query( description="Comma separated list of URL categories, all uppercase", pattern=r"[A-Z,]*" ) - ], + ] = None, country_code : Annotated[ str, Query( @@ -650,7 +650,7 @@ def list_test_urls( """ try: country_code = country_code.upper() - category_codes_list = category_codes.split(",") + category_codes_list = category_codes.split(",") if category_codes else None if limit == -1: limit = 9999 except Exception as e: @@ -665,7 +665,7 @@ def list_test_urls( except Exception as e: log.error(e, exc_info=True) # failover_generate_test_list runs without any database interaction - test_items = failover_generate_test_list(failover_test_items, tuple(category_codes_list), limit) + test_items = failover_generate_test_list(failover_test_items, category_codes_list, limit) # TODO: remove current_page / next_url / pages ? # metrics.gauge("test-list-urls-count", len(test_items)) # TODO Add this metric From 75813bc42d9da02b4be99796a6ee23005a8376bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Tue, 11 Nov 2025 16:27:53 +0100 Subject: [PATCH 5/8] Add metric for test list count --- .../ooniprobe/src/ooniprobe/routers/v1/probe_services.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index cc401c5f..61a83475 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -64,6 +64,10 @@ class Metrics: "geoip_asn_differs", "There's a mismatch between reported ASN and observed ASN" ) + TEST_LIST_URLS_COUNT = Gauge( + "test_list_urls_count", "How many urls were generated for a test list" + ) + class ProbeLogin(BaseModel): # Allow None username and password @@ -668,7 +672,7 @@ def list_test_urls( test_items = failover_generate_test_list(failover_test_items, category_codes_list, limit) # TODO: remove current_page / next_url / pages ? - # metrics.gauge("test-list-urls-count", len(test_items)) # TODO Add this metric + Metrics.TEST_LIST_URLS_COUNT.set(len(test_items)) out = TestListUrlsResponse( metadata=TestListUrlsMeta( count = len(test_items), From cfc003afa5f2411ec4b18820c2778b281b471646 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Tue, 11 Nov 2025 16:30:57 +0100 Subject: [PATCH 6/8] black reformat --- .../services/ooniprobe/src/ooniprobe/prio.py | 18 +++++-- .../src/ooniprobe/routers/reports.py | 10 ++-- .../ooniprobe/routers/v1/probe_services.py | 53 +++++++++---------- ooniapi/services/ooniprobe/tests/conftest.py | 13 ++++- .../services/ooniprobe/tests/fakepath/main.py | 3 +- .../ooniprobe/tests/integ/test_checkin.py | 3 +- .../ooniprobe/tests/integ/test_reports.py | 8 ++- ooniapi/services/ooniprobe/tests/test_prio.py | 2 +- 8 files changed, 68 insertions(+), 42 deletions(-) diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/prio.py b/ooniapi/services/ooniprobe/src/ooniprobe/prio.py index aff48cf9..448599f3 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/prio.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/prio.py @@ -166,7 +166,8 @@ class CTZ(BaseModel): url: str category_code: str -def failover_fetch_citizenlab_data(clickhouse : Clickhouse) -> Dict[str, List[CTZ]]: + +def failover_fetch_citizenlab_data(clickhouse: Clickhouse) -> Dict[str, List[CTZ]]: """ Fetches the citizenlab table from the database. Used only once at startime for failover. @@ -189,13 +190,22 @@ def failover_fetch_citizenlab_data(clickhouse : Clickhouse) -> Dict[str, List[CT log.info("Fetch done: %d" % len(out)) return out + @lru_cache -def failover_test_lists_cache(clickhouse : ClickhouseDep): +def failover_test_lists_cache(clickhouse: ClickhouseDep): return failover_fetch_citizenlab_data(clickhouse) -FailoverTestListDep = Annotated[Dict[str, List[CTZ]], Depends(failover_test_lists_cache)] -def failover_generate_test_list(failover_test_items: Dict[str, List[CTZ]], category_codes: List[str] | None, limit: int): +FailoverTestListDep = Annotated[ + Dict[str, List[CTZ]], Depends(failover_test_lists_cache) +] + + +def failover_generate_test_list( + failover_test_items: Dict[str, List[CTZ]], + category_codes: List[str] | None, + limit: int, +): if not category_codes: category_codes = list(failover_test_items.keys()) diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py index 213669ee..4405683d 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py @@ -221,7 +221,7 @@ async def receive_measurement( log.error( f"[Try {t+1}/{N_RETRIES}] Error trying to send measurement to the fastpath. Error: {exc}" ) - sleep_time = random.uniform(0, min(3, 0.3 * 2 ** t)) + sleep_time = random.uniform(0, min(3, 0.3 * 2**t)) await asyncio.sleep(sleep_time) Metrics.SEND_FASTPATH_FAILURE.inc() @@ -274,9 +274,13 @@ def compare_probe_msmt_cc_asn( Metrics.PROBE_CC_ASN_MATCH.inc() elif db_probe_cc != cc: log.error(f"db_cc != cc: {db_probe_cc} != {cc}") - Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="cc", reported=cc, detected=db_probe_cc).inc() + Metrics.PROBE_CC_ASN_NO_MATCH.labels( + mismatch="cc", reported=cc, detected=db_probe_cc + ).inc() elif db_asn != asn: log.error(f"db_asn != asn: {db_asn} != {asn}") - Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="asn", reported=asn, detected=db_asn).inc() + Metrics.PROBE_CC_ASN_NO_MATCH.labels( + mismatch="asn", reported=asn, detected=db_asn + ).inc() except Exception: pass diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index 61a83475..ffd8052b 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -4,7 +4,6 @@ from typing import Annotated, List, Optional, Any, Dict, Tuple import random -from fastapi.responses import JSONResponse import geoip2 import geoip2.errors from fastapi import APIRouter, Depends, HTTPException, Query, Response, Request @@ -22,7 +21,7 @@ from ...common.routers import BaseModel from ...common.auth import create_jwt, decode_jwt, jwt from ...common.config import Settings -from ...common.utils import setnocacheresponse, cachedjson, setcacheresponse +from ...common.utils import setnocacheresponse, setcacheresponse from ...prio import FailoverTestListDep, failover_generate_test_list, generate_test_list router = APIRouter(prefix="/v1") @@ -596,6 +595,7 @@ def random_web_test_helpers(th_list: List[str]) -> List[Dict]: out.append({"address": th_addr, "type": "https"}) return out + class TestListUrlsMeta(BaseModel): count: int current_page: int @@ -603,51 +603,52 @@ class TestListUrlsMeta(BaseModel): next_url: str pages: int + class TestListUrlsResult(BaseModel): category_code: str country_code: str url: str + class TestListUrlsResponse(BaseModel): """ URL test list """ + metadata: TestListUrlsMeta results: List[TestListUrlsResult] + @router.get("/test-list/urls") def list_test_urls( clickhouse: ClickhouseDep, failover_test_items: FailoverTestListDep, response: Response, - category_codes : Annotated[ + category_codes: Annotated[ str | None, Query( description="Comma separated list of URL categories, all uppercase", - pattern=r"[A-Z,]*" - ) + pattern=r"[A-Z,]*", + ), ] = None, - country_code : Annotated[ + country_code: Annotated[ str, Query( description="Two letter, uppercase country code", - min_length=2, max_length=2, + min_length=2, + max_length=2, alias="probe_cc", - ) + ), ] = "ZZ", - limit: Annotated [ - int, - Query( - description="Maximum number of URLs to return", - le=9999 - ) + limit: Annotated[ + int, Query(description="Maximum number of URLs to return", le=9999) ] = -1, - debug: Annotated [ + debug: Annotated[ bool, Query( description="Include measurement counts and priority", - ) - ] = False + ), + ] = False, ) -> TestListUrlsResponse | Dict[str, Any]: """ Generate test URL list with prioritization @@ -663,25 +664,23 @@ def list_test_urls( return {} try: - test_items, _1, _2 = generate_test_list(clickhouse, - country_code, category_codes_list, 0, limit, debug + test_items, _1, _2 = generate_test_list( + clickhouse, country_code, category_codes_list, 0, limit, debug ) except Exception as e: log.error(e, exc_info=True) # failover_generate_test_list runs without any database interaction - test_items = failover_generate_test_list(failover_test_items, category_codes_list, limit) + test_items = failover_generate_test_list( + failover_test_items, category_codes_list, limit + ) # TODO: remove current_page / next_url / pages ? Metrics.TEST_LIST_URLS_COUNT.set(len(test_items)) out = TestListUrlsResponse( metadata=TestListUrlsMeta( - count = len(test_items), - current_page=-1, - limit=-1, - next_url="", - pages=1 + count=len(test_items), current_page=-1, limit=-1, next_url="", pages=1 ), - results=[TestListUrlsResult(**item) for item in test_items] + results=[TestListUrlsResult(**item) for item in test_items], ) setcacheresponse("1s", response) - return out \ No newline at end of file + return out diff --git a/ooniapi/services/ooniprobe/tests/conftest.py b/ooniapi/services/ooniprobe/tests/conftest.py index 0a382379..303d2877 100644 --- a/ooniapi/services/ooniprobe/tests/conftest.py +++ b/ooniapi/services/ooniprobe/tests/conftest.py @@ -77,6 +77,7 @@ def client_with_bad_settings(): JWT_ENCRYPTION_KEY = "super_secure" + @pytest.fixture(scope="session") def fixture_path(): """ @@ -92,6 +93,7 @@ def fixture_path(): except FileNotFoundError: pass + @pytest.fixture() def geoip_db_dir(fixture_path): ooni_tempdir = fixture_path / "geoip" @@ -109,7 +111,9 @@ def client(clickhouse_server, test_settings, geoip_db_dir): @pytest.fixture -def test_settings(alembic_migration, docker_ip, docker_services, geoip_db_dir, fastpath_server): +def test_settings( + alembic_migration, docker_ip, docker_services, geoip_db_dir, fastpath_server +): port = docker_services.port_for("clickhouse", 9000) yield make_override_get_settings( postgresql_url=alembic_migration, @@ -118,7 +122,7 @@ def test_settings(alembic_migration, docker_ip, docker_services, geoip_db_dir, f clickhouse_url=f"clickhouse://test:test@{docker_ip}:{port}", geoip_db_dir=geoip_db_dir, collector_id="1", - fastpath_url=fastpath_server + fastpath_url=fastpath_server, ) @@ -151,6 +155,7 @@ def clickhouse_server(docker_ip, docker_services): def clickhouse_db(clickhouse_server): yield ClickhouseClient.from_url(clickhouse_server) + class S3ClientMock: def __init__(self) -> None: @@ -159,9 +164,11 @@ def __init__(self) -> None: def upload_fileobj(self, Fileobj, Bucket: str, Key: str): self.files.append(f"{Bucket}/{Key}") + def get_s3_client_mock() -> S3ClientMock: return S3ClientMock() + @pytest.fixture(scope="session") def fastpath_server(docker_ip, docker_services): port = docker_services.port_for("fakepath", 80) @@ -171,6 +178,7 @@ def fastpath_server(docker_ip, docker_services): ) yield url + def is_fastpath_running(url: str) -> bool: try: resp = urlopen(url) @@ -178,6 +186,7 @@ def is_fastpath_running(url: str) -> bool: except: return False + @pytest.fixture def load_url_priorities(clickhouse_db): path = Path("tests/fixtures/data") diff --git a/ooniapi/services/ooniprobe/tests/fakepath/main.py b/ooniapi/services/ooniprobe/tests/fakepath/main.py index 6ffe633e..3dd68c9d 100644 --- a/ooniapi/services/ooniprobe/tests/fakepath/main.py +++ b/ooniapi/services/ooniprobe/tests/fakepath/main.py @@ -11,5 +11,4 @@ @app.get("/") def health(): - return - + return diff --git a/ooniapi/services/ooniprobe/tests/integ/test_checkin.py b/ooniapi/services/ooniprobe/tests/integ/test_checkin.py index 02e17038..7e8401fd 100644 --- a/ooniapi/services/ooniprobe/tests/integ/test_checkin.py +++ b/ooniapi/services/ooniprobe/tests/integ/test_checkin.py @@ -1,5 +1,3 @@ - - def getjson(client, url): response = client.get(url) assert response.status_code == 200 @@ -23,6 +21,7 @@ def postj(client, url, **kw): assert response.status_code == 200 return response.json() + ## Tests diff --git a/ooniapi/services/ooniprobe/tests/integ/test_reports.py b/ooniapi/services/ooniprobe/tests/integ/test_reports.py index 4abf3d09..7ee2d142 100644 --- a/ooniapi/services/ooniprobe/tests/integ/test_reports.py +++ b/ooniapi/services/ooniprobe/tests/integ/test_reports.py @@ -1,16 +1,19 @@ import json import zstd + def postj(client, url, json): response = client.post(url, json=json) assert response.status_code == 200, response.json() return response.json() + def post(client, url, data, headers=None): response = client.post(url, data=data, headers=headers) assert response.status_code == 200 return response.json() + def test_collector_open_report(client): j = { "data_format_version": "0.2.0", @@ -32,11 +35,13 @@ def test_collector_open_report(client): } assert len(rid) == 61, rid + def test_collector_upload_msmt_bogus(client): j = dict(format="json", content=dict(test_keys={})) resp = client.post("/report/bogus", json=j) assert resp.status_code == 400, resp + def test_collector_upload_msmt_valid(client): # open report, upload j = { @@ -59,12 +64,13 @@ def test_collector_upload_msmt_valid(client): assert len(rid) == 61, rid msmt = dict(test_keys={}) - c = postj(client, f"/report/{rid}", {"format":"json", "content":msmt}) + c = postj(client, f"/report/{rid}", {"format": "json", "content": msmt}) assert c == {} c = postj(client, f"/report/{rid}/close", json={}) assert c == {} + def test_collector_upload_msmt_valid_zstd(client): rid = "20230101T000000Z_integtest_IT_1_n1_integtest0000000" msmt = json.dumps(dict(test_keys={})).encode() diff --git a/ooniapi/services/ooniprobe/tests/test_prio.py b/ooniapi/services/ooniprobe/tests/test_prio.py index ba1a85b6..e1fea847 100644 --- a/ooniapi/services/ooniprobe/tests/test_prio.py +++ b/ooniapi/services/ooniprobe/tests/test_prio.py @@ -146,4 +146,4 @@ def test_compute_priorities_country_list(): "url": "https://ooni.org/", "weight": 11.052631578947368, } - ] \ No newline at end of file + ] From 56050356367ce5a154ba4497d9a4ff852ee3987f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luis=20D=C3=ADaz?= Date: Wed, 26 Nov 2025 10:18:06 +0100 Subject: [PATCH 7/8] Add tests for test lists --- .../ooniprobe/tests/integ/test_test_lists.py | 62 +++++++++++++++++++ ooniapi/services/ooniprobe/tests/utils.py | 23 +++++++ 2 files changed, 85 insertions(+) create mode 100644 ooniapi/services/ooniprobe/tests/integ/test_test_lists.py create mode 100644 ooniapi/services/ooniprobe/tests/utils.py diff --git a/ooniapi/services/ooniprobe/tests/integ/test_test_lists.py b/ooniapi/services/ooniprobe/tests/integ/test_test_lists.py new file mode 100644 index 00000000..9585ac46 --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/integ/test_test_lists.py @@ -0,0 +1,62 @@ +from ..utils import getj + + +def test_url_prioritization(client, load_url_priorities): + c = getj(client, "/api/v1/test-list/urls?limit=100") + assert "metadata" in c + assert c["metadata"]["count"] > 1 + c["metadata"]["count"] = 0 + assert c["metadata"] == { + "count": 0, + "current_page": -1, + "limit": -1, + "next_url": "", + "pages": 1, + } + + assert set(r["url"] for r in c["results"]) + + +def test_url_prioritization_category_code(client, load_url_priorities): + c = getj(client, "/api/v1/test-list/urls?category_codes=NEWS&limit=100") + assert "metadata" in c + for r in c["results"]: + assert r["category_code"] == "NEWS" + + assert set(r["url"] for r in c["results"]) + + +def test_url_prioritization_category_codes(client, load_url_priorities): + c = getj( + client, + "/api/v1/test-list/urls?category_codes=NEWS,HUMR&country_code=US&limit=100", + ) + assert "metadata" in c + for r in c["results"]: + assert r["category_code"] in ("NEWS", "HUMR") + + assert set(r["url"] for r in c["results"]) + + +def test_url_prioritization_country_code_limit(client, load_url_priorities): + c = getj(client, "/api/v1/test-list/urls?country_code=US&limit=4") + assert "metadata" in c + assert c["metadata"]["count"] > 1 + c["metadata"]["count"] = 0 + assert c["metadata"] == { + "count": 0, + "current_page": -1, + "limit": -1, + "next_url": "", + "pages": 1, + } + for r in c["results"]: + assert r["country_code"] in ("XX", "US") + + assert 4 >= len(set(r["url"] for r in c["results"])) > 0 + + +def test_url_prioritization_country_code_nolimit(client, load_url_priorities): + c = getj(client, "/api/v1/test-list/urls?country_code=US") + assert "metadata" in c + assert sum(1 for r in c["results"] if r["country_code"] == "XX") diff --git a/ooniapi/services/ooniprobe/tests/utils.py b/ooniapi/services/ooniprobe/tests/utils.py new file mode 100644 index 00000000..01c638fa --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/utils.py @@ -0,0 +1,23 @@ +from httpx import Client +from typing import Dict, Any +from fastapi import status + +def getj(client : Client, url: str, params: Dict[str, Any] = {}) -> Dict[str, Any]: + resp = client.get(url) + assert resp.status_code == status.HTTP_200_OK, f"Unexpected status code: {resp.status_code} - {url}. {resp.content}" + return resp.json() + +def postj( + client : Client, + url: str, + json: Dict[str, Any] | None = None, + headers: Dict[str, Any] | None = None + ) -> Dict[str, Any]: + resp = client.post(url, json=json, headers=headers) + assert resp.status_code == status.HTTP_200_OK, f"Unexpected status code: {resp.status_code} - {url}. {resp.content}" + return resp.json() + +def post(client : Client, url, data=None, headers=None): + response = client.post(url, data=data, headers=headers) + assert response.status_code == 200 + return response.json() \ No newline at end of file From 46b3555552bd950a05217f79b1b8502ac2371661 Mon Sep 17 00:00:00 2001 From: aagbsn Date: Thu, 18 Dec 2025 11:05:33 +0000 Subject: [PATCH 8/8] Port test lists.1 (#1047) * in test_settings, add clickhouse_server fixture (#1028) * in test_settings, add clickhouse_server fixture * delay and decrease probe frequency of is_clickhouse_running to reduce logspam * remove docker_ip and docker_services included by clickhouse_server * add collectors to ooniapi (#1032) * add collectors to ooniapi * Make method "GET" on route for list_collectors * Use SettingsDep from ooniprobe.dependencies * add geolookup to ooniapi probe services (#1031) * add geolookup to ooniapi probe services this includes a small refactor to make the probe_geoip method re-usable * Replace class docstring with pydantic.Field annotation * use pydantic field validation for geolookup endpoint * probe_services: validate probe_geoip; add type GeoLookupResult * monkeypatch lookup_probe_cc and lookup_probe_network to provide static responses * add bouncer/net-tests to ooniapi under new router bouncer (#1036) * add bouncer/net-tests to ooniapi under new router bouncer * fix pylint import complaints * let fastapi validate input * exclude unset parameters in bouncer/net-tests * add bouncer tests from api/tests/integ/test_probe_services.py I modified the tests to expect any error (other than http response 200) because FastAPI returns HTTP error 422 Unprocessable Entity instead of 400, as the tests do not supply a request that the model can validate * raise HTTPException with status 400 for backwards-compatibility * check status code on error is 400 for backwards compatibility with old probes * Fix bad geoip reporting (#1022) * Add logging to failing comparision function * Trigger CI * Trigger CI * Fix bad db_asn logging * Improve logging of geoip mismatches * Fix extract_probe_ipaddr * Cusum changepoint api (#1027) * Add initial endpoint shape for event detector changepoints * Add cusum changepoint list endpoint * Remove unused request model * Add event detector changepoint * Fix event detector table migration for tests * Add basic test for changepoint endpoint * Add basic filtering test to changepoint endpoint * Remove unused log * Add tests to check filtering by date * Add utils module with useful types * Remove unused types * Remove unused import * Add enum for change direction * Black reformat * Add testing data from the DB * Remove out dated comment * Trigger ci * fix typo in changepoint api endpoint * trigger ci * Modify changepoints table to add block_type column * replace bad quotes * Add block_type field to changepoints API * Add prioritization CRUD methods (#1040) * Add prioritization CRUD methods /api/_/show_countries_prioritization /api/_/debug_prioritization * rename routers/prio to routers/prio_crud * use raw string in regex * S3 config parameter (#1039) * Add config bucket setting and function to read file from bucket * trigger ci --------- Co-authored-by: Luis Diaz <41093870+LDiazN@users.noreply.github.com> --- ooniapi/common/src/common/config.py | 23 ++- .../src/oonimeasurements/dependencies.py | 7 +- .../routers/data/aggregate_analysis.py | 175 ++++++++++++++++-- .../routers/v1/measurements.py | 5 +- .../src/oonimeasurements/utils/__init__.py | 0 .../src/oonimeasurements/utils/api.py | 10 + .../oonimeasurements/tests/conftest.py | 4 +- .../migrations/0_clickhouse_init_tables.sql | 45 ++++- .../4_clickhouse_populate_changepoints.sql | 11 ++ .../tests/test_event_detection.py | 136 ++++++++++++++ .../tests/test_measurements.py | 15 +- .../services/ooniprobe/src/ooniprobe/main.py | 6 +- .../src/ooniprobe/routers/bouncer.py | 110 +++++++++++ .../src/ooniprobe/routers/prio_crud.py | 87 +++++++++ .../ooniprobe/routers/v1/probe_services.py | 72 ++++++- .../services/ooniprobe/src/ooniprobe/utils.py | 26 ++- ooniapi/services/ooniprobe/tests/conftest.py | 12 +- .../ooniprobe/tests/integ/test_geolookup.py | 27 +++ .../tests/integ/test_list_collectors.py | 5 + .../services/ooniprobe/tests/test_bouncer.py | 74 ++++++++ ooniapi/services/ooniprobe/tests/test_prio.py | 27 +++ .../services/ooniprobe/tests/test_utils.py | 7 + 22 files changed, 846 insertions(+), 38 deletions(-) create mode 100644 ooniapi/services/oonimeasurements/src/oonimeasurements/utils/__init__.py create mode 100644 ooniapi/services/oonimeasurements/src/oonimeasurements/utils/api.py create mode 100644 ooniapi/services/oonimeasurements/tests/migrations/4_clickhouse_populate_changepoints.sql create mode 100644 ooniapi/services/oonimeasurements/tests/test_event_detection.py create mode 100644 ooniapi/services/ooniprobe/src/ooniprobe/routers/bouncer.py create mode 100644 ooniapi/services/ooniprobe/src/ooniprobe/routers/prio_crud.py create mode 100644 ooniapi/services/ooniprobe/tests/integ/test_geolookup.py create mode 100644 ooniapi/services/ooniprobe/tests/integ/test_list_collectors.py create mode 100644 ooniapi/services/ooniprobe/tests/test_bouncer.py create mode 100644 ooniapi/services/ooniprobe/tests/test_utils.py diff --git a/ooniapi/common/src/common/config.py b/ooniapi/common/src/common/config.py index 803493c3..f2c9c406 100644 --- a/ooniapi/common/src/common/config.py +++ b/ooniapi/common/src/common/config.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Dict from pydantic_settings import BaseSettings @@ -33,6 +33,9 @@ class Settings(BaseSettings): vpn_credential_refresh_hours: int = 24 + # Bucket used to store configuration files + config_bucket: str = "" + # Where the geoip DBs are downloaded to geoip_db_dir: str = "/var/lib/ooni/geoip" # Ooniprobe only @@ -41,3 +44,21 @@ class Settings(BaseSettings): failed_reports_bucket: str = ( "" # for uploading reports that couldn't be sent to fastpath ) + + # ooniprobe client configuration + collectors: List[Dict[str, str]] = [ + {"address": "httpo://guegdifjy7bjpequ.onion", "type": "onion"}, + {"address": "https://ams-pg.ooni.org:443", "type": "https"}, + { + "address": "https://dkyhjv0wpi2dk.cloudfront.net", + "front": "dkyhjv0wpi2dk.cloudfront.net", + "type": "cloudfront", + }, + {"address": "httpo://guegdifjy7bjpequ.onion", "type": "onion"}, + {"address": "https://ams-pg.ooni.org:443", "type": "https"}, + { + "address": "https://dkyhjv0wpi2dk.cloudfront.net", + "front": "dkyhjv0wpi2dk.cloudfront.net", + "type": "cloudfront", + }, + ] diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/dependencies.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/dependencies.py index 60b177c8..c37a0107 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/dependencies.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/dependencies.py @@ -7,10 +7,15 @@ from .common.config import Settings from .common.dependencies import get_settings +SettingsDep = Annotated[Settings, Depends(get_settings)] -def get_clickhouse_session(settings: Annotated[Settings, Depends(get_settings)]): + +def get_clickhouse_session(settings: SettingsDep): db = Clickhouse.from_url(settings.clickhouse_url) try: yield db finally: db.disconnect() + + +ClickhouseDep = Annotated[Clickhouse, Depends(get_clickhouse_session)] diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py index 09924584..1851671a 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py @@ -1,15 +1,17 @@ +from enum import Enum import time import math from datetime import datetime -from typing import List, Literal, Optional, Tuple, Union, Dict +from ...common.clickhouse_utils import query_click +from typing import Any, List, Literal, Optional, Self, Tuple, Dict from typing_extensions import Annotated from fastapi import APIRouter, Depends, Query -from pydantic import BaseModel +from pydantic import BaseModel, Field +import sqlalchemy as sql from .utils import get_measurement_start_day_agg, TimeGrains, parse_probe_asn_to_int -from ...dependencies import ( - get_clickhouse_session, -) +from ...utils.api import ProbeCCOrNone, ProbeASNOrNone +from ...dependencies import get_clickhouse_session, ClickhouseDep from .list_analysis import ( SinceUntil, utc_30_days_ago, @@ -279,6 +281,10 @@ def format_aggregate_query(extra_cols: Dict[str, str], where: str): ) """ +def nan_to_none(val): + if math.isnan(val): + return None + return val @router.get( "/v1/aggregation/analysis", @@ -292,8 +298,8 @@ async def get_aggregation_analysis( test_name: Annotated[Optional[str], Query()] = None, domain: Annotated[Optional[str], Query()] = None, input: Annotated[Optional[str], Query()] = None, - probe_asn: Annotated[Union[int, str, None], Query()] = None, - probe_cc: Annotated[Optional[str], Query(min_length=2, max_length=2)] = None, + probe_asn: ProbeASNOrNone = None, + probe_cc: ProbeCCOrNone = None, ooni_run_link_id: Annotated[Optional[str], Query()] = None, since: SinceUntil = utc_30_days_ago(), until: SinceUntil = utc_today(), @@ -397,10 +403,6 @@ async def get_aggregation_analysis( d = dict(zip(list(extra_cols.keys()) + fixed_cols, row)) blocked_max_protocol = d["blocked_max_protocol"] - def nan_to_none(val): - if math.isnan(val): - return None - return val loni = Loni( dns_blocked=nan_to_none(d["dns_blocked"]), @@ -447,3 +449,154 @@ def nan_to_none(val): dimension_count=dimension_count, results=results, ) + + +class ChangeDir(str, Enum): + up = "up" + down = "down" + + @classmethod + def from_n_or_i(cls, i: int | None) -> Self | None: + if i is None: + return None + + return cls("down") if i == -1 else cls("up") + + +class ChangePointEntry(BaseModel): + # TODO Double check which fields are actually necessary + probe_asn: int + probe_cc: str + domain: str + start_time: datetime # TODO double check the naming of these datetime fields + end_time: datetime + count_isp_resolver: int + count_other_resolver: int + count: int + dns_isp_blocked: float | None + dns_other_blocked: float | None + tcp_blocked: float | None + tls_blocked: float | None + dns_isp_blocked_obs_w_sum: float | None + dns_isp_blocked_w_sum: float | None + dns_isp_blocked_s_pos: float | None + dns_isp_blocked_s_neg: float | None + dns_other_blocked_obs_w_sum: float | None + dns_other_blocked_w_sum: float | None + dns_other_blocked_s_pos: float | None + dns_other_blocked_s_neg: float | None + tcp_blocked_obs_w_sum: float | None + tcp_blocked_w_sum: float | None + tcp_blocked_s_pos: float | None + tcp_blocked_s_neg: float | None + tls_blocked_obs_w_sum: float | None + tls_blocked_w_sum: float | None + tls_blocked_s_pos: float | None + tls_blocked_s_neg: float | None + change_dir: ChangeDir | None = Field( + description="If blocking behaviour goes up or down" + ) + s_pos: float | None + s_neg: float | None + current_mean: float | None + h: float | None + block_type: str + + @classmethod + def from_row(cls, row: Dict[str, Any]) -> Self: + """ + Takes a row as it comes from the clickhouse table 'event_detector_changepoints' + and converts it to a changepoint entry + """ + + def g(s : str) -> Any | None: + return row.get(s) + + return ChangePointEntry( + probe_asn=g("probe_asn"), + probe_cc=g("probe_cc"), + domain=g("domain"), + start_time=g("ts"), + end_time=g("last_ts"), + count_isp_resolver=g("count_isp_resolver"), + count_other_resolver=g("count_other_resolver"), + count=g("count"), + dns_isp_blocked= nan_to_none(g("dns_isp_blocked")), + dns_other_blocked=nan_to_none(g("dns_other_blocked")), + tcp_blocked=nan_to_none(g("tcp_blocked")), + tls_blocked=nan_to_none(g("tls_blocked")), + dns_isp_blocked_obs_w_sum=nan_to_none(g("dns_isp_blocked_obs_w_sum")), + dns_isp_blocked_w_sum=nan_to_none(g("dns_isp_blocked_w_sum")), + dns_isp_blocked_s_pos=nan_to_none(g("dns_isp_blocked_s_pos")), + dns_isp_blocked_s_neg=nan_to_none(g("dns_isp_blocked_s_neg")), + dns_other_blocked_obs_w_sum=nan_to_none(g("dns_other_blocked_obs_w_sum")), + dns_other_blocked_w_sum=nan_to_none(g("dns_other_blocked_w_sum")), + dns_other_blocked_s_pos=nan_to_none(g("dns_other_blocked_s_pos")), + dns_other_blocked_s_neg=nan_to_none(g("dns_other_blocked_s_neg")), + tcp_blocked_obs_w_sum=nan_to_none(g("tcp_blocked_obs_w_sum")), + tcp_blocked_w_sum=nan_to_none(g("tcp_blocked_w_sum")), + tcp_blocked_s_pos=nan_to_none(g("tcp_blocked_s_pos")), + tcp_blocked_s_neg=nan_to_none(g("tcp_blocked_s_neg")), + tls_blocked_obs_w_sum=nan_to_none(g("tls_blocked_obs_w_sum")), + tls_blocked_w_sum=nan_to_none(g("tls_blocked_w_sum")), + tls_blocked_s_pos=nan_to_none(g("tls_blocked_s_pos")), + tls_blocked_s_neg=nan_to_none(g("tls_blocked_s_neg")), + change_dir=ChangeDir.from_n_or_i(g("change_dir")), + s_pos=nan_to_none(g("s_pos")), + s_neg=nan_to_none(g("s_neg")), + current_mean=nan_to_none(g("current_mean")), + h=nan_to_none(g("h")), + block_type= g("block_type") + ) # type: ignore + + +class ListChangePointsResponse(BaseModel): + results: List[ChangePointEntry] + + +@router.get( + "/v1/detector/changepoints", + tags=["detector"], + description="List changepoints detected by the event detector using the cusum algorithm", + response_model=ListChangePointsResponse, +) +@parse_probe_asn_to_int +async def list_changepoints( + clickhouse: ClickhouseDep, + probe_asn: ProbeASNOrNone = None, + probe_cc: ProbeCCOrNone = None, + domain: str | None = Query(default=None), + since: SinceUntil = utc_30_days_ago(), + until: SinceUntil = utc_today(), +) -> ListChangePointsResponse: + conditions = [] + query_params = {} + + if probe_asn: + conditions.append(sql.text("probe_asn = :probe_asn")) + query_params["probe_asn"] = probe_asn + + if probe_cc: + conditions.append(sql.text("probe_cc = :probe_cc")) + query_params["probe_cc"] = probe_cc + + if domain: + conditions.append( + sql.text("domain = :domain") + ) # TODO should this be 'like %domain%'? + query_params["domain"] = domain + + conditions.append(sql.text("ts >= :since")) + query_params["since"] = since + + conditions.append(sql.text("ts <= :until")) + query_params["until"] = until + + changepoints = sql.table("event_detector_changepoints") + q = sql.select("*").select_from(changepoints).where(sql.and_(*conditions)) + + query_result = query_click(clickhouse, q, query_params) + + results = [ChangePointEntry.from_row(entry) for entry in query_result] + + return ListChangePointsResponse(results=results) diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py index 11251491..06c9381d 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py @@ -470,6 +470,7 @@ def report_id_validator(cls, report_id: str) -> str: return report_id + def validate_report_id(report_id: str) -> str: if len(report_id) < 15 or len(report_id) > 100: raise HTTPException( @@ -483,6 +484,7 @@ def validate_report_id(report_id: str) -> str: return report_id + @router.get("/v1/measurement_meta", response_model_exclude_unset=True) async def get_measurement_meta( response: Response, @@ -505,7 +507,7 @@ async def get_measurement_meta( else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Missing measurement_uid or report_id. You should provide at the least one" + detail="Missing measurement_uid or report_id. You should provide at the least one", ) if msmt_meta.probe_asn is not None and isinstance(msmt_meta.probe_asn, str): @@ -1019,6 +1021,7 @@ def get_bucket_url(bucket_name: str) -> str: def asn_to_int(asn_str: str) -> int: return int(asn_str.strip("AS")) + def is_in_charset(s: str, charset: str, error_msg: str): """Ensure `s` contains only valid characters listed in `charset`""" for c in s: diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/__init__.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/api.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/api.py new file mode 100644 index 00000000..881ea358 --- /dev/null +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/api.py @@ -0,0 +1,10 @@ +""" +Utility functions and types to assist API development +""" + +from typing import Annotated, Optional, Union +from fastapi import Query + + +ProbeCCOrNone = Annotated[Optional[str], Query(min_length=2, max_length=2)] +ProbeASNOrNone = Annotated[Union[int, str, None], Query()] diff --git a/ooniapi/services/oonimeasurements/tests/conftest.py b/ooniapi/services/oonimeasurements/tests/conftest.py index b85c4371..5c553e46 100644 --- a/ooniapi/services/oonimeasurements/tests/conftest.py +++ b/ooniapi/services/oonimeasurements/tests/conftest.py @@ -40,6 +40,8 @@ def maybe_download_fixtures(): def is_clickhouse_running(url): + # using ClickhouseClient as probe spams WARN messages with logger in clickhouse_driver + time.sleep(2) try: with ClickhouseClient.from_url(url) as client: client.execute("SELECT 1") @@ -53,7 +55,7 @@ def clickhouse_server(maybe_download_fixtures, docker_ip, docker_services): port = docker_services.port_for("clickhouse", 9000) url = "clickhouse://test:test@{}:{}".format(docker_ip, port) docker_services.wait_until_responsive( - timeout=30.0, pause=0.1, check=lambda: is_clickhouse_running(url) + timeout=30.0, pause=1.0, check=lambda: is_clickhouse_running(url) ) yield url diff --git a/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql b/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql index 7245f4ae..e5b9e554 100644 --- a/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql +++ b/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql @@ -42,7 +42,7 @@ ENGINE = ReplacingMergeTree ORDER BY (measurement_start_time, report_id, input) SETTINGS index_granularity = 8192; -CREATE TABLE IF NOT EXISTS default.citizenlab +CREATE TABLE IF NOT EXISTS default.citizenlab ( `domain` String, `url` String, @@ -64,3 +64,46 @@ CREATE TABLE IF NOT EXISTS default.jsonl ENGINE = MergeTree ORDER BY (report_id, input) SETTINGS index_granularity = 8192; + +CREATE TABLE IF NOT EXISTS default.event_detector_changepoints +( + `probe_asn` UInt32, + `probe_cc` String, + `domain` String, + `ts` DateTime64(3, 'UTC'), + `count_isp_resolver` Nullable(UInt32), + `count_other_resolver` Nullable(UInt32), + `count` Nullable(UInt32), + `dns_isp_blocked` Nullable(Float32), + `dns_other_blocked` Nullable(Float32), + `tcp_blocked` Nullable(Float32), + `tls_blocked` Nullable(Float32), + `last_ts` DateTime64(3, 'UTC'), + `dns_isp_blocked_obs_w_sum` Nullable(Float32), + `dns_isp_blocked_w_sum` Nullable(Float32), + `dns_isp_blocked_s_pos` Nullable(Float32), + `dns_isp_blocked_s_neg` Nullable(Float32), + `dns_other_blocked_obs_w_sum` Nullable(Float32), + `dns_other_blocked_w_sum` Nullable(Float32), + `dns_other_blocked_s_pos` Nullable(Float32), + `dns_other_blocked_s_neg` Nullable(Float32), + `tcp_blocked_obs_w_sum` Nullable(Float32), + `tcp_blocked_w_sum` Nullable(Float32), + `tcp_blocked_s_pos` Nullable(Float32), + `tcp_blocked_s_neg` Nullable(Float32), + `tls_blocked_obs_w_sum` Nullable(Float32), + `tls_blocked_w_sum` Nullable(Float32), + `tls_blocked_s_pos` Nullable(Float32), + `tls_blocked_s_neg` Nullable(Float32), + `change_dir` Nullable(Int8), + `s_pos` Nullable(Float32), + `s_neg` Nullable(Float32), + `current_mean` Nullable(Float32), + `h` Nullable(Float32), +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(ts) +ORDER BY (probe_asn, probe_cc, ts, domain) +SETTINGS index_granularity = 8192; + +ALTER TABLE default.event_detector_changepoints ADD COLUMN `block_type` String; \ No newline at end of file diff --git a/ooniapi/services/oonimeasurements/tests/migrations/4_clickhouse_populate_changepoints.sql b/ooniapi/services/oonimeasurements/tests/migrations/4_clickhouse_populate_changepoints.sql new file mode 100644 index 00000000..83f0f32f --- /dev/null +++ b/ooniapi/services/oonimeasurements/tests/migrations/4_clickhouse_populate_changepoints.sql @@ -0,0 +1,11 @@ +INSERT INTO default.event_detector_changepoints VALUES +(945,'US','www.facebook.com','2024-01-15 18:00:00.000',0,2,2,nan,0,0.75,0,'2024-01-10 23:00:00.000',nan,0,0,0,0,373,0,0,1.5,373,0,0,0,373,0,0,-1,3.6899884,0,0.04597198,3.5, 'tcp_block'), +(15169,'VE','google.com','2024-01-25 23:00:00.000',6,0,6,0,nan,0,0.7,'2024-01-10 23:00:00.000',nan,660,0,0,nan,2,0,0,0,662,0,0,42.9,662,0,0,-1,3.6650198,0,0.084454976,3.5, 'tls_block'), +(8346,'SN','www.tiktok.com','2024-01-31 15:00:00.000',2,0,2,0,nan,0,0.75,'2024-01-10 23:00:00.000',nan,660,0,0,nan,2,0,0,0,662,0,0,42.9,662,0,0,1,3.7752855,0,0.098714285,3.5, 'tcp_block'), +(8767,'DE','preview.redd.it','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,56,0,0,nan,26,0,0,0,82,0,0,0,82,0,0,-1,3.5091832,0,0.08009709,3.5, 'dns_isp_block'), +(8767,'DE','twitter.com','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,132,0,0,nan,37,0,0,0,169,0,0,0,169,0,0,1,3.7883966,0,0.03966346,3.5, 'tcp_block'), +(8767,'DE','www.facebook.com','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,141,0,0,nan,37,0,0,0,178,0,0,0,178,0,0,1,3.800651,0,0.037844036,3.5, 'dns_other_block'), +(8767,'DE','external.xx.fbcdn.net','2024-01-14 22:00:00.000',2,0,2,0,nan,0.75,0,'2024-01-09 01:00:00.000',nan,37,0,0,nan,13,0,0,0,50,0,0,0,50,0,0,1,3.5141737,0,0.13356164,3.5, 'tcp_block'), +(12668,'RU','www.facebook.com','2024-01-20 19:00:00.000',0,1,1,nan,0,0,0.7,'2024-01-10 19:00:00.000',nan,0,0,0,0,184,0,0,0,184,0,0,48.3,184,0,0,1,3.500514,0,0.31609195,3.5, 'tcp_block'), +(8048,'VE','amazon.com','2024-01-29 22:00:00.000',0,1,1,nan,0,0.75,0,'2024-01-10 20:00:00.000',nan,1,0,0,nan,213,0,0,19.5,214,0,0,0,214,0,0,1,3.647134,0,0.1875,3.5, 'tcp_block'), +(8048,'VE','google.com','2024-01-23 04:00:00.000',0,1,1,nan,0,0,0.7,'2024-01-10 21:00:00.000',nan,16,0,0,nan,17,0,0,0,33,0,0,0,33,0,0,-1,3.6120336,0,0.19636363,3.5, 'tcp_block') \ No newline at end of file diff --git a/ooniapi/services/oonimeasurements/tests/test_event_detection.py b/ooniapi/services/oonimeasurements/tests/test_event_detection.py new file mode 100644 index 00000000..bbd4b20f --- /dev/null +++ b/ooniapi/services/oonimeasurements/tests/test_event_detection.py @@ -0,0 +1,136 @@ +from typing import Dict, Any +import httpx +from datetime import datetime, UTC +import pytest + + +def getj( + client: httpx.Client, url: str, params: Dict[str, Any] | None = None +) -> Dict[str, Any]: + resp = client.get(url, params=params) + assert ( + resp.status_code == 200 + ), f"Unexpected status code: {resp.status_code}. {resp.content}" + return resp.json() + + +# reasonable default since and until for the testing data +since = datetime(2024, 1, 1, tzinfo=UTC) +until = datetime(2024, 1, 30, tzinfo=UTC) + + +def getjsu(client, url, params={}): + # use this default since and until + # since the testing data has a fixed date that might + # become "obsolete" in the future for the default + # '30 days ago' since and until limits + params["since"] = since + params["until"] = until + return getj(client, url, params) + + +def test_changepoint_list_basic(client): + resp = getjsu( + client, + "/api/v1/detector/changepoints", + ) + + assert "results" in resp, resp + assert len(resp["results"]) > 0, resp["results"] + + +def normalize_asn(asn: int | str): + if isinstance(asn, int): + return asn + if isinstance(asn, str) and asn.upper().startswith("AS"): + asn = asn.upper().strip("AS") + + return int(asn) + + +def parse_dt(dt: str) -> datetime: + return datetime.fromisoformat(dt) + + +@pytest.mark.parametrize( + "filter_param, filter_value", + [ + ("probe_asn", 8048), + ("probe_asn", "AS8048"), # filter asn by string is also valid + ("probe_asn", "8048"), + ("probe_cc", "VE"), + ("domain", "google.com"), + ], +) +def test_changepoint_filter_basic(client, filter_param, filter_value): + + resp = getjsu( + client, "/api/v1/detector/changepoints", params={filter_param: filter_value} + ) + + assert len(resp["results"]) > 0, "No results to validate" + + if filter_param == "probe_asn": + normalize = normalize_asn + else: + normalize = id + + if filter_param == "probe_asn": + for r in resp["results"]: + assert r[filter_param] == normalize(filter_value), r + + +@pytest.mark.parametrize( + "since_param, until_param, expect_emtpy", + [ + (since, until, False), + (datetime(2021, 1, 1, tzinfo=UTC), datetime(2021, 1, 30, tzinfo=UTC), True), + ], +) +def test_changepoint_date_filter(client, since_param, until_param, expect_emtpy): + + resp = getj( + client, + "/api/v1/detector/changepoints", + params={"since": since_param, "until": until_param}, + ) + + assert len(resp["results"]) > 0 or expect_emtpy, "Not enough results to validate" + assert len(resp["results"]) == 0 or not expect_emtpy, "Result should be empty" + + for r in resp["results"]: + assert parse_dt(r["start_time"]) >= since, r["start_time"] + assert parse_dt(r["end_time"]) <= until, r["end_time"] + + +def test_changepoint_change_dir_values(client): + + resp = getjsu( + client, + "/api/v1/detector/changepoints", + ) + + assert len(resp["results"]) > 0, "Not enough data to validate" + for r in resp["results"]: + assert r["change_dir"] in ["up", "down"] + + resp = getjsu( + client, + "/api/v1/detector/changepoints", + {"probe_cc": "VE", "probe_asn": 15169, "domain": "google.com"}, + ) + + assert resp["results"][0]["change_dir"] == "down" # this one has change_dir == -1 + + resp = getjsu( + client, + "/api/v1/detector/changepoints", + {"probe_cc": "VE", "probe_asn": 8048, "domain": "amazon.com"}, + ) + + assert resp["results"][0]["change_dir"] == "up" # this one has change_dir == 1 + +def test_changepoint_field_present(client): + + resp = getj(client, "/api/v1/detector/changepoints") + assert all('block_type' in r for r in resp['results']) \ No newline at end of file diff --git a/ooniapi/services/oonimeasurements/tests/test_measurements.py b/ooniapi/services/oonimeasurements/tests/test_measurements.py index 5d5e2039..5435368a 100644 --- a/ooniapi/services/oonimeasurements/tests/test_measurements.py +++ b/ooniapi/services/oonimeasurements/tests/test_measurements.py @@ -269,6 +269,7 @@ def test_msm_meta_probe_asn_int(client, monkeypatch): j = resp.json() assert isinstance(j["probe_asn"], int), "probe_asn should be int" + def test_no_report_id_msm_uid_400(client): """ measurement_meta should return 400 if neither report_id nor measurement_uid are provided @@ -276,6 +277,7 @@ def test_no_report_id_msm_uid_400(client): resp = client.get("/api/v1/measurement_meta") assert resp.status_code == 400 + def test_fix_msm_date_parsing(client): # This query was raising an error parsing the date: @@ -316,7 +318,7 @@ def test_get_measurement_meta_basic(client): # You can also query by measurment uid uid = "20210709005529.664022_MY_webconnectivity_68e5bea1060d1874" - response = client.get("/api/v1/measurement_meta", params={'measurement_uid' : uid}) + response = client.get("/api/v1/measurement_meta", params={"measurement_uid": uid}) assert response.status_code == 200, response.status_code @@ -383,13 +385,18 @@ def test_get_measurement_meta_full(client, monkeypatch): } assert raw_msm + def test_bad_report_id_wont_validate(client): - resp = client.get("/api/v1/measurement_meta", params={ - "report_id" : "20210709T004340Z_webconnectivity_MY_4818_n1_YCM7J9mGcEHds#$%" # bad suffix - }) + resp = client.get( + "/api/v1/measurement_meta", + params={ + "report_id": "20210709T004340Z_webconnectivity_MY_4818_n1_YCM7J9mGcEHds#$%" # bad suffix + }, + ) assert resp.status_code == 422, resp.json() + def test_no_measurements_before_30_days(client): """ The default filtering should not retrieve measurements older than 30 days since tomorrow diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/main.py b/ooniapi/services/ooniprobe/src/ooniprobe/main.py index ca5909be..0cb1a0e8 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/main.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/main.py @@ -16,7 +16,7 @@ from . import models from .routers.v2 import vpn from .routers.v1 import probe_services -from .routers import reports +from .routers import reports, bouncer, prio_crud from .download_geoip import try_update from .dependencies import get_postgresql_session, get_clickhouse_session, SettingsDep @@ -71,7 +71,7 @@ def update_geoip_task(): app.add_middleware( CORSMiddleware, - allow_origin_regex="^https://[-A-Za-z0-9]+(\.test)?\.ooni\.(org|io)$", + allow_origin_regex=r"^https://[-A-Za-z0-9]+(\.test)?\.ooni\.(org|io)$", allow_credentials=True, allow_methods=["*"], allow_headers=["*"], @@ -80,6 +80,8 @@ def update_geoip_task(): app.include_router(vpn.router, prefix="/api") app.include_router(probe_services.router, prefix="/api") app.include_router(reports.router) +app.include_router(bouncer.router) +app.include_router(prio_crud.router, prefix="/api") @app.get("/version") diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/bouncer.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/bouncer.py new file mode 100644 index 00000000..d59188f3 --- /dev/null +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/bouncer.py @@ -0,0 +1,110 @@ +import logging +from typing import List, Optional, Any, Dict +from json.decoder import JSONDecodeError + +from fastapi import APIRouter, HTTPException, Request, Response +from pydantic import Field, ValidationError + +from ooniprobe.common.utils import setnocacheresponse +from ooniprobe.common.routers import BaseModel + +router = APIRouter(prefix="/bouncer") + +log = logging.getLogger(__name__) + + +class TestHelperEntry(BaseModel): + address: str + type: str + front: Optional[str] = None + + +class CollectorEntry(BaseModel): + address: str + type: str + front: Optional[str] = None + + +class NetTest(BaseModel): + name: str + collector: str + altcollector: List[CollectorEntry] = Field(alias="collector-alternate") + hashes: Optional[Any] = Field(None, alias="input-hashes") + helpers: Dict[str, str] = Field(alias="test-helpers") + althelpers: Dict[str, List[TestHelperEntry]] = Field(alias="test-helpers-alternate") + version: str + + +class NetTestRequest(BaseModel): + name: str + version: str + + +class NetTestsRequest(BaseModel): + nettests: List[NetTestRequest] = Field(alias="net-tests") + + +class NetTestResponse(BaseModel): + nettests: List[NetTest] = Field(alias="net-tests") + + +@router.post("/net-tests", tags=["bouncer"], response_model=NetTestResponse, response_model_exclude_unset=True) +async def bouncer_net_tests( + response: Response, + request: Request, +) -> Dict[str, List[NetTest]]: + + try: + j = await request.json() + m = NetTestsRequest(**j) + except ValidationError as e: + raise HTTPException(400, detail=e.errors()) + except JSONDecodeError as e: + raise HTTPException(400, detail=str(e)) + except Exception as e: + log.warning("Unexpected Exception:" + str(e)) + raise HTTPException(400, detail=str(e)) + + try: + name = m.nettests[0].name + version = m.nettests[0].version + except IndexError: + raise HTTPException(status_code=400, detail="invalid net-tests request") + + # TODO: load this json from environment or filepath + j = { + "net-tests": [ + { + "collector": "httpo://guegdifjy7bjpequ.onion", + "collector-alternate": [ + {"type": "https", "address": "https://ams-pg.ooni.org"}, + { + "front": "dkyhjv0wpi2dk.cloudfront.net", + "type": "cloudfront", + "address": "https://dkyhjv0wpi2dk.cloudfront.net", + }, + ], + "input-hashes": None, + "name": name, + "test-helpers": { + "tcp-echo": "37.218.241.93", + "http-return-json-headers": "http://37.218.241.94:80", + "web-connectivity": "httpo://y3zq5fwelrzkkv3s.onion", + }, + "test-helpers-alternate": { + "web-connectivity": [ + {"type": "https", "address": "https://wcth.ooni.io"}, + { + "front": "d33d1gs9kpq1c5.cloudfront.net", + "type": "cloudfront", + "address": "https://d33d1gs9kpq1c5.cloudfront.net", + }, + ] + }, + "version": version, + } + ] + } + resp = NetTestResponse(**j) + setnocacheresponse(response) + return resp diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/prio_crud.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/prio_crud.py new file mode 100644 index 00000000..6a997b1b --- /dev/null +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/prio_crud.py @@ -0,0 +1,87 @@ +import logging + +from typing import List, Optional, Tuple + +from fastapi import Response, APIRouter, HTTPException, Query +from pydantic import BaseModel, Field +from sqlalchemy import sql as sa + +from ooniprobe.prio import compute_priorities, generate_test_list +from ooniprobe.dependencies import ClickhouseDep +from ooniprobe.common.clickhouse_utils import query_click +from ooniprobe.common.utils import convert_to_csv + +router = APIRouter() + +log = logging.getLogger(__name__) + + +class PrioritizationType(BaseModel): + anomaly_perc: Optional[str] = Field(description="Anomaly percent") + category_code: Optional[str] = Field(description="Category code") + cc: Optional[str] = Field(description="Country Code") + domain: Optional[str] = Field(description="Domain or wildcard (*)") + msmt_cnt: Optional[int] = Field(description="msmt_cnt") + priority: Optional[int] = Field("Priority weight") + url: Optional[str] = Field("URL or wildcard (*)") + + +@router.get("/_/show_countries_prioritization", tags=["prioritization"], response_model=None) +def show_countries_prioritization(clickhouse: ClickhouseDep, + format: Optional[str] = Query(default="JSON", description="Format of response, CSV or JSON") + ) -> List[PrioritizationType]: + sql = """ + SELECT domain, url, cc, category_code, msmt_cnt, anomaly_perc + FROM citizenlab + LEFT JOIN ( + SELECT input, probe_cc, count() AS msmt_cnt, + toInt8(countIf(anomaly = 't') / msmt_cnt * 100) AS anomaly_perc + FROM fastpath + WHERE measurement_start_time > now() - interval 1 week + AND measurement_start_time < now() + GROUP BY input, probe_cc + ) AS x ON x.input = citizenlab.url AND x.probe_cc = UPPER(citizenlab.cc) + """ + cz = tuple(query_click(clickhouse, sa.text(sql), {})) # cc can be "ZZ" here + + # Fetch priority rules and apply them to URLs + sql = "SELECT category_code, cc, domain, url, priority FROM url_priorities" + prio_rules = tuple(query_click(clickhouse, sa.text(sql), {})) + li = compute_priorities(cz, prio_rules) + for x in li: + x.pop("weight") + x["cc"] = x["cc"].upper() + + li = sorted(li, key=lambda x: (x["cc"], -x["priority"])) + + if len(li)== 0: + raise HTTPException(status_code=400, detail="no data") + + if format.upper() == "CSV": + csv_data = convert_to_csv(li) + response = Response(content=csv_data, media_type="text/csv") + return response + + return li + + +class DebugPrioritization(BaseModel): + test_items: List + entries: Tuple + prio_rules: Tuple + + +@router.get("/_/debug_prioritization", tags=["prioritization"], response_model=DebugPrioritization) +def debug_prioritization( + clickhouse: ClickhouseDep, + probe_cc: Optional[str] = Query(description="2-letter Country-Code", default="ZZ"), + category_codes: str = Query(description="Comma separated list of uppercase URL categories"), + probe_asn: int = Query(description="Probe ASN"), + limit: Optional[int] = Query(description="Maximum number of URLs to return", default=-1), + ) -> DebugPrioritization: + + test_items, entries, prio_rules = generate_test_list(clickhouse, + probe_cc, category_codes, probe_asn, limit, True + ) + return {"test_items": test_items, "entries": entries, "prio_rules": prio_rules} + diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index ffd8052b..ee025a08 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -8,7 +8,7 @@ import geoip2.errors from fastapi import APIRouter, Depends, HTTPException, Query, Response, Request from prometheus_client import Counter, Info, Gauge -from pydantic import Field +from pydantic import Field, IPvAnyAddress from ...utils import ( generate_report_id, @@ -17,7 +17,6 @@ lookup_probe_network, ) from ...dependencies import CCReaderDep, ASNReaderDep, ClickhouseDep, SettingsDep -from ...common.dependencies import get_settings from ...common.routers import BaseModel from ...common.auth import create_jwt, decode_jwt, jwt from ...common.config import Settings @@ -85,7 +84,7 @@ class ProbeLoginResponse(BaseModel): def probe_login_post( probe_login: ProbeLogin, response: Response, - settings: Settings = Depends(get_settings), + settings: SettingsDep, ) -> ProbeLoginResponse: if probe_login.username is None or probe_login.password is None: @@ -155,7 +154,7 @@ class ProbeRegisterResponse(BaseModel): def probe_register_post( probe_register: ProbeRegister, response: Response, - settings: Settings = Depends(get_settings), + settings: SettingsDep, ) -> ProbeRegisterResponse: """Probe Services: Register @@ -353,9 +352,9 @@ def check_in( probe_asn = check_in.probe_asn software_name = check_in.software_name software_version = check_in.software_version - + ipaddr = extract_probe_ipaddr(request) resp, probe_cc, asn_i = probe_geoip( - request, + ipaddr, probe_cc, probe_asn, cc_reader, @@ -478,7 +477,7 @@ def check_in( def probe_geoip( - request: Request, + ipaddr: str, probe_cc: str, asn: str, cc_reader: CCReaderDep, @@ -491,7 +490,6 @@ def probe_geoip( db_asn = "AS0" db_probe_network_name = None try: - ipaddr = extract_probe_ipaddr(request) db_probe_cc = lookup_probe_cc(ipaddr, cc_reader) db_asn, db_probe_network_name = lookup_probe_network(ipaddr, asn_reader) Metrics.GEOIP_ADDR_FOUND.labels(probe_cc=db_probe_cc, asn=db_asn).inc() @@ -684,3 +682,61 @@ def list_test_urls( ) setcacheresponse("1s", response) return out + + +class GeoLookupResult(BaseModel): + cc: str = Field(description="Country Code") + asn: str = Field(description="Autonomous System Number (ASN)") + as_name: str = Field(description="Autonomous System Name") + + +class GeoLookupRequest(BaseModel): + addresses: List[IPvAnyAddress] = Field(description="list of IPv4 or IPv6 address to geolookup") + + +class GeoLookupResponse(BaseModel): + v: int = Field(description="response format version", default="1") + geolocation: Dict[IPvAnyAddress, GeoLookupResult] = Field(description="Dict of IP addresses to GeoLookupResult") + + +@router.post("/geolookup", tags=["ooniprobe"]) +async def geolookup( + data: GeoLookupRequest, + response: Response, + cc_reader: CCReaderDep, + asn_reader: ASNReaderDep, +) -> GeoLookupResponse: + + # initial values probe_geoip compares with + probe_cc = "ZZ" + asn = "AS0" + geolookup_resp = {"geolocation": {}} + + # for each address provided, call probe_geoip and add the data to our response + for ipaddr in data.addresses: + # call probe_geoip() and map the keys to the geolookup v1 API + resp, _, _ = probe_geoip(ipaddr, probe_cc, asn, cc_reader, asn_reader) + # it doesn't seem possible to have separate aliases for (de)serialization + geolookup_resp["geolocation"][ipaddr] = GeoLookupResult(cc=resp["probe_cc"], + asn=resp["probe_asn"], as_name=resp["probe_network_name"]) + + setnocacheresponse(response) + return geolookup_resp + + +class CollectorEntry(BaseModel): + # not actually used but necessary to be compliant with the old API schema + address: str = Field(description="Address of collector") + front: Optional[str] = Field(default=None, description="Fronted domain") + type: Optional[str] = Field(default=None, description="Type of collector") + +@router.get("/collectors", tags=["ooniprobe"]) +def list_collectors( + settings: SettingsDep, + ) -> List[CollectorEntry]: + config_collectors = settings.collectors + collectors_response = [] + for entry in config_collectors: + collector = CollectorEntry(**entry) + collectors_response.append(collector) + return collectors_response diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py index ee0b8388..83504d74 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py @@ -9,9 +9,11 @@ from datetime import datetime, timezone import itertools import logging -from typing import Dict, List, Mapping, TypedDict, Tuple +from typing import List, TypedDict, Tuple +import io from fastapi import Request +from mypy_boto3_s3 import S3Client from sqlalchemy.orm import Session import pem import httpx @@ -128,7 +130,7 @@ def extract_probe_ipaddr(request: Request) -> str: for h in real_ip_headers: if h in request.headers: - return request.headers.getlist(h)[0].rpartition(" ")[-1] + return get_first_ip(request.headers.getlist(h)[0]) return request.client.host if request.client else "" @@ -145,3 +147,23 @@ def lookup_probe_network(ipaddr: str, asn_reader: ASNReaderDep) -> Tuple[str, st "AS{}".format(resp.autonomous_system_number), resp.autonomous_system_organization or "0", ) + +def get_first_ip(headers: str) -> str: + """ + parse the first ip from a comma-separated list of ips encoded as a string + + example: + in: '123.123.123, 1.1.1.1' + out: '123.123.123' + """ + return headers.partition(',')[0] + +def read_file(s3_client : S3Client, bucket: str, file : str) -> str: + """ + Reads the content of `file` within `bucket` into a string + + Useful for reading config files from the s3 bucket + """ + buff = io.BytesIO() + s3_client.download_fileobj(bucket, file, buff) + return buff.getvalue().decode() \ No newline at end of file diff --git a/ooniapi/services/ooniprobe/tests/conftest.py b/ooniapi/services/ooniprobe/tests/conftest.py index 303d2877..4aeea521 100644 --- a/ooniapi/services/ooniprobe/tests/conftest.py +++ b/ooniapi/services/ooniprobe/tests/conftest.py @@ -5,6 +5,7 @@ import shutil import os import json +import time from urllib.request import urlopen from fastapi.testclient import TestClient @@ -111,15 +112,12 @@ def client(clickhouse_server, test_settings, geoip_db_dir): @pytest.fixture -def test_settings( - alembic_migration, docker_ip, docker_services, geoip_db_dir, fastpath_server -): - port = docker_services.port_for("clickhouse", 9000) +def test_settings(alembic_migration, geoip_db_dir, clickhouse_server, fastpath_server): yield make_override_get_settings( postgresql_url=alembic_migration, jwt_encryption_key=JWT_ENCRYPTION_KEY, prometheus_metrics_password="super_secure", - clickhouse_url=f"clickhouse://test:test@{docker_ip}:{port}", + clickhouse_url=clickhouse_server, geoip_db_dir=geoip_db_dir, collector_id="1", fastpath_url=fastpath_server, @@ -132,6 +130,8 @@ def jwt_encryption_key(): def is_clickhouse_running(url): + # using ClickhouseClient as probe spams WARN messages with logger in clickhouse_driver + time.sleep(2) try: with ClickhouseClient.from_url(url) as client: client.execute("SELECT 1") @@ -146,7 +146,7 @@ def clickhouse_server(docker_ip, docker_services): # See password in docker compose url = "clickhouse://test:test@{}:{}".format(docker_ip, port) docker_services.wait_until_responsive( - timeout=30.0, pause=0.1, check=lambda: is_clickhouse_running(url) + timeout=30.0, pause=1.0, check=lambda: is_clickhouse_running(url) ) yield url diff --git a/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py b/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py new file mode 100644 index 00000000..8dd7fec2 --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py @@ -0,0 +1,27 @@ +from typing import Dict, Tuple +import ooniprobe.routers.v1.probe_services as ps +from ooniprobe.utils import lookup_probe_cc, lookup_probe_network +from ooniprobe.dependencies import CCReaderDep, ASNReaderDep + +def fake_lookup_probe_network(ipaddr: str, asn_reader: ASNReaderDep) -> Tuple[str, str]: + return ("AS4242", "Testing Networks") + +def fake_lookup_probe_cc(ipaddr: str, cc_reader: CCReaderDep) -> str: + return "US" + + +def test_geolookup(client, monkeypatch): + monkeypatch.setattr(ps, "lookup_probe_network", fake_lookup_probe_network) + monkeypatch.setattr(ps, "lookup_probe_cc", fake_lookup_probe_cc) + j = dict( + addresses=["192.33.4.12", "170.247.170.2", "2801:1b8:10::b", "2001:500:2::c"] + ) + c = client.post("/api/v1/geolookup", json=j).json() + assert "geolocation" in c + assert "v" in c + g = c["geolocation"] + + for ip in j["addresses"]: + assert g[ip]["cc"] == "US" + assert g[ip]["asn"] == "AS4242" + assert g[ip]["as_name"] == "Testing Networks" diff --git a/ooniapi/services/ooniprobe/tests/integ/test_list_collectors.py b/ooniapi/services/ooniprobe/tests/integ/test_list_collectors.py new file mode 100644 index 00000000..c79942a3 --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/integ/test_list_collectors.py @@ -0,0 +1,5 @@ +def test_list_collectors(client): + c = client.get("/api/v1/collectors").json() + assert len(c) == 6 + for entry in c: + assert "address" in entry diff --git a/ooniapi/services/ooniprobe/tests/test_bouncer.py b/ooniapi/services/ooniprobe/tests/test_bouncer.py new file mode 100644 index 00000000..3a94cc0f --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/test_bouncer.py @@ -0,0 +1,74 @@ +""" +Integration test for OONIProbe API +""" + + +def test_get_bouncer_nettests(client): + version = "1" + + r = client.post("/bouncer/net-tests", json={"net-tests": [{"name": "foo", "version": version}]}) + j = r.json() + assert "net-tests" in j + for v in j["net-tests"]: + for x in ["collector", "input-hashes", "name", "test-helpers", "test-helpers-alternate", "version"]: + assert x in v + +def test_bouncer_net_tests(client): + j = { + "net-tests": [ + { + "input-hashes": None, + "name": "web_connectivity", + "test-helpers": ["web-connectivity"], + "version": "0.0.1", + } + ] + } + c = client.post("/bouncer/net-tests", json=j) + expected = { + "net-tests": [ + { + "collector": "httpo://guegdifjy7bjpequ.onion", + "collector-alternate": [ + {"type": "https", "address": "https://ams-pg.ooni.org"}, + { + "front": "dkyhjv0wpi2dk.cloudfront.net", + "type": "cloudfront", + "address": "https://dkyhjv0wpi2dk.cloudfront.net", + }, + ], + "name": "web_connectivity", + "test-helpers": { + "tcp-echo": "37.218.241.93", + "http-return-json-headers": "http://37.218.241.94:80", + "web-connectivity": "httpo://y3zq5fwelrzkkv3s.onion", + }, + "test-helpers-alternate": { + "web-connectivity": [ + {"type": "https", "address": "https://wcth.ooni.io"}, + { + "front": "d33d1gs9kpq1c5.cloudfront.net", + "type": "cloudfront", + "address": "https://d33d1gs9kpq1c5.cloudfront.net", + }, + ] + }, + "version": "0.0.1", + "input-hashes": None, + } + ] + } + assert c.json() == expected + + +def test_bouncer_net_tests_bad_request1(client): + resp = client.post("/bouncer/net-tests") + # XXX: returns status code 400 for backwards compatibility + assert resp.status_code == 400 + + +def test_bouncer_net_tests_bad_request2(client): + j = {"net-tests": []} + resp = client.post("/bouncer/net-tests", json=j) + # XXX: returns status code 400 for backwards compatibility + assert resp.status_code == 400 diff --git a/ooniapi/services/ooniprobe/tests/test_prio.py b/ooniapi/services/ooniprobe/tests/test_prio.py index e1fea847..f618c809 100644 --- a/ooniapi/services/ooniprobe/tests/test_prio.py +++ b/ooniapi/services/ooniprobe/tests/test_prio.py @@ -147,3 +147,30 @@ def test_compute_priorities_country_list(): "weight": 11.052631578947368, } ] + + +def test_show_countries_prioritization(client): + c = client.get("/api/_/show_countries_prioritization").json() + assert len(c) > 10 + assert len(c) < 60000 + assert sorted(c[0].keys()) == [ + "anomaly_perc", + "category_code", + "cc", + "domain", + "msmt_cnt", + "priority", + "url", + ] + + +def test_show_countries_prioritization_csv(client): + resp = client.get("/api/_/show_countries_prioritization?format=CSV") + assert resp.status_code == 200 + assert resp.headers["content-type"] != "application/json" + + +def test_debug_prioritization(client): + resp = client.get("/api/_/debug_prioritization?probe_cc=ZZ&category_codes=GOVT&probe_asn=4242") + assert resp.status_code == 200 + assert resp.headers["content-type"] == "application/json" diff --git a/ooniapi/services/ooniprobe/tests/test_utils.py b/ooniapi/services/ooniprobe/tests/test_utils.py new file mode 100644 index 00000000..f34ee403 --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/test_utils.py @@ -0,0 +1,7 @@ +from ooniprobe.utils import get_first_ip + +def test_get_first_ip(): + + assert get_first_ip("1.1.1.1") == "1.1.1.1" + assert get_first_ip("1.1.1.1, 2.2.2.2") == "1.1.1.1" + assert get_first_ip("") == "" \ No newline at end of file