Background:
The Java project contains resource files such as Excel workbooks. These are handled by uploading them to HDFS with hadoop fs -copyFromLocal. Because the HDFS cluster has two NameNodes, the access path must use the highly-available (HA) nameservice rather than a single NameNode address.
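For reference, the upload step looks like this (the local file name is a placeholder; the HDFS target is the path used throughout this post):

hadoop fs -copyFromLocal ./data.xlsx /user/test/analysis/data.xlsx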
Processing code:
Declare the Spark environment:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkConf = new SparkConf()
  .setAppName("FieldRelativeApplication")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .setMaster("local[*]")
val spark = SparkSession.builder()
  .config(sparkConf)
  .getOrCreate()
Set the HA configuration:
spark.sparkContext.hadoopConfiguration.set("dfs.nameservices", "node")
spark.sparkContext.hadoopConfiguration.set("fs.defaultFS", "hdfs://node")
spark.sparkContext.hadoopConfiguration.set("dfs.ha.namenodes.node", "namenode1,namenode2")
spark.sparkContext.hadoopConfiguration.set("dfs.namenode.rpc-address.node.namenode1", "node1:8020")
spark.sparkContext.hadoopConfiguration.set("dfs.namenode.rpc-address.node.namenode2", "node2:8020")
spark.sparkContext.hadoopConfiguration.set("dfs.client.failover.proxy.provider.node", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
The keys and values above come from the cluster's hdfs-site.xml (fs.defaultFS lives in core-site.xml); find the entries in your own configuration files and fill them in one by one.
Configured this way, there is no need to place hdfs-site.xml, core-site.xml, and hive-site.xml under the resources directory (shipping them there can cause all sorts of strange errors when the job is packaged and run).
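As an alternative (a sketch, not part of the original setup): Spark copies every SparkConf entry carrying the spark.hadoop. prefix into the Hadoop Configuration at startup, so the same HA settings can instead be supplied where the SparkConf is built:

val sparkConf = new SparkConf()
  .setAppName("FieldRelativeApplication")
  // Everything after the "spark.hadoop." prefix is forwarded to hadoopConfiguration
  .set("spark.hadoop.dfs.nameservices", "node")
  .set("spark.hadoop.fs.defaultFS", "hdfs://node")
  .set("spark.hadoop.dfs.ha.namenodes.node", "namenode1,namenode2")
  .set("spark.hadoop.dfs.namenode.rpc-address.node.namenode1", "node1:8020")
  .set("spark.hadoop.dfs.namenode.rpc-address.node.namenode2", "node2:8020")
  .set("spark.hadoop.dfs.client.failover.proxy.provider.node", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")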
Test
Check that the file exists:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

private static final String HDFS_PATH = "hdfs://node";
private static final String HDFS_USER = "root";
private static FileSystem fileSystem;

/**
 * Obtain the FileSystem handle.
 */
@Before
public void prepare() {
    try {
        Configuration configuration = new Configuration();
        // The same HA settings as above; without them (or an hdfs-site.xml on the
        // classpath) the logical nameservice "node" cannot be resolved.
        configuration.set("dfs.nameservices", "node");
        configuration.set("dfs.ha.namenodes.node", "namenode1,namenode2");
        configuration.set("dfs.namenode.rpc-address.node.namenode1", "node1:8020");
        configuration.set("dfs.namenode.rpc-address.node.namenode2", "node2:8020");
        configuration.set("dfs.client.failover.proxy.provider.node",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        // Replication factor; defaults to 3 when unset (on a single-node Hadoop it can be 1).
        configuration.set("dfs.replication", "3");
        fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
    } catch (IOException | InterruptedException | URISyntaxException e) {
        e.printStackTrace();
    }
}

@Test
public void exist() throws Exception {
    // The Excel file lives at hdfs://node/user/test/analysis/data.xlsx
    boolean exists = fileSystem.exists(new Path("/user/test/analysis/data.xlsx"));
    System.out.println(exists);
}
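The same check can also be done from inside the Spark application, reusing the HA-configured hadoopConfiguration from the session above (a minimal sketch):

import org.apache.hadoop.fs.{FileSystem, Path}

// FileSystem.get resolves the default FS from the configuration, i.e. hdfs://node
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
println(fs.exists(new Path("/user/test/analysis/data.xlsx")))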
Invocation:
val excelDF = readFromExcel(spark, "hdfs://node/user/test/analysis/data.xlsx")
def readFromExcel(spark: SparkSession, file: String): DataFrame = {
  spark.read
    .format("com.crealytics.spark.excel")
    .option("path", file)
    .option("useHeader", "true")
    .option("treatEmptyValuesAsNulls", "true")
    .option("inferSchema", "true")
    .option("addColorColumns", "false")
    .option("sheetName", "Sheet1")
    .option("maxRowsInMemory", 20)
    .load()
}
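For this to compile, the spark-excel library must be on the classpath. A sketch of the sbt dependency, assuming a 0.13.x release (the option names useHeader and sheetName used above belong to those versions; 0.14+ renamed useHeader to header, and the exact version number here is illustrative):

libraryDependencies += "com.crealytics" %% "spark-excel" % "0.13.5"

Once loaded, the DataFrame can be inspected as usual:

excelDF.printSchema()
excelDF.show(10)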