Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,9 @@ IS_CYBERSTORM_ENABLED=
SHOW_CYBERSTORM_API_DOCS=

DECOMPILER_API_HOST=http://host.docker.internal:8000

KAFKA_ENABLED=False
KAFKA_BOOTSTRAP_SERVERS=
KAFKA_USERNAME=
KAFKA_PASSWORD=
KAFKA_CA_CERT=
61 changes: 60 additions & 1 deletion django/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions django/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ markdown-it-py = {version = "2.1.0", extras = ["linkify"]}
abyss = {git = "https://github.com/akx/abyss.git", rev = "4352e557f25a303f718ec1bf82ea0ca545ebc077"}
lxml = "^4.9.1"
pygments = "^2.16.1"
confluent-kafka = "^2.0"

[tool.poetry.group.plugins]
optional = true
Expand Down
22 changes: 22 additions & 0 deletions django/thunderstore/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@
CACHALOT_TIMEOUT_SECONDS=(int, 60 * 15), # 15 minutes by default
CACHALOT_ENABLED=(bool, True),
DOWNLOAD_METRICS_TTL_SECONDS=(int, 60 * 10),
KAFKA_BOOTSTRAP_SERVERS=(str, ""),
KAFKA_USERNAME=(str, ""),
KAFKA_PASSWORD=(str, ""),
KAFKA_CA_CERT=(str, ""),
KAFKA_ENABLED=(bool, False),
# FEATURE FLAGS UNDER HERE
IS_CYBERSTORM_ENABLED=(bool, False),
SHOW_CYBERSTORM_API_DOCS=(bool, False),
Expand Down Expand Up @@ -283,6 +288,7 @@ def load_db_certs():
"thunderstore.moderation",
"thunderstore.permissions",
"thunderstore.ts_reports",
"thunderstore.ts_analytics",
]
)

Expand Down Expand Up @@ -391,6 +397,7 @@ class CeleryQueues:
BackgroundCache = "background.cache"
BackgroundTask = "background.task"
BackgroundLongRunning = "background.long_running"
Analytics = "analytics"


CELERY_BROKER_URL = env.str("CELERY_BROKER_URL")
Expand Down Expand Up @@ -584,6 +591,21 @@ def get_redis_cache(env_key: str, fallback_key: Optional[str] = None):

THUMBNAIL_QUALITY = 95

# Kafka configuration

# Whether Kafka is enabled
KAFKA_ENABLED = env.bool("KAFKA_ENABLED")

KAFKA_CONFIG = {
"bootstrap.servers": env.str("KAFKA_BOOTSTRAP_SERVERS"),
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": env.str("KAFKA_USERNAME"),
"sasl.password": env.str("KAFKA_PASSWORD"),
"ssl.ca.pem": env.str("KAFKA_CA_CERT"),
"client.id": "thunderstore-analytics",
}

#######################################
# STORAGE #
#######################################
Expand Down
1 change: 1 addition & 0 deletions django/thunderstore/core/tests/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def test_task():
"thunderstore.repository.tasks.cleanup_package_submissions",
"thunderstore.repository.tasks.log_version_download",
"thunderstore.webhooks.tasks.process_audit_event",
"thunderstore.analytics.send_kafka_message_async",
)


Expand Down
90 changes: 0 additions & 90 deletions django/thunderstore/repository/tests/test_download_metrics.py

This file was deleted.

1 change: 1 addition & 0 deletions django/thunderstore/ts_analytics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default_app_config = "thunderstore.ts_analytics.apps.AnalyticsAppConfig"
28 changes: 28 additions & 0 deletions django/thunderstore/ts_analytics/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from django.apps import AppConfig
from django.db.models.signals import post_save


class AnalyticsAppConfig(AppConfig):
name = "thunderstore.ts_analytics"
label = "ts_analytics"

def ready(self):
# Connect the signal handlers
from thunderstore.community.models import Community, PackageListing
from thunderstore.metrics.models import PackageVersionDownloadEvent
from thunderstore.repository.models import Package, PackageVersion
from thunderstore.ts_analytics.signals import (
community_post_save,
package_listing_post_save,
package_post_save,
package_version_download_event_post_save,
package_version_post_save,
)

post_save.connect(
package_version_download_event_post_save, sender=PackageVersionDownloadEvent
)
post_save.connect(package_post_save, sender=Package)
post_save.connect(package_version_post_save, sender=PackageVersion)
post_save.connect(package_listing_post_save, sender=PackageListing)
post_save.connect(community_post_save, sender=Community)
83 changes: 83 additions & 0 deletions django/thunderstore/ts_analytics/kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import json
from enum import Enum
from functools import lru_cache
from typing import Any, Dict, Optional, Union

from celery import shared_task
from confluent_kafka import Producer
from django.conf import settings

from thunderstore.core.settings import CeleryQueues


class KafkaTopic(str, Enum):
PACKAGE_DOWNLOADED = "ts.package.downloaded"
PACKAGE_UPDATED = "ts.package.updated"
PACKAGE_VERSION_UPDATED = "ts.package.version.updated"
PACKAGE_LISTING_UPDATED = "ts.package.listing.updated"
COMMUNITY_UPDATED = "ts.community.updated"


def send_kafka_message(
topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None
):
client = get_kafka_client()
client.send(topic=topic, payload=payload, key=key)


@shared_task(
queue=CeleryQueues.Analytics,
name="thunderstore.analytics.send_kafka_message_async",
)
def send_kafka_message_async(
topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None
):
send_kafka_message(topic=topic, payload=payload, key=key)


class KafkaClient:
def __init__(self, config: Dict[str, Any]):
self._producer = Producer(config)

def send(
self,
topic: Union[KafkaTopic, str],
payload: dict,
key: Optional[str] = None,
):
value_bytes = json.dumps(payload).encode("utf-8")
key_bytes = key.encode("utf-8") if key else None

topic_str = topic.value if isinstance(topic, KafkaTopic) else topic
self._producer.produce(
topic=topic_str,
value=value_bytes,
key=key_bytes,
)

self._producer.poll(0)


class DummyKafkaClient:
"""A dummy Kafka client that does nothing when Kafka is disabled."""

def send(
self, topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None
):
pass


@lru_cache(maxsize=1)
def get_kafka_client() -> Union[KafkaClient, DummyKafkaClient]:
# Return dummy client if Kafka is disabled
if not getattr(settings, "KAFKA_ENABLED", False):
return DummyKafkaClient()

config = getattr(settings, "KAFKA_CONFIG", None)
if not config:
raise RuntimeError("KAFKA_CONFIG is not configured.")

if not config.get("bootstrap.servers"):
raise RuntimeError("Kafka bootstrap servers are not configured.")

return KafkaClient(config)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resource Leak: The Kafka producer is cached via @lru_cache(maxsize=1) but never properly closed. The producer maintains network connections and internal buffers that should be cleaned up on shutdown. This will cause:

  1. Resource leaks on application restarts
  2. Potential message loss when the process terminates
  3. TCP connections left in TIME_WAIT state

The producer should be properly managed as a singleton with cleanup:

# Remove @lru_cache and use a proper singleton pattern with __del__ or
# register an atexit handler to call producer.flush() and producer.close()
import atexit

class KafkaClient:
    def __init__(self, config: Dict[str, Any]):
        self._producer = Producer(config)
        atexit.register(self.close)
    
    def close(self):
        if hasattr(self, '_producer'):
            self._producer.flush()

Spotted by Graphite Agent

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

This comment came from an experimental review—please leave feedback if it was helpful/unhelpful. Learn more about experimental comments here.

Copy link
Contributor

@Oksamies Oksamies Oct 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this valid? (the AI scanning warning)

Loading
Loading