Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental v2/channel and consumer group #1661

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

mschristensen
Copy link
Contributor

@mschristensen mschristensen commented Mar 1, 2024

Labelled draft as not intended to merge. We will create a special release tag from this branch for customers requiring preview access to this functionality.


Adds a client-side simulation of channel + consumer groups functionality.

A channel group identifies a set of channels using a regular expression. The client SDK is responsible for subscribing to channels that match the regex as they become active and unsubscribing as they become inactive. Active channels are made known to the client via a dedicated channel named $ably:active which for the POC is published to by a lambda integration rule.

The set of channels in the channel group can be partitioned across a set of consumers that share the same consumerGroup.name. Consumers become aware of one another via presence on a channel with the same name as the consumer group. Channels are partitioned according to a simple modulo hash based partitioning scheme.

// subscribes to all active channels matching `foo:*`
const group = client.channelGroups.get('foo:.*', { consumerGroup: { name: 'group' } });
await group.subscribe((channel, msg) => console.log(`received message ${msg.name} on ${channel}`));

This implementation is intended to demonstrate the channel/consumer groups functionality and is not intended for usage in production. In a production-ready implementation, channel and consumer groups will be implemented in the backend realtime system and the client implementation will be much simpler. However this PR is indicative of what the public API would look like (except for some clearly-labelled simulation-specific configuration).

@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 1, 2024 18:30 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 1, 2024 18:30 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 1, 2024 18:30 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 4, 2024 10:37 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 4, 2024 10:37 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 4, 2024 10:37 Inactive
Copy link
Contributor

@ttypic ttypic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implementation looks good, I am happy to release with experiment tag

src/common/lib/util/hashring.ts Outdated Show resolved Hide resolved
modules.d.ts Outdated Show resolved Hide resolved
Copy link
Contributor

@VeskeR VeskeR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, small comment about polyfill.

Test checks are looking weird though:
All integration tests are failing with an abnormal time (100m - 300m) - can't even load logs from them, there are just so many after running for so long.
Maybe there is a flaky/failed test that is just stuck in a loop or doesn't have a timeout set or something.
Should probably make sure tests are passing before releasing

src/common/lib/util/utils.ts Outdated Show resolved Hide resolved
@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 5, 2024 10:03 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 5, 2024 10:04 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 5, 2024 10:04 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 5, 2024 10:22 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 5, 2024 10:23 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 5, 2024 10:23 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 5, 2024 13:42 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 5, 2024 13:43 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 5, 2024 13:43 Inactive
@mschristensen
Copy link
Contributor Author

@VeskeR I pushed up some changes to tests. They pass reliably for me locally so not sure why we're getting issues in CI. Will wait for the run to finish and inspect the logs

@VeskeR
Copy link
Contributor

VeskeR commented Mar 5, 2024

Still failed after 100+ minutes unfortunately. Hope you will be able to load some logs, there are a lot.

Quickly looked at previous PRs:
seems like it started happening with this PR and onwards: #1655 (see its CI result https://github.com/ably/ably-js/actions/runs/8109535381/job/22164928744). Probably something was added to the tests that is causing an infinite loop of some sorts.

PRs before that had their tests passing in CI. For example, #1651 precedes the PR above and its tests either pass, or fail in a reasonable time (~10 minutes)

@mschristensen
Copy link
Contributor Author

From the logs, there's a lot of

2024-03-05T13:47:13.6564779Z 13:47:13.656 Ably: Connection state: failed; reason: [ErrorInfo: account restricted (connection limit exceeded); statusCode=401; code=40111; see https://help.ably.io/error/40111 ]

So I have bumped the default connection limits for the test account from 100 to 200, and will see what effect that has.

@mschristensen
Copy link
Contributor Author

Actually, looks like there's a lot of

3-06T10:36:33.3326370Z 10:36:33.332 Ably: Transport.onProtocolMessage(): received on web_socket: [ProtocolMessage; action=DISCONNECTED; error=[ErrorInfo: Key/token status changed (expire); statusCode=401; code=40142; see https://help.ably.io/error/40142 ]]; connectionId = IK5jju0wpn
2024-03-06T10:36:33.3330234Z 10:36:33.332 Ably: Transport.onDisconnect(): err = [ErrorInfo: Key/token status changed (expire); statusCode=401; code=40142; see https://help.ably.io/error/40142 ]

Which is causing a large number of connections to be terminated and reopened. Looking into it

@github-actions github-actions bot temporarily deployed to staging/pull/1667/features March 6, 2024 11:36 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1667/bundle-report March 6, 2024 11:36 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1667/typedoc March 6, 2024 11:36 Inactive
Add a client side channel group that listens for active channels and
subscribes/unsubscribes as the set of active changes.

Add a consumer group based on presence set

Add typing information for ChannelGroups

Fix tests for v2

ChannelGroups is a default realtime module

types: add channel group typings

consumergroup: use modulo-based hashing scheme

The `hashring` package is node-only as it depends on the native `crypto`
package. Replaced with a simple modulo hash scheme for now.

Fixes the case where the channel is already attached and the channel is
obtained with new rewind options that require a re-attach.

Updates the consumer group partitioning test to more robustly assert
that channels are partitioned across consumers.

channelgroup: make get sync, subscribe async

This matches the pattern used by Channels, which is sync to obtain the
channel and async on subscribe in order to await attachment. In the
channel group case, awaiting the subscribe awaits the joining of the
consumer group.

consumergroup: make consumerId required field

consumergroup: make hashring a required field

format: apply prettier formatting rules

jsdoc: add consumer group docs

channelgroups: add active channel name option

channelgroup: add explicit join method

Exposes the join method on the channel group, which is analogous to
attach on the channel. Uses this in tests for more robust assertions.

Additionally avoids re-attaching to the consumer group channel if
already attached, and obtains presence membership synchronously in the
join.

consumergroup: use subscribe over on

The `on` method was not reliable across clients, despite being
documented in
https://ably.com/docs/presence-occupancy/presence?lang=java#synced, so
use subscribe instead.

channelgroup: fix assigned channel processing

We need to keep the total set of active channels around as updating the
assigned channel set when the membership changes requires computing the
new assignments from the complete channel set, not the previously set of
assigned channels.

consumergroup: include consumerId in logs

consumergroups: add test for consumer group resize

test: remove explicit join from test

lint: apply formatting and cleanup

test: replace var with let

test: remove unnecessary outer try-catch

test: add prefix to channels

Avoid channel name collisions causing tests to fail from concurrent test
runs in CI.

channelgroup: detach from channel on un-assignment

channelgroup: add unsubscribe listener method

test: fix rebalance test waits for consumers

channelgroup: add leave method

test: remove dangling console logs

test: test consumer group scale down event

test: prefix consumer group channel

Similar to the active channel, we need to avoid conflicts.

consumergroup: fix current member tracking

We store the current active set of members in the hashring.

test: fix done condition w/ at-least-once delivery

Messages can be delivered more than once during a consumer group
rescaling event, so deduplicate the results when checking the end
condition.

channelgroups: use Utils.inspectError in logs

channelgroups: do not share channel object

The Channels object used by the ChannelGroup for internal channel
operations no longer shares the same object exposed on the client via
the .channels() method. This is to ensure that independent usage of an
individual channel that happens to be included in a channel group is not
impacted by its usage in the channel group.

test: tidy up leave test

Now that we can correctly handle a channel group and channel being used
independently from the same client, this tidies up the leave test to
remove the additional client previously needed.

test: rename waitForConsumers for clarity

test: rename waitForConsumers for clarity

channelgroups: do not share channel object

The Channels object used by the ChannelGroup for internal channel
operations no longer shares the same object exposed on the client via
the .channels() method. This is to ensure that independent usage of an
individual channel that happens to be included in a channel group is not
impacted by its usage in the channel group.

channelgroups: add module integration

modules: update ChannelGroups module definitions

channelgroup: add temp rewind channel group option

channelgroup: unsubscribe channel after timeout

In order to avoid keeping the channel alive, we add a configurable
timeout after which the channel will be unsubscribed if no messages are
received. This is to avoid keeping the channel active. This can lead to
missed messages if the a message is published after the client
unsubscribes and before the channel becomes inactive. This is an
acceptable edge case for the client-side simulation, especially with the
default 1h timeout.

deps: remove unused hashring types pkg

utils: remove arrIndexOf polyfill

consumergroup: rename hashring to locator

test: use async style tests for channel groups

Replaces the use of the `done()` callback with an async function style
test.

This allows us to await channel publish results and more easily handle
race conditions in tests.

channelgroup: use qualifier options

Previously we relied on a new BaseRealtime instance with it's own
Channels object to separate usage of channels in the ChannelGroup from
independent external usage of those channels from the regular
client.channels.get() method. This led to various problems with shared
Auth state such as nonces in token requests which caused connections to
terminate and tests to fail.

A simpler solution is to avoid creating a new client instance and
instead share the Channel pool, but force the library to treat channels
used from the ChannelGroup independently (with their own attachment) by
setting dummy options in the qualifier, which is used as the key in the
channel map.

This implementation does not support channels in the channel group which
already have a qualifier. This is acceptable for the experimental
client-side simulation of the feature.
@mschristensen mschristensen force-pushed the experimental-v2/channel-and-consumer-group branch from 10068f9 to bcf965f Compare March 6, 2024 18:42
@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 6, 2024 18:42 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 6, 2024 18:43 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 6, 2024 19:02 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 6, 2024 19:02 Inactive
@mschristensen
Copy link
Contributor Author

@ttypic @VeskeR Tests are now passing. The problem was due to some shared auth state causing conflicts between simultaneous usage of the two BaseRealtime instances. This was causing a large amount of connection churn due to auth issues. Either way, having separate ConnectionManager instances meant double the volume of connections even when not using channel groups. So, I've made changes to use the same Channels object from the ChannelGroup as well as the base client and prevent conflicts using a dummy channel option so that the library treats channels created by the channel group independently.

Copy link
Contributor

@ttypic ttypic left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, left two small comments

@@ -44,6 +44,20 @@ define([

async function monitorConnectionAsync(action, realtime, states) {
const monitoringResultPromise = new Promise((resolve, reject) => {
if (Object.prototype.toString.call(realtime) == '[object Array]') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be Array.isArray(realtime) here ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes probably better, I was following the pattern used in closeAndFinish

src/common/lib/util/utils.ts Outdated Show resolved Hide resolved
modular.d.ts Show resolved Hide resolved
modular.d.ts Outdated Show resolved Hide resolved
src/common/lib/util/utils.ts Outdated Show resolved Hide resolved
@@ -0,0 +1,701 @@
'use strict';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realized that test files are not automatically picked up by the playwright browser tests as they are for node.
If you check, for example, this CI run for webkit in this PR you can see there are no realtime/channelgroup tests completed.

It seems that we need to add new test file names to test/support/browser_file_list.js for it to be included in browser tests. I hope it's the only place we need to change for that.

You can check locally if it works if you do describe.only('realtime/channelgroup' below, and run PLAYWRIGHT_BROWSER=chromium npm run test:playwright (might need to do npx playwright install --with-deps before that to install browsers)

@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 8, 2024 10:14 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 8, 2024 10:15 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 8, 2024 10:15 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 8, 2024 10:18 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 8, 2024 10:18 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 8, 2024 10:18 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/features March 8, 2024 11:11 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/typedoc March 8, 2024 11:12 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1661/bundle-report March 8, 2024 11:12 Inactive
Each client will be entered in the presence set once per connection, so
we need to deduplicate the presence set by clientId.
…mitter

channelgroup: emit channel assigned/active events
…events

consumergroup: emit membership updated event
Since the set of active channels are surfaced on $ably:active is a
superset of those specified by a particular channel group expression,
this commit ensures we only emit an active.updated event when the set of
channels matching the consumer group expression have actually changed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants