Carpe Diem

備忘録

Server-Sent Eventsを使ってみる

概要

ChatGPTでは以下のGIFのように、文字列が少しずつ出てくるUXになっています。

このUXの実現方法として Server-Sent Events というものがあるので、それに必要な知識や使い方を説明します。

環境

  • Go 1.24.3

Server-Sent Events

Server-Sent Eventsとは

Server-Sent Events(SSE)はサーバ→クライアントへの単方向のリアルタイム技術で、次のようなユースケースに適しています。

  • ストリーミングAPI(ChatGPTなど)
    • AIの応答生成をリアルタイムに文字列で流す
  • リアルタイム通知
    • SNSの通知バッジ、メール着信、コメント通知などに軽量に適応可能
  • 進行状況表示
    • バックエンド処理の進捗やCI/CDログストリーム
  • 管理ダッシュボードの更新
    • データのリアルタイムな反映(例:売上、トラフィック、モニタリングなど)

シーケンス図

SSEのシーケンス図は次のようになっています。

必要とするフィールド

SSEでサーバからクライアントに送信されるフィールドは次です。

フィールド名 必須 説明 備考
data: 必須 実際のメッセージ本文 複数行対応。data: を複数回使うと改行付きで連結される
id: 推奨(オプション) イベントID(再接続時の復元用) Last-Event-ID ヘッダーと連携して再送制御に利用される
event: オプション イベント種別名(カスタムイベント名) addEventListener("event名") でハンドリング可
retry: オプション クライアントの再接続間隔(ミリ秒) クライアント側のデフォルト(3000ms)を上書きできる
:(コメント) オプション コメントまたは keep-alive ping クライアントに表示されず、接続維持やデバッグに使える

ポイント

  • id: は省略可能だが、実運用では Last-Event-ID による復旧が必要なので「ほぼ必須」
  • event: がない場合、イベントはすべて message として処理される(es.onmessage)
  • retry: は指定しないとクライアントの既定(3000ms)になる

他のリアルタイム技術との違い

SSEとよく比較されるリアルタイム技術としてWebSocketがあります。
簡単な比較表を以下に書きます。

特徴 SSE WebSocket
通信方向 単方向(サーバ→クライアント) 双方向
プロトコル HTTP(text/event-stream) 独自プロトコル(ws://)
ブラウザ対応 ◎(ネイティブで簡単) ◎(ただしAPIの違いあり)
保守性/簡易性 ◎(軽量) ○(接続・再接続管理が必要)
適した用途 通知、ログ、進捗表示など チャット、ゲーム、双方向制御

SSE は「単方向」「HTTP ベースで手軽」がメリットです。
WebSocket の双方向通信による柔軟性はありませんが、逆にLBやCDNとの相性が良いです。
なぜならSSEは普通の HTTP ストリームなので、LBやCDN 側をほとんど特別扱いしなくて済むのです。

具体的な実装

それでは具体的な実装について説明します。

サーバサイド

SSEのハンドリングをする実装は以下の様な感じになります。

func handleStream(w http.ResponseWriter, r *http.Request) {
    // SSEヘッダーの設定
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    // クライアントが切断したことを検知するためのコンテキスト
    ctx := r.Context()

    // Last-Event-IDヘッダーの取得
    lastEventId := 0
    if idStr := r.Header.Get("Last-Event-ID"); idStr != "" {
        if id, err := strconv.Atoi(idStr); err == nil {
            lastEventId = id
            log.Printf("Client requested to resume from ID: %d", lastEventId)
        }
    }

    // メッセージを送信するためのチャネル
    messageChan := make(chan Message)
    pingTicker := time.NewTicker(15 * time.Second)
    defer pingTicker.Stop()

    // メッセージを生成するゴルーチン
    go func() {
        for i := lastEventId; i < len(chunks); i++ {
            select {
            case <-ctx.Done():
                return
            default:
                messageChan <- getChunk(i)
                time.Sleep(100 * time.Millisecond)
            }
        }
    }()

    // メッセージをクライアントに送信
    for {
        select {
        case <-ctx.Done():
            log.Println("Client disconnected")
            return
        case <-pingTicker.C:
            fmt.Fprintf(w, ": ping\n\n")
            w.(http.Flusher).Flush()
        case message := <-messageChan:
            jsonData, err := json.Marshal(message)
            if err != nil {
                log.Printf("Error marshaling JSON: %v", err)
                continue
            }
            fmt.Fprintf(w, "id: %d\ndata: %s\n\n", message.ID, jsonData)
            w.(http.Flusher).Flush()

            if message.Done {
                return
            }
        }
    }
}

ポイント

  • Content-Typetext/event-streamを使うこと
  • Flusher で都度Flush()すること
  • 切断時に再開できるようid: をつける
  • : ping コメントによる keep-alive
  • r.Context().Done() によるクライアントからの切断検出

クライアントサイド

クライアントサイドの実装は以下のようになります。

function startStream() {
    if (eventSource) eventSource.close();

    messageDiv.innerHTML = '';
    startBtn.disabled = true;
    stopBtn.disabled = false;

    eventSource = new EventSource('http://localhost:8080/stream');

    eventSource.onopen = clearErrorTimeout;

    eventSource.onmessage = function(event) {
        clearErrorTimeout();
        try {
            const data = JSON.parse(event.data);
            messageDiv.textContent = data.content;
            updateStatus(`Last received ID: ${data.id}`);
            if (data.done) {
                stopStream();
            }
        } catch (error) {
            console.error('Error parsing JSON:', error);
        }
    };

    eventSource.onerror = function(error) {
        console.error('Stream Error:', error);
        updateStatus('Connection error. Waiting 10 seconds before disconnect...');
        clearErrorTimeout();
        errorTimeout = setTimeout(() => {
            stopStream();
            updateStatus('Disconnected after error. Click Start to reconnect.');
        }, 10000);
    };
}

function stopStream() {
    if (eventSource) {
        eventSource.close();
        eventSource = null;
    }
    startBtn.disabled = false;
    stopBtn.disabled = true;
    clearErrorTimeout();
}

ポイント

  • EventSource を使う
  • .onmessageで受け取ったデータを逐次処理
  • .onerror による検出と再接続までの待機
    • 再接続のためのタイマー

動作確認

今回の実装を動作確認すると、冒頭のようなUXになります。

途中で切断したら?

途中でネットワークが途切れたような状況では、EventSourceの再接続によってコネクションが復帰したタイミングで途中から再開できます。

ここで一度デプロイのようにサーバを落として接続を切っていますが、再起動後は再接続によって途中から取得できています。

事前に理解しておくべきこと

3つのkeepaliveについて

SSEでは接続を維持するためKeepAliveを理解しておく必要がありますが、前提として 3つの層が存在します。

方法 役割 備考
TCPレベル TCP Keep-Alive OSが生死確認(ACK) OS設定が必要
HTTPレベル Connection: keep-alive TCP再利用の意思表示 HTTP/1.1 ではデフォルト
アプリレベル(SSE) : ping コメント 実際にデータを流し「生きている」ことを示す SSEでは最も重要

結論としてはSSEにおける : ping でほぼ十分です。なぜなら

  • プロキシやロードバランサ(Cloudflare, Nginx, AWS ALB等)が切断しないよう、通信アクティビティを見せる
  • TCPコネクション上でもデータが流れるため、OSのTCPスタックも生きてると判断
    • 実質、TCP Keep-Alive の代替として機能

となるためです。

しかし一部のプロキシやファイアウォールは TCP Keep-Alive(SYN/ACKレベル)でしか監視していないケースもあるので、

  • TCP Keep-Alive
  • : pingコメント

の二つを意識しましょう。

よくある event: タイプとユースケース

eventはカスタムイベントを用意することで、全てをonmessageで処理せず、状況に応じて責務を明確にすることができます。

イベントタイプ名 ユースケース例
message
(デフォルト)
明示的に event: を指定しない場合の標準イベント名。すべて es.onmessage で受け取る
update データ更新通知(例:チャットメッセージ、プロフィール変更)
delete データ削除通知(例:アイテム削除、通知の消去)
alert 重要なアラート、バナー表示、管理者通知など
status サーバステータス・ジョブ進捗・接続状態変更など
notification 汎用的な通知(新着のお知らせ、バッジ更新)
heartbeat 定期的な状態確認(ping代わりにも)
error エラー通知(クライアントに警告などを表示)
custom-xxx 任意のアプリ固有イベント(例:custom-order-shipped

クライアントでの受信例は以下です。

const es = new EventSource("/events");

es.addEventListener("update", (event) => {
  console.log("更新イベント:", event.data);
});

es.addEventListener("alert", (event) => {
  showAlert(event.data);
});

es.onmessage = (event) => {
  // event: が指定されていない場合(=message)
  console.log("通常メッセージ:", event.data);
};

EventSourceがやってくれること、やってくれないこと

Last-Event-IDの保持など、実は開発者が何もしないでもEventSourceがよろしくやってくれるので、そういった要素を下記の表にまとめました。

機能 開発者の実装有無 補足説明
サーバへの接続 不要(自動) new EventSource("/events") だけでOK
イベントの受信 (onmessage) 必要 event.data などで処理
再接続(接続切れ時) 不要(自動) デフォルトで3秒ごとにリトライ
Last-Event-ID の保存/送信 不要(自動) サーバが id: を送れば勝手に覚えて送る
接続エラーの検出 任意(onerror 再接続ロジックのログに便利
複数イベントタイプの処理 任意(addEventListener event: foo を分けて処理可能

その他

サンプルコード

今回のサンプルコードはこちらです。

github.com

まとめ

Server-Sent Eventsの基本的な知識と使い方を紹介してみました。

ただ現状のままだとスケール性に課題があるので、次回はアーキテクチャのデザインパターンを含めて応用例を紹介したいと思います。

参考