Deferred
A purely functional synchronization primitive that represents a single value which may not yet be available.
When created, a Deferred is empty. It can then be completed exactly once, and never be made empty again.
abstract class Deferred[F[_], A] {
  def get: F[A]
  def complete(a: A): F[Boolean]
}
Expected behavior of get
- get on an empty Deferred will block until the Deferred is completed
- get on a completed Deferred will always immediately return its content
- get is cancelable and on cancelation it will unsubscribe the registered listener, an operation that's possible for as long as the Deferred value isn't complete
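The cancelation behavior described above can be sketched with a timeout. This is a minimal illustration, assuming Cats Effect 3; the name getWithTimeout and the 100 millisecond duration are chosen for the example and do not come from the library. Because the Deferred is never completed, the timeout cancels the pending get and the fallback value is returned.

```scala
import cats.effect.{Deferred, IO}
import scala.concurrent.duration._

// Hypothetical example value: nobody ever completes `d`, so `get` can only
// finish by being canceled when the timeout fires.
val getWithTimeout: IO[Option[Int]] =
  for {
    d <- Deferred[IO, Int]
    // timeoutTo cancels the pending `get` and runs the fallback instead
    r <- d.get.map(Option(_)).timeoutTo(100.millis, IO.pure(Option.empty[Int]))
  } yield r
```

Running this effect yields None, since the only way out of the suspended get is cancelation.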
Expected behavior of complete
- complete(a) on an empty Deferred will set it to a, notify any and all readers currently blocked on a call to get, and return true
- complete(a) on a Deferred that has already been completed will not modify its content, and will return false
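The rules above can be checked with a small sequential sketch, assuming Cats Effect 3. The name completeTwice is hypothetical and exists only for this illustration.

```scala
import cats.effect.{Deferred, IO}

// Hypothetical example value: completes the same Deferred twice, then reads it.
val completeTwice: IO[(Boolean, Boolean, Int)] =
  for {
    d      <- Deferred[IO, Int]
    first  <- d.complete(1) // empty Deferred: the value is set, returns true
    second <- d.complete(2) // already completed: content unchanged, returns false
    value  <- d.get         // already completed: returns immediately
  } yield (first, second, value)
```

Running completeTwice yields (true, false, 1): the second completion is rejected and the first value is kept.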
Albeit simple, Deferred can be used in conjunction with Ref to build complex concurrent behaviour and data structures like queues and semaphores.
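As one illustration of combining the two primitives, the following is a minimal sketch of a one-shot countdown latch, assuming Cats Effect 3. The Latch class, its methods, and latchDemo are hypothetical names written for this example; this is not the implementation of the library's own data structures. A Ref holds the remaining count and a Deferred signals that the count has reached zero.

```scala
import cats.effect.{Deferred, IO, Ref}
import cats.syntax.all._

// Hypothetical latch: `remaining` tracks outstanding releases,
// `done` is completed by whichever caller brings the count to zero.
final class Latch(remaining: Ref[IO, Int], done: Deferred[IO, Unit]) {
  // Atomically decrement the count; open the latch when it reaches zero.
  def release: IO[Unit] =
    remaining.updateAndGet(_ - 1).flatMap { n =>
      if (n == 0) done.complete(()).void else IO.unit
    }

  // Semantically blocks until the count reaches zero.
  def await: IO[Unit] = done.get
}

object Latch {
  def apply(n: Int): IO[Latch] =
    (Ref[IO].of(n), Deferred[IO, Unit]).mapN(new Latch(_, _))
}

// Usage: the waiter reads the counter only after both releases have happened.
val latchDemo: IO[Int] =
  for {
    latch   <- Latch(2)
    counter <- Ref[IO].of(0)
    waiter  <- (latch.await *> counter.get).start
    _       <- counter.update(_ + 1) *> latch.release
    _       <- counter.update(_ + 1) *> latch.release
    seen    <- waiter.joinWithNever
  } yield seen
```

In this sketch the Ref provides the atomic state change and the Deferred provides the waiting, so latchDemo yields 2: the waiter cannot proceed past await until both increments and releases are done.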
Finally, the blocking mentioned above is semantic only: no actual threads are blocked by the implementation.
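A small sketch of semantic blocking, assuming Cats Effect 3: a reader fiber waits on an empty Deferred and resumes once another fiber completes it. The name waitThenComplete is hypothetical and used only for this illustration.

```scala
import cats.effect.{Deferred, IO}

// Hypothetical example value: one fiber waits on `get`, the main fiber completes.
val waitThenComplete: IO[String] =
  for {
    d      <- Deferred[IO, String]
    reader <- d.get.start          // the fiber suspends if `d` is still empty
    _      <- d.complete("ready")  // notifies any suspended reader
    value  <- reader.joinWithNever // the value produced by the reader fiber
  } yield value
```

Whether the reader reaches get before or after the completion, the result is "ready". While the reader waits, only the fiber is suspended, so the underlying thread remains free to run other work.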
Only Once
Whenever many processes can modify the same value, but you only care about the first one to do so and want the rest to stop processing, Deferred[F, A] is a great fit.
Two processes will try to complete at the same time but only one will succeed, completing the deferred primitive exactly once. The loser will either get false in F when trying to complete an already completed Deferred, or be canceled automatically by the IO.race mechanism.
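import cats.effect.{IO, Deferred}
import cats.syntax.all._

def start(d: Deferred[IO, Int]): IO[Unit] = {
  // complete returns false if the Deferred is already completed; the result is discarded here
  val attemptCompletion: Int => IO[Unit] = n => d.complete(n).void

  List(
    // both attempts race; the loser is canceled or finds the Deferred already completed
    IO.race(attemptCompletion(1), attemptCompletion(2)),
    // the reader semantically blocks until one attempt succeeds
    d.get.flatMap { n => IO(println(show"Result: $n")) }
  ).parSequence.void
}

val program: IO[Unit] =
  for {
    d <- Deferred[IO, Int]
    _ <- start(d)
  } yield ()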