Main Content

ワーカーでの通信の受信

R2023b 以降

この例では、ワーカーでデータを受信するためのデータ キューを設定する方法を説明します。

データ キューを使用して、クライアントとワーカーの間でデータまたはメッセージを転送できます。

この例では、ワーカーで計器データを生成し、そのデータをクライアントに送り返します。信号生成の開始と停止を行うために、クライアントはデータ キューを使用してメッセージをワーカーに送信できます。これは、ワーカー上の parfeval の計算をよりスムーズに停止する方法を提供します。

3 つのワーカーをもつ並列プールを起動します。

pool = parpool("Processes",3);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 3 workers.

クライアントでのデータ受信のためのキューの設定

ワーカーからの計器データを可視化するプロットを用意して初期化します。関数 createPlots は、この例の最後で定義されています。

[fig,p] = createPlots;

DataQueue を作成し、afterEach を使用して、キューがデータを受信するたびに実行される関数を指定します。関数 receiveDataOnClient はワーカーから受信したデータをプロットするもので、この例の最後で定義されています。

clientQueue = parallel.pool.DataQueue;
afterEach(clientQueue,@(data) receiveDataOnClient(p,data));

ワーカーでの通信受信のためのキューの設定

クライアントにヘルパー PollableDataQueue を作成します。

helperClientQueue = parallel.pool.PollableDataQueue;

parfeval を使用して、並列プールの 3 つのワーカーにデータ キューを設定します。補助関数 connectToWorker は各ワーカーに一意の ID を割り当て、各ワーカーに PollableDataQueue を作成し、helperClientQueue キューを使用してデータ キューをクライアントに送信します。その後、ワーカーはクライアントからのデータ生成の開始命令を待ちます。

wkrF(1:3) = parallel.FevalFuture;
for ID = 1:3
    wkrF(ID) = parfeval(@connectToWorker,0,clientQueue,helperClientQueue,ID);
end

クライアントでは、ラベル付きワーカーのキューを受信します。これで、それらのキューを使用して各ワーカーにデータを送信できます。

allWkrQueues = struct('ID',{},'Queue',{});
for i = 1:3
    wkrQueue = poll(helperClientQueue,inf);
    allWkrQueues(wkrQueue.ID) = wkrQueue;
end

データ生成の開始と停止

次に、ワーカーにデータ生成を開始するように命令します。

for ID = 1:3
    send(allWkrQueues(ID).Queue,"Start generating data");
end

次の図は、各ワーカーが生成してクライアントに送信した計器データを示しています。

fig.Visible="on";

10 秒間データを生成します。

pause(10)

ワーカー 2 でのデータ収集を停止するために、ワーカー 2 で作成したキューを使用して、そのワーカーにメッセージを送信します。Instrument 2 のラインが約 0.9 秒で停止していることが観察できます。

send(allWkrQueues(2).Queue,"stop");

helperClientQueue キューをポーリングして、ワーカー 2 からの確認を受信します。

[status, ~] = poll(helperClientQueue,inf);
disp(status)
Data generation stopped on worker 2

他のワーカーが計算を完了するまで待機します。

wait(wkrF);

補助関数

関数 connectToWorker は、ワーカーに PollableDataQueue を作成し、それをクライアントに送信してから、wkrQueue キューをポーリングしてクライアントからの命令を待ちます。

ワーカーがクライアントからメッセージを受信すると、この関数は計器からの連続データを模したダミー信号をワーカー上に生成します。各タイム ステップで、ワーカーは clientQueue キューを使用して信号の 1 点をクライアントに送信してから、wkrQueue キューをポーリングしてそのキューにデータがあるかどうかをチェックします。受信するデータがある場合、ワーカーはデータ生成を停止し、データ生成を停止したことを確認するメッセージをクライアントに送信します。

function connectToWorker(clientQueue,helperClientQueue,ID)
% Assign an ID to this worker.
wkrQueue.ID = ID;
% Create a PollableDataQueue on this specific worker.
wkrQueue.Queue = parallel.pool.PollableDataQueue;
% Send the queue to the client.
send(helperClientQueue,wkrQueue);

% Wait for instructions from client.
[~, OK] = poll(wkrQueue.Queue,inf);
if OK
    t = 0:0.01:4;
    step = 1;
    while step < numel(t)
        % Generate dummy instrument data.
        data_point = sin(ID*2*pi*t(step));
        % Send data to client using a data queue.
        send(clientQueue,{ID,t(step),data_point});
        % Check if worker queue has data to receive and use a timeout.
        [~, OK] = poll(wkrQueue.Queue,0.1);
        if OK
            send(helperClientQueue,sprintf("Data generation stopped on worker %d",ID));
            return
        else
            step = step + 1;
        end
    end
else
    return
end
end

ワーカーからのデータを可視化するプロットを用意して初期化する関数を定義します。ワーカーごとに異なるライン プロパティを指定します。

function [fig,p] = createPlots
fig = figure(Name="Signal from Instruments",Visible="off");
t = tiledlayout(fig,3,1);
lineColor = ["k","b","g"];
p = gobjects(1,3);
for i=1:3
    nexttile(t);
    xlabel("Time (s)");
    ylabel("Amplitude");
    title(sprintf("Instrument %d",i))
    p(i) = animatedline(NaN,NaN,Color=lineColor(i));
end
end

ワーカーがクライアントにデータを送信したときにプロットを更新する関数を定義します。

function receiveDataOnClient(p,data)
addpoints(p(data{1,1}),data{1,2},data{1,3})
drawnow limitrate;
end

参考

| |

関連するトピック