フリーキーズ | 独学プログラミング

非同期タスクキューに便利なPython Celeryの使い方

最終更新日
実行時の環境

Python 3.11.2

Celeryとは、Pythonの堅牢で汎用性の高い非同期タスクをより便利にするライブラリです。

Celery基礎知識

Pythonを使った開発では様々なライブラリに出会うことがあります。その中でもCeleryは、分散メッセージパッシングに基づいて動作する非同期タスクキュー/ジョブキューです。このライブラリの主な用途はノンブロッキングタスクが必要なWebアプリケーションです。

簡単なコードスニペットで説明しましょう。処理に時間のかかる関数があると想像してください。

import time

def long_running_task():
    print('タスクを開始しました...')
    time.sleep(10)
    print('タスクが完了しました...')

Celeryを使えば、メインプログラムの実行を妨げずにこの機能をバックグラウンドで実行させることができます。

Celeryとは

Celeryは、Pythonの分散タスクキューフレームワークで複数のプロセスやマシンに作業を分散させることができます。つまり、すぐに実行する必要のないタスクをオフロード(処理の一部を外部に渡すことで処理を分散させること)でき、他のリクエストに対応している間にアプリケーションがブロックされないようにできます。

よりよく理解するために先ほどの関数をCeleryタスクに変換してみましょう。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def long_running_task():
    print('タスクを開始しました...')
    time.sleep(10)
    print('タスクが完了しました...')

このコードではブローカーでCeleryを初期化し、@app.taskデコレータを使用して関数をCeleryタスクにしました。

Celeryのインストール

Celeryを使い始めるには、Python環境にCeleryをインストールする必要があります。これはPythonのパッケージマネージャであるpipを使用して、 pip install celery を実行することで簡単に行うことができます。

インストールしたらRabbitMQやRedisなどのメッセージブローカーも必要です。まずはRabbitMQを利用してみましょう。

本記事執筆時にはMacOSを利用しています。その他のOSの場合はこちらのリンクを参考にインストールしてください。

pip install celery

# MacOSの場合
brew install rabbitmq
# 起動する
brew services start rabbitmq

Celeryがインストールされ、メッセージブローカーが設置されたことで、Pythonで分散アプリケーションを書き始めるための環境が整いました。

最小のCeleryプロジェクト

まず、新しいCeleryプロジェクトのセットアップから始めましょう。まず、 tasks.py という名前の新しいPythonファイルを作成し、Celeryのアプリケーションとタスクを格納します。Celeryアプリのインスタンスは、Celeryのすべての機能のエントリポイントとして機能するため重要です。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

brokerで指定しているURLは、Celeryが通信するメッセージブローカーを指定します。この例ではメッセージブローカーとしてRabbitMQを使用しています。

ブローカーとは

Celeryのプロジェクトでは、メッセージブローカーがメッセージの送受信の媒体として機能します。RabbitMQは最も一般的なオプションの1つです。RabbitMQをブローカーとしてセットアップするには、それがインストールされ、実行されていることを確認します。前述のように、MacOSでは brew install rabbitmq を使用してRabbitMQをインストールできます。

インストールが完了したら、CeleryインスタンスでRabbitMQをブローカーとして設定できます。

app = Celery('tasks', broker='pyamqp://guest@localhost//')

これにより、RabbitMQが同じマシン(localhost)上で動作していることをCeleryに伝えます。

タスクを作成してみる

Celeryタスクの作成は、関数を定義して@app.taskデコレータを追加することで簡単にできます。このデコレータは、関数がタスクであることをCeleryに伝えます。例えば、以下のようになります。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

add.delay(3, 4)

この単純なタスクは2つの数字を足し合わせています。ここで add.delay(3, 4) を呼び出すと、Celeryは引数3と4で非同期にタスクを実行します。

Celery Workerサーバーを実行する

タスクの実行を開始するには、Celeryのワーカープロセスを起動する必要があります。プロジェクトディレクトリのコマンドラインから、以下のコマンドを実行することでCeleryワーカーを起動できます。

celery -A tasks worker --loglevel=info

このコマンドはtasksモジュールでCeleryワーカープロセスを起動します(tasksはPythonモジュールの名前と一致するように変更します)。 loglevel=info オプションは、デバッグのためにINFOレベルでログを記録するようワーカーに指示します。これでワーカーはタスクを受け取り実行する準備ができました。

先に挙げた tasks.py を実行すると以下のようなログが表示されます。(一番右の 7 が実際に実行された結果です)

Task tasks.add[7ca66891-3ad5-4b20-b0b3-cf16e5ef4059] succeeded in 0.0005504169967025518s: 7

Celeryの機能を深堀りする

Celeryには様々な機能があります。例えば、タスクの優先度をサポートしており、キューにタスクのバックログがある場合にどのタスクを最初に実行するかを指定できます。タスクの優先度を設定するには、apply_async()を呼び出す際に priority 引数を使用します。

add.apply_async((3, 4), priority=10)

さらにCeleryはタスクのリトライもサポートしています。タスクが失敗した場合、指定した期間後に実行を再試行するように設定できます。

タスクの状態を理解しワーカーを検査する

タスクの状態は、保留、開始、成功、失敗など、タスクのライフサイクルに関する有用な情報を提供します。タスクの状態はstate属性を使って確認できます。一方、ワーカーを検査することで、ワーカープロセス、登録されたタスク、完了したタスクなどを監視できます。 app.control.inspect() メソッドを使用してinspectインスタンスを作成します。

from celery import Celery

app = Celery('inspector', broker='pyamqp://guest@localhost//')

inspector = app.control.inspect()

registerd = inspector.registered()
for node, tasks in registerd.items():
    print(node, tasks) # 登録済みタスクを出力

これにより、すべての登録済ワーカーとタスクに関する情報を得ることができます。

Celery Beatと周期的なタスクの探索

Celery Beatは、定期的にタスクを実行できるスケジューラです。例えば、定期的に古いデータを消去したり、レポートを作成したりするのに使用できます。Celery Beatを使用するにはスケジュールを定義する必要があります。これは別のPythonファイル(例: scheduler.py )で行うことができます。

from celery import Celery
from datetime import timedelta

app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.beat_schedule = {
    'add-every-10-seconds': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=10),
        'args': (3, 4)
    },
}

先ほどのtasksワーカーを立ち上げつつ、スケジューラを以下のコマンドで起動することで、引数3と4で10秒ごとにaddタスクを実行します。

celery -A scheduler beat

Celeryにおけるタスクの再試行とエラー処理

タスクが例外をスローした場合、Celeryは自動的にタスクをリトライできます。再試行の最大回数と再試行の間隔を指定できます。例えば以下のような感じです。

@app.task(bind=True, max_retries=3)
def add(self, x, y):
    try:
        return x + y
    except Exception as e:
        raise self.retry(exc=e, countdown=10)  # 10秒ごとにリトライ

上記の例ではタスクが失敗した場合、1分後に再試行されます。タスクは最大3回まで試行されます。このようにタスクをより弾力的にし、永久に失敗する可能性を低くできます。

CeleryがPython開発者の必修科目である理由

Celeryは、Python開発者、特に信頼性が高く効率的なバックグラウンドタスク処理を必要とするWebアプリケーションやサービスを扱う開発者にとって必須のツールです。分散タスクキューを管理する機能により、開発者は重い処理や時間のかかる処理をメインアプリケーションスレッドからオフロードできます。
これにより、タスクが非同期で実行されるため、スムーズなユーザーエクスペリエンスを実現し、アプリケーションのレスポンスと軽快さを維持できます。

例えば、Webアプリケーションでは、ユーザーが新規登録したときにウェルカムメールを送信したいケースがあります。このとき、メールの送信中にユーザーを待たせるのは避けたいところです。Celeryを使えば、このタスクをワーカーへ委譲し、ユーザーにすぐレスポンスを返すことができます。

from celery import Celery

app = Celery('my_app')

@app.task
def send_welcome_email(user_id):
    # 省略(実際にメールをユーザーに送る処理をここに書く)
    pass

# ユーザー登録時
user_id = '123'
send_welcome_email.delay(user_id)

Celeryを学ぶことは、技術的な優位性をもたらすだけでなくPython開発者のツールキットに貴重なスキルを追加することにもなります。Celeryを習得することで、高負荷に対応できるスケーラブルで堅牢なアプリケーションを構築できるようになり、ユーザーやクライアントにとって魅力的なプロジェクトとなります。もしあなたがPythonで本格的な仕事をするつもりなら、Celeryは絶対に知っておくべきライブラリです。

関連するコンテンツ