diff --git a/bigtable/export_test.go b/bigtable/export_test.go index e392da70b52..fd84c499585 100644 --- a/bigtable/export_test.go +++ b/bigtable/export_test.go @@ -28,8 +28,10 @@ import ( btopt "cloud.google.com/go/bigtable/internal/option" "cloud.google.com/go/internal/testutil" "google.golang.org/api/option" + "google.golang.org/api/option/internaloption" gtransport "google.golang.org/api/transport/grpc" "google.golang.org/grpc" + "google.golang.org/grpc/peer" ) var legacyUseProd string @@ -45,6 +47,8 @@ func init() { flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use") flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use") flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create") + flag.BoolVar(&c.AttemptDirectPath, "it.attempt-directpath", false, "Attempt DirectPath") + flag.BoolVar(&c.DirectPathIPV4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a ipv4-only VM") // Backwards compat flag.StringVar(&legacyUseProd, "use_prod", "", `DEPRECATED: if set to "proj,instance,table", run integration test against production`) @@ -53,13 +57,15 @@ func init() { // IntegrationTestConfig contains parameters to pick and setup a IntegrationEnv for testing type IntegrationTestConfig struct { - UseProd bool - AdminEndpoint string - DataEndpoint string - Project string - Instance string - Cluster string - Table string + UseProd bool + AdminEndpoint string + DataEndpoint string + Project string + Instance string + Cluster string + Table string + AttemptDirectPath bool + DirectPathIPV4Only bool } // IntegrationEnv represents a testing environment. @@ -71,6 +77,7 @@ type IntegrationEnv interface { NewInstanceAdminClient() (*InstanceAdminClient, error) NewClient() (*Client, error) Close() + Peer() *peer.Peer } // NewIntegrationEnv creates a new environment based on the command line args @@ -124,6 +131,10 @@ func NewEmulatedEnv(config IntegrationTestConfig) (*EmulatedEnv, error) { return env, nil } +func (e *EmulatedEnv) Peer() *peer.Peer { + return nil +} + // Close stops & cleans up the emulator func (e *EmulatedEnv) Close() { e.server.Close() @@ -197,7 +208,8 @@ func (e *EmulatedEnv) NewClient() (*Client, error) { // ProdEnv encapsulates the state necessary to connect to the external Bigtable service type ProdEnv struct { - config IntegrationTestConfig + config IntegrationTestConfig + peerInfo *peer.Peer } // NewProdEnv builds the environment representation @@ -212,7 +224,15 @@ func NewProdEnv(config IntegrationTestConfig) (*ProdEnv, error) { return nil, errors.New("Table not set") } - return &ProdEnv{config}, nil + env := &ProdEnv{ + config: config, + peerInfo: &peer.Peer{}, + } + return env, nil +} + +func (e *ProdEnv) Peer() *peer.Peer { + return e.peerInfo } // Close is a no-op for production environments @@ -247,5 +267,13 @@ func (e *ProdEnv) NewClient() (*Client, error) { if endpoint := e.config.DataEndpoint; endpoint != "" { clientOpts = append(clientOpts, option.WithEndpoint(endpoint)) } + + if e.config.AttemptDirectPath { + // TODO(mohanli): Move the EnableDirectPath internal option to bigtable.go after e2e tests are done. + clientOpts = append(clientOpts, internaloption.EnableDirectPath(true)) + // For DirectPath tests, we need to add an interceptor to check the peer IP. + clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(e.peerInfo)))) + } + return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...) } diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 007b4d1cbc3..e03b8d1b478 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -39,6 +39,11 @@ import ( btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" ) +const ( + directPathIPV6Prefix = "[2001:4860:8040" + directPathIPV4Prefix = "34.126" +) + var ( presidentsSocialGraph = map[string][]string{ "wmckinley": {"tjefferson"}, @@ -80,7 +85,7 @@ func init() { func TestIntegration_ConditionalMutations(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -98,6 +103,7 @@ func TestIntegration_ConditionalMutations(t *testing.T) { if err := table.Apply(ctx, "tjefferson", mut); err != nil { t.Fatalf("Conditionally mutating row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) // Do a second condition mutation with a filter that does not match, // and thus no changes should be made. mutTrue = NewMutation() @@ -107,12 +113,14 @@ func TestIntegration_ConditionalMutations(t *testing.T) { if err := table.Apply(ctx, "tjefferson", mut); err != nil { t.Fatalf("Conditionally mutating row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) // Fetch a row. row, err := table.ReadRow(ctx, "jadams") if err != nil { t.Fatalf("Reading a row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) wantRow := Row{ "follows": []ReadItem{ {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, @@ -126,7 +134,7 @@ func TestIntegration_ConditionalMutations(t *testing.T) { func TestIntegration_PartialReadRows(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -157,7 +165,7 @@ func TestIntegration_PartialReadRows(t *testing.T) { func TestIntegration_ReadRowList(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -190,7 +198,7 @@ func TestIntegration_ReadRowList(t *testing.T) { func TestIntegration_DeleteRow(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -217,7 +225,7 @@ func TestIntegration_DeleteRow(t *testing.T) { func TestIntegration_ReadModifyWrite(t *testing.T) { ctx := context.Background() - _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -267,6 +275,7 @@ func TestIntegration_ReadModifyWrite(t *testing.T) { if err != nil { t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) } + verifyDirectPathRemoteAddress(testEnv, t) // Make sure the modified cell returned by the RMW operation has a timestamp. if row["counter"][0].Timestamp == 0 { t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) @@ -283,15 +292,18 @@ func TestIntegration_ReadModifyWrite(t *testing.T) { if err != nil { t.Fatalf("ApplyReadModifyWrite null string: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) if err != nil { t.Fatalf("ApplyReadModifyWrite null string: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) // Get only the correct row back on read. r, err := table.ReadRow(ctx, "issue-723-1") if err != nil { t.Fatalf("Reading row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) if r.Key() != "issue-723-1" { t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") } @@ -299,7 +311,7 @@ func TestIntegration_ReadModifyWrite(t *testing.T) { func TestIntegration_ArbitraryTimestamps(t *testing.T) { ctx := context.Background() - _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + _, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -581,7 +593,7 @@ func TestIntegration_ArbitraryTimestamps(t *testing.T) { func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { ctx := context.Background() - _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + _, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -624,7 +636,7 @@ func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { ctx := context.Background() - _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -642,10 +654,12 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err := table.Apply(ctx, "bigrow", mut); err != nil { t.Fatalf("Big write: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) r, err := table.ReadRow(ctx, "bigrow") if err != nil { t.Fatalf("Big read: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) wantRow := Row{"ts": []ReadItem{ {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, }} @@ -670,6 +684,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err := table.Apply(ctx, row, mut); err != nil { t.Errorf("Preparing large scan: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) }() } wg.Wait() @@ -685,6 +700,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatalf("Doing large scan: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) if want := 1000 * len(medBytes); n != want { t.Fatalf("Large scan returned %d bytes, want %d", n, want) } @@ -698,6 +714,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(testEnv, t) if rc != wantRc { t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) } @@ -725,6 +742,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) } + verifyDirectPathRemoteAddress(testEnv, t) if status != nil { t.Fatalf("non-nil errors: %v", err) } @@ -735,6 +753,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatalf("Reading a bulk row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) var wantItems []ReadItem for _, val := range ss { wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) @@ -755,6 +774,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) } + verifyDirectPathRemoteAddress(testEnv, t) if status == nil { t.Fatalf("No errors for bad bulk mutation") } else if status[0] == nil || status[1] == nil { @@ -764,7 +784,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { func TestIntegration_Read(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -785,6 +805,7 @@ func TestIntegration_Read(t *testing.T) { if err := table.Apply(ctx, row, mut); err != nil { t.Fatalf("Mutating row %q: %v", row, err) } + verifyDirectPathRemoteAddress(testEnv, t) } for _, test := range []struct { @@ -993,6 +1014,7 @@ func TestIntegration_Read(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(testEnv, t) if got := strings.Join(elt, ","); got != test.want { t.Fatalf("got %q\nwant %q", got, test.want) } @@ -1005,7 +1027,7 @@ func TestIntegration_Read(t *testing.T) { func TestIntegration_SampleRowKeys(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -1026,6 +1048,7 @@ func TestIntegration_SampleRowKeys(t *testing.T) { if err := table.Apply(ctx, row, mut); err != nil { t.Fatalf("Mutating row %q: %v", row, err) } + verifyDirectPathRemoteAddress(testEnv, t) } sampleKeys, err := table.SampleRowKeys(context.Background()) if err != nil { @@ -2119,10 +2142,10 @@ func TestIntegration_AdminBackup(t *testing.T) { } } -func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { +func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { testEnv, err := NewIntegrationEnv() if err != nil { - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } var timeout time.Duration @@ -2137,12 +2160,12 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminCli client, err := testEnv.NewClient() if err != nil { - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } adminClient, err := testEnv.NewAdminClient() if err != nil { - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } if testEnv.Config().UseProd { @@ -2155,14 +2178,14 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminCli if err := adminClient.CreateTable(ctx, tableName); err != nil { cancel() - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { cancel() - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } - return client, adminClient, client.Open(tableName), tableName, func() { + return testEnv, client, adminClient, client.Open(tableName), tableName, func() { if err := adminClient.DeleteTable(ctx, tableName); err != nil { t.Errorf("DeleteTable got error %v", err) } @@ -2212,3 +2235,27 @@ func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string t.Logf("DeleteTable: %v", err) } } + +func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) { + t.Helper() + if !testEnv.Config().AttemptDirectPath { + return + } + if remoteIP, res := isDirectPathRemoteAddress(testEnv); !res { + if testEnv.Config().DirectPathIPV4Only { + t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP) + } else { + t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP) + } + } +} + +func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { + remoteIP := testEnv.Peer().Addr.String() + // DirectPath ipv4-only can only use ipv4 traffic. + if testEnv.Config().DirectPathIPV4Only { + return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) + } + // DirectPath ipv6 can use either ipv4 or ipv6 traffic. + return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) +}