Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Flume is built on a simple, flexible architecture for streaming log data. It offers tunable reliability mechanisms plus many failover and recovery mechanisms, giving it strong fault tolerance, and this architecture can be used for real-time, online analysis of log streams. Flume supports customizable data senders in a logging system for collecting data; at the same time it can perform simple processing on the data and write it to various (customizable) data receivers. Flume currently has two version lines: the 0.9.x releases are collectively called Flume-og, and the 1.x releases are collectively called Flume-ng. Flume-ng went through a major refactoring and differs significantly from Flume-og, so be careful to distinguish between them. This course uses apache-flume-1.9.0-bin.tar.gz.
You must master the configuration template structure (a sketch of the template follows after the reference link below); knowing the template makes later lookup and configuration much easier.
<Agent>, <Channel>, <Sink>, and <Source> stand for component names. To see which components are available, consult the documentation.
Reference: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
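As a reference, here is a minimal sketch of the generic single-agent template; the angle-bracket placeholders (<Agent>, <Source>, <Channel>, <Sink>) are replaced with your own component names, and the property keys shown are only illustrative.

```properties
# Generic single-agent template: first declare the components owned by this agent
<Agent>.sources  = <Source>
<Agent>.channels = <Channel>
<Agent>.sinks    = <Sink>

# Then configure each component: <Agent>.<componentType>.<componentName>.<property> = <value>
<Agent>.sources.<Source>.<someProperty>   = <someValue>
<Agent>.channels.<Channel>.<someProperty> = <someValue>
<Agent>.sinks.<Sink>.<someProperty>       = <someValue>

# Finally wire the components together: a source can feed multiple channels,
# but each sink reads from exactly one channel
<Agent>.sources.<Source>.channels = <Channel>
<Agent>.sinks.<Sink>.channel      = <Channel>
```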
① helloword.properties — configuration for a single Agent. Place this file in the conf directory under the Flume installation directory.
```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: receive text data from a socket
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Configure the Sink component: print the received data to the log console
a1.sinks.sk1.type = logger

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

1. Install `nmap-ncat` (`yum -y install nmap-ncat`) to make later testing easier.
2. Install `telnet` (`yum -y install telnet`) for testing as well.
② Start the a1 collection agent
```shell
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/helloword.properties -Dflume.root.logger=INFO,console
```

Note: startup command parameters
```
Usage: ./bin/flume-ng <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: $FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>      RPC client properties file with server connection params
  --host,-H <host>          hostname to which events will be sent
  --port,-p <port>          port of the avro source
  --dirname <dir>           directory to stream to avro source
  --filename,-F <file>      text file to stream to avro source (default: std input)
  --headerFile,-R <file>    File containing event headers as key/value pairs on each new line
  --help,-h                 display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.
```

③ Test a1
```shell
[root@CentOS apache-flume-1.9.0-bin]# telnet CentOS 44444
Trying 192.168.52.134...
Connected to CentOS.
Escape character is '^]'.
hello world
```

The a1 agent console prints the received event:

```
2020-02-05 11:44:43,546 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 0D    hello world. }
```

The Avro Source is typically used for remote data collection (an RPC service): it starts an Avro server internally, accepts requests from Avro clients, and stores the received data in the Channel.
Avro Source properties:

| Property | Default | Description |
| --- | --- | --- |
| channels | – | the Channel(s) to connect to |
| type | – | component type; must be `avro` |
| bind | – | IP address/hostname to bind to |
| port | – | port to listen on |

```properties
# Declare the component
a1.sources = s1
# Configure the component
a1.sources.s1.type = avro
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444
# Connect it to the channel
a1.sources.s1.channels = c1
```

This follows the general configuration pattern:

```properties
<Agent>.sources = <Source>
# Component configuration
<Agent>.sources.<Source>.<someProperty> = <someValue>
```

example2.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: receive data over Avro RPC
a1.sources.s1.type = avro
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Configure the Sink component: print the received data to the log console
a1.sinks.sk1.type = logger

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

```shell
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example2.properties -Dflume.root.logger=INFO,console
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng avro-client --host CentOS --port 44444 --filename /root/t_employee
```

The Exec Source collects the console output of a command. The Flume agent usually has to be deployed on the same machine as the service whose data is being collected.
Exec Source properties:

| Property | Default | Description |
| --- | --- | --- |
| channels | – | the Channel(s) to connect to |
| type | – | must be `exec` |
| command | – | the command to execute |

example3.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: collect the output of a command
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /root/t_user

# Configure the Sink component: print the received data to the log console
a1.sinks.sk1.type = logger

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

```shell
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example3.properties -Dflume.root.logger=INFO,console
[root@CentOS ~]# tail -f t_user
```

The Spooling Directory Source collects newly added text files from a static directory. Once a file has been fully collected its suffix is changed, but the source file itself is not deleted; if you only want each file collected once and then removed, you can change the source's default behavior. The Flume agent usually has to be deployed on the same machine as the service whose data is being collected.
Spooling Directory Source properties:

| Property | Default | Description |
| --- | --- | --- |
| channels | – | the Channel(s) to connect to |
| type | – | must be `spooldir` |
| spoolDir | – | the directory to collect files from |
| fileSuffix | .COMPLETED | suffix appended to a file name once it has been fully collected |
| deletePolicy | never | when to delete completed files; allowed values: `never` / `immediate` |
| includePattern | `^.*$` | regex of files to include (the default matches all files) |
| ignorePattern | `^$` | regex of files to ignore |

example4.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: collect new files from a spooling directory
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /root/spooldir
a1.sources.s1.fileHeader = true
a1.sources.s1.deletePolicy = immediate

# Configure the Sink component: print the received data to the log console
a1.sinks.sk1.type = logger

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

```shell
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example4.properties -Dflume.root.logger=INFO,console
```

The Taildir Source monitors text files in real time for appended lines and records the read offset of each collected file, so the next run resumes where the previous one stopped, giving incremental collection. The Flume agent usually has to be deployed on the same machine as the service whose data is being collected.
Taildir Source properties:

| Property | Default | Description |
| --- | --- | --- |
| channels | – | the Channel(s) to connect to |
| type | – | must be `TAILDIR` |
| filegroups | – | space-separated list of file groups |
| `filegroups.<filegroupName>` | – | absolute path of the file group; regular expressions (not file system patterns) can be used for the file name only |
| positionFile | ~/.flume/taildir_position.json | records the read position of each collected file, enabling incremental collection |

example5.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: tail files matching the given patterns
a1.sources.s1.type = TAILDIR
a1.sources.s1.filegroups = g1 g2
a1.sources.s1.filegroups.g1 = /root/taildir/.*\.log$
a1.sources.s1.filegroups.g2 = /root/taildir/.*\.java$
a1.sources.s1.headers.g1.type = log
a1.sources.s1.headers.g2.type = java

# Configure the Sink component: print the received data to the log console
a1.sinks.sk1.type = logger

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

```shell
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example5.properties -Dflume.root.logger=INFO,console
```

The Logger Sink prints events to the log and is typically used for testing/debugging purposes.
The File Roll Sink writes the collected data to files on the local file system.
example6.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: receive text data from a socket
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Configure the Sink component: roll the received data into local files
a1.sinks.sk1.type = file_roll
a1.sinks.sk1.sink.directory = /root/file_roll
a1.sinks.sk1.sink.rollInterval = 0

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

```shell
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/example6.properties
```

The HDFS Sink writes data to the HDFS file system.
example7.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: receive text data from a socket
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Configure the Sink component: write the received data to HDFS
a1.sinks.sk1.type = hdfs
a1.sinks.sk1.hdfs.path = /flume-hdfs/%y-%m-%d
a1.sinks.sk1.hdfs.rollInterval = 0
a1.sinks.sk1.hdfs.rollSize = 0
a1.sinks.sk1.hdfs.rollCount = 0
a1.sinks.sk1.hdfs.useLocalTimeStamp = true
a1.sinks.sk1.hdfs.fileType = DataStream

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

The Kafka Sink writes data to a Kafka topic.
example8.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: receive text data from a socket
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Configure the Sink component: publish the received data to a Kafka topic
a1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sk1.kafka.bootstrap.servers = CentOS:9092
a1.sinks.sk1.kafka.topic = topic01
a1.sinks.sk1.kafka.flumeBatchSize = 20
a1.sinks.sk1.kafka.producer.acks = 1
a1.sinks.sk1.kafka.producer.linger.ms = 1
a1.sinks.sk1.kafka.producer.compression.type = snappy

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

The Memory Channel is fast: it writes Source data directly into memory. It is not safe, however, and may lose data.
Memory Channel properties:

| Property | Default | Description |
| --- | --- | --- |
| type | – | must be `memory` |
| capacity | 100 | maximum number of events stored in the channel |
| transactionCapacity | 100 | batch size that a Source writes to, or a Sink reads from, the channel in one transaction; transactionCapacity <= capacity |
```properties
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```

The JDBC Channel stores events in a persistent store backed by a database; it currently supports embedded Derby. This is a durable channel, ideal for flows where recoverability is important. Use the JDBC Channel when the buffered data is very important.
```properties
a1.channels.c1.type = jdbc
```

1. If you have the HIVE_HOME environment configured, remove the Derby jar from either Hive's lib directory or Flume's lib directory (removing it from just one side is enough).
2. By default, Flume uses the replicating (broadcast) channel selector.
The Kafka Channel writes the data collected by the Source into an external Kafka cluster.
```properties
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = CentOS:9092
a1.channels.c1.kafka.topic = topic_channel
a1.channels.c1.kafka.consumer.group.id = g1
```

example10.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: receive text data from a socket
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Configure the Sink component: print the received data to the log console
a1.sinks.sk1.type = logger

# Configure the Channel: use a Kafka topic as the buffer
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = CentOS:9092
a1.channels.c1.kafka.topic = topic_channel
a1.channels.c1.kafka.consumer.group.id = g1

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

The File Channel uses the file system as the channel implementation, so buffered data is persisted to disk.
```properties
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/flume/checkpoint
a1.channels.c1.dataDirs = /root/flume/data
```

Interceptors work on the Source component: they intercept or decorate the Events produced by the Source. Flume ships with many built-in interceptors:
- Timestamp Interceptor: decorator; adds timestamp information to the Event header.
- Host Interceptor: decorator; adds host information to the Event header.
- Static Interceptor: decorator; adds a user-defined key and value to the Event header.
- Remove Header Interceptor: decorator; removes the specified key from the Event header.
- UUID Interceptor: decorator; adds a random, unique UUID string to the Event header.
- Search and Replace Interceptor: decorator; searches the Event body and replaces the matching content.
- Regex Filtering Interceptor: filter; filters out or keeps events whose content matches a regular expression.
- Regex Extractor Interceptor: decorator; searches the Event body and adds the matching content to the Event header.

Testing the decorator interceptors:
example11.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: receive text data from a socket
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Add interceptors
a1.sources.s1.interceptors = i1 i2 i3 i4 i5 i6
a1.sources.s1.interceptors.i1.type = timestamp
a1.sources.s1.interceptors.i2.type = host
a1.sources.s1.interceptors.i3.type = static
a1.sources.s1.interceptors.i3.key = from
a1.sources.s1.interceptors.i3.value = baizhi
a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.s1.interceptors.i4.headerName = uuid
a1.sources.s1.interceptors.i5.type = remove_header
a1.sources.s1.interceptors.i5.withName = from
a1.sources.s1.interceptors.i6.type = search_replace
a1.sources.s1.interceptors.i6.searchPattern = ^jiangzz
a1.sources.s1.interceptors.i6.replaceString = baizhi

# Configure the Sink component: print the received data to the log console
a1.sinks.sk1.type = logger

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

```shell
[root@CentOS apache-flume-1.9.0-bin]# ./bin/flume-ng agent --name a1 --conf conf/ --conf-file conf/example11.properties -Dflume.root.logger=INFO,console
```

Testing the filtering and extracting interceptors:
example12.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1
a1.channels = c1

# Configure the Source component: receive text data from a socket
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Add interceptors
a1.sources.s1.interceptors = i1 i2
a1.sources.s1.interceptors.i1.type = regex_extractor
a1.sources.s1.interceptors.i1.regex = ^(INFO|ERROR)
a1.sources.s1.interceptors.i1.serializers = s1
a1.sources.s1.interceptors.i1.serializers.s1.name = loglevel
a1.sources.s1.interceptors.i2.type = regex_filter
a1.sources.s1.interceptors.i2.regex = .*baizhi.*
a1.sources.s1.interceptors.i2.excludeEvents = false

# Configure the Sink component: print the received data to the log console
a1.sinks.sk1.type = logger

# Configure the Channel: mainly responsible for buffering data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the components together
a1.sources.s1.channels = c1
a1.sinks.sk1.channel = c1
```

When a single Source is connected to multiple Channels, the channel selector decides how the Source's data is routed to those Channels. If you do not specify a channel selector, the system broadcasts the Source data to all Channels by default (the replicating mode); a multiplexing example is sketched after the replicating configuration below.
Equivalent explicit configuration:
example14.properties:

```properties
# Declare the basic components: Source, Channel, Sink
a1.sources = s1
a1.sinks = sk1 sk2
a1.channels = c1 c2

# Channel selector: replicating mode
a1.sources.s1.selector.type = replicating

# Configure the Source component: receive text data from a socket
a1.sources.s1.type = netcat
a1.sources.s1.bind = CentOS
a1.sources.s1.port = 44444

# Configure the Sink components: roll the received data into local files
a1.sinks.sk1.type = file_roll
a1.sinks.sk1.sink.directory = /root/file_roll_1
a1.sinks.sk1.sink.rollInterval = 0
a1.sinks.sk2.type = file_roll
a1.sinks.sk2.sink.directory = /root/file_roll_2
a1.sinks.sk2.sink.rollInterval = 0

# Configure the Channels: c1 buffers in memory, c2 uses the JDBC channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = jdbc

# Bind the components together
a1.sources.s1.channels = c1 c2
a1.sinks.sk1.channel = c1
a1.sinks.sk2.channel = c2
```

Here you need to delete the Derby driver jar under the Hive installation directory!
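The replicating selector copies every event to all configured channels. Flume also provides a multiplexing channel selector that routes each event to a channel based on the value of an Event header. The following is only a minimal sketch and is not part of the examples above; it assumes the `loglevel` header produced by the regex_extractor interceptor in example12 and the two channels c1/c2 from example14.

```properties
# Hypothetical multiplexing selector: route events by the value of the "loglevel" header
a1.sources.s1.selector.type = multiplexing
a1.sources.s1.selector.header = loglevel
# Events whose loglevel header is ERROR go to channel c1
a1.sources.s1.selector.mapping.ERROR = c1
# Events whose loglevel header is INFO go to channel c2
a1.sources.s1.selector.mapping.INFO = c2
# Events with any other (or missing) loglevel value fall back to c2
a1.sources.s1.selector.default = c2
```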
Flume uses a Sink Group to wrap multiple Sink instances into a single logical Sink component; internally, Sink Processors provide failover and load balancing for the Sink Group.
If you want to observe the load-balancing effect, sink.batchSize and transactionCapacity must be set to 1 (see the sketch below).
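No sink-group configuration file is given above, so the following is only a minimal sketch of how such a group might be declared; the names a1, sk1, sk2, and c1 mirror the earlier examples, and in a sink-group setup both sinks would normally read from the same channel.

```properties
# Declare a sink group g1 that wraps the two sinks sk1 and sk2 into one logical sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = sk1 sk2

# Load-balancing processor: spread events across sk1 and sk2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.backoff = true

# Alternative: failover processor, the highest-priority healthy sink handles all events
# a1.sinkgroups.g1.processor.type = failover
# a1.sinkgroups.g1.processor.priority.sk1 = 10
# a1.sinkgroups.g1.processor.priority.sk2 = 5
# a1.sinkgroups.g1.processor.maxpenalty = 10000

# Per the note above, to observe load balancing set the batch sizes to 1
# (file_roll sinks expose sink.batchSize; the memory channel exposes transactionCapacity)
# a1.sinks.sk1.sink.batchSize = 1
# a1.sinks.sk2.sink.batchSize = 1
# a1.channels.c1.transactionCapacity = 1
```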
Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
```java
import java.util.HashMap;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RpcClientTests {

    private RpcClient client;

    @Before
    public void before() {
        // Connect to the Avro Source listening on CentOS:44444
        client = RpcClientFactory.getDefaultInstance("CentOS", 44444);
    }

    @Test
    public void testSend() throws EventDeliveryException {
        Event event = EventBuilder.withBody("this is body".getBytes());
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "baizhi");
        event.setHeaders(header);
        client.append(event);
    }

    @After
    public void after() {
        client.close();
    }
}
```

Cluster connections: ① Failover
```java
// Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
import java.util.HashMap;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RpcClientTests02_FailoverClient {

    private RpcClient client;

    @Before
    public void before() {
        Properties props = new Properties();
        props.put("client.type", "default_failover");
        // List of hosts (space-separated list of user-chosen host aliases)
        props.put("hosts", "h1 h2 h3");
        // host/port pair for each host alias
        props.put("hosts.h1", "CentOSA:44444");
        props.put("hosts.h2", "CentOSB:44444");
        props.put("hosts.h3", "CentOSC:44444");
        client = RpcClientFactory.getInstance(props);
    }

    @Test
    public void testSend() throws EventDeliveryException {
        Event event = EventBuilder.withBody("this is body".getBytes());
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "zhangsan");
        event.setHeaders(header);
        client.append(event);
    }

    @After
    public void after() {
        client.close();
    }
}
```

② Load balancing
```java
// Reference: http://flume.apache.org/releases/content/1.9.0/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift
import java.util.HashMap;
import java.util.Properties;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RpcClientTests02_LoadBalancing {

    private RpcClient client;

    @Before
    public void before() {
        Properties props = new Properties();
        props.put("client.type", "default_loadbalance");
        // List of hosts (space-separated list of user-chosen host aliases)
        props.put("hosts", "h1 h2 h3");
        // host/port pair for each host alias
        props.put("hosts.h1", "CentOSA:44444");
        props.put("hosts.h2", "CentOSB:44444");
        props.put("hosts.h3", "CentOSC:44444");
        props.put("host-selector", "random");        // For random host selection
        // props.put("host-selector", "round_robin"); // For round-robin host selection
        props.put("backoff", "true");                // Disabled by default.
        props.put("maxBackoff", "10000");            // Defaults 0, which effectively becomes 30000 ms
        client = RpcClientFactory.getInstance(props);
    }

    @Test
    public void testSend() throws EventDeliveryException {
        Event event = EventBuilder.withBody("this is body".getBytes());
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("from", "lisi");
        event.setHeaders(header);
        client.append(event);
    }

    @After
    public void after() {
        client.close();
    }
}
```

Reference: https://github.com/gilt/logback-flume-appender
pom.xml dependencies:

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.5.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- JUnit testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>
```

logback.xml:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%p %c#%M %d{yyyy-MM-dd HH:mm:ss} %m%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <appender name="flume" class="com.gilt.logback.flume.FlumeLogstashV1Appender">
        <flumeAgents>
            CentOS:44444,
            CentOS:44444,
            CentOS:44444
        </flumeAgents>
        <flumeProperties>
            connect-timeout=4000;
            request-timeout=8000
        </flumeProperties>
        <batchSize>1</batchSize>
        <reportingWindow>1</reportingWindow>
        <additionalAvroHeaders>
            myHeader=myValue
        </additionalAvroHeaders>
        <application>smapleapp</application>
        <layout class="ch.qos.logback.classic.PatternLayout">
            <pattern>%p %c#%M %d{yyyy-MM-dd HH:mm:ss} %m%n</pattern>
        </layout>
    </appender>

    <!-- Console output log level -->
    <root level="ERROR">
        <appender-ref ref="STDOUT" />
    </root>

    <logger name="com.baizhi.service" level="DEBUG" additivity="false">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="flume" />
    </logger>
</configuration>
```
```java
import com.baizhi.service.IUserSerivice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class UserService implements IUserSerivice {

    private static final Logger LOG = LoggerFactory.getLogger(UserService.class);

    @Override
    public String sayHello(String name) {
        LOG.info("hello " + name);
        return "hello " + name;
    }
}
```
```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlumeAplication {
    public static void main(String[] args) {
        SpringApplication.run(FlumeAplication.class, args);
    }
}
```

```java
@SpringBootTest(classes = {KafkaSpringBootApplication.class})
@RunWith(SpringRunner.class)
public class KafkaTempolateTests {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Autowired
    private IOrderService orderService;

    @Test
    public void testOrderService() {
        orderService.saveOrder("002", "baizhi xxxxx ");
    }

    @Test
    public void testKafkaTemplate() {
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            @Override
            public Object doInOperations(KafkaOperations kafkaOperations) {
                return kafkaOperations.send(new ProducerRecord("topic01", "002", "this is a demo"));
            }
        });
    }
}
```