Job[A]
All operations on TypedDataset
are lazy. An operation either returns a new
transformed TypedDataset
or an F[A]
, where F[_]
is a type constructor
with an instance of the SparkDelay
typeclass and A
is the result of running a
non-lazy computation in Spark.
A default such type constructor called Job
is provided by Frameless.
Job
serves several functions:
- Makes all operations on a
TypedDataset
lazy, which makes them more predictable compared to having few operations being lazy and other being strict - Allows the programmer to make expensive blocking operations explicit
- Allows for Spark jobs to be lazily sequenced using monadic composition via for-comprehension
- Provides an obvious place where you can annotate/name your Spark jobs to make it easier to track different parts of your application in the Spark UI
The toy example showcases the use of for-comprehension to explicitly sequences Spark Jobs.
First we calculate the size of the TypedDataset
and then we collect to the driver
exactly 20% of its elements:
import frameless.syntax._
val ds = TypedDataset.create(1 to 20)
// ds: TypedDataset[Int] = [value: int]
val countAndTakeJob =
for {
count <- ds.count()
sample <- ds.take((count/5).toInt)
} yield sample
// countAndTakeJob: frameless.Job[Seq[Int]] = frameless.Job$$anon$3@4ef31fa4
countAndTakeJob.run()
// res1: Seq[Int] = WrappedArray(1, 2, 3, 4)
The countAndTakeJob
can either be executed using run()
(as we show above) or it can
be passed along to other parts of the program to be further composed into more complex sequences
of Spark jobs.
import frameless.Job
def computeMinOfSample(sample: Job[Seq[Int]]): Job[Int] = sample.map(_.min)
val finalJob = computeMinOfSample(countAndTakeJob)
// finalJob: Job[Int] = frameless.Job$$anon$2@aa3d28d
Now we can execute this new job by specifying a group-id and a description. This allows the programmer to see this information on the Spark UI and help track, say, performance issues.
finalJob.
withGroupId("samplingJob").
withDescription("Samples 20% of elements and computes the min").
run()
// res2: Int = 1
More on SparkDelay
As mentioned above, SparkDelay[F[_]]
is a typeclass required for suspending
effects by Spark computations. This typeclass represents the ability to suspend
an => A
thunk into an F[A]
value, while implicitly capturing a SparkSession
.
As it is a typeclass, it is open for implementation by the user in order to use
other data types for suspension of effects. The cats
module, for example, uses
this typeclass to support suspending Spark computations in any effect type that
has a cats.effect.Sync
instance.