Structs as values in maps not being properly read from Parquet files #372

Inaki-Martin opened this issue Feb 27, 2018 · 0 comments

Given the following example structure:

val childStructType = StructType(
    Field("childFoo", StringType),
    Field("childBar", ArrayType(StringType), nullable = true)
)
val parentStructType = StructType(
    Field("foo", StringType),
    Field("children", MapType(StringType, childStructType))
)

Persisting a row to a Parquet file works fine, but reading the content back returns null values in the child struct's fields. The following is a full class to reproduce the issue:

import io.eels.component.parquet.{ParquetSink, ParquetSource}
import io.eels.datastream.DataStream
import io.eels.schema._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object EelBugReproductionTest extends App {

  private implicit val conf = new Configuration()
  private implicit val fs = FileSystem.get(new Configuration())

  case class Child(foo: String, bar: Array[String]) {
    override def toString: String = {
      s"$foo :: ${bar.mkString(",")}"
    }
  }

  case class Parent(foo: String, children: Map[String, Child]) {
    override def toString: String = {
      s"$foo :: ${children.mkString(" >> ")}"
    }
  }

  val childStructType = StructType(
    Field("foo", StringType),
    Field("bar", ArrayType(StringType))
  )
  val parentStructType = StructType(
    Field("foo", StringType),
    Field("children", MapType(StringType, childStructType))
  )

  def toSeq(c: Child): Seq[Any] = Seq(c.foo, c.bar)

  def toDataStream(p: Parent): DataStream = {
    val values = Seq(p.foo, p.children.mapValues(toSeq))
    DataStream.fromValues(parentStructType, Seq(values))
  }

  def write(p: Parent, path: Path): Unit = {
    if (fs.exists(path)) fs.delete(path, false)
    toDataStream(p).to(ParquetSink(path))
  }

  def toChild(values: Seq[Any]): Child = {
    val foo = values(0).asInstanceOf[String]
    val bar = values(1).asInstanceOf[Vector[String]].toArray
    Child(foo, bar)
  }

  def readParent(path: Path): Parent = {
    val ps = ParquetSource(path)
    val row = ps.toDataStream.head
    val foo = row.values(0).asInstanceOf[String]
    val children = row.values(1).asInstanceOf[Map[String, Seq[Any]]].mapValues(toChild)
    Parent(foo, children)
  }

  val path = new Path("test.pq")
  if (fs.exists(path)) fs.delete(path, false)

  val child1 = Child("foo1", Array("bar11", "bar12"))
  val child2 = Child("foo2", Array("bar21", "bar22"))
  val parent = Parent("foo1", Map("child1" -> child1, "child2" -> child2))

  write(parent, path)
  println(readParent(path))
}

I have managed to fix the issue locally by amending RowReadSupport.scala as per the following diff:

diff --git a/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala b/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
index 7ec2501e..ebdb32a3 100644
--- a/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
+++ b/eel-core/src/main/scala/io/eels/component/parquet/RowReadSupport.scala
@@ -132,11 +132,13 @@ class MapConverter(index: Int,
 
   private val keys = new VectorBuilder()
   private val values = new VectorBuilder()
+  private val keysConverter = Converter(mapType.keyType, false, -1, keys)
+  private val valuesConverter = Converter(mapType.valueType, false, -1, values)
 
-  override def getConverter(fieldIndex: Int): Converter = new GroupConverter {
-    case 0 => Converter(mapType.keyType, false, -1, keys)
-    case 1 => Converter(mapType.valueType, false, -1, values)
+  override def getConverter(fieldIndex: Int): Converter = fieldIndex match {
+    case 0 => keysConverter
+    case 1 => valuesConverter
   }
   override def start(): Unit = ()
   override def end(): Unit = () // a no-op as each nested group only contains a single element and we want to handle the finished list
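For context on why caching the converters matters, here is a dependency-free sketch of the pattern behind the fix. The stub names (`VectorBuilderStub`, `FieldConverterStub`, `CachedGroupConverter`) are my own illustrations, not the real eel or parquet-mr types: the point is that `getConverter` should hand back a stable, pre-built instance rather than allocating a fresh converter on every call, since the reader invokes it repeatedly while decoding and any per-instance state in a freshly allocated converter would be lost.

```scala
// Sketch only: stub types standing in for eel's VectorBuilder and converters.
object ConverterCachingSketch extends App {

  // Stands in for eel's VectorBuilder: accumulates decoded values.
  class VectorBuilderStub {
    private val buf = scala.collection.mutable.ArrayBuffer.empty[Any]
    def add(v: Any): Unit = buf += v
    def result: Vector[Any] = buf.toVector
  }

  // Stands in for a primitive converter that writes into a builder.
  class FieldConverterStub(builder: VectorBuilderStub) {
    def addValue(v: Any): Unit = builder.add(v)
  }

  // Mirrors the fixed MapConverter: each child converter is built once
  // (like the cached keysConverter / valuesConverter vals in the diff)
  // and the same instance is returned on every getConverter call.
  class CachedGroupConverter(keys: VectorBuilderStub, values: VectorBuilderStub) {
    private val keysConverter = new FieldConverterStub(keys)
    private val valuesConverter = new FieldConverterStub(values)
    def getConverter(fieldIndex: Int): FieldConverterStub = fieldIndex match {
      case 0 => keysConverter
      case 1 => valuesConverter
    }
  }

  val gc = new CachedGroupConverter(new VectorBuilderStub, new VectorBuilderStub)
  // The same instance comes back across repeated calls, so state survives.
  assert(gc.getConverter(0) eq gc.getConverter(0))
  assert(gc.getConverter(1) eq gc.getConverter(1))
  println("converters are reused across calls")
}
```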

I'd appreciate it if you could take a look at both the issue and my amendment, and merge it if it makes sense to you. Thanks.

Regards,
Iñaki

@Inaki-Martin Inaki-Martin changed the title Structs as values in maps not being properly read from Spark files Structs as values in maps not being properly read from Parquet files Feb 27, 2018