六月婷婷AV,国产偷窥猎奇福利二区,日韩三级片。,好吊色网站,日韩成人中文在线视频,国产亚洲午夜啪啪,亚洲欧美另类国产精品,国产成人av1,任你艹在线观看

TDengine中訂閱的用途和用法

本文將介紹TDengine Database訂閱功能的使用場(chǎng)景、使用方法和一些限制,并與InfluxDB的訂閱功能進(jìn)行簡(jiǎn)單的對(duì)比。本文的預(yù)期讀者是基于TDengine開發(fā)各種應(yīng)用的軟件開發(fā)人員。

什么是訂閱?

訂閱,是一種數(shù)據(jù)查詢方式,其特點(diǎn)為:客戶端執(zhí)行一個(gè)查詢語句后,可以增量形式,不斷收到新到達(dá)服務(wù)端的、符合查詢條件的數(shù)據(jù)。訂閱的實(shí)現(xiàn)模型有兩種,一種是“推”,即服務(wù)器主動(dòng)將數(shù)據(jù)發(fā)到客戶端;另一種是“拉”,即客戶端主動(dòng)向服務(wù)器請(qǐng)求數(shù)據(jù)。兩種方式各有優(yōu)缺點(diǎn),這里不做詳細(xì)的對(duì)比,只是說明一下,TDengine Database使用的是“拉”模型。

什么時(shí)候需要使用訂閱?

為了便于用戶程序消費(fèi)TDengine Database中的數(shù)據(jù),TDengine實(shí)現(xiàn)了基于SQL的數(shù)據(jù)查詢語法,并提供了豐富的聚合函數(shù),這種方式的優(yōu)勢(shì)已在多個(gè)實(shí)際案例中得到了體現(xiàn)。但由于時(shí)序數(shù)據(jù)的特點(diǎn),單純的直接數(shù)據(jù)查詢并不能滿足用戶程序的需求,比如:我們管理著一批溫度測(cè)量設(shè)備,希望當(dāng)某個(gè)設(shè)備檢測(cè)到的溫度超過限制(比如80°C)后能得到通知并進(jìn)行一些處理時(shí),肯定會(huì)先為所有的設(shè)備建立一張超級(jí)表:

create database test;
use test;
create table devices (ts timestamp, temperature float) tags(id int);

并為每個(gè)設(shè)備創(chuàng)建一張子表:

create table device1 using devices tags(1);
create table device2 using devices tags(2);
...

這種設(shè)計(jì)滿足了設(shè)備管理的需求,但如何滿足溫度監(jiān)測(cè)的需求呢?如果僅使用普通的查詢,有兩種方法:一是分別對(duì)每張子表進(jìn)行查詢,每次查詢后記錄最后一條數(shù)據(jù)的時(shí)間戳,后續(xù)只查詢這個(gè)時(shí)間戳之后的數(shù)據(jù):

select * from device1 where ts > last_timestamp1 and temperature > 80;
select * from device2 where ts > last_timestamp2 and temperature > 80;
...

這確實(shí)可行,但隨著設(shè)備數(shù)量的增加,查詢數(shù)量也會(huì)增加,客戶端和服務(wù)端的性能都會(huì)受到影響,當(dāng)設(shè)備數(shù)增長(zhǎng)到一定的程度,系統(tǒng)就無法承受了。

另一種方法是對(duì)超級(jí)表進(jìn)行查詢。這樣,無論有多少設(shè)備,都只需一次查詢:

select * from devices where ts > last_timestamp and temperature > 80;

但是,如何選擇 last_timestamp 就成了一個(gè)新的問題。因?yàn)椋环矫鏀?shù)據(jù)的產(chǎn)生時(shí)間(也就是數(shù)據(jù)時(shí)間戳)和數(shù)據(jù)入庫(kù)的時(shí)間一般并不相同,有時(shí)偏差還很大;另一方面,不同設(shè)備的數(shù)據(jù)到達(dá)TDengine的時(shí)間也會(huì)有差異。所以,如果我們?cè)诓樵冎惺褂米盥哪桥_(tái)設(shè)備的數(shù)據(jù)的時(shí)間戳作為 last_timestamp ,就可能重復(fù)讀入其它設(shè)備的數(shù)據(jù);如果使用最快的設(shè)備的時(shí)間戳,其它設(shè)備的數(shù)據(jù)就可能被漏掉。

TDengine的訂閱功能為上面這個(gè)問題提供了一個(gè)徹底的解決方案。

如何使用TDengine中的訂閱功能?

TDengine的API中,與訂閱相關(guān)的主要有以下三個(gè):

  • taos_subscribe
  • taos_consume
  • taos_unsubscribe

這三個(gè)API的具體說明請(qǐng)見《C/C++數(shù)據(jù)訂閱接口》,下面結(jié)合一個(gè)示例,介紹下其使用方法,完整的示例代碼可以在這里找到。

首先是創(chuàng)建訂閱:

TAOS_SUB* tsub = NULL;
if (async) {
  // create an asynchronized subscription, the callback function will be called every 1s
  tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
  // create an synchronized subscription, need to call 'taos_consume' manually
  tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}

TDengine中的訂閱既可以是同步的,也可以是異步的,上面的代碼會(huì)根據(jù)從命令行獲取的參數(shù)async的值來決定使用哪種方式。這里,同步的意思是用戶程序要直接調(diào)用 taos_consume來拉取數(shù)據(jù),而異步則由API在內(nèi)部的另一個(gè)線程中調(diào)用taos_consume,然后把拉取到的數(shù)據(jù)交給回調(diào)函數(shù) subscribe_callback去處理。

參數(shù)taos是一個(gè)已經(jīng)建立好的數(shù)據(jù)庫(kù)連接,在同步模式下無特殊要求。但在異步模式下,需要注意它不會(huì)被其它線程使用,否則可能導(dǎo)致不可預(yù)計(jì)的錯(cuò)誤,因?yàn)榛卣{(diào)函數(shù)在API的內(nèi)部線程中被調(diào)用,而TDengine的部分API不是線程安全的。

參數(shù)sql是查詢語句,可以在其中使用where子句指定過濾條件。回到開頭的例子,如果我們只想訂閱設(shè)備溫度超過 80°C 時(shí)的數(shù)據(jù),可以這樣寫:

select * from devices where temperature > 80;

注意,這里沒有指定起始時(shí)間,所以會(huì)讀到所有時(shí)間的數(shù)據(jù)。如果只想從一天前的數(shù)據(jù)開始訂閱,而不需要更早的歷史數(shù)據(jù),可以再加上一個(gè)時(shí)間條件:

select * from devices where ts > now - 1d and temperature > 80;

訂閱的topic實(shí)際上是它的名字,因?yàn)橛嗛喒δ苁窃诳蛻舳薃PI中實(shí)現(xiàn)的,所以沒必要保證它全局唯一,但需要它在一臺(tái)客戶端機(jī)器上唯一。

如果名topic的訂閱不存在,參數(shù)restart沒有意義;但如果用戶程序創(chuàng)建這個(gè)訂閱后退出,當(dāng)它再次啟動(dòng)并重新使用這個(gè)topic時(shí),restart就會(huì)被用于決定是從頭開始讀取數(shù)據(jù),還是接續(xù)上次的位置進(jìn)行讀取。本例中,如果restarttrue(非零值),用戶程序肯定會(huì)讀到所有數(shù)據(jù)。但如果這個(gè)訂閱之前就存在了,并且已經(jīng)讀取了一部分?jǐn)?shù)據(jù),且restartfalse(0),用戶程序就不會(huì)讀到之前已經(jīng)讀取的數(shù)據(jù)了。

taos_subscribe的最后一個(gè)參數(shù)是以毫秒為單位的輪詢周期。在同步模式下,如過前后兩次調(diào)用taos_consume的時(shí)間間隔小于此時(shí)間,taos_consume會(huì)阻塞,直到間隔超過此時(shí)間。異步模式下,這個(gè)時(shí)間是兩次調(diào)用回調(diào)函數(shù)的最小時(shí)間間隔。

taos_subscribe的倒數(shù)第二個(gè)參數(shù)用于用戶程序向回調(diào)函數(shù)傳遞附加參數(shù),訂閱API不對(duì)其做任何處理,只原樣傳遞給回調(diào)函數(shù)。此參數(shù)在同步模式下無意義。

訂閱創(chuàng)建以后,就可以消費(fèi)其數(shù)據(jù)了,同步模式下,示例代碼是下面的 else 部分:

if (async) {
  getchar();
} else while(1) {
  TAOS_RES* res = taos_consume(tsub);
  if (res == NULL) {
    printf("failed to consume data.");
    break;
  } else {
    print_result(res, blockFetch);
    getchar();
  }
}

這里是一個(gè)while循環(huán),用戶每按一次回車鍵就調(diào)用一次taos_consume,而taos_consume的返回值是查詢到的結(jié)果集,與taos_use_result完全相同,例子中使用這個(gè)結(jié)果集的代碼是函數(shù)print_result

void print_result(TAOS_RES* res, int blockFetch) {
  TAOS_ROW row = NULL;
  int num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
  int nRows = 0;
  if (blockFetch) {
    nRows = taos_fetch_block(res, &row);
    for (int i = 0; i < nRows; i++) {
      char temp[256];
      taos_print_row(temp, row + i, fields, num_fields);
      puts(temp);
    }
  } else {
    while ((row = taos_fetch_row(res))) {
      char temp[256];
      taos_print_row(temp, row, fields, num_fields);puts(temp);
      nRows++;
    }
  }
  printf("%d rows consumed.\n", nRows);
}

其中的 taos_print_row 用于處理訂閱到數(shù)據(jù),在我們的例子中,它會(huì)打印出所有符合條件的記錄。而異步模式下,消費(fèi)訂閱到的數(shù)據(jù)則顯得更為簡(jiǎn)單:

void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  print_result(res, *(int*)param);
}

當(dāng)要結(jié)束一次數(shù)據(jù)訂閱時(shí),需要調(diào)用taos_unsubscribe:

taos_unsubscribe(tsub, keep);

其第二個(gè)參數(shù),用于決定是否在客戶端保留訂閱的進(jìn)度信息,如果大家還記得前面說過“訂閱功能是在客戶端API中實(shí)現(xiàn)的”的話,應(yīng)該可以猜到,如果這個(gè)參數(shù)是false(0),那無論下次調(diào)用taos_subscribe的時(shí)的restart參數(shù)是什么,訂閱都只能重新開始了。另外,進(jìn)度信息的保存位置是{DataDir}/subscribe/,這個(gè)目錄下,每個(gè)訂閱有一個(gè)與其topic同名的文件,刪掉某個(gè)文件,同樣會(huì)導(dǎo)致下次創(chuàng)建其對(duì)應(yīng)的訂閱時(shí)只能重新開始。

代碼介紹完畢,我們來看一下實(shí)際的運(yùn)行效果。假設(shè):

  • 示例代碼已經(jīng)下載到本地
  • TDengine 也已經(jīng)在同一臺(tái)機(jī)器上安裝好
  • 已經(jīng)按照本文開頭的腳本創(chuàng)建數(shù)據(jù)庫(kù)、超級(jí)表和一些子表

則可以在示例代碼所在目錄執(zhí)行以下命令來編譯并啟動(dòng)示例程序:

$ make
$ ./subscribe -sql='select * from devices where temperature > 80;'

示例程序啟動(dòng)后,打開另一個(gè)終端窗口,啟動(dòng) TDengine 的 shell 向 device1 插入一條溫度為 90 °C 的數(shù)據(jù):

$ taos
> use test;
> insert into device1 values(0, 90);

這時(shí),因?yàn)闇囟瘸^了 80 °C ,您應(yīng)該可以看到示例程序?qū)⑺敵龅搅似聊簧稀D梢岳^續(xù)插入一些數(shù)據(jù)觀察示例程序的輸出。

用作消息隊(duì)列

本文開頭的例子,是用訂閱實(shí)現(xiàn)了一個(gè)報(bào)警監(jiān)控的功能,但其實(shí)訂閱也可以用在其它場(chǎng)景中,比如:消息隊(duì)列。

應(yīng)用程序可以訂閱數(shù)據(jù)庫(kù)某些表的內(nèi)容,同一個(gè)表也可以被多個(gè)應(yīng)用訂閱,一旦表有新的記錄,應(yīng)用將立即得到通知。這樣,再把數(shù)據(jù)插入看做Publish操作,用戶完全可以把TDengine作為一個(gè)消息隊(duì)列中間件來使用。

所以,當(dāng)下次面對(duì)需要使用Kafka的場(chǎng)景時(shí),不妨先考慮下TDengine,因?yàn)門Dengine除了安裝包超小、運(yùn)維超簡(jiǎn)單的優(yōu)點(diǎn)外,還有一個(gè)Kafka不具備的功能——數(shù)據(jù)過濾:可以在查詢語句中指定過濾條件,保證讀到的數(shù)據(jù)都是有用的,不用再在代碼中手寫過濾邏輯了。

與InfluxDB的對(duì)比

概念上說,InfluxDB的訂閱和TDengine的訂閱區(qū)別很大,我們可以認(rèn)為訂閱在InfluxDB中更像一種數(shù)據(jù)同步機(jī)制,而TDengine中的訂閱則是一種數(shù)據(jù)查詢機(jī)制:

  • InfluxDB將收到的數(shù)據(jù)實(shí)時(shí)推送給其它節(jié)點(diǎn),TDengine通過輪詢的方式拉取數(shù)據(jù),InfluxDB具有更好的實(shí)時(shí)性。
  • InfluxDB中只能訂閱全部數(shù)據(jù),TDengine中可以指定數(shù)據(jù)過濾條件。
  • InfluxDB中只能訂閱當(dāng)前時(shí)間之后的數(shù)據(jù),TDengine中可以在訂閱中讀到歷史數(shù)據(jù)。

所以,兩相對(duì)比,InfluxDB的優(yōu)勢(shì)是實(shí)時(shí)性,而TDengine則以稍微犧牲實(shí)時(shí)性為代價(jià)提供了更強(qiáng)大的功能。

限制條件

下面是一些TDengine訂閱功能的局限,大家需要在使用中注意。

  • 訂閱的查詢語句只能是 select 語句,只能查詢?cè)紨?shù)據(jù)(不支持聚合函數(shù)),只能按時(shí)間正序查詢數(shù)據(jù)。
  • 在滿足應(yīng)用需求的情況下,請(qǐng)盡量將輪詢周期設(shè)置的大一些,否則會(huì)對(duì)系統(tǒng)性能造成影響。
  • 暫不支持亂序數(shù)據(jù),用戶程序可能讀不到使用import方式插入的數(shù)據(jù)。
  • 如果用戶程序異常退出或沒有正確調(diào)用taos_unsubscribe,進(jìn)度信息可能會(huì)有錯(cuò)誤,這時(shí),后續(xù)的同名訂閱可能讀到之前已經(jīng)讀過的數(shù)據(jù)。