Skip to content

Commit

Permalink
feat(spanner): Adding GFE Latency and Header Missing Count Metrics (#…
Browse files Browse the repository at this point in the history
…5199)

* feat(storage): allow retry ErrorFunc configs (#5166)

Allows users to supply a custom ErrorFunc which will be used
to determine whether the error is retryable.

* Changes in test case

* feat: changes to tests

* feat: fixes for errors

* featt: changes to imports

* feat: linting changes

* fix: changes as per review

* fix: changes as per review

* fix: changes as per review

* fix: changes as per review

Co-authored-by: Chris Cotter <cjcotter@google.com>
Co-authored-by: rahul2393 <rahulyadavsep92@gmail.com>
  • Loading branch information
3 people committed Jan 4, 2022
1 parent d0ff35e commit 3d8a9ea
Show file tree
Hide file tree
Showing 11 changed files with 502 additions and 14 deletions.
58 changes: 53 additions & 5 deletions spanner/batch.go
Expand Up @@ -23,8 +23,12 @@ import (
"log"
"time"

"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/proto"
"github.com/googleapis/gax-go/v2"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
Expand Down Expand Up @@ -128,6 +132,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
if err != nil {
return nil, err
}
var md metadata.MD
resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.PartitionReadRequest{
Session: sid,
Transaction: ts,
Expand All @@ -136,7 +141,13 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
Columns: columns,
KeySet: kset,
PartitionOptions: opt.toProto(),
})
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "PartitionReadUsingIndexWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Expand Down Expand Up @@ -180,6 +191,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
if err != nil {
return nil, err
}
var md metadata.MD

// request Partitions
req := &sppb.PartitionQueryRequest{
Expand All @@ -190,7 +202,13 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
Params: params,
ParamTypes: paramTypes,
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Expand Down Expand Up @@ -251,7 +269,16 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
}
t.sh = nil
sid, client := sh.getID(), sh.getClient()
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid})

var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}

if err != nil {
var logger *log.Logger
if sh.session != nil {
Expand Down Expand Up @@ -280,7 +307,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
// Read or query partition.
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.StreamingRead(ctx, &sppb.ReadRequest{
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Expand All @@ -291,10 +318,20 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
})
if err != nil {
return client, err
}
md, err := client.Header()
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
return client, err
}
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Expand All @@ -305,6 +342,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
})
if err != nil {
return client, err
}
md, err := client.Header()

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
return client, err
}
}
return stream(
Expand Down
7 changes: 7 additions & 0 deletions spanner/client.go
Expand Up @@ -80,6 +80,7 @@ type Client struct {
idleSessions *sessionPool
logger *log.Logger
qo QueryOptions
ct *commonTags
}

// DatabaseName returns the full name of a database, e.g.,
Expand Down Expand Up @@ -204,6 +205,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
idleSessions: sp,
logger: config.logger,
qo: getQueryOptions(config.QueryOptions),
ct: getCommonTags(sc),
}
return c, nil
}
Expand Down Expand Up @@ -280,6 +282,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.sh = sh
return nil
}
t.ct = c.ct
return t
}

Expand All @@ -300,6 +303,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.ct = c.ct
return t
}

Expand Down Expand Up @@ -368,6 +372,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.ct = c.ct
return t, nil
}

Expand Down Expand Up @@ -395,6 +400,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.ct = c.ct
return t
}

Expand Down Expand Up @@ -480,6 +486,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
t.ct = c.ct

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
"Starting transaction attempt")
Expand Down
1 change: 1 addition & 0 deletions spanner/go.mod
Expand Up @@ -3,6 +3,7 @@ module cloud.google.com/go/spanner
go 1.11

require (
github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
cloud.google.com/go v0.99.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
Expand Down
3 changes: 2 additions & 1 deletion spanner/go.sum
Expand Up @@ -50,8 +50,9 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down
80 changes: 80 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -39,6 +39,7 @@ import (
"cloud.google.com/go/internal/uid"
database "cloud.google.com/go/spanner/admin/database/apiv1"
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
"go.opencensus.io/stats/view"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
Expand Down Expand Up @@ -3264,6 +3265,85 @@ func TestIntegration_DirectPathFallback(t *testing.T) {
}
}

func TestIntegration_GFE_Latency(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
GFELatencyMetricsEnabled = true

client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()

singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
_, err := client.Apply(ctx, ms)
if err != nil {
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
t.Fatalf("Could not read row. Got error %v", err)
}
waitErr := &Error{}
waitFor(t, func() error {
select {
case stat := <-te.Stats:
if len(stat.Rows) > 0 {
return nil
}
}
return waitErr
})

var viewMap = map[string]bool{statsPrefix + "gfe_latency": false,
statsPrefix + "gfe_header_missing_count": false,
}

for {
if viewMap[statsPrefix+"gfe_latency"] || viewMap[statsPrefix+"gfe_header_missing_count"] {
break
}
select {
case stat := <-te.Stats:
if len(stat.Rows) == 0 {
t.Fatal("No metrics are exported")
}
if stat.View.Measure.Name() != statsPrefix+"gfe_latency" && stat.View.Measure.Name() != statsPrefix+"gfe_header_missing_count" {
t.Fatalf("Incorrect measure: got %v, want %v", stat.View.Measure.Name(), statsPrefix+"gfe_latency or "+statsPrefix+"gfe_header_missing_count")
} else {
viewMap[stat.View.Measure.Name()] = true
}
for _, row := range stat.Rows {
m := getTagMap(row.Tags)
checkCommonTagsGFELatency(t, m)
var data string
switch row.Data.(type) {
default:
data = fmt.Sprintf("%v", row.Data)
case *view.CountData:
data = fmt.Sprintf("%v", row.Data.(*view.CountData).Value)
case *view.LastValueData:
data = fmt.Sprintf("%v", row.Data.(*view.LastValueData).Value)
case *view.DistributionData:
data = fmt.Sprintf("%v", row.Data.(*view.DistributionData).Count)
}
if got, want := fmt.Sprintf("%v", data), "0"; got <= want {
t.Fatalf("Incorrect data: got %v, wanted more than %v for metric %v", got, want, stat.View.Measure.Name())
}
}
case <-time.After(10 * time.Second):
if !viewMap[statsPrefix+"gfe_latency"] && !viewMap[statsPrefix+"gfe_header_missing_count"] {
t.Fatal("no stats were exported before timeout")
}
}
}
DisableGfeLatencyAndHeaderMissingCountViews()
}

// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
if databaseAdmin == nil {
Expand Down
10 changes: 10 additions & 0 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -32,7 +32,9 @@ import (
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/genproto/googleapis/rpc/status"
spannerpb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
gstatus "google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -687,6 +689,10 @@ func (s *inMemSpannerServer) BatchCreateSessions(ctx context.Context, req *spann
s.totalSessionsCreated++
s.sessions[sessionName] = sessions[i]
}
header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"})
if err := grpc.SendHeader(ctx, header); err != nil {
return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header")
}
return &spannerpb.BatchCreateSessionsResponse{Session: sessions}, nil
}

Expand Down Expand Up @@ -922,6 +928,10 @@ func (s *inMemSpannerServer) Read(ctx context.Context, req *spannerpb.ReadReques
}
s.receivedRequests <- req
s.mu.Unlock()
header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"})
if err := grpc.SendHeader(ctx, header); err != nil {
return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header")
}
return nil, gstatus.Error(codes.Unimplemented, "Method not yet implemented")
}

Expand Down

0 comments on commit 3d8a9ea

Please sign in to comment.