Could we have a method that combines Batch.Read() and Batch.ReadMessage()?
If msg.Value and msg.Key have enough capacity, it would reuse those buffers; otherwise it would allocate new ones.
Example usage.
// to consume messages
topic := "my-topic"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

msg := kafka.Message{
    Value: make([]byte, 10e3), // 10KB max per message
    Key:   make([]byte, 256),  // 256 bytes max per message
}

for {
    err := batch.ReadIntoMessage(&msg)
    if err != nil {
        break
    }
    fmt.Println(string(msg.Value))
}

if err := batch.Close(); err != nil {
    log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close connection:", err)
}
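To make the reuse rule concrete, here is a minimal sketch of the "reuse if it fits, otherwise allocate" logic, written without kafka-go. The helper name reuseOrAlloc is hypothetical and only illustrates what the proposed ReadIntoMessage might do for msg.Key and msg.Value; it is not part of the library.

```go
package main

import "fmt"

// reuseOrAlloc is a hypothetical helper, not a kafka-go API.
// It copies src into dst's backing array when dst has enough capacity,
// and allocates a new slice only when it does not.
func reuseOrAlloc(dst, src []byte) []byte {
	if cap(dst) >= len(src) {
		// Enough room: reslice the existing buffer, no allocation.
		dst = dst[:len(src)]
	} else {
		// Too small: fall back to a fresh buffer sized to the payload.
		dst = make([]byte, len(src))
	}
	copy(dst, src)
	return dst
}

func main() {
	buf := make([]byte, 0, 8) // caller-provided buffer with 8 bytes of capacity

	// Fits in the existing buffer, so the backing array is reused.
	small := reuseOrAlloc(buf, []byte("key"))
	fmt.Println(string(small), cap(small))

	// Does not fit, so a new buffer is allocated.
	large := reuseOrAlloc(buf, []byte("a much longer value"))
	fmt.Println(string(large), len(large))
}
```

With this behaviour, a caller that sizes msg.Value and msg.Key for the common case would avoid a per-message allocation, while oversized messages would still be read correctly at the cost of one allocation.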
I could provide a PR if you guys are interested.