Typed Encoders in Frameless

Spark uses Reflection to derive its Encoders, which is why they can fail at run time. For example, because Spark does not support java.util.Calendar, the following leads to an error:

import java.util.Calendar

import org.apache.spark.sql.Dataset

import spark.implicits._

case class DateRange(s: Calendar, e: Calendar)
def now = new java.util.GregorianCalendar()

val ds: Dataset[DateRange] = Seq(DateRange(now, now)).toDS()
// org.apache.spark.SparkUnsupportedOperationException: [ENCODER_NOT_FOUND] Not found an encoder of the type java.util.Calendar to Spark SQL internal representation. Consider to change the input type to one of supported at 'https://spark.apache.org/docs/latest/sql-ref-datatypes.html'.
// 	at org.apache.spark.sql.errors.ExecutionErrors.cannotFindEncoderForTypeError(ExecutionErrors.scala:172)
// 	at org.apache.spark.sql.errors.ExecutionErrors.cannotFindEncoderForTypeError$(ExecutionErrors.scala:167)
// 	at org.apache.spark.sql.errors.ExecutionErrors$.cannotFindEncoderForTypeError(ExecutionErrors.scala:218)
// 	at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:400)
// 	at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$encoderFor$3(ScalaReflection.scala:394)
// 	at scala.collection.immutable.List.map(List.scala:293)
// 	at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:382)
// 	at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$encoderFor$1(ScalaReflection.scala:247)
// 	at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
// 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:426)
// 	at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:425)
// 	at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:42)
// 	at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:244)
// 	at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:227)
// 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:51)
// 	at org.apache.spark.sql.Encoders$.product(Encoders.scala:315)
// 	at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:264)
// 	at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:264)
// 	at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
// 	at repl.MdocSession$MdocApp0$$anonfun$1.apply$mcV$sp(TypedEncoder.md:49)
// 	at repl.MdocSession$MdocApp0$$anonfun$1.apply(TypedEncoder.md:47)
// 	at repl.MdocSession$MdocApp0$$anonfun$1.apply(TypedEncoder.md:47)

As shown by the stack trace, this runtime error goes through ScalaReflection to try to derive an Encoder for Dataset schema. Beside the annoyance of not detecting this error at compile time, a more important limitation of the reflection-based approach is its inability to be extended for custom types. See this Stack Overflow question for a summary of the current situation (as of 2.0) in vanilla Spark: How to store custom objects in a Dataset?.

Frameless introduces a new type class called TypeEncoder to solve these issues. TypeEncoders are passed around as implicit parameters to every Frameless method to ensure that the data being manipulated is Encoder. It uses a standard implicit resolution coupled with shapeless' type class derivation mechanism to ensure every that compiling code manipulates encodable data. For example, the java.util.Calendar example won't compile with Frameless:

import frameless.TypedDataset
import frameless.syntax._
def now = new java.util.GregorianCalendar()

val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(now, now)))
// error: ds is already defined as value ds
// val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(now, now)))
//     ^^
// error: ambiguous reference to overloaded definition,
// both method now in object MdocApp0 of type => java.util.GregorianCalendar
// and  method now in object MdocApp0 of type => java.util.GregorianCalendar
// match expected type java.util.Calendar
// val ds: Dataset[DateRange] = Seq(DateRange(now, now)).toDS()
//                                            ^^^
// error: ambiguous reference to overloaded definition,
// both method now in object MdocApp0 of type => java.util.GregorianCalendar
// and  method now in object MdocApp0 of type => java.util.GregorianCalendar
// match expected type java.util.Calendar
// val ds: Dataset[DateRange] = Seq(DateRange(now, now)).toDS()
//                                                 ^^^
// error: ambiguous reference to overloaded definition,
// both method now in object MdocApp0 of type => java.util.GregorianCalendar
// and  method now in object MdocApp0 of type => java.util.GregorianCalendar
// match expected type java.util.Calendar
// val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(now, now)))
//                                                                     ^^^
// error: ambiguous reference to overloaded definition,
// both method now in object MdocApp0 of type => java.util.GregorianCalendar
// and  method now in object MdocApp0 of type => java.util.GregorianCalendar
// match expected type java.util.Calendar
// val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(now, now)))
//                                                                          ^^^
// error: could not find implicit value for parameter encoder: frameless.TypedEncoder[repl.MdocSession.MdocApp0.DateRange]
// val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(now, now)))
//                                                      ^
// error: method now is defined twice;
//   the conflicting method now was defined at line 11:5
// def now = new java.util.GregorianCalendar()
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Type class derivation takes care of recursively constructing (and proving the existence of) TypeEncoders for case classes. The following works as expected:

case class Bar(d: Double, s: String)
case class Foo(i: Int, b: Bar)

val ds: TypedDataset[Foo] = 
  TypedDataset.create(Seq(Foo(1, Bar(1.1, "s"))))
// ds: TypedDataset[Foo] = [i: int, b: struct<d: double, s: string>]

ds.collect()
// res3: frameless.Job[Seq[Foo]] = frameless.Job$$anon$4@759e2669

But any non-encodable in the case class hierarchy will be detected at compile time:

case class BarDate(d: Double, s: String, t: java.util.Calendar)
case class FooDate(i: Int, b: BarDate)
val ds: TypedDataset[FooDate] = TypedDataset.create(
  Seq(FooDate(1, BarDate(1.1, "s", new java.util.GregorianCalendar))))
// error: ds is already defined as value ds
// val ds: TypedDataset[Foo] = 
//     ^^
// error: ds is already defined as value ds
// val ds: TypedDataset[FooDate] = TypedDataset.create(
//     ^^
// error: could not find implicit value for parameter encoder: frameless.TypedEncoder[repl.MdocSession.MdocApp0.FooDate]
// val ds: TypedDataset[FooDate] = TypedDataset.create(
//                                                    ^

It should be noted that once derived, reflection-based Encoders and implicitly derived TypeEncoders have identical performance. The derivation mechanism is different, but the objects generated to encode and decode JVM objects in Spark's internal representation behave the same at runtime.