Maybe no buffered_messages_lock? #127

Closed
garvenlee opened this issue Sep 20, 2023 · 5 comments

Comments

@garvenlee
garvenlee commented Sep 20, 2023

Acquiring and releasing _buffered_messages_lock on every call is unnecessary overhead when a large number of messages need to be sent.

To remove the lock, _timer can be written in one of the following two ways:

1. Copy _buffered_messages[stream] and clear the original before calling _send_batch (costs one data copy).

2. Record the pending write length of _buffered_messages[stream] and delete that many entries after _send_batch returns (requires a new pending_write_length argument on _send_batch).

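async def _timer(self):
    send_batch = self._send_batch
    delay = self._default_batch_publishing_delay
    while True:
        await asyncio.sleep(delay)
        # Snapshot the items: send() may add a new stream while we await below,
        # and a dict that changes size during iteration raises RuntimeError.
        for stream, messages in list(self._buffered_messages.items()):
            if messages:
                # Option 1: copy, then clear before awaiting.
                messages_copy = messages[:]
                messages.clear()
                await send_batch(stream, messages_copy)
                # Option 2 (instead of the copy): record the length, pass it to
                # send_batch, and delete the sent prefix afterwards:
                # pending_write_length = len(messages)
                # await send_batch(stream, messages, pending_write_length)
                # del messages[:pending_write_length]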
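
The two lock-free variants can be tried in isolation. The following is a minimal sketch, assuming a plain list as the buffer and a stand-in coroutine `fake_send` in place of `_send_batch` (both are hypothetical and not part of rstream). It checks that a message appended while the flush is awaiting is neither lost nor sent twice.

```python
import asyncio
from itertools import islice


async def fake_send(sent: list, batch: list, pending=None) -> None:
    """Hypothetical stand-in for _send_batch: yield once, then record the batch."""
    await asyncio.sleep(0)
    # islice(batch, None) yields everything; islice(batch, n) only the first n.
    sent.extend(islice(batch, pending))


async def flush_with_copy(buffer: list, sent: list) -> None:
    # Variant 1: snapshot and clear before awaiting (one extra list copy).
    batch = buffer[:]
    buffer.clear()
    await fake_send(sent, batch)


async def flush_with_pending_length(buffer: list, sent: list) -> None:
    # Variant 2: remember how many items were pending, send only those,
    # and delete them afterwards. Items appended during the await stay buffered.
    pending = len(buffer)
    await fake_send(sent, buffer, pending)
    del buffer[:pending]


async def demo(flush) -> tuple[list, list]:
    buffer, sent = [1, 2, 3], []
    task = asyncio.create_task(flush(buffer, sent))
    await asyncio.sleep(0)  # let the flush start and reach its await
    buffer.append(4)        # a concurrent producer appends mid-flush
    await task
    return buffer, sent


if __name__ == "__main__":
    print(asyncio.run(demo(flush_with_copy)))
    print(asyncio.run(demo(flush_with_pending_length)))
```

Both variants leave the late message in the buffer for the next flush. Variant 2 avoids the copy but relies on the sender reading only the first `pending` items.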

A separate point, about concurrent writes:

There is one _timer task per Producer, so from the underlying StreamWriter’s point of view, published messages are written strictly in order.

I don’t know how many connections a Producer opens; it may depend on the RabbitMQ topology.

But if one Producer holds multiple connections, it should be able to send on them concurrently. In that case the _timer task belongs at the client-connection level, like the _listener task, rather than in the Producer.
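
As a rough illustration of a per-connection flush, here is a sketch under stated assumptions: `FakeConnection`, its `write_delay`, and `flush_all_concurrently` are hypothetical names, and `asyncio.sleep` stands in for a slow `writer.drain()`. It is not how rstream is structured today.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a client connection that owns its own buffer."""

    def __init__(self, name: str, write_delay: float) -> None:
        self.name = name
        self.write_delay = write_delay
        self.buffer: list[bytes] = []
        self.written: list[bytes] = []

    async def flush_once(self) -> None:
        pending = len(self.buffer)
        if not pending:
            return
        await asyncio.sleep(self.write_delay)  # simulates a slow writer.drain()
        self.written.extend(self.buffer[:pending])
        del self.buffer[:pending]


async def flush_all_concurrently(connections: list) -> float:
    """Flush every connection at once and return the elapsed time in seconds."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    await asyncio.gather(*(c.flush_once() for c in connections))
    return loop.time() - start


async def main() -> tuple[float, list]:
    conns = [FakeConnection(f"c{i}", 0.1) for i in range(3)]
    for c in conns:
        c.buffer.extend([b"a", b"b"])
    elapsed = await flush_all_concurrently(conns)
    return elapsed, [c.written for c in conns]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With three connections that each take about 0.1 s to drain, the concurrent flush finishes in roughly the time of one drain, where a single Producer-level loop would take about three times as long.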

@garvenlee
Author

Would it also be possible to move the _lock acquired at line 233 of producer.py into _get_or_create_publisher? The lock seems to be required only when _send_batch has to create a new publisher.

In addition, if a stream (partition) corresponds to exactly one publisher, lines 233 and 244 can be moved out of the for loop.
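
The idea of taking the lock only on the creation path is a double-checked lookup. A minimal sketch, assuming a hypothetical `PublisherCache` class whose `_declare` coroutine stands in for the network round trips (none of these names exist in rstream):

```python
import asyncio


class PublisherCache:
    """Hypothetical cache with a lock-free fast path for existing publishers."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._publishers: dict[str, str] = {}
        self.created = 0

    async def _declare(self, stream: str) -> str:
        await asyncio.sleep(0)  # simulates the declare/query round trips
        self.created += 1
        return f"{stream}_publisher_{self.created}"

    async def get_or_create(self, stream: str) -> str:
        # Fast path: no lock once the publisher exists.
        if (publisher := self._publishers.get(stream)) is not None:
            return publisher
        async with self._lock:
            # Re-check: another task may have created it while we waited.
            if (publisher := self._publishers.get(stream)) is not None:
                return publisher
            # Store the entry only after creation has completed.
            publisher = self._publishers[stream] = await self._declare(stream)
            return publisher


async def main() -> tuple[int, set]:
    cache = PublisherCache()
    names = await asyncio.gather(
        *(cache.get_or_create("orders") for _ in range(10))
    )
    return cache.created, set(names)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that the entry is stored only after `_declare` returns, so the fast path never hands out a half-initialized publisher. Ten concurrent callers end up sharing a single creation.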

@garvenlee
Author

Here is a simple wrapper for Producer, with the following modifications:

  1. remove _buffered_messages_lock
  2. move _lock into _get_or_create_publisher
  3. add _buffered_sub_entry_messages
  4. remove the arg sync from _send_batch
  5. redesign publisher_name (add publisher_name check in send)
  6. micro-optimize some Python statements (cache attribute lookups, etc.)

About point 5:
Inside Producer.send, publisher_name and message are used together to construct a _MessageNotification, but publisher_name is actually redundant: each partition corresponds to one publisher, so the publisher name can be looked up from the partition (stream) name. To support this lookup, Producer needs a dict _publishers_name mapping stream to publisher_name.
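
The lookup described in point 5 boils down to a `dict.setdefault` check. A small sketch, assuming a hypothetical free function `check_publisher_name` and a `ValueError` for the mismatch (the wrapper itself inlines the check and raises a bare `Exception`):

```python
from typing import Optional


def check_publisher_name(
    names: dict, stream: str, publisher_name: Optional[str]
) -> None:
    """Register the first name seen for a stream; reject a different one later."""
    if publisher_name is None:
        return  # the caller relies on the name already registered for the stream
    # setdefault stores the name on first use and returns the stored one afterwards.
    if names.setdefault(stream, publisher_name) != publisher_name:
        raise ValueError(
            f"stream {stream!r} is already bound to publisher {names[stream]!r}"
        )


if __name__ == "__main__":
    names: dict = {}
    check_publisher_name(names, "orders", "p1")  # registers "p1"
    check_publisher_name(names, "orders", None)  # no name given: accepted
    try:
        check_publisher_name(names, "orders", "p2")
    except ValueError as exc:
        print(exc)
```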

There may be a performance improvement of close to 20%, but I'm not sure if there are any bugs in the code.

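import asyncio
import ssl
from collections import defaultdict
from contextlib import suppress
from functools import partial
from itertools import islice, repeat
from typing import Optional

# Producer, Client, Addr, CB, MessageT, RawMessage, ConfirmationStatus,
# CompressionType, CompressionHelper, _Publisher, _MessageNotification,
# schema and utils come from the rstream package (import paths omitted here).

class MyProducer(Producer):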
    def __init__(
        self,
        host: str,
        port: int = 5552,
        *,
        ssl_context: Optional[ssl.SSLContext] = None,
        vhost: str = "/",
        username: str,
        password: str,
        frame_max: int = 1 * 1024 * 1024,
        heartbeat: int = 60,
        load_balancer_mode: bool = False,
        max_retries: int = 20,
        default_batch_publishing_delay: float = 0.2,
        connection_closed_handler: Optional[CB[Exception]] = None,
    ):
        super().__init__(
            host,
            port,
            ssl_context=ssl_context,
            vhost=vhost,
            username=username,
            password=password,
            frame_max=frame_max,
            heartbeat=heartbeat,
            load_balancer_mode=load_balancer_mode,
            max_retries=max_retries,
            default_batch_publishing_delay=default_batch_publishing_delay,
            connection_closed_handler=connection_closed_handler,
        )

        self._buffered_sub_entry_messages: dict[str, list] = defaultdict(list)
        self._publishers_name: dict[str, str] = {}  # stream : publisher_name
        self._publishing_ids_callback: dict[
            str, defaultdict[CB[ConfirmationStatus], set[int]]
        ] = {}  # use the same dict in `_send_batch` & `_send_sub_entry_batch`

    async def close(self) -> None:
        if self.task is not None:
            for stream, messages in self._buffered_messages.items():
                if messages:
                    with suppress(BaseException):
                        await self._send_batch(stream, messages)
                    messages.clear()

            for stream, messages in self._buffered_sub_entry_messages.items():
                if messages:
                    with suppress(BaseException):
                        # Sub-entry batches must go through the sub-entry sender.
                        await self._send_sub_entry_batch(stream, messages)
                    messages.clear()

            self.task.cancel()

        for publisher in self._publishers.values():
            client = publisher.client
            with suppress(BaseException):
                await client.delete_publisher(publisher.id)

            publisher_reference = publisher.reference
            client.remove_handler(schema.PublishConfirm, publisher_reference)
            client.remove_handler(schema.PublishError, publisher_reference)

        self._publishers.clear()

        await self._pool.close()
        self._clients.clear()
        self._waiting_for_confirm.clear()
        self._default_client = None

    async def _get_or_create_client(self, stream: str) -> Client:
        clients = self._clients
        if (client := clients.get(stream)) is not None:
            return client

        leader, _ = await self.default_client.query_leader_and_replicas(stream)
        clients[stream] = client = await self._pool.get(
            Addr(leader.host, leader.port), self._connection_closed_handler
        )
        return client

    async def _get_or_create_publisher(
        self,
        stream: str,
        publisher_name: Optional[str] = None,
    ) -> _Publisher:
        publishers = self._publishers
        if (publisher := publishers.get(stream)) is not None:
            if publisher_name is not None:
                assert publisher.reference == publisher_name
            return publisher

        async with self._lock:
            if (publisher := publishers.get(stream)) is not None:
                if publisher_name is not None:
                    assert publisher.reference == publisher_name
                return publisher

            client = await self._get_or_create_client(stream)
            # We can have multiple publishers sharing same connection, so their ids must be distinct
            publisher_id = (
                len([p for p in publishers.values() if p.client is client]) + 1
            )
            if publisher_name is None:
                publisher_name = f"{stream}_publisher_{publisher_id}"
                self._publishers_name[stream] = publisher_name
            
            reference = publisher_name
            publisher = _Publisher(
                id=publisher_id,
                stream=stream,
                reference=reference,
                sequence=utils.MonotonicSeq(),
                client=client,
            )

            await client.declare_publisher(
                stream=stream,
                reference=reference,
                publisher_id=publisher_id,
            )
            sequence = await client.query_publisher_sequence(
                stream=stream,
                reference=reference,
            )
            publisher.sequence.set(sequence + 1)

            client.add_handler(
                schema.PublishConfirm,
                partial(self._on_publish_confirm, publisher=publisher),
                name=reference,
            )
            client.add_handler(
                schema.PublishError,
                partial(self._on_publish_error, publisher=publisher),
                name=reference,
            )
            # Store the publisher only once it is fully initialized: the lock-free
            # fast path above must never return a half-declared publisher.
            publishers[stream] = publisher
            return publisher

    async def send_batch(
        self,
        stream: str,
        batch: list[MessageT],
        publisher_name: Optional[str] = None,
        on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
    ) -> list[int]:
        if (
            publisher_name is not None
            and publisher_name
            != self._publishers_name.setdefault(stream, publisher_name)
        ):
            raise Exception

        wrapped_batch = [
            _MessageNotification(entry=entry, callback=on_publish_confirm)
            for entry in batch
        ]

        return await self._send_batch(stream, wrapped_batch, publisher_name)

    async def _send_batch(
        self,
        stream: str,
        batch: list[_MessageNotification],
        publisher_name: Optional[str] = None,
        pending_length: Optional[int] = None,
        return_ids: bool = True,
    ) -> list[int]:
        if len(batch) == 0:
            raise ValueError("Empty batch")

        messages = []
        messages_append = messages.append
        # messages = [None] * pending_length
        publishing_ids_callback = self._publishing_ids_callback.setdefault(
            stream, defaultdict(set)
        )

        publisher = await self._get_or_create_publisher(stream, publisher_name)
        sequence_next = publisher.sequence.next
        for item in islice(batch, pending_length):
            entry = item.entry
            msg = RawMessage(entry) if isinstance(entry, bytes) else entry
            if (publishing_id := msg.publishing_id) is None:
                msg.publishing_id = publishing_id = sequence_next()

            if (callback := item.callback) is not None:
                publishing_ids_callback[callback].add(publishing_id)

            messages_append(
                schema.Message(
                    publishing_id=publishing_id,
                    data=bytes(msg),
                )
            )

        await publisher.client.send_frame(
            schema.Publish(
                publisher_id=publisher.id,
                messages=messages,
            ),
        )

        if publishing_ids_callback:
            waiters = self._waiting_for_confirm[publisher.reference]
            for callback, ids in publishing_ids_callback.items():
                waiters.setdefault(callback, set()).update(ids)
            publishing_ids_callback.clear()

        if return_ids:
            return list(set(m.publishing_id for m in messages))
        return []  # keep the declared list[int] return type when ids are not needed

    async def _send_sub_entry_batch(
        self,
        stream: str,
        batch: list[_MessageNotification],
        publisher_name: Optional[str] = None,
        pending_length: Optional[int] = None,
    ):
        publishing_ids = []
        publishing_ids_append = publishing_ids.append
        publishing_ids_callback = self._publishing_ids_callback.setdefault(
            stream, defaultdict(set)
        )

        publisher = await self._get_or_create_publisher(stream, publisher_name)
        publisher_id, sequence_next = publisher.id, publisher.sequence.next
        send_frame = publisher.client.send_frame
        for item in islice(batch, pending_length):
            entry = item.entry

            messages_count = entry.messages_count()
            for _ in repeat(None, messages_count):
                publishing_id = sequence_next()

            publishing_ids_append(publishing_id)
            if (callback := item.callback) is not None:
                publishing_ids_callback[callback].add(publishing_id)

            await send_frame(
                schema.PublishSubBatching(
                    publisher_id=publisher_id,
                    number_of_root_messages=1,
                    publishing_id=publishing_id,
                    compress_type=0x80 | entry.compression_type() << 4,
                    subbatching_message_count=messages_count,
                    uncompressed_data_size=entry.uncompressed_size(),
                    compressed_data_size=entry.compressed_size(),
                    messages=entry.data(),
                ),
            )

        if publishing_ids_callback:
            waiters = self._waiting_for_confirm[publisher.reference]
            for callback, ids in publishing_ids_callback.items():
                waiters.setdefault(callback, set()).update(ids)
            publishing_ids_callback.clear()

        return publishing_ids

    async def send_wait(
        self,
        stream: str,
        message: MessageT,
        publisher_name: Optional[str] = None,
    ) -> int:
        if (
            publisher_name is not None
            and publisher_name
            != self._publishers_name.setdefault(stream, publisher_name)
        ):
            raise Exception
        publisher = await self._get_or_create_publisher(stream, publisher_name)

        msg = RawMessage(message) if isinstance(message, bytes) else message
        if (publishing_id := msg.publishing_id) is None:
            msg.publishing_id = publishing_id = publisher.sequence.next()

        await publisher.client.send_frame(
            schema.Publish(
                publisher_id=publisher.id,
                messages=[
                    schema.Message(
                        publishing_id=publishing_id,
                        data=bytes(msg),
                    )
                ],
            ),
        )

        future: asyncio.Future[None] = asyncio.Future()
        self._waiting_for_confirm[publisher.reference][future] = {publishing_id}
        await future

        return publishing_id

    async def send(
        self,
        stream: str,
        message: MessageT,
        publisher_name: Optional[str] = None,  # may be removed
        on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
    ):
        if (
            publisher_name is not None
            and publisher_name
            != self._publishers_name.setdefault(stream, publisher_name)
        ):
            raise Exception

        # start the background task that sends buffered messages
        if self.task is None:
            self.task = asyncio.create_task(self._timer())
            self.task.add_done_callback(self._timer_completed)

        wrapped_message = _MessageNotification(
            entry=message, callback=on_publish_confirm
        )
        self._buffered_messages[stream].append(wrapped_message)

        self._default_context_switch_counter += 1
        if self._default_context_switch_counter > self._default_context_switch_value:
            await asyncio.sleep(0)
            self._default_context_switch_counter = 0

    async def send_sub_entry(
        self,
        stream: str,
        sub_entry_messages: list[MessageT],
        compression_type: CompressionType = CompressionType.No,
        publisher_name: Optional[str] = None,  # may be removed
        on_publish_confirm: Optional[CB[ConfirmationStatus]] = None,
    ):
        if (
            publisher_name is not None
            and publisher_name
            != self._publishers_name.setdefault(stream, publisher_name)
        ):
            raise Exception

        if len(sub_entry_messages) == 0:
            raise ValueError("Empty batch")

        # start the background task that sends buffered messages
        if self.task is None:
            self.task = asyncio.create_task(self._timer())
            self.task.add_done_callback(self._timer_completed)

        compression_codec = CompressionHelper.compress(
            sub_entry_messages, compression_type
        )

        wrapped_message = _MessageNotification(
            entry=compression_codec, callback=on_publish_confirm
        )
        self._buffered_sub_entry_messages[stream].append(wrapped_message)
        await asyncio.sleep(0)

    async def _timer(self):
        delay = self._default_batch_publishing_delay
        buffered_messages = self._buffered_messages
        buffered_sub_entry_messages = self._buffered_sub_entry_messages
        publishers_name_getter = self._publishers_name.get
        send_batch, send_sub_entry_batch = self._send_batch, self._send_sub_entry_batch
        while True:
            await asyncio.sleep(delay)
            # Snapshot: send() may add a new stream during the await below.
            for stream, messages in list(buffered_messages.items()):
                if messages:
                    pending_length = len(messages)
                    await send_batch(
                        stream,
                        messages,
                        publishers_name_getter(stream),
                        pending_length,
                        return_ids=False,
                    )
                    del messages[:pending_length]

            if buffered_sub_entry_messages:
                # Snapshot: send_sub_entry() may add a new stream during the await below.
                for stream, messages in list(buffered_sub_entry_messages.items()):
                    if messages:
                        pending_length = len(messages)
                        await send_sub_entry_batch(
                            stream,
                            messages,
                            publishers_name_getter(stream),
                            pending_length,
                        )
                        del messages[:pending_length]

    async def _on_publish_confirm(
        self, frame: schema.PublishConfirm, publisher: _Publisher
    ) -> None:
        if frame.publisher_id != publisher.id:
            return

        waiting = self._waiting_for_confirm[publisher.reference]
        for confirmation in list(waiting):
            ids = waiting[confirmation]

            frame_publishing_ids = frame.publishing_ids
            ids_to_call = ids.intersection(frame_publishing_ids)
            ids.difference_update(frame_publishing_ids)
            if not ids:
                del waiting[confirmation]

            if not isinstance(confirmation, asyncio.Future):
                for id in ids_to_call:
                    confirmation_status = ConfirmationStatus(id, True)
                    result = confirmation(confirmation_status)
                    if result is not None and hasattr(result, "__await__"):
                        await result
            else:
                confirmation.set_result(None)

@Gsantomaggio
Collaborator

Thank you for your feedback, but we don't think the lock is the performance bottleneck. I would invest time in #133 instead.

@garvenlee
Author

Here is a scenario: _timer blocks at writer.drain() while holding the lock, and at that point many other tasks are calling send concurrently. Because of the lock, all of these tasks must wait for _timer.

Once _timer completes and releases the lock, a large number of waiters are woken one after another, and only one waiter is woken per event-loop iteration. I think this will hurt performance in a high-concurrency web server.
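
The first half of this scenario can be reproduced with a toy model. This is a sketch under stated assumptions: `run`, `timer`, and `send` are hypothetical stand-ins, and `asyncio.sleep` plays the role of a blocking `writer.drain()`. It counts how many senders manage to buffer a message while the timer is still draining.

```python
import asyncio


async def run(use_lock: bool, senders: int = 100, drain_time: float = 0.05) -> int:
    """Return how many sends completed while the timer was still draining."""
    lock = asyncio.Lock()
    buffer: list = []
    done_during_drain = 0
    draining = False

    async def timer() -> None:
        nonlocal draining
        if use_lock:
            await lock.acquire()
        draining = True
        await asyncio.sleep(drain_time)  # simulates writer.drain() blocking
        draining = False
        if use_lock:
            lock.release()

    async def send(i: int) -> None:
        nonlocal done_during_drain
        if use_lock:
            async with lock:  # blocked for as long as the timer holds the lock
                buffer.append(i)
        else:
            buffer.append(i)
        if draining:
            done_during_drain += 1

    timer_task = asyncio.create_task(timer())
    await asyncio.sleep(0)  # let the timer start draining first
    await asyncio.gather(*(send(i) for i in range(senders)), timer_task)
    return done_during_drain


if __name__ == "__main__":
    print("with lock:   ", asyncio.run(run(True)))
    print("without lock:", asyncio.run(run(False)))
```

With the lock, no sender makes progress until the drain finishes; without it, every sender buffers its message immediately. The model does not measure the cost of waking the waiters one by one afterwards.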

@DanielePalaia
Collaborator

I'll close this issue, as we are not planning this implementation at the moment. We are noting it in the Performance issue anyway, in case we want to revisit it in the future.
