最新のリリースでは、このページがまだ翻訳されていません。 このページの最新版は英語でご覧になれます。

MapReduce を使用するグループごとの平均の計算

この例では、mapreduce を使用してデータセット内のグループごとの平均を計算する方法を示します。データのサブグループに対する計算の実行方法を示しています。

データの準備

airlinesmall.csv データセットを使用してデータ ストアを作成します。この 12 MB のデータセットには、到着時間と出発時間を含む、いくつかの航空会社のフライト情報が 29 列に含まれます。この例では、DayOfWeek および ArrDelay (フライト到着遅延時間) を目的の変数として選択します。

ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = {'ArrDelay', 'DayOfWeek'};

データ ストアは、既定では 'NA' 値を欠損として扱い、欠損値を NaN 値に置換します。さらに、SelectedVariableNames プロパティにより、選択した目的の変数のみを処理することができ、preview を使用して検査できます。

preview(ds)
ans=8×2 table
    ArrDelay    DayOfWeek
    ________    _________

        8           3    
        8           1    
       21           5    
       13           5    
        4           4    
       59           3    
        3           4    
       11           6    

mapreduce の実行

関数 mapreduce は、入力として map 関数と reduce 関数を必要とします。マッパーはデータのブロックを受け取って中間結果を出力します。リデューサーは中間結果を読み取って最終結果を生成します。

次の例で、マッパーはデータの各ブロックで遅延時間のカウントと合計を曜日ごとに計算し、次に、結果を中間的なキーと値のペアとして保存します。キーは曜日を表す整数 (1 から 7) で、値は各曜日の遅延時間のカウントと合計を表す 2 要素のベクトルです。

map 関数のファイルを表示します。

function meanArrivalDelayByDayMapper(data, ~, intermKVStore)
  % Data is an n-by-2 table: first column is the DayOfWeek and the second
  % is the ArrDelay. Remove missing values first.
  delays = data.ArrDelay;
  day = data.DayOfWeek;
  notNaN = ~isnan(delays);
  day = day(notNaN);
  delays = delays(notNaN);

  % find the unique days in this chunk
  [intermKeys,~,idx] = unique(day, 'stable');

  % group delays by idx and apply @grpstatsfun function to each group
  intermVals = accumarray(idx,delays,size(intermKeys),@countsum);
  addmulti(intermKVStore,intermKeys,intermVals);

  function out = countsum(x)
    n = length(x); % count
    s = sum(x); % mean
    out = {[n, s]};
  end
end

Map フェーズの後、mapreduce は中間のキーと値のペアを一意なキー (この場合、曜日) ごとにグループ化します。したがって、リデューサーへの各呼び出しは、ある曜日に関連付けられた値を処理します。リデューサーは、入力キー (intermKey) で指定された曜日の遅延時間の中間的なカウントと合計のリストを受け取り、総カウント n と総合計 s に値を合計します。次に、リデューサーは全体的な平均を計算し、最終的なキーと値のペアを 1 つ出力に追加します。このキーと値のペアは、ある曜日のフライト到着遅延時間の平均を表します。

reduce 関数のファイルを表示します。

function meanArrivalDelayByDayReducer(intermKey, intermValIter, outKVStore)
  n = 0;
  s = 0;

  % get all sets of intermediate results
  while hasnext(intermValIter)
    intermValue = getnext(intermValIter);
    n = n + intermValue(1);
    s = s + intermValue(2);
  end

  % accumulate the sum and count
  mean = s/n;
  % add results to the output datastore
  add(outKVStore,intermKey,mean);
end

mapreduce を使用して、map 関数および reduce 関数をデータ ストア ds に適用します。

meanDelayByDay = mapreduce(ds, @meanArrivalDelayByDayMapper, ...
                               @meanArrivalDelayByDayReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  14%
Map 100% Reduce  29%
Map 100% Reduce  43%
Map 100% Reduce  57%
Map 100% Reduce  71%
Map 100% Reduce  86%
Map 100% Reduce 100%

mapreduce は、現在のフォルダー内のファイルでデータ ストア meanDelayByDay を返します。

出力データ ストア meanDelayByDay から最終結果を読み取ります。

result = readall(meanDelayByDay)
result=7×2 table
    Key      Value   
    ___    __________

     3     {[7.0038]}
     1     {[7.0833]}
     5     {[9.4193]}
     4     {[9.3185]}
     6     {[4.2095]}
     2     {[5.8569]}
     7     {[6.5241]}

結果の整理

整数のキー (1 から 7) は曜日を表します。結果をさらに整理するには、キーを categorical 配列に変換し、単独のセル要素から数値を取得して結果のテーブルの変数の名前を変更します。

result.Key = categorical(result.Key, 1:7, ...
               {'Mon','Tue','Wed','Thu','Fri','Sat','Sun'});
result.Value = cell2mat(result.Value);
result.Properties.VariableNames = {'DayOfWeek', 'MeanArrDelay'}
result=7×2 table
    DayOfWeek    MeanArrDelay
    _________    ____________

       Wed          7.0038   
       Mon          7.0833   
       Fri          9.4193   
       Thu          9.3185   
       Sat          4.2095   
       Tue          5.8569   
       Sun          6.5241   

テーブルの行を、平均フライト到着遅延時間で並べ替えます。これにより、旅行をするなら土曜日が最良で、金曜日が最悪なことがわかります。

result = sortrows(result,'MeanArrDelay')
result=7×2 table
    DayOfWeek    MeanArrDelay
    _________    ____________

       Sat          4.2095   
       Tue          5.8569   
       Sun          6.5241   
       Wed          7.0038   
       Mon          7.0833   
       Thu          9.3185   
       Fri          9.4193   

ローカル関数

ここに挙げるのは、mapreduce がデータに適用する map 関数と reduce 関数です。

function meanArrivalDelayByDayMapper(data, ~, intermKVStore)
  % Data is an n-by-2 table: first column is the DayOfWeek and the second
  % is the ArrDelay. Remove missing values first.
  delays = data.ArrDelay;
  day = data.DayOfWeek;
  notNaN = ~isnan(delays);
  day = day(notNaN);
  delays = delays(notNaN);

  % find the unique days in this chunk
  [intermKeys,~,idx] = unique(day, 'stable');

  % group delays by idx and apply @grpstatsfun function to each group
  intermVals = accumarray(idx,delays,size(intermKeys),@countsum);
  addmulti(intermKVStore,intermKeys,intermVals);

  function out = countsum(x)
    n = length(x); % count
    s = sum(x); % mean
    out = {[n, s]};
  end
end
%---------------------------------------------------------------------------
function meanArrivalDelayByDayReducer(intermKey, intermValIter, outKVStore)
  n = 0;
  s = 0;

  % get all sets of intermediate results
  while hasnext(intermValIter)
    intermValue = getnext(intermValIter);
    n = n + intermValue(1);
    s = s + intermValue(2);
  end

  % accumulate the sum and count
  mean = s/n;
  % add results to the output datastore
  add(outKVStore,intermKey,mean);
end
%---------------------------------------------------------------------------

参考

|

関連するトピック