微信搜索lxw1234bigdata | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

异构数据源海量数据交换工具-Taobao DataX 下载和使用

大数据平台 lxw1234@qq.com 94509℃ 20评论

DataX介绍

DataX是一个在异构的数据库/文件系统之间高速交换数据的工具,实现了在任意的数据处理系统(RDBMS/Hdfs/Local filesystem)之间的数据交换。

目前成熟的数据导入导出工具比较多,但是一般都只能用于数据导入或者导出,并且只能支持一个或者几个特定类型的数据库。
这样带来的一个问题是,如果我们拥有很多不同类型的数据库/文件系统(Mysql/Oracle/Rac/Hive/Other…),
并且经常需要在它们之间导入导出数据,那么我们可能需要开发/维护/学习使用一批这样的工具(jdbcdump/dbloader/multithread/getmerge+sqlloader/mysqldumper…)。而且以后每增加一种库类型,我们需要的工具数目将线性增长。(当我们需要将mysql的数据导入oracle的时候,有没有过想从jdbcdump和dbloader上各掰下来一半拼在一起到冲动?)这些工具有些使用文件中转数据,有些使用管道,不同程度的为数据中转带来额外开销,效率差别很非常大。
很多工具也无法满足ETL任务中常见的需求,比如日期格式转化,特性字符的转化,编码转换。

另外,有些时候,我们希望在一个很短的时间窗口内,将一份数据从一个数据库同时导出到多个不同类型的数据库。

DataX正是为了解决这些问题而生。

datax

datax

DataX特点

  • 在异构的数据库/文件系统之间高速交换数据
  • 采用Framework + plugin架构构建,Framework处理了缓冲,流控,并发,上下文加载等高速数据交换的大部分技术问题,提供了简单的接口与插件交互,插件仅需实现对数据处理系统的访问
  • 运行模式:stand-alone
  • 数据传输过程在单进程内完成,全内存操作,不读写磁盘,也没有IPC
  • 开放式的框架,开发者可以在极短的时间开发一个新插件以快速支持新的数据库/文件系统。(具体参见《DataX插件开发指南》)

DataX结构模式(框架+插件)

 

DataX架构模式

DataX架构模式

  • Job: 一道数据同步作业
  • Splitter: 作业切分模块,将一个大任务与分解成多个可以并发的小任务.
  • Sub-job: 数据同步作业切分后的小任务
  • Reader(Loader): 数据读入模块,负责运行切分后的小任务,将数据从源头装载入DataX
  • Storage: Reader和Writer通过Storage交换数据
  • Writer(Dumper): 数据写出模块,负责将数据从DataX导入至目的数据地

DataX框架内部通过双缓冲队列、线程池封装等技术,集中处理了高速数据交换遇到的问题,提供简单的接口与插件交互,插件分为Reader和Writer两类,基于框架提供的插件接口,可以十分便捷的开发出需要的插件。
比如想要从oracle导出数据到mysql,那么需要做的就是开发出OracleReader和MysqlWriter插件,装配到框架上即可。并且这样的插件一般情况下在其他数据交换场合是可以通用的。

更大的惊喜是我们已经开发了如下插件:

Reader插件

hdfsreader : 支持从hdfs文件系统获取数据。
mysqlreader: 支持从mysql数据库获取数据。
sqlserverreader: 支持从sqlserver数据库获取数据。
oraclereader : 支持从oracle数据库获取数据。
streamreader: 支持从stream流获取数据(常用于测试)
httpreader : 支持从http URL获取数据。

Writer插件

hdfswriter:支持向hdbf写入数据。
mysqlwriter:支持向mysql写入数据。
oraclewriter:支持向oracle写入数据。
streamwriter:支持向stream流写入数据。(常用于测试)

您可以按需选择使用或者独立开发您自己的插件 (具体参见《DataX插件开发指南》)

DataX在淘宝的运用

       数据同步工具归一化为DataX后,大大提高了用户拖表数据速度和内存利用率, 同时针对归一化后的DataX工具,我们能够做到更好应对mysql切库、数据同步监控等以前零散工具下很难完成的运维任务。
下面是部分工具替换后的比对情况:

DataX

DataX

 

下面是我将DataX源码编译后的配置和使用示例(下载地址在文章最后面):

环境需求:

1. java >= 1.6  python >= 2.6
2. 如果使用Oracle,需要安装Oracle客户端;
3. 如果使用HDFS,需要确保hadoop命令行可用;同时请确保在执行DataX的用户/home目录下,链接Hadoop config目录文件,在用户目录下执行: ln  -s  /home/$user/config  hadoop-configure-目录
4. 默认安装到/home/taobao/datax 目录, 最好使用root用户安装,因为还有其他权限问题;

 

安装:

1. 先安装Datax engine
rpm -ivh t_dp_datax_engine-1.0.0-1.noarch.rpm
安装之后,/home/taobao/datax目录结构如下:

datax安装

datax安装

2. 安装需要的读写插件,比如,我需要在HDFS和Mysql之间的数据传输,则需要装HDFS的读写插件和Mysql的读写插件:
rpm -ivh t_dp_datax_hdfsreader-1.0.0-1.noarch.rpm
rpm -ivh t_dp_datax_hdfswriter-1.0.0-1.noarch.rpm
rpm -ivh t_dp_datax_mysqlreader-1.0.0-1.noarch.rpm
rpm -ivh t_dp_datax_mysqlwriter-1.0.0-1.noarch.rpm
成功安装之后,/home/taobao/datax下多了plugins/目录,再往下,是reader和writer目录,分别用来存放读插件和写插件,如图:

datax读写插件

datax读写插件

配置(以mysql数据导入hdfs为例)

1.建立Hadoop配置文件目录的链接(执行任务的用户为lxw1234 ),切换到lxw1234用户,执行:
ln -s /usr/local/hadoop-0.20.2/conf /home/lxw1234/config
如图:

datax配置

datax配置

2. 生成job配置文件
进入:/home/taobao/datax/bin/,
执行:./datax.py –e
屏幕显示如下图:

datax配置

datax配置

列出了可用的数据源类型(之前安装了hdfs和mysql的reader插件,因此这里显示这两种数据源),选择1(mysql),如图:

datax配置

datax配置

列出了可用的数据目标类型(同理,显示hdfs和mysql),选择0(hdfs),如图:

datax配置

datax配置

生成了job的配置文件/home/taobao/datax/jobs/mysqlreader_to_hdfswriter_1432867511409.xml

3. 编辑job配置文件
vi /home/taobao/datax/jobs/mysqlreader_to_hdfswriter_1432867511409.xml

<reader></reader>标签里面配置数据源,这里需要修改从mysql中读取数据的配置信息:
<param key=”ip” value=”127.0.0.1″/>
<param key=”port” value=”3306″/>
<param key=”dbname” value=”lxw1234″/>
<param key=”username” value=”lxw1234″/>
<param key=”password” value=”lxw1234.com”/>
<param key=”sql” value=”select job_id,job_create_time,job_last_update_time,job_type from dmp_job_log limit 500″/>

其他reader参数可参考使用手册中的说明。

<writer></writer>标签里面配置数据目标,这里需要修改往HDFS中写入数据的配置信息:
<param key=”hadoop.job.ugi” value=”?”/>   //Hadoop的认证配置,如果没有,就不需要配置;
<param key=”hadoop_conf” value=”/home/lxw1234/config/core-site.xml”/>    //Hadoop的配置文件目录
<param key=”dir” value=”hdfs://namenode:8020/tmp/lxw1234/datax/”/>     //往hdfs的哪个目录下写数据
<param key=”field_split” value=”\001″/>   //写入文件的列分隔符
<param key=”file_type” value=”TXT”/>  //写入hdfs的文件类型
<param key=”concurrency” value=”1″/>  //写并发,每个并发生成一个文件

其他writer参数可参考使用手册中的说明。

执行:

1. 进入:/home/taobao/datax/bin/,
执行:./datax.py /home/taobao/datax/jobs/mysqlreader_to_hdfswriter_1432867511409.xml

运行结果如图:

datax执行

datax执行

 

datax执行

datax执行

datax执行

datax执行

执行完后,查看hdfs上生成的文件:

datax执行

datax执行

说明:

1. datax中还有很多扩展属性,如:动态参数、动态序列、读写并发等,详见文档说明;
2. 后续其他job,可直接复制修改之前的job配置文件,然后执行即可;
3. hdfs reader和writer插件使用的hadoop jar包版本较低hadoop-0.19.2-core.jar(见/home/taobao/datax/plugins/writer/hdfswriter和/home/taobao/datax/plugins/reader/hdfsreader),使用hdfs插件时候,需要将你的hadoop jar包拷贝至插件目录,比如,我使用的hadoop版本为hadoop-core-0.20.2-cdh3u2.jar,将该jar包拷贝至hdfs插件目录,并删除原来的hadoop-0.19.2-core.jar;

相关下载

 


 

阿里最新开源的DataX3.0版本。

https://github.com/alibaba/DataX

里面有详细安装配置文档。

 


 

数据交换是大数据平台中的一个重要模块,另一个重要模块就是任务调度监控,参考(http://lxw1234.com/archives/2015/04/109.htm)。

前面介绍过的京东大数据平台中,海量数据交换工具也是参考DataX开发的(http://lxw1234.com/archives/2015/05/228.htm

后续将持续介绍我自己大数据分析平台的构建经验,请关注我的博客。


更新: 请参考阿里最新开源的DataX3.0版本。

https://github.com/alibaba/DataX

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » 异构数据源海量数据交换工具-Taobao DataX 下载和使用

喜欢 (110)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(20)个小伙伴在吐槽
  1. 你应该拿的是DataX最初版本吧,老版本问题不少。
    shisi2015-09-23 18:08 回复
    • 是最初版本,不知道taocode上有没有再更新。 我们是基于上面做了二次开发,并没有直接使用开源的这个。
      lxw1234@qq.com2015-09-24 09:52 回复
      • 请问一下,二次开发孩子后所支持的hadoop和hive和hbase的版本是多少。
        zhanmsl2016-01-27 17:51 回复
        • 只是使用了HDFS和HBASE的基础读写的API,基本所有版本都能支持,只需要依赖对应版本的jar包即可。
          lxw1234@qq.com2016-01-28 08:22 回复
  2. hi , 我用你百度云裳下载的rpm,安装时候有点问题,是不是缺少那个src.rpm 包 ?
    keith2015-12-21 18:09 回复
  3. 公司新项目要用datax转换mongodb的数据到mysql.遇到一个问题,mongodb的是数据很多都是一个document里不是所有字段都的值都有例如:user = [{"name":"张三","id":"0000001"},{"age":12,"id":"0000002"},{"name":"王二麻子","age":13,"id":"0000003"}] 。这样在datax的任务中就会报读取的字段数和要写入的数目不相同。这该如何解决?
    fg2016-03-30 13:59 回复
    • 这个在自定义mongodb read plugin时候,需要配置字段名,比如,name,id,age,然后从一个doc中解析所配置的这几个字段,如果没有,就给这个字段置空,最终形成固定字段数的Message;
      lxw1234@qq.com2016-03-30 17:27 回复
      • 这个可以在job配置里配置吗?还是需要重新去插件里来该?
        fg2016-03-31 13:01 回复
        • 需要自己重新写插件。
          lxw1234@qq.com2016-03-31 13:05 回复
          • 楼主,rpm包已经无效了,能另外发个过来吗,邮箱1174646618@qq.com
            wuzhenx2016-09-05 17:39
  4. 这篇是DataX网上教程最详细的一篇,博主好人 :mrgreen:
    晴天2016-04-01 08:31 回复
    • 博主大哥,dx小弟刚刚学习,我用oracle账号装oracle客户端,如果我要从oracle到oracle传数据表,安装datax是不是也得oracle账号,除了参数,还需要配置其他东西吗?
      晴天2016-04-01 11:42 回复
      • dx不一定用oracle账号,只要配置链接oracle的用户和密码就行。
        lxw1234@qq.com2016-04-05 09:19 回复
  5. oraclereader是使用的jdbc的连接方式,oraclewriter是使用的oci的方式写数据,可不可以用他提供的.so文件然后将oraclereader也改成oci的方式?
    晴天2016-04-13 08:18 回复
  6. 你好,最新版本的DataX已经完全开放,快去看看吧。https://github.com/alibaba/DataX
    烬哥哥2016-08-04 00:49 回复
  7. DataX有可视化界面吗
    威笑2016-08-31 11:56 回复
  8. 你好,楼主了,我用最新版本的DataX将mysql数据导入hbase里面,但是报错了。解决不了,能否帮我分析下,谢谢! 报错信息如下: 十一月 19, 2016 3:31:44 下午 org.apache.hadoop.hbase.client.ZooKeeperRegistry getClusterId 信息: ClusterId read in ZooKeeper is null 十一月 19, 2016 3:31:45 下午 org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation closeZooKeeperWatcher 信息: Closing zookeeper sessionid=0x1587b7edd960000 2016-11-19 15:31:45.753 [0-0-0-writer] INFO ZooKeeper - Session: 0x1587b7edd960000 closed 2016-11-19 15:31:45.754 [0-0-0-writer-EventThread] INFO ClientCnxn - EventThread shut down 2016-11-19 15:31:45.774 [0-0-0-writer] ERROR WriterRunner - Writer Runner Received Exceptions: com.alibaba.datax.common.exception.DataXException: Code:[Hbasewriter-12], Description:[获取hbase BufferedMutator 时出错.]. - java.lang.RuntimeException: java.lang.NullPointerException at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208) at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295) at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160) at org.apache.hadoop.hbase.client.ClientScanner.(ClientScanner.java:155) at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821) at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:602) at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366) at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:392) at com.alibaba.datax.plugin.writer.hbase11xwriter.Hbase11xHelper.checkHbaseTable(Hbase11xHelper.java:186) at com.alibaba.datax.plugin.writer.hbase11xwriter.Hbase11xHelper.getBufferedMutator(Hbase11xHelper.java:92) at com.alibaba.datax.plugin.writer.hbase11xwriter.HbaseAbstractTask.(HbaseAbstractTask.java:38) at com.alibaba.datax.plugin.writer.hbase11xwriter.NormalTask.(NormalTask.java:27) at com.alibaba.datax.plugin.writer.hbase11xwriter.Hbase11xWriter$Task.init(Hbase11xWriter.java:58) at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:44) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NullPointerException at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.getMetaReplicaNodes(ZooKeeperWatcher.java:395) at org.apache.hadoop.hbase.zookeeper.MetaTableLocator.blockUntilAvailable(MetaTableLocator.java:553) at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation(ZooKeeperRegistry.java:61) at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateMeta(ConnectionManager.java:1186) at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1153) at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300) at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:155) at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59) at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200) ... 15 more - java.lang.RuntimeException: java.lang.NullPointerException at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208) at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320) at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295) at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160) at org.apache.hadoop.hbase.client.ClientScanner.(ClientScanner.java:155) at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821) at org.apache.hadoop.hbase.MetaTableAccessor.fullScan(MetaTableAccessor.java:602) at org.apache.hadoop.hbase.MetaTableAccessor.tableExists(MetaTableAccessor.java:366) at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:392) at com.alibaba.datax.plugin.writer.hbase11xwriter.Hbase11xHelper.checkHbaseTable(Hbase11xHelper.java:186) at com.alibaba.datax.plugin.writer.hbase11xwriter.Hbase11xHelper.getBufferedMutator(Hbase11xHelper.java:92) at com.alibaba.datax.plugin.writer.hbase11xwriter.HbaseAbstractTask.(HbaseAbstractTask.java:38) at com.alibaba.datax.plugin.writer.hbase11xwriter.NormalTask.(NormalTask.java:27) at com.alibaba.datax.plugin.writer.hbase11xwriter.Hbase11xWriter$Task.init(Hbase11xWriter.java:58) at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:44) at java.lang.Thread.run(Thread.java:745) 我的json文件内容如下: { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": ["TBL_GRANT_ID", "CREATE_TIME", "GRANT_OPTION", "GRANTOR", "GRANTOR_TYPE", "PRINCIPAL_NAME", "PRINCIPAL_TYPE", "TBL_PRIV", "TBL_ID" ], "connection": [ { "jdbcUrl": ["jdbc:mysql://10.206.20.83:3306/hive"], "table": ["TBL_PRIVS"] } ], "password": "hive", "username": "hive", "where": "" } }, "writer": { "name": "hbase11xwriter", "parameter": { "column": [ { "index":1, "name":"cf1:q2", "type":"int" }, { "index":2, "name":"cf1:q3", "type":"int" }, { "index":3, "name":"cf1:q4", "type":"string" }, { "index":4, "name":"cf1:q5", "type":"string" }, { "index":5, "name":"cf2:q1", "type":"string" }, { "index":6, "name":"cf2:q2", "type":"string" }, { "index":7, "name":"cf2:q3", "type":"string" }, { "index":8, "name":"cf2:q4", "type":"int" } ], "encoding": "utf-8", "hbaseConfig": { "hbase.cluster.distributed": "true", "hbase.rootdir": "hdfs://nn1:8020/apps/hbase/data", "hbase.zookeeper.quorum": "dn1:2181,nn1:2181,nn2:2181" }, "mode": "normal", "rowkeyColumn": [ { "index":0, "type":"int" } ], "table": "TBL_PRIVS", "versionColumn": { "index": "-1", "value": "123456789" } } } } ], "setting": { "speed": { "channel": "1" } } } }
    zphandsome2016-11-19 15:40 回复
  9. 楼主,在正式环境中,datax可以部署在Hadoop集群的接口机上吗?
    公子盛2017-04-01 17:44 回复
    • 请问这个rpm -ivh t_dp_datax_engine-1.0.0-1.noarch.rpm 在哪下载呢?
      李素娜2017-05-05 17:15 回复