首页 > 开发 > Java > 正文

java 中自定义OutputFormat的实例详解

2024-07-13 10:11:10
字体:
来源:转载
供稿:网友

java 中自定义OutputFormat的实例详解

实例代码:

package com.ccse.hadoop.outputformat;  import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.StringTokenizer;  import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;   public class MySelfOutputFormatApp {      public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";   public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";   public final static String OUTPUT_FILENAME = "/abc";      public static void main(String[] args) throws IOException, URISyntaxException,      ClassNotFoundException, InterruptedException {     Configuration conf = new Configuration();     FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);     fileSystem.delete(new Path(OUTPUT_PATH), true);          Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());     job.setJarByClass(MySelfOutputFormatApp.class);          FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));     job.setMapperClass(MyMapper.class);     job.setMapOutputKeyClass(Text.class);     job.setMapOutputValueClass(LongWritable.class);          job.setReducerClass(MyReducer.class);     job.setOutputKeyClass(Text.class);     job.setOutputValueClass(LongWritable.class);     job.setOutputFormatClass(MyselfOutputFormat.class);          
job.waitForCompletion(true);   }      public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {      private Text word = new Text();     private LongWritable writable = new LongWritable(1);          @Override     protected void map(LongWritable key, Text value,         Mapper<LongWritable, Text, Text, LongWritable>.Context context)         throws IOException, InterruptedException {       if (value != null) {         String line = value.toString();         StringTokenizer tokenizer = new StringTokenizer(line);         while (tokenizer.hasMoreTokens()) {           word.set(tokenizer.nextToken());           context.write(word, writable);         }       }     }        }      public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {      @Override     protected void reduce(Text key, Iterable<LongWritable> values,         Reducer<Text, LongWritable, Text, LongWritable>.Context context)         throws IOException, InterruptedException {       long sum = 0;        for (LongWritable value : values) {         sum += value.get();       }       context.write(key, new LongWritable(sum));     }   }    public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {      private FSDataOutputStream outputStream = null;          @Override     public RecordWriter<Text, LongWritable> getRecordWriter(         TaskAttemptContext context) throws IOException,         InterruptedException {       try {         FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());         //指定文件的输出路径         final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH                       + MySelfOutputFormatApp.OUTPUT_FILENAME);         this.outputStream = fileSystem.create(path, false);       } catch (URISyntaxException e) {         e.printStackTrace();       }       return new MySelfRecordWriter(outputStream);     }      @Override     public void 
checkOutputSpecs(JobContext context) throws IOException,         InterruptedException {     }      @Override     public OutputCommitter getOutputCommitter(TaskAttemptContext context)         throws IOException, InterruptedException {       return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);     }        }      public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> {      private FSDataOutputStream outputStream = null;          public MySelfRecordWriter(FSDataOutputStream outputStream) {       this.outputStream = outputStream;     }          @Override     public void write(Text key, LongWritable value) throws IOException,         InterruptedException {       this.outputStream.writeBytes(key.toString());       this.outputStream.writeBytes("/t");       this.outputStream.writeLong(value.get());     }      @Override     public void close(TaskAttemptContext context) throws IOException,         InterruptedException {       this.outputStream.close();     }        }    } 

2. OutputFormat 用于处理各种不同的输出目的地。

2.1 OutputFormat 需要写出的键值对来自 Reducer 类,并通过 RecordWriter 写出。

2.2 RecordWriter 的 write(...) 方法只接收 k 和 v,那么要写到哪里去呢?这需要单独传入一个 OutputStream 来指定。write 方法就是把 k 和 v 写入这个 OutputStream。

2.3 RecordWriter 由 OutputFormat 提供,即通过 OutputFormat 的 getRecordWriter(...) 方法创建。因此,我们自定义的 OutputFormat 必须继承 OutputFormat 类,而输出流对象也应在 getRecordWriter(...) 方法中获得。

以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


注:相关教程知识阅读请移步到JAVA教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表