
Commit 46f0c19

Add Langfuse observability to Unified API (#457)
* Add Langfuse observability to LLM execution methods
* Enhance observability decorator to validate Langfuse credentials before execution
* remove trace metadata
* precommit
* remove creds check
1 parent: 63db354

File tree

2 files changed (+123, -2):

backend/app/core/langfuse/langfuse.py
backend/app/services/llm/jobs.py

backend/app/core/langfuse/langfuse.py

Lines changed: 102 additions & 1 deletion
```diff
@@ -1,10 +1,12 @@
 import uuid
 import logging
-from typing import Any, Dict, Optional
+from typing import Any, Callable, Dict, Optional
+from functools import wraps
 
 from asgi_correlation_id import correlation_id
 from langfuse import Langfuse
 from langfuse.client import StatefulGenerationClient, StatefulTraceClient
+from app.models.llm import CompletionConfig, QueryParams, LLMCallResponse
 
 logger = logging.getLogger(__name__)
 
@@ -107,3 +109,102 @@ def log_error(self, error_message: str, response_id: Optional[str] = None):
 
     def flush(self):
         self.langfuse.flush()
+
+
+def observe_llm_execution(
+    session_id: str | None = None,
+    credentials: dict | None = None,
+):
+    """Decorator to add Langfuse observability to LLM provider execute methods.
+
+    Args:
+        credentials: Langfuse credentials with public_key, secret_key, and host
+        session_id: Session ID for grouping traces (conversation_id)
+
+    Usage:
+        decorated_execute = observe_llm_execution(
+            credentials=langfuse_creds,
+            session_id=conversation_id
+        )(provider_instance.execute)
+    """
+
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        def wrapper(completion_config: CompletionConfig, query: QueryParams, **kwargs):
+            # Skip observability if no credentials provided
+            if not credentials:
+                logger.info("[Langfuse] No credentials - skipping observability")
+                return func(completion_config, query, **kwargs)
+
+            try:
+                langfuse = Langfuse(
+                    public_key=credentials.get("public_key"),
+                    secret_key=credentials.get("secret_key"),
+                    host=credentials.get("host"),
+                )
+            except Exception as e:
+                logger.warning(f"[Langfuse] Failed to initialize client: {e}")
+                return func(completion_config, query, **kwargs)
+
+            trace = langfuse.trace(
+                name="unified-llm-call",
+                input=query.input,
+                tags=[completion_config.provider],
+            )
+
+            generation = trace.generation(
+                name=f"{completion_config.provider}-completion",
+                input=query.input,
+                model=completion_config.params.get("model"),
+            )
+
+            try:
+                # Execute the actual LLM call
+                response: LLMCallResponse | None
+                error: str | None
+                response, error = func(completion_config, query, **kwargs)
+
+                if response:
+                    generation.end(
+                        output={
+                            "status": "success",
+                            "output": response.response.output.text,
+                        },
+                        usage_details={
+                            "input": response.usage.input_tokens,
+                            "output": response.usage.output_tokens,
+                        },
+                        model=response.response.model,
+                    )
+
+                    trace.update(
+                        output={
+                            "status": "success",
+                            "output": response.response.output.text,
+                        },
+                        session_id=session_id or response.response.conversation_id,
+                    )
+                else:
+                    error_msg = error or "Unknown error"
+                    generation.end(output={"error": error_msg})
+                    trace.update(
+                        output={"status": "failure", "error": error_msg},
+                        session_id=session_id,
+                    )
+
+                langfuse.flush()
+                return response, error
+
+            except Exception as e:
+                error_msg = str(e)
+                generation.end(output={"error": error_msg})
+                trace.update(
+                    output={"status": "failure", "error": error_msg},
+                    session_id=session_id,
+                )
+                langfuse.flush()
+                raise
+
+        return wrapper
+
+    return decorator
```
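The decorator preserves the provider's `(response, error)` return contract and degrades gracefully: with no credentials it logs and calls straight through, so the wrapped method runs untraced rather than failing. A minimal sketch of that pass-through path — the stub config and query objects are illustrative stand-ins for the real `CompletionConfig` and `QueryParams` models, and `fake_execute` is a hypothetical provider method, not code from this commit:

```python
from types import SimpleNamespace

from app.core.langfuse.langfuse import observe_llm_execution


def fake_execute(completion_config, query, **kwargs):
    # Stand-in for a provider's execute method: returns (response, error).
    return None, "provider not configured"


# Illustrative stand-ins for CompletionConfig and QueryParams; only the
# attributes the wrapper reads (provider, params, input) are modeled.
config = SimpleNamespace(provider="openai", params={"model": "gpt-4o"})
query = SimpleNamespace(input="Hello, world")

# credentials=None takes the early-return branch: the call executes
# untraced and the (response, error) tuple comes back unchanged.
traced = observe_llm_execution(session_id=None, credentials=None)(fake_execute)
response, error = traced(config, query)
assert response is None and error == "provider not configured"
```

Because the exception handler records the error and then re-raises, callers' existing error handling is unaffected by the tracing layer.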

backend/app/services/llm/jobs.py

Lines changed: 21 additions & 1 deletion
```diff
@@ -7,11 +7,13 @@
 
 from app.core.db import engine
 from app.crud.config import ConfigVersionCrud
+from app.crud.credentials import get_provider_credential
 from app.crud.jobs import JobCrud
 from app.models import JobStatus, JobType, JobUpdate, LLMCallRequest
 from app.models.llm.request import ConfigBlob, LLMCallConfig
 from app.utils import APIResponse, send_callback
 from app.celery.utils import start_high_priority_job
+from app.core.langfuse.langfuse import observe_llm_execution
 from app.services.llm.providers.registry import get_llm_provider
 
 
@@ -182,7 +184,25 @@ def execute_job(
         )
         return handle_job_error(job_id, request.callback_url, callback_response)
 
-    response, error = provider_instance.execute(
+    langfuse_credentials = get_provider_credential(
+        session=session,
+        org_id=organization_id,
+        project_id=project_id,
+        provider="langfuse",
+    )
+
+    # Extract conversation_id for langfuse session grouping
+    conversation_id = None
+    if request.query.conversation and request.query.conversation.id:
+        conversation_id = request.query.conversation.id
+
+    # Apply Langfuse observability decorator to provider execute method
+    decorated_execute = observe_llm_execution(
+        credentials=langfuse_credentials,
+        session_id=conversation_id,
+    )(provider_instance.execute)
+
+    response, error = decorated_execute(
         completion_config=config_blob.completion,
         query=request.query,
         include_provider_raw_response=request.include_provider_raw_response,
```
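The job path looks up the stored `langfuse` credential per organization and project; if none exists, the decorator's no-credentials branch leaves the call untraced. Per the decorator's docstring, the record must carry `public_key`, `secret_key`, and `host`. A hypothetical example of the shape the wrapper consumes — the actual storage format is defined by `get_provider_credential`, and these values are placeholders:

```python
# Hypothetical credential record for provider="langfuse"; the decorator
# only reads these three keys via .get(), so any extra fields are ignored.
langfuse_credentials = {
    "public_key": "pk-lf-...",             # Langfuse project public key
    "secret_key": "sk-lf-...",             # Langfuse project secret key
    "host": "https://cloud.langfuse.com",  # or a self-hosted deployment URL
}
```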
