Skip to content

Commit

Permalink
fix: changes as per review
Browse files Browse the repository at this point in the history
  • Loading branch information
asthamohta committed Dec 22, 2021
1 parent 83cba34 commit a3aff82
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 229 deletions.
92 changes: 35 additions & 57 deletions spanner/batch.go
Expand Up @@ -23,6 +23,7 @@ import (
"log"
"time"

"cloud.google.com/go/internal/trace"
"github.com/golang/protobuf/proto"
"github.com/googleapis/gax-go/v2"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
Expand Down Expand Up @@ -127,11 +128,11 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
partitions []*Partition
)
kset, err = keys.keySetProto()
var md metadata.MD
// Request partitions.
if err != nil {
return nil, err
}
var md metadata.MD
resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.PartitionReadRequest{
Session: sid,
Transaction: ts,
Expand All @@ -142,14 +143,9 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
PartitionOptions: opt.toProto(),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "PartitionReadUsingIndexWithOptions")
if errGFE != nil {
return nil, errGFE
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "PartitionReadUsingIndexWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
// Prepare ReadRequest.
Expand Down Expand Up @@ -208,14 +204,9 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "partitionQuery")
if errGFE != nil {
return nil, errGFE
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}

Expand Down Expand Up @@ -282,22 +273,17 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

var logger *log.Logger
if sh.session != nil {
logger = sh.session.logger
}
if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
logf(logger, "Error in creating new context. Try disabling and rerunning. Error: %v", errGFE)
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Cleanup")
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", errGFE)
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}

if err != nil {
var logger *log.Logger
if sh.session != nil {
logger = sh.session.logger
}
logf(logger, "Failed to delete session %v. Error: %v", sid, err)
}
}
Expand Down Expand Up @@ -332,20 +318,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
})
if client != nil {
md, errGFE := client.Header()
if errGFE != nil {
return nil, errGFE
}
if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
}
if err != nil {
return client, err
}
md, err := client.Header()
if err != nil {
return nil, err
}
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
return client, err
Expand All @@ -363,21 +345,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
})
if client != nil {
md, errGFE := client.Header()
if errGFE != nil {
return nil, errGFE
}
if err != nil {
return client, err
}
md, err := client.Header()
if err != nil {
return nil, err
}

if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
}
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
return client, err
Expand Down
63 changes: 7 additions & 56 deletions spanner/client.go
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
vkit "cloud.google.com/go/spanner/apiv1"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
Expand Down Expand Up @@ -81,6 +80,7 @@ type Client struct {
idleSessions *sessionPool
logger *log.Logger
qo QueryOptions
ct *CommonTags
}

// DatabaseName returns the full name of a database, e.g.,
Expand Down Expand Up @@ -205,6 +205,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
idleSessions: sp,
logger: config.logger,
qo: getQueryOptions(config.QueryOptions),
ct: getCommonTags(sc),
}
return c, nil
}
Expand Down Expand Up @@ -281,17 +282,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.sh = sh
return nil
}
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
t.ct = c.ct
return t
}

Expand All @@ -312,17 +303,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
t.ct = c.ct
return t
}

Expand Down Expand Up @@ -391,17 +372,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
t.ct = c.ct
return t, nil
}

Expand Down Expand Up @@ -429,17 +400,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
t.ct = c.ct
return t
}

Expand Down Expand Up @@ -525,17 +486,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
t.ct = c.ct

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
"Starting transaction attempt")
Expand Down
5 changes: 2 additions & 3 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -25,17 +25,16 @@ import (
"sync"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/golang/protobuf/ptypes"
emptypb "github.com/golang/protobuf/ptypes/empty"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/golang/protobuf/ptypes/timestamp"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/genproto/googleapis/rpc/status"
spannerpb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
gstatus "google.golang.org/grpc/status"
)

Expand Down
10 changes: 5 additions & 5 deletions spanner/pdml.go
Expand Up @@ -15,9 +15,8 @@
package spanner

import (
"context"

"cloud.google.com/go/internal/trace"
"context"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
Expand Down Expand Up @@ -120,14 +119,15 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
}
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))
if GFELatencyMetricsEnabled && md != nil && sh.session.pool != nil {
errGFE := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
if errGFE != nil {
return 0, errGFE
err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
if err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
}
if err != nil {
return 0, err
}

if resultSet.Stats == nil {
return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", req.Sql)
}
Expand Down
42 changes: 18 additions & 24 deletions spanner/sessionclient.go
Expand Up @@ -139,22 +139,22 @@ func (sc *sessionClient) createSession(ctx context.Context) (*session, error) {
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil {
_, instance, database, errGFE := parseDatabaseName(sc.database)
if errGFE != nil {
return nil, ToSpannerError(errGFE)
_, instance, database, err := parseDatabaseName(sc.database)
if err != nil {
return nil, ToSpannerError(err)
}
ctxGFE, errGFE := tag.New(ctx,
ctxGFE, err := tag.New(ctx,
tag.Upsert(tagKeyClientID, sc.id),
tag.Upsert(tagKeyDatabase, database),
tag.Upsert(tagKeyInstance, instance),
tag.Upsert(tagKeyLibVersion, version.Repo),
)
if errGFE != nil {
return nil, ToSpannerError(errGFE)
if err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(err))
}
errGFE = captureGFELatencyStats(ctxGFE, md, "createSession")
if errGFE != nil {
return nil, ToSpannerError(errGFE)
err = captureGFELatencyStats(ctxGFE, md, "createSession")
if err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(err))
}

}
Expand Down Expand Up @@ -262,29 +262,23 @@ func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createC
}, gax.WithGRPCOptions(grpc.Header(&mdForGFELatency)))

if GFELatencyMetricsEnabled && mdForGFELatency != nil {
_, instance, database, errGFE := parseDatabaseName(sc.database)
if errGFE != nil {
trace.TracePrintf(ctx, nil, "Error getting instance and database name: %v", errGFE)
consumer.sessionCreationFailed(ToSpannerError(errGFE), remainingCreateCount)
break
_, instance, database, err := parseDatabaseName(sc.database)
if err != nil {
trace.TracePrintf(ctx, nil, "Error getting instance and database name: %v", err)
}
// Errors should not prevent initializing the session pool.
ctxGFE, errGFE := tag.New(ctx,
ctxGFE, err := tag.New(ctx,
tag.Upsert(tagKeyClientID, sc.id),
tag.Upsert(tagKeyDatabase, database),
tag.Upsert(tagKeyInstance, instance),
tag.Upsert(tagKeyLibVersion, version.Repo),
)
if errGFE != nil {
trace.TracePrintf(ctx, nil, "Error in adding tags in BatchCreateSessions for GFE Latency: %v", errGFE)
consumer.sessionCreationFailed(ToSpannerError(errGFE), remainingCreateCount)
break
if err != nil {
trace.TracePrintf(ctx, nil, "Error in adding tags in BatchCreateSessions for GFE Latency: %v", err)
}
errGFE = captureGFELatencyStats(ctxGFE, mdForGFELatency, "executeBatchCreateSessions")
if errGFE != nil {
trace.TracePrintf(ctx, nil, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: : %v", errGFE)
consumer.sessionCreationFailed(ToSpannerError(errGFE), remainingCreateCount)
break
err = captureGFELatencyStats(ctxGFE, mdForGFELatency, "executeBatchCreateSessions")
if err != nil {
trace.TracePrintf(ctx, nil, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", err)
}
}
if err != nil {
Expand Down

0 comments on commit a3aff82

Please sign in to comment.