Channels
Skunk provides high-level support for Postgres channels, exposing them as an fs2 Pipe / Stream pair.
See NOTIFY and LISTEN in the PostgreSQL documentation for an explanation of channel operations. The text that follows assumes you have a working knowledge of the information in those sections.
Constructing a Channel
Use the channel method on Session to construct a channel.
// assume s: Session[IO]
val ch = s.channel(id"my_channel") // Channel[IO, String, String]
Observe the following:
- The argument to
channelis anIdentifier. See Identifiers for more information. chis aChannelwhich consumesStrings and emitsNotification[String]s. A notification is a structure that includes the process ID and channel identifier as well as the payload.Channelis a profunctor and thus can be contramapped to change the input type, and mapped to change the output type.
Listening to a Channel
To listen on a channel, construct a stream via .listen.
// assume ch: Channel[IO, String, String]
val nbs = ch.listen(1024) // Stream[IO, Notification[String]]
Observe the following:
- The argument to
listenis the maximum number of messages that will be enqueued before blocking the underlying socket. If you run the resulting stream be sure to process notifications in a timely manner to avoid blocking concurrent session operations. - When
nbsbegins execution it will first issueLISTEN <channel>. - While
nbsis executing it will emit any messages received from<channel>. - When
nbsterminates it will issueUNLISTEN <channel>.
It is perfectly fine to run such a stream concurrently while the underlying session is being used for other things (modulo transaction and fiber lifetime concerns; see Transactions and Concurrency.md for more information).
If you wish to listen to all notifications on all subscribed channels, use the notifications method on Session.
Notifying a Channel
Use .notify to send a message to a channel.
// assume ch: Channel[IO, String, String]
ch.notify("hello") // IO[Unit]
Every Channel is also an fs2 Pipe that consumes messages.
// assume s: Session[IO]
// select all the country names and stream them to the country_names channel.
s.prepare(sql"select name from country".query(varchar)).flatMap { ps =>
ps.stream(Void, 512)
.through(s.channel(id"country_names"))
.compile
.drain
}
Keep in mind (as detailed in the documentation for NOTIFY) that notifications performed inside a transaction are not delivered until the transaction commits. Notifications performed outside a transaction are delivered immediately.