午夜激情网址,精品久久久久久777米琪桃花,性生活TV日韩,骚货自慰久草

首頁 >頭條 > 正文

【熱聞】大數(shù)據(jù)Flink進階(六):Flink入門案例

2023-03-22 05:16:24來源:騰訊云

Flink入門案例

需求:讀取本地數(shù)據(jù)文件,統(tǒng)計文件中每個單詞出現(xiàn)的次數(shù)。

一、IDEA Project創(chuàng)建及配置

本案例編寫Flink代碼選擇語言為Java和Scala,所以這里我們通過IntelliJ IDEA創(chuàng)建一個目錄,其中包括Java項目模塊和Scala項目模塊,將Flink Java api和Flink Scala api分別在不同項目模塊中實現(xiàn)。步驟如下:


【資料圖】

1、打開IDEA,創(chuàng)建空項目

2、在IntelliJ IDEA 中安裝Scala插件

使用IntelliJ IDEA開發(fā)Flink,如果使用Scala api 那么還需在IntelliJ IDEA中安裝Scala的插件,如果已經(jīng)安裝可以忽略此步驟,下圖為以安裝Scala插件。

3、打開Structure,創(chuàng)建項目新模塊

創(chuàng)建Java模塊:

繼續(xù)點擊"+",創(chuàng)建Scala模塊:

創(chuàng)建好"FlinkScalaCode"模塊后,右鍵該模塊添加Scala框架支持,并修改該模塊中的"java"src源為"scala":

在"FlinkScalaCode"模塊Maven pom.xml中引入Scala依賴包,這里使用的Scala版本為2.12.10。

  org.scala-lang  scala-library  2.12.10  org.scala-lang  scala-compiler  2.12.10  org.scala-lang  scala-reflect  2.12.10

4、Log4j日志配置

為了方便查看項目運行過程中的日志,需要在兩個項目模塊中配置log4j.properties配置文件,并放在各自項目src/main/resources資源目錄下,沒有resources資源目錄需要手動創(chuàng)建并設置成資源目錄。log4j.properties配置文件內(nèi)容如下:

log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n

復制

并在兩個項目中的Maven pom.xml中添加對應的log4j需要的依賴包,使代碼運行時能正常打印結果:

  org.slf4j  slf4j-log4j12  1.7.36  org.apache.logging.log4j  log4j-to-slf4j  2.17.2

5、分別在兩個項目模塊中導入Flink Maven依賴

"FlinkJavaCode"模塊導入Flink Maven依賴如下:

  UTF-8  1.8  1.8  1.16.0  1.7.36  2.17.2        org.apache.flink    flink-clients    ${flink.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

"FlinkScalaCode"模塊導入Flink Maven依賴如下:

  UTF-8  1.8  1.8  1.16.0  1.7.31  2.17.1  2.12.10  2.12        org.apache.flink    flink-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-streaming-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-clients    ${flink.version}          org.scala-lang    scala-library    ${scala.version}        org.scala-lang    scala-compiler    ${scala.version}        org.scala-lang    scala-reflect    ${scala.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

注意:在后續(xù)實現(xiàn)WordCount需求時,F(xiàn)link Java Api只需要在Maven中導入"flink-clients"依賴包即可,而Flink Scala Api 需要導入以下三個依賴包:

flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients

主要是因為在Flink1.15版本后,F(xiàn)link添加對opting-out(排除)Scala的支持,如果你只使用Flink的Java api,導入包不必包含scala后綴,如果使用Flink的Scala api,需要選擇匹配的Scala版本。

二、案例數(shù)據(jù)準備

在項目"MyFlinkCode"中創(chuàng)建"data"目錄,在目錄中創(chuàng)建"words.txt"文件,向文件中寫入以下內(nèi)容,方便后續(xù)使用Flink編寫WordCount實現(xiàn)代碼。

hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink

三、案例實現(xiàn)

數(shù)據(jù)源分為有界和無界之分,有界數(shù)據(jù)源可以編寫批處理程序,無界數(shù)據(jù)源可以編寫流式程序。DataSet API用于批處理,DataStream API用于流式處理。

批處理使用ExecutionEnvironment和DataSet,流式處理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示數(shù)據(jù)的特殊類,DataSet處理的數(shù)據(jù)是有界的,DataStream處理的數(shù)據(jù)是無界的,這兩個類都是不可變的,一旦創(chuàng)建出來就無法添加或者刪除數(shù)據(jù)元。

1、Flink 批數(shù)據(jù)處理案例

Java版本W(wǎng)ordCount

使用Flink Java Dataset api實現(xiàn)WordCount具體代碼如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.讀取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分單詞FlatMapOperator wordsDS =        linesDS.flatMap((String lines, Collector collector) -> {    String[] arr = lines.split(" ");    for (String word : arr) {        collector.collect(word);    }}).returns(Types.STRING);//3.將單詞轉換成Tuple2 KV 類型MapOperator> kvWordsDS =        wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 進行分組處理得到最后結果并打印kvWordsDS.groupBy(0).sum(1).print();

Scala版本W(wǎng)ordCount

使用Flink Scala Dataset api實現(xiàn)WordCount具體代碼如下:

//1.準備環(huán)境,注意是Scala中對應的Flink環(huán)境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.導入隱式轉換,使用Scala API 時需要隱式轉換來推斷函數(shù)操作后的類型import org.apache.flink.api.scala._//3.讀取數(shù)據(jù)文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.進行 WordCount 統(tǒng)計并打印linesDS.flatMap(line => {  line.split(" ")})  .map((_, 1))  .groupBy(0)  .sum(1)  .print()

以上無論是Java api 或者是Scala api 輸出結果如下,顯示的最終結果是統(tǒng)計好的單詞個數(shù)。

(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)

2、Flink流式數(shù)據(jù)處理案例

Java版本W(wǎng)ordCount

使用Flink Java DataStream api實現(xiàn)WordCount具體代碼如下:

//1.創(chuàng)建流式處理環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.讀取文件數(shù)據(jù)DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分單詞,設置KV格式數(shù)據(jù)SingleOutputStreamOperator> kvWordsDS =        lines.flatMap((String line, Collector> collector) -> {    String[] words = line.split(" ");    for (String word : words) {        collector.collect(Tuple2.of(word, 1L));    }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分組統(tǒng)計獲取 WordCount 結果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式計算中需要最后執(zhí)行execute方法env.execute();
Scala版本W(wǎng)ordCount

使用Flink Scala DataStream api實現(xiàn)WordCount具體代碼如下:

//1.創(chuàng)建環(huán)境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.導入隱式轉換,使用Scala API 時需要隱式轉換來推斷函數(shù)操作后的類型import org.apache.flink.streaming.api.scala._//3.讀取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.進行wordCount統(tǒng)計ds.flatMap(line=>{line.split(" ")})  .map((_,1))  .keyBy(_._1)  .sum(1)  .print()//5.最后使用execute 方法觸發(fā)執(zhí)行env.execute()

以上輸出結果開頭展示的是處理當前數(shù)據(jù)的線程,一個Flink應用程序執(zhí)行時默認的線程數(shù)與當前節(jié)點cpu的總線程數(shù)有關。

3、DataStream BATCH模式

下面使用Java代碼使用DataStream API 的Batch 模式來處理批WordCount代碼,方式如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設置批運行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() {    @Override    public void flatMap(String lines, Collector> out) throws Exception {        String[] words = lines.split(" ");        for (String word : words) {            out.collect(new Tuple2<>(word, 1L));        }    }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute();

以上代碼運行完成之后結果如下,可以看到結果與批處理結果類似,只是多了對應的處理線程號。

3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)

此外,Stream API 中除了可以設置Batch批處理模式之外,還可以設置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式會根據(jù)數(shù)據(jù)是有界流/無界流自動決定采用BATCH/STREAMING模式來讀取數(shù)據(jù),設置方式如下:

//BATCH 設置批處理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 會根據(jù)有界流/無界流自動決定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 設置流處理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

除了在代碼中設置處理模式外,還可以在Flink配置文件(flink-conf.yaml)中設置execution.runtime-mode參數(shù)來指定對應的模式,也可以在集群中提交Flink任務時指定execution.runtime-mode來指定,F(xiàn)link官方建議在提交Flink任務時指定執(zhí)行模式,這樣減少了代碼配置給Flink Application提供了更大的靈活性,提交任務指定參數(shù)如下:

$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar
責任編輯:

標簽:

免責聲明

頭條新聞

推薦內(nèi)容

粉嫩白浆| 亚洲成人久久久| 中文字幕日本不卡| 平湖市| 一本无码久久免费| 日韩v欧美v中文在线| 久久变态视频| 免费在线看黄色视频的| 后入自拍| 欧美在线23P| 太久网页版| 亚洲日本女孩3区| AV无码国产在线观看岛国 | 日本息子熟妇电影| 丰满少妇被猛烈进入无码| 中文字幕在线观看首页| 免费无码无遮挡裸体视频在线观看 | 男女aj免费视频网站| 日本1区| 精品久久久了| 亚洲色精品视频| 超碰97人妻在线| 色欧美色天堂色综合| 亚洲色欲色欲欲www在线| 欧美一级乳乱中文字幕| 亚洲一区二区三区影院| 茄子视频一区二区三区| 西西444WWW无码视频软件| 亚洲中文字幕精品无码| 丰满人妻一级在| 人妻高清无码| 日韩夫妻精品| 日本不卡在线视频二区三区| 高潮毛片片7777| 欧美FREESEX黑人又粗又大| 二区国产你懂的| 国产日韩机器一区二区| 中文字幕亚洲乱| 青青草干| 综合五月丁香| 在线视频网站汚|