
Commit

ref(typing): add types to tasks/reprocessing2 (#66817)
anonrig committed Mar 12, 2024
1 parent d89c3bb commit 4c634e0
Showing 4 changed files with 33 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -477,7 +477,6 @@ module = [
"sentry.tasks.integrations.sync_status_outbound",
"sentry.tasks.merge",
"sentry.tasks.process_buffer",
"sentry.tasks.reprocessing2",
"sentry.tasks.sentry_apps",
"sentry.tasks.store",
"sentry.tasks.unmerge",
@@ -631,6 +630,7 @@ module = [
"sentry.reprocessing2",
"sentry.relay.config.metric_extraction",
"sentry.snuba.metrics.extraction",
"sentry.tasks.reprocessing2",
"tests.sentry.tasks.test_on_demand_metrics",
"tests.sentry.relay.config.test_metric_extraction",
]
4 changes: 2 additions & 2 deletions src/sentry/reprocessing2.py
@@ -193,7 +193,7 @@ def pull_event_data(project_id: int, event_id: str) -> ReprocessableEvent:
return ReprocessableEvent(event=event, data=data, attachments=attachments)


-def reprocess_event(project_id: int, event_id: str, start_time: int) -> None:
+def reprocess_event(project_id: int, event_id: str, start_time: float) -> None:
from sentry.ingest.consumer.processors import CACHE_TIMEOUT
from sentry.tasks.store import preprocess_event_from_reprocessing

@@ -428,7 +428,7 @@ def buffered_handle_remaining_events(
old_group_id: int,
new_group_id: int,
datetime_to_event: list[tuple[datetime, str]],
-remaining_events: int,
+remaining_events: str,
force_flush_batch: bool = False,
) -> None:
"""
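Note on the start_time changes above (and in src/sentry/tasks/store.py further down): the value is a Unix timestamp typically captured with time.time(), which returns a float, so float is the more accurate annotation than int. A minimal sketch, not taken from the commit (kick_off is a hypothetical caller):

import time

def kick_off(project_id: int, event_id: str) -> None:
    # time.time() returns float seconds since the epoch, hence start_time: float.
    start_time: float = time.time()
    # reprocess_event(project_id, event_id, start_time)  # the function typed above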
52 changes: 29 additions & 23 deletions src/sentry/tasks/reprocessing2.py
@@ -1,4 +1,7 @@
import time
+from collections.abc import Sequence
+from datetime import datetime
+from typing import TYPE_CHECKING

import sentry_sdk
from django.conf import settings
@@ -24,15 +27,15 @@
silo_mode=SiloMode.REGION,
)
def reprocess_group(
-project_id,
-group_id,
-remaining_events="delete",
-new_group_id=None,
-query_state=None,
-start_time=None,
-max_events=None,
-acting_user_id=None,
-):
+project_id: int,
+group_id: int,
+remaining_events: str = "delete",
+new_group_id: int | None = None,
+query_state: str | None = None,
+start_time: float | None = None,
+max_events: int | None = None,
+acting_user_id: int | None = None,
+) -> None:
sentry_sdk.set_tag("project", project_id)
sentry_sdk.set_tag("group_id", group_id)

@@ -141,17 +144,17 @@ reprocess_group(
)
@retry
def handle_remaining_events(
-project_id,
-new_group_id,
-remaining_events,
+project_id: int,
+new_group_id: str,
+remaining_events: str,
# TODO(markus): Should be mandatory arguments.
-event_ids_redis_key=None,
-old_group_id=None,
+event_ids_redis_key: str | None = None,
+old_group_id: str | None = None,
# TODO(markus): Deprecated arguments, can remove in next version.
-event_ids=None,
-from_timestamp=None,
-to_timestamp=None,
-):
+event_ids: Sequence[str] | None = None,
+from_timestamp: datetime | None = None,
+to_timestamp: datetime | None = None,
+) -> None:
"""
Delete or merge/move associated per-event data: nodestore, event
attachments, user reports. Mark the event as "tombstoned" in Snuba.
@@ -173,6 +176,9 @@ handle_remaining_events(
if event_ids_redis_key is not None:
event_ids, from_timestamp, to_timestamp = pop_batched_events_from_redis(event_ids_redis_key)

+if TYPE_CHECKING:
+    assert event_ids is not None

metrics.distribution(
"events.reprocessing.handle_remaining_events.batch_size",
len(event_ids),
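The if TYPE_CHECKING block added above is a type-checker-only narrowing: TYPE_CHECKING is False at runtime, so the assert never executes, while mypy analyzes the block as if it were true and narrows event_ids from Sequence[str] | None to Sequence[str] before it reaches len(event_ids). A standalone sketch of the pattern (the names here are illustrative, not from this file):

from collections.abc import Sequence
from typing import TYPE_CHECKING

def batch_size(
    event_ids: Sequence[str] | None = None,
    redis_key: str | None = None,
) -> int:
    if redis_key is not None:
        # Stand-in for pop_batched_events_from_redis(redis_key).
        event_ids = ["event-a", "event-b"]
    if TYPE_CHECKING:
        # Never executed at runtime; it only tells the type checker that the
        # Optional was resolved by the branch above.
        assert event_ids is not None
    return len(event_ids)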
@@ -187,10 +193,10 @@

# Remove from nodestore
node_ids = [Event.generate_node_id(project_id, event_id) for event_id in event_ids]
-nodestore.delete_multi(node_ids)
+nodestore.backend.delete_multi(node_ids)

# Tell Snuba to delete the event data.
-eventstream.tombstone_events_unsafe(
+eventstream.backend.tombstone_events_unsafe(
project_id, event_ids, from_timestamp=from_timestamp, to_timestamp=to_timestamp
)
elif remaining_events == "keep":
@@ -199,7 +205,7 @@
group_id=new_group_id
)

-eventstream.replace_group_unsafe(
+eventstream.backend.replace_group_unsafe(
project_id,
event_ids,
new_group_id=new_group_id,
@@ -225,7 +231,7 @@
time_limit=(60 * 5) + 5,
soft_time_limit=60 * 5,
)
-def finish_reprocessing(project_id, group_id):
+def finish_reprocessing(project_id: int, group_id: int) -> None:
from sentry.models.activity import Activity
from sentry.models.group import Group
from sentry.models.groupredirect import GroupRedirect
@@ -263,7 +269,7 @@ def finish_reprocessing(project_id, group_id):
force_flush_batch=True,
)

-eventstream.exclude_groups(project_id, [group_id])
+eventstream.backend.exclude_groups(project_id, [group_id])

from sentry import similarity

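remaining_events is annotated as a plain str in this change; the delete/keep branches above show the two values it actually takes. A hedged sketch, not part of the commit, of how the same contract could be expressed with typing.Literal:

from typing import Literal

RemainingEvents = Literal["delete", "keep"]

def dispatch(remaining_events: RemainingEvents) -> None:
    if remaining_events == "delete":
        ...  # drop nodestore data and tombstone the events in Snuba
    elif remaining_events == "keep":
        ...  # re-point the remaining events at the new group
    else:
        raise ValueError(f"Unknown remaining_events value: {remaining_events}")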
2 changes: 1 addition & 1 deletion src/sentry/tasks/store.py
@@ -262,7 +262,7 @@ def preprocess_event(
def preprocess_event_from_reprocessing(
cache_key: str,
data: Event | None = None,
-start_time: int | None = None,
+start_time: float | None = None,
event_id: str | None = None,
project: Project | None = None,
**kwargs: Any,
