diff --git a/README.md b/README.md
index 19b7178..034061f 100644
--- a/README.md
+++ b/README.md
@@ -46,11 +46,22 @@ Tools to add:
 - build
 - docker
 
+### Environment
+
+The following variables can be set in the environment.
+
+| Name | Description | Default |
+|-------|------------|---------------|
+| `FRACTALE_MCP_PORT` | Port to run the MCP server on, if using the http variant | 8089 |
+| `FRACTALE_MCP_TOKEN` | Token to use for testing | unset |
+| `FRACTALE_LLM_PROVIDER` | LLM backend to use (gemini, openai, llama) | gemini |
+
 ### Testing
 
 Start the server in one terminal. Export `FRACTALE_MCP_TOKEN` if you want to require simple token auth. Here is for http.
 
 ```bash
+export FRACTALE_MCP_TOKEN=dudewheresmycar
 fractale start --transport http --port 8089
 ```
 
@@ -64,26 +75,41 @@ curl -s http://0.0.0.0:8089/health | jq
 python3 examples/mcp/test_echo.py
 ```
 
-TODO:
+### Agents
 
- - we will want to keep track of state (retries, etc.) for agents somewhere.
+The `fractale agent` command provides the means to run build, job generation, and deployment agents.
+In our [first version](https://github.com/compspec/fractale), an agent corresponded to a kind of task (e.g., build). In this refactored version, the concept of an agent is represented as a prompt or persona, which can be deployed by a generic MCP agent with some model backend (e.g., Gemini, Llama, or OpenAI). Let's test
+doing a build:
 
-### Agents
+```bash
+# In both terminals
+export FRACTALE_MCP_TOKEN=dude
 
-**Not written yet**
+# In one terminal (start MCP)
+fractale start -t http --port 8089
 
-The `fractale agent` command provides means to run build, job generation, and deployment agents.
-This part of the library is under development. There are three kinds of agents:
+# Define the model (provider and endpoints) to use.
+export FRACTALE_LLM_PROVIDER=openai
+export OPENAI_API_KEY=xxxxxxxxxxxxxxxx
+export OPENAI_BASE_URL=https://my.custom.url/v1
+
+# In the other, run the plan
+fractale agent ./examples/plans/docker-build-lammps.yaml
+```
 
+There are two kinds of agents:
+
- - `step` agents are experts on doing specific tasks (do hold state)
  - `manager` agents know how to orchestrate step agents and choose between them (don't hold state, but could)
- - `helper` agents are used by step agents to do small tasks (e.g., suggest a fix for an error)
+ - `step` agents are experts at doing specific tasks. This was originally an agent with functions specific to a task (e.g., docker build) and is now a generic MCP agent with a prompt that gives it context and a goal.
 
-The design is simple in that each agent is responding to state of error vs. success. In the [first version]() of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. In the case of a step agent, the return code determines to continue or try again. In the case of a helper, the input is typically an erroneous response (or something that needs changing) with respect to a goal. For a manager, we are making a choice based on a previous erroneous step.
+The initial design of `helper` agents from the first fractale is subsumed by the idea of an MCP function. A helper agent _is_ an MCP tool.
+
+The design is simple in that each agent responds to a state of error vs. success. In the [first version](https://github.com/compspec/fractale) of our library, agents formed a custom graph. In this variant, we refactor to use MCP server tools. It has the same top-level design with a manager, but each step agent is like a small state machine governed by an LLM with access to MCP tools and resources.
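+
+A plan step pairs a persona prompt (served by the MCP server) with inputs and an optional validation tool. Here is a minimal sketch, mirroring [examples/plans/docker-build-lammps.yaml](examples/plans/docker-build-lammps.yaml):
+
+```yaml
+name: "LAMMPS Pipeline"
+plan:
+  - name: "build"
+    # The persona prompt that generates the Dockerfile
+    prompt: "docker_build_persona"
+    # The tool used to validate (and exit) this step
+    validate: "docker_build"
+    inputs:
+      application: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)"
+      container: "ghcr.io/converged-computing/fractale-mcp:lammps-cpu"
+      environment: "Ubuntu 24.04, CPU Only"
+```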
See [examples/agent](examples/agent) for an example, along with observations, research questions, ideas, and experiment brainstorming! -TODO refactor examples. +#### TODO + +- refactor examples +- debug why the startup is so slow. ### Design Choices @@ -96,6 +122,33 @@ Here are a few design choices (subject to change, of course). I am starting with - The backend of FastMCP is essentially starlette, so we define (and add) other routes to the server. +### Job Specifications + +#### Simple + +We provide a simple translation layer between job specifications. We take the assumption that although each manager has many options, the actual options a user would use is a much smaller set, and it's relatively straight forward to translate (and have better accuracy). + +See [examples/transform](examples/transform) for an example. + +#### Complex + +We want to: + +1. Generate software graphs for some cluster (fluxion JGF) (this is done with [compspec](https://github.com/compspec/compspec) +2. Register N clusters to a tool (should be written as a python module) +3. Tool would have ability to select clusters from resources known, return +4. Need graphical representation (json) of each cluster - this will be used with the LLM inference + +See [examples/fractale](examples/fractale) for a detailed walk-through of the above. + +For graph tool: + +```bash +conda install -c conda-forge graph-tool +``` + + + ## License HPCIC DevTools is distributed under the terms of the MIT license. diff --git a/examples/plans/docker-build-lammps.yaml b/examples/plans/docker-build-lammps.yaml new file mode 100644 index 0000000..b95e1f4 --- /dev/null +++ b/examples/plans/docker-build-lammps.yaml @@ -0,0 +1,18 @@ +name: "LAMMPS Pipeline" +plan: + # A step is a generation and validation step, where validation often interacts with + # the system + - name: "build" + # This is the main prompt that will ask for (and generate) the Dockerfile + prompt: "docker_build_persona" + # And this is how we validate to exit this step. + validate: "docker_build" + inputs: + application: "LAMMPS (Large-scale Atomic/Molecular Massively Parallel Simulator)" + container: "ghcr.io/converged-computing/fractale-mcp:lammps-cpu" + environment: "Ubuntu 24.04, CPU Only" + +# - name: "deploy" +# prompt: "k8s-deploy-persona" +# inputs: +# replicas: 4 diff --git a/fractale/agent/__init__.py b/fractale/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fractale/agent/agent.py b/fractale/agent/agent.py new file mode 100644 index 0000000..cc28d65 --- /dev/null +++ b/fractale/agent/agent.py @@ -0,0 +1,306 @@ +import asyncio +import json +import os +import time + +import httpx +from fastmcp import Client +from fastmcp.client.transports import StreamableHttpTransport +from rich import print + +import fractale.utils as utils +import fractale.agent.backends as backends +import fractale.agent.defaults as defaults +import fractale.agent.logger as logger +import fractale.agent.prompts as prompts +from fractale.agent.base import Agent + + +class MCPAgent(Agent): + """ + Backend-Agnostic Agent that uses MCP Tools. + """ + + def init(self): + # 1. Setup MCP Client + port = os.environ.get("FRACTALE_MCP_PORT", defaults.mcp_port) + token = os.environ.get("FRACTALE_MCP_TOKEN") + url = f"http://localhost:{port}/mcp" + + headers = None + if token: + headers = headers = {"Authorization": token} + transport = StreamableHttpTransport(url=url, headers=headers) + self.client = Client(transport) + + # Initialize the provider. We will do this for each step. 
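+        # The provider is read from FRACTALE_LLM_PROVIDER on every init, so the
+        # backend (gemini, openai, or llama) can be switched between steps.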
+ self.init_provider() + + def init_provider(self): + """ + Initialize the provider. + """ + # select Backend based on Config/Env first, then cached version + provider = os.environ.get("FRACTALE_LLM_PROVIDER", "gemini").lower() + + # Other envars come from provider backend + if provider in backends.BACKENDS: + self.backend = backends.BACKENDS[provider]() + else: + raise ValueError(f"Provider {provider} is not available. Did you install dependencies?") + + async def get_prompts_list(self): + """ + Get list of prompts. A prompt is technically a persona/role + that was previously considered an entire agent. Now we pair a prompt + with an MCP backend and get a full agent. + """ + async with self.client: + prompts = await self.client.list_prompts_mcp() + return prompts + + async def get_tools_list(self): + """ + Get list of tools. + """ + async with self.client: + tools = await self.client.list_tools() + return tools + + async def execute(self, context, step): + """ + The Async Loop that will start with a prompt name, retrieve it, + and then respond to it until the state is successful. + """ + start_time = time.perf_counter() + + # We keep the client connection open for the duration of the step + async with self.client: + + # These are tools available to agent + # TODO need to filter these to be agent specific? + mcp_tools = await self.client.list_tools() + await self.backend.initialize(mcp_tools) + + # Get prompt to give goal/task/personality to agent + args = getattr(context, "data", context) + + # This partitions inputs, adding inputs from the step and separating + # those from extra + args, extra = step.partition_inputs(args) + instruction = await self.fetch_persona(step.prompt, args) + message = json.loads(instruction)["messages"][0]["content"]["text"] + self.ui.log(message) + + # Run the loop up to some max attempts (internal state machine with MCP tools) + response_text = await self.run_llm_loop(instruction, step, context) + + self.record_usage(time.perf_counter() - start_time) + return response_text + + async def manual_step_run(self, name, args): + """ + Manually run a step (typically after a generation, like a validation). + """ + result = await self.client.call_tool(name, args) + if hasattr(result, "content") and isinstance(result.content, list): + content = result.content[0].text + else: + content = str(result) + + # probably need to harden this more. + was_error = True if "❌" in content or "Error" in content else False + self.record_step(name, args, content) + self.ui.on_step_update(content) + return was_error, content + + async def call_tools(self, calls): + """ + call tools. + """ + tool_outputs = [] + for call in calls: + t_name = call["name"] + t_args = call["args"] + t_id = call.get("id") + logger.info(f"🛠️ Calling: {t_name}") + has_error = False + + try: + result = await self.client.call_tool(t_name, t_args) + content = result.content[0].text if hasattr(result, "content") else str(result) + except Exception as e: + content = f"❌ ERROR: {e}" + has_error = True + self.record_step(t_name, t_args, content) + self.ui.on_step_update(content) + tool_outputs.append({"id": t_id, "name": t_name, "content": content}) + # Return content (with error) early if we generated one + if has_error: + return has_error, content, tool_outputs + return has_error, content, tool_outputs + + async def run_llm_loop(self, instruction, step, context) -> str: + """ + Process -> Tool -> Process loop. + We need to return on some state of success or ultimate failure. 
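+
+        Each iteration sends the current instruction to the backend. If the step
+        defines a `validate` tool, the response is parsed and that tool is run
+        directly; otherwise any requested tool calls are executed. Errors are
+        folded back into the next instruction until success or max_loops.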
+ """ + max_loops = context.get("max_loops", 15) + loops = 0 + use_tools = step.validate in [None, ""] + print(f"Using tools? {use_tools}") + while loops < max_loops: + + loops += 1 + print(f"Calling {instruction}") + response, reason, calls = self.backend.generate_response( + prompt=instruction, use_tools=use_tools + ) + + # Reset tool outputs + tool_outputs = None + + # These are here for debugging now. + if reason: + print("🧠 Thinking:") + self.ui.log(reason) + if response: + print("🗣️ Response:") + self.ui.log(response) + print("🔨 Tools available:") + self.ui.log(self.backend.tools_schema) + if calls: + print("📞 Calls requested:") + self.ui.log(calls) + + # We validate OR allow it to call tools. + # Validate is a manual approach for LLMs that suck at following instructions + if step.validate: + + # The response may be nested in a code block. + err, args = self.get_code_block(response) + if err: + print(f" Error parsing code block from response: {err}") + instruction = prompts.was_format_error_prompt(response) + continue + + err, response = await self.manual_step_run(step.validate, args) + if err: + instruction = prompts.was_error_prompt(response) + continue + + # Call tools - good luck. + elif not calls: + logger.info("🛑 Agent finished (No tools called).") + break + + else: + has_error, response, tool_outputs = await self.call_tools(calls) + if has_error: + instruction = prompts.was_error_prompt(response) + continue + + # If we are successful, give back to llm to summarize (and we need to act on its response) + print("Calling again...") + response, reason, calls = self.backend.generate_response(tool_outputs=tool_outputs) + if response: + self.ui.log(f"🤖 Thought:\n{response}") + + return response + + def get_code_block(self, response, code_type=None): + """ + Get a code block from the response. + """ + # If we already have dict, we are good. + if isinstance(response, dict): + return None, response + + # Try to json load if already have string + # Models are adding extra newlines where they shouldn't be... + try: + if isinstance(response, str): + return None, json.loads(response) + except Exception as e: + return str(e), None + + # We might have a code block nested in other output + try: + return None, json.loads(utils.get_code_block(response, code_type)) + except Exception as e: + return str(e), None + + async def fetch_persona(self, prompt_name: str, arguments: dict) -> str: + """ + Asks the MCP Server to render the prompt template. + + This is akin to rendering or fetching the person. E.g., "You are X and + here are your instructions for a task." + """ + self.ui.log(f"📥 Persona: {prompt_name}") + prompt_result = await self.client.get_prompt(name=prompt_name, arguments=arguments) + # MCP Prompts return a list of messages (User/Assistant/Text). + # We squash them into a single string for the instruction. + msgs = [] + for m in prompt_result.messages: + if hasattr(m.content, "text"): + msgs.append(m.content.text) + else: + msgs.append(str(m.content)) + + instruction = "\n\n".join(msgs) + + # Set the prompt if we have a ui for it. + if self.ui and hasattr(self.ui, "on_set_prompt"): + self.ui.on_set_prompt(instruction) + + return instruction + + def record_step(self, tool, args, output): + """ + Record step metadata. 
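+        Each entry stores the tool name, arguments, a truncated output snippet,
+        and a timestamp.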
+ TODO: refactor this into metadata registry (decorator) + """ + if "steps" not in self.metadata: + self.metadata["steps"] = [] + self.metadata["steps"].append( + { + "tool": tool, + "args": args, + "output_snippet": str(output)[:200], + "timestamp": time.time(), + } + ) + + def record_usage(self, duration): + """ + Record token usage. + TODO: refactor this into metadata registry (decorator) + """ + if hasattr(self.backend, "token_usage"): + usage = self.backend.token_usage + self.metadata["llm_usage"].append( + { + "duration": duration, + "prompt": usage.get("prompt_tokens", 0), + "completion": usage.get("completion_tokens", 0), + } + ) + + def run_step(self, context, step): + """ + Run step is called from the Agent run (base class) + It's here so we can asyncio.run the thing! + """ + try: + final_result = asyncio.run(self.execute(context, step)) + print("final result") + import IPython + + IPython.embed() + context.result = final_result + except Exception as e: + context["error_message"] = str(e) + logger.error(f"Agent failed: {e}") + raise e + return context diff --git a/fractale/agent/backends/__init__.py b/fractale/agent/backends/__init__.py new file mode 100644 index 0000000..231037d --- /dev/null +++ b/fractale/agent/backends/__init__.py @@ -0,0 +1,24 @@ +BACKENDS = {} + +# Attempt import of each +# This is ugly, but it works! +try: + from .gemini import GeminiBackend + + BACKENDS["gemini"] = GeminiBackend +except ImportError: + pass + +try: + from .openai import OpenAIBackend + + BACKENDS["openai"] = OpenAIBackend +except ImportError: + pass + +try: + from .llama import LlamaBackend + + BACKENDS["llama"] = LlamaBackend +except ImportError: + pass diff --git a/fractale/agent/backends/gemini.py b/fractale/agent/backends/gemini.py new file mode 100644 index 0000000..f0be7e4 --- /dev/null +++ b/fractale/agent/backends/gemini.py @@ -0,0 +1,89 @@ +import os +from typing import Any, Dict, List + +from .llm import LLMBackend + + +class GeminiBackend(LLMBackend): + def __init__(self, model_name="gemini-1.5-pro"): + """ + Init Gemini! We can try the newer one (3.0) when we test. + """ + # Don't import unless we are actually using. + import google.generativeai as genai + + self.genai = genai + try: + genai.configure(api_key=os.environ["GEMINI_API_KEY"]) + # I'm allowing this for now because I don't have a working one... + except KeyError: + print("❌ GEMINI_API_KEY missing.") + self.model_name = model_name + self.chat = None + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Convert MCP tools to Gemini Format + """ + gemini_tools = [] + for tool in mcp_tools: + gemini_tools.append( + { + "name": tool.name.replace("-", "_"), # Gemini hates dashes + "description": tool.description, + "parameters": tool.inputSchema, + } + ) + + model = self.genai.GenerativeModel(self.model_name, tools=gemini_tools) + self.chat = model.start_chat(enable_automatic_function_calling=False) + + def generate_response(self, prompt: str = None, tool_outputs: List[Dict] = None): + """ + Generate Gemini response. + + This is currently setup as a chat - we need to make sure we can do a one-off + message (no memory or bias). 
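+
+        Returns a (text, reasoning, tool_calls) tuple so the agent loop can treat
+        every backend uniformly.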
+ """ + response = None + + # Sending tool outputs (previous turn was a function call) + if tool_outputs: + parts = [] + for output in tool_outputs: + parts.append( + self.genai.protos.Part( + function_response=genai.protos.FunctionResponse( + name=output["name"].replace("-", "_"), + response={"result": output["content"]}, + ) + ) + ) + response = self.chat.send_message(self.genai.protos.Content(parts=parts)) + + # Sending new text + elif prompt: + response = self.chat.send_message(prompt) + + # Extract Logic + self._usage = { + "prompt_tokens": response.usage_metadata.prompt_token_count, + "completion_tokens": response.usage_metadata.candidates_token_count, + } + + part = response.candidates[0].content.parts[0] + text_content = response.text if not part.function_call else "" + + tool_calls = [] + if part.function_call: + fc = part.function_call + tool_calls.append( + {"name": fc.name.replace("_", "-"), "args": dict(fc.args)} # Map back to MCP + ) + + return text_content, msg.reasoning_content, tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/backends/llama.py b/fractale/agent/backends/llama.py new file mode 100644 index 0000000..550dddc --- /dev/null +++ b/fractale/agent/backends/llama.py @@ -0,0 +1,113 @@ +import json +import os +from typing import Any, Dict, List + +import fractale.agent.backends.prompts as prompts + +from .llm import LLMBackend + + +class LlamaBackend(LLMBackend): + """ + Backend for Meta Llama 3.1+ models. + """ + + def __init__(self, model_name="meta-llama/Llama-3.3-70B-Instruct"): + base_url = os.environ.get("LLAMA_BASE_URL", "http://localhost:11434/v1") + api_key = os.environ.get("LLAMA_API_KEY", "ollama") + self.disable_history = os.environ.get("LLAMA_DISABLE_HISTORY") is not None + + # self.client = AsyncOpenAI(base_url=base_url, api_key=api_key) + import openai + + self.client = openai.OpenAI(base_url=base_url, api_key=api_key) + self.model_name = os.environ.get("LLAMA_MODEL") or model_name + + self.history = [] + self.tools_schema = [] + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Llama 3.1 follows the OpenAI Tool Schema standard. + + TODO: vsoch see if we can consolidate with OpenAI base when testing. + """ + self.tools_schema = [] + for tool in mcp_tools: + self.tools_schema.append( + { + "type": "function", + "function": { + "name": tool.name, # Llama handles dashes fine + "description": tool.description, + "parameters": tool.inputSchema, + }, + } + ) + + def generate_response( + self, prompt: str = None, tool_outputs: List[Dict] = None, use_tools=True + ): + """ + Manage history and call Llama. + """ + if prompt: + # llama does better with system prompt. + if not self.history: + self.history.append( + { + "role": "system", + "content": prompts.with_tools if use_tools else prompts.without_tools, + } + ) + # We have to add this to history, the main prompt. + self.history.append({"role": "user", "content": prompt}) + + # Handle tool outputs (Function results) + if tool_outputs and use_tools and not self.disable_history: + for out in tool_outputs: + self.history.append( + { + "role": "tool", + # Required for the conversation graph + "tool_call_id": out["id"], + "content": str(out["content"]), + } + ) + + # Derive choice and options - we can add additional filters here. 
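+        # select_tools() returns an empty dict when tools are disabled, otherwise
+        # the tools schema plus a tool_choice setting.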
+ tool_args = self.select_tools(use_tools) + try: + response = self.client.chat.completions.create( + model=self.model_name, messages=self.history, **tool_args + ) + except Exception as e: + return f"LLAMA API ERROR: {str(e)}", "", [] + + print(f"Response {response}") + msg = response.choices[0].message + if response.usage: + self._usage = dict(response.usage) + + # Store history and get text content ONLY if not disabled. + if not self.disable_history: + self.history.append(msg) + text_content = msg.content or "" + + tool_calls = [] + if msg.tool_calls: + for tc in msg.tool_calls: + tool_calls.append( + { + "id": tc.id, + "name": tc.function.name, + "args": json.loads(tc.function.arguments), + } + ) + + return text_content, msg.reasoning_content, tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/backends/llm.py b/fractale/agent/backends/llm.py new file mode 100644 index 0000000..5332aae --- /dev/null +++ b/fractale/agent/backends/llm.py @@ -0,0 +1,60 @@ +import json +from abc import ABC, abstractmethod +from typing import Any, Dict, List + + +class LLMBackend(ABC): + """ + Abstract interface for any LLM provider (Gemini, OpenAI, Llama, Local). + """ + + @abstractmethod + async def initialize(self, mcp_tools: List[Any]): + """ + Convert MCP tools to provider-specific format and setup session. + """ + pass + + def ensure_json(self, response): + """ + Require an LLM to return json. + """ + while True: + try: + return json.loads(response) + except Exception as e: + prompt = f"Your response {response} was not valid json: {e}" + response, _, _ = self.backend.generate_response(prompt=prompt) + + def select_tools(self, use_tools=True): + """ + Clean logic to decide to use a tool or not. + """ + if not use_tools: + return {} + + # TODO: we could apply more filters here. + tool_schema = self.tool_schema + tool_choice = "auto" + print(f"Tool choice: {tool_choice}") + print(f"Tool schema: {tools_schema}") + return {"tool_choice": tool_choice, "tools": tool_schema} + + @abstractmethod + def generate_response( + self, prompt: str = None, tool_outputs: List[Dict] = None, use_tools=True + ): + """ + Returns a text_response, tool_calls + """ + pass + + @property + @abstractmethod + def token_usage(self) -> Dict: + """ + Return token stats for metadata stuffs. + + Note from V: we need a more robust provenance tracking thing. + """ + pass diff --git a/fractale/agent/backends/openai.py b/fractale/agent/backends/openai.py new file mode 100644 index 0000000..d620974 --- /dev/null +++ b/fractale/agent/backends/openai.py @@ -0,0 +1,90 @@ +import os +from typing import Any, Dict, List + +from rich import print + +from .llm import LLMBackend + + +class OpenAIBackend(LLMBackend): + """ + Backend to use OpenAI + """ + + def __init__(self, model_name="gpt-5-mini"): #model_name="openai/gpt-oss-120b"): + # Needs to be tested if base url is None. + # Switch to async if/when needed. Annoying for development + # self.client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL")) + import openai + + self.client = openai.OpenAI( + api_key=os.environ.get("OPENAI_API_KEY"), base_url=os.environ.get("OPENAI_BASE_URL") + ) + self.model_name = model_name + self.history = [] + self.tools_schema = [] + self._usage = {} + + async def initialize(self, mcp_tools: List[Any]): + """ + Tell this jerk about all the MCP tools. 
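+
+        MCP tools are converted to the OpenAI function-calling schema (a
+        "function" entry with name, description, and JSON schema parameters).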
+ """ + self.tools_schema = [] + for tool in mcp_tools: + self.tools_schema.append( + { + "type": "function", + "function": { + "name": tool.name, + "description": tool.description, + "parameters": tool.inputSchema, + }, + } + ) + + def generate_response( + self, prompt: str = None, tool_outputs: List[Dict] = None, use_tools=True + ): + """ + Generate the response and update history. + """ + if prompt: + self.history.append({"role": "user", "content": prompt}) + if tool_outputs: + for out in tool_outputs: + self.history.append( + {"role": "tool", "tool_call_id": out["id"], "content": str(out["content"])} + ) + + default_tool_choice = "auto" if self.tools_schema else None + tool_choice = True if use_tools else default_tool_choice + tools_schema = self.tools_schema if tool_choice else None + response = self.client.chat.completions.create( + model=self.model_name, + messages=self.history, + tools=tools_schema, + tool_choice=tool_choice, + ) + print(response) + msg = response.choices[0].message + + # Save assistant reply to history + self.history.append(msg) + self._usage = dict(response.usage) + + tool_calls = [] + if msg.tool_calls: + for tc in msg.tool_calls: + tool_calls.append( + { + "id": tc.id, + "name": tc.function.name, + "args": json.loads(tc.function.arguments), + } + ) + + return msg.content, "", tool_calls + + @property + def token_usage(self): + return self._usage diff --git a/fractale/agent/backends/prompts.py b/fractale/agent/backends/prompts.py new file mode 100644 index 0000000..8f75865 --- /dev/null +++ b/fractale/agent/backends/prompts.py @@ -0,0 +1,4 @@ +with_tools = ( + "You are a helpful assistant with access to tools. You must use them to answer questions." +) +without_tools = "You are a helpful assistant without access to tools. You must answer questions." diff --git a/fractale/agent/base.py b/fractale/agent/base.py new file mode 100644 index 0000000..238ab48 --- /dev/null +++ b/fractale/agent/base.py @@ -0,0 +1,115 @@ +import copy +import os +import time +from typing import Any, Dict + +from fractale.logger import logger +from fractale.ui.adapters.cli import CLIAdapter + + +class Agent: + """ + Base Agent Infrastructure. + Handles caching, retry counters, metadata logging, and error wrappers. + Does NOT know about MCP or LLMs. + """ + + # Variables to clear on retry + state_variables = ["result", "error_message"] + + def __init__( + self, + name: str = "agent", + use_cache: bool = False, + results_dir: str = None, + save_incremental: bool = False, + max_attempts: int = None, + ui=None, + ): + self.name = name + self.attempts = 0 + self.max_attempts = max_attempts + + self.results_dir = results_dir or os.getcwd() + self.save_incremental = save_incremental + + # Initialize Metadata tracking + self.init_metadata() + self.init() + self.ui = ui or CLIAdapter() + + def init(self): + """ + Init operations, intended to override in subclass. + """ + pass + + def init_metadata(self): + self.metadata = { + "name": self.name, + "times": {}, + "assets": {}, + "failures": [], + # TODO: likely we want to replace this with the metadata registry. 
+ "counts": {"retries": 0, "return_to_manager": 0, "return_to_human": 0}, + "llm_usage": [], + } + + def run(self, context, step): + """ + Main execution wrapper + """ + # Ensure max_attempts is set + context["max_attempts"] = self.max_attempts or context.get("max_attempts") + logger.info(f"▶️ Running {self.name}...") + start_time = time.time() + + try: + # Call abstract method + context = self.run_step(context, step) + + finally: + duration = time.time() - start_time + self.metadata["times"]["execution"] = duration + + return context + + def run_step(self, context, step): + """ + Abstract: Implemented by MCPAgent + """ + raise NotImplementedError(f"Agent {self.name} missing run_step") + + def reset_context(self, context): + """ + Clears output variables to prepare for a retry. + """ + # Convert to dict if it's a Context object + is_obj = hasattr(context, "data") + data = context.data if is_obj else context + + # Clear state variables + for key in self.state_variables: + if key in data: + del data[key] + + # Archive current metadata into failures list + if "failures" not in self.metadata: + self.metadata["failures"] = [] + + # Snapshot current metadata + self.metadata["failures"].append(copy.deepcopy(self.metadata)) + + # Reset current counters (keep retries count consistent) + current_retries = self.metadata["counts"]["retries"] + self.init_metadata() + self.metadata["counts"]["retries"] = current_retries + return context + + def reached_max_attempts(self): + """ + Return true if we have reached maximum number of attempts. + """ + if not self.max_attempts: + return False + return self.attempts >= self.max_attempts diff --git a/fractale/agent/context.py b/fractale/agent/context.py new file mode 100644 index 0000000..f11cebb --- /dev/null +++ b/fractale/agent/context.py @@ -0,0 +1,98 @@ +import collections + +import fractale.agent.logger as logger + + +def get_context(context): + """ + Get or create the context. + """ + if isinstance(context, Context): + return context + return Context(context) + + +class Context(collections.UserDict): + """ + A custom dictionary that allows attribute-style access to keys, + and extends the 'get' method with a 'required' argument. + + The context for an agent should be populated with metadata that + needs to move between agents. The manager decides what from the + context to pass to agents for an updated context. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def reset(self): + """ + Reset the return code and result. + """ + for key in ["return_code", "result", "error_message"]: + self.data[key] = None + + def is_managed(self): + """ + Is the context being managed? + """ + return self.get("managed") is True + + def __getattribute__(self, name): + """ + Intercepts all attribute lookups (including methods/functions) + """ + try: + # Step 1: this would be a normal attribute + attr = object.__getattribute__(self, name) + except AttributeError: + # Then handle lookup of dict key by attribute + return super().__getattribute__(name) + + # Step 2: We allow "get" to be called with defaults / required. + if name == "get": + original_get = attr + + def custom_get(key, default=None, required=False): + """ + Wrapper for the standard dict.get() method. + Accepts the custom 'required' argument. 
+ """ + if required: + if key not in self.data: + raise ValueError(f"Key `{key}` is required but missing") + logger.exit(f"Key `{key}` is required but missing", title="Context Status") + + # If required and found, just return the value + return self.data[key] + else: + # If not required, use the original dict.get behavior + return original_get(key, default) + + # Return the wrapper function instead of the original method + return custom_get + + # 4. For any other attribute (like keys(), items(), update(), or custom methods) + # return the attribute we found earlier + return attr + + # 5. Override __getattr__ to handle attribute-style access to dictionary keys + def __getattr__(self, name): + """ + Allows access to dictionary keys as attributes. + """ + if name in self.data: + return self.data[name] + raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'") + + def __setattr__(self, name, value): + """ + Allows setting keys via attribute assignment. + """ + # If the attribute name is a reserved name (like 'data'), set it normally + if name in ("data", "_data"): + super().__setattr__(name, value) + + # Otherwise, treat it as a dictionary key + else: + self.data[name] = value diff --git a/fractale/agent/defaults.py b/fractale/agent/defaults.py new file mode 100644 index 0000000..d774c90 --- /dev/null +++ b/fractale/agent/defaults.py @@ -0,0 +1,18 @@ +environment = "generic cloud environment" +gemini_model = "gemini-2.5-pro" +mcp_port = "8089" + +# These are common / default args we don't need to give in any prompt. +shared_args = { + "command", + "config_dir", + "debug", + "error_message", + "incremental", + "outfile", + "plan", + "quiet", + "result", + "results", + "version", +} diff --git a/fractale/agent/logger.py b/fractale/agent/logger.py new file mode 100644 index 0000000..a560136 --- /dev/null +++ b/fractale/agent/logger.py @@ -0,0 +1,64 @@ +import sys + +from rich import print +from rich.panel import Panel + + +def success(message, title="Success", border_style="green", expand=True): + """ + Helper function to print successful message. + """ + print( + Panel( + f"[bold green]✅ {message}[/bold green]", + title=title, + border_style=border_style, + expand=expand, + ) + ) + + +def error(message, title="Error", border_style="red", expand=True): + """ + Helper function to print error "beep boop" message. + """ + print( + Panel( + f"[bold red]❌ {message}[/bold red]", + title=title, + border_style=border_style, + expand=expand, + ) + ) + + +def exit(message, title="Error", border_style="red", expand=True): + error(message, title, border_style, expand) + sys.exit(-1) + + +def warning(message, title="Warning", border_style="yellow"): + """ + Helper function to print a warning + """ + print( + Panel( + message, + title=f"[yellow]{title}[/yellow]", + border_style=border_style, + ) + ) + + +def custom(message, title, border_style=None, expand=True): + """ + Custom message / title Panel. 
+ """ + if not border_style: + print(Panel(message, title=title, expand=expand)) + else: + print(Panel(message, title=title, border_style=border_style, expand=expand)) + + +def info(message): + print(f"\n[bold cyan] {message}[/bold cyan]") diff --git a/fractale/agent/manager/__init__.py b/fractale/agent/manager/__init__.py new file mode 100644 index 0000000..cf32844 --- /dev/null +++ b/fractale/agent/manager/__init__.py @@ -0,0 +1 @@ +from .agent import ManagerAgent diff --git a/fractale/agent/manager/agent.py b/fractale/agent/manager/agent.py new file mode 100644 index 0000000..f0c97d2 --- /dev/null +++ b/fractale/agent/manager/agent.py @@ -0,0 +1,354 @@ +import asyncio +import json +import os +from datetime import datetime + +from rich import print +from rich.prompt import Prompt as RichPrompt + +import fractale.agent.logger as logger +import fractale.agent.manager.prompts as prompts +import fractale.utils as utils +from fractale.agent.agent import MCPAgent +from fractale.agent.context import get_context +from fractale.agent.manager.plan import Plan +from fractale.utils.timer import Timer + +# The manager IS an agent itself since it needs to decide how to recover. + + +class ManagerAgent(MCPAgent): + """ + An LLM-powered orchestrator that executes a plan. + It acts as a supervisor: + 1. Validates the plan against the server. + 2. Dispatches steps to UniversalAgents. + 3. Handles failure via LLM reasoning OR Human intervention. + """ + + def init(self): + """ + Initialize the MCPAgent infrastructure (MCP Client + Backend). + """ + # This sets up the MCP connection and LLM Backend + super().init() + + async def validate_personas(self, plan): + """ + Queries the MCP server to ensure all personas in the plan exist. + A persona == a prompt. Each step has an associated prompt (persona). + """ + # Attempt to list prompts from server + server_prompts = await self.client.list_prompts() + schema_map = {} + + # FastMCP (bottom) vs Standard MCP (top) return types + if hasattr(server_prompts, "prompts"): + prompts_list = server_prompts.prompts + else: + prompts_list = server_prompts + + # We do this once so later we can call prompt functions and know args vs. context + for p in prompts_list: + # Store set of valid argument names + args = {arg.name for arg in p.arguments} if p.arguments else set() + schema_map[p.name] = args + + print(f"🔎 Validating {len(plan)} steps...") + for step in plan.agents: + if step.prompt not in schema_map: + raise ValueError( + f"❌ Plan Validation Failed: Step '{step.name}' requests persona '{step.prompt}', " + f"but server only has: {schema_map.keys()}" + ) + # Ensure we separate arguments from extra + # This does not check to see if we have required, since they + # might come from a previous step. + step.set_schema(schema_map[step.prompt]) + + # store in the manager metadata so steps can access it + self.metadata["assets"]["prompt_schemas"] = schema_map + print("✅ Personas validated and schemas cached") + + def get_recovery_step(self, context, failed_step, plan): + """ + Uses the LLM Backend to decide which agent to call to fix an error. + """ + # We describe each step (akin to a function) for the manager to choose + descriptions = "" + for step in plan.agents: + descriptions += f"- {step.agent}: {step.description}\n" + if step.agent == failed_step.agent: + break + + # Build the prompt to recover from some failure. 
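+        # The recovery prompt interpolates the step descriptions, the name of the
+        # failed agent, and the error message, and asks for a single JSON object.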
+ prompt_text = prompts.recovery_prompt % ( + descriptions, + failed_step.agent, + context.error_message, + ) + logger.warning( + f"🤔 Consulting Manager for recovery from {failed_step.agent} failure...", + title="Error Triage", + ) + + # TODO: test and make more resilient if needed + next_step = None + while not next_step: + + # Use the backend directly - get back tuple (text, calls) + # Note I'm trying to do these NOT async because it's easier to debug + response_text = self.backend.generate_response(prompt=prompt_text)[0] + next_step = json.loads(utils.get_code_block(response_text, "json")) + + # Validate - we require the agent name and description + if ( + "agent_name" not in step_json + or "task_description" not in next_step + or "reason" not in next_step + ): + raise ValueError("Missing keys") + if step_json["agent_name"] not in plan.agent_names: + raise ValueError(f"Unknown agent {step_json['agent_name']}") + + agent_name = recovery_step["agent_name"] + logger.warning(f"Recovering to agent: [bold cyan]{agent_name}[/bold cyan]") + + # Find index of target agent + found_index = -1 + for idx, ag in enumerate(plan.agents): + if ag.name == agent_name: + found_index = idx + break + + next_step["index"] = found_index + + # Update recovery metadata with choice. + if failed_step.name not in self.metadata["assets"]["recovery"]: + self.metadata["assets"]["recovery"][failed_step.name] = [] + self.metadata["assets"]["recovery"][failed_step.name].append(next_step) + + return next_step + + def check_personas(self, plan, personas): + """ + Ensure that the prompt (persona) requested by each step is one + known to the MCP server. + """ + for step in plan.agents: + if step.prompt not in persons: + raise ValueError( + f"Unknown persona {step.prompt} in step {step.name}. Available: {personas}" + ) + + def run(self, context): + """ + Executes a plan-driven workflow with intelligent error recovery. + """ + # Ensure context is wrapped + context = get_context(context) + + # Init metadata if needed + if "recovery" not in self.metadata["assets"]: + self.metadata["assets"]["recovery"] = {} + + context.managed = True + self.max_attempts = self.max_attempts or 10 + + # Plan parses the list of agent configs (prompts) + plan_path = context.get("plan", required=True) + plan = Plan(plan_path, save_incremental=self.save_incremental) + + # Connect and validate (don't allow connect without validate) + asyncio.run(self.connect_and_validate(plan)) + + # Still pass the shared context to all tasks + try: + tracker = self.run_tasks(context, plan) + self.metadata["status"] = "Succeeded" + self.save_results(tracker, plan) + logger.custom( + f"Workflow Complete. {len(tracker)} steps executed.", + title="[bold green]Success[/bold green]", + ) + return tracker + + except Exception as e: + self.metadata["status"] = "Failed" + logger.error(f"Orchestration failed: {e}", title="Failure") + raise e + + async def connect_and_validate(self, plan): + """ + Setup client and check prompts. + """ + async with self.client: + # Check if server has the prompts we need + await self.validate_personas(plan) + + # Initialize our backend LLM with the available tools + mcp_tools = await self.client.list_tools() + await self.backend.initialize(mcp_tools) + + def run_tasks(self, context, plan): + """ + Run agent tasks until stopping condition. 
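+
+        Steps run in order. When a step fails, the user is asked to retry, assist
+        with a hint, hand off to automatic recovery, or quit.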
+ """ + tracker = [] + timer = Timer() + current_step_index = 0 + + # Initialize recovery history + if "recovery" not in self.metadata["assets"]: + self.metadata["assets"]["recovery"] = {} + + # Global Manager Loop + while current_step_index < len(plan): + + # This is an instance of MCPAgent + step = plan[current_step_index] + inputs = step.get("inputs", {}) + self.ui.on_step_start(step.name, step.description, inputs) + + # instantiate the agent here. If we need/want, we can cache the + # initial envars (credentials) to not need to discover them again. + agent = MCPAgent( + name=step.name, + save_incremental=plan.save_incremental, + max_attempts=step.max_attempts, + # Pass the UI down so the agent uses same interface + ui=self.ui, + ) + # Update step context + context = step.update_context(context) + + # Execute the step. This is akin to a tiny state machine + # The agent (persona prompt + LLM) is making calls to MCP tools + # Agent -> run is a wrapper to agent.run_step. + with timer: + context = agent.run(context, step) + + # Results, error, and metadata + result = context.get("result") + error = context.get("error_message") + metadata = agent.metadata + + # update the accordion header color and shows the result/error box + self.ui.on_step_finish(step.name, result, error, metadata) + + # Record metrics + # Note: step_agent.metadata is populated by the agent + tracker.append( + { + "agent": step.name, + "duration": timer.elapsed_time, + "result": result, + "error": error, + "attempts": self.attempts, + "metadata": metadata, + } + ) + + # If we have a result and no error message, success. + if result and not error: + current_step_index += 1 + context.reset() + continue + + # Check global manager limits + if self.reached_max_attempts(): + self.ui.on_log("Manager reached max attempts. Aborting.", level="error") + break + self.attempts += 1 + + # Always ask user if an entire step fails. + action = self.ui.ask_user( + f"Step Failed: {err}\nAction?", options=["retry", "assist", "auto", "quit"] + ) + action = action.lower().strip() or "auto" + + # Respond to the action + if action == "quit": + break + elif action == "retry": + context = self.reset_context(context, plan, step) + continue + + # Human in the loop! Ask for a hint and add to error message + elif action == "assist": + hint = self.ui.ask_user("Enter instructions for the agent") + context["error_message"] = f"Previous Error: {error}\nUser Advice: {hint}" + context = self.reset_context(context, plan, step) + continue + + # If we get down here (auto) we ask the manager for a recovery step. + elif action == "auto": + + # If we failed the first step, just try again. + if current_step_index == 0: + context = self.reset_context(context, plan) + continue + + # Otherwise ask the manager to choose. 
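+                # The recovery step carries the chosen agent, a reason, and the
+                # index of the step to roll back to.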
+ recovery_step = self.get_recovery_step(context, agent, plan) + index = recovery_step["index"] + if not recovery_step: + self.ui.on_log("Manager could not determine recovery.", level="error") + break + + if index == -1: + self.ui.on_log(f"Recovery agent {target_name} not found!", level="error") + break + + # Reset context up to that point + current_step_index = index + context = self.reset_context(context, plan, plan[current_step_index]) + context["error_message"] = prompts.get_retry_prompt( + context, recovery_step["reason"] + ) + continue + + if current_step_index == len(plan): + self.metadata["status"] = "Succeeded" + self.ui.on_workflow_complete("Success") + else: + self.metadata["status"] = "Failed" + self.ui.on_workflow_complete("Failed") + return tracker + + def save_results(self, tracker, plan): + """ + Save results to file based on timestamp. + """ + if not os.path.exists(self.results_dir): + os.makedirs(self.results_dir) + now = datetime.now() + timestamp = now.strftime("%Y-%m-%d_%H-%M-%S") + results_file = os.path.join(self.results_dir, f"results-{timestamp}.json") + + # We assume plan has a .plan attribute or similar to get raw dict + manager_meta = getattr(plan, "plan", {}) + + if self.metadata["times"]: + manager_meta["times"] = self.metadata["times"] + if self.metadata["assets"]["recovery"]: + manager_meta["recovery"] = self.metadata["assets"]["recovery"] + + result = {"steps": tracker, "manager": manager_meta, "status": self.metadata["status"]} + utils.write_json(result, results_file) + + def reset_context(self, context, plan, failed_step=None): + """ + Reset context state variables up to the failed step. + """ + # We iterate through agents and call their reset logic + for step in plan.agents: + context = step.reset_context(context) + + # If we reached the step we are rolling back to, stop clearing. + if failed_step is not None and step.name == failed_step.name: + break + return context + + def reached_max_attempts(self): + return self.attempts >= self.max_attempts diff --git a/fractale/agent/manager/plan.py b/fractale/agent/manager/plan.py new file mode 100644 index 0000000..d6ff2b9 --- /dev/null +++ b/fractale/agent/manager/plan.py @@ -0,0 +1,208 @@ +import jsonschema +from jsonschema import validators +from rich import print + +import fractale.utils as utils + + +def set_defaults(validator, properties, instance, schema): + """ + Fill in default values for properties that are missing. + """ + for prop, sub_schema in properties.items(): + if "default" in sub_schema: + instance.setdefault(prop, sub_schema["default"]) + + +# Extend validator to apply defaults +plan_validator = validators.extend( + jsonschema.Draft7Validator, + {"properties": set_defaults}, +) + +plan_schema = { + "type": "object", + "properties": { + "name": {"type": "string"}, + "description": {"type": "string"}, + "inputs": {"type": "object", "default": {}}, + "plan": { + "type": "array", + "minItems": 1, + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + # This defines the (previous) agent or persona/role + "prompt": {"type": "string"}, + "description": {"type": "string"}, + "inputs": { + "type": "object", + "additionalProperties": True, + }, + }, + "required": ["name", "prompt"], + }, + }, + }, + "required": ["name", "plan"], +} + + +class Plan: + """ + A plan for a manager includes one or more steps, each defined by a Prompt Persona. 
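+
+    The plan is validated against a JSON schema on load; personas (prompts) are
+    validated against the MCP server by the manager.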
+ """ + + def __init__(self, plan, save_incremental=False): + if isinstance(plan, dict): + self.plan = plan + self.plan_path = "memory" + else: + self.plan_path = plan + self.plan = utils.read_yaml(self.plan_path) + + self.step_names = set() + self.save_incremental = save_incremental + + # Validate structure... + self.validate_schema() + + # And create steps. Personas validated outside of here with manager. + self.load() + + def validate_schema(self): + """ + Validate against JSON schema. + """ + validator = plan_validator(plan_schema) + try: + validator.validate(self.plan) + print("✅ Plan schema is valid.") + except Exception as e: + raise ValueError(f"❌ Plan YAML invalid: {e}!") + + def load(self): + """ + Initialize the steps in the plan. + """ + print(f"Loading plan from [bold magenta]{self.plan_path}[/bold magenta]...") + self.agents = [] # We refer to steps as agents (v1 of this library) + + for spec in self.plan.get("plan", []): + step_name = spec["name"] + + if step_name in self.step_names: + raise ValueError(f"Duplicate step name: '{step_name}'") + self.step_names.add(step_name) + + step = Step(spec, save_incremental=self.save_incremental) + self.agents.append(step) + + def __len__(self): + return len(self.agents) + + def __getitem__(self, key): + """ + Allows indexing plan[0] + """ + return self.agents[key] + + @property + def agent_names(self): + """ + Used by Manager for recovery lookup. + """ + return [step.name for step in self.agents] + + +class Step: + """ + Wraps a specific execution step. + Holds configuration + schema. + """ + + def __init__(self, spec, save_incremental=False): + self.spec = spec + self.save_incremental = save_incremental + self._prompt_args = set() + self.attempts = 0 + + def set_schema(self, valid_args: set): + """ + Called by Manager to define which arguments the Server Prompt accepts. + """ + self._prompt_args = valid_args + + def partition_inputs(self, full_context: dict) -> tuple[dict, dict]: + """ + Splits context into: + 1. Direct Arguments for the MCP Prompt function. + 2. Supplemental Context for the LLM. + """ + # Only update inputs that aren't already defined for the context + full_context = self.update_context(full_context) + + # Fallback if schema missing + if self._prompt_args is None: + return full_context, {} + + prompt_args = {} + background_info = {} + + ignored = { + "managed", + "max_loops", + "max_attempts", + "result", + "error_message", + } + + for key, value in full_context.items(): + if key in self._prompt_args: + prompt_args[key] = value + elif key not in ignored: + background_info[key] = value + + # TODO: we may want to supplement prompt with other "background info" + print(prompt_args) + return prompt_args, background_info + + def update_context(self, context): + """ + Merge step-specific inputs into the context. 
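+
+        Keys listed in overrides (max_attempts, llm_provider, llm_model) are
+        never copied from step inputs into the context.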
+ """ + overrides = ["max_attempts", "llm_provider", "llm_model"] + inputs = self.spec.get("inputs", {}) + + for k, v in inputs.items(): + if k not in overrides: + context[k] = v + return context + + @property + def name(self): + return self.spec["name"] + + @property + def max_attempts(self): + return self.spec.get("max_attempts") + + @property + def agent(self): + return self.name + + @property + def prompt(self): + return self.spec["prompt"] + + @property + def validate(self): + return self.spec.get("validate") + + @property + def description(self): + return self.spec.get("description", f"Executes persona: {self.prompt}") + + def get(self, name, default=None): + return self.spec.get(name, default) diff --git a/fractale/agent/manager/prompts.py b/fractale/agent/manager/prompts.py new file mode 100644 index 0000000..321cd02 --- /dev/null +++ b/fractale/agent/manager/prompts.py @@ -0,0 +1,42 @@ +from fractale.agent.prompts import Prompt + +recovery_prompt = f"""You are an expert AI workflow troubleshooter. A step in a workflow has failed and reached a maximum number of retries. This could mean that we need to go back in the workflow and redo work. Your job is to analyze the error and recommend a single, corrective step. The steps, each associated with an agent, you can choose from are as follows: + +Available Agents: +%s + +The above is in the correct order, and ends on the agent that ran last with the failure (%s). The error message of the last step is the following: + +%s + +Your job is to analyze the error message to determine the root cause, and decide which agent is best suited to fix this specific error. +- You MUST formulate a JSON object for the corrective step with two keys: "agent_name" "reason" and "task_description". +- The new "task_description" MUST be a clear instruction for the agent to correct the specific error. +- The "reason" must explain why you chose the step and what needs to be done differently. +- You MUST only provide a single JSON object for the corrective step in your response. +- You MUST format your `task_description` to be a "You MUST" statement to the agent. +""" + +# Same three inputs as above plus the unsuccessful attempt +recovery_error_prompt = recovery_prompt.strip() + " Your last attempt was not successful:\n%s" + +retry_task = """You have had previous attempts, and here is the reason we are retrying this step: +{{ issue }} +""" + +retry_instructions = ["You MUST avoid making these errors again."] + +persona = "You are manager of an agentic AI team." +retry_prompt = { + "persona": persona, + "context": "A step in your workflow has determined it cannot continue and returned to you.", + "instructions": retry_instructions, + "task": retry_task, +} + + +def get_retry_prompt(context, issue): + """ + In testing, this should JUST be the error message. + """ + return Prompt(retry_prompt, context).render({"issue": issue}) diff --git a/fractale/agent/prompts.py b/fractale/agent/prompts.py new file mode 100644 index 0000000..a3016d5 --- /dev/null +++ b/fractale/agent/prompts.py @@ -0,0 +1,61 @@ +import copy + +from jinja2 import Template + +template = """ + Persona: + {{persona}} + + {% if context %}Context: {{context|trim}}{% endif %} + + Task: {{task|trim}} + + {% if instructions %}Instructions & Constraints: + {% for line in instructions %}{{line}} + {% endfor %}{% endif %} +""" + + +def was_format_error_prompt(content): + return f"The previous attempt failed:\n{content}\nYou MUST generate a code block or string that can be parsed into JSON." 
+ + +def was_error_prompt(content): + return ( + f"The previous attempt failed:\n{content}\nPlease regenerate and/or fix inputs and retry." + ) + + +class Prompt: + """ + A prompt is a structured instruction for an LLM. + We attempt to define the persona, context, task, audience, instructions, and constraints. + Data sections should use words MUST, MUST NOT, AVOID, ENSURE + """ + + def __init__(self, data, context): + """ + This currently assumes setting a consistent context for one generation. + If that isn't the case, context should be provided in the function below. + """ + self.data = data + self.context = context + + def render(self, kwargs): + """ + Render the final user task, and then the full prompt. + """ + # The kwargs are rendered into task + render = copy.deepcopy(self.data) + + # Do we have additional details for instrucitons? + try: + render["instructions"] += (self.context.get("details") or "").split("\n") + except: + print("ISSUE WITH RENDER IN GENERIC PROMPTS") + import IPython + + IPython.embed() + render["task"] = Template(self.data["task"]).render(**kwargs) + prompt = Template(template).render(**render) + return prompt diff --git a/fractale/cli/__init__.py b/fractale/cli/__init__.py index fb06e8a..976cac2 100644 --- a/fractale/cli/__init__.py +++ b/fractale/cli/__init__.py @@ -10,8 +10,6 @@ install() import fractale -import fractale.agent.parser as parsers -import fractale.defaults as defaults from fractale.logger import setup_logger @@ -94,24 +92,11 @@ def get_parser(): formatter_class=argparse.RawTextHelpFormatter, description="run an agent", ) - agents = agent.add_subparsers( - title="agent", - description="Run an agent", - ) agent.add_argument( - "--plan", - "-p", - dest="plan", + "plan", help="provide a plan to a manager", ) - # If exists, we will attempt to load and use. - agent.add_argument( - "--use-cache", - dest="use_cache", - help="Use (load and save) local cache in pwd/.fractale/", - action="store_true", - default=False, - ) + agent.add_argument("--mode", choices=["cli", "tui", "web"], default="tui") agent.add_argument( "--max-attempts", help="Maximum attempts for a manager or individual agent", @@ -128,9 +113,6 @@ def get_parser(): action="store_true", default=False, ) - - # Add agent parsers - parsers.register(agents) return parser diff --git a/fractale/cli/agent.py b/fractale/cli/agent.py index 8fbbf3f..dbf2cb0 100644 --- a/fractale/cli/agent.py +++ b/fractale/cli/agent.py @@ -1,37 +1,43 @@ -import sys +import os -from fractale.agent import get_agents +from fractale.agent.manager import ManagerAgent def main(args, extra, **kwargs): """ - Run an agent (do with caution!) + Run an agent workflow. """ - agents = get_agents() - - # If we have a plan, we run the manager. - if args.plan is not None: - args.agent_name = "manager" - - # Right now we only have a build agent :) - if args.agent_name not in agents: - sys.exit(f"{args.agent_name} is not a recognized agent.") - - # Get the agent and run! - # - results determines if we want to save state to an output directory - # - save_incremental will add a metadata section - # - max_attempts is for the manager agent (defaults to 10) - agent = agents[args.agent_name]( - use_cache=args.use_cache, + # Prepare Context from Arguments + context = vars(args) + + # Instantiate Manager (Headless). 
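+    # The base agent falls back to a CLIAdapter until a mode-specific UI is set.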
+ # This will by default select a headless manager, which we will update + manager = ManagerAgent( results_dir=args.results, save_incremental=args.incremental, max_attempts=args.max_attempts, + ui=None, ) - # This is the context - we can remove variables not needed - context = vars(args) - del context["use_cache"] + # 3. Select Interaction Mode + if args.mode == "tui": + # The App takes ownership of the Manager, I'm not sure how else to do it. + # It creates the TextualAdapter internally and runs manager.run() in a thread. + from fractale.ui.adapters.tui import FractaleApp + + app = FractaleApp(manager, context) + app.run() + + # These next two are blocking for UI interactions + elif args.mode == "web": + from fractale.ui.adapters.web import WebAdapter + + manager.ui = WebAdapter(url="http://localhost:3000") + manager.run(context) + + else: + # This is the default mode (ui) + from fractale.ui.adapters.cli import CLIAdapter - # This is built and tested! We can do something with it :) - # Note that vars converts the argparse arguments into a dictionary - agent.run(context) + manager.ui = CLIAdapter() + manager.run(context) diff --git a/fractale/cli/start.py b/fractale/cli/start.py index 77dbfad..3bc6f51 100644 --- a/fractale/cli/start.py +++ b/fractale/cli/start.py @@ -31,8 +31,7 @@ def main(args, extra, **kwargs): print(f"🔌 Loading tools... ") # Load into the manager (tools, resources, prompts) - for tool in manager.load_tools(args.tools): - mcp.add_tool(tool) + for tool in manager.load_tools(mcp, args.tools): print(f" ✅ Registered: {tool.name}") # Mount the MCP server. Note from V: we can use mount with antother FastMCP @@ -50,4 +49,4 @@ def main(args, extra, **kwargs): # For testing we usually control+C, let's not make it ugly except KeyboardInterrupt: - print("🖥️ Shutting down...") + print("🖥️ Shutting down...") diff --git a/fractale/tools/build/docker/prompts.py b/fractale/tools/build/docker/prompts.py new file mode 100644 index 0000000..55a7281 --- /dev/null +++ b/fractale/tools/build/docker/prompts.py @@ -0,0 +1,78 @@ +from fractale.tools.prompts import format_rules + +PERSONA = "You are a Dockerfile build expert." + +CONTEXT = """We are running experiments that deploy containerized HPC applications. +You are the agent responsible for the build step in that pipeline.""" + +REQUIRES = [ + "You MUST NOT change the name of the application container image provided.", + "Don't worry about users/permissions - just be root.", + "DO NOT forget to install certificates and you MUST NOT apt-get purge.", + "Assume a default of CPU if GPU or CPU is not stated.", + "Do NOT do a multi-stage build, and do NOT COPY or ADD anything from the host.", + "You MUST copy executables to a system location to be on the PATH. Do NOT symlink", + "You are only scoped to edit a Dockerfile to build the image.", +] + +COMMON_INSTRUCTIONS = [ + "If the application involves MPI, configure it for compatibility for the containerized environment.", + 'Do NOT add your narration unless it has a "#" prefix to indicate a comment.', +] + REQUIRES + + +def get_build_text(application, environment, build_rules): + """ + Get prompt text for an initial build. + """ + return f""" +### PERSONA +{PERSONA} + +### CONTEXT +{CONTEXT} + +### GOAL +I need to create a Dockerfile for an application '{application}'. +The target environment is '{environment}'. +You MUST generate a response with ONLY Dockerfile content. +You MUST NOT include other text or thinking with your response. 
+You do NOT need to write the Dockerfile to disk, but rather provide to the build tool to handle. +You MUST return a JSON response with a "dockerfile" field. +The dockerfile field MUST be a list of commands, where each entry is a single line. + +### REQUIREMENTS & CONSTRAINTS +You must adhere to these rules strictly: +{format_rules(build_rules)} + +### INSTRUCTIONS +1. Analyze the requirements and generate the Dockerfile content. +2. You MUST generate a json structure with a "dockerfile" +""" + + +def get_retry_prompt(fix_rules, error_message): + return f""" +### PERSONA +{PERSONA} + +### CONTEXT +{CONTEXT} + +### STATUS: BUILD FAILED +Your previous Dockerfile build has failed. Here is the instruction for how to fix it: + +```text +{error_message} +``` + +### REQUIREMENTS + +Please analyze the error and your previous work, and provide a corrected version. +{format_rules(fix_rules)} + +### INSTRUCTIONS +1. Read the error log above carefully. +2. Modify the Dockerfile using your file tools. +3. Use a provided tool to retry the build. +""" diff --git a/fractale/tools/build/docker/tool.py b/fractale/tools/build/docker/tool.py index d642875..2f3c0a4 100644 --- a/fractale/tools/build/docker/tool.py +++ b/fractale/tools/build/docker/tool.py @@ -1,24 +1,22 @@ from fractale.tools.base import BaseTool from fractale.tools.decorator import mcp +import fractale.tools.build.docker.prompts as prompts import fractale.agent.logger as logger import fractale.utils as utils +from fractale.tools.result import Result import shutil import re import os -import sys import shutil import tempfile import subprocess -import textwrap +import shlex from rich import print from rich.syntax import Syntax -name = "docker-build" - class DockerBuildTool(BaseTool): - def setup(self): """ Setup ensures we have docker or podman installed. @@ -29,7 +27,26 @@ def setup(self): if not self.docker: raise ValueError("docker and podman are not present on the system.") - @mcp.tool(name="docker-push") + # @mcp.tool(name="docker-run") + def run_container(self, uri: str, command: str): + """ + Run a docker container. Accepts an optional unique resource identifier (URI). + + uri: the unique resource identifier. + command: string to run (will be shlex split) + """ + # Prepare command to push (docker or podman) + command = [self.docker, "run", "-it", uri] + shlex.split(command) + logger.info(f"Running {command}...") + p = subprocess.run( + command, + capture_output=True, + text=True, + check=False, + ) + return Result(p).render() + + @mcp.tool(name="docker_push") def push_container(self, uri: str, all_tags: bool = False): """ Push a docker container. Accepts an optional unique resource identifier (URI). @@ -53,14 +70,10 @@ def push_container(self, uri: str, all_tags: bool = False): text=True, check=False, ) - if p.returncode != 0: - output = "ERROR: " + p.stdout + p.stderr - logger.warning(f"Issue with docker push: {output}") - return logger.failure(output) - return logger.success(output) + return Result(p).render() - @mcp.tool(name=name) - def build_container(self, dockerfile: str, uri: str, platforms: str = None): + @mcp.tool(name="docker_build") + def build_container(self, dockerfile: list, uri: str = "lammps", platforms: str = None): """ Build a docker container. Accepts an optional unique resource identifier (URI). The build is always done in a protected temporary directory. @@ -71,7 +84,8 @@ def build_container(self, dockerfile: str, uri: str, platforms: str = None): push: push to the registry. 
Requires that the docker agent is authenticated. load: load into a kubernetes in docker (kind) cluster. """ - # TODO need a way for agent to keep track of retries. Context session id could work as key + # We pass as list because newlines in json... no go! + dockerfile = "\n".join(dockerfile) # This ensures that we aren't given a code block, etc. pattern = "```(?:docker|dockerfile)?\n(.*?)```" @@ -108,12 +122,11 @@ def build_container(self, dockerfile: str, uri: str, platforms: str = None): ) # Clean up after we finish shutil.rmtree(build_dir, ignore_errors=True) - output = logger.success(p.stdout + p.stderr) - output = self.filter_output(output) - if p.returncode == 0: - return logger.success(output) - return logger.failed(output) + # Streamline (filter) output and return result object + p.stdout = self.filter_output(p.stdout) + p.stderr = self.filter_output(p.stderr) + return Result(p).render() def filter_output(self, output): """ @@ -129,12 +142,13 @@ def filter_output(self, output): "update-alternatives", "Reading database ...", ] + output = output or "" regex = "(%s)" % "|".join(skips) output = "\n".join([x for x in output.split("\n") if not re.search(regex, x)]) # Try to match lines that start with # return "\n".join([x for x in output.split("\n") if not re.search(r"^#(\d)+ ", x)]) - @mcp.tool(name="kind-docker-load") + @mcp.tool(name="kind_docker_load") def load_kind(self, uri: str): """ Load a Docker URI into Kind (Kubernetes in Docker) @@ -152,11 +166,7 @@ def load_kind(self, uri: str): text=True, check=False, ) - if p.returncode != 0: - output = p.stdout + p.stderr - logger.warning(f"Issue with kind load: {output}") - return logger.failure(output) - return logger.success(output) + return Result(p).render() def print_result(self, dockerfile): """ @@ -166,3 +176,37 @@ def print_result(self, dockerfile): logger.custom( highlighted_syntax, title="Final Dockerfile", border_style="green", expand=True ) + + @mcp.prompt(name="docker_build_persona", description="Instructions for a fresh build") + def build_persona_prompt(self, application: str, environment: str = "CPU") -> dict: + """ + Generates agent instructions for creating a NEW Dockerfile. + """ + # Specific rules for a fresh build + build_rules = [ + "The Dockerfile content you generate must be complete and robust.", + "The response should ONLY contain the complete Dockerfile.", + "Use the available tools (files-write) to save the Dockerfile to disk.", + ] + prompts.COMMON_INSTRUCTIONS + + # Construct the text from our template + prompt_text = prompts.get_build_text(application, environment, build_rules) + + # Return MCP format + return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} + + @mcp.prompt( + name="docker_fix_persona", description="Instructions for fixing or retrying a build" + ) + def fix_persona_prompt(self, error_message: str) -> dict: + """ + Generates system instructions for retrying a failed build. + """ + # 1. 
Define specific rules for fixing + fix_rules = [ + "The response should only contain the complete, corrected Dockerfile content.", + "Use succinct comments in the Dockerfile to explain build logic and changes.", + ] + prompts.COMMON_INSTRUCTIONS + + prompt_text = prompts.get_retry_prompt(fix_rules, error_message) + return {"messages": [{"role": "user", "content": {"type": "text", "text": prompt_text}}]} diff --git a/fractale/tools/decorator.py b/fractale/tools/decorator.py index 4b17bba..727b22c 100644 --- a/fractale/tools/decorator.py +++ b/fractale/tools/decorator.py @@ -1,9 +1,5 @@ -import functools -import time from typing import List -from fractale.metrics import DurationMetric, metrics - class McpProxy: """ @@ -18,44 +14,12 @@ def tool(self, name: str = None, description: str = None, tags: List[str] = None """ def decorator(func): - - def record_timing(start_time, error=None): - """ - Wrapper to record timing of tool. - """ - end_time = time.perf_counter() - tool_id = name or func.__name__ - - # Create the specific Metric object - metric = DurationMetric( - name=tool_id, - start_time=start_time, - end_time=end_time, - duration=end_time - start_time, - success=(error is None), - metadata={"error": str(error)} if error else {}, - ) - - # Push to generic registry - metrics.record(metric) - return metric.duration - - @functools.wraps(func) - def sync_wrapper(*args, **kwargs): - start = time.perf_counter() - result = func(*args, **kwargs) - dur = record_timing(start) - # Add the duration to the result for the LLM - result += f"\n\n[⏱️ {dur:.2f}s]" - return result - - wrapper = sync_wrapper default_name = (func.__module__.lower() + "-" + func.__name__.lower()).replace(".", "-") - wrapper._mcp_name = name or default_name - wrapper._mcp_desc = description - wrapper._mcp_tags = tags - wrapper._is_mcp_tool = True - return wrapper + func._mcp_name = name or default_name + func._mcp_desc = description + func._mcp_tags = tags + func._is_mcp_tool = True + return func return decorator diff --git a/fractale/tools/deploy/kubernetes/job/tool.py b/fractale/tools/deploy/kubernetes/job/tool.py index dc9e993..b624f55 100644 --- a/fractale/tools/deploy/kubernetes/job/tool.py +++ b/fractale/tools/deploy/kubernetes/job/tool.py @@ -20,7 +20,7 @@ class K8sJobTool(BaseTool): def setup(self): pass - @mcp.tool(name="kubernetes-status") + @mcp.tool(name="kubernetes_status") def get_status(self, job_id: str): """Checks job status.""" return "Running" diff --git a/fractale/tools/manager.py b/fractale/tools/manager.py index b0706dc..1fd0e55 100644 --- a/fractale/tools/manager.py +++ b/fractale/tools/manager.py @@ -61,7 +61,7 @@ def discover_tools(self, root_path: str, module_path: str) -> Dict[str, Path]: discovered[tool_id] = {"path": file_path, "module": import_path, "root": root_path} return discovered - def load_tools(self, names=None): + def load_tools(self, mcp, names=None): """ Load a set of named tools, or default to all those discovered. 
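+        mcp: the FastMCP server instance; each discovered tool, prompt, or
+             resource endpoint is registered on it before being yielded.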
""" @@ -95,7 +95,22 @@ def load_tools(self, names=None): # Get the decorated functions for func in getfunc(): - yield ToolClass.from_function(func, name=func._mcp_name) + + # This is how we handle dynamic loading + endpoint = ToolClass.from_function(func, name=func._mcp_name) + + # @mcp.tool + if ToolClass == Tool: + mcp.add_tool(endpoint) + + # @mcp.prompt + elif ToolClass == Prompt: + mcp.add_prompt(endpoint) + + # @mcp.resource + else: + mcp.add_resource(endpoint) + yield endpoint def load_tool(self, tool_id: str) -> BaseTool: """ @@ -122,3 +137,37 @@ def load_tool(self, tool_id: str) -> BaseTool: except ImportError as e: print(f"❌ Error importing {tool_id}: {e}") return None + + def get_available_prompts(self): + """ + Scans all discoverable tools for functions decorated with @mcp.prompt. + Returns a set of prompt names (personas). We need this to validate a plan. + A plan is not valid if it names a persona (prompt) that is not known. + """ + prompts = set() + + # 2. Load them (to execute decorators) + for tool_id, path in self.tools.items(): + mod = self.load_tool_module(tool_id, path) + if not mod: + continue + + # 3. Inspect the classes/functions in the module + for name, obj in inspect.getmembers(mod): + # We usually look for classes inheriting from BaseTool + # But we can also just scan the class attributes + if inspect.isclass(obj): + for attr_name in dir(obj): + try: + func = getattr(obj, attr_name) + except: + continue + + # CHECK FOR PROXY TAG + if callable(func) and getattr(func, "_is_mcp_prompt", False): + # Get the name from the decorator + p_name = getattr(func, "_mcp_name", None) + if p_name: + prompts.add(p_name) + + return prompts diff --git a/fractale/tools/prompts.py b/fractale/tools/prompts.py new file mode 100644 index 0000000..a5bca61 --- /dev/null +++ b/fractale/tools/prompts.py @@ -0,0 +1,2 @@ +def format_rules(rules): + return "\n".join([f"- {r}" for r in rules]) diff --git a/fractale/tools/result.py b/fractale/tools/result.py new file mode 100644 index 0000000..3cc4c6c --- /dev/null +++ b/fractale/tools/result.py @@ -0,0 +1,103 @@ +import json +import subprocess +from typing import Any, Dict, Optional + +from rich import print + +from fractale.logger.logger import logger + + +class Result: + """ + Standardized return object for MCP tools. + Handles formatting output, errors, and metadata for the LLM. + """ + + def __init__( + self, content: Any = None, returncode: int = 0, stderr: str = "", metadata: Dict = None + ): + + self.returncode = returncode + self.stdout = "" + self.stderr = stderr + self.metadata = metadata or {} + self.parse(content) + + def parse(self, content): + """ + Parse content into the unified interface. 
+ """ + # subprocess Result + if isinstance(content, subprocess.CompletedProcess): + self.returncode = content.returncode + self.stdout = self._decode(content.stdout) + self.stderr = self._decode(content.stderr) + + # handle exception + elif isinstance(content, Exception): + self.returncode = 1 + self.stderr = str(content) + + # handle string / dict + elif isinstance(content, (str, dict, list)): + if isinstance(content, (dict, list)): + self.stdout = json.dumps(content, indent=2) + else: + self.stdout = str(content) + + # empty init (manual setting later) + else: + self.stdout = "" + + def _decode(self, val): + """Safe decoding of bytes or string.""" + if val is None: + return "" + if isinstance(val, bytes): + return val.decode("utf-8", errors="replace") + return str(val) + + @property + def is_success(self): + return self.returncode == 0 + + def render(self) -> str: + """ + Renders the result into a formatted string for the LLM. + """ + status = "SUCCESS" if self.is_success else "FAILURE" + logfunc = logger.success if self.is_success else logger.failure + icon = "✅" if self.is_success else "❌" + sections = [f"{icon} STATUS: {status} (Exit Code {self.returncode})"] + + if self.stdout.strip(): + # Use code blocks for clear separation + sections.append(f"--- STDOUT ---\n```text\n{self.stdout.strip()}\n```") + + if self.stderr.strip(): + sections.append(f"--- STDERR ---\n```text\n{self.stderr.strip()}\n```") + + if self.metadata: + sections.append( + f"--- METADATA ---\n```json\n{json.dumps(self.metadata, indent=2)}\n```" + ) + + if not self.is_success: + sections.append("HINT: Analyze the STDERR output above to determine the fix.") + + result = "\n\n".join(sections) + logfunc(result) + return result + + def to_json(self) -> str: + """ + Returns raw JSON for machine-readable only. + """ + return json.dumps( + { + "returncode": self.returncode, + "stdout": self.stdout, + "stderr": self.stderr, + "metadata": self.metadata, + } + ) diff --git a/fractale/tools/simple/tool.py b/fractale/tools/simple/tool.py index 1bebb50..68169da 100644 --- a/fractale/tools/simple/tool.py +++ b/fractale/tools/simple/tool.py @@ -10,7 +10,7 @@ class EchoTool(BaseTool): def setup(self): pass - @mcp.tool(name="simple-echo") + @mcp.tool(name="simple_echo") def echo(self, message: str): """Echo the message back (return it)""" return message diff --git a/fractale/ui/__init__.py b/fractale/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fractale/ui/adapters/__init__.py b/fractale/ui/adapters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fractale/ui/adapters/cli.py b/fractale/ui/adapters/cli.py new file mode 100644 index 0000000..392157a --- /dev/null +++ b/fractale/ui/adapters/cli.py @@ -0,0 +1,38 @@ +import sys + +from rich import print + +from fractale.ui.base import UserInterface + + +class CLIAdapter(UserInterface): + def on_step_start(self, name, description, inputs): + print(f"\n🚀 STEP: {name}") + print(f" Goal: {description}") + + def on_step_update(self, content: str): + """ + Called when a tool produces output, but the step isn't finished yet. + """ + print("\n[dim]--- Tool Output Update ---[/dim]") + if len(content) > 500: + content = content[:500] + "..." 
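+        # Only a 500-character preview is printed; this is a progress update,
+        # not the final step result (that arrives via on_step_finish).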
+ print(content) + + def on_log(self, message, level="info"): + # Simple print + print(f" {message}") + + def on_step_finish(self, name, result, error, metadata): + if error: + print(f"❌ {name} Failed: {error}") + else: + print(f"✅ {name} Complete.") + + def on_workflow_complete(self, status): + print(f"\n🏁 Workflow: {status}") + + def ask_user(self, question, options=None) -> str: + # Standard Python input + opt_str = f"[{'/'.join(options)}]" if options else "" + return input(f"❓ {question} {opt_str}: ").strip() diff --git a/fractale/ui/adapters/tui.py b/fractale/ui/adapters/tui.py new file mode 100644 index 0000000..93aeb55 --- /dev/null +++ b/fractale/ui/adapters/tui.py @@ -0,0 +1,373 @@ +import json +import logging +import queue +import traceback + +from rich.panel import Panel +from rich.syntax import Syntax +from textual.app import App, ComposeResult +from textual.containers import Vertical, VerticalScroll +from textual.widgets import ( + Collapsible, + Footer, + Header, + Label, + LoadingIndicator, + Log, + Markdown, + Static, +) + +from fractale.ui.base import UserInterface +from fractale.ui.screens import UserInputScreen + +logger = logging.getLogger("fractale") + + +class TextualAdapter(UserInterface): + """ + Translate abstract UI calls (from manager) into thread-based actions. + """ + + def __init__(self, app: "FractaleApp"): + self.app = app + + def on_step_update(self, content: str): + """ + Called by Agent to show live tool output in the UI. + """ + # Update the output box. + self.app.call_from_thread(self.app.action_set_result, str(content)) + + def on_step_start(self, name: str, description: str, inputs: dict): + prompt_text = inputs.get("_debug_prompt_text", "") + self.app.call_from_thread(self.app.action_add_step, name, description) + + def on_log(self, message: str, level: str = "info"): + self.app.call_from_thread(self.app.action_log, message) + + def on_step_finish(self, name: str, result: str, error: str, metadata: dict): + if error: + self.app.call_from_thread(self.app.action_log, f"[bold red]ERROR:[/bold red] {error}") + self.app.call_from_thread(self.app.action_status, "Step Failed", "red") + else: + self.app.call_from_thread(self.app.action_set_result, str(result)) + self.app.call_from_thread(self.app.action_status, "Step Complete", "green") + + def on_prompt_loaded(self, text: str): + self.app.call_from_thread(self.app.action_update_prompt, text) + + def on_workflow_complete(self, status: str): + self.app.call_from_thread(self.app.action_status, f"Done: {status}", "blue") + + def ask_user(self, question: str, options: list[str] = None) -> str: + """ + Blocking call. + 1. Creates a Queue. + 2. Signals App to show Modal. + 3. Waits for Modal to put result in Queue. + """ + reply_queue = queue.Queue() + self.app.call_from_thread(self.app.action_prompt_user, question, reply_queue) + return reply_queue.get() + + +class StepDisplay(Collapsible): + """ + A widget representing one Step. + Contains nested collapsibles for Reasoning and Output. + """ + + def __init__(self, title: str, description: str, **kwargs): + super().__init__(title=title, **kwargs) + self.description = description + + # Logs + self.log_widget = Log(highlight=True, classes="step-log") + + # Results + self.result_container = Vertical(classes="step-result-container") + self.container_result = Collapsible(title="📝 Output") + + # Write "thinking" here. + self.container_log = Collapsible(title="🧠 Reasoning & Tools") + self.container_result = Collapsible(title="📝 Output") + + # Write prompt neatly here. 
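+        # (Rendered as Markdown inside its own collapsible; hidden until
+        # set_prompt() is called.)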
+ self.prompt_widget = Markdown("", classes="step-prompt") + self.container_prompt = Collapsible(title="📋 System Instruction / Prompt") + + def compose(self) -> ComposeResult: + yield Label(f"Goal: {self.description}", classes="step-desc") + + # Mount Prompt Section + with self.container_prompt: + yield self.prompt_widget + + with self.container_log: + yield self.log_widget + + with self.container_result: + yield self.result_container + + yield LoadingIndicator() + + def on_mount(self): + # Hide all initially + self.container_prompt.display = False + self.container_log.display = False + self.container_result.display = False + + def write_log(self, message: str): + """Reveal log section and write message.""" + if not self.container_log.display: + self.container_log.display = True + self.container_log.collapsed = False # Auto-expand on first log + + if not isinstance(message, str): + message = json.dumps(message) + self.log_widget.write_line(message) + + def start_loading(self): + """ + Mounts a LoadingIndicator if one isn't already present. + """ + # Query to see if we already have one (prevent duplicates) + if not self.query(LoadingIndicator): + indicator = LoadingIndicator() + # Insert it right after the description label + self.mount(indicator, after=self.query_one(".step-desc")) + + def stop_loading(self): + """ + Removes the LoadingIndicator from the DOM. + """ + # Query all indicators in this widget and remove them + for widget in self.query(LoadingIndicator): + widget.remove() + + def set_prompt(self, text: str): + if not text: + return + self.container_prompt.display = True + self.prompt_widget.update(text) + self.container_prompt.collapsed = True + + def set_result(self, content: str): + """ + Robust result rendering. Always shows SOMETHING. + """ + self.stop_loading() + + # 1. Reveal Container + self.container_result.display = True + self.container_result.collapsed = False + self.result_container.remove_children() + + # 2. Handle Empty Content explicitly + if not content: + self.result_container.mount(Label("[dim]No output returned[/dim]")) + return + + try: + # 3. Attempt Structured Rendering + try: + data = json.loads(content) + except (json.JSONDecodeError, TypeError): + # Fallback: Wrap raw string in a dict so the loop handles it + data = {"Raw Output": str(content)} + + # Normalize non-dict JSON (e.g. lists or primitives) + if not isinstance(data, dict): + data = {"Result": data} + + for key, value in data.items(): + title = str(key).replace("_", " ").title() + + # Default renderable + renderable = str(value) + + # Pretty print complex types + if isinstance(value, (dict, list)): + renderable = Pretty(value) + + # Syntax highlight multiline strings + elif isinstance(value, str) and "\n" in value: + renderable = Syntax(value, "text", theme="monokai", word_wrap=True) + + # Mount the panel + # Use height="auto" to ensure it expands to content + panel = Panel(renderable, title=title, border_style="green", expand=True) + self.result_container.mount(Static(panel, classes="result-panel")) + + except Exception as e: + # 4. 
Nuclear Fallback: If Rich rendering crashes, show raw error + content + error_msg = f"Rendering Error: {e}\n\nRaw Content:\n{content}" + self.result_container.mount(Label(error_msg)) + + +class FractaleApp(App): + CSS = """ + /* Main Step Container */ + StepDisplay { + margin-bottom: 1; + background: $surface; + border: solid $primary; + height: auto; + } + + /* The Description Label */ + .step-desc { + text-style: italic; + color: $text-muted; + padding: 1; + width: 100%; + } + + /* Nested Collapsibles (Reasoning / Output) */ + StepDisplay > Collapsible { + margin-top: 1; + margin-left: 2; /* Indent slightly */ + margin-right: 2; + background: $surface-darken-1; + border-left: solid $accent; + } + + /* The actual Log Widget */ + .step-log { + height: auto; + max-height: 20; /* Limit height so it doesn't dominate */ + background: $surface-darken-2; + overflow-y: scroll; + border: none; + } + + /* The Result Markdown */ + .step-result { + height: auto; + padding: 1; + background: $surface-darken-2; + } + + .step-result-container { + height: auto; + background: $surface-darken-1; + padding: 1; + } + + StepDisplay > .Collapsible--header { + background: $primary-darken-2; + color: $text; + text-style: bold; + } + + LoadingIndicator { + height: 1; + min-height: 1; + margin: 1 2; + color: $accent; + background: transparent; + } + + StepDisplay { + /* Ensure the body background is distinct */ + background: $surface; + margin-bottom: 1; + border: solid $primary; + } + .result-panel { + height: auto; + margin-bottom: 1; + } + """ + + BINDINGS = [ + ("q", "quit", "Quit"), + ] + + def __init__(self, manager, context): + super().__init__() + self.manager = manager + self.context = context + self.current_step_widget = None + + def compose(self) -> ComposeResult: + yield Header(show_clock=True) + yield VerticalScroll(id="steps-container") + yield Footer() + + def on_mount(self) -> None: + self.title = "Fractale Workflow" + + # Create a log so we can write errors during startup + self.action_add_step("System", "Initialization & Validation") + self.run_worker(self.run_process, exclusive=True, thread=True) + + def run_process(self): + """ + The Background Worker. 
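+        Runs manager.run(context) on a worker thread; the TextualAdapter pushes
+        every UI update back to the main event loop with call_from_thread.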
+ """ + adapter = TextualAdapter(app=self) + self.manager.ui = adapter + + try: + self.call_from_thread(self.action_stop_loading) + self.manager.run(self.context) + except Exception as e: + tb = traceback.format_exc() + self.call_from_thread(self.action_log, "\n[bold red]💥 CRITICAL FAILURE[/bold red]") + self.call_from_thread(self.action_log, f"{e}") + self.call_from_thread(self.action_log, f"\n[dim]{tb}[/dim]") + self.call_from_thread(self.action_status, "Workflow Failed", "red") + finally: + self.call_from_thread(self.action_stop_loading) + + def action_stop_loading(self): + if self.current_step_widget: + self.current_step_widget.stop_loading() + + def action_update_prompt(self, text: str): + if self.current_step_widget: + self.current_step_widget.set_prompt(text) + + def action_add_step(self, name: str, desc: str, prompt_text: str = ""): + container = self.query_one("#steps-container") + if self.current_step_widget: + self.current_step_widget.stop_loading() + self.current_step_widget.collapsed = True + + step = StepDisplay(title=f"▶️ {name}", description=desc) + step.collapsed = False + container.mount(step) + + # Populate the prompt immediately if available + if prompt_text: + step.set_prompt(prompt_text) + + step.scroll_visible() + self.current_step_widget = step + + def action_log(self, message: str): + if self.current_step_widget: + self.current_step_widget.write_log(message) + else: + self.notify(message, severity="error", timeout=10) + + def action_set_result(self, content: str): + if self.current_step_widget: + self.current_step_widget.title = self.current_step_widget.title.replace("▶️", "✅") + self.current_step_widget.set_result(content) + self.current_step_widget.collapsed = True + + def action_status(self, msg: str, style: str): + # If status indicates failure, stop spinner + if "Failed" in msg and self.current_step_widget: + self.current_step_widget.title = self.current_step_widget.title.replace("▶️", "❌") + + self.sub_title = msg + + def action_prompt_user(self, question: str, reply_queue): + def on_input_done(result: str): + reply_queue.put(result) + + self.push_screen(UserInputScreen(question), on_input_done) diff --git a/fractale/ui/adapters/web.py b/fractale/ui/adapters/web.py new file mode 100644 index 0000000..08e3ac3 --- /dev/null +++ b/fractale/ui/adapters/web.py @@ -0,0 +1,34 @@ +import time + +import requests + +from fractale.ui.base import UserInterface + + +class WebAdapter(UserInterface): + def __init__(self, api_url): + self.api_url = api_url # e.g. http://localhost:3000/api/events + + def _post(self, event_type, payload): + requests.post(self.api_url, json={"type": event_type, "data": payload}) + + def on_step_start(self, name, description, inputs): + self._post("step_start", {"name": name, "desc": description}) + + def on_log(self, message, level="info"): + self._post("log", {"msg": message}) + + # ... other outputs ... + + def ask_user(self, question, options=None) -> str: + # 1. Post the question to the UI + req_id = f"req_{time.time()}" + self._post("ask_user", {"question": question, "id": req_id}) + + # 2. POLL for an answer (or use a Redis queue / Websocket listener) + # This blocks the Manager thread until the frontend user clicks a button. 
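+        # Illustrative contract, assuming the frontend serves
+        # GET {api_url}/answers/{req_id} and returns {"answer": "..."} with a
+        # 200 once the user responds; any other status means "no answer yet".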
+ while True: + resp = requests.get(f"{self.api_url}/answers/{req_id}") + if resp.status_code == 200: + return resp.json()["answer"] + time.sleep(1) diff --git a/fractale/ui/base.py b/fractale/ui/base.py new file mode 100644 index 0000000..9e68078 --- /dev/null +++ b/fractale/ui/base.py @@ -0,0 +1,39 @@ +from typing import Any, Optional, Protocol + + +class UserInterface(Protocol): + """ + The strict contract that ManagerAgent relies on. + Any implementation (Web, TUI, CLI) must provide these methods. + """ + + def on_step_start(self, name: str, description: str, inputs: dict): + pass + + def on_step_update(self, content: str): + pass + + def on_log(self, message: str, level: str = "info"): + pass + + def log(self, message: str, level: str = "info"): + self.on_log(message, level) + + def on_step_finish(self, name: str, result: str, error: Optional[str], metadata: dict): + """ + A step completes (success or failure). + """ + pass + + def on_workflow_complete(self, status: str): + """ + The whole plan finishes. + """ + pass + + # --- INPUT (Blocking) --- + def ask_user(self, question: str, options: list[str] = None) -> str: + """ + The Manager pauses until the user answers (blocking) + """ + pass diff --git a/fractale/ui/screens.py b/fractale/ui/screens.py new file mode 100644 index 0000000..73b45ab --- /dev/null +++ b/fractale/ui/screens.py @@ -0,0 +1,69 @@ +from textual.app import ComposeResult +from textual.containers import Horizontal, Vertical +from textual.screen import ModalScreen +from textual.widgets import Button, Input, Label + + +class UserInputScreen(ModalScreen[str]): + """ + A modal dialog that asks the user for text input. + Returns the string entered when submitted. + """ + + CSS = """ + UserInputScreen { + align: center middle; + } + + #dialog { + grid-size: 2; + grid-gutter: 1 2; + grid-rows: 1fr 3; + padding: 0 1; + width: 60; + height: 11; + border: thick $background 80%; + background: $surface; + } + + #question { + column-span: 2; + height: 1fr; + width: 1fr; + content-align: center middle; + } + + Input { + column-span: 2; + } + + Button { + width: 100%; + } + """ + + def __init__(self, question: str): + super().__init__() + self.question = question + + def compose(self) -> ComposeResult: + with Vertical(id="dialog"): + yield Label(self.question, id="question") + yield Input(placeholder="Type your answer here...", id="input") + with Horizontal(): + yield Button("Submit", variant="primary", id="submit") + yield Button("Cancel", variant="error", id="cancel") + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "submit": + input_widget = self.query_one(Input) + value = input_widget.value + self.dismiss(value) + else: + self.dismiss(None) # Cancelled + + def on_input_submitted(self, event: Input.Submitted) -> None: + """ + Allow pressing Enter to submit. + """ + self.dismiss(event.value) diff --git a/fractale/utils/fileio.py b/fractale/utils/fileio.py index 6c2d982..d79bd65 100644 --- a/fractale/utils/fileio.py +++ b/fractale/utils/fileio.py @@ -10,6 +10,31 @@ import yaml +def run_sync(coroutine): + """ + Runs an async coroutine synchronously. + Patches the loop if running inside IPython/Jupyter. + + Note that I'm not currently using this - keeping here if need. 
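+    If an event loop is already running (IPython/Jupyter), nest_asyncio patches
+    it so run_until_complete can be used; otherwise asyncio.run is called.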
+ """ + import asyncio + + import nest_asyncio + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + # We aren't in async -> patch it + nest_asyncio.apply(loop) + return loop.run_until_complete(coroutine) + else: + # We are in standard Python script -> Standard run + return asyncio.run(coroutine) + + def get_local_cluster(): """ Guess the local cluster based on the hostname diff --git a/fractale/utils/text.py b/fractale/utils/text.py index 565e938..83b43c7 100644 --- a/fractale/utils/text.py +++ b/fractale/utils/text.py @@ -1,10 +1,11 @@ import re -def get_code_block(content, code_type): +def get_code_block(content, code_type=None): """ Parse a code block from the response """ + code_type = code_type or r"[\w\+\-\.]*" pattern = f"```(?:{code_type})?\n(.*?)```" match = re.search(pattern, content, re.DOTALL) if match: diff --git a/fractale/version.py b/fractale/version.py index d07e16c..c241926 100644 --- a/fractale/version.py +++ b/fractale/version.py @@ -9,20 +9,21 @@ ################################################################################ -# Global requirements +# TODO vsoch: refactor this to use newer pyproject stuff. -# Note that the spack / environment modules plugins are installed automatically. -# This doesn't need to be the case. INSTALL_REQUIRES = ( ("jsonschema", {"min_version": None}), ("Jinja2", {"min_version": None}), ("uvicorn", {"min_version": None}), ("mcp", {"min_version": None}), ("fastmcp", {"min_version": None}), + ("fastapi", {"min_version": None}), # Yeah, probably overkill, just being used for printing the scripts ("rich", {"min_version": None}), + ("nest_asyncio", {"min_version": None}), ) +OPENAI_REQUIRES = (("openai", {"min_version": None}),) GOOGLE_REQUIRES = (("google-generativeai", {"min_version": None}),) TESTS_REQUIRES = (("pytest", {"min_version": "4.6.2"}),) -INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES + GOOGLE_REQUIRES +INSTALL_REQUIRES_ALL = INSTALL_REQUIRES + TESTS_REQUIRES + GOOGLE_REQUIRES + OPENAI_REQUIRES diff --git a/setup.py b/setup.py index 3a4a3e4..f29fd6e 100644 --- a/setup.py +++ b/setup.py @@ -67,6 +67,8 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"): TESTS_REQUIRES = get_reqs(lookup, "TESTS_REQUIRES") INSTALL_REQUIRES_ALL = get_reqs(lookup, "INSTALL_REQUIRES_ALL") GOOGLE_REQUIRES = get_reqs(lookup, "GOOGLE_REQUIRES") + OPENAI_REQUIRES = get_reqs(lookup, "OPENAI_REQUIRES") + LLAMA_REQUIRES = get_reqs(lookup, "OPENAI_REQUIRES") setup( name=NAME, @@ -89,6 +91,8 @@ def get_reqs(lookup=None, key="INSTALL_REQUIRES"): extras_require={ "all": [INSTALL_REQUIRES_ALL], "google": [GOOGLE_REQUIRES], + "openai": [OPENAI_REQUIRES], + "llama": [LLAMA_REQUIRES], }, classifiers=[ "Intended Audience :: Science/Research",