
This Spark SQL UDF converts IP addresses to geolocations in a JDK 8 environment. The MaxMind GeoIP database is stored as a local file.

The UDF can be registered as a Spark SQL UDF or called inside a DataFrame's mapPartitions.

Existing projects such as MaxMind GeoIP 2 Java and snowplow's Scala MaxMind require Java 11 or greater, which is not always feasible (certain instances of AWS Glue still require Java 8).

This project provides a Spark UDF that runs on JDK 8.


  • sbt compile to compile the sources
  • sbt assembly to create the assembly jar to distribute on the Spark cluster

Version 0.2

Scala version is set to 2.12.15. Spark version is set to 3.1.1.

Requirements for Spark 3.1.1

  • Spark runs on Java 8/11 and Scala 2.12.
  • For the Scala API, Spark 3.1.1 uses Scala 2.12. You will need to use a compatible Scala version (2.12.x).
  • spark-core has a transitive dependency on hadoop-common 3.2

Version 0.3

Scala version is set to 2.12.15. Spark version is set to 3.3.0.

Requirements for Spark 3.3.0

  • Spark runs on Java 8/11/17, Scala 2.12/2.13, Python 3.7+ and R 3.5+.

  • Support for Java 8 versions prior to 8u201 is deprecated as of Spark 3.2.0.

  • For the Scala API, Spark 3.3.0 uses Scala 2.12. You will need to use a compatible Scala version (2.12.x).

  • If you want to run the local integration test (local Hadoop and Spark), pay attention to the Hadoop version: Hadoop is sensitive to version mismatches between its components.
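
The 8u201 cutoff in the list above can be checked against the `java.version` string a JVM reports. The following is a minimal sketch, assuming the legacy `1.8.0_<update>` format used by Java 8 and treating every other format (for example `11.0.19`) as unaffected. The function name `is_deprecated_java8` is hypothetical and not part of this project.

```python
import re


def is_deprecated_java8(version: str) -> bool:
    """Return True for a Java 8 build older than update 201.

    Assumes Java 8 reports versions such as "1.8.0_191"; any other
    major version (for example "11.0.19") is treated as not affected.
    """
    match = re.fullmatch(r"1\.8\.0(?:_(\d+))?(?:-.*)?", version.strip())
    if match is None:
        return False  # not a Java 8 version string
    update = int(match.group(1) or 0)  # "1.8.0" without a suffix means update 0
    return update < 201
```

For example, `is_deprecated_java8("1.8.0_191")` returns `True`, while `"1.8.0_201"` and `"11.0.19"` both return `False`.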

Test Environment

Note: A JDK 8 environment is required to run the integration test.

  • sbt test to run the local test (no Spark involved)
  • sbt IntegrationTest/testOnly sets up a local Hadoop/Spark cluster (no docker)

The integration test starts a local Hadoop/Spark node. Initializing the mini cluster loads the sample data files in src/it/resources/ips as parquet files and registers the free downloadable MaxMind database with the Spark environment.

if (SystemUtils.IS_OS_WINDOWS)
  startHdfs(new File("c:\\tmp")) // on Windows, use a tmp folder without spaces

copyFromLocal("src/it/resources/ips", "/ips")
val spark: SparkSession = getSpark()


The integration test uses both methods: a lookup inside mapPartitions, and the registered UDF in a SQL query.


df.mapPartitions(iterator => {
  // create the lookup once per partition
  val _lookup: Option[IpCityLookup] = Some(new IpCityLookup(new File(sparkFilename), 1000))
  iterator.map(row => {
    // row.getString(0) is the IP address
    val ipLocationShort: Option[IpLocationShort] = IpCityLookupRegister.ip2location(_lookup, row.getString(0))
    (row.getString(0), ipLocationShort)
  })
})


  // from the table `ips`,  call ip2location in the SQL query 
  val dfWithLocation = spark.sql("""SELECT genericIp, ip2location(genericIp) as location FROM ips""")
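
The mapPartitions method builds the lookup object once per partition rather than once per row, which typically matters because opening a database file is more expensive than a single lookup. The plain-Python sketch below illustrates that pattern without Spark. `FakeLookup`, its hard-coded table, and `lookup_partition` are hypothetical stand-ins and do not reflect the project's `IpCityLookup` API.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple


class FakeLookup:
    """Hypothetical stand-in for a database-backed IP lookup."""

    instances_created = 0  # counts how often the "database" is opened

    def __init__(self) -> None:
        FakeLookup.instances_created += 1
        # Documentation-range addresses mapped to made-up locations.
        self._table: Dict[str, str] = {
            "192.0.2.1": "Example City",
            "198.51.100.7": "Sample Town",
        }

    def ip2location(self, ip: str) -> Optional[str]:
        return self._table.get(ip)  # None when the address is unknown


def lookup_partition(rows: Iterable[str]) -> Iterator[Tuple[str, Optional[str]]]:
    """Mimic the body of a mapPartitions call: one lookup per partition."""
    lookup = FakeLookup()  # created once, reused for every row below
    for ip in rows:
        yield ip, lookup.ip2location(ip)


# Two simulated partitions holding three rows in total.
partitions = [["192.0.2.1", "203.0.113.9"], ["198.51.100.7"]]
results = [pair for part in partitions for pair in lookup_partition(part)]
```

In this sketch, three rows spread over two partitions construct the lookup only twice, and an unknown address yields `None` instead of raising an error.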

Windows: Integration Test on Hadoop Home

Winutils is required when running Hadoop on Windows. Two collections of prebuilt binaries are available: one contains versions from 2.6 to 3.0, the other contains versions from 2.6.1 to 3.3.5 (3.1.0 is the version used in the example below).

Once you have installed the correct version and added it to your Windows PATH, the integration test will run:

REM set environment
set JAVA_HOME=c:\java\jdk-8
set HADOOP_HOME=d:\hadoop\3.1.0

REM compile, test and build assembly
sbt IntegrationTest/test assembly
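
Before running the commands above, it can help to confirm that `HADOOP_HOME` points at a folder containing `bin\winutils.exe`. The check below is a small sketch; the function name `find_winutils` is hypothetical, and it only verifies that the file exists, not that its version matches your Hadoop build.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_winutils(env: Mapping[str, str] = os.environ) -> Optional[Path]:
    """Return the path to winutils.exe under HADOOP_HOME, or None if missing."""
    hadoop_home = env.get("HADOOP_HOME")
    if not hadoop_home:
        return None  # HADOOP_HOME is not set
    candidate = Path(hadoop_home) / "bin" / "winutils.exe"
    return candidate if candidate.is_file() else None
```

Calling `find_winutils()` with no arguments inspects the current process environment; a `None` result means either the variable is unset or the binary is not where Hadoop expects it.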

Python: How to register UDF in pyspark

# add MaxMind Geo file to Spark
...

# register the Scala UDF
...(..., "ip2location", "GeoLite2-City.mmdb")

# read a dataframe in a Spark view
...("data/.../ips").createOrReplaceTempView("ips")

# use the registered UDF in SQL 
spark.sql("SELECT genericIp, ip2location(genericIp) as location FROM ips").show(10, False)