Skip to content

Commit

Permalink
otel semconv
Browse files Browse the repository at this point in the history
  • Loading branch information
lhns committed Apr 2, 2024
1 parent 61ab137 commit f9a8552
Showing 1 changed file with 116 additions and 198 deletions.
314 changes: 116 additions & 198 deletions core/src/main/scala/org/http4s/otel4s/OtelMetrics.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 http4s.org
* Copyright 2023 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,256 +18,174 @@ package org.http4s.otel4s

import cats.effect.Sync
import cats.syntax.all._
import org.http4s.metrics.TerminationType.{Abnormal, Canceled, Error, Timeout}
import org.http4s.metrics.{MetricsOps, TerminationType}
import org.http4s.{Method, Status}
import org.http4s.Method
import org.http4s.Status
import org.http4s.metrics.MetricsOps
import org.http4s.metrics.TerminationType
import org.http4s.otel4s.middleware.TypedAttributes
import org.typelevel.otel4s.Attribute
import org.typelevel.otel4s.AttributeKey
import org.typelevel.otel4s.Attributes
import org.typelevel.otel4s.metrics._

/** [[MetricsOps]] algebra capable of recording OpenTelemetry metrics
*
* Registers the following metrics:
*
* {prefix}.response.duration{labels=classifier,method,phase} - Histogram
*
* {prefix}.active_request.count{labels=classifier} - UpDownCounter
*
* {prefix}.request.count{labels=classifier,method,status} - Counter
*
* {prefix}.abnormal_terminations{labels=classifier,termination_type,cause} - Histogram
*
* Labels --
*
* method: Enumeration values: get, put, post, patch, head, move, options, trace, connect, delete, other
*
* phase: Enumeration values: headers, body
*
* status: Enumeration values: 1xx, 2xx, 3xx, 4xx, 5xx
*
* termination_type: Enumeration values: abnormal, error, timeout, cancel
*/
import org.typelevel.otel4s.semconv.attributes.ErrorAttributes

/** [[http4s.metrics.MetricsOps]] algebra capable of recording OpenTelemetry metrics
*
* Registers the following metrics:
*
* http.server.request.duration - Histogram
*
* http.server.active_requests - UpDownCounter
*
* http.server.abnormal_terminations - Histogram
*
* https://opentelemetry.io/docs/specs/semconv/http/http-metrics/
*/
object OtelMetrics {

/** Creates a [[MetricsOps]] that supports OpenTelemetry metrics
*
* @param prefix
* a prefix that will be added to all metrics
*/
def metricsOps[F[_] : Sync : Meter](
prefix: String = "org.http4s.server",
responseDurationSecondsHistogramBuckets: BucketBoundaries = DefaultHistogramBuckets,
): F[MetricsOps[F]] =
/** Creates a [[http4s.metrics.MetricsOps]] that supports OpenTelemetry metrics
*
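* @param attributes
* additional attributes that are added to every recorded measurement
* @param responseDurationSecondsHistogramBuckets
* explicit bucket boundaries, in seconds, applied to the duration histograms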
*/
def metricsOps[F[_]: Sync: Meter](
attributes: Attributes = Attributes.empty,
responseDurationSecondsHistogramBuckets: BucketBoundaries = DefaultHistogramBuckets,
): F[MetricsOps[F]] =
for {
metrics <- createMetricsCollection(prefix, responseDurationSecondsHistogramBuckets)
} yield createMetricsOps(metrics)
metrics <- createMetricsCollection(responseDurationSecondsHistogramBuckets)
} yield createMetricsOps(metrics, attributes)

private def createMetricsOps[F[_] : Sync](metrics: MetricsCollection[F]): MetricsOps[F] =
private def createMetricsOps[F[_]](
metrics: MetricsCollection[F],
attributes: Attributes,
): MetricsOps[F] =
new MetricsOps[F] {
override def increaseActiveRequests(classifier: Option[String]): F[Unit] =
metrics.activeRequests
.inc(Attribute("classifier", label(classifier)))
.inc(
attributes
.added(TypedMetricAttributes.classifier(classifier))
)

override def decreaseActiveRequests(classifier: Option[String]): F[Unit] =
metrics.activeRequests
.dec(Attribute("classifier", label(classifier)))
.dec(
attributes
.added(TypedMetricAttributes.classifier(classifier))
)

override def recordHeadersTime(
method: Method,
elapsed: Long,
classifier: Option[String]
): F[Unit] =
metrics.responseDuration
method: Method,
elapsed: Long,
classifier: Option[String],
): F[Unit] =
metrics.requestDuration
.record(
secondsFromNanos(elapsed),
Attribute("classifier", label(classifier)),
Attribute("method", reportMethod(method)),
Attribute("phase", Phase.report(Phase.Headers))
attributes
.added(TypedMetricAttributes.classifier(classifier))
.added(TypedAttributes.httpRequestMethod(method))
.added(TypedMetricAttributes.httpPhase(Phase.Headers)),
)

override def recordTotalTime(
method: Method,
status: Status,
elapsed: Long,
classifier: Option[String]
): F[Unit] =
metrics.responseDuration
.record(
secondsFromNanos(elapsed),
Attribute("classifier", label(classifier)),
Attribute("method", reportMethod(method)),
Attribute("phase", Phase.report(Phase.Body))
) *>
metrics.requests
.inc(
Attribute("classifier", label(classifier)),
Attribute("method", reportMethod(method)),
Attribute("status", reportStatus(status))
)

/** Records an abnormally terminated request by delegating to the helper that
 *  matches the kind of termination.
 *
 *  @param elapsed
 *    elapsed time in nanoseconds; each helper converts it to seconds
 *  @param terminationType
 *    how the request ended
 *  @param classifier
 *    optional classifier forwarded unchanged to the helper
 */
override def recordAbnormalTermination(
    elapsed: Long,
    terminationType: TerminationType,
    classifier: Option[String]
): F[Unit] = {
  // Only Abnormal and Error carry a Throwable; Timeout and Canceled have no cause.
  terminationType match {
    case TerminationType.Timeout => recordTimeout(elapsed, classifier)
    case TerminationType.Canceled => recordCanceled(elapsed, classifier)
    case TerminationType.Error(cause) => recordError(elapsed, classifier, cause)
    case TerminationType.Abnormal(cause) => recordAbnormal(elapsed, classifier, cause)
  }
}

/** Records a canceled request on the abnormal-terminations histogram.
 *
 *  The measurement is tagged with the classifier, the "cancel" termination type
 *  and an empty "cause", since a cancellation carries no exception.
 *
 *  @param elapsed
 *    elapsed time in nanoseconds
 *  @param classifier
 *    optional classifier; a missing value is reported as an empty string
 */
private def recordCanceled(elapsed: Long, classifier: Option[String]): F[Unit] = {
  val seconds = secondsFromNanos(elapsed)
  val terminationLabel = AbnormalTermination.report(AbnormalTermination.Canceled)

  metrics.abnormalTerminations.record(
    seconds,
    Attribute("classifier", label(classifier)),
    Attribute("termination_type", terminationLabel),
    Attribute("cause", label(Option.empty))
  )
}

private def recordAbnormal(
elapsed: Long,
classifier: Option[String],
cause: Throwable
): F[Unit] =
metrics.abnormalTerminations
method: Method,
status: Status,
elapsed: Long,
classifier: Option[String],
): F[Unit] =
metrics.requestDuration
.record(
secondsFromNanos(elapsed),
Attribute("classifier", label(classifier)),
Attribute("termination_type", AbnormalTermination.report(AbnormalTermination.Abnormal)),
Attribute("cause", label(Option(cause.getClass.getName)))
attributes
.added(TypedMetricAttributes.classifier(classifier))
.added(TypedAttributes.httpRequestMethod(method))
.added(TypedAttributes.httpResponseStatusCode(status))
.added(TypedMetricAttributes.httpPhase(Phase.Body)),
)

/** Records a request that failed with an error on the abnormal-terminations
 *  histogram.
 *
 *  The measurement is tagged with the classifier, the "error" termination type
 *  and the class name of the exception as "cause".
 *
 *  @param elapsed
 *    elapsed time in nanoseconds
 *  @param classifier
 *    optional classifier; a missing value is reported as an empty string
 *  @param cause
 *    the exception that ended the request
 */
private def recordError(
    elapsed: Long,
    classifier: Option[String],
    cause: Throwable
): F[Unit] = {
  val seconds = secondsFromNanos(elapsed)
  val terminationLabel = AbnormalTermination.report(AbnormalTermination.Error)
  // Option(...) guards against a null class name before it is turned into a label.
  val causeName = Option(cause.getClass.getName)

  metrics.abnormalTerminations.record(
    seconds,
    Attribute("classifier", label(classifier)),
    Attribute("termination_type", terminationLabel),
    Attribute("cause", label(causeName))
  )
}

private def recordTimeout(elapsed: Long, classifier: Option[String]): F[Unit] =
override def recordAbnormalTermination(
elapsed: Long,
terminationType: TerminationType,
classifier: Option[String],
): F[Unit] =
metrics.abnormalTerminations
.record(
secondsFromNanos(elapsed),
Attribute("classifier", label(classifier)),
Attribute("termination_type", AbnormalTermination.report(AbnormalTermination.Timeout)),
Attribute("cause", label(Option.empty))
attributes
.added(TypedMetricAttributes.classifier(classifier))
.added(TypedMetricAttributes.errorType(terminationType)),
)

private def secondsFromNanos(nanos: Long): Double =
nanos / 1_000_000_000.0

private def label(value: Option[String]): String = value.getOrElse("")

/** Maps a response status to its status-class label ("1xx" .. "5xx").
 *
 *  Any code below 200 is reported as "1xx" and any code of 500 or above as "5xx".
 */
private def reportStatus(status: Status): String = {
  val code = status.code
  if (code < 200) "1xx"
  else if (code < 300) "2xx"
  else if (code < 400) "3xx"
  else if (code < 500) "4xx"
  else "5xx"
}

/** Maps a request method to the lower-case label used for the "method" attribute.
 *
 *  Methods outside the listed set are all reported as "other".
 */
private def reportMethod(m: Method): String = {
  import Method._

  m match {
    case GET => "get"
    case PUT => "put"
    case POST => "post"
    case PATCH => "patch"
    case HEAD => "head"
    case MOVE => "move"
    case OPTIONS => "options"
    case TRACE => "trace"
    case CONNECT => "connect"
    case DELETE => "delete"
    case _ => "other"
  }
}
}

private def createMetricsCollection[F[_] : Sync : Meter](
prefix: String,
responseDurationSecondsHistogramBuckets: BucketBoundaries
): F[MetricsCollection[F]] = {
val responseDuration: F[Histogram[F, Double]] = {
Meter[F]
.histogram[Double](prefix + ".response.duration")
.withUnit("seconds")
.withDescription("Response Duration in seconds.")
.create
nanos / 1000000000.0
}

val activeRequests: F[UpDownCounter[F, Long]] = {
private def createMetricsCollection[F[_]: Sync: Meter](
responseDurationSecondsHistogramBuckets: BucketBoundaries,
): F[MetricsCollection[F]] = {
val requestDuration: F[Histogram[F, Double]] =
Meter[F]
.upDownCounter[Long](prefix + ".active_request.count")
.withDescription("Total Active Requests.")
.histogram[Double]("http.server.request.duration")
.withUnit("s")
.withDescription("Duration of HTTP server requests.")
.withExplicitBucketBoundaries(responseDurationSecondsHistogramBuckets)
.create
}

val requests: F[Counter[F, Long]] = {
val activeRequests: F[UpDownCounter[F, Long]] =
Meter[F]
.counter[Long](prefix + ".request.count")
.withDescription("Total Requests.")
.upDownCounter[Long]("http.server.active_requests")
.withDescription("Number of active HTTP server requests.")
.create
}

val abnormalTerminations: F[Histogram[F, Double]] = {
val abnormalTerminations: F[Histogram[F, Double]] =
Meter[F]
.histogram[Double](prefix + ".abnormal_terminations")
.histogram[Double]("http.server.abnormal_terminations")
.withDescription("Total Abnormal Terminations.")
.withExplicitBucketBoundaries(responseDurationSecondsHistogramBuckets)
.create
}

(responseDuration, activeRequests, requests, abnormalTerminations).mapN(MetricsCollection.apply)
(requestDuration, activeRequests, abnormalTerminations).mapN(MetricsCollection.apply)
}

/** Bucket boundaries used when `metricsOps` is called without an explicit
 *  `responseDurationSecondsHistogramBuckets` argument; the values are in seconds.
 */
private val DefaultHistogramBuckets: BucketBoundaries =
  BucketBoundaries(
    Vector(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)
  )
}

/** The set of instruments that the `MetricsOps` implementation records to.
 *
 *  @param responseDuration
 *    histogram of response durations in seconds, recorded for both the headers
 *    and the body phase
 *  @param activeRequests
 *    up-down counter incremented when a request becomes active and decremented
 *    when it stops being active
 *  @param requests
 *    counter incremented once per request when its total time is recorded
 *  @param abnormalTerminations
 *    histogram of elapsed seconds for requests that terminated abnormally
 */
final case class MetricsCollection[F[_]](
responseDuration: Histogram[F, Double],
activeRequests: UpDownCounter[F, Long],
requests: Counter[F, Long],
abnormalTerminations: Histogram[F, Double]
)

private sealed trait Phase
/** The set of instruments that the `MetricsOps` implementation records to.
 *
 *  @param requestDuration
 *    the `http.server.request.duration` histogram, in seconds, recorded for both
 *    the headers and the body phase
 *  @param activeRequests
 *    the `http.server.active_requests` up-down counter, incremented when a
 *    request becomes active and decremented when it stops being active
 *  @param abnormalTerminations
 *    the `http.server.abnormal_terminations` histogram of elapsed seconds for
 *    requests that terminated abnormally
 */
final case class MetricsCollection[F[_]](
requestDuration: Histogram[F, Double],
activeRequests: UpDownCounter[F, Long],
abnormalTerminations: Histogram[F, Double],
)

private object Phase {
case object Headers extends Phase
private sealed trait Phase

case object Body extends Phase
private object Phase {
case object Headers extends Phase

def report(s: Phase): String =
s match {
case Headers => "headers"
case Body => "body"
}
}

private sealed trait AbnormalTermination
case object Body extends Phase
}

private object AbnormalTermination {
case object Abnormal extends AbnormalTermination
private object TypedMetricAttributes {
private val Classifier: AttributeKey[String] = AttributeKey.string("classifier")

case object Error extends AbnormalTermination
def classifier(string: Option[String]): Attribute[String] =
Classifier(string.getOrElse(""))

case object Timeout extends AbnormalTermination
private val HttpPhase: AttributeKey[String] = AttributeKey.string("http.phase")

case object Canceled extends AbnormalTermination
def httpPhase(s: Phase): Attribute[String] =
HttpPhase(s match {
case Phase.Headers => "headers"
case Phase.Body => "body"
})

def report(t: AbnormalTermination): String =
t match {
case Abnormal => "abnormal"
case Timeout => "timeout"
case Error => "error"
case Canceled => "cancel"
}
def errorType(terminationType: TerminationType): Attribute[String] =
ErrorAttributes.ErrorType(terminationType match {
case TerminationType.Abnormal(e) => e.getClass.getName
case TerminationType.Error(e) => e.getClass.getName
case TerminationType.Canceled => "cancel"
case TerminationType.Timeout => "timeout"
})
}
}

0 comments on commit f9a8552

Please sign in to comment.