Add Kafka download metrics #1191
base: master
Conversation
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough: Adds Kafka-based analytics: adds the …

Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
django/poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
- django/pyproject.toml (1 hunks)
- django/thunderstore/core/kafka.py (1 hunks)
- django/thunderstore/core/settings.py (2 hunks)
- django/thunderstore/repository/tasks/downloads.py (2 hunks)
- python-packages (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
django/thunderstore/repository/tasks/downloads.py (1)
django/thunderstore/core/kafka.py (1)
KafkaTopics (4-5)
🪛 GitHub Actions: Build & Test
django/thunderstore/repository/tasks/downloads.py
[error] 6-6: ModuleNotFoundError: No module named 'ts_kafka' while importing KafkaClient during collectstatic.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (javascript)
🔇 Additional comments (2)
django/thunderstore/core/kafka.py (1)
4-5: Topic enum looks good. Self-descriptive constant, typed as str/Enum. No concerns.
django/thunderstore/core/settings.py (1)
139-142: Confirm Kafka client and config keys
- Verify which Kafka client library is in use (e.g., confluent-kafka, kafka-python, ts-kafka) and its exact config key names (sasl.mechanism vs sasl.mechanisms, ssl.ca.pem vs ssl.ca.location).
- Then build KAFKA_CONFIG by filtering out falsy values and gate on bootstrap.servers.
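For illustration, a minimal sketch of the suggested filtering, assuming django-environ (the env helper used elsewhere in settings.py) and confluent-kafka style key names — both of which should be verified against the client actually in use:

# Sketch only: config key names are an assumption, not confirmed by this PR.
_raw_kafka_config = {
    "bootstrap.servers": env.str("KAFKA_BOOTSTRAP_SERVERS", default=""),
    "sasl.username": env.str("KAFKA_USERNAME", default=""),
    "sasl.password": env.str("KAFKA_PASSWORD", default=""),
}
# Drop falsy values so unset variables don't override client defaults.
KAFKA_CONFIG = {k: v for k, v in _raw_kafka_config.items() if v}
# Gate on the one key that is always required.
KAFKA_ENABLED = bool(KAFKA_CONFIG.get("bootstrap.servers"))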
44971d3 to 07c026e (compare)
Codecov Report
❌ Patch coverage is …
Additional details and impacted files

@@            Coverage Diff             @@
##           master    #1191      +/-   ##
==========================================
+ Coverage   92.80%   92.97%   +0.17%
==========================================
  Files         337      340       +3
  Lines       10367    10610     +243
  Branches      942      996      +54
==========================================
+ Hits         9621     9865     +244
+ Misses        618      616       -2
- Partials      128      129       +1

☔ View full report in Codecov by Sentry.
Actionable comments posted: 1
🧹 Nitpick comments (1)
django/thunderstore/core/kafka.py (1)
45-54: Consider validating additional required config fields. The bootstrap servers check is good, but you might also want to validate other critical config fields defined in settings.py:
- security.protocol (should be "SASL_SSL" per settings)
- sasl.mechanism (should be "SCRAM-SHA-256")
- sasl.username and sasl.password (for auth)

This would provide earlier, clearer error messages during startup rather than cryptic Kafka connection errors at runtime.

Example validation:

@lru_cache(maxsize=1)
def get_kafka_client() -> KafkaClient:
    config = getattr(settings, "KAFKA_CONFIG", None)
    if not config:
        raise RuntimeError("KAFKA_CONFIG is not configured.")
    if not config.get("bootstrap.servers"):
        raise RuntimeError("Kafka bootstrap servers are not configured.")
    # Optional: validate security config
    if config.get("security.protocol") == "SASL_SSL":
        if not config.get("sasl.username") or not config.get("sasl.password"):
            raise RuntimeError("Kafka SASL credentials are not configured.")
    return KafkaClient(config)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
django/poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
- django/pyproject.toml (1 hunks)
- django/thunderstore/core/kafka.py (1 hunks)
- django/thunderstore/core/settings.py (2 hunks)
- django/thunderstore/repository/tasks/downloads.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- django/thunderstore/core/settings.py
- django/thunderstore/repository/tasks/downloads.py
🧰 Additional context used
🪛 Ruff (0.14.0)
django/thunderstore/core/kafka.py
41-41: Do not catch blind exception: Exception
(BLE001)
49-49: Avoid specifying long messages outside the exception class
(TRY003)
52-52: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (3)
django/pyproject.toml (1)
38-38: LGTM - Core dependency correctly added. Adding confluent-kafka as a core dependency ensures it's available at runtime for the Kafka client in kafka.py. This is the right approach.
django/thunderstore/core/kafka.py (2)

10-11: LGTM. The str, Enum pattern correctly creates a string-valued enum for topic names.

14-16: LGTM. Producer initialization is straightforward.
07c026e to aec8325 (compare)
Actionable comments posted: 1
♻️ Duplicate comments (2)
django/thunderstore/core/kafka.py (2)
29-34: Replace print() with proper logging. Error handling uses print(), which won't appear in production logs. Use Django's logging framework.

+import logging
+
+logger = logging.getLogger(__name__)
+
 class KafkaClient:
     ...
     def send(...):
         try:
             value_bytes = json.dumps(payload).encode("utf-8")
             key_bytes = key.encode("utf-8") if key else None
-        except TypeError as e:
-            print(f"Failed to serialize payload to JSON for topic {topic}: {e}")
+        except (TypeError, ValueError) as e:
+            logger.error(f"Failed to serialize payload to JSON for topic {topic}: {e}")
             return

36-45: Add delivery guarantees and fix error handling. Critical issues:
- print() won't log in production
- Bare Exception catch is too broad (Ruff BLE001)
- No delivery callback means async failures are silently lost
- Missing flush mechanism for shutdown

+from confluent_kafka import KafkaException
+
+logger = logging.getLogger(__name__)
+
+def _delivery_callback(err, msg):
+    if err:
+        logger.error(f"Message delivery failed: {err}")
+
 class KafkaClient:
     ...
     def send(...):
         ...
         try:
             self._producer.produce(
                 topic=topic.value,
                 value=value_bytes,
                 key=key_bytes,
+                callback=_delivery_callback,
             )
             self._producer.poll(0)
-        except Exception as e:
-            print("Error producing message in kafka: " + e.__str__())
+        except (KafkaException, BufferError) as e:
+            logger.error(f"Error producing message to Kafka topic {topic}: {e}")
+
+    def flush(self, timeout: float = 5.0):
+        """Flush pending messages before shutdown."""
+        self._producer.flush(timeout)
🧹 Nitpick comments (2)
django/thunderstore/core/kafka.py (2)
4-4: Remove the unused import. Union is imported but never used.

-from typing import Any, Dict, Optional, Union
+from typing import Any, Dict, Optional

52-52: Simplify exception messages. Per TRY003, avoid long messages in exception constructors. Consider using custom exception classes if needed.

-    raise RuntimeError("KAFKA_CONFIG is not configured.")
+    raise RuntimeError("KAFKA_CONFIG missing")

-    raise RuntimeError("Kafka bootstrap servers are not configured.")
+    raise RuntimeError("bootstrap.servers missing")

Also applies to: 55-55
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
django/poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
- django/pyproject.toml (1 hunks)
- django/thunderstore/core/kafka.py (1 hunks)
- django/thunderstore/core/settings.py (2 hunks)
- django/thunderstore/repository/tasks/downloads.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- django/pyproject.toml
- django/thunderstore/core/settings.py
🧰 Additional context used
🧬 Code graph analysis (1)
django/thunderstore/repository/tasks/downloads.py (1)
django/thunderstore/core/kafka.py (2)
KafkaTopics (10-11), send_kafka_message (14-16)
🪛 Flake8 (7.3.0)
django/thunderstore/core/kafka.py
[error] 4-4: 'typing.Union' imported but unused
(F401)
🪛 Ruff (0.14.0)
django/thunderstore/core/kafka.py
44-44: Do not catch blind exception: Exception
(BLE001)
52-52: Avoid specifying long messages outside the exception class
(TRY003)
55-55: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build docker image
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
Actionable comments posted: 0
♻️ Duplicate comments (3)
django/thunderstore/core/kafka.py (2)
24-33: Replace print() with logging and narrow exception handling. The bare Exception catch silently swallows all errors. Use Django's logging framework and catch specific exceptions. Apply this diff:

+import logging
+
+logger = logging.getLogger(__name__)
+
 @shared_task(
     queue=CeleryQueues.Kafka, name="thunderstore.core.kafka.send_kafka_message_async"
 )
 def send_kafka_message_async(
     topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None
 ):
     try:
         send_kafka_message(topic=topic, payload=payload, key=key)
-    except Exception as e:
-        print(f"Error sending Kafka message to topic {topic}: {e}")
+    except (TypeError, ValueError, RuntimeError) as e:
+        logger.error(f"Error sending Kafka message to topic {topic}: {e}")

36-63: Critical: Replace print() with logging, add delivery guarantees, and narrow exception handling. Current issues:
- print() statements (lines 50, 63) should use Django logging
- Bare Exception catch (line 62) is too broad
- No delivery callback to detect async failures
- No flush mechanism for graceful shutdown

Apply this diff:

+import logging
+from confluent_kafka import KafkaException, BufferError
+
+logger = logging.getLogger(__name__)
+
 class KafkaClient:
     def __init__(self, config: Dict[str, Any]):
         self._producer = Producer(config)
+
+    def flush(self, timeout: float = 10.0) -> int:
+        """Flush pending messages. Returns number of messages still in queue."""
+        return self._producer.flush(timeout)

     def send(
         self,
         topic: Union[KafkaTopic, str],
         payload: dict,
         key: Optional[str] = None,
     ):
+        def delivery_report(err, msg):
+            if err:
+                logger.error(f"Message delivery failed for topic {msg.topic()}: {err}")
+
         try:
             value_bytes = json.dumps(payload).encode("utf-8")
             key_bytes = key.encode("utf-8") if key else None
-        except TypeError as e:
-            print(f"Failed to serialize payload to JSON for topic {topic}: {e}")
+        except (TypeError, ValueError) as e:
+            logger.error(f"Failed to serialize payload to JSON for topic {topic}: {e}")
             return

         try:
             topic_str = topic.value if isinstance(topic, KafkaTopic) else topic
             self._producer.produce(
                 topic=topic_str,
                 value=value_bytes,
                 key=key_bytes,
+                callback=delivery_report,
             )
             self._producer.poll(0)
-        except Exception as e:
-            print("Error producing message in kafka: " + e.__str__())
+        except (KafkaException, BufferError) as e:
+            logger.error(f"Error producing message to Kafka topic {topic}: {e}")

django/thunderstore/repository/tasks/downloads.py (1)

26-34: Prevent double-counting on Kafka send failures. If send_kafka_message() raises after the DB commit completes, the Celery task will fail and retry, causing the download counter to increment twice. Wrap the Kafka send in a try/except block to make it best-effort. Apply this diff:

-    transaction.on_commit(
-        lambda: send_kafka_message(
-            topic=KafkaTopic.METRICS_DOWNLOADS,
-            payload={
-                "version_id": version_id,
-                "timestamp": timestamp,
-            },
-        )
-    )
+    def _emit_kafka_event():
+        try:
+            send_kafka_message(
+                topic=KafkaTopic.METRICS_DOWNLOADS,
+                payload={
+                    "version_id": version_id,
+                    "timestamp": timestamp,
+                },
+            )
+        except Exception:
+            # Best-effort metrics; never fail task post-commit
+            pass
+
+    transaction.on_commit(_emit_kafka_event)
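For reference, a minimal sketch of the transaction.on_commit contract this fix relies on (increment_counter and emit_metric are hypothetical names, not from the PR):

from django.db import transaction

def record_download(version_id: int) -> None:
    with transaction.atomic():
        increment_counter(version_id)  # hypothetical DB write
        # The callback runs only after the outermost atomic block commits,
        # and never runs if the transaction rolls back.
        transaction.on_commit(lambda: emit_metric(version_id))  # hypothetical emitter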
🧹 Nitpick comments (2)
.env.template (1)
66-70: Consider alphabetical ordering of Kafka variables. The dotenv-linter flags that KAFKA_CA_CERT should precede KAFKA_ENABLED alphabetically. While this is a minor consistency issue, maintaining alphabetical order improves readability and reduces merge conflicts.

Apply this diff to reorder:

-KAFKA_ENABLED=False
-KAFKA_BOOTSTRAP_SERVERS=
-KAFKA_USERNAME=
-KAFKA_PASSWORD=
-KAFKA_CA_CERT=
+KAFKA_BOOTSTRAP_SERVERS=
+KAFKA_CA_CERT=
+KAFKA_ENABLED=False
+KAFKA_PASSWORD=
+KAFKA_USERNAME=

django/thunderstore/core/kafka.py (1)

75-88: Simplify error messages and consider startup validation. The RuntimeError messages (lines 83, 86) are verbose. Consider custom exceptions or simpler messages.

Apply this diff:

 @lru_cache(maxsize=1)
 def get_kafka_client() -> Union[KafkaClient, DummyKafkaClient]:
     # Return dummy client if Kafka is disabled
     if not getattr(settings, "KAFKA_ENABLED", True):
         return DummyKafkaClient()

     config = getattr(settings, "KAFKA_CONFIG", None)
     if not config:
-        raise RuntimeError("KAFKA_CONFIG is not configured.")
+        raise RuntimeError("KAFKA_CONFIG missing")

     if not config.get("bootstrap.servers"):
-        raise RuntimeError("Kafka bootstrap servers are not configured.")
+        raise RuntimeError("bootstrap.servers not configured")

     return KafkaClient(config)

Additionally, consider validating Kafka configuration at startup rather than lazily in get_kafka_client() to fail fast.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- .env.template (1 hunks)
- django/thunderstore/core/kafka.py (1 hunks)
- django/thunderstore/core/settings.py (3 hunks)
- django/thunderstore/repository/tasks/downloads.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- django/thunderstore/core/settings.py
🧰 Additional context used
🧬 Code graph analysis (2)
django/thunderstore/core/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues (393-399)
django/thunderstore/repository/tasks/downloads.py (1)
django/thunderstore/core/kafka.py (2)
KafkaTopic (13-14), send_kafka_message (17-21)
🪛 dotenv-linter (3.3.0)
.env.template
[warning] 67-67: [UnorderedKey] The KAFKA_BOOTSTRAP_SERVERS key should go before the KAFKA_ENABLED key
(UnorderedKey)
[warning] 69-69: [UnorderedKey] The KAFKA_PASSWORD key should go before the KAFKA_USERNAME key
(UnorderedKey)
[warning] 70-70: [UnorderedKey] The KAFKA_CA_CERT key should go before the KAFKA_ENABLED key
(UnorderedKey)
🪛 Ruff (0.14.0)
django/thunderstore/core/kafka.py
32-32: Do not catch blind exception: Exception
(BLE001)
62-62: Do not catch blind exception: Exception
(BLE001)
83-83: Avoid specifying long messages outside the exception class
(TRY003)
86-86: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build docker image
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
🔇 Additional comments (3)
django/thunderstore/core/kafka.py (2)
1-11: LGTM - Clean imports. Imports are well-organized and appropriate for the Kafka integration.

13-14: LGTM - Topic enum is properly structured. Using the str, Enum pattern allows the enum to be used directly as a string while maintaining type safety.
django/thunderstore/repository/tasks/downloads.py (1)

7-7: LGTM - Import is correct. The import statement correctly brings in the required Kafka types and functions.
ae2574e to 78d0d77 (compare)
Actionable comments posted: 1
♻️ Duplicate comments (2)
django/thunderstore/core/kafka.py (2)
24-33: Use proper logging instead of print(). This issue was flagged in previous reviews: the bare Exception catch and print() statement should be replaced with Django's logging framework.

Apply the logging pattern from the previous review:

+import logging
+
+logger = logging.getLogger(__name__)
+
 @shared_task(
     queue=CeleryQueues.Kafka, name="thunderstore.core.kafka.send_kafka_message_async"
 )
 def send_kafka_message_async(
     topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None
 ):
     try:
         send_kafka_message(topic=topic, payload=payload, key=key)
-    except Exception as e:
-        print(f"Error sending Kafka message to topic {topic}: {e}")
+    except Exception as e:
+        logger.exception(f"Error sending Kafka message to topic {topic}")
+        raise

Note: Consider whether to raise the exception to mark the task as failed in Celery.
36-63: Replace print() with logging and add delivery guarantees. This is the same critical issue flagged in previous reviews. The error handling must use proper logging, catch specific exceptions, and provide delivery callbacks.

As per the previous review, apply these fixes:

+import logging
+from confluent_kafka import KafkaException, BufferError
+
+logger = logging.getLogger(__name__)
+
 class KafkaClient:
     def __init__(self, config: Dict[str, Any]):
         self._producer = Producer(config)

     def send(
         self,
         topic: Union[KafkaTopic, str],
         payload: dict,
         key: Optional[str] = None,
     ):
         try:
             value_bytes = json.dumps(payload).encode("utf-8")
             key_bytes = key.encode("utf-8") if key else None
-        except TypeError as e:
-            print(f"Failed to serialize payload to JSON for topic {topic}: {e}")
+        except (TypeError, ValueError) as e:
+            logger.error(f"Failed to serialize payload to JSON for topic {topic}: {e}")
             return

         try:
             topic_str = topic.value if isinstance(topic, KafkaTopic) else topic
             self._producer.produce(
                 topic=topic_str,
                 value=value_bytes,
                 key=key_bytes,
             )
             self._producer.poll(0)
-        except Exception as e:
-            print("Error producing message in kafka: " + e.__str__())
+        except (KafkaException, BufferError) as e:
+            logger.error(f"Error producing message to Kafka topic {topic}: {e}")

Additionally, add a flush() method for graceful shutdown:

def flush(self, timeout: int = 5):
    """Flush pending messages before shutdown."""
    self._producer.flush(timeout=timeout)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- django/thunderstore/core/kafka.py (1 hunks)
- django/thunderstore/core/settings.py (3 hunks)
- django/thunderstore/core/tests/test_celery.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- django/thunderstore/core/settings.py
🧰 Additional context used
🧬 Code graph analysis (1)
django/thunderstore/core/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues (393-399)
🪛 Ruff (0.14.0)
django/thunderstore/core/kafka.py
32-32: Do not catch blind exception: Exception
(BLE001)
62-62: Do not catch blind exception: Exception
(BLE001)
83-83: Avoid specifying long messages outside the exception class
(TRY003)
86-86: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (5)
django/thunderstore/core/kafka.py (4)
1-11: Imports look good. All necessary dependencies are present for the Kafka integration.

13-14: LGTM. The str + Enum pattern ensures topic names are type-safe and serializable.

17-21: LGTM. Clean delegation pattern for synchronous Kafka sends.

66-72: LGTM. Appropriate no-op implementation for when Kafka is disabled.
django/thunderstore/core/tests/test_celery.py (1)

44-44: LGTM. Correctly registers the new Kafka Celery task in the known tasks list.
Added ValueError exception, made KAFKA_ENABLED false by default.
Actionable comments posted: 0
♻️ Duplicate comments (2)
django/thunderstore/core/kafka.py (2)
24-33: Replace print() with proper logging. The async task still uses print() for error handling. As noted in previous reviews, use Django's logging framework instead.

+import logging
+
+logger = logging.getLogger(__name__)
+
 @shared_task(
     queue=CeleryQueues.Kafka, name="thunderstore.core.kafka.send_kafka_message_async"
 )
 def send_kafka_message_async(
     topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None
 ):
     try:
         send_kafka_message(topic=topic, payload=payload, key=key)
     except Exception as e:
-        print(f"Error sending Kafka message to topic {topic}: {e}")
+        logger.error(f"Error sending Kafka message to topic {topic}: {e}")

36-63: Replace print() with logging and narrow exception handling. As previously noted, the error handling needs improvement:
- Lines 50, 63: use logger.error() instead of print()
- Line 62: catch specific Kafka exceptions instead of bare Exception
- Consider adding a delivery callback to produce() for async failure detection
- Consider exposing a flush() method for graceful shutdown

+import logging
+from confluent_kafka import KafkaException, BufferError
+
+logger = logging.getLogger(__name__)
+
 class KafkaClient:
     def __init__(self, config: Dict[str, Any]):
         self._producer = Producer(config)

     def send(
         self,
         topic: Union[KafkaTopic, str],
         payload: dict,
         key: Optional[str] = None,
     ):
         try:
             value_bytes = json.dumps(payload).encode("utf-8")
             key_bytes = key.encode("utf-8") if key else None
         except (TypeError, ValueError) as e:
-            print(f"Failed to serialize payload to JSON for topic {topic}: {e}")
+            logger.error(f"Failed to serialize payload to JSON for topic {topic}: {e}")
             return

         try:
             topic_str = topic.value if isinstance(topic, KafkaTopic) else topic
             self._producer.produce(
                 topic=topic_str,
                 value=value_bytes,
                 key=key_bytes,
             )
             self._producer.poll(0)
-        except Exception as e:
-            print("Error producing message in kafka: " + e.__str__())
+        except (KafkaException, BufferError) as e:
+            logger.error(f"Error producing message to Kafka topic {topic}: {e}")
🧹 Nitpick comments (1)
django/thunderstore/core/tests/test_kafka.py (1)
152-152: Escape regex metacharacters in pytest.raises match patterns. The error messages contain periods, which are regex metacharacters. Use raw strings or re.escape() to avoid unintended matching.

-    with pytest.raises(RuntimeError, match="KAFKA_CONFIG is not configured."):
+    with pytest.raises(RuntimeError, match=r"KAFKA_CONFIG is not configured\."):
         get_kafka_client()

     with pytest.raises(
-        RuntimeError, match="Kafka bootstrap servers are not configured."
+        RuntimeError, match=r"Kafka bootstrap servers are not configured\."
     ):

Also applies to: 159-159
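For comparison, the re.escape() alternative mentioned above handles any future punctuation automatically (get_kafka_client is the function under test in this PR):

import re
import pytest

with pytest.raises(RuntimeError, match=re.escape("KAFKA_CONFIG is not configured.")):
    get_kafka_client()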
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- django/thunderstore/core/kafka.py (1 hunks)
- django/thunderstore/core/tests/test_kafka.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
django/thunderstore/core/tests/test_kafka.py (1)
django/thunderstore/core/kafka.py (6)
DummyKafkaClient (66-72), KafkaClient (36-63), KafkaTopic (13-14), get_kafka_client (76-88), send_kafka_message (17-21), send_kafka_message_async (27-33)
django/thunderstore/core/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues (393-399)
🪛 Ruff (0.14.0)
django/thunderstore/core/tests/test_kafka.py
135-135: Unused method argument: method
(ARG002)
152-152: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
159-159: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
django/thunderstore/core/kafka.py
32-32: Do not catch blind exception: Exception
(BLE001)
62-62: Do not catch blind exception: Exception
(BLE001)
83-83: Avoid specifying long messages outside the exception class
(TRY003)
86-86: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (2)
django/thunderstore/core/tests/test_kafka.py (1)
17-227: Excellent test coverage for the Kafka integration. The test suite thoroughly validates all components: enum values, client initialization and behavior, error handling, configuration validation, and the async wrapper. The setup_method pattern correctly clears the lru_cache to ensure test isolation.
django/thunderstore/core/kafka.py (1)

75-88: Good fix: Kafka is now opt-in by default. The KAFKA_ENABLED default has been changed to False (line 78), addressing the previous concern about Kafka being enabled by default. This is the safer approach.
7156f93 to af640d4 (compare)
Actionable comments posted: 5
🧹 Nitpick comments (1)
django/thunderstore/core/tests/test_kafka.py (1)
199-227: Consider removing the unnecessary @pytest.mark.django_db decorator. The send_kafka_message_async function doesn't perform database operations, so the @pytest.mark.django_db decorator is likely unnecessary. If database access isn't required, removing it would speed up test execution. Otherwise, the tests correctly verify delegation and exception handling.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- django/thunderstore/core/tests/test_kafka.py (1 hunks)
- django/thunderstore/repository/tests/test_download_metrics.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
django/thunderstore/repository/tests/test_download_metrics.py (1)
django/thunderstore/core/kafka.py (1)
KafkaTopic (13-14)
django/thunderstore/core/tests/test_kafka.py (1)
django/thunderstore/core/kafka.py (6)
DummyKafkaClient (66-72), KafkaClient (36-63), KafkaTopic (13-14), get_kafka_client (76-88), send_kafka_message (17-21), send_kafka_message_async (27-33)
🪛 Flake8 (7.3.0)
django/thunderstore/repository/tests/test_download_metrics.py
[error] 1-1: 'datetime.timedelta' imported but unused
(F401)
[error] 6-6: 'django.conf.settings' imported but unused
(F401)
🪛 Ruff (0.14.0)
django/thunderstore/core/tests/test_kafka.py
135-135: Unused method argument: method
(ARG002)
152-152: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
159-159: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (10)
django/thunderstore/core/tests/test_kafka.py (10)
17-21: LGTM! The enum tests correctly verify both the enum member value and the .value attribute.

24-36: LGTM! Good fixture design with proper patching, and the init test correctly verifies the producer assignment.

38-71: LGTM! These tests thoroughly verify the send method behavior with and without keys, including proper JSON encoding and producer interaction.

73-88: LGTM! Correctly tests that string topics are passed through directly without .value access.

90-104: LGTM! Good coverage of the serialization error path, verifying that produce/poll are not called when payload serialization fails.

106-121: LGTM! Correctly verifies that producer exceptions are caught and handled gracefully. The test confirms poll is not called after an exception in produce.

124-131: LGTM! Simple and effective test confirming the dummy client's no-op behavior.

135-141: Cache clearing is correct; consider suppressing the linter warning. The method parameter is unused but required by pytest's setup_method signature. The cache clearing is essential for test isolation. Consider adding a comment or suppression if the unused arg warning is noisy:

def setup_method(self, method):  # noqa: ARG002

163-177: LGTM! The test correctly verifies that KafkaClient is instantiated with the proper config when Kafka is enabled.

180-196: LGTM! Clean test that correctly verifies delegation to the client's send method.
Two outdated review threads on django/thunderstore/core/analytics/tests/test_download_metrics.py were marked resolved.
Make sure kafka is disabled by default
Actionable comments posted: 1
🧹 Nitpick comments (1)
django/thunderstore/core/settings.py (1)
598-606: Consider validating required Kafka configuration when enabled. While the configuration structure is sound, there's no validation ensuring required fields (bootstrap servers, credentials) are set when KAFKA_ENABLED is True. This could lead to runtime errors when the Kafka client is initialized.

Consider adding validation after the configuration is defined:

KAFKA_CONFIG = {
    "bootstrap.servers": env.str("KAFKA_BOOTSTRAP_SERVERS"),
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-256",
    "sasl.username": env.str("KAFKA_USERNAME"),
    "sasl.password": env.str("KAFKA_PASSWORD"),
    "ssl.ca.location": env.str("KAFKA_CA_CERT"),  # Note: parameter name may need correction
    "client.id": "thunderstore-kafka",
}

if KAFKA_ENABLED:
    required_kafka_fields = [
        "KAFKA_BOOTSTRAP_SERVERS",
        "KAFKA_USERNAME",
        "KAFKA_PASSWORD",
        "KAFKA_CA_CERT",
    ]
    missing = [f for f in required_kafka_fields if not env.str(f)]
    if missing:
        raise RuntimeError(
            f"Kafka is enabled but missing required config: {', '.join(missing)}"
        )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- django/thunderstore/core/settings.py (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build docker image
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
🔇 Additional comments (2)
django/thunderstore/core/settings.py (2)
139-143: LGTM! Past review concern addressed. The Kafka environment variables are properly declared with sensible defaults. KAFKA_ENABLED correctly defaults to False, ensuring Kafka remains disabled until explicitly configured.

399-399: LGTM! The Kafka queue constant follows the existing naming convention and integrates cleanly with the CeleryQueues class.
Reasonable, good work. Also requesting a review from @Oksamies as he has a better eye for this sorta stuff.
fa240e4 to faf0703 (compare)
Actionable comments posted: 1
♻️ Duplicate comments (3)
django/thunderstore/core/analytics/tests/test_kafka.py (1)
165-173: Use raw strings for regex patterns in pytest.raises match=. Prevents unintended escapes (RUF043).

-    with pytest.raises(RuntimeError, match="KAFKA_CONFIG is not configured."):
+    with pytest.raises(RuntimeError, match=r"KAFKA_CONFIG is not configured\."):
@@
-    with pytest.raises(
-        RuntimeError, match="Kafka bootstrap servers are not configured."
-    ):
+    with pytest.raises(
+        RuntimeError, match=r"Kafka bootstrap servers are not configured\."
+    ):

django/thunderstore/core/analytics/signals.py (1)

18-28: Enqueue Kafka sends with Celery and pass topic strings (avoid Enum JSON issues). Call the task with .delay(...) inside on_commit to avoid blocking/double-counting, and pass KafkaTopic.*.value so the Celery JSON serializer is happy.

-    transaction.on_commit(
-        lambda: send_kafka_message_async(
-            topic=KafkaTopic.PACKAGE_DOWNLOADED,
+    transaction.on_commit(
+        lambda: send_kafka_message_async.delay(
+            topic=KafkaTopic.PACKAGE_DOWNLOADED.value,
             payload={
                 "id": instance.id,
                 "version_id": instance.version_id,
                 "timestamp": instance.timestamp.isoformat(),
             },
         )
     )
@@
-    transaction.on_commit(
-        lambda: send_kafka_message_async(
-            topic=KafkaTopic.PACKAGE_UPDATED,
+    transaction.on_commit(
+        lambda: send_kafka_message_async.delay(
+            topic=KafkaTopic.PACKAGE_UPDATED.value,
             payload={
                 "package_id": instance.id,
                 "is_active": instance.is_active,
                 "owner": instance.owner.name,
                 "name": instance.name,
                 "date_created": instance.date_created.isoformat(),
                 "date_updated": instance.date_updated.isoformat(),
                 "is_deprecated": instance.is_deprecated,
                 "is_pinned": instance.is_pinned,
             },
         )
     )
@@
-    transaction.on_commit(
-        lambda: send_kafka_message_async(
-            topic=KafkaTopic.PACKAGE_VERSION_UPDATED,
+    transaction.on_commit(
+        lambda: send_kafka_message_async.delay(
+            topic=KafkaTopic.PACKAGE_VERSION_UPDATED.value,
             payload={
                 "id": instance.id,
                 "is_active": instance.is_active,
                 "owner": instance.package.owner.name,
                 "name": instance.name,
                 "version_number": instance.version_number,
                 "package_id": instance.package_id,
                 "downloads": instance.downloads,
                 "date_created": instance.date_created.isoformat(),
                 "file_size": instance.file_size,
             },
         )
     )
@@
-    transaction.on_commit(
-        lambda: send_kafka_message_async(
-            topic=KafkaTopic.PACKAGE_LISTING_UPDATED,
+    transaction.on_commit(
+        lambda: send_kafka_message_async.delay(
+            topic=KafkaTopic.PACKAGE_LISTING_UPDATED.value,
             payload={
                 "id": instance.id,
                 "has_nsfw_content": instance.has_nsfw_content,
                 "package_id": instance.package_id,
                 "datetime_created": instance.datetime_created.isoformat(),
                 "datetime_updated": instance.datetime_updated.isoformat(),
                 "review_status": instance.review_status,
             },
         )
     )
@@
-    transaction.on_commit(
-        lambda: send_kafka_message_async(
-            topic=KafkaTopic.COMMUNITY_UPDATED,
+    transaction.on_commit(
+        lambda: send_kafka_message_async.delay(
+            topic=KafkaTopic.COMMUNITY_UPDATED.value,
             payload={
                 "id": instance.id,
                 "identifier": instance.identifier,
                 "name": instance.name,
                 "datetime_created": instance.datetime_created.isoformat(),
                 "datetime_updated": instance.datetime_updated.isoformat(),
             },
         )
     )

Also applies to: 36-50, 58-73, 81-93, 101-112

django/thunderstore/repository/tasks/downloads.py (1)

7-7: Remove unused Kafka imports (lint blocker). Kafka events are emitted via signals; these imports are unused.

Apply:

-from thunderstore.core.analytics.kafka import KafkaTopic, send_kafka_message
🧹 Nitpick comments (3)
django/thunderstore/core/analytics/tests/test_kafka.py (1)
148-156: Silence the unused method arg in setup_method. Rename it to _method to satisfy Ruff ARG002.

-    def setup_method(self, method):
+    def setup_method(self, _method):

django/thunderstore/core/analytics/signals.py (1)

1-6: Drop the unused import and mark unused signal args.
- Remove the unused timezone import.
- For handlers that don't use sender/created/kwargs, prefix them with _ to satisfy Ruff ARG001.

-from django.utils import timezone
@@
-def package_post_save(sender, instance, created, **kwargs):
+def package_post_save(_sender, instance, _created, **_kwargs):
@@
-def package_version_post_save(sender, instance, created, **kwargs):
+def package_version_post_save(_sender, instance, _created, **_kwargs):
@@
-def package_listing_post_save(sender, instance, created, **kwargs):
+def package_listing_post_save(_sender, instance, _created, **_kwargs):
@@
-def community_post_save(sender, instance, created, **kwargs):
+def community_post_save(_sender, instance, _created, **_kwargs):

django/thunderstore/core/analytics/kafka.py (1)

6-8: Guard the confluent_kafka import to improve portability. Make the module import-safe when the lib isn't installed (e.g., local/CI without Kafka). Raise clearly if Kafka is enabled but Producer is missing.

-from confluent_kafka import Producer
+try:
+    from confluent_kafka import Producer  # type: ignore
+except Exception:  # pragma: no cover
+    Producer = None  # type: ignore
@@
 class KafkaClient:
     def __init__(self, config: Dict[str, Any]):
-        self._producer = Producer(config)
+        if Producer is None:
+            raise RuntimeError("confluent_kafka is not installed")
+        self._producer = Producer(config)
@@
 def get_kafka_client() -> Union[KafkaClient, DummyKafkaClient]:
@@
-    return KafkaClient(config)
+    # Optional: validate SASL when requested
+    if config.get("security.protocol") == "SASL_SSL":
+        if not config.get("sasl.username"):
+            raise RuntimeError("SASL username is required when using SASL_SSL.")
+        if not config.get("sasl.password"):
+            raise RuntimeError("SASL password is required when using SASL_SSL.")
+    return KafkaClient(config)

Also applies to: 38-41, 70-83
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- django/thunderstore/core/analytics/__init__.py (1 hunks)
- django/thunderstore/core/analytics/kafka.py (1 hunks)
- django/thunderstore/core/analytics/signals.py (1 hunks)
- django/thunderstore/core/analytics/tests/test_kafka.py (1 hunks)
- django/thunderstore/repository/tasks/downloads.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
django/thunderstore/core/analytics/tests/test_kafka.py (1)
django/thunderstore/core/analytics/kafka.py (8)
DummyKafkaClient (61-67), KafkaClient (38-58), KafkaTopic (13-18), get_kafka_client (71-83), send_kafka_message (21-25), send_kafka_message_async (32-35), send (42-58), send (64-67)
django/thunderstore/core/analytics/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues (393-399)
django/thunderstore/repository/tasks/downloads.py (1)
django/thunderstore/core/analytics/kafka.py (2)
KafkaTopic (13-18), send_kafka_message (21-25)
django/thunderstore/core/analytics/signals.py (4)
django/thunderstore/community/models/package_listing.py (1)
PackageListing (62-437)
django/thunderstore/core/analytics/kafka.py (2)
KafkaTopic (13-18), send_kafka_message_async (32-35)
django/thunderstore/repository/models/package.py (1)
Package (49-445)
django/thunderstore/repository/models/package_version.py (2)
owner (230-231), is_deprecated (234-235)
🪛 Flake8 (7.3.0)
django/thunderstore/core/analytics/__init__.py
[error] 1-1: 'thunderstore.core.analytics.kafka' imported but unused
(F401)
[error] 1-1: 'thunderstore.core.analytics.signals' imported but unused
(F401)
django/thunderstore/repository/tasks/downloads.py
[error] 7-7: 'thunderstore.core.analytics.kafka.KafkaTopic' imported but unused
(F401)
[error] 7-7: 'thunderstore.core.analytics.kafka.send_kafka_message' imported but unused
(F401)
django/thunderstore/core/analytics/signals.py
[error] 3-3: 'django.utils.timezone' imported but unused
(F401)
🪛 Ruff (0.14.0)
django/thunderstore/core/analytics/tests/test_kafka.py
148-148: Unused method argument: method
(ARG002)
165-165: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
172-172: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
django/thunderstore/core/analytics/kafka.py
78-78: Avoid specifying long messages outside the exception class
(TRY003)
81-81: Avoid specifying long messages outside the exception class
(TRY003)
django/thunderstore/core/analytics/signals.py
13-13: Unused function argument: sender
(ARG001)
13-13: Unused function argument: kwargs
(ARG001)
31-31: Unused function argument: sender
(ARG001)
31-31: Unused function argument: created
(ARG001)
31-31: Unused function argument: kwargs
(ARG001)
53-53: Unused function argument: sender
(ARG001)
53-53: Unused function argument: created
(ARG001)
53-53: Unused function argument: kwargs
(ARG001)
76-76: Unused function argument: sender
(ARG001)
76-76: Unused function argument: created
(ARG001)
76-76: Unused function argument: kwargs
(ARG001)
96-96: Unused function argument: sender
(ARG001)
96-96: Unused function argument: created
(ARG001)
96-96: Unused function argument: kwargs
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
🔇 Additional comments (1)
django/thunderstore/core/analytics/signals.py (1)
81-93: No field name mismatches found; attributes exist and are correctly named. Both PackageListing and Community inherit from TimestampMixin, which defines the datetime_created and datetime_updated fields. The signal code correctly uses these attribute names.
faf0703 to 64b2678 (compare)
@lru_cache(maxsize=1)
def get_kafka_client() -> Union[KafkaClient, DummyKafkaClient]:
    # Return dummy client if Kafka is disabled
    if not getattr(settings, "KAFKA_ENABLED", False):
        return DummyKafkaClient()

    config = getattr(settings, "KAFKA_CONFIG", None)
    if not config:
        raise RuntimeError("KAFKA_CONFIG is not configured.")

    if not config.get("bootstrap.servers"):
        raise RuntimeError("Kafka bootstrap servers are not configured.")

    return KafkaClient(config)
Resource Leak: The Kafka producer is cached via @lru_cache(maxsize=1) but never properly closed. The producer maintains network connections and internal buffers that should be cleaned up on shutdown. This will cause:
- Resource leaks on application restarts
- Potential message loss when the process terminates
- TCP connections left in TIME_WAIT state
The producer should be properly managed as a singleton with cleanup:
# Remove @lru_cache and use a proper singleton pattern with __del__ or
# register an atexit handler to call producer.flush() and producer.close()
import atexit

class KafkaClient:
    def __init__(self, config: Dict[str, Any]):
        self._producer = Producer(config)
        atexit.register(self.close)

    def close(self):
        if hasattr(self, '_producer'):
            self._producer.flush()

Spotted by Graphite Agent
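A variant that keeps the @lru_cache factory while still flushing at exit (a sketch, assuming the KafkaClient.close() shown above; not the PR's code):

import atexit
from functools import lru_cache

@lru_cache(maxsize=1)
def get_kafka_client() -> KafkaClient:
    client = KafkaClient(settings.KAFKA_CONFIG)
    # Registered once, since lru_cache guarantees a single instance per process.
    atexit.register(client.close)
    return client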
Is this valid? (the AI scanning warning)
Actionable comments posted: 0
♻️ Duplicate comments (2)
django/thunderstore/core/analytics/tests/test_kafka.py (2)
165-165: Use a raw string for the regex pattern. The period is a metacharacter; use a raw string to make the pattern explicit.

Apply this diff:

-    with pytest.raises(RuntimeError, match="KAFKA_CONFIG is not configured."):
+    with pytest.raises(RuntimeError, match=r"KAFKA_CONFIG is not configured\."):
         get_kafka_client()

172-172: Use a raw string for the regex pattern. The period is a metacharacter; use a raw string to make the pattern explicit.

Apply this diff:

     with pytest.raises(
-        RuntimeError, match="Kafka bootstrap servers are not configured."
+        RuntimeError, match=r"Kafka bootstrap servers are not configured\."
     ):
🧹 Nitpick comments (4)
django/thunderstore/core/analytics/__init__.py (1)
1-1: Add __all__ to clarify the public interface and silence F401. The re-export pattern is idiomatic, but adding __all__ makes the public API explicit and resolves the linter warning.

Apply this diff:

 from thunderstore.core.analytics import kafka, signals
+
+__all__ = ["kafka", "signals"]

django/thunderstore/core/analytics/signals.py (1)

3-3: Remove the unused import. The timezone module is imported but never used.

Apply this diff:

 from django.db import transaction
 from django.db.models import signals
-from django.utils import timezone

django/thunderstore/core/analytics/kafka.py (2)

42-58: Consider the error handling strategy for resilience. The send() method lets JSON serialization and producer exceptions propagate. While tests confirm this behavior, consider whether you want:
- Current approach (fail-fast): exceptions propagate → Celery retries → alerting. Good for catching config/schema issues early.
- Resilient approach: catch exceptions, log with structured logging, and return gracefully to prevent task queue backups.

If resilience is preferred, apply:

+import logging
+
+logger = logging.getLogger(__name__)
+
 class KafkaClient:
     def __init__(self, config: Dict[str, Any]):
         self._producer = Producer(config)

     def send(
         self,
         topic: Union[KafkaTopic, str],
         payload: dict,
         key: Optional[str] = None,
     ):
-        value_bytes = json.dumps(payload).encode("utf-8")
-        key_bytes = key.encode("utf-8") if key else None
+        try:
+            value_bytes = json.dumps(payload).encode("utf-8")
+        except (TypeError, ValueError):
+            logger.exception("Kafka payload JSON serialization failed")
+            return
+
+        key_bytes = key.encode("utf-8") if key else None

         topic_str = topic.value if isinstance(topic, KafkaTopic) else topic

-        self._producer.produce(
-            topic=topic_str,
-            value=value_bytes,
-            key=key_bytes,
-        )
-
-        self._producer.poll(0)
+        try:
+            self._producer.produce(
+                topic=topic_str,
+                value=value_bytes,
+                key=key_bytes,
+            )
+            self._producer.poll(0)
+        except Exception:
+            logger.exception("Kafka produce failed")

Note: this would require updating tests to mock the logger. Which approach do you prefer? I can help refactor if you choose the resilient approach.

32-35: Consider adding error handling to prevent task failures. As a Celery task, uncaught exceptions will trigger retries. If Kafka is temporarily unavailable, this could cause task queue backups. Consider catching exceptions and logging them:

+import logging
+
+logger = logging.getLogger(__name__)
+
 @shared_task(
     queue=CeleryQueues.Analytics,
     name="thunderstore.core.analytics.send_kafka_message_async",
 )
 def send_kafka_message_async(
     topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None
 ):
-    send_kafka_message(topic=topic, payload=payload, key=key)
+    try:
+        send_kafka_message(topic=topic, payload=payload, key=key)
+    except Exception:
+        logger.exception(f"Failed to send Kafka message to {topic}")

This ensures the task completes even if Kafka operations fail, while still logging for monitoring.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
django/thunderstore/core/analytics/__init__.py(1 hunks)django/thunderstore/core/analytics/kafka.py(1 hunks)django/thunderstore/core/analytics/signals.py(1 hunks)django/thunderstore/core/analytics/tests/test_kafka.py(1 hunks)docker-compose.yml(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
django/thunderstore/core/analytics/tests/test_kafka.py (1)
django/thunderstore/core/analytics/kafka.py (8)
DummyKafkaClient (61-67), KafkaClient (38-58), KafkaTopic (13-18), get_kafka_client (71-83), send_kafka_message (21-25), send_kafka_message_async (32-35), send (42-58), send (64-67)
django/thunderstore/core/analytics/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues (393-399)
django/thunderstore/core/analytics/signals.py (3)
django/thunderstore/community/models/package_listing.py (1)
PackageListing (62-437)
django/thunderstore/core/analytics/kafka.py (2)
KafkaTopic (13-18), send_kafka_message_async (32-35)
django/thunderstore/repository/models/package.py (2)
Package (49-445), downloads (195-197)
🪛 Checkov (3.2.334)
docker-compose.yml
[medium] 12-13: Basic Auth Credentials
(CKV_SECRET_4)
🪛 Flake8 (7.3.0)
django/thunderstore/core/analytics/__init__.py
[error] 1-1: 'thunderstore.core.analytics.kafka' imported but unused
(F401)
[error] 1-1: 'thunderstore.core.analytics.signals' imported but unused
(F401)
django/thunderstore/core/analytics/signals.py
[error] 3-3: 'django.utils.timezone' imported but unused
(F401)
🪛 Ruff (0.14.0)
django/thunderstore/core/analytics/tests/test_kafka.py
148-148: Unused method argument: method
(ARG002)
165-165: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
172-172: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
django/thunderstore/core/analytics/kafka.py
78-78: Avoid specifying long messages outside the exception class
(TRY003)
81-81: Avoid specifying long messages outside the exception class
(TRY003)
django/thunderstore/core/analytics/signals.py
11-11: Unused function argument: sender
(ARG001)
11-11: Unused function argument: kwargs
(ARG001)
29-29: Unused function argument: sender
(ARG001)
29-29: Unused function argument: created
(ARG001)
29-29: Unused function argument: kwargs
(ARG001)
51-51: Unused function argument: sender
(ARG001)
51-51: Unused function argument: created
(ARG001)
51-51: Unused function argument: kwargs
(ARG001)
74-74: Unused function argument: sender
(ARG001)
74-74: Unused function argument: created
(ARG001)
74-74: Unused function argument: kwargs
(ARG001)
94-94: Unused function argument: sender
(ARG001)
94-94: Unused function argument: created
(ARG001)
94-94: Unused function argument: kwargs
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (8)
docker-compose.yml (1)
13-13: LGTM! The addition of the analytics queue aligns with the new Kafka analytics infrastructure and the CeleryQueues.Analytics constant in settings.py.
django/thunderstore/core/analytics/tests/test_kafka.py (1)

20-21: Tests correctly use the PACKAGE_DOWNLOADED enum. Note: a past review comment incorrectly flagged these tests as using a non-existent enum PACKAGE_DOWNLOADED. The enum is correctly defined in kafka.py lines 12-18, and the tests are accurate.
django/thunderstore/core/analytics/kafka.py (2)

70-83: LGTM! Clean fallback and validation logic. The cached factory correctly returns DummyKafkaClient when disabled, validates required config, and provides clear error messages.

13-18: LGTM! Well-defined topic enum. The KafkaTopic enum provides type-safe topic references aligned with the domain events.
django/thunderstore/core/analytics/signals.py (4)

42-43: No changes needed. The Package model defines both date_created and date_updated as DateTimeField(auto_now_add=True) without null=True, confirming they are non-nullable. The .isoformat() calls are safe.

67-67: No issue found. The PackageVersion.date_created field is non-nullable. The field is defined as models.DateTimeField(auto_now_add=True) with no null=True parameter, making it non-nullable. The .isoformat() call is safe.

106-107: No issues found. The datetime_created and datetime_updated fields are defined in TimestampMixin as DateTimeField(auto_now_add=True) and DateTimeField(auto_now=True) respectively, which means they're non-nullable and always populated. The .isoformat() calls are safe.

86-87: No issues found; the datetime fields are non-nullable. PackageListing inherits from TimestampMixin, which defines datetime_created and datetime_updated as models.DateTimeField(auto_now_add=True) and models.DateTimeField(auto_now=True) respectively. These fields are non-nullable by default and automatically populated by Django, making the .isoformat() calls safe.
64b2678 to b9c2462 (compare)
Actionable comments posted: 1
♻️ Duplicate comments (2)
django/thunderstore/ts_analytics/tests/test_kafka.py (1)
165-174: Use raw strings for regex patterns in pytest.raises match=. Prevents escape ambiguities and satisfies Ruff RUF043.

-    with pytest.raises(RuntimeError, match="KAFKA_CONFIG is not configured."):
+    with pytest.raises(RuntimeError, match=r"KAFKA_CONFIG is not configured\."):
@@
-    with pytest.raises(
-        RuntimeError, match="Kafka bootstrap servers are not configured."
-    ):
+    with pytest.raises(
+        RuntimeError, match=r"Kafka bootstrap servers are not configured\."
+    ):

django/thunderstore/core/settings.py (1)

599-607: Wrong CA config key for Confluent Kafka; use ssl.ca.location for file paths. Current code passes a path/string to ssl.ca.pem, which expects in-memory PEM. For file paths, use ssl.ca.location or omit it if not needed.

-    "ssl.ca.pem": env.str("KAFKA_CA_CERT"),
+    "ssl.ca.location": env.str("KAFKA_CA_CERT"),
🧹 Nitpick comments (9)
docker-compose.yml (1)
12-13: Avoid hardcoding broker credentials in the URL (even in dev).Use Compose var substitution so creds come from the RabbitMQ env you already define.
- CELERY_BROKER_URL: "pyamqp://django:django@rabbitmq/django" + CELERY_BROKER_URL: "pyamqp://${RABBITMQ_DEFAULT_USER:-django}:${RABBITMQ_DEFAULT_PASS:-django}@rabbitmq/${RABBITMQ_DEFAULT_VHOST:-django}"django/thunderstore/ts_analytics/tests/test_kafka.py (1)
148-155: Silence ARG002: mark setup_method arg as intentionally unused.- def setup_method(self, method): + def setup_method(self, _method):django/thunderstore/ts_analytics/signals.py (5)
6-21: Add a partition key for better ordering and locality.Consider keying downloads by version to preserve ordering on a partition.
- lambda: send_kafka_message_async.delay( + lambda: send_kafka_message_async.delay( topic=KafkaTopic.PACKAGE_DOWNLOADED, payload={ "id": instance.id, "version_id": instance.version_id, "timestamp": instance.timestamp.isoformat(), }, + key=str(instance.version_id), )
24-43: Minor: reduce linter noise for unused signal args.Rename unused args to underscored to satisfy linters without changing behavior.
-def package_post_save(sender, instance, created, **kwargs): +def package_post_save(_sender, instance, _created, **_kwargs):Apply similarly to other handlers below.
51-66: Add partitioning key to version updates.Key by package_id or version_id for stable ordering.
- lambda: send_kafka_message_async.delay( + lambda: send_kafka_message_async.delay( topic=KafkaTopic.PACKAGE_VERSION_UPDATED, payload={ "id": instance.id, "is_active": instance.is_active, "owner": instance.package.owner.name, "name": instance.name, "version_number": instance.version_number, "package_id": instance.package_id, "downloads": instance.downloads, "date_created": instance.date_created.isoformat(), "file_size": instance.file_size, }, + key=str(instance.package_id), )
74-86: Add partitioning key to listing updates.- lambda: send_kafka_message_async.delay( + lambda: send_kafka_message_async.delay( topic=KafkaTopic.PACKAGE_LISTING_UPDATED, payload={ "id": instance.id, "has_nsfw_content": instance.has_nsfw_content, "package_id": instance.package_id, "datetime_created": instance.datetime_created.isoformat(), "datetime_updated": instance.datetime_updated.isoformat(), "review_status": instance.review_status, }, + key=str(instance.package_id), )
94-105: Add partitioning key to community updates.- lambda: send_kafka_message_async.delay( + lambda: send_kafka_message_async.delay( topic=KafkaTopic.COMMUNITY_UPDATED, payload={ "id": instance.id, "identifier": instance.identifier, "name": instance.name, "datetime_created": instance.datetime_created.isoformat(), "datetime_updated": instance.datetime_updated.isoformat(), }, + key=str(instance.id), )django/thunderstore/ts_analytics/kafka.py (2)
38-41: Flush producer on shutdown to avoid message loss.Register an atexit handler to flush the producer buffers.
+import atexit @@ class KafkaClient: def __init__(self, config: Dict[str, Any]): self._producer = Producer(config) + atexit.register(self.close) + + def close(self): + try: + # Attempt graceful flush; non-blocking poll is already used per send + self._producer.flush() + except Exception: + pass
70-83: Validate SASL creds when SASL_SSL is configured.

Prevents confusing runtime errors due to missing auth while enabled.

```diff
     if not config.get("bootstrap.servers"):
         raise RuntimeError("Kafka bootstrap servers are not configured.")

+    # Basic auth validation for SASL_SSL
+    if config.get("security.protocol") == "SASL_SSL":
+        if not config.get("sasl.username"):
+            raise RuntimeError("SASL username is required when using SASL_SSL.")
+        if not config.get("sasl.password"):
+            raise RuntimeError("SASL password is required when using SASL_SSL.")
+
     return KafkaClient(config)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- django/thunderstore/core/settings.py (4 hunks)
- django/thunderstore/core/tests/test_celery.py (1 hunks)
- django/thunderstore/ts_analytics/__init__.py (1 hunks)
- django/thunderstore/ts_analytics/apps.py (1 hunks)
- django/thunderstore/ts_analytics/kafka.py (1 hunks)
- django/thunderstore/ts_analytics/signals.py (1 hunks)
- django/thunderstore/ts_analytics/tests/test_kafka.py (1 hunks)
- docker-compose.yml (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- django/thunderstore/ts_analytics/__init__.py
🚧 Files skipped from review as they are similar to previous changes (1)
- django/thunderstore/core/tests/test_celery.py
🧰 Additional context used
🧬 Code graph analysis (4)
django/thunderstore/ts_analytics/signals.py (4)
django/thunderstore/ts_analytics/kafka.py (2)
KafkaTopic(13-18)send_kafka_message_async(32-35)django/thunderstore/repository/models/package_version.py (2)
owner(230-231)is_deprecated(234-235)django/thunderstore/repository/models/package.py (2)
version_number(212-213)downloads(195-197)django/thunderstore/repository/admin/package.py (1)
file_size(148-149)
django/thunderstore/ts_analytics/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues(394-400)
django/thunderstore/ts_analytics/apps.py (3)
django/thunderstore/community/models/package_listing.py (1)
PackageListing(62-437)django/thunderstore/repository/models/package.py (1)
Package(49-445)django/thunderstore/ts_analytics/signals.py (5)
community_post_save(89-105)package_listing_post_save(69-86)package_post_save(24-43)package_version_download_event_post_save(6-21)package_version_post_save(46-66)
django/thunderstore/ts_analytics/tests/test_kafka.py (1)
django/thunderstore/ts_analytics/kafka.py (8)
DummyKafkaClient(61-67)KafkaClient(38-58)KafkaTopic(13-18)get_kafka_client(71-83)send_kafka_message(21-25)send_kafka_message_async(32-35)send(42-58)send(64-67)
🪛 Checkov (3.2.334)
docker-compose.yml
[medium] 12-13: Basic Auth Credentials
(CKV_SECRET_4)
🪛 Ruff (0.14.1)
django/thunderstore/ts_analytics/signals.py
6-6: Unused function argument: sender
(ARG001)
6-6: Unused function argument: kwargs
(ARG001)
24-24: Unused function argument: sender
(ARG001)
24-24: Unused function argument: created
(ARG001)
24-24: Unused function argument: kwargs
(ARG001)
46-46: Unused function argument: sender
(ARG001)
46-46: Unused function argument: created
(ARG001)
46-46: Unused function argument: kwargs
(ARG001)
69-69: Unused function argument: sender
(ARG001)
69-69: Unused function argument: created
(ARG001)
69-69: Unused function argument: kwargs
(ARG001)
89-89: Unused function argument: sender
(ARG001)
89-89: Unused function argument: created
(ARG001)
89-89: Unused function argument: kwargs
(ARG001)
django/thunderstore/ts_analytics/kafka.py
78-78: Avoid specifying long messages outside the exception class
(TRY003)
81-81: Avoid specifying long messages outside the exception class
(TRY003)
django/thunderstore/ts_analytics/tests/test_kafka.py
148-148: Unused method argument: method
(ARG002)
165-165: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
172-172: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (2)
docker-compose.yml (1)
13-13: Queue added correctly; verify workers pick it up.

Looks good. Please confirm django-worker and django-beat inherit this env (they do via anchors) and that CELERY_TASK_CREATE_MISSING_QUEUES=True in settings so the analytics queue is created automatically.
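For the verification, a hedged sketch of the relevant settings; CELERY_TASK_CREATE_MISSING_QUEUES and CELERY_TASK_ROUTES are standard Celery settings (Django-style uppercase assumes the usual namespace="CELERY" config), while the queue name string is an assumption about what CeleryQueues.Analytics resolves to.

```python
# Sketch under assumptions: the queue name "analytics" is a guess at what
# CeleryQueues.Analytics resolves to; the setting names are standard Celery.
CELERY_TASK_CREATE_MISSING_QUEUES = True  # auto-declare queues on first use
CELERY_TASK_ROUTES = {
    "thunderstore.analytics.send_kafka_message_async": {"queue": "analytics"},
}
```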
django/thunderstore/ts_analytics/kafka.py (1)
28-36: Confirm failure semantics for the async task.

Exceptions propagate from send_kafka_message_async. If Celery retries are enabled, this may create retries/backlogs. If you want fire-and-forget, consider catching/logging inside the task.
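A hedged sketch of the fire-and-forget shape this comment suggests; the task name follows the PR, while the queue string, logging policy, and the get_kafka_client import path are part of the suggestion, not the merged code.

```python
# Sketch only: swallow-and-log instead of letting Celery retry forever.
import logging
from typing import Optional

from celery import shared_task

from thunderstore.ts_analytics.kafka import get_kafka_client

logger = logging.getLogger(__name__)


@shared_task(
    name="thunderstore.analytics.send_kafka_message_async",
    queue="analytics",  # assumption: CeleryQueues.Analytics resolves to this
    ignore_result=True,
)
def send_kafka_message_async(topic: str, payload: dict, key: Optional[str] = None):
    try:
        get_kafka_client().send(topic=topic, payload=payload, key=key)
    except Exception:
        # Transient broker errors are logged and dropped, not retried
        logger.exception("Failed to send Kafka message to %s", topic)
```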
Actionable comments posted: 0
🧹 Nitpick comments (1)
django/thunderstore/ts_analytics/apps.py (1)
22-46: Past review addressed: dispatch_uid prevents duplicate connections.

The dispatch_uid parameters now ensure idempotent signal registration during Django autoreload. The past review also suggested weak=False, but that's optional here since the receivers are module-level functions that won't be garbage collected.

If you want to be fully explicit, you could add weak=False to each connection:

```diff
 post_save.connect(
     receiver=package_version_download_event_post_save,
     sender=PackageVersionDownloadEvent,
     dispatch_uid="analytics_package_version_download_event_post_save",
+    weak=False,
 )
```

Apply similar changes to the remaining four connections.
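For completeness, a condensed sketch of the registration pattern under discussion; the AppConfig class name is assumed for illustration, while the receiver and model names come from this PR.

```python
# Sketch: dispatch_uid makes connect() idempotent under autoreload,
# weak=False pins the receiver so it cannot be garbage collected.
from django.apps import AppConfig
from django.db.models.signals import post_save


class TsAnalyticsConfig(AppConfig):  # class name assumed for illustration
    name = "thunderstore.ts_analytics"

    def ready(self):
        # Imports deferred to ready() to avoid circular imports
        from thunderstore.repository.models import Package
        from thunderstore.ts_analytics.signals import package_post_save

        post_save.connect(
            receiver=package_post_save,
            sender=Package,
            dispatch_uid="analytics_package_post_save",
            weak=False,
        )
```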
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
django/thunderstore/ts_analytics/apps.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
django/thunderstore/ts_analytics/apps.py (3)
django/thunderstore/community/models/package_listing.py (1)
PackageListing(62-437)django/thunderstore/repository/models/package.py (1)
Package(49-445)django/thunderstore/ts_analytics/signals.py (5)
community_post_save(89-105)package_listing_post_save(69-86)package_post_save(24-43)package_version_download_event_post_save(6-21)package_version_post_save(46-66)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Analyze (javascript)
- GitHub Check: Analyze (python)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (3)
django/thunderstore/ts_analytics/apps.py (3)
1-3: LGTM: Clean imports.

Standard Django imports for app configuration and signal handling.
5-8: LGTM: Standard AppConfig setup.
9-20: LGTM: Correct import placement.

Importing models and handlers inside ready() prevents circular imports—standard Django pattern.
Looks good to me, but I'll set this as "request changes" until the questions are answered, at minimum.
```python
@lru_cache(maxsize=1)
def get_kafka_client() -> Union[KafkaClient, DummyKafkaClient]:
    # Return dummy client if Kafka is disabled
    if not getattr(settings, "KAFKA_ENABLED", False):
        return DummyKafkaClient()

    config = getattr(settings, "KAFKA_CONFIG", None)
    if not config:
        raise RuntimeError("KAFKA_CONFIG is not configured.")

    if not config.get("bootstrap.servers"):
        raise RuntimeError("Kafka bootstrap servers are not configured.")

    return KafkaClient(config)
```
Is this valid? (the AI scanning warning)
Remove Celery task results from kafka task for performance
Actionable comments posted: 1
♻️ Duplicate comments (1)
django/thunderstore/ts_analytics/tests/test_kafka.py (1)
158-159: Use raw strings for regex patterns in pytest.raises.

Avoid accidental regex metacharacters. Escape the trailing dot.

```diff
-    with pytest.raises(RuntimeError, match="KAFKA_CONFIG is not configured."):
+    with pytest.raises(RuntimeError, match=r"KAFKA_CONFIG is not configured\."):
         get_kafka_client()
 @@
-    with pytest.raises(
-        RuntimeError, match="Kafka bootstrap servers are not configured."
-    ):
+    with pytest.raises(
+        RuntimeError, match=r"Kafka bootstrap servers are not configured\."
+    ):
         get_kafka_client()
```

Also applies to: 164-167
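As a quick illustration of why the escape matters (pytest applies match= with re.search), a sketch:

```python
import re

# Unescaped "." matches ANY character, so this pattern also "passes"
# against a message that does not end with a literal dot:
assert re.search("KAFKA_CONFIG is not configured.", "KAFKA_CONFIG is not configuredX")

# The raw, escaped pattern only matches a literal trailing dot:
assert not re.search(r"KAFKA_CONFIG is not configured\.", "KAFKA_CONFIG is not configuredX")
```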
🧹 Nitpick comments (5)
django/thunderstore/ts_analytics/tests/test_kafka.py (3)
141-148: Silence unused setup_method argument warning (ARG002).

Rename the parameter or add a noqa.

```diff
-    def setup_method(self, method):
+    def setup_method(self, _method):
         """
         CRITICAL FIX: Clear the lru_cache for get_kafka_client before each test.
         This ensures that settings overrides (like KAFKA_ENABLED=True) are respected
         and the client initialization logic is re-run.
         """
         get_kafka_client.cache_clear()
```
206-207: Drop unnecessary django_db marker on async wrapper tests.

These tests don't hit the DB; removing the marker speeds up the suite.

```diff
-@pytest.mark.django_db
-class TestSendKafkaMessageAsync:
+class TestSendKafkaMessageAsync:
```
31-58: Broaden coverage across all KafkaTopic values.

Parametrize the send tests over KafkaTopic members to ensure the topic.value mapping works for every enum member.
Also applies to: 66-82
django/thunderstore/ts_analytics/kafka.py (2)
39-42: Flush producer on shutdown to avoid message loss and resource leaks.

KafkaClient is cached and never closed; pending messages may be dropped on process exit.

```diff
-import json
+import json
+import atexit
+import logging
 @@
 class KafkaClient:
     def __init__(self, config: Dict[str, Any]):
         self._producer = Producer(config)
+        self._closed = False
+        atexit.register(self.close)
 @@
         self._producer.poll(0)

+    def close(self):
+        if self._closed:
+            return
+        try:
+            # Allow up to 5s to deliver queued messages
+            self._producer.flush(5)
+        except Exception:  # best-effort on shutdown
+            logging.getLogger(__name__).exception("Kafka producer flush failed on shutdown")
+        finally:
+            self._closed = True
```

Also applies to: 59-60
71-84: Validate SASL credentials when security.protocol=SASL_SSL.

Pre-flight checks reduce runtime connection failures.

```diff
 @lru_cache(maxsize=1)
 def get_kafka_client() -> Union[KafkaClient, DummyKafkaClient]:
 @@
     if not config.get("bootstrap.servers"):
         raise RuntimeError("Kafka bootstrap servers are not configured.")

+    # Optional: auth validation
+    if config.get("security.protocol") == "SASL_SSL":
+        if not config.get("sasl.username"):
+            raise RuntimeError("SASL username is required when using SASL_SSL.")
+        if not config.get("sasl.password"):
+            raise RuntimeError("SASL password is required when using SASL_SSL.")
+
     return KafkaClient(config)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
django/thunderstore/ts_analytics/kafka.py(1 hunks)django/thunderstore/ts_analytics/tests/test_kafka.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
django/thunderstore/ts_analytics/tests/test_kafka.py (1)
django/thunderstore/ts_analytics/kafka.py (8)
DummyKafkaClient(62-68)KafkaClient(39-59)KafkaTopic(13-18)get_kafka_client(72-84)send_kafka_message(21-25)send_kafka_message_async(33-36)send(43-59)send(65-68)
django/thunderstore/ts_analytics/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues(394-400)
🪛 Ruff (0.14.1)
django/thunderstore/ts_analytics/tests/test_kafka.py
141-141: Unused method argument: method
(ARG002)
158-158: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
165-165: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
django/thunderstore/ts_analytics/kafka.py
79-79: Avoid specifying long messages outside the exception class
(TRY003)
82-82: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build docker image
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
🔇 Additional comments (1)
django/thunderstore/ts_analytics/kafka.py (1)
43-60: Exception propagation in send() is consistent with tests.Allowing JSON/producer errors to bubble is fine given current expectations; no changes needed.
Ensure upstream callers either handle exceptions or rely on Celery retries/backoff where applicable.
Use str instead of Union[KafkaTopic, str]
Actionable comments posted: 0
♻️ Duplicate comments (4)
django/thunderstore/ts_analytics/kafka.py (4)
26-33: Harden async task: swallow/log errors to avoid task churn/backoffs.

Wrap the sync call in try/except; log and return so failures don't create retries/backlogs.

```diff
 @shared_task(
     queue=CeleryQueues.Analytics,
     name="thunderstore.analytics.send_kafka_message_async",
     ignore_result=True,
 )
 def send_kafka_message_async(topic: str, payload: dict, key: Optional[str] = None):
-    send_kafka_message(topic=topic, payload=payload, key=key)
+    try:
+        send_kafka_message(topic=topic, payload=payload, key=key)
+    except Exception:
+        logger.exception("Failed to send Kafka message to %s", topic)
```
73-80: Validate SASL credentials when a SASL_* protocol is configured.

Preempt runtime connection errors by asserting that sasl.username and sasl.password are present when using SASL.

```diff
     if not config.get("bootstrap.servers"):
         raise RuntimeError("Kafka bootstrap servers are not configured.")

+    protocol = (config.get("security.protocol") or "").upper()
+    if protocol in {"SASL_SSL", "SASL_PLAINTEXT"}:
+        if not config.get("sasl.username"):
+            raise RuntimeError("SASL username is required when using %s." % protocol)
+        if not config.get("sasl.password"):
+            raise RuntimeError("SASL password is required when using %s." % protocol)
+
     return KafkaClient(config)
```
7-7: Producer lifecycle: add lazy import + graceful shutdown (flush) to avoid leaks and a hard dependency when disabled.

The cache makes the producer effectively a singleton, but it's never flushed/closed; also, confluent_kafka is imported even if Kafka is disabled. Lazy-import the Producer in __init__, add an atexit flush, and a safe __del__. This prevents message loss on shutdown and avoids import-time failures in environments without Kafka when disabled.

```diff
-import json
+import json
+import logging
+import atexit
 @@
-from confluent_kafka import Producer
 from django.conf import settings
 @@
 from thunderstore.core.settings import CeleryQueues
+
+logger = logging.getLogger(__name__)
 @@
 class KafkaClient:
     def __init__(self, config: Dict[str, Any]):
-        self._producer = Producer(config)
+        # Lazy import to avoid hard dependency when Kafka is disabled
+        from confluent_kafka import Producer as _Producer
+        self._producer = _Producer(config)
+        atexit.register(self.close)
+
+    def close(self):
+        if hasattr(self, "_producer"):
+            try:
+                # Flush delivery queue; avoid blocking indefinitely
+                self._producer.flush(5.0)
+            except Exception:
+                logger.exception("Kafka producer flush on shutdown failed")
+
+    def __del__(self):
+        try:
+            self.close()
+        except Exception:
+            pass
```

Also applies to: 35-38, 67-67
39-56: Make send() resilient: catch JSON serialization and produce errors; poll only on success.

Uncaught exceptions here will bubble to callers/tasks. Guard both steps and skip poll(0) on failure.

```diff
 def send(
     self,
     topic: Union[KafkaTopic, str],
     payload: dict,
     key: Optional[str] = None,
 ):
-    value_bytes = json.dumps(payload).encode("utf-8")
-    key_bytes = key.encode("utf-8") if key else None
-
-    topic_str = topic.value if isinstance(topic, KafkaTopic) else topic
-    self._producer.produce(
-        topic=topic_str,
-        value=value_bytes,
-        key=key_bytes,
-    )
-
-    self._producer.poll(0)
+    try:
+        value_bytes = json.dumps(payload).encode("utf-8")
+    except (TypeError, ValueError):
+        logger.exception("Kafka payload JSON serialization failed")
+        return
+
+    key_bytes = key.encode("utf-8") if key else None
+    topic_str = topic.value if isinstance(topic, KafkaTopic) else topic
+    try:
+        self._producer.produce(topic=topic_str, value=value_bytes, key=key_bytes)
+    except Exception:
+        logger.exception("Kafka produce failed")
+        return
+    # Serve delivery callbacks without blocking
+    self._producer.poll(0)
```
🧹 Nitpick comments (2)
django/thunderstore/ts_analytics/kafka.py (2)
58-64: Optional: emit a debug log when Kafka is disabled.

A small debug log helps confirm events are intentionally no-op in disabled environments. Keep it at DEBUG to avoid noise.

```diff
 class DummyKafkaClient:
 @@
     def send(
         self, topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None
     ):
-        pass
+        logger.debug("Kafka disabled; skipping send to %s", topic)
```
73-78: Nit: address Ruff TRY003 or silence it.

Either shorten the messages or define a small KafkaConfigError to avoid long literals in raises; alternatively, ignore TRY003 for this module.

```diff
-    if not config:
-        raise RuntimeError("KAFKA_CONFIG is not configured.")
+    if not config:
+        raise RuntimeError("KAFKA_CONFIG missing.")
 @@
-    if not config.get("bootstrap.servers"):
-        raise RuntimeError("Kafka bootstrap servers are not configured.")
+    if not config.get("bootstrap.servers"):
+        raise RuntimeError("bootstrap.servers missing.")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
django/poetry.lockis excluded by!**/*.lock
📒 Files selected for processing (2)
django/pyproject.toml(1 hunks)django/thunderstore/ts_analytics/kafka.py(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- django/pyproject.toml
🧰 Additional context used
🧬 Code graph analysis (1)
django/thunderstore/ts_analytics/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues(394-400)
🪛 Ruff (0.14.1)
django/thunderstore/ts_analytics/kafka.py
75-75: Avoid specifying long messages outside the exception class
(TRY003)
78-78: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Analyze (python)
- GitHub Check: Build docker image
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
🔇 Additional comments (1)
django/thunderstore/ts_analytics/kafka.py (1)
21-24: Design is sound—no issues found.

KafkaTopic is a str-subclassed Enum, so passing its members to .delay() works correctly. The str type hint in both the sync and async APIs ensures compatibility while preventing serialization issues. Enum members are properly serializable as strings.
Add batching, linger, nagle settings to Kafka config in settings.py
Refactor signals to call send_kafka_message, which dumps the JSON into a string before sending it to a Celery task
Refactor get_kafka_client() to use a global variable rather than @lru_cache
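A hedged sketch of what those producer settings could look like; the option keys are real librdkafka/confluent-kafka producer settings, but the values shown are illustrative, not the ones committed here.

```python
# Illustrative values only; keys are standard librdkafka producer options.
KAFKA_CONFIG = {
    "bootstrap.servers": "broker:9092",  # placeholder
    "linger.ms": 50,                     # wait up to 50 ms to fill a batch
    "batch.size": 64 * 1024,             # upper bound on batch size in bytes
    "socket.nagle.disable": True,        # disable Nagle's algorithm on sockets
}
```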
Actionable comments posted: 0
♻️ Duplicate comments (4)
django/thunderstore/ts_analytics/tests/test_kafka.py (1)
219-219: Use raw strings for regex patterns.

The match= arguments contain metacharacters (.) that should be escaped or use raw string literals.

Apply this diff:

```diff
-    with pytest.raises(RuntimeError, match="KAFKA_CONFIG is not configured."):
+    with pytest.raises(RuntimeError, match=r"KAFKA_CONFIG is not configured\."):
         get_kafka_client()
 @@
     with pytest.raises(
-        RuntimeError, match="Kafka bootstrap servers are not configured."
+        RuntimeError, match=r"Kafka bootstrap servers are not configured\."
     ):
```

Also applies to: 231-231
django/thunderstore/ts_analytics/kafka.py (3)
37-39: Add error handling to prevent task failures.

The task has no error handling, so any exception from get_kafka_client() or client.send() will cause the task to fail and potentially retry indefinitely. Wrap the logic in try-except to log errors gracefully.

Apply this diff:

```diff
 def send_kafka_message_task(topic: str, payload_string: str, key: Optional[str] = None):
-    client = get_kafka_client()
-    client.send(topic=topic, payload_string=payload_string, key=key)
+    try:
+        client = get_kafka_client()
+        client.send(topic=topic, payload_string=payload_string, key=key)
+    except Exception as e:
+        print(f"ERROR: Failed to send Kafka message to {topic}: {e}")
```
53-66: Add error handling to KafkaClient.send().

The send() method has no error handling for encoding failures or producer.produce() exceptions. This can crash the Celery task. Wrap the logic in try-except to handle and log errors gracefully.

Apply this diff:

```diff
 def send(
     self,
     topic: str,
     payload_string: str,
     key: Optional[str] = None,
 ):
-    value_bytes = payload_string.encode("utf-8")
-    key_bytes = key.encode("utf-8") if key else None
-
-    self._producer.produce(
-        topic=topic,
-        value=value_bytes,
-        key=key_bytes,
-    )
+    try:
+        value_bytes = payload_string.encode("utf-8")
+        key_bytes = key.encode("utf-8") if key else None
+
+        self._producer.produce(
+            topic=topic,
+            value=value_bytes,
+            key=key_bytes,
+        )
+    except Exception as e:
+        print(f"ERROR: Kafka produce failed for topic {topic}: {e}")
+        return
```
42-51: Producer resource leak - close() never called.

The KafkaClient creates a Producer in __init__ and provides a close() method to flush and clean up, but get_kafka_client() creates a singleton that is never closed. This leaves network connections and message buffers open when the application shuts down, potentially causing message loss.

Consider registering an atexit handler or Django signal to call close() on shutdown:

```diff
+import atexit
+
 class KafkaClient:
     def __init__(self, config: Dict[str, Any]):
         self._producer = Producer(config)
+        atexit.register(self.close)

     def close(self):
         """Flushes any remaining messages and closes the producer."""
         # The timeout (e.g., 10 seconds) is the maximum time to wait.
         remaining_messages = self._producer.flush(timeout=10)
         if remaining_messages > 0:
             print(f"WARNING: {remaining_messages} messages still in queue after flush.")
```

Also applies to: 76-99
🧹 Nitpick comments (5)
django/thunderstore/ts_analytics/tests/test_kafka.py (2)
5-5: Remove unused imports.

The settings import from django.conf is shadowed by the settings fixture used throughout the tests. The KafkaTopic import is never referenced in this test file.

Apply this diff:

```diff
-from django.conf import settings
 from django.db import transaction
 from django.test import TestCase

 from thunderstore.ts_analytics.kafka import (
     DummyKafkaClient,
     KafkaClient,
-    KafkaTopic,
     get_kafka_client,
     send_kafka_message,
     send_kafka_message_task,
 )
```
124-169: Consider adding tests for KafkaTopic enum values.

All tests use hardcoded topic strings ("test.topic", "test-topic-2") but none verify that the actual KafkaTopic enum values work correctly through the send path. Consider adding at least one test that uses KafkaTopic.PACKAGE_DOWNLOADED.value or similar to ensure the integration works with the defined enum values.

Based on learnings
django/thunderstore/ts_analytics/signals.py (2)
1-1: Remove unused transaction import.

The transaction module is imported but never used in this file. The send_kafka_message function handles transaction callbacks internally.

Apply this diff:

```diff
-from django.db import transaction
-
 from thunderstore.ts_analytics.kafka import KafkaTopic, send_kafka_message
```
29-38: Inconsistent datetime field naming across payloads.

The field naming convention is inconsistent:

- package_post_save: uses date_created, date_updated
- package_listing_post_save: uses datetime_created, datetime_updated
- community_post_save: uses datetime_created, datetime_updated

Consider standardizing on one naming pattern across all payloads for consistency.
Also applies to: 70-76, 88-94
django/thunderstore/ts_analytics/kafka.py (1)
21-29: Clarify type signature and add error handling.

Two issues:

1. The signature shows topic: str, but callers in signals.py pass KafkaTopic enum values. Since KafkaTopic inherits from str, this works at runtime, but the type hint should reflect actual usage: topic: Union[KafkaTopic, str] for clarity.

2. There is no error handling if json.dumps(payload) fails with non-serializable data. Consider wrapping in try-except to log and gracefully handle serialization errors.

Apply this diff:

```diff
-def send_kafka_message(topic: str, payload: dict, key: Optional[str] = None):
-    payload_string = json.dumps(payload)
+def send_kafka_message(topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None):
+    try:
+        payload_string = json.dumps(payload)
+    except (TypeError, ValueError) as e:
+        # Log serialization error and skip message
+        print(f"ERROR: Failed to serialize Kafka payload: {e}")
+        return
+
     transaction.on_commit(
         lambda: send_kafka_message_task.delay(
-            topic=topic,
+            topic=topic.value if isinstance(topic, KafkaTopic) else topic,
             payload_string=payload_string,
             key=key,
         )
     )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
django/thunderstore/core/settings.py(4 hunks)django/thunderstore/ts_analytics/kafka.py(1 hunks)django/thunderstore/ts_analytics/signals.py(1 hunks)django/thunderstore/ts_analytics/tests/test_kafka.py(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- django/thunderstore/core/settings.py
🧰 Additional context used
🧬 Code graph analysis (3)
django/thunderstore/ts_analytics/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues(394-400)
django/thunderstore/ts_analytics/signals.py (4)
django/thunderstore/ts_analytics/kafka.py (2)
KafkaTopic(13-18)send_kafka_message(21-29)django/thunderstore/repository/models/package_version.py (2)
owner(230-231)is_deprecated(234-235)django/thunderstore/repository/models/package.py (2)
version_number(212-213)downloads(195-197)django/thunderstore/repository/admin/package.py (1)
file_size(148-149)
django/thunderstore/ts_analytics/tests/test_kafka.py (1)
django/thunderstore/ts_analytics/kafka.py (9)
DummyKafkaClient(69-73)KafkaClient(42-66)KafkaTopic(13-18)get_kafka_client(79-99)send_kafka_message(21-29)send_kafka_message_task(37-39)send(53-66)send(72-73)close(46-51)
🪛 Flake8 (7.3.0)
django/thunderstore/ts_analytics/signals.py
[error] 1-1: 'django.db.transaction' imported but unused
(F401)
django/thunderstore/ts_analytics/tests/test_kafka.py
[error] 5-5: 'django.conf.settings' imported but unused
(F401)
[error] 9-9: 'thunderstore.ts_analytics.kafka.KafkaTopic' imported but unused
(F401)
[error] 174-174: redefinition of unused 'settings' from line 5
(F811)
[error] 181-181: redefinition of unused 'settings' from line 5
(F811)
[error] 196-196: redefinition of unused 'settings' from line 5
(F811)
[error] 213-213: redefinition of unused 'settings' from line 5
(F811)
[error] 224-224: redefinition of unused 'settings' from line 5
(F811)
[error] 240-240: redefinition of unused 'settings' from line 5
(F811)
🪛 Ruff (0.14.1)
django/thunderstore/ts_analytics/kafka.py
91-91: Avoid specifying long messages outside the exception class
(TRY003)
94-94: Avoid specifying long messages outside the exception class
(TRY003)
django/thunderstore/ts_analytics/signals.py
6-6: Unused function argument: sender
(ARG001)
6-6: Unused function argument: kwargs
(ARG001)
22-22: Unused function argument: sender
(ARG001)
22-22: Unused function argument: created
(ARG001)
22-22: Unused function argument: kwargs
(ARG001)
42-42: Unused function argument: sender
(ARG001)
42-42: Unused function argument: created
(ARG001)
42-42: Unused function argument: kwargs
(ARG001)
63-63: Unused function argument: sender
(ARG001)
63-63: Unused function argument: created
(ARG001)
63-63: Unused function argument: kwargs
(ARG001)
81-81: Unused function argument: sender
(ARG001)
81-81: Unused function argument: created
(ARG001)
81-81: Unused function argument: kwargs
(ARG001)
django/thunderstore/ts_analytics/tests/test_kafka.py
55-55: Unused function argument: args
(ARG001)
55-55: Unused function argument: kwargs
(ARG001)
174-174: Redefinition of unused settings from line 5
(F811)
174-174: Unused function argument: clear_kafka_client_instance
(ARG001)
181-181: Redefinition of unused settings from line 5
(F811)
181-181: Unused function argument: mock_producer
(ARG001)
181-181: Unused function argument: clear_kafka_client_instance
(ARG001)
196-196: Redefinition of unused settings from line 5
(F811)
196-196: Unused function argument: mock_producer
(ARG001)
196-196: Unused function argument: clear_kafka_client_instance
(ARG001)
213-213: Redefinition of unused settings from line 5
(F811)
213-213: Unused function argument: clear_kafka_client_instance
(ARG001)
219-219: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
224-224: Redefinition of unused settings from line 5
(F811)
224-224: Unused function argument: clear_kafka_client_instance
(ARG001)
231-231: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
240-240: Unused function argument: mock_producer
(ARG001)
240-240: Unused function argument: clear_kafka_client_instance
(ARG001)
240-240: Redefinition of unused settings from line 5
(F811)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Build docker image
- GitHub Check: Analyze (javascript)
- GitHub Check: Analyze (python)
- GitHub Check: Build docker image
🔇 Additional comments (1)
django/thunderstore/ts_analytics/signals.py (1)
22-39: Verify that all post_save signals should fire on updates, not just creation.

Most signal handlers (package_post_save, package_version_post_save, package_listing_post_save, community_post_save) don't check the created parameter, so they emit Kafka messages on every save, including updates. Only package_version_download_event_post_save is creation-only. Confirm this is the intended behavior, especially for high-frequency update operations.

If some handlers should only fire on creation, add the guard:

```python
def package_post_save(sender, instance, created, **kwargs):
    if created:  # Only emit on creation
        send_kafka_message(...)
```

Also applies to: 42-60, 63-78, 81-95
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
django/thunderstore/ts_analytics/signals.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
django/thunderstore/ts_analytics/signals.py (1)
django/thunderstore/ts_analytics/kafka.py (2)
KafkaTopic(13-18)send_kafka_message(21-29)
🪛 Ruff (0.14.1)
django/thunderstore/ts_analytics/signals.py
13-13: Unused function argument: sender
(ARG001)
13-13: Unused function argument: kwargs
(ARG001)
29-29: Unused function argument: sender
(ARG001)
29-29: Unused function argument: created
(ARG001)
29-29: Unused function argument: kwargs
(ARG001)
49-49: Unused function argument: sender
(ARG001)
49-49: Unused function argument: created
(ARG001)
49-49: Unused function argument: kwargs
(ARG001)
70-70: Unused function argument: sender
(ARG001)
70-70: Unused function argument: created
(ARG001)
70-70: Unused function argument: kwargs
(ARG001)
88-88: Unused function argument: sender
(ARG001)
88-88: Unused function argument: created
(ARG001)
88-88: Unused function argument: kwargs
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (5)
django/thunderstore/ts_analytics/signals.py (5)
13-26: LGTM: Download event handler correctly fires only on creation.

The created check ensures only new downloads are tracked. The past review concern about a missing .delay() is resolved—send_kafka_message (from kafka.py) already wraps the task call with .delay() internally.
29-46: LGTM: Package handler tracks both creates and updates.

Emitting events for all saves (not just creates) appears intentional for keeping analytics in sync with package state changes.
Note: The static analysis warnings about unused sender, created, and kwargs parameters are false positives—these are required by Django's signal handler signature and can be safely ignored.
88-102: LGTM: Community handler implementation looks correct.

The handler appropriately emits events for all Community saves. Static analysis warnings about unused parameters can be ignored (standard Django signal signature).
70-85: The field names are correct as implemented.

The TimestampMixin defines datetime_created and datetime_updated, which PackageListing inherits. The Package model defines its own date_created and date_updated fields. Each signal handler correctly references its respective model's actual field names—no inconsistency exists.

Likely an incorrect or invalid review comment.
49-67: No critical performance risk identified with current codebase patterns.

The foreign key traversal on line 59 is minimal overhead: it accesses instance.package.owner.name once per signal invocation. While this theoretically could trigger a database query if the package relation isn't prefetched, verification shows no bulk .save() patterns in production code. The codebase uses bulk_update() (which bypasses signals) and direct .update() queries for high-volume operations.

The concern remains valid as a defensive best practice—if bulk save patterns are introduced later, the signal could become problematic. Consider prefetching package__owner at call sites if this becomes a bottleneck, but it's not actionable today.
Actionable comments posted: 0
♻️ Duplicate comments (1)
django/thunderstore/ts_analytics/tests/test_kafka.py (1)
221-221: Use raw strings for regex patterns.

The match= arguments should use raw strings to avoid issues with escape sequences.

Apply this diff:

```diff
-    with pytest.raises(RuntimeError, match="KAFKA_CONFIG is not configured."):
+    with pytest.raises(RuntimeError, match=r"KAFKA_CONFIG is not configured\."):
         get_kafka_client()
 @@
     with pytest.raises(
-        RuntimeError, match="Kafka bootstrap servers are not configured."
+        RuntimeError, match=r"Kafka bootstrap servers are not configured\."
     ):
```

Also applies to: 232-234
🧹 Nitpick comments (4)
django/thunderstore/ts_analytics/tests/test_kafka.py (3)
2-2: Remove unused import.

timedelta is imported but never used.

```diff
-from datetime import date, datetime, timedelta
+from datetime import date, datetime
```
13-13: Consider using the KafkaTopic enum in tests.

KafkaTopic is imported but never used. Consider using the enum values instead of string literals in tests for type safety and consistency with the implementation. For example, in test methods like test_send_kafka_message_queues_task_on_commit:

```diff
-    test_topic = "test-topic"
+    test_topic = KafkaTopic.PACKAGE_DOWNLOADED
```
49-66: Remove unused fixture.

The fake_commit_executor fixture is defined but never used in any test. If it was intended for future use, consider removing it for now to reduce clutter.

```diff
-@pytest.fixture
-def fake_commit_executor():
-    """
-    Mocks transaction.on_commit to capture callbacks for execution after the test.
-    Used for testing send_kafka_message, which is deferred on commit.
-    """
-    callbacks = []
-
-    def fake_on_commit(func, *args, **kwargs):
-        callbacks.append(func)
-
-    # Patch the function where it is defined
-    with patch("django.db.transaction.on_commit", side_effect=fake_on_commit):
-        yield callbacks  # Yield the list of captured callbacks
-
-    # After the test, manually run the captured callbacks to simulate transaction commit
-    for callback in callbacks:
-        callback()
-
```

django/thunderstore/ts_analytics/signals.py (1)
4-12: Consider adding type hints.

While the logic is correct, adding type hints would improve clarity and enable better IDE support.

```diff
+from typing import Optional, Union
+from datetime import date, datetime
+
-def format_datetime(date_or_string):
+def format_datetime(date_or_string: Optional[Union[str, date, datetime]]) -> Optional[str]:
+    """Convert datetime/date to ISO format string, pass through strings, return None for None."""
     if date_or_string is None:
         return None
     if isinstance(date_or_string, str):
         return date_or_string
     try:
         return date_or_string.isoformat()
     except AttributeError:
         return None
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
django/thunderstore/ts_analytics/signals.py(1 hunks)django/thunderstore/ts_analytics/tests/test_kafka.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
django/thunderstore/ts_analytics/tests/test_kafka.py (2)
django/thunderstore/ts_analytics/kafka.py (9)
DummyKafkaClient(69-73)KafkaClient(42-66)KafkaTopic(13-18)get_kafka_client(79-99)send_kafka_message(21-29)send_kafka_message_task(37-39)send(53-66)send(72-73)close(46-51)django/thunderstore/ts_analytics/signals.py (1)
format_datetime(4-12)
django/thunderstore/ts_analytics/signals.py (3)
django/thunderstore/ts_analytics/kafka.py (2)
KafkaTopic(13-18)send_kafka_message(21-29)django/thunderstore/repository/models/package_version.py (2)
owner(230-231)is_deprecated(234-235)django/thunderstore/repository/models/package.py (2)
version_number(212-213)downloads(195-197)
🪛 Flake8 (7.3.0)
django/thunderstore/ts_analytics/tests/test_kafka.py
[error] 2-2: 'datetime.timedelta' imported but unused
(F401)
[error] 6-6: 'django.conf.settings' imported but unused
(F401)
[error] 10-10: 'thunderstore.ts_analytics.kafka.KafkaTopic' imported but unused
(F401)
[error] 176-176: redefinition of unused 'settings' from line 6
(F811)
[error] 183-183: redefinition of unused 'settings' from line 6
(F811)
[error] 198-198: redefinition of unused 'settings' from line 6
(F811)
[error] 215-215: redefinition of unused 'settings' from line 6
(F811)
[error] 226-226: redefinition of unused 'settings' from line 6
(F811)
[error] 242-242: redefinition of unused 'settings' from line 6
(F811)
🪛 Ruff (0.14.1)
django/thunderstore/ts_analytics/tests/test_kafka.py
57-57: Unused function argument: args
(ARG001)
57-57: Unused function argument: kwargs
(ARG001)
176-176: Redefinition of unused settings from line 6
(F811)
176-176: Unused function argument: clear_kafka_client_instance
(ARG001)
183-183: Redefinition of unused settings from line 6
(F811)
183-183: Unused function argument: mock_producer
(ARG001)
183-183: Unused function argument: clear_kafka_client_instance
(ARG001)
198-198: Redefinition of unused settings from line 6
(F811)
198-198: Unused function argument: mock_producer
(ARG001)
198-198: Unused function argument: clear_kafka_client_instance
(ARG001)
215-215: Redefinition of unused settings from line 6
(F811)
215-215: Unused function argument: clear_kafka_client_instance
(ARG001)
221-221: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
226-226: Redefinition of unused settings from line 6
(F811)
226-226: Unused function argument: clear_kafka_client_instance
(ARG001)
233-233: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
242-242: Unused function argument: mock_producer
(ARG001)
242-242: Unused function argument: clear_kafka_client_instance
(ARG001)
242-242: Redefinition of unused settings from line 6
(F811)
django/thunderstore/ts_analytics/signals.py
15-15: Unused function argument: sender
(ARG001)
15-15: Unused function argument: kwargs
(ARG001)
31-31: Unused function argument: sender
(ARG001)
31-31: Unused function argument: created
(ARG001)
31-31: Unused function argument: kwargs
(ARG001)
51-51: Unused function argument: sender
(ARG001)
51-51: Unused function argument: created
(ARG001)
51-51: Unused function argument: kwargs
(ARG001)
72-72: Unused function argument: sender
(ARG001)
72-72: Unused function argument: created
(ARG001)
72-72: Unused function argument: kwargs
(ARG001)
90-90: Unused function argument: sender
(ARG001)
90-90: Unused function argument: created
(ARG001)
90-90: Unused function argument: kwargs
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (1)
django/thunderstore/ts_analytics/signals.py (1)
31-48: Verify ifcreatedparameter should be checked.Only
package_version_download_event_post_savechecks thecreatedparameter. The other handlers send messages on every save (both create and update). If this is intentional to track all changes, consider adding a comment explaining the behavior difference.If updates should only be sent on creation, apply a similar check:
def package_post_save(sender, instance, created, **kwargs): """ Signal handler for Package post_save events. Sends package information through Kafka as a Celery task. """ + if created: send_kafka_message( topic=KafkaTopic.PACKAGE_UPDATED,Also applies to: 51-69, 72-87, 90-104
Change set compared to my previous review looks good 👍 I'd recommend waiting for @ethangreen-dev's review too before merging, as I've still got limited knowledge of the mission.
Actionable comments posted: 2
♻️ Duplicate comments (1)
django/thunderstore/ts_analytics/tests/test_kafka.py (1)
247-247: Use raw strings for regex patterns.

The match= arguments should use raw strings to avoid issues with escape sequences.

Apply this diff:

```diff
-    with pytest.raises(RuntimeError, match="KAFKA_CONFIG is not configured."):
+    with pytest.raises(RuntimeError, match=r"KAFKA_CONFIG is not configured\."):
         get_kafka_client()
 @@
     with pytest.raises(
-        RuntimeError, match="Kafka bootstrap servers are not configured."
+        RuntimeError, match=r"Kafka bootstrap servers are not configured\."
     ):
```

Also applies to: 259-259
🧹 Nitpick comments (3)
django/thunderstore/ts_analytics/tests/test_kafka.py (2)
2-2: Remove unused import.
timedelta is imported but never used. Apply this diff:

```diff
-from datetime import date, datetime, timedelta
+from datetime import date, datetime
```
10-10: Remove unused import.
KafkaTopic is imported but never referenced in the tests. Apply this diff:

```diff
 from thunderstore.ts_analytics.kafka import (
     DummyKafkaClient,
     KafkaClient,
-    KafkaTopic,
     get_kafka_client,
     send_kafka_message,
     send_kafka_message_task,
 )
```

django/thunderstore/ts_analytics/kafka.py (1)
21-29: Consider accepting the KafkaTopic enum in addition to string.

The function signature accepts only str, but the KafkaTopic enum exists. Either accept Union[KafkaTopic, str] or document that callers must pass .value.

Apply this diff if you want to accept the enum directly:

```diff
-def send_kafka_message(topic: str, payload: dict, key: Optional[str] = None):
+def send_kafka_message(topic: Union[KafkaTopic, str], payload: dict, key: Optional[str] = None):
+    topic_str = topic.value if isinstance(topic, KafkaTopic) else topic
     payload_string = json.dumps(payload)
     transaction.on_commit(
         lambda: send_kafka_message_task.delay(
-            topic=topic,
+            topic=topic_str,
             payload_string=payload_string,
             key=key,
         )
     )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
.env.template(1 hunks)django/thunderstore/core/settings.py(4 hunks)django/thunderstore/ts_analytics/kafka.py(1 hunks)django/thunderstore/ts_analytics/tests/test_kafka.py(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- django/thunderstore/core/settings.py
🧰 Additional context used
🧬 Code graph analysis (2)
django/thunderstore/ts_analytics/tests/test_kafka.py (2)
django/thunderstore/ts_analytics/kafka.py (12)
DummyKafkaClient(109-113)KafkaClient(42-66)KafkaTopic(13-18)get_kafka_client(119-142)send_kafka_message(21-29)send_kafka_message_task(37-39)send(53-66)send(75-86)send(95-106)send(112-113)DevKafkaClient(89-106)ProdKafkaClient(69-86)django/thunderstore/ts_analytics/signals.py (1)
format_datetime(4-12)
django/thunderstore/ts_analytics/kafka.py (1)
django/thunderstore/core/settings.py (1)
CeleryQueues(395-401)
🪛 dotenv-linter (4.0.0)
.env.template
[warning] 68-68: [UnorderedKey] The KAFKA_DEV key should go before the KAFKA_ENABLED key
(UnorderedKey)
[warning] 69-69: [UnorderedKey] The KAFKA_BOOTSTRAP_SERVERS key should go before the KAFKA_DEV key
(UnorderedKey)
[warning] 71-71: [UnorderedKey] The KAFKA_PASSWORD key should go before the KAFKA_USERNAME key
(UnorderedKey)
[warning] 72-72: [UnorderedKey] The KAFKA_CA_CERT key should go before the KAFKA_DEV key
(UnorderedKey)
🪛 Flake8 (7.3.0)
django/thunderstore/ts_analytics/tests/test_kafka.py
[error] 2-2: 'datetime.timedelta' imported but unused
(F401)
[error] 6-6: 'django.conf.settings' imported but unused
(F401)
[error] 10-10: 'thunderstore.ts_analytics.kafka.KafkaTopic' imported but unused
(F401)
[error] 182-182: redefinition of unused 'settings' from line 6
(F811)
[error] 190-190: redefinition of unused 'settings' from line 6
(F811)
[error] 206-206: redefinition of unused 'settings' from line 6
(F811)
[error] 222-222: redefinition of unused 'settings' from line 6
(F811)
[error] 241-241: redefinition of unused 'settings' from line 6
(F811)
[error] 252-252: redefinition of unused 'settings' from line 6
(F811)
[error] 265-265: redefinition of unused 'settings' from line 6
(F811)
🪛 Ruff (0.14.1)
django/thunderstore/ts_analytics/tests/test_kafka.py
57-57: Unused function argument: args
(ARG001)
57-57: Unused function argument: kwargs
(ARG001)
182-182: Redefinition of unused settings from line 6
(F811)
182-182: Unused function argument: clear_kafka_client_instance
(ARG001)
190-190: Redefinition of unused settings from line 6
(F811)
190-190: Unused function argument: clear_kafka_client_instance
(ARG001)
206-206: Redefinition of unused settings from line 6
(F811)
206-206: Unused function argument: clear_kafka_client_instance
(ARG001)
222-222: Redefinition of unused settings from line 6
(F811)
222-222: Unused function argument: clear_kafka_client_instance
(ARG001)
241-241: Redefinition of unused settings from line 6
(F811)
241-241: Unused function argument: clear_kafka_client_instance
(ARG001)
247-247: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
252-252: Redefinition of unused settings from line 6
(F811)
252-252: Unused function argument: clear_kafka_client_instance
(ARG001)
259-259: Pattern passed to match= contains metacharacters but is neither escaped nor raw
(RUF043)
265-265: Unused function argument: mock_producer
(ARG001)
265-265: Unused function argument: clear_kafka_client_instance
(ARG001)
265-265: Redefinition of unused settings from line 6
(F811)
django/thunderstore/ts_analytics/kafka.py
131-131: Avoid specifying long messages outside the exception class
(TRY003)
134-134: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Analyze (javascript)
- GitHub Check: Build docker image
- GitHub Check: Build docker image
🔇 Additional comments (1)
.env.template (1)
65-72: LGTM! Sensible Kafka configuration defaults.

The defaults are safe (Kafka disabled, dev mode enabled with a helpful comment) and sensitive values are left empty for explicit configuration.
```python
def get_kafka_client() -> Union[KafkaClient, DevKafkaClient, DummyKafkaClient]:
    global _KAFKA_CLIENT_INSTANCE

    if _KAFKA_CLIENT_INSTANCE is not None:
        return _KAFKA_CLIENT_INSTANCE

    # Return dummy client if Kafka is disabled
    if not getattr(settings, "KAFKA_ENABLED", False):
        client = DummyKafkaClient()
    else:
        config = getattr(settings, "KAFKA_CONFIG", None)
        if not config:
            raise RuntimeError("KAFKA_CONFIG is not configured.")

        if not config.get("bootstrap.servers"):
            raise RuntimeError("Kafka bootstrap servers are not configured.")

        if getattr(settings, "KAFKA_DEV", False):
            client = DevKafkaClient(config)
        else:
            client = ProdKafkaClient(config)

    _KAFKA_CLIENT_INSTANCE = client
    return client
```
🧩 Analysis chain
Producer resource leak: singleton is never closed.
The cached client holds network connections and buffers that are never flushed on shutdown, risking message loss and connection leaks.
Consider registering an atexit handler or Django signal to call close() on shutdown:
```python
import atexit

def get_kafka_client() -> Union[KafkaClient, DevKafkaClient, DummyKafkaClient]:
    global _KAFKA_CLIENT_INSTANCE
    if _KAFKA_CLIENT_INSTANCE is not None:
        return _KAFKA_CLIENT_INSTANCE
    if not getattr(settings, "KAFKA_ENABLED", False):
        client = DummyKafkaClient()
    else:
        config = getattr(settings, "KAFKA_CONFIG", None)
        if not config:
            raise RuntimeError("KAFKA_CONFIG is not configured.")
        if not config.get("bootstrap.servers"):
            raise RuntimeError("Kafka bootstrap servers are not configured.")
        if getattr(settings, "KAFKA_DEV", False):
            client = DevKafkaClient(config)
        else:
            client = ProdKafkaClient(config)
        # Register cleanup handler for non-dummy clients
        atexit.register(client.close)
    _KAFKA_CLIENT_INSTANCE = client
    return client
```

Have you tested this in a scenario where the application restarts or shuts down? You may want to verify that messages aren't lost.
Kafka producer cleanup is necessary—messages may be lost on shutdown.
The singleton is never closed, so pending messages in the producer buffer won't be flushed when the application terminates. The review's concern is valid: KafkaClient and its subclasses have a close() method that flushes with a 10-second timeout, but it's never called.
The suggested atexit approach is sound. One note: the proposed code correctly registers the handler only for non-dummy clients (inside the else block), avoiding a close() call on DummyKafkaClient which lacks that method.
Add the cleanup registration and verify during deployment that messages are flushed cleanly on shutdown.
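As a complement to the suggestion above, a hedged sketch of an idempotent shutdown hook; it assumes the module-level _KAFKA_CLIENT_INSTANCE and the client close() method from this PR.

```python
# Sketch: safe to call twice, skips the dummy client (which has no close()).
import atexit
import logging

logger = logging.getLogger(__name__)


def _close_kafka_client():
    global _KAFKA_CLIENT_INSTANCE
    client = _KAFKA_CLIENT_INSTANCE
    _KAFKA_CLIENT_INSTANCE = None  # idempotent: second call is a no-op
    if client is not None and hasattr(client, "close"):
        try:
            client.close()  # flushes with the existing 10-second timeout
        except Exception:
            logger.exception("Kafka client close failed during shutdown")


atexit.register(_close_kafka_client)
```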
🧰 Tools
🪛 Ruff (0.14.1)
131-131: Avoid specifying long messages outside the exception class
(TRY003)
134-134: Avoid specifying long messages outside the exception class
(TRY003)
🤖 Prompt for AI Agents
In django/thunderstore/ts_analytics/kafka.py around lines 119 to 142, the
singleton Kafka client is never closed which can cause buffered messages to be
lost on shutdown; add an atexit-registered cleanup that, when a real Kafka
client (ProdKafkaClient or DevKafkaClient) is created, registers a handler to
call its close() (with the existing flush timeout, e.g., 10s) and swallow/log
any exceptions; ensure you import atexit at the top, only register the handler
for non-DummyKafkaClient instances, and make the handler idempotent by checking
the global _KAFKA_CLIENT_INSTANCE before calling close().
Refactoring of the other two PRs into one