mapPartitions in a PySpark Dataframe

Carlos Gameiro
2 min read · Nov 22, 2021


It’s now possible to apply map_partitions directly to a PySpark dataframe, instead of an RDD. The API is very similar to the one in Python’s Dask library.

This functionality is especially useful for taking advantage of the performance of vectorized functions, when multiple columns need to be accessed, or when the output of the process is a dataframe instead of a single column or scalar.

This can be achieved with PySpark’s Pandas UDFs, mainly the applyInPandas method. The problem is that this method must be applied to grouped data, and for some datasets this might not make sense, since there might be no natural way to create even groups.

To match the functionality of Dask’s map_partitions, a quick solution is to group the data by the partition it already belongs to.

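This can be achieved through the spark_partition_id function (here pyspark.sql.functions is imported as f). Note that applyInPandas also requires the schema of the dataframe returned by the function, shown here as output_schema:

df.groupBy(f.spark_partition_id()).applyInPandas(custom_pandas_udf, schema=output_schema)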
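
To make the pattern concrete, here is a minimal sketch. The helper name `map_partitions` and the example function `add_row_count` are my own names for illustration, not part of PySpark; the only PySpark calls used are `groupBy`, `spark_partition_id` and `applyInPandas`.

```python
import pandas as pd


def add_row_count(pdf: pd.DataFrame) -> pd.DataFrame:
    """Example partition function: receives one partition as a pandas DataFrame."""
    out = pdf.copy()
    out["rows_in_partition"] = len(out)
    return out


def map_partitions(df, func, schema):
    """Apply `func` once per existing Spark partition of `df` (hypothetical helper).

    `schema` describes the DataFrame returned by `func`; applyInPandas requires it.
    """
    # Imported lazily so the pandas function above can be tried without Spark.
    from pyspark.sql import functions as f

    return df.groupBy(f.spark_partition_id()).applyInPandas(func, schema=schema)
```

For a dataframe with a single `id` column of type long, this could be called as `map_partitions(df, add_row_count, "id long, rows_in_partition long")`. Keep in mind that the groupBy introduces a shuffle, and that applyInPandas loads each group (here, each partition) fully into memory, so partitions must be small enough to fit.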

Example

For this example I’m going to use an airport dataset from OurAirports.com.

Each airport has a latitude and longitude in WGS84 coordinates. I’m going to select only airports in the US, perform a cartesian product (cross join) to get all pairs of airports (approximately 783 million), and then calculate the great-circle distance of each pair with the Haversine formula using a vectorized NumPy function.

Disclaimer: the same result could probably be achieved faster using PySpark’s native functions. But the purpose of this example is to showcase the potential of Pandas UDFs, mapPartitions and vectorized functions.

1. Preprocessing

Download dataset, cast data types, filter airports in the US, save as Delta.
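
A rough sketch of what this step could look like follows. The download URL, the column names (`ident`, `iso_country`, `latitude_deg`, `longitude_deg`), the quoting style of the CSV and the function name `preprocess_airports` are assumptions on my part, so check them against the current OurAirports export before relying on them. It also assumes `csv_path` is readable by Spark and that the session is configured with Delta Lake.

```python
import urllib.request

# Assumed location of the OurAirports CSV export and the columns kept from it.
AIRPORTS_URL = "https://davidmegginson.github.io/ourairports-data/airports.csv"
COLUMN_TYPES = {"ident": "string", "latitude_deg": "double", "longitude_deg": "double"}


def preprocess_airports(spark, csv_path, delta_path, country="US"):
    """Download the CSV, keep one country, cast the needed columns, save as Delta."""
    # Imported lazily: only needed when this actually runs on Spark.
    from pyspark.sql import functions as f

    urllib.request.urlretrieve(AIRPORTS_URL, csv_path)

    # Without schema inference every column is read as a string.
    # escape='"' assumes quotes inside fields are escaped by doubling them.
    raw = spark.read.csv(csv_path, header=True, quote='"', escape='"')

    airports = raw.filter(f.col("iso_country") == country).select(
        [f.col(name).cast(dtype) for name, dtype in COLUMN_TYPES.items()]
    )

    # Requires a Spark session with Delta Lake available.
    airports.write.format("delta").mode("overwrite").save(delta_path)
    return airports
```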

2. Cross join

Before performing the cross join it is necessary to change the number of partitions manually. Spark does not change this value dynamically because it cannot know how many rows the final dataframe will have without triggering the computation and counting each dataframe.

27 984 airports ^ 2 / 120 partitions ≈ 6 525 868 rows per partition
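
The arithmetic above, together with one possible way of fixing the partition count before the join, can be sketched as follows. The function names and the output column names (`origin`, `destination`, `lat1`, `lon1`, `lat2`, `lon2`) are hypothetical. The broadcast hint is my own choice, made on the assumption that one side of the join is small enough to broadcast so that the result keeps the partitioning of the repartitioned side; it is not necessarily how the original notebook did it.

```python
def rows_per_partition(n_left, n_right, n_partitions):
    """Rows per partition of a cross join, assuming the pairs spread evenly."""
    return n_left * n_right // n_partitions


def cross_join_pairs(airports, n_partitions=120):
    """Cross join `airports` with itself, fixing the partition count beforehand."""
    # Imported lazily so the arithmetic helper works without Spark installed.
    from pyspark.sql import functions as f

    left = airports.select(
        f.col("ident").alias("origin"),
        f.col("latitude_deg").alias("lat1"),
        f.col("longitude_deg").alias("lon1"),
    )
    right = airports.select(
        f.col("ident").alias("destination"),
        f.col("latitude_deg").alias("lat2"),
        f.col("longitude_deg").alias("lon2"),
    )
    # Repartition the left side manually, then pair every left row with every
    # row of the (small, broadcast) right side.
    return left.repartition(n_partitions).crossJoin(f.broadcast(right))
```

With the numbers from this example, `rows_per_partition(27984, 27984, 120)` gives 6525868.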

3. mapPartitions
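
Here is a minimal sketch of this step, assuming the cross-joined dataframe has the columns `lat1`, `lon1`, `lat2` and `lon2` in degrees. Those column names, the function names and the Earth radius of 6371 km are assumptions of this sketch rather than details taken from the original notebook.

```python
import numpy as np
import pandas as pd

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption of this sketch


def haversine_km(lat1, lon1, lat2, lon2):
    """Vectorized great-circle distance in km between points given in degrees."""
    lat1, lon1, lat2, lon2 = (
        np.radians(np.asarray(x, dtype="float64")) for x in (lat1, lon1, lat2, lon2)
    )
    a = (
        np.sin((lat2 - lat1) / 2) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2
    )
    # Clip to guard against rounding slightly outside [0, 1] before the sqrt/arcsin.
    return 2 * EARTH_RADIUS_KM * np.arcsin(np.sqrt(np.clip(a, 0.0, 1.0)))


def add_distance(pdf: pd.DataFrame) -> pd.DataFrame:
    """Partition function: adds a `distance_km` column to one partition."""
    out = pdf.copy()
    out["distance_km"] = haversine_km(out["lat1"], out["lon1"], out["lat2"], out["lon2"])
    return out


def with_distances(pairs):
    """Run `add_distance` once per existing Spark partition of `pairs`."""
    # Imported lazily so the NumPy/pandas part can be tried without Spark.
    from pyspark.sql import functions as f
    from pyspark.sql.types import DoubleType, StructField, StructType

    # Output schema: the input columns plus the new distance column.
    schema = StructType(pairs.schema.fields + [StructField("distance_km", DoubleType())])
    return pairs.groupBy(f.spark_partition_id()).applyInPandas(add_distance, schema=schema)
```

As a quick sanity check, one degree of latitude along a meridian should come out at roughly 111 km, which is what `haversine_km(0, 0, 1, 0)` returns with this radius.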

• It takes less than 7 minutes to calculate 783 million distances on a 2-node cluster.

Preview of the result.
