diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index fbd96378..fc27af6b 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -164,55 +164,67 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { self.channelHandler.triggerGracefulShutdown() } - /// Send RESP command to Valkey connection - /// - Parameter command: ValkeyCommand structure - /// - Returns: The command response as defined in the ValkeyCommand - @inlinable - public func execute(_ command: Command) async throws -> Command.Response { + @inlinable func withTracingSupport( + name: String, + operation: () async throws -> Value, + updateAttributes: (inout SpanAttributes) -> Void + ) async throws -> Value { #if DistributedTracingSupport - let span = self.tracer?.startSpan(Command.name, ofKind: .client) - defer { span?.end() } - - span?.updateAttributes { attributes in - self.applyCommonAttributes(to: &attributes, commandName: Command.name) - } - #endif - - self.logger.trace("execute", metadata: ["command": "\(Command.name)"]) + if let span = self.tracer?.startSpan(name, ofKind: .client) { + defer { span.end() } - let requestID = Self.requestIDGenerator.next() - - do { - let token = try await withTaskCancellationHandler { - if Task.isCancelled { - throw ValkeyClientError(.cancelled) - } - return try await withCheckedThrowingContinuation { continuation in - self.channelHandler.write(command: command, continuation: continuation, requestID: requestID) - } - } onCancel: { - self.cancel(requestID: requestID) - } - return try .init(token) - } catch let error as ValkeyClientError { - #if DistributedTracingSupport - if let span { + span.updateAttributes(updateAttributes) + do { + return try await operation() + } catch let error as ValkeyClientError { + #if DistributedTracingSupport span.recordError(error) span.setStatus(SpanStatus(code: .error)) if let prefix = error.simpleErrorPrefix { span.attributes["db.response.status_code"] = "\(prefix)" } - } - #endif - throw error - } catch { - #if DistributedTracingSupport - if let span { + #endif + throw error + } catch { + #if DistributedTracingSupport span.recordError(error) span.setStatus(SpanStatus(code: .error)) + #endif + throw error } - #endif - throw error + } else { + return try await operation() + } + #else + return try await operation() + #endif + } + + /// Send RESP command to Valkey connection + /// - Parameter command: ValkeyCommand structure + /// - Returns: The command response as defined in the ValkeyCommand + @inlinable + public func execute(_ command: Command) async throws -> Command.Response { + try await withTracingSupport(name: Command.name) { + self.logger.trace("execute", metadata: ["command": "\(Command.name)"]) + + let requestID = Self.requestIDGenerator.next() + + do { + let token = try await withTaskCancellationHandler { + if Task.isCancelled { + throw ValkeyClientError(.cancelled) + } + return try await withCheckedThrowingContinuation { continuation in + self.channelHandler.write(command: command, continuation: continuation, requestID: requestID) + } + } onCancel: { + self.cancel(requestID: requestID) + } + return try .init(token) + } + } updateAttributes: { attributes in + self.applyCommonAttributes(to: &attributes, commandName: Command.name) } }