Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Wip interrupt fix #3984

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions js/eventloop/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/grafana/sobek"

"go.k6.io/k6/js/modules"
)

Expand Down Expand Up @@ -79,6 +80,8 @@ func (e *EventLoop) wakeup() {
// scenario has ended). Any error returned by any callback on the main thread
// will abort the current iteration and no further event loop callbacks will be
// executed in the same iteration.
// As errors returned from resolve and reject are Interrupt errors those should
// be returned.
//
// A common pattern for async work is something like this:
//
Expand All @@ -94,11 +97,9 @@ func (e *EventLoop) wakeup() {
// result, err := doTheActualAsyncWork(vu.Context())
// enqueueCallback(func() error {
// if err != nil {
// reject(err)
// } else {
// resolve(result)
// return reject(err)
// }
// return nil // do not abort the iteration
// return resolve(result)
// })
// }()
//
Expand All @@ -124,7 +125,16 @@ func (e *EventLoop) RegisterCallback() (enqueueCallback func(func() error)) {
panic("RegisterCallback called twice")
}
callbackCalled = true
e.queue = append(e.queue, f)
wrapFunc := func() error {
rt := e.vu.Runtime()
callable, _ := sobek.AssertFunction(rt.ToValue(f))
_, err := callable(nil)
// if there is an exception the err will be nil either way

return err
}

e.queue = append(e.queue, wrapFunc)
e.registeredCallbacks--
e.lock.Unlock()
e.wakeup()
Expand Down
2 changes: 1 addition & 1 deletion js/modules/cjsmodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (cmi *cjsModuleInstance) HasTLA() bool { return false }

func (cmi *cjsModuleInstance) RequestedModules() []string { return cmi.w.RequestedModules() }

func (cmi *cjsModuleInstance) ExecuteModule(rt *sobek.Runtime, _, _ func(any)) (sobek.CyclicModuleInstance, error) {
func (cmi *cjsModuleInstance) ExecuteModule(rt *sobek.Runtime, _, _ func(any) error) (sobek.CyclicModuleInstance, error) {
v, err := rt.RunProgram(cmi.w.prg)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion js/modules/gomodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type goModuleInstance struct {
defaultExport sobek.Value
}

func (gmi *goModuleInstance) ExecuteModule(_ *sobek.Runtime, _, _ func(any)) (sobek.CyclicModuleInstance, error) {
func (gmi *goModuleInstance) ExecuteModule(_ *sobek.Runtime, _, _ func(any) error) (sobek.CyclicModuleInstance, error) {
return gmi, nil
}
func (gmi *goModuleInstance) HasTLA() bool { return false }
Expand Down
2 changes: 1 addition & 1 deletion js/modules/gomodule_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ func (bgmi *basicGoModuleInstance) GetBindingValue(n string) sobek.Value {

func (bgmi *basicGoModuleInstance) HasTLA() bool { return false }

func (bgmi *basicGoModuleInstance) ExecuteModule(_ *sobek.Runtime, _, _ func(any)) (sobek.CyclicModuleInstance, error) {
func (bgmi *basicGoModuleInstance) ExecuteModule(_ *sobek.Runtime, _, _ func(any) error) (sobek.CyclicModuleInstance, error) {
return bgmi, nil
}
52 changes: 23 additions & 29 deletions js/modules/k6/experimental/fs/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.k6.io/k6/lib/fsext"

"github.com/grafana/sobek"

"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/js/promises"
Expand Down Expand Up @@ -218,25 +219,25 @@ func (f *File) Stat() *sobek.Promise {
//
// It is possible for a read to successfully return with 0 bytes.
// This does not indicate EOF.
func (f *File) Read(into sobek.Value) *sobek.Promise {
func (f *File) Read(into sobek.Value) (*sobek.Promise, error) {
promise, resolve, reject := f.vu.Runtime().NewPromise()

if common.IsNullish(into) {
reject(newFsError(TypeError, "read() failed; reason: into argument cannot be null or undefined"))
return promise
err := reject(newFsError(TypeError, "read() failed; reason: into argument cannot be null or undefined"))
return promise, err
}

intoObj := into.ToObject(f.vu.Runtime())
if !isUint8Array(f.vu.Runtime(), intoObj) {
reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise
err := reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise, err
}

// Obtain the underlying ArrayBuffer from the Uint8Array
ab, ok := intoObj.Get("buffer").Export().(sobek.ArrayBuffer)
if !ok {
reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise
err := reject(newFsError(TypeError, "read() failed; reason: into argument must be a Uint8Array"))
return promise, err
}

// To avoid concurrency linked to modifying the runtime's `into` buffer from multiple
Expand All @@ -257,8 +258,7 @@ func (f *File) Read(into sobek.Value) *sobek.Promise {
// Read was successful, resolve early with the number of
// bytes read.
if readErr == nil {
resolve(n)
return nil
return resolve(n)
}

// If the read operation failed, we need to check if it was an io.EOF error
Expand All @@ -270,67 +270,61 @@ func (f *File) Read(into sobek.Value) *sobek.Promise {
var fsErr *fsError
isFSErr := errors.As(readErr, &fsErr)
if !isFSErr {
reject(readErr)
return nil
return reject(readErr)
}

if fsErr.kind == EOFError && n == 0 {
resolve(sobek.Null())
} else {
resolve(n)
return resolve(sobek.Null())
}

return nil
return resolve(n)
})
}()

return promise
return promise, nil
}

// Seek seeks to the given `offset` in the file, under the given `whence` mode.
//
// The returned promise resolves to the new `offset` (position) within the file, which
// is expressed in bytes from the selected start, current, or end position depending
// the provided `whence`.
func (f *File) Seek(offset sobek.Value, whence sobek.Value) *sobek.Promise {
func (f *File) Seek(offset sobek.Value, whence sobek.Value) (*sobek.Promise, error) {
promise, resolve, reject := f.vu.Runtime().NewPromise()

intOffset, err := exportInt(offset)
if err != nil {
reject(newFsError(TypeError, "seek() failed; reason: the offset argument "+err.Error()))
return promise
err := reject(newFsError(TypeError, "seek() failed; reason: the offset argument "+err.Error()))
return promise, err
}

intWhence, err := exportInt(whence)
if err != nil {
reject(newFsError(TypeError, "seek() failed; reason: the whence argument "+err.Error()))
return promise
err := reject(newFsError(TypeError, "seek() failed; reason: the whence argument "+err.Error()))
return promise, err
}

seekMode := SeekMode(intWhence)
switch seekMode {
case SeekModeStart, SeekModeCurrent, SeekModeEnd:
// Valid modes, do nothing.
default:
reject(newFsError(TypeError, "seek() failed; reason: the whence argument must be a SeekMode"))
return promise
err := reject(newFsError(TypeError, "seek() failed; reason: the whence argument must be a SeekMode"))
return promise, err
}

callback := f.vu.RegisterCallback()
go func() {
newOffset, err := f.ReadSeekStater.Seek(intOffset, seekMode)
callback(func() error {
if err != nil {
reject(err)
return err
return reject(err)
}

resolve(newOffset)
return nil
return resolve(newOffset)
})
}()

return promise
return promise, nil
}

func isUint8Array(rt *sobek.Runtime, o *sobek.Object) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"github.com/grafana/sobek"

"go.k6.io/k6/js/common"
)

Expand Down Expand Up @@ -63,15 +64,15 @@
readRequest := ReadRequest{
chunkSteps: func(chunk any) {
// Resolve promise with «[ "value" → chunk, "done" → false ]».
resolve(map[string]any{"value": chunk, "done": false})

Check failure on line 67 in js/modules/k6/experimental/streams/readable_stream_default_reader.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
},
closeSteps: func() {
// Resolve promise with «[ "value" → undefined, "done" → true ]».
resolve(map[string]any{"value": sobek.Undefined(), "done": true})

Check failure on line 71 in js/modules/k6/experimental/streams/readable_stream_default_reader.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
},
errorSteps: func(e any) {
// Reject promise with e.
reject(e)

Check failure on line 75 in js/modules/k6/experimental/streams/readable_stream_default_reader.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
},
}

Expand Down
13 changes: 7 additions & 6 deletions js/modules/k6/experimental/streams/readable_stream_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"github.com/grafana/sobek"

"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
)
Expand Down Expand Up @@ -35,10 +36,10 @@
SetStream(stream *ReadableStream)

// GetClosed returns a [sobek.Promise] that resolves when the stream is closed.
GetClosed() (p *sobek.Promise, resolve func(any), reject func(any))
GetClosed() (p *sobek.Promise, resolve, reject func(any) error)

// SetClosed sets the [sobek.Promise] that resolves when the stream is closed.
SetClosed(p *sobek.Promise, resolve func(any), reject func(any))
SetClosed(p *sobek.Promise, resolve, reject func(any) error)

// Cancel returns a [sobek.Promise] that resolves when the stream is canceled.
Cancel(reason sobek.Value) *sobek.Promise
Expand All @@ -47,8 +48,8 @@
// BaseReadableStreamReader is a base implement
type BaseReadableStreamReader struct {
closedPromise *sobek.Promise
closedPromiseResolveFunc func(resolve any)
closedPromiseRejectFunc func(reason any)
closedPromiseResolveFunc func(resolve any) error
closedPromiseRejectFunc func(reason any) error

// stream is a [ReadableStream] instance that owns this reader
stream *ReadableStream
Expand All @@ -73,12 +74,12 @@
}

// GetClosed returns the reader's closed promise as well as its resolve and reject functions.
func (reader *BaseReadableStreamReader) GetClosed() (p *sobek.Promise, resolve func(any), reject func(any)) {
func (reader *BaseReadableStreamReader) GetClosed() (p *sobek.Promise, resolve, reject func(any) error) {
return reader.closedPromise, reader.closedPromiseResolveFunc, reader.closedPromiseRejectFunc
}

// SetClosed sets the reader's closed promise as well as its resolve and reject functions.
func (reader *BaseReadableStreamReader) SetClosed(p *sobek.Promise, resolve func(any), reject func(any)) {
func (reader *BaseReadableStreamReader) SetClosed(p *sobek.Promise, resolve, reject func(any) error) {
reader.closedPromise = p
reader.closedPromiseResolveFunc = resolve
reader.closedPromiseRejectFunc = reject
Expand Down Expand Up @@ -133,7 +134,7 @@

// 4. If stream.[[state]] is "readable", reject reader.[[closedPromise]] with a TypeError exception.
if stream.state == ReadableStreamStateReadable {
reader.closedPromiseRejectFunc(newTypeError(reader.runtime, "stream is readable").Err())

Check failure on line 137 in js/modules/k6/experimental/streams/readable_stream_reader.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
} else { // 5. Otherwise, set reader.[[closedPromise]] to a promise rejected with a TypeError exception.
reader.closedPromise = newRejectedPromise(stream.vu, newTypeError(reader.runtime, "stream is not readable").Err())
}
Expand Down Expand Up @@ -196,7 +197,7 @@
// 4. Otherwise, if stream.[[state]] is "closed",
case ReadableStreamStateClosed:
// 4.1 Set reader.[[closedPromise]] to a promise resolved with undefined.
resolve(sobek.Undefined())

Check failure on line 200 in js/modules/k6/experimental/streams/readable_stream_reader.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
// 5. Otherwise,
default:
// 5.1 Assert: stream.[[state]] is "errored".
Expand All @@ -206,9 +207,9 @@

// 5.2 Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]].
if jsErr, ok := stream.storedError.(*jsError); ok {
reject(jsErr.Err())

Check failure on line 210 in js/modules/k6/experimental/streams/readable_stream_reader.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
} else {
reject(errToObj(stream.runtime, stream.storedError))

Check failure on line 212 in js/modules/k6/experimental/streams/readable_stream_reader.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)
}

// 5.3 Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
Expand Down
14 changes: 6 additions & 8 deletions js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,13 @@ func (c *Client) AsyncInvoke(
method string,
req sobek.Value,
params sobek.Value,
) *sobek.Promise {
) (*sobek.Promise, error) {
grpcReq, err := c.buildInvokeRequest(method, req, params)

promise, resolve, reject := c.vu.Runtime().NewPromise()
if err != nil {
reject(err)
return promise
err = reject(err)
return promise, err
}

callback := c.vu.RegisterCallback()
Expand All @@ -307,15 +307,13 @@ func (c *Client) AsyncInvoke(

callback(func() error {
if err != nil {
reject(err)
return nil //nolint:nilerr // we don't want to return the error
return reject(err)
}
resolve(res)
return nil
return resolve(res)
})
}()

return promise
return promise, nil
}

// buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters
Expand Down
12 changes: 5 additions & 7 deletions js/modules/k6/http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func (c *Client) asyncRequest(method string, url sobek.Value, args ...sobek.Valu
if err != nil {
var resp *Response
if resp, err = c.handleParseRequestError(err); err != nil {
reject(err)
err = reject(err)
} else {
resolve(resp)
err = resolve(resp)
}
return p, nil
return p, err
}

callback := c.moduleInstance.vu.RegisterCallback()
Expand All @@ -110,12 +110,10 @@ func (c *Client) asyncRequest(method string, url sobek.Value, args ...sobek.Valu
resp, err := httpext.MakeRequest(c.moduleInstance.vu.Context(), state, req)
callback(func() error {
if err != nil {
reject(err)
return nil //nolint:nilerr // we want to reject the promise in this case
return reject(err)
}
c.processResponse(resp, req.ResponseType)
resolve(c.responseFromHTTPext(resp))
return nil
return resolve(c.responseFromHTTPext(resp))
})
}()

Expand Down
7 changes: 3 additions & 4 deletions js/promises/promises.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package promises

import (
"github.com/grafana/sobek"

"go.k6.io/k6/js/modules"
)

Expand Down Expand Up @@ -33,15 +34,13 @@ func New(vu modules.VU) (p *sobek.Promise, resolve func(result any), reject func

resolve = func(result any) {
callback(func() error {
resolveFunc(result)
return nil
return resolveFunc(result)
})
}

reject = func(reason any) {
callback(func() error {
rejectFunc(reason)
return nil
return rejectFunc(reason)
})
}

Expand Down
12 changes: 7 additions & 5 deletions vendor/github.com/grafana/sobek/builtin_promise.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/grafana/sobek/modules.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading