Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dist/
coverage/
node_modules/
.turbo/
.vscode/

TODO*
NOTE*
Expand Down
1 change: 1 addition & 0 deletions docs/src/guide/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async function run() {
// "serializedContext": "{\"value\":42,\"_outputs.start\":42,\"_inputs.double\":42,\"_outputs.double\":84}",
// "status": "completed"
// }
}

run()
```
Expand Down
10 changes: 9 additions & 1 deletion docs/src/guide/loops.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const flow = createFlow('loop-workflow')
})

// 4. Define the edges.
.edge('initialize', 'increment')
.edge('initialize', 'counter')
.toBlueprint()
```

Expand All @@ -98,6 +98,12 @@ The [`.loop()`](/api/flow#loop-id-options) method adds a `loop-controller` node.
> [!NOTE]
> The [`.loop()`](/api/flow#loop-id-options) method automatically configures the `joinStrategy` of the loop's start and end nodes to `'any'` so they can be re-executed on each iteration.

> [!INFO]
> `joinStrategy` decides how a node should be executed when it has multiple predecessors:
> - `'any'`: the node will be executed when any of the predecessors finishes, possibly for many times
> - `'all'`: the node will be only executed once when all its predecessors finish.
> The default value is `'all'`.

## Security Considerations

By default, Flowcraft uses [`PropertyEvaluator`](/api/evaluator#propertyevaluator-class) for expression evaluation, which only allows simple property access (e.g., `result.output.status`). Complex expressions with operators like `<`, `>`, `===`, or `!==` (as shown in the example above) require the [`UnsafeEvaluator`](/api/evaluator#unsafeevaluator-class).
Expand All @@ -119,6 +125,8 @@ const runtime = new FlowRuntime({

While loops provide a structured way to handle iteration, it's also possible to create workflows with cycles (non-DAG graphs) using manual edges. However, this comes with significant risks and unpredictable behavior.

For this reason, Flowcraft stops at cycles that are not handled by `loop-controller` by default. You should prefer `.loop()` for handling cycles.

### Risks of Non-DAG Workflows

When a workflow contains cycles and is run in non-strict mode, the runtime arbitrarily selects the first node of a detected cycle as the starting point. This can lead to:
Expand Down
2 changes: 1 addition & 1 deletion examples/advanced/hitl-workflow/src/workflow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createFlow, type NodeContext } from 'flowcraft'
import { createInterface } from 'node:readline'
import { createFlow, type NodeContext } from 'flowcraft'

interface WorkflowContext {
task: { id: string; description: string; amount: number; reason: string }
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"packageManager": "pnpm@10.20.0",
"scripts": {
"lint": "biome check packages examples",
"lint:fix": "biome check packages examples --write",
"changeset": "changeset",
"release": "turbo build && changeset publish"
},
Expand Down
46 changes: 17 additions & 29 deletions packages/core/src/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export class FlowBuilder<
> {
private blueprint: Partial<WorkflowBlueprint>
private functionRegistry: Map<string, NodeFunction | NodeClass>
private loopControllerIds: Map<string, string>
private loopDefinitions: Array<{
id: string
startNodeId: string
Expand All @@ -43,7 +42,6 @@ export class FlowBuilder<
constructor(id: string) {
this.blueprint = { id, nodes: [], edges: [] }
this.functionRegistry = new Map()
this.loopControllerIds = new Map()
this.loopDefinitions = []
this.batchDefinitions = []
this.cycleEntryPoints = new Map()
Expand Down Expand Up @@ -191,37 +189,24 @@ export class FlowBuilder<
},
): this {
const { startNodeId, endNodeId, condition } = options
const controllerId = `${id}-loop`

this.loopControllerIds.set(id, controllerId)

this.loopDefinitions.push({ id, startNodeId, endNodeId, condition })

this.blueprint.nodes?.push({
id: controllerId,
id,
uses: 'loop-controller',
params: { condition },
config: { joinStrategy: 'any' },
})

this.edge(endNodeId, controllerId)

this.edge(controllerId, startNodeId, {
this.edge(endNodeId, id)
this.edge(id, startNodeId, {
action: 'continue',
transform: `context.${endNodeId}`,
})

return this
}

getLoopControllerId(id: string): string {
const controllerId = this.loopControllerIds.get(id)
if (!controllerId) {
throw new Error(`Loop with id '${id}' not found. Ensure you have defined it using the .loop() method.`)
}
return controllerId
}

/**
* Sets the preferred entry point for a cycle in non-DAG workflows.
* This helps remove ambiguity when the runtime needs to choose a starting node for cycles.
Expand All @@ -242,10 +227,9 @@ export class FlowBuilder<

// loop edge re-wiring
for (const loopDef of this.loopDefinitions) {
const controllerId = this.getLoopControllerId(loopDef.id)
const edgesToRewire = allOriginalEdges.filter((e) => e.source === loopDef.endNodeId && e.target !== controllerId)
const edgesToRewire = allOriginalEdges.filter((e) => e.source === loopDef.id && e.target !== loopDef.startNodeId)
for (const edge of edgesToRewire) {
finalEdges.push({ ...edge, source: controllerId, action: edge.action || 'break' })
finalEdges.push({ ...edge, action: edge.action || 'break', transform: `context.${loopDef.endNodeId}` })
processedOriginalEdges.add(edge)
}
}
Expand Down Expand Up @@ -283,9 +267,6 @@ export class FlowBuilder<
if (!endNode) {
throw new Error(`Loop '${loopDef.id}' references non-existent end node '${loopDef.endNodeId}'.`)
}

startNode.config = { ...startNode.config, joinStrategy: 'any' }
endNode.config = { ...endNode.config, joinStrategy: 'any' }
}

if (this.cycleEntryPoints.size > 0) {
Expand All @@ -311,10 +292,8 @@ export class FlowBuilder<

// replace loop-controllers with direct, cyclical edges
for (const loopDef of this.loopDefinitions) {
const controllerId = this.loopControllerIds.get(loopDef.id)
if (!controllerId) continue

ignoredNodeIds.add(controllerId)
const id = loopDef.id
ignoredNodeIds.add(id)

// direct edge from the end of loop to start
uiEdges.push({
Expand All @@ -328,13 +307,22 @@ export class FlowBuilder<
})

// re-wire any 'break' edges
const breakEdges = blueprint.edges.filter((edge) => edge.source === controllerId && edge.action === 'break')
const breakEdges = blueprint.edges.filter((edge) => edge.source === id && edge.action === 'break')
for (const breakEdge of breakEdges) {
uiEdges.push({
...breakEdge,
source: loopDef.endNodeId,
})
}

// re-wire any 'incoming' edges
const incomingEdges = blueprint.edges.filter((edge) => edge.target === id && edge.source !== loopDef.endNodeId)
for (const incomingEdge of incomingEdges) {
uiEdges.push({
...incomingEdge,
source: loopDef.startNodeId,
})
}
}

// replace scatter/gather pairs with a single representative worker node
Expand Down
9 changes: 2 additions & 7 deletions packages/core/src/runtime/orchestrators/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,7 @@ export async function processResults(
executionId,
)

const loopControllerMatch = matched.find(
(m: { node: NodeDefinition; edge: any }) => m.node.uses === 'loop-controller',
)
const finalMatched = loopControllerMatch ? [loopControllerMatch] : matched

for (const { node, edge } of finalMatched) {
for (const { node, edge } of matched) {
await runtime.applyEdgeTransform(
edge,
result,
Expand All @@ -116,7 +111,7 @@ export async function processResults(
traverser.markNodeCompleted(
nodeId,
result,
finalMatched.map((m: { node: NodeDefinition; edge: any }) => m.node),
matched.map((m: { node: NodeDefinition; edge: any }) => m.node),
)
} else if (executionResult.status === 'failed_with_fallback') {
const { fallbackNodeId, error } = executionResult
Expand Down
103 changes: 82 additions & 21 deletions packages/core/src/runtime/traverser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,28 @@ export interface ReadyNode {
export class GraphTraverser {
private frontier = new Set<string>()
private allPredecessors: Map<string, Set<string>>
private allSuccessors: Map<string, Set<string>>
private dynamicBlueprint: WorkflowBlueprint
private completedNodes = new Set<string>()
private nodesInLoops: Map<string, Set<string>>

constructor(blueprint: WorkflowBlueprint, isStrictMode: boolean = false) {
this.dynamicBlueprint = structuredClone(blueprint) as WorkflowBlueprint
this.allPredecessors = new Map<string, Set<string>>()
this.allSuccessors = new Map<string, Set<string>>()
this.nodesInLoops = new Map<string, Set<string>>()
for (const node of this.dynamicBlueprint.nodes) {
this.allPredecessors.set(node.id, new Set())
this.allSuccessors.set(node.id, new Set())
}
for (const edge of this.dynamicBlueprint.edges) {
this.allPredecessors.get(edge.target)?.add(edge.source)
this.getPredecessors(edge.target).add(edge.source)
}
for (const edge of this.dynamicBlueprint.edges) {
this.getSuccessors(edge.source).add(edge.target)
}
const analysis = analyzeBlueprint(blueprint)
this.filterNodesInLoops(blueprint)
this.frontier = new Set(analysis.startNodeIds.filter((id) => !this.isFallbackNode(id)))
if (this.frontier.size === 0 && analysis.cycles.length > 0 && !isStrictMode) {
const uniqueStartNodes = new Set<string>()
Expand Down Expand Up @@ -65,7 +74,7 @@ export class GraphTraverser {
if (traverser.completedNodes.has(node.id)) continue

const requiredPredecessors = traverser.allPredecessors.get(node.id)
const joinStrategy = traverser.getEffectiveJoinStrategy(node.id)
const joinStrategy = traverser.getJoinStrategy(node.id)

// if no predecessors and not completed, it's a start node and should be in the frontier
if (!requiredPredecessors || requiredPredecessors.size === 0) {
Expand All @@ -89,25 +98,34 @@ export class GraphTraverser {
return this.dynamicBlueprint.nodes.some((n) => n.config?.fallback === nodeId)
}

private getEffectiveJoinStrategy(nodeId: string): 'any' | 'all' {
private getJoinStrategy(nodeId: string): 'any' | 'all' {
const node = this.dynamicBlueprint.nodes.find((n) => n.id === nodeId)
const baseJoinStrategy = node?.config?.joinStrategy || 'all'
return baseJoinStrategy
}

if (node?.uses === 'loop-controller') {
return 'any'
}
private filterNodesInLoops(blueprint: WorkflowBlueprint): void {
blueprint.nodes.forEach((node) => {
if (node.uses !== 'loop-controller') return

const predecessors = this.allPredecessors.get(nodeId)
if (predecessors) {
for (const predecessorId of predecessors) {
const predecessorNode = this.dynamicBlueprint.nodes.find((n) => n.id === predecessorId)
if (predecessorNode?.uses === 'loop-controller') {
return 'any'
}
}
}
const nextInLoopId = blueprint.edges.find((e) => e.source === node.id && e.action === 'continue')?.target
if (!nextInLoopId) return

return baseJoinStrategy
const set: Set<string> = new Set()
set.add(nextInLoopId)
this.nodesInLoops.set(node.id, this.getAllLoopSuccessors(nextInLoopId, blueprint, set))
})
}

private getAllLoopSuccessors(nodeId: string, blueprint: WorkflowBlueprint, set: Set<string>): Set<string> {
this.getSuccessors(nodeId).forEach((successor) => {
if (set.has(successor)) return
const node = this.getNode(successor, blueprint)
if (!node || node.uses === 'loop-controller') return
set.add(successor)
this.getAllLoopSuccessors(successor, blueprint, set)
})
return set
}

getReadyNodes(): ReadyNode[] {
Expand Down Expand Up @@ -135,18 +153,17 @@ export class GraphTraverser {
this.dynamicBlueprint.nodes.push(dynamicNode)
this.allPredecessors.set(dynamicNode.id, new Set([nodeId]))
if (gatherNodeId) {
this.allPredecessors.get(gatherNodeId)?.add(dynamicNode.id)
this.getPredecessors(gatherNodeId).add(dynamicNode.id)
}
this.frontier.add(dynamicNode.id)
}
}

for (const node of nextNodes) {
const joinStrategy = this.getEffectiveJoinStrategy(node.id)
const joinStrategy = this.getJoinStrategy(node.id)
if (joinStrategy !== 'any' && this.completedNodes.has(node.id)) continue

const requiredPredecessors = this.allPredecessors.get(node.id)
if (!requiredPredecessors) continue
const requiredPredecessors = this.getPredecessors(node.id)

const isReady =
joinStrategy === 'any'
Expand All @@ -155,19 +172,33 @@ export class GraphTraverser {

if (isReady) {
this.frontier.add(node.id)
// reset to uncompleted for all nodes in a loop
if (node.uses === 'loop-controller') {
this.getNodesInLoop(node.id).forEach((id) => {
this.resetNodeCompletion(id)
})
}
}
}

if (nextNodes.length === 0) {
for (const [potentialNextId, predecessors] of this.allPredecessors) {
if (predecessors.has(nodeId) && !this.completedNodes.has(potentialNextId)) {
const joinStrategy = this.getEffectiveJoinStrategy(potentialNextId)
const joinStrategy = this.getJoinStrategy(potentialNextId)
const isReady =
joinStrategy === 'any'
? predecessors.has(nodeId)
: [...predecessors].every((p) => this.completedNodes.has(p))
if (isReady) {
this.frontier.add(potentialNextId)
const node = this.getNode(potentialNextId, this.dynamicBlueprint)
if (!node) continue
// reset to uncompleted for all nodes in a loop
if (node.uses === 'loop-controller') {
this.getNodesInLoop(node.id).forEach((id) => {
this.resetNodeCompletion(id)
})
}
}
}
}
Expand Down Expand Up @@ -198,6 +229,36 @@ export class GraphTraverser {
return this.allPredecessors
}

getAllSuccessors(): Map<string, Set<string>> {
return this.allSuccessors
}

getPredecessors(nodeId: string): Set<string> {
const predecessors = this.allPredecessors.get(nodeId)
if (!predecessors) return new Set()
return predecessors
}

getSuccessors(nodeId: string): Set<string> {
const successors = this.allSuccessors.get(nodeId)
if (!successors) return new Set()
return successors
}

getNodesInLoop(id: string): Set<string> {
const loopNodes = this.nodesInLoops.get(id)
if (!loopNodes) return new Set()
return loopNodes
}

resetNodeCompletion(nodeId: string): void {
this.completedNodes.delete(nodeId)
}

getNode(nodeId: string, blueprint: WorkflowBlueprint): NodeDefinition | undefined {
return blueprint.nodes.find((n) => n.id === nodeId)
}

addDynamicNode(_nodeId: string, dynamicNode: NodeDefinition, predecessorId: string, gatherNodeId?: string): void {
this.dynamicBlueprint.nodes.push(dynamicNode)
this.allPredecessors.set(dynamicNode.id, new Set([predecessorId]))
Expand Down
Loading