1. 配置Maven 开发环境
1.1 导入Maven依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version> <!-- pin an explicit version: the "RELEASE" meta-version is deprecated and non-reproducible in Maven 3+ -->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.2</version>
</dependency>
</dependencies>
1.2 引入hdfs配置文件
代码中若需要修改hdfs集群配置,需要在resources中配置对应文件;
2. Client 操作
package boe.b11.cim;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
/**
* @Des Hdfs 客户端操作
* @Author Add By Zhao.J
* @Date 2020/10/22 13:27
*/
public class HdfsClient {
/**
 * Parses hdfs-site.xml and core-site.xml found on the classpath.
 */
private static Configuration configuration;
private static FileSystem fileSystem;
/**
 * 1. Connect to the cluster; {@code new Configuration(true)} loads the
 * classpath configuration files (core-site.xml / hdfs-site.xml).
 */
@Before
public void connect() throws IOException, InterruptedException {
configuration = new Configuration(true);
// Any of the following three forms resolves the configuration:
/// fileSystem = FileSystem.get(configuration);
/// fileSystem = FileSystem.get(URI.create("hdfs://mycluster"), configuration);
fileSystem = FileSystem.get(URI.create("hdfs://192.168.206.131:9000"), configuration, "hadoop");
}
/**
 * 2. Create a directory (recreating it if it already exists).
 */
@Test
public void testMkdirs() throws IOException{
Path dir = new Path("/tmp");
if(fileSystem.exists(dir)){
// recursive delete so a non-empty directory does not fail the test
fileSystem.delete(dir, true);
}
fileSystem.mkdirs(dir);
}
/**
 * 3. Upload a local file to HDFS.
 *
 * Parameter priority: (1) values set in client code >
 * (2) user-defined config files on the classpath >
 * (3) the server-side defaults.
 */
@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException {
configuration.set("dfs.replication", "2");
// BUG FIX: the FileSystem built in @Before cached the replication factor
// at construction time, so setting it afterwards had no effect. Rebuild
// the client so the new "dfs.replication" value is actually honored.
fileSystem.close();
fileSystem = FileSystem.get(URI.create("hdfs://192.168.206.131:9000"), configuration, "hadoop");
fileSystem.copyFromLocalFile(new Path("d:/tnsnames.ora"),new Path("/tnsnames.ora"));
}
/**
 * 4. Download a file from HDFS.
 */
@Test
public void testCopyToLocalFile() throws IOException{
// boolean delSrc  - whether to delete the source file
// Path src        - the HDFS path to download
// Path dst        - the local destination path
// boolean useRawLocalFileSystem - true skips the .crc checksum file
fileSystem.copyToLocalFile(false,new Path("/tnsnames.ora"),new Path("e:/tnsnames.ora"),true);
}
/**
 * 5. Delete a file or directory (recursive=true also removes non-empty directories).
 */
@Test
public void testDelete() throws IOException{
fileSystem.delete(new Path("/axis.log"),true);
}
/**
 * 6. Rename a file or directory.
 */
@Test
public void testRename() throws IOException{
fileSystem.rename(new Path("/tnsnames.ora"),new Path("/testtnsnames.ora"));
}
/**
 * 7. List file details (name, length, permission, group, block locations).
 */
@Test
public void testListFiles() throws IOException{
// recursive=true walks the whole tree under "/"
RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path("/"),true);
while (listFiles.hasNext()){
LocatedFileStatus status = listFiles.next();
// file name
System.out.println(status.getPath().getName());
// length in bytes
System.out.println(status.getLen());
// permission bits
System.out.println(status.getPermission());
// owning group
System.out.println(status.getGroup());
// block locations of the file
BlockLocation[] blockLocations = status.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// host nodes storing each block replica
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("---------------------");
}
}
/**
 * Close the connection. Null-guarded so a failed connect() in @Before
 * does not turn into a NullPointerException here.
 */
@After
public void close() throws IOException {
if (fileSystem != null) {
fileSystem.close();
}
System.out.println("Running Over...");
}
}
3. I/O流操作
package boe.b11.cim;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @Des I/O流操作
* @Author Add By Zhao.J
* @Date 2020/10/22 14:55
*/
public class HdfsIo {
private static Configuration configuration;
private static FileSystem fileSystem;
/**
 * Connect to the cluster; {@code new Configuration(true)} loads the
 * classpath configuration files.
 */
@Before
public void connect() throws IOException, InterruptedException {
configuration = new Configuration(true);
fileSystem = FileSystem.get(URI.create("hdfs://192.168.206.131:9000"), configuration, "hadoop");
}
/**
 * 1. Upload a file using raw streams.
 *
 * BUG FIX: the original kept the streams in static fields and relied on
 * copyBytes(..., true) to close them — but if fileSystem.create(...) threw
 * after the FileInputStream was opened, the input stream leaked.
 * try-with-resources closes both streams on every path.
 */
@Test
public void putFileToHdfs() throws IOException{
try (
// 1. input stream over the local file
FileInputStream fis = new FileInputStream(new File("d:/axis.log"));
// 2. output stream into HDFS
FSDataOutputStream hdfsFos = fileSystem.create(new Path("/axis.log"))
) {
// 3. stream-to-stream copy; Hadoop's IOUtils helper. The final boolean is
// false because try-with-resources already owns closing the streams.
IOUtils.copyBytes(fis, hdfsFos, configuration, false);
}
}
/**
 * 2. Download a file using raw streams (same leak-safe pattern as above).
 */
@Test
public void getFileFromHdfs() throws IOException{
try (
FSDataInputStream hdfsFis = fileSystem.open(new Path("/axis.log"));
FileOutputStream fos = new FileOutputStream(new File("e:/axis.log"))
) {
IOUtils.copyBytes(hdfsFis, fos, configuration, false);
}
}
/**
 * Close the connection; null-guarded in case connect() failed.
 */
@After
public void close() throws IOException {
if (fileSystem != null) {
fileSystem.close();
}
System.out.println("Running Over...");
}
}