lasciva blog

開発して得た知見やwebビジネスのストック

UberでのKafkaのスケーリング

私もKafkaを使っているが、このレベルのデータ量やトピック数、パーティション数を扱ったり、パフォーマンスを求められてる訳ではないため、難易度が格段に上がる印象を受けた。
パフォーマンスを優先させながらも、データロスを抑えたり、検知する仕組みを導入して解決したのが面白かった。
また、複数のデータセンター間でのKafkaメッセージの集計を漏れなく行い続けるだけでも、トピック数やデータ量が増え続ける環境下では難しく、オープンソースを開発して解決したのも海外の大きいテック企業らしいと感じた。

How Uber scaled its Real Time Infrastructure to Trillion events per day

www.youtube.com

Use cases & Current Scale

ほとんどリアルタイムで処理が行われている。

  • ドライバーとのマッチング、到着時間の計算
  • UberEatsの料金計算
  • 不正検知
  • ドライバー、乗客の新規登録など

UberではApache Kafkaを使っている。

Kafkaのユースケース

  • pub/sub
  • ストリーム処理
  • 分析
  • データベースのchangelogの転送
  • ログ

How We Scaled The Infrastructure

stats: 1兆メッセージ/day, PBsレベルのデータ量、数万のトピック

f:id:hacking15dog:20200618175500p:plain

要件
パイプライン

f:id:hacking15dog:20200618175535p:plain

高速化させるために、各ステージで非同期でバッチ処理するようにしている。そうすることで、数ms以内にレスポンスを返すことができる。

Rest Proxy & Clients

なぜ、Rest Proxyを挟むか?
  • クライアントのAPIを単純化して、複数の言語に対応するため
  • Kafka brokerとクライアントを疎結合にするため
    • 薄いクライアントだと、オペレーションが簡単に
    • Kafkaのバージョンアップを行いやすく
  • 信頼性を向上するため
内部

Local Agent

producerライブラリ(アプリケーションのProxyクライアント)
Local Agent
  • producer側の永続性(ローカルに保存)
  • 下流のダウンやbackpressureからクライアントを分離
  • リカバリした際に、過剰なリクエストを送ってクラスターを圧倒しないようにコントロール

uReplicator (MirrorMaker)

Kafkaのデータを高速かつ信頼性高く、複製するためのオープンソース。KafkaのMirrorMakerをベースに信頼性を高め、データロスが0になるよう保証させるために拡張させた。

ソースコード:GitHub eng.uber.com

MirrorMakerとは

Uberでは複数のデータセンターのKafkaクラスターを持っているが、ビジネスロジックや分析のために1つのデータセンターにデータをまとめないといけない。データセンター間のレプリケーションMirrorMakerを使っていた。仕組みとしてはシンプルで、対象のクラスターからデータを取得するようにconsumeして、それを集計先にプロデュースする。

UberではMirrorMakerを導入したが、徐々にスケーリングの問題が発生して、遅延やデータロスが発生した。主な原因は以下の通り。

  • リバランスのコストが高い
    • リバランスすると処理が止まってしまい、メッセージが蓄積してリバランス終了後に大量のデータが流れて、consumer側に問題が発生した。
  • トピック追加が難しい
    • MirrorMakerの対象に含めるかどうかをトピックのホワイトリストで管理しなければならないため、変更の度にリスタートが必要。
  • データロスが発生: (過去のバージョンのバグ)
  • メタデータの同期
どのように解決したか
uReplicatorの概観

f:id:hacking15dog:20200618175730p:plain

4つのコンポーネントから成る。

  1. Helix uReplicator controller
  2. uReplicator worker
    • 特定のトピックのセットをソースのクラスターからターゲットのクラスターに複製する。
    • リバランスの代わりに、controllerがuReplicatorアサインを決める。
    • また、Kafkaの高レベルのconsumerを使う代わりに、DynamicKafkaConsumerというシンプルなバージョンのものを使う
  3. Helix agent
    • topicの変更の通知を受け取り、DynamicKafkaConsumerにトピックとパーティの操作を指示
  4. DynamicKafkaConsumer
    • 高レベルのconsumerの修正で、リバランスをなくし、オンラインでトピックとパーティションの操作を行う仕組みを追加する役割

ref: 詳細な設計

Reliable Messaging & Tooling

At-Least-Once

スループットをサポートするために、永続化前にackedしているので、ノードが永続化する前にダウンするとデータロストしてしまう。ログなどの情報はそれでも許容できるが、支払いなどの絶対データロストできないものには不十分。

支払いなどは、正規のKafka brokerがackを返すまで待ち、永続化させる。その代わりレイテンシーは大きくなってしまう。

Chaperone (Auditing)

f:id:hacking15dog:20200618175836p:plain

各ステージで10秒のウィンドウ時間ごとに、Chaproneという内製ツールにauditを保存し、ステージ毎の差分を見て、データロストが発生していないかを監視する。

ソースコード: GitHub

eng.uber.com

概要

4つのコンポーネントから成る。

  • AuditLibrary
    • auditのアルゴリズム、定期的に10分のwindowごとに集計して結果をメッセージとして出力する。
    • Kafka Rest ProxyやKafka brokerにメッセージを渡す。
  • ChaperoneService
    • Kafkaからメッセージをconsumeして、auditのためにタイムスタンプを保存する
    • 定期的に、あるKafkaのトピックにauditメッセージを生成する
  • ChaperoneCollector
    • ChaperoneServiceの生成したauditメッセージを取得してDBに保存する
    • そのDBからダッシュボードを表示して、遅延やデータロスが発生していないか確認できる
  • WebService
    • webフロントエンドで、UIから簡単にメトリクスを確認できる
設計要件
  1. 各メッセージを漏れなく重複なく(exactly once)数える
    • メッセージにUUIDを付与し、Kafkaに送信する前にWALに永続化させる。Kafkaからackが返ってきたらdoneのステータスに更新する。
    • クラッシュした場合は、そのステータスを見て再送する。
  2. ティアにまたがったメッセージを監査するために一貫したタイムスタンプを使う
    • JSONにタイムスタンプが含められるが、decodeしているとパフォーマンス上問題が出るので、独自のパーサーを開発した
ユースケース
  • データロスの検出
    • この開発までは、メッセージのconsumerがデータロスに気づいてからしか判明できず、特定や原因の調査も難しかった
  • Kafkaで使用可能なオフセットを超えてデータを読み取る
    • Kafkaは過去のデータの取得に制限があるが、同じインターフェースで過去のデータも取得できるようにした
    • こうすることで、開発者は特定の時間の問題をデバッグしたり、必要あればメッセージをリプレイできる

Cluster Balancing

  • 自動のリバランスがサポートされていない
  • 手動の配置は難しい

Uberのリアルタイムマーケットプラットフォームのスケーリング

Scaling Uber's Real-time Market Platformという2015年のUserによるスケーリングに関する講演を見たので、簡単にまとめた。
他の海外の大きなサービスと比べても、技術的にはかなり複雑なように感じた。ユーザとドライバーをマッチングさせるところだけでも、リアルタイムで位置情報を更新するように大量のwriteリクエストを捌く必要がある。また、その位置情報からどのドライバーが最適なのかを選ぶのも、車種や乗車可能な人数、距離の計算なども遅延なく行わなければならない。また、支払いや

技術的に面白かったのは、スケーラブルと可用性を達成するために、スレッドがkillされたりノードがダウンするのも正常系の一部とする前提で設計が行われていたことだった。位置情報からの高速な検索も全く触れたことがなかったので興味深かった。

アーキテクチャ

f:id:hacking15dog:20200616113411p:plain

  • dispatch: 乗客とドライバーのマッチングとか。
  • maps/ETA: 交通マップや到着時間の計算など。
  • database: 色んな種類のものが使われている(Postgre, Redis, MySQL, Riak)
  • Post trip pipeline: 到着後の処理を扱う(ドライバー評価、メール送信、支払いなど)
  • money: 外部の支払いサービスとの連携

Problems

旧システムでは、以下のような前提で設計されていて、スケールさせるのが難しかった。アンチパターンではあるが、Dispatchを全部書き直した。

  • 1ライダーに対して1台: Uber Poolとか他のビジネスに対応するのが難しい
  • 人が動く: UserEatsのように食品とかが動くように適応するのが難しい
  • 都市ごとにシャーディング: さらに都市を追加してスケールするためには、都市の大小差や負荷の差を考慮しなければならない
  • MPOF: Many Points Of Failures

Dispatch

supply(ドライバー)とdemand(乗客)をマッチングさせるサービス群。主にNode.jsで作られている。

位置情報は4秒に1回更新されるので、100万RPSに耐えられるぐらいかなりスケーラブルである必要がある。

主に以下のようなサービスから成る。

  • supply: ドライバーの情報(何人まで乗れるか、チャイルドシートはあるかなど)
  • demand: 注文の情報(何人乗る必要があるかなど)
  • DISCO: ビジネスロジックなど
  • geo by supply: ドライバーの位置情報
  • routing/ETA(Estimate Time of Arrival): 道路情報を加味して、距離を計算したりする
  • geo by demand: 乗客の位置情報(例えば、空港の何階にいるかなど)

S2

地球は球体なので、経緯度だけでは正確に計算を行うことが難しいため、S2を導入した。S2はGoogleのライブラリで、位置情報や計算を簡単に行える。

球体表面をcellで分割し、cellはidとlevelで表現され最小のcellは0.48cm2で、これは64bitで表現できる。

Level 最小面積 最大面積
0 85,011,012 km2 85,011,012 km2
1 21,252,753 km2 21,252,753 km2
12 3.31 km2 6.38 km2
30 0.48 cm2 0.93 cm2

このidをシャーディングとインデックスのキーとして使うことで、該当エリアにいるsupply一覧の取得等ができる。

Routing

目標

  • 待ち時間を減らす
  • 余分な運転を減らす
  • ETAで最小にする

ベストな選択は、現在乗車可能なものとは限らない。例えば、今から1分後に近くで空きになるタクシーがベストな選択かもしれない。

スケーリング

Node.jsはそれぞれのモバイルのクライアントとのstateを保持しているため、単純なアプリケーションの水平スケーリングは適用できない。DBなどと同様にアプリケーションをstatefulにスケールさせないといけない。そこで、ringpopを開発した。CAP定理ではAP型で、可用性を重視している。ゴシップ型のメンバー管理で、アプリケーションレイヤーのシャーディングを行っている。クライアントからリクエストが来ると、適切なノードにリダイレクトする。

SWIM(Scalable Weakly-consistent Infection-style Process Group Membership)プロトコルがベースになっている。

TChannelというプロトコルでノード間の通信を行う。以下のような目標をもって設計されたプロトコル

  • パフォーマンス: 特に複数の言語にまたがる箇所に関して。HTTPの20倍速い。
  • 転送
  • 言語サポート
  • 正規パイプライン
  • チェックサム / トレーシング
  • カプセル化

eng.uber.com

可用性

ダウンしていると、他のサービスに流れてしまうので、遅延実行できるようにするなどしておかないとビジネス的に重要。

  • すべてリトライ可能に
  • すべてkillできるようにする: 失敗やダウンは例外ではなく、正常系の一つとして扱えるように。
  • クラッシュのみに対応: グレースフルシャットダウンに対応すると複雑性も増し、ちゃんと機能するかの確認も必要になってしまう
  • 小さく分解する: 大きいインスタンスが少数だと、killするリスクが大きくなる

カルチャーチェンジ

  • すべて(DBでさえも)killする: Redisはリスタートのコストが高いので、この観点ではよくない

backup requests with cross server cancellation

分散システムで、レイテンシーを減らす方法。例えば、平均のレイテンシーは良いが、99パーセンタイルのレイテンシーが悪いときに、同じ種類のサービスに2リクエストを投げれば、どちらかのレスポンスは速く返ってくる可能性が高い。

詳細には、少し遅延してから2度目のリクエストを投げ、また1つ目のサービスは2つ目のサービスにキャンセルリクエストを投げる。遅延があることで、通常は1つ目のサービスのみが処理して、タイムアウトなどで失敗した場合は、2つ目のサービスがより低いレイテンシーで処理を返す。

更に詳細は、こちら

f:id:hacking15dog:20200616113539p:plain

データセンターのダウン

DCが切り替わってもセッションや処理が継続しているかのように見せるために、定期的に状態のダイジェストをクライアントに保持させておき、データセンターがダウンすると、その情報をもとに再開させる。

Facebookのライブビデオのスケーリング

最近、海外のサービスのスケーリングに関して色々勉強をしていて、面白かったのでまとめた。
動画系のバックエンドがあまり担当したことないので興味があった。動画配信と言っても、一度保存された動画を配信するのと、ライブ配信とでは考慮すべきことが異なっていて面白かった。

Scaling Facebook Live Videos to a Billion Users

www.youtube.com

2017年の発表

サービスの概要

12.3億DAU

ライブ配信と動画の違い

  • リアルタイムに即座に視聴者が見れるようにしないといけないこと
  • コミュニケーションを取れるソーシャル性

History

  • 2015年4月: ハッカソン(業務とは関係ないものを開発できる)
  • 2015年8月: Celebrityが配信できるように開放
  • 2015年12月: 全ユーザ向けにリリース

なぜライブか?

  • エンゲージメントが高い
  • 公開されたプロフィール
  • 世界をつなげる(現地の災害の人など、他のメディアでは共有できないようなコンテンツ)

ライブストリーミングインフラ

1. High Level Architecture

ライブストリーミング

f:id:hacking15dog:20200613011430p:plain

配信者はRTMPS(Real Time Messaging Protocol Secure)を使って、POP(Point Of Presence)に接続。

POPはデータセンターに接続し、データセンターはエンコーディングを行う。

データセンターはCDNや他のPOPに配信し、それ経由で視聴者に届く。

リソースの種類
  • コンピューティング
  • メモリ: ストリームのエンコード、デコード
  • ストレージ: 長期保存
  • ネットワーク: アップロードと視聴

2. Scaling Challenges

  1. 同時配信数
    • 夕方にピークが来るなど予測できる
    • Ingest Protocol, ネットワークキャパシティ、サーバのエンコーディングリソース
  2. 全ストリームの視聴者数
    • こちらも規則性があり、予測できる
    • Delivery protocol, ネットワークキャパシティ、効率的なキャッシュ
  3. 単一ストリームの最大視聴者数
    • 予想が難しい
    • キャッシュ、ストリームの分散

Facebookのライブビデオはどのように異なるか

  • 事前のキャッシュがトリッキー
  • 視聴者数の予測が困難
  • ライブイベントの計画と事前にリソースをスケールさせるのが問題
  • 世界的なイベントによる同時配信のスパイクの予測が難しい

3. Protocol

ブロードキャストプロトコルに必要なこと
  • 開発のかかる時間: リリースまで4〜8ヶ月しかない
  • ネットワークの互換性: Facebookのインフラだけでは完結しない
  • E2Eのレイテンシー: 大きいとインタラクティブなコミュニケーションが取れない
  • アプリケーションの大きさ: Facebookのアプリに搭載するので大きくはできない

f:id:hacking15dog:20200613011448p:plain

エンコーディングのプロパティ
  • アスペクト比: 1:1
  • 解像度: 400x400, 720x720 (ネットワークが十分に発達していない地域も考慮)
  • AUDIO CODEC: AAC 64KBIT
  • VIDEO CODEC: H264 500KBIT 1MBIT

4. Stream Ingestion and Processing

POPのメリットは、POPやデータセンターのネットワークが安定して速いことと、キャッシュ。

f:id:hacking15dog:20200613011507p:plain

POPは2種類のホストからなる

  • Proxygen Hosts: 配信者とのコネクションを維持し、適切なDCにデータを送る
  • BigCache Hosts: キャッシュ

データセンターは上の2つにEncodingを加えた3種類のホストからなる。配信者がネットワークを切り替えて新しいコネクションを開始しても、Encodingのホストが変わらないようにするために、ProxygenとEncodingのマッピングは重要。

Encoding Hostの役割
  • ストリームの認証
  • ストリームとホストを結びつける
  • エンコーディングの生成
  • プレイバック用の出力の生成
  • ビデオ用に保存

5. Playback: HTTP streaming (MPEG-DASH)

DASHはHTTP上のストリームプロトコル

マニフェストの取得のフロー

クライアントがPOPにアクセスする。POPはキャッシュがあればそれを返し、なければデータセンターにアクセスする。データセンターはキャッシュがあればそれを返し、なければEncodingホストから取得してキャッシュホストに保存して、POPに返す。POPはキャッシュに保存して、クライアントに返す。

キャッシュをPOPとデータセンターを2段階に分けることで、低いコストでスケールでき、必要に応じて別々にスケールアウトできる。データセンターへのアクセス数はPOPの数と同じになる。

6. Reliability Challenges

ネットワークの問題

配信者とPOPとの間にネットワークの問題が発生すると、どうしようもない。

  • Adaptive Bit Rateで配信者と通信できるようにすることで、ネットワークが悪くなっても継続できるようにする。
  • バッファーを利用して、一時的な切断をハンドリングする
  • 音声のみ

7. New Features

  • 複数の配信者が同時に画面に登場できる: 会話がスムーズに成立するように数十ms程度のレイテンシーにしなければならない
  • ビデオタブ(一覧から探せる): 複数のストリームを扱わなければならない
  • バイスキャスティング: apple TVなどのプラットフォームに対応
  • 360度

8. Lessons Learned

  • 大きなサービスは小さな出発点から成長できる
  • ネットワーク環境にサービスを適応させる
  • リライアビリティとスケーラビリティは設計に食い込み、後からは追加できない
  • ホットスポットと大規模なトラフィックは複数のコンポーネントで起こりうる
  • 計画されるものとされないサービスダウンに備えて設計する
  • 大きいプロジェクトを届けるためには、妥協もする
  • 将来の機能のために、柔軟性のあるアーキテクチャを保つ

Q&A

  • ポルノ対策は? - システムと人力でモニタリングしているが、難しい問題

Scaling Facebook Live

atscaleconference.com

テスト方法

  • ロードテストサービスを構築
  • 視聴者がグローバルになるように
  • 本番環境の10倍の負荷

E2Eレイテンシーの調整

Push vs Pull

Pullモデルの場合、POPがデータセンターからキャッシュを取得した際に生じる時差がPullしたタイミングによって、レイテンシーが大きくなってしまう可能性がある。さらに、POP間でのレイテンシーの差も大きくなりやすい。

Pushモデルの場合、レイテンシーが小さく抑えることができ、POP間の差も小さくできる。

「Dynamo: Amazon’s Highly Available Key-value Store」を読んだ

NoSQLの勉強をしていて、またDynamoDBを使う機会もあり、Dynamoの論文がNoSQLの原典的な存在らしく、1度読んでみたかったので読んだ。
全部理解できた訳ではないが、データ指向アプリケーションデザイン ―信頼性、拡張性、保守性の高い分散システム設計の原理Database Internals: A Deep Dive into How Distributed Data Systems Work (English Edition)を以前読んだので、要点はまぁまぁ抑えれた気がする。

私はMySQLを使うことが多いので、コンフリクトを防ぐための書き込みのロック等は当たり前の存在だと思っていた。しかしDynamoは書き込みを優先させるために、読み込みの際にコンフリクトを解消するアプローチをしていて、技術で解決するとはこういうことかと思って面白かった。

英語版のペーパー

他の方による日本語翻訳: Dynamo: Amazonの高可用性Key-value Store[和訳] · GitHub

気になったところのメモ

2. Background

2.1 System Assumptions and Requirements

ACID

ACIDを担保すると可用性が失われてしまうので、弱い一貫性(C)を提供し、分離性(I)も提供せず単一のキーのみの更新しか許可しない。

2.3 Design Considerations

RDBでは強一貫性を提供するために、同期的なレプリケーションを提供している。そのため可用性が落ちる。Dynamoでは、それを防ぐために結果整合性を持つように設計された。

それに伴って更新衝突を「いつ解決するのか、誰が解決するのか」という問題を解決しないといけない。RDBではwriteの実行中に解決してreadがシンプルになるように設計されている。そのため、writeのときにレプリカを更新しなければwriteは却下される。それに対して、Dynamoはwriteの可用性を優先させるために、readの段階で衝突解決を試みる。

「誰が解決するのか」という問題はデータストアかアプリケーションのどちらかになる。データストアはどれが正しいかを知るのは難しいため、last write winsのようなシンプルなポリシーになる。アプリケーション側での実装によって衝突をより適切に解決できる。

4. System Architecture

4.1 System Interface

getputの2種類の命令のみをサポートする。シンプルなインターフェイスでkeyと関連づけられたオブジェクトをストアする。

4.2 Partitioning Algorithm

大容量のデータを扱えるように線形スケールしなければならない。そのため、ノード間の動的なパーティショニングができなければならない。コンシステント・ハッシングを採用して解決。

コンシステント・ハッシング

ノードの追加/削除の際に、隣接ノードのみに影響を留めることができる。
一般的なコンシステント・ハッシングの問題点としては、「各ノードのランダムな位置割り当てでは不均一なデータと負荷分散になってしまう」ことや、「ノードのパフォーマンスのばらつきが考慮されていない」ことが挙げられる。
それに対して、Dynamoでは「仮想ノード」の概念を使い、各ノードに1つ以上の「仮想ノード」を割り当てる。そうすることで、各ノードが環の複数の点に関連付けることができ、次のような利点がある。

  • あるノードがダウンまたは削除されたら、このノードによって処理されていた負荷は残りの使用可能なノードに平等に分散できる
  • ノードが復活したり追加されたら、新しいノードに対して他の使用可能なノードからおおよそ同じ量だけの負荷を受け入れられる
  • ノードに割り当てられる仮想ノードの数は、その物理インフラのばらつきによるキャパシティから決定される

4.4 Data Versioning

Dynamoでは高い可用性を担保するためノード間で分断が起きたりして最新のアイテムが取得できない場合でも、書き込みを続ける。例えばカートの最新の状態が不明でも、カートの追加処理を行うなど。そうすると、複数のバージョンが発生してしまうが、アプリケーション側で適切に解決させる。 Dynamoではこの同一アイテムのバージョン管理のために vector clocksを導入している。 vector clocksはノードとカウンターのペアのリストである。書き込みの際には、どのバージョンに対して書き込むか(以前の読み込みの際の値)をコンテキストで指定させることで、バージョンの継承関係を管理する。読み込みの際にバージョンに衝突があった際は、すべてのありうるバージョンを返して、リコンサイルさせて新しいバージョンを作る。
バージョンがあまりに多いとパフォーマンス上の問題となるので、上限値を設定して、古いバージョンは削除するようにしている。

4.6 Handling Failures: Hinted Handoff

Dynamoでは伝統的なquorumの手法ではなく、可用性や永続性を担保するために「緩やかなquorum」を採用している。あるノードがダウンしてしまったら、そのノードが保存すべきデータは、本来そのデータを割り当てられてはいない他のノードが一時的に保存し、ダウンしていたノードが復活したらそのデータを渡すことで、レプリカ数を減らさずに処理を続行できる。
また、Dynamoでは異なるデータセンター間で保存することで、データセンター全体の障害によるデータ消失を防いでいる。

4.7 Handling permanent failures: Replica synchronization

仮想ノードごとに異なるマークル木を管理することで、差分の検知と同期のパフォーマンスを向上させている。

5. Implementation

read repair

リクエストコーディネーターがノードからレスポンスを受け取った際に、古いデータだった場合、内部で非同期的にwriteリクエストを送って最新の状態にする。

6. Experiences & Lessons Learned

6.1 Balancing Performance and Durability

Dynamoは永続性の保証とパフォーマンスのトレードオフ機能を提供している。この最適化では、各ストレージノードはメインメモリ内にオブジェクトバッファを保持する。各writeオペレーションをバッファに保存し、別のthreadで定期的にストレージに書き込む。read命令の際には、まず最初に要求されたキーがバッファに存在するかどうかを調べる。もしあれば、バッファから直接readする。

この手法では、バッファ中のwriteの書き込みが実行される前にサーバがダウンするとデータロストが発生しうる。そのリスクを軽減させるために、writeオペレーションはNレプリカのうちから durable writeを行うコーディネーターを持つように改良されている。コーディネーターはW個のレスポンスだけ待つので、 durable writeオペレーションのパフォーマンスの影響を受けない。

6.3 Divergent Versions: When and How Many?

データ管理には、アイテムのバージョンは最小限に抑えられた方がパフォーマンス的に良い。分岐は障害か、同じアイテムに対する大量の並列書き込みによって生じる。大量の並列書き込みは、通常ボットによるもので、ここでは詳細に触れない。

6.4 Client-driven or Server-driven Coordination

クライアントが、サーバ側のノード情報を取得して適切にノードへリクエストすることで、ロードバランサーが不要になり、また適切なノードへのリダイレクトする際のレイテンシーも節約できる。

6.5 Balancing background vs. foreground tasks

それぞれのノードはそれぞれの通常のフォアグラウンド put/get処理に加え、レプリカ同期やデータ受け渡しなど異なる種類のバックグラウンドタスクを実行しています。Admission controllerがフォアグラウンド処理の計測を行うことでリソースの可用性を評価して、バックグラウンド処理のスケジュール管理を行う。

Redis公式ドキュメントのメモ

モチベーション

NoSQLの勉強をしていて、中身の実装方法とかをもう1段階理解を深めたいと思った。Redisを選んだのは本番環境でも使っていて、シンプルかつパワフルであまり嫌いな人を聞いたことが比較的ないし、一番とっかかりやすそうだったから。あとは、あまりRedisに関して深ぼった本を知らないから。
ソースコードをできれば読みたいが、C言語で断念しそうなので、一旦様子見。

感想としては、公式ドキュメント読んだだけでも、だいぶ理解が変わったので読んでよかった。データ構造・アルゴリズムやDBの基礎的な本を読んでたおかげで、かなり理解しやすくなったので、やはりCSの基礎は重要なんだなと感じた。

公式ドキュメント

この記事は、英語だがコンパクトにまとまっていた。

Overview Of Redis Architecture

読んで気になった箇所のメモ

expire

UNIX timestamp(ms単位)を使って比較している。

expireするタイミングは2種類ある。

  • passive: キーにアクセスされたとき
  • active: 1秒に10回以下を実行
    1. expireが設定されているキーの集合から20個取得
    2. そのうち、expireしてるものは削除
    3. もし25%以上がexpireしていた場合は、1から再度実行

削除するときは、AOLには DELコマンドのログを残す。

メモリの上限とアルゴリズム

maxmemory でデータ容量の最大値を指定できる。

上限に到達したときの挙動は maxmemory-policyで指定できる。

ポリシー 説明
noeviction エラーを返す(主には書き込みやDELなどの操作)
allkeys-lru LRUで削除
volatile-lru expireがセットされているデータをLRUで削除
allkeys-random ランダムで削除
volatile-random expireがセットされているデータをランダムで削除
volatile-ttl ttlが短いデータから削除
volatile-lfu expireがセットされているデータをLFUで削除
allkeys-lfu LFUで削除

volatile-*は削除できるデータがない場合は、エラーを返す。

RedisのLRUアルゴリズム

厳密なLRUを実現するにはメモリが必要なため、Redisでは擬似的なLRUアルゴリズムを実装している。 maxmemory-samples で指定した数のサンプルを取得して、それらの時間が一番古いものを削除する。

LFU(4.0からサポート)

LRUのように擬似的な実装がされている。Morris Counterという1オブジェクトあたり数ビットだけでオブジェクトのアクセス頻度がわかる確率的なカウンターを利用している。

頻度は0-255で与えられ、lfu-log-factorで頻度を増加させるのにどれだけのアクセスが必要かを指定し、lfu-decay-timeで頻度が減衰する時間を指定する。

トランザクション

悲観的ロック

MULTI- EXEC コマンドで、順番に連続で実行されることが保証される(シングルスレッドなので独立性が担保)。アトミック性も保証できる。

AOLでログに書き込む際には、一度の write(2)システムコールでディスクにsyncしようとするが、Redisサーバやプロセスが途中でクラッシュしたりした場合は、トランザクションの一部のみ書き込まれる可能性がある。その場合は、リスタートしようとするとエラーで終了するので、 redis-check-aofで部分的に書き込まれたトランザクションを削除してリスタートできる。

楽観的ロック

WATCHコマンドを使えば実現できる。 WATCHコマンドを実行してから、 EXECを実行するまでに修正された場合は、エラーが発生する。

WATCHコマンドの注意事項

  • WATCHしたクライアントによってMULTI-EXEC間に書き換えられた場合は失敗する。
  • WATCHしたクライアントによってMULTI前に書き換えられた場合は成功する。
  • WATCHからEXECまでの間にkeyがexpireされた場合は、そのまま実行される。
  • コネクションが切れた場合は、UNWATCHの状態になる

クライアントサイドキャッシュ

Redis6.0からサポート。クライアント側で

  • default mode:
    • サーバ側で、どのキーがクライアントでキャッシュされているかを管理して、修正されたらメッセージを送り、クライアントがキャッシュを削除する
    • サーバ側で、どのクライアントがどのキーをキャッシュしているかを管理しないといけないので、サーバ側で余分にメモリが必要
  • Broadcasting mode
    • 修正されたら、クライアントキャッシュを削除するようにクライアントにメッセージを送る
    • subscribeするprefixを指定することができる
    • サーバサイドでは、prefixがマッチするクライアントに対してメッセージを送り、keyを管理する必要はない。

コネクションが切れた場合は、キャッシュを削除する。

大量のデータのインサート

ファイルを作って、pipeモードで挿入するのが良い。

アプリケーションのloopで挿入すると、round-trip分の無駄な通信が発生するので、遅い。

cat data.txt | redis-cli --pipe

パーティション

実装方法
  • クライアントサイド
  • プロキシ
    • Redisに直接リクエストを送るのではなく、プロキシを挟んで、プロキシが振り分ける。
    • twemproxyなどがある。
  • クエリールーティング
    • ランダムにインスタンスに送って、インスタンスが正しいノードにリダイレクトする。
    • Redis Clusterは部分的に採用していて、直接リダイレクトはせず、クライアント側にリダイレクトを指示する。
デメリット
  • 複数のキーにまたがる操作がサポートされていない
  • 複数のキーにまたがるトランザクションは不可能
  • 1つのキーで大きなデータを持つ場合は分離できない
  • ノードの増減でリバランスが走るので、複雑。Pre-shardingを使えば解決できる

データストアとして使う場合は、リバランスを行うにはダウンタイムが発生するので、キーとノードのmappingを固定しなければならない。キャッシュとして使う場合は、オンラインでリバランスを実行できる。

Presharding

リバランスを行うのは複雑なので、事前にシャーディングを行えばよい。

Redisノードは軽いので、一つのサーバで始める場合でもRedisノードを多数立ち上げて、シャーディングを行っておく。リソースの追加が必要になれば、新しいサーバにRedisノードを移せばよい。

実装
  • Redis Cluster: 基本的にはデファクト
  • Twemproxy: 少しの複雑性を伴う。複数ノード用意すればSPOFにならない
  • consistent hashing

セカンダリインデックス

ZSETはスコアが同じ場合は値の辞書順にソートされる。

ZRANGEBYLEX を使えば、値から絞ることができる

  • 検索ワードのサジェスト
  • フォロー関係のグラフ

データ型

Redis keys
  • 長いキーは良くない
    • メモリを喰うし、検索にも時間がかかる
    • 長いならハッシュ化した方がよい
  • 最大サイズは512MB
SETコマンド

すでに値があるかどうかで保存するかどうかのオプションがある。

# mykeyに値がないとき
> set mykey newval nx
(nil)
> set mykey newval xx
OK
List

データ構造としてはLinkedListで実装されているので、先頭や最後尾の要素の取得、挿入、削除を一定時間で取得できる。中間の要素を取る場合でパフォーマンスが重要な場合はSortedSetsを検討すべき。

pollingを実装する場合は、 BRPOPBLPOPが便利。

集合型データの場合は、要素挿入時にキーが存在しない場合は自動的に初期化され、要素削除時にキーが空になる場合は自動的に削除される。また、存在しないキーに対して LLENなどの読み込みコマンドや削除系のコマンドを実行すると、存在するかのように振る舞う(この場合0を返したりする)。

Sorted sets

並び順は、スコア -> 値の辞書順の順番で評価される。

スキップリストとハッシュテーブルの両方を持つデータ構造からなる。そのため、追加時の計算量は O(log(N))で、取得時はすでにソート済みなので何もしなくてよい。

Bitmaps

主な用途は以下

  • リアルタイム分析
    • 例えば1時間ごとに、アクセスしたかどうかを記録
  • 省スペースかつ高パフォーマンスなboolean値の保存

Twitter demo

PHPの例で、一つのLinuxサーバで10万リクエストを100のクライアントで並列で実行すると、平均で5msのレイテンシーのパフォーマンスが出たそう。

Replication

レプリケーションがどのように行われるか

レプリカがマスターにつながれると、 PSYNCコマンドでレプリケーションIDを渡して、差分のコマンドを受け取り、同期する。

マスター側にレプリケーションIDから差分がわからない場合は、以下のようにフルバックアップをとる。

  1. スナップショットを作成するためにRDBファイルの生成を開始するとともに、新しいコマンドはバッファにため始める。
  2. RDBファイルが完成したら、レプリカにファイルを送信し、レプリカはディスクに保存。
  3. ディスクからメモリに読み込み終わった後にバッファに溜まったコマンドを受け取り処理をする。
Replication id

Replication idは、リスタートしたり、masterに昇格した際にランダムに生成される。offsetによってその間の時系列を把握できる。Replication idが切り替わる際には、2つ保持しておき、切り替わる前のデータをレプリカを

レプリカの接続状況によって書き込みを禁止する

データロスの可能性を減らすために、レプリカが一定以上に正常な場合にのみ書き込みを許可することができる。

Redisではレプリカがマスターに対してpingすることで死活監視を行う。マスターはレプリカが最後にpingした時刻を保存しておき、設定した時間内にpingされたかどうかで判定する。そのため、厳密なデータロスを防ぐことは保証されていない。

  • min-replicas-to-write: 接続が確認できるレプリカの最小数
  • min-replicas-max-lag: 最後にpingしてからどれぐらいのラグまでは正常な状態とみなすかどうか
expireの取り扱い

node間で時刻がずれていると、expireの扱いがノード間でずれてしまう可能性がある。

  • レプリカでは直接expireの処理は行わず、マスターがexpireしたDELコマンドを同期することでexpireさせる
  • しかし、同期が遅れてexpireしているべきオブジェクトをレプリカで読み込まれると矛盾が生じるため、データの整合性を担保できるように読み込み時のみ、論理的なexpireした結果をレプリカは返す

永続性

メモリが揮発する可能性があるので、永続性を担保するにはバックアップが必須

  1. DBに保存する
    • リカバリ時のパフォーマンスはよい
    • データを一部失う可能性がある
    • 大きなデータを書き込む際にCPU負荷が上がる
  2. AOL: 全ログを保存する
    • 確実に再現できる
    • データ量はDBより大きくなってしまう
    • fsyncの頻度によってパフォーマンスは左右される

セキュリティ

  • 文字エスケープという概念がないから、NoSQLインジェクションは起こらない
  • シークレットはconfigに平文で保存して、Redisサーバにさえアクセスできなければ悪用されない。忘れたときは、そこを見ればいいので値は長い方がセキュア。
  • 実行可能なコマンドを制限できる。もしくは rename-command を使って、空文字とかに上書いて実行できないようにする

Redis Sentinel

モニタリング、通知、自動フェイルオーバー、サービスディスカバリなどを提供し、Redisの可用性を向上させる仕組み。自身は分散システムからなっていて、複数のノードで監視することで死活監視の偽陽性を防いだり、自身の可用性を向上させられる。

設定によるが最低2つのノードで合意が得られるとフェイルオーバーを実行させるため、1つのノードが稼働しなくても問題ないように3つ以上のノードで構成すること。

Latency

ネットワーク
  • コネクションをできるだけ維持する
  • MSET, MGETなどのコマンドやパイプラインを使ってラウンドトリップを減らす
シングルスレッド

multiplexingを使って一つのプロセスで全リクエストを処理している。

Node.jsもシングルスレッドだがパフォーマンスは十分出ている。これは、システムコールでブロックされないように設計されているから。

厳密にはシングルスレッドではなく、遅いI/O処理をバックグラウンドで行っている。

コマンド

SORT, LREM, SUNIONなどのコマンドは速くはないので、必ずコマンドの計算量をドキュメントで確認すること。

KEYS は特にパフォーマンスが悪いので、あくまでもデバッグ目的で使うこと。代わりにSCANを使うこと。

AOF, disk I/O

AOFは write(2)fdatasync(2)の2つのシステムコールを使う。

ディスクへの書き込みの頻度は appendfsyncの設定によって変わるが、他のプロセスでディスクI/Oがないようにするのが望ましい。

一般的にfsyncしている同じファイルへのwriteはブロックされる。 everysecに設定されてる場合、Redisではfsyncが行われてるとき2秒まではwriteの処理はバッファーしておくが、そのあとはwriteを実行するため、遅くなる可能性がある。

その他
  • Linuxではtransparent huge pagesをオフにすること。
  • expiresは同時刻で多数のオブジェクトのTTLを設定しないこと。
    • Redisがexpireしたオブジェクトを削除するときに、通常20件サンプリングした中で、25%以上がexpireしていた場合は、25%以下になるまで削除し続けるため。

Redisクライアント

コネクションの最大数は設定で指定できる。ただし、ファイルディスクリプタの最大数が小さい場合は、異なる値に設定される(ログにエラーが出力される)。

タイムアウト

デフォルトではアイドル時間が長くても、コネクションは切れない設定になっている。

configで設定できるが、全てのコネクションをチェックするのにO(N)時間かかるので、パフォーマンスを優先して厳密な時間を保証はしていない。

シグナルハンドリング

  • SIGTERM, SIGINT
    • 即座に終わるのではなく SHUTDOWNコマンドを実行したときと似た挙動を起こす
    • コマンドの実行完了次第すぐに終了
    • RDBファイルに保存失敗した場合は、終了しない
  • SIGSEGV, SIGBUS, SIGFPE, SIGILL
    • ログファイルにバグレポートを生成
    • シグナルハンドリングを解除して、自身で同じシグナルを送って終了する
  • RDBの子プロセス等がkillされた場合
    • 永続性が担保できないので、READのみ受け付けて書き込み操作はエラーを返すようになる

次のアクション

ソースコード読むか、他のNoSQLを調べる。