Skip to content

Commit

Permalink
feat(spanner): run E2E test over DirectPath (#3466)
Browse files Browse the repository at this point in the history
Co-authored-by: Knut Olav Løite <koloite@gmail.com>
Co-authored-by: Thiago Nunes <thiagotnunes@google.com>
  • Loading branch information
3 people committed Dec 22, 2020
1 parent 4e1ef1b commit 18e3a4f
Showing 1 changed file with 77 additions and 0 deletions.
77 changes: 77 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -39,14 +39,21 @@ import (
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)

const (
directPathIPV6Prefix = "[2001:4860:8040"
directPathIPV4Prefix = "34.126"
)

var (
// testProjectID specifies the project used for testing. It can be changed
// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
Expand All @@ -68,6 +75,9 @@ var (
databaseAdmin *database.DatabaseAdminClient
instanceAdmin *instance.InstanceAdminClient

dpConfig directPathTestConfig
peerInfo *peer.Peer

singerDBStatements = []string{
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
Expand Down Expand Up @@ -160,6 +170,17 @@ var (
validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$")
)

func init() {
flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag")
flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM")
peerInfo = &peer.Peer{}
}

type directPathTestConfig struct {
attemptDirectPath bool
directPathIPv4Only bool
}

func parseInstanceName(inst string) (project, instance string, err error) {
matches := validInstancePattern.FindStringSubmatch(inst)
if len(matches) == 0 {
Expand Down Expand Up @@ -206,6 +227,11 @@ func initIntegrationTests() (cleanup func()) {
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
// TODO(mohanli): Move EnableDirectPath internal option to client.go when DirectPath is ready for public beta.
opts = append(opts, internaloption.EnableDirectPath(true))
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
var err error
// Create InstanceAdmin and DatabaseAdmin clients.
instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
Expand Down Expand Up @@ -353,6 +379,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
}
// Calculate time difference between Cloud Spanner server and localhost to
// use to determine the exact staleness value to use.
Expand Down Expand Up @@ -436,6 +463,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
rts, err := su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
Expand All @@ -452,6 +480,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
rts, err = su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
Expand All @@ -470,6 +499,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
continue
}
verifyDirectPathRemoteAddress(t)
v, err := rowToValues(r)
if err != nil {
continue
Expand All @@ -492,6 +522,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
// The results from ReadUsingIndex is sorted by the index rather than primary key.
if len(got) != len(test.want) {
t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
Expand Down Expand Up @@ -530,6 +561,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
continue
}
verifyDirectPathRemoteAddress(t)
v, err := rowToValues(r)
if err != nil {
continue
Expand Down Expand Up @@ -931,10 +963,12 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) {
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
if bf <= 0 {
return nil
}
Expand All @@ -948,6 +982,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) {
if err != nil {
t.Errorf("%d: failed to execute transaction: %v", iter, err)
}
verifyDirectPathRemoteAddress(t)
}(i)
}
// Because of context timeout, all goroutines will eventually return.
Expand All @@ -958,13 +993,15 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) {
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
if ce := r.Column(0, &bf); ce != nil {
return ce
}
bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
if bf != 30 || bb != 21 {
t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
}
Expand All @@ -973,6 +1010,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) {
if err != nil {
t.Errorf("failed to check balances: %v", err)
}
verifyDirectPathRemoteAddress(t)
}

func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) {
Expand Down Expand Up @@ -1087,13 +1125,15 @@ func TestIntegration_Reads(t *testing.T) {
if _, err := client.Apply(ctx, ms); err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)

// Empty read.
rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
if got, want := len(rows), 0; got != want {
t.Errorf("got %d, want %d", got, want)
}
Expand All @@ -1104,6 +1144,7 @@ func TestIntegration_Reads(t *testing.T) {
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
if got, want := len(rows), 0; got != want {
t.Errorf("got %d, want %d", got, want)
}
Expand All @@ -1113,6 +1154,7 @@ func TestIntegration_Reads(t *testing.T) {
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
var got testTableRow
if err := row.ToStruct(&got); err != nil {
t.Fatal(err)
Expand All @@ -1126,12 +1168,14 @@ func TestIntegration_Reads(t *testing.T) {
if ErrCode(err) != codes.NotFound {
t.Fatalf("got %v, want NotFound", err)
}
verifyDirectPathRemoteAddress(t)

// Index point read.
rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
var gotIndex testTableRow
if err := rowIndex.ToStruct(&gotIndex); err != nil {
t.Fatal(err)
Expand All @@ -1144,6 +1188,7 @@ func TestIntegration_Reads(t *testing.T) {
if ErrCode(err) != codes.NotFound {
t.Fatalf("got %v, want NotFound", err)
}
verifyDirectPathRemoteAddress(t)
rangeReads(ctx, t, client)
indexRangeReads(ctx, t, client)
}
Expand Down Expand Up @@ -1293,6 +1338,7 @@ func TestIntegration_DbRemovalRecovery(t *testing.T) {
if _, err := iter.Next(); err == nil {
t.Errorf("client sends query to removed database successfully, want it to fail")
}
verifyDirectPathRemoteAddress(t)

// Recreate database and table.
dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
Expand Down Expand Up @@ -1322,6 +1368,7 @@ func TestIntegration_DbRemovalRecovery(t *testing.T) {
if err != nil && err != iterator.Done {
t.Errorf("failed to send query to database %v: %v", dbPath, err)
}
verifyDirectPathRemoteAddress(t)
}

// Test encoding/decoding non-struct Cloud Spanner types.
Expand Down Expand Up @@ -1514,6 +1561,7 @@ func TestIntegration_BasicTypes(t *testing.T) {
if err != nil {
t.Fatalf("Unable to fetch row %v: %v", i, err)
}
verifyDirectPathRemoteAddress(t)
// Create new instance of type of test.want.
want := test.want
if want == nil {
Expand Down Expand Up @@ -3105,6 +3153,11 @@ func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (cl
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
// TODO(mohanli): Move EnableDirectPath internal option to client.go when DirectPath is ready for public beta.
opts = append(opts, internaloption.EnableDirectPath(true))
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...)
if err != nil {
return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
Expand Down Expand Up @@ -3226,3 +3279,27 @@ func skipEmulatorTest(t *testing.T) {
t.Skip("Skipping testing against the emulator.")
}
}

func verifyDirectPathRemoteAddress(t *testing.T) {
t.Helper()
if !dpConfig.attemptDirectPath {
return
}
if remoteIP, res := isDirectPathRemoteAddress(); !res {
if dpConfig.directPathIPv4Only {
t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
} else {
t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
}
}
}

func isDirectPathRemoteAddress() (_ string, _ bool) {
remoteIP := peerInfo.Addr.String()
// DirectPath ipv4-only can only use ipv4 traffic.
if dpConfig.directPathIPv4Only {
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
}
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

0 comments on commit 18e3a4f

Please sign in to comment.