关键字:flume、拦截器、interceptor
Flume中的拦截器(interceptor),用户Source读取events发送到Sink的时候,在events header中加入一些有用的信息,或者对events的内容进行过滤,完成初步的数据清洗。这在实际业务场景中非常有用,Flume-ng 1.6中目前提供了以下拦截器:
Timestamp Interceptor;
Host Interceptor;
Static Interceptor;
UUID Interceptor;
Morphline Interceptor;
Search and Replace Interceptor;
Regex Filtering Interceptor;
Regex Extractor Interceptor;
本文对常用的几种拦截器进行学习和介绍,并附上使用示例。
本文中使用的Source为TaildirSource,就是监控一个文件的变化,将内容发送给Sink,具体可参考《Flume中的TaildirSource》,Source配置如下:
#-->设置sources名称 agent_lxw1234.sources = sources1 #--> 设置channel名称 agent_lxw1234.channels = fileChannel #--> 设置sink 名称 agent_lxw1234.sinks = sink1 # source 配置 agent_lxw1234.sources.sources1.type = com.lxw1234.flume17.TaildirSource agent_lxw1234.sources.sources1.positionFile = /tmp/flume/agent_lxw1234_position.json agent_lxw1234.sources.sources1.filegroups = f1 agent_lxw1234.sources.sources1.filegroups.f1 = /tmp/lxw1234_.*.log agent_lxw1234.sources.sources1.batchSize = 100 agent_lxw1234.sources.sources1.backoffSleepIncrement = 1000 agent_lxw1234.sources.sources1.maxBackoffSleep = 5000 agent_lxw1234.sources.sources1.channels = fileChannel
Flume Source中使用拦截器的相关配置如下:
## source 拦截器 agent_lxw1234.sources.sources1.interceptors = i1 i2 agent_lxw1234.sources.sources1.interceptors.i1.type = host agent_lxw1234.sources.sources1.interceptors.i1.useIP = false agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost agent_lxw1234.sources.sources1.interceptors.i2.type = timestamp
对一个Source可以使用多个拦截器。
Timestamp Interceptor
时间戳拦截器,将当前时间戳(毫秒)加入到events header中,key名字为:timestamp,值为当前时间戳。用的不是很多。比如在使用HDFS Sink时候,根据events的时间戳生成结果文件,hdfs.path = hdfs://cdh5/tmp/dap/%Y%m%d
hdfs.filePrefix = log_%Y%m%d_%H
会根据时间戳将数据写入相应的文件中。
但可以用其他方式代替(设置useLocalTimeStamp = true)。
Host Interceptor
主机名拦截器。将运行Flume agent的主机名或者IP地址加入到events header中,key名字为:host(也可自定义)。
根据上面的Source,拦截器的配置如下:
## source 拦截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = host agent_lxw1234.sources.sources1.interceptors.i1.useIP = false agent_lxw1234.sources.sources1.interceptors.i1.hostHeader = agentHost # sink 1 配置 agent_lxw1234.sinks.sink1.type = hdfs agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234/%Y%m%d agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{agentHost} agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text agent_lxw1234.sinks.sink1.hdfs.rollCount = 0 agent_lxw1234.sinks.sink1.hdfs.rollSize = 0 agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600 agent_lxw1234.sinks.sink1.hdfs.batchSize = 500 agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10 agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0 agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1 agent_lxw1234.sinks.sink1.channel = fileChannel
该配置用于将source的events保存到HDFS上hdfs://cdh5/tmp/lxw1234的目录下,文件名为lxw1234_<主机名>.log
Static Interceptor
静态拦截器,用于在events header中加入一组静态的key和value。
根据上面的Source,拦截器的配置如下:
## source 拦截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = static agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true agent_lxw1234.sources.sources1.interceptors.i1.key = static_key agent_lxw1234.sources.sources1.interceptors.i1.value = static_value # sink 1 配置 agent_lxw1234.sinks.sink1.type = hdfs agent_lxw1234.sinks.sink1.hdfs.path = hdfs://cdh5/tmp/lxw1234 agent_lxw1234.sinks.sink1.hdfs.filePrefix = lxw1234_%{static_key} agent_lxw1234.sinks.sink1.hdfs.fileSuffix = .log agent_lxw1234.sinks.sink1.hdfs.fileType = DataStream agent_lxw1234.sinks.sink1.hdfs.useLocalTimeStamp = true agent_lxw1234.sinks.sink1.hdfs.writeFormat = Text agent_lxw1234.sinks.sink1.hdfs.rollCount = 0 agent_lxw1234.sinks.sink1.hdfs.rollSize = 0 agent_lxw1234.sinks.sink1.hdfs.rollInterval = 600 agent_lxw1234.sinks.sink1.hdfs.batchSize = 500 agent_lxw1234.sinks.sink1.hdfs.threadsPoolSize = 10 agent_lxw1234.sinks.sink1.hdfs.idleTimeout = 0 agent_lxw1234.sinks.sink1.hdfs.minBlockReplicas = 1 agent_lxw1234.sinks.sink1.channel = fileChannel
看看最终Sink在HDFS上生成的文件结构:
UUID Interceptor
UUID拦截器,用于在每个events header中生成一个UUID字符串,例如:b5755073-77a9-43c1-8fad-b7a586fc1b97。生成的UUID可以在sink中读取并使用。根据上面的source,拦截器的配置如下:
## source 拦截器 agent_lxw1234.sources.sources1.interceptors = i1 agent_lxw1234.sources.sources1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder agent_lxw1234.sources.sources1.interceptors.i1.headerName = uuid agent_lxw1234.sources.sources1.interceptors.i1.preserveExisting = true agent_lxw1234.sources.sources1.interceptors.i1.prefix = UUID_ # sink 1 配置 agent_lxw1234.sinks.sink1.type = logger agent_lxw1234.sinks.sink1.channel = fileChannel
运行后在日志中查看header信息:
Morphline Interceptor
Morphline拦截器,该拦截器使用Morphline对每个events数据做相应的转换。关于Morphline的使用,可参考
http://kitesdk.org/docs/current/morphlines/morphlines-reference-guide.html
后续再研究这块。
剩下几个拦截器的介绍和使用在下篇文章中介绍。
您可以关注 lxw的大数据田地 ,或者 加入邮件列表 ,随时接收博客更新的通知邮件。
如果觉得本博客对您有帮助,请 赞助作者 。