Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add a DirectPath fallback integration test #3384

Merged
merged 13 commits into from Jan 13, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
94 changes: 94 additions & 0 deletions bigtable/integration_test.go
Expand Up @@ -22,10 +22,12 @@ import (
"fmt"
"math"
"math/rand"
"os/exec"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -2142,6 +2144,63 @@ func TestIntegration_AdminBackup(t *testing.T) {
}
}

// Blackhole directpath address to test fallback.
func TestIntegration_DirectPathFallback(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Blackhole directpath address to test fallback.
func TestIntegration_DirectPathFallback(t *testing.T) {
// TestIntegration_DirectPathFallback tests the fallback when the directpath address is unavailable.
func TestIntegration_DirectPathFallback(t *testing.T) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. I remember I should let the first word in the comment the same as the function name. Thanks for this!

ctx := context.Background()
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()

if err := populatePresidentsGraph(table); err != nil {
t.Fatal(err)
}

// Precondition: wait for DirectPath to connect.
countEnough := exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go style does not have inline comments like this -- it's usually a sign the function being called should be split into more than one function. Perhaps one for the true case and one for the false case. That would also hopefully make it easier to understand & debug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I do not understand this comment. I found this test https://github.com/googleapis/google-cloud-go/blob/master/bigtable/integration_test.go#L2013 also have similar inline comments. So could you give me an example about how should I write comments? Thanks for help!

if !countEnough {
t.Fatalf("Failed to observe RPCs over DirectPath")
}

// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, true)
countEnough = exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, true)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}

// Make sure that the client will start reading from IPv6 again by sending new requests and
// checking the injected IPv6 counter has been updated.
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, false)
countEnough = exerciseDirectPath(ctx, testEnv, table /*blackholeDP = */, false)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}

func exerciseDirectPath(ctx context.Context, testEnv IntegrationEnv, table *Table, isBlackhole bool) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear to me what the return value is. Perhaps a different function name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the function name to examineTraffic. The function checks and returns if enough DirectPath or CFE traffic is seen within 2 minutes.

var numCount uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider:

Suggested change
var numCount uint64
count := 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

const (
numRPCsToSend = 20
minCompleteRPC = 40
)

countEnough := false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we return as soon as we set this to true, thereby avoiding the need for this variable at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have removed the variable.

start := time.Now()
for !countEnough && time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = table.ReadRow(ctx, "jadams")
if _, useDp := isDirectPathRemoteAddress(testEnv); useDp != isBlackhole {
atomic.AddUint64(&numCount, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm missing something -- is this run in parallel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bigtable client will open 4 channels by default (https://github.com/googleapis/google-cloud-go/blob/master/bigtable/bigtable.go#L78), so they could be run in parallel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular code will not run in parallel. The code you linked to is for pooling the underlying network connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out. I removed the atomic operation.

}
time.Sleep(100 * time.Millisecond)
countEnough = numCount >= minCompleteRPC
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the suggestion to return early above, the check should be above the time.Sleep call. No need to sleep if we're already done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}
return countEnough
}

func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down Expand Up @@ -2259,3 +2318,38 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we split this into two functions, one corresponding to blackholeDP=true and one for false?

That would get rid of the need for the blackholeDP argument at all, making calls more clear and easier to debug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

blackholeDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j DROP && sleep 5 && echo blackholeDpv6"
blackholeDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j DROP && sleep 5 && echo blackholeDpv4"
allowDpv6Cmd := "sudo ip6tables -I INPUT -s 2001:4860:8040::/42 -j ACCEPT && sleep 5 && echo allowDpv6"
allowDpv4Cmd := "sudo iptables -I INPUT -s 34.126.0.0/18 -j ACCEPT && sleep 5 && echo allowDpv4"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me really nervous. I don't expect a test to need sudo or alter iptables. Is there another way to do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test itself is a bit tricky because we want to create a bad situation to make the client think it can not receive dp response (i.e., blackholed), so that we can test fallback. Previously in java, we modified the grpc netty library to achieve the blackhole (code here: https://github.com/googleapis/java-bigtable/blob/master/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/DirectPathFallbackIT.java#L184), but this is very hacky and sorry I have no idea how to do a similar thing in Go.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that this seems really problematic as-is.

I'm more familiar with the HTTP clients but if I were testing something similar there, I would look into either setting up a small mock server to hit for my test, or using a custom roundtripper at the transport layer to produce the behavior I was looking for. Is it possible to do something similar in grpc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test, we want to create a low level failure on TCP/IP, and grpc does not have any API to do that. Besides, a mock server is more like what we will use in unit tests, but this is an integration test and we want to make it as real as possible, so we may not want to use a mock server.

When we are running the tests, we have the ownership of the VM that runs the test (create the VM, use sudo to setup the VM, run the test, destroy the VM..), so it will fine to use sudo without worrying about environment consistency.

Also notice that users are not supposed to run this test, since users are not supposed to know if the traffic is DirectPath or CFE. I added a check for AttemptDirectPath flag at the beginning of the test. Does this looks good?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the feedback, tests should avoid mutating the machine state.
Ideally it should follow a pattern similar to the java impl:
https://github.com/googleapis/java-bigtable/blob/master/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/it/DirectPathFallbackIT.java

Where the network failures are injected in process. It appears that grpc-go has some hooks to do something similar via the DialContext. the grpclb_fallback test that @kolea2 linked to, seems to use those hooks to test this exact scenario

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kolea2 @igorbernstein2 The grpclb_fallback test is using shell command to blackhole ip, https://github.com/grpc/grpc-go/blob/ff1fc890e43ac77e922f53a2cef396b3c6a8f2a1/interop/grpclb_fallback/client.go#L202-L207. And from what I know the command is iptables. So it seems we still need to use iptables here?


if testEnv.Config().DirectPathIPV4Only {
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
}
} else {
if blackholeDP {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
} else {
if blackholeDP {
}
return
}
if blackholeDP {

https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow is related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks for catching the format!

cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
}
}