Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api_server_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.instrumentation import api_tracing
from cloud_pipelines_backend.instrumentation import contextual_logging
from cloud_pipelines_backend.instrumentation import otel_tracing

app = fastapi.FastAPI(
title="Cloud Pipelines API",
version="0.0.1",
separate_input_output_schemas=False,
)

# Configure OpenTelemetry tracing
otel_tracing.setup_api_tracing(app)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for making it non-invasive.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😀


# Add request context middleware for automatic request_id generation
app.add_middleware(api_tracing.RequestContextMiddleware)

Expand Down
97 changes: 97 additions & 0 deletions cloud_pipelines_backend/instrumentation/otel_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""
OpenTelemetry tracing configuration for FastAPI applications.

This module sets up distributed tracing with OTLP exporter for sending traces
to an OpenTelemetry collector endpoint.
"""

import fastapi
import logging
import os

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc import (
trace_exporter as otel_grpc_trace_exporter,
)
from opentelemetry.exporter.otlp.proto.http import (
trace_exporter as otel_http_trace_exporter,
)
from opentelemetry.instrumentation import fastapi as otel_fastapi
from opentelemetry.sdk import resources as otel_resources
from opentelemetry.sdk import trace as otel_trace
from opentelemetry.sdk.trace import export as otel_trace_export

_logger = logging.getLogger(__name__)

_OTEL_PROTOCOL_GRPC = "grpc"
_OTEL_PROTOCOL_HTTP = "http"
_OTEL_PROTOCOLS = (_OTEL_PROTOCOL_GRPC, _OTEL_PROTOCOL_HTTP)


def setup_api_tracing(app: fastapi.FastAPI) -> None:
"""
Configure OpenTelemetry tracing for a FastAPI application.

Args:
app: The FastAPI application instance to instrument

Environment Variables:
TANGLE_OTEL_EXPORTER_ENDPOINT: The endpoint URL for the OTLP collector
(e.g., "http://localhost:4317")
TANGLE_ENV: Environment name to include in service name
(defaults to "development")
TANGLE_OTEL_EXPORTER_PROTOCOL: The protocol to use for the OTLP exporter
(defaults to "grpc", can be "http")
"""
otel_endpoint = os.environ.get("TANGLE_OTEL_EXPORTER_ENDPOINT")
if not otel_endpoint:
return

app_env = os.environ.get("TANGLE_ENV", "development")
otel_protocol = os.environ.get("TANGLE_OTEL_EXPORTER_PROTOCOL", _OTEL_PROTOCOL_GRPC)
service_name = f"tangle-{app_env}"

try:
_logger.info(
f"Configuring OpenTelemetry tracing, endpoint={otel_endpoint}, protocol={otel_protocol}, service_name={service_name}"
)

_validate_otel_config(otel_endpoint, otel_protocol)

if otel_protocol == _OTEL_PROTOCOL_GRPC:
otel_exporter = otel_grpc_trace_exporter.OTLPSpanExporter(
endpoint=otel_endpoint
)
else:
otel_exporter = otel_http_trace_exporter.OTLPSpanExporter(
endpoint=otel_endpoint
)

resource = otel_resources.Resource(
attributes={otel_resources.SERVICE_NAME: service_name}
)
tracer_provider = otel_trace.TracerProvider(resource=resource)
span_processor = otel_trace_export.BatchSpanProcessor(otel_exporter)
tracer_provider.add_span_processor(span_processor)
trace.set_tracer_provider(tracer_provider)

# FastAPI auto-instrumentation docs:
# https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/fastapi/fastapi.html
otel_fastapi.FastAPIInstrumentor.instrument_app(app)

_logger.info(f"OpenTelemetry tracing configured successfully.")

except Exception as e:
_logger.exception(f"Failed to configure OpenTelemetry tracing: {e}")


def _validate_otel_config(otel_endpoint: str, otel_protocol: str) -> None:
"""Validate OTel configuration. Raises ValueError if invalid."""
if not otel_endpoint.startswith(("http://", "https://")):
raise ValueError(
f"Invalid OTel endpoint format: {otel_endpoint}. Expected format: http://<host>:<port> or https://<host>:<port>"
)
if otel_protocol not in _OTEL_PROTOCOLS:
raise ValueError(
f"Invalid OTel protocol: {otel_protocol}. Expected values: {_OTEL_PROTOCOL_GRPC}, {_OTEL_PROTOCOL_HTTP}"
)
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ dependencies = [
"cloud-pipelines>=0.23.2.4",
"fastapi[standard]>=0.115.12",
"kubernetes>=33.1.0",
"opentelemetry-api>=1.28.2",
"opentelemetry-exporter-otlp-proto-grpc>=1.39.1",
"opentelemetry-exporter-otlp-proto-http>=1.39.1",
"opentelemetry-instrumentation-fastapi>=0.60b1",
"opentelemetry-sdk>=1.39.1",
"requests<2.32.0",
"sqlalchemy>=2.0.41",
]
Expand Down
4 changes: 4 additions & 0 deletions start_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def run_orchestrator(
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.instrumentation import api_tracing
from cloud_pipelines_backend.instrumentation import contextual_logging
from cloud_pipelines_backend.instrumentation import otel_tracing


@contextlib.asynccontextmanager
Expand All @@ -242,6 +243,9 @@ async def lifespan(app: fastapi.FastAPI):
lifespan=lifespan,
)

# Configure OpenTelemetry tracing
otel_tracing.setup_api_tracing(app)

# Add request context middleware for automatic request_id generation
app.add_middleware(api_tracing.RequestContextMiddleware)

Expand Down
Loading