@@ -10,7 +10,6 @@ import (
1010 "github.com/libp2p/go-libp2p/core/host"
1111 "github.com/libp2p/go-libp2p/core/network"
1212 "github.com/libp2p/go-libp2p/core/peer"
13- "github.com/libp2p/go-libp2p/core/protocol"
1413 "github.com/pkg/errors"
1514 "github.com/rs/zerolog"
1615
@@ -19,8 +18,6 @@ import (
1918 "github.com/zeta-chain/go-tss/p2p"
2019)
2120
22- var signatureNotifierProtocol protocol.ID = "/p2p/signatureNotifier"
23-
2421type signatureItem struct {
2522 messageID string
2623 peerID peer.ID
@@ -37,11 +34,16 @@ type SignatureNotifier struct {
3734 host host.Host
3835 notifierLock * sync.Mutex
3936 notifiers map [string ]* notifier
40- streamMgr * p2p.StreamMgr
37+ streamMgr * p2p.StreamManager
4138}
4239
4340// NewSignatureNotifier create a new instance of SignatureNotifier
4441func NewSignatureNotifier (host host.Host , logger zerolog.Logger ) * SignatureNotifier {
42+ logger = logger .With ().
43+ Str (logs .Component , "signature_notifier" ).
44+ Stringer (logs .Host , host .ID ()).
45+ Logger ()
46+
4547 ctx , cancel := context .WithCancel (context .Background ())
4648
4749 s := & SignatureNotifier {
@@ -50,14 +52,11 @@ func NewSignatureNotifier(host host.Host, logger zerolog.Logger) *SignatureNotif
5052 host : host ,
5153 notifierLock : & sync.Mutex {},
5254 notifiers : make (map [string ]* notifier ),
53- streamMgr : p2p .NewStreamMgr (logger ),
54- logger : logger .With ().
55- Str (logs .Component , "signature_notifier" ).
56- Stringer (logs .Host , host .ID ()).
57- Logger (),
55+ streamMgr : p2p .NewStreamManager (logger ),
56+ logger : logger ,
5857 }
5958
60- host .SetStreamHandler (signatureNotifierProtocol , s .handleStream )
59+ host .SetStreamHandler (p2p . ProtocolSignatureNotifier , s .handleStream )
6160
6261 return s
6362}
@@ -106,60 +105,68 @@ func (s *SignatureNotifier) cleanupStaleNotifiers() {
106105
107106// HandleStream handle signature notify stream
108107func (s * SignatureNotifier ) handleStream (stream network.Stream ) {
109- remotePeer := stream .Conn ().RemotePeer ()
110- logger := s .logger .With ().Str ("remote peer" , remotePeer .String ()).Logger ()
108+ logger := s .logger .With ().
109+ Stringer (logs .Peer , stream .Conn ().RemotePeer ()).
110+ Logger ()
111+
111112 logger .Debug ().Msg ("reading signature notifier message" )
113+
112114 payload , err := p2p .ReadStreamWithBuffer (stream )
113115 if err != nil {
114116 logger .Err (err ).Msg ("fail to read payload from stream" )
115- s .streamMgr .AddStream ( "UNKNOWN" , stream )
117+ s .streamMgr .StashUnknown ( stream )
116118 return
117119 }
120+
118121 // we tell the sender we have received the message
119- err = p2p .WriteStreamWithBuffer ([]byte ("done" ), stream )
122+ err = p2p .WriteStreamWithBuffer ([]byte (p2p . ResponseMessage ), stream )
120123 if err != nil {
121- logger .Error ().Err (err ).Stringer ( logs . Peer , remotePeer ). Msg ("Fail to write the reply to peer" )
124+ logger .Error ().Err (err ).Msg ("Fail to write the reply to peer" )
122125 }
126+
123127 var msg messages.KeysignSignature
124128 if err := proto .Unmarshal (payload , & msg ); err != nil {
125129 logger .Err (err ).Msg ("fail to unmarshal join party request" )
126- s .streamMgr .AddStream ( "UNKNOWN" , stream )
130+ s .streamMgr .StashUnknown ( stream )
127131 return
128132 }
129- s .streamMgr .AddStream (msg .ID , stream )
133+
134+ s .streamMgr .Stash (msg .ID , stream )
135+
136+ success := msg .KeysignStatus == messages .KeysignSignature_Success && len (msg .Signatures ) > 0
137+ if ! success {
138+ return
139+ }
140+
130141 var signatures []* common.SignatureData
131- if len (msg .Signatures ) > 0 && msg .KeysignStatus == messages .KeysignSignature_Success {
132- for _ , el := range msg .Signatures {
133- var signature common.SignatureData
134- if err := proto .Unmarshal (el , & signature ); err != nil {
135- logger .Error ().Err (err ).Msg ("fail to unmarshal signature data" )
136- return
137- }
138- signatures = append (signatures , & signature )
142+ for _ , el := range msg .Signatures {
143+ var signature common.SignatureData
144+ if err := proto .Unmarshal (el , & signature ); err != nil {
145+ logger .Error ().Err (err ).Msg ("fail to unmarshal signature data" )
146+ return
139147 }
140148
141- _ , err = s .createOrUpdateNotifier (msg .ID , nil , "" , signatures , defaultNotifierTTL )
142- if err != nil {
143- s .logger .Error ().Err (err ).Msg ("fail to update notifier" )
144- }
149+ signatures = append (signatures , & signature )
150+ }
151+
152+ _ , err = s .createOrUpdateNotifier (msg .ID , nil , "" , signatures , defaultNotifierTTL )
153+ if err != nil {
154+ s .logger .Error ().Err (err ).Msg ("fail to update notifier" )
145155 }
146156}
147157
148158func (s * SignatureNotifier ) sendOneMsgToPeer (m * signatureItem ) error {
149159 ctx , cancel := context .WithTimeout (context .Background (), time .Second * 2 )
150160 defer cancel ()
151161
152- stream , err := s .host .NewStream (ctx , m .peerID , signatureNotifierProtocol )
162+ stream , err := s .host .NewStream (ctx , m .peerID , p2p . ProtocolSignatureNotifier )
153163 if err != nil {
154- return errors .Wrapf (err , "unable to create %s stream to peer %s " , signatureNotifierProtocol , m . peerID )
164+ return errors .Wrapf (err , "unable to create %s stream to peer" , p2p . ProtocolSignatureNotifier )
155165 }
156166
157- s . logger . Debug (). Stringer ( logs . Peer , m . peerID ). Msg ( "opened stream to peer successfully" )
167+ defer s . streamMgr . Stash ( m . messageID , stream )
158168
159- defer func () {
160- // todo: why we need to add stream to streamMgr here?
161- s .streamMgr .AddStream (m .messageID , stream )
162- }()
169+ s .logger .Debug ().Stringer (logs .Peer , m .peerID ).Msg ("opened stream to peer successfully" )
163170
164171 ks := & messages.KeysignSignature {
165172 ID : m .messageID ,
@@ -196,9 +203,11 @@ func (s *SignatureNotifier) sendOneMsgToPeer(m *signatureItem) error {
196203 return errors .Wrapf (err , "fail to set read deadline to stream" )
197204 }
198205
199- ret := make ([]byte , 8 )
200- if _ , err := stream .Read (ret ); err != nil {
201- return errors .Wrapf (err , "fail to read response from stream" )
206+ const expectedResponseSize = p2p .PayloadHeaderLen + p2p .ResponseMessageBytesLen
207+
208+ ack := make ([]byte , expectedResponseSize )
209+ if _ , err := stream .Read (ack ); err != nil {
210+ return errors .Wrapf (err , "failed to read response from stream" )
202211 }
203212
204213 return err
@@ -228,7 +237,12 @@ func (s *SignatureNotifier) broadcastCommon(messageID string, sig []*common.Sign
228237 go func () {
229238 defer wg .Done ()
230239
231- if err := s .sendOneMsgToPeer (sig ); err != nil {
240+ err := s .sendOneMsgToPeer (sig )
241+ switch {
242+ case errors .Is (err , network .ErrReset ):
243+ // someone above dropped the stream
244+ // (eg the signature is already processed) that's fine
245+ case err != nil :
232246 s .logger .Error ().Err (err ).
233247 Str (logs .MsgID , sig .messageID ).
234248 Stringer (logs .Peer , sig .peerID ).
@@ -303,6 +317,8 @@ func (s *SignatureNotifier) WaitForSignature(
303317) ([]* common.SignatureData , error ) {
304318 s .logger .Debug ().Msg ("waiting for signature" )
305319
320+ defer s .streamMgr .Free (messageID )
321+
306322 n , err := s .createOrUpdateNotifier (messageID , message , poolPubKey , nil , timeout + time .Second )
307323 if err != nil {
308324 return nil , errors .Wrapf (err , "unable to create or update notifier" )
@@ -320,11 +336,6 @@ func (s *SignatureNotifier) WaitForSignature(
320336 s .logger .Debug ().Msg ("timed out waiting for signature from peer" )
321337 return nil , errors .Errorf ("timeout: didn't receive signature after %s" , timeout .String ())
322338 case <- sigChan :
323- s .logger .Debug ().Msg ("got signature generated signal" )
324339 return nil , p2p .ErrSigGenerated
325340 }
326341}
327-
328- func (s * SignatureNotifier ) ReleaseStream (msgID string ) {
329- s .streamMgr .ReleaseStream (msgID )
330- }
0 commit comments