diff --git a/cluster/kube/builder/builder.go b/cluster/kube/builder/builder.go
index 682b19620..e86a51f67 100644
--- a/cluster/kube/builder/builder.go
+++ b/cluster/kube/builder/builder.go
@@ -57,6 +57,8 @@ const (
 	envVarAkashOwner                 = "AKASH_OWNER"
 	envVarAkashProvider              = "AKASH_PROVIDER"
 	envVarAkashClusterPublicHostname = "AKASH_CLUSTER_PUBLIC_HOSTNAME"
+	envVarAkashIngressHostname       = "AKASH_INGRESS_HOST"
+	envVarAkashIngressCustomHostname = "AKASH_INGRESS_CUSTOM_HOST"
 )
 
 var (
diff --git a/cluster/kube/builder/config.go b/cluster/kube/builder/config.go
new file mode 100644
index 000000000..14d35ed8e
--- /dev/null
+++ b/cluster/kube/builder/config.go
@@ -0,0 +1,125 @@
+package builder
+
+const (
+	// Config system constants
+	AkashConfigVolume   = "akash-cfg"
+	AkashConfigMount    = "/akash-cfg"
+	AkashConfigInitName = "akash-init"
+	AkashConfigEnvFile  = "config.env"
+
+	// RBAC constants
+	AkashRoleName    = "akash-role"
+	AkashRoleBinding = "akash-binding"
+
+	// Init container script
+	akashInitScript = `
+# Install jq and curl
+apk add --no-cache jq curl >/dev/null 2>&1
+
+# Define default paths if not set
+AKASH_CONFIG_PATH="${AKASH_CONFIG_PATH:-/akash/config}"
+AKASH_CONFIG_FILE="${AKASH_CONFIG_FILE:-env.sh}"
+
+# Validate paths
+[ "$AKASH_CONFIG_PATH" = "/" ] && AKASH_CONFIG_PATH="/tmp/akash"
+AKASH_CONFIG_PATH="${AKASH_CONFIG_PATH%/}"
+
+# Create config directory if it doesn't exist
+mkdir -p "${AKASH_CONFIG_PATH}"
+
+if [ "$AKASH_REQUIRES_NODEPORT" != "true" ]; then
+    touch "${AKASH_CONFIG_PATH}/${AKASH_CONFIG_FILE}"
+    echo "# No NodePorts required" >> "${AKASH_CONFIG_PATH}/${AKASH_CONFIG_FILE}"
+    exit 0
+fi
+
+# Get service information using the Kubernetes API
+NAMESPACE=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)
+TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
+CACERT=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
+API_SERVER="https://kubernetes.default.svc:443"
+BASE_API_URL="${API_SERVER}/api/v1/namespaces/${NAMESPACE}"
+
+# Function to get first valid service using jq
+get_valid_service() {
+    local service_name="$1"
+    local services_json
+
+    services_json=$(curl -s --cacert "${CACERT}" -H "Authorization: Bearer ${TOKEN}" "${BASE_API_URL}/services/")
+
+    echo "$services_json" | jq -r "
+        (.items[] | select(.metadata.name == \"${service_name}\") | .metadata.name) //
+        (.items[] | select(.metadata.name == \"${service_name}-np\") | .metadata.name) //
+        (.items[] | select(.metadata.name | contains(\"${service_name}\")) | .metadata.name) //
+        empty
+    " | head -n 1
+}
+
+# Get the valid service name
+ACTUAL_SERVICE_NAME=$(get_valid_service "${SERVICE_NAME}")
+
+[ -z "$ACTUAL_SERVICE_NAME" ] && ACTUAL_SERVICE_NAME="${SERVICE_NAME}"
+
+API_URL="${BASE_API_URL}/services/${ACTUAL_SERVICE_NAME}"
+TEMP_FILE="${AKASH_CONFIG_PATH}/.tmp.${AKASH_CONFIG_FILE}"
+CONFIG_FILE="${AKASH_CONFIG_PATH}/${AKASH_CONFIG_FILE}"
+
+# Create initial config file header
+echo "# Akash config generated on $(date)" > "$TEMP_FILE"
+echo "# Service: ${ACTUAL_SERVICE_NAME}" >> "$TEMP_FILE"
+
+# Add retries with exponential backoff
+MAX_ATTEMPTS=30
+for i in $(seq 1 $MAX_ATTEMPTS); do
+    # Query the service to get NodePort mappings
+    RESPONSE=$(curl -s --max-time 5 --retry 3 --retry-delay 1 --cacert "${CACERT}" \
+        -H "Authorization: Bearer ${TOKEN}" \
+        "${API_URL}")
+
+    # Check service type first
+    SERVICE_TYPE=$(echo "$RESPONSE" | jq -r '.spec.type // "unknown"')
+
+    if [ "$SERVICE_TYPE" = "NodePort" ]; then
+        # Service is NodePort, extract nodePort values
+        NODE_PORTS=$(echo "$RESPONSE" | jq -r '.spec.ports[] | select(.nodePort != null) | "export AKASH_EXTERNAL_PORT_\(.targetPort)=\(.nodePort)"' 2>/dev/null || echo "")
+
+        if [ -n "$NODE_PORTS" ]; then
+            echo "$NODE_PORTS" >> "$TEMP_FILE"
+            mv "$TEMP_FILE" "$CONFIG_FILE"
+            exit 0
+        fi
+    elif [ "$SERVICE_TYPE" = "LoadBalancer" ]; then
+        # Service is LoadBalancer, check for external IPs
+        EXTERNAL_IPS=$(echo "$RESPONSE" | jq -r '.status.loadBalancer.ingress[]?.ip // empty' 2>/dev/null || echo "")
+        if [ -n "$EXTERNAL_IPS" ]; then
+            echo "export AKASH_EXTERNAL_IP=${EXTERNAL_IPS}" >> "$TEMP_FILE"
+            mv "$TEMP_FILE" "$CONFIG_FILE"
+            exit 0
+        fi
+    elif [ "$SERVICE_TYPE" = "ClusterIP" ]; then
+        # Service is ClusterIP with dedicated IP
+        echo "# Service type is ClusterIP with dedicated IP" >> "$TEMP_FILE"
+
+        # Get service ports for reference
+        PORTS=$(echo "$RESPONSE" | jq -r '.spec.ports[] | "# Port \(.port) -> \(.targetPort)"' 2>/dev/null || echo "# No port mappings found")
+        echo "$PORTS" >> "$TEMP_FILE"
+
+        # Move to final file after waiting for some time, in case it's still being configured
+        if [ $i -gt 5 ]; then
+            mv "$TEMP_FILE" "$CONFIG_FILE"
+            exit 0
+        fi
+    fi
+
+    # Exponential backoff, capped at 8 seconds
+    SLEEP_TIME=$((2 ** ((i-1) > 3 ? 3 : (i-1))))
+    sleep $SLEEP_TIME
+done
+
+# Write a fallback config file so the workload container can still start
+echo "# Warning: Service configuration timeout after $MAX_ATTEMPTS attempts" >> "$TEMP_FILE"
+echo "# Service type: ${SERVICE_TYPE}" >> "$TEMP_FILE"
+mv "$TEMP_FILE" "$CONFIG_FILE"
+exit 0
+`
+)
diff --git a/cluster/kube/builder/deployment.go b/cluster/kube/builder/deployment.go
index 3a820b199..b1de010d4 100644
--- a/cluster/kube/builder/deployment.go
+++ b/cluster/kube/builder/deployment.go
@@ -1,6 +1,8 @@
 package builder
 
 import (
+	"strconv"
+
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -31,12 +33,66 @@ func NewDeployment(workload *Workload) Deployment {
 
 func (b *deployment) Create() (*appsv1.Deployment, error) { // nolint:golint,unparam
 	falseValue := false
+	trueValue := true
 
 	revisionHistoryLimit := int32(10)
 	maxSurge := intstr.FromInt32(0)
 	maxUnavailable := intstr.FromInt32(1)
 
+	// Add config volume
+	configVolume := corev1.Volume{
+		Name: AkashConfigVolume,
+		VolumeSource: corev1.VolumeSource{
+			EmptyDir: &corev1.EmptyDirVolumeSource{},
+		},
+	}
+
+	// Determine whether any globally exposed endpoint requires a NodePort
+	requiresNodePort := false
+	service := &b.deployment.ManifestGroup().Services[b.serviceIdx]
+	for _, expose := range service.Expose {
+		if expose.Global {
+			requiresNodePort = true
+			break
+		}
+	}
+
+	// Add init container
+	initContainer := corev1.Container{
+		Name:  AkashConfigInitName,
+		Image: "alpine/curl:3.14",
+		Command: []string{
+			"/bin/sh",
+			"-c",
+			akashInitScript,
+		},
+		Env: []corev1.EnvVar{
+			{
+				Name:  "SERVICE_NAME",
+				Value: b.Name(),
+			},
+			{
+				Name:  "AKASH_CONFIG_PATH",
+				Value: AkashConfigMount,
+			},
+			{
+				Name:  "AKASH_CONFIG_FILE",
+				Value: AkashConfigEnvFile,
+			},
+			{
+				Name:  "AKASH_REQUIRES_NODEPORT",
+				Value: strconv.FormatBool(requiresNodePort),
+			},
+		},
+		VolumeMounts: []corev1.VolumeMount{
+			{
+				Name:      AkashConfigVolume,
+				MountPath: AkashConfigMount,
+			},
+		},
+	}
+
 	kdeployment := &appsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: b.Name(),
@@ -65,10 +121,11 @@ func (b *deployment) Create() (*appsv1.Deployment, error) { // nolint:golint,unp
 					SecurityContext: &corev1.PodSecurityContext{
 						RunAsNonRoot: &falseValue,
 					},
-					AutomountServiceAccountToken: &falseValue,
-					Containers:                   []corev1.Container{b.container()},
-					ImagePullSecrets:             b.secretsRefs,
-					Volumes:                      b.volumesObjs,
+					AutomountServiceAccountToken: &trueValue,
+					InitContainers:               []corev1.Container{initContainer},
+					Containers:                   []corev1.Container{b.container()},
+					ImagePullSecrets:             b.secretsRefs,
+					Volumes:                      append(b.volumesObjs, configVolume),
 				},
 			},
 		},
diff --git a/cluster/kube/builder/rbac.go b/cluster/kube/builder/rbac.go
new file mode 100644
index 000000000..764e48d09
--- /dev/null
+++ b/cluster/kube/builder/rbac.go
@@ -0,0 +1,45 @@
+package builder
+
+import (
+	rbacv1 "k8s.io/api/rbac/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// CreateRole builds a namespaced role granting read-only access to services
+func CreateRole(namespace string, serviceName string) *rbacv1.Role {
+	return &rbacv1.Role{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      AkashRoleName,
+			Namespace: namespace,
+		},
+		Rules: []rbacv1.PolicyRule{
+			{
+				APIGroups: []string{""},
+				Resources: []string{"services", "services/status"},
+				Verbs:     []string{"get", "list", "watch"},
+			},
+		},
+	}
+}
+
+// CreateRoleBinding binds the namespace's default service account to the role
+func CreateRoleBinding(namespace string) *rbacv1.RoleBinding {
+	return &rbacv1.RoleBinding{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      AkashRoleBinding,
+			Namespace: namespace,
+		},
+		Subjects: []rbacv1.Subject{
+			{
+				Kind:      "ServiceAccount",
+				Name:      "default",
+				Namespace: namespace,
+			},
+		},
+		RoleRef: rbacv1.RoleRef{
+			APIGroup: "rbac.authorization.k8s.io",
+			Kind:     "Role",
+			Name:     AkashRoleName,
+		},
+	}
+}
diff --git a/cluster/kube/builder/statefulset.go b/cluster/kube/builder/statefulset.go
index 97040edb9..6ea030d1a 100644
--- a/cluster/kube/builder/statefulset.go
+++ b/cluster/kube/builder/statefulset.go
@@ -5,6 +5,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"strconv"
 )
 
 type StatefulSet interface {
@@ -31,12 +32,66 @@ func BuildStatefulSet(workload *Workload) StatefulSet {
 
 func (b *statefulSet) Create() (*appsv1.StatefulSet, error) { // nolint:golint,unparam
 	falseValue := false
+	trueValue := true
 
 	revisionHistoryLimit := int32(1)
 	partition := int32(0)
 	maxUnavailable := intstr.FromInt32(1)
 
+	// Add config volume
+	configVolume := corev1.Volume{
+		Name: AkashConfigVolume,
+		VolumeSource: corev1.VolumeSource{
+			EmptyDir: &corev1.EmptyDirVolumeSource{},
+		},
+	}
+
+	// Determine whether any globally exposed endpoint requires a NodePort
+	requiresNodePort := false
+	service := &b.deployment.ManifestGroup().Services[b.serviceIdx]
+	for _, expose := range service.Expose {
+		if expose.Global {
+			requiresNodePort = true
+			break
+		}
+	}
+
+	// Add init container
+	initContainer := corev1.Container{
+		Name:  AkashConfigInitName,
+		Image: "alpine/curl:3.14",
+		Command: []string{
+			"/bin/sh",
+			"-c",
+			akashInitScript,
+		},
+		Env: []corev1.EnvVar{
+			{
+				Name:  "SERVICE_NAME",
+				Value: b.Name(),
+			},
+			{
+				Name:  "AKASH_CONFIG_PATH",
+				Value: AkashConfigMount,
+			},
+			{
+				Name:  "AKASH_CONFIG_FILE",
+				Value: AkashConfigEnvFile,
+			},
+			{
+				Name:  "AKASH_REQUIRES_NODEPORT",
+				Value: strconv.FormatBool(requiresNodePort),
+			},
+		},
+		VolumeMounts: []corev1.VolumeMount{
+			{
+				Name:      AkashConfigVolume,
+				MountPath: AkashConfigMount,
+			},
+		},
+	}
+
 	kdeployment := &appsv1.StatefulSet{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: b.Name(),
@@ -65,10 +120,11 @@ func (b *statefulSet) Create() (*appsv1.StatefulSet, error) { // nolint:golint,u
 					SecurityContext: &corev1.PodSecurityContext{
 						RunAsNonRoot: &falseValue,
 					},
-					AutomountServiceAccountToken: &falseValue,
+					AutomountServiceAccountToken: &trueValue,
+					InitContainers:               []corev1.Container{initContainer},
 					Containers:                   []corev1.Container{b.container()},
 					ImagePullSecrets:             b.secretsRefs,
-					Volumes:                      b.volumesObjs,
+					Volumes:                      append(b.volumesObjs, configVolume),
 				},
 			},
 			VolumeClaimTemplates: b.pvcsObjs,
diff --git a/cluster/kube/builder/workload.go b/cluster/kube/builder/workload.go
index 4ec00d808..560048c0b 100644
--- a/cluster/kube/builder/workload.go
+++ b/cluster/kube/builder/workload.go
@@ -13,6 +13,7 @@ import (
 	"github.com/akash-network/node/sdl"
 	sdlutil "github.com/akash-network/node/sdl/util"
 
+	pmanifest "github.com/akash-network/provider/manifest"
 	crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
 )
 
@@ -35,6 +36,7 @@ type Workload struct {
 	volumesObjs []corev1.Volume
 	pvcsObjs    []corev1.PersistentVolumeClaim
 	secretsRefs []corev1.LocalObjectReference
+	podSpec     *corev1.PodSpec
 }
 
 var _ workloadBase = (*Workload)(nil)
@@ -61,6 +63,7 @@ func NewWorkloadBuilder(
 			sparams: sparams,
 		},
 		serviceIdx: serviceIdx,
+		podSpec:    &corev1.PodSpec{},
 	}
 
 	res.volumesObjs = res.volumes()
@@ -84,11 +87,23 @@ func (b *Workload) container() corev1.Container {
 	service := &b.group.Services[b.serviceIdx]
 	sparams := b.sparams[b.serviceIdx]
 
+	// Use default service account
+	b.podSpec.ServiceAccountName = "default"
+
+	// Add config volume mount
+	volumeMounts := []corev1.VolumeMount{
+		{
+			Name:      AkashConfigVolume,
+			MountPath: AkashConfigMount,
+		},
+	}
+
 	kcontainer := corev1.Container{
-		Name:    service.Name,
-		Image:   service.Image,
-		Command: service.Command,
-		Args:    service.Args,
+		Name:         service.Name,
+		Image:        service.Image,
+		Command:      service.Command,
+		Args:         service.Args,
+		VolumeMounts: volumeMounts,
 		Resources: corev1.ResourceRequirements{
 			Limits:   make(corev1.ResourceList),
 			Requests: make(corev1.ResourceList),
@@ -166,6 +181,7 @@ func (b *Workload) container() corev1.Container {
 	}
 
 	envVarsAdded := make(map[string]int)
+
 	for _, env := range service.Env {
 		parts := strings.SplitN(env, "=", 2)
 		switch len(parts) {
@@ -419,5 +435,22 @@ func (b *Workload) addEnvVarsForDeployment(envVarsAlreadyAdded map[string]int, e
 	env = addIfNotPresent(envVarsAlreadyAdded, env, envVarAkashProvider, lid.Provider)
 	env = addIfNotPresent(envVarsAlreadyAdded, env, envVarAkashClusterPublicHostname, b.settings.ClusterPublicHostname)
 
+	ingressHost := pmanifest.IngressHost(lid, b.Name())
+	env = addIfNotPresent(envVarsAlreadyAdded, env, envVarAkashIngressHostname, fmt.Sprintf("%s.%s", ingressHost, b.settings.DeploymentIngressDomain))
+
+	svc := &b.deployment.ManifestGroup().Services[b.serviceIdx]
+
+	// Add hostnames from service expose configurations
+	for _, expose := range svc.Expose {
+		if expose.IsIngress() {
+			// Add custom hostnames if specified
+			for idx, hostname := range expose.Hosts {
+				env = addIfNotPresent(envVarsAlreadyAdded, env,
+					fmt.Sprintf("%s_%d_%d", envVarAkashIngressCustomHostname, expose.Port, idx),
+					hostname)
+			}
+		}
+	}
+
 	return env
 }
diff --git a/cluster/kube/client.go b/cluster/kube/client.go
index 340641882..dde980ed7 100644
--- a/cluster/kube/client.go
+++ b/cluster/kube/client.go
@@ -235,6 +235,20 @@ type previousObj struct {
 func (p *previousObj) recover(ctx context.Context, kc kubernetes.Interface, ac akashclient.Interface) []error {
 	var errs []error
 
+	// Recover RBAC resources first
+	if err := kc.RbacV1().RoleBindings(p.nns.Name).Delete(ctx, builder.AkashRoleBinding, metav1.DeleteOptions{}); err != nil {
+		if !kerrors.IsNotFound(err) {
+			errs = append(errs, err)
+		}
+	}
+
+	if err := kc.RbacV1().Roles(p.nns.Name).Delete(ctx, builder.AkashRoleName, metav1.DeleteOptions{}); err != nil {
+		if !kerrors.IsNotFound(err) {
+			errs = append(errs, err)
+		}
+	}
+
+	// Recover other resources
 	for _, val := range slices.Backward(p.nGlobalServices) {
 		if err := kc.CoreV1().Services(val.Namespace).Delete(ctx, val.Name, metav1.DeleteOptions{}); err != nil {
 			errs = append(errs, err)
@@ -352,9 +366,34 @@ func (c *client) Deploy(ctx context.Context, deployment ctypes.IDeployment) (err
 	lid := cdeployment.LeaseID()
 	group := cdeployment.ManifestGroup()
+	ns := builder.LidNS(lid)
 
 	po := &previousObj{}
 
+	// Create namespace first
+	applies.ns = builder.BuildNS(settings, cdeployment)
+	po.nns, po.uns, po.ons, err = applyNS(ctx, c.kc, applies.ns)
+	if err != nil {
+		c.log.Error("applying namespace", "err", err, "lease", lid)
+		return err
+	}
+
+	// Create RBAC resources
+
+	role := builder.CreateRole(ns, group.Name)
+	if _, err := c.kc.RbacV1().Roles(ns).Create(ctx, role, metav1.CreateOptions{}); err != nil {
+		if !kerrors.IsAlreadyExists(err) {
+			return fmt.Errorf("error creating role: %w", err)
+		}
+	}
+
+	binding := builder.CreateRoleBinding(ns)
+	if _, err := c.kc.RbacV1().RoleBindings(ns).Create(ctx, binding, metav1.CreateOptions{}); err != nil {
+		if !kerrors.IsAlreadyExists(err) {
+			return fmt.Errorf("error creating role binding: %w", err)
+		}
+	}
+
 	defer func() {
 		tmpErr := err
@@ -652,23 +691,40 @@ func (c *client) Deploy(ctx context.Context, deployment ctypes.IDeployment) (err
 
 func (c *client) TeardownLease(ctx context.Context, lid mtypes.LeaseID) error {
 	c.log.Info("tearing down lease", "lease", lid)
+	ns := builder.LidNS(lid)
+	// Delete RBAC resources first
+	if err := c.kc.RbacV1().RoleBindings(ns).Delete(ctx, builder.AkashRoleBinding, metav1.DeleteOptions{}); err != nil {
+		if !kerrors.IsNotFound(err) {
+			c.log.Error("teardown lease: unable to delete role binding", "ns", ns, "error", err)
+		}
+	}
+
+	if err := c.kc.RbacV1().Roles(ns).Delete(ctx, builder.AkashRoleName, metav1.DeleteOptions{}); err != nil {
+		if !kerrors.IsNotFound(err) {
+			c.log.Error("teardown lease: unable to delete role", "ns", ns, "error", err)
+		}
+	}
+
+	// Delete namespace
 	_, result := wrapKubeCall("namespaces-delete", func() (interface{}, error) {
-		return nil, c.kc.CoreV1().Namespaces().Delete(ctx, builder.LidNS(lid), metav1.DeleteOptions{})
+		return nil, c.kc.CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{})
 	})
 	if result != nil {
-		c.log.Error("teardown lease: unable to delete namespace", "ns", builder.LidNS(lid), "error", result)
+		c.log.Error("teardown lease: unable to delete namespace", "ns", ns, "error", result)
 		if kerrors.IsNotFound(result) {
 			result = nil
 		}
 	}
+
+	// Delete manifest
 	_, err := wrapKubeCall("manifests-delete", func() (interface{}, error) {
-		return nil, c.ac.AkashV2beta2().Manifests(c.ns).Delete(ctx, builder.LidNS(lid), metav1.DeleteOptions{})
+		return nil, c.ac.AkashV2beta2().Manifests(c.ns).Delete(ctx, ns, metav1.DeleteOptions{})
 	})
 	if err != nil {
-		c.log.Error("teardown lease: unable to delete manifest", "ns", builder.LidNS(lid), "error", err)
+		c.log.Error("teardown lease: unable to delete manifest", "ns", ns, "error", err)
 	}
 
 	return result