
Using Apache’s Sqoop as an API

October 2, 2014

I've been doing some work with Sqoop recently, using the book Apache Sqoop Cookbook, which I found very easy to follow. However, do we really want to do everything via the command line interface (CLI)? I think not. Surely we also want the option to do it via an application programming interface (API).

I did some research and found some examples (here and here), so I thought I would document what I have done, taking some of the examples from the book mentioned and turning them into code. I hope you find this useful.

Which Sqoop am I talking about? There are two!

I am referring to Sqoop 1, the original. The version I am using at the time of writing is 1.4.4.

Where do I get the API library from?

The library is sqoop-1.4.4.2.1.1.0-385.jar (well, that is the library I am using). For me it is included in the Hortonworks VirtualBox Hadoop Sandbox 2.1, and it can be found at:

 /usr/lib/sqoop/sqoop-1.4.4.2.1.1.0-385.jar 

What can I do?

You can do everything you can do at the CLI, but within your code rather than having to fork to the shell to sync data into HDFS.
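
To give a flavour of that before the worked examples, here is a minimal sketch of the pattern the rest of this post uses: build a SqoopOptions object, set the same things you would otherwise pass as CLI flags, then hand it to an ImportTool. The connection details below are placeholders, not a real database.

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;

public class MinimalSqoopImport {

  public static void main(String[] args) {
    SqoopOptions options = new SqoopOptions();
    options.setConnectString("jdbc:mysql://localhost/sqoop");  // --connect
    options.setUsername("TheUsername");                        // --username
    options.setPassword("ThePassword");                        // --password
    options.setTableName("cities");                            // --table

    // ImportTool.run() returns 0 on success, non-zero on failure
    int result = new ImportTool().run(options);
    if (result != 0) {
      throw new RuntimeException("Sqoop import failed, return code: " + result);
    }
  }
}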

How does it work?

I thought I would run through some of the examples in the book, but before doing that you need to look at “The Code” section below, as I have encapsulated the methods to try and make it easier. The book covers syncing a complete table, specifying the HDFS location you want the data in, compressing the data, placing a Hive table over the top, and the list goes on.

I will provide the API examples based on my code, as well as the CLI commands, so you can get a good understanding. I am basing all of my examples on a simple MySQL database and tables I have created.

Syncing an entire table:

CLI:


sqoop import --connect jdbc:mysql://localhost/sqoop \
--username TheUsername \
--password ThePassword \
--table cities

API:


public static void main(String[] args) {

  setUp();

  TransferringEntireTable("cities_api");

  runIt();

}

 

Syncing an entire table and specifying the directory:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop \
--username sqoop \
--password password \
--table cities \
--warehouse-dir /etl/input

API:

public static void main(String[] args) {

  setUp();

  TransferringEntireTableSpecificDir("cities_api","/etl/input");

  runIt();
}

Syncing an entire table, specifying the directory, and compressing the data:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop \
--username sqoop \
--password password \
--table cities \
--compress \
--compression-codec org.apache.hadoop.io.compress.BZip2Codec \
--warehouse-dir /etl/input/compressed

API:

public static void main(String[] args) {

  setUp();

  CompressingImportedData("cities_api","/etl/input/compressed","org.apache.hadoop.io.compress.BZip2Codec");

  runIt();
}
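
One thing to point out: the CLI example also passes --compress, whereas my CompressingImportedData method only sets the codec (as far as I can tell Sqoop treats a codec being set as a request for compression, so this works). If you prefer to mirror the CLI flag explicitly, a sketch of a variant is below; I believe setUseCompression is the setter that corresponds to --compress, but verify that against your SqoopOptions version.

  // Variant of CompressingImportedData that also flips the --compress switch explicitly
  // (setUseCompression is assumed to be the --compress equivalent - check your Sqoop version)
  private static void CompressingImportedDataExplicit(String table, String directory
  ,String compress) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setUseCompression(true);
    SqoopOptions.setCompressionCodec(compress);
  }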

An incremental import of a table:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop \
--username sqoop \
--password password \
--table visits \
--incremental lastmodified \
--check-column last_update_date \
--last-value "2014-08-14 00:00:00"

API:

public static void main(String[] args) {

  setUp();

  incrementalImport("booking_api","/etl/input",IncrementalMode.DateLastModified,"last_modified","2014-05-14 00:00:00");

  runIt();
}
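
For reference, the other incremental mode is --incremental append, which maps to IncrementalMode.AppendRows in the API. Using my helper it would look something like the sketch below; the table, check column and last value are just placeholders from my test data.

public static void main(String[] args) {

  setUp();

  // --incremental append: pick up rows whose id column is greater than the last value seen
  incrementalImport("visits_api","/etl/input",IncrementalMode.AppendRows,"id","100");

  runIt();
}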

Syncing an entire table and putting a Hive table over the top:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop_test \
--username sqoop \
--password password \
--table cities \
--hive-import \
--warehouse-dir /etl/input

API:

public static void main(String[] args) {

  setUp();

  TransferringEntireTableSpecificDirHiveMerge("cities_api","/etl/input");

  runIt();
}
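
If you want the Hive table to have a different name from the source table, SqoopOptions also has a setter for that. The sketch below is just an illustration; I believe setHiveTableName is the equivalent of --hive-table, but treat the method name as an assumption to check against your version.

  // Hive import where the Hive table name differs from the source table
  private static void TransferringEntireTableSpecificDirHiveNamed(String table,
  String directory, String hiveTable) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setHiveImport(true);
    SqoopOptions.setHiveTableName(hiveTable);  // assumed --hive-table equivalent
  }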

Syncing a partitioned table and updating/creating the Hive table:

CLI:

sqoop import --connect jdbc:mysql://localhost/sqoop \
--username sqoop \
--password password \
--table visits_by_day \
--hive-import \
--hive-partition-key visit_day_partition \
--hive-partition-value "2014-10-01"

API:

public static void main(String[] args) {

  setUp();

  TransferringEntireTableSpecificDirHivePartition("visits_by_day","/etl/input","visit_day_partition", "2014-10-01");

  runIt();
}

Below is the code. You can see how I have encapsulated the methods to try to make the implementation as "simple" as possible.

The Code:


package sqoopexamples;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.tool.ImportTool;

public class SqoopExamples {

  private static SqoopOptions SqoopOptions = new SqoopOptions();
  private static final String connectionString = "jdbc:mysql://127.0.0.1/sqoop_test";
  private static final String username = "MySQL database username";
  private static final String password = "MySQL database password";

  // Common connection settings - the equivalent of --connect, --username and --password
  private static void setUp() {
    SqoopOptions.setConnectString(connectionString);
    SqoopOptions.setUsername(username);
    SqoopOptions.setPassword(password);
  }

  // Run the import with whatever options have been set, failing loudly on a non-zero return code
  private static int runIt() {
    int res;
    res = new ImportTool().run(SqoopOptions);
    if (res != 0) {
      throw new RuntimeException("Sqoop API Failed - return code : "+Integer.toString(res));
    }
    return res;
  }

  // Equivalent of --table
  private static void TransferringEntireTable(String table) {
    SqoopOptions.setTableName(table);
  }

  // Equivalent of --table plus --warehouse-dir
  private static void TransferringEntireTableSpecificDir(String table,
  String directory) {
    TransferringEntireTable(table);
    SqoopOptions.setWarehouseDir(directory);
  }

  // Adds --hive-import on top of the table and directory settings
  private static void TransferringEntireTableSpecificDirHiveMerge(String table,
  String directory) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setHiveImport(true);
  }

  // Adds --hive-partition-key and --hive-partition-value to a Hive import
  private static void TransferringEntireTableSpecificDirHivePartitionMerge(String table
    , String directory
    , String partitionKey
    , String partitionValue) {
    TransferringEntireTableSpecificDirHiveMerge(table,directory);
    SqoopOptions.setHivePartitionKey(partitionKey);
    SqoopOptions.setHivePartitionValue(partitionValue);
  }

  // Equivalent of --where (not implemented yet)
  private static void TransferringEntireTableWhereClause(String table,
    String whereClause) {
    //To do
  }

  // Equivalent of --compression-codec
  private static void CompressingImportedData(String table, String directory
  ,String compress) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setCompressionCodec(compress);
  }

  // Equivalent of --incremental, --check-column and --last-value (with --append)
  private static void incrementalImport(String table
    , String directory
    , IncrementalMode mode
    , String checkColumn
    , String lastValue) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setIncrementalMode(mode);
    SqoopOptions.setAppendMode(true);
    SqoopOptions.setIncrementalTestColumn(checkColumn);
    SqoopOptions.setIncrementalLastValue(lastValue);
  }

  // Equivalent of --hive-import - used by the Hive examples
  private static void TransferringEntireTableSpecificDirHive(String table,
                                                        String directory) {
    TransferringEntireTableSpecificDir(table,directory);
    SqoopOptions.setHiveImport(true);
  }

  // Hive import into a specific partition (--hive-partition-key / --hive-partition-value)
  private static void TransferringEntireTableSpecificDirHivePartition(String table,
                                                        String directory
                                                        , String partitionKey
                                                        , String partitionValue) {
    TransferringEntireTableSpecificDirHive(table,directory);
    SqoopOptions.setHivePartitionKey(partitionKey);
    SqoopOptions.setHivePartitionValue(partitionValue);
  }
}
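
One caveat with my code: SqoopOptions is a single static field, so if you run several imports in the same JVM, options set by one helper (Hive import, compression, and so on) will still be set for the next one. If you need to chain imports, a simple way round it is to start setUp() with a fresh options object - a sketch only, assuming the rest of the class stays as it is:

  private static void setUp() {
    // Start from a clean slate so settings from a previous import do not leak into the next one
    SqoopOptions = new SqoopOptions();
    SqoopOptions.setConnectString(connectionString);
    SqoopOptions.setUsername(username);
    SqoopOptions.setPassword(password);
  }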

And Finally

There is another way to use the Sqoop API. It's not one that excites me, but I thought I would share it:

int performHdfsImport() throws Exception {

  // Build the argument array exactly as you would type it at the CLI
  SqoopTool tool = new ImportTool();
  String[] args = {
    "--connection-manager", "driver"
    , "--connect", "jdbc:db2://dle-db2edw01.dl.karmalab.net:50001/EXPDEV01"
    , "--username", "username"
    , "--password", "password"
    , "--table", "table_name"
    , "--target-dir", "/directory/name"
  };

  SqoopOptions options = new SqoopOptions();

  try {
    // Parse and validate the arguments before running the import
    options = tool.parseArguments(args, null, options, false);
    tool.validateOptions(options);
  } catch (Exception e) {
    System.err.println(e.getMessage());
    throw e;
  }

  return tool.run(options);
}

The reason I am not that keen on this is that it doesn't really differ much from running it via the CLI. I like having the control of setting the values via methods; with this approach you end up with plenty of final strings as the keys, assigning the corresponding values, and then populating the string array to pass into the method. Not my cup of tea.
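
To show what I mean, this is roughly the shape that style pushes you towards - a sketch only, with made-up constant and method names, although the flags themselves are real Sqoop arguments:

// A pile of key constants plus their values, glued into an args array for parseArguments()
private static final String CONNECT_KEY = "--connect";
private static final String USERNAME_KEY = "--username";
private static final String PASSWORD_KEY = "--password";
private static final String TABLE_KEY = "--table";

private static String[] buildImportArgs(String connect, String user, String password, String table) {
  return new String[] {
    CONNECT_KEY, connect,
    USERNAME_KEY, user,
    PASSWORD_KEY, password,
    TABLE_KEY, table
  };
}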
