Now that we have installed and configured PySpark on our system, we can program in Python on Apache Spark. However, before doing so, let us understand a fundamental concept in Spark - RDD.
RDD stands for Resilient Distributed Dataset; these are the elements that run and operate on multiple nodes to do parallel processing on a cluster. RDDs are immutable elements, which means that once you create an RDD you cannot change it. RDDs are also fault tolerant, so in case of any failure they recover automatically. You can apply multiple operations on these RDDs to achieve a certain task.
To apply operations on these RDDs, there are two ways:
Transformation and
Action
Let us understand these two ways in detail.
Transformation : These are the operations which are applied on an RDD to create a new RDD. filter, groupBy and map are examples of transformations.
Action : These are the operations that are applied on an RDD and instruct Spark to perform the computation and send the result back to the driver.
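To make the difference concrete, here is a minimal sketch (the file name lazy_demo.py and the app name are illustrative, assuming a working local PySpark installation): the transformation only describes the computation, and the action is what actually triggers it and returns the result to the driver.
----------------------------------------lazy_demo.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Lazy demo")
nums = sc.parallelize([1, 2, 3, 4, 5])

# Transformation: map only records the computation; nothing runs yet.
squares = nums.map(lambda x: x * x)

# Action: collect triggers the computation and returns the results to the driver.
print(squares.collect())   # [1, 4, 9, 16, 25]
----------------------------------------lazy_demo.py---------------------------------------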
To apply any operation in PySpark, we need to create a PySpark RDD first. The following code block has the details of the PySpark RDD class:
class pyspark.RDD(jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))
Let us see how to run a few basic operations using PySpark. The following code in a Python file creates an RDD called words, which stores the set of words mentioned.
words = sc.parallelize(["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"])
We will now run a few operations on words.
count()
Returns the number of elements in the RDD.
----------------------------------------count.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize(
   ["scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark"])
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
----------------------------------------count.py---------------------------------------
Command : The command for count() is:
$ SPARK_HOME/bin/spark-submit count.py
Output : The output for the above command is:
Number of elements in RDD -> 8
collect()
Returns all the elements in the RDD.
----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Collect app")
words = sc.parallelize(
   ["scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark"])
coll = words.collect()
print("Elements in RDD -> %s" % coll)
----------------------------------------collect.py---------------------------------------
Command : The command for collect() is:
$ SPARK_HOME/bin/spark-submit collect.py
Output : The output for the above command is:
Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
foreach(f)
Applies the given function to every element in the RDD; it does not return a result to the driver. In the following example, we call a print function inside foreach, which prints all the elements in the RDD.
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "ForEach app")
words = sc.parallelize(
   ["scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark"])

def f(x):
   print(x)

fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
Command : The command for foreach(f) is:
$ SPARK_HOME/bin/spark-submit foreach.py
Output : The output for the above command is:
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
filter(f)
Returns a new RDD containing the elements that satisfy the function inside filter. In the following example, we keep only the strings containing 'spark'.
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Filter app")
words = sc.parallelize(
   ["scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark"])
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % filtered)
----------------------------------------filter.py---------------------------------------
Command : The command for filter(f) is:
$ SPARK_HOME/bin/spark-submit filter.py
Output : The output for the above command is:
Filtered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
map(f, preservesPartitioning=False)
Returns a new RDD by applying a function to each element in the RDD. In the following example, we form key-value pairs, mapping every string to the value 1.
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Map app")
words = sc.parallelize(
   ["scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark"])
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % mapping)
----------------------------------------map.py---------------------------------------
Command : The command for map(f, preservesPartitioning=False) is:
$ SPARK_HOME/bin/spark-submit map.py
Output : The output of the above command is:
Key value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]
reduce(f)
Performs the specified commutative and associative binary operation on the elements of the RDD and returns the result. In the following example, we import add from the operator package and apply it on 'nums' to carry out a simple addition operation.
----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add

sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % adding)
----------------------------------------reduce.py---------------------------------------
Command : The command for reduce(f) is:
$ SPARK_HOME/bin/spark-submit reduce.py
Output : The output of the above command is:
Adding all the elements -> 15
join(other, numPartitions=None)
Returns an RDD with pairs of elements that have matching keys, along with all the values for each particular key. In the following example, there are two pairs of elements in two different RDDs. After joining these two RDDs, we get an RDD with elements having matching keys and their values.
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % final)
----------------------------------------join.py---------------------------------------
Command : The command for join(other, numPartitions=None) is:
$ SPARK_HOME/bin/spark-submit join.py
Output : The output of the above command is:
Join RDD -> [('spark', (1, 2)), ('hadoop', (4, 5))]
cache()
Persists this RDD with the default storage level (MEMORY_ONLY). You can also check whether the RDD is cached or not.
----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Cache app")
words = sc.parallelize(
   ["scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark"])
words.cache()
caching = words.persist().is_cached
print("Words got cached -> %s" % caching)
----------------------------------------cache.py---------------------------------------
Command : The command for cache() is:
$ SPARK_HOME/bin/spark-submit cache.py
Output : The output for the above program is:
Words got cached -> True
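Note that cache() is shorthand for persist() with the default MEMORY_ONLY storage level. If a different level is wanted, persist() also accepts an explicit StorageLevel. The short sketch below (the file name persist_level.py is illustrative) persists an RDD to memory and disk and then releases it with unpersist().
----------------------------------------persist_level.py---------------------------------------
from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "Persist app")
nums = sc.parallelize([1, 2, 3, 4, 5])

# Persist with an explicit storage level instead of the default MEMORY_ONLY.
nums.persist(StorageLevel.MEMORY_AND_DISK)
print("Is cached -> %s" % nums.is_cached)   # True

# unpersist() releases the cached data when it is no longer needed.
nums.unpersist()
----------------------------------------persist_level.py---------------------------------------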
These are some of the most important operations that are performed on a PySpark RDD.
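As a closing sketch (the file name chain.py is illustrative), the operations covered above can also be chained together: filter and map are transformations that build up the computation, and reduce is the action that finally runs it.
----------------------------------------chain.py---------------------------------------
from pyspark import SparkContext
from operator import add

sc = SparkContext("local", "Chain app")
words = sc.parallelize(
   ["scala", "java", "hadoop", "spark", "akka",
    "spark vs hadoop", "pyspark", "pyspark and spark"])

# filter and map are transformations; reduce is the action that runs the job.
total_length = words.filter(lambda x: 'spark' in x) \
                    .map(lambda x: len(x)) \
                    .reduce(add)
print("Total length of strings containing 'spark' -> %i" % total_length)   # 44
----------------------------------------chain.py---------------------------------------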