From bf8776437e11512e7898da42e933a60121f8196e Mon Sep 17 00:00:00 2001 From: win5923 Date: Tue, 18 Nov 2025 15:53:35 +0000 Subject: [PATCH 01/12] [Feature] Support recreate pods for RayCluster using RayClusterSpec Signed-off-by: win5923 --- docs/reference/api.md | 30 ++++++ .../crds/ray.io_rayclusters.yaml | 5 + .../kuberay-operator/crds/ray.io_rayjobs.yaml | 5 + .../crds/ray.io_rayservices.yaml | 5 + ray-operator/apis/ray/v1/raycluster_types.go | 18 ++++ ray-operator/apis/ray/v1/rayservice_types.go | 2 +- .../apis/ray/v1/zz_generated.deepcopy.go | 25 +++++ .../config/crd/bases/ray.io_rayclusters.yaml | 5 + .../config/crd/bases/ray.io_rayjobs.yaml | 5 + .../config/crd/bases/ray.io_rayservices.yaml | 5 + ray-operator/controllers/ray/common/pod.go | 39 ++++++++ .../controllers/ray/raycluster_controller.go | 93 +++++++++++++++++++ .../ray/rayservice_controller_unit_test.go | 12 +-- .../controllers/ray/utils/constant.go | 1 + .../controllers/ray/utils/validation.go | 14 ++- .../ray/v1/rayclusterspec.go | 29 ++++-- .../ray/v1/rayclusterupgradestrategy.go | 27 ++++++ .../pkg/client/applyconfiguration/utils.go | 2 + .../rayservice_in_place_update_test.go | 2 +- 19 files changed, 304 insertions(+), 20 deletions(-) create mode 100644 ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go diff --git a/docs/reference/api.md b/docs/reference/api.md index fb6a624645e..2a2c3bbf470 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -297,6 +297,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | +| `upgradeStrategy` _[RayClusterUpgradeStrategy](#rayclusterupgradestrategy)_ | UpgradeStrategy defines the scaling policy used when upgrading the RayCluster | | | | `authOptions` _[AuthOptions](#authoptions)_ | AuthOptions specifies the authentication options for the RayCluster. | | | | `suspend` _boolean_ | Suspend indicates whether a RayCluster should be suspended.
A suspended RayCluster will have head pods and worker pods deleted. | | | | `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayCluster.
The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.
The kuberay-operator reconciles a RayCluster that either omits this field or
sets it to the reserved string 'ray.io/kuberay-operator',
but delegates reconciling a RayCluster with 'kueue.x-k8s.io/multikueue' to Kueue.
The field is immutable. | | | @@ -309,6 +310,35 @@ _Appears in:_ | `workerGroupSpecs` _[WorkerGroupSpec](#workergroupspec) array_ | WorkerGroupSpecs are the specs for the worker pods | | | +#### RayClusterUpgradeStrategy + + + + + + + +_Appears in:_ +- [RayClusterSpec](#rayclusterspec) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `type` _[RayClusterUpgradeType](#rayclusterupgradetype)_ | Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. | | | + + +#### RayClusterUpgradeType + +_Underlying type:_ _string_ + + + + + +_Appears in:_ +- [RayClusterUpgradeStrategy](#rayclusterupgradestrategy) + + + #### RayJob diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml index 45f5406c411..e731ca0105f 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml @@ -4532,6 +4532,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index 1f4c8432168..b5d7ce86490 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -4614,6 +4614,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index baec5cfe1f3..634b54b59de 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -4512,6 +4512,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index f67a80ba854..b3113d65fd1 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -11,6 +11,9 @@ import ( // RayClusterSpec defines the desired state of RayCluster type RayClusterSpec struct { + // UpgradeStrategy defines the scaling policy used when upgrading the RayCluster + // +optional + UpgradeStrategy *RayClusterUpgradeStrategy `json:"upgradeStrategy,omitempty"` // AuthOptions specifies the authentication options for the RayCluster. // +optional AuthOptions *AuthOptions `json:"authOptions,omitempty"` @@ -49,6 +52,21 @@ type RayClusterSpec struct { WorkerGroupSpecs []WorkerGroupSpec `json:"workerGroupSpecs,omitempty"` } +type RayClusterUpgradeType string + +const ( + // During upgrade, Recreate strategy will delete all existing pods before creating new ones + Recreate RayClusterUpgradeType = "Recreate" + // No new pod will be created while the strategy is set to None + RayClusterUpgradeNone RayClusterUpgradeType = "None" +) + +type RayClusterUpgradeStrategy struct { + // Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. + // +optional + Type *RayClusterUpgradeType `json:"type,omitempty"` +} + // AuthMode describes the authentication mode for the Ray cluster. 
type AuthMode string diff --git a/ray-operator/apis/ray/v1/rayservice_types.go b/ray-operator/apis/ray/v1/rayservice_types.go index 83b9775f377..d8b8304a806 100644 --- a/ray-operator/apis/ray/v1/rayservice_types.go +++ b/ray-operator/apis/ray/v1/rayservice_types.go @@ -28,7 +28,7 @@ const ( // During upgrade, NewCluster strategy will create new upgraded cluster and switch to it when it becomes ready NewCluster RayServiceUpgradeType = "NewCluster" // No new cluster will be created while the strategy is set to None - None RayServiceUpgradeType = "None" + RayServiceUpgradeNone RayServiceUpgradeType = "None" ) // These statuses should match Ray Serve's application statuses diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index cd710592d98..ec261a2c7d6 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -378,6 +378,11 @@ func (in *RayClusterList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RayClusterSpec) DeepCopyInto(out *RayClusterSpec) { *out = *in + if in.UpgradeStrategy != nil { + in, out := &in.UpgradeStrategy, &out.UpgradeStrategy + *out = new(RayClusterUpgradeStrategy) + (*in).DeepCopyInto(*out) + } if in.AuthOptions != nil { in, out := &in.AuthOptions, &out.AuthOptions *out = new(AuthOptions) @@ -480,6 +485,26 @@ func (in *RayClusterStatus) DeepCopy() *RayClusterStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RayClusterUpgradeStrategy) DeepCopyInto(out *RayClusterUpgradeStrategy) { + *out = *in + if in.Type != nil { + in, out := &in.Type, &out.Type + *out = new(RayClusterUpgradeType) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RayClusterUpgradeStrategy. +func (in *RayClusterUpgradeStrategy) DeepCopy() *RayClusterUpgradeStrategy { + if in == nil { + return nil + } + out := new(RayClusterUpgradeStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *RayJob) DeepCopyInto(out *RayJob) { *out = *in diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index 45f5406c411..e731ca0105f 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -4532,6 +4532,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index 1f4c8432168..b5d7ce86490 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -4614,6 +4614,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index baec5cfe1f3..634b54b59de 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -4512,6 +4512,11 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index a42f9184912..6df0b750975 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -158,11 +158,24 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra } } +func GeneratePodTemplateHash(template corev1.PodTemplateSpec) (string, error) { + originalTemplate := template.DeepCopy() + return utils.GenerateJsonHash(*originalTemplate) +} + // DefaultHeadPodTemplate sets the config values func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, headSpec rayv1.HeadGroupSpec, podName string, headPort string) corev1.PodTemplateSpec { // TODO (Dmitri) The argument headPort is essentially unused; // headPort is passed into setMissingRayStartParams but unused there for the head pod. // To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic. + + // Calculate the pod template hash before any modifications + // This ensures the hash reflects the original user-defined template for upgrade detection + templateHash := "" + if hash, err := GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template); err == nil { + templateHash = hash + } + podTemplate := headSpec.Template if utils.IsDeterministicHeadPodNameEnabled() { podTemplate.Name = podName @@ -173,6 +186,13 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash + } + // Update rayStartParams with top-level Resources for head group. 
updateRayStartParamsResources(ctx, headSpec.RayStartParams, headSpec.Resources) @@ -295,12 +315,31 @@ func getEnableProbesInjection() bool { // DefaultWorkerPodTemplate sets the config values func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec { + // Calculate the pod template hash before any modifications + // This ensures the hash reflects the original user-defined template for upgrade detection + templateHash := "" + for _, wg := range instance.Spec.WorkerGroupSpecs { + if wg.GroupName == workerSpec.GroupName { + if hash, err := GeneratePodTemplateHash(wg.Template); err == nil { + templateHash = hash + } + break + } + } + podTemplate := workerSpec.Template podTemplate.GenerateName = podName + // Pods created by RayCluster should be restricted to the namespace of the RayCluster. // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash + } // The Ray worker should only start once the GCS server is ready. // only inject init container only when ENABLE_INIT_CONTAINER_INJECTION is true enableInitContainerInjection := getEnableInitContainerInjection() diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 7d4d8fab8f6..1e95c365343 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -180,6 +180,13 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance return ctrl.Result{}, nil } + if err := utils.ValidateRayClusterUpgradeOptions(instance); err != nil { + logger.Error(err, "The RayCluster UpgradeStrategy is invalid") + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterSpec), + "The RayCluster UpgradeStrategy is invalid %s/%s: %v", instance.Namespace, instance.Name, err) + return ctrl.Result{}, nil + } + if err := utils.ValidateRayClusterStatus(instance); err != nil { logger.Error(err, "The RayCluster status is invalid") r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterStatus), @@ -637,6 +644,21 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv } } + // Check if pods need to be recreated with Recreate upgradeStrategy + if r.shouldRecreatePodsForUpgrade(ctx, instance) { + logger.Info("RayCluster spec changed with Recreate upgradeStrategy, deleting all pods") + if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePodCollection), + "Failed deleting Pods due to spec change with Recreate upgradeStrategy for RayCluster %s/%s, %v", + instance.Namespace, instance.Name, err) + return errstd.Join(utils.ErrFailedDeleteAllPods, err) + } + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedPod), + "Deleted all Pods for RayCluster %s/%s due to spec change with Recreate upgradeStrategy", + instance.Namespace, instance.Name) + return nil + } + // check if all the pods exist headPods := corev1.PodList{} if err := 
r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil { @@ -1094,6 +1116,77 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context return nil } +// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on Head/Worker pod template changes +func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool { + logger := ctrl.LoggerFrom(ctx) + + if instance.Spec.UpgradeStrategy == nil || instance.Spec.UpgradeStrategy.Type == nil { + return false + } + if *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate { + return false + } + + allPods := corev1.PodList{} + if err := r.List(ctx, &allPods, common.RayClusterAllPodsAssociationOptions(instance).ToListOptions()...); err != nil { + logger.Error(err, "Failed to list pods for upgrade check") + return false + } + + if len(allPods.Items) == 0 { + return false + } + + headHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) + if err != nil { + logger.Error(err, "Failed to generate head template hash") + return false + } + + workerHashMap := make(map[string]string) + for _, workerGroup := range instance.Spec.WorkerGroupSpecs { + hash, err := common.GeneratePodTemplateHash(workerGroup.Template) + if err != nil { + logger.Error(err, "Failed to generate worker template hash", "groupName", workerGroup.GroupName) + continue + } + workerHashMap[workerGroup.GroupName] = hash + } + + // Check each pod to see if its template hash matches the current spec + for _, pod := range allPods.Items { + nodeType := pod.Labels[utils.RayNodeTypeLabelKey] + actualHash := pod.Annotations[utils.PodTemplateHashKey] + + var expectedHash string + if nodeType == string(rayv1.HeadNode) { + expectedHash = headHash + } else if nodeType == string(rayv1.WorkerNode) { + groupName := pod.Labels[utils.RayNodeGroupLabelKey] + var ok bool + expectedHash, ok = workerHashMap[groupName] + if !ok { + logger.Info("Worker group not found in spec, skipping pod", "pod", pod.Name, "groupName", groupName) + continue + } + } else { + continue + } + + if actualHash != expectedHash { + logger.Info("Pod template has changed, will recreate all pods", + "pod", pod.Name, + "nodeType", nodeType, + "actualHash", actualHash, + "expectedHash", expectedHash, + "upgradeStrategy", *instance.Spec.UpgradeStrategy.Type) + return true + } + } + + return false +} + // shouldDeletePod returns whether the Pod should be deleted and the reason // // @param pod: The Pod to be checked. 
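For reference, a minimal RayCluster manifest exercising the new field might look like the sketch below (the name, image, and version are illustrative and not taken from this patch). With `upgradeStrategy.type: Recreate`, a later edit to the head or worker pod template changes the template hash that the controller compares against the `ray.io/pod-template-hash` annotation stamped on existing pods, so all pods are deleted and recreated from the updated spec; `None` leaves running pods untouched.

```yaml
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-recreate-example    # illustrative name
spec:
  upgradeStrategy:
    type: Recreate                     # or "None" to keep existing pods on spec changes
  rayVersion: "2.46.0"                 # illustrative version
  headGroupSpec:
    rayStartParams: {}
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.46.0    # illustrative image
  workerGroupSpecs:
    - groupName: small-group
      replicas: 1
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:2.46.0  # illustrative image
```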
diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 4818fbd6729..e40c4bd554b 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -1262,20 +1262,20 @@ func TestIsZeroDowntimeUpgradeEnabled(t *testing.T) { expected: true, }, { - name: "upgrade strategy is set to None, and env var is not set", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is not set", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "", expected: false, }, { - name: "upgrade strategy is set to None, and env var is set to true", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is set to true", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "true", expected: false, }, { - name: "upgrade strategy is set to None, and env var is set to false", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is set to false", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "false", expected: false, }, diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 3e0e86d1096..6980306d463 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -26,6 +26,7 @@ const ( HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete" NumWorkerGroupsKey = "ray.io/num-worker-groups" KubeRayVersion = "ray.io/kuberay-version" + PodTemplateHashKey = "ray.io/pod-template-hash" // Labels for feature RayMultihostIndexing // diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index a3e6da66e80..a9a2b69ac0a 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -37,6 +37,16 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { return nil } +func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { + if instance.Spec.UpgradeStrategy != nil { + creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) + if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { + return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) + } + } + return nil +} + // validateRayGroupResources checks for conflicting resource definitions. 
func validateRayGroupResources(groupName string, rayStartParams, resources map[string]string) error { hasRayStartResources := rayStartParams["num-cpus"] != "" || @@ -369,10 +379,10 @@ func ValidateRayServiceSpec(rayService *rayv1.RayService) error { // only NewClusterWithIncrementalUpgrade, NewCluster, and None are valid upgradeType if rayService.Spec.UpgradeStrategy != nil && rayService.Spec.UpgradeStrategy.Type != nil && - *rayService.Spec.UpgradeStrategy.Type != rayv1.None && + *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceUpgradeNone && *rayService.Spec.UpgradeStrategy.Type != rayv1.NewCluster && *rayService.Spec.UpgradeStrategy.Type != rayv1.NewClusterWithIncrementalUpgrade { - return fmt.Errorf("Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.NewClusterWithIncrementalUpgrade, rayv1.NewCluster, rayv1.None) + return fmt.Errorf("Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.NewClusterWithIncrementalUpgrade, rayv1.NewCluster, rayv1.RayServiceUpgradeNone) } if rayService.Spec.RayClusterDeletionDelaySeconds != nil && diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go index 14cb592037f..144ff1812d5 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go @@ -5,16 +5,17 @@ package v1 // RayClusterSpecApplyConfiguration represents a declarative configuration of the RayClusterSpec type for use // with apply. type RayClusterSpecApplyConfiguration struct { - AuthOptions *AuthOptionsApplyConfiguration `json:"authOptions,omitempty"` - Suspend *bool `json:"suspend,omitempty"` - ManagedBy *string `json:"managedBy,omitempty"` - AutoscalerOptions *AutoscalerOptionsApplyConfiguration `json:"autoscalerOptions,omitempty"` - HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"` - EnableInTreeAutoscaling *bool `json:"enableInTreeAutoscaling,omitempty"` - GcsFaultToleranceOptions *GcsFaultToleranceOptionsApplyConfiguration `json:"gcsFaultToleranceOptions,omitempty"` - HeadGroupSpec *HeadGroupSpecApplyConfiguration `json:"headGroupSpec,omitempty"` - RayVersion *string `json:"rayVersion,omitempty"` - WorkerGroupSpecs []WorkerGroupSpecApplyConfiguration `json:"workerGroupSpecs,omitempty"` + UpgradeStrategy *RayClusterUpgradeStrategyApplyConfiguration `json:"upgradeStrategy,omitempty"` + AuthOptions *AuthOptionsApplyConfiguration `json:"authOptions,omitempty"` + Suspend *bool `json:"suspend,omitempty"` + ManagedBy *string `json:"managedBy,omitempty"` + AutoscalerOptions *AutoscalerOptionsApplyConfiguration `json:"autoscalerOptions,omitempty"` + HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"` + EnableInTreeAutoscaling *bool `json:"enableInTreeAutoscaling,omitempty"` + GcsFaultToleranceOptions *GcsFaultToleranceOptionsApplyConfiguration `json:"gcsFaultToleranceOptions,omitempty"` + HeadGroupSpec *HeadGroupSpecApplyConfiguration `json:"headGroupSpec,omitempty"` + RayVersion *string `json:"rayVersion,omitempty"` + WorkerGroupSpecs []WorkerGroupSpecApplyConfiguration `json:"workerGroupSpecs,omitempty"` } // RayClusterSpecApplyConfiguration constructs a declarative configuration of the RayClusterSpec type for use with @@ -23,6 +24,14 @@ func RayClusterSpec() *RayClusterSpecApplyConfiguration { 
return &RayClusterSpecApplyConfiguration{} } +// WithUpgradeStrategy sets the UpgradeStrategy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UpgradeStrategy field is set to the value of the last call. +func (b *RayClusterSpecApplyConfiguration) WithUpgradeStrategy(value *RayClusterUpgradeStrategyApplyConfiguration) *RayClusterSpecApplyConfiguration { + b.UpgradeStrategy = value + return b +} + // WithAuthOptions sets the AuthOptions field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the AuthOptions field is set to the value of the last call. diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go new file mode 100644 index 00000000000..f1a1aa39064 --- /dev/null +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go @@ -0,0 +1,27 @@ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1 + +import ( + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" +) + +// RayClusterUpgradeStrategyApplyConfiguration represents a declarative configuration of the RayClusterUpgradeStrategy type for use +// with apply. +type RayClusterUpgradeStrategyApplyConfiguration struct { + Type *rayv1.RayClusterUpgradeType `json:"type,omitempty"` +} + +// RayClusterUpgradeStrategyApplyConfiguration constructs a declarative configuration of the RayClusterUpgradeStrategy type for use with +// apply. +func RayClusterUpgradeStrategy() *RayClusterUpgradeStrategyApplyConfiguration { + return &RayClusterUpgradeStrategyApplyConfiguration{} +} + +// WithType sets the Type field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Type field is set to the value of the last call. 
+func (b *RayClusterUpgradeStrategyApplyConfiguration) WithType(value rayv1.RayClusterUpgradeType) *RayClusterUpgradeStrategyApplyConfiguration { + b.Type = &value + return b +} diff --git a/ray-operator/pkg/client/applyconfiguration/utils.go b/ray-operator/pkg/client/applyconfiguration/utils.go index 9ad6513e0c8..16a92835cf5 100644 --- a/ray-operator/pkg/client/applyconfiguration/utils.go +++ b/ray-operator/pkg/client/applyconfiguration/utils.go @@ -44,6 +44,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &rayv1.RayClusterSpecApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayClusterStatus"): return &rayv1.RayClusterStatusApplyConfiguration{} + case v1.SchemeGroupVersion.WithKind("RayClusterUpgradeStrategy"): + return &rayv1.RayClusterUpgradeStrategyApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJob"): return &rayv1.RayJobApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJobSpec"): diff --git a/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go b/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go index e38da613bda..461d1c270e8 100644 --- a/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go +++ b/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go @@ -182,7 +182,7 @@ func TestUpdateServeConfigAndRayClusterSpecWithUpgradeDisabled(t *testing.T) { rayServiceName := "rayservice-sample" rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(RayServiceSampleYamlApplyConfiguration(). - WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy().WithType(rayv1.None))) + WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy().WithType(rayv1.RayServiceUpgradeNone))) rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) g.Expect(err).NotTo(HaveOccurred()) From 9359c5f87b4e409adacd2d84072fb14f1fb260e6 Mon Sep 17 00:00:00 2001 From: win5923 Date: Thu, 20 Nov 2025 14:12:35 +0000 Subject: [PATCH 02/12] Add test Signed-off-by: win5923 --- .../ray/raycluster_controller_unit_test.go | 122 ++++++++++++++++++ .../controllers/ray/utils/validation.go | 13 +- .../controllers/ray/utils/validation_test.go | 89 +++++++++++++ ray-operator/test/e2e/raycluster_test.go | 57 ++++++++ 4 files changed, 276 insertions(+), 5 deletions(-) diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 9c1ac017023..901d4cf8165 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -3643,3 +3643,125 @@ func TestReconcile_PodsWithAuthToken(t *testing.T) { assert.True(t, authModeEnvFound, "Auth mode env vars not found") } } + +func TestShouldRecreatePodsForUpgrade(t *testing.T) { + setupTest(t) + ctx := context.Background() + + // Calculate template hashes for matching pods + headHash, err := common.GeneratePodTemplateHash(testRayCluster.Spec.HeadGroupSpec.Template) + require.NoError(t, err, "Failed to generate head template hash") + workerHash, err := common.GeneratePodTemplateHash(testRayCluster.Spec.WorkerGroupSpecs[0].Template) + require.NoError(t, err, "Failed to generate worker template hash") + + // Helper function to create a pod with specific template hash + createPodWithHash := func(name string, nodeType rayv1.RayNodeType, groupName string, templateHash string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: 
name, + Namespace: namespaceStr, + Labels: map[string]string{ + utils.RayNodeLabelKey: "yes", + utils.RayClusterLabelKey: instanceName, + utils.RayNodeTypeLabelKey: string(nodeType), + utils.RayNodeGroupLabelKey: groupName, + }, + Annotations: map[string]string{ + utils.PodTemplateHashKey: templateHash, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "ray-head", Image: "rayproject/ray:latest"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + } + + tests := []struct { + name string + upgradeStrategy *rayv1.RayClusterUpgradeStrategy + pods []runtime.Object + expectedRecreate bool + }{ + { + name: "No upgrade strategy", + upgradeStrategy: nil, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Upgrade strategy type is nil", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{Type: nil}, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Upgrade strategy type is None", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeNone), + }, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Recreate strategy but no pods exist", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{}, + expectedRecreate: false, + }, + { + name: "Recreate strategy with matching template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, workerHash), + }, + expectedRecreate: false, + }, + { + name: "Recreate strategy with mismatched head template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, "old-head-hash"), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, workerHash), + }, + expectedRecreate: true, + }, + { + name: "Recreate strategy with mismatched worker template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, "old-worker-hash"), + }, + expectedRecreate: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cluster := testRayCluster.DeepCopy() + cluster.Spec.UpgradeStrategy = tc.upgradeStrategy + + fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects(tc.pods...).Build() + testRayClusterReconciler := &RayClusterReconciler{ + Client: fakeClient, + Scheme: scheme.Scheme, + Recorder: &record.FakeRecorder{}, + } + + result := testRayClusterReconciler.shouldRecreatePodsForUpgrade(ctx, cluster) + assert.Equal(t, tc.expectedRecreate, result) + }) + } +} diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index a9a2b69ac0a..aa8ce262517 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -38,11 +38,14 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { } func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { - if instance.Spec.UpgradeStrategy != nil { - creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) - if creatorCRDType == 
RayJobCRD || creatorCRDType == RayServiceCRD { - return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) - } + strategy := instance.Spec.UpgradeStrategy + if strategy == nil || strategy.Type == nil || *strategy.Type == rayv1.RayClusterUpgradeNone { + return nil + } + + creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) + if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { + return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) } return nil } diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index 494fe39c29c..d95b0f55a4d 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1887,3 +1887,92 @@ func TestValidateClusterUpgradeOptions(t *testing.T) { }) } } + +func TestValidateRayClusterUpgradeOptions(t *testing.T) { + tests := []struct { + upgradeStrategy *rayv1.RayClusterUpgradeStrategy + name string + originatedFromCRD string + errorMessage string + expectError bool + }{ + { + name: "No upgrade strategy", + upgradeStrategy: nil, + expectError: false, + }, + { + name: "Upgrade strategy set None for RayCluster", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeNone), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: false, + }, + { + name: "Upgrade strategy set Recreate for RayCluster", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: false, + }, + { + name: "Upgrade strategy set Recreate for RayCluster created by RayJob", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayJobCRD), + expectError: true, + errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayJob", + }, + { + name: "Upgrade strategy set Recreate for RayCluster created by RayService", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.Recreate), + }, + originatedFromCRD: string(RayServiceCRD), + expectError: true, + errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayService", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cluster := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + Labels: map[string]string{}, + }, + Spec: rayv1.RayClusterSpec{ + UpgradeStrategy: tt.upgradeStrategy, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "ray-head", Image: "rayproject/ray:latest"}, + }, + }, + }, + }, + }, + } + + if tt.originatedFromCRD != "" { + cluster.Labels[RayOriginatedFromCRDLabelKey] = tt.originatedFromCRD + } + + err := ValidateRayClusterUpgradeOptions(cluster) + + if tt.expectError { + require.Error(t, err, "Expected error for test case: %s", tt.name) + if tt.errorMessage != "" { + assert.Contains(t, err.Error(), tt.errorMessage, "Error message mismatch for test case: %s", tt.name) + } + } else { + require.NoError(t, err, "Unexpected error for test case: %s", tt.name) + } + }) + } +} diff --git a/ray-operator/test/e2e/raycluster_test.go b/ray-operator/test/e2e/raycluster_test.go index 3c26221c46b..719ee651f32 100644 --- a/ray-operator/test/e2e/raycluster_test.go +++ 
b/ray-operator/test/e2e/raycluster_test.go @@ -208,3 +208,60 @@ func TestRayClusterScalingDown(t *testing.T) { g.Expect(err).NotTo(HaveOccurred(), "Failed to remove finalizer from pod %s/%s", namespace.Name, pod.Name) } } + +func TestRayClusterUpgradeStrategy(t *testing.T) { + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + + rayClusterAC := rayv1ac.RayCluster("raycluster-upgrade-recreate", namespace.Name).WithSpec(NewRayClusterSpec()) + rayClusterAC.Spec.UpgradeStrategy = rayv1ac.RayClusterUpgradeStrategy().WithType(rayv1.Recreate) + + rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayCluster %s/%s successfully", namespace.Name, rayCluster.Name) + + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", namespace.Name, rayCluster.Name) + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). + Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + initialHeadPodName := headPod.Name + + workerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(workerPods).To(HaveLen(1)) + + LogWithTimestamp(test.T(), "Updating RayCluster %s/%s pod template to trigger upgrade", rayCluster.Namespace, rayCluster.Name) + // Update head pod template spec to trigger pod template hash change + // Add an annotation to change the pod spec + rayClusterAC.Spec.HeadGroupSpec.Template. + WithAnnotations(map[string]string{"upgrade-trigger": "test"}) + rayCluster, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Updated RayCluster pod template") + + LogWithTimestamp(test.T(), "Waiting for new head pod to be running after recreate") + g.Eventually(func() bool { + newHeadPod, err := GetHeadPod(test, rayCluster) + if err != nil { + return false + } + return newHeadPod.Name != initialHeadPodName && newHeadPod.Status.Phase == corev1.PodRunning + }, TestTimeoutMedium).Should(BeTrue()) + + // Wait for cluster to become ready + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready again", rayCluster.Namespace, rayCluster.Name) + g.Eventually(RayCluster(test, namespace.Name, rayCluster.Name), TestTimeoutMedium). 
+ Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + newHeadPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newHeadPod.Name).NotTo(Equal(initialHeadPodName)) + + newWorkerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newWorkerPods).To(HaveLen(1)) +} From acd77391083378684989f1416e16bb2b1f1709e8 Mon Sep 17 00:00:00 2001 From: win5923 Date: Sat, 22 Nov 2025 10:44:47 +0000 Subject: [PATCH 03/12] improve readability Signed-off-by: win5923 --- .../controllers/ray/raycluster_controller.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 1e95c365343..358e75beec5 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1116,7 +1116,7 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context return nil } -// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on Head/Worker pod template changes +// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on PodTemplateSpec changes func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool { logger := ctrl.LoggerFrom(ctx) @@ -1159,9 +1159,10 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, actualHash := pod.Annotations[utils.PodTemplateHashKey] var expectedHash string - if nodeType == string(rayv1.HeadNode) { + switch rayv1.RayNodeType(nodeType) { + case rayv1.HeadNode: expectedHash = headHash - } else if nodeType == string(rayv1.WorkerNode) { + case rayv1.WorkerNode: groupName := pod.Labels[utils.RayNodeGroupLabelKey] var ok bool expectedHash, ok = workerHashMap[groupName] @@ -1169,21 +1170,16 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, logger.Info("Worker group not found in spec, skipping pod", "pod", pod.Name, "groupName", groupName) continue } - } else { + default: continue } if actualHash != expectedHash { logger.Info("Pod template has changed, will recreate all pods", - "pod", pod.Name, - "nodeType", nodeType, - "actualHash", actualHash, - "expectedHash", expectedHash, - "upgradeStrategy", *instance.Spec.UpgradeStrategy.Type) + "rayCluster", instance.Name) return true } } - return false } From f8102f37f28a26091d9946dec97d49781fc633e3 Mon Sep 17 00:00:00 2001 From: win5923 Date: Tue, 2 Dec 2025 14:37:29 +0000 Subject: [PATCH 04/12] Remove deepcopy in GeneratePodTemplateHash Signed-off-by: win5923 --- ray-operator/controllers/ray/common/pod.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index 6df0b750975..4ef0766dd6d 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -159,8 +159,7 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra } func GeneratePodTemplateHash(template corev1.PodTemplateSpec) (string, error) { - originalTemplate := template.DeepCopy() - return utils.GenerateJsonHash(*originalTemplate) + return utils.GenerateJsonHash(template) } // DefaultHeadPodTemplate sets the config values From c93e25b31a102fa467b7dc13edee58a9b1491de8 Mon Sep 17 00:00:00 2001 From: win5923 Date: Tue, 2 Dec 2025 15:07:31 +0000 Subject: [PATCH 
05/12] Refactor ValidateRayClusterUpgradeOptions Signed-off-by: win5923 --- ray-operator/controllers/ray/common/pod.go | 6 ++++++ .../controllers/ray/utils/validation.go | 17 +++++++++++------ .../controllers/ray/utils/validation_test.go | 9 +++++++++ 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index 4ef0766dd6d..c6003e40166 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -168,11 +168,14 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // headPort is passed into setMissingRayStartParams but unused there for the head pod. // To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic. + log := ctrl.LoggerFrom(ctx) // Calculate the pod template hash before any modifications // This ensures the hash reflects the original user-defined template for upgrade detection templateHash := "" if hash, err := GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template); err == nil { templateHash = hash + } else { + log.Error(err, "Failed to generate pod template hash for head group") } podTemplate := headSpec.Template @@ -314,6 +317,7 @@ func getEnableProbesInjection() bool { // DefaultWorkerPodTemplate sets the config values func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec { + log := ctrl.LoggerFrom(ctx) // Calculate the pod template hash before any modifications // This ensures the hash reflects the original user-defined template for upgrade detection templateHash := "" @@ -321,6 +325,8 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo if wg.GroupName == workerSpec.GroupName { if hash, err := GeneratePodTemplateHash(wg.Template); err == nil { templateHash = hash + } else { + log.Error(err, "Failed to generate pod template hash for worker group", "groupName", workerSpec.GroupName) } break } diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index aa8ce262517..ac0c46bcb27 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -38,14 +38,19 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { } func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { - strategy := instance.Spec.UpgradeStrategy - if strategy == nil || strategy.Type == nil || *strategy.Type == rayv1.RayClusterUpgradeNone { - return nil + // only Recreate and None are valid upgradeType + if instance.Spec.UpgradeStrategy != nil && instance.Spec.UpgradeStrategy.Type != nil && + *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate && + *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterUpgradeNone { + return fmt.Errorf("The RayCluster spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s or %s", *instance.Spec.UpgradeStrategy.Type, rayv1.Recreate, rayv1.RayClusterUpgradeNone) } - creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) - if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { - return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) + // only allow UpgradeStrategy to be set when RayCluster is 
created directly by user + if instance.Spec.UpgradeStrategy != nil && instance.Spec.UpgradeStrategy.Type != nil { + creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) + if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { + return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) + } } return nil } diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index d95b0f55a4d..c1660f28e90 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1935,6 +1935,15 @@ func TestValidateRayClusterUpgradeOptions(t *testing.T) { expectError: true, errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayService", }, + { + name: "Invalid upgrade strategy value", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeType("InvalidStrategy")), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: true, + errorMessage: "Spec.UpgradeStrategy.Type value InvalidStrategy is invalid, valid options are Recreate or None", + }, } for _, tt := range tests { From 9b622e55de45f7e14fbc01c3de8148041336b5f4 Mon Sep 17 00:00:00 2001 From: win5923 Date: Sat, 6 Dec 2025 13:39:06 +0000 Subject: [PATCH 06/12] add kubebuilder:validation Signed-off-by: win5923 --- docs/reference/api.md | 5 +++-- helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml | 3 +++ helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml | 3 +++ helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml | 3 +++ ray-operator/apis/ray/v1/raycluster_types.go | 1 + ray-operator/config/crd/bases/ray.io_rayclusters.yaml | 3 +++ ray-operator/config/crd/bases/ray.io_rayjobs.yaml | 3 +++ ray-operator/config/crd/bases/ray.io_rayservices.yaml | 3 +++ ray-operator/controllers/ray/utils/validation.go | 1 - 9 files changed, 22 insertions(+), 3 deletions(-) diff --git a/docs/reference/api.md b/docs/reference/api.md index 2a2c3bbf470..e723ea3849d 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -323,7 +323,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | -| `type` _[RayClusterUpgradeType](#rayclusterupgradetype)_ | Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. | | | +| `type` _[RayClusterUpgradeType](#rayclusterupgradetype)_ | Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. | | Enum: [Recreate None]
| #### RayClusterUpgradeType @@ -332,7 +332,8 @@ _Underlying type:_ _string_ - +_Validation:_ +- Enum: [Recreate None] _Appears in:_ - [RayClusterUpgradeStrategy](#rayclusterupgradestrategy) diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml index e731ca0105f..575408824d9 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml @@ -4535,6 +4535,9 @@ spec: upgradeStrategy: properties: type: + enum: + - Recreate + - None type: string type: object workerGroupSpecs: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index b5d7ce86490..449c816a84e 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -4617,6 +4617,9 @@ spec: upgradeStrategy: properties: type: + enum: + - Recreate + - None type: string type: object workerGroupSpecs: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index 634b54b59de..c99e2b253bf 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -4515,6 +4515,9 @@ spec: upgradeStrategy: properties: type: + enum: + - Recreate + - None type: string type: object workerGroupSpecs: diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index b3113d65fd1..537f3c15975 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -52,6 +52,7 @@ type RayClusterSpec struct { WorkerGroupSpecs []WorkerGroupSpec `json:"workerGroupSpecs,omitempty"` } +// +kubebuilder:validation:Enum=Recreate;None type RayClusterUpgradeType string const ( diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index e731ca0105f..575408824d9 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -4535,6 +4535,9 @@ spec: upgradeStrategy: properties: type: + enum: + - Recreate + - None type: string type: object workerGroupSpecs: diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index b5d7ce86490..449c816a84e 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -4617,6 +4617,9 @@ spec: upgradeStrategy: properties: type: + enum: + - Recreate + - None type: string type: object workerGroupSpecs: diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index 634b54b59de..c99e2b253bf 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -4515,6 +4515,9 @@ spec: upgradeStrategy: properties: type: + enum: + - Recreate + - None type: string type: object workerGroupSpecs: diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index ac0c46bcb27..7675a01dca3 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -38,7 +38,6 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { } func ValidateRayClusterUpgradeOptions(instance 
*rayv1.RayCluster) error { - // only Recreate and None are valid upgradeType if instance.Spec.UpgradeStrategy != nil && instance.Spec.UpgradeStrategy.Type != nil && *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate && *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterUpgradeNone { From 5ec2fff0b0f09f56d6e96eb7cca4f945dbff4dc4 Mon Sep 17 00:00:00 2001 From: win5923 Date: Sun, 7 Dec 2025 06:52:58 +0000 Subject: [PATCH 07/12] Rename the RayServiceUpgradeType and RayClusterUpgradeType constants Signed-off-by: win5923 --- docs/reference/api.md | 2 +- ray-operator/apis/ray/v1/raycluster_types.go | 2 +- ray-operator/apis/ray/v1/rayservice_types.go | 6 +++--- .../controllers/ray/raycluster_controller.go | 2 +- .../ray/raycluster_controller_unit_test.go | 8 ++++---- .../controllers/ray/rayservice_controller.go | 6 +++--- .../controllers/ray/rayservice_controller_test.go | 2 +- .../ray/rayservice_controller_unit_test.go | 14 +++++++------- ray-operator/controllers/ray/utils/util.go | 2 +- ray-operator/controllers/ray/utils/util_test.go | 4 ++-- ray-operator/controllers/ray/utils/validation.go | 10 +++++----- .../controllers/ray/utils/validation_test.go | 10 +++++----- ray-operator/test/e2e/raycluster_test.go | 2 +- ray-operator/test/e2eincrementalupgrade/support.go | 2 +- 14 files changed, 36 insertions(+), 36 deletions(-) diff --git a/docs/reference/api.md b/docs/reference/api.md index e723ea3849d..971f6e96799 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -456,7 +456,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | -| `type` _[RayServiceUpgradeType](#rayserviceupgradetype)_ | Type represents the strategy used when upgrading the RayService. Currently supports `NewCluster` and `None`. | | | +| `type` _[RayServiceUpgradeType](#rayserviceupgradetype)_ | Type represents the strategy used when upgrading the RayService. Currently supports `NewCluster`, `NewClusterWithIncrementalUpgrade` and `None`. | | | | `clusterUpgradeOptions` _[ClusterUpgradeOptions](#clusterupgradeoptions)_ | ClusterUpgradeOptions defines the behavior of a NewClusterWithIncrementalUpgrade type.
RayServiceIncrementalUpgrade feature gate must be enabled to set ClusterUpgradeOptions. | | | diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index 537f3c15975..c0935244d32 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -57,7 +57,7 @@ type RayClusterUpgradeType string const ( // During upgrade, Recreate strategy will delete all existing pods before creating new ones - Recreate RayClusterUpgradeType = "Recreate" + RayClusterRecreate RayClusterUpgradeType = "Recreate" // No new pod will be created while the strategy is set to None RayClusterUpgradeNone RayClusterUpgradeType = "None" ) diff --git a/ray-operator/apis/ray/v1/rayservice_types.go b/ray-operator/apis/ray/v1/rayservice_types.go index d8b8304a806..a7494c08158 100644 --- a/ray-operator/apis/ray/v1/rayservice_types.go +++ b/ray-operator/apis/ray/v1/rayservice_types.go @@ -24,9 +24,9 @@ type RayServiceUpgradeType string const ( // During upgrade, NewClusterWithIncrementalUpgrade strategy will create an upgraded cluster to gradually scale // and migrate traffic to using Gateway API. - NewClusterWithIncrementalUpgrade RayServiceUpgradeType = "NewClusterWithIncrementalUpgrade" + RayServiceNewClusterWithIncrementalUpgrade RayServiceUpgradeType = "NewClusterWithIncrementalUpgrade" // During upgrade, NewCluster strategy will create new upgraded cluster and switch to it when it becomes ready - NewCluster RayServiceUpgradeType = "NewCluster" + RayServiceNewCluster RayServiceUpgradeType = "NewCluster" // No new cluster will be created while the strategy is set to None RayServiceUpgradeNone RayServiceUpgradeType = "None" ) @@ -75,7 +75,7 @@ type ClusterUpgradeOptions struct { } type RayServiceUpgradeStrategy struct { - // Type represents the strategy used when upgrading the RayService. Currently supports `NewCluster` and `None`. + // Type represents the strategy used when upgrading the RayService. Currently supports `NewCluster`, `NewClusterWithIncrementalUpgrade` and `None`. // +optional Type *RayServiceUpgradeType `json:"type,omitempty"` // ClusterUpgradeOptions defines the behavior of a NewClusterWithIncrementalUpgrade type. 
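For programmatic use, the apply-configuration builders added earlier in this series can set the field with the renamed constant. The snippet below is a standalone sketch, not part of the patch; the import paths follow from the module layout shown in these diffs.

```go
package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
)

func main() {
	// Build a RayClusterSpec apply configuration that opts into the
	// Recreate upgrade strategy via the generated "With" builders.
	spec := rayv1ac.RayClusterSpec().
		WithUpgradeStrategy(rayv1ac.RayClusterUpgradeStrategy().
			WithType(rayv1.RayClusterRecreate))

	fmt.Println(*spec.UpgradeStrategy.Type) // Recreate
}
```

In practice such a spec is attached to a `rayv1ac.RayCluster(name, namespace).WithSpec(...)` object and submitted with the typed client's Apply call, as the e2e test in this series does.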
diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 358e75beec5..59dd9b03769 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1123,7 +1123,7 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, if instance.Spec.UpgradeStrategy == nil || instance.Spec.UpgradeStrategy.Type == nil { return false } - if *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate { + if *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterRecreate { return false } diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 901d4cf8165..94f7c1a5600 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -3708,7 +3708,7 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { { name: "Recreate strategy but no pods exist", upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ - Type: ptr.To(rayv1.Recreate), + Type: ptr.To(rayv1.RayClusterRecreate), }, pods: []runtime.Object{}, expectedRecreate: false, @@ -3716,7 +3716,7 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { { name: "Recreate strategy with matching template hash", upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ - Type: ptr.To(rayv1.Recreate), + Type: ptr.To(rayv1.RayClusterRecreate), }, pods: []runtime.Object{ createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), @@ -3727,7 +3727,7 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { { name: "Recreate strategy with mismatched head template hash", upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ - Type: ptr.To(rayv1.Recreate), + Type: ptr.To(rayv1.RayClusterRecreate), }, pods: []runtime.Object{ createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, "old-head-hash"), @@ -3738,7 +3738,7 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { { name: "Recreate strategy with mismatched worker template hash", upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ - Type: ptr.To(rayv1.Recreate), + Type: ptr.To(rayv1.RayClusterRecreate), }, pods: []runtime.Object{ createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index a774cd3d127..a5865af3d50 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -559,11 +559,11 @@ func isZeroDowntimeUpgradeEnabled(ctx context.Context, upgradeStrategy *rayv1.Ra upgradeType := upgradeStrategy.Type if upgradeType != nil { if features.Enabled(features.RayServiceIncrementalUpgrade) { - if *upgradeType != rayv1.NewCluster && *upgradeType != rayv1.NewClusterWithIncrementalUpgrade { - logger.Info("Zero-downtime upgrade is disabled because UpgradeStrategy.Type is not set to %s or %s.", string(rayv1.NewCluster), string(rayv1.NewClusterWithIncrementalUpgrade)) + if *upgradeType != rayv1.RayServiceNewCluster && *upgradeType != rayv1.RayServiceNewClusterWithIncrementalUpgrade { + logger.Info("Zero-downtime upgrade is disabled because UpgradeStrategy.Type is not set to %s or %s.", string(rayv1.RayServiceNewCluster), string(rayv1.RayServiceNewClusterWithIncrementalUpgrade)) return false } - } else if *upgradeType != rayv1.NewCluster { + } else if *upgradeType != 
rayv1.RayServiceNewCluster { logger.Info("Zero-downtime upgrade is disabled because UpgradeStrategy.Type is not set to NewCluster.") return false } diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index 29b8ad1dd55..4abbc411ed2 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -80,7 +80,7 @@ func rayServiceTemplate(name string, namespace string, serveAppName string) *ray Spec: rayv1.RayServiceSpec{ ServeConfigV2: serveConfigV2, UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewCluster), + Type: ptr.To(rayv1.RayServiceNewCluster), }, RayClusterSpec: rayv1.RayClusterSpec{ RayVersion: support.GetRayVersion(), diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index e40c4bd554b..ad6a5851335 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -1245,19 +1245,19 @@ func TestIsZeroDowntimeUpgradeEnabled(t *testing.T) { }, { name: "upgrade strategy is set to NewCluster", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.NewCluster)}, + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceNewCluster)}, enableZeroDowntimeEnvVar: "", expected: true, }, { name: "upgrade strategy is set to NewCluster, and env var is not set", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.NewCluster)}, + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceNewCluster)}, enableZeroDowntimeEnvVar: "true", expected: true, }, { name: "upgrade strategy is set to NewCluster, and env var is set to false", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.NewCluster)}, + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceNewCluster)}, enableZeroDowntimeEnvVar: "false", expected: true, }, @@ -1405,7 +1405,7 @@ func makeIncrementalUpgradeRayService( } if withOptions { spec.UpgradeStrategy = &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ GatewayClassName: gatewayClassName, StepSizePercent: stepSizePercent, @@ -1520,7 +1520,7 @@ func TestCreateHTTPRoute(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace}, Spec: rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ StepSizePercent: &stepSize, IntervalSeconds: &interval, @@ -1685,7 +1685,7 @@ func TestReconcileHTTPRoute(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace}, Spec: rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ StepSizePercent: &stepSize, IntervalSeconds: &interval, @@ -1924,7 +1924,7 @@ func TestReconcileServeTargetCapacity(t *testing.T) { rayService := &rayv1.RayService{ Spec: rayv1.RayServiceSpec{ UpgradeStrategy: 
&rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ MaxSurgePercent: ptr.To(tt.maxSurgePercent), }, diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index cc4ed92fec7..ab27e8bf319 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -761,7 +761,7 @@ func IsIncrementalUpgradeEnabled(spec *rayv1.RayServiceSpec) bool { return false } return spec != nil && spec.UpgradeStrategy != nil && - *spec.UpgradeStrategy.Type == rayv1.NewClusterWithIncrementalUpgrade + *spec.UpgradeStrategy.Type == rayv1.RayServiceNewClusterWithIncrementalUpgrade } func GetRayServiceClusterUpgradeOptions(spec *rayv1.RayServiceSpec) *rayv1.ClusterUpgradeOptions { diff --git a/ray-operator/controllers/ray/utils/util_test.go b/ray-operator/controllers/ray/utils/util_test.go index 9a9371acd05..771f0cef587 100644 --- a/ray-operator/controllers/ray/utils/util_test.go +++ b/ray-operator/controllers/ray/utils/util_test.go @@ -1510,7 +1510,7 @@ func TestIsIncrementalUpgradeEnabled(t *testing.T) { name: "UpgradeStrategy Type is NewClusterWithIncrementalUpgrade but feature disabled", spec: &rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), }, }, featureEnabled: false, @@ -1520,7 +1520,7 @@ func TestIsIncrementalUpgradeEnabled(t *testing.T) { name: "UpgradeStrategy Type is NewClusterWithIncrementalUpgrade and feature enabled", spec: &rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), }, }, featureEnabled: true, diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 7675a01dca3..35190e7be42 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -39,9 +39,9 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { if instance.Spec.UpgradeStrategy != nil && instance.Spec.UpgradeStrategy.Type != nil && - *instance.Spec.UpgradeStrategy.Type != rayv1.Recreate && + *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterRecreate && *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterUpgradeNone { - return fmt.Errorf("The RayCluster spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s or %s", *instance.Spec.UpgradeStrategy.Type, rayv1.Recreate, rayv1.RayClusterUpgradeNone) + return fmt.Errorf("The RayCluster spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s or %s", *instance.Spec.UpgradeStrategy.Type, rayv1.RayClusterRecreate, rayv1.RayClusterUpgradeNone) } // only allow UpgradeStrategy to be set when RayCluster is created directly by user @@ -387,9 +387,9 @@ func ValidateRayServiceSpec(rayService *rayv1.RayService) error { if rayService.Spec.UpgradeStrategy != nil && rayService.Spec.UpgradeStrategy.Type != nil && *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceUpgradeNone && - *rayService.Spec.UpgradeStrategy.Type != rayv1.NewCluster && - *rayService.Spec.UpgradeStrategy.Type != 
rayv1.NewClusterWithIncrementalUpgrade { - return fmt.Errorf("Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.NewClusterWithIncrementalUpgrade, rayv1.NewCluster, rayv1.RayServiceUpgradeNone) + *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceNewCluster && + *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceNewClusterWithIncrementalUpgrade { + return fmt.Errorf("The RayService spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.RayServiceNewClusterWithIncrementalUpgrade, rayv1.RayServiceNewCluster, rayv1.RayServiceUpgradeNone) } if rayService.Spec.RayClusterDeletionDelaySeconds != nil && diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index c1660f28e90..398259e4080 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1854,7 +1854,7 @@ func TestValidateClusterUpgradeOptions(t *testing.T) { var upgradeStrategy *rayv1.RayServiceUpgradeStrategy if tt.maxSurgePercent != nil || tt.stepSizePercent != nil || tt.intervalSeconds != nil || tt.gatewayClassName != "" { upgradeStrategy = &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ MaxSurgePercent: tt.maxSurgePercent, StepSizePercent: tt.stepSizePercent, @@ -1864,7 +1864,7 @@ func TestValidateClusterUpgradeOptions(t *testing.T) { } } else if tt.expectError { upgradeStrategy = &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), } } @@ -1912,7 +1912,7 @@ func TestValidateRayClusterUpgradeOptions(t *testing.T) { { name: "Upgrade strategy set Recreate for RayCluster", upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ - Type: ptr.To(rayv1.Recreate), + Type: ptr.To(rayv1.RayClusterRecreate), }, originatedFromCRD: string(RayClusterCRD), expectError: false, @@ -1920,7 +1920,7 @@ func TestValidateRayClusterUpgradeOptions(t *testing.T) { { name: "Upgrade strategy set Recreate for RayCluster created by RayJob", upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ - Type: ptr.To(rayv1.Recreate), + Type: ptr.To(rayv1.RayClusterRecreate), }, originatedFromCRD: string(RayJobCRD), expectError: true, @@ -1929,7 +1929,7 @@ func TestValidateRayClusterUpgradeOptions(t *testing.T) { { name: "Upgrade strategy set Recreate for RayCluster created by RayService", upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ - Type: ptr.To(rayv1.Recreate), + Type: ptr.To(rayv1.RayClusterRecreate), }, originatedFromCRD: string(RayServiceCRD), expectError: true, diff --git a/ray-operator/test/e2e/raycluster_test.go b/ray-operator/test/e2e/raycluster_test.go index 719ee651f32..3b45333789d 100644 --- a/ray-operator/test/e2e/raycluster_test.go +++ b/ray-operator/test/e2e/raycluster_test.go @@ -216,7 +216,7 @@ func TestRayClusterUpgradeStrategy(t *testing.T) { namespace := test.NewTestNamespace() rayClusterAC := rayv1ac.RayCluster("raycluster-upgrade-recreate", namespace.Name).WithSpec(NewRayClusterSpec()) - rayClusterAC.Spec.UpgradeStrategy = rayv1ac.RayClusterUpgradeStrategy().WithType(rayv1.Recreate) + rayClusterAC.Spec.UpgradeStrategy = 
rayv1ac.RayClusterUpgradeStrategy().WithType(rayv1.RayClusterRecreate) rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) g.Expect(err).NotTo(HaveOccurred()) diff --git a/ray-operator/test/e2eincrementalupgrade/support.go b/ray-operator/test/e2eincrementalupgrade/support.go index b5e6293f491..58002f25a9b 100644 --- a/ray-operator/test/e2eincrementalupgrade/support.go +++ b/ray-operator/test/e2eincrementalupgrade/support.go @@ -43,7 +43,7 @@ func IncrementalUpgradeRayServiceApplyConfiguration( ) *rayv1ac.RayServiceSpecApplyConfiguration { return rayv1ac.RayServiceSpec(). WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy(). - WithType(rayv1.NewClusterWithIncrementalUpgrade). + WithType(rayv1.RayServiceNewClusterWithIncrementalUpgrade). WithClusterUpgradeOptions( rayv1ac.ClusterUpgradeOptions(). WithGatewayClassName("istio"). From 4c724b513534803021ead5f7bab20122fd8d4a44 Mon Sep 17 00:00:00 2001 From: win5923 Date: Sun, 7 Dec 2025 15:07:20 +0000 Subject: [PATCH 08/12] add ray.io/kuberay-version annotations for head pod and worker pods Signed-off-by: win5923 --- ray-operator/controllers/ray/common/pod.go | 17 +++-- .../controllers/ray/raycluster_controller.go | 72 ++++++++++++++++++- .../ray/raycluster_controller_unit_test.go | 19 +++++ 3 files changed, 100 insertions(+), 8 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index c6003e40166..1661f0c7157 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -188,10 +188,12 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + // Set the KubeRay version used to create the pod + podTemplate.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION if templateHash != "" { - if podTemplate.Annotations == nil { - podTemplate.Annotations = make(map[string]string) - } podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash } @@ -334,15 +336,16 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo podTemplate := workerSpec.Template podTemplate.GenerateName = podName - // Pods created by RayCluster should be restricted to the namespace of the RayCluster. // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + // Set the KubeRay version used to create the pod + podTemplate.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION if templateHash != "" { - if podTemplate.Annotations == nil { - podTemplate.Annotations = make(map[string]string) - } podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash } // The Ray worker should only start once the GCS server is ready. 
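The two hunks above apply the same annotation-stamping pattern in DefaultHeadPodTemplate and DefaultWorkerPodTemplate. A minimal sketch of that pattern, kept generic; the helper name stampAnnotation is hypothetical and not part of this PR:

package common

import corev1 "k8s.io/api/core/v1"

// stampAnnotation mirrors the pattern added above: lazily initialize the
// annotation map on the pod template, then record a value under the given key.
func stampAnnotation(tmpl *corev1.PodTemplateSpec, key, value string) {
	if tmpl.Annotations == nil {
		tmpl.Annotations = make(map[string]string)
	}
	tmpl.Annotations[key] = value
}

In the patch it is applied unconditionally for utils.KubeRayVersion (set to utils.KUBERAY_VERSION) and, only when templateHash is non-empty, for utils.PodTemplateHashKey.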
diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 59dd9b03769..6563346af57 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1137,6 +1137,21 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, return false } + // Case 1: If the KubeRay version has changed, update annotations then check in the next reconciliation + for _, pod := range allPods.Items { + podVersion := pod.Annotations[utils.KubeRayVersion] + if podVersion != "" && podVersion != utils.KUBERAY_VERSION { + logger.Info("Pods have different KubeRay version, updating pod annotations", + "pod", pod.Name, + "podVersion", podVersion, + "currentVersion", utils.KUBERAY_VERSION) + if err := r.updatePodsAnnotations(ctx, instance, &allPods); err != nil { + logger.Error(err, "Failed to update pod annotations for KubeRay version change") + } + return false + } + } + headHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) if err != nil { logger.Error(err, "Failed to generate head template hash") @@ -1153,7 +1168,7 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, workerHashMap[workerGroup.GroupName] = hash } - // Check each pod to see if its template hash matches the current spec + // Case 2: If the pod template hash has changed, recreate all pods for _, pod := range allPods.Items { nodeType := pod.Labels[utils.RayNodeTypeLabelKey] actualHash := pod.Annotations[utils.PodTemplateHashKey] @@ -1183,6 +1198,61 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, return false } +// updatePodsAnnotations updates pod annotations to match the current KubeRay version and PodTemplateHashKey +func (r *RayClusterReconciler) updatePodsAnnotations(ctx context.Context, instance *rayv1.RayCluster, allPods *corev1.PodList) error { + logger := ctrl.LoggerFrom(ctx) + + for i := range allPods.Items { + pod := &allPods.Items[i] + podVersion := pod.Annotations[utils.KubeRayVersion] + + if podVersion == utils.KUBERAY_VERSION || podVersion == "" { + continue + } + + newHash, err := r.calculatePodTemplateHash(instance, pod) + if err != nil { + return err + } + + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + pod.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION + pod.Annotations[utils.PodTemplateHashKey] = newHash + + if err := r.Update(ctx, pod); err != nil { + return err + } + + logger.Info("Updated pod annotations", "pod", pod.Name, "version", utils.KUBERAY_VERSION) + } + + return nil +} + +// calculatePodTemplateHash calculates the hash for a pod's template based on its node type and group +func (r *RayClusterReconciler) calculatePodTemplateHash(instance *rayv1.RayCluster, pod *corev1.Pod) (string, error) { + nodeType := pod.Labels[utils.RayNodeTypeLabelKey] + + switch rayv1.RayNodeType(nodeType) { + case rayv1.HeadNode: + return common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) + + case rayv1.WorkerNode: + groupName := pod.Labels[utils.RayNodeGroupLabelKey] + for _, workerGroup := range instance.Spec.WorkerGroupSpecs { + if workerGroup.GroupName == groupName { + return common.GeneratePodTemplateHash(workerGroup.Template) + } + } + return "", fmt.Errorf("worker group %s not found in RayCluster spec", groupName) + + default: + return "", fmt.Errorf("unknown node type: %s", nodeType) + } +} + // shouldDeletePod returns whether 
the Pod should be deleted and the reason // // @param pod: The Pod to be checked. diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 94f7c1a5600..d25b592e8e7 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -3668,6 +3668,7 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { }, Annotations: map[string]string{ utils.PodTemplateHashKey: templateHash, + utils.KubeRayVersion: utils.KUBERAY_VERSION, }, }, Spec: corev1.PodSpec{ @@ -3679,6 +3680,13 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { } } + // Helper function to create a pod with specific template hash and KubeRay version + createPodWithHashAndVersion := func(name string, nodeType rayv1.RayNodeType, groupName string, templateHash string, kuberayVersion string) *corev1.Pod { + pod := createPodWithHash(name, nodeType, groupName, templateHash) + pod.Annotations[utils.KubeRayVersion] = kuberayVersion + return pod + } + tests := []struct { name string upgradeStrategy *rayv1.RayClusterUpgradeStrategy @@ -3746,6 +3754,17 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { }, expectedRecreate: true, }, + { + name: "Recreate strategy with different KubeRay version - should update annotations and not recreate", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterRecreate), + }, + pods: []runtime.Object{ + createPodWithHashAndVersion("head-pod", rayv1.HeadNode, headGroupNameStr, "old-head-hash", "v1.0.0"), + createPodWithHashAndVersion("worker-pod", rayv1.WorkerNode, groupNameStr, "old-worker-hash", "v1.0.0"), + }, + expectedRecreate: false, + }, } for _, tc := range tests { From d1590387ebb134ef362da7a43d9f8244688c4250 Mon Sep 17 00:00:00 2001 From: Jun-Hao Wan Date: Mon, 8 Dec 2025 21:04:24 +0800 Subject: [PATCH 09/12] Update ray-operator/controllers/ray/common/pod.go Co-authored-by: Nary Yeh <60069744+machichima@users.noreply.github.com> Signed-off-by: Jun-Hao Wan --- ray-operator/controllers/ray/common/pod.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index 1661f0c7157..d5cf5ba37fe 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -172,13 +172,13 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // Calculate the pod template hash before any modifications // This ensures the hash reflects the original user-defined template for upgrade detection templateHash := "" - if hash, err := GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template); err == nil { + podTemplate := headSpec.Template + if hash, err := GeneratePodTemplateHash(podTemplate); err == nil { templateHash = hash } else { log.Error(err, "Failed to generate pod template hash for head group") } - podTemplate := headSpec.Template if utils.IsDeterministicHeadPodNameEnabled() { podTemplate.Name = podName } else { @@ -320,21 +320,16 @@ func getEnableProbesInjection() bool { // DefaultWorkerPodTemplate sets the config values func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec { log := ctrl.LoggerFrom(ctx) + + podTemplate := 
workerSpec.Template // Calculate the pod template hash before any modifications // This ensures the hash reflects the original user-defined template for upgrade detection templateHash := "" - for _, wg := range instance.Spec.WorkerGroupSpecs { - if wg.GroupName == workerSpec.GroupName { - if hash, err := GeneratePodTemplateHash(wg.Template); err == nil { - templateHash = hash - } else { - log.Error(err, "Failed to generate pod template hash for worker group", "groupName", workerSpec.GroupName) - } - break - } + if hash, err := GeneratePodTemplateHash(podTemplate); err == nil { + templateHash = hash + } else { + log.Error(err, "Failed to generate pod template hash for worker group", "groupName", workerSpec.GroupName) } - - podTemplate := workerSpec.Template podTemplate.GenerateName = podName // Pods created by RayCluster should be restricted to the namespace of the RayCluster. // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. From 96611773b7e20446c6a002af8b048ad9d49ff0ef Mon Sep 17 00:00:00 2001 From: win5923 Date: Wed, 10 Dec 2025 14:21:19 +0000 Subject: [PATCH 10/12] Revert "add ray.io/kuberay-version annotations for head pod and worker pods" This reverts commit 5f3afb37724896ee2ae13399ab3d48d26fb6719f. --- ray-operator/controllers/ray/common/pod.go | 17 ++--- .../controllers/ray/raycluster_controller.go | 72 +------------------ .../ray/raycluster_controller_unit_test.go | 19 ----- 3 files changed, 8 insertions(+), 100 deletions(-) diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index d5cf5ba37fe..9bda89a86c6 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -188,12 +188,10 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace - if podTemplate.Annotations == nil { - podTemplate.Annotations = make(map[string]string) - } - // Set the KubeRay version used to create the pod - podTemplate.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash } @@ -331,16 +329,15 @@ func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, wo log.Error(err, "Failed to generate pod template hash for worker group", "groupName", workerSpec.GroupName) } podTemplate.GenerateName = podName + // Pods created by RayCluster should be restricted to the namespace of the RayCluster. // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace - if podTemplate.Annotations == nil { - podTemplate.Annotations = make(map[string]string) - } - // Set the KubeRay version used to create the pod - podTemplate.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash } // The Ray worker should only start once the GCS server is ready. 
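For clarity, the PATCH 09 simplification earlier in this series, which hashes the WorkerGroupSpec the builder already receives instead of re-looking it up by group name, is left in place by the revert above. A minimal standalone sketch of that call, assuming the GeneratePodTemplateHash signature used throughout this series (the helper name workerTemplateHash is hypothetical):

package common

import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// workerTemplateHash sketches the simplified hash computation: the hash comes
// straight from the worker group's own template, with no scan over
// instance.Spec.WorkerGroupSpecs.
func workerTemplateHash(workerSpec rayv1.WorkerGroupSpec) string {
	hash, err := GeneratePodTemplateHash(workerSpec.Template)
	if err != nil {
		// DefaultWorkerPodTemplate logs the error and proceeds with an empty hash.
		return ""
	}
	return hash
}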
diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 6563346af57..59dd9b03769 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1137,21 +1137,6 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, return false } - // Case 1: If the KubeRay version has changed, update annotations then check in the next reconciliation - for _, pod := range allPods.Items { - podVersion := pod.Annotations[utils.KubeRayVersion] - if podVersion != "" && podVersion != utils.KUBERAY_VERSION { - logger.Info("Pods have different KubeRay version, updating pod annotations", - "pod", pod.Name, - "podVersion", podVersion, - "currentVersion", utils.KUBERAY_VERSION) - if err := r.updatePodsAnnotations(ctx, instance, &allPods); err != nil { - logger.Error(err, "Failed to update pod annotations for KubeRay version change") - } - return false - } - } - headHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) if err != nil { logger.Error(err, "Failed to generate head template hash") @@ -1168,7 +1153,7 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, workerHashMap[workerGroup.GroupName] = hash } - // Case 2: If the pod template hash has changed, recreate all pods + // Check each pod to see if its template hash matches the current spec for _, pod := range allPods.Items { nodeType := pod.Labels[utils.RayNodeTypeLabelKey] actualHash := pod.Annotations[utils.PodTemplateHashKey] @@ -1198,61 +1183,6 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, return false } -// updatePodsAnnotations updates pod annotations to match the current KubeRay version and PodTemplateHashKey -func (r *RayClusterReconciler) updatePodsAnnotations(ctx context.Context, instance *rayv1.RayCluster, allPods *corev1.PodList) error { - logger := ctrl.LoggerFrom(ctx) - - for i := range allPods.Items { - pod := &allPods.Items[i] - podVersion := pod.Annotations[utils.KubeRayVersion] - - if podVersion == utils.KUBERAY_VERSION || podVersion == "" { - continue - } - - newHash, err := r.calculatePodTemplateHash(instance, pod) - if err != nil { - return err - } - - if pod.Annotations == nil { - pod.Annotations = make(map[string]string) - } - pod.Annotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION - pod.Annotations[utils.PodTemplateHashKey] = newHash - - if err := r.Update(ctx, pod); err != nil { - return err - } - - logger.Info("Updated pod annotations", "pod", pod.Name, "version", utils.KUBERAY_VERSION) - } - - return nil -} - -// calculatePodTemplateHash calculates the hash for a pod's template based on its node type and group -func (r *RayClusterReconciler) calculatePodTemplateHash(instance *rayv1.RayCluster, pod *corev1.Pod) (string, error) { - nodeType := pod.Labels[utils.RayNodeTypeLabelKey] - - switch rayv1.RayNodeType(nodeType) { - case rayv1.HeadNode: - return common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) - - case rayv1.WorkerNode: - groupName := pod.Labels[utils.RayNodeGroupLabelKey] - for _, workerGroup := range instance.Spec.WorkerGroupSpecs { - if workerGroup.GroupName == groupName { - return common.GeneratePodTemplateHash(workerGroup.Template) - } - } - return "", fmt.Errorf("worker group %s not found in RayCluster spec", groupName) - - default: - return "", fmt.Errorf("unknown node type: %s", nodeType) - } -} - // shouldDeletePod returns whether 
the Pod should be deleted and the reason // // @param pod: The Pod to be checked. diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index d25b592e8e7..94f7c1a5600 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -3668,7 +3668,6 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { }, Annotations: map[string]string{ utils.PodTemplateHashKey: templateHash, - utils.KubeRayVersion: utils.KUBERAY_VERSION, }, }, Spec: corev1.PodSpec{ @@ -3680,13 +3679,6 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { } } - // Helper function to create a pod with specific template hash and KubeRay version - createPodWithHashAndVersion := func(name string, nodeType rayv1.RayNodeType, groupName string, templateHash string, kuberayVersion string) *corev1.Pod { - pod := createPodWithHash(name, nodeType, groupName, templateHash) - pod.Annotations[utils.KubeRayVersion] = kuberayVersion - return pod - } - tests := []struct { name string upgradeStrategy *rayv1.RayClusterUpgradeStrategy @@ -3754,17 +3746,6 @@ func TestShouldRecreatePodsForUpgrade(t *testing.T) { }, expectedRecreate: true, }, - { - name: "Recreate strategy with different KubeRay version - should update annotations and not recreate", - upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ - Type: ptr.To(rayv1.RayClusterRecreate), - }, - pods: []runtime.Object{ - createPodWithHashAndVersion("head-pod", rayv1.HeadNode, headGroupNameStr, "old-head-hash", "v1.0.0"), - createPodWithHashAndVersion("worker-pod", rayv1.WorkerNode, groupNameStr, "old-worker-hash", "v1.0.0"), - }, - expectedRecreate: false, - }, } for _, tc := range tests { From 1923c2589e8c672fdf358c8c8ed3458bb93f897b Mon Sep 17 00:00:00 2001 From: win5923 Date: Wed, 10 Dec 2025 15:05:28 +0000 Subject: [PATCH 11/12] add rayClusterScaleExpectation.Delete for deleteAllPods Signed-off-by: win5923 --- ray-operator/controllers/ray/raycluster_controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 59dd9b03769..60951a5c5cf 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -653,6 +653,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv instance.Namespace, instance.Name, err) return errstd.Join(utils.ErrFailedDeleteAllPods, err) } + r.rayClusterScaleExpectation.Delete(instance.Name, instance.Namespace) r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedPod), "Deleted all Pods for RayCluster %s/%s due to spec change with Recreate upgradeStrategy", instance.Namespace, instance.Name) From b829245749a43b17d7e560686c0e471ffec9416b Mon Sep 17 00:00:00 2001 From: win5923 Date: Wed, 10 Dec 2025 15:33:02 +0000 Subject: [PATCH 12/12] Apply suggestions Signed-off-by: win5923 --- .../controllers/ray/raycluster_controller.go | 65 ++++++++----------- 1 file changed, 27 insertions(+), 38 deletions(-) diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 60951a5c5cf..0582a940ad0 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -1128,57 +1128,46 @@ func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx 
context.Context, return false } - allPods := corev1.PodList{} - if err := r.List(ctx, &allPods, common.RayClusterAllPodsAssociationOptions(instance).ToListOptions()...); err != nil { - logger.Error(err, "Failed to list pods for upgrade check") + headPods := corev1.PodList{} + if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil { + logger.Error(err, "Failed to list head pods for upgrade check") return false } - if len(allPods.Items) == 0 { - return false - } + if len(headPods.Items) == 1 { + expectedHeadHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) + if err != nil { + logger.Error(err, "Failed to generate head template hash") + return false + } - headHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) - if err != nil { - logger.Error(err, "Failed to generate head template hash") - return false + headPod := headPods.Items[0] + actualHash := headPod.Annotations[utils.PodTemplateHashKey] + if actualHash != "" && actualHash != expectedHeadHash { + logger.Info("Pod template has changed, will recreate all pods", "rayCluster", instance.Name) + return true + } } - workerHashMap := make(map[string]string) for _, workerGroup := range instance.Spec.WorkerGroupSpecs { - hash, err := common.GeneratePodTemplateHash(workerGroup.Template) - if err != nil { - logger.Error(err, "Failed to generate worker template hash", "groupName", workerGroup.GroupName) + workerPods := corev1.PodList{} + if err := r.List(ctx, &workerPods, common.RayClusterGroupPodsAssociationOptions(instance, workerGroup.GroupName).ToListOptions()...); err != nil { + logger.Error(err, "Failed to list worker pods for upgrade check", "groupName", workerGroup.GroupName) continue } - workerHashMap[workerGroup.GroupName] = hash - } - // Check each pod to see if its template hash matches the current spec - for _, pod := range allPods.Items { - nodeType := pod.Labels[utils.RayNodeTypeLabelKey] - actualHash := pod.Annotations[utils.PodTemplateHashKey] - - var expectedHash string - switch rayv1.RayNodeType(nodeType) { - case rayv1.HeadNode: - expectedHash = headHash - case rayv1.WorkerNode: - groupName := pod.Labels[utils.RayNodeGroupLabelKey] - var ok bool - expectedHash, ok = workerHashMap[groupName] - if !ok { - logger.Info("Worker group not found in spec, skipping pod", "pod", pod.Name, "groupName", groupName) - continue - } - default: + expectedWorkerHash, err := common.GeneratePodTemplateHash(workerGroup.Template) + if err != nil { + logger.Error(err, "Failed to generate worker template hash", "groupName", workerGroup.GroupName) continue } - if actualHash != expectedHash { - logger.Info("Pod template has changed, will recreate all pods", - "rayCluster", instance.Name) - return true + for _, pod := range workerPods.Items { + actualHash := pod.Annotations[utils.PodTemplateHashKey] + if actualHash != "" && actualHash != expectedWorkerHash { + logger.Info("Pod template has changed, will recreate all pods", "rayCluster", instance.Name) + return true + } } } return false
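Taken together, the refactored check above reduces to one comparison per pod: hash the template currently in the spec and compare it with the hash stamped on the pod at creation time. A minimal sketch of that comparison, assuming common.GeneratePodTemplateHash and the annotation key used throughout this series; the helper detectDrift is hypothetical and not part of this PR:

package ray

import (
	corev1 "k8s.io/api/core/v1"

	"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
)

// detectDrift reports whether a live pod was built from an older template than
// the one currently in the RayCluster spec.
func detectDrift(specTemplate corev1.PodTemplateSpec, pod corev1.Pod, hashKey string) (bool, error) {
	expected, err := common.GeneratePodTemplateHash(specTemplate)
	if err != nil {
		return false, err
	}
	actual := pod.Annotations[hashKey]
	// Pods without the annotation predate hash stamping and are treated as
	// unchanged, matching the actualHash != "" guard above.
	return actual != "" && actual != expected, nil
}

When any head or worker pod drifts, reconcilePods deletes all pods for the cluster and, per PATCH 11 above, clears the cluster's scale expectation before the pods are recreated from the updated spec.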