- PySpark Tutorial
- PySpark - Home
- PySpark - Introduction
- PySpark - Environment Setup
- PySpark - SparkContext
- PySpark - RDD
- PySpark - Broadcast & Accumulator
- PySpark - SparkConf
- PySpark - SparkFiles
- PySpark - StorageLevel
- PySpark - MLlib
- PySpark - Serializers
- PySpark Useful Resources
- PySpark - Quick Guide
- PySpark - Useful Resources
- PySpark - Discussion
PySpark - 广播和累加器
对于并行处理,Apache Spark 使用共享变量。当驱动程序向集群上的执行程序发送任务时,共享变量的副本会保存在集群的每个节点上,以便可以使用它来执行任务。
Apache Spark 支持两种类型的共享变量 -
- 播送
- 累加器
让我们详细了解它们。
播送
广播变量用于保存所有节点上的数据副本。该变量缓存在所有机器上,不会在有任务的机器上发送。以下代码块包含 PySpark 的 Broadcast 类的详细信息。
class pyspark.Broadcast ( sc = None, value = None, pickle_registry = None, path = None )
以下示例显示如何使用广播变量。广播变量有一个名为 value 的属性,它存储数据并用于返回广播值。
----------------------------------------broadcast.py-------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Broadcast app") words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) data = words_new.value print "Stored data -> %s" % (data) elem = words_new.value[2] print "Printing a particular element in RDD -> %s" % (elem) ----------------------------------------broadcast.py--------------------------------------
命令- 广播变量的命令如下 -
$SPARK_HOME/bin/spark-submit broadcast.py
输出- 以下命令的输出如下所示。
Stored data -> [ 'scala', 'java', 'hadoop', 'spark', 'akka' ] Printing a particular element in RDD -> hadoop
累加器
累加器变量用于通过关联和交换运算来聚合信息。例如,您可以使用累加器进行求和运算或计数器(在 MapReduce 中)。以下代码块包含 PySpark Accumulator 类的详细信息。
class pyspark.Accumulator(aid, value, accum_param)
以下示例显示如何使用累加器变量。累加器变量有一个名为 value 的属性,与广播变量的属性类似。它存储数据并用于返回累加器的值,但只能在驱动程序中使用。
在此示例中,累加器变量由多个工作人员使用并返回累加值。
----------------------------------------accumulator.py------------------------------------ from pyspark import SparkContext sc = SparkContext("local", "Accumulator app") num = sc.accumulator(10) def f(x): global num num+=x rdd = sc.parallelize([20,30,40,50]) rdd.foreach(f) final = num.value print "Accumulated value is -> %i" % (final) ----------------------------------------accumulator.py------------------------------------
命令- 累加器变量的命令如下 -
$SPARK_HOME/bin/spark-submit accumulator.py
输出- 上述命令的输出如下所示。
Accumulated value is -> 150