diff --git a/README.md b/README.md
index 993570a..a49ea00 100644
--- a/README.md
+++ b/README.md
@@ -142,7 +142,7 @@ $ builder-playground logs validator
 Builder-playground supports inspecting the connection of a service to a specific port.
 
 ```bash
-$ builder-playground inspect
+$ builder-playground debug inspect
 ```
 
 Example:
diff --git a/main.go b/main.go
index 6ffdf7d..2f6d54a 100644
--- a/main.go
+++ b/main.go
@@ -105,6 +105,30 @@ var inspectCmd = &cobra.Command{
 	},
 }
 
+var debugCmd = &cobra.Command{
+	Use:   "debug",
+	Short: "Debug commands for a running playground",
+}
+
+var probeCmd = &cobra.Command{
+	Use:   "probe",
+	Short: "Run the health check of a service manually",
+	Args:  cobra.ExactArgs(1),
+	RunE: func(cmd *cobra.Command, args []string) error {
+		serviceName := args[0]
+
+		resp, err := playground.ExecuteHealthCheckManually(serviceName)
+		if err != nil {
+			return err
+		}
+
+		fmt.Printf("Exit code: %d\n", resp.ExitCode)
+		fmt.Printf("Output: %s\n", resp.Output)
+
+		return nil
+	},
+}
+
 var logsCmd = &cobra.Command{
 	Use:   "logs",
 	Short: "Show logs for a service",
@@ -193,6 +217,10 @@ func main() {
 	rootCmd.AddCommand(stopCmd)
 	stopCmd.Flags().StringVar(&outputFlag, "output", "", "Output folder for the artifacts")
 
+	debugCmd.AddCommand(probeCmd)
+	debugCmd.AddCommand(inspectCmd)
+	rootCmd.AddCommand(debugCmd)
+
 	if err := rootCmd.Execute(); err != nil {
 		fmt.Println(err)
 		os.Exit(1)
@@ -280,14 +308,14 @@ func runIt(recipe playground.Recipe) error {
 
 	if interactive {
 		i := playground.NewInteractiveDisplay(svcManager)
-		cfg.Callback = i.HandleUpdate
+		cfg.AddCallback(i.HandleUpdate)
 	}
 
 	// Add callback to log service updates in debug mode
 	if logLevel == playground.LevelDebug {
-		cfg.Callback = func(serviceName string, update playground.TaskStatus) {
+		cfg.AddCallback(func(serviceName string, update playground.TaskStatus) {
 			log.Printf("[DEBUG] [%s] %s\n", serviceName, update)
-		}
+		})
 	}
 
 	dockerRunner, err := playground.NewLocalRunner(cfg)
@@ -328,14 +356,7 @@ func runIt(recipe playground.Recipe) error {
 		return fmt.Errorf("failed to wait for service readiness: %w", err)
 	}
 
-	fmt.Printf("\nWaiting for network to be ready for transactions...\n")
-	networkReadyStart := time.Now()
-	if err := playground.CompleteReady(ctx, svcManager.Services); err != nil {
-		dockerRunner.Stop()
-		return fmt.Errorf("network not ready: %w", err)
-	}
-	fmt.Printf("Network is ready for transactions (took %.1fs)\n", time.Since(networkReadyStart).Seconds())
-	fmt.Println("Session ID:", svcManager.ID)
+	fmt.Println("\nServices healthy... Ready to accept transactions")
 
 	// get the output from the recipe
 	output := recipe.Output(svcManager)
@@ -353,9 +374,9 @@ func runIt(recipe playground.Recipe) error {
 	watchdogErr := make(chan error, 1)
 	if watchdog {
-		go func() {
-			if err := playground.RunWatchdog(artifacts.Out, svcManager.Services); err != nil {
-				watchdogErr <- fmt.Errorf("watchdog failed: %w", err)
-			}
-		}()
+		cfg.AddCallback(func(name string, status playground.TaskStatus) {
+			if status == playground.TaskStatusUnhealty {
+				watchdogErr <- fmt.Errorf("watchdog failed: task '%s' is not healthy anymore", name)
+			}
+		})
 	}
 
diff --git a/playground/components.go b/playground/components.go
index 728c13b..99ca966 100644
--- a/playground/components.go
+++ b/playground/components.go
@@ -1,9 +1,7 @@
 package playground
 
 import (
-	"context"
 	"fmt"
-	"io"
 	"strconv"
 	"strings"
 	"time"
@@ -28,7 +26,6 @@ func (r *RollupBoost) Apply(manifest *Manifest) {
 	service := manifest.NewService("rollup-boost").
		WithImage("docker.io/flashbots/rollup-boost").
		WithTag("v0.7.5").
-		DependsOnHealthy(r.ELNode).
WithArgs( "--rpc-host", "0.0.0.0", "--rpc-port", `{{Port "authrpc" 8551}}`, @@ -229,11 +226,9 @@ type ChainMonitor struct { func (c *ChainMonitor) Apply(manifest *Manifest) { manifest.NewService("chain-monitor"). - WithPort("metrics", 8080). - WithImage("ghcr.io/flashbots/chain-monitor"). - WithTag("v0.0.54"). - DependsOnHealthy(c.L1RPC). - DependsOnHealthy(c.L2RPC). + WithPort("metrics", 8080). + WithImage("ghcr.io/flashbots/chain-monitor"). + WithTag("v0.0.54"). WithArgs( "serve", "--l1-rpc", Connect(c.L1RPC, "http"), @@ -343,7 +338,7 @@ func (o *OpGeth) Apply(manifest *Manifest) { trustedPeers = fmt.Sprintf("--bootnodes %s ", manifest.ctx.Bootnode.Connect()) } - manifest.NewService("op-geth"). + svc := manifest.NewService("op-geth"). WithImage("us-docker.pkg.dev/oplabs-tools-artifacts/images/op-geth"). WithTag("v1.101503.2-rc.5"). WithEntrypoint("/bin/sh"). @@ -383,28 +378,11 @@ func (o *OpGeth) Apply(manifest *Manifest) { "--metrics.port "+`{{Port "metrics" 6061}}`, ). WithVolume("data", "/data_opgeth"). - WithWatchdog(opGethWatchdogFn). - WithReadyFn(opGethReadyFn). WithArtifact("/data/l2-genesis.json", "l2-genesis.json"). WithArtifact("/data/jwtsecret", "jwtsecret"). - WithArtifact("/data/p2p_key.txt", o.Enode.Artifact). - WithReady(ReadyCheck{ - QueryURL: "http://localhost:8545", - Interval: 1 * time.Second, - Timeout: 10 * time.Second, - Retries: 20, - StartPeriod: 1 * time.Second, - }) -} - -func opGethReadyFn(ctx context.Context, service *Service) error { - opGethURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort) - return waitForFirstBlock(ctx, opGethURL, 60*time.Second) -} + WithArtifact("/data/p2p_key.txt", o.Enode.Artifact) -func opGethWatchdogFn(out io.Writer, service *Service, ctx context.Context) error { - gethURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort) - return watchChainHead(out, gethURL, 2*time.Second) + UseHealthmon(manifest, svc) } type RethEL struct { @@ -480,24 +458,11 @@ func (r *RethEL) Apply(manifest *Manifest) { logLevelToRethVerbosity(manifest.ctx.LogLevel), ). WithRelease(rethELRelease). - WithWatchdog(func(out io.Writer, service *Service, ctx context.Context) error { - rethURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort) - return watchChainHead(out, rethURL, 12*time.Second) - }). - WithReadyFn(func(ctx context.Context, service *Service) error { - elURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort) - return waitForFirstBlock(ctx, elURL, 60*time.Second) - }). WithArtifact("/data/genesis.json", "genesis.json"). WithArtifact("/data/jwtsecret", "jwtsecret"). - WithVolume("data", "/data_reth"). - WithReady(ReadyCheck{ - QueryURL: "http://localhost:8545", - Interval: 1 * time.Second, - Timeout: 10 * time.Second, - Retries: 20, - StartPeriod: 1 * time.Second, - }) + WithVolume("data", "/data_reth") + + UseHealthmon(manifest, svc) if r.UseNativeReth { // we need to use this otherwise the db cannot be binded @@ -615,7 +580,6 @@ func (m *MevBoostRelay) Apply(manifest *Manifest) { WithEnv("ALLOW_SYNCING_BEACON_NODE", "1"). WithEntrypoint("mev-boost-relay"). DependsOnHealthy(m.BeaconClient). - WithWatchdog(mevboostRelayWatchdogFn). 
WithArgs( "--api-listen-addr", "0.0.0.0", "--api-listen-port", `{{Port "http" 5555}}`, @@ -627,20 +591,6 @@ func (m *MevBoostRelay) Apply(manifest *Manifest) { } } -func mevboostRelayWatchdogFn(out io.Writer, service *Service, ctx context.Context) error { - beaconNodeURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort) - - watchGroup := newWatchGroup() - watchGroup.watch(func() error { - return watchProposerPayloads(beaconNodeURL) - }) - watchGroup.watch(func() error { - return validateProposerPayloads(out, beaconNodeURL) - }) - - return watchGroup.wait() -} - type OpReth struct{} var opRethRelease = &release{ @@ -661,7 +611,7 @@ var opRethRelease = &release{ } func (o *OpReth) Apply(manifest *Manifest) { - manifest.NewService("op-reth"). + svc := manifest.NewService("op-reth"). WithImage("ghcr.io/paradigmxyz/op-reth"). WithTag("nightly"). WithEntrypoint("op-reth"). @@ -681,20 +631,11 @@ func (o *OpReth) Apply(manifest *Manifest) { "--addr", "0.0.0.0", "--port", `{{Port "rpc" 30303}}`). WithRelease(opRethRelease). - WithWatchdog(func(out io.Writer, service *Service, ctx context.Context) error { - rethURL := fmt.Sprintf("http://localhost:%d", service.MustGetPort("http").HostPort) - return watchChainHead(out, rethURL, 2*time.Second) - }). WithArtifact("/data/jwtsecret", "jwtsecret"). WithArtifact("/data/l2-genesis.json", "l2-genesis.json"). - WithVolume("data", "/data_op_reth"). - WithReady(ReadyCheck{ - QueryURL: "http://localhost:8545", - Interval: 1 * time.Second, - Timeout: 10 * time.Second, - Retries: 20, - StartPeriod: 1 * time.Second, - }) + WithVolume("data", "/data_op_reth") + + UseHealthmon(manifest, svc) } type MevBoost struct { @@ -933,3 +874,18 @@ func (b *BuilderHub) Apply(manifest *Manifest) { WithEnv("TARGET", Connect("web", "http")). DependsOnHealthy("web") } + +func UseHealthmon(m *Manifest, s *Service) { + m.NewService(s.Name+"_healthmon"). + WithImage("ghcr.io/flashbots/ethereum-healthmon"). + WithTag("v0.0.1"). + // TODO: Use this also for beacon node + WithArgs("--chain", "execution", "--url", Connect(s.Name, "http")). 
+		WithReady(ReadyCheck{
+			Test:        []string{"CMD", "wget", "--spider", "--quiet", "http://127.0.0.1:21171/ready"},
+			Interval:    1 * time.Second,
+			Timeout:     10 * time.Second,
+			Retries:     20,
+			StartPeriod: 1 * time.Second,
+		})
+}
diff --git a/playground/components_test.go b/playground/components_test.go
index 8c114f6..4a2aa67 100644
--- a/playground/components_test.go
+++ b/playground/components_test.go
@@ -12,6 +12,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/ethereum/go-ethereum/ethclient"
+	"github.com/ethereum/go-ethereum/rpc"
 	"github.com/stretchr/testify/require"
 )
 
@@ -143,8 +145,6 @@ func (tt *testFramework) test(s ServiceGen, args []string) *Manifest {
 	require.NoError(t, err)
 	require.NoError(t, dockerRunner.WaitForReady(context.Background(), 20*time.Second))
 
-	require.NoError(t, CompleteReady(context.Background(), svcManager.Services))
-
 	return svcManager
 }
 
@@ -164,3 +164,29 @@ func toSnakeCase(s string) string {
 	// Convert to lowercase
 	return strings.ToLower(snake)
 }
+
+func waitForBlock(elURL string, targetBlock uint64, timeout time.Duration) error {
+	rpcClient, err := rpc.Dial(elURL)
+	if err != nil {
+		return fmt.Errorf("failed to connect to %s: %w", elURL, err)
+	}
+	defer rpcClient.Close()
+
+	clt := ethclient.NewClient(rpcClient)
+	timeoutCh := time.After(timeout)
+
+	for {
+		select {
+		case <-timeoutCh:
+			return fmt.Errorf("timeout waiting for block %d on %s", targetBlock, elURL)
+		case <-time.After(500 * time.Millisecond):
+			num, err := clt.BlockNumber(context.Background())
+			if err != nil {
+				continue
+			}
+			if num >= targetBlock {
+				return nil
+			}
+		}
+	}
+}
diff --git a/playground/local_runner.go b/playground/local_runner.go
index c254c1a..001c2ba 100644
--- a/playground/local_runner.go
+++ b/playground/local_runner.go
@@ -73,12 +73,13 @@ type task struct {
 type TaskStatus string
 
 var (
-	TaskStatusPulling TaskStatus = "pulling"
-	TaskStatusPulled  TaskStatus = "pulled"
-	TaskStatusPending TaskStatus = "pending"
-	TaskStatusStarted TaskStatus = "started"
-	TaskStatusDie     TaskStatus = "die"
-	TaskStatusHealthy TaskStatus = "healthy"
+	TaskStatusPulling  TaskStatus = "pulling"
+	TaskStatusPulled   TaskStatus = "pulled"
+	TaskStatusPending  TaskStatus = "pending"
+	TaskStatusStarted  TaskStatus = "started"
+	TaskStatusDie      TaskStatus = "die"
+	TaskStatusHealthy  TaskStatus = "healthy"
+	TaskStatusUnhealty TaskStatus = "unhealthy"
 )
 
 func newDockerClient() (*client.Client, error) {
@@ -97,9 +98,17 @@ type RunnerConfig struct {
 	Labels        map[string]string
 	LogInternally bool
 	Platform      string
-	Callback      func(serviceName string, update TaskStatus)
+	Callbacks     []Callback
 }
 
+// AddCallback registers an additional status callback; appending keeps every
+// callback so the interactive display, debug logging and watchdog can coexist.
+func (r *RunnerConfig) AddCallback(c Callback) {
+	r.Callbacks = append(r.Callbacks, c)
+}
+
+type Callback func(serviceName string, update TaskStatus)
+
 func NewLocalRunner(cfg *RunnerConfig) (*LocalRunner, error) {
 	client, err := newDockerClient()
 	if err != nil {
@@ -144,8 +153,8 @@ func NewLocalRunner(cfg *RunnerConfig) (*LocalRunner, error) {
 		cfg.NetworkName = defaultNetworkName
 	}
 
-	if cfg.Callback == nil {
-		cfg.Callback = func(serviceName string, update TaskStatus) {} // noop
+	if cfg.Callbacks == nil {
+		cfg.Callbacks = []Callback{func(serviceName string, update TaskStatus) {}} // noop
 	}
 
 	d := &LocalRunner{
@@ -205,11 +214,19 @@ func (d *LocalRunner) WaitForReady(ctx context.Context, timeout time.Duration) e
 	}
 }
 
+func (d *LocalRunner) emitCallback(name string, status TaskStatus) {
+	for _, callback := range d.config.Callbacks {
+		callback(name, status)
+	}
+}
+
 func (d *LocalRunner)
updateTaskStatus(name string, status TaskStatus) { d.tasksMtx.Lock() defer d.tasksMtx.Unlock() if status == TaskStatusHealthy { d.tasks[name].ready = true + } else if status == TaskStatusUnhealty { + d.tasks[name].ready = false } else { d.tasks[name].status = status } @@ -218,7 +235,7 @@ func (d *LocalRunner) updateTaskStatus(name string, status TaskStatus) { d.exitErr <- fmt.Errorf("container %s failed", name) } - d.config.Callback(name, status) + d.emitCallback(name, status) } func (d *LocalRunner) ExitErr() <-chan error { @@ -782,6 +799,9 @@ func (d *LocalRunner) trackContainerStatusAndLogs() { case events.ActionHealthStatusHealthy: d.updateTaskStatus(name, TaskStatusHealthy) log.Info("container is healthy", "name", name) + + case events.ActionHealthStatusUnhealthy: + d.updateTaskStatus(name, TaskStatusUnhealty) } case err := <-errCh: @@ -864,7 +884,7 @@ func (d *LocalRunner) ensureImage(ctx context.Context, imageName string) error { } // Image not found locally, pull it - d.config.Callback(imageName, TaskStatusPulling) + d.emitCallback(imageName, TaskStatusPulling) slog.Info("pulling image", "image", imageName) reader, err := d.client.ImagePull(ctx, imageName, image.PullOptions{}) @@ -879,7 +899,7 @@ func (d *LocalRunner) ensureImage(ctx context.Context, imageName string) error { return fmt.Errorf("failed to read image pull output %s: %w", imageName, err) } - d.config.Callback(imageName, TaskStatusPulled) + d.emitCallback(imageName, TaskStatusPulled) return nil } @@ -953,3 +973,114 @@ func (d *LocalRunner) Run(ctx context.Context) error { return g.Wait() } + +type HealthCheckResponse struct { + Output string + ExitCode int +} + +func ExecuteHealthCheckManually(serviceName string) (*HealthCheckResponse, error) { + ctx := context.Background() + + cli, err := newDockerClient() + if err != nil { + return nil, fmt.Errorf("failed to create docker client: %w", err) + } + defer cli.Close() + + containerID, err := findServiceByName(cli, ctx, serviceName) + if err != nil { + return nil, err + } + + // Get the container to find the health check command + containerJSON, err := cli.ContainerInspect(ctx, containerID) + if err != nil { + return nil, fmt.Errorf("failed to inspect container: %w", err) + } + + if containerJSON.Config.Healthcheck == nil { + return nil, fmt.Errorf("container has no health check configured") + } + + healthCheckCmd := containerJSON.Config.Healthcheck.Test + + // Health check commands are usually in format: ["CMD-SHELL", "actual command"] + // or ["CMD", "arg1", "arg2", ...] 
+	var execCmd []string
+
+	if len(healthCheckCmd) == 0 {
+		return nil, fmt.Errorf("health check command is empty")
+	}
+
+	if healthCheckCmd[0] == "CMD-SHELL" {
+		// Use sh -c to execute the shell command
+		if len(healthCheckCmd) > 1 {
+			execCmd = []string{"sh", "-c", healthCheckCmd[1]}
+		} else {
+			return nil, fmt.Errorf("CMD-SHELL specified but no command provided")
+		}
+	} else if healthCheckCmd[0] == "CMD" {
+		// Direct command execution
+		execCmd = healthCheckCmd[1:]
+	} else {
+		// Assume it's a direct command
+		execCmd = healthCheckCmd
+	}
+
+	// Create exec instance
+	execConfig := container.ExecOptions{
+		Cmd:          execCmd,
+		AttachStdout: true,
+		AttachStderr: true,
+	}
+
+	execID, err := cli.ContainerExecCreate(ctx, containerID, execConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create exec: %w", err)
+	}
+
+	// Start the exec and get output
+	resp, err := cli.ContainerExecAttach(ctx, execID.ID, container.ExecStartOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("failed to attach to exec: %w", err)
+	}
+	defer resp.Close()
+
+	// Read all output
+	var outBuf bytes.Buffer
+	_, err = io.Copy(&outBuf, resp.Reader)
+	if err != nil && err != io.EOF {
+		return nil, fmt.Errorf("error reading output: %w", err)
+	}
+
+	// Get exit code
+	inspectResp, err := cli.ContainerExecInspect(ctx, execID.ID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to inspect exec: %w", err)
+	}
+
+	healthCheckResp := &HealthCheckResponse{
+		Output:   strings.TrimSpace(outBuf.String()),
+		ExitCode: inspectResp.ExitCode,
+	}
+	return healthCheckResp, nil
+}
+
+func findServiceByName(client *client.Client, ctx context.Context, serviceName string) (string, error) {
+	containers, err := client.ContainerList(ctx, container.ListOptions{
+		All: true,
+	})
+	if err != nil {
+		return "", fmt.Errorf("error getting container list: %w", err)
+	}
+
+	for _, container := range containers {
+		if container.Labels["playground"] == "true" &&
+			container.Labels["com.docker.compose.service"] == serviceName {
+			return container.ID, nil
+		}
+	}
+
+	return "", fmt.Errorf("no container found for service %q", serviceName)
+}
diff --git a/playground/local_runner_test.go b/playground/local_runner_test.go
index 8cc5448..04b4b51 100644
--- a/playground/local_runner_test.go
+++ b/playground/local_runner_test.go
@@ -37,8 +37,8 @@ func TestRunnerPullImages(t *testing.T) {
 	}
 
 	cfg := &RunnerConfig{
-		Manifest: manifest,
-		Callback: callback,
+		Manifest:  manifest,
+		Callbacks: []Callback{callback},
 	}
 	runner, err := NewLocalRunner(cfg)
 	require.NoError(t, err)
diff --git a/playground/manifest.go b/playground/manifest.go
index b3417de..47721a2 100644
--- a/playground/manifest.go
+++ b/playground/manifest.go
@@ -1,10 +1,8 @@
 package playground
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
-	"io"
 	"os"
 	"path/filepath"
 	"strings"
@@ -321,16 +319,9 @@ type Service struct {
 	Entrypoint string `json:"entrypoint,omitempty"`
 	HostPath   string `json:"host_path,omitempty"`
 
-	release    *release
-	watchdogFn watchdogFn
-	readyFn    readyFn
+	release *release
 }
 
-type (
-	watchdogFn func(out io.Writer, service *Service, ctx context.Context) error
-	readyFn    func(ctx context.Context, service *Service) error
-)
-
 type DependsOnCondition string
 
 const (
@@ -439,16 +430,6 @@ func (s *Service) WithRelease(rel *release) *Service {
 	return s
 }
 
-func (s *Service) WithWatchdog(watchdogFn watchdogFn) *Service {
-	s.watchdogFn = watchdogFn
-	return s
-}
-
-func (s *Service) WithReadyFn(readyFn readyFn) *Service {
-	s.readyFn = readyFn
-	return s
-}
-
 func (s *Service) applyTemplate(arg string) {
 	var port []Port
 	var nodeRef
[]NodeRef diff --git a/playground/watchdog.go b/playground/watchdog.go deleted file mode 100644 index ab27c24..0000000 --- a/playground/watchdog.go +++ /dev/null @@ -1,42 +0,0 @@ -package playground - -import ( - "context" - "fmt" -) - -func RunWatchdog(out *output, services []*Service) error { - watchdogErr := make(chan error, len(services)) - - output, err := out.LogOutput("watchdog") - if err != nil { - return fmt.Errorf("failed to create log output: %w", err) - } - - for _, s := range services { - if watchdogFn := s.watchdogFn; watchdogFn != nil { - go func() { - if err := watchdogFn(output, s, context.Background()); err != nil { - watchdogErr <- fmt.Errorf("service %s watchdog failed: %w", s.Name, err) - } - }() - } - } - - // If any of the watchdogs fail, we return the error - if err := <-watchdogErr; err != nil { - return fmt.Errorf("failed to run watchdog: %w", err) - } - return nil -} - -func CompleteReady(ctx context.Context, services []*Service) error { - for _, s := range services { - if readyFn := s.readyFn; readyFn != nil { - if err := readyFn(ctx, s); err != nil { - return err - } - } - } - return nil -} diff --git a/playground/watchers.go b/playground/watchers.go deleted file mode 100644 index 4e2aece..0000000 --- a/playground/watchers.go +++ /dev/null @@ -1,293 +0,0 @@ -package playground - -import ( - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "time" - - "github.com/ethereum/go-ethereum/ethclient" - "github.com/ethereum/go-ethereum/rpc" - "github.com/flashbots/mev-boost-relay/beaconclient" - mevRCommon "github.com/flashbots/mev-boost-relay/common" -) - -func waitForFirstBlock(ctx context.Context, elURL string, timeout time.Duration) error { - rpcClient, err := rpc.Dial(elURL) - if err != nil { - fmt.Printf(" [%s] Failed to connect: %v\n", elURL, err) - return err - } - defer rpcClient.Close() - - clt := ethclient.NewClient(rpcClient) - fmt.Printf(" [%s] Connected, waiting for first block...\n", elURL) - - timeoutCh := time.After(timeout) - checkCount := 0 - for { - select { - case <-ctx.Done(): - return ctx.Err() - case <-timeoutCh: - return fmt.Errorf("timeout waiting for first block on %s", elURL) - case <-time.After(500 * time.Millisecond): - num, err := clt.BlockNumber(ctx) - checkCount++ - if err != nil { - if checkCount%10 == 0 { - fmt.Printf(" [%s] Error getting block number: %v\n", elURL, err) - } - continue - } - if num > 0 { - fmt.Printf(" [%s] First block detected: %d\n", elURL, num) - return nil - } - if checkCount%10 == 0 { - fmt.Printf(" [%s] Block number: %d (waiting for > 0)\n", elURL, num) - } - } - } -} - -func waitForChainAlive(ctx context.Context, logOutput io.Writer, beaconNodeURL string, timeout time.Duration) error { - // Test that blocks are being produced - log := mevRCommon.LogSetup(false, "info").WithField("context", "waitForChainAlive") - log.Logger.Out = logOutput - - clt := beaconclient.NewProdBeaconInstance(log, beaconNodeURL, beaconNodeURL) - - // Subscribe to head events right away even if the connection has not been established yet - // That is handled internally in the function already. - // Otherwise, if we connect only when the first head slot happens we might miss some initial slots. - ch := make(chan beaconclient.PayloadAttributesEvent) - go clt.SubscribeToPayloadAttributesEvents(ch) - - { - // If the chain has not started yet, wait for it to start. - // Otherwise, the subscription will not return any data. 
- bClient := beaconclient.NewMultiBeaconClient(log, []beaconclient.IBeaconInstance{ - clt, - }) - - isReady := func() bool { - sync, err := bClient.BestSyncStatus() - if err != nil { - return false - } - return sync.HeadSlot >= 1 - } - - if !isReady() { - syncTimeoutCh := time.After(timeout) - for { - if isReady() { - break - } - select { - case <-syncTimeoutCh: - return fmt.Errorf("beacon client failed to start") - case <-ctx.Done(): - return fmt.Errorf("timeout waiting for chain to start") - default: - time.Sleep(1 * time.Second) - } - } - } - } - - return nil -} - -// validateProposerPayloads validates that payload attribute events are being broadcasted by the beacon node -// in the correct order without any missing slots. -func validateProposerPayloads(logOutput io.Writer, beaconNodeURL string) error { - // Test that blocks are being produced - log := mevRCommon.LogSetup(false, "info").WithField("context", "validateProposerPayloads") - log.Logger.Out = logOutput - - clt := beaconclient.NewProdBeaconInstance(log, beaconNodeURL, beaconNodeURL) - - // We run this after 'waitForChainAlive' to ensure that the beacon node is ready to receive payloads. - ch := make(chan beaconclient.PayloadAttributesEvent) - go clt.SubscribeToPayloadAttributesEvents(ch) - - log.Infof("Chain is alive. Subscribing to head events") - - var lastSlot uint64 - for { - select { - case head := <-ch: - log.Infof("Slot: %d Parent block number: %d", head.Data.ProposalSlot, head.Data.ParentBlockNumber) - - // If we are being notified of a new slot, validate that the slots are contiguous - // Note that lighthouse might send multiple updates for the same slot. - if lastSlot != 0 && lastSlot != head.Data.ProposalSlot && lastSlot+1 != head.Data.ProposalSlot { - return fmt.Errorf("slot mismatch, expected %d, got %d", lastSlot+1, head.Data.ProposalSlot) - } - // if the network did not miss any initial slots, lighthouse will send payload attribute updates - // of the form: (slot = slot, parent block number = slot - 2), (slot, slot - 1). - // The -2 is in case we want to handle reorgs in the chain. - // We need to validate that at least the difference between the parent block number and the slot is 2. - if head.Data.ProposalSlot-head.Data.ParentBlockNumber > 2 { - return fmt.Errorf("parent block too big %d", head.Data.ParentBlockNumber) - } - - lastSlot = head.Data.ProposalSlot - case <-time.After(20 * time.Second): - return fmt.Errorf("timeout waiting for block") - } - } -} - -func watchProposerPayloads(beaconNodeURL string) error { - getProposerPayloadDelivered := func() ([]*mevRCommon.BidTraceV2JSON, error) { - resp, err := http.Get(fmt.Sprintf("%s/relay/v1/data/bidtraces/proposer_payload_delivered", beaconNodeURL)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - data, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - var payloadDeliveredList []*mevRCommon.BidTraceV2JSON - if err := json.Unmarshal(data, &payloadDeliveredList); err != nil { - return nil, err - } - return payloadDeliveredList, nil - } - - // Wait for at least 10 seconds for Mev-boost to start - timerC := time.After(10 * time.Second) -LOOP: - for { - select { - case <-timerC: - break - case <-time.After(2 * time.Second): - if _, err := getProposerPayloadDelivered(); err == nil { - break LOOP - } - } - } - - // This is not the most efficient solution since we are querying the endpoint for the full list of payloads - // every 2 seconds. It should be fine for the kind of workloads expected to run. 
- - lastSlot := uint64(0) - - for { - time.Sleep(2 * time.Second) - - vals, err := getProposerPayloadDelivered() - if err != nil { - fmt.Println("Error getting proposer payloads:", err) - continue - } - - for _, val := range vals { - if val.Slot <= lastSlot { - continue - } - - fmt.Printf("Block Proposed: Slot: %d, Builder: %s, Block: %d\n", val.Slot, val.BuilderPubkey, val.BlockNumber) - lastSlot = val.Slot - } - } -} - -func waitForBlock(elURL string, targetBlock uint64, timeout time.Duration) error { - rpcClient, err := rpc.Dial(elURL) - if err != nil { - return fmt.Errorf("failed to connect to %s: %w", elURL, err) - } - defer rpcClient.Close() - - clt := ethclient.NewClient(rpcClient) - timeoutCh := time.After(timeout) - - for { - select { - case <-timeoutCh: - return fmt.Errorf("timeout waiting for block %d on %s", targetBlock, elURL) - case <-time.After(500 * time.Millisecond): - num, err := clt.BlockNumber(context.Background()) - if err != nil { - continue - } - if num >= targetBlock { - return nil - } - } - } -} - -// watchChainHead watches the chain head and ensures that it is advancing -func watchChainHead(logOutput io.Writer, elURL string, blockTime time.Duration) error { - log := mevRCommon.LogSetup(false, "info").WithField("context", "watchChainHead").WithField("el", elURL) - log.Logger.Out = logOutput - - // add some wiggle room to block time - blockTime = blockTime + 1*time.Second - - rpcClient, err := rpc.Dial(elURL) - if err != nil { - return err - } - - var latestBlock *uint64 - clt := ethclient.NewClient(rpcClient) - - timeout := time.NewTimer(blockTime) - defer timeout.Stop() - - for { - select { - case <-time.After(500 * time.Millisecond): - num, err := clt.BlockNumber(context.Background()) - if err != nil { - return err - } - if latestBlock != nil && num <= *latestBlock { - continue - } - log.Infof("Chain head: %d", num) - latestBlock = &num - - // Reset timeout since we saw a new block - if !timeout.Stop() { - <-timeout.C - } - timeout.Reset(blockTime) - - case <-timeout.C: - return fmt.Errorf("chain head for %s not advancing", elURL) - } - } -} - -type watchGroup struct { - errCh chan error -} - -func newWatchGroup() *watchGroup { - return &watchGroup{ - errCh: make(chan error, 1), - } -} - -func (wg *watchGroup) watch(watch func() error) { - go func() { - wg.errCh <- watch() - }() -} - -func (wg *watchGroup) wait() error { - return <-wg.errCh -}
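With the per-service watchdog and ready functions removed, health is now surfaced only through Docker health checks and the runner callbacks. A minimal sketch of how the new pieces compose, assuming the `playground` import path used by `main.go` (the module path and the `registerUnhealthyProbe` helper are illustrative, not part of this change):

```go
package main

import (
	"fmt"
	"log"

	// Assumed module path; use whatever main.go actually imports.
	"github.com/flashbots/builder-playground/playground"
)

// registerUnhealthyProbe is an illustrative helper: whenever a task flips to
// unhealthy, it re-runs the container's health check and prints the raw
// output, i.e. the same information `builder-playground debug probe <service>`
// exposes on demand.
func registerUnhealthyProbe(cfg *playground.RunnerConfig) {
	cfg.AddCallback(func(name string, status playground.TaskStatus) {
		if status != playground.TaskStatusUnhealty {
			return
		}
		resp, err := playground.ExecuteHealthCheckManually(name)
		if err != nil {
			log.Printf("probe of %s failed: %v", name, err)
			return
		}
		fmt.Printf("%s health check exited %d: %s\n", name, resp.ExitCode, resp.Output)
	})
}

func main() {
	cfg := &playground.RunnerConfig{}
	registerUnhealthyProbe(cfg)
	// cfg would then be passed to playground.NewLocalRunner, as runIt does.
}
```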