TDengine 数据订阅 — Topic 与 Consumer 基础
分类6.数据订阅 TMQ |篇章01 Topic 与 Consumer适用版本TDengine v3.xv3.3.x / v3.4.xTDengine 内置的 TMQTime-series Message Queue让数据库既是存储也是消息总线。应用通过 Topic 订阅数据变化无需额外部署 Kafka 即可获得数据库 消息队列双重能力。核心概念速查表概念说明Topic订阅主题基于 SQL 查询定义Consumer消费者实例Consumer Group消费组同组共享分区Offset消费位点VGroup 内序号Subscribe订阅一个或多个 TopicCommit提交位点标记已消费Rebalance消费组成员变化时重新分配分区详细解析1. Topic 的三种类型① 列订阅最常用 CREATE TOPIC topic_meters AS SELECT ts, current, voltage FROM meters; - 订阅指定列的所有写入 - 可附加 WHERE 过滤 - 可订阅子集 ② 超级表订阅 CREATE TOPIC topic_all AS STABLE meters; - 订阅整张超级表所有列 - 包含所有子表的变化 ③ 数据库订阅 CREATE TOPIC topic_db AS DATABASE test; - 订阅数据库内所有表的变化 - 用于全库同步场景2. Consumer 模型Consumer 抽象 Consumer Group: etl_workers Consumer-1 ─┐ Consumer-2 ─┼─→ 共同消费 Topic Consumer-3 ─┘ 特点 - 同组消费者分摊 VGroup 分区 - 每个 VGroup 同时只能被组内一个 Consumer 消费 - 跨组互不影响广播模式 分区分配 Topic 在 N 个 VGroup 上有数据 Consumer 组有 M 个成员 分配每个 Consumer 拿 N/M 个 VGroup 如果 M N多余的 Consumer 空转 如果 M N每个 Consumer 处理多个 VGroup3. 订阅生命周期Consumer 完整生命周期 ① 配置 Consumer 参数 - group.id必须 - auto.offset.resetearliest/latest - enable.auto.commit - msg.with.table.name ② 创建 Consumer 实例 ③ Subscribe 一个或多个 Topic ④ Poll 循环 - 拉取消息 - 处理业务逻辑 - Commit Offset ⑤ Unsubscribe可选 ⑥ Close Consumer 关键参数 group.id // 消费组 ID auto.offset.reset // earliest / latest / none enable.auto.commit // true / false auto.commit.interval.ms msg.with.table.name // 消息中包含表名4. Offset 与位点管理Offset 概念 每个 VGroup 维护一个递增序号 Consumer 消费后 Commit Offset 下次启动从 Committed Offset 1 继续 位点策略 auto.offset.reset earliest → 首次订阅从最早位置开始 auto.offset.reset latest → 首次订阅从最新位置开始之前的数据跳过 auto.offset.reset none → 没有位点则报错 自动 vs 手动 Commit 自动定期自动 Commit简单但可能丢/重复 手动业务确认后 Commit精确控制5. Rebalance 机制Rebalance 触发场景 - 新 Consumer 加入组 - 现有 Consumer 离开崩溃/关闭 - Topic 分区变化新增 VGroup Rebalance 过程 ① 协调者检测变化 ② 通知所有 Consumer 暂停消费 ③ 重新分配分区 ④ 各 Consumer 获取新的 VGroup 列表 ⑤ 从 Committed Offset 继续消费 对应用的影响 - 短暂停顿毫秒~秒级 - 未 Commit 的消息可能被另一 Consumer 重新处理 - 建议设计为幂等消费6. 消息内容消费到的消息结构 Message { topic: topic_meters, vgroup_id: 3, offset: 12345, table_name: d001, // 若启用 msg.with.table.name columns: [ts, current, voltage], rows: [ [T1, 25.3, 220], [T2, 25.5, 221], ... ] } 消息特点 - 一次 Poll 可返回多行批量 - 同一消息内可能来自不同子表 - 已写入 WAL 的数据才可订阅 - 时间顺序按写入顺序非业务 ts7. 与 Kafka 的对比特性TDengine TMQKafka数据来源数据库写入直接产生应用显式 produce数据保留与数据库共用存储独立 Topic 文件过滤能力Topic SQL 内置过滤应用层过滤Schema强类型字节流部署数据库内置无额外组件独立集群适用时序数据 ETL/CDC通用消息8. 应用场景① 数据 ETL 订阅原始数据 → 清洗 → 写入下游 ② 实时计算 订阅指标数据 → 聚合 → 报警 ③ 数据同步 订阅整库 → 同步到异地灾备 ④ 数据归档 订阅冷数据 → 备份到对象存储 ⑤ 流计算上游 流计算引擎订阅 Topic 作为输入源代码示例Python 消费者fromtaos.tmqimportConsumer consumerConsumer({group.id:etl_group,auto.offset.reset:earliest,td.connect.user:root,td.connect.pass:taosdata,enable.auto.commit:false,})consumer.subscribe([topic_meters])try:whileTrue:msgconsumer.poll(timeout1.0)ifmsgisNone:continueforblockinmsg:forrowinblock:print(row)consumer.commit()finally:consumer.close()Java 消费者PropertiespropsnewProperties();props.setProperty(TMQConstants.BOOTSTRAP_SERVERS,127.0.0.1:6030);props.setProperty(TMQConstants.GROUP_ID,etl_group);props.setProperty(TMQConstants.ENABLE_AUTO_COMMIT,false);try(TaosConsumerMeterconsumernewTaosConsumer(props)){consumer.subscribe(Collections.singletonList(topic_meters));while(running){ConsumerRecordsMeterrecordsconsumer.poll(Duration.ofMillis(1000));for(ConsumerRecordMeterr:records){process(r.value());}consumer.commitSync();}}管理 Topic-- 创建CREATETOPIC topic_metersASSELECTts,currentFROMmeters;-- 查看SHOWTOPICS;-- 查看 Consumer Group 状态SELECT*FROMinformation_schema.ins_subscriptions;-- 删除DROPTOPIC topic_meters;性能考量消费性能因素影响单 Consumer 拉取批量大批量提升吞吐并发 Consumer 数与 VGroup 数匹配最优处理逻辑耗时直接限制吞吐Commit 频率频繁 Commit 影响性能配置建议场景配置低延迟实时poll 短间隔 频繁 commit高吞吐 ETL大批量 异步 commit严格一次手动 commit 业务幂等FAQQ1: 订阅历史数据吗可以。设置auto.offset.resetearliest从 WAL 中可保留范围的最早数据开始消费。但 WAL 有保留期限。Q2: 一个 Topic 能多少 Consumer 同时消费同组内最多 VGroup 数。再多的 Consumer 会空闲。跨组不同 group.id则无限制。Q3: 消费失败如何重试不 Commit Offset 即可。下次 Poll 仍能拿到该消息。注意业务必须幂等。Q4: Topic 删除后消费者会怎样Poll 返回错误。需要重新订阅其他 Topic 或关闭 Consumer。Q5: TMQ 与流计算关系流计算本质上是内置消费 计算 写入。可以用 Topic 作为流计算的输入实现外部应用可见的中间数据。参考系统构架篇01-《TDengine 整体架构全景》02-《集群拓扑深度解析》03-《MNode 内部机制深度解析》04-《RPC 通信层深度解析》05-《VNode 生命周期》06-《RAFT 共识协议》07-《端到端的消息流》数据模型01-《数据库创建与参数详解》02-《超级表/子表/普通表》03-《支持数据类型深度解析》04-《TDengine Tag 设计哲学与 Schema 变更机制》05-《TDengine 虚拟表实现原理》存储引擎01-《TDengine 存储引擎概览》02-《TDengine MemTable 深度解析》03-《TDengine WAL 预写日志机制》04-《TDengine 数据文件格式》05-《TDengine Commit 与 Flush 机制 》06-《TDengine Compaction 合并策略 》07-《TDengine 数据保留与 TTL》08-《TDengine 压缩编码机制》09-《TDengine Cache 与 Last 查询加速》10-《TDengine 逻辑计划生成》查询引擎01-《TDengine 查询引擎概览》02-《TDengine SQL 解析与词法分析》03-《TDengine 语义分析与 AST 重写》04-《TDengine 逻辑计划生成》05-《TDengine 物理计划生成》06-《TDengine 扫描算子》07-《TDengine 聚合算子》08-《TDengine 聚合算子》09-《TDengine 连接算子》10-《TDengine 排序、填充与投影》11-《TDengine 分布式查询执行》12-《TDengine EXPLAIN 与查询优化》数据写入01-《TDengine SQL INSERT》02-《TDengine 无模式写入》03-《TDengine STMT 写入》04-《TDengine 写入内部流程》05-《TDengine 数据更新删除》关于 TDengineTDengine 专为物联网IoT平台、工业大数据平台设计。其中TDengine TSDB 是一款高性能、分布式的时序数据库Time Series Database同时它还带有内建的缓存、流式计算、数据订阅等系统功能TDengine IDMP 是一款AI原生工业数据管理平台它通过树状层次结构建立数据目录对数据进行标准化、情景化并通过 AI 提供实时分析、可视化、事件管理与报警等功能。

相关新闻

“有求必应“的leader,差点带废了整个团

“有求必应“的leader,差点带废了整个团

作者:技术从业16年,踩过坑、做过技术负责人、带过团队,也亲眼看着AI把很多"理所当然"的事情重新洗牌。不追热点,只写真实踩过的坑和总结过的东西,欢迎关注一起交流。上周,组里一个新人跑来问我一…

2026/6/28 4:58:21阅读更多 →
2026杭州APP开发公司排行:哪家好?电商、平台与AI应用项目怎么选

2026杭州APP开发公司排行:哪家好?电商、平台与AI应用项目怎么选

2026杭州APP开发公司排行:哪家好?电商、平台与AI应用项目怎么选 杭州企业做APP,常见项目集中在电商交易、品牌会员、内容服务、知识付费、预约到店、平台撮合、企业服务和AI应用等方向。 很多企业在启动APP项目时,会先问开发周期和…

2026/6/28 4:58:21阅读更多 →
Claude Code 切换为 OpenAI 格式 API 接入说明

Claude Code 切换为 OpenAI 格式 API 接入说明

Claude Code 切换为 OpenAI 格式 API 接入说明 一、适用场景与前提 已安装 Claude Code(claude 命令可正常运行,无论当前走官方登录还是 Anthropic 直连)。手里有第三方中转站提供的 OpenAI 格式 API:一个 base URL(形…

2026/6/28 4:53:21阅读更多 →
大模型 API 返回内容太短的完整排查:max_tokens、stop、stream 与上下文窗口配置

大模型 API 返回内容太短的完整排查:max_tokens、stop、stream 与上下文窗口配置

调用大模型 API 时,经常会遇到一个现象:网页端能生成很长的内容,但换成 API 后只返回几句话;明明把参数调大了,回答还是很短;更严重时,JSON 生成到一半就断掉,右括号都没有。这类问题…

2026/6/28 6:28:26阅读更多 →
做GEO代理多久可以盈利

做GEO代理多久可以盈利

这是代理商在付诸行动之前最想算清楚的一笔账。多久能回本?多久开始赚钱?多久收入稳定?影响盈利周期的四个变量变量一:代理模式。 全案托管代理前期投入最少,启动最快——从签约培训到开始开发客户通常只需2-3周。盈利…

2026/6/28 6:28:26阅读更多 →
IDM激活脚本:免费永久使用IDM下载管理器的完整指南

IDM激活脚本:免费永久使用IDM下载管理器的完整指南

IDM激活脚本:免费永久使用IDM下载管理器的完整指南 【免费下载链接】IDM-Activation-Script IDM Activation & Trail Reset Script 项目地址: https://gitcode.com/gh_mirrors/id/IDM-Activation-Script IDM激活脚本是一款开源免费的Internet Download M…

2026/6/28 6:28:26阅读更多 →
OpCore-Simplify:黑苹果配置自动化的终极解决方案

OpCore-Simplify:黑苹果配置自动化的终极解决方案

OpCore-Simplify:黑苹果配置自动化的终极解决方案 【免费下载链接】OpCore-Simplify A tool designed to simplify the creation of OpenCore EFI 项目地址: https://gitcode.com/GitHub_Trending/op/OpCore-Simplify 在构建黑苹果系统的复杂过程中&#xff…

2026/6/28 6:28:26阅读更多 →
Python类实例化过程详解

Python类实例化过程详解

Python类实例化过程详解 Python 类实例化的完整过程远不止表面的 __init__,它涉及元类、__new__和__init__三者的深度协作。 下面我将从底层原理、执行顺序、内存分配以及继承场景四个维度,为你层层拆解。 一、核心触发点:元类的 __call__…

2026/6/28 6:28:26阅读更多 →
自动售货机放哪里最合适?6大热门场景分析~YH

自动售货机放哪里最合适?6大热门场景分析~YH

很多人以为随便找个地方放一台自动售货机就能运转起来,其实场地选择直接决定了机器的使用率和寿命。今天给大家盘点一下最适合放自动售货机的6大场景,以及每个场景需要注意的要点。学校场景学校是比较稳定的场景,师生群体固定,消费…

2026/6/28 6:23:26阅读更多 →
AI Coding 六个月真实ROI账本:产品经理的血泪教训,研发的冷静忠告

AI Coding 六个月真实ROI账本:产品经理的血泪教训,研发的冷静忠告

6个月前的2025年12月,Boris Cherny 公开宣布自己卸载了 IDE。一时间,Vibe Coding 成了全行业最热的话题。6个月后,当我们回过头来拉一份真实账本,发现事情远没有"一句话生成一个App"那么浪漫。本文从产品经理和研发两个…

2026/6/28 0:08:01阅读更多 →
审计来了,数据权限全开——审计走了,怎么确保权限全部关掉?

审计来了,数据权限全开——审计走了,怎么确保权限全部关掉?

引言:审计结束三个月了,审计员的权限还没关某城商行每年按照监管要求开展至少一次数据安全审计。审计期间,内审部门需要抽样检查各类业务数据——交易流水、客户信息、员工操作日志、权限配置记录。这些数据分布在不同系统中,审计…

2026/6/28 0:08:01阅读更多 →
AI Coding 六个月真实ROI账本:产品经理的血泪教训,研发的冷静忠告

AI Coding 六个月真实ROI账本:产品经理的血泪教训,研发的冷静忠告

6个月前的2025年12月,Boris Cherny 公开宣布自己卸载了 IDE。一时间,Vibe Coding 成了全行业最热的话题。6个月后,当我们回过头来拉一份真实账本,发现事情远没有"一句话生成一个App"那么浪漫。本文从产品经理和研发两个…

2026/6/28 0:08:01阅读更多 →
审计来了,数据权限全开——审计走了,怎么确保权限全部关掉?

审计来了,数据权限全开——审计走了,怎么确保权限全部关掉?

引言:审计结束三个月了,审计员的权限还没关某城商行每年按照监管要求开展至少一次数据安全审计。审计期间,内审部门需要抽样检查各类业务数据——交易流水、客户信息、员工操作日志、权限配置记录。这些数据分布在不同系统中,审计…

2026/6/28 0:08:01阅读更多 →