PySparkでDataframeを操作する

ビッグデータのETL処理を行う際に使用したPySparkについて書いてみたいと思います。 私が携わった案件ではAWS Glueを用いた開発でしたが、Sparkを動かしてみるだけなら無料でお手軽に使うことができるDatabricksがオススメです!(本記事ではDatabricksについては割愛します)

ビッグデータとは

昨今、当たり前のように耳にするようになった「ビッグデータ」ですが、 そもそもビッグデータとは何なのでしょうか?「大きなデータ」ではあるのですが、単純に膨大なデータを指す言葉ではありません。
ビックデータとは様々な形、特性、種類を持ったデータであり、DataVolume(データの量)、DataVariety(データの種類)、DataVelocity(データの発生・更新頻度)の重要な3つのVから成っていると言われています。また、IT用語辞典では下記のように定義されています。

ビッグデータとは、従来のデータベース管理システムなどでは記録や保管、解析が難しいような巨大なデータ群。明確な定義があるわけではなく、企業向け情報システムメーカーのマーケティング用語として多用されている。 多くの場合、ビッグデータとは単に量が多いだけでなく、様々な種類・形式が含まれる非構造化データ・非定型的データであり、さらに、日々膨大に生成・記録される時系列性・リアルタイム性のあるようなものを指すことが多い。 今までは管理しきれないため見過ごされてきたそのようなデータ群を記録・保管して即座に解析することで、ビジネスや社会に有用な知見を得たり、これまでにないような新たな仕組みやシステムを産み出す可能性が高まるとされている。

引用元: ビッグデータとは – IT用語辞典 e-Words

このようにビッグデータとは単に大きなデータといった意味ではなく、ビジネスや社会における新たな可能性を秘めたものであることがわかります。

PySparkとは

PySparkの前にApache Spark(以下、Spark)の説明をします。Sparkは前述したビッグデータに対して高速に分散して処理を行うオープンソースのフレームワークです。SparkにはJavaやScala、Pythonといった様々なプログラミング言語から利用できるAPIが用意されています。PythonでSparkを利用する際に用いるAPIがPySparkです。

Dataframeとは

DataframeとはSpark上でParquetやCSVといったファイルをデータベースのテーブルのように扱うことができるオブジェクトです。SQLと同様にSELECTやWHERE(FILTER)、Dataframe同士を縦に結合するUNIONや横に結合するJOIN、集計に必要となるAGG(COUNT、SUM、AVG)などのメソッドが用意されています。

PySparkで出来ること

PySparkで出来ること、それは当たり前ですがPythonでSparkを扱うこと。 つまりPythonを使ってDataframeを作り、SQLのように操作し、ビッグデータを集計したり分散処理することが出来ます。実際によく使う例を紹介します。

ファイルロード

処理を行いたいデータファイルを読み込みDataframe化します。 format()でファイル形式を選択し、option()で文字コードの指定や、見出し行を読み込む指定を行います。また、option(‘inferSchema’, ‘true’)の指定をするとカラムの型を自動判別して読み込むことが可能です。 load()で読み込むデータファイルのパスを指定します。

フィルター

または

意味も使い方も同じです。SQLに慣れている人はwhereを使うほうが馴染み深いかもしれません。Dataframeから抽出したい条件を絞り込むのに使用します。

縦結合

SQLでも2つのテーブルを縦(行)に結合する際に使用されることのあるUNIONです。同じカラムを持った2つのDataframeを縦に結合します。 なお、Dataframeの縦結合には3種類存在します。

  • union
  • unionAll
  • unionByName

それぞれどういった違いがあるのでしょう。

unionとunionAllの違い

unionはv2.0より、unionAllはv1.3より実装されたもので、実はこの2つのメソッドには機能的な違いはありません。

SQLではunionAllは重複の制御が行われず、unionでは重複の制御が行われますが、Sparkのunionはシンプルに縦結合するだけで重複の制御が行われません。繋いだものをdistinctして重複を除外する必要があります。この点は使用する際に注意が必要です。

unionとunionByNameの違い

unionとunionByNameの違いはカラム名を参照するか、しないかという点です。カラムの順番が異なる2つのDataframeを縦に繋ぐ際、unionはカラム名が異なっていても縦に繋ぎます。一方でunionByNameの方は一致するカラム名を探し縦に結合します。

unionByNameはv2.3より実装されたもので、2つのDataframeを縦に繋ぐ際にはunionByNameを使うのが無難と言えるでしょう。

横結合

SQLでテーブル同士をカラム名を指定して横に繋ぐ際に使用するJOINと同じです。 howの箇所には以下のいずれかを指定します。

  • Inner
    • SQLで言うところのINNER JOINです。 2つのDataframeを結合する際に両方に共通してあるデータ(行)のみ残ります。
  • left, left_outer
    • SQLでいうところのLEFT JOIN, LEFT OUTER JOINです。 left joinとleft outer joinの挙動は同じで、左のリレーションからは全ての値を残し、右のリレーションからは一致する値を残し、一致しないものについてはNULLで結合されます。
  • right, right_outer
    • SQLでいうところのRIGHT JOIN, RIGHT OUTER JOINです。 leftの時と同様にright joinとright outer joinの挙動は同じで、右のリレーションからは全ての値を残し、左のリレーションからは一致する値を残し、一致しないものについてはNULLで結合されます。
  • cross
      SQLで言うところのCROSS JOINです。 cross joinは結合するテーブルのデカルト積、つまり全ての組み合わせを返します。 実務に使用する場面は限られているかもしれません。 私自身、性能検証などで使用する以外使ったことがありません。

その他にもouter, full, fullouter, leftsemi, left_antiなどがあり、場面に応じて使い分けるのがいいでしょう。

集計

基本的にはgroupByからのaggという使い方になり、aggメソッドの引数でcountやmin,max、sum,avgなどを記述します。 SQLと書き方は違えど基本的な考え方は同じです。

列の追加、リネーム、エイリアス

列の追加

列のリネーム

エイリアス

まとめ

Dataframeを加工しながら目的のデータを作っていくのは楽しいです。今回紹介したものは本当にごくごく一部でしかないので、使えるメソッドは沢山あり、もっと奥は深いです。SQLに馴染みのある方は最初こそ違和感を感じることもあるかもしれませんが、書いてみるとすぐに慣れると思います。

Sparkを書く際に注意したいこととして分散処理されるのはDataframe内の処理というところ。作成したDataframeをリスト化してPythonのループなどで処理すると、その部分は分散処理されません。そういった場合に、よく用いるのがUDF(User Defined Function)です。

本記事では紹介していませんが、UDFというのはユーザ定義関数のことで、Dataframe内で呼び出すことでSparkクラスタ内での分散処理を行うことが可能となります。実装をする際は分散処理になるかならないかを意識しながら、Sparkを有効活用した処理を書いていくのが良いです。