Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add a DirectPath fallback integration test #3384

Merged
merged 13 commits into from Jan 13, 2021
Merged
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
115 changes: 115 additions & 0 deletions bigtable/integration_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math"
"math/rand"
"os/exec"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -72,6 +73,10 @@ func populatePresidentsGraph(table *Table) error {
var instanceToCreate string
var instanceToCreateZone string
var instanceToCreateZone2 string
var blackholeDpv6Cmd string
var blackholeDpv4Cmd string
var allowDpv6Cmd string
var allowDpv4Cmd string

func init() {
// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
Expand All @@ -81,6 +86,11 @@ func init() {
"The zone in which to create the new test instance.")
flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
"The zone in which to create a second cluster in the test instance.")
// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
Comment on lines +91 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment with the expected values for these flags?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added.

}

func TestIntegration_ConditionalMutations(t *testing.T) {
Expand Down Expand Up @@ -2142,6 +2152,81 @@ func TestIntegration_AdminBackup(t *testing.T) {
}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
ctx := context.Background()
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()

if !testEnv.Config().AttemptDirectPath {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return or t.Skip? Return means the test "passes." So, skipping seems more correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

if len(blackholeDpv6Cmd) == 0 {
t.Fatal("-it.blackhole-dpv6-cmd unset")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this will fail the test if the flags aren't passed in. Perhaps we should skip instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have that 3 lines above, from L2164 to L2166:

if !testEnv.Config().AttemptDirectPath {
  return
}

If the test want to test DirectPath, then we can fail the test if no command is given.

}
if len(blackholeDpv4Cmd) == 0 {
t.Fatal("-it.blackhole-dpv4-cmd unset")
}
if len(allowDpv6Cmd) == 0 {
t.Fatal("-it.allowdpv6-cmd unset")
}
if len(allowDpv4Cmd) == 0 {
t.Fatal("-it.allowdpv4-cmd unset")
}

if err := populatePresidentsGraph(table); err != nil {
t.Fatal(err)
}

// Precondition: wait for DirectPath to connect.
countEnough := examineTraffic(ctx, testEnv, table, false)
if !countEnough {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps this name could be more clear?

Suggested change
countEnough := examineTraffic(ctx, testEnv, table, false)
if !countEnough {
dpEnabled := examineTraffic(ctx, testEnv, table, false)
if !dpEnabled {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

t.Fatalf("Failed to observe RPCs over DirectPath")
}

// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeDirectPath(testEnv, t)
countEnough = examineTraffic(ctx, testEnv, table, true)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}

// Disable the blackhole, and client should use DirectPath again.
allowDirectPath(testEnv, t)
countEnough = examineTraffic(ctx, testEnv, table, false)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}

// examineTraffic counts RPCs use DirectPath or CFE traffic.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// examineTraffic counts RPCs use DirectPath or CFE traffic.
// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).

Is that right? I'm not sure if there is a more clear way to say it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is correct.

func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
numCount := 0
const (
numRPCsToSend = 20
minCompleteRPC = 40
)

start := time.Now()
for time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = table.ReadRow(ctx, "jadams")
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
numCount++
if numCount >= minCompleteRPC {
return true
}
}
time.Sleep(100 * time.Millisecond)
}
}
return false
}

func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down Expand Up @@ -2259,3 +2344,33 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
if testEnv.Config().DirectPathIPV4Only {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if testEnv.Config().DirectPathIPV4Only {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
if testEnv.Config().DirectPathIPV4Only {
return
}
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
if testEnv.Config().DirectPathIPV4Only {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
}