• 0

  • 1

  • 收藏

Flume读取数据写入Kafka消息队列中

猿人不正经

不想写代码

1年前

前面已经给大家讲过flume和Kafka的简介以及安装,今天就给大家讲讲二者如何关联使用。
本文主要就是讲解如何使用flume采集日志信息后把数据写入kafka中,由于时间关系,这里就暂时用伪数据,把存放伪数据的文件放到专门用于flume监听文件的目录中就是前面提到过的/opt/soft/datas下。

1.配置flume

先新建配置文件用于关联kafka
还是在/opt/flumeconf下创建properties文件,并添加以下配置
cd /opt/flumeconf
vi conf_0812_kafka.properties

a5.channels=c5
a5.sources=s5
a5.sinks=k5

a5.sources.s5.type=spooldir
a5.sources.s5.spoolDir=/opt/soft/datas
a5.sources.s5.interceptors=head_filter
#正则拦截器
a5.sources.s5.interceptors.head_filter.type=regex_filter
a5.sources.s5.interceptors.head_filter.regex=^event_id.*
a5.sources.s5.interceptors.head_filter.excludeEvents=true

#用来关联kafka
a5.sinks.k5.type=org.apache.flume.sink.kafka.KafkaSink
a5.sinks.k5.kafka.bootstrap.servers=192.168.5.150:9092
#topic的主题名要一致
a5.sinks.k5.kafka.topic=msgEvent

a5.channels.c5.type=memory
a5.channels.c5.capacity=10000
a5.channels.c5.transactionCapacity=10000

a5.sinks.k5.channel=c5
a5.sources.s5.channels=c5

2.先启动zookeeper和Kafka

zkServer.sh start

cd /opt/soft/kafka211/bin
./kafka-server-start.sh /opt/soft/kafka211/config/server.properties 

使用jps验证服务是否启动!

3.创建topic

kafka-topics.sh --create --zookeeper 192.168.5.150:2181 --replication-factor 1 --partitions 1 --topic msgEvent

注意:
创建的topic 和 配置文件里的sinks组件中的topic必须要一致

4.启动消费者以接收消息

kafka-console-consumer.sh --bootstrap-server 192.168.5.150:9092 --topic msgEvent --from-beginning

5.启动生产者以发送消息

kafka-console-producer.sh --broker-list 192.168.5.150:9092 --topic msgEvent

6.启动flume的agent

flume-ng agent -n a5 -c conf -f /opt/flumeconf/conf_0812_kafka.properties -Dflume.root.logger=INFO,console
这样就开启了日志采集 可能要等一段时间, 日志采集完毕之后 flume会提示文件的后缀名会有COMPLETED,说明文件按已采集完毕!如下图所示:
在这里插入图片描述
文件会写入到kafka中 具体路径是kafka配置文件中server.properties里面Log Basics的配置 如下图所示:
在这里插入图片描述
查看文件信息,是否已经写入到kafka的日志目录中。
如下图所示:
在这里插入图片描述
megEvent已出现子datas日志目录中,接下来查看文件中是否有日志信息,如下图所示:
在这里插入图片描述
这样数据就写入到了kafka中!

免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

1

相关文章推荐

未登录头像

暂无评论