この項ではKaggle Titanic生存者予測をSparkMLというライブラリーを使って実施していきます。
SparkMLとは
Apache Spark の機械学習ライブラリで、シンプルでスケーラビリティが高く、他のツールと容易に統合できるように設計された、機械学習を実装するためのツールです。Spark上で扱う事の出来るパッケージであるため、ビッグデータに対して高速に機械学習処理を行うことが出来るのが特徴です。
Sparkの機械学習ライブラリーとして、SparkMLの他にもSparkMLlibが挙げられます。
SparkMLlibとSparkMLの違いとしては、SparkMLlibはRDDで扱うことを想定して実装されており、SparkMLはDataFrameおよびDatasetで扱うことを想定して開発されています。
チュートリアル
- 機械学習入門者向け ランダムフォレストによる Kaggle Titanic生存者予測を参考にして train.csv, test.csvをダウンロードして、zeppelin/data/ ディレクトリ下の任意の場所に移動します
1. 前処理
1.1 ライブラリーのimport
今回の処理に必要なライブラリーをimportします。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import org.apache.spark.sql.SparkSession import org.apache.spark.ml._ import org.apache.spark.ml.feature._ import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.sql.functions._ import org.apache.spark.sql.SaveMode import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.IntegerType import org.apache.spark.ml.tuning.ParamGridBuilder import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.tuning.CrossValidator import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.mllib.evaluation.MulticlassMetrics |
1.2 データの読み込み
まずは、データをSpark data frameとして読み込みましょう
1 2 |
val sparkSession = SparkSession.builder().getOrCreate() val df = sparkSession.read.option("header", "true").option("inferSchema", "true").csv("/data/titanic/train.csv") |
下記のコマンド等を使ってデータの中身を把握しましょう。
1 2 3 |
df.printSchema() df.show() df.groupBy("column_name").count().show() |
1.3 欠損値のハンドリング
機械学習をするにあたって、欠損値の処理をする必要があります。
今回のデータにおいてはage
カラムに多くの欠損値が見られます。ageの平均を求めた後、na.fill
APIを用いて欠損値に平均の値を代入します。
1 2 |
val meanValue = df.agg(mean(df("Age"))).first.getDouble(0) val fixedDf = df.na.fill(meanValue, Array("Age")) |
1.4 データの分割
次にrandomSplit method
を使って、データを二つに分割します. 片方はモデルのトレーニング、もう片方は精度の検証に使います。
1 2 3 |
val dfs = fixedDf.randomSplit(Array(0.7, 0.3)) val trainDf = dfs(0).withColumnRenamed("Survived", "label") val crossDf = dfs(1) |
1.5 カテゴリカルデータの処理
Sex(性別)やEmbarked(出港地)などのカテゴリカル(質的)データを処理していきます。
ここでは、one-hot encodingを用いてSparkMLパイプラインを作成していきます。
1 2 3 4 5 6 7 |
def handleCategorical(column: String): Array[PipelineStage] = { val stringIndexer = new StringIndexer().setInputCol(column) .setOutputCol(s"${column}_index") .setHandleInvalid("skip") val oneHot = new OneHotEncoder().setInputCol(s"${column}_index").setOutputCol(s"${column}_onehot") Array(stringIndexer, oneHot) } |
全てのカテゴリ変数に対してstageを作成します。
1 2 3 |
val genderStages = handleCategorical("Sex") val embarkedStages = handleCategorical("Embarked") val pClassStages = handleCategorical("Pclass") |
2. RandomForestを使った分類
2.1. RandomForestを使った分類器の作成
次のコードで、vector assembler stageと、分類用のrandom forest stageを作成し、pipelineの定義をします。
1 2 3 4 5 6 7 8 9 10 |
//columns for training val cols = Array("Sex_onehot", "Embarked_onehot", "Pclass_onehot", "SibSp", "Parch", "Age", "Fare") val vectorAssembler = new VectorAssembler().setInputCols(cols).setOutputCol("features") //algorithm stage val randomForestClassifier = new RandomForestClassifier() //pipeline val preProcessStages = genderStages ++ embarkedStages ++ pClassStages ++ Array(vectorAssembler) val pipeline = new Pipeline().setStages(preProcessStages ++ Array(randomForestClassifier)) |
2.2 modelのfitting
モデルを訓練用データに “fit”(適合)させます。
1 |
val model = pipeline.fit(trainDf) |
2.3. スコアの算出
モデルのパフォーマンスを計るためにスコアを算出していきます。今回の評価指標としてはaccuracy scoreを使用します。
1 2 3 4 5 6 7 |
def accuracyScore(df: DataFrame, label: String, predictCol: String) = { val rdd = df.select(label, predictCol).rdd.map(row ⇒ (row.getInt(0).toDouble, row.getDouble(1))) new MulticlassMetrics(rdd).accuracy } println("train accuracy with pipeline" + accuracyScore(model.transform(trainDf), "label", "prediction")) println("test accuracy with pipeline" + accuracyScore(model.transform(crossDf), "Survived", "prediction")) |
Result
1 2 |
train accuracy with pipeline0.8516746411483254 test accuracy with pipeline0.816793893129771 |
3. Cross-ValidationとHyper Parameter Tuning
Random forestにはいくつかの調整可能なパラメータがあります。手作業で最適なチューニングをするのは大変な作業です。今回はSparkMLのcross validation機能を使用してチューニングを行います。
3.1. Parameter gridの定義
以下のように試すべきパラメータを定義します。
1 2 3 4 5 |
val paramMap = new ParamGridBuilder() .addGrid(randomForestClassifier.impurity, Array("gini", "entropy")) .addGrid(randomForestClassifier.maxDepth, Array(1,2,5, 10, 15)) .addGrid(randomForestClassifier.minInstancesPerNode, Array(1, 2, 4,5,10)) .build() |
3.2 Cross-Validation
Cross-validation stageを定義して最適なパラメータを検索します。また、Cross validationを行うことで、データにover fittingしていないことも確認出来ます。
1 2 3 4 5 6 7 8 9 |
def crossValidation(pipeline: Pipeline, paramMap: Array[ParamMap], df: DataFrame): Model[_] = { val cv = new CrossValidator() .setEstimator(pipeline) .setEvaluator(new BinaryClassificationEvaluator) .setEstimatorParamMaps(paramMap) .setNumFolds(5) cv.fit(df) } val cvModel = crossValidation(pipeline, paramMap, trainDf) |
2.3の時のスコアに比べてスコアが改善されたのが分かります。
1 2 |
train accuracy with cross validation0.8787878787878788 test accuracy with cross validation 0.8577099236641222 |
4. Submit Fileの生成
Kaggle competitionのように提出用のテスト結果ファイルを生成してみましょう。
1 2 3 |
val testDf = sparkSession.read.option("header", "true").option("inferSchema", "true").csv("src/main/resources/titanic/test.csv") val fareMeanValue = df.agg(mean(df("Fare"))).first.getDouble(0) val fixedOutputDf = testDf.na.fill(meanValue, Array("age")).na.fill(fareMeanValue, Array("Fare")) |
1 2 3 4 5 6 |
def generateOutputFile(testDF: DataFrame, model: Model[_]) = { val scoredDf = model.transform(testDF) val outputDf = scoredDf.select("PassengerId", "prediction") val castedDf = outputDf.select(outputDf("PassengerId"), outputDf("prediction").cast(IntegerType).as("Survived")) castedDf.write.format("csv").option("header", "true").mode(SaveMode.Overwrite).save("src/main/resources/output/") } |
参考
あなたも、Avintonでこのような最先端技術を習得し活用してみませんか?
社員の成長を導きながら、AIやビッグデータなどの最先端技術をプロジェクトに活用していくことが私たちのビジョンです。Avintonの充実した技術研修でスキルアップを図り、あなたのキャリア目標を一緒に達成しませんか?