
Metrics


Latency

Latency is the time it takes to process an event, i.e. the time the event spends within the framework. We measure it by subtracting the Kafka timestamp of the input event from the Kafka timestamp of the corresponding output event.
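
The sketch below illustrates this computation with the plain Kafka consumer API. It is only a rough illustration: the topic names, the local bootstrap server and the assumption that an output record can be matched to its input record by message key are placeholders, not part of the benchmark itself.

```scala
import java.time.Duration
import java.util.Properties
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object LatencySketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
    props.put("group.id", "latency-measurement")
    props.put("auto.offset.reset", "earliest")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    // "input-topic" and "output-topic" are placeholder topic names.
    consumer.subscribe(java.util.Arrays.asList("input-topic", "output-topic"))

    // Kafka timestamp of each input event, keyed by message key.
    val inputTimestamps = mutable.Map.empty[String, Long]

    while (true) {
      val records = consumer.poll(Duration.ofSeconds(1))
      for (record <- records.asScala) {
        record.topic() match {
          case "input-topic" =>
            inputTimestamps(record.key()) = record.timestamp()
          case "output-topic" =>
            // Latency = output Kafka timestamp - input Kafka timestamp.
            inputTimestamps.get(record.key()).foreach { inputTs =>
              println(s"key=${record.key()} latency=${record.timestamp() - inputTs} ms")
            }
        }
      }
    }
  }
}
```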

Throughput

Throughput is the number of messages processed by the framework per second. In this benchmark, we count the number of output messages that the framework publishes to Kafka per second.
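
A minimal way to derive this number from the output topic is to bucket records by their Kafka timestamp truncated to the second and count per bucket. The sketch below assumes the records come from a consumer polling the output topic, as in the latency sketch above.

```scala
import scala.collection.mutable
import org.apache.kafka.clients.consumer.ConsumerRecord

object ThroughputSketch {
  // Count output records per second of publication, based on their Kafka timestamps.
  def throughputPerSecond(records: Iterable[ConsumerRecord[String, String]]): Map[Long, Int] = {
    val counts = mutable.Map.empty[Long, Int].withDefaultValue(0)
    for (record <- records) {
      val secondBucket = record.timestamp() / 1000 // epoch second the record was published
      counts(secondBucket) += 1
    }
    counts.toMap
  }
}
```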

CPU Utilization

We use cAdvisor for scraping CPU metrics of the containers. First of all, we scrape the total CPU usage. Additionally, we look at the part of this that was spent in user space or system space. To get an idea of the wait times, we compute CPU IO wait by subtracting the user and system usage from the total usage. Finally, we also keep track of the system load average via JMX. The load average counts the processes that are using or waiting for a CPU, or that are blocked, as further explained in this article. If the load average is higher than the processor count, not all processes can run immediately because there are not enough CPU cores.
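
The load-average check is available directly through the standard JMX OperatingSystemMXBean; the sketch below shows it together with the IO-wait subtraction described above. The cAdvisor counter values here are placeholders, since in the benchmark they come from the scraped container metrics rather than from the JVM, and the metric names in the comments are assumptions.

```scala
import java.lang.management.ManagementFactory

object CpuSketch {
  def main(args: Array[String]): Unit = {
    val os = ManagementFactory.getOperatingSystemMXBean
    val loadAverage = os.getSystemLoadAverage // 1-minute load average, -1.0 if unavailable
    val cores = Runtime.getRuntime.availableProcessors

    // cAdvisor reports cumulative CPU time; IO wait is derived as total - user - system.
    // The values below are placeholders standing in for the scraped container counters.
    val totalCpuSeconds  = 120.0 // e.g. container_cpu_usage_seconds_total (assumed name)
    val userCpuSeconds   = 90.0  // e.g. container_cpu_user_seconds_total (assumed name)
    val systemCpuSeconds = 20.0  // e.g. container_cpu_system_seconds_total (assumed name)
    val ioWaitSeconds = totalCpuSeconds - userCpuSeconds - systemCpuSeconds

    println(s"load average = $loadAverage over $cores cores")
    if (loadAverage > cores)
      println("more runnable/blocked processes than cores: CPU contention likely")
    println(s"CPU time not spent in user or system space: $ioWaitSeconds s")
  }
}
```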

Memory

We use cAdvisor, JMX and a JMX exporter to scrape memory metrics of the running JVMs. We collect the following metrics:

  • Total memory usage of container
  • Heap: the most important metric to look at. Frameworks use the heap to store the data being processed and, for Spark, Structured Streaming and Flink with the filesystem state backend, also their state.
  • Non heap
  • RSS (resident set size): the amount of physical memory held by the process.
  • Swap: this should stay at zero, since swap is only used when data no longer fits in memory and has to be swapped to disk, as explained in this article.
  • Cache: used by all frameworks to persist data in memory. RocksDB also relies on it when it is used as the state backend: RocksDB uses the page cache to avoid reading values from the filesystem, as explained in this article.
  • Mapped files
  • Memory pools: G1 Eden, G1 Survivor and G1 Old Generation space. These are the different generations of the heap memory as used by G1GC.
  • Page faults and major page faults
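
The heap, non-heap and G1 memory pool figures above can be read from a JVM through the standard java.lang.management beans, which is roughly what the JMX exporter scrapes. A minimal local sketch:

```scala
import java.lang.management.ManagementFactory
import scala.jdk.CollectionConverters._

object MemoryPoolsSketch {
  def main(args: Array[String]): Unit = {
    val memory = ManagementFactory.getMemoryMXBean
    println(s"heap used:     ${memory.getHeapMemoryUsage.getUsed} bytes")
    println(s"non-heap used: ${memory.getNonHeapMemoryUsage.getUsed} bytes")

    // With G1GC the pools are reported as "G1 Eden Space", "G1 Survivor Space"
    // and "G1 Old Gen".
    for (pool <- ManagementFactory.getMemoryPoolMXBeans.asScala) {
      val usage = pool.getUsage
      println(s"${pool.getName}: used=${usage.getUsed} max=${usage.getMax}")
    }
  }
}
```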

Garbage collection

We use JMX to export metrics from the master and slave JVMs of the clusters. These metrics are scraped by the JMX exporter, which publishes them to Kafka. To monitor garbage collection (GC) activity, we collect the following metrics for the G1 Young Generation and G1 Old Generation collectors:

  • Total collection time: approximate accumulated collection elapsed time
  • Total collection count: total number of collections that have occurred
  • Memory before and after collection for each memory pool (Eden, Survivor and Old Gen)
  • Duration of the last collection

We use these metrics to diagnose memory pressure.
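
Collection count and accumulated collection time are available through the standard GarbageCollectorMXBeans; the duration and the before/after memory usage of the last collection come from the HotSpot-specific com.sun.management extension. A minimal sketch of reading them locally, presumably the same beans the JMX exporter scrapes remotely:

```scala
import java.lang.management.ManagementFactory
import scala.jdk.CollectionConverters._

object GcMetricsSketch {
  def main(args: Array[String]): Unit = {
    // With G1GC the beans are named "G1 Young Generation" and "G1 Old Generation".
    for (gc <- ManagementFactory.getGarbageCollectorMXBeans.asScala) {
      println(s"${gc.getName}:")
      println(s"  total collection count: ${gc.getCollectionCount}")
      println(s"  total collection time:  ${gc.getCollectionTime} ms")

      // HotSpot-specific extension exposing the last collection in detail.
      gc match {
        case hotspot: com.sun.management.GarbageCollectorMXBean if hotspot.getLastGcInfo != null =>
          val info = hotspot.getLastGcInfo
          println(s"  last collection duration: ${info.getDuration} ms")
          for ((pool, usage) <- info.getMemoryUsageBeforeGc.asScala)
            println(s"  $pool before: ${usage.getUsed} bytes")
          for ((pool, usage) <- info.getMemoryUsageAfterGc.asScala)
            println(s"  $pool after:  ${usage.getUsed} bytes")
        case _ => // not running on HotSpot, or no collection has happened yet
      }
    }
  }
}
```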

Network

We monitor the load on the network of both the Docker containers and the EC2 instances. For the EC2 instances, we export the network load metrics from CloudWatch. For each of the Docker containers, we collect the following metrics with cAdvisor:

  • Bytes transmitted
  • Bytes received
  • Received packets dropped (RxDropped): dropped received packets can be an indication of network congestion
  • Transmitted packets dropped (TxDropped): dropped transmitted packets can be an indication of network congestion
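
Since cAdvisor reports these values as cumulative counters, per-second rates have to be derived from consecutive samples. The sketch below is illustrative only; the field names are placeholders, not cAdvisor's exact schema.

```scala
object NetworkRateSketch {
  // One scraped sample of the cumulative network counters of a container.
  final case class NetworkSample(timestampMs: Long, rxBytes: Long, txBytes: Long,
                                 rxDropped: Long, txDropped: Long)

  // Derive byte rates and dropped-packet counts for the interval between two samples.
  def networkRates(previous: NetworkSample, current: NetworkSample): Unit = {
    val elapsedSeconds = (current.timestampMs - previous.timestampMs) / 1000.0
    val rxBytesPerSecond = (current.rxBytes - previous.rxBytes) / elapsedSeconds
    val txBytesPerSecond = (current.txBytes - previous.txBytes) / elapsedSeconds
    val droppedInInterval = (current.rxDropped - previous.rxDropped) +
                            (current.txDropped - previous.txDropped)
    println(f"rx $rxBytesPerSecond%.0f B/s, tx $txBytesPerSecond%.0f B/s, dropped $droppedInInterval")
  }
}
```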

Disk and filesystem

Disk and filesystem are mostly used when RocksDB is the state backend. Of the frameworks included in this benchmark, Kafka Streams uses RocksDB by default and Flink can use it when configured to do so.

For the other frameworks, disk or filesystem would only be used when there is memory pressure and files are swapped out to disk.

We monitor the following metrics for disk usage:

  • Disk IO bytes read
  • Disk IO bytes written

We monitor the following metrics on filesystem usage:

  • Usage: the number of bytes consumed by the container on the filesystem
  • IO time: number of milliseconds spent doing I/Os
  • Weighted IO time: weighted number of milliseconds spent doing I/Os. As stated in the documentation of cAdvisor, this can provide an easy measure of both I/O completion time and the backlog that may be accumulating.
  • Write time
  • Read time
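
The IO time counters are also cumulative milliseconds, so the fraction of a scrape interval spent doing I/O can be approximated as the IO time delta divided by the elapsed wall-clock time. A rough sketch with placeholder field names:

```scala
object FilesystemSketch {
  // One scraped sample of the cumulative filesystem IO counters of a container.
  final case class FsSample(timestampMs: Long, ioTimeMs: Long, weightedIoTimeMs: Long)

  // Approximate device utilization over the interval between two samples.
  def ioUtilization(previous: FsSample, current: FsSample): Double = {
    val elapsedMs = (current.timestampMs - previous.timestampMs).toDouble
    // ~1.0 means I/O was in progress for the whole interval.
    (current.ioTimeMs - previous.ioTimeMs) / elapsedMs
  }
}
```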

Kafka

We monitor the following metrics to get an idea of the load on the Kafka broker:

  • Total CPU usage
  • Disk IO per second: reads and writes
  • Memory: usage, cache, swap, RSS, working set, mapped
  • Network: bytes transmitted and received, packets dropped (transmitted and received)

HDFS

We monitor the following metrics to get an idea of the load on the HDFS data nodes:

  • Total CPU usage
  • Disk IO per second: reads and writes
  • Memory: usage, cache, swap, RSS, working set, mapped

References

Tooling

Monitoring

CPU

Memory