TypedDataset: Feature Overview

This tutorial introduces TypedDataset using a simple example. The following imports are needed to make all code examples compile.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import frameless.functions.aggregate._
import frameless.TypedDataset

val conf = new SparkConf().setMaster("local[*]").setAppName("Frameless repl").set("spark.ui.enabled", "false")
implicit val spark = SparkSession.builder().config(conf).appName("REPL").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

import spark.implicits._

Creating TypedDataset instances

We start by defining a case class:

case class Apartment(city: String, surface: Int, price: Double, bedrooms: Int)

And few Apartment instances:

val apartments = Seq(
  Apartment("Paris", 50,  300000.0, 2),
  Apartment("Paris", 100, 450000.0, 3),
  Apartment("Paris", 25,  250000.0, 1),
  Apartment("Lyon",  83,  200000.0, 2),
  Apartment("Lyon",  45,  133000.0, 1),
  Apartment("Nice",  74,  325000.0, 3)
)

We are now ready to instantiate a TypedDataset[Apartment]:

val aptTypedDs = TypedDataset.create(apartments)
// aptTypedDs: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]

We can also create one from an existing Spark Dataset:

val aptDs = spark.createDataset(apartments)
// aptDs: org.apache.spark.sql.Dataset[Apartment] = [city: string, surface: int ... 2 more fields]
val aptTypedDs = TypedDataset.create(aptDs)
// aptTypedDs: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]

Or use the Frameless syntax:

import frameless.syntax._

val aptTypedDs2 = aptDs.typed
// aptTypedDs2: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]

Typesafe column referencing

This is how we select a particular column from a TypedDataset:

val cities: TypedDataset[String] = aptTypedDs.select(aptTypedDs('city))
// cities: TypedDataset[String] = [value: string]

This is completely type-safe, for instance suppose we misspell city as citi:

aptTypedDs.select(aptTypedDs('citi))
// error: No column Symbol with shapeless.tag.Tagged[String("citi")] of type A in repl.MdocSession.MdocApp0.Apartment
// aptTypedDs.select(aptTypedDs('citi))
//                             ^

This gets raised at compile time, whereas with the standard Dataset API the error appears at runtime (enjoy the stack trace):

aptDs.select('citi)
// org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `citi` cannot be resolved. Did you mean one of the following? [`city`, `price`, `surface`, `bedrooms`].;
// 'Project ['citi]
// +- LocalRelation [city#64, surface#65, price#66, bedrooms#67]
// 
// 	at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:307)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:147)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6(CheckAnalysis.scala:266)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6$adapted(CheckAnalysis.scala:264)
// 	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:244)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:264)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:264)
// 	at scala.collection.immutable.Stream.foreach(Stream.scala:533)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:264)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:182)
// 	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:244)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:182)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:164)
// 	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:188)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:160)
// 	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:150)
// 	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
// 	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:211)
// 	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
// 	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
// 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
// 	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
// 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
// 	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
// 	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
// 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
// 	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
// 	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
// 	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
// 	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
// 	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
// 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
// 	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
// 	at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4351)
// 	at org.apache.spark.sql.Dataset.select(Dataset.scala:1540)
// 	at repl.MdocSession$MdocApp0$$anonfun$15.apply(FeatureOverview.md:95)
// 	at repl.MdocSession$MdocApp0$$anonfun$15.apply(FeatureOverview.md:95)

select() supports arbitrary column operations:

aptTypedDs.select(aptTypedDs('surface) * 10, aptTypedDs('surface) + 2).show().run()
// +----+---+
// |  _1| _2|
// +----+---+
// | 500| 52|
// |1000|102|
// | 250| 27|
// | 830| 85|
// | 450| 47|
// | 740| 76|
// +----+---+
//

Note that unlike the standard Spark API, where some operations are lazy and some are not, all TypedDatasets operations are lazy. In the above example, show() is lazy. It requires to apply run() for the show job to materialize. A more detailed explanation of Job is given here.

Next we compute the price by surface unit:

val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
// error: overloaded method value / with alternatives:
//   (u: Double)(implicit n: frameless.CatalystNumeric[Double])frameless.TypedColumn[repl.MdocSession.MdocApp0.Apartment,Double] <and>
//   [Out, TT, W](other: frameless.TypedColumn[TT,Double])(implicit n: frameless.CatalystDivisible[Double,Out], implicit e: frameless.TypedEncoder[Out], implicit w: frameless.With[repl.MdocSession.MdocApp0.Apartment,TT]{type Out = W})frameless.TypedColumn[W,Out]
//  cannot be applied to (frameless.TypedColumn[repl.MdocSession.MdocApp0.Apartment,Int])
// val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface).cast[Double])
//                                            ^^^^^^^^^^^^^^^^^^^^

As the error suggests, we can't divide a TypedColumn of Double by Int. For safety, in Frameless only math operations between same types is allowed:

val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface).cast[Double])
// priceBySurfaceUnit: TypedDataset[Double] = [value: double]
priceBySurfaceUnit.collect().run()
// res5: Seq[Double] = WrappedArray(
//   6000.0,
//   4500.0,
//   10000.0,
//   2409.6385542168673,
//   2955.5555555555557,
//   4391.891891891892
// )

Looks like it worked, but that cast seems unsafe right? Actually it is safe. Let's try to cast a TypedColumn of String to Double:

aptTypedDs('city).cast[Double]
// error: could not find implicit value for parameter c: frameless.CatalystCast[String,Double]
// val updated = aptTypedDs.select(aptTypedDs('city), aptTypedDs('surface) + 2).as[UpdatedSurface]
//               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The compile-time error tells us that to perform the cast, an evidence (in the form of CatalystCast[String, Double]) must be available. Since casting from String to Double is not allowed, this results in a compilation error.

Check here for the set of available CatalystCast.

Working with Optional columns

When working with real data we have to deal with imperfections, such as missing fields. Columns that may have missing data should be represented using Options. For this example, let's assume that the Apartments dataset may have missing values.

case class ApartmentOpt(city: Option[String], surface: Option[Int], price: Option[Double], bedrooms: Option[Int])
val apartmentsOpt = Seq(
  ApartmentOpt(Some("Paris"), Some(50),  Some(300000.0), None),
  ApartmentOpt(None, None, Some(450000.0), Some(3))
)
val aptTypedDsOpt = TypedDataset.create(apartmentsOpt)
// aptTypedDsOpt: TypedDataset[ApartmentOpt] = [city: string, surface: int ... 2 more fields]
aptTypedDsOpt.show().run()
// +-----+-------+--------+--------+
// | city|surface|   price|bedrooms|
// +-----+-------+--------+--------+
// |Paris|     50|300000.0|    NULL|
// | NULL|   NULL|450000.0|       3|
// +-----+-------+--------+--------+
//

Unfortunately the syntax used above with select() will not work here:

aptTypedDsOpt.select(aptTypedDsOpt('surface) * 10, aptTypedDsOpt('surface) + 2).show().run()
// error: overloaded method value * with alternatives:
//   (u: Option[Int])(implicit n: frameless.CatalystNumeric[Option[Int]])frameless.TypedColumn[ApartmentOpt,Option[Int]] <and>
//   [TT, W](other: frameless.TypedColumn[TT,Option[Int]])(implicit n: frameless.CatalystNumeric[Option[Int]], implicit w: frameless.With[ApartmentOpt,TT]{type Out = W}, implicit t: scala.reflect.ClassTag[Option[Int]])frameless.TypedColumn[W,Option[Int]]
//  cannot be applied to (Int)
// aptTypedDsOpt.select(aptTypedDsOpt('surface) * 10, aptTypedDsOpt('surface) + 2).show().run()
//                      ^^^^^^^^^^^^^^^^^^^^^^^^^
// error: overloaded method value + with alternatives:
//   (u: Option[Int])(implicit n: frameless.CatalystNumeric[Option[Int]])frameless.TypedColumn[ApartmentOpt,Option[Int]] <and>
//   [TT, W](other: frameless.TypedColumn[TT,Option[Int]])(implicit n: frameless.CatalystNumeric[Option[Int]], implicit w: frameless.With[ApartmentOpt,TT]{type Out = W})frameless.TypedColumn[W,Option[Int]]
//  cannot be applied to (Int)
// aptTypedDsOpt.select(aptTypedDsOpt('surface) * 10, aptTypedDsOpt('surface) + 2).show().run()
//                                                    ^^^^^^^^^^^^^^^^^^^^^^^^^

This is because we cannot multiple an Option with an Int. In Scala, Option has a map() method to help address exactly this (e.g., Some(10).map(c => c * 2)). Frameless follows a similar convention. By applying the opt method on any Option[X] column you can then use map() to provide a function that works with the unwrapped type X. This is best shown in the example bellow:

aptTypedDsOpt.select(aptTypedDsOpt('surface).opt.map(c => c * 10), aptTypedDsOpt('surface).opt.map(_ + 2)).show().run()

Known issue: map() will throw a runtime exception when the applied function includes a udf(). If you want to apply a udf() to an optional column, we recommend changing your udf to work directly with Optional fields.

Casting and projections

In the general case, select() returns a TypedDataset of type TypedDataset[TupleN[...]] (with N in [1...10]). For example, if we select three columns with types String, Int, and Boolean the result will have type TypedDataset[(String, Int, Boolean)].

We often want to give more expressive types to the result of our computations. as[T] allows us to safely cast a TypedDataset[U] to another of type TypedDataset[T] as long as the types in U and T align.

When the cast is valid the expression compiles:

case class UpdatedSurface(city: String, surface: Int)
val updated = aptTypedDs.select(aptTypedDs('city), aptTypedDs('surface) + 2).as[UpdatedSurface]
// updated: TypedDataset[UpdatedSurface] = [city: string, surface: int]
updated.show(2).run()
// +-----+-------+
// | city|surface|
// +-----+-------+
// |Paris|     52|
// |Paris|    102|
// +-----+-------+
// only showing top 2 rows
//

Next we try to cast a (String, String) to an UpdatedSurface (which has types String, Int). The cast is not valid and the expression does not compile:

aptTypedDs.select(aptTypedDs('city), aptTypedDs('city)).as[UpdatedSurface]
// error: could not find implicit value for parameter as: frameless.ops.As[(String, String),UpdatedSurface]
// aptTypedDs.select(aptTypedDs('city), aptTypedDs('city)).as[UpdatedSurface]
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Advanced topics with select()

When you select() a single column that has type A, the resulting type is TypedDataset[A] and not TypedDataset[Tuple1[A]]. This behavior makes working with nested schema easier (i.e., in the case where A is a complex data type) and simplifies type-checking column operations (e.g., verify that two columns can be added, divided, etc.). However, when A is scalar, say a Long, it makes it harder to select and work with the resulting TypedDataset[Long]. For instance, it's harder to reference this single scalar column using select(). If this becomes an issue, you can bypass this behavior by using the selectMany() method instead of select(). In the previous example, selectMany() will return TypedDataset[Tuple1[Long]] and you can reference its single column using the name _1. selectMany() should also be used when you need to select more than 10 columns. select() has better IDE support and compiles faster than the macro based selectMany(), so prefer select() for the most common use cases.

When you are handed a single scalar column TypedDataset (e.g., TypedDataset[Double]) the best way to reference its single column is using the asCol (short for "as a column") method. This is best shown in the example below. We will see more usages of asCol later in this tutorial.

val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface).cast[Double])
// priceBySurfaceUnit: TypedDataset[Double] = [value: double]
priceBySurfaceUnit.select(priceBySurfaceUnit.asCol * 2).show(2).run()
// +-------+
// |  value|
// +-------+
// |12000.0|
// | 9000.0|
// +-------+
// only showing top 2 rows
//

Projections

We often want to work with a subset of the fields in a dataset. Projections allow us to easily select our fields of interest while preserving their initial names and types for extra safety.

Here is an example using the TypedDataset[Apartment] with an additional column:

val aptds = aptTypedDs // For shorter expressions
// aptds: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]

case class ApartmentDetails(city: String, price: Double, surface: Int, ratio: Double)
val aptWithRatio =
  aptds.select(
    aptds('city),
    aptds('price),
    aptds('surface),
    aptds('price) / aptds('surface).cast[Double]
  ).as[ApartmentDetails]
// aptWithRatio: TypedDataset[ApartmentDetails] = [city: string, price: double ... 2 more fields]

Suppose we only want to work with city and ratio:

case class CityInfo(city: String, ratio: Double)

val cityRatio = aptWithRatio.project[CityInfo]
// cityRatio: TypedDataset[CityInfo] = [city: string, ratio: double]

cityRatio.show(2).run()
// +-----+------+
// | city| ratio|
// +-----+------+
// |Paris|6000.0|
// |Paris|4500.0|
// +-----+------+
// only showing top 2 rows
//

Suppose we only want to work with price and ratio:

case class PriceInfo(ratio: Double, price: Double)

val priceInfo = aptWithRatio.project[PriceInfo]
// priceInfo: TypedDataset[PriceInfo] = [ratio: double, price: double]

priceInfo.show(2).run()
// +------+--------+
// | ratio|   price|
// +------+--------+
// |6000.0|300000.0|
// |4500.0|450000.0|
// +------+--------+
// only showing top 2 rows
//

We see that the order of the fields does not matter as long as the names and the corresponding types agree. However, if we make a mistake in any of the names and/or their types, then we get a compilation error.

Say we make a typo in a field name:

case class PriceInfo2(ratio: Double, pricEE: Double)
aptWithRatio.project[PriceInfo2]
// error: Cannot prove that ApartmentDetails can be projected to PriceInfo2. Perhaps not all member names and types of PriceInfo2 are the same in ApartmentDetails?
// aptWithRatio.project[PriceInfo2]
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Say we make a mistake in the corresponding type:

case class PriceInfo3(ratio: Int, price: Double) // ratio should be Double
aptWithRatio.project[PriceInfo3]
// error: Cannot prove that ApartmentDetails can be projected to PriceInfo3. Perhaps not all member names and types of PriceInfo3 are the same in ApartmentDetails?
// aptWithRatio.project[PriceInfo3]
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Union of TypedDatasets

Lets create a projection of our original dataset with a subset of the fields.

case class ApartmentShortInfo(city: String, price: Double, bedrooms: Int)

val aptTypedDs2: TypedDataset[ApartmentShortInfo] = aptTypedDs.project[ApartmentShortInfo]

The union of aptTypedDs2 with aptTypedDs uses all the fields of the caller (aptTypedDs2) and expects the other dataset (aptTypedDs) to include all those fields. If field names/types do not match you get a compilation error.

aptTypedDs2.union(aptTypedDs).show().run
// +-----+--------+--------+
// | city|   price|bedrooms|
// +-----+--------+--------+
// |Paris|300000.0|       2|
// |Paris|450000.0|       3|
// |Paris|250000.0|       1|
// | Lyon|200000.0|       2|
// | Lyon|133000.0|       1|
// | Nice|325000.0|       3|
// |Paris|300000.0|       2|
// |Paris|450000.0|       3|
// |Paris|250000.0|       1|
// | Lyon|200000.0|       2|
// | Lyon|133000.0|       1|
// | Nice|325000.0|       3|
// +-----+--------+--------+
//

The other way around will not compile, since aptTypedDs2 has only a subset of the fields.

aptTypedDs.union(aptTypedDs2).show().run
// error: Cannot prove that ApartmentShortInfo can be projected to repl.MdocSession.MdocApp0.Apartment. Perhaps not all member names and types of repl.MdocSession.MdocApp0.Apartment are the same in ApartmentShortInfo?
// Error occurred in an application involving default arguments.
// aptTypedDs.union(aptTypedDs2).show().run
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Finally, as with project, union will align fields that have same names/types, so fields do not have to be in the same order.

TypedDataset functions and transformations

Frameless supports many of Spark's functions and transformations. However, whenever a Spark function does not exist in Frameless, calling .dataset will expose the underlying Dataset (from org.apache.spark.sql, the original Spark APIs), where you can use anything that would be missing from the Frameless' API.

These are the main imports for Frameless' aggregate and non-aggregate functions.

import frameless.functions._                // For literals
import frameless.functions.nonAggregate._   // e.g., concat, abs
import frameless.functions.aggregate._      // e.g., count, sum, avg 

Drop/Replace/Add fields

dropTupled() drops a single column and results in a tuple-based schema.

aptTypedDs2.dropTupled('price): TypedDataset[(String,Int)]
// res18: TypedDataset[(String, Int)] = [_1: string, _2: int]

To drop a column and specify a new schema use drop().

case class CityBeds(city: String, bedrooms: Int)
val cityBeds: TypedDataset[CityBeds] = aptTypedDs2.drop[CityBeds] 
// cityBeds: TypedDataset[CityBeds] = [city: string, bedrooms: int]

Often, you want to replace an existing column with a new value.

val inflation = aptTypedDs2.withColumnReplaced('price, aptTypedDs2('price) * 2)
// inflation: TypedDataset[ApartmentShortInfo] = [city: string, price: double ... 1 more field]
 
inflation.show(2).run()
// +-----+--------+--------+
// | city|   price|bedrooms|
// +-----+--------+--------+
// |Paris|600000.0|       2|
// |Paris|900000.0|       3|
// +-----+--------+--------+
// only showing top 2 rows
//

Or use a literal instead.

import frameless.functions.lit
aptTypedDs2.withColumnReplaced('price, lit(0.001)) 
// res20: TypedDataset[ApartmentShortInfo] = [city: string, price: double ... 1 more field]

Adding a column using withColumnTupled() results in a tupled-based schema.

aptTypedDs2.withColumnTupled(lit(Array("a","b","c"))).show(2).run()
// +-----+--------+---+---------+
// |   _1|      _2| _3|       _4|
// +-----+--------+---+---------+
// |Paris|300000.0|  2|[a, b, c]|
// |Paris|450000.0|  3|[a, b, c]|
// +-----+--------+---+---------+
// only showing top 2 rows
//

Similarly, withColumn() adds a column and explicitly expects a schema for the result.

case class CityBedsOther(city: String, bedrooms: Int, other: List[String])

cityBeds.
   withColumn[CityBedsOther](lit(List("a","b","c"))).
   show(1).run()
// +-----+--------+---------+
// | city|bedrooms|    other|
// +-----+--------+---------+
// |Paris|       2|[a, b, c]|
// +-----+--------+---------+
// only showing top 1 row
//

To conditionally change a column use the when/otherwise operation.

import frameless.functions.nonAggregate.when
aptTypedDs2.withColumnTupled(
   when(aptTypedDs2('city) === "Paris", aptTypedDs2('price)).
   when(aptTypedDs2('city) === "Lyon", lit(1.1)).
   otherwise(lit(0.0))).show(8).run()
// +-----+--------+---+--------+
// |   _1|      _2| _3|      _4|
// +-----+--------+---+--------+
// |Paris|300000.0|  2|300000.0|
// |Paris|450000.0|  3|450000.0|
// |Paris|250000.0|  1|250000.0|
// | Lyon|200000.0|  2|     1.1|
// | Lyon|133000.0|  1|     1.1|
// | Nice|325000.0|  3|     0.0|
// +-----+--------+---+--------+
//

A simple way to add a column without losing important schema information is to project the entire source schema into a single column using the asCol() method.

val c = cityBeds.select(cityBeds.asCol, lit(List("a","b","c")))
// c: TypedDataset[(CityBeds, List[String])] = [_1: struct<city: string, bedrooms: int>, _2: array<string>]
c.show(1).run()
// +----------+---------+
// |        _1|       _2|
// +----------+---------+
// |{Paris, 2}|[a, b, c]|
// +----------+---------+
// only showing top 1 row
//

When working with Spark's DataFrames, you often select all columns using .select($"*", ...). In a way, asCol() is a typed equivalent of $"*".

To access nested columns, use the colMany() method.

c.select(c.colMany('_1, 'city), c('_2)).show(2).run()
// +-----+---------+
// |   _1|       _2|
// +-----+---------+
// |Paris|[a, b, c]|
// |Paris|[a, b, c]|
// +-----+---------+
// only showing top 2 rows
//

Working with collections

import frameless.functions._
import frameless.functions.nonAggregate._
val t = cityRatio.select(cityRatio('city), lit(List("abc","c","d")))
// t: TypedDataset[(String, List[String])] = [_1: string, _2: array<string>]
t.withColumnTupled(
   arrayContains(t('_2), "abc")
).show(1).run()
// +-----+-----------+----+
// |   _1|         _2|  _3|
// +-----+-----------+----+
// |Paris|[abc, c, d]|true|
// +-----+-----------+----+
// only showing top 1 row
//

If accidentally you apply a collection function on a column that is not a collection, you get a compilation error.

t.withColumnTupled(
   arrayContains(t('_1), "abc")
)
// error: no type parameters for method arrayContains: (column: frameless.AbstractTypedColumn[T,C[A]], value: A)(implicit evidence$1: frameless.CatalystCollection[C])column.ThisType[T,Boolean] exist so that it can be applied to arguments (frameless.TypedColumn[(String, List[String]),String], String)
//  --- because ---
// argument expression's type is not compatible with formal parameter type;
//  found   : frameless.TypedColumn[(String, List[String]),String]
//  required: frameless.AbstractTypedColumn[?T,?C[?A]]
// 
// Error occurred in an application involving default arguments.
//    arrayContains(t('_1), "abc")
//    ^^^^^^^^^^^^^
// error: type mismatch;
//  found   : frameless.TypedColumn[(String, List[String]),String]
//  required: frameless.AbstractTypedColumn[T,C[A]]
// Error occurred in an application involving default arguments.
//    arrayContains(t('_1), "abc")
//                  ^^^^^^
// error: type mismatch;
//  found   : String("abc")
//  required: A
// Error occurred in an application involving default arguments.
//    arrayContains(t('_1), "abc")
//                          ^^^^^
// error: Cannot do collection operations on columns of type C.
// Error occurred in an application involving default arguments.
//    arrayContains(t('_1), "abc")
//                 ^

Flattening columns in Spark is done with the explode() method. Unlike vanilla Spark, in Frameless explode() is part of TypedDataset and not a function of a column. This provides additional safety since more than one explode() applied in a single statement results in runtime error in vanilla Spark.

val t2 = cityRatio.select(cityRatio('city), lit(List(1,2,3,4)))
// t2: TypedDataset[(String, List[Int])] = [_1: string, _2: array<int>]
val flattened = t2.explode('_2): TypedDataset[(String, Int)]
// flattened: TypedDataset[(String, Int)] = [_1: string, _2: int]
flattened.show(4).run()
// +-----+---+
// |   _1| _2|
// +-----+---+
// |Paris|  1|
// |Paris|  2|
// |Paris|  3|
// |Paris|  4|
// +-----+---+
// only showing top 4 rows
//

Here is an example of how explode() may fail in vanilla Spark. The Frameless implementation does not suffer from this problem since, by design, it can only be applied to a single column at a time.

{
  import org.apache.spark.sql.functions.{explode => sparkExplode}
  t2.dataset.toDF().select(sparkExplode($"_2"), sparkExplode($"_2"))
}
// error: Unit does not take parameters
// Error occurred in an application involving default arguments.
// flattened.show(4).run()
// ^^^^^^^^^^^^^

Collecting data to the driver

In Frameless all Spark actions (such as collect()) are safe.

Take the first element from a dataset (if the dataset is empty return None).

cityBeds.headOption.run()
// res30: Option[CityBeds] = Some(CityBeds("Paris", 2))

Take the first n elements.

cityBeds.take(2).run()
// res31: Seq[CityBeds] = WrappedArray(
//   CityBeds("Paris", 2),
//   CityBeds("Paris", 3)
// )
cityBeds.head(3).run()
// res32: Seq[CityBeds] = WrappedArray(
//   CityBeds("Paris", 2),
//   CityBeds("Paris", 3),
//   CityBeds("Paris", 1)
// )
cityBeds.limit(4).collect().run()
// res33: Seq[CityBeds] = WrappedArray(
//   CityBeds("Paris", 2),
//   CityBeds("Paris", 3),
//   CityBeds("Paris", 1),
//   CityBeds("Lyon", 2)
// )

Sorting columns

Only column types that can be sorted are allowed to be selected for sorting.

aptTypedDs.orderBy(aptTypedDs('city).asc).show(2).run()
// +----+-------+--------+--------+
// |city|surface|   price|bedrooms|
// +----+-------+--------+--------+
// |Lyon|     45|133000.0|       1|
// |Lyon|     83|200000.0|       2|
// +----+-------+--------+--------+
// only showing top 2 rows
//

The ordering can be changed by selecting .acs or .desc.

aptTypedDs.orderBy(
   aptTypedDs('city).asc, 
   aptTypedDs('price).desc
).show(2).run()
// +----+-------+--------+--------+
// |city|surface|   price|bedrooms|
// +----+-------+--------+--------+
// |Lyon|     83|200000.0|       2|
// |Lyon|     45|133000.0|       1|
// +----+-------+--------+--------+
// only showing top 2 rows
//

User Defined Functions

Frameless supports lifting any Scala function (up to five arguments) to the context of a particular TypedDataset:

// The function we want to use as UDF
val priceModifier =
    (name: String, price:Double) => if(name == "Paris") price * 2.0 else price
// priceModifier: (String, Double) => Double = <function2>

val udf = aptTypedDs.makeUDF(priceModifier)
// udf: (frameless.TypedColumn[Apartment, String], frameless.TypedColumn[Apartment, Double]) => frameless.TypedColumn[Apartment, Double] = frameless.functions.Udf$$Lambda$15137/0x00000008041d1840@782725bb

val aptds = aptTypedDs // For shorter expressions
// aptds: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]

val adjustedPrice = aptds.select(aptds('city), udf(aptds('city), aptds('price)))
// adjustedPrice: TypedDataset[(String, Double)] = [_1: string, _2: double]

adjustedPrice.show().run()
// +-----+--------+
// |   _1|      _2|
// +-----+--------+
// |Paris|600000.0|
// |Paris|900000.0|
// |Paris|500000.0|
// | Lyon|200000.0|
// | Lyon|133000.0|
// | Nice|325000.0|
// +-----+--------+
//

GroupBy and Aggregations

Let's suppose we wanted to retrieve the average apartment price in each city

val priceByCity = aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('price)))
// priceByCity: TypedDataset[(String, Double)] = [_1: string, _2: double]
priceByCity.collect().run()
// res37: Seq[(String, Double)] = WrappedArray(
//   ("Paris", 333333.3333333333),
//   ("Lyon", 166500.0),
//   ("Nice", 325000.0)
// )

Again if we try to aggregate a column that can't be aggregated, we get a compilation error

aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('city)))
// error: Cannot compute average of type String.
// Error occurred in an application involving default arguments.
// aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('city)))
//                                              ^

Next, we combine select and groupBy to calculate the average price/surface ratio per city:

val aptds = aptTypedDs // For shorter expressions
// aptds: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]

val cityPriceRatio =  aptds.select(aptds('city), aptds('price) / aptds('surface).cast[Double])
// cityPriceRatio: TypedDataset[(String, Double)] = [_1: string, _2: double]

cityPriceRatio.groupBy(cityPriceRatio('_1)).agg(avg(cityPriceRatio('_2))).show().run()
// +-----+------------------+
// |   _1|                _2|
// +-----+------------------+
// |Paris| 6833.333333333333|
// | Lyon|2682.5970548862115|
// | Nice| 4391.891891891892|
// +-----+------------------+
//

We can also use pivot to further group data on a secondary column. For example, we can compare the average price across cities by number of bedrooms.

case class BedroomStats(
   city: String,
   AvgPriceBeds1: Option[Double], // Pivot values may be missing, so we encode them using Options
   AvgPriceBeds2: Option[Double],
   AvgPriceBeds3: Option[Double],
   AvgPriceBeds4: Option[Double])

val bedroomStats = aptds.
   groupBy(aptds('city)).
   pivot(aptds('bedrooms)).
   on(1,2,3,4). // We only care for up to 4 bedrooms
   agg(avg(aptds('price))).
   as[BedroomStats]  // Typesafe casting
// bedroomStats: TypedDataset[BedroomStats] = [city: string, AvgPriceBeds1: double ... 3 more fields]

bedroomStats.show().run()
// +-----+-------------+-------------+-------------+-------------+
// | city|AvgPriceBeds1|AvgPriceBeds2|AvgPriceBeds3|AvgPriceBeds4|
// +-----+-------------+-------------+-------------+-------------+
// | Nice|         NULL|         NULL|     325000.0|         NULL|
// |Paris|     250000.0|     300000.0|     450000.0|         NULL|
// | Lyon|     133000.0|     200000.0|         NULL|         NULL|
// +-----+-------------+-------------+-------------+-------------+
//

With pivot, collecting data preserves typesafety by encoding potentially missing columns with Option.

bedroomStats.collect().run().foreach(println)
// BedroomStats(Nice,None,None,Some(325000.0),None)
// BedroomStats(Paris,Some(250000.0),Some(300000.0),Some(450000.0),None)
// BedroomStats(Lyon,Some(133000.0),Some(200000.0),None,None)

Working with Optional fields

Optional fields can be converted to non-optional using getOrElse().

val sampleStats = bedroomStats.select(
   bedroomStats('AvgPriceBeds2).getOrElse(0.0),
   bedroomStats('AvgPriceBeds3).getOrElse(0.0))
// sampleStats: TypedDataset[(Double, Double)] = [_1: double, _2: double]

sampleStats.show().run()   
// +--------+--------+
// |      _1|      _2|
// +--------+--------+
// |     0.0|325000.0|
// |300000.0|450000.0|
// |200000.0|     0.0|
// +--------+--------+
//

In addition, optional columns can be flatten using the .flattenOption method on TypedDatset. The result contains the rows for which the flattened column is not None (or null). The schema is automatically adapted to reflect this change.

val flattenStats = bedroomStats.flattenOption('AvgPriceBeds2)
// flattenStats: TypedDataset[shapeless.ops.TuplerInstances.<refinement>.this.type.Out] = [_1: string, _2: double ... 3 more fields]


// The second Option[Double] is now of type Double, since all 'null' values are removed
flattenStats: TypedDataset[(String, Option[Double], Double, Option[Double], Option[Double])]
// res43: TypedDataset[(String, Option[Double], Double, Option[Double], Option[Double])] = [_1: string, _2: double ... 3 more fields]

In a DataFrame, if you just ignore types, this would equivelantly be written as:

bedroomStats.dataset.toDF().filter($"AvgPriceBeds2".isNotNull)
// res44: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [city: string, AvgPriceBeds1: double ... 3 more fields]

Entire TypedDataset Aggregation

We often want to aggregate the entire TypedDataset and skip the groupBy() clause. In Frameless you can do this using the agg() operator directly on the TypedDataset. In the following example, we compute the average price, the average surface, the minimum surface, and the set of cities for the entire dataset.

case class Stats(
   avgPrice: Double,
   avgSurface: Double,
   minSurface: Int,
   allCities: Vector[String])

aptds.agg(
   avg(aptds('price)),
   avg(aptds('surface)),
   min(aptds('surface)),
   collectSet(aptds('city))
).as[Stats].show().run()
// +-----------------+------------------+----------+-------------------+
// |         avgPrice|        avgSurface|minSurface|          allCities|
// +-----------------+------------------+----------+-------------------+
// |276333.3333333333|62.833333333333336|        25|[Paris, Nice, Lyon]|
// +-----------------+------------------+----------+-------------------+
//

You may apply any TypedColumn operation to a TypedAggregate column as well.

import frameless.functions._
aptds.agg(
   avg(aptds('price)) * min(aptds('surface)).cast[Double], 
   avg(aptds('surface)) * 0.2,
   litAggr("Hello World")
).show().run()
// +-----------------+------------------+-----------+
// |               _1|                _2|         _3|
// +-----------------+------------------+-----------+
// |6908333.333333333|12.566666666666668|Hello World|
// +-----------------+------------------+-----------+
//

Joins

case class CityPopulationInfo(name: String, population: Int)

val cityInfo = Seq(
  CityPopulationInfo("Paris", 2229621),
  CityPopulationInfo("Lyon", 500715),
  CityPopulationInfo("Nice", 343629)
)

val citiInfoTypedDS = TypedDataset.create(cityInfo)

Here is how to join the population information to the apartment's dataset:

val withCityInfo = aptTypedDs.joinInner(citiInfoTypedDS) { aptTypedDs('city) === citiInfoTypedDS('name) }
// withCityInfo: TypedDataset[(Apartment, CityPopulationInfo)] = [_1: struct<city: string, surface: int ... 2 more fields>, _2: struct<name: string, population: int>]

withCityInfo.show().run()
// +--------------------+----------------+
// |                  _1|              _2|
// +--------------------+----------------+
// |{Paris, 50, 30000...|{Paris, 2229621}|
// |{Paris, 100, 4500...|{Paris, 2229621}|
// |{Paris, 25, 25000...|{Paris, 2229621}|
// |{Lyon, 83, 200000...|  {Lyon, 500715}|
// |{Lyon, 45, 133000...|  {Lyon, 500715}|
// |{Nice, 74, 325000...|  {Nice, 343629}|
// +--------------------+----------------+
//

The joined TypedDataset has type TypedDataset[(Apartment, CityPopulationInfo)].

We can then select which information we want to continue to work with:

case class AptPriceCity(city: String, aptPrice: Double, cityPopulation: Int)

withCityInfo.select(
   withCityInfo.colMany('_2, 'name), withCityInfo.colMany('_1, 'price), withCityInfo.colMany('_2, 'population)
).as[AptPriceCity].show().run
// +-----+--------+--------------+
// | city|aptPrice|cityPopulation|
// +-----+--------+--------------+
// |Paris|300000.0|       2229621|
// |Paris|450000.0|       2229621|
// |Paris|250000.0|       2229621|
// | Lyon|200000.0|        500715|
// | Lyon|133000.0|        500715|
// | Nice|325000.0|        343629|
// +-----+--------+--------------+
//

Chained Joins

Joins, or any similar operation, may be chained using a thrush combinator removing the need for intermediate values. Instead of:

val withBedroomInfoInterim = aptTypedDs.joinInner(citiInfoTypedDS)( aptTypedDs('city) === citiInfoTypedDS('name) )
// withBedroomInfoInterim: TypedDataset[(Apartment, CityPopulationInfo)] = [_1: struct<city: string, surface: int ... 2 more fields>, _2: struct<name: string, population: int>]
val withBedroomInfo = withBedroomInfoInterim 
  .joinLeft(bedroomStats)( withBedroomInfoInterim.col('_1).field('city) === bedroomStats('city) )
// withBedroomInfo: TypedDataset[((Apartment, CityPopulationInfo), Option[BedroomStats])] = [_1: struct<_1: struct<city: string, surface: int ... 2 more fields>, _2: struct<name: string, population: int>>, _2: struct<city: string, AvgPriceBeds1: double ... 3 more fields>]

withBedroomInfo.show().run()
// +--------------------+--------------------+
// |                  _1|                  _2|
// +--------------------+--------------------+
// |{{Paris, 50, 3000...|{Paris, 250000.0,...|
// |{{Paris, 100, 450...|{Paris, 250000.0,...|
// |{{Paris, 25, 2500...|{Paris, 250000.0,...|
// |{{Lyon, 83, 20000...|{Lyon, 133000.0, ...|
// |{{Lyon, 45, 13300...|{Lyon, 133000.0, ...|
// |{{Nice, 74, 32500...|{Nice, NULL, NULL...|
// +--------------------+--------------------+
//

You can use thrush from mouse:

libraryDependencies += "org.typelevel" %% "mouse" % "1.2.1"
import mouse.all._

val withBedroomInfoChained = aptTypedDs.joinInner(citiInfoTypedDS)( aptTypedDs('city) === citiInfoTypedDS('name) )
  .thrush( interim => interim.joinLeft(bedroomStats)( interim.col('_1).field('city) === bedroomStats('city) ) )
// withBedroomInfoChained: TypedDataset[((Apartment, CityPopulationInfo), Option[BedroomStats])] = [_1: struct<_1: struct<city: string, surface: int ... 2 more fields>, _2: struct<name: string, population: int>>, _2: struct<city: string, AvgPriceBeds1: double ... 3 more fields>]

withBedroomInfoChained.show().run()
// +--------------------+--------------------+
// |                  _1|                  _2|
// +--------------------+--------------------+
// |{{Paris, 50, 3000...|{Paris, 250000.0,...|
// |{{Paris, 100, 450...|{Paris, 250000.0,...|
// |{{Paris, 25, 2500...|{Paris, 250000.0,...|
// |{{Lyon, 83, 20000...|{Lyon, 133000.0, ...|
// |{{Lyon, 45, 13300...|{Lyon, 133000.0, ...|
// |{{Nice, 74, 32500...|{Nice, NULL, NULL...|
// +--------------------+--------------------+
//