このページの翻訳は最新ではありません。ここをクリックして、英語の最新版を参照してください。
MapReduce を使用した MATLAB でのビッグ データの解析
この例では、関数 mapreduce
を使用して、ファイルベースのデータを大量に処理する方法を説明します。MapReduce アルゴリズムは、現在の多くの "ビッグ データ" アプリケーションの主軸となっています。この例では単一のコンピューターで処理が行われますが、コードは Hadoop® を使用するようにスケールアップできます。
この例全体にわたって使用するデータセットは、1987 ~ 2008 年における米国の国内航空路線便に関するアメリカ統計学会の記録を集めたものです。以前に "ビッグ データ" に取り組んだ経験があれば、このデータセットについて既にご存知かもしれません。MATLAB® にはこのデータセットの小さなサブセットが含まれているため、この例および他の例を実行することができます。
データストアの紹介
データストアを作成すると、ブロックベースの方式でデータのコレクションにアクセスできます。データストアは、大量のデータを任意に処理することができます。データが複数のファイルに散在していても構いません。データストアは、表形式のテキスト ファイルのコレクション (ここで例示)、SQL データベース (Database Toolbox™ が必要)、Hadoop® 分散ファイル システム (HDFS™) など、多くのファイル タイプについて作成できます。
表形式のテキスト ファイルのコレクションに対し datastore を作成して、その内容をプレビューします。
ds = tabularTextDatastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
FlightNum TailNum ActualElapsedTime CRSElapsedTime AirTime ArrDelay
_________ _______ _________________ ______________ _______ ________
1503 {'NA'} 53 57 {'NA'} 8
1550 {'NA'} 63 56 {'NA'} 8
1589 {'NA'} 83 82 {'NA'} 21
1655 {'NA'} 59 58 {'NA'} 13
1702 {'NA'} 77 72 {'NA'} 4
1729 {'NA'} 61 65 {'NA'} 59
1763 {'NA'} 84 79 {'NA'} 3
1800 {'NA'} 155 143 {'NA'} 11
datastore は自動的に入力データを解析し、各列のデータ型について最適な推定を行います。ここでは、名前と値のペアの引数 'TreatAsMissing'
を使用して、欠損値を正しく置き換えます。数値変数 ('AirTime'
など) の場合、tabularTextDatastore
によってすべての 'NA'
インスタンスが、非数 (Not-a-Number) の IEEE 算術表現である NaN
値に置き換えられます。
ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA'); ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s'; ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s'; dsPreview = preview(ds); dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
AirTime TaxiIn TailNum CancellationCode
_______ ______ _______ ________________
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
NaN NaN {'NA'} {'NA'}
目的の行のスキャン
データストア オブジェクトには、関数 read
が次に返すデータのブロックを追跡するための内部ポインターが含まれています。関数 hasdata
および read
を使用してデータセット全体を段階的に処理し、データセットをフィルター処理して目的の行のみを抽出します。ここでは、目的の行は Boston ("BOS") 発の United Airlines ("UA") のフライトです。
subset = []; while hasdata(ds) t = read(ds); t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :); subset = vertcat(subset, t); end subset(1:10,[9,10,15:17])
ans=10×5 table
UniqueCarrier FlightNum ArrDelay DepDelay Origin
_____________ _________ ________ ________ _______
{'UA'} 121 -9 0 {'BOS'}
{'UA'} 1021 -9 -1 {'BOS'}
{'UA'} 519 15 8 {'BOS'}
{'UA'} 354 9 8 {'BOS'}
{'UA'} 701 -17 0 {'BOS'}
{'UA'} 673 -9 -1 {'BOS'}
{'UA'} 91 -3 2 {'BOS'}
{'UA'} 335 18 4 {'BOS'}
{'UA'} 1429 1 -2 {'BOS'}
{'UA'} 53 52 13 {'BOS'}
mapreduce
の紹介
MapReduce は、ビッグ データの問題を "分割統治" するためのアルゴリズム手法です。MATLAB では、mapreduce
に 3 つの入力引数が必要です。
データの読み取り元のデータストア
処理するデータのサブセットが与えられる "マッパー" 関数。map 関数の出力は部分的な計算値です。
mapreduce
は、データストアのブロックごとに 1 回マッパー関数を呼び出し、それぞれの呼び出しは独立して処理されます。マッパー関数からの集計出力が与えられる "リデューサー" 関数。リデューサー関数は、マッパー関数で開始された計算を完了させ、最終的な解答を出力します。
この説明はいくぶん省略が過ぎていますが、補足すると、マッパー関数の呼び出しの出力は、リデューサー関数に渡す前に興味深い方法でシャッフルして結合できます。これについては、この例の後半で検証します。
mapreduce
を使用した計算の実行
mapreduce
の単純な使用例は、航空路線のデータセット全体で最長のフライト時間を検索することです。このためには、以下のことを行います。
"マッパー" 関数は、データストアからのブロックごとに最大値を計算します。
次に "リデューサー" 関数が、マッパー関数の呼び出しで計算されたすべての最大値の中での最大値を計算します。
まず、データストアをリセットし、変数をフィルター処理して目的の 1 列のみを抽出します。
reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};
マッパー関数 maxTimeMapper.m
を作成します。これは 3 つの入力引数をとります。
入力データ。これは、関数
read
をデータストアに適用して取得した table です。構成およびコンテキスト情報のコレクション、
info
。これは多くの場合無視でき、それはここでも該当します。中間データ ストレージ オブジェクト。マッパー関数からの計算結果を記録します。関数
add
を使用して、キーと値のペアをこの中間出力に追加します。この例では、キーの名前 ('MaxElapsedTime'
) は任意です。
現在のフォルダーに次のマッパー関数 (maxTimeMapper.m
) を保存します。
function maxTimeMapper(data, ~, intermKVStore) maxElapsedTime = max(data{:,:}); add(intermKVStore, "MaxElapsedTime", maxElapsedTime) end
次に、リデューサー関数を作成します。これも 3 つの入力引数をとります。
一連の入力 "キー"。キーについては以下で詳細を説明しますが、この例のような単純な問題では無視できます。
mapreduce
がリデューサー関数に渡す中間データ入力オブジェクト。このデータはキーと値のペアの形式をもち、関数hasnext
およびgetnext
を使用して各キーの値を反復処理します。最終的な出力データ ストレージ オブジェクト。関数
add
およびaddmulti
を使用して、キーと値のペアを出力に直接追加します。
現在のフォルダーに次のリデューサー関数 (maxTimeReducer.m
) を保存します。
function maxTimeReducer(~, intermValsIter, outKVStore) maxElapsedTime = -Inf; while(hasnext(intermValsIter)) maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter)); end add(outKVStore, "MaxElapsedTime", maxElapsedTime); end
マッパー関数およびリデューサー関数を作成して現在のフォルダーに保存すると、データストア、マッパー関数およびリデューサー関数を使用して mapreduce
を呼び出すことができます。Parallel Computing Toolbox (PCT) を使用している場合、MATLAB は自動的にプールを起動し、実行を並列化します。関数 readall
を使用して、MapReduce アルゴリズムの結果を表示します。
result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 100%
readall(result)
ans=1×2 table
Key Value
__________________ ________
{'MaxElapsedTime'} {[1650]}
mapreduce
でのキーの使用
キーの使用は mapreduce
の重要かつ強力な機能です。マッパー関数のそれぞれの呼び出しは、キーと呼ばれる、名前の付いた 1 つ以上の "バケット" に中間結果を追加します。mapreduce
によりマッパー関数が呼び出される回数は、データストア内のブロックの数に対応します。
マッパー関数で値を複数のキーに追加すると、リデューサー関数を複数回呼び出すことになり、それぞれの呼び出しでは 1 つのキーの中間値のみを処理します。関数 mapreduce
は、アルゴリズムのマップとリデュースのフェーズ間におけるデータ移動を自動的に管理します。
多くの場面で、この柔軟性は有用です。以下の例では、わかりやすくするために比較的明白な方法でキーを使用します。
mapreduce
を使用したグループごとの指標の計算
この適用例におけるマッパー関数の動作はより複雑です。入力データの航空会社ごとに、関数 add
を使用して値のベクトルを追加します。このベクトルは、21 年以上のデータにおける、その航空会社の毎日のフライト回数です。航空会社のコードがこの値のベクトルのキーになります。これにより、mapreduce
がキーをリデューサー関数に渡す際、各航空会社のすべてのデータが必ずグループにまとめられるようになります。
現在のフォルダーに次のマッパー関数 (countFlightsMapper.m
) を保存します。
function countFlightsMapper(data, ~, intermKVStore) dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end
リデューサー関数は、簡単になります。中間値を反復処理して、ベクトルに追加していくだけです。終了時に、この集計ベクトルの値が出力されます。リデューサー関数が intermediateKeysIn
値の並べ替えや検証を行う必要はありません。mapreduce
がリデューサー関数を呼び出すごとに、1 つの航空会社の値だけが渡されます。
現在のフォルダーに次のリデューサー関数 (countFlightsReducer.m
) を保存します。
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end
データストアをリセットして、目的の変数を選択します。マッパー関数およびリデューサー関数を作成して現在のフォルダーに保存すると、データストア、マッパー関数およびリデューサー関数を使用して mapreduce
を呼び出すことができます。
reset(ds); ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'}; result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
******************************** * MAPREDUCE PROGRESS * ******************************** Map 0% Reduce 0% Map 16% Reduce 0% Map 32% Reduce 0% Map 48% Reduce 0% Map 65% Reduce 0% Map 81% Reduce 0% Map 97% Reduce 0% Map 100% Reduce 0% Map 100% Reduce 10% Map 100% Reduce 21% Map 100% Reduce 31% Map 100% Reduce 41% Map 100% Reduce 52% Map 100% Reduce 62% Map 100% Reduce 72% Map 100% Reduce 83% Map 100% Reduce 93% Map 100% Reduce 100%
result = readall(result);
サンプル データセットのみを指定してこの例を実行した場合に備えて、データセット全体で実行された mapreduce
アルゴリズムの結果を読み込みます。
load airlineResults
結果の可視化
上位 7 社の航空会社のみを使用し、データを平滑化して週末旅行の影響を除去します。これを行わないと、可視化はうまく整理されません。
lines = result.Value; lines = horzcat(lines{:}); [~,sortOrder] = sort(sum(lines), 'descend'); lines = lines(:,sortOrder(1:7)); result = result(sortOrder(1:7),:); lines(lines==0) = nan; lines = smoothdata(lines,'gaussian');
データをプロットします。
figure('Position',[1 1 800 600]); plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines,'LineWidth',2) title ('Domestic airline flights per day per carrier') xlabel('Date') ylabel('Flights per day') legend(result.Key, 'Location', 'Best')
このプロットは、この時期における Southwest Airlines (WN) の台頭を示しています。
さらに学ぶには
この例では、mapreduce
で処理できる事柄のごく表面に触れたにすぎません。これを Hadoop や MATLAB® Parallel Server™ と併用する方法などの詳細については、mapreduce
のドキュメンテーションを参照してください。
ローカル関数
ここに挙げられているのは、mapreduce
がデータに適用するローカル関数です。
function maxTimeMapper(data, ~, intermKVStore) maxElapsedTime = max(data{:,:}); add(intermKVStore, "MaxElapsedTime", maxElapsedTime) end %----------------------------------------------------------------------- function maxTimeReducer(~, intermValsIter, outKVStore) maxElapsedTime = -Inf; while(hasnext(intermValsIter)) maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter)); end add(outKVStore, "MaxElapsedTime", maxElapsedTime); end %----------------------------------------------------------------------- function countFlightsMapper(data, ~, intermKVStore) dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1; daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable'); for i = 1:numel(airlineName) dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]); add(intermKVStore, airlineName{i}, dayTotals); end end %----------------------------------------------------------------------- function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore) daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1; dayArray = zeros(daysSinceEpoch, 1); while hasnext(intermValsIter) dayArray = dayArray + getnext(intermValsIter); end add(outKVStore, intermKeysIn, dayArray); end %-----------------------------------------------------------------------
参考
mapreduce
| tabularTextDatastore