机器学习入门之Flume学习系列(五)---- Custom Interceptors(自定义拦截器)
小标 2018-12-12 来源 : 阅读 2327 评论 0

摘要:本文主要向大家介绍了机器学习入门之Flume学习系列(五)---- Custom Interceptors(自定义拦截器),通过具体的内容向大家展现,希望对大家学习机器学习入门有所帮助。

本文主要向大家介绍了机器学习入门之Flume学习系列(五)---- Custom Interceptors(自定义拦截器),通过具体的内容向大家展现,希望对大家学习机器学习入门有所帮助。


前言:接上一篇,本篇文章实现一个自定义的拦截器。主要功能是在Event的body中添加IP地址。因为没有拦截器可以在Body中添加(host是在header中添加),所以需要自定义。掌握了这个,其他的情况根据自己的业务需求去写即可。


001.png


我又是前言: 说一下学习自定义拦截器方法吧,导入flume的源码后,看flume-ng-core-xxx.jar下的org.apache.flume.interceptor
包,里面有flume为我们写好的一些拦截器,我们只需要仿照这些类去写即可,同时可以查看接口的javadoc知道要重写的方法是什么作用。

一、流程

①搭建flume开发环境(巧妇难为无米之炊,你没开发环境怎么玩,程序都不知道你写的类是个啥)
②新建一个类,实现Interceptor接口,重写intercept(Event event)方法
③新建一个类,实现Interceptor.Builder接口,重写configure(Context context)和build()方法
④打成jar包放到flume的lib目录下
⑤编写相应的flume.conf文件,将type值使用类的全限定名指定我们的拦截器。如果有自定义属性,需要配置该自定义属性。

二、搭建开发环境

    新建一个maven工程,在pom.xml中添加如下依赖(我这里的版本号和我的flume版本号一致):

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.8.0</version>
    </dependency></dependencies>

二、编写自定义Interceptor和自定义Builder

    代码不难,希望大家不要有恐惧心理,一看到代码段就脑壳疼。

package com.zhb.flume;import java.util.List;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import com.google.common.base.Charsets;public class MyInterceptor implements Interceptor {    private String ipAddress = null;    // 自定义属性 serviceId
    public MyInterceptor(String ipAddress) {        this.ipAddress = ipAddress;
    }    public void initialize() {        // TODO Auto-generated method stub

    }       //拦截器的核心
    public Event intercept(Event event) {        //获得body的内容
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        String fmt = "%s %s";        // 添加ipAddress 到event的开头
        event.setBody(String.format(fmt, ipAddress, eventBody).getBytes());        return event;
    }    public List<Event> intercept(List<Event> events) {        for (Event event : events) {
            intercept(event);
        }        return events;
    }    public void close() {        // TODO Auto-generated method stub
    }

}package com.zhb.flume;import org.apache.flume.Context;import org.apache.flume.interceptor.Interceptor;public class AppendIPBuilder implements Interceptor.Builder {    private String ipAddress = null;    public void configure(Context context) {        // set argument serviceId
        String configServiceId = context.getString("ipAddress");
        ipAddress = configServiceId;
    }    public Interceptor build() {        
        return new MyInterceptor(ipAddress);
    }

}

三、编写flume的配置文件appendIP.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
 
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zhb.flume.AppendIPBuilder
#这里配置的值和Builder里的变量名字要一样
a1.sources.r1.interceptors.i1.ipAddress= 192.168.1.101

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
 
# Use a channel which buffers events inmemory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100

四、运行flume进行测试

    首先进入到flume的bin目录下,执行./flume-ng agent -c ../conf -f ../conf/appendIP.conf -Dflume.root.logger=INFO,console -n a1
    成功启动后,新开一个终端输入echo "AppendIPAddress" | nc 127.0.0.1 50000
    这时,flume启动的终端的小齿轮转了起来,会心一笑,一定是成功了。


002.jpg


结果如下:


003.png


    IP地址是成功加上去了“192.168.1.101”,说明我们的拦截器成功了,刚要窃喜。But,后面的Ap是什么鬼,这不就是AppendIPAddress的前两个字母么?后面的字母呢?


004.jpg


    查了一下资料,找到了原因,因为我们使用的sink是Logger。看了下源码,这货会自动截取前16个字节的数据,然后我真的数了一下,IP地址13个字节+空格+Ap两个字节=16个字节。
    怎么解决呢?很简单,就是我们重写自己的LoggerSink。下一篇拿下他。


五、总结
    本文实现了自定义拦截器,在body中添加ip地址,实际工作中,根据各种不同需求编写符合自己需求的拦截器即可。下一篇写自定义Sink,解决刚才的问题。

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标人工智能机器学习频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved