Main Content

データ ストアの並列分割

並列プールのワーカーごとにデータ ストアの部分を並列に分割すると、次のように多くのケースで利点が得られます。

  • 何らかのアクションを、データ ストア全体の一部に対してのみ実行するかまたは定義済みのいくつかの部分に対して同時に実行

  • それぞれのパーティションですべてのワーカーを同時に稼働させながら、データ ストア内の特定の値を検索

  • すべてのパーティションのワーカーのリダクション計算を実行

データストアからのデータの並列読み取り

この例では、関数partitionを使用してデータストアからのデータの読み取りを並列処理する方法を説明します。MATLAB® に提供されている航空路線の小規模なデータストアを使用し、'ArrDelay' 列から NaN 以外の値の平均を求めます。

逐次実行

平均を計算する簡単な方法は、NaN 以外のすべての値の合計を NaN 以外の値の数で除算することです。補助関数 sumAndCountArrivalDelay 内のコードは、データストアに対してこれをまず、並列以外の方法で実行します。

まず、既存の並列プールをすべて削除します。

delete(gcp('nocreate'));

airlinesmall_subset.xlsx にあるワークシートのコレクションからデータストアを作成し、インポートする ArrDelay 変数を選択します。

関数 sumAndCountArrivalDelay を使用して、並列実行せずに平均を計算します。関数 tic および toc を使用して、この場合と、後の並列の場合での実行時間を測定します。

ds = spreadsheetDatastore(repmat({'airlinesmall_subset.xlsx'},20,1));
ds.SelectedVariableNames = 'ArrDelay';
reset(ds);
tic
  [total,count] = sumAndCountArrivalDelay(ds)
total = 3098060
count = 394940
sumtime = toc
sumtime = 36.6618
mean = total/count
mean = 7.8444

並列実行

関数 partition を使用すると、データストアを、それ自体がそれぞれデータストアとして表現される、より小さな部分に分割できます。これらの小さなデータストアは、互いに完全に独立して動作するため、parfor ループや spmd ブロックなどの並列言語機能内で使用できます。

自動分割の使用

関数numpartitionsを使用して、データストア自体と並列プールのサイズに基づき分割数を指定できます。これは、必ずしもプール内のワーカー数と等しくなくてもかまいません。ループ反復の回数をパーティションの数 (N) に設定します。

以下のコードを入力すると、ローカル クラスター上の並列プールが起動され、次にループ反復のためにワーカー間にデータ ストアが分割されます。このコードは、並列ループ反復のカウントと合計を収集する parfor ループを含む補助関数 parforSumAndCountArrivalDelay を呼び出します。

p = parpool('Processes',4);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 4 workers.
reset(ds);
tic
  [total,count] = parforSumAndCountArrivalDelay(ds)
total = 3098060
count = 394940
parfortime = toc
parfortime = 11.6383
mean = total/count
mean = 7.8444

分割数の指定

ソフトウェアでパーティション数を計算する代わりに、アルゴリズムに適合させた適切なデータ分割が可能になるように明示的にこの値を設定できます。たとえば、spmd ブロック内からのデータを並列処理する場合、使用するパーティション数にワーカー数 (spmdSize) を指定できます。補助関数 spmdSumAndCountArrivalDelay は、spmd ブロックを使用して並列読み取りを実行し、ワーカー数と等しいパーティション数を明示的に設定します。

reset(ds);
tic
[total,count] = spmdSumAndCountArrivalDelay(ds)
total = 3098060
count = 394940
spmdtime = toc
spmdtime = 11.7520
mean = total/count
mean = 7.8444

計算が完了したら、現在の並列プールを削除できます。

delete(p);

補助関数

並列以外の方法でカウントと合計を収集する補助関数を作成します。

function [total,count] = sumAndCountArrivalDelay(ds)
    total = 0;
    count = 0;
    while hasdata(ds)
        data = read(ds);
        total = total + sum(data.ArrDelay,1,'OmitNaN');
        count = count + sum(~isnan(data.ArrDelay));
    end
end

parfor を使用して、並列でカウントと合計を収集する補助関数を作成します。

function [total, count] = parforSumAndCountArrivalDelay(ds)
    N = numpartitions(ds,gcp);
    total = 0;
    count = 0;    
    parfor ii = 1:N
        % Get partition ii of the datastore.
        subds = partition(ds,N,ii);
    
        [localTotal,localCount] = sumAndCountArrivalDelay(subds);
        total = total + localTotal;
        count = count + localCount;
    end
end

spmd を使用して、並列でカウントと合計を収集する補助関数を作成します。

function [total,count] = spmdSumAndCountArrivalDelay(ds)
    spmd
        subds = partition(ds,spmdSize,spmdIndex);
        [total,count] = sumAndCountArrivalDelay(subds);    
    end
    total = sum([total{:}]);
    count = sum([count{:}]);
end

参考

|

関連する例

詳細