Merge branch 'master' of github.com:folio-org/mod-source-record-storage into MODSOURCE-860
RuslanLavrov committed Feb 10, 2025
2 parents 7a9df6f + 9474145 commit 9031c70
Showing 12 changed files with 539 additions and 161 deletions.
2 changes: 1 addition & 1 deletion mod-source-record-storage-server/pom.xml
@@ -282,7 +282,7 @@
</dependencies>

<properties>
- <testcontainers.version>1.18.3</testcontainers.version>
+ <testcontainers.version>1.20.4</testcontainers.version>
<basedir>${project.parent.basedir}</basedir>
<ramlfiles_path>${project.parent.basedir}/ramls</ramlfiles_path>
<jooq.version>3.16.19</jooq.version>
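Side note: Testcontainers backs this module's integration tests by starting throwaway Docker containers, so the bump from 1.18.3 to 1.20.4 affects only the test harness. For readers unfamiliar with the library, a minimal sketch of the pattern (the image tag and class name below are illustrative, not taken from this repository's test setup):

import org.testcontainers.containers.PostgreSQLContainer;

class PostgresContainerSketch {
  public static void main(String[] args) {
    // Start a disposable PostgreSQL container; close() stops and removes it.
    try (PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16-alpine")) {
      postgres.start();
      // Tests would point their datasource at the container's JDBC URL.
      System.out.println(postgres.getJdbcUrl());
    }
  }
}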
AuthorityDomainKafkaHandler.java
@@ -1,11 +1,14 @@
package org.folio.consumers;

import static org.folio.dao.util.RecordDaoUtil.filterRecordByExternalId;
+ import static org.folio.services.util.EventHandlingUtil.toOkapiHeaders;

import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+ import java.util.Collections;
+ import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.folio.dao.util.RecordType;
@@ -34,6 +37,10 @@ public AuthorityDomainKafkaHandler(RecordService recordService) {
@Override
public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord) {
log.trace("handle:: Handling kafka record: '{}'", consumerRecord);

+ var kafkaHeaders = consumerRecord.headers();
+ var okapiHeaders = toOkapiHeaders(kafkaHeaders);

String authorityId = consumerRecord.key();
if (isUnexpectedDomainEvent(consumerRecord)) {
log.trace("handle:: Expected only {} domain type. Skipping authority domain kafka record [ID: '{}']",
@@ -48,7 +55,7 @@ public Future<String> handle(KafkaConsumerRecord<String, String> consumerRecord)
logInput(authorityId, eventSubType, tenantId);
return (switch (eventSubType) {
case SOFT_DELETE -> performSoftDelete(authorityId, tenantId);
- case HARD_DELETE -> performHardDelete(authorityId, tenantId);
+ case HARD_DELETE -> performHardDelete(authorityId, okapiHeaders);
}).onFailure(throwable -> logError(authorityId, eventSubType, tenantId));
}

@@ -66,8 +73,8 @@ private Future<String> performSoftDelete(String authorityId, String tenantId) {
}).map(authorityId);
}

- private Future<String> performHardDelete(String authorityId, String tenantId) {
-   return recordService.deleteRecordsByExternalId(authorityId, tenantId).map(authorityId);
+ private Future<String> performHardDelete(String authorityId, Map<String, String> okapiHeaders) {
+   return recordService.deleteRecordsByExternalId(authorityId, okapiHeaders).map(authorityId);
}

private void logError(String authorityId, EventSubType subType, String tenantId) {
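The handler change above swaps the bare tenant id for the full Okapi header map, derived from the incoming Kafka record via EventHandlingUtil.toOkapiHeaders. The utility itself is not shown in this diff; a minimal sketch of what such a conversion could look like, assuming Vert.x's KafkaHeader exposes key() and a Buffer-valued value() (the real implementation may differ):

import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OkapiHeaderSketch {
  // Flatten Kafka record headers into a String-to-String map so that
  // x-okapi-tenant, x-okapi-url and x-okapi-token can travel with the call.
  static Map<String, String> toOkapiHeaders(List<KafkaHeader> kafkaHeaders) {
    Map<String, String> okapiHeaders = new HashMap<>();
    for (KafkaHeader header : kafkaHeaders) {
      okapiHeaders.put(header.key(), header.value().toString());
    }
    return okapiHeaders;
  }
}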
RecordDao.java
@@ -361,10 +361,10 @@ Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds,
* Deletes in transaction all records associated with externalId
*
* @param externalId external id
- * @param tenantId tenant id
+ * @param okapiHeaders okapi headers
* @return future with true if succeeded
*/
- Future<Boolean> deleteRecordsByExternalId(String externalId, String tenantId);
+ Future<Boolean> deleteRecordsByExternalId(String externalId, Map<String, String> okapiHeaders);

/**
* Performs purge the 'DELETED' records.
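The DAO contract now receives the whole Okapi header map instead of a bare tenant id: the tenant can still be resolved from it, while the URL and token stay available for downstream work such as publishing domain events. A hedged sketch of a call site, using the conventional FOLIO header names with placeholder values (RecordDao is assumed to carry the updated signature from this commit):

import io.vertx.core.Future;
import java.util.Map;

final class DeleteByExternalIdSketch {
  static Future<Boolean> hardDelete(RecordDao recordDao, String externalId) {
    Map<String, String> okapiHeaders = Map.of(
      "x-okapi-tenant", "diku",            // placeholder tenant
      "x-okapi-url", "http://okapi:9130",  // placeholder Okapi URL
      "x-okapi-token", "<token>"           // placeholder token
    );
    return recordDao.deleteRecordsByExternalId(externalId, okapiHeaders);
  }
}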

Large diffs are not rendered by default.

MarcUtil.java
@@ -5,6 +5,7 @@
import static org.folio.services.util.AdditionalFieldsUtil.TAG_005;
import static org.folio.services.util.AdditionalFieldsUtil.TAG_00X_PREFIX;

+ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -20,6 +21,7 @@
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+ import org.folio.dbschema.ObjectMapperTool;
import org.marc4j.MarcException;
import org.marc4j.MarcJsonReader;
import org.marc4j.MarcJsonWriter;
@@ -254,4 +256,13 @@ private static List<String> getSourceFields(String source) {
}
return sourceFields;
}

+ public static <T> T clone(T obj, Class<T> type) {
+   try {
+     final ObjectMapper jsonMapper = ObjectMapperTool.getMapper();
+     return jsonMapper.readValue(jsonMapper.writeValueAsString(obj), type);
+   } catch (JsonProcessingException ex) {
+     throw new IllegalArgumentException(ex);
+   }
+ }
}
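The new MarcUtil.clone deep-copies by round-tripping through Jackson: serialize the object to a JSON string, then read it back as a fresh instance. That is slower than a hand-written copy but guarantees the copy shares no mutable state with the original, which is what RecordDomainEventPublisher.simplifyRecord (further down in this commit) relies on when it strips fields from the copy without touching the record being persisted. A usage sketch with an illustrative id:

// Deep-copy a record, then mutate only the copy.
Record original = new Record().withId("00000000-0000-0000-0000-000000000001");
Record copy = MarcUtil.clone(original, Record.class).withRawRecord(null);
assert copy != original;                      // distinct instances
assert copy.getId().equals(original.getId()); // same data where not stripped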
RecordService.java
@@ -244,10 +244,10 @@ Future<RecordsBatchResponse> saveRecordsByExternalIds(List<String> externalIds,
* Deletes records by external id
*
* @param externalId external id
- * @param tenantId tenant id
+ * @param okapiHeaders okapi headers
* @return future with true if succeeded
*/
- Future<Void> deleteRecordsByExternalId(String externalId, String tenantId);
+ Future<Void> deleteRecordsByExternalId(String externalId, Map<String, String> okapiHeaders);

/**
* Creates new updated Record with incremented generation linked to a new Snapshot, and sets OLD status to the "old" Record,
RecordServiceImpl.java
@@ -360,8 +360,8 @@ public Future<Boolean> deleteRecordsBySnapshotId(String snapshotId, String tenantId) {
}

@Override
- public Future<Void> deleteRecordsByExternalId(String externalId, String tenantId) {
-   return recordDao.deleteRecordsByExternalId(externalId, tenantId).map(b -> null);
+ public Future<Void> deleteRecordsByExternalId(String externalId, Map<String, String> okapiHeaders) {
+   return recordDao.deleteRecordsByExternalId(externalId, okapiHeaders).map(b -> null);
}

@Override
RecordDomainEventPublisher.java
@@ -5,14 +5,18 @@
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_TOKEN_HEADER;
import static org.folio.rest.util.OkapiConnectionParams.OKAPI_URL_HEADER;
import static org.folio.services.domainevent.SourceRecordDomainEventType.SOURCE_RECORD_CREATED;
+ import static org.folio.services.domainevent.SourceRecordDomainEventType.SOURCE_RECORD_DELETED;
import static org.folio.services.domainevent.SourceRecordDomainEventType.SOURCE_RECORD_UPDATED;

+ import com.fasterxml.jackson.annotation.JsonInclude;
+ import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.json.JsonObject;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+ import org.folio.dao.util.MarcUtil;
import org.folio.rest.jaxrs.model.Record;
import org.folio.services.kafka.KafkaSender;
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,39 +35,59 @@ public class RecordDomainEventPublisher
private KafkaSender kafkaSender;

public void publishRecordCreated(Record created, Map<String, String> okapiHeaders) {
- publishRecord(created, okapiHeaders, SOURCE_RECORD_CREATED);
+ publishRecord(new DomainEventPayload(null, simplifyRecord(created)), okapiHeaders, SOURCE_RECORD_CREATED);
}

- public void publishRecordUpdated(Record updated, Map<String, String> okapiHeaders) {
-   publishRecord(updated, okapiHeaders, SOURCE_RECORD_UPDATED);
+ public void publishRecordUpdated(Record old, Record updated, Map<String, String> okapiHeaders) {
+   publishRecord(new DomainEventPayload(simplifyRecord(old), simplifyRecord(updated)), okapiHeaders, SOURCE_RECORD_UPDATED);
}

- private void publishRecord(Record aRecord, Map<String, String> okapiHeaders, SourceRecordDomainEventType eventType) {
-   if (!domainEventsEnabled || notValidForPublishing(aRecord)) {
+ public void publishRecordDeleted(Record deleted, Map<String, String> okapiHeaders) {
+   publishRecord(new DomainEventPayload(simplifyRecord(deleted), null), okapiHeaders, SOURCE_RECORD_DELETED);
+ }
+
+ private void publishRecord(DomainEventPayload domainEventPayload, Map<String, String> okapiHeaders, SourceRecordDomainEventType eventType) {
+   if (!domainEventsEnabled || notValidForPublishing(domainEventPayload)) {
return;
}
try {
+ Record aRecord = domainEventPayload.newRecord() != null ? domainEventPayload.newRecord() : domainEventPayload.oldRecord();
var kafkaHeaders = getKafkaHeaders(okapiHeaders, aRecord.getRecordType());
var key = aRecord.getId();
- var jsonContent = JsonObject.mapFrom(aRecord);
+ var jsonContent = JsonObject.mapFrom(domainEventPayload);
kafkaSender.sendEventToKafka(okapiHeaders.get(OKAPI_TENANT_HEADER), jsonContent.encode(),
eventType.name(), kafkaHeaders, key);
} catch (Exception e) {
LOG.error("Exception during Record domain event sending", e);
LOG.warn("Exception during Record domain event sending", e);
}
}

- private boolean notValidForPublishing(Record aRecord) {
+ private boolean notValidForPublishing(DomainEventPayload domainEventPayload) {
+   if (domainEventPayload.newRecord() == null && domainEventPayload.oldRecord() == null) {
+     LOG.warn("Old and new records are null and won't be sent as domain event");
+     return true;
+   }
+   if (domainEventPayload.newRecord() != null && notValidRecord(domainEventPayload.newRecord())) {
+     return true;
+   }
+   return domainEventPayload.oldRecord() != null && notValidRecord(domainEventPayload.oldRecord());
+ }
+
+ private static boolean notValidRecord(Record aRecord) {
if (isNull(aRecord)) {
LOG.warn("Record is null and won't be sent as domain event");
return true;
}
if (isNull(aRecord.getRecordType())) {
LOG.error("Record [with id {}] contains no type information and won't be sent as domain event", aRecord.getId());
LOG.warn("Record [with id {}] contains no type information and won't be sent as domain event", aRecord.getId());
return true;
}
if (isNull(aRecord.getParsedRecord())) {
LOG.error("Record [with id {}] contains no parsed record and won't be sent as domain event", aRecord.getId());
LOG.warn("Record [with id {}] contains no parsed record and won't be sent as domain event", aRecord.getId());
return true;
}
if (isNull(aRecord.getParsedRecord().getContent())) {
LOG.error("Record [with id {}] contains no parsed record content and won't be sent as domain event",
LOG.warn("Record [with id {}] contains no parsed record content and won't be sent as domain event",
aRecord.getId());
return true;
}
@@ -79,4 +103,15 @@ private List<KafkaHeader> getKafkaHeaders(Map<String, String> okapiHeaders, RecordType recordType) {
);
}

+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private record DomainEventPayload(@JsonProperty("old") Record oldRecord, @JsonProperty("new") Record newRecord) {}
+
+ private Record simplifyRecord(Record aRecord) {
+   if (aRecord != null) {
+     return MarcUtil.clone(aRecord, Record.class)
+       .withErrorRecord(null)
+       .withRawRecord(null);
+   }
+   return null;
+ }
}
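With the DomainEventPayload wrapper and NON_NULL inclusion, the event body now nests the simplified record under "old" and/or "new": created events carry only "new", deleted events only "old", updated events both. A standalone sketch that mirrors the private record to show the wire shape (JsonObject stands in for the Record type purely for illustration):

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.json.JsonObject;

public class PayloadShapeDemo {
  @JsonInclude(JsonInclude.Include.NON_NULL)
  record PayloadSketch(@JsonProperty("old") JsonObject oldRecord,
                       @JsonProperty("new") JsonObject newRecord) {}

  public static void main(String[] args) {
    var created = new PayloadSketch(null, new JsonObject().put("id", "rec-1"));
    // NON_NULL drops the absent side, so a create event serializes as:
    // {"new":{"id":"rec-1"}}
    System.out.println(JsonObject.mapFrom(created).encode());
  }
}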
SourceRecordDomainEventType.java
@@ -1,5 +1,5 @@
package org.folio.services.domainevent;

public enum SourceRecordDomainEventType {
- SOURCE_RECORD_CREATED, SOURCE_RECORD_UPDATED
+ SOURCE_RECORD_CREATED, SOURCE_RECORD_UPDATED, SOURCE_RECORD_DELETED
}
AuthorityDomainKafkaHandlerTest.java
@@ -29,6 +29,7 @@
import org.folio.rest.jaxrs.model.Snapshot;
import org.folio.rest.jaxrs.model.SourceRecord;
import org.folio.rest.jooq.enums.RecordState;
+ import org.folio.rest.util.OkapiConnectionParams;
import org.folio.services.AbstractLBServiceTest;
import org.folio.services.RecordService;
import org.folio.services.RecordServiceImpl;
@@ -156,6 +157,9 @@ public void shouldHardDeleteMarcAuthorityRecordOnHardDeleteDomainEvent(TestContext context) {
private ConsumerRecord<String, String> getConsumerRecord(HashMap<String, String> payload) {
ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("topic", 1, 1, recordId, Json.encode(payload));
consumerRecord.headers().add(new RecordHeader("domain-event-type", "DELETE".getBytes(StandardCharsets.UTF_8)));
+ consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_URL_HEADER, OKAPI_URL.getBytes(StandardCharsets.UTF_8)));
+ consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_TENANT_HEADER, TENANT_ID.getBytes(StandardCharsets.UTF_8)));
+ consumerRecord.headers().add(new RecordHeader(OkapiConnectionParams.OKAPI_TOKEN_HEADER, TOKEN.getBytes(StandardCharsets.UTF_8)));
return consumerRecord;
}
