Main Content

MapReduce を使用する簡単なデータのサブセット化

次の例では、大規模なデータセットのサブセットを抽出する方法を示します。

サブセット化またはクエリの実行には 2 つの面があります。1 つは、データセット内の変数 (列) のサブセットを選択することです。もう 1 つは、観測値、すなわち行のサブセットを選択することです。

次の例では、変数の選択がデータストアの定義時に起こります (map 関数はこれ以上の変数の選択は実行できませんが、この例の範囲内ではありません)。この例では、map 関数の役割は観測値の選択を実行することです。reduce 関数の役割は、map 関数への呼び出しごとに抽出された、サブセット化されたレコードを連結することです。このアプローチでは、Map フェーズ後にデータセットがメモリに収まることを前提としています。

データの準備

airlinesmall.csv データセットを使用してデータストアを作成します。この 12 MB のデータセットには、到着時間と出発時間を含む、いくつかの航空会社のフライト情報が 29 列に含まれます。この例ではデータ内の 29 個の変数のうち、15 変数を使用します。

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = ds.VariableNames([1 2 5 9 12 13 15 16 17 ...
    18 20 21 25 26 27]);
ds.SelectedVariableNames
ans = 1x15 cell
    {'Year'}    {'Month'}    {'DepTime'}    {'UniqueCarrier'}    {'ActualElapsedTime'}    {'CRSElapsedTime'}    {'ArrDelay'}    {'DepDelay'}    {'Origin'}    {'Dest'}    {'TaxiIn'}    {'TaxiOut'}    {'CarrierDelay'}    {'WeatherDelay'}    {'NASDelay'}

データストアは、既定では 'NA' 値を欠損として扱い、欠損値を NaN 値に置換します。さらに、SelectedVariableNames プロパティを使用すると、指定された対象の変数のみを処理して、preview を使用して確認できます。

preview(ds)
ans=8×15 table
    Year    Month    DepTime    UniqueCarrier    ActualElapsedTime    CRSElapsedTime    ArrDelay    DepDelay    Origin      Dest      TaxiIn    TaxiOut    CarrierDelay    WeatherDelay    NASDelay
    ____    _____    _______    _____________    _________________    ______________    ________    ________    _______    _______    ______    _______    ____________    ____________    ________

    1987     10        642         {'PS'}                53                 57              8          12       {'LAX'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1021         {'PS'}                63                 56              8           1       {'SJC'}    {'BUR'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       2055         {'PS'}                83                 82             21          20       {'SAN'}    {'SMF'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1332         {'PS'}                59                 58             13          12       {'BUR'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        629         {'PS'}                77                 72              4          -1       {'SMF'}    {'LAX'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1446         {'PS'}                61                 65             59          63       {'LAX'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        928         {'PS'}                84                 79              3          -2       {'SAN'}    {'SFO'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        859         {'PS'}               155                143             11          -1       {'SEA'}    {'LAX'}     NaN        NaN          NaN             NaN           NaN   

mapreduce の実行

関数 mapreduce は、入力として map 関数と reduce 関数を必要とします。マッパーはデータのブロックを受け取って中間結果を出力します。リデューサーは中間結果を読み取って最終結果を生成します。

次の例では、マッパーはデータストア内の SelectedVariableNames プロパティで記述される変数をもつ table を受け取ります。次に、ゲートからの離陸時間後の遅延時間が大きかったフライトを抽出します。具体的には、予定時間の 2.5 倍を超える時間のフライトを特定します。この例で対象とする変数の一部が、1995 年より前の年には収集されていなかったため、マッパーではその年より前のデータを無視します。

map 関数のファイルを表示します。

function subsettingMapper(data, ~, intermKVStore)
  % Select flights from 1995 and later that had exceptionally long
  % elapsed flight times (including both time on the tarmac and time in 
  % the air).
  idx = data.Year > 1994 & (data.ActualElapsedTime - data.CRSElapsedTime)...
    > 1.50 * data.CRSElapsedTime;
  intermVal = data(idx,:);

  add(intermKVStore,'Null',intermVal);
end

リデューサーは、マッパーからのサブセット化された観測値を受け取り、単一のテーブルに連結するだけです。リデューサーは、1 つのキー (あまり意味はありません) と 1 つの値 (連結されたテーブル) を返します。

reduce 関数のファイルを表示します。

function subsettingReducer(~, intermValList, outKVStore)
  % get all intermediate results from the list
  outVal = {};

  while hasnext(intermValList)
    outVal = [outVal; getnext(intermValList)];
  end
  % Note that this approach assumes the concatenated intermediate values (the
  % subset of the whole data) fit in memory.
    
  add(outKVStore, 'Null', outVal);
end

mapreduce を使用して、map 関数および reduce 関数をデータストア ds に適用します。

result = mapreduce(ds, @subsettingMapper, @subsettingReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

mapreduce は、現在のフォルダー内のファイルで出力データストア result を返します。

結果の表示

データセットから引き出された最初の 10 個の変数のパターンを調べます。これらの変数は、基本的な遅延時間情報のほかに、航空会社、目的地および到着空港を含みます。

r = readall(result);
tbl = r.Value{1};
tbl(:,1:10)
ans=37×10 table
    Year    Month    DepTime    UniqueCarrier    ActualElapsedTime    CRSElapsedTime    ArrDelay    DepDelay    Origin      Dest  
    ____    _____    _______    _____________    _________________    ______________    ________    ________    _______    _______

    1995      6       1601         {'US'}               162                 58            118          14       {'BWI'}    {'PIT'}
    1996      6       1834         {'CO'}               241                 75            220          54       {'IAD'}    {'EWR'}
    1997      1        730         {'DL'}               110                 43            137          70       {'ATL'}    {'GSP'}
    1997      4       1715         {'UA'}               152                 57            243         148       {'IND'}    {'ORD'}
    1997      9       2232         {'NW'}               143                 50            115          22       {'DTW'}    {'CMH'}
    1997     10       1419         {'CO'}               196                 58            157          19       {'DFW'}    {'IAH'}
    1998      3       2156         {'DL'}               139                 49            146          56       {'TYS'}    {'ATL'}
    1998     10       1803         {'NW'}               291                 81            213           3       {'MSP'}    {'ORD'}
    2000      5        830         {'WN'}               140                 55             85           0       {'DAL'}    {'HOU'}
    2000      8       1630         {'CO'}               357                123            244          10       {'EWR'}    {'CLT'}
    2002      6       1759         {'US'}               260                 67            192          -1       {'LGA'}    {'BOS'}
    2003      3       1214         {'XE'}               214                 84            124          -6       {'GPT'}    {'IAH'}
    2003      3        604         {'XE'}               175                 60            114          -1       {'LFT'}    {'IAH'}
    2003      4       1556         {'MQ'}               142                 52            182          92       {'PIA'}    {'ORD'}
    2003      5       1954         {'US'}               127                 48             78          -1       {'RDU'}    {'CLT'}
    2003      7       1250         {'FL'}               261                 95            166           0       {'ATL'}    {'IAD'}
      ⋮

最初のレコードを確認して、予定出発時刻より 14 分遅れてゲートから出発し、118 分遅れで到着した U.S. Air のフライトを調べます。このフライトは、ゲートからの離陸時間後の遅延時間として、ActualElapsedTimeCRSElapsedTime の差の 104 分の遅延を生じました。

これは異常なレコードです。2006 年 2 月には、JetBlue のフライトは出発時刻は 3:24 a.m. で飛行経過時間は 1,650 分でしたが、到着遅延時間はわずか 415 分でした。これはデータ入力エラーの可能性があります。

この他には、遅延時間が極端に大きいフライトがいつどこで生じるかについての明確なパターンはありません。目立った航空会社、時期、時刻および空港はありません。冬季のオヘア空港など、直観的なパターンは確かに存在します。

遅延のパターン

1995 年から、航空会社のシステム パフォーマンス データにはフライトの地上走行フェーズでどれだけ遅延が生じたかの測定値が含まれるようになりました。また 2003 年には、一部の遅延理由も含まれるようになりました。

これら 2 つの変数を詳細に調べます。

tbl(:,[1,7,8,11:end])
ans=37×8 table
    Year    ArrDelay    DepDelay    TaxiIn    TaxiOut    CarrierDelay    WeatherDelay    NASDelay
    ____    ________    ________    ______    _______    ____________    ____________    ________

    1995      118          14          7        101          NaN             NaN           NaN   
    1996      220          54         12        180          NaN             NaN           NaN   
    1997      137          70          2         12          NaN             NaN           NaN   
    1997      243         148          4         38          NaN             NaN           NaN   
    1997      115          22          4         98          NaN             NaN           NaN   
    1997      157          19          6         95          NaN             NaN           NaN   
    1998      146          56          9         47          NaN             NaN           NaN   
    1998      213           3         11        205          NaN             NaN           NaN   
    2000       85           0          5         51          NaN             NaN           NaN   
    2000      244          10          4        273          NaN             NaN           NaN   
    2002      192          -1          6        217          NaN             NaN           NaN   
    2003      124          -6         13        131          NaN             NaN           NaN   
    2003      114          -1          8        106          NaN             NaN           NaN   
    2003      182          92          9        106          NaN             NaN           NaN   
    2003       78          -1          5         90          NaN             NaN           NaN   
    2003      166           0         11        170            0               0           166   
      ⋮

遅延時間が極端に大きいフライトでは、遅延の大部分は誘導滑走での走行中に生じています。さらに、遅延の主な原因は NASDelay です。NAS 遅延は、目的地の空港が、フライトの予定到着時刻に到着する予定のすべてオプションの便を処理できないと予想される場合に、国家航空当局がホールドを命じるものです。任意の特定時刻に実施されている NAS 遅延プログラムは、https://nasstatus.faa.gov/ に投稿されます。

NAS 遅延の実施中は、飛行機への搭乗もできれば遅らせます。このような遅延は出発の遅延として現れます。しかし、この例で選択したフライトのほとんどでは、遅延は主にゲートからの出発後に生じ、地上走行の遅延となります。

MapReduce の再実行

前の map 関数の例では、サブセット化の基準は関数ファイルに組み込まれていました。所定の日にサンフランシスコを出発するフライトなど、新しいクエリに対しては、新しい map 関数を作成する必要があります。

汎用的なマッパーは、サブセット化の基準を map 関数の定義から独立させ、各クエリに対するマッパーを構成する無名関数を使用することにより、より順応性が高くなります。この汎用マッパーでは、目的のクエリ変数を提供する 4 番目の入力引数を使用します。

汎用の map 関数ファイルを表示します。

function subsettingMapperGeneric(data, ~, intermKVStore, subsetter)
  intermKey = 'Null';
  intermVal = data(subsetter(data), :);
  add(intermKVStore,intermKey,intermVal);
end

subsettingMapper でハードコード化されているものと同じ行の選択を実行する、無名関数を作成します。

inFlightDelay150percent = ...
   @(data) data.Year > 1994 & ...
   (data.ActualElapsedTime-data.CRSElapsedTime) > 1.50*data.CRSElapsedTime;

関数 mapreduce では、map 関数と reduce 関数が厳密に 3 つの入力を受け入れる必要があるため、別の無名関数を使用してマッパー subsettingMapperGeneric に 4 番目の入力を指定します。そうすれば、この無名関数を使用し、3 つの引数のみを使って subsettingMapperGeneric を呼び出すことができます (4 番目は暗黙的)。

configuredMapper = ...
    @(data, info, intermKVStore) subsettingMapperGeneric(data, info, ...
    intermKVStore, inFlightDelay150percent);

mapreduce を使用して、汎用的な map 関数を入力データストアに適用します。

result2 = mapreduce(ds, configuredMapper, @subsettingReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

mapreduce は、現在のフォルダー内のファイルで出力データストア result2 を返します。

結果の確認

汎用的なマッパーが、組み込まれたサブセット化のロジックと同じ結果が得られることを確認します。

r2 = readall(result2);
tbl2 = r2.Value{1};

if isequaln(tbl, tbl2)
    disp('Same results with the configurable mapper.')
else
    disp('Oops, back to the drawing board.')
end
Same results with the configurable mapper.

参考

|

関連するトピック