@vertex451 vertex451 commented Sep 2, 2025

Port Reservation System for Akash Provider

Resolves akash-network/support#234

Important limitation: we return only a list of ports, because when an order is created the provider sees only limited data about the future deployment, not the full manifest.

Testing

Manual E2E Testing:
I spun up an Akash blockchain and ran a provider with my changes.
Then I created a deployment, which triggered bidding and port reservation.
Once the provider's bid is placed, we can query provider-bid-proposed-ports:

{"proposed_ports":[30106,30107]}

Now we can create a lease, send a manifest, and query the provider lease status for that deployment; in forwarded_ports we can see that the service uses the ports reserved during the bidding phase:

  "forwarded_ports": {
    "bew": [
      {
        "host": "localhost",
        "port": 80,
        "externalPort": 30106,
        "proto": "TCP",
        "name": "bew"
      }
    ],
    "web": [
      {
        "host": "localhost",
        "port": 80,
        "externalPort": 30107,
        "proto": "TCP",
        "name": "web"
      }
    ]
  },

Overview

Enables users to query proposed external ports from providers during the bidding phase, allowing informed provider selection based on port allocations.

Problem

Users only saw ports after lease creation, making provider comparison impossible.

Solution

  1. Reserve ports during bidding.
  2. Use those ports if a lease is created.
  3. Release ports on the corresponding events.
  4. Expose an HTTP endpoint to query proposed allocations.

Architecture

  • HTTP API: /bid-proposed-ports/{owner}/{dseq}/{gseq}/{oseq} (transport layer).
  • PortManager: unified port operations for the order and lease phases (use-case layer; see the sketch below).
  • portreserve.Store: separate persistence layer.
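
For orientation, here is a minimal Go sketch of the PortManager surface, assembled from the method names that appear in the walkthrough and review below; the exact signatures, return types, and the mtypes import path are assumptions, not the final API:

// Hypothetical sketch; the real interface lives in cluster/port_manager.go.
package cluster

import (
	"time"

	mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4" // assumed import path
)

// PortManager coordinates NodePort reservations across the order (bidding)
// and lease (deployment) phases, backed by portreserve.Store.
type PortManager interface {
	// Reserve count NodePorts for an order; the reservation expires after ttl.
	ReserveForOrder(orderID mtypes.OrderID, count int, ttl time.Duration) []int32
	// Report the ports currently proposed for an order (used by the REST endpoint).
	PortsForOrder(orderID mtypes.OrderID) []int32
	// Move an order's reservation to the winning lease for the manifest window.
	PromoteOrderToLease(orderID mtypes.OrderID, leaseID mtypes.LeaseID, ttl time.Duration)
	// Hand the reserved ports to a service while its manifest is deployed.
	AllocatePorts(leaseID mtypes.LeaseID, serviceName string, count int) []int32
	// Release reservations when a bid is lost or a lease closes.
	ReleaseOrder(orderID mtypes.OrderID)
	ReleaseLease(leaseID mtypes.LeaseID)
	// Drop reservations whose TTL has expired (run periodically).
	CleanupExpired()
}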

Challenges

Port Release Triggers

  1. Bid Lost: Order ports released immediately when the bid doesn't win
  2. TTL Expiration: Auto-cleanup after 5 minutes (orders) or 1 minute (leases)
  3. Lease Closed: All lease ports are released when the lease terminates
  4. Manual Cleanup: Periodic cleanup routine removes expired reservations (see the sketch after this list)
  5. Provider Restart: Stale reservations cleaned on startup recovery
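
A minimal sketch of the periodic cleanup from trigger 4, assuming a one-minute tick (the interval noted in the review below) and a CleanupExpired method; function and type names here are illustrative, and the real loop in cluster/inventory.go also hooks into the provider lifecycle and logging:

package cluster

import (
	"context"
	"time"
)

// portCleaner is the minimal surface needed here; the real PortManager exposes more.
type portCleaner interface {
	CleanupExpired()
}

// runPortCleanup periodically drops reservations whose TTL has passed and
// stops when the provider's context is cancelled.
func runPortCleanup(ctx context.Context, pm portCleaner) {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return // provider shutting down
		case <-ticker.C:
			pm.CleanupExpired()
		}
	}
}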

TODOs

  • Persistent store
  • Configurable port ranges and TTL
  • Better testing with concurrent deployments

Proposal: Persistent Port Reservations via CRD

Memory-only port reservations are lost on provider restart. Introduce a lightweight PortReservation CRD:

Fields: port, bidID, leaseID, status (reserved/allocated/released), expiresAt.

On bid → create reserved; bid lost/TTL → released; bid won → allocated.

Manifest deployment reads the CRDs for port assignment.

Reservations survive provider restarts; no external DB is needed.
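
A hypothetical Go sketch of the proposed CRD types, using only the fields listed above; the package name, API wiring, and field shapes are illustrative assumptions, not a final design:

package v2beta2

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// PortReservationPhase mirrors the proposed status values.
type PortReservationPhase string

const (
	PortReserved  PortReservationPhase = "reserved"  // created when the bid is placed
	PortAllocated PortReservationPhase = "allocated" // bid won, lease active
	PortReleased  PortReservationPhase = "released"  // bid lost or TTL expired
)

// PortReservationSpec carries the fields listed in the proposal.
type PortReservationSpec struct {
	Port      int32                `json:"port"`
	BidID     string               `json:"bidID"`
	LeaseID   string               `json:"leaseID,omitempty"`
	Status    PortReservationPhase `json:"status"`
	ExpiresAt metav1.Time          `json:"expiresAt"`
}

// PortReservation is the object the provider would read during manifest
// deployment and use to recover reservations after a restart.
type PortReservation struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec PortReservationSpec `json:"spec"`
}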


coderabbitai bot commented Sep 2, 2025

Walkthrough

Adds a centralized PortManager and a file-backed port reservation store; integrates lease-scoped port allocation into builder/deploy flows; exposes an unauthenticated REST endpoint to return proposed ports for an order; adds a Makefile helper and extensive tests/mocks for port-management behavior.

Changes

Cohort / File(s) Summary
Make targets
_run/common-commands.mk
Adds .PHONY: provider-bid-proposed-ports and a target that curls the new REST endpoint for proposed ports.
Kubernetes deployment config
_run/kube/deployment.yaml
Adjusts public port mappings to 8080/8081 and removes host/HTTP options in service expose blocks.
Port store (persistence)
cluster/portreserve/store.go, cluster/portreserve/store_critical_test.go
New file-backed, thread-safe Store with ReserveForOrder, PortsForOrder, PromoteOrderToLease, NextForLease, ReleaseOrder/Lease, CleanupExpired, SetPortRange and critical tests for collisions, persistence, exhaustion, concurrency.
Port manager core
cluster/port_manager.go, cluster/port_manager_critical_test.go
New PortManager interface and implementation coordinating order/lease reservations, caching, AllocatePorts/ReserveForOrder/Promote/Release, CleanupExpired, Stats, plus extensive critical tests.
Inventory / service integration
cluster/inventory.go, cluster/service.go, cluster/manager.go, cluster/inventory_test.go
Inventory creates/exposes PortManager, reserves order ports during request handling with TTL, runs periodic cleanup; service promotes order→lease on EventLeaseCreated and releases on EventLeaseClosed; manager injects PortManager and LeaseID into deploy context; tests adjusted.
Builder / kube client wiring
cluster/kube/builder/service.go, cluster/kube/builder/builder_test.go, cluster/kube/builder/mocks/service_port_allocator.go, cluster/kube/client.go
Adds ServicePortAllocator interface and mock; BuildService signature updated to accept allocator and return (Service, error); ports() pre-allocates NodePorts via allocator; kube client provides lease-scoped allocator from context.
Context helpers
cluster/ctx/context.go
New typed context helpers: WithPortManager, PortManagerFrom, WithLeaseID, LeaseIDFrom.
Gateway REST API
gateway/rest/router.go
Adds unauthenticated GET /bid-proposed-ports/{owner}/{dseq}/{gseq}/{oseq} that parses the path, builds an OrderID, queries ClusterService().PortManager().PortsForOrder(orderID), and returns JSON { "proposed_ports": [...] } (a hedged handler sketch follows this table).
Mocks & tests (scaffolding)
cluster/mocks/service.go, cluster/mocks/cluster.go, cluster/monitor_test.go, cluster/port_integration_critical_test.go, cluster/portreserve/store_critical_test.go, cluster/port_manager_critical_test.go
Updates mocks (generator bump), adds PortManager-aware mock methods, replaces/extends test scaffolding, and adds many integration/critical tests exercising end-to-end and concurrent port scenarios.
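
To make the Gateway REST row above concrete, a hedged sketch of the handler shape; route wiring, path parsing, and OrderID construction are elided, and the function names are illustrative (the real code lives in gateway/rest/router.go):

package rest

import (
	"encoding/json"
	"net/http"
)

// proposedPortsHandler returns a handler that looks up an order's reserved
// ports and writes {"proposed_ports": [...]}. The lookup callback stands in
// for ClusterService().PortManager().PortsForOrder(orderID); the real handler
// first parses {owner}/{dseq}/{gseq}/{oseq} from the path to build the OrderID.
func proposedPortsHandler(lookup func(r *http.Request) []int32) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ports := lookup(r)

		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(struct {
			ProposedPorts []int32 `json:"proposed_ports"`
		}{ProposedPorts: ports})
	}
}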

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant User
  participant Make as Makefile
  participant REST as Gateway REST
  participant Cluster as Cluster Service
  participant PM as PortManager
  participant Store as PortReserve Store

  User->>Make: make provider-bid-proposed-ports
  Make->>REST: GET /bid-proposed-ports/{owner}/{dseq}/{gseq}/{oseq}
  REST->>Cluster: PortsForOrder(orderID)
  Cluster->>PM: PortsForOrder(orderID)
  PM->>Store: PortsForOrder(orderID)
  Store-->>PM: [ports]
  PM-->>Cluster: [ports]
  Cluster-->>REST: { "proposed_ports": [...] }
  REST-->>Make: JSON
  Make-->>User: prints JSON
sequenceDiagram
  autonumber
  participant Scheduler
  participant PM as PortManager
  participant Store
  participant Cluster
  participant Deploy as Kube Client / Builder

  Scheduler->>PM: ReserveForOrder(orderID, n, ttl)
  PM->>Store: ReserveForOrder(...)
  Store-->>PM: [reserved ports]
  Cluster->>PM: PromoteOrderToLease(orderID, leaseID, ttl)
  PM->>Store: PromoteOrderToLease(...)
  Store-->>PM: ok

  Cluster->>Deploy: Deploy(ctx{port-manager, lease-id})
  Deploy->>Deploy: BuildService(..., allocator)
  Deploy->>PM: AllocatePorts(leaseID, serviceName, count)
  PM->>Store: NextForLease(leaseID) x count
  Store-->>PM: next port(s)
  PM-->>Deploy: [nodePorts]
  Deploy-->>Cluster: k8s resources with NodePorts

  Cluster->>PM: ReleaseLease(leaseID)
  PM->>Store: ReleaseLease(leaseID)
  Store-->>PM: done

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Assessment against linked issues

Objective: Expose the ports assigned within the container (#234)
Explanation: The endpoint exposes proposed external ports for an order but does not return per-container (internal SDL port) → external NodePort mappings; no container-internal port mapping surface is added.

Assessment against linked issues: Out-of-scope changes

Code change: Adjust service expose mappings and remove host/http options (_run/kube/deployment.yaml)
Explanation: Manifest and environment tweaks do not relate to programmatic querying of container-internal → external port mappings required by #234.

Suggested reviewers

  • boz

Poem

I twitched my whiskers, tucked a port,
Wrote it down in files of sort.
Orders whisper, leases keep,
Ports awake from restful sleep.
Curl, mock, and test — a rabbit’s hop to court. 🐇

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.2.2)

Error: can't load config: unsupported version of the configuration: "" See https://golangci-lint.run/product/migration-guide for migration instructions
The command is terminated due to an error: can't load config: unsupported version of the configuration: "" See https://golangci-lint.run/product/migration-guide for migration instructions

image: quay.io/ovrclk/demo-app
expose:
  - port: 80
    as: 80
Author


I kept this diff to show an example of the deployment I used to create a NodePort for random port assignments.

@vertex451 vertex451 marked this pull request as ready for review September 2, 2025 14:05
@vertex451 vertex451 requested review from boz and troian as code owners September 2, 2025 14:05

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 13

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
cluster/inventory.go (1)

629-655: Release reserved ports when unreserving an order

Unreserving capacity should also free any NodePorts reserved for the order; otherwise ports linger until TTL.

Apply this diff:

 			for idx, res := range state.reservations {
 				if !res.OrderID().Equals(req.order) {
 					continue
 				}
 
 				is.log.Info("removing reservation", "order", res.OrderID())
 
+				// Release any NodePorts reserved for this order
+				is.portManager.ReleaseOrder(req.order)
+
 				state.reservations = append(state.reservations[:idx], state.reservations[idx+1:]...)
 				// reclaim availableExternalPorts if unreserving allocated resources
 				if res.allocated {
 					is.availableExternalPorts += reservationCountEndpoints(res)
 				}
 
 				req.ch <- inventoryResponse{value: res}
 				is.log.Info("unreserve capacity complete", "order", req.order)
 				inventoryRequestsCounter.WithLabelValues("unreserve", "destroyed").Inc()
 				continue loop
 			}
♻️ Duplicate comments (1)
_run/kube/deployment.yaml (1)

9-9: Drop hardcoded "as" ports in example SDL (conflicts with allocator).

These fixed external ports can fight the new PortManager/allocator and, on most clusters, 8080/8081 are outside the default NodePort range (30000–32767). Recommend reverting to allocator-driven ports or removing "as" from this sample.

Apply:

-        as: 8080
+        # as: 8080  # leave unset to use provider-assigned port
-        as: 8081
+        # as: 8081  # leave unset to use provider-assigned port

Also applies to: 16-16

🧹 Nitpick comments (15)
_run/common-commands.mk (1)

312-315: Parameterize gateway URL (avoid hardcoded localhost) and trim trailing whitespace.

Let users point at non-local providers; also remove the stray blank line.

Apply:

+PROVIDER_GATEWAY ?= https://localhost:8443
 .PHONY: provider-bid-proposed-ports
 provider-bid-proposed-ports:
-	curl -sk "https://localhost:8443/bid-proposed-ports/$(KEY_ADDRESS)/$(DSEQ)/$(GSEQ)/$(OSEQ)"
-		
+	curl -sk "$(PROVIDER_GATEWAY)/bid-proposed-ports/$(KEY_ADDRESS)/$(DSEQ)/$(GSEQ)/$(OSEQ)"

Optional: add auth header if JWT is required:

-	curl -sk "$(PROVIDER_GATEWAY)/bid-proposed-ports/$(KEY_ADDRESS)/$(DSEQ)/$(GSEQ)/$(OSEQ)"
+	curl -sk -H "Authorization: Bearer $(JWT_TOKEN)" "$(PROVIDER_GATEWAY)/bid-proposed-ports/$(KEY_ADDRESS)/$(DSEQ)/$(GSEQ)/$(OSEQ)"
cluster/portreserve/store.go (4)

165-172: Honor default TTL when zero/negative is passed.

Prevents accidentally creating near-immediate expiry reservations.

Apply:

 func (s *Store) ReserveForOrder(id mtypes.OrderID, count int, ttl time.Duration) []int32 {
 	if count <= 0 {
 		return nil
 	}
+	if ttl <= 0 {
+		ttl = DefaultReservationTTL
+	}
 	s.mu.Lock()
 	defer s.mu.Unlock()

120-152: loadFromFile: guard against corrupt JSON and document locking expectations.

Consider logging parse errors and explicitly stating the caller must hold s.mu (except during init).


223-241: Remove stdout debug prints or route through the project logger.

fmt.Printf in hot paths will spam logs and leak IDs; gate with a debug flag or use the provider’s logger.

Apply (example):

-	fmt.Printf("DEBUG: PromoteOrderToLease called - OrderID=%+v, LeaseID=%+v\n", oid, lid)
+	// log.Debug().Str("module","portreserve").Interface("order", oid).Interface("lease", lid).Msg("PromoteOrderToLease")

Also applies to: 245-263


63-72: Parameterize port range and storage path.

Expose via config/env (e.g., AKASH_NODEPORT_START/END and AKASH_PORTRESERVE_FILE) to match cluster NodePort ranges and FHS-compliant paths.

cluster/inventory.go (1)

632-632: Nit: log message grammar

“attempting to removing reservation” → “attempting to remove reservation”.

Apply this diff:

-			is.log.Info("attempting to removing reservation", "order", req.order)
+			is.log.Info("attempting to remove reservation", "order", req.order)
cluster/service.go (2)

246-249: Guard against nil PortManager at startup.
If inventory.PortManager() can ever be nil, fail fast in NewService to avoid late panics.

Apply this minimal check in NewService after inventory is created:

@@
   inventory, err := newInventoryService(ctx, cfg, log, sub, client, waiter, deployments)
   if err != nil {
     sub.Close()
     return nil, err
   }
+  if inventory.PortManager() == nil {
+    sub.Close()
+    return nil, errors.New("inventory PortManager is nil")
+  }

394-399: Use Debug level (not Info) and drop the "DEBUG:" prefix; consider configurable TTL.
Noise reduction and clarity. Also, a 1m TTL risks expiring before slower manifests are applied.

Diff to adjust log level and message:

 case mtypes.EventLeaseCreated:
-  // Promote NodePort reservation from order to lease; keep for the manifest window (PoC)
-  s.log.Info("DEBUG: Promoting order ports to lease", "orderID", ev.ID.OrderID(), "leaseID", ev.ID)
+  // Promote NodePort reservation from order to lease; keep for the manifest window (PoC)
+  s.log.Debug("promoting order ports to lease", "orderID", ev.ID.OrderID(), "leaseID", ev.ID)
   s.PortManager().PromoteOrderToLease(ev.ID.OrderID(), ev.ID, time.Minute)
-  s.log.Info("DEBUG: Promotion completed", "leaseID", ev.ID)
+  s.log.Debug("promotion completed", "leaseID", ev.ID)
 case mtypes.EventLeaseClosed:
-  // Release lease ports when lease is closed
-  s.log.Info("DEBUG: Releasing lease ports", "leaseID", ev.ID)
+  // Release lease ports when lease is closed
+  s.log.Debug("releasing lease ports", "leaseID", ev.ID)
   s.PortManager().ReleaseLease(ev.ID)
-  s.log.Info("DEBUG: Lease ports released", "leaseID", ev.ID)
+  s.log.Debug("lease ports released", "leaseID", ev.ID)

Optional: make the lease promotion TTL configurable (e.g., via Config) to prevent premature expiry under backpressure.

Also applies to: 400-404

cluster/kube/builder/builder_test.go (3)

114-117: Assert allocator was not called when no exposes exist.
Make the intent explicit and catch accidental allocations.

Example:

 import (
   "strconv"
   "testing"
+  "github.com/stretchr/testify/mock"
@@
- // This test has no expose directives, so AllocatePorts should not be called
- mockAllocator := mocks.NewServicePortAllocator(t)
+ // This test has no expose directives, so AllocatePorts should not be called
+ mockAllocator := mocks.NewServicePortAllocator(t)
   serviceBuilder := BuildService(workload, true, mockAllocator)
   require.NotNil(t, serviceBuilder)
   // Should have name ending with suffix
   require.Equal(t, "myservice-np", serviceBuilder.Name())
   // Should not have any work to do
   require.False(t, serviceBuilder.Any())
+  mockAllocator.AssertNotCalled(t, "AllocatePorts", mock.Anything, mock.Anything)

219-222: Same here: explicitly assert no allocations for local-only exposes.

- // This test has expose directives but only local services (Global: false), so AllocatePorts should not be called for global service builder
- mockAllocator := mocks.NewServicePortAllocator(t)
+ // This test has expose directives but only local services (Global: false), so AllocatePorts should not be called
+ mockAllocator := mocks.NewServicePortAllocator(t)
   serviceBuilder := BuildService(workload, true, mockAllocator)
+  mockAllocator.AssertNotCalled(t, "AllocatePorts", mock.Anything, mock.Anything)

344-348: Fix misleading comment.
Comment says “Should have work to do” but the assertion expects no work.

- // Should have work to do
+ // Should not have any work to do
   require.False(t, serviceBuilder.Any())
cluster/kube/client.go (2)

1156-1166: Add compile-time interface conformance for leasePortAllocator.
Prevents regressions if the allocator interface changes.

-// leasePortAllocator adapts PortManager to the builder.ServicePortAllocator interface
+// leasePortAllocator adapts PortManager to the builder.ServicePortAllocator interface
+var _ builder.ServicePortAllocator = (*leasePortAllocator)(nil)
 type leasePortAllocator struct {
   portManager cluster.PortManager
   leaseID     mtypes.LeaseID
 }

463-473: Prefer typed context keys over plain strings
Using untyped string keys in context.WithValue/Value risks collisions across packages. Define a private key type and constants, for example:

type ctxKey string

const (
  ctxKeyPortManager ctxKey = "port-manager"
  ctxKeyLeaseID     ctxKey = "lease-id"
)

Then replace the literals in

  • cluster/manager.go (lines ~389–390: context.WithValue)
  • cluster/kube/client.go (lines 463–473: ctx.Value)

with these typed constants.

cluster/port_manager.go (2)

175-182: Unused Cleanup method

The Cleanup method at the end appears to be unused dead code. It's not part of the PortManager interface and has a comment suggesting it's optional maintenance.

Consider either:

  1. Remove this method if it's not needed
  2. Add it to the interface if it should be called periodically
  3. Call it from the existing CleanupExpired method if appropriate
-// Cleanup removes expired lease allocators (optional maintenance)
-func (pm *portManager) Cleanup() {
-	pm.mu.Lock()
-	defer pm.mu.Unlock()
-
-	// This could be enhanced to remove allocators for closed leases
-	// For now, we rely on explicit ReleaseLease calls
-}

56-58: Consider validating TTL parameter

The ReserveForOrder method accepts a TTL but doesn't validate it. Consider checking for negative or zero values that might cause issues.

 func (pm *portManager) ReserveForOrder(orderID mtypes.OrderID, count int, ttl time.Duration) []int32 {
+	if ttl <= 0 {
+		pm.log.Warn("Invalid TTL for order reservation, using default", "orderID", orderID, "ttl", ttl)
+		ttl = portreserve.DefaultReservationTTL
+	}
 	pm.log.Info("Reserving ports for order", "orderID", orderID, "count", count)
 	return pm.store.ReserveForOrder(orderID, count, ttl)
 }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between bf47737 and 6ddc7da.

📒 Files selected for processing (12)
  • _run/common-commands.mk (1 hunks)
  • _run/kube/deployment.yaml (1 hunks)
  • cluster/inventory.go (6 hunks)
  • cluster/kube/builder/builder_test.go (7 hunks)
  • cluster/kube/builder/mocks/service_port_allocator.go (1 hunks)
  • cluster/kube/builder/service.go (3 hunks)
  • cluster/kube/client.go (3 hunks)
  • cluster/manager.go (4 hunks)
  • cluster/port_manager.go (1 hunks)
  • cluster/portreserve/store.go (1 hunks)
  • cluster/service.go (4 hunks)
  • gateway/rest/router.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (9)
cluster/service.go (1)
cluster/port_manager.go (1)
  • PortManager (14-28)
cluster/kube/builder/service.go (5)
cluster/kube/builder/mocks/service_port_allocator.go (1)
  • ServicePortAllocator (8-10)
cluster/service.go (1)
  • Service (99-109)
service.go (1)
  • Service (57-62)
cluster/kube/builder/workload.go (1)
  • Workload (32-38)
pkg/apis/akash.network/v2beta2/manifest.go (1)
  • ManifestGroup (63-68)
cluster/port_manager.go (1)
cluster/portreserve/store.go (1)
  • Store (49-56)
gateway/rest/router.go (1)
cluster/port_manager.go (1)
  • PortManager (14-28)
cluster/kube/builder/mocks/service_port_allocator.go (1)
cluster/kube/builder/service.go (1)
  • ServicePortAllocator (58-60)
cluster/kube/client.go (4)
cluster/port_manager.go (1)
  • PortManager (14-28)
pkg/apis/akash.network/v2beta2/types.go (1)
  • LeaseID (49-55)
pkg/apis/akash.network/v2beta1/types.go (1)
  • LeaseID (95-101)
cluster/kube/builder/service.go (1)
  • BuildService (29-43)
cluster/kube/builder/builder_test.go (2)
cluster/kube/builder/mocks/service_port_allocator.go (1)
  • NewServicePortAllocator (71-81)
cluster/kube/builder/service.go (1)
  • BuildService (29-43)
cluster/manager.go (2)
cluster/port_manager.go (1)
  • PortManager (14-28)
pkg/apis/akash.network/v2beta2/types.go (1)
  • LeaseID (49-55)
cluster/inventory.go (2)
cluster/port_manager.go (2)
  • PortManager (14-28)
  • NewPortManager (46-52)
cluster/portreserve/store.go (2)
  • GetSharedStore (76-83)
  • DefaultReservationTTL (18-18)
🔇 Additional comments (10)
cluster/manager.go (1)

74-75: LGTM: PortManager plumbed into deploymentManager

Field introduction is straightforward and keeps deps localized.

cluster/inventory.go (3)

103-104: LGTM: centralized PortManager field added

Keeps the API surface minimal while enabling downstream usage.


141-143: LGTM: initialize PortManager with shared store

Reasonable default; enables restart recovery via store.loadFromFile().


176-195: LGTM: periodic cleanup loop

Properly stops on ctx/lifecycle; emits useful logs when work is done.

cluster/service.go (2)

5-5: LGTM: time import is appropriate.


107-109: Interface extension looks good.
Exposing PortManager via Service makes sense for downstream consumers.

cluster/kube/builder/builder_test.go (1)

404-405: LGTM: local-service build with no allocator.

cluster/kube/client.go (1)

512-514: Allocator wiring for global services looks correct.
Local service uses nil; global uses lease-scoped allocator.

cluster/kube/builder/service.go (1)

212-215: Add bounds checking for allocated ports array

While the condition checks len(allocatedPorts) > allocatedIndex, it would be safer to also verify that we have exactly the expected number of allocated ports.

 				// Assign NodePort using allocator
 				if b.requireNodePort && len(allocatedPorts) > allocatedIndex {
+					if allocatedIndex >= len(allocatedPorts) {
+						b.log.Error("Insufficient allocated ports", "expected", nodePortCount, "got", len(allocatedPorts))
+						return nil, errors.New("insufficient allocated ports")
+					}
 					sp.NodePort = allocatedPorts[allocatedIndex]
 					allocatedIndex++
 				}

Likely an incorrect or invalid review comment.

cluster/kube/builder/mocks/service_port_allocator.go (1)

1-81: LGTM! Auto-generated mock file

This is a properly auto-generated mock file by mockery v2.53.5. The implementation correctly mocks the ServicePortAllocator interface with all necessary test helpers.

Comment on lines 30 to 32
if requireNodePort && allocator == nil {
	panic("ServicePortAllocator is required when requireNodePort is true")
}

🛠️ Refactor suggestion

Consider returning an error instead of panicking

While the panic correctly prevents invalid usage, returning an error would be more idiomatic and allow callers to handle the situation gracefully. This is especially important in production environments where panics can cause service disruptions.

-func BuildService(workload *Workload, requireNodePort bool, allocator ServicePortAllocator) Service {
-	if requireNodePort && allocator == nil {
-		panic("ServicePortAllocator is required when requireNodePort is true")
-	}
+func BuildService(workload *Workload, requireNodePort bool, allocator ServicePortAllocator) (Service, error) {
+	if requireNodePort && allocator == nil {
+		return nil, errors.New("ServicePortAllocator is required when requireNodePort is true")
+	}

Note: This change would require updating the function signature and all call sites to handle the error return.

🤖 Prompt for AI Agents
In cluster/kube/builder/service.go around lines 30 to 32, replace the panic when
requireNodePort is true and allocator is nil with an error return: update the
function signature to return (T, error) or error as appropriate, construct and
return a descriptive error (e.g., "ServicePortAllocator required when
requireNodePort is true"), and propagate this change through all callers by
handling the error (returning it up the stack or handling it locally); ensure
tests and call sites are updated to check and handle the new error return.

Comment on lines 183 to 201
	// Very simple sequential allocator in default range (PoC only)
	ports := make([]int32, 0, count)
	var p int32 = s.defaultStart

	for p <= s.defaultEnd && len(ports) < count {
		if !s.inUseLocked(p) {
			ports = append(ports, p)
		}
		p++
	}

	res := &reservation{Ports: ports, Next: 0, Expires: time.Now().Add(ttl)}
	s.byOrder[ok] = res

	// Save to file for cross-process access
	s.saveToFile()

	return append([]int32(nil), ports...)
}

⚠️ Potential issue

Fail fast if unable to allocate the requested count (avoid partial reservations).

Returning fewer ports than requested can cause nondeterministic failures later.

Apply:

 	for p <= s.defaultEnd && len(ports) < count {
 		if !s.inUseLocked(p) {
 			ports = append(ports, p)
 		}
 		p++
 	}
 
-	res := &reservation{Ports: ports, Next: 0, Expires: time.Now().Add(ttl)}
+	// Ensure we satisfied the request; otherwise, don't persist a partial reservation.
+	if len(ports) < count {
+		return nil
+	}
+	res := &reservation{Ports: ports, Next: 0, Expires: time.Now().Add(ttl)}
 	s.byOrder[ok] = res
 
 	// Save to file for cross-process access
 	s.saveToFile()
 
 	return append([]int32(nil), ports...)


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
cluster/kube/builder/service.go (1)

115-122: Over-strict match prevents NodePort retention on harmless name changes.

Update() only retains existing NodePort when Name AND (Port, TargetPort, Protocol) all match. If names are regenerated (e.g., due to expose order changes), you lose retention unnecessarily. Match on the material fields; ignore Name.

-                if curport.Name == port.Name &&
-                    curport.Port == port.Port &&
+                if /* name-agnostic match */ curport.Port == port.Port &&
                     curport.TargetPort.IntValue() == port.TargetPort.IntValue() &&
                     curport.Protocol == port.Protocol {
♻️ Duplicate comments (9)
cluster/kube/client.go (1)

464-468: No panics on missing context — good fix.

Replacing panics with explicit errors aligns with prior feedback and improves resilience.

cluster/kube/builder/builder_test.go (1)

284-302: Great: verifies allocator→NodePort wiring end-to-end.

The added NodePort assertion covers the critical path.

cluster/port_manager.go (1)

78-115: Race fixed by locked ensureLeaseState path.

Replacing unlock/relock with ensureLeaseStateLocked resolves the race on leaseState.

cluster/inventory.go (1)

480-508: Port reservation moved after capacity check with proper rollback!

Great improvement! The implementation now correctly reserves ports only after successful capacity adjustment and includes a deferred rollback mechanism. This addresses the previous concern about port reservation leaks on capacity failures.

cluster/kube/builder/service.go (2)

29-33: Good change: panic → error return (matches prior feedback).

The constructor now returns an error instead of panicking when requireNodePort is true and allocator is nil. This is the right call for production stability.


159-179: Dedup/counting fix LGTM.

Using a separate countedPorts map for counting and a fresh portsAdded for creation resolves the earlier duplicate-detection bug.

Also applies to: 181-185

cluster/portreserve/store.go (3)

117-157: Nice: atomic, permission-hardened writes.

Temp-file + fsync + rename with 0600 perms addresses earlier torn-write and leakage concerns.


194-216: Correct: collision check now covers both orders and leases.

This fixes prior collisions during bidding against active leases.


236-257: Avoid persisting partial reservations; fail fast on insufficient capacity.

Current code stores fewer than requested ports, causing fragmentation and long-lived blocks under exhaustion. Return nil and do not persist unless count is fully satisfied.

Apply:

  // Very simple sequential allocator in default range (PoC only)
  ports := make([]int32, 0, count)
  var p int32 = s.defaultStart

  for p <= s.defaultEnd && len(ports) < count {
    if !s.inUseLocked(p) {
      ports = append(ports, p)
    }
    p++
  }

- res := &reservation{Ports: ports, Next: 0, Expires: time.Now().Add(ttl)}
+ // Ensure full satisfaction; avoid persisting partial allocations
+ if len(ports) < count {
+   return nil
+ }
+ res := &reservation{Ports: ports, Next: 0, Expires: time.Now().Add(ttl)}
  s.byOrder[ok] = res

  // Save to file for cross-process access
  if err := s.saveToFile(); err != nil {
    // Log error but don't fail the operation - ports are still reserved in memory
    fmt.Printf("Warning: failed to persist port reservation to file: %v\n", err)
  }
🧹 Nitpick comments (26)
cluster/kube/client.go (2)

469-479: Defer PortManager/LeaseID retrieval until required.

Currently Deploy fails if the context lacks PortManager/LeaseID even when no global (NodePort) services exist. Lazily fetch them only when you actually need to build a global service. This preserves backward compatibility and avoids hard coupling.

Apply this refactor:

@@
- // Get port manager and lease ID from context (passed by deployment manager)
- pmInterface, ok := clusterctx.PortManagerFrom(ctx)
- if !ok {
-   return fmt.Errorf("missing PortManager in context - deployment manager must provide one")
- }
-
- portManager, ok := pmInterface.(cluster.PortManager)
- if !ok {
-   return fmt.Errorf("invalid PortManager type in context")
- }
-
- leaseID, ok := clusterctx.LeaseIDFrom(ctx)
- if !ok {
-   return fmt.Errorf("missing lease ID in context - deployment manager must provide one")
- }
+ // Lazily initialize PortManager/LeaseID only if a global service requires NodePort
+ var (
+   portManager cluster.PortManager
+   leaseID     mtypes.LeaseID
+   pmInit      bool
+ )

…and before building the global service:

@@
- svc.globalService, err = builder.BuildService(workload, true, &leasePortAllocator{portManager, leaseID})
+ // Build global service only if any expose is Global (avoids requiring allocator otherwise)
+ needsGlobal := false
+ for _, e := range service.Expose {
+   if e.Global {
+     needsGlobal = true
+     break
+   }
+ }
+ if needsGlobal {
+   if !pmInit {
+     pmInterface, ok := clusterctx.PortManagerFrom(ctx)
+     if !ok {
+       return fmt.Errorf("missing PortManager in context - deployment manager must provide one")
+     }
+     var okType bool
+     portManager, okType = pmInterface.(cluster.PortManager)
+     if !okType {
+       return fmt.Errorf("invalid PortManager type in context")
+     }
+     var okLease bool
+     leaseID, okLease = clusterctx.LeaseIDFrom(ctx)
+     if !okLease {
+       return fmt.Errorf("missing lease ID in context - deployment manager must provide one")
+     }
+     pmInit = true
+   }
+   svc.globalService, err = builder.BuildService(workload, true, &leasePortAllocator{portManager, leaseID})
+   if err != nil {
+     return fmt.Errorf("failed to build global service: %w", err)
+   }
+ }

518-526: Only build a global service when at least one expose is Global.

Avoids requiring an allocator when a service has only local exposes.

cluster/kube/builder/builder_test.go (4)

113-118: Assert allocator is not invoked when no expose directives exist.

Strengthens the negative path expectation.

 import (
   "strconv"
   "testing"
@@
- // This test has no expose directives, so AllocatePorts should not be called
   mockAllocator := mocks.NewServicePortAllocator(t)
   serviceBuilder, err := BuildService(workload, true, mockAllocator)
   require.NoError(t, err)
   require.NotNil(t, serviceBuilder)
+ // Ensure allocator wasn't used
+ // import "github.com/stretchr/testify/mock" at top if not present
+ // mockAllocator.AssertNotCalled(t, "AllocatePorts", mock.Anything, mock.Anything)

221-225: Also assert allocator not called for local-only exposes.

 mockAllocator := mocks.NewServicePortAllocator(t)
 serviceBuilder, err := BuildService(workload, true, mockAllocator)
 require.NoError(t, err)
 
 // Should not have any work to do
 require.False(t, serviceBuilder.Any())
+// Ensure allocator wasn't used
+// mockAllocator.AssertNotCalled(t, "AllocatePorts", mock.Anything, mock.Anything)

295-301: Swap expected/actual in require.Equal for clarity.

Minor readability improvement across these assertions.

- require.Equal(t, result.Spec.Type, corev1.ServiceTypeNodePort)
+ require.Equal(t, corev1.ServiceTypeNodePort, result.Spec.Type)
@@
- require.Equal(t, ports[0].Port, int32(1001))
+ require.Equal(t, int32(1001), ports[0].Port)
- require.Equal(t, ports[0].TargetPort, intstr.FromInt(1000))
+ require.Equal(t, intstr.FromInt(1000), ports[0].TargetPort)
- require.Equal(t, ports[0].Name, "0-1001")
+ require.Equal(t, "0-1001", ports[0].Name)

410-411: Swap expected/actual in this equality too.

- require.Equal(t, result.Spec.Type, corev1.ServiceTypeClusterIP)
+ require.Equal(t, corev1.ServiceTypeClusterIP, result.Spec.Type)
cluster/ctx/context.go (2)

18-20: Document expected type for pm to avoid misuse.

Add a short comment that pm should implement cluster.PortManager (kept untyped to avoid import cycles).


12-15: Prefer non-string unique keys for context.

Use unexported zero-sized struct keys (vars) to further reduce collision risk.

-type contextKey string
-
-const (
-  portManagerKey contextKey = "port-manager"
-  leaseIDKey     contextKey = "lease-id"
-)
+type contextKey struct{}
+
+var (
+  portManagerKey = &contextKey{}
+  leaseIDKey     = &contextKey{}
+)
cluster/port_manager.go (2)

134-147: Avoid hard-coded 1000 cap when loading ports.

If a lease ever reserves >1000 ports, allocations will silently miss remaining ports. Consider deriving a bound from store state or iterating until sentinel with a higher safety cap and metric.

- maxIterations := 1000 // Defensive limit to prevent infinite loops
- for i := 0; i < maxIterations; i++ {
+ const maxIterations = 1 << 16 // defensive ceiling; revisit when store exposes counts
+ for i := 0; i < maxIterations; i++ {

113-115: Reduce log verbosity on hot path.

AllocatePorts is a frequent call; consider Debug level to prevent log noise.

- pm.log.Info("Allocated ports for service", "leaseID", leaseID, "service", serviceName, "ports", result)
+ pm.log.Debug("Allocated ports for service", "leaseID", leaseID, "service", serviceName, "ports", result)
cluster/port_manager_critical_test.go (1)

208-225: Good defensive programming with loop termination protection!

The test for infinite loop protection is valuable. However, the mock store implementation could be more realistic by actually implementing proper state management while simulating the misbehavior.

Consider enhancing the mock to properly track reserved ports in ReserveForOrder and PromoteOrderToLease while only misbehaving in NextForLease:

 func newMockInfiniteStore() *mockInfiniteStore {
     return &mockInfiniteStore{
         Store:   portreserve.NewStore(),
         counter: 0,
     }
 }

-func (m *mockInfiniteStore) NextForLease(leaseID mtypes.LeaseID) int32 {
-    m.counter++
-    return 30000 + m.counter // Always return a port, never 0
-}
-
-func (m *mockInfiniteStore) ReserveForOrder(id mtypes.OrderID, count int, ttl time.Duration) []int32 {
-    ports := make([]int32, count)
-    for i := 0; i < count; i++ {
-        ports[i] = 30000 + int32(i)
-    }
-    return ports
-}
-
-func (m *mockInfiniteStore) PromoteOrderToLease(oid mtypes.OrderID, lid mtypes.LeaseID, ttl time.Duration) {
-    // Do nothing, just to satisfy interface
-}
+func (m *mockInfiniteStore) NextForLease(leaseID mtypes.LeaseID) int32 {
+    m.counter++
+    return 30000 + m.counter // Always return a port, never 0
+}
cluster/inventory.go (1)

490-507: Consider adding metrics for port reservation failures

While the implementation correctly handles port reservations and rollbacks, there's no observability into port reservation failures or rollbacks.

Add Prometheus metrics to track port reservation outcomes:

+var (
+    portReservationCounter = promauto.NewCounterVec(prometheus.CounterOpts{
+        Name: "provider_port_reservations_total",
+        Help: "Total number of port reservation attempts",
+    }, []string{"status"}) // status: success, rollback
+)

 if randomPortCount > 0 {
     // Use centralized port manager for order operations
     is.portManager.ReserveForOrder(req.order, int(randomPortCount), portreserve.DefaultReservationTTL)
     is.log.Info("reserved NodePorts for random port endpoints", "order", req.order, "count", randomPortCount)
     portsReserved = true
+    portReservationCounter.WithLabelValues("success").Inc()
 }

 // Add the reservation to the list - if this fails, we need to rollback port reservation
 defer func() {
     if portsReserved && err != nil {
         // Rollback: release the reserved ports on any error
         is.portManager.ReleaseOrder(req.order)
         is.log.Info("rolled back NodePort reservation due to error", "order", req.order, "error", err)
+        portReservationCounter.WithLabelValues("rollback").Inc()
     }
 }()
cluster/kube/builder/service.go (2)

205-217: Make port names stable, not index-based.

Index i in the name makes updates brittle and ties retention to list order. Consider deriving names from stable attributes (e.g., externalPort and protocol).

Example:

-                sp := corev1.ServicePort{
-                    Name:       fmt.Sprintf("%d-%d", i, int(externalPort)),
+                sp := corev1.ServicePort{
+                    Name:       fmt.Sprintf("%d-%s", int(externalPort), strings.ToLower(string(exposeProtocol))),

Note: Add strings import. Keep under k8s’ 15-char svc port name limit; truncate if necessary.


151-152: Nit: error string style.

Prefer lower-case, no trailing punctuation for error vars.

-var errUnsupportedProtocol = errors.New("Unsupported protocol for service")
+var errUnsupportedProtocol = errors.New("unsupported protocol for service")
cluster/monitor_test.go (3)

155-183: Tighten expectations in TestMonitorInstantiate.

Assert mock expectations to detect unintended calls.

   monitor := newDeploymentMonitor(myDeploymentManager)
   require.NotNil(t, monitor)

   monitor.lc.Shutdown(nil)
+  client.AssertExpectations(t)

185-229: Avoid potential flakiness: add a timeout when waiting for the first event.

A miswire would deadlock the test. Use require.Eventually or a select with time.After.

-  ev := <-sub.Events()
+  var ev interface{}
+  select {
+  case ev = <-sub.Events():
+  case <-time.After(5 * time.Second):
+      t.Fatal("timeout waiting for first cluster event")
+  }

230-286: Mirror the same assertions in Deployed test.

Add client.AssertExpectations(t) and an event wait timeout as above for symmetry and robustness.

cluster/portreserve/store_critical_test.go (5)

146-176: Reduce TTL flakiness window.

Sleeping 150ms for a 100ms TTL may be tight under CI load. Increase margin or use Eventually.

- shortTTL := 100 * time.Millisecond
+ shortTTL := 100 * time.Millisecond
@@
- time.Sleep(150 * time.Millisecond)
+ require.Eventually(t, func() bool {
+   return len(store.PortsForOrder(orderID)) == 0
+ }, 2*time.Second, 20*time.Millisecond, "Ports should expire within TTL")

173-176: Don’t over-constrain reuse ordering.

Requiring newPorts[0] == ports[0] ties the test to allocation order internals. Assert reuse/overlap instead.

- require.Equal(t, ports[0], newPorts[0],
-   "Should reuse the first expired port (sequential allocation)")
+ require.True(t, containsAny(newPorts, ports),
+   "Should reuse expired ports when available")

Helper to add in this test file:

func containsAny(a, b []int32) bool {
    m := map[int32]struct{}{}
    for _, x := range b { m[x] = struct{}{} }
    for _, y := range a { if _, ok := m[y]; ok { return true } }
    return false
}

178-213: Cross-process test: verify persistence write path explicitly.

If persistence is lazy, calling loadFromFile() on store2 may read nothing. Ensure store1 flushed (or call loadFromFile on store1 before creating store2) to harden the test’s intent.

 ports1 := store1.ReserveForOrder(orderID, 3, 5*time.Minute)
 require.Len(t, ports1, 3, "Store1 should reserve ports")
+// Ensure data is on disk before the second store loads it
+store1.loadFromFile() // no-op if store writes eagerly; ensures file exists

256-278: Exercise the actual recovery path.

Call loadFromFile() before operations so the code reads the corrupted file, then proceed. This validates recovery instead of bypassing it.

 store := NewStore()
 store.storageFile = storageFile
-// Should not crash on corrupted file, should continue working
+// Should not crash on corrupted file, should continue working
+store.loadFromFile()

76-97: Exhaustion test: also assert no duplicates on reuse.

After ReleaseOrder and new reservation, add a quick collision check to encode the invariant.

 ports3 := store.ReserveForOrder(order2, 3, 5*time.Minute)
 require.Len(t, ports3, 3, "Should get ports after release")
+for _, p := range ports3 {
+  require.NotContains(t, ports1, p, "Reused ports should only be available after release")
+}
cluster/port_integration_critical_test.go (1)

254-257: Clarify SetPortRange end-point inclusivity and assert it in tests.

Allocator loop uses p <= defaultEnd, so end is inclusive. Consider asserting expected first/last ports to catch regressions.

cluster/portreserve/store.go (3)

259-277: Loading from disk on every query may be costly and can reintroduce stale state.

Consider caching with a short freshness window, or only loading once at startup plus on explicit cleanup. If you keep loading here, ensure reconciliation (see prior comment).


281-320: Replace fmt.Printf debug logs with the project logger.

Use a logger passed in (or inject a logger into Store) and gate debug output; fmt.Printf is noisy and hard to control in production.


380-387: Document inclusivity of range and validate inputs.

Add a guard to ensure start <= end and both within NodePort range; return early otherwise.

Example:

 func (s *Store) SetPortRange(start, end int32) {
   s.mu.Lock()
   defer s.mu.Unlock()
+  if start > end {
+    start, end = end, start
+  }
   s.defaultStart = start
   s.defaultEnd = end
 }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 6ddc7da and b521ef3.

📒 Files selected for processing (16)
  • cluster/ctx/context.go (1 hunks)
  • cluster/inventory.go (7 hunks)
  • cluster/inventory_test.go (4 hunks)
  • cluster/kube/builder/builder_test.go (8 hunks)
  • cluster/kube/builder/service.go (3 hunks)
  • cluster/kube/client.go (4 hunks)
  • cluster/manager.go (4 hunks)
  • cluster/mocks/cluster.go (1 hunks)
  • cluster/mocks/service.go (2 hunks)
  • cluster/monitor_test.go (3 hunks)
  • cluster/port_integration_critical_test.go (1 hunks)
  • cluster/port_manager.go (1 hunks)
  • cluster/port_manager_critical_test.go (1 hunks)
  • cluster/portreserve/store.go (1 hunks)
  • cluster/portreserve/store_critical_test.go (1 hunks)
  • gateway/rest/router.go (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • cluster/mocks/cluster.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • gateway/rest/router.go
  • cluster/manager.go
🧰 Additional context used
🧬 Code graph analysis (11)
cluster/port_manager_critical_test.go (2)
cluster/portreserve/store.go (2)
  • NewStore (64-72)
  • Store (49-56)
cluster/port_manager.go (1)
  • NewPortManager (46-52)
cluster/kube/builder/service.go (4)
cluster/kube/builder/mocks/service_port_allocator.go (1)
  • ServicePortAllocator (8-10)
cluster/service.go (1)
  • Service (99-109)
service.go (1)
  • Service (57-62)
cluster/kube/builder/workload.go (1)
  • Workload (32-38)
cluster/kube/client.go (3)
cluster/ctx/context.go (2)
  • PortManagerFrom (23-29)
  • LeaseIDFrom (37-40)
cluster/port_manager.go (1)
  • PortManager (14-28)
cluster/kube/builder/service.go (1)
  • BuildService (29-43)
cluster/inventory.go (2)
cluster/port_manager.go (2)
  • PortManager (14-28)
  • NewPortManager (46-52)
cluster/portreserve/store.go (2)
  • GetSharedStore (76-83)
  • DefaultReservationTTL (18-18)
cluster/monitor_test.go (5)
pkg/apis/akash.network/v2beta2/types.go (1)
  • LeaseID (49-55)
cluster/types/v1beta3/types.go (3)
  • EventsWatcher (72-77)
  • ServiceLog (31-35)
  • ExecResult (117-119)
pkg/apis/akash.network/v2beta2/manifest.go (1)
  • ManifestGroup (63-68)
pkg/apis/akash.network/v2beta2/provider_lease_ip.go (1)
  • ProviderLeasedIPSpec (30-37)
cluster/types/v1beta3/deployment.go (1)
  • Deployment (18-23)
cluster/inventory_test.go (3)
cluster/client.go (1)
  • Client (66-100)
cluster/mocks/client.go (1)
  • Client (32-34)
mocks/client.go (1)
  • Client (26-28)
cluster/portreserve/store_critical_test.go (1)
cluster/portreserve/store.go (1)
  • NewStore (64-72)
cluster/port_manager.go (1)
cluster/portreserve/store.go (1)
  • Store (49-56)
cluster/kube/builder/builder_test.go (2)
cluster/kube/builder/mocks/service_port_allocator.go (1)
  • NewServicePortAllocator (71-81)
cluster/kube/builder/service.go (1)
  • BuildService (29-43)
cluster/port_integration_critical_test.go (2)
cluster/portreserve/store.go (3)
  • GetSharedStore (76-83)
  • DefaultReservationTTL (18-18)
  • NewStore (64-72)
cluster/port_manager.go (1)
  • NewPortManager (46-52)
cluster/mocks/service.go (3)
cluster/service.go (1)
  • Service (99-109)
service.go (1)
  • Service (57-62)
cluster/port_manager.go (1)
  • PortManager (14-28)
🔇 Additional comments (14)
cluster/inventory_test.go (3)

117-118: Decoupling tests from cluster client looks good.

Passing nil here removes an unnecessary dependency and simplifies the scaffold.


213-215: LGTM: nil cluster client usage is consistent.

Keeps the tests focused on inventory logic.


278-279: Type change improves flexibility; ensure nil is handled.

Changing clusterClient to the interface type is a good move. Please confirm newInventoryService gracefully handles a nil client (no dereference without nil checks).

cluster/kube/client.go (2)

35-35: Correct import of typed context helpers.

The dedicated package removes stringly-typed context keys scattered across the codebase.


1170-1178: Adapter implementation is correct and minimal.

Bridges PortManager to the builder allocator cleanly.

cluster/kube/builder/builder_test.go (1)

349-351: Local service builder usage is correct.

No allocator required; assertions are sufficient.

cluster/port_manager_critical_test.go (2)

15-55: Excellent test coverage for the critical user requirement!

This test thoroughly validates the consistency between proposed ports during bidding and deployed ports during lease execution, which directly addresses the core objective from issue #234. The sequential validation of each phase (reservation → query → promotion → allocation) ensures the critical invariant is maintained.


170-204: Comprehensive TTL-based cleanup test!

The test effectively validates the cleanup mechanism, ensuring expired reservations are removed while valid ones persist. This is crucial for preventing port exhaustion over time.

cluster/mocks/service.go (2)

1-1: Mockery version updated to v2.53.5

The mock generator has been updated from v2.52.2 to v2.53.5.


252-297: Correctly generated mock implementation for PortManager accessor

The mock implementation properly supports the new PortManager() method with typed expectation helpers, enabling comprehensive testing of port management functionality.

cluster/inventory.go (2)

176-195: Well-implemented periodic cleanup mechanism

The cleanup goroutine properly handles both context cancellation and service shutdown, preventing goroutine leaks. The minute interval is reasonable for TTL-based cleanup.


170-174: Good addition of PortManager accessor method

The public accessor method follows Go conventions and enables external components to interact with the centralized port management system.

cluster/monitor_test.go (1)

28-153: Test mock surface looks solid.

The mock implements the required client methods with correct signatures; type assertions align. No concerns.

cluster/port_integration_critical_test.go (1)

169-219: No additional locking needed—thread-safety verified
portManager’s leaseState map is consistently protected by pm.mu (sync.RWMutex) in AllocatePorts, ensureLeaseState, ensureLeaseStateLocked, ReleaseLease, and Stats; ReserveForOrder/PromoteOrderToLease delegate to the underlying store. The concurrency test can run as is.

Comment on lines 141 to 143
// Create centralized port manager with shared store
is.portManager = NewPortManager(is.log, portreserve.GetSharedStore())


🛠️ Refactor suggestion

Consider validating port range configuration

The port manager is initialized with a shared store, but there's no validation of the port range configuration. Invalid ranges could cause issues during deployment.

Add validation to ensure the port range is properly configured:

 // Create centralized port manager with shared store
-is.portManager = NewPortManager(is.log, portreserve.GetSharedStore())
+store := portreserve.GetSharedStore()
+// Validate port range configuration
+if config.NodePortMin > 0 && config.NodePortMax > config.NodePortMin {
+    store.SetPortRange(int32(config.NodePortMin), int32(config.NodePortMax))
+}
+is.portManager = NewPortManager(is.log, store)
🤖 Prompt for AI Agents
In cluster/inventory.go around lines 141 to 143, validate the configured port
range before instantiating the PortManager: read the configured min/max ports
(from config or env), ensure they are integers, min <= max, and both within
acceptable bounds (e.g. 1-65535); log and return an error (or fallback to a sane
default) if validation fails, and pass the validated range into NewPortManager
(or call a PortManager constructor that accepts the validated range) so the
PortManager is never created with an invalid range.

Comment on lines 175 to 179
	// Allocate all needed ports at once
	if nodePortCount > 0 {
		allocatedPorts = b.portAllocator.AllocatePorts(service.Name, nodePortCount)
	}
}

🛠️ Refactor suggestion

Enforce full pre-allocation when NodePorts are required.

If AllocatePorts returns fewer ports than requested, we silently fall back to Kubernetes auto-allocation, breaking the “use reserved ports if a lease is created” guarantee and producing mismatches with proposed ports. Fail fast instead.

Apply this diff to validate the allocation and surface an actionable error:

 func (b *service) ports() ([]corev1.ServicePort, error) {
@@
-        // Allocate all needed ports at once
-        if nodePortCount > 0 {
-            allocatedPorts = b.portAllocator.AllocatePorts(service.Name, nodePortCount)
-        }
+        // Allocate all needed ports at once
+        if nodePortCount > 0 {
+            allocatedPorts = b.portAllocator.AllocatePorts(service.Name, nodePortCount)
+            if len(allocatedPorts) != nodePortCount {
+                return nil, fmt.Errorf("allocated %d NodePorts, expected %d", len(allocatedPorts), nodePortCount)
+            }
+        }
@@
-                // Assign NodePort using allocator
-                if b.requireNodePort && len(allocatedPorts) > allocatedIndex {
-                    sp.NodePort = allocatedPorts[allocatedIndex]
-                    allocatedIndex++
-                }
+                // Assign NodePort using allocator (allocation count validated above)
+                if b.requireNodePort {
+                    sp.NodePort = allocatedPorts[allocatedIndex]
+                    allocatedIndex++
+                }

Also applies to: 212-216

🤖 Prompt for AI Agents
In cluster/kube/builder/service.go around lines 175-179 (and also apply the same
change at 212-216), the code calls b.portAllocator.AllocatePorts(service.Name,
nodePortCount) but does not verify that the allocator returned the requested
number of ports; if fewer ports are returned we should fail fast instead of
letting Kubernetes auto-allocate. After calling AllocatePorts, validate that
len(allocatedPorts) == nodePortCount and if not return an actionable error
(including service.Name, requested count and actual count and any allocator
error) from the function so the caller can handle the failure rather than
proceeding with partial allocation.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
cluster/inventory.go (1)

661-687: Leak: release reserved ports on unreserve

When a bid is lost/unreserved, release order-scoped reservations immediately to free capacity.

       is.log.Info("removing reservation", "order", res.OrderID())
 
       state.reservations = append(state.reservations[:idx], state.reservations[idx+1:]...)
       // reclaim availableExternalPorts if unreserving allocated resources
       if res.allocated {
         is.availableExternalPorts += reservationCountEndpoints(res)
       }
 
+      // Release any NodePort reservations tied to this order
+      is.portManager.ReleaseOrder(req.order)
+      is.log.Info("released NodePort reservations for order", "order", req.order)
+
       req.ch <- inventoryResponse{value: res}
♻️ Duplicate comments (2)
cluster/kube/builder/service.go (1)

215-221: Remove redundant length guard and assert full consumption

You already validated len(allocatedPorts) == nodePortCount. Make assignment unconditional and assert full consumption to catch logic regressions.

-                // Assign NodePort using allocator
-                if b.requireNodePort && len(allocatedPorts) > allocatedIndex {
-                    sp.NodePort = allocatedPorts[allocatedIndex]
-                    allocatedIndex++
-                }
+                // Assign NodePort using allocator (allocation count already validated)
+                if b.requireNodePort {
+                    sp.NodePort = allocatedPorts[allocatedIndex]
+                    allocatedIndex++
+                }

Optionally, after the loop:

+    if b.requireNodePort && allocatedIndex != len(allocatedPorts) {
+        return nil, fmt.Errorf("internal error: consumed %d of %d allocated ports", allocatedIndex, len(allocatedPorts))
+    }
cluster/portreserve/store.go (1)

238-277: Avoid partial reservations (fail fast) to prevent deploy-time mismatch

Persisting fewer than requested ports can lead to builder errors after promotion. Only commit when fully satisfied.

   for p <= s.defaultEnd && len(ports) < count {
     if !s.inUseLocked(p) {
       ports = append(ports, p)
     }
     p++
   }
 
-  res := &reservation{Ports: ports, Next: 0, Expires: time.Now().Add(ttl)}
-  s.byOrder[ok] = res
+  // Ensure we satisfied the request; otherwise, do not persist a partial reservation.
+  if len(ports) < count {
+    return nil
+  }
+  res := &reservation{Ports: ports, Next: 0, Expires: time.Now().Add(ttl)}
+  s.byOrder[ok] = res
🧹 Nitpick comments (8)
cluster/kube/builder/service.go (2)

45-60: Allocator interface can’t surface errors

Consider returning ([]int32, error) to propagate allocation failures (e.g., range exhausted) instead of encoding them as length mismatches at call sites.
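
A sketch of what that signature change could look like; the interface name and the error cases are assumptions, not the PR's current API:

// PortAllocator sketches the error-returning shape suggested above.
type PortAllocator interface {
    // AllocatePorts returns exactly count NodePorts for the named service, or
    // an error such as range exhaustion; callers no longer infer failure from
    // a short slice.
    AllocatePorts(serviceName string, count int) ([]int32, error)
}

Call sites in ports() would then wrap the returned error with the service name and requested count instead of comparing slice lengths.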


159-183: Pre-allocation logic: dedup + count validation looks right

Unique counting and the strict length check prevent drift between proposed and assigned ports. Minor: if the current time is needed repeatedly, compute it once into a local and reuse it (not urgent here).

cluster/portreserve/store.go (4)

99-163: Atomic, permissioned writes: nice hardening; add dir fsync

Add a directory fsync after rename to ensure durability across crashes.

 if err := os.Rename(tempPath, s.storageFile); err != nil {
   os.Remove(tempPath)
   return fmt.Errorf("failed to rename temp file to target: %w", err)
 }
+
+// Ensure directory entry is flushed
+df, err := os.Open(dir)
+if err == nil {
+  _ = df.Sync()
+  _ = df.Close()
+}

279-297: Holding lock while reading disk

PortsForOrder calls loadFromFile under s.mu. Consider loading into a temp struct without the lock, then merging under lock to reduce critical-section time. Not urgent.
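
A generic sketch of that load-then-merge shape; the snapshot type and loader are placeholders rather than the store's real internals:

package portsketch

import "sync"

type snapshot map[string][]int32

// refresh performs file I/O via load with no lock held, then merges the result
// under the mutex, so the critical section is only an in-memory copy.
func refresh(mu *sync.Mutex, dst map[string][]int32, load func() (snapshot, error)) error {
    snap, err := load()
    if err != nil {
        return err
    }

    mu.Lock()
    defer mu.Unlock()
    for k, v := range snap {
        if _, ok := dst[k]; !ok {
            dst[k] = v
        }
    }
    return nil
}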


299-322: Use logger instead of fmt.Printf

These debug prints bypass structured logging. Consider injecting a logger or using a lightweight log func.
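
One lightweight option, sketched with illustrative names (not the store's actual fields):

// logFunc is a minimal injection point; a nil value keeps the store silent.
type logFunc func(msg string, keyvals ...interface{})

type fileBackedStore struct {
    logf logFunc // callers can pass a provider logger adapter or a test logger
}

func (s *fileBackedStore) debugf(msg string, keyvals ...interface{}) {
    if s.logf != nil {
        s.logf(msg, keyvals...)
    }
}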


405-411: Validate port range on SetPortRange

Guard against invalid ranges.

 func (s *Store) SetPortRange(start, end int32) {
   s.mu.Lock()
   defer s.mu.Unlock()
-  s.defaultStart = start
-  s.defaultEnd = end
+  if start < 1 || end > 65535 || start > end {
+    return
+  }
+  s.defaultStart = start
+  s.defaultEnd = end
 }
cluster/inventory.go (2)

141-163: Port range: wire from config when available

Defaults are fine for now. When config lands, plumb min/max from config with validation here.


501-529: Deferred rollback never triggers (err is nil here and never reassigned)

The defer checks err, which won’t change after Adjust succeeds; it will never roll back. Either remove the defer or switch to a named return error and set it on all exit paths.

- // Add the reservation to the list - if this fails, we need to rollback port reservation
- defer func() {
-   if portsReserved && err != nil {
-     // Rollback: release the reserved ports on any error
-     is.portManager.ReleaseOrder(req.order)
-     is.log.Info("rolled back NodePort reservation due to error", "order", req.order, "error", err)
-   }
- }()
+ // No later error paths modify state; defer not needed. Remove to avoid confusion.
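
If the rollback is kept instead, the named-return variant would look roughly like this generic sketch, where reserve, release, and laterStep stand in for the real calls:

// reserveWithRollback shows the named-return pattern: the deferred check reads
// the final value of err, so any later failure releases the reservation.
func reserveWithRollback(reserve func() error, release func(), laterStep func() error) (err error) {
    if err = reserve(); err != nil {
        return err
    }
    defer func() {
        if err != nil {
            release()
        }
    }()

    // Later failure paths must assign to err (no := shadowing) for the defer to fire.
    if err = laterStep(); err != nil {
        return err
    }
    return nil
}
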
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between b521ef3 and a19f356.

📒 Files selected for processing (4)
  • cluster/inventory.go (7 hunks)
  • cluster/kube/builder/service.go (3 hunks)
  • cluster/port_integration_critical_test.go (1 hunks)
  • cluster/portreserve/store.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • cluster/port_integration_critical_test.go
🧰 Additional context used
🧬 Code graph analysis (2)
cluster/inventory.go (2)
cluster/port_manager.go (2)
  • PortManager (14-28)
  • NewPortManager (46-52)
cluster/portreserve/store.go (2)
  • GetSharedStore (81-88)
  • DefaultReservationTTL (18-18)
cluster/kube/builder/service.go (4)
cluster/service.go (1)
  • Service (99-109)
service.go (1)
  • Service (57-62)
cluster/kube/builder/workload.go (1)
  • Workload (32-38)
pkg/apis/akash.network/v2beta2/manifest.go (1)
  • ManifestGroup (63-68)
🔇 Additional comments (4)
cluster/kube/builder/service.go (2)

29-33: Good: return error instead of panic

BuildService now fails fast and propagates an actionable error. LGTM.


107-125: NodePort retention on Update is correct

Matching on name/port/target/proto to preserve NodePort is sound.
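
For context, a generic sketch of that matching rule (not the builder's actual code):

package portsketch

import corev1 "k8s.io/api/core/v1"

// retainNodePorts copies an already-assigned NodePort onto the desired spec
// when name, port, target port, and protocol all match.
func retainNodePorts(existing, desired []corev1.ServicePort) []corev1.ServicePort {
    for i := range desired {
        for _, old := range existing {
            if old.Name == desired[i].Name &&
                old.Port == desired[i].Port &&
                old.TargetPort == desired[i].TargetPort &&
                old.Protocol == desired[i].Protocol {
                desired[i].NodePort = old.NodePort
                break
            }
        }
    }
    return desired
}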

cluster/portreserve/store.go (1)

63-77: Testability improvement: constructor with file path

NewStoreWithFile enables isolated tests. Good addition.
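
Typical isolated-test usage might look like the sketch below, assuming the constructor takes a file path and returns a *Store; the import path and temp filename are assumptions:

package portreserve_test

import (
    "path/filepath"
    "testing"

    "github.com/akash-network/provider/cluster/portreserve"
)

func TestStoreIsolated(t *testing.T) {
    path := filepath.Join(t.TempDir(), "reservations.json") // arbitrary temp file
    store := portreserve.NewStoreWithFile(path)
    store.SetPortRange(30000, 30010)

    // Exercise reserve/promote/release against this store without touching the
    // shared singleton the provider uses at runtime.
}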

cluster/inventory.go (1)

197-216: Background cleanup loop

Periodic CleanupExpired is a good safeguard; logging only when more than zero reservations were cleaned keeps noise low. LGTM.

