Skip to content

Commit

Permalink
feat: linting changes
Browse files Browse the repository at this point in the history
  • Loading branch information
asthamohta committed Dec 13, 2021
1 parent 444bd38 commit 83cba34
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 189 deletions.
95 changes: 45 additions & 50 deletions spanner/batch.go
Expand Up @@ -23,13 +23,11 @@ import (
"log"
"time"

"github.com/golang/protobuf/proto"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/golang/protobuf/proto"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
)

// BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting
Expand Down Expand Up @@ -144,14 +142,12 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
PartitionOptions: opt.toProto(),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, _ := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
errGFE := captureGFELatencyStats(ctxGFE, md, "PartitionReadUsingIndexWithOptions")
if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "PartitionReadUsingIndexWithOptions")
if errGFE != nil {
return nil, errGFE
}
Expand Down Expand Up @@ -212,15 +208,15 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return nil, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "partitionQuery")
return nil, errGFE
if errGFE != nil {
return nil, errGFE
}
}

// prepare ExecuteSqlRequest
Expand Down Expand Up @@ -282,23 +278,22 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
}
t.sh = nil
sid, client := sh.getID(), sh.getClient()

var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

var logger *log.Logger
if sh.session != nil {
logger = sh.session.logger
}
var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
logf(logger, "Error in creating new context. Try disabling and rerunning. Error: %v", errGFE)
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Cleanup")
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", err)
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", errGFE)
}
}

Expand All @@ -323,7 +318,6 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
// Might happen if transaction is closed in the middle of a API call.
return &RowIterator{err: errSessionClosed(sh)}
}
var md metadata.MD
// Read or query partition.
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
Expand All @@ -339,15 +333,15 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
ResumeToken: resumeToken,
})
if client != nil {
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
md, errGFE := client.Header()
if errGFE != nil {
return nil, errGFE
}
if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
Expand All @@ -370,15 +364,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
ResumeToken: resumeToken,
})
if client != nil {
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
md, errGFE := client.Header()
if errGFE != nil {
return nil, errGFE
}

if GFELatencyMetricsEnabled && md != nil && t.txReadOnly.CommonTags != nil {
ctxGFE, errGFE := createContextForGFELatencyMetrics(ctx, t.txReadOnly)
if errGFE != nil {
return client, errGFE
}
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
Expand Down
41 changes: 11 additions & 30 deletions spanner/client.go
Expand Up @@ -24,9 +24,8 @@ import (
"regexp"
"time"

"cloud.google.com/go/internal/version"

"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
vkit "cloud.google.com/go/spanner/apiv1"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
Expand Down Expand Up @@ -282,14 +281,11 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.sh = sh
return nil
}
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand All @@ -316,14 +312,11 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down Expand Up @@ -382,9 +375,6 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
}

if err != nil {
return nil, ToSpannerError(err)
}
t := &BatchReadOnlyTransaction{
ReadOnlyTransaction: ReadOnlyTransaction{
tx: tx,
Expand All @@ -401,14 +391,11 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down Expand Up @@ -442,14 +429,11 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down Expand Up @@ -541,14 +525,11 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
t.txReadOnly.CommonTags = &CommonTags{
clientID: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
Expand Down
11 changes: 5 additions & 6 deletions spanner/integration_test.go
Expand Up @@ -34,13 +34,12 @@ import (
"testing"
"time"

"go.opencensus.io/stats/view"

"cloud.google.com/go/civil"
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
database "cloud.google.com/go/spanner/admin/database/apiv1"
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
"go.opencensus.io/stats/view"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
Expand Down Expand Up @@ -3272,7 +3271,7 @@ func TestIntegration_GFE_Latency(t *testing.T) {
defer cancel()

te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
GFELatencyOrHeaderMissingCountEnabled = true
GFELatencyMetricsEnabled = true

client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()
Expand All @@ -3283,11 +3282,11 @@ func TestIntegration_GFE_Latency(t *testing.T) {
}
_, err := client.Apply(ctx, ms)
if err != nil {
t.Fatalf("got error %v", err)
t.Fatalf("Could not insert rows to table. Got error %v", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"})
if err != nil {
t.Fatalf("got error %v", err)
t.Fatalf("Could not read row. Got error %v", err)
}
waitErr := &Error{}
waitFor(t, func() error {
Expand Down Expand Up @@ -3336,7 +3335,7 @@ func TestIntegration_GFE_Latency(t *testing.T) {
t.Fatalf("Incorrect data: got %v, wanted more than %v for metric %v", got, want, stat.View.Measure.Name())
}
}
case <-time.After(2 * time.Second):
case <-time.After(10 * time.Second):
if !viewMap[statsPrefix+"gfe_latency"] && !viewMap[statsPrefix+"gfe_header_missing_count"] {
t.Fatal("no stats were exported before timeout")
}
Expand Down
1 change: 0 additions & 1 deletion spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -1061,7 +1061,6 @@ func (s *inMemSpannerServer) PartitionQuery(ctx context.Context, req *spannerpb.
}
partitions = append(partitions, &spannerpb.Partition{PartitionToken: token})
}

return &spannerpb.PartitionResponse{
Partitions: partitions,
Transaction: tx,
Expand Down
9 changes: 4 additions & 5 deletions spanner/oc_test.go
Expand Up @@ -22,15 +22,14 @@ import (
"testing"
"time"

structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/api/iterator"
spannerpb "google.golang.org/genproto/googleapis/spanner/v1"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/version"
stestutil "cloud.google.com/go/spanner/internal/testutil"
structpb "github.com/golang/protobuf/ptypes/struct"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"google.golang.org/api/iterator"
spannerpb "google.golang.org/genproto/googleapis/spanner/v1"
)

// Check that stats are being exported.
Expand Down Expand Up @@ -269,7 +268,7 @@ func TestOCStats_GFE_Latency(t *testing.T) {
te := testutil.NewTestExporter([]*view.View{GFELatencyView, GFEHeaderMissingCountView}...)
defer te.Unregister()

GFELatencyOrHeaderMissingCountEnabled = true
GFELatencyMetricsEnabled = true

server, client, teardown := setupMockedTestServer(t)
defer teardown()
Expand Down
2 changes: 1 addition & 1 deletion spanner/pdml.go
Expand Up @@ -119,7 +119,7 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
Selector: &sppb.TransactionSelector_Id{Id: res.Id},
}
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))
if GFELatencyOrHeaderMissingCountEnabled && md != nil && sh.session.pool != nil {
if GFELatencyMetricsEnabled && md != nil && sh.session.pool != nil {
errGFE := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
if errGFE != nil {
return 0, errGFE
Expand Down
1 change: 0 additions & 1 deletion spanner/session.go
Expand Up @@ -349,7 +349,6 @@ func (s *session) delete(ctx context.Context) {
// Ignore the error because even if we fail to explicitly destroy the
// session, it will be eventually garbage collected by Cloud Spanner.
err := s.client.DeleteSession(contextWithOutgoingMetadata(ctx, s.md), &sppb.DeleteSessionRequest{Name: s.getID()})

// Do not log DeadlineExceeded errors when deleting sessions, as these do
// not indicate anything the user can or should act upon.
if err != nil && ErrCode(err) != codes.DeadlineExceeded {
Expand Down

0 comments on commit 83cba34

Please sign in to comment.