Как рассчитать процентиль столбца в DataFrame в искре?

Я пытаюсь вычислить процентиль столбца в DataFrame? Я не могу найти ни одной функции percentile_approx в функциях агрегирования Spark.

Например, в Hive у нас есть percentile_approx, и мы можем использовать его следующим образом

hiveContext.sql("select percentile_approx("Open_Rate",0.10) from myTable); 

Но я хочу сделать это с помощью Spark DataFrame из соображений производительности.

Образец набора данных

|User ID|Open_Rate|
------------------- 
|A1     |10.3     |
|B1     |4.04     |
|C1     |21.7     |
|D1     |18.6     |

Я хочу узнать, сколько пользователей попадает в 10 или 20 процентиль и так далее. Я хочу сделать что-то подобное

df.select($"id",Percentile($"Open_Rate",0.1)).show

person dheee    schedule 06.06.2016    source источник
comment
Нет разницы в производительности между использованием SQL-запроса и DataFrame - оба используют один и тот же механизм выполнения.   -  person zero323    schedule 07.06.2016
comment
Вы можете использовать свой собственный UDAF. Вот как я это сделал: stackoverflow.com/a/51859138/2166220   -  person Brown nightingale    schedule 15.08.2018


Ответы (3)


Начиная со Spark2.0, все становится проще, просто используйте эту функцию в DataFrameStatFunctions, например:

df.stat.approxQuantile("Open_Rate",Array(0.25,0.50,0.75),0.0)

Также есть несколько полезных статистических функций для DataFrame в DataFrameStatFunctions.

person Yulin GUO    schedule 04.08.2017
comment
Хорошая находка. Мне нравится, что он позволяет определять относительную ошибку как число от 0 до 1. Одно предостережение: эта функция не будет работать для агрегированных вычислений для нескольких групп одновременно. Для тех, кто хочет рассчитать процентиль сразу для нескольких групп, обратите внимание на percentile_approx, который является искровой функцией sql. Требуется необязательный аргумент Integer, связанный с количеством наблюдений в группе: по умолчанию 10 000. Это означает, что эта функция возвращает точный процентиль для групп с менее чем 10 000 наблюдений. Укажите большее значение для большей точности. - person Raphvanns; 23.01.2018
comment
что нам нужно импортировать для df.stat.approxQuantile? - person Haha TTpro; 12.12.2018
comment
@ HahaTTpro, не более того. С любым экземпляром DataFrame вы можете использовать dataframeInstance.stat.approxQuantile. - person Yulin GUO; 14.12.2018

SparkSQL и API фреймов / наборов данных Scala выполняются одним и тем же механизмом. Эквивалентные операции будут генерировать эквивалентные планы выполнения. Вы можете увидеть планы выполнения с помощью explain.

sql(...).explain
df.explain

Когда дело доходит до вашего конкретного вопроса, это обычный шаблон для смешивания синтаксиса SparkSQL и Scala DSL, потому что, как вы обнаружили, их возможности еще не эквивалентны. (Другой пример - разница между SQL explode() и explode() DSL, последний является более мощным, но и более неэффективным из-за маршалинга.)

Самый простой способ сделать это:

df.registerTempTable("tmp_tbl")
val newDF = sql(/* do something with tmp_tbl */)
// Continue using newDF with Scala DSL

Что вам нужно иметь в виду, если вы идете простым путем, так это то, что имена временных таблиц являются глобальными для кластера (до 1.6.x). Следовательно, вы должны использовать рандомизированные имена таблиц, если код может запускаться одновременно более одного раза в одном кластере.

В моей команде этот шаблон достаточно распространен, поэтому мы добавили .sql() неявный к DataFrame, который автоматически регистрирует, а затем отменяет регистрацию временной таблицы для области действия оператора SQL.

person Sim    schedule 12.06.2016
comment
Ты прав. И спасибо за подробное объяснение. Но причина, по которой я хотел сделать это в фреймах данных, заключается в том, что у меня есть несколько других методов и UDF, которые мне нужно применить. И если я собираюсь использовать простой sql / hive, мне придется изменить много кода, чтобы достичь этого процентиля. Также не уверен, какие еще проблемы могут возникнуть, если я изменю код. - person dheee; 14.06.2016
comment
@dheee Я не уверен, что понимаю вашу озабоченность ... (1) Вы можете использовать свой собственный UDF как из SQL, так и из DSL. (2) Вам не нужно менять весь код, только этап, на котором создается столбец с процентилем. - person Sim; 16.06.2016

Я создал библиотеку bebe, которая упрощает вычисление процентиля столбца.

Начнем с создания вашего DataFrame.

val df = spark
  .createDF(
    List(
      ("A1", 10.3),
      ("B1", 4.04),
      ("C1", 21.7),
      ("D1", 18.6)
    ),
    List(
      ("User ID", StringType, true),
      ("Open_Rate", DoubleType, true)
    )
  )
df.show()
+-------+---------+
|User ID|Open_Rate|
+-------+---------+
|     A1|     10.3|
|     B1|     4.04|
|     C1|     21.7|
|     D1|     18.6|
+-------+---------+

Теперь давайте посчитаем 10-й процентиль:

val resDF = df.agg(bebe_percentile(col("Open_Rate"), lit(0.1)).as("10_percentile"))
resDF.show()
+-----------------+
|    10_percentile|
+-----------------+
|5.918000000000001|
+-----------------+

Он использует тот же базовый код, что и метод процентилей SQL.

person Powers    schedule 11.04.2021