Skip to content

Commit

Permalink
Merge pull request #87 from uber/hyperbahn-client
Browse files Browse the repository at this point in the history
Hyperbahn client
  • Loading branch information
ShanniLi committed Oct 20, 2015
2 parents c9e5b33 + 5e88331 commit 7dc3ba0
Show file tree
Hide file tree
Showing 24 changed files with 611 additions and 35 deletions.
14 changes: 9 additions & 5 deletions tchannel-core/src/main/java/com/uber/tchannel/api/TChannel.java
Expand Up @@ -27,6 +27,7 @@
import com.uber.tchannel.api.errors.TChannelError;
import com.uber.tchannel.api.errors.TChannelConnectionTimeout;
import com.uber.tchannel.api.handlers.RequestHandler;
import com.uber.tchannel.channels.Connection;
import com.uber.tchannel.channels.PeerManager;
import com.uber.tchannel.channels.ChannelRegistrar;
import com.uber.tchannel.codecs.MessageCodec;
Expand All @@ -48,7 +49,6 @@
import com.uber.tchannel.schemes.ThriftSerializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -211,14 +211,18 @@ public ListenableFuture<RawResponse> call(
}

// Get an outbound channel
Channel ch = this.peerManager.findOrNew(new InetSocketAddress(host, port), this.clientBootstrap).channel();
Connection conn = this.peerManager.findOrNew(new InetSocketAddress(host, port), this.clientBootstrap);

if (!this.peerManager.waitForIdentified(ch, this.initTimeout)) {
throw new TChannelConnectionTimeout();
if (!conn.waitForIdentified(this.initTimeout)) {
if (conn.lastError() != null) {
throw conn.lastError();
} else {
throw new TChannelConnectionTimeout();
}
}

// Get a response router for our outbound channel
ResponseRouter responseRouter = ch.pipeline().get(ResponseRouter.class);
ResponseRouter responseRouter = conn.channel().pipeline().get(ResponseRouter.class);

// Ask the router to make a call on our behalf, and return its promise
return responseRouter.expectResponse(request);
Expand Down
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2015 Uber Technologies, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.uber.tchannel.api.errors;

public class TChannelConnectionFailure extends TChannelError {
public TChannelConnectionFailure(Throwable ex) {
super("Failed to connect to the host", TChannelError.ERROR_CONNECTION_FAILURE, ex);
}
}
Expand Up @@ -23,12 +23,15 @@
package com.uber.tchannel.api.errors;

public class TChannelError extends Exception {
public static final String ERROR_INTERRUPTED = "tchannel.interrupted";

public static final String ERROR_INIT_TIMEOUT = "tchannel.connection.timeout";
public static final String ERROR_CONNECTION_FAILURE = "tchannel.socket";

public final String type;
public final Exception subError;
public final Throwable subError;

public TChannelError(String message, String type, Exception subError) {
public TChannelError(String message, String type, Throwable subError) {
super(message);
this.type = type;
this.subError = subError;
Expand Down
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2015 Uber Technologies, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.uber.tchannel.api.errors;

public class TChannelInterrupted extends TChannelError {
public TChannelInterrupted(Throwable ex) {
super("Interrupted Error", TChannelError.ERROR_INTERRUPTED, ex);
}
}
Expand Up @@ -21,6 +21,7 @@
*/

package com.uber.tchannel.channels;
import com.uber.tchannel.api.errors.TChannelError;
import com.uber.tchannel.messages.InitMessage;
import io.netty.channel.Channel;

Expand All @@ -35,6 +36,7 @@ public class Connection {

private final Channel channel;
private String remoteAddress = null;
private TChannelError lastError = null;

public Connection(Channel channel, Direction direction) {
this.channel = channel;
Expand All @@ -47,6 +49,9 @@ public Connection(Channel channel, Direction direction) {
public Channel channel() {
return this.channel;
}
public TChannelError lastError() {
return this.lastError;
}

public synchronized boolean satisfy(ConnectionState preferedState) {
ConnectionState connState = this.state;
Expand All @@ -65,7 +70,8 @@ public synchronized boolean satisfy(ConnectionState preferedState) {

public synchronized void setState(ConnectionState state) {
this.state = state;
if (state == ConnectionState.IDENTIFIED) {
if (state == ConnectionState.IDENTIFIED || (
state == ConnectionState.UNCONNECTED && this.lastError != null)) {
this.notifyAll();
}
}
Expand All @@ -81,6 +87,12 @@ public synchronized void setIndentified(Map<String, String> headers) {
this.setState(ConnectionState.IDENTIFIED);
}

public synchronized void setIndentified(TChannelError error) {
this.remoteAddress = null;
this.lastError = error;
this.setState(ConnectionState.UNCONNECTED);
}

public synchronized boolean isEphemeral() {
return this.remoteAddress.equals("0.0.0.0:0");
}
Expand All @@ -99,6 +111,7 @@ public synchronized boolean waitForIdentified(long timeout) {
// TODO reap connections/peers on init timeout
try {
if (this.state != ConnectionState.IDENTIFIED) {
this.lastError = null;
this.wait(timeout);
}
} catch (InterruptedException ex) {
Expand All @@ -109,7 +122,7 @@ public synchronized boolean waitForIdentified(long timeout) {
}

public synchronized void close() throws InterruptedException {
channel.close().sync();
channel.close();
this.state = ConnectionState.DESTROYED;
}

Expand Down
21 changes: 18 additions & 3 deletions tchannel-core/src/main/java/com/uber/tchannel/channels/Peer.java
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.HashMap;

import com.uber.tchannel.api.errors.TChannelConnectionFailure;
import com.uber.tchannel.api.errors.TChannelError;
import com.uber.tchannel.messages.InitMessage;
import com.uber.tchannel.messages.InitRequest;
import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -99,14 +101,27 @@ public Connection remove(Channel channel) {
return conn;
}

public Connection connect(Bootstrap bootstrap) throws InterruptedException {
public Connection connect(Bootstrap bootstrap) throws TChannelError {
Connection conn = getConnection(ConnectionState.IDENTIFIED);
if (conn != null) {
return conn;
}

Channel channel = bootstrap.connect(remoteAddress).sync().channel();
return add(channel, Connection.Direction.OUT);
final ChannelFuture f = bootstrap.connect(remoteAddress);
Channel channel = f.channel();
final Connection connection = add(channel, Connection.Direction.OUT);

// handle connection errors
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
connection.setIndentified(new TChannelConnectionFailure(future.cause()));
}
}
});

return connection;
}

public Connection getConnection(ConnectionState preferedState) {
Expand Down
Expand Up @@ -22,6 +22,7 @@

package com.uber.tchannel.channels;

import com.uber.tchannel.api.errors.TChannelError;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -38,7 +39,7 @@ public class PeerManager {
private final ConcurrentHashMap<SocketAddress, Peer> peers = new ConcurrentHashMap<>();
private String hostPort = "0.0.0.0:0";

public Connection findOrNew(SocketAddress address, Bootstrap bootstrap) throws InterruptedException {
public Connection findOrNew(SocketAddress address, Bootstrap bootstrap) throws TChannelError {
Peer peer = peers.get(address);
if (peer == null) {
peer = new Peer(this, address);
Expand Down
Expand Up @@ -30,8 +30,10 @@
import com.uber.tchannel.api.handlers.RequestHandler;
import com.uber.tchannel.errors.BadRequestError;
import com.uber.tchannel.errors.BusyError;
import com.uber.tchannel.errors.ErrorType;
import com.uber.tchannel.headers.ArgScheme;
import com.uber.tchannel.headers.TransportHeaders;
import com.uber.tchannel.messages.ErrorMessage;
import com.uber.tchannel.schemes.JSONSerializer;
import com.uber.tchannel.schemes.RawRequest;
import com.uber.tchannel.schemes.RawResponse;
Expand Down Expand Up @@ -127,7 +129,14 @@ public void onSuccess(RawResponse response) {
@Override
public void onFailure(Throwable throwable) {
queuedRequests.decrementAndGet();
// TODO handle the failure case

// TODO better interface for sending errors
ErrorMessage error = new ErrorMessage(
rawRequest.getId(),
ErrorType.BadRequest,
new Trace(0, 0, 0, (byte) 0x00),
throwable.getMessage());
ctx.writeAndFlush(error);
}
});
}
Expand Down
Expand Up @@ -50,8 +50,12 @@ public String decodeEndpoint(ByteBuf arg1) {
public Map<String, String> decodeHeaders(ByteBuf arg2) {
String headerJSON = arg2.toString(CharsetUtil.UTF_8);
arg2.release();
Map<String, String> headers = GSON.fromJson(headerJSON, HEADER_TYPE);
return (headers != null) ? headers : new HashMap<String, String>();
if (headerJSON == null || headerJSON.isEmpty() || headerJSON.equals("\"\"")) {
headerJSON = "{}";
}

Map<String, String> headers = new Gson().fromJson(headerJSON, HEADER_TYPE);
return (headers == null) ? new HashMap<String, String>() : headers;
}

@Override
Expand Down
Expand Up @@ -60,7 +60,7 @@ public void testPeerAndConnections() throws Exception {
int port = server.getListeningPort();

// create client
final TChannel client = new TChannel.Builder("json-server")
final TChannel client = new TChannel.Builder("server")
.setServerHost(host)
.build();
client.listen();
Expand Down
11 changes: 11 additions & 0 deletions tchannel-example/README.md
Expand Up @@ -25,4 +25,15 @@ java -cp tchannel-example/target/tchannel-example.jar com.uber.tchannel.ping.Pin
#Stopping Client...
```

#### HyperbahnExample
```bash
mvn package
Run Hyperbahn: node server.js --port 21300 2>&1 | jq .
java -cp tchannel-example/target/tchannel-example.jar com.uber.tchannel.hyperbahn.HyperbahnExample
tcurl -p 127.0.0.1:21300 javaServer ping -j -2 "{}" -3 '{"request":"hello"}' | jq .
```




## MIT Licenced
5 changes: 5 additions & 0 deletions tchannel-example/pom.xml
Expand Up @@ -41,6 +41,11 @@
<artifactId>tchannel-core</artifactId>
<version>0.1.4-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.uber.tchannel</groupId>
<artifactId>tchannel-hyperbahn</artifactId>
<version>0.1.4-SNAPSHOT</version>
</dependency>
</dependencies>

<build>
Expand Down

0 comments on commit 7dc3ba0

Please sign in to comment.