Archive for the ‘Java’ Category

Using Apache’s Sqoop as an API

October 2, 2014

I have been doing some work with Sqoop recently, using the book Apache Sqoop Cookbook, which I found very easy to follow. However, do we really want to do everything via the command line interface (CLI)? I think not. Surely we also want the option to do it via an application programming interface (API).

I did some research and found some examples (here and here), so I thought I would document what I have done, taking some of the examples from the book mentioned above and turning them into code. I hope you find this useful.

Which Sqoop am I talking about? There are two!

I am making reference to Sqoop 1, the original. The version I am using at the time of writing is 1.4.4.

Where do I get the API library from?

The library is sqoop-1.4.4.2.1.1.0-385.jar (well, that is the library I am using). It is included in the Hortonworks VirtualBox Hadoop Sandbox 2.1 I am using, and can be found at

 /usr/lib/sqoop/sqoop-1.4.4.2.1.1.0-385.jar 

What can I do?

You can do everything you can do at the CLI, but within your code rather than having to fork to the shell to sync data into HDFS.

How does it work?

I thought I would run through some of the examples in the book, but before doing that you need to look at “The Code” section below, as I have encapsulated the methods to try and make it easier. The book covers syncing a complete table, specifying the HDFS location you want the data in, compressing the data, placing a Hive table over the top, and the list goes on.

I will provide you with the API examples based on my code, as well as the CLI commands, so you can get a good understanding. I am basing all of my examples on a simple MySQL database and table(s) I have created; these are really simple.
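Every example below follows the same basic pattern, condensed here into a minimal sketch (the class name is mine; the full class, with all the helper methods the examples call, lives in "The Code" section further down):

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;

public class MinimalSqoopImport {

  public static void main(String[] args) {
    // Build up the options you would normally pass as CLI flags.
    SqoopOptions options = new SqoopOptions();
    options.setConnectString("jdbc:mysql://localhost/sqoop"); // --connect
    options.setUsername("TheUsername");                       // --username
    options.setPassword("ThePassword");                       // --password
    options.setTableName("cities");                           // --table

    // Run the equivalent of "sqoop import" and check the return code.
    int res = new ImportTool().run(options);
    if (res != 0) {
      throw new RuntimeException("Sqoop import failed, return code: " + res);
    }
  }
}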

Syncing an entire table:

CLI:


sqoop import --connect jdbc:mysql://localhost/sqoop \
--username TheUsername \
--password ThePassword \
--table cities

API:


public static void main(String[] args) {

  setUp();

  TransferringEntireTable("cities_api");

  runIt();

}

 

Syncing an entire table, specifying the directory:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop \
--username sqoop \
--password password \
--table cities \
--warehouse-dir /etl/input

API:

public static void main(String[] args) {

  setUp();

  TransferringEntireTableSpecificDir("cities_api","/etl/input");

  runIt();
}

Syncing an entire table, specifying the directory and compressing:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop \
--username sqoop \
--password password \
--table cities \
--compress \
--compression-codec org.apache.hadoop.io.compress.BZip2Codec \
--warehouse-dir /etl/input/compressed

API:

public static void main(String[] args) {

  setUp();

  CompressingImportedData("cities_api","/etl/input/compressed","org.apache.hadoop.io.compress.BZip2Codec");

  runIt();
}
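One thing to note (this is an assumption on my part, based on the CLI needing both --compress and --compression-codec): the CompressingImportedData helper below only sets the codec, so you may also want to switch compression on explicitly. Something along these lines, if your version of SqoopOptions has the setter:

  // Assumed setter mirroring the --compress flag; verify setUseCompression exists in your SqoopOptions version.
  SqoopOptions.setUseCompression(true);
  SqoopOptions.setCompressionCodec("org.apache.hadoop.io.compress.BZip2Codec");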

An incremental import of a table:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop \
--username sqoop \
--password password \
--table visits \
--incremental lastmodified \
--check-column last_update_date \
--last-value "2014-08-14 00:00:00"

API:

public static void main(String[] args) {

  setUp();

  incrementalImport("booking_api","/etl/input",IncrementalMode.DateLastModified,"last_modified","2014-05-14 00:00:00");

  runIt();
}

Syncing an entire table and putting a Hive table over the top:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop_test \
--username sqoop \
--password password \
--table cities \
--hive-import \
--warehouse-dir /etl/input

API:

public static void main(String[] args) {

  setUp();

  TransferringEntireTableSpecificDirHiveMerge("cities_api","/etl/input");

  runIt();
}

Syncing a partitioned table and updating/creating the Hive table:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop \
--username sqoop \
--password password \
--table visits_by_day \
--hive-import \
--hive-partition-key visit_day_partition \
--hive-partition-value "2014-10-01"

API:

public static void main(String[] args) {

  setUp();

  TransferringEntireTableSpecificDirHivePartition("visits_by_day","/etl/input","visit_day_partition", "2014-10-01");

  runIt();
}

Below is the code. You can see how I have encapsulated the methods to try and make the implementation as “simple” as possible.

The Code:


package sqoopexamples;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.tool.ImportTool;

public class SqoopExamples {

  private static SqoopOptions SqoopOptions = new SqoopOptions();
  private static final String connectionString = "jdbc:mysql://127.0.0.1/sqoop_test";
  private static final String username = "MySQL database username";
  private static final String password = "MySQL database password";

  // Set the connection details (the equivalent of --connect, --username and --password).
  private static void setUp() {
    SqoopOptions.setConnectString(connectionString);
    SqoopOptions.setUsername(username);
    SqoopOptions.setPassword(password);
  }

  // Run the import with whatever options have been set, failing on a non-zero return code.
  private static int runIt() {
    int res;
    res = new ImportTool().run(SqoopOptions);
    if (res != 0) {
      throw new RuntimeException("Sqoop API Failed - return code : "+Integer.toString(res));
    }
    return res;
  }

  // Import a whole table (--table).
  private static void TransferringEntireTable(String table) {
    SqoopOptions.setTableName(table);
  }

  // As above, but into a specific HDFS directory (--warehouse-dir).
  private static void TransferringEntireTableSpecificDir(String table,
  String directory) {
    TransferringEntireTable(table);
    SqoopOptions.setWarehouseDir(directory);
  }

  // As above, plus a Hive table over the top (--hive-import).
  private static void TransferringEntireTableSpecificDirHiveMerge(String table,
  String directory) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setHiveImport(true);
  }

  // As above, plus a Hive partition key and value (--hive-partition-key, --hive-partition-value).
  private static void TransferringEntireTableSpecificDirHivePartitionMerge(String table
    , String directory
    , String partitionKey
    , String partitionValue) {
    TransferringEntireTableSpecificDirHiveMerge(table,directory);
    SqoopOptions.setHivePartitionKey(partitionKey);
    SqoopOptions.setHivePartitionValue(partitionValue);
  }

  // Import a subset of a table (--where). Still to do.
  private static void TransferringEntireTableWhereClause(String table,
    String whereClause) {
    //To do
  }

  // Compress the imported data with the given codec (--compression-codec).
  private static void CompressingImportedData(String table, String directory
  ,String compress) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setCompressionCodec(compress);
  }

  // Incremental import (--incremental, --check-column, --last-value).
  private static void incrementalImport(String table
    , String directory
    , IncrementalMode mode
    , String checkColumn
    , String lastValue) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setIncrementalMode(mode);
    SqoopOptions.setAppendMode(true);
    SqoopOptions.setIncrementalTestColumn(checkColumn);
    SqoopOptions.setIncrementalLastValue(lastValue);
  }

  // Import into a specific directory and create a Hive table over the top (--hive-import).
  private static void TransferringEntireTableSpecificDirHive(String table,
                                                        String directory) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setHiveImport(true);
  }

  // As above, plus a Hive partition key and value.
  private static void TransferringEntireTableSpecificDirHivePartition(String table,
                                                        String directory
                                                        , String partitionKey
                                                        , String partitionValue) {
    TransferringEntireTableSpecificDirHive(table,directory);
    SqoopOptions.setHivePartitionKey(partitionKey);
    SqoopOptions.setHivePartitionValue(partitionValue);
  }
}

And Finally

There is another way to use the Sqoop API. It is not one that excites me, but I thought I would share it:

int performHdfsImport() {

  SqoopTool tool = new ImportTool();

  // The CLI arguments, exactly as you would type them at the shell, built up as a string array.
  String[] args = {
    "--connection-manager", "driver"
    , "--connect", "jdbc:db2://dle-db2edw01.dl.karmalab.net:50001/EXPDEV01"
    , "--username", "username"
    , "--password", "password"
    , "--table", "table_name"
    , "--target-dir", "/directory/name"
  };

  SqoopOptions options = new SqoopOptions();

  // Let Sqoop parse and validate the arguments, exactly as the CLI would.
  try {
    options = tool.parseArguments(args, null, options, false);
    tool.validateOptions(options);
  } catch (Exception e) {
    System.err.println(e.getMessage());
    throw new RuntimeException(e);
  }

  return tool.run(options);
}

The reason I am not that keen on this approach is that it does not really differ from running it via the CLI. I like having the control of setting the values via the methods; with this approach you would end up with plenty of final strings as the keys, assigning the corresponding values to them, and then populating the string array to pass into the tool. Not my cup of tea.
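To illustrate what I mean, this is roughly the shape you end up with (a hedged sketch only; the class and constant names are mine):

public class StringArrayStyle {

  private static final String CONNECT = "--connect";
  private static final String USERNAME = "--username";
  private static final String PASSWORD = "--password";
  private static final String TABLE = "--table";

  // Build the argument array you would then hand to tool.parseArguments(...).
  static String[] buildArgs(String table) {
    return new String[] {
      CONNECT, "jdbc:mysql://127.0.0.1/sqoop_test",
      USERNAME, "MySQL database username",
      PASSWORD, "MySQL database password",
      TABLE, table
    };
  }
}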


Hadoop – examples of Java and Python Map and Reduce for the average word length

September 24, 2011

It’s been a while since my last post, which is a little disappointing. On the up side, the reason for my absence is not that I am slothful; it is that I have started on a new project and am deep in the world of ‘Big Data’!

Over the past three or four months I have been getting to grips with DB2. Initially I couldn’t stand it, but it is growing on me now, especially when I switch back to MS SQL Server… But that is another conversation.

Excitingly on the horizon, in fact fast approaching, is Hadoop. So I have started playing with this technology, “joined” the Hadoop user group (well, when I say joined, I mean I have started attending the meetings) and I am quite enjoying it all.

I thought I would share some of what I have learned so far, in the hope that others will benefit. I am working through multiple exercises, but I liked this one because it is fairly straightforward and yet shows you how to write your own Mapper and Reducer in Java and then in Python!

In the first instance I found an amazing blog tutorial on setting up a Hadoop server, and another on writing a mapper and reducer in Python. Both are amazingly useful posts, and I really appreciate their publication.

OK, let’s get on with it. A simple exercise – the problem:

Write a MapReduce job that reads any Text input and computes the average length of all words that start with each character.

The example we will use is:

Now is definitely the time

With the output looking like:

N 3
d 10
i 2
t 3.5

So I will provide you with two solutions for this, one in Java (the native language) and the other in Python.

I used NetBeans to write my Java code. In the first instance, let’s look at the main section; it is basically the same as the WordCount main you will see everywhere. In this case, yes, I did hardcode the values – just for the sake of ease.

public static void main(String[] args)
    {
        JobClient client = new JobClient();
        JobConf conf = new JobConf(AvgLen.class);

        // The reducer emits (first letter, average word length), hence Text and DoubleWritable.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DoubleWritable.class);

        // Hardcoded input and output paths on HDFS - just for the sake of ease.
        FileInputFormat.addInputPath(conf, new Path("avg/text.txt"));
        FileOutputFormat.setOutputPath(conf, new Path("avgout"));

        conf.setMapperClass(AvgLenMapper.class);
        conf.setReducerClass(AvgLenReducer.class);
        conf.setCombinerClass(AvgLenReducer.class);

        client.setConf(conf);

        try
        {
            JobClient.runJob(conf);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("Done!");
    }

One thing to point out here is the use of the DoubleWritable class, because we need to use doubles (or longs, I suppose) as part of our calculation rather than ints.
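A quick illustration of why (plain Java, nothing Hadoop-specific):

// Integer division truncates, which would ruin the average for 't' ("the" and "time").
int intAverage = (3 + 4) / 2;          // 3
double doubleAverage = (3 + 4) / 2.0;  // 3.5 - what we actually want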

Now we have the Mapper:

public class AvgLenMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, DoubleWritable>
{
    private DoubleWritable oned = new DoubleWritable(1.0);
    private Text word = new Text();

    public void map(LongWritable l, Text t, OutputCollector<Text, DoubleWritable> o, Reporter r) throws IOException
    {
        String line = t.toString();

        int counter = 0;

        for (int i = 0; i < line.length(); i++)
        {
            // At each space, emit (first letter of the word, length of the word).
            if (line.charAt(i) == ' ')
            {
                word.set((line.substring(counter, i)).substring(0,1));
                oned.set(i-counter);
                counter = i+1;
                o.collect(word, oned);
            }
        }

        // The last word on the line has no trailing space, so emit it separately.
        word.set((line.substring(counter)).substring(0,1));
        oned.set(line.length()-counter);
        o.collect(word, oned);
    }
}

Here we just collect the first letter of each word and the length of the word in our OutputCollector. The contents of this would be:

N 3
d 10
i 2
t 3
t 4

This is ready to be reduced. So let’s look at the reducer:

public class AvgLenReducer extends MapReduceBase
        implements Reducer<Text, DoubleWritable, Text, DoubleWritable>
{
    public void reduce(Text key, Iterator<DoubleWritable> values, OutputCollector<Text, DoubleWritable> output, Reporter reporter) throws IOException
    {
        double sum = 0;
        double counter = 0.0;

        // Sum the word lengths for this starting letter and count how many there are.
        while (values.hasNext())
        {
            sum += values.next().get();
            counter++;
        }

        // Emit the average word length for this starting letter.
        output.collect(key, new DoubleWritable(sum/counter));
    }
}

Again, notice the use of DoubleWritable over IntWritable. Here we reduce the result set from the mapper, which is pretty simple really until we get to t, which involves (3 + 4) / 2 = 3.5. One word of caution: the job also registers this reducer as a combiner, which happens to give the right answer for this tiny example but is not safe in general, because an average of averages is not the overall average; for real data you would want the combiner to pass on sums and counts instead.

With this done and compiled, we just execute it with:

hadoop jar AvgLen a b

then we look at our output with:

hadoop fs -cat avgout/part-00000

Nice.

So let’s do the same but with Python!

Our Mapper:

#!/usr/bin/env python

import sys

# Read lines from stdin and emit (first letter, word length), tab separated, for Hadoop Streaming.
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t%s' % (word[0:1], len(word))

Our Reducer:

#!/usr/bin/env python

import sys

current_word = None
current_count = 0.0
counter = 1.0
word = None

# Hadoop Streaming gives us the mapper output sorted by key (the first letter),
# so we can average one key at a time.
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if current_word == word:
        current_count += count
        counter = counter + 1
    else:
        # The key has changed: emit the average for the previous key, then reset.
        current_count = current_count / counter
        if current_word:
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
        counter = 1.0

# Don't forget the final key.
if current_word == word:
    current_count = current_count / counter
    print '%s\t%s' % (current_word, current_count)

You can test your code before involving Hadoop with:

echo "Now is definitely the time" | /usr/local/hadoop/tmp/MyAvgMapper.py | sort | /usr/local/hadoop/tmp/MyAvgReducer.py

And if you are happy with the result, submit it to Hadoop with:

hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /usr/local/hadoop/tmp/MyAvgMapper.py -mapper /usr/local/hadoop/tmp/MyAvgMapper.py  -file /usr/local/hadoop/tmp/MyAvgReducer.py -reducer /usr/local/hadoop/tmp/MyAvgReducer.py -input /user/hduser/avg/text.txt -output /user/hduser/pythonAvgTest

This command just rolls off the keyboard, doesn’t it 🙂

Note that I uploaded the file text.txt, which contains the test text.

I hope this proves useful.