6 changes: 0 additions & 6 deletions newrelic/core/application.py
@@ -1368,12 +1368,6 @@ def harvest(self, shutdown=False, flexible=False):
spans_sampled = spans.num_samples
internal_count_metric("Supportability/SpanEvent/TotalEventsSeen", spans_seen)
internal_count_metric("Supportability/SpanEvent/TotalEventsSent", spans_sampled)
if configuration.distributed_tracing.sampler.partial_granularity.enabled:
internal_count_metric(
f"Supportability/Python/PartialGranularity/{configuration.distributed_tracing.sampler.partial_granularity.type}",
1,
)

stats.reset_span_events()

# Send error events
5 changes: 5 additions & 0 deletions newrelic/core/attribute.py
@@ -87,6 +87,9 @@
"message.routingKey",
"messaging.destination.name",
"messaging.system",
"nr.durations",
"nr.ids",
"nr.pg",
"peer.address",
"peer.hostname",
"request.headers.accept",
@@ -125,6 +128,8 @@
"span.kind",
}

SPAN_ERROR_ATTRIBUTES = {"error.class", "error.message", "error.expected"}


MAX_NUM_USER_ATTRIBUTES = 128
MAX_ATTRIBUTE_LENGTH = 255
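A note on how these sets are consumed: in `node_mixin.py` below, set intersection filters a span's agent attributes down to the entity-relationship and error attributes. A minimal sketch of that filtering pattern, with a hypothetical attribute dict:

```python
# Minimal sketch of the attribute-minimization pattern used in node_mixin.py.
# The attribute values below are hypothetical.
SPAN_ERROR_ATTRIBUTES = {"error.class", "error.message", "error.expected"}

a_attrs = {
    "db.instance": "orders",           # entity-relationship attribute
    "error.class": "TimeoutError",     # error attribute, kept in essential/compact modes
    "error.message": "query timed out",
    "http.method": "GET",              # dropped by minimization
}

# Intersecting the known set with the span's keys selects only the
# error attributes actually present on this span.
present = SPAN_ERROR_ATTRIBUTES & set(a_attrs)
minimized = {key: a_attrs[key] for key in present}
# -> {"error.class": "TimeoutError", "error.message": "query timed out"}
```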
148 changes: 82 additions & 66 deletions newrelic/core/node_mixin.py
@@ -79,71 +79,91 @@ def span_event(
if not partial_granularity_sampled:
# intrinsics, user attrs, agent attrs
return [i_attrs, u_attrs, a_attrs]
else:
if ct_exit_spans is None:
ct_exit_spans = {}

partial_granularity_type = settings.distributed_tracing.sampler.partial_granularity.type
exit_span_attrs_present = attribute.SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES & set(a_attrs)
# If this is the entry node or an LLM span, always return it.
if i_attrs.get("nr.entryPoint") or i_attrs["name"].startswith("Llm/"):
if partial_granularity_type == "reduced":
return [i_attrs, u_attrs, a_attrs]
else:
return [i_attrs, {}, {}]
# If the span is not an exit span, skip it by returning None.
if not exit_span_attrs_present:
return None
# If the span is an exit span and we are in reduced mode (meaning no attribute dropping),
# just return the exit span as is.

if ct_exit_spans is None:
ct_exit_spans = {"instrumented": 0, "kept": 0, "dropped_ids": 0}
ct_exit_spans["instrumented"] += 1

partial_granularity_type = settings.distributed_tracing.sampler.partial_granularity.type
a_attrs_set = set(a_attrs)
exit_span_attrs_present = attribute.SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES & a_attrs_set
exit_span_error_attrs_present = attribute.SPAN_ERROR_ATTRIBUTES & a_attrs_set
# If this is an entry span, add `nr.pg` to indicate the transaction is
# partial granularity sampled.
if i_attrs.get("nr.entryPoint"):
a_attrs["nr.pg"] = True
# If this is the entry node or an LLM span, always return it.
if i_attrs.get("nr.entryPoint") or i_attrs["name"].startswith("Llm/"):
ct_exit_spans["kept"] += 1
if partial_granularity_type == "reduced":
return [i_attrs, u_attrs, a_attrs]
else:
a_minimized_attrs = attr_class({key: a_attrs[key] for key in exit_span_attrs_present})
# If we are in essential mode, return the span with minimized attributes.
if partial_granularity_type == "essential":
return [i_attrs, {}, a_minimized_attrs]
# If the span is an exit span and span compression (compact) is enabled,
# we need to check for uniqueness before returning it.
# Combine all the entity relationship attr values into a string to be
# used as the hash to check for uniqueness.
span_attrs = "".join([str(a_minimized_attrs[key]) for key in exit_span_attrs_present])
new_exit_span = span_attrs not in ct_exit_spans
# If this is a new exit span, add it to the known ct_exit_spans and
# return it.
if new_exit_span:
# nr.ids is the list of span guids that share this unique exit span.
a_minimized_attrs["nr.ids"] = []
a_minimized_attrs["nr.durations"] = self.duration
ct_exit_spans[span_attrs] = [i_attrs, a_minimized_attrs]
return [i_attrs, {}, a_minimized_attrs]
# If this is an exit span we've already seen, add its guid to the list
# of ids on the seen span, compute the new duration & start time, and
# return None.
return [i_attrs, {}, {key: a_attrs.get(key) for key in exit_span_error_attrs_present | {"nr.pg"}}]
# If the span is not an exit span, skip it by returning None.
if not exit_span_attrs_present:
return None
# If the span is an exit span and we are in reduced mode (meaning no attribute dropping),
# just return the exit span as is.
if partial_granularity_type == "reduced":
ct_exit_spans["kept"] += 1
return [i_attrs, u_attrs, a_attrs]
else:
a_minimized_attrs = attr_class(
{key: a_attrs[key] for key in (exit_span_attrs_present | exit_span_error_attrs_present)}
)
# If we are in essential mode, return the span with minimized attributes.
if partial_granularity_type == "essential":
ct_exit_spans["kept"] += 1
return [i_attrs, {}, a_minimized_attrs]
# If the span is an exit span and span compression (compact) is enabled,
# we need to check for uniqueness before returning it.
# Combine all the entity relationship attr values into a string to be
# used as the hash to check for uniqueness.
span_attrs = "".join([str(a_minimized_attrs[key]) for key in exit_span_attrs_present])
new_exit_span = span_attrs not in ct_exit_spans
# If this is a new exit span, add it to the known ct_exit_spans and
# return it.
if new_exit_span:
# nr.ids is the list of span guids that share this unique exit span.
a_minimized_attrs["nr.ids"] = []
a_minimized_attrs["nr.durations"] = self.duration
ct_exit_spans[span_attrs] = [i_attrs, a_minimized_attrs]
ct_exit_spans["kept"] += 1
return [i_attrs, {}, a_minimized_attrs]
# If this is an exit span we've already seen, add the error attributes
# (last occurring error takes precedence), add its guid to the list
# of ids on the seen span, compute the new duration & start time, and
# return None.
ct_exit_spans[span_attrs][1].update(
attr_class({key: a_minimized_attrs[key] for key in exit_span_error_attrs_present})
)
# Max size for `nr.ids` = 1024. Max length = 63 (each span id is 16 bytes + 8 bytes for list type).
if len(ct_exit_spans[span_attrs][1]["nr.ids"]) < 63:
ct_exit_spans[span_attrs][1]["nr.ids"].append(self.guid)
# Max size for `nr.ids` = 1024. Max length = 63 (each span id is 16 bytes + 8 bytes for list type).
ct_exit_spans[span_attrs][1]["nr.ids"] = ct_exit_spans[span_attrs][1]["nr.ids"][:63]
# Compute the new start and end time for all compressed spans and use
# that to set the duration for all compressed spans.
current_start_time = ct_exit_spans[span_attrs][0]["timestamp"]
current_end_time = (
ct_exit_spans[span_attrs][0]["timestamp"] / 1000 + ct_exit_spans[span_attrs][1]["nr.durations"]
)
new_start_time = i_attrs["timestamp"]
new_end_time = i_attrs["timestamp"] / 1000 + i_attrs["duration"]
set_start_time = min(new_start_time, current_start_time)
# If the new span starts after the old span's end time, or the new span
# ends before the current span starts, add the durations.
if current_end_time < new_start_time / 1000 or new_end_time < current_start_time / 1000:
set_duration = ct_exit_spans[span_attrs][1]["nr.durations"] + i_attrs["duration"]
# Otherwise, if the new and old spans overlap in time, subtract the
# earliest start time from the latest end time to calculate the new
# duration.
else:
set_duration = max(current_end_time, new_end_time) - set_start_time / 1000
ct_exit_spans[span_attrs][0]["timestamp"] = set_start_time
ct_exit_spans[span_attrs][1]["nr.durations"] = set_duration
return None
else:
ct_exit_spans["dropped_ids"] += 1

ct_exit_spans[span_attrs][1]["nr.ids"] = ct_exit_spans[span_attrs][1]["nr.ids"][:63]
# Compute the new start and end time for all compressed spans and use
# that to set the duration for all compressed spans.
current_start_time = ct_exit_spans[span_attrs][0]["timestamp"]
current_end_time = (
ct_exit_spans[span_attrs][0]["timestamp"] / 1000 + ct_exit_spans[span_attrs][1]["nr.durations"]
)
new_start_time = i_attrs["timestamp"]
new_end_time = i_attrs["timestamp"] / 1000 + i_attrs["duration"]
set_start_time = min(new_start_time, current_start_time)
# If the new span starts after the old span's end time, or the new span
# ends before the current span starts, add the durations.
if current_end_time < new_start_time / 1000 or new_end_time < current_start_time / 1000:
set_duration = ct_exit_spans[span_attrs][1]["nr.durations"] + i_attrs["duration"]
# Otherwise, if the new and old spans overlap in time, subtract the
# earliest start time from the latest end time to calculate the new
# duration.
else:
set_duration = max(current_end_time, new_end_time) - set_start_time / 1000
ct_exit_spans[span_attrs][0]["timestamp"] = set_start_time
ct_exit_spans[span_attrs][1]["nr.durations"] = set_duration
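To make the compact-mode flow above concrete, here is a self-contained sketch of its two core steps: building the uniqueness key from entity-relationship attribute values, and merging a repeated exit span's timing into the span already kept. It assumes timestamps in milliseconds and durations in seconds, matching the `/1000` conversions in the code; all values are hypothetical.

```python
# Hedged sketch of compact-mode exit-span compression (not the agent's API).

def uniqueness_key(minimized_attrs, entity_attr_names):
    # Concatenate the entity-relationship attribute values into one string,
    # as span_event does, so identical exit spans map to the same key.
    return "".join(str(minimized_attrs[key]) for key in entity_attr_names)

def merge_timing(kept_ts_ms, kept_duration_s, new_ts_ms, new_duration_s):
    # End times in seconds, mirroring the timestamp / 1000 + duration math above.
    current_end = kept_ts_ms / 1000 + kept_duration_s
    new_end = new_ts_ms / 1000 + new_duration_s
    start_ms = min(new_ts_ms, kept_ts_ms)
    if current_end < new_ts_ms / 1000 or new_end < kept_ts_ms / 1000:
        # Disjoint in time: total busy time is the sum of both durations.
        duration = kept_duration_s + new_duration_s
    else:
        # Overlapping: earliest start to latest end.
        duration = max(current_end, new_end) - start_ms / 1000
    return start_ms, duration

print(merge_timing(1000, 2.0, 2000, 3.0))  # overlapping -> (1000, 4.0)
print(merge_timing(1000, 2.0, 6000, 1.0))  # disjoint    -> (1000, 3.0)
```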

def span_events(
self,
@@ -162,13 +182,11 @@ def span_events(
partial_granularity_sampled=partial_granularity_sampled,
ct_exit_spans=ct_exit_spans,
)
ct_exit_spans["instrumented"] += 1
parent_id = parent_guid
if span: # span will be None if the span is an inprocess span or repeated exit span.
ct_exit_spans["kept"] += 1
yield span
# Compressed spans are always reparented onto the entry span.
if not settings.distributed_tracing.sampler.partial_granularity.type == "compact" or span[0].get(
if settings.distributed_tracing.sampler.partial_granularity.type != "compact" or span[0].get(
"nr.entryPoint"
):
parent_id = self.guid
@@ -181,9 +199,7 @@ def span_events(
partial_granularity_sampled=partial_granularity_sampled,
ct_exit_spans=ct_exit_spans,
):
ct_exit_spans["instrumented"] += 1
if event: # event will be None if the span is an inprocess span or repeated exit span.
ct_exit_spans["kept"] += 1
yield event


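The `parent_id` handling above reparents compressed children: outside compact mode, or for the entry span itself, children keep this node as their parent; in compact mode every other child is attached to the entry span. A hedged sketch of that decision, with hypothetical inputs:

```python
# Sketch of the reparenting rule in span_events. `span` is the
# [intrinsics, user, agent] triple from span_event, or None when the
# span was skipped or compressed away.
def next_parent_id(span, self_guid, inherited_parent_guid, pg_type):
    parent_id = inherited_parent_guid
    if span is not None and (pg_type != "compact" or span[0].get("nr.entryPoint")):
        parent_id = self_guid
    return parent_id

entry = [{"nr.entryPoint": True}, {}, {}]
exit_span = [{"name": "External/example.com/all"}, {}, {}]
print(next_parent_id(entry, "aaa", None, "compact"))         # 'aaa'
print(next_parent_id(exit_span, "bbb", "aaa", "compact"))    # 'aaa' (reparented)
print(next_parent_id(exit_span, "bbb", "aaa", "essential"))  # 'bbb'
```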
30 changes: 19 additions & 11 deletions newrelic/core/stats_engine.py
@@ -1192,18 +1192,26 @@ def record_transaction(self, transaction):
self._span_events.add(event, priority=transaction.priority)
if transaction.partial_granularity_sampled:
partial_gran_type = settings.distributed_tracing.sampler.partial_granularity.type
self.record_custom_metrics(
[
(
f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Instrumented",
{"count": transaction.instrumented},
),
(
f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Kept",
{"count": transaction.kept},
),
]
self.record_custom_metric(
f"Supportability/Python/PartialGranularity/{partial_gran_type}", {"count": 1}
)
instrumented = getattr(transaction, "instrumented", 0)
if instrumented:
self.record_custom_metric(
f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Instrumented",
{"count": instrumented},
)
kept = getattr(transaction, "kept", 0)
if kept:
self.record_custom_metric(
f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Kept",
{"count": kept},
)
dropped_ids = getattr(transaction, "dropped_ids", 0)
if dropped_ids:
self.record_custom_metric(
"Supportability/Python/PartialGranularity/NrIds/Dropped", {"count": dropped_ids}
)

# Merge in log events

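These counters originate in `transaction_node.py` below, where they are copied off the `ct_exit_spans` dict after span generation. A hedged sketch of the end-to-end flow, with a stub recorder standing in for the real `StatsEngine`:

```python
# Sketch of how partial-granularity counters become supportability metrics.
# The transaction object and recorder are stand-ins, not agent classes.
class Txn:
    partial_granularity_sampled = True
    instrumented = 12   # exit spans examined
    kept = 5            # spans actually emitted
    dropped_ids = 2     # guids that no longer fit in nr.ids (compact mode)

def record_metrics(txn, pg_type, record):
    record(f"Supportability/Python/PartialGranularity/{pg_type}", 1)
    # getattr guards: the counters only exist on sampled transactions.
    instrumented = getattr(txn, "instrumented", 0)
    if instrumented:
        record(f"Supportability/DistributedTrace/PartialGranularity/{pg_type}/Span/Instrumented", instrumented)
    kept = getattr(txn, "kept", 0)
    if kept:
        record(f"Supportability/DistributedTrace/PartialGranularity/{pg_type}/Span/Kept", kept)
    dropped = getattr(txn, "dropped_ids", 0)
    if dropped:
        record("Supportability/Python/PartialGranularity/NrIds/Dropped", dropped)

record_metrics(Txn(), "compact", lambda name, count: print(name, count))
```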
4 changes: 3 additions & 1 deletion newrelic/core/transaction_node.py
@@ -634,7 +634,7 @@ def span_events(self, settings, attr_class=dict):
("priority", self.priority),
)
)
ct_exit_spans = {"instrumented": 0, "kept": 0}
ct_exit_spans = {"instrumented": 0, "kept": 0, "dropped_ids": 0}
yield from self.root.span_events(
settings,
base_attrs,
@@ -646,6 +646,8 @@
# If this transaction is partial granularity sampled, record the number of spans
# instrumented and the number of spans kept to monitor cost savings of partial
# granularity tracing.
# Also record the number of span ids dropped (fragmentation) in compact mode.
if self.partial_granularity_sampled:
self.instrumented = ct_exit_spans["instrumented"]
self.kept = ct_exit_spans["kept"]
self.dropped_ids = ct_exit_spans["dropped_ids"]