image.png

概要

PostgreSQLはMySQLと同様のオープンソースソフトウェアRDBMS(リレーショナルデータベースマネジメントシステム)で、導入コストの低さから社内のデータベースとして採用している企業もあるかと思います。ですが近年クラウド型DWH(データウェアハウス)サービスが相次いで登場し、安価な運用コストながらデーベースの管理コストをベンダー側に負担してもらうことが出来るようになりました。またデータ量の増加に伴うデータベースの拡張も容易です。

ですが従来のPostgreSQLをベースにした運用からこのようなサービスを中心とした運用へのシフトにはPostgreSQLからDWHへのデータ統合が簡単に行えなければあまり意味がありません。

また一度データパイプラインが構築できたとしてもデータの更新のたびにPostgreSQLのデータを丸ごと転送し直していてはPostgreSQL、DWH双方のパフォーマンスを低下させます。

そこで今回はtrocco®のCDC形式転送機能を利用し、PostgreSQLbinlogのうち変更が生じたデータだけをGoogle BigQueryへ自動で転送し、データベースをアップデータとする体制をほぼノーコードでやってみたいと思います。

PostgreSQLのデータをGoogle BigQueryへ転送

1-0. CDCとは?

今回はPostgreSQLのWALをCDC形式(Change Data Capture = 変更データキャプチャ)で転送します。
CDC形式の転送により、初回の転送のみデータの全件転送が発生するものの、それ以後はPostgreSQL上に生じた差分データのみ転送するため、トータルでみると転送量の削減・転送の高速化を図ることができます。

なおCDC形式の転送と全件転送との比較はを表にすると以下のようになります。

全件転送CDC
trocco転送量(初回実行時)ソーステーブルの全データソーステーブルの全データ
trocco転送量(2回目以降)ソーステーブルの全データ更新ログのみ(※)
スキーマ変更発生時
(列追加等)
自動追従自動追従
データ削除(DELETE)の挙動物理削除論理削除
データ更新(UPDATE)の挙動物理更新物理更新
BigQueryスキャン量なし現在のBigQuery上のテーブル全量+更新ログ
データソースDBへの負荷低(初回転送時のみ全量のため高負荷)
※ INSERT,DELETE,DDL文については1行文、UPDATE文については更新前後の2行分

trocco®でCDC転送を行う際には

  1. trocco®がPostgreSQLのWALをGoogle BigQuery上に転送
  2. WALのデータから重複を排除し、それぞれのレコードの最新のレコードの集合を作成
  3. レコードの集合を現在のGoogle BigQueryテーブルとマージし、マージ後のテーブルで置換する
    という手順を踏んでいます。

CDC転送の詳細な手順についてはこちらを参照してください。

2-1. PostgreSQLのパラメータ設定

ヘルプのドキュメントを参照しつつPostgreSQLのパラメータ設定を行います。今回はRDSを使っていますので、AWSのコンソールから環境変数を確認します。

image.png

rds.logical_replicationは1に設定されています。
max_replication_slotsとmax_wal_sendersも連携するテーブルの数と同じになっていることが確認できたので、このまま次のステップに進めます。

2-2. Slotの作成

ヘルプのドキュメントを参照しつつ、Slotの作成を行います。

SUPERUSERでログイン後、下記のSQLでスロットを作成してください。

SELECT * FROM pg_create_logical_replication_slot('trocco_<table_name>', 'wal2json');

作成後、下記のSQLでSlotからデータが取得できるかの確認ができます。

SELECT COUNT(*) FROM pg_logical_slot_peek_changes('<replication_slot_name>', null, null);

2-3. PostgreSQLの権限設定

ヘルプのドキュメントを参照しつつ、

PostgreSQLの権限設定をします。

ここではCONNECT, USAGE, SELECT, rds_replication, ALTER DEFAULT PRIVILEGES の5つの権限を設定します。
(rds_replicationは、RDSの場合のみ必要となります)

GRANT CONNECT ON DATABASE <database_name> TO <username>;
GRANT USAGE ON SCHEMA <schema_name> TO <username>;
GRANT SELECT ON <schema_name>.<table_name> TO <username>;
GRANT rds_replication TO <username>;
ALTER DEFAULT PRIVILEGES IN SCHEMA <schema_name> GRANT SELECT ON TABLES TO <username>;

2-3. Primary Keyの設定

転送元のテーブルにはPrimary Keyが設定されている必要があります。
ここでは既に設定されているものとして、割愛します。

その他、trocco®で転送する際の必須条件などについてはこちらのドキュメントを参照してください。

これでPostgreSQL側での設定は完了です。
続いてtrocco®の設定を行います。

3-0. trocco®に登録

はじめにtrocco®のアカウントが必要です。
trocco®は無料のトライアルも実施しているので、前もって申し込み・登録をしておきましょう。
https://trocco.io/lp/index.html
(申し込みの際にこちらの記事を見たという旨を記載して頂ければスムーズにご案内できます)

3-1. 転送元・転送先を決定

trocco®にアクセスし、ダッシュボード画面から「転送設定を作成」のボタンを押します。

貼り付けた画像_2021_02_09_18_45.png

転送元にPostgreSQL WAL(CDC)、転送先にGoogle BigQueryを選択し、「この内容で作成」ボタンを押します。

image.png

設定画面になるので、必要な情報を入力していきます。

3-2. PostgreSQLとの連携設定

転送設定の名前とメモを入力します。

image.png

転送設定の名前を決めたら、「転送元の設定」内の「接続情報を追加」ボタンを押し、PostgreSQLの接続情報の設定を行います。

image.png

接続設定の名前・ホスト・ポート・ユーザー名・パスワードを入力します。

image.png

接続確認をしたのち、設定の保存をします。

image.png

再度転送設定画面に戻り、「接続情報を読み込む」ボタンを押すと、作成した接続情報が選択できるかと思います。

image.png

3-3. PostgreSQLからのデータ抽出設定

これでPostgreSQLとの連携は完了です。
次にPostgreSQLから取得するデータを設定します。
まずは必須項目の「データベース名」「スキーマ名」「テーブル」「スロット」をセレクトボックスの中から選択します。

image.png

スキーマ追従の有効/無効を指定します。

image.png

スキーマ追従が有効になっている場合、

  • クエリにRENAME COLUMNが含まれている場合、変更後の名前でデータ連携される
  • クエリにADD COLUMNが含まれている場合、追加したテーブルが自動で転送されるようになる

といった挙動になります。

最後に接続確認が通るか確認します。

PostgreSQLとの接続が確認できました。以上でPostgreSQL側の設定は完了です。
次は転送先のGoogle BigQueryの設定を行っていきましょう。

image.png

3-4. 転送先Google BigQueryの設定

基本的には転送元と同じ要領です。
「接続情報を追加」ボタンからGoogle BigQueryの接続設定を行い、データベース・テーブル・データセットのロケーションを指定します。

貼り付けた画像_2021_02_09_18_17.png

PostgreSQLとの接続同様Google BigQueryについても接続テストを行います。

image.png

接続が確認できたためこれで入力は完了です。保存して次に進みましょう。

3-5. カラム名・データ型の確認

「データプレビュー」の画面でカラム名・データ型の確認を行います。
PostgreSQLの各データはtrocco®の方でデータ型を判断し、自動的にGoogle BigQueryのデータ型に変換されます。

データ型についてはこちらのドキュメントをご参照ください。

image.png

問題なければ保存します。

3-6. 初回転送

CDCは変更データのみを転送する方式ですが、初回だけは全件転送をする必要があります。
このステップでは、手動で全件転送を行います。

右上にある「実行」ボタンを押します。

saiki-fix.png

「全件転送を行う」にチェックマークを入れて、「ジョブを実行」を押します。

image.png

転送完了まで待ちます。

image.png

転送が成功しました。これで初回の全件転送は完了です。

image.png

続いてCDC転送の設定をします。
転送設定の詳細画面に戻りましょう。

3-7. スケジュール設定

「スケジュール・トリガー設定」タブを開きます。

image.png

以下のように実行スケジュールを設定することで、転送を自動化することが出来ます。

image.png

3-8. 通知設定

必須の設定ではないですが、ジョブの実行ステータスに応じてEmailやSlackに通知を行うことが出来ます。転送の成功/失敗時の通知だけでなく、転送ジョブが不自然に時間がかかっている場合や、転送は成功したもののデータの転送件数が0になってしまっている場合など様々なエラー管理に対応しています。

image.png

これで全ての設定が完了しました。
以後は設定したスケジュールに合わせてCDC転送が実行され、差分のデータがGoogle BigQueryに統合されるようになります。

4-1. 転送結果の確認

trocco®での設定のみで転送が完了しているため、Google BigQuery側でなにか操作は必要ありません。

最後に設定に従ってGoogle BigQueryにデータの統合ができているか確認してみます。

image.png
PostgreSQLから転送したデータを確認
image.png
Google BigQuery側から転送したデータを確認

両方とも同じデータが入っていることが確認できます。

元のテーブルにいくつか変更を加えて、Gogle BigQuery側でどのように表示されるのかを確認してみましょう。

DELETE FROM qiita_test WHERE id = 1;
INSERT INTO qiita_test VALUES (4, 'Kimura');
ALTER TABLE qiita_test ADD NewColumn VARCHAR(20) NULL;

id=1 の行を削除
id=4 の行を追加
新たに「NewColumn」のカラムを追加
この3点が変更されています。

image.png

このようにPostgreSQLのテーブルに変更を加えたうえでCDC転送を行い、Google BigQuery側のコンソール画面から再度データを確認します。

image.png

先ほどのクエリで変更した部分がGoogle BigQueryでも反映されていることが分かります。
また削除したカラムは_trocco_deletedカラムがtrueになっています。

今回は「スキーマ追従」の設定を有効にしているのでカラムの追加が反映されていますが、スキーマ追従が無効の場合には反映されないのでご注意ください。

4-2. おわりに

転送が不要になったらここまでの工程で作成したスロットを削除してください。スロットを削除しないでいると、WALがストレージを圧迫してしまい、データベースがクラシュしてしまう可能性があります。

まとめ

いかがでしたでしょうか。trocco®を利用することで簡単にPostgreSQLのデータを取得し、DWH(今回はGoogle BigQuery)に統合することができました。またDWHとBIツールを連携させることでデータ分析を行うことが可能となります。
PostgreSQLを活用している企業の方はぜひtrocco®をご検討ください。

trocco®では、クレジットカード不要のフリープランをご案内しています。ご興味がある方はぜひこの機会に一度お試しください。

hirokazu.kobayashi

慶應義塾大学卒業後、2014年より株式会社リブセンスへ入社。データエンジニアとして同社分析基盤立ち上げをリードする。2017年より現職primeNumberに入社。自社プロダクト「systemN」におけるSpark/Redshift活用等のデータエンジニアリング業務を行うかたわら、データ統合業務における工数削減が課題だと感じ、データ統合を自動化するサービス「trocco®」を立ち上げる。