Skip to content

Commit

Permalink
feat(bigtable): Run E2E test over DirectPath
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanli-ml committed Nov 23, 2020
1 parent cf30830 commit 4a99910
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 26 deletions.
9 changes: 9 additions & 0 deletions bigtable/bigtable.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/golang/protobuf/proto"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
gtransport "google.golang.org/api/transport/grpc"
btpb "google.golang.org/genproto/googleapis/bigtable/v2"
"google.golang.org/grpc"
Expand All @@ -41,6 +42,11 @@ import (
const prodAddr = "bigtable.googleapis.com:443"
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"

// AttemptDirectPathFlag is an experimental flag to make client attempt DirectPath.
// Once e2e tests are finished, this flag will be removed, and client
// will attempt by default.
var AttemptDirectPathFlag = false

// Client is a client for reading and writing data to tables in an instance.
//
// A Client is safe to use concurrently, except for its Close method.
Expand Down Expand Up @@ -81,6 +87,9 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
// TODO(grpc/grpc-go#1388) using connection pool without WithBlock
// can cause RPCs to fail randomly. We can delete this after the issue is fixed.
option.WithGRPCDialOption(grpc.WithBlock()))

o = append(o, internaloption.EnableDirectPath(AttemptDirectPathFlag))

o = append(o, opts...)
connPool, err := gtransport.DialPool(ctx, o...)
if err != nil {
Expand Down
48 changes: 39 additions & 9 deletions bigtable/export_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)

var legacyUseProd string
Expand All @@ -45,6 +46,8 @@ func init() {
flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use")
flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use")
flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create")
flag.BoolVar(&c.AttemptDirectPath, "it.attempt-directpath", false, "Attempt DirectPath")
flag.BoolVar(&c.DirectPathIPV4Only, "it.directpath-ipv4-only", false, "Attempt DirectPath on a ipv4-only VM")

// Backwards compat
flag.StringVar(&legacyUseProd, "use_prod", "", `DEPRECATED: if set to "proj,instance,table", run integration test against production`)
Expand All @@ -53,13 +56,15 @@ func init() {

// IntegrationTestConfig contains parameters to pick and setup a IntegrationEnv for testing
type IntegrationTestConfig struct {
UseProd bool
AdminEndpoint string
DataEndpoint string
Project string
Instance string
Cluster string
Table string
UseProd bool
AdminEndpoint string
DataEndpoint string
Project string
Instance string
Cluster string
Table string
AttemptDirectPath bool
DirectPathIPV4Only bool
}

// IntegrationEnv represents a testing environment.
Expand All @@ -71,6 +76,7 @@ type IntegrationEnv interface {
NewInstanceAdminClient() (*InstanceAdminClient, error)
NewClient() (*Client, error)
Close()
GetPeer() *peer.Peer
}

// NewIntegrationEnv creates a new environment based on the command line args
Expand Down Expand Up @@ -124,6 +130,10 @@ func NewEmulatedEnv(config IntegrationTestConfig) (*EmulatedEnv, error) {
return env, nil
}

func (e *EmulatedEnv) GetPeer() *peer.Peer {
return nil
}

// Close stops & cleans up the emulator
func (e *EmulatedEnv) Close() {
e.server.Close()
Expand Down Expand Up @@ -197,7 +207,8 @@ func (e *EmulatedEnv) NewClient() (*Client, error) {

// ProdEnv encapsulates the state necessary to connect to the external Bigtable service
type ProdEnv struct {
config IntegrationTestConfig
config IntegrationTestConfig
peerInfo *peer.Peer
}

// NewProdEnv builds the environment representation
Expand All @@ -212,7 +223,22 @@ func NewProdEnv(config IntegrationTestConfig) (*ProdEnv, error) {
return nil, errors.New("Table not set")
}

return &ProdEnv{config}, nil
// AttemptDirectPath experimental flag. Will be removed once e2e test
// is done.
if config.AttemptDirectPath {
AttemptDirectPathFlag = true
}

env := &ProdEnv{
config: config,
peerInfo: &peer.Peer{},
}
return env, nil
//return &ProdEnv{config}, nil
}

func (e *ProdEnv) GetPeer() *peer.Peer {
return e.peerInfo
}

// Close is a no-op for production environments
Expand Down Expand Up @@ -247,5 +273,9 @@ func (e *ProdEnv) NewClient() (*Client, error) {
if endpoint := e.config.DataEndpoint; endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(endpoint))
}

// For DirectPath tests, we need to add an interceptor to check the peer IP.
clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(e.peerInfo))))

return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...)
}

0 comments on commit 4a99910

Please sign in to comment.