关键字:druid,HadoopDruidIndexer,Batch Data Ingestion
有两种方式将批量数据加载到Druid集群中,使用Indexing Service或者HadoopDruidIndexer。
本文介绍如何使用HadoopDruidIndexer向Druid集群中加载批量数据。
如何选择批量数据的加载方式
Indexing Service作为Druid集群的一部分,运行在多个节点上,可以完成一些不同类型的Indexing任务,Indexing Service里面包含了元数据存储和管理的功能,启动后,外部系统可以以编程方式来与之互相协作,完成周期性的Indexing任务。
HadoopDruidIndexer通过运行Hadoop job来完成segments的分隔和索引,如果已经有Hadoop平台,大量数据的Indexing任务,通过HadoopDruidIndexer来完成会更加效率。
使用HadoopDruidIndexer加载批量数据
使用HadoopDruidIndexer加载批量数据时,会启动MapReduce任务,将数据生成segments文件,存放在HDFS上,同时向Druid metastore中写入segments元数据。
Coordinator Node监控元数据中有新增的segments,会将指令写入Zookeeper,而Historical Node监控到Zookeeper中的指令之后,从HDFS上下载segments文件到本地。
之后,该批量数据便可从Druid中查询。
特别注意:由于DeepStorage使用了HDFS,因此_common配置文件common.runtime.properties中的druid.extensions.coordinates需要加入:io.druid.extensions:druid-hdfs-storage,特别是Historical Node和Coordinator Node需要使用这个,如果之前没有加,这两个服务需要重启。
另外,在运行过程中如果有Hadoop相关的类找不到,则需要将相关jar包加入CLASSPATH。
我这里采用了简单粗暴的做法:在启动Historical Node和Coordinator Node时候,将HADOOP_CLASSPATH全部加入,即在run_historical.sh和run_coordinator.sh中增加:DRUID_CP=”${DRUID_CP}:`hadoop classpath`”
注:运行这两个Node的机器上装有Hadoop Client,并配置好了环境变量。
HadoopDruidIndexer配置文件
需要编写一个spec文件,用于配置数据源及相关信息。
我的源数据存储在HDFS/hivedata/warehouse/liuxiaowen.db/lxw_drui_testdata/目录下,数据有三个字段,timestamp、cookieid、ip
vi hadoopindex.spec
{ "dataSchema": { "dataSource": "lxw1234", "parser": { "type": "string", "parseSpec": { "format": "csv", "timestampSpec": { "column": "timestamp", "format" : "yyyy-MM-dd HH:mm:ss" }, "dimensionsSpec": { "dimensions": [ "cookieid", "ip" ], "dimensionExclusions": [], "spatialDimensions": [] }, "listDelimiter" : ",", "columns" : ["timestamp","cookieid","ip"] } }, "metricsSpec": [ { "type": "count", "name": "count" } ], "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": "NONE", "intervals" : ["2015-11-01/2015-12-01"] } }, "ioConfig": { "type": "hadoop", "inputSpec" : { "type" : "static", "paths" : "hdfs://cdh5/hivedata/warehouse/liuxiaowen.db/lxw_drui_testdata/000000_0,hdfs://cdh5/hivedata/warehouse/liuxiaowen.db/lxw_drui_testdata/000001_0" }, "metadataUpdateSpec" : { "type":"mysql", "connectURI" : "jdbc:mysql:\/\/127.0.0.102:3306\/druid", "password" : "druid", "segmentTable" : "druid_segments", "user" : "druid" }, "segmentOutputPath" : "hdfs:\/\/cdh5\/tmp\/data\/index\/output" }, "tuningConfig" : { "type" : "hadoop", "workingPath": "\/tmp\/druid", "partitionsSpec" : { "type" : "dimension", "partitionDimension" : null, "targetPartitionSize" : 5000000, "maxPartitionSize" : 7500000, "assumeGrouped" : false, "numShards" : -1 }, "shardSpecs" : { }, "leaveIntermediate" : false, "cleanupOnFailure" : true, "overwriteFiles" : false, "ignoreInvalidRows" : false, "jobProperties" : { }, "combineText" : false, "persistInHeap" : false, "ingestOffheap" : false, "bufferSize" : 134217728, "aggregationBufferRatio" : 0.5, "rowFlushBoundary" : 300000 } }
运行HadoopDruidIndexer MapReduce任务
使用下面的命令,运行HadoopDruidIndexer MapReduce任务:
java -Xmx256m -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8 -classpath /home/liuxiaowen/druid-0.8.1/config/_common/common.runtime.properties:\ /home/liuxiaowen/druid-0.8.1/lib/*:/etc/hadoop/conf \ io.druid.cli.Main index hadoop /home/liuxiaowen/druid-0.8.1/hadoopindex.spec
MapReduce完成后,将新的segments写入元数据MySQL:
Coordinator Node从元数据中发现新的segments,给Historaical发送Load指令:
Historical Node监测到指令,Load新的segments:
查询加载的数据
编写查询配置文件,按天统计记录数:
vi timeseries2.json
{ "queryType": "timeseries", "dataSource": "lxw1234", "intervals": [ "2015-11-15/2015-11-18" ], "granularity": "day", "aggregations": [ {"type": "longSum", "fieldName": "count", "name": "total_count"} ] }
执行查询(这里指定的是Broker Node的地址):
curl -X POST 'http://node2:8092/druid/v2/?pretty' -H 'content-type: application/json' -d @tieries2.json
有一个问题,还是时间格式的问题,显示的仍然是UTC时间,待解决。
接下来将学习Druid提供的各种查询类型。
您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。
如果觉得本博客对您有帮助,请 赞助作者 。
转载请注明:lxw的大数据田地 » 使用HadoopDruidIndexer向Druid集群中加载批量数据-Batch Data Ingestion