Skip to content

Commit

Permalink
feat: linting changes
Browse files Browse the repository at this point in the history
  • Loading branch information
asthamohta committed Dec 13, 2021
1 parent 444bd38 commit f2646e9
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 53 deletions.
36 changes: 26 additions & 10 deletions spanner/batch.go
Expand Up @@ -145,13 +145,16 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, _ := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientID),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
errGFE := captureGFELatencyStats(ctxGFE, md, "PartitionReadUsingIndexWithOptions")
if errGFE != nil {
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "PartitionReadUsingIndexWithOptions")
if errGFE != nil {
return nil, errGFE
}
Expand Down Expand Up @@ -214,13 +217,18 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyClientID, t.txReadOnly.clientID),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
if errGFE != nil {
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "partitionQuery")
return nil, errGFE
if errGFE != nil {
return nil, errGFE
}
}

// prepare ExecuteSqlRequest
Expand Down Expand Up @@ -291,14 +299,17 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyClientID, t.txReadOnly.clientID),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
if errGFE != nil {
logf(logger, "Error in creating new context. Try disabling and rerunning. Error: %v", errGFE)
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Cleanup")
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", err)
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", errGFE)
}
}

Expand Down Expand Up @@ -340,14 +351,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
})
if client != nil {
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyClientID, t.txReadOnly.clientID),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
if errGFE != nil {
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
Expand All @@ -374,11 +387,14 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyClientID, t.txReadOnly.clientID),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
if errGFE != nil {
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
Expand Down
13 changes: 5 additions & 8 deletions spanner/client.go
Expand Up @@ -289,7 +289,7 @@ func (c *Client) Single() *ReadOnlyTransaction {
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down Expand Up @@ -323,7 +323,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down Expand Up @@ -382,9 +382,6 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
}

if err != nil {
return nil, ToSpannerError(err)
}
t := &BatchReadOnlyTransaction{
ReadOnlyTransaction: ReadOnlyTransaction{
tx: tx,
Expand All @@ -408,7 +405,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down Expand Up @@ -449,7 +446,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down Expand Up @@ -548,7 +545,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down
4 changes: 2 additions & 2 deletions spanner/integration_test.go
Expand Up @@ -3283,11 +3283,11 @@ func TestIntegration_GFE_Latency(t *testing.T) {
}
_, err := client.Apply(ctx, ms)
if err != nil {
t.Fatalf("got error %v", err)
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
t.Fatalf("got error %v", err)
t.Fatalf("Could not read row. Got error %v", err)
}
waitErr := &Error{}
waitFor(t, func() error {
Expand Down
1 change: 0 additions & 1 deletion spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -1061,7 +1061,6 @@ func (s *inMemSpannerServer) PartitionQuery(ctx context.Context, req *spannerpb.
}
partitions = append(partitions, &spannerpb.Partition{PartitionToken: token})
}

return &spannerpb.PartitionResponse{
Partitions: partitions,
Transaction: tx,
Expand Down
1 change: 0 additions & 1 deletion spanner/session.go
Expand Up @@ -349,7 +349,6 @@ func (s *session) delete(ctx context.Context) {
// Ignore the error because even if we fail to explicitly destroy the
// session, it will be eventually garbage collected by Cloud Spanner.
err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()})

// Do not log DeadlineExceeded errors when deleting sessions, as these do
// not indicate anything the user can or should act upon.
if err != nil && ErrCode(err) != codes.DeadlineExceeded {
Expand Down
1 change: 0 additions & 1 deletion spanner/sessionclient.go
Expand Up @@ -144,7 +144,6 @@ func (sc *sessionClient) createSession(ctx context.Context) (*session, error) {
if errGFE != nil {
return nil, ToSpannerError(errGFE)
}
// Errors should not prevent initializing the session pool.
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, sc.id),
tag.Upsert(tagKeyDatabase, database),
Expand Down
20 changes: 15 additions & 5 deletions spanner/stats.go
Expand Up @@ -38,11 +38,12 @@ var (
tagKeyType = tag.MustNewKey("type")
tagCommonKeys = []tag.Key{tagKeyClientID, tagKeyDatabase, tagKeyInstance, tagKeyLibVersion}

tagNumInUseSessions = tag.Tag{Key: tagKeyType, Value: "num_in_use_sessions"}
tagNumBeingPrepared = tag.Tag{Key: tagKeyType, Value: "num_sessions_being_prepared"}
tagNumReadSessions = tag.Tag{Key: tagKeyType, Value: "num_read_sessions"}
tagNumWriteSessions = tag.Tag{Key: tagKeyType, Value: "num_write_prepared_sessions"}
tagKeyMethod = tag.MustNewKey("grpc_client_method")
tagNumInUseSessions = tag.Tag{Key: tagKeyType, Value: "num_in_use_sessions"}
tagNumBeingPrepared = tag.Tag{Key: tagKeyType, Value: "num_sessions_being_prepared"}
tagNumReadSessions = tag.Tag{Key: tagKeyType, Value: "num_read_sessions"}
tagNumWriteSessions = tag.Tag{Key: tagKeyType, Value: "num_write_prepared_sessions"}
tagKeyMethod = tag.MustNewKey("grpc_client_method")
// GFELatencyOrHeaderMissingCountEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded
GFELatencyOrHeaderMissingCountEnabled = false
)

Expand Down Expand Up @@ -162,12 +163,14 @@ var (
TagKeys: tagCommonKeys,
}

// GFELatency is the latency between Google's network receiving an RPC and reading back the first byte of the response
GFELatency = stats.Int64(
statsPrefix+"gfe_latency",
"Latency between Google's network receiving an RPC and reading back the first byte of the response",
stats.UnitMilliseconds,
)

// GFELatencyView is the view of distribution of GFELatency values
GFELatencyView = &view.View{
Name: "cloud.google.com/go/spanner/gfe_latency",
Measure: GFELatency,
Expand All @@ -179,12 +182,14 @@ var (
TagKeys: append(tagCommonKeys, tagKeyMethod),
}

// GFEHeaderMissingCount is the number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network
GFEHeaderMissingCount = stats.Int64(
statsPrefix+"gfe_header_missing_count",
"Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network",
stats.UnitDimensionless,
)

// GFEHeaderMissingCountView is the view of number of GFEHeaderMissingCount
GFEHeaderMissingCountView = &view.View{
Name: "cloud.google.com/go/spanner/gfe_header_missing_count",
Measure: GFEHeaderMissingCount,
Expand All @@ -207,15 +212,19 @@ func EnableStatViews() error {
)
}

// EnableGfeLatencyView enables GFELatency metric
func EnableGfeLatencyView() error {
GFELatencyOrHeaderMissingCountEnabled = true
return view.Register(GFELatencyView)
}

// EnableGfeHeaderMissingCountView enables GFEHeaderMissingCount metric
func EnableGfeHeaderMissingCountView() error {
GFELatencyOrHeaderMissingCountEnabled = true
return view.Register(GFEHeaderMissingCountView)
}

// EnableGfeLatencyAndHeaderMissingCountViews enables GFEHeaderMissingCount and GFELatency metric
func EnableGfeLatencyAndHeaderMissingCountViews() error {
GFELatencyOrHeaderMissingCountEnabled = true
return view.Register(
Expand All @@ -224,6 +233,7 @@ func EnableGfeLatencyAndHeaderMissingCountViews() error {
)
}

// DisableGfeLatencyAndHeaderMissingCountViews disables GFEHeaderMissingCount and GFELatency metric
func DisableGfeLatencyAndHeaderMissingCountViews() {
GFELatencyOrHeaderMissingCountEnabled = false
view.Unregister(
Expand Down
36 changes: 11 additions & 25 deletions spanner/transaction.go
Expand Up @@ -18,7 +18,6 @@ package spanner

import (
"context"
"log"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -92,7 +91,7 @@ type CommonTags struct {
// Common Tags value set
set bool
// Client ID
clientId string
clientID string
// Database Name
database string
// Instance ID
Expand Down Expand Up @@ -216,24 +215,19 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
})
if client != nil {
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.clientId),
tag.Upsert(tagKeyClientID, t.clientID),
tag.Upsert(tagKeyDatabase, t.database),
tag.Upsert(tagKeyInstance, t.instance),
tag.Upsert(tagKeyLibVersion, t.libVersion),
)
var logger *log.Logger
if sh.session != nil {
logger = sh.session.logger
}
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", errGFE)
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "ReadWithOptions")
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", errGFE)
return client, errGFE
}
}
}
Expand Down Expand Up @@ -433,21 +427,17 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que
md, _ = client.Header()
if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.clientId),
tag.Upsert(tagKeyClientID, t.clientID),
tag.Upsert(tagKeyDatabase, t.database),
tag.Upsert(tagKeyInstance, t.instance),
tag.Upsert(tagKeyLibVersion, t.libVersion),
)
var logger *log.Logger
if sh.session != nil {
logger = sh.session.logger
}
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", errGFE)
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "query")
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", errGFE)
return client, errGFE
}
}
}
Expand Down Expand Up @@ -623,7 +613,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyClientID, t.txReadOnly.clientID),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
Expand Down Expand Up @@ -987,7 +977,7 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts

if GFELatencyOrHeaderMissingCountEnabled && md != nil {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.clientId),
tag.Upsert(tagKeyClientID, t.clientID),
tag.Upsert(tagKeyDatabase, t.database),
tag.Upsert(tagKeyInstance, t.instance),
tag.Upsert(tagKeyLibVersion, t.libVersion),
Expand Down Expand Up @@ -1072,7 +1062,7 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts

if GFELatencyOrHeaderMissingCountEnabled && md != nil {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyClientID, t.txReadOnly.clientID),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
Expand Down Expand Up @@ -1336,10 +1326,6 @@ func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client,
tx: sh.getTransactionID(),
},
}
if err != nil {
// Could not parse database name
return nil, err
}
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
Expand All @@ -1351,7 +1337,7 @@ func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client,
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down

0 comments on commit f2646e9

Please sign in to comment.