フリーキーズ | 独学プログラミング

FastAPIでJWT認証を実装する

最終更新日
【画像】FastAPIでJWT認証を実装する

このコンテンツではFastAPIでWebアプリケーションによく使われるJWT認証を実装する方法を紹介します。

必要なライブラリのインストール

前提として、PythonやFastAPIにある程度慣れ親しんでいるエンジニアが対象です。
FastAPIの細かな仕様については触れません。

まずは、必要なライブラリをインストールしましょう。

pip install fastapi uvicorn python-multipart python-jose\[cryptography\]

通常のFastAPIアプリケーションへ必要なライブラリ以外にpython-multipartpython-joseのインストールが必要です。

ベースとなるアプリケーション

まずは簡単なFastAPIアプリケーションを作成しましょう。
今回はapp.pyを作っていきます。

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def root():
    return {"message": "Hello World"}

この状態で以下のコマンドを実行することで、ブラウザでhttp://localhost:8000を開くとアプリケーションの立ち上がりが確認できます。

uvicorn app:app --reload

ディレクトリ構成

ベースとなるアプリケーションができたところで、JWTに必要なファイル群とディレクトリ構成を説明します。

.
├── app.py # アプリケーションルート
├── middlewares
│   ├── __init__.py
│   └── auth_middleware.py # 認証バックエンドミドルウェア。認証情報の照合をする。
├── routes
│   └── auth.py # ログインAPIやトークン生成APIの入り口。
└── utils
    ├── auth.py # 認証情報の構成定義。
    ├── dependency.py # ルーティングの認証依存。
    └── jwt.py # クレームセット作成とエンコーダー・デコーダー。

頑張れば1つのファイルにまとめられますが、実際にプロジェクトを開始した場合、こういった形でいくつかのファイルに分けておくと使い勝手が良いです。

JWT認証実装の流れ

慣れていないとJWT認証は何をどう実装すればいいかわかりづらいため、実装の流れを列挙します。

  1. ルーティングを実装
  2. JWTクレームセットの生成やアクセストークンの生成ロジック実装
  3. ミドルウェア実装
  4. ルーティング依存ロジック実装

これで最低限の機能が実装できます。

1. ルーティングを実装

それではルーティングを実装していきましょう。まずは、routes/auth.pyを作成します。

from fastapi import APIRouter

router = APIRouter()


@router.post("/token")
async def token():
    pass

次に、このルーターを最初に作成したapp.pyで読み込みます。

from fastapi import FastAPI

from routes.auth import router as auth_router

app = FastAPI()


@app.get("/")
def root():
    return {"message": "Hello World"}


app.include_router(auth_router, prefix="/auth", tags=["Auth"])

こうすることで、http://localhost:8000/authをプレフィックスとしたURLルーティングが作成できます。

2. 認証情報ロジック実装

次に、utils/jwt.pyを作成しましょう。このjwt.pyでJWTのクレームセットと呼ばれるJSONのデータセットを出力させます。

from datetime import datetime, timedelta
from typing import Any, Dict

from jose import jwt

TYPE_ACCESS_TOKEN = "access_token"
JWT_ALGORITHM = "HS256"
SECRET_KEY="secret"


def jwt_claims(user: Any, token_type: str = "") -> Dict[str, Any]:
    claim_set = {"token_type": token_type, "user_id": user.id}

    if claim_set["token_type"] == TYPE_ACCESS_TOKEN:
        claim_set["exp"] = datetime.utcnow() + timedelta(minutes=60 * 24 * 7)

    return claim_set


def jwt_encode(claim_set: Dict[str, Any]) -> str:
    return jwt.encode(claim_set, SECRET_KEY, algorithm=JWT_ALGORITHM)


def jwt_decode(token: str) -> Dict[str, Any]:
    return jwt.decode(token, SECRET_KEY, algorithms=[JWT_ALGORITHM])

ここでJWTのエンコーダーとデコーダーも実装しておきます。
token_typeを引数としている理由は、このコンテンツでは紹介しませんが、リフレッシュトークンという期限切れを解消するタイプのトークンを発行するときのためです。

この認証情報ロジックをルーティングで呼び出します。

from fastapi import APIRouter, Form, Request

from utils.jwt import TYPE_ACCESS_TOKEN, jwt_claims, jwt_encode

router = APIRouter()


class User(object):
    id: int
    username: str
    password: str


@router.post("/token")
async def token(
    req: Request, username: str = Form(...), password: str = Form(...)
) -> dict[str, str]:
    # ここでユーザーを認証するが、テストのためにオブジェクトを作成
    user = User()
    user.id = 100
    user.username = 'kt2763'
    user.password = 'password'

    if user is None:
        return {}

    claim_set = jwt_claims(user, token_type=TYPE_ACCESS_TOKEN)

    return {"access_token": jwt_encode(claim_set), "token_type": TYPE_ACCESS_TOKEN}

ここまで実装できると、このhttp://localhost:8000/auth/tokenルーティングが確認できます。以下はPostmanでAPIを確認した例です。(認証は含まれていないのでusernamepasswordはなんでもOK)

アクセストークン取得例

アクセストークン取得例

3. ミドルウェア実装

そしてここからミドルウェアの実装です。このあたりは若干複雑となっているので逐一説明していきます。

まずは、utils/auth.pyに認証ユーザーと未認証ユーザーの定義をしましょう。FastAPIと共にインストールされるstarletteにベースがあるので使いましょう。

from typing import Any

from starlette import authentication


class AuthenticatedUser(authentication.SimpleUser):
    def __init__(self, user: Any) -> None:
        self.id = user.id
        self.username = user.username


class UnauthenticatedUser(authentication.UnauthenticatedUser):
    pass

そして、この定義を使ってミドルウェアを作っていきます。

from typing import Optional, Tuple

from fastapi import Request
from fastapi.security.utils import get_authorization_scheme_param
from jose.jwt import ExpiredSignatureError, JWTError
from starlette.authentication import AuthCredentials, BaseUser
from starlette.middleware import authentication

from utils.auth import AuthenticatedUser, UnauthenticatedUser
from utils.jwt import jwt_decode


class User(object):
    id: int
    username: str
    password: str


class AuthenticationBackend(authentication.AuthenticationBackend):
    async def authenticate(
        self, request: Request
    ) -> Optional[Tuple["AuthCredentials", "BaseUser"]]:
        authorization: str = request.headers.get("Authorization")
        scheme, access_token = get_authorization_scheme_param(authorization)

        if not authorization or scheme.lower() != "bearer":
            return (
                authentication.AuthCredentials(["unauthenticated"]),
                UnauthenticatedUser(),
            )

        try:
            payload = jwt_decode(access_token)
        except ExpiredSignatureError:
            return (
                authentication.AuthCredentials(["unauthenticated"]),
                UnauthenticatedUser(),
            )
        except JWTError:
            return (
                authentication.AuthCredentials(["unauthenticated"]),
                UnauthenticatedUser(),
            )
        except Exception as e:
            return (
                authentication.AuthCredentials(["unauthenticated"]),
                UnauthenticatedUser(),
            )

        # ここでユーザー情報をIDから取得する
        user = User()
        user.id = payload["user_id"]
        user.username = 'kt2763'

        if user is None:
            return (
                authentication.AuthCredentials(["unauthenticated"]),
                UnauthenticatedUser(),
            )

        return authentication.AuthCredentials(["authenticated"]), AuthenticatedUser(
            user
        )

ここで何をしているか下記に列挙します。

  1. リクエストのヘッダーに含まれるAuthorizationを取得し、データ化
  2. Bearerが含まれていない、期限切れ、JWTとして不適切、その他エラーの場合は認証不可
  3. データ化されたトークンに含まれるユーザーIDからユーザーデータを取得
  4. ユーザーデータがなければ認証不可
  5. ユーザーデータが正しく取得できれば認証ユーザーを返す

以上となります。

そして、このミドルウェアをapp.pyで呼び出しましょう。事前準備として、middlewares/__init__.pyを作成してインポートできるようにします。

from .auth_middleware import *

次にapp.pyの編集です。

from fastapi import FastAPI
from starlette.middleware.authentication import AuthenticationMiddleware # 追加

from middlewares import AuthenticationBackend # 追加
from routes.auth import router as auth_router

app = FastAPI()

app.add_middleware(AuthenticationMiddleware, backend=AuthenticationBackend()) # 追加


@app.get("/")
def root():
    return {"message": "Hello World"}


app.include_router(auth_router, prefix="/auth", tags=["Auth"])

ミドルウェアとしての機能は備わったものの、これだけではリクエストに対して認証は機能しません。

4. ルーティング依存ロジック実装

最後に、ルーティングに対して依存関係を実装しましょう。この依存関係をルーティングごとに設定できることで、ログインAPIなどの「認証していないユーザーがリクエストしたいルーティング」が有効となります。

まずは作成したutils/auth.pyにFastAPIのリクエストを利用したアクセストークンを返すクラスを実装しましょう。

from typing import Any

from fastapi import HTTPException, Request, security, status # 追加
from fastapi.security.utils import get_authorization_scheme_param # 追加
from starlette import authentication


class OAuth2PasswordBearer(security.OAuth2PasswordBearer):
    async def __call__(self, req: Request) -> str | None:
        authorization: str = req.headers.get("Authorization")
        scheme, access_token = get_authorization_scheme_param(authorization)

        if not authorization or scheme.lower() != "bearer":
            if self.auto_error:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None

        return access_token


class AuthenticatedUser(authentication.SimpleUser):
    def __init__(self, user: Any) -> None:
        self.id = user.id
        self.username = user.username


class UnauthenticatedUser(authentication.UnauthenticatedUser):
    pass

次に依存注入ロジックを実装します。utils/dependency.pyを作成しましょう。

from fastapi import Depends, HTTPException, Request, status

from utils.auth import OAuth2PasswordBearer

OAUTH2_SCHEME = OAuth2PasswordBearer(tokenUrl="/auth/token")


async def login_required(request: Request, token: str = Depends(OAUTH2_SCHEME)) -> None:
    if not request.user.is_authenticated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )

この依存ロジックをルーティングに設定することで、トークンを持ったリクエストでなければ受け付けないようになります。
app.pyに設定してみましょう。

from fastapi import Depends, FastAPI
from starlette.middleware.authentication import AuthenticationMiddleware

from middlewares import AuthenticationBackend
from routes.auth import router as auth_router
from utils.dependency import login_required

app = FastAPI()

app.add_middleware(AuthenticationMiddleware, backend=AuthenticationBackend())


@app.get("/", dependencies=[Depends(login_required)])
def root():
    return {"message": "Hello World"}


app.include_router(auth_router, prefix="/auth", tags=["Auth"])

ここで実際に動作確認してみます。ブラウザでhttp://localhost:8000/にアクセスすると、以下のように表示されます。

{
  "detail": "Not authenticated"
}

次に、成功した場合として先ほど動作確認したhttp://localhost:8000/auth/tokenで取得したアクセストークンをヘッダーに含めてPOSTリクエストしてみます。以下のように、ヘッダーにAuthorizationとアクセストークンを含めてリクエストしてみます。

認証付きリクエスト

認証付きリクエスト

正しくレスポンスが得られたので成功です。

JWT認証は身につけておくべきスキル

ここまで、FastAPI + JWTの実装を見てきました。
JWT認証はWebアプリケーション開発系のバックエンドエンジニアは実装する機会が多いため身につけておきたいスキルです。より深くJWTの構造理解をしたい場合は、以下のJWT仕様について書かれた以下のページを参考にすると良いでしょう。

理解を深めればセキュリティへの意識も高まるので、積極的に手を動かしながら学んでいきたいところです。

関連するコンテンツ