goroutine leak when connectionUp(true) return error #675

Closed
kiqi007 opened this issue May 8, 2024 · 3 comments · Fixed by #678

Comments

@kiqi007
Contributor

kiqi007 commented May 8, 2024

Issue Description: Encountered a goroutine leak scenario when using the MQTT package.

	for {
		select {
		case <-m.checkDone:
			hlog.Info("mqtt check done")
			return
		default:
			if m.client == nil || !m.client.IsConnectionOpen() {
				if nil != m.client {
					m.client.Disconnect(1000)
				}
				err := m.Connect() // create a new client, and do connect
				if nil != err {
					hlog.Error("mqtt Connect error", elog.String("host", m.host), elog.Int("port", m.port), elog.FieldErr(err))
				} else {
					hlog.Info("mqtt Connect success", elog.String("host", m.host), elog.Int("port", m.port))
				}
			}
			<-m.connCheckTicker.C
		}
	}
goroutine pprof:
goroutine profile: total 2787
1339 @ 0xe2e556 0xe3edac 0xe3ed86 0xe57ae5 0xe77032 0x16e6c25 0x16e43dc 0xe5ca61
#	0xe57ae4	sync.runtime_Semacquire+0x24							/usr/local/go/src/runtime/sema.go:56
#	0xe77031	sync.(*WaitGroup).Wait+0x51							/usr/local/go/src/sync/waitgroup.go:136
#	0x16e6c24	github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers+0x4a4	/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:608
#	0x16e43db	github.com/eclipse/paho%2emqtt%2egolang.(*client).Connect.func1+0x49b		/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:282

1339 @ 0xe2e556 0xe3edac 0xe3ed86 0xe57c05 0xe755a5 0x16e7cc9 0x16e7c9e 0x16e5cc5 0x16e5c06 0x16e5b87 0xe5ca61
#	0xe57c04	sync.runtime_SemacquireMutex+0x24						/usr/local/go/src/runtime/sema.go:71
#	0xe755a4	sync.(*Mutex).lockSlow+0x164							/usr/local/go/src/sync/mutex.go:162
#	0x16e7cc8	sync.(*Mutex).Lock+0xa8								/usr/local/go/src/sync/mutex.go:81
#	0x16e7c9d	github.com/eclipse/paho%2emqtt%2egolang.(*client).stopCommsWorkers+0x7d		/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:711
#	0x16e5cc4	github.com/eclipse/paho%2emqtt%2egolang.(*client).disconnect+0x24		/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:498
#	0x16e5c05	github.com/eclipse/paho%2emqtt%2egolang.(*client).Disconnect.func1.1+0x25	/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:451
#	0x16e5b86	github.com/eclipse/paho%2emqtt%2egolang.(*client).Disconnect.func1+0x4a6	/home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:470

Local Reproduction Steps: Perform the following code changes and simulate network fluctuations.

  • The key points in the code are marked with "// issue:" comments
// github.com/eclipse/paho.mqtt.golang/client.go:574 startCommsWorkers
		c.workers.Add(1)
		go keepalive(c, conn)  // issue:  This goroutine will change the connect status to disconnecting
	}

	// matchAndDispatch will process messages received from the network. It may generate acknowledgements
	// It will complete when incomingPubChan is closed and will close ackOut prior to exiting
	incomingPubChan := make(chan *packets.PublishPacket)
	c.workers.Add(1) // Done will be called when ackOut is closed
	ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)

	time.Sleep(10 * time.Second) // issue:  Simulate the situation where keepalive has already converted status to disconnecting, but connectionUp has not yet been executed

	// The connection is now ready for use (we spin up a few go routines below). It is possible that
	// Disconnect has been called in the interim...
	if err := connectionUp(true); err != nil {
		DEBUG.Println(CLI, err)
		close(c.stop) // Tidy up anything we have already started
		close(incomingPubChan)
		c.workers.Wait()       // issue: Goroutine will block at this location
		c.conn.Close()

Issue Cause: In the error path shown above, c.workers.Add(1) has already been called for matchAndDispatch, but nothing calls Done() before c.workers.Wait() is reached, so Wait() blocks indefinitely. To resolve this, a matching Done() call needs to happen on this path before Wait().

@MattBrittan
Contributor

Hmmm, so we probably need to drain ackOut and then call c.workers.Done() before calling c.workers.Wait().

@kiqi007
Contributor Author

kiqi007 commented May 10, 2024

Hey there! Any idea when we might tackle this issue? Also, could anyone lend a hand with fixing it?
I stumbled upon this while assisting my colleagues in debugging a memory leak, but I'm not very familiar with the MQTT library itself. So, if someone could help fix this problem, that would be great!

@MattBrittan
Contributor

@kiqi007 I flagged it as "Help Wanted" because I doubt it's something I will get around to in the foreseeable future (don't ever call Disconnect personally and am investing more time in the v5 client).

MattBrittan added a commit that referenced this issue May 20, 2024
If `Disconnect` was called whilst a connection attempt was in progress a goroutine leak occurred. This change allows the connection attempt to complete as normal (including calling the `OnConnect` callback) before the Disconnect is handled.

closes #675