Posted by : Sushanth Thursday 24 December 2015


Input:  
France    Europe   female   45000
France    Europe   male       55000
Spain      Europe   female   65000
Spain      Europe   male       75000
India      Asia        female   155000
India      Asia        male       165000

Program:
package com.countrypart;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CountryDriver extends Configured implements Tool {
                public int run(String[] args) throws Exception {
                                // getting configuration object and setting job name
                                Job partition_job = getJobConfPartition();

                                String input1 = args[0];
                                // String input2 = args[1];
                                String output = args[1];

                                FileSystem fs = FileSystem.getLocal(partition_job.getConfiguration());
                                Path opPath = new Path(output);
                                fs.delete(opPath, true);

                                // FileInputFormat.setInputPaths(partition_job, new Path(input1),new
                                // Path(input2)); // setting
                                FileInputFormat.setInputPaths(partition_job, new Path(input1)); // setting

                                // the input
                                // files for
                                // the job
                                FileOutputFormat.setOutputPath(partition_job, new Path(output)); // setting
                                // the
                                // output
                                // files
                                // for
                                // the
                                // job

                                partition_job.waitForCompletion(true);

                                return 0;

                }

                protected abstract class JobInfo {
                                public abstract Class<?> getJarByClass();

                                public abstract Class<? extends Mapper> getMapperClass();

                                public abstract Class<? extends Reducer> getCombinerClass();

                                public abstract Class<? extends Reducer> getReducerClass();

                                public abstract Class<?> getOutputKeyClass();

                                public abstract Class<?> getOutputValueClass();

                }

                // method to get job configuration for the customized partitioning MapReduce
                // program
                private Job getJobConfPartition() throws Exception {

                                JobInfo jobInfo = new JobInfo() {
                                                @Override
                                                public Class<? extends Reducer> getCombinerClass() {
                                                                return null;
                                                }

                                                @Override
                                                public Class<?> getJarByClass() {
                                                                return CountryDriver.class;
                                                }

                                                @Override
                                                public Class<? extends Mapper> getMapperClass() {
                                                                return CountryMapper.class;
                                                }

                                                public Class<?> getOutputKeyClass() {
                                                                return Text.class;
                                                }

                                                public Class<?> getOutputValueClass() {
                                                                return Text.class;
                                                }

                                                public Class<? extends Reducer> getReducerClass() {
                                                                return CountryReducer.class;
                                                }
                                };

                                Job job = setupJob("CountryPartition", jobInfo);
                                job.setPartitionerClass(CountryPartitioner.class);
                                // job.setPartitionerClass(HashPartitioner.class);
                                job.setInputFormatClass(TextInputFormat.class);

                                return job;
                }

                protected Job setupJob(String jobName, JobInfo jobInfo) throws Exception {

                                Job job = new Job(new Configuration(), jobName);

                                // set the several classes
                                job.setJarByClass(jobInfo.getJarByClass());

                                // set the mapper class
                                job.setMapperClass(jobInfo.getMapperClass());

                                // the combiner class is optional, so set it only if it is required by
                                // the program
                                if (jobInfo.getCombinerClass() != null)
                                                job.setCombinerClass(jobInfo.getCombinerClass());

                                // set the reducer class
                                job.setReducerClass(jobInfo.getReducerClass());

                                // the number of reducers is set to 3, this can be altered according to
                                // the program's requirements
                                job.setNumReduceTasks(2);

                                // set the type of the output key and value for the Map & Reduce
                                // functions
                                job.setOutputKeyClass(jobInfo.getOutputKeyClass());
                                job.setOutputValueClass(jobInfo.getOutputValueClass());

                                return job;
                }

                public static void main(String[] args) throws Exception {
                                int res = ToolRunner
                                                                .run(new Configuration(), new CountryDriver(), args);
                                System.exit(res);
                }
}

package com.countrypart;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CountryPartitioner extends Partitioner<Text, Text> {
                public int getPartition(Text key, Text value, int numReduceTasks) {
                                String[] countrycensus = value.toString().split("\t");
                                String country = countrycensus[0];
                                // int census = Integer.parseInt(age);
                                System.out.println(numReduceTasks);
                                System.out.println(country);
                                // this is done to avoid performing mod with 0
                                if (numReduceTasks == 0)
                                                return 0;
                               
                                if (country.equals("France")) {
                                                return 0;
                                }
                                if (country.equals("Spain")) {
                                                return 1 % numReduceTasks;
                                } else

                                {
                                                return 2 % numReduceTasks;
                                }

                }
}
package com.countrypart;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CountryMapper extends Mapper<Object, Text, Text, Text> {
                public void map(Object key, Text value, Context context)
                                                throws IOException, InterruptedException {
                                if (value.toString().length() > 0) {
                                                String[] tokens = value.toString().split("\t");
                                                System.out.println(tokens[0].toString() + " "
                                                                                + tokens[1].toString());
                                                System.out.println(value.toString());
                                                String gender = tokens[2].toString();
                                                String countrycensus = tokens[0] + "\t" + tokens[3];
                                                // String gendercensus = tokens[2]+"\t"+tokens[3];
                                                // the mapper emits key, value pair where the key is the gender and
                                                // the value is the other information which includes name, age and
                                                // score
                                                context.write(new Text(gender), new Text(countrycensus));
                                }
                }
}

package com.countrypart;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Records that the partitioner routes to the same partition are handled by the same reducer.
// Within a partition, the values sharing a key (the gender emitted by the mapper) are iterated
// and their census figures are added up, so each reducer writes one total per gender it receives.
// The type parameters are the input key type, the input value type, the
// output key type and the output value type.
public class CountryReducer extends Reducer<Text, Text, Text, Text> {
                int census, pcount = 0;

                @Override
                public void reduce(Text key, Iterable<Text> values, Context context)
                                                throws IOException, InterruptedException {
                                // int maxScore = Integer.MIN_VALUE;
                                String gender = " ";
                                // String census = " ";
                                // String gender = " ";

                                // iterating through the values corresponding to a particular key
                                for (Text val : values) {
                                                String[] valTokens = val.toString().split("\t");
                                                pcount = Integer.parseInt(valTokens[1]);
                                                census = pcount + census;
                                                // if the new score is greater than the current maximum score,
                                                // update the fields as they will be the output of the reducer after
                                                // all the values are processed for a particular key

                                }
                                gender = key.toString();
                                context.write(new Text(gender), new Text("census- " + census));
                                pcount = 0;
                                census = 0;
                }

}



Output:

Leave a Reply

Subscribe to Posts | Subscribe to Comments

- Copyright © Technical Articles - Skyblue - Powered by Blogger - Designed by Johanes Djogan -