Job[A]

All operations on TypedDataset are lazy. An operation either returns a new transformed TypedDataset or an F[A], where F[_] is a type constructor with an instance of the SparkDelay typeclass and A is the result of running a non-lazy computation in Spark.

A default such type constructor called Job is provided by Frameless.

Job serves several functions:

The toy example showcases the use of for-comprehension to explicitly sequences Spark Jobs. First we calculate the size of the TypedDataset and then we collect to the driver exactly 20% of its elements:

import frameless.syntax._

val ds = TypedDataset.create(1 to 20)
// ds: TypedDataset[Int] = [value: int]

val countAndTakeJob =
  for {
    count <- ds.count()
    sample <- ds.take((count/5).toInt)
  } yield sample
// countAndTakeJob: frameless.Job[Seq[Int]] = frameless.Job$$anon$3@24c9a2f6

countAndTakeJob.run()
// res1: Seq[Int] = WrappedArray(1, 2, 3, 4)

The countAndTakeJob can either be executed using run() (as we show above) or it can be passed along to other parts of the program to be further composed into more complex sequences of Spark jobs.

import frameless.Job
def computeMinOfSample(sample: Job[Seq[Int]]): Job[Int] = sample.map(_.min)

val finalJob = computeMinOfSample(countAndTakeJob)
// finalJob: Job[Int] = frameless.Job$$anon$2@347e2262

Now we can execute this new job by specifying a group-id and a description. This allows the programmer to see this information on the Spark UI and help track, say, performance issues.

finalJob.
  withGroupId("samplingJob").
  withDescription("Samples 20% of elements and computes the min").
  run()
// res2: Int = 1

More on SparkDelay

As mentioned above, SparkDelay[F[_]] is a typeclass required for suspending effects by Spark computations. This typeclass represents the ability to suspend an => A thunk into an F[A] value, while implicitly capturing a SparkSession.

As it is a typeclass, it is open for implementation by the user in order to use other data types for suspension of effects. The cats module, for example, uses this typeclass to support suspending Spark computations in any effect type that has a cats.effect.Sync instance.