diff --git a/connection_test.go b/connection_test.go index 8208698..71b8971 100644 --- a/connection_test.go +++ b/connection_test.go @@ -619,6 +619,9 @@ func TestOffsetResponseWithVersions(t *testing.T) { ts := time.Unix(0, (time.Now().UnixNano()/int64(time.Millisecond))*int64(time.Millisecond)) resp1.Topics[0].Partitions[0].TimeStamp = ts + // In kafka >= KafkaV1 there might be only one offset + resp1.Topics[0].Partitions[0].Offsets = []int64{92} + b1, err := resp1.Bytes() if err != nil { t.Fatal(err) diff --git a/integration/broker_test.go b/integration/broker_test.go index 648e941..c97949d 100644 --- a/integration/broker_test.go +++ b/integration/broker_test.go @@ -74,6 +74,25 @@ func TestProduceAndConsume(t *testing.T) { } } + // check if offsets are correct + for _, name := range topics { + offe, err := broker.OffsetEarliest(name, 0) + if err != nil { + t.Fatal(err) + } + if offe != 0 { + t.Fatalf("Should get OffsetEarliest == 0 Got %#v instead ", offe) + } + offl, err := broker.OffsetLatest(name, 0) + if err != nil { + t.Fatal(err) + } + if offl != 1 { + t.Fatalf("Should get OffsetLatest == 1. Got %#v instead ", offl) + } + + } + } func TestCompression(t *testing.T) { diff --git a/proto/messages.go b/proto/messages.go index ba69b10..5589078 100644 --- a/proto/messages.go +++ b/proto/messages.go @@ -2218,7 +2218,7 @@ type OffsetRespPartition struct { ID int32 Err error TimeStamp time.Time // >= KafkaV1 only - Offsets []int64 + Offsets []int64 // used in KafkaV0 } func ReadOffsetResp(r io.Reader) (*OffsetResp, error) { @@ -2261,16 +2261,21 @@ func ReadVersionedOffsetResp(r io.Reader, version int16) (*OffsetResp, error) { if version >= KafkaV1 { p.TimeStamp = time.Unix(0, dec.DecodeInt64()*int64(time.Millisecond)) - } - len, err = dec.DecodeArrayLen() - if err != nil { - return nil, err - } - p.Offsets = make([]int64, len) + // in kafka >= KafkaV1 offset can be only one number. + // But for compatibility we still use slice + offset := dec.DecodeInt64() + p.Offsets = []int64{offset} + } else { + len, err = dec.DecodeArrayLen() + if err != nil { + return nil, err + } + p.Offsets = make([]int64, len) - for oi := range p.Offsets { - p.Offsets[oi] = dec.DecodeInt64() + for oi := range p.Offsets { + p.Offsets[oi] = dec.DecodeInt64() + } } } } @@ -2303,11 +2308,19 @@ func (r *OffsetResp) Bytes() ([]byte, error) { if r.Version >= KafkaV1 { enc.Encode(part.TimeStamp.UnixNano() / int64(time.Millisecond)) - } - enc.EncodeArrayLen(len(part.Offsets)) - for _, off := range part.Offsets { - enc.Encode(off) + // in kafka >= KafkaV1 offset can be only one value. + // In this case we use first element of slice + var offset int64 + if len(part.Offsets) > 0 { + offset = part.Offsets[0] + } + enc.Encode(offset) + } else { + enc.EncodeArrayLen(len(part.Offsets)) + for _, off := range part.Offsets { + enc.Encode(off) + } } } }