diff --git a/docs/reference/api.md b/docs/reference/api.md index fb6a624645e..971f6e96799 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -297,6 +297,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | +| `upgradeStrategy` _[RayClusterUpgradeStrategy](#rayclusterupgradestrategy)_ | UpgradeStrategy defines the scaling policy used when upgrading the RayCluster | | | | `authOptions` _[AuthOptions](#authoptions)_ | AuthOptions specifies the authentication options for the RayCluster. | | | | `suspend` _boolean_ | Suspend indicates whether a RayCluster should be suspended.
A suspended RayCluster will have head pods and worker pods deleted. | | | | `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayCluster.
The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.
The kuberay-operator reconciles a RayCluster that doesn't have this field at all or
whose field value is the reserved string 'ray.io/kuberay-operator',
but delegates reconciling a RayCluster with 'kueue.x-k8s.io/multikueue' to Kueue.
The field is immutable. | | | @@ -309,6 +310,36 @@ _Appears in:_ | `workerGroupSpecs` _[WorkerGroupSpec](#workergroupspec) array_ | WorkerGroupSpecs are the specs for the worker pods | | | +#### RayClusterUpgradeStrategy + + + + + + + +_Appears in:_ +- [RayClusterSpec](#rayclusterspec) + +| Field | Description | Default | Validation | +| --- | --- | --- | --- | +| `type` _[RayClusterUpgradeType](#rayclusterupgradetype)_ | Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. | | Enum: [Recreate None]
| + + +#### RayClusterUpgradeType + +_Underlying type:_ _string_ + + + +_Validation:_ +- Enum: [Recreate None] + +_Appears in:_ +- [RayClusterUpgradeStrategy](#rayclusterupgradestrategy) + + + #### RayJob @@ -425,7 +456,7 @@ _Appears in:_ | Field | Description | Default | Validation | | --- | --- | --- | --- | -| `type` _[RayServiceUpgradeType](#rayserviceupgradetype)_ | Type represents the strategy used when upgrading the RayService. Currently supports `NewCluster` and `None`. | | | +| `type` _[RayServiceUpgradeType](#rayserviceupgradetype)_ | Type represents the strategy used when upgrading the RayService. Currently supports `NewCluster`, `NewClusterWithIncrementalUpgrade` and `None`. | | | | `clusterUpgradeOptions` _[ClusterUpgradeOptions](#clusterupgradeoptions)_ | ClusterUpgradeOptions defines the behavior of a NewClusterWithIncrementalUpgrade type.
RayServiceIncrementalUpgrade feature gate must be enabled to set ClusterUpgradeOptions. | | | diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml index 45f5406c411..575408824d9 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml @@ -4532,6 +4532,14 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + enum: + - Recreate + - None + type: string + type: object workerGroupSpecs: items: properties: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml index 1f4c8432168..449c816a84e 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml @@ -4614,6 +4614,14 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + enum: + - Recreate + - None + type: string + type: object workerGroupSpecs: items: properties: diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index baec5cfe1f3..c99e2b253bf 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -4512,6 +4512,14 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + enum: + - Recreate + - None + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/apis/ray/v1/raycluster_types.go b/ray-operator/apis/ray/v1/raycluster_types.go index f67a80ba854..c0935244d32 100644 --- a/ray-operator/apis/ray/v1/raycluster_types.go +++ b/ray-operator/apis/ray/v1/raycluster_types.go @@ -11,6 +11,9 @@ import ( // RayClusterSpec defines the desired state of RayCluster type RayClusterSpec struct { + // UpgradeStrategy defines the scaling policy used when upgrading the RayCluster + // +optional + UpgradeStrategy *RayClusterUpgradeStrategy `json:"upgradeStrategy,omitempty"` // AuthOptions specifies the authentication options for the RayCluster. // +optional AuthOptions *AuthOptions `json:"authOptions,omitempty"` @@ -49,6 +52,22 @@ type RayClusterSpec struct { WorkerGroupSpecs []WorkerGroupSpec `json:"workerGroupSpecs,omitempty"` } +// +kubebuilder:validation:Enum=Recreate;None +type RayClusterUpgradeType string + +const ( + // During upgrade, Recreate strategy will delete all existing pods before creating new ones + RayClusterRecreate RayClusterUpgradeType = "Recreate" + // No new pod will be created while the strategy is set to None + RayClusterUpgradeNone RayClusterUpgradeType = "None" +) + +type RayClusterUpgradeStrategy struct { + // Type represents the strategy used when upgrading the RayCluster Pods. Currently supports `Recreate` and `None`. + // +optional + Type *RayClusterUpgradeType `json:"type,omitempty"` +} + // AuthMode describes the authentication mode for the Ray cluster. type AuthMode string diff --git a/ray-operator/apis/ray/v1/rayservice_types.go b/ray-operator/apis/ray/v1/rayservice_types.go index 55d142822db..6a47a545b05 100644 --- a/ray-operator/apis/ray/v1/rayservice_types.go +++ b/ray-operator/apis/ray/v1/rayservice_types.go @@ -24,11 +24,11 @@ type RayServiceUpgradeType string const ( // During upgrade, NewClusterWithIncrementalUpgrade strategy will create an upgraded cluster to gradually scale // and migrate traffic to using Gateway API. 
- NewClusterWithIncrementalUpgrade RayServiceUpgradeType = "NewClusterWithIncrementalUpgrade" + RayServiceNewClusterWithIncrementalUpgrade RayServiceUpgradeType = "NewClusterWithIncrementalUpgrade" // During upgrade, NewCluster strategy will create new upgraded cluster and switch to it when it becomes ready - NewCluster RayServiceUpgradeType = "NewCluster" + RayServiceNewCluster RayServiceUpgradeType = "NewCluster" // No new cluster will be created while the strategy is set to None - None RayServiceUpgradeType = "None" + RayServiceUpgradeNone RayServiceUpgradeType = "None" ) // These statuses should match Ray Serve's application statuses @@ -75,7 +75,7 @@ type ClusterUpgradeOptions struct { } type RayServiceUpgradeStrategy struct { - // Type represents the strategy used when upgrading the RayService. Currently supports `NewCluster` and `None`. + // Type represents the strategy used when upgrading the RayService. Currently supports `NewCluster`, `NewClusterWithIncrementalUpgrade` and `None`. // +optional Type *RayServiceUpgradeType `json:"type,omitempty"` // ClusterUpgradeOptions defines the behavior of a NewClusterWithIncrementalUpgrade type. diff --git a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go index cd710592d98..ec261a2c7d6 100644 --- a/ray-operator/apis/ray/v1/zz_generated.deepcopy.go +++ b/ray-operator/apis/ray/v1/zz_generated.deepcopy.go @@ -378,6 +378,11 @@ func (in *RayClusterList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RayClusterSpec) DeepCopyInto(out *RayClusterSpec) { *out = *in + if in.UpgradeStrategy != nil { + in, out := &in.UpgradeStrategy, &out.UpgradeStrategy + *out = new(RayClusterUpgradeStrategy) + (*in).DeepCopyInto(*out) + } if in.AuthOptions != nil { in, out := &in.AuthOptions, &out.AuthOptions *out = new(AuthOptions) @@ -480,6 +485,26 @@ func (in *RayClusterStatus) DeepCopy() *RayClusterStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RayClusterUpgradeStrategy) DeepCopyInto(out *RayClusterUpgradeStrategy) { + *out = *in + if in.Type != nil { + in, out := &in.Type, &out.Type + *out = new(RayClusterUpgradeType) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RayClusterUpgradeStrategy. +func (in *RayClusterUpgradeStrategy) DeepCopy() *RayClusterUpgradeStrategy { + if in == nil { + return nil + } + out := new(RayClusterUpgradeStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *RayJob) DeepCopyInto(out *RayJob) { *out = *in diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml index 45f5406c411..575408824d9 100644 --- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml @@ -4532,6 +4532,14 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + enum: + - Recreate + - None + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml index 1f4c8432168..449c816a84e 100644 --- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml @@ -4614,6 +4614,14 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + enum: + - Recreate + - None + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index baec5cfe1f3..c99e2b253bf 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -4512,6 +4512,14 @@ spec: type: string suspend: type: boolean + upgradeStrategy: + properties: + type: + enum: + - Recreate + - None + type: string + type: object workerGroupSpecs: items: properties: diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index be4a64cdfaf..e813568e5ff 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -158,12 +158,27 @@ func configureGCSFaultTolerance(podTemplate *corev1.PodTemplateSpec, instance ra } } +func GeneratePodTemplateHash(template corev1.PodTemplateSpec) (string, error) { + return utils.GenerateJsonHash(template) +} + // DefaultHeadPodTemplate sets the config values func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, headSpec rayv1.HeadGroupSpec, podName string, headPort string) corev1.PodTemplateSpec { // TODO (Dmitri) The argument headPort is essentially unused; // headPort is passed into setMissingRayStartParams but unused there for the head pod. // To mitigate this awkwardness and reduce code redundancy, unify head and worker pod configuration logic. + + log := ctrl.LoggerFrom(ctx) + // Calculate the pod template hash before any modifications + // This ensures the hash reflects the original user-defined template for upgrade detection + templateHash := "" podTemplate := headSpec.Template + if hash, err := GeneratePodTemplateHash(podTemplate); err == nil { + templateHash = hash + } else { + log.Error(err, "Failed to generate pod template hash for head group") + } + if utils.IsDeterministicHeadPodNameEnabled() { podTemplate.Name = podName } else { @@ -173,6 +188,13 @@ func DefaultHeadPodTemplate(ctx context.Context, instance rayv1.RayCluster, head // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash + } + // Update rayStartParams with top-level Resources for head group. 
updateRayStartParamsResources(ctx, headSpec.RayStartParams, headSpec.Resources) @@ -296,12 +318,29 @@ func getEnableProbesInjection() bool { // DefaultWorkerPodTemplate sets the config values func DefaultWorkerPodTemplate(ctx context.Context, instance rayv1.RayCluster, workerSpec rayv1.WorkerGroupSpec, podName string, fqdnRayIP string, headPort string, replicaGrpName string, replicaIndex int, numHostIndex int) corev1.PodTemplateSpec { + log := ctrl.LoggerFrom(ctx) + podTemplate := workerSpec.Template + // Calculate the pod template hash before any modifications + // This ensures the hash reflects the original user-defined template for upgrade detection + templateHash := "" + if hash, err := GeneratePodTemplateHash(podTemplate); err == nil { + templateHash = hash + } else { + log.Error(err, "Failed to generate pod template hash for worker group", "groupName", workerSpec.GroupName) + } podTemplate.GenerateName = podName + // Pods created by RayCluster should be restricted to the namespace of the RayCluster. // This ensures privilege of KubeRay users are contained within the namespace of the RayCluster. podTemplate.ObjectMeta.Namespace = instance.Namespace + if templateHash != "" { + if podTemplate.Annotations == nil { + podTemplate.Annotations = make(map[string]string) + } + podTemplate.Annotations[utils.PodTemplateHashKey] = templateHash + } // The Ray worker should only start once the GCS server is ready. // only inject init container only when ENABLE_INIT_CONTAINER_INJECTION is true enableInitContainerInjection := getEnableInitContainerInjection() diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 76b21e47527..41b02c9bd93 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -180,6 +180,13 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance return ctrl.Result{}, nil } + if err := utils.ValidateRayClusterUpgradeOptions(instance); err != nil { + logger.Error(err, "The RayCluster UpgradeStrategy is invalid") + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterSpec), + "The RayCluster UpgradeStrategy is invalid %s/%s: %v", instance.Namespace, instance.Name, err) + return ctrl.Result{}, nil + } + if err := utils.ValidateRayClusterStatus(instance); err != nil { logger.Error(err, "The RayCluster status is invalid") r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterStatus), @@ -637,6 +644,29 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv } } + // Check if pods need to be recreated with Recreate upgradeStrategy + if r.shouldRecreatePodsForUpgrade(ctx, instance) { + logger.Info("RayCluster spec changed with Recreate upgradeStrategy, deleting all pods") + pods, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)) + if err != nil { + r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePodCollection), + "Failed deleting Pods due to spec change with Recreate upgradeStrategy for RayCluster %s/%s, %v", + instance.Namespace, instance.Name, err) + return errstd.Join(utils.ErrFailedDeleteAllPods, err) + } + for _, pod := range pods.Items { + groupName := pod.Labels[utils.RayNodeGroupLabelKey] + if groupName == utils.RayNodeHeadGroupLabelValue { + groupName = expectations.HeadGroup + } + r.rayClusterScaleExpectation.ExpectScalePod(pod.Namespace, instance.Name, 
groupName, pod.Name, expectations.Delete) + } + r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedPod), + "Deleted all Pods for RayCluster %s/%s due to spec change with Recreate upgradeStrategy", + instance.Namespace, instance.Name) + return nil + } + // check if all the pods exist headPods := corev1.PodList{} if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil { @@ -1094,6 +1124,62 @@ func (r *RayClusterReconciler) reconcileMultiHostWorkerGroup(ctx context.Context return nil } +// shouldRecreatePodsForUpgrade checks if any pods need to be recreated based on PodTemplateSpec changes +func (r *RayClusterReconciler) shouldRecreatePodsForUpgrade(ctx context.Context, instance *rayv1.RayCluster) bool { + logger := ctrl.LoggerFrom(ctx) + + if instance.Spec.UpgradeStrategy == nil || instance.Spec.UpgradeStrategy.Type == nil { + return false + } + if *instance.Spec.UpgradeStrategy.Type != rayv1.RayClusterRecreate { + return false + } + + headPods := corev1.PodList{} + if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil { + logger.Error(err, "Failed to list head pods for upgrade check") + return false + } + + if len(headPods.Items) == 1 { + expectedHeadHash, err := common.GeneratePodTemplateHash(instance.Spec.HeadGroupSpec.Template) + if err != nil { + logger.Error(err, "Failed to generate head template hash") + return false + } + + headPod := headPods.Items[0] + actualHash := headPod.Annotations[utils.PodTemplateHashKey] + if actualHash != "" && actualHash != expectedHeadHash { + logger.Info("Pod template has changed, will recreate all pods", "rayCluster", instance.Name) + return true + } + } + + for _, workerGroup := range instance.Spec.WorkerGroupSpecs { + workerPods := corev1.PodList{} + if err := r.List(ctx, &workerPods, common.RayClusterGroupPodsAssociationOptions(instance, workerGroup.GroupName).ToListOptions()...); err != nil { + logger.Error(err, "Failed to list worker pods for upgrade check", "groupName", workerGroup.GroupName) + continue + } + + expectedWorkerHash, err := common.GeneratePodTemplateHash(workerGroup.Template) + if err != nil { + logger.Error(err, "Failed to generate worker template hash", "groupName", workerGroup.GroupName) + continue + } + + for _, pod := range workerPods.Items { + actualHash := pod.Annotations[utils.PodTemplateHashKey] + if actualHash != "" && actualHash != expectedWorkerHash { + logger.Info("Pod template has changed, will recreate all pods", "rayCluster", instance.Name) + return true + } + } + } + return false +} + // shouldDeletePod returns whether the Pod should be deleted and the reason // // @param pod: The Pod to be checked. 
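
The two halves of the mechanism above — `DefaultHeadPodTemplate`/`DefaultWorkerPodTemplate` stamping a `ray.io/pod-template-hash` annotation computed from the unmodified user template, and `shouldRecreatePodsForUpgrade` recomputing and comparing that hash on reconcile — reduce to the sketch below. The standalone helper is hypothetical and illustrative only; `common.GeneratePodTemplateHash` and `utils.PodTemplateHashKey` are the real identifiers added in this PR.

```go
// Illustrative sketch, not part of the PR.
package sketch

import (
	corev1 "k8s.io/api/core/v1"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// headTemplateChanged reports whether the head Pod was created from an older
// HeadGroupSpec.Template, by comparing the hash stamped into the
// ray.io/pod-template-hash annotation at Pod creation time against a hash
// freshly computed from the current spec.
func headTemplateChanged(cluster *rayv1.RayCluster, headPod *corev1.Pod) (bool, error) {
	expected, err := common.GeneratePodTemplateHash(cluster.Spec.HeadGroupSpec.Template)
	if err != nil {
		return false, err
	}
	actual := headPod.Annotations[utils.PodTemplateHashKey]
	// Pods created before this feature carry no annotation; they are treated
	// as up to date, mirroring the `actualHash != ""` guard in the controller.
	return actual != "" && actual != expected, nil
}
```

Because the hash is computed before the defaulting logic mutates the template, operator-injected fields (names, namespace, init containers) do not perturb it; only a user-visible spec change flips the comparison.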
diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 9c1ac017023..94f7c1a5600 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -3643,3 +3643,125 @@ func TestReconcile_PodsWithAuthToken(t *testing.T) { assert.True(t, authModeEnvFound, "Auth mode env vars not found") } } + +func TestShouldRecreatePodsForUpgrade(t *testing.T) { + setupTest(t) + ctx := context.Background() + + // Calculate template hashes for matching pods + headHash, err := common.GeneratePodTemplateHash(testRayCluster.Spec.HeadGroupSpec.Template) + require.NoError(t, err, "Failed to generate head template hash") + workerHash, err := common.GeneratePodTemplateHash(testRayCluster.Spec.WorkerGroupSpecs[0].Template) + require.NoError(t, err, "Failed to generate worker template hash") + + // Helper function to create a pod with specific template hash + createPodWithHash := func(name string, nodeType rayv1.RayNodeType, groupName string, templateHash string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespaceStr, + Labels: map[string]string{ + utils.RayNodeLabelKey: "yes", + utils.RayClusterLabelKey: instanceName, + utils.RayNodeTypeLabelKey: string(nodeType), + utils.RayNodeGroupLabelKey: groupName, + }, + Annotations: map[string]string{ + utils.PodTemplateHashKey: templateHash, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "ray-head", Image: "rayproject/ray:latest"}, + }, + }, + Status: corev1.PodStatus{Phase: corev1.PodRunning}, + } + } + + tests := []struct { + name string + upgradeStrategy *rayv1.RayClusterUpgradeStrategy + pods []runtime.Object + expectedRecreate bool + }{ + { + name: "No upgrade strategy", + upgradeStrategy: nil, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Upgrade strategy type is nil", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{Type: nil}, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Upgrade strategy type is None", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeNone), + }, + pods: testPods, + expectedRecreate: false, + }, + { + name: "Recreate strategy but no pods exist", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterRecreate), + }, + pods: []runtime.Object{}, + expectedRecreate: false, + }, + { + name: "Recreate strategy with matching template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterRecreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, headHash), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, workerHash), + }, + expectedRecreate: false, + }, + { + name: "Recreate strategy with mismatched head template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterRecreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, headGroupNameStr, "old-head-hash"), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, workerHash), + }, + expectedRecreate: true, + }, + { + name: "Recreate strategy with mismatched worker template hash", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterRecreate), + }, + pods: []runtime.Object{ + createPodWithHash("head-pod", rayv1.HeadNode, 
headGroupNameStr, headHash), + createPodWithHash("worker-pod", rayv1.WorkerNode, groupNameStr, "old-worker-hash"), + }, + expectedRecreate: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cluster := testRayCluster.DeepCopy() + cluster.Spec.UpgradeStrategy = tc.upgradeStrategy + + fakeClient := clientFake.NewClientBuilder().WithRuntimeObjects(tc.pods...).Build() + testRayClusterReconciler := &RayClusterReconciler{ + Client: fakeClient, + Scheme: scheme.Scheme, + Recorder: &record.FakeRecorder{}, + } + + result := testRayClusterReconciler.shouldRecreatePodsForUpgrade(ctx, cluster) + assert.Equal(t, tc.expectedRecreate, result) + }) + } +} diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index c3fa64cbb84..bee19b5f3dc 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -579,11 +579,11 @@ func isZeroDowntimeUpgradeEnabled(ctx context.Context, upgradeStrategy *rayv1.Ra upgradeType := upgradeStrategy.Type if upgradeType != nil { if features.Enabled(features.RayServiceIncrementalUpgrade) { - if *upgradeType != rayv1.NewCluster && *upgradeType != rayv1.NewClusterWithIncrementalUpgrade { - logger.Info("Zero-downtime upgrade is disabled because UpgradeStrategy.Type is not set to %s or %s.", string(rayv1.NewCluster), string(rayv1.NewClusterWithIncrementalUpgrade)) + if *upgradeType != rayv1.RayServiceNewCluster && *upgradeType != rayv1.RayServiceNewClusterWithIncrementalUpgrade { + logger.Info("Zero-downtime upgrade is disabled because UpgradeStrategy.Type is not set to %s or %s.", string(rayv1.RayServiceNewCluster), string(rayv1.RayServiceNewClusterWithIncrementalUpgrade)) return false } - } else if *upgradeType != rayv1.NewCluster { + } else if *upgradeType != rayv1.RayServiceNewCluster { logger.Info("Zero-downtime upgrade is disabled because UpgradeStrategy.Type is not set to NewCluster.") return false } diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go index 39b2d800cec..649240dee39 100644 --- a/ray-operator/controllers/ray/rayservice_controller_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_test.go @@ -81,7 +81,7 @@ func rayServiceTemplate(name string, namespace string, serveAppName string) *ray Spec: rayv1.RayServiceSpec{ ServeConfigV2: serveConfigV2, UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewCluster), + Type: ptr.To(rayv1.RayServiceNewCluster), }, RayClusterSpec: rayv1.RayClusterSpec{ RayVersion: support.GetRayVersion(), diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 4818fbd6729..ad6a5851335 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -1245,37 +1245,37 @@ func TestIsZeroDowntimeUpgradeEnabled(t *testing.T) { }, { name: "upgrade strategy is set to NewCluster", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.NewCluster)}, + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceNewCluster)}, enableZeroDowntimeEnvVar: "", expected: true, }, { name: "upgrade strategy is set to NewCluster, and env var is not set", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.NewCluster)}, + upgradeStrategy: 
&rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceNewCluster)}, enableZeroDowntimeEnvVar: "true", expected: true, }, { name: "upgrade strategy is set to NewCluster, and env var is set to false", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.NewCluster)}, + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceNewCluster)}, enableZeroDowntimeEnvVar: "false", expected: true, }, { - name: "upgrade strategy is set to None, and env var is not set", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is not set", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "", expected: false, }, { - name: "upgrade strategy is set to None, and env var is set to true", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is set to true", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "true", expected: false, }, { - name: "upgrade strategy is set to None, and env var is set to false", - upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.None)}, + name: "upgrade strategy is set to RayServiceUpgradeNone, and env var is set to false", + upgradeStrategy: &rayv1.RayServiceUpgradeStrategy{Type: ptr.To(rayv1.RayServiceUpgradeNone)}, enableZeroDowntimeEnvVar: "false", expected: false, }, @@ -1405,7 +1405,7 @@ func makeIncrementalUpgradeRayService( } if withOptions { spec.UpgradeStrategy = &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ GatewayClassName: gatewayClassName, StepSizePercent: stepSizePercent, @@ -1520,7 +1520,7 @@ func TestCreateHTTPRoute(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace}, Spec: rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ StepSizePercent: &stepSize, IntervalSeconds: &interval, @@ -1685,7 +1685,7 @@ func TestReconcileHTTPRoute(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "test-rayservice", Namespace: namespace}, Spec: rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ StepSizePercent: &stepSize, IntervalSeconds: &interval, @@ -1924,7 +1924,7 @@ func TestReconcileServeTargetCapacity(t *testing.T) { rayService := &rayv1.RayService{ Spec: rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ MaxSurgePercent: ptr.To(tt.maxSurgePercent), }, diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 3e0e86d1096..6980306d463 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go 
@@ -26,6 +26,7 @@ const ( HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete" NumWorkerGroupsKey = "ray.io/num-worker-groups" KubeRayVersion = "ray.io/kuberay-version" + PodTemplateHashKey = "ray.io/pod-template-hash" // Labels for feature RayMultihostIndexing // diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index 8c806d6d413..dab53f8092a 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -787,7 +787,7 @@ func IsIncrementalUpgradeEnabled(spec *rayv1.RayServiceSpec) bool { return false } return spec != nil && spec.UpgradeStrategy != nil && - *spec.UpgradeStrategy.Type == rayv1.NewClusterWithIncrementalUpgrade + *spec.UpgradeStrategy.Type == rayv1.RayServiceNewClusterWithIncrementalUpgrade } func GetRayServiceClusterUpgradeOptions(spec *rayv1.RayServiceSpec) *rayv1.ClusterUpgradeOptions { diff --git a/ray-operator/controllers/ray/utils/util_test.go b/ray-operator/controllers/ray/utils/util_test.go index 47afa55c85a..188f4223fb8 100644 --- a/ray-operator/controllers/ray/utils/util_test.go +++ b/ray-operator/controllers/ray/utils/util_test.go @@ -1593,7 +1593,7 @@ func TestIsIncrementalUpgradeEnabled(t *testing.T) { name: "UpgradeStrategy Type is NewClusterWithIncrementalUpgrade but feature disabled", spec: &rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), }, }, featureEnabled: false, @@ -1603,7 +1603,7 @@ func TestIsIncrementalUpgradeEnabled(t *testing.T) { name: "UpgradeStrategy Type is NewClusterWithIncrementalUpgrade and feature enabled", spec: &rayv1.RayServiceSpec{ UpgradeStrategy: &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), }, }, featureEnabled: true, diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go index 07d4591afc1..a34abeb5464 100644 --- a/ray-operator/controllers/ray/utils/validation.go +++ b/ray-operator/controllers/ray/utils/validation.go @@ -37,6 +37,21 @@ func ValidateRayClusterMetadata(metadata metav1.ObjectMeta) error { return nil } +func ValidateRayClusterUpgradeOptions(instance *rayv1.RayCluster) error { + if instance.Spec.UpgradeStrategy != nil && instance.Spec.UpgradeStrategy.Type != nil { + + if !(*instance.Spec.UpgradeStrategy.Type == rayv1.RayClusterRecreate || *instance.Spec.UpgradeStrategy.Type == rayv1.RayClusterUpgradeNone) { + return fmt.Errorf("The RayCluster spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s or %s", *instance.Spec.UpgradeStrategy.Type, rayv1.RayClusterRecreate, rayv1.RayClusterUpgradeNone) + } + + creatorCRDType := GetCRDType(instance.Labels[RayOriginatedFromCRDLabelKey]) + if creatorCRDType == RayJobCRD || creatorCRDType == RayServiceCRD { + return fmt.Errorf("upgradeStrategy cannot be set when RayCluster is created by %s", creatorCRDType) + } + } + return nil +} + // validateRayGroupResources checks for conflicting resource definitions. 
func validateRayGroupResources(groupName string, rayStartParams, resources map[string]string) error { hasRayStartResources := rayStartParams["num-cpus"] != "" || @@ -372,10 +387,10 @@ func ValidateRayServiceSpec(rayService *rayv1.RayService) error { // only NewClusterWithIncrementalUpgrade, NewCluster, and None are valid upgradeType if rayService.Spec.UpgradeStrategy != nil && rayService.Spec.UpgradeStrategy.Type != nil && - *rayService.Spec.UpgradeStrategy.Type != rayv1.None && - *rayService.Spec.UpgradeStrategy.Type != rayv1.NewCluster && - *rayService.Spec.UpgradeStrategy.Type != rayv1.NewClusterWithIncrementalUpgrade { - return fmt.Errorf("The RayService spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.NewClusterWithIncrementalUpgrade, rayv1.NewCluster, rayv1.None) + *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceUpgradeNone && + *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceNewCluster && + *rayService.Spec.UpgradeStrategy.Type != rayv1.RayServiceNewClusterWithIncrementalUpgrade { + return fmt.Errorf("The RayService spec is invalid: Spec.UpgradeStrategy.Type value %s is invalid, valid options are %s, %s, or %s", *rayService.Spec.UpgradeStrategy.Type, rayv1.RayServiceNewClusterWithIncrementalUpgrade, rayv1.RayServiceNewCluster, rayv1.RayServiceUpgradeNone) } if rayService.Spec.RayClusterDeletionDelaySeconds != nil && diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go index 43fa6af48e9..172b3f56f16 100644 --- a/ray-operator/controllers/ray/utils/validation_test.go +++ b/ray-operator/controllers/ray/utils/validation_test.go @@ -1849,7 +1849,7 @@ func TestValidateClusterUpgradeOptions(t *testing.T) { var upgradeStrategy *rayv1.RayServiceUpgradeStrategy if tt.maxSurgePercent != nil || tt.stepSizePercent != nil || tt.intervalSeconds != nil || tt.gatewayClassName != "" { upgradeStrategy = &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), ClusterUpgradeOptions: &rayv1.ClusterUpgradeOptions{ MaxSurgePercent: tt.maxSurgePercent, StepSizePercent: tt.stepSizePercent, @@ -1859,7 +1859,7 @@ func TestValidateClusterUpgradeOptions(t *testing.T) { } } else if tt.expectError { upgradeStrategy = &rayv1.RayServiceUpgradeStrategy{ - Type: ptr.To(rayv1.NewClusterWithIncrementalUpgrade), + Type: ptr.To(rayv1.RayServiceNewClusterWithIncrementalUpgrade), } } @@ -1883,6 +1883,104 @@ func TestValidateClusterUpgradeOptions(t *testing.T) { } } +func TestValidateRayClusterUpgradeOptions(t *testing.T) { + tests := []struct { + upgradeStrategy *rayv1.RayClusterUpgradeStrategy + name string + originatedFromCRD string + errorMessage string + expectError bool + }{ + { + name: "No upgrade strategy", + upgradeStrategy: nil, + expectError: false, + }, + { + name: "Upgrade strategy set None for RayCluster", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeNone), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: false, + }, + { + name: "Upgrade strategy set Recreate for RayCluster", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterRecreate), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: false, + }, + { + name: "Upgrade strategy set Recreate for RayCluster created by RayJob", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + 
Type: ptr.To(rayv1.RayClusterRecreate), + }, + originatedFromCRD: string(RayJobCRD), + expectError: true, + errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayJob", + }, + { + name: "Upgrade strategy set Recreate for RayCluster created by RayService", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterRecreate), + }, + originatedFromCRD: string(RayServiceCRD), + expectError: true, + errorMessage: "upgradeStrategy cannot be set when RayCluster is created by RayService", + }, + { + name: "Invalid upgrade strategy value", + upgradeStrategy: &rayv1.RayClusterUpgradeStrategy{ + Type: ptr.To(rayv1.RayClusterUpgradeType("InvalidStrategy")), + }, + originatedFromCRD: string(RayClusterCRD), + expectError: true, + errorMessage: "Spec.UpgradeStrategy.Type value InvalidStrategy is invalid, valid options are Recreate or None", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cluster := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Namespace: "default", + Labels: map[string]string{}, + }, + Spec: rayv1.RayClusterSpec{ + UpgradeStrategy: tt.upgradeStrategy, + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "ray-head", Image: "rayproject/ray:latest"}, + }, + }, + }, + }, + }, + } + + if tt.originatedFromCRD != "" { + cluster.Labels[RayOriginatedFromCRDLabelKey] = tt.originatedFromCRD + } + + err := ValidateRayClusterUpgradeOptions(cluster) + + if tt.expectError { + require.Error(t, err, "Expected error for test case: %s", tt.name) + if tt.errorMessage != "" { + assert.Contains(t, err.Error(), tt.errorMessage, "Error message mismatch for test case: %s", tt.name) + } + } else { + require.NoError(t, err, "Unexpected error for test case: %s", tt.name) + } + }) + } +} + func TestValidateWorkerGroupIdleTimeout(t *testing.T) { tests := map[string]struct { expectedErr string diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go index 14cb592037f..144ff1812d5 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterspec.go @@ -5,16 +5,17 @@ package v1 // RayClusterSpecApplyConfiguration represents a declarative configuration of the RayClusterSpec type for use // with apply. 
type RayClusterSpecApplyConfiguration struct { - AuthOptions *AuthOptionsApplyConfiguration `json:"authOptions,omitempty"` - Suspend *bool `json:"suspend,omitempty"` - ManagedBy *string `json:"managedBy,omitempty"` - AutoscalerOptions *AutoscalerOptionsApplyConfiguration `json:"autoscalerOptions,omitempty"` - HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"` - EnableInTreeAutoscaling *bool `json:"enableInTreeAutoscaling,omitempty"` - GcsFaultToleranceOptions *GcsFaultToleranceOptionsApplyConfiguration `json:"gcsFaultToleranceOptions,omitempty"` - HeadGroupSpec *HeadGroupSpecApplyConfiguration `json:"headGroupSpec,omitempty"` - RayVersion *string `json:"rayVersion,omitempty"` - WorkerGroupSpecs []WorkerGroupSpecApplyConfiguration `json:"workerGroupSpecs,omitempty"` + UpgradeStrategy *RayClusterUpgradeStrategyApplyConfiguration `json:"upgradeStrategy,omitempty"` + AuthOptions *AuthOptionsApplyConfiguration `json:"authOptions,omitempty"` + Suspend *bool `json:"suspend,omitempty"` + ManagedBy *string `json:"managedBy,omitempty"` + AutoscalerOptions *AutoscalerOptionsApplyConfiguration `json:"autoscalerOptions,omitempty"` + HeadServiceAnnotations map[string]string `json:"headServiceAnnotations,omitempty"` + EnableInTreeAutoscaling *bool `json:"enableInTreeAutoscaling,omitempty"` + GcsFaultToleranceOptions *GcsFaultToleranceOptionsApplyConfiguration `json:"gcsFaultToleranceOptions,omitempty"` + HeadGroupSpec *HeadGroupSpecApplyConfiguration `json:"headGroupSpec,omitempty"` + RayVersion *string `json:"rayVersion,omitempty"` + WorkerGroupSpecs []WorkerGroupSpecApplyConfiguration `json:"workerGroupSpecs,omitempty"` } // RayClusterSpecApplyConfiguration constructs a declarative configuration of the RayClusterSpec type for use with @@ -23,6 +24,14 @@ func RayClusterSpec() *RayClusterSpecApplyConfiguration { return &RayClusterSpecApplyConfiguration{} } +// WithUpgradeStrategy sets the UpgradeStrategy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the UpgradeStrategy field is set to the value of the last call. +func (b *RayClusterSpecApplyConfiguration) WithUpgradeStrategy(value *RayClusterUpgradeStrategyApplyConfiguration) *RayClusterSpecApplyConfiguration { + b.UpgradeStrategy = value + return b +} + // WithAuthOptions sets the AuthOptions field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the AuthOptions field is set to the value of the last call. diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go new file mode 100644 index 00000000000..f1a1aa39064 --- /dev/null +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayclusterupgradestrategy.go @@ -0,0 +1,27 @@ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1 + +import ( + rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" +) + +// RayClusterUpgradeStrategyApplyConfiguration represents a declarative configuration of the RayClusterUpgradeStrategy type for use +// with apply. 
+type RayClusterUpgradeStrategyApplyConfiguration struct { + Type *rayv1.RayClusterUpgradeType `json:"type,omitempty"` +} + +// RayClusterUpgradeStrategyApplyConfiguration constructs a declarative configuration of the RayClusterUpgradeStrategy type for use with +// apply. +func RayClusterUpgradeStrategy() *RayClusterUpgradeStrategyApplyConfiguration { + return &RayClusterUpgradeStrategyApplyConfiguration{} +} + +// WithType sets the Type field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Type field is set to the value of the last call. +func (b *RayClusterUpgradeStrategyApplyConfiguration) WithType(value rayv1.RayClusterUpgradeType) *RayClusterUpgradeStrategyApplyConfiguration { + b.Type = &value + return b +} diff --git a/ray-operator/pkg/client/applyconfiguration/utils.go b/ray-operator/pkg/client/applyconfiguration/utils.go index 9ad6513e0c8..16a92835cf5 100644 --- a/ray-operator/pkg/client/applyconfiguration/utils.go +++ b/ray-operator/pkg/client/applyconfiguration/utils.go @@ -44,6 +44,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &rayv1.RayClusterSpecApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayClusterStatus"): return &rayv1.RayClusterStatusApplyConfiguration{} + case v1.SchemeGroupVersion.WithKind("RayClusterUpgradeStrategy"): + return &rayv1.RayClusterUpgradeStrategyApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJob"): return &rayv1.RayJobApplyConfiguration{} case v1.SchemeGroupVersion.WithKind("RayJobSpec"): diff --git a/ray-operator/test/e2e/raycluster_test.go b/ray-operator/test/e2e/raycluster_test.go index 3c26221c46b..66cb7bd7584 100644 --- a/ray-operator/test/e2e/raycluster_test.go +++ b/ray-operator/test/e2e/raycluster_test.go @@ -208,3 +208,65 @@ func TestRayClusterScalingDown(t *testing.T) { g.Expect(err).NotTo(HaveOccurred(), "Failed to remove finalizer from pod %s/%s", namespace.Name, pod.Name) } } + +func TestRayClusterUpgradeStrategy(t *testing.T) { + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + + rayClusterAC := rayv1ac.RayCluster("raycluster-upgrade-recreate", namespace.Name).WithSpec(NewRayClusterSpec()) + rayClusterAC.Spec.UpgradeStrategy = rayv1ac.RayClusterUpgradeStrategy().WithType(rayv1.RayClusterRecreate) + + rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Created RayCluster %s/%s successfully", namespace.Name, rayCluster.Name) + + LogWithTimestamp(test.T(), "Waiting for RayCluster %s/%s to become ready", namespace.Name, rayCluster.Name) + g.Eventually(RayCluster(test, rayCluster.Namespace, rayCluster.Name), TestTimeoutMedium). 
+ Should(WithTransform(RayClusterState, Equal(rayv1.Ready))) + + headPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + initialHeadPodName := headPod.Name + initialHeadPodHash := headPod.Annotations[utils.PodTemplateHashKey] + g.Expect(initialHeadPodHash).NotTo(BeEmpty(), "Initial head pod should have template hash annotation") + + workerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(workerPods).To(HaveLen(1)) + initialWorkerPodHash := workerPods[0].Annotations[utils.PodTemplateHashKey] + g.Expect(initialWorkerPodHash).NotTo(BeEmpty(), "Initial worker pod should have template hash annotation") + + // Change pod template spec to trigger Recreate upgrade + rayClusterAC.Spec.HeadGroupSpec.Template. + WithAnnotations(map[string]string{"upgrade-trigger": "test"}) + rayClusterAC.Spec.WorkerGroupSpecs[0].Template. + WithAnnotations(map[string]string{"upgrade-trigger": "test"}) + rayCluster, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + LogWithTimestamp(test.T(), "Updated RayCluster pod template to trigger Recreate upgrade") + + LogWithTimestamp(test.T(), "Waiting for new head pod to be running after recreate") + g.Eventually(func() bool { + newHeadPod, err := GetHeadPod(test, rayCluster) + if err != nil { + return false + } + return newHeadPod.Name != initialHeadPodName && newHeadPod.Status.Phase == corev1.PodRunning + }, TestTimeoutMedium).Should(BeTrue()) + + newHeadPod, err := GetHeadPod(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newHeadPod.Name).NotTo(Equal(initialHeadPodName)) + newHeadPodHash := newHeadPod.Annotations[utils.PodTemplateHashKey] + g.Expect(newHeadPodHash).NotTo(BeEmpty(), "New head pod should have template hash annotation") + g.Expect(newHeadPodHash).NotTo(Equal(initialHeadPodHash), "New head pod should have different template hash after upgrade") + + newWorkerPods, err := GetWorkerPods(test, rayCluster) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(newWorkerPods).To(HaveLen(1)) + newWorkerPodHash := newWorkerPods[0].Annotations[utils.PodTemplateHashKey] + g.Expect(newWorkerPodHash).NotTo(BeEmpty(), "New worker pod should have template hash annotation") + g.Expect(newWorkerPodHash).NotTo(Equal(initialWorkerPodHash), "New worker pod should have different template hash after upgrade") +} diff --git a/ray-operator/test/e2eincrementalupgrade/support.go b/ray-operator/test/e2eincrementalupgrade/support.go index b5e6293f491..58002f25a9b 100644 --- a/ray-operator/test/e2eincrementalupgrade/support.go +++ b/ray-operator/test/e2eincrementalupgrade/support.go @@ -43,7 +43,7 @@ func IncrementalUpgradeRayServiceApplyConfiguration( ) *rayv1ac.RayServiceSpecApplyConfiguration { return rayv1ac.RayServiceSpec(). WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy(). - WithType(rayv1.NewClusterWithIncrementalUpgrade). + WithType(rayv1.RayServiceNewClusterWithIncrementalUpgrade). WithClusterUpgradeOptions( rayv1ac.ClusterUpgradeOptions(). WithGatewayClassName("istio"). 
diff --git a/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go b/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go index e38da613bda..461d1c270e8 100644 --- a/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go +++ b/ray-operator/test/e2erayservice/rayservice_in_place_update_test.go @@ -182,7 +182,7 @@ func TestUpdateServeConfigAndRayClusterSpecWithUpgradeDisabled(t *testing.T) { rayServiceName := "rayservice-sample" rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(RayServiceSampleYamlApplyConfiguration(). - WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy().WithType(rayv1.None))) + WithUpgradeStrategy(rayv1ac.RayServiceUpgradeStrategy().WithType(rayv1.RayServiceUpgradeNone))) rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) g.Expect(err).NotTo(HaveOccurred())
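
For anyone trying the feature out, here is a minimal, hypothetical sketch of opting a RayCluster into the new strategy using the apply configurations generated in this PR. The cluster name, namespace, and the elided `headGroupSpec` are placeholders; the builder calls and the typed-struct form both appear verbatim in the tests above.

```go
// Illustrative sketch, not part of the PR.
package sketch

import (
	"k8s.io/utils/ptr"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
)

// Server-side-apply form, mirroring TestRayClusterUpgradeStrategy above.
// A real spec also needs headGroupSpec etc.; only the new field is shown.
var rayClusterAC = rayv1ac.RayCluster("my-cluster", "default").
	WithSpec(rayv1ac.RayClusterSpec().
		WithUpgradeStrategy(rayv1ac.RayClusterUpgradeStrategy().
			WithType(rayv1.RayClusterRecreate)))

// Equivalent typed-struct form, as used in the unit tests.
var strategy = &rayv1.RayClusterUpgradeStrategy{
	Type: ptr.To(rayv1.RayClusterRecreate),
}
```

With `Recreate`, a change to any group's Pod template hash triggers deletion of all existing head and worker Pods in a single pass, and the next reconcile recreates them from the updated spec; leaving the strategy unset, or setting it to `None`, never triggers recreation in `shouldRecreatePodsForUpgrade`.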