Skip to content

Commit

Permalink
Merge pull request #103 from optiopay/fix_offsetresponse
Browse files Browse the repository at this point in the history
fixed bug in OffsetResponse
  • Loading branch information
e-max committed Apr 17, 2018
2 parents 5e9b2f6 + cdb2b83 commit 004b9ef
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 13 deletions.
3 changes: 3 additions & 0 deletions connection_test.go
Expand Up @@ -619,6 +619,9 @@ func TestOffsetResponseWithVersions(t *testing.T) {
ts := time.Unix(0, (time.Now().UnixNano()/int64(time.Millisecond))*int64(time.Millisecond))
resp1.Topics[0].Partitions[0].TimeStamp = ts

// In kafka >= KafkaV1 there might be only one offset
resp1.Topics[0].Partitions[0].Offsets = []int64{92}

b1, err := resp1.Bytes()
if err != nil {
t.Fatal(err)
Expand Down
19 changes: 19 additions & 0 deletions integration/broker_test.go
Expand Up @@ -74,6 +74,25 @@ func TestProduceAndConsume(t *testing.T) {
}
}

// check if offsets are correct
for _, name := range topics {
offe, err := broker.OffsetEarliest(name, 0)
if err != nil {
t.Fatal(err)
}
if offe != 0 {
t.Fatalf("Should get OffsetEarliest == 0 Got %#v instead ", offe)
}
offl, err := broker.OffsetLatest(name, 0)
if err != nil {
t.Fatal(err)
}
if offl != 1 {
t.Fatalf("Should get OffsetLatest == 1. Got %#v instead ", offl)
}

}

}

func TestCompression(t *testing.T) {
Expand Down
39 changes: 26 additions & 13 deletions proto/messages.go
Expand Up @@ -2218,7 +2218,7 @@ type OffsetRespPartition struct {
ID int32
Err error
TimeStamp time.Time // >= KafkaV1 only
Offsets []int64
Offsets []int64 // used in KafkaV0
}

func ReadOffsetResp(r io.Reader) (*OffsetResp, error) {
Expand Down Expand Up @@ -2261,16 +2261,21 @@ func ReadVersionedOffsetResp(r io.Reader, version int16) (*OffsetResp, error) {

if version >= KafkaV1 {
p.TimeStamp = time.Unix(0, dec.DecodeInt64()*int64(time.Millisecond))
}

len, err = dec.DecodeArrayLen()
if err != nil {
return nil, err
}
p.Offsets = make([]int64, len)
// in kafka >= KafkaV1 offset can be only one number.
// But for compatibility we still use slice
offset := dec.DecodeInt64()
p.Offsets = []int64{offset}
} else {
len, err = dec.DecodeArrayLen()
if err != nil {
return nil, err
}
p.Offsets = make([]int64, len)

for oi := range p.Offsets {
p.Offsets[oi] = dec.DecodeInt64()
for oi := range p.Offsets {
p.Offsets[oi] = dec.DecodeInt64()
}
}
}
}
Expand Down Expand Up @@ -2303,11 +2308,19 @@ func (r *OffsetResp) Bytes() ([]byte, error) {

if r.Version >= KafkaV1 {
enc.Encode(part.TimeStamp.UnixNano() / int64(time.Millisecond))
}

enc.EncodeArrayLen(len(part.Offsets))
for _, off := range part.Offsets {
enc.Encode(off)
// in kafka >= KafkaV1 offset can be only one value.
// In this case we use first element of slice
var offset int64
if len(part.Offsets) > 0 {
offset = part.Offsets[0]
}
enc.Encode(offset)
} else {
enc.EncodeArrayLen(len(part.Offsets))
for _, off := range part.Offsets {
enc.Encode(off)
}
}
}
}
Expand Down

0 comments on commit 004b9ef

Please sign in to comment.