Exception caused by suspend lambda of kotlin that implements java.util.function.Consumer #1081

Open
tonny1983 opened this issue Oct 17, 2023 · 2 comments


tonny1983 commented Oct 17, 2023

Describe the bug
According to the docs, a lambda can be used to implement java.util.function.Consumer.

However, if the lambda body calls a suspend method, the lambda must be declared with the suspend keyword, which causes java.lang.UnsupportedOperationException: Multi argument Kotlin functions are not currently supported.

Sample
Suppose I have a DataRepository interface which extends CoroutineCrudRepository, hence the DataRepository#save method is a suspend method (In the CoroutineCrudRepository interface, it is suspend fun <S : T> save(entity: S): T).

The function configuration class is:

@Configuration
class DataFunction {
    @Bean
    fun consumerData(dataRepository: DataRepository): suspend (Data) -> Unit =
        {
            val result = dataRepository.save(it)
        }
}

and it causes the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Multi argument Kotlin functions are not currently supported
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:598)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:520)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:325)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:323)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:942)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:608)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
	at org.springframework.boot.test.context.SpringBootContextLoader.lambda$loadContext$3(SpringBootContextLoader.java:137)
	at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:58)
	at org.springframework.util.function.ThrowingSupplier.get(ThrowingSupplier.java:46)
	at org.springframework.boot.SpringApplication.withHook(SpringApplication.java:1409)
	at org.springframework.boot.test.context.SpringBootContextLoader$ContextLoaderHook.run(SpringBootContextLoader.java:545)
	at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:137)
	at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:108)
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:187)
	at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:119)
	at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:127)
	at io.kotest.extensions.spring.SpringAutowireConstructorExtension.instantiate(SpringAutowireConstructorExtension.kt:27)
	at io.kotest.engine.spec.InstantiateSpecKt.createAndInitializeSpec(instantiateSpec.kt:30)
	at io.kotest.engine.spec.InstantiateSpecKt.instantiate(instantiateSpec.kt:11)
	at io.kotest.engine.spec.SpecRefKt.instance(SpecRef.kt:14)
	at io.kotest.engine.spec.SpecExecutor.createInstance-gIAlu-s(SpecExecutor.kt:63)
	at io.kotest.engine.spec.SpecExecutor.access$createInstance-gIAlu-s(SpecExecutor.kt:24)
	at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invokeSuspend(SpecExecutor.kt:40)
	at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invoke(SpecExecutor.kt)
	at io.kotest.engine.spec.SpecExecutor$execute$innerExecute$1.invoke(SpecExecutor.kt)
	at io.kotest.engine.spec.interceptor.ref.FinalizeSpecInterceptor.intercept-0E7RQCE(FinalizeSpecInterceptor.kt:24)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.BeforeSpecStateInterceptor.intercept-0E7RQCE(BeforeSpecStateInterceptor.kt:25)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.PrepareSpecInterceptor.intercept-0E7RQCE(PrepareSpecInterceptor.kt:25)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.ApplyExtensionsInterceptor.intercept-0E7RQCE(ApplyExtensionsInterceptor.kt:36)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.SpecFinishedInterceptor.intercept-0E7RQCE(SpecFinishedInterceptor.kt:22)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.SpecStartedInterceptor.intercept-0E7RQCE(SpecStartedInterceptor.kt:20)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invokeSuspend(SpecRefExtensionInterceptor.kt:28)
	at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invoke(SpecRefExtensionInterceptor.kt)
	at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor$intercept$inner$1.invoke(SpecRefExtensionInterceptor.kt)
	at io.kotest.engine.spec.interceptor.ref.SpecRefExtensionInterceptor.intercept-0E7RQCE(SpecRefExtensionInterceptor.kt:31)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.RequiresTagInterceptor.intercept-0E7RQCE(RequiresTagInterceptor.kt:35)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.TagsInterceptor.intercept-0E7RQCE(TagsInterceptor.kt:38)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.SystemPropertySpecFilterInterceptor.intercept-0E7RQCE(SystemPropertySpecFilterInterceptor.kt:48)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.SpecFilterInterceptor.intercept-0E7RQCE(SpecFilterInterceptor.kt:38)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.IgnoredSpecInterceptor.intercept-0E7RQCE(IgnoredSpecInterceptor.kt:51)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.EnabledIfInterceptor.intercept-0E7RQCE(EnabledIfInterceptor.kt:41)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.ref.RequiresPlatformInterceptor.intercept-0E7RQCE(RequiresPlatformInterceptor.kt:32)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invokeSuspend(SpecRefInterceptorPipeline.kt:49)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline$execute$3$1.invoke(SpecRefInterceptorPipeline.kt)
	at io.kotest.engine.spec.interceptor.SpecRefInterceptorPipeline.execute-0E7RQCE(SpecRefInterceptorPipeline.kt:50)
	at io.kotest.engine.spec.SpecExecutor.execute(SpecExecutor.kt:42)
	at io.kotest.engine.ConcurrentTestSuiteScheduler$schedule$8$1$1$2.invokeSuspend(ConcurrentTestSuiteScheduler.kt:72)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:280)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at io.kotest.common.RunBlockingKt.runBlocking(runBlocking.kt:3)
	at io.kotest.engine.launcher.MainKt.main(main.kt:34)
Caused by: java.lang.UnsupportedOperationException: Multi argument Kotlin functions are not currently supported
	at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.getFunctionRegistration(KotlinLambdaToFunctionAutoConfiguration.java:200)
	at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.lookup(BeanFactoryAwareFunctionRegistry.java:165)
	at org.springframework.cloud.function.context.FunctionCatalog.lookup(FunctionCatalog.java:39)
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionBindingRegistrar.afterPropertiesSet(FunctionConfiguration.java:877)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1817)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1766)
	... 103 common frames omitted

Other attempts
With kotlinx.coroutines.flow.Flow, the non-suspend method CoroutineCrudRepository#saveAll can be used. The lambda signature can therefore be changed to (Flow<Data>) -> Unit, which causes no exception at startup.

However, when the function is invoked with a message, another exception occurs:

Caused by: java.lang.ClassCastException: class Data cannot be cast to class kotlinx.coroutines.flow.Flow (Data and kotlinx.coroutines.flow.Flow are in unnamed module of loader 'app')
	at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.invoke(KotlinLambdaToFunctionAutoConfiguration.java:124)
	at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.apply(KotlinLambdaToFunctionAutoConfiguration.java:99)
	at org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration$KotlinFunctionWrapper.accept(KotlinLambdaToFunctionAutoConfiguration.java:146)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:1029)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:731)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577)
	at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:92)
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:832)
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:661)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)

Ugly workaround
Call the suspend method inside a runBlocking block, so that the Consumer itself is not a suspend lambda.
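The workaround can be sketched roughly as follows. Data and InMemoryDataRepository here are hypothetical stand-ins for the real entity and the CoroutineCrudRepository-based DataRepository, and the snippet assumes kotlinx-coroutines-core is on the classpath; it is an illustration, not the actual project code.

```kotlin
import java.util.function.Consumer
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Data entity from the report.
data class Data(val id: Int)

// Hypothetical stand-in for DataRepository; save() is a suspend function,
// like CoroutineCrudRepository#save.
class InMemoryDataRepository {
    val saved = mutableListOf<Data>()

    suspend fun save(entity: Data): Data {
        delay(1) // simulate a suspending I/O call
        saved += entity
        return entity
    }
}

// The bean method would return a plain java.util.function.Consumer.
// The suspend call is confined to runBlocking, so the lambda is not a suspend lambda.
fun consumerData(repository: InMemoryDataRepository): Consumer<Data> =
    Consumer { data ->
        runBlocking { repository.save(data) }
    }

fun main() {
    val repository = InMemoryDataRepository()
    consumerData(repository).accept(Data(1))
    println(repository.saved) // prints [Data(id=1)]
}
```

Note that runBlocking blocks the calling thread until save completes, which is why this is only a workaround.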

The version of spring-cloud-function is 4.0.5 (spring-cloud 2022.0.4).

Contributor

olegz commented Oct 23, 2023

Have you looked at one of our tests with coroutines? -

Author

tonny1983 commented Oct 26, 2023

Hi @olegz ,

Thanks a lot for your kind reply.

The test code you mentioned is written in Java, while I use kotest in my project.

Comparing the two, I found some further information.

1. When the "Multi argument Kotlin functions are not currently supported" exception occurs
The exception occurs when a plain object is used as the input parameter: suspend (Data) -> Unit causes the exception, but suspend (Flow<Data>) -> Unit does not.

The reason is the isValidKotlinSuspendConsumer method defined in the KotlinLambdaToFunctionAutoConfiguration class, which performs four checks:

isTypeRepresentedByClass(functionType, Function2.class) &&
type.length == 3 &&
CoroutinesUtils.isFlowType(type[0]) &&
CoroutinesUtils.isContinuationUnitType(type[1])

The third check, which tests whether the input parameter is a Flow type, fails. I'm not sure whether this is the expected behaviour, because no documentation describes using coroutines in Spring Cloud Function. Still, passing a plain object rather than a Flow to a suspend method is a normal case with Kotlin coroutines (CoroutineCrudRepository is a good example).
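For context on why the first check is against Function2: on the JVM, the Kotlin compiler appends a Continuation parameter to every suspend function, so a suspend lambda with one declared parameter is a two-argument function at runtime. A minimal sketch using only the Kotlin standard library is below; Data is a hypothetical stand-in for the entity above, and the exact reflection output may vary between compiler versions.

```kotlin
// Hypothetical stand-in for the Data entity from the report.
data class Data(val id: Int)

// A suspend lambda with ONE declared parameter, shaped like the failing bean.
val suspendConsumer: suspend (Data) -> Unit = { }

// True when the runtime object implements the two-argument function interface.
fun isJvmFunction2(candidate: Any): Boolean = Function2::class.java.isInstance(candidate)

fun main() {
    val lambda: Any = suspendConsumer
    // The declared parameter plus the compiler-added Continuation give arity 2.
    println(isJvmFunction2(lambda))
    // Inspect the generic interfaces that a framework would see via reflection.
    lambda.javaClass.genericInterfaces.forEach(::println)
}
```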

2. the problem of using Flow<*>
I rewrote the Java test you mentioned in Kotlin and added a check on the function's result:

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KotlinSuspendFunctionJUnitTest {
    private lateinit var context: GenericApplicationContext

    private lateinit var catalog: FunctionCatalog

    @AfterEach
    fun close() {
        context.close()
    }

    @Test
    fun typeDiscoveryTests() {
        create(arrayOf(KotlinSuspendFlowLambdasConfiguration::class.java))
        val functionCatalog = context.getBean(FunctionCatalog::class.java)
        val kotlinFunction = functionCatalog.lookup<FunctionInvocationWrapper>("kotlinFunction")
        Assertions.assertThat(kotlinFunction.isFunction).isTrue()
        Assertions.assertThat(kotlinFunction.inputType.typeName)
            .isEqualTo("reactor.core.publisher.Flux<java.lang.String>")
        Assertions.assertThat(kotlinFunction.outputType.typeName)
            .isEqualTo("reactor.core.publisher.Flux<java.lang.String>")
        // The following code runs correctly
        val result = kotlinFunction.apply(Flux.just("abcd")) as Flux<String>
        StepVerifier.create(result).expectNext("ABCD").verifyComplete()
    }

    private fun create(types: Array<Class<*>>, vararg props: String) {
        context = SpringApplicationBuilder(*types).properties(*props).run() as GenericApplicationContext
        catalog = context.getBean(FunctionCatalog::class.java)
    }
}

As you can see, when Flux is used (the input and output type checked by the assertions), the test passes without errors.
However, when Flow is used, which matches the declared signature of the kotlinFunction bean, an exception occurs and the test fails:

  • CASE 1: Flow as input and Flux as output
val result1 = kotlinFunction.apply(flowOf("abcd"))
val result2 = result1 as Flux<String>
StepVerifier.create(result2).expectNext("ABCD").verifyComplete()

An exception occurs at the expectNext assertion:

expectation "expectNext(ABCD)" failed (expected: onNext(ABCD); actual: onError(java.lang.ClassCastException: class kotlinx.coroutines.flow.FlowKt__BuildersKt$flowOf$$inlined$unsafeFlow$2 cannot be cast to class java.lang.String (kotlinx.coroutines.flow.FlowKt__BuildersKt$flowOf$$inlined$unsafeFlow$2 is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')))
java.lang.AssertionError: expectation "expectNext(ABCD)" failed (expected: onNext(ABCD); actual: onError(java.lang.ClassCastException: class kotlinx.coroutines.flow.FlowKt__BuildersKt$flowOf$$inlined$unsafeFlow$2 cannot be cast to class java.lang.String (kotlinx.coroutines.flow.FlowKt__BuildersKt$flowOf$$inlined$unsafeFlow$2 is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')))
	at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
	at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
	at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
	at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
	at reactor.test.DefaultStepVerifierBuilder.lambda$addExpectedValue$10(DefaultStepVerifierBuilder.java:509)
	at reactor.test.DefaultStepVerifierBuilder$SignalEvent.test(DefaultStepVerifierBuilder.java:2289)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignal(DefaultStepVerifierBuilder.java:1529)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1477)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1129)
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222)
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:255)
	at kotlinx.coroutines.reactive.FlowSubscription.flowProcessing(ReactiveFlow.kt:215)
	at kotlinx.coroutines.reactive.FlowSubscription.access$flowProcessing(ReactiveFlow.kt:187)
	at kotlinx.coroutines.reactive.FlowSubscription$createInitialContinuation$1$1.invoke(ReactiveFlow.kt:204)
	at kotlinx.coroutines.reactive.FlowSubscription$createInitialContinuation$1$1.invoke(ReactiveFlow.kt:204)
	at kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt$createCoroutineUnintercepted$$inlined$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt$2.invokeSuspend(IntrinsicsJvm.kt:270)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:68)
	at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:375)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
	at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25)
	at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110)
	at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126)
	at kotlinx.coroutines.reactor.MonoKt.monoInternal$lambda$2(Mono.kt:92)
	at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:58)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8773)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:891)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:831)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:823)
	at reactor.test.DefaultStepVerifierBuilder.verifyComplete(DefaultStepVerifierBuilder.java:690)
  • CASE 2: Flux as input and Flow as output
val result1 = kotlinFunction.apply(Flux.just("abcd"))
val result2 = result1 as Flow<String>

An exception occurs on the type cast:

class reactor.core.publisher.FluxMap cannot be cast to class kotlinx.coroutines.flow.Flow (reactor.core.publisher.FluxMap and kotlinx.coroutines.flow.Flow are in unnamed module of loader 'app')
java.lang.ClassCastException: class reactor.core.publisher.FluxMap cannot be cast to class kotlinx.coroutines.flow.Flow (reactor.core.publisher.FluxMap and kotlinx.coroutines.flow.Flow are in unnamed module of loader 'app')
	at cc.tonny.springfunctiondemo.KotlinSuspendFunctionKotest$1$1.invokeSuspend(KotlinSuspendFunctionKotest.kt:27)
	at cc.tonny.springfunctiondemo.KotlinSuspendFunctionKotest$1$1.invoke(KotlinSuspendFunctionKotest.kt)
  • CASE 3: Flow as input and Flow as output
    The result is the same as in CASE 2.
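Since the looked-up function is exposed as Flux in and Flux out, and Flow and Flux are unrelated types, a plain cast between them cannot succeed. One possible way to use Flow on the caller side is to convert at the boundary with the asFlux and asFlow adapters from kotlinx-coroutines-reactor and kotlinx-coroutines-reactive. The sketch below is an assumption-laden illustration: fluxFunction and applyWithFlow are hypothetical stand-ins rather than the real FunctionInvocationWrapper, and this has not been verified against spring-cloud-function 4.0.5.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux

// Hypothetical stand-in for the looked-up kotlinFunction, which the assertions
// above report as Flux<String> -> Flux<String>.
val fluxFunction: (Flux<String>) -> Flux<String> =
    { input -> input.map { it.uppercase() } }

// Adapt a Flow-based caller to the Flux-based function:
// Flow -> Flux on the way in, Flux -> Flow on the way out.
fun applyWithFlow(input: Flow<String>): Flow<String> =
    fluxFunction(input.asFlux()).asFlow()

fun main() = runBlocking {
    // Collect the resulting Flow to inspect the values.
    println(applyWithFlow(flowOf("abcd")).toList())
}
```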
