当サイトは、アフィリエイト広告を利用しています
Pythonの並行処理ライブラリであるasyncioを使って、非同期処理を使った
並行処理をする方法とそれに関わる用語等をまとめておく
まず同期処理と非同期処理についてまとめる
同期処理はある処理を実行している値、その処理が完了するまでのは
次の処理を開始しない逐次処理の方式
処理の途中に待ち時間(DBアクセスやIO処理)がある場合は
その処理が終わるまで待つので場合によって時間がかかる。
PythonのフレームワークのFlaskや外部サーバーのGunicornは
同期処理の仕組みを使っている
非同期処理は、処理の完了を待たずに次の処理を開始する方式。
非同期処理は上記のような待ち時間をを他の処理の実行に割り当てることができるので、
複数の処理を同時に実行しているようになる
無駄な待ち時間を減らすことができる。
そのためI/Oバウンド(待ち時間が多いタスク)とは非常に相性がいい
PythonのフレームワークのFaskApiや外部サーバーのUvicornは
非同期処理の仕組みを使っている
よく似た言葉で並列処置と並行処理があるのでこれも違いをまとめておく
処理AとBを同時並列で処理すること。
処理AとBを切り替えながら処理すること ちなみにasyncioは並行処理をするライブラリ
asyncioは、シングルスレッドで動作する並行処理のためのライブラリ。
細かく言うとasyncioは
で成り立っている。
asyncioを理解するための4つの大きな概念として
がある。
コルーチンは、プログラムが一時停止と再開をできる関数のこと。
普通の関数は呼び出されるとすぐに終了するまで実行されるが
コルーチンは「途中で一時停止して、他の作業をして、後で再開する」という動作をすることができる
コルーチンは関数に「async」をつけることで定義できる
# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)
asyncとawaitは、Pythonで非同期処理を記述するためのキーワード。
asyncはコルーチンで書いた通り、関数につけることでその関数をコルーチンにできる
「await」をつけることでコルーチンを一時停止・再開させられるようになる
(他のタスクに制御を渡すことができるようになる)
コルーチンの中でI/Oバウンド(待ち時間が多いタスク)な処理に入った場合
現在のコルーチンは一時停止され、一時停止している間に他のタスクに制御を渡すことで、
非同期処理を行えるようになる。
I/Oバウンド(待ち時間が多いタスク)な処理が完了すると元の処理が再開される
またコルーチンをコルーチンの中で呼び出す場合はawaitをつける必要がある。
import asyncio# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)# コルーチンasync def main():# 非同期関数は await で呼び出す必要があるawait say_hello(name="ichiro")await say_hello(name="jiro")await say_hello(name="saburo")if __name__ == "__main__":# asyncio.run で main 関数を実行asyncio.run(main())
タスクとは、非同期処理において実行される単位の処理こと。
Pythonの非同期ではコルーチンをタスクとして扱う。
コルーチンはタスクにすることで非同期を使って並行実行ができるようになる。
上記の「コルーチン実行(並行実行でない)」を並行実行に書き換えると下記のようになる
import asyncio# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)# 並行実行コルーチンasync def main():# 各 say_hello をタスクとして並行実行task1 = asyncio.create_task(say_hello("ichiro"))task2 = asyncio.create_task(say_hello("jiro"))task3 = asyncio.create_task(say_hello("saburo"))print("start")# すべてのタスクが完了するまで待機await task1await task2await task3print("end")if __name__ == "__main__":# asyncio.run で main 関数を実行asyncio.run(main())
実行すると
startHello, ichiroHello, jiroHello, saburoend
のようになる。
3つのタスクが並行実行されるので
となる
仮にawaitをつけないとタスクが完了する前にプログラム自体が終了してしまうので下記のようになる
startend
イベントループはI/Oの待ち時間やタスクの実行を効率的に切り替えることで、
プログラムを止めずに動かし続ける仕組みのこと。
Pythonではasyncioを使ってイベントループを動作させる
タスクをスケジュールして実行するのがイベントループの役割。
イベントループは常に次に実行するべきタスクを管理しており、
各タスクが完了するタイミングを追いながら、効率的にタスクの切り替えを行う
イベントループのフローとしては
のようになる。
例えば
import asyncio# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)# 並行実行コルーチンasync def main():# 各 say_hello をタスクとして並行実行task1 = asyncio.create_task(say_hello("ichiro"))task2 = asyncio.create_task(say_hello("jiro"))task3 = asyncio.create_task(say_hello("saburo"))print("start")# すべてのタスクが完了するまで待機await task1await task2await task3print("end")if __name__ == "__main__":# asyncio.run で main 関数を実行asyncio.run(main())
をイベントループの観点から説明すると
# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)
非同期で並行実行されるsay_helloコルーチンを定義
# 並行実行コルーチンasync def main():# 各 say_hello をタスクとして並行実行task1 = asyncio.create_task(say_hello("ichiro"))task2 = asyncio.create_task(say_hello("jiro"))task3 = asyncio.create_task(say_hello("saburo"))print("start")# すべてのタスクが完了するまで待機await task1await task2await task3print("end")
並行実行を行うmainコルーチンを定義
# 並行実行コルーチンasync def main():# 各 say_hello をタスクとして並行実行task1 = asyncio.create_task(say_hello("ichiro"))task2 = asyncio.create_task(say_hello("jiro"))task3 = asyncio.create_task(say_hello("saburo"))
say_helloコルーチンを3つのタスクとしてイベントループに登録する
イベントループはコルーチン(この場合はmainコルーチン)内で登録された
タスクを自動で認識して、並行処理を管理してくれる。
つまりtask1~3はイベントループが並行実行してくれる。
タスクの中でI/Oバウンド(待ち時間が多いタスク)な処理が発生した場合
通常の同期処理ではプログラム全体が待ち状態になるが
イベントループはその待機中に別のタスクを実行する。
say_helloコルーチン内では「非同期で1秒待つ」があるので
その時に他のタスクが並行で実行される
イベントループが管理しているtask1~3がすべて完了すると
待機中の処理を再度実行するので「end」が出力される
非同期処理と並行実行の違いについてまとめおく。
asyncioを使って並行処理を実装する場合、非同期関数(コルーチン)を作成し
それをタスク登録して実行する必要がある。
つまり非同期関数をそのまま実行するだけでは並行処理はされないので注意が必要。
例えば
import asyncio# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)# コルーチンasync def main():# 非同期関数は await で呼び出す必要があるawait say_hello(name="ichiro")await say_hello(name="jiro")await say_hello(name="saburo")if __name__ == "__main__":# asyncio.run で main 関数を実行asyncio.run(main())
のようなコードの場合、「非同期処理」は実行されているが「並行処理」は実行されていない。
詳しく解説すると
# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)
ではawait asyncio.sleep(1)の部分で非同期処理が行われ、
I/O待ち時間中は他のタスクに制御を渡すことが可能になる。
だが、このコードでは並行実行がされていないため、制御を移せないので並行処理にはならない。
そのため
# コルーチンasync def main():# 非同期関数は await で呼び出す必要があるawait say_hello(name="ichiro")await say_hello(name="jiro")await say_hello(name="saburo")
では、awaitが1つずつ呼び出され、1つの処理が完了するまで次の処理は開始されない。
つまり、say_helloの呼び出しは直列的(逐次的)になっている。
まとめると、非同期関数(コルーチン)を作るだけでは
非同期処理は実行するが、並行処理は実行しないので非同期のメリット(待機中に他の処理を行うこと)
はできない。
asyncで作ったコルーチンはタスク登録して並行実行させることで
非同期の恩恵を受けることができる
コルーチンの実行方法としては
をよく使う
import asyncio# コルーチン定義async def say_hello():print("Hello, world!")asyncio.run(say_hello()) # コルーチンを実行
asyncio.run()は、エントリポイントのコルーチンを一度だけ実行するために使う。
通常、スクリプトやアプリケーション全体のメイン部分で使うことが多い。
コルーチンは他のコルーチンの中で呼ぶ場合はawaitを使って呼び出す
import asyncioasync def say_hello():print("Hello, world!")async def main():await say_hello() # 別のコルーチンを実行asyncio.run(main()) # メインコルーチンを実行
イベントループの説明でも書いたが
import asyncio# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)# コルーチンasync def main():# 各 say_hello をタスクとして並行実行task1 = asyncio.create_task(say_hello("ichiro"))task2 = asyncio.create_task(say_hello("jiro"))task3 = asyncio.create_task(say_hello("saburo"))print("start")# すべてのタスクが完了するまで待機await task1await task2await task3print("end")if __name__ == "__main__":# asyncio.run で main 関数を実行asyncio.run(main())
asyncio.create_task() を使うことで、コルーチンを並行して(並行タスクとして)実行できる
これにより、複数のタスクを同時に実行できる
asyncio.create_task()では個別でタスクを管理するのに対して
asyncio.gather()はまとめてタスクを管理できる
import asyncio# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)# コルーチンasync def main():print("start")# say_hello コルーチンを並行実行await asyncio.gather(say_hello("ichiro"),say_hello("jiro"),say_hello("saburo"))print("end")if __name__ == "__main__":# asyncio.run で main 関数を実行asyncio.run(main())
asyncio.gather()にコルーチンを渡すことでタスクとして並行実行できる
タスクを並行実行して、すべて完了すれば元の処理を再開する点では
asyncio.create_task()と同じで出力結果も同じになる。
タスクが少数ならいいが大量にある場合などはリストで渡して実行できる
import asyncio# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つreturn f"Hello, {name}"# メインコルーチンasync def main():names = ["ichiro", "jiro", "saburo"]tasks = [say_hello(name) for name in names] # コルーチンのリストを作成results = await asyncio.gather(*tasks) # リストをアンパックして渡すprint(results) # 出力: ['Hello, ichiro', 'Hello, jiro', 'Hello, saburo']if __name__ == "__main__":asyncio.run(main())
コルーチンで出力結果を返却し、出力するようにした場合
import asyncio# コルーチンasync def say_hello(name: str):await asyncio.sleep(1) # 非同期で1秒待つhello = f"Hello, {name}"print(hello)# 出力結果を返却return hello# コルーチンasync def main():print("start")# say_hello コルーチンを並行実行results = await asyncio.gather(say_hello("ichiro"),say_hello("jiro"),say_hello("saburo"))print(results)print("end")if __name__ == "__main__":# asyncio.run で main 関数を実行asyncio.run(main())
asyncio.gather()はすべてのタスクが完了すると結果を
リストで返してくれる
実行結果は下記になる
startHello, ichiroHello, jiroHello, saburo['Hello, ichiro', 'Hello, jiro', 'Hello, saburo']end
asyncioの非同期と並行実行についてまとめてみた。
非同期処理にするだけでは特に意味はなく、タスク化して並行実行することで
非同期の恩恵を受けれれる点は注意する必要があると感じた。