こんにちは。バックエンドエンジニアの李です。弊社ではアンケート回答の分析など時間のかかる処理の実行環境として Cloud Run Jobs を活用しています。そこで今回は Cloud Run Jobs を使ったプロダクトを安定運用するための仕組みの一部として、Google Cloud の Workflows を用いた試みを紹介いたします。
背景
弊社が開発している TopicScan では、大規模データをバッチ処理するために Cloud Run Jobs を採用しました。しかし、初期はリソース不足やアプリケーションの不具合でジョブが失敗したり Cloud Run Jobs の内部的な障害によりジョブがキャンセルされたりするといった問題が起きました。
これらの問題への対策として単に自動リトライすることは良い方法ではありませんでした。リトライしても、原因が解決されないまま同じ処理を繰り返すかもしれないからです。また、TopicScan では料金が高い AI サービスを使っているため、リトライするたびに料金がかさんでしまいます。
そのため、リソース割り当ての見直しや不具合修正などと並行して以下のような仕組みを構築することにしました。
- Cloud Run Jobs の状態を継続してモニタリング
- エラーが発生した時点で運用担当者に通知する
- これを受けて運用担当者が原因調査やリカバリ作業を行い、ジョブを再実行する
ここで有効に機能したのが Google Cloud Workflows を活用したジョブ監視・通知の仕組みです。
Cloud Run Jobs の状態を継続的にモニタリングし、失敗やエラーが発生した場合には即座にアラートを行う設計とすることで、リトライ回数を抑えつつも適切なタイミングで対応できるようになりました。
Workflows を活用したジョブ管理の実装例
実装するのは下記のような仕組みです。
- ジョブを起動する
- ジョブの状態を監視する
- ジョブの状態に応じてアラート通知などを行う外部 API の呼び出しを行う
- ジョブの実行開始を確認後、完了まで待つ
以下に、Google Cloud Workflows を用いた具体的な実装例を示します。
main: params: [input] steps: - start_job: call: googleapis.run.v2.projects.locations.jobs.run args: name: ${input.job_name} body: overrides: containerOverrides: args: - ${"--token=" + input.token} result: job_operation - monitor_job_progress: steps: - get_operation: call: googleapis.run.v2.projects.locations.operations.get args: name: ${job_operation.name} result: google_longrunning_operation - get_job_status: switch: - condition: ${get_job_status(google_longrunning_operation) == "Pending"} steps: - call_external_service_1: call: call_external_service args: callback_url: ${input.callbackUrl} token: ${input.token} status: "Pending" - wait: call: sys.sleep args: seconds: 5 next: get_operation - condition: ${get_job_status(google_longrunning_operation) == "Running"} steps: - call_external_service_2: call: call_external_service args: callback_url: ${input.callbackUrl} token: ${input.token} status: "Running" - condition: ${get_job_status(google_longrunning_operation) == "Failed"} steps: - call_external_service_3: call: call_external_service args: callback_url: ${input.callbackUrl} token: ${input.token} status: "Failed" next: end - wait_for_completion: call: googleapis.run.v2.projects.locations.operations.wait args: name: ${job_operation.name} body: timeout: "45000s" result: google_longrunning_operation - finish_job: switch: - condition: ${get_job_status(google_longrunning_operation) == "Completed"} steps: - handle_success: call: call_external_service args: callback_url: ${input.callbackUrl} token: ${input.token} status: "Completed" - condition: ${get_job_status(google_longrunning_operation) == "Failed"} steps: - handle_error: call: call_external_service args: callback_url: ${input.callbackUrl} token: ${input.token} status: "Failed" get_job_status: params: [google_longrunning_operation] steps: - set_job_status: assign: - job_status: "Failed" - check_resource: switch: - condition: ${map.get(google_longrunning_operation, "done") == true and map.get(google_longrunning_operation, "response") != null} assign: - job_status: "Completed" - condition: ${map.get(google_longrunning_operation, "error") != null and map.get(map.get(google_longrunning_operation, "error"), "code") == 1} assign: - job_status: "Cancelled" - condition: ${map.get(google_longrunning_operation, "error") != null and map.get(map.get(google_longrunning_operation, "error"), "message") != null and text.match_regex(map.get(map.get(google_longrunning_operation, "error"), "message"), "Cancelled|cancelled")} assign: - job_status: "Cancelled" - condition: ${map.get(google_longrunning_operation, "metadata") != null} steps: - check_conditions: for: value: condition in: ${map.get(google_longrunning_operation.metadata, "conditions")} steps: - if_condition: switch: - condition: ${map.get(condition, "state") == "CONDITION_FAILED"} assign: - job_status: "Failed" next: break - condition: ${map.get(condition, "state") in ["CONDITION_PENDING", "CONDITION_RECONCILING", "STATE_UNSPECIFIED"]} assign: - job_status: "Pending" next: break - condition: ${map.get(condition, "state") == "CONDITION_SUCCEEDED"} assign: - job_status: "Running" next: break - return_status: return: ${job_status} call_external_service: params: [callback_url, token, status] steps: - update: try: call: http.patch args: url: ${callback_url} headers: token: ${token} body: status: ${status} retry: ${http.default_retry_non_idempotent} except: as: e steps: - log_external_error: call: sys.log args: data: ${e} severity: "ERROR" - issue_external_exception: raise: ${e}
意識した点
この実装をする際にいくつか意識した点があるので紹介します。
Workflows からの Cloud Run Jobs 呼び出しにAPI コネクタ を利用
API コネクタは、他の Google Cloud プロダクトの API を簡単に呼び出せる仕組みであり、直接 REST API を使用するよりも実装が簡潔になります。
また、API のREST API(http.get) を使う場合、デフォルトで 5分、設定を変えても1時間でタイムアウトしてしまいますが、API コネクタを使う場合、その制限はなく、ワークフロー実行の開始時刻から終了時刻まで処理を継続することができます。そのため、長時間のジョブを Workflows で管理するには API コネクタを利用するのが有効です。
ジョブの監視に利用するAPI
現在のジョブの状態をポーリングしながら確認できるoperations.getとジョブの完了を待機するoperations.waitを組み合わせて利用しています。
なぜ組み合わせているかというと、Cloud Run Jobs は、ジョブの実行が開始される前に reconciliation(調整プロセス) を経るため、「ジョブが開始されているのか」「まだ準備中なのか」を見極めることが重要だからです。
例えば、Cloud Run のリソース制限によって、ジョブが起動されないまま終了する可能性があります。その場合、operations.wait だけでは「ジョブの完了」しかわからず、実際にジョブが開始されていないのに、実行完了だと誤認するリスクがあります。
さらにoperations.get を活用することで起動失敗の原因(メモリ不足、リソース競合など)を知ることができます。
したがって、本実装では operations.get を利用して reconciliation の状態を定期的に確認し、ジョブが実際に開始されたことを確認したうえで operations.wait に移行する設計にしています。
ポーリング間隔の最適化
ジョブの状態を監視する際には、API の呼び出し回数を減らしつつ、適切な間隔(例: 5~10秒)で状態をチェックすることが重要です。
ポーリングの頻度が高すぎると API 呼び出し回数の増加による負荷が発生するため、適切なバランスを考慮する必要があります。
google_longrunning_operation のレスポンスの取得
Cloud Run Jobs の API から返される google_longrunning_operation のレスポンスは、状況によって特定のキーが存在しない場合があるため、map.get() を活用して安全に値を取得することが推奨されます。
❌ 誤った例(エラーが発生する可能性あり)
- condition: ${google_longrunning_operation.done == true}
✅ 安全な例(キーが存在しない場合でもエラーを回避)
- condition: ${map.get(google_longrunning_operation, "done") == true}
このように、レスポンスの解析時に map.get() を使用することで、予期しないエラーを防ぐことができます。
おわりに
背景で挙げたような初期の問題は Cloud Run Jobs 内部の障害も含め既にほぼ解決していますが、ジョブの監視や障害時の適切な対応は引き続き重要です。Google Cloud Workflows を活用することで、ジョブの実行状態を効率的に管理し、安定した運用が可能になります。本記事が、Cloud Run Jobs を活用する際の参考になれば幸いです。