Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest: backport improvements to mixedversion and backup test #103912

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -146,9 +146,9 @@ func StartWithBinary(
nodes option.NodeListOption,
binaryPath string,
startOpts option.StartOpts,
) {
) error {
settings := install.MakeClusterSettings(install.BinaryOption(binaryPath))
c.Start(ctx, l, startOpts, settings, nodes)
return c.StartE(ctx, l, startOpts, settings, nodes)
}

// BinaryPathFromVersion shows where the binary for the given version
Expand Down Expand Up @@ -200,7 +200,9 @@ func RestartNodesWithNewBinary(
if err != nil {
return err
}
StartWithBinary(ctx, l, c, c.Node(node), binary, startOpts)
if err := StartWithBinary(ctx, l, c, c.Node(node), binary, startOpts); err != nil {
return err
}

// We have seen cases where a transient error could occur when this
// newly upgraded node serves as a gateway for a distributed query due
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/BUILD.bazel
Expand Up @@ -20,8 +20,10 @@ go_library(
"//pkg/roachprod/logger",
"//pkg/util/ctxgroup",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/version",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
133 changes: 96 additions & 37 deletions pkg/cmd/roachtest/roachtestutil/mixedversion/mixedversion.go
Expand Up @@ -72,6 +72,7 @@ import (
"fmt"
"math/rand"
"strings"
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
Expand Down Expand Up @@ -179,11 +180,14 @@ type (
// for human-consumption. Displayed when pretty-printing the test
// plan.
Description() string
// Background indicates whether the step should be run in the
// background. When a step is *not* run in the background, the
// test will wait for it to finish before moving on. When a
// background step fails, the entire test fails.
Background() bool
// Background returns a channel that controls the execution of a
// background step: when that channel is closed, the context
// associated with the step will be canceled. Returning `nil`
// indicates that the step should not be run in the background.
// When a step is *not* run in the background, the test will wait
// for it to finish before moving on. When a background step
// fails, the entire test fails.
Background() shouldStop
// Run implements the actual functionality of the step.
Run(context.Context, *logger.Logger, cluster.Cluster, *Helper) error
}
Expand Down Expand Up @@ -215,10 +219,29 @@ type (
seed int64
hooks *testHooks

// bgChans collects all channels that control the execution of
// background steps in the test (created with `BackgroundFunc`,
// `Workload`, et al.). These channels are passsed to the test
// runner, and the test author can stop a background function by
// closing the channel. When that happens, the test runner will
// cancel the underlying context.
//
// Invariant: there is exactly one channel in `bgChans` for each
// hook in `Test.hooks.background`.
bgChans []shouldStop

// test-only field, allowing us to avoid passing a test.Test
// implementation in the tests
_buildVersion version.Version
}

shouldStop chan struct{}

// StopFunc is the signature of the function returned by calls that
// create background steps. StopFuncs are meant to be called by test
// authors when they want to stop a background step as part of test
// logic itself, without causing the test to fail.
StopFunc func()
)

// NewTest creates a Test struct that users can use to create and run
Expand Down Expand Up @@ -252,6 +275,17 @@ func NewTest(
}
}

// RNG returns the underlying random number generator used by the
// mixedversion framework to generate a test plan. This rng can be
// used to make random decisions during test setup.
//
// Do NOT use the rng returned by this function in mixedversion hooks
// (functions passed to `InMixedVersion` and similar). Instead, use
// the rng instance directly passed as argument to those functions.
func (t *Test) RNG() *rand.Rand {
return t.prng
}

// InMixedVersion adds a new mixed-version hook to the test. The
// functionality in the function passed as argument to this function
// will be tested in arbitrary mixed-version states. If multiple
Expand Down Expand Up @@ -308,9 +342,19 @@ func (t *Test) AfterUpgradeFinalized(desc string, fn userFunc) {
// steps have finished). If the `userFunc` returns an error, it will
// cause the test to fail. These functions can run indefinitely but
// should respect the context passed to them, which will be canceled
// when the test terminates (successfully or not).
func (t *Test) BackgroundFunc(desc string, fn userFunc) {
// when the test terminates (successfully or not). Returns a function
// that can be called to terminate the step, which will cancel the
// context passed to `userFunc`.
func (t *Test) BackgroundFunc(desc string, fn userFunc) StopFunc {
t.hooks.AddBackground(versionUpgradeHook{name: desc, fn: fn})

ch := make(shouldStop)
t.bgChans = append(t.bgChans, ch)
var closeOnce sync.Once
// Make sure to only close the background channel once, allowing the
// caller to call the StopFunc multiple times (subsequent calls will
// be no-ops).
return func() { closeOnce.Do(func() { close(ch) }) }
}

// BackgroundCommand is a convenience wrapper around `BackgroundFunc`
Expand All @@ -322,8 +366,8 @@ func (t *Test) BackgroundFunc(desc string, fn userFunc) {
// command itself lived within the `mixed-version/*.log` files.
func (t *Test) BackgroundCommand(
desc string, nodes option.NodeListOption, cmd *roachtestutil.Command,
) {
t.BackgroundFunc(desc, t.runCommandFunc(nodes, cmd.String()))
) StopFunc {
return t.BackgroundFunc(desc, t.runCommandFunc(nodes, cmd.String()))
}

// Workload is a convenience wrapper that allows callers to run
Expand All @@ -333,7 +377,7 @@ func (t *Test) BackgroundCommand(
// command to actually run the command; it is run in the background.
func (t *Test) Workload(
name string, node option.NodeListOption, initCmd, runCmd *roachtestutil.Command,
) {
) StopFunc {
seed := uint64(t.prng.Int63())
addSeed := func(cmd *roachtestutil.Command) {
if !cmd.HasFlag("seed") {
Expand All @@ -347,7 +391,7 @@ func (t *Test) Workload(
}

addSeed(runCmd)
t.BackgroundCommand(fmt.Sprintf("%s workload", name), node, runCmd)
return t.BackgroundCommand(fmt.Sprintf("%s workload", name), node, runCmd)
}

// Run runs the mixed-version test. It should be called once all
Expand Down Expand Up @@ -385,6 +429,7 @@ func (t *Test) plan() (*TestPlan, error) {
crdbNodes: t.crdbNodes,
hooks: t.hooks,
prng: t.prng,
bgChans: t.bgChans,
}

return planner.Plan(), nil
Expand Down Expand Up @@ -414,8 +459,8 @@ type startFromCheckpointStep struct {
crdbNodes option.NodeListOption
}

func (s startFromCheckpointStep) ID() int { return s.id }
func (s startFromCheckpointStep) Background() bool { return false }
func (s startFromCheckpointStep) ID() int { return s.id }
func (s startFromCheckpointStep) Background() shouldStop { return nil }

func (s startFromCheckpointStep) Description() string {
return fmt.Sprintf("starting cluster from fixtures for version %q", s.version)
Expand All @@ -438,8 +483,7 @@ func (s startFromCheckpointStep) Run(

startOpts := option.DefaultStartOptsNoBackups()
startOpts.RoachprodOpts.Sequential = false
clusterupgrade.StartWithBinary(ctx, l, c, s.crdbNodes, binaryPath, startOpts)
return nil
return clusterupgrade.StartWithBinary(ctx, l, c, s.crdbNodes, binaryPath, startOpts)
}

// uploadCurrentVersionStep uploads the current cockroach binary to
Expand All @@ -453,8 +497,8 @@ type uploadCurrentVersionStep struct {
dest string
}

func (s uploadCurrentVersionStep) ID() int { return s.id }
func (s uploadCurrentVersionStep) Background() bool { return false }
func (s uploadCurrentVersionStep) ID() int { return s.id }
func (s uploadCurrentVersionStep) Background() shouldStop { return nil }

func (s uploadCurrentVersionStep) Description() string {
return fmt.Sprintf("upload current binary to all cockroach nodes (%v)", s.crdbNodes)
Expand All @@ -480,8 +524,8 @@ type waitForStableClusterVersionStep struct {
nodes option.NodeListOption
}

func (s waitForStableClusterVersionStep) ID() int { return s.id }
func (s waitForStableClusterVersionStep) Background() bool { return false }
func (s waitForStableClusterVersionStep) ID() int { return s.id }
func (s waitForStableClusterVersionStep) Background() shouldStop { return nil }

func (s waitForStableClusterVersionStep) Description() string {
return fmt.Sprintf(
Expand All @@ -504,8 +548,8 @@ type preserveDowngradeOptionStep struct {
prng *rand.Rand
}

func (s preserveDowngradeOptionStep) ID() int { return s.id }
func (s preserveDowngradeOptionStep) Background() bool { return false }
func (s preserveDowngradeOptionStep) ID() int { return s.id }
func (s preserveDowngradeOptionStep) Background() shouldStop { return nil }

func (s preserveDowngradeOptionStep) Description() string {
return "preventing auto-upgrades by setting `preserve_downgrade_option`"
Expand Down Expand Up @@ -539,8 +583,8 @@ type restartWithNewBinaryStep struct {
node int
}

func (s restartWithNewBinaryStep) ID() int { return s.id }
func (s restartWithNewBinaryStep) Background() bool { return false }
func (s restartWithNewBinaryStep) ID() int { return s.id }
func (s restartWithNewBinaryStep) Background() shouldStop { return nil }

func (s restartWithNewBinaryStep) Description() string {
return fmt.Sprintf("restart node %d with binary version %s", s.node, versionMsg(s.version))
Expand Down Expand Up @@ -574,8 +618,8 @@ type finalizeUpgradeStep struct {
prng *rand.Rand
}

func (s finalizeUpgradeStep) ID() int { return s.id }
func (s finalizeUpgradeStep) Background() bool { return false }
func (s finalizeUpgradeStep) ID() int { return s.id }
func (s finalizeUpgradeStep) Background() shouldStop { return nil }

func (s finalizeUpgradeStep) Description() string {
return "finalize upgrade by resetting `preserve_downgrade_option`"
Expand All @@ -597,11 +641,11 @@ type runHookStep struct {
testContext Context
prng *rand.Rand
hook versionUpgradeHook
background bool
stopChan shouldStop
}

func (s runHookStep) ID() int { return s.id }
func (s runHookStep) Background() bool { return s.background }
func (s runHookStep) ID() int { return s.id }
func (s runHookStep) Background() shouldStop { return s.stopChan }

func (s runHookStep) Description() string {
return fmt.Sprintf("run %q", s.hook.name)
Expand Down Expand Up @@ -684,15 +728,28 @@ func (h hooks) Filter(testContext Context) hooks {
// AsSteps transforms the sequence of hooks into a corresponding test
// step. If there is only one hook, the corresponding `runHookStep` is
// returned. Otherwise, a `concurrentRunStep` is returned, where every
// hook is run concurrently.
// hook is run concurrently. `stopChans` should either be `nil` for
// steps that are not meant to be run in the background, or contain
// one stop channel (`shouldStop`) for each hook.
func (h hooks) AsSteps(
label string, idGen func() int, prng *rand.Rand, testContext Context, background bool,
label string, idGen func() int, prng *rand.Rand, testContext Context, stopChans []shouldStop,
) []testStep {
steps := make([]testStep, 0, len(h))
for _, hook := range h {
stopChanFor := func(j int) shouldStop {
if len(stopChans) == 0 {
return nil
}
return stopChans[j]
}

for j, hook := range h {
hookPrng := rngFromRNG(prng)
steps = append(steps, runHookStep{
id: idGen(), prng: hookPrng, hook: hook, background: background, testContext: testContext,
id: idGen(),
prng: hookPrng,
hook: hook,
stopChan: stopChanFor(j),
testContext: testContext,
})
}

Expand Down Expand Up @@ -720,21 +777,23 @@ func (th *testHooks) AddAfterUpgradeFinalized(hook versionUpgradeHook) {
}

func (th *testHooks) StartupSteps(idGen func() int, testContext Context) []testStep {
return th.startup.AsSteps(startupLabel, idGen, th.prng, testContext, false)
return th.startup.AsSteps(startupLabel, idGen, th.prng, testContext, nil)
}

func (th *testHooks) BackgroundSteps(idGen func() int, testContext Context) []testStep {
return th.background.AsSteps(backgroundLabel, idGen, th.prng, testContext, true)
func (th *testHooks) BackgroundSteps(
idGen func() int, testContext Context, stopChans []shouldStop,
) []testStep {
return th.background.AsSteps(backgroundLabel, idGen, th.prng, testContext, stopChans)
}

func (th *testHooks) MixedVersionSteps(testContext Context, idGen func() int) []testStep {
return th.mixedVersion.
Filter(testContext).
AsSteps(mixedVersionLabel, idGen, th.prng, testContext, false)
AsSteps(mixedVersionLabel, idGen, th.prng, testContext, nil)
}

func (th *testHooks) AfterUpgradeFinalizedSteps(idGen func() int, testContext Context) []testStep {
return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, testContext, false)
return th.afterUpgradeFinalized.AsSteps(afterTestLabel, idGen, th.prng, testContext, nil)
}

func randomDelay(rng *rand.Rand) time.Duration {
Expand Down
3 changes: 2 additions & 1 deletion pkg/cmd/roachtest/roachtestutil/mixedversion/planner.go
Expand Up @@ -40,6 +40,7 @@ type (
rt test.Test
hooks *testHooks
prng *rand.Rand
bgChans []shouldStop
}
)

Expand Down Expand Up @@ -78,7 +79,7 @@ func (p *testPlanner) Plan() *TestPlan {
addSteps := func(ss []testStep) { steps = append(steps, ss...) }

addSteps(p.initSteps())
addSteps(p.hooks.BackgroundSteps(p.nextID, p.initialContext()))
addSteps(p.hooks.BackgroundSteps(p.nextID, p.initialContext(), p.bgChans))

// previous -> current
addSteps(p.upgradeSteps(p.initialVersion, clusterupgrade.MainVersion))
Expand Down