Skip to content

Commit

Permalink
feat(storage): allow retry ErrorFunc configs (googleapis#5166)
Browse files Browse the repository at this point in the history
Allows users to supply a custom ErrorFunc which will be used
to determine whether the error is retryable.
  • Loading branch information
tritone authored and asthamohta committed Dec 7, 2021
1 parent fa5e458 commit b672cd0
Show file tree
Hide file tree
Showing 14 changed files with 473 additions and 26 deletions.
43 changes: 38 additions & 5 deletions spanner/batch.go
Expand Up @@ -20,6 +20,10 @@ import (
"bytes"
"context"
"encoding/gob"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"log"
"time"

Expand Down Expand Up @@ -124,6 +128,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
partitions []*Partition
)
kset, err = keys.keySetProto()
var md metadata.MD
// Request partitions.
if err != nil {
return nil, err
Expand All @@ -136,7 +141,11 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex
Columns: columns,
KeySet: kset,
PartitionOptions: opt.toProto(),
})
}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "PartitionReadUsingIndexWithOptions")
}
// Prepare ReadRequest.
req := &sppb.ReadRequest{
Session: sid,
Expand Down Expand Up @@ -180,6 +189,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
if err != nil {
return nil, err
}
var md metadata.MD

// request Partitions
req := &sppb.PartitionQueryRequest{
Expand All @@ -190,7 +200,11 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement
Params: params,
ParamTypes: paramTypes,
}
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "partitionQuery")
}

// prepare ExecuteSqlRequest
r := &sppb.ExecuteSqlRequest{
Expand Down Expand Up @@ -251,7 +265,13 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
}
t.sh = nil
sid, client := sh.getID(), sh.getClient()
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid})
var md metadata.MD
err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md)))

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "Cleanup")
}

if err != nil {
var logger *log.Logger
if sh.session != nil {
Expand All @@ -277,10 +297,11 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
// Might happen if transaction is closed in the middle of a API call.
return &RowIterator{err: errSessionClosed(sh)}
}
var md metadata.MD
// Read or query partition.
if p.rreq != nil {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.StreamingRead(ctx, &sppb.ReadRequest{
client, err := client.StreamingRead(ctx, &sppb.ReadRequest{
Session: p.rreq.Session,
Transaction: p.rreq.Transaction,
Table: p.rreq.Table,
Expand All @@ -291,10 +312,16 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.rreq.RequestOptions,
ResumeToken: resumeToken,
})
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "Execute")
}
return client,err
}
} else {
rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
return client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{
Session: p.qreq.Session,
Transaction: p.qreq.Transaction,
Sql: p.qreq.Sql,
Expand All @@ -305,6 +332,12 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
RequestOptions: p.qreq.RequestOptions,
ResumeToken: resumeToken,
})
md, _ = client.Header()

if GFELatencyOrHeaderMissingCountEnabled && md != nil{
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "Execute")
}
return client,err
}
}
return stream(
Expand Down
8 changes: 7 additions & 1 deletion spanner/client.go
Expand Up @@ -19,6 +19,8 @@ package spanner
import (
"context"
"fmt"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"log"
"os"
"regexp"
Expand Down Expand Up @@ -335,6 +337,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
}
sh = &sessionHandle{session: s}

var md metadata.MD
// Begin transaction.
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
Expand All @@ -343,7 +346,10 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
ReadOnly: buildTransactionOptionsReadOnly(tb, true),
},
},
})
}, gax.WithGRPCOptions(grpc.Header(&md)))
if GFELatencyOrHeaderMissingCountEnabled && md != nil{
captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "BatchReadOnlyTransaction_BeginTransaction")
}
if err != nil {
return nil, ToSpannerError(err)
}
Expand Down
1 change: 1 addition & 0 deletions spanner/go.mod
Expand Up @@ -4,6 +4,7 @@ go 1.11

require (
cloud.google.com/go v0.97.0
contrib.go.opencensus.io/exporter/stackdriver v0.13.10
github.com/golang/protobuf v1.5.2
github.com/google/go-cmp v0.5.6
github.com/googleapis/gax-go/v2 v2.1.1
Expand Down
27 changes: 26 additions & 1 deletion spanner/go.sum
Expand Up @@ -34,6 +34,8 @@ cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4g
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/monitoring v1.1.0 h1:ZnyNdf/XRcynMmKzRSNTOdOyYPs6G7do1l2D2hIvIKo=
cloud.google.com/go/monitoring v1.1.0/go.mod h1:L81pzz7HKn14QCMaCs6NTQkdBnE87TElyanS95vIcl4=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
Expand All @@ -43,14 +45,21 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/trace v1.0.0 h1:laKx2y7IWMjguCe5zZx6n7qLtREk4kyE69SXVC0VSN8=
cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A=
contrib.go.opencensus.io/exporter/stackdriver v0.13.10 h1:a9+GZPUe+ONKUwULjlEOucMMG0qfSCCenlji0Nhqbys=
contrib.go.opencensus.io/exporter/stackdriver v0.13.10/go.mod h1:I5htMbyta491eUxufwwZPQdcKvvgzMB4O9ni41YnIM8=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/aws/aws-sdk-go v1.37.0 h1:GzFnhOIsrGyQ69s7VgqtrG2BG8v7X7vwB3Xpbd/DBBk=
github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down Expand Up @@ -158,6 +167,10 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand All @@ -166,6 +179,7 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand Down Expand Up @@ -295,6 +309,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -341,6 +356,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down Expand Up @@ -441,6 +458,8 @@ google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6
google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI=
google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E=
google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU=
google.golang.org/api v0.60.0 h1:eq/zs5WPH4J9undYM9IP1O7dSr7Yh8Y0GtSCpzGzIUk=
google.golang.org/api v0.60.0/go.mod h1:d7rl65NZAkEQ90JFzqBjcRq1TVeG5ZoGV3sSpEnnVb4=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
Expand Down Expand Up @@ -506,7 +525,11 @@ google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEc
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211018162055-cf77aa76bad2/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211021150943-2b146023228c/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 h1:b9mVrqYfq3P4bCdaLg1qtBnPzUYgglsIdjZkL/fQVOE=
google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
Expand Down Expand Up @@ -557,6 +580,8 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
82 changes: 82 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"flag"
"fmt"
"go.opencensus.io/stats/view"
"log"
"math"
"math/big"
Expand Down Expand Up @@ -3264,6 +3265,87 @@ func TestIntegration_DirectPathFallback(t *testing.T) {
}
}

func TestIntegration_GFE_Latency(t *testing.T){
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView)
GFELatencyOrHeaderMissingCountEnabled= true

client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements)
defer cleanup()

singerColumns := []string{"SingerId", "FirstName", "LastName"}
var ms = []*Mutation{
InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}),
}
_, err := client.Apply(ctx, ms)
if err!= nil {
t.Fatalf("got error %v", err)
}
_, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"})
if err!= nil {
t.Fatalf("got error %v", err)
}
waitErr := &Error{}
waitFor(t, func() error {
select {
case stat := <-te.Stats:
if len(stat.Rows) > 0 {
return nil
}
}
return waitErr
})

var viewMap = map[string]bool{
statsPrefix+"gfe_latency" : false,
statsPrefix+"gfe_header_missing_count" : false,
}

for {
if viewMap[statsPrefix+"gfe_latency"] && viewMap[statsPrefix+"gfe_header_missing_count"]{
break
}
select {
case stat := <-te.Stats:
if len(stat.Rows) == 0 {
t.Fatal("No metrics are exported")
}
if stat.View.Measure.Name() != statsPrefix+"gfe_latency" && stat.View.Measure.Name() != statsPrefix+"gfe_header_missing_count" {
t.Fatalf("Incorrect measure: got %v, want %v", stat.View.Measure.Name(), statsPrefix+"gfe_latency or "+statsPrefix+"gfe_header_missing_count")
} else
{
viewMap[stat.View.Measure.Name()] = true
}
for _, row := range stat.Rows {
m := getTagMap(row.Tags)
checkCommonTagsGFELatency(t, m)
var data string
switch row.Data.(type) {
default:
data = fmt.Sprintf("%v", row.Data)
case *view.CountData:
data = fmt.Sprintf("%v", row.Data.(*view.CountData).Value)
case *view.LastValueData:
data = fmt.Sprintf("%v", row.Data.(*view.LastValueData).Value)
case *view.DistributionData:
data = fmt.Sprintf("%v", row.Data.(*view.DistributionData).Count)
}
if got, want := fmt.Sprintf("%v", data), "0"; got <= want {
t.Fatalf("Incorrect data: got %v, wanted more than %v for metric %v", got, want, stat.View.Measure.Name())
}
}
case <-time.After(2 * time.Second):
if !viewMap[statsPrefix+"gfe_latency"] || !viewMap[statsPrefix+"gfe_header_missing_count"] {
t.Fatal("no stats were exported before timeout")
}
}
}
DisableGfeLatencyAndHeaderMissingCountViews()
}

// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
if databaseAdmin == nil {
Expand Down

0 comments on commit b672cd0

Please sign in to comment.