Cleaning Data with Spark DataFrames
Contents: 1. Setup  2. Data Deduplication  3. Handling Missing Values  4. Handling Outliers
1. Setup
Environment configuration
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

# Point Spark at the JDK and the Python interpreter used by driver and executors
JAVA_HOME = '/root/bigdata/jdk'
PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python"
os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

SPARK_APP_NAME = "dataframetest"
SPARK_URL = "spark://192.168.19.137:7077"

conf = SparkConf()
config = (
    ("spark.app.name", SPARK_APP_NAME),
    ("spark.executor.memory", "6g"),
    ("spark.master", SPARK_URL),
    ("spark.executor.cores", "4"),
)
conf.setAll(config)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
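If you only want to follow along on a single machine, a standalone cluster is not required; below is a minimal sketch of a local-mode session (the local[*] master and the app name are my own choices, not part of the original setup):

from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

# Hypothetical local-mode session for trying the examples on one machine
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("dataframetest-local") \
    .getOrCreate()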
2. Data Deduplication
'''
1. Removing duplicate records
1.1 Drop rows that are completely identical
1.2 Drop rows whose values are identical in selected fields
1.3 Handle duplicated values in a field that carries no real meaning (here, the id)
'''
df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

# Drop rows that are exact duplicates
df2 = df.dropDuplicates()
df2.count()
Result:
6
# Drop rows that are identical in every column except 'id'
df3 = df2.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df3.count()

Result:
5
# Compare the total number of ids with the number of distinct ids
df3.agg(
    fn.count('id').alias('id_count'),
    fn.countDistinct('id').alias('distinct_id_count')
).collect()

Result:
[Row(id_count=5, distinct_id_count=4)]
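count vs. countDistinct only tells us that some id is repeated; to see which ids repeat, a quick groupBy can be added (my own sketch, not from the original post):

# Hypothetical check: list the ids that occur more than once
df3.groupBy('id').count().filter(fn.col('count') > 1).show()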
# Replace the non-unique id with a monotonically increasing one
df3.withColumn('new_id', fn.monotonically_increasing_id()).show()
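Because monotonically_increasing_id only appends a new column, the old non-unique id would normally be dropped afterwards; a possible follow-up (my own sketch, with df4 as a hypothetical name):

# Hypothetical: keep only the generated identifier
df4 = df3.withColumn('new_id', fn.monotonically_increasing_id()).drop('id')
df4.show()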
3. Handling Missing Values
'''
2. Handling missing values
2.1 Drop rows or columns that contain missing values
2.2 Fill missing values (e.g. with the column mean)
2.3 Flag the rows or columns that contain missing values (see the sketch at the end of this section)
'''
df_miss = spark.createDataFrame([
    (1, 143.5, 5.6, 28, 'M', 100000),
    (2, 167.2, 5.4, 45, 'M', None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, 'M', None),
    (5, 133.2, 5.7, 54, 'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42, 'M', 76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])

# Count how many values are missing in each row
df_miss.rdd.map(lambda row: (row['id'], sum([c is None for c in row]))).collect()

Result:
[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
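The same per-row count can also be computed without falling back to the RDD API, by summing isNull indicators over the columns; this variant is my own sketch, not part of the original code:

from functools import reduce

# Hypothetical pure-DataFrame variant: add up an isNull flag for every column
missing_per_row = reduce(
    lambda a, b: a + b,
    [fn.col(c).isNull().cast('int') for c in df_miss.columns]
)
df_miss.select('id', missing_per_row.alias('missing_count')).show()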
# Fraction of missing values in each column
df_miss.agg(*[
    (1 - fn.count(c) / fn.count('*')).alias(c + '_missing')
    for c in df_miss.columns
]).show()
# Drop the 'income' column (almost entirely missing), then drop rows
# that have fewer than 3 non-null values
df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
df_miss_no_income.dropna(thresh=3).show()
# Compute the mean of every numeric column; alias each aggregate with the
# original column name so that fillna can match it to a column
means = df_miss_no_income.agg(
    *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]
means['gender'] = 'missing'
df_miss_no_income.fillna(means).show()
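Item 2.3 above (flagging the rows or columns that contain missing values) is not implemented in the original code; a minimal sketch, assuming one boolean indicator column per source column is what is wanted:

# Hypothetical: add a *_is_missing flag for every column except the id
df_flagged = df_miss.select(
    '*',
    *[fn.col(c).isNull().alias(c + '_is_missing') for c in df_miss.columns if c != 'id']
)
df_flagged.show()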
4. Handling Outliers
'''
3. Handling outliers
An outlier is a value outside the normal range, e.g. a missing value or a value
far larger or smaller than the rest of the data.
- Quantile (IQR) clipping ***
- Median-absolute-deviation (MAD) clipping
- Normal-distribution (3-sigma) clipping
All three share the same core idea: derive a "normal" range from the raw data,
and treat anything outside that range as an outlier.
'''
df_outliers = spark.createDataFrame([
    (1, 143.5, 5.3, 28),
    (2, 154.2, 5.5, 45),
    (3, 342.3, 5.1, 99),
    (4, 144.5, 5.5, 33),
    (5, 133.2, 5.4, 54),
    (6, 124.1, 5.1, 21),
    (7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])

cols = ['weight', 'height', 'age']
bounds = {}
# For each column, derive the IQR-based "normal" range
for col in cols:
    quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [
        quantiles[0] - 1.5 * IQR,
        quantiles[1] + 1.5 * IQR
    ]
bounds
Result:
{'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005],
'weight': [91.69999999999999, 191.7]}
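approxQuantile also accepts a list of column names (Spark 2.2+), so the Python loop can be collapsed into a single call; an equivalent way to build the same bounds, as a sketch:

# Hypothetical one-call variant: quantiles for all three columns at once
all_quantiles = df_outliers.approxQuantile(cols, [0.25, 0.75], 0.05)
bounds = {
    c: [q[0] - 1.5 * (q[1] - q[0]), q[1] + 1.5 * (q[1] - q[0])]
    for c, q in zip(cols, all_quantiles)
}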
# Flag, column by column, whether each value falls outside its bounds
outliers = df_outliers.select(
    *['id'] + [
        ((df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1])).alias(c + '_o')
        for c in cols
    ]
)
outliers.show()
# Join the flags back onto the original data and inspect the offending values
df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()
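Once every value is flagged, the offending rows can also be removed altogether; the sketch below is one possible way to do that (my own assumption: keep only rows where none of the *_o flags is set):

# Hypothetical: drop rows where any of the outlier flags is True
df_clean = df_outliers.filter(
    ~fn.col('weight_o') & ~fn.col('height_o') & ~fn.col('age_o')
)
df_clean.select('id', 'weight', 'height', 'age').show()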