Celeryとは、Pythonの堅牢で汎用性の高い非同期タスクをより便利にするライブラリです。
Celery基礎知識
Pythonを使った開発では様々なライブラリに出会うことがあります。その中でもCeleryは、分散メッセージパッシングに基づいて動作する非同期タスクキュー/ジョブキューです。このライブラリの主な用途はノンブロッキングタスクが必要なWebアプリケーションです。
簡単なコードスニペットで説明しましょう。処理に時間のかかる関数があると想像してください。
import time
def long_running_task():
print('タスクを開始しました...')
time.sleep(10)
print('タスクが完了しました...')
Celeryを使えば、メインプログラムの実行を妨げずにこの機能をバックグラウンドで実行させることができます。
Celeryとは
Celeryは、Pythonの分散タスクキューフレームワークで複数のプロセスやマシンに作業を分散させることができます。つまり、すぐに実行する必要のないタスクをオフロード(処理の一部を外部に渡すことで処理を分散させること)でき、他のリクエストに対応している間にアプリケーションがブロックされないようにできます。
よりよく理解するために先ほどの関数をCeleryタスクに変換してみましょう。
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def long_running_task():
print('タスクを開始しました...')
time.sleep(10)
print('タスクが完了しました...')
このコードではブローカーでCeleryを初期化し、@app.taskデコレータを使用して関数をCeleryタスクにしました。
Celeryのインストール
Celeryを使い始めるには、Python環境にCeleryをインストールする必要があります。これはPythonのパッケージマネージャであるpipを使用して、 pip install celery
を実行することで簡単に行うことができます。
インストールしたらRabbitMQやRedisなどのメッセージブローカーも必要です。まずはRabbitMQを利用してみましょう。
本記事執筆時にはMacOSを利用しています。その他のOSの場合は
こちらのリンクを参考にインストールしてください。
pip install celery
# MacOSの場合
brew install rabbitmq
# 起動する
brew services start rabbitmq
Celeryがインストールされ、メッセージブローカーが設置されたことで、Pythonで分散アプリケーションを書き始めるための環境が整いました。
最小のCeleryプロジェクト
まず、新しいCeleryプロジェクトのセットアップから始めましょう。まず、 tasks.py
という名前の新しいPythonファイルを作成し、Celeryのアプリケーションとタスクを格納します。Celeryアプリのインスタンスは、Celeryのすべての機能のエントリポイントとして機能するため重要です。
tasks.py
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
brokerで指定しているURLは、Celeryが通信するメッセージブローカーを指定します。この例ではメッセージブローカーとしてRabbitMQを使用しています。
ブローカーとは
Celeryのプロジェクトでは、メッセージブローカーがメッセージの送受信の媒体として機能します。RabbitMQは最も一般的なオプションの1つです。RabbitMQをブローカーとしてセットアップするには、それがインストールされ、実行されていることを確認します。前述のように、MacOSでは brew install rabbitmq
を使用してRabbitMQをインストールできます。
インストールが完了したら、CeleryインスタンスでRabbitMQをブローカーとして設定できます。
app = Celery('tasks', broker='pyamqp://guest@localhost//')
これにより、RabbitMQが同じマシン(localhost)上で動作していることをCeleryに伝えます。
タスクを作成してみる
Celeryタスクの作成は、関数を定義して@app.taskデコレータを追加することで簡単にできます。このデコレータは、関数がタスクであることをCeleryに伝えます。例えば、以下のようになります。
tasks.py
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
add.delay(3, 4)
この単純なタスクは2つの数字を足し合わせています。ここで add.delay(3, 4)
を呼び出すと、Celeryは引数3と4で非同期にタスクを実行します。
Celery Workerサーバーを実行する
タスクの実行を開始するには、Celeryのワーカープロセスを起動する必要があります。プロジェクトディレクトリのコマンドラインから、以下のコマンドを実行することでCeleryワーカーを起動できます。
celery -A tasks worker --loglevel=info
このコマンドはtasksモジュールでCeleryワーカープロセスを起動します(tasksはPythonモジュールの名前と一致するように変更します)。 loglevel=info
オプションは、デバッグのためにINFOレベルでログを記録するようワーカーに指示します。これでワーカーはタスクを受け取り実行する準備ができました。
先に挙げた tasks.py
を実行すると以下のようなログが表示されます。(一番右の 7
が実際に実行された結果です)
Task tasks.add[7ca66891-3ad5-4b20-b0b3-cf16e5ef4059] succeeded in 0.0005504169967025518s: 7
Celeryの機能を深堀りする
Celeryには様々な機能があります。例えば、タスクの優先度をサポートしており、キューにタスクのバックログがある場合にどのタスクを最初に実行するかを指定できます。タスクの優先度を設定するには、apply_async()を呼び出す際に priority
引数を使用します。
add.apply_async((3, 4), priority=10)
さらにCeleryはタスクのリトライもサポートしています。タスクが失敗した場合、指定した期間後に実行を再試行するように設定できます。
タスクの状態を理解しワーカーを検査する
タスクの状態は、保留、開始、成功、失敗など、タスクのライフサイクルに関する有用な情報を提供します。タスクの状態はstate属性を使って確認できます。一方、ワーカーを検査することで、ワーカープロセス、登録されたタスク、完了したタスクなどを監視できます。 app.control.inspect()
メソッドを使用してinspectインスタンスを作成します。
inspection.py
from celery import Celery
app = Celery('inspector', broker='pyamqp://guest@localhost//')
inspector = app.control.inspect()
registerd = inspector.registered()
for node, tasks in registerd.items():
print(node, tasks) # 登録済みタスクを出力
これにより、すべての登録済ワーカーとタスクに関する情報を得ることができます。
Celery Beatと周期的なタスクの探索
Celery Beatは、定期的にタスクを実行できるスケジューラです。例えば、定期的に古いデータを消去したり、レポートを作成したりするのに使用できます。Celery Beatを使用するにはスケジュールを定義する必要があります。これは別のPythonファイル(例: scheduler.py
)で行うことができます。
scheduler.py
from celery import Celery
from datetime import timedelta
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.beat_schedule = {
'add-every-10-seconds': {
'task': 'tasks.add',
'schedule': timedelta(seconds=10),
'args': (3, 4)
},
}
先ほどのtasksワーカーを立ち上げつつ、スケジューラを以下のコマンドで起動することで、引数3と4で10秒ごとにaddタスクを実行します。
Celeryにおけるタスクの再試行とエラー処理
タスクが例外をスローした場合、Celeryは自動的にタスクをリトライできます。再試行の最大回数と再試行の間隔を指定できます。例えば以下のような感じです。
@app.task(bind=True, max_retries=3)
def add(self, x, y):
try:
return x + y
except Exception as e:
raise self.retry(exc=e, countdown=10) # 10秒ごとにリトライ
上記の例ではタスクが失敗した場合、1分後に再試行されます。タスクは最大3回まで試行されます。このようにタスクをより弾力的にし、永久に失敗する可能性を低くできます。
CeleryがPython開発者の必修科目である理由
Celeryは、Python開発者、特に信頼性が高く効率的なバックグラウンドタスク処理を必要とするWebアプリケーションやサービスを扱う開発者にとって必須のツールです。分散タスクキューを管理する機能により、開発者は重い処理や時間のかかる処理をメインアプリケーションスレッドからオフロードできます。
これにより、タスクが非同期で実行されるため、スムーズなユーザーエクスペリエンスを実現し、アプリケーションのレスポンスと軽快さを維持できます。
例えば、Webアプリケーションでは、ユーザーが新規登録したときにウェルカムメールを送信したいケースがあります。このとき、メールの送信中にユーザーを待たせるのは避けたいところです。Celeryを使えば、このタスクをワーカーへ委譲し、ユーザーにすぐレスポンスを返すことができます。
from celery import Celery
app = Celery('my_app')
@app.task
def send_welcome_email(user_id):
# 省略(実際にメールをユーザーに送る処理をここに書く)
pass
# ユーザー登録時
user_id = '123'
send_welcome_email.delay(user_id)
Celeryを学ぶことは、技術的な優位性をもたらすだけでなくPython開発者のツールキットに貴重なスキルを追加することにもなります。Celeryを習得することで、高負荷に対応できるスケーラブルで堅牢なアプリケーションを構築できるようになり、ユーザーやクライアントにとって魅力的なプロジェクトとなります。もしあなたがPythonで本格的な仕事をするつもりなら、Celeryは絶対に知っておくべきライブラリです。