日志文件的部分字段如上所示。
以下操作均在 spark-shell 中进行。
一、导入日志文件并转换为 DataFrame：
// toDF below needs the implicit encoders from the session.
import spark.implicits._

// Read the raw log from HDFS, one RDD element per line.
val fileRDD = sc.textFile("hdfs://hadoop:9000/app/data/op.log")

// NOTE(review): the original transcript uses opJsonStr without ever defining it.
// Assumed here: the JSON payload starts at the first '{' of each line, so any prefix
// before it (e.g. a "<timestamp>|" field) is dropped, while a line that is already pure
// JSON passes through unchanged. Confirm against the log sample.
val opJsonStr = fileRDD.map(line => line.dropWhile(_ != '{'))

// The next step reads the JSON text from the single "value" column produced here.
val opJsonDF = opJsonStr.toDF

// Step 2: "ap" is a standalone top-level field, so split each record into three columns: cm, ap, et.
// get_json_object must already be in scope here; the step-4 imports only run later.
import org.apache.spark.sql.functions.get_json_object

// Split every record into its three top-level JSON fields, each extracted as a string column.
val opJsonObj = opJsonDF.select(
  get_json_object($"value", "$.cm").alias("cm"),
  get_json_object($"value", "$.ap").alias("ap"),
  get_json_object($"value", "$.et").alias("et"))
opJsonObj.show()

// Step 3: flatten the cm object into table columns with get_json_object.
// get_json_object must already be in scope here; the step-4 imports only run later.
import org.apache.spark.sql.functions.get_json_object

// Keys of the cm object to expose as top-level columns, in output order.
val cmFields = Seq("ln", "sv", "os", "g", "mid", "nw", "l", "vc", "hw",
  "ar", "uid", "t", "la", "md", "vn", "ba", "sr")
// One string column per cm key, named after the key.
val cmColumns = cmFields.map(key => get_json_object($"cm", "$." + key).alias(key))

// Output columns: ap, then the flattened cm keys, then et (still unparsed).
val opJsonObj1 = opJsonObj.select(($"ap" +: cmColumns) :+ $"et": _*)

// Step 4: import the required packages.
// $-column syntax and encoders.
import spark.implicits._
// get_json_object, from_json, explode.
import org.apache.spark.sql.functions._
// ArrayType, StructType, StructField, StringType, IntegerType.
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Step 5: parse the et JSON array into an array of event structs with from_json.
// Schema of one element of the et array. kv is read as a plain string
// (presumably nested JSON that is not parsed further here — confirm against the log sample).
val eventSchema = ArrayType(StructType(Seq(
  StructField("ett", StringType),
  StructField("en", StringType),
  StructField("kv", StringType))))

// Replace the raw et string with a parsed array column named events.
// et is the last column, so appending events and then dropping et keeps
// the remaining columns (ap .. sr) in their original order.
val opJsonObj2 = opJsonObj1
  .withColumn("events", from_json($"et", eventSchema))
  .drop("et")

// Step 6: explode turns each array element into its own row.
// Emit one row per element of the events array. The exploded struct replaces the
// array in place, so every other column keeps its position.
val opJsonObj3 = opJsonObj2.withColumn("events", explode($"events"))

// Step 7: after exploding, split each event struct into separate columns.
// Expand the event struct's fields (ett, en, kv) into top-level columns after the
// existing ones, then discard the now-redundant struct column.
val opJsonObj4 = opJsonObj3
  .select($"*", $"events.*")
  .drop("events")

// Additional feature:
// Convert vc from String to Integer.
// withColumn returns a new DataFrame and leaves opJsonObj4 unchanged, so keep the result.
// NOTE(review): with Spark's default (non-ANSI) cast, values that are not valid integers become null.
val opJsonObj5 = opJsonObj4.withColumn("vc", $"vc".cast(IntegerType))