Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rough POC for strongly typed realtime channel. #582

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 63 additions & 0 deletions ably/realtime_channel_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ably_test
import (
"context"
"fmt"
"github.com/stretchr/testify/require"
"net/url"
"reflect"
"testing"
Expand Down Expand Up @@ -141,3 +142,65 @@ func TestRealtimeChannel_AttachWhileDisconnected(t *testing.T) {
err = ablytest.Wait(<-res, nil)
assert.NoError(t, err)
}

func TestTypeSafePubSub(t *testing.T) {
type obj struct {
Name string
Num int
}
app, client1 := ablytest.NewRealtime(nil...)
defer safeclose(t, ablytest.FullRealtimeCloser(client1), app)

client2 := app.NewRealtime(ably.WithEchoMessages(false))
defer safeclose(t, ablytest.FullRealtimeCloser(client2))

// create two typed channels which carry messages of type obj.
channel1 := ably.GetChannelOf[obj](client1, "test")
channel2 := ably.GetChannelOf[obj](client2, "test")

ctx := context.Background()

done := make(chan struct{})
cancel, err := channel2.Subscribe(ctx, "eg", func(m *ably.MessageOf[obj], err error) {
assert.NoError(t, err)
assert.Equal(t, "eg", m.Name)
assert.Equal(t, obj{"a", 1}, m.Data)
close(done)
})

assert.NoError(t, err)

err = channel1.Publish(ctx, "eg", obj{"a", 1})
require.NoError(t, err)
<-done
cancel()
}

func TestTypeSafePubSubString(t *testing.T) {
app, client1 := ablytest.NewRealtime(nil...)
defer safeclose(t, ablytest.FullRealtimeCloser(client1), app)

client2 := app.NewRealtime(ably.WithEchoMessages(false))
defer safeclose(t, ablytest.FullRealtimeCloser(client2))

// create two typed channels which carry messages of type string.
channel1 := ably.GetChannelOf[string](client1, "test")
channel2 := ably.GetChannelOf[string](client2, "test")

ctx := context.Background()

done := make(chan struct{})
cancel, err := channel2.Subscribe(ctx, "eg", func(m *ably.MessageOf[string], err error) {
assert.NoError(t, err)
assert.Equal(t, "eg", m.Name)
assert.Equal(t, "hiya", m.Data)
close(done)
})

assert.NoError(t, err)

err = channel1.Publish(ctx, "eg", "hiya")
require.NoError(t, err)
<-done
cancel()
}
86 changes: 86 additions & 0 deletions ably/realtime_typed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package ably

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
)

// MessageOf is a strongly typed varient of Message, where the Data value is constrained to be a of type T.
type MessageOf[T any] struct {
// Data is the message payload, if provided (TM2d).
Data T `json:"data,omitempty" codec:"data,omitempty"`
Message
}

type RealtimeChannelOf[T any] struct {
RealtimeChannel
}

// Publish publishes a message of type T.
func (r *RealtimeChannelOf[T]) Publish(ctx context.Context, name string, o T) error {
return r.RealtimeChannel.Publish(ctx, name, &o)
}

type DecodeError struct {
Want any
Got any
}

func (e DecodeError) Error() string {
return fmt.Sprintf("can not decode message of type %T into %T", e.Got, e.Want)
}

// Subscribe subscribes to messages. When a message arrives, it decoded as a MessageOf[T] and handle is called.
// If the message can not be decoded, then handle is called with a DecodeError.
func (r *RealtimeChannelOf[T]) Subscribe(ctx context.Context, name string, handle func(*MessageOf[T], error)) (func(), error) {
return r.RealtimeChannel.Subscribe(ctx, name, func(msg *Message) {
var mo MessageOf[T]
mo.Name = msg.Name
mo.ID = msg.ID
mo.Timestamp = msg.Timestamp
mo.ClientID = msg.ClientID
mo.Extras = msg.Extras

// TODO: this switch statement does not work.
switch msg.Data.(type) {
case string:
case []byte:
var ok bool
mo.Data, ok = msg.Data.(T)
if !ok {
handle(nil, DecodeError{msg.Data, mo.Data})
return
}
handle(&mo, nil)
return
}

var r io.Reader
fmt.Printf("%T", msg.Data)
switch d := msg.Data.(type) {
case string:
r = strings.NewReader(d)
case []byte:
r = bytes.NewReader(d)
default:
handle(nil, DecodeError{msg.Data, mo.Data})
}
err := json.NewDecoder(r).Decode(&mo.Data)
if err != nil {
handle(nil, fmt.Errorf("could not decode message value into %T, %w", mo.Data, err))
return
}

handle(&mo, nil)
})
}

// GetChannelOf[T] returns a channel of messages of type MessageOf[T].
func GetChannelOf[T any](client *Realtime, name string) *RealtimeChannelOf[T] {
ch := client.Channels.Get(name)
return &RealtimeChannelOf[T]{*ch}
}
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
module github.com/ably/ably-go

go 1.19

require (
github.com/stretchr/testify v1.7.1
github.com/ugorji/go/codec v1.1.9
golang.org/x/sys v0.2.0
golang.org/x/sys v0.4.0
nhooyr.io/websocket v1.8.7
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/klauspost/compress v1.10.3 // indirect
github.com/klauspost/compress v1.15.14 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.17
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvK
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8=
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc=
github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
Expand All @@ -46,14 +47,13 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.1.9 h1:SObrQTaSuP8WOv2WNCj8gECiNSJIUvk3Q7N26c96Gws=
github.com/ugorji/go v1.1.9/go.mod h1:chLrngdsg43geAaeId+nXO57YsDdl5OZqd/QtBiD19g=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.1.9 h1:J/7hhpkQwgypRNvaeh/T5gzJ2gEI/l8S3qyRrdEa1fA=
github.com/ugorji/go/codec v1.1.9/go.mod h1:+SWgpdqOgdW5sBaiDfkHilQ1SxQ1hBkq/R+kHfL7Suo=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -64,7 +64,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=