diff --git a/src/sentry/replays/endpoints/project_replay_details.py b/src/sentry/replays/endpoints/project_replay_details.py index e40b3d3fbc0e76..987cb938748ce2 100644 --- a/src/sentry/replays/endpoints/project_replay_details.py +++ b/src/sentry/replays/endpoints/project_replay_details.py @@ -14,7 +14,7 @@ from sentry.models.project import Project from sentry.replays.post_process import process_raw_response from sentry.replays.query import query_replay_instance -from sentry.replays.tasks import delete_recording_segments +from sentry.replays.tasks import delete_replay from sentry.replays.usecases.reader import has_archived_segment @@ -96,5 +96,10 @@ def delete(self, request: Request, project: Project, replay_id: str) -> Response if has_archived_segment(project.id, replay_id): return Response(status=404) - delete_recording_segments.delay(project_id=project.id, replay_id=replay_id) + delete_replay.delay( + project_id=project.id, + replay_id=replay_id, + has_seer_data=features.has("organizations:replay-ai-summaries", project.organization), + ) + return Response(status=204) diff --git a/src/sentry/replays/endpoints/project_replay_jobs_delete.py b/src/sentry/replays/endpoints/project_replay_jobs_delete.py index ad0afbbd609777..f5dbe74bb43da2 100644 --- a/src/sentry/replays/endpoints/project_replay_jobs_delete.py +++ b/src/sentry/replays/endpoints/project_replay_jobs_delete.py @@ -2,7 +2,7 @@ from rest_framework.request import Request from rest_framework.response import Response -from sentry import audit_log +from sentry import audit_log, features from sentry.api.api_owners import ApiOwner from sentry.api.api_publish_status import ApiPublishStatus from sentry.api.base import region_silo_endpoint @@ -102,9 +102,11 @@ def post(self, request: Request, project) -> Response: status="pending", ) + has_seer_data = features.has("organizations:replay-ai-summaries", project.organization) + # We always start with an offset of 0 (obviously) but future work doesn't need to obey # this. You're free to start from wherever you want. - run_bulk_replay_delete_job.delay(job.id, offset=0) + run_bulk_replay_delete_job.delay(job.id, offset=0, has_seer_data=has_seer_data) self.create_audit_entry( request, diff --git a/src/sentry/replays/tasks.py b/src/sentry/replays/tasks.py index b90d2d3f607f55..85b6874d309fb3 100644 --- a/src/sentry/replays/tasks.py +++ b/src/sentry/replays/tasks.py @@ -15,7 +15,11 @@ storage_kv, ) from sentry.replays.models import DeletionJobStatus, ReplayDeletionJobModel, ReplayRecordingSegment -from sentry.replays.usecases.delete import delete_matched_rows, fetch_rows_matching_pattern +from sentry.replays.usecases.delete import ( + delete_matched_rows, + delete_seer_replay_data, + fetch_rows_matching_pattern, +) from sentry.replays.usecases.events import archive_event from sentry.replays.usecases.reader import fetch_segments_metadata from sentry.silo.base import SiloMode @@ -30,7 +34,7 @@ @instrumented_task( - name="sentry.replays.tasks.delete_recording_segments", + name="sentry.replays.tasks.delete_replay", queue="replays.delete_replay", default_retry_delay=5, max_retries=5, @@ -43,32 +47,17 @@ ), ), ) -def delete_recording_segments(project_id: int, replay_id: str, **kwargs: Any) -> None: +def delete_replay( + project_id: int, replay_id: str, has_seer_data: bool = False, **kwargs: Any +) -> None: """Asynchronously delete a replay.""" - metrics.incr("replays.delete_recording_segments", amount=1, tags={"status": "started"}) + metrics.incr("replays.delete_replay", amount=1, tags={"status": "started"}) publisher = initialize_replays_publisher(is_async=False) archive_replay(publisher, project_id, replay_id) delete_replay_recording(project_id, replay_id) - metrics.incr("replays.delete_recording_segments", amount=1, tags={"status": "finished"}) - - -@instrumented_task( - name="sentry.replays.tasks.delete_replay_recording_async", - queue="replays.delete_replay", - default_retry_delay=5, - max_retries=5, - silo_mode=SiloMode.REGION, - taskworker_config=TaskworkerConfig( - namespace=replays_tasks, - processing_deadline_duration=120, - retry=Retry( - times=5, - delay=5, - ), - ), -) -def delete_replay_recording_async(project_id: int, replay_id: str) -> None: - delete_replay_recording(project_id, replay_id) + if has_seer_data: + delete_seer_replay_data(project_id, [replay_id]) + metrics.incr("replays.delete_replay", amount=1, tags={"status": "finished"}) @instrumented_task( @@ -117,6 +106,25 @@ def delete_replays_script_async( segment_model.delete() +@instrumented_task( + name="sentry.replays.tasks.delete_replay_recording_async", + queue="replays.delete_replay", + default_retry_delay=5, + max_retries=5, + silo_mode=SiloMode.REGION, + taskworker_config=TaskworkerConfig( + namespace=replays_tasks, + processing_deadline_duration=120, + retry=Retry( + times=5, + delay=5, + ), + ), +) +def delete_replay_recording_async(project_id: int, replay_id: str) -> None: + delete_replay_recording(project_id, replay_id) + + def delete_replay_recording(project_id: int, replay_id: str) -> None: """Delete all recording-segments associated with a Replay.""" segments_from_metadata = fetch_segments_metadata(project_id, replay_id, offset=0, limit=10000) @@ -178,7 +186,9 @@ def _delete_if_exists(filename: str) -> None: namespace=replays_tasks, retry=Retry(times=5), processing_deadline_duration=300 ), ) -def run_bulk_replay_delete_job(replay_delete_job_id: int, offset: int, limit: int = 100) -> None: +def run_bulk_replay_delete_job( + replay_delete_job_id: int, offset: int, limit: int = 100, has_seer_data: bool = False +) -> None: """Replay bulk deletion task. We specify retry behavior in the task definition. However, if the task fails more than 5 times @@ -213,6 +223,10 @@ def run_bulk_replay_delete_job(replay_delete_job_id: int, offset: int, limit: in # Delete the matched rows if any rows were returned. if len(results["rows"]) > 0: delete_matched_rows(job.project_id, results["rows"]) + if has_seer_data: + delete_seer_replay_data( + job.project_id, [row["replay_id"] for row in results["rows"]] + ) except Exception: logger.exception("Bulk delete replays failed.") @@ -228,8 +242,9 @@ def run_bulk_replay_delete_job(replay_delete_job_id: int, offset: int, limit: in # Checkpoint before continuing. job.offset = next_offset job.save() - - run_bulk_replay_delete_job.delay(job.id, next_offset, limit=limit) + run_bulk_replay_delete_job.delay( + job.id, next_offset, limit=limit, has_seer_data=has_seer_data + ) return None else: # If we've finished deleting all the replays for the selection. We can move the status to diff --git a/src/sentry/replays/usecases/delete.py b/src/sentry/replays/usecases/delete.py index db36ca4839fd7b..4f2b4a06823d22 100644 --- a/src/sentry/replays/usecases/delete.py +++ b/src/sentry/replays/usecases/delete.py @@ -2,9 +2,12 @@ import concurrent.futures as cf import functools +import logging from datetime import datetime -from typing import TypedDict +from typing import Any, TypedDict +import requests +from django.conf import settings from google.cloud.exceptions import NotFound from snuba_sdk import ( Column, @@ -32,6 +35,8 @@ from sentry.replays.usecases.events import archive_event from sentry.replays.usecases.query import execute_query, handle_search_filters from sentry.replays.usecases.query.configs.aggregate import search_config as agg_search_config +from sentry.seer.signed_seer_api import sign_with_seer_secret +from sentry.utils import json from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay from sentry.utils.snuba import ( QueryExecutionError, @@ -49,6 +54,12 @@ UnexpectedResponseError, ) +SEER_DELETE_SUMMARIES_URL = ( + f"{settings.SEER_AUTOFIX_URL}/v1/automation/summarize/replay/breadcrumbs/delete" +) + +logger = logging.getLogger(__name__) + def delete_matched_rows(project_id: int, rows: list[MatchedRow]) -> int | None: if not rows: @@ -182,3 +193,61 @@ def fetch_rows_matching_pattern( for row in rows ], } + + +def make_seer_request( + url: str, + data: dict[str, Any], + timeout: int | tuple[int, int] | None = None, +) -> tuple[requests.Response | None, int]: + """ + Makes a standalone POST request to a Seer URL with built in error handling. Expects valid JSON data. + Returns a tuple of (response, status code). If a request error occurred the response will be None. + XXX: Investigate migrating this to the shared util make_signed_seer_api_request, which uses connection pool. + """ + str_data = json.dumps(data) + + try: + response = requests.post( + url, + data=str_data, + headers={ + "content-type": "application/json;charset=utf-8", + **sign_with_seer_secret(str_data.encode()), + }, + timeout=timeout or settings.SEER_DEFAULT_TIMEOUT or 5, + ) + # Don't raise for error status, just return response. + + except requests.exceptions.Timeout: + return (None, 504) + + except requests.exceptions.RequestException: + return (None, 502) + + return (response, response.status_code) + + +def delete_seer_replay_data( + project_id: int, + replay_ids: list[str], + timeout: int | tuple[int, int] | None = None, +) -> bool: + response, status_code = make_seer_request( + SEER_DELETE_SUMMARIES_URL, + { + "replay_ids": replay_ids, + }, + timeout=timeout, + ) + if status_code >= 400: + logger.error( + "Failed to delete replay data from Seer", + extra={ + "project_id": project_id, + "replay_ids": replay_ids, + "status_code": status_code, + "response": response.content if response else None, + }, + ) + return status_code < 400 diff --git a/tests/sentry/replays/endpoints/test_project_replay_details.py b/tests/sentry/replays/endpoints/test_project_replay_details.py index b076fb00dfe03a..1c129af7688ab4 100644 --- a/tests/sentry/replays/endpoints/test_project_replay_details.py +++ b/tests/sentry/replays/endpoints/test_project_replay_details.py @@ -10,6 +10,7 @@ from sentry.replays.lib.storage import RecordingSegmentStorageMeta, storage from sentry.replays.models import ReplayRecordingSegment from sentry.replays.testutils import assert_expected_response, mock_expected_response, mock_replay +from sentry.replays.usecases.delete import SEER_DELETE_SUMMARIES_URL from sentry.testutils.cases import APITestCase, ReplaysSnubaTestCase from sentry.testutils.helpers import TaskRunner from sentry.utils import kafka_config @@ -237,3 +238,31 @@ def test_delete_replay_from_clickhouse_data(self) -> None: assert storage.get(metadata1) is None assert storage.get(metadata2) is None assert storage.get(metadata3) is not None + + @mock.patch("sentry.replays.usecases.delete.make_seer_request") + def test_delete_replay_from_seer( + self, + mock_make_seer_request: mock.MagicMock, + ) -> None: + """Test delete method deletes from Seer if summaries are enabled.""" + kept_replay_id = uuid4().hex + + t1 = datetime.datetime.now() - datetime.timedelta(seconds=10) + t2 = datetime.datetime.now() - datetime.timedelta(seconds=5) + self.store_replays(mock_replay(t1, self.project.id, self.replay_id, segment_id=0)) + self.store_replays(mock_replay(t2, self.project.id, self.replay_id, segment_id=1)) + self.store_replays(mock_replay(t1, self.project.id, kept_replay_id, segment_id=0)) + + mock_make_seer_request.return_value = (None, 204) + + with self.feature({**REPLAYS_FEATURES, "organizations:replay-ai-summaries": True}): + with TaskRunner(): + response = self.client.delete(self.url) + assert response.status_code == 204 + + mock_make_seer_request.assert_called_once() + (url, data) = mock_make_seer_request.call_args.args + assert url == SEER_DELETE_SUMMARIES_URL + assert data == { + "replay_ids": [self.replay_id], + } diff --git a/tests/sentry/replays/endpoints/test_project_replay_jobs_delete.py b/tests/sentry/replays/endpoints/test_project_replay_jobs_delete.py index 11348218561aef..9d1aab93d603a2 100644 --- a/tests/sentry/replays/endpoints/test_project_replay_jobs_delete.py +++ b/tests/sentry/replays/endpoints/test_project_replay_jobs_delete.py @@ -185,7 +185,7 @@ def test_post_success(self, mock_task: MagicMock) -> None: assert job.status == "pending" # Verify task was scheduled - mock_task.assert_called_once_with(job.id, offset=0) + mock_task.assert_called_once_with(job.id, offset=0, has_seer_data=False) with assume_test_silo_mode(SiloMode.REGION): RegionOutbox( @@ -342,6 +342,30 @@ def test_permission_granted_with_project_admin(self) -> None: ) assert response.status_code == 201 + @patch("sentry.replays.tasks.run_bulk_replay_delete_job.delay") + def test_post_has_seer_data(self, mock_task: MagicMock) -> None: + """Test POST with summaries enabled schedules task with has_seer_data=True.""" + data = { + "data": { + "rangeStart": "2023-01-01T00:00:00Z", + "rangeEnd": "2023-01-02T00:00:00Z", + "environments": ["production"], + "query": None, + } + } + + with self.feature({"organizations:replay-ai-summaries": True}): + response = self.get_success_response( + self.organization.slug, self.project.slug, method="post", **data, status_code=201 + ) + + job_data = response.data["data"] + job = ReplayDeletionJobModel.objects.get(id=job_data["id"]) + assert job.project_id == self.project.id + assert job.status == "pending" + + mock_task.assert_called_once_with(job.id, offset=0, has_seer_data=True) + @region_silo_test class ProjectReplayDeletionJobDetailTest(APITestCase): diff --git a/tests/sentry/replays/tasks/test_delete_replays_bulk.py b/tests/sentry/replays/tasks/test_delete_replays_bulk.py index 0af746e8ea0ab5..67c0b2356cf088 100644 --- a/tests/sentry/replays/tasks/test_delete_replays_bulk.py +++ b/tests/sentry/replays/tasks/test_delete_replays_bulk.py @@ -2,14 +2,15 @@ import datetime import uuid -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, Mock, patch from sentry.replays.models import DeletionJobStatus, ReplayDeletionJobModel from sentry.replays.tasks import run_bulk_replay_delete_job from sentry.replays.testutils import mock_replay -from sentry.replays.usecases.delete import fetch_rows_matching_pattern +from sentry.replays.usecases.delete import SEER_DELETE_SUMMARIES_URL, fetch_rows_matching_pattern from sentry.testutils.cases import APITestCase, ReplaysSnubaTestCase from sentry.testutils.helpers import TaskRunner +from sentry.utils import json class TestDeleteReplaysBulk(APITestCase, ReplaysSnubaTestCase): @@ -234,3 +235,114 @@ def test_fetch_rows_matching_pattern(self) -> None: ) assert len(result["rows"]) == 1 assert result["rows"][0]["replay_id"] == str(uuid.UUID(replay_id)) + + @patch("requests.post") + @patch("sentry.replays.tasks.fetch_rows_matching_pattern") + @patch("sentry.replays.tasks.delete_matched_rows") + def test_run_bulk_replay_delete_job_has_seer_data_true( + self, mock_delete_matched_rows: MagicMock, mock_fetch_rows: MagicMock, mock_post: MagicMock + ) -> None: + def row_generator(): + yield { + "rows": [ + { + "retention_days": 90, + "replay_id": "a", + "max_segment_id": 1, + }, + { + "retention_days": 90, + "replay_id": "b", + "max_segment_id": 0, + }, + ], + "has_more": True, + } + yield { + "rows": [ + { + "retention_days": 90, + "replay_id": "c", + "max_segment_id": 1, + }, + ], + "has_more": False, + } + + mock_fetch_rows.side_effect = row_generator() + mock_post.return_value = Mock(status_code=204) + + with TaskRunner(): + run_bulk_replay_delete_job.delay(self.job.id, offset=0, limit=2, has_seer_data=True) + + # Runs were chained. + self.job.refresh_from_db() + assert self.job.status == "completed" + assert self.job.offset == 3 + + assert mock_post.call_count == 2 + assert mock_post.call_args_list[0].args == (SEER_DELETE_SUMMARIES_URL,) + assert mock_post.call_args_list[0].kwargs["data"] == json.dumps( + { + "replay_ids": ["a", "b"], + } + ) + assert ( + mock_post.call_args_list[0].kwargs["headers"]["content-type"] + == "application/json;charset=utf-8" + ) + assert mock_post.call_args_list[1].args == (SEER_DELETE_SUMMARIES_URL,) + assert mock_post.call_args_list[1].kwargs["data"] == json.dumps( + { + "replay_ids": ["c"], + } + ) + assert ( + mock_post.call_args_list[1].kwargs["headers"]["content-type"] + == "application/json;charset=utf-8" + ) + + @patch("requests.post") + @patch("sentry.replays.tasks.fetch_rows_matching_pattern") + @patch("sentry.replays.tasks.delete_matched_rows") + def test_run_bulk_replay_delete_job_has_seer_data_false( + self, mock_delete_matched_rows: MagicMock, mock_fetch_rows: MagicMock, mock_post: MagicMock + ) -> None: + def row_generator(): + yield { + "rows": [ + { + "retention_days": 90, + "replay_id": "a", + "max_segment_id": 1, + }, + { + "retention_days": 90, + "replay_id": "b", + "max_segment_id": 0, + }, + ], + "has_more": True, + } + yield { + "rows": [ + { + "retention_days": 90, + "replay_id": "c", + "max_segment_id": 1, + }, + ], + "has_more": False, + } + + mock_fetch_rows.side_effect = row_generator() + + with TaskRunner(): + run_bulk_replay_delete_job.delay(self.job.id, offset=0, limit=2, has_seer_data=False) + + # Runs were chained. + self.job.refresh_from_db() + assert self.job.status == "completed" + assert self.job.offset == 3 + + assert mock_post.call_count == 0