目次
Comet with Tomcat 6
TomcatでCometなサーブレットを作る
Tomcat 6では、NIO APIを使用した新しいHTTPコネクタが追加されました。これは非ブロッキングIOをサポートし、複数のリクエストを同時並行して受け付けることができます。Tomcat 6はこの新しい機能を応用してCometをサポートしています。新しく生まれ変わったHTTPコネクタのテストも兼ねて、Cometサーブレットを作成して遊んでみよう。 catalina.jarをクラスパスに追加
Comet関連のAPIはJ2EE標準のAPIではなくTomcatプロジェクトの独自実装です。このため、Comet関連クラスは servlet.jarではなく、Tomcat自身(catalina.jar)に含まれています。Cometなサーブレットをコンパイルするには catalina.jarにクラスパスを通す必要があります。開発環境がEclipse 3.2+Tomcatプラグインならば、
- プロジェクトを右クリック、Properties → Build Path → Configure Build Path を開く
- Add Valiableを選択
- TOMCAT_HOMEを選択
- Extendをクリックし、libの中にあるcatalina.jarを追加
当然、Tomcat 6用のCometアプリケーションは、Tomcat 6以外のサーブレットコンテナで実行する事はできません。Jetty 6も独自にCometをサポートしていますが両者に互換性はありません。
Advanced I/Oを有効にする
デフォルトでは従来のHTTPコネクタを使用するため、Cometは無効になっています。非同期IOを実現するNIO HTTPコネクタを有効にするため、server.xmlのConnector要素を書き換えます。
<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
protocol属性を以下のように修正します。
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="60000" redirectPort="8443" />
サーブレットに接続できない場合
NIOのHTTPコネクタを使用した場合、Javaのバージョンによっては、サーブレットに接続できないことがあります。 java.io.Socket#setTrafficClassメソッドでSocketExceptionがスローされている場合は、以下のJVM起動オプションを設定して回避します。
-Djava.net.preferIPv4Stack=true
サーバサイドのコード
サーバサイドは、通常のJavaプログラミングと変わりません。
CometProcessor
TomcatでCometを処理するサーブレットはCometProcessorインターフェースを実装します。そしてdoGetやdoPost等のメソッドはオーバーライドせずに、CometProcessorで定義されたeventメソッドを実装し、Comet処理を記述します。
CometEvent
eventメソッドは、接続開始時、読み込み準備完了、接続終了時、エラー発生時に呼ばれます。イベントタイプを取得し、処理を振り分けます。
メッセージが追加されると、接続しているすべてのクライアントに対して、そのメッセージを送ります。
import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.catalina.CometEvent; import org.apache.catalina.CometProcessor; /** Cometサーブレット */ public class CometTestServlet extends HttpServlet implements CometProcessor { private static final long serialVersionUID = 1L; protected transient Set<HttpServletResponse> responseSet = new HashSet<HttpServletResponse>(); protected transient MessageSender sender; public void init() { this.sender = new MessageSender("CometResponse"); this.sender.setDaemon(true); this.sender.start(); } public void destroy() { sender.quit(); sender = null; responseSet.clear(); } <em>public void event(CometEvent cometEvent) throws IOException, ServletException</em> { switch (cometEvent.getEventType()) { case BEGIN: log("CometBegin") cometBegin(cometEvent); break; case READ: log("CometRead"); cometRead(cometEvent); break; case END: log("CometEnd"); cometClose(cometEvent); break; case ERROR: log("CometError" + cometEvent.getEventSubType().toString()); cometClose(cometEvent); break; } } private void cometClose(CometEvent cometEvent) throws IOException { synchronized (sender) { responseSet.remove(cometEvent.getHttpServletResponse()); } cometEvent.close(); } private void cometRead(CometEvent cometEvent) throws IOException { HttpServletRequest request = cometEvent.getHttpServletRequest(); HttpServletResponse response = cometEvent.getHttpServletResponse(); if (!"get".equalsIgnoreCase(request.getMethod())) { cometClose(cometEvent); } } private void cometBegin(CometEvent cometEvent) throws IOException { HttpServletRequest request = cometEvent.getHttpServletRequest(); request.setCharacterEncoding("UTF-8"); HttpServletResponse response = cometEvent.getHttpServletResponse(); response.setCharacterEncoding("UTF-8"); if ("get".equalsIgnoreCase(request.getMethod())) { synchronized (sender) { responseSet.add(response); } } } /** 疑似PUSHを実現するロジック */ public class MessageSender extends Thread { protected List<String> messages = new ArrayList<String>(); protected boolean running = true; public MessageSender(String name) { super(name); } public void quit() { running = false; this.interrupt(); } /** メッセージを追加する */ public void send (String message) { synchronized (this) { messages.add(message); notifyAll(); // メッセージが追加されたら処理スレッドを再開 } } public void run() { while (running) { if (messages.size() == 0) { try { synchronized (this) { wait(); // メッセージが追加されるまで待つ } } catch (InterruptedException ignore) { log("interrupted", ignore); } synchronized (this) { for (HttpServletResponse response: responseSet) { response.setContentType("text/plain"); response.setCharacterEncoding("UTF-8"); try { PrintWriter writer = response.getWriter(); for (String message: messages) { writer.write(message); writer.write('\n'); } writer.flush(); writer.close(); response.flushBuffer(); } catch (IOException e) { log("Error on writing message", e); } } messages.clear(); responseSet.clear(); } } } } }
これはコンセプトコードであるため、例外が発生しないように最低限のsynchronizedしていますが、データ整合性を全く気にしていないので注意してください。
クライアントサイド
ブラウザ側は、XMLHttpRequestを使ってサーバとの接続を維持し続けます。(かなり適当な処理なので真似しないように)
// 要 prototype.js v1.5.1 Event.observe(window, "load", wait); function wait() { new Ajax.Request( $("ajaxUpdaterUrl").href + "?time=" + (new Date()).getTime(), // IEのキャッシュ対策 { method: "get", onSuccess: function(transport){ appendMessage(transport); wait(); } onFailure: function(transport){ wait(); } } ); } function appendMessage(transport) { // 追加メッセージに何らかの処理を行う }