TypedDataset: Feature Overview
This tutorial introduces TypedDataset using a simple example.
The following imports are needed to make all code examples compile.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import frameless.functions.aggregate._
import frameless.TypedDataset
val conf = new SparkConf().setMaster("local[*]").setAppName("Frameless repl").set("spark.ui.enabled", "false")
implicit val spark = SparkSession.builder().config(conf).appName("REPL").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
Creating TypedDataset instances
We start by defining a case class:
case class Apartment(city: String, surface: Int, price: Double, bedrooms: Int)
And a few Apartment instances:
val apartments = Seq(
Apartment("Paris", 50, 300000.0, 2),
Apartment("Paris", 100, 450000.0, 3),
Apartment("Paris", 25, 250000.0, 1),
Apartment("Lyon", 83, 200000.0, 2),
Apartment("Lyon", 45, 133000.0, 1),
Apartment("Nice", 74, 325000.0, 3)
)
We are now ready to instantiate a TypedDataset[Apartment]:
val aptTypedDs = TypedDataset.create(apartments)
// aptTypedDs: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]
We can also create one from an existing Spark Dataset:
val aptDs = spark.createDataset(apartments)
// aptDs: org.apache.spark.sql.Dataset[Apartment] = [city: string, surface: int ... 2 more fields]
val aptTypedDs = TypedDataset.create(aptDs)
// aptTypedDs: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]
Or use the Frameless syntax:
import frameless.syntax._
val aptTypedDs2 = aptDs.typed
// aptTypedDs2: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]
Typesafe column referencing
This is how we select a particular column from a TypedDataset:
val cities: TypedDataset[String] = aptTypedDs.select(aptTypedDs('city))
// cities: TypedDataset[String] = [value: string]
This is completely type-safe. For instance, suppose we misspell city as citi:
aptTypedDs.select(aptTypedDs('citi))
// error: No column Symbol with shapeless.tag.Tagged[String("citi")] of type A in repl.MdocSession.MdocApp0.Apartment
This gets raised at compile time, whereas with the standard Dataset API the error appears at runtime (enjoy the stack trace):
aptDs.select('citi)
// org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `citi` cannot be resolved. Did you mean one of the following? [`city`, `price`, `surface`, `bedrooms`].;
// 'Project ['citi]
// +- LocalRelation [city#64, surface#65, price#66, bedrooms#67]
//
// at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:306)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:141)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6(CheckAnalysis.scala:299)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$6$adapted(CheckAnalysis.scala:297)
// at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:244)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:297)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:297)
// at scala.collection.immutable.Stream.foreach(Stream.scala:533)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:297)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:215)
// at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:244)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:215)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:197)
// at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:202)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:193)
// at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:171)
// at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:202)
// at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:225)
// at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
// at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:222)
// at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
// at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
// at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
// at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
// at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
// at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
// at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
// at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
// at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
// at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
// at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:91)
// at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
// at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:89)
// at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4352)
// at org.apache.spark.sql.Dataset.select(Dataset.scala:1542)
// at repl.MdocSession$MdocApp0$$anonfun$15.apply(FeatureOverview.md:95)
// at repl.MdocSession$MdocApp0$$anonfun$15.apply(FeatureOverview.md:95)
select() supports arbitrary column operations:
aptTypedDs.select(aptTypedDs('surface) * 10, aptTypedDs('surface) + 2).show().run()
// +----+---+
// | _1| _2|
// +----+---+
// | 500| 52|
// |1000|102|
// | 250| 27|
// | 830| 85|
// | 450| 47|
// | 740| 76|
// +----+---+
//
Note that unlike the standard Spark API, where some operations are lazy and some are not, all TypedDataset operations are lazy.
In the above example, show() is lazy. You must call run() for the show job to materialize.
A more detailed explanation of Job is given here.
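The idea of a deferred job can be illustrated without Spark. The sketch below is only an analogy: DeferredJob is a hypothetical stand-in written for this example and is not Frameless's Job type. It shows that building the job does not execute the computation; only run() does.

```scala
object DeferredJobExample {
  // Hypothetical stand-in for a lazy job (not Frameless's Job):
  // it stores a computation without running it.
  final class DeferredJob[A](thunk: () => A) {
    def run(): A = thunk()
  }

  // Counts how many times the computation has actually been executed.
  var executions = 0

  // Constructing the job does not evaluate the body.
  val job = new DeferredJob(() => {
    executions += 1
    Seq(500, 1000, 250).sum
  })
}
```

Reading DeferredJobExample.executions before calling job.run() gives 0; after one call to run() it is 1.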
Next we compute the price by surface unit:
val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
// error: overloaded method value / with alternatives:
// (u: Double)(implicit n: frameless.CatalystNumeric[Double])frameless.TypedColumn[repl.MdocSession.MdocApp0.Apartment,Double] <and>
// [Out, TT, W](other: frameless.TypedColumn[TT,Double])(implicit n: frameless.CatalystDivisible[Double,Out], implicit e: frameless.TypedEncoder[Out], implicit w: frameless.With[repl.MdocSession.MdocApp0.Apartment,TT]{type Out = W})frameless.TypedColumn[W,Out]
// cannot be applied to (frameless.TypedColumn[repl.MdocSession.MdocApp0.Apartment,Int])
// val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface))
// ^^^^^^^^^^^^^^^^^^^^
As the error suggests, we can't divide a TypedColumn of Double by a TypedColumn of Int.
For safety, Frameless only allows math operations between columns of the same type:
val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface).cast[Double])
// priceBySurfaceUnit: TypedDataset[Double] = [value: double]
priceBySurfaceUnit.collect().run()
// res5: Seq[Double] = WrappedArray(
// 6000.0,
// 4500.0,
// 10000.0,
// 2409.6385542168673,
// 2955.5555555555557,
// 4391.891891891892
// )
It looks like it worked, but that cast seems unsafe, right? Actually, it is safe.
Let's try to cast a TypedColumn of String to Double:
aptTypedDs('city).cast[Double]
// error: could not find implicit value for parameter c: frameless.CatalystCast[String,Double]
The compile-time error tells us that to perform the cast, evidence (in the form of CatalystCast[String, Double]) must be available.
Since casting from String to Double is not allowed, this results in a compilation error.
Check here for the set of available CatalystCast.
Working with Optional columns
When working with real data we have to deal with imperfections, such as missing fields. Columns that may have missing data should be represented using Options. For this example, let's assume that the Apartments dataset may have missing values.
case class ApartmentOpt(city: Option[String], surface: Option[Int], price: Option[Double], bedrooms: Option[Int])
val apartmentsOpt = Seq(
ApartmentOpt(Some("Paris"), Some(50), Some(300000.0), None),
ApartmentOpt(None, None, Some(450000.0), Some(3))
)
val aptTypedDsOpt = TypedDataset.create(apartmentsOpt)
// aptTypedDsOpt: TypedDataset[ApartmentOpt] = [city: string, surface: int ... 2 more fields]
aptTypedDsOpt.show().run()
// +-----+-------+--------+--------+
// | city|surface| price|bedrooms|
// +-----+-------+--------+--------+
// |Paris| 50|300000.0| NULL|
// | NULL| NULL|450000.0| 3|
// +-----+-------+--------+--------+
//
Unfortunately the syntax used above with select() will not work here:
aptTypedDsOpt.select(aptTypedDsOpt('surface) * 10, aptTypedDsOpt('surface) + 2).show().run()
// error: overloaded method value * with alternatives:
// (u: Option[Int])(implicit n: frameless.CatalystNumeric[Option[Int]])frameless.TypedColumn[ApartmentOpt,Option[Int]] <and>
// [TT, W](other: frameless.TypedColumn[TT,Option[Int]])(implicit n: frameless.CatalystNumeric[Option[Int]], implicit w: frameless.With[ApartmentOpt,TT]{type Out = W}, implicit t: scala.reflect.ClassTag[Option[Int]])frameless.TypedColumn[W,Option[Int]]
// cannot be applied to (Int)
// aptTypedDsOpt.select(aptTypedDsOpt('surface) * 10, aptTypedDsOpt('surface) + 2).show().run()
// ^^^^^^^^^^^^^^^^^^^^^^^^^
// error: overloaded method value + with alternatives:
// (u: Option[Int])(implicit n: frameless.CatalystNumeric[Option[Int]])frameless.TypedColumn[ApartmentOpt,Option[Int]] <and>
// [TT, W](other: frameless.TypedColumn[TT,Option[Int]])(implicit n: frameless.CatalystNumeric[Option[Int]], implicit w: frameless.With[ApartmentOpt,TT]{type Out = W})frameless.TypedColumn[W,Option[Int]]
// cannot be applied to (Int)
// aptTypedDsOpt.select(aptTypedDsOpt('surface) * 10, aptTypedDsOpt('surface) + 2).show().run()
// ^^^^^^^^^^^^^^^^^^^^^^^^^
This is because we cannot multiply an Option with an Int. In Scala, Option has a map() method to help address exactly this (e.g., Some(10).map(c => c * 2)). Frameless follows a similar convention. By applying the opt method on any Option[X] column you can then use map() to provide a function that works with the unwrapped type X.
This is best shown in the example below:
aptTypedDsOpt.select(aptTypedDsOpt('surface).opt.map(c => c * 10), aptTypedDsOpt('surface).opt.map(_ + 2)).show().run()
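For comparison, the same transformation can be written against plain Scala Option values. This is only a local-collection analogy of the opt.map convention described above, using the two surface values from apartmentsOpt; the object name is chosen for illustration.

```scala
object OptionMapExample {
  // The surface column of apartmentsOpt as plain Scala values.
  val surfaces: Seq[Option[Int]] = Seq(Some(50), None)

  // map() only touches the value inside Some; None passes through unchanged.
  val timesTen: Seq[Option[Int]] = surfaces.map(_.map(c => c * 10)) // Seq(Some(500), None)
  val plusTwo: Seq[Option[Int]] = surfaces.map(_.map(_ + 2))        // Seq(Some(52), None)
}
```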
Known issue: map() will throw a runtime exception when the applied function includes a udf(). If you want to apply a udf() to an optional column, we recommend changing your udf to work directly with Optional fields.
Casting and projections
In the general case, select() returns a TypedDataset of type TypedDataset[TupleN[...]] (with N in [1...10]).
For example, if we select three columns with types String, Int, and Boolean the result will have type TypedDataset[(String, Int, Boolean)].
We often want to give more expressive types to the result of our computations.
as[T] allows us to safely cast a TypedDataset[U] to another of type TypedDataset[T] as long as the types in U and T align.
When the cast is valid the expression compiles:
case class UpdatedSurface(city: String, surface: Int)
val updated = aptTypedDs.select(aptTypedDs('city), aptTypedDs('surface) + 2).as[UpdatedSurface]
// updated: TypedDataset[UpdatedSurface] = [city: string, surface: int]
updated.show(2).run()
// +-----+-------+
// | city|surface|
// +-----+-------+
// |Paris| 52|
// |Paris| 102|
// +-----+-------+
// only showing top 2 rows
//
Next we try to cast a (String, String) to an UpdatedSurface (which has types String, Int).
The cast is not valid and the expression does not compile:
aptTypedDs.select(aptTypedDs('city), aptTypedDs('city)).as[UpdatedSurface]
// error: could not find implicit value for parameter as: frameless.ops.As[(String, String),UpdatedSurface]
Advanced topics with select()
When you select() a single column that has type A, the resulting type is TypedDataset[A] and not TypedDataset[Tuple1[A]]. This behavior makes working with nested schema easier (i.e., in the case where A is a complex data type) and simplifies type-checking column operations (e.g., verify that two columns can be added, divided, etc.). However, when A is scalar, say a Long, it makes it harder to select and work with the resulting TypedDataset[Long]. For instance, it's harder to reference this single scalar column using select(). If this becomes an issue, you can bypass this behavior by using the selectMany() method instead of select(). In the previous example, selectMany() will return TypedDataset[Tuple1[Long]] and you can reference its single column using the name _1.
selectMany() should also be used when you need to select more than 10 columns.
select() has better IDE support and compiles faster than the macro-based selectMany(), so prefer select() for the most common use cases.
When you are handed a single scalar column TypedDataset (e.g., TypedDataset[Double]) the best way to reference its single column is using the asCol (short for "as a column") method.
This is best shown in the example below. We will see more usages of asCol later in this tutorial.
val priceBySurfaceUnit = aptTypedDs.select(aptTypedDs('price) / aptTypedDs('surface).cast[Double])
// priceBySurfaceUnit: TypedDataset[Double] = [value: double]
priceBySurfaceUnit.select(priceBySurfaceUnit.asCol * 2).show(2).run()
// +-------+
// | value|
// +-------+
// |12000.0|
// | 9000.0|
// +-------+
// only showing top 2 rows
//
Projections
We often want to work with a subset of the fields in a dataset. Projections allow us to easily select our fields of interest while preserving their initial names and types for extra safety.
Here is an example using the TypedDataset[Apartment] with an additional column:
val aptds = aptTypedDs // For shorter expressions
// aptds: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]
case class ApartmentDetails(city: String, price: Double, surface: Int, ratio: Double)
val aptWithRatio =
aptds.select(
aptds('city),
aptds('price),
aptds('surface),
aptds('price) / aptds('surface).cast[Double]
).as[ApartmentDetails]
// aptWithRatio: TypedDataset[ApartmentDetails] = [city: string, price: double ... 2 more fields]
Suppose we only want to work with city and ratio:
case class CityInfo(city: String, ratio: Double)
val cityRatio = aptWithRatio.project[CityInfo]
// cityRatio: TypedDataset[CityInfo] = [city: string, ratio: double]
cityRatio.show(2).run()
// +-----+------+
// | city| ratio|
// +-----+------+
// |Paris|6000.0|
// |Paris|4500.0|
// +-----+------+
// only showing top 2 rows
//
Suppose we only want to work with price and ratio:
case class PriceInfo(ratio: Double, price: Double)
val priceInfo = aptWithRatio.project[PriceInfo]
// priceInfo: TypedDataset[PriceInfo] = [ratio: double, price: double]
priceInfo.show(2).run()
// +------+--------+
// | ratio| price|
// +------+--------+
// |6000.0|300000.0|
// |4500.0|450000.0|
// +------+--------+
// only showing top 2 rows
//
We see that the order of the fields does not matter as long as the names and the corresponding types agree. However, if we make a mistake in any of the names and/or their types, then we get a compilation error.
Say we make a typo in a field name:
case class PriceInfo2(ratio: Double, pricEE: Double)
aptWithRatio.project[PriceInfo2]
// error: Cannot prove that ApartmentDetails can be projected to PriceInfo2. Perhaps not all member names and types of PriceInfo2 are the same in ApartmentDetails?
Say we make a mistake in the corresponding type:
case class PriceInfo3(ratio: Int, price: Double) // ratio should be Double
aptWithRatio.project[PriceInfo3]
// error: Cannot prove that ApartmentDetails can be projected to PriceInfo3. Perhaps not all member names and types of PriceInfo3 are the same in ApartmentDetails?
Union of TypedDatasets
Let's create a projection of our original dataset with a subset of the fields.
case class ApartmentShortInfo(city: String, price: Double, bedrooms: Int)
val aptTypedDs2: TypedDataset[ApartmentShortInfo] = aptTypedDs.project[ApartmentShortInfo]
The union of aptTypedDs2 with aptTypedDs uses all the fields of the caller (aptTypedDs2) and expects the other dataset (aptTypedDs) to include all those fields.
If field names/types do not match you get a compilation error.
aptTypedDs2.union(aptTypedDs).show().run
// +-----+--------+--------+
// | city| price|bedrooms|
// +-----+--------+--------+
// |Paris|300000.0| 2|
// |Paris|450000.0| 3|
// |Paris|250000.0| 1|
// | Lyon|200000.0| 2|
// | Lyon|133000.0| 1|
// | Nice|325000.0| 3|
// |Paris|300000.0| 2|
// |Paris|450000.0| 3|
// |Paris|250000.0| 1|
// | Lyon|200000.0| 2|
// | Lyon|133000.0| 1|
// | Nice|325000.0| 3|
// +-----+--------+--------+
//
The other way around will not compile, since aptTypedDs2 has only a subset of the fields.
aptTypedDs.union(aptTypedDs2).show().run
// error: Cannot prove that ApartmentShortInfo can be projected to repl.MdocSession.MdocApp0.Apartment. Perhaps not all member names and types of repl.MdocSession.MdocApp0.Apartment are the same in ApartmentShortInfo?
// Error occurred in an application involving default arguments.
Finally, as with project, union will align fields that have the same names/types, so fields do not have to be in the same order.
TypedDataset functions and transformations
Frameless supports many of Spark's functions and transformations.
However, whenever a Spark function does not exist in Frameless, calling .dataset will expose the underlying Dataset (from org.apache.spark.sql, the original Spark APIs), where you can use anything that is missing from the Frameless API.
These are the main imports for Frameless' aggregate and non-aggregate functions.
import frameless.functions._ // For literals
import frameless.functions.nonAggregate._ // e.g., concat, abs
import frameless.functions.aggregate._ // e.g., count, sum, avg
Drop/Replace/Add fields
dropTupled() drops a single column and results in a tuple-based schema.
aptTypedDs2.dropTupled('price): TypedDataset[(String,Int)]
// res18: TypedDataset[(String, Int)] = [_1: string, _2: int]
To drop a column and specify a new schema use drop().
case class CityBeds(city: String, bedrooms: Int)
val cityBeds: TypedDataset[CityBeds] = aptTypedDs2.drop[CityBeds]
// cityBeds: TypedDataset[CityBeds] = [city: string, bedrooms: int]
Often, you want to replace an existing column with a new value.
val inflation = aptTypedDs2.withColumnReplaced('price, aptTypedDs2('price) * 2)
// inflation: TypedDataset[ApartmentShortInfo] = [city: string, price: double ... 1 more field]
inflation.show(2).run()
// +-----+--------+--------+
// | city| price|bedrooms|
// +-----+--------+--------+
// |Paris|600000.0| 2|
// |Paris|900000.0| 3|
// +-----+--------+--------+
// only showing top 2 rows
//
Or use a literal instead.
import frameless.functions.lit
aptTypedDs2.withColumnReplaced('price, lit(0.001))
// res20: TypedDataset[ApartmentShortInfo] = [city: string, price: double ... 1 more field]
Adding a column using withColumnTupled() results in a tuple-based schema.
aptTypedDs2.withColumnTupled(lit(Array("a","b","c"))).show(2).run()
// +-----+--------+---+---------+
// | _1| _2| _3| _4|
// +-----+--------+---+---------+
// |Paris|300000.0| 2|[a, b, c]|
// |Paris|450000.0| 3|[a, b, c]|
// +-----+--------+---+---------+
// only showing top 2 rows
//
Similarly, withColumn() adds a column and explicitly expects a schema for the result.
case class CityBedsOther(city: String, bedrooms: Int, other: List[String])
cityBeds.
withColumn[CityBedsOther](lit(List("a","b","c"))).
show(1).run()
// +-----+--------+---------+
// | city|bedrooms| other|
// +-----+--------+---------+
// |Paris| 2|[a, b, c]|
// +-----+--------+---------+
// only showing top 1 row
//
To conditionally change a column use the when/otherwise operation.
import frameless.functions.nonAggregate.when
aptTypedDs2.withColumnTupled(
when(aptTypedDs2('city) === "Paris", aptTypedDs2('price)).
when(aptTypedDs2('city) === "Lyon", lit(1.1)).
otherwise(lit(0.0))).show(8).run()
// +-----+--------+---+--------+
// | _1| _2| _3| _4|
// +-----+--------+---+--------+
// |Paris|300000.0| 2|300000.0|
// |Paris|450000.0| 3|450000.0|
// |Paris|250000.0| 1|250000.0|
// | Lyon|200000.0| 2| 1.1|
// | Lyon|133000.0| 1| 1.1|
// | Nice|325000.0| 3| 0.0|
// +-----+--------+---+--------+
//
A simple way to add a column without losing important schema information is to project the entire source schema into a single column using the asCol() method.
val c = cityBeds.select(cityBeds.asCol, lit(List("a","b","c")))
// c: TypedDataset[(CityBeds, List[String])] = [_1: struct<city: string, bedrooms: int>, _2: array<string>]
c.show(1).run()
// +----------+---------+
// | _1| _2|
// +----------+---------+
// |{Paris, 2}|[a, b, c]|
// +----------+---------+
// only showing top 1 row
//
When working with Spark's DataFrames, you often select all columns using .select($"*", ...).
In a way, asCol() is a typed equivalent of $"*".
To access nested columns, use the colMany() method.
c.select(c.colMany('_1, 'city), c('_2)).show(2).run()
// +-----+---------+
// | _1| _2|
// +-----+---------+
// |Paris|[a, b, c]|
// |Paris|[a, b, c]|
// +-----+---------+
// only showing top 2 rows
//
Working with collections
import frameless.functions._
import frameless.functions.nonAggregate._
val t = cityRatio.select(cityRatio('city), lit(List("abc","c","d")))
// t: TypedDataset[(String, List[String])] = [_1: string, _2: array<string>]
t.withColumnTupled(
arrayContains(t('_2), "abc")
).show(1).run()
// +-----+-----------+----+
// | _1| _2| _3|
// +-----+-----------+----+
// |Paris|[abc, c, d]|true|
// +-----+-----------+----+
// only showing top 1 row
//
If you accidentally apply a collection function to a column that is not a collection, you get a compilation error.
t.withColumnTupled(
arrayContains(t('_1), "abc")
)
// error: no type parameters for method arrayContains: (column: frameless.AbstractTypedColumn[T,C[A]], value: A)(implicit evidence$1: frameless.CatalystCollection[C])column.ThisType[T,Boolean] exist so that it can be applied to arguments (frameless.TypedColumn[(String, List[String]),String], String)
// --- because ---
// argument expression's type is not compatible with formal parameter type;
// found : frameless.TypedColumn[(String, List[String]),String]
// required: frameless.AbstractTypedColumn[?T,?C[?A]]
//
// Error occurred in an application involving default arguments.
// error: type mismatch;
// found : frameless.TypedColumn[(String, List[String]),String]
// required: frameless.AbstractTypedColumn[T,C[A]]
// Error occurred in an application involving default arguments.
// error: type mismatch;
// found : String("abc")
// required: A
// Error occurred in an application involving default arguments.
// arrayContains(t('_1), "abc")
// ^^^^^
// error: Cannot do collection operations on columns of type C.
// Error occurred in an application involving default arguments.
Flattening columns in Spark is done with the explode() method. Unlike vanilla Spark, in Frameless explode() is part of TypedDataset and not a function of a column.
This provides additional safety since more than one explode() applied in a single statement results in a runtime error in vanilla Spark.
val t2 = cityRatio.select(cityRatio('city), lit(List(1,2,3,4)))
// t2: TypedDataset[(String, List[Int])] = [_1: string, _2: array<int>]
val flattened = t2.explode('_2): TypedDataset[(String, Int)]
// flattened: TypedDataset[(String, Int)] = [_1: string, _2: int]
flattened.show(4).run()
// +-----+---+
// | _1| _2|
// +-----+---+
// |Paris| 1|
// |Paris| 2|
// |Paris| 3|
// |Paris| 4|
// +-----+---+
// only showing top 4 rows
//
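On local Scala collections, the closest counterpart to explode() is flatMap. The sketch below is a plain-Scala analogy (no Spark involved, object name chosen for illustration) that produces one output row per element of the list, using a single row shaped like those in t2.

```scala
object LocalExplodeExample {
  // One row shaped like those in t2: a city paired with a list of values.
  val rows: Seq[(String, List[Int])] = Seq(("Paris", List(1, 2, 3, 4)))

  // Emit one (city, value) pair per list element, as explode does per row.
  val flattened: Seq[(String, Int)] =
    rows.flatMap { case (city, values) => values.map(v => (city, v)) }
  // Seq(("Paris", 1), ("Paris", 2), ("Paris", 3), ("Paris", 4))
}
```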
Here is an example of how explode() may fail in vanilla Spark. The Frameless implementation does not suffer from this problem since, by design, it can only be applied to a single column at a time.
{
import org.apache.spark.sql.functions.{explode => sparkExplode}
t2.dataset.toDF().select(sparkExplode($"_2"), sparkExplode($"_2"))
}
// error: Unit does not take parameters
// Error occurred in an application involving default arguments.
Collecting data to the driver
In Frameless all Spark actions (such as collect()) are safe.
Take the first element from a dataset (if the dataset is empty return None).
cityBeds.headOption.run()
// res30: Option[CityBeds] = Some(CityBeds("Paris", 2))
Take the first n elements.
cityBeds.take(2).run()
// res31: Seq[CityBeds] = WrappedArray(
// CityBeds("Paris", 2),
// CityBeds("Paris", 3)
// )
cityBeds.head(3).run()
// res32: Seq[CityBeds] = WrappedArray(
// CityBeds("Paris", 2),
// CityBeds("Paris", 3),
// CityBeds("Paris", 1)
// )
cityBeds.limit(4).collect().run()
// res33: Seq[CityBeds] = WrappedArray(
// CityBeds("Paris", 2),
// CityBeds("Paris", 3),
// CityBeds("Paris", 1),
// CityBeds("Lyon", 2)
// )
Sorting columns
Only column types that can be sorted are allowed to be selected for sorting.
aptTypedDs.orderBy(aptTypedDs('city).asc).show(2).run()
// +----+-------+--------+--------+
// |city|surface| price|bedrooms|
// +----+-------+--------+--------+
// |Lyon| 83|200000.0| 2|
// |Lyon| 45|133000.0| 1|
// +----+-------+--------+--------+
// only showing top 2 rows
//
The ordering can be changed by selecting .asc or .desc.
aptTypedDs.orderBy(
aptTypedDs('city).asc,
aptTypedDs('price).desc
).show(2).run()
// +----+-------+--------+--------+
// |city|surface| price|bedrooms|
// +----+-------+--------+--------+
// |Lyon| 83|200000.0| 2|
// |Lyon| 45|133000.0| 1|
// +----+-------+--------+--------+
// only showing top 2 rows
//
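The expected order can be double-checked on the local Seq of apartments. The following is a plain-Scala sketch (not Frameless code, object name chosen for illustration) that sorts by city ascending and then by price descending.

```scala
object LocalSortExample {
  final case class Apartment(city: String, surface: Int, price: Double, bedrooms: Int)

  val apartments: Seq[Apartment] = Seq(
    Apartment("Paris", 50, 300000.0, 2),
    Apartment("Paris", 100, 450000.0, 3),
    Apartment("Paris", 25, 250000.0, 1),
    Apartment("Lyon", 83, 200000.0, 2),
    Apartment("Lyon", 45, 133000.0, 1),
    Apartment("Nice", 74, 325000.0, 3)
  )

  // City ascending first; within the same city, price descending.
  val sorted: Seq[Apartment] = apartments.sortWith { (a, b) =>
    if (a.city != b.city) a.city < b.city else a.price > b.price
  }
}
```

The first two elements of sorted are the two Lyon apartments, priced 200000.0 and 133000.0, matching the table above.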
User Defined Functions
Frameless supports lifting any Scala function (up to five arguments) to the context of a particular TypedDataset:
// The function we want to use as UDF
val priceModifier =
(name: String, price:Double) => if(name == "Paris") price * 2.0 else price
// priceModifier: (String, Double) => Double = <function2>
val udf = aptTypedDs.makeUDF(priceModifier)
// udf: (frameless.TypedColumn[Apartment, String], frameless.TypedColumn[Apartment, Double]) => frameless.TypedColumn[Apartment, Double] = frameless.functions.Udf$$Lambda$15360/0x00000008041f7840@7ce1d593
val aptds = aptTypedDs // For shorter expressions
// aptds: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]
val adjustedPrice = aptds.select(aptds('city), udf(aptds('city), aptds('price)))
// adjustedPrice: TypedDataset[(String, Double)] = [_1: string, _2: double]
adjustedPrice.show().run()
// +-----+--------+
// | _1| _2|
// +-----+--------+
// |Paris|600000.0|
// |Paris|900000.0|
// |Paris|500000.0|
// | Lyon|200000.0|
// | Lyon|133000.0|
// | Nice|325000.0|
// +-----+--------+
//
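Because the UDF is built from an ordinary Scala function, its logic can be checked locally before it is lifted with makeUDF. A minimal sketch, reusing the priceModifier definition from above inside an object named for illustration:

```scala
object PriceModifierCheck {
  // Same function as in the tutorial: doubles the price for Paris only.
  val priceModifier: (String, Double) => Double =
    (name: String, price: Double) => if (name == "Paris") price * 2.0 else price

  val paris: Double = priceModifier("Paris", 300000.0) // 600000.0
  val lyon: Double = priceModifier("Lyon", 200000.0)   // 200000.0
}
```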
GroupBy and Aggregations
Let's suppose we want to retrieve the average apartment price in each city:
val priceByCity = aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('price)))
// priceByCity: TypedDataset[(String, Double)] = [_1: string, _2: double]
priceByCity.collect().run()
// res37: Seq[(String, Double)] = WrappedArray(
// ("Paris", 333333.3333333333),
// ("Lyon", 166500.0),
// ("Nice", 325000.0)
// )
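As a sanity check, the same averages can be computed on the local Seq with plain Scala collections. This sketch does not use Frameless or Spark; it only mirrors the groupBy/avg logic on the sample data, in an object named for illustration.

```scala
object LocalAverageCheck {
  final case class Apartment(city: String, surface: Int, price: Double, bedrooms: Int)

  val apartments: Seq[Apartment] = Seq(
    Apartment("Paris", 50, 300000.0, 2),
    Apartment("Paris", 100, 450000.0, 3),
    Apartment("Paris", 25, 250000.0, 1),
    Apartment("Lyon", 83, 200000.0, 2),
    Apartment("Lyon", 45, 133000.0, 1),
    Apartment("Nice", 74, 325000.0, 3)
  )

  // Group by city, then average the prices within each group.
  val avgPriceByCity: Map[String, Double] =
    apartments.groupBy(_.city).map { case (city, apts) =>
      city -> apts.map(_.price).sum / apts.size
    }
}
```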
Again, if we try to aggregate a column that can't be aggregated, we get a compilation error:
aptTypedDs.groupBy(aptTypedDs('city)).agg(avg(aptTypedDs('city)))
// error: Cannot compute average of type String.
// Error occurred in an application involving default arguments.
Next, we combine select and groupBy to calculate the average price/surface ratio per city:
val aptds = aptTypedDs // For shorter expressions
// aptds: TypedDataset[Apartment] = [city: string, surface: int ... 2 more fields]
val cityPriceRatio = aptds.select(aptds('city), aptds('price) / aptds('surface).cast[Double])
// cityPriceRatio: TypedDataset[(String, Double)] = [_1: string, _2: double]
cityPriceRatio.groupBy(cityPriceRatio('_1)).agg(avg(cityPriceRatio('_2))).show().run()
// +-----+------------------+
// | _1| _2|
// +-----+------------------+
// |Paris| 6833.333333333333|
// | Lyon|2682.5970548862115|
// | Nice| 4391.891891891892|
// +-----+------------------+
//
We can also use pivot to further group data on a secondary column.
For example, we can compare the average price across cities by number of bedrooms.
case class BedroomStats(
city: String,
AvgPriceBeds1: Option[Double], // Pivot values may be missing, so we encode them using Options
AvgPriceBeds2: Option[Double],
AvgPriceBeds3: Option[Double],
AvgPriceBeds4: Option[Double])
val bedroomStats = aptds.
groupBy(aptds('city)).
pivot(aptds('bedrooms)).
on(1,2,3,4). // We only care for up to 4 bedrooms
agg(avg(aptds('price))).
as[BedroomStats] // Typesafe casting
// bedroomStats: TypedDataset[BedroomStats] = [city: string, AvgPriceBeds1: double ... 3 more fields]
bedroomStats.show().run()
// +-----+-------------+-------------+-------------+-------------+
// | city|AvgPriceBeds1|AvgPriceBeds2|AvgPriceBeds3|AvgPriceBeds4|
// +-----+-------------+-------------+-------------+-------------+
// | Nice| NULL| NULL| 325000.0| NULL|
// |Paris| 250000.0| 300000.0| 450000.0| NULL|
// | Lyon| 133000.0| 200000.0| NULL| NULL|
// +-----+-------------+-------------+-------------+-------------+
//
With pivot, collecting data preserves type safety by encoding potentially missing columns with Option.
bedroomStats.collect().run().foreach(println)
// BedroomStats(Nice,None,None,Some(325000.0),None)
// BedroomStats(Paris,Some(250000.0),Some(300000.0),Some(450000.0),None)
// BedroomStats(Lyon,Some(133000.0),Some(200000.0),None,None)
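The pivot result can be cross-checked with nested groupBy calls on the local data. This is a plain-Scala analogy, not the Frameless pivot implementation, and the object name is chosen for illustration. Missing city/bedroom combinations show up as None when looked up with Map#get, which mirrors why the pivot columns are encoded as Option[Double].

```scala
object LocalPivotCheck {
  final case class Apartment(city: String, surface: Int, price: Double, bedrooms: Int)

  val apartments: Seq[Apartment] = Seq(
    Apartment("Paris", 50, 300000.0, 2),
    Apartment("Paris", 100, 450000.0, 3),
    Apartment("Paris", 25, 250000.0, 1),
    Apartment("Lyon", 83, 200000.0, 2),
    Apartment("Lyon", 45, 133000.0, 1),
    Apartment("Nice", 74, 325000.0, 3)
  )

  // Group by city first, then by number of bedrooms, averaging prices per cell.
  val avgPrice: Map[String, Map[Int, Double]] =
    apartments.groupBy(_.city).map { case (city, apts) =>
      city -> apts.groupBy(_.bedrooms).map { case (beds, group) =>
        beds -> group.map(_.price).sum / group.size
      }
    }

  // A cell with no data is simply absent, so the lookup yields None.
  val parisFourBeds: Option[Double] = avgPrice("Paris").get(4) // None
  val niceThreeBeds: Option[Double] = avgPrice("Nice").get(3)  // Some(325000.0)
}
```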
Working with Optional fields
Optional fields can be converted to non-optional using getOrElse().
val sampleStats = bedroomStats.select(
bedroomStats('AvgPriceBeds2).getOrElse(0.0),
bedroomStats('AvgPriceBeds3).getOrElse(0.0))
// sampleStats: TypedDataset[(Double, Double)] = [_1: double, _2: double]
sampleStats.show().run()
// +--------+--------+
// | _1| _2|
// +--------+--------+
// | 0.0|325000.0|
// |300000.0|450000.0|
// |200000.0| 0.0|
// +--------+--------+
//
In addition, optional columns can be flattened using the .flattenOption method on TypedDataset.
The result contains the rows for which the flattened column is not None (or null). The schema is automatically adapted to reflect this change.
val flattenStats = bedroomStats.flattenOption('AvgPriceBeds2)
// flattenStats: TypedDataset[shapeless.ops.TuplerInstances.<refinement>.this.type.Out] = [_1: string, _2: double ... 3 more fields]
// The second Option[Double] is now of type Double, since all 'null' values are removed
flattenStats: TypedDataset[(String, Option[Double], Double, Option[Double], Option[Double])]
// res43: TypedDataset[(String, Option[Double], Double, Option[Double], Option[Double])] = [_1: string, _2: double ... 3 more fields]
In a DataFrame, if you just ignore types, this would equivalently be written as:
bedroomStats.dataset.toDF().filter($"AvgPriceBeds2".isNotNull)
// res44: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [city: string, AvgPriceBeds1: double ... 3 more fields]
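Both operations have direct counterparts on plain Scala Option values. The sketch below is a local analogy only, using the AvgPriceBeds2 values from the pivot table (Nice, Paris, Lyon) in an object named for illustration: getOrElse substitutes a default for missing values, while flatten drops them, much as flattenOption drops the corresponding rows.

```scala
object OptionColumnExample {
  // AvgPriceBeds2 for Nice, Paris, Lyon, taken from the pivot output above.
  val avgPriceBeds2: Seq[Option[Double]] = Seq(None, Some(300000.0), Some(200000.0))

  // getOrElse keeps every entry and replaces missing values with a default.
  val withDefault: Seq[Double] = avgPriceBeds2.map(_.getOrElse(0.0)) // Seq(0.0, 300000.0, 200000.0)

  // flatten removes the missing entries and unwraps the rest.
  val nonMissing: Seq[Double] = avgPriceBeds2.flatten // Seq(300000.0, 200000.0)
}
```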
Entire TypedDataset Aggregation
We often want to aggregate the entire TypedDataset and skip the groupBy() clause.
In Frameless you can do this using the agg() operator directly on the TypedDataset.
In the following example, we compute the average price, the average surface, the minimum surface, and the set of cities for the entire dataset.
case class Stats(
avgPrice: Double,
avgSurface: Double,
minSurface: Int,
allCities: Vector[String])
aptds.agg(
avg(aptds('price)),
avg(aptds('surface)),
min(aptds('surface)),
collectSet(aptds('city))
).as[Stats].show().run()
// +-----------------+------------------+----------+-------------------+
// | avgPrice| avgSurface|minSurface| allCities|
// +-----------------+------------------+----------+-------------------+
// |276333.3333333333|62.833333333333336| 25|[Paris, Nice, Lyon]|
// +-----------------+------------------+----------+-------------------+
//
You may apply any TypedColumn operation to a TypedAggregate column as well.
import frameless.functions._
aptds.agg(
avg(aptds('price)) * min(aptds('surface)).cast[Double],
avg(aptds('surface)) * 0.2,
litAggr("Hello World")
).show().run()
// +-----------------+------------------+-----------+
// | _1| _2| _3|
// +-----------------+------------------+-----------+
// |6908333.333333333|12.566666666666668|Hello World|
// +-----------------+------------------+-----------+
//
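The figures in the two tables above can be cross-checked by hand. The sketch below recomputes them with plain Scala collections over the six apartments defined at the start of the tutorial; it is only a sanity check, not how Frameless or Spark evaluates the aggregation, and `Apt` is a hypothetical local copy of `Apartment`.

```scala
object WholeDatasetAggSketch {
  // Hypothetical local copy of the tutorial's Apartment case class.
  final case class Apt(city: String, surface: Int, price: Double, bedrooms: Int)

  val apts: Seq[Apt] = Seq(
    Apt("Paris", 50, 300000.0, 2),
    Apt("Paris", 100, 450000.0, 3),
    Apt("Paris", 25, 250000.0, 1),
    Apt("Lyon", 83, 200000.0, 2),
    Apt("Lyon", 45, 133000.0, 1),
    Apt("Nice", 74, 325000.0, 3)
  )

  // Whole-dataset aggregates: no grouping key is involved.
  val avgPrice: Double = apts.map(_.price).sum / apts.size
  val avgSurface: Double = apts.map(_.surface.toDouble).sum / apts.size
  val minSurface: Int = apts.map(_.surface).min
  val allCities: Set[String] = apts.map(_.city).toSet

  // Arithmetic on aggregates, mirroring the second example:
  // the Int minimum is widened to Double before multiplying.
  val priceTimesMinSurface: Double = avgPrice * minSurface.toDouble
  val scaledSurface: Double = avgSurface * 0.2
}
```

Up to floating-point rounding, `avgPrice` is about 276333.33, `avgSurface` about 62.83, and the two derived values are about 6908333.33 and 12.57, in agreement with the tables.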
Joins
case class CityPopulationInfo(name: String, population: Int)
val cityInfo = Seq(
CityPopulationInfo("Paris", 2229621),
CityPopulationInfo("Lyon", 500715),
CityPopulationInfo("Nice", 343629)
)
val citiInfoTypedDS = TypedDataset.create(cityInfo)
Here is how to join the population information to the apartments dataset:
val withCityInfo = aptTypedDs.joinInner(citiInfoTypedDS) { aptTypedDs('city) === citiInfoTypedDS('name) }
// withCityInfo: TypedDataset[(Apartment, CityPopulationInfo)] = [_1: struct<city: string, surface: int ... 2 more fields>, _2: struct<name: string, population: int>]
withCityInfo.show().run()
// +--------------------+----------------+
// | _1| _2|
// +--------------------+----------------+
// |{Paris, 50, 30000...|{Paris, 2229621}|
// |{Paris, 100, 4500...|{Paris, 2229621}|
// |{Paris, 25, 25000...|{Paris, 2229621}|
// |{Lyon, 83, 200000...| {Lyon, 500715}|
// |{Lyon, 45, 133000...| {Lyon, 500715}|
// |{Nice, 74, 325000...| {Nice, 343629}|
// +--------------------+----------------+
//
The joined TypedDataset has type TypedDataset[(Apartment, CityPopulationInfo)].
We can then select which information we want to continue to work with:
case class AptPriceCity(city: String, aptPrice: Double, cityPopulation: Int)
withCityInfo.select(
withCityInfo.colMany('_2, 'name), withCityInfo.colMany('_1, 'price), withCityInfo.colMany('_2, 'population)
).as[AptPriceCity].show().run()
// +-----+--------+--------------+
// | city|aptPrice|cityPopulation|
// +-----+--------+--------------+
// |Paris|300000.0| 2229621|
// |Paris|450000.0| 2229621|
// |Paris|250000.0| 2229621|
// | Lyon|200000.0| 500715|
// | Lyon|133000.0| 500715|
// | Nice|325000.0| 343629|
// +-----+--------+--------------+
//
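For readers who want to see the join-then-project shape without Spark, here is a minimal sketch using a for-comprehension over plain Scala collections. It only models the semantics of an inner join on `city === name`; `Apt` and `CityInfo` are hypothetical trimmed-down copies of the tutorial's case classes, and the Lille apartment is an invented row, added purely to show that unmatched rows are dropped.

```scala
object InnerJoinSketch {
  // Hypothetical trimmed-down copies of Apartment and CityPopulationInfo.
  final case class Apt(city: String, price: Double)
  final case class CityInfo(name: String, population: Int)

  val apts: Seq[Apt] = Seq(
    Apt("Paris", 300000.0),
    Apt("Lyon", 200000.0),
    Apt("Nice", 325000.0),
    Apt("Lille", 150000.0) // invented row with no matching city info
  )

  val cities: Seq[CityInfo] = Seq(
    CityInfo("Paris", 2229621),
    CityInfo("Lyon", 500715),
    CityInfo("Nice", 343629)
  )

  // Inner join: keep every (apartment, city) pair whose keys are equal.
  val joined: Seq[(Apt, CityInfo)] =
    for {
      a <- apts
      c <- cities
      if a.city == c.name
    } yield (a, c)

  // Projection: pick fields from either side of the pair,
  // analogous to selecting _2.name, _1.price and _2.population.
  val projected: Seq[(String, Double, Int)] =
    joined.map { case (a, c) => (c.name, a.price, c.population) }
}
```

The element type of `joined` is a pair, like the `TypedDataset[(Apartment, CityPopulationInfo)]` above, which is why nested fields have to be addressed through `_1` and `_2`.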
Chained Joins
Joins, or any similar operation, may be chained using a thrush combinator, which removes the need for intermediate values. Instead of:
val withBedroomInfoInterim = aptTypedDs.joinInner(citiInfoTypedDS)( aptTypedDs('city) === citiInfoTypedDS('name) )
// withBedroomInfoInterim: TypedDataset[(Apartment, CityPopulationInfo)] = [_1: struct<city: string, surface: int ... 2 more fields>, _2: struct<name: string, population: int>]
val withBedroomInfo = withBedroomInfoInterim
.joinLeft(bedroomStats)( withBedroomInfoInterim.col('_1).field('city) === bedroomStats('city) )
// withBedroomInfo: TypedDataset[((Apartment, CityPopulationInfo), Option[BedroomStats])] = [_1: struct<_1: struct<city: string, surface: int ... 2 more fields>, _2: struct<name: string, population: int>>, _2: struct<city: string, AvgPriceBeds1: double ... 3 more fields>]
withBedroomInfo.show().run()
// +--------------------+--------------------+
// | _1| _2|
// +--------------------+--------------------+
// |{{Paris, 50, 3000...|{Paris, 250000.0,...|
// |{{Paris, 100, 450...|{Paris, 250000.0,...|
// |{{Paris, 25, 2500...|{Paris, 250000.0,...|
// |{{Lyon, 83, 20000...|{Lyon, 133000.0, ...|
// |{{Lyon, 45, 13300...|{Lyon, 133000.0, ...|
// |{{Nice, 74, 32500...|{Nice, NULL, NULL...|
// +--------------------+--------------------+
//
You can use thrush from the mouse library by adding it to your sbt build and importing its syntax:
libraryDependencies += "org.typelevel" %% "mouse" % "1.2.1"
import mouse.all._
val withBedroomInfoChained = aptTypedDs.joinInner(citiInfoTypedDS)( aptTypedDs('city) === citiInfoTypedDS('name) )
.thrush( interim => interim.joinLeft(bedroomStats)( interim.col('_1).field('city) === bedroomStats('city) ) )
// withBedroomInfoChained: TypedDataset[((Apartment, CityPopulationInfo), Option[BedroomStats])] = [_1: struct<_1: struct<city: string, surface: int ... 2 more fields>, _2: struct<name: string, population: int>>, _2: struct<city: string, AvgPriceBeds1: double ... 3 more fields>]
withBedroomInfoChained.show().run()
// +--------------------+--------------------+
// | _1| _2|
// +--------------------+--------------------+
// |{{Paris, 50, 3000...|{Paris, 250000.0,...|
// |{{Paris, 100, 450...|{Paris, 250000.0,...|
// |{{Paris, 25, 2500...|{Paris, 250000.0,...|
// |{{Lyon, 83, 20000...|{Lyon, 133000.0, ...|
// |{{Lyon, 45, 13300...|{Lyon, 133000.0, ...|
// |{{Nice, 74, 32500...|{Nice, NULL, NULL...|
// +--------------------+--------------------+
//
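The idea behind a thrush combinator is small enough to sketch in plain Scala. The example below is an illustration only: `ThrushOps` and `thrushSketch` are hypothetical local names, not the mouse implementation. It also shows `pipe` from `scala.util.chaining`, which is part of the standard library from Scala 2.13 onward and plays the same role.

```scala
import scala.util.chaining._

object ThrushSketch {
  // Hypothetical local helper: apply a function to the receiver.
  implicit final class ThrushOps[A](private val self: A) extends AnyVal {
    def thrushSketch[B](f: A => B): B = f(self)
  }

  val prices: Seq[Double] = Seq(300000.0, 450000.0, 250000.0)

  // With an intermediate value, which is then referenced twice:
  val interim: Seq[Double] = prices.map(_ / 1000)
  val explicitResult: Double = interim.sum / interim.size

  // Chained: the lambda parameter names the intermediate result,
  // so it can still be referenced twice without a separate val.
  val chained: Double =
    prices.map(_ / 1000).thrushSketch(k => k.sum / k.size)

  // The standard library's pipe behaves the same way.
  val piped: Double =
    prices.map(_ / 1000).pipe(k => k.sum / k.size)
}
```

This matters for the joins above because the join condition has to refer to the intermediate dataset (`interim.col('_1)`), so plain method chaining is not enough; a lambda parameter gives that intermediate result a name.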