Skip to content

Commit

Permalink
Merge pull request #433 from ably/feature/webassembly-compatibility
Browse files Browse the repository at this point in the history
Replace obsolete websocket library -- allows WebAssembly
  • Loading branch information
Jmgr committed Jul 1, 2022
2 parents b0876a1 + aaa2962 commit 83054f2
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 67 deletions.
2 changes: 1 addition & 1 deletion ably/realtime_conn_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestRealtimeConn_SendErrorReconnects(t *testing.T) {
ablytest.Instantly.NoRecv(t, nil, publishErr, t.Fatalf)

// Reconnect should happen instantly as a result of transport closure.
ablytest.Instantly.Send(t, allowDial, struct{}{}, t.Fatalf)
allowDial <- struct{}{}

// After reconnection, message should be published.
ablytest.Soon.Recv(t, &err, publishErr, t.Fatalf)
Expand Down
97 changes: 64 additions & 33 deletions ably/websocket.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,94 @@
package ably

import (
"context"
"encoding/json"
"errors"
"net"
"net/http"
"net/url"
"time"

"github.com/ably/ably-go/ably/internal/ablyutil"
"golang.org/x/net/websocket"
"nhooyr.io/websocket"
)

type proto int

const (
jsonProto proto = iota
msgpackProto
)

type websocketConn struct {
conn *websocket.Conn
codec websocket.Codec
proto proto
}

func (ws *websocketConn) Send(msg *protocolMessage) error {
return ws.codec.Send(ws.conn, msg)
switch ws.proto {
case jsonProto:
p, err := json.Marshal(msg)
if err != nil {
return err
}
return ws.conn.Write(context.Background(), websocket.MessageText, p)
case msgpackProto:
p, err := ablyutil.MarshalMsgpack(msg)
if err != nil {
return err
}
return ws.conn.Write(context.Background(), websocket.MessageBinary, p)
}
return nil
}

func (ws *websocketConn) Receive(deadline time.Time) (*protocolMessage, error) {
msg := &protocolMessage{}
if !deadline.IsZero() {
err := ws.conn.SetReadDeadline(deadline)
if err != nil {
return nil, err
}
var (
ctx context.Context
cancel context.CancelFunc
)
if deadline.IsZero() {
ctx = context.Background()
} else {
ctx, cancel = context.WithDeadline(context.Background(), deadline)
defer cancel()
}
err := ws.codec.Receive(ws.conn, &msg)
_, data, err := ws.conn.Read(ctx)
if err != nil {
return nil, err
}
switch ws.proto {
case jsonProto:
err := json.Unmarshal(data, msg)
if err != nil {
return nil, err
}
case msgpackProto:
err := ablyutil.UnmarshalMsgpack(data, msg)
if err != nil {
return nil, err
}
}
return msg, nil
}

func (ws *websocketConn) Close() error {
return ws.conn.Close()
return ws.conn.Close(websocket.StatusNormalClosure, "")
}

func dialWebsocket(proto string, u *url.URL, timeout time.Duration) (*websocketConn, error) {
ws := &websocketConn{}
switch proto {
case "application/json":
ws.codec = websocket.JSON
ws.proto = jsonProto
case "application/x-msgpack":
ws.codec = msgpackCodec
ws.proto = msgpackProto
default:
return nil, errors.New(`invalid protocol "` + proto + `"`)
}
// Starts a raw websocket connection with server
conn, err := dialWebsocketTimeout(u.String(), "", "https://"+u.Host, timeout)
conn, err := dialWebsocketTimeout(u.String(), "https://"+u.Host, timeout)
if err != nil {
return nil, err
}
Expand All @@ -58,27 +97,19 @@ func dialWebsocket(proto string, u *url.URL, timeout time.Duration) (*websocketC
}

// dialWebsocketTimeout dials the websocket with a timeout.
func dialWebsocketTimeout(uri, protocol, origin string, timeout time.Duration) (*websocket.Conn, error) {
config, err := websocket.NewConfig(uri, origin)
func dialWebsocketTimeout(uri, origin string, timeout time.Duration) (*websocket.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

var ops websocket.DialOptions
ops.HTTPHeader = make(http.Header)
ops.HTTPHeader.Add(ablyAgentHeader, ablyAgentIdentifier)

c, _, err := websocket.Dial(ctx, uri, &ops)

if err != nil {
return nil, err
}
config.Header.Set(ablyAgentHeader, ablyAgentIdentifier)
if protocol != "" {
config.Protocol = []string{protocol}
}
config.Dialer = &net.Dialer{
Timeout: timeout,
}
return websocket.DialConfig(config)
}

var msgpackCodec = websocket.Codec{
Marshal: func(v interface{}) ([]byte, byte, error) {
p, err := ablyutil.MarshalMsgpack(v)
return p, websocket.BinaryFrame, err
},
Unmarshal: func(p []byte, _ byte, v interface{}) error {
return ablyutil.UnmarshalMsgpack(p, v)
},
return c, nil
}
39 changes: 13 additions & 26 deletions ably/websocket_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,32 +95,24 @@ func TestWebsocketDial(t *testing.T) {
websocketUrl := fmt.Sprintf("ws%s", strings.TrimPrefix(ts.URL, "http"))
testServerURL, _ := url.Parse(websocketUrl)

// Calculate the expected Ably-Agent header used by tests.
expectedAgentHeader := AblySDKIdentifier + " " + GoRuntimeIdentifier + " " + GoOSIdentifier()

tests := map[string]struct {
dialProtocol string
expectedPayloadType byte
expectedSubprotocol []string
expectedResult *websocketConn
expectedErr error
dialProtocol string
expectedErr error
expectedProto proto
}{
"Can dial for protocol application/json": {
dialProtocol: "application/json",
expectedPayloadType: uint8(1),
expectedSubprotocol: []string(nil),
expectedErr: nil,
dialProtocol: "application/json",
expectedErr: nil,
expectedProto: jsonProto,
},
"Can dial for protocol application/x-msgpack": {
dialProtocol: "application/x-msgpack",
expectedPayloadType: uint8(1),
expectedSubprotocol: []string(nil),
expectedErr: nil,
dialProtocol: "application/x-msgpack",
expectedErr: nil,
expectedProto: msgpackProto,
},
"Can handle an error when dialing for an invalid protocol": {
dialProtocol: "aProtocol",
expectedResult: nil,
expectedErr: errors.New(`invalid protocol "aProtocol"`),
dialProtocol: "aProtocol",
expectedErr: errors.New(`invalid protocol "aProtocol"`),
},
}

Expand All @@ -131,11 +123,8 @@ func TestWebsocketDial(t *testing.T) {
assert.Equal(t, test.expectedErr, err)

if result != nil {
assert.NotNil(t, result.codec)
assert.Equal(t, expectedAgentHeader, result.conn.Config().Header.Get("Ably-Agent"))
assert.Equal(t, test.expectedPayloadType, result.conn.PayloadType)
assert.Equal(t, test.expectedSubprotocol, result.conn.Config().Protocol)
assert.True(t, result.conn.IsClientConn())
assert.NotNil(t, result.conn)
assert.Equal(t, test.expectedProto, result.proto)
}
})
}
Expand All @@ -153,8 +142,6 @@ func TestWebsocketSendAndReceive(t *testing.T) {
tests := map[string]struct {
dialProtocol string
expectedMessageType string
expectedPayloadType byte
expectedResult *websocketConn
expectedErr error
}{
"Can send and receive a message using protocol application/json": {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/ably/ably-go
require (
github.com/stretchr/testify v1.7.1
github.com/ugorji/go/codec v1.1.9
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654
nhooyr.io/websocket v1.8.7
)
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,10 @@ github.com/ugorji/go v1.1.9/go.mod h1:chLrngdsg43geAaeId+nXO57YsDdl5OZqd/QtBiD19
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.1.9 h1:J/7hhpkQwgypRNvaeh/T5gzJ2gEI/l8S3qyRrdEa1fA=
github.com/ugorji/go/codec v1.1.9/go.mod h1:+SWgpdqOgdW5sBaiDfkHilQ1SxQ1hBkq/R+kHfL7Suo=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down

0 comments on commit 83054f2

Please sign in to comment.