Skip to content

Commit

Permalink
Merge pull request #302 from europeana/EA-3720_zoho_eu_migration
Browse files Browse the repository at this point in the history
replace wikidata country ids in consolidated version, allow zoho sync
  • Loading branch information
gsergiu authored Feb 20, 2024
2 parents 36672a8 + bb3c3db commit 142d4b6
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public class EntityManagementConfiguration implements InitializingBean {
private String roleVocabularyFilename;

Map<String, ZohoLabelUriMapping> countryMappings = new HashMap<>();
Map<String, String> wikidataCountryMappings = new HashMap<>();
Map<String, String> roleMappings = new HashMap<>();

@Autowired
Expand Down Expand Up @@ -236,7 +237,12 @@ private void initCountryMappings() throws IOException {
String contents = reader.lines().collect(Collectors.joining(System.lineSeparator()));
List<ZohoLabelUriMapping> countryMappingList = emJsonMapper.readValue(contents, new TypeReference<List<ZohoLabelUriMapping>>(){});
for (ZohoLabelUriMapping countryMapping : countryMappingList) {
//init zoho country mapping
countryMappings.put(countryMapping.getZohoLabel(), countryMapping);
//init wikidata country mapping
if(StringUtils.isNotEmpty(countryMapping.getWikidataUri())){
wikidataCountryMappings.put(countryMapping.getWikidataUri(), countryMapping.getEntityUri());
}
}
}
}
Expand Down Expand Up @@ -460,4 +466,8 @@ public Map<String, String> getRoleMappings() {
public String getRoleVocabularyFilename() {
return roleVocabularyFilename;
}

public Map<String, String> getWikidataCountryMappings() {
return wikidataCountryMappings;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,16 @@ public static List<String> getEntityIds(List<EntityRecord> entities) {
}
return entities.stream().map(e -> e.getEntityId()).toList();
}


/**
* Checks if the entity with the given id is a Europeana entity (data.europeana.eu)
*
* @param id entity id
* @return true if given entity is a Europeana entity, false otherwise
*/
public static boolean isEuropeanaEntity(String id) {

return id!=null && id.startsWith(WebEntityFields.BASE_DATA_EUROPEANA_URI);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#CHECK organizations
db.getCollection('EntityRecord').count({"entity.type":"Organization"})
4574

#DELETE Organizations
db.getCollection('EntityRecord').deleteMany({"entity.type":"Organization"})

#Remove From Generator
db.getCollection("EntityIdGenerator").deleteMany({"_id" : "Organization"})

#remove from ZohoSyncReport
db.getCollection("ZohoSyncReport").find({})
db.getCollection("ZohoSyncReport").deleteMany({})


#check remove failed tasks for organizations
db.getCollection("FailedTasks").find({entityId: /.*organization.*/})
db.getCollection("FailedTasks").deleteMany({entityId: /.*organization.*/})

#check existing schedules tasks
db.getCollection("ScheduledTasks").find({})
db.getCollection("ScheduledTasks").deleteMany({})

#job instances
db.getCollection("JobInstance").deleteMany({})

#check failed job executions
db.getCollection("JobExecution").find({status: {$ne: "COMPLETED" } })
db.getCollection("JobExecution").deleteMany({})


#check failed job executions
db.getCollection("StepExecution").find({status: {$ne: "COMPLETED" } })
db.getCollection("StepExecution").deleteMany({})

#clean execution context
db.getCollection("ExecutionContext").deleteMany({})

#eventually
#db.getCollection("Sequence").delete({})
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,35 @@ public static void main(String[] args) {
Arrays.toString(args));
}
ScheduledTaskService scheduledTaskService = getScheduledTasksService(context);
long runningTasks;
long notCompletedTasks = 0;
boolean processingComplete = false;
do {
//wait for execution of schedules tasks
runningTasks = scheduledTaskService.getRunningTasksCount();
long currentRunningTasks = scheduledTaskService.getRunningTasksCount();
// log progress
if (LOG.isInfoEnabled()) {
LOG.info("Scheduled Tasks to process : {}", runningTasks);
LOG.info("Scheduled Tasks to process : {}", notCompletedTasks);
}

//failed tasks will not complete, therefore not all scheduled tasks are marked as completed in the database
//untill we have a better mechanism to reschedule failed tasks we wait for the next executions to mark them as complete
if (currentRunningTasks == 0 || currentRunningTasks == notCompletedTasks){
//if the open tasks is the same after waiting interval, than the processing is considered complete
processingComplete = true;
currentRunningTasks = 0;
} else {
processingComplete = false;
notCompletedTasks = currentRunningTasks;
}

try {
Thread.sleep(Duration.ofMinutes(WAITING_INTREVAL).toMillis());
} catch (InterruptedException e) {
LOG.error("Cannot complete execution!", e);
SpringApplication.exit(context);
System.exit(-2);
}
} while (runningTasks > 0);
} while (notCompletedTasks > 0);

// failed application execution should be indicated with negative codes
LOG.info("Stoping application after processing all Schdeduled Tasks!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,7 @@ public EntityRecord createEntityFromRequest(Entity europeanaProxyEntity,
// prevent registration of organizations if id generation is not enabled and now EuropeanaID
// available in zoho
boolean isZohoOrg = isZohoOrg(externalEntityId, datasourceResponse);
if (isZohoOrg && !emConfiguration.isGenerateOrganizationEuropeanaId()
&& predefinedEntityId == null) {
if (isRegistrationRejected(predefinedEntityId, isZohoOrg)) {
throw new EntityCreationException(
"This instance is not allowed to register new Organizations. Registration of external entity refused: "
+ externalEntityId);
Expand Down Expand Up @@ -508,6 +507,11 @@ public EntityRecord createEntityFromRequest(Entity europeanaProxyEntity,
return entityRecordRepository.save(entityRecord);
}

boolean isRegistrationRejected(String predefinedEntityId, boolean isZohoOrg) {
return isZohoOrg && !emConfiguration.isGenerateOrganizationEuropeanaId()
&& predefinedEntityId == null;
}

EntityRecord buildEntityRecordObject(String entityId, Entity europeanaProxyEntity,
Entity datasourceResponse, DataSource dataSource, String externalEntityId, boolean isZohoOrg)
throws EntityCreationException {
Expand Down Expand Up @@ -1474,28 +1478,58 @@ void updateEuropeanaIDFieldInZoho(String zohoOrganizationUrl, String europeanaId
public void processReferenceFields(Entity entity) {
if (EntityTypes.isOrganization(entity.getType())) {
Organization org = (Organization) entity;
//country reference
if (StringUtils.isNotEmpty(org.getCountryId())) {
EntityRecord orgCountry = entityRecordRepository.findByEntityId(org.getCountryId());
//update country reference
processCountryReference(org);

//update role reference
processRoleReference(org);
}
}

void processRoleReference(Organization org) {
if(org.getEuropeanaRoleIds()!=null && !org.getEuropeanaRoleIds().isEmpty()) {
List<Vocabulary> vocabs=vocabRepository.findByUri(org.getEuropeanaRoleIds());
if (vocabs.isEmpty()) {
logger.warn(
"No vocabularies with the uris: {} were found in the database. Cannot assign role reference to organization with id {}",
org.getEuropeanaRoleIds(), org.getEntityId());
} else {
org.setEuropeanaRoleRefs(vocabs);
}
}
}

void processCountryReference(Organization org) {
//country reference
if (StringUtils.isNotEmpty(org.getCountryId())) {
String europeanaCountryId = getEuropeanaCountryId(org);
if(europeanaCountryId == null) {
logger.warn("Dropping unsupported country id in consolidated entity version: {} -- {} ", org.getEntityId(), org.getCountryId());
org.setCountryId(null);
} else {
//replace wikidata country ids
org.setCountryId(europeanaCountryId);
//search reference
EntityRecord orgCountry = entityRecordRepository.findByEntityId(europeanaCountryId);
if (orgCountry == null) {
logger.info(
"No entity record with the entity id: {} was found in the database. Cannot assign country reference to organization with id {}",
org.getCountryId(), org.getEntityId());
logger.warn(
"No country found in database for the entity id: {}. Cannot assign country reference to organization with id {}",
europeanaCountryId, org.getEntityId());
} else {
org.setCountryRef(orgCountry);
}
}
//role reference
if(org.getEuropeanaRoleIds()!=null && !org.getEuropeanaRoleIds().isEmpty()) {
List<Vocabulary> vocabs=vocabRepository.findByUri(org.getEuropeanaRoleIds());
if (vocabs.isEmpty()) {
logger.info(
"No vocabularies with the uris: {} were found in the database. Cannot assign role reference to organization with id {}",
org.getEuropeanaRoleIds(), org.getEntityId());
} else {
org.setEuropeanaRoleRefs(vocabs);
}
}
}
}
}

String getEuropeanaCountryId(Organization org) {
if(EntityRecordUtils.isEuropeanaEntity(org.getCountryId())) {
// country id is already europeana entity
return org.getCountryId();
}else if(WikidataUtils.isWikidataEntity(org.getCountryId())) {
//get europeana country id by wikidata id
return emConfiguration.getWikidataCountryMappings().getOrDefault(org.getCountryId(), null);
}
return null;
}
}

0 comments on commit 142d4b6

Please sign in to comment.