From c33f779e5c7731e52214c885c33ca0901935411b Mon Sep 17 00:00:00 2001 From: ravimagham Date: Mon, 26 Oct 2015 01:23:24 -0700 Subject: [PATCH 1/6] scoop phoenix first code drop --- build.xml | 2 +- ivy.xml | 16 ++- src/java/org/apache/sqoop/SqoopOptions.java | 23 +++- .../org/apache/sqoop/manager/SqlManager.java | 10 +- .../sqoop/mapreduce/PhoenixImportJob.java | 107 ++++++++++++++++++ .../sqoop/mapreduce/PhoenixImportMapper.java | 86 ++++++++++++++ .../sqoop/phoenix/PhoenixSqoopWritable.java | 87 ++++++++++++++ .../org/apache/sqoop/tool/BaseSqoopTool.java | 32 ++++++ .../org/apache/sqoop/tool/ImportTool.java | 2 + 9 files changed, 354 insertions(+), 11 deletions(-) create mode 100644 src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java create mode 100644 src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java create mode 100644 src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java diff --git a/build.xml b/build.xml index d614d0985..86339b599 100644 --- a/build.xml +++ b/build.xml @@ -191,7 +191,7 @@ - + diff --git a/ivy.xml b/ivy.xml index a93d0afd2..bcac330c3 100644 --- a/ivy.xml +++ b/ivy.xml @@ -37,6 +37,7 @@ under the License. extends="runtime" description="artifacts needed to compile/test the application"/> + @@ -46,15 +47,15 @@ under the License. + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> + extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo,phoenix" /> @@ -291,7 +292,10 @@ under the License. - + + + + diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index ef6e0cec5..798741b0c 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -316,6 +316,12 @@ public String toString() { // explicit split by cols @StoredAsProperty("reset.onemapper") private boolean autoResetToOneMapper; + // Phoenix table to import into. + @StoredAsProperty("phoenix.table") private String phoenixTable; + + // Phoenix columns to be upserted to . The order should confirm to the import table select query + @StoredAsProperty("phoenix.columns") private String phoenixColumns; + // These next two fields are not serialized to the metastore. // If this SqoopOptions is created by reading a saved job, these will // be populated by the JobStorage to facilitate updating the same @@ -432,7 +438,7 @@ private void setDelimiterProperties(Properties props, putProperty(props, prefix + ".escape", Integer.toString((int) values.getEscapedBy())); putProperty(props, prefix + ".enclose.required", - Boolean.toString(values.isEncloseRequired())); + Boolean.toString(values.isEncloseRequired())); } /** Take a comma-delimited list of input and split the elements @@ -1894,6 +1900,21 @@ public void setConf(Configuration config) { this.conf = config; } + public String getPhoenixTable() { + return phoenixTable; + } + public void setPhoenixTable(String phoenixTable) { + this.phoenixTable = phoenixTable; + } + + public String getPhoenixColumns() { + return phoenixColumns; + } + + public void setPhoenixColumns(String phoenixColumns) { + this.phoenixColumns = phoenixColumns; + } + /** * @return command-line arguments after a '-'. 
*/ diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index ead581df1..d97858334 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -44,6 +44,7 @@ import org.apache.sqoop.mapreduce.AccumuloImportJob; import org.apache.sqoop.mapreduce.HBaseBulkImportJob; import org.apache.sqoop.mapreduce.JdbcCallExportJob; +import org.apache.sqoop.mapreduce.PhoenixImportJob; import org.apache.sqoop.util.LoggingUtils; import org.apache.sqoop.util.SqlTypeMap; @@ -643,7 +644,9 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context) context.setConnManager(this); ImportJobBase importer; - if (opts.getHBaseTable() != null) { + if(opts.getPhoenixTable() != null) { + importer = new PhoenixImportJob(opts, context); + } else if (opts.getHBaseTable() != null) { // Import to HBase. if (!HBaseUtil.isHBaseJarPresent()) { throw new ImportException("HBase jars are not present in " @@ -666,7 +669,6 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context) importer = new DataDrivenImportJob(opts, context.getInputFormat(), context); } - checkTableImportOptions(context); String splitCol = getSplitColumn(opts, tableName); @@ -686,7 +688,9 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context) context.setConnManager(this); ImportJobBase importer; - if (opts.getHBaseTable() != null) { + if(opts.getPhoenixTable() != null) { + importer = new PhoenixImportJob(opts, context); + } else if (opts.getHBaseTable() != null) { // Import to HBase. if (!HBaseUtil.isHBaseJarPresent()) { throw new ImportException("HBase jars are not present in classpath," diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java b/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java new file mode 100644 index 000000000..f6210bad3 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java @@ -0,0 +1,107 @@ +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.sqoop.phoenix.PhoenixSqoopWritable; +import org.apache.hadoop.hbase.HBaseConfiguration; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.lib.FieldMapProcessor; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.util.ImportException; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; + +import org.apache.phoenix.mapreduce.PhoenixOutputFormat; + +/** + * + */ +public class PhoenixImportJob extends DataDrivenImportJob { + + public static final Log LOG = LogFactory.getLog( + PhoenixImportJob.class.getName()); + + public static final String PHOENIX_IMPORT_COLUMNS = "phoenix.sqoop.import.columns"; + + public PhoenixImportJob(final SqoopOptions opts, + final ImportJobContext importContext) { + super(opts, 
importContext.getInputFormat(), importContext); + } + + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws IOException { + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(PhoenixSqoopWritable.class); + job.setMapperClass(getMapperClass()); + } + + @Override + protected Class getMapperClass() { + return PhoenixImportMapper.class; + } + + @Override + protected Class getOutputFormatClass() + throws ClassNotFoundException { + return PhoenixOutputFormat.class; + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + + ConnManager manager = getContext().getConnManager(); + String[] columnNames = manager.getColumnNames(tableName); + final String phoenixColumns = options.getPhoenixColumns(); + + if(phoenixColumns == null || phoenixColumns.length() == 0) { + job.getConfiguration().set(PHOENIX_IMPORT_COLUMNS, Joiner.on(",").join(columnNames).toUpperCase()); + } else { + // validate if the columns count match. + String[] phoenixColumnNames = phoenixColumns.split("\\s*,\\s*"); + if(phoenixColumnNames.length != columnNames.length) { + throw new RuntimeException(String.format(" We import [%s] columns from table [%s] " + + " but are writing to [%s] columns of [%s] phoenix table", columnNames.length,tableName,phoenixColumnNames.length,options.getPhoenixTable())); + } + job.getConfiguration().set(PHOENIX_IMPORT_COLUMNS,phoenixColumns); + } + job.setOutputFormatClass(getOutputFormatClass()); + } + + @Override + protected void jobSetup(Job job) throws IOException, ImportException { + super.jobSetup(job); + Configuration conf = job.getConfiguration(); + final String tableName = options.getPhoenixTable(); + final String columns = conf.get(PHOENIX_IMPORT_COLUMNS); + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(columns); + + HBaseConfiguration.addHbaseResources(conf); + + // set the table and columns. + PhoenixMapReduceUtil.setOutput(job, options.getPhoenixTable(), columns); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(conf, HTable.class); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); + } + + +} diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java b/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java new file mode 100644 index 000000000..4c35f1094 --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.sqoop.phoenix.PhoenixSqoopWritable; + +import com.cloudera.sqoop.lib.SqoopRecord; +import com.cloudera.sqoop.mapreduce.AutoProgressMapper; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Imports records by writing them to Phoenix + * + */ +public class PhoenixImportMapper + extends AutoProgressMapper { + + public static final Log LOG = LogFactory.getLog( + PhoenixImportMapper.class.getName()); + + private Configuration conf; + private List columnInfos; + + @Override + protected void setup(Mapper.Context context) + throws IOException, InterruptedException { + conf = context.getConfiguration(); + try { + columnInfos = PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf); + } catch (SQLException e) { + throw new RuntimeException("Failed to load the upsert column metadata for table."); + } + } + + @Override + public void map(LongWritable key, SqoopRecord val, Context context) + throws IOException, InterruptedException { + + Map fields = val.getFieldMap(); + //TODO: need to optimize this call. + Map keysToUpper = Maps.newHashMapWithExpectedSize(fields.size()); + for(Map.Entry kv : fields.entrySet()) { + keysToUpper.put(kv.getKey().toUpperCase(), kv.getValue()); + } + PhoenixSqoopWritable recordWritable = new PhoenixSqoopWritable(); + recordWritable.setColumnMetadata(columnInfos); + List columnValues = Lists.newArrayListWithCapacity(columnInfos.size()); + for(ColumnInfo column : columnInfos) { + String columnName = column.getDisplayName(); + Object columnValue = keysToUpper.get(columnName); + columnValues.add(columnValue); + } + recordWritable.setValues(columnValues); + context.write(NullWritable.get(), recordWritable); + } +} diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java b/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java new file mode 100644 index 000000000..a1721a23c --- /dev/null +++ b/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sqoop.phoenix; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.util.ColumnInfo; + +/** + * + * Writable class. + * + */ +public class PhoenixSqoopWritable implements DBWritable { + + private List columnMetadata; + + private List values; + + private int columnCount = -1; + + @Override + public void write(PreparedStatement statement) throws SQLException { + for(int i = 0 ; i < values.size() ; i++) { + Object value = values.get(i); + ColumnInfo columnInfo = columnMetadata.get(i); + if(value == null) { + statement.setNull(i + 1, columnInfo.getSqlType()); + } else { + statement.setObject(i + 1, value , columnInfo.getSqlType()); + } + } + + } + + @Override + public void readFields(ResultSet resultSet) throws SQLException { + // we do this once per mapper. + if(columnCount == -1) { + this.columnCount = resultSet.getMetaData().getColumnCount(); + } + + values = new ArrayList(columnCount); + for(int i = 0 ; i < columnCount ; i++) { + Object value = resultSet.getObject(i + 1); + values.add(value); + } + + } + + public List getColumnMetadata() { + return columnMetadata; + } + + public void setColumnMetadata(List columnMetadata) { + this.columnMetadata = columnMetadata; + } + + public List getValues() { + return values; + } + + public void setValues(List values) { + this.values = values; + } + +} diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index c97bb589f..1cfe3de35 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -188,6 +188,10 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { "hbase-bulkload"; public static final String HBASE_CREATE_TABLE_ARG = "hbase-create-table"; + // Phoenix arguments + public static final String PHOENIX_TABLE_ARG = "phoenix-table"; + public static final String PHOENIX_COLUMNS_ARG = "phoenix-columns"; + //Accumulo arguments. 
public static final String ACCUMULO_TABLE_ARG = "accumulo-table"; public static final String ACCUMULO_COL_FAM_ARG = "accumulo-column-family"; @@ -774,6 +778,34 @@ protected RelatedOptions getHBaseOptions() { return hbaseOpts; } + protected RelatedOptions getPhoenixOptions() { + RelatedOptions phoenixOpts = + new RelatedOptions("Phoenix arguments"); + phoenixOpts.addOption(OptionBuilder.withArgName("table") + .hasArg() + .withDescription("Phoenix table to import data to") + .withLongOpt(PHOENIX_TABLE_ARG) + .create()); + phoenixOpts.addOption(OptionBuilder.withArgName("columns") + .hasArg() + .withDescription("columns to import.") + .withLongOpt(PHOENIX_COLUMNS_ARG) + .create()); + + return phoenixOpts; + + } + + protected void applyPhoenixOptions(CommandLine in, SqoopOptions out) { + if (in.hasOption(PHOENIX_TABLE_ARG)) { + out.setPhoenixTable(in.getOptionValue(PHOENIX_TABLE_ARG)); + } + + if (in.hasOption(PHOENIX_COLUMNS_ARG)) { + out.setPhoenixColumns(in.getOptionValue(PHOENIX_COLUMNS_ARG)); + } + } + protected RelatedOptions getAccumuloOptions() { RelatedOptions accumuloOpts = new RelatedOptions("Accumulo arguments"); diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index c79e044d4..e81f5bc14 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -800,6 +800,7 @@ public void configureOptions(ToolOptions toolOptions) { toolOptions.addUniqueOptions(getHCatalogOptions()); toolOptions.addUniqueOptions(getHCatImportOnlyOptions()); toolOptions.addUniqueOptions(getAccumuloOptions()); + toolOptions.addUniqueOptions(getPhoenixOptions()); // get common codegen opts. RelatedOptions codeGenOpts = getCodeGenOpts(allTables); @@ -984,6 +985,7 @@ public void applyOptions(CommandLine in, SqoopOptions out) applyHBaseOptions(in, out); applyHCatalogOptions(in, out); applyAccumuloOptions(in, out); + applyPhoenixOptions(in, out); } catch (NumberFormatException nfe) { throw new InvalidOptionsException("Error: expected numeric argument.\n" From de739925d10641e091350469a656ea7b64de82b9 Mon Sep 17 00:00:00 2001 From: ravimagham Date: Sat, 31 Oct 2015 20:55:07 -0700 Subject: [PATCH 2/6] code drop for bulk load onto phoenix --- build.xml | 3 + ivy.xml | 3 +- ivy/ivysettings.xml | 3 + src/docs/user/import.txt | 3 + src/docs/user/phoenix-args.txt | 33 +++ src/docs/user/phoenix.txt | 47 ++++ src/docs/user/validation.txt | 2 +- src/java/org/apache/sqoop/SqoopOptions.java | 68 +++-- .../org/apache/sqoop/manager/ConnManager.java | 9 + .../org/apache/sqoop/manager/SqlManager.java | 30 ++- .../apache/sqoop/mapreduce/ImportJobBase.java | 5 + .../sqoop/mapreduce/PhoenixBulkImportJob.java | 194 +++++++++++++++ .../mapreduce/PhoenixBulkImportMapper.java | 123 ++++++++++ .../sqoop/mapreduce/PhoenixImportJob.java | 70 +++--- .../sqoop/mapreduce/PhoenixImportMapper.java | 21 +- .../sqoop/phoenix/PhoenixConstants.java | 30 +++ .../org/apache/sqoop/phoenix/PhoenixUtil.java | 98 ++++++++ .../org/apache/sqoop/tool/BaseSqoopTool.java | 32 ++- .../org/apache/sqoop/tool/ImportTool.java | 1 + .../sqoop/phoenix/PhoenixImportTest.java | 83 +++++++ .../sqoop/phoenix/PhoenixTestCase.java | 232 ++++++++++++++++++ 21 files changed, 1016 insertions(+), 74 deletions(-) create mode 100644 src/docs/user/phoenix-args.txt create mode 100644 src/docs/user/phoenix.txt create mode 100644 src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java create mode 100644 src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java 
create mode 100644 src/java/org/apache/sqoop/phoenix/PhoenixConstants.java create mode 100644 src/java/org/apache/sqoop/phoenix/PhoenixUtil.java create mode 100644 src/test/com/cloudera/sqoop/phoenix/PhoenixImportTest.java create mode 100644 src/test/com/cloudera/sqoop/phoenix/PhoenixTestCase.java diff --git a/build.xml b/build.xml index 86339b599..06c985dc1 100644 --- a/build.xml +++ b/build.xml @@ -192,6 +192,9 @@ + + + diff --git a/ivy.xml b/ivy.xml index bcac330c3..fd341f09a 100644 --- a/ivy.xml +++ b/ivy.xml @@ -292,8 +292,9 @@ under the License. - + + + diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt index df0415766..1e80fbd4b 100644 --- a/src/docs/user/import.txt +++ b/src/docs/user/import.txt @@ -561,6 +561,9 @@ include::hbase.txt[] include::accumulo-args.txt[] include::accumulo.txt[] +include::phoenix-args.txt[] +include::phoenix.txt[] + include::codegen-args.txt[] As mentioned earlier, a byproduct of importing a table to HDFS is a diff --git a/src/docs/user/phoenix-args.txt b/src/docs/user/phoenix-args.txt new file mode 100644 index 000000000..2c39675cf --- /dev/null +++ b/src/docs/user/phoenix-args.txt @@ -0,0 +1,33 @@ + +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +//// + + +.Phoenix arguments: +[grid="all"] +`-------------------------------------`----------------------------------- +Argument Description +-------------------------------------------------------------------------- ++\--phoenix-table + Specifies Phoenix table to use\ + as the target instead of HDFS ++\--phoenix-column-mapping + (Optional)Comma-separated mapping of\ + sqoop column to phoenix column. + The two should be separated by ; ++\--phoenix-bulkload+ Enables bulk loading +-------------------------------------------------------------------------- + diff --git a/src/docs/user/phoenix.txt b/src/docs/user/phoenix.txt new file mode 100644 index 000000000..18e24b11a --- /dev/null +++ b/src/docs/user/phoenix.txt @@ -0,0 +1,47 @@ + +//// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+//// + + +Importing Data Into Phoenix +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Sqoop supports additional import targets beyond HDFS and Hive. Sqoop +can also import records into a table in Phoenix. + +By specifying +\--phoenix-table+, you instruct Sqoop to import +to a table in Phoenix-backed HBase rather than a directory in HDFS. Sqoop will +import data to the table specified as the argument to +\--phoenix-table+. + +NOTE: This function is incompatible with direct import (parameter ++\--direct+). + +If the column names in the input table differ from the column names +in Phoenix, specify +\--phoenix-column-mapping+. The argument is a +comma-separated list of input-column and Phoenix-column name pairs. The +two names in each pair should be separated by a ;. +For example: sqoop_col1;phoenix_col1,sqoop_col2;phoenix_col2. +The mapper transforms each row of the input table into an upsert +into Phoenix and persists it using PhoenixOutputFormat. + + +If the target Phoenix table does not exist, the Sqoop job will +exit with an error. You should create the target table before running an import. + +To decrease the load on HBase, Sqoop can do bulk loading as opposed to +direct writes. To use bulk loading, enable it using +\--phoenix-bulkload+. diff --git a/src/docs/user/validation.txt b/src/docs/user/validation.txt index 27a78e22d..ccb8f69e6 100644 --- a/src/docs/user/validation.txt +++ b/src/docs/user/validation.txt @@ -101,7 +101,7 @@ The following are the limitations in the current implementation: * all-tables option * free-form query option -* Data imported into Hive, HBase or Accumulo +* Data imported into Hive, HBase, Phoenix or Accumulo * table import with --where argument * incremental imports diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index 798741b0c..a34f1edff 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -319,9 +319,12 @@ public String toString() { // Phoenix table to import into. @StoredAsProperty("phoenix.table") private String phoenixTable; - // Phoenix columns to be upserted to . The order should confirm to the import table select query - @StoredAsProperty("phoenix.columns") private String phoenixColumns; + // Phoenix column mapping to db columns. + @StoredAsProperty("phoenix.column.mapping") private String phoenixColumnMapping; + // Whether bulk load is enabled. + @StoredAsProperty("phoenix.bulk.load.enabled") private boolean phoenixBulkLoadEnabled; + // These next two fields are not serialized to the metastore. // If this SqoopOptions is created by reading a saved job, these will // be populated by the JobStorage to facilitate updating the same @@ -1900,21 +1903,7 @@ public void setConf(Configuration config) { this.conf = config; } - public String getPhoenixTable() { - return phoenixTable; - } - public void setPhoenixTable(String phoenixTable) { - this.phoenixTable = phoenixTable; - } - - public String getPhoenixColumns() { - return phoenixColumns; - } - - public void setPhoenixColumns(String phoenixColumns) { - this.phoenixColumns = phoenixColumns; - } - + /** * @return command-line arguments after a '-'. 
*/ @@ -2425,6 +2414,51 @@ public void setAccumuloZookeepers(String zookeepers) { this.accumuloZookeepers = zookeepers; } + /** + * Gets the Phoenix table to import into. + * @return the target Phoenix table + */ + public String getPhoenixTable() { + return phoenixTable; + } + + /** + * Sets the target Phoenix table. + * @param phoenixTable + */ + public void setPhoenixTable(String phoenixTable) { + this.phoenixTable = phoenixTable; + } + + /** + * Gets the one-to-one mapping between db columns and Phoenix columns for the table. + * The pattern is dbcolumn1;phoenixcolumn1,dbcolumn2;phoenixcolumn2. + * @return the column mapping + */ + public String getPhoenixColumnMapping() { + return phoenixColumnMapping; + } + + /** + * Sets the db column to Phoenix column mapping. + * @param phoenixColumnMapping + */ + public void setPhoenixColumnMapping(String phoenixColumnMapping) { + this.phoenixColumnMapping = phoenixColumnMapping; + } + + /** + * Returns whether the load to Phoenix goes through bulk load. + * @return true if bulk load is enabled + */ + public boolean isPhoenixBulkLoadEnabled() { + return phoenixBulkLoadEnabled; + } + + public void setPhoenixBulkLoadEnabled(boolean phoenixBulkLoadEnabled) { + this.phoenixBulkLoadEnabled = phoenixBulkLoadEnabled; + } + public void setConnManagerClassName(String connManagerClass) { this.connManagerClassName = connManagerClass; } diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java index d9569c590..517c8cf4a 100644 --- a/src/java/org/apache/sqoop/manager/ConnManager.java +++ b/src/java/org/apache/sqoop/manager/ConnManager.java @@ -813,6 +813,15 @@ public boolean isDirectModeHBaseSupported() { public boolean isDirectModeAccumuloSupported() { return false; } + + /** + * Determine if Phoenix operations from direct mode of the connector are + * allowed. By default, direct mode is not compatible with Phoenix. + * @return Whether direct mode is allowed. + */ + public boolean isDirectModePhoenixSupported() { + return false; + } } diff --git a/src/java/org/apache/sqoop/manager/SqlManager.java b/src/java/org/apache/sqoop/manager/SqlManager.java index d97858334..f2789b56a 100644 --- a/src/java/org/apache/sqoop/manager/SqlManager.java +++ b/src/java/org/apache/sqoop/manager/SqlManager.java @@ -44,7 +44,9 @@ import org.apache.sqoop.mapreduce.AccumuloImportJob; import org.apache.sqoop.mapreduce.HBaseBulkImportJob; import org.apache.sqoop.mapreduce.JdbcCallExportJob; +import org.apache.sqoop.mapreduce.PhoenixBulkImportJob; import org.apache.sqoop.mapreduce.PhoenixImportJob; +import org.apache.sqoop.phoenix.PhoenixUtil; import org.apache.sqoop.util.LoggingUtils; import org.apache.sqoop.util.SqlTypeMap; @@ -644,9 +646,7 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context) context.setConnManager(this); ImportJobBase importer; - if(opts.getPhoenixTable() != null) { - importer = new PhoenixImportJob(opts, context); - } else if (opts.getHBaseTable() != null) { + if (opts.getHBaseTable() != null) { // Import to HBase. 
if (!HBaseUtil.isHBaseJarPresent()) { throw new ImportException("HBase jars are not present in " @@ -664,6 +664,16 @@ public void importTable(com.cloudera.sqoop.manager.ImportJobContext context) + "classpath, cannot import to Accumulo!"); } importer = new AccumuloImportJob(opts, context); + } else if(opts.getPhoenixTable() != null) { + if(!PhoenixUtil.isPhoenixJarPresent()) { + throw new ImportException("Phoenix jars are not present in " + + "classpath, cannot import to Phoenix!"); + } + if(opts.isPhoenixBulkLoadEnabled()) { + importer = new PhoenixBulkImportJob(opts, context); + } else { + importer = new PhoenixImportJob(opts, context); + } } else { // Import to HDFS. importer = new DataDrivenImportJob(opts, context.getInputFormat(), @@ -688,9 +698,7 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context) context.setConnManager(this); ImportJobBase importer; - if(opts.getPhoenixTable() != null) { - importer = new PhoenixImportJob(opts, context); - } else if (opts.getHBaseTable() != null) { + if (opts.getHBaseTable() != null) { // Import to HBase. if (!HBaseUtil.isHBaseJarPresent()) { throw new ImportException("HBase jars are not present in classpath," @@ -708,6 +716,16 @@ public void importQuery(com.cloudera.sqoop.manager.ImportJobContext context) + " cannot import to Accumulo!"); } importer = new AccumuloImportJob(opts, context); + } else if(opts.getPhoenixTable() != null) { + if(!PhoenixUtil.isPhoenixJarPresent()) { + throw new ImportException("Phoenix jars are not present in " + + "classpath, cannot import to Phoenix!"); + } + if(opts.isPhoenixBulkLoadEnabled()) { + importer = new PhoenixBulkImportJob(opts, context); + } else { + importer = new PhoenixImportJob(opts, context); + } } else { // Import to HDFS. importer = new DataDrivenImportJob(opts, context.getInputFormat(), diff --git a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java index 04d60fd13..a3bcdec6a 100644 --- a/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java +++ b/src/java/org/apache/sqoop/mapreduce/ImportJobBase.java @@ -231,6 +231,11 @@ public void runImport(String tableName, String ormJarFile, String splitByCol, throw new IOException("Direct mode is incompatible with " + "HBase. Please remove the parameter --direct"); } + if (options.getPhoenixTable() != null && options.isDirect() + && !getContext().getConnManager().isDirectModePhoenixSupported()) { + throw new IOException("Direct mode is incompatible with " + + "Phoenix. Please remove the parameter --direct"); + } if (null != tableName) { LOG.info("Beginning import of " + tableName); } else { diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java new file mode 100644 index 000000000..2855a4a6a --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.sqoop.phoenix.PhoenixConstants; +import org.apache.sqoop.phoenix.PhoenixUtil; + +import com.cloudera.sqoop.SqoopOptions; +import com.cloudera.sqoop.manager.ConnManager; +import com.cloudera.sqoop.manager.ImportJobContext; +import com.cloudera.sqoop.util.ImportException; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; + +/** + * Job to bulk import data from db to phoenix. + * + */ +public class PhoenixBulkImportJob extends DataDrivenImportJob { + + public static final Log LOG = LogFactory.getLog( + PhoenixBulkImportJob.class.getName()); + + public PhoenixBulkImportJob(final SqoopOptions opts, + final ImportJobContext importContext) { + super(opts, importContext.getInputFormat(), importContext); + } + + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws IOException { + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setMapperClass(getMapperClass()); + } + + @Override + protected Class getMapperClass() { + return PhoenixBulkImportMapper.class; + } + + @Override + protected Class getOutputFormatClass() + throws ClassNotFoundException { + return HFileOutputFormat.class; + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + + ConnManager manager = getContext().getConnManager(); + String[] sColumnNames = null; + if(null == tableName) { + String sqlQuery = options.getSqlQuery(); + sColumnNames = manager.getColumnNamesForQuery(sqlQuery); + } else { + if(null == options.getColumns()) { + sColumnNames = manager.getColumnNames(tableName); + } else { + sColumnNames = options.getColumns(); + } + } + String columnMappings = options.getPhoenixColumnMapping(); + // if column mappings aren't provided, we assume the column names in sqoop and phoenix match. 
+ if(columnMappings == null || columnMappings.isEmpty()) { + columnMappings = Joiner.on(",").join(sColumnNames); + } + job.getConfiguration().set(PhoenixConstants.PHOENIX_SQOOP_COLUMNS, Joiner.on(",").join(sColumnNames)); + job.getConfiguration().set(PhoenixConstants.PHOENIX_COLUMN_MAPPING, columnMappings); + job.setOutputFormatClass(getOutputFormatClass()); + } + + @Override + protected void jobSetup(Job job) throws IOException, ImportException { + super.jobSetup(job); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.addHbaseResources(conf); + final String tableName = options.getPhoenixTable(); + final String sColumnNames = conf.get(PhoenixConstants.PHOENIX_SQOOP_COLUMNS); + final String columnMappings = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(sColumnNames); + + + final Map phoenixToSqoopMapping = PhoenixUtil.getPhoenixToSqoopMap(columnMappings); + final String pColumnNames = Joiner.on(",").join(phoenixToSqoopMapping.keySet()); + boolean isValidColumns = PhoenixUtil.validateColumns(sColumnNames,columnMappings); + if(!isValidColumns) { + throw new RuntimeException(String.format("Failure to map sqoop columns [%s] to phoenix columns [%s] ", sColumnNames,pColumnNames)); + } + + PhoenixMapReduceUtil.setOutput(job, tableName,pColumnNames); + + FileOutputFormat.setOutputPath(job, getContext().getDestination()); + HTable hTable = new HTable(job.getConfiguration(), tableName); + HFileOutputFormat.configureIncrementalLoad(job, hTable); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(conf, HTable.class); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); + } + + @Override + protected void completeImport(Job job) throws IOException, ImportException { + super.completeImport(job); + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + + Path bulkLoadDir = getContext().getDestination(); + setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir), + FsPermission.createImmutable((short) 00777)); + + HTable hTable = new HTable(job.getConfiguration(), options.getPhoenixTable()); + + // Load generated HFiles into table + try { + LoadIncrementalHFiles loader = new LoadIncrementalHFiles( + job.getConfiguration()); + loader.doBulkLoad(bulkLoadDir, hTable); + } + catch (Exception e) { + String errorMessage = String.format("Unrecoverable error while " + + "performing the bulk load of files in [%s]", + bulkLoadDir.toString()); + throw new ImportException(errorMessage, e); + } + } + + @Override + protected void jobTeardown(Job job) throws IOException, ImportException { + super.jobTeardown(job); + // Delete the hfiles directory after we are finished. + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + fileSystem.delete(getContext().getDestination(), true); + } + + /** + * Set the file permission of the path of the given fileStatus. If the path + * is a directory, apply permission recursively to all subdirectories and + * files. 
+ * + * @param fs the filesystem + * @param fileStatus containing the path + * @param permission the permission + * @throws java.io.IOException + */ + private void setPermission(FileSystem fs, FileStatus fileStatus, + FsPermission permission) throws IOException { + if(fileStatus.isDir()) { + for(FileStatus file : fs.listStatus(fileStatus.getPath())){ + setPermission(fs, file, permission); + } + } + fs.setPermission(fileStatus.getPath(), permission); + } +} diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java new file mode 100644 index 000000000..b84921a8f --- /dev/null +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java @@ -0,0 +1,123 @@ +package org.apache.sqoop.mapreduce; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.sqoop.phoenix.PhoenixConstants; +import org.apache.sqoop.phoenix.PhoenixUtil; + +import com.cloudera.sqoop.lib.SqoopRecord; +import com.cloudera.sqoop.mapreduce.AutoProgressMapper; +import com.google.common.base.Splitter; +import com.google.common.collect.Maps; + +public class PhoenixBulkImportMapper + extends AutoProgressMapper { + + public static final Log LOG = LogFactory.getLog( + PhoenixBulkImportMapper.class.getName()); + + private Configuration conf; + private List columnInfos; + private PhoenixConnection conn; + private byte[] tableName; + private PreparedStatement preparedStatement = null; + + /* holds the mapping of phoenix column to db column */ + private Map columnMappings = null; + + @Override + protected void setup(Mapper.Context context) + throws IOException, InterruptedException { + conf = context.getConfiguration(); + try { + conn = (PhoenixConnection) QueryUtil.getConnection(conf); + conn.setAutoCommit(false); + String phoenixTable = PhoenixConfigurationUtil.getOutputTableName(conf); + tableName = Bytes.toBytes(phoenixTable); + columnInfos = PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf); + String upsertSql = QueryUtil.constructUpsertStatement(phoenixTable, columnInfos); + preparedStatement = conn.prepareStatement(upsertSql); + + String columnMaps = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + for(Map.Entry prop : conf) { + System.out.println(String.format(" the key is [%s] and the value is [%s]" , prop.getKey(),prop.getValue())); + } + columnMappings = PhoenixUtil.getPhoenixToSqoopMap(columnMaps); + + } catch (Exception e) { + throw new RuntimeException("Failed to setup due to ." 
+ e.getMessage()); + } + } + + + + @Override + protected void map(LongWritable key, SqoopRecord value, + Mapper.Context context) + throws IOException, InterruptedException { + + try { + ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); + Map fields = value.getFieldMap(); + int i = 1; + for(ColumnInfo colInfo : columnInfos) { + String pColName = colInfo.getDisplayName(); + String sColName = columnMappings.get(pColName); + Object sColValue = fields.get(sColName); + preparedStatement.setObject(i++, sColValue); + } + preparedStatement.execute(); + + Iterator>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(conn, true); + while (uncommittedDataIterator.hasNext()) { + Pair> kvPair = uncommittedDataIterator.next(); + if (Bytes.compareTo(tableName, kvPair.getFirst()) != 0) { + // skip edits for other tables + continue; + } + List keyValueList = kvPair.getSecond(); + for (KeyValue kv : keyValueList) { + outputKey.set(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()); + context.write(outputKey, kv); + } + } + conn.rollback(); + + } catch (SQLException e) { + throw new RuntimeException("Failed to process the record in the mapper due to " + e.getMessage()); + } + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + conn.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java b/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java index f6210bad3..6cb7e3312 100644 --- a/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java @@ -1,11 +1,12 @@ package org.apache.sqoop.mapreduce; import java.io.IOException; -import java.util.Arrays; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.NullWritable; @@ -13,31 +14,27 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.mapreduce.PhoenixOutputFormat; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.sqoop.phoenix.PhoenixConstants; import org.apache.sqoop.phoenix.PhoenixSqoopWritable; -import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.sqoop.phoenix.PhoenixUtil; import com.cloudera.sqoop.SqoopOptions; -import com.cloudera.sqoop.lib.FieldMapProcessor; import com.cloudera.sqoop.manager.ConnManager; import com.cloudera.sqoop.manager.ImportJobContext; import com.cloudera.sqoop.util.ImportException; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; - -import org.apache.phoenix.mapreduce.PhoenixOutputFormat; /** - * + * Runs an Phoenix import via DataDrivenDBInputFormat and PhoenixOutputFormat */ public class PhoenixImportJob extends DataDrivenImportJob { public static final Log LOG = LogFactory.getLog( PhoenixImportJob.class.getName()); - public static final String PHOENIX_IMPORT_COLUMNS = "phoenix.sqoop.import.columns"; - public PhoenixImportJob(final SqoopOptions opts, final ImportJobContext importContext) { super(opts, importContext.getInputFormat(), importContext); @@ -65,43 
+62,52 @@ protected Class getOutputFormatClass() @Override protected void configureOutputFormat(Job job, String tableName, String tableClassName) throws ClassNotFoundException, IOException { - ConnManager manager = getContext().getConnManager(); - String[] columnNames = manager.getColumnNames(tableName); - final String phoenixColumns = options.getPhoenixColumns(); - - if(phoenixColumns == null || phoenixColumns.length() == 0) { - job.getConfiguration().set(PHOENIX_IMPORT_COLUMNS, Joiner.on(",").join(columnNames).toUpperCase()); - } else { - // validate if the columns count match. - String[] phoenixColumnNames = phoenixColumns.split("\\s*,\\s*"); - if(phoenixColumnNames.length != columnNames.length) { - throw new RuntimeException(String.format(" We import [%s] columns from table [%s] " - + " but are writing to [%s] columns of [%s] phoenix table", columnNames.length,tableName,phoenixColumnNames.length,options.getPhoenixTable())); - } - job.getConfiguration().set(PHOENIX_IMPORT_COLUMNS,phoenixColumns); + String[] sColumnNames = null; + if(null == tableName) { + String sqlQuery = options.getSqlQuery(); + sColumnNames = manager.getColumnNamesForQuery(sqlQuery); + } else { + if(null == options.getColumns()) { + sColumnNames = manager.getColumnNames(tableName); + } else { + sColumnNames = options.getColumns(); + } + } + String columnMappings = options.getPhoenixColumnMapping(); + // if column mappings aren't provided, we assume the column names in sqoop and phoenix match. + if(columnMappings == null || columnMappings.isEmpty()) { + columnMappings = Joiner.on(",").join(sColumnNames); } + job.getConfiguration().set(PhoenixConstants.PHOENIX_SQOOP_COLUMNS, Joiner.on(",").join(sColumnNames)); + job.getConfiguration().set(PhoenixConstants.PHOENIX_COLUMN_MAPPING, columnMappings); job.setOutputFormatClass(getOutputFormatClass()); } @Override protected void jobSetup(Job job) throws IOException, ImportException { - super.jobSetup(job); - Configuration conf = job.getConfiguration(); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.addHbaseResources(conf); final String tableName = options.getPhoenixTable(); - final String columns = conf.get(PHOENIX_IMPORT_COLUMNS); + final String sColumnNames = conf.get(PhoenixConstants.PHOENIX_SQOOP_COLUMNS); + final String columnMappings = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); Preconditions.checkNotNull(tableName); - Preconditions.checkNotNull(columns); - - HBaseConfiguration.addHbaseResources(conf); + Preconditions.checkNotNull(sColumnNames); + Preconditions.checkNotNull(columnMappings); + final Map phoenixToSqoopMapping = PhoenixUtil.getPhoenixToSqoopMap(columnMappings); + final String pColumnNames = Joiner.on(",").join(phoenixToSqoopMapping.keySet()); + boolean isValidColumns = PhoenixUtil.validateColumns(sColumnNames,columnMappings); + if(!isValidColumns) { + throw new RuntimeException(String.format("Failure to map sqoop columns [%s] to phoenix columns [%s] ", sColumnNames,pColumnNames)); + } // set the table and columns. 
- PhoenixMapReduceUtil.setOutput(job, options.getPhoenixTable(), columns); + PhoenixMapReduceUtil.setOutput(job, tableName,pColumnNames); TableMapReduceUtil.initCredentials(job); TableMapReduceUtil.addDependencyJars(job); TableMapReduceUtil.addDependencyJars(conf, HTable.class); TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); + super.jobSetup(job); + } - - } diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java b/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java index 4c35f1094..8510b24e2 100644 --- a/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java @@ -31,7 +31,9 @@ import org.apache.hadoop.mapreduce.Mapper; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.util.ColumnInfo; +import org.apache.sqoop.phoenix.PhoenixConstants; import org.apache.sqoop.phoenix.PhoenixSqoopWritable; +import org.apache.sqoop.phoenix.PhoenixUtil; import com.cloudera.sqoop.lib.SqoopRecord; import com.cloudera.sqoop.mapreduce.AutoProgressMapper; @@ -50,6 +52,8 @@ public class PhoenixImportMapper private Configuration conf; private List columnInfos; + /* holds the mapping of phoenix column to db column */ + private Map columnMappings; @Override protected void setup(Mapper.Context context) @@ -57,6 +61,11 @@ protected void setup(Mapper prop : conf) { + System.out.println(String.format(" the key is [%s] and the value is [%s]" , prop.getKey(),prop.getValue())); + } + columnMappings = PhoenixUtil.getPhoenixToSqoopMap(columnMaps); } catch (SQLException e) { throw new RuntimeException("Failed to load the upsert column metadata for table."); } @@ -67,18 +76,14 @@ public void map(LongWritable key, SqoopRecord val, Context context) throws IOException, InterruptedException { Map fields = val.getFieldMap(); - //TODO: need to optimize this call. - Map keysToUpper = Maps.newHashMapWithExpectedSize(fields.size()); - for(Map.Entry kv : fields.entrySet()) { - keysToUpper.put(kv.getKey().toUpperCase(), kv.getValue()); - } PhoenixSqoopWritable recordWritable = new PhoenixSqoopWritable(); recordWritable.setColumnMetadata(columnInfos); List columnValues = Lists.newArrayListWithCapacity(columnInfos.size()); for(ColumnInfo column : columnInfos) { - String columnName = column.getDisplayName(); - Object columnValue = keysToUpper.get(columnName); - columnValues.add(columnValue); + String pColName = column.getDisplayName(); + String sColName = columnMappings.get(pColName); + Object sColValue = fields.get(sColName); + columnValues.add(sColValue); } recordWritable.setValues(columnValues); context.write(NullWritable.get(), recordWritable); diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java b/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java new file mode 100644 index 000000000..3630db4e0 --- /dev/null +++ b/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.phoenix; + +/** + * Set of constants specifically for phoenix. + */ +public class PhoenixConstants { + + /** property used to specify the column mapping of db to phoenix **/ + public static final String PHOENIX_COLUMN_MAPPING = "sqoop.phoenix.import.column.mapping"; + + /** property used to specify the columns beings imported from sqoop. */ + public static final String PHOENIX_SQOOP_COLUMNS = "sqoop.phoenix.columns"; +} diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java b/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java new file mode 100644 index 000000000..afeb68b25 --- /dev/null +++ b/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sqoop.phoenix; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * + * Utility class . + * + */ +public class PhoenixUtil { + + private static boolean testingMode = false; + + private PhoenixUtil() { } + + /** + * This is a way to make this always return false for testing. + */ + public static void setAlwaysNoPhoenixJarMode(boolean mode) { + testingMode = mode; + } + + public static boolean isPhoenixJarPresent() { + if (testingMode) { + return false; + } + + try { + // validate if hbase jars also exist in classpath. + Class.forName("org.apache.hadoop.hbase.client.HTable"); + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + } catch (ClassNotFoundException cnfe) { + return false; + } + return true; + } + + /** + * Generates a map of phoenix_column to sqoop column. + * @param columnMapping + * @return + */ + public static Map getPhoenixToSqoopMap(String columnMappings) { + + String[] split = columnMappings.split(","); + Map columnMappingsMap = new HashMap(); + for(String each : split) { + String[] sqoopToPhoenixMapping = each.split(";"); + // if the sqoop column name is the same as phoenix column name, we don't need to separate the columns by + // a ';' delimiter. + if(sqoopToPhoenixMapping.length == 2) { + columnMappingsMap.put(sqoopToPhoenixMapping[1], sqoopToPhoenixMapping[0]); + } else { + columnMappingsMap.put(sqoopToPhoenixMapping[0].toUpperCase(), sqoopToPhoenixMapping[0]); + } + } + return columnMappingsMap; + } + + /** + * does the following validations + * 1. count of columns in sqoop match phoenix + * 2. 
1 to 1 mapping between sqoop columns and phoenix columns. + * @param sColumns + * @param columnMappings + */ + public static boolean validateColumns(String sColumns, String columnMappings) { + Map phoenixToSqoopColumnMap = getPhoenixToSqoopMap(columnMappings); + String sqoopColumns[] = sColumns.split(","); + if(sqoopColumns.length != phoenixToSqoopColumnMap.size()) { + throw new RuntimeException("Mismatch in the columns being imported from Sqoop and written to Phoenix"); + } + Collection values = phoenixToSqoopColumnMap.values(); + for(String sqoopColumn : sqoopColumns) { + assert values.contains(sqoopColumn); + } + return true; + } + } diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java index 1cfe3de35..0d8530b7c 100644 --- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java +++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java @@ -190,7 +190,8 @@ public abstract class BaseSqoopTool extends com.cloudera.sqoop.tool.SqoopTool { // Phoenix arguments public static final String PHOENIX_TABLE_ARG = "phoenix-table"; - public static final String PHOENIX_COLUMNS_ARG = "phoenix-columns"; + public static final String PHOENIX_COLUMN_MAPPING_ARG = "phoenix-column-mapping"; + public static final String PHOENIX_BULK_LOAD_ENABLED_ARG = "phoenix-bulkload"; //Accumulo arguments. public static final String ACCUMULO_TABLE_ARG = "accumulo-table"; @@ -786,12 +787,15 @@ protected RelatedOptions getPhoenixOptions() { .withDescription("Phoenix table to import data to") .withLongOpt(PHOENIX_TABLE_ARG) .create()); - phoenixOpts.addOption(OptionBuilder.withArgName("columns") - .hasArg() - .withDescription("columns to import.") - .withLongOpt(PHOENIX_COLUMNS_ARG) - .create()); - + phoenixOpts.addOption(OptionBuilder.withArgName("phoenix-column-mapping") + .hasArg() + .withDescription("Column mapping between db and Phoenix columns.") + .withLongOpt(PHOENIX_COLUMN_MAPPING_ARG) + .create()); + phoenixOpts.addOption(OptionBuilder + .withDescription("Enable Phoenix bulk load") + .withLongOpt(PHOENIX_BULK_LOAD_ENABLED_ARG) + .create()); return phoenixOpts; } @@ -801,9 +805,11 @@ protected void applyPhoenixOptions(CommandLine in, SqoopOptions out) { out.setPhoenixTable(in.getOptionValue(PHOENIX_TABLE_ARG)); } - if (in.hasOption(PHOENIX_COLUMNS_ARG)) { - out.setPhoenixColumns(in.getOptionValue(PHOENIX_COLUMNS_ARG)); + if(in.hasOption(PHOENIX_COLUMN_MAPPING_ARG)) { + out.setPhoenixColumnMapping(in.getOptionValue(PHOENIX_COLUMN_MAPPING_ARG)); } + + out.setPhoenixBulkLoadEnabled(in.hasOption(PHOENIX_BULK_LOAD_ENABLED_ARG)); } protected RelatedOptions getAccumuloOptions() { @@ -1668,6 +1674,14 @@ protected void validateHBaseOptions(SqoopOptions options) throw new InvalidOptionsException(validationMessage); } } + + protected void validatePhoenixOptions(SqoopOptions options) + throws InvalidOptionsException { + if(options.isPhoenixBulkLoadEnabled() && options.getPhoenixTable() == null) { + throw new InvalidOptionsException("The --phoenix-bulkload option requires --phoenix-table. " + + HELP_STR); + } + } /** * Given an array of extra arguments (usually populated via diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java index e81f5bc14..16aeb84e4 100644 --- a/src/java/org/apache/sqoop/tool/ImportTool.java +++ b/src/java/org/apache/sqoop/tool/ImportTool.java @@ -1132,6 +1132,7 @@ public void validateOptions(SqoopOptions options) validateHiveOptions(options); validateHCatalogOptions(options); 
validateAccumuloOptions(options); + validatePhoenixOptions(options); } } diff --git a/src/test/com/cloudera/sqoop/phoenix/PhoenixImportTest.java b/src/test/com/cloudera/sqoop/phoenix/PhoenixImportTest.java new file mode 100644 index 000000000..9e13362c0 --- /dev/null +++ b/src/test/com/cloudera/sqoop/phoenix/PhoenixImportTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.phoenix; + +import java.sql.ResultSet; +import java.sql.Statement; + +import org.junit.Test; + +/** + * + * Tests import to phoenix tables. + * + */ +public class PhoenixImportTest extends PhoenixTestCase { + + @Test + public void testBasicUsageWithNoColumnMapping() throws Exception { + String tableName = "TABLE1"; + String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)" , tableName); + createTestTable(ddl); + String [] argv = getArgv(true, tableName, null, null); + String [] columnNames = { "ID" , "NAME" , "AGE"}; + String [] colTypes = { "INT", "VARCHAR(32)" , "INT"}; + String [] vals = { "0", "Name 1" , "1" }; + createTableWithColTypesAndNames(columnNames, colTypes, vals); + runImport(argv); + + //verify the data. + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s",tableName)); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(1, rs.getInt(3)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + + @Test + public void testBasicUsageWithColumnMapping() throws Exception { + String tableName = "TABLE2"; + String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)" , tableName); + createTestTable(ddl); + String [] argv = getArgv(true, tableName, "rowid;ID,name;NAME,age;AGE", null); + String [] columnNames = { "rowid" , "name" , "age"}; + String [] colTypes = { "INT", "VARCHAR(32)" , "INT"}; + String [] vals = { "0", "Name 1" , "1" }; + createTableWithColTypesAndNames(columnNames, colTypes, vals); + runImport(argv); + + //verify the data. 
+ Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s",tableName)); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(1, rs.getInt(3)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + +} diff --git a/src/test/com/cloudera/sqoop/phoenix/PhoenixTestCase.java b/src/test/com/cloudera/sqoop/phoenix/PhoenixTestCase.java new file mode 100644 index 000000000..c794cabba --- /dev/null +++ b/src/test/com/cloudera/sqoop/phoenix/PhoenixTestCase.java @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.cloudera.sqoop.phoenix; + +import java.io.File; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Properties; +import java.util.UUID; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.util.StringUtils; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.schema.TableAlreadyExistsException; +import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.After; +import org.junit.Before; + +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.HsqldbTestServer; +import com.cloudera.sqoop.testutil.ImportJobTestCase; + +/** + * + * Base test class for all phoenix tests. 
+ * + */ +public abstract class PhoenixTestCase extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog( + PhoenixTestCase.class.getName()); + + private static String testBuildDataProperty = ""; + + private static void recordTestBuildDataProperty() { + testBuildDataProperty = System.getProperty("test.build.data", ""); + } + + private static void restoreTestBuidlDataProperty() { + System.setProperty("test.build.data", testBuildDataProperty); + } + + private HBaseTestingUtility hbaseTestUtil; + private String workDir = createTempDir().getAbsolutePath(); + private MiniZooKeeperCluster zookeeperCluster; + private MiniHBaseCluster hbaseCluster; + private static String zkQuorum; + protected static Connection conn; + + @Override + @Before + public void setUp() { + try { + PhoenixTestCase.recordTestBuildDataProperty(); + String hbaseDir = new File(workDir, "hbase").getAbsolutePath(); + String hbaseRoot = "file://" + hbaseDir; + Configuration hbaseConf = HBaseConfiguration.create(); + hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot); + //Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT + hbaseConf.setInt("hbase.zookeeper.property.clientPort", 21818); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0"); + hbaseConf.setInt("hbase.master.info.port", -1); + hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500); + String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); + int zookeeperPort = 21818; + zookeeperCluster = new MiniZooKeeperCluster(); + Method m; + Class zkParam[] = {Integer.TYPE}; + try { + m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", + zkParam); + } catch (NoSuchMethodException e) { + m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", + zkParam); + } + m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); + zookeeperCluster.startup(new File(zookeeperDir)); + hbaseCluster = new MiniHBaseCluster(hbaseConf, 1); + HMaster master = hbaseCluster.getMaster(); + Object serverName = master.getServerName(); + + String hostAndPort; + if (serverName instanceof String) { + System.out.println("Server name is string, using HServerAddress."); + m = HMaster.class.getDeclaredMethod("getMasterAddress", + new Class[]{}); + Class clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress"); + /* + * Call method to get server address + */ + Object serverAddr = clazz.cast(m.invoke(master, new Object[]{})); + //returns the address as hostname:port + hostAndPort = serverAddr.toString(); + } else { + System.out.println("ServerName is org.apache.hadoop.hbase.ServerName," + + "using getHostAndPort()"); + Class clazz = Class.forName("org.apache.hadoop.hbase.ServerName"); + m = clazz.getDeclaredMethod("getHostAndPort", new Class[]{}); + hostAndPort = m.invoke(serverName, new Object[]{}).toString(); + } + hbaseConf.set("hbase.master", hostAndPort); + hbaseTestUtil = new HBaseTestingUtility(hbaseConf); + hbaseTestUtil.setZkCluster(zookeeperCluster); + hbaseCluster.startMaster(); + Class.forName(PhoenixDriver.class.getName()); + zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum); + super.setUp(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + public static File createTempDir() { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + File tempDir = new File(baseDir, UUID.randomUUID().toString()); + if (tempDir.mkdir()) { + return 
tempDir; + } + throw new IllegalStateException("Failed to create directory"); + } + + /** + * creates the test table given the ddl + * @param url + * @param ddl + * @throws SQLException + */ + protected static void createTestTable(String ddl) throws SQLException { + assertNotNull(ddl); + try { + PreparedStatement stmt = conn.prepareStatement(ddl); + stmt.execute(ddl); + } catch (TableAlreadyExistsException e) { + // ignore if already exists + } + } + + public void shutdown() throws Exception { + LOG.info("In shutdown() method"); + if (null != hbaseTestUtil) { + LOG.info("Shutting down HBase cluster"); + hbaseCluster.shutdown(); + zookeeperCluster.shutdown(); + hbaseTestUtil = null; + conn.close(); + } + FileUtils.deleteDirectory(new File(workDir)); + LOG.info("shutdown() method returning."); + } + + @Override + @After + public void tearDown() { + try { + shutdown(); + } catch (Exception e) { + LOG.warn("Error shutting down HBase minicluster: " + + StringUtils.stringifyException(e)); + } + PhoenixTestCase.restoreTestBuidlDataProperty(); + super.tearDown(); + } + + /** + * Create the argv to pass to Sqoop. + * @return the argv as an array of strings. + */ + protected String [] getArgv(boolean includeHadoopFlags, + String phoenixTable, String phoenixColumnMapping,String queryStr) { + + ArrayList args = new ArrayList(); + + if (includeHadoopFlags) { + CommonArgs.addHadoopFlags(args); + args.add("-D"); + args.add("hbase.zookeeper.property.clientPort=21818"); + } + + if (null != queryStr) { + args.add("--query"); + args.add(queryStr); + } else { + args.add("--table"); + args.add(getTableName()); + } + args.add("--split-by"); + args.add(getColName(0)); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--num-mappers"); + args.add("1"); + args.add("--phoenix-table"); + args.add(phoenixTable); + if(null != phoenixColumnMapping) { + args.add("--phoenix-column-mapping"); + args.add(phoenixColumnMapping); + } + + return args.toArray(new String[0]); + } +} From c7839fd1ffe7575d77612dc20da4d06be7db374f Mon Sep 17 00:00:00 2001 From: Ravi Magham Date: Thu, 12 Nov 2015 09:39:13 -0800 Subject: [PATCH 3/6] code formatting to meet Sqoop indentation style --- src/java/org/apache/sqoop/SqoopOptions.java | 4 +- .../org/apache/sqoop/manager/ConnManager.java | 3 +- .../sqoop/mapreduce/PhoenixBulkImportJob.java | 243 +++++++++--------- .../mapreduce/PhoenixBulkImportMapper.java | 63 +++-- .../sqoop/mapreduce/PhoenixImportJob.java | 154 ++++++----- .../sqoop/mapreduce/PhoenixImportMapper.java | 13 +- .../sqoop/phoenix/PhoenixConstants.java | 8 +- .../sqoop/phoenix/PhoenixSqoopWritable.java | 93 +++---- .../org/apache/sqoop/phoenix/PhoenixUtil.java | 61 +++-- 9 files changed, 340 insertions(+), 302 deletions(-) diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java index a34f1edff..220dfc95e 100644 --- a/src/java/org/apache/sqoop/SqoopOptions.java +++ b/src/java/org/apache/sqoop/SqoopOptions.java @@ -319,10 +319,10 @@ public String toString() { // Phoenix table to import into. @StoredAsProperty("phoenix.table") private String phoenixTable; - //Phoenix column mapping to db columns. + //Phoenix column mapping to sqoop columns. @StoredAsProperty("phoenix.column.mapping") private String phoenixColumnMapping; - //Is bulkload. + //Is this a bulkload job. @StoredAsProperty("phoenix.bulk.load.enabled") private boolean phoenixBulkLoadEnabled; // These next two fields are not serialized to the metastore. 
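A note on the phoenix.column.mapping value stored above: it is a comma-separated list of sqoopColumn;phoenixColumn pairs, parsed by PhoenixUtil.getPhoenixToSqoopMap() introduced in this series, and an entry without a ';' maps the upper-cased Sqoop column name to itself. The following standalone sketch is illustrative only and not part of the patch; it assumes the patched org.apache.sqoop.phoenix.PhoenixUtil class is on the classpath.

    import java.util.Map;

    import org.apache.sqoop.phoenix.PhoenixUtil;

    public class ColumnMappingSketch {
      public static void main(String[] args) {
        // "rowid" on the Sqoop side maps to Phoenix column ID; NAME and AGE carry
        // no ';' so the upper-cased Sqoop name is reused as the Phoenix column.
        Map phoenixToSqoop = PhoenixUtil.getPhoenixToSqoopMap("rowid;ID,NAME,AGE");
        // Prints the entries ID=rowid, NAME=NAME, AGE=AGE (iteration order of the
        // backing HashMap is not guaranteed).
        System.out.println(phoenixToSqoop);
      }
    }
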
diff --git a/src/java/org/apache/sqoop/manager/ConnManager.java b/src/java/org/apache/sqoop/manager/ConnManager.java index 517c8cf4a..46b870f04 100644 --- a/src/java/org/apache/sqoop/manager/ConnManager.java +++ b/src/java/org/apache/sqoop/manager/ConnManager.java @@ -815,8 +815,7 @@ public boolean isDirectModeAccumuloSupported() { } /** - * Determine if phoenix operations from direct mode of the connector is - * allowed. By default direct mode is not compatible with Phoenix + * By default direct mode is not compatible with Phoenix * @return Whether direct mode is allowed. */ public boolean isDirectModePhoenixSupported() { diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java index 2855a4a6a..1931ee9ac 100644 --- a/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportJob.java @@ -52,143 +52,144 @@ /** * Job to bulk import data from db to phoenix. - * */ public class PhoenixBulkImportJob extends DataDrivenImportJob { public static final Log LOG = LogFactory.getLog( PhoenixBulkImportJob.class.getName()); - public PhoenixBulkImportJob(final SqoopOptions opts, + public PhoenixBulkImportJob(final SqoopOptions opts, final ImportJobContext importContext) { - super(opts, importContext.getInputFormat(), importContext); + super(opts, importContext.getInputFormat(), importContext); + } + + @Override + protected void configureMapper(Job job, String tableName, + String tableClassName) throws IOException { + job.setOutputKeyClass(ImmutableBytesWritable.class); + job.setOutputValueClass(KeyValue.class); + job.setMapperClass(getMapperClass()); + } + + @Override + protected Class getMapperClass() { + return PhoenixBulkImportMapper.class; + } + + @Override + protected Class getOutputFormatClass() + throws ClassNotFoundException { + return HFileOutputFormat.class; + } + + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + + ConnManager manager = getContext().getConnManager(); + String[] sColumnNames = null; + if (null == tableName) { + String sqlQuery = options.getSqlQuery(); + sColumnNames = manager.getColumnNamesForQuery(sqlQuery); + } else { + if (null == options.getColumns()) { + sColumnNames = manager.getColumnNames(tableName); + } else { + sColumnNames = options.getColumns(); + } + } + String columnMappings = options.getPhoenixColumnMapping(); + // if column mappings aren't provided, + // we assume the column names in sqoop and phoenix match. 
+ if(columnMappings == null || columnMappings.isEmpty()) { + columnMappings = Joiner.on(",").join(sColumnNames); } - - @Override - protected void configureMapper(Job job, String tableName, - String tableClassName) throws IOException { - job.setOutputKeyClass(ImmutableBytesWritable.class); - job.setOutputValueClass(KeyValue.class); - job.setMapperClass(getMapperClass()); + job.getConfiguration().set(PhoenixConstants.PHOENIX_SQOOP_COLUMNS, + Joiner.on(",").join(sColumnNames)); + job.getConfiguration().set(PhoenixConstants.PHOENIX_COLUMN_MAPPING, columnMappings); + job.setOutputFormatClass(getOutputFormatClass()); + } + + @Override + protected void jobSetup(Job job) throws IOException, ImportException { + super.jobSetup(job); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.addHbaseResources(conf); + final String tableName = options.getPhoenixTable(); + final String sColumnNames = conf.get(PhoenixConstants.PHOENIX_SQOOP_COLUMNS); + final String columnMappings = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(sColumnNames); + + final Map phoenixToSqoopMapping = PhoenixUtil.getPhoenixToSqoopMap(columnMappings); + final String pColumnNames = Joiner. + on(PhoenixConstants.PHOENIX_COLUMN_MAPPING_SEPARATOR). + join(phoenixToSqoopMapping.keySet()); + boolean isValidColumns = PhoenixUtil.validateColumns(sColumnNames,columnMappings); + if (!isValidColumns) { + throw new RuntimeException(String.format("Failure to map sqoop columns [%s] " + + "to phoenix columns [%s] ", sColumnNames,pColumnNames)); } - - @Override - protected Class getMapperClass() { - return PhoenixBulkImportMapper.class; - } - - @Override - protected Class getOutputFormatClass() - throws ClassNotFoundException { - return HFileOutputFormat.class; - } - - @Override - protected void configureOutputFormat(Job job, String tableName, - String tableClassName) throws ClassNotFoundException, IOException { - - ConnManager manager = getContext().getConnManager(); - String[] sColumnNames = null; - if(null == tableName) { - String sqlQuery = options.getSqlQuery(); - sColumnNames = manager.getColumnNamesForQuery(sqlQuery); - } else { - if(null == options.getColumns()) { - sColumnNames = manager.getColumnNames(tableName); - } else { - sColumnNames = options.getColumns(); - } - } - String columnMappings = options.getPhoenixColumnMapping(); - // if column mappings aren't provided, we assume the column names in sqoop and phoenix match. 
- if(columnMappings == null || columnMappings.isEmpty()) { - columnMappings = Joiner.on(",").join(sColumnNames); - } - job.getConfiguration().set(PhoenixConstants.PHOENIX_SQOOP_COLUMNS, Joiner.on(",").join(sColumnNames)); - job.getConfiguration().set(PhoenixConstants.PHOENIX_COLUMN_MAPPING, columnMappings); - job.setOutputFormatClass(getOutputFormatClass()); - } - - @Override - protected void jobSetup(Job job) throws IOException, ImportException { - super.jobSetup(job); - Configuration conf = job.getConfiguration(); - HBaseConfiguration.addHbaseResources(conf); - final String tableName = options.getPhoenixTable(); - final String sColumnNames = conf.get(PhoenixConstants.PHOENIX_SQOOP_COLUMNS); - final String columnMappings = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); - Preconditions.checkNotNull(tableName); - Preconditions.checkNotNull(sColumnNames); - - - final Map phoenixToSqoopMapping = PhoenixUtil.getPhoenixToSqoopMap(columnMappings); - final String pColumnNames = Joiner.on(",").join(phoenixToSqoopMapping.keySet()); - boolean isValidColumns = PhoenixUtil.validateColumns(sColumnNames,columnMappings); - if(!isValidColumns) { - throw new RuntimeException(String.format("Failure to map sqoop columns [%s] to phoenix columns [%s] ", sColumnNames,pColumnNames)); - } - - PhoenixMapReduceUtil.setOutput(job, tableName,pColumnNames); - - FileOutputFormat.setOutputPath(job, getContext().getDestination()); - HTable hTable = new HTable(job.getConfiguration(), tableName); - HFileOutputFormat.configureIncrementalLoad(job, hTable); - TableMapReduceUtil.initCredentials(job); - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.addDependencyJars(conf, HTable.class); - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); - } + + PhoenixMapReduceUtil.setOutput(job,tableName,pColumnNames); + + FileOutputFormat.setOutputPath(job,getContext().getDestination()); + HTable hTable = new HTable(job.getConfiguration(), tableName); + HFileOutputFormat.configureIncrementalLoad(job, hTable); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(conf, HTable.class); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); + } @Override protected void completeImport(Job job) throws IOException, ImportException { super.completeImport(job); FileSystem fileSystem = FileSystem.get(job.getConfiguration()); - - Path bulkLoadDir = getContext().getDestination(); - setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir), - FsPermission.createImmutable((short) 00777)); - - HTable hTable = new HTable(job.getConfiguration(), options.getPhoenixTable()); - - // Load generated HFiles into table - try { - LoadIncrementalHFiles loader = new LoadIncrementalHFiles( - job.getConfiguration()); - loader.doBulkLoad(bulkLoadDir, hTable); - } - catch (Exception e) { - String errorMessage = String.format("Unrecoverable error while " + - "performing the bulk load of files in [%s]", - bulkLoadDir.toString()); - throw new ImportException(errorMessage, e); - } + Path bulkLoadDir = getContext().getDestination(); + setPermission(fileSystem, fileSystem.getFileStatus(bulkLoadDir), + FsPermission.createImmutable((short) 00777)); + HTable hTable = new HTable(job.getConfiguration(), options.getPhoenixTable()); + + // Load generated HFiles into table + try { + LoadIncrementalHFiles loader = new LoadIncrementalHFiles( + job.getConfiguration()); + loader.doBulkLoad(bulkLoadDir, hTable); } - - @Override - 
protected void jobTeardown(Job job) throws IOException, ImportException { - super.jobTeardown(job); - // Delete the hfiles directory after we are finished. - FileSystem fileSystem = FileSystem.get(job.getConfiguration()); - fileSystem.delete(getContext().getDestination(), true); + catch (Exception e) { + String errorMessage = String.format("Unrecoverable error while " + + "performing the bulk load of files in [%s]", + bulkLoadDir.toString()); + throw new ImportException(errorMessage, e); } + } - /** - * Set the file permission of the path of the given fileStatus. If the path - * is a directory, apply permission recursively to all subdirectories and - * files. - * - * @param fs the filesystem - * @param fileStatus containing the path - * @param permission the permission - * @throws java.io.IOException - */ - private void setPermission(FileSystem fs, FileStatus fileStatus, - FsPermission permission) throws IOException { - if(fileStatus.isDir()) { - for(FileStatus file : fs.listStatus(fileStatus.getPath())){ - setPermission(fs, file, permission); - } - } - fs.setPermission(fileStatus.getPath(), permission); + @Override + protected void jobTeardown(Job job) throws IOException, ImportException { + super.jobTeardown(job); + // Delete the hfiles directory after we are finished. + FileSystem fileSystem = FileSystem.get(job.getConfiguration()); + fileSystem.delete(getContext().getDestination(), true); + } + + /** + * Set the file permission of the path of the given fileStatus. If the path + * is a directory, apply permission recursively to all subdirectories and + * files. + * + * @param fs the filesystem + * @param fileStatus containing the path + * @param permission the permission + * @throws java.io.IOException + */ + private void setPermission(FileSystem fs, FileStatus fileStatus, + FsPermission permission) throws IOException { + if (fileStatus.isDirectory()) { + for (FileStatus file : fs.listStatus(fileStatus.getPath())){ + setPermission(fs, file, permission); + } + } + fs.setPermission(fileStatus.getPath(), permission); } } diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java index b84921a8f..09c302f27 100644 --- a/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixBulkImportMapper.java @@ -1,14 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.sqoop.mapreduce; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -19,7 +33,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.util.ColumnInfo; @@ -28,11 +41,14 @@ import org.apache.sqoop.phoenix.PhoenixConstants; import org.apache.sqoop.phoenix.PhoenixUtil; -import com.cloudera.sqoop.lib.SqoopRecord; -import com.cloudera.sqoop.mapreduce.AutoProgressMapper; -import com.google.common.base.Splitter; -import com.google.common.collect.Maps; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.AutoProgressMapper; +/** + * + * Mapper class for phoenix bulk import job. + * + */ public class PhoenixBulkImportMapper extends AutoProgressMapper { @@ -57,16 +73,11 @@ protected void setup(Mapper prop : conf) { - System.out.println(String.format(" the key is [%s] and the value is [%s]" , prop.getKey(),prop.getValue())); - } - columnMappings = PhoenixUtil.getPhoenixToSqoopMap(columnMaps); - + columnInfos = PhoenixConfigurationUtil.getUpsertColumnMetadataList(conf); + String upsertSql = QueryUtil.constructUpsertStatement(phoenixTable, columnInfos); + preparedStatement = conn.prepareStatement(upsertSql); + String columnMaps = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + columnMappings = PhoenixUtil.getPhoenixToSqoopMap(columnMaps); } catch (Exception e) { throw new RuntimeException("Failed to setup due to ." + e.getMessage()); } @@ -78,12 +89,11 @@ protected void setup(Mapper.Context context) throws IOException, InterruptedException { - try { ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); Map fields = value.getFieldMap(); int i = 1; - for(ColumnInfo colInfo : columnInfos) { + for (ColumnInfo colInfo : columnInfos) { String pColName = colInfo.getDisplayName(); String sColName = columnMappings.get(pColName); Object sColValue = fields.get(sColName); @@ -112,12 +122,11 @@ protected void map(LongWritable key, SqoopRecord value, } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - try { - conn.close(); - } catch (SQLException e) { - throw new RuntimeException(e); - } + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + conn.close(); + } catch (SQLException e) { + throw new RuntimeException(e); } - + } } diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java b/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java index 6cb7e3312..101d9242a 100644 --- a/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixImportJob.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.sqoop.mapreduce; import java.io.IOException; @@ -32,82 +49,81 @@ */ public class PhoenixImportJob extends DataDrivenImportJob { - public static final Log LOG = LogFactory.getLog( - PhoenixImportJob.class.getName()); + public static final Log LOG = LogFactory.getLog( + PhoenixImportJob.class.getName()); - public PhoenixImportJob(final SqoopOptions opts, - final ImportJobContext importContext) { - super(opts, importContext.getInputFormat(), importContext); - } + public PhoenixImportJob(final SqoopOptions opts, + final ImportJobContext importContext) { + super(opts, importContext.getInputFormat(), importContext); + } - @Override - protected void configureMapper(Job job, String tableName, + @Override + protected void configureMapper(Job job, String tableName, String tableClassName) throws IOException { - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(PhoenixSqoopWritable.class); - job.setMapperClass(getMapperClass()); - } + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(PhoenixSqoopWritable.class); + job.setMapperClass(getMapperClass()); + } - @Override - protected Class getMapperClass() { - return PhoenixImportMapper.class; - } + @Override + protected Class getMapperClass() { + return PhoenixImportMapper.class; + } - @Override - protected Class getOutputFormatClass() + @Override + protected Class getOutputFormatClass() throws ClassNotFoundException { - return PhoenixOutputFormat.class; - } + return PhoenixOutputFormat.class; + } - @Override - protected void configureOutputFormat(Job job, String tableName, - String tableClassName) throws ClassNotFoundException, IOException { - ConnManager manager = getContext().getConnManager(); - String[] sColumnNames = null; - if(null == tableName) { - String sqlQuery = options.getSqlQuery(); - sColumnNames = manager.getColumnNamesForQuery(sqlQuery); - } else { - if(null == options.getColumns()) { - sColumnNames = manager.getColumnNames(tableName); - } else { - sColumnNames = options.getColumns(); - } - } - String columnMappings = options.getPhoenixColumnMapping(); - // if column mappings aren't provided, we assume the column names in sqoop and phoenix match. 
- if(columnMappings == null || columnMappings.isEmpty()) { - columnMappings = Joiner.on(",").join(sColumnNames); - } - job.getConfiguration().set(PhoenixConstants.PHOENIX_SQOOP_COLUMNS, Joiner.on(",").join(sColumnNames)); - job.getConfiguration().set(PhoenixConstants.PHOENIX_COLUMN_MAPPING, columnMappings); - job.setOutputFormatClass(getOutputFormatClass()); + @Override + protected void configureOutputFormat(Job job, String tableName, + String tableClassName) throws ClassNotFoundException, IOException { + ConnManager manager = getContext().getConnManager(); + String[] sColumnNames = null; + if (null == tableName) { + String sqlQuery = options.getSqlQuery(); + sColumnNames = manager.getColumnNamesForQuery(sqlQuery); + } else { + if (null == options.getColumns()) { + sColumnNames = manager.getColumnNames(tableName); + } else { + sColumnNames = options.getColumns(); + } + } + String columnMappings = options.getPhoenixColumnMapping(); + // if column mappings aren't provided, we assume the column names in sqoop and phoenix match. + if (columnMappings == null || columnMappings.isEmpty()) { + columnMappings = Joiner.on(",").join(sColumnNames); } + job.getConfiguration().set(PhoenixConstants.PHOENIX_SQOOP_COLUMNS, Joiner.on(",").join(sColumnNames)); + job.getConfiguration().set(PhoenixConstants.PHOENIX_COLUMN_MAPPING, columnMappings); + job.setOutputFormatClass(getOutputFormatClass()); + } - @Override - protected void jobSetup(Job job) throws IOException, ImportException { - Configuration conf = job.getConfiguration(); - HBaseConfiguration.addHbaseResources(conf); - final String tableName = options.getPhoenixTable(); - final String sColumnNames = conf.get(PhoenixConstants.PHOENIX_SQOOP_COLUMNS); - final String columnMappings = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); - Preconditions.checkNotNull(tableName); - Preconditions.checkNotNull(sColumnNames); - Preconditions.checkNotNull(columnMappings); - - final Map phoenixToSqoopMapping = PhoenixUtil.getPhoenixToSqoopMap(columnMappings); - final String pColumnNames = Joiner.on(",").join(phoenixToSqoopMapping.keySet()); - boolean isValidColumns = PhoenixUtil.validateColumns(sColumnNames,columnMappings); - if(!isValidColumns) { - throw new RuntimeException(String.format("Failure to map sqoop columns [%s] to phoenix columns [%s] ", sColumnNames,pColumnNames)); - } - // set the table and columns. 
- PhoenixMapReduceUtil.setOutput(job, tableName,pColumnNames); - TableMapReduceUtil.initCredentials(job); - TableMapReduceUtil.addDependencyJars(job); - TableMapReduceUtil.addDependencyJars(conf, HTable.class); - TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); - super.jobSetup(job); - - } + @Override + protected void jobSetup(Job job) throws IOException, ImportException { + Configuration conf = job.getConfiguration(); + HBaseConfiguration.addHbaseResources(conf); + final String tableName = options.getPhoenixTable(); + final String sColumnNames = conf.get(PhoenixConstants.PHOENIX_SQOOP_COLUMNS); + final String columnMappings = conf.get(PhoenixConstants.PHOENIX_COLUMN_MAPPING); + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(sColumnNames); + Preconditions.checkNotNull(columnMappings); + + final Map phoenixToSqoopMapping = PhoenixUtil.getPhoenixToSqoopMap(columnMappings); + final String pColumnNames = Joiner.on(",").join(phoenixToSqoopMapping.keySet()); + boolean isValidColumns = PhoenixUtil.validateColumns(sColumnNames,columnMappings); + if (!isValidColumns) { + throw new RuntimeException(String.format("Failure to map sqoop columns [%s] to phoenix columns [%s] ", sColumnNames,pColumnNames)); + } + // set the table and columns. + PhoenixMapReduceUtil.setOutput(job, tableName,pColumnNames); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + TableMapReduceUtil.addDependencyJars(conf, HTable.class); + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), PhoenixDriver.class); + super.jobSetup(job); + } } diff --git a/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java b/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java index 8510b24e2..04d2c86f5 100644 --- a/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java +++ b/src/java/org/apache/sqoop/mapreduce/PhoenixImportMapper.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.sqoop.mapreduce; import java.io.IOException; @@ -35,10 +34,9 @@ import org.apache.sqoop.phoenix.PhoenixSqoopWritable; import org.apache.sqoop.phoenix.PhoenixUtil; -import com.cloudera.sqoop.lib.SqoopRecord; -import com.cloudera.sqoop.mapreduce.AutoProgressMapper; +import org.apache.sqoop.lib.SqoopRecord; +import org.apache.sqoop.mapreduce.AutoProgressMapper; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; /** * Imports records by writing them to Phoenix @@ -62,10 +60,7 @@ protected void setup(Mapper prop : conf) { - System.out.println(String.format(" the key is [%s] and the value is [%s]" , prop.getKey(),prop.getValue())); - } - columnMappings = PhoenixUtil.getPhoenixToSqoopMap(columnMaps); + columnMappings = PhoenixUtil.getPhoenixToSqoopMap(columnMaps); } catch (SQLException e) { throw new RuntimeException("Failed to load the upsert column metadata for table."); } @@ -79,7 +74,7 @@ public void map(LongWritable key, SqoopRecord val, Context context) PhoenixSqoopWritable recordWritable = new PhoenixSqoopWritable(); recordWritable.setColumnMetadata(columnInfos); List columnValues = Lists.newArrayListWithCapacity(columnInfos.size()); - for(ColumnInfo column : columnInfos) { + for (ColumnInfo column : columnInfos) { String pColName = column.getDisplayName(); String sColName = columnMappings.get(pColName); Object sColValue = fields.get(sColName); diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java b/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java index 3630db4e0..9aaee3013 100644 --- a/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java +++ b/src/java/org/apache/sqoop/phoenix/PhoenixConstants.java @@ -20,11 +20,17 @@ /** * Set of constants specifically for phoenix. */ -public class PhoenixConstants { +public final class PhoenixConstants { /** property used to specify the column mapping of db to phoenix **/ public static final String PHOENIX_COLUMN_MAPPING = "sqoop.phoenix.import.column.mapping"; /** property used to specify the columns beings imported from sqoop. */ public static final String PHOENIX_SQOOP_COLUMNS = "sqoop.phoenix.columns"; + + /** separator for phoenix columns */ + public static final String PHOENIX_COLUMN_MAPPING_SEPARATOR = ","; + + /** separator between phoenix and sqoop column. */ + public static final String PHOENIX_SQOOP_COLUMN_SEPARATOR = ";"; } diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java b/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java index a1721a23c..174ca5b70 100644 --- a/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java +++ b/src/java/org/apache/sqoop/phoenix/PhoenixSqoopWritable.java @@ -20,68 +20,69 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; import java.util.List; import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.phoenix.util.ColumnInfo; +import com.google.common.base.Preconditions; + /** * - * Writable class. + * A custom {@linkplain DBWritable} to map input columns to phoenix columns to upsert. 
* */ public class PhoenixSqoopWritable implements DBWritable { private List columnMetadata; - - private List values; - - private int columnCount = -1; - - @Override - public void write(PreparedStatement statement) throws SQLException { - for(int i = 0 ; i < values.size() ; i++) { - Object value = values.get(i); - ColumnInfo columnInfo = columnMetadata.get(i); - if(value == null) { - statement.setNull(i + 1, columnInfo.getSqlType()); - } else { - statement.setObject(i + 1, value , columnInfo.getSqlType()); - } - } - - } - - @Override - public void readFields(ResultSet resultSet) throws SQLException { - // we do this once per mapper. - if(columnCount == -1) { - this.columnCount = resultSet.getMetaData().getColumnCount(); - } + private List values; + + /** + * Default constructor + */ + public PhoenixSqoopWritable() { + } + + public PhoenixSqoopWritable(final List columnMetadata, final List values) { + Preconditions.checkNotNull(values); + Preconditions.checkNotNull(columnMetadata); + this.columnMetadata = columnMetadata; + this.values = values; + } - values = new ArrayList(columnCount); - for(int i = 0 ; i < columnCount ; i++) { - Object value = resultSet.getObject(i + 1); - values.add(value); - } - + @Override + public void write(PreparedStatement statement) throws SQLException { + Preconditions.checkNotNull(values); + Preconditions.checkNotNull(columnMetadata); + for (int i = 0 ; i < values.size() ; i++) { + Object value = values.get(i); + ColumnInfo columnInfo = columnMetadata.get(i); + if (value == null) { + statement.setNull(i + 1, columnInfo.getSqlType()); + } else { + statement.setObject(i + 1, value , columnInfo.getSqlType()); + } } + } - public List getColumnMetadata() { - return columnMetadata; - } + @Override + public void readFields(ResultSet resultSet) throws SQLException { + // NO-OP for now + } - public void setColumnMetadata(List columnMetadata) { - this.columnMetadata = columnMetadata; - } + public List getColumnMetadata() { + return columnMetadata; + } - public List getValues() { - return values; - } + public void setColumnMetadata(List columnMetadata) { + this.columnMetadata = columnMetadata; + } + + public List getValues() { + return values; + } - public void setValues(List values) { - this.values = values; - } - + public void setValues(List values) { + this.values = values; + } } diff --git a/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java b/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java index afeb68b25..4571d6ea4 100644 --- a/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java +++ b/src/java/org/apache/sqoop/phoenix/PhoenixUtil.java @@ -17,20 +17,28 @@ */ package org.apache.sqoop.phoenix; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + /** * - * Utility class . + * Utility class . * */ public class PhoenixUtil { + public static final Log LOG = LogFactory.getLog( + PhoenixUtil.class.getName()); + private static boolean testingMode = false; - private PhoenixUtil() { } + private PhoenixUtil() { + } /** * This is a way to make this always return false for testing. @@ -40,19 +48,19 @@ public static void setAlwaysNoPhoenixJarMode(boolean mode) { } public static boolean isPhoenixJarPresent() { - if (testingMode) { - return false; - } - - try { - // validate if hbase jars also exist in classpath. 
- Class.forName("org.apache.hadoop.hbase.client.HTable"); - Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); - } catch (ClassNotFoundException cnfe) { - return false; - } - return true; + if (testingMode) { + return false; } + try { + // validate if hbase jars also exist in classpath. + Class.forName("org.apache.hadoop.hbase.client.HTable"); + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + } catch (ClassNotFoundException cnfe) { + LOG.error("Failed to find phoenix dependencies in classpath : " + cnfe.getMessage()); + return false; + } + return true; + } /** * Generates a map of phoenix_column to sqoop column. @@ -60,14 +68,13 @@ public static boolean isPhoenixJarPresent() { * @return */ public static Map getPhoenixToSqoopMap(String columnMappings) { - - String[] split = columnMappings.split(","); + String[] split = columnMappings.split(PhoenixConstants.PHOENIX_COLUMN_MAPPING_SEPARATOR); Map columnMappingsMap = new HashMap(); - for(String each : split) { - String[] sqoopToPhoenixMapping = each.split(";"); - // if the sqoop column name is the same as phoenix column name, we don't need to separate the columns by - // a ';' delimiter. - if(sqoopToPhoenixMapping.length == 2) { + for (String each : split) { + String[] sqoopToPhoenixMapping = each.split(PhoenixConstants.PHOENIX_SQOOP_COLUMN_SEPARATOR); + // if the sqoop column name is the same as phoenix column name, + // we don't need to separate the columns by a ';' delimiter. + if (sqoopToPhoenixMapping.length == 2) { columnMappingsMap.put(sqoopToPhoenixMapping[1], sqoopToPhoenixMapping[0]); } else { columnMappingsMap.put(sqoopToPhoenixMapping[0].toUpperCase(), sqoopToPhoenixMapping[0]); @@ -86,12 +93,16 @@ public static Map getPhoenixToSqoopMap(String columnMappings) { public static boolean validateColumns(String sColumns, String columnMappings) { Map phoenixToSqoopColumnMap = getPhoenixToSqoopMap(columnMappings); String sqoopColumns[] = sColumns.split(","); - if(sqoopColumns.length != phoenixToSqoopColumnMap.size()) { - throw new RuntimeException("Mismatch in the columns being imported from Sqoop and writtent to phoenix"); + if (sqoopColumns.length != phoenixToSqoopColumnMap.size()) { + throw new RuntimeException("Mismatch in the number of columns being imported from Sqoop " + + "and written to phoenix."); } Collection values = phoenixToSqoopColumnMap.values(); - for(String sqoopColumn : sqoopColumns) { - assert values.contains(sqoopColumn); + for (String sqoopColumn : sqoopColumns) { + if (!values.contains(sqoopColumn)) { + throw new RuntimeException(String.format("Sqoop column [%s] doesn't exist in the valid list" + + " of phoenix columns [%s] ",sqoopColumn, Arrays.toString(values.toArray()))); + } } return true; } From e06f6f2a978e23d2ca4dc1439f3013b5e523f7ec Mon Sep 17 00:00:00 2001 From: Ravi Magham Date: Sun, 15 Nov 2015 22:20:35 -0800 Subject: [PATCH 4/6] unit tests. 
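
These tests drive the new Phoenix import options end to end through runImport(). For reference, the option surface they exercise (option names from BaseSqoopTool in this series) has roughly the following shape; this is an illustrative sketch only, and the connect string and table names below are placeholders rather than values used by the tests.

    public class PhoenixImportArgsSketch {
      public static void main(String[] unused) {
        // Placeholder JDBC URL and table names; only the flag names are taken
        // from this patch series.
        String[] args = {
            "--connect", "jdbc:hsqldb:hsql://localhost/sqoop",
            "--table", "EMPLOYEES",
            "--num-mappers", "1",
            "--phoenix-table", "EMPLOYEES",
            "--phoenix-column-mapping", "id;ID,name;NAME",
            "--phoenix-bulkload"
        };
        // Print the equivalent command line as a sanity check (the mapping value
        // would need shell quoting in a real invocation).
        StringBuilder cmd = new StringBuilder("sqoop import");
        for (String arg : args) {
          cmd.append(' ').append(arg);
        }
        System.out.println(cmd);
      }
    }

--phoenix-column-mapping may be omitted when the Sqoop and Phoenix column names already match, and --phoenix-bulkload is meant to switch the import to the HFile bulk-load path (PhoenixBulkImportJob).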
--- build.xml | 2 +- ivy.xml | 9 +- .../org/apache/sqoop/phoenix/PhoenixUtil.java | 2 +- .../org/apache/sqoop/tool/ImportTool.java | 3 +- .../sqoop/phoenix/PhoenixImportTest.java | 83 ------- .../sqoop/phoenix/PhoenixTestCase.java | 232 ------------------ .../sqoop/phoenix/PhoenixBasicImportTest.java | 119 +++++++++ .../sqoop/phoenix/PhoenixBulkImportTest.java | 119 +++++++++ .../sqoop/phoenix/PhoenixQueryImportTest.java | 74 ++++++ .../apache/sqoop/phoenix/PhoenixTestCase.java | 232 ++++++++++++++++++ 10 files changed, 555 insertions(+), 320 deletions(-) delete mode 100644 src/test/com/cloudera/sqoop/phoenix/PhoenixImportTest.java delete mode 100644 src/test/com/cloudera/sqoop/phoenix/PhoenixTestCase.java create mode 100644 src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java create mode 100644 src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java create mode 100644 src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java create mode 100644 src/test/org/apache/sqoop/phoenix/PhoenixTestCase.java diff --git a/build.xml b/build.xml index 06c985dc1..f30deb98e 100644 --- a/build.xml +++ b/build.xml @@ -191,7 +191,7 @@ - + diff --git a/ivy.xml b/ivy.xml index fd341f09a..f430cdef2 100644 --- a/ivy.xml +++ b/ivy.xml @@ -293,8 +293,13 @@ under the License. - - + + + + + + + zkParam[] = {Integer.TYPE}; - try { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", - zkParam); - } catch (NoSuchMethodException e) { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", - zkParam); - } - m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); - zookeeperCluster.startup(new File(zookeeperDir)); - hbaseCluster = new MiniHBaseCluster(hbaseConf, 1); - HMaster master = hbaseCluster.getMaster(); - Object serverName = master.getServerName(); - - String hostAndPort; - if (serverName instanceof String) { - System.out.println("Server name is string, using HServerAddress."); - m = HMaster.class.getDeclaredMethod("getMasterAddress", - new Class[]{}); - Class clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress"); - /* - * Call method to get server address - */ - Object serverAddr = clazz.cast(m.invoke(master, new Object[]{})); - //returns the address as hostname:port - hostAndPort = serverAddr.toString(); - } else { - System.out.println("ServerName is org.apache.hadoop.hbase.ServerName," - + "using getHostAndPort()"); - Class clazz = Class.forName("org.apache.hadoop.hbase.ServerName"); - m = clazz.getDeclaredMethod("getHostAndPort", new Class[]{}); - hostAndPort = m.invoke(serverName, new Object[]{}).toString(); - } - hbaseConf.set("hbase.master", hostAndPort); - hbaseTestUtil = new HBaseTestingUtility(hbaseConf); - hbaseTestUtil.setZkCluster(zookeeperCluster); - hbaseCluster.startMaster(); - Class.forName(PhoenixDriver.class.getName()); - zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); - conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL - + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum); - super.setUp(); - } catch (Throwable e) { - throw new RuntimeException(e); - } - } - - public static File createTempDir() { - File baseDir = new File(System.getProperty("java.io.tmpdir")); - File tempDir = new File(baseDir, UUID.randomUUID().toString()); - if (tempDir.mkdir()) { - return tempDir; - } - throw new IllegalStateException("Failed to create directory"); - } - - /** - * creates the test table given the ddl - * @param url - * @param ddl - * @throws SQLException - */ - protected static void 
createTestTable(String ddl) throws SQLException { - assertNotNull(ddl); - try { - PreparedStatement stmt = conn.prepareStatement(ddl); - stmt.execute(ddl); - } catch (TableAlreadyExistsException e) { - // ignore if already exists - } - } - - public void shutdown() throws Exception { - LOG.info("In shutdown() method"); - if (null != hbaseTestUtil) { - LOG.info("Shutting down HBase cluster"); - hbaseCluster.shutdown(); - zookeeperCluster.shutdown(); - hbaseTestUtil = null; - conn.close(); - } - FileUtils.deleteDirectory(new File(workDir)); - LOG.info("shutdown() method returning."); - } - - @Override - @After - public void tearDown() { - try { - shutdown(); - } catch (Exception e) { - LOG.warn("Error shutting down HBase minicluster: " - + StringUtils.stringifyException(e)); - } - PhoenixTestCase.restoreTestBuidlDataProperty(); - super.tearDown(); - } - - /** - * Create the argv to pass to Sqoop. - * @return the argv as an array of strings. - */ - protected String [] getArgv(boolean includeHadoopFlags, - String phoenixTable, String phoenixColumnMapping,String queryStr) { - - ArrayList args = new ArrayList(); - - if (includeHadoopFlags) { - CommonArgs.addHadoopFlags(args); - args.add("-D"); - args.add("hbase.zookeeper.property.clientPort=21818"); - } - - if (null != queryStr) { - args.add("--query"); - args.add(queryStr); - } else { - args.add("--table"); - args.add(getTableName()); - } - args.add("--split-by"); - args.add(getColName(0)); - args.add("--connect"); - args.add(HsqldbTestServer.getUrl()); - args.add("--num-mappers"); - args.add("1"); - args.add("--phoenix-table"); - args.add(phoenixTable); - if(null != phoenixColumnMapping) { - args.add("--phoenix-column-mapping"); - args.add(phoenixColumnMapping); - } - - return args.toArray(new String[0]); - } -} diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java new file mode 100644 index 000000000..10fb9352a --- /dev/null +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.phoenix; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +import org.junit.Test; + +/** + * + * Tests to import onto phoenix tables. + * + */ +public class PhoenixBasicImportTest extends PhoenixTestCase { + + /** + * Test where the sqoop and phoenix table column names are the same. 
+ * @throws Exception + */ + @Test + public void testBasicUsageWithNoColumnMapping() throws Exception { + Connection phoenixConnection = null; + ResultSet rs = null; + try { + final String tableName = "TABLE1"; + final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)" , tableName); + createPhoenixTestTable(ddl); + + final String [] argv = getArgv(true, "ID", tableName, null, false, null); + + // create sqoop table + String [] columnNames = { "ID" , "NAME" , "AGE"}; + String [] colTypes = { "INT", "VARCHAR(32)" , "INT"}; + String [] vals = { "0", "'first'" , "1" }; + createTableWithColTypesAndNames(columnNames, colTypes, vals); + + // run the import. + runImport(argv); + + // verify the result + phoenixConnection = getPhoenixConnection(); + Statement stmt = phoenixConnection.createStatement(); + rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s",tableName)); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertEquals("first", rs.getString(2)); + assertEquals(1, rs.getInt(3)); + assertFalse(rs.next()); + } finally { + if(rs != null) { + rs.close(); + } + if(phoenixConnection != null) { + phoenixConnection.close(); + } + } + } + + /** + * Test where the sqoop table column names differ from phoenix table column name + * @throws Exception + */ + @Test + public void testBasicUsageWithColumnMapping() throws Exception { + Connection phoenixConnection = null; + ResultSet rs = null; + try { + // create phoenix table + final String tableName = "TABLE2"; + final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)" , tableName); + createPhoenixTestTable(ddl); + + // create sqoop table + String [] columnNames = { "rowid" , "name" , "age"}; + String [] colTypes = { "INT", "VARCHAR(32)" , "INT"}; + String [] vals = { "0", "'Name 1'" , "1" }; + createTableWithColTypesAndNames(columnNames, colTypes, vals); + + // run the import + String [] argv = getArgv(true, "rowid", tableName, "ROWID;ID,NAME,AGE", false, null); + runImport(argv); + + //verify the data. + phoenixConnection = getPhoenixConnection(); + Statement stmt = phoenixConnection.createStatement(); + rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s",tableName)); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(1, rs.getInt(3)); + assertFalse(rs.next()); + } finally { + if(rs != null) { + rs.close(); + } + if(phoenixConnection != null) { + phoenixConnection.close(); + } + } + } +} diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java new file mode 100644 index 000000000..4194a4349 --- /dev/null +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.phoenix; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +import org.junit.Test; + +/** + * + * Tests to bulk import onto phoenix tables. + * + */ +public class PhoenixBulkImportTest extends PhoenixTestCase { + + /** + * Test where the sqoop and phoenix table column names are the same. + * @throws Exception + */ + @Test + public void testBulkloadWithNoColumnMapping() throws Exception { + Connection phoenixConnection = null; + ResultSet rs = null; + try { + final String tableName = "TABLE1"; + final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)" , tableName); + createPhoenixTestTable(ddl); + + final String [] argv = getArgv(true, "ID", tableName, null, true, null); + + // create sqoop table + String [] columnNames = { "ID" , "NAME" , "AGE"}; + String [] colTypes = { "INT", "VARCHAR(32)" , "INT"}; + String [] vals = { "0", "'first'" , "1" }; + createTableWithColTypesAndNames(columnNames, colTypes, vals); + + // run the import. + runImport(argv); + + // verify the result + phoenixConnection = getPhoenixConnection(); + Statement stmt = phoenixConnection.createStatement(); + rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s",tableName)); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + assertEquals("first", rs.getString(2)); + assertEquals(1, rs.getInt(3)); + assertFalse(rs.next()); + } finally { + if(rs != null) { + rs.close(); + } + if(phoenixConnection != null) { + phoenixConnection.close(); + } + } + } + + /** + * Test where the sqoop table column names differ from phoenix table column name + * @throws Exception + */ + @Test + public void testBulkImportWithColumnMapping() throws Exception { + Connection phoenixConnection = null; + ResultSet rs = null; + try { + // create phoenix table + final String tableName = "TABLE2"; + final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER)" , tableName); + createPhoenixTestTable(ddl); + + // create sqoop table + String [] columnNames = { "rowid" , "name" , "age"}; + String [] colTypes = { "INT", "VARCHAR(32)" , "INT"}; + String [] vals = { "0", "'Name 1'" , "1" }; + createTableWithColTypesAndNames(columnNames, colTypes, vals); + + // run the import + String [] argv = getArgv(true, "rowid", tableName, "ROWID;ID,NAME,AGE", true , null); + runImport(argv); + + //verify the data. 
+      phoenixConnection = getPhoenixConnection();
+      Statement stmt = phoenixConnection.createStatement();
+      rs = stmt.executeQuery(String.format("SELECT id, name, age FROM %s",tableName));
+      assertTrue(rs.next());
+      assertEquals(0, rs.getInt(1));
+      assertEquals("Name 1", rs.getString(2));
+      assertEquals(1, rs.getInt(3));
+      assertFalse(rs.next());
+    } finally {
+      if(rs != null) {
+        rs.close();
+      }
+      if(phoenixConnection != null) {
+        phoenixConnection.close();
+      }
+    }
+  }
+}
diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java
new file mode 100644
index 000000000..a6519aceb
--- /dev/null
+++ b/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java
@@ -0,0 +1,75 @@
+package org.apache.sqoop.phoenix;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import org.junit.Test;
+
+/**
+ * Tests to insert onto phoenix table using query
+ *
+ */
+public class PhoenixQueryImportTest extends PhoenixTestCase {
+
+  /**
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testQueryImport() throws Exception {
+    Connection phoenixConnection = null;
+    ResultSet rs = null;
+    try {
+
+      // create phoenix table
+      final String tableName = "TABLE1";
+      final String ddl = String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, LOCATION VARCHAR)" , tableName);
+      createPhoenixTestTable(ddl);
+
+      // create sqoop table
+      String [] columnNames = { "ID" , "NAME" , "LOCATION"};
+      String [] colTypes = { "INT", "VARCHAR(32)" , "VARCHAR(32)"};
+      String [] vals = { "1", "'first'" , "'CA'" };
+      createTableWithColTypesAndNames(columnNames, colTypes, vals);
+
+      // run import
+      String sqoopTableName = getTableName();
+      String query = String.format("SELECT id AS ID, name AS NAME FROM %s WHERE $CONDITIONS",sqoopTableName);
+      String [] argv = getArgv(true, "ID", tableName, null, false, query);
+      runImport(argv);
+
+      //verify the data.
+      phoenixConnection = getPhoenixConnection();
+      Statement pstmt = phoenixConnection.createStatement();
+      rs = pstmt.executeQuery(String.format("SELECT id, name,location FROM %s",tableName));
+      assertTrue(rs.next());
+      assertEquals(1, rs.getInt(1));
+      assertEquals("first", rs.getString(2));
+      assertEquals(null, rs.getString(3));
+      assertFalse(rs.next());
+    } finally {
+      if(rs != null) {
+        rs.close();
+      }
+      if(phoenixConnection != null) {
+        phoenixConnection.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInvalidArgument() {
+    try {
+      String [] columnNames = { "ID" , "NAME" , "LOCATION"};
+      String [] colTypes = { "INT", "VARCHAR(32)" , "VARCHAR(32)"};
+      String [] vals = { "1", "'first'" , "'CA'" };
+      createTableWithColTypesAndNames(columnNames, colTypes, vals);
+      String [] argv = getArgv(true, "ID", "SAMPLE", null, false, null);
+      runImport(argv);
+      fail("The test should fail as neither a sqoop table nor a SQL query is passed.");
+    } catch (Exception ex) {
+      return;
+    }
+  }
+}
diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixTestCase.java b/src/test/org/apache/sqoop/phoenix/PhoenixTestCase.java
new file mode 100644
index 000000000..19dfe83a6
--- /dev/null
+++ b/src/test/org/apache/sqoop/phoenix/PhoenixTestCase.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sqoop.phoenix; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.UUID; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.util.QueryUtil; +import org.junit.After; +import org.junit.Before; +import org.kitesdk.shaded.com.google.common.base.Preconditions; + +import com.cloudera.sqoop.testutil.CommonArgs; +import com.cloudera.sqoop.testutil.HsqldbTestServer; +import com.cloudera.sqoop.testutil.ImportJobTestCase; + +/** + * + * Base test class for all phoenix tests. 
+ * + */ +public class PhoenixTestCase extends ImportJobTestCase { + + public static final Log LOG = LogFactory.getLog(PhoenixTestCase.class.getName()); + + private static String testBuildDataProperty = ""; + + private static void recordTestBuildDataProperty() { + testBuildDataProperty = System.getProperty("test.build.data", ""); + } + + private static void restoreTestBuidlDataProperty() { + System.setProperty("test.build.data", testBuildDataProperty); + } + + protected HBaseTestingUtility hbaseTestUtil; + private String workDir = createTempDir().getAbsolutePath(); + private MiniZooKeeperCluster zookeeperCluster; + private MiniHBaseCluster hbaseCluster; + + + @Override + @Before + public void setUp() { + try { + recordTestBuildDataProperty(); + String hbaseDir = new File(workDir, "phoenix" + UUID.randomUUID().toString() ).getAbsolutePath(); + String hbaseRoot = "file://" + hbaseDir; + Configuration hbaseConf = HBaseConfiguration.create(); + hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot); + //Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT + hbaseConf.setInt("hbase.zookeeper.property.clientPort", 2181); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0"); + hbaseConf.setInt("hbase.master.info.port", -1); + hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500); + String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); + int zookeeperPort = 2181; + zookeeperCluster = new MiniZooKeeperCluster(); + Method m; + Class zkParam[] = {Integer.TYPE}; + try { + m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", + zkParam); + } catch (NoSuchMethodException e) { + m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", + zkParam); + } + m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); + zookeeperCluster.startup(new File(zookeeperDir)); + hbaseCluster = new MiniHBaseCluster(hbaseConf, 1); + HMaster master = hbaseCluster.getMaster(); + Object serverName = master.getServerName(); + + String hostAndPort; + if (serverName instanceof String) { + m = HMaster.class.getDeclaredMethod("getMasterAddress", + new Class[]{}); + Class clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress"); + Object serverAddr = clazz.cast(m.invoke(master, new Object[]{})); + //returns the address as hostname:port + hostAndPort = serverAddr.toString(); + } else { + Class clazz = Class.forName("org.apache.hadoop.hbase.ServerName"); + m = clazz.getDeclaredMethod("getHostAndPort", new Class[]{}); + hostAndPort = m.invoke(serverName, new Object[]{}).toString(); + } + hbaseConf.set("hbase.master", hostAndPort); + hbaseTestUtil = new HBaseTestingUtility(hbaseConf); + hbaseTestUtil.setZkCluster(zookeeperCluster); + hbaseCluster.startMaster(); + super.setUp(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + public static File createTempDir() { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + File tempDir = new File(baseDir, UUID.randomUUID().toString()); + if (tempDir.mkdir()) { + return tempDir; + } + throw new IllegalStateException("Failed to create directory"); + } + + /** + * creates the test table given the ddl + * @param url + * @param ddl + * @throws SQLException + */ + protected void createPhoenixTestTable(final String ddl) throws SQLException { + assertNotNull(ddl); + PreparedStatement stmt = getPhoenixConnection().prepareStatement(ddl); + stmt.execute(ddl); + } + + /** + * Returns a {@linkplain PhoenixConnection} + * @return + */ + protected Connection getPhoenixConnection() throws SQLException { + 
int zkport = hbaseTestUtil.getConfiguration().getInt("hbase.zookeeper.property.clientPort",2181); + String zkServer = hbaseTestUtil.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM); + String url = QueryUtil.getUrl(zkServer, zkport); + Connection phoenixConnection = DriverManager.getConnection(url); + return phoenixConnection; + } + + /** + * shutdown the hbase mini cluster + * @throws IOException + */ + public void shutdown() throws IOException { + if (null != hbaseTestUtil) { + LOG.info("Shutting down HBase cluster"); + hbaseCluster.shutdown(); + zookeeperCluster.shutdown(); + hbaseTestUtil = null; + } + FileUtils.deleteDirectory(new File(workDir)); + LOG.info("shutdown() method returning."); + restoreTestBuidlDataProperty(); + + } + + @After + @Override + public void tearDown() { + try { + shutdown(); + } catch(Exception ex) { + ex.printStackTrace(); + } + super.tearDown(); + } + + /** + * Create the argv to pass to Sqoop. + * @return the argv as an array of strings. + */ + protected String [] getArgv(boolean includeHadoopFlags,String splitBy,String phoenixTable, + String phoenixColumnMapping,boolean isBulkload, String queryStr) { + + Preconditions.checkNotNull(phoenixTable); + ArrayList args = new ArrayList(); + + if (includeHadoopFlags) { + CommonArgs.addHadoopFlags(args); + args.add("-D"); + args.add("hbase.zookeeper.property.clientPort=2181"); + } + + if (null != queryStr) { + args.add("--query"); + args.add(queryStr); + } else { + args.add("--table"); + args.add(getTableName()); + } + args.add("--split-by"); + args.add(splitBy); + args.add("--connect"); + args.add(HsqldbTestServer.getUrl()); + args.add("--num-mappers"); + args.add("1"); + args.add("--phoenix-table"); + args.add(phoenixTable); + if(null != phoenixColumnMapping) { + args.add("--phoenix-column-mapping"); + args.add(phoenixColumnMapping); + } + if(isBulkload) { + args.add("--phoenix-bulkload"); + } + return args.toArray(new String[0]); + } +} From 95a621d884dc1b3dc9f80d76c15babc48640f973 Mon Sep 17 00:00:00 2001 From: Ravi Magham Date: Sun, 15 Nov 2015 22:29:18 -0800 Subject: [PATCH 5/6] code formatting! --- ...TestCase.java => PhoenixBaseTestCase.java} | 36 +++++++++---------- .../sqoop/phoenix/PhoenixBasicImportTest.java | 2 +- .../sqoop/phoenix/PhoenixBulkImportTest.java | 2 +- .../sqoop/phoenix/PhoenixQueryImportTest.java | 2 +- 4 files changed, 21 insertions(+), 21 deletions(-) rename src/test/org/apache/sqoop/phoenix/{PhoenixTestCase.java => PhoenixBaseTestCase.java} (90%) diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixTestCase.java b/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java similarity index 90% rename from src/test/org/apache/sqoop/phoenix/PhoenixTestCase.java rename to src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java index 19dfe83a6..d777c491f 100644 --- a/src/test/org/apache/sqoop/phoenix/PhoenixTestCase.java +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java @@ -53,9 +53,9 @@ * Base test class for all phoenix tests. 
* */ -public class PhoenixTestCase extends ImportJobTestCase { +public class PhoenixBaseTestCase extends ImportJobTestCase { - public static final Log LOG = LogFactory.getLog(PhoenixTestCase.class.getName()); + public static final Log LOG = LogFactory.getLog(PhoenixBaseTestCase.class.getName()); private static String testBuildDataProperty = ""; @@ -88,16 +88,16 @@ public void setUp() { hbaseConf.setInt("hbase.master.info.port", -1); hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500); String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); - int zookeeperPort = 2181; - zookeeperCluster = new MiniZooKeeperCluster(); - Method m; - Class zkParam[] = {Integer.TYPE}; - try { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", - zkParam); - } catch (NoSuchMethodException e) { - m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", + int zookeeperPort = 2181; + zookeeperCluster = new MiniZooKeeperCluster(); + Method m; + Class zkParam[] = {Integer.TYPE}; + try { + m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort", zkParam); + } catch (NoSuchMethodException e) { + m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", + zkParam); } m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); zookeeperCluster.startup(new File(zookeeperDir)); @@ -117,13 +117,13 @@ public void setUp() { Class clazz = Class.forName("org.apache.hadoop.hbase.ServerName"); m = clazz.getDeclaredMethod("getHostAndPort", new Class[]{}); hostAndPort = m.invoke(serverName, new Object[]{}).toString(); - } - hbaseConf.set("hbase.master", hostAndPort); - hbaseTestUtil = new HBaseTestingUtility(hbaseConf); - hbaseTestUtil.setZkCluster(zookeeperCluster); - hbaseCluster.startMaster(); - super.setUp(); - } catch (Throwable e) { + } + hbaseConf.set("hbase.master", hostAndPort); + hbaseTestUtil = new HBaseTestingUtility(hbaseConf); + hbaseTestUtil.setZkCluster(zookeeperCluster); + hbaseCluster.startMaster(); + super.setUp(); + } catch (Throwable e) { throw new RuntimeException(e); } } diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java index 10fb9352a..2ab58bae7 100644 --- a/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBasicImportTest.java @@ -29,7 +29,7 @@ * Tests to import onto phoenix tables. * */ -public class PhoenixBasicImportTest extends PhoenixTestCase { +public class PhoenixBasicImportTest extends PhoenixBaseTestCase { /** * Test where the sqoop and phoenix table column names are the same. diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java index 4194a4349..a0c7fbb37 100644 --- a/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBulkImportTest.java @@ -29,7 +29,7 @@ * Tests to bulk import onto phoenix tables. * */ -public class PhoenixBulkImportTest extends PhoenixTestCase { +public class PhoenixBulkImportTest extends PhoenixBaseTestCase { /** * Test where the sqoop and phoenix table column names are the same. 
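The options these tests exercise (--phoenix-table, --phoenix-column-mapping and --phoenix-bulkload) are assembled by getArgv() in the base test case. A minimal programmatic sketch of an equivalent import follows, assuming a hypothetical HSQLDB source table and reading the mapping value the way the fixtures use it ("ROWID;ID,NAME,AGE", apparently comma-separated sqoop-column;phoenix-column pairs); the class name, connect string and source table name are illustrative placeholders, not part of the patch.

import org.apache.sqoop.Sqoop;

/**
 * Minimal sketch: runs an import with the new Phoenix options through Sqoop's
 * programmatic entry point, mirroring the argument list built by the tests.
 * The connect string, source table and mapping value are assumed placeholders.
 */
public class PhoenixImportExample {
  public static void main(String[] args) {
    String[] sqoopArgs = new String[] {
        "import",
        "--connect", "jdbc:hsqldb:hsql://localhost/sqoop",   // assumed source database
        "--table", "SQOOP_TABLE",                            // assumed source table
        "--split-by", "ROWID",
        "--num-mappers", "1",
        "--phoenix-table", "TABLE2",                         // target phoenix table
        "--phoenix-column-mapping", "ROWID;ID,NAME,AGE",     // as in the tests: sqoop column ';' phoenix column
        "--phoenix-bulkload"                                 // optional: use the bulk load path
    };
    // Sqoop.runTool() resolves the tool named by the first argument ("import"),
    // parses the remaining options and returns the tool's exit status.
    int status = Sqoop.runTool(sqoopArgs);
    System.exit(status);
  }
}

Without --phoenix-column-mapping the tests rely on the sqoop and phoenix column names matching, and --phoenix-bulkload appears to switch the job from row-by-row upserts to the bulk load path.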
diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java b/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java index a6519aceb..1f368e281 100644 --- a/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java +++ b/src/test/org/apache/sqoop/phoenix/PhoenixQueryImportTest.java @@ -10,7 +10,7 @@ * Tests to insert onto phoenix table using query * */ -public class PhoenixQueryImportTest extends PhoenixTestCase { +public class PhoenixQueryImportTest extends PhoenixBaseTestCase { /** * From c24a7fb1ec1d41298eba88fabf8755326a9606ca Mon Sep 17 00:00:00 2001 From: Ravi Magham Date: Mon, 16 Nov 2015 09:53:49 -0800 Subject: [PATCH 6/6] upgrade phoenix v and fix tests --- build.xml | 2 +- .../sqoop/phoenix/PhoenixBaseTestCase.java | 74 ++++++++++--------- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/build.xml b/build.xml index f30deb98e..64aca6dde 100644 --- a/build.xml +++ b/build.xml @@ -194,7 +194,7 @@ - + diff --git a/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java b/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java index d777c491f..4dae19ae1 100644 --- a/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java +++ b/src/test/org/apache/sqoop/phoenix/PhoenixBaseTestCase.java @@ -26,6 +26,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Random; import java.util.UUID; import org.apache.commons.io.FileUtils; @@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.util.StringUtils; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.util.QueryUtil; import org.junit.After; @@ -58,7 +60,9 @@ public class PhoenixBaseTestCase extends ImportJobTestCase { public static final Log LOG = LogFactory.getLog(PhoenixBaseTestCase.class.getName()); private static String testBuildDataProperty = ""; - + private static final int ZK_DEFAULT_PORT = 2181; + private static final Random random = new Random(); + private static void recordTestBuildDataProperty() { testBuildDataProperty = System.getProperty("test.build.data", ""); } @@ -71,8 +75,7 @@ private static void restoreTestBuidlDataProperty() { private String workDir = createTempDir().getAbsolutePath(); private MiniZooKeeperCluster zookeeperCluster; private MiniHBaseCluster hbaseCluster; - - + @Override @Before public void setUp() { @@ -83,12 +86,12 @@ public void setUp() { Configuration hbaseConf = HBaseConfiguration.create(); hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot); //Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT - hbaseConf.setInt("hbase.zookeeper.property.clientPort", 2181); + int zookeeperPort = ZK_DEFAULT_PORT + random.nextInt(1000); + hbaseConf.setInt("hbase.zookeeper.property.clientPort",zookeeperPort ); hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0"); hbaseConf.setInt("hbase.master.info.port", -1); hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500); String zookeeperDir = new File(workDir, "zk").getAbsolutePath(); - int zookeeperPort = 2181; zookeeperCluster = new MiniZooKeeperCluster(); Method m; Class zkParam[] = {Integer.TYPE}; @@ -98,34 +101,34 @@ public void setUp() { } catch (NoSuchMethodException e) { m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort", zkParam); - } - m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); - zookeeperCluster.startup(new File(zookeeperDir)); - 
hbaseCluster = new MiniHBaseCluster(hbaseConf, 1); - HMaster master = hbaseCluster.getMaster(); - Object serverName = master.getServerName(); + } + m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)}); + zookeeperCluster.startup(new File(zookeeperDir)); + hbaseCluster = new MiniHBaseCluster(hbaseConf, 1); + HMaster master = hbaseCluster.getMaster(); + Object serverName = master.getServerName(); - String hostAndPort; - if (serverName instanceof String) { - m = HMaster.class.getDeclaredMethod("getMasterAddress", + String hostAndPort; + if (serverName instanceof String) { + m = HMaster.class.getDeclaredMethod("getMasterAddress", new Class[]{}); - Class clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress"); - Object serverAddr = clazz.cast(m.invoke(master, new Object[]{})); - //returns the address as hostname:port - hostAndPort = serverAddr.toString(); - } else { - Class clazz = Class.forName("org.apache.hadoop.hbase.ServerName"); - m = clazz.getDeclaredMethod("getHostAndPort", new Class[]{}); - hostAndPort = m.invoke(serverName, new Object[]{}).toString(); + Class clazz = Class.forName("org.apache.hadoop.hbase.HServerAddress"); + Object serverAddr = clazz.cast(m.invoke(master, new Object[]{})); + //returns the address as hostname:port + hostAndPort = serverAddr.toString(); + } else { + Class clazz = Class.forName("org.apache.hadoop.hbase.ServerName"); + m = clazz.getDeclaredMethod("getHostAndPort", new Class[]{}); + hostAndPort = m.invoke(serverName, new Object[]{}).toString(); + } + hbaseConf.set("hbase.master", hostAndPort); + hbaseTestUtil = new HBaseTestingUtility(hbaseConf); + hbaseTestUtil.setZkCluster(zookeeperCluster); + hbaseCluster.startMaster(); + super.setUp(); + } catch (Throwable e) { + throw new RuntimeException(e); } - hbaseConf.set("hbase.master", hostAndPort); - hbaseTestUtil = new HBaseTestingUtility(hbaseConf); - hbaseTestUtil.setZkCluster(zookeeperCluster); - hbaseCluster.startMaster(); - super.setUp(); - } catch (Throwable e) { - throw new RuntimeException(e); - } } public static File createTempDir() { @@ -154,7 +157,7 @@ protected void createPhoenixTestTable(final String ddl) throws SQLException { * @return */ protected Connection getPhoenixConnection() throws SQLException { - int zkport = hbaseTestUtil.getConfiguration().getInt("hbase.zookeeper.property.clientPort",2181); + int zkport = hbaseTestUtil.getConfiguration().getInt("hbase.zookeeper.property.clientPort",ZK_DEFAULT_PORT); String zkServer = hbaseTestUtil.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM); String url = QueryUtil.getUrl(zkServer, zkport); Connection phoenixConnection = DriverManager.getConnection(url); @@ -175,8 +178,7 @@ public void shutdown() throws IOException { FileUtils.deleteDirectory(new File(workDir)); LOG.info("shutdown() method returning."); restoreTestBuidlDataProperty(); - - } + } @After @Override @@ -184,7 +186,8 @@ public void tearDown() { try { shutdown(); } catch(Exception ex) { - ex.printStackTrace(); + LOG.error("Error shutting down HBase minicluster: " + + StringUtils.stringifyException(ex)); } super.tearDown(); } @@ -197,12 +200,13 @@ public void tearDown() { String phoenixColumnMapping,boolean isBulkload, String queryStr) { Preconditions.checkNotNull(phoenixTable); + int zkPort = hbaseTestUtil.getConfiguration().getInt("hbase.zookeeper.property.clientPort",ZK_DEFAULT_PORT); ArrayList args = new ArrayList(); if (includeHadoopFlags) { CommonArgs.addHadoopFlags(args); args.add("-D"); - args.add("hbase.zookeeper.property.clientPort=2181"); + 
args.add(String.format("hbase.zookeeper.property.clientPort=%s",zkPort)); } if (null != queryStr) {