In “Why you should know Monix”
I took a brief look at some of Monix’s abstractions and utilities,
but I didn’t dive into implementing reactive streams elements.
This time I’m going to build
an Observable and an Observer for the RabbitMQ message broker.
I have to start with vocabulary, to make things understandable:
What I aim for are:
- a Monix Consumer that takes messages from an Observable and produces them to the broker,
- a Monix Observable that consumes messages from the broker and gives them to an Observer.
For both elements I will provide tests for “reactiveness”.
What I’m not going to implement is a complete library of production quality or part of it. I’m trying to keep things very simple.
Some basic knowledge about RabbitMQ that you ought to have to understand this post:
- messages are published to exchanges, which route them to Queues (zero to many) according to the message routing key and the server’s bindings setup,
- messages are consumed from Queues and their consumption (delivery) is acknowledged by the client; delivered messages will not be delivered to another consumer, while unacknowledged messages re-appear in the Queue and will be delivered again to one of the consumers,
- a Connection is a scarce resource, but one can create multiple Channels on it,
- a Channel can cause closing of its Connection if it produces too much, so it is a good idea to have separate Connections for producing and consuming,
- the client documentation says that “Channel instances must not be shared between threads”; I understand this as “don’t access a Channel concurrently from multiple threads”.
The basic AMQP concepts page contains all the necessary information; it is about a 15-minute read.
A short summary of Monix’s Observer[T], Subscriber[T] and Consumer[T, R] abstractions and their relations:
- Observer[T] has three callbacks: onNext(e: T): Future[Ack], onError(ex: Throwable): Unit and onComplete(): Unit; it defines the behaviour on stream events.
- Subscriber[T] is an Observer[T] that also carries a Scheduler.
- Consumer[T, R] is a factory of Subscribers. When o: Observable[T] is applied to c: Consumer[T, R], the consumer creates s: Subscriber[T] and subscribes it to o.
- R is the type of the value emitted when the Subscriber completes. For example, a consumer that sums the lengths of all Strings passed to it has String as its input type and a numeric result type.
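To make the role of R concrete, here is a minimal sketch of the "sum of String lengths" consumer. It assumes Monix 3.x (where an Observable is run against a Consumer with consumeWith); the object name SumLengths and the choice of Long as the result type are my own assumptions, not taken from the original code.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable}
import scala.concurrent.duration._

object SumLengths {
  // Input type is String, result type R is Long (an assumption for this sketch).
  val sumLengths: Consumer[String, Long] =
    Consumer.foldLeft[Long, String](0L)((acc, s) => acc + s.length)

  // Applying the Consumer to an Observable yields a Task of the result.
  val total: Task[Long] =
    Observable("a", "bb", "ccc").consumeWith(sumLengths)

  // Blocks only for demonstration purposes.
  def run(): Long = total.runSyncUnsafe(5.seconds)
}
```

The Task completes with the folded value only after the upstream completes, which is exactly the moment the Consumer's result is signaled.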
The ExchangeConsumer I’m implementing will expect an
Observable[OutboundMessage] and it will produce the observed messages to the RabbitMQ server.
There is no value I want to return upon completion of the upstream (the data producer),
thus I need a Consumer[OutboundMessage, Unit].
If I were going to count the messages sent and signal this number when the upstream completes, the result type would be numeric instead of Unit.
There is more than one way to skin a cat; my way is to have one
Connection, which is used to create one Channel for each Subscriber.
This is cheap, but the Channels share the
Connection, thus all
Subscribers using its Channels
can be blocked by the server because of one
Subscriber that produces too much.
Let’s live with this design decision and dive into the code:
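Important points of ExchangeConsumer:
- a Channel is created for each new Subscriber,
- the Scheduler is used by the new Subscriber,
- messages are published with Channel.basicPublish, which has a synchronous, blocking API, so the Subscriber is ready for the next element by the time the call returns,
- the Callback is used after aborting a Channel,
- AssignableCancelable.dummy is returned along with the Subscriber; I’ll get back to it when discussing canceling the subscription.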
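The points above can be sketched roughly as follows. This is not the original ExchangeConsumer: it assumes the Monix 3.x signature of createSubscriber (in Monix 2.x the callback type was Callback[Unit]), and, to stay runnable without a broker, it takes two hypothetical functions: publish (which in the post's setting would wrap the blocking Channel.basicPublish) and release (which would abort or close the Channel). The fields of OutboundMessage are assumptions as well.

```scala
import monix.execution.{Ack, Callback, Scheduler}
import monix.execution.Ack.{Continue, Stop}
import monix.execution.cancelables.AssignableCancelable
import monix.reactive.Consumer
import monix.reactive.observers.Subscriber
import scala.concurrent.Future
import scala.util.control.NonFatal

// Hypothetical message shape, for illustration only.
final case class OutboundMessage(exchange: String, routingKey: String, body: Array[Byte])

final class PublishingConsumer(publish: OutboundMessage => Unit, release: () => Unit)
    extends Consumer[OutboundMessage, Unit] {

  override def createSubscriber(
      cb: Callback[Throwable, Unit],
      s: Scheduler): (Subscriber[OutboundMessage], AssignableCancelable) = {

    val subscriber = new Subscriber[OutboundMessage] {
      implicit val scheduler: Scheduler = s

      // publish is synchronous, so once it returns we are ready for the next element.
      def onNext(m: OutboundMessage): Future[Ack] =
        try { publish(m); Continue }
        catch { case NonFatal(ex) => release(); cb.onError(ex); Stop }

      // Release the resource first, then signal the result through the callback.
      def onError(ex: Throwable): Unit = { release(); cb.onError(ex) }
      def onComplete(): Unit = { release(); cb.onSuccess(()) }
    }

    // No way to cancel the upstream subscription from here yet.
    (subscriber, AssignableCancelable.dummy)
  }
}
```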
I can’t say it was hard, but Monix can make it even easier;
this time I’ll create an equivalent Consumer from an Observer.
Consumer.fromObserver takes care of the
Callback[Unit] seen in the previous example
by calling its onSuccess or
onError after calling the corresponding Observer callback, so the
Callback is absent in this code.
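A minimal sketch of the same idea with Consumer.fromObserver, under the same assumptions as before: Monix 3.x, a hypothetical OutboundMessage shape, and hypothetical publish and release functions standing in for Channel.basicPublish and closing the Channel. Error handling inside onNext is left out to keep the sketch short.

```scala
import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.reactive.{Consumer, Observer}
import scala.concurrent.Future

// Hypothetical message shape, for illustration only.
final case class OutboundMessage(exchange: String, routingKey: String, body: Array[Byte])

object FromObserverSketch {
  def exchangeConsumer(
      publish: OutboundMessage => Unit,
      release: () => Unit): Consumer[OutboundMessage, Unit] =
    Consumer.fromObserver[OutboundMessage] { _ =>
      // The function receives the Scheduler; it is unused in this sketch.
      new Observer[OutboundMessage] {
        def onNext(m: OutboundMessage): Future[Ack] = { publish(m); Continue }
        def onError(ex: Throwable): Unit = release()
        def onComplete(): Unit = release()
      }
    }
}
```

Note the types: the result is a Consumer[OutboundMessage, Unit], and applying it to an Observable[OutboundMessage] with consumeWith gives a Task[Unit].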
It’s time for a short example of how to start streaming from an Observable to Rabbit; please just note the types that are used:
Finally I’m ready to explain the
AssignableCancelable.dummy present in the first example.
A Subscriber can cancel this
AssignableCancelable to cancel its subscription to the data source.
The Assignable part of the name is there because the subscription of the
Observer is assigned to it.
In other words, it allows canceling the subscription to the data source from the data sink.
Let’s consider a scenario in which RabbitMQ closes the Channel,
and therefore every
onNext subsequent to the channel shutdown has to fail
(Stop should be returned and the
Callback should be invoked).
But what if the
Observable uses some scarce resource and supports canceling subscriptions properly?
Then we can save this resource by canceling the subscription when the
Channel’s shutdown is observed.
With Channel.addShutdownListener the improvement is very simple.
The final version of
ExchangeConsumer is in the repo: ExchangeConsumer.scala
ExchangeSubscriber is now polite to the Observable,
because it reacts to a closed connection by canceling its subscription.
Unfortunately, in the following scenario no one is so kind to the ExchangeSubscriber:
it will wait for subsequent
onNext calls, keeping the
Channel open. Forever.
The Monix documentation states that “The data-source can get canceled without the observer receiving any notification about it”.
But there is one weird trick that can help us, it is:
After a little change,
produce ends with a
java.util.concurrent.CancellationException, but I don’t care:
I just told it to cancel and my resource is free!
Ok, it compiles, so it works. I call it a day.
Just kidding, tests are missing! I’m conservative about tests but I’m also motivated to end this section with little effort.
I’m going to omit round-trip tests, where messages are produced by the ExchangeConsumer and then consumed back from the broker,
because this post is more about reactive streams than about AMQP.
As for testing reactive streams compliance, things look good for me,
because someone has written a pretty nice Technology Compatibility Kit (TCK) for Reactive Streams.
I’ll use Monix goodness to obtain an
org.reactivestreams.Subscriber from my
Consumer, and the TCK to perform the tests!
The code above omits starting the RabbitMQ server (that is not in the repo either), creating the exchange, and connection management.
The relevant parts are:
- createElement: required by SubscriberBlackboxVerification to create the elements that are passed to the tested Subscriber,
- createSubscriber: provides an instance of the Subscriber under test,
- toReactive(requestCount = 10): converts to an org.reactivestreams.Subscriber which demands 10 elements in advance, before processing them; requestCount can be considered the size of an internal buffer.
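The conversion step from the list above can be sketched on its own, without the TCK harness. The sketch assumes Monix 3.x; the helper name toReactiveSubscriber and the result-ignoring callback are hypothetical and only serve the illustration.

```scala
import monix.execution.{Callback, Scheduler}
import monix.reactive.Consumer
import org.reactivestreams.{Subscriber => RSubscriber}

object ReactiveBridge {
  // Creates a Monix Subscriber from the Consumer and wraps it as a
  // Reactive Streams Subscriber that requests `requestCount` elements at a time.
  def toReactiveSubscriber[A](consumer: Consumer[A, Unit], requestCount: Int)(
      implicit s: Scheduler): RSubscriber[A] = {

    // The final result of the Consumer is of no interest to the TCK, so it is ignored here.
    val ignoreResult = new Callback[Throwable, Unit] {
      def onSuccess(value: Unit): Unit = ()
      def onError(ex: Throwable): Unit = ()
    }

    val (subscriber, _) = consumer.createSubscriber(ignoreResult, s)
    subscriber.toReactive(requestCount)
  }
}
```

A TCK createSubscriber implementation could then return something like ReactiveBridge.toReactiveSubscriber(consumer, 10).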
That was my free lunch! At least 40% of it, because only 10 of the 25 specs of the TCK are run by default without extra custom code. It is a good beginning though!
Channel API, Monix goodness and ReactiveStreams TCK made my first objective quite easy.
Now it’s time to implement an
Observable that will consume from a RabbitMQ queue.
The API gives me a choice between callback and pull.
By the callback API I mean using
Channel.basicConsume(queue, autoAck, consumer),
where consumer has a handful of callback methods.
reactive-rabbit uses this approach.
Pulling means using
Channel.basicGet(queue, autoAck), which returns a GetResponse.
basicGet is synchronous and blocking, but if there is no message ready for consumption,
it returns null instantly; it doesn’t block waiting for the next message.
I’ll use pull.
The second design decision is how to acknowledge messages when
basicGet(queue, false) is used, that is, when the client side decides when processing of a message is done and it is OK to remove it from the queue.
I’m going to make
QueueObservable an Observable of messages,
where each message closes over the
Channel reference and allows to
ack itself to the broker.
A similar approach is used by the Akka Streams connector for Apache Kafka, which returns a
CommittableMessage to commit offsets.
The same type is returned regardless of whether auto-ack is used; that is only to keep the blog-post example code short.
To create an Observable I need to provide a function from Subscriber to Cancelable.
A very side-effecting function, in fact.
It should start the process of feeding the given
Subscriber with the elements that are observable (messages from the queue in our case).
The returned Cancelable ought to allow aborting that process.
What I’m going to do with the given Subscriber:
- create a Channel for it; if that step fails, call onError,
- feed it with messages, calling onError when something breaks (queue got deleted, channel got closed, etc.),
- never call onComplete, because I treat queues as infinite observables (design decision).
The following code is more involved than the previous examples; I tried to document my intentions with comments.
- feeding is the process that feeds the subscriber with messages consumed from the queue, with an added cancel handler (it sets the continue flag to false) and an error handler that signals to the subscriber’s onError. I justify such a trivial cancellation by the synchronous nature of basicGet and the quick spinning when no messages are ready to be consumed.
- feedSubscriber calls oneGet if processing should continue; otherwise it aborts the channel and terminates processing.
- oneGet performs one basicGet; if a message was consumed, subscriber.onNext is called. If the subscriber wants to Continue, feedSubscriber is trampolined. In the opposite case the channel is aborted and processing is terminated.
- When an exception occurs, Task.raiseError is returned. It triggers the error handler of feeding, and thus the error is signaled to the subscriber.
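The feeding loop described above can be approximated with the following sketch. It is not the original QueueObservable: it assumes Monix 3.x, and instead of a Channel it takes two hypothetical functions, poll (standing in for one basicGet call, returning None when no message is ready) and close (standing in for aborting the Channel). Cancellation relies on Task's own cancellation rather than on an explicit continue flag.

```scala
import monix.eval.Task
import monix.execution.Ack.{Continue, Stop}
import monix.reactive.Observable

object PollingObservable {
  def apply[A](poll: () => Option[A], close: () => Unit): Observable[A] =
    Observable.unsafeCreate[A] { subscriber =>
      // One step: poll once, push the element downstream, then decide whether to loop.
      def feed: Task[Unit] =
        Task.eval(poll()).flatMap {
          case Some(elem) =>
            Task.deferFuture(subscriber.onNext(elem)).flatMap {
              case Continue => feed               // subscriber wants more
              case Stop     => Task.eval(close()) // subscriber is done
            }
          case None => feed // nothing ready: spin quickly and poll again
        }

      feed
        .doOnCancel(Task.eval(close()))
        .onErrorHandle { ex => close(); subscriber.onError(ex) }
        // The returned CancelableFuture is the Cancelable of the subscription.
        .runToFuture(subscriber.scheduler)
      // onComplete is never called: the source is treated as infinite.
    }
}
```

For example, PollingObservable(poll, close).take(3) would stop polling once the third element has been delivered, because the downstream then answers with Stop.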
No surprises when it comes to testing:
I’ll use the Reactive Streams TCK again.
Observable has the method
toReactivePublisher, by which I obtain an org.reactivestreams.Publisher
to run the tests against.
I ought to justify skipping the test for Specification Rule 1.09.
The textual specification states that a Publisher “MUST call onSubscribe (…) prior to any other signals (…)”.
The original test fails because it expects
onSubscribe, and the given implementation doesn’t call it.
It doesn’t break the specification, though: it doesn’t signal anything at all in this test case,
so no other signal precedes onSubscribe.
Of course, more rules of the specification could be checked with extra effort, but I’m going to stop here.
Thank you for your attention, I hope this post is helpful!
In my opinion it went pretty easily.
For sure there are features missing,
like Publisher Confirms (for the producing side)
or Channel auto-recovery (for both elements);
also, my take on acknowledgments is very simple,
so I don’t dare to compare this Monix implementation
with any production-grade library.
I hope I demonstrated the potential of creating one with Monix
and provided a nice example of creating a Consumer and an Observable.