Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue-4917] Add blockingTaskExecutor in Thrift services #5619

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

ChangguHan
Copy link
Contributor

Motivation:

Add blockingTaskExecutor in Thrift services #4917

Modifications:

  • Add useBlockingTaskExecutor into THttpServiceBuilder.java
  • Apply useBlockingTaskExecutor into THttpService.java

Result:

@CLAassistant
Copy link

CLAassistant commented Apr 18, 2024

CLA assistant check
All committers have signed the CLA.

Comment on lines 621 to 624
if (useBlockingTaskExecutor) {
ctx.blockingTaskExecutor().execute(
() -> invoke(ctx, serializationFormat, seqId, f, decodedReq, httpRes)
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we think invoke() is only thing that we should wrap with ctx.blockingTaskExecutor().

If not, please share to us~! 🙇

@ikhoon ikhoon added new feature sprint Issues for OSS Sprint participants labels Apr 22, 2024
@@ -287,6 +287,7 @@ public static Function<? super RpcService, THttpService> newDecorator(

private int maxRequestStringLength;
private int maxRequestContainerLength;
private boolean useBlockingTaskExecutor;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private boolean useBlockingTaskExecutor;
private final boolean useBlockingTaskExecutor;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it. Thank you

@Override
public void get(TestServiceRequest request,
AsyncMethodCallback resultHandler) throws TException {
resultHandler.onComplete(new TestServiceResponse(request.getMessage()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Could we check whether the current thread is the blocking task executor?
  • Should we also check TestService.Iface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we check whether the current thread is the blocking task executor?

I checked it here

Should we also check TestService.Iface?

Fixed it. Because of Iface, only verified the current thread is blocking executor without adding useBlockingTaskExecutor

thrift/thrift0.13/src/test/thrift/Test.thrift Outdated Show resolved Hide resolved
Copy link
Member

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me. 👍
Left minor suggestions. 😉

Copy link
Member

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me, @ChangguHan 👍 👍 👍
Thanks!

@minwoox
Copy link
Member

minwoox commented Apr 23, 2024

Oops, I realized that we also need to fix ThriftCallService not to use blocking task executor twice:

private static void invoke(
        ServiceRequestContext ctx,
        Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) {

    try {
        final TBase<?, ?> tArgs = func.newArgs(args);
        if (func.isAsync()) {
            invokeAsynchronously(impl, func, tArgs, reply);
        } else if (ctx.eventLoop().inEventLoop()) {
            invokeSynchronously(ctx, impl, func, tArgs, reply);
        } else {
            invokeSynchronously0(ctx, impl, func, tArgs, reply);
        }
    } catch (Throwable t) {
        reply.completeExceptionally(t);
    }
}

private static void invokeSynchronously(
        ServiceRequestContext ctx, Object impl,
        ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
    ctx.blockingTaskExecutor().execute(() -> invokeSynchronously0(ctx, impl, func, args, reply));
}

private static void invokeSynchronously0(
        ServiceRequestContext ctx, Object impl,
        ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
    ...
}

Copy link
Member

@trustin trustin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether to delegate the call to the blocking task executor should be decided by ThriftCallService, not THttpService.

@ChangguHan
Copy link
Contributor Author

@trustin
I fix to decide whether to check useBlockingTaskExecutor in ThriftCallService instead of THttpService.
Would you check this?

if (func.isAsync()) {
if (useBlockingTaskExecutor) {
ctx.blockingTaskExecutor().execute(() -> {
try {
invokeAsynchronously(impl, func, tArgs, reply);
} catch (Throwable t) {
reply.completeExceptionally(t);
}
});
} else {
invokeAsynchronously(impl, func, tArgs, reply);
}
} else {
invokeSynchronously(ctx, impl, func, tArgs, reply);
}

@ChangguHan
Copy link
Contributor Author

Oops, I realized that we also need to fix ThriftCallService not to use blocking task executor twice:

private static void invoke(
        ServiceRequestContext ctx,
        Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) {

    try {
        final TBase<?, ?> tArgs = func.newArgs(args);
        if (func.isAsync()) {
            invokeAsynchronously(impl, func, tArgs, reply);
        } else if (ctx.eventLoop().inEventLoop()) {
            invokeSynchronously(ctx, impl, func, tArgs, reply);
        } else {
            invokeSynchronously0(ctx, impl, func, tArgs, reply);
        }
    } catch (Throwable t) {
        reply.completeExceptionally(t);
    }
}

private static void invokeSynchronously(
        ServiceRequestContext ctx, Object impl,
        ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
    ctx.blockingTaskExecutor().execute(() -> invokeSynchronously0(ctx, impl, func, args, reply));
}

private static void invokeSynchronously0(
        ServiceRequestContext ctx, Object impl,
        ThriftFunction func, TBase<?, ?> args, CompletableRpcResponse reply) {
    ...
}

@minwoox Thank you for your review.
I fix to check in ThriftCallService!

Comment on lines 66 to 74
* Creates a new {@link ThriftCallService} with the specified service implementation.
*
* @param implementation an implementation of {@code *.Iface} or {@code *.AsyncIface} service interface
* generated by the Apache Thrift compiler
* Creates a new instance of {@link ThriftCallServiceBuilder} which can build
* an instance of {@link ThriftCallService} fluently.
*/
public static ThriftCallService of(Object implementation) {
requireNonNull(implementation, "implementation");
return new ThriftCallService(ImmutableMap.of("", ImmutableList.of(implementation)));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we keep this method because otherwise we would break backward compatibility? We could reimplement this method like this:

    return builder().implementation(implementation).builder();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I applied it. thank you!

/**
* Adds an implementation for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder implementation(Object implementation) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename this to addService() so it is close to GrpcServiceBuilder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I change the name of this method. Thank you!

* @see ThriftCallService
*/
public final class ThriftCallServiceBuilder {
private Map<String, ? extends Iterable<?>> implementations;
Copy link
Member

@trustin trustin Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about making this additive, like:

private final ImmutableMap.Builder<String, ...> implementations = ImmutableMap.builder();
...
public ThriftCallService build() {
    ...
    final Map<...> implementations = this.implementations.build();
    ...
}

// User code:
ThriftCallService
  .builder()
  .addService(defaultServceImpl)
  .addService("foo", fooServiceImpl)
  .addService("bar", barServiceImpl)
  .addService("foobar", fooServiceImpl, barServiceImpl)
  .addServices(ImmutableMap.of("qux", quxServiceImpl, "quz", quzServiceImpl))
  .build();

// implementations will contain "", "foo", "bar", "foobar", "qux", "quz".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks good.
I applied it. Thank you for your comment.

* @see ThriftCallService
*/
public final class ThriftCallServiceBuilder {
private final ImmutableMap.Builder<String, Iterable<?>> implementations = ImmutableMap.builder();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note that I changed the type of value here.
? extends Iterable<?> -> Iterable<?>

/**
* Adds multiple implementations for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder implementations(Map<String, ? extends Iterable<?>> implementations) {
Copy link
Member

@trustin trustin May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can rename this to addServices as well?

*/
public ThriftCallServiceBuilder addService(String key, Object implementation) {
requireNonNull(implementation, "implementation");
this.implementations.put(key, ImmutableList.of(implementation));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will overwrite the existing map entry under the specified key, making the following code work incorrectly:

builder.addService("foobar", fooService);
builder.addService("foobar", barService);

There are two options to fix this here:

  • Use ImmutableMultiMap.Builder from Guava
  • Check if there's already a mapping and then merge if so.

Also, it'd be nice if a user is allowed to specify multiple implementations in a single call:

builder.addService("foobar", fooService, barService);
// and 
builder.addService("foobar", ImmutableList.of(fooService, barService));

We could replace addService() with this new method, then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@trustin
Is it okay to use like this? I also added test here.

* * ThriftCallService service = ThriftCallService
* .builder()
* .addService(defaultServiceImpl) // Adds an service
* .addService("foo", fooServiceImpl) // Adds an service with a key
* .addService("foobar", fooServiceImpl) // Adds multiple services to the same key
* .addService("foobar", barServiceImpl)
* .addService("foobarOnce", fooServiceImpl, barServiceImpl) // Adds multiple services at once
* // Adds multiple services by list
* .addService("foobarList", ImmutableList.of(fooServiceImpl, barServiceImpl))
* // Adds multiple services by map
* .addServices(ImmutableMap.of("fooMap", fooServiceImpl, "barMap", barServiceImpl))
* // Adds multiple services by multimap
* .addServices(ImmutableMultimap.of("foobarMultiMap", fooServiceImpl,
* "foobarMultiMap", barServiceImpl))
* .build();

Copy link
Contributor

@jrhee17 jrhee17 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks almost done once @trustin 's comments are addressed 👍

* @see ThriftCallService
*/
public final class ThriftCallServiceBuilder {
private final ImmutableMap.Builder<String, Iterable<?>> implementations = ImmutableMap.builder();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of consistently naming implementations -> services? Ditto for the method parameters and validations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, Thank you!

*/
public ThriftCallServiceBuilder addService(String key, Object implementation) {
requireNonNull(implementation, "implementation");
this.implementations.put(key, ImmutableList.of(implementation));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.implementations.put(key, ImmutableList.of(implementation));
implementations.put(key, ImmutableList.of(implementation));

ServiceRequestContext ctx,
Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) {

try {
final TBase<?, ?> tArgs = func.newArgs(args);
if (func.isAsync()) {
invokeAsynchronously(impl, func, tArgs, reply);
if (useBlockingTaskExecutor) {
ctx.blockingTaskExecutor().execute(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I understood that unlike gRPC interceptors, thrift rpcDecorators won't be invoked from the blockingTaskExecutor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct.
I left this comment to call the Thrift RPC decorators from the blockingTaskExecutor but @trustin suggested using the event loop for executing RPC decorators.

new ScheduledThreadPoolExecutor(1, ThreadFactories.newThreadFactory("blocking-test", true)) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
blocking.set(true);
Copy link
Contributor

@jrhee17 jrhee17 May 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make blocking=true for any blocking task execution - even if there is a different call path which uses blocking task executors (of which there are many)

Is it possible to just check blocking.set(Thread.currentThread().getName().startsWith("blocking-test")); inside the service call methods instead and remove this logic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't thought about it.
I appreciate your idea. Thank you.

@ChangguHan ChangguHan requested a review from trustin May 17, 2024 06:59
@ChangguHan ChangguHan requested a review from jrhee17 May 17, 2024 06:59
Copy link
Member

@minwoox minwoox left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Left minor comments. 👍

public static ThriftCallService of(Map<String, ? extends Iterable<?>> implementations) {
requireNonNull(implementations, "implementations");
return new ThriftCallService(implementations);
public static ThriftCallServiceBuilder builder() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
public static ThriftCallServiceBuilder builder() {
@UnstableApi
public static ThriftCallServiceBuilder builder() {

Comment on lines 32 to 33
* * <pre>{@code
* * ThriftCallService service = ThriftCallService
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* * <pre>{@code
* * ThriftCallService service = ThriftCallService
* <pre>{@code
* ThriftCallService service = ThriftCallService

* .addServices(ImmutableMap.of("fooIterableMap",
* ImmutableList.of(fooServiceImpl, barServiceImpl)))
* .build();
* * }</pre>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* * }</pre>
* }</pre>

*
* @see ThriftCallService
*/
public final class ThriftCallServiceBuilder {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public final class ThriftCallServiceBuilder {
@UnstableApi
public final class ThriftCallServiceBuilder {

}

/**
* Adds an service for {@link ThriftServiceEntry}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Adds an service for {@link ThriftServiceEntry}.
* Adds a service for {@link ThriftServiceEntry}.

* Adds an service with a key for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder addService(String key, Object... service) {
requireNonNull(service, "service");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
requireNonNull(service, "service");
requireNonNull(key, "key");
requireNonNull(service, "service");

* Adds services with key by iterable for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder addService(String key, Iterable<?> service) {
if (!service.iterator().hasNext()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!service.iterator().hasNext()) {
requireNonNull(key, "key");
requireNonNull(service, "service");
if (!service.iterator().hasNext()) {

*/
public ThriftCallService build() {
return new ThriftCallService(
Multimaps.asMap(servicesBuilder.build()).entrySet().stream().collect(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we can just call asMap

Suggested change
Multimaps.asMap(servicesBuilder.build()).entrySet().stream().collect(
servicesBuilder.build().asMap().entrySet().stream().collect(

ServiceRequestContext ctx,
Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) {

try {
final TBase<?, ?> tArgs = func.newArgs(args);
if (func.isAsync()) {
invokeAsynchronously(impl, func, tArgs, reply);
if (useBlockingTaskExecutor) {
ctx.blockingTaskExecutor().execute(() -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct.
I left this comment to call the Thrift RPC decorators from the blockingTaskExecutor but @trustin suggested using the event loop for executing RPC decorators.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
new feature sprint Issues for OSS Sprint participants
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow useBlockingTaskExecutor in Thrift services
7 participants