PySpark – Home

PySpark Tutorial

Table of Contents − What is PySpark? | Key Components of PySpark | Purpose of PySpark | Features of PySpark | Applications of PySpark | Why Learn PySpark? | Prerequisites to Learn PySpark | PySpark Jobs and Opportunities | Frequently Asked Questions about PySpark

What is PySpark?

Apache Spark is a powerful open-source data processing engine written in Scala, designed for large-scale data processing. To support Python with Spark, the Apache Spark community released a tool, PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. A library called Py4j makes this possible. This is an introductory tutorial, which covers the basics of PySpark and explains how to deal with its various components and sub-components.

PySpark is the Python API for Apache Spark. It allows you to interface with Spark's distributed computation framework using Python, making it easier to work with big data in a language many data scientists and engineers are familiar with. By using PySpark, you can create and manage Spark jobs, and perform complex data transformations and analyses.

Key Components of PySpark

Following are the key components of PySpark −

RDDs (Resilient Distributed Datasets) − RDDs are the fundamental data structure in Spark. They are immutable distributed collections of objects that can be processed in parallel.

DataFrames − DataFrames are similar to RDDs but with additional features like named columns and support for a wide range of data sources. They are analogous to tables in a relational database and provide a higher-level abstraction for data manipulation.

Spark SQL − This module allows you to execute SQL queries on DataFrames and RDDs. It provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine.

MLlib (Machine Learning Library) − MLlib is Spark's scalable machine learning library, offering various algorithms and utilities for classification, regression, clustering, collaborative filtering, and more.

Spark Streaming − Spark Streaming enables real-time data processing and stream processing. It allows you to process live data streams and update results in real time.

Purpose of PySpark

The primary purpose of PySpark is to enable processing of large-scale datasets in real time across a distributed computing environment using Python. PySpark provides an interface for interacting with Spark's core functionalities, such as working with Resilient Distributed Datasets (RDDs) and DataFrames, using the Python programming language.

Features of PySpark

PySpark has the following features −

Integration with Spark − PySpark is tightly integrated with Apache Spark, allowing seamless data processing and analysis using Python.

Real-time Processing − It enables real-time processing of large-scale datasets.

Ease of Use − PySpark simplifies complex data processing tasks using Python's simple syntax and extensive libraries.

Interactive Shell − PySpark offers an interactive shell for real-time data analysis and experimentation.

Machine Learning − It includes MLlib, a scalable machine learning library.

Data Sources − PySpark can read data from various sources, including HDFS, S3, HBase, and more.

Partitioning − PySpark efficiently partitions data to enhance processing speed and efficiency.

Applications of PySpark

PySpark is widely used in various applications, including −

Data Analysis − Analyzing large datasets to extract meaningful information.

Machine Learning − Implementing machine learning algorithms for predictive analytics.

Data Streaming − Processing streaming data in real time.

Data Engineering − Managing and transforming big data for various use cases.

Why Learn PySpark?

Learning PySpark is essential for anyone interested in big data and data engineering. It offers several benefits −

Scalability − Efficiently handles large datasets across distributed systems.

Performance − High-speed data processing and real-time analytics.

Flexibility − PySpark supports integration with various data sources and tools.

Comprehensive Toolset − It includes tools for data manipulation, machine learning, and graph processing.

Prerequisites to Learn PySpark

Before proceeding with the concepts in this tutorial, it is assumed that readers already know what a programming language and a framework are. In addition, sound knowledge of Apache Spark, Apache Hadoop, the Scala programming language, the Hadoop Distributed File System (HDFS) and Python will be very helpful.

PySpark Jobs and Opportunities

Proficiency in PySpark opens up various career opportunities, such as Data Analyst, Data Engineer, Python Developer, PySpark Developer, Data Scientist, and more.

Frequently Asked Questions about PySpark

There are some very frequently asked questions (FAQ) about PySpark; this section tries to answer them briefly.

What is PySpark used for?

PySpark is used for processing large-scale datasets in real time across a distributed computing environment using Python. It also offers an interactive PySpark shell for data analysis.

Describe the different ways to read data into PySpark.

PySpark can read data from multiple sources, including CSV, Parquet, text files, tables, and JSON. It offers methods such as format(), csv(), load(), and more to facilitate data reading.

What is the role of partitioning in PySpark?

Partitioning in PySpark helps divide a large dataset into smaller, manageable parts based on partitioning expressions, which enhances processing speed and efficiency.

What is the purpose of checkpoints in PySpark?

Checkpoints in PySpark are used to truncate the logical plan of a DataFrame. They are particularly useful in iterative algorithms, where the plan can become complex and large, thereby improving performance.

What is a PySpark UDF?

A PySpark UDF (User Defined Function) allows the creation of custom functions to apply transformations across DataFrames. UDFs are treated as deterministic by default (see the sketch after this section for an example).
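To make the components above concrete, here is a minimal, hedged sketch in Python. The application name, column names and sample rows are invented for illustration; the calls shown (createDataFrame, createOrReplaceTempView, spark.sql, udf, withColumn) are standard PySpark DataFrame APIs.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Entry point for the DataFrame and SQL APIs
spark = SparkSession.builder.appName("ComponentsDemo").getOrCreate()

# DataFrame: a distributed collection of rows with named columns
df = spark.createDataFrame(
   [(1, "alice", 34), (2, "bob", 45), (3, "carol", 29)],
   ["id", "name", "age"]
)

# Spark SQL: register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 30").show()

# UDF: a user-defined function applied to a column
upper_udf = udf(lambda s: s.upper(), StringType())
df.withColumn("name_upper", upper_udf(df["name"])).show()

spark.stop()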

PySpark – MLlib

Apache Spark offers a Machine Learning API called MLlib. PySpark exposes this machine learning API in Python as well. It supports different kinds of algorithms, which are mentioned below −

mllib.classification − The spark.mllib package supports various methods for binary classification, multiclass classification and regression analysis. Some of the most popular algorithms for classification are Random Forest, Naive Bayes, Decision Tree, etc.

mllib.clustering − Clustering is an unsupervised learning problem, whereby you aim to group subsets of entities with one another based on some notion of similarity.

mllib.fpm − Frequent pattern mining means mining frequent items, itemsets, subsequences or other substructures, which is usually among the first steps in analyzing a large-scale dataset. This has been an active research topic in data mining for years.

mllib.linalg − MLlib utilities for linear algebra.

mllib.recommendation − Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. spark.mllib currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.mllib uses the Alternating Least Squares (ALS) algorithm to learn these latent factors.

mllib.regression − Linear regression belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. The interface for working with linear regression models and model summaries is similar to the logistic regression case.

There are other algorithms, classes and functions in the mllib package as well. For now, let us look at a demonstration of pyspark.mllib. The following example uses collaborative filtering with the ALS algorithm to build a recommendation model and evaluate it on the training data.

Dataset used − test.data

1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0

----------------------------------- recommend.py -----------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

if __name__ == "__main__":
   sc = SparkContext(appName="Pyspark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(',')) \
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)

   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))

   # Save and load the model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
----------------------------------- recommend.py -----------------------------------

Command − The command will be as follows −

$SPARK_HOME/bin/spark-submit recommend.py

Output − The output of the above command will be −

Mean Squared Error = 1.20536041839e-05
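As a short, hedged follow-up (not part of the original recommend.py), the persisted model can be reloaded and queried directly. predict() and recommendProducts() are methods of MatrixFactorizationModel; the user and product IDs below refer to the toy test.data ratings, and the model path is the one used by recommend.py above.

from pyspark import SparkContext
from pyspark.mllib.recommendation import MatrixFactorizationModel

sc = SparkContext(appName="Pyspark mllib query example")
# Load the model saved by recommend.py
model = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
# Predicted rating of product 3 by user 1
print(model.predict(1, 3))
# Top-2 product recommendations for user 1, returned as Rating(user, product, rating)
for rec in model.recommendProducts(1, 2):
   print(rec)
sc.stop()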

PySpark – Quick Guide

PySpark – Introduction

In this chapter, we will get ourselves acquainted with what Apache Spark is and how PySpark was developed.

Spark – Overview

Apache Spark is a lightning-fast real-time processing framework. It performs in-memory computations to analyze data in real time. It came into the picture because Apache Hadoop MapReduce performed batch processing only and lacked a real-time processing feature. Hence, Apache Spark was introduced, as it can perform stream processing in real time and can also take care of batch processing. Apart from real-time and batch processing, Apache Spark supports interactive queries and iterative algorithms as well. Apache Spark has its own cluster manager, where it can host its applications. It leverages Apache Hadoop for both storage and processing: it uses HDFS (Hadoop Distributed File System) for storage, and it can run Spark applications on YARN as well.

PySpark – Overview

Apache Spark is written in the Scala programming language. To support Python with Spark, the Apache Spark community released a tool, PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. A library called Py4j makes this possible. PySpark offers the PySpark shell, which links the Python API to the Spark core and initializes the Spark context. The majority of data scientists and analytics experts today use Python because of its rich library set, so integrating Python with Spark is a boon to them.

PySpark – Environment Setup

In this chapter, we will understand the environment setup of PySpark.

Note − This assumes that you have Java and Scala installed on your computer.

Let us now download and set up PySpark with the following steps.

Step 1 − Go to the official Apache Spark download page and download the latest version of Apache Spark available there. In this tutorial, we are using spark-2.1.0-bin-hadoop2.7.

Step 2 − Now, extract the downloaded Spark tar file. By default, it will be downloaded into the Downloads directory.

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

It will create a directory spark-2.1.0-bin-hadoop2.7. Before starting PySpark, you need to set the following environment variables to set the Spark path and the Py4j path.

export SPARK_HOME=/home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH

Or, to set the above environment variables globally, put them in the .bashrc file. Then run the following command for them to take effect.

# source .bashrc

Now that we have all the environment variables set, let us go to the Spark directory and invoke the PySpark shell by running the following command −

# ./bin/pyspark

This will start your PySpark shell.

Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>>

PySpark – SparkContext

SparkContext is the entry point to any Spark functionality. When we run any Spark application, a driver program starts, which has the main function, and your SparkContext gets initiated there. The driver program then runs the operations inside the executors on worker nodes. SparkContext uses Py4J to launch a JVM and creates a JavaSparkContext.
By default, PySpark has SparkContext available as 'sc', so creating a new SparkContext won't work. The following code block has the details of a PySpark class and the parameters which a SparkContext can take.

class pyspark.SparkContext (
   master = None,
   appName = None,
   sparkHome = None,
   pyFiles = None,
   environment = None,
   batchSize = 0,
   serializer = PickleSerializer(),
   conf = None,
   gateway = None,
   jsc = None,
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

Parameters

Following are the parameters of a SparkContext.

master − The URL of the cluster it connects to.

appName − Name of your job.

sparkHome − Spark installation directory.

pyFiles − The .zip or .py files to send to the cluster and add to the PYTHONPATH.

environment − Worker node environment variables.

batchSize − The number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to automatically choose the batch size based on object sizes, or -1 to use an unlimited batch size.

serializer − RDD serializer.

conf − An object of SparkConf to set all the Spark properties.

gateway − Use an existing gateway and JVM; otherwise initialize a new JVM.

jsc − The JavaSparkContext instance.

profiler_cls − A class of custom Profiler used to do profiling (the default is pyspark.profiler.BasicProfiler).

Among the above parameters, master and appName are the most commonly used. The first two lines of any PySpark program look as shown below −

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContext Example – PySpark Shell

Now that you know enough about SparkContext, let us run a simple example on the PySpark shell. In this example, we will count the number of lines containing the character 'a' or 'b' in the README.md file. So, if there are 5 lines in a file and 3 lines have the character 'a', then the output will be → Lines with a: 3. The same will be done for the character 'b'.

Note − We are not creating any SparkContext object in the following example because, by default, Spark automatically creates a SparkContext object named sc when the PySpark shell starts. If you try to create another SparkContext object, you will get the following error − "ValueError: Cannot run multiple SparkContexts at once".

>>> logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
>>> logData = sc.textFile(logFile).cache()
>>> numAs = logData.filter(lambda s: 'a' in s).count()
>>> numBs = logData.filter(lambda s: 'b' in s).count()
>>> print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContext Example – Python Program

Let us run the same example using a Python program. Create a Python file called firstapp.py and enter the following code in that file (the original excerpt was cut off here; the body below simply repeats the shell example as a standalone program).

----------------------------------- firstapp.py -----------------------------------
from pyspark import SparkContext

logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
sc = SparkContext("local", "First App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------- firstapp.py -----------------------------------
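Then execute the program with spark-submit, as in the other chapters. This is a sketch of the expected run; the counts assume the same bundled README.md as in the shell example above.

Command − The command will be as follows −

$SPARK_HOME/bin/spark-submit firstapp.py

Output − If the same README.md is used, the output should match the shell example −

Lines with a: 62, lines with b: 30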

PySpark – Serializers

Serialization is used for performance tuning on Apache Spark. All data that is sent over the network, written to disk, or persisted in memory should be serialized. Serialization plays an important role in costly operations.

PySpark supports custom serializers for performance tuning. The following two serializers are supported by PySpark −

MarshalSerializer

Serializes objects using Python's Marshal serializer. This serializer is faster than PickleSerializer, but supports fewer datatypes.

class pyspark.MarshalSerializer

PickleSerializer

Serializes objects using Python's Pickle serializer. This serializer supports nearly any Python object, but may not be as fast as more specialized serializers.

class pyspark.PickleSerializer

Let us see an example of PySpark serialization. Here, we serialize the data using MarshalSerializer.

----------------------------------- serializing.py -----------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer

sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
----------------------------------- serializing.py -----------------------------------

Command − The command is as follows −

$SPARK_HOME/bin/spark-submit serializing.py

Output − The output of the above command is −

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
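For comparison, here is an analogous hedged sketch using PickleSerializer. Python's marshal module cannot serialize date objects, whereas pickle can, which is the kind of case where PickleSerializer is needed; the example data is invented for illustration.

from datetime import date, timedelta
from pyspark.context import SparkContext
from pyspark.serializers import PickleSerializer

sc = SparkContext("local", "pickle serialization app", serializer = PickleSerializer())
# date objects fall outside MarshalSerializer's supported types, but pickle handles them
dates = sc.parallelize([date(2017, 1, 1) + timedelta(days = d) for d in range(5)])
print(dates.map(lambda d: d.isoformat()).collect())
sc.stop()

Running this with spark-submit should print the five ISO-formatted dates, from '2017-01-01' to '2017-01-05'.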

PySpark – Discussion

Discuss PySpark

Apache Spark is written in the Scala programming language. To support Python with Spark, the Apache Spark community released a tool, PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. A library called Py4j makes this possible. This is an introductory tutorial, which covers the basics of PySpark and explains how to deal with its various components and sub-components.

PySpark – Useful Resources

The following resources contain additional information on PySpark. Please use them to get more in-depth knowledge on this.

Useful Video Courses

PySpark and AWS: Master Big Data With PySpark and AWS − 149 lectures, 16 hours (Packt Publishing)

Building Big Data Pipelines with PySpark + MongoDB + Bokeh − 25 lectures, 5 hours (Edwin Bomela)

Big Data Analytics with PySpark + Power BI + MongoDB − 27 lectures, 3.5 hours (Edwin Bomela)

Big Data Analytics with PySpark + Tableau Desktop + MongoDB − 24 lectures, 4.5 hours (Edwin Bomela)

Learn Pyspark – Advance Course − 9 lectures, 1 hour (Corporate Bridge Consultancy Private Limited)

Learn Pyspark – Beginner Course − 16 lectures, 2 hours (Corporate Bridge Consultancy Private Limited)

PySpark – StorageLevel

StorageLevel decides how an RDD should be stored. In Apache Spark, StorageLevel decides whether the RDD should be stored in memory, on disk, or both. It also decides whether to serialize the RDD and whether to replicate the RDD partitions.

The following code block has the class definition of a StorageLevel −

class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)

Now, to decide the storage of an RDD, there are different storage levels, which are given below −

DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)

Let us consider the following example of StorageLevel, where we use the storage level MEMORY_AND_DISK_2, which means the RDD partitions will have a replication of 2.

----------------------------------- storagelevel.py -----------------------------------
from pyspark import SparkContext
import pyspark

sc = SparkContext("local", "storagelevel app")
rdd1 = sc.parallelize([1, 2])
rdd1.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
print(rdd1.getStorageLevel())
----------------------------------- storagelevel.py -----------------------------------

Command − The command is as follows −

$SPARK_HOME/bin/spark-submit storagelevel.py

Output − The output for the above command is given below −

Disk Memory Serialized 2x Replicated
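As a related hedged sketch (not from the original chapter), cache() is shorthand for persisting with the default MEMORY_ONLY level, and unpersist() releases the stored partitions so that a different level can be applied afterwards −

from pyspark import SparkContext
import pyspark

sc = SparkContext("local", "cache app")
rdd = sc.parallelize([1, 2, 3])

rdd.cache()                      # same as rdd.persist(pyspark.StorageLevel.MEMORY_ONLY)
print(rdd.getStorageLevel())     # should print: Memory Serialized 1x Replicated

rdd.unpersist()                  # drop the cached partitions
rdd.persist(pyspark.StorageLevel.DISK_ONLY)
print(rdd.getStorageLevel())     # should print: Disk Serialized 1x Replicated
sc.stop()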

PySpark – SparkConf

To run a Spark application on a local machine or a cluster, you need to set a few configurations and parameters; this is what SparkConf helps with. It provides the configurations to run a Spark application. The following code block has the details of a SparkConf class for PySpark.

class pyspark.SparkConf (
   loadDefaults = True,
   _jvm = None,
   _jconf = None
)

Initially, we will create a SparkConf object with SparkConf(), which will load the values from spark.* Java system properties as well. Now you can set different parameters using the SparkConf object, and these parameters will take priority over the system properties.

In a SparkConf class, there are setter methods, which support chaining. For example, you can write conf.setAppName("PySpark App").setMaster("local"). Once we pass a SparkConf object to Apache Spark, it cannot be modified by any user.

Following are some of the most commonly used attributes of SparkConf −

set(key, value) − To set a configuration property.

setMaster(value) − To set the master URL.

setAppName(value) − To set an application name.

get(key, defaultValue=None) − To get the configuration value of a key.

setSparkHome(value) − To set the Spark installation path on worker nodes.

Let us consider the following example of using SparkConf in a PySpark program. In this example, we set the Spark application name to PySpark App and set the master URL of the Spark application to → spark://master:7077.

The following code block has the lines; when they are added to a Python file, they set the basic configurations for running a PySpark application.

---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------
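A short, hedged sketch of reading the configuration back: the property keys spark.app.name and spark.master are what setAppName and setMaster set under the hood, and the values below (local[2], 1g) are illustrative −

from pyspark import SparkConf

conf = SparkConf().setAppName("PySpark App").setMaster("local[2]")
conf.set("spark.executor.memory", "1g")

print(conf.get("spark.app.name"))      # PySpark App
print(conf.get("spark.master"))        # local[2]
print(conf.toDebugString())            # every explicitly set property, one per line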

PySpark – Environment Setup

In this chapter, we will understand the environment setup of PySpark.

Note − This assumes that you have Java and Scala installed on your computer.

Let us now download and set up PySpark with the following steps.

Step 1 − Go to the official Apache Spark download page and download the latest version of Apache Spark available there. In this tutorial, we are using spark-2.1.0-bin-hadoop2.7.

Step 2 − Now, extract the downloaded Spark tar file. By default, it will be downloaded into the Downloads directory.

# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz

It will create a directory spark-2.1.0-bin-hadoop2.7. Before starting PySpark, you need to set the following environment variables to set the Spark path and the Py4j path.

export SPARK_HOME=/home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/python:$PATH

Or, to set the above environment variables globally, put them in the .bashrc file. Then run the following command for them to take effect.

# source .bashrc

Now that we have all the environment variables set, let us go to the Spark directory and invoke the PySpark shell by running the following command −

# ./bin/pyspark

This will start your PySpark shell.

Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
>>>

PySpark – Introduction

In this chapter, we will get ourselves acquainted with what Apache Spark is and how PySpark was developed.

Spark – Overview

Apache Spark is a lightning-fast real-time processing framework. It performs in-memory computations to analyze data in real time. It came into the picture because Apache Hadoop MapReduce performed batch processing only and lacked a real-time processing feature. Hence, Apache Spark was introduced, as it can perform stream processing in real time and can also take care of batch processing. Apart from real-time and batch processing, Apache Spark supports interactive queries and iterative algorithms as well. Apache Spark has its own cluster manager, where it can host its applications. It leverages Apache Hadoop for both storage and processing: it uses HDFS (Hadoop Distributed File System) for storage, and it can run Spark applications on YARN as well.

PySpark – Overview

Apache Spark is written in the Scala programming language. To support Python with Spark, the Apache Spark community released a tool, PySpark. Using PySpark, you can work with RDDs in the Python programming language as well. A library called Py4j makes this possible. PySpark offers the PySpark shell, which links the Python API to the Spark core and initializes the Spark context. The majority of data scientists and analytics experts today use Python because of its rich library set, so integrating Python with Spark is a boon to them.