From 03dd8ca5e888df78294ac045f6e955eef35bd51a Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Fri, 8 Aug 2025 13:53:26 -0400 Subject: [PATCH] fix noctx linter Co-authored-by: Stephen Buttolph Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- app/app.go | 15 +-- main/main.go | 5 +- nat/nat.go | 5 +- nat/no_router.go | 13 ++- network/dialer/dialer_test.go | 6 +- network/example_test.go | 8 +- network/network.go | 66 ++++++++----- network/network_test.go | 93 ++++++++++++------- network/peer/network.go | 6 +- network/peer/peer.go | 33 +++---- network/peer/peer_test.go | 1 + network/peer/test_network.go | 6 +- network/peer/test_peer.go | 3 +- network/peer/upgrader.go | 30 ++++-- network/peer/upgrader_test.go | 19 +++- node/node.go | 42 ++++++--- tests/antithesis/init_db.go | 2 +- tests/fixture/bootstrapmonitor/wait.go | 6 +- tests/fixture/tmpnet/monitor_processes.go | 2 +- tests/fixture/tmpnet/network.go | 32 +++++-- tests/fixture/tmpnet/process_runtime.go | 20 +++- tests/load/main/main.go | 2 +- tests/prometheus_server.go | 15 ++- tests/reexecute/c/vm_reexecute_test.go | 2 +- utils/ips/lookup.go | 9 +- utils/ips/lookup_test.go | 3 +- vms/rpcchainvm/batched_vm_test.go | 2 +- .../ghttp/gresponsewriter/writer_server.go | 5 +- vms/rpcchainvm/grpcutils/server.go | 3 +- .../runtime/subprocess/non_linux_stopper.go | 2 +- vms/rpcchainvm/state_syncable_vm_test.go | 22 +++-- vms/rpcchainvm/vm_test.go | 6 +- vms/rpcchainvm/with_context_vm_test.go | 2 +- 33 files changed, 326 insertions(+), 160 deletions(-) diff --git a/app/app.go b/app/app.go index 09369bdec2eb..fac4bb9d6af4 100644 --- a/app/app.go +++ b/app/app.go @@ -4,6 +4,7 @@ package app import ( + "context" "fmt" "os" "os/signal" @@ -33,7 +34,7 @@ var _ App = (*app)(nil) type App interface { // Start kicks off the application and returns immediately. // Start should only be called once. - Start() + Start(ctx context.Context) // Stop notifies the application to exit and returns immediately. // Stop should only be called after [Start]. @@ -45,7 +46,7 @@ type App interface { ExitCode() int } -func New(config nodeconfig.Config) (App, error) { +func New(ctx context.Context, config nodeconfig.Config) (App, error) { // Set the data directory permissions to be read write. if err := perms.ChmodR(config.DatabaseConfig.Path, true, perms.ReadWriteExecute); err != nil { return nil, fmt.Errorf("failed to restrict the permissions of the database directory with: %w", err) @@ -71,7 +72,7 @@ func New(config nodeconfig.Config) (App, error) { return nil, err } - n, err := node.New(&config, logFactory, log) + n, err := node.New(ctx, &config, logFactory, log) if err != nil { log.Fatal("failed to initialize node", zap.Error(err)) log.Stop() @@ -86,9 +87,9 @@ func New(config nodeconfig.Config) (App, error) { }, nil } -func Run(app App) int { +func Run(ctx context.Context, app App) int { // start running the application - app.Start() + app.Start(ctx) // register terminationSignals to kill the application terminationSignals := make(chan os.Signal, 1) @@ -138,7 +139,7 @@ type app struct { // Start the business logic of the node (as opposed to config reading, etc). // Does not block until the node is done. 
-func (a *app) Start() { +func (a *app) Start(ctx context.Context) { // [p.ExitCode] will block until [p.exitWG.Done] is called a.exitWG.Add(1) go func() { @@ -157,7 +158,7 @@ func (a *app) Start() { a.log.StopOnPanic() }() - err := a.node.Dispatch() + err := a.node.Dispatch(ctx) a.log.Debug("dispatch returned", zap.Error(err), ) diff --git a/main/main.go b/main/main.go index b91d47b16075..f077d4880f10 100644 --- a/main/main.go +++ b/main/main.go @@ -4,6 +4,7 @@ package main import ( + "context" "encoding/json" "errors" "fmt" @@ -61,12 +62,12 @@ func main() { fmt.Println(app.Header) } - nodeApp, err := app.New(nodeConfig) + nodeApp, err := app.New(context.Background(), nodeConfig) if err != nil { fmt.Printf("couldn't start node: %s\n", err) os.Exit(1) } - exitCode := app.Run(nodeApp) + exitCode := app.Run(context.Background(), nodeApp) os.Exit(exitCode) } diff --git a/nat/nat.go b/nat/nat.go index 270bae5a9549..ef0214ab98d5 100644 --- a/nat/nat.go +++ b/nat/nat.go @@ -4,6 +4,7 @@ package nat import ( + "context" "net/netip" "sync" "time" @@ -33,7 +34,7 @@ type Router interface { } // GetRouter returns a router on the current network. -func GetRouter() Router { +func GetRouter(ctx context.Context) Router { if r := getUPnPRouter(); r != nil { return r } @@ -41,7 +42,7 @@ func GetRouter() Router { return r } - return NewNoRouter() + return NewNoRouter(ctx) } // Mapper attempts to open a set of ports on a router diff --git a/nat/no_router.go b/nat/no_router.go index f54ce3579330..4a7fbee35f61 100644 --- a/nat/no_router.go +++ b/nat/no_router.go @@ -4,6 +4,7 @@ package nat import ( + "context" "errors" "net" "net/netip" @@ -40,8 +41,12 @@ func (r noRouter) ExternalIP() (netip.Addr, error) { return r.ip, r.ipErr } -func getOutboundIP() (netip.Addr, error) { - conn, err := net.Dial("udp", googleDNSServer) +func getOutboundIP(ctx context.Context) (netip.Addr, error) { + conn, err := (&net.Dialer{}).DialContext( + ctx, + "udp", + googleDNSServer, + ) if err != nil { return netip.Addr{}, err } @@ -63,8 +68,8 @@ func getOutboundIP() (netip.Addr, error) { } // NewNoRouter returns a router that assumes the network is public -func NewNoRouter() Router { - ip, err := getOutboundIP() +func NewNoRouter(ctx context.Context) Router { + ip, err := getOutboundIP(ctx) return &noRouter{ ip: ip, ipErr: err, diff --git a/network/dialer/dialer_test.go b/network/dialer/dialer_test.go index f707c0885358..b32634163d90 100644 --- a/network/dialer/dialer_test.go +++ b/network/dialer/dialer_test.go @@ -33,7 +33,11 @@ func TestDialerDialCanceledContext(t *testing.T) { func TestDialerDial(t *testing.T) { require := require.New(t) - l, err := net.Listen("tcp", "127.0.0.1:0") + l, err := (&net.ListenConfig{}).Listen( + context.Background(), + "tcp", + "127.0.0.1:0", + ) require.NoError(err) listenedAddrPort, err := netip.ParseAddrPort(l.Addr().String()) diff --git a/network/example_test.go b/network/example_test.go index 90d38fdf15f6..201c25143270 100644 --- a/network/example_test.go +++ b/network/example_test.go @@ -122,7 +122,11 @@ func ExampleNewTestNetwork() { // gossip will enable connecting to all the remaining nodes in the network. 
bootstrappers := genesis.SampleBootstrappers(constants.FujiID, 5) for _, bootstrapper := range bootstrappers { - network.ManuallyTrack(bootstrapper.ID, bootstrapper.IP) + network.ManuallyTrack( + context.Background(), + bootstrapper.ID, + bootstrapper.IP, + ) } // Typically network.StartClose() should be called based on receiving a @@ -137,7 +141,7 @@ func ExampleNewTestNetwork() { // Calling network.Dispatch() will block until a fatal error occurs or // network.StartClose() is called. - err = network.Dispatch() + err = network.Dispatch(context.Background()) log.Info( "network exited", zap.Error(err), diff --git a/network/network.go b/network/network.go index b5300d32d42a..14cdbf1422ab 100644 --- a/network/network.go +++ b/network/network.go @@ -75,11 +75,11 @@ type Network interface { // Should only be called once, will run until either a fatal error occurs, // or the network is closed. - Dispatch() error + Dispatch(ctx context.Context) error // Attempt to connect to this IP. The network will never stop attempting to // connect to this ID. - ManuallyTrack(nodeID ids.NodeID, ip netip.AddrPort) + ManuallyTrack(ctx context.Context, nodeID ids.NodeID, ip netip.AddrPort) // PeerInfo returns information about peers. If [nodeIDs] is empty, returns // info about all peers that have finished the handshake. Otherwise, returns @@ -506,10 +506,13 @@ func (n *network) AllowConnection(nodeID ids.NodeID) bool { return areWeAPrimaryNetworkAValidator || n.ipTracker.WantsConnection(nodeID) } -func (n *network) Track(claimedIPPorts []*ips.ClaimedIPPort) error { +func (n *network) Track( + ctx context.Context, + claimedIPPorts []*ips.ClaimedIPPort, +) error { _, areWeAPrimaryNetworkAValidator := n.config.Validators.GetValidator(constants.PrimaryNetworkID, n.config.MyNodeID) for _, ip := range claimedIPPorts { - if err := n.track(ip, areWeAPrimaryNetworkAValidator); err != nil { + if err := n.track(ctx, ip, areWeAPrimaryNetworkAValidator); err != nil { return err } } @@ -521,17 +524,17 @@ func (n *network) Track(claimedIPPorts []*ips.ClaimedIPPort) error { // It is guaranteed that [Connected] will not be called with [nodeID] after this // call. Note that this is from the perspective of a single peer object, because // a peer with the same ID can reconnect to this network instance. -func (n *network) Disconnected(nodeID ids.NodeID) { +func (n *network) Disconnected(ctx context.Context, nodeID ids.NodeID) { n.peersLock.RLock() _, connecting := n.connectingPeers.GetByID(nodeID) peer, connected := n.connectedPeers.GetByID(nodeID) n.peersLock.RUnlock() if connecting { - n.disconnectedFromConnecting(nodeID) + n.disconnectedFromConnecting(ctx, nodeID) } if connected { - n.disconnectedFromConnected(peer, nodeID) + n.disconnectedFromConnected(ctx, peer, nodeID) } } @@ -599,7 +602,7 @@ func (n *network) Peers( // Dispatch starts accepting connections from other nodes attempting to connect // to this node. 
-func (n *network) Dispatch() error { +func (n *network) Dispatch(ctx context.Context) error { go n.runTimers() // Periodically perform operations go n.inboundConnUpgradeThrottler.Dispatch() for n.onCloseCtx.Err() == nil { // Continuously accept new connections @@ -647,7 +650,7 @@ func (n *network) Dispatch() error { zap.Stringer("peerIP", ip), ) - if err := n.upgrade(conn, n.serverUpgrader, true); err != nil { + if err := n.upgrade(ctx, conn, n.serverUpgrader, true); err != nil { n.peerConfig.Log.Verbo("failed to upgrade connection", zap.String("direction", "inbound"), zap.Error(err), @@ -670,7 +673,11 @@ func (n *network) Dispatch() error { return errs.Err } -func (n *network) ManuallyTrack(nodeID ids.NodeID, ip netip.AddrPort) { +func (n *network) ManuallyTrack( + ctx context.Context, + nodeID ids.NodeID, + ip netip.AddrPort, +) { n.ipTracker.ManuallyTrack(nodeID) n.peersLock.Lock() @@ -688,11 +695,15 @@ func (n *network) ManuallyTrack(nodeID ids.NodeID, ip netip.AddrPort) { if !isTracked { tracked := newTrackedIP(ip) n.trackedIPs[nodeID] = tracked - n.dial(nodeID, tracked) + n.dial(ctx, nodeID, tracked) } } -func (n *network) track(ip *ips.ClaimedIPPort, trackAllSubnets bool) error { +func (n *network) track( + ctx context.Context, + ip *ips.ClaimedIPPort, + trackAllSubnets bool, +) error { // To avoid signature verification when the IP isn't needed, we // optimistically filter out IPs. This can result in us not tracking an IP // that we otherwise would have. This case can only happen if the node @@ -741,7 +752,7 @@ func (n *network) track(ip *ips.ClaimedIPPort, trackAllSubnets bool) error { tracked = newTrackedIP(ip.AddrPort) } n.trackedIPs[ip.NodeID] = tracked - n.dial(ip.NodeID, tracked) + n.dial(ctx, ip.NodeID, tracked) return nil } @@ -834,7 +845,10 @@ func (n *network) samplePeers( ) } -func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) { +func (n *network) disconnectedFromConnecting( + ctx context.Context, + nodeID ids.NodeID, +) { n.peersLock.Lock() defer n.peersLock.Unlock() @@ -846,7 +860,7 @@ func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) { if n.ipTracker.WantsConnection(nodeID) { tracked := tracked.trackNewIP(tracked.ip) n.trackedIPs[nodeID] = tracked - n.dial(nodeID, tracked) + n.dial(ctx, nodeID, tracked) } else { tracked.stopTracking() delete(n.trackedIPs, nodeID) @@ -856,7 +870,11 @@ func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) { n.metrics.disconnected.Inc() } -func (n *network) disconnectedFromConnected(peer peer.Peer, nodeID ids.NodeID) { +func (n *network) disconnectedFromConnected( + ctx context.Context, + peer peer.Peer, + nodeID ids.NodeID, +) { n.ipTracker.Disconnected(nodeID) n.router.Disconnected(nodeID) @@ -869,7 +887,7 @@ func (n *network) disconnectedFromConnected(peer peer.Peer, nodeID ids.NodeID) { if ip, wantsConnection := n.ipTracker.GetIP(nodeID); wantsConnection { tracked := newTrackedIP(ip.AddrPort) n.trackedIPs[nodeID] = tracked - n.dial(nodeID, tracked) + n.dial(ctx, nodeID, tracked) } n.metrics.markDisconnected(peer) @@ -894,7 +912,7 @@ func (n *network) disconnectedFromConnected(peer peer.Peer, nodeID ids.NodeID) { // If initiating a connection to [ip] fails, then dial will reattempt. However, // there is a randomized exponential backoff to avoid spamming connection // attempts. 
-func (n *network) dial(nodeID ids.NodeID, ip *trackedIP) { +func (n *network) dial(ctx context.Context, nodeID ids.NodeID, ip *trackedIP) { n.peerConfig.Log.Verbo("attempting to dial node", zap.Stringer("nodeID", nodeID), zap.Stringer("ip", ip.ip), @@ -994,7 +1012,7 @@ func (n *network) dial(nodeID ids.NodeID, ip *trackedIP) { zap.Stringer("peerIP", ip.ip), ) - err = n.upgrade(conn, n.clientUpgrader, false) + err = n.upgrade(ctx, conn, n.clientUpgrader, false) if err != nil { n.peerConfig.Log.Verbo( "failed to upgrade, attempting again", @@ -1017,7 +1035,12 @@ func (n *network) dial(nodeID ids.NodeID, ip *trackedIP) { // If the connection is desired by the node, then the resulting upgraded // connection will be used to create a new peer. Otherwise the connection will // be immediately closed. -func (n *network) upgrade(conn net.Conn, upgrader peer.Upgrader, isIngress bool) error { +func (n *network) upgrade( + ctx context.Context, + conn net.Conn, + upgrader peer.Upgrader, + isIngress bool, +) error { upgradeTimeout := n.peerConfig.Clock.Time().Add(n.config.ReadHandshakeTimeout) if err := conn.SetReadDeadline(upgradeTimeout); err != nil { _ = conn.Close() @@ -1027,7 +1050,7 @@ func (n *network) upgrade(conn net.Conn, upgrader peer.Upgrader, isIngress bool) return err } - nodeID, tlsConn, cert, err := upgrader.Upgrade(conn) + nodeID, tlsConn, cert, err := upgrader.Upgrade(ctx, conn) if err != nil { _ = conn.Close() n.peerConfig.Log.Verbo("failed to upgrade connection", @@ -1107,6 +1130,7 @@ func (n *network) upgrade(conn net.Conn, upgrader peer.Upgrader, isIngress bool) // same [peerConfig.InboundMsgThrottler]. This is guaranteed by the above // de-duplications for [connectingPeers] and [connectedPeers]. peer := peer.Start( + ctx, n.peerConfig, tlsConn, cert, diff --git a/network/network_test.go b/network/network_test.go index faa74a26e822..cf220f84d6f6 100644 --- a/network/network_test.go +++ b/network/network_test.go @@ -280,7 +280,11 @@ func newFullyConnectedTestNetwork(t *testing.T, handlers []router.InboundHandler for i, net := range networks { if i != 0 { config := configs[0] - net.ManuallyTrack(config.MyNodeID, config.MyIPPort.Get()) + net.ManuallyTrack( + context.Background(), + config.MyNodeID, + config.MyIPPort.Get(), + ) // Wait until the node is connected to the first node. // This forces nodes to connect to each other in a deterministic order. require.Eventually(func() bool { @@ -288,7 +292,9 @@ func newFullyConnectedTestNetwork(t *testing.T, handlers []router.InboundHandler }, 10*time.Second, time.Millisecond) } - eg.Go(net.Dispatch) + eg.Go(func() error { + return net.Dispatch(context.Background()) + }) } if len(networks) > 1 { @@ -469,17 +475,19 @@ func TestTrackVerifiesSignatures(t *testing.T) { stakingCert, err := staking.ParseCertificate(tlsCert.Leaf.Raw) require.NoError(err) - err = network.Track([]*ips.ClaimedIPPort{ - ips.NewClaimedIPPort( - stakingCert, - netip.AddrPortFrom( - netip.AddrFrom4([4]byte{123, 132, 123, 123}), - 10000, + err = network.Track( + context.Background(), + []*ips.ClaimedIPPort{ + ips.NewClaimedIPPort( + stakingCert, + netip.AddrPortFrom( + netip.AddrFrom4([4]byte{123, 132, 123, 123}), + 10000, + ), + 1000, // timestamp + nil, // signature ), - 1000, // timestamp - nil, // signature - ), - }) + }) // The signature is wrong so this peer tracking info isn't useful. 
require.ErrorIs(err, staking.ErrECDSAVerificationFailure)
@@ -539,10 +547,16 @@ func TestTrackDoesNotDialPrivateIPs(t *testing.T) {
 	for i, net := range networks {
 		if i != 0 {
 			config := configs[0]
-			net.ManuallyTrack(config.MyNodeID, config.MyIPPort.Get())
+			net.ManuallyTrack(
+				context.Background(),
+				config.MyNodeID,
+				config.MyIPPort.Get(),
+			)
 		}
 
-		eg.Go(net.Dispatch)
+		eg.Go(func() error {
+			return net.Dispatch(context.Background())
+		})
 	}
 
 	network := networks[1].(*network)
@@ -619,17 +633,22 @@ func TestDialDeletesNonValidators(t *testing.T) {
 		stakingCert, err := staking.ParseCertificate(config.TLSConfig.Certificates[0].Leaf.Raw)
 		require.NoError(err)
 
-		require.NoError(net.Track([]*ips.ClaimedIPPort{
-			ips.NewClaimedIPPort(
-				stakingCert,
-				ip.AddrPort,
-				ip.Timestamp,
-				ip.TLSSignature,
-			),
-		}))
+		require.NoError(net.Track(
+			context.Background(),
+			[]*ips.ClaimedIPPort{
+				ips.NewClaimedIPPort(
+					stakingCert,
+					ip.AddrPort,
+					ip.Timestamp,
+					ip.TLSSignature,
+				),
+			},
+		))
 	}
 
-		eg.Go(net.Dispatch)
+		eg.Go(func() error {
+			return net.Dispatch(context.Background())
+		})
 	}
 
 	// Give the dialer time to run one iteration. This is racy, but should only
@@ -682,12 +701,12 @@ func TestDialContext(t *testing.T) {
 		}
 	)
 
-	network.ManuallyTrack(neverDialedNodeID, neverDialedIP)
-	network.ManuallyTrack(dialedNodeID, dialedIP)
+	network.ManuallyTrack(context.Background(), neverDialedNodeID, neverDialedIP)
+	network.ManuallyTrack(context.Background(), dialedNodeID, dialedIP)
 
 	// Sanity check that when a non-cancelled context is given,
 	// we actually dial the peer.
-	network.dial(dialedNodeID, dialedTrackedIP)
+	network.dial(context.Background(), dialedNodeID, dialedTrackedIP)
 
 	gotDialedIPConn := make(chan struct{})
 	go func() {
@@ -699,7 +718,7 @@
 	// Assert that when [n.onCloseCtx] is cancelled, dial returns immediately.
 	// That is, [neverDialedListener] doesn't accept a connection.
network.onCloseCtxCancel() - network.dial(neverDialedNodeID, neverDialedTrackedIP) + network.dial(context.Background(), neverDialedNodeID, neverDialedTrackedIP) gotNeverDialedIPConn := make(chan struct{}) go func() { @@ -759,10 +778,16 @@ func TestAllowConnectionAsAValidator(t *testing.T) { for i, net := range networks { if i != 0 { config := configs[0] - net.ManuallyTrack(config.MyNodeID, config.MyIPPort.Get()) + net.ManuallyTrack( + context.Background(), + config.MyNodeID, + config.MyIPPort.Get(), + ) } - eg.Go(net.Dispatch) + eg.Go(func() error { + return net.Dispatch(context.Background()) + }) } network := networks[1].(*network) @@ -818,8 +843,14 @@ func TestGetAllPeers(t *testing.T) { ) // Connect the non-validator peer to the validator network - nonValidatorNetwork.ManuallyTrack(networks[0].config.MyNodeID, networks[0].config.MyIPPort.Get()) - eg.Go(nonValidatorNetwork.Dispatch) + nonValidatorNetwork.ManuallyTrack( + context.Background(), + networks[0].config.MyNodeID, + networks[0].config.MyIPPort.Get(), + ) + eg.Go(func() error { + return nonValidatorNetwork.Dispatch(context.Background()) + }) { // The non-validator peer should be able to get all the peers in the network diff --git a/network/peer/network.go b/network/peer/network.go index 01b54a55583a..7a1dd5b2576e 100644 --- a/network/peer/network.go +++ b/network/peer/network.go @@ -4,6 +4,8 @@ package peer import ( + "context" + "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/ips" @@ -22,13 +24,13 @@ type Network interface { // Track allows the peer to notify the network of potential new peers to // connect to. - Track(ips []*ips.ClaimedIPPort) error + Track(ctx context.Context, ips []*ips.ClaimedIPPort) error // Disconnected is called when the peer finishes shutting down. It is not // guaranteed that [Connected] was called for the provided peer. However, it // is guaranteed that [Connected] will not be called after [Disconnected] // for a given [Peer] object. - Disconnected(peerID ids.NodeID) + Disconnected(ctx context.Context, peerID ids.NodeID) // KnownPeers returns the bloom filter of the known peers. KnownPeers() (bloomFilter []byte, salt []byte) diff --git a/network/peer/peer.go b/network/peer/peer.go index 91eb6ef62668..a4bf961a3572 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -205,6 +205,7 @@ type peer struct { // Invariant: There must only be one peer running at a time with a reference to // the same [config.InboundMsgThrottler]. func Start( + ctx context.Context, config *Config, conn net.Conn, cert *staking.Certificate, @@ -232,9 +233,9 @@ func Start( p.IngressConnectionCount.Add(1) } - go p.readMessages() - go p.writeMessages() - go p.sendNetworkMessages() + go p.readMessages(ctx) + go p.writeMessages(ctx) + go p.sendNetworkMessages(ctx) return p } @@ -355,7 +356,7 @@ func (p *peer) AwaitClosed(ctx context.Context) error { // close should be called at the end of each goroutine that has been spun up. // When the last goroutine is exiting, the peer will be marked as closed. -func (p *peer) close() { +func (p *peer) close(ctx context.Context) { if atomic.AddInt64(&p.numExecuting, -1) != 0 { return } @@ -364,19 +365,19 @@ func (p *peer) close() { p.IngressConnectionCount.Add(-1) } - p.Network.Disconnected(p.id) + p.Network.Disconnected(ctx, p.id) close(p.onClosed) } // Read and handle messages from this peer. // When this method returns, the connection is closed. 
-func (p *peer) readMessages() { +func (p *peer) readMessages(ctx context.Context) { // Track this node with the inbound message throttler. p.InboundMsgThrottler.AddNode(p.id) defer func() { p.InboundMsgThrottler.RemoveNode(p.id) p.StartClose() - p.close() + p.close(ctx) }() // Continuously read and handle messages from this peer. @@ -495,15 +496,15 @@ func (p *peer) readMessages() { // Handle the message. Note that when we are done handling this message, // we must call [msg.OnFinishedHandling()]. - p.handle(msg) + p.handle(ctx, msg) p.ResourceTracker.StopProcessing(p.id, p.Clock.Time()) } } -func (p *peer) writeMessages() { +func (p *peer) writeMessages(ctx context.Context) { defer func() { p.StartClose() - p.close() + p.close(ctx) }() writer := bufio.NewWriterSize(p.conn, p.Config.WriteBufferSize) @@ -627,13 +628,13 @@ func (p *peer) writeMessage(writer io.Writer, msg message.OutboundMessage) { p.Metrics.Sent(msg) } -func (p *peer) sendNetworkMessages() { +func (p *peer) sendNetworkMessages(ctx context.Context) { sendPingsTicker := time.NewTicker(p.PingFrequency) defer func() { sendPingsTicker.Stop() p.StartClose() - p.close() + p.close(ctx) }() for { @@ -736,7 +737,7 @@ func (p *peer) shouldDisconnect() bool { return false } -func (p *peer) handle(msg message.InboundMessage) { +func (p *peer) handle(ctx context.Context, msg message.InboundMessage) { switch m := msg.Message().(type) { // Network-related message types case *p2p.Ping: p.handlePing(m) @@ -755,7 +756,7 @@ func (p *peer) handle(msg message.InboundMessage) { msg.OnFinishedHandling() return case *p2p.PeerList: - p.handlePeerList(m) + p.handlePeerList(ctx, m) msg.OnFinishedHandling() return } @@ -1128,7 +1129,7 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { p.Send(p.onClosingCtx, peerListMsg) } -func (p *peer) handlePeerList(msg *p2p.PeerList) { +func (p *peer) handlePeerList(ctx context.Context, msg *p2p.PeerList) { if !p.finishedHandshake.Get() { if !p.gotHandshake.Get() { return @@ -1188,7 +1189,7 @@ func (p *peer) handlePeerList(msg *p2p.PeerList) { ) } - if err := p.Network.Track(discoveredIPs); err != nil { + if err := p.Network.Track(ctx, discoveredIPs); err != nil { p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go index 8fe67906e713..3472f257d8f7 100644 --- a/network/peer/peer_test.go +++ b/network/peer/peer_test.go @@ -131,6 +131,7 @@ func newRawTestPeer(t *testing.T, config *Config) *rawTestPeer { func startTestPeer(self *rawTestPeer, peer *rawTestPeer, conn net.Conn) *testPeer { return &testPeer{ Peer: Start( + context.Background(), self.config, conn, peer.cert, diff --git a/network/peer/test_network.go b/network/peer/test_network.go index ac57535bc28a..d1596dc30589 100644 --- a/network/peer/test_network.go +++ b/network/peer/test_network.go @@ -4,6 +4,8 @@ package peer import ( + "context" + "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/ips" @@ -20,11 +22,11 @@ func (testNetwork) AllowConnection(ids.NodeID) bool { return true } -func (testNetwork) Track([]*ips.ClaimedIPPort) error { +func (testNetwork) Track(context.Context, []*ips.ClaimedIPPort) error { return nil } -func (testNetwork) Disconnected(ids.NodeID) {} +func (testNetwork) Disconnected(context.Context, ids.NodeID) {} func (testNetwork) KnownPeers() ([]byte, []byte) { return bloom.EmptyFilter.Marshal(), nil diff --git 
a/network/peer/test_peer.go b/network/peer/test_peer.go
index 78a949b10cbe..b8c399fdbb39 100644
--- a/network/peer/test_peer.go
+++ b/network/peer/test_peer.go
@@ -70,7 +70,7 @@ func StartTestPeer(
 		prometheus.NewCounter(prometheus.CounterOpts{}),
 	)
 
-	peerID, conn, cert, err := clientUpgrader.Upgrade(conn)
+	peerID, conn, cert, err := clientUpgrader.Upgrade(ctx, conn)
 	if err != nil {
 		return nil, err
 	}
@@ -106,6 +106,7 @@ func StartTestPeer(
 	}
 
 	peer := Start(
+		ctx,
 		&Config{
 			Metrics:        metrics,
 			MessageCreator: mc,
diff --git a/network/peer/upgrader.go b/network/peer/upgrader.go
index 5d42abdb5b31..5ea0e4ba949e 100644
--- a/network/peer/upgrader.go
+++ b/network/peer/upgrader.go
@@ -4,6 +4,7 @@
 package peer
 
 import (
+	"context"
 	"crypto/tls"
 	"errors"
 	"net"
@@ -23,7 +24,12 @@ var (
 
 type Upgrader interface {
 	// Must be thread safe
-	Upgrade(net.Conn) (ids.NodeID, net.Conn, *staking.Certificate, error)
+	Upgrade(ctx context.Context, conn net.Conn) (
+		ids.NodeID,
+		net.Conn,
+		*staking.Certificate,
+		error,
+	)
 }
 
 type tlsServerUpgrader struct {
@@ -38,8 +44,11 @@ func NewTLSServerUpgrader(config *tls.Config, invalidCerts prometheus.Counter) U
 	}
 }
 
-func (t *tlsServerUpgrader) Upgrade(conn net.Conn) (ids.NodeID, net.Conn, *staking.Certificate, error) {
-	return connToIDAndCert(tls.Server(conn, t.config), t.invalidCerts)
+func (t *tlsServerUpgrader) Upgrade(
+	ctx context.Context,
+	conn net.Conn,
+) (ids.NodeID, net.Conn, *staking.Certificate, error) {
+	return connToIDAndCert(ctx, tls.Server(conn, t.config), t.invalidCerts)
 }
 
 type tlsClientUpgrader struct {
@@ -54,12 +63,19 @@ func NewTLSClientUpgrader(config *tls.Config, invalidCerts prometheus.Counter) U
 	}
 }
 
-func (t *tlsClientUpgrader) Upgrade(conn net.Conn) (ids.NodeID, net.Conn, *staking.Certificate, error) {
-	return connToIDAndCert(tls.Client(conn, t.config), t.invalidCerts)
+func (t *tlsClientUpgrader) Upgrade(
+	ctx context.Context,
+	conn net.Conn,
+) (ids.NodeID, net.Conn, *staking.Certificate, error) {
+	return connToIDAndCert(ctx, tls.Client(conn, t.config), t.invalidCerts)
 }
 
-func connToIDAndCert(conn *tls.Conn, invalidCerts prometheus.Counter) (ids.NodeID, net.Conn, *staking.Certificate, error) {
-	if err := conn.Handshake(); err != nil {
+func connToIDAndCert(
+	ctx context.Context,
+	conn *tls.Conn,
+	invalidCerts prometheus.Counter,
+) (ids.NodeID, net.Conn, *staking.Certificate, error) {
+	if err := conn.HandshakeContext(ctx); err != nil {
 		return ids.EmptyNodeID, nil, nil, err
 	}
diff --git a/network/peer/upgrader_test.go b/network/peer/upgrader_test.go
index bd5ba1325d44..eb370be408f2 100644
--- a/network/peer/upgrader_test.go
+++ b/network/peer/upgrader_test.go
@@ -16,6 +16,7 @@ import (
+	"context"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 
 	_ "embed"
@@ -104,7 +105,11 @@ func TestBlockClientsWithIncorrectRSAKeys(t *testing.T) {
 			Certificates: []tls.Certificate{testCase.genClientTLSCert()},
 		}
 
-		listener, err := net.Listen("tcp", "127.0.0.1:0")
+		listener, err := (&net.ListenConfig{}).Listen(
+			context.Background(),
+			"tcp",
+			"127.0.0.1:0",
+		)
 		require.NoError(t, err)
 		defer listener.Close()
 
@@ -115,13 +120,19 @@
 				return err
 			}
 
-			_, _, _, err = upgrader.Upgrade(conn)
+			_, _, _, err = upgrader.Upgrade(context.Background(), conn)
 			return err
 		})
 
-		conn, err := tls.Dial("tcp", listener.Addr().String(), &clientConfig)
+		conn, err := (&tls.Dialer{Config: &clientConfig}).DialContext(
context.Background(), + "tcp", + listener.Addr().String(), + ) require.NoError(t, err) - require.NoError(t, conn.Handshake()) + tlsConn, ok := conn.(*tls.Conn) + require.True(t, ok) + require.NoError(t, tlsConn.HandshakeContext(context.Background())) err = eg.Wait() require.ErrorIs(t, err, testCase.expectedErr) diff --git a/node/node.go b/node/node.go index 4319153d3dd9..5dcebc76d60f 100644 --- a/node/node.go +++ b/node/node.go @@ -118,6 +118,7 @@ var ( // New returns an instance of Node func New( + ctx context.Context, config *node.Config, logFactory logging.Factory, logger logging.Logger, @@ -186,8 +187,8 @@ func New( return nil, fmt.Errorf("couldn't initialize metrics: %w", err) } - n.initNAT() - if err := n.initAPIServer(); err != nil { // Start the API Server + n.initNAT(ctx) + if err := n.initAPIServer(ctx); err != nil { // Start the API Server return nil, fmt.Errorf("couldn't initialize API server: %w", err) } @@ -233,7 +234,11 @@ func New( } n.initCPUTargeter(&config.CPUTargeterConfig) n.initDiskTargeter(&config.DiskTargeterConfig) - if err := n.initNetworking(networkRegisterer); err != nil { // Set up networking layer. + // Set up networking layer. + if err := n.initNetworking( + ctx, + networkRegisterer, + ); err != nil { return nil, fmt.Errorf("problem initializing networking: %w", err) } @@ -410,7 +415,10 @@ type Node struct { // Initialize the networking layer. // Assumes [n.vdrs], [n.CPUTracker], and [n.CPUTargeter] have been initialized. -func (n *Node) initNetworking(reg prometheus.Registerer) error { +func (n *Node) initNetworking( + ctx context.Context, + reg prometheus.Registerer, +) error { // Providing either loopback address - `::1` for ipv6 and `127.0.0.1` for ipv4 - as the listen // host will avoid the need for a firewall exception on recent MacOS: // @@ -429,7 +437,11 @@ func (n *Node) initNetworking(reg prometheus.Registerer) error { // 1: https://apple.stackexchange.com/questions/393715/do-you-want-the-application-main-to-accept-incoming-network-connections-pop // 2: https://github.com/golang/go/issues/56998 listenAddress := net.JoinHostPort(n.Config.ListenHost, strconv.FormatUint(uint64(n.Config.ListenPort), 10)) - listener, err := net.Listen(constants.NetworkType, listenAddress) + listener, err := (&net.ListenConfig{}).Listen( + ctx, + constants.NetworkType, + listenAddress, + ) if err != nil { return err } @@ -664,7 +676,7 @@ func (n *Node) writeProcessContext() error { // Dispatch starts the node's servers. // Returns when the node exits. -func (n *Node) Dispatch() error { +func (n *Node) Dispatch(ctx context.Context) error { if err := n.writeProcessContext(); err != nil { return err } @@ -710,16 +722,16 @@ func (n *Node) Dispatch() error { // Add state sync nodes to the peer network for i, peerIP := range n.Config.StateSyncIPs { - n.Net.ManuallyTrack(n.Config.StateSyncIDs[i], peerIP) + n.Net.ManuallyTrack(ctx, n.Config.StateSyncIDs[i], peerIP) } // Add bootstrap nodes to the peer network for _, bootstrapper := range n.Config.Bootstrappers { - n.Net.ManuallyTrack(bootstrapper.ID, bootstrapper.IP) + n.Net.ManuallyTrack(ctx, bootstrapper.ID, bootstrapper.IP) } // Start P2P connections - retErr := n.Net.Dispatch() + retErr := n.Net.Dispatch(ctx) // If the P2P server isn't running, shut down the node. 
// If node is already shutting down, this does not trigger shutdown again,
@@ -909,32 +921,32 @@
 	)
 }
 
-func (n *Node) initNAT() {
+func (n *Node) initNAT(ctx context.Context) {
 	n.Log.Info("initializing NAT")
 
 	if n.Config.PublicIP == "" && n.Config.PublicIPResolutionService == "" {
-		n.router = nat.GetRouter()
+		n.router = nat.GetRouter(ctx)
 		if !n.router.SupportsNAT() {
 			n.Log.Warn("UPnP and NAT-PMP router attach failed, " + "you may not be listening publicly. " + "Please confirm the settings in your router")
 		}
 	} else {
-		n.router = nat.NewNoRouter()
+		n.router = nat.NewNoRouter(ctx)
 	}
 
 	n.portMapper = nat.NewPortMapper(n.Log, n.router)
 }
 
 // initAPIServer initializes the server that handles HTTP calls
-func (n *Node) initAPIServer() error {
+func (n *Node) initAPIServer(ctx context.Context) error {
 	n.Log.Info("initializing API server")
 
 	// An empty host is treated as a wildcard to match all addresses, so it is
 	// considered public.
 	hostIsPublic := n.Config.HTTPHost == ""
 	if !hostIsPublic {
-		ip, err := ips.Lookup(n.Config.HTTPHost)
+		ip, err := ips.Lookup(ctx, n.Config.HTTPHost)
 		if err != nil {
 			n.Log.Fatal("failed to lookup HTTP host",
 				zap.String("host", n.Config.HTTPHost),
@@ -952,7 +964,7 @@
 	}
 
 	listenAddress := net.JoinHostPort(n.Config.HTTPHost, strconv.FormatUint(uint64(n.Config.HTTPPort), 10))
-	listener, err := net.Listen("tcp", listenAddress)
+	listener, err := (&net.ListenConfig{}).Listen(ctx, "tcp", listenAddress)
 	if err != nil {
 		return err
 	}
diff --git a/tests/antithesis/init_db.go b/tests/antithesis/init_db.go
index baeba261ddcf..ebe429c811ad 100644
--- a/tests/antithesis/init_db.go
+++ b/tests/antithesis/init_db.go
@@ -50,7 +50,7 @@
 		return fmt.Errorf("failed to create db path %q: %w", destPath, err)
 	}
 	// TODO(marun) Replace with os.CopyFS once we upgrade to Go 1.23
-	cmd := exec.Command("cp", "-r", sourcePath, destPath)
+	cmd := exec.CommandContext(ctx, "cp", "-r", sourcePath, destPath)
 	if err := cmd.Run(); err != nil {
 		return fmt.Errorf("failed to copy bootstrap db from %q to %q: %w", sourcePath, destPath, err)
 	}
diff --git a/tests/fixture/bootstrapmonitor/wait.go b/tests/fixture/bootstrapmonitor/wait.go
index d813835be2fe..b5b070c0a565 100644
--- a/tests/fixture/bootstrapmonitor/wait.go
+++ b/tests/fixture/bootstrapmonitor/wait.go
@@ -82,7 +82,7 @@ func WaitForCompletion(
 	defer cancel()
 
 	// Define common fields for logging
-	diskUsage := getDiskUsage(log, dataDir)
+	diskUsage := getDiskUsage(ctx, log, dataDir)
 	commonFields := []zap.Field{
 		zap.String("diskUsage", diskUsage),
 		zap.Duration("duration", time.Since(testDetails.StartTime)),
@@ -149,8 +149,8 @@
 }
 
 // Determines the current disk usage for the specified directory
-func getDiskUsage(log logging.Logger, dir string) string {
-	cmd := exec.Command("du", "-sh", dir)
+func getDiskUsage(ctx context.Context, log logging.Logger, dir string) string {
+	cmd := exec.CommandContext(ctx, "du", "-sh", dir)
 
 	// Create a buffer to capture stderr in case an unexpected error occurs
 	var stderr bytes.Buffer
diff --git a/tests/fixture/tmpnet/monitor_processes.go b/tests/fixture/tmpnet/monitor_processes.go
index f5de083d3800..7dff19e7a9ab 100644
--- a/tests/fixture/tmpnet/monitor_processes.go
+++ b/tests/fixture/tmpnet/monitor_processes.go
@@ -495,7 +495,7 @@ func startCollectorProcess(
 		zap.String("logPath", filepath.Join(workingDir, logFilename)),
 	)
 
-	cmd := exec.Command("bash", "-c", 
fullCmd) + cmd := exec.CommandContext(ctx, "bash", "-c", fullCmd) configureDetachedProcess(cmd) // Ensure the child process will outlive its parent cmd.Dir = workingDir if err := cmd.Start(); err != nil { diff --git a/tests/fixture/tmpnet/network.go b/tests/fixture/tmpnet/network.go index 8a3e19c42ca6..760e96f34175 100644 --- a/tests/fixture/tmpnet/network.go +++ b/tests/fixture/tmpnet/network.go @@ -175,7 +175,12 @@ func BootstrapNewNetwork( return errInsufficientNodes } - if err := checkVMBinaries(log, network.Subnets, network.DefaultRuntimeConfig.Process); err != nil { + if err := checkVMBinaries( + ctx, + log, + network.Subnets, + network.DefaultRuntimeConfig.Process, + ); err != nil { return err } if err := network.EnsureDefaultConfig(ctx, log); err != nil { @@ -1010,7 +1015,12 @@ const invalidRPCVersion = 0 // checkVMBinaries checks that VM binaries for the given subnets exist and optionally checks that VM // binaries have the same rpcchainvm version as the indicated avalanchego binary. -func checkVMBinaries(log logging.Logger, subnets []*Subnet, config *ProcessRuntimeConfig) error { +func checkVMBinaries( + ctx context.Context, + log logging.Logger, + subnets []*Subnet, + config *ProcessRuntimeConfig, +) error { if len(subnets) == 0 { // Without subnets there are no VM binaries to check return nil @@ -1021,7 +1031,12 @@ func checkVMBinaries(log logging.Logger, subnets []*Subnet, config *ProcessRunti return nil } - avalanchegoRPCVersion, err := getRPCVersion(log, config.AvalancheGoPath, "--version-json") + avalanchegoRPCVersion, err := getRPCVersion( + ctx, + log, + config.AvalancheGoPath, + "--version-json", + ) if err != nil { log.Warn("unable to check rpcchainvm version for avalanchego", zap.Error(err)) return nil @@ -1047,7 +1062,7 @@ func checkVMBinaries(log logging.Logger, subnets []*Subnet, config *ProcessRunti } // Check that the VM's rpcchainvm version matches avalanchego's version - vmRPCVersion, err := getRPCVersion(log, vmPath, chain.VersionArgs...) + vmRPCVersion, err := getRPCVersion(ctx, log, vmPath, chain.VersionArgs...) if err != nil { log.Warn("unable to check rpcchainvm version for VM Binary", zap.String("subnet", subnet.Name), @@ -1078,8 +1093,13 @@ type RPCChainVMVersion struct { // getRPCVersion attempts to invoke the given command with the specified version arguments and // retrieve an rpcchainvm version from its output. -func getRPCVersion(log logging.Logger, command string, versionArgs ...string) (uint64, error) { - cmd := exec.Command(command, versionArgs...) +func getRPCVersion( + ctx context.Context, + log logging.Logger, + command string, + versionArgs ...string, +) (uint64, error) { + cmd := exec.CommandContext(ctx, command, versionArgs...) 
output, err := cmd.CombinedOutput() if err != nil { return 0, fmt.Errorf("command %q failed with output: %s", command, output) diff --git a/tests/fixture/tmpnet/process_runtime.go b/tests/fixture/tmpnet/process_runtime.go index 8aaf5e36f309..561192a5e567 100644 --- a/tests/fixture/tmpnet/process_runtime.go +++ b/tests/fixture/tmpnet/process_runtime.go @@ -105,7 +105,12 @@ func (p *ProcessRuntime) Start(ctx context.Context) error { runtimeConfig := p.getRuntimeConfig() // Attempt to check for rpc version compatibility - if err := checkVMBinaries(log, p.node.network.Subnets, runtimeConfig); err != nil { + if err := checkVMBinaries( + ctx, + log, + p.node.network.Subnets, + runtimeConfig, + ); err != nil { return err } @@ -119,9 +124,16 @@ func (p *ProcessRuntime) Start(ctx context.Context) error { return fmt.Errorf("writing node flags: %w", err) } - // All arguments are provided in the flags file - cmd := exec.Command(runtimeConfig.AvalancheGoPath, "--config-file", p.node.GetFlagsPath()) // #nosec G204 - // Ensure process is detached from the parent process so that an error in the parent will not affect the child + // All arguments are provided in the flags file. + // + // Ensure process is detached from the parent process so that an error in the + // parent will not affect the child. + cmd := exec.CommandContext( + context.WithoutCancel(ctx), + runtimeConfig.AvalancheGoPath, + "--config-file", + p.node.GetFlagsPath(), + ) // #nosec G204 configureDetachedProcess(cmd) if err := cmd.Start(); err != nil { diff --git a/tests/load/main/main.go b/tests/load/main/main.go index 3561ac542f7c..e34dc513a31b 100644 --- a/tests/load/main/main.go +++ b/tests/load/main/main.go @@ -74,7 +74,7 @@ func main() { require.NoError(err) registry := prometheus.NewRegistry() - metricsServer, err := tests.NewPrometheusServer(registry) + metricsServer, err := tests.NewPrometheusServer(ctx, registry) require.NoError(err) tc.DeferCleanup(func() { require.NoError(metricsServer.Stop()) diff --git a/tests/prometheus_server.go b/tests/prometheus_server.go index c87a61e7fc98..79dcfe327343 100644 --- a/tests/prometheus_server.go +++ b/tests/prometheus_server.go @@ -27,12 +27,15 @@ type PrometheusServer struct { // NewPrometheusServer creates and starts a Prometheus server with the provided gatherer // listening on 127.0.0.1:0 and serving /ext/metrics. -func NewPrometheusServer(gatherer prometheus.Gatherer) (*PrometheusServer, error) { +func NewPrometheusServer( + ctx context.Context, + gatherer prometheus.Gatherer, +) (*PrometheusServer, error) { server := &PrometheusServer{ gatherer: gatherer, } - if err := server.start(); err != nil { + if err := server.start(ctx); err != nil { return nil, err } @@ -40,11 +43,15 @@ func NewPrometheusServer(gatherer prometheus.Gatherer) (*PrometheusServer, error } // start the Prometheus server on a dynamic port. 
-func (s *PrometheusServer) start() error { +func (s *PrometheusServer) start(ctx context.Context) error { mux := http.NewServeMux() mux.Handle("/ext/metrics", promhttp.HandlerFor(s.gatherer, promhttp.HandlerOpts{})) - listener, err := net.Listen("tcp", defaultPrometheusListenAddr) + listener, err := (&net.ListenConfig{}).Listen( + ctx, + "tcp", + defaultPrometheusListenAddr, + ) if err != nil { return err } diff --git a/tests/reexecute/c/vm_reexecute_test.go b/tests/reexecute/c/vm_reexecute_test.go index 87e98255a6c0..dbabca5fd880 100644 --- a/tests/reexecute/c/vm_reexecute_test.go +++ b/tests/reexecute/c/vm_reexecute_test.go @@ -496,7 +496,7 @@ func collectRegistry(tb testing.TB, name string, timeout time.Duration, gatherer r.NoError(tmpnet.StartPrometheus(ctx, tests.NewDefaultLogger("prometheus"))) - server, err := tests.NewPrometheusServer(gatherer) + server, err := tests.NewPrometheusServer(ctx, gatherer) r.NoError(err) var sdConfigFilePath string diff --git a/utils/ips/lookup.go b/utils/ips/lookup.go index c8584a16773a..58eae110bfa7 100644 --- a/utils/ips/lookup.go +++ b/utils/ips/lookup.go @@ -4,6 +4,7 @@ package ips import ( + "context" "errors" "net" "net/netip" @@ -16,8 +17,8 @@ var errNoIPsFound = errors.New("no IPs found") // pick any of the IPs. // // Note: IPv4 is preferred because `net.Listen` prefers IPv4. -func Lookup(hostname string) (netip.Addr, error) { - ips, err := net.LookupIP(hostname) +func Lookup(ctx context.Context, hostname string) (netip.Addr, error) { + ips, err := (&net.Resolver{}).LookupIPAddr(ctx, hostname) if err != nil { return netip.Addr{}, err } @@ -26,12 +27,12 @@ func Lookup(hostname string) (netip.Addr, error) { } for _, ip := range ips { - ipv4 := ip.To4() + ipv4 := ip.IP.To4() if ipv4 != nil { addr, _ := AddrFromSlice(ipv4) return addr, nil } } - addr, _ := AddrFromSlice(ips[0]) + addr, _ := AddrFromSlice(ips[0].IP) return addr, nil } diff --git a/utils/ips/lookup_test.go b/utils/ips/lookup_test.go index ce9d0aa6d512..e7c15788f294 100644 --- a/utils/ips/lookup_test.go +++ b/utils/ips/lookup_test.go @@ -4,6 +4,7 @@ package ips import ( + "context" "net/netip" "testing" @@ -36,7 +37,7 @@ func TestLookup(t *testing.T) { t.Run(tt.host, func(t *testing.T) { require := require.New(t) - ip, err := Lookup(tt.host) + ip, err := Lookup(context.Background(), tt.host) require.NoError(err) require.Equal(tt.ip, ip) }) diff --git a/vms/rpcchainvm/batched_vm_test.go b/vms/rpcchainvm/batched_vm_test.go index 502c77aeeaae..7b2e0063dc2d 100644 --- a/vms/rpcchainvm/batched_vm_test.go +++ b/vms/rpcchainvm/batched_vm_test.go @@ -76,7 +76,7 @@ func TestBatchedParseBlockCaching(t *testing.T) { testKey := batchedParseBlockCachingTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) ctx := snowtest.Context(t, snowtest.CChainID) diff --git a/vms/rpcchainvm/ghttp/gresponsewriter/writer_server.go b/vms/rpcchainvm/ghttp/gresponsewriter/writer_server.go index a2d05e22570d..8eb42fa00584 100644 --- a/vms/rpcchainvm/ghttp/gresponsewriter/writer_server.go +++ b/vms/rpcchainvm/ghttp/gresponsewriter/writer_server.go @@ -82,7 +82,10 @@ func (s *Server) Flush(context.Context, *emptypb.Empty) (*emptypb.Empty, error) return &emptypb.Empty{}, nil } -func (s *Server) Hijack(context.Context, *emptypb.Empty) (*responsewriterpb.HijackResponse, error) { +func (s *Server) Hijack( + _ context.Context, + _ *emptypb.Empty, +) (*responsewriterpb.HijackResponse, 
error) { hijacker, ok := s.writer.(http.Hijacker) if !ok { return nil, errUnsupportedHijacking diff --git a/vms/rpcchainvm/grpcutils/server.go b/vms/rpcchainvm/grpcutils/server.go index 99d865a9db5b..2d0fab9845c2 100644 --- a/vms/rpcchainvm/grpcutils/server.go +++ b/vms/rpcchainvm/grpcutils/server.go @@ -4,6 +4,7 @@ package grpcutils import ( + "context" "math" "net" "time" @@ -89,7 +90,7 @@ func WithStreamInterceptor(streamInterceptor grpc.StreamServerInterceptor) Serve // NewListener returns a TCP listener listening against the next available port // on the system bound to localhost. func NewListener() (net.Listener, error) { - return net.Listen("tcp", "127.0.0.1:") + return (&net.ListenConfig{}).Listen(context.Background(), "tcp", "127.0.0.1:") } // Serve will start a gRPC server and block until it errors or is shutdown. diff --git a/vms/rpcchainvm/runtime/subprocess/non_linux_stopper.go b/vms/rpcchainvm/runtime/subprocess/non_linux_stopper.go index d5bdc5921318..55d58d009033 100644 --- a/vms/rpcchainvm/runtime/subprocess/non_linux_stopper.go +++ b/vms/rpcchainvm/runtime/subprocess/non_linux_stopper.go @@ -16,7 +16,7 @@ import ( ) func NewCmd(path string, args ...string) *exec.Cmd { - return exec.Command(path, args...) + return exec.CommandContext(context.Background(), path, args...) } func stop(_ context.Context, log logging.Logger, cmd *exec.Cmd) { diff --git a/vms/rpcchainvm/state_syncable_vm_test.go b/vms/rpcchainvm/state_syncable_vm_test.go index fe3712daae69..71e11c7238ae 100644 --- a/vms/rpcchainvm/state_syncable_vm_test.go +++ b/vms/rpcchainvm/state_syncable_vm_test.go @@ -263,8 +263,12 @@ func lastAcceptedBlockPostStateSummaryAcceptTestPlugin(t *testing.T, loadExpecta return ssVM } -func buildClientHelper(require *require.Assertions, testKey string) *VMClient { - process := helperProcess(testKey) +func buildClientHelper( + ctx context.Context, + require *require.Assertions, + testKey string, +) *VMClient { + process := helperProcess(ctx, testKey) log := logging.NewLogger( testKey, @@ -302,7 +306,7 @@ func TestStateSyncEnabled(t *testing.T) { testKey := stateSyncEnabledTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) // test state sync not implemented @@ -333,7 +337,7 @@ func TestGetOngoingSyncStateSummary(t *testing.T) { testKey := getOngoingSyncStateSummaryTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) // test unimplemented case; this is just a guard @@ -358,7 +362,7 @@ func TestGetLastStateSummary(t *testing.T) { testKey := getLastStateSummaryTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) // test unimplemented case; this is just a guard @@ -383,7 +387,7 @@ func TestParseStateSummary(t *testing.T) { testKey := parseStateSummaryTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) // test unimplemented case; this is just a guard @@ -412,7 +416,7 @@ func TestGetStateSummary(t *testing.T) { testKey := getStateSummaryTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := 
buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) // test unimplemented case; this is just a guard @@ -437,7 +441,7 @@ func TestAcceptStateSummary(t *testing.T) { testKey := acceptStateSummaryTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) // retrieve the summary first @@ -467,7 +471,7 @@ func TestLastAcceptedBlockPostStateSummaryAccept(t *testing.T) { testKey := lastAcceptedBlockPostStateSummaryAcceptTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) // Step 1: initialize VM and check initial LastAcceptedBlock diff --git a/vms/rpcchainvm/vm_test.go b/vms/rpcchainvm/vm_test.go index f944438ef791..63c629ca6e04 100644 --- a/vms/rpcchainvm/vm_test.go +++ b/vms/rpcchainvm/vm_test.go @@ -61,14 +61,14 @@ var TestServerPluginMap = map[string]func(*testing.T, bool) block.ChainVM{ } // helperProcess helps with creating the subnet binary for testing. -func helperProcess(s ...string) *exec.Cmd { +func helperProcess(ctx context.Context, s ...string) *exec.Cmd { cs := []string{"-test.run=TestHelperProcess", "--"} cs = append(cs, s...) env := []string{ "TEST_PROCESS=1", } run := os.Args[0] - cmd := exec.Command(run, cs...) + cmd := exec.CommandContext(ctx, run, cs...) env = append(env, os.Environ()...) cmd.Env = env return cmd @@ -198,7 +198,7 @@ func TestRuntimeSubprocessBootstrap(t *testing.T) { status, stopper, err := subprocess.Bootstrap( context.Background(), listener, - helperProcess("dummy"), + helperProcess(context.Background(), "dummy"), test.config, ) if err == nil { diff --git a/vms/rpcchainvm/with_context_vm_test.go b/vms/rpcchainvm/with_context_vm_test.go index d4135bc16f39..df9f4af85ba2 100644 --- a/vms/rpcchainvm/with_context_vm_test.go +++ b/vms/rpcchainvm/with_context_vm_test.go @@ -94,7 +94,7 @@ func TestContextVMSummary(t *testing.T) { testKey := contextTestKey // Create and start the plugin - vm := buildClientHelper(require, testKey) + vm := buildClientHelper(context.Background(), require, testKey) defer vm.runtime.Stop(context.Background()) ctx := snowtest.Context(t, snowtest.CChainID)
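
Note for reviewers (not part of the applied diff): every hunk above makes the same mechanical substitution, replacing a context-less standard-library call with its context-aware equivalent. A minimal sketch of the pattern, assuming a caller-supplied ctx; the names addr, srcPath, and destPath are illustrative placeholders, not identifiers from this patch:

	package main

	import (
		"context"
		"crypto/tls"
		"net"
		"os/exec"
	)

	func noctxPattern(ctx context.Context, addr, srcPath, destPath string) error {
		// net.Listen and net.Dial cannot observe cancellation; the ListenConfig
		// and Dialer equivalents thread ctx through name resolution and setup.
		listener, err := (&net.ListenConfig{}).Listen(ctx, "tcp", addr)
		if err != nil {
			return err
		}
		defer listener.Close()

		conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr)
		if err != nil {
			return err
		}
		defer conn.Close()

		// tls.Conn.Handshake() becomes HandshakeContext(ctx), mirroring the
		// connToIDAndCert change above. InsecureSkipVerify keeps the sketch
		// self-contained; it is not taken from the patch.
		tlsConn := tls.Client(conn, &tls.Config{InsecureSkipVerify: true})
		if err := tlsConn.HandshakeContext(ctx); err != nil {
			return err
		}

		// exec.Command becomes exec.CommandContext, which kills the child
		// process when ctx is cancelled.
		return exec.CommandContext(ctx, "cp", "-r", srcPath, destPath).Run()
	}

Where a child process must outlive its parent (process_runtime.go), the patch instead passes context.WithoutCancel(ctx), which satisfies the linter while ensuring the detached node is never killed by a cancelled parent context.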