六月婷婷AV,国产偷窥猎奇福利二区,日韩三级片。,好吊色网站,日韩成人中文在线视频,国产亚洲午夜啪啪,亚洲欧美另类国产精品,国产成人av1,任你艹在线观看

【技術(shù)干貨】代碼示例:使用 Apache Flink 連接 TDengine

小 T 導(dǎo)讀:想用 Flink 對(duì)接 TDengine?保姆級(jí)教程來了。

0、前言

TDengine 是由濤思數(shù)據(jù)開發(fā)并開源的一款高性能、分布式、支持 SQL 的時(shí)序數(shù)據(jù)庫(Time Series Database)。

除了核心的時(shí)序數(shù)據(jù)庫功能外,TDengine 還提供緩存數(shù)據(jù)訂閱、流式計(jì)算等大數(shù)據(jù)平臺(tái)所需要的系列功能。但是很多小伙伴出于架構(gòu)的考慮,還是需要將數(shù)據(jù)導(dǎo)出到 Apache Flink、Apache Spark 等平臺(tái)進(jìn)行計(jì)算分析。

為了幫助大家對(duì)接,我們特別推出了保姆級(jí)課程,包學(xué)包會(huì)。

Flink 對(duì)接 TDengine Database

1、技術(shù)實(shí)現(xiàn)

Apache Flink 提供了 SourceFunction 和 SinkFunction,用來提供 Flink 和外部數(shù)據(jù)源的連接,其中 SouceFunction 為從數(shù)據(jù)源讀取數(shù)據(jù),SinkFunction 為將數(shù)據(jù)寫入數(shù)據(jù)源。 與此同時(shí),F(xiàn)link 提供了 RichSourceFunction 和 RichSinkFunction 這兩個(gè)類(繼承自AbstractRichFunction),提供了額外的初始化(open(Configuration))和銷毀方法(close())。 通過重寫這兩個(gè)方法,可以避免每次讀寫數(shù)據(jù)時(shí)都重新建立連接。

2、代碼實(shí)現(xiàn)

完整源碼:https://github.com/liuyq-617/TD-Flink

代碼邏輯:

1) 自定義類 SourceFromTDengine

用途:數(shù)據(jù)源連接,數(shù)據(jù)讀取

package com.taosdata.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.taosdata.model.Sensor;
import java.sql.*;
import java.util.Properties;

public class SourceFromTDengine extends RichSourceFunction<Sensor> {
    Statement statement;
    private Connection connection;
    private String property;
    public SourceFromTDengine(){
        super();
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = "com.taosdata.jdbc.rs.RestfulDriver";
        String host = "u05";
        String username = "root";
        String password = "taosdata";
        String prop = System.getProperty("java.library.path");
        Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class);
        LOG.info("java.library.path:{}", prop);
        System.out.println(prop);
        Class.forName( driver );
        Properties properties = new Properties();
        connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
                , properties);
        statement = connection.createStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (statement != null) {
            statement.close();
        }
    }

    @Override
    public void run(SourceContext<Sensor> sourceContext) throws Exception {
        try {
            String sql = "select * from tt.meters";
            ResultSet resultSet = statement.executeQuery(sql);
            while (resultSet.next()) {
                Sensor sensor = new Sensor( resultSet.getLong(1),
                        resultSet.getInt( "vol" ),
                        resultSet.getFloat( "current" ),
                        resultSet.getString( "location" ).trim());
                sourceContext.collect( sensor );
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cancel() {

    }
}   

2) 自定義類 SinkToTDengine

用途:數(shù)據(jù)源連接,數(shù)據(jù)寫入

SinkToTDengine

package com.taosdata.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.taosdata.model.Sensor;
import java.sql.*;
import java.util.Properties;


public class SinkToTDengine extends RichSinkFunction<Sensor> {
    Statement statement;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = "com.taosdata.jdbc.rs.RestfulDriver";
        String host = "TAOS-FQDN";
        String username = "root";
        String password = "taosdata";
        String prop = System.getProperty("java.library.path");
        System.out.println(prop);
        Class.forName( driver );
        Properties properties = new Properties();
        connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
                , properties);
        statement = connection.createStatement();
        
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (statement != null) {
            statement.close();
        }
    }

    @Override
    public void invoke(Sensor sensor, Context context) throws Exception {
        try {
            String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)", 
                                sensor.getLocation(),
                                sensor.getLocation(),
                                sensor.getTs(),
                                sensor.getVal(),
                                sensor.getCurrent()
                                );
            statement.executeUpdate(sql);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3) 自定義類 Sensor

用途:定義數(shù)據(jù)結(jié)構(gòu),用來接受數(shù)據(jù)

package com.taosdata.model;

public class Sensor {

    public long ts;
    public int val;
    public float current;
    public String location;

    public Sensor() {

    }

    public Sensor(long ts, int val, float current, String location) {
        this.ts = ts;
        this.val = val;
        this.current = current;
        this.location = location;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }

    public int getVal() {
        return val;
    }

    public void setVal(int val) {
        this.val = val;
    }

    public float getCurrent() {
        return current;
    }

    public void setCurrent(float current) {
        this.current = current;
    }

    public String getLocation() {
        return location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    @Override
    public String toString() {
        return "Sensor{" +
                "ts=" + ts +
                ", val=" + val +
                ", current=" + current +
                ", location='" + location + '\'' +
                '}';
    }
}

4) 主程序類 ReadFromTDengine

用途:調(diào)用 Flink 進(jìn)行讀取和寫入數(shù)據(jù)

package com.taosdata;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.taosdata.model.Sensor;

import org.slf4j.LoggerFactory;
import org.slf4j.Logger;

public class ReadFromTDengine {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Sensor> SensorList = env.addSource( new com.taosdata.flink.SourceFromTDengine() );
        SensorList.print();
        SensorList.addSink( new com.taosdata.flink.SinkToTDengine() );       
        env.execute();

    }
}

3、簡單測試 RESTful 接口

1) 環(huán)境準(zhǔn)備:

a) Flink 安裝&啟動(dòng):

b) TDengine Database 環(huán)境準(zhǔn)備:

  • 創(chuàng)建原始數(shù)據(jù): 
    • create database tt;
    • create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));
    • insert into beijing using meters tags(‘beijing’) values(now,220,30.2);
  • 創(chuàng)建目標(biāo)數(shù)據(jù)庫表: 
    • create database sinktest;
    • create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));

2) 打包編譯:

源碼位置: https://github.com/liuyq-617/TD-Flink

mvn clean package

3) 程序啟動(dòng):

flink run target/test-flink-1.0-SNAPSHOT-dist.jar

  • 讀取數(shù)據(jù) 
    • vi log/flink-root-taskexecutor-0-xxxxx.out 
    • 查看到數(shù)據(jù)打印:Sensor{ts=1645166073101, val=220, current=5.7, location=’beijing’}
  • 寫入數(shù)據(jù) 
    • show sinktest.tables; 
      • 已經(jīng)創(chuàng)建了beijing 子表
    • select * from sinktest.beijing; 
      • 可以查詢到剛插入的數(shù)據(jù)

4、使用 JNI 方式

舉一反三的小伙伴此時(shí)已經(jīng)猜到,只要把 JDBC URL 修改一下就可以了。

但是 Flink 每次分派作業(yè)時(shí)都在使用一個(gè)新的 ClassLoader,而我們?cè)谟?jì)算節(jié)點(diǎn)上就會(huì)得到“Native library already loaded in another classloader”錯(cuò)誤。

為了避免此問題,可以將 JDBC 的 jar 包放到 Flink 的 lib 目錄下,不去調(diào)用 dist 包就可以了。

  • cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib
  • flink run target/test-flink-1.0-SNAPSHOT.jar

5、小結(jié)

通過在項(xiàng)目中引入 SourceFromTDengine 和 SinkToTDengine 兩個(gè)類,即可完成在 Flink 中對(duì) TDengine 的讀寫操作。后面我們會(huì)有文章介紹 Spark 和 TDengine 的對(duì)接。

注:文中使用的是 JDBC 的 RESTful 接口,這樣就不用在 Flink 的節(jié)點(diǎn)安裝 TDengine,JNI 方式需要在 Flink 節(jié)點(diǎn)安裝 TDengine Database 的客戶端。