揮発性と不揮発性
・ストリーミングデータの処理
ストリーミングデータの処理は、リアルタイムでの活用、かつ膨大な情報量を処理が求められる現在では必要不可欠になっています。
今までもメッセージキューやログの収集システムなどで、企業はさまざまなデータの処理を行ってきました。
リアルタイムデータが主になってきている今では、IBM MQやApache Active MQなどのメッセージングサービスで行うにはする0プットやスケールアウトへの対応など多くの問題があり、リアルタイム処理に最適化して対応する必要が出てきました。
ストリーミングデータを処理するために必要な機能としては以下のようなものがあります。
- 無限に発生するデータを処理
- アクセスログやIoTなどのセンサーなど様々なシステムから発生する無限のデータを処理します。発生するデータは、比較的容量の小さいものが多いがその限りではありません。
- 高スループット、不定期な処理
- 容量の小さなデータだけではなく、大容量のデータなどさまざまなデータを永続的に処理することからデータに合わせた並列処理などを行う必要があります。また、データによって異なるため、処理は不定期な処理になります。
膨大かつ、不定期で送られてくるデータを永続的に処理を行うため、それに応じた環境と処理が要求されます。
ストリーミングデータの処理の中には、大量に発生するデータがアクセスログやセンサーのログなどもあり、通信や処理の遮断などから一部のデータが欠損したり、重複することなどが起きていました。
これらのデータの送信を行ったということが事実として残る形の「At Least Once(少なくとも1回はメッセージが配達される)」という概念で作られていました。それらが正常に処理されているのか、いないのかは把握していません。まずは1回は必ず配信されるという部分の処理にフォーカスしてシステムが構築され、処理が継続されることにフォーカスされていました。送信されるデータ量が膨大にあることからパフォーマンスなどを考えてもそういった処理を行う必要がありました。
しかし、データの重要性が増せば増すほど、送ったというだけで、受け取られたという部分が把握できていないことから、データが重複するなど問題が発生します。
その為、技術者がデータの精密な処理について対応しなければいけなくなり、さまざまなストリーム処理を行っている部分に正確性を追加する形で構築されるようになりました。
リアルタイムデータとバッチデータの時間軸
参考資料:
イベントファーストとイベントコマンド
・これまでのメッセージブローカー
一時データというのはそれがなくなっても大きな問題にならないものであったりします。例えば、メモリーDBは処理を高速化するためによく用いられていますがメモリーに展開することから再起動、プロセスがクラッシュするなどでデータが失われます。現在でも、スマップショットなどを利用してデータを保持するなどをしていますが直前のデータについては消えてしまうため、データの持続性については解決していません。
ストリーム処理においても同様でメッセージブローカーは、データの送達において持続性が検討されていないことが多くあります。
例えばオープンソースのakkaは、データを書き込んだりすることで発生するDisk IOなどのオーバーヘッドを減らし、大量なデータを処理することに主眼を置いて構築されています。そのため、一度は送信するということを行っているが、送信したデータが正常に受け取れているかは保証がされていませんでした。さらに一度送ったデータは消えてしまう状態にありました。軽量のメッセージプロトコルであるMQTTもこの形式で通常は送達します。(設定で変更は可能。)
仕組みとしては、メッセージブローカーがデータを受け取り、必要とする宛先にデータを配送すると配送が完了したデータは削除されてしまうというようなモデルが広く採用されていました。
この処理の場合には、データは消えてしまうので重複して保持することはないが、処理が正常に行われているかは判断していませんでした。
そのため、重要度の高い情報にこれらの仕組みは利用できませんでした。大量のデータを処理するにはオーバーヘッドを可能な限り減らし、処理を行えるようにすることを目指していました。
しかし、この処理で行った場合はメモリ上に保持することができない大容量のデータが入った場合には溢れたデータがロストしてしまいます。また、メモリーDBと同じようにクラッシュが起きた場合にデータが消失してしまいます。
さらには、メッセージを1度送るということにフォーカスしているため、受け取る側の処理に問題があったり、受け取る側が応答していないなどした場合には対処を行っている期間のデータは全てなくなることが前提になります。
以前の記事で書いているようなカードの不正検知などのサービスを提供するとした場合に、このような状態であった場合にはすべてのシステムが正常に動き続けなければサービスレベルは担保できない状態にあります。
リリース時のKafkaは、データを消失しないようにするために、「At Least Once(少なくとも1回は送信する)」の形でデータの受け取りを確実に行う仕組みで提供されました。この場合は重複が起きることを許容しなくてはいけません。
仕組みとしては、Producerからデータを送信し、データを受けとったBrokerがProducerへAckを返すことで正常な送信処理ができているかを把握するようにしています。また、ConsumerもBrokerにデータ配信の依頼を行い、その依頼でメッセージを正常に取得したらOffset Commitすることで状態の把握を行っていました。しかし、この場合は受け取ってもAckを返送する際に何らかエラーが起きた場合はその処理が完了してないことになるため、重複が発生します。そのため、重複が起きた際にその重複を取り除く処理を追加する必要があります。
日に日にデータの重要性が高まっている中でKafkaにおいても1度だけ正確に送信するExactly Onceの処理を実現しました。それを実現するために以下の処理を実現しています。
- べき等性を保つProducer
- トランザクション処理
Producerは、Brokerとのやり取りを行う際にやり取りを記録し、一度送って正常に終了しているが双方で保持している内容が異なる場合は、再度送信されても受け取った際に処理を行わず正常に応答する処理を行うことで重複が行われないように対応をしています。
Consumerにおいてもトランザクションの開始と終了をもち、受け取ったデータごとに処理終了のメッセージを送信し、Offset Commitを行った際にトランザクションを完了するようにしています。途中で障害があった際にはその処理を破棄するようになりました。
これによりProducerからのデータの受け取り、Consumerからの要求に対してデータを配達するという部分に対して送達の保証をしています。
参考資料:
ConfluentにてExactly Once処理の実現
https://www.confluent.io/blog/we-will-say-exactly-confluent-platform-3-3-available-now/
・再利用可能な処理を実現
Kafkaでは、より正確な処理、障害などが起きた際にも対応が可能な状態を実現するため、すべてのデータをdiskへ書き出し、持続性を確保するというデザインで開発が行われました。データの書き出しにより消失することを防ぐことはさまざまなメリットをもたらします。
- 実質的に無限のデータバッファリングを実現
- 障害時における耐障害性の確保
Diskにデータを保持することから、一度送達したデータについては、削除することができるため、データを受け取れる領域を確保できるようになります。
また、プロセスやマシンがクラッシュした際にデータが消失してしまい、再処理などが行えない場合が起こりえますが、Diskにデータが保持されているため、消失することがなく再処理を実行することができるという耐障害性の確保が可能になります。
Kafkaが実現したこのDiskにデータ書き出すという処理は、データの持続性を確保するというだけではなく、それ以外にもメリットがあります。
一度Diskに書き込まれたデータは、かなり過去に遡って再度取得することができます。この保持期間はKafkaの設定で行うことができます。遡って再度取得することができるという部分で以下の2つのメリットがあります。
- 実装を誤った際に再処理を行う
- 新たな処理を過去に遡って実装できる
- データストアとして利用する
これらの機能は非常に有益なものになります。通常ではデータがなくなってしまうことから実装が誤れば修正対応したところから再度始まるものであるが、それまでの処理を再度受け取り、時間を巻き戻すことができるようになります。
さらに、利用開始時においては想定していなかったが、新たに実装を行いたいと思った機能に対して、本来は新たに実装してから始まるものであるが実装が完了してから過去に遡ってデータを取得して利用することができます。
もちろんデータを永続的に保持していることから、データストアとして利用することができるのは容易に想像がつくと思います。これらの機能は、データをさまざまなサービスに利用するという部分において、非常に強力な機能となっており、Kafkaを選択する一つのポイントとなります。
Kafkaが実現しているものは、データの書き出しという処理が加えられたということではありません。前述のようにDisk IO で発生するオーバーヘッドがあるため、こういった機能が実装されていなかったが、Kafkaはこれらの処理を行っている中でも効率的に動作する機能を実装しています。
ローカルにデータが存在している場合、そのデータをファイルからソケットへデータ転送を行います。この処理を行うためには、通常では4つのデータコピーと2つのシステムコールが行われます。Kafkaでは、Sendfile APIを利用することで処理数を減らしてオーバーヘッドを避け効率的に動作することを実現しています。
また、大量データを処理することを考えると処理を行うたびに発生するオーバーヘッドがありますが、Kafkaではバッチングという機能で複数のレコードをまとめて一つの大きなリクエストとして送る機能を提供しています。まとめることによるリクエストの数を減らすことができます。
さらにdiskへデータを書き出す際に非同期なライトバックを行うことなど大量データを処理するという命題をクリアしています。Kafkaは、処理するためにデータを消失するなど犠牲をせずに正確にデータの送達を行い、さらにデータ処理だけではなく、再利用などを考えた、より多くのメリットを享受できる仕組みになっています。
参考資料
ニューヨークタイムズでの利用例
https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/
https://www.confluent.io/blog/introducing-apache-kafka-for-the-enterprise/
まとめ
高い信頼性を持つストリーム処理を実現するためには、データが消失しないことが重要なポイントとなります。KafkaはDiskへの書き込みを行い、データを持続的に保持することで実現しています。また、それを行うことによるデメリットを出さないようなにさまざまな処理を考え、設計されたソフトウェアになります。データの持続性を持つことでデータの送達だけではなく幅広い活用をKafkaを使うことで実現することができます。
次回は、技術的な課題の重要なポイントであるスケーラビリティについて解説します。