From 564d9abadc319c8753f6389298a9d47af746c0d5 Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Mon, 21 Jan 2019 14:43:27 +0100 Subject: [PATCH 1/3] fix protocol bug related to wrong record headers length --- proto/messages.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/proto/messages.go b/proto/messages.go index fd6b96e..bef767d 100644 --- a/proto/messages.go +++ b/proto/messages.go @@ -476,10 +476,7 @@ func readRecord(r io.Reader) (*Record, error) { rec.Key = dec.DecodeVarBytes() rec.Value = dec.DecodeVarBytes() - len, err := dec.DecodeArrayLen() - if err != nil { - return nil, err - } + len := dec.DecodeVarInt() rec.Headers = make([]RecordHeader, len) for i := range rec.Headers { From 2d53b24ccbe04897e0ac357c8db35fdff5673f14 Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Mon, 21 Jan 2019 15:26:24 +0100 Subject: [PATCH 2/3] add tests --- proto/messages_test.go | 61 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/proto/messages_test.go b/proto/messages_test.go index e4e08b7..b885e25 100644 --- a/proto/messages_test.go +++ b/proto/messages_test.go @@ -760,6 +760,67 @@ func TestFetchResponseWithRecordBatch2(t *testing.T) { } +func TestFetchResponseWithRecordBatchWithMultipleRecords(t *testing.T) { + + messageFetchResponseV5MultipleRecords := []byte{ + 0, 0, 1, 205, //size + 0, 0, 0, 63, //correlation, id + 0, 0, 0, 0, //throttle, time + 0, 0, 0, 1, //number, of, topics + 0, 4, 97, 117, 116, 104, // name of the topic ( auth) + 0, 0, 0, 1, //number of partitions + 0, 0, 0, 0, // partition id + 0, 0, // error + 0, 0, 0, 0, 0, 0, 1, 121, //, hight, water, martk + 255, 255, 255, 255, 255, 255, 255, 255, //, last, stable, offset + 0, 0, 0, 0, 0, 0, 0, 0, //, log, start, offset + 255, 255, 255, 255, //, , + 0, 0, 1, 145, //, size, of, batch + 0, 0, 0, 0, 0, 0, 1, 72, //, first, offset + 0, 0, 1, 133, //, length + 0, 0, 0, 2, //, partition, leader, epoch + 2, //, magic, + 7, 177, 219, 215, //crc, + 0, 0, //, attr + 0, 0, 0, 1, //, last, offset, delta + 255, 255, 255, 255, 255, 255, 255, 255, //first, timestampt + 255, 255, 255, 255, 255, 255, 255, 255, //, max, timestampt + 255, 255, 255, 255, 255, 255, 255, 255, //, producer, id + 255, 255, //, producer, epoch + 255, 255, 255, 255, //, base, sequence + 0, 0, 0, 2, //, number, of, records + 212, 2, //size, + 0, //, attribute + 0, //, timestampt, delta + 0, //, offset, delta + 28, //, key, length + 8, 128, 254, 7, 16, 131, 214, 136, 178, 165, 138, 220, 153, 2, 170, 2, 1, 1, 16, 17, 2, 32, 246, 240, 230, 136, 163, 28, //, key + 67, //, + 50, 175, 30, 87, 26, 228, 189, 143, 130, 80, 116, 194, 56, 70, 130, 136, 212, 23, 149, 222, 38, 125, 51, 192, 151, 107, 0, 0, 0, 10, 52, 10, 24, 102, 113, 109, 111, 106, 116, 50, 97, 113, 108, 119, 121, 101, 51, 118, 103, 116, 118, 114, 118, 105, 108, 119, 112, 24, 180, 149, 247, 197, 241, 159, 161, 188, 21, + 32, 180, 249, 142, 150, 132, 160, 161, 188, 21, 42, 4, 104, 116, 116, 112, 18, 32, 246, 240, 230, 136, 163, 28, 67, 50, 175, 30, 87, 26, 228, 189, 143, 130, 80, 116, 194, 56, 70, 130, 136, 212, 23, 149, 222, 38, 125, 51, 192, 151, 26, 17, 111, 114, 103, 97, 110, 105, 122, 97, 116, 105, 111, 110, 58, 114, 101, 97, 100, 0, + 204, 2, 0, 0, 2, 28, 8, 128, 254, 7, 16, 132, 214, 136, 178, 165, 138, 220, 153, 2, 162, 2, 1, 1, 16, 17, 2, 32, 246, 240, 230, 136, 163, 28, 67, 50, 175, 30, 87, 26, 228, 189, 143, 130, 80, 116, 194, 56, 70, 130, 136, 212, 23, 149, 222, 38, 125, 51, 192, 151, 103, 0, 0, 0, 10, 52, 10, 24, 119, 51, 120, 104, 55, 107, 118, 104, 104, 118, 112, 55, 104, 113, 111, 103, 119, 97, 108, 53, 55, 102, 100, 102, 24, 229, 183, 247, 197, 241, 159, 161, 188, 21, 32, 229, 155, 143, 150, 132, 160, 161, 188, 21, 42, 4, 104, 116, 116, 112, 18, 32, 246, 240, 230, 136, 163, 28, 67, 50, 175, 30, 87, 26, 228, 189, 143, 130, 80, 116, 194, 56, 70, 130, 136, 212, 23, 149, 222, 38, 125, 51, 192, 151, 26, 13, 115, 117, 112, 112, 108, 105, 101, 114, 58, 114, 101, 97, 100, 0, + } + + resp, err := ReadVersionedFetchResp(bytes.NewReader(messageFetchResponseV5MultipleRecords), 5) + if err != nil { + t.Fatal(err) + } + + records := resp.Topics[0].Partitions[0].RecordBatch.Records + + if len(records) != 2 { + t.Fatal("Expect 2 records") + } + + if records[0].Length != 170 { + t.Fatal("Wrong record length") + } + if records[1].Length != 166 { + t.Fatal("Wrong record length") + } + +} + func TestConsumerMetadataWithVersions(t *testing.T) { respV0 := ConsumerMetadataResp{ Version: 0, From 3373258a5bda47f9798e0d4beeda231d0a324a7d Mon Sep 17 00:00:00 2001 From: Max Lavrenov Date: Mon, 21 Jan 2019 15:31:22 +0100 Subject: [PATCH 3/3] rename variable --- proto/messages.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proto/messages.go b/proto/messages.go index bef767d..a6416bb 100644 --- a/proto/messages.go +++ b/proto/messages.go @@ -476,9 +476,9 @@ func readRecord(r io.Reader) (*Record, error) { rec.Key = dec.DecodeVarBytes() rec.Value = dec.DecodeVarBytes() - len := dec.DecodeVarInt() + headersLen := dec.DecodeVarInt() - rec.Headers = make([]RecordHeader, len) + rec.Headers = make([]RecordHeader, headersLen) for i := range rec.Headers { rec.Headers[i].Key = dec.DecodeVarString() rec.Headers[i].Value = dec.DecodeVarBytes()