feat(spanner): Adding GFE Latency and Header Missing Count Metrics #5199

Merged: 16 commits, Jan 4, 2022
Changes from 3 commits
43 changes: 38 additions & 5 deletions spanner/batch.go
@@ -20,6 +20,10 @@ import (
"bytes"
"context"
"encoding/gob"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"log"
"time"

@@ -124,6 +128,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
partitions []*Partition
)
kset, err = keys.keySetProto()
var md metadata.MD
// Request partitions.
if err != nil {
return nil, err
@@ -136,7 +141,11 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
Columns: columns,
KeySet: kset,
PartitionOptions: opt.toProto(),
})
}, gax.WithGRPCOptions(grpc.Header(&md)))

Contributor:

Here and elsewhere: I think it is better to add an `if err != nil { return err }` statement before checking for the latency values. I worry that we otherwise might start returning errors that are harder to interpret for users if the following happens:

  1. `client.PartitionRead` returns an error. That error is not checked here at the moment.
  2. The RPC did return a header (so `md != nil`), but that header does not contain a GFE latency value. So instead of the error that was returned from `PartitionRead`, we return an error indicating that no GFE latency value could be found.

I worry that the above would be confusing for users, and would make it harder to debug errors that occur because of, for example, invalid requests.

Contributor (author):


Actually, if there is no GFE latency value it won't throw an error; instead it will add to the header missing count. It only throws an error if it got a garbage value for the "server-timing" key. Also, the Java implementation currently intercepts every RPC call (even for failed operations) and uses it to increase the header missing count. If we don't add to the header missing count in this case, the count might differ across languages.
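The behavior the author describes (count a missing header; error only on a garbage "server-timing" value) can be sketched like this. The function name and the exact `gfet4t7; dur=<millis>` prefix are illustrative assumptions here, not the PR's actual helper:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseGFELatencyMillis extracts the GFE latency from Server-Timing header
// values such as "gfet4t7; dur=123". It reports ok=false when no GFE timing
// entry is present (counted as a missing header, not an error), and returns
// an error only for a malformed duration value.
func parseGFELatencyMillis(values []string) (millis int64, ok bool, err error) {
	for _, v := range values {
		if !strings.HasPrefix(v, "gfet4t7; dur=") {
			continue
		}
		raw := strings.TrimPrefix(v, "gfet4t7; dur=")
		millis, err = strconv.ParseInt(raw, 10, 64)
		if err != nil {
			// A garbage value is the only case that surfaces an error.
			return 0, false, fmt.Errorf("invalid server-timing value %q: %v", v, err)
		}
		return millis, true, nil
	}
	// No GFE timing entry: just counted toward the header missing count.
	return 0, false, nil
}

func main() {
	ms, ok, err := parseGFELatencyMillis([]string{"gfet4t7; dur=123"})
	fmt.Println(ms, ok, err) // prints: 123 true <nil>
}
```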

Contributor:

> I feel if we don't add to the header missing count in this case then across languages the header missing count might be different in some way

That's a good point, so agreed to keep this as-is.

I still worry a little that we might return an error from extracting the metadata instead of the error from the actual operation. We had a discussion about that earlier and agreed on returning the error from those methods, but I'm starting to change my mind on that: it is not an error the user can do anything about, and it might mask other, potentially more important errors. So I would suggest ignoring those errors. What do you think?

Contributor (author):

Should we log these errors instead then? How do we let the user know that an error has occurred?

Contributor:

If we have access to the logger, then logging it is probably a good idea. I don't think users can do anything with these errors anyway, so we don't need to actively inform them beyond that (and the chance that it happens should be minimal, as it would mean the data sent back is invalid).

Contributor (author):

As per our conversation, going to add a trace instead of returning an error.

if GFELatencyOrHeaderMissingCountEnabled && md != nil {
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "PartitionReadUsingIndexWithOptions")
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
@@ -180,6 +189,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
if err != nil {
return nil, err
}
var md metadata.MD

// request Partitions
req := &sppb.PartitionQueryRequest{
@@ -190,7 +200,11 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
Params: params,
ParamTypes: paramTypes,
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil {
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "partitionQuery")
}

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
@@ -251,7 +265,13 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
}
t.sh = nil
sid, client := sh.getID(), sh.getClient()
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid})
var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil {
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "Cleanup")
}

if err != nil {
var logger *log.Logger
if sh.session != nil {
@@ -277,10 +297,11 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
// Might happen if transaction is closed in the middle of a API call.
return &RowIterator{err: errSessionClosed(sh)}
}
var md metadata.MD
// Read or query partition.
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.StreamingRead(ctx, &sppb.ReadRequest{
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
@@ -291,10 +312,16 @@
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
})
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil {
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "Execute")
}
return client, err
}
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
@@ -305,6 +332,12 @@
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
})
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil {
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "Execute")
}
return client, err
}
}
return stream(
8 changes: 7 additions & 1 deletion spanner/client.go
@@ -19,6 +19,8 @@ package spanner
import (
"context"
"fmt"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"log"
"os"
"regexp"
@@ -335,6 +337,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
}
sh = &sessionHandle{session: s}

var md metadata.MD
// Begin transaction.
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
@@ -343,7 +346,10 @@
ReadOnly: buildTransactionOptionsReadOnly(tb, true),
},
},
})
}, gax.WithGRPCOptions(grpc.Header(&md)))
if GFELatencyOrHeaderMissingCountEnabled && md != nil{
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "BatchReadOnlyTransaction_BeginTransaction")
}
if err != nil {
return nil, ToSpannerError(err)
}
1 change: 1 addition & 0 deletions spanner/go.mod
@@ -4,6 +4,7 @@ go 1.11

require (
cloud.google.com/go v0.97.0
github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
github.com/googleapis/gax-go/v2 v2.1.1
3 changes: 2 additions & 1 deletion spanner/go.sum
@@ -49,8 +49,9 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
82 changes: 82 additions & 0 deletions spanner/integration_test.go
@@ -22,6 +22,7 @@ import (
"errors"
"flag"
"fmt"
"go.opencensus.io/stats/view"
"log"
"math"
"math/big"
@@ -3264,6 +3265,87 @@ func TestIntegration_DirectPathFallback(t *testing.T) {
}
}

func TestIntegration_GFE_Latency(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
GFELatencyOrHeaderMissingCountEnabled = true

client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()

singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
_, err := client.Apply(ctx, ms)
if err != nil {
t.Fatalf("got error %v", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
t.Fatalf("got error %v", err)
}
waitErr := &Error{}
waitFor(t, func() error {
select {
case stat := <-te.Stats:
if len(stat.Rows) > 0 {
return nil
}
}
return waitErr
})

var viewMap = map[string]bool{
statsPrefix + "gfe_latency":              false,
statsPrefix + "gfe_header_missing_count": false,
}

for {
if viewMap[statsPrefix+"gfe_latency"] && viewMap[statsPrefix+"gfe_header_missing_count"] {
break
}
select {
case stat := <-te.Stats:
if len(stat.Rows) == 0 {
t.Fatal("No metrics are exported")
}
if stat.View.Measure.Name() != statsPrefix+"gfe_latency" && stat.View.Measure.Name() != statsPrefix+"gfe_header_missing_count" {
t.Fatalf("Incorrect measure: got %v, want %v", stat.View.Measure.Name(), statsPrefix+"gfe_latency or "+statsPrefix+"gfe_header_missing_count")
} else {
viewMap[stat.View.Measure.Name()] = true
}
for _, row := range stat.Rows {
m := getTagMap(row.Tags)
checkCommonTagsGFELatency(t, m)
var data string
switch row.Data.(type) {
default:
data = fmt.Sprintf("%v", row.Data)
case *view.CountData:
data = fmt.Sprintf("%v", row.Data.(*view.CountData).Value)
case *view.LastValueData:
data = fmt.Sprintf("%v", row.Data.(*view.LastValueData).Value)
case *view.DistributionData:
data = fmt.Sprintf("%v", row.Data.(*view.DistributionData).Count)
}
if got, want := fmt.Sprintf("%v", data), "0"; got <= want {
t.Fatalf("Incorrect data: got %v, wanted more than %v for metric %v", got, want, stat.View.Measure.Name())
}
}
case <-time.After(2 * time.Second):
if !viewMap[statsPrefix+"gfe_latency"] || !viewMap[statsPrefix+"gfe_header_missing_count"] {
t.Fatal("no stats were exported before timeout")
}
}
}
DisableGfeLatencyAndHeaderMissingCountViews()
}

// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
if databaseAdmin == nil {
51 changes: 51 additions & 0 deletions spanner/oc_test.go
@@ -261,6 +261,57 @@ func TestOCStats_SessionPool_GetSessionTimeoutsCount(t *testing.T) {
}
}

func TestOCStats_GFE_Latency(t *testing.T) {
te := testutil.NewTestExporter(GFELatencyView)
defer te.Unregister()

GFELatencyOrHeaderMissingCountEnabled = true

_, client, teardown := setupMockedTestServer(t)
defer teardown()

client.Single().ReadRow(context.Background(), "Users", Key{"alice"}, []string{"email"})

waitErr := &Error{}
waitFor(t, func() error {
select {
case stat := <-te.Stats:
if len(stat.Rows) > 0 {
return nil
}
}
return waitErr
})

// Wait until we see data from the view.
select {
case stat := <-te.Stats:
if len(stat.Rows) == 0 {
t.Fatal("No metrics are exported")
}
if got, want := stat.View.Measure.Name(), statsPrefix+"gfe_latency"; got != want {
t.Fatalf("Incorrect measure: got %v, want %v", got, want)
}
row := stat.Rows[0]
m := getTagMap(row.Tags)
checkCommonTags(t, m)
var data string
switch row.Data.(type) {
default:
data = fmt.Sprintf("%v", row.Data)
case *view.CountData:
data = fmt.Sprintf("%v", row.Data.(*view.CountData).Value)
case *view.LastValueData:
data = fmt.Sprintf("%v", row.Data.(*view.LastValueData).Value)
}
if got, want := data, "1"; got != want {
t.Fatalf("Incorrect data: got %v, want %v", got, want)
}
case <-time.After(1 * time.Second):
t.Fatal("no stats were exported before timeout")
}

}

func getTagMap(tags []tag.Tag) map[tag.Key]string {
m := make(map[tag.Key]string)
for _, t := range tags {
18 changes: 14 additions & 4 deletions spanner/pdml.go
@@ -15,12 +15,14 @@
package spanner

import (
"context"

"cloud.google.com/go/internal/trace"
"context"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
)

// PartitionedUpdate executes a DML statement in parallel across the database,
@@ -100,21 +102,29 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
//
// Note that PDML transactions cannot be committed or rolled back.
func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) {
var md metadata.MD
// Begin transaction.
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}},
},
})
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil {
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_BeginTransaction")
}
if err != nil {
return 0, ToSpannerError(err)
}
// Add a reference to the PDML transaction on the ExecuteSql request.
req.Transaction = &sppb.TransactionSelector{
Selector: &sppb.TransactionSelector_Id{Id: res.Id},
}
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))
if GFELatencyOrHeaderMissingCountEnabled && md != nil {
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
}
if err != nil {
return 0, err
}