HDFS Client Operations and I/O Operations

1. Configure the Maven Development Environment

1.1 Import Maven Dependencies

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>

1.2 Include the HDFS Configuration Files

If the code needs to override the HDFS cluster configuration, place the corresponding files (core-site.xml and hdfs-site.xml) under the resources directory so they are picked up from the classpath.
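To confirm which configuration source actually took effect, a minimal sketch like the following can be run (the class name is ours; the property keys are standard Hadoop keys, and the printed values depend on the files you placed under resources):

import org.apache.hadoop.conf.Configuration;

public class ConfigCheck {
    public static void main(String[] args) {
        // true loads core-site.xml / hdfs-site.xml from the classpath
        Configuration conf = new Configuration(true);
        System.out.println("fs.defaultFS    = " + conf.get("fs.defaultFS"));
        System.out.println("dfs.replication = " + conf.get("dfs.replication"));
    }
}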

2. Client Operations

package boe.b11.cim;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;

/**
 * @Des HDFS client operations
 * @Author Add By Zhao.J
 * @Date 2020/10/22 13:27
 */
public class HdfsClient {

    /**
     * Parses hdfs-site.xml and core-site.xml
     */
    private static Configuration configuration;
    private static FileSystem fileSystem;

    /**
     * 1. Connect to the cluster; passing true loads the configuration files
     */
    @Before
    public void connect() throws IOException, InterruptedException {
        configuration = new Configuration(true);
        // Any of the following three calls resolves the configuration:
        /// fileSystem = FileSystem.get(configuration);
        /// fileSystem = FileSystem.get(URI.create("hdfs://mycluster"), configuration);
        fileSystem = FileSystem.get(URI.create("hdfs://192.168.206.131:9000"), configuration, "hadoop");
    }

    /**
     * @Des 2. Create a directory
     * @Author Add By Zhao.J
     * @Date 2020/10/22 13:56
     */
    @Test
    public void testMkdirs() throws IOException {
        Path dir = new Path("/tmp");
        if (fileSystem.exists(dir)) {
            fileSystem.delete(dir, true);
        }
        fileSystem.mkdirs(dir);
    }

    /**
     * @Des 3. Upload a file
     * @Author Add By Zhao.J
     * @Date 2020/10/22 14:13
     */
    @Test
    public void testCopyFromLocalFile() throws IOException {
        // Parameter precedence: (1) values set in client code >
        // (2) user-defined configuration files on the classpath >
        // (3) the server-side defaults
        configuration.set("dfs.replication", "2");
        fileSystem.copyFromLocalFile(new Path("d:/tnsnames.ora"), new Path("/tnsnames.ora"));
    }

    /**
     * @Des 4. Download a file
     * @Author Add By Zhao.J
     * @Date 2020/10/22 14:13
     */
    @Test
    public void testCopyToLocalFile() throws IOException {
        // boolean delSrc: whether to delete the source file
        // Path src: the file to download
        // Path dst: the local destination path
        // boolean useRawLocalFileSystem: when true, skip writing the local .crc checksum file
        fileSystem.copyToLocalFile(false, new Path("/tnsnames.ora"), new Path("e:/tnsnames.ora"), true);
    }

    /**
     * @Des 5. Delete a file or directory
     * @Author Add By Zhao.J
     * @Date 2020/10/22 14:17
     */
    @Test
    public void testDelete() throws IOException {
        fileSystem.delete(new Path("/axis.log"), true);
    }

    /**
     * @Des 6. Rename a file or directory
     * @Author Add By Zhao.J
     * @Date 2020/10/22 14:19
     */
    @Test
    public void testRename() throws IOException {
        fileSystem.rename(new Path("/tnsnames.ora"), new Path("/testtnsnames.ora"));
    }

    /**
     * @Des 7. View file details
     * @Author Add By Zhao.J
     * @Date 2020/10/22 14:46
     */
    @Test
    public void testListFiles() throws IOException {
        RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus status = listFiles.next();
            // File name
            System.out.println(status.getPath().getName());
            // Length
            System.out.println(status.getLen());
            // Permissions
            System.out.println(status.getPermission());
            // Group
            System.out.println(status.getGroup());
            // Block locations
            BlockLocation[] blockLocations = status.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                // Hosts storing each block
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }
            System.out.println("---------------------");
        }
    }

    /**
     * Close the connection
     */
    @After
    public void close() throws IOException {
        fileSystem.close();
        System.out.println("Running Over...");
    }
}
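The precedence rule noted in testCopyFromLocalFile can be verified by reading back the replication factor HDFS actually applied. A minimal sketch, assuming /tnsnames.ora was just uploaded (the method name is ours; it would live inside HdfsClient):

    /**
     * Sketch: confirm the replication factor applied to an uploaded file.
     */
    @Test
    public void testCheckReplication() throws IOException {
        FileStatus status = fileSystem.getFileStatus(new Path("/tnsnames.ora"));
        System.out.println("effective replication = " + status.getReplication());
    }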

3. I/O Stream Operations

package boe.b11.cim;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;

/**
 * @Des I/O stream operations
 * @Author Add By Zhao.J
 * @Date 2020/10/22 14:55
 */
public class HdfsIo {

    private static Configuration configuration;
    private static FileSystem fileSystem;
    private static FileInputStream fis;
    private static FileOutputStream fos;
    private static FSDataInputStream hdfsFis;
    private static FSDataOutputStream hdfsFos;

    @Before
    public void connect() throws IOException, InterruptedException {
        configuration = new Configuration(true);
        fileSystem = FileSystem.get(URI.create("hdfs://192.168.206.131:9000"), configuration, "hadoop");
    }

    /**
     * @Des 1. File upload
     * @Author Add By Zhao.J
     * @Date 2020/10/22 15:01
     */
    @Test
    public void putFileToHdfs() throws IOException {
        // 1. Open the local input stream
        fis = new FileInputStream(new File("d:/axis.log"));
        // 2. Open the HDFS output stream
        hdfsFos = fileSystem.create(new Path("/axis.log"));
        // 3. Copy the streams. IOUtils is a Hadoop utility class for convenient
        //    I/O; the final boolean controls whether the streams are closed afterwards.
        IOUtils.copyBytes(fis, hdfsFos, configuration, true);
    }

    /**
     * @Des 2. File download
     * @Author Add By Zhao.J
     * @Date 2020/10/22 15:01
     */
    @Test
    public void getFileFromHdfs() throws IOException {
        hdfsFis = fileSystem.open(new Path("/axis.log"));
        fos = new FileOutputStream(new File("e:/axis.log"));
        IOUtils.copyBytes(hdfsFis, fos, configuration, true);
    }

    @After
    public void close() throws IOException {
        fileSystem.close();
        System.out.println("Running Over...");
    }
}
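IOUtils.copyBytes also has an overload that takes an explicit buffer size instead of a Configuration; the Configuration variant reads its buffer size from io.file.buffer.size (default 4096 bytes). A minimal sketch of the explicit-buffer form, assuming the same paths as above (the method name is ours; it would live inside HdfsIo):

    @Test
    public void putFileWithExplicitBuffer() throws IOException {
        FileInputStream in = new FileInputStream(new File("d:/axis.log"));
        FSDataOutputStream out = fileSystem.create(new Path("/axis-buffered.log"));
        // 8 KB copy buffer; the final boolean closes both streams when done
        IOUtils.copyBytes(in, out, 8192, true);
    }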

 
