このページの内容は最新ではありません。最新版の英語を参照するには、ここをクリックします。
parallel.pool.DataQueue
クライアントとワーカーの間でのデータの送信とリスニング
説明
DataQueue
オブジェクトにより、計算の実行中に並列プール内のワーカーとクライアント間で送信されるデータまたはメッセージを非同期で自動処理できます。たとえば、中間値をクライアントに送信して、計算の進行状況を自動計算できます。
並列プールのワーカーからクライアントにデータを送り返すには、まずクライアントに DataQueue
オブジェクトを作成します。この DataQueue
を、parfor
ループ、または spmd
などの他の並列言語構成に渡します。ワーカーから send
を呼び出して、データをクライアントに送り返します。クライアントで、受信したデータを自動処理する関数を afterEach
を使用して指定します。
必要に応じて、
DataQueue
を作成したワーカーまたはクライアントからsend
を呼び出すことができます。ワーカー上にキューを作成し、そのキューをクライアントに送り返すことにより、逆方向の通信を有効にできます。
R2023b より前: ワーカーから別のワーカーにキューを送信することはできません。ワーカー間でデータを転送するには、代わりに
spmd
、spmdSend
またはspmdReceive
を使用してください。その他すべてのハンドル オブジェクトとは異なり、
DataQueue
インスタンスとPollableDataQueue
インスタンスはワーカーに送信されても接続されたままになります。
作成
説明
は、クライアントとワーカー間でのメッセージ (またはデータ) の送信またはリスニングに使用できるオブジェクトを作成します。データを受信するワーカーまたはクライアント上で、q
= parallel.pool.DataQueueDataQueue
を作成します。
プロパティ
QueueLength
— 現在キューに保持されているアイテムの数
0 または正の整数
このプロパティは読み取り専用です。
キューからの削除待ちのデータ アイテム数。0 または正の整数として指定します。値は 0
か、または DataQueue
インスタンスを作成したワーカーまたはクライアント上では正の整数です。クライアントが DataQueue
インスタンスを作成する場合、値はすべてのワーカー上で 0
です。あるワーカーが DataQueue
を作成する場合、値はクライアント上およびその他すべてのワーカー上で 0
です。
例
parfor
ループでのメッセージの送信、およびキューでのメッセージのディスパッチ
DataQueue
を作成し、afterEach
を呼び出します。
q = parallel.pool.DataQueue; afterEach(q, @disp);
parfor
ループを開始し、メッセージを送信します。保留中のメッセージ (この例では @disp
) が関数 afterEach
に渡されます。
DataQueue
の長さを求める
メッセージを DataQueue
オブジェクトに送信すると、メッセージはリスナーによって処理されるまでキューで待機します。メッセージごとに、キューの長さに 1
が加算されます。この例では、QueueLength
プロパティを使用して DataQueue
オブジェクトの長さを求めます。
いずれかのクライアントまたはワーカーが DataQueue
オブジェクトを作成すると、キューに送信されたすべてのメッセージはそのクライアントまたはワーカーのメモリに保持されます。クライアントが DataQueue
オブジェクトを作成する場合、すべてのワーカー上の QueueLength
プロパティは 0
です。この例ではクライアント上に DataQueue
オブジェクトを作成し、ワーカーからデータを送信します。
まず、1 つのワーカーをもつ並列プールを作成します。
parpool(1);
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 1 workers.
次に、DataQueue
を作成します。
q = parallel.pool.DataQueue
q = DataQueue with properties: QueueLength: 0
新規に作成された DataQueue
には空のキューがあります。parfor
を使用して、ワーカー上の q.QueueLength
を求めることができます。クライアント上のキューの長さ、およびワーカー上のキューの長さを求めます。
fprintf('On the client: %i\n', q.QueueLength)
On the client: 0
parfor i = 1 fprintf('On the worker: %i\n', q.QueueLength) end
On the worker: 0
キューは空であるため、クライアントとワーカーの両方で QueueLength
は 0
です。次に、ワーカーからキューにメッセージを送信します。続いて、QueueLength
プロパティを使用してキューの長さを求めます。
% Send a message first parfor i = 1 send(q, 'A message'); end % Find the length fprintf('On the client: %i\n', q.QueueLength)
On the client: 1
parfor i = 1 fprintf('On the worker: %i\n', q.QueueLength) end
On the worker: 0
QueueLength
プロパティはクライアントで 1
であり、ワーカーで 0
です。データをただちに表示してキューを処理するリスナーを作成します。
el = afterEach(q, @disp);
キューが空になるまで待ってから、リスナーを削除します。
while q.QueueLength > 0 pause(0.1); end delete(el);
QueueLength
プロパティを使用して、キューの長さを求めます。
fprintf('On the client: %i\n', q.QueueLength)
On the client: 0
キューの処理が完了しているため、QueueLength
は 0
です。
DataQueue
オブジェクトと parfor
を使用したウェイト バーの更新
この例では、DataQueue
を使用して、parfor
ループの進行状況に合わせてウェイト バーを更新します。
parfor
ループを作成するときに、各反復を並列プール内のワーカーにオフロードします。ワーカーから情報が返されるのは、parfor
ループが完了したときのみです。DataQueue
を使用すると、各反復の最後にウェイト バーを更新できます。
parfor
ループの進行状況に合わせてウェイト バーを更新する場合、クライアントは残りの反復回数に関する情報を記録しなければなりません。
ヒント
新しい並列コードを作成しているときにコードの進行状況を監視する場合は、parfeval
ワークフローの使用を検討してください。詳細については、afterEach と afterAll を使用したユーザー インターフェイスの非同期更新を参照してください。
この例の最後に定義されている補助関数 parforWaitbar
によって、ウェイト バーは更新されます。この関数は、persistent
を使用して、残りの反復回数に関する情報を保存します。
waitbar
を使用して、ウェイト バー w
を作成します。
w = waitbar(0,'Please wait ...');
DataQueue
、D
を作成します。次に、afterEach
を使用して、メッセージが DataQueue
に送信された後に parforWaitbar
を実行します。
% Create DataQueue and listener
D = parallel.pool.DataQueue;
afterEach(D,@parforWaitbar);
parfor
ループの反復回数 N
を設定します。ウェイト バー w
と反復回数 N
を使用して関数 parforWaitbar
を初期化します。
parfor
ループの反復が終了するたびに、クライアントは parforWaitbar
を実行してウェイト バーを増分更新します。
N = 100; parforWaitbar(w,N)
関数 parforWaitbar
は永続変数を使用して、完了した反復の数をクライアント上で保存します。ワーカーからの情報は必要ありません。
parfor
ループを反復回数 N
で実行します。この例では、pause
と rand
を使用していくつかの作業をシミュレートします。反復が終了するたびに、send
を使用して DataQueue
にメッセージを送信します。メッセージが DataQueue
に送信されると、ウェイト バーは更新されます。ワーカーからの情報は必要ないため、不要なデータ転送を避けるために空のメッセージを送信します。
parfor
ループが完了したら、delete
を使用してウェイト バーを閉じます。
parfor i = 1:N pause(rand) send(D,[]); end delete(w);
補助関数 parforWaitbar
を定義します。入力引数を 2 つ指定して関数 parforWaitbar
を実行すると、3 つの永続変数 (count
、h
および N
) が初期化されます。入力引数を 1 つ指定して parforWaitbar
を実行すると、ウェイト バーが更新されます。
function parforWaitbar(waitbarHandle,iterations) persistent count h N if nargin == 2 % Initialize count = 0; h = waitbarHandle; N = iterations; else % Update the waitbar % Check whether the handle is a reference to a deleted object if isvalid(h) count = count + 1; waitbar(count / N,h); end end end
parfeval
を使用したパラメーター スイープ中のプロット
この例では、並列パラメーター スイープを parfeval
により実行し、その結果を計算中に DataQueue
オブジェクトによって戻す方法を示します。
parfeval
は MATLAB をブロックしないため、計算の実行中に作業を続行できます。
この例では、ローレンツ常微分方程式系のパラメーター および に対してパラメーター スイープを実行し、この系のカオス的性質を説明します。
パラメーター グリッドの作成
パラメーター スイープで調べるパラメーターの範囲を定義します。
gridSize = 40; sigma = linspace(5, 45, gridSize); rho = linspace(50, 100, gridSize); beta = 8/3;
関数 meshgrid
を使用して、パラメーターの 2 次元グリッドを作成します。
[rho,sigma] = meshgrid(rho,sigma);
figure オブジェクトを作成し、'Visible'
を true
に設定すると、このオブジェクトがライブ スクリプトの外側の新しいウィンドウで開きます。パラメーター スイープの結果を可視化するには、表面プロットを作成します。表面の Z
要素を NaN
で初期化すると、空のプロットが作成されることに注意してください。
figure('Visible',true); surface = surf(rho,sigma,NaN(size(sigma))); xlabel('\rho','Interpreter','Tex') ylabel('\sigma','Interpreter','Tex')
並列環境の設定
関数 parpool
を使用して並列ワーカーのプールを作成します。
parpool;
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to the parallel pool (number of workers: 6).
ワーカーからデータを送信するには、DataQueue
オブジェクトを作成します。関数 afterEach
を使用して、ワーカーがデータを送信するたびに表面プロットを更新する関数を設定します。関数 updatePlot
は、この例の最後で定義するサポート関数です。
Q = parallel.pool.DataQueue; afterEach(Q,@(data) updatePlot(surface,data));
並列パラメーター スイープの実行
パラメーターを定義した後、並列パラメーター スイープを実行できます。
作業負荷を分散すると、parfeval
の効率性が向上します。作業負荷を分散するには、調べるパラメーターをグループ化して分割します。この例では、コロン演算子 (:
) を使用して、サイズを step
として均等に分割します。この結果得られる配列 partitions
には、分割の境界が含まれます。最後の分割の終点を追加しなければならないことに注意してください。
step = 100; partitions = [1:step:numel(sigma), numel(sigma)+1]
partitions = 1×17
1 101 201 301 401 501 601 701 801 901 1001 1101 1201 1301 1401 1501 1601
最良のパフォーマンスを得るには、次のように分割するようにします。
分割のスケジューリングのオーバーヘッドよりも計算時間が長くなる程度に大きい
すべてのワーカーをビジー状態に維持するために十分な分割数が存在する程度に小さい
並列ワーカーでの関数の実行を表し、その結果を保持するには、future オブジェクトを使用します。
f(1:numel(partitions)-1) = parallel.FevalFuture;
関数 parfeval
を使用して、計算を並列ワーカーにオフロードします。parameterSweep
はこのスクリプトの最後で定義されている補助関数で、調べるパラメーターの分割についてのローレンツ系を解きます。これには出力引数が 1 つあるため、parfeval
では出力の数として 1
を指定しなければなりません。
for ii = 1:numel(partitions)-1 f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q); end
parfeval
は MATLAB をブロックしないため、計算の実行中に作業を続行できます。ワーカーは並列で計算を実行し、中間結果を使用できるようになったら DataQueue
によって送信します。
parfeval
が完了するまで MATLAB をブロックする場合は、future オブジェクトに対して関数 wait
を使用します。この後のコードが parfeval
の完了に依存する場合、関数 wait
を使用すると便利です。
wait(f);
parfeval
が計算を完了すると、wait
は終了し、さらにコードを実行できるようになります。たとえば、結果として生成される表面の輪郭をプロットします。関数 fetchOutputs
を使用して、future オブジェクトに保存された結果を取得します。
results = reshape(fetchOutputs(f),gridSize,[]); contourf(rho,sigma,results) xlabel('\rho','Interpreter','Tex') ylabel('\sigma','Interpreter','Tex')
パラメーター スイープで計算リソースが多く必要で、クラスターにアクセス可能な場合は、parfeval
の計算をスケール アップできます。詳細については、デスクトップからクラスターへのスケール アップを参照してください。
補助関数の定義
調べるパラメーターの分割のローレンツ系を解く補助関数を定義します。DataQueue
オブジェクトで関数 send
を使用して、中間結果を MATLAB クライアントに送信します。
function results = parameterSweep(first,last,sigma,rho,beta,Q) results = zeros(last-first,1); for ii = first:last-1 lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)]; [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]); result = a(end,3); send(Q,[ii,result]); results(ii-first+1) = result; end end
新しいデータを受け取った時点で表面プロットを更新する別の補助関数を定義します。
function updatePlot(surface,data) surface.ZData(data(1)) = data(2); drawnow('limitrate'); end
ヒント
DataQueue
オブジェクトを使用して送信されたデータまたはメッセージは、自動処理のみができます。クライアントまたはワーカー上での受信後にデータを手動で取得するには、代わりにparallel.pool.PollableDataQueue
オブジェクトを使用してデータを送信します。
バージョン履歴
R2017a で導入
MATLAB コマンド
次の MATLAB コマンドに対応するリンクがクリックされました。
コマンドを MATLAB コマンド ウィンドウに入力して実行してください。Web ブラウザーは MATLAB コマンドをサポートしていません。
Select a Web Site
Choose a web site to get translated content where available and see local events and offers. Based on your location, we recommend that you select: .
You can also select a web site from the following list:
How to Get Best Site Performance
Select the China site (in Chinese or English) for best site performance. Other MathWorks country sites are not optimized for visits from your location.
Americas
- América Latina (Español)
- Canada (English)
- United States (English)
Europe
- Belgium (English)
- Denmark (English)
- Deutschland (Deutsch)
- España (Español)
- Finland (English)
- France (Français)
- Ireland (English)
- Italia (Italiano)
- Luxembourg (English)
- Netherlands (English)
- Norway (English)
- Österreich (Deutsch)
- Portugal (English)
- Sweden (English)
- Switzerland
- United Kingdom (English)