Main Content

このページの内容は最新ではありません。最新版の英語を参照するには、ここをクリックします。

parallel.pool.PollableDataQueue

クライアントとワーカーの間でのデータの送信とポーリング

    説明

    PollableDataQueue オブジェクトにより、計算の実行中に並列プール内のワーカーとクライアント間でデータまたはメッセージの同期送信およびポーリングができます。たとえば、中間値をクライアントに送信して、その値を別の計算に使用できます。

    並列プールのワーカーからクライアントにデータを送り返すには、まずクライアントに PollableDataQueue オブジェクトを作成します。この PollableDataQueue オブジェクトを、parfor ループ、または parfeval などの他の並列言語構成に渡します。ワーカーから send を呼び出して、データをクライアントに送り返します。クライアントで poll を使用して、ワーカーから送信されたメッセージまたはデータの結果を取得します。

    • 必要に応じて、PollableDataQueue を作成したワーカーまたはクライアントから send を呼び出すことができます。

    • ワーカー上にキューを作成し、そのキューをクライアントに送り返すことにより、逆方向の通信を有効にできます。

      R2023b より前: ワーカーから別のワーカーにキューを送信することはできません。ワーカー間でデータを転送するには、代わりに spmdspmdSend または spmdReceive を使用してください。

    • その他すべてのハンドル オブジェクトとは異なり、PollableDataQueue インスタンスと DataQueue インスタンスはワーカーに送信されても接続されたままになります。

    作成

    説明

    p = parallel.pool.PollableDataQueue は、さまざまなワーカーからのメッセージ (またはデータ) の送信およびポーリングに使用できるオブジェクトを作成します。データを受信するワーカーまたはクライアント上で、PollableDataQueue を作成します。

    プロパティ

    すべて展開する

    このプロパティは読み取り専用です。

    キューからの削除待ちのデータ アイテム数。0 または正の整数として指定します。値は 0 か、または PollableDataQueue インスタンスを作成するワーカーまたはクライアント上では正の整数です。クライアントが PollableDataQueue インスタンスを作成する場合、値はすべてのワーカー上で 0 です。いずれかのワーカーが PollableDataQueue を作成する場合、値はクライアント上およびその他すべてのワーカー上で 0 です。

    オブジェクト関数

    poll ワーカーから送信されたデータの取得
    sendデータ キューを使用したクライアントからワーカーへのデータの送信

    すべて折りたたむ

    PollableDataQueue を作成します。

    p = parallel.pool.PollableDataQueue;
    

    parfor ループを開始し、値 1 を持つデータなどのメッセージを送信します。

    parfor i = 1
        send(p, i); 
    end
    

    結果をポーリングします。

    poll(p)
    1
    

    PollableDataQueue によるデータのポーリングの詳細については、poll を参照してください。

    PollableDataQueue オブジェクトにメッセージを送信すると、メッセージはキューで待機します。メッセージごとに、キューの長さに 1 が加算されます。poll を使用すると、1 つのメッセージがキューから収集されます。この例では、QueueLength プロパティを使用して PollableDataQueue オブジェクトの長さを求めます。

    いずれかのクライアントまたはワーカーが PollableDataQueue オブジェクトを作成すると、キューに送信されたすべてのメッセージはそのクライアントまたはワーカーのメモリに保持されます。クライアントが DataQueue オブジェクトを作成する場合、すべてのワーカー上の QueueLength プロパティは 0 です。この例ではクライアント上に PollableDataQueue オブジェクトを作成し、ワーカーからデータを送信します。

    まず、1 つのワーカーをもつ並列プールを作成します。

    parpool(1);
    Starting parallel pool (parpool) using the 'local' profile ...
    Connected to the parallel pool (number of workers: 1).
    

    PollableDataQueue を作成します。

    pdq = parallel.pool.PollableDataQueue
    pdq = 
      PollableDataQueue with properties:
    
        QueueLength: 0
    
    

    新規に作成された PollableDataQueue には空のキューがあります。parfor を使用して、ワーカー上の pdq.QueueLength を求めることができます。クライアント上のキューの長さ、およびワーカー上のキューの長さを求めます。

    fprintf('On the client: %i\n', pdq.QueueLength)
    On the client: 0
    
    parfor i = 1
        fprintf('On the worker: %i\n', pdq.QueueLength)
    end
    On the worker: 0
    

    キューは空であるため、クライアントとワーカーの両方で QueueLength0 です。次に、メッセージをキューに送信します。続いて、QueueLength プロパティを使用してキューの長さを求めます。

    % Send a message first
    parfor i = 1
        send(pdq, 'A message');
    end
    
    % Find the length
    fprintf('On the client: %i\n', pdq.QueueLength)
    On the client: 1
    
    parfor i = 1
        fprintf('On the worker: %i\n', pdq.QueueLength)
    end
    On the worker: 0
    

    QueueLength プロパティはクライアントで 1 であり、ワーカーで 0 です。poll を使用して、キューからメッセージを取得します。

    msg = poll(pdq);
    disp(msg)
    A message
    

    QueueLength プロパティを使用して、キューの長さを求めます。

    fprintf('On the client: %i\n', pdq.QueueLength)
    On the client: 0
    

    キューの処理が完了しているため、QueueLength0 です。

    この例では、ワーカーでデータを受信するためのデータ キューを設定する方法を説明します。

    データ キューを使用して、クライアントとワーカーの間でデータまたはメッセージを転送できます。

    この例では、ワーカーで計器データを生成し、そのデータをクライアントに送り返します。信号生成の開始と停止を行うために、クライアントはデータ キューを使用してメッセージをワーカーに送信できます。これは、ワーカー上の parfeval の計算をよりスムーズに停止する方法を提供します。

    3 つのワーカーをもつ並列プールを起動します。

    pool = parpool("Processes",3);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 3 workers.
    

    クライアントでのデータ受信のためのキューの設定

    ワーカーからの計器データを可視化するプロットを用意して初期化します。関数 createPlots は、この例の最後で定義されています。

    [fig,p] = createPlots;

    DataQueue を作成し、afterEach を使用して、キューがデータを受信するたびに実行される関数を指定します。関数 receiveDataOnClient はワーカーから受信したデータをプロットするもので、この例の最後で定義しています。

    clientQueue = parallel.pool.DataQueue;
    afterEach(clientQueue,@(data) receiveDataOnClient(p,data));

    ワーカーでの通信受信のためのキューの設定

    クライアントにヘルパー PollableDataQueue を作成します。

    helperClientQueue = parallel.pool.PollableDataQueue;

    parfeval を使用して、並列プールの 3 つのワーカーにデータ キューを設定します。補助関数 connectToWorker は各ワーカーに一意の ID を割り当て、各ワーカーに PollableDataQueue を作成し、helperClientQueue キューを使用してデータ キューをクライアントに送信します。その後、ワーカーはクライアントからのデータ生成の開始命令を待ちます。

    wkrF(1:3) = parallel.FevalFuture;
    for ID = 1:3
        wkrF(ID) = parfeval(@connectToWorker,0,clientQueue,helperClientQueue,ID);
    end

    クライアントでは、ラベル付きワーカーのキューを受信します。これで、それらのキューを使用して各ワーカーにデータを送信できます。

    allWkrQueues = struct('ID',{},'Queue',{});
    for i = 1:3
        wkrQueue = poll(helperClientQueue,inf);
        allWkrQueues(wkrQueue.ID) = wkrQueue;
    end

    データ生成の開始と停止

    次に、ワーカーにデータ生成を開始するように命令します。

    for ID = 1:3
        send(allWkrQueues(ID).Queue,"Start generating data");
    end

    次の図は、各ワーカーが生成してクライアントに送信した計器データを示しています。

    fig.Visible="on";

    10 秒間データを生成します。

    pause(10)

    ワーカー 2 でのデータ収集を停止するために、ワーカー 2 で作成したキューを使用して、そのワーカーにメッセージを送信します。Instrument 2 のラインが約 0.9 秒で停止していることが観察できます。

    send(allWkrQueues(2).Queue,"stop");

    helperClientQueue キューをポーリングして、ワーカー 2 からの確認を受信します。

    [status, ~] = poll(helperClientQueue,inf);
    disp(status)
    Data generation stopped on worker 2
    

    他のワーカーが計算を完了するまで待機します。

    wait(wkrF);

    補助関数

    関数 connectToWorker は、ワーカーに PollableDataQueue を作成し、それをクライアントに送信してから、wkrQueue キューをポーリングしてクライアントからの命令を待ちます。

    ワーカーがクライアントからメッセージを受信すると、この関数は計器からの連続データを模したダミー信号をワーカー上に生成します。各タイム ステップで、ワーカーは clientQueue キューを使用して信号の 1 点をクライアントに送信してから、wkrQueue キューをポーリングしてそのキューにデータがあるかどうかをチェックします。受信するデータがある場合、ワーカーはデータ生成を停止し、データ生成を停止したことを確認するメッセージをクライアントに送信します。

    function connectToWorker(clientQueue,helperClientQueue,ID)
    % Assign an ID to this worker.
    wkrQueue.ID = ID;
    % Create a PollableDataQueue on this specific worker.
    wkrQueue.Queue = parallel.pool.PollableDataQueue;
    % Send the queue to the client.
    send(helperClientQueue,wkrQueue);
    
    % Wait for instructions from client.
    [~, OK] = poll(wkrQueue.Queue,inf);
    if OK
        t = 0:0.01:4;
        step = 1;
        while step < numel(t)
            % Generate dummy instrument data.
            data_point = sin(ID*2*pi*t(step));
            % Send data to client using a data queue.
            send(clientQueue,{ID,t(step),data_point});
            % Check if worker queue has data to receive and use a timeout.
            [~, OK] = poll(wkrQueue.Queue,0.1);
            if OK
                send(helperClientQueue,sprintf("Data generation stopped on worker %d",ID));
                return
            else
                step = step + 1;
            end
        end
    else
        return
    end
    end

    ワーカーからのデータを可視化するプロットを用意して初期化する関数を定義します。ワーカーごとに異なるライン プロパティを指定します。

    function [fig,p] = createPlots
    fig = figure(Name="Signal from Instruments",Visible="off");
    t = tiledlayout(fig,3,1);
    lineColor = ["k","b","g"];
    p = gobjects(1,3);
    for i=1:3
        nexttile(t);
        xlabel("Time (s)");
        ylabel("Amplitude");
        title(sprintf("Instrument %d",i))
        p(i) = animatedline(NaN,NaN,Color=lineColor(i));
    end
    end

    ワーカーがクライアントにデータを送信したときにプロットを更新する関数を定義します。

    function receiveDataOnClient(p,data)
    addpoints(p(data{1,1}),data{1,2},data{1,3})
    drawnow limitrate;
    end

    ヒント

    • PollableDataQueue オブジェクトを使用して送信されたデータまたはメッセージは、手動でのデータ取得のみができます。クライアントでデータを受信した後に自動処理するには、代わりに parallel.pool.DataQueue オブジェクトを使用してデータを送信します。

    バージョン履歴

    R2017a で導入