PySpark – SparkFiles

In Apache Spark, you can upload your files using sc.addFile (sc is your default SparkContext) and get the path on a worker using SparkFiles.get. Thus, SparkFiles resolves the paths to files added through SparkContext.addFile().

SparkFiles contains the following classmethods −

get(filename)
getRootDirectory()

Let us understand them in detail.

get(filename)

It returns the absolute path of a file that was added through SparkContext.addFile().

getRootDirectory()

It returns the path to the root directory, which contains the files added through SparkContext.addFile().

—————————————-sparkfile.py————————————
from pyspark import SparkContext
from pyspark import SparkFiles

finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"

sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)

print("Absolute Path -> %s" % SparkFiles.get(finddistancename))
—————————————-sparkfile.py————————————

Command − The command is as follows −

$SPARK_HOME/bin/spark-submit sparkfile.py

Output − The output for the above command is −

Absolute Path -> /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R
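The example above demonstrates only get(). The following is a minimal sketch that also prints the directory returned by getRootDirectory(); the file name rootdir.py is an assumption made for illustration, and the script simply reuses the finddistance.R path from the example above.

—————————————-rootdir.py————————————
from pyspark import SparkContext
from pyspark import SparkFiles

sc = SparkContext("local", "SparkFiles root dir app")

# Reuse the R script from the example above; any file added with addFile() works.
sc.addFile("/home/hadoop/examples_pyspark/finddistance.R")

# getRootDirectory() returns the directory that holds all files added via addFile().
print("Root Directory -> %s" % SparkFiles.getRootDirectory())

# get() resolves the absolute path of a specific added file inside that directory.
print("Absolute Path -> %s" % SparkFiles.get("finddistance.R"))
—————————————-rootdir.py————————————

Running it with $SPARK_HOME/bin/spark-submit rootdir.py should show that the printed absolute path sits inside the printed root directory.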
PySpark – RDD

Now that we have installed and configured PySpark on our system, we can program in Python on Apache Spark. However, before doing so, let us understand a fundamental concept in Spark – RDD.

RDD stands for Resilient Distributed Dataset. RDDs are the elements that run and operate on multiple nodes to do parallel processing on a cluster. RDDs are immutable elements, which means once you create an RDD you cannot change it. RDDs are fault tolerant as well; hence, in case of any failure, they recover automatically. You can apply multiple operations on these RDDs to achieve a certain task.

To apply operations on these RDDs, there are two ways −

Transformation and Action

Let us understand these two ways in detail.

Transformation − These are the operations, which are applied on an RDD to create a new RDD. Filter, groupBy and map are examples of transformations.

Action − These are the operations that are applied on an RDD and instruct Spark to perform a computation and send the result back to the driver.

To apply any operation in PySpark, we need to create a PySpark RDD first. The following code block has the detail of a PySpark RDD class −

class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Let us see how to run a few basic operations using PySpark. The following code in a Python file creates RDD words, which stores a set of words mentioned.

words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

We will now run a few operations on words.

count()

Returns the number of elements in the RDD.

—————————————-count.py—————————————
from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

counts = words.count()
print("Number of elements in RDD -> %i" % (counts))
—————————————-count.py—————————————

Command − The command for count() is −

$SPARK_HOME/bin/spark-submit count.py

Output − The output for the above command is −

Number of elements in RDD -> 8

collect()

Returns all the elements in the RDD.

—————————————-collect.py—————————————
from pyspark import SparkContext

sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

coll = words.collect()
print("Elements in RDD -> %s" % (coll))
—————————————-collect.py—————————————

Command − The command for collect() is −

$SPARK_HOME/bin/spark-submit collect.py

Output − The output for the above command is −

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

foreach(f)

Applies the function f to each element of the RDD. foreach does not return a value; it is used for its side effects, such as printing elements or updating an accumulator. In the following example, we call a print function in foreach, which prints all the elements in the RDD.
—————————————-foreach.py—————————————
from pyspark import SparkContext

sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

def f(x):
   print(x)

fore = words.foreach(f)
—————————————-foreach.py—————————————

Command − The command for foreach(f) is −

$SPARK_HOME/bin/spark-submit foreach.py

Output − The output for the above command is −

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

A new RDD is returned containing only the elements that satisfy the function passed to filter. In the following example, we keep only the strings that contain "spark".

—————————————-filter.py—————————————
from pyspark import SparkContext

sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % (filtered))
—————————————-filter.py—————————————-

Command − The command for filter(f) is −

$SPARK_HOME/bin/spark-submit filter.py

Output − The output for the above command is −

Filtered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

map(f, preservesPartitioning = False)

A new RDD is returned by applying a function to each element in the RDD. In the following example, we form a key-value pair by mapping every string to the value 1.

—————————————-map.py—————————————
from pyspark import SparkContext

sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))
—————————————-map.py—————————————

Command − The command for map(f, preservesPartitioning=False) is −

$SPARK_HOME/bin/spark-submit map.py

Output − The output of the above command is −

Key value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]

reduce(f)

Reduces the elements of the RDD using the specified commutative and associative binary operation and returns the result. In the following example, we import the add function from the operator module and apply it on 'nums' to carry out a simple addition operation.

—————————————-reduce.py—————————————
from pyspark import SparkContext
from operator import add

sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))
—————————————-reduce.py—————————————

Command − The command for reduce(f) is −

$SPARK_HOME/bin/spark-submit reduce.py

Output − The output of the above command is −

Adding all the elements -> 15

join(other, numPartitions = None)

Returns an RDD containing pairs of elements with matching keys, together with all the values for each such key. In the following example, there are two pairs of elements in two different RDDs. After joining these two RDDs, we get an RDD with elements having matching keys and their values.
—————————————-join.py—————————————
from pyspark import SparkContext

sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % (final))
—————————————-join.py—————————————

Command − The command for join(other, numPartitions = None) is −

$SPARK_HOME/bin/spark-submit join.py

Output − The output for the above command is −

Join RDD -> [('spark', (1, 2)), ('hadoop', (4, 5))]

cache()

Persists this RDD with the default storage level (MEMORY_ONLY), so that it is kept in memory after it is first computed. A minimal sketch is shown below.
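The following is a minimal sketch of how cache() can be used; the file name cache.py and the use of the is_cached attribute to check whether the RDD was marked as cached are assumptions made for illustration.

—————————————-cache.py—————————————
from pyspark import SparkContext

sc = SparkContext("local", "Cache app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# Mark the RDD for caching with the default storage level (MEMORY_ONLY).
words.cache()

# is_cached reports whether the RDD has been marked for caching.
caching = words.is_cached
print("Words got cached -> %s" % (caching))
—————————————-cache.py—————————————

Running it with $SPARK_HOME/bin/spark-submit cache.py should print Words got cached -> True.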
PySpark – Broadcast & Accumulator

For parallel processing, Apache Spark uses shared variables. When the driver sends a task to the executors on the cluster, a copy of each shared variable goes to every node so that it can be used while performing tasks.

There are two types of shared variables supported by Apache Spark −

Broadcast
Accumulator

Let us understand them in detail.

Broadcast

Broadcast variables are used to save a copy of data across all nodes. This variable is cached on all the machines, rather than being shipped to the machines with every task. The following code block has the details of a Broadcast class for PySpark.

class pyspark.Broadcast (
   sc = None,
   value = None,
   pickle_registry = None,
   path = None
)

The following example shows how to use a Broadcast variable. A Broadcast variable has an attribute called value, which stores the data and is used to return the broadcasted value.

—————————————-broadcast.py————————————–
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print("Stored data -> %s" % (data))
elem = words_new.value[2]
print("Printing a particular element in the broadcast variable -> %s" % (elem))
—————————————-broadcast.py————————————–

Command − The command for a broadcast variable is as follows −

$SPARK_HOME/bin/spark-submit broadcast.py

Output − The output for the above command is given below.

Stored data -> ['scala', 'java', 'hadoop', 'spark', 'akka']
Printing a particular element in the broadcast variable -> hadoop

Accumulator

Accumulator variables are used for aggregating information through associative and commutative operations. For example, you can use an accumulator for a sum operation or for counters (as in MapReduce). The following code block has the details of an Accumulator class for PySpark.

class pyspark.Accumulator(aid, value, accum_param)

The following example shows how to use an Accumulator variable. An Accumulator variable has an attribute called value that is similar to what a broadcast variable has. It stores the data and is used to return the accumulator's value, but it is usable only in a driver program.

In this example, an accumulator variable is used by multiple workers and returns an accumulated value.

—————————————-accumulator.py————————————
from pyspark import SparkContext

sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)

def f(x):
   global num
   num += x

rdd = sc.parallelize([20, 30, 40, 50])
rdd.foreach(f)
final = num.value
print("Accumulated value is -> %i" % (final))
—————————————-accumulator.py————————————

Command − The command for an accumulator variable is as follows −

$SPARK_HOME/bin/spark-submit accumulator.py

Output − The output for the above command is given below.

Accumulated value is -> 150
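The broadcast example above reads the broadcast value only on the driver. As a minimal sketch of how a broadcast variable is typically consumed inside a task running on the workers, the following hypothetical broadcast_filter.py references the broadcast value from within an RDD filter; the file name and the word lists are assumptions made for illustration.

—————————————-broadcast_filter.py————————————
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast filter app")

# Broadcast a lookup list once; each executor reuses its cached copy
# instead of receiving the list with every task.
keep_words = sc.broadcast(["spark", "pyspark"])

words = sc.parallelize(
   ["scala", "java", "hadoop", "spark", "akka", "pyspark"]
)

# The lambda runs on the workers and reads the broadcast value there.
matched = words.filter(lambda x: x in keep_words.value).collect()
print("Matched words -> %s" % (matched))
—————————————-broadcast_filter.py————————————

Running it with $SPARK_HOME/bin/spark-submit broadcast_filter.py should print Matched words -> ['spark', 'pyspark'].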