Skip to content

Commit

Permalink
#39 refresh client
Browse files Browse the repository at this point in the history
  • Loading branch information
peacess committed Jan 26, 2021
1 parent e620d59 commit ce6a857
Show file tree
Hide file tree
Showing 23 changed files with 3,372 additions and 31 deletions.
5 changes: 4 additions & 1 deletion packages/services/lib/services.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
library services;

export 'src/grpc/client_channel.dart';
export 'src/rpc_client/client_channel.dart';
export 'src/rpc_client/refresh.dart';
export 'src/enum_const.dart';

93 changes: 93 additions & 0 deletions packages/services/lib/src/enum_const.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// type AppPlatformType string
//
// //命名使用 rustup target list中的名字
// const (
// AppPlatformType_any AppPlatformType = "any"
// AppPlatformType_aarch64_linux_android AppPlatformType = "aarch64-linux-android"
// AppPlatformType_armv7_linux_androideabi AppPlatformType = "armv7-linux-androideabi"
// AppPlatformType_i686_linux_android AppPlatformType = "i686-linux-android"
// AppPlatformType_x86_64_linux_android AppPlatformType = "x86_64-linux-android"
// AppPlatformType_x86_64_pc_windows_gnu AppPlatformType = "x86_64-pc-windows-gnu"
// AppPlatformType_x86_64_unknown_linux_gnu AppPlatformType = "x86_64-unknown-linux-gnu"
// )

enum AppPlatformType {
any,
aarch64_linux_android,
armv7_linux_androideabi,
i686_linux_android,
x86_64_linux_android,
x86_64_pc_windows_gnu,
x86_64_unknown_linux_gnu
}

extension AppPlatformTypeEx on AppPlatformType {

static AppPlatformType from(String chainType) {
AppPlatformType re;
switch (chainType) {
case "any":
re = AppPlatformType.any;
break;
case "aarch64_linux_android":
re = AppPlatformType.aarch64_linux_android;
break;
case "armv7_linux_androideabi":
re = AppPlatformType.armv7_linux_androideabi;
break;
case "i686_linux_android":
re = AppPlatformType.i686_linux_android;
break;
case "x86_64_linux_android":
re = AppPlatformType.x86_64_linux_android;
break;
case "x86_64_pc_windows_gnu":
re = AppPlatformType.x86_64_pc_windows_gnu;
break;
case "x86_64_unknown_linux_gnu":
re = AppPlatformType.x86_64_unknown_linux_gnu;
break;
default:
{
//todo log
// let err = format!("the str:{} can not be ChainType", chain_type);
// log.error!("{}", err);
re = null;
}
}
return re;
}

String toEnumString() {
String re;
switch (this) {
case AppPlatformType.any:
re = "any";break;
case AppPlatformType.aarch64_linux_android:
re = "aarch64_linux_android";
break;
case AppPlatformType.armv7_linux_androideabi:
re = "armv7_linux_androideabi";
break;
case AppPlatformType.i686_linux_android:
re = "i686_linux_android";
break;
case AppPlatformType.x86_64_linux_android:
re = "x86_64_linux_android";
break;
case AppPlatformType.x86_64_pc_windows_gnu:
re = "x86_64_pc_windows_gnu";
break;
case AppPlatformType.x86_64_unknown_linux_gnu:
re = "x86_64_unknown_linux_gnu";
break;
}
return re;
}
}

extension StringEx on String {
AppPlatformType toAppPlatformType() {
return AppPlatformTypeEx.from(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import 'package:grpc/src/client/http2_connection.dart';
import 'package:grpc/src/shared/profiler.dart';
import 'package:meta/meta.dart';

ClientTransportChannel createClientChannel(
ConnectParameter parameter, ReFreshCall refreshCall) {
var refreshParameter = ReFreshParameter(parameter, refreshCall);
var channel = ClientTransportChannel(refreshParameter);
$channel.ClientChannel createClientChannel(_ReFreshCall refreshCall) {
var channel = ClientTransportChannel(_ReFreshParameter(refreshCall));
return channel;
}

Expand All @@ -24,49 +22,49 @@ class ConnectParameter {
const ChannelOptions(credentials: ChannelCredentials.insecure())});
}

typedef ReFreshCall = Future<ConnectParameter> Function(ConnectParameter);
typedef _ReFreshCall = Future<ConnectParameter> Function();

//刷新服务端连接参数
class ReFreshParameter {
class _ReFreshParameter extends Function {
ConnectParameter _connectParameter;
final ConnectParameter _refreshService;
final ReFreshCall _refreshCall;
final _ReFreshCall _refreshCall;

ReFreshParameter(this._refreshService, this._refreshCall);
_ReFreshParameter(this._refreshCall);

ConnectParameter get connectParameter => _connectParameter;

void resetConnectParameter() {
void _resetConnectParameter() {
_connectParameter = null;
}

Future<ConnectParameter> refreshParameter() async {
Future<ConnectParameter> _refreshParameter() async {
if (_connectParameter != null) {
return _connectParameter;
}
if (_refreshCall != null) {
_connectParameter = await _refreshCall(_refreshService);
_connectParameter = await _refreshCall();
}
return _connectParameter;
}
}

// see: grpc-2.8.0/lib/src/client/channel.dart ClientChannelBase
// 增加功能,当连接出错时,刷新服务地址
@visibleForTesting
class ClientTransportChannel implements $channel.ClientChannel {
// TODO: Multiple connections, load balancing.
ClientConnection _connection;

bool _isShutdown = false;

final ReFreshParameter _refreshParameter;
final _ReFreshParameter _refreshParameter;

ClientTransportChannel(this._refreshParameter);

Future<ClientConnection> createConnection() async {
var parameter = _refreshParameter.connectParameter;
if (parameter == null) {
await _refreshParameter.refreshParameter();
await _refreshParameter._refreshParameter();
parameter = _refreshParameter.connectParameter;
}
if (parameter == null) {
Expand Down Expand Up @@ -98,7 +96,7 @@ class ClientTransportChannel implements $channel.ClientChannel {
@override
ClientCall<Q, R> createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options) {
final call = ClientCallError(
final call = _ClientCallError(
method,
requests,
options,
Expand All @@ -113,10 +111,6 @@ class ClientTransportChannel implements $channel.ClientChannel {
return call;
}

void resetConnectParameter() {
_onConnectionError(new Error());
}

@visibleForTesting
ConnectParameter get connectParameter => _refreshParameter.connectParameter;

Expand All @@ -126,19 +120,19 @@ class ClientTransportChannel implements $channel.ClientChannel {
//不能调用 _connection.shutdown()
// var t = Future(_connection.shutdown);
_connection = null;
_refreshParameter.resetConnectParameter();
_refreshParameter._resetConnectParameter();
}
}
}

typedef ErrorCall = void Function(dynamic);
typedef _ErrorCall = void Function(dynamic);

class ClientCallError<Q, R> extends ClientCall<Q, R> {
ErrorCall _errCall;
class _ClientCallError<Q, R> extends ClientCall<Q, R> {
_ErrorCall _errCall;
Stream<R> _stream = null;

ClientCallError(ClientMethod<Q, R> method, Stream<Q> requests,
CallOptions options, ErrorCall this._errCall,
_ClientCallError(ClientMethod<Q, R> method, Stream<Q> requests,
CallOptions options, _ErrorCall this._errCall,
[timelineTask])
: super(method, requests, options, timelineTask);

Expand Down
45 changes: 45 additions & 0 deletions packages/services/lib/src/rpc_client/refresh.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@

import 'package:grpc/grpc.dart';
import 'package:meta/meta.dart';
import 'package:services/services.dart';
import 'package:services/src/rpc_face/refresh.pbgrpc.dart';

class Refresh {
ClientChannel _channel;
RefreshFaceClient _client;
Refresh_RefreshReq _req;
static Refresh _instance;
Refresh._internal();
factory Refresh.get(ConnectParameter parameter, String version, AppPlatformType platformType){
if(_instance == null) {
_instance = Refresh._internal();
_instance._channel = new ClientChannel(
parameter.host,
port: parameter.port,
options: const ChannelOptions(credentials: ChannelCredentials.insecure(), connectionTimeout: Duration(minutes: 1)),
);
_instance._client = new RefreshFaceClient(_instance._channel);
_instance._req = Refresh_RefreshReq(version:version,appPlatformType: platformType.toEnumString());
}
return _instance;
}

@visibleForTesting
set version(String v) => _req.version = v;

Future<ConnectParameter> refreshCall() async {
var res = await _client.refresh(_req);
if(res.hasErr()){
return null;
}
ConnectParameter parameter = new ConnectParameter(res.serviceMeta.host, res.serviceMeta.port.toInt());
return parameter;
}

shutDown() {
_client = null;
_channel.shutdown();
_channel = null;
}
}

0 comments on commit ce6a857

Please sign in to comment.