Multiplexing Connections
Connection multiplexing refers to working with multiple active sockets at the same time. This doesn’t specifically refer to doing any work in parallel and is not related to multi-threading. An example will make it clear.
Given the techniques seen so far, let’s imagine how we might write a server that needs to process available data on several TCP connections at any given time. We’d probably use our newfound knowledge of non-blocking IO to keep ourselves from getting stuck blocking on any particular socket.
# Given an Array of connections.
connections = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
# We enter an endless loop.
loop do
  # For each connection...
  connections.each do |conn|
    begin
      # Attempt to read from each connection in a non-blocking way,
      # processing any data received, otherwise trying the next
      # connection.
      data = conn.read_nonblock(4096)
      process(data)
    rescue Errno::EAGAIN
    end
  end
end
Does this work? It does! But it’s a very busy loop.
Each call to read_nonblock uses at least one system call, and the server will waste a lot of cycles trying to read data when there is none. Remember that I said read_nonblock checks if there’s any data available using select(2)? Well, there’s a Ruby wrapper so that we can use select(2) directly for our own purposes.
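To make the cost concrete, here’s a minimal runnable sketch of that busy loop against a single loopback connection (an assumption so it needs no network). It counts how many read_nonblock calls come back empty before data arrives. The helper thread only simulates a slow peer; the exact count depends on your machine, so no output is shown.

```ruby
require 'socket'

# Loopback connection on an OS-assigned port (port 0).
server = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', server.addr[1])
conn = server.accept

# Hypothetical slow peer: send data after a short delay.
writer = Thread.new do
  sleep 0.1
  client.write('hello')
end

wasted = 0
data = nil
loop do
  # exception: false returns :wait_readable instead of raising (Ruby 2.3+).
  result = conn.read_nonblock(4096, exception: false)
  if result == :wait_readable
    wasted += 1 # one system call that found nothing to read
  else
    data = result # a String, or nil if the peer closed the connection
    break
  end
end
writer.join

puts "Got #{data.inspect} after #{wasted} empty reads"
```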
select(2)
Here’s the saner method of processing available data on multiple TCP connections:
# Given an Array of connections.
connections = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
loop do
  # Ask select(2) which connections are ready for reading.
  ready = IO.select(connections)
  # Read data only from the available connections.
  readable_connections = ready[0]
  readable_connections.each do |conn|
    data = conn.readpartial(4096)
    process(data)
  end
end
This example uses IO.select to greatly reduce the overhead of handling multiple connections. The whole purpose of IO.select is to take in some IO objects and tell you which of those are ready to be read from or written to, so you don’t have to take shots in the dark like we did above.
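Here’s a runnable sketch of the same IO.select loop, assuming three loopback connections stand in for the placeholder <TCPSocket> objects. Only two clients send anything, and IO.select hands back just those connections. A timeout is added so the sketch can’t hang forever.

```ruby
require 'socket'

# Three clients and the three server-side connections they produce.
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]
clients = Array.new(3) { TCPSocket.new('127.0.0.1', port) }
connections = Array.new(3) { server.accept }

# Only two of the three clients send anything.
clients[0].write('hello')
clients[2].write('world')

received = []
# One select call might see only one message if the other is still
# in flight, so keep selecting until both have arrived.
until received.size == 2
  ready = IO.select(connections, nil, nil, 5)
  raise 'timed out waiting for data' if ready.nil?

  # Read only from the connections IO.select reported as readable.
  ready[0].each do |conn|
    received << conn.readpartial(4096)
  end
end

p received.sort #=> ["hello", "world"]
```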
Let’s review some properties of IO.select.
It tells you when file descriptors are ready for reading or writing. In the above example we only passed one argument to IO.select, but there are actually three important Arrays that IO.select takes as arguments.
for_reading = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
for_writing = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
IO.select(for_reading, for_writing, for_reading)
The first argument is an Array of IO objects which you want to read from. The second argument is an Array of IO objects which you want to write to. The third argument is an Array of IO objects for which you are interested in exceptional conditions. The vast majority of applications can ignore the third argument unless you’re interested in out-of-band data (more on that in the Urgent Data chapter). Note that even if you’re interested in reading from a single IO object you must still put it in an Array to pass to IO.select.
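To see the first two arguments working together, here’s a sketch that pushes a large payload from one loopback socket to another within a single thread. The payload size is an assumption: 5 MB is likely more than the kernel buffers hold at once, so writes may return :wait_writable until the reading side drains some data. Neither side ever blocks, because IO.select tells us which one can make progress.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0)
sender = TCPSocket.new('127.0.0.1', server.addr[1])
receiver = server.accept

payload = 'x' * 5_000_000
pending = payload # bytes not yet accepted by the kernel
received = +''

until received.bytesize == payload.bytesize
  # Only ask about writability while there is something left to send.
  for_writing = pending.empty? ? [] : [sender]
  ready = IO.select([receiver], for_writing, nil, 5)
  raise 'timed out' if ready.nil?
  readable, writable, _ = ready

  writable.each do |io|
    written = io.write_nonblock(pending, exception: false)
    # Drop the bytes the kernel accepted; keep the rest for later.
    pending = pending.byteslice(written..-1) unless written == :wait_writable
  end

  readable.each do |io|
    chunk = io.read_nonblock(65_536, exception: false)
    received << chunk if chunk.is_a?(String)
  end
end

puts "transferred #{received.bytesize} bytes" # prints "transferred 5000000 bytes"
```

Selecting on both Arrays in the same call is what keeps the single thread from deadlocking: a blocking write would wait for a reader that could never run.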
It returns an Array of Arrays. IO.select returns a nested array with three elements that correspond to its argument list. The first element will contain IO objects that can be read from without blocking. Note that this will be a subset of the Array of IO objects passed in as the first argument. The second element will contain IO objects that can be written to without blocking, and the third element will contain IO objects which have applicable exceptional conditions.
for_reading = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
for_writing = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
ready = IO.select(for_reading, for_writing, for_reading)
# One Array is returned for each Array passed in as an argument.
# In this case none of the connections in for_writing were writable
# and one of the connections in for_reading was readable.
p ready #=> [[<TCPSocket>], [], []]
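A runnable version of the return value, assuming a single loopback connection: a freshly connected socket with an empty send buffer should be writable but not yet readable, and it moves into the first Array once its peer sends something.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', server.addr[1])
peer = server.accept

# Nothing has been sent yet, but peer's send buffer is empty.
readable, writable, exceptional = IO.select([peer], [peer], [peer], 5)
p readable.empty?    #=> true
p writable == [peer] #=> true
p exceptional.empty? #=> true

# Once the client sends something, peer shows up in the first Array.
client.write('ping')
readable_after, _, _ = IO.select([peer], nil, nil, 5)
p readable_after == [peer] #=> true
```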
It’s blocking. IO.select is a synchronous method call. Using it like we’ve seen thus far will cause it to block until the status of one of the passed-in IO objects changes, at which point it returns immediately. If multiple statuses have changed, all of them will be returned via the nested Array.
But IO.select also takes a fourth argument, a timeout value in seconds, which prevents IO.select from blocking indefinitely. Pass in an Integer or Float value to specify a timeout. If the timeout is reached before any of the IO statuses have changed, IO.select will return nil.
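A small runnable sketch of the timeout, assuming a loopback connection whose peer never writes. It uses a Float timeout of 0.25 seconds and measures roughly how long IO.select waited before giving up.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', server.addr[1])
peer = server.accept

# Nothing is ever written through `client`, so `peer` never becomes
# readable and the timeout expires.
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
ready = IO.select([peer], nil, nil, 0.25)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

p ready #=> nil
puts format('waited about %.2f seconds', elapsed)
```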
for_reading = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
for_writing = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
timeout = 10
ready = IO.select(for_reading, for_writing, for_reading, timeout)
# In this case IO.select didn't detect any status changes in 10 seconds,
# thus returned nil instead of a nested Array.
p ready #=> nil
You can also pass plain Ruby objects to IO.select, so long as they respond to to_io and return an IO object. IO.select hands back the objects you passed in rather than the IO objects they wrap, so you don’t need to maintain a mapping of IO object -> your domain object. IO.select can work with your plain Ruby objects directly if they implement this to_io method.
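A minimal sketch of that idea. The Client class and its name attribute are hypothetical, invented for illustration; the only thing IO.select requires is that to_io returns an IO. Notice that the readable Array contains our Client objects, not the sockets inside them.

```ruby
require 'socket'

# Hypothetical domain object wrapping a socket.
class Client
  attr_reader :name

  def initialize(name, socket)
    @name = name
    @socket = socket
  end

  # IO.select calls this to find the underlying IO.
  def to_io
    @socket
  end
end

server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]

# Connect and accept one at a time so each Client pairs with its sender.
senders = []
clients = %w[alice bob].map do |name|
  senders << TCPSocket.new('127.0.0.1', port)
  Client.new(name, server.accept)
end

senders[1].write('hi')

readable, _, _ = IO.select(clients, nil, nil, 5)
p readable.map(&:name) #=> ["bob"]
```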
Events Other Than Read/Write
So far we’ve just looked at monitoring readable and writable state with IO.select, but it can actually be shoehorned into a few other places.
EOF
If you’re monitoring a socket for readability and it receives an EOF, it will be returned as part of the readable sockets Array. Depending on which variant of read(2) you use at that point, you might get an EOFError or nil when trying to read from it.
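Here’s a sketch of both behaviours, assuming a loopback connection whose client side simply closes: the closed connection shows up as readable, readpartial raises EOFError, and read with a length returns nil.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0)
client = TCPSocket.new('127.0.0.1', server.addr[1])
peer = server.accept

# Closing the client sends EOF to the other end.
client.close

readable, _, _ = IO.select([peer], nil, nil, 5)
p readable == [peer] #=> true

# readpartial signals EOF by raising...
eof_raised = false
begin
  peer.readpartial(4096)
rescue EOFError
  eof_raised = true
end
p eof_raised #=> true

# ...while read with a length returns nil.
read_result = peer.read(4096)
p read_result #=> nil
```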
Accept
If you’re monitoring a server socket for readability and it receives an incoming connection, it will be returned as part of the readable sockets Array. Obviously, you’ll need logic to handle these sockets specially and use accept rather than read.
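One way to write that special-case logic, sketched under the assumption of a single local client: the listening socket sits in the same Array as the connected sockets, and each readable object is dispatched to accept or readpartial depending on which kind it is.

```ruby
require 'socket'

server = TCPServer.new('127.0.0.1', 0)
clients = []

# A client connects and sends a greeting. The kernel completes the
# handshake and buffers the data even before we call accept.
outgoing = TCPSocket.new('127.0.0.1', server.addr[1])
outgoing.write('hello')

received = nil
until received
  ready = IO.select([server] + clients, nil, nil, 5)
  raise 'timed out' if ready.nil?

  ready[0].each do |io|
    if io.equal?(server)
      # A readable listening socket means a connection is waiting.
      clients << server.accept
    else
      # Any other readable socket has data (or EOF) for us.
      received = io.readpartial(4096)
    end
  end
end

p received #=> "hello"
```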
Connect
This one is probably the most interesting of the bunch. In the last chapter we looked at connect_nonblock and noted that it raised Errno::EINPROGRESS if it couldn’t connect immediately. Using IO.select we can figure out whether that background connect has completed yet:
require 'socket'
socket = Socket.new(:INET, :STREAM)
remote_addr = Socket.pack_sockaddr_in(80, 'google.com')
begin
  # Initiate a nonblocking connection to google.com on port 80.
  socket.connect_nonblock(remote_addr)
rescue Errno::EINPROGRESS
  IO.select(nil, [socket])
  begin
    socket.connect_nonblock(remote_addr)
  rescue Errno::EISCONN
    # Success!
  rescue Errno::ECONNREFUSED
    # Refused by remote host.
  end
end
The first part of this snippet is the same as in the last chapter: try to do a connect_nonblock and rescue Errno::EINPROGRESS, which signifies that the connect is happening in the background. Then we enter the new code.
We ask IO.select to monitor the socket for changes to its writable status. When that changes, we know that the underlying connect is complete. In order to figure out the status, we just try connect_nonblock again! If it raises Errno::EISCONN, this tells us that the socket is already connected to the remote host. Success! A different exception signifies an error condition in connecting to the remote host.
This fancy bit of code actually emulates a blocking connect. Why? Partly to show you what’s possible, but you can also imagine sticking your own code into this process. You could initiate the connect_nonblock, go off and do some other work, then call IO.select with a timeout. If the underlying connect isn’t finished, you can continue doing other work and check IO.select again later.
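A sketch of that pattern, with assumptions spelled out: a local TCPServer stands in for the remote host so it runs offline, do_other_work is a hypothetical placeholder, and a 5-second deadline keeps it from spinning forever. On loopback the connect may finish immediately, so both outcomes are handled.

```ruby
require 'socket'

# Hypothetical placeholder for whatever else the program needs to do.
def do_other_work(log)
  log << :tick
end

server = TCPServer.new('127.0.0.1', 0)
remote_addr = Socket.pack_sockaddr_in(server.addr[1], '127.0.0.1')
socket = Socket.new(:INET, :STREAM)

connected = false
begin
  socket.connect_nonblock(remote_addr)
  connected = true # the connect finished immediately
rescue Errno::EINPROGRESS
  # The connect continues in the background.
end

log = []
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 5
until connected || Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
  do_other_work(log)

  # Check in on the connect, waiting at most 100 ms.
  next unless IO.select(nil, [socket], nil, 0.1)

  begin
    # A refused connect would raise Errno::ECONNREFUSED here.
    socket.connect_nonblock(remote_addr)
    connected = true
  rescue Errno::EISCONN
    connected = true # already connected: success
  end
end

puts connected ? "connected after #{log.size} rounds of other work" : 'gave up'
```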
We can actually use this little technique to build a pretty simple port scanner in Ruby. A port scanner attempts to make connections to a range of ports on a remote host and tells you which ones were open to connections.
require 'socket'
# Set up the parameters.
PORT_RANGE = 1..512
HOST = 'archive.org'
TIME_TO_WAIT = 5 # seconds
# Create a socket for each port and initiate the nonblocking
# connect. Remember each socket's target address, because
# remote_address can't be queried on a socket that never connected.
addresses = {}
sockets = PORT_RANGE.map do |port|
  socket = Socket.new(:INET, :STREAM)
  remote_addr = Socket.sockaddr_in(port, HOST)
  addresses[socket] = remote_addr
  begin
    socket.connect_nonblock(remote_addr)
  rescue Errno::EINPROGRESS
  end
  socket
end
# Set the expiration.
expiration = Time.now + TIME_TO_WAIT
loop do
  # We adjust the IO.select timeout each time so that we'll never be
  # waiting past the expiration (a negative timeout would raise).
  remaining = expiration - Time.now
  break if remaining <= 0 || sockets.empty?
  _, writable, _ = IO.select(nil, sockets, nil, remaining)
  break unless writable
  writable.each do |socket|
    begin
      socket.connect_nonblock(addresses[socket])
    rescue Errno::EISCONN
      # If the socket is already connected then we can count this as a success.
      port = Socket.unpack_sockaddr_in(addresses[socket]).first
      puts "#{HOST}:#{port} accepts connections..."
      # Remove the socket from the list so it doesn't continue to be
      # selected as writable.
      sockets.delete(socket)
    rescue SystemCallError
      # Any other error (ECONNREFUSED, EINVAL, ...) means the connect failed.
      sockets.delete(socket)
    end
  end
end
This bit of code takes advantage of connect_nonblock by initiating several hundred connections at once. It then monitors all of these using IO.select and ultimately verifies which ones we were able to connect to successfully. Here’s the output I got when running this against archive.org:
archive.org:25 accepts connections...
archive.org:22 accepts connections...
archive.org:80 accepts connections...
archive.org:443 accepts connections...
Notice that the results aren’t necessarily in order: the connections that finish first are printed first. This is a pretty common group of open ports: port 25 is reserved for SMTP, port 22 for SSH, port 80 for HTTP and port 443 for HTTPS.
High Performance Multiplexing
IO.select ships with Ruby’s core library, but it’s the only solution for multiplexing that ships with Ruby. Most modern OS kernels support multiple methods of multiplexing, and select(2) is almost invariably the oldest and least performant of them.
IO.select will perform well with few connections, but its cost grows linearly with the number of connections it monitors, so its performance keeps degrading as you monitor more of them. Moreover, the select(2) system call is limited by something called FD_SETSIZE, a C macro that’s defined as part of your local C library. select(2) is unable to monitor a file descriptor whose number is equal to or higher than FD_SETSIZE (1024 on most systems). So IO.select will be limited to monitoring at most 1024 IO objects.
There are, of course, alternatives.
The poll(2) system call offers a few slight differences from select(2) but is more or less on par with it. The (Linux) epoll(2) and (BSD) kqueue(2) system calls provide a higher-performance, modern alternative to select(2) and poll(2). For example, a high-performance networking toolkit like EventMachine will favour epoll(2) or kqueue(2) where possible.
Rather than trying to give examples of these particular system calls, I’ll point you to the nio4r Ruby gem, which provides a common interface to all of these multiplexing solutions, favouring the most performant one available on your system.