トレジャーデータ株式会社が開催した「Embulk & Digdag Online Meetup 2020」。
Embulk / Digdagのオリジナル開発者である古橋(@frsyuki )や現在のコア開発チームも参加し、Embulk / Digdagを本番環境で運用しているエンジニアやコントリビュートをしている・したいエンジニア向けに、EmbulkとDigdagそれぞれの今後のロードマップについて語られました。

また、株式会社primeNumberからは、Embulkを利用した分析基盤構築・データ統合SaaSの「trocco®」の開発の話や、Kubernetes on EKSを利用したスケールの確保、実運用の裏側などをご紹介。

EmbulkとDigdagをプロダクション環境で利用しているZOZO Technologiesからは、「Digdagを利用したデータ基盤とその運用Tips」と題し、EmbulkとDigdagの運用やプラグイン開発についてのディープなナレッジが共有されました。

Embulkプラグインをスクリプトで書けるようにしてみた

古橋 貞之氏

データを基にした論理的な意思決定をしていくことが、(ビジネスにおいて)非常に大事であることが、近年の共通認識になりました。これからは、クラウドやSaaSにデータを集めてきて解析する、という作業がデータサイエンスやデータエンジニアリングをする上で必要になっていくでしょう。さらにSaaS間のデータ統合も必要となり、これは10年前にはあり得なかった世界観です。

古橋氏:「SaaSのクラウドベースにあるデータをインテグレートしていくためにはEmbulkのプラグインAPIの活用することになります。しかしそのプラグインを書くためには、JavaのAPIが使えなければならないのですが、そうしたプラグインを書ける人は多くありません。

その一方で、『スクリプトなら書けます』『スクリプトとSDKがあってドキュメントがあれば書けます』というSaaSの設定をしてる人たちのほうが、圧倒的に人口は多くなります。そうなるとプラグインを書いてデータを持ってこれないと、そもそもデータ解析も始まらないという問題があります」

そこでSaaSの設定をしている人とプラグインを書く人を同じ人にすることで、データを使った意思決定を素早く行うことができ、ギャップを解消することができるようになると古橋氏は提案しています。また、Embulkのフレームワーク上で走らせることで、多くのアウトプットプラグインを組み合わせることもできるそうです。

Plan and incompatibility toward Embulk v1.0

三廻部氏からは、2015年にリリースされてから5年が経ったEmbulk ver1.0に向けた計画と、その中でいくつかの非互換の対応について、ユーザー向けと開発者向けの2部に分けられて語られました。

三廻部 大氏

ユーザー向け

三廻部氏:「現在の最新ステーブル版は<ver. 0.9.23>になっています。<ver 0.10>以降が開発版であり、プレステーブル版の<ver. 0.11>をいくつかリリースしたのち、最終の<ver 1.00>をリリースすることになります。世に出ているプラグインの最新版は、この<ver. 0.9.23>で動くようになっているはずで、これ以上のアップデートが出ることはない予定です」

「0.10系は開発版であり、安定していません。トライ&エラーの繰り返しをしており、一般的に使われているプロダクション環境では使われていません。プラグインはこの0.10系に追いつき、かつ0.10系の最新でも動くように開発を進めています。ユーザーさんとしては、とりあえず0.9系のプラグインを使いつつも最新にしていくことが、安定した使い方になると思います。

0.11系は基本的に最終の1.0系と同じものになる予定です。0.11.0をリリースした時点で、プラグインのためのAPIやSPIは定義されます。ですので、0.11系で使えるプラグインはそのまま1.0系でも使える予定です。しかしその過程で、0.9系向けに作られていた古いプラグインの一部には動かなくなるものが出ると思います」

開発者向け

続いて、プラグインの開発を手掛ける開発者向けのアップデート状況とその対応方法について紹介されました。

三廻部氏:「プラグインを開発していくにあたり、アップデートへの対応には大きく2つの選択肢があります。」

  1. 0.10系の開発進行に従って、順次最新版をキャッチアップしてもらう方法
  2. <ver. 0.11>がリリースされ、<ver 1.00>のAPIとSPIが定義され次第、一気にキャッチアップする方法

三廻部氏:「Embulkの開発チームではGitHubのEmbulk以下にある、いわゆる公式プラグインに近いものは①の方法に沿って順次アップデートしていきます。②の方法は開発者にとって、間違いなく楽だと思います。ただ、「実はこうして欲しかった」というフィードバックを<ver. 0.11>のリリース以降でいただいても対応できない場合もあることだけ留意してください」

詳しくはこちらのイベントレポートからご覧ください。

Embulk を利用したデータ統合SaaSの構築と運用

鈴木 健太

高際 兼一

データ統合SaaS、「trocco®」を開発、運営する弊社からは鈴木が講演しました。「trocco®」 の簡単な紹介と、ツールを支えるジョブ基板の構築とその運用について語りました。

株式会社primeNumber CTO 鈴木健太

鈴木:「データの分析基盤の構築全般を支援するSaaSが『trocco®』です。主にデータ転送の部分で、Embulkを利用しております。設定作成の際に接続テストを行なったり、Embulkのエラーに慣れていない方にも使いやすい設計であることが特徴です。
ユーザーには、画面上ののUIから入力するだけで、Embulkのコンフィグを意識せずに利用できるようなツールになっています」

UI上で設定した値が、Embulkのconfigと対応している

Kubernetesで運用されている「trocco®」のジョブ基盤

「trocco®」のジョブ基盤は、現状Kubernetesで運用されています。システム要件としては、ジョブ実行数は1日1万件くらいを捌いている状態で、ジョブによっては実行時間がバラバラであり、時間帯に偏りがあります。ジョブごとにCPUやメモリを調整してリソースを変更したいということも条件の1つでした。

「スケーラブルで安定的なジョブ基盤の構築をシンプルに実現するために、『trocco®』はKubernetes(EKS)を採用しております。UIはECSで動いており、転送ジョブの方がEKSで動いております。

Kubernetesを使用している理由として、ジョブの立ち上げをKubernetesに任せられることが要素として大きいです。create jobリクエストさえ成功すれば、nodeのリソース状況をみてKubernetesがリソース状況を見て、job(コンテナ)を立ち上げてくれます。また、Cluster Autoscalerにより、jobを立ち上げるリソースがない場合は、nodeのスケールアウトを行います。 

各コンテナがSQSをpollingしてジョブを実行し、完了したら再度pollingして、次のジョブを実行する『Fan-Out方式』を採用していましたが、Dispatcherがキューをpollingし、ジョブ単位で都度コンテナを立ち上げる『Dispatcher方式』に『trocco®』は移行しています。その理由として、コンテナのイメージタグを差し換えるだけでデプロイが完了するシンプルさがあります。ですので、実行中のジョブは基本的に影響を受けません。『trocco®』は多くのお客様にご利用いただいており、その都度コンテナ立ち上げ、完了後には破棄することで顧客のデータが混じることなく、セキュリティーを担保できています。

また、開発環境と本番環境を揃えて開発できるため、開発環境でKubernetesクラスター立ち上げて、本番環境と同じような状況を再現することが可能になることもポイントですね。

安定運用が『trocco®』の価値でもあるので、今後も品質を改善していきます」

embulk-output-bigquery_java

高際:「trocco®」の中では、様々なinとoutのEmbulk pluginが開発されています。今回はBig QueryのJavaバージョンについて紹介します。

Embulk同様に、embulk-output-bigquery pluginも長い歴史を持っており、もともとTreasureDataの方々が開発したもので、アウトプットファイルベースのプラグインでした。その後にJRubyで実装して頂いた時にアウトプットベースの実装に変わりました。

「trocco®」でとある案件の顧客から『指定の時間内にどうしても転送を終わらせたい』という要望がありました。。 しかしJRubyだとパフォーマンス的に無理ということが分かり、今年の2月頃からJRubyの実装を参考に開発するようになりました。もう1つの開発のモチベーションとして、Big Queryをデータウェアハウスとして使用されているお客様が多く、その転送時間をミニマイズすることでECSのランタイムが短くなればコストも下がるのではないかという仮説から開発を進めました

最後に、JRubyからJavaに置き換えたことでどの程度早くなるか、テストで計測した結果について発表しました。

環境としてはEC2を立てて、その中にローカルファイルを置いて、Big Queryに転送しました。計測したところは、図にある通りinputからデータを取得し、output threadが中間ファイルに書き出すまでスループットです。2.5倍くらい速くなってます。今回はあくまでベーシックなテストです。文字列をtimestampに変換するとtimestampのパースが遅いので、パフォーマンスが落ちたりしますが、運用している肌感として転送速度全体で最低でも1.2 ~ 1.5倍くらい早くなっております。

現在は、EmbulkのアウトプットBig Query(embulk/embulk-output-bigquery)と「trocco®」にあるEmbulk Output(trocco-io/embulk-output-bigquery_java)の2つのレポジトリがある状態です。embulk-output-bigquery_javaがembulk-output-bigqueryのコンフィグをほぼ満たした時に、EmbulkのアウトプットBig Queryの方にJavaバージョンをマージをして正式にリリースします。

Digdag updates

山縣 陽氏

TreasureDataの山縣氏からは、以下の3つのアジェンダについて語られました。

  1. v0.9のアップデート
  2. 次回のメジャーアップデート
  3. GraalJSのマイグレーション

詳しくはslideshareからご覧ください。

Digdagを利用したデータ基盤とその運用Tips

2018年にZOZOテクノロジーズに新卒入社し、データ基盤やプッシュ通知、LINE配信基盤のマーケティングオートメーションチームリーダーを務めている田島氏からは、Digdagの利用事例として同社のデータ基盤と、運用における3つのTipsが紹介されました。

田島 克哉氏

田島氏:ZOZOTOWNでのデータ基盤は、『ZOZOTOWN』のデータベースや、ファッションコーディネートアプリ『WEAR』のデータベースといった、複数のデータベースをBig  Queryに連携して、データマートをBig  Query上に構築しています。

その中で、データ連携をする際のワークフロー管理や並列処理させるためにDigdagを使っています。

そうした日々のDigdag運用の中で得たTipsをお話させていただきます。

ZOZOテクノロジーズのデータ基盤アーキテクチャ

Tips1:ワークフローの階層化

ワークフローを階層化するメリットとして、プログラミングでいうところのサブルーチンと同じようなメリットがあります。つまり、『責任が分割できる』『ワークフローの再利用が可能に』『ワークフローごとに独立して実行が可能になる』というメリットがあります。

Tips2:エラーハンドリング

Digdagのエラーハンドリングでよくやるものとして、『_errocr』を使ってキャッチし、Slackのプラグインを使って通知されているかと思います。しかし、Digdagのワーカーがジョブを実行していると『_error』が実行されなかったり、無言でワークフローが死ぬ問題があったりと、正確に通知されないことがあります。その解決策としてnotification機能があります。エラーが発生したタイミングでshell commandか、httpリクエストか、メール送信することができる機能です。

Tips3:自動スケールイン

スケールアウトの場合はサーバーが増えてしまえばDigdagが勝手にjobを取ってきて実行できるようにすれば問題がありません。しかしスケールインする場合、Digdagがjobをつかんでいたら、そのサーバーが死んでしまうとジョブは失敗します。そこでワーカーがjobをつかんでいないタイミングでスケールインをする必要があるのですが、その場合、ワーカーがjobを掴んでいないか判断しなければなりません。

そこで使ってないワーカーのself shutdown作戦として、「HARAKIRIスクリプト」っていうものを作成しました。lock_agent_IDの一覧を取得し、自分自身のホスト名が含まれていなければself shutdownするというスクリプトで、クーロンで実行しています。

まとめ

2〜3年ほどDigdagを運用してみての感想ですが、フルに機能を活用する場合、コードレベルまでの把握は必要だと思っています。逆にシンプルに使う分にはそこまで必要ありません。最後に、Digdagはめちゃめちゃ便利です!とお伝えさせていただきます!

スケーラブルなワークフロー実行環境を目指して

2020年初めてのEmbulk&Digdagオンラインミートアップ、最後は株式会社ZOZOテクノロジーズの平田氏が、ワークフローエンジンの問題点から考える理想の定義やKubernetes上で達成したスケーラリビティについて紹介しました。

平田 拓也氏

ZOZOではこれまでワークフローエンジンとして、GCPのマネージドサービスのCloud Composerを使用していました。ここでのメリットとして、まずPythonによるDAG定義が柔軟にできること。初めからさまざまなオペレーターが用意されていることで、少ない記述量で書くことができることがメリットです。また、必要なインフラを自動で構築・運用してくれる点もメリットですね。その一方で、『開発効率が悪い』『Workerのレプリカが固定でコストがかかること』『不安定な挙動がある』というデメリットもあります。そこで、ワークフローエンジンの理想形を以下のように定義しました。

  • 高い開発効率
  • スケーラビリティがある
  • 安定している

そのために、DigdagをKubernets(GKE)上でスケーラブルに動かすために、

  • Digdagサーバーを機能ごとに分けて動かす
  • スケールするインフラ構成を組む
  • Kubernetsの水平スケーラー(HPA)

この3つを行いました。

その後、Digdag v0.10の新機能「KubernetesCommandExecutor」でより柔軟に動かす方法をセッションでは紹介。結論としてGKE上でホストしてあげることでスケーラビリティを確保することができたそうです。独自の水平スケーラーを作ることでWorkerを安全にスケールアウト・スケールイン可能に。その一方で、ワークフローが終わらないとスケールインができないため、改善の余地はあるとしています。

最後に

EmbulkとDigdagの活用事例を中心に、各社様々な工夫やTipsが紹介された本ミートアップ。新型コロナウイルス感染拡大の影響で、ますますデータエンジニアやデータサイエンティスト同士の直接的な交流が難しくなる中で、質疑応答ツールの活用で非常にインタラクティブなイベントになりました。イベントのアンケート結果や質疑応答の詳細、そして各セッションのより詳しい内容は以下のURLよりご確認ください。

trocco®は、ETL/データ転送・データマート生成・ジョブ管理・データガバナンスなどのデータエンジニアリング領域をカバーした、分析基盤構築・運用の支援SaaSです。データの連携・整備・運用を効率的に進めていきたいとお考えの方や、プロダクトにご興味のある方はぜひ資料をご請求ください。

hirokazu.kobayashi

慶應義塾大学卒業後、2014年より株式会社リブセンスへ入社。データエンジニアとして同社分析基盤立ち上げをリードする。2017年より現職primeNumberに入社。自社プロダクト「systemN」におけるSpark/Redshift活用等のデータエンジニアリング業務を行うかたわら、データ統合業務における工数削減が課題だと感じ、データ統合を自動化するサービス「trocco®」を立ち上げる。