Skip to content

Commit

Permalink
Merge pull request #596 from ably/feature/add-publish-async-method
Browse files Browse the repository at this point in the history
add publish async method
  • Loading branch information
AndyTWF committed Jun 7, 2023
2 parents d88b30f + d912495 commit e8d6171
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 54 deletions.
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@ if err != nil {
}
```

`Publish` will block until either the publish is acknowledged or failed to
deliver.

Alternatively you can use `PublishAsync` which does not block:

```go
channel.PublishAsync("EventName1", "EventData11", func(err error) {
if err != nil {
fmt.Println("failed to publish", err)
} else {
fmt.Println("publish ok")
}
})
```

Note the `onAck` callback must not block as it would block the internal client.

#### Handling errors

Errors returned by this library may have an underlying `*ErrorInfo` type.
Expand Down Expand Up @@ -341,9 +358,6 @@ As of release 1.2.0, the following are not implemented and will be covered in fu
connection becomes `suspended` and then resumes, and presence members associated with the client will not be
automatically re-entered.

- Transient realtime publishing is not supported, so a call to `publish()` on a realtime channel will trigger attachment
of the channel.

- Inband reauthentication is not supported; expiring tokens will trigger a disconnection and resume of a realtime
connection.

Expand Down
78 changes: 48 additions & 30 deletions ably/realtime_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,25 +585,49 @@ func (em ChannelEventEmitter) OffAll() {
}

// Publish publishes a single message to the channel with the given event name and message payload.
// A callback may optionally be passed in to this call to be notified of success or failure of the operation.
// When publish is called with this client library, it won't attempt to implicitly attach to the channel,
// so long as transient publishing is available in the library. Otherwise, the client will implicitly attach (RTL6i).
//
// This implicitly attaches the channel if it's not already attached. If the
// context is canceled before the attach operation finishes, the call
// returns with an error, but the operation carries on in the background and
// the channel may eventually be attached and the message published anyway.
// This will block until either the publish is acknowledged or fails to deliver.
//
// If the context is cancelled before the attach operation finishes, the call
// returns an error but the publish will carry on in the background and may
// eventually be published anyway.
func (c *RealtimeChannel) Publish(ctx context.Context, name string, data interface{}) error {
return c.PublishMultiple(ctx, []*Message{{Name: name, Data: data}})
}

// PublishAsync is the same as Publish except instead of blocking it calls onAck
// with nil if the publish was successful or the appropriate error.
//
// Note onAck must not block as it would block the internal client.
func (c *RealtimeChannel) PublishAsync(name string, data interface{}, onAck func(err error)) error {
return c.PublishMultipleAsync([]*Message{{Name: name, Data: data}}, onAck)
}

// PublishMultiple publishes all given messages on the channel at once.
//
// This implicitly attaches the channel if it's not already attached. If the
// context is canceled before the attach operation finishes, the call
// returns with an error, but the operation carries on in the background and
// the channel may eventually be attached and the message published anyway (RTL6i).
// If the context is cancelled before the attach operation finishes, the call
// returns an error but the publish will carry on in the background and may
// eventually be published anyway.
func (c *RealtimeChannel) PublishMultiple(ctx context.Context, messages []*Message) error {
listen := make(chan error, 1)
onAck := func(err error) {
listen <- err
}
if err := c.PublishMultipleAsync(messages, onAck); err != nil {
return err
}

select {
case <-ctx.Done():
return ctx.Err()
case err := <-listen:
return err
}
}

// PublishMultipleAsync is the same as PublishMultiple except it calls onAck instead of blocking
// (see PublishAsync).
func (c *RealtimeChannel) PublishMultipleAsync(messages []*Message, onAck func(err error)) error {
id := c.client.Auth.clientIDForCheck()
for _, v := range messages {
if v.ClientID != "" && id != wildcardClientID && v.ClientID != id {
Expand All @@ -616,11 +640,7 @@ func (c *RealtimeChannel) PublishMultiple(ctx context.Context, messages []*Messa
Channel: c.Name,
Messages: messages,
}
res, err := c.send(msg)
if err != nil {
return err
}
return res.Wait(ctx)
return c.send(msg, onAck)
}

// History retrieves a [ably.HistoryRequest] object, containing an array of historical
Expand All @@ -633,45 +653,43 @@ func (c *RealtimeChannel) History(o ...HistoryOption) HistoryRequest {
return c.client.rest.Channels.Get(c.Name).History(o...)
}

func (c *RealtimeChannel) send(msg *protocolMessage) (result, error) {
if res, enqueued := c.maybeEnqueue(msg); enqueued {
return res, nil
func (c *RealtimeChannel) send(msg *protocolMessage, onAck func(err error)) error {
if enqueued := c.maybeEnqueue(msg, onAck); enqueued {
return nil
}

if !c.canSend() {
return nil, newError(ErrChannelOperationFailedInvalidChannelState, nil)
return newError(ErrChannelOperationFailedInvalidChannelState, nil)
}

res, listen := newErrResult()
c.client.Connection.send(msg, listen)
return res, nil
c.client.Connection.send(msg, onAck)
return nil
}

func (c *RealtimeChannel) maybeEnqueue(msg *protocolMessage) (_ result, enqueued bool) {
func (c *RealtimeChannel) maybeEnqueue(msg *protocolMessage, onAck func(err error)) bool {
// RTL6c2
if c.opts().NoQueueing {
return nil, false
return false
}
switch c.client.Connection.State() {
default:
return nil, false
return false
case ConnectionStateInitialized,
ConnectionStateConnecting,
ConnectionStateDisconnected:
}
switch c.State() {
default:
return nil, false
return false
case ChannelStateInitialized,
ChannelStateAttached,
ChannelStateDetached,
ChannelStateAttaching,
ChannelStateDetaching:
}

res, listen := newErrResult()
c.queue.Enqueue(msg, listen)
return res, true
c.queue.Enqueue(msg, onAck)
return true
}

func (c *RealtimeChannel) canSend() bool {
Expand Down
14 changes: 14 additions & 0 deletions ably/realtime_channel_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,20 @@ func TestRealtimeChannel_Publish(t *testing.T) {
assert.NoError(t, err, "Publish()=%v", err)
}

func TestRealtimeChannel_PublishAsync(t *testing.T) {
app, client := ablytest.NewRealtime(nil...)
defer safeclose(t, ablytest.FullRealtimeCloser(client), app)
err := ablytest.Wait(ablytest.ConnWaiter(client, client.Connect, ably.ConnectionEventConnected), nil)
assert.NoError(t, err)
channel := client.Channels.Get("test")
listen := make(chan error, 1)
err = channel.PublishAsync("hello", "world", func(err error) {
listen <- err
})
assert.NoError(t, err, "PublishAsync()=%v", err)
assert.NoError(t, <-listen, "onAck=%v", err)
}

func TestRealtimeChannel_Subscribe(t *testing.T) {
app, client1 := ablytest.NewRealtime(nil...)
defer safeclose(t, ablytest.FullRealtimeCloser(client1), app)
Expand Down
24 changes: 15 additions & 9 deletions ably/realtime_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,25 +572,31 @@ func (c *Connection) advanceSerial() {
c.msgSerial = (c.msgSerial + 1) % maxint64
}

func (c *Connection) send(msg *protocolMessage, listen chan<- error) {
func (c *Connection) send(msg *protocolMessage, onAck func(err error)) {
hasMsgSerial := msg.Action == actionMessage || msg.Action == actionPresence
c.mtx.Lock()
switch state := c.state; state {
default:
c.mtx.Unlock()
listen <- connStateError(state, nil)
if onAck != nil {
onAck(connStateError(state, nil))
}

case ConnectionStateInitialized, ConnectionStateConnecting, ConnectionStateDisconnected:
c.mtx.Unlock()
if c.opts.NoQueueing {
listen <- connStateError(state, errQueueing)
if onAck != nil {
onAck(connStateError(state, errQueueing))
}
}
c.queue.Enqueue(msg, listen) // RTL4i
c.queue.Enqueue(msg, onAck) // RTL4i

case ConnectionStateConnected:
if err := c.verifyAndUpdateMessages(msg); err != nil {
c.mtx.Unlock()
listen <- err
if onAck != nil {
onAck(err)
}
return
}
if hasMsgSerial {
Expand All @@ -606,13 +612,13 @@ func (c *Connection) send(msg *protocolMessage, listen chan<- error) {
// indefinitely.
c.conn.Close()
c.mtx.Unlock()
c.queue.Enqueue(msg, listen)
c.queue.Enqueue(msg, onAck)
} else {
if hasMsgSerial {
c.advanceSerial()
}
if listen != nil {
c.pending.Enqueue(msg, listen)
if onAck != nil {
c.pending.Enqueue(msg, onAck)
}
c.mtx.Unlock()
}
Expand Down Expand Up @@ -691,7 +697,7 @@ func (c *Connection) resendPending() {
c.mtx.Unlock()
c.log().Debugf("resending %d messages waiting for ACK/NACK", len(cx))
for _, v := range cx {
c.send(v.msg, v.ch)
c.send(v.msg, v.onAck)
}
}

Expand Down
16 changes: 15 additions & 1 deletion ably/realtime_presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,21 @@ func (pres *RealtimePresence) send(msg *PresenceMessage) (result, error) {
if err != nil {
return err
}
return wait(ctx)(pres.channel.send(protomsg))

listen := make(chan error, 1)
onAck := func(err error) {
listen <- err
}
if err := pres.channel.send(protomsg, onAck); err != nil {
return err
}

select {
case <-ctx.Done():
return ctx.Err()
case err := <-listen:
return err
}
}), nil
}

Expand Down
26 changes: 15 additions & 11 deletions ably/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func newPendingEmitter(log logger) pendingEmitter {
}

type msgCh struct {
msg *protocolMessage
ch chan<- error
msg *protocolMessage
onAck func(err error)
}

// Dismiss lets go of the channels that are waiting for an error on this queue.
Expand All @@ -129,14 +129,14 @@ func (q *pendingEmitter) Dismiss() []msgCh {
return cx
}

func (q *pendingEmitter) Enqueue(msg *protocolMessage, ch chan<- error) {
func (q *pendingEmitter) Enqueue(msg *protocolMessage, onAck func(err error)) {
if len(q.queue) > 0 {
expected := q.queue[len(q.queue)-1].msg.MsgSerial + 1
if got := msg.MsgSerial; expected != got {
panic(fmt.Sprintf("protocol violation: expected next enqueued message to have msgSerial %d; got %d", expected, got))
}
}
q.queue = append(q.queue, msgCh{msg, ch})
q.queue = append(q.queue, msgCh{msg, onAck})
}

func (q *pendingEmitter) Ack(msg *protocolMessage, errInfo *ErrorInfo) {
Expand Down Expand Up @@ -184,13 +184,15 @@ func (q *pendingEmitter) Ack(msg *protocolMessage, errInfo *ErrorInfo) {
err = errImplictNACK
}
q.log.Verbosef("received %v for message serial %d", msg.Action, sch.msg.MsgSerial)
sch.ch <- err
if sch.onAck != nil {
sch.onAck(err)
}
}
}

type msgch struct {
msg *protocolMessage
ch chan<- error
msg *protocolMessage
onAck func(err error)
}

type msgQueue struct {
Expand All @@ -205,17 +207,17 @@ func newMsgQueue(conn *Connection) *msgQueue {
}
}

func (q *msgQueue) Enqueue(msg *protocolMessage, listen chan<- error) {
func (q *msgQueue) Enqueue(msg *protocolMessage, onAck func(err error)) {
q.mtx.Lock()
// TODO(rjeczalik): reorder the queue so Presence / Messages can be merged
q.queue = append(q.queue, msgch{msg, listen})
q.queue = append(q.queue, msgch{msg, onAck})
q.mtx.Unlock()
}

func (q *msgQueue) Flush() {
q.mtx.Lock()
for _, msgch := range q.queue {
q.conn.send(msgch.msg, msgch.ch)
q.conn.send(msgch.msg, msgch.onAck)
}
q.queue = nil
q.mtx.Unlock()
Expand All @@ -225,7 +227,9 @@ func (q *msgQueue) Fail(err error) {
q.mtx.Lock()
for _, msgch := range q.queue {
q.log().Errorf("failure sending message (serial=%d): %v", msgch.msg.MsgSerial, err)
msgch.ch <- newError(90000, err)
if msgch.onAck != nil {
msgch.onAck(newError(90000, err))
}
}
q.queue = nil
q.mtx.Unlock()
Expand Down

0 comments on commit e8d6171

Please sign in to comment.