How can we figure out which partition an EOF error is for? #94

Open
gridaphobe opened this issue May 8, 2019 · 3 comments

Comments

@gridaphobe

The C function for consuming always returns an rd_kafka_message_t, which contains an rd_kafka_resp_err_t in the case of failure. Unfortunately, this error value is not self-contained: making sense of it may require inspecting other fields of the rd_kafka_message_t. For example, if you get a partition EOF, you have to inspect the partition field to determine which partition the EOF applies to.
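To make the problem concrete, here is a minimal Haskell model of the C-style result. `RespErr` and `RawMessage` are hypothetical stand-ins for `rd_kafka_resp_err_t` and `rd_kafka_message_t`, not the real hw-kafka-client or librdkafka bindings. The point is only that the error code alone does not say which partition it refers to.

```haskell
-- Hypothetical stand-ins for rd_kafka_resp_err_t and rd_kafka_message_t;
-- these are NOT the real hw-kafka-client or librdkafka bindings.
data RespErr = NoError | PartitionEof | OtherErr Int
  deriving (Eq, Show)

data RawMessage = RawMessage
  { rawErr       :: RespErr
  , rawTopic     :: String
  , rawPartition :: Int
  , rawPayload   :: Maybe String
  } deriving (Eq, Show)

-- The error code by itself is not self-contained: to learn which partition
-- reached EOF we must also read the separate topic and partition fields.
eofPartition :: RawMessage -> Maybe (String, Int)
eofPartition msg
  | rawErr msg == PartitionEof = Just (rawTopic msg, rawPartition msg)
  | otherwise                  = Nothing
```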

In contrast, the Haskell function for consuming is

pollMessage :: MonadIO m
            => KafkaConsumer
            -> Timeout -- ^ the timeout, in milliseconds
            -> m (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString))) -- ^ Left on error or timeout, right for success

This is a much more natural type, since the error case is disjoint, but the KafkaError type is missing some crucial context. For example, there appears to be no way for me to determine which partition hit the EOF. This is important if you want to consume the entire topic in one go and then stop.
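Assuming the error case did carry a partition, the "consume the whole topic and then stop" use case reduces to tracking which assigned partitions have reported EOF. The sketch below is pure and uses a hypothetical `ConsumeEvent` type in place of real `pollMessage` results; in a real consumer the list would be replaced by repeated polling.

```haskell
import qualified Data.Set as Set

-- Hypothetical event type: what pollMessage results could look like if
-- a partition EOF error carried its partition number.
data ConsumeEvent
  = Record Int String   -- partition, payload
  | PartitionEofAt Int  -- partition that reached the end of its log
  | OtherFailure String
  deriving (Eq, Show)

-- Collect payloads until every assigned partition has reported EOF.
-- Other failures are skipped here; a real consumer would log or retry.
consumeUntilAllEof :: Set.Set Int -> [ConsumeEvent] -> [String]
consumeUntilAllEof pending events
  | Set.null pending = []
  | otherwise = case events of
      [] -> []
      Record _ payload : rest -> payload : consumeUntilAllEof pending rest
      PartitionEofAt p : rest -> consumeUntilAllEof (Set.delete p pending) rest
      OtherFailure _ : rest   -> consumeUntilAllEof pending rest
```

Note that a partition can receive new messages after reporting EOF, so this treats "all partitions at EOF" as a snapshot of the topic at that moment.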

@AlexeyRaga
Member

AlexeyRaga commented May 9, 2019

I see; this is valuable information that we don't currently get.

We could get the partition here:
https://github.com/haskell-works/hw-kafka-client/blob/master/src/Kafka/Consumer/Convert.hs#L160

What is not clear to me (although I haven't thought about it properly) is how to represent it in an error. Currently we have KafkaResponseError RdKafkaRespErrT, where the RdKafkaRespErrT value is the partition-EOF code, and there is no place to add a partition ID: KafkaResponseError is too broad and is used in non-partition-related contexts, and RdKafkaRespErrT is a generated enum:
https://github.com/haskell-works/hw-kafka-client/blob/master/src/Kafka/Types.hs#L77

Maybe we could change this type:

fromMessagePtr :: RdKafkaMessageTPtr -> IO (Either KafkaError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))

to something like

data MessageConsumeError = MessageConsumeError
  { topic :: TopicName
  , partition :: PartitionId
  , error :: KafkaError 
  }

fromMessagePtr :: RdKafkaMessageTPtr -> IO (Either MessageConsumeError (ConsumerRecord (Maybe BS.ByteString) (Maybe BS.ByteString)))
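A minimal sketch of the proposed shape, using hypothetical stand-in types (`TopicName`, `PartitionId`, `KafkaError`, `RawMessage`, a simplified `ConsumerRecord`) rather than hw-kafka-client's real definitions, and a pure `fromRaw` in place of `fromMessagePtr`. The error field is named `consumeError` here instead of `error` to avoid clashing with Prelude's `error`.

```haskell
-- Hypothetical stand-ins; the real hw-kafka-client types differ.
newtype TopicName = TopicName String deriving (Eq, Show)
newtype PartitionId = PartitionId Int deriving (Eq, Show)

data KafkaError = KafkaPartitionEof | KafkaOtherError String
  deriving (Eq, Show)

-- Hypothetical response code, standing in for rd_kafka_resp_err_t.
data RespCode = Success | PartitionEofCode | OtherCode Int
  deriving Show

-- Stand-in for the fields fromMessagePtr would read from rd_kafka_message_t.
data RawMessage = RawMessage
  { rawCode      :: RespCode
  , rawTopic     :: String
  , rawPartition :: Int
  , rawValue     :: Maybe String
  }

data MessageConsumeError = MessageConsumeError
  { topic        :: TopicName
  , partition    :: PartitionId
  , consumeError :: KafkaError  -- not `error`, to avoid Prelude.error
  } deriving (Eq, Show)

-- Simplified record: the real ConsumerRecord has more fields.
data ConsumerRecord = ConsumerRecord
  { crTopic     :: TopicName
  , crPartition :: PartitionId
  , crValue     :: Maybe String
  } deriving (Eq, Show)

-- Both success and failure carry the topic and partition of the message.
fromRaw :: RawMessage -> Either MessageConsumeError ConsumerRecord
fromRaw msg = case rawCode msg of
  Success          -> Right (ConsumerRecord t p (rawValue msg))
  PartitionEofCode -> Left (MessageConsumeError t p KafkaPartitionEof)
  OtherCode n      -> Left (MessageConsumeError t p (KafkaOtherError ("error code " ++ show n)))
  where
    t = TopicName (rawTopic msg)
    p = PartitionId (rawPartition msg)
```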

Then it would let us reason about what happened, and where, when receiving messages.
We would need to think about the nullPtr case though:
https://github.com/haskell-works/hw-kafka-client/blob/master/src/Kafka/Consumer/Convert.hs#L156

@gridaphobe
Author

Yeah, it's unfortunately kind of an awkward API to adapt to Haskell idioms.

I've been playing with a change that just models the C API faithfully, adding a crError :: RdKafkaRespErrT field to ConsumerRecord, but it just feels wrong to do that in Haskell. You also still have to account for a possible nullPtr, so you end up with errors split across two different locations.

I like your MessageConsumeError approach much better from a user perspective. Judging by https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L3235-L3237, it actually looks like rdkafka will never return a nullPtr. I doubt it guarantees that the topic and partition fields will be set for all possible errors, so there would still be some work required by users to know when topic and partition contain useful values.
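If rdkafka does not guarantee that topic and partition are populated for every error, one option is to make that uncertainty visible in the type. In this sketch (hypothetical types again, not the library's API) the location fields are `Maybe`, a negative partition is assumed to mean "not set", and a null message pointer, modelled as `Nothing`, yields an error with no location at all. Callers then match on the fields they need.

```haskell
-- Hypothetical stand-ins; not hw-kafka-client's real types.
newtype TopicName = TopicName String deriving (Eq, Show)
newtype PartitionId = PartitionId Int deriving (Eq, Show)

data KafkaError = KafkaPartitionEof | KafkaNullMessage | KafkaOtherError String
  deriving (Eq, Show)

-- Location fields are optional because they may not be meaningful for every error.
data MessageConsumeError = MessageConsumeError
  { errTopic     :: Maybe TopicName
  , errPartition :: Maybe PartitionId
  , errKind      :: KafkaError
  } deriving (Eq, Show)

-- Hypothetical raw message: in this sketch a negative partition means "not set".
data RawMessage = RawMessage
  { rawTopic     :: Maybe String
  , rawPartition :: Int
  , rawKind      :: KafkaError
  }

-- Nothing models a null rd_kafka_message_t pointer.
toConsumeError :: Maybe RawMessage -> MessageConsumeError
toConsumeError Nothing = MessageConsumeError Nothing Nothing KafkaNullMessage
toConsumeError (Just msg) = MessageConsumeError
  { errTopic     = TopicName <$> rawTopic msg
  , errPartition = if rawPartition msg >= 0 then Just (PartitionId (rawPartition msg)) else Nothing
  , errKind      = rawKind msg
  }

-- A caller interested in partition EOF must check both the kind and the location.
eofPartitionOf :: MessageConsumeError -> Maybe PartitionId
eofPartitionOf (MessageConsumeError _ (Just p) KafkaPartitionEof) = Just p
eofPartitionOf _ = Nothing
```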

@AlexeyRaga
Member

Yes, it never returns null unless something fails, in which case we need to check the global error pointer. I am not sure whether that can actually happen, though.
