Server Sent Events (SSE) を受信する

Recipe ID: net-014

サーバーからプッシュ通知のようなリアルタイム更新を受け取るための Server-Sent Events (SSE) の実装方法を解説します。

Web 標準の EventSource API は手軽ですが、以下の重大な制約があります。
1. CORS の影響を受ける: サーバー側で Access-Control-Allow-Origin が必要。
2. ヘッダーのカスタマイズができない: Authorization: Bearer ... などの認証ヘッダーを標準仕様では送信できない。

Tauri の HTTP プラグインを使用することで、これらの制約を回避して SSE を受信する方法も併せて紹介します。

前提条件

プラグインのインストール

HTTP プラグインを使用する場合は、以下のコマンドでインストールしてください。

npx @tauri-apps/cli add http

Permissions (権限) の設定

Rust 側や HTTP プラグインを使用する場合は、接続先を許可リストに追加してください。
標準の EventSource (JavaScript) を使う場合は、Tauri の permissions 設定は不要です(ブラウザの通常のセキュリティモデルに従います)。

{
  "permissions": [
    ...,
    {
      "identifier": "http:default",
      "allow": [{ "url": "https://api.example.com/*" }]
    }
  ]
}

1. 標準 API (EventSource) を使用する場合

CORS 設定が適切で、認証が Cookie ベース(または不要)な場合は、ブラウザ標準の API が最も簡単です。

// 接続を開始
function startSseListener() {
  const evtSource = new EventSource('https://api.example.com/events');

  evtSource.onmessage = (event: MessageEvent) => {
    console.log('New message:', event.data);
  };

  evtSource.onerror = (err: Event) => {
    console.error('SSE Error:', err);
  };

  return evtSource;
}

// 使用例
const source = startSseListener();

// 終了用
// source.close();

2. HTTP プラグインで受信する場合 (推奨)

認証ヘッダーが必要だったり、CORS を回避したい場合は、@tauri-apps/plugin-httpfetch を使い、レスポンスボディをストリーム (ReadableStream) として読み込みます。

この方法なら 認証ヘッダー (Authorization) も自由に設定でき、CORS も回避 できます。

実装ユーティリティ

SSE のストリーム形式 (data: ...\n\n) をパースするための簡易クラスを作成します。
(本格的なアプリでは eventsource-parser などのライブラリ利用を推奨)

import { fetch } from '@tauri-apps/plugin-http';

async function connectToSse(url: string, token: string, signal?: AbortSignal): Promise<void> {
  try {
    const response = await fetch(url, {
      method: 'GET',
      headers: {
        'Authorization': `Bearer ${token}`, // カスタムヘッダーが可能
        'Accept': 'text/event-stream',
      },
      signal, // fetch に signal を渡すことで切断可能にする
    });

    if (!response.ok || !response.body) {
      throw new Error(`Failed to connect: ${response.status}`);
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    // ループでストリームを読み続ける
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value, { stream: true });
      parseSseChunk(chunk);
    }
  } catch (err) {
    if (signal?.aborted) {
      console.log('SSE connection aborted by user');
    } else {
      console.error('SSE Error:', err);
      throw err;
    }
  }
}

// 簡易的なパースロジック (行ごとの分割)
function parseSseChunk(chunk: string) {
  const lines = chunk.split('\n');
  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = line.slice(6).trim();
      if (data === '[DONE]') {
        // 終了シグナル受信時など
      } else {
        try {
          const json = JSON.parse(data);
          console.log('Received Event:', json);
        } catch (e) {
          console.log('Received Text:', data);
        }
      }
    }
  }
}

// --- 使用例: 切断機能付き ---
const controller = new AbortController();

// 接続開始
connectToSse('https://api.example.com/sse', 'my-secret-token', controller.signal);

// 任意のタイミングで切断する場合
// controller.abort();

3. Rust 側で受信する場合

非常に高いパフォーマンスが求められたり、バックグラウンドでの永続的な接続が必要な場合は、Rust 側で reqwesteventsource-stream などのクレートを使って受信し、フロントエンドにイベント (emit) する構成も検討してください。

Cargo.toml:

[dependencies]
reqwest = { version = "0.11", features = ["stream"] }
futures = "0.3"

Rust (src-tauri/src/lib.rs):

use tauri::{AppHandle, Emitter, State};
use futures::StreamExt;
use std::sync::Mutex;

// 接続ハンドルを保持するためのステート
struct SseState(Mutex<Option<tauri::async_runtime::JoinHandle<()>>>);

#[tauri::command]
async fn start_sse_listener(app: AppHandle, state: State<'_, SseState>, url: String) -> Result<(), String> {
    // 既存の接続があれば切断
    if let Some(handle) = state.0.lock().unwrap().take() {
        handle.abort();
    }

    let client = reqwest::Client::new();
    let mut stream = client.get(&url)
        .send()
        .await
        .map_err(|e| e.to_string())?
        .bytes_stream();

    let handle = tauri::async_runtime::spawn(async move {
        while let Some(item) = stream.next().await {
            if let Ok(bytes) = item {
                if let Ok(text) = String::from_utf8(bytes.to_vec()) {
                    // 単純化のためそのまま送信
                    let _ = app.emit("sse-event", text);
                }
            }
        }
    });

    // ハンドルを保存
    *state.0.lock().unwrap() = Some(handle);
    
    Ok(())
}

#[tauri::command]
async fn stop_sse_listener(state: State<'_, SseState>) -> Result<(), String> {
    if let Some(handle) = state.0.lock().unwrap().take() {
        handle.abort();
    }
    Ok(())
}

#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub fn run() {
    tauri::Builder::default()
        .manage(SseState(Mutex::new(None)))
        .invoke_handler(tauri::generate_handler![start_sse_listener, stop_sse_listener])
        .run(tauri::generate_context!())
        .expect("error while running tauri application");
}
注意: You must call .manage() before using this command というエラーが出る場合は、run 関数内の tauri::Builder チェーンで .manage(SseState(Mutex::new(None))) が正しく呼び出されているか確認してください。State を使用するコマンドには、事前の初期化と登録が必須です。

Frontend (TypeScript):

import { listen } from '@tauri-apps/api/event';
import { invoke } from '@tauri-apps/api/core';

async function startSse() {
  await invoke('start_sse_listener', { url: 'https://api.example.com/events' });
}

async function stopSse() {
  await invoke('stop_sse_listener');
}

// 開始
startSse();

// 受信
listen<string>('sse-event', (event) => {
  console.log('Received from Rust:', event.payload);
});

// 停止したい時
// stopSse();

まとめ

方法CORS 回避カスタムヘッダー実装難易度推奨ケース
標準 EventSource一般公開されている単純な API
Plugin Fetch (Stream)認証が必要な API / CORS 回避が必要な場合
Rust Backendバックグラウンド処理や高度な制御が必要な場合