Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 51 additions & 39 deletions Sources/Valkey/Connection/ValkeyConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,55 +164,67 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
self.channelHandler.triggerGracefulShutdown()
}

/// Send RESP command to Valkey connection
/// - Parameter command: ValkeyCommand structure
/// - Returns: The command response as defined in the ValkeyCommand
@inlinable
public func execute<Command: ValkeyCommand>(_ command: Command) async throws -> Command.Response {
@inlinable func withTracingSupport<Value>(
name: String,
operation: () async throws -> Value,
updateAttributes: (inout SpanAttributes) -> Void
) async throws -> Value {
#if DistributedTracingSupport
let span = self.tracer?.startSpan(Command.name, ofKind: .client)
defer { span?.end() }

span?.updateAttributes { attributes in
self.applyCommonAttributes(to: &attributes, commandName: Command.name)
}
#endif

self.logger.trace("execute", metadata: ["command": "\(Command.name)"])
if let span = self.tracer?.startSpan(name, ofKind: .client) {
defer { span.end() }

let requestID = Self.requestIDGenerator.next()

do {
let token = try await withTaskCancellationHandler {
if Task.isCancelled {
throw ValkeyClientError(.cancelled)
}
return try await withCheckedThrowingContinuation { continuation in
self.channelHandler.write(command: command, continuation: continuation, requestID: requestID)
}
} onCancel: {
self.cancel(requestID: requestID)
}
return try .init(token)
} catch let error as ValkeyClientError {
#if DistributedTracingSupport
if let span {
span.updateAttributes(updateAttributes)
do {
return try await operation()
} catch let error as ValkeyClientError {
#if DistributedTracingSupport
span.recordError(error)
span.setStatus(SpanStatus(code: .error))
if let prefix = error.simpleErrorPrefix {
span.attributes["db.response.status_code"] = "\(prefix)"
}
}
#endif
throw error
} catch {
#if DistributedTracingSupport
if let span {
#endif
throw error
} catch {
#if DistributedTracingSupport
span.recordError(error)
span.setStatus(SpanStatus(code: .error))
#endif
throw error
}
#endif
throw error
} else {
return try await operation()
}
#else
return try await operation()
#endif
}

/// Send RESP command to Valkey connection
/// - Parameter command: ValkeyCommand structure
/// - Returns: The command response as defined in the ValkeyCommand
@inlinable
public func execute<Command: ValkeyCommand>(_ command: Command) async throws -> Command.Response {
try await withTracingSupport(name: Command.name) {
self.logger.trace("execute", metadata: ["command": "\(Command.name)"])

let requestID = Self.requestIDGenerator.next()

do {
let token = try await withTaskCancellationHandler {
if Task.isCancelled {
throw ValkeyClientError(.cancelled)
}
return try await withCheckedThrowingContinuation { continuation in
self.channelHandler.write(command: command, continuation: continuation, requestID: requestID)
}
} onCancel: {
self.cancel(requestID: requestID)
}
return try .init(token)
}
} updateAttributes: { attributes in
self.applyCommonAttributes(to: &attributes, commandName: Command.name)
}
}

Expand Down
Loading