Main Content

このページの翻訳は最新ではありません。ここをクリックして、英語の最新版を参照してください。

MapReduce を使用した MATLAB でのビッグ データの解析

この例では、関数 mapreduce を使用して、ファイルベースのデータを大量に処理する方法を説明します。MapReduce アルゴリズムは、現在の多くの "ビッグ データ" アプリケーションの主軸となっています。この例では単一のコンピューターで処理が行われますが、コードは Hadoop® を使用するようにスケールアップできます。

この例全体にわたって使用するデータセットは、1987 ~ 2008 年における米国の国内航空路線便に関するアメリカ統計学会の記録を集めたものです。以前に "ビッグ データ" に取り組んだ経験があれば、このデータセットについて既にご存知かもしれません。MATLAB® にはこのデータセットの小さなサブセットが含まれているため、この例および他の例を実行することができます。

データストアの紹介

データストアを作成すると、ブロックベースの方式でデータのコレクションにアクセスできます。データストアは、大量のデータを任意に処理することができます。データが複数のファイルに散在していても構いません。データストアは、表形式のテキスト ファイルのコレクション (ここで例示)、SQL データベース (Database Toolbox™ が必要)、Hadoop® 分散ファイル システム (HDFS™) など、多くのファイル タイプについて作成できます。

表形式のテキスト ファイルのコレクションに対し datastore を作成して、その内容をプレビューします。

ds = tabularTextDatastore('airlinesmall.csv');
dsPreview = preview(ds);
dsPreview(:,10:15)
ans=8×6 table
    FlightNum    TailNum    ActualElapsedTime    CRSElapsedTime    AirTime    ArrDelay
    _________    _______    _________________    ______________    _______    ________

      1503       {'NA'}             53                 57          {'NA'}         8   
      1550       {'NA'}             63                 56          {'NA'}         8   
      1589       {'NA'}             83                 82          {'NA'}        21   
      1655       {'NA'}             59                 58          {'NA'}        13   
      1702       {'NA'}             77                 72          {'NA'}         4   
      1729       {'NA'}             61                 65          {'NA'}        59   
      1763       {'NA'}             84                 79          {'NA'}         3   
      1800       {'NA'}            155                143          {'NA'}        11   

datastore は自動的に入力データを解析し、各列のデータ型について最適な推定を行います。ここでは、名前と値のペアの引数 'TreatAsMissing' を使用して、欠損値を正しく置き換えます。数値変数 ('AirTime' など) の場合、tabularTextDatastore によってすべての 'NA' インスタンスが、非数 (Not-a-Number) の IEEE 算術表現である NaN 値に置き換えられます。

ds = tabularTextDatastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'TailNum')} = '%s';
ds.SelectedFormats{strcmp(ds.SelectedVariableNames, 'CancellationCode')} = '%s';
dsPreview = preview(ds);
dsPreview(:,{'AirTime','TaxiIn','TailNum','CancellationCode'})
ans=8×4 table
    AirTime    TaxiIn    TailNum    CancellationCode
    _______    ______    _______    ________________

      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     
      NaN       NaN      {'NA'}          {'NA'}     

目的の行のスキャン

データストア オブジェクトには、関数 read が次に返すデータのブロックを追跡するための内部ポインターが含まれています。関数 hasdata および read を使用してデータセット全体を段階的に処理し、データセットをフィルター処理して目的の行のみを抽出します。ここでは、目的の行は Boston ("BOS") 発の United Airlines ("UA") のフライトです。

subset = [];

while hasdata(ds)
    t = read(ds);
    t = t(strcmp(t.UniqueCarrier, 'UA') & strcmp(t.Origin, 'BOS'), :);
    subset = vertcat(subset, t);
end

subset(1:10,[9,10,15:17])
ans=10×5 table
    UniqueCarrier    FlightNum    ArrDelay    DepDelay    Origin 
    _____________    _________    ________    ________    _______

       {'UA'}           121          -9           0       {'BOS'}
       {'UA'}          1021          -9          -1       {'BOS'}
       {'UA'}           519          15           8       {'BOS'}
       {'UA'}           354           9           8       {'BOS'}
       {'UA'}           701         -17           0       {'BOS'}
       {'UA'}           673          -9          -1       {'BOS'}
       {'UA'}            91          -3           2       {'BOS'}
       {'UA'}           335          18           4       {'BOS'}
       {'UA'}          1429           1          -2       {'BOS'}
       {'UA'}            53          52          13       {'BOS'}

mapreduce の紹介

MapReduce は、ビッグ データの問題を "分割統治" するためのアルゴリズム手法です。MATLAB では、mapreduce に 3 つの入力引数が必要です。

  1. データの読み取り元のデータストア

  2. 処理するデータのサブセットが与えられる "マッパー" 関数。map 関数の出力は部分的な計算値です。mapreduce は、データストアのブロックごとに 1 回マッパー関数を呼び出し、それぞれの呼び出しは独立して処理されます。

  3. マッパー関数からの集計出力が与えられる "リデューサー" 関数。リデューサー関数は、マッパー関数で開始された計算を完了させ、最終的な解答を出力します。

この説明はいくぶん省略が過ぎていますが、補足すると、マッパー関数の呼び出しの出力は、リデューサー関数に渡す前に興味深い方法でシャッフルして結合できます。これについては、この例の後半で検証します。

mapreduce を使用した計算の実行

mapreduce の単純な使用例は、航空路線のデータセット全体で最長のフライト時間を検索することです。このためには、以下のことを行います。

  1. "マッパー" 関数は、データストアからのブロックごとに最大値を計算します。

  2. 次に "リデューサー" 関数が、マッパー関数の呼び出しで計算されたすべての最大値の中での最大値を計算します。

まず、データストアをリセットし、変数をフィルター処理して目的の 1 列のみを抽出します。

reset(ds);
ds.SelectedVariableNames = {'ActualElapsedTime'};

マッパー関数 maxTimeMapper.m を作成します。これは 3 つの入力引数をとります。

  1. 入力データ。これは、関数 read をデータストアに適用して取得した table です。

  2. 構成およびコンテキスト情報のコレクション、info。これは多くの場合無視でき、それはここでも該当します。

  3. 中間データ ストレージ オブジェクト。マッパー関数からの計算結果を記録します。関数 add を使用して、キーと値のペアをこの中間出力に追加します。この例では、キーの名前 ('MaxElapsedTime') は任意です。

現在のフォルダーに次のマッパー関数 (maxTimeMapper.m) を保存します。

function maxTimeMapper(data, ~, intermKVStore)
  maxElapsedTime = max(data{:,:});
  add(intermKVStore, "MaxElapsedTime", maxElapsedTime)
end

次に、リデューサー関数を作成します。これも 3 つの入力引数をとります。

  1. 一連の入力 "キー"。キーについては以下で詳細を説明しますが、この例のような単純な問題では無視できます。

  2. mapreduce がリデューサー関数に渡す中間データ入力オブジェクト。このデータはキーと値のペアの形式をもち、関数 hasnext および getnext を使用して各キーの値を反復処理します。

  3. 最終的な出力データ ストレージ オブジェクト。関数 add および addmulti を使用して、キーと値のペアを出力に直接追加します。

現在のフォルダーに次のリデューサー関数 (maxTimeReducer.m) を保存します。

function maxTimeReducer(~, intermValsIter, outKVStore)
  maxElapsedTime = -Inf;
  while(hasnext(intermValsIter))
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
  end
  add(outKVStore, "MaxElapsedTime", maxElapsedTime);
end

マッパー関数およびリデューサー関数を作成して現在のフォルダーに保存すると、データストア、マッパー関数およびリデューサー関数を使用して mapreduce を呼び出すことができます。Parallel Computing Toolbox (PCT) を使用している場合、MATLAB は自動的にプールを起動し、実行を並列化します。関数 readall を使用して、MapReduce アルゴリズムの結果を表示します。

result = mapreduce(ds, @maxTimeMapper, @maxTimeReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%
readall(result)
ans=1×2 table
           Key             Value  
    __________________    ________

    {'MaxElapsedTime'}    {[1650]}

mapreduce でのキーの使用

キーの使用は mapreduce の重要かつ強力な機能です。マッパー関数のそれぞれの呼び出しは、キーと呼ばれる、名前の付いた 1 つ以上の "バケット" に中間結果を追加します。mapreduce によりマッパー関数が呼び出される回数は、データストア内のブロックの数に対応します。

マッパー関数で値を複数のキーに追加すると、リデューサー関数を複数回呼び出すことになり、それぞれの呼び出しでは 1 つのキーの中間値のみを処理します。関数 mapreduce は、アルゴリズムのマップとリデュースのフェーズ間におけるデータ移動を自動的に管理します。

多くの場面で、この柔軟性は有用です。以下の例では、わかりやすくするために比較的明白な方法でキーを使用します。

mapreduce を使用したグループごとの指標の計算

この適用例におけるマッパー関数の動作はより複雑です。入力データの航空会社ごとに、関数 add を使用して値のベクトルを追加します。このベクトルは、21 年以上のデータにおける、その航空会社の毎日のフライト回数です。航空会社のコードがこの値のベクトルのキーになります。これにより、mapreduce がキーをリデューサー関数に渡す際、各航空会社のすべてのデータが必ずグループにまとめられるようになります。

現在のフォルダーに次のマッパー関数 (countFlightsMapper.m) を保存します。

function countFlightsMapper(data, ~, intermKVStore)
  dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

  for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
  end
end

リデューサー関数は、簡単になります。中間値を反復処理して、ベクトルに追加していくだけです。終了時に、この集計ベクトルの値が出力されます。リデューサー関数が intermediateKeysIn 値の並べ替えや検証を行う必要はありません。mapreduce がリデューサー関数を呼び出すごとに、1 つの航空会社の値だけが渡されます。

現在のフォルダーに次のリデューサー関数 (countFlightsReducer.m) を保存します。

function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  dayArray = zeros(daysSinceEpoch, 1);

  while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
  end
  add(outKVStore, intermKeysIn, dayArray);
end

データストアをリセットして、目的の変数を選択します。マッパー関数およびリデューサー関数を作成して現在のフォルダーに保存すると、データストア、マッパー関数およびリデューサー関数を使用して mapreduce を呼び出すことができます。

reset(ds);
ds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'};
result = mapreduce(ds, @countFlightsMapper, @countFlightsReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce  10%
Map 100% Reduce  21%
Map 100% Reduce  31%
Map 100% Reduce  41%
Map 100% Reduce  52%
Map 100% Reduce  62%
Map 100% Reduce  72%
Map 100% Reduce  83%
Map 100% Reduce  93%
Map 100% Reduce 100%
result = readall(result);

サンプル データセットのみを指定してこの例を実行した場合に備えて、データセット全体で実行された mapreduce アルゴリズムの結果を読み込みます。

load airlineResults

結果の可視化

上位 7 社の航空会社のみを使用し、データを平滑化して週末旅行の影響を除去します。これを行わないと、可視化はうまく整理されません。

lines = result.Value;
lines = horzcat(lines{:});
[~,sortOrder] = sort(sum(lines), 'descend');
lines = lines(:,sortOrder(1:7));
result = result(sortOrder(1:7),:);

lines(lines==0) = nan;
lines = smoothdata(lines,'gaussian');

データをプロットします。

figure('Position',[1 1 800 600]);
plot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines,'LineWidth',2)
title ('Domestic airline flights per day per carrier')
xlabel('Date')
ylabel('Flights per day')
legend(result.Key, 'Location', 'Best')

Figure contains an axes object. The axes object with title Domestic airline flights per day per carrier contains 7 objects of type line. These objects represent DL, WN, AA, US, UA, NW, CO.

このプロットは、この時期における Southwest Airlines (WN) の台頭を示しています。

さらに学ぶには

この例では、mapreduce で処理できる事柄のごく表面に触れたにすぎません。これを Hadoop や MATLAB® Parallel Server™ と併用する方法などの詳細については、mapreduce のドキュメンテーションを参照してください。

ローカル関数

ここに挙げられているのは、mapreduce がデータに適用するローカル関数です。

function maxTimeMapper(data, ~, intermKVStore)
  maxElapsedTime = max(data{:,:});
  add(intermKVStore, "MaxElapsedTime", maxElapsedTime)
end
%-----------------------------------------------------------------------
function maxTimeReducer(~, intermValsIter, outKVStore)
  maxElapsedTime = -Inf;
  while(hasnext(intermValsIter))
    maxElapsedTime = max(maxElapsedTime, getnext(intermValsIter));
  end
  add(outKVStore, "MaxElapsedTime", maxElapsedTime);
end
%-----------------------------------------------------------------------
function countFlightsMapper(data, ~, intermKVStore)
  dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier, 'stable');

  for i = 1:numel(airlineName)
    dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);
    add(intermKVStore, airlineName{i}, dayTotals);
  end
end
%-----------------------------------------------------------------------
function countFlightsReducer(intermKeysIn, intermValsIter, outKVStore)
  daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;
  dayArray = zeros(daysSinceEpoch, 1);

  while hasnext(intermValsIter)
    dayArray = dayArray + getnext(intermValsIter);
  end
  add(outKVStore, intermKeysIn, dayArray);
end
%-----------------------------------------------------------------------

参考

|

関連するトピック