Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,26 @@ def _emit_tool_call_event(
attributes[GEN_AI_EVENT_CONTENT] = json.dumps(content_array, ensure_ascii=False)
span.span_instance.add_event(name=GEN_AI_ASSISTANT_MESSAGE_EVENT, attributes=attributes)

def _emit_tool_output_event(
self,
span: "AbstractSpan",
tool_output: Dict[str, Any],
conversation_id: Optional[str] = None,
) -> None:
"""Helper to emit a single tool output event."""
# Wrap tool output in parts array
# Tool outputs are inputs TO the model (from tool execution), so use role "tool"
parts = [{"type": "tool_call_output", "content": tool_output}]
content_array = [{"role": "tool", "parts": parts}]
attributes = self._create_event_attributes(
conversation_id=conversation_id,
message_role="tool",
)
# Store as JSON array directly without outer wrapper
attributes[GEN_AI_EVENT_CONTENT] = json.dumps(content_array, ensure_ascii=False)
# Tool outputs are inputs to the model, so use input.messages event
span.span_instance.add_event(name=GEN_AI_USER_MESSAGE_EVENT, attributes=attributes)

def _add_tool_call_events( # pylint: disable=too-many-branches
self,
span: "AbstractSpan",
Expand All @@ -1002,9 +1022,11 @@ def _add_tool_call_events( # pylint: disable=too-many-branches
if not output:
return

# Process output items for tool call events
for output_item in output:
try:
item_type = getattr(output_item, "type", None)
# Process item based on type
if not item_type:
continue

Expand Down Expand Up @@ -1331,7 +1353,9 @@ def _add_tool_call_events( # pylint: disable=too-many-branches
self._emit_tool_call_event(span, tool_call, conversation_id)

# Handle unknown/future tool call types with best effort
elif item_type and "_call" in item_type:
# Exclude _output types - those are handled separately as tool outputs, not tool calls
elif item_type and "_call" in item_type and not item_type.endswith("_output"):
# Generic handler for tool calls
try:
tool_call = {
"type": item_type,
Expand All @@ -1345,8 +1369,18 @@ def _add_tool_call_events( # pylint: disable=too-many-branches

# Only include detailed fields if content recording is enabled
if _trace_responses_content:
# Try to get the full tool details using as_dict() if available
if hasattr(output_item, "as_dict"):
# Try to get the full tool details using model_dump() for Pydantic models
if hasattr(output_item, "model_dump"):
tool_dict = output_item.model_dump()
# Extract the tool-specific details (exclude common fields already captured)
for key, value in tool_dict.items():
if (
key
not in ["type", "id", "call_id", "role", "content", "status", "partition_key"]
and value is not None
):
tool_call[key] = value
elif hasattr(output_item, "as_dict"):
tool_dict = output_item.as_dict()
# Extract the tool-specific details (exclude common fields already captured)
for key, value in tool_dict.items():
Expand All @@ -1373,6 +1407,64 @@ def _add_tool_call_events( # pylint: disable=too-many-branches
# Log but don't crash if we can't handle an unknown tool type
logger.debug(f"Failed to process unknown tool call type '{item_type}': {e}")

# Handle unknown/future tool output types with best effort
# These are the _output types that correspond to the tool calls above
elif item_type and item_type.endswith("_output"):
# Generic handler for tool outputs
try:
tool_output = {
"type": item_type,
}

# Always try to include common ID fields (safe, needed for correlation)
for id_field in ["id", "call_id"]:
if hasattr(output_item, id_field):
tool_output["id"] = getattr(output_item, id_field)
break # Use first available ID field

# Only include detailed fields if content recording is enabled
if _trace_responses_content:
# Try to get the full tool output using model_dump() for Pydantic models
if hasattr(output_item, "model_dump"):
output_dict = output_item.model_dump()
# Extract the tool-specific output (exclude common fields already captured)
# Include fields even if empty string (but not None) for API consistency
for key, value in output_dict.items():
if (
key
not in ["type", "id", "call_id", "role", "content", "status", "partition_key"]
and value is not None
):
tool_output[key] = value
elif hasattr(output_item, "as_dict"):
output_dict = output_item.as_dict()
# Extract the tool-specific output (exclude common fields already captured)
for key, value in output_dict.items():
if (
key not in ["type", "id", "call_id", "role", "content", "status"]
and value is not None
):
tool_output[key] = value
else:
# Fallback: try to capture common output fields manually
for field in [
"output",
"result",
"results",
"data",
"response",
]:
if hasattr(output_item, field):
value = getattr(output_item, field)
if value is not None:
tool_output[field] = value

self._emit_tool_output_event(span, tool_output, conversation_id)

except Exception as e:
# Log but don't crash if we can't handle an unknown tool output type
logger.debug(f"Failed to process unknown tool output type '{item_type}': {e}")

except Exception as e:
# Catch-all to prevent any tool call processing errors from breaking instrumentation
logger.debug(f"Error processing tool call events: {e}")
Expand Down Expand Up @@ -2241,7 +2333,8 @@ def __init__(
self.finish_reason = None # Track finish_reason from streaming chunks

# Track all output items from streaming events (tool calls, workflow actions, etc.)
self.output_items = {} # Dict[item_id, output_item] - keyed by call_id, action_id, or id
# Use (id, type) as key to avoid overwriting when call and output have same ID
self.output_items = {} # Dict[(item_id, item_type), output_item]
self.has_output_items = False

# Expose response attribute for compatibility with ResponseStreamManager
Expand Down Expand Up @@ -2302,8 +2395,11 @@ def process_chunk(self, chunk):
or getattr(item, "id", None)
)
if item_id:
self.output_items[item_id] = item
# Use (id, type) tuple as key to distinguish call from output
key = (item_id, item_type)
self.output_items[key] = item
self.has_output_items = True
# Items without ID or type are skipped

# Capture response ID from ResponseCreatedEvent or ResponseCompletedEvent
if chunk_type == "response.created" and hasattr(chunk, "response"):
Expand Down Expand Up @@ -2483,6 +2579,7 @@ def __init__(
self.span_ended = True

def __iter__(self):
# Start streaming iteration
return self

def __next__(self):
Expand Down Expand Up @@ -2713,7 +2810,8 @@ def __init__(
self.finish_reason = None # Track finish_reason from streaming chunks

# Track all output items from streaming events (tool calls, workflow actions, etc.)
self.output_items = {} # Dict[item_id, output_item] - keyed by call_id, action_id, or id
# Use (id, type) as key to avoid overwriting when call and output have same ID
self.output_items = {} # Dict[(item_id, item_type), output_item]
self.has_output_items = False

# Expose response attribute for compatibility with AsyncResponseStreamManager
Expand Down Expand Up @@ -2776,8 +2874,10 @@ def process_chunk(self, chunk):
or getattr(item, "id", None)
)
if item_id:
self.output_items[item_id] = item
# Use (id, type) tuple as key to distinguish call from output
self.output_items[(item_id, item_type)] = item
self.has_output_items = True
# Items without ID or type are skipped

# Capture response ID from ResponseCreatedEvent or ResponseCompletedEvent
if chunk_type == "response.created" and hasattr(chunk, "response"):
Expand Down Expand Up @@ -3692,8 +3792,8 @@ def _add_conversation_item_event( # pylint: disable=too-many-branches,too-many-

event_name = GEN_AI_CONVERSATION_ITEM_EVENT

elif item_type == "remote_function_call_output":
# Remote function call output (like Azure AI Search)
elif item_type == "remote_function_call":
# Remote function call (like Bing Custom Search call)
role = "assistant" # Override role for remote function calls

# Extract the tool name
Expand All @@ -3718,8 +3818,12 @@ def _add_conversation_item_event( # pylint: disable=too-many-branches,too-many-
# Extract data from model_extra if available (Pydantic v2 style)
if hasattr(item, "model_extra") and isinstance(item.model_extra, dict):
for key, value in item.model_extra.items():
# Skip already captured fields, redundant fields (name, label), and empty/None values
if key not in ["type", "id", "call_id", "name", "label"] and value is not None and value != "":
# Skip already captured fields, redundant fields (name, label), internal fields (partition_key), and empty/None values
if (
key not in ["type", "id", "call_id", "name", "label", "partition_key"]
and value is not None
and value != ""
):
tool_call[key] = value

# Also try as_dict if available
Expand Down Expand Up @@ -3748,8 +3852,7 @@ def _add_conversation_item_event( # pylint: disable=too-many-branches,too-many-
# Fallback: try common fields directly (skip if empty and skip redundant name/label)
for field in [
"input",
"output",
"results",
"arguments",
"status",
"error",
"search_query",
Expand All @@ -3770,6 +3873,88 @@ def _add_conversation_item_event( # pylint: disable=too-many-branches,too-many-

event_name = GEN_AI_CONVERSATION_ITEM_EVENT

elif item_type == "remote_function_call_output":
# Remote function call output (like Bing Custom Search output)
role = "tool" # Tool outputs use role "tool"

# Extract the tool name
tool_name = getattr(item, "name", None) if hasattr(item, "name") else None

tool_output = {
"type": tool_name if tool_name else "remote_function",
}

# Always include ID (needed for correlation)
if hasattr(item, "id"):
tool_output["id"] = item.id
elif hasattr(item, "call_id"):
tool_output["id"] = item.call_id
# Check model_extra for call_id
elif hasattr(item, "model_extra") and isinstance(item.model_extra, dict):
if "call_id" in item.model_extra:
tool_output["id"] = item.model_extra["call_id"]

# Only include tool details if content recording is enabled
if _trace_responses_content:
# Extract data from model_extra if available (Pydantic v2 style)
if hasattr(item, "model_extra") and isinstance(item.model_extra, dict):
for key, value in item.model_extra.items():
# Skip already captured fields, redundant fields (name, label), internal fields (partition_key), and empty/None values
if (
key not in ["type", "id", "call_id", "name", "label", "partition_key"]
and value is not None
and value != ""
):
tool_output[key] = value

# Also try as_dict if available
if hasattr(item, "as_dict"):
try:
tool_dict = item.as_dict()
# Extract relevant fields (exclude already captured ones and empty/None values)
for key, value in tool_dict.items():
if key not in [
"type",
"id",
"call_id",
"name",
"label",
"role",
"content",
]:
# Skip empty strings and None values
if value is not None and value != "":
# Don't overwrite if already exists
if key not in tool_output:
tool_output[key] = value
except Exception as e:
logger.debug(f"Failed to extract data from as_dict: {e}")

# Fallback: try common fields directly (skip if empty and skip redundant name/label)
for field in [
"input",
"output",
"results",
"status",
"error",
"search_query",
"query",
]:
if hasattr(item, field):
try:
value = getattr(item, field)
if value is not None and value != "":
# If not already in tool_output, add it
if field not in tool_output:
tool_output[field] = value
except Exception:
pass

# Tool outputs use tool_call_output type in parts
event_body = [{"role": role, "parts": [{"type": "tool_call_output", "content": tool_output}]}]

event_name = GEN_AI_CONVERSATION_ITEM_EVENT

elif item_type == "workflow_action":
# Workflow action item - include workflow execution details
role = "workflow"
Expand Down