在使用javaAPI进行hdfs的操作时,需要导入响应的jar包,这里使用maven统一管理,给出xml配置文件:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>hdfs_OperateTest</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.6.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>3.0.0</version> <executions> <execution> <goals> <goal>java</goal> </goals> </execution> </executions> <configuration> <classpathScope>test</classpathScope> </configuration> </plugin> </plugins> </build> <properties> <!-- 文件拷贝时的编码 --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <!-- 编译时的编码 --> <maven.compiler.encoding>UTF-8</maven.compiler.encoding> </properties> </project>这里给出自己练习hdfs读写的代码:
package MyTest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.Text; import java.io.IOException; import java.io.PrintStream; import java.net.URI; public class hdfsOperate { Path inputPath = null; // 读取文件地址 Path outputPath = null; // 输出文件地址 Configuration conf = new Configuration(); //这里获取本地hadoop位置配置信息 public hdfsOperate(String input,String output){ this.inputPath = new Path(input); this.outputPath = new Path(output); conf.set("fs.defaultFS","hdfs://localhost:9000"); conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem"); } public void FileRead() throws IOException{ //通过FileSystem获取文件 FileSystem fsRead = FileSystem.get(URI.create(inputPath.toString()),conf); //获取文件信息 FileStatus sta = fsRead.getFileStatus(inputPath); //输出文件信息 System.out.print("路径:" + sta.getPath() + " 文件大小:" + sta.getLen() + " 权限:" + sta.getPermission() + " 内容:\n"); //打开文件 FSDataInputStream fsdis = fsRead.open(sta.getPath()); PrintStream ps = new PrintStream(System.out); //控制输出到控制台 byte[] data = new byte[1024]; int read = -1; while ((read = fsdis.read(data))>0){ ps.write(data,0,read); } fsdis.close(); } public void FileWrite(Text text) throws IOException{ //获取文件 FileSystem fsWrite = FileSystem.get(URI.create(outputPath.toString()),conf); //创建文件 FSDataOutputStream fsdos = fsWrite.create(outputPath); //将text写入文件 fsdos.write(text.copyBytes()); fsdos.close(); System.out.print("创建成功!\n"); } public void FileDelete() throws IOException{ FileSystem fsDe = FileSystem.get(URI.create(outputPath.toString()),conf); boolean isDelete = fsDe.delete(outputPath,false); System.out.print("删除成功!\n"); } public static void main(String[] args) throws IOException { hdfsOperate hdfsop = new hdfsOperate( "hdfs://localhost:9000/user/hadoop/merge.txt", //需要读取的文件 "hdfs://localhost:9000/user/hadoop/newFile.txt"); //创建的文件 hdfsop.FileRead(); Text text = new Text("This is a new creat File!"); //文件添加的内容 hdfsop.FileWrite(text); } }输出: 这里可能会出现一个连接失败的问题:大致是JavaAPI 拒绝连接 (没有截图过来) 就一片爆红,会发现是文件打开那的问题。 查看一下端口 : netstat -nultp 看看有没有打开9000端口。
这个问题一般是由于hadoop打开时没有开启9000端口造成的,需要本机打开端口才能使得外部设备设施通过端口进行文件的访问。
这里需要找到hadoop目录下 修改 ./etc/hadoop/core-site.xml
在里面添加:
<property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property>关闭hadoop 重新打开就可以了。
netstat -nultp 再看一下
参考 林子雨老师的文件内容合并代码:
package FileOperate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.IOException; import java.io.PrintStream; import java.net.URI; /* 现在要执行的任务是:假设在目录“hdfs://localhost:9000/user/hadoop”下面有几个文件, 分别是file1.txt、file2.txt、file3.txt、file4.abc和file5.abc,这里需要从该目录 中过滤出所有后缀名不为“.abc”的文件,对过滤之后的文件进行读取,并将这些文件的内容合并到 文件“hdfs://localhost:9000/user/hadoop/merge.txt”中。 */ /** * 过滤掉文件名满足特定条件的文件 */ class MyPathFilter implements PathFilter{ String reg = null; MyPathFilter(String reg){ this.reg = reg; } public boolean accept(Path path){ return !(path.toString().matches(reg)); } } /*** * 利用FSDataOutputStream和FSDataInputStream合并HDFS中的文件 */ public class MergeFile { Path inputPath = null; //待合并文件所在目录路径 Path outputPath = null; //输出文件路径 public MergeFile(String input,String output){ this.inputPath = new Path(input); this.outputPath = new Path(output); } public void doMerge() throws IOException{ Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://localhost:9000"); conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem"); FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()),conf); FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()),conf); //过滤掉目录中后缀为abc的文件 FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new MyPathFilter(".*\\.abc")); FSDataOutputStream fsdos = fsDst.create(outputPath); PrintStream ps = new PrintStream(System.out); //下面分别读取过滤之后的每个文件的内容,并输出到同一个文件中 for(FileStatus sta:sourceStatus){ System.out.print("路径:" + sta.getPath() + " 文件大小:" + sta.getLen() + " 权限:" + sta.getPermission() + " 内容:\n"); FSDataInputStream fsdis = fsSource.open(sta.getPath()); byte[] data = new byte[1024]; int read = -1; while ((read = fsdis.read(data))>0){ ps.write(data,0,read); fsdos.write(data,0,read); } fsdis.close(); } ps.close(); fsdos.close(); } public static void main(String[] args) throws IOException { MergeFile merge = new MergeFile( "hdfs://localhost:9000/user/hadoop/", "hdfs://localhost:9000/user/hadoop/merge.txt"); merge.doMerge(); } }标签:(hadoop无法连接、hadoop拒绝访问、9000端口未打开、eclipse无法连接Hadoop、eclipse拒绝连接)