Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 92 additions & 46 deletions newrelic/api/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from contextlib import contextmanager

from opentelemetry import trace as otel_api_trace
from opentelemetry.baggage.propagation import W3CBaggagePropagator
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from newrelic.api.application import application_instance
from newrelic.api.background_task import BackgroundTask
Expand All @@ -33,21 +37,68 @@
_logger = logging.getLogger(__name__)


class NRTraceContextPropagator(TraceContextTextMapPropagator):
HEADER_KEYS = ("traceparent", "tracestate", "newrelic")

def extract(self, carrier, context=None, getter=None):
transaction = current_transaction()
# If we are passing into New Relic, traceparent
# and/or tracestate's keys also need to be NR compatible.

if transaction:
nr_headers = {
header_key: getter.get(carrier, header_key)[0]
for header_key in self.HEADER_KEYS
if getter.get(carrier, header_key)
}
transaction.accept_distributed_trace_headers(nr_headers)
return super().extract(carrier=carrier, context=context, getter=getter)

def inject(self, carrier, context=None, setter=None):
transaction = current_transaction()
# Only insert headers if we have not done so already this transaction
# New Relic's Distributed Trace State will have the following states:
# 0 (00) if not set:
# Transaction has not inserted any outbound headers nor has
# it accepted any inbound headers (yet).
# 1 (01) if already accepted:
# Transaction has accepted inbound headers and is able to
# insert outbound headers to the next app if needed.
# 2 (10) if inserted but not accepted:
# Transaction has inserted outbound headers already.
# Do not insert outbound headers multiple times. This is
# a fundamental difference in OTel vs NR behavior: if
# headers are inserted by OTel multiple times, it will
# propagate the last set of data that was inserted. NR
# will not allow more than one header insertion per
# transaction.
# 3 (11) if accepted, then inserted:
# Transaction has accepted inbound headers and has inserted
# outbound headers.

if not transaction:
return super().inject(carrier=carrier, context=context, setter=setter)

if transaction._distributed_trace_state < 2:
nr_headers = []
transaction.insert_distributed_trace_headers(nr_headers)
for key, value in nr_headers:
setter.set(carrier, key, value)
# Do NOT call super().inject() since we have already
# inserted the headers here. It will not cause harm,
# but it is redundant logic.

# If distributed_trace_state == 2 or 3, do not inject headers.


# Context and Context Propagator Setup
otel_context_propagator = CompositePropagator(propagators=[NRTraceContextPropagator(), W3CBaggagePropagator()])
set_global_textmap(otel_context_propagator)

# ----------------------------------------------
# Custom OTel Spans and Traces
# ----------------------------------------------

# TracerProvider: we can think of this as the agent instance. Only one can exist
# SpanProcessor: we can think of this as an application. In NR, we can have multiple applications
# though right now, we can only do SpanProcessor and SynchronousMultiSpanProcessor
# Tracer: we can think of this as the transaction.
# Span: we can think of this as the trace.
# Links functionality has now been enabled but not implemented yet. Links are relationships
# between spans, but lateral in hierarchy. In NR we only have parent-child relationships.
# We may want to preserve this information with a custom attribute. We can also add this
# as a new attribute in a trace, but it will still not be seen in the UI other than a trace
# attribute.


class Span(otel_api_trace.Span):
def __init__(
Expand All @@ -73,11 +124,6 @@ def __init__(
self.nr_trace = None
self.instrumenting_module = instrumenting_module

# Do not create a New Relic trace if parent
# is a remote span and it is not sampled
if self._remote() and not self._sampled():
return

self.nr_parent = None
current_nr_trace = current_trace()
if (
Expand All @@ -100,7 +146,7 @@ def __init__(
_logger.error(
"OpenTelemetry span (%s) and NR trace (%s) do not match nor correspond to a remote span. Open Telemetry span will not be reported to New Relic. Please report this problem to New Relic.",
self.otel_parent,
current_nr_trace,
current_nr_trace, # NR parent trace
)
return

Expand Down Expand Up @@ -140,26 +186,6 @@ def __init__(

self.nr_trace.__enter__()

def _sampled(self):
# Uses NR to determine if the trace is sampled
#
# transaction.sampled can be `None`, `True`, `False`.
# If `None`, this has not been computed by NR which
# can also mean the following:
# 1. There was not a context passed in that explicitly has sampling disabled.
# This flag would be found in the traceparent or traceparent and tracespan headers.
# 2. Transaction was not created where DT headers are accepted during __init__
# Therefore, we will treat a value of `None` as `True` for now.
#
# The primary reason for this behavior is because Otel expects to
# only be able to record information like events and attributes
# when `is_recording()` == `True`

if self.otel_parent:
return bool(self.otel_parent.trace_flags)
else:
return bool(self.nr_transaction and (self.nr_transaction.sampled or (self.nr_transaction.sampled is None)))

def _remote(self):
# Remote span denotes if propagated from a remote parent
return bool(self.otel_parent and self.otel_parent.is_remote)
Expand All @@ -168,14 +194,12 @@ def get_span_context(self):
if not getattr(self, "nr_trace", False):
return otel_api_trace.INVALID_SPAN_CONTEXT

otel_tracestate_headers = None

return otel_api_trace.SpanContext(
trace_id=int(self.nr_transaction.trace_id, 16),
span_id=int(self.nr_trace.guid, 16),
is_remote=self._remote(),
trace_flags=otel_api_trace.TraceFlags(0x01 if self._sampled() else 0x00),
trace_state=otel_api_trace.TraceState(otel_tracestate_headers),
trace_flags=otel_api_trace.TraceFlags(0x01),
trace_state=otel_api_trace.TraceState(),
)

def set_attribute(self, key, value):
Expand All @@ -193,8 +217,7 @@ def _set_attributes_in_nr(self, otel_attributes=None):

def add_event(self, name, attributes=None, timestamp=None):
# TODO: Not implemented yet.
# We can implement this as a log event
raise NotImplementedError("TODO: We can implement this as a log event.")
raise NotImplementedError("Events are not implemented yet.")

def add_link(self, context=None, attributes=None):
# TODO: Not implemented yet.
Expand All @@ -208,7 +231,14 @@ def update_name(self, name):
self.nr_trace.name = self._name

def is_recording(self):
return self._sampled() and not (getattr(self.nr_trace, None), "end_time", None)
# If the trace has an end time set then it is done recording. Otherwise,
# if it does not have an end time set and the transaction's priority
# has not been set yet or it is set to something other than 0 then it
# is also still recording.
if getattr(self.nr_trace, "end_time", None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where a span has been created but recording hasn't started yet that we need to handle here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes--In most of the OTel library instrumentations, span.is_recording() is called almost immediately after creation. This is an example of that happening.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, it seems pretty common for it to call is_recording() immediately after the creation of the span and right before the end of the span in the framework instrumentation.

return False

return getattr(self.nr_transaction, "priority", 1) > 0

def set_status(self, status, description=None):
# TODO: not implemented yet
Expand Down Expand Up @@ -277,6 +307,10 @@ def start_span(
self.nr_application = application_instance()
self.attributes = attributes or {}

if not self.nr_application.active:
# Force application registration if not already active
self.nr_application.activate()

if not self.nr_application.settings.otel_bridge.enabled:
return otel_api_trace.INVALID_SPAN

Expand All @@ -286,7 +320,12 @@ def start_span(
if parent_span_context is None or not parent_span_context.is_valid:
parent_span_context = None

parent_span_trace_id = None
if parent_span_context and self.nr_application.settings.distributed_tracing.enabled:
parent_span_trace_id = parent_span_context.trace_id

# If remote_parent, transaction must be created, regardless of kind type
# Make sure we transfer DT headers when we are here, if DT is enabled
if parent_span_context and parent_span_context.is_remote:
if kind in (otel_api_trace.SpanKind.SERVER, otel_api_trace.SpanKind.CLIENT):
# This is a web request
Expand All @@ -296,6 +335,7 @@ def start_span(
port = self.attributes.get("net.host.port")
request_method = self.attributes.get("http.method")
request_path = self.attributes.get("http.route")

transaction = WebTransaction(
self.nr_application,
name=name,
Expand All @@ -306,6 +346,7 @@ def start_span(
request_path=request_path,
headers=headers,
)

elif kind in (otel_api_trace.SpanKind.PRODUCER, otel_api_trace.SpanKind.INTERNAL):
transaction = BackgroundTask(self.nr_application, name=name)
elif kind == otel_api_trace.SpanKind.CONSUMER:
Expand All @@ -315,7 +356,7 @@ def start_span(
destination_name=name,
application=self.nr_application,
transport_type=self.instrumentation_library,
headers=headers,
headers=None,
)

transaction.__enter__()
Expand Down Expand Up @@ -348,6 +389,11 @@ def start_span(
request_path=request_path,
headers=headers,
)

transaction._trace_id = (
f"{parent_span_trace_id:x}" if parent_span_trace_id else transaction.trace_id
)

transaction.__enter__()
elif kind == otel_api_trace.SpanKind.INTERNAL:
if transaction:
Expand All @@ -372,7 +418,7 @@ def start_span(
destination_name=name,
application=self.nr_application,
transport_type=self.instrumentation_library,
headers=headers,
headers=None,
)
transaction.__enter__()
elif kind == otel_api_trace.SpanKind.PRODUCER:
Expand Down
10 changes: 10 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4428,6 +4428,16 @@ def _process_module_builtin_defaults():
)

# Hybrid Agent Hooks
_process_module_definition(
"opentelemetry.context", "newrelic.hooks.hybridagent_opentelemetry", "instrument_context_api"
)

_process_module_definition(
"opentelemetry.instrumentation.propagators",
"newrelic.hooks.hybridagent_opentelemetry",
"instrument_global_propagators_api",
)

_process_module_definition(
"opentelemetry.trace", "newrelic.hooks.hybridagent_opentelemetry", "instrument_trace_api"
)
Expand Down
Loading
Loading