-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Maybe no buffered_messages_lock? #127
Comments
And it’s possible to move the In addition, if a stream(partition) corresponds to only one publisher, line 233, 244 can be moved out of the for loop. |
Here is a simple wrapper for Producer, with the following modifications:
About point 5: There may be a performance improvement of close to 20%, but I'm not sure if there are any bugs in the code.
class MyProducer(Producer):
def __init__(
    self,
    host: str,
    port: int = 5552,
    *,
    ssl_context: Optional[ssl.SSLContext] = None,
    vhost: str = "/",
    username: str,
    password: str,
    frame_max: int = 1 * 1024 * 1024,
    heartbeat: int = 60,
    load_balancer_mode: bool = False,
    max_retries: int = 20,
    default_batch_publishing_delay: float = 0.2,
    connection_closed_handler: Optional[CB[Exception]] = None,
):
    """Initialise the base producer and the wrapper's extra bookkeeping.

    Every argument is forwarded unchanged to the parent constructor; this
    subclass only adds the three containers created below.
    """
    parent_options = dict(
        ssl_context=ssl_context,
        vhost=vhost,
        username=username,
        password=password,
        frame_max=frame_max,
        heartbeat=heartbeat,
        load_balancer_mode=load_balancer_mode,
        max_retries=max_retries,
        default_batch_publishing_delay=default_batch_publishing_delay,
        connection_closed_handler=connection_closed_handler,
    )
    super().__init__(host, port, **parent_options)

    # stream -> name of the publisher bound to that stream
    self._publishers_name: dict[str, str] = {}
    # stream -> {confirmation callback -> publishing ids not yet handed to
    # the confirmation waiters}; the same mapping is used by both
    # `_send_batch` and `_send_sub_entry_batch`
    self._publishing_ids_callback: dict[
        str, defaultdict[CB[ConfirmationStatus], set[int]]
    ] = {}
    # stream -> buffered sub-entry notifications awaiting the next flush
    self._buffered_sub_entry_messages: dict[str, list] = defaultdict(list)
async def close(self) -> None:
    """Flush buffered messages, then release publishers and connections.

    When the background flush task exists, every non-empty buffer is sent
    one last time and the task is cancelled. Plain messages are sent with
    ``_send_batch`` and buffered sub-entries with ``_send_sub_entry_batch``.
    Each publisher is then deleted on its client and its confirm/error
    handlers are removed, the connection pool is closed and the cached
    state is reset.

    Failures while flushing or while deleting a publisher are suppressed so
    that shutdown always runs to completion.
    """
    if self.task is not None:
        # Pair each buffer with the sender that understands its entries.
        flush_plan = (
            (self._buffered_messages, self._send_batch),
            (self._buffered_sub_entry_messages, self._send_sub_entry_batch),
        )
        for buffers, sender in flush_plan:
            # Iterate over a snapshot: the sender is awaited inside the loop,
            # so the mapping itself must not be iterated live.
            for stream, messages in list(buffers.items()):
                if messages:
                    with suppress(BaseException):
                        await sender(stream, messages)
                    messages.clear()
        self.task.cancel()

    for publisher in self._publishers.values():
        client = publisher.client
        with suppress(BaseException):
            await client.delete_publisher(publisher.id)
        publisher_reference = publisher.reference
        client.remove_handler(schema.PublishConfirm, publisher_reference)
        client.remove_handler(schema.PublishError, publisher_reference)

    self._publishers.clear()
    await self._pool.close()
    self._clients.clear()
    self._waiting_for_confirm.clear()
    self._default_client = None
async def _get_or_create_client(self, stream: str) -> Client:
    """Return the client cached for *stream*, connecting to its leader if needed.

    On a cache miss the default client is asked for the stream's leader and
    replicas, a connection to the leader's address is taken from the pool
    and the result is cached under *stream*.
    """
    cache = self._clients
    cached = cache.get(stream)
    if cached is not None:
        return cached

    leader, _ = await self.default_client.query_leader_and_replicas(stream)
    connection = await self._pool.get(
        Addr(leader.host, leader.port), self._connection_closed_handler
    )
    cache[stream] = connection
    return connection
async def _get_or_create_publisher(
    self,
    stream: str,
    publisher_name: Optional[str] = None,
) -> _Publisher:
    """Return the publisher for *stream*, declaring it on first use.

    The lookup is first attempted without the lock and repeated under the
    lock before a new publisher is created. When *publisher_name* is given
    and a publisher already exists, its reference must equal that name.

    A new publisher is stored in ``self._publishers`` only after it has been
    declared, its sequence initialised and its confirm/error handlers
    installed. The lock-free lookup therefore never returns a publisher that
    is still being set up, and a failed declaration leaves nothing behind in
    the mapping.
    """
    publishers = self._publishers

    # Fast path: no lock needed once the publisher is fully registered.
    if (publisher := publishers.get(stream)) is not None:
        if publisher_name is not None:
            assert publisher.reference == publisher_name
        return publisher

    async with self._lock:
        # Another task may have finished the creation while we waited.
        if (publisher := publishers.get(stream)) is not None:
            if publisher_name is not None:
                assert publisher.reference == publisher_name
            return publisher

        client = await self._get_or_create_client(stream)
        # We can have multiple publishers sharing same connection, so their
        # ids must be distinct
        publisher_id = sum(1 for p in publishers.values() if p.client is client) + 1
        if publisher_name is None:
            publisher_name = f"{stream}_publisher_{publisher_id}"
            self._publishers_name[stream] = publisher_name
        reference = publisher_name

        publisher = _Publisher(
            id=publisher_id,
            stream=stream,
            reference=reference,
            sequence=utils.MonotonicSeq(),
            client=client,
        )
        await client.declare_publisher(
            stream=stream,
            reference=reference,
            publisher_id=publisher_id,
        )
        sequence = await client.query_publisher_sequence(
            stream=stream,
            reference=reference,
        )
        publisher.sequence.set(sequence + 1)
        client.add_handler(
            schema.PublishConfirm,
            partial(self._on_publish_confirm, publisher=publisher),
            name=reference,
        )
        client.add_handler(
            schema.PublishError,
            partial(self._on_publish_error, publisher=publisher),
            name=reference,
        )
        # Register last, so the unlocked fast path only ever sees a
        # publisher that is ready to send.
        publishers[stream] = publisher
        return publisher
async def send_batch(
    self,
    stream: str,
    batch: list[MessageT],
    publisher_name: Optional[str] = None,
    on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
) -> list[int]:
    """Send *batch* to *stream* immediately and return the publishing ids.

    Each entry is wrapped together with *on_publish_confirm* and the whole
    batch is handed to ``_send_batch``.

    Raises:
        Exception: if *publisher_name* differs from the name already
            recorded for *stream*. The first name used for a stream is
            recorded.
    """
    if publisher_name is not None:
        bound_name = self._publishers_name.setdefault(stream, publisher_name)
        if bound_name != publisher_name:
            raise Exception(
                f"stream {stream!r} is already bound to publisher "
                f"{bound_name!r}, got publisher_name={publisher_name!r}"
            )

    wrapped_batch = [
        _MessageNotification(entry=entry, callback=on_publish_confirm)
        for entry in batch
    ]
    return await self._send_batch(stream, wrapped_batch, publisher_name)
async def _send_batch(
    self,
    stream: str,
    batch: list[_MessageNotification],
    publisher_name: Optional[str] = None,
    pending_length: Optional[int] = None,
    return_ids: bool = True,
) -> list[int]:
    """Publish the first *pending_length* notifications of *batch* in one frame.

    With ``pending_length=None`` the whole batch is sent. Entries given as
    ``bytes`` are wrapped in ``RawMessage``; entries without a publishing id
    receive the next value of the publisher's sequence. After the frame is
    sent, the ids collected per callback are merged into the confirmation
    waiters of the publisher and the per-stream collection is emptied.

    Returns the de-duplicated publishing ids when *return_ids* is true,
    otherwise ``None``.

    Raises:
        ValueError: if *batch* is empty.
    """
    if not batch:
        raise ValueError("Empty batch")

    ids_by_callback = self._publishing_ids_callback.setdefault(
        stream, defaultdict(set)
    )
    publisher = await self._get_or_create_publisher(stream, publisher_name)
    next_sequence = publisher.sequence.next

    outgoing = []
    for notification in islice(batch, pending_length):
        payload = notification.entry
        if isinstance(payload, bytes):
            payload = RawMessage(payload)

        publishing_id = payload.publishing_id
        if publishing_id is None:
            publishing_id = next_sequence()
            payload.publishing_id = publishing_id

        on_confirm = notification.callback
        if on_confirm is not None:
            ids_by_callback[on_confirm].add(publishing_id)

        outgoing.append(
            schema.Message(publishing_id=publishing_id, data=bytes(payload))
        )

    frame = schema.Publish(publisher_id=publisher.id, messages=outgoing)
    await publisher.client.send_frame(frame)

    if ids_by_callback:
        waiters = self._waiting_for_confirm[publisher.reference]
        for on_confirm, pending_ids in ids_by_callback.items():
            waiters.setdefault(on_confirm, set()).update(pending_ids)
        ids_by_callback.clear()

    if not return_ids:
        return None
    return list({message.publishing_id for message in outgoing})
async def _send_sub_entry_batch(
    self,
    stream: str,
    batch: list[_MessageNotification],
    publisher_name: Optional[str] = None,
    pending_length: Optional[int] = None,
):
    """Publish the first *pending_length* sub-entries of *batch*, one frame each.

    With ``pending_length=None`` the whole batch is sent. For every entry the
    publisher sequence is advanced once per contained message; the last
    value obtained is sent as the frame's publishing id, collected for the
    return value and, when the entry has a callback, recorded for it. After
    all frames are sent, the recorded ids are merged into the confirmation
    waiters of the publisher.

    Returns the list of publishing ids, one per sub-entry sent.
    """
    # NOTE(review): the pasted code lost its indentation; this assumes only
    # the sequence advance is inside the per-message loop - confirm.
    sent_ids = []
    ids_by_callback = self._publishing_ids_callback.setdefault(
        stream, defaultdict(set)
    )
    publisher = await self._get_or_create_publisher(stream, publisher_name)
    publisher_id = publisher.id
    next_sequence = publisher.sequence.next
    send_frame = publisher.client.send_frame

    for notification in islice(batch, pending_length):
        codec = notification.entry
        message_total = codec.messages_count()
        for _ in range(message_total):
            publishing_id = next_sequence()

        sent_ids.append(publishing_id)
        on_confirm = notification.callback
        if on_confirm is not None:
            ids_by_callback[on_confirm].add(publishing_id)

        frame = schema.PublishSubBatching(
            publisher_id=publisher_id,
            number_of_root_messages=1,
            publishing_id=publishing_id,
            compress_type=0x80 | codec.compression_type() << 4,
            subbatching_message_count=message_total,
            uncompressed_data_size=codec.uncompressed_size(),
            compressed_data_size=codec.compressed_size(),
            messages=codec.data(),
        )
        await send_frame(frame)

    if ids_by_callback:
        waiters = self._waiting_for_confirm[publisher.reference]
        for on_confirm, pending_ids in ids_by_callback.items():
            waiters.setdefault(on_confirm, set()).update(pending_ids)
        ids_by_callback.clear()

    return sent_ids
async def send_wait(
    self,
    stream: str,
    message: MessageT,
    publisher_name: Optional[str] = None,
) -> int:
    """Send one message and wait until its confirmation future is resolved.

    A ``bytes`` message is wrapped in ``RawMessage``; a message without a
    publishing id receives the next value of the publisher's sequence.

    The future is registered with the confirmation waiters *before* the
    frame is sent, so a confirmation handled while ``send_frame`` is still
    awaited cannot be missed. If sending fails, the registration is removed
    again and the error is re-raised.

    Returns the publishing id of the message.

    Raises:
        Exception: if *publisher_name* differs from the name already
            recorded for *stream*.
    """
    if publisher_name is not None:
        bound_name = self._publishers_name.setdefault(stream, publisher_name)
        if bound_name != publisher_name:
            raise Exception(
                f"stream {stream!r} is already bound to publisher "
                f"{bound_name!r}, got publisher_name={publisher_name!r}"
            )

    publisher = await self._get_or_create_publisher(stream, publisher_name)
    msg = RawMessage(message) if isinstance(message, bytes) else message
    if (publishing_id := msg.publishing_id) is None:
        msg.publishing_id = publishing_id = publisher.sequence.next()

    future: asyncio.Future[None] = asyncio.Future()
    waiters = self._waiting_for_confirm[publisher.reference]
    waiters[future] = {publishing_id}
    try:
        await publisher.client.send_frame(
            schema.Publish(
                publisher_id=publisher.id,
                messages=[
                    schema.Message(
                        publishing_id=publishing_id,
                        data=bytes(msg),
                    )
                ],
            ),
        )
    except BaseException:
        # Nothing was published (or we were cancelled): do not leave a
        # waiter behind that nobody will ever await.
        waiters.pop(future, None)
        raise

    await future
    return publishing_id
async def send(
    self,
    stream: str,
    message: MessageT,
    publisher_name: Optional[str] = None,  # may be removed
    on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
):
    """Buffer *message* for *stream*; the background task publishes it later.

    The flush task is created on first use. After buffering, a counter is
    incremented; once it exceeds ``_default_context_switch_value`` the
    coroutine yields to the event loop once and the counter is reset.

    Raises:
        Exception: if *publisher_name* differs from the name already
            recorded for *stream*.
    """
    if publisher_name is not None:
        bound_name = self._publishers_name.setdefault(stream, publisher_name)
        if bound_name != publisher_name:
            raise Exception(
                f"stream {stream!r} is already bound to publisher "
                f"{bound_name!r}, got publisher_name={publisher_name!r}"
            )

    # Start the background asyncio task that sends buffered messages.
    if self.task is None:
        self.task = asyncio.create_task(self._timer())
        self.task.add_done_callback(self._timer_completed)

    wrapped_message = _MessageNotification(
        entry=message, callback=on_publish_confirm
    )
    self._buffered_messages[stream].append(wrapped_message)

    self._default_context_switch_counter += 1
    if self._default_context_switch_counter > self._default_context_switch_value:
        # Give other tasks (including the flush task) a chance to run.
        await asyncio.sleep(0)
        self._default_context_switch_counter = 0
async def send_sub_entry(
    self,
    stream: str,
    sub_entry_messages: list[MessageT],
    compression_type: CompressionType = CompressionType.No,
    publisher_name: Optional[str] = None,  # may be removed
    on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
):
    """Compress *sub_entry_messages* and buffer them as one sub-entry.

    The flush task is created on first use. The messages are passed to
    ``CompressionHelper.compress`` with *compression_type*, the result is
    buffered for *stream*, and the coroutine then yields to the event loop
    once.

    Raises:
        Exception: if *publisher_name* differs from the name already
            recorded for *stream*.
        ValueError: if *sub_entry_messages* is empty.
    """
    if publisher_name is not None:
        bound_name = self._publishers_name.setdefault(stream, publisher_name)
        if bound_name != publisher_name:
            raise Exception(
                f"stream {stream!r} is already bound to publisher "
                f"{bound_name!r}, got publisher_name={publisher_name!r}"
            )
    if not sub_entry_messages:
        raise ValueError("Empty batch")

    # Start the background asyncio task that sends buffered messages.
    if self.task is None:
        self.task = asyncio.create_task(self._timer())
        self.task.add_done_callback(self._timer_completed)

    compression_codec = CompressionHelper.compress(
        sub_entry_messages, compression_type
    )
    wrapped_message = _MessageNotification(
        entry=compression_codec, callback=on_publish_confirm
    )
    self._buffered_sub_entry_messages[stream].append(wrapped_message)
    await asyncio.sleep(0)
async def _timer(self):
delay = self._default_batch_publishing_delay
buffered_messages = self._buffered_messages
buffered_sub_entry_messages = self._buffered_sub_entry_messages
publishers_name_getter = self._publishers_name.get
send_batch, send_sub_entry_batch = self._send_batch, self._send_sub_entry_batch
while True:
await asyncio.sleep(delay)
for stream, messages in buffered_messages.items():
if messages:
pending_length = len(messages)
await send_batch(
stream,
messages,
publishers_name_getter(stream),
pending_length,
return_ids=False,
)
del messages[:pending_length]
if buffered_sub_entry_messages:
for stream, messages in buffered_sub_entry_messages.items():
if messages:
pending_length = len(messages)
await send_sub_entry_batch(
stream,
messages,
publishers_name_getter(stream),
pending_length,
)
del messages[:pending_length]
async def _on_publish_confirm(
    self, frame: schema.PublishConfirm, publisher: _Publisher
) -> None:
    """Dispatch a publish-confirm frame to the waiters of *publisher*.

    Frames whose ``publisher_id`` differs from the publisher's id are
    ignored. For every waiter, the ids confirmed by this frame are removed
    from its pending set, and the waiter is dropped once that set is empty.

    * A callback waiter is called with ``ConfirmationStatus(id, True)`` for
      each of its ids confirmed by this frame; an awaitable result is
      awaited.
    * A future waiter is resolved only when all of its ids are confirmed.
      It is not resolved while ids are still pending, and never twice.
    """
    if frame.publisher_id != publisher.id:
        return

    waiting = self._waiting_for_confirm[publisher.reference]
    confirmed_ids = frame.publishing_ids

    # Snapshot the keys: entries are deleted while we go through them.
    for confirmation in list(waiting):
        ids = waiting[confirmation]
        ids_to_call = ids.intersection(confirmed_ids)
        ids.difference_update(confirmed_ids)
        fully_confirmed = not ids
        if fully_confirmed:
            del waiting[confirmation]

        if isinstance(confirmation, asyncio.Future):
            if fully_confirmed and not confirmation.done():
                confirmation.set_result(None)
            continue

        for publishing_id in ids_to_call:
            confirmation_status = ConfirmationStatus(publishing_id, True)
            result = confirmation(confirmation_status)
            if result is not None and hasattr(result, "__await__"):
                await result
Thank you for your feedback but we don't think the lock is the performance bottleneck. I would invest time on #133 |
Here is a scenario: Once |
I'll close this issue, as we are not planning this implementation at the moment. We are, however, noting it in the Performance issue in case we want to look at it again in the future. |
It’s unnecessary to acquire/release `buffered_messages_lock` frequently when a large number of messages need to be sent out.

To remove the lock, the function `_timer` can be written in one of the following two ways:

1. Use a copy of `_buffered_messages` and `clear` it before `_send_batch` (with a data copy).
2. Track the `pending_write_length` of `_buffered_messages[stream]` and `del` that prefix after `_send_batch` (with a new argument `pending_write_length` on `_send_batch`).

And another thing, about concurrent writes:

One `Producer`, one `_timer` task, so the published messages are strictly in order from the perspective of the underlying `StreamWriter`.

I don’t know how many connections the `Producer` will open; it may depend on RabbitMQ’s topology?

But if there are multiple connections in one `Producer`, it should send messages concurrently. In this way, the `_timer` task should be at the client-connection level, like the `_listener` task, not in the `Producer`.

The text was updated successfully, but these errors were encountered: