微信搜索lxw1234bigdata | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

使用HadoopDruidIndexer向Druid集群中加载批量数据-Batch Data Ingestion

Druid lxw1234@qq.com 20785℃ 3评论

关键字:druid,HadoopDruidIndexer,Batch Data Ingestion

有两种方式将批量数据加载到Druid集群中,使用Indexing Service或者HadoopDruidIndexer。

本文介绍如何使用HadoopDruidIndexer向Druid集群中加载批量数据。

如何选择批量数据的加载方式

Indexing Service作为Druid集群的一部分,运行在多个节点上,可以完成一些不同类型的Indexing任务,Indexing Service里面包含了元数据存储和管理的功能,启动后,外部系统可以以编程方式来与之互相协作,完成周期性的Indexing任务。

HadoopDruidIndexer通过运行Hadoop job来完成segments的分隔和索引,如果已经有Hadoop平台,大量数据的Indexing任务,通过HadoopDruidIndexer来完成会更加效率。

使用HadoopDruidIndexer加载批量数据

使用HadoopDruidIndexer加载批量数据时,会启动MapReduce任务,将数据生成segments文件,存放在HDFS上,同时向Druid metastore中写入segments元数据。

Coordinator Node监控元数据中有新增的segments,会将指令写入Zookeeper,而Historical Node监控到Zookeeper中的指令之后,从HDFS上下载segments文件到本地。

之后,该批量数据便可从Druid中查询。

特别注意:由于DeepStorage使用了HDFS,因此_common配置文件common.runtime.properties中的druid.extensions.coordinates需要加入:io.druid.extensions:druid-hdfs-storage,特别是Historical Node和Coordinator Node需要使用这个,如果之前没有加,这两个服务需要重启。

另外,在运行过程中如果有Hadoop相关的类找不到,则需要将相关jar包加入CLASSPATH。

我这里采用了简单粗暴的做法:在启动Historical Node和Coordinator Node时候,将HADOOP_CLASSPATH全部加入,即在run_historical.sh和run_coordinator.sh中增加:DRUID_CP=”${DRUID_CP}:`hadoop classpath`”

注:运行这两个Node的机器上装有Hadoop Client,并配置好了环境变量。

HadoopDruidIndexer配置文件

需要编写一个spec文件,用于配置数据源及相关信息。

我的源数据存储在HDFS/hivedata/warehouse/liuxiaowen.db/lxw_drui_testdata/目录下,数据有三个字段,timestamp、cookieid、ip

vi hadoopindex.spec

{
  "dataSchema": {
    "dataSource": "lxw1234",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "csv",
        "timestampSpec": {
          "column": "timestamp",
          "format" : "yyyy-MM-dd HH:mm:ss"
        },
        "dimensionsSpec": {
          "dimensions": [
            "cookieid",
            "ip"
          ],
          "dimensionExclusions": [],
          "spatialDimensions": []
        },
        "listDelimiter" : ",",
        "columns" : ["timestamp","cookieid","ip"]
      }
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "intervals" : ["2015-11-01/2015-12-01"]
    }
  },
  "ioConfig": {
    "type": "hadoop",
    "inputSpec" : {
      "type" : "static",
      "paths" : "hdfs://cdh5/hivedata/warehouse/liuxiaowen.db/lxw_drui_testdata/000000_0,hdfs://cdh5/hivedata/warehouse/liuxiaowen.db/lxw_drui_testdata/000001_0"
    },
    "metadataUpdateSpec" : {
      "type":"mysql",
      "connectURI" : "jdbc:mysql:\/\/127.0.0.102:3306\/druid",
      "password" : "druid",
      "segmentTable" : "druid_segments",
      "user" : "druid"
    },
    "segmentOutputPath" : "hdfs:\/\/cdh5\/tmp\/data\/index\/output"
  },
  "tuningConfig" : {
    "type" : "hadoop",
    "workingPath": "\/tmp\/druid",
    "partitionsSpec" : {
      "type" : "dimension",
      "partitionDimension" : null,
      "targetPartitionSize" : 5000000,
      "maxPartitionSize" : 7500000,
      "assumeGrouped" : false,
      "numShards" : -1
    },
    "shardSpecs" : { },
    "leaveIntermediate" : false,
    "cleanupOnFailure" : true,
    "overwriteFiles" : false,
    "ignoreInvalidRows" : false,
    "jobProperties" : { },
    "combineText" : false,
    "persistInHeap" : false,
    "ingestOffheap" : false,
    "bufferSize" : 134217728,
    "aggregationBufferRatio" : 0.5,
    "rowFlushBoundary" : 300000
  }
}

运行HadoopDruidIndexer MapReduce任务

使用下面的命令,运行HadoopDruidIndexer MapReduce任务:

java -Xmx256m -Duser.timezone=GMT+8 -Dfile.encoding=UTF-8 -classpath /home/liuxiaowen/druid-0.8.1/config/_common/common.runtime.properties:\
/home/liuxiaowen/druid-0.8.1/lib/*:/etc/hadoop/conf \
io.druid.cli.Main index hadoop /home/liuxiaowen/druid-0.8.1/hadoopindex.spec

MapReduce完成后,将新的segments写入元数据MySQL:

druid.io

druid.io

Coordinator Node从元数据中发现新的segments,给Historaical发送Load指令:

druid.io

Historical Node监测到指令,Load新的segments:

druid.io

查询加载的数据

编写查询配置文件,按天统计记录数:

vi timeseries2.json

{
    "queryType": "timeseries", 
    "dataSource": "lxw1234", 
    "intervals": [ "2015-11-15/2015-11-18" ], 
    "granularity": "day", 
    "aggregations": [
        {"type": "longSum", "fieldName": "count", "name": "total_count"}
    ]
}

执行查询(这里指定的是Broker Node的地址):

curl -X POST 'http://node2:8092/druid/v2/?pretty' -H 'content-type: application/json'  -d @tieries2.json

druid.io

有一个问题,还是时间格式的问题,显示的仍然是UTC时间,待解决。

接下来将学习Druid提供的各种查询类型。

您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。

 

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » 使用HadoopDruidIndexer向Druid集群中加载批量数据-Batch Data Ingestion

喜欢 (11)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(3)个小伙伴在吐槽
  1. 请问这样导入的性能如何?
    luoyueshi2015-11-24 15:37 回复
  2. 解决: druid默认使用UTC,发现日志和系统时间相差8小时,于是改为 UTC+8,你把设置改一下
    运维耗子2018-07-05 15:55 回复
  3. 请问一下历史数据的roll up怎么搞?比如说30天之前的数据,我想把精度降低到1天一个
    小八2018-09-05 16:10 回复