Flink 常用的几种模式部署

1.1 配置jdk环境

1
2
3
4
5
6
[hadoop@hadoop1 ~]$ tar xf downloads/jdk-8u301-linux-x64.tar.gz
[hadoop@hadoop1 ~]$ vim .bash_profile
export JAVA_HOME=/home/hadoop/jdk1.8.0_301
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
1
2
3
4
5
[hadoop@hadoop1 ~/downloads]$ wget https://dlcdn.apache.org/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.11.tgz
[hadoop@hadoop1 ~]$ tar xf downloads/flink-1.14.0-bin-scala_2.11.tgz

#启动flink
[hadoop@hadoop1 ~]$ ./flink-1.14.0/bin/start-cluster.sh

1.3 访问 web 页面

图片1

默认端口8081,端口冲突可以修改 conf/flink-conf.yaml 配置项 rest.port: 进行修改

1.4 提交作业(Job)

Flink 的 Releases 附带了许多的示例作业。可以任意选择一个,快速部署到已运行的集群上。

1
[hadoop@hadoop1 ~/flink-1.14.0]$ ./bin/flink run examples/streaming/WordCount.jar

flink run命令参数

-c,–class <classname>: Flink应用程序的入口
-C,–classpath <url>: 指定所有节点都可以访问到的url,可用于多个应用程序都需要的工具类加载
-d,–detached: 是否使用分离模式,就是提交任务,cli是否退出,加了-d参数,cli会退出
-n,–allowNonRestoredState: 允许跳过无法还原的savepoint。比如删除了代码中的部分operator
-p,–parallelism <parallelism>: 执行并行度
-s,–fromSavepoint: 从savepoint恢复任务
-sae,–shutdownOnAttachedExit: 以attached模式提交,客户端退出的时候关闭集群
flink yarn-cluster: 模式

yarn-cluster模式

-d,–detached: 是否使用分离模式
-m,–jobmanager <arg>: 指定提交的jobmanager
-yat,–yarnapplicationType <arg>: 设置yarn应用的类型
-yD <property=value>: 使用给定属性的值
-yd,–yarndetached: 使用yarn分离模式
-yh,–yarnhelp: yarn session的帮助
-yid,–yarnapplicationId <arg>: 挂到正在运行的yarnsession上
-yj,–yarnjar <arg>: Flink jar文件的路径
-yjm,–yarnjobManagerMemory <arg>: jobmanager的内存(单位M)
-ynl,–yarnnodeLabel <arg>: 指定 YARN 应用程序 YARN 节点标签
-ynm,–yarnname <arg>: 自定义yarn应用名称
-yq,–yarnquery: 显示yarn的可用资源
-yqu,–yarnqueue <arg>: 指定yarn队列
-ys,–yarnslots <arg>: 指定每个taskmanager的slots数
-yt,–yarnship <arg>: 在指定目录中传输文件
-ytm,–yarntaskManagerMemory <arg>: 每个taskmanager的内存
-yz,–yarnzookeeperNamespace <arg>: 用来创建ha的zk子路径的命名空间
-z,–zookeeperNamespace <arg>: 用来创建ha的zk子路径的命名空间

通用CLI模式

-D <property=value>:允许指定多个常规配置选项。有关可用选项,请访问 https://nightlies.apache.org/flink/flink-docs-stable/ops/config.html
-e,–executor <arg>:不推荐:请改用-t选项,该选项在”Application Mode”中也可用。用于执行给定作业的执行器的名称,相当于“execution.target”配置选项。当前可用的执行器有:”remote”, “local”, “kubernetes-session”, “yarn-per-job”, “yarn-session”。
-t,–target <arg>:给定应用程序的部署目标,相当于”execution.target”配置选项。对于“run”操作,当前可用的目标有:”remote”, “local”, “kubernetes-session”,”yarn-per-job”, “yarn-session”。对于”run-application”操作,当前可用的有:”kubernetes-application”,”yarn-application”。

通过web查看运行情况
图片2

查看运行日志

1
2
3
4
5
6
7
8
9
10
11
[hadoop@hadoop1 ~/flink-1.14.0]$ tail log/flink-hadoop-taskexecutor-0-hadoop1.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)

服务规划

主机名Flink角色其它依赖
hadoop1jobManager(Master)JDK1.8
hadoop2taskManager(Worker)JDK1.8
hadoop3taskManager(Worker)JDK1.8

2.1 准备工作

服务器免密登录

1
2
3
4
5
[dev@hadoop1 ~]# su - hadoop
[hadoop@hadoop1 ~]$ ssh-keygen
[hadoop@hadoop1 ~]$ ssh-copy-id hadoop1
[hadoop@hadoop1 ~]$ ssh-copy-id hadoop2
[hadoop@hadoop1 ~]$ ssh-copy-id hadoop3

配置jdk 1.8环境

1
2
3
4
5
6
7
8
[hadoop@hadoop1 ~]$ tar xf downloads/jdk-8u301-linux-x64.tar.gz
[hadoop@hadoop1 ~]$ vim .bash_profile
export JAVA_HOME=/home/hadoop/jdk1.8.0_301
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

[hadoop@hadoop1 ~]$ source .bash_profile
1
2
3
#下载flink
[hadoop@hadoop1 ~/downloads]$ wget https://dlcdn.apache.org/flink/flink-1.14.0/flink-1.14.0-bin-scala_2.11.tgz
[hadoop@hadoop1 ~]$ tar xf downloads/flink-1.14.0-bin-scala_2.11.tgz

修改 flink-conf.yaml 配置文件

1
2
3
4
5
6
7
8
9
[hadoop@hadoop1 ~/flink-1.14.0]$ grep -Ev "^$|#" conf/flink-conf.yaml
jobmanager.rpc.address: hadoop1
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 1728m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
rest.port: 8089

修改 masters 配置文件

1
2
[hadoop@hadoop1 ~/flink-1.14.0]$ vim conf/masters
hadoop1:8089

修改 workers 配置文件

1
2
3
[hadoop@hadoop1 ~/flink-1.14.0]$ vim conf/workers
hadoop2
hadoop3
1
2
[hadoop@hadoop1 ~]$ rsync -av flink-1.14.0 hadoop@hadoop2:~/
[hadoop@hadoop1 ~]$ rsync -av flink-1.14.0 hadoop@hadoop3:~/
1
[hadoop@hadoop1 ~/flink-1.14.0]$ ./bin/start-cluster.sh

默认端口8081,端口冲突可以修改 conf/flink-conf.yaml 配置项 rest.port: 进行修改

访问web页面
图片3

2.5 提交示例作业

1
[hadoop@hadoop1 ~/flink-1.14.0]$ ./bin/flink run examples/streaming/WindowJoin.jar

在web查看任务
图片4

服务规划

主机名Flink角色其它依赖
hadoop1jobManagerJDK1.8、zookeeper
hadoop2jobManager、taskManagerJDK1.8、zookeeper
hadoop3taskManagerJDK1.8、zookeeper

前置条件

Flink Standalone、zookeeper集群是可用的

修改 flink-conf.yaml 配置文件,增加以下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
[hadoop@hadoop1 ~/flink-1.14.0]$ vim conf/flink-conf.yaml
#高可用性配置
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha
high-availability.zookeeper.quorum: hadoop1:2181, hadoop2:2181, hadoop3:2181
high-availability.zookeeper.path.root: ./flink
high-availability.cluster-id: /cluster_one

#checkpoint配置
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink-checkpoints
state.savepoints.dir: hdfs:///flink-savepoints
state.checkpoints.num-retained: 20

修改 masters 配置文件

1
2
3
[hadoop@hadoop1 ~/flink-1.14.0]$ vim conf/masters
hadoop1:8089
hadoop2:8089

3.2 下载 Hadoop 附加组件

在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar,需要额外下载jar包并放在Flink的lib目录下,使Flink能够支持对Hadoop的操作

前往官网 https://flink.apache.org/downloads.html ,找到 Additional Components 选项,下载相应Hadoop版本的包

1
2
[hadoop@hadoop1 ~/flink-1.14.0]$ wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
[hadoop@hadoop1 ~/flink-1.14.0]$ mv flink-shaded-hadoop-2-uber-2.7.5-10.0.jar lib/

或者添加 HADOOP_CLASSPATH 环境变量,也是可以的

1
2
3
4
[hadoop@hadoop1 ~]$ vim .bash_profile
export HADOOP_HOME=/home/hadoop/hadoop-2.7.2
export PATH=$PATH:$HADOOP_HOME/bin
export HADOOP_CLASSPATH=`hadoop classpath`
1
2
[hadoop@hadoop1 ~]$ rsync -av flink-1.14.0 hadoop@hadoop2:~/
[hadoop@hadoop1 ~]$ rsync -av flink-1.14.0 hadoop@hadoop3:~/

除此之外,需要将master2上 flink-conf.yaml 配置中的监听地址修改为master2所在机器

1
2
[hadoop@hadoop2 ~/flink-1.14.0]$ vim conf/flink-conf.yaml
jobmanager.rpc.address: hadoop2
1
[hadoop@hadoop1 ~/flink-1.14.0]$ ./bin/start-cluster.sh

3.5 验证高可用

通过 zookeeper 查看当前集群的master是那台节点

1
2
3
[hadoop@hadoop1 ~/zookeeper-3.4.8]$ ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 2] get /flink/cluster_one/leader/rest_server/connection_info
��whttp://hadoop1:8089srjava.util.UUID����m�/J

关闭当前的master进程

1
2
3
[hadoop@hadoop1 ~/flink-1.14.0]$ jps |grep StandaloneSessionClusterEntrypoint
10582 StandaloneSessionClusterEntrypoint
[hadoop@hadoop1 ~/flink-1.14.0]$ kill 10582

提交示例作业

1
[hadoop@hadoop2 ~/flink-1.14.0]$ ./bin/flink run examples/streaming/WordCount.jar

在另一台master web查看作业
图片5

在YARN上使用Flink有3种模式:Per-Job模式、Session模式和Application模式。

前置条件

Hadoop,Yarn集群是可用的
配置好 HADOOP_HOME 环境变量

4.1 Session 模式

启动flink集群

1
[hadoop@hadoop1 ~/flink-1.14.0]$ ./bin/yarn-session.sh -n 3 -s 3 -nm yarn-session1

yarn-session.sh 参数说明

-n,–container <arg>: 表示分配容器的数量(也就是 TaskManager 的数量)。
-D <arg>: 动态属性。
-d,–detached: 在后台独立运行。
-jm,–jobManagerMemory <arg>:设置 JobManager 的内存,单位是 MB。
-nm,–name:在 YARN 上为一个自定义的应用设置一个名字。
-q,–query:显示 YARN 中可用的资源(内存、cpu 核数)。
-qu,–queue <arg>:指定 YARN 队列。
-s,–slots <arg>:每个 TaskManager 使用的 Slot 数量。
-tm,–taskManagerMemory <arg>:每个 TaskManager 的内存,单位是 MB。
-z,–zookeeperNamespace <arg>:针对 HA 模式在 ZooKeeper 上创建 NameSpace。
-id,–applicationId <yarnAppId>:指定 YARN 集群上的任务 ID,附着到一个后台独立运行的 yarn session 中

查看yarn UI界面,可以看到提交的application
图片6

使用flink run提交示例作业

1
2
3
[hadoop@hadoop1 ~/flink-1.14.0]$ ./bin/flink run -t yarn-session \
-Dyarn.application.id=application_1635158058849_0059 \
examples/batch/WordCount.jar

之后可以通过yarn找到相应的 application 中的 ApplicationMaster 进入flink管理页面查看任务运行情况
图片7

图片8

图片9

关闭yarn-session

1
[hadoop@hadoop1 ~/flink-1.14.0]$ yarn application -kill application_1635158058849_0059

查看yarn UI中的application状态
图片10

session模式将在 /tmp/.yarn-properties-<username> 中创建一个隐藏文件,在提交作业时,命令行界面将拾取该文件进行集群发现。

4.2 Per-Job模式

直接提交作业

1
2
3
4
5
[hadoop@hadoop1 ~/flink-1.14.0]$ ./bin/flink run -t yarn-per-job \
-yjm 4096 \
-ytm 16384 \
-ys 4 \
examples/batch/WordCount.jar

在yarn UI中查看application执行状态
图片11

图片12

4.3 Application 模式

直接提交作业

1
2
3
4
5
[hadoop@hadoop2 ~/flink-1.14.0]$ ./bin/flink run-application -t yarn-application \
-yjm 4096 \
-ytm 16384 \
-ys 4 \
examples/batch/WordCount.jar

在yarn UI中查看application执行状态
图片13

-------------本文结束感谢您的阅读-------------