From d990add80ea856e16a8d7a630c57cdd486f518c0 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Mon, 15 Dec 2025 20:14:18 +0530 Subject: [PATCH 1/9] first stab at refactoring --- backend/app/api/routes/evaluation.py | 500 ++---------------- backend/app/crud/evaluations/__init__.py | 4 + backend/app/services/evaluation/__init__.py | 32 ++ backend/app/services/evaluation/dataset.py | 163 ++++++ backend/app/services/evaluation/evaluation.py | 329 ++++++++++++ backend/app/services/evaluation/validators.py | 174 ++++++ .../app/tests/api/routes/test_evaluation.py | 38 +- 7 files changed, 754 insertions(+), 486 deletions(-) create mode 100644 backend/app/services/evaluation/__init__.py create mode 100644 backend/app/services/evaluation/dataset.py create mode 100644 backend/app/services/evaluation/evaluation.py create mode 100644 backend/app/services/evaluation/validators.py diff --git a/backend/app/api/routes/evaluation.py b/backend/app/api/routes/evaluation.py index 058950d6..fa35c84f 100644 --- a/backend/app/api/routes/evaluation.py +++ b/backend/app/api/routes/evaluation.py @@ -1,50 +1,33 @@ -import csv -import io +"""Evaluation API routes.""" + import logging -import re -from pathlib import Path from fastapi import APIRouter, Body, File, Form, HTTPException, Query, UploadFile from app.api.deps import AuthContextDep, SessionDep -from app.core.cloud import get_cloud_storage -from app.crud.assistants import get_assistant_by_id from app.crud.evaluations import ( - create_evaluation_dataset, - create_evaluation_run, get_dataset_by_id, - get_evaluation_run_by_id, list_datasets, - start_evaluation_batch, - upload_csv_to_object_store, - upload_dataset_to_langfuse, ) from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud -from app.crud.evaluations.core import save_score from app.crud.evaluations.dataset import delete_dataset as delete_dataset_crud -from app.crud.evaluations.langfuse import fetch_trace_scores_from_langfuse from app.models.evaluation import ( DatasetUploadResponse, EvaluationRunPublic, ) +from app.services.evaluation import ( + get_evaluation_with_scores, + start_evaluation, + upload_dataset, + validate_csv_file, +) from app.utils import ( APIResponse, - get_langfuse_client, - get_openai_client, load_description, ) logger = logging.getLogger(__name__) -# File upload security constants -MAX_FILE_SIZE = 1024 * 1024 # 1 MB -ALLOWED_EXTENSIONS = {".csv"} -ALLOWED_MIME_TYPES = { - "text/csv", - "application/csv", - "text/plain", # Some systems report CSV as text/plain -} - router = APIRouter(tags=["evaluation"]) @@ -61,60 +44,12 @@ def _dataset_to_response(dataset) -> DatasetUploadResponse: ) -def sanitize_dataset_name(name: str) -> str: - """ - Sanitize dataset name for Langfuse compatibility. - - Langfuse has issues with spaces and special characters in dataset names. - This function ensures the name can be both created and fetched. - - Rules: - - Replace spaces with underscores - - Replace hyphens with underscores - - Keep only alphanumeric characters and underscores - - Convert to lowercase for consistency - - Remove leading/trailing underscores - - Collapse multiple consecutive underscores into one - - Args: - name: Original dataset name - - Returns: - Sanitized dataset name safe for Langfuse - - Examples: - "testing 0001" -> "testing_0001" - "My Dataset!" 
-> "my_dataset" - "Test--Data__Set" -> "test_data_set" - """ - # Convert to lowercase - sanitized = name.lower() - - # Replace spaces and hyphens with underscores - sanitized = sanitized.replace(" ", "_").replace("-", "_") - - # Keep only alphanumeric characters and underscores - sanitized = re.sub(r"[^a-z0-9_]", "", sanitized) - - # Collapse multiple underscores into one - sanitized = re.sub(r"_+", "_", sanitized) - - # Remove leading/trailing underscores - sanitized = sanitized.strip("_") - - # Ensure name is not empty - if not sanitized: - raise ValueError("Dataset name cannot be empty after sanitization") - - return sanitized - - @router.post( "/evaluations/datasets", description=load_description("evaluation/upload_dataset.md"), response_model=APIResponse[DatasetUploadResponse], ) -async def upload_dataset( +async def upload_dataset_endpoint( _session: SessionDep, auth_context: AuthContextDep, file: UploadFile = File( @@ -129,198 +64,22 @@ async def upload_dataset( description="Number of times to duplicate each item (min: 1, max: 5)", ), ) -> APIResponse[DatasetUploadResponse]: - # Sanitize dataset name for Langfuse compatibility - original_name = dataset_name - try: - dataset_name = sanitize_dataset_name(dataset_name) - except ValueError as e: - raise HTTPException(status_code=422, detail=f"Invalid dataset name: {str(e)}") - - if original_name != dataset_name: - logger.info( - f"[upload_dataset] Dataset name sanitized | '{original_name}' -> '{dataset_name}'" - ) - - logger.info( - f"[upload_dataset] Uploading dataset | dataset={dataset_name} | " - f"duplication_factor={duplication_factor} | org_id={auth_context.organization.id} | " - f"project_id={auth_context.project.id}" - ) + """Upload an evaluation dataset.""" + # Validate and read CSV file + csv_content = await validate_csv_file(file) - # Security validation: Check file extension - file_ext = Path(file.filename).suffix.lower() - if file_ext not in ALLOWED_EXTENSIONS: - raise HTTPException( - status_code=422, - detail=f"Invalid file type. Only CSV files are allowed. Got: {file_ext}", - ) - - # Security validation: Check MIME type - content_type = file.content_type - if content_type not in ALLOWED_MIME_TYPES: - raise HTTPException( - status_code=422, - detail=f"Invalid content type. Expected CSV, got: {content_type}", - ) - - # Security validation: Check file size - file.file.seek(0, 2) # Seek to end - file_size = file.file.tell() - file.file.seek(0) # Reset to beginning - - if file_size > MAX_FILE_SIZE: - raise HTTPException( - status_code=413, - detail=f"File too large. 
Maximum size: {MAX_FILE_SIZE / (1024 * 1024):.0f}MB", - ) - - if file_size == 0: - raise HTTPException(status_code=422, detail="Empty file uploaded") - - # Read CSV content - csv_content = await file.read() - - # Step 1: Parse and validate CSV - try: - csv_text = csv_content.decode("utf-8") - csv_reader = csv.DictReader(io.StringIO(csv_text)) - - if not csv_reader.fieldnames: - raise HTTPException(status_code=422, detail="CSV file has no headers") - - # Normalize headers for case-insensitive matching - clean_headers = { - field.strip().lower(): field for field in csv_reader.fieldnames - } - - # Validate required headers (case-insensitive) - if "question" not in clean_headers or "answer" not in clean_headers: - raise HTTPException( - status_code=422, - detail=f"CSV must contain 'question' and 'answer' columns " - f"Found columns: {csv_reader.fieldnames}", - ) - - # Get the actual column names from the CSV - question_col = clean_headers["question"] - answer_col = clean_headers["answer"] - - # Count original items - original_items = [] - for row in csv_reader: - question = row.get(question_col, "").strip() - answer = row.get(answer_col, "").strip() - if question and answer: - original_items.append({"question": question, "answer": answer}) - - if not original_items: - raise HTTPException( - status_code=422, detail="No valid items found in CSV file" - ) - - original_items_count = len(original_items) - total_items_count = original_items_count * duplication_factor - - logger.info( - f"[upload_dataset] Parsed items from CSV | original={original_items_count} | " - f"total_with_duplication={total_items_count}" - ) - - except Exception as e: - logger.error(f"[upload_dataset] Failed to parse CSV | {e}", exc_info=True) - raise HTTPException(status_code=422, detail=f"Invalid CSV file: {e}") - - # Step 2: Upload to object store (if credentials configured) - object_store_url = None - try: - storage = get_cloud_storage( - session=_session, project_id=auth_context.project.id - ) - object_store_url = upload_csv_to_object_store( - storage=storage, csv_content=csv_content, dataset_name=dataset_name - ) - if object_store_url: - logger.info( - f"[upload_dataset] Successfully uploaded CSV to object store | {object_store_url}" - ) - else: - logger.info( - "[upload_dataset] Object store upload returned None | continuing without object store storage" - ) - except Exception as e: - logger.warning( - f"[upload_dataset] Failed to upload CSV to object store (continuing without object store) | {e}", - exc_info=True, - ) - object_store_url = None - - # Step 3: Upload to Langfuse - langfuse_dataset_id = None - try: - # Get Langfuse client - langfuse = get_langfuse_client( - session=_session, - org_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - # Upload to Langfuse - langfuse_dataset_id, _ = upload_dataset_to_langfuse( - langfuse=langfuse, - items=original_items, - dataset_name=dataset_name, - duplication_factor=duplication_factor, - ) - - logger.info( - f"[upload_dataset] Successfully uploaded dataset to Langfuse | " - f"dataset={dataset_name} | id={langfuse_dataset_id}" - ) - - except Exception as e: - logger.error( - f"[upload_dataset] Failed to upload dataset to Langfuse | {e}", - exc_info=True, - ) - raise HTTPException( - status_code=500, detail=f"Failed to upload dataset to Langfuse: {e}" - ) - - # Step 4: Store metadata in database - metadata = { - "original_items_count": original_items_count, - "total_items_count": total_items_count, - "duplication_factor": duplication_factor, - } - 
- dataset = create_evaluation_dataset( + # Upload dataset using service + dataset = upload_dataset( session=_session, - name=dataset_name, + csv_content=csv_content, + dataset_name=dataset_name, description=description, - dataset_metadata=metadata, - object_store_url=object_store_url, - langfuse_dataset_id=langfuse_dataset_id, + duplication_factor=duplication_factor, organization_id=auth_context.organization.id, project_id=auth_context.project.id, ) - logger.info( - f"[upload_dataset] Successfully created dataset record in database | " - f"id={dataset.id} | name={dataset_name}" - ) - - # Return response - return APIResponse.success_response( - data=DatasetUploadResponse( - dataset_id=dataset.id, - dataset_name=dataset_name, - total_items=total_items_count, - original_items=original_items_count, - duplication_factor=duplication_factor, - langfuse_dataset_id=langfuse_dataset_id, - object_store_url=object_store_url, - ) - ) + return APIResponse.success_response(data=_dataset_to_response(dataset)) @router.get( @@ -334,6 +93,7 @@ def list_datasets_endpoint( limit: int = 50, offset: int = 0, ) -> APIResponse[list[DatasetUploadResponse]]: + """List evaluation datasets.""" # Enforce maximum limit if limit > 100: limit = 100 @@ -361,6 +121,7 @@ def get_dataset( _session: SessionDep, auth_context: AuthContextDep, ) -> APIResponse[DatasetUploadResponse]: + """Get a specific evaluation dataset.""" logger.info( f"[get_dataset] Fetching dataset | id={dataset_id} | " f"org_id={auth_context.organization.id} | " @@ -392,6 +153,7 @@ def delete_dataset( _session: SessionDep, auth_context: AuthContextDep, ) -> APIResponse[dict]: + """Delete an evaluation dataset.""" logger.info( f"[delete_dataset] Deleting dataset | id={dataset_id} | " f"org_id={auth_context.organization.id} | " @@ -406,7 +168,6 @@ def delete_dataset( ) if not success: - # Check if it's a not found error or other error type if "not found" in message.lower(): raise HTTPException(status_code=404, detail=message) else: @@ -436,143 +197,18 @@ def evaluate( None, description="Optional assistant ID to fetch configuration from" ), ) -> APIResponse[EvaluationRunPublic]: - logger.info( - f"[evaluate] Starting evaluation | experiment_name={experiment_name} | " - f"dataset_id={dataset_id} | " - f"org_id={auth_context.organization.id} | " - f"assistant_id={assistant_id} | " - f"config_keys={list(config.keys())}" - ) - - # Step 1: Fetch dataset from database - dataset = get_dataset_by_id( + """Start an evaluation run.""" + eval_run = start_evaluation( session=_session, dataset_id=dataset_id, - organization_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - if not dataset: - raise HTTPException( - status_code=404, - detail=f"Dataset {dataset_id} not found or not accessible to this " - f"organization/project", - ) - - logger.info( - f"[evaluate] Found dataset | id={dataset.id} | name={dataset.name} | " - f"object_store_url={'present' if dataset.object_store_url else 'None'} | " - f"langfuse_id={dataset.langfuse_dataset_id}" - ) - - dataset_name = dataset.name - - # Get API clients - openai_client = get_openai_client( - session=_session, - org_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - langfuse = get_langfuse_client( - session=_session, - org_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - # Validate dataset has Langfuse ID (should have been set during dataset creation) - if not dataset.langfuse_dataset_id: - raise HTTPException( - status_code=400, - detail=f"Dataset 
{dataset_id} does not have a Langfuse dataset ID. " - "Please ensure Langfuse credentials were configured when the dataset was created.", - ) - - # Handle assistant_id if provided - if assistant_id: - # Fetch assistant details from database - assistant = get_assistant_by_id( - session=_session, - assistant_id=assistant_id, - project_id=auth_context.project.id, - ) - - if not assistant: - raise HTTPException( - status_code=404, detail=f"Assistant {assistant_id} not found" - ) - - logger.info( - f"[evaluate] Found assistant in DB | id={assistant.id} | " - f"model={assistant.model} | instructions=" - f"{assistant.instructions[:50] if assistant.instructions else 'None'}..." - ) - - # Build config from assistant (use provided config values to override - # if present) - config = { - "model": config.get("model", assistant.model), - "instructions": config.get("instructions", assistant.instructions), - "temperature": config.get("temperature", assistant.temperature), - } - - # Add tools if vector stores are available - vector_store_ids = config.get( - "vector_store_ids", assistant.vector_store_ids or [] - ) - if vector_store_ids and len(vector_store_ids) > 0: - config["tools"] = [ - { - "type": "file_search", - "vector_store_ids": vector_store_ids, - } - ] - - logger.info("[evaluate] Using config from assistant") - else: - logger.info("[evaluate] Using provided config directly") - # Validate that config has minimum required fields - if not config.get("model"): - raise HTTPException( - status_code=400, - detail="Config must include 'model' when assistant_id is not provided", - ) - - # Create EvaluationRun record - eval_run = create_evaluation_run( - session=_session, - run_name=experiment_name, - dataset_name=dataset_name, - dataset_id=dataset_id, + experiment_name=experiment_name, config=config, + assistant_id=assistant_id, organization_id=auth_context.organization.id, project_id=auth_context.project.id, ) - # Start the batch evaluation - try: - eval_run = start_evaluation_batch( - langfuse=langfuse, - openai_client=openai_client, - session=_session, - eval_run=eval_run, - config=config, - ) - - logger.info( - f"[evaluate] Evaluation started successfully | " - f"batch_job_id={eval_run.batch_job_id} | total_items={eval_run.total_items}" - ) - - return APIResponse.success_response(data=eval_run) - - except Exception as e: - logger.error( - f"[evaluate] Failed to start evaluation | run_id={eval_run.id} | {e}", - exc_info=True, - ) - # Error is already handled in start_evaluation_batch - _session.refresh(eval_run) - return APIResponse.success_response(data=eval_run) + return APIResponse.success_response(data=eval_run) @router.get( @@ -586,6 +222,7 @@ def list_evaluation_runs( limit: int = 50, offset: int = 0, ) -> APIResponse[list[EvaluationRunPublic]]: + """List evaluation runs.""" logger.info( f"[list_evaluation_runs] Listing evaluation runs | " f"org_id={auth_context.organization.id} | " @@ -629,26 +266,20 @@ def get_evaluation_run_status( ), ), ) -> APIResponse[EvaluationRunPublic]: - logger.info( - f"[get_evaluation_run_status] Fetching status for evaluation run | " - f"evaluation_id={evaluation_id} | " - f"org_id={auth_context.organization.id} | " - f"project_id={auth_context.project.id} | " - f"get_trace_info={get_trace_info} | " - f"resync_score={resync_score}" - ) - + """Get evaluation run status with optional trace info.""" if resync_score and not get_trace_info: raise HTTPException( status_code=400, detail="resync_score=true requires get_trace_info=true", ) - eval_run = 
get_evaluation_run_by_id( + eval_run, error = get_evaluation_with_scores( session=_session, evaluation_id=evaluation_id, organization_id=auth_context.organization.id, project_id=auth_context.project.id, + get_trace_info=get_trace_info, + resync_score=resync_score, ) if not eval_run: @@ -660,72 +291,7 @@ def get_evaluation_run_status( ), ) - if get_trace_info: - # Only fetch trace info for completed evaluations - if eval_run.status != "completed": - return APIResponse.failure_response( - error=f"Trace info is only available for completed evaluations. " - f"Current status: {eval_run.status}", - data=eval_run, - ) - - # Check if we already have cached scores (before any slow operations) - has_cached_score = eval_run.score is not None and "traces" in eval_run.score - if not resync_score and has_cached_score: - return APIResponse.success_response(data=eval_run) - - # Get Langfuse client (needs session for credentials lookup) - langfuse = get_langfuse_client( - session=_session, - org_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - # Capture data needed for Langfuse fetch and DB update - dataset_name = eval_run.dataset_name - run_name = eval_run.run_name - eval_run_id = eval_run.id - org_id = auth_context.organization.id - project_id = auth_context.project.id - - # Session is no longer needed - slow Langfuse API calls happen here - # without holding the DB connection - try: - score = fetch_trace_scores_from_langfuse( - langfuse=langfuse, - dataset_name=dataset_name, - run_name=run_name, - ) - except ValueError as e: - # Run not found in Langfuse - return eval_run with error - logger.warning( - f"[get_evaluation_run_status] Run not found in Langfuse | " - f"evaluation_id={evaluation_id} | error={e}" - ) - return APIResponse.failure_response(error=str(e), data=eval_run) - except Exception as e: - logger.error( - f"[get_evaluation_run_status] Failed to fetch trace info | " - f"evaluation_id={evaluation_id} | error={e}", - exc_info=True, - ) - return APIResponse.failure_response( - error=f"Failed to fetch trace info from Langfuse: {str(e)}", - data=eval_run, - ) - - # Open new session just for the score commit - eval_run = save_score( - eval_run_id=eval_run_id, - organization_id=org_id, - project_id=project_id, - score=score, - ) - - if not eval_run: - raise HTTPException( - status_code=404, - detail=f"Evaluation run {evaluation_id} not found after score update", - ) + if error: + return APIResponse.failure_response(error=error, data=eval_run) return APIResponse.success_response(data=eval_run) diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index 5ca0aacd..4e797e9f 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -5,6 +5,7 @@ create_evaluation_run, get_evaluation_run_by_id, list_evaluation_runs, + save_score, ) from app.crud.evaluations.cron import ( process_all_pending_evaluations, @@ -24,6 +25,7 @@ ) from app.crud.evaluations.langfuse import ( create_langfuse_dataset_run, + fetch_trace_scores_from_langfuse, update_traces_with_cosine_scores, upload_dataset_to_langfuse, ) @@ -39,6 +41,7 @@ "create_evaluation_run", "get_evaluation_run_by_id", "list_evaluation_runs", + "save_score", # Cron "process_all_pending_evaluations", "process_all_pending_evaluations_sync", @@ -61,6 +64,7 @@ "start_embedding_batch", # Langfuse "create_langfuse_dataset_run", + "fetch_trace_scores_from_langfuse", "update_traces_with_cosine_scores", "upload_dataset_to_langfuse", ] diff --git 
a/backend/app/services/evaluation/__init__.py b/backend/app/services/evaluation/__init__.py new file mode 100644 index 00000000..6e3aac41 --- /dev/null +++ b/backend/app/services/evaluation/__init__.py @@ -0,0 +1,32 @@ +"""Evaluation services.""" + +from app.services.evaluation.dataset import upload_dataset +from app.services.evaluation.evaluation import ( + build_evaluation_config, + get_evaluation_with_scores, + start_evaluation, +) +from app.services.evaluation.validators import ( + ALLOWED_EXTENSIONS, + ALLOWED_MIME_TYPES, + MAX_FILE_SIZE, + parse_csv_items, + sanitize_dataset_name, + validate_csv_file, +) + +__all__ = [ + # Dataset + "upload_dataset", + # Evaluation + "build_evaluation_config", + "get_evaluation_with_scores", + "start_evaluation", + # Validators + "ALLOWED_EXTENSIONS", + "ALLOWED_MIME_TYPES", + "MAX_FILE_SIZE", + "parse_csv_items", + "sanitize_dataset_name", + "validate_csv_file", +] diff --git a/backend/app/services/evaluation/dataset.py b/backend/app/services/evaluation/dataset.py new file mode 100644 index 00000000..5ac9470e --- /dev/null +++ b/backend/app/services/evaluation/dataset.py @@ -0,0 +1,163 @@ +"""Dataset management service for evaluations.""" + +import logging + +from fastapi import HTTPException +from sqlmodel import Session + +from app.core.cloud import get_cloud_storage +from app.crud.evaluations import ( + create_evaluation_dataset, + upload_csv_to_object_store, + upload_dataset_to_langfuse, +) +from app.models.evaluation import EvaluationDataset +from app.services.evaluation.validators import ( + parse_csv_items, + sanitize_dataset_name, +) +from app.utils import get_langfuse_client + +logger = logging.getLogger(__name__) + + +def upload_dataset( + session: Session, + csv_content: bytes, + dataset_name: str, + description: str | None, + duplication_factor: int, + organization_id: int, + project_id: int, +) -> EvaluationDataset: + """ + Orchestrate dataset upload workflow. + + Steps: + 1. Sanitize dataset name + 2. Parse and validate CSV + 3. Upload to object store + 4. Upload to Langfuse + 5. 
Store metadata in database + + Args: + session: Database session + csv_content: Raw CSV file content + dataset_name: Name for the dataset + description: Optional dataset description + duplication_factor: Number of times to duplicate each item + organization_id: Organization ID + project_id: Project ID + + Returns: + Created EvaluationDataset record + + Raises: + HTTPException: If upload fails at any step + """ + # Step 1: Sanitize dataset name for Langfuse compatibility + original_name = dataset_name + try: + dataset_name = sanitize_dataset_name(dataset_name) + except ValueError as e: + raise HTTPException(status_code=422, detail=f"Invalid dataset name: {str(e)}") + + if original_name != dataset_name: + logger.info( + f"[upload_dataset] Dataset name sanitized | '{original_name}' -> '{dataset_name}'" + ) + + logger.info( + f"[upload_dataset] Uploading dataset | dataset={dataset_name} | " + f"duplication_factor={duplication_factor} | org_id={organization_id} | " + f"project_id={project_id}" + ) + + # Step 2: Parse CSV and extract items + original_items = parse_csv_items(csv_content) + original_items_count = len(original_items) + total_items_count = original_items_count * duplication_factor + + logger.info( + f"[upload_dataset] Parsed items from CSV | original={original_items_count} | " + f"total_with_duplication={total_items_count}" + ) + + # Step 3: Upload to object store (if credentials configured) + object_store_url = None + try: + storage = get_cloud_storage(session=session, project_id=project_id) + object_store_url = upload_csv_to_object_store( + storage=storage, csv_content=csv_content, dataset_name=dataset_name + ) + if object_store_url: + logger.info( + f"[upload_dataset] Successfully uploaded CSV to object store | {object_store_url}" + ) + else: + logger.info( + "[upload_dataset] Object store upload returned None | " + "continuing without object store storage" + ) + except Exception as e: + logger.warning( + f"[upload_dataset] Failed to upload CSV to object store " + f"(continuing without object store) | {e}", + exc_info=True, + ) + object_store_url = None + + # Step 4: Upload to Langfuse + langfuse_dataset_id = None + try: + langfuse = get_langfuse_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + + langfuse_dataset_id, _ = upload_dataset_to_langfuse( + langfuse=langfuse, + items=original_items, + dataset_name=dataset_name, + duplication_factor=duplication_factor, + ) + + logger.info( + f"[upload_dataset] Successfully uploaded dataset to Langfuse | " + f"dataset={dataset_name} | id={langfuse_dataset_id}" + ) + + except Exception as e: + logger.error( + f"[upload_dataset] Failed to upload dataset to Langfuse | {e}", + exc_info=True, + ) + raise HTTPException( + status_code=500, detail=f"Failed to upload dataset to Langfuse: {e}" + ) + + # Step 5: Store metadata in database + metadata = { + "original_items_count": original_items_count, + "total_items_count": total_items_count, + "duplication_factor": duplication_factor, + } + + dataset = create_evaluation_dataset( + session=session, + name=dataset_name, + description=description, + dataset_metadata=metadata, + object_store_url=object_store_url, + langfuse_dataset_id=langfuse_dataset_id, + organization_id=organization_id, + project_id=project_id, + ) + + logger.info( + f"[upload_dataset] Successfully created dataset record in database | " + f"id={dataset.id} | name={dataset_name}" + ) + + return dataset diff --git a/backend/app/services/evaluation/evaluation.py 
b/backend/app/services/evaluation/evaluation.py new file mode 100644 index 00000000..ea091538 --- /dev/null +++ b/backend/app/services/evaluation/evaluation.py @@ -0,0 +1,329 @@ +"""Evaluation run orchestration service.""" + +import logging + +from fastapi import HTTPException +from sqlmodel import Session + +from app.crud.assistants import get_assistant_by_id +from app.crud.evaluations import ( + create_evaluation_run, + fetch_trace_scores_from_langfuse, + get_dataset_by_id, + get_evaluation_run_by_id, + save_score, + start_evaluation_batch, +) +from app.models.evaluation import EvaluationRun +from app.utils import get_langfuse_client, get_openai_client + +logger = logging.getLogger(__name__) + + +def build_evaluation_config( + session: Session, + config: dict, + assistant_id: str | None, + project_id: int, +) -> dict: + """ + Build evaluation configuration from assistant or provided config. + + If assistant_id is provided, fetch assistant and merge with config. + Config values take precedence over assistant values. + + Args: + session: Database session + config: Provided configuration dict + assistant_id: Optional assistant ID to fetch configuration from + project_id: Project ID for assistant lookup + + Returns: + Complete evaluation configuration dict + + Raises: + HTTPException: If assistant not found or model missing + """ + if assistant_id: + # Fetch assistant details from database + assistant = get_assistant_by_id( + session=session, + assistant_id=assistant_id, + project_id=project_id, + ) + + if not assistant: + raise HTTPException( + status_code=404, detail=f"Assistant {assistant_id} not found" + ) + + logger.info( + f"[build_evaluation_config] Found assistant in DB | id={assistant.id} | " + f"model={assistant.model} | instructions=" + f"{assistant.instructions[:50] if assistant.instructions else 'None'}..." + ) + + # Build config from assistant (use provided config values to override if present) + merged_config = { + "model": config.get("model", assistant.model), + "instructions": config.get("instructions", assistant.instructions), + "temperature": config.get("temperature", assistant.temperature), + } + + # Add tools if vector stores are available + vector_store_ids = config.get( + "vector_store_ids", assistant.vector_store_ids or [] + ) + if vector_store_ids and len(vector_store_ids) > 0: + merged_config["tools"] = [ + { + "type": "file_search", + "vector_store_ids": vector_store_ids, + } + ] + + logger.info("[build_evaluation_config] Using config from assistant") + return merged_config + + # Using provided config directly + logger.info("[build_evaluation_config] Using provided config directly") + + # Validate that config has minimum required fields + if not config.get("model"): + raise HTTPException( + status_code=400, + detail="Config must include 'model' when assistant_id is not provided", + ) + + return config + + +def start_evaluation( + session: Session, + dataset_id: int, + experiment_name: str, + config: dict, + assistant_id: str | None, + organization_id: int, + project_id: int, +) -> EvaluationRun: + """ + Start an evaluation run. + + Steps: + 1. Validate dataset exists and has Langfuse ID + 2. Build config (from assistant or direct) + 3. Create evaluation run record + 4. 
Start batch processing + + Args: + session: Database session + dataset_id: ID of the evaluation dataset + experiment_name: Name for this evaluation experiment/run + config: Evaluation configuration + assistant_id: Optional assistant ID to fetch configuration from + organization_id: Organization ID + project_id: Project ID + + Returns: + EvaluationRun instance + + Raises: + HTTPException: If dataset not found or evaluation fails to start + """ + logger.info( + f"[start_evaluation] Starting evaluation | experiment_name={experiment_name} | " + f"dataset_id={dataset_id} | " + f"org_id={organization_id} | " + f"assistant_id={assistant_id} | " + f"config_keys={list(config.keys())}" + ) + + # Step 1: Fetch dataset from database + dataset = get_dataset_by_id( + session=session, + dataset_id=dataset_id, + organization_id=organization_id, + project_id=project_id, + ) + + if not dataset: + raise HTTPException( + status_code=404, + detail=f"Dataset {dataset_id} not found or not accessible to this " + f"organization/project", + ) + + logger.info( + f"[start_evaluation] Found dataset | id={dataset.id} | name={dataset.name} | " + f"object_store_url={'present' if dataset.object_store_url else 'None'} | " + f"langfuse_id={dataset.langfuse_dataset_id}" + ) + + # Validate dataset has Langfuse ID + if not dataset.langfuse_dataset_id: + raise HTTPException( + status_code=400, + detail=f"Dataset {dataset_id} does not have a Langfuse dataset ID. " + "Please ensure Langfuse credentials were configured when the dataset was created.", + ) + + # Step 2: Build evaluation config + eval_config = build_evaluation_config( + session=session, + config=config, + assistant_id=assistant_id, + project_id=project_id, + ) + + # Get API clients + openai_client = get_openai_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + langfuse = get_langfuse_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + + # Step 3: Create EvaluationRun record + eval_run = create_evaluation_run( + session=session, + run_name=experiment_name, + dataset_name=dataset.name, + dataset_id=dataset_id, + config=eval_config, + organization_id=organization_id, + project_id=project_id, + ) + + # Step 4: Start the batch evaluation + try: + eval_run = start_evaluation_batch( + langfuse=langfuse, + openai_client=openai_client, + session=session, + eval_run=eval_run, + config=eval_config, + ) + + logger.info( + f"[start_evaluation] Evaluation started successfully | " + f"batch_job_id={eval_run.batch_job_id} | total_items={eval_run.total_items}" + ) + + return eval_run + + except Exception as e: + logger.error( + f"[start_evaluation] Failed to start evaluation | run_id={eval_run.id} | {e}", + exc_info=True, + ) + # Error is already handled in start_evaluation_batch + session.refresh(eval_run) + return eval_run + + +def get_evaluation_with_scores( + session: Session, + evaluation_id: int, + organization_id: int, + project_id: int, + get_trace_info: bool, + resync_score: bool, +) -> tuple[EvaluationRun | None, str | None]: + """ + Get evaluation run, optionally with trace scores from Langfuse. + + Handles caching logic for trace scores - scores are fetched on first request + and cached in the database. 
+ + Args: + session: Database session + evaluation_id: ID of the evaluation run + organization_id: Organization ID + project_id: Project ID + get_trace_info: If true, fetch trace scores + resync_score: If true, clear cached scores and re-fetch + + Returns: + Tuple of (EvaluationRun or None, error_message or None) + """ + logger.info( + f"[get_evaluation_with_scores] Fetching status for evaluation run | " + f"evaluation_id={evaluation_id} | " + f"org_id={organization_id} | " + f"project_id={project_id} | " + f"get_trace_info={get_trace_info} | " + f"resync_score={resync_score}" + ) + + eval_run = get_evaluation_run_by_id( + session=session, + evaluation_id=evaluation_id, + organization_id=organization_id, + project_id=project_id, + ) + + if not eval_run: + return None, None + + if not get_trace_info: + return eval_run, None + + # Only fetch trace info for completed evaluations + if eval_run.status != "completed": + return eval_run, ( + f"Trace info is only available for completed evaluations. " + f"Current status: {eval_run.status}" + ) + + # Check if we already have cached scores + has_cached_score = eval_run.score is not None and "traces" in eval_run.score + if not resync_score and has_cached_score: + return eval_run, None + + # Get Langfuse client + langfuse = get_langfuse_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + + # Capture data needed for Langfuse fetch and DB update + dataset_name = eval_run.dataset_name + run_name = eval_run.run_name + eval_run_id = eval_run.id + + # Fetch scores from Langfuse + try: + score = fetch_trace_scores_from_langfuse( + langfuse=langfuse, + dataset_name=dataset_name, + run_name=run_name, + ) + except ValueError as e: + logger.warning( + f"[get_evaluation_with_scores] Run not found in Langfuse | " + f"evaluation_id={evaluation_id} | error={e}" + ) + return eval_run, str(e) + except Exception as e: + logger.error( + f"[get_evaluation_with_scores] Failed to fetch trace info | " + f"evaluation_id={evaluation_id} | error={e}", + exc_info=True, + ) + return eval_run, f"Failed to fetch trace info from Langfuse: {str(e)}" + + # Save score to database (uses its own session) + eval_run = save_score( + eval_run_id=eval_run_id, + organization_id=organization_id, + project_id=project_id, + score=score, + ) + + return eval_run, None diff --git a/backend/app/services/evaluation/validators.py b/backend/app/services/evaluation/validators.py new file mode 100644 index 00000000..e4adc0b2 --- /dev/null +++ b/backend/app/services/evaluation/validators.py @@ -0,0 +1,174 @@ +"""Validation utilities for evaluation datasets.""" + +import csv +import io +import logging +import re +from pathlib import Path + +from fastapi import HTTPException, UploadFile + +logger = logging.getLogger(__name__) + +# File upload security constants +MAX_FILE_SIZE = 1024 * 1024 # 1 MB +ALLOWED_EXTENSIONS = {".csv"} +ALLOWED_MIME_TYPES = { + "text/csv", + "application/csv", + "text/plain", # Some systems report CSV as text/plain +} + + +def sanitize_dataset_name(name: str) -> str: + """ + Sanitize dataset name for Langfuse compatibility. + + Langfuse has issues with spaces and special characters in dataset names. + This function ensures the name can be both created and fetched. 
+ + Rules: + - Replace spaces with underscores + - Replace hyphens with underscores + - Keep only alphanumeric characters and underscores + - Convert to lowercase for consistency + - Remove leading/trailing underscores + - Collapse multiple consecutive underscores into one + + Args: + name: Original dataset name + + Returns: + Sanitized dataset name safe for Langfuse + + Examples: + "testing 0001" -> "testing_0001" + "My Dataset!" -> "my_dataset" + "Test--Data__Set" -> "test_data_set" + """ + # Convert to lowercase + sanitized = name.lower() + + # Replace spaces and hyphens with underscores + sanitized = sanitized.replace(" ", "_").replace("-", "_") + + # Keep only alphanumeric characters and underscores + sanitized = re.sub(r"[^a-z0-9_]", "", sanitized) + + # Collapse multiple underscores into one + sanitized = re.sub(r"_+", "_", sanitized) + + # Remove leading/trailing underscores + sanitized = sanitized.strip("_") + + # Ensure name is not empty + if not sanitized: + raise ValueError("Dataset name cannot be empty after sanitization") + + return sanitized + + +async def validate_csv_file(file: UploadFile) -> bytes: + """ + Validate CSV file extension, MIME type, and size. + + Args: + file: The uploaded file + + Returns: + CSV content as bytes if valid + + Raises: + HTTPException: If validation fails + """ + # Security validation: Check file extension + file_ext = Path(file.filename).suffix.lower() + if file_ext not in ALLOWED_EXTENSIONS: + raise HTTPException( + status_code=422, + detail=f"Invalid file type. Only CSV files are allowed. Got: {file_ext}", + ) + + # Security validation: Check MIME type + content_type = file.content_type + if content_type not in ALLOWED_MIME_TYPES: + raise HTTPException( + status_code=422, + detail=f"Invalid content type. Expected CSV, got: {content_type}", + ) + + # Security validation: Check file size + file.file.seek(0, 2) # Seek to end + file_size = file.file.tell() + file.file.seek(0) # Reset to beginning + + if file_size > MAX_FILE_SIZE: + raise HTTPException( + status_code=413, + detail=f"File too large. Maximum size: {MAX_FILE_SIZE / (1024 * 1024):.0f}MB", + ) + + if file_size == 0: + raise HTTPException(status_code=422, detail="Empty file uploaded") + + # Read and return content + return await file.read() + + +def parse_csv_items(csv_content: bytes) -> list[dict[str, str]]: + """ + Parse CSV and extract question/answer pairs. 
+ + Args: + csv_content: CSV file content as bytes + + Returns: + List of dicts with 'question' and 'answer' keys + + Raises: + HTTPException: If CSV is invalid or empty + """ + try: + csv_text = csv_content.decode("utf-8") + csv_reader = csv.DictReader(io.StringIO(csv_text)) + + if not csv_reader.fieldnames: + raise HTTPException(status_code=422, detail="CSV file has no headers") + + # Normalize headers for case-insensitive matching + clean_headers = { + field.strip().lower(): field for field in csv_reader.fieldnames + } + + # Validate required headers (case-insensitive) + if "question" not in clean_headers or "answer" not in clean_headers: + raise HTTPException( + status_code=422, + detail=f"CSV must contain 'question' and 'answer' columns " + f"Found columns: {csv_reader.fieldnames}", + ) + + # Get the actual column names from the CSV + question_col = clean_headers["question"] + answer_col = clean_headers["answer"] + + # Extract items + items = [] + for row in csv_reader: + question = row.get(question_col, "").strip() + answer = row.get(answer_col, "").strip() + if question and answer: + items.append({"question": question, "answer": answer}) + + if not items: + raise HTTPException( + status_code=422, detail="No valid items found in CSV file" + ) + + return items + + except HTTPException: + raise + except Exception as e: + logger.error(f"[parse_csv_items] Failed to parse CSV | {e}", exc_info=True) + raise HTTPException(status_code=422, detail=f"Invalid CSV file: {e}") diff --git a/backend/app/tests/api/routes/test_evaluation.py b/backend/app/tests/api/routes/test_evaluation.py index c4eb3f0b..00aef2f5 100644 --- a/backend/app/tests/api/routes/test_evaluation.py +++ b/backend/app/tests/api/routes/test_evaluation.py @@ -54,13 +54,13 @@ def test_upload_dataset_valid_csv( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): # Mock object store upload @@ -140,13 +140,13 @@ def test_upload_dataset_empty_rows( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): # Mock object store and Langfuse uploads @@ -186,13 +186,13 @@ def test_upload_with_default_duplication( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): 
mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -227,13 +227,13 @@ def test_upload_with_custom_duplication( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -268,13 +268,13 @@ def test_upload_with_description( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -360,13 +360,13 @@ def test_upload_with_duplication_factor_boundary_minimum( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.api.routes.evaluation.get_langfuse_client" + "app.services.evaluation.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.api.routes.evaluation.upload_dataset_to_langfuse" + "app.services.evaluation.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -405,7 +405,7 @@ def test_upload_langfuse_configuration_fails( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.api.routes.evaluation.upload_csv_to_object_store" + "app.services.evaluation.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch("app.crud.credentials.get_provider_credential") as mock_get_cred, ): From 7ba9b4339e0d0e2e57210e2feef99b22526dc89f Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 18 Dec 2025 12:39:28 +0530 Subject: [PATCH 2/9] cleanups --- backend/app/crud/evaluations/__init__.py | 33 --------------------- backend/app/services/evaluation/__init__.py | 16 ---------- 2 files changed, 49 deletions(-) diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index 4e797e9f..bb095413 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -35,36 +35,3 @@ process_completed_embedding_batch, process_completed_evaluation, ) - -__all__ = [ - # Core - "create_evaluation_run", - "get_evaluation_run_by_id", - "list_evaluation_runs", - "save_score", - # Cron - "process_all_pending_evaluations", - "process_all_pending_evaluations_sync", - # Dataset - "create_evaluation_dataset", - "delete_dataset", - "get_dataset_by_id", - "list_datasets", - "upload_csv_to_object_store", - # Batch - "start_evaluation_batch", - # Processing - "check_and_process_evaluation", - "poll_all_pending_evaluations", - "process_completed_embedding_batch", - 
"process_completed_evaluation", - # Embeddings - "calculate_average_similarity", - "calculate_cosine_similarity", - "start_embedding_batch", - # Langfuse - "create_langfuse_dataset_run", - "fetch_trace_scores_from_langfuse", - "update_traces_with_cosine_scores", - "upload_dataset_to_langfuse", -] diff --git a/backend/app/services/evaluation/__init__.py b/backend/app/services/evaluation/__init__.py index 6e3aac41..8c5a3690 100644 --- a/backend/app/services/evaluation/__init__.py +++ b/backend/app/services/evaluation/__init__.py @@ -14,19 +14,3 @@ sanitize_dataset_name, validate_csv_file, ) - -__all__ = [ - # Dataset - "upload_dataset", - # Evaluation - "build_evaluation_config", - "get_evaluation_with_scores", - "start_evaluation", - # Validators - "ALLOWED_EXTENSIONS", - "ALLOWED_MIME_TYPES", - "MAX_FILE_SIZE", - "parse_csv_items", - "sanitize_dataset_name", - "validate_csv_file", -] From 49a2063d621ae236501ac60202574535f648f8c4 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 18 Dec 2025 12:52:55 +0530 Subject: [PATCH 3/9] run pre commit --- backend/app/api/routes/evaluation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/app/api/routes/evaluation.py b/backend/app/api/routes/evaluation.py index ce7f3f9a..fa35c84f 100644 --- a/backend/app/api/routes/evaluation.py +++ b/backend/app/api/routes/evaluation.py @@ -30,6 +30,7 @@ router = APIRouter(tags=["evaluation"]) + def _dataset_to_response(dataset) -> DatasetUploadResponse: """Convert a dataset model to a DatasetUploadResponse.""" return DatasetUploadResponse( From 698ccf8be3f7790cbb3c7f8d85aa757f0d76cd21 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 25 Dec 2025 10:50:58 +0530 Subject: [PATCH 4/9] added missing permission imports --- backend/app/api/routes/evaluation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/app/api/routes/evaluation.py b/backend/app/api/routes/evaluation.py index 002ba486..bd4130d6 100644 --- a/backend/app/api/routes/evaluation.py +++ b/backend/app/api/routes/evaluation.py @@ -24,6 +24,7 @@ DatasetUploadResponse, EvaluationRunPublic, ) +from app.api.permissions import Permission, require_permission from app.services.evaluation import ( get_evaluation_with_scores, start_evaluation, From 54c6754008158e677ef151953300f863a8b5fe04 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 25 Dec 2025 11:24:01 +0530 Subject: [PATCH 5/9] refactoring batch code --- backend/app/core/batch/__init__.py | 18 ++++++- .../batch/operations.py} | 46 +--------------- backend/app/core/batch/polling.py | 53 +++++++++++++++++++ backend/app/crud/evaluations/batch.py | 3 +- backend/app/crud/evaluations/embeddings.py | 7 ++- backend/app/crud/evaluations/processing.py | 13 ++--- backend/app/crud/job/__init__.py | 24 +++++++++ backend/app/crud/job/batch.py | 22 ++++++++ backend/app/crud/{batch_job.py => job/job.py} | 0 9 files changed, 125 insertions(+), 61 deletions(-) rename backend/app/{crud/batch_operations.py => core/batch/operations.py} (79%) create mode 100644 backend/app/core/batch/polling.py create mode 100644 backend/app/crud/job/__init__.py create mode 100644 backend/app/crud/job/batch.py rename backend/app/crud/{batch_job.py => job/job.py} (100%) diff --git a/backend/app/core/batch/__init__.py b/backend/app/core/batch/__init__.py index 9f7cd88d..0cd10bd5 100644 --- a/backend/app/core/batch/__init__.py +++ b/backend/app/core/batch/__init__.py @@ -1,5 +1,21 @@ """Batch processing infrastructure for LLM providers.""" from .base import BatchProvider +from .openai import OpenAIBatchProvider +from 
.operations import ( + download_batch_results, + process_completed_batch, + start_batch_job, + upload_batch_results_to_object_store, +) +from .polling import poll_batch_status -__all__ = ["BatchProvider"] +__all__ = [ + "BatchProvider", + "OpenAIBatchProvider", + "start_batch_job", + "download_batch_results", + "process_completed_batch", + "upload_batch_results_to_object_store", + "poll_batch_status", +] diff --git a/backend/app/crud/batch_operations.py b/backend/app/core/batch/operations.py similarity index 79% rename from backend/app/crud/batch_operations.py rename to backend/app/core/batch/operations.py index f2bb332e..d02884fd 100644 --- a/backend/app/crud/batch_operations.py +++ b/backend/app/core/batch/operations.py @@ -8,10 +8,7 @@ from app.core.batch.base import BatchProvider from app.core.cloud import get_cloud_storage from app.core.storage_utils import upload_jsonl_to_object_store as shared_upload_jsonl -from app.crud.batch_job import ( - create_batch_job, - update_batch_job, -) +from app.crud.job.job import create_batch_job, update_batch_job from app.models.batch_job import BatchJob, BatchJobCreate, BatchJobUpdate logger = logging.getLogger(__name__) @@ -86,47 +83,6 @@ def start_batch_job( raise -def poll_batch_status( - session: Session, provider: BatchProvider, batch_job: BatchJob -) -> dict[str, Any]: - """Poll provider for batch status and update database.""" - logger.info( - f"[poll_batch_status] Polling | id={batch_job.id} | " - f"provider_batch_id={batch_job.provider_batch_id}" - ) - - try: - status_result = provider.get_batch_status(batch_job.provider_batch_id) - - provider_status = status_result["provider_status"] - if provider_status != batch_job.provider_status: - update_data = {"provider_status": provider_status} - - if status_result.get("provider_output_file_id"): - update_data["provider_output_file_id"] = status_result[ - "provider_output_file_id" - ] - - if status_result.get("error_message"): - update_data["error_message"] = status_result["error_message"] - - batch_job_update = BatchJobUpdate(**update_data) - batch_job = update_batch_job( - session=session, batch_job=batch_job, batch_job_update=batch_job_update - ) - - logger.info( - f"[poll_batch_status] Updated | id={batch_job.id} | " - f"{batch_job.provider_status} -> {provider_status}" - ) - - return status_result - - except Exception as e: - logger.error(f"[poll_batch_status] Failed | {e}", exc_info=True) - raise - - def download_batch_results( provider: BatchProvider, batch_job: BatchJob ) -> list[dict[str, Any]]: diff --git a/backend/app/core/batch/polling.py b/backend/app/core/batch/polling.py new file mode 100644 index 00000000..c364aeb3 --- /dev/null +++ b/backend/app/core/batch/polling.py @@ -0,0 +1,53 @@ +"""Batch status polling operations.""" + +import logging +from typing import Any + +from sqlmodel import Session + +from app.core.batch.base import BatchProvider +from app.crud.job.job import update_batch_job +from app.models.batch_job import BatchJob, BatchJobUpdate + +logger = logging.getLogger(__name__) + + +def poll_batch_status( + session: Session, provider: BatchProvider, batch_job: BatchJob +) -> dict[str, Any]: + """Poll provider for batch status and update database.""" + logger.info( + f"[poll_batch_status] Polling | id={batch_job.id} | " + f"provider_batch_id={batch_job.provider_batch_id}" + ) + + try: + status_result = provider.get_batch_status(batch_job.provider_batch_id) + + provider_status = status_result["provider_status"] + if provider_status != batch_job.provider_status: + update_data 
= {"provider_status": provider_status} + + if status_result.get("provider_output_file_id"): + update_data["provider_output_file_id"] = status_result[ + "provider_output_file_id" + ] + + if status_result.get("error_message"): + update_data["error_message"] = status_result["error_message"] + + batch_job_update = BatchJobUpdate(**update_data) + batch_job = update_batch_job( + session=session, batch_job=batch_job, batch_job_update=batch_job_update + ) + + logger.info( + f"[poll_batch_status] Updated | id={batch_job.id} | " + f"{batch_job.provider_status} -> {provider_status}" + ) + + return status_result + + except Exception as e: + logger.error(f"[poll_batch_status] Failed | {e}", exc_info=True) + raise diff --git a/backend/app/crud/evaluations/batch.py b/backend/app/crud/evaluations/batch.py index 7e8b6904..e880d7d0 100644 --- a/backend/app/crud/evaluations/batch.py +++ b/backend/app/crud/evaluations/batch.py @@ -14,8 +14,7 @@ from openai import OpenAI from sqlmodel import Session -from app.core.batch.openai import OpenAIBatchProvider -from app.crud.batch_operations import start_batch_job +from app.core.batch import OpenAIBatchProvider, start_batch_job from app.models import EvaluationRun logger = logging.getLogger(__name__) diff --git a/backend/app/crud/evaluations/embeddings.py b/backend/app/crud/evaluations/embeddings.py index 70e37421..f7ba2283 100644 --- a/backend/app/crud/evaluations/embeddings.py +++ b/backend/app/crud/evaluations/embeddings.py @@ -15,9 +15,8 @@ from openai import OpenAI from sqlmodel import Session -from app.core.batch.openai import OpenAIBatchProvider +from app.core.batch import OpenAIBatchProvider, start_batch_job from app.core.util import now -from app.crud.batch_operations import start_batch_job from app.models import EvaluationRun logger = logging.getLogger(__name__) @@ -43,7 +42,7 @@ def validate_embedding_model(model: str) -> None: if model not in VALID_EMBEDDING_MODELS: valid_models = ", ".join(VALID_EMBEDDING_MODELS.keys()) raise ValueError( - f"Invalid embedding model '{model}'. " f"Supported models: {valid_models}" + f"Invalid embedding model '{model}'. Supported models: {valid_models}" ) @@ -253,7 +252,7 @@ def calculate_cosine_similarity(vec1: list[float], vec2: list[float]) -> float: def calculate_average_similarity( - embedding_pairs: list[dict[str, Any]] + embedding_pairs: list[dict[str, Any]], ) -> dict[str, Any]: """ Calculate cosine similarity statistics for all embedding pairs. 
diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py
index 12b89266..fbc2d231 100644
--- a/backend/app/crud/evaluations/processing.py
+++ b/backend/app/crud/evaluations/processing.py
@@ -19,10 +19,10 @@
 from openai import OpenAI
 from sqlmodel import Session, select
 
-from app.core.batch.openai import OpenAIBatchProvider
-from app.crud.batch_job import get_batch_job
-from app.crud.batch_operations import (
+from app.core.batch import (
+    OpenAIBatchProvider,
     download_batch_results,
+    poll_batch_status,
     upload_batch_results_to_object_store,
 )
 from app.crud.evaluations.batch import fetch_dataset_items
@@ -36,6 +36,7 @@
     create_langfuse_dataset_run,
     update_traces_with_cosine_scores,
 )
+from app.crud.job import get_batch_job
 from app.models import EvaluationRun
 from app.utils import get_langfuse_client, get_openai_client
 
@@ -484,10 +485,6 @@ async def check_and_process_evaluation(
     if embedding_batch_job:
         # Poll embedding batch status
         provider = OpenAIBatchProvider(client=openai_client)
-
-        # Local import to avoid circular dependency with batch_operations
-        from app.crud.batch_operations import poll_batch_status
-
         poll_batch_status(
             session=session, provider=provider, batch_job=embedding_batch_job
         )
@@ -560,8 +557,6 @@
     # IMPORTANT: Poll OpenAI to get the latest status before checking
     provider = OpenAIBatchProvider(client=openai_client)
 
-    from app.crud.batch_operations import poll_batch_status
-
     poll_batch_status(session=session, provider=provider, batch_job=batch_job)
 
     # Refresh batch_job to get the updated provider_status
diff --git a/backend/app/crud/job/__init__.py b/backend/app/crud/job/__init__.py
new file mode 100644
index 00000000..a7bbf558
--- /dev/null
+++ b/backend/app/crud/job/__init__.py
@@ -0,0 +1,24 @@
+"""Job-related CRUD operations.
+
+For batch operations (start_batch_job, poll_batch_status, etc.),
+import directly from app.core.batch instead.
+"""
+
+from app.crud.job.job import (
+    create_batch_job,
+    delete_batch_job,
+    get_batch_job,
+    get_batch_jobs_by_ids,
+    get_batches_by_type,
+    update_batch_job,
+)
+
+__all__ = [
+    # CRUD operations
+    "create_batch_job",
+    "get_batch_job",
+    "update_batch_job",
+    "get_batch_jobs_by_ids",
+    "get_batches_by_type",
+    "delete_batch_job",
+]
diff --git a/backend/app/crud/job/batch.py b/backend/app/crud/job/batch.py
new file mode 100644
index 00000000..4ee8c084
--- /dev/null
+++ b/backend/app/crud/job/batch.py
@@ -0,0 +1,22 @@
+"""
+Batch operations re-export layer.
+
+This module provides convenient imports for batch-related operations
+while the actual implementation lives in app.core.batch.
+"""
+
+from app.core.batch.operations import (
+    download_batch_results,
+    process_completed_batch,
+    start_batch_job,
+    upload_batch_results_to_object_store,
+)
+from app.core.batch.polling import poll_batch_status
+
+__all__ = [
+    "start_batch_job",
+    "download_batch_results",
+    "process_completed_batch",
+    "upload_batch_results_to_object_store",
+    "poll_batch_status",
+]
diff --git a/backend/app/crud/batch_job.py b/backend/app/crud/job/job.py
similarity index 100%
rename from backend/app/crud/batch_job.py
rename to backend/app/crud/job/job.py

From d1283754ca2a5de2ab5b1fafba117e070f820e91 Mon Sep 17 00:00:00 2001
From: AkhileshNegi
Date: Thu, 8 Jan 2026 11:54:14 +0530
Subject: [PATCH 6/9] refactoring

---
 backend/app/api/main.py                       |   4 +-
 .../app/api/routes/evaluations/__init__.py    |  13 ++
 .../{evaluation.py => evaluations/dataset.py} | 152 ++----------------
 .../app/api/routes/evaluations/evaluation.py  | 148 +++++++++++++++++
 .../{evaluation => evaluations}/__init__.py   |   6 +-
 .../{evaluation => evaluations}/dataset.py    |   2 +-
 .../{evaluation => evaluations}/evaluation.py |   0
 .../{evaluation => evaluations}/validators.py |   0
 8 files changed, 179 insertions(+), 146 deletions(-)
 create mode 100644 backend/app/api/routes/evaluations/__init__.py
 rename backend/app/api/routes/{evaluation.py => evaluations/dataset.py} (53%)
 create mode 100644 backend/app/api/routes/evaluations/evaluation.py
 rename backend/app/services/{evaluation => evaluations}/__init__.py (61%)
 rename backend/app/services/{evaluation => evaluations}/dataset.py (98%)
 rename backend/app/services/{evaluation => evaluations}/evaluation.py (100%)
 rename backend/app/services/{evaluation => evaluations}/validators.py (100%)

diff --git a/backend/app/api/main.py b/backend/app/api/main.py
index c071a9e1..47cea3b1 100644
--- a/backend/app/api/main.py
+++ b/backend/app/api/main.py
@@ -20,7 +20,7 @@
     onboarding,
     credentials,
     cron,
-    evaluation,
+    evaluations,
     fine_tuning,
     model_evaluation,
     collection_job,
@@ -37,7 +37,7 @@
 api_router.include_router(cron.router)
 api_router.include_router(documents.router)
 api_router.include_router(doc_transformation_job.router)
-api_router.include_router(evaluation.router)
+api_router.include_router(evaluations.router)
 api_router.include_router(llm.router)
 api_router.include_router(login.router)
 api_router.include_router(onboarding.router)
diff --git a/backend/app/api/routes/evaluations/__init__.py b/backend/app/api/routes/evaluations/__init__.py
new file mode 100644
index 00000000..3f7fe120
--- /dev/null
+++ b/backend/app/api/routes/evaluations/__init__.py
@@ -0,0 +1,13 @@
+"""Evaluation API routes."""
+
+from fastapi import APIRouter
+
+from app.api.routes.evaluations import dataset, evaluation
+
+router = APIRouter(prefix="/evaluations", tags=["evaluation"])
+
+# Include dataset routes under /evaluations/datasets
+router.include_router(dataset.router, prefix="/datasets")
+
+# Include evaluation routes directly under /evaluations
+router.include_router(evaluation.router)
diff --git a/backend/app/api/routes/evaluation.py b/backend/app/api/routes/evaluations/dataset.py
similarity index 53%
rename from backend/app/api/routes/evaluation.py
rename to backend/app/api/routes/evaluations/dataset.py
index bd4130d6..a6bd53e5 100644
--- a/backend/app/api/routes/evaluation.py
+++ b/backend/app/api/routes/evaluations/dataset.py
@@ -1,16 +1,14 @@
-"""Evaluation API routes."""
+"""Evaluation dataset API routes."""
 
 import logging
 
 from fastapi import (
     APIRouter,
-    Body,
+    Depends,
     File,
     Form,
     HTTPException,
-    Query,
UploadFile, - Depends, ) from app.api.deps import AuthContextDep, SessionDep @@ -18,16 +16,10 @@ get_dataset_by_id, list_datasets, ) -from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud from app.crud.evaluations.dataset import delete_dataset as delete_dataset_crud -from app.models.evaluation import ( - DatasetUploadResponse, - EvaluationRunPublic, -) +from app.models.evaluation import DatasetUploadResponse, EvaluationDataset from app.api.permissions import Permission, require_permission -from app.services.evaluation import ( - get_evaluation_with_scores, - start_evaluation, +from app.services.evaluations import ( upload_dataset, validate_csv_file, ) @@ -38,10 +30,10 @@ logger = logging.getLogger(__name__) -router = APIRouter(tags=["evaluation"]) +router = APIRouter() -def _dataset_to_response(dataset) -> DatasetUploadResponse: +def _dataset_to_response(dataset: EvaluationDataset) -> DatasetUploadResponse: """Convert a dataset model to a DatasetUploadResponse.""" return DatasetUploadResponse( dataset_id=dataset.id, @@ -55,7 +47,7 @@ def _dataset_to_response(dataset) -> DatasetUploadResponse: @router.post( - "/evaluations/datasets", + "/", description=load_description("evaluation/upload_dataset.md"), response_model=APIResponse[DatasetUploadResponse], dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], @@ -86,15 +78,15 @@ async def upload_dataset_endpoint( dataset_name=dataset_name, description=description, duplication_factor=duplication_factor, - organization_id=auth_context.organization.id, - project_id=auth_context.project.id, + organization_id=auth_context.organization_.id, + project_id=auth_context.project_.id, ) return APIResponse.success_response(data=_dataset_to_response(dataset)) @router.get( - "/evaluations/datasets", + "/", description=load_description("evaluation/list_datasets.md"), response_model=APIResponse[list[DatasetUploadResponse]], dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], @@ -124,7 +116,7 @@ def list_datasets_endpoint( @router.get( - "/evaluations/datasets/{dataset_id}", + "/{dataset_id}", description=load_description("evaluation/get_dataset.md"), response_model=APIResponse[DatasetUploadResponse], dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], @@ -157,7 +149,7 @@ def get_dataset( @router.delete( - "/evaluations/datasets/{dataset_id}", + "/{dataset_id}", description=load_description("evaluation/delete_dataset.md"), response_model=APIResponse[dict], dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], @@ -191,123 +183,3 @@ def delete_dataset( return APIResponse.success_response( data={"message": message, "dataset_id": dataset_id} ) - - -@router.post( - "/evaluations", - description=load_description("evaluation/create_evaluation.md"), - response_model=APIResponse[EvaluationRunPublic], - dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], -) -def evaluate( - _session: SessionDep, - auth_context: AuthContextDep, - dataset_id: int = Body(..., description="ID of the evaluation dataset"), - experiment_name: str = Body( - ..., description="Name for this evaluation experiment/run" - ), - config: dict = Body(default_factory=dict, description="Evaluation configuration"), - assistant_id: str - | None = Body( - None, description="Optional assistant ID to fetch configuration from" - ), -) -> APIResponse[EvaluationRunPublic]: - """Start an evaluation run.""" - eval_run = start_evaluation( - session=_session, - dataset_id=dataset_id, - 
experiment_name=experiment_name, - config=config, - assistant_id=assistant_id, - organization_id=auth_context.organization.id, - project_id=auth_context.project.id, - ) - - return APIResponse.success_response(data=eval_run) - - -@router.get( - "/evaluations", - description=load_description("evaluation/list_evaluations.md"), - response_model=APIResponse[list[EvaluationRunPublic]], - dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], -) -def list_evaluation_runs( - _session: SessionDep, - auth_context: AuthContextDep, - limit: int = 50, - offset: int = 0, -) -> APIResponse[list[EvaluationRunPublic]]: - """List evaluation runs.""" - logger.info( - f"[list_evaluation_runs] Listing evaluation runs | " - f"org_id={auth_context.organization_.id} | " - f"project_id={auth_context.project_.id} | limit={limit} | offset={offset}" - ) - - return APIResponse.success_response( - data=list_evaluation_runs_crud( - session=_session, - organization_id=auth_context.organization_.id, - project_id=auth_context.project_.id, - limit=limit, - offset=offset, - ) - ) - - -@router.get( - "/evaluations/{evaluation_id}", - description=load_description("evaluation/get_evaluation.md"), - response_model=APIResponse[EvaluationRunPublic], - dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], -) -def get_evaluation_run_status( - evaluation_id: int, - _session: SessionDep, - auth_context: AuthContextDep, - get_trace_info: bool = Query( - False, - description=( - "If true, fetch and include Langfuse trace scores with Q&A context. " - "On first request, data is fetched from Langfuse and cached. " - "Subsequent requests return cached data." - ), - ), - resync_score: bool = Query( - False, - description=( - "If true, clear cached scores and re-fetch from Langfuse. " - "Useful when new evaluators have been added or scores have been updated. " - "Requires get_trace_info=true." 
- ), - ), -) -> APIResponse[EvaluationRunPublic]: - """Get evaluation run status with optional trace info.""" - if resync_score and not get_trace_info: - raise HTTPException( - status_code=400, - detail="resync_score=true requires get_trace_info=true", - ) - - eval_run, error = get_evaluation_with_scores( - session=_session, - evaluation_id=evaluation_id, - organization_id=auth_context.organization.id, - project_id=auth_context.project.id, - get_trace_info=get_trace_info, - resync_score=resync_score, - ) - - if not eval_run: - raise HTTPException( - status_code=404, - detail=( - f"Evaluation run {evaluation_id} not found or not accessible " - "to this organization" - ), - ) - - if error: - return APIResponse.failure_response(error=error, data=eval_run) - return APIResponse.success_response(data=eval_run) diff --git a/backend/app/api/routes/evaluations/evaluation.py b/backend/app/api/routes/evaluations/evaluation.py new file mode 100644 index 00000000..81f34bf4 --- /dev/null +++ b/backend/app/api/routes/evaluations/evaluation.py @@ -0,0 +1,148 @@ +"""Evaluation run API routes.""" + +import logging + +from fastapi import ( + APIRouter, + Body, + Depends, + HTTPException, + Query, +) + +from app.api.deps import AuthContextDep, SessionDep +from app.crud.evaluations import list_evaluation_runs as list_evaluation_runs_crud +from app.models.evaluation import EvaluationRunPublic +from app.api.permissions import Permission, require_permission +from app.services.evaluations import ( + get_evaluation_with_scores, + start_evaluation, +) +from app.utils import ( + APIResponse, + load_description, +) + +logger = logging.getLogger(__name__) + +router = APIRouter() + + +@router.post( + "/", + description=load_description("evaluation/create_evaluation.md"), + response_model=APIResponse[EvaluationRunPublic], + dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], +) +def evaluate( + _session: SessionDep, + auth_context: AuthContextDep, + dataset_id: int = Body(..., description="ID of the evaluation dataset"), + experiment_name: str = Body( + ..., description="Name for this evaluation experiment/run" + ), + config: dict = Body(default_factory=dict, description="Evaluation configuration"), + assistant_id: str + | None = Body( + None, description="Optional assistant ID to fetch configuration from" + ), +) -> APIResponse[EvaluationRunPublic]: + """Start an evaluation run.""" + eval_run = start_evaluation( + session=_session, + dataset_id=dataset_id, + experiment_name=experiment_name, + config=config, + assistant_id=assistant_id, + organization_id=auth_context.organization_.id, + project_id=auth_context.project_.id, + ) + + return APIResponse.success_response(data=eval_run) + + +@router.get( + "/", + description=load_description("evaluation/list_evaluations.md"), + response_model=APIResponse[list[EvaluationRunPublic]], + dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], +) +def list_evaluation_runs( + _session: SessionDep, + auth_context: AuthContextDep, + limit: int = 50, + offset: int = 0, +) -> APIResponse[list[EvaluationRunPublic]]: + """List evaluation runs.""" + logger.info( + f"[list_evaluation_runs] Listing evaluation runs | " + f"org_id={auth_context.organization_.id} | " + f"project_id={auth_context.project_.id} | limit={limit} | offset={offset}" + ) + + return APIResponse.success_response( + data=list_evaluation_runs_crud( + session=_session, + organization_id=auth_context.organization_.id, + project_id=auth_context.project_.id, + limit=limit, + offset=offset, + 
) + ) + + +@router.get( + "/{evaluation_id}", + description=load_description("evaluation/get_evaluation.md"), + response_model=APIResponse[EvaluationRunPublic], + dependencies=[Depends(require_permission(Permission.REQUIRE_PROJECT))], +) +def get_evaluation_run_status( + evaluation_id: int, + _session: SessionDep, + auth_context: AuthContextDep, + get_trace_info: bool = Query( + False, + description=( + "If true, fetch and include Langfuse trace scores with Q&A context. " + "On first request, data is fetched from Langfuse and cached. " + "Subsequent requests return cached data." + ), + ), + resync_score: bool = Query( + False, + description=( + "If true, clear cached scores and re-fetch from Langfuse. " + "Useful when new evaluators have been added or scores have been updated. " + "Requires get_trace_info=true." + ), + ), +) -> APIResponse[EvaluationRunPublic]: + """Get evaluation run status with optional trace info.""" + if resync_score and not get_trace_info: + raise HTTPException( + status_code=400, + detail="resync_score=true requires get_trace_info=true", + ) + + eval_run, error = get_evaluation_with_scores( + session=_session, + evaluation_id=evaluation_id, + organization_id=auth_context.organization_.id, + project_id=auth_context.project_.id, + get_trace_info=get_trace_info, + resync_score=resync_score, + ) + + if not eval_run: + raise HTTPException( + status_code=404, + detail=( + f"Evaluation run {evaluation_id} not found or not accessible " + "to this organization" + ), + ) + + if error: + return APIResponse.failure_response(error=error, data=eval_run) + return APIResponse.success_response(data=eval_run) diff --git a/backend/app/services/evaluation/__init__.py b/backend/app/services/evaluations/__init__.py similarity index 61% rename from backend/app/services/evaluation/__init__.py rename to backend/app/services/evaluations/__init__.py index 8c5a3690..62201b42 100644 --- a/backend/app/services/evaluation/__init__.py +++ b/backend/app/services/evaluations/__init__.py @@ -1,12 +1,12 @@ """Evaluation services.""" -from app.services.evaluation.dataset import upload_dataset -from app.services.evaluation.evaluation import ( +from app.services.evaluations.dataset import upload_dataset +from app.services.evaluations.evaluation import ( build_evaluation_config, get_evaluation_with_scores, start_evaluation, ) -from app.services.evaluation.validators import ( +from app.services.evaluations.validators import ( ALLOWED_EXTENSIONS, ALLOWED_MIME_TYPES, MAX_FILE_SIZE, diff --git a/backend/app/services/evaluation/dataset.py b/backend/app/services/evaluations/dataset.py similarity index 98% rename from backend/app/services/evaluation/dataset.py rename to backend/app/services/evaluations/dataset.py index 5ac9470e..fe0e8924 100644 --- a/backend/app/services/evaluation/dataset.py +++ b/backend/app/services/evaluations/dataset.py @@ -12,7 +12,7 @@ upload_dataset_to_langfuse, ) from app.models.evaluation import EvaluationDataset -from app.services.evaluation.validators import ( +from app.services.evaluations.validators import ( parse_csv_items, sanitize_dataset_name, ) diff --git a/backend/app/services/evaluation/evaluation.py b/backend/app/services/evaluations/evaluation.py similarity index 100% rename from backend/app/services/evaluation/evaluation.py rename to backend/app/services/evaluations/evaluation.py diff --git a/backend/app/services/evaluation/validators.py b/backend/app/services/evaluations/validators.py similarity index 100% rename from backend/app/services/evaluation/validators.py rename 
to backend/app/services/evaluations/validators.py From 696dba61158b1adf6efa869bc1d5a8a5c0c250ef Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 8 Jan 2026 16:36:50 +0530 Subject: [PATCH 7/9] coderabbit suggestion changes --- backend/app/api/routes/evaluations/dataset.py | 8 +++++--- backend/app/crud/evaluations/embeddings.py | 2 +- backend/app/services/evaluations/validators.py | 5 +++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/backend/app/api/routes/evaluations/dataset.py b/backend/app/api/routes/evaluations/dataset.py index a6bd53e5..0eb3252b 100644 --- a/backend/app/api/routes/evaluations/dataset.py +++ b/backend/app/api/routes/evaluations/dataset.py @@ -1,13 +1,13 @@ """Evaluation dataset API routes.""" import logging - from fastapi import ( APIRouter, Depends, File, Form, HTTPException, + Query, UploadFile, ) @@ -94,8 +94,10 @@ async def upload_dataset_endpoint( def list_datasets_endpoint( _session: SessionDep, auth_context: AuthContextDep, - limit: int = 50, - offset: int = 0, + limit: int = Query( + default=50, ge=1, le=100, description="Maximum number of datasets to return" + ), + offset: int = Query(default=0, ge=0, description="Number of datasets to skip"), ) -> APIResponse[list[DatasetUploadResponse]]: """List evaluation datasets.""" # Enforce maximum limit diff --git a/backend/app/crud/evaluations/embeddings.py b/backend/app/crud/evaluations/embeddings.py index f7ba2283..17ead39a 100644 --- a/backend/app/crud/evaluations/embeddings.py +++ b/backend/app/crud/evaluations/embeddings.py @@ -81,7 +81,7 @@ def build_embedding_jsonl( validate_embedding_model(embedding_model) logger.info( - f"Building embedding JSONL for {len(results)} items with model {embedding_model}" + f"[build_embedding_jsonl] Building JSONL | items={len(results)} | model={embedding_model}" ) jsonl_data = [] diff --git a/backend/app/services/evaluations/validators.py b/backend/app/services/evaluations/validators.py index e4adc0b2..b30b63e6 100644 --- a/backend/app/services/evaluations/validators.py +++ b/backend/app/services/evaluations/validators.py @@ -82,6 +82,11 @@ async def validate_csv_file(file: UploadFile) -> bytes: HTTPException: If validation fails """ # Security validation: Check file extension + if not file.filename: + raise HTTPException( + status_code=422, + detail="File must have a filename", + ) file_ext = Path(file.filename).suffix.lower() if file_ext not in ALLOWED_EXTENSIONS: raise HTTPException( From 684a6efa19d872d44658bf6210da74c6580ee561 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 8 Jan 2026 16:51:49 +0530 Subject: [PATCH 8/9] coderabbit suggestion changes --- backend/app/api/routes/evaluations/dataset.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backend/app/api/routes/evaluations/dataset.py b/backend/app/api/routes/evaluations/dataset.py index 0eb3252b..52862699 100644 --- a/backend/app/api/routes/evaluations/dataset.py +++ b/backend/app/api/routes/evaluations/dataset.py @@ -100,10 +100,6 @@ def list_datasets_endpoint( offset: int = Query(default=0, ge=0, description="Number of datasets to skip"), ) -> APIResponse[list[DatasetUploadResponse]]: """List evaluation datasets.""" - # Enforce maximum limit - if limit > 100: - limit = 100 - datasets = list_datasets( session=_session, organization_id=auth_context.organization_.id, From b197a60205bfddbed7b0e6c9e1676ca333bfdfa9 Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Thu, 8 Jan 2026 19:46:22 +0530 Subject: [PATCH 9/9] updated testcases --- .../app/tests/api/routes/test_evaluation.py | 68 
+++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/backend/app/tests/api/routes/test_evaluation.py b/backend/app/tests/api/routes/test_evaluation.py index 00aef2f5..9daf2da7 100644 --- a/backend/app/tests/api/routes/test_evaluation.py +++ b/backend/app/tests/api/routes/test_evaluation.py @@ -54,13 +54,13 @@ def test_upload_dataset_valid_csv( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.services.evaluation.dataset.upload_csv_to_object_store" + "app.services.evaluations.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.services.evaluation.dataset.get_langfuse_client" + "app.services.evaluations.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.services.evaluation.dataset.upload_dataset_to_langfuse" + "app.services.evaluations.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): # Mock object store upload @@ -75,7 +75,7 @@ def test_upload_dataset_valid_csv( filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -116,7 +116,7 @@ def test_upload_dataset_missing_columns( # The CSV validation happens before any mocked functions are called # so this test checks the actual validation logic response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -140,13 +140,13 @@ def test_upload_dataset_empty_rows( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.services.evaluation.dataset.upload_csv_to_object_store" + "app.services.evaluations.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.services.evaluation.dataset.get_langfuse_client" + "app.services.evaluations.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.services.evaluation.dataset.upload_dataset_to_langfuse" + "app.services.evaluations.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): # Mock object store and Langfuse uploads @@ -157,7 +157,7 @@ def test_upload_dataset_empty_rows( filename, file_obj = create_csv_file(csv_with_empty_rows) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -186,13 +186,13 @@ def test_upload_with_default_duplication( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.services.evaluation.dataset.upload_csv_to_object_store" + "app.services.evaluations.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.services.evaluation.dataset.get_langfuse_client" + "app.services.evaluations.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.services.evaluation.dataset.upload_dataset_to_langfuse" + "app.services.evaluations.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -202,7 +202,7 @@ def test_upload_with_default_duplication( filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -227,13 +227,13 @@ def 
test_upload_with_custom_duplication( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.services.evaluation.dataset.upload_csv_to_object_store" + "app.services.evaluations.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.services.evaluation.dataset.get_langfuse_client" + "app.services.evaluations.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.services.evaluation.dataset.upload_dataset_to_langfuse" + "app.services.evaluations.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -243,7 +243,7 @@ def test_upload_with_custom_duplication( filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -268,13 +268,13 @@ def test_upload_with_description( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.services.evaluation.dataset.upload_csv_to_object_store" + "app.services.evaluations.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.services.evaluation.dataset.get_langfuse_client" + "app.services.evaluations.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.services.evaluation.dataset.upload_dataset_to_langfuse" + "app.services.evaluations.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -284,7 +284,7 @@ def test_upload_with_description( filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset_with_description", @@ -316,7 +316,7 @@ def test_upload_with_duplication_factor_below_minimum( filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -338,7 +338,7 @@ def test_upload_with_duplication_factor_above_maximum( filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -360,13 +360,13 @@ def test_upload_with_duplication_factor_boundary_minimum( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.services.evaluation.dataset.upload_csv_to_object_store" + "app.services.evaluations.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch( - "app.services.evaluation.dataset.get_langfuse_client" + "app.services.evaluations.dataset.get_langfuse_client" ) as mock_get_langfuse_client, patch( - "app.services.evaluation.dataset.upload_dataset_to_langfuse" + "app.services.evaluations.dataset.upload_dataset_to_langfuse" ) as mock_langfuse_upload, ): mock_store_upload.return_value = "s3://bucket/datasets/test_dataset.csv" @@ -376,7 +376,7 @@ def test_upload_with_duplication_factor_boundary_minimum( filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": 
"test_dataset", @@ -405,7 +405,7 @@ def test_upload_langfuse_configuration_fails( with ( patch("app.core.cloud.get_cloud_storage") as _mock_storage, patch( - "app.services.evaluation.dataset.upload_csv_to_object_store" + "app.services.evaluations.dataset.upload_csv_to_object_store" ) as mock_store_upload, patch("app.crud.credentials.get_provider_credential") as mock_get_cred, ): @@ -417,7 +417,7 @@ def test_upload_langfuse_configuration_fails( filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -444,7 +444,7 @@ def test_upload_invalid_csv_format(self, client, user_api_key_header): filename, file_obj = create_csv_file(invalid_csv) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -470,7 +470,7 @@ def test_upload_without_authentication(self, client, valid_csv_content): filename, file_obj = create_csv_file(valid_csv_content) response = client.post( - "/api/v1/evaluations/datasets", + "/api/v1/evaluations/datasets/", files={"file": (filename, file_obj, "text/csv")}, data={ "dataset_name": "test_dataset", @@ -499,7 +499,7 @@ def test_start_batch_evaluation_invalid_dataset_id( """Test batch evaluation fails with invalid dataset_id.""" # Try to start evaluation with non-existent dataset_id response = client.post( - "/api/v1/evaluations", + "/api/v1/evaluations/", json={ "experiment_name": "test_evaluation_run", "dataset_id": 99999, # Non-existent @@ -525,7 +525,7 @@ def test_start_batch_evaluation_missing_model(self, client, user_api_key_header) } response = client.post( - "/api/v1/evaluations", + "/api/v1/evaluations/", json={ "experiment_name": "test_no_model", "dataset_id": 1, # Dummy ID, error should come before this is checked @@ -548,7 +548,7 @@ def test_start_batch_evaluation_without_authentication( ): """Test batch evaluation requires authentication.""" response = client.post( - "/api/v1/evaluations", + "/api/v1/evaluations/", json={ "experiment_name": "test_evaluation_run", "dataset_id": 1,