diff --git a/google/cloud/pubsublite/cloudpubsub/make_publisher.py b/google/cloud/pubsublite/cloudpubsub/make_publisher.py index a1acc51e..487eea2d 100644 --- a/google/cloud/pubsublite/cloudpubsub/make_publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/make_publisher.py @@ -2,6 +2,7 @@ from google.api_core.client_options import ClientOptions from google.auth.credentials import Credentials +from google.cloud.pubsub_v1.types import BatchSettings from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import ( AsyncPublisherImpl, @@ -10,15 +11,19 @@ from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher from google.cloud.pubsublite.internal.wire.make_publisher import ( make_publisher as make_wire_publisher, + DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING, ) from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata from google.cloud.pubsublite.internal.wire.pubsub_context import pubsub_context from google.cloud.pubsublite.paths import TopicPath +DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING + + def make_async_publisher( topic: TopicPath, - batching_delay_secs: Optional[float] = None, + per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, @@ -28,7 +33,7 @@ def make_async_publisher( Args: topic: The topic to publish to. - batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases. + per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. metadata: Additional metadata to send with the RPC. @@ -43,7 +48,11 @@ def make_async_publisher( def underlying_factory(): return make_wire_publisher( - topic, batching_delay_secs, credentials, client_options, metadata + topic, + per_partition_batching_settings, + credentials, + client_options, + metadata, ) return AsyncPublisherImpl(underlying_factory) @@ -51,7 +60,7 @@ def underlying_factory(): def make_publisher( topic: TopicPath, - batching_delay_secs: Optional[float] = None, + per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, @@ -61,7 +70,7 @@ def make_publisher( Args: topic: The topic to publish to. - batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases. + per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. metadata: Additional metadata to send with the RPC. @@ -74,6 +83,10 @@ def make_publisher( """ return PublisherImpl( make_async_publisher( - topic, batching_delay_secs, credentials, client_options, metadata + topic, + per_partition_batching_settings, + credentials, + client_options, + metadata, ) ) diff --git a/google/cloud/pubsublite/internal/wire/make_publisher.py b/google/cloud/pubsublite/internal/wire/make_publisher.py index c925ed0c..7cd5d027 100644 --- a/google/cloud/pubsublite/internal/wire/make_publisher.py +++ b/google/cloud/pubsublite/internal/wire/make_publisher.py @@ -1,5 +1,7 @@ from typing import AsyncIterator, Mapping, Optional, MutableMapping +from google.cloud.pubsub_v1.types import BatchSettings + from google.cloud.pubsublite.make_admin_client import make_admin_client from google.cloud.pubsublite.endpoints import regional_endpoint from google.cloud.pubsublite.internal.wire.default_routing_policy import ( @@ -23,9 +25,18 @@ from google.auth.credentials import Credentials +DEFAULT_BATCHING_SETTINGS = BatchSettings( + max_bytes=( + 3 * 1024 * 1024 + ), # 3 MiB to stay 1 MiB below GRPC's 4 MiB per-message limit. + max_messages=1000, + max_latency=0.05, # 50 ms +) + + def make_publisher( topic: TopicPath, - batching_delay_secs: Optional[float] = None, + per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, @@ -35,7 +46,7 @@ def make_publisher( Args: topic: The topic to publish to. - batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases. + per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. metadata: Additional metadata to send with the RPC. @@ -46,9 +57,8 @@ def make_publisher( Throws: GoogleApiCallException on any error determining topic structure. """ - batching_delay_secs = ( - batching_delay_secs if batching_delay_secs is not None else 0.05 - ) + if per_partition_batching_settings is None: + per_partition_batching_settings = DEFAULT_BATCHING_SETTINGS admin_client = make_admin_client( region=topic.location.region, credentials=credentials, @@ -76,7 +86,7 @@ def connection_factory(requests: AsyncIterator[PublishRequest]): clients[partition] = SinglePartitionPublisher( InitialPublishRequest(topic=str(topic), partition=partition.value), - batching_delay_secs, + per_partition_batching_settings, GapicConnectionFactory(connection_factory), ) return RoutingPublisher(DefaultRoutingPolicy(partition_count), clients) diff --git a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py index 895d2b3b..b0167e57 100644 --- a/google/cloud/pubsublite/internal/wire/single_partition_publisher.py +++ b/google/cloud/pubsublite/internal/wire/single_partition_publisher.py @@ -2,6 +2,8 @@ from typing import Optional, List, Iterable from absl import logging +from google.cloud.pubsub_v1.types import BatchSettings + from google.cloud.pubsublite.internal.wire.publisher import Publisher from google.cloud.pubsublite.internal.wire.retrying_connection import ( RetryingConnection, @@ -40,7 +42,7 @@ class SinglePartitionPublisher( BatchTester[PubSubMessage], ): _initial: InitialPublishRequest - _flush_seconds: float + _batching_settings: BatchSettings _connection: RetryingConnection[PublishRequest, PublishResponse] _batcher: SerialBatcher[PubSubMessage, Cursor] @@ -52,11 +54,11 @@ class SinglePartitionPublisher( def __init__( self, initial: InitialPublishRequest, - flush_seconds: float, + batching_settings: BatchSettings, factory: ConnectionFactory[PublishRequest, PublishResponse], ): self._initial = initial - self._flush_seconds = flush_seconds + self._batching_settings = batching_settings self._connection = RetryingConnection(factory, self) self._batcher = SerialBatcher(self) self._outstanding_writes = [] @@ -117,7 +119,7 @@ async def _receive_loop(self): async def _flush_loop(self): try: while True: - await asyncio.sleep(self._flush_seconds) + await asyncio.sleep(self._batching_settings.max_latency) await self._flush() except asyncio.CancelledError: return diff --git a/tests/unit/pubsublite/internal/wire/single_partition_publisher_test.py b/tests/unit/pubsublite/internal/wire/single_partition_publisher_test.py index ba998cd7..cfd998ac 100644 --- a/tests/unit/pubsublite/internal/wire/single_partition_publisher_test.py +++ b/tests/unit/pubsublite/internal/wire/single_partition_publisher_test.py @@ -5,6 +5,8 @@ from asynctest.mock import MagicMock, CoroutineMock import pytest +from google.cloud.pubsub_v1.types import BatchSettings + from google.cloud.pubsublite.internal.wire.connection import ( Connection, ConnectionFactory, @@ -25,6 +27,9 @@ from google.cloud.pubsublite.internal.wire.retrying_connection import _MIN_BACKOFF_SECS FLUSH_SECONDS = 100000 +BATCHING_SETTINGS = BatchSettings( + max_bytes=3 * 1024 * 1024, max_messages=1000, max_latency=FLUSH_SECONDS +) # All test coroutines will be treated as marked. pytestmark = pytest.mark.asyncio @@ -81,7 +86,7 @@ async def sleeper(delay: float): @pytest.fixture() def publisher(connection_factory, initial_request): return SinglePartitionPublisher( - initial_request.initial_request, FLUSH_SECONDS, connection_factory + initial_request.initial_request, BATCHING_SETTINGS, connection_factory )