Skip to content

Commit

Permalink
feat(spanner): Add a DirectPath fallback integration test (#3487)
Browse files Browse the repository at this point in the history
Co-authored-by: Thiago Nunes <thiagotnunes@google.com>
  • Loading branch information
mohanli-ml and thiagotnunes committed Jan 7, 2021
1 parent 3830238 commit de821c5
Showing 1 changed file with 110 additions and 0 deletions.
110 changes: 110 additions & 0 deletions spanner/integration_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"math"
"math/big"
"os"
"os/exec"
"reflect"
"regexp"
"strings"
Expand Down Expand Up @@ -168,12 +169,22 @@ var (
}

validInstancePattern = regexp.MustCompile("^projects/(?P<project>[^/]+)/instances/(?P<instance>[^/]+)$")

blackholeDpv6Cmd string
blackholeDpv4Cmd string
allowDpv6Cmd string
allowDpv4Cmd string
)

func init() {
flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag")
flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM")
peerInfo = &peer.Peer{}
// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
}

type directPathTestConfig struct {
Expand Down Expand Up @@ -2957,6 +2968,52 @@ func TestIntegration_StartBackupOperation(t *testing.T) {
}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements)
defer cleanup()

if !dpConfig.attemptDirectPath {
return
}
if len(blackholeDpv6Cmd) == 0 {
t.Fatal("-it.blackhole-dpv6-cmd unset")
}
if len(blackholeDpv4Cmd) == 0 {
t.Fatal("-it.blackhole-dpv4-cmd unset")
}
if len(allowDpv6Cmd) == 0 {
t.Fatal("-it.allowdpv6-cmd unset")
}
if len(allowDpv4Cmd) == 0 {
t.Fatal("-it.allowdpv4-cmd unset")
}

// Precondition: wait for DirectPath to connect.
countEnough := examineTraffic(ctx, client /*blackholeDP = */, false)
if !countEnough {
t.Fatalf("Failed to observe RPCs over DirectPath")
}

// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeOrAllowDirectPath(t /*blackholeDP = */, true)
countEnough = examineTraffic(ctx, client /*blackholeDP = */, true)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}

// Disable the blackhole, and client should use DirectPath again.
blackholeOrAllowDirectPath(t /*blackholeDP = */, false)
countEnough = examineTraffic(ctx, client /*blackholeDP = */, false)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}

// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
if databaseAdmin == nil {
Expand Down Expand Up @@ -3303,3 +3360,56 @@ func isDirectPathRemoteAddress() (_ string, _ bool) {
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

// examineTraffic counts RPCs use DirectPath or CFE traffic.
func examineTraffic(ctx context.Context, client *Client, expectDP bool) bool {
var numCount uint64
const (
numRPCsToSend = 20
minCompleteRPC = 40
)
countEnough := false
start := time.Now()
for !countEnough && time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = readAllTestTable(client.Single().Read(ctx, testTable, Key{"k1"}, testTableColumns))
if _, useDP := isDirectPathRemoteAddress(); useDP != expectDP {
numCount++
}
time.Sleep(100 * time.Millisecond)
countEnough = numCount >= minCompleteRPC
}
}
return countEnough
}

func blackholeOrAllowDirectPath(t *testing.T, blackholeDP bool) {
if dpConfig.directPathIPv4Only {
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
}
return
}
// DirectPath supports both ipv4 and ipv6
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
}

0 comments on commit de821c5

Please sign in to comment.