fflume 版本为 1.6 cdh 5.13 注意启动flume是 --name 要和配置文件的 前缀一直 否知启动失败
flume 目录为
/opt/cloudera/parcels/CDH/lib/flume-ngflume 配置文件exec-memory-avro.conf
vim /opt/test/exec-memory-avro.confa1.sources = r1a1.sinks = k1a1.channels = c1
a1.sources.r1.type = execa1.sources.r1.command = tail -F /opt/test/data.loga1.sources.r1.shell = /bin/sh -c
a1.sinks.k1.type = avroa1.sinks.k1.hostname = udap69a165a1.sinks.k1.port = 44444
a1.channels.c1.type = memorya1.sources.r1.channels = c1a1.sinks.k1.channel = c1
flume 配置文件avro-memory-kafka.conf
vim /opt/test/avro-memory-kafka.confavro-memory-kafka.sources = avro-sourceavro-memory-kafka.sinks = kafka-sinkavro-memory-kafka.channels = memory-channel
avro-memory-kafka.sources.avro-source.type = avroavro-memory-kafka.sources.avro-source.bind = udap69a165avro-memory-kafka.sources.avro-source.port = 44444
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSinkavro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = udap69a166:9092avro-memory-kafka.sinks.kafka-sink.topic = hh_testavro-memory-kafka.sinks.kafka-sink.batchSize = 5avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.sources.avro-source.channels = memory-channelavro-memory-kafka.sinks.kafka-sink.channel = memory-channel
进入到flume 的bin目录下,启动flume agent
./flume-ng agent --name a1 --conf-file /opt/test/exec-memory-avro.conf -Dflume.root.logger=INFO,console ./flume-ng agent --name avro-memeory-kafka --conf-file /opt/test/avro-memeory-kafka.conf -Dflume.root.logger=INFO,console查看日志是否启动成功,然后干掉程序,选择后台启动
jps -m nohup sh flume-ng agent --name a1 --conf-file /opt/test/exec-memory-avro.conf -Dflume.root.logger=INFO,console & nohup sh flume-ng agent --name avro-memeory-kafka --conf-file /opt/test/avro-memeory-kafka.conf -Dflume.root.logger=INFO,console &启动kafka消费的脚本
kafka-console-consumer --zookeeper udap69a166:2181/kafka --topic hh_test写入一些数据
echo "lisi" >> /opt/test/data.log echo "wangwu" >> /opt/test/data.log echo "zhangdan" >> /opt/test/data.log查看kafka 消费者是否消费到数据 问题解决!