Typed Spark ML
The frameless-ml
module provides a strongly typed Spark ML API leveraging TypedDataset
s. It introduces TypedTransformer
s
and TypedEstimator
s, the type-safe equivalents of Spark ML's Transformer
and Estimator
.
A TypedEstimator
fits models to data, i.e trains a ML model based on an input TypedDataset
.
A TypedTransformer
transforms one TypedDataset
into another, usually by appending column(s) to it.
By calling the fit
method of a TypedEstimator
, the TypedEstimator
will train a ML model using the TypedDataset
passed as input (representing the training data) and will return a TypedTransformer
that represents the trained model.
This TypedTransformer
can then be used to make predictions on an input TypedDataset
(representing the test data)
using the transform
method that will return a new TypedDataset
with appended prediction column(s).
Both TypedEstimator
and TypedTransformer
check at compile-time the correctness of their inputs field names and types,
contrary to Spark ML API which only deals with DataFrames (the data structure with the lowest level of type-safety in Spark).
frameless-ml
adds type-safety to Spark ML API but stays very close to it in terms of abstractions and API calls, so
please check Spark ML documentation for more details
on Transformer
s and Estimator
s.
Example 1: predict a continuous value using a TypedRandomForestRegressor
In this example, we want to predict the sale price of a house depending on its square footage and the fact that the house
has a garden or not. We will use a TypedRandomForestRegressor
.
Training
As with the Spark ML API, we use a TypedVectorAssembler
(the type-safe equivalent of VectorAssembler
)
to compute feature vectors:
import frameless._
import frameless.syntax._
import frameless.ml._
import frameless.ml.feature._
import frameless.ml.regression._
import org.apache.spark.ml.linalg.Vector
case class HouseData(squareFeet: Double, hasGarden: Boolean, price: Double)
val trainingData = TypedDataset.create(Seq(
HouseData(20, false, 100000),
HouseData(50, false, 200000),
HouseData(50, true, 250000),
HouseData(100, true, 500000)
))
// trainingData: TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]
case class Features(squareFeet: Double, hasGarden: Boolean)
val assembler = TypedVectorAssembler[Features]
// assembler: TypedVectorAssembler[Features] = frameless.ml.feature.TypedVectorAssembler@2130bed0
case class HouseDataWithFeatures(squareFeet: Double, hasGarden: Boolean, price: Double, features: Vector)
val trainingDataWithFeatures = assembler.transform(trainingData).as[HouseDataWithFeatures]
// trainingDataWithFeatures: TypedDataset[HouseDataWithFeatures] = [squareFeet: double, hasGarden: boolean ... 2 more fields]
In the above code snippet, .as[HouseDataWithFeatures]
is a TypedDataset
's type-safe cast
(see TypedDataset: Feature Overview):
case class WrongHouseFeatures(
squareFeet: Double,
hasGarden: Int, // hasGarden has wrong type
price: Double,
features: Vector
)
assembler.transform(trainingData).as[WrongHouseFeatures]
// error: could not find implicit value for parameter as: frameless.ops.As[(Double, Boolean, Double, org.apache.spark.ml.linalg.Vector),repl.MdocSession.MdocApp0.WrongHouseFeatures]
Moreover, TypedVectorAssembler[Features]
will compile only if Features
contains exclusively fields of type Numeric or Boolean:
case class WrongFeatures(squareFeet: Double, hasGarden: Boolean, city: String)
TypedVectorAssembler[WrongFeatures]
// error: Cannot prove that repl.MdocSession.MdocApp0.WrongFeatures is a valid input type. Input type must only contain fields of numeric or boolean types.
The subsequent call assembler.transform(trainingData)
compiles only if trainingData
contains all fields (names and types)
of Features
:
case class WrongHouseData(squareFeet: Double, price: Double) // hasGarden is missing
val wrongTrainingData = TypedDataset.create(Seq(WrongHouseData(20, 100000)))
// wrongTrainingData: TypedDataset[WrongHouseData] = [squareFeet: double, price: double]
assembler.transform(wrongTrainingData)
// error: Cannot prove that repl.MdocSession.MdocApp0.WrongHouseData can be projected to repl.MdocSession.MdocApp0.Features. Perhaps not all member names and types of repl.MdocSession.MdocApp0.Features are the same in repl.MdocSession.MdocApp0.WrongHouseData?
Then, we train the model. To train a Random Forest, one needs to feed it with features (what we predict from) and
with a label (what we predict). In our example, price
is the label, features
are the features:
case class RFInputs(price: Double, features: Vector)
val rf = TypedRandomForestRegressor[RFInputs]
// rf: TypedRandomForestRegressor[RFInputs] = frameless.ml.regression.TypedRandomForestRegressor@45ac9ca8
val model = rf.fit(trainingDataWithFeatures).run()
// model: AppendTransformer[RFInputs, TypedRandomForestRegressor.Outputs, org.apache.spark.ml.regression.RandomForestRegressionModel] = frameless.ml.TypedEstimator$$anon$1@675cd53f
TypedRandomForestRegressor[RFInputs]
compiles only if RFInputs
contains only one field of type Double (the label) and one field of type Vector (the features):
case class WrongRFInputs(labelOfWrongType: String, features: Vector)
TypedRandomForestRegressor[WrongRFInputs]
// error: Cannot prove that repl.MdocSession.MdocApp0.WrongRFInputs is a valid input type. Input type must only contain a field of type Double (the label) and a field of type org.apache.spark.ml.linalg.Vector (the features).
The subsequent rf.fit(trainingDataWithFeatures)
call compiles only if trainingDataWithFeatures
contains the same fields
(names and types) as RFInputs.
val wrongTrainingDataWithFeatures = TypedDataset.create(Seq(HouseData(20, false, 100000))) // features are missing
// wrongTrainingDataWithFeatures: TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]
rf.fit(wrongTrainingDataWithFeatures)
// error: Cannot prove that repl.MdocSession.MdocApp0.HouseData can be projected to repl.MdocSession.MdocApp0.RFInputs. Perhaps not all member names and types of repl.MdocSession.MdocApp0.RFInputs are the same in repl.MdocSession.MdocApp0.HouseData?
Prediction
We now want to predict price
for testData
using the previously trained model. Like the Spark ML API,
testData
has a default value for price
(0
in our case) that will be ignored at prediction time. We reuse
our assembler
to compute the feature vector of testData
.
val testData = TypedDataset.create(Seq(HouseData(70, true, 0)))
// testData: TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]
val testDataWithFeatures = assembler.transform(testData).as[HouseDataWithFeatures]
// testDataWithFeatures: TypedDataset[HouseDataWithFeatures] = [squareFeet: double, hasGarden: boolean ... 2 more fields]
case class HousePricePrediction(
squareFeet: Double,
hasGarden: Boolean,
price: Double,
features: Vector,
predictedPrice: Double
)
val predictions = model.transform(testDataWithFeatures).as[HousePricePrediction]
// predictions: TypedDataset[HousePricePrediction] = [squareFeet: double, hasGarden: boolean ... 3 more fields]
predictions.select(predictions.col('predictedPrice)).collect.run()
// res7: Seq[Double] = WrappedArray(296250.0)
model.transform(testDataWithFeatures)
will only compile if testDataWithFeatures
contains a field price
of type Double
and a field features
of type Vector:
model.transform(testData)
// error: Cannot prove that repl.MdocSession.MdocApp0.HouseData can be projected to repl.MdocSession.MdocApp0.RFInputs. Perhaps not all member names and types of repl.MdocSession.MdocApp0.RFInputs are the same in repl.MdocSession.MdocApp0.HouseData?
Example 2: predict a categorical value using a TypedRandomForestClassifier
In this example, we want to predict in which city a house is located depending on its price and its square footage. We use a
TypedRandomForestClassifier
.
Training
As with the Spark ML API, we use a TypedVectorAssembler
to compute feature vectors and a TypedStringIndexer
to index city
values in order to be able to pass them to a TypedRandomForestClassifier
(which only accepts Double values as label):
import frameless.ml.classification._
case class HouseData(squareFeet: Double, city: String, price: Double)
val trainingData = TypedDataset.create(Seq(
HouseData(100, "lyon", 100000),
HouseData(200, "lyon", 200000),
HouseData(100, "san francisco", 500000),
HouseData(150, "san francisco", 900000)
))
// trainingData: TypedDataset[HouseData] = [squareFeet: double, city: string ... 1 more field]
case class Features(price: Double, squareFeet: Double)
val vectorAssembler = TypedVectorAssembler[Features]
// vectorAssembler: TypedVectorAssembler[Features] = frameless.ml.feature.TypedVectorAssembler@36329dac
case class HouseDataWithFeatures(squareFeet: Double, city: String, price: Double, features: Vector)
val dataWithFeatures = vectorAssembler.transform(trainingData).as[HouseDataWithFeatures]
// dataWithFeatures: TypedDataset[HouseDataWithFeatures] = [squareFeet: double, city: string ... 2 more fields]
case class StringIndexerInput(city: String)
val indexer = TypedStringIndexer[StringIndexerInput]
// indexer: TypedStringIndexer[StringIndexerInput] = frameless.ml.feature.TypedStringIndexer@10dc6f63
indexer.estimator.setHandleInvalid("keep")
// res12: org.apache.spark.ml.feature.StringIndexer = strIdx_9d16ec84e211
val indexerModel = indexer.fit(dataWithFeatures).run()
// indexerModel: AppendTransformer[StringIndexerInput, TypedStringIndexer.Outputs, org.apache.spark.ml.feature.StringIndexerModel] = frameless.ml.TypedEstimator$$anon$1@4e32f26e
case class HouseDataWithFeaturesAndIndex(
squareFeet: Double,
city: String,
price: Double,
features: Vector,
cityIndexed: Double
)
val indexedData = indexerModel.transform(dataWithFeatures).as[HouseDataWithFeaturesAndIndex]
// indexedData: TypedDataset[HouseDataWithFeaturesAndIndex] = [squareFeet: double, city: string ... 3 more fields]
Then, we train the model:
case class RFInputs(cityIndexed: Double, features: Vector)
val rf = TypedRandomForestClassifier[RFInputs]
// rf: TypedRandomForestClassifier[RFInputs] = frameless.ml.classification.TypedRandomForestClassifier@531660e5
val model = rf.fit(indexedData).run()
// model: AppendTransformer[RFInputs, TypedRandomForestClassifier.Outputs, org.apache.spark.ml.classification.RandomForestClassificationModel] = frameless.ml.TypedEstimator$$anon$1@5f0b0c3c
Prediction
We now want to predict city
for testData
using the previously trained model. Like the Spark ML API,
testData
has a default value for city
(empty string in our case) that will be ignored at prediction time. We reuse
our vectorAssembler
to compute the feature vector of testData
and our indexerModel
to index city
.
val testData = TypedDataset.create(Seq(HouseData(120, "", 800000)))
// testData: TypedDataset[HouseData] = [squareFeet: double, city: string ... 1 more field]
val testDataWithFeatures = vectorAssembler.transform(testData).as[HouseDataWithFeatures]
// testDataWithFeatures: TypedDataset[HouseDataWithFeatures] = [squareFeet: double, city: string ... 2 more fields]
val indexedTestData = indexerModel.transform(testDataWithFeatures).as[HouseDataWithFeaturesAndIndex]
// indexedTestData: TypedDataset[HouseDataWithFeaturesAndIndex] = [squareFeet: double, city: string ... 3 more fields]
case class HouseCityPredictionInputs(features: Vector, cityIndexed: Double)
val testInput = indexedTestData.project[HouseCityPredictionInputs]
// testInput: TypedDataset[HouseCityPredictionInputs] = [features: vector, cityIndexed: double]
case class HouseCityPredictionIndexed(
features: Vector,
cityIndexed: Double,
rawPrediction: Vector,
probability: Vector,
predictedCityIndexed: Double
)
val indexedPredictions = model.transform(testInput).as[HouseCityPredictionIndexed]
// indexedPredictions: TypedDataset[HouseCityPredictionIndexed] = [features: vector, cityIndexed: double ... 3 more fields]
Then, we use a TypedIndexToString
to get back a String value from predictedCityIndexed
. TypedIndexToString
takes
as input the label array computed by our previous indexerModel
:
case class IndexToStringInput(predictedCityIndexed: Double)
val indexToString = TypedIndexToString[IndexToStringInput](indexerModel.transformer.labels)
// indexToString: TypedIndexToString[IndexToStringInput] = frameless.ml.feature.TypedIndexToString@6a0a607
case class HouseCityPrediction(
features: Vector,
cityIndexed: Double,
rawPrediction: Vector,
probability: Vector,
predictedCityIndexed: Double,
predictedCity: String
)
val predictions = indexToString.transform(indexedPredictions).as[HouseCityPrediction]
// predictions: TypedDataset[HouseCityPrediction] = [features: vector, cityIndexed: double ... 4 more fields]
predictions.select(predictions.col('predictedCity)).collect.run()
// res13: Seq[String] = WrappedArray("san francisco")
List of currently implemented TypedEstimator
s
TypedRandomForestClassifier
TypedRandomForestRegressor
- ... your contribution here ... :)
List of currently implemented TypedTransformer
s
TypedIndexToString
TypedStringIndexer
TypedVectorAssembler
- ... your contribution here ... :)
Using Vector and Matrix with TypedDataset
frameless-ml
provides TypedEncoder
instances for org.apache.spark.ml.linalg.Vector
and org.apache.spark.ml.linalg.Matrix
:
import frameless._
import frameless.ml._
import org.apache.spark.ml.linalg._
val vector = Vectors.dense(1, 2, 3)
// vector: Vector = [1.0,2.0,3.0]
val vectorDs = TypedDataset.create(Seq("label" -> vector))
// vectorDs: TypedDataset[(String, Vector)] = [_1: string, _2: vector]
val matrix = Matrices.dense(2, 1, Array(1, 2))
// matrix: Matrix = 1.0
// 2.0
val matrixDs = TypedDataset.create(Seq("label" -> matrix))
// matrixDs: TypedDataset[(String, Matrix)] = [_1: string, _2: matrix]
Under the hood, Vector and Matrix are encoded using org.apache.spark.ml.linalg.VectorUDT
and org.apache.spark.ml.linalg.MatrixUDT
. This is possible thanks to the implicit derivation
from org.apache.spark.sql.types.UserDefinedType[A]
to TypedEncoder[A]
defined in TypedEncoder
companion object.