Skip to content

Commit

Permalink
feat(sql): ebean transaction batches
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Jul 19, 2023
1 parent 0e37907 commit 70c5b5e
Show file tree
Hide file tree
Showing 78 changed files with 1,728 additions and 864 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
updateEmbed(embed, input);

final MetadataChangeProposal proposal = buildMetadataChangeProposalWithUrn(entityUrn, EMBED_ASPECT_NAME, embed);
_entityService.ingestProposal(
_entityService.ingestSingleProposal(
proposal,
new AuditStamp().setActor(UrnUtils.getUrn(context.getActorUrn())).setTime(System.currentTimeMillis()),
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private MutationUtils() { }

public static void persistAspect(Urn urn, String aspectName, RecordTemplate aspect, Urn actor, EntityService entityService) {
final MetadataChangeProposal proposal = buildMetadataChangeProposalWithUrn(urn, aspectName, aspect);
entityService.ingestProposal(proposal, getAuditStamp(actor), false);
entityService.ingestSingleProposal(proposal, getAuditStamp(actor), false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public CompletableFuture<Boolean> get(DataFetchingEnvironment environment) throw
MetadataChangeProposal proposal =
buildMetadataChangeProposalWithUrn(actor, CORP_USER_SETTINGS_ASPECT_NAME, newSettings);

_entityService.ingestProposal(proposal, getAuditStamp(actor), false);
_entityService.ingestSingleProposal(proposal, getAuditStamp(actor), false);

return true;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch;
import com.linkedin.mxe.MetadataChangeProposal;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -72,9 +73,7 @@ private static MetadataChangeProposal buildSoftDeleteProposal(
}

private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
entityService.ingestProposal(AspectsBatch.builder()
.mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch;
import com.linkedin.mxe.MetadataChangeProposal;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -87,9 +88,7 @@ private static MetadataChangeProposal buildUpdateDeprecationProposal(
}

private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
entityService.ingestProposal(AspectsBatch.builder()
.mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch;
import com.linkedin.mxe.MetadataChangeProposal;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -86,9 +87,7 @@ public static void validateDomain(Urn domainUrn, EntityService entityService) {
}

private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
entityService.ingestProposal(AspectsBatch.builder()
.mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.schema.EditableSchemaFieldInfo;
import com.linkedin.schema.EditableSchemaMetadata;
Expand Down Expand Up @@ -554,9 +555,7 @@ private static GlossaryTermAssociationArray removeTermsIfExists(GlossaryTerms te
}

private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
entityService.ingestProposal(AspectsBatch.builder()
.mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch;
import com.linkedin.mxe.MetadataChangeProposal;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -270,10 +271,8 @@ public static Boolean validateRemoveInput(
}

private static void ingestChangeProposals(List<MetadataChangeProposal> changes, EntityService entityService, Urn actor) {
// TODO: Replace this with a batch ingest proposals endpoint.
for (MetadataChangeProposal change : changes) {
entityService.ingestProposal(change, getAuditStamp(actor), false);
}
entityService.ingestProposal(AspectsBatch.builder()
.mcps(changes, entityService.getEntityRegistry()).build(), getAuditStamp(actor), false);
}

public static void addCreatorAsOwner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,24 @@
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.transactions.AspectsBatch;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.MetadataChangeProposal;
import org.mockito.Mockito;

import java.util.List;


public class TestUtils {

public static EntityService getMockEntityService() {
EntityRegistry registry = new ConfigEntityRegistry(TestUtils.class.getResourceAsStream("/test-entity-registry.yaml"));
EntityService mockEntityService = Mockito.mock(EntityService.class);
Mockito.when(mockEntityService.getEntityRegistry()).thenReturn(registry);
return mockEntityService;
}

public static QueryContext getMockAllowContext() {
return getMockAllowContext("urn:li:corpuser:test");
}
Expand Down Expand Up @@ -88,21 +100,44 @@ public static QueryContext getMockDenyContext(String actorUrn, AuthorizationRequ
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations, MetadataChangeProposal proposal) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.eq(proposal),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
verifyIngestProposal(mockService, numberOfInvocations, List.of(proposal));
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations, List<MetadataChangeProposal> proposals) {
AspectsBatch batch = AspectsBatch.builder()
.mcps(proposals, mockService.getEntityRegistry())
.build();
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.eq(batch),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifySingleIngestProposal(EntityService mockService, int numberOfInvocations, MetadataChangeProposal proposal) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestSingleProposal(
Mockito.eq(proposal),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifyIngestProposal(EntityService mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestProposal(
Mockito.any(MetadataChangeProposal.class),
Mockito.any(AspectsBatch.class),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifySingleIngestProposal(EntityService mockService, int numberOfInvocations) {
Mockito.verify(mockService, Mockito.times(numberOfInvocations)).ingestSingleProposal(
Mockito.any(MetadataChangeProposal.class),
Mockito.any(AuditStamp.class),
Mockito.eq(false)
);
}

public static void verifyNoIngestProposal(EntityService mockService) {
Mockito.verify(mockService, Mockito.times(0)).ingestProposal(
Mockito.any(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class DeleteAssertionResolverTest {
public void testGetSuccess() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(true);
Mockito.when(mockService.getAspect(
Urn.createFromString(TEST_ASSERTION_URN),
Expand Down Expand Up @@ -78,7 +78,7 @@ public void testGetSuccess() throws Exception {
public void testGetSuccessNoAssertionInfoFound() throws Exception {
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(true);
Mockito.when(mockService.getAspect(
Urn.createFromString(TEST_ASSERTION_URN),
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testGetSuccessAssertionAlreadyRemoved() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(false);

DeleteAssertionResolver resolver = new DeleteAssertionResolver(mockClient, mockService);
Expand Down Expand Up @@ -151,7 +151,7 @@ public void testGetSuccessAssertionAlreadyRemoved() throws Exception {
public void testGetUnauthorized() throws Exception {
// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(true);
Mockito.when(mockService.getAspect(
Urn.createFromString(TEST_ASSERTION_URN),
Expand Down Expand Up @@ -189,7 +189,7 @@ public void testGetEntityClientException() throws Exception {
Mockito.any(),
Mockito.any(Authentication.class));

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();
Mockito.when(mockService.exists(Urn.createFromString(TEST_ASSERTION_URN))).thenReturn(true);

DeleteAssertionResolver resolver = new DeleteAssertionResolver(mockClient, mockService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetchingEnvironment;

import java.util.List;
import java.util.concurrent.CompletionException;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand All @@ -29,7 +31,7 @@ public class BatchUpdateSoftDeletedResolverTest {

@Test
public void testGetSuccessNoExistingStatus() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -61,20 +63,17 @@ public void testGetSuccessNoExistingStatus() throws Exception {

final MetadataChangeProposal proposal1 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_1),
STATUS_ASPECT_NAME, newStatus);

verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_2),
STATUS_ASPECT_NAME, newStatus);

verifyIngestProposal(mockService, 1, proposal2);
verifyIngestProposal(mockService, 1, List.of(proposal1, proposal2));
}

@Test
public void testGetSuccessExistingStatus() throws Exception {
final Status originalStatus = new Status().setRemoved(true);

EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -105,18 +104,15 @@ public void testGetSuccessExistingStatus() throws Exception {

final MetadataChangeProposal proposal1 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_1),
STATUS_ASPECT_NAME, newStatus);

verifyIngestProposal(mockService, 1, proposal1);

final MetadataChangeProposal proposal2 = MutationUtils.buildMetadataChangeProposalWithUrn(Urn.createFromString(TEST_ENTITY_URN_2),
STATUS_ASPECT_NAME, newStatus);

verifyIngestProposal(mockService, 1, proposal2);
verifyIngestProposal(mockService, 1, List.of(proposal1, proposal2));
}

@Test
public void testGetFailureResourceDoesNotExist() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.when(mockService.getAspect(
Mockito.eq(UrnUtils.getUrn(TEST_ENTITY_URN_1)),
Expand Down Expand Up @@ -148,7 +144,7 @@ public void testGetFailureResourceDoesNotExist() throws Exception {

@Test
public void testGetUnauthorized() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

BatchUpdateSoftDeletedResolver resolver = new BatchUpdateSoftDeletedResolver(mockService);

Expand All @@ -166,7 +162,7 @@ public void testGetUnauthorized() throws Exception {

@Test
public void testGetEntityClientException() throws Exception {
EntityService mockService = Mockito.mock(EntityService.class);
EntityService mockService = getMockEntityService();

Mockito.doThrow(RuntimeException.class).when(mockService).ingestProposal(
Mockito.any(),
Expand Down
Loading

0 comments on commit 70c5b5e

Please sign in to comment.