Python 3.11.2
Celeryとは、Pythonの堅牢で汎用性の高い非同期タスクをより便利にするライブラリです。
Pythonを使った開発では様々なライブラリに出会うことがあります。その中でもCeleryは、分散メッセージパッシングに基づいて動作する非同期タスクキュー/ジョブキューです。このライブラリの主な用途はノンブロッキングタスクが必要なWebアプリケーションです。
簡単なコードスニペットで説明しましょう。処理に時間のかかる関数があると想像してください。
Celeryを使えば、メインプログラムの実行を妨げずにこの機能をバックグラウンドで実行させることができます。
Celeryは、Pythonの分散タスクキューフレームワークで複数のプロセスやマシンに作業を分散させることができます。つまり、すぐに実行する必要のないタスクをオフロード(処理の一部を外部に渡すことで処理を分散させること)でき、他のリクエストに対応している間にアプリケーションがブロックされないようにできます。
よりよく理解するために先ほどの関数をCeleryタスクに変換してみましょう。
このコードではブローカーでCeleryを初期化し、@app.taskデコレータを使用して関数をCeleryタスクにしました。
Celeryを使い始めるには、Python環境にCeleryをインストールする必要があります。これはPythonのパッケージマネージャであるpipを使用して、 pip install celery
を実行することで簡単に行うことができます。
インストールしたらRabbitMQやRedisなどのメッセージブローカーも必要です。まずはRabbitMQを利用してみましょう。
本記事執筆時にはMacOSを利用しています。その他のOSの場合はこちらのリンクを参考にインストールしてください。
Celeryがインストールされ、メッセージブローカーが設置されたことで、Pythonで分散アプリケーションを書き始めるための環境が整いました。
まず、新しいCeleryプロジェクトのセットアップから始めましょう。まず、 tasks.py
という名前の新しいPythonファイルを作成し、Celeryのアプリケーションとタスクを格納します。Celeryアプリのインスタンスは、Celeryのすべての機能のエントリポイントとして機能するため重要です。
brokerで指定しているURLは、Celeryが通信するメッセージブローカーを指定します。この例ではメッセージブローカーとしてRabbitMQを使用しています。
Celeryのプロジェクトでは、メッセージブローカーがメッセージの送受信の媒体として機能します。RabbitMQは最も一般的なオプションの1つです。RabbitMQをブローカーとしてセットアップするには、それがインストールされ、実行されていることを確認します。前述のように、MacOSでは brew install rabbitmq
を使用してRabbitMQをインストールできます。
インストールが完了したら、CeleryインスタンスでRabbitMQをブローカーとして設定できます。
これにより、RabbitMQが同じマシン(localhost)上で動作していることをCeleryに伝えます。
Celeryタスクの作成は、関数を定義して@app.taskデコレータを追加することで簡単にできます。このデコレータは、関数がタスクであることをCeleryに伝えます。例えば、以下のようになります。
この単純なタスクは2つの数字を足し合わせています。ここで add.delay(3, 4)
を呼び出すと、Celeryは引数3と4で非同期にタスクを実行します。
タスクの実行を開始するには、Celeryのワーカープロセスを起動する必要があります。プロジェクトディレクトリのコマンドラインから、以下のコマンドを実行することでCeleryワーカーを起動できます。
このコマンドはtasksモジュールでCeleryワーカープロセスを起動します(tasksはPythonモジュールの名前と一致するように変更します)。 loglevel=info
オプションは、デバッグのためにINFOレベルでログを記録するようワーカーに指示します。これでワーカーはタスクを受け取り実行する準備ができました。
先に挙げた tasks.py
を実行すると以下のようなログが表示されます。(一番右の 7
が実際に実行された結果です)
Celeryには様々な機能があります。例えば、タスクの優先度をサポートしており、キューにタスクのバックログがある場合にどのタスクを最初に実行するかを指定できます。タスクの優先度を設定するには、apply_async()を呼び出す際に priority
引数を使用します。
さらにCeleryはタスクのリトライもサポートしています。タスクが失敗した場合、指定した期間後に実行を再試行するように設定できます。
タスクの状態は、保留、開始、成功、失敗など、タスクのライフサイクルに関する有用な情報を提供します。タスクの状態はstate属性を使って確認できます。一方、ワーカーを検査することで、ワーカープロセス、登録されたタスク、完了したタスクなどを監視できます。 app.control.inspect()
メソッドを使用してinspectインスタンスを作成します。
これにより、すべての登録済ワーカーとタスクに関する情報を得ることができます。
Celery Beatは、定期的にタスクを実行できるスケジューラです。例えば、定期的に古いデータを消去したり、レポートを作成したりするのに使用できます。Celery Beatを使用するにはスケジュールを定義する必要があります。これは別のPythonファイル(例: scheduler.py
)で行うことができます。
先ほどのtasksワーカーを立ち上げつつ、スケジューラを以下のコマンドで起動することで、引数3と4で10秒ごとにaddタスクを実行します。
タスクが例外をスローした場合、Celeryは自動的にタスクをリトライできます。再試行の最大回数と再試行の間隔を指定できます。例えば以下のような感じです。
上記の例ではタスクが失敗した場合、1分後に再試行されます。タスクは最大3回まで試行されます。このようにタスクをより弾力的にし、永久に失敗する可能性を低くできます。
Celeryは、Python開発者、特に信頼性が高く効率的なバックグラウンドタスク処理を必要とするWebアプリケーションやサービスを扱う開発者にとって必須のツールです。分散タスクキューを管理する機能により、開発者は重い処理や時間のかかる処理をメインアプリケーションスレッドからオフロードできます。
これにより、タスクが非同期で実行されるため、スムーズなユーザーエクスペリエンスを実現し、アプリケーションのレスポンスと軽快さを維持できます。
例えば、Webアプリケーションでは、ユーザーが新規登録したときにウェルカムメールを送信したいケースがあります。このとき、メールの送信中にユーザーを待たせるのは避けたいところです。Celeryを使えば、このタスクをワーカーへ委譲し、ユーザーにすぐレスポンスを返すことができます。
Celeryを学ぶことは、技術的な優位性をもたらすだけでなくPython開発者のツールキットに貴重なスキルを追加することにもなります。Celeryを習得することで、高負荷に対応できるスケーラブルで堅牢なアプリケーションを構築できるようになり、ユーザーやクライアントにとって魅力的なプロジェクトとなります。もしあなたがPythonで本格的な仕事をするつもりなら、Celeryは絶対に知っておくべきライブラリです。