Navigation Menu

Skip to content

Commit

Permalink
feat(bigtable): Add a DirectPath fallback integration test (#3384)
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanli-ml committed Jan 13, 2021
1 parent d1b59ef commit e6684c3
Showing 1 changed file with 109 additions and 0 deletions.
109 changes: 109 additions & 0 deletions bigtable/integration_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"math"
"math/rand"
"os/exec"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -73,6 +74,10 @@ func populatePresidentsGraph(table *Table) error {
var instanceToCreate string
var instanceToCreateZone string
var instanceToCreateZone2 string
var blackholeDpv6Cmd string
var blackholeDpv4Cmd string
var allowDpv6Cmd string
var allowDpv4Cmd string

func init() {
// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
Expand All @@ -82,6 +87,11 @@ func init() {
"The zone in which to create the new test instance.")
flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
"The zone in which to create a second cluster in the test instance.")
// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
}

func TestIntegration_ConditionalMutations(t *testing.T) {
Expand Down Expand Up @@ -2208,6 +2218,81 @@ func TestIntegration_AdminBackup(t *testing.T) {
}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
ctx := context.Background()
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()

if !testEnv.Config().AttemptDirectPath {
t.Skip()
}

if len(blackholeDpv6Cmd) == 0 {
t.Fatal("-it.blackhole-dpv6-cmd unset")
}
if len(blackholeDpv4Cmd) == 0 {
t.Fatal("-it.blackhole-dpv4-cmd unset")
}
if len(allowDpv6Cmd) == 0 {
t.Fatal("-it.allowdpv6-cmd unset")
}
if len(allowDpv4Cmd) == 0 {
t.Fatal("-it.allowdpv4-cmd unset")
}

if err := populatePresidentsGraph(table); err != nil {
t.Fatal(err)
}

// Precondition: wait for DirectPath to connect.
dpEnabled := examineTraffic(ctx, testEnv, table, false)
if !dpEnabled {
t.Fatalf("Failed to observe RPCs over DirectPath")
}

// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeDirectPath(testEnv, t)
dpDisabled := examineTraffic(ctx, testEnv, table, true)
if !dpDisabled {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}

// Disable the blackhole, and client should use DirectPath again.
allowDirectPath(testEnv, t)
dpEnabled = examineTraffic(ctx, testEnv, table, false)
if !dpEnabled {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}

// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
numCount := 0
const (
numRPCsToSend = 20
minCompleteRPC = 40
)

start := time.Now()
for time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = table.ReadRow(ctx, "jadams")
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
numCount++
if numCount >= minCompleteRPC {
return true
}
}
time.Sleep(100 * time.Millisecond)
}
}
return false
}

func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down Expand Up @@ -2325,3 +2410,27 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
if testEnv.Config().DirectPathIPV4Only {
return
}
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}

func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
if testEnv.Config().DirectPathIPV4Only {
return
}
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}

0 comments on commit e6684c3

Please sign in to comment.