跳到主要内容

Workflow

@aiao/rxdb-plugin-trigger 支持基于 WorkflowJSON 的 DAG(有向无环图)任务编排。

WorkflowJSON 结构

interface WorkflowJSON {
metadata: {
name: string;
description?: string;
version?: number;
maxFanOutConcurrency?: number; // 并行节点最大并发数
};
nodes: WorkflowNode[];
edges: WorkflowEdge[];
}

节点类型

类型说明必填字段
task执行已注册的任务taskId
condition条件分支,根据表达式选择下游路径config.expression
delay延迟节点config.delayMs
fan-out并行扇出
fan-in汇聚节点,等待所有上游完成

边类型

类型说明
success上游成功时走此边
failure上游失败时走此边
always无论成功失败都走此边

示例:数据处理流水线

const workflow: WorkflowJSON = {
metadata: {
name: 'data-pipeline',
version: 1,
maxFanOutConcurrency: 3
},
nodes: [
{ id: 'fetch', type: 'task', taskId: 'fetch-data', label: '获取数据' },
{ id: 'validate', type: 'task', taskId: 'validate-data', label: '校验' },
{ id: 'transform', type: 'task', taskId: 'transform-data', label: '转换' },
{ id: 'notify-ok', type: 'task', taskId: 'send-notification', label: '成功通知' },
{ id: 'notify-fail', type: 'task', taskId: 'send-alert', label: '失败告警' }
],
edges: [
{ id: 'e1', source: 'fetch', target: 'validate', edgeType: 'success' },
{ id: 'e2', source: 'validate', target: 'transform', edgeType: 'success' },
{ id: 'e3', source: 'transform', target: 'notify-ok', edgeType: 'success' },
{ id: 'e4', source: 'validate', target: 'notify-fail', edgeType: 'failure' },
{ id: 'e5', source: 'transform', target: 'notify-fail', edgeType: 'failure' }
]
};

校验

在执行前使用 validateWorkflow() 校验 WorkflowJSON:

import { validateWorkflow } from '@aiao/rxdb-plugin-trigger';

const result = validateWorkflow(workflow);
if (!result.valid) {
console.error('校验失败:', result.errors);
// [{ path: 'edges[0].target', message: 'Referenced node "xxx" not found' }]
}

校验项包括:

  • 必填字段完整性
  • Node/Edge ID 唯一性
  • task 节点必须有 taskId
  • Edge 引用的 node 必须存在
  • 无自环
  • 无 DAG 环路(DFS 检测)

执行

// 启动 workflow
const run = await rxdb.trigger.workflowExecutor.startWorkflow(
workflow,
{ inputData: 'hello' } // payload
);

// 查询 workflow 运行状态
const wfRun = await rxdb.trigger.workflowExecutor.getWorkflowRun(run.id);

// 查询各节点状态
const nodeRuns = await rxdb.trigger.workflowExecutor.getNodeRuns(run.id);
for (const node of nodeRuns) {
console.log(`${node.nodeId}: ${node.status}`);
}

// 取消 workflow
await rxdb.trigger.workflowExecutor.cancelWorkflow(run.id);

节点重试

单个节点支持独立的重试配置:

{
id: 'flaky-api',
type: 'task',
taskId: 'call-api',
retry: { maxAttempts: 3 },
}

失败节点重试不影响其他已完成的节点。当重试耗尽时,节点标记为 FAILED,下游不可达节点标记为 SKIPPED

运行状态

Workflow Run 状态

PENDINGRUNNINGCOMPLETED / FAILED / CANCELED

Node Run 状态

PENDINGRUNNINGCOMPLETED / FAILED / SKIPPED / CANCELED

Fan-out / Fan-in

并行处理多个分支,然后汇聚:

{
nodes: [
{ id: 'start', type: 'task', taskId: 'prepare' },
{ id: 'branch-a', type: 'task', taskId: 'process-a' },
{ id: 'branch-b', type: 'task', taskId: 'process-b' },
{ id: 'branch-c', type: 'task', taskId: 'process-c' },
{ id: 'merge', type: 'fan-in' },
{ id: 'finish', type: 'task', taskId: 'finalize' },
],
edges: [
{ id: 'e1', source: 'start', target: 'branch-a', edgeType: 'success' },
{ id: 'e2', source: 'start', target: 'branch-b', edgeType: 'success' },
{ id: 'e3', source: 'start', target: 'branch-c', edgeType: 'success' },
{ id: 'e4', source: 'branch-a', target: 'merge', edgeType: 'always' },
{ id: 'e5', source: 'branch-b', target: 'merge', edgeType: 'always' },
{ id: 'e6', source: 'branch-c', target: 'merge', edgeType: 'always' },
{ id: 'e7', source: 'merge', target: 'finish', edgeType: 'success' },
],
}

fan-in 节点使用 pendingUpstreamCount(基于 DAG 入度)作为屏障,所有上游完成后才执行下游。