這篇文章主要講解了“1、如何用flink的table和sql構(gòu)建pom文件”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“1、如何用flink的table和sql構(gòu)建pom文件”吧!
成都創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括江干網(wǎng)站建設(shè)、江干網(wǎng)站制作、江干網(wǎng)頁(yè)制作以及江干網(wǎng)絡(luò)營(yíng)銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,江干網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到江干省份的部分城市,未來相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
構(gòu)建pom文件
4.0.0 org.example flinksqldemo 1.0-SNAPSHOT UTF-8 UTF-8 2.11 2.11.8 0.10.2.1 1.12.0 2.7.3 compile org.apache.maven.plugins maven-compiler-plugin 8 8 org.apache.flink flink-table-planner-blink_2.11 1.12.0 org.apache.flink flink-java ${flink.version} ${setting.scope} org.apache.flink flink-streaming-java_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-clients_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-connector-kafka-0.10_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} ${setting.scope} org.apache.flink flink-connector-filesystem_${scala.binary.version} ${flink.version} org.apache.kafka kafka_${scala.binary.version} ${kafka.version} ${setting.scope} org.apache.hadoop hadoop-common ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-hdfs ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-client ${hadoop.version} ${setting.scope} org.slf4j slf4j-api 1.7.25 com.alibaba fastjson 1.2.72 redis.clients jedis 2.7.3 com.google.guava guava 29.0-jre
2、編寫代碼
package com.jd.data; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class test { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSourcestream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt"); // DataStreamSource stream = env.socketTextStream("localhost", 8888); SingleOutputStreamOperator map = stream.map(new MapFunction () { public SensorReading map(String s) throws Exception { String[] split = s.split(","); return new SensorReading(split[0], split[1], split[2]); } }); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 使用 table api // Table table = tableEnv.fromDataStream(map); // table.printSchema(); // Table select = table.select("a,b"); // 使用 sql api tableEnv.createTemporaryView("test", map); Table select = tableEnv.sqlQuery(" select a, b from test"); DataStream sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class); sensorReading2DataStream.map(new MapFunction () { @Override public Object map(SensorReading2 value) throws Exception { System.out.println(value.a+" "+ value.b); return null; } }); env.execute(); } }
package com.jd.data; public class SensorReading { public String a; public String b; public String c; public SensorReading(){ } public SensorReading(String a, String b, String c) { this.a = a; this.b = b; this.c = c; } public String getA() { return a; } public void setA(String a) { this.a = a; } public String getB() { return b; } public void setB(String b) { this.b = b; } public String getC() { return c; } public void setC(String c) { this.c = c; } }
package com.jd.data; public class SensorReading2 { public String a; public String b; public SensorReading2(){ } public SensorReading2(String a, String b) { this.a = a; this.b = b; } public String getA() { return a; } public void setA(String a) { this.a = a; } public String getB() { return b; } public void setB(String b) { this.b = b; } }
注意:pojo 中屬性必須是public的, 包含無參構(gòu)造器
感謝各位的閱讀,以上就是“1、如何用flink的table和sql構(gòu)建pom文件”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)1、如何用flink的table和sql構(gòu)建pom文件這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!