小 T 導(dǎo)讀:想用 Flink 對接 TDengine?保姆級教程來了。
0、前言
TDengine 是由濤思數(shù)據(jù)開發(fā)并開源的一款高性能、分布式、支持 SQL 的時序數(shù)據(jù)庫(Time Series Database)。
除了核心的時序數(shù)據(jù)庫功能外,TDengine 還提供緩存、數(shù)據(jù)訂閱、流式計算等大數(shù)據(jù)平臺所需要的系列功能。但是很多小伙伴出于架構(gòu)的考慮,還是需要將數(shù)據(jù)導(dǎo)出到 Apache Flink、Apache Spark 等平臺進(jìn)行計算分析。
為了幫助大家對接,我們特別推出了保姆級課程,包學(xué)包會。

1、技術(shù)實(shí)現(xiàn)
Apache Flink 提供了 SourceFunction 和 SinkFunction,用來提供 Flink 和外部數(shù)據(jù)源的連接,其中 SouceFunction 為從數(shù)據(jù)源讀取數(shù)據(jù),SinkFunction 為將數(shù)據(jù)寫入數(shù)據(jù)源。 與此同時,F(xiàn)link 提供了 RichSourceFunction 和 RichSinkFunction 這兩個類(繼承自AbstractRichFunction),提供了額外的初始化(open(Configuration))和銷毀方法(close())。 通過重寫這兩個方法,可以避免每次讀寫數(shù)據(jù)時都重新建立連接。
2、代碼實(shí)現(xiàn)
完整源碼:https://github.com/liuyq-617/TD-Flink
代碼邏輯:
1) 自定義類 SourceFromTDengine
用途:數(shù)據(jù)源連接,數(shù)據(jù)讀取
package com.taosdata.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.taosdata.model.Sensor;
import java.sql.*;
import java.util.Properties;
public class SourceFromTDengine extends RichSourceFunction<Sensor> {
Statement statement;
private Connection connection;
private String property;
public SourceFromTDengine(){
super();
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String driver = "com.taosdata.jdbc.rs.RestfulDriver";
String host = "u05";
String username = "root";
String password = "taosdata";
String prop = System.getProperty("java.library.path");
Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class);
LOG.info("java.library.path:{}", prop);
System.out.println(prop);
Class.forName( driver );
Properties properties = new Properties();
connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
, properties);
statement = connection.createStatement();
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
if (statement != null) {
statement.close();
}
}
@Override
public void run(SourceContext<Sensor> sourceContext) throws Exception {
try {
String sql = "select * from tt.meters";
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
Sensor sensor = new Sensor( resultSet.getLong(1),
resultSet.getInt( "vol" ),
resultSet.getFloat( "current" ),
resultSet.getString( "location" ).trim());
sourceContext.collect( sensor );
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void cancel() {
}
}
2) 自定義類 SinkToTDengine
用途:數(shù)據(jù)源連接,數(shù)據(jù)寫入
SinkToTDengine
package com.taosdata.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.taosdata.model.Sensor;
import java.sql.*;
import java.util.Properties;
public class SinkToTDengine extends RichSinkFunction<Sensor> {
Statement statement;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String driver = "com.taosdata.jdbc.rs.RestfulDriver";
String host = "TAOS-FQDN";
String username = "root";
String password = "taosdata";
String prop = System.getProperty("java.library.path");
System.out.println(prop);
Class.forName( driver );
Properties properties = new Properties();
connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
, properties);
statement = connection.createStatement();
}
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
if (statement != null) {
statement.close();
}
}
@Override
public void invoke(Sensor sensor, Context context) throws Exception {
try {
String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)",
sensor.getLocation(),
sensor.getLocation(),
sensor.getTs(),
sensor.getVal(),
sensor.getCurrent()
);
statement.executeUpdate(sql);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3) 自定義類 Sensor
用途:定義數(shù)據(jù)結(jié)構(gòu),用來接受數(shù)據(jù)
package com.taosdata.model;
public class Sensor {
public long ts;
public int val;
public float current;
public String location;
public Sensor() {
}
public Sensor(long ts, int val, float current, String location) {
this.ts = ts;
this.val = val;
this.current = current;
this.location = location;
}
public long getTs() {
return ts;
}
public void setTs(long ts) {
this.ts = ts;
}
public int getVal() {
return val;
}
public void setVal(int val) {
this.val = val;
}
public float getCurrent() {
return current;
}
public void setCurrent(float current) {
this.current = current;
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
@Override
public String toString() {
return "Sensor{" +
"ts=" + ts +
", val=" + val +
", current=" + current +
", location='" + location + '\'' +
'}';
}
}
4) 主程序類 ReadFromTDengine
用途:調(diào)用 Flink 進(jìn)行讀取和寫入數(shù)據(jù)
package com.taosdata;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.taosdata.model.Sensor;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
public class ReadFromTDengine {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Sensor> SensorList = env.addSource( new com.taosdata.flink.SourceFromTDengine() );
SensorList.print();
SensorList.addSink( new com.taosdata.flink.SinkToTDengine() );
env.execute();
}
}
3、簡單測試 RESTful 接口
1) 環(huán)境準(zhǔn)備:
a) Flink 安裝&啟動:
- wget https://dlcdn.apache.org/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
- tar zxf flink-1.14.3-bin-scala_2.12.tgz -C /usr/local
- /usr/local/flink-1.14.3/bin/start-cluster.sh
b) TDengine Database 環(huán)境準(zhǔn)備:
- 創(chuàng)建原始數(shù)據(jù):
- create database tt;
- create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));
- insert into beijing using meters tags(‘beijing’) values(now,220,30.2);
- 創(chuàng)建目標(biāo)數(shù)據(jù)庫表:
- create database sinktest;
- create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));
2) 打包編譯:
源碼位置: https://github.com/liuyq-617/TD-Flink
mvn clean package
3) 程序啟動:
flink run target/test-flink-1.0-SNAPSHOT-dist.jar
- 讀取數(shù)據(jù)
- vi log/flink-root-taskexecutor-0-xxxxx.out
- 查看到數(shù)據(jù)打印:Sensor{ts=1645166073101, val=220, current=5.7, location=’beijing’}
- 寫入數(shù)據(jù)
- show sinktest.tables;
- 已經(jīng)創(chuàng)建了beijing 子表
- select * from sinktest.beijing;
- 可以查詢到剛插入的數(shù)據(jù)
- show sinktest.tables;
4、使用 JNI 方式
舉一反三的小伙伴此時已經(jīng)猜到,只要把 JDBC URL 修改一下就可以了。
但是 Flink 每次分派作業(yè)時都在使用一個新的 ClassLoader,而我們在計算節(jié)點(diǎn)上就會得到“Native library already loaded in another classloader”錯誤。
為了避免此問題,可以將 JDBC 的 jar 包放到 Flink 的 lib 目錄下,不去調(diào)用 dist 包就可以了。
- cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib
- flink run target/test-flink-1.0-SNAPSHOT.jar
5、小結(jié)
通過在項目中引入 SourceFromTDengine 和 SinkToTDengine 兩個類,即可完成在 Flink 中對 TDengine 的讀寫操作。后面我們會有文章介紹 Spark 和 TDengine 的對接。
注:文中使用的是 JDBC 的 RESTful 接口,這樣就不用在 Flink 的節(jié)點(diǎn)安裝 TDengine,JNI 方式需要在 Flink 節(jié)點(diǎn)安裝 TDengine Database 的客戶端。



互聯(lián)網(wǎng).png)



-1.png)







證.png)


伙伴.png)
伙伴.png)
伙伴.png)



