diff --git a/README.md b/README.md index 6ffbe3e..2db0125 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,7 @@ A HyperFleet Adapter requires several files for configuration: To see all configuration options read [configuration.md](configuration.md) file #### Adapter configuration + The adapter deployment configuration (`AdapterConfig`) controls runtime and infrastructure settings for the adapter process, such as client connections, retries, and broker subscription details. It is loaded with Viper, so values can be overridden by CLI flags @@ -182,10 +183,12 @@ and environment variables in this priority order: CLI flags > env vars > file > (HyperFleet API, Maestro, broker, Kubernetes) Reference examples: + - `configs/adapter-deployment-config.yaml` (full reference with env/flag notes) - `charts/examples/adapter-config.yaml` (minimal deployment example) #### Adapter task configuration + The adapter task configuration (`AdapterTaskConfig`) defines the **business logic** for processing events: parameters, preconditions, resources to create, and post-actions. This file is loaded as **static YAML** (no Viper overrides) and is required at runtime. @@ -195,20 +198,156 @@ This file is loaded as **static YAML** (no Viper overrides) and is required at r - **Resource manifests**: inline YAML or external file via `manifest.ref` Reference examples: + - `charts/examples/adapter-task-config.yaml` (worked example) - `configs/adapter-task-config-template.yaml` (complete schema reference) - ### Broker Configuration Broker configuration is particular since responsibility is split between: + - **Hyperfleet broker library**: configures the connection to a concrete broker (google pubsub, rabbitmq, ...) 
- Configured using a YAML file specified by the `BROKER_CONFIG_FILE` environment variable - **Adapter**: configures which topic/subscriptions to use on the broker - - Configure topic/subscription in the `adapter-config.yaml` but can be overriden with env variables or cli params + - Configure topic/subscription in the `adapter-config.yaml` but can be overridden with env variables or cli params See the Helm chart documentation for broker configuration options. +## Dry-Run Mode + +Dry-run mode lets you simulate the full adapter execution pipeline locally without connecting to any real infrastructure (no broker, no Kubernetes cluster, no HyperFleet API). It processes a single CloudEvent from a JSON file and produces a detailed trace of what the adapter would do. + +### Usage + +```bash +hyperfleet-adapter serve \ + --config ./adapter-config.yaml \ + --task-config ./task-config.yaml \ + --dry-run-event ./event.json +``` + +Dry-run mode is activated when the `--dry-run-event` flag is present. Instead of subscribing to a broker, the adapter loads the event from the specified file and runs through all phases (params, preconditions, resources, post-actions) using mock clients. 
+
+### Flags
+
+| Flag | Required | Description |
+|------|----------|-------------|
+| `--dry-run-event <file>` | Yes | Path to a CloudEvent JSON file to process |
+| `--dry-run-api-responses <file>` | No | Path to mock API responses JSON file (defaults to 200 OK for all requests) |
+| `--dry-run-discovery <file>` | No | Path to mock discovery overrides JSON file (simulates server-populated fields) |
+| `--dry-run-verbose` | No | Show rendered manifests and API request/response bodies in output |
+| `--dry-run-output <format>` | No | Output format: `text` (default) or `json` |
+
+### Input Files
+
+#### CloudEvent File (`--dry-run-event`)
+
+A standard CloudEvents JSON file:
+
+```json
+{
+  "specversion": "1.0",
+  "id": "abc123",
+  "type": "io.hyperfleet.cluster.updated",
+  "source": "/api/clusters_mgmt/v1/clusters/abc123",
+  "time": "2025-01-15T10:30:00Z",
+  "datacontenttype": "application/json",
+  "data": {
+    "id": "abc123",
+    "kind": "Cluster",
+    "href": "/api/clusters_mgmt/v1/clusters/abc123",
+    "generation": 5
+  }
+}
+```
+
+#### Mock API Responses File (`--dry-run-api-responses`)
+
+Defines canned responses for HyperFleet API calls. Requests are matched by HTTP method and URL regex pattern. When multiple responses are defined for a match, they are returned sequentially (the last response repeats):
+
+```json
+{
+  "responses": [
+    {
+      "match": {
+        "method": "GET",
+        "urlPattern": "/api/hyperfleet/v1/clusters/.*"
+      },
+      "responses": [
+        {
+          "statusCode": 200,
+          "headers": { "Content-Type": "application/json" },
+          "body": { "id": "abc-123", "name": "abc123", "kind": "Cluster" }
+        }
+      ]
+    }
+  ]
+}
+```
+
+If no file is provided, all API requests return 200 OK by default.
+
+#### Discovery Overrides File (`--dry-run-discovery`)
+
+Maps rendered resource names to complete resource objects, allowing you to simulate server-populated fields (status, uid, resourceVersion, etc.)
that would normally be set by the Kubernetes API server:
+
+```json
+{
+  "rendered-resource-name": {
+    "apiVersion": "work.open-cluster-management.io/v1",
+    "kind": "ManifestWork",
+    "metadata": { "name": "manifestwork-001", "namespace": "cluster1" },
+    "status": { "conditions": [{ "type": "Applied", "status": "True" }] }
+  }
+}
+```
+
+These overrides replace applied manifests in the in-memory store, making the simulated discovery results available as `resources.*` in post-action CEL expressions.
+
+Mocking discovery is useful when developing the status payload that the adapter reports back to the HyperFleet API.
+
+### Output
+
+The trace output shows the result of each execution phase:
+
+1. **Event Info** - Event ID and type
+2. **Phase 1: Parameter Extraction** - Extracted parameters and their values
+3. **Phase 2: Preconditions** - Precondition evaluation results (SUCCESS/FAILED/NOT MET) with API calls made
+4. **Phase 3: Resources** - Resource operations (CREATE/UPDATE/RECREATE) with kind, namespace, and name
+5. **Phase 3.5: Discovery Results** - Resources available for post-action CEL evaluation
+6. **Phase 4: Post Actions** - Post-action API calls and skip reasons
+7. **Result** - Overall SUCCESS or FAILED
+
+Use `--dry-run-verbose` to include rendered manifests and full API request/response bodies.
+
+Use `--dry-run-output json` for structured JSON output suitable for programmatic consumption.
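+
+As a concrete illustration of how mocked discovery feeds post-actions: with the `manifestwork-001` override shown earlier, a post-action condition could check the simulated `Applied` condition. This is a hypothetical CEL sketch — the exact shape of the `resources.*` bindings is assumed from the description above, not taken from the adapter's schema:
+
+```cel
+resources["rendered-resource-name"].status.conditions.exists(
+  c, c.type == "Applied" && c.status == "True"
+)
+```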
+ +### Examples + +Minimal dry-run (mock API returns 200 OK for everything): + +```bash +hyperfleet-adapter serve \ + --config ./adapter-config.yaml \ + --task-config ./task-config.yaml \ + --dry-run-event ./event.json +``` + +Full dry-run with mock API responses, discovery overrides, and verbose JSON output: + +```bash +hyperfleet-adapter serve \ + --config ./adapter-config.yaml \ + --task-config ./task-config.yaml \ + --dry-run-event ./event.json \ + --dry-run-api-responses ./api-responses.json \ + --dry-run-discovery ./discovery-overrides.json \ + --dry-run-verbose \ + --dry-run-output json +``` + +Example input files are available in `test/testdata/dryrun/`. + ## Deployment ### Using Helm Chart diff --git a/cmd/adapter/main.go b/cmd/adapter/main.go index 40cba8a..e01fd12 100644 --- a/cmd/adapter/main.go +++ b/cmd/adapter/main.go @@ -10,10 +10,12 @@ import ( "time" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/config_loader" + "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/dryrun" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/executor" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/hyperfleet_api" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/k8s_client" "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/maestro_client" + "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/transport_client" "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/health" "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/logger" "github.com/openshift-hyperfleet/hyperfleet-adapter/pkg/otel" @@ -32,6 +34,13 @@ var ( logFormat string logOutput string serveFlags *pflag.FlagSet + + // Dry-run flags + dryRunEvent string // Path to CloudEvent JSON file + dryRunAPIResponses string // Path to mock API responses JSON file + dryRunDiscovery string // Path to mock discovery responses JSON file + dryRunVerbose bool // Show verbose dry-run output + dryRunOutput string // Output format: text or json 
) // Timeout constants @@ -51,13 +60,12 @@ const ( ) func main() { - // Root command rootCmd := &cobra.Command{ Use: "adapter", Short: "HyperFleet Adapter - event-driven Kubernetes resource manager", - Long: `HyperFleet Adapter listens for events from a message broker and -executes configured actions including Kubernetes resource management + Long: `HyperFleet Adapter listens for events from a message broker and +executes configured actions including Kubernetes resource management and HyperFleet API calls.`, // Disable default completion command CompletionOptions: cobra.CompletionOptions{ @@ -76,8 +84,16 @@ and HyperFleet API calls.`, - Connect to the configured message broker - Subscribe to the specified topic - Process incoming events according to the adapter configuration -- Execute Kubernetes operations and HyperFleet API calls`, +- Execute Kubernetes operations and HyperFleet API calls + +Dry-run mode: + Pass --dry-run-event to process a single CloudEvent from a JSON file + using mock transport clients. No broker, cluster, or API is required. + Optionally pass --dry-run-api-responses to configure mock API responses.`, RunE: func(cmd *cobra.Command, args []string) error { + if isDryRun() { + return runDryRun() + } return runServe() }, } @@ -116,6 +132,18 @@ and HyperFleet API calls.`, serveCmd.Flags().StringVar(&logOutput, "log-output", "", "Log output (stdout, stderr). 
Env: LOG_OUTPUT") + // Add dry-run flags to serve command + serveCmd.Flags().StringVar(&dryRunEvent, "dry-run-event", "", + "Path to CloudEvent JSON file for dry-run mode") + serveCmd.Flags().StringVar(&dryRunAPIResponses, "dry-run-api-responses", "", + "Path to mock API responses JSON file for dry-run mode (defaults to 200 OK)") + serveCmd.Flags().StringVar(&dryRunDiscovery, "dry-run-discovery", "", + "Path to mock discovery responses JSON file for dry-run mode (overrides applied resources by name)") + serveCmd.Flags().BoolVar(&dryRunVerbose, "dry-run-verbose", false, + "Show rendered manifests, API request/response bodies in dry-run output") + serveCmd.Flags().StringVar(&dryRunOutput, "dry-run-output", "text", + "Dry-run output format: text or json") + // Version command versionCmd := &cobra.Command{ Use: "version", @@ -140,6 +168,15 @@ and HyperFleet API calls.`, } } +// isDryRun returns true when dry-run flags are present. +func isDryRun() bool { + return dryRunEvent != "" || dryRunAPIResponses != "" +} + +// ----------------------------------------------------------------------------- +// Configuration loading (shared between serve and dry-run) +// ----------------------------------------------------------------------------- + // buildLoggerConfig creates a logger configuration from environment variables // and command-line flags. Flags take precedence over environment variables. func buildLoggerConfig(component string) logger.Config { @@ -162,6 +199,138 @@ func buildLoggerConfig(component string) logger.Config { return cfg } +// loadConfig loads the unified adapter configuration from both config files. 
+func loadConfig(ctx context.Context, log logger.Logger) (*config_loader.Config, error) { + log.Info(ctx, "Loading adapter configuration...") + config, err := config_loader.LoadConfig( + config_loader.WithAdapterConfigPath(configPath), + config_loader.WithTaskConfigPath(taskConfigPath), + config_loader.WithAdapterVersion(version.Version), + config_loader.WithFlags(serveFlags), + ) + if err != nil { + errCtx := logger.WithErrorField(ctx, err) + log.Errorf(errCtx, "Failed to load adapter configuration") + return nil, fmt.Errorf("failed to load adapter configuration: %w", err) + } + return config, nil +} + +// ----------------------------------------------------------------------------- +// Client creation (shared between serve and dry-run) +// ----------------------------------------------------------------------------- + +// createAPIClient creates a HyperFleet API client from the config +func createAPIClient(apiConfig config_loader.HyperfleetAPIConfig, log logger.Logger) (hyperfleet_api.Client, error) { + var opts []hyperfleet_api.ClientOption + + // Set base URL if configured (env fallback handled in NewClient) + if apiConfig.BaseURL != "" { + opts = append(opts, hyperfleet_api.WithBaseURL(apiConfig.BaseURL)) + } + + // Set timeout if configured (0 means use default) + if apiConfig.Timeout > 0 { + opts = append(opts, hyperfleet_api.WithTimeout(apiConfig.Timeout)) + } + + // Set retry attempts + if apiConfig.RetryAttempts > 0 { + opts = append(opts, hyperfleet_api.WithRetryAttempts(apiConfig.RetryAttempts)) + } + + // Set retry backoff strategy + if apiConfig.RetryBackoff != "" { + switch apiConfig.RetryBackoff { + case hyperfleet_api.BackoffExponential, hyperfleet_api.BackoffLinear, hyperfleet_api.BackoffConstant: + opts = append(opts, hyperfleet_api.WithRetryBackoff(apiConfig.RetryBackoff)) + default: + return nil, fmt.Errorf("invalid retry backoff strategy %q (supported: exponential, linear, constant)", apiConfig.RetryBackoff) + } + } + + // Set retry base delay 
+ if apiConfig.BaseDelay > 0 { + opts = append(opts, hyperfleet_api.WithBaseDelay(apiConfig.BaseDelay)) + } + + // Set retry max delay + if apiConfig.MaxDelay > 0 { + opts = append(opts, hyperfleet_api.WithMaxDelay(apiConfig.MaxDelay)) + } + + // Set default headers + for key, value := range apiConfig.DefaultHeaders { + opts = append(opts, hyperfleet_api.WithDefaultHeader(key, value)) + } + + return hyperfleet_api.NewClient(log, opts...) +} + +// createTransportClient creates the appropriate transport client based on config. +func createTransportClient(ctx context.Context, config *config_loader.Config, log logger.Logger) (transport_client.TransportClient, error) { + if config.Spec.Clients.Maestro != nil { + log.Info(ctx, "Creating Maestro transport client...") + client, err := createMaestroClient(ctx, config.Spec.Clients.Maestro, log) + if err != nil { + return nil, fmt.Errorf("failed to create Maestro client: %w", err) + } + log.Info(ctx, "Maestro transport client created successfully") + return client, nil + } + + log.Info(ctx, "Creating Kubernetes transport client...") + client, err := createK8sClient(ctx, config.Spec.Clients.Kubernetes, log) + if err != nil { + return nil, fmt.Errorf("failed to create Kubernetes client: %w", err) + } + log.Info(ctx, "Kubernetes transport client created successfully") + return client, nil +} + +// createK8sClient creates a Kubernetes client from the config +func createK8sClient(ctx context.Context, k8sConfig config_loader.KubernetesConfig, log logger.Logger) (*k8s_client.Client, error) { + clientConfig := k8s_client.ClientConfig{ + KubeConfigPath: k8sConfig.KubeConfigPath, + QPS: k8sConfig.QPS, + Burst: k8sConfig.Burst, + } + return k8s_client.NewClient(ctx, clientConfig, log) +} + +// createMaestroClient creates a Maestro client from the config +func createMaestroClient(ctx context.Context, maestroConfig *config_loader.MaestroClientConfig, log logger.Logger) (*maestro_client.Client, error) { + config := &maestro_client.Config{ 
+ MaestroServerAddr: maestroConfig.HTTPServerAddress, + GRPCServerAddr: maestroConfig.GRPCServerAddress, + SourceID: maestroConfig.SourceID, + Insecure: maestroConfig.Insecure, + } + + // Set TLS config if present + if maestroConfig.Auth.TLSConfig != nil { + config.CAFile = maestroConfig.Auth.TLSConfig.CAFile + config.ClientCertFile = maestroConfig.Auth.TLSConfig.CertFile + config.ClientKeyFile = maestroConfig.Auth.TLSConfig.KeyFile + } + + return maestro_client.NewMaestroClient(ctx, config, log) +} + +// buildExecutor creates the executor with the given clients. +func buildExecutor(config *config_loader.Config, apiClient hyperfleet_api.Client, tc transport_client.TransportClient, log logger.Logger) (*executor.Executor, error) { + return executor.NewBuilder(). + WithConfig(config). + WithAPIClient(apiClient). + WithTransportClient(tc). + WithLogger(log). + Build() +} + +// ----------------------------------------------------------------------------- +// Serve mode (normal operation) +// ----------------------------------------------------------------------------- + // runServe contains the main application logic for the serve command func runServe() error { // Create context that cancels on system signals @@ -177,17 +346,9 @@ func runServe() error { log.Infof(ctx, "Starting Hyperfleet Adapter version=%s commit=%s built=%s tag=%s", version.Version, version.Commit, version.BuildDate, version.Tag) // Load unified configuration (deployment + task configs) - log.Info(ctx, "Loading adapter configuration...") - config, err := config_loader.LoadConfig( - config_loader.WithAdapterConfigPath(configPath), - config_loader.WithTaskConfigPath(taskConfigPath), - config_loader.WithAdapterVersion(version.Version), - config_loader.WithFlags(serveFlags), - ) + config, err := loadConfig(ctx, log) if err != nil { - errCtx := logger.WithErrorField(ctx, err) - log.Errorf(errCtx, "Failed to load adapter configuration") - return fmt.Errorf("failed to load adapter configuration: %w", err) + 
return err } // Recreate logger with component name from config @@ -211,10 +372,8 @@ func runServe() error { } } - // Get trace sample ratio from environment (default: 10%) + // Initialize OpenTelemetry sampleRatio := otel.GetTraceSampleRatio(log, ctx) - - // Initialize OpenTelemetry for trace_id/span_id generation and HTTP propagation tp, err := otel.InitTracer(config.Metadata.Name, version.Version, sampleRatio) if err != nil { errCtx := logger.WithErrorField(ctx, err) @@ -230,14 +389,13 @@ func runServe() error { } }() - // Start health server immediately (readiness starts as false) + // Start health server healthServer := health.NewServer(log, HealthServerPort, config.Metadata.Name) if err := healthServer.Start(ctx); err != nil { errCtx := logger.WithErrorField(ctx, err) log.Errorf(errCtx, "Failed to start health server") return fmt.Errorf("failed to start health server: %w", err) } - // Mark config as loaded since we got here successfully healthServer.SetConfigLoaded() defer func() { shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), HealthServerShutdownTimeout) @@ -248,7 +406,7 @@ func runServe() error { } }() - // Start metrics server with build info + // Start metrics server metricsServer := health.NewMetricsServer(log, MetricsServerPort, health.MetricsConfig{ Component: config.Metadata.Name, Version: version.Version, @@ -268,7 +426,7 @@ func runServe() error { } }() - // Create HyperFleet API client from config + // Create real clients log.Info(ctx, "Creating HyperFleet API client...") apiClient, err := createAPIClient(config.Spec.Clients.HyperfleetAPI, log) if err != nil { @@ -277,49 +435,23 @@ func runServe() error { return fmt.Errorf("failed to create HyperFleet API client: %w", err) } - // Create transport client - only one transport is supported per adapter instance - execBuilder := executor.NewBuilder(). - WithConfig(config). - WithAPIClient(apiClient). 
- WithLogger(log) - - if config.Spec.Clients.Maestro != nil { - log.Info(ctx, "Creating Maestro transport client...") - maestroClient, err := createMaestroClient(ctx, config.Spec.Clients.Maestro, log) - if err != nil { - errCtx := logger.WithErrorField(ctx, err) - log.Errorf(errCtx, "Failed to create Maestro client") - return fmt.Errorf("failed to create Maestro client: %w", err) - } - execBuilder = execBuilder.WithTransportClient(maestroClient) - log.Info(ctx, "Maestro transport client created successfully") - } else { - log.Info(ctx, "Creating Kubernetes transport client...") - k8sClient, err := createK8sClient(ctx, config.Spec.Clients.Kubernetes, log) - if err != nil { - errCtx := logger.WithErrorField(ctx, err) - log.Errorf(errCtx, "Failed to create Kubernetes client") - return fmt.Errorf("failed to create Kubernetes client: %w", err) - } - execBuilder = execBuilder.WithTransportClient(k8sClient) - log.Info(ctx, "Kubernetes transport client created successfully") + tc, err := createTransportClient(ctx, config, log) + if err != nil { + errCtx := logger.WithErrorField(ctx, err) + log.Errorf(errCtx, "Failed to create transport client") + return err } - // Create the executor using the builder pattern + // Build executor log.Info(ctx, "Creating event executor...") - exec, err := execBuilder.Build() + exec, err := buildExecutor(config, apiClient, tc, log) if err != nil { errCtx := logger.WithErrorField(ctx, err) log.Errorf(errCtx, "Failed to create executor") return fmt.Errorf("failed to create executor: %w", err) } - // Create the event handler from the executor - // This handler will: - // 1. Extract params from event data - // 2. Execute preconditions (API calls, condition checks) - // 3. Create/update Kubernetes resources - // 4. 
Execute post actions (status reporting) + // Create the event handler and subscribe to broker handler := exec.CreateHandler() // Handle signals for graceful shutdown @@ -328,8 +460,6 @@ func runServe() error { go func() { sig := <-sigCh log.Infof(ctx, "Received signal %s, initiating graceful shutdown...", sig) - // Mark as not ready immediately per HyperFleet Graceful Shutdown Standard - // This must happen BEFORE context cancellation to ensure /readyz returns 503 log.Info(ctx, "Shutdown initiated, marking not ready") healthServer.SetShuttingDown(true) cancel() @@ -340,7 +470,7 @@ func runServe() error { os.Exit(1) }() - // Get broker subscription ID from config + // Get broker config subscriptionID := config.Spec.Clients.Broker.SubscriptionID if subscriptionID == "" { err := fmt.Errorf("spec.clients.broker.subscriptionId is required") @@ -349,7 +479,6 @@ func runServe() error { return err } - // Get broker topic from config topic := config.Spec.Clients.Broker.Topic if topic == "" { err := fmt.Errorf("spec.clients.broker.topic is required") @@ -358,12 +487,7 @@ func runServe() error { return err } - // Create broker subscriber - // Configuration is loaded from environment variables by the broker library: - // - BROKER_TYPE: "rabbitmq" or "googlepubsub" - // - BROKER_GOOGLEPUBSUB_PROJECT_ID: GCP project ID (for googlepubsub) - // - BROKER_RABBITMQ_URL: RabbitMQ URL (for rabbitmq) - // - SUBSCRIBER_PARALLELISM: number of parallel workers (default: 1) + // Create broker subscriber and subscribe log.Info(ctx, "Creating broker subscriber...") subscriber, err := broker.NewSubscriber(log, subscriptionID) if err != nil { @@ -373,7 +497,6 @@ func runServe() error { } log.Info(ctx, "Broker subscriber created successfully") - // Subscribe to topic - this is NON-BLOCKING, it returns immediately after setup log.Info(ctx, "Subscribing to broker topic...") err = subscriber.Subscribe(ctx, topic, handler) if err != nil { @@ -383,28 +506,19 @@ func runServe() error { } log.Info(ctx, 
"Successfully subscribed to broker topic") - // Mark as ready now that broker subscription is established + // Mark as ready healthServer.SetBrokerReady(true) log.Info(ctx, "Adapter is ready to process events") - // Channel to signal fatal errors from the errors goroutine + // Monitor subscription errors fatalErrCh := make(chan error, 1) - - // Monitor subscription errors channel in a separate goroutine. - // Note: Error context here reflects the handler's location, not the error's origin - // in the broker library. Stack traces (if captured) would show this goroutine's - // call stack. For richer error context, the broker library would need to provide - // errors with embedded stack traces or structured error details. go func() { for subErr := range subscriber.Errors() { errCtx := logger.WithErrorField(ctx, subErr) log.Errorf(errCtx, "Subscription error") - // For critical errors, signal shutdown select { case fatalErrCh <- subErr: - // Signal sent, trigger shutdown default: - // Channel already has an error, don't block } } }() @@ -418,17 +532,15 @@ func runServe() error { case err := <-fatalErrCh: errCtx := logger.WithErrorField(ctx, err) log.Errorf(errCtx, "Fatal subscription error, shutting down") - // Mark as not ready before shutdown per HyperFleet Graceful Shutdown Standard healthServer.SetShuttingDown(true) - cancel() // Cancel context to trigger graceful shutdown + cancel() } - // Close subscriber gracefully with timeout + // Close subscriber gracefully log.Info(ctx, "Closing broker subscriber...") shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second) defer shutdownCancel() - // Close subscriber in a goroutine with timeout closeDone := make(chan error, 1) go func() { closeDone <- subscriber.Close() @@ -453,78 +565,100 @@ func runServe() error { return nil } -// createAPIClient creates a HyperFleet API client from the config -func createAPIClient(apiConfig config_loader.HyperfleetAPIConfig, log logger.Logger) 
(hyperfleet_api.Client, error) { - var opts []hyperfleet_api.ClientOption +// ----------------------------------------------------------------------------- +// Dry-run mode +// ----------------------------------------------------------------------------- - // Set base URL if configured (env fallback handled in NewClient) - if apiConfig.BaseURL != "" { - opts = append(opts, hyperfleet_api.WithBaseURL(apiConfig.BaseURL)) +// runDryRun processes a single CloudEvent from file using mock clients. +func runDryRun() error { + ctx := context.Background() + + // Create logger on stderr so stdout is reserved for trace output + log, err := logger.NewLogger(logger.Config{ + Level: "warn", + Format: "text", + Output: "stderr", + Component: "dry-run", + }) + if err != nil { + return fmt.Errorf("failed to create logger: %w", err) } - // Set timeout if configured (0 means use default) - if apiConfig.Timeout > 0 { - opts = append(opts, hyperfleet_api.WithTimeout(apiConfig.Timeout)) + // Load config (same path as serve) + config, err := loadConfig(ctx, log) + if err != nil { + return err } - // Set retry attempts - if apiConfig.RetryAttempts > 0 { - opts = append(opts, hyperfleet_api.WithRetryAttempts(apiConfig.RetryAttempts)) + // Load CloudEvent from file + if dryRunEvent == "" { + return fmt.Errorf("--dry-run-event is required for dry-run mode") + } + evt, err := dryrun.LoadCloudEvent(dryRunEvent) + if err != nil { + return fmt.Errorf("failed to load event: %w", err) } - // Set retry backoff strategy - if apiConfig.RetryBackoff != "" { - switch apiConfig.RetryBackoff { - case hyperfleet_api.BackoffExponential, hyperfleet_api.BackoffLinear, hyperfleet_api.BackoffConstant: - opts = append(opts, hyperfleet_api.WithRetryBackoff(apiConfig.RetryBackoff)) - default: - return nil, fmt.Errorf("invalid retry backoff strategy %q (supported: exponential, linear, constant)", apiConfig.RetryBackoff) + // Create dryrun API client + var dryrunResponsesFile *dryrun.DryrunResponsesFile + if 
dryRunAPIResponses != "" { + dryrunResponsesFile, err = dryrun.LoadDryrunResponses(dryRunAPIResponses) + if err != nil { + return fmt.Errorf("failed to load dryrun responses: %w", err) } } - - // Set retry base delay - if apiConfig.BaseDelay > 0 { - opts = append(opts, hyperfleet_api.WithBaseDelay(apiConfig.BaseDelay)) + dryrunAPI, err := dryrun.NewDryrunAPIClient(dryrunResponsesFile) + if err != nil { + return fmt.Errorf("failed to create dryrun API client: %w", err) } - // Set retry max delay - if apiConfig.MaxDelay > 0 { - opts = append(opts, hyperfleet_api.WithMaxDelay(apiConfig.MaxDelay)) + // Create recording transport client + var dryrunClient *dryrun.DryrunTransportClient + if dryRunDiscovery != "" { + overrides, err := dryrun.LoadDiscoveryOverrides(dryRunDiscovery) + if err != nil { + return fmt.Errorf("failed to load discovery overrides: %w", err) + } + dryrunClient = dryrun.NewDryrunTransportClientWithOverrides(overrides) + } else { + dryrunClient = dryrun.NewDryrunTransportClient() } - // Set default headers - for key, value := range apiConfig.DefaultHeaders { - opts = append(opts, hyperfleet_api.WithDefaultHeader(key, value)) + // Build executor with mock clients (same builder as serve) + exec, err := buildExecutor(config, dryrunAPI, dryrunClient, log) + if err != nil { + return fmt.Errorf("failed to create executor: %w", err) } - return hyperfleet_api.NewClient(log, opts...) 
-} + // Execute with event data + result := exec.Execute(ctx, evt.Data()) -// createK8sClient creates a Kubernetes client from the config -func createK8sClient(ctx context.Context, k8sConfig config_loader.KubernetesConfig, log logger.Logger) (*k8s_client.Client, error) { - clientConfig := k8s_client.ClientConfig{ - KubeConfigPath: k8sConfig.KubeConfigPath, - QPS: k8sConfig.QPS, - Burst: k8sConfig.Burst, + // Build and output execution trace + trace := &dryrun.ExecutionTrace{ + EventID: evt.ID(), + EventType: evt.Type(), + Result: result, + APIClient: dryrunAPI, + Transport: dryrunClient, + Verbose: dryRunVerbose, } - return k8s_client.NewClient(ctx, clientConfig, log) -} -// createMaestroClient creates a Maestro client from the config -func createMaestroClient(ctx context.Context, maestroConfig *config_loader.MaestroClientConfig, log logger.Logger) (*maestro_client.Client, error) { - config := &maestro_client.Config{ - MaestroServerAddr: maestroConfig.HTTPServerAddress, - GRPCServerAddr: maestroConfig.GRPCServerAddress, - SourceID: maestroConfig.SourceID, - Insecure: maestroConfig.Insecure, + switch dryRunOutput { + case "json": + data, err := trace.FormatJSON() + if err != nil { + return fmt.Errorf("failed to format trace as JSON: %w", err) + } + fmt.Println(string(data)) + default: + fmt.Print(trace.FormatText()) } - // Set TLS config if present - if maestroConfig.Auth.TLSConfig != nil { - config.CAFile = maestroConfig.Auth.TLSConfig.CAFile - config.ClientCertFile = maestroConfig.Auth.TLSConfig.CertFile - config.ClientKeyFile = maestroConfig.Auth.TLSConfig.KeyFile + if result.Status == executor.StatusFailed { + for phase, err := range result.Errors { + fmt.Fprintf(os.Stderr, "Error in %s: %v\n", phase, err) + } } - return maestro_client.NewMaestroClient(ctx, config, log) + return nil } diff --git a/internal/dryrun/discovery_overrides.go b/internal/dryrun/discovery_overrides.go new file mode 100644 index 0000000..53c3ec9 --- /dev/null +++ 
b/internal/dryrun/discovery_overrides.go @@ -0,0 +1,39 @@ +package dryrun + +import ( + "encoding/json" + "fmt" + "os" +) + +// DiscoveryOverrides maps rendered Kubernetes resource names to complete resource +// objects that replace applied manifests in the in-memory store. This allows +// dry-run mode to simulate server-populated fields (status, uid, resourceVersion, etc.). +type DiscoveryOverrides map[string]map[string]interface{} + +// LoadDiscoveryOverrides reads a JSON file and returns discovery overrides. +// Each top-level key is a rendered metadata.name, and each value is a complete +// Kubernetes-like resource object that must contain at least apiVersion and kind. +func LoadDiscoveryOverrides(path string) (DiscoveryOverrides, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read discovery overrides file: %w", err) + } + + var overrides DiscoveryOverrides + if err := json.Unmarshal(data, &overrides); err != nil { + return nil, fmt.Errorf("failed to parse discovery overrides JSON: %w", err) + } + + // Validate each entry has required fields + for name, obj := range overrides { + if _, ok := obj["apiVersion"]; !ok { + return nil, fmt.Errorf("discovery override %q is missing required field \"apiVersion\"", name) + } + if _, ok := obj["kind"]; !ok { + return nil, fmt.Errorf("discovery override %q is missing required field \"kind\"", name) + } + } + + return overrides, nil +} diff --git a/internal/dryrun/discovery_overrides_test.go b/internal/dryrun/discovery_overrides_test.go new file mode 100644 index 0000000..b7db4f2 --- /dev/null +++ b/internal/dryrun/discovery_overrides_test.go @@ -0,0 +1,80 @@ +package dryrun + +import ( + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// writeOverrideFile writes content to a file in dir and returns the full path. 
+func writeOverrideFile(t *testing.T, dir, name, content string) string {
+	t.Helper()
+	p := filepath.Join(dir, name)
+	err := os.WriteFile(p, []byte(content), 0644)
+	require.NoError(t, err)
+	return p
+}
+
+func TestLoadDiscoveryOverrides_ValidFile(t *testing.T) {
+	t.Run("single entry with apiVersion and kind", func(t *testing.T) {
+		dir := t.TempDir()
+		path := writeOverrideFile(t, dir, "overrides.json", `{
+			"my-resource": {
+				"apiVersion": "v1",
+				"kind": "ConfigMap",
+				"metadata": {"name": "my-resource"}
+			}
+		}`)
+
+		overrides, err := LoadDiscoveryOverrides(path)
+
+		require.NoError(t, err)
+		assert.Len(t, overrides, 1)
+		assert.Equal(t, "v1", overrides["my-resource"]["apiVersion"])
+		assert.Equal(t, "ConfigMap", overrides["my-resource"]["kind"])
+	})
+}
+
+func TestLoadDiscoveryOverrides_FileNotFound(t *testing.T) {
+	t.Run("non-existent file returns read error", func(t *testing.T) {
+		_, err := LoadDiscoveryOverrides("/no/such/file.json")
+
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "failed to read")
+	})
+}
+
+func TestLoadDiscoveryOverrides_MissingAPIVersion(t *testing.T) {
+	t.Run("entry missing apiVersion returns validation error", func(t *testing.T) {
+		dir := t.TempDir()
+		path := writeOverrideFile(t, dir, "overrides.json", `{
+			"bad-resource": {
+				"kind": "ConfigMap"
+			}
+		}`)
+
+		_, err := LoadDiscoveryOverrides(path)
+
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), `missing required field "apiVersion"`)
+	})
+}
+
+func TestLoadDiscoveryOverrides_MissingKind(t *testing.T) {
+	t.Run("entry missing kind returns validation error", func(t *testing.T) {
+		dir := t.TempDir()
+		path := writeOverrideFile(t, dir, "overrides.json", `{
+			"bad-resource": {
+				"apiVersion": "v1"
+			}
+		}`)
+
+		_, err := LoadDiscoveryOverrides(path)
+
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), `missing required field "kind"`)
+	})
+}
diff --git a/internal/dryrun/dryrun_api_client.go b/internal/dryrun/dryrun_api_client.go
new file mode 100644
index 0000000..72ef1fd
--- /dev/null
+++ b/internal/dryrun/dryrun_api_client.go
@@ -0,0 +1,187 @@
+package dryrun
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"regexp"
+	"sync"
+
+	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/hyperfleet_api"
+)
+
+// RequestRecord stores details of an API request made through the dryrun client.
+type RequestRecord struct {
+	Method     string
+	URL        string
+	Headers    map[string]string
+	Body       []byte
+	StatusCode int
+	Response   []byte
+}
+
+// DryrunAPIClient implements hyperfleet_api.Client backed by file-defined dryrun responses.
+// It matches requests by HTTP method and URL regex pattern, returning responses
+// sequentially from a configured array per endpoint. All requests are recorded.
+type DryrunAPIClient struct {
+	endpoints []compiledEndpoint
+	mu        sync.Mutex
+	Requests  []RequestRecord
+}
+
+type compiledEndpoint struct {
+	method  string
+	pattern *regexp.Regexp
+	resps   []DryrunResponse
+	callIdx int
+}
+
+// NewDryrunAPIClient creates a DryrunAPIClient from a DryrunResponsesFile.
+// If mrf is nil, a default client that returns 200 OK for all requests is created.
+func NewDryrunAPIClient(mrf *DryrunResponsesFile) (*DryrunAPIClient, error) {
+	client := &DryrunAPIClient{
+		Requests: make([]RequestRecord, 0),
+	}
+
+	if mrf == nil {
+		return client, nil
+	}
+
+	for i, ep := range mrf.Responses {
+		compiled, err := regexp.Compile(ep.Match.URLPattern)
+		if err != nil {
+			return nil, fmt.Errorf("endpoint %d: invalid urlPattern %q: %w", i, ep.Match.URLPattern, err)
+		}
+		client.endpoints = append(client.endpoints, compiledEndpoint{
+			method:  ep.Match.Method,
+			pattern: compiled,
+			resps:   ep.Responses,
+		})
+	}
+
+	return client, nil
+}
+
+func (c *DryrunAPIClient) findEndpoint(method, url string) *compiledEndpoint {
+	for i := range c.endpoints {
+		ep := &c.endpoints[i]
+		if ep.method != "*" && ep.method != method {
+			continue
+		}
+		if ep.pattern.MatchString(url) {
+			return ep
+		}
+	}
+	return nil
+}
+
+func (c *DryrunAPIClient) nextResponse(ep *compiledEndpoint) DryrunResponse {
+	idx := ep.callIdx
+	if idx >= len(ep.resps) {
+		idx = len(ep.resps) - 1 // repeat last response
+	}
+	ep.callIdx++
+	return ep.resps[idx]
+}
+
+// Do executes a dryrun HTTP request, matching against configured endpoints.
+func (c *DryrunAPIClient) Do(ctx context.Context, req *hyperfleet_api.Request) (*hyperfleet_api.Response, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	ep := c.findEndpoint(req.Method, req.URL)
+
+	var statusCode int
+	var respBody []byte
+
+	if ep == nil {
+		// Default: 200 OK with empty body
+		statusCode = http.StatusOK
+		respBody = []byte("{}")
+	} else {
+		dryrunResp := c.nextResponse(ep)
+		statusCode = dryrunResp.StatusCode
+		if statusCode == 0 {
+			statusCode = http.StatusOK
+		}
+
+		if dryrunResp.Body != nil {
+			var err error
+			respBody, err = json.Marshal(dryrunResp.Body)
+			if err != nil {
+				return nil, fmt.Errorf("failed to marshal dryrun response body: %w", err)
+			}
+		} else {
+			respBody = []byte("{}")
+		}
+	}
+
+	record := RequestRecord{
+		Method:     req.Method,
+		URL:        req.URL,
+		Headers:    req.Headers,
+		Body:       req.Body,
+		StatusCode: statusCode,
+		Response:   respBody,
+	}
+	c.Requests = append(c.Requests, record)
+
+	return &hyperfleet_api.Response{
+		StatusCode: statusCode,
+		Status:     fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)),
+		Body:       respBody,
+		Headers:    make(map[string][]string),
+		Attempts:   1,
+	}, nil
+}
+
+// Get performs a dryrun GET request.
+func (c *DryrunAPIClient) Get(ctx context.Context, url string, opts ...hyperfleet_api.RequestOption) (*hyperfleet_api.Response, error) {
+	req := &hyperfleet_api.Request{Method: http.MethodGet, URL: url}
+	for _, opt := range opts {
+		opt(req)
+	}
+	return c.Do(ctx, req)
+}
+
+// Post performs a dryrun POST request.
+func (c *DryrunAPIClient) Post(ctx context.Context, url string, body []byte, opts ...hyperfleet_api.RequestOption) (*hyperfleet_api.Response, error) {
+	req := &hyperfleet_api.Request{Method: http.MethodPost, URL: url, Body: body}
+	for _, opt := range opts {
+		opt(req)
+	}
+	return c.Do(ctx, req)
+}
+
+// Put performs a dryrun PUT request.
+func (c *DryrunAPIClient) Put(ctx context.Context, url string, body []byte, opts ...hyperfleet_api.RequestOption) (*hyperfleet_api.Response, error) {
+	req := &hyperfleet_api.Request{Method: http.MethodPut, URL: url, Body: body}
+	for _, opt := range opts {
+		opt(req)
+	}
+	return c.Do(ctx, req)
+}
+
+// Patch performs a dryrun PATCH request.
+func (c *DryrunAPIClient) Patch(ctx context.Context, url string, body []byte, opts ...hyperfleet_api.RequestOption) (*hyperfleet_api.Response, error) {
+	req := &hyperfleet_api.Request{Method: http.MethodPatch, URL: url, Body: body}
+	for _, opt := range opts {
+		opt(req)
+	}
+	return c.Do(ctx, req)
+}
+
+// Delete performs a dryrun DELETE request.
+func (c *DryrunAPIClient) Delete(ctx context.Context, url string, opts ...hyperfleet_api.RequestOption) (*hyperfleet_api.Response, error) {
+	req := &hyperfleet_api.Request{Method: http.MethodDelete, URL: url}
+	for _, opt := range opts {
+		opt(req)
+	}
+	return c.Do(ctx, req)
+}
+
+// BaseURL returns a placeholder base URL for the dryrun client.
+func (c *DryrunAPIClient) BaseURL() string {
+	return "http://mock-api"
+}
diff --git a/internal/dryrun/dryrun_api_client_test.go b/internal/dryrun/dryrun_api_client_test.go
new file mode 100644
index 0000000..5b42da7
--- /dev/null
+++ b/internal/dryrun/dryrun_api_client_test.go
@@ -0,0 +1,340 @@
+package dryrun
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"testing"
+
+	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/hyperfleet_api"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNewDryrunAPIClient_NilConfig(t *testing.T) {
+	client, err := NewDryrunAPIClient(nil)
+	require.NoError(t, err)
+	require.NotNil(t, client)
+	assert.Empty(t, client.endpoints)
+	assert.Empty(t, client.Requests)
+}
+
+func TestNewDryrunAPIClient_InvalidRegex(t *testing.T) {
+	mrf := &DryrunResponsesFile{
+		Responses: []DryrunEndpoint{
+			{
+				Match: DryrunMatch{
+					Method:     "GET",
+					URLPattern: "[invalid",
+				},
+				Responses: []DryrunResponse{
+					{StatusCode: 200},
+				},
+			},
+		},
+	}
+
+	client, err := NewDryrunAPIClient(mrf)
+	assert.Error(t, err)
+	assert.Nil(t, client)
+	assert.Contains(t, err.Error(), "invalid urlPattern")
+}
+
+func TestDo_MatchesEndpoint(t *testing.T) {
+	expectedBody := map[string]interface{}{"key": "value"}
+
+	mrf := &DryrunResponsesFile{
+		Responses: []DryrunEndpoint{
+			{
+				Match: DryrunMatch{
+					Method:     "GET",
+					URLPattern: "/api/v1/tasks.*",
+				},
+				Responses: []DryrunResponse{
+					{
+						StatusCode: 201,
+						Body:       expectedBody,
+					},
+				},
+			},
+		},
+	}
+
+	client, err := NewDryrunAPIClient(mrf)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	req := &hyperfleet_api.Request{
+		Method: "GET",
+		URL:    "/api/v1/tasks/123",
+	}
+
+	resp, err := client.Do(ctx, req)
+	require.NoError(t, err)
+	assert.Equal(t, 201, resp.StatusCode)
+	assert.Equal(t, "201 Created", resp.Status)
+	assert.Equal(t, 1, resp.Attempts)
+
+	var body map[string]interface{}
+	err = json.Unmarshal(resp.Body, &body)
+	require.NoError(t, err)
+	assert.Equal(t, "value", body["key"])
+
+	// Verify request was recorded
+	require.Len(t, client.Requests, 1)
+	assert.Equal(t, "GET", client.Requests[0].Method)
+	assert.Equal(t, "/api/v1/tasks/123", client.Requests[0].URL)
+	assert.Equal(t, 201, client.Requests[0].StatusCode)
+}
+
+func TestDo_NoMatchDefaultOK(t *testing.T) {
+	mrf := &DryrunResponsesFile{
+		Responses: []DryrunEndpoint{
+			{
+				Match: DryrunMatch{
+					Method:     "GET",
+					URLPattern: "/specific-path",
+				},
+				Responses: []DryrunResponse{
+					{StatusCode: 404},
+				},
+			},
+		},
+	}
+
+	client, err := NewDryrunAPIClient(mrf)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	req := &hyperfleet_api.Request{
+		Method: "GET",
+		URL:    "/unmatched-path",
+	}
+
+	resp, err := client.Do(ctx, req)
+	require.NoError(t, err)
+	assert.Equal(t, http.StatusOK, resp.StatusCode)
+	assert.Equal(t, "{}", string(resp.Body))
+}
+
+func TestDo_MethodFiltering(t *testing.T) {
+	mrf := &DryrunResponsesFile{
+		Responses: []DryrunEndpoint{
+			{
+				Match: DryrunMatch{
+					Method:     "POST",
+					URLPattern: "/api/v1/tasks",
+				},
+				Responses: []DryrunResponse{
+					{StatusCode: 201},
+				},
+			},
+		},
+	}
+
+	client, err := NewDryrunAPIClient(mrf)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	req := &hyperfleet_api.Request{
+		Method: "GET",
+		URL:    "/api/v1/tasks",
+	}
+
+	resp, err := client.Do(ctx, req)
+	require.NoError(t, err)
+	// GET does not match the POST endpoint, so we get the default 200 OK
+	assert.Equal(t, http.StatusOK, resp.StatusCode)
+	assert.Equal(t, "{}", string(resp.Body))
+}
+
+func TestDo_WildcardMethod(t *testing.T) {
+	mrf := &DryrunResponsesFile{
+		Responses: []DryrunEndpoint{
+			{
+				Match: DryrunMatch{
+					Method:     "*",
+					URLPattern: "/api/v1/anything",
+				},
+				Responses: []DryrunResponse{
+					{StatusCode: 204},
+				},
+			},
+		},
+	}
+
+	client, err := NewDryrunAPIClient(mrf)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+
+	tests := []struct {
+		name   string
+		method string
+	}{
+		{"GET matches wildcard", "GET"},
+		{"POST matches wildcard", "POST"},
+		{"DELETE matches wildcard", "DELETE"},
+		{"PATCH matches wildcard", "PATCH"},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			req := &hyperfleet_api.Request{
+				Method: tc.method,
+				URL:    "/api/v1/anything",
+			}
+			resp, err := client.Do(ctx, req)
+			require.NoError(t, err)
+			assert.Equal(t, 204, resp.StatusCode)
+		})
+	}
+}
+
+func TestDo_SequentialResponses(t *testing.T) {
+	mrf := &DryrunResponsesFile{
+		Responses: []DryrunEndpoint{
+			{
+				Match: DryrunMatch{
+					Method:     "GET",
+					URLPattern: "/api/v1/resource",
+				},
+				Responses: []DryrunResponse{
+					{StatusCode: 200, Body: map[string]interface{}{"call": "first"}},
+					{StatusCode: 201, Body: map[string]interface{}{"call": "second"}},
+					{StatusCode: 202, Body: map[string]interface{}{"call": "third"}},
+				},
+			},
+		},
+	}
+
+	client, err := NewDryrunAPIClient(mrf)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	req := &hyperfleet_api.Request{
+		Method: "GET",
+		URL:    "/api/v1/resource",
+	}
+
+	// First call → first response
+	resp, err := client.Do(ctx, req)
+	require.NoError(t, err)
+	assert.Equal(t, 200, resp.StatusCode)
+
+	// Second call → second response
+	resp, err = client.Do(ctx, req)
+	require.NoError(t, err)
+	assert.Equal(t, 201, resp.StatusCode)
+
+	// Third call → third response
+	resp, err = client.Do(ctx, req)
+	require.NoError(t, err)
+	assert.Equal(t, 202, resp.StatusCode)
+
+	// Fourth call → repeats last response (third)
+	resp, err = client.Do(ctx, req)
+	require.NoError(t, err)
+	assert.Equal(t, 202, resp.StatusCode)
+
+	// Fifth call → still repeats last response
+	resp, err = client.Do(ctx, req)
+	require.NoError(t, err)
+	assert.Equal(t, 202, resp.StatusCode)
+}
+
+func TestDo_StatusCodeZeroDefaultsOK(t *testing.T) {
+	mrf := &DryrunResponsesFile{
+		Responses: []DryrunEndpoint{
+			{
+				Match: DryrunMatch{
+					Method:     "GET",
+					URLPattern: "/api/v1/zero",
+				},
+				Responses: []DryrunResponse{
+					{StatusCode: 0, Body: map[string]interface{}{"ok": true}},
+				},
+			},
+		},
+	}
+
+	client, err := NewDryrunAPIClient(mrf)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	req := &hyperfleet_api.Request{
+		Method: "GET",
+		URL:    "/api/v1/zero",
+	}
+
+	resp, err := client.Do(ctx, req)
+	require.NoError(t, err)
+	assert.Equal(t, http.StatusOK, resp.StatusCode)
+	assert.Equal(t, "200 OK", resp.Status)
+}
+
+func TestConvenienceMethods(t *testing.T) {
+	tests := []struct {
+		name           string
+		call           func(ctx context.Context, client *DryrunAPIClient) (*hyperfleet_api.Response, error)
+		expectedMethod string
+	}{
+		{
+			name: "Get",
+			call: func(ctx context.Context, c *DryrunAPIClient) (*hyperfleet_api.Response, error) {
+				return c.Get(ctx, "/test")
+			},
+			expectedMethod: "GET",
+		},
+		{
+			name: "Post",
+			call: func(ctx context.Context, c *DryrunAPIClient) (*hyperfleet_api.Response, error) {
+				return c.Post(ctx, "/test", []byte(`{"data":"post"}`))
+			},
+			expectedMethod: "POST",
+		},
+		{
+			name: "Put",
+			call: func(ctx context.Context, c *DryrunAPIClient) (*hyperfleet_api.Response, error) {
+				return c.Put(ctx, "/test", []byte(`{"data":"put"}`))
+			},
+			expectedMethod: "PUT",
+		},
+		{
+			name: "Patch",
+			call: func(ctx context.Context, c *DryrunAPIClient) (*hyperfleet_api.Response, error) {
+				return c.Patch(ctx, "/test", []byte(`{"data":"patch"}`))
+			},
+			expectedMethod: "PATCH",
+		},
+		{
+			name: "Delete",
+			call: func(ctx context.Context, c *DryrunAPIClient) (*hyperfleet_api.Response, error) {
+				return c.Delete(ctx, "/test")
+			},
+			expectedMethod: "DELETE",
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			client, err := NewDryrunAPIClient(nil)
+			require.NoError(t, err)
+
+			ctx := context.Background()
+			resp, err := tc.call(ctx, client)
+			require.NoError(t, err)
+			assert.Equal(t, http.StatusOK, resp.StatusCode)
+
+			require.Len(t, client.Requests, 1)
+			assert.Equal(t, tc.expectedMethod, client.Requests[0].Method)
+			assert.Equal(t, "/test", client.Requests[0].URL)
+		})
+	}
+}
+
+func TestBaseURL(t *testing.T) {
+	client, err := NewDryrunAPIClient(nil)
+	require.NoError(t, err)
+	assert.Equal(t, "http://mock-api", client.BaseURL())
+}
diff --git a/internal/dryrun/dryrun_responses.go b/internal/dryrun/dryrun_responses.go
new file mode 100644
index 0000000..581ba93
--- /dev/null
+++ b/internal/dryrun/dryrun_responses.go
@@ -0,0 +1,56 @@
+package dryrun
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+)
+
+// DryrunResponsesFile represents the top-level structure of a dryrun API responses JSON file.
+type DryrunResponsesFile struct {
+	Responses []DryrunEndpoint `json:"responses"`
+}
+
+// DryrunEndpoint defines a URL pattern matcher and its sequential responses.
+type DryrunEndpoint struct {
+	Match     DryrunMatch      `json:"match"`
+	Responses []DryrunResponse `json:"responses"`
+}
+
+// DryrunMatch defines the HTTP method and URL pattern to match against.
+type DryrunMatch struct {
+	Method     string `json:"method"`     // HTTP method or "*" for any
+	URLPattern string `json:"urlPattern"` // Go regexp
+}
+
+// DryrunResponse defines a single dryrun HTTP response.
+type DryrunResponse struct {
+	StatusCode int               `json:"statusCode"`
+	Headers    map[string]string `json:"headers,omitempty"`
+	Body       interface{}       `json:"body,omitempty"`
+}
+
+// LoadDryrunResponses reads and parses a dryrun API responses JSON file.
+func LoadDryrunResponses(path string) (*DryrunResponsesFile, error) {
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read dryrun responses file %q: %w", path, err)
+	}
+
+	var mrf DryrunResponsesFile
+	if err := json.Unmarshal(data, &mrf); err != nil {
+		return nil, fmt.Errorf("failed to parse dryrun responses file %q: %w", path, err)
+	}
+
+	// Validate each endpoint has at least one response
+	for i, ep := range mrf.Responses {
+		if len(ep.Responses) == 0 {
+			return nil, fmt.Errorf("dryrun responses file %q: endpoint %d has no responses defined", path, i)
+		}
+		if ep.Match.URLPattern == "" {
+			return nil, fmt.Errorf("dryrun responses file %q: endpoint %d has empty urlPattern", path, i)
+		}
+	}
+
+	return &mrf, nil
+}
diff --git a/internal/dryrun/dryrun_responses_test.go b/internal/dryrun/dryrun_responses_test.go
new file mode 100644
index 0000000..2cf7a5a
--- /dev/null
+++ b/internal/dryrun/dryrun_responses_test.go
@@ -0,0 +1,159 @@
+package dryrun
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestLoadDryrunResponses_ValidFile(t *testing.T) {
+	t.Run("minimal valid file", func(t *testing.T) {
+		dir := t.TempDir()
+		filePath := filepath.Join(dir, "responses.json")
+
+		content := `{
+  "responses": [
+    {
+      "match": {
+        "method": "GET",
+        "urlPattern": "/api/v1/things/.*"
+      },
+      "responses": [
+        {
+          "statusCode": 200,
+          "headers": {"Content-Type": "application/json"},
+          "body": {"id": "thing-1", "name": "Thing One"}
+        }
+      ]
+    },
+    {
+      "match": {
+        "method": "POST",
+        "urlPattern": "/api/v1/actions"
+      },
+      "responses": [
+        {
+          "statusCode": 201,
+          "body": {"status": "created"}
+        },
+        {
+          "statusCode": 409,
+          "body": {"error": "conflict"}
+        }
+      ]
+    }
+  ]
+}`
+		err := os.WriteFile(filePath, []byte(content), 0644)
+		require.NoError(t, err)
+
+		result, err := LoadDryrunResponses(filePath)
+		require.NoError(t, err)
+		require.NotNil(t, result)
+
+		assert.Len(t, result.Responses, 2)
+
+		// First endpoint
+		assert.Equal(t, "GET", result.Responses[0].Match.Method)
+		assert.Equal(t, "/api/v1/things/.*", result.Responses[0].Match.URLPattern)
+		assert.Len(t, result.Responses[0].Responses, 1)
+		assert.Equal(t, 200, result.Responses[0].Responses[0].StatusCode)
+		assert.Equal(t, "application/json", result.Responses[0].Responses[0].Headers["Content-Type"])
+		require.NotNil(t, result.Responses[0].Responses[0].Body)
+
+		// Second endpoint
+		assert.Equal(t, "POST", result.Responses[1].Match.Method)
+		assert.Equal(t, "/api/v1/actions", result.Responses[1].Match.URLPattern)
+		assert.Len(t, result.Responses[1].Responses, 2)
+		assert.Equal(t, 201, result.Responses[1].Responses[0].StatusCode)
+		assert.Equal(t, 409, result.Responses[1].Responses[1].StatusCode)
+	})
+
+	t.Run("real testdata file", func(t *testing.T) {
+		testdataPath := filepath.Join("..", "..", "test", "testdata", "dryrun", "dryrun-api-responses.json")
+		if _, err := os.Stat(testdataPath); os.IsNotExist(err) {
+			t.Skipf("testdata file not found at %s", testdataPath)
+		}
+
+		result, err := LoadDryrunResponses(testdataPath)
+		require.NoError(t, err)
+		require.NotNil(t, result)
+		assert.Greater(t, len(result.Responses), 0, "expected at least one endpoint in testdata file")
+
+		for i, ep := range result.Responses {
+			assert.NotEmpty(t, ep.Match.URLPattern, "endpoint %d should have a urlPattern", i)
+			assert.NotEmpty(t, ep.Responses, "endpoint %d should have at least one response", i)
+		}
+	})
+}
+
+func TestLoadDryrunResponses_FileNotFound(t *testing.T) {
+	_, err := LoadDryrunResponses("/nonexistent/path/responses.json")
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "failed to read")
+}
+
+func TestLoadDryrunResponses_InvalidJSON(t *testing.T) {
+	dir := t.TempDir()
+	filePath := filepath.Join(dir, "bad.json")
+
+	err := os.WriteFile(filePath, []byte(`{not valid json}`), 0644)
+	require.NoError(t, err)
+
+	_, err = LoadDryrunResponses(filePath)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "failed to parse")
+}
+
+func TestLoadDryrunResponses_NoResponses(t *testing.T) {
+	dir := t.TempDir()
+	filePath := filepath.Join(dir, "no-responses.json")
+
+	content := `{
+  "responses": [
+    {
+      "match": {
+        "method": "GET",
+        "urlPattern": "/api/v1/things"
+      },
+      "responses": []
+    }
+  ]
+}`
+	err := os.WriteFile(filePath, []byte(content), 0644)
+	require.NoError(t, err)
+
+	_, err = LoadDryrunResponses(filePath)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "no responses defined")
+}
+
+func TestLoadDryrunResponses_EmptyURLPattern(t *testing.T) {
+	dir := t.TempDir()
+	filePath := filepath.Join(dir, "empty-pattern.json")
+
+	content := `{
+  "responses": [
+    {
+      "match": {
+        "method": "GET",
+        "urlPattern": ""
+      },
+      "responses": [
+        {
+          "statusCode": 200
+        }
+      ]
+    }
+  ]
+}`
+	err := os.WriteFile(filePath, []byte(content), 0644)
+	require.NoError(t, err)
+
+	_, err = LoadDryrunResponses(filePath)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "empty urlPattern")
+}
diff --git a/internal/dryrun/event_loader.go b/internal/dryrun/event_loader.go
new file mode 100644
index 0000000..5be2c93
--- /dev/null
+++ b/internal/dryrun/event_loader.go
@@ -0,0 +1,30 @@
+package dryrun
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+
+	cloudevents "github.com/cloudevents/sdk-go/v2/event"
+)
+
+// LoadCloudEvent reads a CloudEvent from a JSON file in standard CloudEvents
+// JSON format and returns the parsed event.
+func LoadCloudEvent(path string) (*cloudevents.Event, error) {
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read event file %q: %w", path, err)
+	}
+
+	var evt cloudevents.Event
+	if err := json.Unmarshal(data, &evt); err != nil {
+		return nil, fmt.Errorf("failed to parse CloudEvent from %q: %w", path, err)
+	}
+
+	// Basic validation
+	if err := evt.Validate(); err != nil {
+		return nil, fmt.Errorf("invalid CloudEvent in %q: %w", path, err)
+	}
+
+	return &evt, nil
+}
diff --git a/internal/dryrun/event_loader_test.go b/internal/dryrun/event_loader_test.go
new file mode 100644
index 0000000..b2c765c
--- /dev/null
+++ b/internal/dryrun/event_loader_test.go
@@ -0,0 +1,53 @@
+package dryrun
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func writeEventFile(t *testing.T, dir, name, content string) string {
+	t.Helper()
+	p := filepath.Join(dir, name)
+	err := os.WriteFile(p, []byte(content), 0644)
+	require.NoError(t, err)
+	return p
+}
+
+func TestLoadCloudEvent_ValidFile(t *testing.T) {
+	t.Run("loads a valid CloudEvent from file", func(t *testing.T) {
+		dir := t.TempDir()
+		path := writeEventFile(t, dir, "event.json", `{"specversion":"1.0","id":"test-123","type":"com.example.test","source":"/test"}`)
+
+		evt, err := LoadCloudEvent(path)
+
+		require.NoError(t, err)
+		assert.Equal(t, "test-123", evt.ID())
+		assert.Equal(t, "com.example.test", evt.Type())
+		assert.Equal(t, "/test", evt.Source())
+	})
+}
+
+func TestLoadCloudEvent_FileNotFound(t *testing.T) {
+	t.Run("returns error when file does not exist", func(t *testing.T) {
+		_, err := LoadCloudEvent("/nonexistent/path/event.json")
+
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "failed to read")
+	})
+}
+
+func TestLoadCloudEvent_InvalidJSON(t *testing.T) {
+	t.Run("returns error for invalid JSON", func(t *testing.T) {
+		dir := t.TempDir()
+		path := writeEventFile(t, dir, "bad.json", `{not json}`)
+
+		_, err := LoadCloudEvent(path)
+
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), "failed to parse")
+	})
+}
diff --git a/internal/dryrun/recording_transport_client.go b/internal/dryrun/recording_transport_client.go
new file mode 100644
index 0000000..187199a
--- /dev/null
+++ b/internal/dryrun/recording_transport_client.go
@@ -0,0 +1,180 @@
+package dryrun
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"sync"
+
+	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/manifest"
+	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/transport_client"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// TransportRecord stores details of a transport client operation.
+type TransportRecord struct {
+	Operation string // "apply", "get", "discover"
+	GVK       schema.GroupVersionKind
+	Namespace string
+	Name      string
+	Manifest  []byte
+	Result    *transport_client.ApplyResult
+	Error     error
+}
+
+// DryrunTransportClient implements transport_client.TransportClient by recording
+// all operations in-memory without executing real Kubernetes calls.
+// Applied resources are stored for subsequent discovery/get operations.
+type DryrunTransportClient struct {
+	mu                 sync.Mutex
+	resources          map[string]*unstructured.Unstructured // key: "namespace/name/gvk"
+	Records            []TransportRecord
+	discoveryOverrides DiscoveryOverrides
+}
+
+// NewDryrunTransportClient creates a new DryrunTransportClient.
+func NewDryrunTransportClient() *DryrunTransportClient {
+	return &DryrunTransportClient{
+		resources: make(map[string]*unstructured.Unstructured),
+		Records:   make([]TransportRecord, 0),
+	}
+}
+
+// NewDryrunTransportClientWithOverrides creates a DryrunTransportClient
+// with discovery overrides. When a resource is applied and its metadata.name
+// matches a key in the overrides map, the override object replaces the applied
+// manifest in the in-memory store.
+func NewDryrunTransportClientWithOverrides(overrides DiscoveryOverrides) *DryrunTransportClient {
+	return &DryrunTransportClient{
+		resources:          make(map[string]*unstructured.Unstructured),
+		Records:            make([]TransportRecord, 0),
+		discoveryOverrides: overrides,
+	}
+}
+
+func resourceKey(gvk schema.GroupVersionKind, namespace, name string) string {
+	return fmt.Sprintf("%s/%s/%s/%s/%s", gvk.Group, gvk.Version, gvk.Kind, namespace, name)
+}
+
+// ApplyResource parses the manifest JSON, stores it in-memory, and records the operation.
+func (c *DryrunTransportClient) ApplyResource(ctx context.Context, manifestBytes []byte, opts *transport_client.ApplyOptions, target transport_client.TransportContext) (*transport_client.ApplyResult, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Parse manifest
+	obj := &unstructured.Unstructured{}
+	if err := json.Unmarshal(manifestBytes, &obj.Object); err != nil {
+		record := TransportRecord{
+			Operation: "apply",
+			Manifest:  manifestBytes,
+			Error:     fmt.Errorf("failed to parse manifest: %w", err),
+		}
+		c.Records = append(c.Records, record)
+		return nil, record.Error
+	}
+
+	gvk := obj.GroupVersionKind()
+	namespace := obj.GetNamespace()
+	name := obj.GetName()
+	key := resourceKey(gvk, namespace, name)
+
+	// Determine operation: create or update
+	var operation manifest.Operation
+	if _, exists := c.resources[key]; exists {
+		operation = manifest.OperationUpdate
+	} else {
+		operation = manifest.OperationCreate
+	}
+
+	if opts != nil && opts.RecreateOnChange && operation == manifest.OperationUpdate {
+		operation = manifest.OperationRecreate
+	}
+
+	// Check for discovery override by resource name
+	if c.discoveryOverrides != nil {
+		if override, found := c.discoveryOverrides[name]; found {
+			overrideObj := &unstructured.Unstructured{Object: override}
+			c.resources[key] = overrideObj
+		} else {
+			c.resources[key] = obj
+		}
+	} else {
+		c.resources[key] = obj
+	}
+
+	result := &transport_client.ApplyResult{
+		Operation: operation,
+		Reason:    fmt.Sprintf("dry-run %s", operation),
+	}
+
+	c.Records = append(c.Records, TransportRecord{
+		Operation: "apply",
+		GVK:       gvk,
+		Namespace: namespace,
+		Name:      name,
+		Manifest:  manifestBytes,
+		Result:    result,
+	})
+
+	return result, nil
+}
+
+// GetResource returns a resource from the in-memory store or a NotFound error.
+func (c *DryrunTransportClient) GetResource(ctx context.Context, gvk schema.GroupVersionKind, namespace, name string, target transport_client.TransportContext) (*unstructured.Unstructured, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	key := resourceKey(gvk, namespace, name)
+	obj, exists := c.resources[key]
+
+	c.Records = append(c.Records, TransportRecord{
+		Operation: "get",
+		GVK:       gvk,
+		Namespace: namespace,
+		Name:      name,
+	})
+
+	if !exists {
+		return nil, fmt.Errorf("resource %s/%s %s/%s not found (dry-run)", gvk.Kind, gvk.Version, namespace, name)
+	}
+
+	return obj.DeepCopy(), nil
+}
+
+// DiscoverResources returns resources from the in-memory store filtered by discovery config.
+func (c *DryrunTransportClient) DiscoverResources(ctx context.Context, gvk schema.GroupVersionKind, discovery manifest.Discovery, target transport_client.TransportContext) (*unstructured.UnstructuredList, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.Records = append(c.Records, TransportRecord{
+		Operation: "discover",
+		GVK:       gvk,
+		Namespace: discovery.GetNamespace(),
+		Name:      discovery.GetName(),
+	})
+
+	list := &unstructured.UnstructuredList{}
+
+	for _, obj := range c.resources {
+		objGVK := obj.GroupVersionKind()
+		if objGVK.Group != gvk.Group || objGVK.Version != gvk.Version || objGVK.Kind != gvk.Kind {
+			continue
+		}
+
+		// Filter by namespace
+		ns := discovery.GetNamespace()
+		if ns != "" && ns != "*" && obj.GetNamespace() != ns {
+			continue
+		}
+
+		// Filter by name if single-resource discovery
+		if discovery.IsSingleResource() && obj.GetName() != discovery.GetName() {
+			continue
+		}
+
+		list.Items = append(list.Items, *obj.DeepCopy())
+	}
+
+	return list, nil
+}
diff --git a/internal/dryrun/recording_transport_client_test.go b/internal/dryrun/recording_transport_client_test.go
new file mode 100644
index 0000000..968c2d2
--- /dev/null
+++ b/internal/dryrun/recording_transport_client_test.go
@@ -0,0 +1,405 @@
+package dryrun
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"strings"
+	"sync"
+	"testing"
+
+	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/manifest"
+	"github.com/openshift-hyperfleet/hyperfleet-adapter/internal/transport_client"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+)
+
+// testDiscovery implements manifest.Discovery for use in tests.
+type testDiscovery struct {
+	namespace      string
+	name           string
+	labelSelector  string
+	singleResource bool
+}
+
+func (d *testDiscovery) GetNamespace() string { return d.namespace }
+func (d *testDiscovery) GetName() string      { return d.name }
+func (d *testDiscovery) GetLabelSelector() string {
+	return d.labelSelector
+}
+func (d *testDiscovery) IsSingleResource() bool { return d.singleResource }
+
+// makeManifest builds a valid JSON manifest for testing.
+func makeManifest(apiVersion, kind, namespace, name string) []byte {
+	obj := map[string]interface{}{
+		"apiVersion": apiVersion,
+		"kind":       kind,
+		"metadata": map[string]interface{}{
+			"name":      name,
+			"namespace": namespace,
+		},
+	}
+	data, _ := json.Marshal(obj)
+	return data
+}
+
+func TestApplyResource_CreateNew(t *testing.T) {
+	ctx := context.Background()
+	client := NewDryrunTransportClient()
+	manifestBytes := makeManifest("v1", "ConfigMap", "default", "my-cm")
+
+	result, err := client.ApplyResource(ctx, manifestBytes, nil, nil)
+
+	require.NoError(t, err)
+	require.NotNil(t, result)
+	assert.Equal(t, manifest.OperationCreate, result.Operation)
+	assert.Contains(t, result.Reason, "dry-run")
+
+	// Verify record was appended.
+	require.Len(t, client.Records, 1)
+	assert.Equal(t, "apply", client.Records[0].Operation)
+	assert.Equal(t, "my-cm", client.Records[0].Name)
+	assert.Equal(t, "default", client.Records[0].Namespace)
+	assert.Nil(t, client.Records[0].Error)
+
+	// Verify resource is retrievable via Get.
+	gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}
+	obj, err := client.GetResource(ctx, gvk, "default", "my-cm", nil)
+	require.NoError(t, err)
+	assert.Equal(t, "my-cm", obj.GetName())
+}
+
+func TestApplyResource_UpdateExisting(t *testing.T) {
+	ctx := context.Background()
+	client := NewDryrunTransportClient()
+	manifestBytes := makeManifest("v1", "ConfigMap", "default", "my-cm")
+
+	// First apply: create.
+ result1, err := client.ApplyResource(ctx, manifestBytes, nil, nil) + require.NoError(t, err) + assert.Equal(t, manifest.OperationCreate, result1.Operation) + + // Second apply: update. + result2, err := client.ApplyResource(ctx, manifestBytes, nil, nil) + require.NoError(t, err) + assert.Equal(t, manifest.OperationUpdate, result2.Operation) + + require.Len(t, client.Records, 2) +} + +func TestApplyResource_RecreateOnChange(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + manifestBytes := makeManifest("v1", "ConfigMap", "default", "my-cm") + + // First apply to create the resource. + _, err := client.ApplyResource(ctx, manifestBytes, nil, nil) + require.NoError(t, err) + + // Second apply with RecreateOnChange. + opts := &transport_client.ApplyOptions{RecreateOnChange: true} + result, err := client.ApplyResource(ctx, manifestBytes, opts, nil) + require.NoError(t, err) + assert.Equal(t, manifest.OperationRecreate, result.Operation) +} + +func TestApplyResource_InvalidJSON(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + + result, err := client.ApplyResource(ctx, []byte("{invalid-json"), nil, nil) + + assert.Nil(t, result) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse manifest") + + // Record should still be appended with the error. + require.Len(t, client.Records, 1) + assert.Equal(t, "apply", client.Records[0].Operation) + assert.NotNil(t, client.Records[0].Error) +} + +func TestApplyResource_NilOptions(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + manifestBytes := makeManifest("v1", "ConfigMap", "default", "my-cm") + + // First apply to create. + _, err := client.ApplyResource(ctx, manifestBytes, nil, nil) + require.NoError(t, err) + + // Second apply with nil opts should not panic and should produce Update. 
+ result, err := client.ApplyResource(ctx, manifestBytes, nil, nil) + require.NoError(t, err) + assert.Equal(t, manifest.OperationUpdate, result.Operation) +} + +func TestApplyResource_WithDiscoveryOverride(t *testing.T) { + ctx := context.Background() + + overrides := DiscoveryOverrides{ + "my-cm": { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "my-cm", + "namespace": "default", + "resourceVersion": "12345", + "uid": "fake-uid", + }, + "data": map[string]interface{}{ + "overridden": "true", + }, + }, + } + client := NewDryrunTransportClientWithOverrides(overrides) + manifestBytes := makeManifest("v1", "ConfigMap", "default", "my-cm") + + _, err := client.ApplyResource(ctx, manifestBytes, nil, nil) + require.NoError(t, err) + + // Get should return the override, not the original manifest. + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + obj, err := client.GetResource(ctx, gvk, "default", "my-cm", nil) + require.NoError(t, err) + + data, found, err := unstructuredNestedString(obj.Object, "data", "overridden") + require.NoError(t, err) + assert.True(t, found) + assert.Equal(t, "true", data) +} + +// unstructuredNestedString is a small helper to navigate nested maps. 
+func unstructuredNestedString(obj map[string]interface{}, fields ...string) (string, bool, error) { + current := obj + for i, f := range fields { + val, ok := current[f] + if !ok { + return "", false, nil + } + if i == len(fields)-1 { + s, ok := val.(string) + return s, ok, nil + } + m, ok := val.(map[string]interface{}) + if !ok { + return "", false, fmt.Errorf("field %q is not a map", f) + } + current = m + } + return "", false, nil +} + +func TestApplyResource_OverrideNoMatch(t *testing.T) { + ctx := context.Background() + + overrides := DiscoveryOverrides{ + "other-resource": { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": map[string]interface{}{ + "name": "other-resource", + "namespace": "default", + }, + }, + } + client := NewDryrunTransportClientWithOverrides(overrides) + manifestBytes := makeManifest("v1", "ConfigMap", "default", "my-cm") + + _, err := client.ApplyResource(ctx, manifestBytes, nil, nil) + require.NoError(t, err) + + // Get should return the original manifest since no override matched. + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + obj, err := client.GetResource(ctx, gvk, "default", "my-cm", nil) + require.NoError(t, err) + assert.Equal(t, "my-cm", obj.GetName()) + + // The override's extra fields should not be present. + _, found, _ := unstructuredNestedString(obj.Object, "data", "overridden") + assert.False(t, found) +} + +func TestGetResource_NotFound(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + obj, err := client.GetResource(ctx, gvk, "default", "missing", nil) + + assert.Nil(t, obj) + require.Error(t, err) + assert.Contains(t, err.Error(), "not found") + + // Record should still be appended. 
+ require.Len(t, client.Records, 1) + assert.Equal(t, "get", client.Records[0].Operation) + assert.Equal(t, "missing", client.Records[0].Name) +} + +func TestGetResource_ReturnsDeepCopy(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + manifestBytes := makeManifest("v1", "ConfigMap", "default", "my-cm") + + _, err := client.ApplyResource(ctx, manifestBytes, nil, nil) + require.NoError(t, err) + + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + // Get the resource and mutate it. + obj, err := client.GetResource(ctx, gvk, "default", "my-cm", nil) + require.NoError(t, err) + obj.SetName("mutated-name") + + // Get again and verify the store was not affected. + obj2, err := client.GetResource(ctx, gvk, "default", "my-cm", nil) + require.NoError(t, err) + assert.Equal(t, "my-cm", obj2.GetName()) +} + +func TestDiscoverResources_ByGVK(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + + // Apply two ConfigMaps and one Secret. + _, err := client.ApplyResource(ctx, makeManifest("v1", "ConfigMap", "default", "cm-1"), nil, nil) + require.NoError(t, err) + _, err = client.ApplyResource(ctx, makeManifest("v1", "ConfigMap", "default", "cm-2"), nil, nil) + require.NoError(t, err) + _, err = client.ApplyResource(ctx, makeManifest("v1", "Secret", "default", "secret-1"), nil, nil) + require.NoError(t, err) + + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + disc := &testDiscovery{namespace: "", name: ""} + + list, err := client.DiscoverResources(ctx, gvk, disc, nil) + require.NoError(t, err) + assert.Len(t, list.Items, 2) + + // All returned items should be ConfigMaps. 
+ for _, item := range list.Items { + assert.Equal(t, "ConfigMap", item.GetKind()) + } +} + +func TestDiscoverResources_FilterByNamespace(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + + _, err := client.ApplyResource(ctx, makeManifest("v1", "ConfigMap", "ns-a", "cm-1"), nil, nil) + require.NoError(t, err) + _, err = client.ApplyResource(ctx, makeManifest("v1", "ConfigMap", "ns-b", "cm-2"), nil, nil) + require.NoError(t, err) + + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + disc := &testDiscovery{namespace: "ns-a"} + + list, err := client.DiscoverResources(ctx, gvk, disc, nil) + require.NoError(t, err) + require.Len(t, list.Items, 1) + assert.Equal(t, "cm-1", list.Items[0].GetName()) +} + +func TestDiscoverResources_SingleResourceByName(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + + _, err := client.ApplyResource(ctx, makeManifest("v1", "ConfigMap", "default", "cm-1"), nil, nil) + require.NoError(t, err) + _, err = client.ApplyResource(ctx, makeManifest("v1", "ConfigMap", "default", "cm-2"), nil, nil) + require.NoError(t, err) + + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + disc := &testDiscovery{ + namespace: "default", + name: "cm-1", + singleResource: true, + } + + list, err := client.DiscoverResources(ctx, gvk, disc, nil) + require.NoError(t, err) + require.Len(t, list.Items, 1) + assert.Equal(t, "cm-1", list.Items[0].GetName()) +} + +func TestDiscoverResources_EmptyStore(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + disc := &testDiscovery{} + + list, err := client.DiscoverResources(ctx, gvk, disc, nil) + require.NoError(t, err) + assert.Empty(t, list.Items) +} + +func TestConcurrentApplyAndGet(t *testing.T) { + ctx := context.Background() + client := NewDryrunTransportClient() + gvk := 
schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"} + + const n = 50 + var wg sync.WaitGroup + + // Concurrently apply n resources. + for i := 0; i < n; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + name := fmt.Sprintf("cm-%d", idx) + m := makeManifest("v1", "ConfigMap", "default", name) + _, err := client.ApplyResource(ctx, m, nil, nil) + assert.NoError(t, err) + }(i) + } + wg.Wait() + + // Concurrently get those resources. + for i := 0; i < n; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + name := fmt.Sprintf("cm-%d", idx) + obj, err := client.GetResource(ctx, gvk, "default", name, nil) + assert.NoError(t, err) + assert.Equal(t, name, obj.GetName()) + }(i) + } + wg.Wait() + + // Verify all records exist (n applies + n gets). + assert.Len(t, client.Records, 2*n) + + // Count apply records specifically. + applyCount := 0 + for _, r := range client.Records { + if r.Operation == "apply" { + applyCount++ + } + } + assert.Equal(t, n, applyCount) + + // Verify no errors in any apply record. + for _, r := range client.Records { + if r.Operation == "apply" { + assert.Nil(t, r.Error) + } + } + + // Also verify discover works after concurrent writes. + disc := &testDiscovery{} + list, err := client.DiscoverResources(ctx, gvk, disc, nil) + require.NoError(t, err) + assert.Len(t, list.Items, n) + + // Check no "not found" errors leaked into the results. 
+ for _, r := range client.Records { + if r.Operation == "get" { + assert.False(t, strings.Contains(fmt.Sprintf("%v", r.Error), "not found"), + "unexpected not-found error for resource %s", r.Name) + } + } +} diff --git a/internal/dryrun/trace.go b/internal/dryrun/trace.go new file mode 100644 index 0000000..e496d79 --- /dev/null +++ b/internal/dryrun/trace.go @@ -0,0 +1,394 @@ +package dryrun + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + + "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/executor" +) + +const ( + statusSuccess = "SUCCESS" + statusFailed = "FAILED" +) + +// ExecutionTrace contains all data needed to produce the trace output. +type ExecutionTrace struct { + EventID string + EventType string + Result *executor.ExecutionResult + APIClient *DryrunAPIClient + Transport *DryrunTransportClient + Verbose bool +} + +// TraceJSON is the JSON-serializable representation of the execution trace. +type TraceJSON struct { + Event TraceEvent `json:"event"` + Status string `json:"status"` + Params map[string]interface{} `json:"params,omitempty"` + Preconditions []TracePrecondition `json:"preconditions,omitempty"` + Resources []TraceResource `json:"resources,omitempty"` + DiscoveredResources map[string]interface{} `json:"discoveredResources,omitempty"` + PostActions []TracePostAction `json:"postActions,omitempty"` + Errors map[string]string `json:"errors,omitempty"` + APIRequests []TraceAPIRequest `json:"apiRequests,omitempty"` + TransportOps []TraceTransportOp `json:"transportOperations,omitempty"` +} + +// TraceEvent is the JSON representation of the event. +type TraceEvent struct { + ID string `json:"id"` + Type string `json:"type"` +} + +// TracePrecondition is the JSON representation of a precondition result. 
+type TracePrecondition struct { + Name string `json:"name"` + Status string `json:"status"` + Matched bool `json:"matched"` + Error string `json:"error,omitempty"` +} + +// TraceResource is the JSON representation of a resource result. +type TraceResource struct { + Name string `json:"name"` + Kind string `json:"kind"` + Namespace string `json:"namespace,omitempty"` + ResName string `json:"resourceName,omitempty"` + Status string `json:"status"` + Operation string `json:"operation"` + Reason string `json:"reason,omitempty"` + Error string `json:"error,omitempty"` +} + +// TracePostAction is the JSON representation of a post-action result. +type TracePostAction struct { + Name string `json:"name"` + Status string `json:"status"` + Skipped bool `json:"skipped,omitempty"` + Error string `json:"error,omitempty"` +} + +// TraceAPIRequest is the JSON representation of a recorded API request. +type TraceAPIRequest struct { + Method string `json:"method"` + URL string `json:"url"` + StatusCode int `json:"statusCode"` + Request string `json:"requestBody,omitempty"` + Response string `json:"responseBody,omitempty"` +} + +// TraceTransportOp is the JSON representation of a recorded transport operation. +type TraceTransportOp struct { + Operation string `json:"operation"` + Kind string `json:"kind"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name"` + Result string `json:"result,omitempty"` +} + +// FormatText formats the execution trace as human-readable text. +func (t *ExecutionTrace) FormatText() string { + var b strings.Builder + result := t.Result + + b.WriteString("Dry-Run Execution Trace\n") + b.WriteString("========================\n") + fmt.Fprintf(&b, "Event: id=%s type=%s\n\n", t.EventID, t.EventType) + + // Phase 1: Parameter Extraction + paramStatus := statusSuccess + if _, ok := result.Errors[executor.PhaseParamExtraction]; ok { + paramStatus = statusFailed + } + fmt.Fprintf(&b, "Phase 1: Parameter Extraction .............. 
%s\n", paramStatus) + if paramStatus == statusSuccess { + for name, val := range result.Params { + fmt.Fprintf(&b, " %-16s = %v\n", name, formatValue(val)) + } + } else { + fmt.Fprintf(&b, " Error: %v\n", result.Errors[executor.PhaseParamExtraction]) + } + b.WriteString("\n") + + // Phase 2: Preconditions + precondStatus := statusSuccess + precondDetail := "" + if _, ok := result.Errors[executor.PhasePreconditions]; ok { + precondStatus = statusFailed + } else if result.ResourcesSkipped && result.SkipReason != "" { + precondDetail = " (NOT MET)" + } else if len(result.PreconditionResults) > 0 { + precondDetail = " (MET)" + } + fmt.Fprintf(&b, "Phase 2: Preconditions ..................... %s%s\n", precondStatus, precondDetail) + + apiReqIdx := 0 + for i, pr := range result.PreconditionResults { + status := "PASS" + if pr.Status == executor.StatusFailed { + status = "FAIL" + } else if !pr.Matched { + status = "NOT MET" + } + fmt.Fprintf(&b, " [%d/%d] %-30s %s\n", i+1, len(result.PreconditionResults), pr.Name, status) + + if pr.APICallMade && apiReqIdx < len(t.APIClient.Requests) { + req := t.APIClient.Requests[apiReqIdx] + fmt.Fprintf(&b, " API Call: %s %s -> %d\n", req.Method, req.URL, req.StatusCode) + if t.Verbose { + if len(req.Body) > 0 { + fmt.Fprintf(&b, " [verbose] Request body:\n %s\n", prettyJSON(req.Body)) + } + if len(req.Response) > 0 { + fmt.Fprintf(&b, " [verbose] Response body:\n %s\n", prettyJSON(req.Response)) + } + } + apiReqIdx++ + } + + if len(pr.CapturedFields) > 0 { + for name, val := range pr.CapturedFields { + fmt.Fprintf(&b, " Captured: %s = %v\n", name, formatValue(val)) + } + } + + if pr.Error != nil { + fmt.Fprintf(&b, " Error: %v\n", pr.Error) + } + } + b.WriteString("\n") + + // Phase 3: Resources + resStatus := statusSuccess + if _, ok := result.Errors[executor.PhaseResources]; ok { + resStatus = statusFailed + } else if result.ResourcesSkipped { + resStatus = "SKIPPED" + } + fmt.Fprintf(&b, "Phase 3: Resources ........................ 
%s\n", resStatus) + + if result.ResourcesSkipped { + fmt.Fprintf(&b, " Reason: %s\n", result.SkipReason) + } else { + for i, rr := range result.ResourceResults { + opStr := strings.ToUpper(string(rr.Operation)) + if opStr == "" { + opStr = "UNKNOWN" + } + status := opStr + if rr.Status == executor.StatusFailed { + status = statusFailed + } + fmt.Fprintf(&b, " [%d/%d] %-30s %s\n", i+1, len(result.ResourceResults), rr.Name, status) + fmt.Fprintf(&b, " Kind: %-12s Namespace: %-12s Name: %s\n", rr.Kind, rr.Namespace, rr.ResourceName) + + if t.Verbose { + for _, tr := range t.Transport.Records { + if tr.Operation == "apply" && tr.Name == rr.ResourceName && tr.Namespace == rr.Namespace { + fmt.Fprintf(&b, " [verbose] Rendered manifest:\n %s\n", prettyJSON(tr.Manifest)) + break + } + } + } + + if rr.Error != nil { + fmt.Fprintf(&b, " Error: %v\n", rr.Error) + } + } + } + + // Discovery results (resources available for payload CEL: resources.) + if result.ExecutionContext != nil && result.ExecutionContext.Resources != nil && len(result.ExecutionContext.Resources) > 0 { + b.WriteString("\nPhase 3.5: Discovery Results ................. (available as resources.* in payload)\n") + celVars := result.ExecutionContext.GetCELVariables() + if r, ok := celVars["resources"].(map[string]interface{}); ok { + for name, val := range r { + fmt.Fprintf(&b, " %s:\n", name) + b.WriteString(" ") + b.WriteString(formatValue(val)) + b.WriteString("\n") + } + } + } + b.WriteString("\n") + + // Phase 4: Post Actions + postStatus := statusSuccess + if _, ok := result.Errors[executor.PhasePostActions]; ok { + postStatus = statusFailed + } + fmt.Fprintf(&b, "Phase 4: Post Actions ..................... 
%s\n", postStatus) + + for i, pa := range result.PostActionResults { + status := "EXECUTED" + if pa.Skipped { + status = "SKIPPED" + } else if pa.Status == executor.StatusFailed { + status = statusFailed + } + fmt.Fprintf(&b, " [%d/%d] %-30s %s\n", i+1, len(result.PostActionResults), pa.Name, status) + + if pa.Skipped { + fmt.Fprintf(&b, " Reason: %s\n", pa.SkipReason) + } + + if pa.APICallMade && apiReqIdx < len(t.APIClient.Requests) { + req := t.APIClient.Requests[apiReqIdx] + fmt.Fprintf(&b, " API Call: %s %s -> %d\n", req.Method, req.URL, req.StatusCode) + if t.Verbose { + if len(req.Body) > 0 { + fmt.Fprintf(&b, " [verbose] Request body:\n %s\n", prettyJSON(req.Body)) + } + if len(req.Response) > 0 { + fmt.Fprintf(&b, " [verbose] Response body:\n %s\n", prettyJSON(req.Response)) + } + } + apiReqIdx++ + } + + if pa.Error != nil { + fmt.Fprintf(&b, " Error: %v\n", pa.Error) + } + } + b.WriteString("\n") + + // Final result + resultStr := statusSuccess + if result.Status == executor.StatusFailed { + resultStr = statusFailed + } + fmt.Fprintf(&b, "Result: %s\n", resultStr) + + return b.String() +} + +// FormatJSON formats the execution trace as JSON. 
+func (t *ExecutionTrace) FormatJSON() ([]byte, error) { + result := t.Result + + trace := TraceJSON{ + Event: TraceEvent{ID: t.EventID, Type: t.EventType}, + Status: string(result.Status), + Params: result.Params, + } + + // Discovered resources (from discovery phase, used in payload CEL) + if result.ExecutionContext != nil { + celVars := result.ExecutionContext.GetCELVariables() + if r, ok := celVars["resources"].(map[string]interface{}); ok { + trace.DiscoveredResources = r + } + } + + // Preconditions + for _, pr := range result.PreconditionResults { + tp := TracePrecondition{ + Name: pr.Name, + Status: string(pr.Status), + Matched: pr.Matched, + } + if pr.Error != nil { + tp.Error = pr.Error.Error() + } + trace.Preconditions = append(trace.Preconditions, tp) + } + + // Resources + for _, rr := range result.ResourceResults { + tr := TraceResource{ + Name: rr.Name, + Kind: rr.Kind, + Namespace: rr.Namespace, + ResName: rr.ResourceName, + Status: string(rr.Status), + Operation: string(rr.Operation), + Reason: rr.OperationReason, + } + if rr.Error != nil { + tr.Error = rr.Error.Error() + } + trace.Resources = append(trace.Resources, tr) + } + + // Post Actions + for _, pa := range result.PostActionResults { + tp := TracePostAction{ + Name: pa.Name, + Status: string(pa.Status), + Skipped: pa.Skipped, + } + if pa.Error != nil { + tp.Error = pa.Error.Error() + } + trace.PostActions = append(trace.PostActions, tp) + } + + // Errors + if len(result.Errors) > 0 { + trace.Errors = make(map[string]string) + for phase, err := range result.Errors { + trace.Errors[string(phase)] = err.Error() + } + } + + // API Requests + for _, req := range t.APIClient.Requests { + tr := TraceAPIRequest{ + Method: req.Method, + URL: req.URL, + StatusCode: req.StatusCode, + } + if t.Verbose { + if len(req.Body) > 0 { + tr.Request = string(req.Body) + } + if len(req.Response) > 0 { + tr.Response = string(req.Response) + } + } + trace.APIRequests = append(trace.APIRequests, tr) + } + + // 
Transport Operations + for _, rec := range t.Transport.Records { + op := TraceTransportOp{ + Operation: rec.Operation, + Kind: rec.GVK.Kind, + Namespace: rec.Namespace, + Name: rec.Name, + } + if rec.Result != nil { + op.Result = string(rec.Result.Operation) + } + trace.TransportOps = append(trace.TransportOps, op) + } + + return json.MarshalIndent(trace, "", " ") +} + +// prettyJSON attempts to indent raw JSON bytes for readable output. +// If the input is not valid JSON, it is returned as-is. +func prettyJSON(raw []byte) string { + var buf bytes.Buffer + if err := json.Indent(&buf, raw, " ", " "); err != nil { + return string(raw) + } + return buf.String() +} + +func formatValue(v interface{}) string { + switch val := v.(type) { + case string: + return fmt.Sprintf("%q", val) + default: + b, err := json.Marshal(val) + if err != nil { + return fmt.Sprintf("%v", val) + } + return string(b) + } +} diff --git a/internal/dryrun/trace_test.go b/internal/dryrun/trace_test.go new file mode 100644 index 0000000..819b611 --- /dev/null +++ b/internal/dryrun/trace_test.go @@ -0,0 +1,276 @@ +package dryrun + +import ( + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/executor" + "github.com/openshift-hyperfleet/hyperfleet-adapter/internal/manifest" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func makeTestTrace(status executor.ExecutionStatus, verbose bool) *ExecutionTrace { + apiClient, _ := NewDryrunAPIClient(nil) + transport := NewDryrunTransportClient() + return &ExecutionTrace{ + EventID: "test-event-id", + EventType: "test.event.type", + Result: &executor.ExecutionResult{ + Status: status, + Params: map[string]interface{}{"key": "value"}, + Errors: make(map[executor.ExecutionPhase]error), + }, + APIClient: apiClient, + Transport: transport, + Verbose: verbose, + } +} + +func TestFormatText_Success(t *testing.T) { + t.Run("successful execution trace contains all phases 
and SUCCESS result", func(t *testing.T) { + trace := makeTestTrace(executor.StatusSuccess, false) + trace.Result.PreconditionResults = []executor.PreconditionResult{ + { + Name: "check-exists", + Status: executor.StatusSuccess, + Matched: true, + }, + } + trace.Result.ResourceResults = []executor.ResourceResult{ + { + Name: "my-resource", + Kind: "ConfigMap", + Namespace: "default", + ResourceName: "my-configmap", + Status: executor.StatusSuccess, + Operation: manifest.OperationCreate, + }, + } + + output := trace.FormatText() + + assert.Contains(t, output, "Dry-Run Execution Trace") + assert.Contains(t, output, "Phase 1: Parameter Extraction") + assert.Contains(t, output, "Phase 2: Preconditions") + assert.Contains(t, output, "Phase 3: Resources") + assert.Contains(t, output, "Phase 4: Post Actions") + assert.Contains(t, output, "Result: SUCCESS") + }) +} + +func TestFormatText_Failed(t *testing.T) { + t.Run("failed execution trace shows FAILED result and error in resource result", func(t *testing.T) { + trace := makeTestTrace(executor.StatusFailed, false) + trace.Result.Errors[executor.PhaseResources] = fmt.Errorf("resource apply failed: connection refused") + trace.Result.ResourceResults = []executor.ResourceResult{ + { + Name: "failing-resource", + Kind: "Deployment", + Namespace: "default", + ResourceName: "my-deploy", + Status: executor.StatusFailed, + Operation: manifest.OperationCreate, + Error: fmt.Errorf("resource apply failed: connection refused"), + }, + } + + output := trace.FormatText() + + assert.Contains(t, output, "Result: FAILED") + assert.Contains(t, output, "resource apply failed: connection refused") + }) +} + +func TestFormatText_ResourcesSkipped(t *testing.T) { + t.Run("skipped resources phase shows SKIPPED", func(t *testing.T) { + trace := makeTestTrace(executor.StatusSuccess, false) + trace.Result.ResourcesSkipped = true + trace.Result.SkipReason = "preconditions not met" + + output := trace.FormatText() + + assert.Contains(t, output, 
"Phase 3: Resources") + assert.Contains(t, output, "SKIPPED") + assert.Contains(t, output, "preconditions not met") + }) +} + +func TestFormatText_VerboseShowsBodies(t *testing.T) { + t.Run("verbose mode includes request and response bodies", func(t *testing.T) { + trace := makeTestTrace(executor.StatusSuccess, true) + + trace.APIClient.Requests = append(trace.APIClient.Requests, RequestRecord{ + Method: "GET", + URL: "http://mock-api/test", + StatusCode: 200, + Body: []byte(`{"request":"data"}`), + Response: []byte(`{"response":"data"}`), + }) + trace.Result.PreconditionResults = []executor.PreconditionResult{ + { + Name: "api-check", + Status: executor.StatusSuccess, + Matched: true, + APICallMade: true, + }, + } + + output := trace.FormatText() + + assert.Contains(t, output, "[verbose]") + }) +} + +func TestFormatText_NonVerboseOmitsBodies(t *testing.T) { + t.Run("non-verbose mode omits request and response bodies", func(t *testing.T) { + trace := makeTestTrace(executor.StatusSuccess, false) + + trace.APIClient.Requests = append(trace.APIClient.Requests, RequestRecord{ + Method: "GET", + URL: "http://mock-api/test", + StatusCode: 200, + Body: []byte(`{"request":"data"}`), + Response: []byte(`{"response":"data"}`), + }) + trace.Result.PreconditionResults = []executor.PreconditionResult{ + { + Name: "api-check", + Status: executor.StatusSuccess, + Matched: true, + APICallMade: true, + }, + } + + output := trace.FormatText() + + assert.NotContains(t, output, "[verbose]") + }) +} + +func TestFormatJSON_Structure(t *testing.T) { + t.Run("JSON output has correct event and status fields", func(t *testing.T) { + trace := makeTestTrace(executor.StatusSuccess, false) + trace.Result.PreconditionResults = []executor.PreconditionResult{ + { + Name: "check-exists", + Status: executor.StatusSuccess, + Matched: true, + }, + } + + data, err := trace.FormatJSON() + require.NoError(t, err) + + var result TraceJSON + err = json.Unmarshal(data, &result) + require.NoError(t, err) + + 
assert.Equal(t, "test-event-id", result.Event.ID) + assert.Equal(t, "test.event.type", result.Event.Type) + assert.Equal(t, string(executor.StatusSuccess), result.Status) + assert.Len(t, result.Preconditions, 1) + assert.Equal(t, "check-exists", result.Preconditions[0].Name) + }) +} + +func TestFormatJSON_VerboseIncludesBodies(t *testing.T) { + t.Run("verbose JSON includes request and response bodies", func(t *testing.T) { + trace := makeTestTrace(executor.StatusSuccess, true) + + trace.APIClient.Requests = append(trace.APIClient.Requests, RequestRecord{ + Method: "POST", + URL: "http://mock-api/resource", + StatusCode: 201, + Body: []byte(`{"name":"test"}`), + Response: []byte(`{"id":"123"}`), + }) + + data, err := trace.FormatJSON() + require.NoError(t, err) + + var result TraceJSON + err = json.Unmarshal(data, &result) + require.NoError(t, err) + + require.Len(t, result.APIRequests, 1) + assert.NotEmpty(t, result.APIRequests[0].Request) + assert.NotEmpty(t, result.APIRequests[0].Response) + }) +} + +func TestFormatJSON_NonVerboseOmitsBodies(t *testing.T) { + t.Run("non-verbose JSON omits request and response bodies", func(t *testing.T) { + trace := makeTestTrace(executor.StatusSuccess, false) + + trace.APIClient.Requests = append(trace.APIClient.Requests, RequestRecord{ + Method: "POST", + URL: "http://mock-api/resource", + StatusCode: 201, + Body: []byte(`{"name":"test"}`), + Response: []byte(`{"id":"123"}`), + }) + + data, err := trace.FormatJSON() + require.NoError(t, err) + + var result TraceJSON + err = json.Unmarshal(data, &result) + require.NoError(t, err) + + require.Len(t, result.APIRequests, 1) + assert.Empty(t, result.APIRequests[0].Request) + assert.Empty(t, result.APIRequests[0].Response) + }) +} + +func TestPrettyJSON(t *testing.T) { + t.Run("valid JSON is indented", func(t *testing.T) { + input := []byte(`{"key":"value","nested":{"a":1}}`) + result := prettyJSON(input) + + assert.Contains(t, result, "\n") + assert.Contains(t, result, " ") + + // 
Verify it is valid JSON
+		var parsed interface{}
+		err := json.Unmarshal([]byte(result), &parsed)
+		assert.NoError(t, err)
+	})
+
+	t.Run("invalid JSON is returned as-is", func(t *testing.T) {
+		input := []byte(`not valid json {{{`)
+		result := prettyJSON(input)
+
+		assert.Equal(t, string(input), result)
+	})
+}
+
+func TestFormatValue(t *testing.T) {
+	t.Run("string value is quoted", func(t *testing.T) {
+		result := formatValue("hello")
+
+		assert.True(t, strings.HasPrefix(result, `"`))
+		assert.True(t, strings.HasSuffix(result, `"`))
+		assert.Contains(t, result, "hello")
+	})
+
+	t.Run("non-string value uses JSON representation", func(t *testing.T) {
+		input := map[string]interface{}{"a": 1, "b": "two"}
+		result := formatValue(input)
+
+		// Should be valid JSON
+		var parsed map[string]interface{}
+		err := json.Unmarshal([]byte(result), &parsed)
+		assert.NoError(t, err)
+		assert.Equal(t, float64(1), parsed["a"])
+		assert.Equal(t, "two", parsed["b"])
+	})
+
+	t.Run("integer value uses JSON representation", func(t *testing.T) {
+		result := formatValue(42)
+		assert.Equal(t, "42", result)
+	})
+}
diff --git a/internal/executor/resource_executor.go b/internal/executor/resource_executor.go
index 8413b72..dd7d8a9 100644
--- a/internal/executor/resource_executor.go
+++ b/internal/executor/resource_executor.go
@@ -76,13 +76,21 @@ func (re *ResourceExecutor) executeResource(ctx context.Context, resource config
 		return result, NewExecutorError(PhaseResources, resource.Name, "failed to render manifest", err)
 	}
 
-	// Step 2: Prepare apply options
+	// Step 2: Extract resource identity from rendered manifest for result reporting
+	var obj unstructured.Unstructured
+	if err := json.Unmarshal(renderedBytes, &obj.Object); err == nil {
+		result.Kind = obj.GetKind()
+		result.Namespace = obj.GetNamespace()
+		result.ResourceName = obj.GetName()
+	}
+
+	// Step 3: Prepare apply options
 	var applyOpts *transport_client.ApplyOptions
 	if resource.RecreateOnChange {
 		applyOpts = &transport_client.ApplyOptions{RecreateOnChange: true}
 	}
 
-	// Step 3: Build transport context (nil for k8s, *maestro_client.TransportContext for maestro)
+	// Step 4: Build transport context (nil for k8s, *maestro_client.TransportContext for maestro)
 	var transportTarget transport_client.TransportContext
 	if resource.IsMaestroTransport() && resource.Transport.Maestro != nil {
 		targetCluster, tplErr := renderTemplate(resource.Transport.Maestro.TargetCluster, execCtx.Params)
@@ -96,7 +104,7 @@ func (re *ResourceExecutor) executeResource(ctx context.Context, resource config
 		}
 	}
 
-	// Step 4: Call transport client ApplyResource with rendered bytes
+	// Step 5: Call transport client ApplyResource with rendered bytes
 	applyResult, err := transportClient.ApplyResource(ctx, renderedBytes, applyOpts, transportTarget)
 	if err != nil {
 		result.Status = StatusFailed
@@ -112,7 +120,7 @@ func (re *ResourceExecutor) executeResource(ctx context.Context, resource config
 		return result, NewExecutorError(PhaseResources, resource.Name, "failed to apply resource", err)
 	}
 
-	// Step 5: Extract result
+	// Step 6: Extract result
 	result.Operation = applyResult.Operation
 	result.OperationReason = applyResult.Reason
 
@@ -120,13 +128,13 @@ func (re *ResourceExecutor) executeResource(ctx context.Context, resource config
 	re.log.Infof(successCtx, "Resource[%s] processed: operation=%s reason=%s",
 		resource.Name, result.Operation, result.OperationReason)
 
-	// Step 6: Post-apply discovery — find the applied resource and store in execCtx for CEL evaluation
+	// Step 7: Post-apply discovery — find the applied resource and store in execCtx for CEL evaluation
 	if resource.Discovery != nil {
 		discovered, discoverErr := re.discoverResource(ctx, resource, execCtx, transportTarget)
 		if discoverErr != nil {
 			re.log.Warnf(ctx, "Resource[%s] discovery after apply failed: %v", resource.Name, discoverErr)
 		} else if discovered != nil {
-			// Step 7: Nested discoveries — find sub-resources within the discovered parent (e.g., ManifestWork)
+			// Step 8: Nested discoveries — find sub-resources within the discovered parent (e.g., ManifestWork)
 			if len(resource.NestedDiscoveries) > 0 {
 				nestedResults := re.discoverNestedResources(ctx, resource, execCtx, discovered)
 				execCtx.Resources[resource.Name] = nestedResults
diff --git a/test/testdata/dryrun/adapter-config.yaml b/test/testdata/dryrun/adapter-config.yaml
new file mode 100644
index 0000000..5a98bf9
--- /dev/null
+++ b/test/testdata/dryrun/adapter-config.yaml
@@ -0,0 +1,24 @@
+# Adapter deployment configuration for dry-run testing
+apiVersion: hyperfleet.redhat.com/v1alpha1
+kind: AdapterConfig
+metadata:
+  name: dryrun-adapter
+  labels:
+    hyperfleet.io/component: adapter
+
+spec:
+  adapter:
+    version: "0.1.0"
+
+  clients:
+    hyperfleetApi:
+      timeout: 10s
+      retryAttempts: 3
+      retryBackoff: exponential
+
+  broker:
+    subscriptionId: "dryrun-sub"
+    topic: "cluster-events"
+
+  kubernetes:
+    apiVersion: "v1"
diff --git a/test/testdata/dryrun/dryrun-api-responses.json b/test/testdata/dryrun/dryrun-api-responses.json
new file mode 100644
index 0000000..8a39bb0
--- /dev/null
+++ b/test/testdata/dryrun/dryrun-api-responses.json
@@ -0,0 +1,55 @@
+{
+  "responses": [
+    {
+      "match": {
+        "method": "GET",
+        "urlPattern": "/api/hyperfleet/v1/clusters/.*"
+      },
+      "responses": [
+        {
+          "statusCode": 200,
+          "headers": {
+            "Content-Type": "application/json"
+          },
+          "body": {
+            "id": "abc-123",
+            "name": "abc123",
+            "kind": "Cluster",
+            "href": "/api/hyperfleet/v1/clusters/abc123",
+            "generation": "77",
+            "nodes": {
+              "compute": 3
+            },
+            "spec": {
+              "region": "us-east-1",
+              "provider": "aws"
+            },
+            "status": {
+              "conditions": [
+                {
+                  "type": "Ready",
+                  "status": "False"
+                }
+              ]
+            }
+          }
+        }
+      ]
+    },
+    {
+      "match": {
+        "method": "PATCH",
+        "urlPattern": "/api/hyperfleet/v1/clusters/.*/status"
+      },
+      "responses": [
+        {
+          "statusCode": 200,
+          "headers": {
+            "Content-Type": "application/json"
+          },
+          "body": {}
+        }
+      ]
+    }
+  ]
+}
diff --git a/test/testdata/dryrun/dryrun-discovery.json b/test/testdata/dryrun/dryrun-discovery.json
new file mode 100644
index 0000000..5be66f8
--- /dev/null
+++ b/test/testdata/dryrun/dryrun-discovery.json
@@ -0,0 +1,46 @@
+{
+  "manifestwork-symbol000": {
+    "apiVersion": "work.open-cluster-management.io/v1",
+    "kind": "ManifestWork",
+    "metadata": {
+      "name": "manifestwork000-this-name-not-used",
+      "namespace": "cluster1",
+      "labels": {
+        "hyperfleet.io/resource-type": "manifestwork"
+      }
+    },
+    "spec": {
+      "workload": {
+        "manifests": [
+          {
+            "apiVersion": "v1",
+            "kind": "Namespace",
+            "metadata": {
+              "name": "abc123",
+              "labels": {
+                "hyperfleet.io/resource-type": "namespace",
+                "hyperfleet.io/cluster-id": "abc123",
+                "hyperfleet.io/label-for-discovery": "namespace-symbol111"
+              }
+            }
+          },
+          {
+            "apiVersion": "v1",
+            "kind": "ConfigMap",
+            "metadata": {
+              "name": "abc123-symbol222",
+              "namespace": "abc123",
+              "labels": {
+                "hyperfleet.io/cluster-id": "abc123"
+              }
+            },
+            "data": {
+              "cluster_id": "abc123",
+              "cluster_name": "test-cluster"
+            }
+          }
+        ]
+      }
+    }
+  }
+}
diff --git a/test/testdata/dryrun/event.json b/test/testdata/dryrun/event.json
new file mode 100644
index 0000000..f548548
--- /dev/null
+++ b/test/testdata/dryrun/event.json
@@ -0,0 +1,14 @@
+{
+  "specversion": "1.0",
+  "id": "abc123",
+  "type": "io.hyperfleet.cluster.updated",
+  "source": "/api/clusters_mgmt/v1/clusters/abc123",
+  "time": "2025-01-15T10:30:00Z",
+  "datacontenttype": "application/json",
+  "data": {
+    "id": "abc123",
+    "kind": "Cluster",
+    "href": "/api/clusters_mgmt/v1/clusters/abc123",
+    "generation": 5
+  }
+}
diff --git a/test/testdata/dryrun/task-config-invalid.yaml b/test/testdata/dryrun/task-config-invalid.yaml
new file mode 100644
index 0000000..292c858
--- /dev/null
+++ b/test/testdata/dryrun/task-config-invalid.yaml
@@ -0,0 +1,44 @@
+# Invalid task configuration for testing validate command error reporting
+apiVersion: hyperfleet.redhat.com/v1alpha1
+kind: AdapterTaskConfig
+metadata:
+  name: invalid-task
+
+spec:
+  params:
+    - name: "clusterId"
+      source: "event.id"
+      type: "string"
+      required: true
+
+  preconditions:
+    - name: "bad-cel"
+      expression: |
+        this is not valid CEL !!!
+
+  resources:
+    - name: "missingFields"
+      manifest:
+        apiVersion: v1
+        kind: ConfigMap
+        # missing metadata.name — validation should catch this
+        metadata:
+          labels:
+            app: test
+          annotations:
+            hyperfleet.io/generation: "1"
+      discovery:
+        namespace: "default"
+        byName: "test"
+
+    - name: "undefinedVar"
+      manifest:
+        apiVersion: v1
+        kind: ConfigMap
+        metadata:
+          name: "{{ .nonExistentVariable }}"
+          annotations:
+            hyperfleet.io/generation: "1"
+      discovery:
+        namespace: "default"
+        byName: "test"
diff --git a/test/testdata/dryrun/task-config.yaml b/test/testdata/dryrun/task-config.yaml
new file mode 100644
index 0000000..994e10b
--- /dev/null
+++ b/test/testdata/dryrun/task-config.yaml
@@ -0,0 +1,251 @@
+# Task configuration for dry-run testing
+apiVersion: hyperfleet.redhat.com/v1alpha1
+kind: AdapterTaskConfig
+metadata:
+  name: dryrun-task
+  labels:
+    hyperfleet.io/adapter-type: dryrun
+
+spec:
+  params:
+    - name: "clusterId"
+      source: "event.id"
+      type: "string"
+      required: true
+
+    - name: "clusterKind"
+      source: "event.kind"
+      type: "string"
+      default: "Cluster"
+
+    - name: "generationValue"
+      source: "event.generation"
+      type: "string"
+      required: true
+
+    - name: "region"
+      source: "env.REGION"
+      type: "string"
+      default: "us-east-1"
+
+    - name: "adapterName"
+      source: "env.ADAPTER_NAME"
+      type: "string"
+      default: "dry-run-adapter"
+
+  preconditions:
+    - name: "fetch-cluster"
+      apiCall:
+        method: "GET"
+        url: "/api/hyperfleet/v1/clusters/{{ .clusterId }}"
+        timeout: 10s
+      capture:
+        - name: "clusterName"
+          field: "name"
+        - name: "clusterStatus"
+          expression: |
+            status.conditions.filter(c, c.type == "Ready").size() > 0
+              ? status.conditions.filter(c, c.type == "Ready")[0].status
+              : "False"
+        - name: "computeNodes"
+          field: "nodes.compute"
+        # TODO: research why we cannot use {{ now }} as an expression
+        - name: "timestamp"
+          expression: "\"2006-01-02T15:04:05Z07:00\""
+
+      conditions:
+        - field: "clusterStatus"
+          operator: "notEquals"
+          value: "True"
+
+  resources:
+    - name: "resource0"
+      transport:
+        client: maestro
+        maestro:
+          targetCluster: cluster1
+      manifest:
+        apiVersion: work.open-cluster-management.io/v1
+        kind: ManifestWork
+        metadata:
+          # ManifestWork name - must be unique within consumer namespace
+          #name: "manifestwork-{{ .clusterId }}"
+          name: "manifestwork-symbol000"
+
+          # Labels for identification, filtering, and management
+          labels:
+            # HyperFleet tracking labels
+            hyperfleet.io/cluster-id: "{{ .clusterId }}"
+            hyperfleet.io/adapter: "{{ .adapterName }}"
+            hyperfleet.io/component: "infrastructure"
+            hyperfleet.io/generation: "{{ .generationValue }}"
+            hyperfleet.io/resource-group: "cluster-setup"
+
+            # Maestro-specific labels
+            maestro.io/source-id: "{{ .adapterName }}"
+            maestro.io/resource-type: "manifestwork"
+            maestro.io/priority: "normal"
+
+            # Standard Kubernetes application labels
+            app.kubernetes.io/name: "aro-hcp-cluster"
+            app.kubernetes.io/instance: "{{ .clusterId }}"
+            app.kubernetes.io/version: "v1.0.0"
+            app.kubernetes.io/component: "infrastructure"
+            app.kubernetes.io/part-of: "hyperfleet"
+            app.kubernetes.io/managed-by: "hyperfleet-adapter"
+            app.kubernetes.io/created-by: "{{ .adapterName }}"
+
+          # Annotations for metadata and operational information
+          annotations:
+            # Tracking and lifecycle
+            hyperfleet.io/created-by: "hyperfleet-adapter-framework"
+            hyperfleet.io/managed-by: "{{ .adapterName }}"
+            hyperfleet.io/generation: "{{ .generationValue }}"
+            hyperfleet.io/cluster-name: "{{ .clusterId }}"
+            hyperfleet.io/deployment-time: "{{ .timestamp }}"
+
+            # Maestro-specific annotations
+            maestro.io/applied-time: "{{ .timestamp }}"
+            maestro.io/source-adapter: "{{ .adapterName }}"
+
+            # Operational annotations
+            deployment.hyperfleet.io/strategy: "rolling"
+            deployment.hyperfleet.io/timeout: "300s"
+            monitoring.hyperfleet.io/enabled: "true"
+
+            # Documentation
+            description: "Complete cluster setup including namespace, configuration, and RBAC"
+            documentation: "https://docs.hyperfleet.io/adapters/aro-hcp"
+
+        # ManifestWork specification
+        spec:
+          # ============================================================================
+          # Workload - Contains the Kubernetes manifests to deploy
+          # ============================================================================
+          workload:
+            # Kubernetes manifests array - injected by framework from business logic config
+            manifests:
+              - apiVersion: v1
+                kind: Namespace
+                metadata:
+                  name: "{{ .clusterId | lower }}"
+                  labels:
+                    hyperfleet.io/cluster-id: "{{ .clusterId }}"
+                    hyperfleet.io/managed-by: "{{ .metadata.name }}"
+                    hyperfleet.io/resource-type: "namespace"
+                    hyperfleet.io/label-for-discovery: "namespace-symbol111"
+                  annotations:
+                    hyperfleet.io/created-by: "hyperfleet-adapter"
+                    hyperfleet.io/generation: "{{ .generationValue }}"
+              - apiVersion: v1
+                kind: ConfigMap
+                metadata:
+                  name: "cluster-config"
+                  namespace: "{{ .clusterId }}-symbol222"
+                  labels:
+                    hyperfleet.io/cluster-id: "{{ .clusterId }}"
+                  annotations:
+                    hyperfleet.io/generation: "{{ .generationValue }}"
+                data:
+                  cluster_id: "{{ .clusterId }}"
+                  cluster_name: "{{ .clusterName }}"
+
+          # ============================================================================
+          # Delete Options - How resources should be removed
+          # ============================================================================
+          deleteOption:
+            # Propagation policy for resource deletion
+            # - "Foreground": Wait for dependent resources to be deleted first
+            # - "Background": Delete immediately, let cluster handle dependents
+            # - "Orphan": Leave resources on cluster when ManifestWork is deleted
+            propagationPolicy: "Foreground"
+
+            # Grace period for graceful deletion (seconds)
+            gracePeriodSeconds: 30
+
+          # ============================================================================
+          # Manifest Configurations - Per-resource settings for update and feedback
+          # ============================================================================
+          manifestConfigs:
+            # ========================================================================
+            # Configuration for Namespace resources
+            # ========================================================================
+            - resourceIdentifier:
+                group: ""                          # Core API group (empty for v1 resources)
+                resource: "namespaces"             # Resource type
+                name: "{{ .clusterId | lower }}"   # Specific resource name
+              updateStrategy:
+                type: "ServerSideApply"            # Use server-side apply for namespaces
+                serverSideApply:
+                  fieldManager: "hyperfleet-adapter"  # Field manager name for conflict resolution
+                  force: false                     # Don't force conflicts (fail on conflicts)
+              feedbackRules:
+                - type: "JSONPaths"                # Use JSON path expressions for status feedback
+                  jsonPaths:
+                    - name: "phase"                # Namespace phase (Active, Terminating)
+                      path: ".status.phase"
+                    - name: "conditions"           # Namespace conditions array
+                      path: ".status.conditions"
+                    - name: "creationTimestamp"    # When namespace was created
+                      path: ".metadata.creationTimestamp"
+
+      discovery:
+        namespace: "*"
+        #byName: "cluster-{{ .clusterId }}-config"
+        #byName: "manifestwork0"
+        bySelectors:
+          labelSelector:
+            hyperfleet.io/resource-type: manifestwork
+
+      # Discover sub-resources within the ManifestWork. Each discovered resource is
+      # exposed under its discovery name at the parameter level, and jsonPath can dig
+      # into its status, e.g. discoveryNamespace.status.conditions[?(@.type=="Ready")].status
+      nestedDiscoveries:
+        - name: "discoveryNamespace"
+          discovery:
+            bySelectors:
+              labelSelector:
+                hyperfleet.io/label-for-discovery: "namespace-symbol111"
+        - name: "discoveryConfigMap"
+          discovery:
+            byName: "{{ .clusterId }}-symbol222"
+
+  post:
+    payloads:
+      - name: "statusPayload"
+        build:
+          conditions:
+            - type: "Applied"
+              status:
+                expression: |
+                  has(resources.resource0) && has(resources.resource0.discoveryNamespace) ? "True" : "False"
+              reason:
+                expression: |
+                  resources.resource0.discoveryNamespace.metadata.name
+                  + ' / ' +
+                  metadata.name
+
+                  + ' / ' +
+                  resources.resource0.discoveryConfigMap.metadata.name
+              message:
+                expression: |
+                  has(resources.resource0) && has(resources.resource0.discoveryNamespace)
+                    ? "Resources discovered successfully"
+                    : "Discovery pending"
+            - type: "Health"
+              status:
+                expression: |
+                  adapter.?executionStatus.orValue("") == "success" ? "True" : "False"
+              reason:
+                expression: |
+                  adapter.?errorReason.orValue("") != "" ? adapter.?errorReason.orValue("") : "Healthy"
+              message:
+                expression: |
+                  adapter.?errorMessage.orValue("") != "" ? adapter.?errorMessage.orValue("") : "Adapter healthy"
+
+  postActions:
+    - name: "update-status"
+      apiCall:
+        method: "PATCH"
+        url: "/api/hyperfleet/v1/clusters/{{ .clusterId }}/statuses"
+        body: "{{ .statusPayload }}"
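
The Step 2 change to `resource_executor.go` above decodes the rendered manifest bytes solely to report the resource's identity (kind, namespace, name) in the result. The idea can be sketched self-contained with plain `encoding/json` standing in for `unstructured.Unstructured`; the `identity` struct and `extractIdentity` helper here are illustrative, not part of the adapter's API:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// identity mirrors the fields the executor copies into its result
// (Kind, Namespace, ResourceName) after rendering a manifest.
type identity struct {
	Kind      string
	Namespace string
	Name      string
}

// extractIdentity decodes rendered manifest bytes and pulls out the
// resource identity. Like the executor, it is best-effort: a manifest
// that fails to decode simply yields zero values instead of an error.
func extractIdentity(rendered []byte) identity {
	var obj struct {
		Kind     string `json:"kind"`
		Metadata struct {
			Name      string `json:"name"`
			Namespace string `json:"namespace"`
		} `json:"metadata"`
	}
	if err := json.Unmarshal(rendered, &obj); err != nil {
		return identity{}
	}
	return identity{Kind: obj.Kind, Namespace: obj.Metadata.Namespace, Name: obj.Metadata.Name}
}

func main() {
	rendered := []byte(`{"apiVersion":"v1","kind":"ConfigMap","metadata":{"name":"cluster-config","namespace":"abc123"}}`)
	id := extractIdentity(rendered)
	fmt.Printf("%s/%s in %s\n", id.Kind, id.Name, id.Namespace)
	// prints: ConfigMap/cluster-config in abc123
}
```

Doing this before the apply call is what lets a failed apply still report which resource was involved, which is the point of the step renumbering in the diff.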