このページの翻訳は最新ではありません。ここをクリックして、英語の最新版を参照してください。
MapReduce を使用する簡単なデータのサブセット化
次の例では、大規模なデータセットのサブセットを抽出する方法を示します。
サブセット化またはクエリの実行には 2 つの面があります。1 つは、データセット内の変数 (列) のサブセットを選択することです。もう 1 つは、観測値、すなわち行のサブセットを選択することです。
次の例では、変数の選択がデータストアの定義時に起こります (map 関数はこれ以上の変数の選択は実行できませんが、この例の範囲内ではありません)。この例では、map 関数の役割は観測値の選択を実行することです。reduce 関数の役割は、map 関数への呼び出しごとに抽出された、サブセット化されたレコードを連結することです。このアプローチでは、Map フェーズ後にデータセットがメモリに収まることを前提としています。
データの準備
airlinesmall.csv
データセットを使用してデータストアを作成します。この 12 MB のデータセットには、到着時間と出発時間を含む、いくつかの航空会社のフライト情報が 29 列に含まれます。この例ではデータ内の 29 個の変数のうち、15 変数を使用します。
ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedVariableNames = ds.VariableNames([1 2 5 9 12 13 15 16 17 ... 18 20 21 25 26 27]); ds.SelectedVariableNames
ans = 1x15 cell
Columns 1 through 4
{'Year'} {'Month'} {'DepTime'} {'UniqueCarrier'}
Columns 5 through 8
{'ActualElapsedTime'} {'CRSElapsedTime'} {'ArrDelay'} {'DepDelay'}
Columns 9 through 13
{'Origin'} {'Dest'} {'TaxiIn'} {'TaxiOut'} {'CarrierDelay'}
Columns 14 through 15
{'WeatherDelay'} {'NASDelay'}
データストアは、既定では 'NA'
値を欠損として扱い、欠損値を NaN
値に置換します。さらに、SelectedVariableNames
プロパティを使用すると、指定された対象の変数のみを処理して、preview
を使用して確認できます。
preview(ds)
ans=8×15 table
Year Month DepTime UniqueCarrier ActualElapsedTime CRSElapsedTime ArrDelay DepDelay Origin Dest TaxiIn TaxiOut CarrierDelay WeatherDelay NASDelay
____ _____ _______ _____________ _________________ ______________ ________ ________ _______ _______ ______ _______ ____________ ____________ ________
1987 10 642 {'PS'} 53 57 8 12 {'LAX'} {'SJC'} NaN NaN NaN NaN NaN
1987 10 1021 {'PS'} 63 56 8 1 {'SJC'} {'BUR'} NaN NaN NaN NaN NaN
1987 10 2055 {'PS'} 83 82 21 20 {'SAN'} {'SMF'} NaN NaN NaN NaN NaN
1987 10 1332 {'PS'} 59 58 13 12 {'BUR'} {'SJC'} NaN NaN NaN NaN NaN
1987 10 629 {'PS'} 77 72 4 -1 {'SMF'} {'LAX'} NaN NaN NaN NaN NaN
1987 10 1446 {'PS'} 61 65 59 63 {'LAX'} {'SJC'} NaN NaN NaN NaN NaN
1987 10 928 {'PS'} 84 79 3 -2 {'SAN'} {'SFO'} NaN NaN NaN NaN NaN
1987 10 859 {'PS'} 155 143 11 -1 {'SEA'} {'LAX'} NaN NaN NaN NaN NaN
mapreduce の実行
関数 mapreduce
は、入力として map 関数と reduce 関数を必要とします。マッパーはデータのブロックを受け取って中間結果を出力します。リデューサーは中間結果を読み取って最終結果を生成します。
次の例では、マッパーはデータストア内の SelectedVariableNames
プロパティで記述される変数をもつ table を受け取ります。次に、ゲートからの離陸時間後の遅延時間が大きかったフライトを抽出します。具体的には、予定時間の 2.5 倍を超える時間のフライトを特定します。この例で対象とする変数の一部が、1995 年より前の年には収集されていなかったため、マッパーではその年より前のデータを無視します。
map 関数のファイルを表示します。
function subsettingMapper(data, ~, intermKVStore) % Select flights from 1995 and later that had exceptionally long % elapsed flight times (including both time on the tarmac and time in % the air). idx = data.Year > 1994 & (data.ActualElapsedTime - data.CRSElapsedTime)... > 1.50 * data.CRSElapsedTime; intermVal = data(idx,:); add(intermKVStore,'Null',intermVal); end
リデューサーは、マッパーからのサブセット化された観測値を受け取り、単一のテーブルに連結するだけです。リデューサーは、1 つのキー (あまり意味はありません) と 1 つの値 (連結されたテーブル) を返します。
reduce 関数のファイルを表示します。
function subsettingReducer(~, intermValList, outKVStore) % get all intermediate results from the list outVal = {}; while hasnext(intermValList) outVal = [outVal; getnext(intermValList)]; end % Note that this approach assumes the concatenated intermediate values (the % subset of the whole data) fit in memory. add(outKVStore, 'Null', outVal); end
mapreduce
を使用して、map 関数および reduce 関数をデータストア ds
に適用します。
result = mapreduce(ds, @subsettingMapper, @subsettingReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%
mapreduce
は、現在のフォルダー内のファイルで出力データストア result
を返します。
結果の表示
データセットから引き出された最初の 10 個の変数のパターンを調べます。これらの変数は、基本的な遅延時間情報のほかに、航空会社、目的地および到着空港を含みます。
r = readall(result); tbl = r.Value{1}; tbl(:,1:10)
ans=37×10 table
Year Month DepTime UniqueCarrier ActualElapsedTime CRSElapsedTime ArrDelay DepDelay Origin Dest
____ _____ _______ _____________ _________________ ______________ ________ ________ _______ _______
1995 6 1601 {'US'} 162 58 118 14 {'BWI'} {'PIT'}
1996 6 1834 {'CO'} 241 75 220 54 {'IAD'} {'EWR'}
1997 1 730 {'DL'} 110 43 137 70 {'ATL'} {'GSP'}
1997 4 1715 {'UA'} 152 57 243 148 {'IND'} {'ORD'}
1997 9 2232 {'NW'} 143 50 115 22 {'DTW'} {'CMH'}
1997 10 1419 {'CO'} 196 58 157 19 {'DFW'} {'IAH'}
1998 3 2156 {'DL'} 139 49 146 56 {'TYS'} {'ATL'}
1998 10 1803 {'NW'} 291 81 213 3 {'MSP'} {'ORD'}
2000 5 830 {'WN'} 140 55 85 0 {'DAL'} {'HOU'}
2000 8 1630 {'CO'} 357 123 244 10 {'EWR'} {'CLT'}
2002 6 1759 {'US'} 260 67 192 -1 {'LGA'} {'BOS'}
2003 3 1214 {'XE'} 214 84 124 -6 {'GPT'} {'IAH'}
2003 3 604 {'XE'} 175 60 114 -1 {'LFT'} {'IAH'}
2003 4 1556 {'MQ'} 142 52 182 92 {'PIA'} {'ORD'}
2003 5 1954 {'US'} 127 48 78 -1 {'RDU'} {'CLT'}
2003 7 1250 {'FL'} 261 95 166 0 {'ATL'} {'IAD'}
⋮
最初のレコードを確認して、予定出発時刻より 14 分遅れてゲートから出発し、118 分遅れで到着した U.S. Air のフライトを調べます。このフライトは、ゲートからの離陸時間後の遅延時間として、ActualElapsedTime
と CRSElapsedTime
の差の 104 分の遅延を生じました。
これは異常なレコードです。2006 年 2 月には、JetBlue のフライトは出発時刻は 3:24 a.m. で飛行経過時間は 1,650 分でしたが、到着遅延時間はわずか 415 分でした。これはデータ入力エラーの可能性があります。
この他には、遅延時間が極端に大きいフライトがいつどこで生じるかについての明確なパターンはありません。目立った航空会社、時期、時刻および空港はありません。冬季のオヘア空港など、直観的なパターンは確かに存在します。
遅延のパターン
1995 年から、航空会社のシステム パフォーマンス データにはフライトの地上走行フェーズでどれだけ遅延が生じたかの測定値が含まれるようになりました。また 2003 年には、一部の遅延理由も含まれるようになりました。
これら 2 つの変数を詳細に調べます。
tbl(:,[1,7,8,11:end])
ans=37×8 table
Year ArrDelay DepDelay TaxiIn TaxiOut CarrierDelay WeatherDelay NASDelay
____ ________ ________ ______ _______ ____________ ____________ ________
1995 118 14 7 101 NaN NaN NaN
1996 220 54 12 180 NaN NaN NaN
1997 137 70 2 12 NaN NaN NaN
1997 243 148 4 38 NaN NaN NaN
1997 115 22 4 98 NaN NaN NaN
1997 157 19 6 95 NaN NaN NaN
1998 146 56 9 47 NaN NaN NaN
1998 213 3 11 205 NaN NaN NaN
2000 85 0 5 51 NaN NaN NaN
2000 244 10 4 273 NaN NaN NaN
2002 192 -1 6 217 NaN NaN NaN
2003 124 -6 13 131 NaN NaN NaN
2003 114 -1 8 106 NaN NaN NaN
2003 182 92 9 106 NaN NaN NaN
2003 78 -1 5 90 NaN NaN NaN
2003 166 0 11 170 0 0 166
⋮
遅延時間が極端に大きいフライトでは、遅延の大部分は誘導滑走での走行中に生じています。さらに、遅延の主な原因は NASDelay です。NAS 遅延は、目的地の空港が、フライトの予定到着時刻に到着する予定のすべてオプションの便を処理できないと予想される場合に、国家航空当局がホールドを命じるものです。任意の特定時刻に実施されている NAS 遅延プログラムは、https:/
/
nasstatus.
faa.
gov/
に投稿されます。
NAS 遅延の実施中は、飛行機への搭乗もできれば遅らせます。このような遅延は出発の遅延として現れます。しかし、この例で選択したフライトのほとんどでは、遅延は主にゲートからの出発後に生じ、地上走行の遅延となります。
MapReduce の再実行
前の map 関数の例では、サブセット化の基準は関数ファイルに組み込まれていました。所定の日にサンフランシスコを出発するフライトなど、新しいクエリに対しては、新しい map 関数を作成する必要があります。
汎用的なマッパーは、サブセット化の基準を map 関数の定義から独立させ、各クエリに対するマッパーを構成する無名関数を使用することにより、より順応性が高くなります。この汎用マッパーでは、目的のクエリ変数を提供する 4 番目の入力引数を使用します。
汎用の map 関数ファイルを表示します。
function subsettingMapperGeneric(data, ~, intermKVStore, subsetter) intermKey = 'Null'; intermVal = data(subsetter(data), :); add(intermKVStore,intermKey,intermVal); end
subsettingMapper
でハードコード化されているものと同じ行の選択を実行する、無名関数を作成します。
inFlightDelay150percent = ... @(data) data.Year > 1994 & ... (data.ActualElapsedTime-data.CRSElapsedTime) > 1.50*data.CRSElapsedTime;
関数 mapreduce
では、map 関数と reduce 関数が厳密に 3 つの入力を受け入れる必要があるため、別の無名関数を使用してマッパー subsettingMapperGeneric
に 4 番目の入力を指定します。そうすれば、この無名関数を使用し、3 つの引数のみを使って subsettingMapperGeneric
を呼び出すことができます (4 番目は暗黙的)。
configuredMapper = ... @(data, info, intermKVStore) subsettingMapperGeneric(data, info, ... intermKVStore, inFlightDelay150percent);
mapreduce
を使用して、汎用的な map 関数を入力データストアに適用します。
result2 = mapreduce(ds, configuredMapper, @subsettingReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%
mapreduce
は、現在のフォルダー内のファイルで出力データストア result2
を返します。
結果の確認
汎用的なマッパーが、組み込まれたサブセット化のロジックと同じ結果が得られることを確認します。
r2 = readall(result2); tbl2 = r2.Value{1}; if isequaln(tbl, tbl2) disp('Same results with the configurable mapper.') else disp('Oops, back to the drawing board.') end
Same results with the configurable mapper.
ローカル関数
ここに挙げるのは、mapreduce
がデータに適用する map 関数と reduce 関数です。
function subsettingMapper(data, ~, intermKVStore) % Select flights from 1995 and later that had exceptionally long % elapsed flight times (including both time on the tarmac and time in % the air). idx = data.Year > 1994 & (data.ActualElapsedTime - data.CRSElapsedTime)... > 1.50 * data.CRSElapsedTime; intermVal = data(idx,:); add(intermKVStore,'Null',intermVal); end %------------------------------------------------------------------------- function subsettingReducer(~, intermValList, outKVStore) % get all intermediate results from the list outVal = {}; while hasnext(intermValList) outVal = [outVal; getnext(intermValList)]; end % Note that this approach assumes the concatenated intermediate values (the % subset of the whole data) fit in memory. add(outKVStore, 'Null', outVal); end %------------------------------------------------------------------------- function subsettingMapperGeneric(data, ~, intermKVStore, subsetter) intermKey = 'Null'; intermVal = data(subsetter(data), :); add(intermKVStore,intermKey,intermVal); end %-------------------------------------------------------------------------
参考
mapreduce
| tabularTextDatastore