/
test_dlq.py
70 lines (56 loc) · 1.96 KB
/
test_dlq.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import time
from datetime import datetime
from unittest.mock import Mock
import msgpack
import pytest
from arroyo.backends.kafka import KafkaPayload
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message, Partition, Topic
from sentry.ingest.consumer.factory import IngestStrategyFactory
from sentry.testutils.pytest.fixtures import django_db_all
def make_message(payload: bytes, partition: Partition, offset: int) -> Message:
    """Build an arroyo ``Message`` carrying *payload* as if read from a broker.

    The raw bytes are wrapped in a ``KafkaPayload`` (constructed with ``None``
    and an empty list as its other two arguments), placed in a ``BrokerValue``
    at the given *partition* and *offset*, and stamped with the current time
    from ``datetime.now()``.
    """
    kafka_payload = KafkaPayload(None, payload, [])
    broker_value = BrokerValue(kafka_payload, partition, offset, datetime.now())
    return Message(broker_value)
@django_db_all
def test_dlq_invalid_messages(factories) -> None:
    """Check which errors the ingest strategy reports as ``InvalidMessage``.

    Two messages are submitted to a strategy built by
    ``IngestStrategyFactory("events", ...)``:

    * a msgpack-encoded message that still makes ``submit`` raise -- the
      error must surface as-is and must *not* be an ``InvalidMessage``;
    * a payload of arbitrary non-msgpack bytes -- ``submit`` must raise
      ``InvalidMessage`` carrying the partition and offset of the message.

    Each message is built *before* entering ``pytest.raises`` so that only
    ``strategy.submit`` can satisfy the expectation; an unexpected failure
    while constructing the message fails the test instead of being mistaken
    for the error under test.
    """
    organization = factories.create_organization()
    project = factories.create_project(organization=organization)

    valid_payload = msgpack.packb(
        {
            "type": "event",
            "project_id": project.id,
            "payload": b"{}",
            "start_time": int(time.time()),
            "event_id": "aaa",
        }
    )
    bogus_payload = b"bogus message"

    partition = Partition(Topic("ingest-events"), 0)
    offset = 5

    factory = IngestStrategyFactory(
        "events",
        reprocess_only_stuck_events=False,
        num_processes=1,
        max_batch_size=1,
        max_batch_time=1,
        input_block_size=None,
        output_block_size=None,
    )
    strategy = factory.create_with_partitions(Mock(), Mock())

    # Valid payload raises original error
    # NOTE(review): the concrete exception type is not visible from this test,
    # hence the broad ``Exception``; the assertion below is what matters.
    valid_message = make_message(valid_payload, partition, offset)
    with pytest.raises(Exception) as original_error:
        strategy.submit(valid_message)
    assert not isinstance(original_error.value, InvalidMessage)

    # Invalid payload raises InvalidMessage error
    bogus_message = make_message(bogus_payload, partition, offset)
    with pytest.raises(InvalidMessage) as dlq_error:
        strategy.submit(bogus_message)

    assert dlq_error.value.partition == partition
    assert dlq_error.value.offset == offset