开发手册 欢迎您!
软件开发者资料库

PySpark - 广播与广播累加器

PySpark广播和累积器 - 从基本到高级概念的简单易用步骤学习PySpark,其中包括简介,环境设置,SparkContext,RDD,广播和累积器,SparkConf,SparkFiles,StorageLevel,MLlib,Serializers等示例。

对于并行处理,Apache Spark使用共享变量.当驱动程序将任务发送到集群上的执行程序时,共享变量的副本会在集群的每个节点上运行,以便它可用于执行任务.

有两种类型Apache Spark : 支持的共享变量;

  • 广播

  • 累加器

让我们详细了解它们.

广播

广播变量用于保存所有节点上的数据副本.此变量缓存在所有计算机上,而不是在具有任务的计算机上发送.以下代码块包含PySpark的Broadcast类的详细信息.

class pyspark.Broadcast (   sc = None,    value = None,    pickle_registry = None,    path = None)

以下示例显示如何使用Broadcast变量. Broadcast变量有一个名为value的属性,它存储数据并用于返回广播值.

----------- ----------------------------- broadcast.py ------------------ -------------------- from pyspark import SparkContext sc = SparkContext("local", "Broadcast app") words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) data = words_new.value print "Stored data -> %s" % (data) elem = words_new.value[2] print "Printing a particular element in RDD -> %s" % (elem)-------- -------------------------------- broadcast.py --------------- -----------------------

命令 : 号;广播变量的命令如下 :

  $ SPARK_HOME/bin/spark-submit broadcast.py

输出 : 以下命令的输出如下:

Stored data -> [   'scala',     'java',    'hadoop',    'spark',    'akka']Printing a particular element in RDD -> hadoop

累加器

累加器变量用于通过关联和交换操作聚合信息.例如,您可以使用累加器进行求和操作或计数器(在MapReduce中).以下代码块包含PySpark的Accumulator类的详细信息.

  class pyspark.Accumulator(aid,value,accum_param)

以下示例显示如何使用Accumulator变量. Accumulator变量有一个名为value的属性,类似于广播变量.它存储数据并用于返回累加器的值,但仅在驱动程序中可用.

在此示例中,累加器变量由多个工作程序使用并返回累计值.

----------------------------- ----------- accumulator.py ------------------------------------来自pyspark导入的from pyspark import SparkContext sc = SparkContext("local", "Accumulator app") num = sc.accumulator(10) def f(x):    global num    num+=x rdd = sc.parallelize([20,30,40,50]) rdd.foreach(f) final = num.value print "Accumulated value is -> %i" % (final)---------------------------- ------------ accumulator.py ----------------------------------- -

命令 : 累加器变量的命令如下 :

  $ SPARK_HOME/bin/spark-submit accumulator.py

输出 : 以上命令的输出如下:

Accumulated value is -> 150