Skip to content

Commit

Permalink
feat: Implement SerialBatcher which helps with transforming single wr…
Browse files Browse the repository at this point in the history
…ites into batch writes. (#7)
  • Loading branch information
dpcollins-google committed Aug 11, 2020
1 parent f72a2f0 commit a6dc15f
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions google/cloud/pubsublite/internal/wire/serial_batcher.py
@@ -0,0 +1,50 @@
from abc import ABC, abstractmethod
from typing import Generic, List, Iterable
import asyncio

from google.cloud.pubsublite.internal.wire.connection import Request, Response
from google.cloud.pubsublite.internal.wire.work_item import WorkItem


class BatchTester(Generic[Request], ABC):
"""A BatchTester determines whether a given batch of messages must be sent."""
@abstractmethod
def test(self, requests: Iterable[Request]) -> bool:
"""
Args:
requests: The current outstanding batch.
Returns: Whether that batch must be sent.
"""
raise NotImplementedError()


class SerialBatcher(Generic[Request, Response]):
_tester: BatchTester[Request]
_requests: List[WorkItem[Request]] # A list of outstanding requests

def __init__(self, tester: BatchTester[Request]):
self._tester = tester
self._requests = []

def add(self, request: Request) -> 'asyncio.Future[Response]':
"""Add a new request to this batcher. Callers must always call should_flush() after add, and flush() if that returns
true.
Args:
request: The request to send.
Returns:
A future that will resolve to the response or a GoogleAPICallError.
"""
item = WorkItem[Request](request)
self._requests.append(item)
return item.response_future

def should_flush(self) -> bool:
return self._tester.test(item.request for item in self._requests)

def flush(self) -> Iterable[WorkItem[Request]]:
requests = self._requests
self._requests = []
return requests

0 comments on commit a6dc15f

Please sign in to comment.