When diving into the world of data processing, working with Spark offers a real edge for handling massive datasets with speed and efficiency. Whether you are new to distributed computing or a seasoned data engineer, learning to harness Spark's full potential will change the way you approach data challenges.
So, how do you do it? Understanding Spark's architecture, its resilient distributed datasets (RDDs), and its powerful APIs is the first step to unlocking its capabilities. As you begin working with Spark, you will see how it scales across clusters, offering fast computations even with complex data pipelines. But that's not all: there is more to explore, from real-time streaming to machine learning integration, making Spark a dynamic tool for any data-driven business. For those looking to push further into advanced Spark techniques, see also related technologies like Hadoop, Kafka, and Delta Lake that complement Spark's power and versatility.
Table of Contents
- Working with Spark data
- Getting ready
- How to do it
- How it works
- There’s more
- See also
Working With Spark Data
When working with large datasets we sometimes need to rely on distributed
resources to clean and manipulate our data. With Apache Spark, analysts can
take advantage of the combined processing power of many machines. We
will use PySpark, a Python API for working with Spark, in this recipe. We
will also go over how to use PySpark tools to take a first look at our data,
select parts of our data, and generate some simple summary statistics.
Getting Ready
To run the code in this section you need to get Spark running on your
computer. If you have installed Anaconda you can follow these steps to work
with Spark.
- Install Java with conda install openjdk.
- Install PySpark with conda install pyspark or conda install -c conda-forge pyspark.
- Install findspark with conda install -c conda-forge findspark (a short sketch showing how findspark is used appears after this list).
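findspark is installed in the steps above but never appears in the recipe code. It is only needed when the pyspark package cannot locate your Spark installation on its own. A minimal sketch of how it is typically used, assuming a default Anaconda setup, is shown below; if your imports already work without it, you can skip this step.
import findspark
findspark.init()   # locates the Spark installation and adds it to sys.path

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()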
We will work with the land temperature data from section 1 and the
candidate news data from this section. All data and the code we will be
running in this recipe are available in the GitHub repository for this section.
This dataset, taken from the Global Historical Climatology Network integrated database, is made available for public use by the United States National Oceanic and Atmospheric Administration at https://www.ncdc.noaa.gov/data-access/land-based-station-data/land-based-datasets/global-historical-climatology-network-monthly-version-4.
How to Do It
- Let's start a Spark session and load the land temperatures data. We can
use the read method of the session object to create a Spark DataFrame.
We indicate that the first row of the CSV file we are importing has a
header.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
  .getOrCreate()
landtemps = spark.read.option("header", True) \
  .csv("data/landtemps.csv")
type(landtemps)
Output:
pyspark.sql.dataframe.DataFrame
Note that the read method returns a Spark DataFrame, not a pandas
DataFrame. We will need to use different methods to view our data than those we have used so far. We load the full dataset, not just the 100,000-row
sample we used in the first section. If your system is low on resources, you
can import the landtempssample.csv file instead.
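Another option when loading the full file on a single machine is to give the Spark driver more memory when the session is created. The sketch below uses the standard Spark properties for this; the appName value and the 4g figure are only illustrative, and the setting takes effect only if it is applied before the first session is created in the process.
from pyspark.sql import SparkSession

# build the session with an application name and a larger driver heap;
# adjust the memory value to what your machine can spare
spark = SparkSession.builder \
    .appName("landtemps") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()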
- We should take a look at the number of rows, and at the column names and
data types that were imported. The temp column was read as a string; it
should be a float. We will fix that in a later step.
landtemps.count()
Output:
16904868
landtemps.printSchema()
Output:
root
|-- locationid: string (nullable = true)
|-- year: string (nullable = true)
|-- month: string (nullable = true)
|-- temp: string (nullable = true)
|-- latitude: string (nullable = true)
|-- longitude: string (nullable = true)
|-- stnelev: string (nullable = true)
|-- station: string (nullable = true)
|-- countryid: string (nullable = true)
|-- country: string (nullable = true)
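As an alternative to fixing types after the fact, we could ask the CSV reader to infer types, or supply an explicit schema when loading. Both are standard features of the Spark CSV reader; the column types below are guesses based on the schema shown above, so treat this as a sketch rather than the approach used in the rest of the recipe.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# option 1: let Spark scan the file and guess the types (an extra pass over the data)
landtemps_inferred = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .csv("data/landtemps.csv")

# option 2: spell out the types we expect
tempschema = StructType([
    StructField("locationid", StringType()),
    StructField("year", IntegerType()),
    StructField("month", IntegerType()),
    StructField("temp", FloatType()),
    StructField("latitude", FloatType()),
    StructField("longitude", FloatType()),
    StructField("stnelev", FloatType()),
    StructField("station", StringType()),
    StructField("countryid", StringType()),
    StructField("country", StringType()),
])
landtemps_typed = spark.read \
    .option("header", True) \
    .schema(tempschema) \
    .csv("data/landtemps.csv")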
- Let’s look at the data for a few rows. We can choose a subset of the
columns by using the select method.
landtemps.select("station",'country','month','year','temp') \.show(5, False)
Output:
+-------+-------------------+-----+----+-----+
|station|country |month|year|temp |
+-------+-------------------+-----+----+-----+
|SAVE |Antigua and Barbuda|1 |1961|-0.85|
|SAVE |Antigua and Barbuda|1 |1962|1.17 |
|SAVE |Antigua and Barbuda|1 |1963|-7.09|
|SAVE |Antigua and Barbuda|1 |1964|0.66 |
|SAVE |Antigua and Barbuda|1 |1965|0.48 |
+-------+-------------------+-----+----+-----+
only showing top 5 rows
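While select picks columns, the filter method (also available as where) picks rows. A minimal sketch using the columns shown above; the country and year values are only illustrative, and note that year is still a string at this point, so we compare it to a string.
# keep only the rows for one country and a single year
landtemps.filter((landtemps.country == "Antigua and Barbuda") &
                 (landtemps.year == "1961")) \
    .select("station", "country", "month", "year", "temp") \
    .show(5, False)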
We should fix the data type of the temp column. We can use the
withColumn method to do a range of column operations in Spark. Here we use it to cast the temp column to float.
landtemps = landtemps \
  .withColumn("temp", landtemps.temp.cast('float'))
landtemps.select("temp").dtypes
Output:
[('temp', 'float')]
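The coordinate columns (latitude, longitude, and stnelev) were also read as strings. Assuming we want them as floats as well, the same withColumn and cast pattern works in a short loop; this is a sketch rather than a step the rest of the recipe depends on.
# cast the remaining numeric columns from string to float
for colname in ["latitude", "longitude", "stnelev"]:
    landtemps = landtemps.withColumn(colname, landtemps[colname].cast("float"))

landtemps.select("latitude", "longitude", "stnelev").dtypes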
- Now we can run summary statistics on the temp variable. We can use
the describe method for that.
landtemps.describe('temp').show()
Output:
+-------+------------------+
|summary| temp|
+-------+------------------+
| count| 14461547|
| mean|10.880725773138536|
| stddev|11.509636369381685|
| min| -75.0|
| max| 42.29|
+-------+------------------+
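If we want percentiles in addition to the statistics describe reports, the DataFrame summary method (available in Spark 2.3 and later) adds quartiles, and the aggregate functions in pyspark.sql.functions let us compute specific statistics. A sketch, assuming those versions of the API:
from pyspark.sql import functions as F

# summary adds quartiles to what describe reports
landtemps.select("temp").summary().show()

# or compute specific statistics with aggregate functions
landtemps.agg(F.mean("temp").alias("meantemp"),
              F.expr("percentile_approx(temp, 0.5)").alias("mediantemp")).show()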
- The Spark session's read method can import a variety of data
files, not just CSV files. Let's try that with the allcandidatenews JSON
file that we worked with earlier in this section.
allcandidatenews = spark.read \
.json("data/allcandidatenewssample.json")
allcandidatenews \
  .select("source", "title", "story_position") \
  .show(5)
Output:
+--------------------+--------------------+--------------+
|              source|               title|story_position|
+--------------------+--------------------+--------------+
|            NBC News|Bloomberg cuts ti...|             6|
|Town & Country Ma...|Democratic Candid...|             3|
|                null|                null|          null|
|             TheHill|Sanders responds ...|             7|
|            CNBC.com|From Andrew Yang'...|             2|
+--------------------+--------------------+--------------+
only showing top 5 rows
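The third row in the output above is null in every displayed column. If we want to exclude rows like that, the DataFrame dropna method can do it; a sketch, with the column list chosen only for this example:
# drop rows where all of the selected columns are null
allcandidatenews \
    .dropna(how="all", subset=["source", "title", "story_position"]) \
    .select("source", "title", "story_position") \
    .show(5)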
- We can use the count and printSchema methods again to look at our
data.
allcandidatenews.count()
Output:
60000
allcandidatenews.printSchema()
Output:
root
|-- category: string (nullable = true)
|-- date: string (nullable = true)
|-- domain: string (nullable = true)
|-- panel_position: string (nullable = true)
|-- query: string (nullable = true)
|-- reason: string (nullable = true)
|-- source: string (nullable = true)
|-- story_position: long (nullable = true)
|-- time: string (nullable = true)
|-- title: string (nullable = true)
|-- url: string (nullable = true)
- We can also generate some summary statistics on the story_position
variable.
allcandidatenews \
  .describe('story_position') \
  .show()
Output:
+-------+-----------------+
|summary| story_position|
+-------+-----------------+
| count| 57618|
| mean|5.249626852719636|
| stddev|2.889001922195635|
| min| 1|
| max| 10|
+-------+-----------------+
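We can also break these statistics out by group, for example by news source, using groupBy and agg. A sketch; the alias names are only illustrative.
from pyspark.sql import functions as F

# average story position and number of stories for each source, largest sources first
allcandidatenews \
    .groupBy("source") \
    .agg(F.count("story_position").alias("stories"),
         F.avg("story_position").alias("avgposition")) \
    .orderBy(F.desc("stories")) \
    .show(5)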
These steps demonstrate how to import data files into a Spark DataFrame
and then view the structure of the data and generate summary statistics.
How It Works
The PySpark API significantly reduces the amount of work Python
programmers have to do to use Apache Spark to handle large data files. The
methods we get are not very different from those we use with pandas
DataFrames: we can see the number of rows and columns, examine and
change data types, and get summary statistics.
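To make that comparison concrete, here are a few of the Spark calls from this recipe with their rough pandas counterparts noted in the comments; the pandas expressions are the usual idioms, not code from this recipe.
landtemps.count()                           # pandas: len(df) or df.shape[0]
landtemps.dtypes                            # pandas: df.dtypes
landtemps.select("temp").describe().show()  # pandas: df[["temp"]].describe()
landtemps.select("temp").show(5)            # pandas: df[["temp"]].head()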
There’s More
At some point in our analysis we might want to convert the Spark DataFrame
into a pandas DataFrame. This is a fairly expensive process, and we lose
the benefits of working with Spark, so we typically will not do it unless we
are at the point in our analysis where we require the pandas library or a
library that depends on pandas. But when we need to move to pandas, it is
very easy to do, though if you are working with a lot of data and your
machine's processor and RAM are not exactly top of the line, you might want
to start the conversion and then go have some tea or coffee. The following
code converts the allcandidatenews Spark DataFrame that we created to a
pandas DataFrame and displays the resulting DataFrame structure.
allcandidatenewsdf = allcandidatenews.toPandas()
allcandidatenewsdf.info()
Output:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 60000 entries, 0 to 59999
Data columns (total 11 columns):
 #   Column          Non-Null Count  Dtype
---  ------          --------------  -----
 0   category        416 non-null    object
 1   date            60000 non-null  object
 2   domain          57618 non-null  object
 3   panel_position  57618 non-null  object
 4   query           57618 non-null  object
 5   reason          2382 non-null   object
 6   source          57618 non-null  object
 7   story_position  57618 non-null  float64
 8   time            57618 non-null  object
 9   title           57618 non-null  object
 10  url             57618 non-null  object
dtypes: float64(1), object(10)
memory usage: 5.0+ MB
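If the conversion is slow, Spark can use Apache Arrow to speed up toPandas. The property name below is the Spark 3 spelling (earlier versions used spark.sql.execution.arrow.enabled), and pyarrow must be installed; treat this as a sketch under those assumptions. The reverse conversion, from pandas back to Spark, is just as short.
# enable Arrow-based transfers before calling toPandas (requires pyarrow)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
allcandidatenewsdf = allcandidatenews.toPandas()

# build a Spark DataFrame from a pandas DataFrame
allcandidatenews_sp = spark.createDataFrame(allcandidatenewsdf)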
We have been largely working with non-traditional data stores in this section: JSON files, data from HTML pages, and Spark DataFrames. Often we reach a point
in our data cleaning work where it makes sense to preserve the results of that
cleaning by persisting the data. At the end of section 1 we examined how to
persist tabular data. That works fine in cases where our data can be captured
well with columns and rows. When it cannot, say when we are working with
a JSON file that has complicated subdocuments, we might want to preserve
that structure when persisting the data. In the next recipe we go over persisting
JSON data.
Conclusion
In conclusion, working with Spark data unlocks immense potential for handling large-scale data processing with speed and efficiency. By getting ready and understanding the fundamental steps, you can smoothly navigate the setup and execution of Spark, empowering your business with data-driven insights. The "how to do it" and "how it works" sections serve as a practical guide to using Spark's distributed computing power, making complex tasks simpler and more accessible. But the journey does not end here: there is always more to explore with Spark, from advanced optimizations to integrations with other big data tools. To expand your knowledge further, check out additional resources and deepen your expertise in this powerful technology that continues to change the way businesses handle data.