Skip to main content

Patch Semantics

Why Would You Use Patch

By default, ingestion does a full upsert at the aspect level. This means that when you want to change one field progrmamatically you need to do a read-modify-write to not overwrite existing fields. With patch based semantics for proposals, targeted changes to single fields or values within arrays of fields are possible without impacting other existing metadata.

Examples of patch

# Inlined from /metadata-ingestion/examples/library/dataset_add_properties.py
import logging
from typing import Union

from datahub.configuration.kafka import KafkaProducerConnectionConfig
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DataHubRestEmitter
from datahub.specific.dataset import DatasetPatchBuilder

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


# Get an emitter, either REST or Kafka, this example shows you both
def get_emitter() -> Union[DataHubRestEmitter, DatahubKafkaEmitter]:
USE_REST_EMITTER = True
if USE_REST_EMITTER:
gms_endpoint = "http://localhost:8080"
return DataHubRestEmitter(gms_server=gms_endpoint)
else:
kafka_server = "localhost:9092"
schema_registry_url = "http://localhost:8081"
return DatahubKafkaEmitter(
config=KafkaEmitterConfig(
connection=KafkaProducerConnectionConfig(
bootstrap=kafka_server, schema_registry_url=schema_registry_url
)
)
)


dataset_urn = make_dataset_urn(platform="hive", name="fct_users_created", env="PROD")

with get_emitter() as emitter:
for patch_mcp in (
DatasetPatchBuilder(dataset_urn)
.add_custom_property("cluster_name", "datahubproject.acryl.io")
.add_custom_property("retention_time", "2 years")
.build()
):
emitter.emit(patch_mcp)


log.info(f"Added cluster_name, retention_time properties to dataset {dataset_urn}")