Typed Encoders in Frameless

Spark uses reflection to derive its Encoders, which is why they can fail at runtime. For example, because Spark has no built-in Encoder for java.util.Date, the following leads to an error:

import org.apache.spark.sql.Dataset
import spark.implicits._

case class DateRange(s: java.util.Date, e: java.util.Date)
scala> val ds: Dataset[DateRange] = Seq(DateRange(new java.util.Date, new java.util.Date)).toDS()
java.lang.UnsupportedOperationException: No Encoder found for java.util.Date
- field (class: "java.util.Date", name: "s")
- root class: "DateRange"
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:591)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:432)
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$6(ScalaReflection.scala:577)
  at scala.collection.immutable.List.map(List.scala:293)
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerFor$1(ScalaReflection.scala:562)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:432)
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$serializerForType$1(ScalaReflection.scala:421)
  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:904)
  at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:903)
  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerForType(ScalaReflection.scala:413)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:56)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:285)
  at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:251)
  at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:251)
  at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
  ... 42 elided

As the stack trace shows, the error is raised at runtime by ScalaReflection while it tries to derive an Encoder for the Dataset's schema. Besides the annoyance of not detecting this error at compile time, a more important limitation of the reflection-based approach is that it cannot be extended to custom types. See the Stack Overflow question "How to store custom objects in a Dataset?" for a summary of the situation in vanilla Spark (as of 2.0).
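For comparison, the usual escape hatch in vanilla Spark, and an approach discussed in that Stack Overflow question, is a generic serializer such as Encoders.kryo. The sketch below assumes spark-sql is on the classpath; the object name KryoFallback is illustrative only. It compiles and avoids the reflection error, but each object is stored as a single binary column, so Spark can no longer see the individual fields.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

case class DateRange(s: java.util.Date, e: java.util.Date)

object KryoFallback {
  // Kryo-based encoder: each DateRange is serialized as one opaque binary blob,
  // so this works at runtime, but s and e are no longer separate, queryable columns.
  val dateRangeEncoder: Encoder[DateRange] = Encoders.kryo[DateRange]

  def main(args: Array[String]): Unit = {
    // Building the encoder does not need a SparkSession; print the resulting schema.
    println(dateRangeEncoder.schema.treeString)

    // With a SparkSession named spark, the encoder can be passed explicitly:
    // spark.createDataset(Seq(DateRange(new java.util.Date, new java.util.Date)))(dateRangeEncoder)
  }
}
```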

Frameless introduces a new type class called TypedEncoder to solve these issues. TypedEncoders are passed around as implicit parameters to every Frameless method to ensure that the data being manipulated is encodable. Frameless uses standard implicit resolution coupled with shapeless' type class derivation mechanism to ensure that all code that compiles manipulates encodable data. For example, the java.util.Date example won't compile with Frameless:

import frameless.TypedDataset
import frameless.syntax._
// TypedDataset.create also requires an implicit SparkSession in scope
val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(new java.util.Date, new java.util.Date)))
// <console>:28: error: could not find implicit value for parameter encoder: frameless.TypedEncoder[DateRange]
//        val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(new java.util.Date, new java.util.Date)))
//                                                             ^
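The mechanism can be illustrated with a deliberately simplified, dependency-free sketch. This is not Frameless' implementation: ToyEncoder and ToyDataset are hypothetical stand-ins for TypedEncoder and TypedDataset, and the schema strings are made up. What it does show is the key idea: create demands an implicit encoder, so calling it on a type that has no instance is rejected by the compiler instead of failing at runtime.

```scala
// Toy model of an encoder type class gating dataset creation (hypothetical, not Frameless' API).
trait ToyEncoder[A] {
  def schema: String // textual description of the column type
}

object ToyEncoder {
  // Summoner: ToyEncoder[Int] returns the implicit instance found by the compiler.
  def apply[A](implicit enc: ToyEncoder[A]): ToyEncoder[A] = enc

  private def instance[A](s: String): ToyEncoder[A] =
    new ToyEncoder[A] { val schema: String = s }

  // Instances for a few base types; there is deliberately none for java.util.Date.
  implicit val intEncoder: ToyEncoder[Int] = instance("int")
  implicit val doubleEncoder: ToyEncoder[Double] = instance("double")
  implicit val stringEncoder: ToyEncoder[String] = instance("string")
}

// Stand-in for TypedDataset: its only constructor requires an encoder for A.
final class ToyDataset[A] private (val data: Seq[A], val schema: String)

object ToyDataset {
  def create[A](data: Seq[A])(implicit encoder: ToyEncoder[A]): ToyDataset[A] =
    new ToyDataset(data, encoder.schema)
}

object ToyEncoderDemo {
  def main(args: Array[String]): Unit = {
    val ints = ToyDataset.create(Seq(1, 2, 3))
    println(ints.schema) // prints: int

    // Does not compile: no implicit ToyEncoder[java.util.Date] can be found.
    // ToyDataset.create(Seq(new java.util.Date))
  }
}
```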

Type class derivation takes care of recursively constructing (and proving the existence of) TypedEncoders for case classes. The following works as expected:

case class Bar(d: Double, s: String)
// defined class Bar

case class Foo(i: Int, b: Bar)
// defined class Foo

val ds: TypedDataset[Foo] = TypedDataset.create(Seq(Foo(1, Bar(1.1, "s"))))
// ds: frameless.TypedDataset[Foo] = [i: int, b: struct<d: double, s: string>]

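ds.collect()
// res1: frameless.Job[Seq[Foo]] = frameless.Job$$anon$4@77bca0d3
// collect() only builds a lazy frameless.Job; call ds.collect().run() to execute it and obtain the Seq[Foo]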

But any non-encodable type anywhere in the case class hierarchy is detected at compile time:

case class BarDate(d: Double, s: String, t: java.util.Date)
case class FooDate(i: Int, b: BarDate)
val ds: TypedDataset[FooDate] = TypedDataset.create(Seq(FooDate(1, BarDate(1.1, "s", new java.util.Date))))
// <console>:30: error: could not find implicit value for parameter encoder: frameless.TypedEncoder[FooDate]
//        val ds: TypedDataset[FooDate] = TypedDataset.create(Seq(FooDate(1, BarDate(1.1, "s", new java.util.Date))))
//                                                           ^
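The recursive derivation can be mimicked in the same toy setting. Frameless derives instances generically via shapeless; to stay dependency-free, this sketch instead writes one implicit def per case class by hand (ToyEncoder and the struct<...> strings are again hypothetical). Each case class instance demands instances for all of its fields, so resolving ToyEncoder[Foo] recursively resolves ToyEncoder[Bar], and the missing instance for java.util.Date below FooDate makes ToyEncoder[FooDate] fail to compile. The WithDateSupport object shows that, in this toy, supplying the missing instance (a hypothetical mapping of dates to a bigint column) is enough for derivation to succeed; Frameless' own mechanism for custom types is not modeled here.

```scala
// Toy model of recursive, compile-time encoder derivation (hypothetical, not Frameless' API).
trait ToyEncoder[A] {
  def schema: String
}

object ToyEncoder {
  def apply[A](implicit enc: ToyEncoder[A]): ToyEncoder[A] = enc

  def instance[A](s: String): ToyEncoder[A] =
    new ToyEncoder[A] { val schema: String = s }

  // Base instances only; there is deliberately no ToyEncoder[java.util.Date].
  implicit val intEncoder: ToyEncoder[Int] = instance("int")
  implicit val doubleEncoder: ToyEncoder[Double] = instance("double")
  implicit val stringEncoder: ToyEncoder[String] = instance("string")
}

case class Bar(d: Double, s: String)
object Bar {
  // Lives in Bar's companion, so it is in implicit scope for ToyEncoder[Bar].
  // It can only be used if instances for every field type can be found.
  implicit def encoder(implicit dEnc: ToyEncoder[Double], sEnc: ToyEncoder[String]): ToyEncoder[Bar] =
    ToyEncoder.instance(s"struct<d: ${dEnc.schema}, s: ${sEnc.schema}>")
}

case class Foo(i: Int, b: Bar)
object Foo {
  // Asking for ToyEncoder[Bar] here makes the compiler recurse into Bar.encoder.
  implicit def encoder(implicit iEnc: ToyEncoder[Int], bEnc: ToyEncoder[Bar]): ToyEncoder[Foo] =
    ToyEncoder.instance(s"struct<i: ${iEnc.schema}, b: ${bEnc.schema}>")
}

case class BarDate(d: Double, s: String, t: java.util.Date)
object BarDate {
  implicit def encoder(
      implicit dEnc: ToyEncoder[Double],
      sEnc: ToyEncoder[String],
      tEnc: ToyEncoder[java.util.Date]
  ): ToyEncoder[BarDate] =
    ToyEncoder.instance(s"struct<d: ${dEnc.schema}, s: ${sEnc.schema}, t: ${tEnc.schema}>")
}

case class FooDate(i: Int, b: BarDate)
object FooDate {
  implicit def encoder(implicit iEnc: ToyEncoder[Int], bEnc: ToyEncoder[BarDate]): ToyEncoder[FooDate] =
    ToyEncoder.instance(s"struct<i: ${iEnc.schema}, b: ${bEnc.schema}>")
}

// Hypothetical extension: with a ToyEncoder[java.util.Date] in scope, FooDate derives too.
object WithDateSupport {
  implicit val dateEncoder: ToyEncoder[java.util.Date] = ToyEncoder.instance("bigint")
  val fooDateSchema: String = ToyEncoder[FooDate].schema
}

object NestedDerivationDemo {
  def main(args: Array[String]): Unit = {
    println(ToyEncoder[Foo].schema) // prints: struct<i: int, b: struct<d: double, s: string>>

    // Would not compile here: ToyEncoder[FooDate] needs ToyEncoder[BarDate],
    // which needs a ToyEncoder[java.util.Date] that is not in scope.
    // ToyEncoder[FooDate]

    println(WithDateSupport.fooDateSchema) // prints: struct<i: int, b: struct<d: double, s: string, t: bigint>>
  }
}
```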

It should be noted that once derived, reflection-based Encoders and implicitly derived TypedEncoders have identical performance. The derivation mechanism is different, but the objects generated to convert JVM objects to and from Spark's internal representation behave the same at runtime.
