# Using Cats with Frameless
There are two main parts to the `cats` integration offered by Frameless:

- effect suspension in `TypedDataset` using `cats-effect` and `cats-mtl`
- `RDD` enhancements using algebraic typeclasses in `cats-kernel`
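Both parts ship in the `frameless-cats` module. With sbt it can be added along these lines (the version string below is a placeholder, not a pinned release):

```scala
// sbt build definition: the cats integration lives in a separate module;
// substitute the frameless version you actually use for the placeholder
libraryDependencies += "org.typelevel" %% "frameless-cats" % "<frameless-version>"
```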
All the examples below assume you have previously imported `cats.implicits` and `frameless.cats.implicits`.

Note that you should not import `frameless.syntax._` together with `frameless.cats.implicits._`.
```scala
import cats.syntax.all._
import frameless.cats.implicits._
```
## Effect Suspension in typed datasets
As noted in the section about `Job`, all operations on `TypedDataset` are lazy. The results of operations that would normally block on plain Spark APIs are instead wrapped in a type constructor `F[_]`, for which there exists an instance of `SparkDelay[F]`. This typeclass represents the operation of delaying a computation and capturing an implicit `SparkSession`.
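The typeclass has roughly this shape (a simplified sketch of `frameless.SparkDelay`, shown here for orientation only):

```scala
import org.apache.spark.sql.SparkSession

// simplified sketch: delay a computation, with the SparkSession
// made available implicitly at the point of delay
trait SparkDelay[F[_]] {
  def delay[A](a: => A)(implicit spark: SparkSession): F[A]
}
```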
In the `cats` module, we utilize the typeclasses from `cats-effect` for abstracting over these effect types: namely, we provide an implicit `SparkDelay` instance for every `F[_]` for which there exists an instance of `cats.effect.Sync[F]`.
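A minimal sketch of such a derivation, assuming only `cats.effect.Sync` (the real instance ships with `frameless.cats.implicits._`; this one is illustrative):

```scala
import cats.effect.Sync
import frameless.SparkDelay
import org.apache.spark.sql.SparkSession

// illustrative: any F[_] with a Sync instance can suspend the computation
implicit def sparkDelayFromSync[F[_]](implicit F: Sync[F]): SparkDelay[F] =
  new SparkDelay[F] {
    def delay[A](a: => A)(implicit spark: SparkSession): F[A] = F.delay(a)
  }
```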
This allows one to run operations on `TypedDataset` in an existing monad stack. For example, given this pre-existing monad stack:
```scala
import frameless.TypedDataset
import cats.data.ReaderT
import cats.effect.IO
import cats.effect.implicits._
import org.apache.spark.sql.SparkSession

type Action[T] = ReaderT[IO, SparkSession, T]
```
We will be able to request that values from the `TypedDataset` be suspended in this stack:
```scala
val typedDs = TypedDataset.create(Seq((1, "string"), (2, "another")))
// typedDs: TypedDataset[(Int, String)] = [_1: int, _2: string]

val result: Action[(Seq[(Int, String)], Long)] = for {
  sample <- typedDs.take[Action](1)
  count  <- typedDs.count[Action]()
} yield (sample, count)
// result: Action[(Seq[(Int, String)], Long)] = Kleisli(
//   cats.data.Kleisli$$Lambda$12774/0x000000080382d840@29f28d8d
// )
```
As with `Job`, note that nothing has been run yet. The effect has been properly suspended. To run our program, we must first supply the `SparkSession` to the `ReaderT` layer and then run the `IO` effect:
```scala
import cats.effect.unsafe.implicits.global

result.run(spark).unsafeRunSync()
// res5: (Seq[(Int, String)], Long) = (WrappedArray((1, "string")), 2L)
```
## Convenience methods for modifying Spark thread-local variables
The `frameless.cats.implicits._` import also provides some syntax enrichments for any monad stack that has the same capabilities as `Action` above: namely, the ability to provide an instance of `SparkSession` and the ability to suspend effects.

For these to work, we will need to import the implicit machinery from the `cats-mtl` library:
```scala
import cats.mtl.implicits._
```
And now, we can set the description for the computation being run:
```scala
val resultWithDescription: Action[(Seq[(Int, String)], Long)] = for {
  r       <- result.withDescription("fancy cats")
  session <- ReaderT.ask[IO, SparkSession]
  _       <- ReaderT.liftF {
               IO {
                 println(s"Description: ${session.sparkContext.getLocalProperty("spark.job.description")}")
               }
             }
} yield r
// resultWithDescription: Action[(Seq[(Int, String)], Long)] = Kleisli(
//   cats.data.Kleisli$$$Lambda$14294/0x0000000803e11040@5773f817
// )
```
```scala
resultWithDescription.run(spark).unsafeRunSync()
// Description: fancy cats
// res6: (Seq[(Int, String)], Long) = (WrappedArray((1, "string")), 2L)
```
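For intuition, an enrichment like `withDescription` can be written in a few lines on top of cats-mtl's `Ask` and cats-effect's `Sync`. The following is an illustrative sketch (the helper name is ours, not Frameless's API):

```scala
import cats.effect.Sync
import cats.mtl.Ask
import cats.syntax.all._
import org.apache.spark.sql.SparkSession

// sketch: ask for the session, suspend setting the thread-local property,
// then run the wrapped action
def withJobDescription[F[_], A](description: String)(fa: F[A])(implicit
    A: Ask[F, SparkSession],
    F: Sync[F]): F[A] =
  for {
    session <- A.ask
    _       <- F.delay(session.sparkContext.setLocalProperty("spark.job.description", description))
    a       <- fa
  } yield a
```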
## Using algebraic typeclasses from Cats with RDDs
Data aggregation is one of the most important operations when working with Spark (and data in general). For example, we often have to compute the `min`, `max`, `avg`, etc. of a set of columns grouped by different predicates. This section shows how Cats simplifies these tasks in Spark by leveraging a large collection of typeclasses for ordering and aggregating data. Cats offers ways to sort and aggregate tuples of arbitrary arity.
```scala
import frameless.cats.implicits._
import org.apache.spark.rdd.RDD

val data: RDD[(Int, Int, Int)] = sc.makeRDD((1, 2, 3) :: (1, 5, 3) :: (8, 2, 3) :: Nil)
// data: RDD[(Int, Int, Int)] = ParallelCollectionRDD[12] at makeRDD at Cats.md:130

println(data.csum)
// (10,9,9)
println(data.cmax)
// (8,2,3)
println(data.cmin)
// (1,2,3)
```
In case the RDD is empty, `csum`, `cmax`, and `cmin` will use the default values for the type of elements inside the RDD. There are counterpart operations to those that have an `Option` return type to deal with the case of an empty RDD:
```scala
val data: RDD[(Int, Int, Int)] = sc.emptyRDD
// data: RDD[(Int, Int, Int)] = EmptyRDD[13] at emptyRDD at Cats.md:146

println(data.csum)
// (0,0,0)
println(data.csumOption)
// None
println(data.cmax)
// (0,0,0)
println(data.cmaxOption)
// None
println(data.cmin)
// (0,0,0)
println(data.cminOption)
// None
```
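When the type's default is not an appropriate fallback, the `Option` variants compose with the usual combinators, e.g. `getOrElse` (an illustrative sketch):

```scala
// sketch: supply an explicit sentinel instead of the default (0,0,0)
val maxOrSentinel: (Int, Int, Int) =
  data.cmaxOption.getOrElse((Int.MinValue, Int.MinValue, Int.MinValue))
```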
The following example aggregates all the elements with a common key.
```scala
type User = String
type TransactionCount = Int

val allData: RDD[(User, TransactionCount)] =
  sc.makeRDD(("Bob", 12) :: ("Joe", 1) :: ("Anna", 100) :: ("Bob", 20) :: ("Joe", 2) :: Nil)
// allData: RDD[(User, TransactionCount)] = ParallelCollectionRDD[14] at makeRDD at Cats.md:177

val totalPerUser = allData.csumByKey
// totalPerUser: RDD[(User, TransactionCount)] = ShuffledRDD[15] at reduceByKey at implicits.scala:42

totalPerUser.collectAsMap
// res16: collection.Map[User, TransactionCount] = Map(
//   "Bob" -> 32,
//   "Joe" -> 3,
//   "Anna" -> 100
// )
```
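As the `reduceByKey` in the printed lineage suggests, `csumByKey` combines the values per key with their `Semigroup`; a rough plain-Spark equivalent (illustrative, not the library's actual code):

```scala
import cats.syntax.semigroup._

// sketch: combine values per key with |+| (for Int, plain addition)
val totalPerUserManual = allData.reduceByKey(_ |+| _)
```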
The same example would work for more complex keys.
```scala
import scala.collection.immutable.SortedMap

val allDataComplexKeys =
  sc.makeRDD(
    ("Bob", SortedMap("task1" -> 10)) ::
    ("Joe", SortedMap("task1" -> 1, "task2" -> 3)) ::
    ("Bob", SortedMap("task1" -> 10, "task2" -> 1)) ::
    ("Joe", SortedMap("task3" -> 4)) :: Nil
  )
// allDataComplexKeys: RDD[(String, SortedMap[String, Int])] = ParallelCollectionRDD[16] at makeRDD at Cats.md:193

val overallTasksPerUser = allDataComplexKeys.csumByKey
// overallTasksPerUser: RDD[(String, SortedMap[String, Int])] = ShuffledRDD[17] at reduceByKey at implicits.scala:42

overallTasksPerUser.collectAsMap
// res17: collection.Map[String, SortedMap[String, Int]] = Map(
//   "Bob" -> Map("task1" -> 20, "task2" -> 1),
//   "Joe" -> Map("task1" -> 1, "task2" -> 3, "task3" -> 4)
// )
```
## Joins
```scala
// Type aliases for meaningful types
type TimeSeries = Map[Int, Int]
type UserName = String
```
### Example: Using the implicit full-outer-join operator
```scala
import frameless.cats.outer._

val day1: RDD[(UserName, TimeSeries)] =
  sc.makeRDD(("John", Map(0 -> 2, 1 -> 4)) :: ("Chris", Map(0 -> 1, 1 -> 2)) :: ("Sam", Map(0 -> 1)) :: Nil)
// day1: RDD[(UserName, TimeSeries)] = ParallelCollectionRDD[18] at makeRDD at Cats.md:218

val day2: RDD[(UserName, TimeSeries)] =
  sc.makeRDD(("John", Map(0 -> 10, 1 -> 11)) :: ("Chris", Map(0 -> 1, 1 -> 2)) :: ("Joe", Map(0 -> 1, 1 -> 2)) :: Nil)
// day2: RDD[(UserName, TimeSeries)] = ParallelCollectionRDD[19] at makeRDD at Cats.md:221

val daysCombined = day1 |+| day2
// daysCombined: RDD[(UserName, TimeSeries)] = MapPartitionsRDD[23] at mapValues at implicits.scala:67

daysCombined.collect()
// res18: Array[(UserName, TimeSeries)] = Array(
//   ("Joe", Map(0 -> 1, 1 -> 2)),
//   ("Sam", Map(0 -> 1)),
//   ("Chris", Map(0 -> 2, 1 -> 4)),
//   ("John", Map(0 -> 12, 1 -> 15))
// )
```
Note how each user's timeseries from different days have been aggregated together. The `|+|` (Semigroup) operator for key-value pair RDDs will execute a full-outer-join on the key and combine values using the default Semigroup for the value type.
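Conceptually, the operator behaves like the following hand-written combination (an illustrative sketch, not the library's actual implementation):

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import cats.kernel.Semigroup
import cats.syntax.semigroup._

// sketch: full-outer-join on the key, then combine values with the Semigroup
def joinAndCombine[K: ClassTag, V: ClassTag: Semigroup](
    left: RDD[(K, V)],
    right: RDD[(K, V)]): RDD[(K, V)] =
  left.fullOuterJoin(right).mapValues {
    case (Some(l), Some(r)) => l |+| r // key on both sides: combine
    case (Some(l), None)    => l       // key only on the left
    case (None, Some(r))    => r       // key only on the right
    case (None, None)       => sys.error("unreachable: full outer join emits at least one side")
  }
```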
In `cats`:

```scala
Map(1 -> 2, 2 -> 3) |+| Map(1 -> 4, 2 -> -1)
// res19: Map[Int, Int] = Map(1 -> 6, 2 -> 2)
```