Pattern: Evented (Reactor)
Up until now all of the patterns we’ve seen have really been a variation on the serial pattern. Besides the serial pattern itself, the other patterns used the same structure but wrapped threads or processes around it.
This pattern takes a whole different approach that won’t look anything like the others.
Overview
The evented pattern (based on the Reactor pattern) seems to be all the rage these days. It’s at the core of libraries and servers like EventMachine, Twisted, Node.js, Nginx, and others.
This pattern is single-threaded and single-process, yet it can achieve levels of concurrency at least on par with the other patterns discussed.
It centers around a central connection multiplexer (hereafter referred to as the Reactor core). Each stage of the connection lifecycle is broken down into individual events that can be interleaved and handled in any given order. The different stages of a connection are simply the possible IO operations: accept, read, write, close.
The central multiplexer monitors all the active connections for events and dispatches the relevant code upon being triggered by an event.
Let’s review the workflow:
- The server monitors the listening socket for incoming connections.
- Upon receiving a new connection it adds it to the list of sockets to monitor.
- The server now monitors the active connection as well as the listening socket.
- Upon being notified that the active connection is readable the server reads a chunk of data from that connection and dispatches the relevant callback.
- Upon being notified that the active connection is still readable the server reads another chunk and dispatches the callback again.
- The server receives another new connection; it adds that to the list of sockets to monitor.
- The server is notified that the first connection is ready for writing, so the response is written out on that connection.
Keep in mind that all of this is happening in a single thread. Notice that the server was able to accept a new connection while the first connection was still in the middle of its read/write flow?
The server is simply splitting each operation into small chunks so that the various events pertaining to multiple connections can be interleaved.
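To make the multiplexing idea concrete before the full server, here is a minimal sketch. It uses two pipes as stand-ins for client connections (an assumption made purely for illustration; the real server monitors TCP sockets) and asks IO.select which of them is ready to read.

```ruby
# Two pipes stand in for two client connections.
reader_a, writer_a = IO.pipe
reader_b, writer_b = IO.pipe

# Only the second "connection" has data waiting.
writer_b.write("hello")

# Ask which monitored IOs are readable. The 1 second timeout is only
# here so the sketch can never hang.
readable, = IO.select([reader_a, reader_b], nil, nil, 1)

# Only reader_b is reported, so only reader_b is read from.
chunks = readable.map { |io| io.read_nonblock(1024) }
puts chunks.inspect
```

The single thread never waits on reader_a, which has nothing to say; it only touches the IO that IO.select reported as ready.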
Time to dig into the code.
Implementation
require 'socket'
require_relative '../command_handler'

module FTP
  class Evented
    CHUNK_SIZE = 1024 * 16

    class Connection
      CRLF = "\r\n"
      attr_reader :client

      def initialize(io)
        @client = io
        @request, @response = "", ""
        @handler = CommandHandler.new(self)
        respond "220 OHAI"
        on_writable
      end

      def on_data(data)
        @request << data
        if @request.end_with?(CRLF)
          # Request is completed.
          respond @handler.handle(@request)
          @request = ""
        end
      end

      def respond(message)
        @response << message + CRLF
        # Write what can be written immediately,
        # the rest will be retried next time the
        # socket is writable.
        on_writable
      end

      def on_writable
        bytes = client.write_nonblock(@response)
        @response.slice!(0, bytes)
      end

      def monitor_for_reading?
        true
      end

      def monitor_for_writing?
        !(@response.empty?)
      end
    end

    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      @handles = {}
      loop do
        to_read = @handles.values.select(&:monitor_for_reading?).map(&:client)
        to_write = @handles.values.select(&:monitor_for_writing?).map(&:client)
        readables, writables = IO.select(to_read + [@control_socket], to_write)
        readables.each do |socket|
          if socket == @control_socket
            io = @control_socket.accept
            connection = Connection.new(io)
            @handles[io.fileno] = connection
          else
            connection = @handles[socket.fileno]
            begin
              data = socket.read_nonblock(CHUNK_SIZE)
              connection.on_data(data)
            rescue Errno::EAGAIN
            rescue EOFError
              @handles.delete(socket.fileno)
            end
          end
        end
        writables.each do |socket|
          connection = @handles[socket.fileno]
          connection.on_writable
        end
      end
    end
  end
end

server = FTP::Evented.new(4481)
server.run
You can see already that this implementation follows a different cadence than the others that we’ve looked at thus far. Let’s start breaking it down by section.
class Connection
This bit of code defines a Connection class for our Evented server.
We saw a Connection class for the threaded examples earlier to keep state separated between threads. This example doesn’t use threads, so why does it need a Connection class?
All of the process-based patterns used processes to separate connections from each other. No matter which way they were using processes, they always made sure that each connection was handled by a single, independent process; each connection was represented by a process.
The Evented pattern is single-threaded, but multiple client connections will be handled concurrently, so each client connection needs to be represented with its own object so they don’t trample on each other’s state.
class Connection
  CRLF = "\r\n"
  attr_reader :client

  def initialize(io)
    @client = io
    @request, @response = "", ""
    @handler = CommandHandler.new(self)
    respond "220 OHAI"
    on_writable
  end
Starting at the top of the Connection class, we see some familiarity.
The connection stores the actual underlying IO object in its @client instance variable and makes that readable from the outside world with an attr_reader.
When an individual connection is initialized it gets its own CommandHandler instance, just as before. After that it sends the customary ‘hello’ response that FTP requires. However, rather than writing it out to the client connection with a blocking call, it appends the response to the @response buffer via respond, which writes whatever it can immediately without blocking. As we’ll see in the next section, the Reactor takes over and sends any remaining data once the client connection is writable.
  def on_data(data)
    @request << data
    if @request.end_with?(CRLF)
      # Request is completed.
      respond @handler.handle(@request)
      @request = ""
    end
  end

  def respond(message)
    @response << message + CRLF
    # Write what can be written immediately,
    # the rest will be retried next time the
    # socket is writable.
    on_writable
  end

  def on_writable
    bytes = client.write_nonblock(@response)
    @response.slice!(0, bytes)
  end

  def monitor_for_reading?
    true
  end

  def monitor_for_writing?
    !(@response.empty?)
  end
This part of Connection defines the lifecycle methods that the Reactor core interacts with.
For example, when the Reactor reads data from the client connection it triggers the on_data method with that data. Inside that method it checks to see if it’s received a complete request. If it has, it asks the @handler to build the response and passes that to respond, which once again appends it to @response.
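The completeness check in on_data only looks at the end of the accumulated buffer, so it relies on each chunk ending at a command boundary. As a hedged illustration of one alternative, the sketch below uses a hypothetical LineBuffer class (not part of the chapter's code) that extracts every complete CRLF-terminated line from the buffer and keeps any partial line for the next chunk.

```ruby
# Hypothetical helper, not part of the chapter's server: accumulates
# chunks and hands back each complete CRLF-terminated line.
class LineBuffer
  CRLF = "\r\n"

  def initialize
    @buffer = String.new
  end

  # Append a chunk and return every line it completed, without the
  # trailing CRLF. A trailing partial line stays buffered.
  def feed(chunk)
    @buffer << chunk
    lines = []
    while (index = @buffer.index(CRLF))
      lines << @buffer.slice!(0, index + CRLF.length).chomp(CRLF)
    end
    lines
  end
end

buffer = LineBuffer.new
first  = buffer.feed("USER an")             # no complete line yet
second = buffer.feed("on\r\nPWD\r\nLI")     # completes two lines
third  = buffer.feed("ST\r\n")              # completes the last one
p first, second, third
```

A Connection could call such a helper from on_data and pass each returned line to the handler in turn; whether that is worth the extra code depends on how the clients you expect actually send commands.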
The on_writable method is called when the client connection is ready to be written to. This is where the @response variable is dealt with. It writes what it can from the @response out to the client connection. Based on how many bytes it was able to write, it slices the @response to remove the bit that was successfully written.
As such, any subsequent writes will only write the part of the @response that wasn’t able to be written this time around. If the whole thing was able to be written, the @response will be sliced to an empty string, and nothing more will be written.
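The write-then-slice idea can be tried in isolation. The sketch below is an assumption-laden variation, not the chapter's code: the OutBuffer class is hypothetical, it uses a pipe instead of a socket, it trims with byteslice (write_nonblock reports bytes, while slice! counts characters, which only agree for single-byte data), and it treats IO::WaitWritable as "wrote nothing this time".

```ruby
# Hypothetical helper: an outgoing buffer that is flushed a piece at a
# time with nonblocking writes.
class OutBuffer
  def initialize(io)
    @io = io
    @pending = String.new   # empty, binary, mutable
  end

  def <<(data)
    @pending << data.b      # store as raw bytes
    self
  end

  # Write as much as the kernel will take right now and drop that part
  # from the buffer. Returns the number of bytes written.
  def flush
    return 0 if @pending.empty?
    written = @io.write_nonblock(@pending)
    @pending = @pending.byteslice(written..-1)
    written
  rescue IO::WaitWritable
    0                       # kernel buffer is full; retry when writable
  end

  def pending?
    !@pending.empty?
  end
end

reader, writer = IO.pipe
buffer = OutBuffer.new(writer)
buffer << ("x" * 1_000_000)   # assumed to exceed the pipe's capacity
written = buffer.flush
puts "wrote #{written} bytes, more pending: #{buffer.pending?}"
```

Nothing is lost when the write is partial: the unsent tail simply waits in the buffer until the next time the IO is reported writable.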
The last two methods, monitor_for_reading? and monitor_for_writing?, are queried by the Reactor to see if it should monitor the state of this particular connection for reading, writing, or both. In this case we’re always willing to read new data if it’s available, but we only want to monitor for the ability to write if there’s a @response to be written. Given an empty @response, the Reactor won’t notify us if the client connection is writable.
def run
  @handles = {}
  loop do
    to_read = @handles.values.select(&:monitor_for_reading?).map(&:client)
    to_write = @handles.values.select(&:monitor_for_writing?).map(&:client)
    readables, writables = IO.select(to_read + [@control_socket], to_write)
This is the main work of the Reactor core.
The @handles Hash looks something like {6 => #<FTP::Evented::Connection:xyz123>}, where the keys are file descriptor numbers and the values are Connection objects.
The first lines inside the main loop ask each of the active connections if they want to be monitored for reading or writing, using the lifecycle methods we saw earlier. It grabs a reference to the underlying IO object for each of the eligible connections.
The Reactor then passes these IO instances to IO.select with no timeout. This IO.select call will block until one of the monitored sockets gets an event that requires attention.
Note that the Reactor also sneaks the @control_socket into the connections to monitor for reading so it can detect new incoming client connections.
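The shape of what IO.select returns is easy to explore on its own. This sketch uses a pipe and short timeouts (both assumptions made for the demonstration; the server above passes sockets and no timeout) to show the three cases: nothing ready, something writable, and something readable.

```ruby
reader, writer = IO.pipe

# Nothing to read yet: with a timeout, IO.select gives up and
# returns nil instead of blocking forever.
nothing = IO.select([reader], nil, nil, 0.1)

# The write end of an empty pipe is writable straight away.
_, writable_now, = IO.select(nil, [writer], nil, 0.1)

# Once data arrives, the reader shows up in the first array.
writer.write("ping")
readables, writables, errored = IO.select([reader], [writer], nil, 0.1)

p nothing, writable_now, readables, writables, errored
```

Without the timeout argument the first call would block until the pipe had data, which is the behavior the Reactor relies on: it sleeps until there is something to do.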
    readables.each do |socket|
      if socket == @control_socket
        io = @control_socket.accept
        connection = Connection.new(io)
        @handles[io.fileno] = connection
      else
        connection = @handles[socket.fileno]
        begin
          data = socket.read_nonblock(CHUNK_SIZE)
          connection.on_data(data)
        rescue Errno::EAGAIN
        rescue EOFError
          @handles.delete(socket.fileno)
        end
      end
    end
    writables.each do |socket|
      connection = @handles[socket.fileno]
      connection.on_writable
    end
  end
This is the part of the Reactor that triggers appropriate methods based on events it receives from IO.select.
First, it handles the sockets deemed ‘readable’. If the @control_socket was readable this means that there’s a new client connection. So the Reactor accepts, builds a new Connection and slots it into the @handles Hash so it can be monitored the next time around the loop.
Next, it handles the case where a socket deemed ‘readable’ was a regular client connection. In this case it attempts to read the data and trigger the on_data method of the appropriate Connection. In the case that the read would block (Errno::EAGAIN), it doesn’t do anything special, just lets the event fall through. In the case that the client disconnected (EOFError), it makes sure to remove the entry from the @handles Hash so the appropriate objects can be garbage collected and will no longer be monitored.
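The three outcomes of a nonblocking read (data, would block, disconnected) can be observed directly. The sketch below swaps the rescue clauses for the exception: false form of read_nonblock, which returns :wait_readable or nil instead of raising; the read_event helper and the use of UNIXSocket.pair in place of a TCP connection are assumptions for illustration only.

```ruby
require 'socket'

# Hypothetical helper: classify the result of one nonblocking read.
def read_event(socket, max = 16 * 1024)
  case (result = socket.read_nonblock(max, exception: false))
  when :wait_readable then [:would_block, nil]  # nothing to read right now
  when nil            then [:closed, nil]       # peer has disconnected
  else                     [:data, result]
  end
end

server_side, client_side = UNIXSocket.pair

before = read_event(server_side)   # client has sent nothing yet
client_side.write("NOOP\r\n")
during = read_event(server_side)   # data is waiting
client_side.close
after = read_event(server_side)    # end of file

p before, during, after
```

Either style works; the rescue-based version in the server keeps the happy path unindented, while the return-value version avoids raising exceptions for what are routine events.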
The last bit handles sockets deemed ‘writable’ simply by triggering the on_writable method of the appropriate Connection.
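One edge case is worth keeping in mind here: the same socket can appear in both the readable and writable lists in a single pass, and if the read branch has just removed it from @handles, the lookup in the write branch returns nil. The following sketch shows one defensive way to dispatch writables; FakeConnection and dispatch_writables are hypothetical names used only for this illustration.

```ruby
# Hypothetical stand-in for the chapter's Connection class; it only
# counts how often it was told its socket is writable.
FakeConnection = Struct.new(:writes) do
  def on_writable
    self.writes += 1
  end
end

# Dispatch writable events, skipping sockets whose connection has
# already been removed from the handles Hash.
def dispatch_writables(handles, writables)
  writables.each do |socket|
    connection = handles[socket.fileno]
    next unless connection   # removed earlier in this pass
    connection.on_writable
  end
end

reader, writer = IO.pipe
live = FakeConnection.new(0)
handles = { writer.fileno => live }

# reader has no entry in handles, as if it had just disconnected.
dispatch_writables(handles, [writer, reader])
puts live.writes
```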
Considerations
This pattern is notably different than the others and, as such, produces notably different advantages and disadvantages.
First of all, this pattern has a reputation for handling extremely high levels of concurrency, on the order of thousands or tens of thousands of concurrent connections. This is something that the other patterns simply can’t approach because they need a process or thread for every connection.
If your server attempts to spawn 5000 threads to handle 5000 connections then things will likely grind to a halt. This pattern wins, hands down, in terms of handling concurrent connections.
The main disadvantage of this pattern is the programming model that it forces upon you. On the one hand the model is simpler because there are no processes/threads to deal with. This means there are no issues of shared memory, synchronization, runaway processes, etc. to deal with. However, given that all this concurrency is happening inside a single thread, there’s one very important rule to follow: never block the Reactor.
To illustrate this, let’s look closely at our implementation. Look inside the CommandHandler class. Notice that when it handles the FTP file transfer command (RETR) it actually opens a socket, streams the data, then closes the socket. The important part is that this socket is being used outside the main Reactor loop; the Reactor knows nothing about it.
Imagine that our client requesting a file transfer were on a slow connection. What effect would this have on the Reactor?
Given that everything runs in the same thread, this single slow client connection would block the whole Reactor! When the Reactor triggers a method on a Connection, the whole Reactor is blocked until that method returns. Since the on_data method delegates to the CommandHandler, the whole Reactor is blocked while it streams the file transfer to the client. In the meantime, no other data is being read, no new connections are being accepted, etc.
It’s very important that anything your application code does inside the Reactor be done very quickly. So how should we handle a slow connection with a Reactor? Use the Reactor itself!
If you’re using this pattern you need to make sure that any blocking IO is handled by the Reactor itself. In our example this would mean the socket used by the CommandHandler would need to be encapsulated inside its own subclass of Connection that defined its own on_data and on_writable methods.
When the Reactor is able to write some data to that slow connection, it would then trigger the appropriate on_writable method, which would write as much as it could to the client without blocking. In this way the Reactor can continue processing other connections while waiting for this slow remote connection, yet still handle that connection when it’s ready.
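As a rough sketch of what such a Reactor-friendly transfer might look like, the hypothetical FileSender class below sends a file one chunk at a time, and only when told its socket is writable. It is not the chapter's CommandHandler code; the class name, the Tempfile payload, the UNIXSocket.pair stand-in for the data connection, and the tiny driver loop are all assumptions made so the example runs on its own.

```ruby
require 'socket'
require 'tempfile'

# Hypothetical connection-like object: streams a file without ever
# blocking on the socket.
class FileSender
  CHUNK_SIZE = 16 * 1024
  attr_reader :client

  def initialize(io, file)
    @client = io
    @file = file
    @pending = String.new
  end

  # Called when the socket is writable: top up the buffer from the
  # file if needed, then write whatever the kernel will accept.
  def on_writable
    @pending << @file.read(CHUNK_SIZE).to_s if @pending.empty?
    return if @pending.empty?
    written = @client.write_nonblock(@pending, exception: false)
    return if written == :wait_writable
    @pending = @pending.byteslice(written..-1)
  end

  def monitor_for_writing?
    !@pending.empty? || !@file.eof?
  end
end

file = Tempfile.new('payload')
file.binmode
file.write('a' * 300_000)
file.rewind

server_side, client_side = UNIXSocket.pair
sender = FileSender.new(server_side, file)
received = String.new

# A tiny stand-in for the Reactor loop: write when writable, read
# when readable, never wait on any single operation.
while sender.monitor_for_writing? || received.bytesize < 300_000
  to_write = sender.monitor_for_writing? ? [sender.client] : []
  readables, writables = IO.select([client_side], to_write, nil, 1)
  break if readables.nil?   # timed out; give up rather than hang
  writables.each { sender.on_writable }
  readables.each { |io| received << io.read_nonblock(64 * 1024) }
end

puts received.bytesize
```

Because each on_writable call does a bounded amount of work, a slow receiver only slows its own transfer; the loop remains free to service other connections in between chunks.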
In short, this pattern offers some obvious advantages and really simplifies some aspects of socket programming. On the other hand, it requires you to rethink all of the IO that your app does. It’s easy to cancel all of the offered benefits with a bit of slow code or some third-party library that does blocking IO.