Using Cats with Frameless

There are two main parts to the cats integration offered by Frameless:

All the examples below assume you have previously imported cats.implicits and frameless.cats.implicits.

Note that you should not import frameless.syntax._ together with frameless.cats.implicits._.

import cats.syntax.all._
import frameless.cats.implicits._

Effect Suspension in typed datasets

As noted in the section about Job, all operations on TypedDataset are lazy. The results of operations that would normally block on plain Spark APIs are wrapped in a type constructor F[_], for which there exists an instance of SparkDelay[F]. This typeclass represents the operation of delaying a computation and capturing an implicit SparkSession.

In the cats module, we utilize the typeclasses from cats-effect for abstracting over these effect types - namely, we provide an implicit SparkDelay instance for all F[_] for which exists an instance of cats.effect.Sync[F].

This allows one to run operations on TypedDataset in an existing monad stack. For example, given this pre-existing monad stack:

import frameless.TypedDataset
import cats.data.ReaderT
import cats.effect.IO
import cats.effect.implicits._

type Action[T] = ReaderT[IO, SparkSession, T]

We will be able to request that values from TypedDataset will be suspended in this stack:

val typedDs = TypedDataset.create(Seq((1, "string"), (2, "another")))
// typedDs: TypedDataset[(Int, String)] = [_1: int, _2: string]
val result: Action[(Seq[(Int, String)], Long)] = for {
  sample <- typedDs.take[Action](1)
  count <- typedDs.count[Action]()
} yield (sample, count)
// result: Action[(Seq[(Int, String)], Long)] = Kleisli(
//   cats.data.Kleisli$$Lambda$12691/0x0000000803840840@6a1c7fc3
// )

As with Job, note that nothing has been run yet. The effect has been properly suspended. To run our program, we must first supply the SparkSession to the ReaderT layer and then run the IO effect:

import cats.effect.unsafe.implicits.global

result.run(spark).unsafeRunSync()
// res5: (Seq[(Int, String)], Long) = (WrappedArray((1, "string")), 2L)

Convenience methods for modifying Spark thread-local variables

The frameless.cats.implicits._ import also provides some syntax enrichments for any monad stack that has the same capabilities as Action above. Namely, the ability to provide an instance of SparkSession and the ability to suspend effects.

For these to work, we will need to import the implicit machinery from the cats-mtl library:

import cats.mtl.implicits._

And now, we can set the description for the computation being run:

val resultWithDescription: Action[(Seq[(Int, String)], Long)] = for {
  r <- result.withDescription("fancy cats")
  session <- ReaderT.ask[IO, SparkSession]
  _ <- ReaderT.liftF {
         IO {
           println(s"Description: ${session.sparkContext.getLocalProperty("spark.job.description")}")
         }
       }
} yield r
// resultWithDescription: Action[(Seq[(Int, String)], Long)] = Kleisli(
//   cats.data.Kleisli$$$Lambda$14193/0x0000000803dd5040@2e5ff3bf
// )

resultWithDescription.run(spark).unsafeRunSync()
// Description: fancy cats
// res6: (Seq[(Int, String)], Long) = (WrappedArray((1, "string")), 2L)

Using algebraic typeclasses from Cats with RDDs

Data aggregation is one of the most important operations when working with Spark (and data in general). For example, we often have to compute the min, max, avg, etc. from a set of columns grouped by different predicates. This section shows how cats simplifies these tasks in Spark by leveraging a large collection of Type Classes for ordering and aggregating data.

Cats offers ways to sort and aggregate tuples of arbitrary arity.

import frameless.cats.implicits._

val data: RDD[(Int, Int, Int)] = sc.makeRDD((1, 2, 3) :: (1, 5, 3) :: (8, 2, 3) :: Nil)
// data: RDD[(Int, Int, Int)] = ParallelCollectionRDD[12] at makeRDD at Cats.md:130

println(data.csum)
// (10,9,9)
println(data.cmax)
// (8,2,3)
println(data.cmin)
// (1,2,3)

In case the RDD is empty, the csum, cmax and cmin will use the default values for the type of elements inside the RDD. There are counterpart operations to those that have an Option return type to deal with the case of an empty RDD:

val data: RDD[(Int, Int, Int)] = sc.emptyRDD
// data: RDD[(Int, Int, Int)] = EmptyRDD[13] at emptyRDD at Cats.md:146

println(data.csum)
// (0,0,0)
println(data.csumOption)
// None
println(data.cmax)
// (0,0,0)
println(data.cmaxOption)
// None
println(data.cmin)
// (0,0,0)
println(data.cminOption)
// None

The following example aggregates all the elements with a common key.

type User = String
type TransactionCount = Int

val allData: RDD[(User,TransactionCount)] =
   sc.makeRDD(("Bob", 12) :: ("Joe", 1) :: ("Anna", 100) :: ("Bob", 20) :: ("Joe", 2) :: Nil)
// allData: RDD[(User, TransactionCount)] = ParallelCollectionRDD[14] at makeRDD at Cats.md:177

val totalPerUser =  allData.csumByKey
// totalPerUser: RDD[(User, TransactionCount)] = ShuffledRDD[15] at reduceByKey at implicits.scala:42

totalPerUser.collectAsMap
// res16: collection.Map[User, TransactionCount] = Map(
//   "Bob" -> 32,
//   "Joe" -> 3,
//   "Anna" -> 100
// )

The same example would work for more complex keys.

import scala.collection.immutable.SortedMap

val allDataComplexKeu =
   sc.makeRDD( ("Bob", SortedMap("task1" -> 10)) ::
    ("Joe", SortedMap("task1" -> 1, "task2" -> 3)) :: ("Bob", SortedMap("task1" -> 10, "task2" -> 1)) :: ("Joe", SortedMap("task3" -> 4)) :: Nil )
// allDataComplexKeu: RDD[(String, SortedMap[String, Int])] = ParallelCollectionRDD[16] at makeRDD at Cats.md:193

val overalTasksPerUser = allDataComplexKeu.csumByKey
// overalTasksPerUser: RDD[(String, SortedMap[String, Int])] = ShuffledRDD[17] at reduceByKey at implicits.scala:42

overalTasksPerUser.collectAsMap
// res17: collection.Map[String, SortedMap[String, Int]] = Map(
//   "Bob" -> Map("task1" -> 20, "task2" -> 1),
//   "Joe" -> Map("task1" -> 1, "task2" -> 3, "task3" -> 4)
// )

Joins

// Type aliases for meaningful types
type TimeSeries = Map[Int,Int]
type UserName = String

Example: Using the implicit full-our-join operator

import frameless.cats.outer._

val day1: RDD[(UserName,TimeSeries)] = sc.makeRDD( ("John", Map(0 -> 2, 1 -> 4)) :: ("Chris", Map(0 -> 1, 1 -> 2)) :: ("Sam", Map(0 -> 1)) :: Nil )
// day1: RDD[(UserName, TimeSeries)] = ParallelCollectionRDD[18] at makeRDD at Cats.md:218
val day2: RDD[(UserName,TimeSeries)] = sc.makeRDD( ("John", Map(0 -> 10, 1 -> 11)) :: ("Chris", Map(0 -> 1, 1 -> 2)) :: ("Joe", Map(0 -> 1, 1 -> 2)) :: Nil )
// day2: RDD[(UserName, TimeSeries)] = ParallelCollectionRDD[19] at makeRDD at Cats.md:221

val daysCombined = day1 |+| day2
// daysCombined: RDD[(UserName, TimeSeries)] = MapPartitionsRDD[23] at mapValues at implicits.scala:67

daysCombined.collect()
// res18: Array[(UserName, TimeSeries)] = Array(
//   ("Joe", Map(0 -> 1, 1 -> 2)),
//   ("Sam", Map(0 -> 1)),
//   ("Chris", Map(0 -> 2, 1 -> 4)),
//   ("John", Map(0 -> 12, 1 -> 15))
// )

Note how the user's timeseries from different days have been aggregated together. The |+| (Semigroup) operator for key-value pair RDD will execute a full-outer-join on the key and combine values using the default Semigroup for the value type.

In cats:

Map(1 -> 2, 2 -> 3) |+| Map(1 -> 4, 2 -> -1)
// res19: Map[Int, Int] = Map(1 -> 6, 2 -> 2)