Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): run E2E test over DirectPath #3466

Merged
merged 4 commits into from Dec 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 77 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -39,14 +39,21 @@ import (
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)

const (
directPathIPV6Prefix = "[2001:4860:8040"
olavloite marked this conversation as resolved.
Show resolved Hide resolved
directPathIPV4Prefix = "34.126"
)

var (
// testProjectID specifies the project used for testing. It can be changed
// by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID.
Expand All @@ -68,6 +75,9 @@ var (
databaseAdmin *database.DatabaseAdminClient
instanceAdmin *instance.InstanceAdminClient

dpConfig directPathTestConfig
peerInfo *peer.Peer

singerDBStatements = []string{
`CREATE TABLE Singers (
SingerId INT64 NOT NULL,
Expand Down Expand Up @@ -160,6 +170,17 @@ var (
validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$")
)

func init() {
flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag")
flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM")
peerInfo = &peer.Peer{}
}

type directPathTestConfig struct {
attemptDirectPath bool
directPathIPv4Only bool
}

func parseInstanceName(inst string) (project, instance string, err error) {
matches := validInstancePattern.FindStringSubmatch(inst)
if len(matches) == 0 {
Expand Down Expand Up @@ -206,6 +227,11 @@ func initIntegrationTests() (cleanup func()) {
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
// TODO(mohanli): Move EnableDirectPath internal option to client.go when DirectPath is ready for public beta.
opts = append(opts, internaloption.EnableDirectPath(true))
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
var err error
// Create InstanceAdmin and DatabaseAdmin clients.
instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...)
Expand Down Expand Up @@ -353,6 +379,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
}
// Calculate time difference between Cloud Spanner server and localhost to
// use to determine the exact staleness value to use.
Expand Down Expand Up @@ -436,6 +463,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
rts, err := su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err)
Expand All @@ -452,6 +480,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
rts, err = su.Timestamp()
if err != nil {
t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err)
Expand All @@ -470,6 +499,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
continue
}
verifyDirectPathRemoteAddress(t)
v, err := rowToValues(r)
if err != nil {
continue
Expand All @@ -492,6 +522,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err)
}
verifyDirectPathRemoteAddress(t)
// The results from ReadUsingIndex is sorted by the index rather than primary key.
if len(got) != len(test.want) {
t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want)
Expand Down Expand Up @@ -530,6 +561,7 @@ func TestIntegration_SingleUse(t *testing.T) {
if err != nil {
continue
}
verifyDirectPathRemoteAddress(t)
v, err := rowToValues(r)
if err != nil {
continue
Expand Down Expand Up @@ -931,10 +963,12 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) {
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"}))
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
if bf <= 0 {
return nil
}
Expand All @@ -948,6 +982,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) {
if err != nil {
t.Errorf("%d: failed to execute transaction: %v", iter, err)
}
verifyDirectPathRemoteAddress(t)
}(i)
}
// Because of context timeout, all goroutines will eventually return.
Expand All @@ -958,13 +993,15 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) {
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
if ce := r.Column(0, &bf); ce != nil {
return ce
}
bb, e = readBalance(tx.ReadUsingIndex(ctx, "Accounts", "AccountByNickname", KeySets(Key{"Bar"}), []string{"Balance"}))
if e != nil {
return e
}
verifyDirectPathRemoteAddress(t)
if bf != 30 || bb != 21 {
t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21)
}
Expand All @@ -973,6 +1010,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) {
if err != nil {
t.Errorf("failed to check balances: %v", err)
}
verifyDirectPathRemoteAddress(t)
}

func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) {
Expand Down Expand Up @@ -1087,13 +1125,15 @@ func TestIntegration_Reads(t *testing.T) {
if _, err := client.Apply(ctx, ms); err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)

// Empty read.
rows, err := readAllTestTable(client.Single().Read(ctx, testTable,
KeyRange{Start: Key{"k99"}, End: Key{"z"}}, testTableColumns))
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
if got, want := len(rows), 0; got != want {
t.Errorf("got %d, want %d", got, want)
}
Expand All @@ -1104,6 +1144,7 @@ func TestIntegration_Reads(t *testing.T) {
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
if got, want := len(rows), 0; got != want {
t.Errorf("got %d, want %d", got, want)
}
Expand All @@ -1113,6 +1154,7 @@ func TestIntegration_Reads(t *testing.T) {
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
var got testTableRow
if err := row.ToStruct(&got); err != nil {
t.Fatal(err)
Expand All @@ -1126,12 +1168,14 @@ func TestIntegration_Reads(t *testing.T) {
if ErrCode(err) != codes.NotFound {
t.Fatalf("got %v, want NotFound", err)
}
verifyDirectPathRemoteAddress(t)

// Index point read.
rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns)
if err != nil {
t.Fatal(err)
}
verifyDirectPathRemoteAddress(t)
var gotIndex testTableRow
if err := rowIndex.ToStruct(&gotIndex); err != nil {
t.Fatal(err)
Expand All @@ -1144,6 +1188,7 @@ func TestIntegration_Reads(t *testing.T) {
if ErrCode(err) != codes.NotFound {
t.Fatalf("got %v, want NotFound", err)
}
verifyDirectPathRemoteAddress(t)
rangeReads(ctx, t, client)
indexRangeReads(ctx, t, client)
}
Expand Down Expand Up @@ -1293,6 +1338,7 @@ func TestIntegration_DbRemovalRecovery(t *testing.T) {
if _, err := iter.Next(); err == nil {
t.Errorf("client sends query to removed database successfully, want it to fail")
}
verifyDirectPathRemoteAddress(t)

// Recreate database and table.
dbName := dbPath[strings.LastIndex(dbPath, "/")+1:]
Expand Down Expand Up @@ -1322,6 +1368,7 @@ func TestIntegration_DbRemovalRecovery(t *testing.T) {
if err != nil && err != iterator.Done {
t.Errorf("failed to send query to database %v: %v", dbPath, err)
}
verifyDirectPathRemoteAddress(t)
}

// Test encoding/decoding non-struct Cloud Spanner types.
Expand Down Expand Up @@ -1514,6 +1561,7 @@ func TestIntegration_BasicTypes(t *testing.T) {
if err != nil {
t.Fatalf("Unable to fetch row %v: %v", i, err)
}
verifyDirectPathRemoteAddress(t)
// Create new instance of type of test.want.
want := test.want
if want == nil {
Expand Down Expand Up @@ -3105,6 +3153,11 @@ func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (cl
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
// TODO(mohanli): Move EnableDirectPath internal option to client.go when DirectPath is ready for public beta.
opts = append(opts, internaloption.EnableDirectPath(true))
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...)
if err != nil {
return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
Expand Down Expand Up @@ -3226,3 +3279,27 @@ func skipEmulatorTest(t *testing.T) {
t.Skip("Skipping testing against the emulator.")
}
}

func verifyDirectPathRemoteAddress(t *testing.T) {
t.Helper()
if !dpConfig.attemptDirectPath {
return
}
if remoteIP, res := isDirectPathRemoteAddress(); !res {
if dpConfig.directPathIPv4Only {
t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
} else {
t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
}
}
}

func isDirectPathRemoteAddress() (_ string, _ bool) {
remoteIP := peerInfo.Addr.String()
// DirectPath ipv4-only can only use ipv4 traffic.
if dpConfig.directPathIPv4Only {
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
}
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}