
Why you should know Monix

In this short blog post I will try, in 10 minutes or less, to present what the Monix library is and convince you that it is worth knowing.

Formerly known as Monifu, Monix is a library for asynchronous programming in Scala and Scala.js.

It contains several abstractions that are useful and in some cases arguably superior to their vanilla Scala or Akka counterparts. This post is not about discouraging the use of Akka actors or streams, though. It is about another tool in the Scala programmer's toolbox. I am going to present some of the abstractions that Monix provides and conclude why they are valuable.

Task vs Future

Monix comes with the Task abstraction, which can be seen as a replacement for Future. However, there are some major differences. Future was designed to represent an ongoing computation that may already be finished, whereas Task is a description of a computation. So when using Task as a drop-in replacement for Future, one has to trigger its execution explicitly to start the computation. This means that merely defining a Task whose body calls println will not print anything.
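A minimal sketch of that laziness, assuming the Monix 2.x API this post describes. The object and method names are mine, chosen only for illustration.

```scala
import monix.eval.Task

object LazyTaskDemo {
  // Returns how many times the task body ran after the Task was merely defined.
  def executionsAfterDefinition(): Int = {
    var executions = 0

    // Only a description of a computation: the body is not evaluated here.
    val task: Task[String] = Task.eval {
      executions += 1
      println("Hello from Task")
      "done"
    }

    // Nothing has been printed and the counter is untouched.
    executions
  }
}
```

Calling `LazyTaskDemo.executionsAfterDefinition()` returns 0, because the task is never run.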

To start execution one has to do two things:

  • get an instance of monix.execution.Scheduler;
  • call one of runAsync, runOnComplete, runSyncMaybe or foreach to execute the computation.
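The two steps above can be sketched as follows. This assumes Monix 2.x, where runAsync takes an implicit Scheduler and returns a CancelableFuture (later Monix versions expose this as runToFuture). The blocking Await is there only to keep the demo short.

```scala
import monix.eval.Task
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global // step 1: a Scheduler in implicit scope
import scala.concurrent.Await
import scala.concurrent.duration._

object RunTaskDemo {
  def run(): Int = {
    val task: Task[Int] = Task.eval(1 + 1)

    // Step 2: trigger the execution (Monix 2.x signature).
    val future: CancelableFuture[Int] = task.runAsync

    // Blocking is for demonstration purposes only.
    Await.result(future, 5.seconds)
  }
}
```

`RunTaskDemo.run()` returns 2.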

Scheduler is an ExecutionContext that additionally allows scheduling single or periodic execution of Runnables.
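As a rough illustration of those scheduling capabilities, the sketch below schedules one delayed action and one periodic action on the global Scheduler. The delays and names are arbitrary assumptions of mine.

```scala
import java.util.concurrent.atomic.AtomicInteger
import monix.execution.{Cancelable, Scheduler}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object SchedulerDemo {
  def run(): String = {
    val scheduler: Scheduler = Scheduler.global
    val fired = Promise[String]()
    val ticks = new AtomicInteger(0)

    // Periodic execution: run the action repeatedly, 10 ms after each completion.
    val periodic: Cancelable =
      scheduler.scheduleWithFixedDelay(0.millis, 10.millis) { ticks.incrementAndGet(); () }

    // Single execution after a delay.
    scheduler.scheduleOnce(50.millis) { fired.trySuccess("fired"); () }

    val message = Await.result(fired.future, 5.seconds)
    periodic.cancel() // stop the periodic action
    message
  }
}
```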

runAsync returns a CancelableFuture, which allows the programmer to attempt cancelling a computation before it ends, when it is desirable to stop it. That is the second major difference I see, and in my opinion it is a nice improvement.
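A small sketch of cancellation, again assuming the Monix 2.x runAsync. The task is delayed by 10 seconds so that the cancel call clearly arrives before the body has a chance to run.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import monix.eval.Task
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.duration._

object CancelDemo {
  // Returns whether the task body had started by the time we cancelled it.
  def run(): Boolean = {
    val started = new AtomicBoolean(false)
    val task: Task[Unit] = Task.eval(started.set(true)).delayExecution(10.seconds)

    val future: CancelableFuture[Unit] = task.runAsync
    future.cancel() // attempt to cancel before the delayed body runs

    started.get
  }
}
```

`CancelDemo.run()` returns false: the body never started.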

Monix allows a high level of control over Task execution. One can choose whether the task body should be executed on each invocation of runAsync (Task.eval), whether the result should be memoized (Task.evalOnce or memoize), or whether execution should always be forked to a separate logical thread (Task.fork). On top of that, Task has an API for choosing the scheduler it should run on.
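The difference between these evaluation strategies can be sketched with a counter, assuming Monix 2.x. Each task is run twice and the observed results are collected.

```scala
import java.util.concurrent.atomic.AtomicInteger
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object EvaluationDemo {
  private def await[A](task: Task[A]): A = Await.result(task.runAsync, 5.seconds)

  // Runs each task twice and returns the results seen for eval, evalOnce and memoize.
  def run(): (List[Int], List[Int], List[Int]) = {
    val evalCounter = new AtomicInteger(0)
    val onceCounter = new AtomicInteger(0)
    val memoCounter = new AtomicInteger(0)

    val always   = Task.eval(evalCounter.incrementAndGet())         // body runs on every execution
    val once     = Task.evalOnce(onceCounter.incrementAndGet())     // body runs once, result is cached
    val memoized = Task.eval(memoCounter.incrementAndGet()).memoize // same effect via memoize

    (
      List(await(always), await(always)),
      List(await(once), await(once)),
      List(await(memoized), await(memoized))
    )
  }
}
```

`EvaluationDemo.run()` returns `(List(1, 2), List(1, 1), List(1, 1))`.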

Let me show one contrived example that demonstrates some error handling and scheduler selection. The code below takes a Picture and performs an analysis that tells whether the given picture was painted by Picasso. First, it runs the analysis locally, but when the local algorithm cannot decide, a remote analysis service is used. Finally, the original picture and the analysis results are stored in a database. Of course, the services are just mocks, so the results are hardcoded for the sake of simplicity.

Link to file in GH repo
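The linked file is not reproduced here. As a stand-in, here is a hedged sketch of how such a flow could look with Monix 2.x. Every type and function name below (Picture, Analysis, analyseLocally, analyseRemotely, store) is hypothetical and is not taken from the original example.

```scala
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object PicassoDemo {
  // Hypothetical domain types.
  final case class Picture(name: String)
  final case class Analysis(picture: Picture, paintedByPicasso: Boolean)

  // A separate scheduler for blocking I/O such as remote calls and database access.
  val ioScheduler: Scheduler = Scheduler.io()

  // Mocked local algorithm: None means "cannot decide".
  def analyseLocally(picture: Picture): Task[Option[Boolean]] =
    Task.eval(Option.empty[Boolean])

  // Mocked remote service, executed on the I/O scheduler and guarded by a timeout.
  def analyseRemotely(picture: Picture): Task[Boolean] =
    Task.eval(true).executeOn(ioScheduler).timeout(2.seconds)

  // Mocked database write.
  def store(analysis: Analysis): Task[Analysis] =
    Task.eval(analysis).executeOn(ioScheduler)

  def analyse(picture: Picture): Task[Analysis] =
    analyseLocally(picture)
      .onErrorHandle(_ => Option.empty[Boolean]) // treat a local failure as "undecided"
      .flatMap {
        case Some(verdict) => Task.now(verdict)
        case None          => analyseRemotely(picture)
      }
      .flatMap(verdict => store(Analysis(picture, verdict)))

  def run(): Analysis =
    Await.result(analyse(Picture("Guernica")).runAsync, 5.seconds)
}
```

With these hardcoded mocks the local step is undecided, so the verdict comes from the remote mock and `PicassoDemo.run().paintedByPicasso` is true.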

I have mentioned that Schedulers are like ExecutionContexts with the ability to schedule execution on them. But there is one more important trait a Scheduler has, namely its ExecutionModel, which describes whether tasks should be executed in separate logical threads, whether a trampoline should be used, or whether a batch of tasks should be executed synchronously before a new logical thread is used.

Just a few more words about Task. It comes with plenty of combinators and constructors, counterparts of both Future and Scalaz Task combinators. I will just list a few that grabbed my attention:

  • onErrorRestart - restart on error, up to a given number of retries, until it completes successfully
  • restartUntil - restart until the result fulfills a predicate
  • timeout - adds a timeout to an existing Task; Monix also gives you FutureUtils.timeout for timing out Scala Futures.
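The three combinators above can be sketched like this, assuming Monix 2.x. The failing and slow tasks are artificial, and the fallback via onErrorHandle is my own addition to make the timeout observable.

```scala
import java.util.concurrent.atomic.AtomicInteger
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object CombinatorsDemo {
  private def await[A](task: Task[A]): A = Await.result(task.runAsync, 5.seconds)

  // A task that fails twice and succeeds on the third attempt.
  def retried(): (String, Int) = {
    val attempts = new AtomicInteger(0)
    val flaky = Task.eval {
      if (attempts.incrementAndGet() < 3) throw new IllegalStateException("not yet")
      else "ok"
    }
    (await(flaky.onErrorRestart(5)), attempts.get)
  }

  // Re-runs the task until the result satisfies the predicate.
  def restarted(): Int = {
    val counter = new AtomicInteger(0)
    await(Task.eval(counter.incrementAndGet()).restartUntil(_ >= 3))
  }

  // The slow task times out after 100 ms and the error is turned into a fallback value.
  def timedOut(): String = {
    val slow = Task.eval("slow").delayExecution(10.seconds)
    await(slow.timeout(100.millis).onErrorHandle(_ => "fallback"))
  }
}
```

Here `retried()` returns `("ok", 3)`, `restarted()` returns 3 and `timedOut()` returns "fallback".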

To recap: Monix Task gives users a great deal of control over the execution of asynchronous computations through a rich set of builders and combinators, the ability to tune schedulers and execution models, and the out-of-the-box possibility to time out or cancel computations. From my point of view this is a considerable benefit.

There are two artifacts available: monix-cats and monix-scalaz72 with code that turns Task into an instance of Monad from Cats or Scalaz.

Coeval

Coeval is the synchronous counterpart of Task. It gives control over evaluation, side effects and error handling, all at once, but for synchronous computations.

Like Task, it does not run when defined but only when value or run is called. One can define a Coeval to execute on each value call or to memoize its result (like a lazy val). This level of control allows one to avoid stack overflows in deeply recursive code. @tailrec can do that too, but only for simple tail recursion that involves a single function.

Coeval has many builders, combinators (zips, map, flatMap, sequence, traverse, restartUntil, error handling and others) and transformers (from/to Task, Attempt and Try).
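A sketch of both points, assuming Monix 2.x: evaluation on each call versus memoization, and mutual recursion between two functions, which @tailrec cannot handle. The isEven/isOdd pair is a textbook illustration of mine, not code from the original post.

```scala
import monix.eval.Coeval

object CoevalDemo {
  // Mutual recursion made stack-safe by deferring each recursive call.
  def isEven(n: Int): Coeval[Boolean] =
    if (n == 0) Coeval.now(true) else Coeval.defer(isOdd(n - 1))

  def isOdd(n: Int): Coeval[Boolean] =
    if (n == 0) Coeval.now(false) else Coeval.defer(isEven(n - 1))

  // Evaluates each Coeval twice and returns the values observed.
  def evaluations(): (List[Int], List[Int]) = {
    var always = 0
    var once = 0
    val onEachCall = Coeval.eval { always += 1; always }   // body runs on every value call
    val memoized   = Coeval.evalOnce { once += 1; once }   // body runs once, like a lazy val
    (List(onEachCall.value, onEachCall.value), List(memoized.value, memoized.value))
  }
}
```

`CoevalDemo.isEven(1000000).value` evaluates to true without overflowing the stack, and `CoevalDemo.evaluations()` returns `(List(1, 2), List(1, 1))`.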

Again a somewhat made-up example: this time the user is prompted to enter a number. If the user enters an invalid line 3 times, a different prompt is used, and in case of an error the program falls back to 42.

Code available in repo
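The repository code is not reproduced here. The sketch below is my own approximation of the described behaviour, assuming Monix 2.x. To keep it testable, the lines come from an Iterator instead of the console, and readNumber and the prompt texts are hypothetical.

```scala
import monix.eval.Coeval

object PromptDemo {
  // Hypothetical stand-in for reading from the console: lines come from an iterator.
  def readNumber(prompt: String, lines: Iterator[String]): Coeval[Int] =
    Coeval.eval {
      println(prompt)
      lines.next().trim.toInt // throws NumberFormatException on invalid input
    }

  def program(lines: Iterator[String]): Coeval[Int] =
    readNumber("Enter a number:", lines)
      .onErrorRestart(2) // first attempt plus two retries = three tries
      .onErrorFallbackTo(readNumber("Please use digits only:", lines))
      .onErrorHandle(_ => 42) // last resort
}
```

For example, `PromptDemo.program(Iterator("a", "b", "c", "d")).value` yields 42, while `PromptDemo.program(Iterator("x", "7")).value` yields 7.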

Cats and Scalaz integrations contain Monad instances of Coeval (and other tools mentioned here).

MVar

Before I write a really short part about MVar: I have never written “real” code that used this or an analogous abstraction.

A thread can put something into or take something from an MVar, but both operations block asynchronously, which is expressed in the return types of Task[Unit] and Task[A] respectively. When a thread takes from an empty MVar, it gets a Task[A] that completes once some thread puts a value into it. put behaves similarly: it returns a Task that completes only after the put has been executed (which requires the MVar to be empty).

Obligatory example, this time a dummy producer-consumer problem.
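A minimal sketch of this behaviour. It assumes the Monix 2.x MVar from monix.eval, where MVar.empty returns the MVar directly; in later Monix versions MVar lives elsewhere and has a different shape, so treat the exact calls as an assumption.

```scala
import monix.eval.{MVar, Task}
import monix.execution.CancelableFuture
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

object MVarDemo {
  def run(): Int = {
    val mvar = MVar.empty[Int]

    // take on an empty MVar: the resulting future stays pending, no thread is blocked.
    val pendingTake: CancelableFuture[Int] = mvar.take.runAsync

    // put completes once the value has been handed over.
    val put: Task[Unit] = mvar.put(42)
    Await.result(put.runAsync, 5.seconds)

    // The pending take now completes with the value that was put.
    Await.result(pendingTake, 5.seconds)
  }
}
```

`MVarDemo.run()` returns 42.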

Here is more verbose version with some printlns to show lazy behaviour of Task as well.

MVar is inspired by Haskell’s MVar, but the original blocks the thread instead of returning an asynchronous computation abstraction (Task). That is because threads in Haskell are not as scarce.

Back-pressured streams

The last part of the library I would like to mention in this article is Monix’s take on reactive streams.

I will describe it briefly and only from the perspective of a user, not an implementer of new Observables, Observers or Consumers.

Observable is the equivalent of org.reactivestreams.Publisher or the akka-streams Source, Observer is like org.reactivestreams.Subscriber, and Consumer, which specifies how to consume an Observable, resembles Sink.

Observable has a wealth of combinators: for grouping, filtering, mapping, taking, sliding, controlling what happens when upstream or downstream is slow, attaching callbacks for upstream events like stream end, error handling and more. I am not going to enumerate them, but I can assure you that the library author put extraordinary effort into this area.

The provided set of Consumers is sufficient: I was able to find a replacement for each Sink that I have used in a real project based on akka-streams.

I will show what unfolded when I took an akka-stream used in a recent project I worked on and rewrote it with Monix. I have replaced the business methods with dummy ones.

First comes a natural language description of the processing, then the akka-streams implementation, and finally the Monix implementation.
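To give a feel for how Observable and Consumer fit together, here is a small sketch assuming Monix 2.x. The pipeline itself (summing the squares of the even numbers from 1 to 10) is an arbitrary example of mine.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.concurrent.Await
import scala.concurrent.duration._

object ObservableDemo {
  // Source plus transformations, roughly what Source and Flow are in akka-streams.
  val evenSquares: Observable[Int] =
    Observable.fromIterable(1 to 10).filter(_ % 2 == 0).map(n => n * n)

  // A Consumer describes how to consume the stream, resembling a Sink.
  val sumConsumer: Consumer[Int, Int] = Consumer.foldLeft[Int, Int](0)(_ + _)

  // Consuming an Observable yields a Task, which still has to be run.
  val sum: Task[Int] = evenSquares.consumeWith(sumConsumer)

  def run(): Int = Await.result(sum.runAsync, 5.seconds)
}
```

`ObservableDemo.run()` returns 220 (4 + 16 + 36 + 64 + 100).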

  1. The starting point is some equivalent of a reactive streams publisher of Kafka “committable messages” (a combination of payload and means to commit the offset).
  2. The message is matched and acted upon. This part is very abstract in the following code.
  3. To avoid excessive offset commits (which are expensive I/O), offsets are committed only once per 100 messages, or once per second when traffic is lower.
  4. Then the stream goes through an injectable trigger that can be used to cancel processing.
  5. There is no element or statistic about the stream that we are interested in, so we ignore whatever comes through the cancel switch.
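The steps above can be sketched with Monix roughly as follows. This is not the code from the repository: Message, handle and the in-memory source are hypothetical stand-ins for the Kafka parts, and "committing" is reduced to collecting the last offset of each batch so that the result can be inspected. In a real pipeline one would ignore the elements instead, for example with completedL.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.Await
import scala.concurrent.duration._

object CommitBatchingDemo {
  // Hypothetical stand-in for a Kafka committable message.
  final case class Message(offset: Long, payload: String)

  // Step 2: business logic placeholder.
  def handle(message: Message): Message = message

  def pipeline(messages: Observable[Message], cancel: Observable[Any]): Task[List[Long]] =
    messages
      .map(handle)
      .bufferTimedAndCounted(1.second, 100) // step 3: batch per 100 messages or 1 second
      .filter(_.nonEmpty)
      .map(batch => batch.last.offset)      // "commit" only the last offset of each batch
      .takeUntil(cancel)                    // step 4: injectable cancellation trigger
      .toListL                              // collected here only for illustration

  def run(): List[Long] = {
    val messages = Observable.fromIterable((1L to 250L).map(n => Message(n, s"payload-$n")))
    // Observable.never is a trigger that never fires, so the stream runs to completion.
    Await.result(pipeline(messages, Observable.never).runAsync, 10.seconds)
  }
}
```

With 250 in-memory messages the last committed offset is 250, and there are far fewer commits than messages.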

akka-streams original

Monix rewrite

Full example is available in GH repo.

The API looks similar to some extent. The main difference is that Monix streams run on a Scheduler, whereas akka-streams requires a Materializer, which most probably is an ActorMaterializer, which in turn requires an ActorSystem. A Scheduler is much more lightweight than an ActorSystem.

I have also written and run some JMH benchmarks for a totally made-up set of operations, although I will not summarize them as a comparison, to avoid possibly false conclusions. Still, I am convinced that Monix streams are very efficient.

As with Task and Coeval, there is a glue library that provides Cats or Scalaz Monad instances for Observables.

So why should you know Monix?

Because it gives you a very good set of tools for asynchronous programming. There is Task as an alternative to Future, with many combinators and helpers for writing your asynchronous code. It provides a well-engineered and efficient implementation of reactive streams. The MVar type can sometimes be the simplest solution to synchronization problems. There are also several miscellaneous classes I have not written about that can be handy, like Atomic, which can replace AtomicInteger and the like, or CircuitBreaker, or FutureUtils.
