博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Custom KeyValueTextInputFormat
阅读量:2394 次
发布时间:2019-05-10

本文共 6654 字,大约阅读时间需要 22 分钟。

在看老版的API时,发现旧的KeyValueTextInputFormat的作者基本上都是拿算法自己写,hadoop源码的很多地方都是不会拿现成的api来用,都是自己定义,这样做对性能的可控性是很强,这也折射出国外程序员跟国内程序员的差异,国内提倡拿来主义,国外可能更强调创新精神吧。

 

而我属于前者:拿来主义者

 

自定义的KeyValueInputFormat:

 

package cn.edu.xmu.dm.mpdemo.ioformat;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

/**
 * Custom key/value text input format: each input line is split into a
 * {@link Text} key and {@link Text} value at the first occurrence of a
 * configurable separator (config key {@code key.value.separator.in.input.line},
 * default tab), mirroring the contract of Hadoop's built-in
 * KeyValueTextInputFormat.
 *
 * @author chenwq (irwenqiang@gmail.com)
 * @version 1.0 2012/05/19
 */
public class DMKeyValueInputFormat extends FileInputFormat<Text, Text> {

    /**
     * Record reader that parses "key&lt;sep&gt;value" lines from a file split.
     * Split-boundary handling follows Hadoop's LineRecordReader: a reader whose
     * split does not start at offset 0 skips its first (partial) line, and reads
     * one line past its nominal end so no line is lost between adjacent splits.
     */
    protected static class KeyVRecordReader extends RecordReader<Text, Text> {

        private static final Log LOG = LogFactory.getLog(KeyVRecordReader.class);

        private CompressionCodecFactory compressionCodecs = null;
        private long start;           // byte offset where this reader begins
        private long pos;             // current byte offset within the stream
        private long end;             // byte offset where this reader stops
        private LineReader in;
        private int maxLineLength;    // lines longer than this are skipped
        private Text key = null;
        private Text value = null;
        private String separator = "\t"; // key/value separator (regex for String.split)

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            this.maxLineLength = job.getInt(
                    "mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
            this.separator = job.get("key.value.separator.in.input.line", "\t");
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            boolean skipFirstLine = false;
            if (codec != null) {
                // Compressed files are not splittable: read the whole stream.
                in = new LineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            } else {
                if (start != 0) {
                    // Not the first split: back up one byte and discard the
                    // partial line the previous split is responsible for.
                    skipFirstLine = true;
                    --start;
                    fileIn.seek(start);
                }
                in = new LineReader(fileIn, job);
            }
            if (skipFirstLine) {
                start += in.readLine(new Text(), 0,
                        (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        @Override
        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (start == end) {
                return 0.0f;
            }
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            Text line = new Text();
            if (key == null) {
                key = new Text();
            }
            if (value == null) {
                value = new Text();
            }
            int newSize = 0;
            while (pos < end) {
                newSize = in.readLine(line, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                maxLineLength));
                // Split at the FIRST separator only (limit = 2), so values that
                // themselves contain the separator are preserved intact. The
                // previous unlimited split() misclassified such lines as having
                // no separator and dropped everything after the key.
                String[] kv = line.toString().split(this.separator, 2);
                if (kv.length == 2) {
                    key.set(kv[0]);
                    value.set(kv[1]);
                } else {
                    // No separator on this line: whole line becomes the key,
                    // value stays empty (same as KeyValueTextInputFormat).
                    LOG.info("Line has no separator; using whole line as key");
                    key.set(line.toString());
                    value.set("");
                }
                if (newSize == 0) {
                    break; // end of stream
                }
                pos += newSize;
                if (newSize < maxLineLength) {
                    break; // got a complete, acceptable line
                }
                // Line exceeded maxLineLength: log and keep scanning.
                LOG.info("Skipped line of size " + newSize + " at pos "
                        + (pos - newSize));
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            }
            return true;
        }
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new KeyVRecordReader();
    }

    /** A file is splittable only when it is not compressed. */
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}

 

测试自定义的KeyValueTextInputFormat:

package cn.edu.xmu.dm.mpdemo.ioformat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Map-only demo job exercising {@link DMKeyValueInputFormat}: reads
 * key/value text records from the "input" directory and writes them
 * unchanged to the "output" directory.
 *
 * @author chenwq (irwenqiang@gmail.com)
 * @version 1.0 2012/05/19
 */
public class KeyValueTextInputFormatDemo extends Configured implements Tool {

    /** Identity mapper that logs each key/value pair it receives. */
    public static class KVMapper extends Mapper<Text, Text, Text, Text> {

        private static final Logger LOG = LoggerFactory.getLogger(KVMapper.class);

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            LOG.info(key.toString());
            LOG.info(value.toString());
            context.write(key, value);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        String input = "input";
        String output = "output";
        Path inputDir = new Path(input);
        Path outputDir = new Path(output);

        // Use the configuration ToolRunner prepared (honors -D and other
        // generic options) instead of discarding it with a fresh Configuration.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }
        Job job = new Job(conf, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        job.setMapperClass(KVMapper.class);
        job.setNumReduceTasks(0); // map-only job
        job.setInputFormatClass(DMKeyValueInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);
        return job.waitForCompletion(true) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        // Propagate the job's exit status to the shell (was silently dropped).
        System.exit(ToolRunner.run(new KeyValueTextInputFormatDemo(), args));
    }
}
 

 

 

 

转载地址:http://ztwob.baihongyu.com/

你可能感兴趣的文章
Treap
查看>>
编译原理 词法分析
查看>>
计算机系统结构 计算机指令集结构
查看>>
计算机系统结构 输入/输出系统
查看>>
信息安全技术及应用 常规加密技术
查看>>
02-线性结构1 两个有序链表序列的合并
查看>>
HDU 1080 DP LCS
查看>>
HDU 3308 线段树+区间合并
查看>>
ASP.NET 入手页面控件及事件触发
查看>>
HDU 4123 树状DP+RMQ
查看>>
vim配置文件(持续更新)
查看>>
Fedora 16下添加终端快捷键
查看>>
HDU 4001 DP LIS
查看>>
HDU 4023 贪心+博弈
查看>>
HDU 4036 物理坑爹题
查看>>
Linux文件解压命令汇总(持续更新)
查看>>
HDU 4046 树状数组
查看>>
HDU 4034 图论 Floyd
查看>>
HDU 4027 线段树
查看>>
HDU 4049 状态压缩DP
查看>>