diff --git a/utilities/Hive_metastore_migration/README.md b/utilities/Hive_metastore_migration/README.md
index b228a59a..ab881486 100644
--- a/utilities/Hive_metastore_migration/README.md
+++ b/utilities/Hive_metastore_migration/README.md
@@ -243,6 +243,7 @@ as an Glue ETL job, if AWS Glue can directly connect to your Hive metastore.
 - `--database-input-path` set to the S3 path containing only databases. For example: `s3://someBucket/output_path_from_previous_job/databases`
 - `--table-input-path` set to the S3 path containing only tables. For example: `s3://someBucket/output_path_from_previous_job/tables`
 - `--partition-input-path` set to the S3 path containing only partitions. For example: `s3://someBucket/output_path_from_previous_job/partitions`
+- `--database-prefix` (optional) set to a string prefix applied to the database names created in the AWS Glue Data Catalog. You can use it to track the origin of the metadata and to avoid naming conflicts. The default is an empty string.
 
 Also, because there is no need to connect to any JDBC source, the job doesn't require any connections.
 
diff --git a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
index 5ba7aa71..d4244930 100644
--- a/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
+++ b/utilities/Hive_metastore_migration/src/import_into_datacatalog.py
@@ -3,6 +3,8 @@
 
 from __future__ import print_function
 
+from pyspark.sql.functions import col, concat, lit, struct
+
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame
 
@@ -40,8 +42,8 @@ def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, t
         connection_options={'catalog.name': datacatalog_name, 'catalog.region': region})
 
 
-def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix
-                             , region):
+def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix,
+                             region):
     # extract
     hive_metastore = HiveMetastore(connection, sql_context)
     hive_metastore.extract_metastore()
@@ -50,18 +52,26 @@ def metastore_full_migration(sc, sql_context, glue_context, connection, datacata
     (databases, tables, partitions) = HiveMetastoreTransformer(
         sc, sql_context, db_prefix, table_prefix).transform(hive_metastore)
 
-    #load
+    # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
 
 
-def metastore_import_from_s3(sql_context, glue_context,db_input_dir, tbl_input_dir, parts_input_dir,
-                             datacatalog_name, region):
+def metastore_import_from_s3(sql_context, glue_context, db_input_dir, tbl_input_dir, parts_input_dir, db_prefix,
+                             datacatalog_name, region):
 
     # extract
     databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
     tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
     partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)
 
+    # Optionally prepend the database prefix to every database name referenced in the metadata
+    if db_prefix:
+        databases = databases.withColumn('item', struct(col('item.description'), col('item.locationUri'), concat(lit(db_prefix), col('item.name')).alias('name'), col('item.parameters')))
+        tables = tables.withColumn('database', concat(lit(db_prefix), col('database')))
+        partitions = partitions.withColumn('database', concat(lit(db_prefix), col('database')))
+        partitions = partitions.withColumn('item',
+            struct(col('item.creationTime'), col('item.lastAccessTime'), concat(lit(db_prefix), col('item.namespaceName')).alias('namespaceName'), col('item.parameters'), col('item.storageDescriptor'), col('item.values')))
+
     # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
 
@@ -71,21 +81,29 @@ def main():
     from_s3 = 'from-s3'
     from_jdbc = 'from-jdbc'
     parser = argparse.ArgumentParser(prog=sys.argv[0])
-    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc], help='Choose to migrate metastore either from JDBC or from S3')
-    parser.add_argument('-c', '--connection-name', required=False, help='Glue Connection name for Hive metastore JDBC connection')
-    parser.add_argument('-R', '--region', required=False, help='AWS region of target Glue DataCatalog, default to "us-east-1"')
-    parser.add_argument('-d', '--database-prefix', required=False, help='Optional prefix for database names in Glue DataCatalog')
-    parser.add_argument('-t', '--table-prefix', required=False, help='Optional prefix for table name in Glue DataCatalog')
-    parser.add_argument('-D', '--database-input-path', required=False, help='An S3 path containing json files of metastore database entities')
-    parser.add_argument('-T', '--table-input-path', required=False, help='An S3 path containing json files of metastore table entities')
-    parser.add_argument('-P', '--partition-input-path', required=False, help='An S3 path containing json files of metastore partition entities')
+    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc],
+                        help='Choose to migrate metastore either from JDBC or from S3')
+    parser.add_argument('-c', '--connection-name', required=False,
+                        help='Glue Connection name for Hive metastore JDBC connection')
+    parser.add_argument('-R', '--region', required=False,
+                        help='AWS region of target Glue DataCatalog, defaults to "us-east-1"')
+    parser.add_argument('-d', '--database-prefix', required=False,
+                        help='Optional prefix for database names in Glue DataCatalog')
+    parser.add_argument('-t', '--table-prefix', required=False,
+                        help='Optional prefix for table names in Glue DataCatalog')
+    parser.add_argument('-D', '--database-input-path', required=False,
+                        help='An S3 path containing json files of metastore database entities')
+    parser.add_argument('-T', '--table-input-path', required=False,
+                        help='An S3 path containing json files of metastore table entities')
+    parser.add_argument('-P', '--partition-input-path', required=False,
+                        help='An S3 path containing json files of metastore partition entities')
 
     options = get_options(parser, sys.argv)
     if options['mode'] == from_s3:
         validate_options_in_mode(
             options=options, mode=from_s3,
             required_options=['database_input_path', 'table_input_path', 'partition_input_path'],
-            not_allowed_options=['database_prefix', 'table_prefix']
+            not_allowed_options=['table_prefix']
         )
     elif options['mode'] == from_jdbc:
         validate_options_in_mode(
@@ -110,6 +128,7 @@ def main():
             db_input_dir=options['database_input_path'],
             tbl_input_dir=options['table_input_path'],
             parts_input_dir=options['partition_input_path'],
+            db_prefix=options.get('database_prefix') or '',
             datacatalog_name='datacatalog',
             region=options.get('region') or 'us-east-1'
         )
@@ -126,5 +145,6 @@ def main():
             region=options.get('region') or 'us-east-1'
         )
 
+
 if __name__ == '__main__':
     main()
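
With this change, an S3-mode import can receive the prefix as an ordinary Glue job argument alongside the existing input-path arguments. Below is a minimal sketch of starting such a run with boto3; the job name, bucket paths, and the `hive_` prefix are illustrative placeholders and not part of this patch.

```python
import boto3

glue = boto3.client('glue', region_name='us-east-1')

# Start the import job in from-s3 mode. '--database-prefix' is the new optional
# argument; every other value below is a placeholder for your own setup.
glue.start_job_run(
    JobName='hive-metastore-import',  # hypothetical job name
    Arguments={
        '--mode': 'from-s3',
        '--database-input-path': 's3://someBucket/output_path_from_previous_job/databases',
        '--table-input-path': 's3://someBucket/output_path_from_previous_job/tables',
        '--partition-input-path': 's3://someBucket/output_path_from_previous_job/partitions',
        '--database-prefix': 'hive_',
    },
)
```

Omitting `--database-prefix` leaves the behavior unchanged: the prefix defaults to an empty string and database names are imported as-is.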