眾所周知,在 TDengine 3.0 中,我們對(duì)數(shù)據(jù)訂閱功能進(jìn)行了全面升級(jí),以便大家可以更加便捷地實(shí)時(shí)訂閱和獲取數(shù)據(jù)的更新,完成實(shí)時(shí)監(jiān)控、數(shù)據(jù)分析和有效報(bào)警等工作。在本文中,TDengine 資深研發(fā)將以 TDengine 3.0 為對(duì)象,為大家介紹數(shù)據(jù)訂閱功能的正確打開方式,給到有需要的人作參考指南,避免走入應(yīng)用誤區(qū)。
本文將從 Java Developer 的視角來介紹如何使用 TDengine 3.0 的數(shù)據(jù)訂閱功能。
TDengine 3.0 的版本迭代很快,可能有些配置參數(shù)或細(xì)節(jié)在之后的版本會(huì)發(fā)生變化,本文對(duì)應(yīng) TDengine 版本為 3.0.3.0。
寫在前面
在官方文檔里已經(jīng)有介紹,TDengine 的數(shù)據(jù)訂閱是什么以及如何使用,有需要的朋友可以通過下方鏈接進(jìn)入官網(wǎng)查看相關(guān)介紹:
- 開發(fā)指南:https://docs.tdengine.com/taos-sql/
- SQL:https://docs.tdengine.com/taos-sql/tmq/
- Java 使用數(shù)據(jù)訂閱:https://docs.taosdata.com/connector/java/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85
總結(jié)一下,我理解的數(shù)據(jù)訂閱功能是以“訂閱”的方式獲取存在于 TDengine 中的數(shù)據(jù)。一般情況下,“訂閱”意味著的業(yè)務(wù)需求是訂閱數(shù)據(jù)庫(kù)中的最新數(shù)據(jù)。“訂閱”的流程很簡(jiǎn)單:(1)在數(shù)據(jù)庫(kù)中創(chuàng)建 topic;(2)在應(yīng)用中消費(fèi) topic 的數(shù)據(jù)。
基本操作:創(chuàng)建
在數(shù)據(jù)庫(kù)中創(chuàng)建 topic,使用 SQL 語(yǔ)句 create topic 即可。create topic 這個(gè) SQL 如何寫,實(shí)際上定義了 topic 對(duì)應(yīng)的數(shù)據(jù)粒度,包括哪些數(shù)據(jù)庫(kù)、超級(jí)表、子表、列、行。值得一提的是,TDengine 的 SQL 支持訂閱 database、 supertable、subquery 這 3 種模式。CREATE TOPIC topic_name [WITH META] AS DATABASE db_name; 這種 SQL 可以直接訂閱整個(gè) database;CREATE TOPIC topic_name AS STABLE stb_name 這種 SQL 可以訂閱某個(gè)超級(jí)表;訂閱子查詢是最普遍的場(chǎng)景。例如:
CREATE TOPIC topic_name AS SELECT ts,voltage,location FROM testdb.meters WHERE voltage > 220.0 and location in ('北京','天津');
上面這個(gè) SQL,訂閱了 testdb 數(shù)據(jù)庫(kù)中的 meters 超級(jí)表,通過 where 子句過濾滿足以下條件:location(tag 列)為“北京”或“天津”的子表,且 voltage 超過 220.0 的 ts、voltage、location 的數(shù)據(jù)。
黃金搭檔:流式計(jì)算 + 數(shù)據(jù)訂閱
以智能電表的場(chǎng)景為例,如果我想每 10 分鐘計(jì)算一次電壓的平均值,并在平均電壓高于 220V 就進(jìn)行上報(bào)。對(duì)于這種需求,單純用 TDengine 的數(shù)據(jù)訂閱功能是不行的,因?yàn)?create topic 的子查詢不支持聚合查詢。這個(gè)時(shí)候,就需要用 TDengine 的流式計(jì)算 + 數(shù)據(jù)訂閱這對(duì)黃金搭檔了。如下:
CREATE STREAM stream_name TRIGGER WINDOW_CLOSE IGNORE EXPIRED 1
INTO stb_name
AS SELECT _wend as ts, avg(voltage) as voltage, last_row(location) as location
FROM testdb.meters
WHERE location in ('北京', '天津')
PARTITION BY location
INTERVAL(10m);
CREATE TOPIC topic_name AS SELECT * FROM stream_name where voltage > 220.0;
上面的 2 條 SQL 中,第一條 SQL 創(chuàng)建了一個(gè) stream:以 location 分組,計(jì)算每 10 分鐘的“北京”、“天津”的平均電壓;用時(shí)間窗口的結(jié)束 _wend 作為時(shí)間戳 ts;avg(voltage) 計(jì)算 voltage 平均值;時(shí)間窗口的最后一條 last_row(location) 作為標(biāo)簽。同時(shí),這個(gè) stream 以 WINDOW_CLOSE 作為計(jì)算窗口的觸發(fā)模式,過期策略為 IGNORE EXPIRED 1。
第二條 SQL 創(chuàng)建了子查詢訂閱,用于過濾每 10 分鐘平均電壓高于 220V 的數(shù)據(jù)。這樣我們就創(chuàng)建了一個(gè)可以被消費(fèi)的 topic,消費(fèi)到的數(shù)據(jù)為高于 220V 的 10 分鐘平均電壓,滿足了前面所說的監(jiān)控場(chǎng)景的需求。
消費(fèi) topic:很像 Kafka
在應(yīng)用中消費(fèi) topic 的數(shù)據(jù),需要按照各種連接器的 API 來使用,具體使用方式請(qǐng)參考官方文檔:https://docs.taosdata.com/。在這里,我只對(duì) TDengine 和訂閱消費(fèi) topic 的一些配置參數(shù)進(jìn)行梳理。
- 連接相關(guān)的參數(shù),java connector 中使用
bootstrap.servers一個(gè)參數(shù)代替了td.connect.ip和td.connect.port,使用了和 Kafka 一樣的參數(shù)名。td.connect.user和td.connect.pass仍然需要設(shè)置。 group.id:和 Kafka 一樣,多個(gè)線程可以共同消費(fèi)同一個(gè) topic,只要它們使用同一個(gè) group.id。TDengine 的 vgroup 與 Kafka 的 partition 在概念上是對(duì)應(yīng)的。同一個(gè) group.id 中,一個(gè) vgroup 最多只對(duì)應(yīng)一個(gè) consumer。如果 consumer 數(shù)量大于 vgroup 的數(shù)量,則有些 consumer 消費(fèi)不到數(shù)據(jù)。auto.offset.reset:這個(gè)參數(shù)和 Kafka 的行為不一樣。如果 group.id 為新值,在設(shè)置 earliest 時(shí),訂閱從頭消費(fèi)數(shù)據(jù);設(shè)置為 latest 時(shí),從最新數(shù)據(jù)開始訂閱。當(dāng) group.id 為已存在的值時(shí),不管auto.offset.reset為何值,都會(huì)從最后一個(gè) offset 開始,繼續(xù)消費(fèi)。enable.auto.commit:建議設(shè)置為 false。開啟自動(dòng)提交 offset,TDengine 的 commit 自動(dòng)提交機(jī)制是輪詢提交。auto.commit.interval.ms:建議不設(shè)置。如果enable.auto.commit為 true,自動(dòng)提交 commit 的間隔為auto.commit.interval.ms設(shè)置的值。enable.heartbeat.background:建議設(shè)置為 true,默認(rèn)值為 true。如果設(shè)置為 false,在應(yīng)用長(zhǎng)時(shí)間不主動(dòng) poll 數(shù)據(jù)時(shí),可能會(huì)造成當(dāng)前 consumer 的離線。在 TDengine 的實(shí)現(xiàn)上,heartbeat 的 interval 被設(shè)置成了 1 秒。msg.with.table.name:建議設(shè)置成 true。在訂閱超級(jí)表和數(shù)據(jù)庫(kù)時(shí)添加了 WITH META,應(yīng)該開啟這個(gè)設(shè)置。例如:訂閱為CREATE TOPIC topic_name WITH META AS STABLE stb時(shí),配置msg.with.table.name為 true,則消費(fèi)時(shí)可以獲取到 tableName。
Show U The Code
到此,本文介紹了有關(guān) TDengine3.0 的數(shù)據(jù)訂閱功能的諸多細(xì)節(jié)。我相信,上面的內(nèi)容應(yīng)該可以為你使用數(shù)據(jù)訂閱功能提供一些思路和幫助。但是,對(duì)程序員來說,“Talk is cheap. Show me the code”。下面,我列舉了一些 Java 的示例代碼,供你參考。
- subscribeDemo-java
這個(gè) java 工程實(shí)現(xiàn)了一個(gè)最簡(jiǎn)單的訂閱功能,從 TDengine 中訂閱一個(gè) topic ,并將消費(fèi)到的數(shù)據(jù)寫到文件中。值得一提的是,代碼使用 bytebuddy 動(dòng)態(tài)生成了 Java POJO 類和對(duì)應(yīng)的 Deserializer 類。因此,你只需要在 schema.txt 內(nèi)寫好 topic 對(duì)應(yīng)的字段,就可以不寫代碼,直接訂閱不同 topic 的數(shù)據(jù)了。
鏈接:https://github.com/taosdata/subscribeDemo-java
- SubscribeDemo
這個(gè)頁(yè)面展示了一段最基本的數(shù)據(jù)訂閱的代碼。main 方法中,包括了在 TDengine 中創(chuàng)建數(shù)據(jù)庫(kù)、表、topic 的操作,并從 topic 中消費(fèi)數(shù)據(jù)進(jìn)行打印。
- WebsocketSubscribeDemo
這個(gè)頁(yè)面的代碼和 SubscribeDemo 相比,僅有的區(qū)別是其配置了 td.connect.type 參數(shù)為 ws,即:使用 websocket 連接 taosadapter,這樣的好處是不用安裝客戶端。
結(jié)語(yǔ)
相信借助本篇文章,你一定能夠流暢體驗(yàn)到 TDengine 的數(shù)據(jù)訂閱功能,有需要的讀者可以收藏備用。對(duì)于更為復(fù)雜的應(yīng)用問題,也歡迎大家加入 TDengine 的開發(fā)者交流群(添加小T vx:tdengine),直接向社區(qū)技術(shù)支持人員尋求幫助。關(guān)于 TDengine 3.0 的更多示例代碼,請(qǐng)參考:https://github.com/taosdata/TDengine/tree/main/docs/examples。



互聯(lián)網(wǎng).png)



-1.png)




.png)


證.png)


伙伴.png)
伙伴.png)
伙伴.png)



