2023-03-22 05:16:24來源:騰訊云
需求:讀取本地數(shù)據(jù)文件,統(tǒng)計文件中每個單詞出現(xiàn)的次數(shù)。
本案例編寫Flink代碼選擇語言為Java和Scala,所以這里我們通過IntelliJ IDEA創(chuàng)建一個目錄,其中包括Java項目模塊和Scala項目模塊,將Flink Java api和Flink Scala api分別在不同項目模塊中實現(xiàn)。步驟如下:
【資料圖】
使用IntelliJ IDEA開發(fā)Flink,如果使用Scala api 那么還需在IntelliJ IDEA中安裝Scala的插件,如果已經(jīng)安裝可以忽略此步驟,下圖為以安裝Scala插件。
創(chuàng)建Java模塊:
繼續(xù)點擊"+",創(chuàng)建Scala模塊:
創(chuàng)建好"FlinkScalaCode"模塊后,右鍵該模塊添加Scala框架支持,并修改該模塊中的"java"src源為"scala":
在"FlinkScalaCode"模塊Maven pom.xml中引入Scala依賴包,這里使用的Scala版本為2.12.10。
org.scala-lang scala-library 2.12.10 org.scala-lang scala-compiler 2.12.10 org.scala-lang scala-reflect 2.12.10
為了方便查看項目運行過程中的日志,需要在兩個項目模塊中配置log4j.properties配置文件,并放在各自項目src/main/resources資源目錄下,沒有resources資源目錄需要手動創(chuàng)建并設置成資源目錄。log4j.properties配置文件內(nèi)容如下:
log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n
復制
并在兩個項目中的Maven pom.xml中添加對應的log4j需要的依賴包,使代碼運行時能正常打印結果:
org.slf4j slf4j-log4j12 1.7.36 org.apache.logging.log4j log4j-to-slf4j 2.17.2
"FlinkJavaCode"模塊導入Flink Maven依賴如下:
UTF-8 1.8 1.8 1.16.0 1.7.36 2.17.2 org.apache.flink flink-clients ${flink.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j ${log4j.version}
"FlinkScalaCode"模塊導入Flink Maven依賴如下:
UTF-8 1.8 1.8 1.16.0 1.7.31 2.17.1 2.12.10 2.12 org.apache.flink flink-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-clients ${flink.version} org.scala-lang scala-library ${scala.version} org.scala-lang scala-compiler ${scala.version} org.scala-lang scala-reflect ${scala.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j ${log4j.version}
注意:在后續(xù)實現(xiàn)WordCount需求時,F(xiàn)link Java Api只需要在Maven中導入"flink-clients"依賴包即可,而Flink Scala Api 需要導入以下三個依賴包:
flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients
主要是因為在Flink1.15版本后,F(xiàn)link添加對opting-out(排除)Scala的支持,如果你只使用Flink的Java api,導入包不必包含scala后綴,如果使用Flink的Scala api,需要選擇匹配的Scala版本。
在項目"MyFlinkCode"中創(chuàng)建"data"目錄,在目錄中創(chuàng)建"words.txt"文件,向文件中寫入以下內(nèi)容,方便后續(xù)使用Flink編寫WordCount實現(xiàn)代碼。
hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink
數(shù)據(jù)源分為有界和無界之分,有界數(shù)據(jù)源可以編寫批處理程序,無界數(shù)據(jù)源可以編寫流式程序。DataSet API用于批處理,DataStream API用于流式處理。
批處理使用ExecutionEnvironment和DataSet,流式處理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示數(shù)據(jù)的特殊類,DataSet處理的數(shù)據(jù)是有界的,DataStream處理的數(shù)據(jù)是無界的,這兩個類都是不可變的,一旦創(chuàng)建出來就無法添加或者刪除數(shù)據(jù)元。
使用Flink Java Dataset api實現(xiàn)WordCount具體代碼如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.讀取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分單詞FlatMapOperator wordsDS = linesDS.flatMap((String lines, Collector collector) -> { String[] arr = lines.split(" "); for (String word : arr) { collector.collect(word); }}).returns(Types.STRING);//3.將單詞轉換成Tuple2 KV 類型MapOperator> kvWordsDS = wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 進行分組處理得到最后結果并打印kvWordsDS.groupBy(0).sum(1).print();
Scala版本W(wǎng)ordCount
使用Flink Scala Dataset api實現(xiàn)WordCount具體代碼如下:
//1.準備環(huán)境,注意是Scala中對應的Flink環(huán)境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.導入隱式轉換,使用Scala API 時需要隱式轉換來推斷函數(shù)操作后的類型import org.apache.flink.api.scala._//3.讀取數(shù)據(jù)文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.進行 WordCount 統(tǒng)計并打印linesDS.flatMap(line => { line.split(" ")}) .map((_, 1)) .groupBy(0) .sum(1) .print()
以上無論是Java api 或者是Scala api 輸出結果如下,顯示的最終結果是統(tǒng)計好的單詞個數(shù)。
(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)
使用Flink Java DataStream api實現(xiàn)WordCount具體代碼如下:
//1.創(chuàng)建流式處理環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.讀取文件數(shù)據(jù)DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分單詞,設置KV格式數(shù)據(jù)SingleOutputStreamOperator> kvWordsDS = lines.flatMap((String line, Collector> collector) -> { String[] words = line.split(" "); for (String word : words) { collector.collect(Tuple2.of(word, 1L)); }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分組統(tǒng)計獲取 WordCount 結果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式計算中需要最后執(zhí)行execute方法env.execute();
Scala版本W(wǎng)ordCount使用Flink Scala DataStream api實現(xiàn)WordCount具體代碼如下:
//1.創(chuàng)建環(huán)境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.導入隱式轉換,使用Scala API 時需要隱式轉換來推斷函數(shù)操作后的類型import org.apache.flink.streaming.api.scala._//3.讀取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.進行wordCount統(tǒng)計ds.flatMap(line=>{line.split(" ")}) .map((_,1)) .keyBy(_._1) .sum(1) .print()//5.最后使用execute 方法觸發(fā)執(zhí)行env.execute()
以上輸出結果開頭展示的是處理當前數(shù)據(jù)的線程,一個Flink應用程序執(zhí)行時默認的線程數(shù)與當前節(jié)點cpu的總線程數(shù)有關。
下面使用Java代碼使用DataStream API 的Batch 模式來處理批WordCount代碼,方式如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設置批運行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() { @Override public void flatMap(String lines, Collector> out) throws Exception { String[] words = lines.split(" "); for (String word : words) { out.collect(new Tuple2<>(word, 1L)); } }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute();
以上代碼運行完成之后結果如下,可以看到結果與批處理結果類似,只是多了對應的處理線程號。
3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)
此外,Stream API 中除了可以設置Batch批處理模式之外,還可以設置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式會根據(jù)數(shù)據(jù)是有界流/無界流自動決定采用BATCH/STREAMING模式來讀取數(shù)據(jù),設置方式如下:
//BATCH 設置批處理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 會根據(jù)有界流/無界流自動決定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 設置流處理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
除了在代碼中設置處理模式外,還可以在Flink配置文件(flink-conf.yaml)中設置execution.runtime-mode參數(shù)來指定對應的模式,也可以在集群中提交Flink任務時指定execution.runtime-mode來指定,F(xiàn)link官方建議在提交Flink任務時指定執(zhí)行模式,這樣減少了代碼配置給Flink Application提供了更大的靈活性,提交任務指定參數(shù)如下:
$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar
責任編輯:
標簽:
本案例編寫Flink代碼選擇語言為Java和Scala,所以這里我們通過IntelliJIDEA創(chuàng)建一個目錄,其中包括Java...
4個小時,不可以連續(xù)使用!建議大家2-4小時更換一次。我們使用口罩的時候一定要注意以下幾點:1 不要觸...
烏拉特后旗氣象臺解除沙塵暴黃色預警【III級 較重】
春暖花開賞花踏青小心被“蟄”近日,市民張女士帶著兒子在共青森林公園游玩沒想到兒子的一根手指被蜜蜂...
【 學生因點外賣遭校務人員按倒訓斥 校方:已對該校務人員批評教育】 學?;貞獙W生因點外賣遭按倒訓...