Python Monthly Topics

Python 3.11の新機能:asyncio.TaskGroupを使った予測可能でより安全な非同期処理

こんにちは、福田@JunyaFffです。第4回目である10月の「Python Monthly Topics」は、2022年10月24日にリリース予定[1]のPython 3.11に追加されるasyncioの新機能asyncio.TaskGroupを紹介します。

Python 3.11の新機能

高速化が話題のPython 3.11ですが、今回取り上げるのは非同期I/Oで並行処理を実現する標準ライブラリasyncioの新機能asyncio.TaskGroupです。asyncio.TaskGroupは複数のタスクを並行処理する高レベルAPIになります。同様の既存機能asyncio.gather()asyncio.wait()と大きく違う2つの特徴があります。

  • 同時に発生した例外の捕捉
  • 例外発生時のタスクのキャンセル

この特徴によって、発生した例外を漏らさずに記録でき、制御しやすい並行処理の実装がより簡単にできるようになりました。私個人としては、かゆいところに手が届くイチオシの新機能です。[2]

asyncioを利用していてすでに例外やタスクのキャンセルを意識されている方も、あまりasyncioを使っておらず例外やキャンセルを意識されていない方も、この記事を通してPython 3.11のasyncio.TaskGroupを試してみよう!と思っていただけると幸いです。

なおこの記事は、Python 3.10.6、Python 3.11.0rc2で動作確認をしています。

asyncio.TaskGroup の基本の書き方

公式ドキュメントにあるサンプルコードを見てみましょう。asyncio.TaskGroupはコンテキストマネージャーを利用し、その中で複数のタスクを同時に実行します。サンプルコードではsome_coro()another_coro()という非同期関数(中身の実装はありません)を同時に実行しています。コンテキストマネージャーを抜けるタイミングでタスクが完了しています。

コード asyncio.TaskGroupのサンプルコード
import asyncio

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(some_coro(...))
        task2 = tg.create_task(another_coro(...))
    print("Both tasks have completed now.")

引用元コルーチンとTask — Python 3.11 documentation

しかし、複数のタスクを同時に処理するのであれば、既存機能のasyncio.gather()asyncio.wait()でも同様の書き方ができます。

コード asyncio.TaskGroup のサンプルコードをasyncio.gather()で書き換え
import asyncio

async def main():
    await asyncio.gather(
      some_coro(...),
      another_coro(...)
    )
    print("Both tasks have completed now.")

違いは例外の扱いにあります。まずは、⁠同時に発生した例外の捕捉」について詳しく見てみましょう。

同時に発生した例外の捕捉

ここでは同時に発生した例外処理をPython 3.10のasyncio.gather()と、Python 3.11のasyncio.TaskGroupで、それぞれ比較してみましょう。

Python 3.10以前のasyncioの例外処理

Python 3.10以前の例外処理について見てみましょう。いくつか方法がありますが、ここではasyncio.gather()の利用例を紹介します。asyncio.gather()では最初の例外を送出するか、すべてのタスクを実行した後に結果をリストで取得するしかありませんでした。

  • asyncio.gather()での例外処理
    • return_exceptions指定なし:最初の例外を送出
    • return_exceptions=True指定あり:すべてのタスクの完了を待ち、例外を含む結果をリストで取得

return_exceptions指定なしの場合、try-exceptで囲い捕捉します。return_exceptions=Trueの場合、戻り値のリストをチェックする必要があります。

実際にコードにして動きを見てみましょう。同時に実行するための簡易な非同期関数を3つ用意します。うち2つは、例外を送出します。

まずはreturn_exceptionsを指定せずにasyncio.gather()で動かしてみます。

コード asyncio.gather()で最初の例外を送出
import asyncio

async def coro_success():
    return "成功"

async def coro_value_err():
    raise ValueError

async def coro_type_err():
    raise TypeError

async def main():
    """return_exceptionsなし"""
    try:
        results = await asyncio.gather(
            coro_success(), coro_value_err(), coro_type_err()
        )
        print(f"{results=}")
    except BaseException as err:
        print(f"{err=}")

asyncio.run(main())

実行すると次のような結果になります。最初の例外のみを捕捉できます。同時に発生した coro_type_err() のTypeErrorは捕捉できません。

$ python3.10 asyncio_gather_except.py
err=ValueError()

次にasyncio.gather()return_exceptions=Trueを指定します。return_exceptions=Trueはすべてのタスクを実行し、例外を含むすべてのタスクの結果をリストで取得します。

async def main():
    """return_exceptions=Trueあり"""
    results = await asyncio.gather(
        coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True)
    print(f"{results=}")

これを実行すると、次のようになります。

$ python3.10 asyncio_gather_except.py
results=['成功', ValueError(), TypeError()]

この場合、例外を処理するには、戻り値のリストをチェックする必要があります。

async def main():
    """return_exceptions=Trueあり"""
    results = await asyncio.gather(
        coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True)
    print(f"{results=}")

    for result in results:
        match result:
            case ValueError():
                print("ValueError")
            case TypeError():
                print("TypeError")

asyncio.gather()ではすべてのタスクを実行しないと、すべての例外を捕捉できないことがわかります。たとえば、10個のHTTPリクエストを同時に行う処理があるとします。最初のタスクに例外が発生した場合、残りの失敗するかもしれない9つが完了するまで待つことになります(結果10個すべてが同じエラーになったとしてもです⁠⁠。

asyncio.gather()での例外処理には、以下の2つの課題があります。

  • asyncio.gather()の例外処理での課題
    • return_exceptions指定なし:同時に発生した例外を1つしか捕捉できない
    • return_exceptions=True指定あり:戻り値のリストをチェックする必要がある。またすべてのタスクを実行しなければならない

これらの課題を解決し読みやすく書けるのがasyncio.TaskGroupです。

asyncio.TaskGroupによる新しい例外処理の書き方

asyncio.TaskGroupは、Python 3.11で追加された新しい構文except*を利用することで、逐次関数の例外処理と同様に書けます。見慣れないexcept*については後述します。

ではasyncio.TaskGroupで同時に発生する例外を捕捉してみましょう。先ほどのasyncio.gather()の例をasyncio.TaskGroupで書き換えてみます。

なお except* で捕捉した場合、 err はExceptionGroupオブジェクトになります。実際に発生した例外は、exceptions属性にタプルで格納されています。

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(coro_success())
            task2 = tg.create_task(coro_value_err())
            task3 = tg.create_task(coro_type_err())
        results = [task1.result(), task2.result(), task3.result()]
    except* ValueError as err:
        print(f"{err.exceptions=}")
    except* TypeError as err:
        print(f"{err.exceptions=}")

実行すると次のような結果になります。複数のタスクから同時に発生する例外を捕捉できました。

$ python3.11 asyncio_taskgroup_except.py
err.exceptions=(ValueError(),)
err.exceptions=(TypeError(),)

同期処理を書くように、直感的に実装できるようになっています。

タスクのキャンセル

さて、asyncio.TaskGroupのもう1つの特徴である、タスクのキャンセルについて見てみましょう。Python 3.10のasyncio.gather()と、Python 3.11のasyncio.TaskGroupをそれぞれ比較してみましょう。

タスクがキャンセルされずに残ってしまうケース

asyncioで複数のタスクを同時に処理する際に例外が送出されると、完了していないタスクが残り意図せず動作してしまっている場合があります。asyncio.gather()を利用し、タスクが残ることを確認してみましょう。

非同期関数のcoro_long()を追加しています。asyncio.sleep()を利用して出力します。

それぞれのタスクの状態を、Taskオブジェクトのインスタンス属性の_stateで確認します。_state「PENDING」⁠FINISHED」⁠CANCELLED」が設定され、作られた時点で「PENDING⁠⁠、終了が「FINISHED⁠⁠、そしてキャンセル完了の「CANCELLED」を示します。

import asyncio

async def coro_success():
    return "成功"

async def coro_value_err():
    raise ValueError

async def coro_long():
    await asyncio.sleep(1)
    print("完了していないタスクが出力しています")
    return "成功?"

async def main():
    try:
        task1 = asyncio.create_task(coro_success())
        task2 = asyncio.create_task(coro_value_err())
        task3 = asyncio.create_task(coro_long(), name="残るコルーチン")  # 分かりやすくするためタスクに名づけ

        results = await asyncio.gather(*[task1, task2, task3])
        print(f"{results=}")
    except ValueError as err:
        print(f"{err=}")

    print(f"タスク1の状態 {task1._state=}")
    print(f"タスク2の状態 {task2._state=}")
    print(f"タスク3の状態 {task3._state=}")
    await asyncio.sleep(1.5)  # 1.5秒待つことでException発生後にcoro_log()が動いていることを確認できる

asyncio.run(main())

実行すると次のような結果になります。追加した非同期関数のcoro_long()PENDINGの状態で残っており、"完了していないタスクが出力しています"が出力されていることから、例外発生後にcoro_long()が動作しているとわかります。

$ python3.10 asyncio_gather_cancel.py
err=ValueError()
タスク1の状態 task1._state='FINISHED'
タスク2の状態 task2._state='FINISHED'
タスク3の状態 task3._state='PENDING'
完了していないタスクが出力しています

このように例外が発生すると、タスクが残ってしまう場合があります。タスクが残ってしまうと、意図せず後続処理の間に動作が完了したり、実装によってはI/Oのコネクションが閉じられないというような可能性が残り、処理の予測が困難です。

Python 3.10以前のタスクのキャンセル

上記のサンプルコードを修正し、タスクのキャンセル処理を追加します。修正のポイントは次の3つです。

  • キャンセルされるタスク(非同期関数)asyncio.CancelledErrorの捕捉と再送出 [3] を追加する
  • 完了していないタスクに対しキャンセルを指示する
  • キャンセルの完了を待つ

まず、キャンセルする非同期関数のcoro_long()asyncio.CancelledErrorの捕捉とキャンセル時の処理を追加します。[4] 次にasyncio.gather()を実行しているmain()にタスクの完了を判定するTask.done()と、タスクのキャンセルを指示するTask.cancel()を追加します。最後にキャンセル処理の完了を待つ必要があるため、ここではasyncio.sleep()を利用し完了を待ちます。


async def coro_long():
    try:
        await asyncio.sleep(1)
        print("完了していないタスクが出力しています")
        return "成功?"
    except asyncio.CancelledError as err:  # キャンセル処理を追加
        print("キャンセルされたタスクが出力しています")
        raise asyncio.CancelledError  # 再送出する

async def main():
    try:
        task1 = asyncio.create_task(coro_success())
        task2 = asyncio.create_task(coro_value_err())
        task3 = asyncio.create_task(coro_long(), name="残るコルーチン")  # 分かりやすくするためタスクに名づけ

        results = await asyncio.gather(*[task1, task2, task3])
        print(f"{results=}")
    except ValueError as err:
        print(f"{err=}")
    print(f"タスク1の状態 {task1._state=}")
    print(f"タスク2の状態 {task2._state=}")
    print(f"タスク3の状態 {task3._state=}")  # この時点ではキャンセルされていない
    for task in [task1, task2, task3]:
        if task.done() is False:
            task.cancel()  # 未完了のタスクをキャンセル
    print(f"タスク3の状態 {task3._state=}")  # この時点ではキャンセル依頼されただけでキャンセルされていない
    await asyncio.sleep(1)  # キャンセル処理完了を待つ必要がある
    print(f"タスク3の状態 {task3._state=}")  # CancelledError内の処理が完了後、キャンセルになる

実行すると次のような結果になります。"完了していないタスクが出力しています"が出力されず、代わりに"キャンセルされたタスクが出力しています"が出力されます。

また、タスクの状態もtask.cancel()実行直後は「PENDING」ですが、asyncio.sleep()を利用しキャンセル処理の完了を待つと「CANCELLED」になり、キャンセルされていることがわかります。

$ python3.10 asyncio_gather_cancel.py
err=ValueError()
タスク1の状態 task1._state='FINISHED'
タスク2の状態 task2._state='FINISHED'
タスク3の状態 task3._state='PENDING'
タスク3の状態 task3._state='PENDING'
キャンセルされたタスクが出力しています
タスク3の状態 task3._state='CANCELLED'

なお、サンプルコードではキャンセル処理の完了待ちに asyncio.sleep() を利用していますが、本来であれば未完了のタスクに対し asyncio.gather() などを再度実行しなければなりません。


    for task in [task1, task2, task3]:
        if task.done() is False:
            task.cancel()  # 未完了のタスクをキャンセル

    # await asyncio.sleep(1)  # キャンセル処理完了を待つ必要がある
    # asyncio.sleep(1)の代わりに、キャンセル完了を待つため asyncio.gather() を実行する
    results = await asyncio.gather(*[task3], return_exceptions=True)

例外発生時にタスクが残ってしまうことを避けるため、キャンセル処理は必要です。サンプルコードと出力からわかるように、asyncio.gather()では終了していないタスクをキャンセルする処理、そしてキャンセル完了を待つ処理の実装が必要です。

asyncio.TaskGroupによるタスクのキャンセル

asyncio.TaskGroupでは、コンテキストマネージャーを抜けるタイミングで未完了のタスクをキャンセルします。例外が発生した場合に開発者は、タスクがキャンセルされたか、またキャンセル処理が完了しているか、を意識する必要がなくなります。

非常にシンプルで読みやすい実装になります。

async def main():
    try:
        async with asyncio.TaskGroup() as g:
            task1 = g.create_task(coro_success())
            task2 = g.create_task(coro_value_err())
            task3 = g.create_task(coro_long(), name="残るコルーチン")
        results = [task1.result(), task2.result(), task3.result()]
        print(f"{results=}")
    except* ValueError as err:
        print(f"{err.exceptions=}")

    print(f"完了していないタスク {task1._state=}")
    print(f"完了していないタスク {task2._state=}")
    print(f"完了していないタスク {task3._state=}")

実行すると、キャンセル処理が実行され、またタスクも「CANCELLED」になっていることがわかります。

$ python3.11 asyncio_taskgroup_cancel.py
キャンセルされたタスクが出力しています
err.exceptions=(ValueError(),)
タスク1の状態 task1._state='FINISHED'
タスク2の状態 task2._state='FINISHED'
タスク3の状態 task3._state='CANCELLED'

まとめ

asyncioはI/Oバウンドな処理を効率的に実行できるため、データベースや外部APIの実行によく使われます。ただ、外部リソースに依存するため、エラーも発生しやすいです。

今回の asyncio.TaskGroup を利用すると、例外処理や例外発生時のキャンセルを非常に読みやすく書けるようになっています。 asyncio.TaskGroup を利用して「予測可能で、より安全」に非同期処理を実装しましょう。

最後に私事ですが、asyncio.TaskGroupとasyncioについてもう少し掘り下げた内容をPyCon JP 2022で紹介しますPython 3.11新機能asyncio.TaskGroup()と2022年asyncioの"Hello-ish world"というトークです⁠⁠。気になる方はぜひ現地でお会いしましょう!

おすすめ記事

記事・ニュース一覧