perf: fix for cluster aggregation performance #631

Open
wants to merge 4 commits into
base: master
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,11 @@ project adheres to [Semantic Versioning](http://semver.org/).

### Changed

- Changes for cluster mode
- Removed `byLabels` Grouper in `metricAggregators.js` file and created a global Map to avoid Map creation on every request for the metrics
- Moved hashing of labels from master to worker to distribute the cpu bound hashing among workers
- Workers to write metrics in tmp file and send the file name to master to read metrics from rather than sending on IPC to keep IPC congestion free. (change in `cluster.js`)

### Added

[unreleased]: https://github.com/siimon/prom-client/compare/v15.1.2...HEAD
70 changes: 52 additions & 18 deletions lib/cluster.js
@@ -9,8 +9,11 @@
*/

const Registry = require('./registry');
const { Grouper } = require('./util');
const { Grouper, hashObject } = require('./util');
const { aggregators } = require('./metricAggregators');
const fs = require('fs');
const path = require('path');
const os = require('os');
// We need to lazy-load the 'cluster' module as some application servers -
// namely Passenger - crash when it is imported.
let cluster = () => {
@@ -175,19 +178,28 @@ function addListeners() {
request.done(new Error(message.error));
return;
}

message.metrics.forEach(registry => request.responses.push(registry));
request.pending--;

if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);

const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}
fs.readFile(message.filename, 'utf8', (err, data) => {
@BourgoisMickael BourgoisMickael Dec 13, 2024

About IPC

Could this be a configurable option, with the current behavior kept as the default? Users could then choose between file and IPC communication, and there would be no breaking change.

Instead of the worker sending a filename, could each worker keep a permanent file (named with its pid) and use IPC only to notify the master to read it? You could use the first line of the file to reconcile the requestId, or use fs.watchFile.

Instead of a file, could a Unix socket or named pipe (where available) give better performance?

About hashing

The master process should only initiate and restart workers and aggregate metrics, so it should not need much CPU compared to the workers that handle the workload. This change puts a little more load on all the workers at a constant interval.

I guess performance could then suffer when you have many workers and a short scrape interval.

So, as with the transport, could this be a prom-client configuration option, with hashing done either in the master or distributed across the workers, keeping the current behavior by default so that no breaking change is introduced?
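The opt-in behavior requested above could look roughly like the following sketch. The option names `transport` and `hashInWorker`, and the helpers `resolveClusterOptions` and `buildResponse`, are hypothetical and are not part of prom-client; the defaults are chosen so that existing users would keep IPC and master-side hashing.

```javascript
'use strict';

// Hypothetical transport names; prom-client does not define these.
const TRANSPORTS = Object.freeze({ IPC: 'ipc', FILE: 'file' });

// Merges user options over defaults that preserve the current behavior.
function resolveClusterOptions(userOptions = {}) {
  const options = {
    transport: TRANSPORTS.IPC, // default: metrics travel over IPC
    hashInWorker: false, // default: label hashing stays in the master
    ...userOptions,
  };
  if (!Object.values(TRANSPORTS).includes(options.transport)) {
    throw new TypeError(`Unknown transport: ${options.transport}`);
  }
  return options;
}

// Builds the worker's reply for either transport. `writeFile` is an
// injected function (an assumption of this sketch) that persists the
// metrics and returns the file name.
function buildResponse(options, requestId, metrics, writeFile) {
  if (options.transport === TRANSPORTS.FILE) {
    return { requestId, filename: writeFile(metrics) };
  }
  return { requestId, metrics };
}
```

With this shape, the master can tell the two reply kinds apart by checking for a `filename` field, so both transports could coexist behind one message type.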

if (err) {
request.done(err);
return;
} else {
const metrics = JSON.parse(data);
metrics.forEach(registry => request.responses.push(registry));
fs.unlink(message.filename, e => {
if (e)
console.error(`Error deleting file ${message.filename}:`, e);
});
request.pending--;
if (request.pending === 0) {
// finalize
requests.delete(message.requestId);
clearTimeout(request.errorTimeout);
const registry = AggregatorRegistry.aggregate(request.responses);
const promString = registry.metrics();
request.done(null, promString);
}
}
});
}
});
}
@@ -198,10 +210,32 @@ function addListeners() {
if (message.type === GET_METRICS_REQ) {
Promise.all(registries.map(r => r.getMetricsAsJSON()))
.then(metrics => {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
metrics,
metrics.forEach(registry => {
registry.forEach(value => {
const hash = hashObject(value);
const key = `${value.metricName}_${hash}`;
value.hash = key;
});
});
// adding request id in file path to handle concurrency
const filename = path.join(
os.tmpdir(),
`metrics-${process.pid}-${message.requestId}.json`,
);
fs.writeFile(filename, JSON.stringify(metrics), err => {
Comment on lines +220 to +225
Collaborator

I'm really hesitant to involve files, since that suddenly involves the page cache, file system, more syscalls and possibly other surprises like filesystem permissions, running out of disk space and cleaning up. How much of an improvement did this yield? Did you benchmark before or after Node.js v18.6.0, which has this optimization? How much disk I/O load did your system have when benchmarking? I think IPC is "supposed" to be faster than involving the filesystem.
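If a file handoff were kept, the cleanup and permission concerns raised here could be narrowed with something like the sketch below. It is only an illustration under stated assumptions: `writeMetricsFile` and `readMetricsFile` are hypothetical helpers, not code from this PR, and synchronous `fs` calls are used purely to keep the example short. The file lives in a private directory created with `fs.mkdtempSync`, and the `finally` block removes it even when reading or parsing fails.

```javascript
'use strict';

const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: writes one worker's metrics into a fresh,
// uniquely named temp directory and returns the file path.
function writeMetricsFile(metrics, requestId) {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'metrics-'));
  const filename = path.join(dir, `${process.pid}-${requestId}.json`);
  // Owner-only permissions where the platform supports them.
  fs.writeFileSync(filename, JSON.stringify(metrics), { mode: 0o600 });
  return filename;
}

// Hypothetical helper: reads and parses the file, then removes its
// directory whether or not reading and parsing succeeded.
function readMetricsFile(filename) {
  try {
    return JSON.parse(fs.readFileSync(filename, 'utf8'));
  } finally {
    fs.rmSync(path.dirname(filename), { recursive: true, force: true });
  }
}
```

This does not address the page-cache, syscall, or disk-space questions; it only shows one way to avoid leaving stale files behind.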

Author

@ssg2526 ssg2526 Jun 3, 2024

The benchmarks were performed on our own application; screenshots of the results are shared in the linked issue. They were run on a 16-core machine with approximately 250-300 metrics (total size ~500 KB). At P99 and above, we are getting a ~10x improvement in application performance. The only disk I/O we have is logging, and that does not go to the /tmp-mounted drive where we write our metrics.
The major choke points were IPC and the work of hashing the objects and building the map. The detailed breakdown, measured on my local 8-core machine, is in the screenshot below, and the code used is in the zip file below. The zip file includes the node modules as well, because I added logging inside prom-client to time each part. In the screenshot, the worst-case IPC time is always higher; I ran multiple iterations and got the same or similar results. Of the total aggregation time of 72 ms in this screenshot, 68 ms is spent hashing and building the map. I haven't tested with Node v18.6.0 yet; I will test the same code with it and share the results in this thread.
cluster_test.zip
Screenshot 2024-06-03 at 2 27 48 PM
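A per-phase breakdown like the one described above (hashing versus building the map) could be collected with a sketch along these lines. This is not the instrumentation from the zip file: `hashLabels` is a hypothetical stand-in for label hashing, `groupWithTimings` is an illustrative name, and the per-value timer calls add overhead of their own, so the numbers are only indicative.

```javascript
'use strict';

const { performance } = require('perf_hooks');

// Hypothetical stand-in for label hashing: sorted "key:value" pairs,
// so that label order does not change the key.
function hashLabels(labels) {
  return Object.keys(labels)
    .sort()
    .map(key => `${key}:${labels[key]}`)
    .join(',');
}

// Groups values from every worker response by metric name plus label
// hash, accumulating the time spent in each of the two phases.
function groupWithTimings(responses) {
  const timings = { hashMs: 0, groupMs: 0 };
  const groups = new Map();
  for (const metrics of responses) {
    for (const metric of metrics) {
      for (const value of metric.values) {
        let start = performance.now();
        const key = `${metric.name}_${hashLabels(value.labels)}`;
        timings.hashMs += performance.now() - start;

        start = performance.now();
        const bucket = groups.get(key);
        if (bucket) bucket.push(value);
        else groups.set(key, [value]);
        timings.groupMs += performance.now() - start;
      }
    }
  }
  return { groups, timings };
}
```

Running this over the parsed worker responses and logging `timings` after each scrape would show how the aggregation time splits between the two phases.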

Author

In these benchmarks (see the linked issue), the scrape interval was 5 seconds and the application throughput was about 1100-1200 RPS.

Author

@ssg2526 ssg2526 Jun 6, 2024

Hi @zbjornson

  • I also tried the code in the zip file with Node v18.6.0. I get similar results, with no improvement in the logged timings.
  • I think we shouldn't use IPC for this communication: once the metrics payload grows beyond a certain size, it starts blocking request routing at high throughput.
  • We can ask users to grant permissions on the /tmp folder. From a prom-client performance point of view, I didn't see much of a difference with files, but then I am only scraping once every 5 seconds.
  • If not files, then we should consider other ways of communicating to solve this completely, such as workers calling the master over TCP on a master port.

Author

@ssg2526 ssg2526 Sep 3, 2024

Hi @zbjornson @SimenB, we went live in production with these changes about 3 months ago and everything is working fine for us. We are not seeing any additional latency after this change. This service handles more than 100,000 RPS at peak. If required, I can share the production results as well.
I understand that introducing the file system can pose different challenges for the library, but we could explore HTTP or some other non-IPC mode of communication for larger metrics payloads, since the current solution causes very high tail latencies (>P99) at high throughput and large metrics sizes.
Requesting your input on how we can take this forward, as we don't want to diverge from the library by maintaining our own custom version.

if (err) {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
error: err.message,
});
} else {
process.send({
type: GET_METRICS_RES,
requestId: message.requestId,
filename,
});
}
});
})
.catch(error => {
16 changes: 12 additions & 4 deletions lib/metricAggregators.js
@@ -1,6 +1,6 @@
'use strict';

const { Grouper, hashObject } = require('./util');
const metricMap = new Map();

/**
* Returns a new function that applies the `aggregatorFn` to the values.
@@ -18,11 +18,18 @@ function AggregatorFactory(aggregatorFn) {
aggregator: metrics[0].aggregator,
};
// Gather metrics by metricName and labels.
const byLabels = new Grouper();
if (!metricMap.get(metrics[0].name)) {
metricMap.set(metrics[0].name, new Map());
}
const byLabels = metricMap.get(metrics[0].name);
metrics.forEach(metric => {
metric.values.forEach(value => {
const key = hashObject(value.labels);
byLabels.add(`${value.metricName}_${key}`, value);
const valuesArray = byLabels.get(value.hash);
if (!valuesArray) {
byLabels.set(value.hash, [value]);
} else {
valuesArray.push(value);
}
});
});
// Apply aggregator function to gathered metrics.
@@ -37,6 +44,7 @@ function AggregatorFactory(aggregatorFn) {
}
// NB: Timestamps are omitted.
result.values.push(valObj);
values.length = 0;
});
return result;
};
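The approach in this hunk, a module-level Map reused across aggregations and keyed by the hash each worker precomputed, can be illustrated in isolation with the sketch below. It is a simplified stand-in for `AggregatorFactory`, not the PR's code: `sumByHash` is a hypothetical name, only summing is shown, and the empty-bucket guard is an assumption of the sketch that covers label sets seen in an earlier call but absent from the current one.

```javascript
'use strict';

// Module-level cache: metric name -> (precomputed hash -> values[]).
// It outlives individual calls, so the inner Maps are not re-created.
const metricMap = new Map();

// Sums values that share the same precomputed `hash` across workers.
function sumByHash(metrics) {
  const name = metrics[0].name;
  let byHash = metricMap.get(name);
  if (!byHash) {
    byHash = new Map();
    metricMap.set(name, byHash);
  }
  for (const metric of metrics) {
    for (const value of metric.values) {
      const bucket = byHash.get(value.hash);
      if (bucket) bucket.push(value);
      else byHash.set(value.hash, [value]);
    }
  }
  const result = [];
  byHash.forEach(values => {
    if (values.length === 0) return; // key left over from an earlier call
    result.push({
      labels: values[0].labels,
      value: values.reduce((sum, v) => sum + v.value, 0),
    });
    values.length = 0; // empty the bucket but keep its key for reuse
  });
  return result;
}
```

Because keys are never deleted, the Map keeps growing with the number of distinct label sets ever seen; whether that trade-off is acceptable depends on label cardinality.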
14 changes: 7 additions & 7 deletions test/aggregatorsTest.js
@@ -8,17 +8,17 @@ describe('aggregators', () => {
name: 'metric_name',
type: 'does not matter',
values: [
{ labels: [], value: 1 },
{ labels: ['label1'], value: 2 },
{ labels: [], value: 1, hash: 'h1' },
{ labels: ['label1'], value: 2, hash: 'h2' },
],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [
{ labels: [], value: 3 },
{ labels: ['label1'], value: 4 },
{ labels: [], value: 3, hash: 'h1' },
{ labels: ['label1'], value: 4, hash: 'h2' },
],
},
];
@@ -102,19 +102,19 @@ describe('aggregators', () => {
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 1, metricName: 'abc' }],
values: [{ labels: [], value: 1, metricName: 'abc', hash: 'h1' }],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 3, metricName: 'abc' }],
values: [{ labels: [], value: 3, metricName: 'abc', hash: 'h1' }],
},
{
help: 'metric_help',
name: 'metric_name',
type: 'does not matter',
values: [{ labels: [], value: 5, metricName: 'def' }],
values: [{ labels: [], value: 5, metricName: 'def', hash: 'h2' }],
},
];
const result = aggregators.sum(metrics2);
59 changes: 49 additions & 10 deletions test/clusterTest.js
@@ -3,6 +3,7 @@
const cluster = require('cluster');
const process = require('process');
const Registry = require('../lib/cluster');
const { hash } = require('crypto');

describe.each([
['Prometheus', Registry.PROMETHEUS_CONTENT_TYPE],
@@ -61,11 +62,13 @@ describe.each([
labels: { le: 0.1, code: '300' },
value: 0,
metricName: 'test_histogram_bucket',
hash: 'test_histogram_bucket{le="0.1",code="300"}',
},
{
labels: { le: 10, code: '300' },
value: 1.6486727018068046,
metricName: 'test_histogram_bucket',
hash: 'test_histogram_bucket{le="10",code="300"}',
},
],
aggregator: 'sum',
@@ -75,24 +78,40 @@ describe.each([
name: 'test_gauge',
type: 'gauge',
values: [
{ value: 0.47, labels: { method: 'get', code: 200 } },
{ value: 0.64, labels: {} },
{ value: 23, labels: { method: 'post', code: '300' } },
{
value: 0.47,
labels: { method: 'get', code: 200 },
hash: 'test_gauge{method="get",code="200"}',
},
{ value: 0.64, labels: {}, hash: 'test_gauge{}' },
{
value: 23,
labels: { method: 'post', code: '300' },
hash: 'test_gauge{method="post",code="300"}',
},
],
aggregator: 'sum',
},
{
help: 'Start time of the process since unix epoch in seconds.',
name: 'process_start_time_seconds',
type: 'gauge',
values: [{ value: 1502075832, labels: {} }],
values: [
{
value: 1502075832,
labels: {},
hash: 'process_start_time_seconds{}',
},
],
aggregator: 'omit',
},
{
help: 'Lag of event loop in seconds.',
name: 'nodejs_eventloop_lag_seconds',
type: 'gauge',
values: [{ value: 0.009, labels: {} }],
values: [
{ value: 0.009, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' },
],
aggregator: 'average',
},
{
@@ -103,6 +122,7 @@ describe.each([
{
value: 1,
labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 },
hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}',
},
],
aggregator: 'first',
@@ -118,11 +138,13 @@ describe.each([
labels: { le: 0.1, code: '300' },
value: 0.235151,
metricName: 'test_histogram_bucket',
hash: 'test_histogram_bucket{le="0.1",code="300"}',
},
{
labels: { le: 10, code: '300' },
value: 1.192591,
metricName: 'test_histogram_bucket',
hash: 'test_histogram_bucket{le="10",code="300"}',
},
],
aggregator: 'sum',
@@ -132,24 +154,40 @@ describe.each([
name: 'test_gauge',
type: 'gauge',
values: [
{ value: 0.02, labels: { method: 'get', code: 200 } },
{ value: 0.24, labels: {} },
{ value: 51, labels: { method: 'post', code: '300' } },
{
value: 0.02,
labels: { method: 'get', code: 200 },
hash: 'test_gauge{method="get",code="200"}',
},
{ value: 0.24, labels: {}, hash: 'test_gauge{}' },
{
value: 51,
labels: { method: 'post', code: '300' },
hash: 'test_gauge{method="post",code="300"}',
},
],
aggregator: 'sum',
},
{
help: 'Start time of the process since unix epoch in seconds.',
name: 'process_start_time_seconds',
type: 'gauge',
values: [{ value: 1502075849, labels: {} }],
values: [
{
value: 1502075849,
labels: {},
hash: 'process_start_time_seconds{}',
},
],
aggregator: 'omit',
},
{
help: 'Lag of event loop in seconds.',
name: 'nodejs_eventloop_lag_seconds',
type: 'gauge',
values: [{ value: 0.008, labels: {} }],
values: [
{ value: 0.008, labels: {}, hash: 'nodejs_eventloop_lag_seconds{}' },
],
aggregator: 'average',
},
{
@@ -160,6 +198,7 @@ describe.each([
{
value: 1,
labels: { version: 'v6.11.1', major: 6, minor: 11, patch: 1 },
hash: 'nodejs_version_info{version="v6.11.1",major="6",minor="11",patch="1"}',
},
],
aggregator: 'first',