Несоответствие типов дженериков Spark UDAF

Я пытаюсь создать UDAF в Spark (2.0.1, Scala 2.11), как показано ниже. Это по существу агрегирует кортежи и выводит Map

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{Row, Column}

class mySumToMap[K, V](keyType: DataType, valueType: DataType) extends UserDefinedAggregateFunction {
  override def inputSchema = new StructType()
    .add("a_key", keyType)
    .add("a_value", valueType)

  override def bufferSchema = new StructType()
    .add("buffer_map", MapType(keyType, valueType))

  override def dataType = MapType(keyType, valueType)

  override def deterministic = true 

  override def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = Map[K, V]()
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

    // input :: 0 = a_key (k), 1 = a_value
    if ( !(input.isNullAt(0)) ) {

      val a_map = buffer(0).asInstanceOf[Map[K, V]]
      val k = input.getAs[K](0)  // get the value of position 0 of the input as string (a_key)

      // I've split these on purpose to show that return values are all of type V
      val new_v1: V = a_map.getOrElse(k, 0.asInstanceOf[V])
      val new_v2: V = input.getAs[V](1)
      val new_v: V = new_v1 + new_v2

      buffer(0) = if (new_v != 0) a_map + (k -> new_v) else a_map - k
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    val map1: Map[K, V] = buffer1(0).asInstanceOf[Map[K, V]]
    val map2: Map[K, V] = buffer2(0).asInstanceOf[Map[K, V]]

    buffer1(0) = map1 ++ map2.map{ case (k,v) => k -> (v + map1.getOrElse(k, 0.asInstanceOf[V])) }
  }

  override def evaluate(buffer: Row) = buffer(0).asInstanceOf[Map[K, V]]

}

Но когда я компилирую это, я вижу следующую ошибку:

<console>:74: error: type mismatch;
 found   : V
 required: String
             val new_v: V = new_v1 + new_v2
                                     ^
<console>:84: error: type mismatch;
 found   : V
 required: String
           buffer1(0) = map1 ++ map2.map{ case (k,v) => k -> (v + map1.getOrElse(k, 0.asInstanceOf[V])) }

Что я делаю не так?

EDIT: Для людей, помечающих это как дубликат Spark UDAF — использование дженериков в качестве типа ввода? — это не дубликат этой проблемы, так как он не имеет отношения к типу данных Map. Вышеприведенный код очень специфичен и завершен в отношении проблемы, с которой столкнулись при использовании типа данных Map.


person mrbrahman    schedule 28.09.2017    source источник
comment
Почему вы предполагаете, что тип V имеет оператор (метод) +? Вы не привязывали его к чему-то конкретному, поэтому это может быть любой класс, включая классы, которые не определяют этот оператор. Вы хотите привязать V к любому числовому типу?   -  person Tzach Zohar    schedule 28.09.2017
comment
@TzachZohar Похоже, эта ошибка как-то связана с тем, как я делаю сложение? В вашем примере CombineMaps (это действительно здорово! Кстати), я попытался избавиться от merge (поскольку мне нужно только добавить в моем случае использования). Оператор val result = map1 ++ map2.map{case(k,v) => k -> map1.get(k).map(v + _).getOrElse(v) } выдает ту же ошибку, что и выше!   -  person mrbrahman    schedule 28.09.2017
comment
Именно поэтому здесь необходим этот аргумент merge: для универсального типа V откуда UDAF узнает, как объединить два значения в одно? Для числовых значений + является хорошим вариантом, но для нечисловых значений, для которых не определен оператор +, вам потребуется, чтобы вызывающий объект предоставил соответствующую функцию. В любом случае - вы получили здесь хороший ответ от @user8371915   -  person Tzach Zohar    schedule 29.09.2017


Ответы (1)


Ограничьте типы теми, у которых есть Numeric[_]

class mySumToMap[K, V: Numeric](keyType: DataType, valueType: DataType) 
  extends UserDefinedAggregateFunction {
    ...

используйте Implicitly, чтобы получить его во время выполнения:

val n = implicitly[Numeric[V]]

и используйте его метод plus вместо + и zero вместо 0

buffer1(0) = map1 ++ map2.map{ 
  case (k,v) => k -> n.plus(v,  map1.getOrElse(k, n.zero))
}

Для поддержки более широкого набора типов вы можете использовать cats Monoid:

import cats._
import cats.implicits._

и откорректируйте код:

class mySumToMap[K, V: Monoid](keyType: DataType, valueType: DataType) 
  extends UserDefinedAggregateFunction {
    ...

и позже:

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
  val map1: Map[K, V] = buffer1.getMap[K, V](0)
  val map2: Map[K, V] = buffer2.getMap[K, V](0)

  val m = implicitly[Monoid[Map[K, V]]]

  buffer1(0) = m.combine(map1, map2)
}
person Alper t. Turker    schedule 29.09.2017