Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/1443 remove the JS runtime from threshold calculations #2251

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 18 additions & 11 deletions core/engine_test.go
Expand Up @@ -258,7 +258,7 @@ func TestEngine_processSamples(t *testing.T) {
})
t.Run("submetric", func(t *testing.T) {
t.Parallel()
ths, err := stats.NewThresholds([]string{`1+1==2`})
ths, err := stats.NewThresholds([]string{`value<2`})
assert.NoError(t, err)

e, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{
Expand Down Expand Up @@ -286,7 +286,10 @@ func TestEngineThresholdsWillAbort(t *testing.T) {
t.Parallel()
metric := stats.New("my_metric", stats.Gauge)

ths, err := stats.NewThresholds([]string{"1+1==3"})
// The incoming samples for the metric set it to 1.25. Considering
// the metric is of type Gauge, value > 1.25 should always fail, and
// trigger an abort.
ths, err := stats.NewThresholds([]string{"value>1.25"})
assert.NoError(t, err)
ths.Thresholds[0].AbortOnFail = true

Expand All @@ -305,7 +308,11 @@ func TestEngineAbortedByThresholds(t *testing.T) {
t.Parallel()
metric := stats.New("my_metric", stats.Gauge)

ths, err := stats.NewThresholds([]string{"1+1==3"})
// The MiniRunner sets the value of the metric to 1.25. Considering
// the metric is of type Gauge, value > 1.25 should always fail, and
// trigger an abort.
// **N.B**: a threshold returning an error, won't trigger an abort.
ths, err := stats.NewThresholds([]string{"value>1.25"})
assert.NoError(t, err)
ths.Thresholds[0].AbortOnFail = true

Expand Down Expand Up @@ -343,14 +350,14 @@ func TestEngine_processThresholds(t *testing.T) {
ths map[string][]string
abort bool
}{
"passing": {true, map[string][]string{"my_metric": {"1+1==2"}}, false},
"failing": {false, map[string][]string{"my_metric": {"1+1==3"}}, false},
"aborting": {false, map[string][]string{"my_metric": {"1+1==3"}}, true},

"submetric,match,passing": {true, map[string][]string{"my_metric{a:1}": {"1+1==2"}}, false},
"submetric,match,failing": {false, map[string][]string{"my_metric{a:1}": {"1+1==3"}}, false},
"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"1+1==2"}}, false},
"submetric,nomatch,failing": {true, map[string][]string{"my_metric{a:2}": {"1+1==3"}}, false},
"passing": {true, map[string][]string{"my_metric": {"value<2"}}, false},
"failing": {false, map[string][]string{"my_metric": {"value>1.25"}}, false},
"aborting": {false, map[string][]string{"my_metric": {"value>1.25"}}, true},

"submetric,match,passing": {true, map[string][]string{"my_metric{a:1}": {"value<2"}}, false},
"submetric,match,failing": {false, map[string][]string{"my_metric{a:1}": {"value>1.25"}}, false},
"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"value<2"}}, false},
"submetric,nomatch,failing": {true, map[string][]string{"my_metric{a:2}": {"value>1.25"}}, false},
}

for name, data := range testdata {
Expand Down
181 changes: 112 additions & 69 deletions stats/thresholds.go
Expand Up @@ -17,54 +17,35 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package stats

import (
"bytes"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/dop251/goja"

"go.k6.io/k6/lib/types"
)

const jsEnvSrc = `
function p(pct) {
return __sink__.P(pct/100.0);
};
`

var jsEnv *goja.Program

func init() {
pgm, err := goja.Compile("__env__", jsEnvSrc, true)
if err != nil {
panic(err)
}
jsEnv = pgm
}

// Threshold is a representation of a single threshold for a single metric
type Threshold struct {
// Source is the text based source of the threshold
Source string
// LastFailed is a makrer if the last testing of this threshold failed
// LastFailed is a marker if the last testing of this threshold failed
LastFailed bool
// AbortOnFail marks if a given threshold fails that the whole test should be aborted
AbortOnFail bool
// AbortGracePeriod is a the minimum amount of time a test should be running before a failing
// this threshold will abort the test
AbortGracePeriod types.NullDuration

pgm *goja.Program
rt *goja.Runtime
// parsed is the threshold expression parsed from the Source
parsed *thresholdExpression
}

func newThreshold(src string, newThreshold *goja.Runtime, abortOnFail bool, gracePeriod types.NullDuration) (*Threshold, error) {
pgm, err := goja.Compile("__threshold__", src, true)
func newThreshold(src string, abortOnFail bool, gracePeriod types.NullDuration) (*Threshold, error) {
parsedExpression, err := parseThresholdExpression(src)
if err != nil {
return nil, err
}
Expand All @@ -73,23 +54,57 @@ func newThreshold(src string, newThreshold *goja.Runtime, abortOnFail bool, grac
Source: src,
AbortOnFail: abortOnFail,
AbortGracePeriod: gracePeriod,
pgm: pgm,
rt: newThreshold,
parsed: parsedExpression,
}, nil
}

func (t Threshold) runNoTaint() (bool, error) {
v, err := t.rt.RunProgram(t.pgm)
if err != nil {
return false, err
func (t *Threshold) runNoTaint(sinks map[string]float64) (bool, error) {
// Extract the sink value for the aggregation method used in the threshold
// expression
lhs, ok := sinks[t.parsed.AggregationMethod]
if !ok {
return false, fmt.Errorf("unable to apply threshold %s over metrics; reason: "+
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next step here, in another PR, would be to tie the threshold validation with the metrics in our metrics registry. This validation is awesome, much better than what existed (or rather, what didn't exist 馃槄) before, but we can do it right after the initial execution of the init context. Right now, this script:

import { sleep } from 'k6';
import { Counter } from 'k6/metrics';
import exec from 'k6/execution';


const myCounter = new Counter('my_counter');

export const options = {
    vus: 5,
    iterations: 30,
    thresholds: {
        my_counter: ['p(95)<200'], // error, Counters do not have p()
    },
};


export default function () {
    console.log(`[t=${(new Date()) - exec.scenario.startTime}ms] VU{${exec.vu.idInTest}} ran scenario iteration ${exec.scenario.iterationInTest}`);
    myCounter.add(1);
    sleep(1);
}

will emit something like this:

INFO[0000] [t=0ms] VU{3} ran scenario iteration 0        source=console
INFO[0000] [t=0ms] VU{5} ran scenario iteration 2        source=console
INFO[0000] [t=0ms] VU{2} ran scenario iteration 1        source=console
INFO[0000] [t=0ms] VU{1} ran scenario iteration 3        source=console
INFO[0000] [t=0ms] VU{4} ran scenario iteration 4        source=console
INFO[0001] [t=1001ms] VU{2} ran scenario iteration 7     source=console
INFO[0001] [t=1001ms] VU{4} ran scenario iteration 5     source=console
INFO[0001] [t=1001ms] VU{3} ran scenario iteration 9     source=console
INFO[0001] [t=1001ms] VU{5} ran scenario iteration 6     source=console
INFO[0001] [t=1001ms] VU{1} ran scenario iteration 8     source=console
ERRO[0002] Threshold error                               component=engine error="threshold 0 run error: unable to apply threshold p(95)<200 over metrics; reason: no metric supporting the p(95) aggregation method found" m=my_counter

but right now there is nothing preventing us from validating the thresholds before we actually start running the script, registry should have all of the metrics (custom or built-in) and their types right after the last line here:

k6/cmd/run.go

Lines 113 to 115 in 23575eb

registry := metrics.NewRegistry()
builtinMetrics := metrics.RegisterBuiltinMetrics(registry)
initRunner, err := newRunner(logger, src, runType, filesystems, runtimeOptions, builtinMetrics, registry)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing #1346 can probably be done at the same time as 猬嗭笍 , but I'm not finding an already open issue for it. @oleiade, do you want to create one or should I?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, thinking about it, #2320 will also be easier when we don't have to do any validation in Thresholds.Run() 馃 馃帀 Each threshold can be "pre-compiled" by just turning it into a Go lambda or something like that. Once we do the validation once after the init context execution, that a threshold impression is valid for the metric type it is defined for, we shouldn't need any more validation afterwards 馃帀

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good points indeed! I've created #2330 to keep track of it 馃帀

"no metric supporting the %s aggregation method found",
t.Source,
t.parsed.AggregationMethod)
}
return v.ToBoolean(), nil

// Apply the threshold expression operator to the left and
// right hand side values
var passes bool
switch t.parsed.Operator {
case ">":
passes = lhs > t.parsed.Value
case ">=":
passes = lhs >= t.parsed.Value
case "<=":
passes = lhs <= t.parsed.Value
case "<":
passes = lhs < t.parsed.Value
case "==", "===":
// Considering a sink always maps to float64 values,
// strictly equal is equivalent to loosely equal
passes = lhs == t.parsed.Value
case "!=":
passes = lhs != t.parsed.Value
default:
// The parseThresholdExpression function should ensure that no invalid
// operator gets through, but let's protect our future selves anyhow.
return false, fmt.Errorf("unable to apply threshold %s over metrics; "+
"reason: %s is an invalid operator",
t.Source,
t.parsed.Operator,
)
}

// Perform the actual threshold verification
return passes, nil
}

func (t *Threshold) run() (bool, error) {
b, err := t.runNoTaint()
t.LastFailed = !b
return b, err
func (t *Threshold) run(sinks map[string]float64) (bool, error) {
passes, err := t.runNoTaint(sinks)
t.LastFailed = !passes
return passes, err
}

type thresholdConfig struct {
Expand All @@ -98,11 +113,11 @@ type thresholdConfig struct {
AbortGracePeriod types.NullDuration `json:"delayAbortEval"`
}

//used internally for JSON marshalling
// used internally for JSON marshalling
type rawThresholdConfig thresholdConfig

func (tc *thresholdConfig) UnmarshalJSON(data []byte) error {
//shortcircuit unmarshalling for simple string format
// shortcircuit unmarshalling for simple string format
if err := json.Unmarshal(data, &tc.Threshold); err == nil {
return nil
}
Expand All @@ -122,9 +137,9 @@ func (tc thresholdConfig) MarshalJSON() ([]byte, error) {

// Thresholds is the combination of all Thresholds for a given metric
type Thresholds struct {
Runtime *goja.Runtime
Thresholds []*Threshold
Abort bool
sinked map[string]float64
}

// NewThresholds returns Thresholds objects representing the provided source strings
Expand All @@ -138,60 +153,88 @@ func NewThresholds(sources []string) (Thresholds, error) {
}

func newThresholdsWithConfig(configs []thresholdConfig) (Thresholds, error) {
rt := goja.New()
if _, err := rt.RunProgram(jsEnv); err != nil {
return Thresholds{}, fmt.Errorf("threshold builtin error: %w", err)
}
thresholds := make([]*Threshold, len(configs))
sinked := make(map[string]float64)

ts := make([]*Threshold, len(configs))
for i, config := range configs {
t, err := newThreshold(config.Threshold, rt, config.AbortOnFail, config.AbortGracePeriod)
t, err := newThreshold(config.Threshold, config.AbortOnFail, config.AbortGracePeriod)
if err != nil {
return Thresholds{}, fmt.Errorf("threshold %d error: %w", i, err)
}
ts[i] = t
thresholds[i] = t
}

return Thresholds{rt, ts, false}, nil
return Thresholds{thresholds, false, sinked}, nil
}

func (ts *Thresholds) updateVM(sink Sink, t time.Duration) error {
ts.Runtime.Set("__sink__", sink)
f := sink.Format(t)
for k, v := range f {
ts.Runtime.Set(k, v)
}
return nil
}

func (ts *Thresholds) runAll(t time.Duration) (bool, error) {
succ := true
for i, th := range ts.Thresholds {
b, err := th.run()
func (ts *Thresholds) runAll(timeSpentInTest time.Duration) (bool, error) {
succeeded := true
for i, threshold := range ts.Thresholds {
b, err := threshold.run(ts.sinked)
if err != nil {
return false, fmt.Errorf("threshold %d run error: %w", i, err)
}

if !b {
succ = false
succeeded = false

if ts.Abort || !th.AbortOnFail {
if ts.Abort || !threshold.AbortOnFail {
continue
}

ts.Abort = !th.AbortGracePeriod.Valid ||
th.AbortGracePeriod.Duration < types.Duration(t)
ts.Abort = !threshold.AbortGracePeriod.Valid ||
threshold.AbortGracePeriod.Duration < types.Duration(timeSpentInTest)
}
}
return succ, nil

return succeeded, nil
}

// Run processes all the thresholds with the provided Sink at the provided time and returns if any
// of them fails
func (ts *Thresholds) Run(sink Sink, t time.Duration) (bool, error) {
if err := ts.updateVM(sink, t); err != nil {
return false, err
func (ts *Thresholds) Run(sink Sink, duration time.Duration) (bool, error) {
// Initialize the sinks store
ts.sinked = make(map[string]float64)

// FIXME: Remove this comment as soon as the stats.Sink does not expose Format anymore.
//
// As of December 2021, this block reproduces the behavior of the
// stats.Sink.Format behavior. As we intend to try to get away from it,
// we instead implement the behavior directly here.
//
// For more details, see https://github.com/grafana/k6/issues/2320
switch sinkImpl := sink.(type) {
case *CounterSink:
ts.sinked["count"] = sinkImpl.Value
ts.sinked["rate"] = sinkImpl.Value / (float64(duration) / float64(time.Second))
case *GaugeSink:
ts.sinked["value"] = sinkImpl.Value
case *TrendSink:
ts.sinked["min"] = sinkImpl.Min
ts.sinked["max"] = sinkImpl.Max
ts.sinked["avg"] = sinkImpl.Avg
ts.sinked["med"] = sinkImpl.Med

// Parse the percentile thresholds and insert them in
// the sinks mapping.
for _, threshold := range ts.Thresholds {
if !strings.HasPrefix(threshold.parsed.AggregationMethod, "p(") {
continue
}

ts.sinked[threshold.parsed.AggregationMethod] = sinkImpl.P(threshold.parsed.AggregationValue.Float64 / 100)
}
case *RateSink:
ts.sinked["rate"] = float64(sinkImpl.Trues) / float64(sinkImpl.Total)
case DummySink:
for k, v := range sinkImpl {
ts.sinked[k] = v
}
default:
return false, fmt.Errorf("unable to run Thresholds; reason: unknown sink type")
}
return ts.runAll(t)

return ts.runAll(duration)
}

// UnmarshalJSON is implementation of json.Unmarshaler
Expand Down