Skip to content

Commit 94b1d39

Browse files
committed
simplify party coordinator
1 parent 523a305 commit 94b1d39

File tree

1 file changed

+10
-15
lines changed

1 file changed

+10
-15
lines changed

p2p/party_coordinator.go

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/libp2p/go-libp2p/core/host"
99
"github.com/libp2p/go-libp2p/core/network"
1010
"github.com/libp2p/go-libp2p/core/peer"
11-
"github.com/libp2p/go-libp2p/core/protocol"
1211
"github.com/pkg/errors"
1312
"github.com/rs/zerolog"
1413
"golang.org/x/sync/errgroup"
@@ -224,7 +223,7 @@ func (pc *PartyCoordinator) sendResponseToAll(msg *messages.JoinPartyLeaderComm,
224223
}
225224

226225
errGroup.Go(func() error {
227-
err := pc.sendMsgToPeer(msgSend, msg.ID, peer, ProtocolJoinPartyWithLeader, true)
226+
err := pc.sendMsgToPeer(msg.ID, peer, msgSend, true)
228227
if err != nil {
229228
pc.logger.Error().Err(err).
230229
Str(logs.MsgID, msg.ID).
@@ -245,33 +244,29 @@ func (pc *PartyCoordinator) sendRequestToLeader(msg *messages.JoinPartyLeaderCom
245244
return errors.Wrap(err, "failed to marshall request")
246245
}
247246

248-
if err := pc.sendMsgToPeer(msgSend, msg.ID, leader, ProtocolJoinPartyWithLeader, false); err != nil {
247+
if err := pc.sendMsgToPeer(msg.ID, leader, msgSend, false); err != nil {
249248
return errors.Wrap(err, "sendMsgToPeer")
250249
}
251250

252251
return nil
253252
}
254253

255-
func (pc *PartyCoordinator) sendMsgToPeer(
256-
msgBuf []byte,
257-
msgID string,
258-
remotePeer peer.ID,
259-
protoID protocol.ID,
260-
needResponse bool,
261-
) error {
254+
func (pc *PartyCoordinator) sendMsgToPeer(msgID string, pid peer.ID, payload []byte, needResponse bool) error {
255+
const protoID = ProtocolJoinPartyWithLeader
256+
262257
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
263258
defer cancel()
264259

265-
pc.logger.Debug().Msgf("try to open stream to (%s) ", remotePeer)
260+
pc.logger.Debug().Msgf("try to open stream to (%s) ", pid)
266261

267-
stream, err := pc.host.NewStream(ctx, remotePeer, protoID)
262+
stream, err := pc.host.NewStream(ctx, pid, protoID)
268263
if err != nil {
269-
return errors.Wrapf(err, "failed to create %s stream to peer %q", protoID, remotePeer.String())
264+
return errors.Wrapf(err, "failed to create %s stream to peer %q", protoID, pid.String())
270265
}
271266

272267
defer pc.streamMgr.Stash(msgID, stream)
273268

274-
if err = WriteStreamWithBuffer(msgBuf, stream); err != nil {
269+
if err = WriteStreamWithBuffer(payload, stream); err != nil {
275270
return errors.Wrap(err, "failed to write message to opened stream")
276271
}
277272

@@ -286,7 +281,7 @@ func (pc *PartyCoordinator) sendMsgToPeer(
286281
case err != nil:
287282
pc.logger.Error().Err(err).
288283
Str(logs.MsgID, msgID).
289-
Stringer(logs.Peer, remotePeer).
284+
Stringer(logs.Peer, pid).
290285
Msg("Failed to await the response from peer")
291286
}
292287

0 commit comments

Comments
 (0)