Skip to content

Commit

Permalink
feat(structured-properties): soft delete
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Feb 12, 2024
1 parent 709c596 commit a10b734
Show file tree
Hide file tree
Showing 138 changed files with 4,553 additions and 2,656 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ plugins {
id 'com.gorylenko.gradle-git-properties' version '2.4.1'
id 'com.github.johnrengelman.shadow' version '8.1.1' apply false
id 'com.palantir.docker' version '0.35.0' apply false
id 'com.avast.gradle.docker-compose' version '0.17.5'
id 'com.avast.gradle.docker-compose' version '0.17.6'
id "com.diffplug.spotless" version "6.23.3"
// https://blog.ltgt.net/javax-jakarta-mess-and-gradle-solution/
// TODO id "org.gradlex.java-ecosystem-capabilities" version "1.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.MCPUpsertBatchItem;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.MetadataChangeProposal;
Expand All @@ -22,14 +22,14 @@

public class TestUtils {

public static EntityService<MCPUpsertBatchItem> getMockEntityService() {
public static EntityService<ChangeItemImpl> getMockEntityService() {
PathSpecBasedSchemaAnnotationVisitor.class
.getClassLoader()
.setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false);
EntityRegistry registry =
new ConfigEntityRegistry(TestUtils.class.getResourceAsStream("/test-entity-registry.yaml"));
EntityService<MCPUpsertBatchItem> mockEntityService =
(EntityService<MCPUpsertBatchItem>) Mockito.mock(EntityService.class);
EntityService<ChangeItemImpl> mockEntityService =
(EntityService<ChangeItemImpl>) Mockito.mock(EntityService.class);
Mockito.when(mockEntityService.getEntityRegistry()).thenReturn(registry);
return mockEntityService;
}
Expand Down Expand Up @@ -111,14 +111,14 @@ public static QueryContext getMockDenyContext(String actorUrn, AuthorizationRequ
}

public static void verifyIngestProposal(
EntityService<MCPUpsertBatchItem> mockService,
EntityService<ChangeItemImpl> mockService,
int numberOfInvocations,
MetadataChangeProposal proposal) {
verifyIngestProposal(mockService, numberOfInvocations, List.of(proposal));
}

public static void verifyIngestProposal(
EntityService<MCPUpsertBatchItem> mockService,
EntityService<ChangeItemImpl> mockService,
int numberOfInvocations,
List<MetadataChangeProposal> proposals) {
AspectsBatchImpl batch =
Expand All @@ -128,29 +128,29 @@ public static void verifyIngestProposal(
}

public static void verifySingleIngestProposal(
EntityService<MCPUpsertBatchItem> mockService,
EntityService<ChangeItemImpl> mockService,
int numberOfInvocations,
MetadataChangeProposal proposal) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations))
.ingestProposal(Mockito.eq(proposal), Mockito.any(AuditStamp.class), Mockito.eq(false));
}

public static void verifyIngestProposal(
EntityService<MCPUpsertBatchItem> mockService, int numberOfInvocations) {
EntityService<ChangeItemImpl> mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations))
.ingestProposal(Mockito.any(AspectsBatchImpl.class), Mockito.eq(false));
}

public static void verifySingleIngestProposal(
EntityService<MCPUpsertBatchItem> mockService, int numberOfInvocations) {
EntityService<ChangeItemImpl> mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations))
.ingestProposal(
Mockito.any(MetadataChangeProposal.class),
Mockito.any(AuditStamp.class),
Mockito.eq(false));
}

public static void verifyNoIngestProposal(EntityService<MCPUpsertBatchItem> mockService) {
public static void verifyNoIngestProposal(EntityService<ChangeItemImpl> mockService) {
Mockito.verify(mockService, Mockito.times(0))
.ingestProposal(Mockito.any(AspectsBatchImpl.class), Mockito.anyBoolean());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.MCPUpsertBatchItem;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -221,7 +221,7 @@ public void testGetUnauthorized() throws Exception {

@Test
public void testGetEntityClientException() throws Exception {
EntityService<MCPUpsertBatchItem> mockService = getMockEntityService();
EntityService<ChangeItemImpl> mockService = getMockEntityService();

Mockito.doThrow(RuntimeException.class)
.when(mockService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,9 @@ private void readerExecutable(ReaderWrapper reader, UpgradeContext context) {
final RecordTemplate aspectRecord;
try {
aspectRecord =
EntityUtils.toAspectRecord(
entityName, aspectName, aspect.getMetadata(), _entityRegistry);
EntityUtils.toSystemAspect(aspect.toEntityAspect(), _entityService)
.get()
.getRecordTemplate();
} catch (Exception e) {
context
.report()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import static com.linkedin.datahub.upgrade.system.elasticsearch.util.IndexUtils.INDEX_BLOCKS_WRITE_SETTING;
import static com.linkedin.datahub.upgrade.system.elasticsearch.util.IndexUtils.getAllReindexConfigs;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME;
import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_ENTITY_NAME;

import com.datahub.util.RecordUtils;
import com.google.common.collect.ImmutableMap;
import com.linkedin.common.Status;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
Expand All @@ -14,14 +17,15 @@
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ReindexConfig;
import com.linkedin.metadata.shared.ElasticSearchIndexed;
import com.linkedin.structured.StructuredPropertyDefinition;
import com.linkedin.util.Pair;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -54,24 +58,13 @@ public int retryCount() {
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
try {
List<ReindexConfig> reindexConfigs =
_configurationProvider.getStructuredProperties().isSystemUpdateEnabled()
? getAllReindexConfigs(
_services,
_aspectDao
.streamAspects(
STRUCTURED_PROPERTY_ENTITY_NAME,
STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME)
.map(
entityAspect ->
EntityUtils.toAspectRecord(
STRUCTURED_PROPERTY_ENTITY_NAME,
STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME,
entityAspect.getMetadata(),
_entityRegistry))
.map(recordTemplate -> (StructuredPropertyDefinition) recordTemplate)
.collect(Collectors.toSet()))
: getAllReindexConfigs(_services);
final List<ReindexConfig> reindexConfigs;
if (_configurationProvider.getStructuredProperties().isSystemUpdateEnabled()) {
reindexConfigs =
getAllReindexConfigs(_services, getActiveStructuredPropertiesDefinitions(_aspectDao));
} else {
reindexConfigs = getAllReindexConfigs(_services);
}

// Get indices to update
List<ReindexConfig> indexConfigs =
Expand Down Expand Up @@ -160,4 +153,31 @@ private boolean blockWrites(String indexName) throws InterruptedException, IOExc

return ack;
}

private static Set<StructuredPropertyDefinition> getActiveStructuredPropertiesDefinitions(
AspectDao aspectDao) {
Set<String> removedStructuredPropertyUrns =
aspectDao
.streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STATUS_ASPECT_NAME)
.map(
entityAspect ->
Pair.of(
entityAspect.getUrn(),
RecordUtils.toRecordTemplate(Status.class, entityAspect.getMetadata())))
.filter(status -> status.getSecond().isRemoved())
.map(Pair::getFirst)
.collect(Collectors.toSet());

return aspectDao
.streamAspects(STRUCTURED_PROPERTY_ENTITY_NAME, STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME)
.map(
entityAspect ->
Pair.of(
entityAspect.getUrn(),
RecordUtils.toRecordTemplate(
StructuredPropertyDefinition.class, entityAspect.getMetadata())))
.filter(definition -> !removedStructuredPropertyUrns.contains(definition.getKey()))
.map(Pair::getSecond)
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testSystemUpdateInit() {
@Test
public void testSystemUpdateKafkaProducerOverride() {
assertEquals(kafkaEventProducer, duheKafkaEventProducer);
assertEquals(entityService.get_producer(), duheKafkaEventProducer);
assertEquals(entityService.getProducer(), duheKafkaEventProducer);
}

@Test
Expand Down
1 change: 1 addition & 0 deletions entity-registry/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
testImplementation externalDependency.mockito
testImplementation externalDependency.mockitoInline
testCompileOnly externalDependency.lombok
testAnnotationProcessor externalDependency.lombok
testImplementation externalDependency.classGraph

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.linkedin.metadata.aspect.plugins.validation;
package com.linkedin.metadata.aspect;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.linkedin.metadata.aspect;

/** Responses can be cached based on application.yaml caching configuration for the EntityClient */
public interface CachingAspectRetriever extends AspectRetriever {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.linkedin.metadata.aspect;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.SystemMetadata;
import java.lang.reflect.InvocationTargetException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public interface ReadItem {
/**
* The urn associated with the aspect
*
* @return
*/
@Nonnull
Urn getUrn();

/**
* Aspect's name
*
* @return the name
*/
@Nonnull
default String getAspectName() {
return getAspectSpec().getName();
}

@Nullable
RecordTemplate getRecordTemplate();

default <T> T getAspect(Class<T> clazz) {
return getAspect(clazz, getRecordTemplate());
}

static <T> T getAspect(Class<T> clazz, @Nullable RecordTemplate recordTemplate) {
if (recordTemplate != null) {
try {
return clazz.getConstructor(DataMap.class).newInstance(recordTemplate.data());
} catch (InstantiationException
| IllegalAccessException
| InvocationTargetException
| NoSuchMethodException e) {
throw new RuntimeException(e);
}
} else {
return null;
}
}

/**
* System information
*
* @return the system metadata
*/
@Nullable
SystemMetadata getSystemMetadata();

/**
* The entity's schema
*
* @return entity specification
*/
@Nonnull
EntitySpec getEntitySpec();

/**
* The aspect's schema
*
* @return aspect's specification
*/
@Nonnull
AspectSpec getAspectSpec();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.linkedin.metadata.aspect;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.UrnUtils;
import java.sql.Timestamp;
import javax.annotation.Nonnull;

/**
* An aspect along with system metadata and creation timestamp. Represents an aspect as stored in
* primary storage.
*/
public interface SystemAspect extends ReadItem {
long getVersion();

Timestamp getCreatedOn();

String getCreatedBy();

@Nonnull
default AuditStamp getAuditStamp() {
return new AuditStamp()
.setActor(UrnUtils.getUrn(getCreatedBy()))
.setTime(getCreatedOn().getTime());
}
}
Loading

0 comments on commit a10b734

Please sign in to comment.