- PySpark Tutorial
- PySpark - Home
- PySpark - Introduction
- PySpark - Environment Setup
- PySpark - SparkContext
- PySpark - RDD
- PySpark - Broadcast & Accumulator
- PySpark - SparkConf
- PySpark - SparkFiles
- PySpark - StorageLevel
- PySpark - MLlib
- PySpark - Serializers
- PySpark Useful Resources
- PySpark - Quick Guide
- PySpark - Useful Resources
- PySpark - Discussion
PySpark-RDD
现在我们已经在系统上安装并配置了 PySpark,我们可以在 Apache Spark 上使用 Python 进行编程。不过在此之前,让我们先了解一下 Spark 中的一个基本概念——RDD。
RDD 代表弹性分布式数据集,这些元素在多个节点上运行和操作以在集群上进行并行处理。RDD 是不可变元素,这意味着一旦创建 RDD,就无法更改它。RDD 也具有容错能力,因此如果发生任何故障,它们会自动恢复。您可以对这些 RDD 应用多个操作来完成特定任务。
要对这些 RDD 应用操作,有两种方法 -
- 转型与
- 行动
让我们详细了解这两种方式。
转换- 这些是应用于 RDD 以创建新 RDD 的操作。Filter、groupBy 和 map 是转换的示例。
Action - 这些是应用于 RDD 的操作,指示 Spark 执行计算并将结果发送回驱动程序。
要在 PySpark 中应用任何操作,我们需要首先创建一个PySpark RDD。以下代码块包含 PySpark RDD 类的详细信息 -
class pyspark.RDD ( jrdd, ctx, jrdd_deserializer = AutoBatchedSerializer(PickleSerializer()) )
让我们看看如何使用 PySpark 运行一些基本操作。Python 文件中的以下代码创建 RDD 单词,其中存储提到的一组单词。
words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] )
我们现在将对单词进行一些操作。
数数()
返回 RDD 中的元素数量。
----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------
命令- count() 的命令是 -
$SPARK_HOME/bin/spark-submit count.py
输出- 上述命令的输出是 -
Number of elements in RDD → 8
收集()
返回 RDD 中的所有元素。
----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------
命令-collect() 的命令是 -
$SPARK_HOME/bin/spark-submit collect.py
输出- 上述命令的输出是 -
Elements in RDD -> [ 'scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
foreach(f)
仅返回满足 foreach 内部函数条件的元素。在下面的示例中,我们在 foreach 中调用打印函数,该函数打印 RDD 中的所有元素。
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
命令- foreach(f) 的命令是 -
$SPARK_HOME/bin/spark-submit foreach.py
输出- 上述命令的输出是 -
scala java hadoop spark akka spark vs hadoop pyspark pyspark and spark
过滤器(f)
返回一个包含元素的新 RDD,满足过滤器内的函数。在下面的示例中,我们过滤掉包含“spark”的字符串。
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------
命令- 过滤器(f)的命令是 -
$SPARK_HOME/bin/spark-submit filter.py
输出- 上述命令的输出是 -
Fitered RDD -> [ 'spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
地图(f,保留分区=假)
通过对 RDD 中的每个元素应用一个函数来返回一个新的 RDD。在下面的示例中,我们形成一个键值对,并将每个字符串映射为值 1。
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------
命令-map(f,preservesPartitioning=False)的命令是-
$SPARK_HOME/bin/spark-submit map.py
输出- 上述命令的输出是 -
Key value pair -> [
('scala', 1),
('java', 1),
('hadoop', 1),
('spark', 1),
('akka', 1),
('spark vs hadoop', 1),
('pyspark', 1),
('pyspark and spark', 1)
]
减少(f)
执行指定的交换和结合二元运算后,返回RDD中的元素。在下面的示例中,我们从运算符导入 add 包并将其应用于“num”以执行简单的加法操作。
----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------
命令-reduce(f)的命令是-
$SPARK_HOME/bin/spark-submit reduce.py
输出- 上述命令的输出是 -
Adding all the elements -> 15
加入(其他,numPartitions = 无)
它返回带有一对元素的 RDD,其中包含匹配的键以及该特定键的所有值。在下面的示例中,两个不同的 RDD 中有两对元素。连接这两个 RDD 后,我们得到一个 RDD,其中的元素具有匹配的键及其值。
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------
命令- join(other, numPartitions = None) 的命令是 -
$SPARK_HOME/bin/spark-submit join.py
输出- 上述命令的输出是 -
Join RDD -> [
('spark', (1, 2)),
('hadoop', (4, 5))
]
缓存()
使用默认存储级别 (MEMORY_ONLY) 保留此 RDD。您还可以检查 RDD 是否已缓存。
----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------
命令-cache() 的命令是 -
$SPARK_HOME/bin/spark-submit cache.py
输出- 上述程序的输出是 -
Words got cached -> True
这些是在 PySpark RDD 上完成的一些最重要的操作。