Skip to content
This repository has been archived by the owner on Jan 26, 2020. It is now read-only.

Flush support for BrotliWriter (tests currently failing) #27

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 27 additions & 17 deletions enc/encode.go
Expand Up @@ -263,6 +263,22 @@ func NewBrotliWriter(params *BrotliParams, writer io.Writer) *BrotliWriter {
}
}

// Writes the current contents of the ring buffer to the underlying BrotliCompressor
func (w *BrotliWriter) flushToCompressor(isLast, forceFlush bool) error {
compressedData, err := w.compressor.writeBrotliData(isLast, forceFlush)
if err != nil {
return err
}

if _, err = w.writer.Write(compressedData); err != nil {
return err
}

w.inRingBuffer = 0

return nil
}

func (w *BrotliWriter) Write(buffer []byte) (int, error) {
comp := w.compressor
blockSize := int(comp.getInputBlockSize())
Expand All @@ -273,17 +289,10 @@ func (w *BrotliWriter) Write(buffer []byte) (int, error) {
comp.copyInputToRingBuffer(buffer[:roomFor])
copied += roomFor

compressedData, err := comp.writeBrotliData(false, false)
if err != nil {
if err := w.flushToCompressor(false, false); err != nil {
return copied, err
}

_, err = w.writer.Write(compressedData)
if err != nil {
return copied, err
}

w.inRingBuffer = 0
buffer = buffer[roomFor:]
roomFor = blockSize
}
Expand All @@ -298,19 +307,20 @@ func (w *BrotliWriter) Write(buffer []byte) (int, error) {
return copied, nil
}

// Close cleans up the resources used by the Brotli encoder for this
// stream. If the output buffer is an io.Closer, it will also be closed.
// Flush sends any pending data immediately to the underlying writer.
func (w *BrotliWriter) Flush() error {
return w.flushToCompressor(false, true)
}

// Close cleans up the resources used by the Brotli encoder for this stream.
// Any pending data in the write buffer is flushed to the underlying stream.
// If the output buffer is an io.Closer, it will also be closed.
func (w *BrotliWriter) Close() error {
compressedData, err := w.compressor.writeBrotliData(true, false)
if err != nil {
if err := w.flushToCompressor(true, false); err != nil {
return err
}
w.compressor.free()

_, err = w.writer.Write(compressedData)
if err != nil {
return err
}
w.compressor.free()

if v, ok := w.writer.(io.Closer); ok {
return v.Close()
Expand Down
61 changes: 57 additions & 4 deletions enc/encode_test.go
Expand Up @@ -2,9 +2,10 @@ package enc

import (
"bytes"
"log"
"strings"
"testing"

"gopkg.in/kothar/brotli-go.v0/dec"
)

const (
Expand All @@ -16,7 +17,7 @@ func TestBufferSizes(T *testing.T) {
params.SetQuality(testQuality)

input1 := []byte(strings.Repeat("The quick brown fox jumps over the lazy dog", 100000))
log.Printf("q=%d, inputSize=%d\n", params.Quality(), len(input1))
T.Logf("q=%d, inputSize=%d\n", params.Quality(), len(input1))

output1 := make([]byte, len(input1)*2)
_, err := CompressBuffer(params, input1, output1)
Expand Down Expand Up @@ -48,7 +49,7 @@ func TestStreamEncode(T *testing.T) {

input1 := []byte(strings.Repeat("The quick brown fox jumps over the lazy dog", 100000))
inputSize := len(input1)
log.Printf("q=%d, inputSize=%d\n", params.Quality(), inputSize)
T.Logf("q=%d, inputSize=%d\n", params.Quality(), inputSize)

for lgwin := 16; lgwin <= 22; lgwin++ {
params.SetLgwin(lgwin)
Expand Down Expand Up @@ -100,6 +101,58 @@ func TestStreamEncode(T *testing.T) {
}

outputSize := len(fullStreamOutput)
log.Printf("lgwin=%d, rounds=%d, output=%d (%.4f%% of input size)\n", params.Lgwin(), rounds, outputSize, float32(outputSize)*100.0/float32(inputSize))
T.Logf("lgwin=%d, rounds=%d, output=%d (%.4f%% of input size)\n", params.Lgwin(), rounds, outputSize, float32(outputSize)*100.0/float32(inputSize))
}
}

func TestFlush(T *testing.T) {
params := NewBrotliParams()
params.SetQuality(testQuality)

input1 := []byte(strings.Repeat("The quick brown fox jumps over the lazy dog", 10000))
inputSize := len(input1)

for _, blockSize := range []int{324, 1623, 6125, 21126} {
T.Logf("q=%d, inputSize=%d, blockSize=%d\n", params.Quality(), inputSize, blockSize)

writerBuffer := new(bytes.Buffer)
writer := NewBrotliWriter(params, writerBuffer)
outputLength := 0

// Write small blocks
for pos := 0; pos < len(input1); pos += blockSize {
end := pos + blockSize
if end > len(input1) {
end = len(input1)
}
if _, err := writer.Write(input1[pos:end]); err != nil {
T.Error(err)
}

newOutputLength := writerBuffer.Len()
if newOutputLength == outputLength {
// Nothing was written, attempt to flush
if err := writer.Flush(); err != nil {
T.Error(err)
}
newOutputLength = writerBuffer.Len()
if newOutputLength == outputLength {
T.Error("Flush did not produce any additional output")
}
}
outputLength = newOutputLength
}
if err := writer.Close(); err != nil {
T.Error(err)
}

// Check the output is valid
decoded, err := dec.DecompressBuffer(writerBuffer.Bytes(), nil)
if err != nil {
T.Error(err)
}
if !bytes.Equal(decoded, input1) {
T.Errorf("Flushed output does not decode to same bytes as input")
}
}
}