diff --git a/spanner/integration_test.go b/spanner/integration_test.go index 2881813c533..a54d6d7d005 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -39,14 +39,21 @@ import ( instance "cloud.google.com/go/spanner/admin/instance/apiv1" "google.golang.org/api/iterator" "google.golang.org/api/option" + "google.golang.org/api/option/internaloption" adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" sppb "google.golang.org/genproto/googleapis/spanner/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" "google.golang.org/grpc/status" ) +const ( + directPathIPV6Prefix = "[2001:4860:8040" + directPathIPV4Prefix = "34.126" +) + var ( // testProjectID specifies the project used for testing. It can be changed // by setting environment variable GCLOUD_TESTS_GOLANG_PROJECT_ID. @@ -68,6 +75,9 @@ var ( databaseAdmin *database.DatabaseAdminClient instanceAdmin *instance.InstanceAdminClient + dpConfig directPathTestConfig + peerInfo *peer.Peer + singerDBStatements = []string{ `CREATE TABLE Singers ( SingerId INT64 NOT NULL, @@ -160,6 +170,17 @@ var ( validInstancePattern = regexp.MustCompile("^projects/(?P[^/]+)/instances/(?P[^/]+)$") ) +func init() { + flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag") + flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM") + peerInfo = &peer.Peer{} +} + +type directPathTestConfig struct { + attemptDirectPath bool + directPathIPv4Only bool +} + func parseInstanceName(inst string) (project, instance string, err error) { matches := validInstancePattern.FindStringSubmatch(inst) if len(matches) == 0 { @@ -206,6 +227,11 @@ func initIntegrationTests() (cleanup func()) { if spannerHost != "" { opts = append(opts, option.WithEndpoint(spannerHost)) } + if dpConfig.attemptDirectPath { + // TODO(mohanli): Move EnableDirectPath internal option to client.go when DirectPath is ready for public beta. + opts = append(opts, internaloption.EnableDirectPath(true)) + opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo)))) + } var err error // Create InstanceAdmin and DatabaseAdmin clients. instanceAdmin, err = instance.NewInstanceAdminClient(ctx, opts...) @@ -353,6 +379,7 @@ func TestIntegration_SingleUse(t *testing.T) { if writes[i].ts, err = client.Apply(ctx, []*Mutation{m}, ApplyAtLeastOnce()); err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) } // Calculate time difference between Cloud Spanner server and localhost to // use to determine the exact staleness value to use. @@ -436,6 +463,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { t.Fatalf("%d: SingleUse.Query returns error %v, want nil", i, err) } + verifyDirectPathRemoteAddress(t) rts, err := su.Timestamp() if err != nil { t.Fatalf("%d: SingleUse.Query doesn't return a timestamp, error: %v", i, err) @@ -452,6 +480,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { t.Fatalf("%d: SingleUse.Read returns error %v, want nil", i, err) } + verifyDirectPathRemoteAddress(t) rts, err = su.Timestamp() if err != nil { t.Fatalf("%d: SingleUse.Read doesn't return a timestamp, error: %v", i, err) @@ -470,6 +499,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { continue } + verifyDirectPathRemoteAddress(t) v, err := rowToValues(r) if err != nil { continue @@ -492,6 +522,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { t.Fatalf("%d: SingleUse.ReadUsingIndex returns error %v, want nil", i, err) } + verifyDirectPathRemoteAddress(t) // The results from ReadUsingIndex is sorted by the index rather than primary key. if len(got) != len(test.want) { t.Fatalf("%d: got unexpected result from SingleUse.ReadUsingIndex: %v, want %v", i, got, test.want) @@ -530,6 +561,7 @@ func TestIntegration_SingleUse(t *testing.T) { if err != nil { continue } + verifyDirectPathRemoteAddress(t) v, err := rowToValues(r) if err != nil { continue @@ -931,10 +963,12 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if e != nil { return e } + verifyDirectPathRemoteAddress(t) bb, e := readBalance(tx.Read(ctx, "Accounts", KeySets(Key{int64(2)}), []string{"Balance"})) if e != nil { return e } + verifyDirectPathRemoteAddress(t) if bf <= 0 { return nil } @@ -948,6 +982,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if err != nil { t.Errorf("%d: failed to execute transaction: %v", iter, err) } + verifyDirectPathRemoteAddress(t) }(i) } // Because of context timeout, all goroutines will eventually return. @@ -958,6 +993,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if e != nil { return e } + verifyDirectPathRemoteAddress(t) if ce := r.Column(0, &bf); ce != nil { return ce } @@ -965,6 +1001,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if e != nil { return e } + verifyDirectPathRemoteAddress(t) if bf != 30 || bb != 21 { t.Errorf("Foo's balance is now %v and Bar's balance is now %v, want %v and %v", bf, bb, 30, 21) } @@ -973,6 +1010,7 @@ func TestIntegration_ReadWriteTransaction(t *testing.T) { if err != nil { t.Errorf("failed to check balances: %v", err) } + verifyDirectPathRemoteAddress(t) } func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) { @@ -1087,6 +1125,7 @@ func TestIntegration_Reads(t *testing.T) { if _, err := client.Apply(ctx, ms); err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) // Empty read. rows, err := readAllTestTable(client.Single().Read(ctx, testTable, @@ -1094,6 +1133,7 @@ func TestIntegration_Reads(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) if got, want := len(rows), 0; got != want { t.Errorf("got %d, want %d", got, want) } @@ -1104,6 +1144,7 @@ func TestIntegration_Reads(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) if got, want := len(rows), 0; got != want { t.Errorf("got %d, want %d", got, want) } @@ -1113,6 +1154,7 @@ func TestIntegration_Reads(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) var got testTableRow if err := row.ToStruct(&got); err != nil { t.Fatal(err) @@ -1126,12 +1168,14 @@ func TestIntegration_Reads(t *testing.T) { if ErrCode(err) != codes.NotFound { t.Fatalf("got %v, want NotFound", err) } + verifyDirectPathRemoteAddress(t) // Index point read. rowIndex, err := client.Single().ReadRowUsingIndex(ctx, testTable, testTableIndex, Key{"v1"}, testTableColumns) if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(t) var gotIndex testTableRow if err := rowIndex.ToStruct(&gotIndex); err != nil { t.Fatal(err) @@ -1144,6 +1188,7 @@ func TestIntegration_Reads(t *testing.T) { if ErrCode(err) != codes.NotFound { t.Fatalf("got %v, want NotFound", err) } + verifyDirectPathRemoteAddress(t) rangeReads(ctx, t, client) indexRangeReads(ctx, t, client) } @@ -1293,6 +1338,7 @@ func TestIntegration_DbRemovalRecovery(t *testing.T) { if _, err := iter.Next(); err == nil { t.Errorf("client sends query to removed database successfully, want it to fail") } + verifyDirectPathRemoteAddress(t) // Recreate database and table. dbName := dbPath[strings.LastIndex(dbPath, "/")+1:] @@ -1322,6 +1368,7 @@ func TestIntegration_DbRemovalRecovery(t *testing.T) { if err != nil && err != iterator.Done { t.Errorf("failed to send query to database %v: %v", dbPath, err) } + verifyDirectPathRemoteAddress(t) } // Test encoding/decoding non-struct Cloud Spanner types. @@ -1514,6 +1561,7 @@ func TestIntegration_BasicTypes(t *testing.T) { if err != nil { t.Fatalf("Unable to fetch row %v: %v", i, err) } + verifyDirectPathRemoteAddress(t) // Create new instance of type of test.want. want := test.want if want == nil { @@ -3105,6 +3153,11 @@ func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (cl if spannerHost != "" { opts = append(opts, option.WithEndpoint(spannerHost)) } + if dpConfig.attemptDirectPath { + // TODO(mohanli): Move EnableDirectPath internal option to client.go when DirectPath is ready for public beta. + opts = append(opts, internaloption.EnableDirectPath(true)) + opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo)))) + } client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...) if err != nil { return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err) @@ -3226,3 +3279,27 @@ func skipEmulatorTest(t *testing.T) { t.Skip("Skipping testing against the emulator.") } } + +func verifyDirectPathRemoteAddress(t *testing.T) { + t.Helper() + if !dpConfig.attemptDirectPath { + return + } + if remoteIP, res := isDirectPathRemoteAddress(); !res { + if dpConfig.directPathIPv4Only { + t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP) + } else { + t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP) + } + } +} + +func isDirectPathRemoteAddress() (_ string, _ bool) { + remoteIP := peerInfo.Addr.String() + // DirectPath ipv4-only can only use ipv4 traffic. + if dpConfig.directPathIPv4Only { + return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) + } + // DirectPath ipv6 can use either ipv4 or ipv6 traffic. + return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) +}