Skip to content

Commit

Permalink
Merge pull request #594 from ably/integration/subscription-filters
Browse files Browse the repository at this point in the history
Integration/subscription filters
  • Loading branch information
owenpearson committed May 26, 2023
2 parents 04023e1 + adedb28 commit d88b30f
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 1 deletion.
43 changes: 43 additions & 0 deletions ably/internal/ablyutil/regex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package ablyutil

import (
"errors"
"fmt"
"regexp"
)

// derivedChannelMatch provides the qualifyingParam and channelName from a channel regex match for derived channels
type derivedChannelMatch struct {
QualifierParam string
ChannelName string
}

// This regex check is to retain existing channel params if any e.g [?rewind=1]foo to
// [filter=xyz?rewind=1]foo. This is to keep channel compatibility around use of
// channel params that work with derived channels.
func MatchDerivedChannel(name string) (*derivedChannelMatch, error) {
regex := `^(\[([^?]*)(?:(.*))\])?(.+)$`
r, err := regexp.Compile(regex)
if err != nil {
err := errors.New("regex compilation failed")
return nil, err
}
match := r.FindStringSubmatch(name)

if len(match) == 0 || len(match) < 5 {
err := errors.New("regex match failed")
return nil, err
}
// Fail if there is already a channel qualifier,
// eg [meta]foo should fail instead of just overriding with [filter=xyz]foo
if len(match[2]) > 0 {
err := fmt.Errorf("cannot use a derived option with a %s channel", match[2])
return nil, err
}

// Return match values to be added to derive channel quantifier.
return &derivedChannelMatch{
QualifierParam: match[3],
ChannelName: match[4],
}, nil
}
37 changes: 37 additions & 0 deletions ably/internal/ablyutil/regex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//go:build !integration
// +build !integration

package ablyutil

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestMatchDerivedChannel(t *testing.T) {
var channels = []struct {
name string
input string
want *derivedChannelMatch
}{
{"valid with base channel name", "foo", &derivedChannelMatch{QualifierParam: "", ChannelName: "foo"}},
{"valid with base channel namespace", "foo:bar", &derivedChannelMatch{QualifierParam: "", ChannelName: "foo:bar"}},
{"valid with existing qualifying option", "[?rewind=1]foo", &derivedChannelMatch{QualifierParam: "?rewind=1", ChannelName: "foo"}},
{"valid with existing qualifying option with channel namespace", "[?rewind=1]foo:bar", &derivedChannelMatch{QualifierParam: "?rewind=1", ChannelName: "foo:bar"}},
{"fail with invalid param with channel namespace", "[param:invalid]foo:bar", nil},
{"fail with wrong channel option param", "[param=1]foo", nil},
{"fail with invalid qualifying option", "[meta]foo", nil},
{"fail with invalid regex match", "[failed-match]foo", nil},
}

for _, tt := range channels {
t.Run(tt.name, func(t *testing.T) {
match, err := MatchDerivedChannel(tt.input)
if err != nil {
assert.Error(t, err, "An error is expected for the regex match")
}
assert.Equal(t, tt.want, match, "invalid output received")
})
}
}
23 changes: 23 additions & 0 deletions ably/realtime_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package ably

import (
"context"
"encoding/base64"
"errors"
"fmt"
"sort"
"sync"

"github.com/ably/ably-go/ably/internal/ablyutil"
)

var (
Expand Down Expand Up @@ -50,6 +53,11 @@ type ChannelOption func(*channelOptions)
// channelOptions wraps ChannelOptions. It exists so that users can't implement their own ChannelOption.
type channelOptions protoChannelOptions

// DeriveOptions allows options to be used in creating a derived channel
type DeriveOptions struct {
Filter string
}

// ChannelWithCipherKey is a constructor that takes private key as a argument.
// It is used to encrypt and decrypt payloads (TB3)
func ChannelWithCipherKey(key []byte) ChannelOption {
Expand Down Expand Up @@ -114,6 +122,21 @@ func (ch *RealtimeChannels) Get(name string, options ...ChannelOption) *Realtime
return c
}

// GetDerived is a preview feature and may change in a future non-major release.
// It creates a new derived [ably.RealtimeChannel] object for given channel name, using the provided derive options and
// channel options if any. Returns error if any occurs.
func (ch *RealtimeChannels) GetDerived(name string, deriveOptions DeriveOptions, options ...ChannelOption) (*RealtimeChannel, error) {
if deriveOptions.Filter != "" {
match, err := ablyutil.MatchDerivedChannel(name)
if err != nil {
return nil, newError(40010, err)
}
filter := base64.StdEncoding.EncodeToString([]byte(deriveOptions.Filter))
name = fmt.Sprintf("[filter=%s%s]%s", filter, match.QualifierParam, match.ChannelName)
}
return ch.Get(name, options...), nil
}

// Iterate returns a [ably.RealtimeChannel] for each iteration on existing channels.
// It is safe to call Iterate from multiple goroutines, however there's no guarantee
// the returned list would not list a channel that was already released from
Expand Down
113 changes: 113 additions & 0 deletions ably/realtime_channel_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/url"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -93,6 +94,118 @@ func TestRealtimeChannel_Subscribe(t *testing.T) {
assert.NoError(t, err)
}

func TestRealtimeChannel_SubscriptionFilters(t *testing.T) {
app, err := ablytest.NewSandbox(nil)
assert.NoError(t, err)
defer app.Close()
options := app.Options()
restClient, err := ably.NewREST(options...)
assert.NoError(t, err)

realtimeClient := app.NewRealtime(ably.WithEchoMessages(false))
defer safeclose(t, ablytest.FullRealtimeCloser(realtimeClient))

testMessages := []*ably.Message{
{
Name: "filtered",
Data: "This message will pass the filter",
Extras: map[string]interface{}{
"headers": map[string]interface{}{
"name": "value one",
"number": 1234,
"bool": true,
},
},
},
{
Name: "filtered",
Data: "filtered messages",
},
{
Name: "filtered",
Data: "This message will be filtered because it does not meet condition on headers.number",
Extras: map[string]interface{}{
"headers": map[string]interface{}{
"name": "value one",
"number": 5678,
"bool": true,
},
},
},
{
Name: "filtered",
Data: "This is filtered",
},
{
Name: "end",
Data: "Last message check",
},
}
filter := ably.DeriveOptions{
Filter: "name == `\"filtered\"` && headers.number == `1234`",
}

err = ablytest.Wait(ablytest.ConnWaiter(realtimeClient, realtimeClient.Connect, ably.ConnectionEventConnected), nil)
assert.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
restChannel := restClient.Channels.Get("test")
rtDerivedChannel, err := realtimeClient.Channels.GetDerived("[?param=1]test", filter)
realtimeChannel := realtimeClient.Channels.Get("test")
assert.NoError(t, err, "realtimeChannel: GetDerived()=%v", err)

filteredMessages := make(chan *ably.Message, 10)
unsub, err := rtDerivedChannel.SubscribeAll(ctx, func(msg *ably.Message) {
filteredMessages <- msg
})
assert.NoError(t, err)
defer unsub()

unfilteredMessages := make(chan *ably.Message, 10)
unsub, err = realtimeChannel.SubscribeAll(ctx, func(msg *ably.Message) {
unfilteredMessages <- msg
})
assert.NoError(t, err)
defer unsub()

err = restChannel.PublishMultiple(context.Background(), testMessages)
assert.NoError(t, err, "restClient: PublishMultiple()=%v", err)

var wg sync.WaitGroup

wg.Add(1)
go func(ctx context.Context) {
defer wg.Done()
for {
select {
case m := <-filteredMessages:
assert.Equal(t, "filtered", m.Name)
case <-ctx.Done():
return
}
}
}(ctx)

wg.Add(1)
go func(ctx context.Context) {
defer cancel()
defer wg.Done()
for {
select {
case m := <-unfilteredMessages:
if m.Name == "end" {
return
}
assert.Equal(t, "filtered", m.Name)
case <-ctx.Done():
return
}
}
}(ctx)

wg.Wait()
}
func TestRealtimeChannel_AttachWhileDisconnected(t *testing.T) {

doEOF := make(chan struct{}, 1)
Expand Down
4 changes: 3 additions & 1 deletion ablytest/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ type Config struct {
func DefaultConfig() *Config {
return &Config{
Keys: []Key{
{},
{
Capability: `{"[*]*":["*"]}`,
},
},
Namespaces: []Namespace{
{ID: "persisted", Persisted: true},
Expand Down

0 comments on commit d88b30f

Please sign in to comment.