Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wire batching settings through publisher client factories #42

Merged
merged 2 commits into from Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 19 additions & 6 deletions google/cloud/pubsublite/cloudpubsub/make_publisher.py
Expand Up @@ -2,6 +2,7 @@

from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials
from google.cloud.pubsub_v1.types import BatchSettings

from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import (
AsyncPublisherImpl,
Expand All @@ -10,15 +11,19 @@
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher
from google.cloud.pubsublite.internal.wire.make_publisher import (
make_publisher as make_wire_publisher,
DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING,
)
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
from google.cloud.pubsublite.internal.wire.pubsub_context import pubsub_context
from google.cloud.pubsublite.paths import TopicPath


# Re-export of the wire-level default batching settings (imported above as
# WIRE_DEFAULT_BATCHING) so cloudpubsub callers can reference the defaults
# without importing the internal wire module directly.
DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING


def make_async_publisher(
topic: TopicPath,
batching_delay_secs: Optional[float] = None,
per_partition_batching_settings: Optional[BatchSettings] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None,
Expand All @@ -28,7 +33,7 @@ def make_async_publisher(

Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Expand All @@ -43,15 +48,19 @@ def make_async_publisher(

def underlying_factory():
return make_wire_publisher(
topic, batching_delay_secs, credentials, client_options, metadata
topic,
per_partition_batching_settings,
credentials,
client_options,
metadata,
)

return AsyncPublisherImpl(underlying_factory)


def make_publisher(
topic: TopicPath,
batching_delay_secs: Optional[float] = None,
per_partition_batching_settings: Optional[BatchSettings] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None,
Expand All @@ -61,7 +70,7 @@ def make_publisher(

Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Expand All @@ -74,6 +83,10 @@ def make_publisher(
"""
return PublisherImpl(
make_async_publisher(
topic, batching_delay_secs, credentials, client_options, metadata
topic,
per_partition_batching_settings,
credentials,
client_options,
metadata,
)
)
22 changes: 16 additions & 6 deletions google/cloud/pubsublite/internal/wire/make_publisher.py
@@ -1,5 +1,7 @@
from typing import AsyncIterator, Mapping, Optional, MutableMapping

from google.cloud.pubsub_v1.types import BatchSettings

from google.cloud.pubsublite.make_admin_client import make_admin_client
from google.cloud.pubsublite.endpoints import regional_endpoint
from google.cloud.pubsublite.internal.wire.default_routing_policy import (
Expand All @@ -23,9 +25,18 @@
from google.auth.credentials import Credentials


# Default per-partition batching settings for the wire publisher. Used by
# make_publisher when the caller passes per_partition_batching_settings=None.
DEFAULT_BATCHING_SETTINGS = BatchSettings(
    max_bytes=(
        3 * 1024 * 1024
    ),  # 3 MiB to stay 1 MiB below GRPC's 4 MiB per-message limit.
    max_messages=1000,
    max_latency=0.05,  # 50 ms upper bound before a batch is flushed
)


def make_publisher(
topic: TopicPath,
batching_delay_secs: Optional[float] = None,
per_partition_batching_settings: Optional[BatchSettings] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None,
Expand All @@ -35,7 +46,7 @@ def make_publisher(

Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Expand All @@ -46,9 +57,8 @@ def make_publisher(
Throws:
GoogleApiCallException on any error determining topic structure.
"""
batching_delay_secs = (
batching_delay_secs if batching_delay_secs is not None else 0.05
)
if per_partition_batching_settings is None:
per_partition_batching_settings = DEFAULT_BATCHING_SETTINGS
admin_client = make_admin_client(
region=topic.location.region,
credentials=credentials,
Expand Down Expand Up @@ -76,7 +86,7 @@ def connection_factory(requests: AsyncIterator[PublishRequest]):

clients[partition] = SinglePartitionPublisher(
InitialPublishRequest(topic=str(topic), partition=partition.value),
batching_delay_secs,
per_partition_batching_settings,
GapicConnectionFactory(connection_factory),
)
return RoutingPublisher(DefaultRoutingPolicy(partition_count), clients)
Expand Up @@ -2,6 +2,8 @@
from typing import Optional, List, Iterable

from absl import logging
from google.cloud.pubsub_v1.types import BatchSettings

from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.internal.wire.retrying_connection import (
RetryingConnection,
Expand Down Expand Up @@ -40,7 +42,7 @@ class SinglePartitionPublisher(
BatchTester[PubSubMessage],
):
_initial: InitialPublishRequest
_flush_seconds: float
_batching_settings: BatchSettings
_connection: RetryingConnection[PublishRequest, PublishResponse]

_batcher: SerialBatcher[PubSubMessage, Cursor]
Expand All @@ -52,11 +54,11 @@ class SinglePartitionPublisher(
def __init__(
self,
initial: InitialPublishRequest,
flush_seconds: float,
batching_settings: BatchSettings,
factory: ConnectionFactory[PublishRequest, PublishResponse],
):
self._initial = initial
self._flush_seconds = flush_seconds
self._batching_settings = batching_settings
self._connection = RetryingConnection(factory, self)
self._batcher = SerialBatcher(self)
self._outstanding_writes = []
Expand Down Expand Up @@ -117,7 +119,7 @@ async def _receive_loop(self):
async def _flush_loop(self):
    """Flush pending batches at the configured max latency until cancelled.

    Sleeps for ``max_latency`` between flushes so no message waits in the
    batch longer than the configured latency bound. Exits quietly when the
    surrounding task is cancelled.
    """
    try:
        while True:
            # Stripped-diff residue kept both the old `self._flush_seconds`
            # sleep and this one; only the batching-settings form is valid —
            # `_flush_seconds` was replaced by `_batching_settings`.
            await asyncio.sleep(self._batching_settings.max_latency)
            await self._flush()
    except asyncio.CancelledError:
        return
Expand Down
Expand Up @@ -5,6 +5,8 @@

from asynctest.mock import MagicMock, CoroutineMock
import pytest
from google.cloud.pubsub_v1.types import BatchSettings

from google.cloud.pubsublite.internal.wire.connection import (
Connection,
ConnectionFactory,
Expand All @@ -25,6 +27,9 @@
from google.cloud.pubsublite.internal.wire.retrying_connection import _MIN_BACKOFF_SECS

# Very large flush latency (in seconds) so the publisher's periodic flush
# loop effectively never fires on its own during a test run.
FLUSH_SECONDS = 100000
# Batching settings handed to the SinglePartitionPublisher fixture below;
# mirrors the production defaults except for the huge max_latency.
BATCHING_SETTINGS = BatchSettings(
    max_bytes=3 * 1024 * 1024, max_messages=1000, max_latency=FLUSH_SECONDS
)

# All test coroutines will be treated as marked.
pytestmark = pytest.mark.asyncio
Expand Down Expand Up @@ -81,7 +86,7 @@ async def sleeper(delay: float):
@pytest.fixture()
def publisher(connection_factory, initial_request):
    """Build the SinglePartitionPublisher under test.

    Uses the module-level BATCHING_SETTINGS (whose max_latency is huge,
    presumably so tests drive flushes explicitly — confirm against the
    flush-loop behavior).
    """
    # Stripped-diff residue kept both the old FLUSH_SECONDS positional
    # argument and the new BATCHING_SETTINGS one; the publisher's second
    # parameter is now a BatchSettings, so only this call form is valid.
    return SinglePartitionPublisher(
        initial_request.initial_request, BATCHING_SETTINGS, connection_factory
    )


Expand Down