diff --git a/newrelic/core/application.py b/newrelic/core/application.py index c71eba909..389440f60 100644 --- a/newrelic/core/application.py +++ b/newrelic/core/application.py @@ -1368,12 +1368,6 @@ def harvest(self, shutdown=False, flexible=False): spans_sampled = spans.num_samples internal_count_metric("Supportability/SpanEvent/TotalEventsSeen", spans_seen) internal_count_metric("Supportability/SpanEvent/TotalEventsSent", spans_sampled) - if configuration.distributed_tracing.sampler.partial_granularity.enabled: - internal_count_metric( - f"Supportability/Python/PartialGranularity/{configuration.distributed_tracing.sampler.partial_granularity.type}", - 1, - ) - stats.reset_span_events() # Send error events diff --git a/newrelic/core/attribute.py b/newrelic/core/attribute.py index 49bc890a8..f396c660c 100644 --- a/newrelic/core/attribute.py +++ b/newrelic/core/attribute.py @@ -87,6 +87,9 @@ "message.routingKey", "messaging.destination.name", "messaging.system", + "nr.durations", + "nr.ids", + "nr.pg", "peer.address", "peer.hostname", "request.headers.accept", @@ -125,6 +128,8 @@ "span.kind", } +SPAN_ERROR_ATTRIBUTES = {"error.class", "error.message", "error.expected"} + MAX_NUM_USER_ATTRIBUTES = 128 MAX_ATTRIBUTE_LENGTH = 255 diff --git a/newrelic/core/node_mixin.py b/newrelic/core/node_mixin.py index 92f097582..f0aae1a22 100644 --- a/newrelic/core/node_mixin.py +++ b/newrelic/core/node_mixin.py @@ -79,71 +79,91 @@ def span_event( if not partial_granularity_sampled: # intrinsics, user attrs, agent attrs return [i_attrs, u_attrs, a_attrs] - else: - if ct_exit_spans is None: - ct_exit_spans = {} - - partial_granularity_type = settings.distributed_tracing.sampler.partial_granularity.type - exit_span_attrs_present = attribute.SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES & set(a_attrs) - # If this is the entry node or an LLM span always return it. 
- if i_attrs.get("nr.entryPoint") or i_attrs["name"].startswith("Llm/"): - if partial_granularity_type == "reduced": - return [i_attrs, u_attrs, a_attrs] - else: - return [i_attrs, {}, {}] - # If the span is not an exit span, skip it by returning None. - if not exit_span_attrs_present: - return None - # If the span is an exit span and we are in reduced mode (meaning no attribute dropping), - # just return the exit span as is. + + if ct_exit_spans is None: + ct_exit_spans = {"instrumented": 0, "kept": 0, "dropped_ids": 0} + ct_exit_spans["instrumented"] += 1 + + partial_granularity_type = settings.distributed_tracing.sampler.partial_granularity.type + a_attrs_set = set(a_attrs) + exit_span_attrs_present = attribute.SPAN_ENTITY_RELATIONSHIP_ATTRIBUTES & a_attrs_set + exit_span_error_attrs_present = attribute.SPAN_ERROR_ATTRIBUTES & a_attrs_set + # If this is an entry span, add `nr.pg` to indicate transaction is partial + # granularity sampled. + if i_attrs.get("nr.entryPoint"): + a_attrs["nr.pg"] = True + # If this is the entry node or an LLM span always return it. + if i_attrs.get("nr.entryPoint") or i_attrs["name"].startswith("Llm/"): + ct_exit_spans["kept"] += 1 if partial_granularity_type == "reduced": return [i_attrs, u_attrs, a_attrs] else: - a_minimized_attrs = attr_class({key: a_attrs[key] for key in exit_span_attrs_present}) - # If we are in essential mode return the span with minimized attributes. - if partial_granularity_type == "essential": - return [i_attrs, {}, a_minimized_attrs] - # If the span is an exit span but span compression (compact) is enabled, - # we need to check for uniqueness before returning it. - # Combine all the entity relationship attr values into a string to be - # used as the hash to check for uniqueness. - span_attrs = "".join([str(a_minimized_attrs[key]) for key in exit_span_attrs_present]) - new_exit_span = span_attrs not in ct_exit_spans - # If this is a new exit span, add it to the known ct_exit_spans and - # return it. 
- if new_exit_span: - # nr.ids is the list of span guids that share this unqiue exit span. - a_minimized_attrs["nr.ids"] = [] - a_minimized_attrs["nr.durations"] = self.duration - ct_exit_spans[span_attrs] = [i_attrs, a_minimized_attrs] - return [i_attrs, {}, a_minimized_attrs] - # If this is an exit span we've already seen, add it's guid to the list - # of ids on the seen span, compute the new duration & start time, and - # return None. + return [i_attrs, {}, {key: a_attrs.get(key) for key in exit_span_error_attrs_present | {"nr.pg"}}] + # If the span is not an exit span, skip it by returning None. + if not exit_span_attrs_present: + return None + # If the span is an exit span and we are in reduced mode (meaning no attribute dropping), + # just return the exit span as is. + if partial_granularity_type == "reduced": + ct_exit_spans["kept"] += 1 + return [i_attrs, u_attrs, a_attrs] + else: + a_minimized_attrs = attr_class( + {key: a_attrs[key] for key in (exit_span_attrs_present | exit_span_error_attrs_present)} + ) + # If we are in essential mode return the span with minimized attributes. + if partial_granularity_type == "essential": + ct_exit_spans["kept"] += 1 + return [i_attrs, {}, a_minimized_attrs] + # If the span is an exit span but span compression (compact) is enabled, + # we need to check for uniqueness before returning it. + # Combine all the entity relationship attr values into a string to be + # used as the hash to check for uniqueness. + span_attrs = "".join([str(a_minimized_attrs[key]) for key in exit_span_attrs_present]) + new_exit_span = span_attrs not in ct_exit_spans + # If this is a new exit span, add it to the known ct_exit_spans and + # return it. + if new_exit_span: + # nr.ids is the list of span guids that share this unqiue exit span. 
+                a_minimized_attrs["nr.ids"] = []
+                a_minimized_attrs["nr.durations"] = self.duration
+                ct_exit_spans[span_attrs] = [i_attrs, a_minimized_attrs]
+                ct_exit_spans["kept"] += 1
+                return [i_attrs, {}, a_minimized_attrs]
+            # If this is an exit span we've already seen, add the error attributes
+            # (last occurring error takes precedence), add its guid to the list
+            # of ids on the seen span, compute the new duration & start time, and
+            # return None.
+            ct_exit_spans[span_attrs][1].update(
+                attr_class({key: a_minimized_attrs[key] for key in exit_span_error_attrs_present})
+            )
+            # Max size for `nr.ids` = 1024. Max length = 63 (each span id is 16 bytes + 8 bytes for list type).
+            if len(ct_exit_spans[span_attrs][1]["nr.ids"]) < 63:
                 ct_exit_spans[span_attrs][1]["nr.ids"].append(self.guid)
-                # Max size for `nr.ids` = 1024. Max length = 63 (each span id is 16 bytes + 8 bytes for list type).
-                ct_exit_spans[span_attrs][1]["nr.ids"] = ct_exit_spans[span_attrs][1]["nr.ids"][:63]
-                # Compute the new start and end time for all compressed spans and use
-                # that to set the duration for all compressed spans.
-                current_start_time = ct_exit_spans[span_attrs][0]["timestamp"]
-                current_end_time = (
-                    ct_exit_spans[span_attrs][0]["timestamp"] / 1000 + ct_exit_spans[span_attrs][1]["nr.durations"]
-                )
-                new_start_time = i_attrs["timestamp"]
-                new_end_time = i_attrs["timestamp"] / 1000 + i_attrs["duration"]
-                set_start_time = min(new_start_time, current_start_time)
-                # If the new span starts after the old span's end time or the new span
-                # ends before the current span starts; add the durations.
-                if current_end_time < new_start_time / 1000 or new_end_time < current_start_time / 1000:
-                    set_duration = ct_exit_spans[span_attrs][1]["nr.durations"] + i_attrs["duration"]
-                # Otherwise, if the new and old span's overlap in time, use the newest
-                # end time and subtract the start time from it to calculate the new
-                # duration.
- else: - set_duration = max(current_end_time, new_end_time) - set_start_time / 1000 - ct_exit_spans[span_attrs][0]["timestamp"] = set_start_time - ct_exit_spans[span_attrs][1]["nr.durations"] = set_duration - return None + else: + ct_exit_spans["dropped_ids"] += 1 + + ct_exit_spans[span_attrs][1]["nr.ids"] = ct_exit_spans[span_attrs][1]["nr.ids"][:63] + # Compute the new start and end time for all compressed spans and use + # that to set the duration for all compressed spans. + current_start_time = ct_exit_spans[span_attrs][0]["timestamp"] + current_end_time = ( + ct_exit_spans[span_attrs][0]["timestamp"] / 1000 + ct_exit_spans[span_attrs][1]["nr.durations"] + ) + new_start_time = i_attrs["timestamp"] + new_end_time = i_attrs["timestamp"] / 1000 + i_attrs["duration"] + set_start_time = min(new_start_time, current_start_time) + # If the new span starts after the old span's end time or the new span + # ends before the current span starts; add the durations. + if current_end_time < new_start_time / 1000 or new_end_time < current_start_time / 1000: + set_duration = ct_exit_spans[span_attrs][1]["nr.durations"] + i_attrs["duration"] + # Otherwise, if the new and old span's overlap in time, use the newest + # end time and subtract the start time from it to calculate the new + # duration. + else: + set_duration = max(current_end_time, new_end_time) - set_start_time / 1000 + ct_exit_spans[span_attrs][0]["timestamp"] = set_start_time + ct_exit_spans[span_attrs][1]["nr.durations"] = set_duration def span_events( self, @@ -162,13 +182,11 @@ def span_events( partial_granularity_sampled=partial_granularity_sampled, ct_exit_spans=ct_exit_spans, ) - ct_exit_spans["instrumented"] += 1 parent_id = parent_guid if span: # span will be None if the span is an inprocess span or repeated exit span. - ct_exit_spans["kept"] += 1 yield span # Compressed spans are always reparented onto the entry span. 
- if not settings.distributed_tracing.sampler.partial_granularity.type == "compact" or span[0].get( + if settings.distributed_tracing.sampler.partial_granularity.type != "compact" or span[0].get( "nr.entryPoint" ): parent_id = self.guid @@ -181,9 +199,7 @@ def span_events( partial_granularity_sampled=partial_granularity_sampled, ct_exit_spans=ct_exit_spans, ): - ct_exit_spans["instrumented"] += 1 if event: # event will be None if the span is an inprocess span or repeated exit span. - ct_exit_spans["kept"] += 1 yield event diff --git a/newrelic/core/stats_engine.py b/newrelic/core/stats_engine.py index 507139dfd..fc440971d 100644 --- a/newrelic/core/stats_engine.py +++ b/newrelic/core/stats_engine.py @@ -1192,18 +1192,26 @@ def record_transaction(self, transaction): self._span_events.add(event, priority=transaction.priority) if transaction.partial_granularity_sampled: partial_gran_type = settings.distributed_tracing.sampler.partial_granularity.type - self.record_custom_metrics( - [ - ( - f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Instrumented", - {"count": transaction.instrumented}, - ), - ( - f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Kept", - {"count": transaction.kept}, - ), - ] + self.record_custom_metric( + f"Supportability/Python/PartialGranularity/{partial_gran_type}", {"count": 1} ) + instrumented = getattr(transaction, "instrumented", 0) + if instrumented: + self.record_custom_metric( + f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Instrumented", + {"count": instrumented}, + ) + kept = getattr(transaction, "kept", 0) + if instrumented: + self.record_custom_metric( + f"Supportability/DistributedTrace/PartialGranularity/{partial_gran_type}/Span/Kept", + {"count": kept}, + ) + dropped_ids = getattr(transaction, "dropped_ids", 0) + if dropped_ids: + self.record_custom_metric( + "Supportability/Python/PartialGranularity/NrIds/Dropped", {"count": dropped_ids} + ) 
# Merge in log events diff --git a/newrelic/core/transaction_node.py b/newrelic/core/transaction_node.py index eaa3b5f34..f5435c0fc 100644 --- a/newrelic/core/transaction_node.py +++ b/newrelic/core/transaction_node.py @@ -634,7 +634,7 @@ def span_events(self, settings, attr_class=dict): ("priority", self.priority), ) ) - ct_exit_spans = {"instrumented": 0, "kept": 0} + ct_exit_spans = {"instrumented": 0, "kept": 0, "dropped_ids": 0} yield from self.root.span_events( settings, base_attrs, @@ -646,6 +646,8 @@ def span_events(self, settings, attr_class=dict): # If this transaction is partial granularity sampled, record the number of spans # instrumented and the number of spans kept to monitor cost savings of partial # granularity tracing. + # Also record the number of span ids dropped (fragmentation) in compact mode. if self.partial_granularity_sampled: self.instrumented = ct_exit_spans["instrumented"] self.kept = ct_exit_spans["kept"] + self.dropped_ids = ct_exit_spans["dropped_ids"] diff --git a/tests/agent_features/test_distributed_tracing.py b/tests/agent_features/test_distributed_tracing.py index 46f441cfb..6e616ba68 100644 --- a/tests/agent_features/test_distributed_tracing.py +++ b/tests/agent_features/test_distributed_tracing.py @@ -30,6 +30,7 @@ from newrelic.api.application import application_instance from newrelic.api.function_trace import function_trace +from newrelic.common.object_names import callable_name from newrelic.common.object_wrapper import function_wrapper, transient_function_wrapper try: @@ -947,6 +948,7 @@ async def call_tests(): exact_intrinsics={ "name": "Function/test_distributed_tracing:test_partial_granularity_max_compressed_spans.._test" }, + exact_agents={"nr.pg": True}, expected_intrinsics=["duration", "timestamp"], ) @validate_span_events( @@ -1001,6 +1003,7 @@ async def call_tests(): exact_intrinsics={ "name": "Function/test_distributed_tracing:test_partial_granularity_compressed_span_attributes_in_series.._test" }, + 
exact_agents={"nr.pg": True}, expected_intrinsics=["duration", "timestamp"], ) @validate_span_events( @@ -1045,6 +1048,7 @@ def test_partial_granularity_compressed_span_attributes_overlapping(): exact_intrinsics={ "name": "Function/test_distributed_tracing:test_partial_granularity_compressed_span_attributes_overlapping.._test" }, + exact_agents={"nr.pg": True}, expected_intrinsics=["duration", "timestamp"], ) @validate_span_events( @@ -1099,6 +1103,7 @@ def foo(): exact_intrinsics={ "name": "Function/test_distributed_tracing:test_partial_granularity_reduced_span_attributes.._test" }, + exact_agents={"nr.pg": True}, expected_intrinsics=["duration", "timestamp"], expected_agents=["code.function", "code.lineno", "code.namespace"], ) @@ -1153,6 +1158,7 @@ def foo(): exact_intrinsics={ "name": "Function/test_distributed_tracing:test_partial_granularity_essential_span_attributes.._test" }, + exact_agents={"nr.pg": True}, expected_intrinsics=["duration", "timestamp"], unexpected_agents=["code.function", "code.lineno", "code.namespace"], ) @@ -1460,3 +1466,130 @@ def _test(): accept_distributed_trace_headers(headers) _test() + + +def test_partial_granularity_errors_on_compressed_spans(): + @function_trace() + def call_tests(): + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + time.sleep(0.1) + transaction = current_transaction() + try: + raise Exception("Exception 1") + except: + transaction.notice_error() + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + time.sleep(0.1) + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + time.sleep(0.1) + transaction = current_transaction() + try: + raise Exception("Exception 2") + except: + transaction.notice_error(expected=True) + + @validate_span_events( + count=1, # Entry span. 
+ exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_errors_on_compressed_spans.._test" + }, + exact_agents={"nr.pg": True}, + expected_intrinsics=["duration", "timestamp"], + ) + @validate_span_events( + count=1, # 1 external compressed span. + exact_intrinsics={"name": "External/localhost:3000/requests/GET"}, + exact_agents={ + "http.url": "http://localhost:3000/", + "error.class": callable_name(Exception), + "error.message": "Exception 2", + "error.expected": True, + }, + expected_agents=["nr.durations", "nr.ids"], + ) + @validate_compact_span_event( + name="External/localhost:3000/requests/GET", + compressed_span_count=3, + expected_nr_durations_low_bound=0.3, + expected_nr_durations_high_bound=0.4, + ) + @background_task() + def _test(): + headers = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"} + accept_distributed_trace_headers(headers) + call_tests() + + _test = override_application_settings( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": "compact", + "distributed_tracing.sampler.partial_granularity._remote_parent_sampled": "always_on", + "span_events.enabled": True, + } + )(_test) + + _test() + + +def test_partial_granularity_errors_on_compressed_spans_status_overriden(): + @function_trace() + def call_tests(): + transaction = current_transaction() + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + time.sleep(0.1) + try: + raise Exception("Exception 1") + except: + transaction.notice_error(expected=True) + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + time.sleep(0.1) + with ExternalTrace("requests", "http://localhost:3000/", method="GET") as trace: + time.sleep(0.1) + try: + raise Exception("Exception 2") + except: + transaction.notice_error() + + @validate_span_events( + 
count=1, # Entry span. + exact_intrinsics={ + "name": "Function/test_distributed_tracing:test_partial_granularity_errors_on_compressed_spans_status_overriden.._test" + }, + exact_agents={"nr.pg": True}, + expected_intrinsics=["duration", "timestamp"], + ) + @validate_span_events( + count=1, # 1 external compressed span. + exact_intrinsics={"name": "External/localhost:3000/requests/GET"}, + exact_agents={ + "http.url": "http://localhost:3000/", + "error.class": callable_name(Exception), + "error.message": "Exception 2", + "error.expected": False, + }, + expected_agents=["nr.durations", "nr.ids"], + ) + @validate_compact_span_event( + name="External/localhost:3000/requests/GET", + compressed_span_count=3, + expected_nr_durations_low_bound=0.3, + expected_nr_durations_high_bound=0.4, + ) + @background_task() + def _test(): + headers = {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-00f067aa0ba902b7-01"} + accept_distributed_trace_headers(headers) + call_tests() + + _test = override_application_settings( + { + "distributed_tracing.sampler.full_granularity.enabled": False, + "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": "compact", + "distributed_tracing.sampler.partial_granularity._remote_parent_sampled": "always_on", + "span_events.enabled": True, + } + )(_test) + + _test() diff --git a/tests/agent_unittests/test_harvest_loop.py b/tests/agent_unittests/test_harvest_loop.py index a52e4dbe9..675c171e2 100644 --- a/tests/agent_unittests/test_harvest_loop.py +++ b/tests/agent_unittests/test_harvest_loop.py @@ -27,6 +27,7 @@ from newrelic.core.config import finalize_application_settings, global_settings from newrelic.core.custom_event import create_custom_event from newrelic.core.error_node import ErrorNode +from newrelic.core.external_node import ExternalNode from newrelic.core.function_node import FunctionNode from newrelic.core.log_event_node import LogEventNode from newrelic.core.root_node 
import RootNode @@ -88,7 +89,24 @@ def _transaction_node(partial_granularity=False): user_attributes={}, ) - children = tuple(function for _ in range(num_events)) + children = [function for _ in range(num_events)] + + function = ExternalNode( + library="requests", + url="http:localhost:3000", + method="GET", + children=(), + start_time=0, + end_time=1, + duration=1, + exclusive=1, + params={}, + guid="GUID", + agent_attributes={}, + user_attributes={}, + ) + + children.extend([function for _ in range(num_events)]) root = RootNode( name="Function/main", @@ -362,8 +380,6 @@ def test_application_harvest_with_spans( spans_required_metrics.extend( [("Supportability/SpanEvent/TotalEventsSeen", seen), ("Supportability/SpanEvent/TotalEventsSent", sent)] ) - if partial_granularity_enabled: - spans_required_metrics.extend([("Supportability/Python/PartialGranularity/essential", 1)]) @validate_metric_payload(metrics=spans_required_metrics, endpoints_called=span_endpoints_called) @override_generic_settings( @@ -511,6 +527,7 @@ def test_transaction_count(transaction_node): "application_logging.forwarding.enabled": False, "distributed_tracing.sampler.full_granularity.enabled": False, "distributed_tracing.sampler.partial_granularity.enabled": True, + "distributed_tracing.sampler.partial_granularity.type": "compact", }, ) def test_partial_granularity_metrics(transaction_node): @@ -523,10 +540,14 @@ def test_partial_granularity_metrics(transaction_node): # Harvest has not run yet assert app._transaction_count == 1 - instrumented = "Supportability/DistributedTrace/PartialGranularity/essential/Span/Instrumented" - kept = "Supportability/DistributedTrace/PartialGranularity/essential/Span/Kept" - assert app._stats_engine.stats_table[(instrumented, "")][0] == 102 - assert app._stats_engine.stats_table[(kept, "")][0] == 1 + instrumented = "Supportability/DistributedTrace/PartialGranularity/compact/Span/Instrumented" + kept = 
"Supportability/DistributedTrace/PartialGranularity/compact/Span/Kept" + pg = "Supportability/Python/PartialGranularity/compact" + dropped_ids = "Supportability/Python/PartialGranularity/NrIds/Dropped" + assert app._stats_engine.stats_table[(instrumented, "")][0] == 203 + assert app._stats_engine.stats_table[(kept, "")][0] == 2 + assert app._stats_engine.stats_table[(pg, "")][0] == 1 + assert app._stats_engine.stats_table[(dropped_ids, "")][0] == 37 app.harvest()