pythonによる非同期プログラミング

 前回は、はじめて作ったpythonによるコードを基に、websocketsモジュールを使ったウェブソケットサーバーの実装までを紹介しました。今回は、ウェブソケットクライアント(ブラウザ)からの受信データイベントハンドラと、子プロセスからの受信データイベントハンドラを同時に実装した実例を紹介します。使用したのはasyncioモジュールです。

asynioの使い方

 asyncioモジュールはpipで簡単にインストールできます。わからない場合は適当にググってください。

 asyncioはいわゆる非同期プログラミングを行うことで、リソース由来のブロッキングや時間がかかる処理による無駄な待ち時間を減らすためのモジュールです。perlのイベントドリブンに特化したAnyEventとコルーチンを定義できるCoroという2つのモジュールをドッキングしたようなモジュールです。AnyEvent配下に色々リソース毎のノンブロッキングIOを提供するモジュールが有るように、pythonのasyncioにもその配下にリソース毎の同様なモジュール/メソッド群が有ります。

 注意したいのは非同期プログラミングとは、マルチプロセス・マルチスレッドとは異なるという事です。マルチプロセス・マルチスレッドでは、再開するタスクはOSのスケジューラが決定しますが、非同期プログラミングでは、再開するタスクはアプリケーションが決定します。

 asyncioでは、通常の関数定義の前に「asyc」キーワードを付加するとその関数は、コルーチンオブジェクトになります。直接この関数を実行しようとしても関数リファレンス値が帰って来るだけです。このオブジェクトを実行するには、イベントループにタスクとして登録する必要が有ります。登録したら、イベントループを実行すれば登録したタスクの順番に関数が実行されます。

下方に、今回作ったプログラムの抜粋を掲載します。実現方法は他にも色々あるはずなのであくまで一例です。main部分の説明は以下の通りです。
(****はポート番号)

  1. asyncio.get_event_loop()
    イベントループを生成
  2. loop.run_until_complete(Init())
    Init()というコルーチンを実行し、完了したらイベントループ終了
    Init()は全子プロセス(コルーチンオブジェクト)を生成し全子プロセス初期化データを送信する
  3. futures = set()
    集合変数を生成
  4. for k in range(0, MaxChidrenNo):
        futures.add(rec_from_child(Children[k],k))
         全子プロセス用の受信イベントハンドラ(コルーチン)を集合変数に追加
  5. start_server = websockets.serve(message_received, "localhost", ****)
    ウェブソケットサーバー(コルーチン)を生成
  6. futures.add(start_server)
    ウェブソケットサーバー(コルーチン)をを集合変数に追加
  7. loop.run_until_complete(asyncio.wait(futures))
    コルーチン集合をタスクとしてイベントループに渡し、全タスクが完了するまでイベントループを回し続ける

 紛らわしくてすいませんが、ここでfuturesという集合は、コルーチンの集合です。futureオブジェクトではありません。asyncio.waitにコルーチンの集合を渡すと要素各々がタスクに変換されます。タスクはfutureクラスのサブクラスだそうです。これをイベントループに渡しています。run_until_completeは、全タスクが完了すると終了してしまうので、終了してはいけないウェブソケットイベントハンドラと子プロセスからの受信イベントハンドラは各々関数内で無限ループとなっています。

 ウェブソケットイベントハンドラmessage_receivedは、websockets.serveに渡しているコールバックなわけですが、クライアントからの接続が確立されるたびに実行され、受信データ待ちの無限ループに入ります。ここではゾンビソケットが増殖することを防ぐために、例外を捉えてこれをウェブソケット切断と判断しています。これはtryの中のrecvを呼んだ時にクライアントが存在しないと例外が発生するからです。切断を検知すると後処理を行いこのイベントハンドラ(タスク)は終了します。別のクライアントが接続されるとまた、新たなイベントハンドラ(タスク)が実行開始するわけです。

 子プロセスからの受信イベントハンドラも無限ループとなっており、設定した子プロセス数のイベントハンドラ(タスク)は終了しませんので、結局run_until_complete()は終了しないということになります。

 各コルーチン内では、awaitキーワードを使って、実際の仕事が完了するまでのブロックが発生せずイベントループに戻るようになっています。タスクが完了するとまたその場所から次の処理が行われます。

<ウェブソケットイベントハンドラ内で受信データを取得する部分>

message = await websocket.recv()

 上記コードでブラウザからのデータを受信しています。send_imagefile2wstcli というコルーチンの中では、ブラウザにイメージファイルを送信しています。websocketsは、sendメソッドにバイナリデータを渡せば自動でウェブソケットのヘッダー部を「opcode=0x2(バイナリデータ)」にして送信してくれます。ファイルを読み込んでバイナリデータに変換する部分は、PILライブラリのImageモジュールを使用しています。このコルーチンの中で下記がブラウザにバイナリデータを送信する部分です。引数にテキストデータを指定すればテキストデータとして送信してくれます。

<Init()コルーチン内でブラウザにイメージファイルを送信する部分>

await wst.send(img.getvalue())

 子プロセスとの通信のために、asyncio.create_subprocess_execを使用して双方向パイプを生成しています。通常の双方向パイプだと送受信でブロックが発生してしまうのですが、このメソッドを使えばノンブロッキングな受信イベントハンドラを書くことができます。asycioの双方向パイプによるサブプロセス通信については次回(rec_from_childの説明など)記事にします。

 最後に1つ。「await (croutine)」を多用していますが、これは前述の通りコルーチンが実行を返すのを待ち、実行が終わるまでイベントループに制御を戻すということを行います。よって、recvとかsendとかというメソッドはコルーチンを返すので、awaitで実行させるわけなのですが、awaitの行を含む関数は必ずコルーチンでなければなりません。つまりasync defで定義する必要があります。defだけだとエラーになります。そうするとそのコルーチンを実行させるのにもまたawaitを使う必要があるので、結局awaitとasync defが大量に発生する事になるのです。これを防ぐ試みもあるようですが、python初心者には敷居が高そうです。

<コード全体(抜粋ですが。。)>

import logging , sys ,re ,io, datetime, os, time, urllib.parse
import websockets, asyncio
from PIL import Image
・・・・・・・・・・・・・・
Children = []#子プロセスオブジェクト配列。インデックスは子プロセスNo
・・・・・・・・・・・・・・
#--初期化
async def Init():
    for num in range(MaxChidrenNo):
        SeatUsing.append(0)
        code = ChildCode
        cmd = (sys.executable, code)
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdin=asyncio.subprocess.PIPE ,stdout=asyncio.subprocess.PIPE,                                                                        
stderr=asyncio.subprocess.PIPE) Children.append(proc)#子プロセスをMaxChidrenNo 個生成 await send2child(proc,'Seat:' + str(num) + '\n')
#最初に子プロセスに自分の席番を通知する #--子プロセスからの受信イベントハンドラ # procは子プロセスオブジェクト seatは子プロセスNo async def rec_from_child(proc,seat): #受信専用ループ,子プロセス終了(=EOF)までの無限ループ while True: if proc.stdout.at_eof() and proc.stderr.at_eof(): break async for line in proc.stdout: #これだと非ブロッキング受信となる if line: await read_action(seat,str(line.decode()).replace('\n', '')) async for erdata in proc.stderr: if erdata: print('[sdterr] ' + str(erdata.decode()), end='',flush=True) break print('子プロセスからの受信エラー発生!強制終了') await proc.communicate() print(' proc return code = ' + str(proc.returncode)) #--子プロセスにデータを送信 async def send2child(proc,da): proc.stdin.write(da.encode()) await proc.stdin.drain() #--イメージファイルをウェブソケットクライアントへ送信 async def send_imagefile2wstcli(wst,fname): im = Image.open(Appdir + '/' + fname) img = io.BytesIO() #空のインスタンスを作る im.save(img,"png") #空のインスタンスに保存する await wst.send(img.getvalue()) #--ウェブソケットクライアントからの初期化コマンド処理 async def init_act(websocket,mobj): StatusByWSID[str(websocket.id)] = 'wait_tran' #トップイメージをブラウザへ await send_imagefile2wstcli(websocket,TopImgFile) #トップイメージ送信後の子プロセスへの指示 await send2child(Children[SeatByWSID[str(websocket.id)]],'command2child:1\n') #--ウェブソケットクライアントからの受信イベントハンドラ async def message_received(websocket, path): #・・・・新しいクライアントの認証処理、子プロセスNoとの紐付け、・・・・ #・・・・グローバル変数へのセット・・・・ while True: try: TempWSK = websocket message = await websocket.recv() sda_ar = message.split('=') sda = sda_ar[1] urllib.parse.unquote(sda)#URIアンエスケープ pat_ar = ['^init:','^A','^B','^C'] act_func = {'^init:':init_act,'^A':A_act,'^B':B_Act,'^C':C_Act} if StatusByWSID[str(websocket.id)] == 'wait_reset': mobj = re.match(pat_ar[0],sda)#初期化指示 if mobj:await act_func[pat_ar[0]](websocket,mobj) #・・・・その他の状態の時の処理・・・・ except: #websocketオブジェクトは消滅しているため、 #ここに飛ぶ前にtryでストアしたテンポラリオブジェクトを使って
#ステータス配列から削除 StatusByWSID.pop(str(TempWSK.id)) #・・・・その他グローバル変数からの削除・・・・ break #-- Main loop = asyncio.get_event_loop() #まずInit()では全子プロセスを生成し各プロセスに初期化データ(席番)通知実施 loop.run_until_complete(Init()) futures = set() for k in range(0, MaxChidrenNo): futures.add(rec_from_child(Children[k],k)) start_server = websockets.serve(message_received, "localhost", ****) futures.add(start_server) loop.run_until_complete(asyncio.wait(futures))