$ pip install fastapi uvicorn python-multipart python-jose\[cryptography\]
python-multipart
とpython-jose
のインストールが必要です。app.py
を作っていきます。from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def root():
return {"message": "Hello World"}
http://localhost:8000
を開くとアプリケーションの立ち上がりが確認できます。$ uvicorn app:app --reload
.
├── app.py # アプリケーションルート
├── middlewares
│ ├── __init__.py
│ └── auth_middleware.py # 認証バックエンドミドルウェア。認証情報の照合をする。
├── routes
│ └── auth.py # ログインAPIやトークン生成APIの入り口。
└── utils
├── auth.py # 認証情報の構成定義。
├── dependency.py # ルーティングの認証依存。
└── jwt.py # クレームセット作成とエンコーダー・デコーダー。
routes/auth.py
を作成します。from fastapi import APIRouter
router = APIRouter()
@router.post("/token")
async def token():
pass
app.py
で読み込みます。from fastapi import FastAPI
from routes.auth import router as auth_router
app = FastAPI()
@app.get("/")
def root():
return {"message": "Hello World"}
app.include_router(auth_router, prefix="/auth", tags=["Auth"])
http://localhost:8000/auth
をプレフィックスとしたURLルーティングが作成できます。utils/jwt.py
を作成しましょう。このjwt.py
でJWTのクレームセットと呼ばれるJSONのデータセットを出力させます。from datetime import datetime, timedelta
from typing import Any, Dict
from jose import jwt
TYPE_ACCESS_TOKEN = "access_token"
JWT_ALGORITHM = "HS256"
SECRET_KEY="secret"
def jwt_claims(user: Any, token_type: str = "") -> Dict[str, Any]:
claim_set = {"token_type": token_type, "user_id": user.id}
if claim_set["token_type"] == TYPE_ACCESS_TOKEN:
claim_set["exp"] = datetime.utcnow() + timedelta(minutes=60 * 24 * 7)
return claim_set
def jwt_encode(claim_set: Dict[str, Any]) -> str:
return jwt.encode(claim_set, SECRET_KEY, algorithm=JWT_ALGORITHM)
def jwt_decode(token: str) -> Dict[str, Any]:
return jwt.decode(token, SECRET_KEY, algorithms=[JWT_ALGORITHM])
token_type
を引数としている理由は、このコンテンツでは紹介しませんが、リフレッシュトークンという期限切れを解消するタイプのトークンを発行するときのためです。from fastapi import APIRouter, Form, Request
from utils.jwt import TYPE_ACCESS_TOKEN, jwt_claims, jwt_encode
router = APIRouter()
class User(object):
id: int
username: str
password: str
@router.post("/token")
async def token(
req: Request, username: str = Form(...), password: str = Form(...)
) -> dict[str, str]:
# ここでユーザーを認証するが、テストのためにオブジェクトを作成
user = User()
user.id = 100
user.username = 'kt2763'
user.password = 'password'
if user is None:
return {}
claim_set = jwt_claims(user, token_type=TYPE_ACCESS_TOKEN)
return {"access_token": jwt_encode(claim_set), "token_type": TYPE_ACCESS_TOKEN}
http://localhost:8000/auth/token
ルーティングが確認できます。以下はPostmanでAPIを確認した例です。(認証は含まれていないのでusername
、password
はなんでもOK)utils/auth.py
に認証ユーザーと未認証ユーザーの定義をしましょう。FastAPIと共にインストールされるstarlette
にベースがあるので使いましょう。from typing import Any
from starlette import authentication
class AuthenticatedUser(authentication.SimpleUser):
def __init__(self, user: Any) -> None:
self.id = user.id
self.username = user.username
class UnauthenticatedUser(authentication.UnauthenticatedUser):
pass
from typing import Optional, Tuple
from fastapi import Request
from fastapi.security.utils import get_authorization_scheme_param
from jose.jwt import ExpiredSignatureError, JWTError
from starlette.authentication import AuthCredentials, BaseUser
from starlette.middleware import authentication
from utils.auth import AuthenticatedUser, UnauthenticatedUser
from utils.jwt import jwt_decode
class User(object):
id: int
username: str
password: str
class AuthenticationBackend(authentication.AuthenticationBackend):
async def authenticate(
self, request: Request
) -> Optional[Tuple["AuthCredentials", "BaseUser"]]:
authorization: str = request.headers.get("Authorization")
scheme, access_token = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
return (
authentication.AuthCredentials(["unauthenticated"]),
UnauthenticatedUser(),
)
try:
payload = jwt_decode(access_token)
except ExpiredSignatureError:
return (
authentication.AuthCredentials(["unauthenticated"]),
UnauthenticatedUser(),
)
except JWTError:
return (
authentication.AuthCredentials(["unauthenticated"]),
UnauthenticatedUser(),
)
except Exception as e:
return (
authentication.AuthCredentials(["unauthenticated"]),
UnauthenticatedUser(),
)
# ここでユーザー情報をIDから取得する
user = User()
user.id = payload["user_id"]
user.username = 'kt2763'
if user is None:
return (
authentication.AuthCredentials(["unauthenticated"]),
UnauthenticatedUser(),
)
return authentication.AuthCredentials(["authenticated"]), AuthenticatedUser(
user
)
Authorization
を取得し、データ化Bearer
が含まれていない、期限切れ、JWTとして不適切、その他エラーの場合は認証不可app.py
で呼び出しましょう。事前準備として、middlewares/__init__.py
を作成してインポートできるようにします。from .auth_middleware import *
app.py
の編集です。from fastapi import FastAPI
from starlette.middleware.authentication import AuthenticationMiddleware # 追加
from middlewares import AuthenticationBackend # 追加
from routes.auth import router as auth_router
app = FastAPI()
app.add_middleware(AuthenticationMiddleware, backend=AuthenticationBackend()) # 追加
@app.get("/")
def root():
return {"message": "Hello World"}
app.include_router(auth_router, prefix="/auth", tags=["Auth"])
utils/auth.py
にFastAPIのリクエストを利用したアクセストークンを返すクラスを実装しましょう。from typing import Any
from fastapi import HTTPException, Request, security, status # 追加
from fastapi.security.utils import get_authorization_scheme_param # 追加
from starlette import authentication
class OAuth2PasswordBearer(security.OAuth2PasswordBearer):
async def __call__(self, req: Request) -> str | None:
authorization: str = req.headers.get("Authorization")
scheme, access_token = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return access_token
class AuthenticatedUser(authentication.SimpleUser):
def __init__(self, user: Any) -> None:
self.id = user.id
self.username = user.username
class UnauthenticatedUser(authentication.UnauthenticatedUser):
pass
utils/dependency.py
を作成しましょう。from fastapi import Depends, HTTPException, Request, status
from utils.auth import OAuth2PasswordBearer
OAUTH2_SCHEME = OAuth2PasswordBearer(tokenUrl="/auth/token")
async def login_required(request: Request, token: str = Depends(OAUTH2_SCHEME)) -> None:
if not request.user.is_authenticated:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
app.py
に設定してみましょう。from fastapi import Depends, FastAPI
from starlette.middleware.authentication import AuthenticationMiddleware
from middlewares import AuthenticationBackend
from routes.auth import router as auth_router
from utils.dependency import login_required
app = FastAPI()
app.add_middleware(AuthenticationMiddleware, backend=AuthenticationBackend())
@app.get("/", dependencies=[Depends(login_required)])
def root():
return {"message": "Hello World"}
app.include_router(auth_router, prefix="/auth", tags=["Auth"])
http://localhost:8000/
にアクセスすると、以下のように表示されます。{
"detail": "Not authenticated"
}
http://localhost:8000/auth/token
で取得したアクセストークンをヘッダーに含めてPOSTリクエストしてみます。以下のように、ヘッダーにAuthorization
とアクセストークンを含めてリクエストしてみます。