きどたかのブログ

いつか誰かがこのブログからトラブルを解決しますように。

PySparkでの最頻値mode

pandasにはあるけど、pysparkにはない代表格と言っても良いかもしれない。
modeでピンとこない人は、most frequentだったり、most commonなどでも検索します。

ネットで探すといくつか実装例がある。
少しは参考にした。

自分が先日書いたコードだとどんな感じになったかを思い出しながら書いてみる

実現したい最頻値の取り方は、
「あるグループの中での、あるカラムの最頻値を複数カラム分」というものです。
都道府県ごとに知りたい最頻値がたくさんあるというようなことです。

最頻値を取らないといけないカラムが多すぎて、ループで回して書いてたら鈍足になってしまっていたので、できるだけ速度的にズバッといける書き方を模索した。

from pyspark.sql import functions as f
from pyspark.sql import Window as w

df = something 

keys = ["key1", "key2"] # グループキー
cols_mode = ["col1", "col2"]

expression_count = 
  [f.col(c) for c in keys] + 
  [f.struct(
     f.count(c).over(
        w.partitionBy(keys + [c])
     ),
     c).alias(f"{c}_struct")
   for c in cols_mode
   ]

df_count = df.select(*expression_count)
# key1, key2, col1_struct, col2_struct

expression_max = [f.max(f"{c}_struct").getItem(c).alias(f"mode({c})") for c in cols_mode]

df_mode = df_count.groupBy(*keys).agg(*expression_max)
# key1, key2, mode(col1), mode(col2)


いくつかのポイント。

Windowを用いたcount集約を使う。

最頻値の性質として、件数カウントがどうしても必要になってくるが、df.groupBy(...)で書き始めてしまうと沼にハマる。

key1 key2 col1_struct col2_struct
count col1 count col2

structを活用する。

ネット上の例でもstructを使ったものがあり、非常に参考になった。

structに対してmaxを取ると良い感じに動く。
orderbyやlimitは書かなくていい。

注意点1 Null

このコードでは最頻値てして、Nullはカウントされてない。Nullをカウントする場合は、whenを用いる必要がある。NaNも。

注意点2

最頻値に同じ件数の出現があった場合、カウント対象の値が大きい方が使われる。これはstructにmaxを使ってるのでそうなる。異なる基準で選ぶ必要があるなら、structの2番目にそれを入れ込む必要がある。