Multiplexing Connections

Connection multiplexing refers to working with multiple active sockets at the same time. This doesn’t specifically refer to doing any work in parallel and is not related to multi-threading. An example will make it clear.

Given the techniques seen so far, let’s imagine how we might write a server that needs to process available data on several TCP connections at any given time. We’d probably use our newfound knowledge of non-blocking IO to keep ourselves from getting stuck blocking on any particular socket.


# Given an Array of connections.
connections = [<TCPSocket>, <TCPSocket>, <TCPSocket>]

# We enter an endless loop.
loop do
  # For each connection...
  connections.each do |conn|
    begin
      # Attempt to read from each connection in a non-blocking way,
      # processing any data received, otherwise trying the next
      # connection.
      data = conn.read_nonblock(4096)
      process(data)
    rescue Errno::EAGAIN
    end
  end
end

Does this work? It does! But it’s a very busy loop.
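To see how busy, here is a rough sketch that counts how often a read_nonblock loop comes up empty while it waits for a single message. It assumes IO.pipe as a stand-in for a TCP connection and a helper thread as the sender; count_wasted_reads is a hypothetical name. The exact count depends on your machine, so treat it as illustrative only.

```ruby
# Hypothetical helper: spin on read_nonblock until one message arrives,
# counting the attempts that found nothing to read.
def count_wasted_reads(delay)
  reader, writer = IO.pipe

  # Another thread writes a single message after a short delay.
  sender = Thread.new do
    sleep delay
    writer.write("hello")
    writer.close
  end

  wasted = 0
  data = nil
  loop do
    begin
      data = reader.read_nonblock(4096)
      break
    rescue IO::WaitReadable
      # Nothing to read yet: each pass through here is a wasted system call.
      wasted += 1
    end
  end

  sender.join
  reader.close
  [data, wasted]
end

data, wasted = count_wasted_reads(0.05)
puts "got #{data.inspect} after #{wasted} empty read attempts"
```

IO::WaitReadable is rescued here instead of Errno::EAGAIN; the exception read_nonblock raises matches both.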

Each call to read_nonblock uses at least one system call and the server will be wasting a lot of cycles trying to read data when there is none. Remember that I said read_nonblock checks if there’s any data available using select(2)? Well, there’s a Ruby wrapper so that we can use select(2) directly for our own purposes.

select(2)

Here’s the saner method of processing available data on multiple TCP connections:


# Given an Array of connections.
connections = [<TCPSocket>, <TCPSocket>, <TCPSocket>]

loop do
  # Ask select(2) which connections are ready for reading.
  ready = IO.select(connections)

  # Read data only from the available connections.
  readable_connections = ready[0]
  readable_connections.each do |conn|
    data = conn.readpartial(4096)
    process(data)
  end
end

This example uses IO.select to greatly reduce the overhead of handling multiple connections. The whole purpose of IO.select is to take in some IO objects and tell you which of those are ready to be read from or written to, so you don't have to take shots in the dark like we did above.
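The loop above uses placeholder connections. Here is a runnable sketch of the same idea, assuming UNIXSocket.pair as a stand-in for a TCP connection (each pair gives us both ends in one process). It also adds two things the short version leaves out: an EOFError rescue so that closed peers stop being monitored, and a timeout so the example ends once nothing is ready.

```ruby
require 'socket'

# Three socket pairs stand in for three TCP connections: we read from
# the "server" ends and write from the "client" ends.
pairs = Array.new(3) { UNIXSocket.pair }
clients = pairs.map(&:first)
connections = pairs.map(&:last)

# The first client stays silent; the other two send data and hang up.
clients[1].write("ping")
clients[2].write("pong")
clients[1].close
clients[2].close

received = []
loop do
  # Ask select(2) which connections are readable, waiting at most 0.1s.
  ready = IO.select(connections, nil, nil, 0.1)
  break unless ready # nothing became readable in time

  ready[0].each do |conn|
    begin
      received << conn.readpartial(4096)
    rescue EOFError
      # The peer closed its end: stop monitoring this connection.
      connections.delete(conn)
      conn.close
    end
  end
end

puts "received #{received.inspect}; still monitoring #{connections.size}"
clients[0].close
```

The silent connection is never returned by IO.select, so it costs nothing while it waits.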

Let’s review some properties of IO.select.

It tells you when file descriptors are ready for reading or writing. In the above example we only passed one argument to IO.select, but there are actually three important Arrays that IO.select takes as arguments.

for_reading = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
for_writing = [<TCPSocket>, <TCPSocket>, <TCPSocket>]

IO.select(for_reading, for_writing, for_reading)

The first argument is an Array of IO objects which you want to read from. The second argument is an Array of IO objects which you want to write to. The third argument is an Array of IO objects for which you are interested in exceptional conditions. The vast majority of applications can ignore the third argument unless you’re interested in out-of-band data (more on that in the Urgent Data chapter). Note that even if you’re interested in reading from a single IO object you still must put it in an Array to pass to IO.select.
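As a quick illustration of these argument rules, this sketch (using IO.pipe in place of a socket) wraps a single IO in an Array and passes nil for the two lists it doesn't need.

```ruby
reader, writer = IO.pipe
writer.write("x")

# Even a single IO object must go in an Array; lists we don't need are nil.
readable, writable, errored = IO.select([reader], nil, nil)

# The result still has three elements; the lists passed as nil come back empty.
puts "readable: #{readable.size}, writable: #{writable.size}, errored: #{errored.size}"
```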

It returns an Array of Arrays. IO.select returns a nested array with three elements that correspond to its argument list. The first element will contain IO objects that can be read from without blocking. Note that this will be a subset of the Array of IO objects passed in as the first argument. The second element will contain IO objects that can be written to without blocking, and the third element will contain IO objects which have applicable exceptional conditions.

for_reading = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
for_writing = [<TCPSocket>, <TCPSocket>, <TCPSocket>]

ready = IO.select(for_reading, for_writing, for_reading)

# One Array is returned for each Array passed in as an argument.
# In this case none of the connections in for_writing were writable
# and one of the connections in for_reading was readable.
p ready #=> [[<TCPSocket>], [], []]

It’s blocking. IO.select is a synchronous method call. Using it like we’ve seen thus far will cause it to block until the status of one of the passed-in IO objects changes. At this point it will return immediately. If multiple statuses have changed then all will be returned via the nested Array.

But IO.select will also take a fourth argument, a timeout value in seconds. This will prevent IO.select from blocking indefinitely. Pass in an Integer or Float value to specify a timeout. If the timeout is reached before any of the IO statuses have changed, IO.select will return nil.

for_reading = [<TCPSocket>, <TCPSocket>, <TCPSocket>]
for_writing = [<TCPSocket>, <TCPSocket>, <TCPSocket>]

timeout = 10
ready = IO.select(for_reading, for_writing, for_reading, timeout)

# In this case IO.select didn't detect any status changes in 10 seconds, 
# thus returned nil instead of a nested Array.
p ready #=> nil
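Here is a small runnable sketch of both behaviours, again using IO.pipe as a stand-in for a socket: an empty pipe makes IO.select time out and return nil, and a timeout of 0 turns the call into an instant check that doesn't block at all. The 0.2 second value is an arbitrary choice.

```ruby
reader, writer = IO.pipe

# Nothing has been written yet, so the pipe never becomes readable and
# IO.select gives up once the 0.2 second timeout passes.
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
timed_out = IO.select([reader], nil, nil, 0.2)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
p timed_out # prints nil

# A timeout of 0 checks the current state and returns right away.
writer.write("data")
polled = IO.select([reader], nil, nil, 0)
p polled[0] == [reader] # prints true
```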

You can also pass plain Ruby objects to IO.select, so long as they respond to to_io and return an IO object. Because IO.select returns the objects you passed in, this means you don't need to maintain a mapping from IO objects to your domain objects.
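A minimal sketch of that idea. Connection is a hypothetical domain class, and pipes stand in for sockets. Since IO.select hands back the objects it was given rather than the underlying IO objects, the result can be used directly.

```ruby
# Hypothetical domain object wrapping an IO. Defining to_io is all
# IO.select needs in order to monitor it.
class Connection
  attr_reader :name

  def initialize(name, io)
    @name = name
    @io = io
  end

  def to_io
    @io
  end
end

reader_a, writer_a = IO.pipe
reader_b, _writer_b = IO.pipe
conns = [Connection.new("alice", reader_a), Connection.new("bob", reader_b)]

# Only alice's pipe has data waiting.
writer_a.write("hi")

readable, = IO.select(conns, nil, nil, 1)
readable.each { |conn| puts "#{conn.name} has data" }
```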

Events Other Than Read/Write

So far we’ve just looked at monitoring readable and writable state with IO.select, but it can actually be shoehorned into a few other places.

EOF

If you're monitoring a socket for readability and it receives an EOF, it will be returned as part of the readable sockets Array. Depending on which read method you use at that point, you'll get either an EOFError (readpartial, read_nonblock) or nil (read with a length) when trying to read from it.
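A small sketch of that sequence, assuming UNIXSocket.pair in place of a TCP connection: the peer sends one last message and closes, so the socket is reported readable twice, first for the data and then for the EOF.

```ruby
require 'socket'

sock, peer = UNIXSocket.pair
peer.write("last words")
peer.close # the peer hangs up, so sock will hit EOF after the data

results = []
2.times do
  IO.select([sock]) # reports sock readable both times: data, then EOF
  begin
    results << sock.readpartial(4096)
  rescue EOFError
    results << :eof
  end
end

# IO#read with a length returns nil at EOF instead of raising.
eof_value = sock.read(4096)
sock.close
p results # prints ["last words", :eof]
```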

Accept

If you’re monitoring a server socket for readability and it receives an incoming connection, it will be returned as part of the readable sockets Array. Obviously, you’ll need to have logic to handle these kinds of sockets specially and use accept rather than read.
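Here's one way that special-casing might look, as a sketch that runs entirely on localhost. It assumes a TCPServer bound to port 0 (so the OS picks a free port) and a client in the same process; the monitored and messages names are just for illustration.

```ruby
require 'socket'

# Port 0 asks the OS for any free port.
server = TCPServer.new('127.0.0.1', 0)
port = server.local_address.ip_port

# A client in the same process connects, sends a line and hangs up.
client = TCPSocket.new('127.0.0.1', port)
client.write("hello\n")
client.close

monitored = [server]
messages = []

loop do
  ready = IO.select(monitored, nil, nil, 0.5)
  break unless ready # nothing happened within the timeout

  ready[0].each do |io|
    if io.equal?(server)
      # A readable listening socket means a connection is waiting: accept it.
      monitored << server.accept
    else
      begin
        messages << io.readpartial(4096)
      rescue EOFError
        # The client closed its end: stop monitoring it.
        monitored.delete(io)
        io.close
      end
    end
  end
end

puts "messages: #{messages.inspect}"
server.close
```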

Connect

This one is probably the most interesting of the bunch. In the last chapter we looked at connect_nonblock and noted that it raised Errno::EINPROGRESS if it couldn't connect immediately. Using IO.select we can find out whether that background connect has completed yet:

require 'socket'

socket = Socket.new(:INET, :STREAM)
remote_addr = Socket.pack_sockaddr_in(80, 'google.com')

begin
  # Initiate a nonblocking connection to google.com on port 80.
  socket.connect_nonblock(remote_addr)
rescue Errno::EINPROGRESS
  IO.select(nil, [socket])

  begin
    socket.connect_nonblock(remote_addr)
  rescue Errno::EISCONN
    # Success!
  rescue Errno::ECONNREFUSED
    # Refused by remote host.
  end
end

The first part of this snippet is the same as in the last chapter: we attempt a connect_nonblock and rescue Errno::EINPROGRESS, which signifies that the connect is happening in the background. Then we enter the new code.

We ask IO.select to monitor the socket for changes to its writable status. When that changes, we know that the underlying connect is complete. In order to figure out the status, we just try connect_nonblock again! If it raises Errno::EISCONN this tells us that the socket is already connected to the remote host. Success! A different exception signifies an error condition in connecting to the remote host.

This fancy bit of code actually emulates a blocking connect. Why? Partly to show you what’s possible, but you can also imagine sticking your own code into this process. You could initiate the connect_nonblock, go off and do some other work, then call IO.select with a timeout. If the underlying connect isn’t finished then you can continue doing other work and check IO.select again later.
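A sketch of that pattern, assuming a local TCPServer stands in for the remote host so it runs without network access. The 0.05 second select timeout, the other_work counter (a placeholder for real work) and the 5 second deadline are arbitrary choices for illustration.

```ruby
require 'socket'

# A local listener stands in for the remote host so the sketch runs offline.
server = TCPServer.new('127.0.0.1', 0)
remote_addr = Socket.pack_sockaddr_in(server.local_address.ip_port, '127.0.0.1')

socket = Socket.new(:INET, :STREAM)
status = nil
other_work = 0 # counts passes spent on (hypothetical) other work
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 5

begin
  socket.connect_nonblock(remote_addr)
  status = :connected # the connect finished immediately
rescue Errno::EINPROGRESS
  until status
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
      status = :timed_out
    elsif IO.select(nil, [socket], nil, 0.05)
      # Writable: the background connect is done, so ask how it went.
      begin
        socket.connect_nonblock(remote_addr)
        status = :connected
      rescue Errno::EISCONN
        status = :connected
      rescue SystemCallError => e
        status = e.class
      end
    else
      # Not finished yet: do a slice of other work, then check again.
      other_work += 1
    end
  end
end

puts "status=#{status} after #{other_work} rounds of other work"
socket.close
server.close
```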

We can actually use this little technique to build a pretty simple port scanner in Ruby. A port scanner attempts to make connections to a range of ports on a remote host and tells you which ones were open to connections.

require 'socket'

# Set up the parameters.
PORT_RANGE = 1..512
HOST = 'archive.org'
TIME_TO_WAIT = 5 # seconds

# Create a socket for each port and initiate the nonblocking
# connect. Remember each socket's target address, because a socket
# whose connect failed can't report its remote address.
targets = {}
PORT_RANGE.each do |port|
  socket = Socket.new(:INET, :STREAM)
  remote_addr = Socket.sockaddr_in(port, HOST)

  begin
    socket.connect_nonblock(remote_addr)
  rescue Errno::EINPROGRESS
  end

  targets[socket] = remote_addr
end
sockets = targets.keys

# Set the expiration.
expiration = Time.now + TIME_TO_WAIT

until sockets.empty?
  # We adjust the timeout each time so that we'll never be waiting past
  # the expiration. IO.select rejects a negative timeout, so stop first.
  remaining = expiration - Time.now
  break if remaining <= 0

  _, writable, _ = IO.select(nil, sockets, nil, remaining)
  break unless writable

  writable.each do |socket|
    begin
      socket.connect_nonblock(targets[socket])
    rescue Errno::EISCONN
      # If the socket is already connected then we can count this as a success.
      port, _ = Socket.unpack_sockaddr_in(targets[socket])
      puts "#{HOST}:#{port} accepts connections..."
      # Remove the socket from the list so it doesn't continue to be
      # selected as writable.
      sockets.delete(socket)
      socket.close
    rescue SystemCallError
      # Any other error (refused, unreachable, ...) means the port is closed.
      sockets.delete(socket)
      socket.close
    end
  end
end

This bit of code takes advantage of connect_nonblock by initiating several hundred connections at once. It then monitors all of these using IO.select and ultimately verifies which we were able to connect to successfully. Here’s the output I got when running this against archive.org:

archive.org:25 accepts connections...
archive.org:22 accepts connections...
archive.org:80 accepts connections...
archive.org:443 accepts connections...

Notice that the results aren't necessarily in order: the connections that finish first are printed first. This is a pretty common group of open ports: port 25 is reserved for SMTP, port 22 for SSH, port 80 for HTTP and port 443 for HTTPS.

High Performance Multiplexing

IO.select ships with Ruby's core library, and it's the only multiplexing solution that does. Most modern OS kernels support multiple methods of multiplexing, and select(2) is almost invariably the oldest and slowest of them.

IO.select performs well with a few connections, but its cost grows linearly with the number of connections it monitors, so its performance keeps degrading as you add more. Moreover, the select(2) system call is limited by something called FD_SETSIZE, a C macro defined as part of your local C library. select(2) can't monitor a file descriptor whose number is equal to or greater than FD_SETSIZE (1024 on most systems), so IO.select will be limited to monitoring at most 1024 IO objects.

There are, of course, alternatives.

The poll(2) system call differs slightly from select(2) but is more or less on par with it. The epoll(2) (Linux) and kqueue(2) (BSD) system calls provide a more performant, modern alternative to select(2) and poll(2). For example, a high-performance networking toolkit like EventMachine will favour epoll(2) or kqueue(2) where possible.

Rather than trying to give examples of these particular system calls, I'll point you to the nio4r Ruby gem, which provides a common interface to all of these multiplexing solutions, favouring the most performant one available on your system.