New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue-4917] Add blockingTaskExecutor in Thrift services #5619
base: main
Are you sure you want to change the base?
Conversation
if (useBlockingTaskExecutor) { | ||
ctx.blockingTaskExecutor().execute( | ||
() -> invoke(ctx, serializationFormat, seqId, f, decodedReq, httpRes) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we think invoke()
is only thing that we should wrap with ctx.blockingTaskExecutor()
.
If not, please share to us~! 🙇
@@ -287,6 +287,7 @@ public static Function<? super RpcService, THttpService> newDecorator( | |||
|
|||
private int maxRequestStringLength; | |||
private int maxRequestContainerLength; | |||
private boolean useBlockingTaskExecutor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private boolean useBlockingTaskExecutor; | |
private final boolean useBlockingTaskExecutor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed it. Thank you
@Override | ||
public void get(TestServiceRequest request, | ||
AsyncMethodCallback resultHandler) throws TException { | ||
resultHandler.onComplete(new TestServiceResponse(request.getMessage())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Could we check whether the current thread is the blocking task executor?
- Should we also check
TestService.Iface
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we check whether the current thread is the blocking task executor?
I checked it here
Line 81 in 6649eee
assertThat(blocking).isFalse(); |
Should we also check TestService.Iface?
Fixed it. Because of Iface, only verified the current thread is blocking executor without adding useBlockingTaskExecutor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me. 👍
Left minor suggestions. 😉
...ft/thrift0.13/src/test/java/com/linecorp/armeria/server/thrift/THttpServiceBlockingTest.java
Outdated
Show resolved
Hide resolved
...ft/thrift0.13/src/test/java/com/linecorp/armeria/server/thrift/THttpServiceBlockingTest.java
Outdated
Show resolved
Hide resolved
thrift/thrift0.13/src/main/java/com/linecorp/armeria/server/thrift/THttpService.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me, @ChangguHan 👍 👍 👍
Thanks!
Oops, I realized that we also need to fix ThriftCallService not to use blocking task executor twice: armeria/thrift/thrift0.13/src/main/java/com/linecorp/armeria/server/thrift/ThriftCallService.java Line 186 in 10424c7
private static void invoke(
ServiceRequestContext ctx,
Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) {
try {
final TBase<?, ?> tArgs = func.newArgs(args);
if (func.isAsync()) {
invokeAsynchronously(impl, func, tArgs, reply);
} else if (ctx.eventLoop().inEventLoop()) {
invokeSynchronously(ctx, impl, func, tArgs, reply);
} else {
invokeSynchronously0(ctx, impl, func, tArgs, reply);
}
} catch (Throwable t) {
reply.completeExceptionally(t);
}
}
private static void invokeSynchronously(
ServiceRequestContext ctx, Object impl,
ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
ctx.blockingTaskExecutor().execute(() -> invokeSynchronously0(ctx, impl, func, args, reply));
}
private static void invokeSynchronously0(
ServiceRequestContext ctx, Object impl,
ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
...
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether to delegate the call to the blocking task executor should be decided by ThriftCallService
, not THttpService
.
...ft/thrift0.13/src/test/java/com/linecorp/armeria/server/thrift/THttpServiceBlockingTest.java
Outdated
Show resolved
Hide resolved
@trustin armeria/thrift/thrift0.13/src/main/java/com/linecorp/armeria/server/thrift/ThriftCallService.java Lines 125 to 139 in 0963905
|
@minwoox Thank you for your review. |
* Creates a new {@link ThriftCallService} with the specified service implementation. | ||
* | ||
* @param implementation an implementation of {@code *.Iface} or {@code *.AsyncIface} service interface | ||
* generated by the Apache Thrift compiler | ||
* Creates a new instance of {@link ThriftCallServiceBuilder} which can build | ||
* an instance of {@link ThriftCallService} fluently. | ||
*/ | ||
public static ThriftCallService of(Object implementation) { | ||
requireNonNull(implementation, "implementation"); | ||
return new ThriftCallService(ImmutableMap.of("", ImmutableList.of(implementation))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we keep this method because otherwise we would break backward compatibility? We could reimplement this method like this:
return builder().implementation(implementation).builder();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I applied it. thank you!
thrift/thrift0.13/src/main/java/com/linecorp/armeria/server/thrift/ThriftCallService.java
Show resolved
Hide resolved
/** | ||
* Adds an implementation for {@link ThriftServiceEntry}. | ||
*/ | ||
public ThriftCallServiceBuilder implementation(Object implementation) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename this to addService()
so it is close to GrpcServiceBuilder
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I change the name of this method. Thank you!
* @see ThriftCallService | ||
*/ | ||
public final class ThriftCallServiceBuilder { | ||
private Map<String, ? extends Iterable<?>> implementations; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about making this additive, like:
private final ImmutableMap.Builder<String, ...> implementations = ImmutableMap.builder();
...
public ThriftCallService build() {
...
final Map<...> implementations = this.implementations.build();
...
}
// User code:
ThriftCallService
.builder()
.addService(defaultServceImpl)
.addService("foo", fooServiceImpl)
.addService("bar", barServiceImpl)
.addService("foobar", fooServiceImpl, barServiceImpl)
.addServices(ImmutableMap.of("qux", quxServiceImpl, "quz", quzServiceImpl))
.build();
// implementations will contain "", "foo", "bar", "foobar", "qux", "quz".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks good.
I applied it. Thank you for your comment.
* @see ThriftCallService | ||
*/ | ||
public final class ThriftCallServiceBuilder { | ||
private final ImmutableMap.Builder<String, Iterable<?>> implementations = ImmutableMap.builder(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please note that I changed the type of value here.
? extends Iterable<?>
-> Iterable<?>
/** | ||
* Adds multiple implementations for {@link ThriftServiceEntry}. | ||
*/ | ||
public ThriftCallServiceBuilder implementations(Map<String, ? extends Iterable<?>> implementations) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can rename this to addServices
as well?
*/ | ||
public ThriftCallServiceBuilder addService(String key, Object implementation) { | ||
requireNonNull(implementation, "implementation"); | ||
this.implementations.put(key, ImmutableList.of(implementation)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will overwrite the existing map entry under the specified key
, making the following code work incorrectly:
builder.addService("foobar", fooService);
builder.addService("foobar", barService);
There are two options to fix this here:
- Use
ImmutableMultiMap.Builder
from Guava - Check if there's already a mapping and then merge if so.
Also, it'd be nice if a user is allowed to specify multiple implementations in a single call:
builder.addService("foobar", fooService, barService);
// and
builder.addService("foobar", ImmutableList.of(fooService, barService));
We could replace addService()
with this new method, then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@trustin
Is it okay to use like this? I also added test here.
Lines 34 to 48 in 649bf67
* * ThriftCallService service = ThriftCallService | |
* .builder() | |
* .addService(defaultServiceImpl) // Adds an service | |
* .addService("foo", fooServiceImpl) // Adds an service with a key | |
* .addService("foobar", fooServiceImpl) // Adds multiple services to the same key | |
* .addService("foobar", barServiceImpl) | |
* .addService("foobarOnce", fooServiceImpl, barServiceImpl) // Adds multiple services at once | |
* // Adds multiple services by list | |
* .addService("foobarList", ImmutableList.of(fooServiceImpl, barServiceImpl)) | |
* // Adds multiple services by map | |
* .addServices(ImmutableMap.of("fooMap", fooServiceImpl, "barMap", barServiceImpl)) | |
* // Adds multiple services by multimap | |
* .addServices(ImmutableMultimap.of("foobarMultiMap", fooServiceImpl, | |
* "foobarMultiMap", barServiceImpl)) | |
* .build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks almost done once @trustin 's comments are addressed 👍
* @see ThriftCallService | ||
*/ | ||
public final class ThriftCallServiceBuilder { | ||
private final ImmutableMap.Builder<String, Iterable<?>> implementations = ImmutableMap.builder(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of consistently naming implementations
-> services
? Ditto for the method parameters and validations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, Thank you!
*/ | ||
public ThriftCallServiceBuilder addService(String key, Object implementation) { | ||
requireNonNull(implementation, "implementation"); | ||
this.implementations.put(key, ImmutableList.of(implementation)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.implementations.put(key, ImmutableList.of(implementation)); | |
implementations.put(key, ImmutableList.of(implementation)); |
ServiceRequestContext ctx, | ||
Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) { | ||
|
||
try { | ||
final TBase<?, ?> tArgs = func.newArgs(args); | ||
if (func.isAsync()) { | ||
invokeAsynchronously(impl, func, tArgs, reply); | ||
if (useBlockingTaskExecutor) { | ||
ctx.blockingTaskExecutor().execute(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: I understood that unlike gRPC
interceptors, thrift
rpcDecorators won't be invoked from the blockingTaskExecutor
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct.
I left this comment to call the Thrift RPC decorators from the blockingTaskExecutor
but @trustin suggested using the event loop for executing RPC decorators.
new ScheduledThreadPoolExecutor(1, ThreadFactories.newThreadFactory("blocking-test", true)) { | ||
@Override | ||
protected void beforeExecute(Thread t, Runnable r) { | ||
blocking.set(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will make blocking=true
for any blocking task execution - even if there is a different call path which uses blocking task executors (of which there are many)
Is it possible to just check blocking.set(Thread.currentThread().getName().startsWith("blocking-test"));
inside the service call methods instead and remove this logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't thought about it.
I appreciate your idea. Thank you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Left minor comments. 👍
public static ThriftCallService of(Map<String, ? extends Iterable<?>> implementations) { | ||
requireNonNull(implementations, "implementations"); | ||
return new ThriftCallService(implementations); | ||
public static ThriftCallServiceBuilder builder() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
public static ThriftCallServiceBuilder builder() { | |
@UnstableApi | |
public static ThriftCallServiceBuilder builder() { |
* * <pre>{@code | ||
* * ThriftCallService service = ThriftCallService |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* * <pre>{@code | |
* * ThriftCallService service = ThriftCallService | |
* <pre>{@code | |
* ThriftCallService service = ThriftCallService |
* .addServices(ImmutableMap.of("fooIterableMap", | ||
* ImmutableList.of(fooServiceImpl, barServiceImpl))) | ||
* .build(); | ||
* * }</pre> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* * }</pre> | |
* }</pre> |
* | ||
* @see ThriftCallService | ||
*/ | ||
public final class ThriftCallServiceBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public final class ThriftCallServiceBuilder { | |
@UnstableApi | |
public final class ThriftCallServiceBuilder { |
} | ||
|
||
/** | ||
* Adds an service for {@link ThriftServiceEntry}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Adds an service for {@link ThriftServiceEntry}. | |
* Adds a service for {@link ThriftServiceEntry}. |
* Adds an service with a key for {@link ThriftServiceEntry}. | ||
*/ | ||
public ThriftCallServiceBuilder addService(String key, Object... service) { | ||
requireNonNull(service, "service"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
requireNonNull(service, "service"); | |
requireNonNull(key, "key"); | |
requireNonNull(service, "service"); |
* Adds services with key by iterable for {@link ThriftServiceEntry}. | ||
*/ | ||
public ThriftCallServiceBuilder addService(String key, Iterable<?> service) { | ||
if (!service.iterator().hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!service.iterator().hasNext()) { | |
requireNonNull(key, "key"); | |
requireNonNull(service, "service"); | |
if (!service.iterator().hasNext()) { |
*/ | ||
public ThriftCallService build() { | ||
return new ThriftCallService( | ||
Multimaps.asMap(servicesBuilder.build()).entrySet().stream().collect( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think we can just call asMap
Multimaps.asMap(servicesBuilder.build()).entrySet().stream().collect( | |
servicesBuilder.build().asMap().entrySet().stream().collect( |
ServiceRequestContext ctx, | ||
Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) { | ||
|
||
try { | ||
final TBase<?, ?> tArgs = func.newArgs(args); | ||
if (func.isAsync()) { | ||
invokeAsynchronously(impl, func, tArgs, reply); | ||
if (useBlockingTaskExecutor) { | ||
ctx.blockingTaskExecutor().execute(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is correct.
I left this comment to call the Thrift RPC decorators from the blockingTaskExecutor
but @trustin suggested using the event loop for executing RPC decorators.
Motivation:
Add blockingTaskExecutor in Thrift services #4917
Modifications:
Result: