diff --git a/docs/release-notes/release-notes-0.20.0.md b/docs/release-notes/release-notes-0.20.0.md index 200efe96c4..5ad714e3c6 100644 --- a/docs/release-notes/release-notes-0.20.0.md +++ b/docs/release-notes/release-notes-0.20.0.md @@ -132,6 +132,10 @@ reader of a payment request. - [Refactored](https://github.com/lightningnetwork/lnd/pull/10018) `channelLink` to improve readability and maintainability of the code. +- Remove unnecessary + [disconnect](https://github.com/lightningnetwork/lnd/pull/10031) and properly + wait for goroutine to finish when shutting down. + ## Breaking Changes ## Performance Improvements diff --git a/peer/brontide.go b/peer/brontide.go index 5b0d84f2f3..52b3c7d832 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -775,7 +775,9 @@ func NewBrontide(cfg Config) *Brontide { // Start starts all helper goroutines the peer needs for normal operations. In // the case this peer has already been started, then this function is a noop. func (p *Brontide) Start() error { - if atomic.AddInt32(&p.started, 1) != 1 { + if !atomic.CompareAndSwapInt32(&p.started, 0, 1) { + p.log.Warn("already started") + return nil } @@ -1579,17 +1581,29 @@ func (p *Brontide) maybeSendChannelUpdates() { // calling Disconnect will signal the quit channel and the method will not // block, since no goroutines were spawned. func (p *Brontide) WaitForDisconnect(ready chan struct{}) { + p.log.Trace("waiting for disconnect") + defer p.log.Trace("peer disconnected") + // Before we try to call the `Wait` goroutine, we'll make sure the main // set of goroutines are already active. select { case <-p.startReady: + p.log.Trace("startReady received, waiting for signal ready") + case <-p.cg.Done(): + p.log.Trace("peer quit, exit waiting for signal startReady") + return } select { case <-ready: + p.log.Trace("ready received, waiting goroutines to finish") + case <-p.cg.Done(): + p.log.Trace("peer quit, exit waiting for signal ready") + + return } p.cg.WgWait() @@ -1604,6 +1618,9 @@ func (p *Brontide) WaitForDisconnect(ready chan struct{}) { // the peer has finished starting up before calling this method. func (p *Brontide) Disconnect(reason error) { if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) { + p.log.Warnf("got disconnect reason [%v], but peer already "+ + "disconnected", reason) + return } @@ -1614,11 +1631,13 @@ func (p *Brontide) Disconnect(reason error) { // started, otherwise we will skip reading it as this chan won't be // closed, hence blocks forever. if atomic.LoadInt32(&p.started) == 1 { - p.log.Debugf("Peer hasn't finished starting up yet, waiting " + - "on startReady signal before closing connection") + p.log.Debug("waiting on startReady signal before closing " + + "connection") select { case <-p.startReady: + p.log.Debug("startReady received") + case <-p.cg.Done(): return } @@ -2047,6 +2066,9 @@ func (p *Brontide) readHandler() { // gossiper? p.initGossipSync() + // exitErr is the error to be used when disconnect the peer. + var exitErr error + discStream := newDiscMsgStream(p) discStream.Start() defer discStream.Stop() @@ -2060,7 +2082,8 @@ out: } } if err != nil { - p.log.Infof("unable to read message from peer: %v", err) + p.log.Debugf("unable to read message from peer: %v", + err) // If we could not read our peer's message due to an // unknown type or invalid alias, we continue processing @@ -2098,6 +2121,7 @@ out: // didn't recognize, then we'll stop all processing as // this is a fatal error. default: + exitErr = err break out } } @@ -2182,8 +2206,7 @@ out: err := p.resendChanSyncMsg(targetChan) if err != nil { // TODO(halseth): send error to peer? - p.log.Errorf("resend failed: %v", - err) + p.log.Errorf("resend failed: %v", err) } } @@ -2242,9 +2265,13 @@ out: idleTimer.Reset(idleTimeout) } - p.Disconnect(errors.New("read handler closed")) + // Disconnect the peer on exitErr, but only if the peer hasn't been + // disconnected before. + if atomic.LoadInt32(&p.disconnect) == 0 { + p.Disconnect(exitErr) + } - p.log.Trace("readHandler for peer done") + p.log.Debugf("peer quit, exit readHandler") } // handleCustomMessage handles the given custom message if a handler is @@ -2729,7 +2756,7 @@ out: } case <-p.cg.Done(): - exitErr = lnpeer.ErrPeerExiting + p.log.Debug("peer quit, exit writeHandler") break out } } @@ -2738,7 +2765,9 @@ out: // disconnect. p.cg.WgDone() - p.Disconnect(exitErr) + if exitErr != nil { + p.Disconnect(exitErr) + } p.log.Trace("writeHandler for peer done") } diff --git a/server.go b/server.go index 7eb8c583da..93d02bb648 100644 --- a/server.go +++ b/server.go @@ -5209,7 +5209,12 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { // // NOTE: We call it in a goroutine to avoid blocking the main server // goroutine because we might hold the server's mutex. - go peer.Disconnect(fmt.Errorf("server: DisconnectPeer called")) + s.wg.Add(1) + go func() { + defer s.wg.Done() + + peer.Disconnect(fmt.Errorf("server: DisconnectPeer called")) + }() return nil }