Typed Spark ML

The frameless-ml module provides a strongly typed Spark ML API leveraging TypedDatasets. It introduces TypedTransformers and TypedEstimators, the type-safe equivalents of Spark ML's Transformer and Estimator.

A TypedEstimator fits models to data, i.e trains a ML model based on an input TypedDataset. A TypedTransformer transforms one TypedDataset into another, usually by appending column(s) to it.

By calling the fit method of a TypedEstimator, the TypedEstimator will train a ML model using the TypedDataset passed as input (representing the training data) and will return a TypedTransformer that represents the trained model. This TypedTransformercan then be used to make predictions on an input TypedDataset (representing the test data) using the transform method that will return a new TypedDataset with appended prediction column(s).

Both TypedEstimator and TypedTransformer check at compile-time the correctness of their inputs field names and types, contrary to Spark ML API which only deals with DataFrames (the data structure with the lowest level of type-safety in Spark).

frameless-ml adds type-safety to Spark ML API but stays very close to it in terms of abstractions and API calls, so please check Spark ML documentation for more details on Transformers and Estimators.

Example 1: predict a continuous value using a TypedRandomForestRegressor

In this example, we want to predict the sale price of a house depending on its square footage and the fact that the house has a garden or not. We will use a TypedRandomForestRegressor.

Training

As with the Spark ML API, we use a TypedVectorAssembler (the type-safe equivalent of VectorAssembler) to compute feature vectors:

import frameless._
import frameless.syntax._
import frameless.ml._
import frameless.ml.feature._
import frameless.ml.regression._
import org.apache.spark.ml.linalg.Vector
case class HouseData(squareFeet: Double, hasGarden: Boolean, price: Double)

val trainingData = TypedDataset.create(Seq(
  HouseData(20, false, 100000),
  HouseData(50, false, 200000),
  HouseData(50, true, 250000),
  HouseData(100, true, 500000)
))
// trainingData: TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]

case class Features(squareFeet: Double, hasGarden: Boolean)
val assembler = TypedVectorAssembler[Features]
// assembler: TypedVectorAssembler[Features] = frameless.ml.feature.TypedVectorAssembler@6b0ef032

case class HouseDataWithFeatures(squareFeet: Double, hasGarden: Boolean, price: Double, features: Vector)
val trainingDataWithFeatures = assembler.transform(trainingData).as[HouseDataWithFeatures]
// trainingDataWithFeatures: TypedDataset[HouseDataWithFeatures] = [squareFeet: double, hasGarden: boolean ... 2 more fields]

In the above code snippet, .as[HouseDataWithFeatures] is a TypedDataset's type-safe cast (see TypedDataset: Feature Overview):

case class WrongHouseFeatures(
  squareFeet: Double,
  hasGarden: Int, // hasGarden has wrong type
  price: Double,
  features: Vector
)
assembler.transform(trainingData).as[WrongHouseFeatures]
// error: could not find implicit value for parameter as: frameless.ops.As[(Double, Boolean, Double, org.apache.spark.ml.linalg.Vector),repl.MdocSession.MdocApp0.WrongHouseFeatures]
// assembler.transform(trainingData).as[WrongHouseFeatures]
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Moreover, TypedVectorAssembler[Features] will compile only if Features contains exclusively fields of type Numeric or Boolean:

case class WrongFeatures(squareFeet: Double, hasGarden: Boolean, city: String)
TypedVectorAssembler[WrongFeatures]
// error: Cannot prove that repl.MdocSession.MdocApp0.WrongFeatures is a valid input type. Input type must only contain fields of numeric or boolean types.
// TypedVectorAssembler[WrongFeatures]
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The subsequent call assembler.transform(trainingData) compiles only if trainingData contains all fields (names and types) of Features:

case class WrongHouseData(squareFeet: Double, price: Double) // hasGarden is missing
val wrongTrainingData = TypedDataset.create(Seq(WrongHouseData(20, 100000)))
// wrongTrainingData: TypedDataset[WrongHouseData] = [squareFeet: double, price: double]
assembler.transform(wrongTrainingData)
// error: Cannot prove that repl.MdocSession.MdocApp0.WrongHouseData can be projected to repl.MdocSession.MdocApp0.Features. Perhaps not all member names and types of repl.MdocSession.MdocApp0.Features are the same in repl.MdocSession.MdocApp0.WrongHouseData?
// assembler.transform(wrongTrainingData)
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Then, we train the model. To train a Random Forest, one needs to feed it with features (what we predict from) and with a label (what we predict). In our example, price is the label, features are the features:

case class RFInputs(price: Double, features: Vector)
val rf = TypedRandomForestRegressor[RFInputs]
// rf: TypedRandomForestRegressor[RFInputs] = frameless.ml.regression.TypedRandomForestRegressor@6aa1198e

val model = rf.fit(trainingDataWithFeatures).run()
// model: AppendTransformer[RFInputs, TypedRandomForestRegressor.Outputs, org.apache.spark.ml.regression.RandomForestRegressionModel] = frameless.ml.TypedEstimator$$anon$1@6218df02

TypedRandomForestRegressor[RFInputs] compiles only if RFInputs contains only one field of type Double (the label) and one field of type Vector (the features):

case class WrongRFInputs(labelOfWrongType: String, features: Vector)
TypedRandomForestRegressor[WrongRFInputs]
// error: Cannot prove that repl.MdocSession.MdocApp0.WrongRFInputs is a valid input type. Input type must only contain a field of type Double (the label) and a field of type org.apache.spark.ml.linalg.Vector (the features).
// TypedRandomForestRegressor[WrongRFInputs]
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The subsequent rf.fit(trainingDataWithFeatures) call compiles only if trainingDataWithFeatures contains the same fields (names and types) as RFInputs.

val wrongTrainingDataWithFeatures = TypedDataset.create(Seq(HouseData(20, false, 100000))) // features are missing
// wrongTrainingDataWithFeatures: TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]
rf.fit(wrongTrainingDataWithFeatures) 
// error: Cannot prove that repl.MdocSession.MdocApp0.HouseData can be projected to repl.MdocSession.MdocApp0.RFInputs. Perhaps not all member names and types of repl.MdocSession.MdocApp0.RFInputs are the same in repl.MdocSession.MdocApp0.HouseData?
// rf.fit(wrongTrainingDataWithFeatures) 
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Prediction

We now want to predict price for testData using the previously trained model. Like the Spark ML API, testData has a default value for price (0 in our case) that will be ignored at prediction time. We reuse our assembler to compute the feature vector of testData.

val testData = TypedDataset.create(Seq(HouseData(70, true, 0)))
// testData: TypedDataset[HouseData] = [squareFeet: double, hasGarden: boolean ... 1 more field]
val testDataWithFeatures = assembler.transform(testData).as[HouseDataWithFeatures]
// testDataWithFeatures: TypedDataset[HouseDataWithFeatures] = [squareFeet: double, hasGarden: boolean ... 2 more fields]

case class HousePricePrediction(
  squareFeet: Double,
  hasGarden: Boolean,
  price: Double,
  features: Vector,
  predictedPrice: Double
)
val predictions = model.transform(testDataWithFeatures).as[HousePricePrediction]
// predictions: TypedDataset[HousePricePrediction] = [squareFeet: double, hasGarden: boolean ... 3 more fields]

predictions.select(predictions.col('predictedPrice)).collect.run()
// res7: Seq[Double] = WrappedArray(296250.0)

model.transform(testDataWithFeatures) will only compile if testDataWithFeatures contains a field price of type Double and a field features of type Vector:

model.transform(testData)
// error: Cannot prove that repl.MdocSession.MdocApp0.HouseData can be projected to repl.MdocSession.MdocApp0.RFInputs. Perhaps not all member names and types of repl.MdocSession.MdocApp0.RFInputs are the same in repl.MdocSession.MdocApp0.HouseData?
// val model = rf.fit(indexedData).run()
//     ^^^^^^^^^^^^^^^^^^^^

Example 2: predict a categorical value using a TypedRandomForestClassifier

In this example, we want to predict in which city a house is located depending on its price and its square footage. We use a TypedRandomForestClassifier.

Training

As with the Spark ML API, we use a TypedVectorAssembler to compute feature vectors and a TypedStringIndexer to index city values in order to be able to pass them to a TypedRandomForestClassifier (which only accepts Double values as label):

import frameless.ml.classification._
case class HouseData(squareFeet: Double, city: String, price: Double)

val trainingData = TypedDataset.create(Seq(
  HouseData(100, "lyon", 100000),
  HouseData(200, "lyon", 200000),
  HouseData(100, "san francisco", 500000),
  HouseData(150, "san francisco", 900000)
))
// trainingData: TypedDataset[HouseData] = [squareFeet: double, city: string ... 1 more field]

case class Features(price: Double, squareFeet: Double)
val vectorAssembler = TypedVectorAssembler[Features]
// vectorAssembler: TypedVectorAssembler[Features] = frameless.ml.feature.TypedVectorAssembler@13f58847

case class HouseDataWithFeatures(squareFeet: Double, city: String, price: Double, features: Vector)
val dataWithFeatures = vectorAssembler.transform(trainingData).as[HouseDataWithFeatures]
// dataWithFeatures: TypedDataset[HouseDataWithFeatures] = [squareFeet: double, city: string ... 2 more fields]

case class StringIndexerInput(city: String)
val indexer = TypedStringIndexer[StringIndexerInput]
// indexer: TypedStringIndexer[StringIndexerInput] = frameless.ml.feature.TypedStringIndexer@864cdcc
indexer.estimator.setHandleInvalid("keep")
// res12: org.apache.spark.ml.feature.StringIndexer = strIdx_573fe836f1d8
val indexerModel = indexer.fit(dataWithFeatures).run()
// indexerModel: AppendTransformer[StringIndexerInput, TypedStringIndexer.Outputs, org.apache.spark.ml.feature.StringIndexerModel] = frameless.ml.TypedEstimator$$anon$1@20c7e37d

case class HouseDataWithFeaturesAndIndex(
  squareFeet: Double,
  city: String,
  price: Double,
  features: Vector,
  cityIndexed: Double
)
val indexedData = indexerModel.transform(dataWithFeatures).as[HouseDataWithFeaturesAndIndex]
// indexedData: TypedDataset[HouseDataWithFeaturesAndIndex] = [squareFeet: double, city: string ... 3 more fields]

Then, we train the model:

case class RFInputs(cityIndexed: Double, features: Vector)
val rf = TypedRandomForestClassifier[RFInputs]
// rf: TypedRandomForestClassifier[RFInputs] = frameless.ml.classification.TypedRandomForestClassifier@497ce755

val model = rf.fit(indexedData).run()
// model: AppendTransformer[RFInputs, TypedRandomForestClassifier.Outputs, org.apache.spark.ml.classification.RandomForestClassificationModel] = frameless.ml.TypedEstimator$$anon$1@7674f98d

Prediction

We now want to predict city for testData using the previously trained model. Like the Spark ML API, testData has a default value for city (empty string in our case) that will be ignored at prediction time. We reuse our vectorAssembler to compute the feature vector of testData and our indexerModel to index city.

val testData = TypedDataset.create(Seq(HouseData(120, "", 800000)))
// testData: TypedDataset[HouseData] = [squareFeet: double, city: string ... 1 more field]

val testDataWithFeatures = vectorAssembler.transform(testData).as[HouseDataWithFeatures]
// testDataWithFeatures: TypedDataset[HouseDataWithFeatures] = [squareFeet: double, city: string ... 2 more fields]
val indexedTestData = indexerModel.transform(testDataWithFeatures).as[HouseDataWithFeaturesAndIndex]
// indexedTestData: TypedDataset[HouseDataWithFeaturesAndIndex] = [squareFeet: double, city: string ... 3 more fields]

case class HouseCityPredictionInputs(features: Vector, cityIndexed: Double)
val testInput = indexedTestData.project[HouseCityPredictionInputs]
// testInput: TypedDataset[HouseCityPredictionInputs] = [features: vector, cityIndexed: double]

case class HouseCityPredictionIndexed(
  features: Vector,
  cityIndexed: Double,
  rawPrediction: Vector,
  probability: Vector,
  predictedCityIndexed: Double
)
val indexedPredictions = model.transform(testInput).as[HouseCityPredictionIndexed]
// indexedPredictions: TypedDataset[HouseCityPredictionIndexed] = [features: vector, cityIndexed: double ... 3 more fields]

Then, we use a TypedIndexToString to get back a String value from predictedCityIndexed. TypedIndexToString takes as input the label array computed by our previous indexerModel:

case class IndexToStringInput(predictedCityIndexed: Double)

val indexToString = TypedIndexToString[IndexToStringInput](indexerModel.transformer.labels)
// indexToString: TypedIndexToString[IndexToStringInput] = frameless.ml.feature.TypedIndexToString@58d05022

case class HouseCityPrediction(
  features: Vector,
  cityIndexed: Double,
  rawPrediction: Vector,
  probability: Vector,
  predictedCityIndexed: Double,
  predictedCity: String
)
val predictions = indexToString.transform(indexedPredictions).as[HouseCityPrediction]
// predictions: TypedDataset[HouseCityPrediction] = [features: vector, cityIndexed: double ... 4 more fields]

predictions.select(predictions.col('predictedCity)).collect.run()
// res13: Seq[String] = WrappedArray("san francisco")

List of currently implemented TypedEstimators

List of currently implemented TypedTransformers

Using Vector and Matrix with TypedDataset

frameless-ml provides TypedEncoder instances for org.apache.spark.ml.linalg.Vector and org.apache.spark.ml.linalg.Matrix:

import frameless._
import frameless.ml._
import org.apache.spark.ml.linalg._
val vector = Vectors.dense(1, 2, 3)
// vector: Vector = [1.0,2.0,3.0]
val vectorDs = TypedDataset.create(Seq("label" -> vector))
// vectorDs: TypedDataset[(String, Vector)] = [_1: string, _2: vector]

val matrix = Matrices.dense(2, 1, Array(1, 2))
// matrix: Matrix = 1.0  
// 2.0  
val matrixDs = TypedDataset.create(Seq("label" -> matrix))
// matrixDs: TypedDataset[(String, Matrix)] = [_1: string, _2: matrix]

Under the hood, Vector and Matrix are encoded using org.apache.spark.ml.linalg.VectorUDT and org.apache.spark.ml.linalg.MatrixUDT. This is possible thanks to the implicit derivation from org.apache.spark.sql.types.UserDefinedType[A] to TypedEncoder[A] defined in TypedEncoder companion object.