Skip to content

Commit

Permalink
Switch to snappy framing
Browse files Browse the repository at this point in the history
  • Loading branch information
olegbespalov committed Jun 28, 2023
1 parent 1df72a0 commit a25735a
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 15 deletions.
7 changes: 7 additions & 0 deletions cloudapi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Config struct {
// This is how many concurrent pushes will be done at the same time to the cloud
MetricPushConcurrency null.Int `json:"metricPushConcurrency" envconfig:"K6_CLOUD_METRIC_PUSH_CONCURRENCY"`

MetricCompression null.String `json:"metricCompression" envconfig:"K6_CLOUD_METRIC_COMPRESSION"`

// Indicates whether to send traces to the k6 Insights backend service.
TracesEnabled null.Bool `json:"tracesEnabled" envconfig:"K6_CLOUD_TRACES_ENABLED"`

Expand Down Expand Up @@ -267,6 +269,11 @@ func (c Config) Apply(cfg Config) Config {
if cfg.AggregationOutlierIqrCoefUpper.Valid {
c.AggregationOutlierIqrCoefUpper = cfg.AggregationOutlierIqrCoefUpper
}

if cfg.MetricCompression.Valid {
c.MetricCompression = cfg.MetricCompression
}

return c
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/fatih/color v1.15.0
github.com/go-sourcemap/sourcemap v2.1.4-0.20211119122758-180fcef48034+incompatible
github.com/golang/protobuf v1.5.3
github.com/golang/snappy v0.0.4
github.com/gorilla/websocket v1.5.0
github.com/grafana/xk6-browser v0.10.0
github.com/grafana/xk6-grpc v0.1.2
Expand Down Expand Up @@ -62,7 +63,6 @@ require (
github.com/dlclark/regexp2 v1.9.0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
Expand Down
77 changes: 68 additions & 9 deletions output/cloud/expv2/metrics_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"net/http"
"strings"

"github.com/klauspost/compress/snappy"
"github.com/golang/snappy"
"google.golang.org/protobuf/proto"

"go.k6.io/k6/cloudapi"
Expand All @@ -22,10 +22,16 @@ import (
type metricsClient struct {
httpClient *cloudapi.Client
url string
compressor compressor
}

type compressor interface {
Compress(b []byte) ([]byte, error)
EncodingHeader() string
}

// newMetricsClient creates and initializes a new MetricsClient.
func newMetricsClient(c *cloudapi.Client, testRunID string) (*metricsClient, error) {
func newMetricsClient(c *cloudapi.Client, testRunID, compression string) (*metricsClient, error) {
// The cloudapi.Client works across different versions of the API, the test
// lifecycle management is under /v1 instead the metrics ingestion is /v2.
// Unfortunately, the current client has v1 hard-coded so we need to trim the wrong path
Expand All @@ -40,15 +46,24 @@ func newMetricsClient(c *cloudapi.Client, testRunID string) (*metricsClient, err
if testRunID == "" {
return nil, errors.New("TestRunID of the test is required")
}

// by default, use snappy compression, but allow overriding it
// with snappy framed
var comp compressor = &snappyCompressor{}
if compression == "snappy-framed" {
comp = &snappyFramedCompressor{}
}

return &metricsClient{
httpClient: c,
url: strings.TrimSuffix(u, "/v1") + "/v2/metrics/" + testRunID,
compressor: comp,
}, nil
}

// Push the provided metrics for the given test run ID.
func (mc *metricsClient) push(samples *pbcloud.MetricSet) error {
b, err := newRequestBody(samples)
b, err := newRequestBody(mc.compressor, samples)
if err != nil {
return err
}
Expand All @@ -64,7 +79,7 @@ func (mc *metricsClient) push(samples *pbcloud.MetricSet) error {
}

req.Header.Set("Content-Type", "application/x-protobuf")
req.Header.Set("Content-Encoding", "snappy")
req.Header.Set("Content-Encoding", mc.compressor.EncodingHeader())
req.Header.Set("K6-Metrics-Protocol-Version", "2.0")

err = mc.httpClient.Do(req, nil)
Expand All @@ -75,18 +90,62 @@ func (mc *metricsClient) push(samples *pbcloud.MetricSet) error {
return nil
}

func newRequestBody(data *pbcloud.MetricSet) ([]byte, error) {
func newRequestBody(compressor compressor, data *pbcloud.MetricSet) ([]byte, error) {
b, err := proto.Marshal(data)
if err != nil {
return nil, fmt.Errorf("encoding metrics as Protobuf write request failed: %w", err)
}
// TODO: use the framing format
// https://github.com/google/snappy/blob/main/framing_format.txt
// It can be done replacing the encode with
// https://pkg.go.dev/github.com/klauspost/compress/snappy#NewBufferedWriter

b, err = compressor.Compress(b)
if err != nil {
return nil, fmt.Errorf("metrics request compression failed: %w", err)
}

return b, nil
}

// snappyCompressor
type snappyCompressor struct{}

var _ compressor = (*snappyCompressor)(nil)

func (s *snappyCompressor) Compress(b []byte) ([]byte, error) {
if snappy.MaxEncodedLen(len(b)) < 0 {
return nil, fmt.Errorf("the Protobuf message is too large to be handled by Snappy encoder; "+
"size: %d, limit: %d", len(b), 0xffffffff)
}

return snappy.Encode(nil, b), nil
}

func (s *snappyCompressor) EncodingHeader() string {
return "snappy"
}

// See https://github.com/google/snappy/blob/main/framing_format.txt
type snappyFramedCompressor struct{}

var _ compressor = (*snappyFramedCompressor)(nil)

func (s *snappyFramedCompressor) Compress(b []byte) ([]byte, error) {
var buf bytes.Buffer
writer := snappy.NewBufferedWriter(&buf)

_, err := writer.Write(b)
if err != nil {
_ = writer.Close()

return nil, err
}

err = writer.Close()
if err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func (s *snappyFramedCompressor) EncodingHeader() string {
return "x-snappy-framed"
}
15 changes: 11 additions & 4 deletions output/cloud/expv2/metrics_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ func TestMetricsClientPush(t *testing.T) {
assert.Equal(t, http.MethodPost, r.Method)
assert.Equal(t, "Token fake-token", r.Header.Get("Authorization"))
assert.Contains(t, r.Header.Get("User-Agent"), "k6cloud/v0.4")
assert.Equal(t, "application/x-protobuf", r.Header.Get("Content-Type"))
assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
assert.Equal(t, "2.0", r.Header.Get("K6-Metrics-Protocol-Version"))
b, err := io.ReadAll(r.Body)
Expand All @@ -36,10 +35,18 @@ func TestMetricsClientPush(t *testing.T) {
defer ts.Close()

c := cloudapi.NewClient(nil, "fake-token", ts.URL, "k6cloud/v0.4", 1*time.Second)
mc, err := newMetricsClient(c, "test-ref-id")
mc, err := newMetricsClient(c, "test-ref-id", "snappy")
require.NoError(t, err)

mset := pbcloud.MetricSet{}
mset := pbcloud.MetricSet{
Metrics: []*pbcloud.Metric{
{
Name: "http_reqs",
Type: pbcloud.MetricType_METRIC_TYPE_COUNTER,
TimeSeries: []*pbcloud.TimeSeries{},
},
},
}
err = mc.push(&mset)
require.NoError(t, err)
assert.Equal(t, 1, reqs)
Expand All @@ -55,7 +62,7 @@ func TestMetricsClientPushUnexpectedStatus(t *testing.T) {
defer ts.Close()

c := cloudapi.NewClient(nil, "fake-token", ts.URL, "k6cloud/v0.4", 1*time.Second)
mc, err := newMetricsClient(c, "test-ref-id")
mc, err := newMetricsClient(c, "test-ref-id", "snappy")
require.NoError(t, err)

err = mc.push(nil)
Expand Down
2 changes: 1 addition & 1 deletion output/cloud/expv2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (o *Output) Start() error {
return fmt.Errorf("failed to initialize the samples collector: %w", err)
}

mc, err := newMetricsClient(o.cloudClient, o.referenceID)
mc, err := newMetricsClient(o.cloudClient, o.referenceID, o.config.MetricCompression.String)
if err != nil {
return fmt.Errorf("failed to initialize the http metrics flush client: %w", err)
}
Expand Down

0 comments on commit a25735a

Please sign in to comment.