Spark クラスターでの tall 配列の使用
この例では、tall table を作成する MATLAB® の例を変更して、Spark™ クラスターまたは Spark 対応 Hadoop® クラスターで実行する方法を説明します。この tall table を使用して tall 配列を作成し、統計プロパティを計算できます。コードをローカルで作成してからスケール アップすることで、アルゴリズムを書き換えることなく Parallel Computing Toolbox™ および MATLAB Parallel Server™ が提供する機能を利用できます。tall 配列およびデータ ストアを使用するビッグ データのワークフロー、Spark クラスター用の構成 (MATLAB Parallel Server)、およびHadoop クラスターの構成 (MATLAB Parallel Server)も参照してください。
クラスター プロファイルを使用した Spark クラスターへの接続
R2024a 以降
Spark クラスター プロファイルから parallel.cluster.Spark
オブジェクトを作成して使用し、mapreducer
を使用して Spark クラスターを実行環境として設定します。
sparkCluster = parcluster("SparkProfile")
mr = mapreducer(sparkCluster)
Spark クラスターのプロファイルの作成方法については、クライアント構成 (MATLAB Parallel Server)を参照してください。
Spark クラスターおよび Spark 対応 Hadoop クラスターへの手動による接続
クラスター プロファイルなしで Spark クラスターに接続することもできます。まず、環境変数とクラスター プロパティを特定の Spark クラスター構成に応じて適切に設定しなければなりません。これらの値とクラスターにジョブを投入するために必要なその他のプロパティについては、システム管理者に問い合わせてください。
Spark クラスター用のクラスター オブジェクトの手動による作成
MATLAB クライアントから Spark クラスターに接続するクラスター オブジェクトを作成します。
マシン上の Spark のインストール場所を指定することによってクラスター オブジェクトを作成します。関数 mapreducer
を使用して Spark クラスターを実行環境として設定します。
cluster = parallel.cluster.Spark(SparkInstallFolder="/path/to/spark/install"); % Optionally, if you want to control the exact number of workers: cluster.SparkProperties('spark.executor.instances') = '16'; mapreducer(cluster);
Spark 対応 Hadoop クラスター用のクラスター オブジェクトの手動による作成
MATLAB クライアントから Spark 対応 Hadoop クラスターに接続するクラスター オブジェクトを作成します。
環境変数を使用して、マシン上の Hadoop クラスターのインストール場所と Spark のインストール場所を指定します。クラスター オブジェクトを作成し、Spark 対応 Hadoop クラスターを実行環境として設定します。
setenv('HADOOP_HOME', '/path/to/hadoop/install') setenv('SPARK_HOME', '/path/to/spark/install'); cluster = parallel.cluster.Hadoop; % Optionally, if you want to control the exact number of workers: cluster.SparkProperties('spark.executor.instances') = '16'; mapreducer(cluster);
メモ
設定手順で、mapreducer
を使用してクラスター実行環境を設定します。次の手順で、tall 配列を作成します。tall 配列の作成後にクラスター実行環境の変更または削除を行った場合、tall 配列は無効になり、再作成しなければなりません。
メモ
逐次で開発してローカル ワーカーを使用しない場合は、次のコマンドを入力します。
mapreducer(0);
tall テーブルの作成と使用
これで、ローカル マシンではなく Spark クラスター上で並列 MATLAB コードを実行できます。
以下の手順は、Spark 対応 Hadoop クラスター上で tall table を作成して使用する方法を示していますが、この手順は任意の Spark クラスターに使用できます。
航空会社のフライト データの表形式ファイルを指すデータストアを作成します。'NA'
値を欠損データとして扱い、関数 datastore
がこれらを NaN
値に置き換えるようにすることで、データを整理します。
ds = datastore('airlinesmall.csv'); varnames = {'ArrDelay', 'DepDelay'}; ds.SelectedVariableNames = varnames; ds.TreatAsMissing = "NA";
データストアから tall table tt
を作成します。MATLAB により、tall table に対して以降の計算を実行する Spark ジョブが自動的に開始されます。
tt = tall(ds)
Starting a Spark job on the Hadoop cluster. This may take a few minutes while cluster resources are allocated ... Connected to the Spark job. tt = M×2 tall table ArrDelay DepDelay ________ ________ 8 12 8 1 21 20 13 12 4 -1 59 63 3 -2 11 -1 : : : :
この表示は、行数 M
が未知であることを示しています。M
は計算が完了するまではプレースホルダーです。
tall table から到着遅延 ArrDelay
を抽出します。このアクションで、以降の計算に使用する新たな tall 配列変数が作成されます。
a = tt.ArrDelay;
tall 配列に対して一連の操作を指定できます。この操作は、関数 gather
を呼び出すまで実行されません。こうすることで、長時間かかる可能性のある複数のコマンドをバッチ処理できます。例として、到着遅延の平均と標準偏差を計算します。これらの値を使用して、平均値から 1 標準偏差内にある遅延の上下のしきい値を作成します。
m = mean(a,'omitnan'); s = std(a,'omitnan'); one_sigma_bounds = [m-s m m+s];
gather
を使用して one_sigma_bounds
を計算し、その解をメモリに格納します。
sig1 = gather(one_sigma_bounds)
Evaluating tall expression using the Spark Cluster: - Pass 1 of 1: Completed in 0.95 sec Evaluation completed in 1.3 sec sig1 = -23.4572 7.1201 37.6975
複数の事項を一度に評価する場合は、複数の入力と出力を gather
に指定できます。これにより、それぞれの tall 配列で gather
を個別に呼び出すよりも高速になります。例として、到着遅延の最小値と最大値を計算します。
[max_delay, min_delay] = gather(max(a),min(a))
max_delay = 1014 min_delay = -64
メモ
クラスター ワーカー上で MATLAB を起動する場合、これらの例では、より長い時間が初回での完了にかかります。
Spark クラスター上で tall 配列を使用する場合、mapreducer 実行環境が存在している限り、クラスターの計算リソースは予約されています。これらのリソースをクリアするには、mapreducer を削除しなければなりません。
delete(gcmr);
mapreducer(0);
参考
gather
| tall
| datastore
| table
| mapreducer
| parallel.cluster.Hadoop
| parallel.cluster.Spark
関連する例
- tall 配列およびデータ ストアを使用するビッグ データのワークフロー
- 並列プールでの tall 配列の使用
- Spark クラスター用の構成 (MATLAB Parallel Server)
- Hadoop クラスターの構成 (MATLAB Parallel Server)
- メモリに収まらないデータの tall 配列