Применение UDF к подмножествам фрейма данных pyspark

У меня есть Dataframe, подобный следующему, содержащий два отсортированных списка строк для каждой возможной комбинации key1 и key2.

df=
+----+------------+-------+-------+
|key1|        key2| value1| value2|
+----+------------+-------+-------+
| 'a'|  '10,0,10' |  'abc'|  'abc'|
| 'a'|  '10,0,10' |  'aab'|  'aab'|
| 'a'|  '10,0,10' |  'acb'|  'acb'|
| 'a'|  '10,0,20' |  'abc'|  'abc'|
| 'a'|  '10,0,20' |  'acb'|  'aab'|
| 'a'|  '10,0,20' |  'aab'|  'acb'|
| 'b'|  '10,0,10' |  'bcd'|  'bcd'|
| 'b'|  '10,0,10' |  'bbc'|  'bdc'|
| 'b'|  '10,0,10' |  'bdc'|  'bbc'|
|...

Теперь я хочу применить такую ​​​​функцию:

for c in [x for x in df.select('key1').distinct().collect()]:
    for s in [x for x in df.select('key2').distinct().collect()]:
       jaccard_sim([x for x in df.select('value1').filter(df['key1']==c).filter(df['key2']==s).collect()], 
              [x for x in df.select('value2').filter(df['key1']==c).filter(df['key2']==s).collect()])

Но поскольку я хочу использовать возможность искр для распараллеливания выполнения, я думаю, что приведенная выше реализация может быть довольно глупой;) Кто-нибудь знает, как ее решить?

Предыстория заключается в том, что у меня есть отсортированный список (value1) для комбинации key1 и key2, который я хочу сравнить с эталонным списком для каждого ключа 1 (value2) и вычислить сходство жаккарда между списками. Если у кого-то есть (лучшее) предложение о том, как это сделать с помощью pyspark, я бы очень его оценил! Спасибо:)


person Seastar    schedule 20.06.2018    source источник


Ответы (1)


Вы можете подойти так,

import pyspark.sql.functions as F

def convert_form(x):
    print type(x)
    val1 = [y['value1'] for y in x]
    val2 = [y['value2'] for y in x]
    return [val1, val2]

jaccard_udf = F.udf(lambda x: jaccard_sim(*convert_form(x)) ) #assuming you have jaccard_sim function

df = df.select('key1', 'key2', F.struct('value1','value2').alias('values'))\
       .groupby('key1', 'key2').agg(F.collect_list('values').alias('collected_col'))\
       .withColumn('jaccard_distance', jaccard_udf(F.col('collected_col')) )

df.show()
person mayank agrawal    schedule 20.06.2018