3.2.4 spark体系之分布式计算-spark-core之离线计算-计算WordCount(java版+scala版本)

it2024-10-31  9

目录

一、java实现方式-eclipse

1.1 新建java项目,导包

1.2 新建类JavaSparkWordCount

1.3 新建words.txt,运行程序

三、scala实现-IDEA创建spark项目

3.1 创建maven项目

3.2 导入pom依赖

3.3 新建scala文件夹,作为资源根目录

3.4 代码部分

3.4.1 详细代码

3.4.2 一行代码实现wordcount


一、java实现方式-eclipse

1.1 新建java项目,导包

需要导包  spark-assembly-1.6.0-hadoop2.6.0

并且build path一下:

1.2 新建类JavaSparkWordCount

package com.sdnu.test; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; /** * @author GONG * @version 1.0 * @date 2020/10/29 11:09 */ public class WordCount1 { public static void main(String[] args) { // conf 运行模式,应用名称 SparkConf conf = new SparkConf(); conf.setAppName("wordcount1").setMaster("local"); // JavaSparkContext sc是通往集群的唯一通道 JavaSparkContext sc = new JavaSparkContext(conf); // textFile 读取文件,一行一行读取 JavaRDD<String> lines = sc.textFile("data/words.txt", 1); /* * flatMap 进一条数据出多条数据,一对多关系--- new FlatMapFunction<String, String>表示进来String,出去string * */ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { public Iterator<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); /* * 在java中 如果想让某个RDD转换成K,V格式 使用xxxToPair K,V格式的RDD:JavaPairRDD<String, Integer> * */ JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s, 1); } }); /* * reduceByKey 1.先将相同的key分组 2.对每一组的key对应的value去按照你的逻辑去处理 * */ JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer n1, Integer n2) throws Exception { return n1 + n2; } }); /* * 直接从RDD阶段开始遍历 * */ counts.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> tuple) throws Exception { System.out.println(tuple); } }); // 将结果进行收集 List<Tuple2<String, Integer>> result = counts.collect(); // 遍历结果 for (Tuple2 t : result) { System.out.println(t._1 + " " + t._2); } sc.stop(); } }

1.3 新建words.txt,运行程序

 

三、scala实现-IDEA创建spark项目

Spark Shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

3.1 创建maven项目

new project ---maven----填写项目名称----finish

3.2 导入pom依赖

build里面自动增加scala的支持,不用专门add framework

<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>2.4.5</version> </dependency> </dependencies> <build> <finalName>WordCount</finalName> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>

3.3 新建scala文件夹,作为资源根目录

后面新建包,新建scala类。

3.4 代码部分

3.4.1 详细代码

package com.ucas.test import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * @author GONG * @date 2020/10/30 11:40 * @version 1.0 */ object WordCount3 { def main(args: Array[String]): Unit = { val conf = new SparkConf(); conf.setAppName("wordcount3").setMaster("local") val sc = new SparkContext(conf); val lines: RDD[String] = sc.textFile("data/words.txt") val word: RDD[String] = lines.flatMap(line => { line.split(" ") }) val wordWithNum: RDD[(String, Int)] = word.map(word => { new Tuple2(word, 1) }) val wordWithCount: RDD[(String, Int)] = wordWithNum.reduceByKey((n1, n2) => { n1 + n2 }) wordWithCount.foreach(one => { println(one) }) sc.stop() } }

3.4.2 一行代码实现wordcount

package com.ucas.test import org.apache.spark.{SparkConf, SparkContext} /** * @author GONG * @date 2020/10/30 11:29 * @version 1.0 */ object WordCount2 { def main(args: Array[String]): Unit = { val conf = new SparkConf() conf.setAppName("wordcount").setMaster("local"); val sc = new SparkContext(conf); sc.textFile("data/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println) sc.stop() } }

敲代码的乔帮主 认证博客专家 数据分析 大数据工程师 java软件开发 走在梦想的大道上,跌跌撞撞,时而悲伤,时而难过失落,是什么又让我扬起头继续向前呢?是心中的那一朵彼岸之花,牵使着你我之间的约定,成为我心中那一股永不可灭的骨气,终究是信仰。
最新回复(0)