adding CombineInputFileFormat; only single use case so far #3

Open · wants to merge 2 commits into master · showing changes from all commits
@@ -26,6 +26,7 @@
import com.vividsolutions.jts.geom.Geometry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -257,7 +258,7 @@ protected Path getWorkOutputDirectoryPath() throws IOException {

Path appendDatePart(Path path) {
if (getConfiguration().getBoolean(JobConfigNames.CALVALUS_OUTPUT_PRESERVE_DATE_TREE, false)) {
-            String datePart = getDatePart(getInputPath());
+            String datePart = getDatePart(getInputPaths()[0]);
if (datePart != null) {
path = new Path(path, datePart);
}
@@ -329,13 +330,16 @@ public void setProcessingRectangle(Rectangle roiRectangle) {
*
* @return The paths of the input products.
*/
-    public Path getInputPath() {
+    public Path[] getInputPaths() {
if (inputSplit instanceof ProductSplit) {
ProductSplit productSplit = (ProductSplit) inputSplit;
-            return productSplit.getPath();
+            return new Path[]{productSplit.getPath()};
} else if (inputSplit instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) inputSplit;
-            return fileSplit.getPath();
+            return new Path[]{fileSplit.getPath()};
+        } else if (inputSplit instanceof CombineFileSplit) {
+            CombineFileSplit fileSplit = (CombineFileSplit) inputSplit;
+            return fileSplit.getPaths();
} else {
throw new IllegalArgumentException("input split is neither a FileSplit nor a ProductSplit nor a CombineFileSplit");
}
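Since getInputPaths() now returns an array, existing single-input call sites move to element [0], while CombineFileSplit inputs expose every bundled file. A quick sketch of a call site (the processorAdapter variable and logger are assumed):

    Path[] inputPaths = processorAdapter.getInputPaths();
    Path primary = inputPaths[0];           // the single-input convention used elsewhere in this PR
    for (Path path : inputPaths) {          // a CombineFileSplit may carry many files
        LOG.info("input path: " + path);
    }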
@@ -434,8 +438,8 @@ private Product openInputProduct() throws IOException {
CalvalusProductIO.printProductOnStdout(product, "opened from local file");
return product;
} else {
LOG.info(String.format("openInputProduct: inputPath = %s inputFormat = %s", getInputPath(), inputFormat));
Product product = CalvalusProductIO.readProduct(getInputPath(), getConfiguration(), inputFormat);
LOG.info(String.format("openInputProduct: inputPath = %s inputFormat = %s", getInputPaths()[0], inputFormat));
Product product = CalvalusProductIO.readProduct(getInputPaths()[0], getConfiguration(), inputFormat);

if (inputSplit instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) inputSplit;
==== next file ====
@@ -57,7 +57,7 @@ public void run(Mapper.Context context) throws IOException, InterruptedException {
try {
Product product = processorAdapter.getProcessedProduct(SubProgressMonitor.create(pm, 5));
if (product != null) {
-                    final String inputFileName = processorAdapter.getInputPath().getName();
+                    final String inputFileName = processorAdapter.getInputPaths()[0].getName();
final String productName = FileUtils.getFilenameWithoutExtension(inputFileName);
final Quicklooks.QLConfig[] configs = Quicklooks.get(context.getConfiguration());
for (Quicklooks.QLConfig config : configs) {
==== next file ====
@@ -239,7 +239,7 @@ private boolean executeGraphAndCollectOutput(Graph graph, XppDom calvalusAppData

private boolean postprocessTargetProduct() throws IOException {
if (getConfiguration().getBoolean(JobConfigNames.CALVALUS_OUTPUT_SUBSETTING, false)) {
getLogger().info("output subsetting of split " + getInputPath());
getLogger().info("output subsetting of split " + getInputPaths());
targetProduct = createSubsetFromOutput(targetProduct);
}
if (targetProduct.getSceneRasterWidth() == 0 || targetProduct.getSceneRasterHeight() == 0) {
@@ -265,7 +265,7 @@ private Product getTargetProductFromGraph(Graph graph, String targetNodeId) {

private GraphContext buildGraphContext(Graph graph, Header header) throws IOException, GraphException {
List<HeaderSource> sources = header.getSources();
-        Path inputPath = getInputPath();
+        Path inputPath = getInputPaths()[0];
Path qualifiedInputPath = inputPath.getFileSystem(getConfiguration()).makeQualified(inputPath);
Operator sourceProducts = new SourceProductContainerOperator();
for (HeaderSource headerSource : sources) {
@@ -325,7 +325,7 @@ public void dispose() {
}

public Graph createGraph() throws GraphException, IOException {
-        Path inputPath = getInputPath();
+        Path inputPath = getInputPaths()[0];
CalvalusLogger.getLogger().info("Creating graph for input: " + inputPath);
Configuration conf = getConfiguration();
String processorParameters = conf.get(JobConfigNames.CALVALUS_L2_PARAMETERS);
==== next file ====
@@ -127,7 +127,7 @@ protected String getOutputProductFilename() {
return FileUtils.exchangeExtension(getInputParameters()[i + 1], ".seq");
}
}
-        String inputFilename = getInputPath().getName();
+        String inputFilename = getInputPaths()[0].getName();
return "L2_of_" + FileUtils.exchangeExtension(inputFilename, ".seq");
}

==== next file ====
@@ -20,8 +20,6 @@
import com.bc.calvalus.processing.ProcessorAdapter;
import com.bc.calvalus.processing.beam.CalvalusProductIO;
import com.bc.calvalus.processing.beam.LandsatCalvalusReaderPlugin;
-import com.bc.calvalus.processing.beam.PathConfiguration;
-import com.bc.calvalus.processing.beam.Sentinel2CalvalusReaderPlugin;
import com.bc.calvalus.processing.beam.SnapGraphAdapter;
import com.bc.calvalus.processing.l2.ProductFormatter;
import com.bc.calvalus.processing.utils.ProductTransformation;
@@ -107,7 +105,7 @@ public void prepareProcessing() throws IOException {
velocityContext.put("parameterText", processorParameters);
velocityContext.put("parameters", PropertiesHandler.asProperties(processorParameters));

-        Path inputPath = getInputPath();
+        Path inputPath = getInputPaths()[0];
Path outputPath = getOutputDirectoryPath();
velocityContext.put("inputPath", inputPath);
velocityContext.put("outputPath", outputPath);
@@ -145,11 +143,13 @@ public boolean canSkipInputProduct() throws IOException {
@Override
public boolean processSourceProduct(MODE mode, ProgressMonitor pm) throws IOException {

-        Path inputPath = getInputPath();
+        Path[] inputPaths = getInputPaths();
         File inputFile = getInputFile();
-        if (inputFile == null) {
+        for (Path inputPath : inputPaths) {
             inputFile = CalvalusProductIO.copyFileToLocal(inputPath, getConfiguration());
-            setInputFile(inputFile);
+            if (inputFile == null) {
+                setInputFile(inputFile);
+            }
         }
Collaborator comment on lines -150 to 153:
Here, it seems the logic has unintentionally changed: the test for null used to be located before the assignment from copyFileToLocal, and it can never be true now.
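A sketch of one possible correction, restoring the guard around the new copy loop (assuming copyFileToLocal returns a non-null local file):

    Path[] inputPaths = getInputPaths();
    File inputFile = getInputFile();
    if (inputFile == null) {
        for (Path inputPath : inputPaths) {
            // copy each bundled input to the local working directory,
            // remembering the most recently copied file
            inputFile = CalvalusProductIO.copyFileToLocal(inputPath, getConfiguration());
            setInputFile(inputFile);
        }
    }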

if (getMapContext().getInputSplit() instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) getMapContext().getInputSplit();
@@ -162,7 +162,7 @@ public boolean processSourceProduct(MODE mode, ProgressMonitor pm) throws IOException
productRect = new Rectangle(inputProduct.getSceneRasterWidth(), inputProduct.getSceneRasterHeight());
}

-        outputFilesNames = processInput(pm, inputRectangle, inputPath, inputFile, productRect, null);
+        outputFilesNames = processInput(pm, inputRectangle, inputPaths[0], inputFile, productRect, null);
return outputFilesNames.length > 0;
}

==== next file ====
@@ -88,7 +88,7 @@ public void run(Context context) throws IOException, InterruptedException {
if (endUTC != null) {
endTime = dateFormat.format(endUTC.getAsDate());
}
-            String dbPath = getDBPath(processorAdapter.getInputPath(), context.getConfiguration());
+            String dbPath = getDBPath(processorAdapter.getInputPaths()[0], context.getConfiguration());

String result = startTime + "\t" + endTime + "\t" + wkt;
context.write(new Text(dbPath), new Text(result));
==== next file ====
@@ -16,10 +16,6 @@

package com.bc.calvalus.processing.hadoop;

-import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.CALVALUS_SOFTWARE_PATH;
-import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.DEFAULT_CALVALUS_BUNDLE;
-import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.DEFAULT_SNAP_BUNDLE;
-
import com.bc.calvalus.commons.AbstractWorkflowItem;
import com.bc.calvalus.commons.CalvalusLogger;
import com.bc.calvalus.commons.ProcessState;
@@ -46,6 +42,10 @@
import java.util.function.Consumer;
import java.util.logging.Level;

+import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.CALVALUS_SOFTWARE_PATH;
+import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.DEFAULT_CALVALUS_BUNDLE;
+import static com.bc.calvalus.processing.hadoop.HadoopProcessingService.DEFAULT_SNAP_BUNDLE;
+
/**
* A workflow item that corresponds to a single Hadoop job.
*/
@@ -213,7 +213,7 @@ public void submit() throws WorkflowException {
CalvalusLogger.getLogger().info("Submitted Job with Id: " + jobId);
CalvalusLogger.getLogger().info("-------------------------------");
CalvalusLogger.getLogger().info("remoteUser=" + remoteUser.getShortUserName()
+ " mapreduce.job.user.name=" + job.getConfiguration().get("mapreduce.job.user.name"));
+ " mapreduce.job.user.name=" + job.getConfiguration().get("mapreduce.job.user.name"));
HashMap<String, String> calvalusConfMap = new HashMap<>();
for (Map.Entry<String, String> keyValue : job.getConfiguration()) {
if (keyValue.getKey().startsWith("calvalus")) {
@@ -230,7 +230,7 @@ public void accept(Map.Entry<String, String> keyValue) {
}
});
CalvalusLogger.getLogger().info("-------------------------------");

setJobId(jobId);
return job;
});
@@ -287,13 +287,20 @@ protected JobID submitJob(Job job) throws IOException {
protected Class<? extends InputFormat> getInputFormatClass(Configuration conf) throws IOException {
if (conf.get(JobConfigNames.CALVALUS_INPUT_TABLE) != null) {
return TableInputFormat.class;
-        } else if (conf.get(JobConfigNames.CALVALUS_INPUT_GEO_INVENTORY) != null || conf.get(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS) != null) {
+        } else if ((conf.get(JobConfigNames.CALVALUS_INPUT_GEO_INVENTORY) != null || conf.get(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS) != null) && conf.get("mapreduce.job.inputformat.class") == null) {
             return PatternBasedInputFormat.class;
+        } else if (conf.get("mapreduce.job.inputformat.class") != null) {
+            String classname = conf.get("mapreduce.job.inputformat.class");
+            try {
+                return (Class<? extends InputFormat>) Class.forName(classname);
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Unable to create class '" + classname + "'", e);
+            }
} else {
            throw new IOException(String.format("Missing job parameter for inputFormat. Neither %s nor %s nor %s has been set.",
                                                JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS,
                                                JobConfigNames.CALVALUS_INPUT_TABLE,
                                                JobConfigNames.CALVALUS_INPUT_GEO_INVENTORY));
}

}
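With this change, an explicitly configured input format wins over the pattern-based default. A minimal sketch of the corresponding job setup (the class name is the one added in this PR; everything else is plain Hadoop API):

    Configuration conf = job.getConfiguration();
    // any InputFormat on the classpath can now be selected by name
    conf.set("mapreduce.job.inputformat.class",
             "com.bc.calvalus.processing.l2.CombineFileInputFormat");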
==== next file ====
@@ -0,0 +1,83 @@
package com.bc.calvalus.processing.l2;

import com.bc.calvalus.JobClientsMap;
import com.bc.calvalus.commons.CalvalusLogger;
import com.bc.calvalus.commons.InputPathResolver;
import com.bc.calvalus.inventory.hadoop.FileSystemPathIterator;
import com.bc.calvalus.inventory.hadoop.HdfsFileSystemService;
import com.bc.calvalus.processing.JobConfigNames;
import com.bc.calvalus.processing.geodb.GeodbScanMapper;
import com.bc.calvalus.processing.hadoop.NoRecordReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
* @author thomas
*/
public class CombineFileInputFormat extends InputFormat {
Collaborator comment:
Is there a relation to org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat?
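For reference, the built-in Hadoop class of the same name also produces CombineFileSplit instances, but additionally caps split sizes and groups files by locality; a subclass only supplies the record reader. A rough sketch for contrast (the key/value types and the per-file reader wrapper are assumptions, not part of this PR):

    public class CombinedLinesInputFormat
            extends org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat<LongWritable, Text> {
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException {
            // LineRecordReaderWrapper is a hypothetical per-file reader with the
            // (CombineFileSplit, TaskAttemptContext, Integer) constructor Hadoop requires
            return new CombineFileRecordReader<>((CombineFileSplit) split, context, LineRecordReaderWrapper.class);
        }
    }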


/**
* Creates a single split from a given pattern
*/
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
Collaborator comment:
What about our other methods of determining inputs, in particular those using the geo-inventory? I know that PatternBasedInputFormat needs refactoring and decomposition, but I think the other ways of determining inputs are still required.
Thinking about how to refactor PatternBasedInputFormat: it may be good to distinguish the ways inputs are determined (geo-inventory, opensearch query, path pattern, ...) by different classes, as they have different parameters anyway; whichever parameter is specified, the client could automatically select the right class. Then we could either derive a class for CombineFileSplit generation from each of them, or make this a parameter. In any case, the old PatternBasedInputFormat could delegate the getSplits() call to the new implementations to keep backwards compatibility. A sketch of this decomposition follows.
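One way to picture that decomposition (a sketch; all names are hypothetical):

    // Each way of determining inputs becomes its own strategy; the client selects an
    // implementation based on which job parameter is present, and the old
    // PatternBasedInputFormat.getSplits() delegates to it for backwards compatibility.
    interface InputSelection {
        List<LocatedFileStatus> determineInputs(Configuration conf) throws IOException;
    }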

Configuration conf = context.getConfiguration();
String inputPathPattern = conf.get(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS);

List<InputSplit> splits = new ArrayList<>(1);
JobClientsMap jobClientsMap = new JobClientsMap(new JobConf(conf));
HdfsFileSystemService hdfsFileSystemService = new HdfsFileSystemService(jobClientsMap);
List<String> inputPatterns = new InputPathResolver().resolve(inputPathPattern);
RemoteIterator<LocatedFileStatus> fileStatusIt = getFileStatuses(hdfsFileSystemService, inputPatterns, conf, null);
addSplit(fileStatusIt, splits);
CalvalusLogger.getLogger().info(String.format("Created %d split(s).", splits.size()));
return splits;
}

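    /** Collects every matched file into a single CombineFileSplit; no size cap or locality grouping is applied. */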
private void addSplit(RemoteIterator<LocatedFileStatus> fileStatuses, List<InputSplit> splits) throws IOException {
List<Path> filePaths = new ArrayList<>();
List<Long> fileLengths = new ArrayList<>();
while (fileStatuses.hasNext()) {
LocatedFileStatus fileStatus = fileStatuses.next();
Path path = fileStatus.getPath();
filePaths.add(path);
fileLengths.add(fileStatus.getLen());
}
CombineFileSplit combineFileSplit = new CombineFileSplit(filePaths.toArray(new Path[filePaths.size()]),
fileLengths.stream().mapToLong(Long::longValue).toArray());
splits.add(combineFileSplit);
}


protected RemoteIterator<LocatedFileStatus> getFileStatuses(HdfsFileSystemService fileSystemService,
List<String> inputPatterns,
Configuration conf,
                                                                 Set<String> existingPaths) throws IOException {
        FileSystemPathIterator.FileStatusFilter extraFilter = null;
        if (existingPaths != null && !existingPaths.isEmpty()) {
            extraFilter = fileStatus -> {
                String dbPath = GeodbScanMapper.getDBPath(fileStatus.getPath(), conf);
                return !existingPaths.contains(dbPath);
};
}
return fileSystemService.globFileStatusIterator(inputPatterns, conf, extraFilter);
}

    @Override
    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
return new NoRecordReader();
}
}
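A sketch of how a production job might select this format (the path pattern is a made-up example):

    Configuration conf = new Configuration();
    conf.set(JobConfigNames.CALVALUS_INPUT_PATH_PATTERNS, "hdfs://calvalus/eodata/MER_RR__1P/r03/2008/06/.*\\.N1");
    Job job = Job.getInstance(conf, "combine-input-example");
    // all matched files end up in one CombineFileSplit, so in the mapper
    // ProcessorAdapter.getInputPaths() returns every bundled path
    job.setInputFormatClass(CombineFileInputFormat.class);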
==== next file ====
@@ -65,7 +65,7 @@ public void run(Mapper.Context context) throws IOException, InterruptedException {
pm.beginTask("Level 2 format", 100 + 20);
try {
Configuration jobConfig = context.getConfiguration();
-            Path inputPath = processorAdapter.getInputPath();
+            Path inputPath = processorAdapter.getInputPaths()[0];
String productName = getProductName(jobConfig, inputPath.getName());

String format = jobConfig.get(JobConfigNames.CALVALUS_OUTPUT_FORMAT);
==== next file ====
@@ -86,7 +86,7 @@ public void run(Context context) throws IOException, InterruptedException {
Configuration jobConfig = context.getConfiguration();
ProcessorAdapter processorAdapter = ProcessorFactory.createAdapter(context);
ProgressMonitor pm = new ProgressSplitProgressMonitor(context);
LOG.info("processing input " + processorAdapter.getInputPath() + " ...");
LOG.info("processing input " + processorAdapter.getInputPaths() + " ...");
final int progressForProcessing = processorAdapter.supportsPullProcessing() ? 5 : 95;
final int progressForSaving = processorAdapter.supportsPullProcessing() ? 95 : 5;
pm.beginTask("Level 2 processing", progressForProcessing + progressForSaving);
@@ -114,7 +114,7 @@ public void run(Context context) throws IOException, InterruptedException {

if (jobConfig.get(JobConfigNames.CALVALUS_METADATA_TEMPLATE) != null) {
processMetadata(context,
-                             processorAdapter.getInputPath().toString(),
+                             processorAdapter.getInputPaths()[0].toString(),
processorAdapter.getInputProduct(),
processorAdapter.getOutputProductPath().toString(),
processorAdapter.openProcessedProduct());
==== next file ====
@@ -182,7 +182,7 @@ public void run(Context context) throws IOException, InterruptedException {
final String outputCompression = jobConfig.get(JobConfigNames.OUTPUT_COMPRESSION);

final ProcessorAdapter processorAdapter = ProcessorFactory.createAdapter(context);
-        String inputName = processorAdapter.getInputPath().getName();
+        String inputName = processorAdapter.getInputPaths()[0].getName();
String productName = null;
if (processorAdapter.getInputParameters() != null) {
for (int i = 0; i < processorAdapter.getInputParameters().length; i += 2) {
@@ -195,7 +195,7 @@ public void run(Context context) throws IOException, InterruptedException {
if (! "MTD_MSIL1C.xml".equals(inputName)) { // TODO
productName = getProductName(jobConfig, inputName);
} else {
-                productName = getProductName(jobConfig, processorAdapter.getInputPath().getParent().getName());
+                productName = getProductName(jobConfig, processorAdapter.getInputPaths()[0].getParent().getName());
}
}
final ProductFormatter productFormatter = outputFormat != null ? new ProductFormatter(productName, outputFormat, outputCompression) : null;
@@ -204,7 +204,7 @@ public void run(Context context) throws IOException, InterruptedException {
final int progressForProcessing = processorAdapter.supportsPullProcessing() ? 5 : 95;
final int progressForSaving = processorAdapter.supportsPullProcessing() ? 95 : 5;

LOG.info("processing input " + processorAdapter.getInputPath() + " ...");
LOG.info("processing input " + processorAdapter.getInputPaths() + " ...");
pm.beginTask("Level 2 processing", progressForProcessing + progressForSaving);

try {
Expand Down Expand Up @@ -244,7 +244,7 @@ public void run(Context context) throws IOException, InterruptedException {
if (jobConfig.get(JobConfigNames.METADATA_TEMPLATE) != null) {
context.setStatus("Metadata");
processMetadata(context,
-                            processorAdapter.getInputPath().toString(),
+                            processorAdapter.getInputPaths()[0].toString(),
processorAdapter.getInputProduct(),
processorAdapter.getOutputProductPath().toString(),
targetProduct);