Java Spark GroupByFailure

Я пытаюсь использовать библиотеки Java Spark с кластером, на котором работает Spark 2.3.0 поверх Hadoop 3.1.0 (и с использованием этих версий библиотек Java).

Я столкнулся с проблемой, когда я просто не могу использовать groupByKey, и я не могу объяснить, почему. Любая попытка использования groupByKey по любой причине и при любых обстоятельствах возвращает исключение java.lang.IllegalArgumentException.

Я свел это к простейшему тесту, который я могу придумать:

package com.failuretest;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class TestReport {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("TestReport").set("spark.executor.memory", "20G");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> test = sc.parallelize(generateTestData());
        test.saveAsTextFile("/TEST/testfile1");
        test.mapToPair(line -> {
            String[] testParts = line.split(" ");
            return new Tuple2<String, String>(testParts[0], testParts[1]);
        }).groupByKey().saveAsTextFile("/TEST/testfile2");
        sc.close();
    }

    private static List<String> generateTestData() {
        List<String> testList = new ArrayList<String>();
        int keyCount = 0;
        int valCount = 0;
        while (valCount++ < 2000000) {
            if (valCount % 10 == 0) {
                keyCount++;
            }
            testList.add("Key" + keyCount + " " + "Val" + valCount);
        }
        return testList;
    }

}

Я просто программно создаю RDD, который создает 10 значений для каждого ключа, затем создаю свой JavaPairRDD с простым разделением, а затем пытаюсь выполнить groupByKey.

Когда он запускается, я получаю следующий стек:

Exception in thread "main" java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:88)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKeyWithClassTag$1.apply(PairRDDFunctions.scala:77)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1.apply(PairRDDFunctions.scala:505)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$1.apply(PairRDDFunctions.scala:498)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.groupByKey(PairRDDFunctions.scala:498)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$3.apply(PairRDDFunctions.scala:641)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$groupByKey$3.apply(PairRDDFunctions.scala:641)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.groupByKey(PairRDDFunctions.scala:640)
    at org.apache.spark.api.java.JavaPairRDD.groupByKey(JavaPairRDD.scala:559)
    at com.failuretest.TestReport.main(TestReport.java:22)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Дальше groupByKey дело не идет (выше я пишу файл с результатами, но на самом деле это не имеет значения, так как он туда никогда не попадает).

Я могу запускать его весь день в моем локальном экземпляре dev, но запуск spark-submit с банкой, содержащей вышеперечисленное, каждый раз терпит неудачу в кластере.

Я действительно не уверен, куда идти дальше - то, что я пытаюсь сделать, - это немного сложно, если я не могу сгруппировать по ключу.

Я ошибаюсь? Это где-то конфликт версий?

Дэйв


person user3654762    schedule 30.05.2018    source источник


Ответы (1)


Я действительно понял это, прежде чем опубликовать это, но в интересах помощи другим...

Я обнаружил, что один из моих коллег решил поиграть с Java 10 на этом конкретном кластере. Переместил его обратно на Java 8 (извините - не пробовал 9), и проблема исчезла.

Дэйв

person user3654762    schedule 30.05.2018