New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(bigtable): Add a DirectPath fallback integration test #3384
Changes from 4 commits
8621eae
4c0e80c
a53fa64
789932c
39fb397
4966f16
b9744a6
ff2de94
001b5a7
af681e9
a54f3db
bc3b79a
a59c3af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -22,10 +22,12 @@ import ( | |||||||||||||||
"fmt" | ||||||||||||||||
"math" | ||||||||||||||||
"math/rand" | ||||||||||||||||
"os/exec" | ||||||||||||||||
"reflect" | ||||||||||||||||
"sort" | ||||||||||||||||
"strings" | ||||||||||||||||
"sync" | ||||||||||||||||
"sync/atomic" | ||||||||||||||||
"testing" | ||||||||||||||||
"time" | ||||||||||||||||
|
||||||||||||||||
|
@@ -2142,6 +2144,67 @@ func TestIntegration_AdminBackup(t *testing.T) { | |||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. | ||||||||||||||||
func TestIntegration_DirectPathFallback(t *testing.T) { | ||||||||||||||||
ctx := context.Background() | ||||||||||||||||
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) | ||||||||||||||||
if err != nil { | ||||||||||||||||
t.Fatal(err) | ||||||||||||||||
} | ||||||||||||||||
defer cleanup() | ||||||||||||||||
|
||||||||||||||||
if !testEnv.Config().AttemptDirectPath { | ||||||||||||||||
return | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
if err := populatePresidentsGraph(table); err != nil { | ||||||||||||||||
t.Fatal(err) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Precondition: wait for DirectPath to connect. | ||||||||||||||||
countEnough := examineTraffic(ctx, testEnv, table /*blackholeDP = */, false) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry I want to make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Those types of comments are not ordinarily done in Go style, which is why |
||||||||||||||||
if !countEnough { | ||||||||||||||||
t.Fatalf("Failed to observe RPCs over DirectPath") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. | ||||||||||||||||
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, true) | ||||||||||||||||
countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, true) | ||||||||||||||||
if !countEnough { | ||||||||||||||||
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// Disable the blackhole, and client should use DirectPath again. | ||||||||||||||||
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, false) | ||||||||||||||||
countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, false) | ||||||||||||||||
if !countEnough { | ||||||||||||||||
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// examineTraffic counts RPCs use DirectPath or CFE traffic. | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Is that right? I'm not sure if there is a more clear way to say it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes this is correct. |
||||||||||||||||
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, expectDP bool) bool { | ||||||||||||||||
var numCount uint64 | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||
const ( | ||||||||||||||||
numRPCsToSend = 20 | ||||||||||||||||
minCompleteRPC = 40 | ||||||||||||||||
) | ||||||||||||||||
|
||||||||||||||||
countEnough := false | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we return as soon as we set this to true, thereby avoiding the need for this variable at all? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I have removed the variable. |
||||||||||||||||
start := time.Now() | ||||||||||||||||
for !countEnough && time.Since(start) < 2*time.Minute { | ||||||||||||||||
for i := 0; i < numRPCsToSend; i++ { | ||||||||||||||||
_, _ = table.ReadRow(ctx, "jadams") | ||||||||||||||||
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != expectDP { | ||||||||||||||||
atomic.AddUint64(&numCount, 1) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'm missing something -- is this run in parallel? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The bigtable client will open 4 channels by default (https://github.com/googleapis/google-cloud-go/blob/master/bigtable/bigtable.go#L78), so they could be run in parallel. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This particular code will not run in parallel. The code you linked to is for pooling the underlying network connections. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing out. I removed the atomic operation. |
||||||||||||||||
} | ||||||||||||||||
time.Sleep(100 * time.Millisecond) | ||||||||||||||||
countEnough = numCount >= minCompleteRPC | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the suggestion to return early above, the check should be above the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
return countEnough | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { | ||||||||||||||||
testEnv, err := NewIntegrationEnv() | ||||||||||||||||
if err != nil { | ||||||||||||||||
|
@@ -2259,3 +2322,38 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { | |||||||||||||||
// DirectPath ipv6 can use either ipv4 or ipv6 traffic. | ||||||||||||||||
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we split this into two functions, one corresponding to That would get rid of the need for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||||
blackholeDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j DROP && sleep 5 && echo blackholeDpv6" | ||||||||||||||||
blackholeDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j DROP && sleep 5 && echo blackholeDpv4" | ||||||||||||||||
allowDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j ACCEPT && sleep 5 && echo allowDpv6" | ||||||||||||||||
allowDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j ACCEPT && sleep 5 && echo allowDpv4" | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes me really nervous. I don't expect a test to need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test itself is a bit tricky because we want to create a bad situation to make the client think it can not receive dp response (i.e., blackholed), so that we can test fallback. Previously in java, we modified the grpc netty library to achieve the blackhole (code here: https://github.com/googleapis/java-bigtable/blob/master/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/DirectPathFallbackIT.java#L184), but this is very hacky and sorry I have no idea how to do a similar thing in Go. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed that this seems really problematic as-is. I'm more familiar with the HTTP clients but if I were testing something similar there, I would look into either setting up a small mock server to hit for my test, or using a custom roundtripper at the transport layer to produce the behavior I was looking for. Is it possible to do something similar in grpc? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this test, we want to create a low level failure on TCP/IP, and grpc does not have any API to do that. Besides, a mock server is more like what we will use in unit tests, but this is an integration test and we want to make it as real as possible, so we may not want to use a mock server. When we are running the tests, we have the ownership of the VM that runs the test (create the VM, use Also notice that users are not supposed to run this test, since users are not supposed to know if the traffic is DirectPath or CFE. I added a check for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From @igorbernstein2, this go test might help us out: https://github.com/grpc/grpc-go/blob/ff1fc890e43ac77e922f53a2cef396b3c6a8f2a1/interop/grpclb_fallback/client.go#L78 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with the feedback, tests should avoid mutating the machine state. Where the network failures are injected in process. It appears that grpc-go has some hooks to do something similar via the DialContext. the grpclb_fallback test that @kolea2 linked to, seems to use those hooks to test this exact scenario There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kolea2 @igorbernstein2 The grpclb_fallback test is using shell command to blackhole ip, https://github.com/grpc/grpc-go/blob/ff1fc890e43ac77e922f53a2cef396b3c6a8f2a1/interop/grpclb_fallback/client.go#L202-L207. And from what I know the command is iptables. So it seems we still need to use iptables here? |
||||||||||||||||
|
||||||||||||||||
if testEnv.Config().DirectPathIPV4Only { | ||||||||||||||||
if blackholeDP { | ||||||||||||||||
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) | ||||||||||||||||
out, _ := cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
} else { | ||||||||||||||||
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) | ||||||||||||||||
out, _ := cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
} | ||||||||||||||||
} else { | ||||||||||||||||
if blackholeDP { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow is related. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. Thanks for catching the format! |
||||||||||||||||
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) | ||||||||||||||||
out, _ := cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) | ||||||||||||||||
out, _ = cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
} else { | ||||||||||||||||
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) | ||||||||||||||||
out, _ := cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) | ||||||||||||||||
out, _ = cmdRes.CombinedOutput() | ||||||||||||||||
t.Logf(string(out)) | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this
return
ort.Skip
? Return means the test "passes." So, skipping seems more correct.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done