Custom GraphStage in Akka Streams

In this post I will try to explain what GraphStage is in Akka Streams. My goal is to describe when it’s useful and how to use it correctly. I will start by outlining key terminology, then proceed with a simple example, and after that cover the main use case, for which the most upvoted issue of akka-http will serve. At the end, I will show how to properly test a GraphStage. Besides learning the API, you’ll gain a deeper understanding of how backpressure works.

Basic terminology

When reading various materials about Akka Streams I see that a common understanding of some terms is taken for granted. I think that’s not always justified, so I want to build up our terminology step by step. I only assume that you are already familiar with the concepts of Flow, Source and Sink, as they were explained in the previous post.

Downstream and upstream

Let me explain downstream and upstream with the following graph:

The terms downstream and upstream are relative, so you need to know what your reference point is. Let’s say we are talking about flow1 - then its upstream is source, while its downstream is flow2 and anything “on the right-hand side” of flow2. Basically, it’s about the direction in which data flows - places where messages are seen earlier are called upstream and places where they’re seen later are called downstream. With that in place, we can proceed to two fundamental facts:

  • Upstream may complete or fail downstream. When upstream completes, it has no more data to send and informs its downstream about this fact by emitting a completion message. Conversely, when upstream fails, it was forced to abort its processing due to an error and propagates this error to its downstream.

  • Downstream may cancel upstream. Cancellation means that downstream is no longer interested in incoming data; by emitting a cancellation it asks upstream not to send any more data.

ProcessingStage and Shape

ProcessingStage is a common name for all building blocks that make up a graph.

A Shape is determined by its number of inlets (input ports) and outlets (output ports). Beyond that, it reveals nothing about processing semantics. Let’s take a look at two exemplary shapes:

Shape1 described above is called FlowShape. It has one inlet and one outlet. All Flows (e.g. map or filter) have the shape FlowShape. In general a Shape may have an arbitrary number of inlets and outlets, so the following is an example of a different Shape:

Shape2 from above has two inlets and one outlet.
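
To make the two shapes a bit more tangible, here is a minimal sketch of how they could be constructed directly. The object name ShapeDemo and the port names are my own, and I picked FanInShape2 as one possible shape with two inlets and one outlet; the original diagram may have had something else in mind.

```scala
import akka.stream.{FanInShape2, FlowShape, Inlet, Outlet}

object ShapeDemo {
  // One inlet, one outlet: the shape shared by every Flow.
  val shape1: FlowShape[Int, String] =
    FlowShape(Inlet[Int]("shape1.in"), Outlet[String]("shape1.out"))

  // Two inlets, one outlet. FanInShape2 creates its own ports (in0, in1, out).
  val shape2: FanInShape2[Int, Long, String] =
    new FanInShape2[Int, Long, String]("shape2")

  def main(args: Array[String]): Unit = {
    println(s"shape1: ${shape1.inlets.size} inlet(s), ${shape1.outlets.size} outlet(s)")
    println(s"shape2: ${shape2.inlets.size} inlet(s), ${shape2.outlets.size} outlet(s)")
  }
}
```

Note that neither shape says anything about what happens to the elements; it only describes the ports.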

What is GraphStage

You can think of GraphStage as the implementation block behind any ProcessingStage of a RunnableGraph. Such a broad definition implies that any Flow may be implemented as a GraphStage. We will try to do that in the next section.

Example of simple GraphStage

Let’s try to implement a GraphStage responsible for filtering. Here’s the minimal code to get it done (the full code with all imports and implicits may be found in the accompanying repo for this and all following snippets):

The above implementation may be run with the following code (full code):

OK, you pasted some code and it works as expected; now let’s analyze how we actually achieved that.
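
The snippets themselves live in the repo, so what follows is only a sketch of what such a filtering stage and its runner might look like. The port names and the FilterStageDemo object are my own, and I assume Akka 2.6, where an implicit ActorSystem is enough to run a stream (on 2.5 you would also need an ActorMaterializer).

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

class FilterStage[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("FilterStage.in")
  val out: Outlet[T] = Outlet[T]("FilterStage.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (predicate(elem)) push(out, elem) // element passes: send it downstream
          else pull(in)                        // element dropped: ask upstream for another one
        }
      })
      setHandler(out, new OutHandler {
        // Downstream wants an element, so we forward the demand upstream.
        override def onPull(): Unit = pull(in)
      })
    }
}

object FilterStageDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("filter-demo")
    try Await.result(Source(1 to 10).via(new FilterStage[Int](_ % 2 == 0)).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Running FilterStageDemo should yield the even numbers 2, 4, 6, 8 and 10.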

GraphStage API

Implementing GraphStage boils down to implementing two methods. The first one is:

  • def shape: S, where S <: Shape - each element of the execution graph has a defined Shape.

Another method is:

  • def createLogic(inheritedAttributes: Attributes): GraphStageLogic

Here we implement the logic of the GraphStage. You should put all state variables within GraphStageLogic, as this is the only part that is not reused between different materializations of the stream. Our simple FilterStage is stateless - its processing logic depends solely on input elements. Therefore all we see in its GraphStageLogic are two calls to setHandler.

There are 2 types of handlers:

  • InHandler

Handler for inlets. You have to implement one method: def onPush(): Unit. There are other methods you may override, but let’s ignore them for now. onPush is called when a new element has been received on the inlet. It is usually implemented in terms of grab and push: grab(inlet) returns the element most recently pushed to the inlet, and push(outlet, el) pushes an element to the given outlet.

  • OutHandler

Handler for outlets. You have to implement one method: def onPull(): Unit. There is another method you may override, but we’ll leave it out for now. onPull is called when demand for an element has been signalled on this outlet. It is usually implemented in terms of pull, which propagates demand upstream.

All inlets and outlets can be represented as finite state machines. Consequently, some operations are not valid in some states. When you call an operation that is invalid for the current state, an exception is thrown. More information and detailed FSM diagrams may be found in the Port states section of the Akka documentation. To understand why the API looks like this, and consequently to be able to use it correctly, we need to think about its relation to backpressure.

Backpressure

Excerpted from Akka documentation:

Backpressure, in general, is a method where a consumer is able to regulate the rate of incoming data from a corresponding producer.

It nicely sums up the ultimate goal of backpressure. The statement also suggests that the consumer needs some means to “regulate the rate of incoming data”. It does so by sending control messages up to the producer over a special channel. Now you should start to see why the API needs onPull and pull. Those methods refer not to the data elements themselves but to this special channel, which is used to propagate demand up to the producer. Therefore we can say that demand flows from downstream to upstream, as opposed to data, which flows from upstream to downstream. It can be illustrated with a diagram (taken from the Akka docs, original):

Thread safety

GraphStageLogic callbacks are thread safe in the same way as an Actor’s receive method is. That means they are never called concurrently and can safely modify the state of the GraphStageLogic. On the other hand, a restriction similar to the Actor one applies - Future callbacks should not close over that state.
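
If a stage does need to react to a Future, one way to stay on the safe side is getAsyncCallback, which re-enters the stage through the same single-threaded channel as the other callbacks. The sketch below is my own illustration and not part of the accompanying repo; AsyncLookupStage, its lookup parameter and the demo object are hypothetical names, and I assume Akka 2.6.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

class AsyncLookupStage(lookup: Int => Future[String])(implicit ec: ExecutionContext)
    extends GraphStage[FlowShape[Int, String]] {
  val in: Inlet[Int] = Inlet[Int]("AsyncLookupStage.in")
  val out: Outlet[String] = Outlet[String]("AsyncLookupStage.out")
  override val shape: FlowShape[Int, String] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      private var inFlight = false // stage state, only touched from stage callbacks

      // The handler body runs inside the stage, never concurrently with onPush/onPull.
      private val onResult = getAsyncCallback[Try[String]] {
        case Success(value) =>
          inFlight = false
          push(out, value)
          if (isClosed(in)) completeStage() // upstream finished while we were waiting
        case Failure(ex) => failStage(ex)
      }

      override def onPush(): Unit = {
        inFlight = true
        // The Future callback only invokes the async callback; it does not touch stage state.
        lookup(grab(in)).onComplete(onResult.invoke)
      }

      override def onPull(): Unit = pull(in)

      // Keep the stage alive until the pending lookup has been delivered.
      override def onUpstreamFinish(): Unit = if (!inFlight) completeStage()

      setHandlers(in, out, this)
    }
}

object AsyncLookupDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("async-lookup-demo")
    import system.dispatcher
    val stage = new AsyncLookupStage(id => Future(s"value-$id"))
    try Await.result(Source(1 to 3).via(stage).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The point of the design is that inFlight is never read or written from the Future’s thread; everything that mutates state happens in onResult.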

API sugar

Akka provides a lot of API sugar to help you type less code. Our Filter GraphStage can be rewritten as follows:
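
As a sketch of what that sugar might look like (the version in the repo may differ): the logic itself can mix in InHandler and OutHandler and register itself for both ports with setHandlers. SugaredFilterStage and the demo object are names I made up, and I assume Akka 2.6 for running it.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

class SugaredFilterStage[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet[T]("SugaredFilterStage.in")
  val out: Outlet[T] = Outlet[T]("SugaredFilterStage.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    // The logic is its own handler for both ports, so no anonymous handler classes are needed.
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        if (predicate(elem)) push(out, elem) else pull(in)
      }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}

object SugaredFilterDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("sugared-filter-demo")
    try Await.result(Source(1 to 6).via(new SugaredFilterStage[Int](_ > 3)).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```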

Real world Filter GraphStage

At the beginning I wrote that any node of the execution graph may be implemented as a GraphStage. That’s actually an understatement, because a lot of built-in combinators are implemented as GraphStages. (I lack the knowledge to say whether it’s the majority of built-in combinators or even all of them.) Let’s take a look at akka.stream.impl.fusing.Filter:

Well, it differs a little from our naive implementation, but the essence remains the same. The main difference is that Akka’s implementation takes care of applying the proper Supervision policy in case of failure. That’s advisable here because filter executes code supplied by the user, which may throw exceptions.

I find the fact that most built-in combinators are implemented as GraphStage quite useful as it means that you can easily inspect how they work underneath. Also, you can use them as a reference when writing your own GraphStages.

When you should consider writing your own GraphStage

The pragmatic answer to this question is: when the thing you need to do is impossible to achieve with built-in combinators. Here are some signs that you should think about a custom GraphStage:

  • you need to maintain state in order to process incoming messages. This is not a sufficient reason by itself, as you can do stateful computations using e.g. statefulMapConcat, but that does not scale well as complexity grows and should therefore be used only for simple cases
  • you don’t pair input and output elements in a one-to-one fashion. A simple example is filter - you don’t know how many elements of the initial Source the Sink will receive. More complicated GraphStages may buffer incoming messages in internal state and push them downstream based on some criteria. This is possible because at the GraphStage level we have access to the push and pull methods and callbacks
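
For comparison, this is roughly what a simple stateful computation looks like with statefulMapConcat before you reach for a custom GraphStage. The example (dropping consecutive duplicates) and the object name are mine, and I assume Akka 2.6.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object StatefulMapConcatDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("stateful-demo")
    val deduplicated = Source(List(1, 1, 2, 2, 2, 3, 1)).statefulMapConcat { () =>
      // This state is created once per materialization of the stream.
      var last: Option[Int] = None
      elem =>
        if (last.contains(elem)) Nil // same as the previous element: emit nothing
        else {
          last = Some(elem)
          List(elem)                 // new value: emit it
        }
    }
    try Await.result(deduplicated.runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

This should yield 1, 2, 3, 1. It works fine for one mutable variable, but it gives you no say over when demand is signalled, which is exactly what the proxy example later needs.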

A more involved example

Now that I have introduced the basics of GraphStage, we can move on to a more involved example. I came across it when working on akka-http#192. The goal is to establish a TCP connection to a proxy host. Then we need to send a CONNECT message, and after a successful response from the proxy we can send and receive actual requests.

To keep our focus on GraphStage I will skip some HTTP-specific things like parsing requests. Also, instead of presenting the final version of the code at the beginning, we will go step by step, so I will also present flawed versions of the code. I believe it is beneficial for the reader to see the whole process instead of just the end result.

Where to plug ProxyStage

Let’s start by finding the place where we will put the not-yet-built ProxyStage. It currently looks like this:

You may wonder what BidiFlow is. It is an element of the execution graph with two input and two output ports. Data elements arrive at the first input (let’s call it I1), are processed and flow out at the first output (O1); then the response comes back at the second input (I2) and, after being processed, is yielded at the second output (O2). Thus tlsStage can be illustrated with the following diagram:

BidiFlow is especially useful when subsequent stages of your graph represent consecutive layers of communication. That’s why it was used for TLSStage and is commonly used in akka-http.

Now it’s clear that our ultimate goal can be achieved with the following topology:

ProxyStage does not exist yet - we need to implement it. The only interesting thing ProxyStage does happens at the beginning of communication. After the HTTPS tunnel is established, it serves just as a transparent proxy that forwards what it receives. Let’s start with this easy part - we’ll call it NeutralStage. It may look like this:
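
To get a feeling for the I1/O1/I2/O2 wiring without writing a stage, here is a small sketch with BidiFlow.fromFunctions. It is not TLS, just a toy string codec layered on top of a fake transport that upper-cases whatever it receives; all names are hypothetical and I assume Akka 2.6.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object BidiFlowDemo {
  // I1 = String, O1 = ByteString (outbound); I2 = ByteString, O2 = String (inbound).
  val codec: BidiFlow[String, ByteString, ByteString, String, NotUsed] =
    BidiFlow.fromFunctions((s: String) => ByteString(s), (b: ByteString) => b.utf8String)

  // Stand-in for the lower layer: answers every message with its upper-cased copy.
  val transport: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].map(b => ByteString(b.utf8String.toUpperCase))

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("bidi-demo")
    // join connects O1 to the transport's input and the transport's output to I2.
    val roundTrip: Flow[String, String, NotUsed] = codec.join(transport)
    try Await.result(Source(List("ping", "pong")).via(roundTrip).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The result should be PING and PONG: each string went out through O1, was handled by the transport, and came back in through I2.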

It can be run with:
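
The repo has the real code; below is only my sketch of how such a forwarding stage and a runner might look. Only the port name OutgoingSSL.in is taken from this post (it shows up in an exception message later on), the remaining port names and the demo object are assumptions, as is Akka 2.6.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, BidiShape, Inlet, Outlet}
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

class NeutralStage extends GraphStage[BidiShape[ByteString, ByteString, ByteString, ByteString]] {
  val sslIn: Inlet[ByteString] = Inlet("OutgoingSSL.in")
  val bytesOut: Outlet[ByteString] = Outlet("OutgoingTCP.out")
  val bytesIn: Inlet[ByteString] = Inlet("IncomingTCP.in")
  val sslOut: Outlet[ByteString] = Outlet("IncomingSSL.out")

  override val shape: BidiShape[ByteString, ByteString, ByteString, ByteString] =
    BidiShape(sslIn, bytesOut, bytesIn, sslOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Outbound direction: sslIn ~> bytesOut
      setHandler(sslIn, new InHandler {
        override def onPush(): Unit = push(bytesOut, grab(sslIn))
        // Close only the outbound side, so responses still in flight can arrive.
        override def onUpstreamFinish(): Unit = complete(bytesOut)
      })
      setHandler(bytesOut, new OutHandler {
        override def onPull(): Unit = pull(sslIn)
      })
      // Inbound direction: bytesIn ~> sslOut
      setHandler(bytesIn, new InHandler {
        override def onPush(): Unit = push(sslOut, grab(bytesIn))
      })
      setHandler(sslOut, new OutHandler {
        override def onPull(): Unit = pull(bytesIn)
      })
    }
}

object NeutralStageDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("neutral-demo")
    val transport = Flow[ByteString].map(identity) // fake transport that echoes everything back
    val flow = BidiFlow.fromGraph(new NeutralStage).join(transport)
    val requests = List(ByteString("a"), ByteString("b"))
    try Await.result(Source(requests).via(flow).map(_.utf8String).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With the echoing transport, the sink should simply see a and b again.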

OK, it simply forwards messages in both directions. That’s a good starting point; now we need to send the CONNECT message before we forward any other message. We can do it in bytesOut’s onPull - it will be called when the transport layer asks for data:

We also introduced a state variable of type State. It helps us track the current state of the GraphStage. It is defined as:

We also need to take care of data elements arriving on input I1. We append them to bufferedMsgs. Once we are in the Connected state, we can simply push what we receive.

The last piece we need to add is validating the proxy’s response to the CONNECT message. If proxyResponseValid returns true, we consider it a success and change the internal state to Connected. Besides that, we need to remember to send the buffered messages. We can do it with emitMultiple:

The whole code may be found in HttpsProxyStage0.scala in the repo. We can run this code for a sample Source with:

This code will hang. Do you have any idea why?

The problem is that we don’t pull bytesIn often enough. The only place where we pull bytesIn is inside sslOut’s onPull. It was called once (the Sink signalled that it wants to consume some data), and then in bytesIn’s onPush we don’t push the element onward, as the “OK” response is a technical message that the Sink doesn’t care about. In other words, our HttpsProxyStage sees one more data element on the ports communicating with the transport than on the ports communicating with the outside world. (That’s an oversimplification in the case of akka-http. Because of its streaming nature we cannot be sure that the response to CONNECT will arrive as a single data element. For simplicity, in this article we assume that it always does.)

Let’s add pull(bytesIn) in bytesIn’s onPush (HttpsProxyStage1.scala):

After this change it seems to work.

Reduce state variables

We gained some deeper knowledge about the relations between the numbers of data elements. Now let’s analyze whether we really need bufferedMsgs. You may think that we need to do something with incoming messages in sslIn’s onPush - and the only reasonable thing to do seems to be saving them to internal state. That’s right, but the key observation is that we can prevent onPush from being called until the connection is established. We can do that by not pulling sslIn - as long as it’s not pulled, no one is able to push anything to it.

After removing bufferedMsgs (HttpsProxyStage2.scala):

When we run this code it throws an exception: java.lang.IllegalArgumentException: requirement failed: Cannot pull port (OutgoingSSL.in) twice. The exception message is quite clear, and in the stack trace we can even find the faulty line - it’s pull(sslIn) in bytesOut’s onPull. It seems that bytesIn.onPush is called before bytesOut.onPull, which results in calling pull(sslIn) twice in a row. We shouldn’t code against any specific order, as we don’t control the layer behind ProxyStage. What we should do is test the code against both cases. The problem is that with the current testing code - namely the transport being a simple flow created with Flow[ByteString].statefulMapConcat - we don’t have enough control over data and demand elements. Moreover, the logic of ProxyStage is getting quite complex and we should do some proper testing. Having tests in place beforehand will make it easier to fix the bug.

Testing

We can test a custom GraphStage, and akka-streams in general, with akka-stream-testkit. In simple cases it’s enough to run the flow you want to test - named flowUnderTest in the following example - like this:

I want to stress that akka-stream-testkit gives us a great deal of control over all kinds of messages used in the Akka Reactive Streams implementation. Besides data elements and completions we can also send and check errors or subscriptions. Let’s see how we can rewrite the last code snippet:

We replaced requestNext(9) with request(1) followed by expectNext(9). That way we decoupled pulling from asserting the receipt of the expected message. Let’s take a look at the most used sink methods:

  • def request(n: Long) - pulls for n data elements from upstream
  • def expectNext(element: I) - expects arrival of element (does not pull)
  • def expectNext(): I - expects arrival of any element which is returned as result
  • def requestNext(element: T) - does what request(1) followed by expectNext(element) does
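
As a rough illustration of the two styles, assuming a trivial flowUnderTest that triples its input (the flow in the repo is different) and Akka 2.6 with akka-stream-testkit on the classpath:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.testkit.scaladsl.TestSink

object TestkitDemo {
  // Hypothetical flow under test.
  val flowUnderTest = Flow[Int].map(_ * 3)

  def run(): Unit = {
    implicit val system: ActorSystem = ActorSystem("testkit-demo")
    try {
      // Style 1: pull and assert in a single step.
      Source(List(3))
        .via(flowUnderTest)
        .runWith(TestSink.probe[Int])
        .requestNext(9)
        .expectComplete()

      // Style 2: signalling demand and asserting the element are separate steps.
      val probe = Source(List(3)).via(flowUnderTest).runWith(TestSink.probe[Int])
      probe.request(1)
      probe.expectNext(9)
      probe.expectComplete()
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run()
}
```

Each expectation throws an AssertionError if it is not met within the probe’s timeout, so a silent run means both checks passed.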

Going back to HttpsProxyStage - we want to test a stage of BidiShape, which is a little more complicated. We can construct the transport in the following manner:

That way we have access to transportInProbe and transportOutFlow, which allow us to test different transport layer behaviors.

What we wanted to achieve is to have bytesOut.onPull called before bytesIn.onPush. We can do this with the following code:

We cannot replace transportInProbe.request(1) with transportInProbe.requestNext(), as it would hang the test. At that point we just want to pull; only after the “OK” response has been sent will HttpsProxyStage send anything more. More tests can be found in the repo.
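
One way such a probe-driven transport could be built is from a manual subscriber and a manual publisher. The sketch below is mine and differs from the repo: transportOutProbe is my own name, and a pass-through BidiFlow stands in for the stage under test so that the snippet stays self-contained. It assumes Akka 2.6 with akka-stream-testkit.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BidiFlow, Flow, Keep, Sink, Source}
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.util.ByteString

object ProbeTransportDemo {
  def run(): Unit = {
    implicit val system: ActorSystem = ActorSystem("probe-transport-demo")
    try {
      // What the stage sends to the transport ends up in transportInProbe;
      // what the transport "answers" is sent through transportOutProbe.
      val transportInProbe = TestSubscriber.probe[ByteString]()
      val transportOutProbe = TestPublisher.probe[ByteString]()
      val transport = Flow.fromSinkAndSource(
        Sink.fromSubscriber(transportInProbe),
        Source.fromPublisher(transportOutProbe))

      // Stand-in for the stage under test: forwards both directions unchanged.
      val stageUnderTest =
        BidiFlow.fromFunctions((b: ByteString) => b, (b: ByteString) => b)

      val (source, sink) = TestSource.probe[ByteString]
        .via(stageUnderTest.join(transport))
        .toMat(TestSink.probe[ByteString])(Keep.both)
        .run()

      transportInProbe.request(1) // transport signals demand, nothing is asserted yet
      source.sendNext(ByteString("ping"))
      transportInProbe.expectNext(ByteString("ping"))

      sink.request(1)
      transportOutProbe.sendNext(ByteString("pong")) // the transport's response
      sink.expectNext(ByteString("pong"))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run()
}
```

Because every request and every element is a separate call, the test decides in which order the stage sees demand and data from the transport.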

Ultimate implementation

You can look at the final implementation in CorrectHttpsProxyStage in the repo.

We also override onUpstreamFinish and onDownstreamFinish to achieve transparency to errors. When a stage fails, it first cancels all its inputs and only after that sends the error to all its outputs. This ordering is problematic in the case of BidiFlow, because the default implementation of onDownstreamFinish is to complete the stage. So if the transport calls failStage, it triggers onDownstreamFinish of bytesOut. Therefore onUpstreamFailure will never be called for ProxyStage, as it will no longer be running when the transport propagates the error to its downstream… That’s why we need to do this:
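
The repo version is the reference. For readers who just want the gist, here is a heavily simplified sketch of the ideas from the previous sections: no bufferedMsgs, and every pull guarded so that it works regardless of the order in which the transport signals demand and data. The class name, the states other than Connected, the literal CONNECT message and most port names are my assumptions; error transparency is left out, and I assume Akka 2.6.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, BidiShape, Inlet, Outlet}
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

object HttpsProxySketch {
  sealed trait State
  case object Starting extends State   // CONNECT not sent yet
  case object Connecting extends State // CONNECT sent, waiting for the proxy's answer
  case object Connected extends State  // tunnel established, just forward
}

class HttpsProxySketch extends GraphStage[BidiShape[ByteString, ByteString, ByteString, ByteString]] {
  import HttpsProxySketch._
  val sslIn: Inlet[ByteString] = Inlet("OutgoingSSL.in")
  val bytesOut: Outlet[ByteString] = Outlet("OutgoingTCP.out")
  val bytesIn: Inlet[ByteString] = Inlet("IncomingTCP.in")
  val sslOut: Outlet[ByteString] = Outlet("IncomingSSL.out")
  override val shape: BidiShape[ByteString, ByteString, ByteString, ByteString] =
    BidiShape(sslIn, bytesOut, bytesIn, sslOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var state: State = Starting

      setHandler(sslIn, new InHandler {
        // sslIn is only pulled once Connected, so nothing needs to be buffered.
        override def onPush(): Unit = push(bytesOut, grab(sslIn))
        override def onUpstreamFinish(): Unit = complete(bytesOut)
      })
      setHandler(bytesOut, new OutHandler {
        override def onPull(): Unit = state match {
          case Starting =>
            push(bytesOut, ByteString("CONNECT")) // placeholder for the real CONNECT request
            state = Connecting
            if (!hasBeenPulled(bytesIn)) pull(bytesIn) // make sure we can read the answer
          case Connecting => () // the port remembers the demand; it is served once Connected
          case Connected  => pull(sslIn)
        }
      })
      setHandler(bytesIn, new InHandler {
        override def onPush(): Unit = state match {
          case Connecting =>
            if (grab(bytesIn) == ByteString("OK")) {
              state = Connected
              if (isAvailable(bytesOut)) pull(sslIn) // transport already asked for data
              if (isAvailable(sslOut)) pull(bytesIn) // "OK" is swallowed, so pull again
            } else failStage(new IllegalStateException("Proxy rejected CONNECT"))
          case Connected => push(sslOut, grab(bytesIn))
          case Starting  => throw new IllegalStateException("Data received before CONNECT was sent")
        }
      })
      setHandler(sslOut, new OutHandler {
        override def onPull(): Unit = if (!hasBeenPulled(bytesIn)) pull(bytesIn)
      })
    }
}

object HttpsProxySketchDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("proxy-sketch-demo")
    // Fake transport: answers the first message (CONNECT) with "OK", then echoes.
    val transport = Flow[ByteString].statefulMapConcat { () =>
      var first = true
      msg => if (first) { first = false; List(ByteString("OK")) } else List(msg)
    }
    val flow = BidiFlow.fromGraph(new HttpsProxySketch).join(transport)
    val requests = List(ByteString("req1"), ByteString("req2"))
    try Await.result(Source(requests).via(flow).map(_.utf8String).runWith(Sink.seq), 3.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With this fake transport the sink should receive req1 and req2, while the CONNECT/“OK” exchange stays invisible to it. The guards based on hasBeenPulled and isAvailable are what keep the stage from pulling a port twice.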

failStage vs throwing exception

We can see that both styles are used in the final implementation, but there’s a slight difference between them. failStage should be used when something anticipated happens and we want to cancel upstream, fail downstream and then stop the stage. The error should flow down the stream and may be e.g. materialized as a failed Future. In our case it’s the server responding with something other than “OK”. It’s a situation that may happen and is out of our control, and we should be prepared for it.

On the other hand, there are situations when some assertion we rely on fails. In that case it’s advisable to throw an exception. The effect is similar to failStage, and the error also flows down the stream, but the situation is additionally logged to the Akka log as an error.

Summary

  • GraphStage is useful when you run out of possibilities to get what you want with built-in combinators
  • GraphStage is especially useful when you want precise control over the details of how data elements and demand should flow
  • It takes great diligence to properly balance pulls and pushes
  • akka-stream-testkit is there to help you test your GraphStage and flows in general. It’s a very powerful tool which gives you great control over how data elements and demand flow
  • When writing custom GraphStages, take failure into account - how you want to propagate errors and whether you want to honor the Supervision policy
  • BidiFlow is useful for modelling layered protocols
  • GraphStage API is a nice abstraction over Reactive Streams. Understanding this API may help you gain deeper understanding about Reactive Streams and backpressure in general
  • Most built-in combinators are implemented with GraphStage. You can study existing implementations and their tests

Acknowledgement

Thanks to James Roper for his suggestions in PR review.

by Michał Sitko
April 25, 2017
Tags : Akka Scala