真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

1、如何用flink的table和sql?構(gòu)建pom文件

這篇文章主要講解了“1、如何用flink的table和sql構(gòu)建pom文件”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“1、如何用flink的table和sql構(gòu)建pom文件”吧!

成都創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括江干網(wǎng)站建設(shè)、江干網(wǎng)站制作、江干網(wǎng)頁(yè)制作以及江干網(wǎng)絡(luò)營(yíng)銷策劃等。多年來,我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,江干網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到江干省份的部分城市,未來相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

構(gòu)建pom文件



    4.0.0

    org.example
    flinksqldemo
    1.0-SNAPSHOT


    
        
        UTF-8
        UTF-8

        2.11
        2.11.8
        0.10.2.1
        1.12.0
        2.7.3

        
        compile
    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    8
                    8
                
            
        
    



    
        
        
            org.apache.flink
            flink-table-planner-blink_2.11
            1.12.0

        

        
            org.apache.flink
            flink-java
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-streaming-java_2.11
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-clients_2.11
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-connector-kafka-0.10_${scala.binary.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-scala_${scala.binary.version}
            ${flink.version}
            ${setting.scope}
        
        
            org.apache.flink
            flink-connector-filesystem_${scala.binary.version}
            ${flink.version}
        
        
        

        
        
            org.apache.kafka
            kafka_${scala.binary.version}
            ${kafka.version}
            ${setting.scope}
        
        

        
        
            org.apache.hadoop
            hadoop-common
            ${hadoop.version}
            ${setting.scope}
        
        
            org.apache.hadoop
            hadoop-hdfs
            ${hadoop.version}
            ${setting.scope}
        
        
            org.apache.hadoop
            hadoop-client
            ${hadoop.version}
            ${setting.scope}
        
        

        
            org.slf4j
            slf4j-api
            1.7.25
        
        
            com.alibaba
            fastjson
            1.2.72
        
        
            redis.clients
            jedis
            2.7.3
        
        
            com.google.guava
            guava
            29.0-jre
        

    

2、編寫代碼

package com.jd.data;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");
//        DataStreamSource stream = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator map = stream.map(new MapFunction() {

            public SensorReading map(String s) throws Exception {
                String[] split = s.split(",");
                return new SensorReading(split[0], split[1], split[2]);
            }
        });



        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//        使用 table api
//        Table table = tableEnv.fromDataStream(map);
//        table.printSchema();
//        Table select = table.select("a,b");

//        使用 sql api
        tableEnv.createTemporaryView("test", map);
        Table select = tableEnv.sqlQuery(" select a, b from test");


        DataStream sensorReading2DataStream = tableEnv.toAppendStream(select, SensorReading2.class);
        sensorReading2DataStream.map(new MapFunction() {
            @Override
            public Object map(SensorReading2 value) throws Exception {
                System.out.println(value.a+"   "+ value.b);
                return null;
            }
        });
        env.execute();


    }
}
package com.jd.data;

public class SensorReading {
    public String a;
    public String b;
    public String c;

    public SensorReading(){

    }

    public SensorReading(String a, String b, String c) {
        this.a = a;
        this.b = b;
        this.c = c;
    }

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }

    public String getC() {
        return c;
    }

    public void setC(String c) {
        this.c = c;
    }
}
package com.jd.data;

public class SensorReading2 {
    public String a;
    public String b;

    public SensorReading2(){

    }

    public SensorReading2(String a, String b) {
        this.a = a;
        this.b = b;
    }

    public String getA() {
        return a;
    }

    public void setA(String a) {
        this.a = a;
    }

    public String getB() {
        return b;
    }

    public void setB(String b) {
        this.b = b;
    }


}

注意:pojo 中屬性必須是public的, 包含無參構(gòu)造器

感謝各位的閱讀,以上就是“1、如何用flink的table和sql構(gòu)建pom文件”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)1、如何用flink的table和sql構(gòu)建pom文件這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!


新聞標(biāo)題:1、如何用flink的table和sql?構(gòu)建pom文件
分享路徑:http://www.weahome.cn/article/iegiei.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部