Skip to content

[FLINK-28648][Kubernetes Operator] Allow session deletion to block on any running job #994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 21, 2025
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/dynamic_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@
<td>Duration</td>
<td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
</tr>
<tr>
<td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,14 @@ public static String operatorConfigKey(String key) {
.withDescription(
"Indicate whether the job should be drained when stopping with savepoint.");

/**
 * Controls whether deletion of a session {@code FlinkDeployment} is blocked while jobs that are
 * not managed by {@code FlinkSessionJob} resources (e.g. jobs submitted directly via the Flink
 * CLI) are still running in the cluster. Enabled by default; set to {@code false} to restore the
 * previous behavior of tearing the cluster down regardless of unmanaged jobs.
 */
@Documentation.Section(SECTION_DYNAMIC)
public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
        operatorConfig("session.block-on-unmanaged-jobs")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.");

@Documentation.Section(SECTION_ADVANCED)
public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
operatorConfig("cluster.resource-view.refresh-interval")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
Expand All @@ -25,11 +27,16 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;

import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import org.slf4j.Logger;
Expand Down Expand Up @@ -120,6 +127,37 @@ private void recoverSession(FlinkResourceContext<FlinkDeployment> ctx) throws Ex
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
}

/**
 * Detects jobs in the session cluster that are not in a globally terminal state.
 *
 * <p>Queries the cluster's REST endpoint for the overview of all jobs and returns the ids of
 * those whose status is not globally terminal (i.e. jobs that would be lost if the cluster
 * were torn down now). On any failure to query the cluster (e.g. the JobManager is already
 * gone) an empty set is returned so that cleanup is not blocked indefinitely.
 *
 * @param ctx resource context of the session {@link FlinkDeployment}
 * @return ids of all non-terminal jobs, or an empty set if detection fails
 */
@VisibleForTesting
Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
    LOG.debug("Starting nonTerminal jobs detection for session cluster");
    // ClusterClient is AutoCloseable; use try-with-resources so the REST client (and its
    // underlying connections/executors) is always released. The previous implementation
    // leaked the client on every invocation.
    try (var clusterClient =
            ctx.getFlinkService().getClusterClient(ctx.getObserveConfig())) {
        // Fetch the overview of all jobs currently known to the cluster.
        var allJobs =
                clusterClient
                        .sendRequest(
                                JobsOverviewHeaders.getInstance(),
                                EmptyMessageParameters.getInstance(),
                                EmptyRequestBody.getInstance())
                        .get()
                        .getJobs();

        // Keep only jobs that have not reached a globally terminal state.
        return allJobs.stream()
                .filter(job -> !job.getStatus().isGloballyTerminalState())
                .map(JobDetails::getJobId)
                .collect(Collectors.toSet());
    } catch (Exception e) {
        // Best-effort detection: do not fail cleanup just because the cluster
        // could not be queried.
        LOG.warn("Failed to detect nonTerminal jobs in session cluster", e);
        return Set.of();
    }
}

@Override
public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
Set<FlinkSessionJob> sessionJobs =
Expand All @@ -143,13 +181,39 @@ public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx)
}
return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
} else {
LOG.info("Stopping session cluster");
var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
return DeleteControl.defaultDelete();
}

// Check for non-terminated jobs if the option is enabled (Enabled by default) , after
// sessionJobs are deleted
boolean blockOnUnmanagedJobs =
ctx.getObserveConfig()
.getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
if (blockOnUnmanagedJobs) {
Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
if (!nonTerminalJobs.isEmpty()) {
var error =
String.format(
"The session cluster has non terminated jobs %s that should be cancelled first",
nonTerminalJobs.stream()
.map(JobID::toHexString)
.collect(Collectors.toList()));
eventRecorder.triggerEvent(
deployment,
EventRecorder.Type.Warning,
EventRecorder.Reason.CleanupFailed,
EventRecorder.Component.Operator,
error,
ctx.getKubernetesClient());
return DeleteControl.noFinalizerRemoval()
.rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
}
}

LOG.info("Stopping session cluster");
var conf = ctx.getObserveConfig();
ctx.getFlinkService()
.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), conf, true);
return DeleteControl.defaultDelete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@

package org.apache.flink.kubernetes.operator.reconciler.deployment;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.runtime.client.JobStatusMessage;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
Expand All @@ -36,13 +42,15 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -139,4 +147,161 @@ public void testSetOwnerReference() throws Exception {
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
Assertions.assertEquals(expectedOwnerReferences, or);
}

/** Registers a job with the given id, name and status in the testing Flink service. */
private void addJobToService(JobID jobId, String name, JobStatus status) {
    flinkService
            .listJobs()
            .add(
                    Tuple3.of(
                            null,
                            new JobStatusMessage(
                                    jobId, name, status, System.currentTimeMillis()),
                            new Configuration()));
}

/** Creates a FlinkSessionJob resource bound to the given job id (a "managed" job). */
private void createManagedSessionJob(String name, JobID jobId) {
    FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
    sessionJob.getMetadata().setName(name);
    sessionJob.getStatus().getJobStatus().setJobId(jobId.toHexString());
    kubernetesClient.resource(sessionJob).createOrReplace();
}

/**
 * Verifies that {@link SessionReconciler#getNonTerminalJobs} reports every job that is not in
 * a globally terminal state — both managed (FlinkSessionJob-backed) and unmanaged jobs — and
 * excludes terminated jobs. Also exercises the cases where only unmanaged jobs remain and
 * where no running jobs remain at all.
 */
@Test
public void testGetNonTerminalJobs() throws Exception {
    FlinkDeployment deployment = TestUtils.buildSessionCluster();
    deployment
            .getSpec()
            .getFlinkConfiguration()
            .put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true");

    assertEquals(
            "true",
            deployment
                    .getSpec()
                    .getFlinkConfiguration()
                    .get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key()));

    reconciler.reconcile(deployment, flinkService.getContext());

    // The deployment must be fully reconciled before job detection is exercised.
    assertEquals(
            ReconciliationState.DEPLOYED,
            deployment.getStatus().getReconciliationStatus().getState());

    // Two managed jobs, two unmanaged running jobs, one unmanaged terminated job.
    JobID managedJobId1 = new JobID();
    JobID managedJobId2 = new JobID();
    JobID unmanagedRunningJobId1 = new JobID();
    JobID unmanagedTerminatedJobId = new JobID();
    JobID unmanagedRunningJobId2 = new JobID();

    addJobToService(managedJobId1, "managed-job-1", JobStatus.RUNNING);
    addJobToService(managedJobId2, "managed-job-2", JobStatus.RUNNING);
    addJobToService(unmanagedRunningJobId1, "unmanaged-running-job-1", JobStatus.RUNNING);
    addJobToService(unmanagedTerminatedJobId, "unmanaged-terminated-job", JobStatus.CANCELED);
    addJobToService(unmanagedRunningJobId2, "unmanaged-running-job-2", JobStatus.RUNNING);

    // Create FlinkSessionJob resources for the managed jobs.
    createManagedSessionJob("managed-session-job-1", managedJobId1);
    createManagedSessionJob("managed-session-job-2", managedJobId2);

    var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
    var resourceContext = getResourceContext(deployment, context);

    var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
    Set<JobID> nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext);

    // All four non-terminal jobs (2 managed + 2 unmanaged running) must be reported.
    assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs");

    assertTrue(
            nonTerminalJobs.contains(unmanagedRunningJobId1),
            "Should contain unmanagedRunningJobId1");
    assertTrue(
            nonTerminalJobs.contains(unmanagedRunningJobId2),
            "Should contain unmanagedRunningJobId2");

    // The terminated job must be excluded.
    assertFalse(
            nonTerminalJobs.contains(unmanagedTerminatedJobId),
            "Should not contain terminated job");

    // Scenario: only unmanaged jobs remain after the managed ones are removed.
    flinkService
            .listJobs()
            .removeIf(
                    job ->
                            job.f1.getJobId().equals(managedJobId1)
                                    || job.f1.getJobId().equals(managedJobId2));

    Set<JobID> nonTerminalJobsAfterSessionJobsRemoval =
            sessionReconciler.getNonTerminalJobs(resourceContext);

    assertEquals(
            2,
            nonTerminalJobsAfterSessionJobsRemoval.size(),
            "Should have 2 non-terminal jobs when sessionjobs are deleted");

    // Scenario: no running jobs remain at all.
    flinkService
            .listJobs()
            .removeIf(
                    job ->
                            job.f1.getJobId().equals(unmanagedRunningJobId1)
                                    || job.f1.getJobId().equals(unmanagedRunningJobId2));

    Set<JobID> nonTerminalJobsAfterRemoval =
            sessionReconciler.getNonTerminalJobs(resourceContext);

    assertEquals(
            0,
            nonTerminalJobsAfterRemoval.size(),
            "Should have no non-terminal jobs when only terminated jobs exist");
}
}