Cyclic Barrier
A re-usable synchronization primitive that allows a set of fibers to wait until they've all reached the same point.
trait CyclicBarrier[F[_]] {
def await: F[Unit]
}
A cyclic barrier is initialized with a positive integer n
and
fibers which call await
are semantically blocked until n
of
them have invoked await
, at which point all of them are unblocked
and the cyclic barrier is reset.
await
cancelation is supported, in which case the number of
fibers required to unblock the cyclic barrier is incremented again.
import cats.implicits._
import cats.effect._
import cats.effect.std.CyclicBarrier
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._
val run = (for {
b <- CyclicBarrier[IO](2)
f1 <- (IO.println("fast fiber before barrier") >>
b.await >>
IO.println("fast fiber after barrier")
) .start
f2 <- (IO.sleep(1.second) >>
IO.println("slow fiber before barrier") >>
IO.sleep(1.second) >>
b.await >>
IO.println("slow fiber after barrier")
).start
_ <- (f1.join, f2.join).tupled
} yield ())
// run: IO[Unit] = FlatMap(
// ioe = Map(
// ioe = FlatMap(
// ioe = Map(
// ioe = Delay(
// thunk = cats.effect.IO$$$Lambda$17644/0x0000000803d2e440@33078845,
// event = cats.effect.tracing.TracingEvent$StackTrace
// ),
// f = cats.effect.std.CyclicBarrier$$$Lambda$17685/0x0000000803dbd040@5d77e3a3,
// event = cats.effect.tracing.TracingEvent$StackTrace
// ),
// f = cats.effect.std.CyclicBarrier$$$Lambda$17686/0x0000000803dbc840@a920f10,
// event = cats.effect.tracing.TracingEvent$StackTrace
// ),
// f = cats.effect.std.CyclicBarrier$$$Lambda$17687/0x0000000803dbb840@17555a95,
// event = cats.effect.tracing.TracingEvent$StackTrace
// ),
// f = <function1>,
// event = cats.effect.tracing.TracingEvent$StackTrace
// )
run.unsafeRunSync()