ユーザ用ツール

サイト用ツール


java:tomcat:comet

Comet with Tomcat 6

TomcatでCometなサーブレットを作る

Tomcat 6では、NIO APIを使用した新しいHTTPコネクタが追加されました。これは非ブロッキングIOをサポートし、複数のリクエストを同時並行して受け付けることができます。Tomcat 6はこの新しい機能を応用してCometをサポートしています。新しく生まれ変わったHTTPコネクタのテストも兼ねて、Cometサーブレットを作成して遊んでみよう。 catalina.jarをクラスパスに追加

Comet関連のAPIはJ2EE標準のAPIではなくTomcatプロジェクトの独自実装です。このため、Comet関連クラスは servlet.jarではなく、Tomcat自身(catalina.jar)に含まれています。Cometなサーブレットをコンパイルするには catalina.jarにクラスパスを通す必要があります。開発環境がEclipse 3.2+Tomcatプラグインならば、

  1. プロジェクトを右クリック、Properties → Build Path → Configure Build Path を開く
  2. Add Valiableを選択
  3. TOMCAT_HOMEを選択
  4. Extendをクリックし、libの中にあるcatalina.jarを追加

当然、Tomcat 6用のCometアプリケーションは、Tomcat 6以外のサーブレットコンテナで実行する事はできません。Jetty 6も独自にCometをサポートしていますが両者に互換性はありません。

Advanced I/Oを有効にする

デフォルトでは従来のHTTPコネクタを使用するため、Cometは無効になっています。非同期IOを実現するNIO HTTPコネクタを有効にするため、server.xmlのConnector要素を書き換えます。

<Connector port="8080" protocol="HTTP/1.1" 
               connectionTimeout="20000" 
               redirectPort="8443" />

protocol属性を以下のように修正します。

<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" 
               connectionTimeout="60000" 
               redirectPort="8443" />

サーブレットに接続できない場合

NIOのHTTPコネクタを使用した場合、Javaのバージョンによっては、サーブレットに接続できないことがあります。 java.io.Socket#setTrafficClassメソッドでSocketExceptionがスローされている場合は、以下のJVM起動オプションを設定して回避します。

-Djava.net.preferIPv4Stack=true

サーバサイドのコード

サーバサイドは、通常のJavaプログラミングと変わりません。

CometProcessor

TomcatでCometを処理するサーブレットはCometProcessorインターフェースを実装します。そしてdoGetやdoPost等のメソッドはオーバーライドせずに、CometProcessorで定義されたeventメソッドを実装し、Comet処理を記述します。

CometEvent

eventメソッドは、接続開始時、読み込み準備完了、接続終了時、エラー発生時に呼ばれます。イベントタイプを取得し、処理を振り分けます。

メッセージが追加されると、接続しているすべてのクライアントに対して、そのメッセージを送ります。

import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
 
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
 
import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;
 
/** Cometサーブレット */
public class CometTestServlet extends HttpServlet implements CometProcessor {
    private static final long serialVersionUID = 1L;
    protected transient Set&lt;HttpServletResponse&gt; responseSet = new HashSet&lt;HttpServletResponse&gt;();
    protected transient MessageSender sender;
 
    public void init() {
        this.sender = new MessageSender("CometResponse");
        this.sender.setDaemon(true);
        this.sender.start();
    }
 
    public void destroy() {
        sender.quit();
        sender = null;
        responseSet.clear();
    }
 
    <em>public void event(CometEvent cometEvent) throws IOException, ServletException</em> {
        switch (cometEvent.getEventType()) {
        case BEGIN:
           log("CometBegin")
           cometBegin(cometEvent);
           break;
 
        case READ:
            log("CometRead");
            cometRead(cometEvent);
            break;
 
        case END:
            log("CometEnd");
            cometClose(cometEvent);
            break;
 
        case ERROR:
            log("CometError" + cometEvent.getEventSubType().toString());
            cometClose(cometEvent);
            break;
        }
    }
 
    private void cometClose(CometEvent cometEvent) throws IOException {
        synchronized (sender) {
            responseSet.remove(cometEvent.getHttpServletResponse());
        }
        cometEvent.close();
    }
 
    private void cometRead(CometEvent cometEvent) throws IOException {
        HttpServletRequest request = cometEvent.getHttpServletRequest();
        HttpServletResponse response = cometEvent.getHttpServletResponse();
 
        if (!"get".equalsIgnoreCase(request.getMethod())) {
            cometClose(cometEvent);
        }
    }
 
    private void cometBegin(CometEvent cometEvent) throws IOException {
        HttpServletRequest request = cometEvent.getHttpServletRequest();
 
        request.setCharacterEncoding("UTF-8");
        HttpServletResponse response = cometEvent.getHttpServletResponse();
        response.setCharacterEncoding("UTF-8");
 
        if ("get".equalsIgnoreCase(request.getMethod())) {
            synchronized (sender) {
                responseSet.add(response);
            }
        }
    }
 
    /** 疑似PUSHを実現するロジック */
    public class MessageSender extends Thread {
        protected List&lt;String&gt; messages = new ArrayList&lt;String&gt;();
        protected boolean running = true;
 
        public MessageSender(String name) {
            super(name);
        }
 
        public void quit() {
            running = false;
            this.interrupt();
        }
 
        /** メッセージを追加する */
        public void send (String message) {
            synchronized (this) {
                messages.add(message);
                notifyAll();    // メッセージが追加されたら処理スレッドを再開
            }
        }
 
        public void run() {
            while (running) {
                if (messages.size() == 0) {
                    try {
                        synchronized (this) {
                            wait();   // メッセージが追加されるまで待つ
                        }
                    } catch (InterruptedException ignore) {
                        log("interrupted", ignore);
                    }
 
                synchronized (this) {
                    for (HttpServletResponse response: responseSet) {
                        response.setContentType("text/plain");
                        response.setCharacterEncoding("UTF-8");
                        try {
                            PrintWriter writer = response.getWriter();
                            for (String message: messages) {
                                writer.write(message);
                                writer.write(&#39;\n&#39;);
                            }
                            writer.flush();
                            writer.close();
                            response.flushBuffer();
                        } catch (IOException e) {
                            log("Error on writing message", e);
                        }
                    }
                    messages.clear();
                    responseSet.clear();
                }
            }
        }
    }
}

これはコンセプトコードであるため、例外が発生しないように最低限のsynchronizedしていますが、データ整合性を全く気にしていないので注意してください。

クライアントサイド

ブラウザ側は、XMLHttpRequestを使ってサーバとの接続を維持し続けます。(かなり適当な処理なので真似しないように)

// 要 prototype.js v1.5.1
Event.observe(window, "load", wait);
 
function wait() {
    new Ajax.Request(
        $("ajaxUpdaterUrl").href + "?time=" + (new Date()).getTime(),  // IEのキャッシュ対策
        {
            method: "get",
            onSuccess: function(transport){ appendMessage(transport); wait(); }
            onFailure: function(transport){ wait(); }
        }
    );
}
 
function appendMessage(transport) {
   // 追加メッセージに何らかの処理を行う
}
java/tomcat/comet.txt · 最終更新: 2007/12/13 01:23 by 127.0.0.1