Skip to content

Commit

Permalink
Merge pull request #13 from eugenechyrski/master
Browse files Browse the repository at this point in the history
Support for rsocket over websocket in browser
  • Loading branch information
linux-china committed Apr 5, 2022
2 parents 902a4fd + 81b6f38 commit f31cc6e
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 17 deletions.
4 changes: 2 additions & 2 deletions lib/core/rsocket_requester.dart
Expand Up @@ -261,10 +261,10 @@ class RSocketRequester extends RSocket {
case frame_types.REQUEST_RESPONSE:
var requestResponseFrame = frame as RequestResponseFrame;
if (responder != null && requestResponseFrame.payload != null) {
responder!.requestResponse!(requestResponseFrame.payload)
responder!.subscribe!(requestResponseFrame.payload)
.then((payload) {
connection.write(
FrameCodec.encodePayloadFrame(header.streamId, true, payload));
FrameCodec.encodePayloadFrame(header.streamId, false, payload));
}).catchError((error) {
var rsocketError = convertToRSocketException(error);
connection.write(FrameCodec.encodeErrorFrame(
Expand Down
4 changes: 3 additions & 1 deletion lib/core/rsocket_responder.dart
@@ -1,4 +1,6 @@
import 'dart:io';


import 'package:universal_io/io.dart';

import '../core/rsocket_requester.dart';
import '../duplex_connection.dart';
Expand Down
24 changes: 15 additions & 9 deletions lib/duplex_connection.dart
@@ -1,6 +1,8 @@
import 'dart:io';
import 'dart:typed_data';

import 'package:universal_io/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import 'io/bytes.dart';
import 'rsocket.dart';

Expand Down Expand Up @@ -56,14 +58,16 @@ class TcpDuplexConnection extends DuplexConnection {
}

class WebSocketDuplexConnection extends DuplexConnection {
WebSocket webSocket;
bool closed = false;
WebSocketChannel webSocket;
bool closed = true;

WebSocketDuplexConnection(this.webSocket);

@override
void init() {
webSocket.listen((message) {


webSocket.stream.listen((message) {
var data = message as List<int>;
var frameLenBytes = i24ToBytes(data.length);
receiveHandler!(Uint8List.fromList(frameLenBytes + data));
Expand All @@ -79,15 +83,14 @@ class WebSocketDuplexConnection extends DuplexConnection {
if (!closed) {
closed = true;
_availability = 0.0;
webSocket.close();
closeHandler?.call();
}
}

@override
void write(Uint8List chunk) {
//remove frame length: 3 bytes
webSocket.add(chunk.sublist(3));
webSocket.sink.add(chunk.sublist(3));
}
}

Expand All @@ -97,10 +100,13 @@ Future<DuplexConnection> connectRSocket(String url, TcpChunkHandler handler) {
if (scheme == 'tcp') {
var socketFuture = Socket.connect(uri.host, uri.port);
return socketFuture.then((socket) => TcpDuplexConnection(socket));
} else if (scheme == 'ws' || scheme == 'wss') {
var socketFuture = WebSocket.connect(url);
return socketFuture.then((socket) => WebSocketDuplexConnection(socket));
}if (scheme == 'ws' || scheme == 'wss') {
final websocket = WebSocketChannel.connect(
Uri.parse(url),
);
return Future.value(WebSocketDuplexConnection(websocket));
} else {
return Future.error('${scheme} unsupported');
}
}

11 changes: 10 additions & 1 deletion lib/io/bytes.dart
Expand Up @@ -155,7 +155,16 @@ class RSocketByteBuffer {
}

Uint8List i64ToBytes(int value) {
return Uint8List(8)..buffer.asByteData().setUint64(0, value, Endian.big);
//because of browser limitations
int l = value;
var b = BytesBuilder();
for (int i = 7; i >= 0; i--) {
b.addByte(l & 0xFF);

l >>= 8;
}
return Uint8List.fromList(b.toBytes().reversed.toList());
//return Uint8List(8)..buffer.asByteData().setUint64(0, value, Endian.big);
}

Uint8List i32ToBytes(int value) {
Expand Down
2 changes: 1 addition & 1 deletion lib/rsocket.dart
Expand Up @@ -26,7 +26,7 @@ class RSocket implements Closeable, Availability {
(Stream<Payload> payloads) => Stream.error(Exception('Unsupported'));
MetadataPush? metadataPush =
(Payload? payload) => Future.error(Exception('Unsupported'));

RequestResponse? subscribe= (Payload? payload)=> Future.error(Exception('Unsupported'));
@override
void close() {}

Expand Down
3 changes: 2 additions & 1 deletion lib/rsocket_server.dart
@@ -1,4 +1,5 @@
import 'dart:io';

import 'package:universal_io/io.dart';

import 'core/rsocket_responder.dart';
import 'rsocket.dart';
Expand Down
4 changes: 2 additions & 2 deletions pubspec.yaml
Expand Up @@ -7,7 +7,7 @@ issue_tracker: https://github.com/rsocket/rsocket-dart/issues
environment:
sdk: '>=2.12.0 <3.0.0'
dependencies:
rxdart: ^0.27.2
collection: ^1.15.0-nullsafety.4
web_socket_channel:
universal_io:
dev_dependencies:
test: ^1.19.2

0 comments on commit f31cc6e

Please sign in to comment.