Skip to content

Commit

Permalink
DHFPROD-10566: Ignore flow when properties file is missing in legacy …
Browse files Browse the repository at this point in the history
…flow directory
  • Loading branch information
rahulvudutala authored and MarkLogic Builder committed Aug 18, 2023
1 parent 970aefd commit 974782e
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -497,10 +497,10 @@ public void upgradeProject(FlowManager flowManager) throws IOException {
}

public int upgradeLegacyFlows(FlowManager flowManager) {
return upgradeLegacyFlows(flowManager, new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
return upgradeLegacyFlows(flowManager, new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), "data-hub-STAGING", "data-hub-FINAL");
}

public int upgradeLegacyFlows(FlowManager flowManager, List<String> legacyEntities, List<String> legacyFlowTypes, List<String> legacyFlowNames) {
public int upgradeLegacyFlows(FlowManager flowManager, List<String> legacyEntities, List<String> legacyFlowTypes, List<String> legacyFlowNames, String sourceDb, String targetDb) {
Set<String> legacyEntitiesSet = new HashSet<>(legacyEntities);
Set<String> legacyFlowTypesSet = new HashSet<>(legacyFlowTypes);
Set<String> legacyFlowNamesSet = new HashSet<>(legacyFlowNames);
Expand Down Expand Up @@ -579,12 +579,12 @@ public int upgradeLegacyFlows(FlowManager flowManager, List<String> legacyEntiti
JsonNode stepPayLoad = scaffolding.getStepConfig(newStepName, stepType, newStepName, null, acceptSourceModule);
// Save StepDefinition to local file
scaffolding.saveStepDefinition(newStepName, newStepName, stepType, true);
updateStepOptionsFor4xFlow(stepName, stepFile, stepPayLoad, mainModulePath, legacyEntityDir.getName());
updateStepOptionsFor4xFlow(stepName, stepFile, stepPayLoad, mainModulePath, legacyEntityDir.getName(), sourceDb, targetDb);
// Save Step to local file
scaffolding.saveLocalStep(stepType, stepPayLoad);
// Add step to local Flow
ObjectNode stepIdObj = objectMapper.createObjectNode();
steps.put(Integer.toString(++stepNumber), stepIdObj);
steps.putIfAbsent(Integer.toString(++stepNumber), stepIdObj);
stepIdObj.put("stepId", newStepName.concat("-").concat(stepType));
flowsUpdated++;
}
Expand All @@ -598,14 +598,14 @@ public int upgradeLegacyFlows(FlowManager flowManager, List<String> legacyEntiti
return flowsUpdated;
}

private void updateStepOptionsFor4xFlow(String stepName, File stepFile, JsonNode stepPayLoad, String mainModulePath, String entityType) {
private void updateStepOptionsFor4xFlow(String stepName, File stepFile, JsonNode stepPayLoad, String mainModulePath, String entityType, String sourceDb, String finalDb) {
ObjectNode step = (ObjectNode) stepPayLoad;
ObjectMapper mapper = new ObjectMapper();
Properties properties = new Properties();
try {
File propsFile = stepFile.listFiles((File file, String name) -> name.equals(stepName.concat(".properties")))[0];
properties.load(Files.newInputStream(propsFile.toPath()));
} catch (IOException e) {
} catch (IOException | ArrayIndexOutOfBoundsException e) {
logger.warn("%s.properties file is missing in the %s directory. The dataFormat and mainModule is defaulted to json and main.sjs" +
"If the default values are inappropriate, change the values in steps/%s file", stepName, stepName, stepPayLoad.get("name").asText());
properties.put("mainModule", "main.sjs");
Expand All @@ -614,30 +614,31 @@ private void updateStepOptionsFor4xFlow(String stepName, File stepFile, JsonNode
ObjectNode optionsNode = mapper.createObjectNode();
optionsNode.put("flow", stepName);
optionsNode.put("entity", "");
optionsNode.put("dataFormat", properties.get("dataFormat").toString());
optionsNode.put("mainModuleUri", mainModulePath.concat("/").concat(properties.get("mainModule").toString()));
optionsNode.put("dataFormat", properties.getOrDefault("dataFormat", "json").toString());
optionsNode.put("mainModuleUri", mainModulePath.concat("/").concat(properties.getOrDefault("mainModule", "main.sjs").toString()));

if(step.get("stepDefinitionType").asText().equals("custom")) {
step.put("sourceDatabase", "data-hub-STAGING");
step.put("targetDatabase", "data-hub-FINAL");
step.put("sourceDatabase", sourceDb);
step.put("targetDatabase", finalDb);
step.put("sourceQueryIsModule", true);
mainModulePath = mainModulePath.concat("/").concat(properties.get("collectorModule").toString());
mainModulePath = mainModulePath.concat("/").concat(properties.getOrDefault("collectorModule", "collector.sjs").toString());
ObjectNode sourceModuleNode = (ObjectNode) step.get("sourceModule");
sourceModuleNode.put("modulePath", mainModulePath);
sourceModuleNode.put("functionName", "collect");

optionsNode.put("entity", entityType);
} else {
step.put("targetDatabase", "data-hub-STAGING");
step.put("targetDatabase", sourceDb);
step.put("inputFilePath", "");
}
step.put("options", optionsNode);
step.putIfAbsent("options", optionsNode);

step.putArray("collections").add(stepPayLoad.get("name").asText()).add(entityType);
step.put("permissions", "data-hub-common,read,data-hub-common,update");
step.put("stepId", step.get("name").asText().concat("-").concat(step.get("stepDefinitionType").asText()));
step.put("isUpgradedLegacyFlow", true);
step.put("sourceFormat", properties.get("dataFormat").toString());
step.put("targetFormat", properties.get("dataFormat").toString());
step.put("sourceFormat", properties.getOrDefault("dataFormat", "json").toString());
step.put("targetFormat", properties.getOrDefault("dataFormat", "json").toString());
}

private JsonNode retrieveEntityFromCommunityNode(String modelName, JsonNode modelNodes, Map<String, JsonNode> entityModels) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void upgrade43xToCurrentVersion() throws IOException {
File mappingDir = new File(projectDir, "mappings");
File entitiesDir = new File(projectDir, "entities");
verifyDirContents(mappingDir, 1);
verifyDirContents(entitiesDir, 3);
verifyDirContents(entitiesDir, 4);

File finalDbFile = hubProject.getUserConfigDir().resolve("databases").resolve("final-database.json").toFile();
ObjectNode db = (ObjectNode) ObjectMapperFactory.getObjectMapper().readTree(finalDbFile);
Expand Down Expand Up @@ -228,7 +228,7 @@ public void onlyLegacyEntitiesUpgrade() throws IOException {
List<String> legacyFlowNames = new ArrayList<>();
legacyEntities.add("Customer");
legacyEntities.add("Product");
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames);
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames, getHubConfig().getStagingDbName(), getHubConfig().getFinalDbName());
assertTrue(hubProject.getFlowsDir().resolve("dh_Upgrade_CustomerFlow.flow.json").toFile().exists());
assertTrue(hubProject.getFlowsDir().resolve("dh_Upgrade_ProductFlow.flow.json").toFile().exists());
assertFalse(hubProject.getFlowsDir().resolve("dh_Upgrade_OrderFlow.flow.json").toFile().exists());
Expand All @@ -241,7 +241,7 @@ public void onlyFlowTypeUpgrade() throws IOException {
List<String> legacyFlowTypes = new ArrayList<>();
List<String> legacyFlowNames = new ArrayList<>();
legacyFlowTypes.add("input");
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames);
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames, getHubConfig().getStagingDbName(), getHubConfig().getFinalDbName());
assertTrue(hubProject.getStepsPath(StepDefinition.StepDefinitionType.INGESTION).toFile().exists());
assertFalse(hubProject.getStepsPath(StepDefinition.StepDefinitionType.CUSTOM).toFile().exists());
}
Expand All @@ -254,7 +254,7 @@ public void onlyFlowNamesUpgrade() throws IOException {
List<String> legacyFlowNames = new ArrayList<>();
legacyFlowNames.add("Load Customers");
legacyFlowNames.add("Harmonize Products");
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames);
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames, getHubConfig().getStagingDbName(), getHubConfig().getFinalDbName());
assertTrue(hubProject.getStepsPath(StepDefinition.StepDefinitionType.INGESTION).resolve("LoadCustomers.step.json").toFile().exists());
assertFalse(hubProject.getStepsPath(StepDefinition.StepDefinitionType.INGESTION).resolve("LoadOrders.step.json").toFile().exists());
assertFalse(hubProject.getStepsPath(StepDefinition.StepDefinitionType.INGESTION).resolve("LoadProducts.step.json").toFile().exists());
Expand All @@ -270,7 +270,7 @@ public void entityAndFlowNameUpgrade() throws IOException {
legacyEntities.add("Customer");
legacyFlowNames.add("Load Customers");
legacyFlowNames.add("Harmonize Products");
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames);
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames, getHubConfig().getStagingDbName(), getHubConfig().getFinalDbName());
assertTrue(hubProject.getStepsPath(StepDefinition.StepDefinitionType.INGESTION).resolve("LoadCustomers.step.json").toFile().exists());
assertFalse(hubProject.getStepsPath(StepDefinition.StepDefinitionType.CUSTOM).resolve("HarmonizeProducts.step.json").toFile().exists());
}
Expand All @@ -283,7 +283,7 @@ public void entityAndFlowTypeUpgrade() throws IOException {
List<String> legacyFlowNames = new ArrayList<>();
legacyEntities.add("Product");
legacyFlowTypes.add("input");
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames);
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames, getHubConfig().getStagingDbName(), getHubConfig().getFinalDbName());
assertTrue(hubProject.getStepsPath(StepDefinition.StepDefinitionType.INGESTION).resolve("LoadProducts.step.json").toFile().exists());
assertFalse(hubProject.getStepsPath(StepDefinition.StepDefinitionType.CUSTOM).resolve("HarmonizeProducts.step.json").toFile().exists());
}
Expand All @@ -295,7 +295,7 @@ public void nonExistentFlowName() throws IOException {
List<String> legacyFlowTypes = new ArrayList<>();
List<String> legacyFlowNames = new ArrayList<>();
legacyFlowNames.add("NonExistentFlow");
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames);
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames, getHubConfig().getStagingDbName(), getHubConfig().getFinalDbName());
assertEquals(0, hubProject.getFlowsDir().toFile().listFiles().length);
}

Expand All @@ -307,13 +307,13 @@ public void multipleLegacyFlowUpgrades() throws IOException {
List<String> legacyFlowTypes = new ArrayList<>();
List<String> legacyFlowNames = new ArrayList<>();
legacyFlowTypes.add("input");
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames);
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames, getHubConfig().getStagingDbName(), getHubConfig().getFinalDbName());

assertTrue(hubProject.getStepsPath(StepDefinition.StepDefinitionType.INGESTION).toFile().exists());
assertFalse(hubProject.getStepsPath(StepDefinition.StepDefinitionType.CUSTOM).toFile().exists());

legacyFlowTypes = new ArrayList<>();
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames);
hubProject.upgradeLegacyFlows(flowManager, legacyEntities, legacyFlowTypes, legacyFlowNames, getHubConfig().getStagingDbName(), getHubConfig().getFinalDbName());
verify4xUpgradedFlows();
}

Expand Down Expand Up @@ -520,6 +520,18 @@ private void verify4xUpgradedFlows() throws IOException {
assertEquals("Product", node.get("options").get("entity").asText());
assertEquals("json", node.get("options").get("dataFormat").asText());
assertEquals("/entities/Product/harmonize/Harmonize Products/main.sjs", node.get("options").get("mainModuleUri").asText());

// validate the step default configuration when properties file is missing
assertTrue(hubProject.getFlowsDir().resolve("dh_Upgrade_EmployeeFlow.flow.json").toFile().exists());
assertTrue(hubProject.getStepsPath(StepDefinition.StepDefinitionType.CUSTOM).resolve("NoProperties.step.json").toFile().exists());
assertTrue(hubProject.getStepDefinitionPath(StepDefinition.StepDefinitionType.CUSTOM).resolve("NoProperties").toFile().exists());
assertNotNull(hubProject.getCustomModuleDir("NoProperties", StepDefinition.StepDefinitionType.CUSTOM.toString()));
JsonNode noPropertiesStep = mapper.readTree(hubProject.getStepFile(StepDefinition.StepDefinitionType.CUSTOM, "NoProperties"));
assertEquals("json", noPropertiesStep.get("sourceFormat").asText());
assertEquals("json", noPropertiesStep.get("targetFormat").asText());
assertEquals("json", noPropertiesStep.get("options").get("dataFormat").asText());
assertEquals("/entities/Employee/harmonize/NoProperties/main.sjs", noPropertiesStep.get("options").get("mainModuleUri").asText());
assertEquals("/entities/Employee/harmonize/NoProperties/collector.sjs", noPropertiesStep.get("sourceModule").get("modulePath").asText());
}

private HubProjectImpl setUpProject(String sourceProjectName, String destProjectName) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"info" : {
"title" : "Employee",
"version" : "0.0.1"
},
"definitions" : {
"Employee" : {
"required" : [ ],
"rangeIndex" : [ ],
"wordLexicon" : [ ],
"properties" : { }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Create Content Plugin
*
* @param id - the identifier returned by the collector
* @param rawContent - the raw content being loaded.
* @param options - an object containing options. Options are sent from Java
*
* @return - your content
*/
function createContent(id, rawContent, options) {
return rawContent;
}

module.exports = {
createContent: createContent
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Create Headers Plugin
*
* @param id - the identifier returned by the collector
* @param content - the output of your content plugin
* @param options - an object containing options. Options are sent from Java
*
* @return - an object of headers
*/
function createHeaders(id, content, options) {
return {};
}

module.exports = {
createHeaders: createHeaders
};

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// dhf.sjs exposes helper functions to make your life easier
// See documentation at:
// https://marklogic.github.io/marklogic-data-hub/docs/server-side/
const dhf = require('/data-hub/4/dhf.sjs');

const contentPlugin = require('./content.sjs');
const headersPlugin = require('./headers.sjs');
const triplesPlugin = require('./triples.sjs');

/*
* Plugin Entry point
*
* @param id - the identifier returned by the collector
* @param rawContent - the raw content being loaded
* @param options - a map containing options. Options are sent from Java
*
*/
function main(id, rawContent, options) {
var contentContext = dhf.contentContext(rawContent);
var content = dhf.run(contentContext, function() {
return contentPlugin.createContent(id, rawContent, options);
});

var headerContext = dhf.headersContext(content);
var headers = dhf.run(headerContext, function() {
return headersPlugin.createHeaders(id, content, options);
});

var tripleContext = dhf.triplesContext(content, headers);
var triples = dhf.run(tripleContext, function() {
return triplesPlugin.createTriples(id, content, headers, options);
});

var envelope = dhf.makeEnvelope(content, headers, triples, options.dataFormat);

// log the final envelope as a trace
// only fires if tracing is enabled
dhf.logTrace(dhf.writerContext(envelope));

return envelope;
}

module.exports = {
main: main
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Create Triples Plugin
*
* @param id - the identifier returned by the collector
* @param content - the output of your content plugin
* @param headers - the output of your heaaders plugin
* @param options - an object containing options. Options are sent from Java
*
* @return - an array of triples
*/
function createTriples(id, content, headers, options) {
return [];
}

module.exports = {
createTriples: createTriples
};

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*~
* Writer Plugin
*
* @param id - the identifier returned by the collector
* @param envelope - the final envelope
* @param options - an object options. Options are sent from Java
*
* @return - nothing
*/
function write(id, envelope, options) {
xdmp.documentInsert(id, envelope, xdmp.defaultPermissions(), options.flow);
}

module.exports = write;
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.marklogic.gradle.task

import com.marklogic.hub.impl.HubConfigImpl
import org.gradle.api.tasks.Input
import org.gradle.api.tasks.Optional
import org.gradle.api.tasks.TaskAction
Expand Down Expand Up @@ -51,7 +52,8 @@ class UpdateLegacyFlowsTask extends HubTask {
}

println "start upgradeLegacyFlows task ."
int flowsUpdated = getHubProject().upgradeLegacyFlows(getFlowManager(), legacyEntities, legacyFlowTypes, legacyFlowNames)
HubConfigImpl config = (HubConfigImpl) getHubConfig()
int flowsUpdated = getHubProject().upgradeLegacyFlows(getFlowManager(), legacyEntities, legacyFlowTypes, legacyFlowNames, config.getStagingDbName(), config.getFinalDbName())
if(flowsUpdated == 0) {
println("No legacy Flows found in plugins/entities directory to upgrade")
}
Expand Down

0 comments on commit 974782e

Please sign in to comment.