目录
一、java实现方式-eclipse
1.1 新建java项目,导包
1.2 新建类JavaSparkWordCount
1.3 新建words.txt,运行程序
三、scala实现-IDEA创建spark项目
3.1 创建maven项目
3.2 导入pom依赖
3.3 新建scala文件夹,作为资源根目录
3.4 代码部分
3.4.1 详细代码
3.4.2 一行代码实现wordcount
一、java实现方式-eclipse
1.1 新建java项目,导包
需要导包 spark-assembly-1.6.0-hadoop2.6.0
并且build path一下:
1.2 新建类JavaSparkWordCount
package com.sdnu.test;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
* @author GONG
* @version 1.0
* @date 2020/10/29 11:09
*/
public class WordCount1 {
public static void main(String[] args) {
// conf 运行模式,应用名称
SparkConf conf = new SparkConf();
conf.setAppName("wordcount1").setMaster("local");
// JavaSparkContext sc是通往集群的唯一通道
JavaSparkContext sc = new JavaSparkContext(conf);
// textFile 读取文件,一行一行读取
JavaRDD<String> lines = sc.textFile("data/words.txt", 1);
/*
* flatMap 进一条数据出多条数据,一对多关系--- new FlatMapFunction<String, String>表示进来String,出去string
* */
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
/*
* 在java中 如果想让某个RDD转换成K,V格式 使用xxxToPair K,V格式的RDD:JavaPairRDD<String, Integer>
* */
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
/*
* reduceByKey 1.先将相同的key分组 2.对每一组的key对应的value去按照你的逻辑去处理
* */
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer n1, Integer n2) throws Exception {
return n1 + n2;
}
});
/*
* 直接从RDD阶段开始遍历
* */
counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
System.out.println(tuple);
}
});
// 将结果进行收集
List<Tuple2<String, Integer>> result = counts.collect();
// 遍历结果
for (Tuple2 t : result) {
System.out.println(t._1 + " " + t._2);
}
sc.stop();
}
}
1.3 新建words.txt,运行程序
三、scala实现-IDEA创建spark项目
Spark Shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。
3.1 创建maven项目
new project ---maven----填写项目名称----finish
3.2 导入pom依赖
build里面自动增加scala的支持,不用专门add framework
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
</dependencies>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3.3 新建scala文件夹,作为资源根目录
后面新建包,新建scala类。
3.4 代码部分
3.4.1 详细代码
package com.ucas.test
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author GONG
* @date 2020/10/30 11:40
* @version 1.0
*/
object WordCount3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf();
conf.setAppName("wordcount3").setMaster("local")
val sc = new SparkContext(conf);
val lines: RDD[String] = sc.textFile("data/words.txt")
val word: RDD[String] = lines.flatMap(line => {
line.split(" ")
})
val wordWithNum: RDD[(String, Int)] = word.map(word => {
new Tuple2(word, 1)
})
val wordWithCount: RDD[(String, Int)] = wordWithNum.reduceByKey((n1, n2) => {
n1 + n2
})
wordWithCount.foreach(one => {
println(one)
})
sc.stop()
}
}
3.4.2 一行代码实现wordcount
package com.ucas.test
import org.apache.spark.{SparkConf, SparkContext}
/**
* @author GONG
* @date 2020/10/30 11:29
* @version 1.0
*/
object WordCount2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("wordcount").setMaster("local");
val sc = new SparkContext(conf);
sc.textFile("data/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)
sc.stop()
}
}
敲代码的乔帮主
认证博客专家
数据分析
大数据工程师
java软件开发
走在梦想的大道上,跌跌撞撞,时而悲伤,时而难过失落,是什么又让我扬起头继续向前呢?是心中的那一朵彼岸之花,牵使着你我之间的约定,成为我心中那一股永不可灭的骨气,终究是信仰。