Apache Spark - 核心编程


Spark Core是整个项目的基础。它提供分布式任务调度、调度和基本的I/O功能。Spark 使用一种称为 RDD(弹性分布式数据集)的专用基础数据结构,它是跨机器分区的数据的逻辑集合。RDD 可以通过两种方式创建;一是引用外部存储系统中的数据集,二是对现有 RDD 应用转换(例如映射、过滤器、减速器、连接)。

RDD 抽象通过语言集成的 API 公开。这简化了编程复杂性,因为应用程序操作 RDD 的方式类似于操作本地数据集合。

火花壳

Spark 提供了一个交互式 shell——一个交互式分析数据的强大工具。它可以使用 Scala 或 Python 语言。Spark 的主要抽象是分布式项目集合,称为弹性分布式数据集 (RDD)。RDD 可以从 Hadoop 输入格式(例如 HDFS 文件)或通过转换其他 RDD 创建。

打开 Spark Shell

以下命令用于打开 Spark shell。

$ spark-shell

创建简单的RDD

让我们从文本文件创建一个简单的 RDD。使用以下命令创建一个简单的 RDD。

scala> val inputfile = sc.textFile(“input.txt”)

上述命令的输出是

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API 引入了很少的转换操作来操作 RDD。

RDD 转换

RDD 转换返回指向新 RDD 的指针,并允许您在 RDD 之间创建依赖关系。依赖链(依赖字符串)中的每个 RDD 都有一个计算其数据的函数,并有一个指向其父 RDD 的指针(依赖)。

Spark 是惰性的,因此除非您调用某些将触发作业创建和执行的转换或操作,否则不会执行任何操作。请看下面的字数统计示例片段。

因此,RDD 转换不是一组数据,而是程序中的一个步骤(可能是唯一的步骤),告诉 Spark 如何获取数据以及如何处理数据。

下面给出了 RDD 转换的列表。

序列号 转变与意义
1

地图(功能)

返回一个新的分布式数据集,该数据集是通过函数func传递源的每个元素而形成的。

2

过滤器(函数)

返回一个新的数据集,该数据集是通过选择func返回 true的源元素而形成的。

3

平面地图(函数)

与map类似,但每个输入项可以映射到0个或多个输出项(因此func应该返回一个Seq而不是单个项)。

4

映射分区(func)

与map类似,但在RDD的每个分区(块)上单独运行,因此当在T类型的RDD上运行时, func必须是Iterator<T> ⇒ Iterator<U>类型。

5

带索引的映射分区(func)

与map Partitions类似,但也为func提供了一个表示分区索引的整数值,因此当在T类型的RDD上运行时, func必须是(Int, Iterator<T>) ⇒ Iterator<U>类型。

6

样本(带有替换、分数、种子)

使用给定的随机数生成器种子对一小部分数据进行采样,无论是否有替换。

7

联合(其他数据集)

返回一个新数据集,其中包含源数据集中元素和参数的并集。

8

交集(其他数据集)

返回一个新的 RDD,其中包含源数据集中元素与参数的交集。

9

不同的([numTasks])

返回一个新数据集,其中包含源数据集的不同元素。

10

groupByKey([numTasks])

当调用 (K, V) 对数据集时,返回 (K, Iterable<V>) 对数据集。

注意- 如果您要进行分组以便对每个键执行聚合(例如求和或求平均值),则使用reduceByKey或aggregateByKey将产生更好的性能。

11

reduceByKey(func, [numTasks])

当调用 (K, V) 对数据集时,返回 (K, V) 对数据集,其中每个键的值使用给定的reduce 函数 func 进行聚合,该函数的类型必须为 (V, V) V . 与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

12

aggregateByKey(zeroValue)(seqOp,combOp,[numTasks])

当调用 (K, V) 对数据集时,返回 (K, U) 对数据集,其中每个键的值使用给定的组合函数和中性“零”值进行聚合。允许使用与输入值类型不同的聚合值类型,同时避免不必要的分配。与 groupByKey 一样,reduce 任务的数量可以通过可选的第二个参数进行配置。

13

sortByKey([升序], [numTasks])

当调用 K 实现 Ordered 的 (K, V) 对数据集时,返回按布尔升序参数中指定的键按升序或降序排序的 (K, V) 对数据集。

14

加入(其他数据集,[numTasks])

当调用 (K, V) 和 (K, W) 类型的数据集时,返回 (K, (V, W)) 对的数据集,其中每个键的所有元素对。通过 leftOuterJoin、rightOuterJoin 和 fullOuterJoin 支持外连接。

15

cogroup(其他数据集,[numTasks])

当调用 (K, V) 和 (K, W) 类型的数据集时,返回 (K, (Iterable<V>, Iterable<W>)) 元组的数据集。该操作也称为group With。

16

笛卡尔(其他数据集)

当调用 T 和 U 类型的数据集时,返回 (T, U) 对(所有元素对)的数据集。

17 号

管道(命令,[envVars])

通过 shell 命令(例如 Perl 或 bash 脚本)对 RDD 的每个分区进行管道传输。RDD 元素被写入进程的 stdin,并且输出到其 stdout 的行作为字符串 RDD 返回。

18

合并(numPartitions)

将 RDD 中的分区数量减少到 numPartitions。对于过滤大型数据集后更有效地运行操作很有用。

19

重新分区(numPartitions)

随机重新整理 RDD 中的数据以创建更多或更少的分区并在它们之间进行平衡。这总是会打乱网络上的所有数据。

20

repartitionAndSortWithinPartitions(分区器)

根据给定的分区程序对 RDD 进行重新分区,并在每个结果分区中按记录的键对记录进行排序。这比调用重新分区然后在每个分区内排序更有效,因为它可以将排序下推到洗牌机制中。

行动

下表给出了返回值的操作列表。

序列号 行动与意义
1

减少(函数)

使用函数func聚合数据集的元素(它接受两个参数并返回一个)。该函数应该是可交换的和关联的,以便可以正确地并行计算。

2

收集()

在驱动程序中以数组形式返回数据集的所有元素。在返回足够小的数据子集的过滤器或其他操作之后,这通常很有用。

3

数数()

返回数据集中的元素数量。

4

第一的()

返回数据集的第一个元素(类似于采取(1))。

5

采取(n)

返回包含数据集前n 个元素的数组。

6

takeSample (withReplacement,num, [种子])

返回一个数组,其中包含数据集num个元素的随机样本,有或没有替换,可以选择预先指定随机数生成器种子。

7

takeOrdered(n, [排序])

使用自然顺序或自定义比较器返回 RDD 的前n个元素。

8

保存为文本文件(路径)

将数据集的元素作为文本文件(或文本文件集)写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定目录中。Spark 对每个元素调用 toString 将其转换为文件中的一行文本。

9

saveAsSequenceFile(path)(Java 和 Scala)

将数据集的元素作为 Hadoop SequenceFile 写入本地文件系统、HDFS 或任何其他 Hadoop 支持的文件系统的给定路径中。这在实现 Hadoop 的 Writable 接口的键值对 RDD 上可用。在 Scala 中,它也适用于可隐式转换为 Writable 的类型(Spark 包括对 Int、Double、String 等基本类型的转换)。

10

saveAsObjectFile(path)(Java 和 Scala)

使用 Java 序列化以简单格式写入数据集的元素,然后可以使用 SparkContext.objectFile() 加载。

11

按键计数()

仅适用于 (K, V) 类型的 RDD。返回 (K, Int) 对的哈希图以及每个键的计数。

12

foreach(函数)

对数据集的每个元素运行函数func 。这样做通常是为了防止副作用,例如更新累加器或与外部存储系统交互。

注意- 修改 foreach() 之外的累加器以外的变量可能会导致未定义的Behave。有关更多详细信息,请参阅了解闭包。

使用 RDD 进行编程

让我们通过一个例子来看看 RDD 编程中一些 RDD 转换和操作的实现。

例子

考虑一个字数统计示例 - 它计算文档中出现的每个单词。将以下文本视为输入,并将其保存为主目录中的input.txt文件。

input.txt - 输入文件。

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

请按照下面给出的过程执行给定的示例。

打开 Spark-Shell

以下命令用于打开 Spark shell。一般来说,spark是使用Scala构建的。因此,Spark程序运行在Scala环境上。

$ spark-shell

如果 Spark shell 成功打开,您将看到以下输出。查看输出的最后一行“Spark context available as sc”意味着 Spark 容器会自动创建名为sc的 Spark 上下文对象。在开始程序的第一步之前,应该创建 SparkContext 对象。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

创建一个RDD

首先,我们必须使用 Spark-Scala API 读取输入文件并创建 RDD。

以下命令用于从给定位置读取文件。在这里,使用输入文件的名称创建新的 RDD。在 textFile(“”) 方法中作为参数给出的字符串是输入文件名的绝对路径。但是,如果仅给出文件名,则表示输入文件位于当前位置。

scala> val inputfile = sc.textFile("input.txt")

执行字数转换

我们的目标是计算文件中的单词数。创建一个平面映射,将每行分割成单词(flatMap(line ⇒ line.split(“ ”))。

接下来,使用映射函数 ( map(word ⇒ (word , 1) )将每个单词读取为值为“1”的键(<key, value> = <word, 1> )。

最后,通过添加相似键的值来减少这些键(reduceByKey(_+_))。

以下命令用于执行字计数逻辑。执行完之后,你不会发现任何输出,因为这不是一个动作,这是一个转换;指向一个新的 RDD 或告诉 Spark 如何处理给定的数据)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

当前RDD

在使用 RDD 时,如果您想了解当前的 RDD,请使用以下命令。它将显示有关当前 RDD 及其依赖项的描述,以供调试。

scala> counts.toDebugString

缓存转换

您可以使用 persist() 或 cache() 方法将 RDD 标记为持久化。第一次在操作中计算时,它将保存在节点的内存中。使用以下命令将中间转换存储在内存中。

scala> counts.cache()

应用行动

应用操作,例如将所有转换、结果存储到文本文件中。saveAsTextFile(“ ”) 方法的字符串参数是输出文件夹的绝对路径。尝试以下命令将输出保存在文本文件中。在以下示例中,“输出”文件夹位于当前位置。

scala> counts.saveAsTextFile("output")

检查输出

打开另一个终端进入主目录(spark 在另一个终端中执行)。使用以下命令检查输出目录。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

以下命令用于查看Part-00000文件的输出。

[hadoop@localhost output]$ cat part-00000

输出

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 

以下命令用于查看Part-00001文件的输出。

[hadoop@localhost output]$ cat part-00001 

输出

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

联合国持久存储

在取消持久化之前,如果您想查看该应用程序使用的存储空间,请在浏览器中使用以下 URL。

http://localhost:4040

您将看到以下屏幕,其中显示了在 Spark shell 上运行的应用程序所使用的存储空间。

储存空间

如果您想取消持久化特定 RDD 的存储空间,请使用以下命令。

Scala> counts.unpersist()

您将看到如下输出 -

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

要验证浏览器中的存储空间,请使用以下 URL。

http://localhost:4040/

您将看到以下屏幕。它显示了在 Spark shell 上运行的应用程序使用的存储空间。

应用程序存储空间