Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): Adding GFE Latency and Header Missing Count Metrics #5199

Merged
merged 16 commits into from Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 59 additions & 5 deletions spanner/batch.go
Expand Up @@ -23,8 +23,12 @@ import (
"log"
"time"

"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/proto"
"github.com/googleapis/gax-go/v2"
asthamohta marked this conversation as resolved.
Show resolved Hide resolved
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
Expand Down Expand Up @@ -128,6 +132,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
if err != nil {
return nil, err
}
var md metadata.MD
resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.PartitionReadRequest{
Session: sid,
Transaction: ts,
Expand All @@ -136,7 +141,13 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
Columns: columns,
KeySet: kset,
PartitionOptions: opt.toProto(),
})
}, gax.WithGRPCOptions(grpc.Header(&md)))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and elsewhere: I think it is better to add an if err != nil { return err }` statement before checking for the latency values. I worry that we otherwise might start returning errors that are harder to interpret for users if the following happens:

  1. The client.PartitionRead returns an error. That error is not checked here at the moment.
  2. The RPC did return a header (so md != nil), but that header does not contain a GFE latency value. So instead of the error that was returned from PartitionedRead, we return an error that indicates that it could not find a GFE Latency value.

I worry that the above would be confusing for users, and make it harder to debug errors that occur because of for example invalid requests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually if there is no GFE Latency value it won't throw an error. Instead it will add to the header missing count. It only throws an error if it got a garbage value in the value for the key value pair "server-timing". Also currently in the Java implementation, they intercept every rpc call(even when there is a failed operation) and use it to increase the header missing count. I feel if we don't add to the Header missing count in this case then across languages the header missing count might be different in someway

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel if we don't add to the Header missing count in this case then across languages the header missing count might be different in someway

That's a good point. So agreed to keep this as is on that.

I still worry a little bit that we might be returning an error from extracting the metadata instead of the error from the actual operation. I think we had a discussion about that earlier, and agreed on returning the error from those methods. I think I'm starting to change my mind on that. It is not an error that the user can do anything about, and it might just cover other potentially more important errors. So I would suggest then ignoring those errors. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log these errors instead then? How do we let the user know that an error has occurred?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have access to the logger, then logging it is probably a good idea. I don't think users can do anything with these errors anyways, so I don't think we need to actively inform them any more than that (and the chance that it happens should also be minimal, as it would mean that the data that is sent back is invalid)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per our conversation going to add a trace instead of returning an error

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "PartitionReadUsingIndexWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Expand Down Expand Up @@ -180,6 +191,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
if err != nil {
return nil, err
}
var md metadata.MD

// request Partitions
req := &sppb.PartitionQueryRequest{
Expand All @@ -190,7 +202,13 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
Params: params,
ParamTypes: paramTypes,
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

asthamohta marked this conversation as resolved.
Show resolved Hide resolved
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Expand Down Expand Up @@ -251,7 +269,16 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
}
t.sh = nil
sid, client := sh.getID(), sh.getClient()
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid})

var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}

if err != nil {
var logger *log.Logger
if sh.session != nil {
Expand Down Expand Up @@ -280,7 +307,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
// Read or query partition.
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.StreamingRead(ctx, &sppb.ReadRequest{
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Expand All @@ -291,10 +318,23 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
})
if err != nil {
return client, err
}
md, err := client.Header()
if err != nil {
return nil, err
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this early return is no longer needed now that we don't return an error if getting the latency stats fails

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is still needed as there might be an error in getting the Header, and in that case we return the error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but that happens on line 333. The reason that we added the early return above in the first place, was that we would return the error that might be returned createContextAndCaptureGFELatencyMetrics. As we don't do that anymore, we can safely wait until line 333 before returning the error from the client.Header() method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then won't md, err := client.Header() override the value of the original error. I will have to add a separate check as md, errGFE : = client.Header()
if errGFE != nil {
return nil, errGFE
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we mean the same return in this case. I think you can safely return the if err != nil block starting at line 325. The err on line 329 will not override the err from the call to client.Header(), as that is a different variable, although it does have the same name. That err is only visible in the statement on line 329.

So to summarize:

  1. The if err != nil on line 321 should remain.
  2. The if err != nil on line 325 can be removed. (And yes, the md, err := client.Header() does override the previous value of err, but that is not a problem as we know that err at that point is nil)
  3. The if err := createContextAndCaptureGFELatencyMetrics on line 329 can safely remain. That will not override anything, as it is a separate variable only defined on line 329.
  4. The return client, err on line 333 will therefore always return the value of err that was returned by client.Header().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay makes sense. My bad. I thought you meant remove the error return on line 321. Will do that.

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
return client, err
}
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Expand All @@ -305,6 +345,20 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
})
if err != nil {
return client, err
}
md, err := client.Header()
if err != nil {
return nil, err
}
asthamohta marked this conversation as resolved.
Show resolved Hide resolved

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
return client, err
}
}
return stream(
Expand Down
7 changes: 7 additions & 0 deletions spanner/client.go
Expand Up @@ -80,6 +80,7 @@ type Client struct {
idleSessions *sessionPool
logger *log.Logger
qo QueryOptions
ct *commonTags
}

// DatabaseName returns the full name of a database, e.g.,
Expand Down Expand Up @@ -204,6 +205,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
idleSessions: sp,
logger: config.logger,
qo: getQueryOptions(config.QueryOptions),
ct: getCommonTags(sc),
}
return c, nil
}
Expand Down Expand Up @@ -280,6 +282,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.sh = sh
return nil
}
t.ct = c.ct
return t
}

Expand All @@ -300,6 +303,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.ct = c.ct
return t
}

Expand Down Expand Up @@ -368,6 +372,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.ct = c.ct
return t, nil
}

Expand Down Expand Up @@ -395,6 +400,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.ct = c.ct
return t
}

Expand Down Expand Up @@ -480,6 +486,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
t.ct = c.ct

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
"Starting transaction attempt")
Expand Down
1 change: 1 addition & 0 deletions spanner/go.mod
Expand Up @@ -3,6 +3,7 @@ module cloud.google.com/go/spanner
go 1.11

require (
github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
cloud.google.com/go v0.98.0
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
Expand Down
3 changes: 2 additions & 1 deletion spanner/go.sum
Expand Up @@ -50,8 +50,9 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down
80 changes: 80 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -39,6 +39,7 @@ import (
"cloud.google.com/go/internal/uid"
database "cloud.google.com/go/spanner/admin/database/apiv1"
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
"go.opencensus.io/stats/view"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
Expand Down Expand Up @@ -3264,6 +3265,85 @@ func TestIntegration_DirectPathFallback(t *testing.T) {
}
}

func TestIntegration_GFE_Latency(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
GFELatencyMetricsEnabled = true

client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()

singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
_, err := client.Apply(ctx, ms)
if err != nil {
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
t.Fatalf("Could not read row. Got error %v", err)
}
waitErr := &Error{}
waitFor(t, func() error {
select {
case stat := <-te.Stats:
if len(stat.Rows) > 0 {
return nil
}
}
return waitErr
})

var viewMap = map[string]bool{statsPrefix + "gfe_latency": false,
statsPrefix + "gfe_header_missing_count": false,
}

for {
if viewMap[statsPrefix+"gfe_latency"] || viewMap[statsPrefix+"gfe_header_missing_count"] {
break
}
select {
case stat := <-te.Stats:
if len(stat.Rows) == 0 {
t.Fatal("No metrics are exported")
}
if stat.View.Measure.Name() != statsPrefix+"gfe_latency" && stat.View.Measure.Name() != statsPrefix+"gfe_header_missing_count" {
t.Fatalf("Incorrect measure: got %v, want %v", stat.View.Measure.Name(), statsPrefix+"gfe_latency or "+statsPrefix+"gfe_header_missing_count")
} else {
viewMap[stat.View.Measure.Name()] = true
}
for _, row := range stat.Rows {
m := getTagMap(row.Tags)
checkCommonTagsGFELatency(t, m)
var data string
switch row.Data.(type) {
default:
data = fmt.Sprintf("%v", row.Data)
case *view.CountData:
data = fmt.Sprintf("%v", row.Data.(*view.CountData).Value)
case *view.LastValueData:
data = fmt.Sprintf("%v", row.Data.(*view.LastValueData).Value)
case *view.DistributionData:
data = fmt.Sprintf("%v", row.Data.(*view.DistributionData).Count)
}
if got, want := fmt.Sprintf("%v", data), "0"; got <= want {
t.Fatalf("Incorrect data: got %v, wanted more than %v for metric %v", got, want, stat.View.Measure.Name())
}
}
case <-time.After(10 * time.Second):
if !viewMap[statsPrefix+"gfe_latency"] && !viewMap[statsPrefix+"gfe_header_missing_count"] {
t.Fatal("no stats were exported before timeout")
}
}
}
DisableGfeLatencyAndHeaderMissingCountViews()
}

// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
if databaseAdmin == nil {
Expand Down
10 changes: 10 additions & 0 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -32,7 +32,9 @@ import (
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/genproto/googleapis/rpc/status"
spannerpb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
gstatus "google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -687,6 +689,10 @@ func (s *inMemSpannerServer) BatchCreateSessions(ctx context.Context, req *spann
s.totalSessionsCreated++
s.sessions[sessionName] = sessions[i]
}
header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"})
if err := grpc.SendHeader(ctx, header); err != nil {
return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header")
}
return &spannerpb.BatchCreateSessionsResponse{Session: sessions}, nil
}

Expand Down Expand Up @@ -922,6 +928,10 @@ func (s *inMemSpannerServer) Read(ctx context.Context, req *spannerpb.ReadReques
}
s.receivedRequests <- req
s.mu.Unlock()
header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"})
if err := grpc.SendHeader(ctx, header); err != nil {
return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header")
}
return nil, gstatus.Error(codes.Unimplemented, "Method not yet implemented")
}

Expand Down