# acknowledge_test.rb (forked from googleapis/google-cloud-ruby)
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require "helper"
# Verifies that messages delivered to a Subscriber callback can be acknowledged.
# Each example feeds a canned StreamingPullResponse through a StreamingPullStub,
# acks every delivered message inside the listen callback, and then inspects
# the requests recorded by the stub.
describe Google::Cloud::PubSub::Subscriber, :acknowledge, :mock_pubsub do
  let(:topic_name) { "topic-name-goes-here" }
  let(:sub_name) { "subscription-name-goes-here" }
  let(:sub_hash) { subscription_hash topic_name, sub_name }
  let(:sub_grpc) { Google::Cloud::PubSub::V1::Subscription.new(sub_hash) }
  let(:sub_path) { sub_grpc.name }
  let(:subscription) { Google::Cloud::PubSub::Subscription.from_grpc sub_grpc, pubsub.service }
  let(:rec_msg1_grpc) do
    Google::Cloud::PubSub::V1::ReceivedMessage.new rec_message_hash("rec_message1-msg-goes-here", 1111)
  end
  let(:rec_msg2_grpc) do
    Google::Cloud::PubSub::V1::ReceivedMessage.new rec_message_hash("rec_message2-msg-goes-here", 1112)
  end
  let(:rec_msg3_grpc) do
    Google::Cloud::PubSub::V1::ReceivedMessage.new rec_message_hash("rec_message3-msg-goes-here", 1113)
  end
  let(:client_id) { "my-client-uuid" }

  # Polls the given block every 10ms until it returns a truthy value.
  # Flunks the example (a test failure rather than a RuntimeError) once more
  # than +max_retries+ polls have elapsed without the condition being met.
  def wait_for_callbacks max_retries
    retries = 0
    until yield
      flunk "total number of calls were never made" if retries > max_retries
      retries += 1
      sleep 0.01
    end
  end

  # Asserts that the stub recorded exactly one stream whose only request is
  # the initial StreamingPullRequest for this subscription and client id.
  def assert_single_stream_request stub
    initial_request = Google::Cloud::PubSub::V1::StreamingPullRequest.new(
      client_id: client_id,
      subscription: sub_path,
      stream_ack_deadline_seconds: 60
    )
    _(stub.requests.map(&:to_a)).must_equal [[initial_request]]
  end

  # Returns every ack id the stub recorded via acknowledge requests, checking
  # that each request targeted this subscription. Requests may be delivered
  # out of order, so callers should sort the result before comparing.
  def acknowledged_ids stub
    stub.acknowledge_requests.flat_map do |ack_sub_path, msg_ids|
      assert_equal sub_path, ack_sub_path
      msg_ids.to_a
    end
  end

  # Groups the ack ids from the stub's recorded modify-ack-deadline requests
  # by deadline, checking that each request targeted this subscription.
  # Unknown deadlines yield an empty array, so a missing deadline produces a
  # readable assertion failure instead of a NoMethodError on nil. Requests
  # may be delivered out of order, so callers should sort before comparing.
  def mod_acked_ids_by_deadline stub
    ids_by_deadline = Hash.new { |hash, deadline| hash[deadline] = [] }
    stub.modify_ack_deadline_requests.each do |ack_sub_path, msg_ids, deadline|
      assert_equal sub_path, ack_sub_path
      ids_by_deadline[deadline].concat msg_ids.to_a
    end
    ids_by_deadline
  end

  it "can acknowledge a single message" do
    rec_message_msg = "pulled-message"
    rec_message_ack_id = 123_456_789
    pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new rec_messages_hash(rec_message_msg, rec_message_ack_id)
    stub = StreamingPullStub.new [[pull_res]]
    called = false

    subscription.service.mocked_subscriber = stub
    subscription.service.client_id = client_id

    subscriber = subscription.listen streams: 1 do |result|
      # flush the initial buffer before any callbacks are processed
      subscriber.buffer.flush! unless called

      assert_kind_of Google::Cloud::PubSub::ReceivedMessage, result
      assert_equal rec_message_msg, result.data
      assert_equal "ack-id-#{rec_message_ack_id}", result.ack_id

      result.ack!
      called = true
    end

    subscriber.start
    wait_for_callbacks(100) { called }
    subscriber.stop
    subscriber.wait!

    assert_single_stream_request stub
    _(acknowledged_ids(stub).sort).must_equal ["ack-id-123456789"]
    _(mod_acked_ids_by_deadline(stub)[60].sort).must_equal ["ack-id-123456789"]
  end

  it "can acknowledge multiple messages" do
    pull_res = Google::Cloud::PubSub::V1::StreamingPullResponse.new received_messages: [rec_msg1_grpc, rec_msg2_grpc, rec_msg3_grpc]
    stub = StreamingPullStub.new [[pull_res]]
    called = 0

    subscription.service.mocked_subscriber = stub
    subscription.service.client_id = client_id

    subscriber = subscription.listen streams: 1 do |msg|
      # flush the initial buffer before any callbacks are processed
      subscriber.buffer.flush! if called.zero?

      assert_kind_of Google::Cloud::PubSub::ReceivedMessage, msg

      msg.ack!
      called += 1
    end

    subscriber.start
    wait_for_callbacks(200) { called >= 3 }
    subscriber.stop
    subscriber.wait!

    assert_single_stream_request stub
    _(acknowledged_ids(stub).sort).must_equal ["ack-id-1111", "ack-id-1112", "ack-id-1113"]
    _(mod_acked_ids_by_deadline(stub)[60].sort).must_equal ["ack-id-1111", "ack-id-1112", "ack-id-1113"]
  end
end