Skip to content

Commit

Permalink
Remove the JS runtime from threshold calculations
Browse files Browse the repository at this point in the history
In this commit we replace the previously existing
thresholds condition evaluation, which was depending
on Goja's Runtime, with a new pure-Go one.

Thresholds are now parsed, and evaluated in Go, and
no JS rutimes are involved in the process anymore. It
is built upong the thresholds parser, and parser
combinators library introduced in previous commits.
  • Loading branch information
oleiade committed Nov 25, 2021
1 parent f5754a1 commit 28cd94b
Show file tree
Hide file tree
Showing 3 changed files with 506 additions and 182 deletions.
29 changes: 18 additions & 11 deletions core/engine_test.go
Expand Up @@ -258,7 +258,7 @@ func TestEngine_processSamples(t *testing.T) {
})
t.Run("submetric", func(t *testing.T) {
t.Parallel()
ths, err := stats.NewThresholds([]string{`1+1==2`})
ths, err := stats.NewThresholds([]string{`value<2`})
assert.NoError(t, err)

e, _, wait := newTestEngine(t, nil, nil, nil, lib.Options{
Expand Down Expand Up @@ -286,7 +286,10 @@ func TestEngineThresholdsWillAbort(t *testing.T) {
t.Parallel()
metric := stats.New("my_metric", stats.Gauge)

ths, err := stats.NewThresholds([]string{"1+1==3"})
// The incoming samples for the metric set it to 1.25. Considering
// the metric is of type Gauge, value > 1.25 should always fail, and
// trigger an abort.
ths, err := stats.NewThresholds([]string{"value>1.25"})
assert.NoError(t, err)
ths.Thresholds[0].AbortOnFail = true

Expand All @@ -305,7 +308,11 @@ func TestEngineAbortedByThresholds(t *testing.T) {
t.Parallel()
metric := stats.New("my_metric", stats.Gauge)

ths, err := stats.NewThresholds([]string{"1+1==3"})
// The MiniRunner sets the value of the metric to 1.25. Considering
// the metric is of type Gauge, value > 1.25 should always fail, and
// trigger an abort.
// **N.B**: a threshold returning an error, won't trigger an abort.
ths, err := stats.NewThresholds([]string{"value>1.25"})
assert.NoError(t, err)
ths.Thresholds[0].AbortOnFail = true

Expand Down Expand Up @@ -343,14 +350,14 @@ func TestEngine_processThresholds(t *testing.T) {
ths map[string][]string
abort bool
}{
"passing": {true, map[string][]string{"my_metric": {"1+1==2"}}, false},
"failing": {false, map[string][]string{"my_metric": {"1+1==3"}}, false},
"aborting": {false, map[string][]string{"my_metric": {"1+1==3"}}, true},

"submetric,match,passing": {true, map[string][]string{"my_metric{a:1}": {"1+1==2"}}, false},
"submetric,match,failing": {false, map[string][]string{"my_metric{a:1}": {"1+1==3"}}, false},
"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"1+1==2"}}, false},
"submetric,nomatch,failing": {true, map[string][]string{"my_metric{a:2}": {"1+1==3"}}, false},
"passing": {true, map[string][]string{"my_metric": {"value<2"}}, false},
"failing": {false, map[string][]string{"my_metric": {"value>1.25"}}, false},
"aborting": {false, map[string][]string{"my_metric": {"value>1.25"}}, true},

"submetric,match,passing": {true, map[string][]string{"my_metric{a:1}": {"value<2"}}, false},
"submetric,match,failing": {false, map[string][]string{"my_metric{a:1}": {"value>1.25"}}, false},
"submetric,nomatch,passing": {true, map[string][]string{"my_metric{a:2}": {"value<2"}}, false},
"submetric,nomatch,failing": {true, map[string][]string{"my_metric{a:2}": {"value>1.25"}}, false},
}

for name, data := range testdata {
Expand Down
242 changes: 173 additions & 69 deletions stats/thresholds.go
Expand Up @@ -17,7 +17,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package stats

import (
Expand All @@ -26,70 +25,181 @@ import (
"fmt"
"time"

"github.com/dop251/goja"

"go.k6.io/k6/lib/types"
)

const jsEnvSrc = `
function p(pct) {
return __sink__.P(pct/100.0);
};
`

var jsEnv *goja.Program

func init() {
pgm, err := goja.Compile("__env__", jsEnvSrc, true)
if err != nil {
panic(err)
}
jsEnv = pgm
}

// Threshold is a representation of a single threshold for a single metric
type Threshold struct {
// Source is the text based source of the threshold
Source string
// LastFailed is a makrer if the last testing of this threshold failed
// LastFailed is a marker if the last testing of this threshold failed
LastFailed bool
// AbortOnFail marks if a given threshold fails that the whole test should be aborted
AbortOnFail bool
// AbortGracePeriod is a the minimum amount of time a test should be running before a failing
// this threshold will abort the test
AbortGracePeriod types.NullDuration

pgm *goja.Program
rt *goja.Runtime
// parsed is the threshold condition parsed from the Source expression
parsed *thresholdCondition
}

func newThreshold(src string, newThreshold *goja.Runtime, abortOnFail bool, gracePeriod types.NullDuration) (*Threshold, error) {
pgm, err := goja.Compile("__threshold__", src, true)
func newThreshold(src string, abortOnFail bool, gracePeriod types.NullDuration) (*Threshold, error) {
condition, err := parseThresholdCondition(src)
if err != nil {
return nil, err
}

return &Threshold{
Source: src,
parsed: condition,
AbortOnFail: abortOnFail,
AbortGracePeriod: gracePeriod,
pgm: pgm,
rt: newThreshold,
}, nil
}

func (t Threshold) runNoTaint() (bool, error) {
v, err := t.rt.RunProgram(t.pgm)
if err != nil {
return false, err
func (t *Threshold) runNoTaint(sinks map[string]float64) (bool, error) {
// Extract the sink value for the aggregation method used in the threshold
// expression
lhs, ok := sinks[t.parsed.AggregationMethod]
if !ok {
return false, fmt.Errorf("unable to apply threshold %s over metrics; reason: "+
"no metric supporting the %s aggregation method found",
t.parsed.AggregationMethod,
t.parsed.AggregationMethod)
}

// Apply the threshold expression operator to the left and
// right hand side values
var passes bool
switch t.parsed.Operator {
case ">":
passes = lhs > t.parsed.Value
case ">=":
passes = lhs >= t.parsed.Value
case "<=":
passes = lhs <= t.parsed.Value
case "<":
passes = lhs < t.parsed.Value
case "==":
passes = lhs == t.parsed.Value
case "===":
// Considering a sink always maps to float64 values,
// strictly equal is equivalent to loosely equal
passes = lhs == t.parsed.Value
case "!=":
passes = lhs != t.parsed.Value
default:
// The ParseThresholdCondition constructor should ensure that no invalid
// operator gets through, but let's protect our future selves anyhow.
return false, fmt.Errorf("unable to apply threshold %s over metrics; "+
"reason: %s is an invalid operator",
t.Source,
t.parsed.Operator,
)
}
return v.ToBoolean(), nil

// Perform the actual threshold verification
return passes, nil
}

func (t *Threshold) run() (bool, error) {
b, err := t.runNoTaint()
t.LastFailed = !b
return b, err
func (t *Threshold) run(sinks map[string]float64) (bool, error) {
passes, err := t.runNoTaint(sinks)
t.LastFailed = !passes
return passes, err
}

type thresholdCondition struct {
AggregationMethod string
Operator string
Value float64
}

// ParseThresholdCondition parses a threshold condition expression,
// as defined in a JS script (for instance p(95)<1000), into a ThresholdCondition
// instance, using our parser combinators package.

// This parser expect a threshold expression matching the following BNF
//
// ```
// assertion -> aggregation_method whitespace* operator whitespace* float
// aggregation_method -> trend | rate | gauge | counter
// counter -> "count" | "sum" | "rate"
// gauge -> "last" | "min" | "max" | "value"
// rate -> "rate"
// trend -> "min" | "mean" | "avg" | "max" | percentile
// percentile -> "p(" float ")"
// operator -> ">" | ">=" | "<=" | "<" | "==" | "===" | "!="
// float -> digit+ (. digit+)?
// digit -> "0" | "1" | "2" | "3" | "4" | "5" | "6" | "7" | "8" | "9"
// whitespace -> space | tab
// tab -> "\t"
// space -> " "
// ```
func parseThresholdCondition(expression string) (*thresholdCondition, error) {
parser := ParseAssertion()

// Parse the Threshold as provided in the JS script options thresholds value (p(95)<1000)
result := parser([]rune(expression))
if result.Err != nil {
return nil, fmt.Errorf("parsing threshold condition %s failed; "+
"reason: the parser failed on %s",
expression,
result.Err.ErrorAtChar([]rune(expression)))
}

// The Sequence combinator will return a slice of interface{}
// instances. Up to us to decide what we want to cast them down
// to.
// Considering our expression format, the parser should return a slice
// of size 3 to us: aggregation_method operator sink_value. The type system
// ensures us it should be the case too, but let's protect our future selves anyhow.
var ok bool
parsed, ok := result.Payload.([]interface{})
if !ok {
return nil, fmt.Errorf("parsing threshold condition %s failed"+
"; reason: unable to cast parsed expression to []interface{}"+
"it looks like you've found a bug, we'd be grateful if you would consider "+
"opening an issue in the K6 repository (https://github.com/grafana/k6/issues/new)",
expression,
)
} else if len(parsed) != 3 {
return nil, fmt.Errorf("parsing threshold condition %s failed"+
"; reason: parsed %d expression tokens, expected 3 (aggregation_method operator value, as in rate<100)"+
"it looks like you've found a bug, we'd be grateful if you would consider "+
"opening an issue in the K6 repository (https://github.com/grafana/k6/issues/new)",
expression,
len(parsed),
)
}

// Unpack the various components of the parsed threshold expression
method, ok := parsed[0].(string)
if !ok {
return nil, fmt.Errorf("the threshold expression parser failed; " +
"reason: unable to cast parsed aggregation method to string" +
"it looks like you've found a bug, we'd be grateful if you would consider " +
"opening an issue in the K6 repository (https://github.com/grafana/k6/issues/new)",
)
}
operator, ok := parsed[1].(string)
if !ok {
return nil, fmt.Errorf("the threshold expression parser failed; " +
"reason: unable to cast parsed operator to string" +
"it looks like you've found a bug, we'd be grateful if you would consider " +
"opening an issue in the K6 repository (https://github.com/grafana/k6/issues/new)",
)
}

value, ok := parsed[2].(float64)
if !ok {
return nil, fmt.Errorf("the threshold expression parser failed; " +
"reason: unable to cast parsed value to underlying type (float64)" +
"it looks like you've found a bug, we'd be grateful if you would consider " +
"opening an issue in the K6 repository (https://github.com/grafana/k6/issues/new)",
)
}

return &thresholdCondition{AggregationMethod: method, Operator: operator, Value: value}, nil
}

type thresholdConfig struct {
Expand All @@ -98,11 +208,11 @@ type thresholdConfig struct {
AbortGracePeriod types.NullDuration `json:"delayAbortEval"`
}

//used internally for JSON marshalling
// used internally for JSON marshalling
type rawThresholdConfig thresholdConfig

func (tc *thresholdConfig) UnmarshalJSON(data []byte) error {
//shortcircuit unmarshalling for simple string format
// shortcircuit unmarshalling for simple string format
if err := json.Unmarshal(data, &tc.Threshold); err == nil {
return nil
}
Expand All @@ -122,9 +232,9 @@ func (tc thresholdConfig) MarshalJSON() ([]byte, error) {

// Thresholds is the combination of all Thresholds for a given metric
type Thresholds struct {
Runtime *goja.Runtime
Thresholds []*Threshold
Abort bool
sinked map[string]float64
}

// NewThresholds returns Thresholds objects representing the provided source strings
Expand All @@ -138,60 +248,54 @@ func NewThresholds(sources []string) (Thresholds, error) {
}

func newThresholdsWithConfig(configs []thresholdConfig) (Thresholds, error) {
rt := goja.New()
if _, err := rt.RunProgram(jsEnv); err != nil {
return Thresholds{}, fmt.Errorf("threshold builtin error: %w", err)
}
thresholds := make([]*Threshold, len(configs))
sinked := make(map[string]float64)

ts := make([]*Threshold, len(configs))
for i, config := range configs {
t, err := newThreshold(config.Threshold, rt, config.AbortOnFail, config.AbortGracePeriod)
t, err := newThreshold(config.Threshold, config.AbortOnFail, config.AbortGracePeriod)
if err != nil {
return Thresholds{}, fmt.Errorf("threshold %d error: %w", i, err)
}
ts[i] = t
thresholds[i] = t
}

return Thresholds{rt, ts, false}, nil
return Thresholds{thresholds, false, sinked}, nil
}

func (ts *Thresholds) updateVM(sink Sink, t time.Duration) error {
ts.Runtime.Set("__sink__", sink)
f := sink.Format(t)
for k, v := range f {
ts.Runtime.Set(k, v)
}
return nil
}

func (ts *Thresholds) runAll(t time.Duration) (bool, error) {
succ := true
for i, th := range ts.Thresholds {
b, err := th.run()
func (ts *Thresholds) runAll(duration time.Duration) (bool, error) {
succeeded := true
for i, threshold := range ts.Thresholds {
b, err := threshold.run(ts.sinked)
if err != nil {
return false, fmt.Errorf("threshold %d run error: %w", i, err)
}

if !b {
succ = false
succeeded = false

if ts.Abort || !th.AbortOnFail {
if ts.Abort || !threshold.AbortOnFail {
continue
}

ts.Abort = !th.AbortGracePeriod.Valid ||
th.AbortGracePeriod.Duration < types.Duration(t)
ts.Abort = !threshold.AbortGracePeriod.Valid ||
threshold.AbortGracePeriod.Duration < types.Duration(duration)
}
}
return succ, nil

return succeeded, nil
}

// Run processes all the thresholds with the provided Sink at the provided time and returns if any
// of them fails
func (ts *Thresholds) Run(sink Sink, t time.Duration) (bool, error) {
if err := ts.updateVM(sink, t); err != nil {
return false, err
func (ts *Thresholds) Run(sink Sink, duration time.Duration) (bool, error) {
// Update the sinks store
ts.sinked = make(map[string]float64)
f := sink.Format(duration)
for k, v := range f {
ts.sinked[k] = v
}
return ts.runAll(t)

return ts.runAll(duration)
}

// UnmarshalJSON is implementation of json.Unmarshaler
Expand Down

0 comments on commit 28cd94b

Please sign in to comment.