Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add a DirectPath fallback integration test #3384

Merged
merged 13 commits into from Jan 13, 2021
Merged
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
109 changes: 109 additions & 0 deletions bigtable/integration_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math"
"math/rand"
"os/exec"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -72,6 +73,10 @@ func populatePresidentsGraph(table *Table) error {
var instanceToCreate string
var instanceToCreateZone string
var instanceToCreateZone2 string
var blackholeDpv6Cmd string
var blackholeDpv4Cmd string
var allowDpv6Cmd string
var allowDpv4Cmd string

func init() {
// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
Expand All @@ -81,6 +86,11 @@ func init() {
"The zone in which to create the new test instance.")
flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
"The zone in which to create a second cluster in the test instance.")
// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
Comment on lines +91 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment with the expected values for these flags?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added.

}

func TestIntegration_ConditionalMutations(t *testing.T) {
Expand Down Expand Up @@ -2142,6 +2152,81 @@ func TestIntegration_AdminBackup(t *testing.T) {
}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
ctx := context.Background()
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()

if !testEnv.Config().AttemptDirectPath {
t.Skip()
}

if len(blackholeDpv6Cmd) == 0 {
t.Fatal("-it.blackhole-dpv6-cmd unset")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this will fail the test if the flags aren't passed in. Perhaps we should skip instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have that 3 lines above, from L2164 to L2166:

if !testEnv.Config().AttemptDirectPath {
  return
}

If the test want to test DirectPath, then we can fail the test if no command is given.

}
if len(blackholeDpv4Cmd) == 0 {
t.Fatal("-it.blackhole-dpv4-cmd unset")
}
if len(allowDpv6Cmd) == 0 {
t.Fatal("-it.allowdpv6-cmd unset")
}
if len(allowDpv4Cmd) == 0 {
t.Fatal("-it.allowdpv4-cmd unset")
}

if err := populatePresidentsGraph(table); err != nil {
t.Fatal(err)
}

// Precondition: wait for DirectPath to connect.
dpEnabled := examineTraffic(ctx, testEnv, table, false)
if !dpEnabled {
t.Fatalf("Failed to observe RPCs over DirectPath")
}

// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeDirectPath(testEnv, t)
dpDisabled := examineTraffic(ctx, testEnv, table, true)
if !dpDisabled {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}

// Disable the blackhole, and client should use DirectPath again.
allowDirectPath(testEnv, t)
dpEnabled = examineTraffic(ctx, testEnv, table, false)
if !dpEnabled {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}

// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
numCount := 0
const (
numRPCsToSend = 20
minCompleteRPC = 40
)

start := time.Now()
for time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = table.ReadRow(ctx, "jadams")
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
numCount++
if numCount >= minCompleteRPC {
return true
}
}
time.Sleep(100 * time.Millisecond)
}
}
return false
}

func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down Expand Up @@ -2259,3 +2344,27 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
if testEnv.Config().DirectPathIPV4Only {
return
}
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}

func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
if testEnv.Config().DirectPathIPV4Only {
return
}
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}