It’s been a while since my last post, which is a little disappointing. On the upside, my absence is not because I have been slothful; it is because I have started a new project and am deep in the world of ‘Big Data’!
Over the past three or four months I have been getting to grips with DB2. Initially I couldn’t stand it, but it is growing on me now, especially when I switch back to MS SQL Server… but that is another conversation.
Excitingly, on the horizon (in fact, fast approaching) is Hadoop. So I have started playing with the technology, have “joined” the Hadoop user group (well, when I say joined, I mean I have started attending the meetings) and am quite enjoying it all.
I thought I would share some of what I have learned so far, in the hope that others will benefit. I am working through several exercises, but I liked this one because it is fairly straightforward and yet shows you how to write your own Mapper and Reducer, first in Java and then in Python!
To start with, I found two excellent tutorials, one on setting up a Hadoop server and one on writing a mapper and reducer in Python. Both were enormously useful, and I really appreciate the authors publishing them.
OK, let’s get on with it. Here is the problem, a simple exercise:
Write a MapReduce job that reads any Text input and computes the average length of all words that start with each character.
The example we will use is:
Now is definitely the time
With the output looking like:
N 3
d 10
i 2
t 3.5
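Before touching Hadoop, it can help to pin down the expected answer with a few lines of plain Python. This is a minimal in-memory sketch (the function name `average_length_by_first_letter` is just illustrative and is not part of either solution below); it assumes a word is anything between whitespace, so punctuation would count towards a word’s length. Keys are case-sensitive, which is why “is” lands under a lowercase i.

```python
from collections import defaultdict


def average_length_by_first_letter(text):
    """Average word length for each first character, computed in memory."""
    totals = defaultdict(int)  # first letter -> sum of word lengths
    counts = defaultdict(int)  # first letter -> number of words
    for word in text.split():
        totals[word[0]] += len(word)
        counts[word[0]] += 1
    # sorted() orders uppercase before lowercase for ASCII, like Hadoop's byte order
    return {letter: totals[letter] / counts[letter] for letter in sorted(totals)}


print(average_length_by_first_letter("Now is definitely the time"))
# {'N': 3.0, 'd': 10.0, 'i': 2.0, 't': 3.5}
```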
So I will provide two solutions: one in Java (Hadoop’s native language) and the other in Python.
I used NetBeans to write my Java code. First, let’s look at the main method, which is basically the same as the WordCount main you will see everywhere. Yes, I hardcoded the input and output paths, just for ease.
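import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class AvgLen
{
    public static void main(String[] args)
    {
        JobConf conf = new JobConf(AvgLen.class);
        // Key: first letter of a word; value: a word length (map) or an average (reduce)
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);
        // Input and output paths are hardcoded for ease
        FileInputFormat.addInputPath(conf, new Path("avg/text.txt"));
        FileOutputFormat.setOutputPath(conf, new Path("avgout"));
        conf.setMapperClass(AvgLenMapper.class);
        conf.setReducerClass(AvgLenReducer.class);
        // No combiner: the reducer emits averages, and an average of
        // partial averages is not, in general, the overall average
        try
        {
            JobClient.runJob(conf);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("Done!");
    }
}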
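A note on combiners: an average cannot safely be computed by running the reducer as a combiner, because an average of partial averages is not, in general, the overall average. It happens to work for our one-line input, where a single map task sees every word. The sketch below assumes a hypothetical second map task that sees one extra, made-up t-word of length 5, compares the two results, and then shows the usual remedy of carrying (sum, count) pairs and dividing only at the very end.

```python
# Word lengths for first letter "t", as seen by two hypothetical map tasks
task1 = [3, 4]  # "the", "time"
task2 = [5]     # an extra, made-up word such as "today"


def mean(xs):
    return sum(xs) / len(xs)


# Reducer reused as a combiner: each task pre-averages, then the reducer averages again
average_of_averages = mean([mean(task1), mean(task2)])  # (3.5 + 5) / 2

# Correct answer over all words
true_average = mean(task1 + task2)  # 12 / 3

# Combinable alternative: combine (sum, count) pairs, divide once at the end
partials = [(sum(task1), len(task1)), (sum(task2), len(task2))]
total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
pair_average = total / count

print(average_of_averages, true_average, pair_average)  # 4.25 4.0 4.0
```

In a real Java job the (sum, count) pair would need its own Writable type or an encoded value, which is more machinery than this small exercise needs.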
One thing to point out here is the use of the DoubleWritable class: an average can be fractional (3.5 for t), so we need doubles rather than ints as part of our calculation.
Now we have the Mapper:
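import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class AvgLenMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, DoubleWritable>
{
    // Reused for every record; collect() serializes the values immediately
    private final DoubleWritable length = new DoubleWritable();
    private final Text firstLetter = new Text();

    public void map(LongWritable offset, Text value,
                    OutputCollector<Text, DoubleWritable> output, Reporter reporter)
        throws IOException
    {
        // Split on runs of whitespace so repeated spaces, tabs or a trailing
        // space do not produce empty "words"
        for (String w : value.toString().trim().split("\\s+"))
        {
            if (w.isEmpty())
            {
                continue; // only happens for an empty or all-whitespace line
            }
            firstLetter.set(w.substring(0, 1));
            length.set(w.length());
            output.collect(firstLetter, length);
        }
    }
}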
Here we simply emit the first letter of each word, together with the word’s length, to our OutputCollector. Its contents would be:
N 3
d 10
i 2
t 3
t 4
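Between map and reduce, Hadoop sorts these pairs by key and groups the values for each key, so reduce is called once per letter with all of that letter’s lengths. Hadoop compares Text keys byte by byte, so for ASCII keys uppercase sorts before lowercase, just as Python’s default string ordering does. A rough in-memory imitation of that shuffle step (plain Python, not Hadoop’s actual implementation) looks like this:

```python
from itertools import groupby
from operator import itemgetter

# (key, value) pairs in the order the mapper emits them for "Now is definitely the time"
mapped = [("N", 3), ("i", 2), ("d", 10), ("t", 3), ("t", 4)]

# Simulated shuffle: sort by key, then gather each key's values into a list
shuffled = {
    key: [value for _, value in group]
    for key, group in groupby(sorted(mapped, key=itemgetter(0)), key=itemgetter(0))
}

print(shuffled)  # {'N': [3], 'd': [10], 'i': [2], 't': [3, 4]}
```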
This is ready to be reduced, so let’s look at the reducer:
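import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class AvgLenReducer extends MapReduceBase
    implements Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
    public void reduce(Text key, Iterator<DoubleWritable> values,
                       OutputCollector<Text, DoubleWritable> output, Reporter reporter)
        throws IOException
    {
        double sum = 0.0; // total length of all words starting with this letter
        long count = 0;   // number of such words
        while (values.hasNext())
        {
            sum += values.next().get();
            count++;
        }
        output.collect(key, new DoubleWritable(sum / count));
    }
}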
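Again, notice the use of DoubleWritable rather than IntWritable. Here we reduce the mapper’s output: for each first letter we add up the word lengths and divide by the number of words. That is pretty trivial until we get to t, where the two words (the and time) give (3 + 4) / 2 = 3.5.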
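To check the reduce arithmetic without a cluster, here is a small Python mirror of the reduce logic (the function name `reduce_average` is hypothetical; it is a sketch, not the Java class). The last line shows why doubles matter: Java’s int division truncates 7 / 2 to 3, and for positive numbers Python’s floor division `//` gives the same truncated result.

```python
def reduce_average(key, values):
    """Sum the lengths for one key and divide by how many there were."""
    total = 0.0  # float accumulator, like the double sum in the Java reducer
    count = 0
    for v in values:
        total += v
        count += 1
    return key, total / count


print(reduce_average("t", [3, 4]))  # ('t', 3.5)
print((3 + 4) // 2)                 # 3, what integer arithmetic would give
```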
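Once this is compiled and packaged, we just execute it with the following (a and b are placeholder arguments, which main ignores because the paths are hardcoded):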
hadoop jar AvgLen a b
Then we look at our output with:
hadoop fs -cat avgout/part-00000
Nice.
So let’s do the same, but with Python!
Our Mapper:
#!/usr/bin/env python
import sys

# Emit "<first letter>\t<word length>" for every word read from stdin
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print('%s\t%s' % (word[0:1], len(word)))
Our Reducer:
#!/usr/bin/env python
import sys

# Hadoop (or `sort` when testing locally) delivers the mapper output sorted
# by key, so all lines for one letter arrive next to each other.
current_letter = None
total = 0.0  # sum of word lengths for current_letter
count = 0    # number of words for current_letter

for line in sys.stdin:
    line = line.strip()
    try:
        letter, length = line.split('\t', 1)
        length = int(length)
    except ValueError:
        continue  # skip malformed lines
    if letter == current_letter:
        total += length
        count += 1
    else:
        # A new letter: print the average for the previous one, then reset
        if current_letter is not None:
            print('%s\t%s' % (current_letter, total / count))
        current_letter = letter
        total = float(length)
        count = 1

# Print the last letter (nothing is printed for empty input)
if current_letter is not None:
    print('%s\t%s' % (current_letter, total / count))
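One possible alternative for the streaming reducer (a suggestion, not part of the original exercise) is to let `itertools.groupby` do the “same key as the previous line?” bookkeeping. It relies on exactly the same assumption as the reducer above: the input is sorted, so equal keys are adjacent. The helper names `parse`, `reduce_lines` and `main` are hypothetical; keeping the logic in functions also makes it easy to test without Hadoop.

```python
#!/usr/bin/env python
import sys
from itertools import groupby


def parse(lines):
    """Yield (letter, length) pairs from 'letter<TAB>length' lines, skipping bad ones."""
    for line in lines:
        try:
            letter, length = line.rstrip('\n').split('\t', 1)
            yield letter, int(length)
        except ValueError:
            continue


def reduce_lines(lines):
    """Yield (letter, average length); assumes the lines are sorted by letter."""
    for letter, group in groupby(parse(lines), key=lambda pair: pair[0]):
        lengths = [length for _, length in group]
        yield letter, sum(lengths) / float(len(lengths))


def main(stdin=sys.stdin, stdout=sys.stdout):
    """Streaming entry point: read mapper output, write 'letter<TAB>average' lines."""
    for letter, avg in reduce_lines(stdin):
        stdout.write('%s\t%s\n' % (letter, avg))
```

Used as the reducer script, it would end with the usual `if __name__ == '__main__': main()` line, and the local echo/sort pipeline test works unchanged.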
You can test your code before involving Hadoop with:
echo "Now is definitely the time" | /usr/local/hadoop/tmp/MyAvgMapper.py | sort | /usr/local/hadoop/tmp/MyAvgReducer.py
And if you are happy with the result, submit it to Hadoop with:
hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /usr/local/hadoop/tmp/MyAvgMapper.py -mapper /usr/local/hadoop/tmp/MyAvgMapper.py -file /usr/local/hadoop/tmp/MyAvgReducer.py -reducer /usr/local/hadoop/tmp/MyAvgReducer.py -input /user/hduser/avg/text.txt -output /user/hduser/pythonAvgTest
This command just rolls off the keyboard, doesn’t it?
Note that I had already uploaded the file text.txt, containing the test sentence, to /user/hduser/avg/ in HDFS.
I hope this proves useful.



