关注公众号:登峰大数据,阅读Spark实战第二版(完整中文版),系统学习Spark3.0大数据框架!
如果您觉得作者翻译的内容有帮助,请分享给更多人。您的分享,是作者翻译的动力!
本章涵盖了
利用Spark的高效"懒惰(延迟计算)"特点的好处
传统方法与Spark方法构建数据应用程序
使用Spark构建优秀的数据中心应用程序
学习更多关于transformations 和actions的知识
使用Catalyst--Spark的内置优化器
有向无环图简介
这一章不仅仅学习Spark延迟计算的特点。通过示例和实验,本文还介绍了使用传统方法构建数据应用程序和使用Spark构建数据应用程序之间的基本区别。
懒惰(延迟计算)至少有两种:一种是当你承诺要做别的事情时就睡在树下;另一种是提前考虑以最聪明的方式完成工作。尽管此时此刻,我的思绪正想着躺在树荫下,很大程度上受到科西嘉岛的阿斯特里克斯的启发,但在这一章我将展示Spark如何通过优化它的工作量来让你的工作更轻松。您将了解transformation(数据处理的每个步骤)和action(完成工作的触发器)两个基本角色。
您将使用来自美国国家卫生统计中心的真实数据集。该应用程序旨在说明Spark在处理数据时经过的推理(推断)。本章只关注一个应用程序,但它包含三种执行模式,它们对应于您将运行的三个实验,以更好地了解Spark的“思维方式”。
我从Java的角度介绍了transformations 和actions。
最后,您将深入了解Spark的内置优化器Catalyst。与RDBMS查询优化器一样,它可以转储查询计划,这对于调试非常有用。您将学习如何分析其输出。
附录一是本章的参考资料;它包含transformation列表和action列表。
本章的实验示例可在GitHub中获得:https://github.com/jgperrin/net.jgp.books.spark.ch04。
大多数时候,懒惰与消极的行为联系在一起。当我提到懒惰时,你可能会立刻想到在树下午睡,闲逛而不是工作。但是还有其他的吗?在这一部分,我们来看看懒惰和聪明之间是否有联系。
我一直在想,聪明的人是不是比别人更懒。这个理论基于这样一个观点:聪明的人在做某件事之前会考虑得更多。
让我们用下面的方法来考虑这个问题。你的老板(或产品经理)要求你为产品构建1.1版本,然后修改了需求,略有变化。现在您将构建版本1.2。最后,他要求回到第一个版本,并对第一个版本的特性做一些轻微的修改。这是1.3版本。当然,这是一部虚构的作品。名称、业务、事件、敏捷的借口和特性都是我想象的产物。任何与真实的人,活着的或死去的,或真实事件的相似之处纯属巧合。这种事也没发生在你身上,对吧?然而,图4.1说明了思维过程。
图4.1 在构建伟大产品特性的下一个版本时,按照老板或产品经理的要求执行。
图4.2 有时,根据产品经理的需求,即使完成了步骤1和步骤2,倒回第一个版本并重新开始的情况时有发生,这取决于产品经理的需求。
另一种方法是回到1.0版本并从那里进行修改,如图4.2所示。
这可能就是为什么像Git(甚至cvs或sccs)这样的源代码控制工具被发明出来的原因。
如果你事先知道你老板的变更请求,你会同意第二种工作方式,如图4.2所示,会更可取,对吗?这是一种更"懒",但更聪明的方式。在下一节中,将看到如何将其应用于Spark。实际上,即使在开始之前不知道v1.3中的特性,从v1.0开始仍然可能更安全,只是为了确保不会出现bug。
在上一节中,你了解了聪明的懒惰在你的“今天”生活中会是什么样子。本节通过一个特定的示例将其置换到Apache Spark。
最重要的是,你会明白为什么Spark是懒惰的,为什么它对你有好处。您还将执行以下操作:
理解您将运行的实验,以理解Spark的transformations 和actions
观察几个transformations 和actions的结果
看看实验背后的代码
仔细分析结果,看看时间都花在哪里了
transformations 和actions是Spark的面包和黄油。在本节中,您将为将要执行的实验设置上下文。在本例中,您将加载数据集并度量性能,以了解工作在何处完成。
在本书中,我用show()方法结束大多数示例。这是一种快速查看结果的有效方法,但它并不是典型的最终目标。collect操作(代码中的collect())允许将dataframe作为Java list对象返回,允许进一步处理,如创建报告、发送电子邮件等。这通常是确定应用程序时的最后操作之一。在Spark的词汇表中,这称为一个action。
为了理解transformations 和actions的概念,以及通过懒惰操作的概念,您将在实验#200中执行三个实验:
实验1-加载数据集并执行collect action。
实验2 -加载数据集,执行转换(通过复制和数学操作创建三列),并执行collect action。
实验3 -加载数据集,执行转换(通过复制和数学操作创建三列),删除新创建的列,并执行collect action。这个实验将说明Spark的"懒惰"特点。
数据集包含大约250万条记录。具体结果见表4.1。
表4.1分析transformations 和actions的三个实验的结果
我希望你看到表4.1中的一些结果会感到奇怪。如果没有,这里有一些提示:
通过转换,您只需在182毫秒内创建三个包含250万条记录的列,即大约750万个数据集。这很快,不是吗?
在执行action时,如果不进行任何转换,则该操作大约需要21秒。如果您创建三列,操作将花费34秒。但是如果您创建并删除列,这个操作大约需要25秒。那不是很奇怪吗?正如你已经猜到的,这是由于懒惰。
让我们更详细地看看这个过程,以及可以构建来填充表4.1的代码。这样你就有了解决这两个谜团的线索。
在上一节中,您看到了转换过程的结果以及结果中的异常。在本节中,在查看代码并深入探究奥秘之前,您将更详细地探索这个过程。
这个过程本身非常简单。你可以这样做:
获得一个新的Spark session。
加载数据集。在本例中,您将使用来自美国国家卫生统计中心(NCHS)的数据,该中心是疾病控制和预防中心(CDC)的一个部门,可通过www.cdc.gov/nchs/index.htm访问。数据集包含了美国每个县和州每年的平均出生率。数据集包含在实验室的数据目录中。
将数据集复制几次,使其更大一些。示例中文件包含40,781条记录。这个数字有点低:当你研究一个特定的机制或过程时,你可能会看到副作用。Spark设计用于分布式处理大量记录,而不仅仅是40000条。因此,您将通过执行数据集与自身的联合来增加数据集。我知道这没有太多的商业价值,但是我没有发现一个更大但又不太大的数据集(打破了GitHub的100mb限制)。
清理。在外部数据集上总是有一些清理工作要做。在本例中,您将重命名一些列。
执行转换。它们分为三种类型:不转换、创建附加列,以及最后创建列和删除。
最后,采取action。
图4.3说明了该过程。注意,转换将在步骤5中运行,这将根据我们的三个实验而有所不同。
图4.3 转换过程包括六个主要步骤。前四个步骤是数据集的准备;然后您可以执行转换,最后执行action操作。
表4.2详细说明了数据集的结构。您正在使用的NCHS数据集包含了2003年至2015年美国15至19岁青少年的出生率,按县划分。这个数据集的基调有点严肃,但是在本书的过程中,我想使用有意义的和真实的数据集。每个数据集都是一个基础,你也可以在本章之外使用它。
表4.2本章使用的NHCS出生率数据集结构
在这个数据集中,最后两列分别是置信下限和置信上限。这些表明了对出生率的置信水平。
用FIPS规范政府数据
联邦信息处理标准(FIPS)是由美国联邦政府为非军事政府机构和政府承包商在计算机系统中使用而开发的公开公布的标准。
你可以在国家标准和技术协会(NIST)的网站(www.nist.gov/itl/ fips-generalinformation)和维基百科(https:// wikipedia.org/wiki/federal_information_processing _Standards)上了解更多关于FIPS的信息。
置信限度是统计数据的一部分
你可能已经知道,本书中使用的所有大型数据集都来自真实世界,而不是为了学习而编造的。因此,有时您会发现奇怪的(或不寻常的)术语。
此数据集包含置信下限和置信上限。置信极限是置信区间上、下限的数字。区间是从观测数据的统计数据计算出来的,应该包含一个基于观测数据的未知总体参数的真实值。你不需要计算它;它是您正在使用的数据集的一部分。
关于可信限度和可信区间的更多解释,请参见John H. McDonald的《生物统计手册》(Sparky House Publishing, 2014, www.biostathandbook.com/confidence ence.html)和维基百科)。
在前面几节中,您看到了这个过程的结果(表4.1),了解了实验中使用的过程的细节,并查看了数据集的结构。现在,是时候看看代码以及如何运行它了。
您将在命令行上使用三个不同的参数执行该代码三次。代码将根据参数自动调整。使用命令行可以更容易地链接命令、多次运行测试以比较结果、平均结果,以及更容易地尝试其他平台。您将使用Maven。如有需要,请检查附录B的安装。
本节不是对如何执行基准的科学描述;然而,它说明了时间花在哪里。
您可以使用命令行参数在一个命令中运行这三个实验。您将运行clean和编译/安装,然后运行每个实验。第一个实验是默认的,所以它不需要参数。命令行是这样的:
mvn clean install && mvn exec:exec && \ mvn exec:exec -DexecMode=COL && \ mvn exec:exec -DexecMode=FULL
执行结果如下:
...//第一个实验:执行不包括transformation的操作 [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ spark-chapter04 --- 1. Creating a session .............. 1791 //准备时间 2. Loading initial dataset ......... 3287 3. Building full dataset ........... 242 4. Clean-up ........................ 8 5. Transformations ................ 0 //transformation 6. Final action .................... 20770 //action # of records ....................... 2487641 ...//第二个实验:创建列 [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ spark-chapter04 --- 1. Creating a session .............. 1553 2. Loading initial dataset ......... 3197 3. Building full dataset ........... 208 4. Clean-up ........................ 8 5. Transformations ................ 182 6. Final action .................... 34061 # of records ....................... 2487641 ...//第三个实验:整个过程——列的创建和删除 [INFO] --- exec-maven-plugin:1.6.0:exec (default-cli) @ spark-chapter04 --- 1. Creating a session .............. 1903 2. Loading initial dataset ......... 3184 3. Building full dataset ........... 213 4. Clean-up ........................ 8 5. Transformations ................ 205 6. Final action .................... 24909 # of records ....................... 2487641 //整个Maven进程的总执行时间,可以忽略 ... [INFO] Total time: 37.659 s
如果您想构建与表4.1相同的表,您可以复制Microsoft Excel中的值。Excel表格附在项目上。它被称为Analysis results.xlsx,位于本章存储库的data文件夹中。
清单4.1有点长,但应该不难理解。
main()方法将确保向start()方法传递一个参数,所有工作都在该方法中进行。预期参数如下(参数不区分大小写):
noop表示无操作/转换,用于实验1
col用于创建列,在实验2中使用
full为整个过程,用于实验3
start()方法将创建一个会话、读取文件、增加数据集,并在准备阶段执行一些清理工作。然后将执行转换和操作(transformations 和action)。
实验说明 实验200在 net.jgp.books.spark.ch04.lab200_transformation_and_action包中。应用程序是TransformationAndActionApp.java。
package net.jgp.books.spark.ch04.lab200_transformation_and_action; import static org.apache.spark.sql.functions.expr;//使用此函数计算列上的表达式。 import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class TransformationAndActionApp { public static void main(String[] args ) { TransformationAndActionApp app = new TransformationAndActionApp(); String mode = "noop" ;//确保有一个参数要传递给start()。 if ( args . length != 0) { mode = args [0]; } app .start( mode ); }
第一步是获得一个会话,像往常一样:
private void start(String mode ) { long t0 = System.currentTimeMillis(); SparkSession spark = SparkSession.builder() .appName( "Analysing Catalyst's behavior" ) .master( "local" ) .getOrCreate(); long t1 = System.currentTimeMillis(); System. out .println( "1. Creating a session ........... " + ( t1 - t0 ));
第二步是从CSV文件中读取数据:
Dataset<Row> df = spark .read().format( "csv" ) .option( "header" , "true" ) .load( "data/NCHS_-_Teen_Birth_Rates_for_Age_Group_15-19_in_the_United ➥ _States_by_County.csv" ); Dataset<Row> initalDf = df ; long t2 = System.currentTimeMillis(); System. out .println( "2. Loading initial dataset ...... " + ( t2 - t1 ));
在第三步中,将dataframe与其自身结合以创建一个更大的数据集(否则Spark会运行太快,无法衡量"懒惰"的好处):
for ( int i = 0; i < 60; i ++) { df = df .union( initalDf ); } long t3 = System.currentTimeMillis(); System. out .println( "3. Building full dataset ........ " + ( t3 - t2 ));
第四步 重命名列:
df = df .withColumnRenamed( "Lower Confidence Limit" , "lcl" ); df = df .withColumnRenamed( "Upper Confidence Limit" , "ucl" );//基本清理:使列名更短,便于操作 long t4 = System.currentTimeMillis(); System. out .println( "4. Clean-up ..................... " + ( t4 - t3 ));
第五步 是实际的数据转换,有不同的模式:
if ( mode .compareToIgnoreCase( "noop" ) != 0) { df = df .withColumn( "avg" , expr( "(lcl+ucl)/2" )) .withColumn( "lcl2" , df .col( "lcl" )) .withColumn( "ucl2" , df .col( "ucl" )); if ( mode .compareToIgnoreCase( "full" ) == 0) { df = df .drop( df .col( "avg" )) .drop( df .col( "lcl2" )) .drop( df .col( "ucl2" )); } } long t5 = System.currentTimeMillis(); System. out .println( "5. Transformations ............. " + ( t5 - t4 ));
第六步 是应用程序的最后一步,它调用action:
df .collect(); long t6 = System.currentTimeMillis(); System. out .println( "6. Final action ................. " + ( t6 - t5 )); System. out .println( "" ); System. out .println( "# of records .................... " + df .count()); } }
collect()操作返回一个对象。该方法的完整签名如下:
Object collect()
您可以忽略返回值,因为对进一步的处理不感兴趣。但是,在非基准测试的情况下,很可能会使用返回值。在接下来的部分中,您将进一步了解发生了什么。
(未完待续......) 欢迎关注公众号,及时获得最新翻译内容: