本文介紹了 TAOS-JDBCDriver 訂閱功能的使用場景、使用方法和一些限制。本文的預(yù)期讀者是基于 TAOS-JDBCDriver 開發(fā)各種應(yīng)用的軟件開發(fā)人員。
如何使用TDengine中的訂閱功能?
TDengine的Java訂閱接口,目前是與TAOS-JDBCDriver 的 API 配合使用,相關(guān)源碼文件也在TDengine JDBC驅(qū)動的源碼所在目錄下:TSDBSubscribe.java和TSDBSubscribeCallBack.java。與訂閱相關(guān)的方法主要有以下三個,均在TSDBSubscribe.java中定義和實(shí)現(xiàn):
subscribe
public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack) throws SQLException
訂閱一個topic,并設(shè)置訂閱內(nèi)容、輪詢周期、以及異步調(diào)用選擇。如果訂閱成功,此方法會返回一個long型值(本質(zhì)是連接的一個指針值,此處用于表征此成功建立的訂閱);如果訂閱失敗,則此方法應(yīng)直接拋出異常。
topic:訂閱的名字。
sql:訂閱的內(nèi)容,用戶可以傳入一條查詢SQL語句,此時訂閱關(guān)注的內(nèi)容為此查詢結(jié)果集中的新增記錄。
period:訂閱執(zhí)行時,內(nèi)部輪詢的時間。
callBack:執(zhí)行異步訂閱時的回調(diào),空值表示不使用異步。
public TSDBResultSet consume(long subscription) throws OperationsException, SQLException
獲取最新的訂閱結(jié)果,訂閱結(jié)果返回為一個TSDBResultSet類的實(shí)例。
subscription:subscribe成功后返回的long型值。
public void unsubscribe(long subscription, boolean isKeep) throws SQLException
取消已經(jīng)建立的訂閱,如果失敗則直接拋出異常。
subscription:subscribe成功后返回的long型值。
isKeep:是否保持訂閱的記錄。
下面結(jié)合一個示例,介紹下其使用方法。
首先是創(chuàng)建訂閱:
/** * sync subscribe * * @param topic * @param sql * @param restart * @param period * @return * @throws SQLException */ public long subscribe(String topic, String sql, boolean restart, int period) /** * async subscribe * * @param topic * @param sql * @param restart * @param period * @param callBack * @throws SQLException */ public long subscribe(String topic, String sql, boolean restart, int period, TSDBSubscribeCallBack callBack)
Class.forName("com.taosdata.jdbc.TSDBDriver");
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_HOST, host);
connection = DriverManager.getConnection("jdbc:TAOS://" + host + ":" + port + "/" + dbName + "?user=root&password=taosdata"
, properties);
String rawSql = "select * from devices ;";
subscribe = ((TSDBConnection) connection).createSubscribe();
subscribId = subscribe.subscribe(topic, rawSql, false, 1000);
TDengine中的訂閱既可以是同步的,也可以是異步的。這里,同步的意思是用戶程序要直接調(diào)用 consume 來拉取數(shù)據(jù),而異步則由 API 在內(nèi)部的另一個線程中調(diào)用 consume,然后把拉取到的數(shù)據(jù)交給回調(diào)函數(shù) callback 去處理。
注意,這里沒有指定起始時間,所以會讀到所有時間的數(shù)據(jù)。如果只想從一天前的數(shù)據(jù)開始訂閱,而不需要更早的歷史數(shù)據(jù),可以再加上一個時間條件:
select * from devices where ts > now - 1d and temperature > 80;
訂閱的 topic 實(shí)際上是它的名字,因?yàn)橛嗛喒δ苁窃诳蛻舳?API 中實(shí)現(xiàn)的,所以沒必要保證它全局唯一,但需要它在一臺客戶端機(jī)器上唯一。
如果名稱為 topic 的訂閱不存在,參數(shù) restart 沒有意義;但如果用戶程序創(chuàng)建這個訂閱后退出,當(dāng)它再次啟動并重新使用這個 topic 時,restart 就會被用于決定是從頭開始讀取數(shù)據(jù),還是接續(xù)上次的位置進(jìn)行讀取。本例中,如果 restart 是 true,用戶程序肯定會讀到所有數(shù)據(jù)。但如果這個訂閱之前就存在了,并且已經(jīng)讀取了一部分?jǐn)?shù)據(jù),且 restart 是 false,用戶程序就不會讀到之前已經(jīng)讀取的數(shù)據(jù)了。
subscribe 的最后一個參數(shù)是以毫秒為單位的輪詢周期(間隔需要大于 1000 )。在同步模式下,如過前后兩次調(diào)用 consume 的時間間隔小于此時間,consume 會阻塞,直到間隔超過此時間。異步模式下,這個時間是兩次調(diào)用回調(diào)函數(shù)的最小時間間隔。
當(dāng)要結(jié)束一次數(shù)據(jù)訂閱時,需要調(diào)用 unsubscribe:
/**
* cancel subscribe
*
* @param subscription
* @param isKeep
* @throws SQLException
*/
public void unsubscribe(long subscription, boolean isKeep)
其第二個參數(shù),用于決定是否在客戶端保留訂閱的進(jìn)度信息,如果大家還記得前面說過“訂閱功能是在客戶端 API 中實(shí)現(xiàn)的”的話,應(yīng)該可以猜到,如果這個參數(shù)是 false,那無論下次調(diào)用 subscribe 的時的 restart 參數(shù)是什么,訂閱都只能重新開始了。另外,進(jìn)度信息的保存位置是 {DataDir}/subscribe/,這個目錄下,每個訂閱有一個與其 topic 同名的文件,刪掉某個文件,同樣會導(dǎo)致下次創(chuàng)建其對應(yīng)的訂閱時只能重新開始。
限制條件
下面是一些 TDengine 訂閱功能的局限,大家需要在使用中注意。
- 訂閱的查詢語句只能是
select語句,只能查詢原始數(shù)據(jù)(不支持聚合函數(shù)),只能按時間正序查詢數(shù)據(jù)。 - 在滿足應(yīng)用需求的情況下,請盡量將輪詢周期設(shè)置的大一些,否則會對系統(tǒng)性能造成影響。
- 暫不支持亂序數(shù)據(jù),用戶程序可能讀不到使用 import 方式插入的數(shù)據(jù)。
- 如果用戶程序異常退出或沒有正確調(diào)用 unsubscribe,進(jìn)度信息可能會有錯誤,這時,后續(xù)的同名訂閱可能讀到之前已經(jīng)讀過的數(shù)據(jù)。



互聯(lián)網(wǎng).png)



-1.png)







證.png)


伙伴.png)
伙伴.png)
伙伴.png)



