Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ ThisBuild / libraryDependencySchemes +=
// This is used in a couple places
lazy val fs2Version = "3.13.0-M8"
lazy val openTelemetryVersion = "1.55.0"
lazy val otel4sVersion = "0.15-ca28b04-SNAPSHOT"
lazy val otel4sSdkVersion = "0.15-f5df7b3-SNAPSHOT"
lazy val otel4sVersion = "0.16-83b6f7b-SNAPSHOT"
lazy val otel4sSdkVersion = "0.17-c767f1d-SNAPSHOT"
lazy val refinedVersion = "0.11.3"

// Global Settings
Expand Down Expand Up @@ -123,7 +123,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform)
"org.scodec" %%% "scodec-bits" % "1.2.4",
"org.scodec" %%% "scodec-core" % (if (tlIsScala3.value) "2.3.3" else "1.11.11"),
"org.scodec" %%% "scodec-cats" % "1.3.0-RC1",
"org.typelevel" %%% "otel4s-core-trace" % otel4sVersion,
"org.typelevel" %%% "otel4s-core" % otel4sVersion,
"org.typelevel" %%% "otel4s-semconv" % otel4sVersion,
"org.typelevel" %%% "otel4s-semconv-metrics" % otel4sVersion,
"org.tpolecat" %%% "sourcepos" % "1.2.0",
"org.typelevel" %%% "twiddles-core" % "1.0.0-RC2",
) ++ Seq(
Expand Down
2 changes: 2 additions & 0 deletions modules/bench/src/main/scala/skunk/bench/SelectBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import skunk.codec.all._
import java.sql.DriverManager
import org.openjdk.jmh.annotations._
import org.typelevel.otel4s.trace.Tracer
import org.typelevel.otel4s.metrics.Meter

@State(Scope.Benchmark)
object SelectBenchScope {
implicit val tracer: Tracer[IO] = Tracer.noop[IO]
implicit val meter: Meter[IO] = Meter.noop[IO]

val defaultChunkSize = 512

Expand Down
27 changes: 16 additions & 11 deletions modules/core/shared/src/main/scala/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import fs2.concurrent.Signal
import fs2.io.net.{ Network, Socket, SocketOption }
import fs2.Pipe
import fs2.Stream
import org.typelevel.otel4s.metrics.Meter
import org.typelevel.otel4s.metrics.Histogram
import org.typelevel.otel4s.trace.Tracer
import skunk.codec.all.bool
import skunk.data._
Expand Down Expand Up @@ -476,7 +478,7 @@ object Session {
* @param queryCacheSize size of the session-level cache for query checking; defaults to 2048
* @param parseCacheSize size of the pool-level cache for parsing statements; defaults to 2048
*/
final class Builder[F[_]: Temporal: Network: Console] private (
final class Builder[F[_]: Temporal: Meter: Network: Console] private (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious if we should just take a Tracer here too and drop support for explicitly passing a Tracer? Or alternatively, should we support explicitly passing a Meter using a similar pattern? Explicit passing was added back when tracing was done via Natchez: #650

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the implicit Tracer is the more idiomatic way of doing it in otel4s. This could be done in a follow-up PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep for sure

Copy link
Contributor

@iRevive iRevive Jan 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to use MeterProvider so you can explicitly define and control the instrumentation scope. Then, you can create a Meter inside the pooled / single:

val meter: F[Meter] = MeterProvider[F].tracer("skunk").withVersion(BuildInfo.version).get

OpenTelemetry generally recommends that libraries own their instrumentation scopes.
We follow a similar approach in http4s-otel4s-middleware or fs2-grpc-otel4s.

If possible, I would do the same with Tracer.


If users don't need tracing or metering capabilities, they can use given MeterProvider[F] = MeterProvider.noop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to do that in a follow up PR. Same for TraceProvider, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that works for you, We can integrate the metrics and then I can do a cleanup PR with TracerProvider and MeterProvider, and get rid of the explicit Tracer API. TO avoid blowing up the size of this PR. I'd like also to follow-up with metrics for the pool, as per the semantic convention.

val connectionType: ConnectionType,
val host: Host,
val port: Port,
Expand Down Expand Up @@ -645,13 +647,15 @@ object Session {
for {
dc <- Resource.eval(Describe.Cache.empty[F](commandCacheSize, queryCacheSize))
sslOp <- ssl.toSSLNegotiationOptions(if (debug) logger.some else none)
pool <- Pool.ofF({implicit T: Tracer[F] => sessions(sslOp, dc)}, max)(Recyclers.full)
opDuration <- Resource.eval(Otel.OpDurationHistogram[F])
pool <- Pool.ofF({implicit T: Tracer[F] => sessions(sslOp, dc, opDuration)}, max)(Recyclers.full)
} yield pool
}

private def sessions(
sslOptions: Option[SSLNegotiation.Options[F]],
describeCache: Describe.Cache[F]
describeCache: Describe.Cache[F],
opDuration: Histogram[F, Double]
)(implicit T: Tracer[F]): Resource[F, Session[F]] = {
val sockets = connectionType match {
case ConnectionType.TCP =>
Expand All @@ -663,18 +667,19 @@ object Session {
val filteredSocketOptions = socketOptions.filter(o => o.key != SocketOption.NoDelay)
Network[F].connect(address, filteredSocketOptions)
}
fromSockets(sockets, sslOptions, describeCache)
fromSockets(sockets, sslOptions, describeCache, opDuration)
}

private def fromSockets(
sockets: Resource[F, Socket[F]],
sslOptions: Option[SSLNegotiation.Options[F]],
describeCache: Describe.Cache[F]
describeCache: Describe.Cache[F],
opDuration: Histogram[F, Double]
)(implicit T: Tracer[F]): Resource[F, Session[F]] =
for {
namer <- Resource.eval(Namer[F])
pc <- Resource.eval(Parse.Cache.empty[F](parseCacheSize))
proto <- Protocol[F](debug, namer, sockets, sslOptions, describeCache, pc, readTimeout, redactionStrategy)
proto <- Protocol[F](debug, namer, sockets, sslOptions, describeCache, pc, readTimeout, redactionStrategy, opDuration)
creds <- Resource.eval(credentials)
_ <- Resource.eval(proto.startup(creds.user, database.getOrElse(creds.user), creds.password, connectionParameters))
sess <- Resource.make(fromProtocol(proto, namer, typingStrategy, redactionStrategy))(_ => proto.cleanup)
Expand All @@ -701,7 +706,7 @@ object Session {
}}}
*/
object Builder {
def apply[F[_]: Temporal: Network: Console]: Builder[F] =
def apply[F[_]: Temporal: Meter: Network: Console]: Builder[F] =
new Builder[F](
connectionType = ConnectionType.TCP,
host = host"localhost",
Expand Down Expand Up @@ -748,7 +753,7 @@ object Session {
* @group Constructors
*/
@deprecated("1.0.0-M11", "Use Session.Builder[F].pooled instead")
def pooled[F[_]: Temporal: Tracer: Network: Console](
def pooled[F[_]: Temporal: Tracer: Meter: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down Expand Up @@ -806,7 +811,7 @@ object Session {
* @group Constructors
*/
@deprecated("1.0.0-M11", "Use Session.Builder[F].pooledExplicitTracer instead")
def pooledF[F[_]: Temporal: Network: Console](
def pooledF[F[_]: Temporal: Meter: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down Expand Up @@ -848,7 +853,7 @@ object Session {
* @see pooled
*/
@deprecated("1.0.0-M11", "Use Session.Builder[F].single instead")
def single[F[_]: Temporal: Tracer: Network: Console](
def single[F[_]: Temporal: Tracer: Meter: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down Expand Up @@ -887,7 +892,7 @@ object Session {
* @see pooledF
*/
@deprecated("1.0.0-M11", "Use Session.Builder[F].singleExplicitTracer instead")
def singleF[F[_]: Temporal: Network: Console](
def singleF[F[_]: Temporal: Meter: Network: Console](
host: String,
port: Int = 5432,
user: String,
Expand Down
37 changes: 20 additions & 17 deletions modules/core/shared/src/main/scala/net/Protocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import skunk.net.protocol.Describe
import scala.concurrent.duration.Duration
import skunk.net.protocol.Exchange
import skunk.net.protocol.Parse
import org.typelevel.otel4s.metrics.Histogram

/**
* Interface for a Postgres database, expressed through high-level operations that rely on exchange
Expand Down Expand Up @@ -216,19 +217,21 @@ object Protocol {
describeCache: Describe.Cache[F],
parseCache: Parse.Cache[F],
readTimeout: Duration,
redactionStrategy: RedactionStrategy
redactionStrategy: RedactionStrategy,
opDuration: Histogram[F, Double]
): Resource[F, Protocol[F]] =
for {
bms <- BufferedMessageSocket[F](256, debug, sockets, sslOptions, readTimeout) // TODO: should we expose the queue size?
p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache, redactionStrategy))
p <- Resource.eval(fromMessageSocket(bms, nam, describeCache, parseCache, redactionStrategy, opDuration))
} yield p

def fromMessageSocket[F[_]: Concurrent: Tracer](
bms: BufferedMessageSocket[F],
nam: Namer[F],
dc: Describe.Cache[F],
pc: Parse.Cache[F],
redactionStrategy: RedactionStrategy
redactionStrategy: RedactionStrategy,
opDuration: Histogram[F, Double]
): F[Protocol[F]] =
Exchange[F].map { ex =>
new Protocol[F] {
Expand All @@ -246,50 +249,50 @@ object Protocol {
bms.parameters

override def prepare[A](command: Command[A], ty: Typer): F[PreparedCommand[F, A]] =
protocol.Prepare[F](describeCache, parseCache, redactionStrategy).apply(command, ty)
protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(command, ty)

override def prepare[A, B](query: Query[A, B], ty: Typer): F[PreparedQuery[F, A, B]] =
protocol.Prepare[F](describeCache, parseCache, redactionStrategy).apply(query, ty)
protocol.Prepare[F](describeCache, parseCache, redactionStrategy, opDuration).apply(query, ty)

override def prepareR[A](command: Command[A], ty: Typer): Resource[F, Protocol.PreparedCommand[F, A]] = {
val acquire = Parse.Cache.empty[F](1).flatMap { pc =>
protocol.Prepare[F](describeCache, pc, redactionStrategy).apply(command, ty)
protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(command, ty)
}
Resource.make(acquire)(pc => protocol.Close[F].apply(pc.id))
Resource.make(acquire)(pc => protocol.Close[F](opDuration).apply(pc.id))
}

override def prepareR[A, B](query: Query[A, B], ty: Typer): Resource[F, Protocol.PreparedQuery[F, A, B]] = {
val acquire = Parse.Cache.empty[F](1).flatMap { pc =>
protocol.Prepare[F](describeCache, pc, redactionStrategy).apply(query, ty)
protocol.Prepare[F](describeCache, pc, redactionStrategy, opDuration).apply(query, ty)
}
Resource.make(acquire)(pq => protocol.Close[F].apply(pq.id))
Resource.make(acquire)(pq => protocol.Close[F](opDuration).apply(pq.id))
}

override def execute(command: Command[Void]): F[Completion] =
protocol.Query[F](redactionStrategy).apply(command)
protocol.Query[F](redactionStrategy, opDuration).apply(command)

override def execute[B](query: Query[Void, B], ty: Typer): F[List[B]] =
protocol.Query[F](redactionStrategy).apply(query, ty)
protocol.Query[F](redactionStrategy, opDuration).apply(query, ty)

override def executeDiscard(statement: Statement[Void]): F[Unit] = protocol.Query[F](redactionStrategy).applyDiscard(statement)
override def executeDiscard(statement: Statement[Void]): F[Unit] = protocol.Query[F](redactionStrategy, opDuration).applyDiscard(statement)

override def startup(user: String, database: String, password: Option[String], parameters: Map[String, String]): F[Unit] =
protocol.Startup[F].apply(user, database, password, parameters)
protocol.Startup[F](opDuration).apply(user, database, password, parameters)

override def cleanup: F[Unit] =
parseCache.value.values.flatMap(_.traverse_(protocol.Close[F].apply))
parseCache.value.values.flatMap(_.traverse_(protocol.Close[F](opDuration).apply))

override def transactionStatus: Signal[F, TransactionStatus] =
bms.transactionStatus

override val describeCache: Describe.Cache[F] =
dc

override val parseCache: Parse.Cache[F] =
pc
pc

override def closeEvictedPreparedStatements: F[Unit] =
pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F].apply))
pc.value.clearEvicted.flatMap(_.traverse_(protocol.Close[F](opDuration).apply))
}
}

Expand Down
11 changes: 6 additions & 5 deletions modules/core/shared/src/main/scala/net/protocol/Bind.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package skunk.net.protocol

import cats.effect.MonadCancel
import cats.effect.Resource
import cats.syntax.all._
import cats.MonadError
import skunk.exception.PostgresErrorException
import skunk.net.message.{ Bind => BindMessage, Close => _, _ }
import skunk.net.MessageSocket
Expand All @@ -15,6 +15,7 @@ import skunk.util.{ Origin, Namer }
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.trace.{Span, Tracer}
import skunk.RedactionStrategy
import org.typelevel.otel4s.metrics.Histogram

trait Bind[F[_]] {

Expand All @@ -29,8 +30,8 @@ trait Bind[F[_]] {

object Bind {

def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](
implicit ev: MonadError[F, Throwable]
def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](opDuration: Histogram[F, Double])(
implicit ev: MonadCancel[F, Throwable]
): Bind[F] =
new Bind[F] {

Expand All @@ -41,7 +42,7 @@ object Bind {
redactionStrategy: RedactionStrategy
): Resource[F, PortalId] =
Resource.make {
exchange("bind") { (span: Span[F]) =>
exchange("bind", opDuration) { (span: Span[F]) =>
for {
pn <- nextName("portal").map(PortalId(_))
ea = statement.statement.encoder.encode(args) // encoded args
Expand Down Expand Up @@ -70,7 +71,7 @@ object Bind {
}
} yield pn
}
} { Close[F].apply }
} { Close[F](opDuration).apply }

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import skunk.net.Protocol
import skunk.data.Completion
import skunk.net.protocol.exchange
import cats.effect.kernel.Deferred
import org.typelevel.otel4s.metrics.Histogram

trait BindExecute[F[_]] {

Expand All @@ -41,7 +42,7 @@ trait BindExecute[F[_]] {

object BindExecute {

def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](
def apply[F[_]: Exchange: MessageSocket: Namer: Tracer](opDuration: Histogram[F, Double])(
implicit ev: Concurrent[F]
): BindExecute[F] =
new Unroll[F] with BindExecute[F] {
Expand Down Expand Up @@ -133,7 +134,7 @@ object BindExecute {
}

Resource.make {
exchange("bind+execute"){ (span: Span[F]) =>
exchange("bind+execute", opDuration){ (span: Span[F]) =>
for {
pn <- preBind(span)
_ <- send(ExecuteMessage(pn.value, 0))
Expand All @@ -144,7 +145,7 @@ object BindExecute {
def execute: F[Completion] = c.pure
}
}
} { portal => Close[F].apply(portal.id)}
} { portal => Close[F](opDuration).apply(portal.id)}

}

Expand All @@ -158,7 +159,7 @@ object BindExecute {
val (preBind, postBind) = bindExchange(statement, args, argsOrigin, redactionStrategy)
Resource.eval(Deferred[F, Unit]).flatMap { prefetch =>
Resource.make {
exchange("bind+execute"){ (span: Span[F]) =>
exchange("bind+execute", opDuration){ (span: Span[F]) =>
for {
pn <- preBind(span)
_ <- span.addAttributes(
Expand All @@ -173,11 +174,11 @@ object BindExecute {
def execute(maxRows: Int): F[List[B] ~ Boolean] =
prefetch.tryGet.flatMap {
case None => rs.pure <* prefetch.complete(())
case Some(()) => Execute[F].apply(this, maxRows)
case Some(()) => Execute[F](opDuration).apply(this, maxRows)
}
}
}
} { portal => Close[F].apply(portal.id)}
} { portal => Close[F](opDuration).apply(portal.id)}
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions modules/core/shared/src/main/scala/net/protocol/Close.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package skunk.net
package protocol

import cats.FlatMap
import cats.effect.MonadCancelThrow
import cats.syntax.all._
import skunk.net.message.{ Close => CloseMessage, Flush, CloseComplete }
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.trace.Span
import org.typelevel.otel4s.trace.Tracer
import org.typelevel.otel4s.metrics.Histogram

trait Close[F[_]] {
def apply(portalId: Protocol.PortalId): F[Unit]
Expand All @@ -19,17 +20,17 @@ trait Close[F[_]] {

object Close {

def apply[F[_]: FlatMap: Exchange: MessageSocket: Tracer]: Close[F] =
def apply[F[_]: MonadCancelThrow: Exchange: MessageSocket: Tracer](opDuration: Histogram[F, Double]): Close[F] =
new Close[F] {

override def apply(portalId: Protocol.PortalId): F[Unit] =
exchange("close-portal") { (span: Span[F]) =>
exchange("close-portal", opDuration) { (span: Span[F]) =>
span.addAttribute(Attribute("portal", portalId.value)) *>
close(CloseMessage.portal(portalId.value))
}

override def apply(statementId: Protocol.StatementId): F[Unit] =
exchange("close-statement") { (span: Span[F]) =>
exchange("close-statement", opDuration) { (span: Span[F]) =>
span.addAttribute(Attribute("statement", statementId.value)) *>
close(CloseMessage.statement(statementId.value))
}
Expand Down
Loading