-
Notifications
You must be signed in to change notification settings - Fork 850
feat(tracing): Add general AssociationProperty #3484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
nina-kollman
wants to merge
14
commits into
main
Choose a base branch
from
nk/chat
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 12 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
9c34d29
added chat
nina-kollman 43d78b6
add the set conversation id
nina-kollman 70431e7
change test location
nina-kollman cd37f9a
add
nina-kollman a7ae838
remove script
nina-kollman 5d12479
Merge branch 'main' of https://github.com/traceloop/openllmetry into …
nina-kollman 045c862
added enum
nina-kollman f6f8d60
init file|
nina-kollman 36dc637
on client
nina-kollman c0b2818
change to class
nina-kollman 0e8ac3e
add init
nina-kollman 23d088f
trace
nina-kollman 645269c
add all
nina-kollman 505c403
remove all
nina-kollman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,227 @@ | ||
| import os | ||
| import uuid | ||
| from datetime import datetime | ||
| import google.genai as genai | ||
| from google.genai import types | ||
| from traceloop.sdk import Traceloop | ||
| from traceloop.sdk.associations import AssociationProperty | ||
| from traceloop.sdk.decorators import workflow | ||
|
|
||
| # Initialize Traceloop for observability | ||
| traceloop = Traceloop.init(app_name="gemini_chatbot") | ||
|
|
||
| # Initialize Gemini client | ||
| client = genai.Client(api_key=os.environ.get("GENAI_API_KEY")) | ||
|
|
||
|
|
||
| # Define tools for the chatbot | ||
| def get_weather(location: str) -> str: | ||
| """Get the current weather for a location.""" | ||
| # Simulated weather data | ||
| weather_data = { | ||
| "San Francisco": "Sunny, 68°F", | ||
| "New York": "Cloudy, 55°F", | ||
| "London": "Rainy, 52°F", | ||
| "Tokyo": "Clear, 62°F", | ||
| } | ||
| return weather_data.get(location, f"Weather data not available for {location}") | ||
|
|
||
|
|
||
| def get_current_time(timezone: str = "UTC") -> str: | ||
| """Get the current time in a specific timezone.""" | ||
| # Simplified - just return current UTC time | ||
| return f"Current time ({timezone}): {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" | ||
|
|
||
|
|
||
| def search_knowledge_base(query: str) -> str: | ||
| """Search the knowledge base for information.""" | ||
| # Simulated knowledge base | ||
| knowledge = { | ||
| "company policy": "Our company policy includes 15 days of vacation per year.", | ||
| "support hours": "Support is available Monday-Friday, 9 AM - 5 PM EST.", | ||
| "pricing": "Our pricing starts at $29/month for the basic plan.", | ||
| } | ||
|
|
||
| for key, value in knowledge.items(): | ||
| if key in query.lower(): | ||
| return value | ||
|
|
||
| return "I couldn't find specific information about that in our knowledge base." | ||
|
|
||
|
|
||
| # Define function declarations for Gemini | ||
| weather_tool = types.Tool( | ||
| function_declarations=[ | ||
| types.FunctionDeclaration( | ||
| name="get_weather", | ||
| description="Get the current weather for a specific location", | ||
| parameters=types.Schema( | ||
| type=types.Type.OBJECT, | ||
| properties={ | ||
| "location": types.Schema( | ||
| type=types.Type.STRING, | ||
| description="The city name, e.g., 'San Francisco'" | ||
| ) | ||
| }, | ||
| required=["location"] | ||
| ) | ||
| ) | ||
| ] | ||
| ) | ||
|
|
||
| time_tool = types.Tool( | ||
| function_declarations=[ | ||
| types.FunctionDeclaration( | ||
| name="get_current_time", | ||
| description="Get the current time in a specific timezone", | ||
| parameters=types.Schema( | ||
| type=types.Type.OBJECT, | ||
| properties={ | ||
| "timezone": types.Schema( | ||
| type=types.Type.STRING, | ||
| description="The timezone name, e.g., 'UTC', 'PST'" | ||
| ) | ||
| }, | ||
| required=[] | ||
| ) | ||
| ) | ||
| ] | ||
| ) | ||
|
|
||
| knowledge_tool = types.Tool( | ||
| function_declarations=[ | ||
| types.FunctionDeclaration( | ||
| name="search_knowledge_base", | ||
| description="Search the company knowledge base for information about policies, support, or pricing", | ||
| parameters=types.Schema( | ||
| type=types.Type.OBJECT, | ||
| properties={ | ||
| "query": types.Schema( | ||
| type=types.Type.STRING, | ||
| description="The search query" | ||
| ) | ||
| }, | ||
| required=["query"] | ||
| ) | ||
| ) | ||
| ] | ||
| ) | ||
|
|
||
|
|
||
| # Function to execute tool calls | ||
| def execute_function(function_name: str, args: dict) -> str: | ||
| """Execute the requested function with given arguments.""" | ||
| if function_name == "get_weather": | ||
| return get_weather(args.get("location", "")) | ||
| elif function_name == "get_current_time": | ||
| return get_current_time(args.get("timezone", "UTC")) | ||
| elif function_name == "search_knowledge_base": | ||
| return search_knowledge_base(args.get("query", "")) | ||
| else: | ||
| return f"Unknown function: {function_name}" | ||
|
|
||
|
|
||
| @workflow("chatbot_conversation") | ||
| def process_message(conversation_id: str, user_message: str, conversation_history: list) -> tuple[str, list]: | ||
| """Process a single message with tool support and chat_id association.""" | ||
|
|
||
| # Set a conversation_id to identify the conversation using the associations API | ||
| traceloop.associations.set([(AssociationProperty.CONVERSATION_ID, conversation_id)]) | ||
|
|
||
| # Add user message to conversation history | ||
| conversation_history.append({ | ||
| "role": "user", | ||
| "parts": [{"text": user_message}] | ||
| }) | ||
|
|
||
| # Keep trying until we get a final response (handle tool calls) | ||
| while True: | ||
| # Generate content with tools | ||
| response = client.models.generate_content( | ||
| model="gemini-2.0-flash-exp", | ||
| contents=conversation_history, | ||
| config=types.GenerateContentConfig( | ||
| tools=[weather_tool, time_tool, knowledge_tool], | ||
| temperature=0.7, | ||
| ) | ||
| ) | ||
|
|
||
| # Check if the model wants to use a tool | ||
| if response.candidates[0].content.parts[0].function_call: | ||
| function_call = response.candidates[0].content.parts[0].function_call | ||
| function_name = function_call.name | ||
| function_args = dict(function_call.args) | ||
|
|
||
| print(f"[Tool Call]: {function_name}({function_args})") | ||
|
|
||
| # Execute the function | ||
| function_result = execute_function(function_name, function_args) | ||
| print(f"[Tool Result]: {function_result}") | ||
|
|
||
| # Add the model's function call to history | ||
| conversation_history.append({ | ||
| "role": "model", | ||
| "parts": [{"function_call": function_call}] | ||
| }) | ||
|
|
||
| # Add the function result to history | ||
| conversation_history.append({ | ||
| "role": "user", | ||
| "parts": [{ | ||
| "function_response": types.FunctionResponse( | ||
| name=function_name, | ||
| response={"result": function_result} | ||
| ) | ||
| }] | ||
| }) | ||
| else: | ||
| # Got a text response, we're done with this turn | ||
| assistant_message = response.text | ||
|
|
||
| # Add assistant response to conversation history | ||
| conversation_history.append({ | ||
| "role": "model", | ||
| "parts": [{"text": assistant_message}] | ||
| }) | ||
|
|
||
| return assistant_message, conversation_history | ||
|
|
||
|
|
||
| def main(): | ||
| """Main function for interactive chatbot.""" | ||
|
|
||
| # Generate a unique chat_id for this conversation | ||
| chat_id = str(uuid.uuid4()) | ||
|
|
||
| print(f"Starting chatbot conversation (Chat ID: {chat_id})") | ||
| print("Type 'exit', 'quit', or 'bye' to end the conversation") | ||
| print("=" * 80) | ||
|
|
||
| conversation_history = [] | ||
|
|
||
| while True: | ||
| # Get user input | ||
| user_message = input("\nYou: ").strip() | ||
|
|
||
| # Check for exit commands | ||
| if user_message.lower() in ['exit', 'quit', 'bye']: | ||
| print("\nGoodbye! Chat session ended.") | ||
| break | ||
|
|
||
| # Skip empty messages | ||
| if not user_message: | ||
| continue | ||
|
|
||
| # Process the message | ||
| try: | ||
| assistant_message, conversation_history = process_message( | ||
| chat_id, user_message, conversation_history | ||
| ) | ||
| print(f"\nAssistant: {assistant_message}") | ||
| except Exception as e: | ||
| print(f"\nError: {e}") | ||
| print("Please try again.") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,171 @@ | ||
| import pytest | ||
| from traceloop.sdk import Traceloop, AssociationProperty | ||
| from traceloop.sdk.decorators import task, workflow | ||
| from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def client_with_exporter(): | ||
| """ | ||
| Fixture that initializes Traceloop with API key. | ||
| Client is only created when NO custom exporter/processor is provided. | ||
| """ | ||
| # Initialize with API key and Traceloop endpoint - this creates a client | ||
| client = Traceloop.init( | ||
| app_name="test_associations", | ||
| api_key="test-api-key", | ||
| api_endpoint="https://api.traceloop.com", | ||
| disable_batch=True, | ||
| # NO exporter or processor - so client gets created | ||
| ) | ||
|
|
||
| # Get spans from the tracer provider for assertions | ||
| from opentelemetry import trace | ||
| from opentelemetry.sdk.trace import TracerProvider | ||
|
|
||
| tracer_provider = trace.get_tracer_provider() | ||
| if isinstance(tracer_provider, TracerProvider): | ||
| # Get the span processor's exporter | ||
| span_processors = tracer_provider._active_span_processor._span_processors | ||
| # Find the exporter from the processors | ||
| for processor in span_processors: | ||
| if hasattr(processor, 'span_exporter'): | ||
| exporter = processor.span_exporter | ||
| break | ||
| else: | ||
| # Fallback: create a mock exporter | ||
| exporter = InMemorySpanExporter() | ||
| else: | ||
| exporter = InMemorySpanExporter() | ||
|
|
||
| yield client, exporter | ||
|
|
||
| # Cleanup | ||
| if hasattr(exporter, 'clear'): | ||
| exporter.clear() | ||
|
|
||
nina-kollman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def test_associations_create_single(client_with_exporter): | ||
| """Test creating a single association.""" | ||
| client, exporter = client_with_exporter | ||
|
|
||
| @workflow(name="test_single_association") | ||
| def test_workflow(): | ||
| return test_task() | ||
|
|
||
| @task(name="test_single_task") | ||
| def test_task(): | ||
| return | ||
|
|
||
| client.associations.set([(AssociationProperty.CONVERSATION_ID, "conv-123")]) | ||
| test_workflow() | ||
|
|
||
| spans = exporter.get_finished_spans() | ||
| assert [span.name for span in spans] == [ | ||
| "test_single_task.task", | ||
| "test_single_association.workflow", | ||
| ] | ||
|
|
||
| task_span = spans[0] | ||
| workflow_span = spans[1] | ||
|
|
||
| assert workflow_span.attributes["conversation_id"] == "conv-123" | ||
| assert task_span.attributes["conversation_id"] == "conv-123" | ||
|
|
||
|
|
||
| def test_associations_create_multiple(client_with_exporter): | ||
| """Test creating multiple associations at once.""" | ||
| client, exporter = client_with_exporter | ||
|
|
||
| @workflow(name="test_multiple_associations") | ||
| def test_workflow(): | ||
| return test_task() | ||
|
|
||
| @task(name="test_multiple_task") | ||
| def test_task(): | ||
| return | ||
|
|
||
| client.associations.set([ | ||
| (AssociationProperty.USER_ID, "user-456"), | ||
| (AssociationProperty.SESSION_ID, "session-789"), | ||
| (AssociationProperty.CUSTOMER_ID, "customer-999"), | ||
| ]) | ||
| test_workflow() | ||
|
|
||
| spans = exporter.get_finished_spans() | ||
| assert [span.name for span in spans] == [ | ||
| "test_multiple_task.task", | ||
| "test_multiple_associations.workflow", | ||
| ] | ||
|
|
||
| task_span = spans[0] | ||
| workflow_span = spans[1] | ||
|
|
||
| # Check all associations are present | ||
| assert workflow_span.attributes["user_id"] == "user-456" | ||
| assert workflow_span.attributes["session_id"] == "session-789" | ||
| assert workflow_span.attributes["customer_id"] == "customer-999" | ||
|
|
||
| assert task_span.attributes["user_id"] == "user-456" | ||
| assert task_span.attributes["session_id"] == "session-789" | ||
| assert task_span.attributes["customer_id"] == "customer-999" | ||
|
|
||
|
|
||
| def test_associations_within_workflow(client_with_exporter): | ||
| """Test creating associations within a workflow.""" | ||
| client, exporter = client_with_exporter | ||
|
|
||
| @workflow(name="test_associations_within") | ||
| def test_workflow(): | ||
| client.associations.set([ | ||
| (AssociationProperty.CONVERSATION_ID, "conv-abc"), | ||
| (AssociationProperty.USER_ID, "user-xyz"), | ||
| ]) | ||
| return test_task() | ||
|
|
||
| @task(name="test_within_task") | ||
| def test_task(): | ||
| return | ||
|
|
||
| test_workflow() | ||
|
|
||
| spans = exporter.get_finished_spans() | ||
| assert [span.name for span in spans] == [ | ||
| "test_within_task.task", | ||
| "test_associations_within.workflow", | ||
| ] | ||
|
|
||
| task_span = spans[0] | ||
| workflow_span = spans[1] | ||
|
|
||
| # Both spans should have all associations | ||
| assert workflow_span.attributes["conversation_id"] == "conv-abc" | ||
| assert workflow_span.attributes["user_id"] == "user-xyz" | ||
|
|
||
| assert task_span.attributes["conversation_id"] == "conv-abc" | ||
| assert task_span.attributes["user_id"] == "user-xyz" | ||
|
|
||
|
|
||
| def test_all_association_properties(client_with_exporter): | ||
| """Test that all AssociationProperty enum values work correctly.""" | ||
| client, exporter = client_with_exporter | ||
|
|
||
| @workflow(name="test_all_properties") | ||
| def test_workflow(): | ||
| return | ||
|
|
||
| client.associations.set([ | ||
| (AssociationProperty.CONVERSATION_ID, "conv-1"), | ||
| (AssociationProperty.CUSTOMER_ID, "customer-2"), | ||
| (AssociationProperty.USER_ID, "user-3"), | ||
| (AssociationProperty.SESSION_ID, "session-4"), | ||
| ]) | ||
| test_workflow() | ||
|
|
||
| spans = exporter.get_finished_spans() | ||
| workflow_span = spans[0] | ||
|
|
||
| assert workflow_span.attributes["conversation_id"] == "conv-1" | ||
| assert workflow_span.attributes["customer_id"] == "customer-2" | ||
| assert workflow_span.attributes["user_id"] == "user-3" | ||
| assert workflow_span.attributes["session_id"] == "session-4" | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.