Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Add a DirectPath fallback integration test #3384

Merged
merged 13 commits into from Jan 13, 2021
Merged
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
114 changes: 114 additions & 0 deletions bigtable/integration_test.go
Expand Up @@ -22,10 +22,12 @@ import (
"fmt"
"math"
"math/rand"
"os/exec"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -72,6 +74,10 @@ func populatePresidentsGraph(table *Table) error {
var instanceToCreate string
var instanceToCreateZone string
var instanceToCreateZone2 string
var blackholeDpv6Cmd string
var blackholeDpv4Cmd string
var allowDpv6Cmd string
var allowDpv4Cmd string

func init() {
// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
Expand All @@ -81,6 +87,10 @@ func init() {
"The zone in which to create the new test instance.")
flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
"The zone in which to create a second cluster in the test instance.")
flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
Comment on lines +91 to +94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment with the expected values for these flags?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added.

}

func TestIntegration_ConditionalMutations(t *testing.T) {
Expand Down Expand Up @@ -2142,6 +2152,80 @@ func TestIntegration_AdminBackup(t *testing.T) {
}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
ctx := context.Background()
testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
if err != nil {
t.Fatal(err)
}
defer cleanup()

if !testEnv.Config().AttemptDirectPath {
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return or t.Skip? Return means the test "passes." So, skipping seems more correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

if len(blackholeDpv6Cmd) == 0 {
t.Fatal("-it.blackhole-dpv6-cmd unset")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this will fail the test if the flags aren't passed in. Perhaps we should skip instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have that 3 lines above, from L2164 to L2166:

if !testEnv.Config().AttemptDirectPath {
  return
}

If the test want to test DirectPath, then we can fail the test if no command is given.

}
if len(blackholeDpv4Cmd) == 0 {
t.Fatal("-it.blackhole-dpv4-cmd unset")
}
if len(allowDpv6Cmd) == 0 {
t.Fatal("-it.allowdpv6-cmd unset")
}
if len(allowDpv4Cmd) == 0 {
t.Fatal("-it.allowdpv4-cmd unset")
}

if err := populatePresidentsGraph(table); err != nil {
t.Fatal(err)
}

// Precondition: wait for DirectPath to connect.
countEnough := examineTraffic(ctx, testEnv, table /*blackholeDP = */, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this blackholdDP = comment means.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I want to make /*blackholeDP=*/ after the comma, like countEnough := examineTraffic(ctx, testEnv, table, /*blackholeDP = */false), to indicate the meaning of the parameter, i.e., DirectPath is blackholed or not, because it could be confusing with only true and false here. However, gofmt will move the comment before the comma.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those types of comments are not ordinarily done in Go style, which is why gofmt doesn't handle them well -- they're uncommon and not designed for. I gave a few suggestions below of ways we could simplify.

if !countEnough {
t.Fatalf("Failed to observe RPCs over DirectPath")
}

// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, true)
countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, true)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}

// Disable the blackhole, and client should use DirectPath again.
blackholeOrAllowDirectPath(testEnv, t /*blackholeDP = */, false)
countEnough = examineTraffic(ctx, testEnv, table /*blackholeDP = */, false)
if !countEnough {
t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
}
}

// examineTraffic counts RPCs use DirectPath or CFE traffic.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// examineTraffic counts RPCs use DirectPath or CFE traffic.
// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).

Is that right? I'm not sure if there is a more clear way to say it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is correct.

func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, expectDP bool) bool {
var numCount uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider:

Suggested change
var numCount uint64
count := 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

const (
numRPCsToSend = 20
minCompleteRPC = 40
)

countEnough := false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we return as soon as we set this to true, thereby avoiding the need for this variable at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I have removed the variable.

start := time.Now()
for !countEnough && time.Since(start) < 2*time.Minute {
for i := 0; i < numRPCsToSend; i++ {
_, _ = table.ReadRow(ctx, "jadams")
if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != expectDP {
atomic.AddUint64(&numCount, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm missing something -- is this run in parallel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bigtable client will open 4 channels by default (https://github.com/googleapis/google-cloud-go/blob/master/bigtable/bigtable.go#L78), so they could be run in parallel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular code will not run in parallel. The code you linked to is for pooling the underlying network connections.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out. I removed the atomic operation.

}
time.Sleep(100 * time.Millisecond)
countEnough = numCount >= minCompleteRPC
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the suggestion to return early above, the check should be above the time.Sleep call. No need to sleep if we're already done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}
return countEnough
}

func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
testEnv, err := NewIntegrationEnv()
if err != nil {
Expand Down Expand Up @@ -2259,3 +2343,33 @@ func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

func blackholeOrAllowDirectPath(testEnv IntegrationEnv, t *testing.T, blackholeDP bool) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we split this into two functions, one corresponding to blackholeDP=true and one for false?

That would get rid of the need for the blackholeDP argument at all, making calls more clear and easier to debug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if testEnv.Config().DirectPathIPV4Only {
if blackholeDP {
cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
}
} else {
if blackholeDP {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
} else {
if blackholeDP {
}
return
}
if blackholeDP {

https://github.com/golang/go/wiki/CodeReviewComments#indent-error-flow is related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Thanks for catching the format!

cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
} else {
cmdRes := exec.Command("bash", "-c", allowDpv4Cmd)
out, _ := cmdRes.CombinedOutput()
t.Logf(string(out))
cmdRes = exec.Command("bash", "-c", allowDpv6Cmd)
out, _ = cmdRes.CombinedOutput()
t.Logf(string(out))
}
}
}