Skip to content

Commit

Permalink
test(spanner)!: fix data race in spanner integration tests (#5276)
Browse files Browse the repository at this point in the history
Co-authored-by: Rahul Yadav <irahul@google.com>
  • Loading branch information
rahul2393 and rahul2393 committed Jan 6, 2022
1 parent bc97804 commit 22df34b
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 27 deletions.
10 changes: 5 additions & 5 deletions spanner/batch.go
Expand Up @@ -143,7 +143,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
PartitionOptions: opt.toProto(),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "PartitionReadUsingIndexWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
return client, err
}
md, err := client.Header()
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand All @@ -347,7 +347,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
}
md, err := client.Header()

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion spanner/integration_test.go
Expand Up @@ -3271,7 +3271,7 @@ func TestIntegration_GFE_Latency(t *testing.T) {
defer cancel()

te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)

client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()
Expand Down
2 changes: 1 addition & 1 deletion spanner/oc_test.go
Expand Up @@ -268,7 +268,7 @@ func TestOCStats_GFE_Latency(t *testing.T) {
te := testutil.NewTestExporter([]*view.View{GFELatencyView, GFEHeaderMissingCountView}...)
defer te.Unregister()

GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)

server, client, teardown := setupMockedTestServer(t)
defer teardown()
Expand Down
2 changes: 1 addition & 1 deletion spanner/pdml.go
Expand Up @@ -119,7 +119,7 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
Selector: &sppb.TransactionSelector_Id{Id: res.Id},
}
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))
if GFELatencyMetricsEnabled && md != nil && sh.session.pool != nil {
if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil {
err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
if err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions spanner/sessionclient.go
Expand Up @@ -138,7 +138,7 @@ func (sc *sessionClient) createSession(ctx context.Context) (*session, error) {
Session: &sppb.Session{Labels: sc.sessionLabels},
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil {
if getGFELatencyMetricsFlag() && md != nil {
_, instance, database, err := parseDatabaseName(sc.database)
if err != nil {
return nil, ToSpannerError(err)
Expand Down Expand Up @@ -260,7 +260,7 @@ func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createC
SessionTemplate: &sppb.Session{Labels: labels},
}, gax.WithGRPCOptions(grpc.Header(&mdForGFELatency)))

if GFELatencyMetricsEnabled && mdForGFELatency != nil {
if getGFELatencyMetricsFlag() && mdForGFELatency != nil {
_, instance, database, err := parseDatabaseName(sc.database)
if err != nil {
trace.TracePrintf(ctx, nil, "Error getting instance and database name: %v", err)
Expand Down
33 changes: 21 additions & 12 deletions spanner/stats.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strconv"
"strings"
"sync"
"testing"

"cloud.google.com/go/internal/version"
Expand All @@ -42,8 +43,10 @@ var (
tagNumReadSessions = tag.Tag{Key: tagKeyType, Value: "num_read_sessions"}
tagNumWriteSessions = tag.Tag{Key: tagKeyType, Value: "num_write_prepared_sessions"}
tagKeyMethod = tag.MustNewKey("grpc_client_method")
// GFELatencyMetricsEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded
GFELatencyMetricsEnabled = false
// gfeLatencyMetricsEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded
gfeLatencyMetricsEnabled = false
// mutex to avoid data race in reading/writing the above flag
statsMu = sync.RWMutex{}
)

func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
Expand Down Expand Up @@ -213,28 +216,40 @@ func EnableStatViews() error {

// EnableGfeLatencyView enables GFELatency metric
func EnableGfeLatencyView() error {
GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)
return view.Register(GFELatencyView)
}

// EnableGfeHeaderMissingCountView enables GFEHeaderMissingCount metric
func EnableGfeHeaderMissingCountView() error {
GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)
return view.Register(GFEHeaderMissingCountView)
}

// EnableGfeLatencyAndHeaderMissingCountViews enables GFEHeaderMissingCount and GFELatency metric
func EnableGfeLatencyAndHeaderMissingCountViews() error {
GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)
return view.Register(
GFELatencyView,
GFEHeaderMissingCountView,
)
}

func getGFELatencyMetricsFlag() bool {
statsMu.RLock()
defer statsMu.RUnlock()
return gfeLatencyMetricsEnabled
}

func setGFELatencyMetricsFlag(enable bool) {
statsMu.Lock()
gfeLatencyMetricsEnabled = enable
statsMu.Unlock()
}

// DisableGfeLatencyAndHeaderMissingCountViews disables GFEHeaderMissingCount and GFELatency metric
func DisableGfeLatencyAndHeaderMissingCountViews() {
GFELatencyMetricsEnabled = false
setGFELatencyMetricsFlag(false)
view.Unregister(
GFELatencyView,
GFEHeaderMissingCountView,
Expand Down Expand Up @@ -267,12 +282,6 @@ func checkCommonTagsGFELatency(t *testing.T, m map[tag.Key]string) {
if !strings.HasPrefix(m[tagKeyClientID], "client") {
t.Fatalf("Incorrect client ID: %v", m[tagKeyClientID])
}
if !strings.HasPrefix(m[tagKeyInstance], "gotest") {
t.Fatalf("Incorrect instance ID: %v", m[tagKeyInstance])
}
if !strings.HasPrefix(m[tagKeyDatabase], "gotest") {
t.Fatalf("Incorrect database ID: %v", m[tagKeyDatabase])
}
if m[tagKeyLibVersion] != version.Repo {
t.Fatalf("Incorrect library version: %v", m[tagKeyLibVersion])
}
Expand Down
10 changes: 5 additions & 5 deletions spanner/transaction.go
Expand Up @@ -194,7 +194,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
return client, err
}
md, err := client.Header()
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "ReadWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -402,7 +402,7 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que
return client, err
}
md, err := client.Header()
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "query"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -577,7 +577,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
},
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "begin_BeginTransaction"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -931,7 +931,7 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts
var md metadata.MD
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "update"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -1006,7 +1006,7 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts
RequestOptions: createRequestOptions(opts.Priority, opts.RequestTag, t.txOpts.TransactionTag),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "batchUpdateWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(err))
}
Expand Down

0 comments on commit 22df34b

Please sign in to comment.