-
Notifications
You must be signed in to change notification settings - Fork 878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added a delete records api #1710
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed Halfway. Still reviewing. Please resolve these issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more review comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have these concerns for the public API that will we frozen after we release it
examples/adminapi.py
Outdated
for partition, fut in futmap.items(): | ||
try: | ||
result = fut.result() | ||
if result.error: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be needed, if there's an error on a single partition the future should just throw
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Told the same thing #1710 (comment)
:param float request_timeout: The overall request timeout in seconds, | ||
including broker lookup, request transmission, operation time | ||
on broker, and response. Default: `socket.timeout.ms*1000.0` | ||
:param float operation_timeout: The operation timeout in seconds, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Operation timeout is only for timeouts that are sent to the broker like in CreateTopics, DeleteTopics, CreatePartitions. In this case no timeout is sent to the broker and there should be only the request_timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
operation_timeout
is used in deleteRecords API as well.
/**
* @brief Delete records (messages) in topic partitions older than the
* offsets provided.
*
* @param rk Client instance.
* @param del_records The offsets to delete (up to).
* Currently only one DeleteRecords_t (but containing
* multiple offsets) is supported.
* @param del_record_cnt The number of elements in del_records, must be 1.
* @param options Optional admin options, or NULL for defaults.
* @param rkqu Queue to emit result on.
*
* Supported admin options:
* - rd_kafka_AdminOptions_set_operation_timeout() - default 60 seconds.
* Controls how long the brokers will wait for records to be deleted.
* - rd_kafka_AdminOptions_set_request_timeout() - default socket.timeout.ms.
* Controls how long \c rdkafka will wait for the request to complete.
*
* @remark The result event type emitted on the supplied queue is of type
* \c RD_KAFKA_EVENT_DELETERECORDS_RESULT
*/
RD_EXPORT void rd_kafka_DeleteRecords(rd_kafka_t *rk,
rd_kafka_DeleteRecords_t **del_records,
size_t del_record_cnt,
const rd_kafka_AdminOptions_t *options,
rd_kafka_queue_t *rkqu);
for req in request: | ||
if not isinstance(req, _TopicPartition): | ||
raise TypeError("Element of the request list must be of type 'TopicPartition' ") | ||
if req is None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is already checked by if not isinstance(req, _TopicPartition):
, please remove it
src/confluent_kafka/src/Admin.c
Outdated
const rd_kafka_DeleteRecords_result_t *c_delete_records_res = rd_kafka_event_DeleteRecords_result(rkev); | ||
const rd_kafka_topic_partition_list_t *c_delete_records_res_list = rd_kafka_DeleteRecords_result_offsets(c_delete_records_res); | ||
|
||
result = c_parts_to_py(c_delete_records_res_list); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure the responses are returned in the same order as the requests, so that it's possible to use _make_futmap_result_from_list
Also we should use a DeletedRecords
object as in Java, instead of a TopicPartition
because that could be extended in future and in that case we cannot apply the same changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are converting it to the futmap in the end keyed at TopicPartition
so there is no issue in that I think.
We can create DeletedRecords
class as Java instead of directly using offset
in the response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created a new class DeleteRecords containing only offset and made changes according to that. Please review.
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost there!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more fixes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work. LGTM!..
Added a delete records api and created integration tests and unit tests for the same