Skip to content

Commit

Permalink
Merge pull request #108 from optiopay/bug_in_record_headers_size
Browse files Browse the repository at this point in the history
fix protocol bug related to wrong record headers length
  • Loading branch information
e-max committed Jan 21, 2019
2 parents 969d39f + 3373258 commit 6c12652
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 5 deletions.
7 changes: 2 additions & 5 deletions proto/messages.go
Expand Up @@ -476,12 +476,9 @@ func readRecord(r io.Reader) (*Record, error) {
rec.Key = dec.DecodeVarBytes()
rec.Value = dec.DecodeVarBytes()

len, err := dec.DecodeArrayLen()
if err != nil {
return nil, err
}
headersLen := dec.DecodeVarInt()

rec.Headers = make([]RecordHeader, len)
rec.Headers = make([]RecordHeader, headersLen)
for i := range rec.Headers {
rec.Headers[i].Key = dec.DecodeVarString()
rec.Headers[i].Value = dec.DecodeVarBytes()
Expand Down
61 changes: 61 additions & 0 deletions proto/messages_test.go
Expand Up @@ -760,6 +760,67 @@ func TestFetchResponseWithRecordBatch2(t *testing.T) {

}

func TestFetchResponseWithRecordBatchWithMultipleRecords(t *testing.T) {

messageFetchResponseV5MultipleRecords := []byte{
0, 0, 1, 205, //size
0, 0, 0, 63, //correlation, id
0, 0, 0, 0, //throttle, time
0, 0, 0, 1, //number, of, topics
0, 4, 97, 117, 116, 104, // name of the topic ( auth)
0, 0, 0, 1, //number of partitions
0, 0, 0, 0, // partition id
0, 0, // error
0, 0, 0, 0, 0, 0, 1, 121, //, hight, water, martk
255, 255, 255, 255, 255, 255, 255, 255, //, last, stable, offset
0, 0, 0, 0, 0, 0, 0, 0, //, log, start, offset
255, 255, 255, 255, //, ,
0, 0, 1, 145, //, size, of, batch
0, 0, 0, 0, 0, 0, 1, 72, //, first, offset
0, 0, 1, 133, //, length
0, 0, 0, 2, //, partition, leader, epoch
2, //, magic,
7, 177, 219, 215, //crc,
0, 0, //, attr
0, 0, 0, 1, //, last, offset, delta
255, 255, 255, 255, 255, 255, 255, 255, //first, timestampt
255, 255, 255, 255, 255, 255, 255, 255, //, max, timestampt
255, 255, 255, 255, 255, 255, 255, 255, //, producer, id
255, 255, //, producer, epoch
255, 255, 255, 255, //, base, sequence
0, 0, 0, 2, //, number, of, records
212, 2, //size,
0, //, attribute
0, //, timestampt, delta
0, //, offset, delta
28, //, key, length
8, 128, 254, 7, 16, 131, 214, 136, 178, 165, 138, 220, 153, 2, 170, 2, 1, 1, 16, 17, 2, 32, 246, 240, 230, 136, 163, 28, //, key
67, //,
50, 175, 30, 87, 26, 228, 189, 143, 130, 80, 116, 194, 56, 70, 130, 136, 212, 23, 149, 222, 38, 125, 51, 192, 151, 107, 0, 0, 0, 10, 52, 10, 24, 102, 113, 109, 111, 106, 116, 50, 97, 113, 108, 119, 121, 101, 51, 118, 103, 116, 118, 114, 118, 105, 108, 119, 112, 24, 180, 149, 247, 197, 241, 159, 161, 188, 21,
32, 180, 249, 142, 150, 132, 160, 161, 188, 21, 42, 4, 104, 116, 116, 112, 18, 32, 246, 240, 230, 136, 163, 28, 67, 50, 175, 30, 87, 26, 228, 189, 143, 130, 80, 116, 194, 56, 70, 130, 136, 212, 23, 149, 222, 38, 125, 51, 192, 151, 26, 17, 111, 114, 103, 97, 110, 105, 122, 97, 116, 105, 111, 110, 58, 114, 101, 97, 100, 0,
204, 2, 0, 0, 2, 28, 8, 128, 254, 7, 16, 132, 214, 136, 178, 165, 138, 220, 153, 2, 162, 2, 1, 1, 16, 17, 2, 32, 246, 240, 230, 136, 163, 28, 67, 50, 175, 30, 87, 26, 228, 189, 143, 130, 80, 116, 194, 56, 70, 130, 136, 212, 23, 149, 222, 38, 125, 51, 192, 151, 103, 0, 0, 0, 10, 52, 10, 24, 119, 51, 120, 104, 55, 107, 118, 104, 104, 118, 112, 55, 104, 113, 111, 103, 119, 97, 108, 53, 55, 102, 100, 102, 24, 229, 183, 247, 197, 241, 159, 161, 188, 21, 32, 229, 155, 143, 150, 132, 160, 161, 188, 21, 42, 4, 104, 116, 116, 112, 18, 32, 246, 240, 230, 136, 163, 28, 67, 50, 175, 30, 87, 26, 228, 189, 143, 130, 80, 116, 194, 56, 70, 130, 136, 212, 23, 149, 222, 38, 125, 51, 192, 151, 26, 13, 115, 117, 112, 112, 108, 105, 101, 114, 58, 114, 101, 97, 100, 0,
}

resp, err := ReadVersionedFetchResp(bytes.NewReader(messageFetchResponseV5MultipleRecords), 5)
if err != nil {
t.Fatal(err)
}

records := resp.Topics[0].Partitions[0].RecordBatch.Records

if len(records) != 2 {
t.Fatal("Expect 2 records")
}

if records[0].Length != 170 {
t.Fatal("Wrong record length")
}
if records[1].Length != 166 {
t.Fatal("Wrong record length")
}

}

func TestConsumerMetadataWithVersions(t *testing.T) {
respV0 := ConsumerMetadataResp{
Version: 0,
Expand Down

0 comments on commit 6c12652

Please sign in to comment.