diff --git a/packages/sample-app/sample_app/chats/gemini_chatbot.py b/packages/sample-app/sample_app/chats/gemini_chatbot.py new file mode 100644 index 0000000000..3b4f931239 --- /dev/null +++ b/packages/sample-app/sample_app/chats/gemini_chatbot.py @@ -0,0 +1,227 @@ +import os +import uuid +from datetime import datetime +import google.genai as genai +from google.genai import types +from traceloop.sdk import Traceloop +from traceloop.sdk.associations import AssociationProperty +from traceloop.sdk.decorators import workflow + +# Initialize Traceloop for observability +traceloop = Traceloop.init(app_name="gemini_chatbot") + +# Initialize Gemini client +client = genai.Client(api_key=os.environ.get("GENAI_API_KEY")) + + +# Define tools for the chatbot +def get_weather(location: str) -> str: + """Get the current weather for a location.""" + # Simulated weather data + weather_data = { + "San Francisco": "Sunny, 68°F", + "New York": "Cloudy, 55°F", + "London": "Rainy, 52°F", + "Tokyo": "Clear, 62°F", + } + return weather_data.get(location, f"Weather data not available for {location}") + + +def get_current_time(timezone: str = "UTC") -> str: + """Get the current time in a specific timezone.""" + # Simplified - just return current UTC time + return f"Current time ({timezone}): {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" + + +def search_knowledge_base(query: str) -> str: + """Search the knowledge base for information.""" + # Simulated knowledge base + knowledge = { + "company policy": "Our company policy includes 15 days of vacation per year.", + "support hours": "Support is available Monday-Friday, 9 AM - 5 PM EST.", + "pricing": "Our pricing starts at $29/month for the basic plan.", + } + + for key, value in knowledge.items(): + if key in query.lower(): + return value + + return "I couldn't find specific information about that in our knowledge base." + + +# Define function declarations for Gemini +weather_tool = types.Tool( + function_declarations=[ + types.FunctionDeclaration( + name="get_weather", + description="Get the current weather for a specific location", + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "location": types.Schema( + type=types.Type.STRING, + description="The city name, e.g., 'San Francisco'" + ) + }, + required=["location"] + ) + ) + ] +) + +time_tool = types.Tool( + function_declarations=[ + types.FunctionDeclaration( + name="get_current_time", + description="Get the current time in a specific timezone", + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "timezone": types.Schema( + type=types.Type.STRING, + description="The timezone name, e.g., 'UTC', 'PST'" + ) + }, + required=[] + ) + ) + ] +) + +knowledge_tool = types.Tool( + function_declarations=[ + types.FunctionDeclaration( + name="search_knowledge_base", + description="Search the company knowledge base for information about policies, support, or pricing", + parameters=types.Schema( + type=types.Type.OBJECT, + properties={ + "query": types.Schema( + type=types.Type.STRING, + description="The search query" + ) + }, + required=["query"] + ) + ) + ] +) + + +# Function to execute tool calls +def execute_function(function_name: str, args: dict) -> str: + """Execute the requested function with given arguments.""" + if function_name == "get_weather": + return get_weather(args.get("location", "")) + elif function_name == "get_current_time": + return get_current_time(args.get("timezone", "UTC")) + elif function_name == "search_knowledge_base": + return search_knowledge_base(args.get("query", "")) + else: + return f"Unknown function: {function_name}" + + +@workflow("chatbot_conversation") +def process_message(conversation_id: str, user_message: str, conversation_history: list) -> tuple[str, list]: + """Process a single message with tool support and chat_id association.""" + + # Set a conversation_id to identify the conversation using the associations API + traceloop.associations.set([(AssociationProperty.CONVERSATION_ID, conversation_id)]) + + # Add user message to conversation history + conversation_history.append({ + "role": "user", + "parts": [{"text": user_message}] + }) + + # Keep trying until we get a final response (handle tool calls) + while True: + # Generate content with tools + response = client.models.generate_content( + model="gemini-2.0-flash-exp", + contents=conversation_history, + config=types.GenerateContentConfig( + tools=[weather_tool, time_tool, knowledge_tool], + temperature=0.7, + ) + ) + + # Check if the model wants to use a tool + if response.candidates[0].content.parts[0].function_call: + function_call = response.candidates[0].content.parts[0].function_call + function_name = function_call.name + function_args = dict(function_call.args) + + print(f"[Tool Call]: {function_name}({function_args})") + + # Execute the function + function_result = execute_function(function_name, function_args) + print(f"[Tool Result]: {function_result}") + + # Add the model's function call to history + conversation_history.append({ + "role": "model", + "parts": [{"function_call": function_call}] + }) + + # Add the function result to history + conversation_history.append({ + "role": "user", + "parts": [{ + "function_response": types.FunctionResponse( + name=function_name, + response={"result": function_result} + ) + }] + }) + else: + # Got a text response, we're done with this turn + assistant_message = response.text + + # Add assistant response to conversation history + conversation_history.append({ + "role": "model", + "parts": [{"text": assistant_message}] + }) + + return assistant_message, conversation_history + + +def main(): + """Main function for interactive chatbot.""" + + # Generate a unique chat_id for this conversation + chat_id = str(uuid.uuid4()) + + print(f"Starting chatbot conversation (Chat ID: {chat_id})") + print("Type 'exit', 'quit', or 'bye' to end the conversation") + print("=" * 80) + + conversation_history = [] + + while True: + # Get user input + user_message = input("\nYou: ").strip() + + # Check for exit commands + if user_message.lower() in ['exit', 'quit', 'bye']: + print("\nGoodbye! Chat session ended.") + break + + # Skip empty messages + if not user_message: + continue + + # Process the message + try: + assistant_message, conversation_history = process_message( + chat_id, user_message, conversation_history + ) + print(f"\nAssistant: {assistant_message}") + except Exception as e: + print(f"\nError: {e}") + print("Please try again.") + + +if __name__ == "__main__": + main() diff --git a/packages/traceloop-sdk/tests/test_associations.py b/packages/traceloop-sdk/tests/test_associations.py new file mode 100644 index 0000000000..eff35e773f --- /dev/null +++ b/packages/traceloop-sdk/tests/test_associations.py @@ -0,0 +1,171 @@ +import pytest +from traceloop.sdk import Traceloop, AssociationProperty +from traceloop.sdk.decorators import task, workflow +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + + +@pytest.fixture +def client_with_exporter(): + """ + Fixture that initializes Traceloop with API key. + Client is only created when NO custom exporter/processor is provided. + """ + # Initialize with API key and Traceloop endpoint - this creates a client + client = Traceloop.init( + app_name="test_associations", + api_key="test-api-key", + api_endpoint="https://api.traceloop.com", + disable_batch=True, + # NO exporter or processor - so client gets created + ) + + # Get spans from the tracer provider for assertions + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + + tracer_provider = trace.get_tracer_provider() + if isinstance(tracer_provider, TracerProvider): + # Get the span processor's exporter + span_processors = tracer_provider._active_span_processor._span_processors + # Find the exporter from the processors + for processor in span_processors: + if hasattr(processor, 'span_exporter'): + exporter = processor.span_exporter + break + else: + # Fallback: create a mock exporter + exporter = InMemorySpanExporter() + else: + exporter = InMemorySpanExporter() + + yield client, exporter + + # Cleanup + if hasattr(exporter, 'clear'): + exporter.clear() + + +def test_associations_create_single(client_with_exporter): + """Test creating a single association.""" + client, exporter = client_with_exporter + + @workflow(name="test_single_association") + def test_workflow(): + return test_task() + + @task(name="test_single_task") + def test_task(): + return + + client.associations.set([(AssociationProperty.CONVERSATION_ID, "conv-123")]) + test_workflow() + + spans = exporter.get_finished_spans() + assert [span.name for span in spans] == [ + "test_single_task.task", + "test_single_association.workflow", + ] + + task_span = spans[0] + workflow_span = spans[1] + + assert workflow_span.attributes["conversation_id"] == "conv-123" + assert task_span.attributes["conversation_id"] == "conv-123" + + +def test_associations_create_multiple(client_with_exporter): + """Test creating multiple associations at once.""" + client, exporter = client_with_exporter + + @workflow(name="test_multiple_associations") + def test_workflow(): + return test_task() + + @task(name="test_multiple_task") + def test_task(): + return + + client.associations.set([ + (AssociationProperty.USER_ID, "user-456"), + (AssociationProperty.SESSION_ID, "session-789"), + (AssociationProperty.CUSTOMER_ID, "customer-999"), + ]) + test_workflow() + + spans = exporter.get_finished_spans() + assert [span.name for span in spans] == [ + "test_multiple_task.task", + "test_multiple_associations.workflow", + ] + + task_span = spans[0] + workflow_span = spans[1] + + # Check all associations are present + assert workflow_span.attributes["user_id"] == "user-456" + assert workflow_span.attributes["session_id"] == "session-789" + assert workflow_span.attributes["customer_id"] == "customer-999" + + assert task_span.attributes["user_id"] == "user-456" + assert task_span.attributes["session_id"] == "session-789" + assert task_span.attributes["customer_id"] == "customer-999" + + +def test_associations_within_workflow(client_with_exporter): + """Test creating associations within a workflow.""" + client, exporter = client_with_exporter + + @workflow(name="test_associations_within") + def test_workflow(): + client.associations.set([ + (AssociationProperty.CONVERSATION_ID, "conv-abc"), + (AssociationProperty.USER_ID, "user-xyz"), + ]) + return test_task() + + @task(name="test_within_task") + def test_task(): + return + + test_workflow() + + spans = exporter.get_finished_spans() + assert [span.name for span in spans] == [ + "test_within_task.task", + "test_associations_within.workflow", + ] + + task_span = spans[0] + workflow_span = spans[1] + + # Both spans should have all associations + assert workflow_span.attributes["conversation_id"] == "conv-abc" + assert workflow_span.attributes["user_id"] == "user-xyz" + + assert task_span.attributes["conversation_id"] == "conv-abc" + assert task_span.attributes["user_id"] == "user-xyz" + + +def test_all_association_properties(client_with_exporter): + """Test that all AssociationProperty enum values work correctly.""" + client, exporter = client_with_exporter + + @workflow(name="test_all_properties") + def test_workflow(): + return + + client.associations.set([ + (AssociationProperty.CONVERSATION_ID, "conv-1"), + (AssociationProperty.CUSTOMER_ID, "customer-2"), + (AssociationProperty.USER_ID, "user-3"), + (AssociationProperty.SESSION_ID, "session-4"), + ]) + test_workflow() + + spans = exporter.get_finished_spans() + workflow_span = spans[0] + + assert workflow_span.attributes["conversation_id"] == "conv-1" + assert workflow_span.attributes["customer_id"] == "customer-2" + assert workflow_span.attributes["user_id"] == "user-3" + assert workflow_span.attributes["session_id"] == "session-4" diff --git a/packages/traceloop-sdk/traceloop/sdk/__init__.py b/packages/traceloop-sdk/traceloop/sdk/__init__.py index 9a64f36c26..018ffb5113 100644 --- a/packages/traceloop-sdk/traceloop/sdk/__init__.py +++ b/packages/traceloop-sdk/traceloop/sdk/__init__.py @@ -33,6 +33,7 @@ from traceloop.sdk.client.client import Client + class Traceloop: AUTO_CREATED_KEY_PATH = str( Path.home() / ".cache" / "traceloop" / "auto_created_key" diff --git a/packages/traceloop-sdk/traceloop/sdk/associations/__init__.py b/packages/traceloop-sdk/traceloop/sdk/associations/__init__.py new file mode 100644 index 0000000000..6ecded6e9d --- /dev/null +++ b/packages/traceloop-sdk/traceloop/sdk/associations/__init__.py @@ -0,0 +1,7 @@ +from traceloop.sdk.associations.associations import ( + Associations, + AssociationProperty, + Association, +) + +__all__ = ["Associations", "AssociationProperty", "Association"] diff --git a/packages/traceloop-sdk/traceloop/sdk/associations/associations.py b/packages/traceloop-sdk/traceloop/sdk/associations/associations.py new file mode 100644 index 0000000000..1f939004dd --- /dev/null +++ b/packages/traceloop-sdk/traceloop/sdk/associations/associations.py @@ -0,0 +1,52 @@ +from enum import Enum +from typing import Sequence +from opentelemetry import trace +from opentelemetry.context import attach, set_value, get_value + + +class AssociationProperty(str, Enum): + """Standard association properties for tracing.""" + + CONVERSATION_ID = "conversation_id" + CUSTOMER_ID = "customer_id" + USER_ID = "user_id" + SESSION_ID = "session_id" + + +# Type alias for a single association +Association = tuple[AssociationProperty, str] + + +class Associations: + """Class for managing trace associations.""" + + @staticmethod + def set(associations: Sequence[Association]) -> None: + """ + Set associations that will be added directly to all spans in the current context. + + Args: + associations: A sequence of (property, value) tuples + + Example: + # Single association + traceloop.associations.set([(AssociationProperty.CONVERSATION_ID, "conv-123")]) + + # Multiple associations + traceloop.associations.set([ + (AssociationProperty.USER_ID, "user-456"), + (AssociationProperty.SESSION_ID, "session-789") + ]) + """ + # Store all associations in context + current_associations: dict[str, str] = get_value("associations") or {} # type: ignore + for prop, value in associations: + current_associations[prop.value] = value + + attach(set_value("associations", current_associations)) + + # Also set directly on the current span + span = trace.get_current_span() + if span and span.is_recording(): + for prop, value in associations: + span.set_attribute(prop.value, value) diff --git a/packages/traceloop-sdk/traceloop/sdk/client/client.py b/packages/traceloop-sdk/traceloop/sdk/client/client.py index b3d2510f63..42e4bd23da 100644 --- a/packages/traceloop-sdk/traceloop/sdk/client/client.py +++ b/packages/traceloop-sdk/traceloop/sdk/client/client.py @@ -6,6 +6,7 @@ from traceloop.sdk.experiment.experiment import Experiment from traceloop.sdk.client.http import HTTPClient from traceloop.sdk.version import __version__ +from traceloop.sdk.associations.associations import Associations import httpx @@ -25,6 +26,7 @@ class Client: user_feedback: UserFeedback datasets: Datasets experiment: Experiment + associations: Associations _http: HTTPClient _async_http: httpx.AsyncClient @@ -65,3 +67,4 @@ def __init__( experiment_slug = os.getenv("TRACELOOP_EXP_SLUG") # TODO: Fix type - Experiment constructor should accept Optional[str] self.experiment = Experiment(self._http, self._async_http, experiment_slug) # type: ignore[arg-type] + self.associations = Associations() diff --git a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py index c7299f354a..d6e2f0a7f4 100644 --- a/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py +++ b/packages/traceloop-sdk/traceloop/sdk/tracing/tracing.py @@ -313,6 +313,12 @@ def default_span_processor_on_start(span: Span, parent_context: Context | None = if entity_path is not None: span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_PATH, str(entity_path)) + # Handle associations + associations = get_value("associations") + if associations is not None: + for key, value in associations.items(): + span.set_attribute(key, str(value)) + association_properties = get_value("association_properties") if association_properties is not None and isinstance(association_properties, dict): _set_association_properties_attributes(span, association_properties)