
goroutine leak when connectionUp(true) return error #675

Closed
kiqi007 opened this issue May 8, 2024 · 3 comments · Fixed by #678

Comments

@kiqi007
Contributor

kiqi007 commented May 8, 2024

Issue description: I encountered a goroutine leak when using the MQTT package. The application reconnects with the following loop:

```go
	for {
		select {
		case <-m.checkDone:
			hlog.Info("mqtt check done")
			return
		default:
			if m.client == nil || !m.client.IsConnectionOpen() {
				if nil != m.client {
					m.client.Disconnect(1000)
				}
				err := m.Connect() // create a new client, and do connect
				if nil != err {
					hlog.Error("mqtt Connect error", elog.String("host", m.host), elog.Int("port", m.port), elog.FieldErr(err))
				} else {
					hlog.Info("mqtt Connect success", elog.String("host", m.host), elog.Int("port", m.port))
				}
			}
			<-m.connCheckTicker.C
		}
	}
```
Goroutine profile (pprof):

```
goroutine profile: total 2787
1339 @ 0xe2e556 0xe3edac 0xe3ed86 0xe57ae5 0xe77032 0x16e6c25 0x16e43dc 0xe5ca61
#	0xe57ae4	sync.runtime_Semacquire+0x24							/usr/local/go/src/runtime/sema.go:56
#	0xe77031	sync.(*WaitGroup).Wait+0x51							/usr/local/go/src/sync/waitgroup.go:136
#	0x16e6c24	github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers+0x4a4	/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:608
#	0x16e43db	github.com/eclipse/paho%2emqtt%2egolang.(*client).Connect.func1+0x49b		/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:282

1339 @ 0xe2e556 0xe3edac 0xe3ed86 0xe57c05 0xe755a5 0x16e7cc9 0x16e7c9e 0x16e5cc5 0x16e5c06 0x16e5b87 0xe5ca61
#	0xe57c04	sync.runtime_SemacquireMutex+0x24						/usr/local/go/src/runtime/sema.go:71
#	0xe755a4	sync.(*Mutex).lockSlow+0x164							/usr/local/go/src/sync/mutex.go:162
#	0x16e7cc8	sync.(*Mutex).Lock+0xa8								/usr/local/go/src/sync/mutex.go:81
#	0x16e7c9d	github.com/eclipse/paho%2emqtt%2egolang.(*client).stopCommsWorkers+0x7d		/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:711
#	0x16e5cc4	github.com/eclipse/paho%2emqtt%2egolang.(*client).disconnect+0x24		/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:498
#	0x16e5c05	github.com/eclipse/paho%2emqtt%2egolang.(*client).Disconnect.func1.1+0x25	/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:451
#	0x16e5b86	github.com/eclipse/paho%2emqtt%2egolang.(*client).Disconnect.func1+0x4a6	/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:470
```

Local reproduction steps: apply the following changes to the library code, then simulate network instability.

Key points are marked in the code with `issue:` comments:

```go
// github.com/eclipse/paho.mqtt.golang/client.go:574 startCommsWorkers
		c.workers.Add(1)
		go keepalive(c, conn)  // issue:  This goroutine will change the connect status to disconnecting
	}

	// matchAndDispatch will process messages received from the network. It may generate acknowledgements
	// It will complete when incomingPubChan is closed and will close ackOut prior to exiting
	incomingPubChan := make(chan *packets.PublishPacket)
	c.workers.Add(1) // Done will be called when ackOut is closed
	ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)

	time.Sleep(10 * time.Second) // issue:  Simulate the situation where keepalive has already converted status to disconnecting, but connectionUp has not yet been executed

	// The connection is now ready for use (we spin up a few go routines below). It is possible that
	// Disconnect has been called in the interim...
	if err := connectionUp(true); err != nil {
		DEBUG.Println(CLI, err)
		close(c.stop) // Tidy up anything we have already started
		close(incomingPubChan)
		c.workers.Wait()       // issue: Goroutine will block at this location
		c.conn.Close()
```

Issue cause: on this error path, the `Done()` matching the `c.workers.Add(1)` made before starting `matchAndDispatch` is never called, so `c.workers.Wait()` blocks indefinitely. To resolve this, that `Done()` must be called on the error path before `Wait()`.

@MattBrittan
Contributor

Hmmm, so we probably need to drain ackOut and then call c.workers.Done() before calling c.workers.Wait().

@kiqi007
Contributor Author

kiqi007 commented May 10, 2024

Hey there! Any idea when we might tackle this issue? Also, could anyone lend a hand with fixing it?
I stumbled upon this while assisting my colleagues in debugging a memory leak, but I'm not very familiar with the MQTT library itself. So, if someone could help fix this problem, that would be great!

@MattBrittan
Contributor

@kiqi007 I flagged it as "Help Wanted" because I doubt I will get around to it in the foreseeable future (I never call Disconnect myself and am investing more time in the v5 client).

MattBrittan added a commit that referenced this issue May 20, 2024
If `Disconnect` was called whilst a connection attempt was in progress a goroutine leak occurred. This change allows the connection attempt to complete as normal (including calling the `OnConnect` callback) before the Disconnect is handled.

closes #675
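The approach described in the commit message (let an in-progress connection attempt finish, including `OnConnect`, before handling `Disconnect`) can be illustrated generically. This is a hypothetical sketch, not the code from the fix: the `client` type, `ConnectAsync`, and the `connecting` channel are assumptions made for illustration, and the sketch handles only one attempt at a time.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// client is a hypothetical stand-in, not paho's client type.
type client struct {
	mu         sync.Mutex
	connecting chan struct{} // closed when the current attempt finishes
	connected  bool
	events     []string
}

// ConnectAsync registers the attempt synchronously, then completes it in the
// background after a simulated handshake.
func (c *client) ConnectAsync() {
	done := make(chan struct{})
	c.mu.Lock()
	c.connecting = done
	c.mu.Unlock()

	go func() {
		time.Sleep(20 * time.Millisecond) // simulated handshake
		c.mu.Lock()
		c.connected = true
		c.events = append(c.events, "OnConnect")
		c.connecting = nil
		c.mu.Unlock()
		close(done)
	}()
}

// Disconnect waits for any in-progress attempt to complete before tearing
// the connection down, so startup never has to abort halfway.
func (c *client) Disconnect() {
	c.mu.Lock()
	pending := c.connecting
	c.mu.Unlock()
	if pending != nil {
		<-pending
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.connected = false
	c.events = append(c.events, "Disconnect")
}

func main() {
	c := &client{}
	c.ConnectAsync()
	c.Disconnect() // called while the attempt is still in progress
	fmt.Println(c.events, c.connected)
	// Prints: [OnConnect Disconnect] false
}
```

The trade-off is that `Disconnect` may block for as long as the handshake takes, in exchange for never tearing down half-started workers.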