Pattern: Evented (Reactor)

Up until now all of the patterns we’ve seen have really been variations on the serial pattern. Apart from the serial pattern itself, the other patterns used the same structure but wrapped threads or processes around it.

This pattern takes a whole different approach that won’t look anything like the others.

Overview

The evented pattern (based on the Reactor pattern) seems to be all the rage these days. It’s at the core of libraries like EventMachine, Twisted, Node.js, Nginx, and others.

This pattern is single-threaded and single-process, yet it can achieve levels of concurrency at least on par with the other patterns discussed.

It centers on a connection multiplexer (hereafter referred to as the Reactor core). Each stage of the connection lifecycle is broken down into individual events that can be interleaved and handled in any order. The different stages of a connection are simply the possible IO operations: accept, read, write, and close.

The central multiplexer monitors all the active connections for events and dispatches the relevant code upon being triggered by an event.

Let’s review the workflow:

  1. The server monitors the listening socket for incoming connections.
  2. Upon receiving a new connection it adds it to the list of sockets to monitor.
  3. The server now monitors the active connection as well as the listening socket.
  4. Upon being notified that the active connection is readable the server reads a chunk of data from that connection and dispatches the relevant callback.
  5. Upon being notified that the active connection is still readable the server reads another chunk and dispatches the callback again.
  6. The server receives another new connection; it adds that to the list of sockets to monitor.
  7. The server is notified that the first connection is ready for writing, so the response is written out on that connection.

Keep in mind that all of this is happening in a single thread. Did you notice that the server was able to accept a new connection while the first connection was still in the middle of its read/write flow?

The server is simply splitting each operation into small chunks so that the various events pertaining to multiple connections can be interleaved.
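The interleaving can be seen in a small, self-contained sketch. It uses IO.pipe pairs in place of client sockets (an assumption so that it runs without a network) and deliberately reads at most 4 bytes per event, so each connection’s data arrives in several small steps that a single IO.select loop interleaves in one thread.

```ruby
# Two pipes stand in for two client connections; one thread serves both.
labels = {}
writers = []
2.times do |i|
  reader, writer = IO.pipe
  labels[reader] = "conn#{i}"
  writers << writer
end

# Each "client" sends its data and then disconnects.
writers[0].write("RETR a.txt")
writers[1].write("LIST")
writers.each(&:close)

log = []                    # [label, chunk] in the order they were handled
open_readers = labels.keys
until open_readers.empty?
  ready, = IO.select(open_readers)   # block until at least one is readable
  ready.each do |reader|
    begin
      # Read at most 4 bytes per event so the work is split into small steps.
      log << [labels[reader], reader.read_nonblock(4)]
    rescue EOFError
      # The "client" hung up: stop monitoring it.
      open_readers.delete(reader)
      reader.close
    end
  end
end
```

Because both pipes are ready at once, the second connection’s request is handled between chunks of the first one, all without a second thread.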

Time to dig into the code.

Implementation

require 'socket'
require_relative '../command_handler'

module FTP
  class Evented
    CHUNK_SIZE = 1024 * 16
    
    class Connection
      CRLF = "\r\n"
      attr_reader :client

      def initialize(io)
        @client = io
        @request, @response = "", ""
        @handler = CommandHandler.new(self)

        respond "220 OHAI"
      end

      def on_data(data)
        @request << data

        if @request.end_with?(CRLF)
          # Request is completed.
          respond @handler.handle(@request)
          @request = ""
        end
      end

      def respond(message)
        @response << message + CRLF
        
        # Write what can be written immediately,
        # the rest will be retried next time the
        # socket is writable.
        on_writable
      end

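      def on_writable
        bytes = client.write_nonblock(@response)
        @response.slice!(0, bytes)
      rescue Errno::EAGAIN
        # The socket's send buffer is full. Whatever is left in
        # @response will be written on the next writable event.
      end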

      def monitor_for_reading?
        true
      end

      def monitor_for_writing?
        !(@response.empty?)
      end
    end

    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      @handles = {}

      loop do
        to_read = @handles.values.select(&:monitor_for_reading?).map(&:client)
        to_write = @handles.values.select(&:monitor_for_writing?).map(&:client)

        readables, writables = IO.select(to_read + [@control_socket], to_write)

        readables.each do |socket|
          if socket == @control_socket
            io = @control_socket.accept
            connection = Connection.new(io)
            @handles[io.fileno] = connection

          else
            connection = @handles[socket.fileno]

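            begin
              data = socket.read_nonblock(CHUNK_SIZE)
              connection.on_data(data)
            rescue Errno::EAGAIN
              # Nothing to read after all; wait for the next event.
            rescue EOFError, Errno::ECONNRESET
              # The client hung up: stop monitoring it and
              # release its file descriptor.
              @handles.delete(socket.fileno)
              socket.close
            end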
          end
        end

        writables.each do |socket|
          # Skip sockets that were closed while handling readables.
          next if socket.closed?

          connection = @handles[socket.fileno]
          connection.on_writable
        end
      end
    end
  end
end

server = FTP::Evented.new(4481)
server.run

You can see already that this implementation follows a different cadence than the others that we’ve looked at thus far. Let’s start breaking it down by section.

    class Connection

This bit of code defines a Connection class for our Evented server.

We saw a Connection class for the threaded examples earlier to keep state separated between threads. This example doesn’t use threads, so why does it need a Connection class?

All of the process-based patterns used processes to separate connections from each other. No matter which way they were using processes, they always made sure that each connection was handled by a single, independent process; each connection was represented by a process.

The Evented pattern is single-threaded, but it handles multiple client connections concurrently, so each client connection needs to be represented by its own object to keep connections from trampling on each other’s state.

    class Connection
      CRLF = "\r\n"
      attr_reader :client

      def initialize(io)
        @client = io
        @request, @response = "", ""
        @handler = CommandHandler.new(self)

        respond "220 OHAI"
      end

Starting at the top of the Connection class, we see some familiar code.

The connection stores the actual underlying IO object in its @client instance variable and makes that accessible to the outside world with an attr_reader.

When an individual connection is initialized it gets its own CommandHandler instance, just as before. After that it sends the customary ‘hello’ response that FTP requires. However, rather than writing it to the client connection with a blocking call, respond appends the message to the @response buffer and calls on_writable, which writes only as much as the socket will accept without blocking. Anything that can’t be written immediately stays in @response, and, as we’ll see in the next section, the Reactor takes over and sends the rest once the client connection is writable.

      def on_data(data)
        @request << data

        if @request.end_with?(CRLF)
          # Request is completed.
          respond @handler.handle(@request)
          @request = ""
        end
      end

      def respond(message)
        @response << message + CRLF

        # Write what can be written immediately,
        # the rest will be retried next time the
        # socket is writable.
        on_writable
      end

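      def on_writable
        bytes = client.write_nonblock(@response)
        @response.slice!(0, bytes)
      rescue Errno::EAGAIN
        # The socket's send buffer is full. Whatever is left in
        # @response will be written on the next writable event.
      end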

      def monitor_for_reading?
        true
      end

      def monitor_for_writing?
        !(@response.empty?)
      end

This part of Connection defines the lifecycle methods that the Reactor core interacts with.

For example, when the Reactor reads data from the client connection it calls on_data with that data. Inside that method the connection checks whether it has received a complete request (one ending in CRLF). If it has, it asks the @handler to build the response and passes it to respond, which appends it to @response and attempts to write it.
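One subtlety with checking end_with?(CRLF): chunk boundaries rarely line up with request boundaries, so a single read can contain two commands, or a complete command followed by the start of the next one. A hypothetical LineBuffer helper (not part of the original code) that splits on CRLF and yields each complete request handles both cases:

```ruby
# Hypothetical helper: accumulates chunks and yields complete CRLF-terminated lines.
class LineBuffer
  CRLF = "\r\n"

  def initialize
    @buffer = +""
  end

  # Append a chunk and yield each complete request (without its CRLF).
  def feed(data)
    @buffer << data
    while (index = @buffer.index(CRLF))
      # Remove the line plus its terminator from the front of the buffer.
      line = @buffer.slice!(0, index + CRLF.length)
      yield line.chomp(CRLF)
    end
  end

  # Data received after the last complete line.
  def pending
    @buffer
  end
end

requests = []
buffer = LineBuffer.new
# The chunk boundaries deliberately split and merge requests.
["USER anon", "ymous\r\nPASS x\r\nPW", "D\r\n"].each do |chunk|
  buffer.feed(chunk) { |request| requests << request }
end
```

In a Connection, each yielded request could be handed to @handler.handle and the result to respond; that wiring is an assumption of this sketch rather than the original design.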

The on_writable method is called when the client connection is ready to be written to. This is where the @response variable is dealt with. It writes what it can from the @response out to the client connection. Based on how many bytes it was able to write, it slices the @response to remove the bit that was successfully written.

As a result, subsequent writes only send the part of @response that couldn’t be written this time around. If the whole thing was written, @response is sliced down to an empty string and there’s nothing more to write.
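The slicing logic can be exercised without a network by substituting a pipe for the client socket (an assumption of this sketch). The response is deliberately larger than the kernel’s pipe buffer, so write_nonblock accepts only part of it per call, and the unwritten tail waits until IO.select reports the pipe as writable again.

```ruby
reader, writer = IO.pipe
response = "x" * (1024 * 1024)   # 1 MiB: more than a pipe buffer holds
total = response.bytesize
received = 0
write_calls = 0

until response.empty?
  # Only watch the writer while something is left, as the Reactor does.
  readable, writable = IO.select([reader], [writer])

  # Play the client: drain some data so buffer space frees up.
  readable.each { |io| received += io.read_nonblock(64 * 1024).bytesize }

  writable.each do |io|
    begin
      bytes = io.write_nonblock(response)
      response.slice!(0, bytes)    # keep only the unwritten tail
      write_calls += 1
    rescue IO::WaitWritable
      # No room after all; try again on the next writable event.
    end
  end
end

writer.close
received += reader.read.bytesize   # collect whatever is still buffered
```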

The last two methods, monitor_for_reading? and monitor_for_writing?, are queried by the Reactor to see if it should monitor the state of this particular connection for reading, writing, or both. In this case we’re always willing to read new data if it’s available, but we only want to monitor for the ability to write if there’s a @response to be written. Given an empty @response, the Reactor won’t notify us if the client connection is writable.
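The reason for watching writability only when there’s something to send is that a connected socket with free buffer space is almost always writable. If every idle connection were passed to IO.select for writing, the call would return immediately on every iteration and the loop would spin. This sketch uses a pipe in place of a socket (an assumption) to show the effect.

```ruby
reader, writer = IO.pipe

# Nothing has been written, so the read end is idle, but the write end
# has free buffer space and is reported writable straight away.
readable, writable = IO.select([reader], [writer], nil, 5)

# Leaving the writer out of the write set, as monitor_for_writing? does
# when @response is empty, lets IO.select wait (here it times out: nil).
idle = IO.select([reader], [], nil, 0.1)
```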

    def run
      @handles = {}

      loop do
        to_read = @handles.values.select(&:monitor_for_reading?).map(&:client)
        to_write = @handles.values.select(&:monitor_for_writing?).map(&:client)

        readables, writables = IO.select(to_read + [@control_socket], to_write)

This is the main work of the Reactor core.

The @handles Hash looks something like {6 => #<FTP::Evented::Connection:xyz123>}, where the keys are file descriptor numbers and the values are Connection objects.

The first lines inside the main loop ask each of the active connections if they want to be monitored for reading or writing, using the lifecycle methods we saw earlier. It grabs a reference to the underlying IO object for each of the eligible connections.

The Reactor then passes these IO instances to IO.select with no timeout. This IO.select call will block until one of the monitored sockets gets an event that requires attention.

Note that the Reactor also sneaks the @control_socket into the list of sockets monitored for reading so it can detect new incoming client connections.
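To see why watching @control_socket for readability is enough to detect new clients, here is a sketch using a throwaway TCPServer on 127.0.0.1 with an OS-assigned port (both assumptions of the example, unrelated to the FTP server’s port).

```ruby
require 'socket'

server = TCPServer.new("127.0.0.1", 0)   # port 0: the OS picks a free port
port = server.addr[1]

# No client yet: the listening socket is not readable (timeout -> nil).
before = IO.select([server], nil, nil, 0)

client = TCPSocket.new("127.0.0.1", port)

# A pending connection makes the listening socket readable, which is the
# Reactor's cue to call accept; the call then returns without blocking.
readables, = IO.select([server], nil, nil, 5)
conn = server.accept

[conn, client, server].each(&:close)
```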

        readables.each do |socket|
          if socket == @control_socket
            io = @control_socket.accept
            connection = Connection.new(io)
            @handles[io.fileno] = connection

          else
            connection = @handles[socket.fileno]

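            begin
              data = socket.read_nonblock(CHUNK_SIZE)
              connection.on_data(data)
            rescue Errno::EAGAIN
              # Nothing to read after all; wait for the next event.
            rescue EOFError, Errno::ECONNRESET
              # The client hung up: stop monitoring it and
              # release its file descriptor.
              @handles.delete(socket.fileno)
              socket.close
            end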
          end
        end

        writables.each do |socket|
          # Skip sockets that were closed while handling readables.
          next if socket.closed?

          connection = @handles[socket.fileno]
          connection.on_writable
        end
      end

This is the part of the Reactor that triggers appropriate methods based on events it receives from IO.select.

First, it handles the sockets deemed ‘readable’. If the @control_socket is readable, a new client connection is waiting, so the Reactor accepts it, builds a new Connection, and slots it into the @handles Hash so it can be monitored the next time around the loop.

Next, it handles the case where a socket deemed ‘readable’ is a regular client connection. In this case it attempts to read the data and trigger the on_data method of the appropriate Connection. If the read would block (Errno::EAGAIN), it doesn’t do anything special and just lets the event fall through. If the client disconnected (EOFError), it removes the entry from the @handles Hash so the socket will no longer be monitored and its Connection object can be garbage collected.
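The three outcomes handled here can be reproduced with a pipe standing in for a client socket (an assumption of this sketch): read_nonblock returns whatever data is buffered, raises Errno::EAGAIN (more precisely IO::EAGAINWaitReadable, a subclass of it) when nothing is available, and raises EOFError once the other end has closed.

```ruby
reader, writer = IO.pipe
outcomes = []

writer.write("NOOP\r\n")
outcomes << reader.read_nonblock(16)   # data available: returned immediately

begin
  reader.read_nonblock(16)             # nothing buffered right now
rescue Errno::EAGAIN                   # IO::EAGAINWaitReadable is a subclass
  outcomes << :would_block
end

writer.close                           # the peer hangs up
begin
  reader.read_nonblock(16)
rescue EOFError
  outcomes << :eof
  reader.close                         # stop monitoring and free the descriptor
end
```

With a real TCP client, an abrupt disconnect can surface as Errno::ECONNRESET instead of EOFError, so a production loop would typically treat both as a hang-up.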

The last bit handles sockets deemed ‘writable’ simply by triggering the on_writable method of the appropriate Connection.

Considerations

This pattern is notably different from the others and, as such, has notably different advantages and disadvantages.

First of all, this pattern has a reputation for handling extremely high levels of concurrency: thousands or tens of thousands of concurrent connections. The other patterns simply can’t approach this because they’re limited by the number of processes or threads they can run.

If your server attempts to spawn 5000 threads to handle 5000 connections then things will likely grind to a halt. This pattern wins, hands down, in terms of handling concurrent connections.
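The claim can be illustrated at a much smaller scale. This sketch serves 50 ‘connections’ from one thread with a single IO.select loop; pipes stand in for client sockets so it runs without a network and stays within default file-descriptor limits. Keep in mind that IO.select scans every descriptor on each call, so at the tens-of-thousands scale reactors usually rely on OS facilities such as epoll or kqueue instead.

```ruby
# Single-threaded sketch: 50 pipe pairs stand in for 50 client connections.
count = 50
pipes = Array.new(count) { IO.pipe }

# Every "client" sends one request and then hangs up.
pipes.each_with_index do |(_, writer), i|
  writer.write("REQUEST #{i}\r\n")
  writer.close
end

# Map each read end back to its connection number.
index_of = pipes.each_with_index.map { |(reader, _), i| [reader, i] }.to_h
received = Array.new(count) { String.new }
open_readers = index_of.keys

until open_readers.empty?
  ready, = IO.select(open_readers)
  ready.each do |reader|
    begin
      received[index_of[reader]] << reader.read_nonblock(1024)
    rescue EOFError
      open_readers.delete(reader)   # this "client" is done
      reader.close
    end
  end
end
```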

The main disadvantage of this pattern is the programming model that it forces upon you. On the one hand the model is simpler because there are no processes/threads to deal with. This means there are no issues of shared memory, synchronization, runaway processes, etc. to deal with. However, given that all this concurrency is happening inside a single thread, there’s one very important rule to follow: never block the Reactor.

To illustrate this, let’s look closely at our implementation. Look inside the CommandHandler class. Notice that when it handles the FTP file transfer command (RETR) it actually opens a socket, streams the data, then closes the socket. The important part is that this socket is used outside the main Reactor loop; the Reactor knows nothing about it.

Imagine that our client requesting a file transfer were on a slow connection. What effect would this have on the Reactor?

Given that everything runs in the same thread, this single slow client connection would block the whole Reactor! When the Reactor triggers a method on a Connection, the whole Reactor is blocked until that method returns. Since the on_data method delegates to the CommandHandler, the whole Reactor is blocked while it streams the file transfer to the client. In the meantime, no other data is being read, no new connections are being accepted, etc.

It’s very important that anything your application code does completes quickly. So how should we handle a slow connection with a Reactor? Use the Reactor itself!

If you’re using this pattern you need to make sure that any blocking IO is handled by the Reactor itself. In our example this would mean the socket used by the CommandHandler would need to be encapsulated inside its own subclass of Connection that defined its own on_data and on_writable methods.

When the Reactor is able to write some data to that slow connection, it would then trigger the appropriate on_writable method, which would write as much as it could to the client without blocking. In this way the Reactor can continue processing other connections while waiting for this slow remote connection, yet still handle that connection when it’s ready.
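A rough sketch of that idea, assuming a hypothetical StreamConnection class that exposes the same client, monitor_for_reading?, monitor_for_writing? and on_writable interface as Connection. A pipe and a StringIO stand in for the data socket and the file being transferred. Each on_writable call sends at most one chunk, so control returns to the loop between chunks instead of blocking until the whole file is sent.

```ruby
require 'stringio'

# Hypothetical Connection-like object that streams a "file" to a slow peer
# without blocking: each on_writable call sends at most one chunk.
class StreamConnection
  CHUNK_SIZE = 16 * 1024
  attr_reader :client

  def initialize(io, source)
    @client = io
    @source = source        # any IO-like object responding to #read and #eof?
    @buffer = String.new    # binary buffer of not-yet-written bytes
  end

  def monitor_for_reading?
    false
  end

  def monitor_for_writing?
    !done?
  end

  def done?
    @buffer.empty? && @source.eof?
  end

  def on_writable
    @buffer << @source.read(CHUNK_SIZE) if @buffer.empty? && !@source.eof?
    bytes = client.write_nonblock(@buffer)
    @buffer.slice!(0, bytes)
    client.close if done?   # transfer finished: hang up
  rescue Errno::EAGAIN
    # Peer's buffer is full; wait for the next writable event.
  end
end

reader, writer = IO.pipe
payload = "0123456789" * 50_000   # ~500 KB stand-in for the file
stream = StreamConnection.new(writer, StringIO.new(payload))
received = String.new
finished = false

# Minimal reactor loop: the pipe's read end plays the slow client, and the
# stream is only watched for writing while it still has data to send.
until finished
  to_write = stream.monitor_for_writing? ? [stream.client] : []
  readables, writables = IO.select([reader], to_write)
  writables.each { stream.on_writable }
  readables.each do |io|
    begin
      received << io.read_nonblock(4096)
    rescue EOFError
      finished = true   # the stream closed its end after the last byte
    end
  end
end
```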

In short, this pattern offers some obvious advantages and really simplifies some aspects of socket programming. On the other hand, it requires you to rethink all of the IO that your app does. It’s easy to cancel all of the offered benefits with a bit of slow code or some third-party library that does blocking IO.

Examples