Main Content

このページの内容は最新ではありません。最新版の英語を参照するには、ここをクリックします。

parallel.pool.DataQueue

クライアントとワーカーの間でのデータの送信とリスニング

    説明

    DataQueue オブジェクトにより、計算の実行中に並列プール内のワーカーとクライアント間で送信されるデータまたはメッセージを非同期で自動処理できます。たとえば、中間値をクライアントに送信して、計算の進行状況を自動計算できます。

    並列プールのワーカーからクライアントにデータを送り返すには、まずクライアントに DataQueue オブジェクトを作成します。この DataQueue を、parfor ループ、または spmd などの他の並列言語構成に渡します。ワーカーから send を呼び出して、データをクライアントに送り返します。クライアントで、受信したデータを自動処理する関数を afterEach を使用して指定します。

    • 必要に応じて、DataQueue を作成したワーカーまたはクライアントから send を呼び出すことができます。

    • ワーカー上にキューを作成し、そのキューをクライアントに送り返すことにより、逆方向の通信を有効にできます。

      R2023b より前: ワーカーから別のワーカーにキューを送信することはできません。ワーカー間でデータを転送するには、代わりに spmdspmdSend または spmdReceive を使用してください。

    • その他すべてのハンドル オブジェクトとは異なり、DataQueue インスタンスと PollableDataQueue インスタンスはワーカーに送信されても接続されたままになります。

    作成

    説明

    q = parallel.pool.DataQueue は、クライアントとワーカー間でのメッセージ (またはデータ) の送信またはリスニングに使用できるオブジェクトを作成します。データを受信するワーカーまたはクライアント上で、DataQueue を作成します。

    プロパティ

    すべて展開する

    このプロパティは読み取り専用です。

    キューからの削除待ちのデータ アイテム数。0 または正の整数として指定します。値は 0 か、または DataQueue インスタンスを作成したワーカーまたはクライアント上では正の整数です。クライアントが DataQueue インスタンスを作成する場合、値はすべてのワーカー上で 0 です。あるワーカーが DataQueue を作成する場合、値はクライアント上およびその他すべてのワーカー上で 0 です。

    オブジェクト関数

    afterEachDataQueue で新規データを受信したときに呼び出す関数の定義
    sendデータ キューを使用したクライアントからワーカーへのデータの送信

    すべて折りたたむ

    DataQueue を作成し、afterEach を呼び出します。

    q = parallel.pool.DataQueue;
    afterEach(q, @disp);
    
    parfor ループを開始し、メッセージを送信します。保留中のメッセージ (この例では @disp) が関数 afterEach に渡されます。

    parfor i = 1:3
        send(q, i); 
    end;
         1
    
         2
    
         3

    DataQueue によるデータのリスニングの詳細については、afterEach を参照してください。

    メッセージを DataQueue オブジェクトに送信すると、メッセージはリスナーによって処理されるまでキューで待機します。メッセージごとに、キューの長さに 1 が加算されます。この例では、QueueLength プロパティを使用して DataQueue オブジェクトの長さを求めます。

    いずれかのクライアントまたはワーカーが DataQueue オブジェクトを作成すると、キューに送信されたすべてのメッセージはそのクライアントまたはワーカーのメモリに保持されます。クライアントが DataQueue オブジェクトを作成する場合、すべてのワーカー上の QueueLength プロパティは 0 です。この例ではクライアント上に DataQueue オブジェクトを作成し、ワーカーからデータを送信します。

    まず、1 つのワーカーをもつ並列プールを作成します。

    parpool(1);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 1 workers.
    

    次に、DataQueue を作成します。

    q = parallel.pool.DataQueue
    q = 
      DataQueue with properties:
    
        QueueLength: 0
    
    

    新規に作成された DataQueue には空のキューがあります。parfor を使用して、ワーカー上の q.QueueLength を求めることができます。クライアント上のキューの長さ、およびワーカー上のキューの長さを求めます。

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    キューは空であるため、クライアントとワーカーの両方で QueueLength0 です。次に、ワーカーからキューにメッセージを送信します。続いて、QueueLength プロパティを使用してキューの長さを求めます。

    % Send a message first
    parfor i = 1
        send(q, 'A message');
    end
    
    % Find the length
    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 1
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    QueueLength プロパティはクライアントで 1 であり、ワーカーで 0 です。データをただちに表示してキューを処理するリスナーを作成します。

    el = afterEach(q, @disp);

    キューが空になるまで待ってから、リスナーを削除します。

    while q.QueueLength > 0
        pause(0.1);
    end
    delete(el);

    QueueLength プロパティを使用して、キューの長さを求めます。

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    

    キューの処理が完了しているため、QueueLength0 です。

    この例では、DataQueue を使用して、parfor ループの進行状況に合わせてウェイト バーを更新します。

    parfor ループを作成するときに、各反復を並列プール内のワーカーにオフロードします。ワーカーから情報が返されるのは、parfor ループが完了したときのみです。DataQueue を使用すると、各反復の最後にウェイト バーを更新できます。

    parfor ループの進行状況に合わせてウェイト バーを更新する場合、クライアントは残りの反復回数に関する情報を記録しなければなりません。

    ヒント

    新しい並列コードを作成しているときにコードの進行状況を監視する場合は、parfeval ワークフローの使用を検討してください。詳細については、afterEach と afterAll を使用したユーザー インターフェイスの非同期更新を参照してください。

    この例の最後に定義されている補助関数 parforWaitbar によって、ウェイト バーは更新されます。この関数は、persistent を使用して、残りの反復回数に関する情報を保存します。

    waitbar を使用して、ウェイト バー w を作成します。

    w = waitbar(0,'Please wait ...');

    DataQueueD を作成します。次に、afterEach を使用して、メッセージが DataQueue に送信された後に parforWaitbar を実行します。

    % Create DataQueue and listener
    D = parallel.pool.DataQueue;
    afterEach(D,@parforWaitbar);

    parfor ループの反復回数 N を設定します。ウェイト バー w と反復回数 N を使用して関数 parforWaitbar を初期化します。

    parfor ループの反復が終了するたびに、クライアントは parforWaitbar を実行してウェイト バーを増分更新します。

    N = 100;
    parforWaitbar(w,N)
    

    関数 parforWaitbar は永続変数を使用して、完了した反復の数をクライアント上で保存します。ワーカーからの情報は必要ありません。

    parfor ループを反復回数 N で実行します。この例では、pauserand を使用していくつかの作業をシミュレートします。反復が終了するたびに、send を使用して DataQueue にメッセージを送信します。メッセージが DataQueue に送信されると、ウェイト バーは更新されます。ワーカーからの情報は必要ないため、不要なデータ転送を避けるために空のメッセージを送信します。

    parfor ループが完了したら、delete を使用してウェイト バーを閉じます。

    parfor i = 1:N
        pause(rand)
        send(D,[]);
    end
    
    delete(w);
    

    補助関数 parforWaitbar を定義します。入力引数を 2 つ指定して関数 parforWaitbar を実行すると、3 つの永続変数 (counth および N) が初期化されます。入力引数を 1 つ指定して parforWaitbar を実行すると、ウェイト バーが更新されます。

    function parforWaitbar(waitbarHandle,iterations)
        persistent count h N
        
        if nargin == 2
            % Initialize
            
            count = 0;
            h = waitbarHandle;
            N = iterations;
        else
            % Update the waitbar
            
            % Check whether the handle is a reference to a deleted object
            if isvalid(h)
                count = count + 1;
                waitbar(count / N,h);
            end
        end
    end

    Status bar indicating roughly one third completion.

    この例では、並列パラメーター スイープを parfeval により実行し、その結果を計算中に DataQueue オブジェクトによって戻す方法を示します。

    parfeval は MATLAB をブロックしないため、計算の実行中に作業を続行できます。

    この例では、ローレンツ常微分方程式系のパラメーター σ および ρ に対してパラメーター スイープを実行し、この系のカオス的性質を説明します。

    ddtx=σ(y-z)ddty=x(ρ-z)-yddtz=xy-βx

    パラメーター グリッドの作成

    パラメーター スイープで調べるパラメーターの範囲を定義します。

    gridSize = 40;
    sigma = linspace(5, 45, gridSize);
    rho = linspace(50, 100, gridSize);
    beta = 8/3;

    関数 meshgrid を使用して、パラメーターの 2 次元グリッドを作成します。

    [rho,sigma] = meshgrid(rho,sigma);

    figure オブジェクトを作成し、'Visible'true に設定すると、このオブジェクトがライブ スクリプトの外側の新しいウィンドウで開きます。パラメーター スイープの結果を可視化するには、表面プロットを作成します。表面の Z 要素を NaN で初期化すると、空のプロットが作成されることに注意してください。

    figure('Visible',true);
    surface = surf(rho,sigma,NaN(size(sigma)));
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    並列環境の設定

    関数 parpool を使用して並列ワーカーのプールを作成します。

    parpool;
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to the parallel pool (number of workers: 6).
    

    ワーカーからデータを送信するには、DataQueue オブジェクトを作成します。関数 afterEach を使用して、ワーカーがデータを送信するたびに表面プロットを更新する関数を設定します。関数 updatePlot は、この例の最後で定義するサポート関数です。

    Q = parallel.pool.DataQueue;
    afterEach(Q,@(data) updatePlot(surface,data));

    並列パラメーター スイープの実行

    パラメーターを定義した後、並列パラメーター スイープを実行できます。

    作業負荷を分散すると、parfeval の効率性が向上します。作業負荷を分散するには、調べるパラメーターをグループ化して分割します。この例では、コロン演算子 (:) を使用して、サイズを step として均等に分割します。この結果得られる配列 partitions には、分割の境界が含まれます。最後の分割の終点を追加しなければならないことに注意してください。

    step = 100;
    partitions = [1:step:numel(sigma), numel(sigma)+1]
    partitions = 1×17
    
               1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601
    
    

    最良のパフォーマンスを得るには、次のように分割するようにします。

    • 分割のスケジューリングのオーバーヘッドよりも計算時間が長くなる程度に大きい

    • すべてのワーカーをビジー状態に維持するために十分な分割数が存在する程度に小さい

    並列ワーカーでの関数の実行を表し、その結果を保持するには、future オブジェクトを使用します。

    f(1:numel(partitions)-1) = parallel.FevalFuture;

    関数 parfeval を使用して、計算を並列ワーカーにオフロードします。parameterSweep はこのスクリプトの最後で定義されている補助関数で、調べるパラメーターの分割についてのローレンツ系を解きます。これには出力引数が 1 つあるため、parfeval では出力の数として 1 を指定しなければなりません。

    for ii = 1:numel(partitions)-1
        f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
    end

    parfeval は MATLAB をブロックしないため、計算の実行中に作業を続行できます。ワーカーは並列で計算を実行し、中間結果を使用できるようになったら DataQueue によって送信します。

    parfeval が完了するまで MATLAB をブロックする場合は、future オブジェクトに対して関数 wait を使用します。この後のコードが parfeval の完了に依存する場合、関数 wait を使用すると便利です。

    wait(f);

    parfeval が計算を完了すると、wait は終了し、さらにコードを実行できるようになります。たとえば、結果として生成される表面の輪郭をプロットします。関数 fetchOutputs を使用して、future オブジェクトに保存された結果を取得します。

    results = reshape(fetchOutputs(f),gridSize,[]);
    contourf(rho,sigma,results)
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    パラメーター スイープで計算リソースが多く必要で、クラスターにアクセス可能な場合は、parfeval の計算をスケール アップできます。詳細については、デスクトップからクラスターへのスケール アップを参照してください。

    補助関数の定義

    調べるパラメーターの分割のローレンツ系を解く補助関数を定義します。DataQueue オブジェクトで関数 send を使用して、中間結果を MATLAB クライアントに送信します。

    function results = parameterSweep(first,last,sigma,rho,beta,Q)
        results = zeros(last-first,1);
        for ii = first:last-1
            lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
            [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
            result = a(end,3);
            send(Q,[ii,result]);
            results(ii-first+1) = result;
        end
    end

    新しいデータを受け取った時点で表面プロットを更新する別の補助関数を定義します。

    function updatePlot(surface,data)
        surface.ZData(data(1)) = data(2);
        drawnow('limitrate');
    end

    ヒント

    • DataQueue オブジェクトを使用して送信されたデータまたはメッセージは、自動処理のみができます。クライアントまたはワーカー上での受信後にデータを手動で取得するには、代わりに parallel.pool.PollableDataQueue オブジェクトを使用してデータを送信します。

    バージョン履歴

    R2017a で導入