From 948452ce896d3f44c0e22cdaf69e122f26a3c912 Mon Sep 17 00:00:00 2001 From: Mohan Li <67390330+mohanli-ml@users.noreply.github.com> Date: Fri, 4 Dec 2020 07:50:03 -0800 Subject: [PATCH] feat(bigtable): run E2E test over DirectPath (#3116) In the relevant GAX library PR https://github.com/googleapis/google-api-go-client/pull/732, we removed environment variable GOOGLE_CLOUD_ENABLE_DIRECT_PATH, and expose an internal option to Yoshi library, so that Yoshi library can decide if it want to attempt DirectPath or not. In this PR, we make use of this option to run E2E test over DirectPath. Relevant bug is b/172700854. --- bigtable/export_test.go | 46 ++++++++++++++++---- bigtable/integration_test.go | 81 ++++++++++++++++++++++++++++-------- 2 files changed, 101 insertions(+), 26 deletions(-) diff --git a/bigtable/export_test.go b/bigtable/export_test.go index e392da70b52..fd84c499585 100644 --- a/bigtable/export_test.go +++ b/bigtable/export_test.go @@ -28,8 +28,10 @@ import ( btopt "cloud.google.com/go/bigtable/internal/option" "cloud.google.com/go/internal/testutil" "google.golang.org/api/option" + "google.golang.org/api/option/internaloption" gtransport "google.golang.org/api/transport/grpc" "google.golang.org/grpc" + "google.golang.org/grpc/peer" ) var legacyUseProd string @@ -45,6 +47,8 @@ func init() { flag.StringVar(&c.Instance, "it.instance", "", "Bigtable instance to use") flag.StringVar(&c.Cluster, "it.cluster", "", "Bigtable cluster to use") flag.StringVar(&c.Table, "it.table", "", "Bigtable table to create") + flag.BoolVar(&c.AttemptDirectPath, "it.attempt-directpath", false, "Attempt DirectPath") + flag.BoolVar(&c.DirectPathIPV4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a ipv4-only VM") // Backwards compat flag.StringVar(&legacyUseProd, "use_prod", "", `DEPRECATED: if set to "proj,instance,table", run integration test against production`) @@ -53,13 +57,15 @@ func init() { // IntegrationTestConfig contains parameters to pick and setup a IntegrationEnv for testing type IntegrationTestConfig struct { - UseProd bool - AdminEndpoint string - DataEndpoint string - Project string - Instance string - Cluster string - Table string + UseProd bool + AdminEndpoint string + DataEndpoint string + Project string + Instance string + Cluster string + Table string + AttemptDirectPath bool + DirectPathIPV4Only bool } // IntegrationEnv represents a testing environment. @@ -71,6 +77,7 @@ type IntegrationEnv interface { NewInstanceAdminClient() (*InstanceAdminClient, error) NewClient() (*Client, error) Close() + Peer() *peer.Peer } // NewIntegrationEnv creates a new environment based on the command line args @@ -124,6 +131,10 @@ func NewEmulatedEnv(config IntegrationTestConfig) (*EmulatedEnv, error) { return env, nil } +func (e *EmulatedEnv) Peer() *peer.Peer { + return nil +} + // Close stops & cleans up the emulator func (e *EmulatedEnv) Close() { e.server.Close() @@ -197,7 +208,8 @@ func (e *EmulatedEnv) NewClient() (*Client, error) { // ProdEnv encapsulates the state necessary to connect to the external Bigtable service type ProdEnv struct { - config IntegrationTestConfig + config IntegrationTestConfig + peerInfo *peer.Peer } // NewProdEnv builds the environment representation @@ -212,7 +224,15 @@ func NewProdEnv(config IntegrationTestConfig) (*ProdEnv, error) { return nil, errors.New("Table not set") } - return &ProdEnv{config}, nil + env := &ProdEnv{ + config: config, + peerInfo: &peer.Peer{}, + } + return env, nil +} + +func (e *ProdEnv) Peer() *peer.Peer { + return e.peerInfo } // Close is a no-op for production environments @@ -247,5 +267,13 @@ func (e *ProdEnv) NewClient() (*Client, error) { if endpoint := e.config.DataEndpoint; endpoint != "" { clientOpts = append(clientOpts, option.WithEndpoint(endpoint)) } + + if e.config.AttemptDirectPath { + // TODO(mohanli): Move the EnableDirectPath internal option to bigtable.go after e2e tests are done. + clientOpts = append(clientOpts, internaloption.EnableDirectPath(true)) + // For DirectPath tests, we need to add an interceptor to check the peer IP. + clientOpts = append(clientOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(e.peerInfo)))) + } + return NewClient(context.Background(), e.config.Project, e.config.Instance, clientOpts...) } diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 007b4d1cbc3..e03b8d1b478 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -39,6 +39,11 @@ import ( btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" ) +const ( + directPathIPV6Prefix = "[2001:4860:8040" + directPathIPV4Prefix = "34.126" +) + var ( presidentsSocialGraph = map[string][]string{ "wmckinley": {"tjefferson"}, @@ -80,7 +85,7 @@ func init() { func TestIntegration_ConditionalMutations(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -98,6 +103,7 @@ func TestIntegration_ConditionalMutations(t *testing.T) { if err := table.Apply(ctx, "tjefferson", mut); err != nil { t.Fatalf("Conditionally mutating row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) // Do a second condition mutation with a filter that does not match, // and thus no changes should be made. mutTrue = NewMutation() @@ -107,12 +113,14 @@ func TestIntegration_ConditionalMutations(t *testing.T) { if err := table.Apply(ctx, "tjefferson", mut); err != nil { t.Fatalf("Conditionally mutating row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) // Fetch a row. row, err := table.ReadRow(ctx, "jadams") if err != nil { t.Fatalf("Reading a row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) wantRow := Row{ "follows": []ReadItem{ {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, @@ -126,7 +134,7 @@ func TestIntegration_ConditionalMutations(t *testing.T) { func TestIntegration_PartialReadRows(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -157,7 +165,7 @@ func TestIntegration_PartialReadRows(t *testing.T) { func TestIntegration_ReadRowList(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -190,7 +198,7 @@ func TestIntegration_ReadRowList(t *testing.T) { func TestIntegration_DeleteRow(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -217,7 +225,7 @@ func TestIntegration_DeleteRow(t *testing.T) { func TestIntegration_ReadModifyWrite(t *testing.T) { ctx := context.Background() - _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -267,6 +275,7 @@ func TestIntegration_ReadModifyWrite(t *testing.T) { if err != nil { t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) } + verifyDirectPathRemoteAddress(testEnv, t) // Make sure the modified cell returned by the RMW operation has a timestamp. if row["counter"][0].Timestamp == 0 { t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) @@ -283,15 +292,18 @@ func TestIntegration_ReadModifyWrite(t *testing.T) { if err != nil { t.Fatalf("ApplyReadModifyWrite null string: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) if err != nil { t.Fatalf("ApplyReadModifyWrite null string: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) // Get only the correct row back on read. r, err := table.ReadRow(ctx, "issue-723-1") if err != nil { t.Fatalf("Reading row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) if r.Key() != "issue-723-1" { t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") } @@ -299,7 +311,7 @@ func TestIntegration_ReadModifyWrite(t *testing.T) { func TestIntegration_ArbitraryTimestamps(t *testing.T) { ctx := context.Background() - _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + _, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -581,7 +593,7 @@ func TestIntegration_ArbitraryTimestamps(t *testing.T) { func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { ctx := context.Background() - _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + _, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -624,7 +636,7 @@ func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { ctx := context.Background() - _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) + testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -642,10 +654,12 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err := table.Apply(ctx, "bigrow", mut); err != nil { t.Fatalf("Big write: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) r, err := table.ReadRow(ctx, "bigrow") if err != nil { t.Fatalf("Big read: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) wantRow := Row{"ts": []ReadItem{ {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, }} @@ -670,6 +684,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err := table.Apply(ctx, row, mut); err != nil { t.Errorf("Preparing large scan: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) }() } wg.Wait() @@ -685,6 +700,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatalf("Doing large scan: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) if want := 1000 * len(medBytes); n != want { t.Fatalf("Large scan returned %d bytes, want %d", n, want) } @@ -698,6 +714,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(testEnv, t) if rc != wantRc { t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) } @@ -725,6 +742,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) } + verifyDirectPathRemoteAddress(testEnv, t) if status != nil { t.Fatalf("non-nil errors: %v", err) } @@ -735,6 +753,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatalf("Reading a bulk row: %v", err) } + verifyDirectPathRemoteAddress(testEnv, t) var wantItems []ReadItem for _, val := range ss { wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) @@ -755,6 +774,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { if err != nil { t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) } + verifyDirectPathRemoteAddress(testEnv, t) if status == nil { t.Fatalf("No errors for bad bulk mutation") } else if status[0] == nil || status[1] == nil { @@ -764,7 +784,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { func TestIntegration_Read(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -785,6 +805,7 @@ func TestIntegration_Read(t *testing.T) { if err := table.Apply(ctx, row, mut); err != nil { t.Fatalf("Mutating row %q: %v", row, err) } + verifyDirectPathRemoteAddress(testEnv, t) } for _, test := range []struct { @@ -993,6 +1014,7 @@ func TestIntegration_Read(t *testing.T) { if err != nil { t.Fatal(err) } + verifyDirectPathRemoteAddress(testEnv, t) if got := strings.Join(elt, ","); got != test.want { t.Fatalf("got %q\nwant %q", got, test.want) } @@ -1005,7 +1027,7 @@ func TestIntegration_Read(t *testing.T) { func TestIntegration_SampleRowKeys(t *testing.T) { ctx := context.Background() - _, _, table, _, cleanup, err := setupIntegration(ctx, t) + testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) if err != nil { t.Fatal(err) } @@ -1026,6 +1048,7 @@ func TestIntegration_SampleRowKeys(t *testing.T) { if err := table.Apply(ctx, row, mut); err != nil { t.Fatalf("Mutating row %q: %v", row, err) } + verifyDirectPathRemoteAddress(testEnv, t) } sampleKeys, err := table.SampleRowKeys(context.Background()) if err != nil { @@ -2119,10 +2142,10 @@ func TestIntegration_AdminBackup(t *testing.T) { } } -func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { +func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { testEnv, err := NewIntegrationEnv() if err != nil { - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } var timeout time.Duration @@ -2137,12 +2160,12 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminCli client, err := testEnv.NewClient() if err != nil { - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } adminClient, err := testEnv.NewAdminClient() if err != nil { - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } if testEnv.Config().UseProd { @@ -2155,14 +2178,14 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminCli if err := adminClient.CreateTable(ctx, tableName); err != nil { cancel() - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { cancel() - return nil, nil, nil, "", nil, err + return nil, nil, nil, nil, "", nil, err } - return client, adminClient, client.Open(tableName), tableName, func() { + return testEnv, client, adminClient, client.Open(tableName), tableName, func() { if err := adminClient.DeleteTable(ctx, tableName); err != nil { t.Errorf("DeleteTable got error %v", err) } @@ -2212,3 +2235,27 @@ func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string t.Logf("DeleteTable: %v", err) } } + +func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) { + t.Helper() + if !testEnv.Config().AttemptDirectPath { + return + } + if remoteIP, res := isDirectPathRemoteAddress(testEnv); !res { + if testEnv.Config().DirectPathIPV4Only { + t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP) + } else { + t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP) + } + } +} + +func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { + remoteIP := testEnv.Peer().Addr.String() + // DirectPath ipv4-only can only use ipv4 traffic. + if testEnv.Config().DirectPathIPV4Only { + return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) + } + // DirectPath ipv6 can use either ipv4 or ipv6 traffic. + return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) +}