Main Content

parfeval

並列プール ワーカーでの関数の非同期実行

説明

F = parfeval(p,fcn,numout,in1,in2,...) は、関数 fcn を並列プール p のワーカー上で非同期実行するよう要求します。このとき、numout 個の出力引数が想定され、また入力引数 in1,in2,... が入力されます。fcn を非同期評価しても MATLAB はブロックされません。Fparallel.FevalFuture オブジェクトで、ワーカーが fcn の評価を完了した時点でこのオブジェクトから結果を取得できます。fcn の評価は必ず実行されます。ただし、cancel(F) を呼び出して明示的に実行をキャンセルした場合を除きます。関数の評価を複数回実行するには、parfeval を複数回呼び出さなければなりません(ただし、parfevalOnAll はすべてのワーカーで同じ関数を実行できます)。

F = parfeval(fcn,numout,in1,in2,...) は現在の並列プールでの非同期実行を要求します。プールが存在しない場合は新しい並列プールが起動されます。ただし、並列基本設定でプールの自動作成が無効になっている場合を除きます。

すべて折りたたむ

parfeval または parfevalOnAll を使用してバックグラウンドで計算を実行する際には、future と呼ばれるオブジェクトを作成します。future の State プロパティを使用して、future が実行中か、待機中か、完了しているかを調べることができます。また、並列プールの FevalQueue プロパティを使用して、実行中または待機中の future にアクセスすることもできます。future をキャンセルするには、関数 cancel を使用できます。この例では、以下を行います。

  • cancel を使用して future を直接キャンセルする。

  • 完了した future について完了エラーをチェックする。

  • FevalQueue プロパティを使用して future にアクセスする。

キューへの作業の追加

2 つのワーカーをもつ並列プール p を作成します。

p = parpool(2);
Starting parallel pool (parpool) using the 'local' profile ...
Connected to the parallel pool (number of workers: 2).

parfeval を使用してバックグラウンドで計算を実行するときに、この関数は各計算について future を作成し、プールのキューに追加します。ワーカーがアイドルになるまで、タスクはキューに残ります。ワーカーがアイドルになると、キューが空でない場合はタスクの計算が開始されます。ワーカーがタスクを完了すると、タスクはキューから削除され、そのワーカーはアイドルになります。

parfeval を使用して、ワーカーに関数 pause を実行するように命令することにより、future の配列 f を作成します。3 番目の future に引数 1 を使用し、その他すべての future に引数 Inf を使用します。

for n = 1:5
    if n == 3
        f(n) = parfeval(@pause,0,1);
    else
        f(n) = parfeval(@pause,0,Inf);
    end
end

parfeval の使用ごとに、ワーカー上での関数の実行を表す future オブジェクトが返されます。3 番目の future を除く各 future は、計算に無限量の時間を要します。parfeval(@pause,0,Inf) によって作成された future は、キューを低速化する future の極端な例です。

future の直接キャンセル

State プロパティを使用して、future のステータスを取得できます。f. の各 future の状態からなる cell 配列を作成します。

{f.State}
ans = 1×5 cell
    {'running'}    {'running'}    {'queued'}    {'queued'}    {'queued'}

3 番目を除く各タスクが永久に一時停止しています。

cancel を使用して、2 番目の future を直接キャンセルします。

cancel(f(2));
{f.State}
ans = 1×5 cell
    {'running'}    {'finished'}    {'running'}    {'queued'}    {'queued'}

2 番目の future をキャンセルすると、3 番目の future が実行されます。3 番目の future が完了するまで待ってから、再び状態を調べます。

wait(f(3));
{f.State}
ans = 1×5 cell
    {'running'}    {'finished'}    {'finished'}    {'running'}    {'queued'}

3 番目の future の状態は 'finished' になっています。

完了エラーのチェック

future が完了すると、その State プロパティは 'finished' になります。キャンセルされた future と正常に完了した future を区別するには、Error プロパティを使用します。

fprintf("f(2): %s\n", f(2).Error.message)
f(2): Execution of the future was cancelled.
fprintf("f(3): %s\n", f(3).Error.message)
f(3): 

message プロパティが示すように、コードは 2 番目の future をキャンセルします。message プロパティで述べられているように、2 番目の future はキャンセルされました。3 番目の future はエラーなしで完了するため、エラー メッセージはありません。

プール キュー内の future のキャンセル

FevalQueue プロパティを使用して、プール キュー内の future にアクセスできます。

p.FevalQueue
ans = 
 FevalQueue with properties: 

        Number Queued: 1
       Number Running: 2

キューには RunningFuturesQueuedFutures の 2 つのプロパティがあります。RunningFutures プロパティは、現在実行中のタスクに対応する future からなる配列です。

disp(p.FevalQueue.RunningFutures)
 1x2 FevalFuture array:
 
         ID              State  FinishDateTime  Function  Error
       --------------------------------------------------------
    1    12            running                    @pause       
    2    15            running                    @pause       

QueuedFutures プロパティは、現在待機中で実行されていないタスクに対応する future からなる配列です。

disp(p.FevalQueue.QueuedFutures)
 FevalFuture with properties: 

                   ID: 16
             Function: @pause
       CreateDateTime: 15-Jul-2020 17:29:37
        StartDateTime: 
     Running Duration: 0 days 0h 0m 0s
                State: queued
                Error: none

1 つの future、または future の配列をキャンセルできます。QueuedFutures にあるすべての future をキャンセルします。

cancel(p.FevalQueue.QueuedFutures);
{f.State}
ans = 1×5 cell
    {'running'}    {'finished'}    {'finished'}    {'running'}    {'finished'}

RunningFutures および QueuedFutures は、f が最新から最古へという順になっているかどうかにかかわらず、最新から最古へと並べ替えられます。各 future には、クライアントの有効期間に対する一意の ID プロパティがあります。f の各 future の ID プロパティをチェックします。

disp(f)
 1x5 FevalFuture array:
 
         ID              State        FinishDateTime  Function  Error
       --------------------------------------------------------------
    1    12            running                          @pause       
    2    13  finished (unread)  15-Jul-2020 17:29:37    @pause  Error
    3    14  finished (unread)  15-Jul-2020 17:29:39    @pause       
    4    15            running                          @pause       
    5    16  finished (unread)  15-Jul-2020 17:29:39    @pause  Error

結果を、それぞれの RunningFuturesID プロパティと比較します。

for j = 1:length(p.FevalQueue.RunningFutures)
    rf = p.FevalQueue.RunningFutures(j);
    fprintf("p.FevalQueue.RunningFutures(%i): ID = %i\n", j, rf.ID)
end
p.FevalQueue.RunningFutures(1): ID = 12
p.FevalQueue.RunningFutures(2): ID = 15

ここで、RunningFuturesf(1)f(4) を含む配列です。RunningFutures(2) をキャンセルすると、4 番目の future f(4) がキャンセルされます。

場合によっては、ワークスペースで future を使用できないことがあります。たとえば、コードが完了する前に再び同じコードを実行する場合や、関数内で parfeval を使用している場合などです。ワークスペースで使用できない future はキャンセルできます。

ワークスペースから f をクリアします。

clear f

RunningFuturesQueuedFutures を使用して、未完了の future にアクセスできます。RunningFutures を使用して f(4) をキャンセルします。

rf2 = p.FevalQueue.RunningFutures(2);
cancel(rf2)
rf2.State
ans = 
'finished'

まだキュー内にあるすべての future をキャンセルするには、次のコードを使用します。

cancel([p.FevalQueue.RunningFutures p.FevalQueue.QueuedFutures])

parfeval を使用して、ワーカー上での関数の非同期実行を要求します。

たとえば、並列プールに 1 件の要求を投入します。fetchOutputs を使用して出力を取得します。

f = parfeval(@magic,1,10);
value = fetchOutputs(f);

また、for ループに複数の Future 要求のベクトルを投入し、結果が使用可能になった時点で収集することもできます。効率性を高めるため、Future オブジェクトの配列を事前に割り当てます。

f(1:10) = parallel.FevalFuture;
for idx = 1:10
    f(idx) = parfeval(@magic,1,idx);
end

fetchNext を使用して、個々の Future 出力が使用可能になった時点で取得します。

magicResults = cell(1,10);
for idx = 1:10
    [completedIdx,value] = fetchNext(f);
    magicResults{completedIdx} = value;
    fprintf('Got result with index: %d.\n', completedIdx);
end

この例では、並列パラメーター スイープを parfeval により実行し、その結果を計算中に DataQueue オブジェクトによって戻す方法を示します。parfeval は MATLAB をブロックしないため、計算の実行中も作業を続行できます。

この例では、ローレンツ常微分方程式系のパラメーター σ および ρ に対してパラメーター スイープを実行し、この系のカオス的性質を説明します。

ddtx=σ(y-z)ddty=x(ρ-z)-yddtz=xy-βx

パラメーター グリッドの作成

パラメーター スイープで調べるパラメーターの範囲を定義します。

gridSize = 40;
sigma = linspace(5, 45, gridSize);
rho = linspace(50, 100, gridSize);
beta = 8/3;

関数 meshgrid を使用して、パラメーターの 2 次元グリッドを作成します。

[rho,sigma] = meshgrid(rho,sigma);

figure オブジェクトを作成し、'Visible'true に設定すると、このオブジェクトがライブ スクリプトの外側の新しいウィンドウで開きます。パラメーター スイープの結果を可視化するには、表面プロットを作成します。表面の Z 要素を NaN で初期化すると、空のプロットが作成されることに注意してください。

figure('Visible',true);
surface = surf(rho,sigma,NaN(size(sigma)));
xlabel('\rho','Interpreter','Tex')
ylabel('\sigma','Interpreter','Tex')

並列環境の設定

関数 parpool を使用して並列ワーカーのプールを作成します。

parpool;
Starting parallel pool (parpool) using the 'local' profile ...
Connected to the parallel pool (number of workers: 6).

ワーカーからデータを送信するには、DataQueue オブジェクトを作成します。関数 afterEach を使用して、ワーカーがデータを送信するたびに表面プロットを更新する関数を設定します。関数 updatePlot は、この例の最後で定義するサポート関数です。

Q = parallel.pool.DataQueue;
afterEach(Q,@(data) updatePlot(surface,data));

並列パラメーター スイープの実行

パラメーターを定義した後、並列パラメーター スイープを実行できます。

作業負荷を分散すると、parfeval の効率性が向上します。作業負荷を分散するには、調べるパラメーターをグループ化して分割します。この例では、コロン演算子 (:) を使用して、サイズを step として均等に分割します。この結果得られる配列 partitions には、分割の境界が含まれます。最後の分割の終点を追加しなければならないことに注意してください。

step = 100;
partitions = [1:step:numel(sigma), numel(sigma)+1]
partitions = 1×17

           1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601

最良のパフォーマンスを得るには、次のように分割するようにします。

  • 分割のスケジューリングのオーバーヘッドよりも計算時間が長くなる程度に大きい

  • すべてのワーカーをビジー状態に維持するために十分な分割数が存在する程度に小さい

並列ワーカーでの関数の実行を表し、その結果を保持するには、future オブジェクトを使用します。

f(1:numel(partitions)-1) = parallel.FevalFuture;

関数 parfeval を使用して、計算を並列ワーカーにオフロードします。parameterSweep はこのスクリプトの最後で定義されている補助関数で、調べるパラメーターの分割についてのローレンツ系を解きます。これには出力引数が 1 つあるため、parfeval では出力の数として 1 を指定しなければなりません。

for ii = 1:numel(partitions)-1
    f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
end

parfeval は MATLAB をブロックしないため、計算の実行中に作業を続行できます。ワーカーは並列で計算を実行し、中間結果を使用できるようになったら DataQueue によって送信します。

parfeval が完了するまで MATLAB をブロックする場合は、future オブジェクトに対して関数 wait を使用します。この後のコードが parfeval の完了に依存する場合、関数 wait を使用すると便利です。

wait(f);

parfeval が計算を完了すると、wait は終了し、さらにコードを実行できるようになります。たとえば、結果として生成される表面の輪郭をプロットします。関数 fetchOutputs を使用して、future オブジェクトに保存された結果を取得します。

results = reshape(fetchOutputs(f),gridSize,[]);
contourf(rho,sigma,results)
xlabel('\rho','Interpreter','Tex')
ylabel('\sigma','Interpreter','Tex')

パラメーター スイープで計算リソースが多く必要で、クラスターにアクセス可能な場合は、parfeval の計算をスケール アップできます。詳細については、デスクトップからクラスターへのスケール アップを参照してください。

補助関数の定義

調べるパラメーターの分割のローレンツ系を解く補助関数を定義します。DataQueue オブジェクトで関数 send を使用して、中間結果を MATLAB クライアントに送信します。

function results = parameterSweep(first,last,sigma,rho,beta,Q)
    results = zeros(last-first,1);
    for ii = first:last-1
        lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
        [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
        result = a(end,3);
        send(Q,[ii,result]);
        results(ii-first+1) = result;
    end
end

新しいデータを受け取った時点で表面プロットを更新する別の補助関数を定義します。

function updatePlot(surface,data)
    surface.ZData(data(1)) = data(2);
    drawnow('limitrate');
end

parfeval を使用してワーカーで非同期の計算を実行し、ユーザー インターフェイスを応答可能な状態に保つことができます。中間計算が使用可能になったら、afterEach を使用してユーザー インターフェイスを更新します。すべての計算が使用可能になったら、afterAll を使用してユーザー インターフェイスを更新します。

waitbar を使用して単純なユーザー インターフェイスを作成します。

h = waitbar(0, 'Waiting...');

parfeval を使用して、乱数行列の固有値などの時間のかかる計算をワーカーで実行します。計算は非同期で実行され、計算中にユーザー インターフェイスが更新されます。parfevalparpool が作成されていなければ、自動的に既定の基本設定で作成します。

for idx = 1:100
    f(idx) = parfeval(@(n) real(eig(randn(n))), 1, 5e2); 
end

各計算が使用可能になったら、afterEach を使用してその最大値を計算します。各 Future が完了するたびに、afterEach を使用して、ウェイトバーに示される終了した Future の比率を更新します。

maxFuture = afterEach(f, @max, 1);
updateWaitbarFuture = afterEach(f, @(~) waitbar(sum(strcmp('finished', {f.State}))/numel(f), h), 1);

すべての計算が完了したら、ウェイトバーを閉じます。閉じる操作で自動的に続行するには、afterAllupdateWaitbarFuture に対して使用します。afterAll は Figure のハンドルを updateWaitbarFuture から取得し、その関数をハンドルに対して実行します。

closeWaitbarFuture = afterAll(updateWaitbarFuture, @(h) delete(h), 0);

すべての最大値を計算した後、ヒストグラムを表示します。操作を自動的に続行するには、afterAllmaxFuture に対して使用します。afterAllmaxFuture から最大値を取得し、それに対して histogram を呼び出します。

showsHistogramFuture = afterAll(maxFuture, @histogram, 0);

入力引数

すべて折りたたむ

ワーカーの並列プール。parallel.Pool オブジェクトとして指定します。関数 parpool を使用して並列プールを作成できます。

データ型: parallel.Pool

ワーカーで実行する関数。関数ハンドルとして指定します。

例: fcn = @sum

データ型: function_handle

fcn から返される出力引数の数。

データ型: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

fcn に渡す関数の引数。変数または式のコンマ区切りリストとして指定します。

出力引数

すべて折りたたむ

Future オブジェクト。parallel.FevalFuture として返されます。これは並列ワーカーでの fcn の実行を表し、その結果が格納されます。結果を収集するには、fetchOutputs または fetchNext を使用します。

R2013b で導入