Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FetchMessage returns io.EOF when reader not closed #1222

Open
zydovech opened this issue Nov 3, 2023 · 0 comments
Open

FetchMessage returns io.EOF when reader not closed #1222

zydovech opened this issue Nov 3, 2023 · 0 comments

Comments

@zydovech
Copy link

zydovech commented Nov 3, 2023

The comments of FetchMessage state that when io.EOF is returned, it means that the reader has been closed.

However, I encountered a scenario where the reader did not closed and returned io.EOF.

described as follows:

I test to exit kafka server gracefully. Will trigger rebalance .

If I stop the consumer group leader, such as 127.0.0.1:9092,The coordinator function will try to call findCoordinator to find the coordinator,I added the log。

func (cg *ConsumerGroup) coordinator() (coordinator, error) {
	// NOTE : could try to cache the coordinator to avoid the double connect
	//        here.  since consumer group balances happen infrequently and are
	//        an expensive operation, we're not currently optimizing that case
	//        in order to keep the code simpler.
	conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
	if err != nil {
		fmt.Println("------coordinator conn----------------", conn, err)
		return nil, err
	}
	tt, _ := conn.(*timeoutCoordinator)
	defer conn.Close()

	fmt.Println("----------------------coordinator---------------------------------", tt.conn.conn.RemoteAddr().String(), tt.conn.conn.LocalAddr().String())
	out, err := conn.findCoordinator(findCoordinatorRequestV0{
		CoordinatorKey: cg.config.ID,
	})
	if err == nil && out.ErrorCode != 0 {
		err = Error(out.ErrorCode)
	}
	if err != nil {
		fmt.Println("------coordinator findCoordinator----------------", err)
		return nil, err
	}

	address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port)))
	return cg.config.connect(cg.config.Dialer, address)
}

The output log is as follows:

1. first 
----------------------coordinator--------------------------------- 127.0.0.1:9092 127.0.0.1:50589
------coordinator findCoordinator---------------- EOF

2. second
----------------------coordinator--------------------------------- 127.0.0.1:9092 127.0.0.1:50600
------coordinator findCoordinator---------------- EOF

3. third
----------------------coordinator--------------------------------- 127.0.0.1:9092 127.0.0.1:50612
------coordinator findCoordinator---------------- EOF

The connection was established three times, but because the kafka server had stopped at this time, the connection was closed, and an EOF error was returned three times.

Eventually the error was exposed to the business layer. But at this time, the reader did not close. So can we use other flag to indicate reader close?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant