diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/README.md b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/README.md index 1fd156df..9c6e7f6c 100644 --- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/README.md +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/README.md @@ -1,17 +1,16 @@ # Overview -`bg_import` is a utility that performs bulk import of categories and terms into -a Data Catalog business glossary from CSV files. To achieve that, the CSV files - one for -categories and one for terms - are parsed and validated. The resulting list of -categories and terms are then added into the target glossary via Data Catalog -API. If any errors occur at any stage of the process then an error report is -printed and import continues or completely stops depending on input flags. +`bg_import` is a utility that performs bulk import and export of categories and terms for a Data Catalog business glossary using CSV files. For the import functionality, the CSV files - one for categories and one for terms - are parsed and validated. The resulting list of categories and terms is then added into the target glossary via the Data Catalog API. If any errors occur at any stage of the process then an error report is printed and the import continues or stops completely depending on input flags. Additionally, the export functionality allows you to export the data from a Data Catalog business glossary to CSV files. This feature retrieves all glossary entries (both categories and terms) and writes them to separate CSV files. The Business Glossary API is currently in private preview, and it needs to be enabled on the project before it can be used. + + + ## Usage +### Import ``` python3 bg_import/business_glossary_import.py --project= @@ -31,6 +30,20 @@ is deprecated. 
\ Run `python3 bg_import/business_glossary_import.py -h` for description of individual arguments. +### Export +``` +python3 bg_import/business_glossary_export.py + --project=${PROJECT} + --group=${ENTRY_GROUP} + --glossary=${GLOSSARY} + --location=${LOCATION} + --categories-csv= + --terms-csv= + [-h] +``` + +* Provide output paths for the terms CSV file and the categories CSV file using the `--terms-csv` and `--categories-csv` arguments respectively; the exported terms and categories are written to these files. + ### Access token For the utility to be able to access Data Catalog API an access token has to be @@ -97,3 +110,6 @@ In the case where a list of items inside a field contains the delimiter value comma (,) the field has to be escaped by using double quotes (" "). e.g. term 1, "Term 1, a description", "Data Steward1, Data teward2",,, + +#### Note: +* tagged_assets are not currently exported to CSV files; support will be added in a future release. diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/business_glossary_export.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/business_glossary_export.py new file mode 100644 index 00000000..d09ff9e6 --- /dev/null +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/business_glossary_export.py @@ -0,0 +1,259 @@ +"""This script is used to export the data from a Data Catalog glossary to CSV files - one for categories and one for terms. + +Categories CSV file contains the following columns: + - category_display_name: The display name of the category. 
+ - description: Plain text or rich text encoded as plain text description for + the category + - steward: List of data stewards for the current category, with each steward + separated by a comma + - belongs_to_category: Display name of a category to which the category + belongs + + +Terms CSV file contains the following columns: + - term_display_name: Unique name for the entry term + - description: Plain text or rich text encoded as plain text description for + the term. + - steward: List of data stewards for the current term, with each steward + separated by a comma + - tagged_assets: List of assets tagged with the term, with each asset + separated by a comma (not implemented yet) + - synonyms: List of terms that have a synonym relation with the current term, + with each term separated by a comma + - related_terms: List of terms that have a related-to relation with the + current term, with each term separated by a comma + - belongs_to_category: Display name of a category to which the term belong +""" + + +import csv +import os +import requests +import sys +from typing import Any, List, Dict +import glossary as dc_glossary +import glossary_identification +import api_call_utils +import logging_utils +import utils + +logger = logging_utils.get_logger() +DATACATALOG_BASE_URL = 'https://datacatalog.googleapis.com/v2' + +# Assuming fetch_api_response is defined in api_call_utils +from api_call_utils import fetch_api_response + +def fetch_entries(project: str, location: str, entry_group: str) -> List[Dict[str, Any]]: + """Fetches all entries in the glossary. + + Args: + project: The Google Cloud Project ID. + location: The location of the glossary. + entry_group: The entry group of the glossary. + + Returns: + A list of dictionaries containing the entries. 
+ """ + entries = [] + get_full_entry_url = ( + DATACATALOG_BASE_URL + + f'/projects/{project}/locations/{location}/entryGroups/{entry_group}/entries?view=FULL') + keep_reading, page_token = True, None + + while keep_reading: + if page_token: + endpoint_url = f'{get_full_entry_url}&pageToken={page_token}' + else: + endpoint_url = get_full_entry_url + + response = api_call_utils.fetch_api_response( + requests.get, endpoint_url, project + ) + + if response['error_msg']: + raise ValueError(response['error_msg']) + + if 'entries' in response['json']: + entries.extend(response['json']['entries']) + + page_token = response['json'].get('nextPageToken', None) + if not page_token: + keep_reading = False + + return entries + + +def fetch_entry_info(entry_name: str, project: str) -> Dict[str, Any]: + """Fetches details for a specific entry from the Data Catalog. + + Args: + entry_name: The full resource name of the entry. + project: The Google Cloud Project ID. + + Returns: + A dictionary containing the entry details. + """ + fetch_entry_info_url = DATACATALOG_BASE_URL + f'/{entry_name}' + + response = api_call_utils.fetch_api_response( + requests.get, fetch_entry_info_url, project + ) + if response['error_msg']: + raise ValueError(response['error_msg']) + return response['json'] + + +def fetch_relationships(entry_name: str, project: str) -> List[Dict[str, Any]]: + """Fetches relationships for a specific entry from the Data Catalog. + + Args: + entry_name: The full resource name of the entry. + project: The Google Cloud Project ID. + + Returns: + A list of dictionaries containing the relationships. 
+ """ + fetch_relationships_url = DATACATALOG_BASE_URL + f'/{entry_name}/relationships' + response = api_call_utils.fetch_api_response( + requests.get, fetch_relationships_url, project + ) + if response['error_msg']: + raise ValueError(response['error_msg']) + return response['json'].get('relationships', []) + + +def get_entry_display_name(entry_name: str, project: str) -> str: + fetch_display_name_url = DATACATALOG_BASE_URL + f'/{entry_name}' + response = api_call_utils.fetch_api_response( + requests.get, fetch_display_name_url, project + ) + if response['error_msg']: + raise ValueError(response['error_msg']) + return response['json'].get('displayName', '') + + +def export_glossary_entries( + entries: List[Dict[str, Any]], + categories_csv: str, + terms_csv: str, + project: str, +): + """Exports the glossary entries to a CSV file. + + Args: + entries: The list of entries to export. + categories_csv: The path to the CSV file to export the categories data. + terms_csv: The path to the CSV file to export the terms data. + project: The Google Cloud Project ID. 
+ """ + categories_fields = [ + 'category_display_name', + 'description', + 'steward', + 'belongs_to_category', + ] + terms_fields = [ + 'term_display_name', + 'description', + 'steward', + 'tagged_assets', + 'synonyms', + 'related_terms', + 'belongs_to_category', + ] + + with ( + open(categories_csv, mode='w', newline='') as categories_file, + open(terms_csv, mode='w', newline='') as terms_file, + ): + categories_writer = csv.DictWriter( + categories_file, fieldnames=categories_fields, quoting=csv.QUOTE_ALL + ) + terms_writer = csv.DictWriter( + terms_file, fieldnames=terms_fields, quoting=csv.QUOTE_ALL + ) + + for entry in entries: + entry_info = fetch_entry_info(entry['name'], project) + entry_type = entry_info.get('entryType', '') + display_name = entry_info.get('displayName', '') + + # Initialize core aspects and json content + core_aspects = entry_info.get('coreAspects', {}) + business_context = core_aspects.get('business_context', {}) + business_context = business_context.get('jsonContent', {}) + + # Extract description and stewards + description = business_context.get('description', '') + stewards = ', '.join(business_context.get('contacts', [])) + + # Fetch relationships + relationships = fetch_relationships(entry_info['name'], project) + belongs_to_category = '' + synonyms = '' + related_terms = '' + + for rel in relationships: + if rel['relationshipType'] == 'belongs_to': + belongs_to_category = get_entry_display_name( + rel['destinationEntryName'], project + ) + elif rel['relationshipType'] == 'is_synonymous_to': + synonyms += ( + get_entry_display_name(rel['destinationEntryName'], project) + + ', ' + ) + elif rel['relationshipType'] == 'is_related_to': + related_terms += ( + get_entry_display_name(rel['destinationEntryName'], project) + + ', ' + ) + + synonyms = synonyms.rstrip(', ') + related_terms = related_terms.rstrip(', ') + + if entry_type == 'glossary_term': + terms_writer.writerow({ + 'term_display_name': display_name, + 'description': 
description, + 'steward': stewards, + 'tagged_assets': '', + 'synonyms': synonyms, + 'related_terms': related_terms, + 'belongs_to_category': belongs_to_category, + }) + elif entry_type == 'glossary_category': + categories_writer.writerow({ + 'category_display_name': display_name, + 'description': description, + 'steward': stewards, + 'belongs_to_category': belongs_to_category, + }) + + +def main(): + args = utils.get_export_arguments() + utils.validate_export_args(args) + + try: + dc_glossary.Glossary( + glossary_identification.GlossaryId( + project_id=args.project, + location=args.location, + entry_group=args.group, + glossary_id=args.glossary, + ) + ) + except ValueError as e: + logger.error( + "Can't proceed with export. Please select a valid glossary.", e + ) + sys.exit(1) + entries = fetch_entries(args.project, args.location, args.group) + export_glossary_entries( + entries, args.categories_csv, args.terms_csv, args.project + ) + + +if __name__ == '__main__': + main() diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/category.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/category.py index 32116360..03613951 100644 --- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/category.py +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/category.py @@ -54,14 +54,25 @@ def __repr__(self): def _generate_category_id(self): """Unique glossary category ID.""" + random_length = 7 + max_term_id_length = 64 + if not self.display_name: return "" - infix = re.sub(r"[^a-zA-Z0-9_]", "_", self.display_name).lower() - prefix = "_" if infix[0].isdigit() else "" - suffix = "".join( - random.choices(string.ascii_lowercase + string.digits, k=7) + + max_length = min(len(self.display_name), max_term_id_length - random_length) + unique_id = re.sub(r"[^a-zA-Z0-9_]", "_", self.display_name).lower()[ + 
:max_length + ] + + unique_id = unique_id + "".join( + random.choices(string.ascii_lowercase + string.digits, k=random_length) ) - return f"{prefix}{infix}{suffix}" + + if unique_id[0].isdigit(): + unique_id = "_" + unique_id + + return unique_id[:max_term_id_length] @classmethod def from_dict(cls, entry: dict[str, Any]) -> Category | None: diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/term.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/term.py index e6b1e240..2bfa7846 100644 --- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/term.py +++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/term.py @@ -64,14 +64,24 @@ def __repr__(self): def _generate_term_id(self): """Unique glossary term ID.""" + random_length = 7 + max_term_id_length = 64 + if not self.display_name: return "" - infix = re.sub(r"[^a-zA-Z0-9_]", "_", self.display_name).lower() - prefix = "_" if infix[0].isdigit() else "" - suffix = "".join( - random.choices(string.ascii_lowercase + string.digits, k=7) + + max_length = min(len(self.display_name), max_term_id_length - random_length) + unique_id = re.sub(r"[^a-zA-Z0-9_]", "_", self.display_name).lower()[ + :max_length + ] + + unique_id = unique_id + "".join( + random.choices(string.ascii_lowercase + string.digits, k=random_length) ) - return f"{prefix}{infix}{suffix}" + + if unique_id[0].isdigit(): + unique_id = "_" + unique_id + return unique_id[:max_term_id_length] @classmethod def from_dict(cls, entry: dict[str, Any]) -> Term | None: diff --git a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/utils.py b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/utils.py index 8cc98a2b..44ba6eb7 100644 --- a/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/utils.py 
+++ b/dataplex-quickstart-labs/00-resources/scripts/python/business-glossary-import/bg_import/utils.py @@ -194,3 +194,82 @@ def _verify_csv_file_existence( f"The CSV file path provided for {prefix}{arg_name} doesn't exist." ) sys.exit(1) + +def get_export_arguments() -> argparse.Namespace: + """Gets arguments for the export program. + + Returns: + Namespace object containing the export program arguments. + """ + parser = argparse.ArgumentParser( + formatter_class=argparse.RawTextHelpFormatter + ) + configure_export_argument_parser(parser) + return parser.parse_args() + +def configure_export_argument_parser(parser: argparse.ArgumentParser) -> None: + """Defines flags and parses arguments related to export. + + Args: + parser: argparse.ArgumentParser(). + """ + parser.add_argument( + "--project", + help="ID of Google Cloud Project containing the destination glossary.", + metavar="", + type=str, + required=True, + ) + parser.add_argument( + "--group", + help="Identifier of an existing Entry Group where the target glossary is located.", + metavar="", + type=str, + required=True, + ) + parser.add_argument( + "--glossary", + help="Identifier of the destination glossary to which data will be exported.", + metavar="", + type=str, + required=True, + ) + parser.add_argument( + "--location", + help="Location code where the glossary resource exists.", + metavar="", + type=str, + required=True, + ) + parser.add_argument( + "--categories-csv", + help="Path to the CSV file to export the categories data.", + metavar="[Categories CSV file for export]", + type=str, + required=True, + ) + parser.add_argument( + "--terms-csv", + help="Path to the CSV file to export the terms data.", + metavar="[Terms CSV file for export]", + type=str, + required=True, + ) + +def validate_export_args(args: argparse.Namespace) -> None: + """Validates script run arguments for exporting. 
+ + Args: + args: script run arguments + """ + if not args.categories_csv or not args.terms_csv: + logger.error("Both --categories-csv and --terms-csv arguments must be provided for export.") + sys.exit(1) + + if not os.path.isdir(os.path.dirname(args.categories_csv)): + logger.error(f"Directory for categories CSV export path does not exist: {args.categories_csv}") + sys.exit(1) + + if not os.path.isdir(os.path.dirname(args.terms_csv)): + logger.error(f"Directory for terms CSV export path does not exist: {args.terms_csv}") + sys.exit(1)