diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java index 34c8cc99a3..30649e8968 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java @@ -37,6 +37,12 @@ @SuperBuilder public abstract class CommonStatus { + // Frequent error message constants for resource failure reporting + public static final String MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED = + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted."; + public static final String MSG_HA_METADATA_NOT_AVAILABLE = "HA metadata is not available"; + public static final String MSG_MANUAL_RESTORE_REQUIRED = "Manual restore required."; + /** Last observed status of the Flink job on Application/Session cluster. */ private JobStatus jobStatus = new JobStatus(); @@ -90,6 +96,23 @@ public ResourceLifecycleState getLifecycleState() { return ResourceLifecycleState.FAILED; } + // Check for unrecoverable deployments that should be marked as FAILED if the error contains + // the following substrings + if (this instanceof FlinkDeploymentStatus) { + FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) this; + var jmDeployStatus = deploymentStatus.getJobManagerDeploymentStatus(); + + // ERROR/MISSING deployments are in terminal error state and should always be FAILED + if ((jmDeployStatus == JobManagerDeploymentStatus.MISSING + || jmDeployStatus == JobManagerDeploymentStatus.ERROR) + && error != null + && (error.contains(MSG_MANUAL_RESTORE_REQUIRED) + || error.contains(MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED) + || error.contains(MSG_HA_METADATA_NOT_AVAILABLE))) { + return ResourceLifecycleState.FAILED; + } + } + if (reconciliationStatus.getState() == ReconciliationState.ROLLED_BACK) { return ResourceLifecycleState.ROLLED_BACK; } else if (reconciliationStatus.isLastReconciledSpecStable()) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java index 9e79982c25..3fcf8e46cd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java @@ -59,6 +59,8 @@ import java.util.Optional; import java.util.function.Predicate; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE; import static org.apache.flink.kubernetes.operator.reconciler.SnapshotType.CHECKPOINT; @@ -227,7 +229,8 @@ protected JobUpgrade getJobUpgrade(FlinkResourceContext ctx, Configuration d if (!SnapshotUtils.lastSavepointKnown(status)) { throw new UpgradeFailureException( - "Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. Manual restore required.", + "Job is in terminal state but last checkpoint is unknown, possibly due to an unrecoverable restore error. " + + MSG_MANUAL_RESTORE_REQUIRED, "UpgradeFailed"); } LOG.info("Job is in terminal state, ready for upgrade from observed latest state"); @@ -360,7 +363,8 @@ private boolean allowLastStateCancel(FlinkResourceContext ctx) { var conf = ctx.getObserveConfig(); if (!ctx.getFlinkService().isHaMetadataAvailable(conf)) { - LOG.info("HA metadata not available, cancel will be used instead of last-state"); + LOG.info( + "{}, cancel will be used instead of last-state", MSG_HA_METADATA_NOT_AVAILABLE); return true; } return conf.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java index 7e022dbf25..199a5030f4 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java @@ -59,6 +59,9 @@ import java.util.Optional; import java.util.UUID; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED; /** Reconciler Flink Application deployments. */ @@ -114,9 +117,11 @@ protected JobUpgrade getJobUpgrade( || jmDeployStatus == JobManagerDeploymentStatus.ERROR) && !flinkService.isHaMetadataAvailable(deployConfig)) { throw new UpgradeFailureException( - "JobManager deployment is missing and HA data is not available to make stateful upgrades. " - + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. " - + "Manual restore required.", + "JobManager deployment is missing and " + + MSG_HA_METADATA_NOT_AVAILABLE + + " to make stateful upgrades. " + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED + + MSG_MANUAL_RESTORE_REQUIRED, "UpgradeFailed"); } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 211f2e9dc4..820fbced76 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -155,6 +155,9 @@ import java.util.jar.Manifest; import java.util.stream.Collectors; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION; import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX; import static org.apache.flink.util.ExceptionUtils.findThrowable; @@ -567,7 +570,7 @@ public Optional getLastCheckpoint(JobID jobId, Configuration conf) { .getExternalPointer() .equals(NonPersistentMetadataCheckpointStorageLocation.EXTERNAL_POINTER)) { throw new UpgradeFailureException( - "Latest checkpoint not externally addressable, manual recovery required.", + "Latest checkpoint not externally addressable, " + MSG_MANUAL_RESTORE_REQUIRED, "CheckpointNotFound"); } return latestCheckpointOpt.map( @@ -1022,8 +1025,9 @@ protected static Configuration removeOperatorConfigs(Configuration config) { private void validateHaMetadataExists(Configuration conf) { if (!isHaMetadataAvailable(conf)) { throw new UpgradeFailureException( - "HA metadata not available to restore from last state. " - + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. ", + MSG_HA_METADATA_NOT_AVAILABLE + + " to restore from last state." + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED, "RestoreFailed"); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java index bee769fb96..c020c53d98 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java @@ -105,6 +105,10 @@ import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; + /** Flink service mock for tests. */ public class TestingFlinkService extends AbstractFlinkService { @@ -244,9 +248,10 @@ protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) thr protected void validateHaMetadataExists(Configuration conf) { if (!isHaMetadataAvailable(conf)) { throw new UpgradeFailureException( - "HA metadata not available to restore from last state. " - + "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. " - + "Manual restore required.", + MSG_HA_METADATA_NOT_AVAILABLE + + " to restore from last state. " + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED + + MSG_MANUAL_RESTORE_REQUIRED, "RestoreFailed"); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java index a399d871cc..212b0da70d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java @@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; import org.apache.flink.kubernetes.operator.api.spec.JobState; +import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.metrics.CustomResourceMetrics; @@ -52,6 +53,9 @@ import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.STABLE; import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.SUSPENDED; import static org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState.UPGRADING; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_HA_METADATA_NOT_AVAILABLE; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED; +import static org.apache.flink.kubernetes.operator.api.status.CommonStatus.MSG_MANUAL_RESTORE_REQUIRED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -337,4 +341,53 @@ private Map> initTimeHistos() { } return histos; } + + @Test + public void testUnrecoverableDeploymentLifecycleState() { + var application = TestUtils.buildApplicationCluster(); + + // Setup the deployment to simulate it has been deployed (so isBeforeFirstDeployment = + // false) + ReconciliationUtils.updateStatusForDeployedSpec(application, new Configuration()); + application.getStatus().getReconciliationStatus().markReconciledSpecAsStable(); + + application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR); + application + .getStatus() + .setError( + "\"JobManager deployment is missing and " + + MSG_HA_METADATA_NOT_AVAILABLE + + " to make stateful upgrades. " + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED + + MSG_MANUAL_RESTORE_REQUIRED); + assertEquals( + FAILED, + application.getStatus().getLifecycleState(), + "ERROR deployment with `configmaps have been deleted` error should always be FAILED (terminal error state)"); + + application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + application + .getStatus() + .setError( + MSG_HA_METADATA_NOT_AVAILABLE + + " to restore from last state. " + + MSG_JOB_FINISHED_OR_CONFIGMAPS_DELETED); + assertEquals( + FAILED, + application.getStatus().getLifecycleState(), + "MISSING deployment with error should be FAILED"); + + application.getStatus().setError(null); + application.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING); + // Reset to DEPLOYED state (not stable yet) to simulate ongoing deployment + application.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED); + application + .getStatus() + .getReconciliationStatus() + .setLastStableSpec(null); // Mark as not stable + assertEquals( + DEPLOYED, + application.getStatus().getLifecycleState(), + "MISSING deployment before stability should not be FAILED yet (still deploying)"); + } }