-
Notifications
You must be signed in to change notification settings - Fork 12
/
serial_batcher.py
50 lines (38 loc) · 1.51 KB
/
serial_batcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from abc import ABC, abstractmethod
from typing import Generic, List, Iterable
import asyncio
from google.cloud.pubsublite.internal.wire.connection import Request, Response
from google.cloud.pubsublite.internal.wire.work_item import WorkItem
class BatchTester(Generic[Request], ABC):
    """Interface for the policy that decides when an outstanding batch must be sent.

    Implementations are handed the requests accumulated so far and answer
    whether that batch has to go out now.
    """

    @abstractmethod
    def test(self, requests: Iterable[Request]) -> bool:
        """Decide whether the given batch must be sent.

        Args:
          requests: The current outstanding batch.

        Returns:
          Whether that batch must be sent.
        """
        raise NotImplementedError()
class SerialBatcher(Generic[Request, Response]):
    """Collects requests into a batch and hands the batch over on flush.

    Each added request is wrapped in a WorkItem and kept until flush() is
    called. A BatchTester supplied at construction decides, via
    should_flush(), whether the accumulated batch must be sent.
    """

    _tester: BatchTester[Request]
    _requests: List[WorkItem[Request, Response]]  # A list of outstanding requests

    def __init__(self, tester: BatchTester[Request]):
        """
        Args:
          tester: The policy consulted by should_flush().
        """
        self._requests = []
        self._tester = tester

    def add(self, request: Request) -> "asyncio.Future[Response]":
        """Queue a new request in this batcher.

        Callers must always call should_flush() after add, and flush() if
        that returns true.

        Args:
          request: The request to send.

        Returns:
          A future that will resolve to the response or a GoogleAPICallError.
        """
        work_item = WorkItem[Request, Response](request)
        self._requests.append(work_item)
        return work_item.response_future

    def should_flush(self) -> bool:
        """Ask the tester whether the outstanding batch must be sent.

        Returns:
          The tester's verdict for the requests currently queued.
        """
        # The tester only sees the raw requests, not the wrapping work items.
        pending = (work_item.request for work_item in self._requests)
        return self._tester.test(pending)

    def flush(self) -> List[WorkItem[Request, Response]]:
        """Take ownership of the outstanding batch and reset the batcher.

        Returns:
          The work items queued since the previous flush, in insertion order.
        """
        # Swap in a fresh list so the returned batch is no longer touched by add().
        batch, self._requests = self._requests, []
        return batch