Queries

This section explains how to construct and execute queries.

A query is a SQL statement that can return rows.

Single-Column Query

First let's look at a query that selects a single column and decodes rows as Scala strings.

val a: Query[Void, String] =
  sql"SELECT name FROM country".query(varchar)
// a: Query[Void, String] = Query(
//   sql = "SELECT name FROM country",
//   origin = Origin(file = "Query.md", line = 30),
//   encoder = Codec(void),
//   decoder = Codec(varchar),
//   isDynamic = false
// )

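Observe the following:

The first type argument of Query is Void, which means the query takes no parameters. The second is String, which means each row is decoded as a String by the varchar decoder passed to query.

Query and Command types are usually inferrable, but specifying a type ensures that the chosen encoders and decoders are consistent with the expected input and output Scala types. For this reason (and for clarity) we will always use explicit type annotations in the documentation.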

The query above is a simple query.

A simple query is a query with no parameters.

Postgres provides a protocol for execution of simple queries, returning all rows at once (Skunk returns them as a list). Such queries can be passed directly to Session.execute.

// assume s: Session[IO]
s.execute(a) // IO[List[String]]

Session provides the following methods for direct execution of simple queries. See the Scaladoc for more information.

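| Method  | Return Type  | Notes                                             |
|---------|--------------|---------------------------------------------------|
| execute | F[List[A]]   | All results, as a list.                           |
| option  | F[Option[A]] | Zero or one result, otherwise an error is raised. |
| unique  | F[A]         | Exactly one result, otherwise an error is raised. |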
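The difference between the three methods is only in how many rows they accept. The following is a plain-Scala model of that contract, not Skunk's implementation: the object name ResultShapes is hypothetical, and errors are modelled as Left values rather than being raised in F.

```scala
// A simplified model of the row-count contract; NOT Skunk's implementation.
object ResultShapes {

  // Models `execute`: every row is returned, whatever the count.
  def execute[A](rows: List[A]): Either[String, List[A]] =
    Right(rows)

  // Models `option`: zero or one row is fine, more than one is an error.
  def option[A](rows: List[A]): Either[String, Option[A]] = rows match {
    case Nil      => Right(None)
    case a :: Nil => Right(Some(a))
    case _        => Left(s"expected at most one row, got ${rows.length}")
  }

  // Models `unique`: exactly one row is required.
  def unique[A](rows: List[A]): Either[String, A] = rows match {
    case a :: Nil => Right(a)
    case _        => Left(s"expected exactly one row, got ${rows.length}")
  }
}
```

With these definitions, an empty result is acceptable to option but not to unique, and a two-row result is acceptable only to execute.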

Multi-Column Query

Our next example selects two columns.

val b: Query[Void, String ~ Int] =
  sql"SELECT name, population FROM country".query(varchar ~ int4)
// b: Query[Void, String ~ Int] = Query(
//   sql = "SELECT name, population FROM country",
//   origin = Origin(file = "Query.md", line = 48),
//   encoder = Codec(void),
//   decoder = Codec(varchar, int4),
//   isDynamic = false
// )

Observe that the argument to query is a pair of decoders conjoined with the ~ operator, yielding a Decoder[String ~ Int]. Executing this query will yield a List[String ~ Int], which is an alias for List[(String, Int)]. See the section on twiddle lists for more information on this mechanism.
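To make the alias concrete, here is a small self-contained sketch that defines local stand-ins for the ~ type, the ~ extractor, and the ~ operator. These definitions are assumptions written for illustration and live in a hypothetical TwiddleSketch object; in real code they come from Skunk's imports.

```scala
object TwiddleSketch {

  // `A ~ B` is just another name for the pair `(A, B)`.
  type ~[+A, +B] = (A, B)

  // Extractor, so that patterns can be written as `case a ~ b`.
  object ~ {
    def unapply[A, B](t: (A, B)): Some[(A, B)] = Some(t)
  }

  // Value-level operator for building pairs with `~`.
  implicit class TwiddleOps[A](private val a: A) {
    def ~[B](b: B): A ~ B = (a, b)
  }

  // The same value as ("Uruguay", 3337000).
  val row: String ~ Int = "Uruguay" ~ 3337000

  // Longer lists nest to the left: ((String, Int), Boolean).
  val nested: String ~ Int ~ Boolean = "Uruguay" ~ 3337000 ~ true

  // Pattern matching takes the pair apart again.
  def describe(r: String ~ Int): String = r match {
    case name ~ pop => s"$name has $pop people"
  }
}
```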

Mapping Query Results

Decoding into a twiddle list (i.e., nested pairs) isn't ideal, so let's define a Country data type. We can then call map on our query to adapt the row type.

case class Country(name: String, population: Int)

val c: Query[Void, Country] =
  sql"SELECT name, population FROM country"
    .query(varchar ~ int4)                // (1)
    .map { case n ~ p => Country(n, p) }  // (2)
// c: Query[Void, Country] = Query(
//   sql = "SELECT name, population FROM country",
//   origin = Origin(file = "Query.md", line = 58),
//   encoder = Encoder(),
//   decoder = Decoder(varchar, int4),
//   isDynamic = false
// )

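Observe the following:

(1) At this point the query is a Query[Void, String ~ Int], as in the previous example.

(2) map converts each decoded String ~ Int row into a Country, yielding a Query[Void, Country].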

So that is one way to do it.

Mapping Decoder Results

A more reusable way to do this is to define a Decoder[Country] based on the varchar ~ int4 decoder. We can then decode directly into our Country data type.

val country: Decoder[Country] =
  (varchar ~ int4).map { case (n, p) => Country(n, p) }     // (1)
// country: Decoder[Country] = Decoder(varchar, int4)

val d: Query[Void, Country] =
  sql"SELECT name, population FROM country".query(country)  // (2)
// d: Query[Void, Country] = Query(
//   sql = "SELECT name, population FROM country",
//   origin = Origin(file = "Query.md", line = 71),
//   encoder = Codec(void),
//   decoder = Decoder(varchar, int4),
//   isDynamic = false
// )

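Observe the following:

(1) country is built by mapping the varchar ~ int4 decoder, which yields a Decoder[Country].

(2) The query uses country directly, so it is a Query[Void, Country] with no further mapping.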

Because decoders are structural (i.e., they rely only on the position of column values) it can become a maintenance issue when queries and their decoders become separated in code. Try to keep decoders close to the queries that use them.
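The hazard is easy to reproduce with a toy model. The sketch below is not Skunk's Decoder: it is a deliberately small stand-in (all names are hypothetical) that, like the real one, sees column values by position only. It shows that reordering the SELECT list still decodes without complaint, but into the wrong fields.

```scala
object PositionalDecoding {

  final case class Country(name: String, code: String)

  // A toy decoder: it consumes `width` column values and never sees column names.
  final case class Decoder[A](width: Int, decode: List[String] => A) {

    def map[B](f: A => B): Decoder[B] =
      Decoder(width, decode.andThen(f))

    // Run this decoder on the leading columns and `that` on the following ones.
    def ~[B](that: Decoder[B]): Decoder[(A, B)] =
      Decoder(
        width + that.width,
        cols => (decode(cols.take(width)), that.decode(cols.drop(width)))
      )
  }

  val varchar: Decoder[String] = Decoder(1, _.head)

  val country: Decoder[Country] =
    (varchar ~ varchar).map { case (n, c) => Country(n, c) }

  // Rows as they might come back from two differently ordered SELECT lists.
  val nameThenCode: List[String] = List("Uruguay", "URY") // SELECT name, code
  val codeThenName: List[String] = List("URY", "Uruguay") // SELECT code, name
}
```

Decoding codeThenName succeeds and produces Country("URY", "Uruguay"), which is why a query and its decoder are easier to keep in agreement when they sit next to each other.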

Mapping Decoder Results Generically

Because Country is a simple case class we can generate the mapping code mechanically. To do this, use to and specify the target data type.

val country2: Decoder[Country] =
  (varchar *: int4).to[Country]
// country2: Decoder[Country] = Codec(varchar, int4)

Even better, instead of constructing a named decoder you can call to on the Query itself.

val c2: Query[Void, Country] =
  sql"SELECT name, population FROM country"
    .query(varchar *: int4)
    .to[Country]
// c2: Query[Void, Country] = Query(
//   sql = "SELECT name, population FROM country",
//   origin = Origin(file = "Query.md", line = 85),
//   encoder = Encoder(),
//   decoder = Decoder(varchar, int4),
//   isDynamic = false
// )
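As a rough intuition for what "generating the mapping mechanically" means, the Scala 3 standard library can already convert between a case class and a tuple of its fields. The sketch below uses only scala.deriving.Mirror and Tuple.fromProductTyped; it is an analogy for the kind of conversion to performs, under the assumption of Scala 3, and not a description of how Skunk implements it.

```scala
import scala.deriving.Mirror

case class Country(name: String, population: Int)

// The compiler supplies a Mirror for any case class.
val mirror = summon[Mirror.ProductOf[Country]]

// Tuple of fields, in declaration order, to case class.
val uruguay: Country = mirror.fromProduct(("Uruguay", 3337000))

// Case class back to a typed tuple of its fields.
val fields: (String, Int) = Tuple.fromProductTyped(uruguay)
```

The conversion depends only on the order and types of the fields, which is the same positional contract the decoder has with the SELECT list.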

Parameterized Query

Now let's add a parameter to the query.

val e: Query[String, Country] =
  sql"""
    SELECT name, population
    FROM   country
    WHERE  name LIKE $varchar
  """.query(country)
// e: Query[String, Country] = Query(
//   sql = """
//     SELECT name, population
//     FROM   country
//     WHERE  name LIKE $1
//   """,
//   origin = Origin(file = "Query.md", line = 94),
//   encoder = Codec(varchar),
//   decoder = Decoder(varchar, int4),
//   isDynamic = false
// )

Observe that we have interpolated a value called varchar, which has type Encoder[String].

This means that Postgres will expect an argument of type varchar, which will have Scala type String. The relationship between Postgres types and Scala types is summarized in the reference section Schema Types.

We have already seen varchar used as a row decoder for String and now we're using it as an encoder for String. We can do this because varchar actually has type Codec[String], which extends both Encoder[String] and Decoder[String]. All type mappings provided by Skunk are codecs and can be used in both positions.
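The subtyping relationship can be sketched in a few lines. The traits below are simplified stand-ins defined inside a hypothetical CodecSketch object (Skunk's real Encoder and Decoder carry more information, such as Postgres types), but they show why a single codec value can be passed wherever an encoder or a decoder is expected.

```scala
object CodecSketch {

  // Simplified stand-ins; NOT Skunk's actual traits.
  trait Encoder[A] { def encode(a: A): String }
  trait Decoder[A] { def decode(s: String): Either[String, A] }
  trait Codec[A] extends Encoder[A] with Decoder[A]

  // One value that knows both directions.
  val int4: Codec[Int] = new Codec[Int] {
    def encode(a: Int): String = a.toString
    def decode(s: String): Either[String, Int] =
      s.toIntOption.toRight(s"not an int4: $s")
  }

  // Parameter position: only the Encoder half is required.
  def asParameter[A](enc: Encoder[A], a: A): String = enc.encode(a)

  // Column position: only the Decoder half is required.
  def asColumn[A](dec: Decoder[A], s: String): Either[String, A] = dec.decode(s)
}
```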

The query above is an extended query.

An extended query is a query with parameters, or a simple query that is executed via the extended query protocol.

Postgres provides a protocol for executing extended queries which is more involved than the simple query protocol. It provides for prepared statements that can be reused with different sets of arguments, and provides cursors which allow results to be paged and streamed.

Here we use the extended query protocol to stream directly to the console using constant space.

// assume s: Session[IO]
s.prepare(e).flatMap { ps =>
  ps.stream("U%", 64)
    .evalMap(c => IO.println(c))
    .compile
    .drain
} // IO[Unit]

Observe that, as used here, prepare returns an effect that yields a PreparedQuery, which is why we sequence it with flatMap. We then use PreparedQuery#stream to pass our parameter "U%" and create an fs2 stream that fetches rows in blocks of 64 and prints them to the console.

Note that when combining effects and Stream it is often convenient to express the entire program in terms of Stream.

// assume s: Session[IO]
val stream: Stream[IO, Unit] =
  for {
    ps <- Stream.eval(s.prepare(e))
    c  <- ps.stream("U%", 64)
    _  <- Stream.eval(IO.println(c))
  } yield ()

stream.compile.drain // IO[Unit]

This program does the same thing, but perhaps in a more convenient style.

PreparedQuery provides the following methods for execution. See the Scaladoc for more information.

| Method | Return Type             | Notes                                                                            |
|--------|-------------------------|----------------------------------------------------------------------------------|
| stream | Stream[F,B]             | All results, as a stream.                                                        |
| option | F[Option[B]]            | Zero or one result, otherwise an error is raised.                                |
| unique | F[B]                    | Exactly one result, otherwise an error is raised.                                |
| cursor | Resource[F,Cursor[F,B]] | A cursor that returns pages of results.                                          |
| pipe   | Pipe[F, A, B]           | A pipe that executes the query for each input value, concatenating the results. |
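As a sketch of the non-streaming methods, the following helpers use option and cursor on a prepared query. The object PreparedQuerySketch, the query byName, and the helper names are hypothetical; the code assumes the same Skunk version as the examples above (where prepare yields the PreparedQuery in F) and that Cursor#fetch returns a page of rows together with a flag indicating whether more rows remain.

```scala
import cats.effect.IO
import skunk._
import skunk.implicits._
import skunk.codec.all._

object PreparedQuerySketch {

  case class Country(name: String, population: Int)

  // Same shape as query `e` above, redefined so the block stands alone.
  val byName: Query[String, Country] =
    sql"""
      SELECT name, population
      FROM   country
      WHERE  name LIKE $varchar
    """.query(varchar *: int4).to[Country]

  // Zero or one match; an error is raised if the pattern matches several rows.
  def find(s: Session[IO], pattern: String): IO[Option[Country]] =
    s.prepare(byName).flatMap(_.option(pattern))

  // The first page of at most `n` rows, plus whether more rows are available.
  def firstPage(s: Session[IO], pattern: String, n: Int): IO[(List[Country], Boolean)] =
    s.prepare(byName).flatMap(_.cursor(pattern).use(_.fetch(n)))
}
```

Neither helper touches the database until the returned IO is run against a live session.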

Multi-Parameter Query

Multiple parameters work analogously to multiple columns.

val f: Query[String *: Int *: EmptyTuple, Country] =
  sql"""
    SELECT name, population
    FROM   country
    WHERE  name LIKE $varchar
    AND    population < $int4
  """.query(country)
// f: Query[String *: Int *: EmptyTuple, Country] = Query(
//   sql = """
//     SELECT name, population
//     FROM   country
//     WHERE  name LIKE $1
//     AND    population < $2
//   """,
//   origin = Origin(file = "Query.md", line = 142),
//   encoder = Codec(varchar, int4),
//   decoder = Decoder(varchar, int4),
//   isDynamic = false
// )

Observe that we have two parameter encoders varchar and int4 (in that order), whose corresponding Scala input type is String *: Int *: EmptyTuple. See the section on twiddle lists for more information.

// assume s: Session[IO]
s.prepare(f).flatMap { ps =>
  ps.stream(("U%", 2000000), 64)
    .evalMap(c => IO.println(c))
    .compile
    .drain
} // IO[Unit]

And we pass the value ("U%", 2000000) as our statement argument.
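A plain tuple is accepted here because, in Scala 3, the tuple type (String, Int) and the type String *: Int *: EmptyTuple are two spellings of the same thing. The short check below uses only the standard library and assumes Scala 3; the value names are illustrative.

```scala
// A tuple literal is accepted where the `*:` spelling is expected ...
val args: String *: Int *: EmptyTuple = ("U%", 2000000)

// ... and a value built with `*:` is accepted where a pair is expected.
val same: (String, Int) = "U%" *: 2000000 *: EmptyTuple
```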

Summary of Query Types

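The simple query protocol (i.e., Session#execute) is slightly more efficient in terms of message exchange, so use it if:

- your query has no parameters; and
- the result is small enough to be returned all at once as a list; and
- you do not need to reuse a prepared statement.

The extended query protocol (i.e., Session#prepare) is more powerful and more general, but requires additional network exchanges. Use it if:

- your query has parameters; or
- you want to page or stream the results; or
- you want to prepare the statement once and run it with different arguments.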

Full Example

Here is a complete program listing that demonstrates our knowledge thus far.

import cats.effect._
import skunk._
import skunk.implicits._
import skunk.codec.all._
import java.time.OffsetDateTime
import natchez.Trace.Implicits.noop

object QueryExample extends IOApp {

  // a source of sessions
  val session: Resource[IO, Session[IO]] =
    Session.single(
      host     = "localhost",
      user     = "jimmy",
      database = "world",
      password = Some("banana")
    )

  // a data model
  case class Country(name: String, code: String, population: Int)

  // a simple query
  val simple: Query[Void, OffsetDateTime] =
    sql"select current_timestamp".query(timestamptz)

  // an extended query
  val extended: Query[String, Country] =
    sql"""
      SELECT name, code, population
      FROM   country
      WHERE  name like $text
    """.query(varchar *: bpchar(3) *: int4)
       .to[Country]

  // run our simple query
  def doSimple(s: Session[IO]): IO[Unit] =
    for {
      ts <- s.unique(simple) // we expect exactly one row
      _  <- IO.println(s"timestamp is $ts")
    } yield ()

  // run our extended query
  def doExtended(s: Session[IO]): IO[Unit] =
    s.prepare(extended).flatMap { ps =>
      ps.stream("U%", 32)
        .evalMap(c => IO.println(c))
        .compile
        .drain
    }

  // our entry point
  def run(args: List[String]): IO[ExitCode] =
    session.use { s =>
      for {
        _ <- doSimple(s)
        _ <- doExtended(s)
      } yield ExitCode.Success
    }

}

Running this program yields the following.

timestamp is 2024-11-29T15:31:08.782054Z
Country(United Arab Emirates,ARE,2441000)
Country(United Kingdom,GBR,59623400)
Country(Uganda,UGA,21778000)
Country(Ukraine,UKR,50456000)
Country(Uruguay,URY,3337000)
Country(Uzbekistan,UZB,24318000)
Country(United States,USA,278357000)
Country(United States Minor Outlying Islands,UMI,0)

Service-Oriented Example

In real life a program like QueryExample above will grow complicated and hard to maintain because the database abstractions are out in the open. It's better to define an interface that uses a database session and write your program in terms of that interface. Here is a rewritten version of the program above that demonstrates this pattern.

import cats.syntax.all._
import cats.effect._
import skunk._
import skunk.implicits._
import skunk.codec.all._
import java.time.OffsetDateTime
import natchez.Trace.Implicits.noop
import fs2.Stream
import cats.Applicative

// a data model
case class Country(name: String, code: String, population: Int)

// A service interface.
trait Service[F[_]] {
  def currentTimestamp: F[OffsetDateTime]
  def countriesByName(pat: String): Stream[F, Country]
}

// A companion with a constructor.
object Service {

  private val timestamp: Query[Void, OffsetDateTime] =
    sql"select current_timestamp".query(timestamptz)

  private val countries: Query[String, Country] =
    sql"""
      SELECT name, code, population
      FROM   country
      WHERE  name like $text
    """.query(varchar *: bpchar(3) *: int4)
       .to[Country]

  def fromSession[F[_]: Applicative](s: Session[F]): F[Service[F]] =
    s.prepare(countries).map { pq =>

      // Our service implementation. Note that we are preparing the query on construction, so
      // our service can run it many times without paying the planning cost again.
      new Service[F] {
        def currentTimestamp: F[OffsetDateTime] = s.unique(timestamp)
        def countriesByName(pat: String): Stream[F,Country] = pq.stream(pat, 32)
      }

    }
}


object QueryExample2 extends IOApp {

  // a source of sessions
  val session: Resource[IO, Session[IO]] =
    Session.single(
      host     = "localhost",
      user     = "jimmy",
      database = "world",
      password = Some("banana")
    )

  // A source of services
  val service: Resource[IO, Service[IO]] =
    session.evalMap(Service.fromSession(_))

  // our entry point ... there is no indication that we're using a database at all!
  def run(args: List[String]): IO[ExitCode] =
    service.use { s =>
      for {
        ts <- s.currentTimestamp
        _  <- IO.println(s"timestamp is $ts")
        _  <- s.countriesByName("U%")
               .evalMap(c => IO.println(c))
               .compile
               .drain
      } yield ExitCode.Success
    }

}

Running this program yields the same output as above.

timestamp is 2024-11-29T15:31:08.920909Z
Country(United Arab Emirates,ARE,2441000)
Country(United Kingdom,GBR,59623400)
Country(Uganda,UGA,21778000)
Country(Ukraine,UKR,50456000)
Country(Uruguay,URY,3337000)
Country(Uzbekistan,UZB,24318000)
Country(United States,USA,278357000)
Country(United States Minor Outlying Islands,UMI,0)
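One consequence of programming against the Service interface is that it can be implemented without a database at all, which is useful in tests. The following is a minimal sketch of such a stand-in. InMemoryService is a hypothetical name, Country and Service are repeated so the block stands alone, and the pattern handling is a deliberate simplification that supports only a trailing % wildcard.

```scala
import cats.Applicative
import cats.effect.IO
import fs2.Stream
import java.time.OffsetDateTime

case class Country(name: String, code: String, population: Int)

trait Service[F[_]] {
  def currentTimestamp: F[OffsetDateTime]
  def countriesByName(pat: String): Stream[F, Country]
}

// Hypothetical in-memory implementation; no Session is involved.
object InMemoryService {
  def apply[F[_]: Applicative](now: OffsetDateTime, data: List[Country]): Service[F] =
    new Service[F] {

      // Always answers with the fixed instant supplied at construction.
      def currentTimestamp: F[OffsetDateTime] =
        Applicative[F].pure(now)

      // Simplification: "U%" is treated as "name starts with U".
      def countriesByName(pat: String): Stream[F, Country] = {
        val prefix = pat.stripSuffix("%")
        Stream.emits(data.filter(_.name.startsWith(prefix))).covary[F]
      }
    }
}
```

Code written against Service[IO], such as the run method of QueryExample2, should work unchanged with this implementation.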

Experiment

Here are some experiments you might want to try.

For reference, the country table looks like this.

| Column         | Postgres Type     | Modifiers |
|----------------|-------------------|-----------|
| code           | character(3)      | not null  |
| name           | character varying | not null  |
| continent      | character varying | not null  |
| region         | character varying | not null  |
| surfacearea    | real              | not null  |
| indepyear      | smallint          |           |
| population     | integer           | not null  |
| lifeexpectancy | real              |           |
| gnp            | numeric(10,2)     |           |
| gnpold         | numeric(10,2)     |           |
| localname      | character varying | not null  |
| governmentform | character varying | not null  |
| headofstate    | character varying |           |
| capital        | integer           |           |
| code2          | character(2)      | not null  |