Skip to content

PoC: Gzip-base64 encoding for aspect values #13360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/emitter/aspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@

JSON_CONTENT_TYPE = "application/json"
JSON_PATCH_CONTENT_TYPE = "application/json-patch+json"
GZIP_JSON_CONTENT_TYPE = "application/json+gzip"
128 changes: 99 additions & 29 deletions metadata-ingestion/src/datahub/emitter/mcp.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import base64
import dataclasses
import gzip
import json
import os
import warnings
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple, Union

from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE
from datahub.emitter.aspect import ASPECT_MAP, GZIP_JSON_CONTENT_TYPE, JSON_CONTENT_TYPE
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.errors import DataHubDeprecationWarning
from datahub.metadata.schema_classes import (
Expand All @@ -23,13 +26,26 @@

_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET"

DEFAULT_USE_GZIP = os.environ.get("DATAHUB_USE_GZIP_ENCODING", "true").lower() == "true"

def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:
GZIP_COMPRESSED_ASPECTS = {"schemaMetadata"}


def _make_generic_aspect(codegen_obj: DictWrapper, use_gzip: bool = DEFAULT_USE_GZIP) -> GenericAspectClass:
serialized = json.dumps(pre_json_transform(codegen_obj.to_obj()))
return GenericAspectClass(
value=serialized.encode(),
contentType=JSON_CONTENT_TYPE,
)
if use_gzip:
# Compress the data and then base64 encode it for safe JSON serialization
compressed = gzip.compress(serialized.encode())
base64_encoded = base64.b64encode(compressed)
return GenericAspectClass(
value=base64_encoded,
contentType=GZIP_JSON_CONTENT_TYPE,
)
else:
return GenericAspectClass(
value=serialized.encode(),
contentType=JSON_CONTENT_TYPE,
)


def _try_from_generic_aspect(
Expand All @@ -43,15 +59,30 @@ def _try_from_generic_aspect(
return True, None
assert aspectName is not None, "aspectName must be set if aspect is set"

if aspect.contentType != JSON_CONTENT_TYPE:
if aspect.contentType not in [JSON_CONTENT_TYPE, GZIP_JSON_CONTENT_TYPE]:
return False, None

if aspectName not in ASPECT_MAP:
return False, None

aspect_cls = ASPECT_MAP[aspectName]

serialized = aspect.value.decode()
if aspect.contentType == GZIP_JSON_CONTENT_TYPE:
# TODO: can we avoid repeating check for aspect.value? According to the schema it should be always bytes...
if isinstance(aspect.value, str):
binary_data = base64.b64decode(aspect.value)
decompressed = gzip.decompress(binary_data)
serialized = decompressed.decode()
else:
decompressed = gzip.decompress(aspect.value)
serialized = decompressed.decode()
else:
# Handle standard JSON content
if isinstance(aspect.value, bytes):
serialized = aspect.value.decode()
else:
serialized = aspect.value

obj = post_json_transform(json.loads(serialized))

return True, aspect_cls.from_obj(obj)
Expand All @@ -72,6 +103,7 @@ class MetadataChangeProposalWrapper:
aspect: Union[None, _Aspect] = None
systemMetadata: Union[None, SystemMetadataClass] = None
headers: Union[None, Dict[str, str]] = None
use_gzip: bool = DEFAULT_USE_GZIP

def __post_init__(self) -> None:
if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET:
Expand Down Expand Up @@ -112,9 +144,9 @@ def __post_init__(self) -> None:

@classmethod
def construct_many(
cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]]
cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]], use_gzip: bool = DEFAULT_USE_GZIP
) -> List["MetadataChangeProposalWrapper"]:
return [cls(entityUrn=entityUrn, aspect=aspect) for aspect in aspects if aspect]
return [cls(entityUrn=entityUrn, aspect=aspect, use_gzip=use_gzip) for aspect in aspects if aspect]

def _make_mcp_without_aspects(self) -> MetadataChangeProposalClass:
return MetadataChangeProposalClass(
Expand All @@ -127,14 +159,20 @@ def _make_mcp_without_aspects(self) -> MetadataChangeProposalClass:
headers=self.headers,
)

def make_mcp(self) -> MetadataChangeProposalClass:
def make_mcp(self, use_gzip: bool = None) -> MetadataChangeProposalClass:
if use_gzip is None:
use_gzip = self.use_gzip

serializedEntityKeyAspect: Union[None, GenericAspectClass] = None
if isinstance(self.entityKeyAspect, DictWrapper):
serializedEntityKeyAspect = _make_generic_aspect(self.entityKeyAspect)
serializedEntityKeyAspect = _make_generic_aspect(self.entityKeyAspect, False)

serializedAspect = None
if self.aspect is not None:
serializedAspect = _make_generic_aspect(self.aspect)
# Only compress specific aspects (for testing)
aspect_name = self.aspect.get_aspect_name()
should_compress = use_gzip and aspect_name in GZIP_COMPRESSED_ASPECTS
serializedAspect = _make_generic_aspect(self.aspect, should_compress)

mcp = self._make_mcp_without_aspects()
mcp.entityKeyAspect = serializedEntityKeyAspect
Expand All @@ -157,19 +195,29 @@ def validate(self) -> bool:
return False
return True

def to_obj(self, tuples: bool = False, simplified_structure: bool = False) -> dict:
def to_obj(self, tuples: bool = False, simplified_structure: bool = False, use_gzip: bool = None) -> dict:
# The simplified_structure parameter is used to make the output
# not contain nested JSON strings. Instead, it unpacks the JSON
# string into an object.

obj = self.make_mcp().to_obj(tuples=tuples)
obj = self.make_mcp(use_gzip=use_gzip).to_obj(tuples=tuples)
if simplified_structure:
# Undo the double JSON serialization that happens in the MCP aspect.
if (
obj.get("aspect")
and obj["aspect"].get("contentType") == JSON_CONTENT_TYPE
):
obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])}
if obj.get("aspect"):
content_type = obj["aspect"].get("contentType")
if content_type == JSON_CONTENT_TYPE:
obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])}
elif content_type == GZIP_JSON_CONTENT_TYPE:
# For gzipped content, we need to base64 decode, decompress, then load
# the value should be a base64 encoded string
value = obj["aspect"]["value"]
if isinstance(value, str):
binary_data = base64.b64decode(value)
decompressed = gzip.decompress(binary_data)
obj["aspect"] = {"json": json.loads(decompressed.decode('utf-8'))}
else:
decompressed = gzip.decompress(value)
obj["aspect"] = {"json": json.loads(decompressed.decode('utf-8'))}
return obj

@classmethod
Expand All @@ -185,23 +233,38 @@ def from_obj(
# Redo the double JSON serialization so that the rest of deserialization
# routine works.
if obj.get("aspect") and obj["aspect"].get("json"):
obj["aspect"] = {
"contentType": JSON_CONTENT_TYPE,
"value": json.dumps(obj["aspect"]["json"]),
}
content_type = obj["aspect"].get("contentType", JSON_CONTENT_TYPE)

if content_type == GZIP_JSON_CONTENT_TYPE:
json_str = json.dumps(obj["aspect"]["json"])
compressed = gzip.compress(json_str.encode('utf-8'))
base64_encoded = base64.b64encode(compressed).decode('ascii')
obj["aspect"] = {
"contentType": GZIP_JSON_CONTENT_TYPE,
"value": base64_encoded,
}
else:
obj["aspect"] = {
"contentType": JSON_CONTENT_TYPE,
"value": json.dumps(obj["aspect"]["json"]),
}

mcpc = MetadataChangeProposalClass.from_obj(obj, tuples=tuples)

# We don't know how to deserialize the entity key aspects yet.
if mcpc.entityKeyAspect is not None:
return mcpc

# Try to deserialize the aspect.
return cls.try_from_mcpc(mcpc) or mcpc
if obj.get("aspect") and obj["aspect"].get("contentType") == GZIP_JSON_CONTENT_TYPE:
use_gzip = True
else:
use_gzip = DEFAULT_USE_GZIP

return cls.try_from_mcpc(mcpc, use_gzip=use_gzip) or mcpc

@classmethod
def try_from_mcpc(
cls, mcpc: MetadataChangeProposalClass
cls, mcpc: MetadataChangeProposalClass, use_gzip: bool = DEFAULT_USE_GZIP
) -> Optional["MetadataChangeProposalWrapper"]:
"""Attempts to create a MetadataChangeProposalWrapper from a MetadataChangeProposalClass.
Neatly handles unsupported, expected cases, such as unknown aspect types or non-json content type.
Expand All @@ -216,6 +279,12 @@ def try_from_mcpc(

converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect)
if converted:
# Determine if the source was using gzip based on content type
if mcpc.aspect and mcpc.aspect.contentType == GZIP_JSON_CONTENT_TYPE:
inferred_use_gzip = True
else:
inferred_use_gzip = use_gzip

return cls(
entityType=mcpc.entityType,
entityUrn=mcpc.entityUrn,
Expand All @@ -225,13 +294,14 @@ def try_from_mcpc(
aspect=aspect,
systemMetadata=mcpc.systemMetadata,
headers=mcpc.headers,
use_gzip=inferred_use_gzip,
)
else:
return None

@classmethod
def try_from_mcl(
cls, mcl: MetadataChangeLogClass
cls, mcl: MetadataChangeLogClass, use_gzip: bool = DEFAULT_USE_GZIP
) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]:
mcpc = MetadataChangeProposalClass(
entityUrn=mcl.entityUrn,
Expand All @@ -244,7 +314,7 @@ def try_from_mcl(
systemMetadata=mcl.systemMetadata,
headers=mcl.headers,
)
return cls.try_from_mcpc(mcpc) or mcpc
return cls.try_from_mcpc(mcpc, use_gzip=use_gzip) or mcpc

@classmethod
def from_obj_require_wrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ def emit(self) -> Iterable[SchemaField]:
# bare logicalType
or self._actual_schema.props.get("logicalType"),
)
json_props: Optional[Dict[str, Any]] = {k: v for k, v in merged_props.items() if k not in ['_nullable', 'native_data_type']} if merged_props else None

field = SchemaField(
fieldPath=field_path,
Expand All @@ -421,7 +422,7 @@ def emit(self) -> Iterable[SchemaField]:
isPartOfKey=self._converter._is_key_schema,
globalTags=tags_aspect,
glossaryTerms=meta_terms_aspect,
jsonProps=json.dumps(merged_props) if merged_props else None,
jsonProps=json.dumps(json_props) if json_props else None,
)
yield field

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ def struct(
"type": "record",
"name": self._gen_name("__struct_"),
"fields": field_results,
"native_data_type": str(struct),
"native_data_type": "struct",
"_nullable": nullable,
}

Expand All @@ -673,7 +673,7 @@ def list(
return {
"type": "array",
"items": element_result,
"native_data_type": str(list_type),
"native_data_type": "list",
"_nullable": not list_type.element_required,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@Accessors(chain = true)
public class IgnoreUnknownMutator extends MutationHook {
private static final Set<String> SUPPORTED_MIME_TYPES =
Set.of("application/json", "application/json-patch+json");
Set.of("application/json", "application/json-patch+json", "application/json+gzip");
private static final Set<ChangeType> MUTATION_TYPES =
Set.of(CREATE, CREATE_ENTITY, UPSERT, UPDATE);

Expand Down
Loading
Loading