本チュートリアルでは、Apache Spark SQLの基本的な関数を学ぶことができます。
関数についての詳細な説明に関しては、公式ドキュメントを参照ください。
前置き
Apache SparkとApache Zeppelinの概要と環境構築で説明した通り、Apache Sparkはビッグデータに対する高速分散処理が出来ることが大きなメリットの一つです。
興味のある方はぜひ複数ノードのクラスターなどを構築し、より大きいデータでの処理を体験することをおすすめします。
本チュートリアルで扱うデータの大きさと環境ではSparkのメリットを享受するには十分とは言えませんが、Sparkの基本文法等を身に着けるのに役立つはずです。
準備
環境
- Apache Zeppelin 0.9.0
- Apache Spark 2.4.5
チュートリアル
サンプルデータのダウンロード
- 都道府県市町村区人口 japan.csv
- 男女別人口及び世帯数-行政区 e1yokohama2204.csv
- 出典:横浜市政策局総務部統計情報課 男女別人口及び世帯数-行政区
データの整形
日本の都道府県市町村区の人口のデータを作成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
%spark // japan.csvをデータフレームとして読み込み val dfPopulation = spark .read .format("csv") .option("header","true") .load("/data/japan.csv") // populationカラムがnullではない値でフィルタリング .filter(col("population").isNotNull) // city_wardカラムの作成 .withColumn( "city_ward", when(col("ward").isNull, lit("c")) .when(col("ward").isNotNull, lit("w")) .otherwise(lit(null)) ) // idカラムの作成 .withColumn("id", monotonically_increasing_id()) // population, idカラムのデータタイプをInteger型にキャスト .withColumn("population", col("population").cast("Integer")) .withColumn("id", col("id").cast("Integer")) // データフレームをメモリにキャッシュ .cache() // カラム数と行数を表示 println(dfPopulation.columns.length, dfPopulation.count()) // スキーマを表示 dfPopulation .printSchema // データフレームを表示 dfPopulation .show(1000) // parquetファイル形式で保存 dfPopulation .write .format("parquet") .mode("overwrite") .option("header", "true") .save("/data/japan_population/") |
横浜市の区の面積のデータを作成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
%spark // e1yokohama2204.csvをデータフレームとして読み込み val dfAreaYokohama = spark .read .option("header","true") .csv("/data/e1yokohama2204.csv") // prefecture, cityカラムの作成 .withColumn("prefecture", lit("Kanagawa-ken")) .withColumn("city", lit("Yokohama")) // カラム名の変更 .withColumnRenamed("市区名","ward") .withColumnRenamed("面積[平方キロメートル]","area") // 必要なカラムを選択 .select("prefecture","city","ward","area") // 必要のない行を削除 .filter(col("ward")=!="横浜市") // wardカラムを作成 .withColumn( "ward", when(col("ward")==="鶴見区", "Tsurumi-ku") .when(col("ward")==="神奈川区", "Kanagawa-ku") .when(col("ward")==="西区", "Nishi-ku") .when(col("ward")==="中区", "Naka-ku") .when(col("ward")==="南区", "Minami-ku") .when(col("ward")==="港南区", "Konan-ku") .when(col("ward")==="保土ケ谷区", "Hodogaya-ku") .when(col("ward")==="旭区", "Asahi-ku") .when(col("ward")==="磯子区", "Isogo-ku") .when(col("ward")==="金沢区", "Kanazawa-ku") .when(col("ward")==="港北区", "Kohoku-ku") .when(col("ward")==="緑区", "Midori-ku") .when(col("ward")==="青葉区", "Aoba-ku") .when(col("ward")==="都筑区", "Tsuzuki-ku") .when(col("ward")==="戸塚区", "Totsuka-ku") .when(col("ward")==="栄区", "Sakae-ku") .when(col("ward")==="泉区", "Izumi-ku") .when(col("ward")==="瀬谷区", "Seya-ku") .otherwise(lit(null)) ) // データフレームをメモリにキャッシュ .cache() // カラム数と行数を表示 println(dfAreaYokohama.columns.length, dfAreaYokohama.count()) // スキーマを表示 dfAreaYokohama .printSchema // データフレームを表示 dfAreaYokohama .show() // parquetファイル形式で保存 dfAreaYokohama .write .format("parquet") .mode("overwrite") .option("header", "true") .save("/data/area_yokohama/") |
データの加工
データのユニオン
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
%spark // japan_populationデータの読み込み val dfPopulation = spark .read .format("parquet") .option("header","true") .load("/data/japan_population/") // データフレームをメモリにキャッシュ .cache() // dfPopulationHokkaidoKitamiを作成 val dfPopulationHokkaidoKitami = dfPopulation // フィルタリング .filter(col("prefecture")==="Hokkaido"&&col("city")==="Kitami") // データフレームを表示 dfPopulationHokkaidoKitami.show() // dfPopulationHokkaidoAkabiraを作成 val dfPopulationHokkaidoAkabira = dfPopulation // フィルタリング .filter(col("prefecture")==="Hokkaido"&&col("city")==="Akabira") // データフレームを表示 dfPopulationHokkaidoAkabira.show() // 2つのデータフレームをユニオン val dfUnion = dfPopulationHokkaidoKitami .union(dfPopulationHokkaidoAkabira) // データフレームを表示 dfUnion.show() |
データのジョイン
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
%spark // japan_populationデータの読み込み val dfPopulation = spark .read .format("parquet") .option("header","true") .load("/data/japan_population/") // area_yokohamaデータの読み込み val dfAreaYokohama = spark .read .format("parquet") .option("header","true") .load("/data/area_yokohama/") // dfPopulationとdfAraaYokohamaのジョイン val dfJoin = dfPopulation .join( dfAreaYokohama, Seq("prefecture","city","ward"), "inner" ) // population_density(人口密度)カラムの作成 .withColumn("population_density", col("population")/col("area")) // データフレームをメモリにキャッシュ .cache() // カラム数と行数を表示 println(dfJoin.columns.length, dfJoin.count()) // スキーマを表示 dfJoin .printSchema // データフレームを表示 dfJoin .show() |
演習
下記のデータの集計をSparkSQLを使って実践してみましょう
- 最も人口の多い区
- 市の人口の標準偏差
- 北海道の市の数
- 全国の区の総数
- 各県の人口を降順で
- 各県の名前と県内で最も人口の多い市
- 各市の名前と属す県、最も人口の少ない区と多い区、市内の総人口をそれぞれの市につき一つの行で表示
あなたも、Avintonでこのような最先端技術を習得し活用してみませんか?
社員の成長を導きながら、AIやビッグデータなどの最先端技術をプロジェクトに活用していくことが私たちのビジョンです。Avintonの充実した技術研修でスキルアップを図り、あなたのキャリア目標を一緒に達成しませんか?