新聞中心
在Flink CDC中,將數(shù)據(jù)從Flink 1.17寫入Doris,需要遵循以下步驟:

創(chuàng)新互聯(lián)公司公司2013年成立,先為巴林左旗等服務(wù)建站,巴林左旗等地企業(yè),進行企業(yè)商務(wù)咨詢服務(wù)。為巴林左旗企業(yè)網(wǎng)站制作PC+手機+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。
1、添加依賴
在項目的pom.xml文件中添加Flink CDC和Doris的依賴:
org.apache.flink flinkconnectordoris_2.11 1.13.2 org.apache.flink flinkconnectormysqlcdc 2.1.0
2、創(chuàng)建Flink CDC Source
創(chuàng)建一個Flink CDC Source,用于從MySQL數(shù)據(jù)庫中讀取數(shù)據(jù)變更事件:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
public class FlinkCDCSourceExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SourceFunction sourceFunction = MySqlSource.builder()
.hostname("localhost")
.port(3306)
.databaseList("mydb") // 監(jiān)聽的數(shù)據(jù)庫名
.tableList("mydb.mytable") // 監(jiān)聽的表名
.username("root")
.password("password")
.deserializer(new StringDebeziumDeserializationSchema()) // 反序列化方式
.build();
env.addSource(sourceFunction).print();
env.execute("Flink CDC Example");
}
}
3、創(chuàng)建Doris Sink
創(chuàng)建一個Doris Sink,用于將數(shù)據(jù)寫入Doris數(shù)據(jù)庫:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.doris.DorisSink;
import org.apache.flink.streaming.connectors.doris.DorisStreamLoadOptions;
import org.apache.flink.types.Row;
public class DorisSinkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 假設(shè)從Flink CDC Source獲取的數(shù)據(jù)流為dataStream
DataStream dataStream = ...;
DorisSink dorisSink = DorisSink.builder()
.setDorisTable("mydb.mytable") // Doris表名
.setUsername("root")
.setPassword("password")
.setFenodes("localhost:8030") // Doris FE節(jié)點地址
.setLoadProps(DorisStreamLoadOptions.DEFAULT_LOAD_PROPS) // 加載屬性
.build();
dataStream.addSink(dorisSink);
env.execute("Doris Sink Example");
}
}
4、整合Flink CDC Source和Doris Sink
將Flink CDC Source和Doris Sink整合到一起,實現(xiàn)從MySQL數(shù)據(jù)庫到Doris數(shù)據(jù)庫的數(shù)據(jù)同步:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.doris.DorisSink;
import org.apache.flink.streaming.connectors.doris.DorisStreamLoadOptions;
import org.apache.flink.types.Row;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
public class FlinkCDCToDorisExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SourceFunction sourceFunction = MySqlSource.builder()
.hostname("localhost")
.port(3306)
.databaseList("mydb") // 監(jiān)聽的數(shù)據(jù)庫名
.tableList("mydb.mytable") // 監(jiān)聽的表名
.username("root")
.password("password")
.deserializer(new StringDebeziumDeserializationSchema()) // 反序列化方式
.build();
DataStream dataStream = env.addSource(sourceFunction);
// 將數(shù)據(jù)流轉(zhuǎn)換為Row類型,以便寫入Doris
DataStream rowDataStream = dataStream.map(json > {
JsonObject jsonObject = new JsonParser().parse(json).getAsJsonObject();
String before = jsonObject.get("before").getAsString();
String after = jsonObject.get("after").getAsString();
return Row.of(before, after);
}).returns(new RowTypeInfo(Types.STRING, Types.STRING));
DorisSink dorisSink = DorisSink.builder()
.setDorisTable("mydb.mytable") // Doris表名
.setUsername("root")
.setPassword("password")
.setFenodes("localhost:8030") // Doris FE節(jié)點地址
.setLoadProps(DorisStreamLoadOptions.DEFAULT_LOAD_PROPS) // 加載屬性
.build();
rowDataStream.addSink(dorisSink);
env.execute("Flink CDC to Doris Example");
}
}
這樣,就完成了使用Flink CDC將數(shù)據(jù)從MySQL數(shù)據(jù)庫同步到Doris數(shù)據(jù)庫的過程。
名稱欄目:FlinkCDC里flink1.17寫doris的代碼怎么做?
轉(zhuǎn)載來源:http://www.fisionsoft.com.cn/article/cdecesc.html


咨詢
建站咨詢
