Skip to content

Commit

Permalink
feat(exporter): Add endpoint for RQ metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Tarun Pratap Singh committed Jan 3, 2025
1 parent 48f4dd3 commit a47c740
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 0 deletions.
112 changes: 112 additions & 0 deletions agent/exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily
from prometheus_client.registry import Collector
from rq import Queue, Worker
from rq.job import JobStatus

REDIS_PORT = 25025


def get_bench_metrics(bench):
from prometheus_client.exposition import generate_latest

return generate_latest(RQCollector(bench)) or "asa"


def get_workers_stats(port):
from redis import Redis

workers = Worker.all(connection=Redis(port=port))

return [
{
"name": w.name,
"queues": w.queue_names(),
"state": w.get_state(),
"successful_job_count": w.successful_job_count,
"failed_job_count": w.failed_job_count,
"total_working_time": w.total_working_time,
}
for w in workers
]


def get_jobs_by_queue(port):
from redis import Redis

return {
queue.name: {
JobStatus.QUEUED: queue.count,
JobStatus.STARTED: queue.started_job_registry.count,
JobStatus.FINISHED: queue.finished_job_registry.count,
JobStatus.FAILED: queue.failed_job_registry.count,
JobStatus.DEFERRED: queue.deferred_job_registry.count,
JobStatus.SCHEDULED: queue.scheduled_job_registry.count,
}
for queue in Queue.all(connection=Redis(port=port))
}


class RQCollector(Collector):
def __init__(self, bench):
self.bench = bench
super().__init__()

def collect(self):
rq_workers = GaugeMetricFamily(
"rq_workers",
"RQ workers",
labels=["bench", "name", "state", "queues"],
)
rq_workers_success = CounterMetricFamily(
"rq_workers_success",
"RQ workers success count",
labels=["bench", "name", "queues"],
)
rq_workers_failed = CounterMetricFamily(
"rq_workers_failed",
"RQ workers fail count",
labels=["bench", "name", "queues"],
)
rq_workers_working_time = CounterMetricFamily(
"rq_workers_working_time",
"RQ workers spent seconds",
labels=["bench", "name", "queues"],
)

rq_jobs = GaugeMetricFamily("rq_jobs", "RQ jobs by state", labels=["bench", "queue", "status"])

workers = get_workers_stats(REDIS_PORT)
for worker in workers:
label_queues = ",".join(worker["queues"])
rq_workers.add_metric(
[
self.bench,
worker["name"],
worker["state"],
label_queues,
],
1,
)
rq_workers_success.add_metric(
[self.bench, worker["name"], label_queues],
worker["successful_job_count"],
)
rq_workers_failed.add_metric(
[self.bench, worker["name"], label_queues],
worker["failed_job_count"],
)
rq_workers_working_time.add_metric(
[self.bench, worker["name"], label_queues],
worker["total_working_time"],
)

yield rq_workers
yield rq_workers_success
yield rq_workers_failed
yield rq_workers_working_time

for queue_name, jobs in get_jobs_by_queue(REDIS_PORT).items():
for status, count in jobs.items():
rq_jobs.add_metric([self.bench, queue_name, status], count)

yield rq_jobs
8 changes: 8 additions & 0 deletions agent/templates/agent/nginx.conf.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ server {
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Port $server_port;

location /agent/benches/metrics {
return 301 /metrics/rq;
}

proxy_pass http://127.0.0.1:{{ web_port }}/;
}

Expand Down Expand Up @@ -135,6 +139,10 @@ server {
proxy_pass http://127.0.0.1:9114/metrics;
}

location /metrics/rq {
proxy_pass http://127.0.0.1:{{ web_port }}/benches/metrics;
}

}

{% if registry %}
Expand Down
19 changes: 19 additions & 0 deletions agent/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ def wrapper(*args, **kwargs):

@application.before_request
def validate_access_token():
exempt_endpoints = ["get_benches_metrics"]
if request.endpoint in exempt_endpoints:
return None

try:
if application.debug:
return None
Expand Down Expand Up @@ -218,12 +222,27 @@ def get_benches():
return {name: bench.dump() for name, bench in Server().benches.items()}


@application.route("/benches/metrics")
def get_benches_metrics():
from agent.exporter import get_bench_metrics

benches_metrics = [get_bench_metrics(name) for name in Server().benches]
return Response(benches_metrics, mimetype="text/plain")


@application.route("/benches/<string:bench>")
@validate_bench
def get_bench(bench):
return Server().benches[bench].dump()


@application.route("/benches/<string:bench>/metrics", methods=["GET"])
def get_bench_metrics(bench):
from agent.exporter import get_bench_metrics

return Response(get_bench_metrics(bench), mimetype="text/plain")


@application.route("/benches/<string:bench>/info", methods=["POST", "GET"])
@validate_bench
def fetch_sites_info(bench):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Jinja2==2.10.3
MarkupSafe==1.1.1
passlib==1.7.2
peewee==3.13.1
prometheus_client==0.20.0
PyMySQL==0.9.3
python-crontab==2.5.1
python-dateutil==2.8.2
Expand Down

0 comments on commit a47c740

Please sign in to comment.