ワーカーでの通信の受信
この例では、ワーカーでデータを受信するためのデータ キューを設定する方法を説明します。
データ キューを使用して、クライアントとワーカーの間でデータまたはメッセージを転送できます。
この例では、ワーカーで計器データを生成し、そのデータをクライアントに送り返します。信号生成の開始と停止を行うために、クライアントはデータ キューを使用してメッセージをワーカーに送信できます。これは、ワーカー上の parfeval
の計算をよりスムーズに停止する方法を提供します。
3 つのワーカーをもつ並列プールを起動します。
pool = parpool("Processes",3);
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 3 workers.
クライアントでのデータ受信のためのキューの設定
ワーカーからの計器データを可視化するプロットを用意して初期化します。関数 createPlots
は、この例の最後で定義されています。
[fig,p] = createPlots;
DataQueue
を作成し、afterEach
を使用して、キューがデータを受信するたびに実行される関数を指定します。関数 receiveDataOnClient
はワーカーから受信したデータをプロットするもので、この例の最後で定義されています。
clientQueue = parallel.pool.DataQueue; afterEach(clientQueue,@(data) receiveDataOnClient(p,data));
ワーカーでの通信受信のためのキューの設定
クライアントにヘルパー PollableDataQueue
を作成します。
helperClientQueue = parallel.pool.PollableDataQueue;
parfeval
を使用して、並列プールの 3 つのワーカーにデータ キューを設定します。補助関数 connectToWorker
は各ワーカーに一意の ID を割り当て、各ワーカーに PollableDataQueue
を作成し、helperClientQueue
キューを使用してデータ キューをクライアントに送信します。その後、ワーカーはクライアントからのデータ生成の開始命令を待ちます。
wkrF(1:3) = parallel.FevalFuture; for ID = 1:3 wkrF(ID) = parfeval(@connectToWorker,0,clientQueue,helperClientQueue,ID); end
クライアントでは、ラベル付きワーカーのキューを受信します。これで、それらのキューを使用して各ワーカーにデータを送信できます。
allWkrQueues = struct('ID',{},'Queue',{}); for i = 1:3 wkrQueue = poll(helperClientQueue,inf); allWkrQueues(wkrQueue.ID) = wkrQueue; end
データ生成の開始と停止
次に、ワーカーにデータ生成を開始するように命令します。
for ID = 1:3 send(allWkrQueues(ID).Queue,"Start generating data"); end
次の図は、各ワーカーが生成してクライアントに送信した計器データを示しています。
fig.Visible="on";
10 秒間データを生成します。
pause(10)
ワーカー 2 でのデータ収集を停止するために、ワーカー 2 で作成したキューを使用して、そのワーカーにメッセージを送信します。Instrument 2 のラインが約 0.9 秒で停止していることが観察できます。
send(allWkrQueues(2).Queue,"stop");
helperClientQueue
キューをポーリングして、ワーカー 2 からの確認を受信します。
[status, ~] = poll(helperClientQueue,inf); disp(status)
Data generation stopped on worker 2
他のワーカーが計算を完了するまで待機します。
wait(wkrF);
補助関数
関数 connectToWorker
は、ワーカーに PollableDataQueue
を作成し、それをクライアントに送信してから、wkrQueue
キューをポーリングしてクライアントからの命令を待ちます。
ワーカーがクライアントからメッセージを受信すると、この関数は計器からの連続データを模したダミー信号をワーカー上に生成します。各タイム ステップで、ワーカーは clientQueue
キューを使用して信号の 1 点をクライアントに送信してから、wkrQueue
キューをポーリングしてそのキューにデータがあるかどうかをチェックします。受信するデータがある場合、ワーカーはデータ生成を停止し、データ生成を停止したことを確認するメッセージをクライアントに送信します。
function connectToWorker(clientQueue,helperClientQueue,ID) % Assign an ID to this worker. wkrQueue.ID = ID; % Create a PollableDataQueue on this specific worker. wkrQueue.Queue = parallel.pool.PollableDataQueue; % Send the queue to the client. send(helperClientQueue,wkrQueue); % Wait for instructions from client. [~, OK] = poll(wkrQueue.Queue,inf); if OK t = 0:0.01:4; step = 1; while step < numel(t) % Generate dummy instrument data. data_point = sin(ID*2*pi*t(step)); % Send data to client using a data queue. send(clientQueue,{ID,t(step),data_point}); % Check if worker queue has data to receive and use a timeout. [~, OK] = poll(wkrQueue.Queue,0.1); if OK send(helperClientQueue,sprintf("Data generation stopped on worker %d",ID)); return else step = step + 1; end end else return end end
ワーカーからのデータを可視化するプロットを用意して初期化する関数を定義します。ワーカーごとに異なるライン プロパティを指定します。
function [fig,p] = createPlots fig = figure(Name="Signal from Instruments",Visible="off"); t = tiledlayout(fig,3,1); lineColor = ["k","b","g"]; p = gobjects(1,3); for i=1:3 nexttile(t); xlabel("Time (s)"); ylabel("Amplitude"); title(sprintf("Instrument %d",i)) p(i) = animatedline(NaN,NaN,Color=lineColor(i)); end end
ワーカーがクライアントにデータを送信したときにプロットを更新する関数を定義します。
function receiveDataOnClient(p,data) addpoints(p(data{1,1}),data{1,2},data{1,3}) drawnow limitrate; end
参考
parallel.pool.PollableDataQueue
| parallel.pool.DataQueue
| afterEach