object IO extends IOInstances
- Source: IO.scala
- Inheritance: IO → IOInstances → IOLowPriorityInstances → IOParallelNewtype → IOCompanionBinaryCompat → IOTimerRef → AnyRef → Any
Type Members
- type Par[+A] = IOParallelNewtype.Par.Type[A]
Newtype encoding for an IO datatype that has a cats.Applicative capable of doing parallel processing in ap and map2, needed for implementing cats.Parallel.
Helpers are provided for converting back and forth: Par.apply for wrapping any IO value and Par.unwrap for unwrapping.
The encoding is based on the "newtypes" project by Alexander Konovalov, chosen because it's devoid of boxing issues and a good choice until opaque types land in Scala.
- Definition Classes
- IOParallelNewtype
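For illustration, here is a minimal sketch of the round trip (the use of ExecutionContext.global for the ContextShift and the value names are our own, not part of the API): two IO values are wrapped into the Par newtype, combined through the parallel Applicative, then unwrapped back into a plain IO.

  import scala.concurrent.ExecutionContext
  import cats.Applicative
  import cats.effect.{ContextShift, IO}

  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val a: IO[Int] = IO(1)
  val b: IO[Int] = IO(2)

  // Wrap, combine in parallel, then unwrap back to IO
  val parSum: IO.Par[Int] =
    Applicative[IO.Par].map2(IO.Par(a), IO.Par(b))(_ + _)

  val sum: IO[Int] = IO.Par.unwrap(parSum)

In practice the same thing is usually expressed through cats.Parallel syntax (e.g. parMapN), which performs the wrapping and unwrapping for you.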
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def apply[A](body: => A): IO[A]
Suspends a synchronous side effect in IO.
Alias for IO.delay(body).
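For illustration, a minimal sketch of the suspension behaviour (the names are ours): nothing is printed while the value is merely constructed; the effect runs only when the IO is evaluated.

  val greet: IO[Unit] = IO(println("hello"))

  greet.unsafeRunSync() // only now is "hello" printed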
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def async[A](k: ((Either[Throwable, A]) => Unit) => Unit): IO[A]
Suspends an asynchronous side effect in IO.
The given function will be invoked during evaluation of the IO to "schedule" the asynchronous callback, where the callback is the parameter passed to that function. Only the first invocation of the callback will be effective! All subsequent invocations will be silently dropped.
As a quick example, you can use this function to perform a parallel computation given an ExecutorService:

  def fork[A](body: => A)(implicit E: ExecutorService): IO[A] =
    IO async { cb =>
      E.execute(new Runnable {
        def run() =
          try cb(Right(body))
          catch { case NonFatal(t) => cb(Left(t)) }
      })
    }

The fork function will do exactly what it sounds like: take a thunk and an ExecutorService and run that thunk on the thread pool. Or rather, it will produce an IO which will do those things when run; it does *not* schedule the thunk until the resulting IO is run! Note that there is no thread blocking in this implementation; the resulting IO encapsulates the callback in a pure and monadic fashion without using threads.
This function can be thought of as a safer, lexically-constrained version of Promise, where IO is like a safer, lazy version of Future.
- See also
asyncF and cancelable
- def asyncF[A](k: ((Either[Throwable, A]) => Unit) => IO[Unit]): IO[A]
Suspends an asynchronous side effect in IO, this being a variant of async that takes a pure registration function.
Implements Async.asyncF.
The difference versus async is that this variant can suspend side-effects via the provided function parameter. It's more relevant in polymorphic code making use of the Async type class, as it alleviates the need for Effect.
Contract for the returned IO[Unit] in the provided function:
- can be asynchronous
- can be cancelable, in which case it hooks into IO's cancelation mechanism such that the resulting task is cancelable
- it should not end in error, because the provided callback is the only way to signal the final result and it can only be called once, so invoking it twice would be a contract violation; on errors thrown in IO, the task can become non-terminating, with the error being printed to stderr
- See also
async and cancelable
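For illustration, a hedged sketch of asyncF (the ScheduledExecutorService parameter and the tick name are our own, not part of this API): the registration itself is an IO, so nothing is scheduled until the returned IO is run.

  import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

  def tick(sc: ScheduledExecutorService): IO[Unit] =
    IO.asyncF[Unit] { cb =>
      IO {
        // Registration is suspended: scheduling happens only when this IO runs
        sc.schedule(new Runnable { def run(): Unit = cb(Right(())) }, 1, TimeUnit.SECONDS)
        ()
      }
    }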
- val cancelBoundary: IO[Unit]
Returns a cancelable boundary — an IO task that checks for the cancellation status of the run-loop and does not allow the bind continuation to keep executing in case cancellation happened.
This operation is very similar to IO.shift, as it can be dropped in flatMap chains in order to make loops cancelable.
Example:

  def fib(n: Int, a: Long, b: Long): IO[Long] =
    IO.defer {
      if (n <= 0) IO.pure(a)
      else {
        val next = fib(n - 1, b, a + b)
        // Every 100th cycle, check the cancellation status
        if (n % 100 == 0)
          IO.cancelBoundary *> next
        else
          next
      }
    }
- def cancelable[A](k: ((Either[Throwable, A]) => Unit) => CancelToken[IO]): IO[A]
Builds a cancelable IO.
Implements Concurrent.cancelable.
The provided function takes a side-effectful callback that is supposed to be registered in async APIs for signaling a final result.
The provided function also returns an IO[Unit] that represents the cancelation token, i.e. the logic needed for canceling the running computation.
Example:

  import java.util.concurrent.ScheduledExecutorService
  import scala.concurrent.duration._

  def sleep(d: FiniteDuration)(implicit ec: ScheduledExecutorService): IO[Unit] =
    IO.cancelable { cb =>
      // Schedules task to run after delay
      val run = new Runnable { def run() = cb(Right(())) }
      val future = ec.schedule(run, d.length, d.unit)

      // Cancellation logic, suspended in IO
      IO(future.cancel(true))
    }

This example is for didactic purposes; you don't need to define this function yourself, as it's already available as IO.sleep.
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native() @HotSpotIntrinsicCandidate()
- def contextShift(ec: ExecutionContext): ContextShift[IO]
Returns a ContextShift instance for IO, built from a Scala ExecutionContext.
NOTE: you don't need to build such instances when using IOApp.
- ec
is the execution context used for the actual execution of tasks (e.g. bind continuations) and can be backed by the user's own thread-pool
- def defer[A](thunk: => IO[A]): IO[A]
Suspends a synchronous side effect which produces an IO in IO.
This is useful for trampolining (i.e. when the side effect is conceptually the allocation of a stack frame). Any exceptions thrown by the side effect will be caught and sequenced into the IO.
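For illustration, a minimal sketch of such trampolining (the countdown name is ours): defer suspends the construction of the next step, so the recursion runs on IO's run-loop rather than the call stack.

  def countdown(n: Int): IO[Unit] =
    IO.defer {
      if (n <= 0) IO.unit
      else IO(println(n)).flatMap(_ => countdown(n - 1))
    }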
- def delay[A](body: => A): IO[A]
Suspends a synchronous side effect in IO.
Any exceptions thrown by the effect will be caught and sequenced into the IO.
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def eval[A](fa: Eval[A]): IO[A]
Lifts an Eval into IO.
This function will preserve the evaluation semantics of any actions that are lifted into the pure IO. Eager Eval instances will be converted into thunk-less IO (i.e. eager IO), while lazy and memoized evals will be executed as such.
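For illustration, a minimal sketch of the preserved semantics (the compute helper is ours, just a visible side effect):

  import cats.Eval

  def compute(): Int = { println("computing"); 42 }

  val eager: IO[Int]    = IO.eval(Eval.now(42))            // already computed; behaves like IO.pure(42)
  val always: IO[Int]   = IO.eval(Eval.always(compute()))  // re-evaluated on every run
  val memoized: IO[Int] = IO.eval(Eval.later(compute()))   // evaluated at most once, then cached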
- def fromEither[A](e: Either[Throwable, A]): IO[A]
Lifts an Either[Throwable, A] into the IO[A] context, raising the throwable if it exists.
- def fromFuture[A](iof: IO[Future[A]])(implicit cs: ContextShift[IO]): IO[A]
Constructs an IO which evaluates the given Future and produces the result (or failure).
Because Future eagerly evaluates, as well as because it memoizes, this function takes its parameter as an IO, which could be lazily evaluated. If this laziness is appropriately threaded back to the definition site of the Future, it ensures that the computation is fully managed by IO and thus referentially transparent.
Example:

  // Lazy evaluation, equivalent with by-name params
  IO.fromFuture(IO(f))

  // Eager evaluation, for pure futures
  IO.fromFuture(IO.pure(f))

Roughly speaking, the following identities hold:

  IO.fromFuture(IO(f)).unsafeToFuture() === f // true-ish (except for memoization)
  IO.fromFuture(IO(ioa.unsafeToFuture())) === ioa // true
- def fromOption[A](option: Option[A])(orElse: => Throwable): IO[A]
Lifts an Option[A] into the IO[A] context, raising the throwable if the option is empty.
- def fromTry[A](t: Try[A]): IO[A]
Lifts a Try[A] into the IO[A] context, raising the throwable if it exists.
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- implicit val ioAlign: Align[IO]
- Definition Classes
- IOInstances
- implicit def ioConcurrentEffect(implicit cs: ContextShift[IO]): ConcurrentEffect[IO]
- Definition Classes
- IOInstances
- implicit val ioEffect: Effect[IO]
- Definition Classes
- IOLowPriorityInstances
- implicit def ioMonoid[A](implicit arg0: Monoid[A]): Monoid[IO[A]]
- Definition Classes
- IOInstances
- implicit def ioParallel(implicit cs: ContextShift[IO]): Aux[IO, Par]
- Definition Classes
- IOInstances
- implicit def ioSemigroup[A](implicit arg0: Semigroup[A]): Semigroup[IO[A]]
- Definition Classes
- IOLowPriorityInstances
- implicit val ioSemigroupK: SemigroupK[IO]
- Definition Classes
- IOInstances
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- val never: IO[Nothing]
A non-terminating IO, alias for async(_ => ()).
- def none[A]: IO[Option[A]]
An IO that contains an empty Option.
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @HotSpotIntrinsicCandidate()
- implicit def parAlign(implicit cs: ContextShift[IO]): Align[Par]
- Definition Classes
- IOInstances
- implicit def parApplicative(implicit cs: ContextShift[IO]): Applicative[Par]
- Definition Classes
- IOLowPriorityInstances
- implicit def parCommutativeApplicative(implicit cs: ContextShift[IO]): CommutativeApplicative[Par]
- Definition Classes
- IOInstances
- def pure[A](a: A): IO[A]
Suspends a pure value in IO.
This should only be used if the value in question has "already" been computed! In other words, something like IO.pure(readLine) is most definitely not the right thing to do! However, IO.pure(42) is correct and will be more efficient (when evaluated) than IO(42), due to avoiding the allocation of extra thunks.
- def race[A, B](lh: IO[A], rh: IO[B])(implicit cs: ContextShift[IO]): IO[Either[A, B]]
Run two IO tasks concurrently, and return the first to finish, either in success or error. The loser of the race is canceled.
The two tasks are executed in parallel if asynchronous, the winner being the first that signals a result.
As an example, see IO.timeout and IO.timeoutTo.
N.B. this is the implementation of Concurrent.race.
Also see racePair for a version that does not cancel the loser automatically on successful results.
- lh
is the "left" task participating in the race
- rh
is the "right" task participating in the race
- cs
is an implicit requirement needed because
race
automatically forks the involved tasks
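For illustration, a hedged sketch of how a timeout can be built on top of race (IO.timeoutTo already provides this; the global-based ContextShift and Timer are assumptions made only to keep the snippet self-contained):

  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration._
  import cats.effect.{ContextShift, IO, Timer}

  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

  def timeoutTo[A](fa: IO[A], after: FiniteDuration, fallback: IO[A]): IO[A] =
    IO.race(fa, IO.sleep(after)).flatMap {
      case Left(a)  => IO.pure(a)   // fa won the race
      case Right(_) => fallback     // the sleep won, so fa was canceled
    }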
- def racePair[A, B](lh: IO[A], rh: IO[B])(implicit cs: ContextShift[IO]): IO[Either[(A, Fiber[IO, B]), (Fiber[IO, A], B)]]
Run two IO tasks concurrently, and return a pair containing both the winner's successful value and the loser represented as a still-unfinished task.
If the first task completes in error, then the result will complete in error, the other task being canceled.
On usage the user has the option of canceling the losing task, this being equivalent with plain race:

  val ioA: IO[A] = ???
  val ioB: IO[B] = ???

  IO.racePair(ioA, ioB).flatMap {
    case Left((a, fiberB)) =>
      fiberB.cancel.map(_ => a)
    case Right((fiberA, b)) =>
      fiberA.cancel.map(_ => b)
  }
N.B. this is the implementation of Concurrent.racePair.
See race for a simpler version that cancels the loser immediately.
- lh
is the "left" task participating in the race
- rh
is the "right" task participating in the race
- cs
is an implicit requirement needed because
race
automatically forks the involved tasks
- def raiseError[A](e: Throwable): IO[A]
Constructs an IO which sequences the specified exception.
If this IO is run using unsafeRunSync or unsafeRunTimed, the exception will be thrown. This exception can be "caught" (or rather, materialized into value-space) using the attempt method.
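For illustration, a minimal sketch (the names are ours): the error is only sequenced, not thrown at construction time, and can be materialized into value space via attempt.

  val boom: IO[Int] = IO.raiseError(new RuntimeException("boom"))

  val recovered: IO[Either[Throwable, Int]] = boom.attempt
  // recovered.unsafeRunSync() yields Left(RuntimeException("boom"))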
- def raiseUnless(cond: Boolean)(e: => Throwable): IO[Unit]
Returns raiseError when cond is false, otherwise IO.unit.
Example:

  val tooMany = 5
  val x: Int = ???
  IO.raiseUnless(x < tooMany)(new IllegalArgumentException("Too many"))

- def raiseWhen(cond: Boolean)(e: => Throwable): IO[Unit]
Returns raiseError when cond is true, otherwise IO.unit.
Example:

  val tooMany = 5
  val x: Int = ???
  IO.raiseWhen(x >= tooMany)(new IllegalArgumentException("Too many"))

- def shift(ec: ExecutionContext): IO[Unit]
Asynchronous boundary described as an effectful IO, managed by the provided Scala ExecutionContext.
Note there are two overloads of the IO.shift function:
- One that takes a ContextShift that manages the thread-pool used to trigger async boundaries.
- Another that takes a Scala ExecutionContext as the thread-pool.
Please use the former by default and use the latter only for fine-grained control over the thread pool in use.
By default, Cats Effect can provide an instance of ContextShift[IO] that manages thread-pools, but only if there's an ExecutionContext in scope or if IOApp is used:

  import cats.effect.{IO, ContextShift}

  val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(...))
  val contextShift = IO.contextShift(ec)

For example we can introduce an asynchronous boundary in the flatMap chain before a certain task:

  val task = IO(println("task"))

  IO.shift(contextShift).flatMap(_ => task)

Note that the ContextShift value is taken implicitly from the context, so you can just do this:

  IO.shift.flatMap(_ => task)

Or using Cats syntax:

  import cats.syntax.apply._

  IO.shift *> task
  // equivalent to
  ContextShift[IO].shift *> task

Or we can specify an asynchronous boundary after the evaluation of a certain task:

  task.flatMap(a => IO.shift.map(_ => a))

Or using Cats syntax:

  task <* IO.shift
  // equivalent to
  task <* ContextShift[IO].shift

Example of where this might be useful:

  for {
    _ <- IO.shift(BlockingIO)
    bytes <- readFileUsingJavaIO(file)
    _ <- IO.shift(DefaultPool)

    secure = encrypt(bytes, KeyManager)
    _ <- sendResponse(Protocol.v1, secure)

    _ <- IO { println("it worked!") }
  } yield ()

In the above, readFileUsingJavaIO will be shifted to the pool represented by BlockingIO, so long as it is defined using apply or suspend (which, judging by the name, it probably is). Once its computation is complete, the rest of the for-comprehension is shifted again, this time onto the DefaultPool. This pool is used to compute the encrypted version of the bytes, which are then passed to sendResponse. If we assume that sendResponse is defined using async (perhaps backed by an NIO socket channel), then we don't actually know on which pool the final IO action (the println) will be run. If we wanted to ensure that the println runs on DefaultPool, we would insert another shift following sendResponse.
Another somewhat less common application of shift is to reset the thread stack and yield control back to the underlying pool. For example:

  lazy val repeat: IO[Unit] =
    for {
      _ <- doStuff
      _ <- IO.shift
      _ <- repeat
    } yield ()

In this example, repeat is a very long running IO (infinite, in fact!) which will just hog the underlying thread resource for as long as it continues running. This can be a bit of a problem, and so we inject the IO.shift which yields control back to the underlying thread pool, giving it a chance to reschedule things and provide better fairness. This shifting also "bounces" the thread stack, popping all the way back to the thread pool and effectively trampolining the remainder of the computation. This sort of manual trampolining is unnecessary if doStuff is defined using suspend or apply, but if it was defined using async and does not involve any real concurrency, the call to shift will be necessary to avoid a StackOverflowError.
Thus, this function has four important use cases:
- shifting blocking actions off of the main compute pool,
- defensively re-shifting asynchronous continuations back to the main compute pool,
- yielding control to some underlying pool for fairness reasons, and
- preventing an overflow of the call stack in the case of improperly constructed async actions.
- ec
is the Scala ExecutionContext that's managing the thread-pool used to trigger this async boundary
- def shift(implicit cs: ContextShift[IO]): IO[Unit]
Asynchronous boundary described as an effectful IO, managed by the provided ContextShift.
This is the overload to prefer by default. Its documentation is identical to that of IO.shift(ec: ExecutionContext) above: see that entry for the full discussion of introducing async boundaries, shifting work between thread pools, fairness, and avoiding stack overflows.
- cs
is the ContextShift that's managing the thread-pool used to trigger this async boundary
- def sleep(duration: FiniteDuration)(implicit timer: Timer[IO]): IO[Unit]
Creates an asynchronous task that on evaluation sleeps for the specified duration, emitting a notification on completion.
This is the pure, non-blocking equivalent to:
- Thread.sleep (JVM)
- ScheduledExecutorService.schedule (JVM)
- setTimeout (JavaScript)
Similar to IO.shift, you can combine it via flatMap to create delayed tasks:

  val timeout = IO.sleep(10.seconds).flatMap { _ =>
    IO.raiseError(new TimeoutException)
  }

This operation creates an asynchronous boundary, even if the specified duration is zero, so you can count on this equivalence:

  IO.sleep(Duration.Zero) <-> IO.shift
The created task is cancelable and so it can be used safely in race conditions without resource leakage.
- duration
is the time span to wait before emitting the tick
- timer
is the Timer used to manage this delayed task, IO.sleep being in fact just an alias for Timer.sleep
- returns
a new asynchronous and cancelable IO that will sleep for the specified duration and then finally emit a tick
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def timer(ec: ExecutionContext, sc: ScheduledExecutorService): Timer[IO]
Returns a Timer instance for IO, built from a Scala ExecutionContext and a Java ScheduledExecutorService.
N.B. this is the JVM-specific version. On top of JavaScript the implementation needs no ExecutionContext.
- ec
is the execution context used for the actual execution of tasks (e.g. bind continuations)
- sc
is the ScheduledExecutorService used for scheduling ticks with a delay
- Definition Classes
- IOTimerRef
- def timer(ec: ExecutionContext): Timer[IO]
Returns a Timer instance for IO, built from a Scala ExecutionContext.
- def toString(): String
- Definition Classes
- AnyRef → Any
- val trace: IO[IOTrace]
Returns the accumulated trace of the currently active fiber.
- val unit: IO[Unit]
Alias for IO.pure(()).
- def unlessA(cond: Boolean)(action: => IO[Unit]): IO[Unit]
Returns the given argument if cond is false, otherwise IO.unit.
- See also
IO.whenA for the inverse
IO.raiseWhen for conditionally raising an error
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- def whenA(cond: Boolean)(action: => IO[Unit]): IO[Unit]
Returns the given argument if cond is true, otherwise IO.unit.
- See also
IO.unlessA for the inverse
IO.raiseWhen for conditionally raising an error
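For illustration, a minimal sketch combining whenA and unlessA (the warn helper is ours):

  def warn(msg: String): IO[Unit] = IO(System.err.println(msg))

  def validate(n: Int): IO[Unit] =
    for {
      _ <- IO.whenA(n > 100)(warn(s"$n is suspiciously large")) // runs only when the condition holds
      _ <- IO.unlessA(n >= 0)(warn(s"$n is negative"))          // runs only when the condition fails
    } yield ()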
- object Par extends IONewtype
Newtype encoding, see the IO.Par type alias for more details.
- Definition Classes
- IOParallelNewtype
Deprecated Value Members
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable]) @Deprecated
- Deprecated
- def suspend[A](thunk: => IO[A]): IO[A]
Suspends a synchronous side effect which produces an IO in IO.
This is useful for trampolining (i.e. when the side effect is conceptually the allocation of a stack frame). Any exceptions thrown by the side effect will be caught and sequenced into the IO.
- Annotations
- @deprecated
- Deprecated
(Since version 2.5.3) use defer