メインコンテンツ

Streaming Data Framework for MATLAB Production Server の基礎

Streaming Data Framework for MATLAB® Production Server™ を使用して、Kafka® などのイベント ストリーミング プラットフォームに対して読み取りと書き込みを行う。このフレームワークを使用して次のことを実行できます。

  1. イベント ストリーム データをフィルター処理、変換、記録、または処理するストリーミング解析関数を MATLAB で開発する。

  2. ストリーミング ソースに接続し、Streaming Data Framework for MATLAB Production Server 関数を使用して、解析関数がイベント ストリームに対してどのように読み取りと書き込みを行うかをテストする。

  3. ストリーミング解析アルゴリズムをテストするために実稼働環境をシミュレートする (MATLAB Compiler SDK™ が必要)。

  4. 解析関数をパッケージ化し (MATLAB Compiler SDK が必要)、MATLAB Production Server にデプロイする。

Streaming Data Framework for MATLAB Production Server のインストール

MATLAB アドオン エクスプローラーから Streaming Data Framework for MATLAB Production Server サポート パッケージをインストールします。アドオンのインストールの詳細については、アドオンの取得と管理 (MATLAB)を参照してください。

インストールが完了したら、support_package_root\toolbox\mps\streaming\Examples で例を見つけます。ここで、support_package_root はシステム上のサポート パッケージのルート フォルダーです。このフォルダーへのパスを取得するには、次のコマンドを使用します。

fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples')

システム要件

Streaming Data Framework for MATLAB Production Server のシステム要件は MATLAB と同じです。詳細については、MATLAB のシステム要件を参照してください。

ストリーミング解析 MATLAB 関数の記述

イベント ストリーム解析関数は通常、入力イベントのストリームを消費し、出力イベントのストリームを生成できます。この関数は、MATLAB Production Server にデプロイできる MATLAB 機能を使用して、イベントのストリームをフィルター処理、変換、記録、または処理できます。

イベント ストリーム解析関数は、イベントのウィンドウまたはバッチを処理します。イベントは次の 3 つの部分で構成されます。

  • キー — イベント ソースを識別する

  • タイムスタンプ — イベントが発生した時刻を示す

  • 本体 — (名前、値) のペアで指定される順序付けのないセットとしてのイベント データを含む

解析関数はイベントを timetable に読み取ります。timetable の各行はストリーミング イベントを表し、通常は時系列順です。解析関数が結果を生成する場合、それらも timetable でなければなりません。

ストリームを処理する場合、ウィンドウ サイズは通常、ストリーム内のメッセージ数よりもはるかに小さいため、解析関数を複数回呼び出すことができます。MATLAB Production Server のステートレス実行モデルは各ウィンドウの処理を分離するため、1 つのウィンドウの処理が次のウィンドウの処理に影響を与えることはありません。連続するウィンドウの処理間で対話を必要とする "ステートフル" 関数は、ウィンドウ間で保持されて解析関数の次の呼び出しに渡される MATLAB 構造体を指定します。

解析関数は、次の 3 つのシグネチャのいずれかをもつことができます。

関数シグネチャ説明
results = analyticFcn(data)結果のストリームを出力するステートレス解析関数
[ results, state ] = analyticFcn(data, state)バッチ間で状態を保持し、結果のストリームを出力するステートフル解析関数
analyticFcn(data)結果のストリームを出力しないステートレス解析関数

ステートレス解析関数

次の plotSierpinski 関数はステートレス解析関数の例です。plotSierpinski は、入力 timetable の X 列と Y 列をプロットします。この関数のソース コードとそれを実行するスクリプトは、\Examples\ExportOptions フォルダーにあります。

function howMany = plotSierpinski(xyData)

    hold on
    arrayfun(@(x,y)plot(x,y,'ro-', 'MarkerSize', 2), [xyData.X], [xyData.Y]);
    hold off
    drawnow
    count = height(xyData);
    howMany = timetable(xyData.Properties.RowTimes(end), count);
end

ステートフル解析関数

次の recamanSum 関数はステートフル解析関数の例です。ステートフル関数では、データ状態はイベント間で共有され、過去のイベントが現在のイベントの処理方法に影響する可能性があります。recamanSum は数値シーケンスの累積和を計算します。これは次の 2 つの値を返します。

  1. cSum — ストリーム内の要素の累積和を格納するテーブル

  2. state — シーケンスの最終値を格納する構造体

recamanSum 関数のソース コード、その初期化関数 initRecamanSum、および解析関数の実行に使用するスクリプトは \Examples\Numeric フォルダーにあります。

function [cSum, state] = recamanSum(data, state)
    timestamp = data.Properties.RowTimes; 
    key = data.key;
    
    sum = cumsum(data.R) + state.cumsum;
    
    state.cumsum = sum(end);

    cSum = timetable(timestamp, key, sum);
end

MATLAB を使用した Kafka イベントの処理

ストリームからイベントを処理するには、ストリームに接続するオブジェクトを作成し、ストリームからイベントを読み取り、ストリーミング解析関数を繰り返してイベントの複数のウィンドウを処理します。また、解析関数が結果を生成する場合は、結果をストリームに書き込むための別のストリーム オブジェクトを作成します。

次のコード サンプルは、フレームワークを使用してイベントの 1 つのウィンドウを処理する概要を示しています。トピック recamanSum_data をもつネットワーク アドレス kafka.host.com:9092 で稼働している Kafka ホストがあると仮定します。また、recamanSum_data トピックに、Recamán シーケンスの最初の 1,000 要素が含まれていると仮定します。

  1. recamanSum_data トピックからの読み取りと、このトピックへの書き込みを行うための KafkaStream オブジェクトを作成します。

    inKS = kafkaStream("kafka.host.com",9092,"recamanSum_data");

  2. recamanSum_data トピックのイベントを timetable tt に読み取ります。

    tt = readtimetable(inKS);

  3. recamanSum 関数を呼び出し、tt 内の Recamán シーケンスの一部の累積和を計算します。recamanSum はステートフル関数であるため、まず initRecamSum 関数を呼び出して状態を初期化します。

    state = initRecamanSum();
    [results, state] = recamanSum(tt,state);

イベントの複数のウィンドウを処理する方法の詳細な例については、Process Kafka Events Using MATLABを参照してください。

MATLAB Production Server の開発用バージョンを使用した実稼働環境のシミュレーション

MATLAB Production Server にデプロイする前に、ローカル テスト サーバーとして機能する MATLAB Production Server の開発用バージョンを使用してストリーミング解析関数をテストできます。詳細な例については、Test Streaming Analytic Function Using Local Test Serverを参照してください。

MATLAB Production Server へのストリーミング解析のデプロイ

解析関数をパッケージ化し、MATLAB Production Server にデプロイすることもできます。詳細な例については、Deploy Streaming Analytic Function to MATLAB Production Serverを参照してください。

参考

| | | |

トピック