Working with CSV and Parquet data

You need these imports for most Frameless projects.

import frameless._
import frameless.syntax._
import frameless.functions.aggregate._

Working with CSV

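We first load some CSV data without specifying a schema and print the result. Spark names the columns _c0 to _c4 and reads every one of them as a string.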

val df = spark.read.format("csv").load(testDataPath)
// df: org.apache.spark.sql.package.DataFrame = [_c0: string, _c1: string ... 3 more fields]
df.show(2)
// +---+---+---+---+-----------+
// |_c0|_c1|_c2|_c3|        _c4|
// +---+---+---+---+-----------+
// |5.1|3.5|1.4|0.2|Iris-setosa|
// |4.9|3.0|1.4|0.2|Iris-setosa|
// +---+---+---+---+-----------+
// only showing top 2 rows
// 
df.printSchema
// root
//  |-- _c0: string (nullable = true)
//  |-- _c1: string (nullable = true)
//  |-- _c2: string (nullable = true)
//  |-- _c3: string (nullable = true)
//  |-- _c4: string (nullable = true)
//

The easiest way to read from CSV into a TypedDataset is to create a case class that matches the exact number, type, and order of the fields as they appear in the CSV file. The example below does this with the Iris case class, whose encoder also supplies the schema passed to the reader.

final case class Iris(sLength: Double, sWidth: Double, pLength: Double, pWidth: Double, kind: String)
val testDataDf = spark.read.format("csv").schema(TypedExpressionEncoder[Iris].schema).load(testDataPath)
// testDataDf: org.apache.spark.sql.package.DataFrame = [sLength: double, sWidth: double ... 3 more fields]
val data: TypedDataset[Iris] = TypedDataset.createUnsafe[Iris](testDataDf)
// data: TypedDataset[Iris] = [sLength: double, sWidth: double ... 3 more fields]
data.show(2).run()
// +-------+------+-------+------+-----------+
// |sLength|sWidth|pLength|pWidth|       kind|
// +-------+------+-------+------+-----------+
// |    5.1|   3.5|    1.4|   0.2|Iris-setosa|
// |    4.9|   3.0|    1.4|   0.2|Iris-setosa|
// +-------+------+-------+------+-----------+
// only showing top 2 rows
//
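To see what the reader is given, the schema derived from the case class can be inspected on its own. The following is a minimal sketch that assumes only that Frameless and Spark SQL are on the classpath. It redefines Iris so that it stands alone, and the names irisSchema, fieldNames, and fieldTypes are illustrative.

```scala
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}
import frameless.TypedExpressionEncoder

final case class Iris(sLength: Double, sWidth: Double, pLength: Double, pWidth: Double, kind: String)

// The schema is derived from the case class through its encoder.
val irisSchema: StructType = TypedExpressionEncoder[Iris].schema

// Field names follow the case class parameters, in declaration order.
val fieldNames: List[String] = irisSchema.fieldNames.toList

// Field types follow the Scala types: Double becomes DoubleType, String becomes StringType.
val fieldTypes = irisSchema.fields.map(_.dataType).toList
```

Passing this schema to the CSV reader is what makes the columns arrive with the names and types that Iris expects.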

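If we do not explicitly define the schema of the CSV file, Spark reads every column as a string. The types then do not match the fields of Iris, and the mismatch only surfaces as a runtime error when the data is decoded.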

val testDataNoSchema = spark.read.format("csv").load(testDataPath)
// testDataNoSchema: org.apache.spark.sql.package.DataFrame = [_c0: string, _c1: string ... 3 more fields]
val data: TypedDataset[Iris] = TypedDataset.createUnsafe[Iris](testDataNoSchema)
// data: TypedDataset[Iris] = [sLength: string, sWidth: string ... 3 more fields]
data.collect().run()
// org.apache.spark.SparkRuntimeException: Error while decoding: scala.ScalaReflectionException: <none> is not a term
// newInstance(class repl.MdocSession$MdocApp0$Iris).
// 	at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1405)
// 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:185)
// 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:173)
// 	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
// 	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
// 	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
// 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
// 	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
// 	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
// 	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
// 	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4332)
// 	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3573)
// 	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
// 	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
// 	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
// 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
// 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
// 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
// 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
// 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
// 	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
// 	at org.apache.spark.sql.Dataset.collect(Dataset.scala:3573)
// 	at frameless.TypedDataset.$anonfun$collect$1(TypedDataset.scala:333)
// 	at frameless.Job$$anon$4.run(Job.scala:38)
// 	at repl.MdocSession$MdocApp0$$anonfun$13.apply(WorkingWithCsvParquetJson.md:85)
// 	at repl.MdocSession$MdocApp0$$anonfun$13.apply(WorkingWithCsvParquetJson.md:85)
// Caused by: scala.ScalaReflectionException: <none> is not a term
// 	at scala.reflect.api.Symbols$SymbolApi.asTerm(Symbols.scala:211)
// 	at scala.reflect.api.Symbols$SymbolApi.asTerm$(Symbols.scala:211)
// 	at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:100)
// 	at org.apache.spark.sql.catalyst.ScalaReflection$.findConstructor(ScalaReflection.scala:177)
// 	at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.$anonfun$constructor$1(objects.scala:567)
// 	at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.$anonfun$constructor$5(objects.scala:578)
// 	at scala.Option.getOrElse(Option.scala:189)
// 	at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.constructor$lzycompute(objects.scala:577)
// 	at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.constructor(objects.scala:564)
// 	at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.eval(objects.scala:600)
// 	at org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:115)
// 	at org.apache.spark.sql.catalyst.expressions.InterpretedSafeProjection.apply(InterpretedSafeProjection.scala:32)
// 	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:182)
// 	... 24 more
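As the example above shows, createUnsafe accepted the string columns and the failure appeared only at collect. One way to catch this earlier is to compare the DataFrame's schema with the expected one before converting. The sketch below is one possible guard and is not part of Frameless: typeMismatches is a hypothetical helper, and the two schemas are built by hand so the block runs without a SparkSession. In practice they would presumably come from testDataNoSchema.schema and TypedExpressionEncoder[Iris].schema.

```scala
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Hypothetical helper: describe every position where the column types differ.
// Only data types are compared; nullability is ignored.
def typeMismatches(actual: StructType, expected: StructType): Seq[String] =
  if (actual.length != expected.length)
    Seq(s"expected ${expected.length} columns but found ${actual.length}")
  else
    actual.fields.zip(expected.fields).toSeq.collect {
      case (a, e) if a.dataType != e.dataType =>
        s"${a.name}: found ${a.dataType.simpleString}, expected ${e.dataType.simpleString} (${e.name})"
    }

// Stand-in for the schema of a CSV read without an explicit schema.
val csvSchema = StructType((0 to 4).map(i => StructField(s"_c$i", StringType)))

// Stand-in for the schema expected by the Iris case class.
val expectedSchema = StructType(Seq(
  StructField("sLength", DoubleType),
  StructField("sWidth", DoubleType),
  StructField("pLength", DoubleType),
  StructField("pWidth", DoubleType),
  StructField("kind", StringType)
))

val problems = typeMismatches(csvSchema, expectedSchema)
problems.foreach(println)
```

A non-empty result signals that the columns need a schema or a cast before createUnsafe is called.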

Dealing with CSV files with multiple columns

When the dataset has many columns, it is impractical to define a case class that includes columns we don't need. In such cases, we can project the columns we do need, cast them to the proper type, and then call createUnsafe with a case class that contains only that smaller subset of columns.

import org.apache.spark.sql.types.DoubleType
final case class IrisLight(kind: String, sLength: Double)

val testDataDf = spark.read.format("csv").load(testDataPath)
// testDataDf: org.apache.spark.sql.package.DataFrame = [_c0: string, _c1: string ... 3 more fields]
// _c0 holds the sepal length and _c4 the kind (compare with the Iris case class above)
val projectedDf = testDataDf.select(testDataDf("_c4").as("kind"), testDataDf("_c0").cast(DoubleType).as("sLength"))
// projectedDf: org.apache.spark.sql.package.DataFrame = [kind: string, sLength: double]
val data = TypedDataset.createUnsafe[IrisLight](projectedDf)
// data: TypedDataset[IrisLight] = [kind: string, sLength: double]
data.take(2).run()
// res5: Seq[IrisLight] = WrappedArray(
//   IrisLight("Iris-setosa", 5.1),
//   IrisLight("Iris-setosa", 4.9)
// )
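When several columns need the same treatment, the select, cast, and rename steps can be described as data. The following is a rough sketch under stated assumptions: projectAndCast is a hypothetical helper, not a Frameless or Spark function, and a small in-memory DataFrame with string columns _c0 to _c4 stands in for the schema-less CSV read so that the block runs on its own with a local SparkSession.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataType, DoubleType, StringType}
import frameless.TypedDataset
import frameless.syntax._

final case class IrisLight(kind: String, sLength: Double)

implicit val spark: SparkSession =
  SparkSession.builder().master("local[1]").appName("csv-projection-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper: each entry is (source column, target name, target type).
def projectAndCast(df: DataFrame, spec: Seq[(String, String, DataType)]): DataFrame =
  df.select(spec.map { case (from, to, tpe) => col(from).cast(tpe).as(to) }: _*)

// Stand-in for a CSV read without a schema: all columns are strings.
val rawCsvLike = Seq(
  ("5.1", "3.5", "1.4", "0.2", "Iris-setosa"),
  ("4.9", "3.0", "1.4", "0.2", "Iris-setosa")
).toDF("_c0", "_c1", "_c2", "_c3", "_c4")

// Keep only the kind and the sepal length, in the order IrisLight declares them.
val projected = projectAndCast(rawCsvLike, Seq(
  ("_c4", "kind", StringType),
  ("_c0", "sLength", DoubleType)
))

val lightRows: Seq[IrisLight] =
  TypedDataset.createUnsafe[IrisLight](projected).collect().run()
```

Keeping the mapping in one place makes it easier to check that each source column lands in the intended field.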

Working with Parquet

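Parquet files store their schema alongside the data, so Spark reads the column names and types directly from the file instead of treating every column as a string.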

val testDataParquet = spark.read.format("parquet").load(testDataPathParquet)
// testDataParquet: org.apache.spark.sql.package.DataFrame = [sLength: double, sWidth: double ... 3 more fields]
testDataParquet.printSchema
// root
//  |-- sLength: double (nullable = true)
//  |-- sWidth: double (nullable = true)
//  |-- pLength: double (nullable = true)
//  |-- pWidth: double (nullable = true)
//  |-- kind: string (nullable = true)
//

As long as we use a type (case class) that reflects the same number, type, and order of the fields in the data, everything works as expected.

val data: TypedDataset[Iris] = TypedDataset.createUnsafe[Iris](testDataParquet)
// data: TypedDataset[Iris] = [sLength: double, sWidth: double ... 3 more fields]
data.take(2).run()
// res10: Seq[Iris] = WrappedArray(
//   Iris(5.1, 3.5, 1.4, 0.2, "Iris-setosa"),
//   Iris(4.9, 3.0, 1.4, 0.2, "Iris-setosa")
// )
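The same behaviour can be checked end to end with a small round trip. This is a minimal sketch, assuming a local SparkSession and a temporary directory: the path, the two sample rows, and the names original and restored are illustrative, and the write goes through the underlying Spark Dataset exposed by the dataset field.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import frameless.TypedDataset
import frameless.syntax._

final case class Iris(sLength: Double, sWidth: Double, pLength: Double, pWidth: Double, kind: String)

implicit val spark: SparkSession =
  SparkSession.builder().master("local[1]").appName("parquet-roundtrip-sketch").getOrCreate()

// Spark expects the target path not to exist yet, so use a fresh sub-path of a temp directory.
val parquetPath = Files.createTempDirectory("iris-parquet").resolve("iris").toString

val original: TypedDataset[Iris] = TypedDataset.create(Seq(
  Iris(5.1, 3.5, 1.4, 0.2, "Iris-setosa"),
  Iris(4.9, 3.0, 1.4, 0.2, "Iris-setosa")
))

// Write with the plain Spark writer; Parquet stores column names and types with the data.
original.dataset.write.parquet(parquetPath)

// Read back without specifying a schema and convert to a TypedDataset.
val restored: TypedDataset[Iris] = TypedDataset.createUnsafe[Iris](spark.read.parquet(parquetPath))
val restoredRows: Seq[Iris] = restored.collect().run()
```

No schema or cast is needed on the read side because the file itself carries the column names and types.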

Dealing with Parquet files with multiple columns

The main difference compared to CSV is that Parquet columns already carry their names and types. This makes it simpler to project the columns we need without having to cast them to the proper type.

final case class IrisLight(kind: String, sLength: Double)

val projectedDf = testDataParquet.select("kind", "sLength")
// projectedDf: org.apache.spark.sql.package.DataFrame = [kind: string, sLength: double]
val data = TypedDataset.createUnsafe[IrisLight](projectedDf)
// data: TypedDataset[IrisLight] = [kind: string, sLength: double]
data.take(2).run()
// res11: Seq[IrisLight] = WrappedArray(
//   IrisLight("Iris-setosa", 5.1),
//   IrisLight("Iris-setosa", 4.9)
// )