@@ -162,56 +162,63 @@ func (c *Communication) writeToStream(pID peer.ID, msg []byte, msgID string) err
162162
163163func (c * Communication ) readFromStream (stream network.Stream ) {
164164 peerID := stream .Conn ().RemotePeer ().String ()
165- c .logger .Debug ().Msgf ( "reading from stream of peer: %s" , peerID )
165+ c .logger .Debug ().Str ( logs . Peer , peerID ). Msg ( "Reading from peer's stream" )
166166
167167 const timeout = 10 * time .Second
168168
169- select {
170- case <- c .stopChan :
169+ payload , err := ReadStreamWithBuffer (stream )
170+ if err != nil {
171+ c .logger .Error ().Err (err ).Str (logs .Peer , peerID ).Msg ("fail to read from stream" )
172+ c .streamMgr .AddStream ("UNKNOWN" , stream )
173+ return
174+ }
175+
176+ var wrappedMsg messages.WrappedMessage
177+ if err := json .Unmarshal (payload , & wrappedMsg ); nil != err {
178+ c .logger .Error ().Err (err ).Msg ("fail to unmarshal wrapped message bytes" )
179+ c .streamMgr .AddStream ("UNKNOWN" , stream )
171180 return
172- default :
173- dataBuf , err := ReadStreamWithBuffer (stream )
174- if err != nil {
175- c .logger .Error ().Err (err ).Str (logs .Peer , peerID ).Msg ("fail to read from stream" )
176- c .streamMgr .AddStream ("UNKNOWN" , stream )
177- return
178- }
179- var wrappedMsg messages.WrappedMessage
180- if err := json .Unmarshal (dataBuf , & wrappedMsg ); nil != err {
181- c .logger .Error ().Err (err ).Msg ("fail to unmarshal wrapped message bytes" )
182- c .streamMgr .AddStream ("UNKNOWN" , stream )
183- return
184- }
185- c .logger .Debug ().Msgf (">>>>>>>[%s] %s" , wrappedMsg .MessageType , string (wrappedMsg .Payload ))
186- channel := c .getSubscriber (wrappedMsg .MessageType , wrappedMsg .MsgID )
187- if nil == channel {
188- c .logger .Debug ().Msgf ("no MsgID %s found for this message" , wrappedMsg .MsgID )
189- c .logger .Debug ().Msgf ("no MsgID %s found for this message" , wrappedMsg .MessageType )
190- _ = stream .Reset ()
191- return
192- }
193- c .streamMgr .AddStream (wrappedMsg .MsgID , stream )
194- select {
195- case <- time .After (timeout ):
196- c .logger .Warn ().
197- Str (logs .MsgID , wrappedMsg .MsgID ).
198- Str (logs .Peer , peerID ).
199- Str ("protocol" , string (stream .Protocol ())).
200- Str ("message_type" , wrappedMsg .MessageType .String ()).
201- Float64 ("timeout" , timeout .Seconds ()).
202- Msg ("Timeout to send message to channel" )
203- case channel <- & Message {
204- PeerID : stream .Conn ().RemotePeer (),
205- Payload : dataBuf }:
206- }
181+ }
182+
183+ channel := c .getSubscriber (wrappedMsg .MessageType , wrappedMsg .MsgID )
184+ if nil == channel {
185+ c .logger .Debug ().Msgf ("no MsgID %s found for this message" , wrappedMsg .MsgID )
186+ c .logger .Debug ().Msgf ("no MsgID %s found for this message" , wrappedMsg .MessageType )
187+ _ = stream .Reset ()
188+ return
189+ }
190+
191+ c .streamMgr .AddStream (wrappedMsg .MsgID , stream )
192+
193+ msg := & Message {
194+ PeerID : stream .Conn ().RemotePeer (),
195+ Payload : payload ,
196+ }
197+
198+ select {
199+ case channel <- msg :
200+ // all good, message sent
201+ case <- time .After (timeout ):
202+ // Note that we aren't logging payload itself
203+ // as it might contain sensitive information
204+ c .logger .Warn ().
205+ Str (logs .MsgID , wrappedMsg .MsgID ).
206+ Str (logs .Peer , peerID ).
207+ Str ("protocol" , string (stream .Protocol ())).
208+ Str ("message_type" , wrappedMsg .MessageType .String ()).
209+ Int ("message_payload_bytes" , len (wrappedMsg .Payload )).
210+ Float64 ("timeout" , timeout .Seconds ()).
211+ Msg ("readFromStream: timeout to send message to channel" )
207212 }
208213}
209214
210215func (c * Communication ) handleStream (stream network.Stream ) {
211- peerID := stream .Conn ().RemotePeer ().String ()
212- c .logger .Debug ().Msgf ("handle stream from peer: %s" , peerID )
213- // we will read from that stream
214- c .readFromStream (stream )
216+ select {
217+ case <- c .stopChan :
218+ return
219+ default :
220+ c .readFromStream (stream )
221+ }
215222}
216223
217224func (c * Communication ) bootStrapConnectivityCheck () error {
0 commit comments