Carpe Diem

備忘録

I/O Multiplexing(I/O多重化)

概要

christina04.hatenablog.com

で一度まとめましたが、まだ理解があやふやなところがありました。

その後

Working With TCP Sockets

を読んでようやくストンと理解できたのでまとめます。

経緯

たくさんのリクエストを受けるにはどうしたらいいか、で

  • マルチプロセス
  • マルチスレッド

といった手法が提案されてきました。

前者はシンプルであるもののリソースの過剰な使用が問題であり、後者は前者ほどリソースは使わないもののやはり無駄は大きく、かつ共有リソースのハンドリングの複雑性が問題になります。

そもそもボトルネックになってるのはどこかというと、

  • コネクション確立後、クライアントからリクエストが送られてくるまでの待機時間
  • DBや外部APIの処理をしている時間

といったread(2)やwrite(2)で実際にデータを読み書きする準備が整うまでの待ち時間が大半です。
そんな待ち時間に処理をブロックしないのがNon-Blocking I/Oです。

Non-Blocking I/O検証

Rubyコードで検証してみます。

blockingなコード

require 'socket'

Socket.tcp_server_loop(4481) do |connection|
  while data = connection.read(4096) do  # ブロックする
    puts data
  end
  connection.close
end

read()ブロッキングなメソッドであるため、クライアントがコネクション確立後にリクエストを送らない限り、ずっと処理をブロックしてしまいます。
なので貧弱なネットワークの場合などでデータ転送が非常に遅かったりすると、ただただ待ち時間のみ増えてリソースを無駄遣いすることになります。

non blockingなコード(ビジーループ)

require 'socket'

Socket.tcp_server_loop(4481) do |connection|
  loop do
    begin
      puts connection.read_nonblock(4096)  # ブロックしない
    rescue Errno::EAGAIN  # リクエストデータが来るまでずっとEAGAIN
      puts "EAGAIN"
      retry
    rescue EOFError
      break
    end
  end
  connection.close
end

先ほどの箇所をread_nonblock()にすると、ブロックされることはなくなります。
内部ではfdをノンブロッキングモードにしてread(2)システムコールしています。
そうするとブロックはしなくなりますが、代わりにソケットの準備ができていないと(=クライアントからデータを受信しないと)、エラーEAGAINが返ります。
なので上記コードでは常にそれをハンドリングし、リクエストデータを受信するまでretryしています。

メリット

先ほどと違ってブロックされないのでその間の時間を自由に扱える。

デメリット

リクエストデータが来るまで毎回read(2)システムコールをしてチェックする(=カーネルへのコンテキストスイッチが走る)ので無駄が大きい。

non blockingなコード(監視)

require 'socket'

Socket.tcp_server_loop(4481) do |connection|
  loop do
    begin
      puts connection.read_nonblock(4096)  # ブロックしない
    rescue Errno::EAGAIN
      puts "EAGAIN"
      IO.select([connection])  # selectでソケットを監視。ブロックする
      puts "socket is ready"
      retry
    rescue EOFError
      break
    end
  end
  connection.close
end

IO.select()はソケットの状態がreadableになるまでブロックします。引数にソケットの配列を渡すと、返り値にはreadableなソケットの配列が返ります。
上記コードでは先ほどのビジーループのコードと比較しやすいよう、IO.select()は単にブロックするのに使ってます。

メリット

毎回read(2)システムコールせずに済む。

I/O Multiplexing

Non-Blocking I/Oを利用して複数コネクションを扱うと考えてみると、シングルプロセス・シングルスレッドであるにも関わらず複数のコネクションを並行してハンドリングできることが分かります。

先程は1コネクションでの話でしたが、複数コネクションの場合のコードは以下のようになります。

connections = [<TCPSocket>, <TCPSocket>, <TCPSocket>]

loop do
  ready = IO.select(connections)  # 複数のI/Oを監視。準備完了したソケットのみ返す
  readable_connections = ready[0]
  readable_connections.each do |conn|
    data = conn.readpartial(4096)
    process(data)
  end
end

複数のコネクションを監視し、準備が整ったコネクション(1つの場合も複数の場合もある)を返します。

もちろんソケット準備が整った後の処理(=コールバック)は1つずつ処理されるので並行処理ではありません。しかしコネクション(=I/O)の扱い自体は並行であることが分かります。

これがI/O Multiplexing(I/O多重化)です。

その他

readはいつブロックされる?

readメソッドは

  • Rubyの内部バッファにデータが有る
  • OSカーネルのバッファにデータが有る

状態を除いてブロックされます。

writeはいつブロックされる?

基本的にwriteがブロックされることはあまりありません。しかし

  • TCP接続の受信側がデータの受信をまだ確認しておらず、許可されている限りのデータを送信した
  • TCP接続の受信側がデータの受信を確認したが、受信中のデータを処理していない

これらのケースの場合はTCP輻輳制御によりブロックされます。

IO.selectでブロックするなら新しいコネクションはどこから得る?

実際のイベントループ型のサーバはどうしているか、です。

あらかじめ↓を参考にソケット通信の流れを理解していると分かりやすいです。

christina04.hatenablog.com

先の例ではacceptした後のソケットをselect(2)でデータの到着を監視してました。 イベントループモデルはlistenするソケットもselect(2)で監視して、コネクションが来たかどうかチェックします。

def initialize(port = 21)
  @control_socket = TCPServer.new(port)
  trap(:INT) { exit }
end

def run
  @handles = {}
  loop do
    to_read = @handles.values.select(&:monitor_for_reading?).map(&:client)
    to_write = @handles.values.select(&:monitor_for_writing?).map(&:client)

    # IO.select only returns readable/writable sockets.
    # If readables include @control_socket, it means that there's a new client connection.
    readables, writables = IO.select(to_read + [@control_socket], to_write)

    readables.each do |socket|
      if socket == @control_socket
        io = @control_socket.accept
        connection = Connection.new(io)
        @handles[io.fileno] = connection
      else
        # In this case, 'readable' is regular client connection.
        connection = @handles[socket.fileno]
        begin
          data = socket.read_nonblock(CHUNK_SIZE)
          connection.on_data(data)
        # In this case, 'readable' is not readable.
        rescue Errno::EAGAIN
        # In this case, the client disconnected.
        rescue EOFError
          @handles.delete(socket.fileno)
        end
      end
    end

    writables.each do |socket|
      connection = @handles[socket.fileno]
      connection.on_writable
    end
  end
end

このようにlistenするソケットも監視対象に入れています。これにより

  • 新しいコネクションが来る
  • リクエストデータが来る

のどちらかを満たせばIO.select()のブロックを抜けることができます。
新しいコネクションをハンドリングしつつ、ソケットの準備ができ次第コールバックを発火します。

まとめ

メモリ共有、パイプ、Unixドメインソケットによるプロセス間通信に比べ、ネットワークを経由する処理は非常に遅いです。
そしてそれによる待ち時間がリソースの無駄遣いを生みます。

I/O多重化の考えはマルチプロセスやマルチスレッドとは関係なく、Non-Blocking I/Oを利用して並行してI/Oタスクをカーネル側に委ね、準備が整い次第ハンドリングするという手法です。

ソース