From e0576ce15b282cdca2b1257668628194b5d218ca Mon Sep 17 00:00:00 2001 From: xueli Date: Wed, 11 Feb 2026 13:22:54 +0800 Subject: [PATCH] fix: fixed the targetCluster cannot be set per event issue in maestro client transportation --- internal/executor/resource_executor.go | 9 +- internal/k8s_client/apply.go | 6 +- internal/k8s_client/client.go | 5 +- internal/k8s_client/discovery.go | 5 +- internal/k8s_client/mock.go | 5 +- internal/maestro_client/client.go | 171 +++++++++++------- internal/manifest/manifestwork.go | 48 +++++ internal/manifest/manifestwork_test.go | 158 ++++++++++++++++ internal/transport_client/interface.go | 19 +- internal/transport_client/types.go | 12 ++ .../executor/executor_k8s_integration_test.go | 10 +- .../k8s_client/client_integration_test.go | 20 +- 12 files changed, 366 insertions(+), 102 deletions(-) create mode 100644 internal/manifest/manifestwork.go create mode 100644 internal/manifest/manifestwork_test.go diff --git a/internal/executor/resource_executor.go b/internal/executor/resource_executor.go index 2ace77d..b20c389 100644 --- a/internal/executor/resource_executor.go +++ b/internal/executor/resource_executor.go @@ -3,7 +3,6 @@ package executor import ( "context" "fmt" - "strings" "github.com/mitchellh/copystructure" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/config_loader" @@ -114,7 +113,7 @@ func (re *ResourceExecutor) applyResource(ctx context.Context, resource config_l } else { // No Discovery config - lookup by name from manifest re.log.Debugf(ctx, "Looking up existing resource by name...") - existingResource, err = re.client.GetResource(ctx, gvk, resourceManifest.GetNamespace(), resourceManifest.GetName()) + existingResource, err = re.client.GetResource(ctx, gvk, resourceManifest.GetNamespace(), resourceManifest.GetName(), nil) } // Fail fast on any error except NotFound (which means resource doesn't exist yet) @@ -180,7 +179,7 @@ func (re *ResourceExecutor) applyResource(ctx context.Context, resource config_l successCtx := logger.WithK8sResult(ctx, "SUCCESS") re.log.Infof(successCtx, "Resource[%s] processed: operation=%s reason=%s", - resource.Name, strings.ToUpper(string(result.Operation)), result.OperationReason) + resource.Name, result.Operation, result.OperationReason) // Store resource in execution context if result.Resource != nil { @@ -269,7 +268,7 @@ func (re *ResourceExecutor) discoverExistingResource(ctx context.Context, gvk sc if err != nil { return nil, fmt.Errorf("failed to render byName template: %w", err) } - return re.client.GetResource(ctx, gvk, namespace, name) + return re.client.GetResource(ctx, gvk, namespace, name, nil) } // Discover by label selector @@ -295,7 +294,7 @@ func (re *ResourceExecutor) discoverExistingResource(ctx context.Context, gvk sc LabelSelector: labelSelector, } - list, err := re.client.DiscoverResources(ctx, gvk, discoveryConfig) + list, err := re.client.DiscoverResources(ctx, gvk, discoveryConfig, nil) if err != nil { return nil, err } diff --git a/internal/k8s_client/apply.go b/internal/k8s_client/apply.go index 592b862..2396874 100644 --- a/internal/k8s_client/apply.go +++ b/internal/k8s_client/apply.go @@ -95,7 +95,7 @@ func (c *Client) ApplyResource( gvk := newManifest.GroupVersionKind() name := newManifest.GetName() - c.log.Infof(ctx, "ApplyResource %s/%s: operation=%s reason=%s", + c.log.Debugf(ctx, "ApplyResource %s/%s: operation=%s reason=%s", gvk.Kind, name, result.Operation, result.Reason) // Execute the operation @@ -189,7 +189,7 @@ func (c *Client) ApplyResources( result.Results = append(result.Results, resourceResult) result.SuccessCount++ - c.log.Infof(ctx, "Applied resource %s: operation=%s", r.Name, applyResult.Operation) + c.log.Debugf(ctx, "Applied resource %s: operation=%s reason=%s", r.Name, applyResult.Operation, applyResult.Reason) } c.log.Infof(ctx, "Applied %d resources successfully", result.SuccessCount) @@ -243,7 +243,7 @@ func (c *Client) waitForDeletion( c.log.Warnf(ctx, "Context cancelled/timed out while waiting for deletion of %s/%s", gvk.Kind, name) return fmt.Errorf("context cancelled while waiting for resource deletion: %w", ctx.Err()) case <-ticker.C: - _, err := c.GetResource(ctx, gvk, namespace, name) + _, err := c.GetResource(ctx, gvk, namespace, name, nil) if err != nil { // NotFound means the resource is deleted - this is success if apierrors.IsNotFound(err) { diff --git a/internal/k8s_client/client.go b/internal/k8s_client/client.go index d986cde..5b6d613 100644 --- a/internal/k8s_client/client.go +++ b/internal/k8s_client/client.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" + "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/transport_client" apperrors "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/errors" "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -141,7 +142,7 @@ func (c *Client) CreateResource(ctx context.Context, obj *unstructured.Unstructu } // GetResource retrieves a specific Kubernetes resource by GVK, namespace, and name -func (c *Client) GetResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string) (*unstructured.Unstructured, error) { +func (c *Client) GetResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string, _ transport_client.TransportContext) (*unstructured.Unstructured, error) { c.log.Infof(ctx, "Getting resource: %s/%s (namespace: %s)", gvk.Kind, name, namespace) obj := &unstructured.Unstructured{} @@ -357,5 +358,5 @@ func (c *Client) PatchResource(ctx context.Context, gvk schema.GroupVersionKind, c.log.Infof(ctx, "Successfully patched resource: %s/%s", gvk.Kind, name) // Get the updated resource to return - return c.GetResource(ctx, gvk, namespace, name) + return c.GetResource(ctx, gvk, namespace, name, nil) } diff --git a/internal/k8s_client/discovery.go b/internal/k8s_client/discovery.go index b1a191e..ee01880 100644 --- a/internal/k8s_client/discovery.go +++ b/internal/k8s_client/discovery.go @@ -4,6 +4,7 @@ import ( "context" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/manifest" + "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/transport_client" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -29,7 +30,7 @@ var BuildLabelSelector = manifest.BuildLabelSelector // LabelSelector: "app=myapp", // } // list, err := client.DiscoverResources(ctx, gvk, discovery) -func (c *Client) DiscoverResources(ctx context.Context, gvk schema.GroupVersionKind, discovery manifest.Discovery) (*unstructured.UnstructuredList, error) { +func (c *Client) DiscoverResources(ctx context.Context, gvk schema.GroupVersionKind, discovery manifest.Discovery, _ transport_client.TransportContext) (*unstructured.UnstructuredList, error) { list := &unstructured.UnstructuredList{} list.SetGroupVersionKind(gvk) if discovery == nil { @@ -41,7 +42,7 @@ func (c *Client) DiscoverResources(ctx context.Context, gvk schema.GroupVersionK c.log.Infof(ctx, "Discovering single resource: %s/%s (namespace: %s)", gvk.Kind, discovery.GetName(), discovery.GetNamespace()) - obj, err := c.GetResource(ctx, gvk, discovery.GetNamespace(), discovery.GetName()) + obj, err := c.GetResource(ctx, gvk, discovery.GetNamespace(), discovery.GetName(), nil) if err != nil { return list, err } diff --git a/internal/k8s_client/mock.go b/internal/k8s_client/mock.go index 4b8ad96..4531a10 100644 --- a/internal/k8s_client/mock.go +++ b/internal/k8s_client/mock.go @@ -4,6 +4,7 @@ import ( "context" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/manifest" + "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/transport_client" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" @@ -40,7 +41,7 @@ func NewMockK8sClient() *MockK8sClient { // GetResource implements K8sClient.GetResource // Returns a NotFound error when the resource doesn't exist, matching real K8s client behavior. -func (m *MockK8sClient) GetResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string) (*unstructured.Unstructured, error) { +func (m *MockK8sClient) GetResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string, _ transport_client.TransportContext) (*unstructured.Unstructured, error) { // Check explicit error override first if m.GetResourceError != nil { return nil, m.GetResourceError @@ -149,7 +150,7 @@ func (m *MockK8sClient) ApplyResources(ctx context.Context, resources []Resource } // DiscoverResources implements K8sClient.DiscoverResources -func (m *MockK8sClient) DiscoverResources(ctx context.Context, gvk schema.GroupVersionKind, discovery manifest.Discovery) (*unstructured.UnstructuredList, error) { +func (m *MockK8sClient) DiscoverResources(ctx context.Context, gvk schema.GroupVersionKind, discovery manifest.Discovery, _ transport_client.TransportContext) (*unstructured.UnstructuredList, error) { if m.DiscoverError != nil { return nil, m.DiscoverError } diff --git a/internal/maestro_client/client.go b/internal/maestro_client/client.go index 59add93..288f72b 100644 --- a/internal/maestro_client/client.go +++ b/internal/maestro_client/client.go @@ -14,7 +14,6 @@ import ( "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/manifest" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/transport_client" - "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/constants" apperrors "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/errors" "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger" "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/version" @@ -85,14 +84,6 @@ type Config struct { HTTPTimeout time.Duration // ServerHealthinessTimeout is the timeout for gRPC server health checks (default: 20s) ServerHealthinessTimeout time.Duration - - // TransportClient configuration for implementing transport_client.TransportClient - // ConsumerName is the target cluster name (Maestro consumer) - required for TransportClient - ConsumerName string - // WorkName is the name of the ManifestWork to manage - required for TransportClient - WorkName string - // WorkLabels are optional labels to add to the ManifestWork - WorkLabels map[string]string } // NewMaestroClient creates a new Maestro client using the official Maestro client pattern @@ -414,14 +405,30 @@ func (c *Client) SourceID() string { return c.config.SourceID } -// ConsumerName returns the target cluster name -func (c *Client) ConsumerName() string { - return c.config.ConsumerName +// TransportContext carries per-request routing information for the Maestro transport backend. +// Pass this as the TransportContext (any) in ResourceToApply.Target or method parameters. +type TransportContext struct { + // ConsumerName is the target cluster name (Maestro consumer). + // Required for all Maestro operations. + ConsumerName string + + // ManifestWork is the ManifestWork template providing metadata (name, labels, annotations). + // The template's spec.workload.manifests will be replaced with the actual resources. + // Required for ApplyResources; ignored by GetResource/DiscoverResources. + ManifestWork *workv1.ManifestWork } -// WorkName returns the ManifestWork name -func (c *Client) WorkName() string { - return c.config.WorkName +// resolveTransportContext extracts the maestro TransportContext from the generic transport context. +// Returns nil if target is nil or wrong type. +func (c *Client) resolveTransportContext(target transport_client.TransportContext) *TransportContext { + if target == nil { + return nil + } + tc, ok := target.(*TransportContext) + if !ok { + return nil + } + return tc } // ============================================================================= @@ -433,6 +440,7 @@ var _ transport_client.TransportClient = (*Client)(nil) // ApplyResources applies multiple resources by bundling them into a ManifestWork. // All resources are stored in a single ManifestWork for the target cluster. +// Requires a *maestro_client.TransportContext with ConsumerName and ManifestWork template. func (c *Client) ApplyResources( ctx context.Context, resources []transport_client.ResourceToApply, @@ -445,17 +453,32 @@ func (c *Client) ApplyResources( return result, nil } - // Build ManifestWork from resources - work, err := c.buildManifestWork(resources) + // Resolve maestro transport context from first resource + transportCtx := c.resolveTransportContext(resources[0].Target) + if transportCtx == nil { + return nil, fmt.Errorf("maestro TransportContext is required: set ResourceToApply.Target to *maestro_client.TransportContext") + } + + consumerName := transportCtx.ConsumerName + if consumerName == "" { + return nil, fmt.Errorf("consumer name (target cluster) is required: set TransportContext.ConsumerName") + } + + if transportCtx.ManifestWork == nil { + return nil, fmt.Errorf("ManifestWork template is required: set TransportContext.ManifestWork") + } + + // Build ManifestWork from template and resources + work, err := c.buildManifestWork(transportCtx.ManifestWork, resources, consumerName) if err != nil { return nil, fmt.Errorf("failed to build ManifestWork: %w", err) } c.log.Infof(ctx, "Applying %d resources via ManifestWork %s/%s", - len(resources), c.config.ConsumerName, c.config.WorkName) + len(resources), consumerName, work.Name) // Apply the ManifestWork (create or update) - appliedWork, err := c.ApplyManifestWork(ctx, c.config.ConsumerName, work) + appliedWork, err := c.ApplyManifestWork(ctx, consumerName, work) if err != nil { // Convert to result with error for _, r := range resources { @@ -485,7 +508,7 @@ func (c *Client) ApplyResources( ApplyResult: &transport_client.ApplyResult{ Resource: r.Manifest, Operation: op, - Reason: fmt.Sprintf("applied via ManifestWork %s", c.config.WorkName), + Reason: fmt.Sprintf("applied via ManifestWork %s/%s", consumerName, work.Name), }, } result.Results = append(result.Results, resourceResult) @@ -496,33 +519,42 @@ func (c *Client) ApplyResources( return result, nil } -// GetResource retrieves a resource from the ManifestWork's manifest list. +// GetResource retrieves a resource by searching all ManifestWorks for the target consumer. func (c *Client) GetResource( ctx context.Context, gvk schema.GroupVersionKind, namespace, name string, + target transport_client.TransportContext, ) (*unstructured.Unstructured, error) { - work, err := c.GetManifestWork(ctx, c.config.ConsumerName, c.config.WorkName) + transportCtx := c.resolveTransportContext(target) + consumerName := "" + if transportCtx != nil { + consumerName = transportCtx.ConsumerName + } + if consumerName == "" { + gr := schema.GroupResource{Group: gvk.Group, Resource: gvk.Kind} + return nil, apierrors.NewNotFound(gr, name) + } + + // List all ManifestWorks for this consumer and search across them + workList, err := c.ListManifestWorks(ctx, consumerName, "") if err != nil { - if apierrors.IsNotFound(err) { - gr := schema.GroupResource{Group: gvk.Group, Resource: gvk.Kind} - return nil, apierrors.NewNotFound(gr, name) - } return nil, err } - // Search for the resource in the manifests - for _, m := range work.Spec.Workload.Manifests { - obj, err := manifestToUnstructured(m) - if err != nil { - continue - } + for i := range workList.Items { + for _, m := range workList.Items[i].Spec.Workload.Manifests { + obj, err := manifestToUnstructured(m) + if err != nil { + continue + } - if obj.GetKind() == gvk.Kind && - obj.GetAPIVersion() == gvk.GroupVersion().String() && - obj.GetNamespace() == namespace && - obj.GetName() == name { - return obj, nil + if obj.GetKind() == gvk.Kind && + obj.GetAPIVersion() == gvk.GroupVersion().String() && + obj.GetNamespace() == namespace && + obj.GetName() == name { + return obj, nil + } } } @@ -530,30 +562,54 @@ func (c *Client) GetResource( return nil, apierrors.NewNotFound(gr, name) } -// DiscoverResources discovers resources within the ManifestWork that match the discovery criteria. +// DiscoverResources discovers resources by searching all ManifestWorks for the target consumer. func (c *Client) DiscoverResources( ctx context.Context, gvk schema.GroupVersionKind, discovery manifest.Discovery, + target transport_client.TransportContext, ) (*unstructured.UnstructuredList, error) { - return c.DiscoverManifest(ctx, c.config.ConsumerName, c.config.WorkName, discovery) -} + transportCtx := c.resolveTransportContext(target) + consumerName := "" + if transportCtx != nil { + consumerName = transportCtx.ConsumerName + } + if consumerName == "" { + return &unstructured.UnstructuredList{}, nil + } -// buildManifestWork creates a ManifestWork containing all resources as manifests. -func (c *Client) buildManifestWork(resources []transport_client.ResourceToApply) (*workv1.ManifestWork, error) { - manifests := make([]workv1.Manifest, 0, len(resources)) + // List all ManifestWorks for this consumer and search across them + workList, err := c.ListManifestWorks(ctx, consumerName, "") + if err != nil { + return nil, err + } - // Find the highest generation among all resources - var maxGeneration int64 - for _, r := range resources { - gen := manifest.GetGenerationFromUnstructured(r.Manifest) - if gen > maxGeneration { - maxGeneration = gen + allItems := &unstructured.UnstructuredList{} + for i := range workList.Items { + list, err := c.DiscoverManifestInWork(&workList.Items[i], discovery) + if err != nil { + continue } + allItems.Items = append(allItems.Items, list.Items...) } + return allItems, nil +} + +// buildManifestWork creates a ManifestWork from the template, populating spec.workload.manifests +// with the actual resources. The template provides metadata (name, labels, annotations). +// The namespace is set to the consumer name (target cluster). +func (c *Client) buildManifestWork(template *workv1.ManifestWork, resources []transport_client.ResourceToApply, consumerName string) (*workv1.ManifestWork, error) { + // DeepCopy the template so we don't mutate the original + work := template.DeepCopy() + work.Namespace = consumerName + // Convert each resource to a Manifest + manifests := make([]workv1.Manifest, 0, len(resources)) for _, r := range resources { + if r.Manifest == nil { + continue // Skip resources with no manifest. It means manifests already defined in the manifestWork template + } raw, err := json.Marshal(r.Manifest.Object) if err != nil { return nil, fmt.Errorf("failed to marshal manifest %s: %w", r.Name, err) @@ -563,22 +619,11 @@ func (c *Client) buildManifestWork(resources []transport_client.ResourceToApply) }) } - // Build the ManifestWork - work := &workv1.ManifestWork{} - work.Name = c.config.WorkName - work.Namespace = c.config.ConsumerName - - if c.config.WorkLabels != nil { - work.Labels = c.config.WorkLabels + // Replace the template's manifests with actual resources (only if there are any) + if len(manifests) > 0 { + work.Spec.Workload.Manifests = manifests } - if work.Annotations == nil { - work.Annotations = make(map[string]string) - } - work.Annotations[constants.AnnotationGeneration] = fmt.Sprintf("%d", maxGeneration) - - work.Spec.Workload.Manifests = manifests - return work, nil } diff --git a/internal/manifest/manifestwork.go b/internal/manifest/manifestwork.go new file mode 100644 index 0000000..8f5a7fe --- /dev/null +++ b/internal/manifest/manifestwork.go @@ -0,0 +1,48 @@ +package manifest + +import ( + "encoding/json" + "fmt" + "os" + + workv1 "open-cluster-management.io/api/work/v1" + "sigs.k8s.io/yaml" +) + +// ParseManifestWork parses raw bytes (JSON or YAML) into a ManifestWork. +// It first tries JSON; if that fails, it converts from YAML to JSON then parses. +func ParseManifestWork(data []byte) (*workv1.ManifestWork, error) { + work := &workv1.ManifestWork{} + + // Try JSON first + if err := json.Unmarshal(data, work); err == nil { + return work, nil + } + + // Fall back to YAML → JSON → ManifestWork + jsonData, err := yaml.YAMLToJSON(data) + if err != nil { + return nil, fmt.Errorf("failed to convert ManifestWork YAML to JSON: %w", err) + } + + if err := json.Unmarshal(jsonData, work); err != nil { + return nil, fmt.Errorf("failed to parse ManifestWork: %w", err) + } + + return work, nil +} + +// LoadManifestWork reads a ManifestWork from a file path (JSON or YAML). +func LoadManifestWork(path string) (*workv1.ManifestWork, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read ManifestWork file %s: %w", path, err) + } + + work, err := ParseManifestWork(data) + if err != nil { + return nil, fmt.Errorf("failed to parse ManifestWork from %s: %w", path, err) + } + + return work, nil +} diff --git a/internal/manifest/manifestwork_test.go b/internal/manifest/manifestwork_test.go new file mode 100644 index 0000000..1c6d940 --- /dev/null +++ b/internal/manifest/manifestwork_test.go @@ -0,0 +1,158 @@ +package manifest + +import ( + "os" + "path/filepath" + "testing" +) + +func TestParseManifestWork_JSON(t *testing.T) { + jsonData := []byte(`{ + "apiVersion": "work.open-cluster-management.io/v1", + "kind": "ManifestWork", + "metadata": { + "name": "test-work", + "namespace": "cluster1", + "labels": { + "app": "test" + }, + "annotations": { + "hyperfleet.io/generation": "1" + } + }, + "spec": { + "workload": { + "manifests": [ + { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": { + "name": "test-cm", + "namespace": "default" + } + } + ] + } + } + }`) + + work, err := ParseManifestWork(jsonData) + if err != nil { + t.Fatalf("ParseManifestWork failed for JSON: %v", err) + } + + if work.Name != "test-work" { + t.Errorf("expected name 'test-work', got %q", work.Name) + } + if work.Namespace != "cluster1" { + t.Errorf("expected namespace 'cluster1', got %q", work.Namespace) + } + if work.Labels["app"] != "test" { + t.Errorf("expected label app=test, got %v", work.Labels) + } + if work.Annotations["hyperfleet.io/generation"] != "1" { + t.Errorf("expected generation annotation '1', got %v", work.Annotations) + } + if len(work.Spec.Workload.Manifests) != 1 { + t.Errorf("expected 1 manifest, got %d", len(work.Spec.Workload.Manifests)) + } +} + +func TestParseManifestWork_YAML(t *testing.T) { + yamlData := []byte(`apiVersion: work.open-cluster-management.io/v1 +kind: ManifestWork +metadata: + name: test-work-yaml + namespace: cluster2 + labels: + env: staging + annotations: + hyperfleet.io/generation: "2" +spec: + workload: + manifests: + - apiVersion: v1 + kind: ConfigMap + metadata: + name: yaml-cm + namespace: default + data: + key1: value1 +`) + + work, err := ParseManifestWork(yamlData) + if err != nil { + t.Fatalf("ParseManifestWork failed for YAML: %v", err) + } + + if work.Name != "test-work-yaml" { + t.Errorf("expected name 'test-work-yaml', got %q", work.Name) + } + if work.Namespace != "cluster2" { + t.Errorf("expected namespace 'cluster2', got %q", work.Namespace) + } + if work.Labels["env"] != "staging" { + t.Errorf("expected label env=staging, got %v", work.Labels) + } + if work.Annotations["hyperfleet.io/generation"] != "2" { + t.Errorf("expected generation annotation '2', got %v", work.Annotations) + } + if len(work.Spec.Workload.Manifests) != 1 { + t.Errorf("expected 1 manifest, got %d", len(work.Spec.Workload.Manifests)) + } +} + +func TestParseManifestWork_InvalidData(t *testing.T) { + _, err := ParseManifestWork([]byte("not valid json or yaml {{{")) + if err == nil { + t.Error("expected error for invalid data, got nil") + } +} + +func TestParseManifestWork_EmptyData(t *testing.T) { + work, err := ParseManifestWork([]byte("{}")) + if err != nil { + t.Fatalf("ParseManifestWork failed for empty JSON: %v", err) + } + if work.Name != "" { + t.Errorf("expected empty name, got %q", work.Name) + } +} + +func TestLoadManifestWork(t *testing.T) { + // Write a temporary YAML file + tmpDir := t.TempDir() + yamlPath := filepath.Join(tmpDir, "work.yaml") + yamlData := []byte(`apiVersion: work.open-cluster-management.io/v1 +kind: ManifestWork +metadata: + name: file-loaded-work + annotations: + hyperfleet.io/generation: "3" +spec: + workload: + manifests: [] +`) + if err := os.WriteFile(yamlPath, yamlData, 0644); err != nil { + t.Fatalf("failed to write temp file: %v", err) + } + + work, err := LoadManifestWork(yamlPath) + if err != nil { + t.Fatalf("LoadManifestWork failed: %v", err) + } + + if work.Name != "file-loaded-work" { + t.Errorf("expected name 'file-loaded-work', got %q", work.Name) + } + if work.Annotations["hyperfleet.io/generation"] != "3" { + t.Errorf("expected generation annotation '3', got %v", work.Annotations) + } +} + +func TestLoadManifestWork_FileNotFound(t *testing.T) { + _, err := LoadManifestWork("/nonexistent/path/work.yaml") + if err == nil { + t.Error("expected error for missing file, got nil") + } +} diff --git a/internal/transport_client/interface.go b/internal/transport_client/interface.go index 5094d1d..bd8a76f 100644 --- a/internal/transport_client/interface.go +++ b/internal/transport_client/interface.go @@ -19,6 +19,10 @@ import ( // - Create if resource doesn't exist // - Update if generation changed // - Skip if generation matches (idempotent) +// +// Methods accept an optional TransportContext (any) for per-request routing: +// - k8s_client ignores it (pass nil) +// - maestro_client expects its own concrete context type type TransportClient interface { // ApplyResources applies multiple Kubernetes resources. // Implementation details vary by backend: @@ -26,23 +30,18 @@ type TransportClient interface { // - maestro_client: bundles all resources into a single ManifestWork // // Each resource in the batch can have its own ApplyOptions (e.g., RecreateOnChange). + // The Target field in ResourceToApply provides per-request routing context. // Results are returned for all processed resources. - // - // Parameters: - // - ctx: Context for the operation - // - resources: List of resources to apply (must have generation annotations) - // - // Returns: - // - ApplyResourcesResult containing results for all processed resources - // - Error if any resource fails (results will contain partial results up to failure) ApplyResources(ctx context.Context, resources []ResourceToApply) (*ApplyResourcesResult, error) // GetResource retrieves a single Kubernetes resource by GVK, namespace, and name. + // The target parameter provides per-request routing context (nil for k8s_client). // Returns the resource or an error if not found. - GetResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string) (*unstructured.Unstructured, error) + GetResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string, target TransportContext) (*unstructured.Unstructured, error) // DiscoverResources discovers Kubernetes resources based on the Discovery configuration. + // The target parameter provides per-request routing context (nil for k8s_client). // If Discovery.IsSingleResource() is true, it fetches a single resource by name. // Otherwise, it lists resources matching the label selector. - DiscoverResources(ctx context.Context, gvk schema.GroupVersionKind, discovery manifest.Discovery) (*unstructured.UnstructuredList, error) + DiscoverResources(ctx context.Context, gvk schema.GroupVersionKind, discovery manifest.Discovery, target TransportContext) (*unstructured.UnstructuredList, error) } diff --git a/internal/transport_client/types.go b/internal/transport_client/types.go index 2b9afb0..8bd0a8f 100644 --- a/internal/transport_client/types.go +++ b/internal/transport_client/types.go @@ -26,6 +26,14 @@ type ApplyResult struct { Reason string } +// TransportContext carries per-request routing information for the transport backend. +// Each transport client defines its own concrete context type and type-asserts: +// - k8s_client: ignores it (nil) +// - maestro_client: expects *maestro_client.TransportContext with ConsumerName, etc. +// +// This is typed as `any` to allow each backend to define its own context shape. +type TransportContext = any + // ResourceToApply represents a single resource to be applied in a batch operation. type ResourceToApply struct { // Name is a logical name for the resource (used in results for identification) @@ -39,6 +47,10 @@ type ResourceToApply struct { // Options for this apply operation (optional, defaults to no special options) Options *ApplyOptions + + // Target provides per-request routing context for the transport backend. + // Each backend defines its own concrete type. Pass nil if not needed (e.g., k8s_client). + Target TransportContext } // ResourceApplyResult contains the result for a single resource in a batch operation. diff --git a/test/integration/executor/executor_k8s_integration_test.go b/test/integration/executor/executor_k8s_integration_test.go index a43fc8f..abbd6d3 100644 --- a/test/integration/executor/executor_k8s_integration_test.go +++ b/test/integration/executor/executor_k8s_integration_test.go @@ -385,7 +385,7 @@ func TestExecutor_K8s_CreateResources(t *testing.T) { // Verify ConfigMap exists in K8s cmGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} cmName := fmt.Sprintf("cluster-config-%s", clusterId) - cm, err := k8sEnv.Client.GetResource(ctx, cmGVK, testNamespace, cmName) + cm, err := k8sEnv.Client.GetResource(ctx, cmGVK, testNamespace, cmName, nil) require.NoError(t, err, "ConfigMap should exist in K8s") assert.Equal(t, cmName, cm.GetName()) @@ -408,7 +408,7 @@ func TestExecutor_K8s_CreateResources(t *testing.T) { // Verify Secret exists in K8s secretGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Secret"} secretName := fmt.Sprintf("cluster-secret-%s", clusterId) - secret, err := k8sEnv.Client.GetResource(ctx, secretGVK, testNamespace, secretName) + secret, err := k8sEnv.Client.GetResource(ctx, secretGVK, testNamespace, secretName, nil) require.NoError(t, err, "Secret should exist in K8s") assert.Equal(t, secretName, secret.GetName()) t.Logf("Secret verified: %s", secretName) @@ -509,7 +509,7 @@ func TestExecutor_K8s_UpdateExistingResource(t *testing.T) { // Verify ConfigMap was updated with new data cmGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} cmName := fmt.Sprintf("cluster-config-%s", clusterId) - updatedCM, err := k8sEnv.Client.GetResource(ctx, cmGVK, testNamespace, cmName) + updatedCM, err := k8sEnv.Client.GetResource(ctx, cmGVK, testNamespace, cmName, nil) require.NoError(t, err) cmData, _, _ := unstructured.NestedStringMap(updatedCM.Object, "data") @@ -683,7 +683,7 @@ func TestExecutor_K8s_RecreateOnChange(t *testing.T) { // Get the original UID cmGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} cmName := fmt.Sprintf("cluster-config-%s", clusterId) - originalCM, err := k8sEnv.Client.GetResource(ctx, cmGVK, testNamespace, cmName) + originalCM, err := k8sEnv.Client.GetResource(ctx, cmGVK, testNamespace, cmName, nil) require.NoError(t, err) originalUID := originalCM.GetUID() t.Logf("Original ConfigMap UID: %s", originalUID) @@ -696,7 +696,7 @@ func TestExecutor_K8s_RecreateOnChange(t *testing.T) { t.Logf("Second execution: %s", result2.ResourceResults[0].Operation) // Verify it's a new resource (different UID) - recreatedCM, err := k8sEnv.Client.GetResource(ctx, cmGVK, testNamespace, cmName) + recreatedCM, err := k8sEnv.Client.GetResource(ctx, cmGVK, testNamespace, cmName, nil) require.NoError(t, err) newUID := recreatedCM.GetUID() assert.NotEqual(t, originalUID, newUID, "Resource should have new UID after recreate") diff --git a/test/integration/k8s_client/client_integration_test.go b/test/integration/k8s_client/client_integration_test.go index bdfc7df..12c0801 100644 --- a/test/integration/k8s_client/client_integration_test.go +++ b/test/integration/k8s_client/client_integration_test.go @@ -128,7 +128,7 @@ func TestIntegration_GetResource(t *testing.T) { require.NoError(t, err) // Get the namespace - retrieved, err := env.GetClient().GetResource(env.GetContext(), gvk.Namespace, "", nsName) + retrieved, err := env.GetClient().GetResource(env.GetContext(), gvk.Namespace, "", nsName, nil) require.NoError(t, err) require.NotNil(t, retrieved) @@ -137,7 +137,7 @@ func TestIntegration_GetResource(t *testing.T) { }) t.Run("get non-existent resource returns error", func(t *testing.T) { - _, err := env.GetClient().GetResource(env.GetContext(), gvk.Namespace, "", "non-existent-namespace-12345") + _, err := env.GetClient().GetResource(env.GetContext(), gvk.Namespace, "", "non-existent-namespace-12345", nil) require.Error(t, err) }) } @@ -253,7 +253,7 @@ func TestIntegration_DeleteResource(t *testing.T) { require.NoError(t, err) // Verify it exists - _, err = env.GetClient().GetResource(env.GetContext(), gvk.Namespace, "", nsName) + _, err = env.GetClient().GetResource(env.GetContext(), gvk.Namespace, "", nsName, nil) require.NoError(t, err) // Delete the namespace @@ -262,7 +262,7 @@ func TestIntegration_DeleteResource(t *testing.T) { // Verify it's being deleted (namespaces go into Terminating phase) time.Sleep(100 * time.Millisecond) - deletedNs, err := env.GetClient().GetResource(env.GetContext(), gvk.Namespace, "", nsName) + deletedNs, err := env.GetClient().GetResource(env.GetContext(), gvk.Namespace, "", nsName, nil) if err == nil { // Namespace still exists, should have deletionTimestamp set (Terminating state) deletionTimestamp := deletedNs.GetDeletionTimestamp() @@ -305,7 +305,7 @@ func TestIntegration_ResourceLifecycle(t *testing.T) { assert.Equal(t, cmName, created.GetName()) // 2. Get and verify - retrieved, err := env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", cmName) + retrieved, err := env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", cmName, nil) require.NoError(t, err) data, _, _ := unstructured.NestedString(retrieved.Object, "data", "stage") assert.Equal(t, "created", data) @@ -319,7 +319,7 @@ func TestIntegration_ResourceLifecycle(t *testing.T) { assert.Equal(t, "updated", data) // 4. Get and verify update - retrieved2, err := env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", cmName) + retrieved2, err := env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", cmName, nil) require.NoError(t, err) data, _, _ = unstructured.NestedString(retrieved2.Object, "data", "stage") assert.Equal(t, "updated", data) @@ -329,7 +329,7 @@ func TestIntegration_ResourceLifecycle(t *testing.T) { require.NoError(t, err) // 6. Verify deletion - _, err = env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", cmName) + _, err = env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", cmName, nil) assert.Error(t, err) }) } @@ -515,7 +515,7 @@ func TestIntegration_ErrorScenarios(t *testing.T) { }) t.Run("get with empty name returns error", func(t *testing.T) { - _, err := env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", "") + _, err := env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", "", nil) require.Error(t, err) }) @@ -578,7 +578,7 @@ func TestIntegration_ErrorScenarios(t *testing.T) { // but behavior may vary - verify at least that it doesn't silently corrupt data if err == nil { // If it succeeded, verify the update was applied - retrieved, getErr := env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", cmName) + retrieved, getErr := env.GetClient().GetResource(env.GetContext(), gvk.ConfigMap, "default", cmName, nil) require.NoError(t, getErr) data, _, _ := unstructured.NestedString(retrieved.Object, "data", "key") assert.Equal(t, "updated", data) @@ -628,7 +628,7 @@ func TestIntegration_DifferentResourceTypes(t *testing.T) { assert.Equal(t, svcName, created.GetName()) // Get the service - retrieved, err := env.GetClient().GetResource(env.GetContext(), gvk.Service, "default", svcName) + retrieved, err := env.GetClient().GetResource(env.GetContext(), gvk.Service, "default", svcName, nil) require.NoError(t, err) assert.Equal(t, svcName, retrieved.GetName()) })