Apache Druid 联机分析处理(OLAP)

一、简介

Apache Druid 是一个实时分析数据库,专为大型数据集进行快速的查询分析(OLAP 查询)而设计。Druid最常被当做数据库来用以支持实时摄取、高性能查询和高稳定运行的应用场景。Druid也通常被用来助力分析型应用的图形化界面,或者当做需要快速聚合的高并发后端API,Druid最适合应用于面向事件类型的数据。

1.1 Druid 常见应用领域

Druid的常见应用领域包括:

  • 点击流分析(网络和移动分析)
  • 网络遥测分析(网络性能监控)
  • 服务器指标存储
  • 供应链分析(制造指标)
  • 应用程序性能指标
  • 数字营销/广告分析
  • 商业智能/OLAP

1.2 Druid 的三个设计原则

1.2.1 快速查询

对于数据分析场景,大部分情况下,我们只关心一定粒度聚合的数据,而非每一行原始数据的细节情况。因此,数据聚合粒度可以是1分钟、5分钟、1小时或1天等。部分数据聚合(Partial Aggregate)给Druid争取了很大的性能优化空间。
数据内存化也是提高查询速度的杀手锏。内存和硬盘的访问速度相差近百倍,但内存的大小是非常有限的,因此在内存使用方面要精细设计,比如Druid里面使用了Bitmap和各种压缩技术。
另外,为了支持Drill-Down某些维度,Druid维护了一些倒排索引。这种方式可以加快AND和OR等计算操作。

1.2.2 水平扩展能力

Druid 查询性能在很大程度上依赖于内存的优化使用。数据可以分布在多个节点的内存中,因此当数据增长的时候,可以通过简单增加机器的方式进行扩容。为了保持平衡,Druid按照时间范围把聚合数据进行分区处理。对于高基数的维度,只按照时间切分有时候是不够的(Druid的每个Segment不超过2000万行),故Druid还支持对Segment进一步分区。
历史Segment数据可以保存在深度存储系统中,存储系统可以是本地磁盘、HDFS或远程的云服务。如果某些节点出现故障,则可借助Zookeeper协调其他节点重新构造数据。Druid内置了容易并行化的集合操作,在直方图方面和去重查询方面采用近似算法保证性能,如HyperLoglog,DataSketches等。

1.2.3 实时分析

Druid提供了包含基于时间维度数据的存储服务,并且任何一行数据都是历史真实发生的事件,因此在设计之初就约定事件一但进入系统,就不能再改变。
对于历史数据Druid以Segment数据文件的方式组织,并且将它们存储到深度存储系统中,例如文件系统或亚马逊的S3等。当需要查询这些数据的时候,Druid再从深度存储系统中将它们装载到内存供查询使用。

1.3 特点

Druid的核心架构吸收和结合了数据仓库时序数据库以及检索系统的优势,其主要特征如下:

  • 面向列的存储:Druid 单独存储和压缩每一列,只需要读取特定查询所需的列,支持快速扫描、排名和 groupBys。
  • 本地搜索索引:Druid 为字符串值创建倒排索引以进行快速搜索和过滤。
  • 流式传输和批量摄取:适用于 Apache Kafka、HDFS、AWS S3、流处理器等的开箱即用连接器。
  • 灵活的模式:Druid 优雅地处理不断变化的模式和嵌套数据
  • 时间优化分区:Druid根据时间对数据进行智能分区,基于时间的查询速度明显快于传统数据库。
  • SQL 支持:除了其原生的基于 JSON 的语言外,Druid 还通过 HTTP 或 JDBC 使用SQL
  • 水平扩展性:Druid 已在生产中用于每秒摄取数百万个事件、保留多年数据并提供亚秒级查询。
  • 操作简单:只需添加或删除服务器即可扩大或缩小规模,Druid 会自动重新平衡。容错架构围绕服务器故障进行路由。

1.4 数据格式

  • 数据源(类似数据库中表的概念,存放一类数据)

    • 时间列:每个数据源都需要有的事件时间,是预聚合的主要依据;
    • 维度列:用于标识事件和属性,用于聚合;
    • 指标列:用于聚合计算的列,通常是关键量化指标;

    图片3

  • 数据摄入

    • 实时摄入:Kafka;
    • 批量摄入:HDFS、CSV等;

    图片4

  • 数据查询

    • 原生Json查询,Http接口;
    • SQL查询,除了标准 SQL 运算符之外,Druid 还支持独特的运算符,这些运算符利用其近似算法套件来提供快速计数、排名和分位数。

    图片5

1.5 数据保障

Druid 旨在为需要每周 7 天、每天 24 小时不间断运行的应用程序提供支持。因此,德鲁伊拥有多项功能,可确保正常运行且不会丢失数据。

数据复制

Druid 中的所有数据都复制了可配置的次数,因此单个服务器故障不会对查询产生影响。

独立服务

Druid 明确命名其所有主要服务,每个服务都可以根据用例进行微调。服务可以独立失败而不影响其他服务。例如,如果摄取服务失败,系统中不会加载新数据,但现有数据仍可查询。

自动数据备份

Druid 自动将所有索引数据备份到 HDFS 等文件系统。您可能会丢失整个 Druid 集群并从这些备份数据中快速恢复它。

滚动更新

您可以通过滚动更新在不停机的情况下更新 Druid 集群,也不会对最终用户产生影响。所有德鲁伊版本都向后兼容以前的版本。

1.6 优缺点

优点:
1、支持的数据规模大(本地存储+DeepStorage–HDFS);
2、性能高,列存压缩,预聚合加上倒排索引以及位图索引,秒级查询;
3、实时性高,可以通过kafka实时导入数据。

缺点:
1、灵活性适中,虽然维度之间随意组合,但不支持adhoc查询,不能自由组合查询,且丢失了明细数据(不采用roll-up情况下可以进行明细查询);
2、易用性较差,不支持join,不支持更新,sql支持很弱(有些插件类似于pinot的PQL语言),只能JSON格式查询,对于去重操作不能精准去重;
3、处理方式复杂,需要流处理引擎将数据join成宽表,维护相对复杂;对内存要求较高。

1.7 适合的业务

  • 时序化数据:Druid 可以理解为时序数据库,所有的数据必须有时间字段;
  • 实时数据接入可容忍丢数据(tranquility):目前 tranquility 有丢数据的风险,所以建议实时和离线一起用,实时接当天数据,离线第二天把今天的数据全部覆盖,保证数据完备性;
  • OLAP 查询而不是 OLTP 查询:Druid 查询并发有限,不适合 OLTP 查询;
  • 非精确的去重计算:目前 Druid 的去重都是非精确的;
  • 无 Join 操作:Druid 适合处理星型模型的数据,不支持关联操作;
  • 数据没有 update 更新操作,只对 segment 粒度进行覆盖:由于时序化数据的特点,Druid 不支持数据的更新。

1.8 高可用性

  • MetaStore挂掉:无法感知新的Segment生成,不影响老数据;
  • Indexing Service挂掉:无法执行新的任务,新数据无法摄入,不影响查询;
  • Broker挂掉:本Broker节点不能查询,其他节点Broker继续服务,不影响数据摄入;
  • Historical挂掉:Coordinator Node重分配该节点上segment到其它节点;
  • Coordinator挂掉:Segment不会被加载和删除,选举新leader;
  • Zookeeper挂掉:无法执行新的任务,新数据进不来;Broker有缓存。

二、基本架构

图片1

2.1 内部进程

2.1.1 Coordinator

协调节点(Coordinator)主要负责历史节点的数据负载均衡,以及通过规则(Rule) 管理数据的生命周期。协调节点告诉历史节点加载新数据、卸载过期数据、复制数据、 和为了负载均衡移动数据。

Coordinator 是周期性运行的(由 druid.coordinator.period 配置指定,默认执行间隔为 60s)。因为需要评估集群的当前状态,才能决定应用哪种策略,所以,Coordinator 需要维护和 ZooKeeper 的连接,以获取集群的信息。而关于 Segment 和 Rule 的信息保存在了元数据库中,所以也需要维护与元数据库的连接。

2.1.2 Overlord

统治者(Overlord)进程监视 MiddleManager 进程,并且是数据摄入 Druid 的控制器。他们负责将提取任务分配给 MiddleManagers 并协调 Segement 发布,包括接受、拆解、分配 Task,以及创建 Task 相关的锁,并返回 Task 的状态。

2.1.3 Broker

查询节点(Broker)接收客户端查询请求,并将这些查询转发给 Historicals 和 MiddleManagers。当 Brokers 从这些子查询中收到结果时,它们会合并这些结果并将它们返回给调用者。

2.1.4 Router

Router 将请求路由到Broker, Coordinators和Overlords,进程可以在 Brokers、Overlords 和 Coordinators 进程之上,提供一层统一的 API网关。Router 进程是可选进程,不过如果集群的数据规模已经达到了 TB级别,还是需要考虑启用的(druid.router.managementProxy.enabled=true)。因为一旦集群规模达到一定的数量级,那么发生故障的概率就会变得不容忽视,而 Router 支持将请求只发送给健康的节点,避免请求失败。同时,查询的响应时间和资源消耗,也会随着数据量的增长而变高,而 Router 支持设置查询的优先级和负载均衡策略,避免了大查询造成的队列堆积或查询热点等问题。

另外,Router 节点还可用于将查询路由到不同的 Broker 节点,便于实现冷热分层,以更好地应对超大规模数据集。默认情况下,Router 会根据设置的 Rule 规则,来路由查询请求。例如,如果将最近一个月的数据加载到热集群中,则最近一个月内的查询可以路由到一组专用 Broker,超出该时间范围的查询将被路由到另一组 Broker,如此便实现了查询的冷热隔离。

2.1.5 Historical

历史节点(Historical)加载已生成好的数据文件,以供数据查询。historical 节点是整个集群查询性能的核心所在,因为 historical 会承担绝大部分的 segment 查询。
Historical 进程从 Deep Storage 中下载 Segment,并响应有关这些 Segment 的查询请求(这些请求来自Broker 进程)。另外,Historical 进程不处理写入请求 。
Historical 进程采用了无共享架构设计,它知道如何去加载和删除 Segment,以及如何基于 Segment 来响应查询。因此,即便底层的 Deep Storage无法正常工作,Historical 进程还是能针对其已同步的 Segments,正常提供查询服务。

2.1.6 MiddleManager

中间管理节点(MiddleManager)及时摄入实时数据,已生成 Segment 数据文件。
MiddleManager 进程是执行提交的任务的工作节点。Middle Managers 将任务转发给在不同 JVM 中运行的 Peon进程(如此,可以做到资源和日志的隔离)。MiddleManager、Peon、Task 的对应关系是,每个 Peon 进程一次只能运行一个Task 任务,但一个 MiddleManager 却可以管理多个 Peon 进程。

Druid进程可以按照您喜欢的任何方式进行部署,但是为了便于部署,我们建议将其组织为三种服务器类型:

  • Master: Coordinator, Overload 运行协调器和领主进程,管理数据可用性和接收;
  • Query: Broker and Router,行代理和可选的路由器进程,处理来自外部客户端的查询;
  • Data: Historical and MiddleManager,运行Historical和MiddleManager进程,执行提取工作负载并存储所有可查询的数据。

2.2 外部依赖

2.2.1 深度储存

深度储存(Deep Storage):存放生成的 Segment 数据文件,并供历史服务器下载, 对于单节点集群可以是本地磁盘,而对于分布式集群一般是 HDFS。

2.2.2 元数据库

元数据库(Metadata Storage),存储 Druid 集群的元数据信息,比如 Segment 的相关信息,一 般用 MySQL 或 PostgreSQL。

2.2.3 Zookeeper

Zookeeper:为 Druid 集群提供以执行协调服务。如内部服务的监控,协调和领导者选举。

涵盖了以下的几个主要特性:

Coordinator 节点的 Leader 选举;
Historical 节点发布 Segment 的协议;
Coordinator 和 Historical 之间 load/drop Segment 的协议;
Overlord 节点的 Leader 选举;
Overlord 和 MiddleManager 之间的 Task 管理。

2.3 Druid 主要功能特性

Druid的核心体系结构结合了数据仓库时间序列数据库日志搜索系统的思想。Druid的一些主要功能是:

  1. 列式存储格式。
    Druid使用面向列的存储,这意味着它仅需要加载特定查询所需的确切列。这极大地提高了仅命中几列的查询的速度。此外,每列都针对其特定数据类型进行了优化存储,从而支持快速扫描和聚合。
  2. 可扩展的分布式系统。
    Druid通常部署在数十到数百台服务器的群集中,并且可以提供每秒数百万条记录的接收速率,数万亿条记录的保留以及亚秒级到几秒的查询延迟。
  3. 大规模并行处理。
    Druid可以在整个集群中并行处理查询。
  4. 实时或批量摄取。
    Druid可以实时(批量获取被查询的数据)或批量提取数据。
  5. 自我修复,自我平衡,易于操作。
    作为操作员,要扩展或扩展集群,只需添加或删除服务器,集群就会在后台自动重新平衡自身,而不会造成任何停机。如果任何Druid服务器出现故障,系统将自动绕过损坏,直到可以更换这些服务器为止。Druid设计为24*7全天候运行,而无需出于任何原因而导致计划内停机,包括配置更改和软件更新。
  6. 云原生的容错架构,不会丢失数据。
    一旦Druid摄取了数据,副本就安全地存储在深度存储(通常是云存储,HDFS或共享文件系统)中。即使每台Druid服务器发生故障,也可以从深度存储中恢复数据。对于仅影响少数Druid服务器的有限故障,复制可确保在系统恢复时仍可进行查询。
  7. 用于快速过滤的索引。
    Druid使用CONCISE或 Roaring压缩的位图索引来创建索引,以支持快速过滤和跨多列搜索。
  8. 基于时间的分区。
    Druid首先按时间对数据进行分区,然后可以根据其他字段进行分区。这意味着基于时间的查询将仅访问与查询时间范围匹配的分区。这将大大提高基于时间的数据的性能。
  9. 近似算法。
    Druid包括用于近似计数区别,近似排名以及近似直方图和分位数计算的算法。这些算法提供有限的内存使用量,通常比精确计算要快得多。对于精度比速度更重要的情况,Druid还提供了精确的计数区别和精确的排名。
  10. 摄取时自动汇总。
    Druid可选地在摄取时支持数据汇总。这种汇总会部分地预先聚合您的数据,并可以节省大量成本并提高性能。

三、架构设计

3.1 存储设计

3.1.1 数据源和段

Datasources and segments

Druid 数据存储在 datasources 中,类似于传统RDBMS中的表。每个数据源都按时间分区,并且可以选择按其他属性进一步分区。每个时间范围都称为 chunk(例如,如果您的数据源按天划分,则为一天)。在一个chunk内,数据被划分为一个或多个 segments。每个段都是单个文件,通常包含多达几百万行的数据。由于细分是按时间块组织的,因此有时将段视为存在于如下时间线上是有帮助的:

图片2

数据源可能具有从几个段到数十万甚至数百万个段的任何位置。每个段都是从在MiddleManager上创建开始的,并且时段是可变的且未提交的。段构建过程包括以下步骤,旨在生成紧凑且支持快速查询的数据文件:

  • 转换为列格式
  • 使用位图索引编制索引
  • 使用各种算法进行压缩
    • 字符串列的ID存储最小化的字典编码
    • 位图索引的位图压缩
    • 所有列的类型感知压缩

分段会定期提交和发布。此时,它们被写入深度存储 deep storage,变得不可变,并从MiddleManagers迁移到Historical进程。有关该段的条目也将写入到元数据存储 metadata store中。该条目是有关该段的元数据的自描述位,包括诸如段的模式,其大小以及其在深度存储上的位置之类的信息。这些条目是协调器用来了解集群上应该有哪些数据的内容。

有关段文件格式的详细信息,请参阅段文件
有关在Druid中对数据建模的详细信息,请参见模式设计

3.1.2 索引和切换

Indexing and handoff

索引 是创建新段的机制,切换 是它们被发布并开始由历史进程提供服务的机制。该机制在索引端的工作方式如下:

1)索引任务开始运行并生成新段。必须先在索引任务构建段之前确定段的标识符,对于一个追加数据类型的任务(例如Kafka任务或者其他追加模式的索引任务),这将通过调用Overlord的allocate API来在现有的段集合中添加一个新的分区。对于一个重写类型的任务(例如Hadoop任务,或者一个非追加模式的索引任务)这是通过锁定间隔并创建新的版本号和新的段集来完成的。
2)如果一个索引任务是实时任务(像kafka任务),那么段在此刻可以被立即查询,它是可用的,但是未发布。
3)索引任务完成对段的数据读取后,会将其推入深度存储,然后通过将记录写入元数据存储来发布。
4)如果索引任务是实时任务,则此时它等待Historical进程加载段。如果索引任务不是实时任务,它将立即退出。

CoordinatorHistorical方面:

1)Coordinator定期(默认情况下,每1分钟)摘取元数据存储以查找新发布的段。
2)当Coordinator发现一个段是发布且可以被使用的、但是不可用的状态时,它会选个一个Historical进程来加载这个段;
3)Historical加载这个段并开始为其服务;
4)此时,如果索引任务正在等待切换,它将退出。

3.1.3 段标识符

Segment identifiers

包含一个由四部分组成的标识符,包含以下组件:

  • 数据源名称;
  • 时间间隔(包含该段的时间块,这与摄取时指定的 segmentGranularity 有关);
  • 版本号(通常是与段集首次启动时间相对应的 ISO8601 时间戳);
  • 分区号(一个整数,在数据源+间隔+版本中唯一;可能不一定是连续的)。

例如这个一个段标识符,数据源为 clarity-cloud0, 时间块为 2018-05-21T16:00:00.000Z/2018-05-21T17:00:00.000Z, 版本为 2018-05-21T15:56:09.909Z 以及分区编号为 1:

1
2
clarity-cloud0_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T15:56:09.909Z_1
Copy

分区号为0的段(块中的第一个分区)忽略分区号,如下例所示,该段与上一个时间块位于同一时间块中,但分区号为0而不是1:

1
clarity-cloud0_2018-05-21T16:00:00.000Z_2018-05-21T17:00:00.000Z_2018-05-21T15:56:09.909

3.1.4 段版本

Segment versioning

您可能想知道上一节中描述的版本号是用来做什么的。或者,你可能不想知道,在这种情况下对你有好处,你可以跳过这一节!

它支持批处理模式覆盖。在Druid中,如果您所做的只是附加数据,那么每个时间块只有一个版本。但是当您覆盖数据时,幕后发生的事情是,使用相同的数据源、相同的时间间隔、但更高的版本号创建一组新的段。这向Druid系统的其他部分发出了一个信号:旧版本应该从集群中删除,新版本应该替换它。

这个切换对用户来说似乎是瞬间发生的,因为Druid通过首先加载新数据(但不允许查询它)来处理这个问题,然后在新数据全部加载后,将所有新查询切换为使用这些新段。然后它会在几分钟后丢弃旧的段。

3.1.5 段生命周期

Segment lifecycle

每个段都有一个生命周期,涉及以下三个主要领域:

1)元数据存储:段的元数据(一个小的JSON,通常不超过几个KB)在段构建完成后存储在元数据存储中,将段的记录插入到元数据存储中称为发布(Publishing)。这些元数据记录中有一个 used 的布尔标识,控制着段是否可查询。被实时任务创建的段在发布之前是可用的,因为它们仅在完成之时发布,并且不再接受额外的数据行。

2)深度存储:一旦构建了一个段,在将元数据发布到元数据存储之前就立刻将段数据文件推送到深度存储。

3)可查询性:在某些Druid数据服务器上段是可以进行查询的,如实时任务或Historical进程。

可以使用Druid SQL查询 sys.segments 表检查当前活动段的状态,它包括以下标志:

  • is_published: 如果段元数据已发布到元数据存储且 used 是true的话,则为true;
  • is_available: 如果段当前可用于查询(实时任务或Historical进程),则为true;
  • is_realtime: 如果段仅在实时任务上可用,则为true。对于使用实时摄取的数据源,这通常从true开始,然后在发布和切换段时变为false;
  • is_overshadowed: 如果段已发布( used 设置为true),并且被某些其他已发布段完全覆盖,则为true。一般来说,这是一个过渡状态,处于该状态的段很快将其 used 标志自动设置为false;

3.2.6 查询处理

查询分布在 Druid 集群中,并由 Broker 管理。查询首先进入Broker,Broker首先鉴别哪些段可能与本次查询有关。 段的列表总是按照时间进行筛选和修剪的,当然也可能由其他属性,具体取决于数据源的分区方式。然后,Broker将确定哪些HistoricalMiddleManager为这些段提供服务、并向每个进程发送一个子查询。 Historical和MiddleManager进程接收查询、处理查询并返回结果,Broker将接收到的结果合并到一起形成最后的结果集返回给调用者。

时间和属性精简是Druid限制每个查询扫描数据量的一个重要方法,但不是唯一的方法。对于比Broker更细粒度级别的精简筛选器,每个段中的索引结构允许Druid在查看任何数据行之前,找出哪些行(如果有的话)与筛选器集匹配。一旦Druid知道哪些行与特定查询匹配,它就只访问该查询所需的特定列。在这些列中,Druid可以从一行跳到另一行,避免读取与查询过滤器不匹配的数据。

因此,Druid使用三种不同的技术来最大化查询性能:

  • 精简每个查询访问的段;
  • 在每个段中,使用索引标识必须访问哪些行;
  • 在每个段中,只读取与特定查询相关的特定行和列。

有关 Druid 如何执行查询的更多详细信息,请参阅查询执行文档。

3.2 段设计

Apache Druid 将其索引存储在按时间分区的段文件中。在基本设置中,通常为每个时间间隔创建一个段文件,其中时间间隔可在 granularitySpecsegmentGranularity 参数中配置。为了让 Druid 在繁重的查询负载下运行良好,段文件大小在 300MB-700MB 的推荐范围内很重要。如果你的段文件大于该范围,请考虑更改时间间隔的粒度,或者对数据进行分区,并在 partitionsSpec中调整 targetPartitionSize(此参数的建议起点是500万行)。有关更多信息,请参阅下面的分片部分批处理摄取文档的分区规范部分。

3.2.1 段文件的核心数据结构

在这里,我们描述段文件的内部结构,它本质上是列式的:每列的数据在单独的数据结构中。通过分别存储每一列,Druid可以通过只扫描查询实际需要的列来减少查询延迟。有三种基本列类型:时间戳列维度列指标列,如下图所示:

图片3

timestamp和metric列很简单:在底层,每个列都是用LZ4压缩的整数或浮点值数组。一旦查询知道需要选择哪些行,它只需解压缩这些行,提取相关行,然后使用所需的聚合运算符进行计算。与所有列一样,如果不查询一个列,则跳过该列的数据。

dimension列是不同的,因为它们支持过滤和聚合操作,所以每一个维度都需要以下三种数据结构:

1)一个将值(通常被当做字符串)映射到整数id的字典;
2)一个使用第1步的字典进行编码的列值的列表;
3)对于列中每一个不同的值,标识哪些行包含该值的位图。

为什么需要这三种数据结构?字典简单地将字符串值映射到整数id,以便于在(2)和(3)中可以紧凑的表示。(3)中的位图(也称倒排索引)可以进行快速过滤操作(特别是,位图便于快速进行AND和OR操作)。 最后,GroupByTopN 查询需要(2)中的值列表。换句话说,仅基于过滤器的聚合指标是不需要(2)中存储的维度值列表的。

要具体了解这些数据结构,请考虑上面示例数据中的page列,表示此维度的三个数据结构如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1: Dictionary that encodes column values
{
"Justin Bieber": 0,
"Ke$ha": 1
}

2: Column data
[0,
0,
1,
1]

3: Bitmaps - one for each unique value of the column
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,0,1,1]

多值列

如果数据源使用多值列,那么段文件中的数据结构看起来有点不同。让我们假设在上面的例子中,第二行同时标记了Ke$haJustin Bieber主题。在这种情况下,这三种数据结构现在看起来如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1: Dictionary that encodes column values
{
"Justin Bieber": 0,
"Ke$ha": 1
}

2: Column data
[0,
[0,1], <--Row value of multi-value column can have array of values
1,
1]

3: Bitmaps - one for each unique value
value="Justin Bieber": [1,1,0,0]
value="Ke$ha": [0,1,1,1]
^
|
|
Multi-value column has multiple non-zero entries

请注意列数据和Ke$ha位图中第二行的更改。如果一行中的某一列有多个值,则其在列数据中的条目是一个数组。此外,在列数据中有n个值的行在位图中将有n个非零值项。

3.2.2 SQL 兼容的空值处理

默认情况下,Druid字符串维度列可以使用 "" 或者 null ,数值列和指标列则不能表示为 null,而是将 null 强制为 0。但是,Druid还提供了一个与SQL兼容的空值处理模式,必须在系统级别通过 Druid.generic.useDefaultValueForNull 启用,当设置为 false 时,此设置将允许Druid在接收数据时创建的段中:字符串列区分 ""null ,数值列区分 null0

在这种模式下,字符串维度列不包含额外的列结构,只是为 null 保留额外的字典条目。但是,数值列将与一个附加位图一起存储在段中,该位图标识哪些行是 null 值。除了略微增加段大小之外,由于需要检查 null 的位图,SQL兼容的空值处理在查询时也会导致性能损失,此性能开销仅对实际包含null列的场景中存在。

3.2.3 命名规则

段标识符通常使用数据源、时间区间的开始时间(ISO 8601格式)、时间区间的结束时间(ISO 8601格式)和版本来构造。如果数据被额外的分片后超出了时间范围,则段标识符还将包含分区号。

一个示例段标识符可以是: 数据源名称_开始时间_结束时间_版本号_分区号

3.2.4 段的组成

在底层,一个段由以下几个文件组成:

  • version.bin

    4个字节,以整数表示当前段版本。 例如,对于v9段,版本为0x0、0x0、0x0、0x9

  • meta.smoosh

    一个包含其他 smoosh 文件内容的元数据(文件名以及偏移量)文件

  • XXXXX.smoosh

    这些文件中有一些是串联的二进制数据

    smoosh 文件代表 smooshed 在一起的多个文件,以减少必须打开用来容纳数据的文件描述符的数量,它们是最大为2GB的文件(以匹配Java中内存映射的ByteBuffer的限制)。smoosh 文件为数据中的每个列提供单独的文件,并在 index.drd 文件提供有关该段的额外元数据。

    还有一个称为 __time 的特殊列,它表示该段的时间列。

在代码库中,段具有内部格式版本。当前的句段格式版本为 v9

3.2.5 列的格式

每列存储为两部分:

  • Jackson序列化的列描述符(ColumnDescriptor);
  • 列二进制文件的其余部分。

列描述符本质上是一个对象,它允许我们使用Jackson的多态反序列化来添加新的有趣的序列化方法,并且对代码的影响最小。它由关于列的一些元数据(它是什么类型的,它是多值的,等等)和一列序列化/反序列化逻辑组成,这些逻辑可以反序列化二进制文件的其余部分。

3.2.6 压缩

Druid 压缩字符串、long、float 和 double 列的值块,默认使用LZ4,字符串列和数字空值的位图使用Roaring压缩. 我们建议坚持使用这些默认值,除非使用您自己的数据和查询模式进行实验验证表明非默认选项在您的特定情况下会表现得更好。例如,对于字符串列中的位图,使用 Roaring 和 CONCISE 之间的差异对于高基数列最为明显。在这种情况下,Roaring 在匹配大量值的过滤器上要快得多,但在某些情况下,由于 Roaring 格式的开销,CONCISE 的占用空间较小(但在匹配大量值时仍然较慢)。目前,压缩配置在段级别而不是单个列,有关更多详细信息,请参阅IndexSpec

3.2.7 切分数据以创建段

数据分片

对于同一数据源,同一时间间隔内可能存在多个段。这些段在一段时间内形成一个 。根据用于切分数据的 shardSpec 的类型,只有当一个 完成时,Druid查询才能完成。也就是说,如果一个块由3个段组成,例如:

1
2
3
sampleData_2011-01-01T02:00:00:00Z_2011-01-01T03:00:00:00Z_v1_0
sampleData_2011-01-01T02:00:00:00Z_2011-01-01T03:00:00:00Z_v1_1
sampleData_2011-01-01T02:00:00:00Z_2011-01-01T03:00:00:00Z_v1_2

在完成对间隔 2011-01-01T02:00:00:00Z_2011-01-01T03:00:00:00Z 的查询之前,必须加载所有3个段。

此规则的例外是使用线性切片规范。线性切片规范不会强制完整性,即使系统中没有加载切片,查询也可以完成。例如,如果您的实时摄取任务创建了3个使用线性切片规范进行分段的段,并且系统中只加载了其中的两个段,那么查询将只返回这两个段的结果。

3.2.8 Schema更改

替换段

Druid使用数据源、间隔、版本和分区号唯一地标识段。只有在为某个时间粒度创建多个段时,分区号才在段id中可见。例如,如果有小时段,但一小时内的数据量超过单个段的容量,则可以为同一小时创建多个段。这些段将共享相同的数据源、间隔和版本,但具有线性增加的分区号。

1
2
3
foo_2015-01-01/2015-01-02_v1_0
foo_2015-01-01/2015-01-02_v1_1
foo_2015-01-01/2015-01-02_v1_2

在上述段的实例中,dataSource = foo, interval = 2015-01-01/2015-01-02, version = v1, and partitionNum = 0。 如果在以后的某个时间点,使用新的schema重新索引数据,则新创建的段将具有更高的版本id。

1
2
3
foo_2015-01-01/2015-01-02_v2_0
foo_2015-01-01/2015-01-02_v2_1
foo_2015-01-01/2015-01-02_v2_2

Druid批量索引任务(基于Hadoop或基于IndexTask)保证了间隔内的的原子更新。在我们的例子中,在 2015-01-01/2015-01-02 的所有 v2 段加载到Druid集群之前,查询只使用 v1 段。加载完所有 v2 段并可查询后,所有查询都将忽略 v1 段并切换到 v2 段。不久之后,v1段将从集群中卸载。

请注意,跨越多个段间隔的更新在每个间隔内都是原子的,但是在整个更新过程中它们不是原子的。例如,您有如下段:

1
2
3
foo_2015-01-01/2015-01-02_v1_0
foo_2015-01-02/2015-01-03_v1_1
foo_2015-01-03/2015-01-04_v1_2

v2 段将在构建后立即加载到集群中,并在段重叠的时间段内替换 v1 段。在完全加载 v2段之前,集群可能混合了 v1v2 段。

1
2
3
foo_2015-01-01/2015-01-02_v1_0
foo_2015-01-02/2015-01-03_v2_1
foo_2015-01-03/2015-01-04_v1_2

在这种情况下,查询可能会命中 v1v2 段的混合.

3.2.9 段之间的不同Schema

同一数据源的Druid段可能有不同的schema。如果一个字符串列(维度列)存在于一个段中而不是另一个段中,则涉及这两个段的查询仍然有效。对缺少维度的段的查询将表现为该维度只有空值。类似地,如果一个段有一个数值列(指标列),而另一个没有,那么查询缺少指标列的段通常会做正确的事情,在缺失的指标上做聚合操作也就是缺失的。

3.3 进程和服务

3.3.1 进程类型

Druid有以下几种进程类型:

3.3.2 服务类型

Druid进程可以按照您喜欢的任何方式部署,但是为了便于部署,我们建议将它们组织成三种服务器类型:

  • Master
  • Query
  • Data

图片1

3.3.2.1 Master服务

Master服务管理数据的摄取和可用性:它负责启动新的摄取作业并协调下面描述的”Data服务”上数据的可用性。

在Master服务中,功能分为两个进程:Coordinator和Overlord。

Coordinator进程

Coordinator 监视Data服务中的Historical进程,它们负责将数据段分配给特定的服务器,并确保数据段在各个Historical之间保持良好的平衡。

Overlord进程

Overlord 监视Data服务中的MiddleManager进程,并且是Druid数据接收的控制器。它们负责将接收任务分配给MiddleManager,并协调数据段的发布。

3.3.2.2 Query服务

Query服务提供用户和客户端应用程序交互,将查询路由到Data服务或其他Query服务(以及可选的代理Master服务请求)。

在Query服务中,功能上分为两个进程:Broker和Router。

Broker进程

Broker从外部客户端接收查询并将这些查询转发到Data服务器, 当Broker接收到子查询的结果时,它们会合并这些结果并将其返回给调用者。用户通常查询Broker,而不是直接查询Data服务中的Historical或MiddleManager进程。

Router进程(可选)

Router进程是可选的进程,相当于是为Druid Broker、Overlord和Coordinator提供一个统一的API网关。Router是可选的,因为也可以直接与Druid的Broker、Overlord和Coordinator。

Router还运行着Druid控制台,一个用于数据源、段、任务、数据进程(Historical和MiddleManager)和Coordinator动态配置的管理UI。用户还可以在控制台中运行SQL和本地Druid查询。

3.3.2.3 Data服务

Data服务执行摄取作业并存储可查询数据。

在Data服务中,根据功能被分为两个进程:Historical和MiddleManager。

3.3.2.4 Historical进程

Historical 进程是处理存储和查询”Historical”数据(包括系统中已提交足够长时间的任何流数据)的工作程序。Historical进程从深层存储下载段并响应有关这些段的查询,他们不接受写操作。

3.3.2.5 MiddleManager进程

MiddleManager 进程处理将新数据摄取到集群中的操作, 他们负责读取外部数据源并发布新的Druid段。

Peon进程

Peon 进程是由MiddleManagers生成的任务执行引擎。每个Peon运行一个单独的JVM,负责执行一个任务。Peon总是和运行它们的MiddleManager在同一个主机上运行。

3.3.2.6 Indexer进程(可选)

Indexer 进程是MiddleManager和Peon的替代方法。Indexer在单个JVM进程中作为单个线程运行任务,而不是为每个任务派生单独的JVM进程。

与MiddleManager + Peon系统相比,Indexer的设计更易于配置和部署,并且能够更好地实现跨任务的资源共享。Indexer是一种较新的功能,由于其内存管理系统仍在开发中,因此目前被指定为实验性的特性。它将在Druid的未来版本中继续成熟。

通常,您可以部署MiddleManagers或indexer,但不能同时部署两者。

3.3.3 服务混合部署的利弊

Druid进程可以基于上面描述的Master/Data/Query服务组织进行混合部署,这种部署方式通常会使大多数集群更好地利用硬件资源。

但是,对于非常大规模的集群,可以分割Druid进程,使它们在单独的服务器上运行,以避免资源争用。

本节介绍与进程混合部署相关的指南和配置参数。

3.3.3.1 Coordinator和Overlord

Coordinator进程的工作负载往往随着集群中段的数量而增加。Overlord的工作量也会根据集群中的分段数而增加,但程度要比Coordinator小。

在具有非常大量的段的集群中,可以将Coordinator进程和Overlord进程分开,以便为Coordinator进程的分段平衡工作负载提供更多资源。

统一进程

通过设置 druid.Coordinator.asOverlord.enabled 属性,Coordinator进程和Overlord进程可以作为单个组合进程运行。

有关详细信息,请参阅Coordinator配置

3.3.3.2 Historical和MiddleManager

对于更高级别的数据摄取或查询负载,将Historical进程和MiddleManager进程部署在不同的主机上以避免CPU和内存争用。

Historical还受益于为内存映射段提供可用内存,这也是分别部署Historical和MiddleManager进程的另一个原因。

3.4 深度存储

Apache Druid不提供的存储机制,深度存储是存储段的地方。深度存储基础结构定义了数据的持久性级别,只要Druid进程能够看到这个存储基础结构并获得存储在上面的段,那么无论丢失多少Druid节点,都不会丢失数据。如果段在深度存储层消失了,则这些段中存储的任何数据都将丢失。

3.4.1 本地挂载

本地装载也可用于存储段。这使得您可以使用本地文件系统或任何可以在本地挂载的东西,如NFS、Ceph等来存储段。这是默认的深度存储实现。

为了使用本地装载进行深层存储,需要在公共配置中设置以下配置:

属性可能的取值描述默认值
druid.storage.typelocal必须设置
druid.storage.storageDirectory存储段的目录必须设置

注意,通常应该将 druid.storage.storageDirectory 设置为与 druid.segmentCache.locationsdruid.segmentCache.infoDir 不同的目录。

如果在本地模式下使用Hadoop Indexer,那么只需给它一个本地目录作为输出目录,它就可以工作了。

3.4.2 S3适配

请看druid-s3-extensions扩展文档

3.4.3 HDFS

请看druid-hdfs-extensions扩展文档

3.4.4 其他深度存储

对于另外的深度存储等,可以参见扩展列表

3.5 元数据存储

元数据存储是Apache Druid的一个外部依赖。Druid使用它来存储系统的各种元数据,但不存储实际的数据。下面有许多用于各种目的的表。

Derby是Druid的默认元数据存储,但是它不适合生产环境。MySQLPostgreSQL是更适合生产的元数据存储。

元数据存储存储了Druid集群工作所必需的整个元数据。对于生产集群,考虑使用MySQL或PostgreSQL而不是Derby。此外,强烈建议设置数据库的高可用,因为如果丢失任何元数据,将无法恢复。

3.5.1 使用Derby

将以下内容添加到您的Druid配置中:

1
2
3
druid.metadata.storage.type=derby
druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527//opt/var/druid_state/derby;create=true
Copy

3.5.2 MySQL

参见mysql-metadata-storage扩展文档

5.3.3 PostgreSQL

参见postgresql-metadata-storage扩展文档

5.3.4 添加自定义的数据库连接池属性

注意:usernamepasswordconnectURIvalidationQuerytestOnBorrow 这些属性不能通过 druid.metadata.storage.connector.dbcp 属性设置,这些必须通过 druid.metadata.storage.connector 属性设置。

支持的属性示例:

1
2
3
druid.metadata.storage.connector.dbcp.maxConnLifetimeMillis=1200000
druid.metadata.storage.connector.dbcp.defaultQueryTimeout=30000
Copy

全部列表请查看 基本数据源配置

5.3.5 元数据存储表

5.3.5.1 段表

这是由 druid.metadata.storage.tables.segments 属性决定的。

此表存储了系统中可用段的元数据。Coordinator 对表进行轮询,以确定可用于在系统中查询的段集。该表有两个主功能列,其他列用于索引。

used 列是布尔型标识。1表示集群应”使用”该段(即,应加载该段并可用于请求), 0表示不应将段主动加载到集群中。我们这样做是为了从集群中删除段,而不实际删除它们的元数据(这允许在出现问题时更简单地回滚)。

payload 列存储一个JSON blob,该blob包含该段的所有元数据(存储在该payload中的某些数据与表中的某些列是冗余的,这是有意的), 信息如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"dataSource":"wikipedia",
"interval":"2012-05-23T00:00:00.000Z/2012-05-24T00:00:00.000Z",
"version":"2012-05-24T00:10:00.046Z",
"loadSpec":{
"type":"s3_zip",
"bucket":"bucket_for_segment",
"key":"path/to/segment/on/s3"
},
"dimensions":"comma-delimited-list-of-dimension-names",
"metrics":"comma-delimited-list-of-metric-names",
"shardSpec":{"type":"none"},
"binaryVersion":9,
"size":size_of_segment,
"identifier":"wikipedia_2012-05-23T00:00:00.000Z_2012-05-24T00:00:00.000Z_2012-05-23T00:10:00.046Z"
}
Copy

请注意,此blob的格式可以而且将不时地更改。

5.3.5.2 规则表

规则表用于存储有关段应在何处着陆的各种规则。Coordinator 在对集群进行段(重)分配决策时使用这些规则。

5.3.5.3 配置表

配置表用于存储运行时的配置对象。我们还没有很多这样的机制,我们也不确定是否会继续使用这种机制,但这是一种在运行时跨集群更改一些配置参数的方法的开始。

5.3.5.4 任务相关的表

在管理任务时,OverlordMiddleManager 还创建和使用了许多表。

5.3.5.5 审计表

审核表用于存储配置更改的历史记录,例如Coordinator 所做的规则更改和其他配置更改。

只有以下角色才能访问元数据存储:

  • 索引服务进程(如果有)
  • 实时进程(如果有)
  • 协调程序

因此,您只需要为这些计算机授予访问元数据存储的权限(例如,在AWS安全组中)。

3.6 ZooKeeper

Apache Druid使用Apache ZooKeeper 来管理整个集群状态。通过ZK来进行的操作有:

3.6.1 Coordinator Leader选举

我们使用 Curator LeadershipLatch 进行Leader选举:

1
${druid.zk.paths.coordinatorPath}/_COORDINATOR

3.6.2 Historical和Realtime之间的段发布

announcementsPathservedSegmentsPath 这两个参数用于这个功能。

所有的 Historical 进程都将它们自身发布到 announcementsPath, 具体来说它们将在以下路径创建一个临时的ZNODE:

1
${druid.zk.paths.announcementsPath}/${druid.host}

这意味着Historical节点可用。它们也将随后创建一个ZNODE:

1
${druid.zk.paths.servedSegmentsPath}/${druid.host}

当它们加载段时,它们将在以下路径附着一个临时的ZNODE:

1
${druid.zk.paths.servedSegmentsPath}/${druid.host}/_segment_identifier_

然后,CoordinatorBroker 之类的进程可以监视这些路径,以查看哪些进程当前正在为哪些段提供服务。

3.6.3 Coordinator和Historical之间的段加载/删除

loadQueuePath 参数用于这个功能。

Coordiantor 决定一个 Historical 进程应该加载或删除一个段时,它会将一个临时znode写到:

1
${druid.zk.paths.loadQueuePath}/_host_of_historical_process/_segment_identifier

这个znode将包含一个payload,它向Historical进程指示它应该如何处理给定的段。当Historical进程完成任务时,它将删除znode,以便向Coordinator表示它已经完成处理。

四、Druid 部署

4.1 单机模式部署

Druid 包含一组用于单机部署的参考配置和启动脚本,位于conf/druid/single-server/目录下

  • nano-quickstart
  • micro-quickstart
  • small
  • medium
  • large
  • xlarge
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Nano-Quickstart:1个CPU,4GB RAM
启动命令: bin/start-nano-quickstart
配置目录: conf/druid/single-server/nano-quickstart/*
微型快速入门:4个CPU,16GB RAM
启动命令: bin/start-micro-quickstart
配置目录: conf/druid/single-server/micro-quickstart/*
小型:8 CPU,64GB RAM(〜i3.2xlarge)
启动命令: bin/start-small
配置目录: conf/druid/single-server/small/*
中:16 CPU,128GB RAM(〜i3.4xlarge)
启动命令: bin/start-medium
配置目录: conf/druid/single-server/medium/*
大型:32 CPU,256GB RAM(〜i3.8xlarge)
启动命令: bin/start-large
配置目录: conf/druid/single-server/large/*
超大型:64 CPU,512GB RAM(〜i3.16xlarge)
启动命令: bin/start-xlarge
配置目录: conf/druid/single-server/xlarge/*

4.1.1 安装 jdk

1
2
3
4
5
6
7
[hadoop@hadoop3 ~]$ tar xf downloads/jdk-8u301-linux-x64.tar.gz
[hadoop@hadoop3 ~]$ vim .bash_profile
export JAVA_HOME=/home/hadoop/jdk1.8.0_301
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

[hadoop@hadoop1 ~]$ source .bash_profile

4.1.2 准备 druid

1
2
3
4
5
[hadoop@hadoop3 ~/downloads]$ wget https://dlcdn.apache.org/druid/0.21.1/apache-druid-0.21.1-bin.tar.gz
https://dlcdn.apache.org/druid/0.21.1/apache-druid-0.21.1-bin.tar.gz
https://downloads.apache.org/druid/0.21.1/apache-druid-0.21.1-bin.tar.gz
[hadoop@hadoop3 ~]$ tar xf apache-druid-0.21.1-bin.tar.gz
[hadoop@hadoop3 ~]$ cd apache-druid-0.21.1/

4.1.3 启动 druid

1
[hadoop@hadoop3 ~/apache-druid-0.21.1]$ ./bin/start-nano-quickstart start

如果端口冲突,可通过以下配置修改各服务端口信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cd conf/druid/single-server

./nano-quickstart/
├── _common
│ ├── common.runtime.properties
│ └── log4j2.xml
├── broker
│ └── runtime.properties druid.plaintextPort=端口号
├── coordinator-overlord
│ └── runtime.properties druid.plaintextPort=端口号
├── historical
│ └── runtime.properties druid.plaintextPort=端口号
├── middleManager
│ └── runtime.properties druid.plaintextPort=端口号
└── router
└── runtime.properties druid.plaintextPort=端口号

访问 WEB UI

图片6

4.2 集群模式部署

Apache Druid 旨在部署为可扩展、容错的集群。

在这里,我们将设置一个简单的集群,这个简单的集群将具有:

  • 一个Master用于托管 Coordinator 和 Overlord 进程的主服务器;
  • 两个Data运行Historical和 MiddleManager 进程的服务器;
  • 一个Query服务器,托管 Broker 和 Router 进程。

服务规划

主机名druid角色依赖服务
hadoop1Master(Coordinator、Overlord)JDK1.8、zookeeper、Mysql、HDFS
hadoop2Data(Historical、MiddleManager)JDK1.8、zookeeper
hadoop3Query(Broker、Router)、DataJDK1.8、zookeeper

在生产环境,推荐署多个Master和多个Query服务来满足容错。但现在可以快速的使用一个Master、一个Query服务器的方式先完成集群部署,后续添加Master、Query服务器。

前置条件

HDFS、zookeeper和Mysql是可用的

4.2.1 安装 jdk

1
2
3
4
5
6
7
[hadoop@hadoop1 ~]$ tar xf downloads/jdk-8u301-linux-x64.tar.gz
[hadoop@hadoop1 ~]$ vim .bash_profile
export JAVA_HOME=/home/hadoop/jdk1.8.0_301
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib

[hadoop@hadoop1 ~]$ source .bash_profile

4.2.2 准备 druid

1
2
3
[hadoop@hadoop1 ~/downloads]$ wget https://dlcdn.apache.org/druid/0.21.1/apache-druid-0.21.1-bin.tar.gz
[hadoop@hadoop1 ~]$ tar xf apache-druid-0.21.1-bin.tar.gz
[hadoop@hadoop1 ~]$ cd apache-druid-0.21.1/

4.2.3 准备 mysql 账户

1
2
3
4
#创建druid数据库和账户
mysql> CREATE DATABASE druid DEFAULT CHARACTER SET utf8mb4;
mysql> CREATE USER 'druid'@'%' IDENTIFIED BY 'druid';
mysql> GRANT ALL PRIVILEGES ON druid.* TO 'druid'@'%';

4.2.3 druid 配置

4.2.3.1 元数据存储配置

修改 conf/druid/cluster/_common/common.runtime.properties 配置文件,添加mysql相关信息,记得关闭默认的derby配置

1
2
3
4
5
druid.extensions.loadList=["mysql-metadata-storage"]
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://<host>/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd

下载mysql驱动,存储到 extensions/mysql-metadata-storage 目录下

1
[hadoop@hadoop1 ~/apache-druid-0.21.1/extensions/mysql-metadata-storage]$ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
4.2.3.2 深度存储配置

Druid 依赖分布式文件系统或大对象 (blob) 存储来存储数据。最常用的深度存储实现是 S3 和 HDFS 。这里配置为HDFS,同样修改 conf/druid/cluster/_common/common.runtime.properties 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
druid.extensions.loadList=["mysql-metadata-storage", "druid-hdfs-storage"]

druid.host=hadoop3
#druid.storage.type=local
#druid.storage.storageDirectory=var/druid/segments

druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments

#druid.indexer.logs.type=file
#druid.indexer.logs.directory=var/druid/indexing-logs

druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs
4.2.3.3 配置 Zookeeper 连接
1
2
druid.zk.service.host=hadoop1:2181,hadoop2:2181,hadoop3:2181
druid.zk.paths.base=/druid
4.2.3.4 配置连接Hadoop

因为前面深度存储配置为了HDFS,那么此时应该配置 Druid 以了解您的Hadoop集群:

  • 将 Hadoop 配置 XML(core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml)放在 Druid 进程的类路径上。您可以通过将它们复制到conf/druid/cluster/_common/ 目录下来做到这一点。

请注意,若不需要使用 HDFS 深度存储来从 Hadoop 加载数据。例如,如果您的集群在 Amazon Web Services 上运行,我们建议使用 S3 进行深度存储,即使您使用 Hadoop 或 Elastic MapReduce 加载数据。

4.2.4 分发配置

将修改好的druid配置分发到各节点

1
2
[hadoop@hadoop1 ~]$ rsync -av apache-druid-0.21.1 hadoop@hadoop2:~/
[hadoop@hadoop1 ~]$ rsync -av apache-druid-0.21.1 hadoop@hadoop3:~/

注意各节点的 druid.host 主机名,配置为可以通过该主机名访问的节点名称

4.2.5 启动各节点服务

1
2
3
4
5
6
7
8
9
#启动Master
[hadoop@hadoop1 ~/apache-druid-0.21.1]$ ./bin/start-cluster-master-no-zk-server

#启动Data
[hadoop@hadoop2 ~/apache-druid-0.21.1]$ ./bin/start-cluster-data-server
[hadoop@hadoop3 ~/apache-druid-0.21.1]$ ./bin/start-cluster-data-server

#启动Query
[hadoop@hadoop3 ~/apache-druid-0.21.1]$ ./bin/start-cluster-query-server

访问 WEB UI,Query节点8888端口

图片7

4.2.6 druid 各服务默认资源占用

角色默认资源占用
Coordinator-Xms15g -Xmx15g
OverlordXms15g -Xmx15g
middleManagerXms128m -Xmx128m
historical-Xms8g -Xmx8g -XX:MaxDirectMemorySize=13g
broker-Xms12g -Xmx12g -XX:MaxDirectMemorySize=6g
routers-Xms1g -Xmx1g -XX:MaxDirectMemorySize=128m

4.2.7 druid 各服务默认端口

Master Server

  • 1527 (Derby 元数据存储;如果您使用单独的元数据存储,如 MySQL 或 PostgreSQL,则不需要)
  • 2181 (ZooKeeper,如果您使用单独的 ZooKeeper 集群,则不需要)
  • 8081 (Coordinator)
  • 8090 (Overlord)

Data Server

  • 8083 (Historical)
  • 8091, 8100–8199 (Druid Middle Manager)

Query Server

  • 8082 (Broker)
  • 8088 (Router, 如果使用)

4.3 加载示例数据

这里演示一下从本地加载官方的示例数据,大概有以下几种方式

4.3.1 通过控制台

Druid 包包含以下示例本机批处理摄取任务规范quickstart/tutorial/wikipedia-index.json,为方便起见,此处显示,已配置为读取quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz输入文件:

图片8

或者,在Ingestion视图中,单击Tasks旁边的省略号并选择Submit JSON task,粘贴以下内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
{
"type" : "index_parallel",
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"timestampSpec": {
"column": "time",
"format": "iso"
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2015-09-12/2015-09-13"],
"rollup" : false
}
},
"ioConfig" : {
"type" : "index_parallel",
"inputSource" : {
"type" : "local",
"baseDir" : "quickstart/tutorial/",
"filter" : "wikiticker-2015-09-12-sampled.json.gz"
},
"inputFormat" : {
"type": "json"
},
"appendToExisting" : false
},
"tuningConfig" : {
"type" : "index_parallel",
"maxRowsPerSegment" : 5000000,
"maxRowsInMemory" : 25000
}
}
}

该规范创建了一个名为wikipedia的数据源。

4.3.2 通过命令行

1
$ bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://localhost:8081

4.3.3 HTTP 客户端

1
$ curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-index.json http://localhost:8081/druid/indexer/v1/task

4.4 数据查询方式

4.4.1 控制台查询

Druid 控制台包含一个视图,可以更轻松地构建和测试查询以及查看其结果。

控制台进入Query查询视图,在左则选择需要查询的数据源

图片9

展开左窗格中的wikipedia数据源树。我们将为页面维度创建一个查询。

单击page,然后从菜单中单击Show:page

图片10

SELECT 查询出现在查询编辑窗格中并立即运行。但是,在这种情况下,查询不返回任何数据,因为默认情况下查询会过滤最后一天的数据,而我们的数据远比这更旧。让我们移除过滤器。

在数据源树中,单击__time删除过滤器

图片11

单击运行以运行查询。

图片12

默认情况下,由于智能查询限制 功能,控制台中的结果被限制在大约一百个左右。这有助于用户避免无意中运行返回过多数据的查询,这可能会使他们的系统不堪重负。

4.4.2 dsql 查询 SQL

Druid 包包含一个 SQL 命令行客户端,位于bin/dsqlDruid 包根目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[hadoop@hadoop3 ~/apache-druid-0.21.1]$ ./bin/dsql
Welcome to dsql, the command-line client for Druid SQL.
Connected to [http://localhost:8082/].

Type "\h" for help.
dsql> SELECT page, COUNT(*) AS Edits FROM wikipedia WHERE "__time" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY page ORDER BY Edits DESC LIMIT 10;
┌──────────────────────────────────────────────────────────┬───────┐
│ page │ Edits │
├──────────────────────────────────────────────────────────┼───────┤
│ Wikipedia:Vandalismusmeldung │ 33 │
│ User:Cyde/List of candidates for speedy deletion/Subpage │ 28 │
│ Jeremy Corbyn │ 27 │
│ Wikipedia:Administrators' noticeboard/Incidents │ 21 │
│ Flavia Pennetta │ 20 │
│ Total Drama Presents: The Ridonculous Race │ 18 │
│ User talk:Dudeperson176123 │ 18 │
│ Wikipédia:Le Bistro/12 septembre 2015 │ 18 │
│ Wikipedia:In the news/Candidates │ 17 │
│ Wikipedia:Requests for page protection │ 17 │
└──────────────────────────────────────────────────────────┴───────┘
Retrieved 10 rows in 0.09s.

dsql>

4.4.3 HTTP 查询 SQL

可以通过 HTTP 直接向 Druid Broker 提交查询。

Druid 包包括一个示例文件,其中包含上面显示的 SQL 查询quickstart/tutorial/wikipedia-top-pages-sql.json。让我们将该查询提交给 Druid Broker:

1
$ curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/tutorial/wikipedia-top-pages-sql.json http://localhost:8888/druid/v2/sql | jq .

应该返回以下结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
[
{
"page": "Wikipedia:Vandalismusmeldung",
"Edits": 33
},
{
"page": "User:Cyde/List of candidates for speedy deletion/Subpage",
"Edits": 28
},
{
"page": "Jeremy Corbyn",
"Edits": 27
},
{
"page": "Wikipedia:Administrators' noticeboard/Incidents",
"Edits": 21
},
{
"page": "Flavia Pennetta",
"Edits": 20
},
{
"page": "Total Drama Presents: The Ridonculous Race",
"Edits": 18
},
{
"page": "User talk:Dudeperson176123",
"Edits": 18
},
{
"page": "Wikipédia:Le Bistro/12 septembre 2015",
"Edits": 18
},
{
"page": "Wikipedia:In the news/Candidates",
"Edits": 17
},
{
"page": "Wikipedia:Requests for page protection",
"Edits": 17
}
]
-------------本文结束感谢您的阅读-------------