
概要
PostgreSQLはMySQLと同様のオープンソースソフトウェアRDBMS(リレーショナルデータベースマネジメントシステム)で、導入コストの低さから社内のデータベースとして採用している企業もあるかと思います。ですが近年クラウド型DWH(データウェアハウス)サービスが相次いで登場し、安価な運用コストながらデーベースの管理コストをベンダー側に負担してもらうことが出来るようになりました。またデータ量の増加に伴うデータベースの拡張も容易です。
ですが従来のPostgreSQLをベースにした運用からこのようなサービスを中心とした運用へのシフトにはPostgreSQLからDWHへのデータ統合が簡単に行えなければあまり意味がありません。
また一度データパイプラインが構築できたとしてもデータの更新のたびにPostgreSQLのデータを丸ごと転送し直していてはPostgreSQL、DWH双方のパフォーマンスを低下させます。
そこで今回はtrocco®のCDC形式転送機能を利用し、PostgreSQLbinlogのうち変更が生じたデータだけをGoogle BigQueryへ自動で転送し、データベースをアップデータとする体制をほぼノーコードでやってみたいと思います。

1-0. CDCとは?
今回はPostgreSQLのWALをCDC形式(Change Data Capture = 変更データキャプチャ)で転送します。
CDC形式の転送により、初回の転送のみデータの全件転送が発生するものの、それ以後はPostgreSQL上に生じた差分データのみ転送するため、トータルでみると転送量の削減・転送の高速化を図ることができます。
なおCDC形式の転送と全件転送との比較はを表にすると以下のようになります。
全件転送 | CDC | |
---|---|---|
trocco転送量(初回実行時) | ソーステーブルの全データ | ソーステーブルの全データ |
trocco転送量(2回目以降) | ソーステーブルの全データ | 更新ログのみ(※) |
スキーマ変更発生時 (列追加等) | 自動追従 | 自動追従 |
データ削除(DELETE)の挙動 | 物理削除 | 論理削除 |
データ更新(UPDATE)の挙動 | 物理更新 | 物理更新 |
BigQueryスキャン量 | なし | 現在のBigQuery上のテーブル全量+更新ログ |
データソースDBへの負荷 | 高 | 低(初回転送時のみ全量のため高負荷) |
trocco®でCDC転送を行う際には
- trocco®がPostgreSQLのWALをGoogle BigQuery上に転送
- WALのデータから重複を排除し、それぞれのレコードの最新のレコードの集合を作成
- レコードの集合を現在のGoogle BigQueryテーブルとマージし、マージ後のテーブルで置換する
という手順を踏んでいます。
CDC転送の詳細な手順についてはこちらを参照してください。
2-1. PostgreSQLのパラメータ設定
ヘルプのドキュメントを参照しつつPostgreSQLのパラメータ設定を行います。今回はRDSを使っていますので、AWSのコンソールから環境変数を確認します。

rds.logical_replicationは1に設定されています。
max_replication_slotsとmax_wal_sendersも連携するテーブルの数と同じになっていることが確認できたので、このまま次のステップに進めます。
2-2. Slotの作成
ヘルプのドキュメントを参照しつつ、Slotの作成を行います。
SUPERUSERでログイン後、下記のSQLでスロットを作成してください。
SELECT * FROM pg_create_logical_replication_slot('trocco_<table_name>', 'wal2json');
作成後、下記のSQLでSlotからデータが取得できるかの確認ができます。
SELECT COUNT(*) FROM pg_logical_slot_peek_changes('<replication_slot_name>', null, null);
2-3. PostgreSQLの権限設定
ヘルプのドキュメントを参照しつつ、
PostgreSQLの権限設定をします。
ここではCONNECT, USAGE, SELECT, rds_replication, ALTER DEFAULT PRIVILEGES の5つの権限を設定します。
(rds_replicationは、RDSの場合のみ必要となります)
GRANT CONNECT ON DATABASE <database_name> TO <username>;
GRANT USAGE ON SCHEMA <schema_name> TO <username>;
GRANT SELECT ON <schema_name>.<table_name> TO <username>;
GRANT rds_replication TO <username>;
ALTER DEFAULT PRIVILEGES IN SCHEMA <schema_name> GRANT SELECT ON TABLES TO <username>;
2-3. Primary Keyの設定
転送元のテーブルにはPrimary Keyが設定されている必要があります。
ここでは既に設定されているものとして、割愛します。
その他、trocco®で転送する際の必須条件などについてはこちらのドキュメントを参照してください。
これでPostgreSQL側での設定は完了です。
続いてtrocco®の設定を行います。
3-0. trocco®に登録
はじめにtrocco®のアカウントが必要です。
trocco®は無料のトライアルも実施しているので、前もって申し込み・登録をしておきましょう。
https://trocco.io/lp/index.html
(申し込みの際にこちらの記事を見たという旨を記載して頂ければスムーズにご案内できます)
3-1. 転送元・転送先を決定
trocco®にアクセスし、ダッシュボード画面から「転送設定を作成」のボタンを押します。

転送元にPostgreSQL WAL(CDC)、転送先にGoogle BigQueryを選択し、「この内容で作成」ボタンを押します。

設定画面になるので、必要な情報を入力していきます。
3-2. PostgreSQLとの連携設定
転送設定の名前とメモを入力します。

転送設定の名前を決めたら、「転送元の設定」内の「接続情報を追加」ボタンを押し、PostgreSQLの接続情報の設定を行います。

接続設定の名前・ホスト・ポート・ユーザー名・パスワードを入力します。

接続確認をしたのち、設定の保存をします。

再度転送設定画面に戻り、「接続情報を読み込む」ボタンを押すと、作成した接続情報が選択できるかと思います。

3-3. PostgreSQLからのデータ抽出設定
これでPostgreSQLとの連携は完了です。
次にPostgreSQLから取得するデータを設定します。
まずは必須項目の「データベース名」「スキーマ名」「テーブル」「スロット」をセレクトボックスの中から選択します。

スキーマ追従の有効/無効を指定します。

スキーマ追従が有効になっている場合、
- クエリにRENAME COLUMNが含まれている場合、変更後の名前でデータ連携される
- クエリにADD COLUMNが含まれている場合、追加したテーブルが自動で転送されるようになる
といった挙動になります。
最後に接続確認が通るか確認します。
PostgreSQLとの接続が確認できました。以上でPostgreSQL側の設定は完了です。
次は転送先のGoogle BigQueryの設定を行っていきましょう。

3-4. 転送先Google BigQueryの設定
基本的には転送元と同じ要領です。
「接続情報を追加」ボタンからGoogle BigQueryの接続設定を行い、データベース・テーブル・データセットのロケーションを指定します。

PostgreSQLとの接続同様Google BigQueryについても接続テストを行います。

接続が確認できたためこれで入力は完了です。保存して次に進みましょう。
3-5. カラム名・データ型の確認
「データプレビュー」の画面でカラム名・データ型の確認を行います。
PostgreSQLの各データはtrocco®の方でデータ型を判断し、自動的にGoogle BigQueryのデータ型に変換されます。
データ型についてはこちらのドキュメントをご参照ください。

問題なければ保存します。
3-6. 初回転送
CDCは変更データのみを転送する方式ですが、初回だけは全件転送をする必要があります。
このステップでは、手動で全件転送を行います。
右上にある「実行」ボタンを押します。

「全件転送を行う」にチェックマークを入れて、「ジョブを実行」を押します。

転送完了まで待ちます。

転送が成功しました。これで初回の全件転送は完了です。

続いてCDC転送の設定をします。
転送設定の詳細画面に戻りましょう。
3-7. スケジュール設定
「スケジュール・トリガー設定」タブを開きます。

以下のように実行スケジュールを設定することで、転送を自動化することが出来ます。

3-8. 通知設定
必須の設定ではないですが、ジョブの実行ステータスに応じてEmailやSlackに通知を行うことが出来ます。転送の成功/失敗時の通知だけでなく、転送ジョブが不自然に時間がかかっている場合や、転送は成功したもののデータの転送件数が0になってしまっている場合など様々なエラー管理に対応しています。

これで全ての設定が完了しました。
以後は設定したスケジュールに合わせてCDC転送が実行され、差分のデータがGoogle BigQueryに統合されるようになります。
4-1. 転送結果の確認
trocco®での設定のみで転送が完了しているため、Google BigQuery側でなにか操作は必要ありません。
最後に設定に従ってGoogle BigQueryにデータの統合ができているか確認してみます。


両方とも同じデータが入っていることが確認できます。
元のテーブルにいくつか変更を加えて、Gogle BigQuery側でどのように表示されるのかを確認してみましょう。
DELETE FROM qiita_test WHERE id = 1;
INSERT INTO qiita_test VALUES (4, 'Kimura');
ALTER TABLE qiita_test ADD NewColumn VARCHAR(20) NULL;
id=1 の行を削除
id=4 の行を追加
新たに「NewColumn」のカラムを追加
この3点が変更されています。

このようにPostgreSQLのテーブルに変更を加えたうえでCDC転送を行い、Google BigQuery側のコンソール画面から再度データを確認します。

先ほどのクエリで変更した部分がGoogle BigQueryでも反映されていることが分かります。
また削除したカラムは_trocco_deletedカラムがtrueになっています。
今回は「スキーマ追従」の設定を有効にしているのでカラムの追加が反映されていますが、スキーマ追従が無効の場合には反映されないのでご注意ください。
4-2. おわりに
転送が不要になったらここまでの工程で作成したスロットを削除してください。スロットを削除しないでいると、WALがストレージを圧迫してしまい、データベースがクラシュしてしまう可能性があります。
まとめ
いかがでしたでしょうか。trocco®を利用することで簡単にPostgreSQLのデータを取得し、DWH(今回はGoogle BigQuery)に統合することができました。またDWHとBIツールを連携させることでデータ分析を行うことが可能となります。
PostgreSQLを活用している企業の方はぜひtrocco®をご検討ください。
trocco®では、クレジットカード不要のフリープランをご案内しています。ご興味がある方はぜひこの機会に一度お試しください。
