Skip to content

Commit

Permalink
fix: allow websocket to block in some cases but not infefinitely
Browse files Browse the repository at this point in the history
  • Loading branch information
npaton committed Oct 26, 2023
1 parent 3d38bee commit b676ce9
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 27 deletions.
2 changes: 1 addition & 1 deletion internal/runtime/change.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newChangesSub(ctx context.Context, p *models.Participant) (*changesSub, cha
ctx: ctx,
cancel: cancel,
p: p,
w: NewWebsocketWriter[*models.ChangePayload](ctx),
w: NewWebsocketWriter[*models.ChangePayload](ctx, "changes"),
participants: map[string]map[string]struct{}{},
}

Expand Down
5 changes: 4 additions & 1 deletion internal/runtime/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,13 @@ func startRuntime(ctx context.Context, msgBuf int32) *runtime.Runtime {
}

func setupDeadlock() func() {
runtime.WebsocketCheckInterval = 50 * time.Millisecond
runtime.WebsocketCheckThreshold = 5

optsTimeout := deadlock.Opts.DeadlockTimeout
optsOnDeadlock := deadlock.Opts.OnPotentialDeadlock

deadlock.Opts.DeadlockTimeout = 250 * time.Millisecond
deadlock.Opts.DeadlockTimeout = 500 * time.Millisecond
deadlock.Opts.OnPotentialDeadlock = func() {
defer GinkgoRecover()
Fail("potential deadlock")
Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (r *Runtime) SubOnEvent(ctx context.Context, input *mgen.OnEventInput) (<-c

actorID := actr.GetID()

w := NewWebsocketWriter[*mgen.OnEventPayload](ctx)
w := NewWebsocketWriter[*mgen.OnEventPayload](ctx, "hooks")

go r.setupOnEventSub(ctx, input, actorID, w)

Expand Down
2 changes: 1 addition & 1 deletion internal/runtime/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func newScopedAttributesSub(ctx context.Context, inputs models.ScopedAttributesI
ctx: ctx,
inputs: inputs,
scopes: make(map[string]*models.Scope),
w: NewWebsocketWriter[*mgen.SubAttributesPayload](ctx),
w: NewWebsocketWriter[*mgen.SubAttributesPayload](ctx, "scopedAttributes"),
}

return s, s.w.outbound
Expand Down
78 changes: 58 additions & 20 deletions internal/runtime/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"time"

"github.com/99designs/gqlgen/graphql/handler/transport"
"github.com/rs/zerolog/log"
"github.com/vektah/gqlparser/v2/gqlerror"
)

type WebsocketWriter[T any] struct {
ctx context.Context
module string
ctx context.Context

inbound chan []T
outbound chan T
Expand All @@ -22,8 +24,9 @@ type WebsocketWriter[T any] struct {
closed chan struct{}
}

func NewWebsocketWriter[T any](ctx context.Context) *WebsocketWriter[T] {
func NewWebsocketWriter[T any](ctx context.Context, module string) *WebsocketWriter[T] {
w := &WebsocketWriter[T]{
module: module,
ctx: ctx,
inbound: make(chan []T, MaxWebsocketMsgBuf),
outbound: make(chan T),
Expand All @@ -36,6 +39,7 @@ func NewWebsocketWriter[T any](ctx context.Context) *WebsocketWriter[T] {
}()

go w.run()
go w.check()

return w
}
Expand All @@ -62,12 +66,18 @@ func (s *WebsocketWriter[T]) Send(msgs []T) {
return
}

buffered := s.buffered.Add(int32(len(msgs)))
if buffered > MaxWebsocketMsgBuf {
s.fail("websocket buffer full")
// NOTE: we allow the buffer to grow beyond the limit because we don't want
// to automatically kill the connection if the client is trying to catch up.
// But we do want to kill the connection if the client is never able to
// catch up and blocking everyone else (see the check in the run method
// below).
s.buffered.Add(int32(len(msgs)))
// buffered := s.buffered.Add(int32(len(msgs)))
// if buffered > MaxWebsocketMsgBuf {
// s.fail("websocket buffer full")

return
}
// return
// }

select {
case <-s.closed:
Expand All @@ -79,10 +89,31 @@ func (s *WebsocketWriter[T]) Send(msgs []T) {
func (s *WebsocketWriter[T]) run() {
defer close(s.outbound)

for {
select {
case <-s.closed:
return
case msgs := <-s.inbound:
for _, msg := range msgs {
select {
case <-s.closed:
return
case s.outbound <- msg:
}
s.buffered.Add(int32(-1))
}

// s.buffered.Add(int32(-len(msgs)))
}
}
}

func (s *WebsocketWriter[T]) check() {
ticker := time.NewTicker(WebsocketCheckInterval)
lastCount := int32(0)

var timesAbove int
lastCounts := make([]int32, 0, WebsocketCheckThreshold)

for {
select {
Expand All @@ -94,29 +125,36 @@ func (s *WebsocketWriter[T]) run() {
// If for 10 seconds, we're still growing, then we're probably
// dealing with a laggy client. So we close the connection.
currentCount := s.buffered.Load()
if currentCount > lastCount {

if currentCount != 0 && currentCount >= lastCount {
timesAbove++

if timesAbove > WebsocketCheckThreshold {
s.fail("websocket buffer full")
lastCounts = append(lastCounts, currentCount)

if timesAbove >= WebsocketCheckThreshold {
s.fail("close on timeout")

counts := make([]int, len(lastCounts))

for i, c := range lastCounts {
counts[i] = int(c)
}

log.Warn().
Ints("counts", counts).
Str("module", s.module).
Int("threshold", WebsocketCheckThreshold).
Str("interval", WebsocketCheckInterval.String()).
Msg("websocket: closing connection, socket buffer size not reduced for too long")

return
}
} else {
lastCounts = lastCounts[:0]
timesAbove = 0
}

lastCount = currentCount
case msgs := <-s.inbound:
s.buffered.Add(int32(-len(msgs)))

for _, msg := range msgs {
select {
case <-s.closed:
return
case s.outbound <- msg:
}
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions lib/tajriba/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lib/tajriba/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@empirica/tajriba",
"version": "1.4.1",
"version": "1.4.2",
"description": "Tajriba client",
"source": "src/index.ts",
"main": "dist/index.js",
Expand Down

0 comments on commit b676ce9

Please sign in to comment.