DEN将棋EX_ウェブソケットとAnyEvent

DEN将棋EXについては、色々覚書として残しておこうと思っています。とりあえずウェブソケットとAnyEventから。

shogi-serverをwebkoza.comで稼働させ、100%PerlでDEN将棋とDEN将棋Xを作りましたが、頻繁な定期XMLhttpRequestにより手データをブラウザに反映させるため、パフォーマンスが今一で、相手に手が伝わるのに数秒かかることがあるという課題がありました。またサーバー負荷を考慮して席数も最大20と少ないものでした。それでも個人的に遊ぶためには問題無いのですが、かねてからやってみたかったPerlによるウェブソケットAnyEventを使用して改造することにしました。結構苦労したのですが、完成してみると満足のいくレベルに達したと思っています。

最大席数は100にしました。これだけあれば当面問題無いと思っています。ウェブソケットにしたことにより、パフォーマンスは大幅に改善され、モバイル電波経由のスマートフォン同士の対局でも、電波状況にもよりますが、同時対局者数が数人であれば手が相手に伝わる時間は、だいたい1秒程度です。

ここでは、主に、[DEN将棋EX]のゲートウェイdaemon用PSGIで採用したPlack::App::WebSocketAnyEventをどのように使用したかを記載しておきます。

1.概要

ウェブソケットって何?な人は、下記のリンクを参考にしてください。

https://qiita.com/h_tyokinuhata/items/809bdb66f8da6dd53bb6
https://ja.m.wikipedia.org/wiki/WebSocket
https://www.keicode.com/script/html5-websocket-1.php

ウェブソケットにすることにより最初のhttpリクエストが成功した後、対局中は80番ポートを経由したTCPSocket接続が維持されます。これにより必要な時だけデータが通信路に流れることになります。具体的には、手を指した時はブラウザからゲートウェイdaemon経由でshogi-serverにデータが送信され、shogi-serverが手データを配信した時はゲートウェイdaemon経由で手データがサーバーからブラウザに届きます。これにより無駄なデータ交換が無くなり、大幅にパフォーマンスが改善するわけです。

さらに、席毎にForkしたワーカープロセス(直接shogi-serverと通信する)経由のshogi-serverからのデータを、AnyEvent::Handleのイベントハンドラーで受信するようにしました。

こうして、ゲートウェイdaemonとしては、Plack::App::WebSocketによる上位(ブラウザ)からと、下位(ワーカープロセス)からの2種類のデータ受信イベントによる、完全なイベントドリブン型のプログラミングが可能になりました。

なお、ゲートウェイdaemon用PSGIの起動は、AnyEventを使用する場合Plackは未対応という事なので、はじめてTwiggyを使用しました。これも宮川さんが開発したツールです。

Twiggy - AnyEvent HTTP server for PSGI (like Thin)

2.連関図

下図は以前も掲載したソフトウェア連関図ですがhttpの部分がWeb Socketに変更になっています。

DENEX_renkan.PNG

DEN将棋EXは、連関図のPSGI Programs(ゲートウェイdaemon)TCP Client(ワーカープロセス)および、ブラウザサイドプログラム(JavaScript群)とhtml、CSSファイルから構成されています。

3.イベントドリブンプログラミング

(1)ウェブソケットアプリケーション本体部

他にも使えそうなモジュールは有るのですが、作者が丁寧な記事を残してくれているPlack::App::WebSocketを使用する事にしました。開発者の下記記事を参考に実装しました。

https://debug-ito.hatenablog.com/entry/20131110/1384067233
https://debug-ito.hatenablog.com/entry/20131110/1384067449
https://qiita.com/debug-ito/items/e347dd1e08891f8dd612

DEN将棋EXのゲートウェイdaemonとしては、最初のリクエストでログイン処理&席番決定まで行い、後はウェブソケット通信によりワーカープロセス(経由shogi-server)とブラウザが継続的にやり取りします。下記のコードは、PSGIアプリケーション本体(かなり省略して記載しています)です。

Plack::App::WebSocketオブジェクトを生成(new)して、このオブジェクトのcallメソッド(PSGI環境変数が引数)を呼ぶ事が仕事です。このnewメソッドの引数には、必要なコールバックを設置できる仕組みが用意されており、on_error 、on_establish をキーとしたコールバックを設置しています。前者はウェブソケット接続時のエラーハンドラーで後者はウェブソケット確立後の処理です。

ウェブソケット確立後の処理としては、最初に、引数で受け取れる接続オブジェクト($conn)毎に、DEN将棋EXのログインオブジェクト$GateWay{"$conn"}を生成し、このオブジェクトのto_appメソッドをコールしてログインと席番確定を行っています。次に接続オブジェクト($conn)のonメソッドをコールします。このonメソッドの引数には、コールバックとして、ウェブソケットでメッセージを受信した時のイベントハンドラと、ウェブソケット切断イベントハンドラの2つを、各々message 、finishをキーとして記述することができます。

受信イベントハンドラの内容としては、ウェブソケットで受信したデータを対応するワーカープロセスに送信する処理がメインです。送信サーブルーチンのsend_daには、$GateWay{"$conn"}のto_appでリファレンス引数として生成された席番に対応するワーカープロセスのIOハンドラリファレンスと、メッセージそのものを渡しています。

切断イベントハンドラは、異常切断と通常切断の処理を分けて記述しました。内部的にはどちらのイベントハンドラも接続オブジェクト($conn)毎に生成されることに注意してください。

my $ShogiLoginEX = sub {
# 将棋サーバーへのログインアプリケーション
#  対局待ち開始/観戦セッション開始
 my $env0 = shift;
 Plack::App::WebSocket->new(
  on_error => sub {
   my $env = shift;
   return [500,
   ["Content-Type" => "text/plain"],
   ["Error: " . $env->{"plack.app.websocket.error"}]];
  },
  on_establish => sub {
   my $conn = shift;   ## Plack::App::WebSocket::Connection object
   my $env = shift;    ## PSGI env
   my $hs_res = shift; ## extra results from the handshake callback
   $Websockets{"$conn"} = $conn;#確立したソケットを保存
   my $request = Plack::Request->new($env);
   ### マッチメークシーケンス開始
   $GateWay{"$conn"} = ShogiEX::LoginEX->new($request);# PSGI Requestを渡してnew
   $Status{"$conn"} = 'start';
   $GateWay{"$conn"}->to_app($conn,.....);
   $conn->on(
     message => sub {
      my ($conn, $msg) = @_;
      my($sda);
      #データをウェブソケット受信
      $sda = [split(/=/,$msg)]->[1];# sda=*** の右辺を取り出す
      &send_da($OUTREF[$SeatBySocket{"$conn"}],$sda); # データを子プロセスへ
     },
     finish => sub {
      if(....){
#異常終了時の処理
&ng_close_websoket($conn);
}else{
#正常終了時の処理
&close_websoket($conn);
}
},
);
}
)->call($env0);
};

(2)ワーカプロセスからのデータ受信イベントハンドラ

標準入力を監視するためにAnyEvent::Handleという便利なモジュールが有ります。これを使えば、あらかじめdaemon起動後の最初に生成したワーカープロセス(子プロセス)からの入力データを受信するイベントハンドラを記述する事ができます。下記は多少省略していますが、イベントハンドラを含むイベントループサブルーチンです。

このサブルーチンは、daemon起動後の初期化処理で全ワーカープロセスを生成したときの、受信IOハンドラリフェレンスとワーカープロセス番号(=席番)を受け取って、各ワーカープロセス毎の受信イベントハンドラを定義しています。

AnyEvent->timerに渡すコールバック関数($read_cin)の中で、AnyEvent::Handleオブジェクト$fhのon_readメソッドを呼びます。このメソッドの引数で渡すコールバックが、イベントハンドラ実体です。この中で、受信バッファ全体を一気に受け取るために、$fh->{rbuf}を直接参照しているのが味噌です。

イベントハンドラの中でコールしているread_actionは、ワーカープロセスから受信したデータ毎に行うDEN将棋EX独自処理を記述しているサブルーチンです。

AnyEventについての詳細は下記あたりを参照してください。
https://gihyo.jp/dev/serial/01/perl-hackers-hub/000203?page=2

下記は、AnyEvent::Handleドキュメントの日本語訳で、大変参考になりました!
https://abcb2.hatenadiary.org/entry/20101228/1293491312

sub evloop{
my($iref,$i)=@_;
my($cv,$fh,$w);
 $cv = AnyEvent->condvar;
 $fh = AnyEvent::Handle->new(
  fh => $iref,
  # エラー,もしくはEOF時に$cvに終了通知を送る
  on_eof => sub { $cv->end },
  on_error => sub { $cv->end },
 );
 # 最後の待ちを有効にするために1回beginを呼ぶ
 $cv->begin();
 my $read_cin ; $read_cin = sub {
  $fh -> on_read ( sub {#読み出し
   my($fh) = @_;
   my $rbuf = $fh->{rbuf};
   # 処理している間は
   # $cvの条件を満たさないように
   # フラグを立てておく
   $cv->begin();
   my @recdata = split(/\n/,$rbuf);
   $fh->{rbuf} = '';
   for(@recdata){
    &read_action($i,$_);
   }
   $cv->end();#終わったらフラグを落とす
   $w = AnyEvent->timer(after => 0, cb => $read_cin);
  } );
 };
 # もう1回読むために$read_cinをイベントキューにいれる
 $w = AnyEvent->timer(after => 0, cb => $read_cin);
}