在物聯(lián)網與工業(yè)互聯(lián)網場景中,海量時序數(shù)據(jù)的實時流轉是業(yè)務系統(tǒng)的核心需求。時序數(shù)據(jù)庫不僅需要高效存儲,還必須提供靈活的數(shù)據(jù)訂閱與推送能力。TDengine 內置了類消息隊列的數(shù)據(jù)訂閱機制,無需額外部署 Kafka、RabbitMQ 等中間件,即可實現(xiàn)毫秒級實時數(shù)據(jù)推送,大幅降低系統(tǒng)架構復雜度與運維成本。本文將從主題創(chuàng)建、消費組管理、進度控制到典型應用場景,全面解析 TDengine 時序數(shù)據(jù)庫的數(shù)據(jù)訂閱與實時推送實踐。
一、內置類消息隊列:為什么不再需要 Kafka
傳統(tǒng)架構中,時序數(shù)據(jù)從寫入到消費往往需要經過獨立的消息隊列中間件,數(shù)據(jù)鏈路長、運維成本高。TDengine 時序數(shù)據(jù)庫將消息隊列能力直接內置于數(shù)據(jù)庫引擎中,形成”寫入即訂閱”的極簡架構。
1.1 核心優(yōu)勢
- 架構簡化:無需部署和維護獨立的 Kafka、Pulsar 等消息中間件,減少組件數(shù)量和運維負擔
- 零數(shù)據(jù)拷貝:訂閱系統(tǒng)直接基于 WAL(Write-Ahead Log)讀取數(shù)據(jù),避免了數(shù)據(jù)從數(shù)據(jù)庫到消息隊列的二次傳輸
- 毫秒級延遲:數(shù)據(jù)寫入完成后立即推送給消費者,端到端延遲可控制在毫秒級別
- SQL 預處理:支持在數(shù)據(jù)庫端完成數(shù)據(jù)過濾與計算,僅將結果集推送給消費者,大幅減少網絡傳輸量
1.2 與 Kafka 的對比
| 特性 | TDengine 數(shù)據(jù)訂閱 | Kafka |
|---|---|---|
| 部署方式 | 內置,無需額外組件 | 獨立集群部署 |
| 數(shù)據(jù)存儲 | 與時序數(shù)據(jù)共享存儲 | 獨立日志存儲 |
| 數(shù)據(jù)過濾 | 支持 SQL 預處理 | 需要流處理框架 |
| 消費模型 | 推拉結合 | 純拉取模式 |
| 運維成本 | 低(統(tǒng)一運維) | 高(獨立運維) |
二、主題創(chuàng)建:靈活定義數(shù)據(jù)來源
主題(Topic)是 TDengine 數(shù)據(jù)訂閱的核心抽象,定義了數(shù)據(jù)的來源和范圍。TDengine 支持三種粒度的主題創(chuàng)建方式,滿足不同場景的數(shù)據(jù)消費需求。
2.1 按數(shù)據(jù)庫創(chuàng)建主題
訂閱整個數(shù)據(jù)庫中的所有表數(shù)據(jù),適用于需要全量數(shù)據(jù)監(jiān)控的場景:
-- 創(chuàng)建數(shù)據(jù)庫級別的主題
CREATE TOPIC topic_db AS DATABASE power;
該主題會自動覆蓋 power 數(shù)據(jù)庫中所有表的數(shù)據(jù)變更,包括新增子表的數(shù)據(jù)。
2.2 按超級表創(chuàng)建主題
訂閱指定超級表的數(shù)據(jù),適合按業(yè)務模塊劃分數(shù)據(jù)消費范圍:
-- 創(chuàng)建超級表級別的主題
CREATE TOPIC topic_meters AS STABLE meters;
該主題僅包含 meters 超級表及其所有子表的數(shù)據(jù),粒度更精確。
2.3 按 SQL 查詢創(chuàng)建主題
基于自定義 SQL 查詢創(chuàng)建主題,提供最靈活的數(shù)據(jù)篩選能力,是 TDengine 時序數(shù)據(jù)庫區(qū)別于傳統(tǒng)消息隊列的核心特性之一:
-- 創(chuàng)建 SQL 查詢主題,僅訂閱電壓超過 220V 的數(shù)據(jù)
CREATE TOPIC topic_high_voltage AS
SELECT ts, location, voltage, current
FROM meters
WHERE voltage > 220;
-- 創(chuàng)建聚合主題,訂閱每分鐘的平均電流
CREATE TOPIC topic_avg_current AS
SELECT _wstart, location, AVG(current) AS avg_current
FROM meters
INTERVAL(1m);
SQL 主題的優(yōu)勢在于:過濾和聚合計算在數(shù)據(jù)庫服務端完成,消費者只需接收處理后的結果集,網絡傳輸量可降低數(shù)倍甚至數(shù)十倍。
2.4 主題管理操作
-- 查看所有主題
SHOW TOPICS;
-- 刪除主題
DROP TOPIC topic_db;
三、實時推送:數(shù)據(jù)寫入即推送
TDengine 時序數(shù)據(jù)庫采用推拉結合(Push-Pull Hybrid)的數(shù)據(jù)傳輸模式,兼顧實時性與資源效率。
3.1 推送機制
當新數(shù)據(jù)寫入 TDengine 時,訂閱系統(tǒng)會立即檢測到 WAL 中的數(shù)據(jù)變更,并主動推送給等待中的消費者。整個流程無需消費者輪詢,實現(xiàn)真正的毫秒級延遲:
數(shù)據(jù)寫入 → WAL 落盤 → 訂閱系統(tǒng)檢測 → 主動推送至消費者
3.2 長輪詢兜底
當暫時沒有新數(shù)據(jù)時,消費者通過長輪詢機制保持與服務端的連接。一旦有新數(shù)據(jù)到達,服務端立即響應并推送數(shù)據(jù),避免了頻繁空輪詢造成的資源浪費。
3.3 代碼示例:消費數(shù)據(jù)
import taos
# 創(chuàng)建消費者連接
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="earliest"
)
# 循環(huán)拉取數(shù)據(jù)
for message in consumer:
print(f"主題: {message.topic}, 分區(qū): {message.partition}")
for row in message.rows:
print(f" 時間: {row[0]}, 電流: {row[1]}, 電壓: {row[2]}")
# 處理完成后提交進度
message.commit()
consumer.close()
四、消費組管理:共享進度與自動負載均衡
消費組(Consumer Group)是 TDengine 數(shù)據(jù)訂閱實現(xiàn)高可靠、高吞吐消費的關鍵機制。
4.1 共享消費進度
同一消費組內的所有消費者共享統(tǒng)一的消費進度。這意味著一條數(shù)據(jù)只會被消費組內的一個消費者處理,避免了數(shù)據(jù)重復消費的問題。消費進度由服務端(mnode)集中管理,持久化存儲,消費者重啟后可從斷點繼續(xù)消費。
4.2 自動負載均衡(Rebalance)
TDengine 時序數(shù)據(jù)庫內置了自動 Rebalance 機制,當消費組成員發(fā)生變化時,系統(tǒng)會自動重新分配數(shù)據(jù)分區(qū):
- 新消費者加入:消費組新增成員時,自動將部分 vnode 分配給新成員
- 消費者離開:某個消費者斷開連接或主動退出時,其負責的分區(qū)自動分配給其他成員
- 故障轉移:消費者故障時,系統(tǒng)在數(shù)秒內完成分區(qū)遷移,保證消費連續(xù)性
系統(tǒng)每 2 秒檢測一次消費組狀態(tài),發(fā)現(xiàn)變化時自動觸發(fā) Rebalance,以 vnode 為最小分配單元,采用均勻分配策略,并優(yōu)先保持已有分配關系以減少不必要的數(shù)據(jù)遷移。
4.3 代碼示例:多消費者協(xié)同
import taos
# 消費者 A
consumer_a = taos.Consumer(
group_id="g1", # 同一消費組
topics=["topic_meters"],
auto_offset="latest"
)
# 消費者 B(另一進程或機器)
consumer_b = taos.Consumer(
group_id="g1", # 同一消費組
topics=["topic_meters"],
auto_offset="latest"
)
# 兩個消費者自動分配不同的 vnode 分區(qū),并行消費
for message in consumer_a:
process(message)
message.commit()
五、進度管理:精確控制消費位置
消費進度管理是確保數(shù)據(jù)不丟失、不重復的關鍵。TDengine 時序數(shù)據(jù)庫通過 WAL 版本號精確記錄每個 vnode 的消費位置。
5.1 Commit Offset
消費者處理完數(shù)據(jù)后,需要提交消費進度(Commit Offset),告知服務端已成功處理到哪個位置:
# 方式一:手動提交(推薦)
for message in consumer:
process(message)
message.commit() # 顯式提交
# 方式二:批量提交
messages = []
for message in consumer:
messages.append(message)
if len(messages) >= 100:
process_batch(messages)
for msg in messages:
msg.commit()
messages.clear()
5.2 earliest 與 latest 配置
通過 auto.offset 參數(shù)控制消費者首次啟動時的消費起始位置:
| 配置值 | 行為 | 適用場景 |
|---|---|---|
earliest | 從最早可用的數(shù)據(jù)開始消費 | 需要全量歷史數(shù)據(jù)處理 |
latest | 僅從最新數(shù)據(jù)開始消費 | 僅關注實時數(shù)據(jù) |
none | 無已提交進度時拋出異常 | 必須保證進度連續(xù)性 |
# 從最早數(shù)據(jù)開始消費(首次啟動時回溯全部歷史數(shù)據(jù))
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="earliest"
)
# 僅消費實時數(shù)據(jù)(忽略歷史數(shù)據(jù))
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="latest"
)
六、兼容 Kafka 風格 API:降低遷移成本
對于已經使用 Kafka 的團隊,TDengine 提供了兼容 Kafka 風格的 API 接口,大幅降低遷移成本。開發(fā)者無需學習全新的 API 體系,只需調整連接配置即可完成切換。
6.1 API 對照
| Kafka 概念 | TDengine 對應概念 |
|---|---|
| Topic | Topic(主題) |
| Consumer Group | Consumer Group(消費組) |
| Partition | VNode(虛擬數(shù)據(jù)節(jié)點) |
| Offset | Commit Offset(消費進度) |
| poll() | poll()(拉取數(shù)據(jù)) |
| commitSync() | commit()(提交進度) |
6.2 遷移示例
# Kafka 風格的消費者代碼(TDengine)
from taos import Consumer
consumer = Consumer({
"group.id": "g1",
"td.connect.ip": "127.0.0.1",
"td.connect.port": "6030",
"auto.offset.reset": "earliest"
})
consumer.subscribe(["topic_meters"])
while True:
records = consumer.poll(timeout=1000)
for record in records:
handle(record)
consumer.unsubscribe()
consumer.close()
API 的使用方式與 Kafka 高度一致,包括 subscribe()、poll()、commit()、unsubscribe() 等核心方法,開發(fā)者可以快速上手。
七、SQL 預處理:在數(shù)據(jù)庫端完成過濾
SQL 預處理是 TDengine 數(shù)據(jù)訂閱區(qū)別于傳統(tǒng)消息隊列的核心優(yōu)勢。通過在數(shù)據(jù)庫服務端執(zhí)行過濾、聚合等計算,僅將結果集推送給消費者,可以大幅減少網絡傳輸量和客戶端計算壓力。
7.1 數(shù)據(jù)過濾
-- 僅訂閱特定設備組的數(shù)據(jù)
CREATE TOPIC topic_group1 AS
SELECT ts, current, voltage
FROM meters
WHERE groupId = 1;
-- 僅訂閱異常數(shù)據(jù)
CREATE TOPIC topic_alert AS
SELECT ts, location, temperature
FROM sensors
WHERE temperature > 80 OR humidity > 95;
7.2 數(shù)據(jù)聚合
-- 訂閱每 5 分鐘的統(tǒng)計指標
CREATE TOPIC topic_stats AS
SELECT _wstart, _wend, location,
AVG(current) AS avg_current,
MAX(voltage) AS max_voltage,
COUNT(*) AS sample_count
FROM meters
WHERE voltage > 200
INTERVAL(5m);
通過 SQL 預處理,原始數(shù)據(jù)量可能達到每秒百萬條級別,但推送給消費者的聚合結果可能僅有每秒數(shù)百條,傳輸量降低上千倍。
八、典型應用場景
8.1 實時監(jiān)控大屏
工業(yè)場景中,監(jiān)控大屏需要實時展示設備運行狀態(tài)。通過 TDengine 數(shù)據(jù)訂閱,可以將關鍵指標實時推送至前端:
consumer = taos.Consumer(
group_id="dashboard_group",
topics=["topic_realtime_stats"],
auto_offset="latest"
)
for message in consumer:
# 將數(shù)據(jù)推送到 WebSocket 服務
websocket_server.broadcast(message.rows)
message.commit()
8.2 跨系統(tǒng)數(shù)據(jù)同步
將 TDengine 中的時序數(shù)據(jù)實時同步到 Elasticsearch、ClickHouse 等其他系統(tǒng),用于全文檢索或離線分析:
consumer = taos.Consumer(
group_id="sync_group",
topics=["topic_meters"],
auto_offset="earliest"
)
for message in consumer:
# 批量寫入 Elasticsearch
es_client.bulk_index(message.rows)
message.commit()
8.3 告警通知系統(tǒng)
訂閱關鍵傳感器數(shù)據(jù),實時檢測異常并觸發(fā)告警通知:
-- 創(chuàng)建告警主題
CREATE TOPIC topic_alert AS
SELECT ts, device_id, temperature, pressure
FROM sensors
WHERE temperature > 100 OR pressure < 0.5;
consumer = taos.Consumer(
group_id="alert_group",
topics=["topic_alert"],
auto_offset="latest"
)
for message in consumer:
for row in message.rows:
send_alert(
device=row["device_id"],
metric="temperature",
value=row["temperature"],
channel=["sms", "email", "dingtalk"]
)
message.commit()
九、總結
TDengine 時序數(shù)據(jù)庫通過內置類消息隊列的數(shù)據(jù)訂閱機制,為開發(fā)者提供了從數(shù)據(jù)寫入到實時消費的一站式解決方案。主題創(chuàng)建的靈活粒度、消費組的自動負載均衡、精確的進度管理、兼容 Kafka 的 API 設計以及強大的 SQL 預處理能力,使得 TDengine 在實時監(jiān)控、數(shù)據(jù)同步、告警通知等場景中展現(xiàn)出顯著優(yōu)勢。
相比傳統(tǒng)”時序數(shù)據(jù)庫 + 消息隊列”的復雜架構,TDengine 的數(shù)據(jù)訂閱功能不僅簡化了系統(tǒng)架構、降低了運維成本,更通過零拷貝和推拉結合的傳輸模式實現(xiàn)了毫秒級的數(shù)據(jù)推送延遲。如果您正在尋找一款既能高效存儲時序數(shù)據(jù),又能提供實時數(shù)據(jù)分發(fā)能力的時序數(shù)據(jù)庫,TDengine 無疑是值得深入評估的選擇。歡迎訪問 TDengine 官方文檔中心,獲取更多技術細節(jié)與最佳實踐。



互聯(lián)網.png)



-1.png)












伙伴.png)



