Working with CSV and Parquet data
You need these imports for most Frameless projects.
import frameless._
import frameless.syntax._
import frameless.functions.aggregate._
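The snippets on this page also assume a SparkSession named spark is in scope (Frameless' .run() calls need it implicitly). A minimal local setup might look like the following sketch; the master URL and app name are illustrative choices, not requirements.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// A minimal local session; master and app name are placeholder choices
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("frameless-docs")
  .set("spark.ui.enabled", "false")

// Made implicit so that Job#run() can find it
implicit val spark: SparkSession =
  SparkSession.builder().config(conf).getOrCreate()

spark.sparkContext.setLogLevel("WARN")
```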
Working with CSV
We first load some CSV data and print the schema.
val df = spark.read.format("csv").load(testDataPath)
// df: org.apache.spark.sql.package.DataFrame = [_c0: string, _c1: string ... 3 more fields]
df.show(2)
// +---+---+---+---+-----------+
// |_c0|_c1|_c2|_c3| _c4|
// +---+---+---+---+-----------+
// |5.1|3.5|1.4|0.2|Iris-setosa|
// |4.9|3.0|1.4|0.2|Iris-setosa|
// +---+---+---+---+-----------+
// only showing top 2 rows
//
df.printSchema
// root
// |-- _c0: string (nullable = true)
// |-- _c1: string (nullable = true)
// |-- _c2: string (nullable = true)
// |-- _c3: string (nullable = true)
// |-- _c4: string (nullable = true)
//
The easiest way to read CSV data into a TypedDataset is to create a case class whose fields match the exact number, types, and order of the columns as they appear in the CSV file. This is shown in the example below using the Iris case class.
final case class Iris(sLength: Double, sWidth: Double, pLength: Double, pWidth: Double, kind: String)
val testDataDf = spark.read.format("csv").schema(TypedExpressionEncoder[Iris].schema).load(testDataPath)
// testDataDf: org.apache.spark.sql.package.DataFrame = [sLength: double, sWidth: double ... 3 more fields]
val data: TypedDataset[Iris] = TypedDataset.createUnsafe[Iris](testDataDf)
// data: TypedDataset[Iris] = [sLength: double, sWidth: double ... 3 more fields]
data.show(2).run()
// +-------+------+-------+------+-----------+
// |sLength|sWidth|pLength|pWidth| kind|
// +-------+------+-------+------+-----------+
// | 5.1| 3.5| 1.4| 0.2|Iris-setosa|
// | 4.9| 3.0| 1.4| 0.2|Iris-setosa|
// +-------+------+-------+------+-----------+
// only showing top 2 rows
//
If we do not explicitly define the schema of the CSV file, the column types will not match those of the case class, leading to runtime errors.
val testDataNoSchema = spark.read.format("csv").load(testDataPath)
// testDataNoSchema: org.apache.spark.sql.package.DataFrame = [_c0: string, _c1: string ... 3 more fields]
val data: TypedDataset[Iris] = TypedDataset.createUnsafe[Iris](testDataNoSchema)
// data: TypedDataset[Iris] = [sLength: string, sWidth: string ... 3 more fields]
data.collect().run()
// org.apache.spark.SparkRuntimeException: Error while decoding: scala.ScalaReflectionException: <none> is not a term
// newInstance(class repl.MdocSession$MdocApp0$Iris).
// at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1406)
// at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:191)
// at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:179)
// at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
// at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
// at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
// at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
// at scala.collection.TraversableLike.map(TraversableLike.scala:286)
// at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
// at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
// at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
// at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3575)
// at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
// at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
// at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
// at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
// at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
// at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
// at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
// at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
// at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
// at org.apache.spark.sql.Dataset.collect(Dataset.scala:3575)
// at frameless.TypedDataset.$anonfun$collect$1(TypedDataset.scala:333)
// at frameless.Job$$anon$4.run(Job.scala:38)
// at repl.MdocSession$MdocApp0$$anonfun$13.apply(WorkingWithCsvParquetJson.md:85)
// at repl.MdocSession$MdocApp0$$anonfun$13.apply(WorkingWithCsvParquetJson.md:85)
// Caused by: scala.ScalaReflectionException: <none> is not a term
// at scala.reflect.api.Symbols$SymbolApi.asTerm(Symbols.scala:211)
// at scala.reflect.api.Symbols$SymbolApi.asTerm$(Symbols.scala:211)
// at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:100)
// at org.apache.spark.sql.catalyst.ScalaReflection$.findConstructor(ScalaReflection.scala:177)
// at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.$anonfun$constructor$1(objects.scala:567)
// at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.$anonfun$constructor$5(objects.scala:578)
// at scala.Option.getOrElse(Option.scala:189)
// at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.constructor$lzycompute(objects.scala:577)
// at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.constructor(objects.scala:564)
// at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.eval(objects.scala:600)
// at org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:115)
// at org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:32)
// at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
// ... 24 more
Dealing with CSV files with multiple columns
When a dataset has many columns, it is impractical to define a case class containing columns we don't need. In such cases, we can project just the columns we do need, cast them to the proper types, and then call createUnsafe using a case class that contains a much smaller subset of the columns.
import org.apache.spark.sql.types.DoubleType
final case class IrisLight(kind: String, sLength: Double)
val testDataDf = spark.read.format("csv").load(testDataPath)
// testDataDf: org.apache.spark.sql.package.DataFrame = [_c0: string, _c1: string ... 3 more fields]
val projectedDf = testDataDf.select(testDataDf("_c4").as("kind"), testDataDf("_c1").cast(DoubleType).as("sLength"))
// projectedDf: org.apache.spark.sql.package.DataFrame = [kind: string, sLength: double]
val data = TypedDataset.createUnsafe[IrisLight](projectedDf)
// data: TypedDataset[IrisLight] = [kind: string, sLength: double]
data.take(2).run()
// res5: Seq[IrisLight] = WrappedArray(
// IrisLight("Iris-setosa", 3.5),
// IrisLight("Iris-setosa", 3.0)
// )
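As an alternative sketch, Spark can also try to infer the column types itself via the inferSchema option, at the cost of an extra pass over the data. The inferred column names still default to _c0, _c1, and so on, so they must be renamed to line up with the case class; the field names passed to toDF below assume the same Iris layout as above.

```scala
// Let Spark infer numeric column types (extra pass over the data),
// then rename the default _c0.._c4 columns to match the case class
val inferredDf = spark.read
  .format("csv")
  .option("inferSchema", "true")
  .load(testDataPath)
  .toDF("sLength", "sWidth", "pLength", "pWidth", "kind")

val inferred: TypedDataset[Iris] = TypedDataset.createUnsafe[Iris](inferredDf)
```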
Working with Parquet
Spark is much better at reading the schema from Parquet files, since Parquet stores the schema alongside the data.
val testDataParquet = spark.read.format("parquet").load(testDataPathParquet)
// testDataParquet: org.apache.spark.sql.package.DataFrame = [sLength: double, sWidth: double ... 3 more fields]
testDataParquet.printSchema
// root
// |-- sLength: double (nullable = true)
// |-- sWidth: double (nullable = true)
// |-- pLength: double (nullable = true)
// |-- pWidth: double (nullable = true)
// |-- kind: string (nullable = true)
//
So as long as we use a type (case class) that reflects the same number, types, and order of the fields in the data, everything works as expected.
val data: TypedDataset[Iris] = TypedDataset.createUnsafe[Iris](testDataParquet)
// data: TypedDataset[Iris] = [sLength: double, sWidth: double ... 3 more fields]
data.take(2).run()
// res10: Seq[Iris] = WrappedArray(
// Iris(5.1, 3.5, 1.4, 0.2, "Iris-setosa"),
// Iris(4.9, 3.0, 1.4, 0.2, "Iris-setosa")
// )
Dealing with Parquet files with multiple columns
The main difference compared to CSV is that with Parquet, Spark is better at inferring the types. This makes it simpler to project the columns we need without having to cast them to the proper types.
final case class IrisLight(kind: String, sLength: Double)
val projectedDf = testDataParquet.select("kind", "sLength")
// projectedDf: org.apache.spark.sql.package.DataFrame = [kind: string, sLength: double]
val data = TypedDataset.createUnsafe[IrisLight](projectedDf)
// data: TypedDataset[IrisLight] = [kind: string, sLength: double]
data.take(2).run()
// res11: Seq[IrisLight] = WrappedArray(
// IrisLight("Iris-setosa", 5.1),
// IrisLight("Iris-setosa", 4.9)
// )
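Writing the data back out goes through the underlying untyped Dataset, which a TypedDataset exposes via .dataset. For example, to persist the projected data as Parquet (the output path below is a placeholder):

```scala
// Save the typed data as Parquet via the underlying untyped Dataset;
// "/tmp/iris-light" is a placeholder output path
data.dataset
  .write
  .mode("overwrite")
  .format("parquet")
  .save("/tmp/iris-light")
```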