Pattern: Thread Pool

Overview

This pattern is to Preforking what Thread Per Connection is to Process Per Connection. Much like Preforking, this pattern will spawn a number of threads when the server boots and defer connection handling to each independent thread.

The flow of this architecture is the same as in Preforking; just substitute ‘thread’ for ‘process’.

Implementation

require 'socket'
require 'thread'
require_relative '../command_handler'

module FTP
  Connection = Struct.new(:client) do
    CRLF = "\r\n"

    def gets
      client.gets(CRLF)
    end

    def respond(message)
      client.write(message)
      client.write(CRLF)
    end

    def close
      client.close
    end
  end

  class ThreadPool
    CONCURRENCY = 25

    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      Thread.abort_on_exception = true
      threads = ThreadGroup.new

      CONCURRENCY.times do
        threads.add spawn_thread
      end

      sleep
    end

    def spawn_thread
      Thread.new do
        loop do
          conn = Connection.new(@control_socket.accept)
          conn.respond "220 OHAI"

          handler = CommandHandler.new(conn)

          loop do
            request = conn.gets

            if request
              conn.respond handler.handle(request)
            else
              conn.close
              break
            end
          end
        end
      end
    end
  end
end

server = FTP::ThreadPool.new(4481)
server.run

Again, there are two main methods here. One (run) spawns the threads; the other (spawn_thread) encapsulates the creation of a single thread and the behaviour of that thread. Since we’re working with threads, we’ll once again be using the Connection class.

    CONCURRENCY = 25

    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      Thread.abort_on_exception = true
      threads = ThreadGroup.new

      CONCURRENCY.times do
        threads.add spawn_thread
      end

      sleep
    end

The run method creates a ThreadGroup to keep track of all the threads. ThreadGroup is a bit like a thread-aware Array. You add threads to the ThreadGroup, but when a member thread finishes execution it’s silently dropped from the group.

You can use ThreadGroup#list to get a list of all the threads currently in the group, all of which will be active. We don’t actually use this in this implementation but ThreadGroup would be useful if we wanted to act on all active threads (to join them, for instance).
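To make the ThreadGroup behaviour concrete, here is a minimal sketch that is not part of the server. The method name thread_group_demo and the Queue used as a gate are illustrative assumptions. It adds one short-lived and one long-lived thread to a group and counts the members while the second thread is still running, then again once both have finished.

```ruby
# Sketch only: shows that ThreadGroup#list reports live threads.
# thread_group_demo and the `gate` queue are hypothetical names.
def thread_group_demo
  group = ThreadGroup.new
  gate  = Queue.new

  short_lived = Thread.new { :done }     # finishes almost immediately
  long_lived  = Thread.new { gate.pop }  # blocks until something is pushed

  group.add(short_lived)
  group.add(long_lived)

  short_lived.join                       # wait for the first thread to finish
  while_running = group.list.size        # count members still in the group

  gate.push(:go)                         # release the second thread
  long_lived.join

  { while_running: while_running, after_all_finished: group.list.size }
end

p thread_group_demo
```

On MRI the first count should be 1 and the second 0, since finished threads no longer appear in the list.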

Much like in the last chapter, we simply call the spawn_thread method as many times as CONCURRENCY calls for. Notice that the CONCURRENCY number is higher here than in Preforking. Again, that’s because threads are lighter weight, so we can have more of them. Just keep in mind that the MRI GIL offsets some of this gain: only one thread can execute Ruby code at a time.
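The GIL matters less for a server like this than it might seem, because MRI releases the lock while a thread is blocked waiting. The sketch below is a rough illustration rather than a benchmark. It assumes sleep as a stand-in for a thread blocked on a socket, and overlapping_waits is a hypothetical helper name.

```ruby
# Sketch only: several threads blocked at the same time overlap their
# waits. `sleep` stands in for blocking socket I/O (an assumption).
def overlapping_waits(thread_count: 5, wait: 0.2)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Each thread blocks for `wait` seconds; all of them wait concurrently.
  thread_count.times.map { Thread.new { sleep wait } }.each(&:join)

  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

elapsed = overlapping_waits
puts format('5 threads, 0.2s each, finished in %.2fs', elapsed)
```

The elapsed time should be close to a single wait rather than the sum of all five. CPU-bound Ruby code would not overlap this way on MRI.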

The end of this method calls sleep to keep the main thread from exiting; if the main thread exited, the process would exit and take the pool with it. The main thread remains idle while the pool does the work. Theoretically it could be doing its own work monitoring the pool, but here it just sleeps.
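As a sketch of what acting on the group could look like, the main thread can join every thread in the ThreadGroup instead of calling sleep. The method run_and_wait and its finite workers are assumptions made for illustration; the real pool threads loop forever, so joining them would block indefinitely, much like sleep does.

```ruby
# Sketch only: wait on the pool by joining its threads instead of sleeping.
# run_and_wait and the `finished` queue are hypothetical names.
def run_and_wait(concurrency = 4)
  threads  = ThreadGroup.new
  finished = Queue.new

  concurrency.times do |n|
    # Each worker does a trivial, finite job so the example terminates.
    threads.add(Thread.new { finished << n })
  end

  # Join every thread still alive; workers that already finished are
  # simply absent from the list.
  threads.list.each(&:join)

  finished.size
end

puts run_and_wait(4)
```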

    def spawn_thread
      Thread.new do
        loop do
          conn = Connection.new(@control_socket.accept)
          conn.respond "220 OHAI"

          handler = CommandHandler.new(conn)

          loop do
            request = conn.gets

            if request
              conn.respond handler.handle(request)
            else
              conn.close
              break
            end
          end
        end
      end
    end

This method is pretty unexciting. It follows the same pattern as Preforking. Namely, spawn a thread that loops around the connection handling code. Again, the kernel ensures that a single connection can only be accepted into a single thread.
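The claim about the kernel can be checked with a small experiment. This is a sketch under stated assumptions: accept_demo is a hypothetical helper, it binds to an ephemeral port on 127.0.0.1 instead of the FTP port, and each client sends a single line in place of FTP commands. Several threads call accept on the same listening socket, and each client connection should be handed to exactly one of them.

```ruby
require 'socket'

# Sketch only: several threads accept on one shared listening socket.
# accept_demo is a hypothetical name; port 0 asks the OS for a free port.
def accept_demo(pool_size: 3, clients: 6)
  server   = TCPServer.new('127.0.0.1', 0)
  port     = server.addr[1]
  accepted = Queue.new

  workers = pool_size.times.map do |id|
    Thread.new do
      loop do
        conn = server.accept                 # all workers block on the same socket
        accepted << [id, conn.gets.chomp]    # record which worker got which client
        conn.close
      end
    end
  end

  clients.times do |n|
    sock = TCPSocket.new('127.0.0.1', port)
    sock.puts "client-#{n}"
    sock.close
  end

  received = Array.new(clients) { accepted.pop }  # one entry per connection
  workers.each(&:kill)
  workers.each(&:join)
  server.close
  received
end

accept_demo.each { |worker_id, message| puts "worker #{worker_id} handled #{message}" }
```

Which worker handles which client will vary between runs, but every client should show up exactly once.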

Considerations

Many of the considerations for this pattern are shared with the previous one.

Besides the obvious thread vs. process tradeoff, this pattern does not need to spawn a thread each time it handles a connection and does not involve any crazy locks or race conditions, yet it still handles connections concurrently (and in parallel on Ruby implementations without a global lock).

Examples