详见:大数据技术之Hadoop(入门)
1)测试集群规划:
服务器bigdata02
服务器bigdata03
服务器bigdata04
HDFS
NameNode
DataNode
DataNode
DataNode
SecondaryNameNode
Yarn
NodeManager
Resourcemanager
NodeManager
NodeManager
注意:尽量使用离线方式安装,CDH要收费了,使用的人会越来越少...
若HDFS存储空间紧张,需要对DataNode进行磁盘扩展。
1)在DataNode节点增加磁盘并进行挂载。
2)在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题。
<property> <name>dfs.datanode.data.dir</name> <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value> </property>3)增加磁盘后,保证每个目录数据均衡
开启数据均衡命令:
bin/start-balancer.sh –threshold 10
对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。
停止数据均衡命令:
bin/stop-balancer.sh
1)hadoop本身并不支持lzo压缩,故需要使用twitter提供的hadoop-lzo开源组件。hadoop-lzo需依赖hadoop和lzo进行编译,编译步骤如下。
Hadoop支持LZO 0. 环境准备 maven(下载安装,配置环境变量,修改sitting.xml加阿里云镜像) gcc-c++ zlib-devel autoconf automake libtool 通过yum安装即可,yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool 1. 下载、安装并编译LZO wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz tar -zxvf lzo-2.10.tar.gz cd lzo-2.10 ./configure -prefix=/usr/local/hadoop/lzo/ make make install 2. 编译hadoop-lzo源码 2.1 下载hadoop-lzo的源码,下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip 2.2 解压之后,修改pom.xml <hadoop.current.version>2.7.2</hadoop.current.version> 2.3 声明两个临时环境变量 export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include export LIBRARY_PATH=/usr/local/hadoop/lzo/lib 2.4 编译 进入hadoop-lzo-master,执行maven编译命令 mvn package -Dmaven.test.skip=true 2.5 进入target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即编译成功的hadoop-lzo组件2)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/
[hadoop@bigdata02 common]$ pwd /opt/module/hadoop-2.7.2/share/hadoop/common [hadoop@bigdata02 common]$ ls hadoop-lzo-0.4.20.jar3)同步hadoop-lzo-0.4.20.jar到bigdata03、bigdata04
[hadoop@bigdata02 common]$ xsync hadoop-lzo-0.4.20.jar4)core-site.xml增加配置支持LZO压缩
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>io.compression.codecs</name> <value> org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.SnappyCodec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec </value> </property> <property> <name>io.compression.codec.lzo.class</name> <value>com.hadoop.compression.lzo.LzoCodec</value> </property> </configuration>5)同步core-site.xml到bigdata03、bigdata04
[hadoop@bigdata02 hadoop]$ xsync core-site.xml6)启动及查看集群
[hadoop@bigdata02 hadoop-2.7.2]$ sbin/start-dfs.sh [hadoop@bigdata03 hadoop-2.7.2]$ sbin/start-yarn.sh1)创建LZO文件的索引,LZO压缩文件的可切片特性依赖于其索引,故我们需要手动为LZO压缩文件创建索引。若无索引,则LZO文件的切片只有一个。
hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer bigtable.lzo2)测试
(1)将bigtable.lzo(150M)上传到集群的根目录
[hadoop@bigdata02 module]$ hadoop fs -mkdir /input [hadoop@bigdata02 module]$ hadoop fs -put bigtable.lzo /input(2)执行wordcount程序
[hadoop@bigdata02 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /input /output1
注意:这个lzo文件有214m,默认的块大小是128m,那么应该是有两个块,我们通过运行mapreduce程序发现,只有一个切分,需要进行下面的步骤才行。
(3)对上传的LZO文件建索引
[hadoop@bigdata02 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo(4)再次执行WordCount程序
[hadoop@bigdata02 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /input /output2
面试官可能会问,我有10T的数据,读完需要多长时间,写需要多长时间?你应该回答,我们会先做基准测试,测试出这个集群的读写能力,计算的能力...
1) 测试HDFS写性能
测试内容:向HDFS集群写10个128M的文件
[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB 19/05/02 11:45:23 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write 19/05/02 11:45:23 INFO fs.TestDFSIO: Date & time: Thu May 02 11:45:23 CST 2019 19/05/02 11:45:23 INFO fs.TestDFSIO: Number of files: 10 19/05/02 11:45:23 INFO fs.TestDFSIO: Total MBytes processed: 1280.0 19/05/02 11:45:23 INFO fs.TestDFSIO: Throughput mb/sec: 10.69751115716984 19/05/02 11:45:23 INFO fs.TestDFSIO: Average IO rate mb/sec: 14.91699504852295 19/05/02 11:45:23 INFO fs.TestDFSIO: IO rate std deviation: 11.160882132355928 19/05/02 11:45:23 INFO fs.TestDFSIO: Test exec time sec: 52.315最重要的参数是 Throughput mb/sec: 10.69751115716984 表示平均吞吐量为10.6975m/s
2)测试HDFS读性能
测试内容:读取HDFS集群10个128M的文件
[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB 19/05/02 11:56:36 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read 19/05/02 11:56:36 INFO fs.TestDFSIO: Date & time: Thu May 02 11:56:36 CST 2019 19/05/02 11:56:36 INFO fs.TestDFSIO: Number of files: 10 19/05/02 11:56:36 INFO fs.TestDFSIO: Total MBytes processed: 1280.0 19/05/02 11:56:36 INFO fs.TestDFSIO: Throughput mb/sec: 16.001000062503905 19/05/02 11:56:36 INFO fs.TestDFSIO: Average IO rate mb/sec: 17.202795028686523 19/05/02 11:56:36 INFO fs.TestDFSIO: IO rate std deviation: 4.881590515873911 19/05/02 11:56:36 INFO fs.TestDFSIO: Test exec time sec: 49.116 19/05/02 11:56:36 INFO fs.TestDFSIO:3)删除测试生成数据
[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -clean4)使用Sort程序评测MapReduce
(1)使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数
[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar randomwriter random-data(2)执行Sort程序
[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar sort random-data sorted-data(3)验证数据是否真正排好序了
[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data1)HDFS参数调优hdfs-site.xml
dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为8台时,此参数设置为60
作用:dataNode与namenode之间通信需要有一定的并发度,如果太低,datanode需要排队,如果太高,集群也没有这么多资源,需要有一个比较合理的并发度...
The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes. NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20 * log2(Cluster Size),N为集群大小。2)YARN参数调优yarn-site.xml
(1)情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive
面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%。但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。
(2)解决办法:
内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。
(a)yarn.nodemanager.resource.memory-mb
表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。
(b)yarn.scheduler.maximum-allocation-mb
单个任务可申请的最多物理内存量,默认是8192(MB)。
3)Hadoop宕机
(1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)
(2)如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。
详见:大数据技术之Zookeeper
集群规划
服务器bigdata02
服务器bigdata03
服务器bigdata04
Zookeeper
Zookeeper
Zookeeper
Zookeeper
1)在bigdata02的/home/hadoop/bin目录下创建脚本
[hadoop@bigdata02 bin]$ vim zk.sh在脚本中编写如下内容
#! /bin/bash case $1 in "start"){ for i in bigdata02 bigdata03 bigdata04 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start" done };; "stop"){ for i in bigdata02 bigdata03 bigdata04 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop" done };; "status"){ for i in bigdata02 bigdata03 bigdata04 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status" done };; esac2)增加脚本执行权限
[hadoop@bigdata02 bin]$ chmod 777 zk.sh3)Zookeeper集群启动脚本
[hadoop@bigdata02 module]$ zk.sh start4)Zookeeper集群停止脚本
[hadoop@bigdata02 module]$ zk.sh stop1)修改/etc/profile文件:用来设置系统环境参数,比如$PATH. 这里面的环境变量是对系统内所有用户生效。使用bash命令,需要source /etc/profile一下。
2)修改~/.bashrc文件:针对某一个特定的用户,环境变量的设置只对该用户自己有效。使用bash命令,只要以该用户身份运行命令行就会读取该文件。
3)把/etc/profile里面的环境变量追加到~/.bashrc目录
[hadoop@bigdata02 ~]$ cat /etc/profile >> ~/.bashrc [hadoop@bigdata03 ~]$ cat /etc/profile >> ~/.bashrc [hadoop@bigdata04 ~]$ cat /etc/profile >> ~/.bashrc4)说明
登录式Shell,采用用户名比如hadoop登录,会自动加载/etc/profile
非登录式Shell,采用ssh 比如ssh bigdata03登录,不会自动加载/etc/profile,会自动加载~/.bashrc
1)代码参数说明
// 参数一:控制发送每条的延时时间,默认是0 Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L; // 参数二:循环遍历次数 int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;2)将生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝到bigdata02服务器/opt/module上,并同步到bigdata03的/opt/module路径下,
[hadoop@bigdata02 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar3)在bigdata02上执行jar程序
[hadoop@bigdata02 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.bigdata.appclient.AppMain >/opt/module/test.log说明1:
java -classpath 需要在jar包后面指定全类名;
java -jar 需要查看一下解压的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全类名。如果有可以用java -jar,如果没有就需要用到java -classpath
说明2:/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。
标准输入0:从键盘获得输入 /proc/self/fd/0
标准输出1:输出到屏幕(即控制台) /proc/self/fd/1
错误输出2:输出到屏幕(即控制台) /proc/self/fd/2
4)在/tmp/logs路径下查看生成的日志文件
[hadoop@bigdata02 module]$ cd /tmp/logs/ [hadoop@bigdata02 logs]$ ls app-2020-03-10.log1)在/home/hadoop/bin目录下创建脚本lg.sh
[hadoop@bigdata02 bin]$ vim lg.sh2)在脚本中编写如下内容
#! /bin/bash for i in bigdata02 bigdata03 do ssh $i "java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.bigdata.appclient.AppMain $1 $2 >/dev/null 2>&1 &" done3)修改脚本执行权限
[hadoop@bigdata02 bin]$ chmod 777 lg.sh4)启动脚本
[hadoop@bigdata02 module]$ lg.sh5)分别在bigdata02、bigdata03的/tmp/logs目录上查看生成的数据
[hadoop@bigdata02 logs]$ ls app-2020-03-10.log [hadoop@bigdata03 logs]$ ls app-2020-03-10.log企业开发时,参考hadoop集群时间同步。
1)在/home/hadoop/bin目录下创建脚本dt.sh
[hadoop@bigdata02 bin]$ vim dt.sh2)在脚本中编写如下内容
#!/bin/bash for i in bigdata02 bigdata03 bigdata04 do echo "========== $i ==========" ssh -t $i "sudo date -s $1" done注意:ssh -t 通常用于ssh远程执行sudo命令
3)修改脚本执行权限
[hadoop@bigdata02 bin]$ chmod 777 dt.sh4)启动脚本
[hadoop@bigdata02 bin]$ dt.sh 2020-03-10
1)在/home/hadoop/bin目录下创建脚本xcall.sh
[hadoop@bigdata02 bin]$ vim xcall.sh2)在脚本中编写如下内容
#! /bin/bash for i in bigdata02 bigdata03 bigdata04 do echo --------- $i ---------- ssh $i "$*" done3)修改脚本执行权限
[hadoop@bigdata02 bin]$ chmod 777 xcall.sh4)启动脚本
[hadoop@bigdata02 bin]$ xcall.sh jps详见:大数据技术之Flume
集群规划:
服务器bigdata02
服务器bigdata03
服务器bigdata04
Flume(采集日志)
Flume
Flume
1)Source
(1)Taildir Source相比Exec Source、Spooling Directory Source的优势
TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。 tail -F / f
Spooling Directory Source监控目录,不支持断点续传。
(2)batchSize大小如何设置?
答:batchSize是每次读取的event数量,Event 1K左右时,500-1000合适(默认为100)
2)Channel
采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。
注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。
面试问题:如何选择memory channel 和 file channel?
memory channel ---》 channel数据在内存里面,数据读写快,如果宕机,会丢失数据
file channel --》 速度就没有那么快,数据存放在磁盘,不会丢失数据,安全性更高
flume 会丢失数据吗?
flume是支持事务,put事务,take事务,可以认为是不丢失数据的,但是如果你用是memory channel 如果发生意外的宕机,memory里面的数据就会丢失..
flume一般来说是不丢数据的,支持事务的
1)Flume配置分析
Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。
2)Flume的具体配置如下:
(1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件
[hadoop@bigdata02 conf]$ vim file-flume-kafka.conf在文件配置如下内容
a1.sources=r1 a1.channels=c1 c2 # configure source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #interceptor a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = com.bigdata.flume.interceptor.LogETLInterceptor$Builder a1.sources.r1.interceptors.i2.type = com.bigdata.flume.interceptor.LogTypeInterceptor$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = topic a1.sources.r1.selector.mapping.topic_start = c1 a1.sources.r1.selector.mapping.topic_event = c2 # configure channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = bigdata02:9092,bigdata03:9092,bigdata04:9092 a1.channels.c1.kafka.topic = topic_start a1.channels.c1.parseAsFlumeEvent = false a1.channels.c1.kafka.consumer.group.id = flume-consumer a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers = bigdata02:9092,bigdata03:9092,bigdata04:9092 a1.channels.c2.kafka.topic = topic_event a1.channels.c2.parseAsFlumeEvent = false a1.channels.c2.kafka.consumer.group.id = flume-consumer注意:com.bigdata.flume.interceptor.LogETLInterceptor和com.bigdata.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。
面试就是一场秀,不能翻车
本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。
ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志
日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。
1)创建Maven工程flume-interceptor
2)创建包名:com.bigdata.flume.interceptor
3)在pom.xml文件中添加如下配置
<dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.7.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>4)在com.bigdata.flume.interceptor包下创建LogETLInterceptor类名
Flume ETL拦截器LogETLInterceptor
package com.bigdata.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; public class LogETLInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 1 获取数据 byte[] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8")); // 2 判断数据类型并向Header中赋值 if (log.contains("start")) { if (LogUtils.validateStart(log)){ return event; } }else { if (LogUtils.validateEvent(log)){ return event; } } // 3 返回校验结果 return null; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); if (intercept1 != null){ interceptors.add(intercept1); } } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogETLInterceptor(); } @Override public void configure(Context context) { } } } 4)Flume日志过滤工具类 package com.bigdata.flume.interceptor; import org.apache.commons.lang.math.NumberUtils; public class LogUtils { public static boolean validateEvent(String log) { // 服务器时间 | json // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]} // 1 切割 String[] logContents = log.split("\\|"); // 2 校验 if(logContents.length != 2){ return false; } //3 校验服务器时间 if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){ return false; } // 4 校验json if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){ return false; } return true; } public static boolean validateStart(String log) { // {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2","extend1":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4","ln":"-98.3","loading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1","os":"8.2.1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"} if (log == null){ return false; } // 校验json if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){ return false; } return true; } }5)Flume日志类型区分拦截器LogTypeInterceptor
package com.bigdata.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; public class LogTypeInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { // 区分日志类型: body header // 1 获取body数据 byte[] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8")); // 2 获取header Map<String, String> headers = event.getHeaders(); // 3 判断数据类型并向Header中赋值 if (log.contains("start")) { headers.put("topic","topic_start"); }else { headers.put("topic","topic_event"); } return event; } @Override public List<Event> intercept(List<Event> events) { ArrayList<Event> interceptors = new ArrayList<>(); for (Event event : events) { Event intercept1 = intercept(event); interceptors.add(intercept1); } return interceptors; } @Override public void close() { } public static class Builder implements Interceptor.Builder{ @Override public Interceptor build() { return new LogTypeInterceptor(); } @Override public void configure(Context context) { } } }6)打包
拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入Flume的lib文件夹下面。
注意:为什么不需要依赖包?因为依赖包在flume的lib目录下面已经存在了。
7)需要先将打好的包放入到bigdata02的/opt/module/flume/lib文件夹下面。
[hadoop@bigdata02 lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT.jar8)分发Flume到bigdata03、bigdata04
[hadoop@bigdata02 module]$ xsync flume/ [hadoop@bigdata02 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &1)在/home/hadoop/bin目录下创建脚本f1.sh
[hadoop@bigdata02 bin]$ vim f1.sh在脚本中填写如下内容
#! /bin/bash case $1 in "start"){ for i in bigdata02 bigdata03 do echo " --------启动 $i 采集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1 &" done };; "stop"){ for i in bigdata02 bigdata03 do echo " --------停止 $i 采集flume-------" ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill" done };; esac说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。
说明2:awk 默认分隔符为空格
说明3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。
2)增加脚本执行权限
[hadoop@bigdata02 bin]$ chmod 777 f1.sh3)f1集群启动脚本
[hadoop@bigdata02 module]$ f1.sh start4)f1集群停止脚本
[hadoop@bigdata02 module]$ f1.sh stop
详见:大数据技术之Kafka
集群规划:
服务器bigdata02
服务器bigdata03
服务器bigdata04
Kafka
Kafka
Kafka
Kafka
1)在/home/hadoop/bin目录下创建脚本kf.sh
[hadoop@bigdata02 bin]$ vim kf.sh在脚本中填写如下内容
#! /bin/bash case $1 in "start"){ for i in bigdata02 bigdata03 bigdata04 do echo " --------启动 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties " done };; "stop"){ for i in bigdata02 bigdata03 bigdata04 do echo " --------停止 $i Kafka-------" ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop" done };; esac2)增加脚本执行权限
[hadoop@bigdata02 bin]$ chmod 777 kf.sh3)kf集群启动脚本
[hadoop@bigdata02 module]$ kf.sh start4)kf集群停止脚本
[hadoop@bigdata02 module]$ kf.sh stop进入到/opt/module/kafka/目录下分别创建:启动日志主题、事件日志主题。
1)创建启动日志主题
[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181 --create --replication-factor 1 --partitions 1 --topic topic_start2)创建事件日志主题
[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181 --create --replication-factor 1 --partitions 1 --topic topic_event1)删除启动日志主题
[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --delete --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181 --topic topic_start2)删除事件日志主题
[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --delete --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181 --topic topic_event1)Kafka压测
用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。
kafka-consumer-perf-test.sh
kafka-producer-perf-test.sh
2)Kafka Producer压力测试
(1)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下
[hadoop@bigdata02 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=bigdata02:9092,bigdata03:9092,bigdata04:9092说明:
record-size是一条信息有多大,单位是字节。
num-records是总共发送多少条信息。
throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。
(2)Kafka会打印下面的信息
100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.参数解析:本例中一共写入10w条消息,吞吐量为9.14 MB/sec,每次写入的平均延迟为187.68毫秒,最大的延迟为424.00毫秒。
3)Kafka Consumer压力测试
Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。
[hadoop@bigdata02 kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper bigdata02:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1参数说明:
--zookeeper 指定zookeeper的链接信息
--topic 指定topic的名称
--fetch-size 指定每次fetch的数据的大小
--messages 总共要消费的消息个数
测试结果说明:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153
开始测试时间,测试结束数据,共消费数据9.5368MB,吞吐量2.0714MB/s,共消费100010条,平均每秒消费21722.4153条。
Kafka机器数量(经验公式)=2*(峰值生产速度*副本数/100)+1
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。
比如我们的峰值生产速度是50M/s。副本数为2。
Kafka机器数量=2*(50*2/100)+ 1=3台
集群规划
服务器bigdata02
服务器bigdata03
服务器bigdata04
Flume(消费Kafka)
Flume
1)Flume配置分析
2)Flume的具体配置如下:
(1)在bigdata04的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件
[hadoop@bigdata04 conf]$ vim kafka-flume-hdfs.conf在文件配置如下内容
## 组件 a1.sources=r1 r2 a1.channels=c1 c2 a1.sinks=k1 k2 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = bigdata02:9092,bigdata03:9092,bigdata04:9092 a1.sources.r1.kafka.topics=topic_start ## source2 a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r2.batchSize = 5000 a1.sources.r2.batchDurationMillis = 2000 a1.sources.r2.kafka.bootstrap.servers = bigdata02:9092,bigdata03:9092,bigdata04:9092 a1.sources.r2.kafka.topics=topic_event ## channel1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/ a1.channels.c1.maxFileSize = 2146435071 a1.channels.c1.capacity = 1000000 a1.channels.c1.keep-alive = 6 ## channel2 a1.channels.c2.type = file a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2 a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/ a1.channels.c2.maxFileSize = 2146435071 a1.channels.c2.capacity = 1000000 a1.channels.c2.keep-alive = 6 ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = logstart- ##sink2 a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d a1.sinks.k2.hdfs.filePrefix = logevent- ## 不要产生大量小文件 a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k2.hdfs.rollInterval = 10 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 ## 控制输出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k2.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop a1.sinks.k2.hdfs.codeC = lzop ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1 a1.sources.r2.channels = c2 a1.sinks.k2.channel= c2
1)FileChannel和MemoryChannel区别
MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。
选型:
金融类公司、对钱要求非常准确的公司通常会选择FileChannel
传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。
2)FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformancecheckpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
3)Sink:HDFS Sink
(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:
(1)文件在达到128M时会滚动生成新文件
(2)文件创建超3600秒时会滚动生成新文件
1)在/home/hadoop/bin目录下创建脚本f2.sh
[hadoop@bigdata02 bin]$ vim f2.sh在脚本中填写如下内容
#! /bin/bash case $1 in "start"){ for i in bigdata04 do echo " --------启动 $i 消费flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt 2>&1 &" done };; "stop"){ for i in bigdata04 do echo " --------停止 $i 消费flume-------" ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill" done };; esac2)增加脚本执行权限
[hadoop@bigdata02 bin]$ chmod 777 f2.sh3)f2集群启动脚本
[hadoop@bigdata02 module]$ f2.sh start4)f2集群停止脚本
[hadoop@bigdata02 module]$ f2.sh stop1)问题描述:如果启动消费Flume抛出如下异常
ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded2)解决方案步骤:
(1)在bigdata02服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"(2)同步配置到bigdata03、bigdata04服务器
[hadoop@bigdata02 conf]$ xsync flume-env.sh3)Flume内存参数设置及优化
JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)
-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。
https://www.cnblogs.com/leeego-123/p/11572786.html
产生数据写入到/tmp/logs
java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.bigdata.appclient.AppMain >/opt/module/test.log
1)在/home/hadoop/bin目录下创建脚本cluster.sh
[hadoop@bigdata02 bin]$ vim cluster.sh在脚本中填写如下内容
#! /bin/bash case $1 in "start"){ echo " -------- 启动 集群 -------" echo " -------- 启动 hadoop集群 -------" /opt/module/hadoop-2.7.2/sbin/start-dfs.sh ssh bigdata03 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh" #启动 Zookeeper集群 zk.sh start sleep 4s; #启动 Flume采集集群 f1.sh start #启动 Kafka采集集群 kf.sh start sleep 6s; #启动 Flume消费集群 f2.sh start };; "stop"){ echo " -------- 停止 集群 -------" #停止 Flume消费集群 f2.sh stop #停止 Kafka采集集群 kf.sh stop sleep 6s; #停止 Flume采集集群 f1.sh stop #停止 Zookeeper集群 zk.sh stop echo " -------- 停止 hadoop集群 -------" ssh bigdata03 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh" /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh };; esac2)增加脚本执行权限
[hadoop@bigdata02 bin]$ chmod 777 cluster.sh3)cluster集群启动脚本
[hadoop@bigdata02 module]$ cluster.sh start4)cluster集群停止脚本
[hadoop@bigdata02 module]$ cluster.sh stop