神策埋点后端方案

it2026-02-13  9

神策埋点后端方案

一、埋点方案

openresty + lua + logstash

二、openresty

a. openresty安装

依赖包安装

# Dependency packages for the OpenResty installation that follows.
yum install pcre-devel openssl-devel gcc curl

openresty安装

# Download the OpenResty 1.17.8.2 source tarball and unpack it.
wget https://openresty.org/download/openresty-1.17.8.2.tar.gz
tar xzvf openresty-1.17.8.2.tar.gz
cd openresty-1.17.8.2

# Configure, build and install from the source tree.
./configure
make
make install

b. zlib依赖安装

# Toolchain needed to build the lua-zlib binding.
yum install -y gcc gcc-c++ make automake
yum install -y cmake

# Fetch and unpack the lua-zlib sources.
wget https://github.com/brimworks/lua-zlib/archive/master.zip
unzip master.zip
cd lua-zlib-master/

# Build against the LuaJIT headers and libraries under /usr/local/openresty.
# The trailing "." names the source directory explicitly; cmake expects a
# source path argument.
cmake -DLUA_INCLUDE_DIR=/usr/local/openresty/luajit/include/luajit-2.1 \
      -DLUA_LIBRARIES=/usr/local/openresty/luajit/lib \
      -DUSE_LUAJIT=ON \
      -DUSE_LUA=OFF \
      .
make

# Copy the module into OpenResty's lualib directory so that require "zlib" resolves.
cp zlib.so /usr/local/openresty/lualib/zlib.so

c. lua脚本

-- File: lua_zlib_body.lua
-- Create it with:
--   cd /usr/local/openresty/nginx/conf/
--   vim lua_zlib_body.lua
--
-- Content handler for the tracking endpoint. It extracts the base64-encoded
-- (optionally compressed) payload from a POST form or a GET query string,
-- stores the decoded text in the nginx variable $data, and always answers
-- "success". Missing or malformed payloads are logged and skipped instead of
-- raising a Lua error.

local zlib = require "zlib"

--- Return the first value of a request argument.
-- A repeated key is delivered as a table of values; only the first is used.
-- @param value string, table, boolean or nil as found in the args table
-- @return the first value, or `value` itself when it is not a table
local function first_value(value)
    if type(value) == "table" then
        return value[1]
    end
    return value
end

--- Store the decoded payload in $data.
-- A payload that starts with "{" (a single JSON object) is wrapped in
-- brackets so that the stored text is always array-shaped.
-- @param str decoded payload; anything that is not a string is ignored
local function set_data(str)
    if type(str) ~= "string" then
        return
    end
    if string.sub(str, 1, 1) == "{" then
        str = "[" .. str .. "]"
    end
    ngx.var.data = str
end

--- Base64-decode a request argument and optionally inflate it.
-- @param encoded  raw argument value (may be nil, a table, or a boolean)
-- @param inflate  true when the decoded bytes are compressed
-- @return the decoded string, or nil when the argument is absent or invalid
local function decode_payload(encoded, inflate)
    encoded = first_value(encoded)
    if type(encoded) ~= "string" then
        -- Absent key, or a key without a value.
        return nil
    end

    local raw = ngx.decode_base64(encoded)
    if not raw then
        ngx.log(ngx.WARN, "tracking payload is not valid base64")
        return nil
    end

    if not inflate then
        return raw
    end

    -- The inflate stream raises a Lua error on corrupt input; contain it here
    -- so that one bad request does not turn into a 500.
    local ok, inflated = pcall(zlib.inflate(), raw)
    if not ok then
        ngx.log(ngx.WARN, "failed to inflate tracking payload: ", inflated)
        return nil
    end
    return inflated
end

local method = ngx.req.get_method()
local payload

if method == "POST" then
    ngx.req.read_body()
    local body, err = ngx.req.get_post_args()
    if not body then
        ngx.log(ngx.ERR, "failed to read POST arguments: ", err)
    elseif first_value(body["gzip"]) == "1" then
        payload = decode_payload(body["data_list"], true)
    else
        payload = decode_payload(body["data"], false)
    end
elseif method == "GET" then
    local args = ngx.req.get_uri_args()
    payload = decode_payload(args and args["data"], false)
end

set_data(payload)
ngx.say("success")

d. nginx配置

# Edit the file with: vim nginx.conf

#user nobody;
worker_processes 1;

#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;

#pid logs/nginx.pid;

events {
    worker_connections 1024;
}

http {
    include mime.types;
    default_type application/octet-stream;

    # Each access-log line consists solely of $data, which the Lua handler
    # fills with the decoded tracking payload (JSON-escaped by escape=json).
    log_format user_log_format escape=json $data;

    sendfile on;
    keepalive_timeout 65;
    gzip on;

    server {
        listen 8032;
        default_type 'application/json';
        charset utf-8;

        location /logate/data {
            # Initialise $data first so that it is also defined for the
            # preflight request below, which returns before the Lua handler runs.
            set $data '';

            add_header 'Access-Control-Allow-Origin' $http_origin;
            add_header 'Access-Control-Allow-Credentials' 'true';
            add_header 'Access-Control-Allow-Methods' 'GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS';

            if ($request_method = 'OPTIONS') {
                # add_header directives are inherited from the enclosing level
                # only when the current level declares none. This block
                # declares its own, so the CORS headers are repeated here;
                # otherwise the preflight response would not carry them.
                add_header 'Access-Control-Allow-Origin' $http_origin;
                add_header 'Access-Control-Allow-Credentials' 'true';
                add_header 'Access-Control-Allow-Methods' 'GET, HEAD, POST, PUT, PATCH, DELETE, OPTIONS';
                add_header 'Access-Control-Max-Age' 1728000;
                add_header 'Content-Type' 'text/plain; charset=utf-8';
                add_header 'Content-Length' 0;
                return 204;
            }

            content_by_lua_file "/usr/local/openresty/nginx/conf/lua_zlib_body.lua";
            access_log /data/buried_point/data.log user_log_format;
        }

        error_page 500 502 503 504 /50x.html;
        location = /50x.html {
            root html;
        }
    }
}

e. nginx启动

# Directory that receives the tracking access log (see access_log in nginx.conf).
# -p creates missing parents and does not fail when the directory already exists.
mkdir -p /data/buried_point

# Start nginx and confirm that its processes are running.
/usr/local/openresty/nginx/sbin/nginx
ps -ef|grep nginx

三、logstash

a. 下载:

# Download the Logstash 7.8.0 tarball into /data.
cd /data
wget https://mirrors.huaweicloud.com/logstash/7.8.0/logstash-7.8.0.tar.gz

b.解压:

# Unpack the tarball and change into the bin directory.
tar -zxvf logstash-7.8.0.tar.gz
cd logstash-7.8.0/bin

c.安装Webhdfs插件

# Install the WebHDFS output plugin used by the pipeline's webhdfs output.
./logstash-plugin install logstash-output-webhdfs

d. 编辑配置文件:buried_point_logstash.conf

# Read the tracking log written by nginx, starting from the beginning of the file.
input {
  file {
    path => "/data/buried_point/data.log"
    start_position => "beginning"
  }
}

# Ship every event to HDFS through WebHDFS, one file per day.
# NOTE(review): the %{+YYYY-MM-dd} part is presumably derived from the event
# timestamp in UTC - confirm it lines up with the local date used by the
# daily load script.
output {
  webhdfs {
    host => "hadoop-master01"
    port => 50070
    path => "/tmp/buried-point-data-%{+YYYY-MM-dd}.log"
    user => "hdfs"
    retry_times => 100
  }
}

e.启动

# Start Logstash in the background with the pipeline configuration above.
../bin/logstash -f buried_point_logstash.conf &

四、azkaban

a. job文件:buried-point-start.job

#buried_point_data__analyze_workflow_start.job
# Command job: prints a start marker, then runs the HDFS-to-Hive load script
# with the dbName property as its only argument.
type=command
user.to.proxy=spark
command=echo "buried_point_data__analyze_workflow_start!"
command.1=sh job/buried-point-hdfs2hive.sh "${dbName}"

b. properties文件:buried-point.properties

# Presumably the value substituted for ${dbName} in the job's command line - verify.
dbName=test

c. sh buried-point-hdfs2hive.sh

#!/bin/bash
# Runs sql/event_tracking.sql in Hive against yesterday's tracking log on HDFS.
#
# Usage: buried-point-hdfs2hive.sh <dbName>
#   dbName  Hive database that holds the event_tracking table.

# Stop at the first failing command or unset variable.
set -eu

if [ -z "${1:-}" ]; then
    echo "usage: $0 <dbName>" >&2
    exit 1
fi
dbName=$1

# Take yesterday's date once and derive the compact form from it, so both
# values refer to the same day even when the script runs across midnight.
lastDayFormat=$(date --date='1 day ago' "+%Y-%m-%d")
lastDay=$(printf '%s' "${lastDayFormat}" | tr -d '-')

#filepath=/tmp/buried-point-data-2020-09-10.log
filepath=/tmp/buried-point-data-${lastDayFormat}.log

# The SQL path is relative, so the script must be started from the directory
# that contains sql/.
/usr/hdp/current/hive-client/bin/hive \
    -d lastDay="${lastDay}" \
    -d file="${filepath}" \
    -d dbName="${dbName}" \
    -f "./sql/event_tracking.sql"

d. job sql event_tracking.sql

-- Session settings for dynamic partitioning
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Recreate the raw staging table on every run
DROP TABLE IF EXISTS ${dbName}.event_tracking;

-- Three space-delimited text columns, the third holds the raw tracking payload
-- NOTE(review): a payload that itself contains a space would presumably be cut at that space - verify
CREATE TABLE ${dbName}.event_tracking (
    creat_time STRING,
    node_id STRING,
    source_event_tracking STRING COMMENT '埋点原始数据'
)
COMMENT '埋点原始数据表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
STORED AS TEXTFILE;

-- Move the daily HDFS file into the table, replacing the previous contents
LOAD DATA INPATH "${file}" OVERWRITE INTO TABLE ${dbName}.event_tracking;

-- Strip dollar signs, backslashes and square brackets from the payload, put a
-- semicolon between adjacent objects, split on it, and read four fields from
-- each resulting JSON object
-- NOTE(review): the same characters inside JSON values are affected as well - verify this is acceptable
SELECT
    distinct_id,
    from_unixtime(CAST(time / 1000 AS INT)),
    event,
    properties
FROM (
    SELECT explode(
               split(
                   regexp_replace(
                       regexp_replace(
                           regexp_replace(
                               regexp_replace(
                                   regexp_replace(source_event_tracking, '\\$', ''),
                                   "\\\\", ""),
                               '\\[', ''),
                           '\\]', ''),
                       '\\}\\,\\{', '\\}\\;\\{'),
                   '\\;')
           ) AS json
    FROM ${dbName}.event_tracking
) a
LATERAL VIEW json_tuple(a.json, 'distinct_id', 'time', 'event', 'properties') b
    AS distinct_id, time, event, properties;
最新回复(0)