@@ -1,17 +1,16 @@
# Overview

`bg_import` is a utility that performs bulk import of categories and terms into
a Data Catalog business glossary from CSV files. To achieve that, the CSV files - one for
categories and one for terms - are parsed and validated. The resulting list of
categories and terms are then added into the target glossary via Data Catalog
API. If any errors occur at any stage of the process then an error report is
printed and import continues or completely stops depending on input flags.
`bg_import` is a utility that performs bulk import and export of categories and
terms for a Data Catalog business glossary using CSV files. For import, the CSV
files - one for categories and one for terms - are parsed and validated. The
resulting list of categories and terms is then added to the target glossary via
the Data Catalog API. If any errors occur at any stage of the process, an error
report is printed and the import either continues or stops completely depending
on input flags. Additionally, the export functionality retrieves all glossary
entries (both categories and terms) and writes them to separate CSV files - one
for categories and one for terms.
Change "error report is printed and import continues" to "error report is printed and export continues"


The Business Glossary API is currently in private preview and needs to be
enabled on the project before it can be used.




## Usage

### Import
```
python3 bg_import/business_glossary_import.py <terms csv file legacy>
--project=<project_id>
@@ -31,6 +30,20 @@ is deprecated. \
Run `python3 bg_import/business_glossary_import.py -h` for a description of the
individual arguments.

### Export
```
python3 bg_import/business_glossary_export.py
--project=${PROJECT}
--group=${ENTRY_GROUP}
--glossary=${GLOSSARY}
--location=${LOCATION}
--categories-csv=<categories csv file>
--terms-csv=<terms csv file>
[-h]
```

* Provide the output paths for the exported terms and categories using the `--terms-csv` and `--categories-csv` arguments, respectively.

### Access token

For the utility to be able to access Data Catalog API an access token has to be
@@ -97,3 +110,6 @@ In the case where a list of items inside a field contains the delimiter value
comma (,) the field has to be escaped by using double quotes (" "). e.g. term 1,
"Term 1, a description", "Data Steward1<[email protected]>, Data
Steward2<[email protected]>",,,
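
For illustration, here is a minimal Python sketch (not part of the utility) of how the standard `csv` module applies this quoting when a field contains the delimiter; the email addresses are placeholders:

```
import csv
import io

buffer = io.StringIO()
writer = csv.writer(buffer, quoting=csv.QUOTE_MINIMAL)

# The description and steward fields contain commas, so the csv module wraps
# them in double quotes; the three empty trailing columns stay unquoted.
writer.writerow([
    "term 1",
    "Term 1, a description",
    "Data Steward1<steward1@example.com>, Data Steward2<steward2@example.com>",
    "",
    "",
    "",
])
print(buffer.getvalue().strip())
# term 1,"Term 1, a description","Data Steward1<steward1@example.com>, Data Steward2<steward2@example.com>",,,

# Reading the line back recovers the original fields, embedded commas included.
row = next(csv.reader(io.StringIO(buffer.getvalue())))
assert row[1] == "Term 1, a description"
```

Note that the sketch uses `QUOTE_MINIMAL` to reproduce the line shown above, whereas the export script writes with `quoting=csv.QUOTE_ALL`, so every exported field is wrapped in double quotes regardless of content.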

#### Note:
* tagged_assets are not exported to the CSV files yet; this will be implemented in a follow-up change.
@@ -0,0 +1,259 @@
"""This script is used to export the data from a Data Catalog glossary to CSV files - one for categories and one for terms.

Categories CSV file contains the following columns:
- category_display_name: The display name of the category.
- description: Plain text or rich text encoded as plain text description for
the category
- steward: List of data stewards for the current category, with each steward
separated by a comma
- belongs_to_category: Display name of a category to which the category
belongs


Terms CSV file contains the following columns:
- term_display_name: Unique name for the entry term
- description: Plain text or rich text encoded as plain text description for
the term.
- steward: List of data stewards for the current term, with each steward
separated by a comma
- tagged_assets: List of assets tagged with the term, with each asset
separated by a comma (not implemented yet)
- synonyms: List of terms that have a synonym relation with the current term,
with each term separated by a comma
- related_terms: List of terms that have a related-to relation with the
current term, with each term separated by a comma
- belongs_to_category: Display name of a category to which the term belongs
"""


import csv
import sys
from typing import Any, Dict, List

import requests

import api_call_utils
import glossary as dc_glossary
import glossary_identification
import logging_utils
import utils

logger = logging_utils.get_logger()
DATACATALOG_BASE_URL = 'https://datacatalog.googleapis.com/v2'

def fetch_entries(project: str, location: str, entry_group: str) -> List[Dict[str, Any]]:
"""Fetches all entries in the glossary.

Args:
project: The Google Cloud Project ID.
location: The location of the glossary.
entry_group: The entry group of the glossary.

Returns:
A list of dictionaries containing the entries.
"""
entries = []
get_full_entry_url = (
DATACATALOG_BASE_URL
+ f'/projects/{project}/locations/{location}/entryGroups/{entry_group}/entries?view=FULL')
keep_reading, page_token = True, None

while keep_reading:
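    # Fetch one page at a time; stop once the response has no nextPageToken.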
if page_token:
endpoint_url = f'{get_full_entry_url}&pageToken={page_token}'
else:
endpoint_url = get_full_entry_url

response = api_call_utils.fetch_api_response(
requests.get, endpoint_url, project
)

if response['error_msg']:
raise ValueError(response['error_msg'])

if 'entries' in response['json']:
entries.extend(response['json']['entries'])

page_token = response['json'].get('nextPageToken', None)
if not page_token:
keep_reading = False

return entries


def fetch_entry_info(entry_name: str, project: str) -> Dict[str, Any]:
"""Fetches details for a specific entry from the Data Catalog.

Args:
entry_name: The full resource name of the entry.
project: The Google Cloud Project ID.

Returns:
A dictionary containing the entry details.
"""
fetch_entry_info_url = DATACATALOG_BASE_URL + f'/{entry_name}'

response = api_call_utils.fetch_api_response(
requests.get, fetch_entry_info_url, project
)
if response['error_msg']:
raise ValueError(response['error_msg'])
return response['json']


def fetch_relationships(entry_name: str, project: str) -> List[Dict[str, Any]]:
"""Fetches relationships for a specific entry from the Data Catalog.

Args:
entry_name: The full resource name of the entry.
project: The Google Cloud Project ID.

Returns:
A list of dictionaries containing the relationships.
"""
fetch_relationships_url = DATACATALOG_BASE_URL + f'/{entry_name}/relationships'
response = api_call_utils.fetch_api_response(
requests.get, fetch_relationships_url, project
)
if response['error_msg']:
raise ValueError(response['error_msg'])
return response['json'].get('relationships', [])


def get_entry_display_name(entry_name: str, project: str) -> str:
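  """Returns the display name of the entry with the given resource name."""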
fetch_display_name_url = DATACATALOG_BASE_URL + f'/{entry_name}'
response = api_call_utils.fetch_api_response(
requests.get, fetch_display_name_url, project
)
if response['error_msg']:
raise ValueError(response['error_msg'])
return response['json'].get('displayName', '')


def export_glossary_entries(
entries: List[Dict[str, Any]],
categories_csv: str,
terms_csv: str,
project: str,
):
"""Exports the glossary entries to a CSV file.

Args:
entries: The list of entries to export.
categories_csv: The path to the CSV file to export the categories data.
terms_csv: The path to the CSV file to export the terms data.
project: The Google Cloud Project ID.
"""
categories_fields = [
'category_display_name',
'description',
'steward',
'belongs_to_category',
]
terms_fields = [
'term_display_name',
'description',
'steward',
'tagged_assets',
'synonyms',
'related_terms',
'belongs_to_category',
]

with (
open(categories_csv, mode='w', newline='') as categories_file,
open(terms_csv, mode='w', newline='') as terms_file,
):
categories_writer = csv.DictWriter(
categories_file, fieldnames=categories_fields, quoting=csv.QUOTE_ALL
)
terms_writer = csv.DictWriter(
terms_file, fieldnames=terms_fields, quoting=csv.QUOTE_ALL
)

for entry in entries:
entry_info = fetch_entry_info(entry['name'], project)
entry_type = entry_info.get('entryType', '')
display_name = entry_info.get('displayName', '')

# Initialize core aspects and json content
core_aspects = entry_info.get('coreAspects', {})
business_context = core_aspects.get('business_context', {})
business_context = business_context.get('jsonContent', {})

# Extract description and stewards
description = business_context.get('description', '')
stewards = ', '.join(business_context.get('contacts', []))

# Fetch relationships
relationships = fetch_relationships(entry_info['name'], project)
belongs_to_category = ''
synonyms = ''
related_terms = ''

for rel in relationships:
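        # Map relationship types to CSV columns: belongs_to -> parent category,
        # is_synonymous_to -> synonyms, is_related_to -> related terms.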
if rel['relationshipType'] == 'belongs_to':
belongs_to_category = get_entry_display_name(
rel['destinationEntryName'], project
)
elif rel['relationshipType'] == 'is_synonymous_to':
synonyms += (
get_entry_display_name(rel['destinationEntryName'], project)
+ ', '
)
elif rel['relationshipType'] == 'is_related_to':
related_terms += (
get_entry_display_name(rel['destinationEntryName'], project)
+ ', '
)

synonyms = synonyms.rstrip(', ')
related_terms = related_terms.rstrip(', ')

if entry_type == 'glossary_term':
terms_writer.writerow({
'term_display_name': display_name,
'description': description,
'steward': stewards,
'tagged_assets': '',
'synonyms': synonyms,
'related_terms': related_terms,
'belongs_to_category': belongs_to_category,
})
elif entry_type == 'glossary_category':
categories_writer.writerow({
'category_display_name': display_name,
'description': description,
'steward': stewards,
'belongs_to_category': belongs_to_category,
})


def main():
args = utils.get_export_arguments()
utils.validate_export_args(args)

try:
dc_glossary.Glossary(
glossary_identification.GlossaryId(
project_id=args.project,
location=args.location,
entry_group=args.group,
glossary_id=args.glossary,
)
)
except ValueError as e:
    logger.error(
        "Can't proceed with export. Please select a valid glossary. %s", e
    )
sys.exit(1)
entries = fetch_entries(args.project, args.location, args.group)
export_glossary_entries(
entries, args.categories_csv, args.terms_csv, args.project
)


if __name__ == '__main__':
main()
@@ -54,14 +54,25 @@ def __repr__(self):

def _generate_category_id(self):
"""Unique glossary category ID."""
    random_length = 7
    max_term_id_length = 64

    if not self.display_name:
      return ""

    # Truncate the normalized display name so that, with the random suffix
    # appended, the ID stays within the maximum allowed length.
    max_length = min(len(self.display_name), max_term_id_length - random_length)
    unique_id = re.sub(r"[^a-zA-Z0-9_]", "_", self.display_name).lower()[
        :max_length
    ]

    unique_id = unique_id + "".join(
        random.choices(string.ascii_lowercase + string.digits, k=random_length)
    )

    if unique_id[0].isdigit():
      unique_id = "_" + unique_id

    return unique_id[:max_term_id_length]

@classmethod
def from_dict(cls, entry: dict[str, Any]) -> Category | None:
@@ -64,14 +64,24 @@ def __repr__(self):

def _generate_term_id(self):
"""Unique glossary term ID."""
    random_length = 7
    max_term_id_length = 64

    if not self.display_name:
      return ""

    # Truncate the normalized display name so that, with the random suffix
    # appended, the ID stays within the maximum allowed length.
    max_length = min(len(self.display_name), max_term_id_length - random_length)
    unique_id = re.sub(r"[^a-zA-Z0-9_]", "_", self.display_name).lower()[
        :max_length
    ]

    unique_id = unique_id + "".join(
        random.choices(string.ascii_lowercase + string.digits, k=random_length)
    )

    if unique_id[0].isdigit():
      unique_id = "_" + unique_id

    return unique_id[:max_term_id_length]

@classmethod
def from_dict(cls, entry: dict[str, Any]) -> Term | None: