How to run multiple stream stages concurrently

To build efficient, scalable, low-latency data streams, it is often important to perform tasks concurrently. By default, however, Akka Streams fuses the stages of a stream and executes them sequentially inside a single actor (and therefore on one thread at a time), so you must explicitly request concurrency by inserting asynchronous boundaries. You do this by calling the async method on your Source, Sink or Flow, which adds Attributes.asyncBoundary to that part of your flows and graphs.
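As a minimal sketch of the async method described above, the following program places one boundary on a Source and one on a Flow. The object name AsyncBoundarySketch and the numbers and doubler values are hypothetical names chosen for illustration, and the setup assumes the same ActorMaterializer-based wiring that the larger example in this article uses.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundarySketch extends App {
  implicit val system: ActorSystem = ActorSystem("async-boundary-sketch")
  // Assumption: mirrors the article's setup; newer Akka versions may not need this line.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // `.async` on a Source: the source is marked to run in its own asynchronous island.
  val numbers: Source[Int, NotUsed] = Source(1 to 5).async

  // `.async` on a Flow: this hypothetical stage is marked the same way.
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2).async

  // Run the stream and wait for the collected elements.
  val doubled = Await.result(numbers.via(doubler).runWith(Sink.seq[Int]), 5.seconds)
  println(doubled)

  Await.result(system.terminate(), 5.seconds)
}
```

Asynchronous boundaries do not reorder elements, so doubled holds the doubled inputs in their original order. Only the execution of the stages changes, not the result of the stream.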

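Choosing which stages to run in parallel requires a good understanding of how a boundary affects the different operations in the pipeline. Each asynchronous boundary adds the overhead of passing elements between actors through a buffer, so it tends to pay off only when the stages it separates do a significant amount of work. For more details, see this Akka blog article on the subject.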

For example, compare the output of normalGraph (non-parallel) and concurrentGraph (parallel) in the program below.

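  import akka.NotUsed
  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{Flow, Sink, Source}
  import scala.concurrent.ExecutionContext

  object ConcurrentStages extends App {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    // Passes each element through unchanged and logs the thread that handled it.
    def myStage(name: String): Flow[Int, Int, NotUsed] = Flow[Int].map { index =>
      println(s"Stage $name is processing $index using thread '${Thread.currentThread().getName}'")
      index
    }

    // No asynchronous boundaries: all four stages are fused and run sequentially.
    def normalGraph(): Unit = Source(1 to 100000)
      .via(myStage("A"))
      .via(myStage("B"))
      .via(myStage("C"))
      .via(myStage("D"))
      .runWith(Sink.ignore)
      .onComplete(_ => system.terminate())

    // An asynchronous boundary after every stage: the stages can run concurrently.
    def concurrentGraph(): Unit = Source(1 to 100000)
      .via(myStage("A")).async
      .via(myStage("B")).async
      .via(myStage("C")).async
      .via(myStage("D")).async
      .runWith(Sink.ignore)
      .onComplete(_ => system.terminate())

    // Run one graph at a time and compare the thread names printed by each.
    normalGraph()
    // concurrentGraph()
  }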
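To get a feel for when an asynchronous boundary pays off, the following sketch times a stream with two deliberately slow stages, first fused and then separated by boundaries. It is only an illustration under stated assumptions: AsyncTimingSketch, slowStage and timeMillis are hypothetical names, the 10 ms Thread.sleep stands in for real work (blocking a stream like this is not advisable in production code), and the measured numbers will vary by machine.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncTimingSketch extends App {
  implicit val system: ActorSystem = ActorSystem("async-timing-sketch")
  // Assumption: mirrors the article's setup; newer Akka versions may not need this line.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical stage that simulates roughly 10 ms of work per element.
  def slowStage: Flow[Int, Int, NotUsed] = Flow[Int].map { n =>
    Thread.sleep(10) // stand-in for an expensive computation
    n
  }

  // Runs the stream to completion and returns the elapsed wall-clock time in milliseconds.
  def timeMillis(stream: Source[Int, NotUsed]): Long = {
    val start = System.nanoTime()
    Await.result(stream.runWith(Sink.ignore), 30.seconds)
    (System.nanoTime() - start) / 1000000
  }

  // Both slow stages are fused, so each element pays for them one after the other.
  val fused = Source(1 to 100).via(slowStage).via(slowStage)

  // A boundary after each stage lets the second stage work on one element
  // while the first stage is already handling the next.
  val pipelined = Source(1 to 100).via(slowStage).async.via(slowStage).async

  val fusedMs = timeMillis(fused)
  val pipelinedMs = timeMillis(pipelined)
  println(s"fused: $fusedMs ms, pipelined: $pipelinedMs ms")

  Await.result(system.terminate(), 10.seconds)
}
```

With stages this slow, the pipelined run would be expected to finish noticeably sooner than the fused one. If you shrink the simulated work to almost nothing, the cost of the boundaries themselves can dominate, which is why the choice of where to place them matters.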

See more in the Java or Scala documentation.

