Hadoop 计算引擎Flink

Apache Flink 是一个框架和分布式处理引擎,用于在 无边界有边界 数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

1.1 无界和有界数据

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

数据可以被作为 无界 或者 有界 流来处理。

  1. 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  2. 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理。

img

Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

1.2 可靠的容错能力

在分布式系统中,硬件故障、进程异常、应用异常、网络故障等多种多样的异常无处不在。像Flink这样的分布式计算引擎必须能够从故障中恢复到正常状态,以便实现全天候运行。这就要求引擎在故障发生后不仅可以重新启动应用程序,还要确保其内部状态保持一致,从最后一次正确的点重新执行,从用户的角度来说,最终的计算结果与未发生故障是一样的。

1.3 集群级容错

(1)与集群管理器集成
Flink与集群管理器紧密集成,例如Hadoop YARN、Mesos或Kubernetes。当进程挂掉时,将自动启动一个新进程来接管它的工作。
(2)高可用性设置
Flink具有高可用性模式特性,可消除所有单点故障。HA模式基于Apache ZooKeeper,Zookeeper是一种经过验证的可靠的分布式协调服务。

1.4 应用级容错

Flink使用轻量级分布式快照机制,设计了检查点(Checkpoint)来实现可靠的容错。其特性如下。
(1)一致性
Flink的恢复机制基于应用程序状态的一致性检查点。如果发生故障,将重新启动应用程序并从最新检查点加载其状态。结合可重放的流数据源,此特性可以保证精确、一次的状态一致性。
Flink、Spark、Storm等都支持引擎内的Exactly-Once语义,即确保数据仅处理一次,不会重复也不会丢失。但是在把结果写入外部存储的时候,可能会发生存储故障、网络中断、Flink应用异常恢复等多种情况,在这些情况下,部分数据可能已经写入外部存储,重复执行可能导致数据的重复写出,此时需要开发者为写出到外部存储的行为保证幂等性。
在Spark、Storm中需要开发者自行实现Sink,实现端到端的Exactly-Once行为。而Flink利用检查点特性,在框架层面提供了Exactly-Once的支持,内置了支持Exactly-Once语义的Sink,即使出现故障,也能保证数据只写出一次。
(2)轻量级
对于长期运行的Flink应用程序,其检查点的状态可能高达TB级,生成和保存检查应用程序的检查点成本非常高。所以Flink提供了检查点的执行异步和增量检查点,以便尽量降低生成和保存检查点带来的计算负荷,避免数据处理的延迟异常变大和吞吐量的短暂剧降。

1.5 高吞吐、低延迟

从Storm流计算引擎开始,大家似乎留下了这样一个印象,要实现低延迟,就要牺牲吞吐量,高吞吐、低延迟是流处理引擎的核心矛盾。以 Storm 为代表的第一代流计算引擎可以做到几十毫秒的处理延迟,但是吞吐量确实不高。后来的 Spark Streaming 基于mini-batch的流计算框架能够实现较高的吞吐量,但是数据处理的延迟不甚理想,一般可达到秒级。
Flink 借助轻量级分布式快照机制,能够定时生成分布式快照,并将快照保存到外部存储中。检查点之间的数据处理被当作是原子的,如果失败,直接回到上一个检查点重新执行即可。在整个数据处理过程中不会产生阻塞,不必像mini-batch机制一样需要等待调度,可以持续处理数据,容错开销非常低。Flink在数据的计算、传输、序列化等方面也做了大量的优化,既能保持数据处理的低延迟,也能尽可能地提高吞吐量。

1.6 大规模复杂计算

Flink在设计之初就非常在意性能相关的任务状态和流控等关键技术的设计,这些都使得用Flink执行复杂的大规模任务时性能更胜一筹。
对于大规模复杂计算,尤其是长期运行的流计算应用而言,有状态计算是大数据计算引擎中一个比较大的需求点。所谓的有状态计算就是要结合历史信息进行的计算,例如对于反欺诈行为的识别,要根据用户在近几分钟之内的行为做出判断。一旦出现异常,就需要重新执行流计算任务,但重新处理所有的原始数据是不现实的,而Flink的容错机制State能够使Flink的流计算作业恢复到近期的一个时间点,从这个时间点开始执行流计算任务,这无疑能够大大降低大规模任务失败恢复的成本。
Flink为了提供有状态计算的性能,针对本地状态访问进行了优化,任务状态始终驻留在内存中,如果状态大小超过可用内存,则保存在高效磁盘上的数据结构中。因此,任务通过访问本地(通常是内存中)状态来执行所有计算,从而达到特别低的处理延迟。Flink通过定期和异步检查点将本地状态进行持久存储来保证在出现故障时实现精确、一次的状态一致性。
Flink的轻量级容错机制也能够尽量降低大规模数据处理时的调度、管理成本,计算规模的增大不会显著增加容错,数据吞吐不会剧烈下降,数据延迟不会急剧增大。

1.7 多平台部署

Flink是一个分布式计算系统,需要计算资源才能执行应用程序。Flink可以与所有常见的集群资源管理器(如Hadoop YARN、Apache Mesos和Kubernetes)集成,也可以在物理服务器上作为独立集群运行。
为了实现不同的部署模式,Flink设计了一套资源管理框架,针对上面提到的资源管理平台实现了对应的资源管理器(ResourceManager),能够与上面提到的资源管理平台无缝对接。
部署Flink应用程序时,Flink会根据应用程序配置的并行度自动识别所需资源,并向资源管理器申请资源。如果发生故障,Flink会通过请求新的资源来替换发生故障的资源。Flink提供了提交或控制应用程序的REST接口,方便与外部应用进行集成,管理Flink作业。

图片1

从上到下依次是

2.1.1 Libraries层

该层也可以称为Flink应用框架层,是指根据API层的划分,在API层之上构建的满足特定应用场景的计算框架,总体上分为流计算和批处理两类应用框架。面向流计算的应用框架有流上SQL(Flink Table&SQL)、CEP(复杂事件处理),面向批处理的应用框架有批上SQL(Flink Table&SQL)、Flink ML(机器学习)、Gelly(图处理)。
(1)Table&SQL
Table&SQL是Flink中提供SQL语义支持的内置应用框架,其中Table API提供Scala和Java语言的SQL语义支持,允许开发者使用编码的方式实现SQL语义。SQL基于Apache Calcite,支持标准SQL,使用者可以在应用中直接使用SQL语句,同时也支持Table API和SQL的混合编码。

(2)CEP
CEP本质上是一种实时事件流上的模式匹配技术,是实时事件流上常见的用例。CEP通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定匹配规则,持续地从事件流中匹配出符合要求的事件序列,通过模式组合能够识别更加复杂的事件序列,主要用于反欺诈、风控、营销决策、网络安全分析等场景。
(3)Gelly
Gelly是一个可扩展的图形处理和分析库。Gelly是在DataSet API之上实现的,并与DataSet API集成在一起。因此,它受益于其可扩展且强大的操作符。Gelly具有内置算法,如label propagation(标签传播)、triangle (4)(4)ML
Flink ML是Flink的机器学习框架,定位类似于Spark MLLib,但是在目前阶段其实现的算法和成熟度距离Spark MLLib有较大差距,不具备生产环境的可用性,在Flink1.9之后的版本中会对其进行重构。

2.1.2 API层

API层是Flink对外提供能力的接口,实现了面向流计算的DataStream API和面向批处理的DataSet API。理论上来说,Flink的API应该像Apache Beam、Spark那样实现API层流批统一,但是目前却依然是两套系统,使用起来并不方便,所以社区也在以DataStream API为核心,推进批流API的统一。

2.1.3 运行时层

运行时层提供了支持Flink集群计算的核心,将开发的Flink应用分布式执行起来,包含如下内容。
1)DAG抽象:将分布式计算作业拆分成并行子任务,每个子任务表示数据处理的一个步骤,并且在上下游之间建立数据流的流通关系。
2)数据处理:包含了开发层面、运行层面的数据处理抽象,例如包含数据处理行为的封装、通用数据运算的实现(如Join、Filter、Map等)。
3)作业调度:调度批流作业的执行。
4)容错:提供了集群级、应用级容错处理机制,保障集群、作业的可靠运行。
5)内存管理、数据序列化:通过序列化,使用二进制方式在内存中存储数据,避免JVM的垃圾回收带来的停顿问题。
6)数据交换:数据在计算任务之间的本地、跨网络传递。
Flink运行时层并不是给一般的Flink应用开发者使用的。

2.1.4 部署层

该层是Flink集群部署抽象层,Flink提供了灵活的部署模式,可以本地运行、与常见的资源管理集群集成,也支持云上的部署。
Flink支持多种部署模式:
1)Standalone模式:Flink安装在普通的Linux机器上,或者安装在K8s中,集群的资源由Flink自行管理。
2)Yarn、Mesos、K8s等资源管理集群模式:Flink向资源集群申请资源,创建Flink集群。
3)云上模式:Flink可以在Google、亚马逊云计算平台上轻松部署。

Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager

图片2

2.2.1 组件介绍

Flink客户端
Flink客户端是Flink提供的CLI命令行工具,用来提交Flink作业到Flink集群,在客户端中负责Stream Graph(流图)和Job Graph(作业图)的构建。使用Table API和SQL编写的Flink应用,还会在客户端中负责SQL解析和优化。
Flink的Flip改进建议中提出了新的模式,SQL解析、优化,StreamGraph、JobGraph、ExecutionGraph构建转换等全部都会在JobManager中完成,这在Flink1.10后续版本中实现。

JobManager
JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:

Dispatcher:提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息;
ResourceManager:负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

JobManager:负责管理作业的执行,在一个 Flink 集群中可能有多个作业同时执行,每个作业都有自己的 JobManager 组件。

TaskManager
TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。
必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子(请参考Tasks 和算子链)。

TaskManager接收JobManager分发的子任务,根据自身的资源情况,管理子任务的启动、停止、销毁、异常恢复等生命周期阶段。作业启动后开始从数据源消费数据、处理数据,并写入外部存储中。

2.2.2 流程分析

1)当用户提交作业的时候,提交脚本会首先启动一个 Client进程负责作业的编译与提交。它首先将用户编写的代码编译为一个 JobGraph,在这个过程,它还会进行一些检查或优化等工作,例如判断哪些 Operator 可以 Chain 到同一个 Task 中。然后,Client 将产生的 JobGraph 提交到集群中执行。此时有两种情况,一种是类似于 Standalone 这种 Session 模式,AM 会预先启动,此时 Client 直接与 Dispatcher 建立连接并提交作业即可。另一种是 Per-Job 模式,AM 不会预先启动,此时 Client 将首先向资源管理系统 (如Yarn、K8S)申请资源来启动 AM,然后再向 AM 中的 Dispatcher 提交作业。

2)当作业到 Dispatcher 后,Dispatcher 会首先启动一个 JobManager 组件,然后 JobManager 会向 ResourceManager 申请资源来启动作业中具体的任务。如果是Session模式,则TaskManager已经启动了,就可以直接分配资源。如果是per-Job模式,ResourceManager 也需要首先向外部资源管理系统申请资源来启动 TaskExecutor,然后等待 TaskExecutor 注册相应资源后再继续选择空闲资源进程分配,JobManager 收到 TaskExecutor 注册上来的 Slot 后,就可以实际提交 Task 了。

3)TaskExecutor 收到 JobManager 提交的 Task 之后,会启动一个新的线程来执行该 Task。Task 启动后就会开始进行预先指定的计算,并通过数据 Shuffle 模块互相交换数据。

  • source:数据源
  • transformations:基于数据流的一组operate操作
  • sink:数据处理结果的目的地

图片7

2.3.2 并行数据流

图片8

  • 在flink中,transformation是由一组operator组成,每一个operator被分割成operator subtask,同一个operator的多个 subtasks在不同的线程、不同的物理机或不同的容器中彼此互不依赖得并行执行。
  • Stream在operator有两种形式(One-to-one:类似于spark中的窄依赖,Redistributing类似于spark中的宽依赖)

2.3.3 operator chains

图片9

出于分布式程序效率考虑,Flink将前后有依赖关系的一组operator的subtask链接在一起形成operator chains。operator chain在一个线程中执行,它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。链接的行为可以在编程API中进行指定。

2.3.4 窗口

flink可以基于窗口对在流上对数据进行聚合操作。flink支持的窗口有:

图片10

  • 时间窗口(tumbing windows(不重叠),sliding windows(有重叠,session windows(有空隙的活动))
  • 数据窗口(tumbing windows(不重叠),sliding windows(有重叠,session windows(有空隙的活动))
  • 事件窗口

2.3.5 时间

Stream中的记录时,记录中通常会包含各种典型的时间字段,Flink支持多种时间的处理:

图片11

  • event Time:表示事件创建时间;
  • Ingestion Time:表示事件进入到Flink Dataflow的时间;
  • Processing Time:表示某个Operator对事件进行处理时的本地系统时间(是在TaskManager节点上)。

图片12

1.Flink程序由JobClient进行提交;
2.JobClient将作业提交给JobManager;
3.JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager;
4.TaskManager启动一个线程以开始执行。TaskManager会向JobManager报告状态更改,如开始执行,正在进行或已完成;
5.作业执行完成后,结果将发送回客户端(JobClient)。

图片13

1.client客户端提交任务给JobManager;
2.JobManager负责申请任务运行所需要的资源并管理任务和资源;
3.JobManager分发任务给TaskManager执行;
4.TaskManager定期向JobManager汇报状态。

Standalone模式需要先启动Jobmanager和TaskManager进程,每一个作业都是自己的JobManager。 Client:任务提交,生成JobGraph

JobManager:调度Job,协调Task,通信,申请资源;
TaskManager:具体任务执行,请求资源。

Flink如何和Yarn进行交互的

图片14

1.Client上传jar包和配置文件到HDFS集群上;
2.Client向Yarn ResourceManager提交任务并申请资源;
3.ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager;

  • JobManager和ApplicationMaster运行在同一个container上;
  • 一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器);
  • 它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager);
  • 这个配置文件也被上传到HDFS上;
  • 此外,AppMaster容器也提供了Flink的web服务接口;
  • YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink;

4.ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager;
5.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。

长久以来,在YARN集群中部署Flink作业有两种模式,即Session Mode和Per-Job Mode,而在Flink 1.11版本中,又引入了第三种全新的模式:Application Mode。

3.3.1 Session 模式

Session模式是预分配资源的,也就是提前根据指定的资源参数初始化一个Flink集群,并常驻在YARN系统中,拥有固定数量的JobManager和TaskManager(注意JobManager只有一个)。提交到这个集群的作业可以直接运行,免去每次分配资源的overhead。但是Session的资源总量有限,多个作业之间又不是隔离的,故可能会造成资源的争用;如果有一个TaskManager宕机,它上面承载着的所有作业也都会失败。另外,启动的作业越多,JobManager的负载也就越大。所以,Session模式一般用来部署那些对延迟非常敏感但运行时长较短的作业

Session 模式特点:
1)共享Dispatcher与ResourceManager;
2)共享资源;
3)适合小规模,执行时间较短的作业。

3.3.2 Per-Job 模式

顾名思义,在Per-Job模式下,每个提交到YARN上的作业会各自形成单独的Flink集群,拥有专属的JobManager和TaskManager。可见,以Per-Job模式提交作业的启动延迟可能会较高,但是作业之间的资源完全隔离,一个作业的TaskManager失败不会影响其他作业的运行,JobManager的负载也是分散开来的,不存在单点问题。当作业运行完成,与它关联的集群也就被销毁,资源被释放。所以,Per-Job模式一般用来部署那些长时间运行的作业

Per-Job 模式特点:
1)独享Dispatcher与ResourceManager ;
2)按需申请资源(TaskExecutor) ;
3)适合执行时间较长的大作业 。

存在的问题

上述Session模式和Per-Job模式可以用如下的简图表示,其中红色、蓝色和绿色的图形代表不同的作业。

图片15

Deployer代表向YARN集群发起部署请求的节点,一般来讲在生产环境中,也总有这样一个节点作为所有作业的提交入口(即客户端)。在main()方法开始执行直到env.execute()方法之前,客户端也需要做一些工作,即:

1)获取作业所需的依赖项;
2)通过执行环境分析并取得逻辑计划,即StreamGraph→JobGraph;
3)将依赖项和JobGraph上传到集群中。
只有在这些都完成之后,才会通过env.execute()方法触发Flink运行时真正地开始执行作业。试想,如果所有用户都在Deployer上提交作业,较大的依赖会消耗更多的带宽,而较复杂的作业逻辑翻译成JobGraph也需要吃掉更多的CPU和内存,客户端的资源反而会成为瓶颈——不管Session还是Per-Job模式都存在此问题。为了解决它,社区在传统部署模式的基础上实现了Application模式。

3.3.3 Application 模式

图片16

可见,原本需要客户端做的三件事被转移到了JobManager里,也就是说main()方法在集群中执行(入口点位于ApplicationClusterEntryPoint),Deployer只需要负责发起部署请求了。另外,如果一个main()方法中有多个env.execute()/executeAsync()调用,在Application模式下,这些作业会被视为属于同一个应用,在同一个集群中执行(如果在Per-Job模式下,就会启动多个集群)。可见,Application模式本质上是Session和Per-Job模式的折衷

用Application模式提交作业的示例命令如下。

1
2
3
4
5
6
7
bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10 \
-Dyarn.application.name="MyFlinkApp" \
/path/to/my/flink-app/MyFlinkApp.jar

-t参数用来指定部署目标,目前支持YARN(yarn-application)和K8S(kubernetes-application)。-D参数则用来指定与作业相关的各项参数,具体可参见官方文档

那么如何解决传输依赖项造成的带宽占用问题呢?Flink作业必须的依赖是发行包flink-dist.jar,还有扩展库(位于$FLINK_HOME/lib)和插件库(位于$FLINK_HOME/plugin),我们将它们预先上传到像HDFS这样的共享存储,再通过yarn.provided.lib.dirs参数指定存储的路径即可。

1
-Dyarn.provided.lib.dirs="hdfs://myhdfs/flink-common-deps/lib;hdfs://myhdfs/flink-common-deps/plugins"

这样所有作业就不必各自上传依赖,可以直接从HDFS拉取,并且YARN NodeManager也会缓存这些依赖,进一步加快作业的提交过程。同理,包含Flink作业的用户JAR包也可以上传到HDFS,并指定远程路径进行提交。

-------------本文结束感谢您的阅读-------------