Skip to content

Commit

Permalink
Ensure all exceptions are retried. (#71)
Browse files Browse the repository at this point in the history
* Ensure all exceptions are retried.

* Add unit tests for socket

* Only propagate errors to the channels on channel exceptions

* Don't rethrow. Stops the periodic timer.

---------

Co-authored-by: Brian Egan <brian@brianegan.com>
  • Loading branch information
brian-superlist and brianegan committed Mar 27, 2024
1 parent a28e41d commit 7cf64c7
Show file tree
Hide file tree
Showing 7 changed files with 520 additions and 184 deletions.
2 changes: 1 addition & 1 deletion lib/src/push.dart
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class Push {
// ignore: avoid_catches_without_on_clauses
} catch (err, stacktrace) {
_logger.warning(
'Catched error for push $ref',
'Caught error for push $ref',
err,
stacktrace,
);
Expand Down
34 changes: 21 additions & 13 deletions lib/src/socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ class PhoenixSocket {
PhoenixSocket(
/// The URL of the Phoenix server.
String endpoint, {

/// The options used when initiating and maintaining the
/// websocket connection.
PhoenixSocketOptions? socketOptions,

/// The factory to use to create the WebSocketChannel.
WebSocketChannel Function(Uri uri)? webSocketChannelFactory,
}) : _endpoint = endpoint,
_socketState = SocketState.unknown {
_socketState = SocketState.unknown,
_webSocketChannelFactory = webSocketChannelFactory {
_options = socketOptions ?? PhoenixSocketOptions();

_reconnects = _options.reconnectDelays;
Expand Down Expand Up @@ -87,6 +90,7 @@ class PhoenixSocket {
StreamController.broadcast();
final String _endpoint;
final StreamController<Message> _topicMessages = StreamController();
final WebSocketChannel Function(Uri uri)? _webSocketChannelFactory;

late Uri _mountPoint;

Expand Down Expand Up @@ -189,7 +193,9 @@ class PhoenixSocket {
final completer = Completer<PhoenixSocket?>();

try {
_ws = WebSocketChannel.connect(_mountPoint);
_ws = _webSocketChannelFactory != null
? _webSocketChannelFactory!(_mountPoint)
: WebSocketChannel.connect(_mountPoint);
_ws!.stream
.where(_shouldPipeMessage)
.listen(_onSocketData, cancelOnError: true)
Expand All @@ -212,8 +218,8 @@ class PhoenixSocket {
} else {
throw PhoenixException();
}
} on PhoenixException catch (err, stackTrace) {
_logger.severe('Raised PhoenixException', err, stackTrace);
} catch (err, stackTrace) {
_logger.severe('Raised Exception', err, stackTrace);

_ws = null;
_socketState = SocketState.closed;
Expand Down Expand Up @@ -437,16 +443,9 @@ class PhoenixSocket {
await sendMessage(_heartbeatMessage());
_logger.fine('[phoenix_socket] Heartbeat completed');
return true;
} on PhoenixException catch (err, stacktrace) {
_logger.severe(
'[phoenix_socket] Heartbeat message failed with error',
err,
stacktrace,
);
return false;
} on WebSocketChannelException catch (err, stacktrace) {
_logger.severe(
'[phoenix_socket] Heartbeat message failed with error',
'[phoenix_socket] Heartbeat message failed: WebSocketChannelException',
err,
stacktrace,
);
Expand All @@ -456,6 +455,15 @@ class PhoenixSocket {
stacktrace: stacktrace,
),
));

return false;
} catch (err, stacktrace) {
_logger.severe(
'[phoenix_socket] Heartbeat message failed',
err,
stacktrace,
);

return false;
}
}
Expand Down
File renamed without changes.
3 changes: 3 additions & 0 deletions test/mocks.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import 'package:mockito/annotations.dart';
import 'package:mockito/mockito.dart';
import 'package:phoenix_socket/phoenix_socket.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

export 'mocks.mocks.dart';

@GenerateNiceMocks(
[
MockSpec<PhoenixChannel>(),
MockSpec<PhoenixSocket>(),
MockSpec<WebSocketChannel>(),
MockSpec<WebSocketSink>(),
],
)
class MockTest extends Mock {
Expand Down

0 comments on commit 7cf64c7

Please sign in to comment.