大数据数仓建模(4)

it2023-01-13  56

                                        第4章 数据采集模块

4.1 Hadoop安装

详见:大数据技术之Hadoop(入门)

1)测试集群规划:

 

服务器bigdata02

服务器bigdata03

服务器bigdata04

HDFS

NameNode

DataNode

DataNode

DataNode

SecondaryNameNode

Yarn

NodeManager

Resourcemanager

NodeManager

NodeManager

注意:尽量使用离线方式安装,CDH要收费了,使用的人会越来越少...

4.1.1 项目经验之HDFS存储多目录

若HDFS存储空间紧张,需要对DataNode进行磁盘扩展。

1)在DataNode节点增加磁盘并进行挂载。

2)在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题。

<property>     <name>dfs.datanode.data.dir</name> <value>file:///${hadoop.tmp.dir}/dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value> </property>

3)增加磁盘后,保证每个目录数据均衡

开启数据均衡命令:

bin/start-balancer.sh –threshold 10

对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。

停止数据均衡命令:

bin/stop-balancer.sh

4.1.2 项目经验之支持LZO压缩配置

1)hadoop本身并不支持lzo压缩,故需要使用twitter提供的hadoop-lzo开源组件。hadoop-lzo需依赖hadoop和lzo进行编译,编译步骤如下。

Hadoop支持LZO 0. 环境准备 maven(下载安装,配置环境变量,修改sitting.xml加阿里云镜像) gcc-c++ zlib-devel autoconf automake libtool 通过yum安装即可,yum -y install gcc-c++ lzo-devel zlib-devel autoconf automake libtool 1. 下载、安装并编译LZO wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz tar -zxvf lzo-2.10.tar.gz cd lzo-2.10 ./configure -prefix=/usr/local/hadoop/lzo/ make make install 2. 编译hadoop-lzo源码 2.1 下载hadoop-lzo的源码,下载地址:https://github.com/twitter/hadoop-lzo/archive/master.zip 2.2 解压之后,修改pom.xml <hadoop.current.version>2.7.2</hadoop.current.version> 2.3 声明两个临时环境变量 export C_INCLUDE_PATH=/usr/local/hadoop/lzo/include export LIBRARY_PATH=/usr/local/hadoop/lzo/lib 2.4 编译 进入hadoop-lzo-master,执行maven编译命令 mvn package -Dmaven.test.skip=true 2.5 进入target,hadoop-lzo-0.4.21-SNAPSHOT.jar 即编译成功的hadoop-lzo组件

2)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/

[hadoop@bigdata02 common]$ pwd /opt/module/hadoop-2.7.2/share/hadoop/common [hadoop@bigdata02 common]$ ls hadoop-lzo-0.4.20.jar

3)同步hadoop-lzo-0.4.20.jar到bigdata03、bigdata04

[hadoop@bigdata02 common]$ xsync hadoop-lzo-0.4.20.jar

4)core-site.xml增加配置支持LZO压缩

<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>io.compression.codecs</name> <value> org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.SnappyCodec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec </value> </property> <property> <name>io.compression.codec.lzo.class</name> <value>com.hadoop.compression.lzo.LzoCodec</value> </property> </configuration>

5)同步core-site.xml到bigdata03、bigdata04

[hadoop@bigdata02 hadoop]$ xsync core-site.xml

6)启动及查看集群

[hadoop@bigdata02 hadoop-2.7.2]$ sbin/start-dfs.sh [hadoop@bigdata03 hadoop-2.7.2]$ sbin/start-yarn.sh

4.1.3 项目经验之LZO创建索引

1)创建LZO文件的索引,LZO压缩文件的可切片特性依赖于其索引,故我们需要手动为LZO压缩文件创建索引。若无索引,则LZO文件的切片只有一个。

hadoop jar /path/to/your/hadoop-lzo.jar com.hadoop.compression.lzo.DistributedLzoIndexer bigtable.lzo

2)测试

(1)将bigtable.lzo(150M)上传到集群的根目录

[hadoop@bigdata02 module]$ hadoop fs -mkdir /input [hadoop@bigdata02 module]$ hadoop fs -put bigtable.lzo /input

(2)执行wordcount程序

[hadoop@bigdata02 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /input /output1

   

 

注意:这个lzo文件有214m,默认的块大小是128m,那么应该是有两个块,我们通过运行mapreduce程序发现,只有一个切分,需要进行下面的步骤才行。

   (3)对上传的LZO文件建索引

[hadoop@bigdata02 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/common/hadoop-lzo-0.4.20.jar  com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo

(4)再次执行WordCount程序

[hadoop@bigdata02 module]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /input /output2

 

4.1.4 项目经验之基准测试

面试官可能会问,我有10T的数据,读完需要多长时间,写需要多长时间?你应该回答,我们会先做基准测试,测试出这个集群的读写能力,计算的能力...

1) 测试HDFS写性能

测试内容:向HDFS集群写10个128M的文件

[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -write -nrFiles 10 -fileSize 128MB 19/05/02 11:45:23 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write 19/05/02 11:45:23 INFO fs.TestDFSIO:            Date & time: Thu May 02 11:45:23 CST 2019 19/05/02 11:45:23 INFO fs.TestDFSIO:        Number of files: 10 19/05/02 11:45:23 INFO fs.TestDFSIO: Total MBytes processed: 1280.0 19/05/02 11:45:23 INFO fs.TestDFSIO:      Throughput mb/sec: 10.69751115716984 19/05/02 11:45:23 INFO fs.TestDFSIO: Average IO rate mb/sec: 14.91699504852295 19/05/02 11:45:23 INFO fs.TestDFSIO:  IO rate std deviation: 11.160882132355928 19/05/02 11:45:23 INFO fs.TestDFSIO:     Test exec time sec: 52.315

最重要的参数是 Throughput mb/sec: 10.69751115716984 表示平均吞吐量为10.6975m/s

2)测试HDFS读性能

测试内容:读取HDFS集群10个128M的文件

[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -read -nrFiles 10 -fileSize 128MB 19/05/02 11:56:36 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read 19/05/02 11:56:36 INFO fs.TestDFSIO:            Date & time: Thu May 02 11:56:36 CST 2019 19/05/02 11:56:36 INFO fs.TestDFSIO:        Number of files: 10 19/05/02 11:56:36 INFO fs.TestDFSIO: Total MBytes processed: 1280.0 19/05/02 11:56:36 INFO fs.TestDFSIO:      Throughput mb/sec: 16.001000062503905 19/05/02 11:56:36 INFO fs.TestDFSIO: Average IO rate mb/sec: 17.202795028686523 19/05/02 11:56:36 INFO fs.TestDFSIO:  IO rate std deviation: 4.881590515873911 19/05/02 11:56:36 INFO fs.TestDFSIO:     Test exec time sec: 49.116 19/05/02 11:56:36 INFO fs.TestDFSIO:

3)删除测试生成数据

[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar TestDFSIO -clean

4)使用Sort程序评测MapReduce

(1)使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数

[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar randomwriter random-data

(2)执行Sort程序

[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar sort random-data sorted-data

(3)验证数据是否真正排好序了

[hadoop@bigdata02 mapreduce]$ hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.2-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data

4.1.5 项目经验之Hadoop参数调优

1)HDFS参数调优hdfs-site.xml

dfs.namenode.handler.count=20 * log2(Cluster Size),比如集群规模为8台时,此参数设置为60 

作用:dataNode与namenode之间通信需要有一定的并发度,如果太低,datanode需要排队,如果太高,集群也没有这么多资源,需要有一个比较合理的并发度...

The number of Namenode RPC server threads that listen to requests from clients. If dfs.namenode.servicerpc-address is not configured then Namenode RPC server threads listen to requests from all nodes. NameNode有一个工作线程池,用来处理不同DataNode的并发心跳以及客户端并发的元数据操作。对于大集群或者有大量客户端的集群来说,通常需要增大参数dfs.namenode.handler.count的默认值10。设置该值的一般原则是将其设置为集群大小的自然对数乘以20,即20 * log2(Cluster Size),N为集群大小。

2)YARN参数调优yarn-site.xml

(1)情景描述:总共7台机器,每天几亿条数据,数据源->Flume->Kafka->HDFS->Hive

面临问题:数据统计主要用HiveSQL,没有数据倾斜,小文件已经做了合并处理,开启的JVM重用,而且IO没有阻塞,内存用了不到50%但是还是跑的非常慢,而且数据量洪峰过来时,整个集群都会宕掉。基于这种情况有没有优化方案。

(2)解决办法:

内存利用率不够。这个一般是Yarn的2个配置造成的,单个任务可以申请的最大内存大小,和Hadoop单个节点可用内存大小。调节这两个参数能提高系统内存的利用率。

(a)yarn.nodemanager.resource.memory-mb

表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。

(b)yarn.scheduler.maximum-allocation-mb

单个任务可申请的最多物理内存量,默认是8192(MB)。

3)Hadoop宕机

(1)如果MR造成系统宕机。此时要控制Yarn同时运行的任务数,和每个任务申请的最大内存。调整参数:yarn.scheduler.maximum-allocation-mb(单个任务可申请的最多物理内存量,默认是8192MB)

(2)如果写入文件过量造成NameNode宕机。那么调高Kafka的存储大小,控制从Kafka到HDFS的写入速度。高峰期的时候用Kafka进行缓存,高峰期过去数据同步会自动跟上。

4.2 Zookeeper安装

4.2.1 安装ZK

详见:大数据技术之Zookeeper

集群规划

 

服务器bigdata02

服务器bigdata03

服务器bigdata04

Zookeeper

Zookeeper

Zookeeper

Zookeeper

4.2.2 ZK集群启动停止脚本

1)在bigdata02的/home/hadoop/bin目录下创建脚本

[hadoop@bigdata02 bin]$ vim zk.sh

在脚本中编写如下内容

#! /bin/bash case $1 in "start"){ for i in bigdata02 bigdata03 bigdata04 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start" done };; "stop"){ for i in bigdata02 bigdata03 bigdata04 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop" done };; "status"){ for i in bigdata02 bigdata03 bigdata04 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status" done };; esac

2)增加脚本执行权限

[hadoop@bigdata02 bin]$ chmod 777 zk.sh

3)Zookeeper集群启动脚本

[hadoop@bigdata02 module]$ zk.sh start

4)Zookeeper集群停止脚本

[hadoop@bigdata02 module]$ zk.sh stop

4.2.3 项目经验之Linux环境变量

1)修改/etc/profile文件:用来设置系统环境参数,比如$PATH. 这里面的环境变量是对系统内所有用户生效。使用bash命令,需要source  /etc/profile一下。

2)修改~/.bashrc文件:针对某一个特定的用户,环境变量的设置只对该用户自己有效。使用bash命令,只要以该用户身份运行命令行就会读取该文件

3)把/etc/profile里面环境变量追加到~/.bashrc目录

[hadoop@bigdata02 ~]$ cat /etc/profile >> ~/.bashrc [hadoop@bigdata03 ~]$ cat /etc/profile >> ~/.bashrc [hadoop@bigdata04 ~]$ cat /etc/profile >> ~/.bashrc

4)说明

登录式Shell,采用用户名比如hadoop登录,会自动加载/etc/profile

非登录式Shell,采用ssh 比如ssh bigdata03登录,不会自动加载/etc/profile,会自动加载~/.bashrc

4.3 日志生成

4.3.1 日志启动

1)代码参数说明

// 参数一:控制发送每条的延时时间,默认是0 Long delay = args.length > 0 ? Long.parseLong(args[0]) : 0L; // 参数二:循环遍历次数 int loop_len = args.length > 1 ? Integer.parseInt(args[1]) : 1000;

2)将生成的jar包log-collector-0.0.1-SNAPSHOT-jar-with-dependencies.jar拷贝bigdata02服务器/opt/module,并同步到bigdata03/opt/module路径

[hadoop@bigdata02 module]$ xsync log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar

3)在bigdata02上执行jar程序

[hadoop@bigdata02 module]$ java -classpath log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.bigdata.appclient.AppMain  >/opt/module/test.log

说明1:

java -classpath 需要在jar包后面指定全类名;

java -jar 需要查看一下解压的jar包META-INF/ MANIFEST.MF文件中,Main-Class是否有全类名。如果有可以用java -jar,如果没有就需要用到java -classpath

说明2:/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

标准输入0:从键盘获得输入 /proc/self/fd/0

标准输出1:输出到屏幕(即控制台) /proc/self/fd/1

错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

4)在/tmp/logs路径下查看生成的日志文件

[hadoop@bigdata02 module]$ cd /tmp/logs/ [hadoop@bigdata02 logs]$ ls app-2020-03-10.log

4.3.2 集群日志生成启动脚本

1)在/home/hadoop/bin目录下创建脚本lg.sh

[hadoop@bigdata02 bin]$ vim lg.sh

2)在脚本中编写如下内容

#! /bin/bash for i in bigdata02 bigdata03  do ssh $i "java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.bigdata.appclient.AppMain $1 $2 >/dev/null 2>&1 &" done

3)修改脚本执行权限

[hadoop@bigdata02 bin]$ chmod 777 lg.sh

4)启动脚本

[hadoop@bigdata02 module]$ lg.sh

5)分别在bigdata02、bigdata03的/tmp/logs目录上查看生成的数据

[hadoop@bigdata02 logs]$ ls app-2020-03-10.log [hadoop@bigdata03 logs]$ ls app-2020-03-10.log

4.3.3 集群时间同步修改脚本(非正规---临时脚本)

企业开发时,参考hadoop集群时间同步。

1)在/home/hadoop/bin目录下创建脚本dt.sh

[hadoop@bigdata02 bin]$ vim dt.sh

2)在脚本中编写如下内容

#!/bin/bash for i in bigdata02 bigdata03 bigdata04 do         echo "========== $i =========="         ssh -t $i "sudo date -s $1" done

注意:ssh -t 通常用于ssh远程执行sudo命令

3)修改脚本执行权限

[hadoop@bigdata02 bin]$ chmod 777 dt.sh

4)启动脚本

[hadoop@bigdata02 bin]$ dt.sh 2020-03-10

4.3.4 集群所有进程查看脚本

1)在/home/hadoop/bin目录下创建脚本xcall.sh

[hadoop@bigdata02 bin]$ vim xcall.sh

2)在脚本中编写如下内容

#! /bin/bash for i in bigdata02 bigdata03 bigdata04 do         echo --------- $i ----------         ssh $i "$*" done

3)修改脚本执行权限

[hadoop@bigdata02 bin]$ chmod 777 xcall.sh

4)启动脚本

[hadoop@bigdata02 bin]$ xcall.sh jps

4.4 采集日志Flume

4.4.1 日志采集Flume安装

详见:大数据技术之Flume

集群规划:

 

服务器bigdata02

服务器bigdata03

服务器bigdata04

Flume(采集日志)

Flume

Flume

 

4.4.2 项目经验之Flume组件

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的优势

TailDir Source:断点续传、多目录Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。 tail -F / f  

Spooling Directory Source监控目录,不支持断点续传。

(2)batchSize大小如何设置?

答:batchSize是每次读取的event数量,Event 1K左右时,500-1000合适(默认为100)

2)Channel

采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel数据存储在Kafka里面,所以数据是存储在磁盘中。

注意在Flume1.7以前,Kafka Channel很少有人使用,因为发现parseAsFlumeEvent这个配置起不了作用。也就是无论parseAsFlumeEvent配置为true还是false,都会转为Flume Event。这样的话,造成的结果是,会始终都把Flume的headers中的信息混合着内容一起写入Kafka的消息中,这显然不是我所需要的,我只是需要把内容写入即可。

面试问题:如何选择memory channel 和 file channel?

memory channel  ---》 channel数据在内存里面,数据读写快,如果宕机,会丢失数据

file channel --》 速度就没有那么快,数据存放在磁盘,不会丢失数据,安全性更高

flume 会丢失数据吗?

flume是支持事务,put事务,take事务,可以认为是不丢失数据的,但是如果你用是memory channel 如果发生意外的宕机,memory里面的数据就会丢失..

flume一般来说是不丢数据的,支持事务的

4.4.3 日志采集Flume配置

1)Flume配置分析

 

Flume直接读log日志的数据,log日志的格式是app-yyyy-mm-dd.log。

2)Flume的具体配置如下:

(1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

[hadoop@bigdata02 conf]$ vim file-flume-kafka.conf

在文件配置如下内容

a1.sources=r1 a1.channels=c1 c2 # configure source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #interceptor a1.sources.r1.interceptors =  i1 i2 a1.sources.r1.interceptors.i1.type = com.bigdata.flume.interceptor.LogETLInterceptor$Builder a1.sources.r1.interceptors.i2.type = com.bigdata.flume.interceptor.LogTypeInterceptor$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = topic a1.sources.r1.selector.mapping.topic_start = c1 a1.sources.r1.selector.mapping.topic_event = c2 # configure channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = bigdata02:9092,bigdata03:9092,bigdata04:9092 a1.channels.c1.kafka.topic = topic_start a1.channels.c1.parseAsFlumeEvent = false a1.channels.c1.kafka.consumer.group.id = flume-consumer a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers = bigdata02:9092,bigdata03:9092,bigdata04:9092 a1.channels.c2.kafka.topic = topic_event a1.channels.c2.parseAsFlumeEvent = false a1.channels.c2.kafka.consumer.group.id = flume-consumer

注意:com.bigdata.flume.interceptor.LogETLInterceptorcom.bigdata.flume.interceptor.LogTypeInterceptor自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。

 

面试就是一场秀,不能翻车

4.4.4 Flume的ETL和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。

ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志

日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。

1)创建Maven工程flume-interceptor

2)创建包名:com.bigdata.flume.interceptor

3)在pom.xml文件中添加如下配置

<dependencies>     <dependency>         <groupId>org.apache.flume</groupId>         <artifactId>flume-ng-core</artifactId>         <version>1.7.0</version>     </dependency> </dependencies> <build>     <plugins>         <plugin>             <artifactId>maven-compiler-plugin</artifactId>             <version>2.3.2</version>             <configuration>                 <source>1.8</source>                 <target>1.8</target>             </configuration>         </plugin>         <plugin>             <artifactId>maven-assembly-plugin</artifactId>             <configuration>                 <descriptorRefs>                     <descriptorRef>jar-with-dependencies</descriptorRef>                 </descriptorRefs>             </configuration>             <executions>                 <execution>                     <id>make-assembly</id>                     <phase>package</phase>                     <goals>                         <goal>single</goal>                     </goals>                 </execution>             </executions>         </plugin>     </plugins> </build>

4)在com.bigdata.flume.interceptor包下创建LogETLInterceptor类名

Flume ETL拦截器LogETLInterceptor

package com.bigdata.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; public class LogETLInterceptor implements Interceptor {     @Override     public void initialize() {     }     @Override     public Event intercept(Event event) {         // 1 获取数据         byte[] body = event.getBody();         String log = new String(body, Charset.forName("UTF-8"));         // 2 判断数据类型并向Header中赋值         if (log.contains("start")) {             if (LogUtils.validateStart(log)){                 return event;             }         }else {             if (LogUtils.validateEvent(log)){                 return event;             }         }         // 3 返回校验结果         return null;     }     @Override     public List<Event> intercept(List<Event> events) {         ArrayList<Event> interceptors = new ArrayList<>();         for (Event event : events) {             Event intercept1 = intercept(event);             if (intercept1 != null){                 interceptors.add(intercept1);             }         }         return interceptors;     }     @Override     public void close() {     }     public static class Builder implements Interceptor.Builder{         @Override         public Interceptor build() {             return new LogETLInterceptor();         }         @Override         public void configure(Context context) {         }     } } 4)Flume日志过滤工具类 package com.bigdata.flume.interceptor; import org.apache.commons.lang.math.NumberUtils; public class LogUtils {     public static boolean validateEvent(String log) {         // 服务器时间 | json         // 1549696569054 | {"cm":{"ln":"-89.2","sv":"V2.0.4","os":"8.2.0","g":"M67B4QYU@gmail.com","nw":"4G","l":"en","vc":"18","hw":"1080*1920","ar":"MX","uid":"u8678","t":"1549679122062","la":"-27.4","md":"sumsung-12","vn":"1.1.3","ba":"Sumsung","sr":"Y"},"ap":"weather","et":[]}         // 1 切割         String[] logContents = log.split("\\|");         // 2 校验         if(logContents.length != 2){             return false;         }         //3 校验服务器时间         if (logContents[0].length()!=13 || !NumberUtils.isDigits(logContents[0])){             return false;         }         // 4 校验json         if (!logContents[1].trim().startsWith("{") || !logContents[1].trim().endsWith("}")){             return false;         }         return true;     }     public static boolean validateStart(String log) {  // {"action":"1","ar":"MX","ba":"HTC","detail":"542","en":"start","entry":"2","extend1":"","g":"S3HQ7LKM@gmail.com","hw":"640*960","l":"en","la":"-43.4","ln":"-98.3","loading_time":"10","md":"HTC-5","mid":"993","nw":"WIFI","open_ad_type":"1","os":"8.2.1","sr":"D","sv":"V2.9.0","t":"1559551922019","uid":"993","vc":"0","vn":"1.1.5"}         if (log == null){             return false;         }         // 校验json         if (!log.trim().startsWith("{") || !log.trim().endsWith("}")){             return false;         }         return true;     } }

5)Flume日志类型区分拦截器LogTypeInterceptor

package com.bigdata.flume.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Map; public class LogTypeInterceptor implements Interceptor {     @Override     public void initialize() {     }     @Override     public Event intercept(Event event) {         // 区分日志类型:   body  header         // 1 获取body数据         byte[] body = event.getBody();         String log = new String(body, Charset.forName("UTF-8"));         // 2 获取header         Map<String, String> headers = event.getHeaders();         // 3 判断数据类型并向Header中赋值         if (log.contains("start")) {             headers.put("topic","topic_start");         }else {             headers.put("topic","topic_event");         }         return event;     }     @Override     public List<Event> intercept(List<Event> events) {         ArrayList<Event> interceptors = new ArrayList<>();         for (Event event : events) {             Event intercept1 = intercept(event);             interceptors.add(intercept1);         }         return interceptors;     }     @Override     public void close() {     }     public static class Builder implements  Interceptor.Builder{         @Override         public Interceptor build() {             return new LogTypeInterceptor();         }         @Override         public void configure(Context context) {         }     } }

6)打包

拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入Flume的lib文件夹下面。

 

注意:为什么不需要依赖包?因为依赖包在flume的lib目录下面已经存在了。

7)需要先将打好的包放入到bigdata02的/opt/module/flume/lib文件夹下面。

[hadoop@bigdata02 lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT.jar

8)分发Flume到bigdata03、bigdata04

[hadoop@bigdata02 module]$ xsync flume/ [hadoop@bigdata02 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

4.4.5 日志采集Flume启动停止脚本

1)在/home/hadoop/bin目录下创建脚本f1.sh

[hadoop@bigdata02 bin]$ vim f1.sh

在脚本中填写如下内容

#! /bin/bash case $1 in "start"){         for i in bigdata02 bigdata03         do                 echo " --------启动 $i 采集flume-------"                 ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/test1 2>&1  &"         done };; "stop"){         for i in bigdata02 bigdata03         do                 echo " --------停止 $i 采集flume-------"                 ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs kill"         done };; esac

说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令

说明2:awk 默认分隔符为空格

说明3:xargs 表示取出前面命令运行的结果,作为后面命令的输入参数。

2)增加脚本执行权限

[hadoop@bigdata02 bin]$ chmod 777 f1.sh

3)f1集群启动脚本

[hadoop@bigdata02 module]$ f1.sh start

4)f1集群停止脚本

[hadoop@bigdata02 module]$ f1.sh stop

4.5 Kafka安装

 

4.5.1 Kafka集群安装

详见:大数据技术之Kafka

集群规划:

 

服务器bigdata02

服务器bigdata03

服务器bigdata04

Kafka

Kafka

Kafka

Kafka

4.5.2 Kafka集群启动停止脚本

1)在/home/hadoop/bin目录下创建脚本kf.sh

[hadoop@bigdata02 bin]$ vim kf.sh

在脚本中填写如下内容

#! /bin/bash case $1 in "start"){         for i in bigdata02 bigdata03 bigdata04         do                 echo " --------启动 $i Kafka-------"                 ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties "         done };; "stop"){         for i in bigdata02 bigdata03 bigdata04         do                 echo " --------停止 $i Kafka-------"                 ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"         done };; esac

2)增加脚本执行权限

[hadoop@bigdata02 bin]$ chmod 777 kf.sh

3)kf集群启动脚本

[hadoop@bigdata02 module]$ kf.sh start

4)kf集群停止脚本

[hadoop@bigdata02 module]$ kf.sh stop

4.5.3 查看Kafka Topic列表

[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --zookeeper bigdata02:2181 --list

4.5.4 创建Kafka Topic

进入到/opt/module/kafka/目录下分别创建:启动日志主题、事件日志主题。

1)创建启动日志主题

[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181  --create --replication-factor 1 --partitions 1 --topic topic_start

2)创建事件日志主题

[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181  --create --replication-factor 1 --partitions 1 --topic topic_event

4.5.5 删除Kafka Topic

1)删除启动日志主题

[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --delete --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181 --topic topic_start

2)删除事件日志主题

[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --delete --zookeeper bigdata02:2181,bigdata03:2181,bigdata04:2181 --topic topic_event

4.5.6 Kafka生产消息

[hadoop@bigdata02 kafka]$ bin/kafka-console-producer.sh \ --broker-list bigdata02:9092 --topic topic_start >hello world >hadoop  bigdata

4.5.7 Kafka消费消息

[hadoop@bigdata02 kafka]$ bin/kafka-console-consumer.sh \ --bootstrap-server bigdata02:9092 --from-beginning --topic topic_start --from-beginning:会把主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

4.5.8 查看Kafka Topic详情

[hadoop@bigdata02 kafka]$ bin/kafka-topics.sh --zookeeper bigdata02:2181 \ --describe --topic topic_start

4.5.9 项目经验之Kafka压力测试

1)Kafka压测

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。 

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

2)Kafka Producer压力测试

(1)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下

[hadoop@bigdata02 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=bigdata02:9092,bigdata03:9092,bigdata04:9092

说明:

record-size是一条信息有多大,单位是字节。

num-records是总共发送多少条信息。

throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。

(2)Kafka会打印下面的信息

100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th.

参数解析:本例中一共写入10w条消息,吞吐量为9.14 MB/sec,每次写入的平均延迟为187.68毫秒,最大的延迟为424.00毫秒。

3)Kafka Consumer压力测试

Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。

[hadoop@bigdata02 kafka]$ bin/kafka-consumer-perf-test.sh --zookeeper bigdata02:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1

参数说明:

--zookeeper 指定zookeeper的链接信息

--topic 指定topic的名称

--fetch-size 指定每次fetch的数据的大小

--messages 总共要消费的消息个数

测试结果说明:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec

2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153

开始测试时间,测试结束数据,共消费数据9.5368MB,吞吐量2.0714MB/s,共消费100010条,平均每秒消费21722.4153条。

4.5.10 项目经验之Kafka机器数量计算

Kafka机器数量(经验公式)=2*(峰值生产速度*副本数/100)+1

先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。

比如我们的峰值生产速度是50M/s。副本数为2。

Kafka机器数量=2*(50*2/100)+ 1=3台

4.6 消费Kafka数据Flume

 

集群规划

 

服务器bigdata02

服务器bigdata03

服务器bigdata04

Flume(消费Kafka)

 

 

Flume

4.6.1 日志消费Flume配置

1)Flume配置分析

 

2)Flume的具体配置如下:

(1)在bigdata04的/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

[hadoop@bigdata04 conf]$ vim kafka-flume-hdfs.conf

在文件配置如下内容

## 组件 a1.sources=r1 r2 a1.channels=c1 c2 a1.sinks=k1 k2 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = bigdata02:9092,bigdata03:9092,bigdata04:9092 a1.sources.r1.kafka.topics=topic_start ## source2 a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r2.batchSize = 5000 a1.sources.r2.batchDurationMillis = 2000 a1.sources.r2.kafka.bootstrap.servers = bigdata02:9092,bigdata03:9092,bigdata04:9092 a1.sources.r2.kafka.topics=topic_event ## channel1 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/ a1.channels.c1.maxFileSize = 2146435071 a1.channels.c1.capacity = 1000000 a1.channels.c1.keep-alive = 6 ## channel2 a1.channels.c2.type = file a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2 a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/ a1.channels.c2.maxFileSize = 2146435071 a1.channels.c2.capacity = 1000000 a1.channels.c2.keep-alive = 6 ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = logstart- ##sink2 a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d a1.sinks.k2.hdfs.filePrefix = logevent- ## 不要产生大量小文件 a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k2.hdfs.rollInterval = 10 a1.sinks.k2.hdfs.rollSize = 134217728 a1.sinks.k2.hdfs.rollCount = 0 ## 控制输出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k2.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop a1.sinks.k2.hdfs.codeC = lzop ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1 a1.sources.r2.channels = c2 a1.sinks.k2.channel= c2

 

4.6.2 项目经验之Flume组件

1)FileChannel和MemoryChannel区别

MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

选型:

金融类公司、对钱要求非常准确的公司通常会选择FileChannel

传输的是普通日志信息(京东内部一天丢100万-200万条,这是非常正常的),通常选择MemoryChannel。

2)FileChannel优化

通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

3)Sink:HDFS Sink

(1)HDFS存入大量小文件,有什么影响?

元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

(2)HDFS小文件处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0几个参数综合作用,效果如下:

(1)文件在达到128M时会滚动生成文件

(2)文件创建超3600时会滚动生成文件

4.6.3 日志消费Flume启动停止脚本

1)在/home/hadoop/bin目录下创建脚本f2.sh

[hadoop@bigdata02 bin]$ vim f2.sh

在脚本中填写如下内容

#! /bin/bash case $1 in "start"){         for i in bigdata04         do                 echo " --------启动 $i 消费flume-------"                 ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log.txt   2>&1 &"         done };; "stop"){         for i in bigdata04         do                 echo " --------停止 $i 消费flume-------"                 ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"         done };; esac

2)增加脚本执行权限

[hadoop@bigdata02 bin]$ chmod 777 f2.sh

3)f2集群启动脚本

[hadoop@bigdata02 module]$ f2.sh start

4)f2集群停止脚本

[hadoop@bigdata02 module]$ f2.sh stop

4.6.4 项目经验之Flume内存优化

1)问题描述:如果启动消费Flume抛出如下异常

ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded

2)解决方案步骤:

(1)在bigdata02服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2)同步配置到bigdata03、bigdata04服务器

[hadoop@bigdata02 conf]$ xsync flume-env.sh

3)Flume内存参数设置及优化

JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

-Xms表示JVM Heap(堆内存)最小尺寸,初始分配-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc

https://www.cnblogs.com/leeego-123/p/11572786.html

4.7 采集通道启动/停止脚本

 产生数据写入到/tmp/logs

java -classpath /opt/module/log-collector-1.0-SNAPSHOT-jar-with-dependencies.jar com.bigdata.appclient.AppMain  >/opt/module/test.log

 

1)在/home/hadoop/bin目录下创建脚本cluster.sh

[hadoop@bigdata02 bin]$ vim cluster.sh

在脚本中填写如下内容

#! /bin/bash case $1 in "start"){ echo " -------- 启动 集群 -------" echo " -------- 启动 hadoop集群 -------" /opt/module/hadoop-2.7.2/sbin/start-dfs.sh ssh bigdata03 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh" #启动 Zookeeper集群 zk.sh start sleep 4s; #启动 Flume采集集群 f1.sh start #启动 Kafka采集集群 kf.sh start sleep 6s; #启动 Flume消费集群 f2.sh start };; "stop"){     echo " -------- 停止 集群 -------"     #停止 Flume消费集群 f2.sh stop #停止 Kafka采集集群 kf.sh stop     sleep 6s; #停止 Flume采集集群 f1.sh stop #停止 Zookeeper集群 zk.sh stop echo " -------- 停止 hadoop集群 -------" ssh bigdata03 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh" /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh };; esac

2)增加脚本执行权限

[hadoop@bigdata02 bin]$ chmod 777 cluster.sh

3)cluster集群启动脚本

[hadoop@bigdata02 module]$ cluster.sh start

4)cluster集群停止脚本

[hadoop@bigdata02 module]$ cluster.sh stop

 

 

 

最新回复(0)