微信搜索bigdata029 | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 订阅本站 | 赞助作者:赞助作者

海量数据实时OLAP分析系统-Druid.io安装配置和体验

Druid lxw1234@qq.com 18059℃ 9评论

关键字:druid、druid.io、实时olap、大数据实时分析

导读:

一、Druid简介
二、Druid架构组成及相关依赖
三、Druid集群配置
四、Druid集群启动
五、Druid查询
六、后记

一、Druid简介

Druid是一个为大型冷数据集上实时探索查询而设计的开源数据分析和存储系统,提供极具成本效益并且永远在线的实时数据摄取和任意数据处理。

主要特性:

  • 为分析而设计——Druid是为OLAP工作流的探索性分析而构建。它支持各种filter、aggregator和查询类型,并为添加新功能提供了一个框架。用户已经利用Druid的基础设施开发了高级K查询和直方图功能。
  • 交互式查询——Druid的低延迟数据摄取架构允许事件在它们创建后毫秒内查询,因为Druid的查询延时通过只读取和扫描有必要的元素被优化。Aggregate和 filter没有坐等结果。
  • 高可用性——Druid是用来支持需要一直在线的SaaS的实现。你的数据在系统更新时依然可用、可查询。规模的扩大和缩小不会造成数据丢失。
  • 可伸缩——现有的Druid部署每天处理数十亿事件和TB级数据。Druid被设计成PB级别。

就系统而言,Druid功能位于PowerDrill和Dremel之间。它实现几乎所有Dremel提供的工具(Dremel处理任意嵌套数据结构,而Druid只允许一个基于数组的嵌套级别)并且从PowerDrill吸收一些有趣的数据格式和压缩方法。

Druid对于需要实时单一、海量数据流摄取产品非常适合。特别是如果你面向无停机操作时,如果你对查询查询的灵活性和原始数据访问要求,高于对速度和无停机操作,Druid可能不是正确的解决方案。在谈到查询速度时候,很有必要澄清“快速”的意思是:Druid是完全有可能在6TB的数据集上实现秒级查询。

二、Druid架构组成及其他依赖

druid.io

2.1 Overlord Node (Indexing Service)

Overlord会形成一个加载批处理和实时数据到系统中的集群,同时会对存储在系统中的数据变更(也称为索引服务)做出响应。另外,还包含了Middle Manager和Peons,一个Peon负责执行单个task,而Middle Manager负责管理这些Peons。

2.2 Coordinator Node

监控Historical节点组,以确保数据可用、可复制,并且在一般的“最佳”配置。它们通过从MySQL读取数据段的元数据信息,来决定哪些数据段应该在集群中被加载,使用Zookeeper来确定哪个Historical节点存在,并且创建Zookeeper条目告诉Historical节点加载和删除新数据段。

2.3 Historical Node

是对“historical”数据(非实时)进行处理存储和查询的地方。Historical节点响应从Broker节点发来的查询,并将结果返回给broker节点。它们在Zookeeper的管理下提供服务,并使用Zookeeper监视信号加载或删除新数据段。

2.4 Broker Node

接收来自外部客户端的查询,并将这些查询转发到Realtime和Historical节点。当Broker节点收到结果,它们将合并这些结果并将它们返回给调用者。由于了解拓扑,Broker节点使用Zookeeper来确定哪些Realtime和Historical节点的存在。

2.5 Real-time Node

实时摄取数据,它们负责监听输入数据流并让其在内部的Druid系统立即获取,Realtime节点同样只响应broker节点的查询请求,返回查询结果到broker节点。旧数据会被从Realtime节点转存至Historical节点。

2.6 ZooKeeper

为集群服务发现和维持当前的数据拓扑而服务;

2.7 MySQL

用来维持系统服务所需的数据段的元数据;

2.8 Deep Storage

保存“冷数据”,可以使用HDFS。

 

drui.io

三、Druid集群配置

3.1 环境信息

我这里有两台机器,node1有32G内存,上面部署了Histotical Node和Coordinator Node;node2有72G内存,上面部署了其他四个服务。

druid.io

3.2 通用配置(Common Configuration)

##创建MySQL数据库

CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
grant all on druid.* to druid@’%’ identified by ‘druid1234′ WITH GRANT OPTION;
flush privileges;

##配置文件

cd $DRUID_HOME/config/_common
vi common.runtime.properties(所有节点)

##使用Mysql存储元数据
druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight", "io.druid.extensions:mysql-metadata-storage","io.druid.extensions:druid-hdfs-storage"]

##zookeeper
druid.zk.service.host=zkNode1:2181,zkNode2:2181,zkNode3:2181

##Mysql配置
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://node1:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd1234

##配置deep storage到HDFS
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://cdh5/tmp/druid/storage

##配置查询缓存,暂用本地,可配置memcached
druid.cache.type=local
druid.cache.sizeInBytes=10737418240

##配置监控
druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"]

##配置Indexing service的名字
druid.selectors.indexing.serviceName=druid/overlord

##
druid.emitter=logging

3.3 Overlord Node(Indexing Service)

在运行Overlord Node节点上:

cd $DRUID_HOME/config/overlord
vi runtime.properties

druid.host=node2
druid.port=8090
druid.service=druid/overlord

# Only required if you are autoscaling middle managers
druid.indexer.autoscale.doAutoscale=true
druid.indexer.autoscale.strategy=ec2
druid.indexer.autoscale.workerIdleTimeout=PT90m
druid.indexer.autoscale.terminatePeriod=PT5M
druid.indexer.autoscale.workerVersion=0

# Upload all task logs to deep storage
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog

# Run in remote mode
druid.indexer.runner.type=remote
druid.indexer.runner.minWorkerVersion=0

# Store all task state in the metadata storage
druid.indexer.storage.type=metadata

3.4 MiddleManager Node

在运行MiddleManager Node节点上:
cd $DRUID_HOME/config/middleManager
vi runtime.properties

druid.host=node2
druid.port=8091
druid.service=druid/middlemanager

druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog

# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
druid.indexer.task.baseTaskDir=/tmp/persistent/task/

3.5 Coordinator Node

在运行Coordinator Node节点上:
cd $DRUID_HOME/config/coordinator
vi runtime.properties

druid.host=node1
druid.port=8081
druid.service=coordinator

druid.coordinator.startDelay=PT5M

3.6 Historical Node

在运行Historical Node节点上:
cd $DRUID_HOME/config/historical
vi runtime.properties

druid.host=node1
druid.port=8082
druid.service=druid/historical

druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true

druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=9

druid.server.http.numThreads=9
druid.server.maxSize=300000000000

druid.segmentCache.locations=[{"path": " /tmp/druid/indexCache", "maxSize": 300000000000}]

druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]

3.7 Broker Node

在运行Broker Node节点上:
cd $DRUID_HOME/config/broker
vi runtime.properties

druid.host=node2
druid.port=8092
druid.service=druid/broker

druid.broker.http.numConnections=20
druid.broker.http.readTimeout=PT5M

druid.processing.buffer.sizeBytes=2147483647
druid.processing.numThreads=11

druid.server.http.numThreads=20

3.8 Real-time Node

在运行Real-time Node节点上:
cd $DRUID_HOME/config/realtime
vi runtime.properties

druid.host=node2
druid.port=8093
druid.service=druid/realtime

druid.processing.buffer.sizeBytes=1073741824
druid.processing.numThreads=5

# Override emitter to print logs about events ingested, rejected, etc
druid.emitter=logging

druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor", "com.metamx.metrics.JvmMonitor"]

四、Druid集群启动

首次启动时候,可以遵循下面的启动顺序。

4.1 Broker Node

cd $DRUID_HOME/
cp run_druid_server.sh run_broker.sh
vi run_broker.sh

替换以下内容:

SERVER_TYPE=broker

# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms5g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=24g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17071 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Ddruid.extensions.localRepository=${MAVEN_DIR}"

执行./run_broker.sh启动Broker Node:

druid.io

4.2 Historical Node

cd $DRUID_HOME/
cp run_druid_server.sh run_historical.sh

vi run_historical.sh

替换以下内容:

SERVER_TYPE=historical

# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=16g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

执行命令./run_historical.sh启动Historical Node:

druid

4.3 Coordinator Node

cd $DRUID_HOME/
cp run_druid_server.sh run_coordinator.sh
vi run_coordinator.sh

替换以下内容:

SERVER_TYPE=coordinator

# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

执行命令./run_coordinator.sh启动Coordinator Node.

4.4 Middle Manager

cd $DRUID_HOME/
cp run_druid_server.sh run_middleManager.sh
vi run_middleManager.sh

替换以下内容:

SERVER_TYPE=middleManager
# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid -Ddruid.extensions.localR
epository=${MAVEN_DIR}"

执行命令./run_middleManager.sh启动MiddleManager Node。

4.5 Overlord Node

cd $DRUID_HOME/
cp run_druid_server.sh run_overlord.sh
vi run_overlord.sh

替换以下内容:

SERVER_TYPE=overlord
# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx4g -Xms4g -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

执行命令./run_overlord.sh启动Overlord Node:

druid.io

4.6 Real-time Node

cd $DRUID_HOME/
cp run_druid_server.sh run_realtime.sh
vi run_realtime.sh
替换以下内容:

SERVER_TYPE=realtime

# start process
JAVA_ARGS="${JAVA_ARGS} -Xmx13g -Xms13g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=9g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -
XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError"
JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec"
JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid"
JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17072 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremot
e.ssl=false"
JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"

##特别需要注意参数:

-Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec

启动RealTime Node需要指定一个realtime数据源的配置文件,本文中使用example提供的wikipedia_realtime.spec,启动后,该数据源从irc.wikimedia.org获取实时数据。

关于RealTime Node的配置,后续文章将会详细介绍。

执行命令./run_realtime.sh启动RealTime Node。

五、Druid查询

第四部分中启动RealTime Node时候使用了例子中自带的配置文件wikipedia_realtime.spec,启动后,该RealTime Node会从irc.wikimedia.org获取实时数据,本章将以该数据源为例,学习几种最常见的查询。

5.1 select查询

首先编辑查询配置文件select_query.json

{
   "queryType": "select",
   "dataSource": "wikipedia",
   "dimensions":[],
   "metrics":[],
   "granularity": "all",
   "intervals": [
     "2015-11-01/2015-11-20"
   ],
   "pagingSpec":{"pagingIdentifiers": {}, "threshold":10}
 }

该配置文件的含义是从数据源”wikipedia”进行select查询所有列,时间区间为2015-11-01/2015-11-20,每10条记录一个分页。

执行命令查询:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @select_query.json

瞬间返回结果:

druid.io

5.2 基于时间序列的查询Timeseries query

编辑查询配置文件timeseries.json

{
    "queryType": "timeseries",
    "dataSource": "wikipedia",
    "intervals": [ "2010-01-01/2020-01-01" ],
    "granularity": "minute",
    "aggregations": [
        {"type": "longSum", "fieldName": "count", "name": "edit_count"},
        {"type": "doubleSum", "fieldName": "added", "name": "chars_added"}
    ]
}

该配置文件的含义是:从数据源” wikipedia”中进行时间序列查询,区间为2010-01-01/2020-01-01,按分钟汇总结果,汇总字段为count和added;

执行查询命令:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @timeseries.json

同样瞬间返回结果:

druid.io

5.3 TopN查询

编辑查询文件topn.json

{
  "queryType": "topN",
  "dataSource": "wikipedia",
  "granularity": "all",
  "dimension": "page",
  "metric": "edit_count",
  "threshold" : 10,
  "aggregations": [
    {"type": "longSum", "fieldName": "count", "name": "edit_count"}
  ],
  "filter": { "type": "selector", "dimension": "country", "value": "United States" },
  "intervals": ["2012-10-01T00:00/2020-01-01T00"]
}

该文件含义是:从数据源” wikipedia”进行TopN查询,其中N=10,维度为page,指标为edit_count,也就是,在page维度上将edit_count汇总后取Top 10.

执行查询命令:

curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @topn.json

结果为:

druid.io

六、后记

Druid目前已经有很多公司用于实时计算和实时OLAP,而且效果很好。虽然它的配置和查询都比较复杂和繁琐,但如果是真正基于海量数据的实时OLAP,它的威力还是很强大的。我将持续学习和分享Druid的相关技术,验证它在海量数据实时OLAP上的效果,敬请关注我的博客

参考文章:

http://druid.io

http://www.csdn.net/article/2014-10-30/2822381/2

您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » 海量数据实时OLAP分析系统-Druid.io安装配置和体验

喜欢 (21)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(9)个小伙伴在吐槽
  1. 您说在realtime节点中通过wikipedia_realtime.spec向irc.wikimedia.org取实时数据,我想问在何处指定了是从irc.wikimedia.org取?因为druid自带的wikipedia_realtime.spec文件中并没有指定irc.wikimedia.org这个地址。
    ldeng762016-02-03 01:28 回复
    • 我用的druid-0.8.1中druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec有下面的配置项: "ioConfig": { "type": "realtime", "firehose": { "type": "irc", "host": "irc.wikimedia.org", "channels": [ "#en.wikipedia", "#fr.wikipedia", "#de.wikipedia", "#ja.wikipedia" ] },
      lxw1234@qq.com2016-02-03 08:22 回复
    • 非常感谢您的回答,这个从irc.wikimedia.org取数据的测试我了通过。但对于导入本地的数据还没搞通,按照你的HadoopDruidIndexer的方式导入wikipedia_data.json的数据总是报错: 1) Not enough direct memory. Please adjust -XX:MaxDirectMemorySize, druid.processing.buffer.sizeBytes, or druid.processing.numThreads: maxDirectMemory[477,102,080], memoryNeeded[2,147,483,648] = druid.processing.buffer.sizeBytes[1,073,741,824] * ( druid.processing.numThreads[1] + 1 ) 不知您遇到过这问题没有?
      ldeng762016-02-03 17:14 回复
  2. 直接在druid 0.8.3上折腾好痛苦,遇到了好多坑。幸好找到了imply.io(是druid的华人开发者创办的公司),它提供了一键安装druid,好爽,半小时就搞定了:安装->导入数据->查询数据->通过pivot可视化查看数据,强烈建议初学者用imply来使用druid。高手请飘过。
    ldeng762016-02-03 22:37 回复
    • 你好 ,能留一下联系方式吗。我想向您请教一些问题。我的QQ是1804686627.谢谢
      FollowYan2016-03-22 16:41 回复
  3. 请问下,我在index一个数据源之后,如何更新数据源中的数据,包含删除、编辑、更新
    lion2016-02-25 09:31 回复
  4. 您好,我想问下,0.9版本后conf目录下就没有realtime的子文件夹了,那么realtime节点是怎么启动的呢?官网的demo中也只给了其他几个节点的配置与启动
    异想天开de2016-07-12 10:52 回复
    • https://groups.google.com/forum/#!topic/druid-user/Dj7x7iSj4YY 作者有回,0.9 以后都放在了 middleManager 处理实时数据
      栈木头2017-04-12 18:23 回复
  5. 我去,将近百G的内存
    阿飞2016-09-08 17:51 回复