Simple Data Subsetting Using MapReduce

This example shows how to extract a subset of a large data set.

There are two aspects of subsetting, or performing a query. One is selecting a subset of the variables (columns) in the data set. The other is selecting a subset of the observations, or rows.

In this example, the selection of variables takes place in the definition of the datastore. (The map function could perform a further sub-selection of variables, but that is not within the scope of this example). In this example, the role of the map function is to perform the selection of observations. The role of the reduce function is to concatenate the subsetted records extracted by each call to the map function. This approach assumes that the data set can fit in memory after the Map phase.

Prepare Data

Create a datastore using the airlinesmall.csv data set. This 12-megabyte data set contains 29 columns of flight information for several airline carriers, including arrival and departure times. This example uses 15 variables out of the 29 variables available in the data.

ds = datastore('airlinesmall.csv', 'TreatAsMissing', 'NA');
ds.SelectedVariableNames = ds.VariableNames([1 2 5 9 12 13 15 16 17 ...
    18 20 21 25 26 27]);
ds.SelectedVariableNames
ans = 1x15 cell array
  Columns 1 through 4

    {'Year'}    {'Month'}    {'DepTime'}    {'UniqueCarrier'}

  Columns 5 through 8

    {'ActualElapsedTime'}    {'CRSElapsedTime'}    {'ArrDelay'}    {'DepDelay'}

  Columns 9 through 13

    {'Origin'}    {'Dest'}    {'TaxiIn'}    {'TaxiOut'}    {'CarrierDelay'}

  Columns 14 through 15

    {'WeatherDelay'}    {'NASDelay'}

The datastore treats 'NA' values as missing, and replaces the missing values with NaN values by default. Additionally, the SelectedVariableNames property allows you to work with only the specified variables of interest, which you can verify using preview.

preview(ds)
ans=8×15 table
    Year    Month    DepTime    UniqueCarrier    ActualElapsedTime    CRSElapsedTime    ArrDelay    DepDelay    Origin      Dest      TaxiIn    TaxiOut    CarrierDelay    WeatherDelay    NASDelay
    ____    _____    _______    _____________    _________________    ______________    ________    ________    _______    _______    ______    _______    ____________    ____________    ________

    1987     10        642         {'PS'}                53                 57              8          12       {'LAX'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1021         {'PS'}                63                 56              8           1       {'SJC'}    {'BUR'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       2055         {'PS'}                83                 82             21          20       {'SAN'}    {'SMF'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1332         {'PS'}                59                 58             13          12       {'BUR'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        629         {'PS'}                77                 72              4          -1       {'SMF'}    {'LAX'}     NaN        NaN          NaN             NaN           NaN   
    1987     10       1446         {'PS'}                61                 65             59          63       {'LAX'}    {'SJC'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        928         {'PS'}                84                 79              3          -2       {'SAN'}    {'SFO'}     NaN        NaN          NaN             NaN           NaN   
    1987     10        859         {'PS'}               155                143             11          -1       {'SEA'}    {'LAX'}     NaN        NaN          NaN             NaN           NaN   

Run MapReduce

The mapreduce function requires a map function and a reduce function as inputs. The mapper receives blocks of data and outputs intermediate results. The reducer reads the intermediate results and produces a final result.

In this example, the mapper receives a table with the variables described by the SelectedVariableNames property in the datastore. Then, the mapper extracts flights that had a high amount of delay after pushback from the gate. Specifically, it identifies flights with a duration exceeding 2.5 times the length of the scheduled duration. The mapper ignores flights prior to 1995, because some of the variables of interest for this example were not collected before that year.

Display the map function file.

function subsettingMapper(data, ~, intermKVStore)
  % Select flights from 1995 and later that had exceptionally long
  % elapsed flight times (including both time on the tarmac and time in 
  % the air).
  idx = data.Year > 1994 & (data.ActualElapsedTime - data.CRSElapsedTime)...
    > 1.50 * data.CRSElapsedTime;
  intermVal = data(idx,:);

  add(intermKVStore,'Null',intermVal);
end

The reducer receives the subsetted observations obtained from the mapper and simply concatenates them into a single table. The reducer returns one key (which is relatively meaningless) and one value (the concatenated table).

Display the reduce function file.

function subsettingReducer(~, intermValList, outKVStore)
  % get all intermediate results from the list
  outVal = {};

  while hasnext(intermValList)
    outVal = [outVal; getnext(intermValList)];
  end
  % Note that this approach assumes the concatenated intermediate values (the
  % subset of the whole data) fit in memory.
    
  add(outKVStore, 'Null', outVal);
end

Use mapreduce to apply the map and reduce functions to the datastore, ds.

result = mapreduce(ds, @subsettingMapper, @subsettingReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

mapreduce returns an output datastore, result, with files in the current folder.

Display Results

Look for patterns in the first 10 variables that were pulled from the data set. These variables identify the airline, the destination, and the arrival airports, as well as some basic delay information.

r = readall(result);
tbl = r.Value{1};
tbl(:,1:10)
ans=37×10 table
    Year    Month    DepTime    UniqueCarrier    ActualElapsedTime    CRSElapsedTime    ArrDelay    DepDelay    Origin      Dest  
    ____    _____    _______    _____________    _________________    ______________    ________    ________    _______    _______

    1995      6       1601         {'US'}               162                 58            118          14       {'BWI'}    {'PIT'}
    1996      6       1834         {'CO'}               241                 75            220          54       {'IAD'}    {'EWR'}
    1997      1        730         {'DL'}               110                 43            137          70       {'ATL'}    {'GSP'}
    1997      4       1715         {'UA'}               152                 57            243         148       {'IND'}    {'ORD'}
    1997      9       2232         {'NW'}               143                 50            115          22       {'DTW'}    {'CMH'}
    1997     10       1419         {'CO'}               196                 58            157          19       {'DFW'}    {'IAH'}
    1998      3       2156         {'DL'}               139                 49            146          56       {'TYS'}    {'ATL'}
    1998     10       1803         {'NW'}               291                 81            213           3       {'MSP'}    {'ORD'}
    2000      5        830         {'WN'}               140                 55             85           0       {'DAL'}    {'HOU'}
    2000      8       1630         {'CO'}               357                123            244          10       {'EWR'}    {'CLT'}
    2002      6       1759         {'US'}               260                 67            192          -1       {'LGA'}    {'BOS'}
    2003      3       1214         {'XE'}               214                 84            124          -6       {'GPT'}    {'IAH'}
    2003      3        604         {'XE'}               175                 60            114          -1       {'LFT'}    {'IAH'}
    2003      4       1556         {'MQ'}               142                 52            182          92       {'PIA'}    {'ORD'}
    2003      5       1954         {'US'}               127                 48             78          -1       {'RDU'}    {'CLT'}
    2003      7       1250         {'FL'}               261                 95            166           0       {'ATL'}    {'IAD'}
      ⋮

Looking at the first record, a U.S. Air flight departed the gate 14 minutes after its scheduled departure time and arrived 118 minutes late. The flight experienced a delay of 104 minutes after pushback from the gate which is the difference between ActualElapsedTime and CRSElapsedTime.

There is one anomalous record. In February of 2006, a JetBlue flight had a departure time of 3:24 a.m. and an elapsed flight time of 1650 minutes, but an arrival delay of only 415 minutes. This might be a data entry error.

Otherwise, there are no clear cut patterns concerning when and where these exceptionally delayed flights occur. No airline, time of year, time of day, or single airport dominates. Some intuitive patterns, such as O'Hare (ORD) in the winter months, are certainly present.

Delay Patterns

Beginning in 1995, the airline system performance data began including measurements of how much delay took place in the taxi phases of a flight. Then, in 2003, the data also began to include certain causes of delay.

Examine these two variables in closer detail.

tbl(:,[1,7,8,11:end])
ans=37×8 table
    Year    ArrDelay    DepDelay    TaxiIn    TaxiOut    CarrierDelay    WeatherDelay    NASDelay
    ____    ________    ________    ______    _______    ____________    ____________    ________

    1995      118          14          7        101          NaN             NaN           NaN   
    1996      220          54         12        180          NaN             NaN           NaN   
    1997      137          70          2         12          NaN             NaN           NaN   
    1997      243         148          4         38          NaN             NaN           NaN   
    1997      115          22          4         98          NaN             NaN           NaN   
    1997      157          19          6         95          NaN             NaN           NaN   
    1998      146          56          9         47          NaN             NaN           NaN   
    1998      213           3         11        205          NaN             NaN           NaN   
    2000       85           0          5         51          NaN             NaN           NaN   
    2000      244          10          4        273          NaN             NaN           NaN   
    2002      192          -1          6        217          NaN             NaN           NaN   
    2003      124          -6         13        131          NaN             NaN           NaN   
    2003      114          -1          8        106          NaN             NaN           NaN   
    2003      182          92          9        106          NaN             NaN           NaN   
    2003       78          -1          5         90          NaN             NaN           NaN   
    2003      166           0         11        170            0               0           166   
      ⋮

For these exceptionally delayed flights, the great majority of delay occurs during taxi out, on the tarmac. Moreover, the major cause of the delay is NASDelay. NAS delays are holds imposed by the national aviation authorities on departures headed for an airport that is forecast to be unable to handle all scheduled arrivals at the time the flight is scheduled to arrive. NAS delay programs in effect at any given time are posted at https://www.fly.faa.gov/ois/.

Preferably, when NAS delays are imposed, boarding of the aircraft is simply delayed. Such a delay would show up as a departure delay. However, for most of the flights selected for this example, the delays took place largely after departure from the gate, leading to a taxi delay.

Rerun MapReduce

The previous map function had the subsetting criteria hard-wired in the function file. A new map function would have to be written for any new query, such as flights departing San Francisco on a given day.

A generic mapper can be more adaptive by separating out the subsetting criteria from the map function definition and using an anonymous function to configure the mapper for each query. This generic mapper uses a fourth input argument that supplies the desired query variable.

Display the generic map function file.

function subsettingMapperGeneric(data, ~, intermKVStore, subsetter)
  intermKey = 'Null';
  intermVal = data(subsetter(data), :);
  add(intermKVStore,intermKey,intermVal);
end

Create an anonymous function that performs the same selection of rows that is hard-coded in subsettingMapper.

inFlightDelay150percent = ...
   @(data) data.Year > 1994 & ...
   (data.ActualElapsedTime-data.CRSElapsedTime) > 1.50*data.CRSElapsedTime;

Since the mapreduce function requires the map and reduce functions to accept exactly three inputs, use another anonymous function to specify the fourth input to the mapper, subsettingMapperGeneric. Subsequently, you can use this anonymous function to call subsettingMapperGeneric using only three arguments (the fourth is implicit).

configuredMapper = ...
    @(data, info, intermKVStore) subsettingMapperGeneric(data, info, ...
    intermKVStore, inFlightDelay150percent);

Use mapreduce to apply the generic map function to the input datastore.

result2 = mapreduce(ds, configuredMapper, @subsettingReducer);
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  16% Reduce   0%
Map  32% Reduce   0%
Map  48% Reduce   0%
Map  65% Reduce   0%
Map  81% Reduce   0%
Map  97% Reduce   0%
Map 100% Reduce   0%
Map 100% Reduce 100%

mapreduce returns an output datastore, result2, with files in the current folder.

Verify Results

Confirm that the generic mapper gets the same result as with the hard-wired subsetting logic.

r2 = readall(result2);
tbl2 = r2.Value{1};

if isequaln(tbl, tbl2)
    disp('Same results with the configurable mapper.')
else
    disp('Oops, back to the drawing board.')
end
Same results with the configurable mapper.

Local Functions

Listed here are the map and reduce functions that mapreduce applies to the data.

function subsettingMapper(data, ~, intermKVStore)
  % Select flights from 1995 and later that had exceptionally long
  % elapsed flight times (including both time on the tarmac and time in 
  % the air).
  idx = data.Year > 1994 & (data.ActualElapsedTime - data.CRSElapsedTime)...
    > 1.50 * data.CRSElapsedTime;
  intermVal = data(idx,:);

  add(intermKVStore,'Null',intermVal);
end
%-------------------------------------------------------------------------
function subsettingReducer(~, intermValList, outKVStore)
  % get all intermediate results from the list
  outVal = {};

  while hasnext(intermValList)
    outVal = [outVal; getnext(intermValList)];
  end
  % Note that this approach assumes the concatenated intermediate values (the
  % subset of the whole data) fit in memory.
    
  add(outKVStore, 'Null', outVal);
end
%-------------------------------------------------------------------------
function subsettingMapperGeneric(data, ~, intermKVStore, subsetter)
  intermKey = 'Null';
  intermVal = data(subsetter(data), :);
  add(intermKVStore,intermKey,intermVal);
end
%-------------------------------------------------------------------------

See Also

|

Related Topics