lasciva blog

開発して得た知見やwebビジネスのストック

UberでのKafkaのスケーリング

私もKafkaを使っているが、このレベルのデータ量やトピック数、パーティション数を扱ったり、パフォーマンスを求められてる訳ではないため、難易度が格段に上がる印象を受けた。
パフォーマンスを優先させながらも、データロスを抑えたり、検知する仕組みを導入して解決したのが面白かった。
また、複数のデータセンター間でのKafkaメッセージの集計を漏れなく行い続けるだけでも、トピック数やデータ量が増え続ける環境下では難しく、オープンソースを開発して解決したのも海外の大きいテック企業らしいと感じた。

How Uber scaled its Real Time Infrastructure to Trillion events per day

www.youtube.com

Use cases & Current Scale

ほとんどリアルタイムで処理が行われている。

  • ドライバーとのマッチング、到着時間の計算
  • UberEatsの料金計算
  • 不正検知
  • ドライバー、乗客の新規登録など

UberではApache Kafkaを使っている。

Kafkaのユースケース

  • pub/sub
  • ストリーム処理
  • 分析
  • データベースのchangelogの転送
  • ログ

How We Scaled The Infrastructure

stats: 1兆メッセージ/day, PBsレベルのデータ量、数万のトピック

f:id:hacking15dog:20200618175500p:plain

要件
パイプライン

f:id:hacking15dog:20200618175535p:plain

高速化させるために、各ステージで非同期でバッチ処理するようにしている。そうすることで、数ms以内にレスポンスを返すことができる。

Rest Proxy & Clients

なぜ、Rest Proxyを挟むか?
  • クライアントのAPIを単純化して、複数の言語に対応するため
  • Kafka brokerとクライアントを疎結合にするため
    • 薄いクライアントだと、オペレーションが簡単に
    • Kafkaのバージョンアップを行いやすく
  • 信頼性を向上するため
内部

Local Agent

producerライブラリ(アプリケーションのProxyクライアント)
Local Agent
  • producer側の永続性(ローカルに保存)
  • 下流のダウンやbackpressureからクライアントを分離
  • リカバリした際に、過剰なリクエストを送ってクラスターを圧倒しないようにコントロール

uReplicator (MirrorMaker)

Kafkaのデータを高速かつ信頼性高く、複製するためのオープンソース。KafkaのMirrorMakerをベースに信頼性を高め、データロスが0になるよう保証させるために拡張させた。

ソースコード:GitHub eng.uber.com

MirrorMakerとは

Uberでは複数のデータセンターのKafkaクラスターを持っているが、ビジネスロジックや分析のために1つのデータセンターにデータをまとめないといけない。データセンター間のレプリケーションMirrorMakerを使っていた。仕組みとしてはシンプルで、対象のクラスターからデータを取得するようにconsumeして、それを集計先にプロデュースする。

UberではMirrorMakerを導入したが、徐々にスケーリングの問題が発生して、遅延やデータロスが発生した。主な原因は以下の通り。

  • リバランスのコストが高い
    • リバランスすると処理が止まってしまい、メッセージが蓄積してリバランス終了後に大量のデータが流れて、consumer側に問題が発生した。
  • トピック追加が難しい
    • MirrorMakerの対象に含めるかどうかをトピックのホワイトリストで管理しなければならないため、変更の度にリスタートが必要。
  • データロスが発生: (過去のバージョンのバグ)
  • メタデータの同期
どのように解決したか
uReplicatorの概観

f:id:hacking15dog:20200618175730p:plain

4つのコンポーネントから成る。

  1. Helix uReplicator controller
  2. uReplicator worker
    • 特定のトピックのセットをソースのクラスターからターゲットのクラスターに複製する。
    • リバランスの代わりに、controllerがuReplicatorアサインを決める。
    • また、Kafkaの高レベルのconsumerを使う代わりに、DynamicKafkaConsumerというシンプルなバージョンのものを使う
  3. Helix agent
    • topicの変更の通知を受け取り、DynamicKafkaConsumerにトピックとパーティの操作を指示
  4. DynamicKafkaConsumer
    • 高レベルのconsumerの修正で、リバランスをなくし、オンラインでトピックとパーティションの操作を行う仕組みを追加する役割

ref: 詳細な設計

Reliable Messaging & Tooling

At-Least-Once

スループットをサポートするために、永続化前にackedしているので、ノードが永続化する前にダウンするとデータロストしてしまう。ログなどの情報はそれでも許容できるが、支払いなどの絶対データロストできないものには不十分。

支払いなどは、正規のKafka brokerがackを返すまで待ち、永続化させる。その代わりレイテンシーは大きくなってしまう。

Chaperone (Auditing)

f:id:hacking15dog:20200618175836p:plain

各ステージで10秒のウィンドウ時間ごとに、Chaproneという内製ツールにauditを保存し、ステージ毎の差分を見て、データロストが発生していないかを監視する。

ソースコード: GitHub

eng.uber.com

概要

4つのコンポーネントから成る。

  • AuditLibrary
    • auditのアルゴリズム、定期的に10分のwindowごとに集計して結果をメッセージとして出力する。
    • Kafka Rest ProxyやKafka brokerにメッセージを渡す。
  • ChaperoneService
    • Kafkaからメッセージをconsumeして、auditのためにタイムスタンプを保存する
    • 定期的に、あるKafkaのトピックにauditメッセージを生成する
  • ChaperoneCollector
    • ChaperoneServiceの生成したauditメッセージを取得してDBに保存する
    • そのDBからダッシュボードを表示して、遅延やデータロスが発生していないか確認できる
  • WebService
    • webフロントエンドで、UIから簡単にメトリクスを確認できる
設計要件
  1. 各メッセージを漏れなく重複なく(exactly once)数える
    • メッセージにUUIDを付与し、Kafkaに送信する前にWALに永続化させる。Kafkaからackが返ってきたらdoneのステータスに更新する。
    • クラッシュした場合は、そのステータスを見て再送する。
  2. ティアにまたがったメッセージを監査するために一貫したタイムスタンプを使う
    • JSONにタイムスタンプが含められるが、decodeしているとパフォーマンス上問題が出るので、独自のパーサーを開発した
ユースケース
  • データロスの検出
    • この開発までは、メッセージのconsumerがデータロスに気づいてからしか判明できず、特定や原因の調査も難しかった
  • Kafkaで使用可能なオフセットを超えてデータを読み取る
    • Kafkaは過去のデータの取得に制限があるが、同じインターフェースで過去のデータも取得できるようにした
    • こうすることで、開発者は特定の時間の問題をデバッグしたり、必要あればメッセージをリプレイできる

Cluster Balancing

  • 自動のリバランスがサポートされていない
  • 手動の配置は難しい