diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6b5b6dac..50d5d61d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -37,7 +37,15 @@ jobs: - name: Run Test run: | - `which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. + `which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. -v2 + + - name: Install optional dependencies + run: | + pip install prometheus_client + + - name: Run Test with optional dependencies + run: | + `which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=. -v2 mypy: runs-on: ubuntu-latest diff --git a/README.rst b/README.rst index 12b379a7..2ea671ac 100644 --- a/README.rst +++ b/README.rst @@ -368,6 +368,33 @@ Additionally, these statistics are also accessible from the command line. .. image:: demo-django-rq-cli-dashboard.gif +Configuring Prometheus +---------------------- + +``django_rq`` also provides a Prometheus compatible view, which can be enabled +by installing ``prometheus_client`` or installing the extra "prometheus" +(``pip install django-rq[prometheus]``). 
The metrics are exposed at +``/django-rq/metrics/`` and the following is an example of the metrics that +are exported:: + + # HELP rq_workers RQ workers + # TYPE rq_workers gauge + # HELP rq_job_successful_total RQ successful job count + # TYPE rq_job_successful_total counter + # HELP rq_job_failed_total RQ failed job count + # TYPE rq_job_failed_total counter + # HELP rq_working_seconds_total RQ total working time + # TYPE rq_working_seconds_total counter + # HELP rq_jobs RQ jobs by status + # TYPE rq_jobs gauge + rq_jobs{queue="default",status="queued"} 0.0 + rq_jobs{queue="default",status="started"} 0.0 + rq_jobs{queue="default",status="finished"} 0.0 + rq_jobs{queue="default",status="failed"} 0.0 + rq_jobs{queue="default",status="deferred"} 0.0 + rq_jobs{queue="default",status="scheduled"} 0.0 + + Configuring Sentry ------------------- Sentry diff --git a/django_rq/admin.py b/django_rq/admin.py index d50a9c7c..d3acf14a 100644 --- a/django_rq/admin.py +++ b/django_rq/admin.py @@ -4,7 +4,7 @@ from django.http.request import HttpRequest from django.http.response import HttpResponse -from . import views, settings, models +from . 
import settings, stats_views, models class QueueAdmin(admin.ModelAdmin): @@ -32,7 +32,7 @@ def has_module_permission(self, request: HttpRequest): def changelist_view(self, request: HttpRequest, extra_context: Optional[Dict[str, Any]] = None) -> HttpResponse: """The 'change list' admin view for this model.""" # proxy request to stats view - return views.stats(request) + return stats_views.stats(request) if settings.SHOW_ADMIN_LINK: diff --git a/django_rq/contrib/prometheus.py b/django_rq/contrib/prometheus.py new file mode 100644 index 00000000..5f18faeb --- /dev/null +++ b/django_rq/contrib/prometheus.py @@ -0,0 +1,59 @@ +from rq.job import JobStatus + +from ..queues import filter_connection_params, get_connection, get_queue, get_unique_connection_configs +from ..workers import get_worker_class + +try: + from prometheus_client import Summary + from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily + + class RQCollector: + """RQ stats collector""" + + summary = Summary('rq_request_processing_seconds_total', 'Time spent collecting RQ data') + + def collect(self): + from ..settings import QUEUES + + with self.summary.time(): + rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues']) + rq_job_successful_total = CounterMetricFamily('rq_job_successful_total', 'RQ successful job count', labels=['name', 'queues']) + rq_job_failed_total = CounterMetricFamily('rq_job_failed_total', 'RQ failed job count', labels=['name', 'queues']) + rq_working_seconds_total = CounterMetricFamily('rq_working_seconds_total', 'RQ total working time', labels=['name', 'queues']) + + rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by status', labels=['queue', 'status']) + + worker_class = get_worker_class() + unique_configs = get_unique_connection_configs() + connections = {} + for queue_name, config in QUEUES.items(): + index = unique_configs.index(filter_connection_params(config)) + if index not in connections: + connections[index] = 
connection = get_connection(queue_name) + + for worker in worker_class.all(connection): + name = worker.name + label_queues = ','.join(worker.queue_names()) + rq_workers.add_metric([name, worker.get_state(), label_queues], 1) + rq_job_successful_total.add_metric([name, label_queues], worker.successful_job_count) + rq_job_failed_total.add_metric([name, label_queues], worker.failed_job_count) + rq_working_seconds_total.add_metric([name, label_queues], worker.total_working_time) + else: + connection = connections[index] + + queue = get_queue(queue_name, connection=connection) + rq_jobs.add_metric([queue_name, JobStatus.QUEUED], queue.count) + rq_jobs.add_metric([queue_name, JobStatus.STARTED], queue.started_job_registry.count) + rq_jobs.add_metric([queue_name, JobStatus.FINISHED], queue.finished_job_registry.count) + rq_jobs.add_metric([queue_name, JobStatus.FAILED], queue.failed_job_registry.count) + rq_jobs.add_metric([queue_name, JobStatus.DEFERRED], queue.deferred_job_registry.count) + rq_jobs.add_metric([queue_name, JobStatus.SCHEDULED], queue.scheduled_job_registry.count) + + yield rq_workers + yield rq_job_successful_total + yield rq_job_failed_total + yield rq_working_seconds_total + yield rq_jobs + +except ImportError: + RQCollector = None # type: ignore[assignment, misc] diff --git a/django_rq/stats_views.py b/django_rq/stats_views.py new file mode 100644 index 00000000..8368fa8b --- /dev/null +++ b/django_rq/stats_views.py @@ -0,0 +1,56 @@ +from django.contrib import admin +from django.contrib.admin.views.decorators import staff_member_required +from django.http import Http404, HttpResponse, JsonResponse +from django.shortcuts import render +from django.views.decorators.cache import never_cache + +from .settings import API_TOKEN +from .utils import get_scheduler_statistics, get_statistics + +try: + import prometheus_client + + from .contrib.prometheus import RQCollector +except ImportError: + prometheus_client = RQCollector = None # type: ignore[assignment, 
misc] + +registry = None + + +@never_cache +@staff_member_required +def prometheus_metrics(request): + global registry + + if not RQCollector: # type: ignore[truthy-function] + raise Http404('prometheus_client has not been installed; install using extra "django-rq[prometheus]"') + + if not registry: + registry = prometheus_client.CollectorRegistry(auto_describe=True) + registry.register(RQCollector()) + + encoder, content_type = prometheus_client.exposition.choose_encoder(request.META.get('HTTP_ACCEPT', '')) + if 'name[]' in request.GET: + registry = registry.restricted_registry(request.GET.getlist('name[]')) + + return HttpResponse(encoder(registry), headers={'Content-Type': content_type}) + + +@never_cache +@staff_member_required +def stats(request): + context_data = { + **admin.site.each_context(request), + **get_statistics(run_maintenance_tasks=True), + **get_scheduler_statistics(), + } + return render(request, 'django_rq/stats.html', context_data) + + +def stats_json(request, token=None): + if request.user.is_staff or (token and token == API_TOKEN): + return JsonResponse(get_statistics()) + + return JsonResponse( + {"error": True, "description": "Please configure API_TOKEN in settings.py before accessing this view."} + ) diff --git a/django_rq/tests/test_prometheus_metrics.py b/django_rq/tests/test_prometheus_metrics.py new file mode 100644 index 00000000..c06da4fe --- /dev/null +++ b/django_rq/tests/test_prometheus_metrics.py @@ -0,0 +1,135 @@ +import os +from unittest import skipIf +from unittest.mock import patch + +from django.contrib.auth.models import User +from django.test import TestCase, override_settings +from django.test.client import Client +from django.urls import NoReverseMatch, reverse + +from django_rq import get_queue +from django_rq.workers import get_worker + +from .fixtures import access_self, failing_job + +try: + import prometheus_client +except ImportError: + prometheus_client = None + +RQ_QUEUES = { + 'default': { + 'HOST': 
os.environ.get('REDIS_HOST', 'localhost'), + 'PORT': 6379, + 'DB': 0, + }, +} + + +@skipIf(prometheus_client is None, 'prometheus_client is required') +@override_settings(RQ={'AUTOCOMMIT': True}) +class PrometheusTest(TestCase): + def setUp(self): + self.user = User.objects.create_user('foo', password='pass') + self.user.is_staff = True + self.user.is_active = True + self.user.save() + self.client = Client() + self.client.force_login(self.user) + get_queue('default').connection.flushall() + + def assertMetricsContain(self, lines): + response = self.client.get(reverse('rq_metrics')) + self.assertEqual(response.status_code, 200) + self.assertLessEqual( + lines, set(response.content.decode('utf-8').splitlines()) + ) + + @patch('django_rq.settings.QUEUES', RQ_QUEUES) + def test_metrics_default(self): + self.assertMetricsContain( + { + '# HELP rq_jobs RQ jobs by status', + 'rq_jobs{queue="default",status="queued"} 0.0', + 'rq_jobs{queue="default",status="started"} 0.0', + 'rq_jobs{queue="default",status="finished"} 0.0', + 'rq_jobs{queue="default",status="failed"} 0.0', + 'rq_jobs{queue="default",status="deferred"} 0.0', + 'rq_jobs{queue="default",status="scheduled"} 0.0', + } + ) + + @patch('django_rq.settings.QUEUES', RQ_QUEUES) + def test_metrics_with_jobs(self): + queue = get_queue('default') + queue.enqueue(failing_job) + + for _ in range(10): + queue.enqueue(access_self) + + worker = get_worker('default', name='test_worker') + worker.register_birth() + + # override worker registration to effectively simulate non burst mode + register_death = worker.register_death + worker.register_birth = worker.register_death = lambda: None # type: ignore[method-assign] + + try: + self.assertMetricsContain( + { + # job information + '# HELP rq_jobs RQ jobs by status', + 'rq_jobs{queue="default",status="queued"} 11.0', + 'rq_jobs{queue="default",status="started"} 0.0', + 'rq_jobs{queue="default",status="finished"} 0.0', + 'rq_jobs{queue="default",status="failed"} 0.0', + 
'rq_jobs{queue="default",status="deferred"} 0.0', + 'rq_jobs{queue="default",status="scheduled"} 0.0', + # worker information + '# HELP rq_workers RQ workers', + 'rq_workers{name="test_worker",queues="default",state="?"} 1.0', + '# HELP rq_job_successful_total RQ successful job count', + 'rq_job_successful_total{name="test_worker",queues="default"} 0.0', + '# HELP rq_job_failed_total RQ failed job count', + 'rq_job_failed_total{name="test_worker",queues="default"} 0.0', + '# HELP rq_working_seconds_total RQ total working time', + 'rq_working_seconds_total{name="test_worker",queues="default"} 0.0', + } + ) + + worker.work(burst=True, max_jobs=4) + self.assertMetricsContain( + { + # job information + 'rq_jobs{queue="default",status="queued"} 7.0', + 'rq_jobs{queue="default",status="finished"} 3.0', + 'rq_jobs{queue="default",status="failed"} 1.0', + # worker information + 'rq_workers{name="test_worker",queues="default",state="idle"} 1.0', + 'rq_job_successful_total{name="test_worker",queues="default"} 3.0', + 'rq_job_failed_total{name="test_worker",queues="default"} 1.0', + } + ) + + worker.work(burst=True) + self.assertMetricsContain( + { + # job information + 'rq_jobs{queue="default",status="queued"} 0.0', + 'rq_jobs{queue="default",status="finished"} 10.0', + 'rq_jobs{queue="default",status="failed"} 1.0', + # worker information + 'rq_workers{name="test_worker",queues="default",state="idle"} 1.0', + 'rq_job_successful_total{name="test_worker",queues="default"} 10.0', + 'rq_job_failed_total{name="test_worker",queues="default"} 1.0', + } + ) + finally: + register_death() + + +@skipIf(prometheus_client is not None, 'prometheus_client is installed') +class NoPrometheusTest(TestCase): + def test_no_metrics_without_prometheus_client(self): + with self.assertRaises(NoReverseMatch): + reverse('rq_metrics') diff --git a/django_rq/tests/test_views.py b/django_rq/tests/test_views.py index 4b8b6997..0d132287 100644 --- a/django_rq/tests/test_views.py +++ 
b/django_rq/tests/test_views.py @@ -367,7 +367,7 @@ def test_statistics_json_view(self): # With token, token = '12345abcde' - with patch('django_rq.views.API_TOKEN', new_callable=PropertyMock(return_value=token)): + with patch('django_rq.stats_views.API_TOKEN', new_callable=PropertyMock(return_value=token)): response = self.client.get(reverse('rq_home_json', args=[token])) self.assertEqual(response.status_code, 200) self.assertIn("name", response.content.decode('utf-8')) diff --git a/django_rq/tests/utils.py b/django_rq/tests/utils.py index 8b467f7a..28fd87e8 100644 --- a/django_rq/tests/utils.py +++ b/django_rq/tests/utils.py @@ -1,5 +1,24 @@ +from typing import Any, Dict +from unittest.mock import patch + from django_rq.queues import get_connection, get_queue_by_index +try: + from redis.backoff import ExponentialWithJitterBackoff, NoBackoff # type: ignore[attr-defined] + from redis.retry import Retry +except ImportError: + ExponentialWithJitterBackoff = None + Retry = None # type: ignore[misc, assignment] + + +def _is_buggy_retry(kwargs: Dict[str, Any]) -> bool: + return ( + Retry is not None + and (retry := kwargs.get('retry')) is not None + and isinstance(retry, Retry) + and isinstance(retry._backoff, ExponentialWithJitterBackoff) # type: ignore[attr-defined] + ) + def get_queue_index(name='default'): """ @@ -15,7 +34,22 @@ def get_queue_index(name='default'): continue if q.name == name: # assert that the connection is correct - assert q.connection.connection_pool.connection_kwargs == connection_kwargs + pool_kwargs = q.connection.connection_pool.connection_kwargs + if not _is_buggy_retry(pool_kwargs) or not _is_buggy_retry(connection_kwargs): + assert pool_kwargs == connection_kwargs + else: + # patch the retry backoff since there is a bug in the default + # backoff strategy + # + # fixed in https://github.com/redis/redis-py/pull/3668 + with patch.object( + pool_kwargs['retry'], '_backoff', NoBackoff() + ), patch.object( + connection_kwargs['retry'], 
'_backoff', NoBackoff() + ): + assert pool_kwargs == connection_kwargs + + assert pool_kwargs['retry']._backoff.__dict__ == connection_kwargs['retry']._backoff.__dict__ return i diff --git a/django_rq/urls.py b/django_rq/urls.py index 1aff8d9c..3f3bf725 100644 --- a/django_rq/urls.py +++ b/django_rq/urls.py @@ -1,10 +1,16 @@ from django.urls import re_path -from . import views +from . import stats_views, views +from .contrib.prometheus import RQCollector + +metrics_view = [ + re_path(r'^metrics/?$', stats_views.prometheus_metrics, name='rq_metrics'), +] if RQCollector else [] # type: ignore[truthy-function] urlpatterns = [ - re_path(r'^$', views.stats, name='rq_home'), - re_path(r'^stats.json/(?P<token>[\w]+)?/?$', views.stats_json, name='rq_home_json'), + re_path(r'^$', stats_views.stats, name='rq_home'), + re_path(r'^stats.json/(?P<token>[\w]+)?/?$', stats_views.stats_json, name='rq_home_json'), + *metrics_view, re_path(r'^queues/(?P<queue_index>[\d]+)/$', views.jobs, name='rq_jobs'), re_path(r'^workers/(?P<queue_index>[\d]+)/$', views.workers, name='rq_workers'), re_path(r'^workers/(?P<queue_index>[\d]+)/(?P<key>[-\w\.\:\$]+)/$', views.worker_details, name='rq_worker_details'), diff --git a/django_rq/views.py b/django_rq/views.py index 6212064e..5a3aa3c8 100644 --- a/django_rq/views.py +++ b/django_rq/views.py @@ -1,11 +1,9 @@ -from __future__ import division - from math import ceil from typing import Any, cast, Tuple from django.contrib import admin, messages from django.contrib.admin.views.decorators import staff_member_required -from django.http import Http404, JsonResponse +from django.http import Http404 from django.shortcuts import redirect, render from django.urls import reverse from django.views.decorators.cache import never_cache @@ -25,28 +23,8 @@ from rq.worker_registration import clean_worker_registry from .queues import get_queue_by_index, get_scheduler_by_index -from .settings import API_TOKEN, QUEUES_MAP -from .utils import get_executions, get_jobs, get_scheduler_statistics, get_statistics, stop_jobs - - 
-@never_cache -@staff_member_required -def stats(request): - context_data = { - **admin.site.each_context(request), - **get_statistics(run_maintenance_tasks=True), - **get_scheduler_statistics(), - } - return render(request, 'django_rq/stats.html', context_data) - - -def stats_json(request, token=None): - if request.user.is_staff or (token and token == API_TOKEN): - return JsonResponse(get_statistics()) - - return JsonResponse( - {"error": True, "description": "Please configure API_TOKEN in settings.py before accessing this view."} - ) +from .settings import QUEUES_MAP +from .utils import get_executions, get_jobs, stop_jobs @never_cache diff --git a/setup.cfg b/setup.cfg index f9b0aea0..04aafd71 100644 --- a/setup.cfg +++ b/setup.cfg @@ -14,6 +14,9 @@ warn_unreachable = true [mypy-django_redis.*] ignore_missing_imports = true +[mypy-prometheus_client.*] +ignore_missing_imports = true + [mypy-redis_cache.*] ignore_missing_imports = true diff --git a/setup.py b/setup.py index 683e9e91..52ad114f 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ }, install_requires=['django>=3.2', 'rq>=2', 'redis>=3.5'], extras_require={ + 'prometheus': ['prometheus_client>=0.4.0'], 'Sentry': ['sentry-sdk>=1.0.0'], 'testing': [], },