Barriers in LMAX Disruptor

LMAX Disruptor is an event queue implementation that made a lot of buzz on the Internet a couple of years ago due to its impressive performance and unique approach to concurrency. The library was designed by LMAX Exchange for their core business, a financial trading platform. The developers faced the problem of handling up to 6 million orders per second, each of them changing the market conditions. Their architecture is based on event sourcing, where all the business logic is handled by a single thread (!) that keeps its data in memory. Disruptor plays a key role here, because it is used to pass events into that thread efficiently and to deliver output events to the rest of the system. The implementation is based on a pre-initialized ring buffer, optimized for CPU cache friendliness and free of locking. I don’t want to cover the entire theory here, as this topic has been covered by Martin Fowler in his article.

I was wondering how to connect three consumers reading events from the ring buffer in a case where the last consumer cannot advance to a given event until it has been processed by the first two. I knew this was possible, as LMAX Disruptor already supports barriers; however, the API for composing them has changed over the years, and all the blogs and tutorials I found referred to the old one. Then I typed a dot and the IDE showed me the secrets of the current API, revealing a nice DSL for composing barriers. Below you can find a simple example:

import java.util.concurrent.Executors;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class DisruptorTest {
   public static void main(String[] args) {
      int bufferSize = 1024;      
      Disruptor<Event> disruptor = new Disruptor<>(
         Event::new, bufferSize, Executors.defaultThreadFactory()
      );
      
      // 1
      disruptor
          .handleEventsWith(new Consumer("A"), new Consumer("B"))
          .then(new Consumer("C"));
      
      disruptor.start();
      produceSomeEvents(disruptor);
      disruptor.shutdown();
   }
   
   private static void produceSomeEvents(Disruptor<Event> disruptor) {
      // 2
      RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
      Producer producer = new Producer();
      for (int i = 0; i < 1000; i++) {
         ringBuffer.publishEvent(producer);
      }
   }
   
   public static class Event {
      private int value;
   }
   
   public static class Producer implements EventTranslator<Event> {
      private int i = 0;
      
      @Override
      public void translateTo(Event event, long sequence) {
         event.value = i++;
      }
   }
   
   public static class Consumer implements EventHandler<Event> {
      private final String name;
      
      public Consumer(String name) {
         this.name = name;
      }

      @Override
      public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
         System.out.println(name+": " + event.value);
      }
   }
}

Explanation:

  1. This is our consumer composition. We register three consumers: A and B can work concurrently, whereas C can never overtake them in the event processing.
  2. Publish 1,000 events carrying the numbers from 0 to 999.

All the consumers simply print out their name and the next number. If you run this example, you’ll see that C always prints the numbers that have already been printed by both A and B, whereas the outputs of A and B can be interleaved. Everything is done with a little help from the magic of the handleEventsWith() and then() chain.
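As an aside, the DSL also exposes an after() method; to my understanding of the 3.x API, the same dependency graph could be wired up like this (a sketch reusing the Consumer class and disruptor instance from the example above):

```java
// Hypothetical alternative wiring with after(); equivalent to the
// handleEventsWith(a, b).then(c) chain in the example above.
Consumer a = new Consumer("A");
Consumer b = new Consumer("B");
Consumer c = new Consumer("C");

disruptor.handleEventsWith(a, b);          // A and B have no mutual ordering
disruptor.after(a, b).handleEventsWith(c); // C's barrier waits for both
```

Keeping explicit references to the handlers, as here, becomes handy once the dependency graph is more complex than a single chain.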

Barriers are useful for building a reliable event processing architecture. One of the consumers can be a journal, and another the actual business handler; only the events that have already been persisted get processed. Otherwise we risk a failure mode where an event has been processed and its effects are visible to other services, but it was never written to the journal, so after recovery we end up with a lost event. So use them.
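To make the gating rule concrete: a consumer behind a barrier may only advance to the lowest sequence that all of its upstream consumers have completed. Below is a minimal plain-Java sketch of that idea (hypothetical names, no Disruptor dependency; this is not the library's actual implementation, just the invariant it enforces):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the barrier invariant: a downstream consumer may only
// advance to the minimum sequence reached by all upstream consumers.
public class BarrierSketch {

   // Each AtomicLong holds the last sequence an upstream consumer finished.
   static long availableFor(AtomicLong... upstream) {
      long min = Long.MAX_VALUE;
      for (AtomicLong seq : upstream) {
         min = Math.min(min, seq.get());
      }
      return min;
   }

   public static void main(String[] args) {
      AtomicLong journal = new AtomicLong(7);    // A has persisted up to event 7
      AtomicLong replicator = new AtomicLong(4); // B has only reached event 4

      // C's barrier: nothing beyond sequence 4 may be processed yet.
      System.out.println("C may process up to " + availableFor(journal, replicator));
   }
}
```

So even though the journal is ahead, the business handler is held back by the slowest of its dependencies, which is exactly what guarantees that no unpersisted event leaks out.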