眾所周知,在 TDengine 3.0 中,我們對數(shù)據(jù)訂閱功能進行了全面升級,以便大家可以更加便捷地實時訂閱和獲取數(shù)據(jù)的更新,完成實時監(jiān)控、數(shù)據(jù)分析和有效報警等工作。在本文中,TDengine 資深研發(fā)將以 TDengine 3.0 為對象,為大家介紹數(shù)據(jù)訂閱功能的正確打開方式,給到有需要的人作參考指南,避免走入應(yīng)用誤區(qū)。
本文將從 Java Developer 的視角來介紹如何使用 TDengine 3.0 的數(shù)據(jù)訂閱功能。
TDengine 3.0 的版本迭代很快,可能有些配置參數(shù)或細(xì)節(jié)在之后的版本會發(fā)生變化,本文對應(yīng) TDengine 版本為 3.0.3.0。
寫在前面
在官方文檔里已經(jīng)有介紹,TDengine 的數(shù)據(jù)訂閱是什么以及如何使用,有需要的朋友可以通過下方鏈接進入官網(wǎng)查看相關(guān)介紹:
- 開發(fā)指南:https://docs.tdengine.com/taos-sql/
- SQL:https://docs.tdengine.com/taos-sql/tmq/
- Java 使用數(shù)據(jù)訂閱:https://docs.taosdata.com/connector/java/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85
總結(jié)一下,我理解的數(shù)據(jù)訂閱功能是以“訂閱”的方式獲取存在于 TDengine 中的數(shù)據(jù)。一般情況下,“訂閱”意味著的業(yè)務(wù)需求是訂閱數(shù)據(jù)庫中的最新數(shù)據(jù)。“訂閱”的流程很簡單:(1)在數(shù)據(jù)庫中創(chuàng)建 topic;(2)在應(yīng)用中消費 topic 的數(shù)據(jù)。
基本操作:創(chuàng)建
在數(shù)據(jù)庫中創(chuàng)建 topic,使用 SQL 語句 create topic 即可。create topic 這個 SQL 如何寫,實際上定義了 topic 對應(yīng)的數(shù)據(jù)粒度,包括哪些數(shù)據(jù)庫、超級表、子表、列、行。值得一提的是,TDengine 的 SQL 支持訂閱 database、 supertable、subquery 這 3 種模式。CREATE TOPIC topic_name [WITH META] AS DATABASE db_name; 這種 SQL 可以直接訂閱整個 database;CREATE TOPIC topic_name AS STABLE stb_name 這種 SQL 可以訂閱某個超級表;訂閱子查詢是最普遍的場景。例如:
CREATE TOPIC topic_name AS SELECT ts,voltage,location FROM testdb.meters WHERE voltage > 220.0 and location in ('北京','天津');
上面這個 SQL,訂閱了 testdb 數(shù)據(jù)庫中的 meters 超級表,通過 where 子句過濾滿足以下條件:location(tag 列)為“北京”或“天津”的子表,且 voltage 超過 220.0 的 ts、voltage、location 的數(shù)據(jù)。
黃金搭檔:流式計算 + 數(shù)據(jù)訂閱
以智能電表的場景為例,如果我想每 10 分鐘計算一次電壓的平均值,并在平均電壓高于 220V 就進行上報。對于這種需求,單純用 TDengine 的數(shù)據(jù)訂閱功能是不行的,因為 create topic 的子查詢不支持聚合查詢。這個時候,就需要用 TDengine 的流式計算 + 數(shù)據(jù)訂閱這對黃金搭檔了。如下:
CREATE STREAM stream_name TRIGGER WINDOW_CLOSE IGNORE EXPIRED 1
INTO stb_name
AS SELECT _wend as ts, avg(voltage) as voltage, last_row(location) as location
FROM testdb.meters
WHERE location in ('北京', '天津')
PARTITION BY location
INTERVAL(10m);
CREATE TOPIC topic_name AS SELECT * FROM stream_name where voltage > 220.0;
上面的 2 條 SQL 中,第一條 SQL 創(chuàng)建了一個 stream:以 location 分組,計算每 10 分鐘的“北京”、“天津”的平均電壓;用時間窗口的結(jié)束 _wend 作為時間戳 ts;avg(voltage) 計算 voltage 平均值;時間窗口的最后一條 last_row(location) 作為標(biāo)簽。同時,這個 stream 以 WINDOW_CLOSE 作為計算窗口的觸發(fā)模式,過期策略為 IGNORE EXPIRED 1。
第二條 SQL 創(chuàng)建了子查詢訂閱,用于過濾每 10 分鐘平均電壓高于 220V 的數(shù)據(jù)。這樣我們就創(chuàng)建了一個可以被消費的 topic,消費到的數(shù)據(jù)為高于 220V 的 10 分鐘平均電壓,滿足了前面所說的監(jiān)控場景的需求。
消費 topic:很像 Kafka
在應(yīng)用中消費 topic 的數(shù)據(jù),需要按照各種連接器的 API 來使用,具體使用方式請參考官方文檔:https://docs.taosdata.com/。在這里,我只對 TDengine 和訂閱消費 topic 的一些配置參數(shù)進行梳理。
- 連接相關(guān)的參數(shù),java connector 中使用
bootstrap.servers一個參數(shù)代替了td.connect.ip和td.connect.port,使用了和 Kafka 一樣的參數(shù)名。td.connect.user和td.connect.pass仍然需要設(shè)置。 group.id:和 Kafka 一樣,多個線程可以共同消費同一個 topic,只要它們使用同一個 group.id。TDengine 的 vgroup 與 Kafka 的 partition 在概念上是對應(yīng)的。同一個 group.id 中,一個 vgroup 最多只對應(yīng)一個 consumer。如果 consumer 數(shù)量大于 vgroup 的數(shù)量,則有些 consumer 消費不到數(shù)據(jù)。auto.offset.reset:這個參數(shù)和 Kafka 的行為不一樣。如果 group.id 為新值,在設(shè)置 earliest 時,訂閱從頭消費數(shù)據(jù);設(shè)置為 latest 時,從最新數(shù)據(jù)開始訂閱。當(dāng) group.id 為已存在的值時,不管auto.offset.reset為何值,都會從最后一個 offset 開始,繼續(xù)消費。enable.auto.commit:建議設(shè)置為 false。開啟自動提交 offset,TDengine 的 commit 自動提交機制是輪詢提交。auto.commit.interval.ms:建議不設(shè)置。如果enable.auto.commit為 true,自動提交 commit 的間隔為auto.commit.interval.ms設(shè)置的值。enable.heartbeat.background:建議設(shè)置為 true,默認(rèn)值為 true。如果設(shè)置為 false,在應(yīng)用長時間不主動 poll 數(shù)據(jù)時,可能會造成當(dāng)前 consumer 的離線。在 TDengine 的實現(xiàn)上,heartbeat 的 interval 被設(shè)置成了 1 秒。msg.with.table.name:建議設(shè)置成 true。在訂閱超級表和數(shù)據(jù)庫時添加了 WITH META,應(yīng)該開啟這個設(shè)置。例如:訂閱為CREATE TOPIC topic_name WITH META AS STABLE stb時,配置msg.with.table.name為 true,則消費時可以獲取到 tableName。
Show U The Code
到此,本文介紹了有關(guān) TDengine3.0 的數(shù)據(jù)訂閱功能的諸多細(xì)節(jié)。我相信,上面的內(nèi)容應(yīng)該可以為你使用數(shù)據(jù)訂閱功能提供一些思路和幫助。但是,對程序員來說,“Talk is cheap. Show me the code”。下面,我列舉了一些 Java 的示例代碼,供你參考。
- subscribeDemo-java
這個 java 工程實現(xiàn)了一個最簡單的訂閱功能,從 TDengine 中訂閱一個 topic ,并將消費到的數(shù)據(jù)寫到文件中。值得一提的是,代碼使用 bytebuddy 動態(tài)生成了 Java POJO 類和對應(yīng)的 Deserializer 類。因此,你只需要在 schema.txt 內(nèi)寫好 topic 對應(yīng)的字段,就可以不寫代碼,直接訂閱不同 topic 的數(shù)據(jù)了。
鏈接:https://github.com/taosdata/subscribeDemo-java
- SubscribeDemo
這個頁面展示了一段最基本的數(shù)據(jù)訂閱的代碼。main 方法中,包括了在 TDengine 中創(chuàng)建數(shù)據(jù)庫、表、topic 的操作,并從 topic 中消費數(shù)據(jù)進行打印。
- WebsocketSubscribeDemo
這個頁面的代碼和 SubscribeDemo 相比,僅有的區(qū)別是其配置了 td.connect.type 參數(shù)為 ws,即:使用 websocket 連接 taosadapter,這樣的好處是不用安裝客戶端。
結(jié)語
相信借助本篇文章,你一定能夠流暢體驗到 TDengine 的數(shù)據(jù)訂閱功能,有需要的讀者可以收藏備用。對于更為復(fù)雜的應(yīng)用問題,也歡迎大家加入 TDengine 的開發(fā)者交流群(添加小T vx:tdengine),直接向社區(qū)技術(shù)支持人員尋求幫助。關(guān)于 TDengine 3.0 的更多示例代碼,請參考:https://github.com/taosdata/TDengine/tree/main/docs/examples。



互聯(lián)網(wǎng).png)



-1.png)







證.png)


伙伴.png)
伙伴.png)
伙伴.png)



