У меня есть задание Hadoop, работающее в EMR, и я передаю путь S3 в качестве ввода и вывода для этого задания.
Когда я запускаю локально, все работает нормально (поскольку есть один узел)
Однако, когда я запускаю EMR с кластером из 5 узлов, я запускаю исключение ввода-вывода «Файл уже существует».
Выходной путь имеет отметку времени, поэтому выходной путь не существует в S3.
Error: java.io.IOException: File already exists:s3://<mybucket_name>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED/part-m-00000
У меня есть очень простое приложение hadoop (в основном мой картограф), которое считывает каждую строку из файла и преобразует ее (используя существующую библиотеку).
Не уверен, почему каждый узел пытается писать с одним и тем же именем файла.
Вот маппер
public static class TokenizeMapper extends Mapper<Object,Text,Text,Text>{
public void map(Object key, Text value,Mapper.Context context) throws IOException,InterruptedException{
//TODO: Invoke Core Engine to transform the Data
Encryption encryption = new Encryption();
String tokenizedVal = encryption.apply(value.toString());
context.write(tokenizedVal,1);
}
}
Любой мой Редуктор
public static class TokenizeReducer extends Reducer<Text,Text,Text,Text> {
public void reduce(Text text,Iterable<Text> lines,Context context) throws IOException,InterruptedException{
Iterator<Text> iterator = lines.iterator();
int counter =0;
while(iterator.hasNext()){
counter++;
}
Text output = new Text(""+counter);
context.write(text,output);
}
}
И мой основной класс
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
long startTime = System.currentTimeMillis();
try {
Configuration config = new Configuration();
String[] additionalArgs = new GenericOptionsParser(config, args).getRemainingArgs();
if (additionalArgs.length != 2) {
System.err.println("Usage: Tokenizer Input_File and Output_File ");
System.exit(2);
}
Job job = Job.getInstance(config, "Raw File Tokenizer");
job.setJarByClass(Tokenizer.class);
job.setMapperClass(TokenizeMapper.class);
job.setReducerClass(TokenizeReducer.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
FileInputFormat.addInputPath(job, new Path(additionalArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(additionalArgs[1]));
boolean status = job.waitForCompletion(true);
if (status) {
//System.exit(0);
System.out.println("Completed Job Successfully");
} else {
System.out.println("Job did not Succeed");
}
}
catch(Exception e){
e.printStackTrace();
}
finally{
System.out.println("Total Time for processing =["+(System.currentTimeMillis()-startTime)+"]");
}
}
Я передаю аргументы при запуске кластера как
s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt
s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED
Цените любые входные данные.
Спасибо