Skip to content

Commit

Permalink
fix: changes as per review
Browse files Browse the repository at this point in the history
  • Loading branch information
asthamohta committed Dec 20, 2021
1 parent 83cba34 commit 13d936d
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 184 deletions.
79 changes: 28 additions & 51 deletions spanner/batch.go
Expand Up @@ -127,11 +127,11 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
partitions []*Partition
)
kset, err = keys.keySetProto()
var md metadata.MD
// Request partitions.
if err != nil {
return nil, err
}
var md metadata.MD
resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.PartitionReadRequest{
Session: sid,
Transaction: ts,
Expand All @@ -142,13 +142,8 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
PartitionOptions: opt.toProto(),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "PartitionReadUsingIndexWithOptions")
if errGFE != nil {
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if errGFE := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "PartitionReadUsingIndexWithOptions"); errGFE != nil {
return nil, errGFE
}
}
Expand Down Expand Up @@ -208,13 +203,8 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "partitionQuery")
if errGFE != nil {
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if errGFE := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); errGFE != nil {
return nil, errGFE
}
}
Expand Down Expand Up @@ -286,14 +276,9 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
if sh.session != nil {
logger = sh.session.logger
}
if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
logf(logger, "Error in creating new context. Try disabling and rerunning. Error: %v", errGFE)
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Cleanup")
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", errGFE)
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if errGFE := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); errGFE != nil {
logf(logger, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", errGFE)
}
}

Expand Down Expand Up @@ -332,20 +317,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
})
if client != nil {
md, errGFE := client.Header()
if errGFE != nil {
return nil, errGFE
}
if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
}
if err != nil {
return client, err
}
md, errGFE := client.Header()
if errGFE != nil {
return nil, errGFE
}
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if errGFE := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); errGFE != nil {
return client, errGFE
}
}
return client, err
Expand All @@ -363,21 +344,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
})
if client != nil {
md, errGFE := client.Header()
if errGFE != nil {
return nil, errGFE
}
if err != nil {
return client, err
}
md, errGFE := client.Header()
if errGFE != nil {
return nil, errGFE
}

if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
}
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if errGFE := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); errGFE != nil {
return client, errGFE
}
}
return client, err
Expand Down
67 changes: 16 additions & 51 deletions spanner/client.go
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
vkit "cloud.google.com/go/spanner/apiv1"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
Expand Down Expand Up @@ -81,6 +80,7 @@ type Client struct {
idleSessions *sessionPool
logger *log.Logger
qo QueryOptions
ct *CommonTags
}

// DatabaseName returns the full name of a database, e.g.,
Expand Down Expand Up @@ -281,17 +281,10 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.sh = sh
return nil
}
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
if c.sc != nil && c.ct == nil {
setCommonTags(c)
}
t.ct = c.ct
return t
}

Expand All @@ -312,17 +305,10 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
if c.sc != nil && c.ct == nil {
setCommonTags(c)
}
t.ct = c.ct
return t
}

Expand Down Expand Up @@ -391,17 +377,10 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
if c.sc != nil && c.ct == nil {
setCommonTags(c)
}
t.ct = c.ct
return t, nil
}

Expand Down Expand Up @@ -429,17 +408,10 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
if c.sc != nil && c.ct == nil {
setCommonTags(c)
}
t.ct = c.ct
return t
}

Expand Down Expand Up @@ -525,17 +497,10 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
if c.sc != nil && c.ct == nil {
setCommonTags(c)
}
t.ct = c.ct

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
"Starting transaction attempt")
Expand Down
5 changes: 2 additions & 3 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -25,17 +25,16 @@ import (
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/golang/protobuf/ptypes"
emptypb "github.com/golang/protobuf/ptypes/empty"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/genproto/googleapis/rpc/status"
spannerpb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
gstatus "google.golang.org/grpc/status"
)

Expand Down
39 changes: 33 additions & 6 deletions spanner/stats.go
Expand Up @@ -278,11 +278,38 @@ func checkCommonTagsGFELatency(t *testing.T, m map[tag.Key]string) {
}
}

func createContextForGFELatencyMetrics(ctx context.Context, t txReadOnly) (context.Context, error) {
return tag.New(ctx,
tag.Upsert(tagKeyClientID, t.clientID),
tag.Upsert(tagKeyDatabase, t.database),
tag.Upsert(tagKeyInstance, t.instance),
tag.Upsert(tagKeyLibVersion, t.libVersion),
func createContextAndCaptureGFELatencyMetrics(ctx context.Context, ct *CommonTags, md metadata.MD, keyMethod string) error {
var ctxGFE, errGFE = tag.New(ctx,
tag.Upsert(tagKeyClientID, ct.clientID),
tag.Upsert(tagKeyDatabase, ct.database),
tag.Upsert(tagKeyInstance, ct.instance),
tag.Upsert(tagKeyLibVersion, ct.libVersion),
)
if errGFE != nil {
return errGFE
}
return captureGFELatencyStats(ctxGFE, md, keyMethod)
}

func setCommonTags(c *Client) {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
c.ct = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}

type CommonTags struct {
// Client ID
clientID string
// Database Name
database string
// Instance ID
instance string
// Library Version
libVersion string
}

0 comments on commit 13d936d

Please sign in to comment.