Skip to content

Commit

Permalink
feat: fixes for errors
Browse files Browse the repository at this point in the history
  • Loading branch information
asthamohta committed Dec 9, 2021
1 parent 8f86321 commit e4ae8e1
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 138 deletions.
71 changes: 39 additions & 32 deletions spanner/batch.go
Expand Up @@ -143,14 +143,17 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
PartitionOptions: opt.toProto(),
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, _ := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
captureGFELatencyStats(ctxGFE, md, "PartitionReadUsingIndexWithOptions")
errGFE := captureGFELatencyStats(ctxGFE, md, "PartitionReadUsingIndexWithOptions")
if errGFE != nil {
return nil, errGFE
}
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Expand Down Expand Up @@ -208,7 +211,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
Expand Down Expand Up @@ -285,15 +288,15 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
errGFE = captureGFELatencyStats(ctxGFE, md, "Cleanup")
if errGFE != nil{
if errGFE != nil {
logf(logger, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", err)
}
}
Expand Down Expand Up @@ -334,25 +337,27 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
})
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil{
return client, errGFE
if client != nil {
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
}
}
}
return client,err
return client, err
}
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Expand All @@ -363,21 +368,23 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
})
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil{
return client, errGFE
if client != nil {
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil && t.txReadOnly.set == true {
ctxGFE, errGFE := tag.New(ctx,
tag.Upsert(tagKeyClientID, t.txReadOnly.clientId),
tag.Upsert(tagKeyDatabase, t.txReadOnly.database),
tag.Upsert(tagKeyInstance, t.txReadOnly.instance),
tag.Upsert(tagKeyLibVersion, t.txReadOnly.libVersion),
)
errGFE = captureGFELatencyStats(ctxGFE, md, "Execute")
if errGFE != nil {
return client, errGFE
}
}
}
return client,err
return client, err
}
}
return stream(
Expand Down
95 changes: 65 additions & 30 deletions spanner/client.go
Expand Up @@ -262,7 +262,6 @@ func (c *Client) Close() {
// "time-travel" to prior versions of the database, see the documentation of
// TimestampBound for details.
func (c *Client) Single() *ReadOnlyTransaction {
_, instance, database, _ := parseDatabaseName(c.sc.database)
t := &ReadOnlyTransaction{singleUse: true}
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
Expand All @@ -282,11 +281,19 @@ func (c *Client) Single() *ReadOnlyTransaction {
t.sh = sh
return nil
}
t.txReadOnly.CommonTags = CommonTags{
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
return t
}
Expand All @@ -301,19 +308,26 @@ func (c *Client) Single() *ReadOnlyTransaction {
// "time-travel" to prior versions of the database, see the documentation of
// TimestampBound for details.
func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction {
_, instance, database, _ := parseDatabaseName(c.sc.database)
t := &ReadOnlyTransaction{
singleUse: false,
txReadyOrClosed: make(chan struct{}),
}
t.txReadOnly.sp = c.idleSessions
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.CommonTags = CommonTags{
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
return t
}
Expand Down Expand Up @@ -367,7 +381,6 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
}

_, instance, database, err := parseDatabaseName(c.sc.database)
if err != nil {
return nil, ToSpannerError(err)
}
Expand All @@ -387,11 +400,19 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.CommonTags = CommonTags{
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
return t, nil
}
Expand All @@ -408,7 +429,6 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
}
sh := &sessionHandle{session: s}

_, instance, database, err := parseDatabaseName(c.sc.database)
t := &BatchReadOnlyTransaction{
ReadOnlyTransaction: ReadOnlyTransaction{
tx: tid.tid,
Expand All @@ -421,11 +441,19 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txReadOnly.CommonTags = CommonTags{
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}
return t
}
Expand Down Expand Up @@ -508,16 +536,23 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea
} else {
t = &ReadWriteTransaction{}
}
_, instance, database, _ := parseDatabaseName(c.sc.database)
t.txReadOnly.sh = sh
t.txReadOnly.txReadEnv = t
t.txReadOnly.qo = c.qo
t.txOpts = options
t.txReadOnly.CommonTags = CommonTags{
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
t.txReadOnly.CommonTags = CommonTags{}
t.txReadOnly.CommonTags.init()
if c.sc != nil {
_, instance, database, err := parseDatabaseName(c.sc.database)
if err == nil {
t.txReadOnly.CommonTags = CommonTags{
set: true,
clientId: c.sc.id,
database: database,
instance: instance,
libVersion: version.Repo,
}
}
}

trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
Expand Down
17 changes: 8 additions & 9 deletions spanner/integration_test.go
Expand Up @@ -3265,13 +3265,13 @@ func TestIntegration_DirectPathFallback(t *testing.T) {
}
}

func TestIntegration_GFE_Latency(t *testing.T){
func TestIntegration_GFE_Latency(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
GFELatencyOrHeaderMissingCountEnabled= true
GFELatencyOrHeaderMissingCountEnabled = true

client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()
Expand All @@ -3281,11 +3281,11 @@ func TestIntegration_GFE_Latency(t *testing.T){
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
_, err := client.Apply(ctx, ms)
if err!= nil {
if err != nil {
t.Fatalf("got error %v", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"})
if err!= nil {
if err != nil {
t.Fatalf("got error %v", err)
}
waitErr := &Error{}
Expand All @@ -3299,12 +3299,12 @@ func TestIntegration_GFE_Latency(t *testing.T){
return waitErr
})

var viewMap = map[string]bool{statsPrefix+"gfe_latency" : false,
statsPrefix+"gfe_header_missing_count" : false,
var viewMap = map[string]bool{statsPrefix + "gfe_latency": false,
statsPrefix + "gfe_header_missing_count": false,
}

for {
if viewMap[statsPrefix+"gfe_latency"] || viewMap[statsPrefix+"gfe_header_missing_count"]{
if viewMap[statsPrefix+"gfe_latency"] || viewMap[statsPrefix+"gfe_header_missing_count"] {
break
}
select {
Expand All @@ -3314,8 +3314,7 @@ func TestIntegration_GFE_Latency(t *testing.T){
}
if stat.View.Measure.Name() != statsPrefix+"gfe_latency" && stat.View.Measure.Name() != statsPrefix+"gfe_header_missing_count" {
t.Fatalf("Incorrect measure: got %v, want %v", stat.View.Measure.Name(), statsPrefix+"gfe_latency or "+statsPrefix+"gfe_header_missing_count")
} else
{
} else {
viewMap[stat.View.Measure.Name()] = true
}
for _, row := range stat.Rows {
Expand Down
4 changes: 2 additions & 2 deletions spanner/internal/testutil/inmem_spanner_server.go
Expand Up @@ -689,7 +689,7 @@ func (s *inMemSpannerServer) BatchCreateSessions(ctx context.Context, req *spann
s.totalSessionsCreated++
s.sessions[sessionName] = sessions[i]
}
header := metadata.New(map[string]string{"server-timing": "123"})
header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"})
if err := grpc.SendHeader(ctx, header); err != nil {
return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header")
}
Expand Down Expand Up @@ -928,7 +928,7 @@ func (s *inMemSpannerServer) Read(ctx context.Context, req *spannerpb.ReadReques
}
s.receivedRequests <- req
s.mu.Unlock()
header := metadata.New(map[string]string{"server-timing": "123"})
header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"})
if err := grpc.SendHeader(ctx, header); err != nil {
return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header")
}
Expand Down

0 comments on commit e4ae8e1

Please sign in to comment.