Skip to content

Commit 3cb8567

Browse files
committed
Minor StreamMgr improvements
1 parent b7ad29e commit 3cb8567

File tree

5 files changed

+45
-34
lines changed

5 files changed

+45
-34
lines changed

keysign/signature_notifier.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,19 @@ type SignatureNotifier struct {
4242

4343
// NewSignatureNotifier create a new instance of SignatureNotifier
4444
func NewSignatureNotifier(host host.Host, logger zerolog.Logger) *SignatureNotifier {
45-
logger = logger.With().
46-
Str(logs.Component, "signature_notifier").
47-
Stringer(logs.Host, host.ID()).
48-
Logger()
49-
5045
ctx, cancel := context.WithCancel(context.Background())
5146

5247
s := &SignatureNotifier{
5348
ctx: ctx,
5449
cancel: cancel,
55-
logger: logger,
5650
host: host,
5751
notifierLock: &sync.Mutex{},
5852
notifiers: make(map[string]*notifier),
59-
streamMgr: p2p.NewStreamMgr(),
53+
streamMgr: p2p.NewStreamMgr(logger),
54+
logger: logger.With().
55+
Str(logs.Component, "signature_notifier").
56+
Stringer(logs.Host, host.ID()).
57+
Logger(),
6058
}
6159

6260
host.SetStreamHandler(signatureNotifierProtocol, s.handleStream)

p2p/communication.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func NewCommunication(
9595
streamCount: 0,
9696
BroadcastMsgChan: make(chan *messages.BroadcastMsgChan, 1024),
9797
externalAddr: externalAddr,
98-
streamMgr: NewStreamMgr(),
98+
streamMgr: NewStreamMgr(logger),
9999
whitelistedPeers: whitelistedPeers,
100100
}, nil
101101
}

p2p/party_coordinator.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,19 @@ func NewPartyCoordinator(host host.Host, timeout time.Duration, logger zerolog.L
4545
if timeout.Nanoseconds() == 0 {
4646
timeout = 10 * time.Second
4747
}
48+
4849
pc := &PartyCoordinator{
4950
logger: logger,
5051
host: host,
5152
stopChan: make(chan struct{}),
5253
timeout: timeout,
5354
peersGroup: make(map[string]*peerStatus),
5455
joinPartyGroupLock: &sync.Mutex{},
55-
streamMgr: NewStreamMgr(),
56+
streamMgr: NewStreamMgr(logger),
5657
}
58+
5759
host.SetStreamHandler(joinPartyProtocolWithLeader, pc.HandleStreamWithLeader)
60+
5861
return pc
5962
}
6063

p2p/stream_helper.go

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ import (
1212
"github.com/libp2p/go-libp2p/core/network"
1313
"github.com/pkg/errors"
1414
"github.com/rs/zerolog"
15-
"github.com/rs/zerolog/log"
15+
16+
"github.com/zeta-chain/go-tss/logs"
1617
)
1718

1819
const (
@@ -32,52 +33,60 @@ func init() {
3233

3334
type StreamMgr struct {
3435
unusedStreams map[string][]network.Stream
35-
streamLocker *sync.RWMutex
36+
mu *sync.RWMutex
3637
logger zerolog.Logger
3738
}
3839

39-
func NewStreamMgr() *StreamMgr {
40+
func NewStreamMgr(logger zerolog.Logger) *StreamMgr {
4041
return &StreamMgr{
4142
unusedStreams: make(map[string][]network.Stream),
42-
streamLocker: &sync.RWMutex{},
43-
logger: log.With().Str("module", "communication").Logger(),
43+
mu: &sync.RWMutex{},
44+
logger: logger.With().Str(logs.Component, "stream_manager").Logger(),
4445
}
4546
}
4647

4748
func (sm *StreamMgr) ReleaseStream(msgID string) {
48-
sm.streamLocker.RLock()
49+
sm.mu.RLock()
4950
usedStreams, okStream := sm.unusedStreams[msgID]
5051
unknownStreams, okUnknown := sm.unusedStreams["UNKNOWN"]
5152
streams := append(usedStreams, unknownStreams...)
52-
sm.streamLocker.RUnlock()
53-
if okStream || okUnknown {
54-
for _, el := range streams {
55-
err := el.Reset()
56-
if err != nil {
57-
sm.logger.Error().Err(err).Msg("fail to reset the stream,skip it")
58-
}
53+
sm.mu.RUnlock()
54+
55+
// noop
56+
if !(okStream || okUnknown) {
57+
return
58+
}
59+
60+
for _, stream := range streams {
61+
if err := stream.Reset(); err != nil {
62+
sm.logger.Error().Err(err).
63+
Str(logs.MsgID, msgID).
64+
Str("stream_id", stream.ID()).
65+
Msg("Failed to reset the stream")
5966
}
60-
sm.streamLocker.Lock()
61-
delete(sm.unusedStreams, msgID)
62-
delete(sm.unusedStreams, "UNKNOWN")
63-
sm.streamLocker.Unlock()
6467
}
68+
69+
sm.mu.Lock()
70+
delete(sm.unusedStreams, msgID)
71+
delete(sm.unusedStreams, "UNKNOWN")
72+
sm.mu.Unlock()
6573
}
6674

6775
func (sm *StreamMgr) AddStream(msgID string, stream network.Stream) {
6876
if stream == nil {
6977
return
7078
}
71-
sm.streamLocker.Lock()
72-
defer sm.streamLocker.Unlock()
79+
80+
sm.mu.Lock()
81+
defer sm.mu.Unlock()
82+
7383
entries, ok := sm.unusedStreams[msgID]
7484
if !ok {
75-
entries := []network.Stream{stream}
76-
sm.unusedStreams[msgID] = entries
77-
} else {
78-
entries = append(entries, stream)
79-
sm.unusedStreams[msgID] = entries
85+
sm.unusedStreams[msgID] = []network.Stream{stream}
86+
return
8087
}
88+
89+
sm.unusedStreams[msgID] = append(entries, stream)
8190
}
8291

8392
// ReadStreamWithBuffer read data from the given stream

p2p/stream_helper_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/pkg/errors"
11+
"github.com/rs/zerolog"
1112

1213
"github.com/libp2p/go-libp2p/core/network"
1314
"github.com/libp2p/go-libp2p/core/protocol"
@@ -234,7 +235,7 @@ func TestReadPayload(t *testing.T) {
234235
}
235236

236237
func TestStreamManager(t *testing.T) {
237-
streamMgr := NewStreamMgr()
238+
streamMgr := NewStreamMgr(zerolog.New(zerolog.NewTestWriter(t)))
238239
stream := NewMockNetworkStream()
239240

240241
streamMgr.AddStream("1", nil)

0 commit comments

Comments
 (0)