高级 Spark 编程


Spark 包含两种不同类型的共享变量 - 一种是广播变量,第二种是累加器

  • 广播变量- 用于有效地分配大值。

  • 累加器- 用于聚合特定集合的信息。

广播变量

广播变量允许程序员在每台机器上缓存只读变量,而不是随任务传送它的副本。例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。Spark还尝试使用高效的广播算法来分发广播变量,以降低通信成本。

Spark 操作通过一组阶段执行,这些阶段由分布式“shuffle”操作分隔开。Spark自动广播每个阶段内任务所需的公共数据。

以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。

广播变量是通过调用SparkContext.broadcast(v)从变量v创建的。广播变量是v的包装器,可以通过调用value方法来访问它的值。下面给出的代码显示了这一点 -

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

输出-

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

创建广播变量后,应在集群上运行的任何函数中使用它代替值v,以便v不会多次传送到节点。另外,对象v在广播后不应该被修改,以确保所有节点得到相同的广播变量值。

累加器

累加器是仅通过关联运算“添加”的变量,因此可以有效地支持并行。它们可用于实现计数器(如在 MapReduce 中)或求和。Spark 原生支持数字类型的累加器,程序员可以添加对新类型的支持。如果累加器是用名称创建的,它们将显示在Spark 的 UI中。这对于了解运行阶段的进度很有用(注意 - Python 尚不支持)。

通过调用SparkContext.accumulator(v)从初始值v创建累加器。然后,可以使用add方法或 += 运算符(在 Scala 和 Python 中)将在集群上运行的任务添加到集群中。然而,他们无法读取其价值。只有驱动程序可以使用累加器的value方法读取累加器的值。

下面给出的代码显示了一个累加器,用于将数组的元素相加 -

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

如果您想查看上述代码的输出,请使用以下命令 -

scala> accum.value 

输出

res2: Int = 10 

数值 RDD 操作

Spark 允许您使用预定义的 API 方法之一对数值数据执行不同的操作。Spark 的数值运算是通过流式算法实现的,该算法允许一次构建模型。

这些操作通过调用status()方法进行计算并作为StatusCounter对象返回。

以下是StatusCounter中可用的数字方法的列表。

序列号 方法及意义
1

数数()

RDD 中的元素数量。

2

意思是()

RDD 中元素的平均值。

3

和()

RDD 中元素的总价值。

4

最大限度()

RDD 中所有元素的最大值。

5

最小()

RDD 中所有元素中的最小值。

6

方差()

元素的方差。

7

标准差()

标准差。

如果只想使用其中一种方法,可以直接在RDD上调用相应的方法。