diff --git a/build.xml b/build.xml index d614d0985..64aca6dde 100644 --- a/build.xml +++ b/build.xml @@ -191,7 +191,10 @@ - + + + + diff --git a/ivy.xml b/ivy.xml index a93d0afd2..f430cdef2 100644 --- a/ivy.xml +++ b/ivy.xml @@ -37,6 +37,7 @@ under the License. extends="runtime" description="artifacts needed to compile/test the application"/> + @@ -46,15 +47,15 @@ under the License. + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> @@ -291,7 +292,16 @@ under the License. - + + + + + + + + + + diff --git a/ivy/ivysettings.xml b/ivy/ivysettings.xml index 2920c892f..1161742ad 100644 --- a/ivy/ivysettings.xml +++ b/ivy/ivysettings.xml @@ -33,6 +33,9 @@ under the License. --> + diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index df0415766..1e80fbd4b 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -561,6 +561,9 @@ include::hbase.txt[] include::accumulo-args.txt[] include::accumulo.txt[] +include::phoenix-args.txt[] +include::phoenix.txt[] + include::codegen-args.txt[] As mentioned earlier, a byproduct of importing a table to HDFS is a diff --git a/src/docs/user/phoenix-args.txt b/src/docs/user/phoenix-args.txt new file mode 100644 index 000000000..2c39675cf --- /dev/null +++ b/src/docs/user/phoenix-args.txt @@ -0,0 +1,33 @@ + +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +.Phoenix arguments: +[grid="all"] +`-------------------------------------`----------------------------------- +Argument Description +-------------------------------------------------------------------------- ++\--phoenix-table + Specifies Phoenix table to use\ + as the target instead of HDFS ++\--phoenix-column-mapping + (Optional)Comma-separated mapping of\ + sqoop column to phoenix column. + The two should be separated by ; ++\--phoenix-bulkload+ Enables bulk loading +-------------------------------------------------------------------------- + diff --git a/src/docs/user/phoenix.txt b/src/docs/user/phoenix.txt new file mode 100644 index 000000000..18e24b11a --- /dev/null +++ b/src/docs/user/phoenix.txt @@ -0,0 +1,47 @@ + +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +Importing Data Into Phoenix +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Sqoop supports additional import targets beyond HDFS and Hive. Sqoop +can also import records into a table in Phoenix. + +By specifying +\--phoenix-table+, you instruct Sqoop to import +to a table in Phoenix-backed HBase rather than a directory in HDFS. Sqoop will +import data to the table specified as the argument to +\--phoenix-table+. + +NOTE: This function is incompatible with direct import (parameter ++\--direct+). + +If the column names in the input table differ from the column names +in Phoenix, specify +\--phoenix-column-mapping+. The argument is a +comma-separated list of pairs of input column name and Phoenix column name. The +two names in each pair should be separated by a ;. +For example: sqoop_col1;phoenix_col1,sqoop_col2;phoenix_col2. +The mapper transforms each row of the input table into an upsert +into Phoenix and persists it using PhoenixOutputFormat. + + +If the target Phoenix table does not exist, the Sqoop job will +exit with an error. You should create the target table before running an import. + +To decrease the load on HBase, Sqoop can do bulk loading as opposed to +direct writes. To use bulk loading, enable it using +\--phoenix-bulkload+. diff --git a/src/docs/user/validation.txt b/src/docs/user/validation.txt index 27a78e22d..ccb8f69e6 100644 --- a/src/docs/user/validation.txt +++ b/src/docs/user/validation.txt @@ -101,7 +101,7 @@ The following are the limitations in the current implementation: * all-tables option * free-form query option -* Data imported into Hive, HBase or Accumulo +* Data imported into Hive, HBase, Phoenix or Accumulo * table import with --where argument * incremental imports diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index ef6e0cec5..220dfc95e 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -316,6 +316,15 @@ public String toString() { // explicit split by cols @StoredAsProperty("reset.onemapper") private boolean autoResetToOneMapper; + // Phoenix table to import into. + @StoredAsProperty("phoenix.table") private String phoenixTable; + + // Phoenix column mapping to sqoop columns. + @StoredAsProperty("phoenix.column.mapping") private String phoenixColumnMapping; + + // Is this a bulk-load job? + @StoredAsProperty("phoenix.bulk.load.enabled") private boolean phoenixBulkLoadEnabled; + + // These next two fields are not serialized to the metastore.
// If this SqoopOptions is created by reading a saved job, these will // be populated by the JobStorage to facilitate updating the same @@ -432,7 +441,7 @@ private void setDelimiterProperties(Properties props, putProperty(props, prefix + ".escape", Integer.toString((int) values.getEscapedBy())); putProperty(props, prefix + ".enclose.required", - Boolean.toString(values.isEncloseRequired())); + Boolean.toString(values.isEncloseRequired())); } /** Take a comma-delimited list of input and split the elements @@ -1894,6 +1903 @@ public void setConf(Configuration config) { this.conf = config; } + /** * @return command-line arguments after a '-'. */ @@ -2404,6 +2414,51 @@ public void setAccumuloZookeepers(String zookeepers) { this.accumuloZookeepers = zookeepers; } + /** + * Gets the Phoenix table to import into. + * @return + */ + public String getPhoenixTable() { + return phoenixTable; + } + + /** + * Sets the target Phoenix table. + * @param phoenixTable + */ + public void setPhoenixTable(String phoenixTable) { + this.phoenixTable = phoenixTable; + } + + /** + * One-to-one mapping between db columns and Phoenix columns for the table; + * the pattern is dbcolumn1;phoenixcolumn1,dbcolumn2;phoenixcolumn2. + * @return + */ + public String getPhoenixColumnMapping() { + return phoenixColumnMapping; + } + + /** + * Sets the db column to Phoenix column mapping. + * @param phoenixColumnMapping + */ + public void setPhoenixColumnMapping(String phoenixColumnMapping) { + this.phoenixColumnMapping = phoenixColumnMapping; + } + + /** + * Returns whether the load to Phoenix uses bulk loading. + * @return + */ + public boolean isPhoenixBulkLoadEnabled() { + return phoenixBulkLoadEnabled; + } + + public void setPhoenixBulkLoadEnabled(boolean phoenixBulkLoadEnabled) { + this.phoenixBulkLoadEnabled = phoenixBulkLoadEnabled; + } + public void setConnManagerClassName(String connManagerClass) { this.connManagerClassName = connManagerClass; } diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java index d9569c590..46b870f04 100644 --- a/src/java/org/apache/sqoop/manager/ConnManager.java +++ b/src/java/org/apache/sqoop/manager/ConnManager.java @@ -813,6 +813,14 @@ public boolean isDirectModeHBaseSupported() { public boolean isDirectModeAccumuloSupported() { return false; } + + /** + * By default, direct mode is not compatible with Phoenix. + * @return Whether direct mode is allowed.
+ */ + public boolean isDirectModePhoenixSupported() { + return false; + } } diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index ead581df1..f2789b56a 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -44,6 +44,9 @@ import org.apache.sqoop.mapreduce.AccumuloImportJob; import org.apache.sqoop.mapreduce.HBaseBulkImportJob; import org.apache.sqoop.mapreduce.JdbcCallExportJob; +import org.apache.sqoop.mapreduce.PhoenixBulkImportJob; +import org.apache.sqoop.mapreduce.PhoenixImportJob; +import org.apache.sqoop.phoenix.PhoenixUtil; import org.apache.sqoop.util.LoggingUtils; import org.apache.sqoop.util.SqlTypeMap; @@ -661,12 +664,21 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context) + "classpath, cannot import to Accumulo!"); } importer = new AccumuloImportJob(opts, context); + } else if(opts.getPhoenixTable() != null) { + if(!PhoenixUtil.isPhoenixJarPresent()) { + throw new ImportException("Phoenix jars are not present in " + + "classpath, cannot import to Phoenix!"); + } + if(opts.isPhoenixBulkLoadEnabled()) { + importer = new PhoenixBulkImportJob(opts, context); + } else { + importer = new PhoenixImportJob(opts, context); + } } else { // Import to HDFS. importer = new DataDrivenImportJob(opts, context.getInputFormat(), context); } - checkTableImportOptions(context); String splitCol = getSplitColumn(opts, tableName); @@ -704,6 +716,16 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context) + " cannot import to Accumulo!"); } importer = new AccumuloImportJob(opts, context); + } else if(opts.getPhoenixTable() != null) { + if(!PhoenixUtil.isPhoenixJarPresent()) { + throw new ImportException("Phoenix jars are not present in " + + "classpath, cannot import to Phoenix!"); + } + if(opts.isPhoenixBulkLoadEnabled()) { + importer = new PhoenixBulkImportJob(opts, context); + } else { + importer = new PhoenixImportJob(opts, context); + } } else { // Import to HDFS. importer = new DataDrivenImportJob(opts, context.getInputFormat(), diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java index 04d60fd13..a3bcdec6a 100644 --- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java @@ -231,6 +231,11 @@ public void runImport(String tableName, String ormJarFile, String splitByCol, throw new IOException("Direct mode is incompatible with " + "HBase. Please remove the parameter --direct"); } + if (options.getPhoenixTable() != null && options.isDirect() + && !getContext().getConnManager().isDirectModePhoenixSupported()) { + throw new IOException("Direct mode is incompatible with " + + "Phoenix. Please remove the parameter --direct"); + } if (null != tableName) { LOG.info("Beginning import of " + tableName); } else { diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java new file mode 100644 index 000000000..1931ee9ac --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.sqoop.phoenix.PhoenixConstants; +import org.apache.sqoop.phoenix.PhoenixUtil; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.util.ImportException; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; + +/** + * Job to bulk import data from db to phoenix. 
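+ * The mapper upserts each record through a Phoenix connection, collects the uncommitted KeyValues, and writes them as HFiles via HFileOutputFormat; completeImport() then bulk loads the generated files with LoadIncrementalHFiles.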
+ */ +public class PhoenixBulkImportJob extends DataDrivenImportJob { + + public static final Log LOG = LogFactory.getLog( + PhoenixBulkImportJob.class.getName()); + + public PhoenixBulkImportJob(final SqoopOptions opts, + final ImportJobContext importContext) { + super(opts, importContext.getInputFormat(), importContext); + } + + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws IOException { + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setMapperClass(getMapperClass()); + } + + @Override + protected Class getMapperClass() { + return PhoenixBulkImportMapper.class; + } + + @Override + protected Class getOutputFormatClass() + throws ClassNotFoundException { + return HFileOutputFormat.class; + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + + ConnManager manager = getContext().getConnManager(); + String[] sColumnNames = null; + if (null == tableName) { + String sqlQuery = options.getSqlQuery(); + sColumnNames = manager.getColumnNamesForQuery(sqlQuery); + } else { + if (null == options.getColumns()) { + sColumnNames = manager.getColumnNames(tableName); + } else { + sColumnNames = options.getColumns(); + } + } + String columnMappings = options.getPhoenixColumnMapping(); + // if column mappings aren't provided, + // we assume the column names in sqoop and phoenix match. + if(columnMappings == null || columnMappings.isEmpty()) { + columnMappings = Joiner.on(",").join(sColumnNames); + } + job.getConfiguration().set(PhoenixConstants.PHOENIX_SQOOP_COLUMNS, + Joiner.on(",").join(sColumnNames)); + job.getConfiguration().set(PhoenixConstants.PHOENIX_COLUMN_MAPPING, columnMappings); + job.setOutputFormatClass(getOutputFormatClass()); + } + + @Override + protected void jobSetup(Job job) throws IOException, ImportException { + super.jobSetup(job); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.addHbaseResources(conf); + final String tableName = options.getPhoenixTable(); + final String sColumnNames = conf.get(PhoenixConstants.PHOENIX_SQOOP_COLUMNS); + final String columnMappings = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(sColumnNames); + + final Map phoenixToSqoopMapping = PhoenixUtil.getPhoenixToSqoopMap(columnMappings); + final String pColumnNames = Joiner. + on(PhoenixConstants.PHOENIX_COLUMN_MAPPING_SEPARATOR). 
+ join(phoenixToSqoopMapping.keySet()); + boolean isValidColumns = PhoenixUtil.validateColumns(sColumnNames,columnMappings); + if (!isValidColumns) { + throw new RuntimeException(String.format("Failure to map sqoop columns [%s] " + + "to phoenix columns [%s] ", sColumnNames,pColumnNames)); + } + + PhoenixMapReduceUtil.setOutput(job,tableName,pColumnNames); + + FileOutputFormat.setOutputPath(job,getContext().getDestination()); + HTable hTable = new HTable(job.getConfiguration(), tableName); + HFileOutputFormat.configureIncrementalLoad(job, hTable); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(conf, HTable.class); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); + } + + @Override + protected void completeImport(Job job) throws IOException, ImportException { + super.completeImport(job); + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + Path bulkLoadDir = getContext().getDestination(); + setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir), + FsPermission.createImmutable((short) 00777)); + HTable hTable = new HTable(job.getConfiguration(), options.getPhoenixTable()); + + // Load generated HFiles into table + try { + LoadIncrementalHFiles loader = new LoadIncrementalHFiles( + job.getConfiguration()); + loader.doBulkLoad(bulkLoadDir, hTable); + } + catch (Exception e) { + String errorMessage = String.format("Unrecoverable error while " + + "performing the bulk load of files in [%s]", + bulkLoadDir.toString()); + throw new ImportException(errorMessage, e); + } + } + + @Override + protected void jobTeardown(Job job) throws IOException, ImportException { + super.jobTeardown(job); + // Delete the hfiles directory after we are finished. + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + fileSystem.delete(getContext().getDestination(), true); + } + + /** + * Set the file permission of the path of the given fileStatus. If the path + * is a directory, apply permission recursively to all subdirectories and + * files. + * + * @param fs the filesystem + * @param fileStatus containing the path + * @param permission the permission + * @throws java.io.IOException + */ + private void setPermission(FileSystem fs, FileStatus fileStatus, + FsPermission permission) throws IOException { + if (fileStatus.isDirectory()) { + for (FileStatus file : fs.listStatus(fileStatus.getPath())){ + setPermission(fs, file, permission); + } + } + fs.setPermission(fileStatus.getPath(), permission); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java new file mode 100644 index 000000000..09c302f27 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.sqoop.phoenix.PhoenixConstants; +import org.apache.sqoop.phoenix.PhoenixUtil; + +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.AutoProgressMapper; + +/** + * + * Mapper class for phoenix bulk import job. + * + */ +public class PhoenixBulkImportMapper + extends AutoProgressMapper { + + public static final Log LOG = LogFactory.getLog( + PhoenixBulkImportMapper.class.getName()); + + private Configuration conf; + private List columnInfos; + private PhoenixConnection conn; + private byte[] tableName; + private PreparedStatement preparedStatement = null; + + /* holds the mapping of phoenix column to db column */ + private Map columnMappings = null; + + @Override + protected void setup(Mapper.Context context) + throws IOException, InterruptedException { + conf = context.getConfiguration(); + try { + conn = (PhoenixConnection) QueryUtil.getConnection(conf); + conn.setAutoCommit(false); + String phoenixTable = PhoenixConfigurationUtil.getOutputTableName(conf); + tableName = Bytes.toBytes(phoenixTable); + columnInfos = PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf); + String upsertSql = QueryUtil.constructUpsertStatement(phoenixTable, columnInfos); + preparedStatement = conn.prepareStatement(upsertSql); + String columnMaps = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + columnMappings = PhoenixUtil.getPhoenixToSqoopMap(columnMaps); + } catch (Exception e) { + throw new RuntimeException("Failed to setup due to ." 
+ e.getMessage()); + } + } + + + + @Override + protected void map(LongWritable key, SqoopRecord value, + Mapper.Context context) + throws IOException, InterruptedException { + try { + ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); + Map fields = value.getFieldMap(); + int i = 1; + for (ColumnInfo colInfo : columnInfos) { + String pColName = colInfo.getDisplayName(); + String sColName = columnMappings.get(pColName); + Object sColValue = fields.get(sColName); + preparedStatement.setObject(i++, sColValue); + } + preparedStatement.execute(); + + Iterator>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(conn, true); + while (uncommittedDataIterator.hasNext()) { + Pair> kvPair = uncommittedDataIterator.next(); + if (Bytes.compareTo(tableName, kvPair.getFirst()) != 0) { + // skip edits for other tables + continue; + } + List keyValueList = kvPair.getSecond(); + for (KeyValue kv : keyValueList) { + outputKey.set(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()); + context.write(outputKey, kv); + } + } + conn.rollback(); + + } catch (SQLException e) { + throw new RuntimeException("Failed to process the record in the mapper due to " + e.getMessage()); + } + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + conn.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java b/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java new file mode 100644 index 000000000..101d9242a --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.PhoenixOutputFormat; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.sqoop.phoenix.PhoenixConstants; +import org.apache.sqoop.phoenix.PhoenixSqoopWritable; +import org.apache.sqoop.phoenix.PhoenixUtil; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.util.ImportException; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; + +/** + * Runs an Phoenix import via DataDrivenDBInputFormat and PhoenixOutputFormat + */ +public class PhoenixImportJob extends DataDrivenImportJob { + + public static final Log LOG = LogFactory.getLog( + PhoenixImportJob.class.getName()); + + public PhoenixImportJob(final SqoopOptions opts, + final ImportJobContext importContext) { + super(opts, importContext.getInputFormat(), importContext); + } + + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws IOException { + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(PhoenixSqoopWritable.class); + job.setMapperClass(getMapperClass()); + } + + @Override + protected Class getMapperClass() { + return PhoenixImportMapper.class; + } + + @Override + protected Class getOutputFormatClass() + throws ClassNotFoundException { + return PhoenixOutputFormat.class; + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + ConnManager manager = getContext().getConnManager(); + String[] sColumnNames = null; + if (null == tableName) { + String sqlQuery = options.getSqlQuery(); + sColumnNames = manager.getColumnNamesForQuery(sqlQuery); + } else { + if (null == options.getColumns()) { + sColumnNames = manager.getColumnNames(tableName); + } else { + sColumnNames = options.getColumns(); + } + } + String columnMappings = options.getPhoenixColumnMapping(); + // if column mappings aren't provided, we assume the column names in sqoop and phoenix match. 
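+ // (falling back to an identity mapping built from the sqoop column list)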
+ if (columnMappings == null || columnMappings.isEmpty()) { + columnMappings = Joiner.on(",").join(sColumnNames); + } + job.getConfiguration().set(PhoenixConstants.PHOENIX_SQOOP_COLUMNS, Joiner.on(",").join(sColumnNames)); + job.getConfiguration().set(PhoenixConstants.PHOENIX_COLUMN_MAPPING, columnMappings); + job.setOutputFormatClass(getOutputFormatClass()); + } + + @Override + protected void jobSetup(Job job) throws IOException, ImportException { + Configuration conf = job.getConfiguration(); + HBaseConfiguration.addHbaseResources(conf); + final String tableName = options.getPhoenixTable(); + final String sColumnNames = conf.get(PhoenixConstants.PHOENIX_SQOOP_COLUMNS); + final String columnMappings = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(sColumnNames); + Preconditions.checkNotNull(columnMappings); + + final Map phoenixToSqoopMapping = PhoenixUtil.getPhoenixToSqoopMap(columnMappings); + final String pColumnNames = Joiner.on(",").join(phoenixToSqoopMapping.keySet()); + boolean isValidColumns = PhoenixUtil.validateColumns(sColumnNames,columnMappings); + if (!isValidColumns) { + throw new RuntimeException(String.format("Failure to map sqoop columns [%s] to phoenix columns [%s] ", sColumnNames,pColumnNames)); + } + // set the table and columns. + PhoenixMapReduceUtil.setOutput(job, tableName,pColumnNames); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(conf, HTable.class); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); + super.jobSetup(job); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java b/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java new file mode 100644 index 000000000..04d2c86f5 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.sqoop.phoenix.PhoenixConstants; +import org.apache.sqoop.phoenix.PhoenixSqoopWritable; +import org.apache.sqoop.phoenix.PhoenixUtil; + +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.AutoProgressMapper; +import com.google.common.collect.Lists; + +/** + * Imports records by writing them to Phoenix + * + */ +public class PhoenixImportMapper + extends AutoProgressMapper { + + public static final Log LOG = LogFactory.getLog( + PhoenixImportMapper.class.getName()); + + private Configuration conf; + private List columnInfos; + /* holds the mapping of phoenix column to db column */ + private Map columnMappings; + + @Override + protected void setup(Mapper.Context context) + throws IOException, InterruptedException { + conf = context.getConfiguration(); + try { + columnInfos = PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf); + String columnMaps = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + columnMappings = PhoenixUtil.getPhoenixToSqoopMap(columnMaps); + } catch (SQLException e) { + throw new RuntimeException("Failed to load the upsert column metadata for table."); + } + } + + @Override + public void map(LongWritable key, SqoopRecord val, Context context) + throws IOException, InterruptedException { + + Map fields = val.getFieldMap(); + PhoenixSqoopWritable recordWritable = new PhoenixSqoopWritable(); + recordWritable.setColumnMetadata(columnInfos); + List columnValues = Lists.newArrayListWithCapacity(columnInfos.size()); + for (ColumnInfo column : columnInfos) { + String pColName = column.getDisplayName(); + String sColName = columnMappings.get(pColName); + Object sColValue = fields.get(sColName); + columnValues.add(sColValue); + } + recordWritable.setValues(columnValues); + context.write(NullWritable.get(), recordWritable); + } +} diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java b/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java new file mode 100644 index 000000000..9aaee3013 --- /dev/null +++ b/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.phoenix; + +/** + * Set of constants specifically for phoenix. 
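+ * A complete mapping string has the form "sqoop_col1;phoenix_col1,sqoop_col2;phoenix_col2".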
+ */ +public final class PhoenixConstants { + + /** property used to specify the column mapping of db to phoenix **/ + public static final String PHOENIX_COLUMN_MAPPING = "sqoop.phoenix.import.column.mapping"; + + /** property used to specify the columns beings imported from sqoop. */ + public static final String PHOENIX_SQOOP_COLUMNS = "sqoop.phoenix.columns"; + + /** separator for phoenix columns */ + public static final String PHOENIX_COLUMN_MAPPING_SEPARATOR = ","; + + /** separator between phoenix and sqoop column. */ + public static final String PHOENIX_SQOOP_COLUMN_SEPARATOR = ";"; +} diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java b/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java new file mode 100644 index 000000000..174ca5b70 --- /dev/null +++ b/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.phoenix; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.util.ColumnInfo; + +import com.google.common.base.Preconditions; + +/** + * + * A custom {@linkplain DBWritable} to map input columns to phoenix columns to upsert. 
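+ * write() binds each value to the upsert statement positionally, using the SQL type recorded in the corresponding ColumnInfo.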
+ * + */ +public class PhoenixSqoopWritable implements DBWritable { + + private List columnMetadata; + private List values; + + /** + * Default constructor + */ + public PhoenixSqoopWritable() { + } + + public PhoenixSqoopWritable(final List columnMetadata, final List values) { + Preconditions.checkNotNull(values); + Preconditions.checkNotNull(columnMetadata); + this.columnMetadata = columnMetadata; + this.values = values; + } + + @Override + public void write(PreparedStatement statement) throws SQLException { + Preconditions.checkNotNull(values); + Preconditions.checkNotNull(columnMetadata); + for (int i = 0 ; i < values.size() ; i++) { + Object value = values.get(i); + ColumnInfo columnInfo = columnMetadata.get(i); + if (value == null) { + statement.setNull(i + 1, columnInfo.getSqlType()); + } else { + statement.setObject(i + 1, value , columnInfo.getSqlType()); + } + } + } + + @Override + public void readFields(ResultSet resultSet) throws SQLException { + // NO-OP for now + } + + public List getColumnMetadata() { + return columnMetadata; + } + + public void setColumnMetadata(List columnMetadata) { + this.columnMetadata = columnMetadata; + } + + public List getValues() { + return values; + } + + public void setValues(List values) { + this.values = values; + } +} diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java b/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java new file mode 100644 index 000000000..93a41d928 --- /dev/null +++ b/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.phoenix; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + * Utility class . + * + */ +public class PhoenixUtil { + + public static final Log LOG = LogFactory.getLog( + PhoenixUtil.class.getName()); + + private static boolean testingMode = false; + + private PhoenixUtil() { + } + + /** + * This is a way to make this always return false for testing. + */ + public static void setAlwaysNoPhoenixJarMode(boolean mode) { + testingMode = mode; + } + + public static boolean isPhoenixJarPresent() { + if (testingMode) { + return false; + } + try { + // validate if hbase jars also exist in classpath. + Class.forName("org.apache.hadoop.hbase.client.HTable"); + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + } catch (ClassNotFoundException cnfe) { + LOG.error("Failed to find phoenix dependencies in classpath : " + cnfe.getMessage()); + return false; + } + return true; + } + + /** + * Generates a map of phoenix_column to sqoop column. 
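+ * For example, a mapping string of "rowid;ID,name" yields {ID -> rowid, NAME -> name} (hypothetical column names).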
+ * @param columnMapping + * @return + */ + public static Map getPhoenixToSqoopMap(String columnMappings) { + String[] split = columnMappings.split(PhoenixConstants.PHOENIX_COLUMN_MAPPING_SEPARATOR); + Map columnMappingsMap = new HashMap(); + for (String each : split) { + String[] sqoopToPhoenixMapping = each.split(PhoenixConstants.PHOENIX_SQOOP_COLUMN_SEPARATOR); + // if the sqoop column name is the same as phoenix column name, + // we don't need to separate the columns by a ';' delimiter. + if (sqoopToPhoenixMapping.length == 2) { + columnMappingsMap.put(sqoopToPhoenixMapping[1], sqoopToPhoenixMapping[0]); + } else { + columnMappingsMap.put(sqoopToPhoenixMapping[0].toUpperCase(), sqoopToPhoenixMapping[0]); + } + } + return columnMappingsMap; + } + + /** + * does the following validations + * 1. count of columns in sqoop match phoenix + * 2. 1 to 1 mapping between sqoop column to phoenix column. + * @param columnNames + * @param phoenixColumnMappings + */ + public static boolean validateColumns(String sColumns, String columnMappings) { + Map phoenixToSqoopColumnMap = getPhoenixToSqoopMap(columnMappings); + String sqoopColumns[] = sColumns.split(","); + if (sqoopColumns.length != phoenixToSqoopColumnMap.size()) { + throw new RuntimeException("Mismatch in the number of columns being imported from Sqoop " + + "and written to phoenix."); + } + Collection values = phoenixToSqoopColumnMap.values(); + for (String sqoopColumn : sqoopColumns) { + if (!values.contains(sqoopColumn)) { + throw new RuntimeException(String.format("Sqoop column [%s] doesn't exist in the valid list" + + " of column mappings [%s] ",sqoopColumn, Arrays.toString(values.toArray()))); + } + } + return true; + } + } diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index c97bb589f..0d8530b7c 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -188,6 +188,11 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { "hbase-bulkload"; public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table"; + // Phoenix arguments + public static final String PHOENIX_TABLE_ARG = "phoenix-table"; + public static final String PHOENIX_COLUMN_MAPPING_ARG = "phoenix-column-mapping"; + public static final String PHOENIX_BULK_LOAD_ENABLED_ARG = "phoenix-bulkload"; + //Accumulo arguments. 
public static final String ACCUMULO_TABLE_ARG = "accumulo-table"; public static final String ACCUMULO_COL_FAM_ARG = "accumulo-column-family"; @@ -774,6 +779,39 @@ protected RelatedOptions getHBaseOptions() { return hbaseOpts; } + protected RelatedOptions getPhoenixOptions() { + RelatedOptions phoenixOpts = + new RelatedOptions("Phoenix arguments"); + phoenixOpts.addOption(OptionBuilder.withArgName("table") + .hasArg() + .withDescription("Phoenix table to import data to") + .withLongOpt(PHOENIX_TABLE_ARG) + .create()); + phoenixOpts.addOption(OptionBuilder.withArgName("phoenix-column-mapping") + .hasArg() + .withDescription("Column mapping between database columns and Phoenix columns") + .withLongOpt(PHOENIX_COLUMN_MAPPING_ARG) + .create()); + phoenixOpts.addOption(OptionBuilder + .withDescription("Enable Phoenix bulk load") + .withLongOpt(PHOENIX_BULK_LOAD_ENABLED_ARG) + .create()); + return phoenixOpts; + + } + + protected void applyPhoenixOptions(CommandLine in, SqoopOptions out) { + if (in.hasOption(PHOENIX_TABLE_ARG)) { + out.setPhoenixTable(in.getOptionValue(PHOENIX_TABLE_ARG)); + } + + if(in.hasOption(PHOENIX_COLUMN_MAPPING_ARG)) { + out.setPhoenixColumnMapping(in.getOptionValue(PHOENIX_COLUMN_MAPPING_ARG)); + } + + out.setPhoenixBulkLoadEnabled(in.hasOption(PHOENIX_BULK_LOAD_ENABLED_ARG)); + } + protected RelatedOptions getAccumuloOptions() { RelatedOptions accumuloOpts = new RelatedOptions("Accumulo arguments"); @@ -1636,6 +1674,14 @@ protected void validateHBaseOptions(SqoopOptions options) throw new InvalidOptionsException(validationMessage); } } + + protected void validatePhoenixOptions(SqoopOptions options) + throws InvalidOptionsException { + if(options.isPhoenixBulkLoadEnabled() && options.getPhoenixTable() == null) { + throw new InvalidOptionsException("--phoenix-bulkload requires that --phoenix-table also be specified. " + + HELP_STR); + } + } /** * Given an array of extra arguments (usually populated via diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index c79e044d4..9e973ec62 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -800,6 +800,7 @@ public void configureOptions(ToolOptions toolOptions) { toolOptions.addUniqueOptions(getHCatalogOptions()); toolOptions.addUniqueOptions(getHCatImportOnlyOptions()); toolOptions.addUniqueOptions(getAccumuloOptions()); + toolOptions.addUniqueOptions(getPhoenixOptions()); // get common codegen opts. RelatedOptions codeGenOpts = getCodeGenOpts(allTables); @@ -984,6 +985,7 @@ public void applyOptions(CommandLine in, SqoopOptions out) applyHBaseOptions(in, out); applyHCatalogOptions(in, out); applyAccumuloOptions(in, out); + applyPhoenixOptions(in, out); } catch (NumberFormatException nfe) { throw new InvalidOptionsException("Error: expected numeric argument.\n" @@ -1021,7 +1023,8 @@ protected void validateImportOptions(SqoopOptions options) && options.getTargetDir() == null && options.getHBaseTable() == null && options.getHCatTableName() == null - && options.getAccumuloTable() == null) { + && options.getAccumuloTable() == null + && options.getPhoenixTable() == null) { throw new InvalidOptionsException( "Must specify destination with --target-dir.
" + HELP_STR); @@ -1130,6 +1133,7 @@ public void validateOptions(SqoopOptions options) validateHiveOptions(options); validateHCatalogOptions(options); validateAccumuloOptions(options); + validatePhoenixOptions(options); } } diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java b/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java new file mode 100644 index 000000000..4dae19ae1 --- /dev/null +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.phoenix; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Random; +import java.util.UUID; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.util.StringUtils; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.util.QueryUtil; +import org.junit.After; +import org.junit.Before; +import org.kitesdk.shaded.com.google.common.base.Preconditions; + +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.HsqldbTestServer; +import com.cloudera.sqoop.testutil.ImportJobTestCase; + +/** + * + * Base test class for all phoenix tests. 
+ * + */ +public class PhoenixBaseTestCase extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog(PhoenixBaseTestCase.class.getName()); + + private static String testBuildDataProperty = ""; + private static final int ZK_DEFAULT_PORT = 2181; + private static final Random random = new Random(); + + private static void recordTestBuildDataProperty() { + testBuildDataProperty = System.getProperty("test.build.data", ""); + } + + private static void restoreTestBuidlDataProperty() { + System.setProperty("test.build.data", testBuildDataProperty); + } + + protected HBaseTestingUtility hbaseTestUtil; + private String workDir = createTempDir().getAbsolutePath(); + private MiniZooKeeperCluster zookeeperCluster; + private MiniHBaseCluster hbaseCluster; + + @Override + @Before + public void setUp() { + try { + recordTestBuildDataProperty(); + String hbaseDir = new File(workDir, "phoenix" + UUID.randomUUID().toString() ).getAbsolutePath(); + String hbaseRoot = "file://" + hbaseDir; + Configuration hbaseConf = HBaseConfiguration.create(); + hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot); + //Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT + int zookeeperPort = ZK_DEFAULT_PORT + random.nextInt(1000); + hbaseConf.setInt("hbase.zookeeper.property.clientPort",zookeeperPort ); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0"); + hbaseConf.setInt("hbase.master.info.port", -1); + hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500); + String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); + zookeeperCluster = new MiniZooKeeperCluster(); + Method m; + Class zkParam[] = {Integer.TYPE}; + try { + m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", + zkParam); + } catch (NoSuchMethodException e) { + m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", + zkParam); + } + m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); + zookeeperCluster.startup(new File(zookeeperDir)); + hbaseCluster = new MiniHBaseCluster(hbaseConf, 1); + HMaster master = hbaseCluster.getMaster(); + Object serverName = master.getServerName(); + + String hostAndPort; + if (serverName instanceof String) { + m = HMaster.class.getDeclaredMethod("getMasterAddress", + new Class[]{}); + Class clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress"); + Object serverAddr = clazz.cast(m.invoke(master, new Object[]{})); + //returns the address as hostname:port + hostAndPort = serverAddr.toString(); + } else { + Class clazz = Class.forName("org.apache.hadoop.hbase.ServerName"); + m = clazz.getDeclaredMethod("getHostAndPort", new Class[]{}); + hostAndPort = m.invoke(serverName, new Object[]{}).toString(); + } + hbaseConf.set("hbase.master", hostAndPort); + hbaseTestUtil = new HBaseTestingUtility(hbaseConf); + hbaseTestUtil.setZkCluster(zookeeperCluster); + hbaseCluster.startMaster(); + super.setUp(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + public static File createTempDir() { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + File tempDir = new File(baseDir, UUID.randomUUID().toString()); + if (tempDir.mkdir()) { + return tempDir; + } + throw new IllegalStateException("Failed to create directory"); + } + + /** + * creates the test table given the ddl + * @param url + * @param ddl + * @throws SQLException + */ + protected void createPhoenixTestTable(final String ddl) throws SQLException { + assertNotNull(ddl); + PreparedStatement stmt = getPhoenixConnection().prepareStatement(ddl); + 
stmt.execute(ddl); + } + + /** + * Returns a {@linkplain PhoenixConnection} + * @return + */ + protected Connection getPhoenixConnection() throws SQLException { + int zkport = hbaseTestUtil.getConfiguration().getInt("hbase.zookeeper.property.clientPort",ZK_DEFAULT_PORT); + String zkServer = hbaseTestUtil.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM); + String url = QueryUtil.getUrl(zkServer, zkport); + Connection phoenixConnection = DriverManager.getConnection(url); + return phoenixConnection; + } + + /** + * shutdown the hbase mini cluster + * @throws IOException + */ + public void shutdown() throws IOException { + if (null != hbaseTestUtil) { + LOG.info("Shutting down HBase cluster"); + hbaseCluster.shutdown(); + zookeeperCluster.shutdown(); + hbaseTestUtil = null; + } + FileUtils.deleteDirectory(new File(workDir)); + LOG.info("shutdown() method returning."); + restoreTestBuidlDataProperty(); + } + + @After + @Override + public void tearDown() { + try { + shutdown(); + } catch(Exception ex) { + LOG.error("Error shutting down HBase minicluster: " + + StringUtils.stringifyException(ex)); + } + super.tearDown(); + } + + /** + * Create the argv to pass to Sqoop. + * @return the argv as an array of strings. + */ + protected String [] getArgv(boolean includeHadoopFlags,String splitBy,String phoenixTable, + String phoenixColumnMapping,boolean isBulkload, String queryStr) { + + Preconditions.checkNotNull(phoenixTable); + int zkPort = hbaseTestUtil.getConfiguration().getInt("hbase.zookeeper.property.clientPort",ZK_DEFAULT_PORT); + ArrayList args = new ArrayList(); + + if (includeHadoopFlags) { + CommonArgs.addHadoopFlags(args); + args.add("-D"); + args.add(String.format("hbase.zookeeper.property.clientPort=%s",zkPort)); + } + + if (null != queryStr) { + args.add("--query"); + args.add(queryStr); + } else { + args.add("--table"); + args.add(getTableName()); + } + args.add("--split-by"); + args.add(splitBy); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--num-mappers"); + args.add("1"); + args.add("--phoenix-table"); + args.add(phoenixTable); + if(null != phoenixColumnMapping) { + args.add("--phoenix-column-mapping"); + args.add(phoenixColumnMapping); + } + if(isBulkload) { + args.add("--phoenix-bulkload"); + } + return args.toArray(new String[0]); + } +} diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java new file mode 100644 index 000000000..2ab58bae7 --- /dev/null +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.phoenix; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +import org.junit.Test; + +/** + * + * Tests to import onto phoenix tables. + * + */ +public class PhoenixBasicImportTest extends PhoenixBaseTestCase { + + /** + * Test where the sqoop and phoenix table column names are the same. + * @throws Exception + */ + @Test + public void testBasicUsageWithNoColumnMapping() throws Exception { + Connection phoenixConnection = null; + ResultSet rs = null; + try { + final String tableName = "TABLE1"; + final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)" , tableName); + createPhoenixTestTable(ddl); + + final String [] argv = getArgv(true, "ID", tableName, null, false, null); + + // create sqoop table + String [] columnNames = { "ID" , "NAME" , "AGE"}; + String [] colTypes = { "INT", "VARCHAR(32)" , "INT"}; + String [] vals = { "0", "'first'" , "1" }; + createTableWithColTypesAndNames(columnNames, colTypes, vals); + + // run the import. + runImport(argv); + + // verify the result + phoenixConnection = getPhoenixConnection(); + Statement stmt = phoenixConnection.createStatement(); + rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s",tableName)); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertEquals("first", rs.getString(2)); + assertEquals(1, rs.getInt(3)); + assertFalse(rs.next()); + } finally { + if(rs != null) { + rs.close(); + } + if(phoenixConnection != null) { + phoenixConnection.close(); + } + } + } + + /** + * Test where the sqoop table column names differ from phoenix table column name + * @throws Exception + */ + @Test + public void testBasicUsageWithColumnMapping() throws Exception { + Connection phoenixConnection = null; + ResultSet rs = null; + try { + // create phoenix table + final String tableName = "TABLE2"; + final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)" , tableName); + createPhoenixTestTable(ddl); + + // create sqoop table + String [] columnNames = { "rowid" , "name" , "age"}; + String [] colTypes = { "INT", "VARCHAR(32)" , "INT"}; + String [] vals = { "0", "'Name 1'" , "1" }; + createTableWithColTypesAndNames(columnNames, colTypes, vals); + + // run the import + String [] argv = getArgv(true, "rowid", tableName, "ROWID;ID,NAME,AGE", false, null); + runImport(argv); + + //verify the data. + phoenixConnection = getPhoenixConnection(); + Statement stmt = phoenixConnection.createStatement(); + rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s",tableName)); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(1, rs.getInt(3)); + assertFalse(rs.next()); + } finally { + if(rs != null) { + rs.close(); + } + if(phoenixConnection != null) { + phoenixConnection.close(); + } + } + } +} diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java new file mode 100644 index 000000000..a0c7fbb37 --- /dev/null +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.phoenix;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import org.junit.Test;
+
+/**
+ *
+ * Tests bulk importing into Phoenix tables.
+ *
+ */
+public class PhoenixBulkImportTest extends PhoenixBaseTestCase {
+
+  /**
+   * Test where the Sqoop and Phoenix table column names are the same.
+   * @throws Exception
+   */
+  @Test
+  public void testBulkloadWithNoColumnMapping() throws Exception {
+    Connection phoenixConnection = null;
+    ResultSet rs = null;
+    try {
+      final String tableName = "TABLE1";
+      final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)", tableName);
+      createPhoenixTestTable(ddl);
+
+      final String [] argv = getArgv(true, "ID", tableName, null, true, null);
+
+      // create sqoop table
+      String [] columnNames = { "ID", "NAME", "AGE" };
+      String [] colTypes = { "INT", "VARCHAR(32)", "INT" };
+      String [] vals = { "0", "'first'", "1" };
+      createTableWithColTypesAndNames(columnNames, colTypes, vals);
+
+      // run the import.
+      runImport(argv);
+
+      // verify the result
+      phoenixConnection = getPhoenixConnection();
+      Statement stmt = phoenixConnection.createStatement();
+      rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s", tableName));
+      assertTrue(rs.next());
+      assertEquals(0, rs.getInt(1));
+      assertEquals("first", rs.getString(2));
+      assertEquals(1, rs.getInt(3));
+      assertFalse(rs.next());
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+      if (phoenixConnection != null) {
+        phoenixConnection.close();
+      }
+    }
+  }
+
+  /**
+   * Test where the Sqoop table column names differ from the Phoenix table column names.
+   * @throws Exception
+   */
+  @Test
+  public void testBulkImportWithColumnMapping() throws Exception {
+    Connection phoenixConnection = null;
+    ResultSet rs = null;
+    try {
+      // create phoenix table
+      final String tableName = "TABLE2";
+      final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)", tableName);
+      createPhoenixTestTable(ddl);
+
+      // create sqoop table
+      String [] columnNames = { "rowid", "name", "age" };
+      String [] colTypes = { "INT", "VARCHAR(32)", "INT" };
+      String [] vals = { "0", "'Name 1'", "1" };
+      createTableWithColTypesAndNames(columnNames, colTypes, vals);
+
+      // run the import
+      String [] argv = getArgv(true, "rowid", tableName, "ROWID;ID,NAME,AGE", true, null);
+      runImport(argv);
+
+      // verify the data.
+      phoenixConnection = getPhoenixConnection();
+      Statement stmt = phoenixConnection.createStatement();
+      rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s", tableName));
+      assertTrue(rs.next());
+      assertEquals(0, rs.getInt(1));
+      assertEquals("Name 1", rs.getString(2));
+      assertEquals(1, rs.getInt(3));
+      assertFalse(rs.next());
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+      if (phoenixConnection != null) {
+        phoenixConnection.close();
+      }
+    }
+  }
+}
diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java
new file mode 100644
index 000000000..1f368e281
--- /dev/null
+++ b/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.phoenix;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import org.junit.Test;
+
+/**
+ * Tests importing into a Phoenix table using a free-form query.
+ *
+ */
+public class PhoenixQueryImportTest extends PhoenixBaseTestCase {
+
+  /**
+   * Test where the query selects only a subset of the Phoenix table columns.
+   * @throws Exception
+   */
+  @Test
+  public void testQueryImport() throws Exception {
+    Connection phoenixConnection = null;
+    ResultSet rs = null;
+    try {
+
+      // create phoenix table
+      final String tableName = "TABLE1";
+      final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, LOCATION VARCHAR)", tableName);
+      createPhoenixTestTable(ddl);
+
+      // create sqoop table
+      String [] columnNames = { "ID", "NAME", "LOCATION" };
+      String [] colTypes = { "INT", "VARCHAR(32)", "VARCHAR(32)" };
+      String [] vals = { "1", "'first'", "'CA'" };
+      createTableWithColTypesAndNames(columnNames, colTypes, vals);
+
+      // run import
+      String sqoopTableName = getTableName();
+      String query = String.format("SELECT id AS ID, name AS NAME FROM %s WHERE $CONDITIONS", sqoopTableName);
+      String [] argv = getArgv(true, "ID", tableName, null, false, query);
+      runImport(argv);
+
+      // verify the data; LOCATION was not selected, so it should be null.
+      phoenixConnection = getPhoenixConnection();
+      Statement stmt = phoenixConnection.createStatement();
+      rs = stmt.executeQuery(String.format("SELECT id, name, location FROM %s", tableName));
+      assertTrue(rs.next());
+      assertEquals(1, rs.getInt(1));
+      assertEquals("first", rs.getString(2));
+      assertEquals(null, rs.getString(3));
+      assertFalse(rs.next());
+    } finally {
+      if (rs != null) {
+        rs.close();
+      }
+      if (phoenixConnection != null) {
+        phoenixConnection.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInvalidArgument() {
+    try {
+      String [] columnNames = { "ID", "NAME", "LOCATION" };
+      String [] colTypes = { "INT", "VARCHAR(32)", "VARCHAR(32)" };
+      String [] vals = { "1", "'first'", "'CA'" };
+      createTableWithColTypesAndNames(columnNames, colTypes, vals);
+      String [] argv = getArgv(true, "ID", "SAMPLE", null, false, null);
+      runImport(argv);
+      fail("Expected the import to fail because the target Phoenix table does not exist.");
+    } catch (Exception ex) {
+      return;
+    }
+  }
+}