by Gabriel Volpe on May 09, 2018
technical
There have been a couple of really nice blog posts about Tagless Final and some related topics. However, I have faced some design problems when writing algebras that I haven’t seen anybody talk about. So please let me introduce this problem to you.
Given the following data definition:
case class ItemName(value: String) extends AnyVal
case class Item(name: ItemName, price: BigDecimal)
Consider the following algebra:
trait ItemRepository[F[_]] {
  def findAll: F[List[Item]]
  def find(name: ItemName): F[Option[Item]]
  def save(item: Item): F[Unit]
  def remove(name: ItemName): F[Unit]
}
Let’s go through each method’s definition:
- findAll needs to return many Items, obtainable inside a context: F[List[Item]].
- find might or might not return an Item inside a context: F[Option[Item]].
- save and remove will perform some actions without returning any actual value: F[Unit].

Everything is clear and you might have seen this kind of pattern before, so let’s create an interpreter for it:
import doobie.implicits._
import doobie.util.transactor.Transactor
import cats.effect.Sync
// Doobie implementation (not fully implemented, what matters here are the types).
class PostgreSQLItemRepository[F[_]](xa: Transactor[F])(implicit F: Sync[F])
    extends ItemRepository[F] {
  override def findAll: F[List[Item]] =
    sql"select name, price from items"
      .query[Item]
      .to[List]
      .transact(xa)
  override def find(name: ItemName): F[Option[Item]] = F.pure(None)
  override def save(item: Item): F[Unit] = F.unit
  override def remove(name: ItemName): F[Unit] = F.unit
}
Here we are using Doobie, described as "a principled JDBC layer for Scala" and one of the most popular DB libraries in the Typelevel ecosystem. It comes with one super powerful feature: it supports streaming results, since it’s built on top of fs2.
Now, it is quite common to have so many Items in our DB that a List will not fit into memory, or that building it will be a very expensive operation. So we might want to stream the results of findAll instead of holding them all in memory in a List, making Doobie a great candidate for the job. But wait… We have a problem now. Our ItemRepository algebra has fixed the definition of findAll as F[List[Item]], so we won’t be able to create an interpreter that returns a streaming result instead.
We should think about abstracting over that List, and two of the most common abstractions that immediately come to mind are Foldable and Traverse. But although these typeclasses are very useful, they are not enough to represent a stream of values, so we should come up with a better abstraction.
Well, it seems that our options are either adding another higher-kinded parameter G[_] to our algebra or defining an abstract type member G[_]. So let’s go with the first one:
trait ItemRepository[F[_], G[_]] {
  def findAll: G[Item]
  def find(name: ItemName): F[Option[Item]]
  def save(item: Item): F[Unit]
  def remove(name: ItemName): F[Unit]
}
Great! This looks good so far.
Now let’s write a new PostgreSQL interpreter with streaming support:
import cats.effect.Sync
import doobie.implicits._
import doobie.util.transactor.Transactor
import fs2.Stream
// Doobie implementation (not fully implemented, what matters here are the types).
// The Stream[F, ?] syntax requires the kind-projector compiler plugin.
class StreamingItemRepository[F[_]](xa: Transactor[F])(implicit F: Sync[F])
    extends ItemRepository[F, Stream[F, ?]] {
  override def findAll: Stream[F, Item] =
    sql"select name, price from items"
      .query[Item]
      .stream
      .transact(xa)
  override def find(name: ItemName): F[Option[Item]] = F.pure(None)
  override def save(item: Item): F[Unit] = F.delay(println(s"Saving item: $item"))
  override def remove(name: ItemName): F[Unit] = F.delay(println(s"Removing item: $name"))
}
Voilà! We got our streaming implementation of findAll.
That’s all we wanted, but what about testing it? Sure, we might prefer to have a simple implementation by just using a plain List, so what can we possibly do?
import cats.Id
import scala.collection.mutable.{Map => MutableMap}
// In-memory interpreter for tests: no effect (Id) and a plain List.
object MemRepository extends ItemRepository[Id, List] {
  private val mem = MutableMap.empty[String, Item]
  // Return every stored item, not only the first one.
  override def findAll: List[Item] = mem.values.toList
  override def find(name: ItemName): Id[Option[Item]] = mem.get(name.value)
  override def save(item: Item): Id[Unit] = mem.update(item.name.value, item)
  override def remove(name: ItemName): Id[Unit] = {
    mem.remove(name.value)
    ()
  }
}
That’s pretty much it! We managed to abstract over the return type of findAll by just adding an extra parameter to our algebra.
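To see the extra parameter at work without any library on the classpath, here is a minimal sketch. It makes a few assumptions that are not part of the original examples: the standard library's LazyList (Scala 2.13 or later) stands in for fs2.Stream, Id is defined locally instead of importing cats.Id, the algebra is reduced to two methods, and the names RepoSketch, ListRepo and LazyRepo are hypothetical.

```scala
object RepoSketch {
  // Local stand-in for cats.Id, so the sketch needs no dependencies.
  type Id[A] = A

  final case class ItemName(value: String) extends AnyVal
  final case class Item(name: ItemName, price: BigDecimal)

  // Same shape as the algebra above, reduced to the two relevant methods.
  trait ItemRepository[F[_], G[_]] {
    def findAll: G[Item]
    def find(name: ItemName): F[Option[Item]]
  }

  // Sample data shared by both interpreters (made up for this sketch).
  private val data = List(
    Item(ItemName("pen"), BigDecimal(2)),
    Item(ItemName("book"), BigDecimal(10))
  )

  // Strict interpreter: everything is materialised in a List.
  object ListRepo extends ItemRepository[Id, List] {
    def findAll: List[Item] = data
    def find(name: ItemName): Id[Option[Item]] = data.find(_.name == name)
  }

  // Lazy interpreter: elements are produced on demand.
  object LazyRepo extends ItemRepository[Id, LazyList] {
    def findAll: LazyList[Item] = data.to(LazyList)
    def find(name: ItemName): Id[Option[Item]] = data.find(_.name == name)
  }
}
```

Both interpreters satisfy the same trait; only the second type argument changes, which is exactly the degree of freedom the G[_] parameter was added for.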
At this point the avid reader might be wondering: what if I want to write a generic function that takes all the items (using findAll), applies some discounts and writes them back to the DB (using save)?
The short answer is that you might want to define a different algebra where findAll and save have the same types (e.g. both of them are streams). But in case you find yourself wanting to make this work with the current types, let’s try and find out!
import cats.Functor
import cats.syntax.functor._
class DiscountProcessor[F[_], G[_]: Functor](repo: ItemRepository[F, G], join: G[F[Unit]] => F[Unit]) {
  def process(discount: Double): F[Unit] = {
    val items: G[Item] = repo.findAll.map(item => item.copy(price = item.price * (1 - discount)))
    val saved: G[F[Unit]] = items.map(repo.save)
    join(saved)
  }
}
We defined a join function responsible for evaluating the effects and flattening the result to F[Unit]. As you can see below, this works for both a streaming interpreter and a list interpreter (shout out to fthomas for proposing this solution):
import cats.Traverse
import cats.effect.IO
import cats.implicits._
import fs2.Stream
object StreamingDiscountInterpreter {
  // Run every effect in the stream, discarding the results.
  private val join: Stream[IO, IO[Unit]] => IO[Unit] = _.evalMap(identity).compile.drain
  def apply(repo: ItemRepository[IO, Stream[IO, ?]]): DiscountProcessor[IO, Stream[IO, ?]] =
    new DiscountProcessor[IO, Stream[IO, ?]](repo, join)
}
object ListDiscountInterpreter {
  // Sequence the list of effects into a single effect.
  private val join: List[IO[Unit]] => IO[Unit] = list => Traverse[List].sequence(list).void
  def apply(repo: ItemRepository[IO, List]): DiscountProcessor[IO, List] =
    new DiscountProcessor[IO, List](repo, join)
}
While in this case it was possible to make it generic, I don’t recommend doing this at home because you lose the fs2 DSL, which is super convenient.

What I recommend instead is to write this kind of logic in the streaming interpreter itself. You could also write a generic program that implements the parts that can be abstracted (e.g. applying a discount to an item, f: Item => Item) and leave the other parts to the interpreter.
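A minimal sketch of that split, using only the standard library: the discount itself is a pure function Item => Item, and each interpreter decides how to map it over its own collection type. The names DiscountSketch and applyDiscount are hypothetical, the sample data is made up, and LazyList (Scala 2.13 or later) merely stands in for fs2.Stream.

```scala
object DiscountSketch {
  final case class ItemName(value: String) extends AnyVal
  final case class Item(name: ItemName, price: BigDecimal)

  // The reusable part: a pure transformation that mentions
  // neither an effect type nor a collection type.
  def applyDiscount(discount: BigDecimal): Item => Item =
    item => item.copy(price = item.price * (BigDecimal(1) - discount))

  val items: List[Item] = List(Item(ItemName("book"), BigDecimal(10)))

  // Each "interpreter" reuses the same function with its own machinery.
  val discountedList: List[Item] =
    items.map(applyDiscount(BigDecimal("0.1")))

  val discountedLazy: LazyList[Item] =
    items.to(LazyList).map(applyDiscount(BigDecimal("0.1")))
}
```

The interpreter-specific parts (how items are fetched, and how the results are saved) stay out of the generic function, so nothing forces the streaming interpreter to give up its own combinators.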
Another possible and very interesting alternative, suggested by Michael Pilquist, would be to define our repository as follows:
trait ItemRepository[F[_], S[_[_], _]] {
  def findAll: S[F, Item]
}
Here the second type parameter matches the shape of fs2.Stream. In this case our streaming repository will remain the same (it should just extend ItemRepository[F, Stream] instead of ItemRepository[F, Stream[F, ?]]), but our in-memory interpreter will now rely on fs2.Stream instead of a parametric G[_], for example:
import cats.Id
import fs2.Stream
object MemRepositoryAlt extends ItemRepository[Id, Stream] {
  // Hypothetical in-memory data standing in for the items table.
  private val items: List[Item] = List.empty
  override def findAll: Stream[Id, Item] =
    Stream.emits(items).covary[Id]
}
I think it’s an alternative worth exploring further that might require a blog post on its own, so I’ll leave it here for reference :)
I came up with most of the ideas presented here during my work on Fs2 Rabbit, a stream-based client for RabbitMQ, where I make heavy use of this technique, as I originally described in this blog post.
Another great source of inspiration was this talk given by Luka Jacobowitz at Scale by the Bay.
One thing you might have noticed in the examples above is that both ItemRepository interpreters are not fixed to IO, Task or any other effect type, but rather require a parametric F[_] and an implicit instance of Sync[F]. This is quite a powerful technique for both library authors and application developers. Well-known libraries such as Http4s, Monix and Fs2 make heavy use of it.
And by requiring a Sync[F] instance we are just saying that our implementation will need to suspend synchronous side effects.
Once at the edge of our program, commonly the main method, we can give F[_] a concrete type. At the moment, there are two options: cats.effect.IO and monix.eval.Task. But hopefully soon we’ll have a Scalaz 8 IO implementation as well (fingers crossed).
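As a dependency-free sketch of choosing F[_] at the edge, the snippet below defines a tiny hypothetical algebra Greeter[F[_]] and a program that never mentions a concrete effect. Only the last definition commits to scala.util.Try, which here is just an assumption standing in for a real effect type such as cats.effect.IO; the names EdgeSketch, Greeter, TryGreeter and program are made up for illustration.

```scala
import scala.util.Try

object EdgeSketch {
  // Hypothetical algebra: the program only knows about F[_].
  trait Greeter[F[_]] {
    def greet(name: String): F[String]
  }

  // Program written against the algebra, with no concrete effect type.
  def program[F[_]](greeter: Greeter[F]): F[String] =
    greeter.greet("Tagless Final")

  // Interpreter that picks Try as a stand-in for IO or Task.
  object TryGreeter extends Greeter[Try] {
    def greet(name: String): Try[String] = Try(s"Hello, $name!")
  }

  // "Edge of the program": only here does F become a concrete type.
  val result: Try[String] = program(TryGreeter)
}
```

Swapping the effect type means writing another interpreter and changing the last line; program itself stays untouched.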
Abstracting over the effect type doesn’t only mean that we should require Sync[F], Async[F] or Effect[F]. It also means that we should only require the minimal typeclass instance that satisfies our needs. For example:
import cats.{Applicative, Functor}
import cats.implicits._
def bar[F[_]: Applicative]: F[Int] = 1.pure[F]
def foo[F[_]: Functor](fa: F[Int]): F[String] =
  fa.map(_.toString)
Here our bar method just returns a pure value in the F context, thus we need an Applicative[F] instance that defines pure. On the other hand, our foo method just converts the inner Int into a String, what we call a pure data transformation. So all we need here is a Functor[F] instance. Another example:
import cats.Monad
import cats.implicits._
def fp[F[_]: Monad]: F[Int] =
  for {
    a <- bar[F]
    b <- bar[F]
  } yield a + b
The above implementation makes use of a for-comprehension, which is syntactic sugar for flatMap and map. Those two alone would only require a FlatMap[F] instance, but since bar also needs an Applicative[F] instance, we end up requiring a Monad[F], which provides both.
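To make the desugaring concrete, here is a small sketch that uses Option from the standard library instead of an abstract F[_]. The second definition is roughly what the compiler produces for the first one; the object name DesugarSketch is hypothetical.

```scala
object DesugarSketch {
  // For-comprehension over two optional values.
  val sugared: Option[Int] =
    for {
      a <- Option(1)
      b <- Option(2)
    } yield a + b

  // Roughly what the compiler generates for the expression above:
  // every generator but the last becomes flatMap, the last becomes map.
  val desugared: Option[Int] =
    Option(1).flatMap(a => Option(2).map(b => a + b))
}
```

The same rewrite happens for any F[_] that offers flatMap and map, which is why the constraint on F is what decides whether the for-comprehension compiles.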
I think we got quite far with all these abstractions, giving us the chance to write clean and elegant code in a pure functional programming style, and there’s even more! Other topics worth mentioning that might require a blog post on their own are:
F[_] implementation. In some cases, FunctionK (a.k.a. natural transformation) can be the solution.

What do you think about it? Have you come across a similar design problem? I’d love to hear your thoughts!