
[FLINK-28648][Kubernetes Operator] Allow session deletion to block on any running job #994

Open · wants to merge 7 commits into main

Conversation


@nishita-09 commented Jul 10, 2025

What is the purpose of the change

This pull request adds a configuration option that allows session cluster cleanup to block while unmanaged jobs are present. That is, deletion of a FlinkDeployment is blocked not only while FlinkSessionJobs exist, but also while jobs submitted through the CLI are in non-terminal states.

Brief change log


  • Added getUnmanagedJobs() method to detect CLI-submitted jobs not managed by FlinkSessionJob resources
  • Modified cleanupInternal() to check for unmanaged jobs when blocking is enabled
  • Improved error messages and logging.

Verifying this change

This change added tests and can be verified as follows:

  • Added a testGetUnmanagedJobs test for validating getUnmanagedJobs

    • The test verifies that the method correctly identifies jobs that are not controlled by a FlinkSessionJob resource and are in a non-terminal state.
  • Manually verified the change by running a cluster with 2 JobManagers and submitting both a SessionJob and CLI jobs.

    • Config: session.block-on-unmanaged-jobs: true

    • Deleted the FlinkDeployment -> generated a CleanupFailed event on the FlinkDeployment due to the presence of session jobs

    • Deleted the FlinkSessionJob -> generated a CleanupFailed event on the FlinkDeployment due to the presence of unmanaged jobs -> the FlinkSessionJob was deleted

    • Cancelled the CLI-submitted job -> generated a Cleanup event after the reconcile interval -> the CLI job was cancelled and the FlinkDeployment was then cleaned up

(Screenshots from 2025-07-10 and 2025-07-14 attached.)
  • Config: session.block-on-unmanaged-jobs: false
  • Manually verified the change by running a cluster with 2 JobManagers and submitting both a SessionJob and CLI jobs.
  • Deleted the FlinkDeployment -> generated a CleanupFailed event on the FlinkDeployment due to the presence of session jobs
  • Deleted the FlinkSessionJob -> generated a Cleanup event despite running CLI jobs being present -> the SessionJob is deleted, followed by the FlinkDeployment.
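
For reference, a minimal sketch of how the flag used in the manual tests above might be set on a session cluster. The CRD fields follow the operator's FlinkDeployment spec; the deployment name is illustrative, and the exact key name may differ in the final documentation:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster          # illustrative name
spec:
  flinkConfiguration:
    # Key as used in this PR's manual verification; final name may differ.
    session.block-on-unmanaged-jobs: "true"
```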

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: no
  • Core observer or reconciler logic that is regularly executed: yes

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented

@nishita-09 nishita-09 changed the title [FLINK-28648][operator] Fix Cleanup Process for Session Cluster [FLINK-28648][operator] Allow session deletion to block on any running job Jul 10, 2025
@nishita-09 nishita-09 changed the title [FLINK-28648][operator] Allow session deletion to block on any running job [FLINK-28648][Kubernetes Operator] Allow session deletion to block on any running job Jul 10, 2025
@nishita-09 (Author):

@gyfora The build was failing due to missing documentation about added configuration. I have added those in the new commit now.

var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
var resourceContext = getResourceContext(deployment, context);

// Use reflection to access the private getUnmanagedJobs method
Contributor:

We should not use reflection for this; we can make the method protected with the @VisibleForTesting annotation.
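
A minimal sketch of the suggested change. The annotation below is a stand-in for Flink's org.apache.flink.annotation.VisibleForTesting, and the class and method bodies are illustrative, not the actual operator code:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Set;

// Stand-in for org.apache.flink.annotation.VisibleForTesting.
@Retention(RetentionPolicy.SOURCE)
@interface VisibleForTesting {}

class SessionReconcilerSketch {
    // Widening the method from private to protected (or package-private)
    // and marking it @VisibleForTesting lets unit tests call it directly,
    // with no reflection required.
    @VisibleForTesting
    protected Set<String> getUnmanagedJobs() {
        return Set.of(); // placeholder body for the sketch
    }
}
```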

Comment on lines 159 to 164
// Get job IDs that are not managed by FlinkSessionJob resources and are currently
// running
// FAILED(TerminalState.GLOBALLY)
// CANCELED(TerminalState.GLOBALLY)
// FINISHED(TerminalState.GLOBALLY)
// Above terminal states are not considered since they are no longer active jobs
Contributor:

This comment feels a bit too verbose; it is enough to say that only non-terminal jobs are considered (it reads a bit like AI-generated text).

@nishita-09 (Author) commented Jul 14, 2025:

I actually thought this would be necessary to clarify the functionality. 😢 Sure, I will remove it if it's too verbose.

 * the Flink cluster but are not managed by FlinkSessionJob resources.
 */
private Set<JobID> getUnmanagedJobs(
        FlinkResourceContext<FlinkDeployment> ctx, Set<FlinkSessionJob> sessionJobs) {
Contributor:

I don't think we need to pass sessionJobs here. If the flag is enabled, any running job should simply block deletion. We can simplify this logic a lot.

Contributor:

We can simply replace this method with something like getNonTerminalJobIds() or boolean anyNonTerminalJobs()

That would be enough for this feature.

@nishita-09 (Author):

Sure, I will simplify this further. Thanks for the review.

@nishita-09 (Author):

@gyfora I have pushed another commit to address the comments here. I stuck with getNonTerminalJobIds() so that the Event contains the list of job IDs that have not terminated, for better observability for the user.
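
The non-terminal filtering discussed above can be sketched as follows. The JobStatus and JobStatusMessage types below are simplified stand-ins for Flink's org.apache.flink.api.common.JobStatus (whose isGloballyTerminalState() covers FINISHED, CANCELED, and FAILED) and its REST job listing; they are assumptions for illustration, not the actual operator code:

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class NonTerminalJobs {

    // Simplified stand-in for org.apache.flink.api.common.JobStatus.
    enum JobStatus {
        RUNNING, FINISHED, CANCELED, FAILED;

        boolean isGloballyTerminal() {
            return this == FINISHED || this == CANCELED || this == FAILED;
        }
    }

    // Simplified stand-in for the (jobId, status) pairs returned by a job listing.
    record JobStatusMessage(String jobId, JobStatus status) {}

    // Keep only jobs that are still active, so the resulting IDs
    // can be reported in the CleanupFailed event.
    static Set<String> getNonTerminalJobIds(Collection<JobStatusMessage> jobs) {
        return jobs.stream()
                .filter(j -> !j.status().isGloballyTerminal())
                .map(JobStatusMessage::jobId)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        var jobs = List.of(
                new JobStatusMessage("a1", JobStatus.RUNNING),
                new JobStatusMessage("b2", JobStatus.FINISHED),
                new JobStatusMessage("c3", JobStatus.FAILED));
        System.out.println(getNonTerminalJobIds(jobs)); // prints [a1]
    }
}
```

Returning the set of IDs (rather than a boolean anyNonTerminalJobs()) keeps the offending jobs visible in the event message, which is the observability trade-off described above.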

Comment on lines 135 to 137
LOG.info(
"Starting unmanaged job detection for session cluster: {}",
ctx.getResource().getMetadata().getName());
Contributor:

This should be at debug level; also, there is no need to include the resource name/info in the log message, it's in the MDC already.

Comment on lines 222 to 230
LOG.warn(error);
if (eventRecorder.triggerEvent(
        deployment,
        EventRecorder.Type.Warning,
        EventRecorder.Reason.CleanupFailed,
        EventRecorder.Component.Operator,
        error,
        ctx.getKubernetesClient())) {
    LOG.warn(error);
Contributor:

You are logging the error twice; you don't need to log it at all, as the event triggering already logs it.

@nishita-09 nishita-09 requested a review from gyfora July 14, 2025 09:30
@nishita-09 (Author):

@gyfora I ran these tests locally and they seem to be passing; I am not sure what is causing the issue here. Could you take a look if possible?
