在物聯(lián)網(wǎng)與工業(yè)互聯(lián)網(wǎng)場景中,海量時(shí)序數(shù)據(jù)的實(shí)時(shí)流轉(zhuǎn)是業(yè)務(wù)系統(tǒng)的核心需求。時(shí)序數(shù)據(jù)庫不僅需要高效存儲,還必須提供靈活的數(shù)據(jù)訂閱與推送能力。TDengine 內(nèi)置了類消息隊(duì)列的數(shù)據(jù)訂閱機(jī)制,無需額外部署 Kafka、RabbitMQ 等中間件,即可實(shí)現(xiàn)毫秒級實(shí)時(shí)數(shù)據(jù)推送,大幅降低系統(tǒng)架構(gòu)復(fù)雜度與運(yùn)維成本。本文將從主題創(chuàng)建、消費(fèi)組管理、進(jìn)度控制到典型應(yīng)用場景,全面解析 TDengine 時(shí)序數(shù)據(jù)庫的數(shù)據(jù)訂閱與實(shí)時(shí)推送實(shí)踐。
一、內(nèi)置類消息隊(duì)列:為什么不再需要 Kafka
傳統(tǒng)架構(gòu)中,時(shí)序數(shù)據(jù)從寫入到消費(fèi)往往需要經(jīng)過獨(dú)立的消息隊(duì)列中間件,數(shù)據(jù)鏈路長、運(yùn)維成本高。TDengine 時(shí)序數(shù)據(jù)庫將消息隊(duì)列能力直接內(nèi)置于數(shù)據(jù)庫引擎中,形成”寫入即訂閱”的極簡架構(gòu)。
1.1 核心優(yōu)勢
- 架構(gòu)簡化:無需部署和維護(hù)獨(dú)立的 Kafka、Pulsar 等消息中間件,減少組件數(shù)量和運(yùn)維負(fù)擔(dān)
- 零數(shù)據(jù)拷貝:訂閱系統(tǒng)直接基于 WAL(Write-Ahead Log)讀取數(shù)據(jù),避免了數(shù)據(jù)從數(shù)據(jù)庫到消息隊(duì)列的二次傳輸
- 毫秒級延遲:數(shù)據(jù)寫入完成后立即推送給消費(fèi)者,端到端延遲可控制在毫秒級別
- SQL 預(yù)處理:支持在數(shù)據(jù)庫端完成數(shù)據(jù)過濾與計(jì)算,僅將結(jié)果集推送給消費(fèi)者,大幅減少網(wǎng)絡(luò)傳輸量
1.2 與 Kafka 的對比
| 特性 | TDengine 數(shù)據(jù)訂閱 | Kafka |
|---|---|---|
| 部署方式 | 內(nèi)置,無需額外組件 | 獨(dú)立集群部署 |
| 數(shù)據(jù)存儲 | 與時(shí)序數(shù)據(jù)共享存儲 | 獨(dú)立日志存儲 |
| 數(shù)據(jù)過濾 | 支持 SQL 預(yù)處理 | 需要流處理框架 |
| 消費(fèi)模型 | 推拉結(jié)合 | 純拉取模式 |
| 運(yùn)維成本 | 低(統(tǒng)一運(yùn)維) | 高(獨(dú)立運(yùn)維) |
二、主題創(chuàng)建:靈活定義數(shù)據(jù)來源
主題(Topic)是 TDengine 數(shù)據(jù)訂閱的核心抽象,定義了數(shù)據(jù)的來源和范圍。TDengine 支持三種粒度的主題創(chuàng)建方式,滿足不同場景的數(shù)據(jù)消費(fèi)需求。
2.1 按數(shù)據(jù)庫創(chuàng)建主題
訂閱整個(gè)數(shù)據(jù)庫中的所有表數(shù)據(jù),適用于需要全量數(shù)據(jù)監(jiān)控的場景:
-- 創(chuàng)建數(shù)據(jù)庫級別的主題
CREATE TOPIC topic_db AS DATABASE power;
該主題會自動(dòng)覆蓋 power 數(shù)據(jù)庫中所有表的數(shù)據(jù)變更,包括新增子表的數(shù)據(jù)。
2.2 按超級表創(chuàng)建主題
訂閱指定超級表的數(shù)據(jù),適合按業(yè)務(wù)模塊劃分?jǐn)?shù)據(jù)消費(fèi)范圍:
-- 創(chuàng)建超級表級別的主題
CREATE TOPIC topic_meters AS STABLE meters;
該主題僅包含 meters 超級表及其所有子表的數(shù)據(jù),粒度更精確。
2.3 按 SQL 查詢創(chuàng)建主題
基于自定義 SQL 查詢創(chuàng)建主題,提供最靈活的數(shù)據(jù)篩選能力,是 TDengine 時(shí)序數(shù)據(jù)庫區(qū)別于傳統(tǒng)消息隊(duì)列的核心特性之一:
-- 創(chuàng)建 SQL 查詢主題,僅訂閱電壓超過 220V 的數(shù)據(jù)
CREATE TOPIC topic_high_voltage AS
SELECT ts, location, voltage, current
FROM meters
WHERE voltage > 220;
-- 創(chuàng)建聚合主題,訂閱每分鐘的平均電流
CREATE TOPIC topic_avg_current AS
SELECT _wstart, location, AVG(current) AS avg_current
FROM meters
INTERVAL(1m);
SQL 主題的優(yōu)勢在于:過濾和聚合計(jì)算在數(shù)據(jù)庫服務(wù)端完成,消費(fèi)者只需接收處理后的結(jié)果集,網(wǎng)絡(luò)傳輸量可降低數(shù)倍甚至數(shù)十倍。
2.4 主題管理操作
-- 查看所有主題
SHOW TOPICS;
-- 刪除主題
DROP TOPIC topic_db;
三、實(shí)時(shí)推送:數(shù)據(jù)寫入即推送
TDengine 時(shí)序數(shù)據(jù)庫采用推拉結(jié)合(Push-Pull Hybrid)的數(shù)據(jù)傳輸模式,兼顧實(shí)時(shí)性與資源效率。
3.1 推送機(jī)制
當(dāng)新數(shù)據(jù)寫入 TDengine 時(shí),訂閱系統(tǒng)會立即檢測到 WAL 中的數(shù)據(jù)變更,并主動(dòng)推送給等待中的消費(fèi)者。整個(gè)流程無需消費(fèi)者輪詢,實(shí)現(xiàn)真正的毫秒級延遲:
數(shù)據(jù)寫入 → WAL 落盤 → 訂閱系統(tǒng)檢測 → 主動(dòng)推送至消費(fèi)者
3.2 長輪詢兜底
當(dāng)暫時(shí)沒有新數(shù)據(jù)時(shí),消費(fèi)者通過長輪詢機(jī)制保持與服務(wù)端的連接。一旦有新數(shù)據(jù)到達(dá),服務(wù)端立即響應(yīng)并推送數(shù)據(jù),避免了頻繁空輪詢造成的資源浪費(fèi)。
3.3 代碼示例:消費(fèi)數(shù)據(jù)
import taos
# 創(chuàng)建消費(fèi)者連接
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="earliest"
)
# 循環(huán)拉取數(shù)據(jù)
for message in consumer:
print(f"主題: {message.topic}, 分區(qū): {message.partition}")
for row in message.rows:
print(f" 時(shí)間: {row[0]}, 電流: {row[1]}, 電壓: {row[2]}")
# 處理完成后提交進(jìn)度
message.commit()
consumer.close()
四、消費(fèi)組管理:共享進(jìn)度與自動(dòng)負(fù)載均衡
消費(fèi)組(Consumer Group)是 TDengine 數(shù)據(jù)訂閱實(shí)現(xiàn)高可靠、高吞吐消費(fèi)的關(guān)鍵機(jī)制。
4.1 共享消費(fèi)進(jìn)度
同一消費(fèi)組內(nèi)的所有消費(fèi)者共享統(tǒng)一的消費(fèi)進(jìn)度。這意味著一條數(shù)據(jù)只會被消費(fèi)組內(nèi)的一個(gè)消費(fèi)者處理,避免了數(shù)據(jù)重復(fù)消費(fèi)的問題。消費(fèi)進(jìn)度由服務(wù)端(mnode)集中管理,持久化存儲,消費(fèi)者重啟后可從斷點(diǎn)繼續(xù)消費(fèi)。
4.2 自動(dòng)負(fù)載均衡(Rebalance)
TDengine 時(shí)序數(shù)據(jù)庫內(nèi)置了自動(dòng) Rebalance 機(jī)制,當(dāng)消費(fèi)組成員發(fā)生變化時(shí),系統(tǒng)會自動(dòng)重新分配數(shù)據(jù)分區(qū):
- 新消費(fèi)者加入:消費(fèi)組新增成員時(shí),自動(dòng)將部分 vnode 分配給新成員
- 消費(fèi)者離開:某個(gè)消費(fèi)者斷開連接或主動(dòng)退出時(shí),其負(fù)責(zé)的分區(qū)自動(dòng)分配給其他成員
- 故障轉(zhuǎn)移:消費(fèi)者故障時(shí),系統(tǒng)在數(shù)秒內(nèi)完成分區(qū)遷移,保證消費(fèi)連續(xù)性
系統(tǒng)每 2 秒檢測一次消費(fèi)組狀態(tài),發(fā)現(xiàn)變化時(shí)自動(dòng)觸發(fā) Rebalance,以 vnode 為最小分配單元,采用均勻分配策略,并優(yōu)先保持已有分配關(guān)系以減少不必要的數(shù)據(jù)遷移。
4.3 代碼示例:多消費(fèi)者協(xié)同
import taos
# 消費(fèi)者 A
consumer_a = taos.Consumer(
group_id="g1", # 同一消費(fèi)組
topics=["topic_meters"],
auto_offset="latest"
)
# 消費(fèi)者 B(另一進(jìn)程或機(jī)器)
consumer_b = taos.Consumer(
group_id="g1", # 同一消費(fèi)組
topics=["topic_meters"],
auto_offset="latest"
)
# 兩個(gè)消費(fèi)者自動(dòng)分配不同的 vnode 分區(qū),并行消費(fèi)
for message in consumer_a:
process(message)
message.commit()
五、進(jìn)度管理:精確控制消費(fèi)位置
消費(fèi)進(jìn)度管理是確保數(shù)據(jù)不丟失、不重復(fù)的關(guān)鍵。TDengine 時(shí)序數(shù)據(jù)庫通過 WAL 版本號精確記錄每個(gè) vnode 的消費(fèi)位置。
5.1 Commit Offset
消費(fèi)者處理完數(shù)據(jù)后,需要提交消費(fèi)進(jìn)度(Commit Offset),告知服務(wù)端已成功處理到哪個(gè)位置:
# 方式一:手動(dòng)提交(推薦)
for message in consumer:
process(message)
message.commit() # 顯式提交
# 方式二:批量提交
messages = []
for message in consumer:
messages.append(message)
if len(messages) >= 100:
process_batch(messages)
for msg in messages:
msg.commit()
messages.clear()
5.2 earliest 與 latest 配置
通過 auto.offset 參數(shù)控制消費(fèi)者首次啟動(dòng)時(shí)的消費(fèi)起始位置:
| 配置值 | 行為 | 適用場景 |
|---|---|---|
earliest | 從最早可用的數(shù)據(jù)開始消費(fèi) | 需要全量歷史數(shù)據(jù)處理 |
latest | 僅從最新數(shù)據(jù)開始消費(fèi) | 僅關(guān)注實(shí)時(shí)數(shù)據(jù) |
none | 無已提交進(jìn)度時(shí)拋出異常 | 必須保證進(jìn)度連續(xù)性 |
# 從最早數(shù)據(jù)開始消費(fèi)(首次啟動(dòng)時(shí)回溯全部歷史數(shù)據(jù))
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="earliest"
)
# 僅消費(fèi)實(shí)時(shí)數(shù)據(jù)(忽略歷史數(shù)據(jù))
consumer = taos.Consumer(
group_id="g1",
topics=["topic_meters"],
auto_offset="latest"
)
六、兼容 Kafka 風(fēng)格 API:降低遷移成本
對于已經(jīng)使用 Kafka 的團(tuán)隊(duì),TDengine 提供了兼容 Kafka 風(fēng)格的 API 接口,大幅降低遷移成本。開發(fā)者無需學(xué)習(xí)全新的 API 體系,只需調(diào)整連接配置即可完成切換。
6.1 API 對照
| Kafka 概念 | TDengine 對應(yīng)概念 |
|---|---|
| Topic | Topic(主題) |
| Consumer Group | Consumer Group(消費(fèi)組) |
| Partition | VNode(虛擬數(shù)據(jù)節(jié)點(diǎn)) |
| Offset | Commit Offset(消費(fèi)進(jìn)度) |
| poll() | poll()(拉取數(shù)據(jù)) |
| commitSync() | commit()(提交進(jìn)度) |
6.2 遷移示例
# Kafka 風(fēng)格的消費(fèi)者代碼(TDengine)
from taos import Consumer
consumer = Consumer({
"group.id": "g1",
"td.connect.ip": "127.0.0.1",
"td.connect.port": "6030",
"auto.offset.reset": "earliest"
})
consumer.subscribe(["topic_meters"])
while True:
records = consumer.poll(timeout=1000)
for record in records:
handle(record)
consumer.unsubscribe()
consumer.close()
API 的使用方式與 Kafka 高度一致,包括 subscribe()、poll()、commit()、unsubscribe() 等核心方法,開發(fā)者可以快速上手。
七、SQL 預(yù)處理:在數(shù)據(jù)庫端完成過濾
SQL 預(yù)處理是 TDengine 數(shù)據(jù)訂閱區(qū)別于傳統(tǒng)消息隊(duì)列的核心優(yōu)勢。通過在數(shù)據(jù)庫服務(wù)端執(zhí)行過濾、聚合等計(jì)算,僅將結(jié)果集推送給消費(fèi)者,可以大幅減少網(wǎng)絡(luò)傳輸量和客戶端計(jì)算壓力。
7.1 數(shù)據(jù)過濾
-- 僅訂閱特定設(shè)備組的數(shù)據(jù)
CREATE TOPIC topic_group1 AS
SELECT ts, current, voltage
FROM meters
WHERE groupId = 1;
-- 僅訂閱異常數(shù)據(jù)
CREATE TOPIC topic_alert AS
SELECT ts, location, temperature
FROM sensors
WHERE temperature > 80 OR humidity > 95;
7.2 數(shù)據(jù)聚合
-- 訂閱每 5 分鐘的統(tǒng)計(jì)指標(biāo)
CREATE TOPIC topic_stats AS
SELECT _wstart, _wend, location,
AVG(current) AS avg_current,
MAX(voltage) AS max_voltage,
COUNT(*) AS sample_count
FROM meters
WHERE voltage > 200
INTERVAL(5m);
通過 SQL 預(yù)處理,原始數(shù)據(jù)量可能達(dá)到每秒百萬條級別,但推送給消費(fèi)者的聚合結(jié)果可能僅有每秒數(shù)百條,傳輸量降低上千倍。
八、典型應(yīng)用場景
8.1 實(shí)時(shí)監(jiān)控大屏
工業(yè)場景中,監(jiān)控大屏需要實(shí)時(shí)展示設(shè)備運(yùn)行狀態(tài)。通過 TDengine 數(shù)據(jù)訂閱,可以將關(guān)鍵指標(biāo)實(shí)時(shí)推送至前端:
consumer = taos.Consumer(
group_id="dashboard_group",
topics=["topic_realtime_stats"],
auto_offset="latest"
)
for message in consumer:
# 將數(shù)據(jù)推送到 WebSocket 服務(wù)
websocket_server.broadcast(message.rows)
message.commit()
8.2 跨系統(tǒng)數(shù)據(jù)同步
將 TDengine 中的時(shí)序數(shù)據(jù)實(shí)時(shí)同步到 Elasticsearch、ClickHouse 等其他系統(tǒng),用于全文檢索或離線分析:
consumer = taos.Consumer(
group_id="sync_group",
topics=["topic_meters"],
auto_offset="earliest"
)
for message in consumer:
# 批量寫入 Elasticsearch
es_client.bulk_index(message.rows)
message.commit()
8.3 告警通知系統(tǒng)
訂閱關(guān)鍵傳感器數(shù)據(jù),實(shí)時(shí)檢測異常并觸發(fā)告警通知:
-- 創(chuàng)建告警主題
CREATE TOPIC topic_alert AS
SELECT ts, device_id, temperature, pressure
FROM sensors
WHERE temperature > 100 OR pressure < 0.5;
consumer = taos.Consumer(
group_id="alert_group",
topics=["topic_alert"],
auto_offset="latest"
)
for message in consumer:
for row in message.rows:
send_alert(
device=row["device_id"],
metric="temperature",
value=row["temperature"],
channel=["sms", "email", "dingtalk"]
)
message.commit()
九、總結(jié)
TDengine 時(shí)序數(shù)據(jù)庫通過內(nèi)置類消息隊(duì)列的數(shù)據(jù)訂閱機(jī)制,為開發(fā)者提供了從數(shù)據(jù)寫入到實(shí)時(shí)消費(fèi)的一站式解決方案。主題創(chuàng)建的靈活粒度、消費(fèi)組的自動(dòng)負(fù)載均衡、精確的進(jìn)度管理、兼容 Kafka 的 API 設(shè)計(jì)以及強(qiáng)大的 SQL 預(yù)處理能力,使得 TDengine 在實(shí)時(shí)監(jiān)控、數(shù)據(jù)同步、告警通知等場景中展現(xiàn)出顯著優(yōu)勢。
相比傳統(tǒng)”時(shí)序數(shù)據(jù)庫 + 消息隊(duì)列”的復(fù)雜架構(gòu),TDengine 的數(shù)據(jù)訂閱功能不僅簡化了系統(tǒng)架構(gòu)、降低了運(yùn)維成本,更通過零拷貝和推拉結(jié)合的傳輸模式實(shí)現(xiàn)了毫秒級的數(shù)據(jù)推送延遲。如果您正在尋找一款既能高效存儲時(shí)序數(shù)據(jù),又能提供實(shí)時(shí)數(shù)據(jù)分發(fā)能力的時(shí)序數(shù)據(jù)庫,TDengine 無疑是值得深入評估的選擇。歡迎訪問 TDengine 官方文檔中心,獲取更多技術(shù)細(xì)節(jié)與最佳實(shí)踐。



互聯(lián)網(wǎng).png)



-1.png)




.png)


證.png)


伙伴.png)
伙伴.png)
伙伴.png)



