Skip to content

Commit

Permalink
feat(bigtable): Run E2E test over DirectPath
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanli-ml committed Nov 11, 2020
1 parent 5bb2b02 commit 79b12f6
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 27 deletions.
9 changes: 9 additions & 0 deletions bigtable/bigtable.go
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net/url"
"os"
"strconv"
"time"

Expand All @@ -30,6 +31,7 @@ import (
"github.com/golang/protobuf/proto"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
gtransport "google.golang.org/api/transport/grpc"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
"google.golang.org/grpc"
Expand Down Expand Up @@ -81,6 +83,13 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
// TODO(grpc/grpc-go#1388) using connection pool without WithBlock
// can cause RPCs to fail randomly. We can delete this after the issue is fixed.
option.WithGRPCDialOption(grpc.WithBlock()))

// Expose an experimental flag to make client attempt DirectPath.
// Once e2e tests are finished, this flag will be removed, and client
// will attempt by default.
attemptDirectPath, _ := strconv.ParseBool(os.Getenv("ATTEMPT_DIRECTPATH"))
o = append(o, internaloption.EnableDirectPath(attemptDirectPath))

o = append(o, opts...)
connPool, err := gtransport.DialPool(ctx, o...)
if err != nil {
Expand Down
49 changes: 40 additions & 9 deletions bigtable/export_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"flag"
"fmt"
"os"
"strings"
"time"

Expand All @@ -30,6 +31,7 @@ import (
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)

var legacyUseProd string
Expand All @@ -45,6 +47,8 @@ func init() {
flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use")
flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use")
flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create")
flag.BoolVar(&c.AttemptDirectPath, "it.attempt-directpath", false, "Attempt DirectPath")
flag.BoolVar(&c.DirectPathIpv4Only, "it.directpath-ipv4-only", false, "Attempt DirectPath on a ipv4-only VM")

// Backwards compat
flag.StringVar(&legacyUseProd, "use_prod", "", `DEPRECATED: if set to "proj,instance,table", run integration test against production`)
Expand All @@ -53,13 +57,15 @@ func init() {

// IntegrationTestConfig contains parameters to pick and setup a IntegrationEnv for testing
type IntegrationTestConfig struct {
UseProd bool
AdminEndpoint string
DataEndpoint string
Project string
Instance string
Cluster string
Table string
UseProd bool
AdminEndpoint string
DataEndpoint string
Project string
Instance string
Cluster string
Table string
AttemptDirectPath bool
DirectPathIpv4Only bool
}

// IntegrationEnv represents a testing environment.
Expand All @@ -71,6 +77,7 @@ type IntegrationEnv interface {
NewInstanceAdminClient() (*InstanceAdminClient, error)
NewClient() (*Client, error)
Close()
GetPeer() *peer.Peer
}

// NewIntegrationEnv creates a new environment based on the command line args
Expand Down Expand Up @@ -124,6 +131,10 @@ func NewEmulatedEnv(config IntegrationTestConfig) (*EmulatedEnv, error) {
return env, nil
}

func (e *EmulatedEnv) GetPeer() *peer.Peer {
return nil
}

// Close stops & cleans up the emulator
func (e *EmulatedEnv) Close() {
e.server.Close()
Expand Down Expand Up @@ -197,7 +208,8 @@ func (e *EmulatedEnv) NewClient() (*Client, error) {

// ProdEnv encapsulates the state necessary to connect to the external Bigtable service
type ProdEnv struct {
config IntegrationTestConfig
config IntegrationTestConfig
peerInfo *peer.Peer
}

// NewProdEnv builds the environment representation
Expand All @@ -212,7 +224,22 @@ func NewProdEnv(config IntegrationTestConfig) (*ProdEnv, error) {
return nil, errors.New("Table not set")
}

return &ProdEnv{config}, nil
// AttemptDirectPath experimental flag. Will be removed once e2e test
// is done.
if config.AttemptDirectPath {
os.Setenv("ATTEMPT_DIRECTPATH", "true")
}

env := &ProdEnv{
config: config,
peerInfo: &peer.Peer{},
}
return env, nil
//return &ProdEnv{config}, nil
}

func (e *ProdEnv) GetPeer() *peer.Peer {
return e.peerInfo
}

// Close is a no-op for production environments
Expand Down Expand Up @@ -247,5 +274,9 @@ func (e *ProdEnv) NewClient() (*Client, error) {
if endpoint := e.config.DataEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}

// For DirectPath tests, we need to add an interceptor to check the peer IP.
clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(e.peerInfo))))

return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
}
2 changes: 1 addition & 1 deletion bigtable/go.mod
Expand Up @@ -10,7 +10,7 @@ require (
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43
golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174 // indirect
google.golang.org/api v0.34.0
google.golang.org/api v0.35.0
google.golang.org/genproto v0.0.0-20201105153401-9d023cd09d72
google.golang.org/grpc v1.33.1
rsc.io/binaryregexp v0.2.0
Expand Down
2 changes: 2 additions & 0 deletions bigtable/go.sum
Expand Up @@ -445,6 +445,8 @@ google.golang.org/api v0.30.0 h1:yfrXXP61wVuLb0vBcG6qaOoIoqYEzOQS8jum51jkv2w=
google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc=
google.golang.org/api v0.34.0 h1:k40adF3uR+6x/+hO5Dh4ZFUqFp67vxvbpafFiJxl10A=
google.golang.org/api v0.34.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/api v0.35.0 h1:TBCmTTxUrRDA1iTctnK/fIeitxIZ+TQuaf0j29fmCGo=
google.golang.org/api v0.35.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand Down

0 comments on commit 79b12f6

Please sign in to comment.