当サイトは、アフィリエイト広告を利用しています

【Python】asyncioで非同期処理を利用して並行処理を実行する方法

作成日:2024月10月04日
更新日:2024年10月04日

Pythonの並行処理ライブラリであるasyncioを使って、非同期処理を使った
並行処理をする方法とそれに関わる用語等をまとめておく

同期処理と非同期処理について

まず同期処理と非同期処理についてまとめる

同期処理

同期処理はある処理を実行している値、その処理が完了するまでのは
次の処理を開始しない逐次処理の方式

処理の途中に待ち時間(DBアクセスやIO処理)がある場合は
その処理が終わるまで待つので場合によって時間がかかる。

PythonのフレームワークのFlaskや外部サーバーのGunicornは
同期処理の仕組みを使っている

非同期処理

非同期処理は、処理の完了を待たずに次の処理を開始する方式。

非同期処理は上記のような待ち時間をを他の処理の実行に割り当てることができるので、
複数の処理を同時に実行しているようになる
無駄な待ち時間を減らすことができる。

そのためI/Oバウンド(待ち時間が多いタスク)とは非常に相性がいい

PythonのフレームワークのFaskApiや外部サーバーのUvicornは
非同期処理の仕組みを使っている

並列処置と並行処理

よく似た言葉で並列処置と並行処理があるのでこれも違いをまとめておく

並列処理

処理AとBを同時並列で処理すること。

並行処理

処理AとBを切り替えながら処理すること ちなみにasyncioは並行処理をするライブラリ

asyncioとは?

asyncioは、シングルスレッドで動作する並行処理のためのライブラリ。
細かく言うとasyncioは

  • シングルスレッド
  • 非同期処理
  • 並行処理

で成り立っている。

asyncioを理解するための4つの大きな概念として

  • コルーチン
  • async/await
  • イベントループ
  • タスク

がある。

コルーチンとは?

コルーチンは、プログラムが一時停止と再開をできる関数のこと。
普通の関数は呼び出されるとすぐに終了するまで実行されるが
コルーチンは「途中で一時停止して、他の作業をして、後で再開する」という動作をすることができる

コルーチンは関数に「async」をつけることで定義できる

コルーチン
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)
  • 関数にasyncをつけて定義すると非同期処理を実行できるコルーチンとなる

async/awaitとは?

asyncとawaitは、Pythonで非同期処理を記述するためのキーワード。

async

asyncはコルーチンで書いた通り、関数につけることでその関数をコルーチンにできる

await

「await」をつけることでコルーチンを一時停止・再開させられるようになる
(他のタスクに制御を渡すことができるようになる)

コルーチンの中でI/Oバウンド(待ち時間が多いタスク)な処理に入った場合
現在のコルーチンは一時停止され、一時停止している間に他のタスクに制御を渡すことで、
非同期処理を行えるようになる。

I/Oバウンド(待ち時間が多いタスク)な処理が完了すると元の処理が再開される

またコルーチンをコルーチンの中で呼び出す場合はawaitをつける必要がある。

コルーチン実行(並行実行でない)
import asyncio
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)
# コルーチン
async def main():
# 非同期関数は await で呼び出す必要がある
await say_hello(name="ichiro")
await say_hello(name="jiro")
await say_hello(name="saburo")
if __name__ == "__main__":
# asyncio.run で main 関数を実行
asyncio.run(main())
  • コルーチンからコルーチンを呼ぶ場合は、awaitが必須
    ※上記コードは並行実行ではないので注意。詳しくは後述する

タスクとは?

タスクとは、非同期処理において実行される単位の処理こと。
Pythonの非同期ではコルーチンをタスクとして扱う。

コルーチンはタスクにすることで非同期を使って並行実行ができるようになる。
上記の「コルーチン実行(並行実行でない)」を並行実行に書き換えると下記のようになる

コルーチンの並行実行
import asyncio
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)
# 並行実行コルーチン
async def main():
# 各 say_hello をタスクとして並行実行
task1 = asyncio.create_task(say_hello("ichiro"))
task2 = asyncio.create_task(say_hello("jiro"))
task3 = asyncio.create_task(say_hello("saburo"))
print("start")
# すべてのタスクが完了するまで待機
await task1
await task2
await task3
print("end")
if __name__ == "__main__":
# asyncio.run で main 関数を実行
asyncio.run(main())
  • mainコルーチンはasyncio.run()で実行する
  • syncio.create_task()を使って、コルーチンsay_helloをタスクに変換することで並行実行できる
  • taskにawaitをつけることで待ち時間で他のタスクの処理を実行するようになる

実行すると

実行結果
start
Hello, ichiro
Hello, jiro
Hello, saburo
end

のようになる。

3つのタスクが並行実行されるので

  • 逐次実行なら3秒かかるところ、1秒で完了する
  • タスクの完了順は変わる場合がある

となる

仮にawaitをつけないとタスクが完了する前にプログラム自体が終了してしまうので下記のようになる

実行結果
start
end

イベントループとは?

イベントループはI/Oの待ち時間やタスクの実行を効率的に切り替えることで、
プログラムを止めずに動かし続ける仕組みのこと。

Pythonではasyncioを使ってイベントループを動作させる

イベントループの役割

タスクをスケジュールして実行するのがイベントループの役割。

イベントループは常に次に実行するべきタスクを管理しており、
各タスクが完了するタイミングを追いながら、効率的にタスクの切り替えを行う

イベントループの動作

イベントループのフローとしては

  1. コルーチンの作成
  2. コルーチンをタスクに登録
  3. タスクを並行実行
  4. I/O待ちや時間のかかる処理中に他のタスクを実行
  5. タスクの完了後に再度処理を再開

のようになる。

例えば

コルーチンの並行実行
import asyncio
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)
# 並行実行コルーチン
async def main():
# 各 say_hello をタスクとして並行実行
task1 = asyncio.create_task(say_hello("ichiro"))
task2 = asyncio.create_task(say_hello("jiro"))
task3 = asyncio.create_task(say_hello("saburo"))
print("start")
# すべてのタスクが完了するまで待機
await task1
await task2
await task3
print("end")
if __name__ == "__main__":
# asyncio.run で main 関数を実行
asyncio.run(main())

をイベントループの観点から説明すると

コルーチンの作成

コルーチンの作成
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)

非同期で並行実行されるsay_helloコルーチンを定義

並行実行コルーチンの作成
# 並行実行コルーチン
async def main():
# 各 say_hello をタスクとして並行実行
task1 = asyncio.create_task(say_hello("ichiro"))
task2 = asyncio.create_task(say_hello("jiro"))
task3 = asyncio.create_task(say_hello("saburo"))
print("start")
# すべてのタスクが完了するまで待機
await task1
await task2
await task3
print("end")

並行実行を行うmainコルーチンを定義

タスクの登録

タスク登録
# 並行実行コルーチン
async def main():
# 各 say_hello をタスクとして並行実行
task1 = asyncio.create_task(say_hello("ichiro"))
task2 = asyncio.create_task(say_hello("jiro"))
task3 = asyncio.create_task(say_hello("saburo"))

say_helloコルーチンを3つのタスクとしてイベントループに登録する

イベントループはコルーチン(この場合はmainコルーチン)内で登録された
タスクを自動で認識して、並行処理を管理してくれる。

つまりtask1~3はイベントループが並行実行してくれる。

I/O待ちや時間のかかる処理中に他のタスクを実行

タスクの中でI/Oバウンド(待ち時間が多いタスク)な処理が発生した場合
通常の同期処理ではプログラム全体が待ち状態になるが
イベントループはその待機中に別のタスクを実行する。

say_helloコルーチン内では「非同期で1秒待つ」があるので
その時に他のタスクが並行で実行される

タスクの完了後に再度処理を再開

イベントループが管理しているtask1~3がすべて完了すると
待機中の処理を再度実行するので「end」が出力される

非同期処理と並行実行について

非同期処理と並行実行の違いについてまとめおく。

asyncioを使って並行処理を実装する場合、非同期関数(コルーチン)を作成し
それをタスク登録して実行する必要がある。

つまり非同期関数をそのまま実行するだけでは並行処理はされないので注意が必要。

例えば

コルーチン
import asyncio
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)
# コルーチン
async def main():
# 非同期関数は await で呼び出す必要がある
await say_hello(name="ichiro")
await say_hello(name="jiro")
await say_hello(name="saburo")
if __name__ == "__main__":
# asyncio.run で main 関数を実行
asyncio.run(main())

のようなコードの場合、「非同期処理」は実行されているが「並行処理」は実行されていない。

詳しく解説すると

コルーチン
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)

ではawait asyncio.sleep(1)の部分で非同期処理が行われ、
I/O待ち時間中は他のタスクに制御を渡すことが可能になる。
だが、このコードでは並行実行がされていないため、制御を移せないので並行処理にはならない。

そのため

コルーチン
# コルーチン
async def main():
# 非同期関数は await で呼び出す必要がある
await say_hello(name="ichiro")
await say_hello(name="jiro")
await say_hello(name="saburo")

では、awaitが1つずつ呼び出され、1つの処理が完了するまで次の処理は開始されない。
つまり、say_helloの呼び出しは直列的(逐次的)になっている。

まとめると、非同期関数(コルーチン)を作るだけでは
非同期処理は実行するが、並行処理は実行しないので非同期のメリット(待機中に他の処理を行うこと)
はできない。

asyncで作ったコルーチンはタスク登録して並行実行させることで
非同期の恩恵を受けることができる

asyncioのコルーチンの実行方法

コルーチンの実行方法としては

  • asyncio.run()
  • await
  • asyncio.create_task()
  • asyncio.gather()

をよく使う

asyncio.run()

asyncio.run()
import asyncio
# コルーチン定義
async def say_hello():
print("Hello, world!")
asyncio.run(say_hello()) # コルーチンを実行

asyncio.run()は、エントリポイントのコルーチンを一度だけ実行するために使う。
通常、スクリプトやアプリケーション全体のメイン部分で使うことが多い。

await

コルーチンは他のコルーチンの中で呼ぶ場合はawaitを使って呼び出す

await
import asyncio
async def say_hello():
print("Hello, world!")
async def main():
await say_hello() # 別のコルーチンを実行
asyncio.run(main()) # メインコルーチンを実行
  • コルーチンの中でコルーチンを呼ぶ場合はawaitがないと実行できない
  • コルーチンをタスクにせずに呼んでも並行実行はできないので注意

asyncio.create_task()

イベントループの説明でも書いたが

asyncio.create_task()
import asyncio
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)
# コルーチン
async def main():
# 各 say_hello をタスクとして並行実行
task1 = asyncio.create_task(say_hello("ichiro"))
task2 = asyncio.create_task(say_hello("jiro"))
task3 = asyncio.create_task(say_hello("saburo"))
print("start")
# すべてのタスクが完了するまで待機
await task1
await task2
await task3
print("end")
if __name__ == "__main__":
# asyncio.run で main 関数を実行
asyncio.run(main())

asyncio.create_task() を使うことで、コルーチンを並行して(並行タスクとして)実行できる
これにより、複数のタスクを同時に実行できる

asyncio.gather()

asyncio.create_task()では個別でタスクを管理するのに対して
asyncio.gather()はまとめてタスクを管理できる

asyncio.gather()
import asyncio
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)
# コルーチン
async def main():
print("start")
# say_hello コルーチンを並行実行
await asyncio.gather(
say_hello("ichiro"),
say_hello("jiro"),
say_hello("saburo")
)
print("end")
if __name__ == "__main__":
# asyncio.run で main 関数を実行
asyncio.run(main())

asyncio.gather()にコルーチンを渡すことでタスクとして並行実行できる
タスクを並行実行して、すべて完了すれば元の処理を再開する点では
asyncio.create_task()と同じで出力結果も同じになる。

タスクをリストで渡せる

タスクが少数ならいいが大量にある場合などはリストで渡して実行できる

asyncio.gather()
import asyncio
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
return f"Hello, {name}"
# メインコルーチン
async def main():
names = ["ichiro", "jiro", "saburo"]
tasks = [say_hello(name) for name in names] # コルーチンのリストを作成
results = await asyncio.gather(*tasks) # リストをアンパックして渡す
print(results) # 出力: ['Hello, ichiro', 'Hello, jiro', 'Hello, saburo']
if __name__ == "__main__":
asyncio.run(main())

結果をlistで受け取れる

コルーチンで出力結果を返却し、出力するようにした場合

asyncio.gather()
import asyncio
# コルーチン
async def say_hello(name: str):
await asyncio.sleep(1) # 非同期で1秒待つ
hello = f"Hello, {name}"
print(hello)
# 出力結果を返却
return hello
# コルーチン
async def main():
print("start")
# say_hello コルーチンを並行実行
results = await asyncio.gather(
say_hello("ichiro"),
say_hello("jiro"),
say_hello("saburo")
)
print(results)
print("end")
if __name__ == "__main__":
# asyncio.run で main 関数を実行
asyncio.run(main())

asyncio.gather()はすべてのタスクが完了すると結果を
リストで返してくれる

実行結果は下記になる

実行結果
start
Hello, ichiro
Hello, jiro
Hello, saburo
['Hello, ichiro', 'Hello, jiro', 'Hello, saburo']
end

まとめ

asyncioの非同期と並行実行についてまとめてみた。
非同期処理にするだけでは特に意味はなく、タスク化して並行実行することで
非同期の恩恵を受けれれる点は注意する必要があると感じた。

参考

関連記事

新着記事

タグ別一覧
top