
Spring Batch issue with MultiResourceItemWriter and ClassifierCompositeItemWriter: invalid record count written to the output file #4660

Open
javaHelper opened this issue Sep 10, 2024 · 7 comments
Labels: has: minimal-example, in: infrastructure, type: bug

Comments

javaHelper commented Sep 10, 2024

I am using Spring Boot with Spring Batch v2.7.1 in my project, and there appears to be a bug when reading with a FlatFileItemReader and writing through a ClassifierCompositeItemWriter whose delegates are MultiResourceItemWriters: the itemCountLimitPerResource value is not honoured and produces wrong counts.

I am reading a CSV file and splitting it into multiple files with at most 5 records per file, but the code I developed writes 7 records into one file.

Code Uploaded here: https://github.com/javaHelper/bug-4660/tree/main/bug-4660

package com.example;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@EnableBatchProcessing
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class MultiResourceSplitApplication {

    public static void main(String[] args) {
        SpringApplication.run(MultiResourceSplitApplication.class, args);
    }
}
package com.example;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemWriterBuilder;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.builder.ClassifierCompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.classify.Classifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class MyJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public FlatFileItemReader<Employee> itemReader() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("empId", "firstName", "lastName", "role");

        DefaultLineMapper<Employee> employeeLineMapper = new DefaultLineMapper<>();
        employeeLineMapper.setLineTokenizer(tokenizer);
        employeeLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
        employeeLineMapper.afterPropertiesSet();

        return new FlatFileItemReaderBuilder<Employee>()
                .name("flatFileReader")
                .linesToSkip(1)
                .resource(new ClassPathResource("employee.csv"))
                .lineMapper(employeeLineMapper)
                .build();
    }

    @Bean
    public ClassifierCompositeItemWriter<Employee> classifierCompositeItemWriter() throws Exception {
        Classifier<Employee, ItemWriter<? super Employee>> classifier = new EmployeeClassifier(
                javaDeveloperItemWriter(), 
                pythonDeveloperItemWriter(), 
                cloudDeveloperItemWriter());
        
        return new ClassifierCompositeItemWriterBuilder<Employee>()
                .classifier(classifier)
                .build();
    }

    @Bean
    public ItemWriter<Employee> javaDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw1")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("javaDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("javaDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public ItemWriter<Employee> pythonDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw2")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("pythonDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("pythonDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public ItemWriter<Employee> cloudDeveloperItemWriter() {
        FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriterBuilder<Employee>()
                .lineAggregator(new PassThroughLineAggregator<>())
                .name("iw3")
                .build();

        return new MultiResourceItemWriterBuilder<Employee>()
                .name("cloudDeveloperItemWriter")
                .delegate(itemWriter)
                .resource(new FileSystemResource("cloudDeveloper-employee.csv"))
                .itemCountLimitPerResource(5)
                .resourceSuffixCreator(index -> "-" + index)
                .build();
    }

    @Bean
    public Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<Employee, Employee>chunk(3)
                .reader(itemReader())
                .writer(classifierCompositeItemWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step())
                .build();
    }
}
package com.example;

import org.springframework.batch.item.ItemWriter;
import org.springframework.classify.Classifier;

import lombok.Setter;


@Setter
public class EmployeeClassifier implements Classifier<Employee, ItemWriter<? super Employee>> {
    private static final long serialVersionUID = 1L;
    private ItemWriter<Employee> javaDeveloperFileItemWriter;
    private ItemWriter<Employee> pythonDeveloperFileItemWriter;
    private ItemWriter<Employee> cloudDeveloperFileItemWriter;
    
    public EmployeeClassifier() {
        
    }

    public EmployeeClassifier(ItemWriter<Employee> javaDeveloperFileItemWriter,
                              ItemWriter<Employee> pythonDeveloperFileItemWriter,
                              ItemWriter<Employee> cloudDeveloperFileItemWriter) {
        this.javaDeveloperFileItemWriter = javaDeveloperFileItemWriter;
        this.pythonDeveloperFileItemWriter = pythonDeveloperFileItemWriter;
        this.cloudDeveloperFileItemWriter = cloudDeveloperFileItemWriter;
    }

    @Override
    public ItemWriter<? super Employee> classify(Employee employee) {
        if(employee.getRole().equals("Java Developer")){
            return javaDeveloperFileItemWriter;
        }
        else if(employee.getRole().equals("Python Developer")){
            return pythonDeveloperFileItemWriter;
        }
        return cloudDeveloperFileItemWriter;
    }
}
package com.example;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Employee {
    private String empId;
    private String firstName;
    private String lastName;
    private String role;

    @Override
    public String toString() {
        return empId + "," + firstName + "," + lastName + "," + role;
    }
}
package com.example;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class EmployeeFieldSetMapper implements FieldSetMapper<Employee> {
    @Override
    public Employee mapFieldSet(FieldSet fieldSet) throws BindException {
        return Employee.builder()
                .empId(fieldSet.readRawString("empId"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .role(fieldSet.readRawString("role"))
                .build();
    }
}

employee.csv

empId,firstName,lastName,role
1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer
10,Ravi,Doe,Python Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer
16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer

Output: javaDeveloper-employee.csv-1

1,Mike ,Doe,Java Developer
2,Matt ,Doe,Java Developer
11,Gagan,Doe,Java Developer
12,Ashish ,Doe,Java Developer
13,Rajesh,Doe,Java Developer
14,Anosh ,Doe,Java Developer
15,Arpit ,Doe,Java Developer

javaDeveloper-employee.csv-2

16,Sneha ,Doe,Java Developer
17,Sneha ,Doe,Java Developer

pythonDeveloper-employee.csv-1

3,Deepak ,Doe,Python Developer
4,Neha ,Doe,Python Developer
5,Harish,Doe,Python Developer
6,Parag ,Doe,Python Developer
7,Harshita ,Doe,Python Developer
8,Pranali ,Doe,Python Developer
9,Raj ,Doe,Python Developer

pythonDeveloper-employee.csv-2

10,Ravi,Doe,Python Developer

When I used chunk(0), the counts in the files come out correct, but with huge data volumes the performance is very, very slow. Any suggestions?
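
For reference, a minimal sketch of that workaround, assuming the intent of chunk(0) is a commit interval of 1 (the bean name is hypothetical; the rest matches the configuration above):

@Bean
public Step singleItemChunkStep() throws Exception {
    return stepBuilderFactory.get("step")
            // Hypothetical variant of step(): with a commit interval of 1,
            // MultiResourceItemWriter's once-per-chunk limit check runs
            // after every single item, so itemCountLimitPerResource is
            // honoured exactly.
            .<Employee, Employee>chunk(1)
            .reader(itemReader())
            .writer(classifierCompositeItemWriter())
            .build();
}

The trade-off is one transaction commit per item, which is where the slowdown on large volumes comes from.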

javaHelper (Author) commented:

@fmbenhassine - Were you able to reproduce this issue?

fmbenhassine (Contributor) commented:

No, I have not started working on this yet. I would love to help if you can package all the code snippets with a pom.xml in a zip or a repository that I can clone and run. There is a project template that you can use as a starting point here: https://github.com/spring-projects/spring-batch/blob/main/ISSUE_REPORTING.md

javaHelper (Author) commented:

@fmbenhassine - I've uploaded code here: https://github.com/javaHelper/bug-4660/tree/main/bug-4660. Could you please have a look?

fmbenhassine (Contributor) commented:

Great! Thank you for the sample. I will take a look and get back to you asap.

fmbenhassine (Contributor) commented:

Thank you for providing a minimal example, well done! I am able to reproduce the issue and I think this is a bug. In fact, the itemCountLimitPerResource is not respected in this case.

Another interesting finding: when I set the chunk size to 10, everything is written to the same file (i.e. the itemCountLimitPerResource is not respected either).

I will plan the fix for a future patch release, because as of now we are working on the upcoming 5.2. If someone manages to fix the issue, a PR is welcome.

hpoettker (Contributor) commented:

I think the problem isn't related to the ClassifierCompositeItemWriter but is a duplicate of #1722.

The effect of itemCountLimitPerResource is indeed surprising, so it might be a good idea to change it. But the current behaviour is documented in the JavaDoc of MultiResourceItemWriter: https://github.com/spring-projects/spring-batch/blob/main/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/file/MultiResourceItemWriter.java#L37
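
For context, here is a simplified sketch of the rollover logic in MultiResourceItemWriter.write, abridged from the linked source (open/close housekeeping omitted and field names shortened for illustration). The limit is checked only once, after a whole incoming chunk has been written to the current file:

// Abridged sketch of MultiResourceItemWriter.write(...), not the complete implementation.
public void write(List<? extends T> items) throws Exception {
    delegate.write(items);                     // the entire chunk goes to the current file
    currentResourceItemCount += items.size();  // the count is updated once per chunk
    if (currentResourceItemCount >= itemCountLimitPerResource) {
        delegate.close();                      // roll over only at the chunk boundary
        currentResourceIndex++;                // the next write(...) opens the next suffixed file
        currentResourceItemCount = 0;
    }
}

Under this logic a file can receive up to itemCountLimitPerResource + chunkSize - 1 items. In the report above, the Java writer already had 4 items in its current file when an all-Java chunk of 3 arrived, yielding the observed 7 records (5 + 3 - 1).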

javaHelper (Author) commented:

@fmbenhassine - Thanks. I agree it's a bug and it's reproducible. When I tested in my project with a huge volume, setting itemCountLimitPerResource to half a million records, I was surprised to see different outputs.
I hope this will be fixed soon.
