Aniruddha Bhandari—Posted November 17, 2020, last modified November 23, 2020
Overview
- Get familiar with how PySpark integrates with Google Colab
- We will also see how to perform data exploration with PySpark in Google Colab
Introduction
Google Colab is a lifeline for data scientists when it comes to working with large data sets and running complex models.
And for data engineers, PySpark is nothing short of a demigod!
So what happens if we take these two, each one of the best players in their respective category, and combine them?
We have the (almost) perfect solution for all your data science and machine learning problems!
In this article, we will see how we can run PySpark in a Google Colaboratory notebook. We'll also perform some basic data exploration tasks common to most data science problems. So let's get started!
Note: I am assuming that you are already familiar with the basics of Spark and Google Colab. Otherwise, I recommend reading the following articles before reading this one:
- PySpark for beginners
- Introduction to Google Colab
Table of Contents
- Connecting Google Drive to Colab
- Reading data from Google Drive
- Setting up PySpark in Google Colab
- Loading data into PySpark
- Understanding the data
- Data exploration with PySpark dataframes
- Show column details
- Show rows
- Number of rows in the dataframe
- Show specific columns
- Describe the columns
- Unique values for categorical columns
- Aggregate with Groupby
- Count and remove null values
- Save to file
Connecting Google Drive to Colab
The first thing you'll want to do when working with Colab is mount your Google Drive. This allows you to access any directory on your Drive from within the Colab notebook.
from google.colab import drive
drive.mount('/content/drive')
Once you've done that, the next obvious step is to load the data.
Bonus: You can find some awesome hacks for Google Colab in this article!
Reading data from Google Drive
Now, I am assuming that you will be working with a reasonably large dataset. So the best way to upload your data to Drive is as a ZIP file. Just drag and drop your ZIP file into any directory on your Drive.
Unzipping it is not a problem: just pass the path of the ZIP file to the !unzip command.
!unzip"/content/drive/MyDrive/AVarticles/PySparkonColab/black_friday_train.zip"
If you're not sure exactly where the file is, you can browse the file explorer in the Colab side panel.
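If you prefer checking from code, a quick directory listing works just as well. This is only a small sketch; the path below is the same one used in the unzip command above, so adjust it to wherever your ZIP file actually lives.

# List the contents of the Drive folder to confirm where the ZIP file sits.
# Change the path to match your own folder structure.
!ls "/content/drive/MyDrive/AVarticles/PySparkonColab"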
Alright, let's set up Spark
Setting up PySpark in Google Colab
Spark is written in the Scala programming language and requires a Java Virtual Machine (JVM) to run. So our first task is to install Java.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
Next, we will download Apache Spark 3.0.1 with Hadoop 2.7 from here.
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
Now, all we have to do is extract that archive.
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
Note: At the time of writing this article, the latest version of Apache Spark is 3.0.1. But Spark is developing rapidly. So if there is a newer version of Spark when you run this code, just replace 3.0.1 with the newer version everywhere you see it.
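One way to make that swap less error-prone, shown here only as a sketch, is to keep the version strings in Python variables and let Colab interpolate them into the shell commands. The archive.apache.org URL pattern is an assumption; older releases eventually move there from the main download mirrors.

# Keep the versions in one place so a future upgrade only touches these lines.
SPARK_VERSION = "3.0.1"
HADOOP_VERSION = "2.7"
SPARK_PACKAGE = f"spark-{SPARK_VERSION}-bin-hadoop{HADOOP_VERSION}"

# IPython/Colab substitutes {expressions} inside shell commands.
!wget -q https://archive.apache.org/dist/spark/spark-{SPARK_VERSION}/{SPARK_PACKAGE}.tgz
!tar xf {SPARK_PACKAGE}.tgz

If you go this route, remember to keep the SPARK_HOME path set later in sync with the version you chose.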
There is one last thing we need to install: the findspark library. It locates Spark on the system and imports it as a regular library.
!pip install -q findspark
Now that we have installed all the required dependencies in Colab, it's time to set the environment path. This allows us to run PySpark in the Colab environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"
It's time for the real test!
We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method.
import findspark
findspark.init()
Bonus: If you want to know where Spark is installed, use findspark.find()
findspark.find()
Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.
You can name the session with appName() and add some configuration with config() if you want.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("Colab") \
        .config('spark.ui.port', '4050') \
        .getOrCreate()
Finally, print the SparkSession variable.
spark
If everything goes well, you should see the SparkSession details displayed as output.
If you want to display the Spark UI, you need to add a few more lines of code to create a public URL for the UI page.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels
You should now be able to see the jobs and their stages at the link created.
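If you'd rather grab just the public URL instead of reading the raw curl output, a small sketch using the ngrok local API (which the curl command above already queries) could look like this:

import requests

# Query ngrok's local API and print only the public URL of the Spark UI tunnel.
# This assumes the ngrok process started above is still running.
tunnels = requests.get("http://localhost:4040/api/tunnels").json()
print(tunnels["tunnels"][0]["public_url"])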
Great! Let's get started with PySpark now!
Loading data into PySpark
First, we need to load the dataset. We will use the read.csv module. The inferSchema parameter allows Spark to automatically determine the data type of each column, but to do so it has to go over the data once. If you want to avoid that, you can instead specify the schema explicitly via the schema parameter (a sketch of that is shown a little further below).
df = spark.read.csv("train.csv", header=True, inferSchema=True)
This creates a Spark dataframe.
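For reference, here is a minimal sketch of what an explicit schema for this dataset could look like. The column names follow the Black Friday dataset, but the chosen types are assumptions you should adapt to your own data.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema for the Black Friday dataset (the types are assumptions).
schema = StructType([
    StructField("User_ID", IntegerType(), True),
    StructField("Product_ID", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", StringType(), True),
    StructField("Occupation", IntegerType(), True),
    StructField("City_Category", StringType(), True),
    StructField("Stay_In_Current_City_Years", StringType(), True),
    StructField("Marital_Status", IntegerType(), True),
    StructField("Product_Category_1", IntegerType(), True),
    StructField("Product_Category_2", IntegerType(), True),
    StructField("Product_Category_3", IntegerType(), True),
    StructField("Purchase", IntegerType(), True),
])

# With a schema provided, Spark skips the extra inference pass over the file.
df_explicit = spark.read.csv("train.csv", header=True, schema=schema)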
Bonus: There are multiple data sources in Spark, and you can learn all about them in this article!
Understanding the data
We have the Black Friday dataset here, taken from the DataHack platform. It contains purchase summaries of various customers of a retail store over the past month. We are given customer demographics, purchase details, and the total purchase amount. The goal is to predict the purchase amount per customer for various products.
Data exploration with PySpark dataframes
Now it's time to use PySpark's dataframe capabilities to explore our data. Along the way, we'll keep comparing it with Pandas dataframes.
Show column details
The first step in an exploratory data analysis is to review the data frame schema. This gives you a bird's-eye view of the columns in the data frame along with their data types.
df.printSchema()
Show rows
Now, of course, you also want to have an overview of the current data.
Just like the df.head() function in a Pandas dataframe, here we have the show() function. You can specify the number of rows to print within the parentheses.
df.show(5)
Number of rows in the dataframe
If you want to know the total number of rows in the dataframe, just use the count() function.
df.count()
550068
Show specific columns
Sometimes you may want to display specific data frame columns. You can use Spark's SQL capabilities to do this.
Using the select() function, you can specify the columns you want to display.
df.select("user_id","gender","age","occupation").show(5)
Describe the columns
When working with numeric features, we often want to look at statistics for the dataframe. The describe() function is best suited for that.
It's quite similar to Pandas' describe() function, though it reports fewer statistical measures, and it also describes string columns.
df.describe().show()
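If the full table is too noisy, describe() also accepts specific column names, for example just the purchase amount:

# Summary statistics for a single column instead of the whole dataframe.
df.describe("Purchase").show()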
Unique values for categorical columns
The distinct() function comes in handy when you want to determine the unique values in the categorical columns of the dataframe.
df.select("City_Kategorie").distinct().show()
Aggregate with Groupby
We can use the groupBy function to group the dataframe by column values and then apply an aggregation function on them to extract useful information.
Here, we can group by the different city categories in the dataframe and determine the total purchase per city category. For this, we have to use the sum aggregation function from the Spark SQL functions module.
from pyspark.sql import functions as F

df.groupBy("City_Category").agg(F.sum("Purchase")).show()
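You're not limited to one aggregation per group either. As a small sketch, the same groupBy can return the total, the average, and the number of purchases per city category in a single pass:

# Several aggregations at once: total, average, and count of purchases
# for each city category. alias() just gives the output columns tidy names.
df.groupBy("City_Category").agg(
    F.sum("Purchase").alias("total_purchase"),
    F.avg("Purchase").alias("avg_purchase"),
    F.count("Purchase").alias("num_purchases"),
).show()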
Count and remove null values
Now, we all know that real-world data is not immune to missing values. Therefore, it is prudent to always check for missing values and remove them if necessary.
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
We have a few columns with null values, so it's better to replace them with appropriate values. In our dataset, a null value in a Product Category column could mean that the user didn't buy that product. So it's better to replace those nulls with 0.
We will use the fillna() function to replace the null values. Since Spark dataframes are immutable, we need to store the result in a new dataframe.
df = df.fillna({'Product_Category_2': 0, 'Product_Category_3': 0})
We can check again for null values to verify the change.
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
Perfect! There are no null values in the data frame anymore.
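If filling the nulls doesn't make sense for a column, the other option mentioned above, dropping the affected rows, is a one-liner. A minimal sketch:

# Alternative to filling: drop every row that still contains a null value.
# This returns a new dataframe; df itself is left untouched.
df_no_nulls = df.dropna()
df_no_nulls.count()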
Save to file
Finally, after all the analysis, if you want to save your results to a new CSV file, you can do so with the write.csv() function.
df.write.csv("/content/drive/MyDrive/AVarticles/PySparkonColab/preprocessed_data")
But there is a caveat here. Spark doesn't save a single CSV file but several, depending on the number of partitions of the dataframe. So if there are 2 partitions, two CSV files will be saved, one for each partition.
df.rdd.getNumPartitions()
2
Bonus: I converted the Spark dataframe to an RDD here. What's the difference between the two? Check out this article!
However, this isn't very convenient when we need to load these files again later. So instead, we can convert the Spark dataframe to a good old Pandas dataframe and then use the usual to_csv() method to save the results.
# Spark df to Pandas df
df_pd = df.toPandas()

# Store the result
df_pd.to_csv("/content/drive/MyDrive/AVarticles/PySparkonColab/pandas_preprocessed_data.csv")
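Alternatively, if you'd rather stay entirely in Spark, you can coalesce the dataframe to a single partition before writing, so that write.csv() produces just one part file. This is a sketch with a made-up output folder name; it's fine for data of this size, but not advisable for very large dataframes, since it funnels everything through one partition.

# Write a single CSV part file by reducing the dataframe to one partition.
# The output path here is illustrative; point it at your own Drive folder.
df.coalesce(1).write.csv(
    "/content/drive/MyDrive/AVarticles/PySparkonColab/preprocessed_data_single",
    header=True,
)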
Closing words
I hope you had as much fun working with PySpark in Colab as I did writing this article!
This is not an exhaustive article on the dataframe capabilities of PySpark. For that, you can check out this amazing article on dataframes in PySpark. And if you want to go a step further and build a machine learning model with PySpark, I highly recommend checking out this article!