Skip to content

Commit

Permalink
[linkedin#555] Response-only Streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
Zizhong Zhang committed Mar 15, 2021
1 parent 4f5c93d commit 38430cd
Show file tree
Hide file tree
Showing 29 changed files with 463 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,24 @@ public void restRequest(RestRequest request,
callback.onResponse(TransportResponseImpl.success(response));
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
restRequest = request;
restRequestContext = requestContext;
restWireAttrs = wireAttrs;
streamCallback = callback;
StreamResponseBuilder builder = new StreamResponseBuilder();
StreamResponse response = _emptyResponse ? builder.build(EntityStreams.emptyStream())
: builder.build(EntityStreams.newEntityStream(new ByteStringWriter(ByteString.copy("This is not empty".getBytes()))));
if (_deferCallback)
{
scheduleTimeout(requestContext, callback);
return;
}
callback.onResponse(TransportResponseImpl.success(response, wireAttrs));
}

@Override
public void streamRequest(StreamRequest request,
RequestContext requestContext,
Expand Down
11 changes: 11 additions & 0 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientDelegator.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ public void streamRequest(StreamRequest request, RequestContext requestContext,
_d2Client.streamRequest(request, requestContext, callback);
}

@Override
public void restRequestStreamResponse(RestRequest request, Callback<StreamResponse> callback) {
_d2Client.restRequestStreamResponse(request, callback);
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Callback<StreamResponse> callback) {
_d2Client.restRequestStreamResponse(request, requestContext, callback);
}

@Override
public void getMetadata(URI uri, Callback<Map<String, Object>> callback)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,24 @@ public void streamRequest(StreamRequest request, RequestContext requestContext,
decorateCallbackSync(request, requestContext, _d2Client::streamRequest, callback));
}

@Override
public void restRequestStreamResponse(RestRequest request, Callback<StreamResponse> callback) {
restRequestStreamResponse(request, new RequestContext(), callback);
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Callback<StreamResponse> callback) {
if (_isD2Async)
{
requestAsync(request, requestContext, _d2Client::restRequestStreamResponse, callback);
return;
}

_d2Client.restRequestStreamResponse(request, requestContext,
decorateCallbackSync(request, requestContext, _d2Client::restRequestStreamResponse, callback));
}

private <R extends Request, T> Callback<T> decorateCallbackSync(R request, RequestContext requestContext,
DecoratorClient<R, T> client, Callback<T> callback)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,16 @@ public void streamRequest(StreamRequest request,
_balancer.getClient(request, requestContext,
getClientCallback(request, requestContext, true, callback, client -> client.streamRequest(request, requestContext, loggerCallback))
);
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Callback<StreamResponse> callback) {
Callback<StreamResponse> loggerCallback = decorateLoggingCallback(callback, request, "stream");

_balancer.getClient(request, requestContext,
getClientCallback(request, requestContext, true, callback, client -> client.restRequestStreamResponse(request, requestContext, loggerCallback))
);
}

private Callback<TransportClient> getClientCallback(Request request, RequestContext requestContext, final boolean restOverStream, Callback<? extends Response> callback, SuccessCallback<Client> clientSuccessCallback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public void restRequest(RestRequest request,
getWrappedClient().restRequest(request, requestContext, wireAttrs, callback);
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
getWrappedClient().restRequestStreamResponse(request, requestContext, wireAttrs, callback);
}

@Override
public void streamRequest(StreamRequest request,
RequestContext requestContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ public void streamRequest(StreamRequest request, RequestContext requestContext,
_d2Client.streamRequest(request, requestContext, transportCallback);
}

@Override
public void restRequestStreamResponse(RestRequest request, Callback<StreamResponse> callback) {
restRequestStreamResponse(request, new RequestContext(), callback);
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Callback<StreamResponse> callback) {
final Callback<StreamResponse> transportCallback =
decorateCallbackWithRequestTimeout(callback, request, requestContext);

_d2Client.restRequestStreamResponse(request, requestContext, transportCallback);
}

/**
* Enforces the user timeout to the layer below if necessary.
*
Expand Down
45 changes: 45 additions & 0 deletions d2/src/main/java/com/linkedin/d2/balancer/clients/RetryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,29 @@ public void streamRequest(StreamRequest request, RequestContext requestContext,
}
}

@Override
public void restRequestStreamResponse(RestRequest request, Callback<StreamResponse> callback) {
restRequestStreamResponse(request, new RequestContext(), callback);
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Callback<StreamResponse> callback) {
if (_restRetryEnabled)
{
RestRequest newRequest = request.builder()
.setHeader(HttpConstants.HEADER_NUMBER_OF_RETRY_ATTEMPTS, "0")
.build();
ClientRetryTracker retryTracker = updateRetryTracker(newRequest.getURI(), false);
final Callback<StreamResponse> transportCallback = new ResponseOnlyStreamRetryRequestCallback(newRequest, requestContext, callback, retryTracker);
_d2Client.restRequestStreamResponse(newRequest, requestContext, transportCallback);
}
else
{
_d2Client.restRequestStreamResponse(request, requestContext, callback);
}
}

private ClientRetryTracker updateRetryTracker(URI uri, boolean isRetry)
{
String serviceName = LoadBalancerUtil.getServiceNameFromUri(uri);
Expand Down Expand Up @@ -276,6 +299,28 @@ public boolean doRetryRequest(RestRequest request, RequestContext context, int n
}
}

/**
* Callback implementation for Retry {@link RestRequest} and {@link StreamResponse}
*/
private class ResponseOnlyStreamRetryRequestCallback extends RetryRequestCallback<RestRequest, StreamResponse>
{
public ResponseOnlyStreamRetryRequestCallback(RestRequest request, RequestContext context, Callback<StreamResponse> callback, ClientRetryTracker retryTracker)
{
super(request, context, callback, retryTracker);
}

@Override
public boolean doRetryRequest(RestRequest request, RequestContext context, int numberOfRetryAttempts)
{
RestRequest newRequest = request.builder()
.setHeader(HttpConstants.HEADER_NUMBER_OF_RETRY_ATTEMPTS, Integer.toString(numberOfRetryAttempts))
.build();
updateRetryTracker(request.getURI(), true);
_d2Client.restRequestStreamResponse(newRequest, context, this);
return true;
}
}

/**
* Abstract callback implementation of retry requests.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ public void restRequest(RestRequest request, RequestContext requestContext, Map<
_transportClient.restRequest(rewriteRequest(request), requestContext, wireAttrs, callback);
}

@Override
public void restRequestStreamResponse(RestRequest request,
RequestContext requestContext,
Map<String, String> wireAttrs,
TransportCallback<StreamResponse> callback) {
_transportClient.restRequestStreamResponse(rewriteRequest(request), requestContext, wireAttrs, callback);
}

/**
* Asynchronously issues the given request. The given callback is invoked when the response is
* received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ public void streamRequest(StreamRequest request,
_client.streamRequest(request, requestContext, wireAttrs, callback);
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
assert _serviceName.equals(LoadBalancerUtil.getServiceNameFromUri(request.getURI()));
_client.restRequestStreamResponse(request, requestContext, wireAttrs, callback);
}

@Override
public void shutdown(Callback<None> callback)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ public void restRequest(RestRequest request,
_transportClient.restRequest(request, requestContext, wireAttrs, new TrackerClientRestCallback(callback, _callTracker.startCall()));
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
_transportClient.restRequestStreamResponse(request, requestContext, wireAttrs, new TrackerClientStreamCallback(callback, _callTracker.startCall()));
}

@Override
public void streamRequest(StreamRequest request,
RequestContext requestContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ public void restRequest(RestRequest request,
getWrappedClient().restRequest(request, requestContext, wireAttrs, callback);
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
updateRequestContext(requestContext);
getWrappedClient().restRequestStreamResponse(request, requestContext, wireAttrs, callback);
}

@Override
public void streamRequest(StreamRequest request,
RequestContext requestContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ public void restRequest(RestRequest request,
++requestCount;
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
++requestCount;
}

@Override
public void shutdown(Callback<None> callback)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public void streamRequest(StreamRequest request,
() -> new StreamResponseBuilder().build(EntityStreams.emptyStream()));
}

@Override
public void restRequestStreamResponse(RestRequest request, RequestContext requestContext,
Map<String, String> wireAttrs, TransportCallback<StreamResponse> callback) {
handleRequest(request, wireAttrs, callback,
r -> {},
() -> new StreamResponseBuilder().build(EntityStreams.emptyStream()));
}

@Override
public URI getUri()
{
Expand Down

0 comments on commit 38430cd

Please sign in to comment.