pythonで非同期subprocess通信

 前回はasyncioを使用して、websocketsによるウェブソケットサーバーのクライアントからの受信イベントと、サブプロセスからの受信イベントを共に非同期で取り扱う実例を紹介しました。今回は、その中でコードだけ紹介したasyncioを使った非同期サブプロセスの実装についての備忘禄です。

subprocessの生成

サブプロセスとの通信方法は色々あるのですが、DEN将棋EX同様双方向パイプで実現しました。

(1)subprocessオブジェクトの生成

 前回紹介した全体コードのうち下記がsubprocessコルーチンオブジェクト生成部分です。初期化用コルーチン「Init()」の中で、生成する子プロセス数分のループ中に記述されています。codeは、子プロセス用実行プログラムのフルパスです。子プロセス用実行プログラムもpythonで記述しましたので、sys.executable(pythonインタプリタのフルパス)とcodeを要素としたタプルを定義してcreate_subprocess_execの第一引数としています。第2,3,4引数では、stdout,stdin,stderr各々に「asyncio.subprocess.PIPE」を指定しています。ここのstdoutは子プロセスの標準出力、stdinは子プロセスの標準入力です。つまり子プロセスに出力する時はstdinにデータを渡し、子プロセスから受信する時はstdoutから取り出します。生成したオブジェクトはグローバル配列変数に代入しています。最後の行では、子プロセスに初期化データを送信するために、send2childコルーチンをコールしています。

cmd = (sys.executable, code)
proc = await asyncio.create_subprocess_exec(
   *cmd, stdin=asyncio.subprocess.PIPE ,stdout=asyncio.subprocess.PIPE,     
stderr=asyncio.subprocess.PIPE) Children.append(proc)#子プロセスをMaxChidrenNo 個生成
await send2child(proc,'Seat:' + str(num) + '\n') #子プロセスに自分のプロセス番号を通知する

(2)子プロセスへの送信部分

send2childコルーチンは下記です。ここではwriteメソッドの後に、drainメソッドを使用しています。このメソッドはコルーチンを返しますのでawaitします。予想通り、送信バッファが空になるまで、ノンブロッキングにここで待つことができます。これは大きなデータを送信する場合に特に有効です。

async def send2child(proc,da):
    proc.stdin.write(da.encode())
    await proc.stdin.drain()

(3)子プロセスからの受信イベントハンドラ

 子プロセスからの受信イベントハンドラは下記です。このコルーチンは、子プロセスが終了するまでの無限ループです。

 proc.stdout.at_eof()とproc.stderr.at_eof()がともに真になる事で子プロセスが終了したと判定しています。終了を捉えるとループを脱出して、念のためawait proc.communicate()で残っている受信バッファデータを吐き出させます。

 async for line in proc.stdout:async for line in proc.stderr:と記述すると、それぞれ子プロセスの標準出力バッファ、エラー出力バッファから1行づつ読み出す事ができます。もちろん非ブロッキング受信なので、受信バッファにデータが無い時でもそこでブロック停止してしまうことは有りません。つまり「無限ループ」といってもイベントループに制御が戻り、他のタスクが処理されます。途中エラー出力データを受信した場合も無限ループを脱出するようにしました。

 紹介はしませんがread_actionコルーチンは、子プロセス番号(seat)と受信データを受け取って、受信データ毎の処理を受け持ちます。この中では、必要に応じて前述のsend2childコルーチンを呼び出して子プロセスにデータを送信します。

# procは子プロセスオブジェクト  seatは子プロセスNo
async def rec_from_child(proc,seat):
#受信専用ループ,子プロセス終了(=EOF)までの無限ループ while True: if proc.stdout.at_eof() and proc.stderr.at_eof(): break async for line in proc.stdout: #これだと非ブロッキング受信となる if line: await read_action(seat,str(line.decode()).replace('\n', '')) async for erdata in proc.stderr: if erdata: print('[sdterr] ' + str(erdata.decode()), end='',flush=True) break print('子プロセスからの受信エラー発生!強制終了') await proc.communicate() print(' proc return code = ' + str(proc.returncode))