
Introduction to Streams in Akka

A very common scenario in many kinds of software is input data that is potentially unlimited and can arrive at arbitrary intervals. The usual way of handling such cases is the Observer pattern in its imperative form - callbacks.

But this approach creates what’s commonly called “Callback Hell”. The concept is basically identical to the more commonly known “GOTO Hell”, as both mean erratic jumps in the flow of control that can be very hard to reason about and work with. When writing an application we need to analyze all the callbacks to be sure, e.g., that we’re not using a value that can be changed by a callback at a random point in time.

But there exists a declarative approach to solving this problem that lets us reason about it in a much more predictable and less chaotic fashion - Streams.

Introduction to the concept of Streams

Basic concept and terminology

First and foremost - why use Streams? What kind of advantage do they give us over the standard ways (e.g. callbacks) of handling Codata (i.e. potentially unlimited data)?

The answer is simple - they abstract away the imperative nature of how data enters the application, giving us a declarative way of describing and handling it while hiding the details we don’t care about. This makes reasoning and design much easier.

Of course, as with all abstractions, we lose some control over how the input is handled - control we could otherwise use to, e.g., implement more fine-grained handling of the input data and optimize how the application works.

Any non-cyclical Stream consists of at least two parts - a Source and a Sink. Between them there can be any finite number of Flow elements. The Source and Sink are special cases of the Flow element, depending on whether only the output (Source) or only the input (Sink) is open.

Here is an explanation of what each element is, using the terminology from Akka Streams:

  • Source - as the name suggests the source of the data, consists of exactly one output
  • Flow - basically an ordered collection of transformations that act upon the data from the Source, consists of exactly one input and one output
  • Sink - the receiver of the data after it’s transformed by the Flow, consists of exactly one input

You can think about a Source as a Publisher and the Sink as a Subscriber.

Stream composition and abstraction

A very important thing to notice is that we can combine these elements to obtain new ones. E.g. we can combine a Source and a Flow to obtain a new Source, or n Flows to get another Flow. Pairing a Sink with a Source (leaving the Sink’s input and the Source’s output open) yields a Flow, as the result has one input and one output - connecting them directly, on the other hand, closes the graph and produces a runnable Stream.

This gives us the ability to compose Streams just as we compose functions, which eases reasoning about and developing the system, as we operate on independent constructs that we “glue” together later on. Basically, we can put a few Streams together, put them in a “box” to hide the details we don’t need, and use the result as an independent element with its own unique functionality. This lets us reason about them on a higher level, easing the development and design process.

An example of this would be combining a Source that emits natural numbers with a Flow that filters out non-prime numbers. By combining the two we get a Source that outputs primes, which we can use in further Stream composition. We don’t care what Streams it’s built from, just that it is a Source that emits primes.
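A minimal sketch of that composition might look as follows (assuming akka-stream on the classpath; isPrime is a hypothetical helper introduced here for illustration):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

// Naive primality test - fine for a sketch, not for production
def isPrime(n: Int): Boolean =
  n > 1 && (2 to math.sqrt(n.toDouble).toInt).forall(n % _ != 0)

// A Source emitting all natural numbers
val naturals: Source[Int, NotUsed] = Source.fromIterator(() => Iterator.from(1))

// A Flow that filters out non-primes
val primeFilter: Flow[Int, Int, NotUsed] = Flow[Int].filter(isPrime)

// Source + Flow = a new Source, here one that emits primes
val primes: Source[Int, NotUsed] = naturals.via(primeFilter)
```

From this point on we can treat primes as an opaque building block, without caring how it was put together.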

Back Pressure

A possible problematic scenario is when the Source produces values too fast for the Sink to handle, possibly overwhelming it. As the Sink receives more data than it can
process at the moment, it has to keep buffering it for future processing. The biggest problem with this is that we are handling Codata, which is potentially infinite, so the buffer would also need to be of infinite size - which is of course impossible, and eventually the system runs out of memory.

To combat this, the Sink needs to communicate with the Source to tell it to “slow down” pushing new data until it finishes handling the current batch. This enables a constant-size buffer in the Sink, as it will tell the Source to stop sending new data when it’s not ready.

This kind of bilateral communication is called Back Pressure. Streams that employ this mechanism are often called Reactive Streams.
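In Akka Streams this communication happens automatically. As a sketch (assuming an implicit ActorSystem and Materializer are in scope), a fast Source wired to a deliberately slow Sink will simply be slowed down rather than flooding memory:

```scala
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

// A fast Source connected to a deliberately slow Sink. Thanks to back
// pressure, the Sink's lack of demand propagates upstream, so the Source
// emits only as fast as the Sink consumes - no unbounded buffering.
val fast: Source[Int, NotUsed] = Source(1 to 1000000)

val slow: Sink[Int, Future[Done]] = Sink.foreach[Int] { n =>
  Thread.sleep(100) // simulate slow processing
  println(n)
}

fast.runWith(slow) // requires an implicit Materializer in scope
```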

Streams in Akka

Introduction

Akka provides us with its own framework for working with Streams in its Akka Streams API. This text will be introductory, as it would be impossible to cover it all in a single post while keeping it readable.

I will cover the basic project/dependency setup and show a simple code example along with details how it works.

Example

I will now present a simple code snippet of how to use Akka Streams in Scala (The akka-streams version is 2.4.17):
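The example below is a minimal, self-contained sketch; the identifier names (numberSource, isEvenFlow, evenNumbersSource, consoleSink) and the concrete number sequence follow the descriptions in the sections that follow:

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

object StreamsIntro extends App {
  // Akka Streams runs on top of Akka's Actor model,
  // so we need an ActorSystem and an ActorMaterializer
  implicit val system: ActorSystem = ActorSystem("streams-intro")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Source based on a static number sequence's iterator
  val numberSource: Source[Int, NotUsed] =
    Source.fromIterator(() => List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).iterator)

  // Flow that only lets even numbers through
  val isEvenFlow: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0)

  // Source + Flow = a new Source that emits only even numbers
  val evenNumbersSource: Source[Int, NotUsed] = numberSource.via(isEvenFlow)

  // Sink that prints every element it receives to the console
  val consoleSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

  // Connect the Source to the Sink and run the whole Stream
  evenNumbersSource.runWith(consoleSink)
}
```

In a real application we would also terminate the ActorSystem once the materialized Future completes.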

Or presented as a diagram:

Diagram of the example

In the example above we’ve created the:

  1. ActorSystem and ActorMaterializer instances that we will use to run the Stream. This is needed because Akka Streams is backed by Akka’s Actor model.
  2. Source based on the static number sequence’s iterator
  3. Flow that only lets even numbers through
  4. Sink that will print out its input to the console using println
  5. Complete Stream, by connecting evenNumbers with consoleSink and running it using runWith

In the following sections I will try to explain in more detail what each of these elements does, what their APIs look like and how to use them.

Actor Materializer

The ActorMaterializer class instance is needed to materialize a Flow into a Processor, which represents a processing stage - a construct from the Reactive Streams standard, which Akka Streams implements.

In fact, Akka Streams employs back pressure as described in the Reactive Streams standard mentioned above. Source, Flow and Sink eventually get transformed into low-level Reactive Streams constructs via the process of materialization.

Source

Source as explained previously represents a Stream (set of processing steps) that consists of only one open output.

Source takes two type parameters. The first one represents the type of data it emits and the second one is the type of the auxiliary value it can produce when run/materialized. If we don’t produce one, we use the NotUsed type provided by Akka.

The static methods to create a Source are provided by the Source companion object. I’ll provide a quick rundown of them:

  • fromIterator - This is what we used in the example. It takes a function that returns an Iterator. It will take elements until the iterator is empty or next() fails.
  • fromPublisher - Uses an object that provides the Publisher functionality to produce elements
  • fromFuture - Start a new Source from the given Future. The stream will consist of one element when the Future is completed with a successful value.
  • fromGraph - A Source shaped Graph is of course also a source, this will simply type it as Source. Graphs in Akka Streams won’t be explained in this introduction post.

The Source object also provides us with an apply helper method that simply takes an Iterable and creates a Source from it. The main difference from fromIterator is that every subscriber to the Source will get an individual flow of elements, always starting from the beginning, regardless of when they subscribed to it.

Also these three methods can be pretty useful in many use-cases:

  • single - As the name suggests, it takes a single element and creates a one-element Source from it.
  • repeat - Creates a Source that will continually emit the given element.
  • empty - A Source with no elements, it’s immediately completed for any connected Sink.
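A quick sketch of these helpers (type annotations added for clarity; values picked arbitrarily for illustration):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

val fromIterable: Source[Int, NotUsed]  = Source(1 to 10)        // apply, from an Iterable
val justOne: Source[String, NotUsed]    = Source.single("hello") // exactly one element
val constant: Source[Int, NotUsed]      = Source.repeat(42)      // emits 42 forever
val nothing: Source[Int, NotUsed]       = Source.empty[Int]      // completes immediately
```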

To run the Stream we have three convenient methods, two of them let us omit creating a separate Sink instance:

  • runWith - Convenience method to run a Source, Flow or Sink. To run a Source we need to supply a Sink, for a Sink we need to supply a Source, and for a Flow we need to supply both a Sink and a Source. Returns a Future that, when the stream ends, holds the final value of the evaluation or a Failure.
  • runFold - Shortcut for running the Source with a Sink.fold(...)(...). It returns a Future that, when the stream ends, holds the final value of the evaluation or a Failure.
  • runForeach - Shortcut for running this Source with a Sink.foreach(...). As above, it returns a Future.
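For example, the two shortcut methods might be used like this (assuming an implicit Materializer is in scope):

```scala
import akka.Done
import akka.stream.scaladsl.Source
import scala.concurrent.Future

// runFold: materializes into a Future holding the folded result
val sum: Future[Int] = Source(1 to 100).runFold(0)(_ + _)

// runForeach: materializes into a Future[Done] that completes
// when every element has been processed
val printed: Future[Done] = Source(1 to 3).runForeach(println)
```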

This should give you a brief outlook on how to create and work with the Source class.

Flow

Flow represents a Stream (set of processing steps) with one open input and one open output.

Basically, a Flow is an ordered chain of transformations of its input, the cumulative effect of which it emits on its output. It takes three type parameters - the first for the input data type, the second for the output, and the last for the auxiliary type.

The most straightforward way of creating a Flow instance is the companion object’s apply method. It creates an “identity” Flow, i.e. one that simply propagates its input to its output, and thus takes only one type parameter.

val uselessFlow: Flow[Int, Int, NotUsed] = Flow[Int]

The example above, as you may guess, is not very useful in real-world scenarios. So how do we create something more useful? By method cascading. The Flow consists of a finite number of transformations upon the Stream that we chain together, as mentioned above:

val flowCascade: Flow[Int, Boolean, NotUsed] = Flow[Int].filter((num) => num > 2).map((num) => num % 2 == 0)

Here we simply discard all input smaller than 3 and, on the output, check whether the number is even. We can chain as many transformations as we want when creating the Flow, or combine two Flows into one using the via method.

We can also create a Source by combining an existing one and a Flow. This was used in the example code:

val evenNumbersSource: Source[Int, NotUsed] = numberSource.via(isEvenFlow)

The possible transformations are basically the same as with standard Scala collections, thus we’ll omit going into much detail about them. For more detailed information about them please consult the API documentation.

Sink

Sink represents a Stream (set of processing steps) with only one open input.

The Sink is the last element of our Stream puzzle. Basically, it’s a subscriber of the data sent/processed by a Source. Usually it writes its input to some system IO (a TCP port, the console, a file, etc.), producing the side effects that make our application actually do something.

It’s basically a Flow that uses a foreach or fold function to run a procedure (a function with no return value, Unit) over its input elements and propagate the auxiliary value (e.g. a Future that will complete when it finishes writing the input to a file or the console).

As with Source and Flow, the companion object provides methods for creating an instance. The main ones are:

  • foreach - Invokes the given procedure for each received element.
  • foreachParallel - same as above, except it invokes the procedure in parallel. Takes the number of concurrent executions to be used at the same time as the argument.
  • fold - Invokes the given function for each received element, propagating the resulting value to the next iteration.
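Sketched out, the two most common ones look like this (type annotations added for clarity):

```scala
import akka.Done
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Sink.foreach - runs a procedure for every element;
// materializes into a Future[Done] completed when the stream ends
val consoleSink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

// Sink.fold - folds over the elements;
// materializes into a Future holding the final accumulated value
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
```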

For details about the rest of possible methods for creating and operating on a Sink please consult the documentation.

To run/materialize the Sink and the whole Stream, we need to connect it to a Source, as mentioned previously in the Source section. The runWith method produces a Future that will be completed when the Source is empty and the Sink has finished processing the elements it got from it. If the processing fails, it will complete with a Failure.

We can also create a RunnableGraph instance and run it manually using toMat (or viaMat), i.e.:

val streamGraph: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)

Every stage of the Stream produces a materialized value, but we only want the one produced by the Sink, thus we use Keep.right. Then we just run it:

val materializedValue: Future[Int] = streamGraph.run()

Doing this manually makes sense if we have some reason to keep the values produced by the intermediate processing stages.

Summary

This more or less covers the basics of what Streams are and how to use Akka Streams to work with them. Of course, the example presented here was very simplistic, and we’ve only touched on a small part of the capabilities Akka Streams offers.

An important and very useful feature provided by the library that we didn’t go into in this post is describing the Stream structure as a graph, using a pretty straightforward, declarative DSL. This lets us translate our design in graph/diagram form much more directly into clean and maintainable code.

As this is a pretty big and important topic, I didn’t want to shove it in here, as it would most probably have a negative effect on the size and complexity of this post (which is supposed to be an introduction). I’ll most probably write another post that explains the whole concept of using graphs and the Graph DSL, with more complex examples showing how it eases developing applications based on Stream processing. If you are interested in researching the topic further, I’ve added a link to the official documentation. The Quick Start Guide also contains a great introduction to using graphs.

Another pretty important concept is the rate at which elements are processed and how to control it.

Also worth reading about are the principles and ideas on which Akka Streams is designed and built. I’ve added a link to the documentation explaining them in the Links section.

I hope you’ve enjoyed this short introduction to the concept of Streams, and that you’ll get the chance to utilize this knowledge in all kinds of interesting projects!
