diff --git a/spanner/integration_test.go b/spanner/integration_test.go index a54d6d7d005..6c3a0738095 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -25,6 +25,7 @@ import ( "math" "math/big" "os" + "os/exec" "reflect" "regexp" "strings" @@ -168,12 +169,22 @@ var ( } validInstancePattern = regexp.MustCompile("^projects/(?P[^/]+)/instances/(?P[^/]+)$") + + blackholeDpv6Cmd string + blackholeDpv4Cmd string + allowDpv6Cmd string + allowDpv4Cmd string ) func init() { flag.BoolVar(&dpConfig.attemptDirectPath, "it.attempt-directpath", false, "DirectPath integration test flag") flag.BoolVar(&dpConfig.directPathIPv4Only, "it.directpath-ipv4-only", false, "Run DirectPath on a IPv4-only VM") peerInfo = &peer.Peer{} + // Use sysctl or iptables to blackhole DirectPath IP for fallback tests. + flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") + flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") + flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") + flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4") } type directPathTestConfig struct { @@ -2957,6 +2968,52 @@ func TestIntegration_StartBackupOperation(t *testing.T) { } } +// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. +func TestIntegration_DirectPathFallback(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + // Set up testing environment. + client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, readDBStatements) + defer cleanup() + + if !dpConfig.attemptDirectPath { + return + } + if len(blackholeDpv6Cmd) == 0 { + t.Fatal("-it.blackhole-dpv6-cmd unset") + } + if len(blackholeDpv4Cmd) == 0 { + t.Fatal("-it.blackhole-dpv4-cmd unset") + } + if len(allowDpv6Cmd) == 0 { + t.Fatal("-it.allowdpv6-cmd unset") + } + if len(allowDpv4Cmd) == 0 { + t.Fatal("-it.allowdpv4-cmd unset") + } + + // Precondition: wait for DirectPath to connect. + countEnough := examineTraffic(ctx, client /*blackholeDP = */, false) + if !countEnough { + t.Fatalf("Failed to observe RPCs over DirectPath") + } + + // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. + blackholeOrAllowDirectPath(t /*blackholeDP = */, true) + countEnough = examineTraffic(ctx, client /*blackholeDP = */, true) + if !countEnough { + t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") + } + + // Disable the blackhole, and client should use DirectPath again. + blackholeOrAllowDirectPath(t /*blackholeDP = */, false) + countEnough = examineTraffic(ctx, client /*blackholeDP = */, false) + if !countEnough { + t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") + } +} + // Prepare initializes Cloud Spanner testing DB and clients. func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) { if databaseAdmin == nil { @@ -3303,3 +3360,56 @@ func isDirectPathRemoteAddress() (_ string, _ bool) { // DirectPath ipv6 can use either ipv4 or ipv6 traffic. return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) } + +// examineTraffic counts RPCs use DirectPath or CFE traffic. +func examineTraffic(ctx context.Context, client *Client, expectDP bool) bool { + var numCount uint64 + const ( + numRPCsToSend = 20 + minCompleteRPC = 40 + ) + countEnough := false + start := time.Now() + for !countEnough && time.Since(start) < 2*time.Minute { + for i := 0; i < numRPCsToSend; i++ { + _, _ = readAllTestTable(client.Single().Read(ctx, testTable, Key{"k1"}, testTableColumns)) + if _, useDP := isDirectPathRemoteAddress(); useDP != expectDP { + numCount++ + } + time.Sleep(100 * time.Millisecond) + countEnough = numCount >= minCompleteRPC + } + } + return countEnough +} + +func blackholeOrAllowDirectPath(t *testing.T, blackholeDP bool) { + if dpConfig.directPathIPv4Only { + if blackholeDP { + cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + } else { + cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + } + return + } + // DirectPath supports both ipv4 and ipv6 + if blackholeDP { + cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) + } else { + cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) + out, _ := cmdRes.CombinedOutput() + t.Logf(string(out)) + cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) + out, _ = cmdRes.CombinedOutput() + t.Logf(string(out)) + } +}