关键字:druid、druid.io、实时olap、大数据实时分析
导读:
一、Druid简介
二、Druid架构组成及相关依赖
三、Druid集群配置
四、Druid集群启动
五、Druid查询
六、后记
一、Druid简介
Druid是一个为大型冷数据集上实时探索查询而设计的开源数据分析和存储系统,提供极具成本效益并且永远在线的实时数据摄取和任意数据处理。
主要特性:
- 为分析而设计——Druid是为OLAP工作流的探索性分析而构建。它支持各种filter、aggregator和查询类型,并为添加新功能提供了一个框架。用户已经利用Druid的基础设施开发了高级K查询和直方图功能。
- 交互式查询——Druid的低延迟数据摄取架构允许事件在它们创建后毫秒内查询,因为Druid的查询延时通过只读取和扫描有必要的元素被优化。Aggregate和 filter没有坐等结果。
- 高可用性——Druid是用来支持需要一直在线的SaaS的实现。你的数据在系统更新时依然可用、可查询。规模的扩大和缩小不会造成数据丢失。
- 可伸缩——现有的Druid部署每天处理数十亿事件和TB级数据。Druid被设计成PB级别。
就系统而言,Druid功能位于PowerDrill和Dremel之间。它实现几乎所有Dremel提供的工具(Dremel处理任意嵌套数据结构,而Druid只允许一个基于数组的嵌套级别)并且从PowerDrill吸收一些有趣的数据格式和压缩方法。
Druid对于需要实时单一、海量数据流摄取产品非常适合。特别是如果你面向无停机操作时,如果你对查询查询的灵活性和原始数据访问要求,高于对速度和无停机操作,Druid可能不是正确的解决方案。在谈到查询速度时候,很有必要澄清“快速”的意思是:Druid是完全有可能在6TB的数据集上实现秒级查询。
二、Druid架构组成及其他依赖
2.1 Overlord Node (Indexing Service)
Overlord会形成一个加载批处理和实时数据到系统中的集群,同时会对存储在系统中的数据变更(也称为索引服务)做出响应。另外,还包含了Middle Manager和Peons,一个Peon负责执行单个task,而Middle Manager负责管理这些Peons。
2.2 Coordinator Node
监控Historical节点组,以确保数据可用、可复制,并且在一般的“最佳”配置。它们通过从MySQL读取数据段的元数据信息,来决定哪些数据段应该在集群中被加载,使用Zookeeper来确定哪个Historical节点存在,并且创建Zookeeper条目告诉Historical节点加载和删除新数据段。
2.3 Historical Node
是对“historical”数据(非实时)进行处理存储和查询的地方。Historical节点响应从Broker节点发来的查询,并将结果返回给broker节点。它们在Zookeeper的管理下提供服务,并使用Zookeeper监视信号加载或删除新数据段。
2.4 Broker Node
接收来自外部客户端的查询,并将这些查询转发到Realtime和Historical节点。当Broker节点收到结果,它们将合并这些结果并将它们返回给调用者。由于了解拓扑,Broker节点使用Zookeeper来确定哪些Realtime和Historical节点的存在。
2.5 Real-time Node
实时摄取数据,它们负责监听输入数据流并让其在内部的Druid系统立即获取,Realtime节点同样只响应broker节点的查询请求,返回查询结果到broker节点。旧数据会被从Realtime节点转存至Historical节点。
2.6 ZooKeeper
为集群服务发现和维持当前的数据拓扑而服务;
2.7 MySQL
用来维持系统服务所需的数据段的元数据;
2.8 Deep Storage
保存“冷数据”,可以使用HDFS。
三、Druid集群配置
3.1 环境信息
我这里有两台机器,node1有32G内存,上面部署了Histotical Node和Coordinator Node;node2有72G内存,上面部署了其他四个服务。
3.2 通用配置(Common Configuration)
##创建MySQL数据库
CREATE DATABASE `druid` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
grant all on druid.* to druid@’%’ identified by ‘druid1234′ WITH GRANT OPTION;
flush privileges;
##配置文件
cd $DRUID_HOME/config/_common
vi common.runtime.properties(所有节点)
##使用Mysql存储元数据 druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight", "io.druid.extensions:mysql-metadata-storage","io.druid.extensions:druid-hdfs-storage"] ##zookeeper druid.zk.service.host=zkNode1:2181,zkNode2:2181,zkNode3:2181 ##Mysql配置 druid.metadata.storage.type=mysql druid.metadata.storage.connector.connectURI=jdbc:mysql://node1:3306/druid druid.metadata.storage.connector.user=druid druid.metadata.storage.connector.password=diurd1234 ##配置deep storage到HDFS druid.storage.type=hdfs druid.storage.storageDirectory=hdfs://cdh5/tmp/druid/storage ##配置查询缓存,暂用本地,可配置memcached druid.cache.type=local druid.cache.sizeInBytes=10737418240 ##配置监控 druid.monitoring.monitors=["com.metamx.metrics.JvmMonitor"] ##配置Indexing service的名字 druid.selectors.indexing.serviceName=druid/overlord ## druid.emitter=logging
3.3 Overlord Node(Indexing Service)
在运行Overlord Node节点上:
cd $DRUID_HOME/config/overlord
vi runtime.properties
druid.host=node2 druid.port=8090 druid.service=druid/overlord # Only required if you are autoscaling middle managers druid.indexer.autoscale.doAutoscale=true druid.indexer.autoscale.strategy=ec2 druid.indexer.autoscale.workerIdleTimeout=PT90m druid.indexer.autoscale.terminatePeriod=PT5M druid.indexer.autoscale.workerVersion=0 # Upload all task logs to deep storage druid.indexer.logs.type=hdfs druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog # Run in remote mode druid.indexer.runner.type=remote druid.indexer.runner.minWorkerVersion=0 # Store all task state in the metadata storage druid.indexer.storage.type=metadata
3.4 MiddleManager Node
在运行MiddleManager Node节点上:
cd $DRUID_HOME/config/middleManager
vi runtime.properties
druid.host=node2 druid.port=8091 druid.service=druid/middlemanager druid.indexer.logs.type=hdfs druid.indexer.logs.directory=hdfs://cdh5/tmp/druid/indexlog # Resources for peons druid.indexer.runner.javaOpts=-server -Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps druid.indexer.task.baseTaskDir=/tmp/persistent/task/
3.5 Coordinator Node
在运行Coordinator Node节点上:
cd $DRUID_HOME/config/coordinator
vi runtime.properties
druid.host=node1 druid.port=8081 druid.service=coordinator druid.coordinator.startDelay=PT5M
3.6 Historical Node
在运行Historical Node节点上:
cd $DRUID_HOME/config/historical
vi runtime.properties
druid.host=node1 druid.port=8082 druid.service=druid/historical druid.historical.cache.useCache=true druid.historical.cache.populateCache=true druid.processing.buffer.sizeBytes=1073741824 druid.processing.numThreads=9 druid.server.http.numThreads=9 druid.server.maxSize=300000000000 druid.segmentCache.locations=[{"path": " /tmp/druid/indexCache", "maxSize": 300000000000}] druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]
3.7 Broker Node
在运行Broker Node节点上:
cd $DRUID_HOME/config/broker
vi runtime.properties
druid.host=node2 druid.port=8092 druid.service=druid/broker druid.broker.http.numConnections=20 druid.broker.http.readTimeout=PT5M druid.processing.buffer.sizeBytes=2147483647 druid.processing.numThreads=11 druid.server.http.numThreads=20
3.8 Real-time Node
在运行Real-time Node节点上:
cd $DRUID_HOME/config/realtime
vi runtime.properties
druid.host=node2 druid.port=8093 druid.service=druid/realtime druid.processing.buffer.sizeBytes=1073741824 druid.processing.numThreads=5 # Override emitter to print logs about events ingested, rejected, etc druid.emitter=logging druid.monitoring.monitors=["io.druid.segment.realtime.RealtimeMetricsMonitor", "com.metamx.metrics.JvmMonitor"]
四、Druid集群启动
首次启动时候,可以遵循下面的启动顺序。
4.1 Broker Node
cd $DRUID_HOME/
cp run_druid_server.sh run_broker.sh
vi run_broker.sh
替换以下内容:
SERVER_TYPE=broker # start process JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms5g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=24g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17071 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Ddruid.extensions.localRepository=${MAVEN_DIR}"
执行./run_broker.sh启动Broker Node:
4.2 Historical Node
cd $DRUID_HOME/
cp run_druid_server.sh run_historical.sh
vi run_historical.sh
替换以下内容:
SERVER_TYPE=historical # start process JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=16g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"
执行命令./run_historical.sh启动Historical Node:
4.3 Coordinator Node
cd $DRUID_HOME/
cp run_druid_server.sh run_coordinator.sh
vi run_coordinator.sh
替换以下内容:
SERVER_TYPE=coordinator # start process JAVA_ARGS="${JAVA_ARGS} -Xmx10g -Xms10g -XX:NewSize=512m -XX:MaxNewSize=512m -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"
执行命令./run_coordinator.sh启动Coordinator Node.
4.4 Middle Manager
cd $DRUID_HOME/
cp run_druid_server.sh run_middleManager.sh
vi run_middleManager.sh
替换以下内容:
SERVER_TYPE=middleManager # start process JAVA_ARGS="${JAVA_ARGS} -Xmx64m -Xms64m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid -Ddruid.extensions.localR epository=${MAVEN_DIR}"
执行命令./run_middleManager.sh启动MiddleManager Node。
4.5 Overlord Node
cd $DRUID_HOME/
cp run_druid_server.sh run_overlord.sh
vi run_overlord.sh
替换以下内容:
SERVER_TYPE=overlord # start process JAVA_ARGS="${JAVA_ARGS} -Xmx4g -Xms4g -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"
执行命令./run_overlord.sh启动Overlord Node:
4.6 Real-time Node
cd $DRUID_HOME/
cp run_druid_server.sh run_realtime.sh
vi run_realtime.sh
替换以下内容:
SERVER_TYPE=realtime # start process JAVA_ARGS="${JAVA_ARGS} -Xmx13g -Xms13g -XX:NewSize=2g -XX:MaxNewSize=2g -XX:MaxDirectMemorySize=9g -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails - XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError" JAVA_ARGS="${JAVA_ARGS} -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8" JAVA_ARGS="${JAVA_ARGS} -Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec" JAVA_ARGS="${JAVA_ARGS} -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -Djava.io.tmpdir=/tmp/druid" JAVA_ARGS="${JAVA_ARGS} -Dcom.sun.management.jmxremote.port=17072 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremot e.ssl=false" JAVA_ARGS="${JAVA_ARGS} -Ddruid.extensions.localRepository=${MAVEN_DIR}"
##特别需要注意参数:
-Ddruid.realtime.specFile=/home/liuxiaowen/druid-0.8.1/examples/wikipedia/wikipedia_realtime.spec
启动RealTime Node需要指定一个realtime数据源的配置文件,本文中使用example提供的wikipedia_realtime.spec,启动后,该数据源从irc.wikimedia.org获取实时数据。
关于RealTime Node的配置,后续文章将会详细介绍。
执行命令./run_realtime.sh启动RealTime Node。
五、Druid查询
第四部分中启动RealTime Node时候使用了例子中自带的配置文件wikipedia_realtime.spec,启动后,该RealTime Node会从irc.wikimedia.org获取实时数据,本章将以该数据源为例,学习几种最常见的查询。
5.1 select查询
首先编辑查询配置文件select_query.json
{ "queryType": "select", "dataSource": "wikipedia", "dimensions":[], "metrics":[], "granularity": "all", "intervals": [ "2015-11-01/2015-11-20" ], "pagingSpec":{"pagingIdentifiers": {}, "threshold":10} }
该配置文件的含义是从数据源”wikipedia”进行select查询所有列,时间区间为2015-11-01/2015-11-20,每10条记录一个分页。
执行命令查询:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @select_query.json
瞬间返回结果:
5.2 基于时间序列的查询Timeseries query
编辑查询配置文件timeseries.json
{ "queryType": "timeseries", "dataSource": "wikipedia", "intervals": [ "2010-01-01/2020-01-01" ], "granularity": "minute", "aggregations": [ {"type": "longSum", "fieldName": "count", "name": "edit_count"}, {"type": "doubleSum", "fieldName": "added", "name": "chars_added"} ] }
该配置文件的含义是:从数据源” wikipedia”中进行时间序列查询,区间为2010-01-01/2020-01-01,按分钟汇总结果,汇总字段为count和added;
执行查询命令:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @timeseries.json
同样瞬间返回结果:
5.3 TopN查询
编辑查询文件topn.json
{ "queryType": "topN", "dataSource": "wikipedia", "granularity": "all", "dimension": "page", "metric": "edit_count", "threshold" : 10, "aggregations": [ {"type": "longSum", "fieldName": "count", "name": "edit_count"} ], "filter": { "type": "selector", "dimension": "country", "value": "United States" }, "intervals": ["2012-10-01T00:00/2020-01-01T00"] }
该文件含义是:从数据源” wikipedia”进行TopN查询,其中N=10,维度为page,指标为edit_count,也就是,在page维度上将edit_count汇总后取Top 10.
执行查询命令:
curl -X POST ‘http://node2:8093/druid/v2/?pretty’ -H ‘content-type: application/json’ -d @topn.json
结果为:
六、后记
Druid目前已经有很多公司用于实时计算和实时OLAP,而且效果很好。虽然它的配置和查询都比较复杂和繁琐,但如果是真正基于海量数据的实时OLAP,它的威力还是很强大的。我将持续学习和分享Druid的相关技术,验证它在海量数据实时OLAP上的效果,敬请关注我的博客。
参考文章:
http://druid.io
http://www.csdn.net/article/2014-10-30/2822381/2
您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。
如果觉得本博客对您有帮助,请 赞助作者 。