Archive for the ‘Python’ Category

Hadoop – examples of Java and Python Map and Reduce for the average word length

September 24, 2011

It’s been a while since my last post, which is a little disappointing. On the up side, the reason for my absence has not been because I am slothful, it is because I have started on a new project and I am deep in the world of ‘Big Data’ !!!!

Over the past three/four months I have been getting to grips with DB2, initially I couldn’t stand it but it is growing on me now. Especially when I switch back to MS SQL Server… But that is another conversation.

Excitingly on the horizon, in fact fast approaching, is Hadoop. So I have started playing with this technology, โ€œjoinedโ€ the Hadoop user group (well when I say join, I have started attending the meetings) and quite enjoying it all.

I thought I would share some of what I have learned so far, in the hope others will benefit. I am working through multiple exercises but I liked this one because it was fairly straight forward and yet showed you how to write your own Mapper and Reducer in Java and then in Python!

In the first instance I found an amazing blog / tutorial on setting up a Hadoop server and writing a mapper and reducer in Python. Both amazingly useful posts, really appreciate the publication of them.

Ok, lets get on with it. A simple exercise โ€“ the problem:

Write a MapReduce job that reads any Text input and computes the average length of all words that start with each character.

The example, we will use is:

Now is definitely the time

With the output looking like:

N 3
d 10
I 2
t 3.5

So I will provide you with two solutions for this, one in Java (the native language) and the other in Python.

I used Netbeans to write my Java code, in the first instance lets look at the main section, it is basically the same as the WordCount main you will see everywhere. In this case, yes I did hardcode the values โ€“ just for the sake of ease.

public static void main(String[] args)
    {
        JobClient client = new JobClient();

        JobConf conf = new JobConf(AvgLen.class);


        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);
        
        FileInputFormat.addInputPath(conf, new Path("avg/text.txt"));
        FileOutputFormat.setOutputPath(conf, new Path("avgout"));
        
        conf.setMapperClass(AvgLenMapper.class);

        conf.setReducerClass(AvgLenReducer.class);
        conf.setCombinerClass(AvgLenReducer.class);

        client.setConf(conf);

         try
        {
            JobClient.runJob(conf);

        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("Done!");
    }

One thing to point out here is the use of the DoubleWritable class, because we need to use doubes, or longs I suppose, as part of our calculation rather than ints.

Now we have the Mapper:

public class AvgLenMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, DoubleWritable>
{
    private DoubleWritable oned = new DoubleWritable(1.0);
    private Text word = new Text();

    public void map(LongWritable l, Text t, OutputCollector o, Reporter r) throws IOException
    {
        String line = t.toString();

        int counter = 0;
        
        for (int i = 0; i < line.length(); i++)
        {

            if (line.charAt(i) == ' ')
            {
                word.set((line.substring(counter, i)).substring(0,1));
                oned.set(i-counter);
                counter = i+1;
                o.collect(word, oned);
            }

        }

        word.set((line.substring(counter)).substring(0,1));
        oned.set(line.length()-counter);
        o.collect(word, oned);
    }
}

Here we just collect the first letter of each word and the length of the word in our OutputCollector. The contents of this would be:

N 3
d 10
I 2
t 3
t 4

This is ready to be reduced. So lets look at the reducer:

public class AvgLenReducer extends MapReduceBase
        implements Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
    public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException
    {
        double sum = 0;
        double counter = 0.0;

        while(values.hasNext())
        {
            DoubleWritable value = (DoubleWritable)values.next();
            sum += value.get();
            counter++;
        }
        output.collect(key, new DoubleWritable((sum/counter)));
        
        
    }
}

Again, notice the use of DoubleWritable over IntWritable. Here we reduce down the resultset from the mapper. Pretty simple really until we get to t which would involve 7/2 which = 3.5

With this done, and compiled. We just execute it with

hadoop jar AvgLen a b

then we look at our output with:

hadoop fs -cat avgout/part-00000

Nice.

So lets do the same but with Python!

Our Mapper:

#!/usr/bin/env python



import sys



for line in sys.stdin:



	line = line.strip()



	words = line.split()



	for word in words:



		print '%s\t%s' % (word[0:1], len(word))

Our Reducer:

#!/usr/bin/env python



from operator import itemgetter

import sys



current_word = None

current_count = 0.0

counter = 1.0

word = None



for line in sys.stdin:

	

	line = line.strip()



	word, count = line.split('\t', 1)



	try:

		count = int(count)

	except ValueError:

		continue



	if current_word == word:

		current_count += count

		counter = counter + 1

	else:

		current_count = current_count / counter

		if current_word:

			print '%s\t%s' % (current_word, current_count)

		current_count = count

		current_word = word

		counter = 1.0



if current_word == word:

	current_count = current_count / counter

	print '%s\t%s' % (current_word, current_count)

You can test your code, before involving hadoop with:

echo "Now is definitely the time" | /usr/local/hadoop/tmp/MyAvgMapper.py | sort | /usr/local/hadoop/tmp/MyAvgReducer.py

And if you are happy with the result, submit it to Hadoop with:

hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /usr/local/hadoop/tmp/MyAvgMapper.py -mapper /usr/local/hadoop/tmp/MyAvgMapper.py  -file /usr/local/hadoop/tmp/MyAvgReducer.py -reducer /usr/local/hadoop/tmp/MyAvgReducer.py -input /user/hduser/avg/text.txt -output /user/hduser/pythonAvgTest

This is command just rolls off the keyboard doesn’t it ๐Ÿ™‚

Note I uploaded the file text.txt which contained the test text.

I hope this proves useful.

Advertisements