Skip to content

Commit

Permalink
Add support for native distributed execution
Browse files Browse the repository at this point in the history
This adds the `k6 agent` and `k6 coordinator` sub-commands and adds a very simple way to do distributed execution, including packaging and sending the script to agents, and setup() and teardown() handling. However, it doesn't include automatic metric handling (e.g. thresholds and the end-of-test summary).
  • Loading branch information
na-- committed Dec 13, 2023
1 parent 19a30e2 commit 2443ac6
Show file tree
Hide file tree
Showing 10 changed files with 1,529 additions and 6 deletions.
125 changes: 125 additions & 0 deletions cmd/agent.go
@@ -0,0 +1,125 @@
package cmd

import (
"bytes"
"encoding/json"

"github.com/spf13/afero"
"github.com/spf13/cobra"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/execution"
"go.k6.io/k6/execution/distributed"
"go.k6.io/k6/js"
"go.k6.io/k6/lib"
"go.k6.io/k6/loader"
"go.k6.io/k6/metrics"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/guregu/null.v3"
)

// TODO: a whole lot of cleanup, refactoring, error handling and hardening
func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
c := &cmdsRunAndAgent{gs: gs}

c.loadConfiguredTest = func(cmd *cobra.Command, args []string) (
*loadedAndConfiguredTest, execution.Controller, error,
) {
// TODO: add some gRPC authentication
conn, err := grpc.Dial(args[0], grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, err
}
c.testEndHook = func(err error) {
gs.Logger.Debugf("k6 agent run ended with err=%s", err)
_ = conn.Close()
}

client := distributed.NewDistributedTestClient(conn)

resp, err := client.Register(gs.Ctx, &distributed.RegisterRequest{})
if err != nil {
return nil, nil, err
}

controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger)
if err != nil {
return nil, nil, err
}

var options lib.Options
if err = json.Unmarshal(resp.Options, &options); err != nil {
return nil, nil, err
}

arc, err := lib.ReadArchive(bytes.NewReader(resp.Archive))
if err != nil {
return nil, nil, err
}

registry := metrics.NewRegistry()
piState := &lib.TestPreInitState{
Logger: gs.Logger,
RuntimeOptions: lib.RuntimeOptions{
NoThresholds: null.BoolFrom(true),
NoSummary: null.BoolFrom(true),
Env: arc.Env,
CompatibilityMode: null.StringFrom(arc.CompatibilityMode),
},
Registry: registry,
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
}

initRunner, err := js.NewFromArchive(piState, arc)
if err != nil {
return nil, nil, err
}

test := &loadedTest{
pwd: arc.Pwd,
sourceRootPath: arc.Filename,
source: &loader.SourceData{
Data: resp.Archive,
URL: arc.FilenameURL,
},
fs: afero.NewMemMapFs(), // TODO: figure out what should be here
fileSystems: arc.Filesystems,
preInitState: piState,
initRunner: initRunner,
}

pseudoConsoldatedConfig := applyDefault(Config{Options: options})
for _, thresholds := range pseudoConsoldatedConfig.Thresholds {
if err = thresholds.Parse(); err != nil {
return nil, nil, err
}
}
derivedConfig, err := deriveAndValidateConfig(pseudoConsoldatedConfig, initRunner.IsExecutable, gs.Logger)
if err != nil {
return nil, nil, err
}

configuredTest := &loadedAndConfiguredTest{
loadedTest: test,
consolidatedConfig: pseudoConsoldatedConfig,
derivedConfig: derivedConfig,
}

gs.Flags.Address = "" // TODO: fix, this is a hack so agents don't start an API server

return configuredTest, controller, nil // TODO
}

agentCmd := &cobra.Command{
Use: "agent",
Short: "Join a distributed load test",
Long: `TODO`,
Args: exactArgsWithMsg(1, "arg should either the IP and port of the controller k6 instance"),
RunE: c.run,
Hidden: true, // TODO: remove when officially released
}

// TODO: add flags

return agentCmd
}
85 changes: 85 additions & 0 deletions cmd/coordinator.go
@@ -0,0 +1,85 @@
package cmd

import (
"net"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.k6.io/k6/cmd/state"
"go.k6.io/k6/execution/distributed"
"google.golang.org/grpc"
)

// cmdCoordinator handles the `k6 coordinator` sub-command
type cmdCoordinator struct {
gs *state.GlobalState
gRPCAddress string
instanceCount int
}

func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) {
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
if err != nil {
return err
}

coordinator, err := distributed.NewCoordinatorServer(
c.instanceCount, test.initRunner.MakeArchive(), c.gs.Logger,
)
if err != nil {
return err
}

c.gs.Logger.Infof("Starting gRPC server on %s", c.gRPCAddress)
listener, err := net.Listen("tcp", c.gRPCAddress)
if err != nil {
return err
}

grpcServer := grpc.NewServer() // TODO: add auth and a whole bunch of other options
distributed.RegisterDistributedTestServer(grpcServer, coordinator)

go func() {
err := grpcServer.Serve(listener)
c.gs.Logger.Debugf("gRPC server end: %s", err)
}()
coordinator.Wait()
c.gs.Logger.Infof("All done!")
return nil
}

func (c *cmdCoordinator) flagSet() *pflag.FlagSet {
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.SortFlags = false
flags.AddFlagSet(optionFlagSet())
flags.AddFlagSet(runtimeOptionFlagSet(false))

// TODO: add support bi-directional gRPC authentication and authorization
flags.StringVar(&c.gRPCAddress, "grpc-addr", "localhost:6566", "address on which to bind the gRPC server")

// TODO: add some better way to specify the test, e.g. an execution segment
// sequence + some sort of a way to map instances with specific segments
// (e.g. key-value tags that can be matched to every execution segment, with
// each instance advertising its own tags when it connects).
flags.IntVar(&c.instanceCount, "instance-count", 1, "number of distributed instances")
return flags
}

func getCmdCoordnator(gs *state.GlobalState) *cobra.Command {
c := &cmdCoordinator{
gs: gs,
}

coordinatorCmd := &cobra.Command{
Use: "coordinator",
Short: "Start a distributed load test",
Long: `TODO`,
RunE: c.run,
Hidden: true, // TODO: remove when officially released
}

coordinatorCmd.Flags().SortFlags = false
coordinatorCmd.Flags().AddFlagSet(c.flagSet())

return coordinatorCmd
}
1 change: 1 addition & 0 deletions cmd/root.go
Expand Up @@ -64,6 +64,7 @@ func newRootCommand(gs *state.GlobalState) *rootCommand {
getCmdArchive, getCmdCloud, getCmdNewScript, getCmdInspect,
getCmdLogin, getCmdPause, getCmdResume, getCmdScale, getCmdRun,
getCmdStats, getCmdStatus, getCmdVersion,
getCmdAgent, getCmdCoordnator,
}

for _, sc := range subCommands {
Expand Down
16 changes: 10 additions & 6 deletions cmd/run.go
Expand Up @@ -37,12 +37,13 @@ import (
"go.k6.io/k6/ui/pb"
)

// cmdRun handles the `k6 run` sub-command
type cmdRun struct {
// cmdsRunAndAgent handles the `k6 run` and `k6 agent` sub-commands
type cmdsRunAndAgent struct {
gs *state.GlobalState

// TODO: figure out something more elegant?
loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error)
testEndHook func(err error)
}

const (
Expand All @@ -60,14 +61,17 @@ const (
// TODO: split apart some more
//
//nolint:funlen,gocognit,gocyclo,cyclop
func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
var logger logrus.FieldLogger = c.gs.Logger
defer func() {
if err == nil {
logger.Debug("Everything has finished, exiting k6 normally!")
} else {
logger.WithError(err).Debug("Everything has finished, exiting k6 with an error!")
}
if c.testEndHook != nil {
c.testEndHook(err)
}
}()
printBanner(c.gs)

Expand Down Expand Up @@ -435,7 +439,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
return nil
}

func (c *cmdRun) flagSet() *pflag.FlagSet {
func (c *cmdsRunAndAgent) flagSet() *pflag.FlagSet {
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
flags.SortFlags = false
flags.AddFlagSet(optionFlagSet())
Expand All @@ -444,7 +448,7 @@ func (c *cmdRun) flagSet() *pflag.FlagSet {
return flags
}

func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error {
func (c *cmdsRunAndAgent) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error {
ro := test.preInitState.RuntimeOptions
if ro.TracesOutput.String == "none" {
test.preInitState.TracerProvider = trace.NewNoopTracerProvider()
Expand All @@ -461,7 +465,7 @@ func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfigu
}

func getCmdRun(gs *state.GlobalState) *cobra.Command {
c := &cmdRun{
c := &cmdsRunAndAgent{
gs: gs,
loadConfiguredTest: func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) {
test, err := loadAndConfigureLocalTest(gs, cmd, args, getConfig)
Expand Down

0 comments on commit 2443ac6

Please sign in to comment.