From 8621eae7537e005f34abf58a33d0cd86c28551ac Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Fri, 4 Dec 2020 18:17:43 +0000 Subject: [PATCH 01/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 94 ++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index e03b8d1b478..570f7f8ade0 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -22,10 +22,12 @@ import ( "fmt" "math" "math/rand" + "os/exec" "reflect" "sort" "strings" "sync" + "sync/atomic" "testing" "time" @@ -2142,6 +2144,63 @@ func TestIntegration_AdminBackup(t *testing.T) { } } +// Blackhole directpath address to test fallback. +func TestIntegration_DirectPathFallback(t *testing.T) { + ctx := context.Background() + testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) + if err != nil { + t.Fatal(err) + } + defer cleanup() + + if err := populatePresidentsGraph(table); err != nil { + t.Fatal(err) + } + + // Precondition: wait for DirectPath to connect. + countEnough := exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, false) + if !countEnough { + t.Fatalf("Failed to observe RPCs over DirectPath") + } + + // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. + blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, true) + countEnough = exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, true) + if !countEnough { + t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") + } + + // Make sure that the client will start reading from IPv6 again by sending new requests and + // checking the injected IPv6 counter has been updated. + blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, false) + countEnough = exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, false) + if !countEnough { + t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") + } +} + +func exerciseDirectPath(ctx context.Context, testEnv IntegrationEnv, table *Table, isBlackhole bool) bool { + var numCount uint64 + const ( + numRPCsToSend = 20 + minCompleteRPC = 40 + ) + + countEnough := false + start := time.Now() + for !countEnough && time.Since(start) < 2*time.Minute { + for i := 0; i < numRPCsToSend; i++ { + _, _ = table.ReadRow(ctx, "jadams") + if _, useDp := isDirectPathRemoteAddress(testEnv); useDp != isBlackhole { + atomic.AddUint64(&numCount, 1) + } + time.Sleep(100 * time.Millisecond) + countEnough = numCount >= minCompleteRPC + } + } + return countEnough +} + func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { testEnv, err := NewIntegrationEnv() if err != nil { @@ -2259,3 +2318,38 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { // DirectPath ipv6 can use either ipv4 or ipv6 traffic. return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) } + +func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) { + blackholeDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j DROP && sleep 5 && echo blackholeDpv6" + blackholeDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j DROP && sleep 5 && echo blackholeDpv4" + allowDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j ACCEPT && sleep 5 && echo allowDpv6" + allowDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j ACCEPT && sleep 5 && echo allowDpv4" + + if testEnv.Config().DirectPathIPV4Only { + if blackholeDP { + cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + } else { + cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + } + } else { + if blackholeDP { + cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) + } else { + cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) + } + } +} From 4c0e80c3050989ff6ac4778903630361a875cd14 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Fri, 4 Dec 2020 21:18:30 +0000 Subject: [PATCH 02/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 570f7f8ade0..873c4b9ce1e 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -2144,7 +2144,7 @@ func TestIntegration_AdminBackup(t *testing.T) { } } -// Blackhole directpath address to test fallback. +// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. func TestIntegration_DirectPathFallback(t *testing.T) { ctx := context.Background() testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) @@ -2158,28 +2158,28 @@ func TestIntegration_DirectPathFallback(t *testing.T) { } // Precondition: wait for DirectPath to connect. - countEnough := exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, false) + countEnough := examineTraffic(ctx, testEnv, table /*blackholeDP = */, false) if !countEnough { t.Fatalf("Failed to observe RPCs over DirectPath") } // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, true) - countEnough = exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, true) + countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, true) if !countEnough { t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") } - // Make sure that the client will start reading from IPv6 again by sending new requests and - // checking the injected IPv6 counter has been updated. + // Disable the blackhole, and client should use DirectPath again. blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, false) - countEnough = exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, false) + countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, false) if !countEnough { t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") } } -func exerciseDirectPath(ctx context.Context, testEnv IntegrationEnv, table *Table, isBlackhole bool) bool { +// examineTraffic counts RPCs use DirectPath or CFE traffic. +func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, expectDP bool) bool { var numCount uint64 const ( numRPCsToSend = 20 @@ -2191,7 +2191,7 @@ func exerciseDirectPath(ctx context.Context, testEnv IntegrationEnv, table *Tabl for !countEnough && time.Since(start) < 2*time.Minute { for i := 0; i < numRPCsToSend; i++ { _, _ = table.ReadRow(ctx, "jadams") - if _, useDp := isDirectPathRemoteAddress(testEnv); useDp != isBlackhole { + if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != expectDP { atomic.AddUint64(&numCount, 1) } time.Sleep(100 * time.Millisecond) From a53fa6436998f70316cc4146d15c91b416e59cd5 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Fri, 4 Dec 2020 23:32:31 +0000 Subject: [PATCH 03/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 873c4b9ce1e..f925b61e1c2 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -2153,6 +2153,10 @@ func TestIntegration_DirectPathFallback(t *testing.T) { } defer cleanup() + if (!testEnv.Config().AttemptDirectPath) { + return + } + if err := populatePresidentsGraph(table); err != nil { t.Fatal(err) } From 789932ca23b444d1f8c891458199299d90931127 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Sat, 5 Dec 2020 00:14:11 +0000 Subject: [PATCH 04/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index f925b61e1c2..b5698da8f48 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -2153,9 +2153,9 @@ func TestIntegration_DirectPathFallback(t *testing.T) { } defer cleanup() - if (!testEnv.Config().AttemptDirectPath) { - return - } + if !testEnv.Config().AttemptDirectPath { + return + } if err := populatePresidentsGraph(table); err != nil { t.Fatal(err) From 39fb39784abff9b1089e128efc667c1645abf9b1 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Tue, 15 Dec 2020 21:09:28 +0000 Subject: [PATCH 05/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index b5698da8f48..8d40df9560c 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -74,6 +74,10 @@ func populatePresidentsGraph(table *Table) error { var instanceToCreate string var instanceToCreateZone string var instanceToCreateZone2 string +var blackholeDpv6Cmd string +var blackholeDpv4Cmd string +var allowDpv6Cmd string +var allowDpv4Cmd string func init() { // Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. @@ -83,6 +87,10 @@ func init() { "The zone in which to create the new test instance.") flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", "The zone in which to create a second cluster in the test instance.") + flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") + flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") + flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") + flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4") } func TestIntegration_ConditionalMutations(t *testing.T) { @@ -2157,6 +2165,19 @@ func TestIntegration_DirectPathFallback(t *testing.T) { return } + if len(blackholeDpv6Cmd) == 0 { + t.Fatal("-it.blackhole-dpv6-cmd unset") + } + if len(blackholeDpv4Cmd) == 0 { + t.Fatal("-it.blackhole-dpv4-cmd unset") + } + if len(allowDpv6Cmd) == 0 { + t.Fatal("-it.allowdpv6-cmd unset") + } + if len(allowDpv4Cmd) == 0 { + t.Fatal("-it.allowdpv4-cmd unset") + } + if err := populatePresidentsGraph(table); err != nil { t.Fatal(err) } @@ -2324,10 +2345,10 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { } func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) { - blackholeDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j DROP && sleep 5 && echo blackholeDpv6" - blackholeDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j DROP && sleep 5 && echo blackholeDpv4" - allowDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j ACCEPT && sleep 5 && echo allowDpv6" - allowDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j ACCEPT && sleep 5 && echo allowDpv4" +// blackholeDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j DROP && sleep 5 && echo blackholeDpv6" +// blackholeDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j DROP && sleep 5 && echo blackholeDpv4" +// allowDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j ACCEPT && sleep 5 && echo allowDpv6" +// allowDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j ACCEPT && sleep 5 && echo allowDpv4" if testEnv.Config().DirectPathIPV4Only { if blackholeDP { From 4966f16715de7263d467e51873d2b9a4218ac745 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Tue, 15 Dec 2020 21:09:57 +0000 Subject: [PATCH 06/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 40 ++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 8d40df9560c..2f14e7610cd 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -87,10 +87,10 @@ func init() { "The zone in which to create the new test instance.") flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", "The zone in which to create a second cluster in the test instance.") - flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") - flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") - flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") - flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4") + flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") + flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") + flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") + flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4") } func TestIntegration_ConditionalMutations(t *testing.T) { @@ -2165,18 +2165,18 @@ func TestIntegration_DirectPathFallback(t *testing.T) { return } - if len(blackholeDpv6Cmd) == 0 { - t.Fatal("-it.blackhole-dpv6-cmd unset") - } - if len(blackholeDpv4Cmd) == 0 { - t.Fatal("-it.blackhole-dpv4-cmd unset") - } - if len(allowDpv6Cmd) == 0 { - t.Fatal("-it.allowdpv6-cmd unset") - } - if len(allowDpv4Cmd) == 0 { - t.Fatal("-it.allowdpv4-cmd unset") - } + if len(blackholeDpv6Cmd) == 0 { + t.Fatal("-it.blackhole-dpv6-cmd unset") + } + if len(blackholeDpv4Cmd) == 0 { + t.Fatal("-it.blackhole-dpv4-cmd unset") + } + if len(allowDpv6Cmd) == 0 { + t.Fatal("-it.allowdpv6-cmd unset") + } + if len(allowDpv4Cmd) == 0 { + t.Fatal("-it.allowdpv4-cmd unset") + } if err := populatePresidentsGraph(table); err != nil { t.Fatal(err) @@ -2345,10 +2345,10 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { } func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) { -// blackholeDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j DROP && sleep 5 && echo blackholeDpv6" -// blackholeDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j DROP && sleep 5 && echo blackholeDpv4" -// allowDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j ACCEPT && sleep 5 && echo allowDpv6" -// allowDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j ACCEPT && sleep 5 && echo allowDpv4" + // blackholeDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j DROP && sleep 5 && echo blackholeDpv6" + // blackholeDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j DROP && sleep 5 && echo blackholeDpv4" + // allowDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j ACCEPT && sleep 5 && echo allowDpv6" + // allowDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j ACCEPT && sleep 5 && echo allowDpv4" if testEnv.Config().DirectPathIPV4Only { if blackholeDP { From b9744a666b5f8d9fadf18590fcd017e35210af93 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Tue, 15 Dec 2020 21:18:01 +0000 Subject: [PATCH 07/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 2f14e7610cd..1c9e3af1f87 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -2345,11 +2345,6 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { } func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) { - // blackholeDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j DROP && sleep 5 && echo blackholeDpv6" - // blackholeDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j DROP && sleep 5 && echo blackholeDpv4" - // allowDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j ACCEPT && sleep 5 && echo allowDpv6" - // allowDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j ACCEPT && sleep 5 && echo allowDpv4" - if testEnv.Config().DirectPathIPV4Only { if blackholeDP { cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) From ff2de9431f63b3905f0194cfa262903e9278f844 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Fri, 18 Dec 2020 00:30:50 +0000 Subject: [PATCH 08/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 1c9e3af1f87..4dbd53c1757 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -27,7 +27,6 @@ import ( "sort" "strings" "sync" - "sync/atomic" "testing" "time" @@ -87,6 +86,7 @@ func init() { "The zone in which to create the new test instance.") flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", "The zone in which to create a second cluster in the test instance.") + // Use sysctl or iptables to blackhole DirectPath IP for fallback tests. flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") @@ -2217,7 +2217,7 @@ func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, e for i := 0; i < numRPCsToSend; i++ { _, _ = table.ReadRow(ctx, "jadams") if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != expectDP { - atomic.AddUint64(&numCount, 1) + numCount++ } time.Sleep(100 * time.Millisecond) countEnough = numCount >= minCompleteRPC @@ -2355,21 +2355,22 @@ func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeD out, _ := cmdRes.CombinedOutput() t.Logf(string(out)) } + return + } + // DirectPath supports both ipv4 and ipv6 + if blackholeDP { + cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) } else { - if blackholeDP { - cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) - out, _ := cmdRes.CombinedOutput() - t.Logf(string(out)) - cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) - out, _ = cmdRes.CombinedOutput() - t.Logf(string(out)) - } else { - cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) - out, _ := cmdRes.CombinedOutput() - t.Logf(string(out)) - cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) - out, _ = cmdRes.CombinedOutput() - t.Logf(string(out)) - } + cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) } } From 001b5a7477ae6c1ecd82c7e7420948988023a3b6 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Thu, 7 Jan 2021 23:07:55 +0000 Subject: [PATCH 09/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 52 ++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 4dbd53c1757..d83a40104b2 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -2183,47 +2183,48 @@ func TestIntegration_DirectPathFallback(t *testing.T) { } // Precondition: wait for DirectPath to connect. - countEnough := examineTraffic(ctx, testEnv, table /*blackholeDP = */, false) + countEnough := examineTraffic(ctx, testEnv, table, false) if !countEnough { t.Fatalf("Failed to observe RPCs over DirectPath") } // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. - blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, true) - countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, true) + blackholeDirectPath(testEnv, t) + countEnough = examineTraffic(ctx, testEnv, table, true) if !countEnough { t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") } // Disable the blackhole, and client should use DirectPath again. - blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, false) - countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, false) + allowDirectPath(testEnv, t) + countEnough = examineTraffic(ctx, testEnv, table, false) if !countEnough { t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") } } // examineTraffic counts RPCs use DirectPath or CFE traffic. -func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, expectDP bool) bool { - var numCount uint64 +func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool { + numCount := 0 const ( numRPCsToSend = 20 minCompleteRPC = 40 ) - countEnough := false start := time.Now() - for !countEnough && time.Since(start) < 2*time.Minute { + for time.Since(start) < 2*time.Minute { for i := 0; i < numRPCsToSend; i++ { _, _ = table.ReadRow(ctx, "jadams") - if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != expectDP { + if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP { numCount++ + if numCount >= minCompleteRPC { + return true + } } time.Sleep(100 * time.Millisecond) - countEnough = numCount >= minCompleteRPC } } - return countEnough + return false } func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { @@ -2344,27 +2345,26 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) } -func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) { +func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) { if testEnv.Config().DirectPathIPV4Only { - if blackholeDP { - cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) - out, _ := cmdRes.CombinedOutput() - t.Logf(string(out)) - } else { - cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) - out, _ := cmdRes.CombinedOutput() - t.Logf(string(out)) - } - return - } - // DirectPath supports both ipv4 and ipv6 - if blackholeDP { + cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + } else { cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) out, _ := cmdRes.CombinedOutput() t.Logf(string(out)) cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) out, _ = cmdRes.CombinedOutput() t.Logf(string(out)) + } +} + +func allowDirectPath(testEnv IntegrationEnv, t *testing.T) { + if testEnv.Config().DirectPathIPV4Only { + cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) } else { cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) out, _ := cmdRes.CombinedOutput() From af681e9e46f69593d61cf44e3b786c196170b3f5 Mon Sep 17 00:00:00 2001 From: mohanli-ml Date: Mon, 11 Jan 2021 21:19:53 +0000 Subject: [PATCH 10/10] feat(bigtable): Add a DirectPath fallback integration test --- bigtable/integration_test.go | 50 ++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index d83a40104b2..308b569af23 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -2162,7 +2162,7 @@ func TestIntegration_DirectPathFallback(t *testing.T) { defer cleanup() if !testEnv.Config().AttemptDirectPath { - return + t.Skip() } if len(blackholeDpv6Cmd) == 0 { @@ -2183,27 +2183,27 @@ func TestIntegration_DirectPathFallback(t *testing.T) { } // Precondition: wait for DirectPath to connect. - countEnough := examineTraffic(ctx, testEnv, table, false) - if !countEnough { + dpEnabled := examineTraffic(ctx, testEnv, table, false) + if !dpEnabled { t.Fatalf("Failed to observe RPCs over DirectPath") } // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. blackholeDirectPath(testEnv, t) - countEnough = examineTraffic(ctx, testEnv, table, true) - if !countEnough { + dpDisabled := examineTraffic(ctx, testEnv, table, true) + if !dpDisabled { t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") } // Disable the blackhole, and client should use DirectPath again. allowDirectPath(testEnv, t) - countEnough = examineTraffic(ctx, testEnv, table, false) - if !countEnough { + dpEnabled = examineTraffic(ctx, testEnv, table, false) + if !dpEnabled { t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") } } -// examineTraffic counts RPCs use DirectPath or CFE traffic. +// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true). func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool { numCount := 0 const ( @@ -2346,31 +2346,25 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { } func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) { + cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) if testEnv.Config().DirectPathIPV4Only { - cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) - out, _ := cmdRes.CombinedOutput() - t.Logf(string(out)) - } else { - cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) - out, _ := cmdRes.CombinedOutput() - t.Logf(string(out)) - cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) - out, _ = cmdRes.CombinedOutput() - t.Logf(string(out)) + return } + cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) } func allowDirectPath(testEnv IntegrationEnv, t *testing.T) { + cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) if testEnv.Config().DirectPathIPV4Only { - cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) - out, _ := cmdRes.CombinedOutput() - t.Logf(string(out)) - } else { - cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) - out, _ := cmdRes.CombinedOutput() - t.Logf(string(out)) - cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) - out, _ = cmdRes.CombinedOutput() - t.Logf(string(out)) + return } + cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) }