Skip to content

export RQ statistics as prometheus metrics #666

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ jobs:

- name: Run Test
run: |
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. -v2

- name: Install optional dependencies
run: |
pip install prometheus_client

- name: Run Test with optional dependencies
run: |
`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. -v2

mypy:
runs-on: ubuntu-latest
Expand Down
27 changes: 27 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,33 @@ Additionally, these statistics are also accessible from the command line.

.. image:: demo-django-rq-cli-dashboard.gif

Configuring Prometheus
----------------------

``django_rq`` also provides a Prometheus compatible view, which can be enabled
by installing ``prometheus_client`` or by installing the extra "prometheus"
(``pip install django-rq[prometheus]``). The metrics are exposed at
``/django-rq/metrics/`` and the following is an example of the metrics that
are exported::

# HELP rq_workers RQ workers
# TYPE rq_workers gauge
# HELP rq_job_successful_total RQ successful job count
# TYPE rq_job_successful_total counter
# HELP rq_job_failed_total RQ failed job count
# TYPE rq_job_failed_total counter
# HELP rq_working_seconds_total RQ total working time
# TYPE rq_working_seconds_total counter
# HELP rq_jobs RQ jobs by status
# TYPE rq_jobs gauge
rq_jobs{queue="default",status="queued"} 0.0
rq_jobs{queue="default",status="started"} 0.0
rq_jobs{queue="default",status="finished"} 0.0
rq_jobs{queue="default",status="failed"} 0.0
rq_jobs{queue="default",status="deferred"} 0.0
rq_jobs{queue="default",status="scheduled"} 0.0


Configuring Sentry
-------------------
Sentry
Expand Down
4 changes: 2 additions & 2 deletions django_rq/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.http.request import HttpRequest
from django.http.response import HttpResponse

from . import views, settings, models
from . import settings, stats_views, models


class QueueAdmin(admin.ModelAdmin):
Expand Down Expand Up @@ -32,7 +32,7 @@ def has_module_permission(self, request: HttpRequest):
def changelist_view(self, request: HttpRequest, extra_context: Optional[Dict[str, Any]] = None) -> HttpResponse:
"""The 'change list' admin view for this model."""
# proxy request to stats view
return views.stats(request)
return stats_views.stats(request)


if settings.SHOW_ADMIN_LINK:
Expand Down
59 changes: 59 additions & 0 deletions django_rq/contrib/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from rq.job import JobStatus

from ..queues import filter_connection_params, get_connection, get_queue, get_unique_connection_configs
from ..workers import get_worker_class

try:
    from prometheus_client import Summary
    from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

    class RQCollector:
        """Prometheus custom collector exporting RQ worker and queue statistics."""

        # Measures how long each scrape of the RQ data takes.
        summary = Summary('rq_request_processing_seconds_total', 'Time spent collecting RQ data')

        def collect(self):
            # Imported lazily so the queue configuration is read at scrape time.
            from ..settings import QUEUES

            with self.summary.time():
                workers_family = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues'])
                successful_family = CounterMetricFamily('rq_job_successful_total', 'RQ successful job count', labels=['name', 'queues'])
                failed_family = CounterMetricFamily('rq_job_failed_total', 'RQ failed job count', labels=['name', 'queues'])
                working_time_family = CounterMetricFamily('rq_working_seconds_total', 'RQ total working time', labels=['name', 'queues'])
                jobs_family = GaugeMetricFamily('rq_jobs', 'RQ jobs by status', labels=['queue', 'status'])

                worker_class = get_worker_class()
                unique_configs = get_unique_connection_configs()
                seen_connections = {}
                for queue_name, config in QUEUES.items():
                    index = unique_configs.index(filter_connection_params(config))
                    if index in seen_connections:
                        connection = seen_connections[index]
                    else:
                        # First queue on this connection: cache it and emit the
                        # worker metrics exactly once per unique connection.
                        connection = seen_connections[index] = get_connection(queue_name)
                        for worker in worker_class.all(connection):
                            queues_label = ','.join(worker.queue_names())
                            workers_family.add_metric([worker.name, worker.get_state(), queues_label], 1)
                            successful_family.add_metric([worker.name, queues_label], worker.successful_job_count)
                            failed_family.add_metric([worker.name, queues_label], worker.failed_job_count)
                            working_time_family.add_metric([worker.name, queues_label], worker.total_working_time)

                    # Per-queue job counts, one gauge sample per job status.
                    queue = get_queue(queue_name, connection=connection)
                    jobs_family.add_metric([queue_name, JobStatus.QUEUED], queue.count)
                    jobs_family.add_metric([queue_name, JobStatus.STARTED], queue.started_job_registry.count)
                    jobs_family.add_metric([queue_name, JobStatus.FINISHED], queue.finished_job_registry.count)
                    jobs_family.add_metric([queue_name, JobStatus.FAILED], queue.failed_job_registry.count)
                    jobs_family.add_metric([queue_name, JobStatus.DEFERRED], queue.deferred_job_registry.count)
                    jobs_family.add_metric([queue_name, JobStatus.SCHEDULED], queue.scheduled_job_registry.count)

                yield workers_family
                yield successful_family
                yield failed_family
                yield working_time_family
                yield jobs_family

except ImportError:
    RQCollector = None  # type: ignore[assignment, misc]
56 changes: 56 additions & 0 deletions django_rq/stats_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from django.contrib import admin
from django.contrib.admin.views.decorators import staff_member_required
from django.http import Http404, HttpResponse, JsonResponse
from django.shortcuts import render
from django.views.decorators.cache import never_cache

from .settings import API_TOKEN
from .utils import get_scheduler_statistics, get_statistics

try:
import prometheus_client

from .contrib.prometheus import RQCollector
except ImportError:
prometheus_client = RQCollector = None # type: ignore[assignment, misc]

registry = None


@never_cache
@staff_member_required
def prometheus_metrics(request):
    """Serve RQ statistics in the Prometheus exposition format.

    The module-level ``CollectorRegistry`` is created lazily on first
    access and reused for subsequent requests.  The optional ``name[]``
    query parameters restrict the response to the named metric families.

    Raises:
        Http404: when ``prometheus_client`` is not installed.
    """
    global registry

    if not RQCollector:  # type: ignore[truthy-function]
        raise Http404('prometheus_client has not been installed; install using extra "django-rq[prometheus]"')

    if not registry:
        registry = prometheus_client.CollectorRegistry(auto_describe=True)
        registry.register(RQCollector())

    encoder, content_type = prometheus_client.exposition.choose_encoder(request.META.get('HTTP_ACCEPT', ''))

    # BUG FIX: keep the filtered view in a local variable.  The previous
    # code rebound the *global* ``registry`` to the restricted registry,
    # so after one request with ``name[]`` every later request (with or
    # without filters) was served the restricted subset forever.
    active_registry = registry
    if 'name[]' in request.GET:
        active_registry = registry.restricted_registry(request.GET.getlist('name[]'))

    return HttpResponse(encoder(active_registry), headers={'Content-Type': content_type})


@never_cache
@staff_member_required
def stats(request):
    """Render the RQ dashboard with queue, worker and scheduler statistics."""
    context_data = dict(admin.site.each_context(request))
    context_data.update(get_statistics(run_maintenance_tasks=True))
    context_data.update(get_scheduler_statistics())
    return render(request, 'django_rq/stats.html', context_data)


def stats_json(request, token=None):
    """Return RQ statistics as JSON.

    Access is granted to staff users, or to anyone presenting a *token*
    that matches the configured ``API_TOKEN``.
    """
    authorized = request.user.is_staff or (token and token == API_TOKEN)
    if not authorized:
        return JsonResponse(
            {"error": True, "description": "Please configure API_TOKEN in settings.py before accessing this view."}
        )
    return JsonResponse(get_statistics())
135 changes: 135 additions & 0 deletions django_rq/tests/test_prometheus_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import os
from unittest import skipIf
from unittest.mock import patch

from django.contrib.auth.models import User
from django.test import TestCase, override_settings
from django.test.client import Client
from django.urls import NoReverseMatch, reverse

from django_rq import get_queue
from django_rq.workers import get_worker

from .fixtures import access_self, failing_job

try:
import prometheus_client
except ImportError:
prometheus_client = None

RQ_QUEUES = {
'default': {
'HOST': os.environ.get('REDIS_HOST', 'localhost'),
'PORT': 6379,
'DB': 0,
},
}


@skipIf(prometheus_client is None, 'prometheus_client is required')
@override_settings(RQ={'AUTOCOMMIT': True})
class PrometheusTest(TestCase):
    """End-to-end tests for the Prometheus ``rq_metrics`` view."""

    def setUp(self):
        # The view is @staff_member_required, so a logged-in staff user
        # is needed to get a 200 response.
        self.user = User.objects.create_user('foo', password='pass')
        self.user.is_staff = True
        self.user.is_active = True
        self.user.save()
        self.client = Client()
        self.client.force_login(self.user)
        # Start each test from an empty Redis so job counts are exact.
        get_queue('default').connection.flushall()

    def assertMetricsContain(self, lines):
        """Assert that every line in *lines* appears in the exported metrics."""
        response = self.client.get(reverse('rq_metrics'))
        self.assertEqual(response.status_code, 200)
        # set.__le__: *lines* must be a subset of the response lines.
        self.assertLessEqual(
            lines, set(response.content.decode('utf-8').splitlines())
        )

    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
    def test_metrics_default(self):
        # With no jobs enqueued, every status gauge reports zero.
        self.assertMetricsContain(
            {
                '# HELP rq_jobs RQ jobs by status',
                'rq_jobs{queue="default",status="queued"} 0.0',
                'rq_jobs{queue="default",status="started"} 0.0',
                'rq_jobs{queue="default",status="finished"} 0.0',
                'rq_jobs{queue="default",status="failed"} 0.0',
                'rq_jobs{queue="default",status="deferred"} 0.0',
                'rq_jobs{queue="default",status="scheduled"} 0.0',
            }
        )

    @patch('django_rq.settings.QUEUES', RQ_QUEUES)
    def test_metrics_with_jobs(self):
        # One job that fails plus ten that succeed.
        queue = get_queue('default')
        queue.enqueue(failing_job)

        for _ in range(10):
            queue.enqueue(access_self)

        worker = get_worker('default', name='test_worker')
        worker.register_birth()

        # override worker registration to effectively simulate non burst mode
        register_death = worker.register_death
        worker.register_birth = worker.register_death = lambda: None  # type: ignore[method-assign]

        try:
            # Before any work: all jobs queued, worker counters at zero.
            self.assertMetricsContain(
                {
                    # job information
                    '# HELP rq_jobs RQ jobs by status',
                    'rq_jobs{queue="default",status="queued"} 11.0',
                    'rq_jobs{queue="default",status="started"} 0.0',
                    'rq_jobs{queue="default",status="finished"} 0.0',
                    'rq_jobs{queue="default",status="failed"} 0.0',
                    'rq_jobs{queue="default",status="deferred"} 0.0',
                    'rq_jobs{queue="default",status="scheduled"} 0.0',
                    # worker information
                    '# HELP rq_workers RQ workers',
                    'rq_workers{name="test_worker",queues="default",state="?"} 1.0',
                    '# HELP rq_job_successful_total RQ successful job count',
                    'rq_job_successful_total{name="test_worker",queues="default"} 0.0',
                    '# HELP rq_job_failed_total RQ failed job count',
                    'rq_job_failed_total{name="test_worker",queues="default"} 0.0',
                    '# HELP rq_working_seconds_total RQ total working time',
                    'rq_working_seconds_total{name="test_worker",queues="default"} 0.0',
                }
            )

            # Process four jobs: the failing one plus three successes.
            worker.work(burst=True, max_jobs=4)
            self.assertMetricsContain(
                {
                    # job information
                    'rq_jobs{queue="default",status="queued"} 7.0',
                    'rq_jobs{queue="default",status="finished"} 3.0',
                    'rq_jobs{queue="default",status="failed"} 1.0',
                    # worker information
                    'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
                    'rq_job_successful_total{name="test_worker",queues="default"} 3.0',
                    'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
                }
            )

            # Drain the queue: all remaining successes complete.
            worker.work(burst=True)
            self.assertMetricsContain(
                {
                    # job information
                    'rq_jobs{queue="default",status="queued"} 0.0',
                    'rq_jobs{queue="default",status="finished"} 10.0',
                    'rq_jobs{queue="default",status="failed"} 1.0',
                    # worker information
                    'rq_workers{name="test_worker",queues="default",state="idle"} 1.0',
                    'rq_job_successful_total{name="test_worker",queues="default"} 10.0',
                    'rq_job_failed_total{name="test_worker",queues="default"} 1.0',
                }
            )
        finally:
            # Restore the real death registration so the worker is cleaned up.
            register_death()


@skipIf(prometheus_client is not None, 'prometheus_client is installed')
class NoPrometheusTest(TestCase):
    """Without prometheus_client the metrics URL must not be registered."""

    def test_no_metrics_without_prometheus_client(self):
        # The URL pattern is only added when the optional dependency is
        # importable, so reverse() must fail here.
        self.assertRaises(NoReverseMatch, reverse, 'rq_metrics')
2 changes: 1 addition & 1 deletion django_rq/tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def test_statistics_json_view(self):

# With token,
token = '12345abcde'
with patch('django_rq.views.API_TOKEN', new_callable=PropertyMock(return_value=token)):
with patch('django_rq.stats_views.API_TOKEN', new_callable=PropertyMock(return_value=token)):
response = self.client.get(reverse('rq_home_json', args=[token]))
self.assertEqual(response.status_code, 200)
self.assertIn("name", response.content.decode('utf-8'))
Expand Down
36 changes: 35 additions & 1 deletion django_rq/tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
from typing import Any, Dict
from unittest.mock import patch

from django_rq.queues import get_connection, get_queue_by_index

try:
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff # type: ignore[attr-defined]
from redis.retry import Retry
except ImportError:
ExponentialWithJitterBackoff = None
Retry = None # type: ignore[misc, assignment]


def _is_buggy_retry(kwargs: Dict[str, Any]) -> bool:
    """Return True when *kwargs* carries a redis ``Retry`` whose backoff is the
    buggy ``ExponentialWithJitterBackoff`` default (fixed in redis-py #3668)."""
    if Retry is None:
        # redis-py version without these classes: nothing to work around.
        return False
    retry = kwargs.get('retry')
    if retry is None or not isinstance(retry, Retry):
        return False
    return isinstance(retry._backoff, ExponentialWithJitterBackoff)  # type: ignore[attr-defined]


def get_queue_index(name='default'):
"""
Expand All @@ -15,7 +34,22 @@ def get_queue_index(name='default'):
continue
if q.name == name:
# assert that the connection is correct
assert q.connection.connection_pool.connection_kwargs == connection_kwargs
pool_kwargs = q.connection.connection_pool.connection_kwargs
if not _is_buggy_retry(pool_kwargs) or not _is_buggy_retry(connection_kwargs):
assert pool_kwargs == connection_kwargs
else:
# patch the retry backoff since there is a bug in the default
# backoff strategy
#
# fixed in https://github.com/redis/redis-py/pull/3668
with patch.object(
pool_kwargs['retry'], '_backoff', NoBackoff()
), patch.object(
connection_kwargs['retry'], '_backoff', NoBackoff()
):
assert pool_kwargs == connection_kwargs

assert pool_kwargs['retry']._backoff.__dict__ == connection_kwargs['retry']._backoff.__dict__

return i

Expand Down
12 changes: 9 additions & 3 deletions django_rq/urls.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from django.urls import re_path

from . import views
from . import stats_views, views
from .contrib.prometheus import RQCollector

metrics_view = [
re_path(r'^metrics/?$', stats_views.prometheus_metrics, name='rq_metrics'),
] if RQCollector else [] # type: ignore[truthy-function]

urlpatterns = [
re_path(r'^$', views.stats, name='rq_home'),
re_path(r'^stats.json/(?P<token>[\w]+)?/?$', views.stats_json, name='rq_home_json'),
re_path(r'^$', stats_views.stats, name='rq_home'),
re_path(r'^stats.json/(?P<token>[\w]+)?/?$', stats_views.stats_json, name='rq_home_json'),
*metrics_view,
re_path(r'^queues/(?P<queue_index>[\d]+)/$', views.jobs, name='rq_jobs'),
re_path(r'^workers/(?P<queue_index>[\d]+)/$', views.workers, name='rq_workers'),
re_path(r'^workers/(?P<queue_index>[\d]+)/(?P<key>[-\w\.\:\$]+)/$', views.worker_details, name='rq_worker_details'),
Expand Down
Loading
Loading