[Bug] Stream is already completed, no further calls are allowed #4829

Open
2 of 3 tasks
9997766 opened this issue Apr 12, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@9997766
9997766 commented Apr 12, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

Environment

Linux

EventMesh version

1.10.0

What happened

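My application integrates the 1.10 SDK and connects to the EventMesh server, receiving messages over the gRPC protocol. RocketMQ runs as a two-node cluster. Messages are sometimes received and sometimes not,
and the EventMesh server keeps reporting the following error: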

2024-04-11 19:15:24,290 ERROR [eventMesh-grpc-pushMsg-2] StreamPushRequest(StreamPushRequest.java:91) - message|eventMesh2client|exception=Stream is already completed, no further calls are allowed |emitter|topic=TEST-TOPIC-GRPC-SYNC|bizSeqNo=949462082066922981761046640402|uniqueId=988003831724711254497105493010|cost=0
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
        at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.0.1-jre.jar:?]
        at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:375) ~[grpc-stub-1.43.2.jar:1.43.2]
        at org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest.tryPushRequest(StreamPushRequest.java:81) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
        at org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler.lambda$handle$1(MessageHandler.java:75) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
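
For context, the message in the trace appears to come from the state check in grpc-java's `ServerCallStreamObserverImpl.onNext`, which rejects further calls once the stream has been completed on the server side. Below is a minimal sketch of that condition. It uses a hypothetical `Observer` interface as a stand-in for `io.grpc.stub.StreamObserver` so that it runs without gRPC on the classpath; all class names are illustrative and are not EventMesh or gRPC code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for io.grpc.stub.StreamObserver (assumption, not the real API). */
interface Observer<T> {
    void onNext(T value);

    void onCompleted();
}

/** Mimics the state check seen in the stack trace: onNext is rejected after completion. */
class CompletableObserver<T> implements Observer<T> {
    final List<T> delivered = new ArrayList<>();
    private boolean completed;

    @Override
    public void onNext(T value) {
        if (completed) {
            throw new IllegalStateException(
                "Stream is already completed, no further calls are allowed");
        }
        delivered.add(value);
    }

    @Override
    public void onCompleted() {
        completed = true;
    }
}

class CompletedStreamDemo {
    /** Returns the message of the exception raised by pushing after completion, or null. */
    static String pushAfterCompletion() {
        CompletableObserver<String> observer = new CompletableObserver<>();
        observer.onNext("first");      // delivered normally
        observer.onCompleted();        // the stream is now closed
        try {
            observer.onNext("second"); // a stale reference is still being used
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(pushAfterCompletion());
    }
}
```

If this reading is right, the error means the push thread still holds a reference to an emitter whose stream was already completed elsewhere. That would be consistent with messages being delivered only some of the time.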

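I changed the EventEmitter class as shown below. With this change the client reliably receives messages; none are lost or missed.
However, clustering mode stops working and delivery behaves like broadcast mode, i.e. every node receives every message.
Could the maintainers please look into this problem?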

package org.apache.eventmesh.runtime.core.protocol.grpc.service;

import io.grpc.stub.StreamObserver;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EventEmitter<T> {

    // Set once the stream has been terminated with an error; later calls become no-ops.
    private boolean inError = false;

    private final StreamObserver<T> emitter;

    public EventEmitter(StreamObserver<T> emitter) {
        this.emitter = emitter;
    }

    public synchronized void onNext(T event) {
        if (inError) {
            return;
        }
        try {
            emitter.onNext(event);
        } catch (final Exception e) {
            log.warn("StreamObserver Error onNext. {}", e.getMessage());
            onError(e);
        }

        // Original implementation:
        // try {
        //     emitter.onNext(event);
        // } catch (Exception e) {
        //     log.warn("StreamObserver Error onNext. {}", e.getMessage());
        // }
    }

    public synchronized void onCompleted() {
        if (inError) {
            return;
        }
        try {
            emitter.onCompleted();
        } catch (final Exception e) {
            log.warn("StreamObserver Error onCompleted. {}", e.getMessage());
            onError(e);
        }

        // Original implementation:
        // try {
        //     emitter.onCompleted();
        // } catch (Exception e) {
        //     log.warn("StreamObserver Error onCompleted. {}", e.getMessage());
        // }
    }

    public synchronized void onError(Throwable t) {
        // Note: no try/catch here, so an exception from emitter.onError propagates to the caller.
        emitter.onError(t);
        inError = true;
        log.warn("StreamObserver Error onError. {}", t.getMessage());

        // Original implementation:
        // try {
        //     emitter.onError(t);
        // } catch (Exception e) {
        //     log.warn("StreamObserver Error onError. {}", e.getMessage());
        // }
    }

    public StreamObserver<T> getEmitter() {
        return emitter;
    }

}
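
One possible direction for keeping deliver-to-one semantics is for the wrapper to report whether a push succeeded, so the caller can skip a dead stream and try the next client in the group. The sketch below is illustrative only. `Sink`, `GuardedEmitter`, and `deliverToOne` are hypothetical names, `Sink` stands in for `io.grpc.stub.StreamObserver`, and none of this reflects how EventMesh actually selects a client in clustering mode.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for io.grpc.stub.StreamObserver (assumption, not the real API). */
interface Sink<T> {
    void onNext(T value);
}

/** Wraps a sink, remembers when it has failed, and reports whether a push succeeded. */
class GuardedEmitter<T> {
    private final Sink<T> sink;
    private boolean closed;

    GuardedEmitter(Sink<T> sink) {
        this.sink = sink;
    }

    /** Returns true if the event reached the sink, false if the sink is or just became unusable. */
    synchronized boolean tryNext(T event) {
        if (closed) {
            return false;
        }
        try {
            sink.onNext(event);
            return true;
        } catch (RuntimeException e) {
            closed = true; // never push to this sink again
            return false;
        }
    }
}

class ClusterDispatchDemo {
    /** Delivers the event to the first emitter that accepts it; returns its index, or -1. */
    static <T> int deliverToOne(List<GuardedEmitter<T>> emitters, T event) {
        for (int i = 0; i < emitters.size(); i++) {
            if (emitters.get(i).tryNext(event)) {
                return i;
            }
        }
        return -1;
    }

    /** Builds one dead and one live sink, dispatches a single event, returns the chosen index. */
    static int demo(List<String> received) {
        Sink<String> dead = value -> {
            throw new IllegalStateException(
                "Stream is already completed, no further calls are allowed");
        };
        Sink<String> live = received::add;
        List<GuardedEmitter<String>> emitters =
            Arrays.asList(new GuardedEmitter<>(dead), new GuardedEmitter<>(live));
        return deliverToOne(emitters, "hello");
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        System.out.println(demo(received) + " " + received);
    }
}
```

In this sketch the event goes to exactly one live client, and a completed stream is skipped rather than causing the push to fail. Whether the same approach fits the real `StreamPushRequest` flow would need to be checked against the runtime code.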

How to reproduce

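My application integrates the 1.10 SDK and connects to the EventMesh server, receiving messages over the gRPC protocol. Messages are sometimes received and sometimes not,
and the EventMesh server keeps reporting errors.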

Debug logs

2024-04-11 19:15:24,290 ERROR [eventMesh-grpc-pushMsg-2] StreamPushRequest(StreamPushRequest.java:91) - message|eventMesh2client|exception=Stream is already completed, no further calls are allowed |emitter|topic=TEST-TOPIC-GRPC-SYNC|bizSeqNo=949462082066922981761046640402|uniqueId=988003831724711254497105493010|cost=0
java.lang.IllegalStateException: Stream is already completed, no further calls are allowed
        at com.google.common.base.Preconditions.checkState(Preconditions.java:502) ~[guava-31.0.1-jre.jar:?]
        at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:375) ~[grpc-stub-1.43.2.jar:1.43.2]
        at org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest.tryPushRequest(StreamPushRequest.java:81) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
        at org.apache.eventmesh.runtime.core.protocol.grpc.push.MessageHandler.lambda$handle$1(MessageHandler.java:75) ~[eventmesh-runtime-1.10.0-release.jar:1.10.0-release]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@9997766 9997766 added the bug Something isn't working label Apr 12, 2024
@Pil0tXia Pil0tXia changed the title Stream is already completed, no further calls are allowed [Bug] Stream is already completed, no further calls are allowed Apr 12, 2024
@Pil0tXia
Member

Please fix the Markdown code block; otherwise the code is unreadable.

@Pil0tXia
Member

You may submit a PR; the diff view will make it easier to see the changes before and after.


2 participants