机器学习入门之Flume学习系列(四)---- Interceptors(拦截器)
小标 2018-12-12 来源 : 阅读 941 评论 0

摘要:本文主要向大家介绍了机器学习入门之Flume学习系列(四)---- Interceptors(拦截器),通过具体的内容向大家展现,希望对大家学习机器学习入门有所帮助。

本文主要向大家介绍了机器学习入门之Flume学习系列(四)---- Interceptors(拦截器),通过具体的内容向大家展现,希望对大家学习机器学习入门有所帮助。


前言:flume通过使用Interceptors(拦截器)实现修改和过滤事件的功能。举个栗子,一个网站每天产生海量数据,但是可能会有很多数据是不完整的(缺少重要字段),或冗余的,如果不对这些数据进行特殊处理,那么会降低系统的效率。这时候拦截器就派上用场了。

一、flume内置的拦截器

先列个flume内置拦截器的表:



table1.png

    由于拦截器一般针对Event的Header进行处理,那我先介绍一Event吧。Event结构如下图:



001.jpg

event是flume中处理消息的基本单元,由零个或者多个header和正文body组成。

Header 是 key/value 形式的,可以用来制造路由决策或携带其他结构化信息(如事件的时间戳或事件来源的服务器主机名)。你可以把它想象成和 HTTP 头一样提供相同的功能——通过该方法来传输正文之外的额外信息。

Body是一个字节数组,包含了实际的内容。

flume提供的不同source会给其生成的event添加不同的header。

    这些拦截器实际都是往Event的header里插数据,比如Timestamp Interceptor拦截器就是可以往event的header中插入关键词为timestamp的时间戳。

1.1 timestamp拦截器

下面是官网的timestamp的配置:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

测试例子:
    在flume的conf目录下新建timestamp.conf文件,输入以下代码:

#配置文件:timestamp.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
 
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.preserveExisting= falsea1.sources.r1.interceptors.i1.type = timestamp
 
 
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
 
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100

    打开一个终端,进入flume的bin目录,输入./flume-ng agent -c ../conf -f ../conf/timestamp.conf -Dflume.root.logger=INFO,console -n a1
    启动成功后,flume开始监控本主机上的所有IP地址,再开一个终端,向50000端口发TCP数据,命令如下:echo "TimestampInterceptor" | nc 127.0.0.1 50000。
    此时,flume的终端会接收到这个消息,如下:


002.png


我们可以清楚地看到header中有timestamp的影子。成功了。



003.jpg

1.2 host拦截器

    再说一个host interceptor,该拦截器可以往event的header中插入关键词默认为host的主机名或者ip地址(注意是agent运行的机器的主机名或者ip地址)。官网配置如下:

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

测试例子

#配置文件:host.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
 
a1.sources.r1.interceptors = i2
# a1.sources.r1.interceptors.i1.preserveExisting= false# a1.sources.r1.interceptors.i1.type =timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader =hostname
a1.sources.r1.interceptors.i2.useIP = false
 # Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
 
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100

    进入flume的bin目录,执行./flume-ng agent -c ../conf -f ../conf/host.conf -Dflume.root.logger=INFO,console -n a1
然后在新的终端输入echo "HostInterceptor" | nc 127.0.0.1 50000
    结果如下:


004.jpg


    可以发现header中增加了一个hostname的值:192.168.1.105。使用ifconfig命令,正是我电脑的IP地址,因为flume的agent运行在这个IP上嘛。


1.3 Regex Filtering Interceptor拦截器

    还有比较重要的Regex Filtering Interceptor,Regex Filtering Interceptor拦截器用于过滤事件,筛选出与配置的正则表达式相匹配的事件。可以用于包含事件和排除事件。常用于数据清洗,通过正则表达式把数据过滤出来。

测试例子:

#配置文件:regex_filter.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
#全部是数字的数据
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
#排除符合正则表达式的数据
a1.sources.r1.interceptors.i1.excludeEvents =true
 # Describe the sink
a1.sinks.k1.type = logger
 
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100
 # Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

    这样的配置将会过滤掉所有纯数字的数据。
    进入bin目录启动flume,执行./flume-ng agent -c ../conf -f ../conf/regex_filter.conf -Dflume.root.logger=INFO,console -n a1
    新开终端输入
echo "1234" | nc 127.0.0.1 50000
echo "1234" | nc 127.0.0.1 50000
echo "1234" | nc 127.0.0.1 50000
    在flume的终端会输出如下内容,但是没有数据,因为被我们过滤掉了。


005.png


二、总结

    本文介绍了flume中的interceptor拦截器,并针对具体的三个拦截器进行了测试,验证其功能。后面我们还会编写符合自己需求的自定义拦截器。敬请期待。

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标人工智能机器学习频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程