From bb3c3db63b2a99223001e3a292629c4cc89ade4d Mon Sep 17 00:00:00 2001 From: GordeaS Date: Tue, 20 Feb 2024 18:22:58 +0100 Subject: [PATCH] replace wikidata country ids in consolidated version, allow zoho sync application to stop when tasks are failing #EA-3641 #EA-3720 --- .../config/EntityManagementConfiguration.java | 10 +++ .../utils/EntityRecordUtils.java | 12 +++ .../04_migrate_organization_country.txt | 40 ++++++++++ .../entitymanagement/EntityManagementApp.java | 22 +++++- .../web/service/EntityRecordService.java | 74 ++++++++++++++----- 5 files changed, 134 insertions(+), 24 deletions(-) create mode 100644 entity-management-mongo/src/test/resources/migrations/04_migrate_organization_country.txt diff --git a/entity-management-common/src/main/java/eu/europeana/entitymanagement/common/config/EntityManagementConfiguration.java b/entity-management-common/src/main/java/eu/europeana/entitymanagement/common/config/EntityManagementConfiguration.java index 8646bdf47..368a3018a 100644 --- a/entity-management-common/src/main/java/eu/europeana/entitymanagement/common/config/EntityManagementConfiguration.java +++ b/entity-management-common/src/main/java/eu/europeana/entitymanagement/common/config/EntityManagementConfiguration.java @@ -176,6 +176,7 @@ public class EntityManagementConfiguration implements InitializingBean { private String roleVocabularyFilename; Map countryMappings = new HashMap<>(); + Map wikidataCountryMappings = new HashMap<>(); Map roleMappings = new HashMap<>(); @Autowired @@ -236,7 +237,12 @@ private void initCountryMappings() throws IOException { String contents = reader.lines().collect(Collectors.joining(System.lineSeparator())); List countryMappingList = emJsonMapper.readValue(contents, new TypeReference>(){}); for (ZohoLabelUriMapping countryMapping : countryMappingList) { + //init zoho country mapping countryMappings.put(countryMapping.getZohoLabel(), countryMapping); + //init wikidata country mapping + if(StringUtils.isNotEmpty(countryMapping.getWikidataUri())){ + wikidataCountryMappings.put(countryMapping.getWikidataUri(), countryMapping.getEntityUri()); + } } } } @@ -460,4 +466,8 @@ public Map getRoleMappings() { public String getRoleVocabularyFilename() { return roleVocabularyFilename; } + + public Map getWikidataCountryMappings() { + return wikidataCountryMappings; + } } diff --git a/entity-management-definitions/src/main/java/eu/europeana/entitymanagement/utils/EntityRecordUtils.java b/entity-management-definitions/src/main/java/eu/europeana/entitymanagement/utils/EntityRecordUtils.java index 10a88344c..785e247ab 100644 --- a/entity-management-definitions/src/main/java/eu/europeana/entitymanagement/utils/EntityRecordUtils.java +++ b/entity-management-definitions/src/main/java/eu/europeana/entitymanagement/utils/EntityRecordUtils.java @@ -110,4 +110,16 @@ public static List getEntityIds(List entities) { } return entities.stream().map(e -> e.getEntityId()).toList(); } + + + /** + * Checks if the entity with the given id is a Europeana entity (data.europeana.eu) + * + * @param id entity id + * @return true if given entity is a Europeana entity, false otherwise + */ + public static boolean isEuropeanaEntity(String id) { + + return id!=null && id.startsWith(WebEntityFields.BASE_DATA_EUROPEANA_URI); + } } diff --git a/entity-management-mongo/src/test/resources/migrations/04_migrate_organization_country.txt b/entity-management-mongo/src/test/resources/migrations/04_migrate_organization_country.txt new file mode 100644 index 000000000..b8a1328f3 --- /dev/null +++ b/entity-management-mongo/src/test/resources/migrations/04_migrate_organization_country.txt @@ -0,0 +1,40 @@ +#CHECK organizations +db.getCollection('EntityRecord').count({"entity.type":"Organization"}) +4574 + +#DELETE Organizations +db.getCollection('EntityRecord').deleteMany({"entity.type":"Organization"}) + +#Remove From Generator +db.getCollection("EntityIdGenerator").deleteMany({"_id" : "Organization"}) + +#remove from ZohoSyncReport +db.getCollection("ZohoSyncReport").find({}) +db.getCollection("ZohoSyncReport").deleteMany({}) + + +#check remove failed tasks for organizations +db.getCollection("FailedTasks").find({entityId: /.*organization.*/}) +db.getCollection("FailedTasks").deleteMany({entityId: /.*organization.*/}) + +#check existing schedules tasks +db.getCollection("ScheduledTasks").find({}) +db.getCollection("ScheduledTasks").deleteMany({}) + +#job instances +db.getCollection("JobInstance").deleteMany({}) + +#check failed job executions +db.getCollection("JobExecution").find({status: {$ne: "COMPLETED" } }) +db.getCollection("JobExecution").deleteMany({}) + + +#check failed job executions +db.getCollection("StepExecution").find({status: {$ne: "COMPLETED" } }) +db.getCollection("StepExecution").deleteMany({}) + +#clean execution context +db.getCollection("ExecutionContext").deleteMany({}) + +#eventually +#db.getCollection("Sequence").delete({}) \ No newline at end of file diff --git a/entity-management-web/src/main/java/eu/europeana/entitymanagement/EntityManagementApp.java b/entity-management-web/src/main/java/eu/europeana/entitymanagement/EntityManagementApp.java index 4ba75eeb8..8879b896e 100644 --- a/entity-management-web/src/main/java/eu/europeana/entitymanagement/EntityManagementApp.java +++ b/entity-management-web/src/main/java/eu/europeana/entitymanagement/EntityManagementApp.java @@ -69,13 +69,27 @@ public static void main(String[] args) { Arrays.toString(args)); } ScheduledTaskService scheduledTaskService = getScheduledTasksService(context); - long runningTasks; + long notCompletedTasks = 0; + boolean processingComplete = false; do { //wait for execution of schedules tasks - runningTasks = scheduledTaskService.getRunningTasksCount(); + long currentRunningTasks = scheduledTaskService.getRunningTasksCount(); + // log progress if (LOG.isInfoEnabled()) { - LOG.info("Scheduled Tasks to process : {}", runningTasks); + LOG.info("Scheduled Tasks to process : {}", notCompletedTasks); } + + //failed tasks will not complete, therefore not all scheduled tasks are marked as completed in the database + //untill we have a better mechanism to reschedule failed tasks we wait for the next executions to mark them as complete + if (currentRunningTasks == 0 || currentRunningTasks == notCompletedTasks){ + //if the open tasks is the same after waiting interval, than the processing is considered complete + processingComplete = true; + currentRunningTasks = 0; + } else { + processingComplete = false; + notCompletedTasks = currentRunningTasks; + } + try { Thread.sleep(Duration.ofMinutes(WAITING_INTREVAL).toMillis()); } catch (InterruptedException e) { @@ -83,7 +97,7 @@ public static void main(String[] args) { SpringApplication.exit(context); System.exit(-2); } - } while (runningTasks > 0); + } while (notCompletedTasks > 0); // failed application execution should be indicated with negative codes LOG.info("Stoping application after processing all Schdeduled Tasks!"); diff --git a/entity-management-web/src/main/java/eu/europeana/entitymanagement/web/service/EntityRecordService.java b/entity-management-web/src/main/java/eu/europeana/entitymanagement/web/service/EntityRecordService.java index b4339d5ed..e40b49a1f 100644 --- a/entity-management-web/src/main/java/eu/europeana/entitymanagement/web/service/EntityRecordService.java +++ b/entity-management-web/src/main/java/eu/europeana/entitymanagement/web/service/EntityRecordService.java @@ -479,8 +479,7 @@ public EntityRecord createEntityFromRequest(Entity europeanaProxyEntity, // prevent registration of organizations if id generation is not enabled and now EuropeanaID // available in zoho boolean isZohoOrg = isZohoOrg(externalEntityId, datasourceResponse); - if (isZohoOrg && !emConfiguration.isGenerateOrganizationEuropeanaId() - && predefinedEntityId == null) { + if (isRegistrationRejected(predefinedEntityId, isZohoOrg)) { throw new EntityCreationException( "This instance is not allowed to register new Organizations. Registration of external entity refused: " + externalEntityId); @@ -508,6 +507,11 @@ public EntityRecord createEntityFromRequest(Entity europeanaProxyEntity, return entityRecordRepository.save(entityRecord); } + boolean isRegistrationRejected(String predefinedEntityId, boolean isZohoOrg) { + return isZohoOrg && !emConfiguration.isGenerateOrganizationEuropeanaId() + && predefinedEntityId == null; + } + EntityRecord buildEntityRecordObject(String entityId, Entity europeanaProxyEntity, Entity datasourceResponse, DataSource dataSource, String externalEntityId, boolean isZohoOrg) throws EntityCreationException { @@ -1474,28 +1478,58 @@ void updateEuropeanaIDFieldInZoho(String zohoOrganizationUrl, String europeanaId public void processReferenceFields(Entity entity) { if (EntityTypes.isOrganization(entity.getType())) { Organization org = (Organization) entity; - //country reference - if (StringUtils.isNotEmpty(org.getCountryId())) { - EntityRecord orgCountry = entityRecordRepository.findByEntityId(org.getCountryId()); + //update country reference + processCountryReference(org); + + //update role reference + processRoleReference(org); + } + } + + void processRoleReference(Organization org) { + if(org.getEuropeanaRoleIds()!=null && !org.getEuropeanaRoleIds().isEmpty()) { + List vocabs=vocabRepository.findByUri(org.getEuropeanaRoleIds()); + if (vocabs.isEmpty()) { + logger.warn( + "No vocabularies with the uris: {} were found in the database. Cannot assign role reference to organization with id {}", + org.getEuropeanaRoleIds(), org.getEntityId()); + } else { + org.setEuropeanaRoleRefs(vocabs); + } + } + } + + void processCountryReference(Organization org) { + //country reference + if (StringUtils.isNotEmpty(org.getCountryId())) { + String europeanaCountryId = getEuropeanaCountryId(org); + if(europeanaCountryId == null) { + logger.warn("Dropping unsupported country id in consolidated entity version: {} -- {} ", org.getEntityId(), org.getCountryId()); + org.setCountryId(null); + } else { + //replace wikidata country ids + org.setCountryId(europeanaCountryId); + //search reference + EntityRecord orgCountry = entityRecordRepository.findByEntityId(europeanaCountryId); if (orgCountry == null) { - logger.info( - "No entity record with the entity id: {} was found in the database. Cannot assign country reference to organization with id {}", - org.getCountryId(), org.getEntityId()); + logger.warn( + "No country found in database for the entity id: {}. Cannot assign country reference to organization with id {}", + europeanaCountryId, org.getEntityId()); } else { org.setCountryRef(orgCountry); - } - } - //role reference - if(org.getEuropeanaRoleIds()!=null && !org.getEuropeanaRoleIds().isEmpty()) { - List vocabs=vocabRepository.findByUri(org.getEuropeanaRoleIds()); - if (vocabs.isEmpty()) { - logger.info( - "No vocabularies with the uris: {} were found in the database. Cannot assign role reference to organization with id {}", - org.getEuropeanaRoleIds(), org.getEntityId()); - } else { - org.setEuropeanaRoleRefs(vocabs); - } + } } } } + + String getEuropeanaCountryId(Organization org) { + if(EntityRecordUtils.isEuropeanaEntity(org.getCountryId())) { + // country id is already europeana entity + return org.getCountryId(); + }else if(WikidataUtils.isWikidataEntity(org.getCountryId())) { + //get europeana country id by wikidata id + return emConfiguration.getWikidataCountryMappings().getOrDefault(org.getCountryId(), null); + } + return null; + } }