[Flume] Writing Flume output to HBase with the HBase sink
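Flume 1.5.2
HBase 0.98.9
Hadoop 2.6
ZooKeeper 3.4.6
These are the prerequisite components and their versions. Make sure all of them are installed and working before you continue.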
1. Add the required jars
Copy the required jars from HBase's lib directory into Flume's lib directory.
2. Configure Flume
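A minimal sketch of such a configuration, not a definitive one. The agent name `a1`, the component names `r1`, `c1`, `k1`, the exec source, and the file path `/tmp/flume-test.log` are assumptions made for illustration; the table name `flume` and the column family `chiwei` are the ones used in the test section.

```properties
# Hypothetical agent "a1" with one source, one channel, and one sink.
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Exec source that tails the file under test (the path is a placeholder).
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/flume-test.log
a1.sources.r1.channels = c1

# In-memory channel; the capacities are illustrative values.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# HBase sink writing to table "flume", column family "chiwei".
a1.sinks.k1.type = hbase
a1.sinks.k1.table = flume
a1.sinks.k1.columnFamily = chiwei
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
```

With a file like this, the agent could be started with `flume-ng agent --conf conf --conf-file <path-to-this-file> --name a1`, where the name must match the prefix used in the file.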
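Note the serializer setting in the configuration: it uses RegexHbaseEventSerializer, which ships with Flume.
Flume also provides a SimpleHbaseEventSerializer.
If you use SimpleHbaseEventSerializer with the versions listed above,
you get the following error: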
[2015-03-04 09:35:41,244] [SinkRunner-PollingRunner-DefaultSinkProcessor:5672] [ERROR] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
[2015-03-04 09:35:41,249] [SinkRunner-PollingRunner-DefaultSinkProcessor:5677] [ERROR] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Increment.setWriteToWAL(Z)Lorg/apache/hadoop/hbase/client/Increment;
at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:408)
at org.apache.flume.sink.hbase.HBaseSink$4.run(HBaseSink.java:391)
at org.apache.flume.sink.hbase.HBaseSink.runPrivileged(HBaseSink.java:427)
at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:391)
at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:344)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)
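So do not use this serializer in this setup. The NoSuchMethodError means that the Increment.setWriteToWAL signature the Flume sink was compiled against does not exist in the HBase client jar on the classpath.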
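The `(Z)Lorg/apache/hadoop/hbase/client/Increment;` part of the error message is a JVM method descriptor: one boolean argument, return type `Increment`. The JVM links a call by method name plus the full descriptor, return type included, so a method with the same name and arguments but a different return type does not satisfy the call. The sketch below only illustrates how such descriptors are formed; `DescriptorSketch` is a hypothetical name, and `StringBuilder` stands in for the HBase `Increment` class, which is not available in a standalone example.

```java
import java.lang.invoke.MethodType;

// Illustrates how JVM method descriptors encode argument and return types.
// StringBuilder is a stand-in for the HBase Increment class (an assumption
// made so that this sketch runs without HBase on the classpath).
class DescriptorSketch {

    // Builds the descriptor of a method that takes one boolean
    // and returns the given type.
    static String booleanArgDescriptor(Class<?> returnType) {
        return MethodType.methodType(returnType, boolean.class)
                .toMethodDescriptorString();
    }

    public static void main(String[] args) {
        // Same argument list, different return types: two distinct descriptors.
        System.out.println(booleanArgDescriptor(void.class));          // (Z)V
        System.out.println(booleanArgDescriptor(StringBuilder.class)); // (Z)Ljava/lang/StringBuilder;
    }
}
```

Which HBase release declares which return type for `setWriteToWAL` is not shown here; comparing the descriptor in the error with the output of `javap` on the `Increment` class in your HBase jar is one way to check.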
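In practice you would normally write your own serializer anyway, because the column family, column qualifiers, and so on should be defined by your own application.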
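As a rough illustration of what such a custom serializer has to decide, the following standalone sketch maps one event body to a set of column qualifiers and values. It deliberately does not implement Flume's serializer interface, so it runs without Flume or HBase on the classpath; the class name `ColumnMappingSketch`, the regex, and the qualifiers `greeting` and `target` are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch of the mapping step a custom serializer would perform.
// It does NOT implement Flume's serializer interface; all names are hypothetical.
class ColumnMappingSketch {

    // Hypothetical layout: a line such as "Hello Flume" is split into two columns.
    private static final Pattern LINE = Pattern.compile("^(\\w+)\\s+(\\w+)$");
    private static final String[] QUALIFIERS = {"greeting", "target"};

    // Returns qualifier -> value for one event body,
    // or an empty map if the line does not match the pattern.
    static Map<String, String> toColumns(byte[] body) {
        Map<String, String> columns = new LinkedHashMap<>();
        Matcher m = LINE.matcher(new String(body, StandardCharsets.UTF_8));
        if (m.matches()) {
            for (int i = 0; i < QUALIFIERS.length; i++) {
                columns.put(QUALIFIERS[i], m.group(i + 1));
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        byte[] body = "Hello Flume".getBytes(StandardCharsets.UTF_8);
        System.out.println(toColumns(body)); // {greeting=Hello, target=Flume}
    }
}
```

In a real serializer, each entry of the returned map would become one cell under the configured column family, and the row key would be chosen by the same class.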
3. Test
Note that you must create the table in HBase beforehand:
hbase(main):004:0> create "flume","chiwei"
0 row(s) in 1.0770 seconds
=> Hbase::Table - flume
Start Flume.
The monitored file contains:
Hello Flume
Hello HBase
Hello Hadoop
Hello Zk
Hello chiwei
Now look at the data in the HBase table:
hbase(main):006:0> scan "flume"
ROW COLUMN+CELL
1425437884264-rd3B0VE5Uz-0 column=chiwei:payload, timestamp=1425437884520, value=Hello Flume
1425437884275-rd3B0VE5Uz-1 column=chiwei:payload, timestamp=1425437884520, value=Hello HBase
1425437884276-rd3B0VE5Uz-2 column=chiwei:payload, timestamp=1425437884520, value=Hello Hadoop
1425437884277-rd3B0VE5Uz-3 column=chiwei:payload, timestamp=1425437884520, value=Hello Zk
1425437884278-rd3B0VE5Uz-4 column=chiwei:payload, timestamp=1425437884520, value=Hello chiwei
1425437884624-rd3B0VE5Uz-5 column=chiwei:payload, timestamp=1425437884639, value=Hello Flume
1425437884626-rd3B0VE5Uz-6 column=chiwei:payload, timestamp=1425437884639, value=Hello HBase
1425437884628-rd3B0VE5Uz-7 column=chiwei:payload, timestamp=1425437884639, value=Hello Hadoop
1425437884629-rd3B0VE5Uz-8 column=chiwei:payload, timestamp=1425437884639, value=Hello Zk
1425437884630-rd3B0VE5Uz-9 column=chiwei:payload, timestamp=1425437884639, value=Hello chiwei
10 row(s) in 0.4020 seconds
hbase(main):007:0>
The test succeeded!
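The row keys in the scan output appear to consist of three dash-separated parts: a millisecond timestamp, a random alphanumeric string, and a running counter. The sketch below assumes that layout, which is inferred from the output rather than from a documented contract, and splits one of the keys into its parts; `RowKeySketch` is a hypothetical name.

```java
import java.time.Instant;

// Splits a row key of the assumed form <epochMillis>-<randomString>-<sequence>.
// The three-part layout is inferred from the scan output shown in this post.
class RowKeySketch {
    final long epochMillis;
    final String random;
    final long sequence;

    RowKeySketch(long epochMillis, String random, long sequence) {
        this.epochMillis = epochMillis;
        this.random = random;
        this.sequence = sequence;
    }

    // Parses a key such as "1425437884264-rd3B0VE5Uz-0".
    static RowKeySketch parse(String rowKey) {
        String[] parts = rowKey.split("-", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected row key: " + rowKey);
        }
        return new RowKeySketch(
                Long.parseLong(parts[0]), parts[1], Long.parseLong(parts[2]));
    }

    public static void main(String[] args) {
        RowKeySketch key = parse("1425437884264-rd3B0VE5Uz-0");
        // Convert the first part to a UTC timestamp.
        System.out.println(Instant.ofEpochMilli(key.epochMillis)); // 2015-03-04T02:58:04.264Z
        System.out.println(key.random + " / " + key.sequence);     // rd3B0VE5Uz / 0
    }
}
```

Because the keys start with the write time, rows written later sort after earlier ones in a scan, which matches the order of the output above.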