-
Notifications
You must be signed in to change notification settings - Fork 132
Hybrid agent context management #1589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8f2b281
63d7803
15712ed
61638e2
66e567f
e6116c6
6f9e55c
0356205
2a05eef
71dad25
3649a90
b846930
a41d413
c29cf43
31c4106
fbb6d84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,10 @@ | |
| from contextlib import contextmanager | ||
|
|
||
| from opentelemetry import trace as otel_api_trace | ||
| from opentelemetry.baggage.propagation import W3CBaggagePropagator | ||
| from opentelemetry.propagate import set_global_textmap | ||
| from opentelemetry.propagators.composite import CompositePropagator | ||
| from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator | ||
|
|
||
| from newrelic.api.application import application_instance | ||
| from newrelic.api.background_task import BackgroundTask | ||
|
|
@@ -33,21 +37,68 @@ | |
| _logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class NRTraceContextPropagator(TraceContextTextMapPropagator): | ||
| HEADER_KEYS = ("traceparent", "tracestate", "newrelic") | ||
|
|
||
| def extract(self, carrier, context=None, getter=None): | ||
| transaction = current_transaction() | ||
| # If we are passing into New Relic, traceparent | ||
| # and/or tracestate's keys also need to be NR compatible. | ||
|
|
||
| if transaction: | ||
| nr_headers = { | ||
| header_key: getter.get(carrier, header_key)[0] | ||
| for header_key in self.HEADER_KEYS | ||
| if getter.get(carrier, header_key) | ||
| } | ||
| transaction.accept_distributed_trace_headers(nr_headers) | ||
| return super().extract(carrier=carrier, context=context, getter=getter) | ||
|
|
||
| def inject(self, carrier, context=None, setter=None): | ||
| transaction = current_transaction() | ||
| # Only insert headers if we have not done so already this transaction | ||
| # New Relic's Distributed Trace State will have the following states: | ||
| # 0 (00) if not set: | ||
| # Transaction has not inserted any outbound headers nor has | ||
| # it accepted any inbound headers (yet). | ||
| # 1 (01) if already accepted: | ||
| # Transaction has accepted inbound headers and is able to | ||
| # insert outbound headers to the next app if needed. | ||
| # 2 (10) if inserted but not accepted: | ||
| # Transaction has inserted outbound headers already. | ||
| # Do not insert outbound headers multiple times. This is | ||
| # a fundamental difference in OTel vs NR behavior: if | ||
| # headers are inserted by OTel multiple times, it will | ||
| # propagate the last set of data that was inserted. NR | ||
| # will not allow more than one header insertion per | ||
| # transaction. | ||
| # 3 (11) if accepted, then inserted: | ||
| # Transaction has accepted inbound headers and has inserted | ||
| # outbound headers. | ||
|
|
||
| if not transaction: | ||
| return super().inject(carrier=carrier, context=context, setter=setter) | ||
|
|
||
| if transaction._distributed_trace_state < 2: | ||
| nr_headers = [] | ||
| transaction.insert_distributed_trace_headers(nr_headers) | ||
| for key, value in nr_headers: | ||
| setter.set(carrier, key, value) | ||
| # Do NOT call super().inject() since we have already | ||
| # inserted the headers here. It will not cause harm, | ||
| # but it is redundant logic. | ||
|
|
||
| # If distributed_trace_state == 2 or 3, do not inject headers. | ||
|
|
||
|
|
||
| # Context and Context Propagator Setup | ||
| otel_context_propagator = CompositePropagator(propagators=[NRTraceContextPropagator(), W3CBaggagePropagator()]) | ||
| set_global_textmap(otel_context_propagator) | ||
|
|
||
| # ---------------------------------------------- | ||
| # Custom OTel Spans and Traces | ||
| # ---------------------------------------------- | ||
|
|
||
| # TracerProvider: we can think of this as the agent instance. Only one can exist | ||
| # SpanProcessor: we can think of this as an application. In NR, we can have multiple applications | ||
| # though right now, we can only do SpanProcessor and SynchronousMultiSpanProcessor | ||
| # Tracer: we can think of this as the transaction. | ||
| # Span: we can think of this as the trace. | ||
| # Links functionality has now been enabled but not implemented yet. Links are relationships | ||
| # between spans, but lateral in hierarchy. In NR we only have parent-child relationships. | ||
| # We may want to preserve this information with a custom attribute. We can also add this | ||
| # as a new attribute in a trace, but it will still not be seen in the UI other than a trace | ||
| # attribute. | ||
|
|
||
|
|
||
| class Span(otel_api_trace.Span): | ||
| def __init__( | ||
|
|
@@ -73,11 +124,6 @@ def __init__( | |
| self.nr_trace = None | ||
| self.instrumenting_module = instrumenting_module | ||
|
|
||
| # Do not create a New Relic trace if parent | ||
| # is a remote span and it is not sampled | ||
| if self._remote() and not self._sampled(): | ||
| return | ||
|
|
||
| self.nr_parent = None | ||
| current_nr_trace = current_trace() | ||
| if ( | ||
|
|
@@ -100,7 +146,7 @@ def __init__( | |
| _logger.error( | ||
| "OpenTelemetry span (%s) and NR trace (%s) do not match nor correspond to a remote span. Open Telemetry span will not be reported to New Relic. Please report this problem to New Relic.", | ||
| self.otel_parent, | ||
| current_nr_trace, | ||
| current_nr_trace, # NR parent trace | ||
| ) | ||
| return | ||
|
|
||
|
|
@@ -140,26 +186,6 @@ def __init__( | |
|
|
||
| self.nr_trace.__enter__() | ||
|
|
||
| def _sampled(self): | ||
| # Uses NR to determine if the trace is sampled | ||
| # | ||
| # transaction.sampled can be `None`, `True`, `False`. | ||
| # If `None`, this has not been computed by NR which | ||
| # can also mean the following: | ||
| # 1. There was not a context passed in that explicitly has sampling disabled. | ||
| # This flag would be found in the traceparent or traceparent and tracespan headers. | ||
| # 2. Transaction was not created where DT headers are accepted during __init__ | ||
| # Therefore, we will treat a value of `None` as `True` for now. | ||
| # | ||
| # The primary reason for this behavior is because Otel expects to | ||
| # only be able to record information like events and attributes | ||
| # when `is_recording()` == `True` | ||
|
|
||
| if self.otel_parent: | ||
| return bool(self.otel_parent.trace_flags) | ||
| else: | ||
| return bool(self.nr_transaction and (self.nr_transaction.sampled or (self.nr_transaction.sampled is None))) | ||
|
|
||
| def _remote(self): | ||
| # Remote span denotes if propagated from a remote parent | ||
| return bool(self.otel_parent and self.otel_parent.is_remote) | ||
|
|
@@ -168,14 +194,12 @@ def get_span_context(self): | |
| if not getattr(self, "nr_trace", False): | ||
| return otel_api_trace.INVALID_SPAN_CONTEXT | ||
|
|
||
| otel_tracestate_headers = None | ||
|
|
||
| return otel_api_trace.SpanContext( | ||
| trace_id=int(self.nr_transaction.trace_id, 16), | ||
| span_id=int(self.nr_trace.guid, 16), | ||
| is_remote=self._remote(), | ||
| trace_flags=otel_api_trace.TraceFlags(0x01 if self._sampled() else 0x00), | ||
| trace_state=otel_api_trace.TraceState(otel_tracestate_headers), | ||
| trace_flags=otel_api_trace.TraceFlags(0x01), | ||
| trace_state=otel_api_trace.TraceState(), | ||
| ) | ||
|
|
||
| def set_attribute(self, key, value): | ||
|
|
@@ -193,8 +217,7 @@ def _set_attributes_in_nr(self, otel_attributes=None): | |
|
|
||
| def add_event(self, name, attributes=None, timestamp=None): | ||
| # TODO: Not implemented yet. | ||
| # We can implement this as a log event | ||
| raise NotImplementedError("TODO: We can implement this as a log event.") | ||
| raise NotImplementedError("Events are not implemented yet.") | ||
|
|
||
| def add_link(self, context=None, attributes=None): | ||
| # TODO: Not implemented yet. | ||
|
|
@@ -208,7 +231,14 @@ def update_name(self, name): | |
| self.nr_trace.name = self._name | ||
|
|
||
| def is_recording(self): | ||
| return self._sampled() and not (getattr(self.nr_trace, None), "end_time", None) | ||
| # If the trace has an end time set then it is done recording. Otherwise, | ||
| # if it does not have an end time set and the transaction's priority | ||
| # has not been set yet or it is set to something other than 0 then it | ||
| # is also still recording. | ||
| if getattr(self.nr_trace, "end_time", None): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a case where a span has been created but recording hasn't started yet that we need to handle here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes--In most of the OTel library instrumentations,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify, it seems pretty common for it to call
lrafeei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return False | ||
|
|
||
| return getattr(self.nr_transaction, "priority", 1) > 0 | ||
|
|
||
| def set_status(self, status, description=None): | ||
| # TODO: not implemented yet | ||
|
|
@@ -277,6 +307,10 @@ def start_span( | |
| self.nr_application = application_instance() | ||
| self.attributes = attributes or {} | ||
|
|
||
| if not self.nr_application.active: | ||
| # Force application registration if not already active | ||
| self.nr_application.activate() | ||
|
|
||
| if not self.nr_application.settings.otel_bridge.enabled: | ||
| return otel_api_trace.INVALID_SPAN | ||
|
|
||
|
|
@@ -286,7 +320,12 @@ def start_span( | |
| if parent_span_context is None or not parent_span_context.is_valid: | ||
| parent_span_context = None | ||
|
|
||
| parent_span_trace_id = None | ||
| if parent_span_context and self.nr_application.settings.distributed_tracing.enabled: | ||
lrafeei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| parent_span_trace_id = parent_span_context.trace_id | ||
|
|
||
| # If remote_parent, transaction must be created, regardless of kind type | ||
| # Make sure we transfer DT headers when we are here, if DT is enabled | ||
| if parent_span_context and parent_span_context.is_remote: | ||
| if kind in (otel_api_trace.SpanKind.SERVER, otel_api_trace.SpanKind.CLIENT): | ||
| # This is a web request | ||
|
|
@@ -296,6 +335,7 @@ def start_span( | |
| port = self.attributes.get("net.host.port") | ||
| request_method = self.attributes.get("http.method") | ||
| request_path = self.attributes.get("http.route") | ||
|
|
||
| transaction = WebTransaction( | ||
| self.nr_application, | ||
| name=name, | ||
|
|
@@ -306,6 +346,7 @@ def start_span( | |
| request_path=request_path, | ||
| headers=headers, | ||
| ) | ||
|
|
||
| elif kind in (otel_api_trace.SpanKind.PRODUCER, otel_api_trace.SpanKind.INTERNAL): | ||
| transaction = BackgroundTask(self.nr_application, name=name) | ||
| elif kind == otel_api_trace.SpanKind.CONSUMER: | ||
|
|
@@ -315,7 +356,7 @@ def start_span( | |
| destination_name=name, | ||
| application=self.nr_application, | ||
| transport_type=self.instrumentation_library, | ||
| headers=headers, | ||
| headers=None, | ||
| ) | ||
|
|
||
| transaction.__enter__() | ||
|
|
@@ -348,6 +389,11 @@ def start_span( | |
| request_path=request_path, | ||
| headers=headers, | ||
| ) | ||
|
|
||
| transaction._trace_id = ( | ||
| f"{parent_span_trace_id:x}" if parent_span_trace_id else transaction.trace_id | ||
| ) | ||
|
|
||
| transaction.__enter__() | ||
| elif kind == otel_api_trace.SpanKind.INTERNAL: | ||
| if transaction: | ||
|
|
@@ -372,7 +418,7 @@ def start_span( | |
| destination_name=name, | ||
| application=self.nr_application, | ||
| transport_type=self.instrumentation_library, | ||
| headers=headers, | ||
| headers=None, | ||
| ) | ||
| transaction.__enter__() | ||
| elif kind == otel_api_trace.SpanKind.PRODUCER: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.