Kotlin Iterable не поддерживается в Apache Beam?

Луч Apache, похоже, отказывается распознавать Iterable Котлина. Вот пример кода:

@ProcessElement
fun processElement(
    @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
    val output = input.key + "|" + input.value.toString()
    println("output: $output")
    receiver.output(output)
}

Я получаю следующую странную ошибку:

java.lang.IllegalArgumentException:
   ...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
   @Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>

Конечно, если я заменю Iterable на java.lang.Iterable, тот же код будет работать нормально. Что я делаю неправильно?

Версия зависимостей:

  • kotlin-jvm: 1.3.21
  • org.apache.beam: 2.11.0

Вот суть с полными кодами и трассировкой стека:

Обновление:

После небольшого количества проб и ошибок я обнаружил, что в то время как List<String> выдает аналогичное исключение, но MutableList<String> действительно работает:

class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
    @ProcessElement
    fun processElement(
        @Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
    ) {
        val output = input.key + "|" + input.value.toString()
        println("output: $output")
        receiver.output(output)
    }
}

Итак, это напомнило мне, что неизменяемая коллекция Kotlin на самом деле является всего лишь интерфейсом, а базовая коллекция все еще изменяема. Однако попытка заменить Iterable на MutableIterable продолжает вызывать ошибку.

Обновление 2:

Я развернул свое задание Kotlin Dataflow, используя MutableList, как указано выше, и задание завершилось ошибкой:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)

Пришлось снова переключиться на java.lang.Iterable.


person marcoseu    schedule 29.04.2019    source источник
comment
Это во время выполнения или во время компиляции? Можете ли вы поделиться большей частью трассировки стека?   -  person mkobit    schedule 30.04.2019
comment
Трассировка стека @mkobit добавлена ​​в gist.github.com/marcoslin/. Спасибо   -  person marcoseu    schedule 30.04.2019


Ответы (5)


Я также столкнулся с этой проблемой, когда использовал ParDo после GroupByKey. Оказывается, что аннотация @JvmWildcard необходима в универсальном типе Iterable при написании преобразования, которое принимает результат GroupByKey.

См. Надуманный пример ниже, который читает файл и группирует его по первому символу каждой строки.

class BeamPipe {
  class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
    @ProcessElement
    fun processElement(@Element input: KV<String, Iterable<@JvmWildcard String>>, receiver: OutputReceiver<KV<String, String>>) {
      receiver.output(KV.of(input.key, input.value.joinToString("\n")))
    }
  }

  fun pipe(options: PipelineOptions) {
    val file =
        "testFile.txt"
    val p = Pipeline.create(options)
    p.apply(TextIO.read().from(file))
        .apply("Key lines by first character",
            WithKeys.of { line: String -> line[0].toString() }
                .withKeyType(TypeDescriptors.strings()))
        .apply("Group lines by first character", GroupByKey.create<String, String>())
        .apply("Concatenate lines", ParDo.of(ConcatLines()))
        .apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
            .by { it.key }
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
            .to("whatever")
            .withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
        )
    p.run()
  }
}
person AlecBrooks    schedule 16.03.2020

Похоже на ошибку в Beam Kotlin SDK. Анализ отражения для вашего @ProcessElement метода работает неправильно. Вероятно, вы можете обойти это, используя ProcessContext ctx вместо параметра @Element.

person mxm    schedule 08.05.2019
comment
Спасибо @mxm. Я пробовал это, но все равно получаю то же IllegalArgumentException. К приведенной выше сути добавлен пример кода и трассировка стека. Как и в другом примере, MutableList работает нормально. - person marcoseu; 09.05.2019
comment
Понятно. Спасибо за обновления! Мы рассмотрим это. - person mxm; 13.05.2019

Я не очень знаком с kotlin, но кажется, что вам нужно импортировать import java.lang.Iterable, прежде чем использовать его в своем коде.

person Ankur    schedule 29.04.2019
comment
спасибо, но на мой вопрос, если я сделаю импорт, он сработает. В Kotlin этого не должно быть, поскольку Iterable эквивалентно java.lang.Iterable. Эта проблема проявляется и в других сферах. Например, при использовании GroupByKey, который возвращает Iterable (который я не могу установить), он вызывает ту же ошибку в сочетании с ParDo. - person marcoseu; 30.04.2019

Могу ли я узнать, как исправить проблему, когда мы получаем итерацию из groupbykey.create (). я не мог groupbykey, как вы сделали javalang итерабельно

person HardWorker    schedule 07.12.2019
comment
это не ответ на вопрос - person MaxG; 07.12.2019
comment
@HardWorker, вы должны задать вопрос с образцом кода, где другие могут легко работать. Я могу попытаться помочь с дополнительной информацией. - person marcoseu; 08.12.2019
comment
@HardWorker Только что заметил, что у вас аналогичная проблема. Мой ответ выше может быть вам полезен. - person KalanyuZ; 09.12.2019

Для тех, кто столкнулся с этой проблемой и нашел здесь свой путь, мой текущий обходной путь для продолжения написания конвейера в kotlin заключается в создании статического класса Java с функцией (ами), которая создает, содержит и обрабатывает ваши Iterable (ы). Результат (в не повторяемом формате) может быть передан обратно в kotlin.

person KalanyuZ    schedule 09.12.2019