Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a delete records api #1710

Open
wants to merge 43 commits into
base: master
Choose a base branch
from
Open

Added a delete records api #1710

wants to merge 43 commits into from

Conversation

PratRanj07
Copy link
Member

Added a delete records api and created integration tests and unit tests for the same

@PratRanj07 PratRanj07 requested a review from a team as a code owner February 14, 2024 12:21
Copy link

cla-assistant bot commented Feb 14, 2024

CLA assistant check
All committers have signed the CLA.

Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed Halfway. Still reviewing. Please resolve these issues.

examples/adminapi.py Outdated Show resolved Hide resolved
examples/adminapi.py Outdated Show resolved Hide resolved
examples/adminapi.py Outdated Show resolved Hide resolved
examples/adminapi.py Outdated Show resolved Hide resolved
examples/adminapi.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more pass.

examples/adminapi.py Outdated Show resolved Hide resolved
examples/adminapi.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more review comments.

src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
tests/integration/admin/test_delete_records.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
Copy link
Contributor

@emasab emasab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have these concerns for the public API that will we frozen after we release it

for partition, fut in futmap.items():
try:
result = fut.result()
if result.error:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be needed, if there's an error on a single partition the future should just throw

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Told the same thing #1710 (comment)

:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
:param float operation_timeout: The operation timeout in seconds,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Operation timeout is only for timeouts that are sent to the broker like in CreateTopics, DeleteTopics, CreatePartitions. In this case no timeout is sent to the broker and there should be only the request_timeout

Copy link
Member

@pranavrth pranavrth Apr 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

operation_timeout is used in deleteRecords API as well.

/**
 * @brief Delete records (messages) in topic partitions older than the
 *        offsets provided.
 *
 * @param rk Client instance.
 * @param del_records The offsets to delete (up to).
 *                    Currently only one DeleteRecords_t (but containing
 *                    multiple offsets) is supported.
 * @param del_record_cnt The number of elements in del_records, must be 1.
 * @param options Optional admin options, or NULL for defaults.
 * @param rkqu Queue to emit result on.
 *
 * Supported admin options:
 *  - rd_kafka_AdminOptions_set_operation_timeout() - default 60 seconds.
 *    Controls how long the brokers will wait for records to be deleted.
 *  - rd_kafka_AdminOptions_set_request_timeout() - default socket.timeout.ms.
 *    Controls how long \c rdkafka will wait for the request to complete.
 *
 * @remark The result event type emitted on the supplied queue is of type
 *         \c RD_KAFKA_EVENT_DELETERECORDS_RESULT
 */
RD_EXPORT void rd_kafka_DeleteRecords(rd_kafka_t *rk,
                                      rd_kafka_DeleteRecords_t **del_records,
                                      size_t del_record_cnt,
                                      const rd_kafka_AdminOptions_t *options,
                                      rd_kafka_queue_t *rkqu);

for req in request:
if not isinstance(req, _TopicPartition):
raise TypeError("Element of the request list must be of type 'TopicPartition' ")
if req is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already checked by if not isinstance(req, _TopicPartition):, please remove it

const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev);
const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res);

result = c_parts_to_py(c_delete_records_res_list);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the responses are returned in the same order as the requests, so that it's possible to use _make_futmap_result_from_list

Also we should use a DeletedRecords object as in Java, instead of a TopicPartition because that could be extended in future and in that case we cannot apply the same changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are converting it to the futmap in the end keyed at TopicPartition so there is no issue in that I think.

We can create DeletedRecords class as Java instead of directly using offset in the response.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a new class DeleteRecords containing only offset and made changes according to that. Please review.

PratRanj07 and others added 11 commits May 22, 2024 16:22
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
@PratRanj07 PratRanj07 requested a review from pranavrth May 22, 2024 12:57
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost there!

CHANGELOG.md Outdated Show resolved Hide resolved
examples/adminapi.py Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Show resolved Hide resolved
CHANGELOG.md Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/_model/__init__.py Outdated Show resolved Hide resolved
@PratRanj07 PratRanj07 requested a review from pranavrth May 27, 2024 10:36
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
@PratRanj07 PratRanj07 requested a review from pranavrth May 27, 2024 13:08
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more fixes.

src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/src/Admin.c Outdated Show resolved Hide resolved
src/confluent_kafka/admin/__init__.py Show resolved Hide resolved
Copy link
Member

@pranavrth pranavrth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work. LGTM!..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants