Merged
Changes from all commits (28 commits)
9 changes: 7 additions & 2 deletions src/sentry/replays/endpoints/project_replay_details.py
@@ -14,7 +14,7 @@
 from sentry.models.project import Project
 from sentry.replays.post_process import process_raw_response
 from sentry.replays.query import query_replay_instance
-from sentry.replays.tasks import delete_recording_segments
+from sentry.replays.tasks import delete_replay
 from sentry.replays.usecases.reader import has_archived_segment


@@ -96,5 +96,10 @@ def delete(self, request: Request, project: Project, replay_id: str) -> Response:
         if has_archived_segment(project.id, replay_id):
             return Response(status=404)

-        delete_recording_segments.delay(project_id=project.id, replay_id=replay_id)
+        delete_replay.delay(
+            project_id=project.id,
+            replay_id=replay_id,
+            has_seer_data=features.has("organizations:replay-ai-summaries", project.organization),
+        )

         return Response(status=204)
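For context, the endpoint's client-facing contract is unchanged by this diff; only the enqueued task differs. A minimal sketch of exercising it from a client (the URL shape and token are illustrative assumptions, not taken from this PR):

import requests  # any HTTP client works; shown for illustration only

# Hypothetical org/project slugs, replay ID, and token.
resp = requests.delete(
    "https://sentry.example.com/api/0/projects/my-org/my-project/replays/<replay-id>/",
    headers={"Authorization": "Bearer <token>"},
)
# 204 on success; 404 if the replay is already archived.
assert resp.status_code in (204, 404)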
6 changes: 4 additions & 2 deletions src/sentry/replays/endpoints/project_replay_jobs_delete.py
@@ -2,7 +2,7 @@
 from rest_framework.request import Request
 from rest_framework.response import Response

-from sentry import audit_log
+from sentry import audit_log, features
 from sentry.api.api_owners import ApiOwner
 from sentry.api.api_publish_status import ApiPublishStatus
 from sentry.api.base import region_silo_endpoint

@@ -102,9 +102,11 @@ def post(self, request: Request, project) -> Response:
             status="pending",
         )

+        has_seer_data = features.has("organizations:replay-ai-summaries", project.organization)
+
         # We always start with an offset of 0 (obviously) but future work doesn't need to obey
         # this. You're free to start from wherever you want.
-        run_bulk_replay_delete_job.delay(job.id, offset=0)
+        run_bulk_replay_delete_job.delay(job.id, offset=0, has_seer_data=has_seer_data)

         self.create_audit_entry(
             request,
69 changes: 42 additions & 27 deletions src/sentry/replays/tasks.py
@@ -15,7 +15,11 @@
     storage_kv,
 )
 from sentry.replays.models import DeletionJobStatus, ReplayDeletionJobModel, ReplayRecordingSegment
-from sentry.replays.usecases.delete import delete_matched_rows, fetch_rows_matching_pattern
+from sentry.replays.usecases.delete import (
+    delete_matched_rows,
+    delete_seer_replay_data,
+    fetch_rows_matching_pattern,
+)
 from sentry.replays.usecases.events import archive_event
 from sentry.replays.usecases.reader import fetch_segments_metadata
 from sentry.silo.base import SiloMode

@@ -30,7 +34,7 @@


 @instrumented_task(
-    name="sentry.replays.tasks.delete_recording_segments",
+    name="sentry.replays.tasks.delete_replay",
     queue="replays.delete_replay",
     default_retry_delay=5,
     max_retries=5,

@@ -43,32 +47,17 @@
         ),
     ),
 )
-def delete_recording_segments(project_id: int, replay_id: str, **kwargs: Any) -> None:
+def delete_replay(
+    project_id: int, replay_id: str, has_seer_data: bool = False, **kwargs: Any
+) -> None:
     """Asynchronously delete a replay."""
-    metrics.incr("replays.delete_recording_segments", amount=1, tags={"status": "started"})
+    metrics.incr("replays.delete_replay", amount=1, tags={"status": "started"})
     publisher = initialize_replays_publisher(is_async=False)
     archive_replay(publisher, project_id, replay_id)
     delete_replay_recording(project_id, replay_id)
-    metrics.incr("replays.delete_recording_segments", amount=1, tags={"status": "finished"})
-
-
-@instrumented_task(
-    name="sentry.replays.tasks.delete_replay_recording_async",
-    queue="replays.delete_replay",
-    default_retry_delay=5,
-    max_retries=5,
-    silo_mode=SiloMode.REGION,
-    taskworker_config=TaskworkerConfig(
-        namespace=replays_tasks,
-        processing_deadline_duration=120,
-        retry=Retry(
-            times=5,
-            delay=5,
-        ),
-    ),
-)
-def delete_replay_recording_async(project_id: int, replay_id: str) -> None:
-    delete_replay_recording(project_id, replay_id)
+    if has_seer_data:
+        delete_seer_replay_data(project_id, [replay_id])
+    metrics.incr("replays.delete_replay", amount=1, tags={"status": "finished"})


 @instrumented_task(

Member Author (comment on `def delete_replay(`): Renamed this to match its purpose of deleting a single replay - archive event, del recording segments, and request seer. Maybe this can later be replaced by this task I removed recently, since they seem to be very similar?

Member Author (comment on the renamed `metrics.incr` calls): Not sure if we're using this metric anywhere, I can restore if needed.

@@ -117,6 +106,25 @@ def delete_replays_script_async(
     segment_model.delete()


+@instrumented_task(
+    name="sentry.replays.tasks.delete_replay_recording_async",
+    queue="replays.delete_replay",
+    default_retry_delay=5,
+    max_retries=5,
+    silo_mode=SiloMode.REGION,
+    taskworker_config=TaskworkerConfig(
+        namespace=replays_tasks,
+        processing_deadline_duration=120,
+        retry=Retry(
+            times=5,
+            delay=5,
+        ),
+    ),
+)
+def delete_replay_recording_async(project_id: int, replay_id: str) -> None:
+    delete_replay_recording(project_id, replay_id)
+
+
 def delete_replay_recording(project_id: int, replay_id: str) -> None:
     """Delete all recording-segments associated with a Replay."""
     segments_from_metadata = fetch_segments_metadata(project_id, replay_id, offset=0, limit=10000)

@@ -178,7 +186,9 @@ def _delete_if_exists(filename: str) -> None:
         namespace=replays_tasks, retry=Retry(times=5), processing_deadline_duration=300
     ),
 )
-def run_bulk_replay_delete_job(replay_delete_job_id: int, offset: int, limit: int = 100) -> None:
+def run_bulk_replay_delete_job(
+    replay_delete_job_id: int, offset: int, limit: int = 100, has_seer_data: bool = False
+) -> None:
     """Replay bulk deletion task.

     We specify retry behavior in the task definition. However, if the task fails more than 5 times

@@ -213,6 +223,10 @@ def run_bulk_replay_delete_job(replay_delete_job_id: int, offset: int, limit: int = 100) -> None:
         # Delete the matched rows if any rows were returned.
         if len(results["rows"]) > 0:
             delete_matched_rows(job.project_id, results["rows"])
+            if has_seer_data:
+                delete_seer_replay_data(
+                    job.project_id, [row["replay_id"] for row in results["rows"]]
+                )
     except Exception:
         logger.exception("Bulk delete replays failed.")

@@ -228,8 +242,9 @@ def run_bulk_replay_delete_job(replay_delete_job_id: int, offset: int, limit: int = 100) -> None:
         # Checkpoint before continuing.
         job.offset = next_offset
         job.save()
-
-        run_bulk_replay_delete_job.delay(job.id, next_offset, limit=limit)
+        run_bulk_replay_delete_job.delay(
+            job.id, next_offset, limit=limit, has_seer_data=has_seer_data
+        )
         return None
     else:
         # If we've finished deleting all the replays for the selection. We can move the status to
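Note how `has_seer_data` is threaded through each re-enqueue above so the flag survives checkpointing. The pattern, distilled to a sketch (the helper names here are hypothetical, not the actual task):

def run_page(job_id: int, offset: int, limit: int = 100, has_seer_data: bool = False) -> None:
    rows = fetch_page(job_id, offset, limit)  # hypothetical: one page of matched rows
    delete_rows(rows)                         # hypothetical: row/storage deletion
    if has_seer_data:
        # Best-effort cleanup of Seer-side summaries for the same replay IDs.
        delete_seer_replay_data(project_for(job_id), [r["replay_id"] for r in rows])
    if len(rows) == limit:
        checkpoint(job_id, offset + limit)    # persist progress before re-enqueueing
        run_page.delay(job_id, offset + limit, limit=limit, has_seer_data=has_seer_data)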
71 changes: 70 additions & 1 deletion src/sentry/replays/usecases/delete.py
@@ -2,9 +2,12 @@

 import concurrent.futures as cf
 import functools
+import logging
 from datetime import datetime
-from typing import TypedDict
+from typing import Any, TypedDict

+import requests
+from django.conf import settings
 from google.cloud.exceptions import NotFound
 from snuba_sdk import (
     Column,

@@ -32,6 +35,8 @@
 from sentry.replays.usecases.events import archive_event
 from sentry.replays.usecases.query import execute_query, handle_search_filters
 from sentry.replays.usecases.query.configs.aggregate import search_config as agg_search_config
+from sentry.seer.signed_seer_api import sign_with_seer_secret
+from sentry.utils import json
 from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
 from sentry.utils.snuba import (
     QueryExecutionError,

@@ -49,6 +54,12 @@
     UnexpectedResponseError,
 )

+SEER_DELETE_SUMMARIES_URL = (
+    f"{settings.SEER_AUTOFIX_URL}/v1/automation/summarize/replay/breadcrumbs/delete"
+)
+
+logger = logging.getLogger(__name__)
+

 def delete_matched_rows(project_id: int, rows: list[MatchedRow]) -> int | None:
     if not rows:

@@ -182,3 +193,61 @@ def fetch_rows_matching_pattern(
             for row in rows
         ],
     }
+
+
+def make_seer_request(
+    url: str,
+    data: dict[str, Any],
+    timeout: int | tuple[int, int] | None = None,
+) -> tuple[requests.Response | None, int]:
+    """
+    Makes a standalone POST request to a Seer URL with built-in error handling. Expects valid JSON data.
+    Returns a tuple of (response, status code). If a request error occurred the response will be None.
+    XXX: Investigate migrating this to the shared util make_signed_seer_api_request, which uses a connection pool.
+    """
+    str_data = json.dumps(data)
+
+    try:
+        response = requests.post(
+            url,
+            data=str_data,
+            headers={
+                "content-type": "application/json;charset=utf-8",
+                **sign_with_seer_secret(str_data.encode()),
+            },
+            timeout=timeout or settings.SEER_DEFAULT_TIMEOUT or 5,
+        )
+        # Don't raise for error status, just return the response.
+
+    except requests.exceptions.Timeout:
+        return (None, 504)
+
+    except requests.exceptions.RequestException:
+        return (None, 502)
+
+    return (response, response.status_code)
+
+
+def delete_seer_replay_data(
+    project_id: int,
+    replay_ids: list[str],
+    timeout: int | tuple[int, int] | None = None,
+) -> bool:
+    response, status_code = make_seer_request(
+        SEER_DELETE_SUMMARIES_URL,
+        {
+            "replay_ids": replay_ids,
+        },
+        timeout=timeout,
+    )
+    if status_code >= 400:
+        logger.error(
+            "Failed to delete replay data from Seer",
+            extra={
+                "project_id": project_id,
+                "replay_ids": replay_ids,
+                "status_code": status_code,
+                "response": response.content if response else None,
+            },
+        )
+    return status_code < 400
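A minimal usage sketch of the new helper (values are illustrative; the tuple timeout follows requests' (connect, read) convention):

from sentry.replays.usecases.delete import delete_seer_replay_data

# Returns True when Seer responded below 400; request errors map to 502/504
# and are logged rather than raised, so callers can treat this as best-effort.
ok = delete_seer_replay_data(
    project_id=42,             # illustrative
    replay_ids=["9f1ab3..."],  # illustrative
    timeout=(2, 10),           # (connect, read) seconds, per requests
)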
29 changes: 29 additions & 0 deletions tests/sentry/replays/endpoints/test_project_replay_details.py
@@ -10,6 +10,7 @@
 from sentry.replays.lib.storage import RecordingSegmentStorageMeta, storage
 from sentry.replays.models import ReplayRecordingSegment
 from sentry.replays.testutils import assert_expected_response, mock_expected_response, mock_replay
+from sentry.replays.usecases.delete import SEER_DELETE_SUMMARIES_URL
 from sentry.testutils.cases import APITestCase, ReplaysSnubaTestCase
 from sentry.testutils.helpers import TaskRunner
 from sentry.utils import kafka_config

@@ -237,3 +238,31 @@ def test_delete_replay_from_clickhouse_data(self) -> None:
         assert storage.get(metadata1) is None
         assert storage.get(metadata2) is None
         assert storage.get(metadata3) is not None
+
+    @mock.patch("sentry.replays.usecases.delete.make_seer_request")
+    def test_delete_replay_from_seer(
+        self,
+        mock_make_seer_request: mock.MagicMock,
+    ) -> None:
+        """Test delete method deletes from Seer if summaries are enabled."""
+        kept_replay_id = uuid4().hex
+
+        t1 = datetime.datetime.now() - datetime.timedelta(seconds=10)
+        t2 = datetime.datetime.now() - datetime.timedelta(seconds=5)
+        self.store_replays(mock_replay(t1, self.project.id, self.replay_id, segment_id=0))
+        self.store_replays(mock_replay(t2, self.project.id, self.replay_id, segment_id=1))
+        self.store_replays(mock_replay(t1, self.project.id, kept_replay_id, segment_id=0))
+
+        mock_make_seer_request.return_value = (None, 204)
+
+        with self.feature({**REPLAYS_FEATURES, "organizations:replay-ai-summaries": True}):
+            with TaskRunner():
+                response = self.client.delete(self.url)
+                assert response.status_code == 204
+
+        mock_make_seer_request.assert_called_once()
+        (url, data) = mock_make_seer_request.call_args.args
+        assert url == SEER_DELETE_SUMMARIES_URL
+        assert data == {
+            "replay_ids": [self.replay_id],
+        }
tests/sentry/replays/endpoints/test_project_replay_jobs_delete.py
@@ -185,7 +185,7 @@ def test_post_success(self, mock_task: MagicMock) -> None:
         assert job.status == "pending"

         # Verify task was scheduled
-        mock_task.assert_called_once_with(job.id, offset=0)
+        mock_task.assert_called_once_with(job.id, offset=0, has_seer_data=False)

         with assume_test_silo_mode(SiloMode.REGION):
             RegionOutbox(

@@ -342,6 +342,30 @@ def test_permission_granted_with_project_admin(self) -> None:
         )
         assert response.status_code == 201

+    @patch("sentry.replays.tasks.run_bulk_replay_delete_job.delay")
+    def test_post_has_seer_data(self, mock_task: MagicMock) -> None:
+        """Test POST with summaries enabled schedules task with has_seer_data=True."""
+        data = {
+            "data": {
+                "rangeStart": "2023-01-01T00:00:00Z",
+                "rangeEnd": "2023-01-02T00:00:00Z",
+                "environments": ["production"],
+                "query": None,
+            }
+        }
+
+        with self.feature({"organizations:replay-ai-summaries": True}):
+            response = self.get_success_response(
+                self.organization.slug, self.project.slug, method="post", **data, status_code=201
+            )
+
+        job_data = response.data["data"]
+        job = ReplayDeletionJobModel.objects.get(id=job_data["id"])
+        assert job.project_id == self.project.id
+        assert job.status == "pending"
+
+        mock_task.assert_called_once_with(job.id, offset=0, has_seer_data=True)
+
+
 @region_silo_test
 class ProjectReplayDeletionJobDetailTest(APITestCase):