PySpark – Home

PySpark Tutorial

Table of Contents −

What is PySpark?
Key Components of PySpark
Purpose of PySpark
Features of PySpark
Applications of PySpark
Why Learn PySpark?
Prerequisites to Learn PySpark
PySpark Jobs and Opportunities
Frequently Asked Questions about PySpark

What is PySpark?

Apache Spark is a powerful open-source data processing engine written in Scala, designed for large-scale data processing. To support Python with Spark, the Apache Spark community released a tool called PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. A library called Py4j makes this possible. This is an introductory tutorial, which covers the basics of PySpark and explains how to deal with its various components and sub-components.

PySpark is the Python API for Apache Spark. It allows you to interface with Spark's distributed computation framework using Python, making it easier to work with big data in a language many data scientists and engineers are familiar with. By using PySpark, you can create and manage Spark jobs, and perform complex data transformations and analyses.

Key Components of PySpark

Following are the key components of PySpark −

RDDs (Resilient Distributed Datasets) − RDDs are the fundamental data structure in Spark. They are immutable distributed collections of objects that can be processed in parallel.

DataFrames − DataFrames are similar to RDDs but with additional features such as named columns and support for a wide range of data sources. They are analogous to tables in a relational database and provide a higher-level abstraction for data manipulation.

Spark SQL − This module allows you to execute SQL queries on DataFrames and RDDs. It provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine.

MLlib (Machine Learning Library) − MLlib is Spark's scalable machine learning library, offering various algorithms and utilities for classification, regression, clustering, collaborative filtering, and more.

Spark Streaming − Spark Streaming enables real-time data processing and stream processing. It allows you to process live data streams and update results in real time.

Purpose of PySpark

The primary purpose of PySpark is to enable processing of large-scale datasets in real time across a distributed computing environment using Python. PySpark provides an interface for interacting with Spark's core functionalities, such as working with Resilient Distributed Datasets (RDDs) and DataFrames, using the Python programming language.

Features of PySpark

PySpark has the following features −

Integration with Spark − PySpark is tightly integrated with Apache Spark, allowing seamless data processing and analysis using Python.

Real-time Processing − It enables real-time processing of large-scale datasets.

Ease of Use − PySpark simplifies complex data processing tasks with Python's simple syntax and extensive libraries.

Interactive Shell − PySpark offers an interactive shell for real-time data analysis and experimentation.

Machine Learning − It includes MLlib, a scalable machine learning library.

Data Sources − PySpark can read data from various sources, including HDFS, S3, HBase, and more.

Partitioning − It efficiently partitions data to enhance processing speed and efficiency.

Applications of PySpark

PySpark is widely used in various applications, including −

Data Analysis − Analyzing large datasets to extract meaningful information.
Machine Learning − Implementing machine learning algorithms for predictive analytics.

Data Streaming − Processing streaming data in real time.

Data Engineering − Managing and transforming big data for various use cases.

Why Learn PySpark?

Learning PySpark is essential for anyone interested in big data and data engineering. It offers several benefits −

Scalability − Efficiently handles large datasets across distributed systems.

Performance − High-speed data processing and real-time analytics.

Flexibility − PySpark supports integration with various data sources and tools.

Comprehensive Toolset − Includes tools for data manipulation, machine learning, and graph processing.

Prerequisites to Learn PySpark

Before proceeding with the various concepts given in this tutorial, it is assumed that readers already know what a programming language and a framework are. In addition, it will be very helpful if readers have a sound knowledge of Apache Spark, Apache Hadoop, the Scala programming language, the Hadoop Distributed File System (HDFS) and Python.

PySpark Jobs and Opportunities

Proficiency in PySpark opens up various career opportunities, such as −

Data Analyst
Data Engineer
Python Developer
PySpark Developer
Data Scientist
and more.

Frequently Asked Questions about PySpark

There are some very frequently asked questions (FAQ) about PySpark; this section tries to answer them briefly.

What is PySpark used for?

PySpark is used for processing large-scale datasets in real time across a distributed computing environment using Python. It also offers an interactive PySpark shell for data analysis.

Describe the different ways to read data into PySpark.

PySpark can read data from multiple sources, including CSV, Parquet, text files, tables, and JSON. It offers methods such as csv(), load() and the generic format() interface to facilitate data reading.

What is the role of partitioning in PySpark?

Partitioning in PySpark helps divide a large dataset into smaller, manageable parts based on partitioning expressions, which enhances processing speed and efficiency.

What is the purpose of checkpoints in PySpark?

Checkpoints in PySpark are used to truncate the logical plan of a DataFrame. They are particularly useful in iterative algorithms where the plan can become complex and large, thereby improving performance.

What is a PySpark UDF?

A PySpark UDF (User Defined Function) allows the creation of custom functions to apply transformations across DataFrames. UDFs are deterministic by default. A minimal sketch of defining and using a UDF is shown below.
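The following sketch assumes a SparkSession, a column named name and two sample rows, all invented purely for illustration; it uses the DataFrame API rather than the RDD API covered later in this tutorial.

----------------------------------------udf_example.py------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("UDF example").getOrCreate()

# Sample data and column name are invented for illustration
df = spark.createDataFrame([("alice",), ("bob",)], ["name"])

# Wrap a plain Python function as a UDF with an explicit return type
capitalize = udf(lambda s: s.capitalize(), StringType())

df.withColumn("name_cap", capitalize(df["name"])).show()
spark.stop()
----------------------------------------udf_example.py------------------------------------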

PySpark – MLlib

PySpark – MLlib

Apache Spark offers a Machine Learning API called MLlib. PySpark exposes this machine learning API in Python as well. It supports different kinds of algorithms, which are mentioned below −

mllib.classification − The spark.mllib package supports various methods for binary classification, multiclass classification and regression analysis. Some of the most popular algorithms in classification are Random Forest, Naive Bayes, Decision Tree, etc.

mllib.clustering − Clustering is an unsupervised learning problem, whereby you aim to group subsets of entities with one another based on some notion of similarity.

mllib.fpm − Frequent pattern mining is mining frequent items, itemsets, subsequences or other substructures; it is usually among the first steps in analyzing a large-scale dataset. This has been an active research topic in data mining for years.

mllib.linalg − MLlib utilities for linear algebra.

mllib.recommendation − Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix.

spark.mllib − It currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.mllib uses the Alternating Least Squares (ALS) algorithm to learn these latent factors.

mllib.regression − Linear regression belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. The interface for working with linear regression models and model summaries is similar to the logistic regression case.

There are other algorithms, classes and functions also available as part of the mllib package. For now, let us understand a demonstration on pyspark.mllib. The following example uses collaborative filtering with the ALS algorithm to build a recommendation model and evaluate it on the training data.

Dataset used − test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0

----------------------------------------recommend.py--------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

if __name__ == "__main__":
   sc = SparkContext(appName="Pyspark mllib Example")
   data = sc.textFile("test.data")
   # Parse each "user,product,rating" line into a Rating object
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)

   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))

   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
----------------------------------------recommend.py--------------------------------------

Command − The command will be as follows −

$SPARK_HOME/bin/spark-submit recommend.py

Output − The output of the above command will be −

Mean Squared Error = 1.20536041839e-05
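The mllib.clustering entry above is described but not demonstrated. Below is a minimal sketch of KMeans clustering with the legacy pyspark.mllib API; the sample points and the choice of k = 2 are invented for illustration.

----------------------------------------kmeans_example.py---------------------------------
from numpy import array
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext("local", "KMeans example")

# Two visually obvious groups of 2-D points, invented for illustration
points = sc.parallelize([
   array([0.0, 0.0]), array([1.0, 1.0]),
   array([9.0, 8.0]), array([8.0, 9.0])
])

# Train a model with k = 2 clusters
model = KMeans.train(points, 2, maxIterations=10, initializationMode="random")

print(model.clusterCenters)              # learned cluster centres
print(model.predict(array([0.5, 0.5])))  # cluster index for a new point

sc.stop()
----------------------------------------kmeans_example.py---------------------------------

It can be run in the same way as the other examples, for instance with $SPARK_HOME/bin/spark-submit kmeans_example.py.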

PySpark – Quick Guide

PySpark – Quick Guide

PySpark – Introduction

In this chapter, we will get ourselves acquainted with what Apache Spark is and how PySpark was developed.

Spark – Overview

Apache Spark is a lightning-fast real-time processing framework. It does in-memory computations to analyze data in real time. It came into the picture because Apache Hadoop MapReduce performed only batch processing and lacked a real-time processing feature. Hence, Apache Spark was introduced, as it can perform stream processing in real time and can also take care of batch processing. Apart from real-time and batch processing, Apache Spark supports interactive queries and iterative algorithms as well. Apache Spark has its own cluster manager, where it can host its applications. It leverages Apache Hadoop for both storage and processing. It uses HDFS (Hadoop Distributed File System) for storage, and it can run Spark applications on YARN as well.

PySpark – Overview

Apache Spark is written in the Scala programming language. To support Python with Spark, the Apache Spark community released a tool called PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. A library called Py4j makes this possible. PySpark offers the PySpark shell, which links the Python API to the Spark core and initializes the Spark context. The majority of data scientists and analytics experts today use Python because of its rich library set, so integrating Python with Spark is a boon to them.

PySpark – Environment Setup

In this chapter, we will understand the environment setup of PySpark.

Note − This assumes that you have Java and Scala installed on your computer.

Let us now download and set up PySpark with the following steps.

Step 1 − Go to the official Apache Spark download page and download the latest version of Apache Spark available there. In this tutorial, we are using spark-2.1.0-bin-hadoop2.7.

Step 2 − Now, extract the downloaded Spark tar file. By default, it will be downloaded to the Downloads directory.

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

It will create a directory spark-2.1.0-bin-hadoop2.7. Before starting PySpark, you need to set the following environment variables to set the Spark path and the Py4j path.

export SPARK_HOME=/home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH

Or, to set the above environment variables globally, put them in the .bashrc file. Then run the following command for the environment variables to take effect.

# source .bashrc

Now that we have all the environment variables set, let us go to the Spark directory and invoke the PySpark shell by running the following command −

# ./bin/pyspark

This will start your PySpark shell.

Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>>

PySpark – SparkContext

SparkContext is the entry point to any Spark functionality. When we run any Spark application, a driver program starts, which has the main function, and your SparkContext gets initiated there. The driver program then runs the operations inside the executors on worker nodes. SparkContext uses Py4J to launch a JVM and create a JavaSparkContext.
By default, PySpark has SparkContext available as 'sc', so creating a new SparkContext won't work.

The following code block has the details of a PySpark class and the parameters which a SparkContext can take.

class pyspark.SparkContext (
   master = None,
   appName = None,
   sparkHome = None,
   pyFiles = None,
   environment = None,
   batchSize = 0,
   serializer = PickleSerializer(),
   conf = None,
   gateway = None,
   jsc = None,
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

Parameters

Following are the parameters of a SparkContext.

master − The URL of the cluster it connects to.

appName − Name of your job.

sparkHome − Spark installation directory.

pyFiles − The .zip or .py files to send to the cluster and add to the PYTHONPATH.

environment − Worker node environment variables.

batchSize − The number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to automatically choose the batch size based on object sizes, or -1 to use an unlimited batch size.

serializer − RDD serializer.

conf − An object of SparkConf to set all the Spark properties.

gateway − Use an existing gateway and JVM; otherwise a new JVM is initialized.

jsc − The JavaSparkContext instance.

profiler_cls − A class of custom Profiler used to do profiling (the default is pyspark.profiler.BasicProfiler).

Among the above parameters, master and appName are the ones most commonly used. The first two lines of any PySpark program look as shown below −

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContext Example – PySpark Shell

Now that you know enough about SparkContext, let us run a simple example on the PySpark shell. In this example, we will count the number of lines with the character 'a' or 'b' in the README.md file. So, say there are 5 lines in a file and 3 lines have the character 'a'; then the output will be → Lines with a: 3. The same will be done for the character 'b'.

Note − We are not creating any SparkContext object in the following example because, by default, Spark automatically creates a SparkContext object named sc when the PySpark shell starts. If you try to create another SparkContext object, you will get the following error − "ValueError: Cannot run multiple SparkContexts at once".

>>> logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
>>> logData = sc.textFile(logFile).cache()
>>> numAs = logData.filter(lambda s: 'a' in s).count()
>>> numBs = logData.filter(lambda s: 'b' in s).count()
>>> print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
Lines with a: 62, lines with b: 30

SparkContext Example – Python Program

Let us run the same example using a Python program. Create a Python file called firstapp.py and enter the following code in that file.

----------------------------------------firstapp.py---------------------------------------
from pyspark import
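The firstapp.py listing above breaks off. A minimal reconstruction, assuming the same README.md path and the same line-counting logic as the shell example above, might look like this:

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext

# Same README.md path as in the shell example above
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"

# A standalone program must create its own SparkContext
sc = SparkContext("local", "first app")

logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

sc.stop()
----------------------------------------firstapp.py---------------------------------------

Command − As with the other examples, it can be run with −

$SPARK_HOME/bin/spark-submit firstapp.py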

PySpark – Serializers

PySpark – Serializers

Serialization is used for performance tuning on Apache Spark. All data that is sent over the network, written to disk, or persisted in memory must be serialized, so serialization plays an important role in costly operations.

PySpark supports custom serializers for performance tuning. The following two serializers are supported by PySpark −

MarshalSerializer

Serializes objects using Python's Marshal serializer. This serializer is faster than PickleSerializer, but supports fewer datatypes.

class pyspark.MarshalSerializer

PickleSerializer

Serializes objects using Python's Pickle serializer. This serializer supports nearly any Python object, but may not be as fast as more specialized serializers.

class pyspark.PickleSerializer

Let us see an example of PySpark serialization. Here, we serialize the data using MarshalSerializer.

----------------------------------------serializing.py------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer

sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
----------------------------------------serializing.py------------------------------------

Command − The command is as follows −

$SPARK_HOME/bin/spark-submit serializing.py

Output − The output of the above command is −

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
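PickleSerializer is listed above without an example. The following sketch mirrors the MarshalSerializer program but explicitly passes PickleSerializer, which is in fact PySpark's default serializer; the dictionary records are invented for illustration.

----------------------------------------pickle_serializing.py-----------------------------
from pyspark.context import SparkContext
from pyspark.serializers import PickleSerializer

# PickleSerializer is PySpark's default serializer; here it is passed explicitly
sc = SparkContext("local", "pickle serialization app", serializer = PickleSerializer())

# Pickle handles rich Python objects such as these dictionaries
records = sc.parallelize([{"id": i, "square": i * i} for i in range(5)])
print(records.map(lambda r: r["square"]).collect())
sc.stop()
----------------------------------------pickle_serializing.py-----------------------------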

PySpark – Discussion

Discuss PySpark

Apache Spark is written in the Scala programming language. To support Python with Spark, the Apache Spark community released a tool called PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. A library called Py4j makes this possible. This is an introductory tutorial, which covers the basics of PySpark and explains how to deal with its various components and sub-components.

PySpark – Useful Resources

PySpark – Useful Resources

The following resources contain additional information on PySpark. Please use them to get more in-depth knowledge on this.

Useful Video Courses

PySpark and AWS: Master Big Data With PySpark and AWS − 149 lectures, 16 hours − Packt Publishing

Building Big Data Pipelines with PySpark + MongoDB + Bokeh − 25 lectures, 5 hours − Edwin Bomela

Big Data Analytics with PySpark + Power BI + MongoDB − 27 lectures, 3.5 hours − Edwin Bomela

Big Data Analytics with PySpark + Tableau Desktop + MongoDB − 24 lectures, 4.5 hours − Edwin Bomela

Learn Pyspark – Advance Course − 9 lectures, 1 hour − Corporate Bridge Consultancy Private Limited

Learn Pyspark – Beginner Course − 16 lectures, 2 hours − Corporate Bridge Consultancy Private Limited

PySpark – Introduction

PySpark – Introduction

In this chapter, we will get ourselves acquainted with what Apache Spark is and how PySpark was developed.

Spark – Overview

Apache Spark is a lightning-fast real-time processing framework. It does in-memory computations to analyze data in real time. It came into the picture because Apache Hadoop MapReduce performed only batch processing and lacked a real-time processing feature. Hence, Apache Spark was introduced, as it can perform stream processing in real time and can also take care of batch processing. Apart from real-time and batch processing, Apache Spark supports interactive queries and iterative algorithms as well. Apache Spark has its own cluster manager, where it can host its applications. It leverages Apache Hadoop for both storage and processing. It uses HDFS (Hadoop Distributed File System) for storage, and it can run Spark applications on YARN as well.

PySpark – Overview

Apache Spark is written in the Scala programming language. To support Python with Spark, the Apache Spark community released a tool called PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. A library called Py4j makes this possible. PySpark offers the PySpark shell, which links the Python API to the Spark core and initializes the Spark context. The majority of data scientists and analytics experts today use Python because of its rich library set, so integrating Python with Spark is a boon to them.

PySpark – SparkFiles

PySpark – SparkFiles

In Apache Spark, you can upload your files using sc.addFile (sc is your default SparkContext) and get the path on a worker using SparkFiles.get. Thus, SparkFiles resolves the paths to files added through SparkContext.addFile().

The SparkFiles class contains the following class methods −

get(filename)
getRootDirectory()

Let us understand them in detail.

get(filename)

It returns the path of the file that was added through SparkContext.addFile().

getRootDirectory()

It returns the path to the root directory that contains the files added through SparkContext.addFile().

----------------------------------------sparkfiles.py-------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles

finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"

sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print("Absolute Path -> %s" % SparkFiles.get(finddistancename))
----------------------------------------sparkfiles.py-------------------------------------

Command − The command is as follows −

$SPARK_HOME/bin/spark-submit sparkfiles.py

Output − The output for the above command is −

Absolute Path -> /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R
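getRootDirectory() is described above but not used in the example. The following is a minimal sketch showing both class methods together; the sample file name, path and contents are invented for illustration.

----------------------------------------sparkfiles_root.py--------------------------------
import os
from pyspark import SparkContext
from pyspark import SparkFiles

# Create a small local file to distribute; name and contents are just for illustration
localpath = "/tmp/sample_config.txt"
with open(localpath, "w") as f:
   f.write("threshold=10\n")

sc = SparkContext("local", "SparkFiles root dir app")
sc.addFile(localpath)

# get() resolves the distributed copy of a single file by name
print("Resolved file  -> %s" % SparkFiles.get("sample_config.txt"))

# getRootDirectory() points at the directory holding all added files
root = SparkFiles.getRootDirectory()
print("Root directory -> %s" % root)
print("Contents       -> %s" % os.listdir(root))

sc.stop()
----------------------------------------sparkfiles_root.py--------------------------------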

PySpark – RDD

PySpark – RDD

Now that we have installed and configured PySpark on our system, we can program in Python on Apache Spark. However, before doing so, let us understand a fundamental concept in Spark − RDD.

RDD stands for Resilient Distributed Dataset. RDDs are the elements that run and operate on multiple nodes to do parallel processing on a cluster. RDDs are immutable, which means once you create an RDD you cannot change it. RDDs are fault tolerant as well; hence, in case of any failure, they recover automatically. You can apply multiple operations on these RDDs to achieve a certain task.

To apply operations on these RDDs, there are two ways −

Transformation and Action

Let us understand these two ways in detail.

Transformation − These are operations applied on an RDD to create a new RDD. filter, groupBy and map are examples of transformations.

Action − These are operations applied on an RDD which instruct Spark to perform the computation and send the result back to the driver.

To apply any operation in PySpark, we need to create a PySpark RDD first. The following code block has the details of the PySpark RDD class −

class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

Let us see how to run a few basic operations using PySpark. The following code in a Python file creates the RDD words, which stores the set of words mentioned.

words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

We will now run a few operations on words.

count()

Returns the number of elements in the RDD.

----------------------------------------count.py------------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)
counts = words.count()
print("Number of elements in RDD -> %i" % (counts))
----------------------------------------count.py------------------------------------------

Command − The command for count() is −

$SPARK_HOME/bin/spark-submit count.py

Output − The output for the above command is −

Number of elements in RDD -> 8

collect()

Returns all the elements in the RDD.

----------------------------------------collect.py----------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)
coll = words.collect()
print("Elements in RDD -> %s" % (coll))
----------------------------------------collect.py----------------------------------------

Command − The command for collect() is −

$SPARK_HOME/bin/spark-submit collect.py

Output − The output for the above command is −

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

foreach(f)

Applies the given function to each element of the RDD; unlike transformations, it returns nothing to the driver. In the following example, we call a print function in foreach, which prints all the elements in the RDD.
----------------------------------------foreach.py----------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

def f(x):
   print(x)

fore = words.foreach(f)
----------------------------------------foreach.py----------------------------------------

Command − The command for foreach(f) is −

$SPARK_HOME/bin/spark-submit foreach.py

Output − The output for the above command is −

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

A new RDD is returned containing only the elements which satisfy the function inside the filter. In the following example, we keep only the strings containing 'spark'.

----------------------------------------filter.py-----------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % (filtered))
----------------------------------------filter.py-----------------------------------------

Command − The command for filter(f) is −

$SPARK_HOME/bin/spark-submit filter.py

Output − The output for the above command is −

Filtered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']

map(f, preservesPartitioning = False)

A new RDD is returned by applying a function to each element in the RDD. In the following example, we form a key-value pair by mapping every string to the value 1.

----------------------------------------map.py--------------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))
----------------------------------------map.py--------------------------------------------

Command − The command for map(f, preservesPartitioning=False) is −

$SPARK_HOME/bin/spark-submit map.py

Output − The output of the above command is −

Key value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]

reduce(f)

The elements of the RDD are combined using the specified commutative and associative binary operation, and the resulting single value is returned. In the following example, we import the add function from the operator module and apply it on 'nums' to carry out a simple addition operation.

----------------------------------------reduce.py-----------------------------------------
from pyspark import SparkContext
from operator import add

sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))
----------------------------------------reduce.py-----------------------------------------

Command − The command for reduce(f) is −

$SPARK_HOME/bin/spark-submit reduce.py

Output − The output of the above command is −

Adding all the elements -> 15

join(other, numPartitions = None)

It returns an RDD containing pairs of elements with matching keys, together with all the values for each such key. In the following example, there are two pairs of elements in two different RDDs. After joining these two RDDs, we get an RDD with elements having matching keys and their values.
----------------------------------------join.py-------------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % (final))
----------------------------------------join.py-------------------------------------------

Command − The command for join(other, numPartitions = None) is −

$SPARK_HOME/bin/spark-submit join.py

Output − The output for the above command is −

Join RDD -> [('spark', (1, 2)), ('hadoop', (4, 5))]

cache()

Persists this RDD with the default storage level (MEMORY_ONLY), so that it is kept in memory after it is first computed. A small example is sketched below.
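The cache() description above is truncated in the source. The following is a minimal sketch of marking an RDD as cached and checking the is_cached attribute of the RDD API; the word list simply reuses the one from the earlier examples.

----------------------------------------cache.py------------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Cache app")
words = sc.parallelize (
   ["scala", "java", "hadoop", "spark", "akka",
   "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# Mark the RDD for caching; the data is actually stored the first time an action runs
words.cache()
print("Words got cached -> %s" % (words.is_cached))
----------------------------------------cache.py------------------------------------------

Command − The command is as follows −

$SPARK_HOME/bin/spark-submit cache.py

Output − The output for the above program would be −

Words got cached -> True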

PySpark – Broadcast & Accumulator

PySpark – Broadcast & Accumulator

For parallel processing, Apache Spark uses shared variables. When the driver sends a task to an executor on the cluster, a copy of the shared variable goes to each node of the cluster, so that it can be used while performing tasks.

There are two types of shared variables supported by Apache Spark −

Broadcast
Accumulator

Let us understand them in detail.

Broadcast

Broadcast variables are used to save a copy of data across all nodes. The variable is cached on all the machines instead of being shipped with every task. The following code block has the details of the Broadcast class for PySpark.

class pyspark.Broadcast (
   sc = None,
   value = None,
   pickle_registry = None,
   path = None
)

The following example shows how to use a Broadcast variable. A Broadcast variable has an attribute called value, which stores the data and is used to return the broadcasted value.

----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print("Stored data -> %s" % (data))
elem = words_new.value[2]
print("Printing a particular element in RDD -> %s" % (elem))
----------------------------------------broadcast.py--------------------------------------

Command − The command for a broadcast variable is as follows −

$SPARK_HOME/bin/spark-submit broadcast.py

Output − The output for the above command is given below.

Stored data -> ['scala', 'java', 'hadoop', 'spark', 'akka']
Printing a particular element in RDD -> hadoop

Accumulator

Accumulator variables are used for aggregating information through associative and commutative operations. For example, you can use an accumulator for a sum operation or for counters (as in MapReduce). The following code block has the details of the Accumulator class for PySpark.

class pyspark.Accumulator(aid, value, accum_param)

The following example shows how to use an Accumulator variable. An Accumulator variable has an attribute called value that is similar to what a broadcast variable has. It stores the data and is used to return the accumulator's value, but it is usable only in the driver program. In this example, an accumulator variable is used by multiple workers and returns an accumulated value.

----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)

def f(x):
   global num
   num += x

rdd = sc.parallelize([20, 30, 40, 50])
rdd.foreach(f)
final = num.value
print("Accumulated value is -> %i" % (final))
----------------------------------------accumulator.py------------------------------------

Command − The command for an accumulator variable is as follows −

$SPARK_HOME/bin/spark-submit accumulator.py

Output − The output for the above command is given below.

Accumulated value is -> 150
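The broadcast example above only reads the value back in the driver. Broadcasting pays off when the variable is looked up inside a transformation running on the workers; the following is a minimal sketch of that pattern, with the lookup table and RDD contents invented for illustration.

----------------------------------------broadcast_lookup.py-------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast lookup app")

# Small lookup table shipped once to every executor instead of with every task
country_codes = sc.broadcast({"in": "India", "us": "United States", "jp": "Japan"})

codes = sc.parallelize(["in", "jp", "us", "in"])

# Each task reads the broadcast value locally via .value
countries = codes.map(lambda c: country_codes.value.get(c, "unknown")).collect()
print("Resolved countries -> %s" % (countries))

sc.stop()
----------------------------------------broadcast_lookup.py-------------------------------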