博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mapreduce 依赖组合
阅读量:5342 次
发布时间:2019-06-15

本文共 3677 字,大约阅读时间需要 12 分钟。

mport java.io.IOException;

import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Driver {
    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static class DependenceMapper extends
            Mapper<Object, Text, Text, Text> {
        private Text word = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String []sep=value.toString().split("\t");
            word.set(sep[1]+"\t"+sep[0]);
            System.out.println(value.toString());
            context.write(word,new Text(""));
        }
    }
    public static class DependenceReducer extends
            Reducer<Text,Text,Text,Text> {
        public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            String[] sep = key.toString().split("\t");
            System.out.println( sep[0]+"++++++++="+ sep[1]);
            context.write(key,new Text(""));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        //加入控制容器
        ControlledJob ctrljob1=new  ControlledJob(conf);
        ctrljob1.setJob(job);
        job.setJarByClass(Driver.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
//        job.waitForCompletion(true);
        Configuration conf2 = new Configuration();
        Job job2 = new Job(conf2, "word count1");
         ControlledJob ctrljob2=new ControlledJob(conf);
            ctrljob2.setJob(job2);
            ctrljob2.addDependingJob(ctrljob1);
        job2.setJarByClass(Driver.class);
        job2.setMapperClass(DependenceMapper.class);
        job2.setReducerClass(DependenceReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job2, new Path(otherArgs[2]));
    //    job2.waitForCompletion(true);
          JobControl jobCtrl=new JobControl("myctrl");
          
            //添加到总的JobControl里,进行控制
            jobCtrl.addJob(ctrljob1);
            jobCtrl.addJob(ctrljob2);
            jobCtrl.run();
            
    }
}

转载于:https://www.cnblogs.com/1130136248wlxk/p/5011142.html

你可能感兴趣的文章
我说我在总结谁会信。。
查看>>
数据库索引的作用和长处缺点
查看>>
Laravel 安装代码智能提示扩展「laravel-ide-helper」
查看>>
java开发配套版本
查看>>
MySQL的 Grant命令权限分配
查看>>
非阻塞的c/s,epoll服务器模型
查看>>
YII框架安装过程总结
查看>>
HDOJ(HDU) 1862 EXCEL排序(类对象的快排)
查看>>
Codeforces Round #381 (Div. 2) 复习倍增//
查看>>
Money类型转化为String去除小数点后0解决方法
查看>>
ArcScene 高程不同的表面无法叠加
查看>>
[ONTAK2010] Peaks
查看>>
DLL 导出函数
查看>>
windows超过最大连接数解决命令
查看>>
12个大调都是什么
查看>>
angular、jquery、vue 的区别与联系
查看>>
参数范围的选择
查看>>
使用 MarkDown & DocFX 升级 Rafy 帮助文档
查看>>
THUPC2019/CTS2019/APIO2019游记
查看>>
Nodejs Express模块server.address().address为::
查看>>