# Using the SplittableGZipCodec in Apache Spark

## Thanks!

Thanks to Nicholas Chammas for contributing this documentation.

## Important

The current implementation of Spark (3.0.1) for creating file splits DOES NOT (yet) have an option to ensure a minimum split size. An important consequence is that you will sometimes get an error because the last split of your file is too small. The error looks like this:

```
java.lang.IllegalArgumentException: The provided InputSplit (562600;562687] is 87 bytes which is too small. (Minimum is 65536)
```

The problem here is that the size of the last split is really `${fileSize} % ${maxSplitBytes}`.

By setting `spark.sql.files.maxPartitionBytes` to 1 byte below the size of a test file, I was even able to get a split of only 1 byte.

If you run into such a situation, I recommend experimenting with the `spark.sql.files.maxPartitionBytes` setting to ensure that the remainder of this division stays within the valid range.
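As a rough illustration (a sketch only, not part of the library), the check below reuses the numbers from the error message above: the size of the last split follows directly from the file size and the chosen split size, and the 65536-byte minimum is the value reported in that error.

```python
# Sketch: estimate the size of the last split for a given split size and
# check it against the 65536-byte minimum reported in the error above.
MIN_SPLIT_BYTES = 65536

def last_split_size(file_size: int, max_split_bytes: int) -> int:
    """The last split is ${fileSize} % ${maxSplitBytes}, or a full split if it divides evenly."""
    remainder = file_size % max_split_bytes
    return remainder if remainder != 0 else max_split_bytes

# Numbers from the example error: a 562687-byte file split at 562600 bytes
# leaves a final split of only 87 bytes, well below the minimum.
print(last_split_size(562687, 562600))                     # 87
print(last_split_size(562687, 562600) >= MIN_SPLIT_BYTES)  # False
```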

This is by no means a real solution, but at this point in time it seems to be the only option available.

I have submitted a request/proposal for a proper solution to the Spark project: https://issues.apache.org/jira/browse/SPARK-33534

## Common problem for Spark users

Apparently the fact that GZipped files are not splittable is a recurring problem in the Spark arena as well, as you can see in this Spark Jira ticket and in these two StackOverflow questions: Question 1 and Question 2.

It turns out that this library works with Apache Spark without modification.

## Using it

Here is an example, which was tested against Apache Spark 2.4.4 using the Python DataFrame API:

```python
# splittable-gzip.py
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        # If you want to change the split size, you need to use this config
        # instead of mapreduce.input.fileinputformat.split.maxsize.
        # I don't think Spark DataFrames offer an equivalent setting for
        # mapreduce.input.fileinputformat.split.minsize.
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        .getOrCreate()
    )

    print(
        spark.read
        # You can also specify this option against the SparkSession.
        .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
        .csv(...)
        .count()
    )
```

Run this script as follows:

```bash
spark-submit --packages "nl.basjes.hadoop:splittablegzip:1.3" splittable-gzip.py
```
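The comment in the example above notes that the codec option can also be specified against the SparkSession itself. Below is a minimal sketch of that variant; it assumes the standard `spark.hadoop.` prefix for passing Hadoop settings through the Spark configuration, and the input path is only a placeholder:

```python
# splittable-gzip-session.py -- hypothetical variant of the example above.
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        # Register the codec once for the whole session (via the spark.hadoop.
        # prefix) instead of repeating .option(...) on every read.
        .config('spark.hadoop.io.compression.codecs',
                'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
        .getOrCreate()
    )

    # Placeholder path; point this at your own gzipped input.
    print(spark.read.csv('/path/to/input.csv.gz').count())
```

It can be submitted with the same `spark-submit --packages "nl.basjes.hadoop:splittablegzip:1.3"` invocation shown above.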

Here's what the Spark UI looks like when running splittable-gzip.py against a 20 GB gzip file on a laptop:

(Image: Spark UI)

You can see in the task list the behavior described in the README: each task reads from the start of the file up to its target split.

Also in the Executor UI you can see every available core running concurrently against this single file:

(Image: Spark Executor UI)