1901 461902 211903 481904 331905 431906 471907 311908 281909 261910 351911 301912 161913 291914 291915 51916 211917 221918 311919 271920 431921 341922 271923 26以上为结果package com.teset;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class AvgTemprature extends Configured implements Tool { public static class AvgMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { // map处理数据 String str = value.toString(); String year = null; int temprature = 0; StringTokenizer tokenstr = new StringTokenizer(str); int i = 0; while (tokenstr.hasMoreTokens()) { String tempstr = tokenstr.nextToken(); i++; if (i == 1) { year = tempstr; continue; } else if (i == 5 && Integer.parseInt(tempstr) != -9999) { temprature = Integer.parseInt(tempstr); context.write(new Text(year), new IntWritable(temprature)); break; } } } } public static class AvgReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { int sum=0; int count=0; for (IntWritable value : values) { sum=sum+value.get(); count++; } if (count!=0){ context.write(key, new IntWritable(sum/count)); } } } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new AvgTemprature(), args); System.exit(res); } @Override public int run(String[] arg0) throws Exception { Configuration conf = getConf(); Job job = new Job(conf, "AvgTem");// 任务名 job.setJarByClass(AvgTemprature.class);// 指定class // 输入和输出流 job.setMapperClass(AvgMapper.class);// map job.setReducerClass(AvgReducer.class); FileInputFormat.addInputPath(job, new Path(arg0[0])); FileOutputFormat.setOutputPath(job, new Path(arg0[1])); job.setCombinerClass(AvgReducer.class); job.setOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; }}
版权声明:本文为博主原创文章,未经博主允许不得转载。