Workflow
@aiao/rxdb-plugin-trigger 支持基于 WorkflowJSON 的 DAG(有向无环图)任务编排。
WorkflowJSON 结构
interface WorkflowJSON {
metadata: {
name: string;
description?: string;
version?: number;
maxFanOutConcurrency?: number; // 并行节点最大并发数
};
nodes: WorkflowNode[];
edges: WorkflowEdge[];
}
节点类型
| 类型 | 说明 | 必填字段 |
|---|---|---|
task | 执行已注册的任务 | taskId |
condition | 条件分支,根据表达式选择下游路径 | config.expression |
delay | 延迟节点 | config.delayMs |
fan-out | 并行扇出 | — |
fan-in | 汇聚节点,等待所有上游完成 | — |
边类型
| 类型 | 说明 |
|---|---|
success | 上游成功时走此边 |
failure | 上游失败时走此边 |
always | 无论成功失败都走此边 |
示例:数据处理流水线
const workflow: WorkflowJSON = {
metadata: {
name: 'data-pipeline',
version: 1,
maxFanOutConcurrency: 3
},
nodes: [
{ id: 'fetch', type: 'task', taskId: 'fetch-data', label: '获取数据' },
{ id: 'validate', type: 'task', taskId: 'validate-data', label: '校验' },
{ id: 'transform', type: 'task', taskId: 'transform-data', label: '转换' },
{ id: 'notify-ok', type: 'task', taskId: 'send-notification', label: '成功通知' },
{ id: 'notify-fail', type: 'task', taskId: 'send-alert', label: '失败告警' }
],
edges: [
{ id: 'e1', source: 'fetch', target: 'validate', edgeType: 'success' },
{ id: 'e2', source: 'validate', target: 'transform', edgeType: 'success' },
{ id: 'e3', source: 'transform', target: 'notify-ok', edgeType: 'success' },
{ id: 'e4', source: 'validate', target: 'notify-fail', edgeType: 'failure' },
{ id: 'e5', source: 'transform', target: 'notify-fail', edgeType: 'failure' }
]
};
校验
在执行前使用 validateWorkflow() 校验 WorkflowJSON:
import { validateWorkflow } from '@aiao/rxdb-plugin-trigger';
const result = validateWorkflow(workflow);
if (!result.valid) {
console.error('校验失败:', result.errors);
// [{ path: 'edges[0].target', message: 'Referenced node "xxx" not found' }]
}
校验项包括:
- 必填字段完整性
- Node/Edge ID 唯一性
task节点必须有taskId- Edge 引用的 node 必须存在
- 无自环
- 无 DAG 环路(DFS 检测)
执行
// 启动 workflow
const run = await rxdb.trigger.workflowExecutor.startWorkflow(
workflow,
{ inputData: 'hello' } // payload
);
// 查询 workflow 运行状态
const wfRun = await rxdb.trigger.workflowExecutor.getWorkflowRun(run.id);
// 查询各节点状态
const nodeRuns = await rxdb.trigger.workflowExecutor.getNodeRuns(run.id);
for (const node of nodeRuns) {
console.log(`${node.nodeId}: ${node.status}`);
}
// 取消 workflow
await rxdb.trigger.workflowExecutor.cancelWorkflow(run.id);
节点重试
单个节点支持独立的重试配置:
{
id: 'flaky-api',
type: 'task',
taskId: 'call-api',
retry: { maxAttempts: 3 },
}
失败节点重试不影响其他已完成的节点。当重试耗尽时,节点标记为 FAILED,下游不可达节点标记为 SKIPPED。
运行状态
Workflow Run 状态
PENDING → RUNNING → COMPLETED / FAILED / CANCELED
Node Run 状态
PENDING → RUNNING → COMPLETED / FAILED / SKIPPED / CANCELED
Fan-out / Fan-in
并行处理多个分支,然后汇聚:
{
nodes: [
{ id: 'start', type: 'task', taskId: 'prepare' },
{ id: 'branch-a', type: 'task', taskId: 'process-a' },
{ id: 'branch-b', type: 'task', taskId: 'process-b' },
{ id: 'branch-c', type: 'task', taskId: 'process-c' },
{ id: 'merge', type: 'fan-in' },
{ id: 'finish', type: 'task', taskId: 'finalize' },
],
edges: [
{ id: 'e1', source: 'start', target: 'branch-a', edgeType: 'success' },
{ id: 'e2', source: 'start', target: 'branch-b', edgeType: 'success' },
{ id: 'e3', source: 'start', target: 'branch-c', edgeType: 'success' },
{ id: 'e4', source: 'branch-a', target: 'merge', edgeType: 'always' },
{ id: 'e5', source: 'branch-b', target: 'merge', edgeType: 'always' },
{ id: 'e6', source: 'branch-c', target: 'merge', edgeType: 'always' },
{ id: 'e7', source: 'merge', target: 'finish', edgeType: 'success' },
],
}
fan-in 节点使用 pendingUpstreamCount(基于 DAG 入度)作为屏障,所有上游完成后才执行下游。