Файл AWS EMR уже существует: чтение и запись задания Hadoop в S3

У меня есть задание Hadoop, работающее в EMR, и я передаю путь S3 в качестве ввода и вывода для этого задания.

Когда я запускаю локально, все работает нормально (поскольку есть один узел)

Однако, когда я запускаю EMR с кластером из 5 узлов, я запускаю исключение ввода-вывода «Файл уже существует».

Выходной путь имеет отметку времени, поэтому выходной путь не существует в S3.

Error: java.io.IOException: File already exists:s3://<mybucket_name>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED/part-m-00000

У меня есть очень простое приложение hadoop (в основном мой картограф), которое считывает каждую строку из файла и преобразует ее (используя существующую библиотеку).

Не уверен, почему каждый узел пытается писать с одним и тем же именем файла.

Вот маппер

public static class TokenizeMapper extends Mapper<Object,Text,Text,Text>{
        public void map(Object key, Text value,Mapper.Context context) throws IOException,InterruptedException{
            //TODO: Invoke Core Engine to transform the Data
            Encryption encryption = new Encryption();
            String tokenizedVal = encryption.apply(value.toString());
            context.write(tokenizedVal,1);
        }
    }

Любой мой Редуктор

public static class TokenizeReducer extends Reducer<Text,Text,Text,Text> {
        public void reduce(Text text,Iterable<Text> lines,Context context) throws IOException,InterruptedException{
            Iterator<Text> iterator = lines.iterator();
            int counter =0;
            while(iterator.hasNext()){
                counter++;
            }

            Text output = new Text(""+counter);
            context.write(text,output);
        }
    }

И мой основной класс

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        long startTime = System.currentTimeMillis();
        try {
            Configuration config = new Configuration();
            String[] additionalArgs = new GenericOptionsParser(config, args).getRemainingArgs();

            if (additionalArgs.length != 2) {
                System.err.println("Usage: Tokenizer Input_File and Output_File ");
                System.exit(2);
            }


            Job job = Job.getInstance(config, "Raw File Tokenizer");
            job.setJarByClass(Tokenizer.class);
            job.setMapperClass(TokenizeMapper.class);
            job.setReducerClass(TokenizeReducer.class);

            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputKeyClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(additionalArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(additionalArgs[1]));

            boolean status = job.waitForCompletion(true);
            if (status) {
                //System.exit(0);
                System.out.println("Completed Job Successfully");
            } else {
                System.out.println("Job did not Succeed");
            }
        }
        catch(Exception e){
            e.printStackTrace();
        }
        finally{
            System.out.println("Total Time for processing =["+(System.currentTimeMillis()-startTime)+"]");
        }
    }

Я передаю аргументы при запуске кластера как

s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt

s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED

Цените любые входные данные.

Спасибо


person Sateesh Kommineni    schedule 13.04.2018    source источник
comment
Просто вопрос, зачем хранить местоположение текстового файла (RawFile12.txt) в пути к выходному файлу? Нельзя ли убрать эту часть?   -  person Deepan Ram    schedule 13.04.2018
comment
Я могу удалить это. не думаю, что это проблема, поскольку ключ отличается, даже если он имеет имя файла. Более того, папка создается. К тому же я удалил имя файла из пути и снова столкнулся с той же проблемой.   -  person Sateesh Kommineni    schedule 13.04.2018


Ответы (1)


В коде драйвера вы установили Reducer равным 0, тогда код редьюсера нам не нужен.

Если вам нужно очистить выходной каталог перед запуском задания, вы можете использовать этот фрагмент, чтобы очистить каталог, если он существует: -

    FileSystem fileSystem = FileSystem.get(<hadoop config object>);

    if(fileSystem.exists(new Path(<pathTocheck>)))
    {
        fileSystem.delete(new Path(<pathTocheck>), true);
    }
person Deepan Ram    schedule 13.04.2018