
Feature/cron scheduling rayjob 2426 #3836


Merged

36 commits
3c6a9da
init cron sheduling for ray jobs, transferring for local dev
DW-Han Jun 23, 2025
0d986a9
initial cron scheduling for ray jobs
DW-Han Jun 26, 2025
acdc061
Delete ray-operator/controllers/ray/utils/cron_helpers.go
DW-Han Jun 26, 2025
b4330c9
updating dependencies
DW-Han Jun 26, 2025
f7da0f3
Delete ray-operator/test/e2e/rayjob_scheduling_test.go
DW-Han Jun 26, 2025
1a87974
Merge branch 'ray-project:master' into feature/cron-scheduling-rayjob…
DW-Han Jun 26, 2025
d2f294c
unit tests and cleaning up scheduling
DW-Han Jul 7, 2025
9b4db80
adding cluster delete option and cleaning code
DW-Han Jul 9, 2025
d6f0076
cleaning
DW-Han Jul 11, 2025
b6b3989
cleaning and adding schedule util unit tests
DW-Han Jul 15, 2025
e449c92
cleaning up comment
DW-Han Jul 15, 2025
d6a2bfc
seperate case for scheduling
DW-Han Jul 18, 2025
d369a93
cleaning doc string
DW-Han Jul 18, 2025
7a9f462
updating api.md
DW-Han Jul 18, 2025
c06bbed
cleaning up scheduling state, controller, etc.
DW-Han Jul 18, 2025
6da9226
integration tests
DW-Han Jul 21, 2025
93adf7c
cleaning controller
DW-Han Jul 21, 2025
8055fa7
cleaning and lint
DW-Han Jul 21, 2025
23f5e28
deepcopy function
DW-Han Jul 21, 2025
1a032b9
working integration tests and cleaning
DW-Han Jul 22, 2025
95cd767
making tests more air tight
DW-Han Jul 23, 2025
5f176a3
making tests more air tight
DW-Han Jul 23, 2025
c86ea08
cleaning tests
DW-Han Jul 24, 2025
05c47c7
cleaning rayjob controller
DW-Han Jul 24, 2025
f679491
cleaning test
DW-Han Jul 24, 2025
90c5236
Merge branch 'ray-project:master' into feature/cron-scheduling-rayjob…
DW-Han Jul 24, 2025
226113f
cleaning test
DW-Han Jul 24, 2025
75cf551
returning to scheduled state
DW-Han Jul 24, 2025
baa17d6
cleaning test and controller
DW-Han Jul 25, 2025
4e03d82
cleaning
DW-Han Jul 25, 2025
00932b8
no cluster creation at start of schedule and cleaning tests
DW-Han Jul 28, 2025
26fe74f
cleaning schedule distance function and other cleaning
DW-Han Jul 28, 2025
f5f26fa
cleaning comments
DW-Han Jul 28, 2025
c88ee44
cleaning
DW-Han Jul 28, 2025
2a57524
clean commit
DW-Han Jul 29, 2025
e4c2a3e
commiting changes
DW-Han Jul 30, 2025
1 change: 1 addition & 0 deletions docs/reference/api.md
@@ -248,6 +248,7 @@ _Appears in:_
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
| `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the<br />entrypoint command. | | |
| `schedule` _string_ | Schedule specifies a cron-like string for scheduling Ray jobs.<br />When shutdownAfterJobFinishes is set to true, a new cluster is provisioned<br />per scheduled job; otherwise the job is scheduled on an existing cluster. | | |
| `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
| `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |
| `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.<br />It's only working when ShutdownAfterJobFinishes set to true. | 0 | |
1 change: 1 addition & 0 deletions go.mod
@@ -87,6 +87,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
2 changes: 2 additions & 0 deletions go.sum

Generated file; diff not rendered by default.

5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Generated file; diff not rendered by default.

10 changes: 10 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
@@ -54,6 +54,8 @@ const (
JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
JobDeploymentStatusRetrying JobDeploymentStatus = "Retrying"
JobDeploymentStatusWaiting JobDeploymentStatus = "Waiting"
JobDeploymentStatusScheduling JobDeploymentStatus = "Scheduling"
JobDeploymentStatusScheduled JobDeploymentStatus = "Scheduled"
)

// IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
@@ -181,6 +183,11 @@ type RayJobSpec struct {
// entrypoint command.
// +optional
EntrypointResources string `json:"entrypointResources,omitempty"`
// Schedule specifies a cron-like string for scheduling Ray jobs.
// When shutdownAfterJobFinishes is set to true, a new cluster is provisioned
// per scheduled job; otherwise the job is scheduled on an existing cluster.
// +optional
Schedule string `json:"schedule,omitempty"`
// EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command.
// +optional
EntrypointNumCpus float32 `json:"entrypointNumCpus,omitempty"`
@@ -233,6 +240,9 @@ type RayJobStatus struct {
// or the submitter Job has failed.
// +optional
EndTime *metav1.Time `json:"endTime,omitempty"`
// LastScheduleTime is the last time the job was successfully scheduled.
// +optional
LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
// Succeeded is the number of times this job succeeded.
// +kubebuilder:default:=0
// +optional
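As a quick orientation for reviewers, below is a minimal sketch (not part of this PR) of how client code might read the new `Scheduled` deployment status and `lastScheduleTime` field, for example to report how long a RayJob has been idle between scheduled runs. The helper name and the `main` wiring are illustrative assumptions only.

```go
package main

import (
	"fmt"
	"time"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// timeSinceLastScheduledRun is a hypothetical helper: it reports how long ago a
// RayJob last fired while it sits idle in the new Scheduled state. It returns
// false if the job is not in that state or has never been scheduled yet.
func timeSinceLastScheduledRun(job *rayv1.RayJob, now time.Time) (time.Duration, bool) {
	if job.Status.JobDeploymentStatus != rayv1.JobDeploymentStatusScheduled {
		return 0, false
	}
	if job.Status.LastScheduleTime == nil {
		return 0, false
	}
	return now.Sub(job.Status.LastScheduleTime.Time), true
}

func main() {
	// Example with a fabricated RayJob; in practice this object would come from
	// the Kubernetes API server.
	job := &rayv1.RayJob{}
	job.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
	if d, ok := timeSinceLastScheduledRun(job, time.Now()); ok {
		fmt.Printf("last scheduled run was %v ago\n", d)
	} else {
		fmt.Println("RayJob has not completed a scheduled run yet")
	}
}
```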
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Generated file; diff not rendered by default.

5 changes: 5 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Generated file; diff not rendered by default.

121 changes: 121 additions & 0 deletions ray-operator/config/samples/ray-job.schedule.yaml
@@ -0,0 +1,121 @@
apiVersion: ray.io/v1
kind: RayJob
metadata:
name: rayjob-schedule
spec:
# schedule specifies a cron string that tells the RayJob when to start new scheduled runs.
# Cron format: minute hour day-of-month month day-of-week; "*/5 * * * *" runs every 5 minutes.
schedule: "*/5 * * * *"

entrypoint: python /home/ray/samples/sample_code.py

# shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
# NOTE: when schedule is set and this is true, the RayCluster is deleted and recreated for each scheduled run; otherwise the same cluster is reused across runs.
shutdownAfterJobFinishes: true

runtimeEnvYAML: |
pip:
- requests==2.26.0
- pendulum==2.1.2
env_vars:
counter_name: "test_counter"


rayClusterSpec:
rayVersion: '2.46.0'
headGroupSpec:
rayStartParams: {}
template:
spec:
containers:
- name: ray-head
image: rayproject/ray:2.46.0
ports:
- containerPort: 6379
name: gcs-server
- containerPort: 8265
name: dashboard
- containerPort: 10001
name: client
resources:
limits:
cpu: "1"
requests:
cpu: "200m"
volumeMounts:
- mountPath: /home/ray/samples
name: code-sample
volumes:
- name: code-sample
configMap:
name: ray-job-code-sample
items:
- key: sample_code.py
path: sample_code.py
workerGroupSpecs:
- replicas: 1
minReplicas: 1
maxReplicas: 5
groupName: small-group
rayStartParams: {}
template:
spec:
containers:
- name: ray-worker
image: rayproject/ray:2.46.0
resources:
limits:
cpu: "1"
requests:
cpu: "200m"
# SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
# If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
# submitterPodTemplate:
# spec:
# restartPolicy: Never
# containers:
# - name: my-custom-rayjob-submitter-pod
# image: rayproject/ray:2.46.0
# # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
# # Specifying Command is not recommended.
# # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]


######################Ray code sample#################################
# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
# it is mounted into the container and executed to show the Ray job at work
---
apiVersion: v1
kind: ConfigMap
metadata:
name: ray-job-code-sample
data:
sample_code.py: |
import ray
import os
import requests

ray.init()

@ray.remote
class Counter:
def __init__(self):
# Used to verify runtimeEnv
self.name = os.getenv("counter_name")
assert self.name == "test_counter"
self.counter = 0

def inc(self):
self.counter += 1

def get_counter(self):
return "{} got {}".format(self.name, self.counter)

counter = Counter.remote()

for _ in range(5):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))

# Verify that the correct runtime env was used for the job.
assert requests.__version__ == "2.26.0"
80 changes: 79 additions & 1 deletion ray-operator/controllers/ray/rayjob_controller.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -34,6 +35,8 @@ const (
RayJobDefaultRequeueDuration = 3 * time.Second
RayJobDefaultClusterSelectorKey = "ray.io/cluster"
PythonUnbufferedEnvVarName = "PYTHONUNBUFFERED"
// The buffer period after the last cron tick during which a scheduled RayJob is still allowed to start
ScheduleBuffer = 100 * time.Millisecond
)

// RayJobReconciler reconciles a RayJob object
@@ -449,9 +452,59 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}
if rayJobInstance.Spec.Schedule != "" {
logger.Info("RayJob is scheduled again")
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduling
break
}

// If the RayJob is completed, we should not requeue it.
return ctrl.Result{}, nil
case rayv1.JobDeploymentStatusScheduling:
deleteCluster := rayJobInstance.Spec.ShutdownAfterJobFinishes

isJobDeleted, err := r.deleteSubmitterJob(ctx, rayJobInstance)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

if !isJobDeleted {
logger.Info("The release of the compute resources has not been completed yet. " +
"Wait for the resources to be deleted before the status transitions to avoid a resource leak.")
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
}

if deleteCluster {
rayJobInstance.Status.RayClusterStatus = rayv1.RayClusterStatus{}
rayJobInstance.Status.RayClusterName = ""

}
rayJobInstance.Status.DashboardURL = ""
rayJobInstance.Status.JobId = ""
rayJobInstance.Status.Message = ""
rayJobInstance.Status.Reason = ""
rayJobInstance.Status.RayJobStatusInfo = rayv1.RayJobStatusInfo{}

rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
case rayv1.JobDeploymentStatusScheduled:
// Compute the durations from the current time to the previous and next cron schedule times.
// We pass time.Now() as a parameter for easier unit testing and consistency.
t1, t2, err := r.getPreviousAndNextScheduleDistance(ctx, time.Now(), rayJobInstance)
if err != nil {
logger.Error(err, "Could not get the previous and next distances for a cron schedule")
return ctrl.Result{}, err
}
// Check whether the current time is within the buffer window after the previous cron tick.
if t2 <= ScheduleBuffer {
logger.Info("The current time is within the buffer window of a cron tick", "NextScheduleTimeDuration", t1, "LastScheduleTimeDuration", t2)
rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusNew
} else {
logger.Info("Waiting until the next reconcile to determine schedule", "nextScheduleDuration", t1, "currentTime", time.Now(), "lastScheduleTimeDuration", t2)
return ctrl.Result{RequeueAfter: t1}, nil
}

default:
logger.Info("Unknown JobDeploymentStatus", "JobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
@@ -802,7 +855,14 @@ func initRayJobStatusIfNeed(ctx context.Context, rayJob *rayv1.RayJob) error {
if rayJob.Status.JobStatus == "" {
rayJob.Status.JobStatus = rayv1.JobStatusNew
}
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusInitializing
// If the RayJob is scheduled via a cron string, start in the Scheduled state instead of Initializing.
// We check the success and failure counts to know whether this is the first job.
if rayJob.Spec.Schedule != "" && rayJob.Status.Failed == nil && rayJob.Status.Succeeded == nil {
logger.Info("Since this is a new schdueled job we enter the Scheduled state not Initializing")
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
} else {
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusInitializing
}
rayJob.Status.StartTime = &metav1.Time{Time: time.Now()}
return nil
}
@@ -894,6 +954,24 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra
return rayCluster, nil
}

func (r *RayJobReconciler) getPreviousAndNextScheduleDistance(ctx context.Context, currentTime time.Time, rayJobInstance *rayv1.RayJob) (time.Duration, time.Duration, error) {
logger := ctrl.LoggerFrom(ctx)
logger.Info("Calculating next schedule for the RayJob")
cronSchedule, err := cron.ParseStandard(utils.FormatSchedule(rayJobInstance, r.Recorder))
if err != nil {
// This is likely a user error in the spec value.
// We log the error and do not reconcile this RayJob again until the spec is updated.
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", rayJobInstance.Spec.Schedule, err)
return 0, 0, fmt.Errorf("the cron schedule provided is unparseable: %w", err)
}
logger.Info("Successfully parsed cron schedule", "CronSchedule", cronSchedule)

t1 := utils.NextScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)
t2 := utils.LastScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)

return t1, t2, nil
}

func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
logger := ctrl.LoggerFrom(ctx)
if !rayJob.Spec.Suspend {
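For reviewers unfamiliar with robfig/cron, here is a minimal, self-contained sketch of how the two durations computed in `getPreviousAndNextScheduleDistance` (time until the next cron tick, time since the last one) can be derived. It does not reproduce the PR's `utils.NextScheduleTimeDuration` / `utils.LastScheduleTimeDuration` helpers, whose bodies are not shown in this diff; the function name and the reference-time parameter below are illustrative assumptions.

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

// nextAndLastTickDurations is an illustrative stand-in (not the PR's utils
// helpers): it returns how far away the next cron tick is and how long ago the
// most recent tick fired. Because robfig/cron exposes only Next, the last tick
// is found by walking forward from a reference time (e.g. the job's creation or
// last-scheduled time); production code would bound that loop.
func nextAndLastTickDurations(scheduleSpec string, reference, now time.Time) (time.Duration, time.Duration, error) {
	sched, err := cron.ParseStandard(scheduleSpec)
	if err != nil {
		return 0, 0, fmt.Errorf("unparseable schedule %q: %w", scheduleSpec, err)
	}

	// Walk forward from the reference time to find the most recent tick at or before now.
	lastTick := reference
	for t := sched.Next(reference); !t.After(now); t = sched.Next(t) {
		lastTick = t
	}

	nextIn := sched.Next(now).Sub(now) // duration until the next tick (t1 in the controller)
	sinceLast := now.Sub(lastTick)     // duration since the last tick (t2, compared against ScheduleBuffer)
	return nextIn, sinceLast, nil
}

func main() {
	now := time.Now()
	next, since, err := nextAndLastTickDurations("*/5 * * * *", now.Add(-time.Hour), now)
	if err != nil {
		panic(err)
	}
	fmt.Printf("next tick in %v, last tick %v ago\n", next, since)
}
```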