We’ve spoken a lot about on-disk and distributed storage, as well as blocks. All of this theory is great, but let’s see how it works in practice. Here’s what we’re going to do:
We’ll read a CSV dataset into Spark
We’ll write the dataset into 5 Parquet files (treating each file as a block)
We’re going to have 1 row group per block for simplicity’s sake.
We’ll introspect some of the metadata that exists on the files.
We’ll run queries that show the power of predicate pushdown.
You can follow along using the commands below.
Hands-On: Setup
I’m first going to pick a random example dataset.
$ wget https://raw.githubusercontent.com/curran/data/gh-pages/vegaExamples/airports.csv -O dataset.csv
$ head dataset.csv
iata,name,city,state,country,latitude,longitude
00M,Thigpen,Bay Springs,MS,USA,31.95376472,-89.23450472
00R,Livingston Municipal,Livingston,TX,USA,30.68586111,-95.01792778
00V,Meadow Lake,Colorado Springs,CO,USA,38.94574889,-104.5698933
01G,Perry-Warsaw,Perry,NY,USA,42.74134667,-78.05208056
01J,Hilliard Airpark,Hilliard,FL,USA,30.6880125,-81.90594389
01M,Tishomingo County,Belmont,MS,USA,34.49166667,-88.20111111
02A,Gragg-Wade,Clanton,AL,USA,32.85048667,-86.61145333
02C,Capitol,Brookfield,WI,USA,43.08751,-88.17786917
02G,Columbiana County,East Liverpool,OH,USA,40.67331278,-80.64140639
Next, I’m going to open my Spark shell and read this data in, inferring its schema.
$ ./spark-shell
scala> val dataset = spark.read.option("header","true").option("inferSchema","true").csv("dataset.csv")
dataset: org.apache.spark.sql.DataFrame = [iata: string, name: string ... 5 more fields]
And let’s check out the inferred schema. We can see that Spark has inferred the data type of the latitude and longitude columns as doubles.
scala> dataset.printSchema
root
|-- iata: string (nullable = true)
|-- name: string (nullable = true)
|-- city: string (nullable = true)
|-- state: string (nullable = true)
|-- country: string (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
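Incidentally, if you’d rather not rely on schema inference (it costs an extra pass over the CSV), you can hand Spark an explicit schema instead. A minimal sketch, where datasetExplicit is just an illustrative name:
scala> import org.apache.spark.sql.types._
scala> val schema = StructType(Seq(
     |   StructField("iata", StringType), StructField("name", StringType),
     |   StructField("city", StringType), StructField("state", StringType),
     |   StructField("country", StringType), StructField("latitude", DoubleType),
     |   StructField("longitude", DoubleType)))
scala> val datasetExplicit = spark.read.option("header", "true").schema(schema).csv("dataset.csv")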
For the purposes of this example, I’m going to force our hand a bit and write this dataset out as 5 Parquet files. Because the files are written in Parquet, extra metadata will be added to each file that gives readers information about its contents.
scala> dataset.repartition(5).write.parquet("/root/parquet_dataset")
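With a dataset this small, each of the 5 files will naturally end up with a single row group, which is exactly the 1-row-group-per-block setup we wanted. If you ever need to control row group size yourself, the Parquet writer reads the parquet.block.size property (in bytes) from the Hadoop configuration; a sketch of setting it before the write (128 MB is the Parquet default, so this particular value changes nothing):
scala> // parquet.block.size sets the target row group size, in bytes
scala> spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 128 * 1024 * 1024)
scala> dataset.repartition(5).write.mode("overwrite").parquet("/root/parquet_dataset")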
Let’s see how these files look on disk.
$ ls /root/parquet_dataset
part-00000-2c88417e-fb93-4b83-8912-0027ade7804c-c000.snappy.parquet
part-00001-2c88417e-fb93-4b83-8912-0027ade7804c-c000.snappy.parquet
part-00002-2c88417e-fb93-4b83-8912-0027ade7804c-c000.snappy.parquet
part-00003-2c88417e-fb93-4b83-8912-0027ade7804c-c000.snappy.parquet
part-00004-2c88417e-fb93-4b83-8912-0027ade7804c-c000.snappy.parquet
_SUCCESS
Let’s ignore the _SUCCESS file for now; it’s mostly just a flag indicating that our write job completed successfully (and yes, it’s very possible for write jobs to fail at times).
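As a side note, the _SUCCESS marker is written by Hadoop’s FileOutputCommitter, and you can ask it not to; this is a standard Hadoop setting rather than anything Parquet-specific:
scala> // Tell the Hadoop output committer not to write _SUCCESS marker files
scala> spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")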
Unfortunately, we can’t just open these files with something like vim or emacs, given that they are in the binary Parquet file format.
To aid in our investigative process, we’re going to install two command-line tools that will help us introspect Parquet data.
pip3 install parquet-tools
pip3 install parquet-metadata
Now, let’s look at one of the files. Note: if you are following along at home, the names of your part files may differ from mine above.
$ parquet-tools show part-00000-53b27d15-b049-41db-a8aa-fa3033763836-c000.snappy.parquet
You should see the data contained in this file nicely printed out.
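If you’d rather stay inside the Spark shell, you can also read a single part file back directly; the glob below just saves us from typing the full UUID, and your file name will differ:
scala> // Spark accepts glob patterns in read paths
scala> spark.read.parquet("/root/parquet_dataset/part-00000-*.snappy.parquet").show(5)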
At this point, we have 5 Parquet files (5 blocks), each containing a distinct subset of our initial dataset.
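To convince ourselves that each file really does hold a distinct subset of the rows, we can read the whole directory back and count rows per file using Spark’s input_file_name function; the five counts should add up to the row count of the original CSV:
scala> import org.apache.spark.sql.functions.input_file_name
scala> val parquetData = spark.read.parquet("/root/parquet_dataset")
scala> // One row per part file, with the number of records it contains
scala> parquetData.groupBy(input_file_name()).count().show(false)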
Hands-On: Query Plans
Let’s query this data and see how Spark plans our queries behind the scenes; the payoff from splitting our data into blocks will come shortly.
Let’s first read our CSV dataset:
scala> val dataset = spark.read.option("header","true").option("inferSchema","true").csv("dataset.csv")
And run a simple filter operation.
scala> val simpleFilter = dataset.filter($"latitude" > 30)
You can see the results with the following command:
scala> simpleFilter.show()
You can see that all of our values for the latitude column are > 30.
Now, let’s dig a little deeper and see what’s going on behind the scenes by asking for the query execution:
scala> simpleFilter.queryExecution
res3: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
'Filter ('latitude > 30)
+- Relation[iata#16,name#17,city#18,state#19,country#20,latitude#21,longitude#22] csv
== Analyzed Logical Plan ==
iata: string, name: string, city: string, state: string, country: string, latitude: double, longitude: double
Filter (latitude#21 > cast(30 as double))
+- Relation[iata#16,name#17,city#18,state#19,country#20,latitude#21,longitude#22] csv
== Optimized Logical Plan ==
Filter (isnotnull(latitude#21) AND (latitude#21 > 30.0))
+- Relation[iata#16,name#17,city#18,state#19,country#20,latitude#21,longitude#22] csv
== Physical Plan ==
*(1) Filter (isnotnull(latitude#21) AND (latitude#21 > 30.0))
+- FileScan csv [iata#16,name#17,city#18,state#19,country...
This seems like a lot, so let me break it down.
The parsed logical plan has our filter condition. So does the analyzed logical plan, but it has resolved our column reference and cast our predicate value to a double (which makes sense, since schema inference has deemed that latitude is a double). Finally, the optimized logical plan has added some null checking, which also matches our predicate.
== Parsed Logical Plan ==
'Filter ('latitude > 30)
...
== Analyzed Logical Plan ==
...
Filter (latitude#21 > cast(30 as double))
...
== Optimized Logical Plan ==
Filter (isnotnull(latitude#21) AND (latitude#21 > 30.0))
...
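By the way, if you don’t want to dig through the QueryExecution object yourself, Dataset.explain(true) prints the same four plans (parsed, analyzed, optimized, physical) straight to the console:
scala> // Prints the parsed, analyzed, optimized and physical plans
scala> simpleFilter.explain(true)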
Let’s make our query a bit more complicated and look for values within a range on the same column.
scala> val complexFilter = dataset.filter($"latitude" > 30).filter($"latitude" < 40)
Once again, let’s look at the query execution:
scala> complexFilter.queryExecution
res4: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
'Filter ('latitude < 40)
+- Filter (latitude#21 > cast(30 as double))
+- Relation[iata#16,name#17,city#18,state#19,country#20,latitude#21,longitude#22] csv
== Analyzed Logical Plan ==
iata: string, name: string, city: string, state: string, country: string, latitude: double, longitude: double
Filter (latitude#21 < cast(40 as double))
+- Filter (latitude#21 > cast(30 as double))
+- Relation[iata#16,name#17,city#18,state#19,country#20,latitude#21,longitude#22] csv
== Optimized Logical Plan ==
Filter ((isnotnull(latitude#21) AND (latitude#21 > 30.0)) AND (latitude#21 < 40.0))
+- Relation[iata#16,name#17,city#18,state#19,country#20,latitude#21,longitude#22] csv
== Physical Plan...
Let’s simplify it again. As you can see below, the optimizer has combined both of our predicates into a single filter step (so what would otherwise be two separate filter operations now happens in one). In other words, we’re looking for values where latitude is > 30 AND < 40 during the same pass over the data.
== Parsed Logical Plan ==
'Filter ('latitude < 40)
+- Filter (latitude#21 > cast(30 as double))
...
== Analyzed Logical Plan ==
...
Filter (latitude#21 < cast(40 as double))
+- Filter (latitude#21 > cast(30 as double))
...
== Optimized Logical Plan ==
Filter ((isnotnull(latitude#21) AND (latitude#21 > 30.0)) AND (latitude#21 < 40.0))
...
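For what it’s worth, you could also express the range as a single filter expression; the optimizer arrives at essentially the same combined predicate either way (rangeFilter and inclusiveFilter are just illustrative names, and note that between is inclusive of its bounds, unlike our strict > and < comparisons):
scala> // Two equivalent ways to express a range filter in one expression
scala> val rangeFilter = dataset.filter($"latitude" > 30 && $"latitude" < 40)
scala> val inclusiveFilter = dataset.filter($"latitude".between(30, 40))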
Let’s exit out of our spark shell and play a bit more with Parquet.
Hands-On: Querying with Parquet
Remember, we have 5 Parquet files. Let’s inspect one of them.
$ parquet-metadata /root/parquet_dataset/part-00000-53b27d15-b049-41db-a8aa-fa3033763836-c000.snappy.parquet
You will get an output that looks like the following.
file created_by parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
file columns 7
file row_groups 1
file rows 675
...
row_group 0 latitude type DOUBLE
row_group 0 latitude num_values 675
row_group 0 latitude compression SNAPPY
row_group 0 latitude encodings BIT_PACKED,PLAIN,RLE
row_group 0 latitude compressed_size 5476
row_group 0 latitude uncompressed_size 5471
row_group 0 latitude stats:min 14.1743075
row_group 0 latitude stats:max 70.46727611
...
The two things I’d like you to focus on for now are the stats:min and stats:max attributes. These contain the minimum and maximum values of the specified column within the specified row_group. That is, the smallest value in that column for that group of rows is equal to stats:min, and the largest value is equal to stats:max.
This information is a huge performance win! If a query’s predicate falls entirely outside of this range, the whole row group can be skipped without even reading its data.
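Spark only takes advantage of these statistics when Parquet filter pushdown is enabled, which it is by default; you can check (or set) the flag from the Spark shell if you want to be sure:
scala> // Parquet filter pushdown is enabled by default
scala> spark.conf.get("spark.sql.parquet.filterPushdown")
scala> spark.conf.set("spark.sql.parquet.filterPushdown", "true")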
Let’s say we have another file with the following metadata.
file created_by parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
file columns 7
file row_groups 1
file rows 675
...
row_group 0 latitude type DOUBLE
row_group 0 latitude num_values 674
row_group 0 latitude compression SNAPPY
row_group 0 latitude encodings BIT_PACKED,PLAIN,RLE
row_group 0 latitude compressed_size 5176
row_group 0 latitude uncompressed_size 5771
row_group 0 latitude stats:min 44.4430157
row_group 0 latitude stats:max 74.46727611
...
Given our earlier range query (where latitude is > 30 and < 40), we can exclude this whole file: its stats:min of roughly 44.44 is already above our upper bound of 40.
scala> val complexFilter = dataset.filter($"latitude" > 30).filter($"latitude" < 40)
In practice, this is called predicate pushdown. The requirements of the predicate (the query) have been pushed down to the file readers, allowing them to look at the metadata on the row groups themselves and decide which row groups need to be read and which can be skipped entirely.
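You can see this from the Spark shell, too. If we run the same range filter against the Parquet directory (rather than the original CSV) and look at the physical plan, the FileScan parquet node should include a PushedFilters entry listing the IsNotNull, GreaterThan, and LessThan predicates on latitude; a quick sketch, assuming the paths from earlier:
scala> val parquetData = spark.read.parquet("/root/parquet_dataset")
scala> // The FileScan node in the physical plan reports which predicates were pushed down
scala> parquetData.filter($"latitude" > 30 && $"latitude" < 40).explain()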
Conclusion
There is a lot of magic that goes into our ability to query data quickly and efficiently. In future posts, we’ll cover some of these pieces in more detail.