diff --git a/.gitignore b/.gitignore index 8508637..6702239 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ dist/ coverage/ node_modules/ .turbo/ +.vscode/ TODO* NOTE* diff --git a/docs/src/guide/getting-started.md b/docs/src/guide/getting-started.md index b423879..c955dd6 100644 --- a/docs/src/guide/getting-started.md +++ b/docs/src/guide/getting-started.md @@ -54,6 +54,7 @@ async function run() { // "serializedContext": "{\"value\":42,\"_outputs.start\":42,\"_inputs.double\":42,\"_outputs.double\":84}", // "status": "completed" // } +} run() ``` diff --git a/docs/src/guide/loops.md b/docs/src/guide/loops.md index 1c116f6..6d00b47 100644 --- a/docs/src/guide/loops.md +++ b/docs/src/guide/loops.md @@ -79,7 +79,7 @@ const flow = createFlow('loop-workflow') }) // 4. Define the edges. - .edge('initialize', 'increment') + .edge('initialize', 'counter') .toBlueprint() ``` @@ -98,6 +98,12 @@ The [`.loop()`](/api/flow#loop-id-options) method adds a `loop-controller` node. > [!NOTE] > The [`.loop()`](/api/flow#loop-id-options) method automatically configures the `joinStrategy` of the loop's start and end nodes to `'any'` so they can be re-executed on each iteration. +> [!INFO] +> `joinStrategy` decides how a node should be executed when it has multiple predecessors: +> - `'any'`: the node will be executed when any of the predecessors finishes, possibly for many times +> - `'all'`: the node will be only executed once when all its predecessors finish. +> The default value is `'all'`. + ## Security Considerations By default, Flowcraft uses [`PropertyEvaluator`](/api/evaluator#propertyevaluator-class) for expression evaluation, which only allows simple property access (e.g., `result.output.status`). Complex expressions with operators like `<`, `>`, `===`, or `!==` (as shown in the example above) require the [`UnsafeEvaluator`](/api/evaluator#unsafeevaluator-class). @@ -119,6 +125,8 @@ const runtime = new FlowRuntime({ While loops provide a structured way to handle iteration, it's also possible to create workflows with cycles (non-DAG graphs) using manual edges. However, this comes with significant risks and unpredictable behavior. +For this reason, Flowcraft stops at cycles that are not handled by `loop-controller` by default. You should prefer `.loop()` for handling cycles. + ### Risks of Non-DAG Workflows When a workflow contains cycles and is run in non-strict mode, the runtime arbitrarily selects the first node of a detected cycle as the starting point. This can lead to: diff --git a/examples/advanced/hitl-workflow/src/workflow.ts b/examples/advanced/hitl-workflow/src/workflow.ts index 40eb5ad..d93ceed 100644 --- a/examples/advanced/hitl-workflow/src/workflow.ts +++ b/examples/advanced/hitl-workflow/src/workflow.ts @@ -1,5 +1,5 @@ -import { createFlow, type NodeContext } from 'flowcraft' import { createInterface } from 'node:readline' +import { createFlow, type NodeContext } from 'flowcraft' interface WorkflowContext { task: { id: string; description: string; amount: number; reason: string } diff --git a/package.json b/package.json index bf2d6da..a356ca3 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "packageManager": "pnpm@10.20.0", "scripts": { "lint": "biome check packages examples", + "lint:fix": "biome check packages examples --write", "changeset": "changeset", "release": "turbo build && changeset publish" }, diff --git a/packages/core/src/flow.ts b/packages/core/src/flow.ts index 348f753..17c30f9 100644 --- a/packages/core/src/flow.ts +++ b/packages/core/src/flow.ts @@ -26,7 +26,6 @@ export class FlowBuilder< > { private blueprint: Partial private functionRegistry: Map - private loopControllerIds: Map private loopDefinitions: Array<{ id: string startNodeId: string @@ -43,7 +42,6 @@ export class FlowBuilder< constructor(id: string) { this.blueprint = { id, nodes: [], edges: [] } this.functionRegistry = new Map() - this.loopControllerIds = new Map() this.loopDefinitions = [] this.batchDefinitions = [] this.cycleEntryPoints = new Map() @@ -191,22 +189,17 @@ export class FlowBuilder< }, ): this { const { startNodeId, endNodeId, condition } = options - const controllerId = `${id}-loop` - - this.loopControllerIds.set(id, controllerId) - this.loopDefinitions.push({ id, startNodeId, endNodeId, condition }) this.blueprint.nodes?.push({ - id: controllerId, + id, uses: 'loop-controller', params: { condition }, config: { joinStrategy: 'any' }, }) - this.edge(endNodeId, controllerId) - - this.edge(controllerId, startNodeId, { + this.edge(endNodeId, id) + this.edge(id, startNodeId, { action: 'continue', transform: `context.${endNodeId}`, }) @@ -214,14 +207,6 @@ export class FlowBuilder< return this } - getLoopControllerId(id: string): string { - const controllerId = this.loopControllerIds.get(id) - if (!controllerId) { - throw new Error(`Loop with id '${id}' not found. Ensure you have defined it using the .loop() method.`) - } - return controllerId - } - /** * Sets the preferred entry point for a cycle in non-DAG workflows. * This helps remove ambiguity when the runtime needs to choose a starting node for cycles. @@ -242,10 +227,9 @@ export class FlowBuilder< // loop edge re-wiring for (const loopDef of this.loopDefinitions) { - const controllerId = this.getLoopControllerId(loopDef.id) - const edgesToRewire = allOriginalEdges.filter((e) => e.source === loopDef.endNodeId && e.target !== controllerId) + const edgesToRewire = allOriginalEdges.filter((e) => e.source === loopDef.id && e.target !== loopDef.startNodeId) for (const edge of edgesToRewire) { - finalEdges.push({ ...edge, source: controllerId, action: edge.action || 'break' }) + finalEdges.push({ ...edge, action: edge.action || 'break', transform: `context.${loopDef.endNodeId}` }) processedOriginalEdges.add(edge) } } @@ -283,9 +267,6 @@ export class FlowBuilder< if (!endNode) { throw new Error(`Loop '${loopDef.id}' references non-existent end node '${loopDef.endNodeId}'.`) } - - startNode.config = { ...startNode.config, joinStrategy: 'any' } - endNode.config = { ...endNode.config, joinStrategy: 'any' } } if (this.cycleEntryPoints.size > 0) { @@ -311,10 +292,8 @@ export class FlowBuilder< // replace loop-controllers with direct, cyclical edges for (const loopDef of this.loopDefinitions) { - const controllerId = this.loopControllerIds.get(loopDef.id) - if (!controllerId) continue - - ignoredNodeIds.add(controllerId) + const id = loopDef.id + ignoredNodeIds.add(id) // direct edge from the end of loop to start uiEdges.push({ @@ -328,13 +307,22 @@ export class FlowBuilder< }) // re-wire any 'break' edges - const breakEdges = blueprint.edges.filter((edge) => edge.source === controllerId && edge.action === 'break') + const breakEdges = blueprint.edges.filter((edge) => edge.source === id && edge.action === 'break') for (const breakEdge of breakEdges) { uiEdges.push({ ...breakEdge, source: loopDef.endNodeId, }) } + + // re-wire any 'incoming' edges + const incomingEdges = blueprint.edges.filter((edge) => edge.target === id && edge.source !== loopDef.endNodeId) + for (const incomingEdge of incomingEdges) { + uiEdges.push({ + ...incomingEdge, + source: loopDef.startNodeId, + }) + } } // replace scatter/gather pairs with a single representative worker node diff --git a/packages/core/src/runtime/orchestrators/utils.ts b/packages/core/src/runtime/orchestrators/utils.ts index 48ee257..2632713 100644 --- a/packages/core/src/runtime/orchestrators/utils.ts +++ b/packages/core/src/runtime/orchestrators/utils.ts @@ -97,12 +97,7 @@ export async function processResults( executionId, ) - const loopControllerMatch = matched.find( - (m: { node: NodeDefinition; edge: any }) => m.node.uses === 'loop-controller', - ) - const finalMatched = loopControllerMatch ? [loopControllerMatch] : matched - - for (const { node, edge } of finalMatched) { + for (const { node, edge } of matched) { await runtime.applyEdgeTransform( edge, result, @@ -116,7 +111,7 @@ export async function processResults( traverser.markNodeCompleted( nodeId, result, - finalMatched.map((m: { node: NodeDefinition; edge: any }) => m.node), + matched.map((m: { node: NodeDefinition; edge: any }) => m.node), ) } else if (executionResult.status === 'failed_with_fallback') { const { fallbackNodeId, error } = executionResult diff --git a/packages/core/src/runtime/traverser.ts b/packages/core/src/runtime/traverser.ts index f605fb7..6fd84a4 100644 --- a/packages/core/src/runtime/traverser.ts +++ b/packages/core/src/runtime/traverser.ts @@ -10,19 +10,28 @@ export interface ReadyNode { export class GraphTraverser { private frontier = new Set() private allPredecessors: Map> + private allSuccessors: Map> private dynamicBlueprint: WorkflowBlueprint private completedNodes = new Set() + private nodesInLoops: Map> constructor(blueprint: WorkflowBlueprint, isStrictMode: boolean = false) { this.dynamicBlueprint = structuredClone(blueprint) as WorkflowBlueprint this.allPredecessors = new Map>() + this.allSuccessors = new Map>() + this.nodesInLoops = new Map>() for (const node of this.dynamicBlueprint.nodes) { this.allPredecessors.set(node.id, new Set()) + this.allSuccessors.set(node.id, new Set()) } for (const edge of this.dynamicBlueprint.edges) { - this.allPredecessors.get(edge.target)?.add(edge.source) + this.getPredecessors(edge.target).add(edge.source) + } + for (const edge of this.dynamicBlueprint.edges) { + this.getSuccessors(edge.source).add(edge.target) } const analysis = analyzeBlueprint(blueprint) + this.filterNodesInLoops(blueprint) this.frontier = new Set(analysis.startNodeIds.filter((id) => !this.isFallbackNode(id))) if (this.frontier.size === 0 && analysis.cycles.length > 0 && !isStrictMode) { const uniqueStartNodes = new Set() @@ -65,7 +74,7 @@ export class GraphTraverser { if (traverser.completedNodes.has(node.id)) continue const requiredPredecessors = traverser.allPredecessors.get(node.id) - const joinStrategy = traverser.getEffectiveJoinStrategy(node.id) + const joinStrategy = traverser.getJoinStrategy(node.id) // if no predecessors and not completed, it's a start node and should be in the frontier if (!requiredPredecessors || requiredPredecessors.size === 0) { @@ -89,25 +98,34 @@ export class GraphTraverser { return this.dynamicBlueprint.nodes.some((n) => n.config?.fallback === nodeId) } - private getEffectiveJoinStrategy(nodeId: string): 'any' | 'all' { + private getJoinStrategy(nodeId: string): 'any' | 'all' { const node = this.dynamicBlueprint.nodes.find((n) => n.id === nodeId) const baseJoinStrategy = node?.config?.joinStrategy || 'all' + return baseJoinStrategy + } - if (node?.uses === 'loop-controller') { - return 'any' - } + private filterNodesInLoops(blueprint: WorkflowBlueprint): void { + blueprint.nodes.forEach((node) => { + if (node.uses !== 'loop-controller') return - const predecessors = this.allPredecessors.get(nodeId) - if (predecessors) { - for (const predecessorId of predecessors) { - const predecessorNode = this.dynamicBlueprint.nodes.find((n) => n.id === predecessorId) - if (predecessorNode?.uses === 'loop-controller') { - return 'any' - } - } - } + const nextInLoopId = blueprint.edges.find((e) => e.source === node.id && e.action === 'continue')?.target + if (!nextInLoopId) return - return baseJoinStrategy + const set: Set = new Set() + set.add(nextInLoopId) + this.nodesInLoops.set(node.id, this.getAllLoopSuccessors(nextInLoopId, blueprint, set)) + }) + } + + private getAllLoopSuccessors(nodeId: string, blueprint: WorkflowBlueprint, set: Set): Set { + this.getSuccessors(nodeId).forEach((successor) => { + if (set.has(successor)) return + const node = this.getNode(successor, blueprint) + if (!node || node.uses === 'loop-controller') return + set.add(successor) + this.getAllLoopSuccessors(successor, blueprint, set) + }) + return set } getReadyNodes(): ReadyNode[] { @@ -135,18 +153,17 @@ export class GraphTraverser { this.dynamicBlueprint.nodes.push(dynamicNode) this.allPredecessors.set(dynamicNode.id, new Set([nodeId])) if (gatherNodeId) { - this.allPredecessors.get(gatherNodeId)?.add(dynamicNode.id) + this.getPredecessors(gatherNodeId).add(dynamicNode.id) } this.frontier.add(dynamicNode.id) } } for (const node of nextNodes) { - const joinStrategy = this.getEffectiveJoinStrategy(node.id) + const joinStrategy = this.getJoinStrategy(node.id) if (joinStrategy !== 'any' && this.completedNodes.has(node.id)) continue - const requiredPredecessors = this.allPredecessors.get(node.id) - if (!requiredPredecessors) continue + const requiredPredecessors = this.getPredecessors(node.id) const isReady = joinStrategy === 'any' @@ -155,19 +172,33 @@ export class GraphTraverser { if (isReady) { this.frontier.add(node.id) + // reset to uncompleted for all nodes in a loop + if (node.uses === 'loop-controller') { + this.getNodesInLoop(node.id).forEach((id) => { + this.resetNodeCompletion(id) + }) + } } } if (nextNodes.length === 0) { for (const [potentialNextId, predecessors] of this.allPredecessors) { if (predecessors.has(nodeId) && !this.completedNodes.has(potentialNextId)) { - const joinStrategy = this.getEffectiveJoinStrategy(potentialNextId) + const joinStrategy = this.getJoinStrategy(potentialNextId) const isReady = joinStrategy === 'any' ? predecessors.has(nodeId) : [...predecessors].every((p) => this.completedNodes.has(p)) if (isReady) { this.frontier.add(potentialNextId) + const node = this.getNode(potentialNextId, this.dynamicBlueprint) + if (!node) continue + // reset to uncompleted for all nodes in a loop + if (node.uses === 'loop-controller') { + this.getNodesInLoop(node.id).forEach((id) => { + this.resetNodeCompletion(id) + }) + } } } } @@ -198,6 +229,36 @@ export class GraphTraverser { return this.allPredecessors } + getAllSuccessors(): Map> { + return this.allSuccessors + } + + getPredecessors(nodeId: string): Set { + const predecessors = this.allPredecessors.get(nodeId) + if (!predecessors) return new Set() + return predecessors + } + + getSuccessors(nodeId: string): Set { + const successors = this.allSuccessors.get(nodeId) + if (!successors) return new Set() + return successors + } + + getNodesInLoop(id: string): Set { + const loopNodes = this.nodesInLoops.get(id) + if (!loopNodes) return new Set() + return loopNodes + } + + resetNodeCompletion(nodeId: string): void { + this.completedNodes.delete(nodeId) + } + + getNode(nodeId: string, blueprint: WorkflowBlueprint): NodeDefinition | undefined { + return blueprint.nodes.find((n) => n.id === nodeId) + } + addDynamicNode(_nodeId: string, dynamicNode: NodeDefinition, predecessorId: string, gatherNodeId?: string): void { this.dynamicBlueprint.nodes.push(dynamicNode) this.allPredecessors.set(dynamicNode.id, new Set([predecessorId])) diff --git a/packages/core/src/runtime/workflow-logic-handler.ts b/packages/core/src/runtime/workflow-logic-handler.ts index 6c28d8e..0212fd3 100644 --- a/packages/core/src/runtime/workflow-logic-handler.ts +++ b/packages/core/src/runtime/workflow-logic-handler.ts @@ -25,16 +25,9 @@ export class WorkflowLogicHandler { ): Promise<{ node: NodeDefinition; edge: EdgeDefinition }[]> { if (!result) return [] - let effectiveSourceNodeId = completedNodeId - const completedNodeDef = blueprint.nodes.find((n) => n.id === completedNodeId) - if (completedNodeDef?.uses === 'loop-controller' && result.action !== 'continue') - // act as if the loop's end node just finished - effectiveSourceNodeId = completedNodeDef.params?.endNodeId + const effectiveSourceNodeId = completedNodeId - let directOutgoingEdges = blueprint.edges.filter((edge) => edge.source === effectiveSourceNodeId) - if (effectiveSourceNodeId !== completedNodeId) - // breaking from a loop, ignore the edge that leads back to the controller - directOutgoingEdges = directOutgoingEdges.filter((edge) => edge.target !== completedNodeId) + const directOutgoingEdges = blueprint.edges.filter((edge) => edge.source === effectiveSourceNodeId) const nodesThisIsAFallbackFor = blueprint.nodes.filter((n) => n.config?.fallback === completedNodeId) const inheritedOutgoingEdges = nodesThisIsAFallbackFor.flatMap((originalNode) => diff --git a/packages/core/test/flow.test.ts b/packages/core/test/flow.test.ts index aa2c829..4f0356f 100644 --- a/packages/core/test/flow.test.ts +++ b/packages/core/test/flow.test.ts @@ -196,24 +196,32 @@ describe('Flow Builder', () => { it('should wire the `continue` and `break` paths for a .loop() construct', () => { const flow = createFlow('test') flow.node('start', async () => ({})) + flow.node('inner', async () => ({})) flow.node('end', async () => ({})) - flow.edge('start', 'end') - flow.loop('loop1', { - startNodeId: 'start', - endNodeId: 'end', + flow.edge('start', 'loop') + flow.edge('loop', 'end') + flow.loop('loop', { + startNodeId: 'inner', + endNodeId: 'inner', condition: 'i < 10', }) const blueprint = flow.toBlueprint() - expect(blueprint.edges).toHaveLength(3) - expect(blueprint.edges[1]).toEqual({ - source: 'end', - target: 'loop1-loop', - }) + expect(blueprint.edges).toHaveLength(4) expect(blueprint.edges[2]).toEqual({ - source: 'loop1-loop', - target: 'start', + source: 'inner', + target: 'loop', + }) + expect(blueprint.edges[3]).toEqual({ + source: 'loop', + target: 'inner', action: 'continue', - transform: 'context.end', + transform: 'context.inner', + }) + expect(blueprint.edges[0]).toEqual({ + source: 'loop', + target: 'end', + action: 'break', + transform: 'context.inner', }) }) @@ -294,8 +302,8 @@ describe('Flow Builder', () => { flow.node('end', async () => ({})) flow.node('exit', async () => ({})) flow.edge('start', 'end') - flow.edge('end', 'exit') // This should become a break edge - flow.loop('loop1', { + flow.edge('loop', 'exit') // This should become a break edge + flow.loop('loop', { startNodeId: 'start', endNodeId: 'end', condition: 'i < 10', diff --git a/packages/core/test/runtime.test.ts b/packages/core/test/runtime.test.ts index b131d0d..49e9862 100644 --- a/packages/core/test/runtime.test.ts +++ b/packages/core/test/runtime.test.ts @@ -137,9 +137,8 @@ describe('Flowcraft Runtime - Integration Tests', () => { endNodeId: 'loopBody', condition: 'counter < 2', }) - .edge('start', 'loopBody') - // The break path is the default edge (no action) from the loop controller - .edge('mainLoop-loop', 'end') + .edge('start', 'mainLoop') + .edge('mainLoop', 'end') const runtime = new FlowRuntime({ evaluator: new UnsafeEvaluator() }) const result = await runtime.run(flow.toBlueprint(), {}, { functionRegistry: flow.getFunctionRegistry() }) @@ -150,6 +149,43 @@ describe('Flowcraft Runtime - Integration Tests', () => { expect(result.context['_outputs.end']).toBe('finished') }) + it('should correctly execute a loop involving multiple nodes', async () => { + const loopBodyMock = vi.fn(async ({ context }) => { + const count = (await context.get('counter')) + 1 || 1 + await context.set('counter', count) + return { output: `iteration_${count}` } + }) + const branchBeforeLoop = vi.fn(async () => ({ output: 0 })) + + const flow = createFlow('multi-node-loop-test') + .node('start', async ({ context }) => { + await context.set('counter', 0) + return { output: 0 } + }) + .node('branchBeforeLoop', branchBeforeLoop) + .node('loopBody', async () => ({ output: 0 })) + .node('loopBody2', loopBodyMock) + .node('loopBody3', async () => ({ output: 0 })) + .node('end', async () => ({ output: 'finished' })) + .loop('mainLoop', { + startNodeId: 'loopBody', + endNodeId: 'loopBody3', + condition: 'counter < 3', + }) + .edge('start', 'mainLoop') + .edge('start', 'branchBeforeLoop') + .edge('loopBody', 'loopBody2') + .edge('loopBody2', 'loopBody3') + .edge('mainLoop', 'end') + + const runtime = new FlowRuntime({ evaluator: new UnsafeEvaluator() }) + const result = await runtime.run(flow.toBlueprint(), {}, { functionRegistry: flow.getFunctionRegistry() }) + + expect(result.status).toBe('completed') + expect(loopBodyMock).toHaveBeenCalledTimes(3) + expect(branchBeforeLoop).toBeCalled() + }) + it('should execute fallback node when main node fails', async () => { const flow = createFlow('fallback-test') flow diff --git a/packages/core/test/runtime/builtins.test.ts b/packages/core/test/runtime/builtins.test.ts index b63ef35..2232e8f 100644 --- a/packages/core/test/runtime/builtins.test.ts +++ b/packages/core/test/runtime/builtins.test.ts @@ -169,9 +169,8 @@ describe('Built-In Nodes', () => { const finalCount = await context.get('count') return { output: `Finalized at ${finalCount}` } }) - .edge('initialize', 'increment') - // The intuitive edge that now works, replacing the need for .edge('counter-loop', 'finalize') - .edge('increment', 'finalize') + .edge('initialize', 'counter') + .edge('counter', 'finalize') const runtime = new FlowRuntime({ evaluator: new UnsafeEvaluator() }) const result = await runtime.run(flow.toBlueprint(), {}, { functionRegistry: flow.getFunctionRegistry() }) @@ -199,7 +198,7 @@ describe('Built-In Nodes', () => { return { output: `checked_${counter}` } }) .node('final', async () => ({ output: 'final' })) - .edge('initialize', 'increment') + .edge('initialize', 'test-loop') .edge('increment', 'check') .loop('test-loop', { startNodeId: 'increment', @@ -207,7 +206,7 @@ describe('Built-In Nodes', () => { condition: 'counter < 3', }) // This intuitive edge from the loop's end node ('check') now correctly wires the exit. - .edge('check', 'final') + .edge('test-loop', 'final') const runtime = new FlowRuntime({ evaluator: new UnsafeEvaluator() }) const result = await runtime.run(flow.toBlueprint(), {}, { functionRegistry: flow.getFunctionRegistry() }) diff --git a/packages/core/test/testing/stepper.test.ts b/packages/core/test/testing/stepper.test.ts index 7c55e74..6698ade 100644 --- a/packages/core/test/testing/stepper.test.ts +++ b/packages/core/test/testing/stepper.test.ts @@ -235,22 +235,23 @@ describe('createStepper', () => { return { output: `work_${count}` } }) .node('end', async () => ({ output: 'end' })) - .edge('init', 'work') + .edge('init', 'test-loop') .loop('test-loop', { startNodeId: 'work', endNodeId: 'work', condition: 'count < 1', }) - .edge('work', 'end') + .edge('test-loop', 'end') const runtime = new FlowRuntime({ evaluator: new (await import('../../src/evaluator')).UnsafeEvaluator() }) const stepper = await createStepper(runtime, flow.toBlueprint(), flow.getFunctionRegistry(), {}) // Execute steps await stepper.next() // init - await stepper.next() // work1 - await stepper.next() // end - const result = await stepper.next() + await stepper.next() // controller + await stepper.next() // work + await stepper.next() // controller + const result = await stepper.next() // end expect(result?.status).toBe('completed') expect((await stepper.state.getContext().toJSON())['_outputs.end']).toBe('end') })