spark关于hdfs上配置文件的高可用访问配置

it2026-01-08  7

背景:

java项目中存在excel这样的资源文件,处理方法是将excel通过hadoop fs -copyFromLocal方式上传到hdfs文件系统上。

hdfs有两个namenode,所以访问路径需要设置成高可用的路径

处理代码:

声明spark环境

val sparkConf = new SparkConf() .setAppName("FieldRelativeApplication") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .setMaster("local[*]") val spark = SparkSession.builder() .config(sparkConf) .getOrCreate()

设置高可用的配置

spark.sparkContext.hadoopConfiguration.set("dfs.nameservices", "node") spark.sparkContext.hadoopConfiguration.set("fs.defaultFS", "hdfs://node") spark.sparkContext.hadoopConfiguration.set("dfs.ha.namenodes.node", "namenode1,namenode2") spark.sparkContext.hadoopConfiguration.set("dfs.namenode.rpc-address.node.namenode1", "node1:8020") spark.sparkContext.hadoopConfiguration.set("dfs.namenode.rpc-address.node.namenode2", "node2:8020") spark.sparkContext.hadoopConfiguration.set("dfs.client.failover.proxy.provider.node", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")

里面的参数和值在对应的hdfs-site.xml里面有,需要找到自己对应的文件一一填入

这样做就不用将hdfs-site.xml,core-site.xml和hive-site.xml文件放在resource目录下了(放在目录下之后可能打包运行的时候会报很多奇奇怪怪的错误)

测试

测试是否存在

private static final String HDFS_PATH = "hdfs://node"; private static final String HDFS_USER = "root"; private static FileSystem fileSystem; /** * 获取fileSystem */ @Before public void prepare() { try { Configuration configuration = new Configuration(); // 这里我启动的是单节点的Hadoop,副本系数可以设置为1,不设置的话默认值为3 configuration.set("dfs.replication", "3"); fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (URISyntaxException e) { e.printStackTrace(); } } @Test public void exist() throws Exception { // excel路径为hdfs://node/user/test/analysis/data.xlsx boolean exists = fileSystem.exists(new Path("/user/test/analysis/data.xlsx")); System.out.println(exists); }

调用

val Excel_DF = readFromExcel(spark, "hdfs://node/user/test/analysis/data.xlsx") def readFromExcel(spark: SparkSession, file: String): DataFrame = { spark.sqlContext.read .format("com.crealytics.spark.excel") .option("path", file) .option("useHeader", "true") .option("treatEmptyValuesAsNulls", "true") .option("inferSchema", "true") .option("addColorColumns", "False") .option("sheetName", "Sheet1") .option("maxRowsInMemory", 20) .load() .toDF() }

 

最新回复(0)