Typed Encoders in Frameless
Spark uses reflection to derive its Encoders, which is why they can fail at run time. For example, because Spark does not support java.util.Calendar, the following leads to an error:
import java.util.Calendar
import org.apache.spark.sql.Dataset
import spark.implicits._
case class DateRange(s: Calendar, e: Calendar)
def now = new java.util.GregorianCalendar()
val ds: Dataset[DateRange] = Seq(DateRange(now, now)).toDS()
// org.apache.spark.SparkUnsupportedOperationException: [ENCODER_NOT_FOUND] Not found an encoder of the type java.util.Calendar to Spark SQL internal representation. Consider to change the input type to one of supported at 'https://spark.apache.org/docs/latest/sql-ref-datatypes.html'.
// at org.apache.spark.sql.errors.ExecutionErrors.cannotFindEncoderForTypeError(ExecutionErrors.scala:172)
// at org.apache.spark.sql.errors.ExecutionErrors.cannotFindEncoderForTypeError$(ExecutionErrors.scala:167)
// at org.apache.spark.sql.errors.ExecutionErrors$.cannotFindEncoderForTypeError(ExecutionErrors.scala:218)
// at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:400)
// at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$encoderFor$3(ScalaReflection.scala:394)
// at scala.collection.immutable.List.map(List.scala:293)
// at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:382)
// at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$encoderFor$1(ScalaReflection.scala:247)
// at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
// at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:426)
// at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:425)
// at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:42)
// at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:244)
// at org.apache.spark.sql.catalyst.ScalaReflection$.encoderFor(ScalaReflection.scala:227)
// at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:51)
// at org.apache.spark.sql.Encoders$.product(Encoders.scala:315)
// at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder(SQLImplicits.scala:264)
// at org.apache.spark.sql.LowPrioritySQLImplicits.newProductEncoder$(SQLImplicits.scala:264)
// at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:32)
// at repl.MdocSession$MdocApp0$$anonfun$1.apply$mcV$sp(TypedEncoder.md:49)
// at repl.MdocSession$MdocApp0$$anonfun$1.apply(TypedEncoder.md:47)
// at repl.MdocSession$MdocApp0$$anonfun$1.apply(TypedEncoder.md:47)
As shown by the stack trace, this runtime error goes through ScalaReflection while trying to derive an Encoder for the Dataset schema. Besides the annoyance of not detecting this error at compile time, a more important limitation of the reflection-based approach is that it cannot be extended to custom types. See this Stack Overflow question for a summary of the current situation (as of Spark 2.0) in vanilla Spark: How to store custom objects in a Dataset?
Frameless introduces a new type class called TypedEncoder to solve these issues. TypedEncoders are passed around as implicit parameters to every Frameless method to ensure that the data being manipulated is encodable. Frameless uses standard implicit resolution coupled with shapeless' type class derivation mechanism to ensure that every piece of compiling code manipulates encodable data. For example, the java.util.Calendar example won't compile with Frameless:
import frameless.TypedDataset
import frameless.syntax._
def now = new java.util.GregorianCalendar()
val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(now, now)))
// error: could not find implicit value for parameter encoder: frameless.TypedEncoder[repl.MdocSession.MdocApp0.DateRange]
// val ds: TypedDataset[DateRange] = TypedDataset.create(Seq(DateRange(now, now)))
// ^
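The idea of using an implicit parameter as compile-time evidence can be sketched without Spark or Frameless. The following is a minimal illustration only: `ToyEncoder`, `ToyDataset` and `ImplicitEvidenceDemo` are hypothetical names invented for this sketch and are not part of the Frameless API.

```scala
// Hypothetical stand-in for frameless.TypedEncoder: evidence that A can be encoded.
trait ToyEncoder[A] {
  def sparkType: String // column type name, for illustration only
}

object ToyEncoder {
  // Small helper to build instances.
  def instance[A](name: String): ToyEncoder[A] =
    new ToyEncoder[A] { val sparkType: String = name }

  // Instances exist only for supported types.
  // There is deliberately no instance for java.util.Calendar.
  implicit val intEncoder: ToyEncoder[Int] = instance("int")
  implicit val doubleEncoder: ToyEncoder[Double] = instance("double")
  implicit val stringEncoder: ToyEncoder[String] = instance("string")
}

object ToyDataset {
  // Like TypedDataset.create, this method demands an encoder as an implicit parameter.
  def create[A](data: Seq[A])(implicit encoder: ToyEncoder[A]): String =
    s"${data.size} row(s) of ${encoder.sparkType}"
}

object ImplicitEvidenceDemo {
  // Compiles: a ToyEncoder[Int] is found in the companion object.
  val ok: String = ToyDataset.create(Seq(1, 2, 3))

  // Uncommenting the next line makes compilation fail,
  // because no implicit ToyEncoder[java.util.Calendar] exists:
  // val bad = ToyDataset.create(Seq(java.util.Calendar.getInstance()))
}
```

Because the missing instance is reported by the compiler, the unsupported type never reaches a running job, which is the property the Frameless example above relies on.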
Type class derivation takes care of recursively constructing (and proving the existence of) TypedEncoders for case classes. The following works as expected:
case class Bar(d: Double, s: String)
case class Foo(i: Int, b: Bar)
val ds: TypedDataset[Foo] =
  TypedDataset.create(Seq(Foo(1, Bar(1.1, "s"))))
// ds: TypedDataset[Foo] = [i: int, b: struct<d: double, s: string>]
ds.collect()
// res3: frameless.Job[Seq[Foo]] = frameless.Job$$anon$4@738087d0
But any non-encodable type in the case class hierarchy will be detected at compile time:
case class BarDate(d: Double, s: String, t: java.util.Calendar)
case class FooDate(i: Int, b: BarDate)
val ds: TypedDataset[FooDate] = TypedDataset.create(
  Seq(FooDate(1, BarDate(1.1, "s", new java.util.GregorianCalendar))))
// error: could not find implicit value for parameter encoder: frameless.TypedEncoder[repl.MdocSession.MdocApp0.FooDate]
// val ds: TypedDataset[FooDate] = TypedDataset.create(
// ^
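The recursive part of the derivation can be illustrated with plain implicits. This is a simplified sketch, not how Frameless is implemented: Frameless derives instances for case classes through shapeless, whereas the sketch below uses tuples so that it needs no library. `ToySchema`, `pairSchema` and `DerivationDemo` are hypothetical names introduced for the example.

```scala
// Hypothetical stand-in for frameless.TypedEncoder, reduced to a schema description.
trait ToySchema[A] {
  def describe: String
}

object ToySchema {
  def instance[A](s: String): ToySchema[A] =
    new ToySchema[A] { val describe: String = s }

  // Base cases: primitive types that are known to be encodable.
  implicit val intSchema: ToySchema[Int] = instance("int")
  implicit val doubleSchema: ToySchema[Double] = instance("double")
  implicit val stringSchema: ToySchema[String] = instance("string")

  // Recursive step: a pair is encodable whenever both of its components are.
  implicit def pairSchema[A, B](
      implicit a: ToySchema[A], b: ToySchema[B]): ToySchema[(A, B)] =
    instance(s"struct<_1: ${a.describe}, _2: ${b.describe}>")
}

object DerivationDemo {
  def schemaOf[A](implicit s: ToySchema[A]): String = s.describe

  // Resolved by applying pairSchema twice, mirroring the nesting of
  // Foo(i: Int, b: Bar(d: Double, s: String)).
  val nested: String = schemaOf[(Int, (Double, String))]

  // Rejected at compile time, because the innermost component has no instance:
  // val bad = schemaOf[(Int, (Double, java.util.Calendar))]
}
```

The compiler only assembles an instance for the outer type if it can find one for every nested component, so a single unsupported field anywhere in the hierarchy makes the whole derivation fail at compile time.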
It should be noted that once derived, reflection-based Encoders and implicitly derived TypedEncoders have identical performance. The derivation mechanism is different, but the objects generated to encode and decode JVM objects in Spark's internal representation behave the same at runtime.