From e6684c39599221e9a1e22a790305e42e8ce5d903 Mon Sep 17 00:00:00 2001 From: Mohan Li <67390330+mohanli-ml@users.noreply.github.com> Date: Wed, 13 Jan 2021 12:01:57 -0800 Subject: [PATCH] feat(bigtable): Add a DirectPath fallback integration test (#3384) --- bigtable/integration_test.go | 109 +++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index b6d2edafe87..38ad5942507 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "math/rand" + "os/exec" "reflect" "sort" "strings" @@ -73,6 +74,10 @@ func populatePresidentsGraph(table *Table) error { var instanceToCreate string var instanceToCreateZone string var instanceToCreateZone2 string +var blackholeDpv6Cmd string +var blackholeDpv4Cmd string +var allowDpv6Cmd string +var allowDpv4Cmd string func init() { // Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. @@ -82,6 +87,11 @@ func init() { "The zone in which to create the new test instance.") flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", "The zone in which to create a second cluster in the test instance.") + // Use sysctl or iptables to blackhole DirectPath IP for fallback tests. + flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") + flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") + flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") + flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4") } func TestIntegration_ConditionalMutations(t *testing.T) { @@ -2208,6 +2218,81 @@ func TestIntegration_AdminBackup(t *testing.T) { } } +// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. +func TestIntegration_DirectPathFallback(t *testing.T) { + ctx := context.Background() + testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) + if err != nil { + t.Fatal(err) + } + defer cleanup() + + if !testEnv.Config().AttemptDirectPath { + t.Skip() + } + + if len(blackholeDpv6Cmd) == 0 { + t.Fatal("-it.blackhole-dpv6-cmd unset") + } + if len(blackholeDpv4Cmd) == 0 { + t.Fatal("-it.blackhole-dpv4-cmd unset") + } + if len(allowDpv6Cmd) == 0 { + t.Fatal("-it.allowdpv6-cmd unset") + } + if len(allowDpv4Cmd) == 0 { + t.Fatal("-it.allowdpv4-cmd unset") + } + + if err := populatePresidentsGraph(table); err != nil { + t.Fatal(err) + } + + // Precondition: wait for DirectPath to connect. + dpEnabled := examineTraffic(ctx, testEnv, table, false) + if !dpEnabled { + t.Fatalf("Failed to observe RPCs over DirectPath") + } + + // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. + blackholeDirectPath(testEnv, t) + dpDisabled := examineTraffic(ctx, testEnv, table, true) + if !dpDisabled { + t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") + } + + // Disable the blackhole, and client should use DirectPath again. + allowDirectPath(testEnv, t) + dpEnabled = examineTraffic(ctx, testEnv, table, false) + if !dpEnabled { + t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") + } +} + +// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true). +func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool { + numCount := 0 + const ( + numRPCsToSend = 20 + minCompleteRPC = 40 + ) + + start := time.Now() + for time.Since(start) < 2*time.Minute { + for i := 0; i < numRPCsToSend; i++ { + _, _ = table.ReadRow(ctx, "jadams") + if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP { + numCount++ + if numCount >= minCompleteRPC { + return true + } + } + time.Sleep(100 * time.Millisecond) + } + } + return false +} + func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { testEnv, err := NewIntegrationEnv() if err != nil { @@ -2325,3 +2410,27 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { // DirectPath ipv6 can use either ipv4 or ipv6 traffic. return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) } + +func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) { + cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + if testEnv.Config().DirectPathIPV4Only { + return + } + cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) +} + +func allowDirectPath(testEnv IntegrationEnv, t *testing.T) { + cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + if testEnv.Config().DirectPathIPV4Only { + return + } + cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) +}