Skip to content

Commit

Permalink
Add ReadableStream support to k6 (#3696)
Browse files Browse the repository at this point in the history
* Add base implementation of ReadableStream (Streams API)

Co-authored-by: oleiade <theo@crevon.me>

* Base tooling for Web Platform Tests

Co-authored-by: oleiade <theo@crevon.me>

* Web Platform Tests for ReadableStream

Co-authored-by: oleiade <theo@crevon.me>

* Expose k6/experimental/streams

Co-authored-by: oleiade <theo@crevon.me>

* Add k6/experimental/streams example

Co-authored-by: oleiade <theo@crevon.me>

* Chore: Fix linter errors

* Resolve import cycle

* Remove handmade WPTs

* Remove test specifics from shared modulestest

* Adjust ReadableStream WPTs to use checked out code

* Add GH Workflow to run WPTs for Streams

* Apply Pull Request suggestions

* Apply Pull Request suggestions

* Apply code review suggestions

* Fix Web Platform Tests execution for streams

* Apply code review suggestions

* Fix linting issue

* Apply pull request review suggestions

---------

Co-authored-by: oleiade <theo@crevon.me>
Co-authored-by: Théo Crevon <oleiade@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 29, 2024
1 parent f38ac59 commit 3431d7d
Show file tree
Hide file tree
Showing 23 changed files with 2,795 additions and 1 deletion.
26 changes: 26 additions & 0 deletions .github/workflows/wpt.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: Web Platform Tests
on:
workflow_dispatch:
pull_request:

defaults:
run:
shell: bash

jobs:
streams:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
check-latest: true
- name: Run tests
run: |
set -x
cd js/modules/k6/experimental/streams/tests
sh checkout.sh
go test ../... -tags=wpt
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/dist
/pkg-build
/js/tc39/TestTC39
/js/modules/k6/experimental/streams/tests/wpt

.vscode
*.sublime-workspace
Expand Down
34 changes: 34 additions & 0 deletions examples/experimental/streams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { ReadableStream } from 'k6/experimental/streams'
import { setTimeout } from 'k6/timers'

function numbersStream() {
let currentNumber = 0

return new ReadableStream({
start(controller) {
const fn = () => {
if (currentNumber < 5) {
controller.enqueue(++currentNumber)
setTimeout(fn, 1000)
return;
}

controller.close()
}
setTimeout(fn, 1000)
},
})
}

export default async function () {
const stream = numbersStream()
const reader = stream.getReader()

while (true) {
const { done, value } = await reader.read()
if (done) break
console.log(`received number ${value} from stream`)
}

console.log('we are done')
}
2 changes: 2 additions & 0 deletions js/jsmodules.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.k6.io/k6/js/modules/k6/encoding"
"go.k6.io/k6/js/modules/k6/execution"
"go.k6.io/k6/js/modules/k6/experimental/fs"
"go.k6.io/k6/js/modules/k6/experimental/streams"
"go.k6.io/k6/js/modules/k6/experimental/tracing"
"go.k6.io/k6/js/modules/k6/grpc"
"go.k6.io/k6/js/modules/k6/html"
Expand All @@ -38,6 +39,7 @@ func getInternalJSModules() map[string]interface{} {
"k6/timers": timers.New(),
"k6/execution": execution.New(),
"k6/experimental/redis": redis.New(),
"k6/experimental/streams": streams.New(),
"k6/experimental/webcrypto": webcrypto.New(),
"k6/experimental/websockets": &expws.RootModule{},
"k6/experimental/timers": newWarnExperimentalModule(timers.New(),
Expand Down
119 changes: 119 additions & 0 deletions js/modules/k6/experimental/streams/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package streams

import "github.com/dop251/goja"

func newTypeError(rt *goja.Runtime, message string) *jsError {
return newJsError(rt, rt.Get("TypeError"), TypeError, message)
}

func newRangeError(rt *goja.Runtime, message string) *jsError {
return newJsError(rt, rt.Get("RangeError"), RangeError, message)
}

func newJsError(rt *goja.Runtime, base goja.Value, kind errorKind, message string) *jsError {
constructor, ok := goja.AssertConstructor(base)
if !ok {
throw(rt, newError(kind, message))
}

e, err := constructor(nil, rt.ToValue(message))
if err != nil {
throw(rt, newError(kind, message))
}

return &jsError{err: e, msg: message}
}

// jsError is a wrapper around a JS error object.
//
// We need to use it because whenever we need to return a [TypeError]
// or a [RangeError], we want to use original JS errors, which can be
// retrieved from Goja, for instance with: goja.Runtime.Get("TypeError").
//
// However, that is implemented as a [*goja.Object], but sometimes we
// need to return that error as a Go [error], or even keep the instance
// in memory to be returned/thrown later.
//
// So, we use this wrapper instead of returning the original JS error.
// Otherwise, we would need to replace everything typed as [error] with
// [any] to be compatible, and that would be a mess.
type jsError struct {
err *goja.Object
msg string
}

func (e *jsError) Error() string {
return e.msg
}

func (e *jsError) Err() *goja.Object {
return e.err
}

func newError(k errorKind, message string) *streamError {
return &streamError{
Name: k.String(),
Message: message,
kind: k,
}
}

//go:generate enumer -type=errorKind -output errors_gen.go
type errorKind uint8

const (
// TypeError is thrown when an argument is not of an expected type
TypeError errorKind = iota + 1

// RangeError is thrown when an argument is not within the expected range
RangeError

// RuntimeError is thrown when an error occurs that was caused by the JS runtime
// and is not likely caused by the user, but rather the implementation.
RuntimeError

// AssertionError is thrown when an assertion fails
AssertionError

// NotSupportedError is thrown when a feature is not supported, or not yet implemented
NotSupportedError
)

type streamError struct {
// Name contains the name of the error
Name string `json:"name"`

// Message contains the error message
Message string `json:"message"`

// kind contains the kind of error
kind errorKind
}

// Ensure that the fsError type implements the Go `error` interface
var _ error = (*streamError)(nil)

func (e *streamError) Error() string {
return e.Name + ":" + e.Message
}

func throw(rt *goja.Runtime, err any) {
if e, ok := err.(*jsError); ok {
panic(e.Err())
}

panic(errToObj(rt, err))
}

func errToObj(rt *goja.Runtime, err any) goja.Value {
// Undefined remains undefined.
if goja.IsUndefined(rt.ToValue(err)) {
return rt.ToValue(err)
}

if e, ok := err.(*goja.Exception); ok {
return e.Value().ToObject(rt)
}

return rt.ToValue(err).ToObject(rt)
}
53 changes: 53 additions & 0 deletions js/modules/k6/experimental/streams/errors_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

107 changes: 107 additions & 0 deletions js/modules/k6/experimental/streams/goja.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package streams

import (
"fmt"
"reflect"

"github.com/dop251/goja"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
)

// newResolvedPromise instantiates a new resolved promise.
func newResolvedPromise(vu modules.VU, with goja.Value) *goja.Promise {
promise, resolve, _ := vu.Runtime().NewPromise()
resolve(with)
return promise
}

// newRejectedPromise instantiates a new rejected promise.
func newRejectedPromise(vu modules.VU, with any) *goja.Promise {
promise, _, reject := vu.Runtime().NewPromise()
reject(with)
return promise
}

// promiseThen facilitates instantiating a new promise and defining callbacks for to be executed
// on fulfillment as well as rejection, directly from Go.
func promiseThen(
rt *goja.Runtime,
promise *goja.Promise,
onFulfilled, onRejected func(goja.Value),
) (*goja.Promise, error) {
val, err := rt.RunString(
`(function(promise, onFulfilled, onRejected) { return promise.then(onFulfilled, onRejected) })`)
if err != nil {
return nil, newError(RuntimeError, "unable to initialize promiseThen internal helper function")
}

cal, ok := goja.AssertFunction(val)
if !ok {
return nil, newError(RuntimeError, "the internal promiseThen helper is not a function")
}

if onRejected == nil {
val, err = cal(goja.Undefined(), rt.ToValue(promise), rt.ToValue(onFulfilled))
} else {
val, err = cal(goja.Undefined(), rt.ToValue(promise), rt.ToValue(onFulfilled), rt.ToValue(onRejected))
}

if err != nil {
return nil, err
}

newPromise, ok := val.Export().(*goja.Promise)
if !ok {
return nil, newError(RuntimeError, "unable to cast the internal promiseThen helper's return value to a promise")
}

return newPromise, nil
}

// isNumber returns true if the given goja value holds a number
func isNumber(value goja.Value) bool {
_, isFloat := value.Export().(float64)
_, isInt := value.Export().(int64)

return isFloat || isInt
}

// isNonNegativeNumber implements the [IsNonNegativeNumber] algorithm.
//
// [IsNonNegativeNumber]: https://streams.spec.whatwg.org/#is-non-negative-number
func isNonNegativeNumber(value goja.Value) bool {
if common.IsNullish(value) {
return false
}

if !isNumber(value) {
return false
}

if value.ToFloat() < 0 || value.ToInteger() < 0 {
return false
}

return true
}

// setReadOnlyPropertyOf sets a read-only property on the given [goja.Object].
func setReadOnlyPropertyOf(obj *goja.Object, objName, propName string, propValue goja.Value) error {
err := obj.DefineDataProperty(propName,
propValue,
goja.FLAG_FALSE,
goja.FLAG_FALSE,
goja.FLAG_TRUE,
)
if err != nil {
return fmt.Errorf("unable to define %s read-only property on %s object; reason: %w", propName, objName, err)
}

return nil
}

// isObject determines whether the given [goja.Value] is a [goja.Object] or not.
func isObject(val goja.Value) bool {
return val != nil && val.ExportType() != nil && val.ExportType().Kind() == reflect.Map
}

0 comments on commit 3431d7d

Please sign in to comment.