Posted by : Sushanth Thursday 24 December 2015

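Sample input (columns: country, continent, gender, census count; the mapper below splits each line on a tab character, so the real input file must be tab-separated):

France    Europe   female   45000
France    Europe   male       55000
Spain      Europe   female   65000
Spain      Europe   male       75000
India      Asia        female   155000
India      Asia        male       165000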

package com.countrypart;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CountryDriver extends Configured implements Tool {
                public int run(String[] args) throws Exception {
                                // getting configuration object and setting job name
                                Job partition_job = getJobConfPartition();

                                String input1 = args[0];
                                // String input2 = args[1];
                                String output = args[1];

                                FileSystem fs = FileSystem.getLocal(partition_job.getConfiguration());
                                Path opPath = new Path(output);
                                fs.delete(opPath, true);

                                // FileInputFormat.setInputPaths(partition_job, new Path(input1),new
                                // Path(input2)); // setting
                                FileInputFormat.setInputPaths(partition_job, new Path(input1)); // setting

                                // the input
                                // files for
                                // the job
                                FileOutputFormat.setOutputPath(partition_job, new Path(output)); // setting
                                // the
                                // output
                                // files
                                // for
                                // the
                                // job


                                return 0;


                protected abstract class JobInfo {
                                public abstract Class<?> getJarByClass();

                                public abstract Class<? extends Mapper> getMapperClass();

                                public abstract Class<? extends Reducer> getCombinerClass();

                                public abstract Class<? extends Reducer> getReducerClass();

                                public abstract Class<?> getOutputKeyClass();

                                public abstract Class<?> getOutputValueClass();


                // method to get job configuration for the customized partitioning MapReduce
                // program
                private Job getJobConfPartition() throws Exception {

                                JobInfo jobInfo = new JobInfo() {
                                                public Class<? extends Reducer> getCombinerClass() {
                                                                return null;

                                                public Class<?> getJarByClass() {
                                                                return CountryDriver.class;

                                                public Class<? extends Mapper> getMapperClass() {
                                                                return CountryMapper.class;

                                                public Class<?> getOutputKeyClass() {
                                                                return Text.class;

                                                public Class<?> getOutputValueClass() {
                                                                return Text.class;

                                                public Class<? extends Reducer> getReducerClass() {
                                                                return CountryReducer.class;

                                Job job = setupJob("CountryPartition", jobInfo);
                                // job.setPartitionerClass(HashPartitioner.class);

                                return job;

                protected Job setupJob(String jobName, JobInfo jobInfo) throws Exception {

                                Job job = new Job(new Configuration(), jobName);

                                // set the several classes

                                // set the mapper class

                                // the combiner class is optional, so set it only if it is required by
                                // the program
                                if (jobInfo.getCombinerClass() != null)

                                // set the reducer class

                                // the number of reducers is set to 3, this can be altered according to
                                // the program's requirements

                                // set the type of the output key and value for the Map & Reduce
                                // functions

                                return job;

                public static void main(String[] args) throws Exception {
                                int res = ToolRunner
                                                                .run(new Configuration(), new CountryDriver(), args);

package com.countrypart;

import org.apache.hadoop.mapreduce.Partitioner;

public class CountryPartitioner extends Partitioner<Text, Text> {
                public int getPartition(Text key, Text value, int numReduceTasks) {
                                String[] countrycensus = value.toString().split("\t");
                                String country = countrycensus[0];
                                // int census = Integer.parseInt(age);
                                // this is done to avoid performing mod with 0
                                if (numReduceTasks == 0)
                                                return 0;
                                if (country.equals("France")) {
                                                return 0;
                                if (country.equals("Spain")) {
                                                return 1 % numReduceTasks;
                                } else

                                                return 2 % numReduceTasks;

package com.countrypart;


import org.apache.hadoop.mapreduce.Mapper;

public class CountryMapper extends Mapper<Object, Text, Text, Text> {
                public void map(Object key, Text value, Context context)
                                                throws IOException, InterruptedException {
                                if (value.toString().length() > 0) {
                                                String[] tokens = value.toString().split("\t");
                                                System.out.println(tokens[0].toString() + " "
                                                                                + tokens[1].toString());
                                                String gender = tokens[2].toString();
                                                String countrycensus = tokens[0] + "\t" + tokens[3];
                                                // String gendercensus = tokens[2]+"\t"+tokens[3];
                                                // the mapper emits key, value pair where the key is the gender and
                                                // the value is the other information which includes name, age and
                                                // score
                                                context.write(new Text(gender), new Text(countrycensus));

package com.countrypart;

import org.apache.hadoop.mapreduce.Reducer;

//The data belonging to the same partition go to the same reducer. In a particular partition, all the values with the same key are iterated and the person with the maximum score is found.
//Therefore the output of the reducer will contain the male and female maximum scorers in each of the 3 age categories.
// the type parameters are the input keys type, the input values type, the
// output keys type, the output values type
public class CountryReducer extends Reducer<Text, Text, Text, Text> {
                int census, pcount = 0;

                public void reduce(Text key, Iterable<Text> values, Context context)
                                                throws IOException, InterruptedException {
                                // int maxScore = Integer.MIN_VALUE;
                                String gender = " ";
                                // String census = " ";
                                // String gender = " ";

                                // iterating through the values corresponding to a particular key
                                for (Text val : values) {
                                                String[] valTokens = val.toString().split("\t");
                                                pcount = Integer.parseInt(valTokens[1]);
                                                census = pcount + census;
                                                // if the new score is greater than the current maximum score,
                                                // update the fields as they will be the output of the reducer after
                                                // all the values are processed for a particular key

                                gender = key.toString();
                                context.write(new Text(gender), new Text("census- " + census));
                                pcount = 0;
                                census = 0;



Leave a Reply

Subscribe to Posts | Subscribe to Comments

- Copyright © Technical Articles - Skyblue - Powered by Blogger - Designed by Johanes Djogan -