feat(p2p): key signing improvements #54
📝 Walkthrough

This pull request refactors various components across the system. Changes include adjustments to comment placement and naming conventions, updates to concurrency controls and synchronization mechanisms, and the replacement of legacy functions with streamlined counterparts. Notably, protocol identifiers and stream management have been overhauled through a new StreamManager.
Sequence Diagram(s)

sequenceDiagram
participant Client
participant Server
participant PartyCoordinator
participant StreamManager
Client->>Server: Request Key Signature
Server->>PartyCoordinator: joinParty(msgID, blockHeight, peers, threshold)
PartyCoordinator->>StreamManager: Stash stream for msgID
PartyCoordinator-->>Server: Return leader (peer.ID)
Server->>PartyCoordinator: Initiate batch signing (wait for signatures)
PartyCoordinator->>StreamManager: Free streams upon completion
Server->>Client: Return built signature response
sequenceDiagram
participant Caller
participant ProtocolModule
participant PeerList
Caller->>ProtocolModule: Invoke PickLeader(msgID, blockHeight, peers)
ProtocolModule->>PeerList: Compute hash for each peer (hashPeer)
PeerList->>ProtocolModule: Return hash values
ProtocolModule-->>Caller: Return peer with lowest hash as leader
setup | ✅ e2e tests completed in 5m54.261976797s
setup | --- 📈 Network Report ---
setup | Block Height: 203
setup | CCTX Processed: 84
setup | Emissions Pool Balance: 19998075 ZETA
e2e passed
Actionable comments posted: 0
🧹 Nitpick comments (10)
common/tss.go (1)
883-893: Consider implementing a configurable handler for unknown message types.

The added comment raises a valid consideration about unknown message type handling. Instead of always returning messages.Unknown, consider implementing a configurable strategy (skip/log/error) to handle unknown message types more flexibly.

- // todo should we skip unknown message types?
- func getBroadcastMessageType(msgType messages.THORChainTSSMessageType) messages.THORChainTSSMessageType {
+ // getBroadcastMessageType returns the corresponding verification message type
+ // for a given TSS message type, or messages.Unknown for unsupported types
+ func getBroadcastMessageType(msgType messages.THORChainTSSMessageType) messages.THORChainTSSMessageType {
  	switch msgType {
  	case messages.TSSKeyGenMsg:
  		return messages.TSSKeyGenVerMsg
  	case messages.TSSKeySignMsg:
  		return messages.TSSKeySignVerMsg
  	default:
- 		// this should not happen
  		return messages.Unknown
  	}
  }

p2p/stream_manager_test.go (1)
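A configurable strategy could look like the following sketch. The policy type, constant names, and function name here are hypothetical stand-ins, not part of the codebase:

```go
package main

import "fmt"

// Stand-ins for messages.THORChainTSSMessageType values (names assumed).
type MsgType int

const (
	KeyGenMsg MsgType = iota
	KeySignMsg
	KeyGenVerMsg
	KeySignVerMsg
	Unknown
)

// UnknownPolicy is a hypothetical knob for handling unrecognized types.
type UnknownPolicy int

const (
	PolicySkip  UnknownPolicy = iota // return Unknown silently
	PolicyLog                        // log, then return Unknown
	PolicyError                      // surface an error to the caller
)

// broadcastMessageType maps a TSS message type to its verification
// counterpart, delegating unknown types to the configured policy.
func broadcastMessageType(t MsgType, p UnknownPolicy) (MsgType, error) {
	switch t {
	case KeyGenMsg:
		return KeyGenVerMsg, nil
	case KeySignMsg:
		return KeySignVerMsg, nil
	default:
		switch p {
		case PolicyLog:
			fmt.Printf("unknown message type %d\n", t)
			return Unknown, nil
		case PolicyError:
			return Unknown, fmt.Errorf("unknown message type %d", t)
		default: // PolicySkip
			return Unknown, nil
		}
	}
}

func main() {
	mt, err := broadcastMessageType(KeyGenMsg, PolicyError)
	fmt.Println(mt == KeyGenVerMsg, err == nil) // prints "true true"
	_, err = broadcastMessageType(MsgType(42), PolicyError)
	fmt.Println(err != nil) // prints "true"
}
```

The caller decides once, at construction time, how strict to be; the mapping itself stays a pure function.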
239-286: Test rewritten to validate the new StreamManager API

The test has been significantly improved to verify multiple aspects of the StreamManager:
- Proper handling of nil streams
- Correct stashing and replacement behavior
- Effective freeing of streams by message ID
- Verification of map state after operations
The updated test provides more comprehensive coverage of the StreamManager functionality with clear assertions that validate the expected behavior.
Consider adding an explicit test to verify that streams are properly closed when freed, not just removed from the map. This could be done by adding a method to the MockNetworkStream to track close operations.
 type MockNetworkStream struct {
 	*bytes.Buffer
 	protocol            protocol.ID
 	errSetReadDeadLine  bool
 	errSetWriteDeadLine bool
 	errRead             bool
 	id                  int64
+	closed              bool
 }

-func (m MockNetworkStream) Close() error {
+// Use a pointer receiver so the flag survives the call.
+func (m *MockNetworkStream) Close() error {
+	m.closed = true
 	return nil
 }

 // In TestStreamManager, after freeing:
+assert.True(t, streamA.closed, "Stream should be closed when freed")

keysign/ecdsa/tss_keysign.go (1)
117-118: TODOs should be addressed or converted to issues

This TODO comment about potentially returning ErrNotInTheParty should either be implemented or tracked as a formal issue rather than left as a comment. Either implement the suggested error handling or convert this TODO to a tracked issue with clear acceptance criteria.
keysign/signature_notifier.go (1)
122-124: Minor grammar suggestion in the error message.

Consider changing "Fail to write the reply to peer" to "Failed to write reply to peer" to maintain consistent and clear logging.

- logger.Error().Err(err).Msg("Fail to write the reply to peer")
+ logger.Error().Err(err).Msg("Failed to write reply to peer")
- logger.Error().Err(err).Msg("Fail to write the reply to peer") + logger.Error().Err(err).Msg("Failed to write reply to peer")p2p/stream_manager.go (2)
19-23: Reasonable default resource limits.

Limiting the payload to 20 MB safeguards against excessive memory usage. Ensure downstream consumers understand and handle large payloads carefully.

I can help document these limits or provide fallback logic if desired.

148-163: Clean, atomic stream deletion.

deleteStream properly resets and removes the stream from the map. Consider logging the success of the reset for debugging purposes (beyond error logs).

p2p/communication.go (2)
29-30: Improve comment clarity for TimeoutConnecting

The current comment simply repeats the constant name without providing additional information about its purpose or why this specific timeout value was chosen.

-// TimeoutConnecting maximum time for wait for peers to connect
+// TimeoutConnecting defines the maximum duration to wait for peers to establish
+// connections before timing out. This value affects connection reliability
+// vs connection speed tradeoffs.
 const TimeoutConnecting = 20 * time.Second

192-193: Consider standardizing timeout constants

There's a local timeout constant (10 seconds) that differs from the global TimeoutConnecting constant (20 seconds). Having different timeouts for similar operations may lead to inconsistent behavior.

-const timeout = 10 * time.Second
+// Use a named constant that explains the purpose of this specific timeout
+const timeoutChannelSend = 10 * time.Second

 select {
 case channel <- msg:
 	// all good, message sent
-case <-time.After(timeout):
+case <-time.After(timeoutChannelSend):

p2p/party_coordinator.go (2)
196-197: Address the TODO comment for parameter type

The TODO comment suggests dropping the getPeerIDs function in favor of using []peer.ID directly. Since this is already part of the PR scope, it would be better to implement this change now rather than leaving it as a TODO. Consider implementing it by updating the signature of functions that call getPeerIDs to accept []peer.ID directly instead of []string.

254-256: Simplify constant declaration

The local protoID constant is only used once in the function, making it unnecessary.

 func (pc *PartyCoordinator) sendMsgToPeer(msgID string, pid peer.ID, payload []byte, needResponse bool) error {
-	const protoID = ProtocolJoinPartyWithLeader
-
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
 	defer cancel()

 	pc.logger.Debug().Msgf("try to open stream to (%s) ", pid)

-	stream, err := pc.host.NewStream(ctx, pid, protoID)
+	stream, err := pc.host.NewStream(ctx, pid, ProtocolJoinPartyWithLeader)
 	if err != nil {
-		return errors.Wrapf(err, "failed to create %s stream to peer %q", protoID, pid.String())
+		return errors.Wrapf(err, "failed to create %s stream to peer %q", ProtocolJoinPartyWithLeader, pid.String())
 	}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (20)
- common/tss.go (1 hunks)
- keysign/ecdsa/tss_keysign.go (2 hunks)
- keysign/notifier.go (1 hunks)
- keysign/signature_notifier.go (6 hunks)
- messages/p2p_message.go (3 hunks)
- p2p/communication.go (11 hunks)
- p2p/leader_provider.go (0 hunks)
- p2p/leader_provider_test.go (0 hunks)
- p2p/party_coordinator.go (14 hunks)
- p2p/party_coordinator_test.go (9 hunks)
- p2p/peer_status.go (4 hunks)
- p2p/protocol.go (1 hunks)
- p2p/protocol_test.go (1 hunks)
- p2p/stream_helper.go (0 hunks)
- p2p/stream_manager.go (1 hunks)
- p2p/stream_manager_test.go (6 hunks)
- tss/keygen.go (4 hunks)
- tss/keysign.go (8 hunks)
- tss/tss.go (1 hunks)
- tss/tss_4nodes_zeta_test.go (2 hunks)
💤 Files with no reviewable changes (3)
- p2p/leader_provider.go
- p2p/leader_provider_test.go
- p2p/stream_helper.go
🧰 Additional context used
🧬 Code Definitions (11)
common/tss.go (1)
messages/p2p_message.go (6)
THORChainTSSMessageType (11-11), TSSKeyGenMsg (15-15), TSSKeyGenVerMsg (19-19), TSSKeySignMsg (17-17), TSSKeySignVerMsg (21-21), Unknown (27-27)
tss/keygen.go (1)
conversion/key_provider.go (1)
GetPubKeyFromPeerID(78-97)
p2p/stream_manager_test.go (1)
p2p/stream_manager.go (1)
NewStreamManager(54-76)
tss/keysign.go (7)
keysign/response.go (3): Response (17-21), Signature (9-14), NewResponse (32-38)
logs/fields.go (3): Leader (11-11), MsgID (8-8), Peer (9-9)
common/tss_helper.go (1): MsgToHashInt (42-51)
p2p/party_coordinator.go (1): ErrSigGenerated (26-26)
blame/types.go (1): Blame (41-47)
keysign/eddsa/tss_keysign.go (1): New (39-62)
common/status.go (1): Success (7-7)
p2p/protocol.go (1)
tss/tss.go (1)
New(69-141)
p2p/protocol_test.go (1)
p2p/protocol.go (1)
PickLeader(45-69)
p2p/party_coordinator_test.go (2)
p2p/party_coordinator.go (2)
PartyCoordinator (29-37), NewPartyCoordinator (40-61)
p2p/protocol.go (1)
PickLeader(45-69)
keysign/signature_notifier.go (3)
logs/fields.go (3)
Host (10-10), Component (7-7), Peer (9-9)
p2p/protocol.go (4)
ProtocolSignatureNotifier (25-25), ResponseMessage (39-39), PayloadHeaderLen (36-36), ResponseMessageBytesLen (40-40)
messages/signature_notifier.pb.go (4)
KeysignSignature (72-79), KeysignSignature (92-92), KeysignSignature (107-109), KeysignSignature_Success (27-27)
p2p/communication.go (2)
logs/fields.go (3)
Component (7-7), Peer (9-9), MsgID (8-8)
messages/p2p_message.go (1)
WrappedMessage(52-56)
p2p/party_coordinator.go (4)
logs/fields.go (3)
MsgID (8-8), Peer (9-9), Leader (11-11)
p2p/stream_manager.go (4)
StreamManager (41-46), NewStreamManager (54-76), WriteStreamWithBuffer (251-285), ReadStreamWithBuffer (216-248)
p2p/protocol.go (2)
ProtocolJoinPartyWithLeader (17-17), ResponseMessage (39-39)
messages/join_party.pb.go (3)
JoinPartyLeaderComm (78-86), JoinPartyLeaderComm (99-99), JoinPartyLeaderComm (114-116)
keysign/ecdsa/tss_keysign.go (2)
keysign/keysign.go (1)
TssKeySign (11-19)
logs/fields.go (1)
Party(15-22)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: test go 1.22
🔇 Additional comments (81)
keysign/notifier.go (1)
43-43: Improved code organization with proper spacing.

The addition of this blank line enhances readability by clearly separating error handling logic from the function's return statement, following Go's style conventions for visually distinct code blocks.
p2p/peer_status.go (8)
21-22: Improved mutex implementation by removing pointer indirection.

Converting from a pointer to a direct mutex instance reduces indirection, eliminates potential nil dereference risks, and aligns with Go's idiomatic patterns for field-level synchronization.

26-28: Updated lock operations to use the new mutex field.

The RLock and RUnlock operations now properly reference the renamed mutex field.

33-35: Updated lock operations to use the new mutex field.

The Lock and Unlock operations now properly reference the renamed mutex field.

40-42: Updated lock operations to use the new mutex field.

The RLock and RUnlock operations now properly reference the renamed mutex field.

56-64: Improved struct initialization with direct mutex instance.

The peerStatus initialization now includes the mutex as a direct field rather than a pointer, which aligns with the field definition change. This is a cleaner implementation that reduces memory allocations.

74-76: Updated lock operations to use the new mutex field.

The RLock and RUnlock operations now properly reference the renamed mutex field.
81-93: Enhanced getPeersStatus method structure and mutex usage.

The method has been refactored to:
- Declare variables before locking
- Use the renamed mutex field
- Improve the code flow with continue statements
This approach minimizes the critical section and follows Go's idiomatic patterns for concurrent code.
99-101: Updated lock operations to use the new mutex field.

The Lock and Unlock operations now properly reference the renamed mutex field.
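The value-typed mutex pattern these comments describe can be illustrated with a minimal, self-contained sketch (the type and field names are illustrative, not the actual peerStatus struct):

```go
package main

import (
	"fmt"
	"sync"
)

// peerRegistry mirrors the pattern above: the RWMutex is embedded by value,
// so the struct needs no pointer indirection and there is no nil-mutex risk.
type peerRegistry struct {
	mu    sync.RWMutex
	peers map[string]bool
}

func newPeerRegistry() *peerRegistry {
	return &peerRegistry{peers: make(map[string]bool)}
}

// set takes the write lock; methods use a pointer receiver so the
// same mutex instance is shared by all callers.
func (r *peerRegistry) set(id string, online bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.peers[id] = online
}

// get takes the read lock, allowing concurrent readers.
func (r *peerRegistry) get(id string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.peers[id]
}

func main() {
	r := newPeerRegistry()
	r.set("peerA", true)
	fmt.Println(r.get("peerA")) // prints "true"
}
```

Because `sync.RWMutex`'s zero value is ready to use, the constructor never has to allocate or initialize the lock separately, which is the allocation win the review points out.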
p2p/protocol.go (7)
13-17: Well-structured protocol constant with clear documentation.

The constant declaration provides a comprehensive explanation of the protocol's purpose and responsibility. The naming convention follows Go standards, making the code more maintainable.

19-21: Clear documentation for TSS protocol constant.

The protocol identifier is well-documented, explaining its purpose for passing lib-tss messages across peers and handling key ceremonies.

23-25: Well-defined protocol identifier for signature notification.

The constant declaration clearly explains the protocol's purpose for notifying peers about signature data and verification responsibilities.

28-31: Established message type constants with clear naming.

The message type constants follow a consistent naming convention and are appropriately scoped for JoinPartyLeaderComm communications.

33-41: Well-documented payload header and response constants.

The constants are clearly documented with explanations of their purpose and technical details, which enhances code maintainability.
43-69: Robust leader selection implementation with comprehensive validation.

The PickLeader function is well-implemented with:
- Proper input validation
- Handling of edge cases (single peer)
- Efficient leader selection algorithm
- Clear error reporting
The function preserves the original implementation logic while improving code organization.
71-76: Secure peer hashing implementation.

The hashPeer helper function creates a deterministic hash by combining the message ID, block height, and peer ID before applying SHA-256. This ensures consistent and secure leader selection across the network.
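A minimal sketch of this lowest-hash scheme, with assumed function names and plain strings for peer IDs (the real PickLeader/hashPeer signatures in p2p/protocol.go differ):

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"math/big"
)

// hashPeer hashes the concatenation of msgID, blockHeight, and the peer ID
// with SHA-256, as described above. Names here are illustrative.
func hashPeer(msgID string, blockHeight int64, peerID string) *big.Int {
	h := sha256.New()
	h.Write([]byte(msgID))
	_ = binary.Write(h, binary.LittleEndian, blockHeight) // fixed-size write never fails
	h.Write([]byte(peerID))
	return new(big.Int).SetBytes(h.Sum(nil))
}

// pickLeader returns the peer with the lowest hash, so every node that sees
// the same inputs agrees on the same leader without extra communication.
func pickLeader(msgID string, blockHeight int64, peers []string) (string, error) {
	if msgID == "" || blockHeight <= 0 || len(peers) == 0 {
		return "", fmt.Errorf("invalid input")
	}
	leader := peers[0]
	lowest := hashPeer(msgID, blockHeight, leader)
	for _, p := range peers[1:] {
		if h := hashPeer(msgID, blockHeight, p); h.Cmp(lowest) < 0 {
			leader, lowest = p, h
		}
	}
	return leader, nil
}

func main() {
	leader, err := pickLeader("msg-1", 10, []string{"peerA", "peerB", "peerC"})
	fmt.Println(leader != "", err == nil) // prints "true true"
}
```

Since the hash depends only on (msgID, blockHeight, peerID), the result is independent of the order in which peers are listed, which is what makes the election deterministic across nodes.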
messages/p2p_message.go (3)
10-11: Improved comment clarity for THORChainTSSMessageType.

The comment now correctly describes the type's purpose in representing message types for THORChain TSS.

22-25: Enhanced documentation for TSSControlMsg and TSSTaskDone constants.

The comments now provide clear explanations of these message types' purposes, making the code more maintainable.

30-48: Improved String method implementation for THORChainTSSMessageType.

The implementation has been enhanced with:
- More meaningful receiver name ('t' instead of 'msgType')
- Comprehensive coverage of all enum values
- Better handling of unknown values with formatted output

This improves error diagnostics and debugging capabilities.
p2p/protocol_test.go (7)
1-9: Well-structured imports with appropriate packages.

The imports include the necessary testing packages (standard testing and testify) along with libp2p peer libraries, providing a clean foundation for the test suite.

11-29: Good test structure for leader selection.

The test follows best practices with descriptive test names and comprehensive validation. The test properly checks the happy path for leader selection using both require (for fatal assertions) and assert (for non-fatal assertions).

31-38: Effective single-peer test case.

This test case appropriately verifies the optimization path in the PickLeader function when only one peer is provided, ensuring the function returns that peer without performing additional computation.

40-43: Proper validation of empty peer list error handling.

The test effectively validates the error case when an empty peer list is provided, ensuring the function fails gracefully with an error rather than panicking.

45-48: Effective validation of empty msgID error handling.

The test properly checks that providing an empty message ID results in an error, ensuring the function's input validation works correctly.

50-54: Thorough testing of blockHeight validation.

The test verifies that the function correctly rejects invalid block heights (≤ 0) with an appropriate error, completing the error case coverage.

1-54: Well-implemented test suite with comprehensive coverage.

The entire test file provides excellent coverage for the PickLeader function, testing all the key scenarios: successful leader selection, single-peer optimization, and all error conditions. The implementation follows Go testing best practices with clear assertions and error validation.

tss/tss.go (2)

184-190: Improved return type from string to peer.ID.

Good refactoring of the function signature to return a more appropriate peer.ID type instead of a string, enhancing type safety. The logging level change to Debug is also appropriate for this operational message.

196-197: Streamlined API call without unnecessary type conversions.

The code now directly passes peersID to JoinPartyWithLeader, eliminating unnecessary string conversion. This is more efficient and maintainable.
tss/tss_4nodes_zeta_test.go (1)
247-267: Enhanced logging with contextual information.

The addition of node-specific context to the logger (index and port) is excellent. This structured logging approach will significantly improve debugging capabilities during tests by making it clear which node is generating each log message.
tss/keygen.go (4)
78-79: Stream management method updated to follow new naming convention

The method names have been updated from ReleaseStream to FreeStreams to maintain consistency with the refactored StreamManager interface. This change aligns with similar updates across the codebase.
256-257: Stream management method updated in KeygenAllAlgo

Similar to the change in the Keygen method, this updates the stream cleanup operations to use the new FreeStreams method for consistency across the codebase.

286-286: Improved type safety in PeerID conversion

Same improvement as earlier, ensuring proper string conversion before calling GetPubKeyFromPeerID.
113-113: ✅ Verification successful

Improved type safety in PeerID conversion

Converting the leader to a string explicitly with leader.String() before passing it to GetPubKeyFromPeerID ensures proper input type and clarifies the function's usage, as GetPubKeyFromPeerID expects a string parameter.

🏁 Script executed:

#!/bin/bash
# Check if all calls to GetPubKeyFromPeerID are passing String() properly
rg -A 1 "GetPubKeyFromPeerID" --type go | grep -v "String()"

Length of output: 552

Approval: Explicit Conversion Confirmed in tss/keygen.go

I have verified that the call in tss/keygen.go at line 113 correctly uses leader.String() when invoking conversion.GetPubKeyFromPeerID. This explicit conversion ensures that the function receives a properly typed string parameter, which is its expected input, and thereby enhances type safety and clarity. Although similar function calls in other files (e.g., in conversion/key_provider.go) pass variables that are already strings, the explicit conversion in this context is both intentional and correct.

- Confirmed proper usage at the specified location.
- Verified that the explicit conversion is aligned with the intended interface of GetPubKeyFromPeerID.

No further changes are needed in this instance.
p2p/stream_manager_test.go (4)
120-120: Constant name updated to better reflect its purpose

The constant LengthHeader has been renamed to PayloadHeaderLen, which more clearly describes its purpose as the length of the payload header.

134-134: Consistent use of updated constant name

Updated to use PayloadHeaderLen for consistency with the previous change.

147-147: Consistent use of updated constant name

Updated to use PayloadHeaderLen for consistency.

160-160: Consistent use of updated constant name

Updated to use PayloadHeaderLen for consistency.

keysign/ecdsa/tss_keysign.go (1)
73-104: Improved clarity and function structure in startBatchSigning

The changes improve the function in several ways:
- Better documentation with clearer comments
- More descriptive variable names (started vs ret)
- Simplified variable naming (wg vs keySignWg)
- More consistent error messages
- Better log level selection for routine operations
The code is now more maintainable and follows better Go idioms.
tss/keysign.go (6)
46-53: Improved error handling in waitForSignatures

The changes improve error handling by:
- Using a more descriptive variable name (sigData vs data)
- Providing a more detailed error message with errors.Wrap
- Using the new buildResponse function for consistent response construction

These changes enhance code maintainability and aid in debugging.

106-107: Improved type safety in PeerID conversion

Converting the leader to a string explicitly with leader.String() ensures proper input type for GetPubKeyFromPeerID, which expects a string parameter.

135-136: Enhanced logging with proper type handling

Using Stringer() for logging the leader provides better type handling and ensures the leader is properly formatted in log messages.

267-269: Stream management method updated to follow new naming convention

Updated from ReleaseStream to FreeStreams for consistency with the refactored StreamManager interface.

288-301: Simplified sorting logic

The sort comparison function has been simplified to directly return the comparison result, making the code more concise and readable.
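Returning the comparison directly looks like this in stripped-down form (the sig type here is a hypothetical stand-in for the signature records being sorted):

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// sig is an illustrative record keyed by a message hash.
type sig struct{ msgHash []byte }

// sortSigs orders signatures by message hash. The comparator returns the
// byte comparison directly instead of an if/else ladder.
func sortSigs(sigs []sig) {
	sort.Slice(sigs, func(i, j int) bool {
		return bytes.Compare(sigs[i].msgHash, sigs[j].msgHash) < 0
	})
}

func main() {
	s := []sig{{[]byte{3}}, {[]byte{1}}, {[]byte{2}}}
	sortSigs(s)
	fmt.Println(s[0].msgHash[0], s[1].msgHash[0], s[2].msgHash[0]) // prints "1 2 3"
}
```

Collapsing the comparator to a single expression removes a branch per comparison and makes the intended ordering obvious at a glance.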
378-397: New buildResponse function improves error handling

The new function centralizes the response building logic and adds proper error handling for empty signature lists, which was missing in the previous implementation.
The function:
- Validates the signature list is not empty
- Properly transforms signature data into the expected format
- Returns a properly structured response
This refactoring improves code organization and error handling.
keysign/signature_notifier.go (14)
37-37: Good addition for modular design.

Introducing streamMgr as a separate field improves maintainability by encapsulating all stream-related operations in one place.

42-45: Enrich logger for better troubleshooting.

Associating the logger with the component name and host ID is a best practice for clearer logs and more robust monitoring.

55-56: Initialization clarity.

Constructing the new StreamManager while also customizing the logger streamlines the setup, ensuring each SignatureNotifier has its own dedicated manager and contextual logger.

59-59: Appropriate protocol handler registration.

Setting the stream handler to p2p.ProtocolSignatureNotifier is aligned with the overall design, ensuring incoming notifier streams are correctly routed to handleStream.

108-110: Detailed logging for debugging.

Logging the remote peer within handleStream is beneficial when diagnosing network interactions or tracing connection issues.

117-117: Fallback for unknown message ID.

Calling StashUnknown(stream) upon read failure ensures the stream is still tracked and eventually cleaned up. This guards against orphaned connections.

130-130: Consistent unknown handling.

Invoking StashUnknown(stream) again for unmarshaling errors keeps the stream management consistent.

136-139: Concise error handling approach.

Early return on unsuccessful or empty signatures is efficient. Ensuring the calling function logs relevant details (if needed) is recommended for completeness.

142-150: Proper signature unmarshaling.

Iterating over each signature slice and unmarshaling into common.SignatureData is correct. An optional improvement is to capture partial successes or track invalid signatures; however, this approach is adequate if partial sets are not tolerated.

152-154: Confirm desired behavior on error.

If createOrUpdateNotifier fails, the code only logs the error without returning. Verify if continuing execution is intentional. An early return may be safer if the notifier is crucial to the workflow.

Would you like to enforce an immediate return after logging to prevent inconsistent state?

162-164: Robust error wrapping.

Wrapping the error with Wrapf clarifies the context when stream creation fails. This is a good practice for diagnosing connectivity issues.

167-167: Deferred stashing is safe and idiomatic.

Deferring Stash(m.messageID, stream) ensures release logic remains consistent even if the function exits early due to an error.

240-244: Graceful handling of stream reset.

Allowing a dropped stream (when network.ErrReset surfaces) to pass silently avoids spurious error logs when the operation is already complete.

320-321: Freeing streams post-signature.

Invoking s.streamMgr.Free(messageID) after waiting for the signature is a clean approach to avoid resource leaks.

p2p/party_coordinator_test.go (3)
45-75: Transition to peer.ID array usage.

Using peers []peer.ID instead of []string promotes stronger type safety and aligns with libp2p's pattern of working directly with peer IDs.

108-113: Proper population of the peers slice.

Appending el.ID() ensures we store the actual peer IDs rather than string representations, preventing serialization discrepancies.

Also applies to: 150-158

123-184: Correct usage of PickLeader.

Invoking PickLeader(msgID, 10, peers) and directly comparing host.ID() with the returned leader peer ID is a sound approach that avoids needless string parsing.

p2p/stream_manager.go (10)
1-11: Clear module structure and constants.

Defining TimeoutReadPayload, TimeoutWritePayload, and MaxPayload at the top conveys critical thresholds at a glance, aiding future maintainers.

25-30: Practical approach for toggling deadlines.

Using ApplyDeadline as an atomic boolean helps in tests where deadlines are not supported by the mock network.

35-46: Well-scoped manager state.

Keeping streams, mu, and the configured maxAgeBeforeCleanup all in StreamManager forms a cohesive and maintainable structure.

53-76: Automated cleanup routine.

The cleanup ticker ensures stale streams are removed regularly. This design helps maintain stable resource usage without manual intervention.
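The ticker-driven eviction pattern can be sketched as follows (the type, field names, and map shape are illustrative, not the actual StreamManager):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// staleMap tracks when entries were added and evicts those older than
// maxAge on each tick, mirroring the cleanup routine described above.
type staleMap struct {
	mu     sync.Mutex
	added  map[string]time.Time
	maxAge time.Duration
}

func newStaleMap(maxAge time.Duration) *staleMap {
	m := &staleMap{added: make(map[string]time.Time), maxAge: maxAge}
	// Background ticker runs eviction without manual intervention.
	go func() {
		for range time.Tick(maxAge) {
			m.cleanup()
		}
	}()
	return m
}

func (m *staleMap) put(id string) {
	m.mu.Lock()
	m.added[id] = time.Now()
	m.mu.Unlock()
}

// cleanup removes stale entries and returns how many were freed.
func (m *staleMap) cleanup() int {
	m.mu.Lock()
	defer m.mu.Unlock()
	freed := 0
	for id, t := range m.added {
		if time.Since(t) > m.maxAge {
			delete(m.added, id)
			freed++
		}
	}
	return freed
}

func main() {
	m := newStaleMap(10 * time.Millisecond)
	m.put("stream-1")
	time.Sleep(30 * time.Millisecond)
	// The ticker may already have evicted the entry, so at most one is freed here.
	fmt.Println(m.cleanup() <= 1)
}
```

The mutex guards both the writer (put) and the ticker goroutine, so eviction never races with stashing.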
78-105: Stash logic is concise.

Stash and StashUnknown ensure each stream is tracked under a unique stream ID. This approach is straightforward and reliable for future lookups or cleanup.

106-130: Graceful Free operation.

Grouping and removing all streams for a message ID with a single call is user-friendly. Ensuring we lock while enumerating the map prevents concurrency issues.

131-146: Consistent ResetStream helper.

Encapsulating stream.Reset() in ResetStream simplifies error handling, ensuring consistent logging in one place.

164-213: Periodic eviction logic.

Iterating over stashed streams and removing those older than maxAgeBeforeCleanup keeps the system healthy. Logging the oldest stream's age and stats helps refine future tuning.

215-249: Robust read handling with header checks.

Safe reading of the header, plus bounding the payload size, prevents out-of-memory scenarios and indefinite reads.
250-286: Adequate write pipeline.

Constructing a combined buffer of the header and payload is efficient. Resetting the stream upon errors ensures no partial writes linger.
p2p/communication.go (5)
63-64: Well-structured logger setup with clear component identification

Good use of the structured logging pattern with the logs.Component constant for consistent log identification.

93-94: Appropriate implementation of StreamManager initialization

The use of NewStreamManager with logger dependency injection is a good practice that supports proper initialization and testability.

119-139: Improved goroutine management pattern

The changes properly synchronize goroutines by adding to the wait group before starting them and using explicit waiting at the end of the function. This ensures clean termination and prevents potential goroutine leaks.
141-159: Well-structured stream connection with timeout and improved error handling

The refactored function follows best practices by:
- Using a timeout context for connection establishment
- Proper resource cleanup with deferred cancel
- Clear error messages that include the protocol information
504-506: Good method renaming for clarity

Renaming from ReleaseStream to FreeStreams makes the purpose clearer and better aligns with the actual implementation, which frees multiple streams rather than a single one.

p2p/party_coordinator.go (5)
35-36: Improved mutex naming and type consistency

Good refactoring of the mutex name from the specific joinPartyGroupLock to the more generic mu, which better follows Go conventions. The StreamManager type change aligns with the system-wide refactoring.

73-76: Consistent mutex locking pattern

The code now uses a consistent pattern for mutex locking and unlocking across all methods, which improves thread safety and reduces the risk of deadlocks.

Also applies to: 115-118, 178-180, 189-191

277-286: Enhanced error handling for stream reads

The error handling for stream reads has been improved with proper categorization of errors and a cleaner switch-case structure. This allows for more targeted handling of specific error types.

437-447: Improved leader selection with dedicated function

Using the dedicated PickLeader function improves code modularity and allows for better leader selection logic implementation. This refactoring aligns with the PR's objective to improve leader-picking logic.

467-469: Consistent stream cleanup method naming

Renaming from ReleaseStream to FreeStreams aligns with the changes in other files and better represents the actual behavior of freeing multiple associated streams.
Please open a node PR so we can verify the CI results ourselves.
Many changes, will have a deeper look tomorrow
https://github.com/zeta-chain/node/actions/runs/14091436850/job/39469176369?pr=3767 |
Co-authored-by: Lucas Bertrand <[email protected]>
Actionable comments posted: 3
🧹 Nitpick comments (10)
p2p/stream_manager.go (10)
53-76: Consider making cleanup interval configurable

The constructor hardcodes the cleanup interval to match maxAgeBeforeCleanup. For better flexibility, consider making this interval configurable, potentially as a parameter to the constructor.

-func NewStreamManager(logger zerolog.Logger) *StreamManager {
+func NewStreamManager(logger zerolog.Logger, maxAgeBeforeCleanup time.Duration, cleanupInterval time.Duration) *StreamManager {
 	// The max age before cleanup for unused streams
 	// Could be moved to an constructor argument in the future.
-	const maxAgeBeforeCleanup = time.Minute
+	if maxAgeBeforeCleanup <= 0 {
+		maxAgeBeforeCleanup = time.Minute
+	}
+
+	if cleanupInterval <= 0 {
+		cleanupInterval = maxAgeBeforeCleanup
+	}

 	sm := &StreamManager{
 		streams:             make(map[string]streamItem),
 		maxAgeBeforeCleanup: maxAgeBeforeCleanup,
 		mu:                  sync.RWMutex{},
 		logger:              logger,
 	}

-	ticker := time.NewTicker(sm.maxAgeBeforeCleanup)
+	ticker := time.NewTicker(cleanupInterval)
78-99: Consider tracking stream creation time

The Stash method currently doesn't track when a stream was added. Adding this information could help with more sophisticated cleanup strategies based on stream usage patterns.

 type streamItem struct {
 	msgID  string
 	stream network.Stream
+	createdAt time.Time
 }

 // Stash adds a stream to the manager for later release.
 func (sm *StreamManager) Stash(msgID string, stream network.Stream) {
 	if stream == nil {
 		return
 	}

 	streamID := stream.ID()

 	sm.mu.Lock()
 	defer sm.mu.Unlock()

 	// already exists
 	if _, ok := sm.streams[streamID]; ok {
 		return
 	}

 	// add stream items
 	sm.streams[streamID] = streamItem{
 		msgID:  msgID,
 		stream: stream,
+		createdAt: time.Now(),
 	}
 }
**148-162: Potential inconsistent return value in deleteStream**

The `deleteStream` method returns `false` if `ResetStream` fails, but also returns `false` if the stream doesn't exist. Consider using more specific error handling to distinguish between these cases.

```diff
-// deleteStream deletes stream by stream id. NOT thread safe
-func (sm *StreamManager) deleteStream(streamID string) bool {
+// deleteStream deletes stream by stream id. Returns true if the stream was found and deleted successfully.
+// NOT thread safe.
+func (sm *StreamManager) deleteStream(streamID string) bool {
 	s, ok := sm.streams[streamID]
 	if !ok {
 		return false
 	}

-	if ok = sm.ResetStream(s.msgID, s.stream); !ok {
-		return false
-	}
+	// Even if reset fails, we should remove the stream from our map
+	resetSuccess := sm.ResetStream(s.msgID, s.stream)

 	delete(sm.streams, streamID)

-	return true
+	return resetSuccess
 }
```
**164-213: Improve cleanup method's performance and structure**

The `cleanup` method could be optimized by preallocating slices and using a two-pass approach to avoid potential issues with map iteration while deleting.

```diff
 // cleanup removes UNUSED && STALE streams. Also logs the stats.
 func (sm *StreamManager) cleanup() {
 	sm.mu.Lock()
 	defer sm.mu.Unlock()

 	var (
 		totalStreams   = len(sm.streams)
 		unknownStreams = 0
 		freedStreams   = 0
 		oldestStream   = time.Duration(0)
+		streamsToDelete = make([]string, 0, totalStreams/4) // Preallocate assuming ~25% might need deletion
 	)

 	for streamID, streamItem := range sm.streams {
 		var (
 			s        = streamItem.stream
 			lifespan = time.Since(s.Stat().Opened)
 		)

 		if streamItem.msgID == unknown {
 			unknownStreams++
 		}

 		if lifespan > oldestStream {
 			oldestStream = lifespan
 		}

 		// let's revisit later
 		if lifespan <= sm.maxAgeBeforeCleanup {
 			continue
 		}

-		if sm.deleteStream(streamID) {
-			freedStreams++
-		}
+		streamsToDelete = append(streamsToDelete, streamID)
 	}
+
+	// Delete stale streams in a separate pass
+	for _, streamID := range streamsToDelete {
+		if sm.deleteStream(streamID) {
+			freedStreams++
+		}
+	}

 	if freedStreams == 0 {
 		return
 	}
```
**215-248: ReadStreamWithBuffer should handle empty responses**

The `ReadStreamWithBuffer` function doesn't handle the case where the payload size is 0, which could be a valid case. Consider adding a check for this.

```diff
 func ReadStreamWithBuffer(stream network.Stream) ([]byte, error) {
 	// ... existing code ...

 	payloadSize := binary.LittleEndian.Uint32(header)

+	if payloadSize == 0 {
+		return []byte{}, nil
+	}
+
 	if payloadSize > MaxPayload {
 		return nil, errors.Errorf("stream payload exceeded (got %d, max %d)", payloadSize, MaxPayload)
 	}
```
**231-233: Enhance error logging with more context**

The error message could be more informative by including the remote peer information.

```diff
 	n, err := io.ReadFull(streamReader, header)
 	if err != nil {
-		return nil, errors.Wrapf(err, "unable to read header from the stream (got %d bytes)", n)
+		return nil, errors.Wrapf(err, "unable to read header from the stream (got %d bytes, peer: %s)",
+			n, stream.Conn().RemotePeer().String())
 	}
```
**256-266: Potential inconsistent error handling in WriteStreamWithBuffer**

The error handling in the `WriteStreamWithBuffer` function might be inconsistent with the read function. Consider standardizing the approach.

```diff
 	if ApplyDeadline.Load() {
 		deadline := time.Now().Add(TimeoutWritePayload)

 		if err := stream.SetWriteDeadline(deadline); err != nil {
 			if errReset := stream.Reset(); errReset != nil {
-				return errors.Wrap(errReset, "failed to reset stream during failure in write deadline")
+				return errReset
 			}
-
-			return errors.Wrap(err, "failed to set write deadline")
+			return err
 		}
 	}
```
**217-225: Consider extracting timeout handling to separate functions**

The timeout handling logic is duplicated between `ReadStreamWithBuffer` and `WriteStreamWithBuffer`. Consider extracting it into a helper function.

```diff
+// setStreamDeadline sets a read or write deadline and handles errors
+func setStreamDeadline(stream network.Stream, isRead bool, timeout time.Duration) error {
+	if !ApplyDeadline.Load() {
+		return nil
+	}
+
+	deadline := time.Now().Add(timeout)
+	var err error
+
+	if isRead {
+		err = stream.SetReadDeadline(deadline)
+	} else {
+		err = stream.SetWriteDeadline(deadline)
+	}
+
+	if err != nil {
+		if errReset := stream.Reset(); errReset != nil {
+			return errReset
+		}
+		return err
+	}
+
+	return nil
+}

 // ReadStreamWithBuffer read data from the given stream
 func ReadStreamWithBuffer(stream network.Stream) ([]byte, error) {
-	if ApplyDeadline.Load() {
-		deadline := time.Now().Add(TimeoutReadPayload)
-		if err := stream.SetReadDeadline(deadline); err != nil {
-			if errReset := stream.Reset(); errReset != nil {
-				return nil, errReset
-			}
-			return nil, err
-		}
-	}
+	if err := setStreamDeadline(stream, true, TimeoutReadPayload); err != nil {
+		return nil, err
+	}

 	// ... rest of the function ...
 }

 // WriteStreamWithBuffer write the message to stream
 func WriteStreamWithBuffer(msg []byte, stream network.Stream) error {
 	if len(msg) > (MaxPayload - PayloadHeaderLen) {
 		return errors.Errorf("payload size exceeded (got %d, max %d)", len(msg), MaxPayload)
 	}

-	if ApplyDeadline.Load() {
-		deadline := time.Now().Add(TimeoutWritePayload)
-
-		if err := stream.SetWriteDeadline(deadline); err != nil {
-			if errReset := stream.Reset(); errReset != nil {
-				return errors.Wrap(errReset, "failed to reset stream during failure in write deadline")
-			}
-
-			return errors.Wrap(err, "failed to set write deadline")
-		}
-	}
+	if err := setStreamDeadline(stream, false, TimeoutWritePayload); err != nil {
+		return err
+	}

 	// ... rest of the function ...
 }
```

Also applies to: 256-266
**19-29: Consider using a struct for configuration settings**

The various constants and configuration settings could be better organized in a configuration struct. This would make it easier to manage and extend settings in the future.

```diff
+// StreamManagerConfig contains configuration settings for the StreamManager
+type StreamManagerConfig struct {
+	TimeoutReadPayload  time.Duration
+	TimeoutWritePayload time.Duration
+	MaxPayload          int
+	MaxAgeBeforeCleanup time.Duration
+	CleanupInterval     time.Duration
+	ApplyDeadline       bool
+}
+
+// DefaultStreamManagerConfig returns a config with sensible defaults
+func DefaultStreamManagerConfig() StreamManagerConfig {
+	return StreamManagerConfig{
+		TimeoutReadPayload:  20 * time.Second,
+		TimeoutWritePayload: 20 * time.Second,
+		MaxPayload:          20 * 1024 * 1024, // 20M
+		MaxAgeBeforeCleanup: time.Minute,
+		CleanupInterval:     time.Minute,
+		ApplyDeadline:       true,
+	}
+}

-const (
-	TimeoutReadPayload  = 20 * time.Second
-	TimeoutWritePayload = 20 * time.Second
-	MaxPayload          = 20 * 1024 * 1024 // 20M
-)
```
**41-46: Consider adding metrics for StreamManager performance**

Adding metrics would provide valuable insights into the StreamManager's performance and help identify potential issues.

```diff
 type StreamManager struct {
 	streams             map[string]streamItem
 	maxAgeBeforeCleanup time.Duration
 	mu                  sync.RWMutex
 	logger              zerolog.Logger
+
+	// Metrics
+	totalStreamsAdded int64
+	totalStreamsFreed int64
+	cleanupRuns       int64
 }

+// Metrics returns a map of metrics for monitoring
+func (sm *StreamManager) Metrics() map[string]int64 {
+	sm.mu.RLock()
+	defer sm.mu.RUnlock()
+
+	return map[string]int64{
+		"streams_current": int64(len(sm.streams)),
+		"streams_added":   atomic.LoadInt64(&sm.totalStreamsAdded),
+		"streams_freed":   atomic.LoadInt64(&sm.totalStreamsFreed),
+		"cleanup_runs":    atomic.LoadInt64(&sm.cleanupRuns),
+		"unknown_streams": sm.countUnknownStreams(),
+	}
+}
+
+// countUnknownStreams returns the number of unknown streams.
+// Caller must hold at least a read lock.
+func (sm *StreamManager) countUnknownStreams() int64 {
+	var count int64
+	for _, item := range sm.streams {
+		if item.msgID == unknown {
+			count++
+		}
+	}
+	return count
+}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- `common/tss.go` (1 hunks)
- `keysign/ecdsa/tss_keysign.go` (2 hunks)
- `p2p/stream_manager.go` (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- common/tss.go
🧰 Additional context used
🧬 Code Definitions (1)
keysign/ecdsa/tss_keysign.go (2)
keysign/keysign.go (1)
- `TssKeySign` (11-19)

logs/fields.go (1)

- `Party` (15-22)
🔇 Additional comments (8)
keysign/ecdsa/tss_keysign.go (6)
**73-76: Improved function signature and parameter naming.**

The parameter renaming from `keySignPartyMap` to `partyMap` enhances the clarity and readability of the function. The updated comments also provide better context for understanding the purpose of the parameter.
**77-82: Enhanced variable naming and synchronization.**

The variable renaming from `ret` to `started` provides better semantic meaning that reflects its purpose. The variable is now properly initialized using `atomic.NewBool(true)`, and the WaitGroup is more appropriately named `wg` within this function scope.
**83-99: Improved error handling and type assertion.**

The changes maintain the same functionality while using the renamed parameter. The error message is now more consistent with the codebase style (capitalized first letter) and more precise about the failure context.
**90-91: Improved error logging with consistent capitalization.**

The error message now follows a consistent style with a capitalized first letter, improving the overall logging consistency throughout the codebase.
**95-95: Reduced log verbosity by changing log level.**

Changing the logging level from `Info` to `Debug` appropriately reduces log verbosity for routine operations while still retaining the information for debugging scenarios.
**101-103: Simplified return logic.**

The code now correctly returns the value of `started.Load()` after waiting for all goroutines to complete, providing a clear indication of whether all parties successfully started.

p2p/stream_manager.go (2)
**35-46: Well-structured StreamManager implementation**

The StreamManager implementation follows good design principles with clear separation of concerns. It properly encapsulates the stream management logic with appropriate fields for tracking streams, cleanup timing, and synchronization.
**1-286: Overall assessment: Well-structured implementation with room for improvements**

The StreamManager implementation is well-structured and addresses the core functionality needed. The code uses appropriate synchronization primitives, error handling, and logging. The periodic cleanup mechanism is a good approach to prevent resource leaks. There are opportunities for improved configurability, better resource management, and more sophisticated metrics collection, as suggested in other comments.
Linked PR: zeta-chain/node#3767
Related: #50
Changes
- `StreamManager`
- `p2p/protocol.go`
- `StreamManager` refactoring ensures that …

I also haven't noticed any TSS errors during E2E (e.g. `stream reset`, as previously).

Please note that, although this and previous PRs improve the codebase, logic, and structure and fix errors and bugs, I haven't noticed a drastic performance improvement so far. My gut feeling is that signing on `mainnet` would definitely become better, but it would not result in a 2x performance boost. However, the logging part should give us a clear picture of the P2P flow of each signature.

PS: During code investigation, I noticed that in most cases we open a stream for a single message instead of reserving it for a "session" during key signing. Streams are virtual (i.e., no new TCP connection), but they consume CPU/memory resources. We could explore this performance improvement; however, it requires significant refactoring in Thor's codebase. Rewriting would probably take even less time than refactoring.
Summary by CodeRabbit