diff --git a/build.sbt b/build.sbt index bbcdcab..3e485c9 100644 --- a/build.sbt +++ b/build.sbt @@ -38,12 +38,14 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) "org.typelevel" %%% "cats-core" % catsV, "org.typelevel" %%% "cats-effect" % catsEffectV, "org.http4s" %%% "http4s-client" % http4sV, + "org.typelevel" %%% "otel4s-core-metrics" % otel4sV, "org.typelevel" %%% "otel4s-core-trace" % otel4sV, "org.typelevel" %%% "otel4s-semconv" % otel4sV, - "org.typelevel" %%% "otel4s-sdk-trace-testkit" % otel4sV % Test, + "org.typelevel" %%% "otel4s-sdk-testkit" % otel4sV % Test, "org.typelevel" %%% "cats-effect-testkit" % catsEffectV % Test, "org.typelevel" %%% "munit-cats-effect" % munitCatsEffectV % Test, "org.scalameta" %%% "munit" % munitV % Test, + "org.http4s" %%% "http4s-server" % http4sV % Test, ), ) diff --git a/core/src/main/scala/org/http4s/otel4s/middleware/OtelMetrics.scala b/core/src/main/scala/org/http4s/otel4s/middleware/OtelMetrics.scala new file mode 100644 index 0000000..fa4a3d1 --- /dev/null +++ b/core/src/main/scala/org/http4s/otel4s/middleware/OtelMetrics.scala @@ -0,0 +1,235 @@ +/* + * Copyright 2023 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.otel4s.middleware + +import cats.Monad +import cats.syntax.all._ +import org.http4s.Method +import org.http4s.Status +import org.http4s.metrics.MetricsOps +import org.http4s.metrics.TerminationType +import org.typelevel.otel4s.Attribute +import org.typelevel.otel4s.AttributeKey +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics._ +import org.typelevel.otel4s.semconv.attributes.ErrorAttributes + +/** [[http4s.metrics.MetricsOps]] algebra capable of recording OpenTelemetry metrics + */ +object OtelMetrics { + + /** Creates a [[http4s.metrics.MetricsOps]] for clients that supports OpenTelemetry metrics. + * + * Registers the following metrics: + * + * http.client.request.duration - Histogram + * + * http.client.active_requests - UpDownCounter + * + * http.client.abnormal_terminations - Histogram + * + * https://opentelemetry.io/docs/specs/semconv/http/http-metrics/ + * + * @param attributes additional [[org.typelevel.otel4s.Attributes]] that are added to all metrics + * @param responseDurationSecondsHistogramBuckets histogram buckets for the response duration metrics + */ + def clientMetricsOps[F[_]: Monad: Meter]( + attributes: Attributes = Attributes.empty, + responseDurationSecondsHistogramBuckets: BucketBoundaries = DefaultHistogramBuckets, + ): F[MetricsOps[F]] = + metricsOps( + "http.client", + attributes, + responseDurationSecondsHistogramBuckets, + ) + + /** Creates a [[http4s.metrics.MetricsOps]] for servers that supports OpenTelemetry metrics. + * + * Registers the following metrics: + * + * http.server.request.duration - Histogram + * + * http.server.active_requests - UpDownCounter + * + * http.server.abnormal_terminations - Histogram + * + * https://opentelemetry.io/docs/specs/semconv/http/http-metrics/ + * + * @param attributes additional [[org.typelevel.otel4s.Attributes]] that are added to all metrics + * @param responseDurationSecondsHistogramBuckets histogram buckets for the response duration metrics + */ + def serverMetricsOps[F[_]: Monad: Meter]( + attributes: Attributes = Attributes.empty, + responseDurationSecondsHistogramBuckets: BucketBoundaries = DefaultHistogramBuckets, + ): F[MetricsOps[F]] = + metricsOps( + "http.server", + attributes, + responseDurationSecondsHistogramBuckets, + ) + + private def metricsOps[F[_]: Monad: Meter]( + prefix: String, + attributes: Attributes, + responseDurationSecondsHistogramBuckets: BucketBoundaries, + ): F[MetricsOps[F]] = + for { + metrics <- createMetricsCollection( + prefix, + responseDurationSecondsHistogramBuckets, + ) + } yield createMetricsOps( + metrics, + attributes, + ) + + private def createMetricsOps[F[_]]( + metrics: MetricsCollection[F], + attributes: Attributes, + ): MetricsOps[F] = + new MetricsOps[F] { + override def increaseActiveRequests(classifier: Option[String]): F[Unit] = + metrics.activeRequests + .inc( + attributes + .added(TypedMetricAttributes.classifier(classifier)) + ) + + override def decreaseActiveRequests(classifier: Option[String]): F[Unit] = + metrics.activeRequests + .dec( + attributes + .added(TypedMetricAttributes.classifier(classifier)) + ) + + override def recordHeadersTime( + method: Method, + elapsed: Long, + classifier: Option[String], + ): F[Unit] = + metrics.requestDuration + .record( + secondsFromNanos(elapsed), + attributes + .added(TypedMetricAttributes.classifier(classifier)) + .added(TypedAttributes.httpRequestMethod(method)) + .added(TypedMetricAttributes.httpPhase(Phase.Headers)), + ) + + override def recordTotalTime( + method: Method, + status: Status, + elapsed: Long, + classifier: Option[String], + ): F[Unit] = + metrics.requestDuration + .record( + secondsFromNanos(elapsed), + attributes + .added(TypedMetricAttributes.classifier(classifier)) + .added(TypedAttributes.httpRequestMethod(method)) + .added(TypedAttributes.httpResponseStatusCode(status)) + .added(TypedMetricAttributes.httpPhase(Phase.Body)), + ) + + override def recordAbnormalTermination( + elapsed: Long, + terminationType: TerminationType, + classifier: Option[String], + ): F[Unit] = + metrics.abnormalTerminations + .record( + secondsFromNanos(elapsed), + attributes + .added(TypedMetricAttributes.classifier(classifier)) + .added(TypedMetricAttributes.errorType(terminationType)), + ) + + private def secondsFromNanos(nanos: Long): Double = + nanos / 1000000000.0 + } + + private def createMetricsCollection[F[_]: Monad: Meter]( + prefix: String, + responseDurationSecondsHistogramBuckets: BucketBoundaries, + ): F[MetricsCollection[F]] = { + val requestDuration: F[Histogram[F, Double]] = + Meter[F] + .histogram[Double](s"$prefix.request.duration") + .withUnit("s") + .withDescription("Duration of HTTP server requests.") + .withExplicitBucketBoundaries(responseDurationSecondsHistogramBuckets) + .create + + val activeRequests: F[UpDownCounter[F, Long]] = + Meter[F] + .upDownCounter[Long](s"$prefix.active_requests") + .withUnit("{request}") + .withDescription("Number of active HTTP server requests.") + .create + + val abnormalTerminations: F[Histogram[F, Double]] = + Meter[F] + .histogram[Double](s"$prefix.abnormal_terminations") + .withUnit("s") + .withDescription("Duration of HTTP server abnormal terminations.") + .withExplicitBucketBoundaries(responseDurationSecondsHistogramBuckets) + .create + + (requestDuration, activeRequests, abnormalTerminations).mapN(MetricsCollection.apply) + } + + private val DefaultHistogramBuckets: BucketBoundaries = + BucketBoundaries(Vector(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1, 2.5, 5, 7.5, 10)) + + final case class MetricsCollection[F[_]]( + requestDuration: Histogram[F, Double], + activeRequests: UpDownCounter[F, Long], + abnormalTerminations: Histogram[F, Double], + ) + + private sealed trait Phase + + private object Phase { + case object Headers extends Phase + + case object Body extends Phase + } + + private object TypedMetricAttributes { + private val Classifier: AttributeKey[String] = AttributeKey.string("classifier") + + def classifier(string: Option[String]): Attribute[String] = + Classifier(string.getOrElse("")) + + private val HttpPhase: AttributeKey[String] = AttributeKey.string("http.phase") + + def httpPhase(s: Phase): Attribute[String] = + HttpPhase(s match { + case Phase.Headers => "headers" + case Phase.Body => "body" + }) + + def errorType(terminationType: TerminationType): Attribute[String] = + ErrorAttributes.ErrorType(terminationType match { + case TerminationType.Abnormal(e) => e.getClass.getName + case TerminationType.Error(e) => e.getClass.getName + case TerminationType.Canceled => "cancel" + case TerminationType.Timeout => "timeout" + }) + } +} diff --git a/core/src/test/scala/org/http4s/otel4s/middleware/OtelMetricsTests.scala b/core/src/test/scala/org/http4s/otel4s/middleware/OtelMetricsTests.scala new file mode 100644 index 0000000..877d09a --- /dev/null +++ b/core/src/test/scala/org/http4s/otel4s/middleware/OtelMetricsTests.scala @@ -0,0 +1,104 @@ +/* + * Copyright 2023 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.otel4s.middleware + +import cats.data.OptionT +import cats.effect.IO +import munit.CatsEffectSuite +import org.http4s._ +import org.http4s.server.middleware.Metrics +import org.typelevel.otel4s.Attributes +import org.typelevel.otel4s.metrics.Meter +import org.typelevel.otel4s.sdk.metrics.data.MetricPoints +import org.typelevel.otel4s.sdk.metrics.data.PointData +import org.typelevel.otel4s.sdk.testkit.metrics.MetricsTestkit + +class OtelMetricsTests extends CatsEffectSuite { + test("OtelMetrics") { + MetricsTestkit + .inMemory[IO]() + .use { testkit => + for { + meterIO <- testkit.meterProvider.get("meter") + metricsOps <- { + implicit val meter: Meter[IO] = meterIO + OtelMetrics.serverMetricsOps[IO]() + } + _ <- { + val fakeServer = + HttpRoutes[IO](e => OptionT.liftF(e.body.compile.drain.as(Response[IO](Status.Ok)))) + val meteredServer = Metrics[IO](metricsOps)(fakeServer) + + meteredServer + .run(Request[IO](Method.GET)) + .semiflatMap(_.body.compile.drain) + .value + } + metrics <- testkit.collectMetrics + } yield { + def attributes(attrs: Attributes): Map[String, String] = + attrs.map(a => a.key.name -> a.value.toString).toMap + + val activeRequestsDataPoints: Map[Map[String, String], Long] = + metrics + .find(_.name == "http.server.active_requests") + .map(_.data) + .collect { case sum: MetricPoints.Sum => + sum.points.toVector.collect { case long: PointData.LongNumber => + attributes(long.attributes) -> long.value + }.toMap + } + .getOrElse(Map.empty) + + val requestDurationDataPoints: Map[Map[String, String], Long] = + metrics + .find(_.name == "http.server.request.duration") + .map(_.data) + .collect { case histogram: MetricPoints.Histogram => + histogram.points.toVector + .map(e => attributes(e.attributes) -> e.stats.map(_.count).getOrElse(0L)) + .toMap + } + .getOrElse(Map.empty) + + assertEquals( + activeRequestsDataPoints, + Map( + Map("classifier" -> "") -> 0L + ), + ) + + assertEquals( + requestDurationDataPoints, + Map( + Map( + "classifier" -> "", + "http.phase" -> "headers", + "http.request.method" -> "GET", + ) -> 1L, + Map( + "classifier" -> "", + "http.phase" -> "body", + "http.request.method" -> "GET", + "http.response.status_code" -> "200", + ) -> 1L, + ), + ) + } + } + } +} diff --git a/examples/src/main/scala/example/Http4sExample.scala b/examples/src/main/scala/example/Http4sExample.scala index 632f2ee..e2b465d 100644 --- a/examples/src/main/scala/example/Http4sExample.scala +++ b/examples/src/main/scala/example/Http4sExample.scala @@ -17,15 +17,19 @@ package example import cats.effect._ +import cats.effect.syntax.all._ import com.comcast.ip4s._ import fs2.io.net.Network import org.http4s.ember.client.EmberClientBuilder import org.http4s.ember.server.EmberServerBuilder import org.http4s.implicits._ import org.http4s.otel4s.middleware.ClientMiddleware +import org.http4s.otel4s.middleware.OtelMetrics import org.http4s.otel4s.middleware.ServerMiddleware import org.http4s.server.Server +import org.http4s.server.middleware.Metrics import org.typelevel.otel4s.Otel4s +import org.typelevel.otel4s.metrics.Meter import org.typelevel.otel4s.oteljava.OtelJava import org.typelevel.otel4s.trace.Tracer @@ -50,15 +54,19 @@ object Http4sExample extends IOApp with Common { def tracer[F[_]](otel: Otel4s[F]): F[Tracer[F]] = otel.tracerProvider.tracer("Http4sExample").get + def meter[F[_]](otel: Otel4s[F]): F[Meter[F]] = + otel.meterProvider.meter("Http4sExample").get + // Our main app resource - def server[F[_]: Async: Network: Tracer]: Resource[F, Server] = + def server[F[_]: Async: Network: Tracer: Meter]: Resource[F, Server] = for { client <- EmberClientBuilder .default[F] .build .map(ClientMiddleware.default.build) + metricsOps <- OtelMetrics.serverMetricsOps[F]().toResource app = ServerMiddleware.default[F].buildHttpApp { - routes(client).orNotFound + Metrics(metricsOps)(routes(client)).orNotFound } sv <- EmberServerBuilder.default[F].withPort(port"8080").withHttpApp(app).build } yield sv @@ -69,7 +77,9 @@ object Http4sExample extends IOApp with Common { .autoConfigured[IO]() .flatMap { otel4s => Resource.eval(tracer(otel4s)).flatMap { implicit T: Tracer[IO] => - server[IO] + Resource.eval(meter(otel4s)).flatMap { implicit M: Meter[IO] => + server[IO] + } } } .use(_ => IO.never)