Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions _run/common-commands.mk
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ provider-lease-status:
--provider "$(PROVIDER_ADDRESS)" \
--auth-type=$(GW_AUTH_TYPE)

.PHONY: provider-bid-proposed-ports
provider-bid-proposed-ports:
curl -sk "https://localhost:8443/bid-proposed-ports/$(KEY_ADDRESS)/$(DSEQ)/$(GSEQ)/$(OSEQ)"


.PHONY: provider-service-status
provider-service-status:
$(PROVIDER_SERVICES) lease-status \
Expand Down
12 changes: 2 additions & 10 deletions _run/kube/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,14 @@ services:
image: quay.io/ovrclk/demo-app
expose:
- port: 80
as: 80
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this diff to show an example of the deployment I used to create a NodePort for random port assignments

http_options:
max_body_size: 2097152
next_cases:
- off
accept:
- hello.localhost
as: 8080
to:
- global: true
bew:
image: quay.io/ovrclk/demo-app
expose:
- port: 80
as: 80
accept:
- hello1.localhost
as: 8081
to:
- global: true

Expand Down
40 changes: 40 additions & 0 deletions cluster/ctx/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ctx

import (
"context"

mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4"
)

// contextKey is an unexported string type used for context value keys.
// Using a dedicated type (rather than a bare string) prevents collisions
// with context keys defined by other packages, per the context package docs.
type contextKey string

const (
	// portManagerKey stores the cluster port manager (see WithPortManager).
	portManagerKey contextKey = "port-manager"
	// leaseIDKey stores a market lease ID (see WithLeaseID).
	leaseIDKey contextKey = "lease-id"
)

// WithPortManager returns a copy of ctx that carries pm, the cluster's
// port manager. The value is stored as `any` — presumably to avoid an
// import cycle with the package declaring the concrete PortManager type;
// TODO confirm. Retrieve it with PortManagerFrom.
func WithPortManager(ctx context.Context, pm any) context.Context {
	return context.WithValue(ctx, portManagerKey, pm)
}

// PortManagerFrom retrieves the port manager stored by WithPortManager.
// The boolean result reports whether a value was present; callers are
// expected to type-assert the returned value to the concrete PortManager.
func PortManagerFrom(ctx context.Context) (any, bool) {
	pm := ctx.Value(portManagerKey)
	if pm == nil {
		return nil, false
	}
	return pm, true
}

// WithLeaseID returns a copy of ctx carrying the given market lease ID,
// retrievable later via LeaseIDFrom.
func WithLeaseID(ctx context.Context, lid mtypes.LeaseID) context.Context {
	return context.WithValue(ctx, leaseIDKey, lid)
}

// LeaseIDFrom extracts a lease ID previously attached with WithLeaseID.
// The boolean result is false when no lease ID is present in ctx (or the
// stored value has an unexpected type).
func LeaseIDFrom(ctx context.Context) (mtypes.LeaseID, bool) {
	v := ctx.Value(leaseIDKey)
	lid, ok := v.(mtypes.LeaseID)
	return lid, ok
}
76 changes: 75 additions & 1 deletion cluster/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
sdlutil "github.com/akash-network/node/sdl/util"
"github.com/akash-network/node/util/runner"

"github.com/akash-network/provider/cluster/portreserve"
ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
cinventory "github.com/akash-network/provider/cluster/types/v1beta3/clients/inventory"
cip "github.com/akash-network/provider/cluster/types/v1beta3/clients/ip"
Expand Down Expand Up @@ -99,6 +100,7 @@ type inventoryService struct {
lc lifecycle.Lifecycle
waiter waiter.OperatorWaiter
availableExternalPorts uint
portManager PortManager // Centralized port allocation management

clients struct {
ip cip.Client
Expand Down Expand Up @@ -136,6 +138,30 @@ func newInventoryService(
waiter: waiter,
}

// Create centralized port manager with shared store and validated port range
store := portreserve.GetSharedStore()

// Validate port range configuration
minPort := int32(30100) // Default minimum
maxPort := int32(32767) // Default maximum

// TODO: Read from config when port range configuration is added
// For now, use safe defaults

if minPort < 1 || minPort > 65535 {
return nil, fmt.Errorf("invalid minimum port %d: must be between 1-65535", minPort)
}
if maxPort < 1 || maxPort > 65535 {
return nil, fmt.Errorf("invalid maximum port %d: must be between 1-65535", maxPort)
}
if minPort > maxPort {
return nil, fmt.Errorf("invalid port range: minimum port %d > maximum port %d", minPort, maxPort)
}

// Set validated port range on store
store.SetPortRange(minPort, maxPort)
is.portManager = NewPortManager(is.log, store)

is.clients.inventory = cfromctx.ClientInventoryFromContext(ctx)
is.clients.ip = cfromctx.ClientIPFromContext(ctx)

Expand All @@ -149,6 +175,7 @@ func newInventoryService(

go is.lc.WatchChannel(ctx.Done())
go is.run(ctx, reservations)
go is.startPortCleanup(ctx)

return is, nil
}
Expand All @@ -161,6 +188,33 @@ func (is *inventoryService) ready() <-chan struct{} {
return is.readych
}

// PortManager returns the inventory service's centralized port manager,
// created during newInventoryService and used for NodePort reservations.
func (is *inventoryService) PortManager() PortManager {
	return is.portManager
}

// startPortCleanup runs periodic cleanup of expired port reservations.
func (is *inventoryService) startPortCleanup(ctx context.Context) {
ticker := time.NewTicker(time.Minute) // Cleanup every minute
defer ticker.Stop()

for {
select {
case <-ticker.C:
cleaned := is.portManager.CleanupExpired()
if cleaned > 0 {
is.log.Info("cleaned up expired port reservations", "count", cleaned)
}
case <-ctx.Done():
is.log.Info("port cleanup stopped")
return
case <-is.lc.ShuttingDown():
is.log.Info("port cleanup stopped due to service shutdown")
return
}
}
}

func (is *inventoryService) lookup(order mtypes.OrderID, resources dtypes.ResourceGroup) (ctypes.Reservation, error) {
ch := make(chan inventoryResponse, 1)
req := inventoryRequest{
Expand Down Expand Up @@ -444,6 +498,7 @@ func (is *inventoryService) handleRequest(req inventoryRequest, state *inventory
reservation.ipsConfirmed = true // No IPs, just mark it as confirmed implicitly
}

// First, check if we have sufficient capacity before reserving any resources
err := state.inventory.Adjust(reservation)
if err != nil {
is.log.Info("insufficient capacity for reservation", "order", req.order)
Expand All @@ -452,7 +507,26 @@ func (is *inventoryService) handleRequest(req inventoryRequest, state *inventory
return
}

// Add the reservation to the list
// Only reserve NodePorts after successful capacity check
randomPortCount := reservationCountEndpoints(reservation)
var portsReserved bool

if randomPortCount > 0 {
// Use centralized port manager for order operations
is.portManager.ReserveForOrder(req.order, int(randomPortCount), portreserve.DefaultReservationTTL)
is.log.Info("reserved NodePorts for random port endpoints", "order", req.order, "count", randomPortCount)
portsReserved = true
}

// Add the reservation to the list - if this fails, we need to rollback port reservation
defer func() {
if portsReserved && err != nil {
// Rollback: release the reserved ports on any error
is.portManager.ReleaseOrder(req.order)
is.log.Info("rolled back NodePort reservation due to error", "order", req.order, "error", err)
}
}()

state.reservations = append(state.reservations, reservation)
req.ch <- inventoryResponse{value: reservation}
inventoryRequestsCounter.WithLabelValues("reserve", "create").Inc()
Expand Down
15 changes: 4 additions & 11 deletions cluster/inventory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/akash-network/node/pubsub"
"github.com/akash-network/node/testutil"

"github.com/akash-network/provider/cluster/mocks"
ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
cinventory "github.com/akash-network/provider/cluster/types/v1beta3/clients/inventory"
cip "github.com/akash-network/provider/cluster/types/v1beta3/clients/ip"
Expand Down Expand Up @@ -100,8 +99,6 @@ func TestInventory_ClusterDeploymentNotDeployed(t *testing.T) {

deployments := make([]ctypes.IDeployment, 0)

clusterClient := &mocks.Client{}

ctx, cancel := context.WithCancel(context.Background())
ctx = context.WithValue(ctx, fromctx.CtxKeyPubSub, tpubsub.New(ctx, 1000))

Expand All @@ -117,7 +114,7 @@ func TestInventory_ClusterDeploymentNotDeployed(t *testing.T) {
config,
myLog,
subscriber,
clusterClient,
nil,
waiter.NewNullWaiter(), // Do not need to wait in test
deployments)
require.NoError(t, err)
Expand Down Expand Up @@ -192,8 +189,6 @@ func TestInventory_ClusterDeploymentDeployed(t *testing.T) {

deployments[0] = deployment

clusterClient := &mocks.Client{}

// clusterInv := newInventory("nodeA")
//
// inventoryCalled := make(chan int, 1)
Expand All @@ -215,7 +210,7 @@ func TestInventory_ClusterDeploymentDeployed(t *testing.T) {
config,
myLog,
subscriber,
clusterClient,
nil,
waiter.NewNullWaiter(), // Do not need to wait in test
deployments)
require.NoError(t, err)
Expand Down Expand Up @@ -280,7 +275,7 @@ type inventoryScaffold struct {
donech chan struct{}
// inventoryCalled chan struct{}
bus pubsub.Bus
clusterClient *mocks.Client
clusterClient Client
}

func makeInventoryScaffold(t *testing.T, leaseQty uint) *inventoryScaffold {
Expand Down Expand Up @@ -333,9 +328,7 @@ func makeInventoryScaffold(t *testing.T, leaseQty uint) *inventoryScaffold {
Resources: deploymentRequirements,
}

cclient := &mocks.Client{}

scaffold.clusterClient = cclient
scaffold.clusterClient = nil

return scaffold
}
Expand Down
28 changes: 22 additions & 6 deletions cluster/kube/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/akash-network/provider/cluster/kube/builder/mocks"
crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
)

Expand Down Expand Up @@ -109,7 +110,11 @@ func TestGlobalServiceBuilder(t *testing.T) {

workload, err := NewWorkloadBuilder(myLog, mySettings, cdep, mani, 0)
require.NoError(t, err)
serviceBuilder := BuildService(workload, true)

// This test has no expose directives, so AllocatePorts should not be called
mockAllocator := mocks.NewServicePortAllocator(t)
serviceBuilder, err := BuildService(workload, true, mockAllocator)
require.NoError(t, err)
require.NotNil(t, serviceBuilder)
// Should have name ending with suffix
require.Equal(t, "myservice-np", serviceBuilder.Name())
Expand Down Expand Up @@ -158,7 +163,8 @@ func TestLocalServiceBuilder(t *testing.T) {
workload, err := NewWorkloadBuilder(myLog, mySettings, cdep, mani, 0)
require.NoError(t, err)

serviceBuilder := BuildService(workload, false)
serviceBuilder, err := BuildService(workload, false, nil)
require.NoError(t, err)
require.NotNil(t, serviceBuilder)
// Should have name verbatim
require.Equal(t, "myservice", serviceBuilder.Name())
Expand Down Expand Up @@ -212,7 +218,10 @@ func TestGlobalServiceBuilderWithoutGlobalServices(t *testing.T) {
workload, err := NewWorkloadBuilder(myLog, mySettings, cdep, mani, 0)
require.NoError(t, err)

serviceBuilder := BuildService(workload, true)
// This test has expose directives but only local services (Global: false), so AllocatePorts should not be called for global service builder
mockAllocator := mocks.NewServicePortAllocator(t)
serviceBuilder, err := BuildService(workload, true, mockAllocator)
require.NoError(t, err)

// Should not have any work to do
require.False(t, serviceBuilder.Any())
Expand Down Expand Up @@ -272,7 +281,11 @@ func TestGlobalServiceBuilderWithGlobalServices(t *testing.T) {
workload, err := NewWorkloadBuilder(myLog, mySettings, cdep, mani, 0)
require.NoError(t, err)

serviceBuilder := BuildService(workload, true)
// This test has one global service, so AllocatePorts should be called with count=1
mockAllocator := mocks.NewServicePortAllocator(t)
mockAllocator.EXPECT().AllocatePorts("myservice", 1).Return([]int32{30000})
serviceBuilder, err := BuildService(workload, true, mockAllocator)
require.NoError(t, err)

// Should have work to do
require.True(t, serviceBuilder.Any())
Expand All @@ -285,6 +298,7 @@ func TestGlobalServiceBuilderWithGlobalServices(t *testing.T) {
require.Equal(t, ports[0].Port, int32(1001))
require.Equal(t, ports[0].TargetPort, intstr.FromInt(1000))
require.Equal(t, ports[0].Name, "0-1001")
require.Equal(t, int32(30000), ports[0].NodePort)
}

func TestLocalServiceBuilderWithoutLocalServices(t *testing.T) {
Expand Down Expand Up @@ -332,7 +346,8 @@ func TestLocalServiceBuilderWithoutLocalServices(t *testing.T) {
workload, err := NewWorkloadBuilder(myLog, mySettings, cdep, mani, 0)
require.NoError(t, err)

serviceBuilder := BuildService(workload, false)
serviceBuilder, err := BuildService(workload, false, nil)
require.NoError(t, err)

// Should have work to do
require.False(t, serviceBuilder.Any())
Expand Down Expand Up @@ -392,7 +407,8 @@ func TestLocalServiceBuilderWithLocalServices(t *testing.T) {
workload, err := NewWorkloadBuilder(myLog, mySettings, cdep, mani, 0)
require.NoError(t, err)

serviceBuilder := BuildService(workload, false)
serviceBuilder, err := BuildService(workload, false, nil)
require.NoError(t, err)

// Should have work to do
require.True(t, serviceBuilder.Any())
Expand Down
Loading