Apache Spark - 快速指南


Apache Spark - 简介

各行业广泛使用 Hadoop 来分析其数据集。原因是 Hadoop 框架基于简单的编程模型 (MapReduce),它支持可扩展、灵活、容错且经济高效的计算解决方案。在这里,主要关注的是在查询之间的等待时间和运行程序的等待时间方面保持处理大型数据集的速度。

Spark 是由 Apache 软件基金会推出的,用于加速 Hadoop 计算软件进程。

与普遍看法相反,Spark 不是 Hadoop 的修改版本,并且实际上并不依赖 Hadoop,因为它有自己的集群管理。Hadoop只是Spark的实现方式之一。

Spark以两种方式使用Hadoop——一是存储,二是处理。由于 Spark 有自己的集群管理计算,因此它仅使用 Hadoop 进行存储。

阿帕奇火花

Apache Spark 是一种快如闪电的集群计算技术,专为快速计算而设计。它基于 Hadoop MapReduce,并扩展了 MapReduce 模型,以有效地将其用于更多类型的计算,其中包括交互式查询和流处理。Spark 的主要特点是它的内存集群计算,可以提高应用程序的处理速度。

Spark 旨在涵盖广泛的工作负载,例如批处理应用程序、迭代算法、交互式查询和流式处理。除了支持各个系统中的所有这些工作负载之外,它还减轻了维护单独工具的管理负担。

Apache Spark 的演变

Spark 是 Hadoop 的子项目之一,由 Matei Zaharia 于 2009 年在加州大学伯克利分校的 AMPLab 开发。它于 2010 年在 BSD 许可证下开源。它于 2013 年捐赠给 Apache 软件基金会,现在 Apache Spark 从 2014 年 2 月起已成为 Apache 顶级项目。

Apache Spark 的特点

Apache Spark 具有以下功能。

  • 速度- Spark 有助于在 Hadoop 集群中运行应用程序,在内存中运行速度提高 100 倍,在磁盘上运行时速度提高 10 倍。这可以通过减少磁盘读/写操作的数量来实现。它将中间处理数据存储在内存中。

  • 支持多种语言- Spark 提供 Java、Scala 或 Python 内置 API。因此,您可以用不同的语言编写应用程序。Spark 提供了 80 个用于交互式查询的高级运算符。

  • 高级分析- Spark 不仅支持“Map”和“reduce”。它还支持 SQL 查询、流数据、机器学习 (ML) 和图形算法。

基于 Hadoop 构建的 Spark

下图显示了如何使用 Hadoop 组件构建 Spark 的三种方法。

基于 Hadoop 构建的 Spark

Spark 部署有以下三种方式。

  • 独立- Spark 独立部署意味着 Spark 占据 HDFS(Hadoop 分布式文件系统)之上的位置,并且显式地为 HDFS 分配空间。在这里,Spark 和 MapReduce 将并行运行以覆盖集群上的所有 Spark 作业。

  • Hadoop Yarn - Hadoop Yarn 部署简单来说意味着 Spark 在 Yarn 上运行,无需任何预安装或 root 访问权限。它有助于将 Spark 集成到 Hadoop 生态系统或 Hadoop 堆栈中。它允许其他组件在堆栈顶部运行。

  • Spark in MapReduce (SIMR) - MapReduce 中的 Spark 除了独立部署之外还用于启动 Spark 作业。借助 SIMR,用户可以启动 Spark 并使用其 shell,而无需任何管理访问权限。

Spark的组成部分

下图描述了 Spark 的不同组件。

Spark的组成部分

Apache Spark 核心

Spark Core 是 Spark 平台的底层通用执行引擎,所有其他功能都建立在该平台之上。它提供内存计算和引用外部存储系统中的数据集。

星火SQL

Spark SQL 是 Spark Core 之上的一个组件,它引入了一种称为 SchemaRDD 的新数据抽象,它为结构化和半结构化数据提供支持。

火花流

Spark Streaming 利用 Spark Core 的快速调度功能来执行流分析。它以小批量方式摄取数据,并对这些小批量数据执行 RDD(弹性分布式数据集)转换。

MLlib(机器学习库)

由于采用基于分布式内存的Spark架构,MLlib是Spark之上的分布式机器学习框架。根据基准测试,这是由 MLlib 开发人员针对交替最小二乘法 (ALS) 实现完成的。Spark MLlib 的速度是基于 Hadoop 磁盘的Apache Mahout版本(在 Mahout 获得 Spark 接口之前)的九倍。

图X

GraphX 是一个基于 Spark 的分布式图形处理框架。它提供了一个用于表达图计算的API,可以使用Pregel抽象API对用户定义的图进行建模。它还为此抽象提供了优化的运行时。

Apache Spark-RDD

弹性分布式数据集

弹性分布式数据集(RDD)是 Spark 的基本数据结构。它是一个不可变的分布式对象集合。RDD中的每个数据集被划分为逻辑分区,这些分区可以在集群的不同节点上计算。RDD 可以包含任何类型的 Python、Java 或 Scala 对象,包括用户定义的类。

从形式上来说,RDD 是只读的、分区的记录集合。RDD 可以通过对稳定存储上的数据或其他 RDD 上的确定性操作来创建。RDD 是可以并行操作的容错元素集合。

有两种方法可以创建 RDD:并行化驱动程序中的现有集合,或者引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop 输入格式的数据源。

Spark利用RDD的概念来实现更快、更高效的MapReduce操作。让我们首先讨论 MapReduce 操作是如何发生的以及为什么它们不那么高效。

MapReduce 中数据共享速度慢

MapReduce 被广泛用于在集群上使用并行分布式算法处理和生成大型数据集。它允许用户使用一组高级运算符编写并行计算,而不必担心工作分配和容错。

不幸的是,在大多数当前框架中,在计算之间(例如 - 两个 MapReduce 作业之间)重用数据的唯一方法是将其写入外部稳定存储系统(例如 - HDFS)。尽管该框架提供了许多用于访问集群计算资源的抽象,但用户仍然需要更多。

迭代交互式应用程序都需要在并行作业之间更快地共享数据。由于复制、序列化磁盘 IO ,MapReduce 中的数据共享速度很慢。在存储系统方面,大部分Hadoop应用,90%以上的时间都在做HDFS的读写操作。

MapReduce 上的迭代操作

在多阶段应用程序中的多个计算中重用中间结果。下图解释了当前框架在 MapReduce 上执行迭代操作时的工作原理。由于数据复制、磁盘 I/O 和序列化,这会产生大量开销,从而导致系统变慢。

MapReduce 上的迭代操作

MapReduce 上的交互操作

用户对同一数据子集运行即席查询。每个查询都会在稳定存储上执行磁盘 I/O,这可以控制应用程序的执行时间。

下图解释了当前框架在 MapReduce 上进行交互式查询时如何工作。

MapReduce 上的交互操作

使用 Spark RDD 进行数据共享

由于复制、序列化磁盘 IO ,MapReduce 中的数据共享速度很慢。大多数Hadoop应用程序,他们90%以上的时间都在做HDFS读写操作。

认识到这个问题,研究人员开发了一个名为 Apache Spark 的专门框架。Spark的核心思想是弹性分布式数据(RDD);它支持内存中处理计算。这意味着,它将内存状态存储为跨作业的对象,并且该对象可以在这些作业之间共享。内存中的数据共享比网络和磁盘快10到100倍。

现在让我们尝试了解一下 Spark RDD 中是如何进行迭代和交互操作的。

Spark RDD 上的迭代操作

下图展示了 Spark RDD 上的迭代操作。它将中间结果存储在分布式内存中而不是稳定存储(磁盘)中,并使系统更快。

- 如果分布式内存 (RAM) 足以存储中间结果(作业状态),那么它将把这些结果存储在磁盘上。

Spark RDD 上的迭代操作

Spark RDD的交互操作

该图显示了 Spark RDD 上的交互式操作。如果对同一组数据重复运行不同的查询,则可以将该特定数据保留在内存中以获得更好的执行时间。

Spark RDD的交互操作

默认情况下,每次对每个转换后的 RDD 运行操作时,都可能会重新计算它。但是,您也可以将 RDD保留在内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问。还支持将 RDD 持久保存在磁盘上或跨多个节点复制。

Apache Spark - 安装

Spark是Hadoop的子项目。因此,最好将 Spark 安装到基于 Linux 的系统中。以下步骤显示如何安装 Apache Spark。

第 1 步:验证 Java 安装

Java 安装是安装 Spark 时必须做的事情之一。尝试以下命令来验证 JAVA 版本。

$java -version 

如果 Java 已经安装在您的系统上,您将看到以下响应 -

java version "1.7.0_71" 
Java(TM) SE Runtime Environment (build 1.7.0_71-b13) 
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果您的系统上尚未安装 Java,请先安装 Java,然后再继续下一步。

第 2 步:验证 Scala 安装

你应该用Scala语言来实现Spark。因此,让我们使用以下命令验证 Scala 安装。

$scala -version

如果您的系统上已经安装了 Scala,您将看到以下响应 -

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

如果您的系统上尚未安装 Scala,请继续执行下一步以安装 Scala。

第三步:下载Scala

通过访问以下链接下载Scala 下载最新版本的 Scala 。在本教程中,我们使用 scala-2.11.6 版本。下载后,您将在下载文件夹中找到 Scala tar 文件。

第四步:安装Scala

请按照以下给出的步骤安装 Scala。

提取 Scala tar 文件

键入以下命令以提取 Scala tar 文件。

$ tar xvf scala-2.11.6.tgz

移动 Scala 软件文件

使用以下命令将 Scala 软件文件移动到相应的目录(/usr/local/scala)

$ su – 
Password: 
# cd /home/Hadoop/Downloads/ 
# mv scala-2.11.6 /usr/local/scala 
# exit 

设置 Scala 的路径

使用以下命令设置 Scala 的 PATH。

$ export PATH = $PATH:/usr/local/scala/bin

验证 Scala 安装

安装后最好验证一下。使用以下命令验证 Scala 安装。

$scala -version

如果您的系统上已经安装了 Scala,您将看到以下响应 -

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

第 5 步:下载 Apache Spark

通过访问以下链接下载 Spark下载最新版本的 Spark 。在本教程中,我们使用spark-1.3.1-bin-hadoop2.6版本。下载后,您将在下载文件夹中找到 Spark tar 文件。

第6步:安装Spark

请按照以下步骤安装 Spark。

提取 Spark 焦油

以下命令用于提取 Spark tar 文件。

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz 

移动 Spark 软件文件

以下命令用于将 Spark 软件文件移动到相应目录(/usr/local/spark)

$ su – 
Password:  

# cd /home/Hadoop/Downloads/ 
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark 
# exit 

设置 Spark 环境

将以下行添加到 ~ /.bashrc文件中。意思是将spark软件文件所在的位置添加到PATH变量中。

export PATH=$PATH:/usr/local/spark/bin

使用以下命令获取 ~/.bashrc 文件。

$ source ~/.bashrc

第 7 步:验证 Spark 安装

编写以下命令来打开 Spark shell。

$spark-shell

如果 Spark 安装成功,您将看到以下输出。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc  
scala> 

Apache Spark - 核心编程

Spark Core是整个项目的基础。它提供分布式任务调度、调度和基本的I/O功能。Spark 使用一种称为 RDD(弹性分布式数据集)的专用基础数据结构,它是跨机器分区的数据的逻辑集合。RDD 可以通过两种方式创建;一是引用外部存储系统中的数据集,二是对现有 RDD 应用转换(例如映射、过滤器、减速器、连接)。

RDD 抽象通过语言集成的 API 公开。这简化了编程复杂性,因为应用程序操作 RDD 的方式类似于操作本地数据集合。

火花壳

Spark 提供了一个交互式 shell——一个交互式分析数据的强大工具。它可以使用 Scala 或 Python 语言。Spark 的主要抽象是分布式项目集合,称为弹性分布式数据集 (RDD)。RDD 可以从 Hadoop 输入格式(例如 HDFS 文件)或通过转换其他 RDD 创建。

打开 Spark Shell

以下命令用于打开 Spark shell。

$ spark-shell

创建简单的RDD

让我们从文本文件创建一个简单的 RDD。使用以下命令创建一个简单的 RDD。

scala> val inputfile = sc.textFile(“input.txt”)

上述命令的输出是

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API 引入了很少的转换操作来操作 RDD。

RDD 转换

RDD 转换返回指向新 RDD 的指针,并允许您在 RDD 之间创建依赖关系。依赖链(依赖字符串)中的每个 RDD 都有一个计算其数据的函数,并有一个指向其父 RDD 的指针(依赖)。

Spark 是惰性的,因此除非您调用某些将触发作业创建和执行的转换或操作,否则不会执行任何操作。请看下面的字数统计示例片段。

因此,RDD 转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉 Spark 如何获取数据以及如何处理数据。

序列号 转变与意义
1

地图(功能)

返回一个新的分布式数据集,该数据集是通过函数func传递源的每个元素而形成的。

2

过滤器(函数)

返回一个新的数据集,该数据集是通过选择func返回 true的源元素而形成的。

3

平面地图(函数)

与map类似,但每个输入项可以映射到0个或多个输出项(因此func应该返回一个Seq而不是单个项)。

4

映射分区(func)

与map类似,但在RDD的每个分区(块)上单独运行,因此当在T类型的RDD上运行时, func必须是Iterator<T> ⇒ Iterator<U>类型。

5

带索引的映射分区(func)

与map Partitions类似,但也为func提供了一个表示分区索引的整数值,因此当在T类型的RDD上运行时, func必须是(Int, Iterator<T>) ⇒ Iterator<U>类型。

6

样本(带有替换、分数、种子)

使用给定的随机数生成器种子对一小部分数据进行采样,无论是否有替换。

7

联合(其他数据集)

返回一个新数据集,其中包含源数据集中元素和参数的并集。

8

交集(其他数据集)

返回一个新的 RDD,其中包含源数据集中元素与参数的交集。

9

不同的([numTasks])

返回一个新数据集,其中包含源数据集的不同元素。

10

groupByKey([numTasks])

当调用 (K, V) 对数据集时,返回 (K, Iterable<V>) 对数据集。

注意- 如果您要进行分组以便对每个键执行聚合(例如求和或求平均值),则使用reduceByKey或aggregateByKey将产生更好的性能。

11

reduceByKey(func, [numTasks])

当调用 (K, V) 对数据集时,返回 (K, V) 对数据集,其中每个键的值使用给定的reduce 函数 func 进行聚合,该函数的类型必须为 (V, V) V . 与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

12

aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])

当调用 (K, V) 对数据集时,返回 (K, U) 对数据集,其中每个键的值使用给定的组合函数和中性“零”值进行聚合。允许使用与输入值类型不同的聚合值类型,同时避免不必要的分配。与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

13

sortByKey([升序], [numTasks])

当调用 K 实现 Ordered 的 (K, V) 对数据集时,返回按布尔升序参数中指定的键按升序或降序排序的 (K, V) 对数据集。

14

加入(其他数据集,[numTasks])

当调用 (K, V) 和 (K, W) 类型的数据集时,返回 (K, (V, W)) 对的数据集,其中每个键的所有元素对。通过 leftOuterJoin、rightOuterJoin 和 fullOuterJoin 支持外连接。

15

cogroup(其他数据集,[numTasks])

当调用 (K, V) 和 (K, W) 类型的数据集时,返回 (K, (Iterable<V>, Iterable<W>)) 元组的数据集。该操作也称为group With。

16

笛卡尔(其他数据集)

当调用 T 和 U 类型的数据集时,返回 (T, U) 对(所有元素对)的数据集。

17 号

管道(命令,[envVars])

通过 shell 命令(例如 Perl 或 bash 脚本)对 RDD 的每个分区进行管道传输。RDD 元素被写入进程的 stdin,并且输出到其 stdout 的行作为字符串 RDD 返回。

18

合并(numPartitions)

将 RDD 中的分区数量减少到 numPartitions。对于过滤大型数据集后更有效地运行操作很有用。

19

重新分区(numPartitions)

随机重新整理 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡。这总是会打乱网络上的所有数据。

20

repartitionAndSortWithinPartitions(分区器)

根据给定的分区程序对 RDD 进行重新分区,并在每个结果分区中按记录的键对记录进行排序。这比调用重新分区然后在每个分区内排序更有效,因为它可以将排序下推到洗牌机制中。

行动

序列号 行动与意义
1

减少(函数)

使用函数func聚合数据集的元素(它接受两个参数并返回一个)。该函数应该是可交换的和关联的,以便可以正确地并行计算。

2

收集()

在驱动程序中以数组形式返回数据集的所有元素。在返回足够小的数据子集的过滤器或其他操作之后,这通常很有用。

3

数数()

返回数据集中的元素数量。

4

第一的()

返回数据集的第一个元素(类似于采取(1))。

5

采取(n)

返回包含数据集前n 个元素的数组。

6

takeSample (withReplacement,num, [种子])

返回一个数组,其中包含数据集num个元素的随机样本,有或没有替换,可以选择预先指定随机数生成器种子。

7

takeOrdered(n, [排序])

使用自然顺序或自定义比较器返回 RDD 的前n个元素。

8

保存为文本文件(路径)

将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定目录中。Spark 对每个元素调用 toString 将其转换为文件中的一行文本。

9

saveAsSequenceFile(path)(Java 和 Scala)

将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对 RDD 上可用。在 Scala 中,它也适用于可隐式转换为 Writable 的类型(Spark 包括对 Int、Double、String 等基本类型的转换)。

10

saveAsObjectFile(path)(Java 和 Scala)

使用 Java 序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile() 加载。

11

按键计数()

仅适用于 (K, V) 类型的 RDD。返回 (K, Int) 对的哈希图以及每个键的计数。

12

foreach(函数)

对数据集的每个元素运行函数func 。这样做通常是为了防止副作用,例如更新累加器或与外部存储系统交互。

注意- 修改 foreach() 之外的累加器以外的变量可能会导致未定义的Behave。有关更多详细信息,请参阅了解闭包。

使用 RDD 进行编程

让我们通过一个例子来看看 RDD 编程中一些 RDD 转换和操作的实现。

例子

考虑一个字数统计示例 - 它计算文档中出现的每个单词。将以下文本视为输入,并将其保存为主目录中的input.txt文件。

input.txt - 输入文件。

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

请按照下面给出的过程执行给定的示例。

打开 Spark-Shell

以下命令用于打开 Spark shell。一般来说,spark是使用Scala构建的。因此,Spark程序运行在Scala环境上。

$ spark-shell

如果 Spark shell 成功打开,您将看到以下输出。查看输出的最后一行“Spark context available as sc”意味着 Spark 容器会自动创建名为sc的 Spark 上下文对象。在开始程序的第一步之前,应该创建 SparkContext 对象。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

创建一个RDD

首先,我们必须使用 Spark-Scala API 读取输入文件并创建 RDD。

以下命令用于从给定位置读取文件。在这里,使用输入文件的名称创建新的 RDD。在 textFile(“”) 方法中作为参数给出的字符串是输入文件名的绝对路径。但是,如果仅给出文件名,则表示输入文件位于当前位置。

scala> val inputfile = sc.textFile("input.txt")

执行字数转换

我们的目标是计算文件中的单词数。创建一个平面映射,将每行分割成单词(flatMap(line ⇒ line.split(“ ”))。

接下来,使用映射函数 ( map(word ⇒ (word , 1) )将每个单词读取为值为“1”的键(<key, value> = <word, 1> )。

最后,通过添加相似键的值来减少这些键(reduceByKey(_+_))。

以下命令用于执行字计数逻辑。执行完之后,你不会发现任何输出,因为这不是一个动作,这是一个转换;指向一个新的 RDD 或告诉 Spark 如何处理给定的数据)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

当前RDD

在使用 RDD 时,如果您想了解当前的 RDD,请使用以下命令。它将显示有关当前 RDD 及其依赖项的描述,以供调试。

scala> counts.toDebugString

缓存转换

您可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。第一次在操作中计算时,它将保存在节点的内存中。使用以下命令将中间转换存储在内存中。

scala> counts.cache()

应用行动

应用操作,例如将所有转换、结果存储到文本文件中。saveAsTextFile(“ ”) 方法的字符串参数是输出文件夹的绝对路径。尝试以下命令将输出保存在文本文件中。在以下示例中,“输出”文件夹位于当前位置。

scala> counts.saveAsTextFile("output")

检查输出

打开另一个终端进入主目录(spark 在另一个终端中执行)。使用以下命令检查输出目录。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

以下命令用于查看Part-00000文件的输出。

[hadoop@localhost output]$ cat part-00000

输出

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 

以下命令用于查看Part-00001文件的输出。

[hadoop@localhost output]$ cat part-00001 

输出

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

联合国持久存储

在取消持久化之前,如果您想查看该应用程序使用的存储空间,请在浏览器中使用以下 URL。

http://localhost:4040

您将看到以下屏幕,其中显示了在 Spark shell 上运行的应用程序所使用的存储空间。

储存空间

如果您想取消持久化特定 RDD 的存储空间,请使用以下命令。

Scala> counts.unpersist()

您将看到如下输出 -

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

要验证浏览器中的存储空间,请使用以下 URL。

http://localhost:4040/

您将看到以下屏幕。它显示了在 Spark shell 上运行的应用程序使用的存储空间。

应用程序存储空间

Apache Spark - 部署

Spark应用程序使用spark-submit,是一个shell命令,用于在集群上部署Spark应用程序。它通过统一的接口使用所有相应的集群管理器。因此,您不必为每一个应用程序配置。

例子

让我们采用之前使用 shell 命令进行字数统计的相同示例。在这里,我们考虑与 Spark 应用程序相同的示例。

输入样本

以下文本是输入数据,文件名为in.txt

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

看下面的程序 -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
} 

将上述程序保存到名为SparkWordCount.scala的文件中,并将其放置在名为Spark-application的用户定义目录中。

注意- 在将 inputRDD 转换为 countRDD 时,我们使用 flatMap() 将行(来自文本文件)标记为单词,使用 map() 方法来计算单词频率,使用 reduceByKey() 方法来计算每个单词的重复次数。

请按照以下步骤提交此申请。通过终端执行spark-application目录下的所有步骤。

第 1 步:下载 Spark Ja

编译需要Spark core jar,因此,从以下链接下载spark-core_2.10-1.3.0.jar Spark core jar并将jar文件从下载目录移动到spark-application目录。

第二步:编译程序

使用下面给出的命令编译上述程序。该命令应从spark-application 目录执行。这里,/usr/local/spark/ lib/spark- assembly-1.4.0-hadoop2.6.0.jar 是取自 Spark 库的 Hadoop 支持 jar。

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

第 3 步:创建 JAR

使用以下命令创建 Spark 应用程序的 jar 文件。这里,wordcount是 jar 文件的文件名。

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

第四步:提交spark申请

使用以下命令提交 Spark 应用程序 -

spark-submit --class SparkWordCount --master local wordcount.jar

如果执行成功,您将看到下面给出的输出。OK输入以下输出用于用户识别,这是程序的最后一行如果仔细阅读以下输出,您会发现不同的东西,例如 -

  • 在端口 42954 上成功启动服务“sparkDriver”
  • MemoryStore 启动容量为 267.3 MB
  • 在 http://192.168.1.217:4040 启动 SparkUI
  • 添加JAR文件:/home/hadoop/piapplication/count.jar
  • ResultStage 1(SparkPi.scala:11 处的 saveAsTextFile)在 0.566 秒内完成
  • 已停止 Spark Web UI http://192.168.1.217:4040
  • 内存存储已清除
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!  

第 5 步:检查输出

程序执行成功后,您将在spark-application目录中找到名为outfile的目录。

以下命令用于打开和检查 outfile 目录中的文件列表。

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

检查part-00000文件中输出的命令是 -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

检查part-00001文件中输出的命令是 -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

请阅读以下部分以了解有关“spark-submit”命令的更多信息。

Spark-提交语法

spark-submit [options] <app jar | python file> [app arguments]

选项

序列号 选项 描述
1 - 掌握 Spark://host:port、mesos://host:port、yarn 或 local。
2 --部署模式 是否在本地(“客户端”)或集群内的一台工作计算机(“集群”)上启动驱动程序(默认:客户端)。
3 - 班级 您的应用程序的主类(对于 Java / Scala 应用程序)。
4 - 姓名 您的应用程序的名称。
5 --罐子 要包含在驱动程序和执行程序类路径中的以逗号分隔的本地 jar 列表。
6 --包 要包含在驱动程序和执行程序类路径中的 jar 的 Maven 坐标的逗号分隔列表。
7 --存储库 以逗号分隔的其他远程存储库列表,用于搜索使用 --packages 给出的 Maven 坐标。
8 --py 文件 要放置在 Python 应用程序的 PYTHON PATH 上的以逗号分隔的 .zip、.egg 或 .py 文件列表。
9 --文件 要放置在每个执行程序的工作目录中的以逗号分隔的文件列表。
10 --conf (属性=val) 任意 Spark 配置属性。
11 --属性文件 从中加载额外属性的文件的路径。如果未指定,这将查找conf/spark-defaults。
12 --驱动程序内存 驱动程序内存(例如1000M、2G)(默认:512M)。
13 --driver-java-选项 传递给驱动程序的额外 Java 选项。
14 --驱动程序库路径 要传递给驱动程序的额外库路径条目。
15 --驱动程序类路径

要传递给驱动程序的额外类路径条目。

请注意,使用 --jars 添加的 jar 会自动包含在类路径中。

16 --执行器内存 每个执行器的内存(例如1000M、2G)(默认:1G)。
17 号 --代理用户 提交申请时模拟的用户。
18 --帮助,-h 显示此帮助消息并退出。
19 --详细,-v 打印额外的调试输出。
20 - 版本 打印当前 Spark 的版本。
21 --驱动程序核心数 驱动程序的核心(默认值:1)。
22 - 监督 如果给出,则在失败时重新启动驱动程序。
23 - 杀 如果给出,则杀死指定的驱动程序。
24 - 地位 如果给出,则请求指定驱动程序的状态。
25 --总执行器核心数 所有执行器的核心总数。
26 --执行器核心 每个执行器的核心数。(默认值:YARN 模式下为 1,或独立模式下工作线程上的所有可用核心)。

高级 Spark 编程

Spark 包含两种不同类型的共享变量 - 一种是广播变量,第二种是累加器

  • 广播变量- 用于有效地分配大值。

  • 累加器- 用于聚合特定集合的信息。

广播变量

广播变量允许程序员在每台机器上缓存只读变量,而不是随任务传送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark还尝试使用高效的广播算法来分发广播变量,以降低通信成本。

Spark 操作通过一组阶段执行,这些阶段由分布式“shuffle”操作分隔开。Spark自动广播每个阶段内任务所需的公共数据。

以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。

广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是v的包装器,可以通过调用value方法来访问它的值。下面给出的代码显示了这一点 -

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

输出-

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

创建广播变量后,应在集群上运行的任何函数中使用它代替值v,以便v不会多次传送到节点。另外,对象v在广播后不应该被修改,以确保所有节点得到相同的广播变量值。

累加器

累加器是仅通过关联运算“添加”的变量,因此可以有效地支持并行。它们可用于实现计数器(如在 MapReduce 中)或求和。Spark 原生支持数字类型的累加器,程序员可以添加对新类型的支持。如果累加器是用名称创建的,它们将显示在Spark 的 UI中。这对于了解运行阶段的进度很有用(注意 - Python 尚不支持)。

通过调用SparkContext.accumulator(v)从初始值v创建累加器。然后,可以使用add方法或 += 运算符(在 Scala 和 Python 中)将在集群上运行的任务添加到集群中。然而,他们无法读取其价值。只有驱动程序可以使用累加器的value方法读取累加器的值。

下面给出的代码显示了一个累加器,用于将数组的元素相加 -

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

如果您想查看上述代码的输出,请使用以下命令 -

scala> accum.value 

输出

res2: Int = 10 

数值 RDD 操作

Spark 允许您使用预定义的 API 方法之一对数值数据执行不同的操作。Spark 的数值运算是通过流式算法实现的,该算法允许一次构建模型。

这些操作通过调用status()方法进行计算并作为StatusCounter对象返回。

序列号 方法及意义
1

数数()

RDD 中的元素数量。

2

意思是()

RDD 中元素的平均值。

3

和()

RDD 中元素的总价值。

4

最大限度()

RDD 中所有元素的最大值。

5

最小()

RDD 中所有元素中的最小值。

6

方差()

元素的方差。

7

标准差()

标准差。

如果只想使用其中一种方法,可以直接在RDD上调用相应的方法。