UberでのKafkaのスケーリング
私もKafkaを使っているが、このレベルのデータ量やトピック数、パーティション数を扱ったり、パフォーマンスを求められてる訳ではないため、難易度が格段に上がる印象を受けた。
パフォーマンスを優先させながらも、データロスを抑えたり、検知する仕組みを導入して解決したのが面白かった。
また、複数のデータセンター間でのKafkaメッセージの集計を漏れなく行い続けるだけでも、トピック数やデータ量が増え続ける環境下では難しく、オープンソースを開発して解決したのも海外の大きいテック企業らしいと感じた。
How Uber scaled its Real Time Infrastructure to Trillion events per day
Use cases & Current Scale
ほとんどリアルタイムで処理が行われている。
- ドライバーとのマッチング、到着時間の計算
- UberEatsの料金計算
- 不正検知
- ドライバー、乗客の新規登録など
Kafkaのユースケース
- pub/sub
- ストリーム処理
- 分析
- データベースのchangelogの転送
- ログ
How We Scaled The Infrastructure
stats: 1兆メッセージ/day, PBsレベルのデータ量、数万のトピック
要件
- 水平スケーリング
- APIレイテンシーは5ms以下
- 99.99%の可用性
- 99.99%の永続性(顧客に致命的なものは100%)
- マルチDCのレプリケーション
- Java, Go, Python, Node.js, C++に対応
- Auditing
パイプライン
高速化させるために、各ステージで非同期でバッチ処理するようにしている。そうすることで、数ms以内にレスポンスを返すことができる。
Rest Proxy & Clients
なぜ、Rest Proxyを挟むか?
- クライアントのAPIを単純化して、複数の言語に対応するため
- Kafka brokerとクライアントを疎結合にするため
- 薄いクライアントだと、オペレーションが簡単に
- Kafkaのバージョンアップを行いやすく
- 信頼性を向上するため
内部
- オープンソースのRest Proxyがベース
- パフォーマンスの改善
- トピックのメタデータのキャッシュ
- 信頼性の向上
- Fallbackクラスターのサポート
- 複数producerのサポート
- コミュニティに還元する予定
Local Agent
producerライブラリ(アプリケーションのProxyクライアント)
Local Agent
uReplicator (MirrorMaker)
Kafkaのデータを高速かつ信頼性高く、複製するためのオープンソース。KafkaのMirrorMakerをベースに信頼性を高め、データロスが0になるよう保証させるために拡張させた。
MirrorMakerとは
Uberでは複数のデータセンターのKafkaクラスターを持っているが、ビジネスロジックや分析のために1つのデータセンターにデータをまとめないといけない。データセンター間のレプリケーションにMirrorMakerを使っていた。仕組みとしてはシンプルで、対象のクラスターからデータを取得するようにconsumeして、それを集計先にプロデュースする。
UberではMirrorMakerを導入したが、徐々にスケーリングの問題が発生して、遅延やデータロスが発生した。主な原因は以下の通り。
- リバランスのコストが高い
- リバランスすると処理が止まってしまい、メッセージが蓄積してリバランス終了後に大量のデータが流れて、consumer側に問題が発生した。
- トピック追加が難しい
- MirrorMakerの対象に含めるかどうかをトピックのホワイトリストで管理しなければならないため、変更の度にリスタートが必要。
- データロスが発生: (過去のバージョンのバグ)
- メタデータの同期
どのように解決したか
- 複数のMirrorMakerクラスターに分割
- レプリケーションにApache Samzaを使用
- Apache HelixベースのKafka consumerを使用
uReplicatorの概観
4つのコンポーネントから成る。
- Helix uReplicator controller
- uReplicator worker
- Helix agent
- topicの変更の通知を受け取り、DynamicKafkaConsumerにトピックとパーティの操作を指示
- DynamicKafkaConsumer
- 高レベルのconsumerの修正で、リバランスをなくし、オンラインでトピックとパーティションの操作を行う仕組みを追加する役割
Reliable Messaging & Tooling
At-Least-Once
高スループットをサポートするために、永続化前にackedしているので、ノードが永続化する前にダウンするとデータロストしてしまう。ログなどの情報はそれでも許容できるが、支払いなどの絶対データロストできないものには不十分。
支払いなどは、正規のKafka brokerがackを返すまで待ち、永続化させる。その代わりレイテンシーは大きくなってしまう。
Chaperone (Auditing)
各ステージで10秒のウィンドウ時間ごとに、Chaproneという内製ツールにauditを保存し、ステージ毎の差分を見て、データロストが発生していないかを監視する。
概要
4つのコンポーネントから成る。
- AuditLibrary
- auditのアルゴリズム、定期的に10分のwindowごとに集計して結果をメッセージとして出力する。
- Kafka Rest ProxyやKafka brokerにメッセージを渡す。
- ChaperoneService
- Kafkaからメッセージをconsumeして、auditのためにタイムスタンプを保存する
- 定期的に、あるKafkaのトピックにauditメッセージを生成する
- ChaperoneCollector
- ChaperoneServiceの生成したauditメッセージを取得してDBに保存する
- そのDBからダッシュボードを表示して、遅延やデータロスが発生していないか確認できる
- WebService
- webフロントエンドで、UIから簡単にメトリクスを確認できる
設計要件
- 各メッセージを漏れなく重複なく(exactly once)数える
- メッセージにUUIDを付与し、Kafkaに送信する前にWALに永続化させる。Kafkaからackが返ってきたらdoneのステータスに更新する。
- クラッシュした場合は、そのステータスを見て再送する。
- ティアにまたがったメッセージを監査するために一貫したタイムスタンプを使う
- JSONにタイムスタンプが含められるが、decodeしているとパフォーマンス上問題が出るので、独自のパーサーを開発した
ユースケース
- データロスの検出
- この開発までは、メッセージのconsumerがデータロスに気づいてからしか判明できず、特定や原因の調査も難しかった
- Kafkaで使用可能なオフセットを超えてデータを読み取る
- Kafkaは過去のデータの取得に制限があるが、同じインターフェースで過去のデータも取得できるようにした
- こうすることで、開発者は特定の時間の問題をデバッグしたり、必要あればメッセージをリプレイできる
Cluster Balancing
- 自動のリバランスがサポートされていない
- 手動の配置は難しい