Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(spanner)!: fix data race in spanner integration tests #5276

Merged
merged 2 commits into from Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions spanner/batch.go
Expand Up @@ -143,7 +143,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
PartitionOptions: opt.toProto(),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "PartitionReadUsingIndexWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -322,7 +322,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
return client, err
}
md, err := client.Header()
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand All @@ -347,7 +347,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
}
md, err := client.Header()

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion spanner/integration_test.go
Expand Up @@ -3271,7 +3271,7 @@ func TestIntegration_GFE_Latency(t *testing.T) {
defer cancel()

te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)

client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()
Expand Down
2 changes: 1 addition & 1 deletion spanner/oc_test.go
Expand Up @@ -268,7 +268,7 @@ func TestOCStats_GFE_Latency(t *testing.T) {
te := testutil.NewTestExporter([]*view.View{GFELatencyView, GFEHeaderMissingCountView}...)
defer te.Unregister()

GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)

server, client, teardown := setupMockedTestServer(t)
defer teardown()
Expand Down
2 changes: 1 addition & 1 deletion spanner/pdml.go
Expand Up @@ -119,7 +119,7 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq
Selector: &sppb.TransactionSelector_Id{Id: res.Id},
}
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))
if GFELatencyMetricsEnabled && md != nil && sh.session.pool != nil {
if getGFELatencyMetricsFlag() && md != nil && sh.session.pool != nil {
err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql")
if err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions spanner/sessionclient.go
Expand Up @@ -138,7 +138,7 @@ func (sc *sessionClient) createSession(ctx context.Context) (*session, error) {
Session: &sppb.Session{Labels: sc.sessionLabels},
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil {
if getGFELatencyMetricsFlag() && md != nil {
_, instance, database, err := parseDatabaseName(sc.database)
if err != nil {
return nil, ToSpannerError(err)
Expand Down Expand Up @@ -260,7 +260,7 @@ func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createC
SessionTemplate: &sppb.Session{Labels: labels},
}, gax.WithGRPCOptions(grpc.Header(&mdForGFELatency)))

if GFELatencyMetricsEnabled && mdForGFELatency != nil {
if getGFELatencyMetricsFlag() && mdForGFELatency != nil {
_, instance, database, err := parseDatabaseName(sc.database)
if err != nil {
trace.TracePrintf(ctx, nil, "Error getting instance and database name: %v", err)
Expand Down
33 changes: 21 additions & 12 deletions spanner/stats.go
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strconv"
"strings"
"sync"
"testing"

"cloud.google.com/go/internal/version"
Expand All @@ -42,8 +43,10 @@ var (
tagNumReadSessions = tag.Tag{Key: tagKeyType, Value: "num_read_sessions"}
tagNumWriteSessions = tag.Tag{Key: tagKeyType, Value: "num_write_prepared_sessions"}
tagKeyMethod = tag.MustNewKey("grpc_client_method")
// GFELatencyMetricsEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded
GFELatencyMetricsEnabled = false
// gfeLatencyMetricsEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded
gfeLatencyMetricsEnabled = false
// mutex to avoid data race in reading/writing the above flag
statsMu = sync.RWMutex{}
)

func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
Expand Down Expand Up @@ -213,28 +216,40 @@ func EnableStatViews() error {

// EnableGfeLatencyView enables GFELatency metric
func EnableGfeLatencyView() error {
GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)
return view.Register(GFELatencyView)
}

// EnableGfeHeaderMissingCountView enables GFEHeaderMissingCount metric
func EnableGfeHeaderMissingCountView() error {
GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)
return view.Register(GFEHeaderMissingCountView)
}

// EnableGfeLatencyAndHeaderMissingCountViews enables GFEHeaderMissingCount and GFELatency metric
func EnableGfeLatencyAndHeaderMissingCountViews() error {
GFELatencyMetricsEnabled = true
setGFELatencyMetricsFlag(true)
return view.Register(
GFELatencyView,
GFEHeaderMissingCountView,
)
}

func getGFELatencyMetricsFlag() bool {
statsMu.RLock()
defer statsMu.RUnlock()
return gfeLatencyMetricsEnabled
}

func setGFELatencyMetricsFlag(enable bool) {
statsMu.Lock()
gfeLatencyMetricsEnabled = enable
statsMu.Unlock()
}

// DisableGfeLatencyAndHeaderMissingCountViews disables GFEHeaderMissingCount and GFELatency metric
func DisableGfeLatencyAndHeaderMissingCountViews() {
GFELatencyMetricsEnabled = false
setGFELatencyMetricsFlag(false)
view.Unregister(
GFELatencyView,
GFEHeaderMissingCountView,
Expand Down Expand Up @@ -267,12 +282,6 @@ func checkCommonTagsGFELatency(t *testing.T, m map[tag.Key]string) {
if !strings.HasPrefix(m[tagKeyClientID], "client") {
t.Fatalf("Incorrect client ID: %v", m[tagKeyClientID])
}
if !strings.HasPrefix(m[tagKeyInstance], "gotest") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch 👍

t.Fatalf("Incorrect instance ID: %v", m[tagKeyInstance])
}
if !strings.HasPrefix(m[tagKeyDatabase], "gotest") {
t.Fatalf("Incorrect database ID: %v", m[tagKeyDatabase])
}
if m[tagKeyLibVersion] != version.Repo {
t.Fatalf("Incorrect library version: %v", m[tagKeyLibVersion])
}
Expand Down
10 changes: 5 additions & 5 deletions spanner/transaction.go
Expand Up @@ -194,7 +194,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key
return client, err
}
md, err := client.Header()
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "ReadWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -402,7 +402,7 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que
return client, err
}
md, err := client.Header()
if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "query"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -577,7 +577,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
},
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "begin_BeginTransaction"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -931,7 +931,7 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts
var md metadata.MD
resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "update"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err)
}
Expand Down Expand Up @@ -1006,7 +1006,7 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts
RequestOptions: createRequestOptions(opts.Priority, opts.RequestTag, t.txOpts.TransactionTag),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyMetricsEnabled && md != nil && t.ct != nil {
if getGFELatencyMetricsFlag() && md != nil && t.ct != nil {
if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "batchUpdateWithOptions"); err != nil {
trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(err))
}
Expand Down