Skip to content

Commit

Permalink
feat(bigtable): Add a DirectPath fallback integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanli-ml committed Jan 7, 2021
1 parent ff2de94 commit 001b5a7
Showing 1 changed file with 26 additions and 26 deletions.
52 changes: 26 additions & 26 deletions bigtable/integration_test.go
Expand Up @@ -2183,47 +2183,48 @@ func TestIntegration_DirectPathFallback(t *testing.T) {
}

// Precondition: wait for DirectPath to connect.
countEnough := examineTraffic(ctx, testEnv, table /*blackholeDP = */, false)
countEnough := examineTraffic(ctx, testEnv, table, false)
if !countEnough {
t.Fatalf("Failed to observe RPCs over DirectPath")
}

// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, true)
countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, true)
blackholeDirectPath(testEnv, t)
countEnough = examineTraffic(ctx, testEnv, table, true)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}

// Disable the blackhole, and client should use DirectPath again.
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, false)
countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, false)
allowDirectPath(testEnv, t)
countEnough = examineTraffic(ctx, testEnv, table, false)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}

// examineTraffic counts RPCs use DirectPath or CFE traffic.
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, expectDP bool) bool {
var numCount uint64
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
numCount := 0
const (
numRPCsToSend = 20
minCompleteRPC = 40
)

countEnough := false
start := time.Now()
for !countEnough && time.Since(start) < 2*time.Minute {
for time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = table.ReadRow(ctx, "jadams")
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != expectDP {
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
numCount++
if numCount >= minCompleteRPC {
return true
}
}
time.Sleep(100 * time.Millisecond)
countEnough = numCount >= minCompleteRPC
}
}
return countEnough
return false
}

func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
Expand Down Expand Up @@ -2344,27 +2345,26 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) {
func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
if testEnv.Config().DirectPathIPV4Only {
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
}
return
}
// DirectPath supports both ipv4 and ipv6
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
}

func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
if testEnv.Config().DirectPathIPV4Only {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
Expand Down

0 comments on commit 001b5a7

Please sign in to comment.