EmotionTechテックブログ

株式会社エモーションテックのProduct Teamのメンバーが、日々の取り組みや技術的なことを発信していくブログです。

Dataflowのエラーハンドリング四方山話

はじめに

こんにちは!バックエンドエンジニアの谷口(@ravineport)です。

以前、「サンプルコードからざっくり理解するDataflowでストリーム処理」という記事を書きました。今回はDataflowでストリーム処理をする際のエラーハンドリングについて触れたいと思います。

この記事はエモーションテック Advent Calendar 2023 の16日目の記事です。

今回扱うストリームパイプライン

今回は例として以下のようなシンプルなストリームを考えてみます。

今回扱うストリームの流れ

ストリームへの入力はPub/Subで、出力先はBigQueryです。Pub/Subにpublishされるデータは以下のようなJSONです。

{ 
  "id": 123,
  "data": "a"
}

パイプラインでは入力のJSON文字列をパースして、dataの値をBigQueryのテーブルに書き込む処理をします。

このパイプラインのJavaによる実装は以下のとおりです。(筆者はあまりJavaに明るくないため至らぬ点はご容赦くださいmm)

Javaによる実装(折りたたんであります。この行をクリックすると展開されます)

package org.example;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class PubSubToBQ {
    public static void main(String[] args) {
        PubSubToBQOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToBQOptions.class);
        options.setJobName("pubsub-to-bq");
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply("PubSubからメッセージを読み取る", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
            .apply("JSON文字列をパースしてdataを抽出", ParDo.of(new ExtractDataFromJsonString()))
            .apply("BigQueryに書き込む", BigQueryIO.<String>write()
                .to("sample-project.sample-dataset:data-table")
                .withFormatFunction(
                    (String data) -> new TableRow().set("data", data)
                )
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            );

        pipeline.run();
    }

    public interface PubSubToBQOptions extends DataflowPipelineOptions {
        @Description("読み込み先のPub/Subのトピック名")
        @Validation.Required
        String getInputTopic();

        void setInputTopic(String value);
    }

    public static class Data {
        public Long id;
        public String data;

        @JsonCreator
        public Data(
            @JsonProperty(value = "id", required = true) Long id,
            @JsonProperty(value = "data", required = true) String data
        ) {
            this.id = id;
            this.data = data;
        }
    }

    public static class ExtractDataFromJsonString extends DoFn<String, String> {
        private final ObjectMapper mapper;

        ExtractDataFromJsonString() { mapper = new ObjectMapper(); }

        @ProcessElement
        public void processElement(@Element String str, OutputReceiver<String> output) {
            try {
                Data data = mapper.readValue(str, Data.class);
                output.output(data.data);
            } catch (Exception e) {
                // JSONのパースに失敗した場合は無視する
            }
        }
    }
}

今回は以下の前提を置いています。

  • GCP Project名:sample-project
  • BigQueryのデータセット名:sample-dataset
  • BigQueryのテーブル名:data-table
    • カラム
      • data: String

BigQueryに上記のデータセットとテーブルを先に作成しておきます。

また、BigQueryへの書き込みはApache Beamが提供しているBigQueryIOを使います。詳細な使い方はこちらのドキュメントを参照ください。書き込みにはStorage Write APIを使っています。

自前の処理におけるエラーハンドリング

上記のコードではJSONのパースに失敗した(例:必須プロパティが含まれていない、JSONフォーマットの文字列でない、etc…)ときは特になにもしないようになっています。

JSONパースに失敗した文字列についてはBigQueryの別テーブルに保存したい、という場合は以下のように書くことができます。

JSONパースに失敗したときのハンドリング(折りたたんであります。この行をクリックすると展開されます)

package org.example;

// ...略
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class PubSubToBQ {

    static final TupleTag<String> MAIN = new TupleTag<>() {
    };

    static final TupleTag<ErrorData> ERROR = new TupleTag<>() {
    };

    public static void main(String[] args) {
        // ...略

        Pipeline pipeline = Pipeline.create(options);

        PCollectionTuple pct = pipeline
            .apply("PubSubからメッセージを読み取る", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
            .apply("JSON文字列をパースしてdataを抽出。失敗した場合はErrorData", ParDo.of(new ExtractDataFromJsonString2()).withOutputTags(MAIN, TupleTagList.of(ERROR)));
        pct.get(MAIN)
            .apply("正常データをBigQueryに書き込む", BigQueryIO.<String>write()
                .to("sample-project.sample-dataset:data-table")
                .withFormatFunction(
                    (String data) -> new TableRow().set("data", data)
                )
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            );
        pct.get(ERROR)
            .apply("エラーデータをBigQueryに書き込む", BigQueryIO.<ErrorData>write()
                .to("sample-project.sample-dataset:error-data-table")
                .withFormatFunction(
                    (ErrorData data) -> new TableRow().set("error_msg", data.errorMsg).set("data_string", data.dataString)
                )
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            );

        pipeline.run();
    }

    // ...略

    // JSONパースに失敗したデータを扱う型
    public static class ErrorData {
        public String errorMsg;

        public String dataString;

        public ErrorData(String errorMsg, String dataString) {
            this.errorMsg = errorMsg;
            this.dataString = dataString;
        }
    }

    public static class ExtractDataFromJsonString2 extends DoFn<String, String> {
        private final ObjectMapper mapper;

        ExtractDataFromJsonString2() { mapper = new ObjectMapper(); }

        @ProcessElement
        public void processElement(@Element String str, MultiOutputReceiver output) {
            try {
                Data data = mapper.readValue(str, Data.class);
                output.get(MAIN).output(data.data);
            } catch (Exception e) {
                // JSONのパースに失敗した場合はエラーメッセージと入力文字列を流す
                output.get(ERROR).output(new ErrorData(e.getMessage(), str));
            }
        }
    }
}

JSONをパースする処理から、その結果によってパイプラインを分岐させるイメージです。

分岐したパイプラインを区別するために、TupleTagクラスを使って正常データを流すためのMAIN, 異常データを流すためのERRORというタグをそれぞれつくります。

ExtractDataFromJsonString2MultiOutputReceiverを引数に取るようになっています。 これとTupleTagを組み合わせて条件によって出力先を変えています。

また、Pipeline pipelineの扱い方も変わっています。 ParDo.of(new ExtractDataFromJsonString2()).withOutputTags(MAIN, TupleTagList.of(ERROR))で扱うようになっており、返り値の型もPCollectionTuple になっています。

PCollectionTupleインスタンスからMAINとERRORをそれぞれgetしてそれぞれの後続処理を行っています。

この例のようにApache Beamではストリームを複数個に分岐させることができます。ある処理でエラーが起きたときに、後続処理を変えたいときはこれらの仕組みを使うことができそうです。

BigQueryIO内の処理についてはほとんどエラーハンドリングができない

先ほどの例では自前のDoFnに関するエラーハンドリングを紹介しました。では、BigQueryIOで発生したエラーについてはどのように扱うのでしょうか?

結論、BigQueryIOで提供されていること以上のエラーハンドリングはできません。StorageWriteAPIを使っている場合は、パイプラインの実装が書き込み先のテーブルのスキーマと食い違っている、書き込みデータのサイズが大きすぎるエラーなどはハンドリング可能です。

BigQueryIOでは、書き込み結果をWriteResultとして受け取ることができます。StorageWriteAPIを使ったときに失敗したデータはgetFailedStorageApiInserts メソッドで取得ができます(参考)。しかしこれは上記のエラーになったデータのみが取得できます。

今回の例では、書き込み時に対象のテーブルがない場合は作成しないような実装になっています。テーブルがない状態でPub/Subにデータを流すとRuntimeExceptionになります。 このとき、パイプラインはそのデータを捨てることなく無限にリトライを繰り返します。

なぜすべてのエラーをハンドリングしていないのか考察

getFailedStorageApiInsertsのコメントにもある通り、persistentlyな失敗以外は外から復旧可能なためと考えられそうです。
テーブルがないのなら外から作れば解決できるし、権限が足りないのであれば割り当てれば解決可能です。しかし、スキーマの食い違いやデータが大きすぎることについてはもはやどうしようもありません。そういったもののみ、パイプラインの実装でハンドリングするという方針になっていそうです。

RuntimeExceptionを投げればDataflow的には無限にリトライを繰り返すのでデータが失われることはなさそうです(その状態でパイプラインを更新しても失敗した状態は引き継がれます)。そうしているうちに外から解決できるのであれば問題なし、ということかと推察しています。

(確証はないので詳しい方いらっしゃいましたらぜひご教授ください…!)

エラーハンドリングについてのまとめ

  • 自前で作成したDoFnなどの処理ついては、TupleTag,MultiOutputReceiverを使って複数パイプラインを作成することで処理の分岐ができる
  • BigQueryIOなど既に用意されているPTransformを使う場合は、それらが用意しているメソッド以外のハンドリングはできない

PTransformのスタイルガイドも用意されているので、これを読むと一層理解が深まりそうです。 基本的に内部事情を外に出さず、それを使って新しいPTransformを作ったときにもとのPTransformの挙動は影響を受けないようにするというのが方針のようです。PTransformの内部事情に干渉したくなったときは、なんとか干渉せずにすむようにパイプラインの実装を考えるのがよさそうでした。

最後に

Dataflow、とても便利ですが少し踏み込むとなかなか難しいことがたくさんありました。DataflowはApache Beamの実行環境ということもあり、課題にあたったときにDataflowの話なのか、Apache Beamの話なのかの切り分けがなかなか難しい印象です。課題によってはGoogle Cloudのサポートの力を借りることが難しいケースもありました。

そういった難しさはありつつも、大規模なストリーム処理をする上でDataflowはとても強力なツールです。今後も活用していきたいと思います!

エモーションテックでは顧客体験、従業員体験の改善をサポートし、世の中の体験を変えるプロダクトを開発しています。プロダクトに興味のある方、この記事や他の記事を見て少しでも弊社に興味をもっていただけましたら、ぜひ採用ページからご応募をお願いいたします!

hrmos.co