New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(bigtable): Add a DirectPath fallback integration test #3384
Changes from 7 commits
8621eae
4c0e80c
a53fa64
789932c
39fb397
4966f16
b9744a6
ff2de94
001b5a7
af681e9
a54f3db
bc3b79a
a59c3af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -22,10 +22,12 @@ import ( | |||||||||||||||
"fmt" | ||||||||||||||||
"math" | ||||||||||||||||
"math/rand" | ||||||||||||||||
"os/exec" | ||||||||||||||||
"reflect" | ||||||||||||||||
"sort" | ||||||||||||||||
"strings" | ||||||||||||||||
"sync" | ||||||||||||||||
"sync/atomic" | ||||||||||||||||
"testing" | ||||||||||||||||
"time" | ||||||||||||||||
|
||||||||||||||||
|
@@ -72,6 +74,10 @@ func populatePresidentsGraph(table *Table) error { | |||||||||||||||
var instanceToCreate string | ||||||||||||||||
var instanceToCreateZone string | ||||||||||||||||
var instanceToCreateZone2 string | ||||||||||||||||
var blackholeDpv6Cmd string | ||||||||||||||||
var blackholeDpv4Cmd string | ||||||||||||||||
var allowDpv6Cmd string | ||||||||||||||||
var allowDpv4Cmd string | ||||||||||||||||
|
||||||||||||||||
func init() { | ||||||||||||||||
// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. | ||||||||||||||||
|
@@ -81,6 +87,10 @@ func init() { | |||||||||||||||
"The zone in which to create the new test instance.") | ||||||||||||||||
flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", | ||||||||||||||||
"The zone in which to create a second cluster in the test instance.") | ||||||||||||||||
flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") | ||||||||||||||||
flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") | ||||||||||||||||
flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") | ||||||||||||||||
flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func TestIntegration_ConditionalMutations(t *testing.T) { | ||||||||||||||||
|
@@ -2142,6 +2152,80 @@ func TestIntegration_AdminBackup(t *testing.T) { | |||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. | ||||||||||||||||
func TestIntegration_DirectPathFallback(t *testing.T) { | ||||||||||||||||
ctx := context.Background() | ||||||||||||||||
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) | ||||||||||||||||
if err != nil { | ||||||||||||||||
t.Fatal(err) | ||||||||||||||||
} | ||||||||||||||||
defer cleanup() | ||||||||||||||||
|
||||||||||||||||
if !testEnv.Config().AttemptDirectPath { | ||||||||||||||||
return | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if len(blackholeDpv6Cmd) == 0 { | ||||||||||||||||
t.Fatal("-it.blackhole-dpv6-cmd unset") | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like this will fail the test if the flags aren't passed in. Perhaps we should skip instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have that 3 lines above, from L2164 to L2166:
If the test want to test DirectPath, then we can fail the test if no command is given. |
||||||||||||||||
} | ||||||||||||||||
if len(blackholeDpv4Cmd) == 0 { | ||||||||||||||||
t.Fatal("-it.blackhole-dpv4-cmd unset") | ||||||||||||||||
} | ||||||||||||||||
if len(allowDpv6Cmd) == 0 { | ||||||||||||||||
t.Fatal("-it.allowdpv6-cmd unset") | ||||||||||||||||
} | ||||||||||||||||
if len(allowDpv4Cmd) == 0 { | ||||||||||||||||
t.Fatal("-it.allowdpv4-cmd unset") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if err := populatePresidentsGraph(table); err != nil { | ||||||||||||||||
t.Fatal(err) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Precondition: wait for DirectPath to connect. | ||||||||||||||||
countEnough := examineTraffic(ctx, testEnv, table /*blackholeDP = */, false) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I want to make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those types of comments are not ordinarily done in Go style, which is why |
||||||||||||||||
if !countEnough { | ||||||||||||||||
t.Fatalf("Failed to observe RPCs over DirectPath") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. | ||||||||||||||||
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, true) | ||||||||||||||||
countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, true) | ||||||||||||||||
if !countEnough { | ||||||||||||||||
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Disable the blackhole, and client should use DirectPath again. | ||||||||||||||||
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, false) | ||||||||||||||||
countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, false) | ||||||||||||||||
if !countEnough { | ||||||||||||||||
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// examineTraffic counts RPCs use DirectPath or CFE traffic. | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Is that right? I'm not sure if there is a more clear way to say it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes this is correct. |
||||||||||||||||
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, expectDP bool) bool { | ||||||||||||||||
var numCount uint64 | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||
const ( | ||||||||||||||||
numRPCsToSend = 20 | ||||||||||||||||
minCompleteRPC = 40 | ||||||||||||||||
) | ||||||||||||||||
|
||||||||||||||||
countEnough := false | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we return as soon as we set this to true, thereby avoiding the need for this variable at all? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I have removed the variable. |
||||||||||||||||
start := time.Now() | ||||||||||||||||
for !countEnough && time.Since(start) < 2*time.Minute { | ||||||||||||||||
for i := 0; i < numRPCsToSend; i++ { | ||||||||||||||||
_, _ = table.ReadRow(ctx, "jadams") | ||||||||||||||||
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != expectDP { | ||||||||||||||||
atomic.AddUint64(&numCount, 1) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'm missing something -- is this run in parallel? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The bigtable client will open 4 channels by default (https://github.com/googleapis/google-cloud-go/blob/master/bigtable/bigtable.go#L78), so they could be run in parallel. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This particular code will not run in parallel. The code you linked to is for pooling the underlying network connections. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing out. I removed the atomic operation. |
||||||||||||||||
} | ||||||||||||||||
time.Sleep(100 * time.Millisecond) | ||||||||||||||||
countEnough = numCount >= minCompleteRPC | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the suggestion to return early above, the check should be above the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
return countEnough | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { | ||||||||||||||||
testEnv, err := NewIntegrationEnv() | ||||||||||||||||
if err != nil { | ||||||||||||||||
|
@@ -2259,3 +2343,33 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { | |||||||||||||||
// DirectPath ipv6 can use either ipv4 or ipv6 traffic. | ||||||||||||||||
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we split this into two functions, one corresponding to That would get rid of the need for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||||
if testEnv.Config().DirectPathIPV4Only { | ||||||||||||||||
if blackholeDP { | ||||||||||||||||
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) | ||||||||||||||||
out, _ := cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
} else { | ||||||||||||||||
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) | ||||||||||||||||
out, _ := cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
} | ||||||||||||||||
} else { | ||||||||||||||||
if blackholeDP { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow is related. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Thanks for catching the format! |
||||||||||||||||
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) | ||||||||||||||||
out, _ := cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) | ||||||||||||||||
out, _ = cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
} else { | ||||||||||||||||
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) | ||||||||||||||||
out, _ := cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) | ||||||||||||||||
out, _ = cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment with the expected values for these flags?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment added.