diff --git a/metadata-ingestion/src/datahub/emitter/aspect.py b/metadata-ingestion/src/datahub/emitter/aspect.py index 0be2b3336980cb..094006b4f76a3b 100644 --- a/metadata-ingestion/src/datahub/emitter/aspect.py +++ b/metadata-ingestion/src/datahub/emitter/aspect.py @@ -14,3 +14,4 @@ JSON_CONTENT_TYPE = "application/json" JSON_PATCH_CONTENT_TYPE = "application/json-patch+json" +GZIP_JSON_CONTENT_TYPE = "application/json+gzip" diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index 12ca39d13691ad..d3e802da722064 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -1,9 +1,12 @@ +import base64 import dataclasses +import gzip import json +import os import warnings from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple, Union -from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE +from datahub.emitter.aspect import ASPECT_MAP, GZIP_JSON_CONTENT_TYPE, JSON_CONTENT_TYPE from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.errors import DataHubDeprecationWarning from datahub.metadata.schema_classes import ( @@ -23,13 +26,26 @@ _ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET" +DEFAULT_USE_GZIP = os.environ.get("DATAHUB_USE_GZIP_ENCODING", "true").lower() == "true" -def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass: +GZIP_COMPRESSED_ASPECTS = {"schemaMetadata"} + + +def _make_generic_aspect(codegen_obj: DictWrapper, use_gzip: bool = DEFAULT_USE_GZIP) -> GenericAspectClass: serialized = json.dumps(pre_json_transform(codegen_obj.to_obj())) - return GenericAspectClass( - value=serialized.encode(), - contentType=JSON_CONTENT_TYPE, - ) + if use_gzip: + # Compress the data and then base64 encode it for safe JSON serialization + compressed = gzip.compress(serialized.encode()) + base64_encoded = base64.b64encode(compressed) + return GenericAspectClass( + value=base64_encoded, + contentType=GZIP_JSON_CONTENT_TYPE, + ) + else: + return GenericAspectClass( + value=serialized.encode(), + contentType=JSON_CONTENT_TYPE, + ) def _try_from_generic_aspect( @@ -43,7 +59,7 @@ def _try_from_generic_aspect( return True, None assert aspectName is not None, "aspectName must be set if aspect is set" - if aspect.contentType != JSON_CONTENT_TYPE: + if aspect.contentType not in [JSON_CONTENT_TYPE, GZIP_JSON_CONTENT_TYPE]: return False, None if aspectName not in ASPECT_MAP: @@ -51,7 +67,22 @@ def _try_from_generic_aspect( aspect_cls = ASPECT_MAP[aspectName] - serialized = aspect.value.decode() + if aspect.contentType == GZIP_JSON_CONTENT_TYPE: + # TODO: can we avoid repeating check for aspect.value? According to the schema it should be always bytes... + if isinstance(aspect.value, str): + binary_data = base64.b64decode(aspect.value) + decompressed = gzip.decompress(binary_data) + serialized = decompressed.decode() + else: + decompressed = gzip.decompress(aspect.value) + serialized = decompressed.decode() + else: + # Handle standard JSON content + if isinstance(aspect.value, bytes): + serialized = aspect.value.decode() + else: + serialized = aspect.value + obj = post_json_transform(json.loads(serialized)) return True, aspect_cls.from_obj(obj) @@ -72,6 +103,7 @@ class MetadataChangeProposalWrapper: aspect: Union[None, _Aspect] = None systemMetadata: Union[None, SystemMetadataClass] = None headers: Union[None, Dict[str, str]] = None + use_gzip: bool = DEFAULT_USE_GZIP def __post_init__(self) -> None: if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET: @@ -112,9 +144,9 @@ def __post_init__(self) -> None: @classmethod def construct_many( - cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]] + cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]], use_gzip: bool = DEFAULT_USE_GZIP ) -> List["MetadataChangeProposalWrapper"]: - return [cls(entityUrn=entityUrn, aspect=aspect) for aspect in aspects if aspect] + return [cls(entityUrn=entityUrn, aspect=aspect, use_gzip=use_gzip) for aspect in aspects if aspect] def _make_mcp_without_aspects(self) -> MetadataChangeProposalClass: return MetadataChangeProposalClass( @@ -127,14 +159,20 @@ def _make_mcp_without_aspects(self) -> MetadataChangeProposalClass: headers=self.headers, ) - def make_mcp(self) -> MetadataChangeProposalClass: + def make_mcp(self, use_gzip: bool = None) -> MetadataChangeProposalClass: + if use_gzip is None: + use_gzip = self.use_gzip + serializedEntityKeyAspect: Union[None, GenericAspectClass] = None if isinstance(self.entityKeyAspect, DictWrapper): - serializedEntityKeyAspect = _make_generic_aspect(self.entityKeyAspect) + serializedEntityKeyAspect = _make_generic_aspect(self.entityKeyAspect, False) serializedAspect = None if self.aspect is not None: - serializedAspect = _make_generic_aspect(self.aspect) + # Only compress specific aspects (for testing) + aspect_name = self.aspect.get_aspect_name() + should_compress = use_gzip and aspect_name in GZIP_COMPRESSED_ASPECTS + serializedAspect = _make_generic_aspect(self.aspect, should_compress) mcp = self._make_mcp_without_aspects() mcp.entityKeyAspect = serializedEntityKeyAspect @@ -157,19 +195,29 @@ def validate(self) -> bool: return False return True - def to_obj(self, tuples: bool = False, simplified_structure: bool = False) -> dict: + def to_obj(self, tuples: bool = False, simplified_structure: bool = False, use_gzip: bool = None) -> dict: # The simplified_structure parameter is used to make the output # not contain nested JSON strings. Instead, it unpacks the JSON # string into an object. - obj = self.make_mcp().to_obj(tuples=tuples) + obj = self.make_mcp(use_gzip=use_gzip).to_obj(tuples=tuples) if simplified_structure: # Undo the double JSON serialization that happens in the MCP aspect. - if ( - obj.get("aspect") - and obj["aspect"].get("contentType") == JSON_CONTENT_TYPE - ): - obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])} + if obj.get("aspect"): + content_type = obj["aspect"].get("contentType") + if content_type == JSON_CONTENT_TYPE: + obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])} + elif content_type == GZIP_JSON_CONTENT_TYPE: + # For gzipped content, we need to base64 decode, decompress, then load + # the value should be a base64 encoded string + value = obj["aspect"]["value"] + if isinstance(value, str): + binary_data = base64.b64decode(value) + decompressed = gzip.decompress(binary_data) + obj["aspect"] = {"json": json.loads(decompressed.decode('utf-8'))} + else: + decompressed = gzip.decompress(value) + obj["aspect"] = {"json": json.loads(decompressed.decode('utf-8'))} return obj @classmethod @@ -185,10 +233,21 @@ def from_obj( # Redo the double JSON serialization so that the rest of deserialization # routine works. if obj.get("aspect") and obj["aspect"].get("json"): - obj["aspect"] = { - "contentType": JSON_CONTENT_TYPE, - "value": json.dumps(obj["aspect"]["json"]), - } + content_type = obj["aspect"].get("contentType", JSON_CONTENT_TYPE) + + if content_type == GZIP_JSON_CONTENT_TYPE: + json_str = json.dumps(obj["aspect"]["json"]) + compressed = gzip.compress(json_str.encode('utf-8')) + base64_encoded = base64.b64encode(compressed).decode('ascii') + obj["aspect"] = { + "contentType": GZIP_JSON_CONTENT_TYPE, + "value": base64_encoded, + } + else: + obj["aspect"] = { + "contentType": JSON_CONTENT_TYPE, + "value": json.dumps(obj["aspect"]["json"]), + } mcpc = MetadataChangeProposalClass.from_obj(obj, tuples=tuples) @@ -196,12 +255,16 @@ def from_obj( if mcpc.entityKeyAspect is not None: return mcpc - # Try to deserialize the aspect. - return cls.try_from_mcpc(mcpc) or mcpc + if obj.get("aspect") and obj["aspect"].get("contentType") == GZIP_JSON_CONTENT_TYPE: + use_gzip = True + else: + use_gzip = DEFAULT_USE_GZIP + + return cls.try_from_mcpc(mcpc, use_gzip=use_gzip) or mcpc @classmethod def try_from_mcpc( - cls, mcpc: MetadataChangeProposalClass + cls, mcpc: MetadataChangeProposalClass, use_gzip: bool = DEFAULT_USE_GZIP ) -> Optional["MetadataChangeProposalWrapper"]: """Attempts to create a MetadataChangeProposalWrapper from a MetadataChangeProposalClass. Neatly handles unsupported, expected cases, such as unknown aspect types or non-json content type. @@ -216,6 +279,12 @@ def try_from_mcpc( converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect) if converted: + # Determine if the source was using gzip based on content type + if mcpc.aspect and mcpc.aspect.contentType == GZIP_JSON_CONTENT_TYPE: + inferred_use_gzip = True + else: + inferred_use_gzip = use_gzip + return cls( entityType=mcpc.entityType, entityUrn=mcpc.entityUrn, @@ -225,13 +294,14 @@ def try_from_mcpc( aspect=aspect, systemMetadata=mcpc.systemMetadata, headers=mcpc.headers, + use_gzip=inferred_use_gzip, ) else: return None @classmethod def try_from_mcl( - cls, mcl: MetadataChangeLogClass + cls, mcl: MetadataChangeLogClass, use_gzip: bool = DEFAULT_USE_GZIP ) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]: mcpc = MetadataChangeProposalClass( entityUrn=mcl.entityUrn, @@ -244,7 +314,7 @@ def try_from_mcl( systemMetadata=mcl.systemMetadata, headers=mcl.headers, ) - return cls.try_from_mcpc(mcpc) or mcpc + return cls.try_from_mcpc(mcpc, use_gzip=use_gzip) or mcpc @classmethod def from_obj_require_wrapper( diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py index 545b88c8b8fc34..625960a13dfd74 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/schema_util.py @@ -406,6 +406,7 @@ def emit(self) -> Iterable[SchemaField]: # bare logicalType or self._actual_schema.props.get("logicalType"), ) + json_props: Optional[Dict[str, Any]] = {k: v for k, v in merged_props.items() if k not in ['_nullable', 'native_data_type']} if merged_props else None field = SchemaField( fieldPath=field_path, @@ -421,7 +422,7 @@ def emit(self) -> Iterable[SchemaField]: isPartOfKey=self._converter._is_key_schema, globalTags=tags_aspect, glossaryTerms=meta_terms_aspect, - jsonProps=json.dumps(merged_props) if merged_props else None, + jsonProps=json.dumps(json_props) if json_props else None, ) yield field diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index 5cafdec01573db..3bf0922572e696 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -655,7 +655,7 @@ def struct( "type": "record", "name": self._gen_name("__struct_"), "fields": field_results, - "native_data_type": str(struct), + "native_data_type": "struct", "_nullable": nullable, } @@ -673,7 +673,7 @@ def list( return { "type": "array", "items": element_result, - "native_data_type": str(list_type), + "native_data_type": "list", "_nullable": not list_type.element_required, } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java index f5cc421042e368..f89b821f096594 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java @@ -35,7 +35,7 @@ @Accessors(chain = true) public class IgnoreUnknownMutator extends MutationHook { private static final Set SUPPORTED_MIME_TYPES = - Set.of("application/json", "application/json-patch+json"); + Set.of("application/json", "application/json-patch+json", "application/json+gzip"); private static final Set MUTATION_TYPES = Set.of(CREATE, CREATE_ENTITY, UPSERT, UPDATE); diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java index 724a66f7fe901d..f05b6853b002cc 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java @@ -28,19 +28,26 @@ import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.GenericPayload; import jakarta.json.JsonPatch; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Base64; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import javax.annotation.Nonnull; public class GenericRecordUtils { public static final String JSON = "application/json"; public static final String JSON_PATCH = "application/json-patch+json"; + public static final String JSON_GZIP = "application/json+gzip"; private GenericRecordUtils() {} @@ -69,24 +76,73 @@ public static RecordTemplate deserializeAspect( return deserializeAspect(aspectValue, contentType, aspectSpec.getDataTemplateClass()); } + /** + * Decompresses and decodes base64 gzipped data. + * + * @param base64GzippedData The base64 encoded, gzipped data + * @return The decompressed string + * @throws IOException if decompression fails + */ + private static String decompressGzippedBase64String(String base64GzippedData) throws IOException { + // Decode from Base64 + byte[] decodedBytes = Base64.getDecoder().decode(base64GzippedData); + + // Set up the GZIP streams + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(decodedBytes); + GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + // Decompress the data + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipInputStream.read(buffer)) > 0) { + byteArrayOutputStream.write(buffer, 0, len); + } + + // Clean up + gzipInputStream.close(); + byteArrayOutputStream.close(); + + // Return the decompressed string + return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name()); + } + @Nonnull public static T deserializeAspect( @Nonnull ByteString aspectValue, @Nonnull String contentType, @Nonnull Class clazz) { - if (!contentType.equals(JSON)) { + if (contentType.equals(JSON)) { + return RecordUtils.toRecordTemplate(clazz, aspectValue.asString(StandardCharsets.UTF_8)); + } else if (contentType.equals(JSON_GZIP)) { + try { + // For gzipped content, we first need to base64 decode and then decompress + String decompressedJson = decompressGzippedBase64String(aspectValue.asString(StandardCharsets.UTF_8)); + return RecordUtils.toRecordTemplate(clazz, decompressedJson); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to decompress gzipped aspect value", e); + } + } else { throw new IllegalArgumentException( String.format("%s content type is not supported", contentType)); } - return RecordUtils.toRecordTemplate(clazz, aspectValue.asString(StandardCharsets.UTF_8)); } @Nonnull public static T deserializePayload( @Nonnull ByteString payloadValue, @Nonnull String contentType, @Nonnull Class clazz) { - if (!contentType.equals(JSON)) { + if (contentType.equals(JSON)) { + return RecordUtils.toRecordTemplate(clazz, payloadValue.asString(StandardCharsets.UTF_8)); + } else if (contentType.equals(JSON_GZIP)) { + try { + // For gzipped content, we first need to base64 decode and then decompress + String decompressedJson = decompressGzippedBase64String(payloadValue.asString(StandardCharsets.UTF_8)); + return RecordUtils.toRecordTemplate(clazz, decompressedJson); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to decompress gzipped payload value", e); + } + } else { throw new IllegalArgumentException( String.format("%s content type is not supported", contentType)); } - return RecordUtils.toRecordTemplate(clazz, payloadValue.asString(StandardCharsets.UTF_8)); } @Nonnull @@ -95,6 +151,24 @@ public static T deserializePayload( return deserializePayload(payloadValue, JSON, clazz); } + /** + * Compresses and base64-encodes a string. + * + * @param uncompressedData The string to compress + * @return Base64-encoded compressed data + * @throws IOException if compression fails + */ + private static String compressAndBase64EncodeString(String uncompressedData) throws IOException { + // Compress the data + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); + gzipOutputStream.write(uncompressedData.getBytes(StandardCharsets.UTF_8)); + gzipOutputStream.close(); + + // Base64 encode the compressed data + return Base64.getEncoder().encodeToString(byteArrayOutputStream.toByteArray()); + } + @Nonnull public static GenericAspect serializeAspect(@Nonnull RecordTemplate aspect) { return serializeAspect(RecordUtils.toJsonString(aspect)); @@ -107,6 +181,26 @@ public static GenericAspect serializeAspect(@Nonnull String str) { genericAspect.setContentType(GenericRecordUtils.JSON); return genericAspect; } + + @Nonnull + public static GenericAspect serializeGzippedAspect(@Nonnull RecordTemplate aspect) { + return serializeGzippedAspect(RecordUtils.toJsonString(aspect)); + } + + @Nonnull + public static GenericAspect serializeGzippedAspect(@Nonnull String str) { + GenericAspect genericAspect = new GenericAspect(); + try { + String compressedBase64 = compressAndBase64EncodeString(str); + genericAspect.setValue(ByteString.unsafeWrap(compressedBase64.getBytes(StandardCharsets.UTF_8))); + genericAspect.setContentType(GenericRecordUtils.JSON_GZIP); + } catch (IOException e) { + // Fallback to standard JSON if compression fails + genericAspect.setValue(ByteString.unsafeWrap(str.getBytes(StandardCharsets.UTF_8))); + genericAspect.setContentType(GenericRecordUtils.JSON); + } + return genericAspect; + } @Nonnull public static GenericAspect serializeAspect(@Nonnull JsonNode json) { @@ -115,6 +209,11 @@ public static GenericAspect serializeAspect(@Nonnull JsonNode json) { genericAspect.setContentType(GenericRecordUtils.JSON); return genericAspect; } + + @Nonnull + public static GenericAspect serializeGzippedAspect(@Nonnull JsonNode json) { + return serializeGzippedAspect(json.toString()); + } @Nonnull public static GenericAspect serializePatch(@Nonnull JsonPatch jsonPatch) { diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/PegasusUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/PegasusUtils.java index cde83c1382283a..4b2dd56a21b0ba 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/PegasusUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/PegasusUtils.java @@ -20,6 +20,10 @@ /** Static utility class providing methods for extracting entity metadata from Pegasus models. */ @Slf4j public class PegasusUtils { + + private static final boolean USE_GZIP_ENCODING = + System.getenv("DATAHUB_USE_GZIP_ENCODING") == null || + "true".equalsIgnoreCase(System.getenv("DATAHUB_USE_GZIP_ENCODING")); private PegasusUtils() {} @@ -84,6 +88,19 @@ public static String urnToEntityName(final Urn urn) { return urn.getEntityType(); } + private static final java.util.Set COMPRESSED_ASPECTS = + java.util.Collections.singleton("schemaMetadata"); + + + private static boolean shouldCompressAspect(String aspectName) { + + if (!USE_GZIP_ENCODING) { + return false; + } + + return COMPRESSED_ASPECTS.contains(aspectName); + } + public static MetadataChangeLog constructMCL( @Nullable MetadataChangeProposal base, String entityName, @@ -106,18 +123,35 @@ public static MetadataChangeLog constructMCL( metadataChangeLog.setChangeType(changeType); metadataChangeLog.setAspectName(aspectName); metadataChangeLog.setCreated(auditStamp); + + boolean shouldCompress = shouldCompressAspect(aspectName); + if (newAspectValue != null) { - metadataChangeLog.setAspect(GenericRecordUtils.serializeAspect(newAspectValue)); + if (shouldCompress) { + log.debug("Using GZIP compression for aspect: {}", aspectName); + metadataChangeLog.setAspect(GenericRecordUtils.serializeGzippedAspect(newAspectValue)); + } else { + metadataChangeLog.setAspect(GenericRecordUtils.serializeAspect(newAspectValue)); + } } + if (newSystemMetadata != null) { metadataChangeLog.setSystemMetadata(newSystemMetadata); } + if (oldAspectValue != null) { - metadataChangeLog.setPreviousAspectValue(GenericRecordUtils.serializeAspect(oldAspectValue)); + if (shouldCompress) { + log.debug("Using GZIP compression for previous aspect: {}", aspectName); + metadataChangeLog.setPreviousAspectValue(GenericRecordUtils.serializeGzippedAspect(oldAspectValue)); + } else { + metadataChangeLog.setPreviousAspectValue(GenericRecordUtils.serializeAspect(oldAspectValue)); + } } + if (oldSystemMetadata != null) { metadataChangeLog.setPreviousSystemMetadata(oldSystemMetadata); } + return metadataChangeLog; } }