用 Python 写个轻量级工作流引擎
用 Python 写个轻量级工作流引擎做企业级应用时工作流是绕不开的。很多团队一上来就搞 Camunda 或 Activity结果发现太重了维护起来心累。其实大部分场景一个基于 DAG有向无环图的轻量级引擎就够了。为什么传统引擎不好用以前做工作流最头疼三件事配置太复杂。XML 定义流程改个逻辑得翻半天文档业务一变流程就得重构。状态难保。分布式环境下节点挂了任务状态丢了重试机制又得重新写。没法动态调整。现在业务里经常要接 LLM模型输出不确定传统引擎的静态分支根本处理不了。与其堆功能不如把核心逻辑剥离出来用 JSON 定义流程用 Python 跑调度数据在节点间传。核心逻辑DAG 调度器架构其实就三层定义层JSON 写节点和依赖。调度层拓扑排序按顺序跑任务。数据层全局上下文节点间传参。流程大概是这样的解析 JSON生成拓扑结构。入度为 0 的节点进队列。线程池拉取任务执行完更新上下文释放后续节点依赖。没任务了流程结束。代码实现不用第三方库原生 Python 就能写。核心是SimpleWorkflowEngine支持并发和条件分支。import threading import queue from typing import Dict, List, Callable, Any class WorkflowNode: def __init__(self, name: str, action: Callable, condition: Callable None): self.name name self.action action self.condition condition class SimpleWorkflowEngine: def __init__(self): self.nodes {} self.dependencies {} self.adjacency {} self.context {} self.lock threading.Lock() def add_node(self, node: WorkflowNode): self.nodes[node.name] node self.dependencies[node.name] [] self.adjacency[node.name] [] def add_dependency(self, parent: str, child: str): if parent not in self.dependencies[child]: self.dependencies[child].append(parent) if child not in self.adjacency[parent]: self.adjacency[parent].append(child) def execute(self, initial_context: Dict[str, Any]) - Dict[str, Any]: self.context initial_context.copy() in_degree {n: len(d) for n, d in self.dependencies.items()} completed set() q queue.Queue() # 初始节点入队 for n, d in in_degree.items(): if d 0: q.put(n) def worker(): while True: try: name q.get(timeout1) except queue.Empty: with self.lock: if len(completed) len(self.nodes): break continue node self.nodes[name] run True if node.condition: with self.lock: run node.condition(self.context) if run: try: res node.action(self.context) with self.lock: self.context.update(res) except Exception as e: print(fNode {name} failed: {e}) with self.lock: completed.add(name) for nxt in self.adjacency[name]: in_degree[nxt] - 1 if in_degree[nxt] 0: q.put(nxt) q.task_done() threads [threading.Thread(targetworker) for _ in range(4)] for t in threads: t.start() for t in threads: t.join() return self.context测试一下if __name__ __main__: engine SimpleWorkflowEngine() engine.add_node(WorkflowNode(Start, lambda ctx: {loaded: True})) engine.add_node(WorkflowNode(Process, lambda ctx: {res: 42}, lambda ctx: ctx.get(loaded))) engine.add_node(WorkflowNode(Skip, lambda ctx: {skipped: True}, lambda ctx: not ctx.get(loaded))) engine.add_dependency(Start, Process) engine.add_dependency(Start, Skip) print(engine.execute({}))生产环境要注意什么代码跑通只是第一步上线还得考虑这几块1. 记录日志别只打印 stdout。每个实例给个 ID节点开始时间、结束时间、输入输出、报错信息全存数据库。不然出了问题根本没法查。2. 心跳检测任务跑久了容易僵死。执行节点得定时发心跳调度器发现超时就触发重试或降级。3. 断点续跑网络抖动或重启是常事。引擎得支持从最后成功的节点接着跑别每次都从头来。前提是节点操作得是幂等的不然数据就乱了。总结工作流引擎不用追求大而全。DAG 模型够用了重点是把状态管理、并发调度和异常处理做好。代码越轻维护越容易接业务也越快。

相关新闻

加密算法实战指南:从对称/非对称原理到混合系统设计与密钥管理

加密算法实战指南:从对称/非对称原理到混合系统设计与密钥管理

1. 项目概述:为什么今天还要聊加密算法?如果你是一名开发者,或者对技术安全稍有涉猎,那么“加密算法”这个词对你来说肯定不陌生。从我们每天登录网站时输入的密码,到手机支付时一闪而过的安全校验,再到企业…

2026/6/27 0:54:11阅读更多 →
有理群代数Wedderburn分解与Schur指数计算实战指南

有理群代数Wedderburn分解与Schur指数计算实战指南

1. 项目概述:从一道“硬骨头”说起如果你在代数表示论或有限群论领域摸爬滚打过一阵子,大概率会听说过“有理群代数的Wedderburn分解”和“Schur指数”这两个名词。它们常常出现在一些前沿论文的引言里,或者作为某个更宏大定理的证明基石&…

2026/6/27 0:54:10阅读更多 →
94个公共Tracker服务器:彻底终结BT下载卡在99%的终极解决方案

94个公共Tracker服务器:彻底终结BT下载卡在99%的终极解决方案

94个公共Tracker服务器:彻底终结BT下载卡在99%的终极解决方案 【免费下载链接】trackerslist Updated list of public BitTorrent trackers 项目地址: https://gitcode.com/GitHub_Trending/tr/trackerslist 还在为BT下载总是卡在99%而抓狂吗?每次…

2026/6/27 0:49:10阅读更多 →
浅谈Java匿名内部类

浅谈Java匿名内部类

一、特点 基本定义 匿名内部类:没有类名的局部内部类,只能在定义时直接创建唯一对象,只能使用一次。 核心特点 1. 没有类名,不能手动实例化第二次 没有类名,无法用 new 类名() 再次创建对象只能在定义的同时 new 出来&…

2026/6/28 5:38:23阅读更多 →
牛场管理系统

牛场管理系统

选题背景 随着全球人口持续增长和消费结构升级,对优质动物蛋白尤其是牛肉的需求日益旺盛,这为畜牧业特别是肉牛养殖业带来了巨大的发展机遇。然而,传统粗放式的牛场管理模式正面临着严峻挑战:生产效率低下、养殖成本高企、疫病防控…

2026/6/28 5:38:23阅读更多 →
第55篇:代理池架构与IP管理策略

第55篇:代理池架构与IP管理策略

在爬虫开发中,IP封禁是最常见的反爬手段之一。当爬取速度过快或请求量过大时,网站往往会限制或封禁IP。代理池通过管理和轮换多个代理IP,帮助爬虫规避IP封禁风险,提高爬取的稳定性和效率。 本文将详细介绍代理池的架构设计、实现方案以及最佳实践。 一、代理池概述 1.1 …

2026/6/28 5:38:23阅读更多 →
4-12岁数学思维训练App横评:从“刷题机器“到“独立思考“,什么才能真正培养底层逻辑?

4-12岁数学思维训练App横评:从“刷题机器“到“独立思考“,什么才能真正培养底层逻辑?

比做对一道题更重要的,是孩子愿意去想"为什么"。一、一个让无数家长破防的真相"孩子数学不好,是不是刷题不够?"这是我在教育社群看到最多的问题。于是很多家长的做法是:买更多的练习册、报更多的班、逼孩子做…

2026/6/28 5:38:23阅读更多 →
PE缠绕膜打包流程怎么标准化?成都工厂仓库操作参考

PE缠绕膜打包流程怎么标准化?成都工厂仓库操作参考

四川暖辉包装材料有限公司可围绕成都及周边企业的PE缠绕膜采购和使用需求,提供手用缠绕膜、机用缠绕膜及常用规格供应沟通。对于工厂仓库、电商发货区和物流中转现场来说,PE缠绕膜不是简单“拿起来绕几圈”就完成的耗材,它实际参与的是货物整…

2026/6/28 5:38:23阅读更多 →
四款连锁 AI 称重收银软件深度横评与选型指南

四款连锁 AI 称重收银软件深度横评与选型指南

在生鲜零售行业,称重环节的效率和准确性直接决定了门店的运营成本和顾客体验。传统电子秤依赖人工输入代码或记忆快捷键,不仅速度慢,还容易因操作失误导致计价错误,引发客诉。随着人工智能视觉识别技术的成熟,越来越多…

2026/6/28 5:33:23阅读更多 →
AI Coding 六个月真实ROI账本:产品经理的血泪教训,研发的冷静忠告

AI Coding 六个月真实ROI账本:产品经理的血泪教训,研发的冷静忠告

6个月前的2025年12月,Boris Cherny 公开宣布自己卸载了 IDE。一时间,Vibe Coding 成了全行业最热的话题。6个月后,当我们回过头来拉一份真实账本,发现事情远没有"一句话生成一个App"那么浪漫。本文从产品经理和研发两个…

2026/6/28 0:08:01阅读更多 →
审计来了,数据权限全开——审计走了,怎么确保权限全部关掉?

审计来了,数据权限全开——审计走了,怎么确保权限全部关掉?

引言:审计结束三个月了,审计员的权限还没关某城商行每年按照监管要求开展至少一次数据安全审计。审计期间,内审部门需要抽样检查各类业务数据——交易流水、客户信息、员工操作日志、权限配置记录。这些数据分布在不同系统中,审计…

2026/6/28 0:08:01阅读更多 →
AI Coding 六个月真实ROI账本:产品经理的血泪教训,研发的冷静忠告

AI Coding 六个月真实ROI账本:产品经理的血泪教训,研发的冷静忠告

6个月前的2025年12月,Boris Cherny 公开宣布自己卸载了 IDE。一时间,Vibe Coding 成了全行业最热的话题。6个月后,当我们回过头来拉一份真实账本,发现事情远没有"一句话生成一个App"那么浪漫。本文从产品经理和研发两个…

2026/6/28 0:08:01阅读更多 →
审计来了,数据权限全开——审计走了,怎么确保权限全部关掉?

审计来了,数据权限全开——审计走了,怎么确保权限全部关掉?

引言:审计结束三个月了,审计员的权限还没关某城商行每年按照监管要求开展至少一次数据安全审计。审计期间,内审部门需要抽样检查各类业务数据——交易流水、客户信息、员工操作日志、权限配置记录。这些数据分布在不同系统中,审计…

2026/6/28 0:08:01阅读更多 →