Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some reported issues with channels #73

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## [0.7.2.]

- Delay channel join until socket is open
- Only emit channel replies from channel.messages

## [0.7.1.]

- Ensure all exceptions are retried
Expand Down
12 changes: 9 additions & 3 deletions lib/src/channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class PhoenixChannel {
_joinPush = _prepareJoin();
_logger = Logger('phoenix_socket.channel.$loggerName');
_subscriptions
..add(messages.listen(_onMessage))
..add(_controller.stream.listen(_onMessage))
..addAll(_subscribeToSocketStreams(socket));
}

Expand Down Expand Up @@ -83,7 +83,9 @@ class PhoenixChannel {
final List<Push> pushBuffer = [];

/// Stream of all messages coming through this channel from the backend.
Stream<Message> get messages => _controller.stream;
Stream<Message> get messages => _controller.stream.where(
(message) => !message.event.isReply || message.event.isChannelReply,
);

/// Unique identifier of the 'join' push message.
String get joinRef => _joinPush.ref;
Expand Down Expand Up @@ -226,7 +228,11 @@ class PhoenixChannel {
}

_joinedOnce = true;
_attemptJoin();
if (socket.isConnected) {
_attemptJoin();
} else {
_state = PhoenixChannelState.errored;
}

return _joinPush;
}
Expand Down
4 changes: 4 additions & 0 deletions lib/src/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ class PhoenixChannelEvent extends Equatable {
value.startsWith(__chanReplyEventName) ||
value.startsWith(__replyEventName);

/// Whether the event name is a 'channel reply' event
bool get isChannelReply =>
value.startsWith(__chanReplyEventName);

@override
List<Object> get props => [value];
}
10 changes: 7 additions & 3 deletions lib/src/push.dart
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ class Push {
/// after a reconnection.
Future<void> resend(Duration? newTimeout) async {
timeout = newTimeout ?? timeout;
reset();
if (_sent) {
reset();
}
await send();
}

Expand Down Expand Up @@ -264,8 +266,10 @@ class Push {

// Remove existing waiters and reset completer
void cleanUp() {
clearReceivers();
_responseCompleter = Completer();
if (_sent) {
clearReceivers();
_responseCompleter = Completer();
}
}

void _receiveResponse(dynamic response) {
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ description: >-
PhoenixSocket provides a feature-complete implementation
of Phoenix Sockets, using a single API based on StreamChannels
compatible with any deployment of Flutter.
version: 0.7.1
version: 0.7.2
repository: https://www.github.com/matehat/phoenix-socket-dart
homepage: https://www.github.com/matehat/phoenix-socket-dart

Expand Down
28 changes: 28 additions & 0 deletions test/channel_integration_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,19 @@ void main() {
await completer.future;
});

test('can join a channel through an unawaited socket', () async {
final socket = PhoenixSocket(addr);
final completer = Completer<void>();

socket.connect();
socket.addChannel(topic: 'channel1').join().onReply('ok', (reply) {
expect(reply.status, equals('ok'));
completer.complete();
});

await completer.future;
});

test('can join a channel requiring parameters', () async {
final socket = PhoenixSocket(addr);

Expand Down Expand Up @@ -80,6 +93,21 @@ void main() {
expect(reply.response, equals({'name': 'bar'}));
});

test('only emits reply messages that are channel replies', () async {
final socket = PhoenixSocket(addr);

socket.connect();

final channel1 = socket.addChannel(topic: 'channel1');
final channelMessages = [];
channel1.messages.forEach((element) => channelMessages.add(element));

await channel1.join().future;
await channel1.push('hello!', {'foo': 'bar'}).future;

expect(channelMessages, hasLength(2));
});

test('can receive messages from channels', () async {
final socket = PhoenixSocket(addr);

Expand Down