Channels

Skunk provides high-level support for Postgres channels, exposing them as an fs2 Pipe / Stream pair.

See NOTIFY and LISTEN in the PostgreSQL documentation for an explanation of channel operations. The text that follows assumes you have a working knowledge of the information in those sections.

Constructing a Channel

Use the channel method on Session to construct a channel.

// assume s: Session[IO]
val ch = s.channel(id"my_channel") // Channel[IO, String, String]

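Observe that the channel name is supplied as an identifier via the id interpolator, and that the resulting Channel[IO, String, String] accepts String messages and delivers String payloads.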

Listening to a Channel

To listen on a channel, construct a stream via .listen.

// assume ch: Channel[IO, String, String]
val nbs = ch.listen(1024) // Stream[IO, Notification[String]]

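Observe that the argument to listen (1024 here) bounds how many notifications may be queued, and that the resulting stream emits a Notification[String] for each message received on the channel.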

It is perfectly fine to run such a stream concurrently while the underlying session is being used for other things (modulo transaction and fiber lifetime concerns; see Transactions and Concurrency for more information).
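As a rough illustration of running a listener alongside other work on the same session, the sketch below prints each notification while an arbitrary action runs. It assumes Cats Effect 3 and a Skunk version in which skunk.implicits._ provides the id interpolator; ListenSketch and listenWhile are hypothetical names introduced only for this example.

```scala
import cats.effect.IO
import fs2.Stream
import skunk._
import skunk.implicits._

object ListenSketch {

  // Hypothetical helper (not part of Skunk): prints notifications from
  // my_channel for as long as `work` is running on the same session.
  def listenWhile[A](s: Session[IO])(work: IO[A]): IO[A] = {
    val ch = s.channel(id"my_channel") // Channel[IO, String, String]

    // Each Notification carries the sender's process ID, the channel, and the payload.
    val listener: Stream[IO, Unit] =
      ch.listen(1024).evalMap(n => IO.println(s"received ${n.value} from pid ${n.pid}"))

    // `concurrently` runs the listener in the background and interrupts it
    // once the foreground stream (here, `work`) has finished.
    Stream.eval(work).concurrently(listener).compile.lastOrError
  }
}
```

Note that nothing in this sketch waits for the listener to subscribe before work starts, so notifications sent very early may be missed.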

If you wish to listen to all notifications on all subscribed channels, use the notifications method on Session.

Notifying a Channel

Use .notify to send a message to a channel.

// assume ch: Channel[IO, String, String]
ch.notify("hello") // IO[Unit]

Every Channel is also an fs2 Pipe that consumes messages.

// assume s: Session[IO]
// select all the country names and stream them to the country_names channel.
s.prepare(sql"select name from country".query(varchar)).flatMap { ps =>
  ps.stream(Void, 512)
    .through(s.channel(id"country_names"))
    .compile
    .drain
}
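To make the Pipe behaviour concrete, the sketch below sends the same list of messages in two ways that should behave alike: once through the channel used as a Pipe, and once by calling notify for each element. It assumes Cats Effect 3 and the skunk.implicits._ import; PipeSketch, viaPipe, and viaNotify are hypothetical names, and the equivalence is an informal reading of the Pipe description above rather than a statement about Skunk's internals.

```scala
import cats.effect.IO
import fs2.Stream
import skunk._
import skunk.implicits._

object PipeSketch {

  // Send every name by passing the stream through the channel as a Pipe.
  def viaPipe(s: Session[IO], names: List[String]): IO[Unit] =
    Stream
      .emits(names)
      .covary[IO]
      .through(s.channel(id"country_names"))
      .compile
      .drain

  // Send every name by calling notify explicitly for each element.
  def viaNotify(s: Session[IO], names: List[String]): IO[Unit] = {
    val ch = s.channel(id"country_names")
    Stream
      .emits(names)
      .covary[IO]
      .evalMap(name => ch.notify(name))
      .compile
      .drain
  }
}
```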

Keep in mind (as detailed in the documentation for NOTIFY) that notifications performed inside a transaction are not delivered until the transaction commits. Notifications performed outside a transaction are delivered immediately.
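The difference in delivery timing can be sketched as follows, assuming Cats Effect 3, the skunk.implicits._ import, and the transaction resource on Session; TransactionSketch and its two methods are hypothetical names used only here.

```scala
import cats.effect.IO
import skunk._
import skunk.implicits._

object TransactionSketch {

  // Outside a transaction: the notification is delivered immediately.
  def notifyNow(s: Session[IO]): IO[Unit] =
    s.channel(id"my_channel").notify("hello")

  // Inside a transaction: NOTIFY is issued right away, but listeners only
  // see the message once the transaction commits, which happens when the
  // `use` block completes successfully.
  def notifyOnCommit(s: Session[IO]): IO[Unit] = {
    val ch = s.channel(id"my_channel")
    s.transaction.use { _ =>
      ch.notify("hello") *> IO.println("NOTIFY issued, not yet delivered")
    }
  }
}
```

If the block inside use fails and the transaction rolls back, the notification is never delivered, in line with the PostgreSQL documentation for NOTIFY.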