Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(system): support conditional write semantics #10868

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions docs/advanced/mcp-mcl.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ record MetadataChangeProposal {
**/
systemMetadata: optional SystemMetadata

/**
* Headers - intended to mimic http headers
*/
headers: optional map[string, string]
}
```

Each proposal comprises of the following:
Each proposal is comprised of the following:

1. entityType

Expand All @@ -82,12 +86,13 @@ Each proposal comprises of the following:
Type of change you are proposing: one of

- UPSERT: Insert if not exists, update otherwise
- CREATE: Insert if not exists, fail otherwise
- CREATE: Insert aspect if not exists, fail otherwise
- CREATE_ENTITY: Insert if entity does not exist, fail otherwise
- UPDATE: Update if exists, fail otherwise
- DELETE: Delete
- PATCH: Patch the aspect instead of doing a full replace

Only UPSERT, CREATE, DELETE, PATCH are supported as of now.
Only UPSERT, CREATE, CREATE_ENTITY, DELETE, PATCH are supported as of now.

5. aspectName

Expand All @@ -110,6 +115,10 @@ Each proposal comprises of the following:

Extra metadata about the proposal like run_id or updated timestamp.

8. headers

Optional headers which are meant to mimic http headers. These are currently used for implementing conditional write logic.

GMS processes the proposal and produces the Metadata Change Log, which looks like this.

```protobuf
Expand Down Expand Up @@ -156,4 +165,32 @@ entities:
keyAspect: datasetKey
aspects:
- datasetProfile
```
```

## Features

david-leifker marked this conversation as resolved.
Show resolved Hide resolved
### Conditional Writes

Conditional write semantics use extra information contained in the MCP `headers` field to possibly avoid writing new aspects
if the conditions are not met.

#### If-Version-Match

david-leifker marked this conversation as resolved.
Show resolved Hide resolved
Each time an aspect is updated a `version` is incremented to represent the change to the aspect. This `version` is stored and returned
in `SystemMetadata`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will batch or multiple aspect updates work?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch endpoint was adjusted so that the entity/aspect object contains a headers Map<String, String> which will carry the information for each aspect. If any one of the aspect's conditions is violated, it will cancel the entire batch.

A writer can provide a header with the expected `version` when initiating the request. If the expected `version` does not
david-leifker marked this conversation as resolved.
Show resolved Hide resolved
match the actual `version` stored in the database, the write will fail. This prevents overwriting an aspect that has
been modified by another process.

#### If-Modified-Since / If-Unmodified-Since

david-leifker marked this conversation as resolved.
Show resolved Hide resolved
A writer may also specify time-based conditions using http header semantics. Similar to version based conditional writes
david-leifker marked this conversation as resolved.
Show resolved Hide resolved
this method can be used to prevent the write if the target aspect was modified after a reading the aspect. Per the
david-leifker marked this conversation as resolved.
Show resolved Hide resolved
http specification dates must comply with ISO-8601 standard.

`If-Unmodified-Since`:
A writer can specify that the aspect must NOT have been modified after a specific time, following [If-Unmodified-Since](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Unmodified-Since) http headers.

`If-Modified-Since`
A writer can specify that the aspect must have been modified after a specific time, following [If-Modified-Since](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Modified-Since) http headers.
9 changes: 8 additions & 1 deletion docs/api/openapi/openapi-usage-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ curl --location --request POST 'localhost:8080/openapi/entities/v1/' \
The second POST example will write the update ONLY if the entity doesn't exist. If the entity does exist the
command will return an error instead of overwriting the entity.

In this example we've added an additional URL parameter `createEntityIfNotExists=true`
In this example we've added a URL parameter `createEntityIfNotExists=true`
david-leifker marked this conversation as resolved.
Show resolved Hide resolved

```shell
curl --location --request POST 'localhost:8080/openapi/entities/v1/?createEntityIfNotExists=true' \
Expand Down Expand Up @@ -582,3 +582,10 @@ public class Main {
}
}
```

## OpenAPI v3 Features

### Conditional Writes

All the create/POST endpoints for aspects support `headers` in the POST body to support batch APIs. See the docs in the
[MetadataChangeProposal](../../advanced/mcp-mcl.md) section for the use of these headers to support conditional writes semantics.
32 changes: 32 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,38 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- Protobuf CLI will no longer create binary encoded protoc custom properties. Flag added `-protocProp` in case this
behavior is required.
- #10868 - OpenAPI V3 - Creation of aspects will need to be wrapped within a `value` key and the API is now symmetric with respect to input and outputs.

Example Global Tags Aspect:

Previous:
```json
{
"tags": [
{
"tag": "string",
"context": "string"
}
]
}
```

New (optional fields `systemMetadata` and `headers`):

```json
{
"value": {
"tags": [
{
"tag": "string",
"context": "string"
}
]
},
"systemMetadata": {},
"headers": {}
}
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

symmetric input & output is easier to work with :)


### Potential Downtime

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.aspect;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.Aspect;
Expand Down Expand Up @@ -31,6 +32,23 @@ default Aspect getLatestAspectObject(@Nonnull final Urn urn, @Nonnull final Stri
@Nonnull
Map<Urn, Map<String, Aspect>> getLatestAspectObjects(Set<Urn> urns, Set<String> aspectNames);

@Nullable
default SystemAspect getLatestSystemAspect(
@Nonnull final Urn urn, @Nonnull final String aspectName) {
return getLatestSystemAspects(ImmutableMap.of(urn, ImmutableSet.of(aspectName)))
.getOrDefault(urn, Collections.emptyMap())
.get(aspectName);
}

/**
* Returns for each URN, the map of aspectName to Aspect
*
* @param urnAspectNames urns and aspect names to fetch
* @return urn to aspect name and values
*/
@Nonnull
Map<Urn, Map<String, SystemAspect>> getLatestSystemAspects(Map<Urn, Set<String>> urnAspectNames);

@Nonnull
default Map<Urn, Boolean> entityExists(Set<Urn> urns) {
Set<String> keyAspectNames =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.linkedin.metadata.aspect;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.SystemMetadata;
import java.sql.Timestamp;
import java.time.Instant;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.Getter;

/** Delegate to restli class */
public class EnvelopedSystemAspect implements SystemAspect {

public static SystemAspect of(
@Nonnull Urn urn, @Nonnull EnvelopedAspect envelopedAspect, @Nonnull EntitySpec entitySpec) {
return new EnvelopedSystemAspect(urn, envelopedAspect, entitySpec);
}

@Getter @Nonnull private final Urn urn;
@Nonnull private final EnvelopedAspect envelopedAspect;
@Getter @Nonnull private final EntitySpec entitySpec;
@Getter @Nonnull private final AspectSpec aspectSpec;

public EnvelopedSystemAspect(
@Nonnull Urn urn, @Nonnull EnvelopedAspect envelopedAspect, @Nonnull EntitySpec entitySpec) {
this.urn = urn;
this.envelopedAspect = envelopedAspect;
this.entitySpec = entitySpec;
this.aspectSpec = this.entitySpec.getAspectSpec(envelopedAspect.getName());
}

@Nullable
@Override
public RecordTemplate getRecordTemplate() {
return envelopedAspect.getValue();
}

@Nullable
@Override
public SystemMetadata getSystemMetadata() {
return envelopedAspect.getSystemMetadata();
}

@Override
public long getVersion() {
return envelopedAspect.getVersion();
}

@Override
public Timestamp getCreatedOn() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why the updatedBy and lastUpdatedOn are not included?

Copy link
Collaborator Author

@david-leifker david-leifker Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly because createdon is an actual column in the datastore. The other concepts are not necessarily uniformly implemented for all aspects. This one is required for every single aspect because it is part of the aspect's row in the database.
The System Aspect is a closer representation to the database aspect's row, whereas the other timestamps are higher level abstractions in the non-System aspect which is defined inside the metadata and systemmetadata json strings.

return Timestamp.from(Instant.ofEpochMilli(envelopedAspect.getCreated().getTime()));
}

@Override
public String getCreatedBy() {
return envelopedAspect.getCreated().getActor().toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.mxe.SystemMetadata;
import java.sql.Timestamp;
import java.util.Optional;
import javax.annotation.Nonnull;

/**
Expand All @@ -22,4 +24,16 @@ default AuditStamp getAuditStamp() {
.setActor(UrnUtils.getUrn(getCreatedBy()))
.setTime(getCreatedOn().getTime());
}

/**
* If aspect version exists in system metadata, return it
*
* @return version of the aspect
*/
default Optional<Long> getSystemMetadataVersion() {
return Optional.ofNullable(getSystemMetadata())
.filter(SystemMetadata::hasVersion)
.map(SystemMetadata::getVersion)
.map(Long::parseLong);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,42 @@
import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.mxe.MetadataChangeProposal;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/** Represents a proposal to write to the primary data store which may be represented by an MCP */
public interface MCPItem extends BatchItem {

Set<ChangeType> CHANGE_TYPES =
ImmutableSet.of(ChangeType.UPSERT, ChangeType.CREATE, ChangeType.CREATE_ENTITY);
ImmutableSet.of(
ChangeType.UPSERT, ChangeType.UPDATE, ChangeType.CREATE, ChangeType.CREATE_ENTITY);

@Nullable
MetadataChangeProposal getMetadataChangeProposal();

@Nonnull
default Map<String, String> getHeaders() {
if (getMetadataChangeProposal() != null && getMetadataChangeProposal().getHeaders() != null) {
return getMetadataChangeProposal().getHeaders();
}
return Collections.emptyMap();
}

default boolean hasHeader(@Nonnull String headerName) {
return getHeaders().keySet().stream().anyMatch(hdr -> hdr.equalsIgnoreCase(headerName));
}

default Optional<String> getHeader(@Nonnull String headerName) {
return getHeaders().entrySet().stream()
.filter(entry -> entry.getKey().equalsIgnoreCase(headerName))
.map(Map.Entry::getValue)
.findAny();
}

/**
* Validates that a change type is valid for the given aspect
*
Expand Down
Loading
Loading