Flask × OpenID Connectの実装法【コード例付き実装ガイド】

OpenID Connectの技術解説

OpenID Connectとは

OpenID Connect (OIDC) は、ユーザーに関する情報を取得するための認証プロトコルです。OAuth 2.0を基盤として開発され、以下のような特徴を持ちます。

  • IDトークンの活用: OIDCでは、IDトークンにユーザー情報が含まれています。これにより、アプリケーションはパーソナライズされたサービスを提供可能です。
  • 高い安全性: IDトークンはデジタル署名されており、改ざんが困難で、安全な認証を実現します。
  • OAuth 2.0との統合: 既存のOAuth 2.0インフラを活かせるため、導入がスムーズです。

OAuth 2.0とOIDCの違い

  • OAuth 2.0は認可、OpenID Connectは認証: OAuth 2.0は「誰にどの範囲までアクセスを許可するか」を決定するのに対し、OpenID Connectは「誰がアクセスしようとしているのか」を特定します。
  • OpenID ConnectはOAuth 2.0を拡張: OpenID Connectは、OAuth 2.0の仕組みを拡張し、認証機能を追加したプロトコルです。OAuth 2.0のフローを踏襲しつつ、IDトークンを発行することで、より安全な認証を実現しています。

OIDCは、OAuth 2.0を拡張する形で認証機能を追加したプロトコルです。

OpenID Connectの認証フロー

IDトークンの発行

  • 認証が成功すると、OAuth 2.0のフローに加えて、IDトークンが発行されます。

IDトークンの内容

IDトークンには以下の情報が含まれます。

  • ユーザーID(ユーザーを一意に特定)
  • 所属グループや権限
  • 認証サーバーから提供される追加情報

IDトークンの検証

アプリケーションは受け取ったIDトークンを以下のプロセスで検証します。

  1. トークンの署名を確認。
  2. トークンに含まれる情報を解析し、ユーザーを特定。
  3. ユーザー情報を基に、アクセス権限を付与。

(img: 認証フローを図示したイメージ)

OpenID Connectの実装

OIDCを実装する際には、認証サーバーの設定とアプリケーションのコーディングが必要です。

1. Google Cloud Platformでの設定

  1. プロジェクトの作成
    Google Cloud Platform (GCP)で新しいプロジェクトを作成します。
  2. OAuthクライアントIDとシークレットの取得
    JSON形式でクライアントIDとクライアントシークレットをダウンロードします。

(img: GCPのOAuth設定画面のスクリーンショット例)

2. コーディング

使用ライブラリ

OIDCをPythonで実装する際に便利なライブラリは以下の通りです。

実装例

from google_auth_oauthlib.flow import Flow
import google.oauth2.credentials
import os, hashlib

CLIENT_SECRET_FILE = 'oauth_oidc_flask/client_secret.json'
SCOPES = ['openid', 'profile', 'email']
GOOGLE_CLIENT_ID = "xxxxxxx.apps.googleusercontent.com"
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'

@bp.route('/authorize')
def authorize():
    flow = Flow.from_client_secrets_file(
        CLIENT_SECRET_FILE,
        scopes=SCOPES,
    )
    flow.redirect_uri = url_for('app.oauth2callback', _external=True)
    authorization_url, state  = flow.authorization_url(
        access_type='offline',
        include_granted_scopes='true',
        prompt='consent',
        state = hashlib.sha256(os.urandom(1024)).hexdigest()
        )
    session['state'] = state
    return redirect(authorization_url)

Flowインスタンスは、Google OAuth 2.0の認証フローを管理するためのオブジェクトです。
このインスタンスで処理を簡潔にできます。

authorization_url, state = authorization_url()
google認証ページのurlと、stateな通信を保証するstateトークンですauthorization_url()
認証URLの生成: authorization_url()メソッドを用いて、Googleの認証サーバーにアクセスするためのURLを生成します。
prompt=’select_account’: ユーザーに複数のGoogleアカウントがある場合、どのアカウントでログインするかを選択させるためのプロンプトを表示します。
state=os.urandom(16).hex(): CSRF攻撃を防ぐためのランダムな状態トークンを生成します。
redirectで、googleの認証ページに遷移します。return redirect(authorization_url)
authorization_url() メソッドで生成されたURLは、GoogleのOAuth 2.0認証サーバーへのアクセスを要求するURLです。このURLにユーザーをリダイレクトすることで、Googleの認証画面に遷移し、ユーザーは自分のGoogleアカウントでログインすることになります。session[‘state’] = state
stateの生成と保存: CSRF攻撃を防ぐために、ランダムな文字列を生成し、state変数に格納します。このstate変数をセッションに保存することで、後続の処理で同一の要求であることを確認できます。
リダイレクト: 生成された認証URLにユーザーをリダイレクトし、Googleの認証画面に遷移させます。
生成されたstateトークンをFlaskのセッションに保存することで、後続のコールバック処理でこのトークンと比較し、同一の要求であることを確認できます。

@bp.route('/oauth2callback')
def oauth2callback():
    state = session.get('state')
    flow = Flow.from_client_secrets_file(
        CLIENT_SECRET_FILE,
        scopes=SCOPES,
        state=state
    )
    flow.redirect_uri = url_for('app.oauth2callback', _external=True)
    flow.fetch_token(authorization_response=request.url)

    credentials = flow.credentials
    session['credentials'] = {
        'token': credentials.token,
        'refresh_token': credentials.refresh_token,
        'token_uri': credentials.token_uri,
        'client_id': credentials.client_id,
        'client_secret': credentials.client_secret,
        'scopes': credentials.scopes,
    }
    from google.oauth2 import id_token
    token_request = google.auth.transport.requests.Request()
    id_info = id_token.verify_oauth2_token(
        id_token=credentials.id_token,
        request=token_request,
        audience=GOOGLE_CLIENT_ID
    )
    email = id_info['email']
    user = User.select_by_email(email)
    if not user:
        user = User(email=email)
        user.add_user()
    login_user(user, remember=True)
    return redirect(url_for('app.inner'))

googleで認証が終わりました。完了後は、flowインスタンスに書いた、redirect_uri=’http://127.0.0.1:5000/callback’ callback関数に返ってきます。

flow.fetch_token(authorization_response=request.url)認証フローを完了し、アクセストークンを取得します。OAuth 2.0認証フローの最後のステップです。
ユーザーが「同意する」を押した後の処理です。

flow.fetch_token()
認証コードの交換:認証コードをアクセストークンとリフレッシュトークンに交換します。

CSRF対策: セッションに保存されたstateとリクエストパラメータのstateを比較し、一致しない場合はCSRF攻撃と判断し、エラー処理を行います。エラーメッセージを表示してホームページにリダイレクト

credentials = flow.credentials
コード中のcredentialsオブジェクトには、アクセストークンとリフレッシュトークンが格納されています。これらのトークンにアクセスするには、credentials.tokenとcredentials.refresh_tokenプロパティを使用します。
アクセストークンは、通常、JSON Web Token (JWT) という形式で発行されます。JWTは、構造化された情報をJSON形式で表し、デジタル署名によって改ざんを防ぐことができるトークンです。アクセストークンに含まれる情報は、発行者、有効期限、対象ユーザーなどです。GoogleのAPIに直接アクセスするための短命なトークンです。一定時間(通常は数十分から数時間)で有効期限が切れます。アクセストークンを使用して、GoogleのAPIを呼び出し、ユーザーの情報を取得したり、Googleサービスを利用したりすることができます。
リフレッシュトークンはアクセストークンの有効期限が切れた際に、新しいアクセストークンを取得するためのトークンです。アクセストークンよりも長期間有効ですが、こちらも定期的に更新する必要があります。
リフレッシュトークンを使用して、新しいアクセストークンをリクエストする際には、アクセストークンが失効していることを示す必要があります。

token_request = google.auth.transport.requests.Request()
Google Cloud Platformの認証ライブラリであるgoogle-authを用いて、トークンリクエスト用のオブジェクトを作成しています。このオブジェクトは、後続の処理で、Googleの認証サーバーに対してアクセストークンやリフレッシュトークンをリクエストするための基盤となります。上記のコードで、Requestオブジェクトが生成されます。後続の処理で、このオブジェクトにアクセストークンなどの認証情報が設定されます。このオブジェクトを使用して、認証サーバーに対してHTTPリクエストが実行されます。id_info = id_token.verify_oauth2_token()
id_token: 検証したいIDトークンの文字列です。このコードでは、credentials.id_tokenから取得しています。署名の検証: IDトークンに含まれる署名を検証し、トークンが改ざんされていないことを確認します。

request: HTTPリクエストオブジェクトです。このオブジェクトを使用して、Googleの認証サーバーに対してIDトークンの検証をリクエストします。

audience: IDトークンの対象となるアプリケーションのクライアントIDです。この値が正しい場合のみ、IDトークンが有効と判断されます。

ユーザーのメールアドレスを取得します。

リフレッシュトークンの保存: リフレッシュトークンをセッションに保存します。


@bp.route('/logout')
@login_required
def logout():
    if 'refresh_token' in session:
        token = session['refresh_token']
        try:
            resp = requests.post(
                "https://oauth2.googleapis.com/revoke",
                params={"token": token},
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            if not resp.ok:
                flash("Googleトークンの取り消し中にエラーが発生しました。", "error")
                return redirect(url_for('app.index'))
        except requests.RequestException as e:
            flash(f"Googleトークンの取り消し中に例外が発生しました: {str(e)}", "error")
            return redirect(url_for('app.index'))
        session.pop('refresh_token', None)
    session.clear()
    logout_user()
    flash("ログアウトしました。", "success")
    return redirect(url_for('app.home'))