
Commit c831320

Add option to put "database_prefix" when import mode is "from_s3". (#173)

* Add option to put "database_prefix" when import mode is "from_s3". With this approach it is possible to import between multiple accounts and add a database prefix.
* Modifications to merge #66
* Reverted license header
* Removed unused code and fixed indentation
* Added `--database-prefix` parameter for from-s3 mode in README

Co-authored-by: daniloffantinato <[email protected]>

1 parent f1d4142 commit c831320

File tree: 2 files changed (+35, −14 lines)

utilities/Hive_metastore_migration/README.md (1 addition, 0 deletions)

@@ -245,6 +245,7 @@ as an Glue ETL job, if AWS Glue can directly connect to your Hive metastore.
 - `--database-input-path` set to the S3 path containing only databases. For example: `s3://someBucket/output_path_from_previous_job/databases`
 - `--table-input-path` set to the S3 path containing only tables. For example: `s3://someBucket/output_path_from_previous_job/tables`
 - `--partition-input-path` set to the S3 path containing only partitions. For example: `s3://someBucket/output_path_from_previous_job/partitions`
+- `--database-prefix` (optional) set to a string prefix that is applied to the database name created in AWS Glue Data Catalog. You can use it as a way to track the origin of the metadata, and avoid naming conflicts. The default is the empty string.
 
 Also, because there is no need to connect to any JDBC source, the job doesn't
 require any connections.
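
For context, here is a minimal sketch (not part of this commit) of how the new option reaches the import path: in `from-s3` mode the job forwards `--database-prefix` to `metastore_import_from_s3` as `db_prefix`, whose updated signature appears in the source diff below. The prefix value `legacy_` and the pre-existing `sql_context`/`glue_context` objects are assumptions for illustration; the S3 paths reuse the README's examples.

```python
# Hypothetical direct call mirroring what main() does in from-s3 mode.
# Assumes sql_context (SQLContext) and glue_context (GlueContext) already exist.
metastore_import_from_s3(
    sql_context=sql_context,
    glue_context=glue_context,
    db_input_dir='s3://someBucket/output_path_from_previous_job/databases',
    tbl_input_dir='s3://someBucket/output_path_from_previous_job/tables',
    parts_input_dir='s3://someBucket/output_path_from_previous_job/partitions',
    db_prefix='legacy_',              # hypothetical prefix; defaults to ''
    datacatalog_name='datacatalog',
    region='us-east-1',
)
```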

utilities/Hive_metastore_migration/src/import_into_datacatalog.py (34 additions, 14 deletions)
@@ -4,6 +4,8 @@
 # Work with Python 3 in Glue 2.0 and Glue 3.0
 from __future__ import print_function
 
+from pyspark.sql.functions import lit, struct, array, col, concat
+
 from awsglue.context import GlueContext
 from awsglue.dynamicframe import DynamicFrame

@@ -41,8 +43,8 @@ def import_datacatalog(sql_context, glue_context, datacatalog_name, databases, t
         connection_options={'catalog.name': datacatalog_name, 'catalog.region': region})
 
 
-def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix
-                             , region):
+def metastore_full_migration(sc, sql_context, glue_context, connection, datacatalog_name, db_prefix, table_prefix,
+                             region):
     # extract
     hive_metastore = HiveMetastore(connection, sql_context)
     hive_metastore.extract_metastore()
@@ -51,18 +53,26 @@ def metastore_full_migration(sc, sql_context, glue_context, connection, datacata
     (databases, tables, partitions) = HiveMetastoreTransformer(
         sc, sql_context, db_prefix, table_prefix).transform(hive_metastore)
 
-    #load
+    # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
 
 
-def metastore_import_from_s3(sql_context, glue_context,db_input_dir, tbl_input_dir, parts_input_dir,
-                             datacatalog_name, region):
+def metastore_import_from_s3(sql_context, glue_context, db_input_dir, tbl_input_dir, parts_input_dir,
+                             db_prefix, datacatalog_name, region):
 
     # extract
     databases = sql_context.read.json(path=db_input_dir, schema=METASTORE_DATABASE_SCHEMA)
     tables = sql_context.read.json(path=tbl_input_dir, schema=METASTORE_TABLE_SCHEMA)
     partitions = sql_context.read.json(path=parts_input_dir, schema=METASTORE_PARTITION_SCHEMA)
 
+    # apply the optional prefix to the database names, including the copies
+    # embedded in the table and partition entities
+    if db_prefix:
+        databases = databases.withColumn('item', struct(
+            col('item.description'), col('item.locationUri'),
+            concat(lit(db_prefix), col('item.name')).alias('name'),
+            col('item.parameters')))
+        tables = tables.withColumn('database', concat(lit(db_prefix), col('database')))
+        partitions = partitions.withColumn('database', concat(lit(db_prefix), col('database')))
+        partitions = partitions.withColumn('item', struct(
+            col('item.creationTime'), col('item.lastAccessTime'),
+            concat(lit(db_prefix), col('item.namespaceName')).alias('namespaceName'),
+            col('item.parameters'), col('item.storageDescriptor'), col('item.values')))
+
     # load
     import_datacatalog(sql_context, glue_context, datacatalog_name, databases, tables, partitions, region)
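The `struct(...)` rebuild above is the core trick: Spark has no in-place rename for a field nested inside a struct column, so the whole `item` column is reconstructed field by field, with the prefixed value aliased back to its original field name. A self-contained sketch of the same technique on toy data, assuming a local SparkSession, a hypothetical `legacy_` prefix, and a simplified two-field schema (the real `METASTORE_DATABASE_SCHEMA` has more fields):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, struct, col, concat

spark = SparkSession.builder.master('local[1]').appName('prefix-demo').getOrCreate()

# One toy database entity with a nested 'item' struct, loosely mirroring
# the metastore export format.
databases = spark.createDataFrame(
    [(('sales data', 'sales'),)],
    'item struct<description:string,name:string>')

# struct() rebuilds the nested column; concat() plus alias() swaps in the
# prefixed name while the other field is copied through unchanged.
prefixed = databases.withColumn(
    'item',
    struct(col('item.description'),
           concat(lit('legacy_'), col('item.name')).alias('name')))

prefixed.select('item.name').show()  # prints: legacy_sales
```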

@@ -72,21 +82,29 @@ def main():
     from_s3 = 'from-s3'
     from_jdbc = 'from-jdbc'
     parser = argparse.ArgumentParser(prog=sys.argv[0])
-    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc], help='Choose to migrate metastore either from JDBC or from S3')
-    parser.add_argument('-c', '--connection-name', required=False, help='Glue Connection name for Hive metastore JDBC connection')
-    parser.add_argument('-R', '--region', required=False, help='AWS region of target Glue DataCatalog, default to "us-east-1"')
-    parser.add_argument('-d', '--database-prefix', required=False, help='Optional prefix for database names in Glue DataCatalog')
-    parser.add_argument('-t', '--table-prefix', required=False, help='Optional prefix for table name in Glue DataCatalog')
-    parser.add_argument('-D', '--database-input-path', required=False, help='An S3 path containing json files of metastore database entities')
-    parser.add_argument('-T', '--table-input-path', required=False, help='An S3 path containing json files of metastore table entities')
-    parser.add_argument('-P', '--partition-input-path', required=False, help='An S3 path containing json files of metastore partition entities')
+    parser.add_argument('-m', '--mode', required=True, choices=[from_s3, from_jdbc],
+                        help='Choose to migrate metastore either from JDBC or from S3')
+    parser.add_argument('-c', '--connection-name', required=False,
+                        help='Glue Connection name for Hive metastore JDBC connection')
+    parser.add_argument('-R', '--region', required=False,
+                        help='AWS region of target Glue DataCatalog, default to "us-east-1"')
+    parser.add_argument('-d', '--database-prefix', required=False,
+                        help='Optional prefix for database names in Glue DataCatalog')
+    parser.add_argument('-t', '--table-prefix', required=False,
+                        help='Optional prefix for table name in Glue DataCatalog')
+    parser.add_argument('-D', '--database-input-path', required=False,
+                        help='An S3 path containing json files of metastore database entities')
+    parser.add_argument('-T', '--table-input-path', required=False,
+                        help='An S3 path containing json files of metastore table entities')
+    parser.add_argument('-P', '--partition-input-path', required=False,
+                        help='An S3 path containing json files of metastore partition entities')
 
     options = get_options(parser, sys.argv)
     if options['mode'] == from_s3:
         validate_options_in_mode(
             options=options, mode=from_s3,
             required_options=['database_input_path', 'table_input_path', 'partition_input_path'],
-            not_allowed_options=['database_prefix', 'table_prefix']
+            not_allowed_options=['table_prefix']
         )
     elif options['mode'] == from_jdbc:
         validate_options_in_mode(
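
The behavioral change in this hunk is the shrunken `not_allowed_options` list: `database_prefix` is no longer rejected in `from-s3` mode. `validate_options_in_mode` itself is defined in `hive_metastore_migration.py` and is not part of this diff; the following is only a hypothetical sketch of the contract it plausibly enforces (the error type and messages are assumptions):

```python
# Hypothetical sketch, not the repository's actual implementation.
def validate_options_in_mode(options, mode, required_options, not_allowed_options):
    for name in required_options:
        if options.get(name) is None:
            raise AssertionError('Option %s is required for mode %s' % (name, mode))
    for name in not_allowed_options:
        if options.get(name) is not None:
            raise AssertionError('Option %s is not allowed for mode %s' % (name, mode))
```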
@@ -111,6 +129,7 @@ def main():
             db_input_dir=options['database_input_path'],
             tbl_input_dir=options['table_input_path'],
             parts_input_dir=options['partition_input_path'],
+            db_prefix=options.get('database_prefix') or '',
             datacatalog_name='datacatalog',
             region=options.get('region') or 'us-east-1'
         )
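
One detail worth noting in the added line: the default is applied with `or ''` rather than a plain `.get()` fallback, because argparse records an omitted optional flag as `None`, which a `.get()` default alone would not replace. A quick illustration:

```python
# `None or ''` normalizes both a missing key and an explicit None
# (argparse's value for an omitted flag) to the empty string.
options = {'database_prefix': None}
db_prefix = options.get('database_prefix') or ''
assert db_prefix == ''
```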
@@ -127,5 +146,6 @@ def main():
             region=options.get('region') or 'us-east-1'
         )
 
+
 if __name__ == '__main__':
     main()
