Confluent Japan Community Blog

Confluent Japan Community が提供するストリーミングデータを扱うConfluent、Apache Kafkaに関する情報を提供します。

Confluent Japan Community が提供する apache Kafka および Confluent に関するブログ

KSQLの活用

- より簡単な利用を実現 -

 

・KSQLとは

KSQLは、Confluent社が中心になり開発した Apache Kafka 用のSQLライクな言語で処理を記述できるストリーミング処理エンジンです。Kafkaを利用してストリーミングデータを処理する際に、そのデータの変換、加工(フィルタリング、変換、集約、結合、ウィンドウ処理、セッション化)等を実現し、今までのKafkaの処理をより広く対応することが可能になりました。

KSQLはKafkaとは別に起動されるサービスであり、Confluent Platformのパッケージにバンドルされています。全体の構成としては以下のようになっています。

f:id:Confluent:20190330174639p:plain

KSQLは、KSQL Server と KSQL CLI(Command Line Interface)で提供されるものであり、Confluentのエンタープライズ向けサービスでで提供されるControl Centerにてその動作などをUIで確認することができます。

 

  • KSQL Server
    • KSQLクエリを実行するエンジンであり、Kafka クラスターからデータを取得、処理し、書き戻すなどの処理を行います。KSQLにて記述された実行情報から必要なデータの抽出や処理を行います。

  • KSQL CLI
    • KSQL Serverのクライアントとして機能するものであり、処理を行いたい内容をKSQLの構文で作成し、そのクエリ情報をKSQL Serverに送って処理を行います。

 

KSQLは、2つのデプロイタイプを持っており、開発段階では、インタラクティブモードを利用し、プロダクション環境では、ヘッドレスモードを利用します。モードごとに特徴があり、その利用シーンも異なりますがが、どちらのモードにおいてもKSQL Server のインスタンスをアプリケーションの停止なしで追加することができます。

 

  • インタラクティブデプロイメント
    • インタラクティブの場合は、KSQLサーバーがREST APIを開放し、インスタンスの再起動なしにSQLクエリを追加することができます。また、Confluent Control Center やREST Clientとのやり取りを自由に行うことができます。開発段階においては、いろいろと試行錯誤して開発を行うため自由度の高く適しています。ただプロダクション環境においては自由度が高すぎるものになります。

f:id:Confluent:20190330174724p:plain

  • ヘッドレスデプロイメント
    • ヘッドレスの場合は、KSQLサーバーが指定されたファイルから読み込んだSQLのみを実行し、動的なSQLの追加実行を許容しない。すでに開発が完了しており、決まった形式での実行のみを行うため、ヘッドレスモードを使うことでKSQLサーバーの安定性を担保します。

f:id:Confluent:20190330174809p:plain

それぞれのモードのサポート内容に関する詳細は以下を参照してください。

 

参考資料:

モードごとのサポート情報

https://docs.confluent.io/current/ksql/docs/concepts/ksql-architecture.html

 

・KSQLの利用で広がるデータ活用の幅

KSQLは、SQLライクな記述でストリーミングデータの処理を実行することができます。SQLはデータベースに対して一度処理を実行してデータを抽出するのに対して、KSQLは無限に流れてくるデータに対して処理を実行し続ける点が大きな違いになります。

永続的に流れてくるデータに対して、複数の動作をまとめて一つの情報として取得したり、複数のデータを結合して新しい情報を出力するといった処理がKSQLでは可能になります。Kafkaはストリーム処理を一元化し、複数のシステムからデータを受け取り、トピックやコンシューマーグループを使って効率的にデータを配信する仕組みであるが、KSQLを使うことで、慣れ親しんだSQLを用いてデータを加工し、新しいデータ形式を生み出すことができます。

f:id:Confluent:20190413143129p:plain

f:id:Confluent:20190413143145p:plain

参考サイト:KSQLを使ったストリーミングアプリケーションの構築
https://www.confluent.io/blog/building-streaming-application-ksql
動画:

www.youtube.com

デモデータ:

github.com


上記はKSQLを使った簡単な例になります。例えばECサイトのように順次処理が行われるオーダーと、配送に関する処理情報をKSQLクエリを用いて処理を行うことで現在存在している手持ち在庫を洗い出すことができます。この際、利用している情報は、オーダー情報と配送情報の2つのストリーミングデータで、KSQLのクエリを使うことで、手持ち在庫の情報を抽出することが出来ます。この様に、KSQLであれば二つの処理を一つの処理で行うことができ、それぞれのデータを受け取った先の仕組みで加工や処理を実施する必要がなくなります。

このようにKafkaを利用してさまざまなProducerからデータを収集しますが、収集して配信するだけではなく、加工や別の形での利用も実現することができるため、リアルタイムのストリーミングデータに対しての利用の幅が向上します。ストリーム処理を行う場合に、KafkaではKafka Streamを提供しており、これらを利用して処理を行うことができます。さまざまな処理がライブラリで提供されていることから複雑な処理をこのライブラリを使って作成することができます。しかし、本来であれば、これを行うには複雑な処理を実行することができるプログラムを書く必要があります。KSQLとKafka Streamの違いについては以下の参考資料を確認してみてください。

 

参考資料:

KSQLとKafka Streamの違い

https://docs.confluent.io/current/ksql/docs/concepts/ksql-and-kafka-streams.html

・KSQLの概要

KSQLは、内部的には、Kafka Stream APIを利用していて、APIを通じてKafkaトピックを操作します。それを可能としている2つの抽象化概念があります。

 

  • Stream
    • ストリームは無限の構造化データを指します。このデータは取得したデータであり、変換できないデータになります。新たな情報を差し込むことはできますが既存の情報を更新したりすることはできません。
      Confluentで説明されている内容としては以下のような内容です。
      • AさんがBさんに100ドル送金した
      • CさんがBさんに50ドル送金した


この処理は事実であり、変換も更新もできないデータになります。

  • Table
    • テーブルは、上記のストリームであったり、他のテーブルのビューとなるものです。ストリームデータを加工などして新しい情報を作成することができます。上記の例で行くと
      • Bさんの口座には150ドルの送金受け取りがある


この情報は変更することができ、別の形でも作成することができるものになります。例えばBさんは2名からお金を受け取った。などという形の変換になります。

流れてくる事実のデータを加工し、リアルタイムデータのマテリアライズドビューが作成されるイメージで考えると分かりやすいかもしれません。これらのデータを構築することにより、リアルタイムでさまざまな処理を実現することが可能になります。今までリアルタイムにデータは取得してもその先にさまざまな複雑な構成を行っていた仕組みからKSQLを用いて容易に分析等を行うことができるようになります。

 

例:以下はKSQLを使った簡単な処理の概要で、ATMで利用されている利用データと顧客のデータをKafkaにて受け取ります。この2つのデータはストリームになります。その受け取った2つのストリームをKSQLを使い、利用データの詐欺情報を検出するテーブルやトランザクションデータと顧客を紐づけて、データとして利用できるテーブルを作成し、その先のシステムで処理を行うことができます。他のシステムで処理を実行するために必要なデータは、KafkaおよびKSQLで実行されて作り上げられます。

f:id:Confluent:20190330175102p:plain

まとめ

KSQLは、ストリーミングデータをSQLの記述のみで処理を行うことができるため、利用しやすく、データの加工や取り出し、書き戻しなどさまざまな個所で利用することができます。KSQLを使うことで永続的に流れてくるSQLデータに対して、SQLを実行し続けてくれることなどのメリットがあり、これらを利用した幅広いサービスなどを開発することが可能になります。

次回は、ConfluentのEnterprise機能であるConfluent Control Centerについて解説します。