亚洲国产精品嫩草影院2_美丽人妻沦为公厕菜市场_99久久婷婷国产综合精品_国内揄拍自拍视频在线直播平台_免费观看在线视频一区

天天新資訊:大數(shù)據(jù)Flink進(jìn)階(十):Flink集群部署

2023-04-10 03:21:33     來源 : 騰訊云

?Flink的安裝和部署主要分為本地(單機(jī))模式和集群模式,其中本地模式只需直接解壓就可以使用,不用修改任何參數(shù),一般在做一些簡單測試的時候使用。本地模式在這里不再贅述。集群部署模式主要包含Standalone、Hadoop Yarn 、Kubernetes等,F(xiàn)link可以借助以上資源管理器來實(shí)現(xiàn)分布式計(jì)算,目前企業(yè)使用最多的是Flink 基于Hadoop Yarn資源管理器模式,下面我們重點(diǎn)講解Flink 基于Standalone集群、Yarn資源管理器以及Kubernetes集群部署方式。

一、Standalone集群部署

1、節(jié)點(diǎn)劃分

通過Flink運(yùn)行時架構(gòu)小結(jié),我們知道Flink集群是由一個JobManager(Master)節(jié)點(diǎn)和多個TaskManager(Worker)節(jié)點(diǎn)構(gòu)成,并且有對應(yīng)提交任務(wù)的客戶端。這里部署Standalone集群基于Linux Centos7.6版本,選擇4臺節(jié)點(diǎn)進(jìn)行部署Flink,其中3臺節(jié)點(diǎn)Standalone集群節(jié)點(diǎn)、一臺節(jié)點(diǎn)是提交Flink任務(wù)的客戶端,各個節(jié)點(diǎn)需要滿足以下特點(diǎn):


(資料圖片)

各節(jié)點(diǎn)安裝java8版本及以上jdk(這里選擇jdk8)。各個節(jié)點(diǎn)之間需要兩兩免密。

4臺節(jié)點(diǎn)角色劃分如下:

節(jié)點(diǎn)IP

節(jié)點(diǎn)名稱

Flink服務(wù)

192.168.179.4

node1

JobManager,TaskManager

192.168.179.5

node2

TaskManager

192.168.179.6

node3

TaskManager

192.168.179.7

node4

client

2、standalone集群部署

我們可以從Flink的官網(wǎng)下載Flink最新的安裝包,這里選擇Flink1.16.0版本,F(xiàn)link安裝包下載地址:https://flink.apache.org/downloads.html#apache-flink-1160。Standalone集群部署步驟如下:

上傳壓縮包解壓

將Flink的安裝包上傳到node1節(jié)點(diǎn)/software下并解壓:

[root@node1 software]# tar -zxvf ./flink-1.16.0-bin-scala_2.12.tgz

配置Master節(jié)點(diǎn)

配置Master節(jié)點(diǎn)就是配置JobManager節(jié)點(diǎn),在$FLINK_HOME/conf/masters文件中配置jobManager節(jié)點(diǎn)如下:

#vim $FLINK_HOME/conf/mastersnode1:8081

配置Worker節(jié)點(diǎn)

配置Worker節(jié)點(diǎn)就是配置TaskManager節(jié)點(diǎn),在$FLINK_HOME/conf/workers文件中配置taskManager節(jié)點(diǎn)如下:

#vim $FLINK_HOME/conf/workersnode1node2node3

配置flink-conf.yaml 文件

在node1節(jié)點(diǎn)上進(jìn)入到FLINK_HOME/conf目錄下,配置flink?conf.yaml文件(vimFLINK_HOME/conf/flink-conf.yaml配置如下內(nèi)容),內(nèi)容如下:

# JobManager地址jobmanager.rpc.address: node1# JobManager地址綁定設(shè)置jobmanager.bind-host: 0.0.0.0# TaskManager地址綁定設(shè)置taskmanager.bind-host: 0.0.0.0# TaskManager地址(不同TaskManager節(jié)點(diǎn)host配置對應(yīng)的host)taskmanager.host: node1# 設(shè)置每個TaskManager 的slot個數(shù)taskmanager.numberOfTaskSlots: 3# WEB UI 節(jié)點(diǎn)(只需JobManager節(jié)點(diǎn)設(shè)置,TaskManager節(jié)點(diǎn)設(shè)置了也無所謂)rest.address: node1# WEB UI節(jié)點(diǎn)綁定設(shè)置(只需JobManster節(jié)點(diǎn)設(shè)置)rest.bind-address: 0.0.0.0

注意:以上設(shè)置的0.0.0.0代表監(jiān)聽當(dāng)前節(jié)點(diǎn)每一個可用的網(wǎng)絡(luò)接口,0.0.0.0不再是一個真正意義上的ip地址,而表示一個集合,監(jiān)聽0.0.0.0的端口相當(dāng)于是可以監(jiān)聽本機(jī)中的所有ip端口。以上配置的0.0.0.0 表示想要讓外部訪問需要設(shè)置具體ip,或者直接設(shè)置為"0.0.0.0"。

分發(fā)安裝包并配置node2 、node3 節(jié)點(diǎn)flink-conf.yaml 文件

#分發(fā)到node2、node3節(jié)點(diǎn)上[root@node1 ~]# scp -r /software/flink-1.16.0 node2:/software/[root@node1 ~]# scp -r /software/flink-1.16.0 node3:/software/#修改node2、node3 節(jié)點(diǎn)flink-conf.yaml文件中的TaskManager【node2節(jié)點(diǎn)】 taskmanager.host: node2【node3節(jié)點(diǎn)】 taskmanager.host: node3#注意,這里發(fā)送到node4,node4只是客戶端[root@node1 ~]# scp -r /software/flink-1.16.0 node4:/software/

啟動Flink 集群

#在node1節(jié)點(diǎn)中,啟動Flink集群[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./start-cluster.sh

訪問Flink WebUI

https://node1:8081,進(jìn)入頁面如下:

3、任務(wù)提交測試

Standalone集群搭建完成后,可以將Flink任務(wù)提交到Flink Standalone集群中運(yùn)行。有兩種方式提交Flink任務(wù),一種是在WebUI界面上提交Flink任務(wù),一種方式是通過命令行方式。

這里編寫讀取Socket數(shù)據(jù)進(jìn)行實(shí)時WordCount統(tǒng)計(jì)Flink任務(wù)提交到Flink集群中運(yùn)行,這里以Flink Java代碼為例來實(shí)現(xiàn),代碼如下:

/** * 讀取Socket數(shù)據(jù)進(jìn)行實(shí)時WordCount統(tǒng)計(jì) */public class SocketWordCount {    public static void main(String[] args) throws Exception {        //1.準(zhǔn)備環(huán)境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2.讀取Socket數(shù)據(jù)        DataStreamSource ds = env.socketTextStream("node5", 9999);        //3.準(zhǔn)備K,V格式數(shù)據(jù)        SingleOutputStreamOperator> tupleDS = ds.flatMap((String line, Collector> out) -> {            String[] words = line.split(",");            for (String word : words) {                out.collect(Tuple2.of(word, 1));            }        }).returns(Types.TUPLE(Types.STRING, Types.INT));        //4.聚合打印結(jié)果        tupleDS.keyBy(tp -> tp.f0).sum(1).print();        //5.execute觸發(fā)執(zhí)行        env.execute();    }}

以上代碼編寫完成后,在對應(yīng)的項(xiàng)目Maven pom 文件中加入以下plugin:

            maven-assembly-plugin      2.6                                        jar-with-dependencies                                      xx.xx.xx                                                make-assembly          package                      assembly                              

然后使用Maven assembly 插件對項(xiàng)目進(jìn)行打包,得到"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"完整jar包。

此外,代碼中讀取的是node5節(jié)點(diǎn)scoket 9999端口數(shù)據(jù),需要在node5節(jié)點(diǎn)上安裝nc組件:

[root@node5 ~]# yum -y install nc
命令行提交Flink任務(wù)

node1 上啟動Flink Standalone 集群

[root@node1 bin]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./start-cluster.sh

node5 節(jié)點(diǎn)上啟動nc socket服務(wù)

[root@node5 ~]# nc -lk 9999

將打好的包提交到Flink 客戶端node4 節(jié)點(diǎn)/root 目錄下并提交任務(wù)

[root@node4 ~]# cd /software/flink-1.16.0/bin/#向Flink集群中提交任務(wù)[root@node4 bin]# ./flink run -m node1:8081 -c com.mashibing.flinkjava.code.lesson03.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

進(jìn)入Flink WebUI 界面查看任務(wù)和結(jié)果

#向node5 socket 9999 端口寫入以下數(shù)據(jù)hello,a hello,bhello,chello,a

WebUI查看對應(yīng)任務(wù)和結(jié)果

登錄Flink WebUI http://node1:8081查看對應(yīng)任務(wù)執(zhí)行情況。

WebUI查看執(zhí)行結(jié)果:

在WebUI中點(diǎn)擊對應(yīng)的任務(wù)Job,進(jìn)入如下頁面點(diǎn)擊"Cancel Job"取消任務(wù)執(zhí)行:

Web界面提交 Flink任務(wù)

向Flink集群提交任務(wù)還可以通過WebUI方式提交。點(diǎn)擊上傳jar包,進(jìn)行參數(shù)配置,并提交任務(wù)。

提交任務(wù)之后,可以通過WebUI頁面查看提交任務(wù),輸入數(shù)據(jù)之后可以在對應(yīng)的TaskManager節(jié)點(diǎn)上看到相應(yīng)結(jié)果。

二、Flink On Yarn

Flink可以基于Yarn來運(yùn)行任務(wù),Yarn作為資源提供方,可以根據(jù)Flink任務(wù)資源需求動態(tài)的啟動TaskManager來提供資源。Flink基于Yarn提交任務(wù)通常叫做Flink On Yarn,Yarn資源調(diào)度框架運(yùn)行需要有Hadoop集群,Hadoop版本最低是2.8.5。

1、Flink不同版本與Hadoop整合

Flink基于Yarn提交任務(wù)時,需要Flink與Hadoop進(jìn)行整合。Flink1.8版本之前,F(xiàn)link與Hadoop整合是通過Flink官方提供的基于對應(yīng)hadoop版本編譯的安裝包來實(shí)現(xiàn),例如:flink-1.7.2-bin-hadoop24-scala_2.11.tgz,在Flink1.8版本后不再支持基于不同Hadoop版本的編譯安裝包,F(xiàn)link與Hadoop進(jìn)行整合時,需要在官網(wǎng)中下載對應(yīng)的Hadoop版本的"flink-shaded-hadoop-2-uber-x.x.x-x.x.jar"jar包,然后后上傳到提交Flink任務(wù)的客戶端對應(yīng)的$FLINK_HOME/lib中完成Flink與Hadoop的整合。

在Flink1.11版本之后不再提供任何更新的flink-shaded-hadoop-x jars,F(xiàn)link與Hadoop整合統(tǒng)一使用基于Hadoop2.8.5編譯的Flink安裝包,支持與Hadoop2.8.5及以上Hadoop版本(包括Hadoop3.x)整合。在Flink1.11版本后與Hadoop整合時還需要配置HADOOP_CLASSPATH環(huán)境變量來完成對Hadoop的支持。

2、Flink on Yarn 配置及環(huán)境準(zhǔn)備

Flink 基于Yarn提交任務(wù),向Yarn集群中提交Flink任務(wù)的客戶端需要滿足以下兩點(diǎn)

客戶端安裝了Hadoop2.8.5+版本的hadoop。客戶端配置了HADOOP_CLASSPATH環(huán)境變量。

這里選擇node5節(jié)點(diǎn)作為提交Flink的客戶端,該節(jié)點(diǎn)已經(jīng)安裝了Hadoop3.3.4版本,然后在該節(jié)點(diǎn)中配置profile文件,加入以下環(huán)境變量:

# vim /etc/profile,加入以下配置export HADOOP_CLASSPATH=`hadoop classpath`#source /etc/profile 使環(huán)境變量生效[root@node5 ~]# source /etc/profile

然后將Flink的安裝包上傳到node5節(jié)點(diǎn)/software下并解壓:

[root@node5 software]# tar -zxvf ./flink-1.16.0-bin-scala_2.12.tgz

3、任務(wù)提交測試

基于Yarn運(yùn)行Flink任務(wù)只能通過命令行方式進(jìn)行任務(wù)提交,F(xiàn)link任務(wù)基于Yarn運(yùn)行時有幾種任務(wù)提交部署模式(后續(xù)章節(jié)會進(jìn)行介紹),下面以Application模式來提交任務(wù)。步驟如下:

啟動HDFS集群
#在 node3、node4、node5節(jié)點(diǎn)啟動zookeeper[root@node3 ~]#  zkServer.sh start[root@node4 ~]#  zkServer.sh start[root@node5 ~]#  zkServer.sh start#在node1啟動HDFS集群[root@node1 ~]# start-all.sh
將 Flink任務(wù)對應(yīng)的jar 包上傳到node5 節(jié)點(diǎn)

這里的Flink任務(wù)還是以讀取Socket數(shù)據(jù)做實(shí)時WordCount任務(wù)為例,將打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包上傳到node5節(jié)點(diǎn)的/root/目錄下。

node5 節(jié)點(diǎn)執(zhí)行如下命令運(yùn)行Flink 作業(yè)
[root@node5 ~]# cd /software/flink-1.16.0/bin/# 提交Flink任務(wù)[root@node5 bin]#./flink run-application -t yarn-application -c com.mashibing.flinkjava.code.chapter3.SocketWordCount /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
查看WebUI及運(yùn)行結(jié)果

Flink任務(wù)Application模式提交后,瀏覽器輸入https://node1:8088登錄Yarn WebUI,找到提交的任務(wù),點(diǎn)擊對應(yīng)的Tracking UI"ApplicationMaster"進(jìn)入到Flink WEBUI任務(wù)頁面。

?

向node5 scoket 9999端口輸入以下數(shù)據(jù)并在對應(yīng)的WebUI中查看結(jié)果:

#向node5 socket 9999 端口寫入以下數(shù)據(jù)hello,a hello,bhello,chello,a

在WebUI中找到對應(yīng)的Flink TaskManager節(jié)點(diǎn) Stdout輸出,結(jié)果如下:

?

?

標(biāo)簽:

推薦文章

X 關(guān)閉

最新資訊

X 關(guān)閉