Skip to content

Commit

Permalink
FINERACT-2194: Batch jobs arre failing in a multitenant environment
Browse files Browse the repository at this point in the history
  • Loading branch information
galovics committed Feb 27, 2025
1 parent 8c5606e commit a7629be
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.infrastructure.jobs;

import java.util.Objects;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public class TenantAwareEqualsHashCodeAdvice implements MethodInterceptor {

private final Object target;
private final String tenantIdentifier;

public TenantAwareEqualsHashCodeAdvice(Object target) {
this.target = target;
FineractPlatformTenant tenant = ThreadLocalContextUtil.getTenant();
this.tenantIdentifier = tenant != null ? tenant.getTenantIdentifier() : null;
}

@Nullable
@Override
public Object invoke(@NotNull MethodInvocation invocation) throws Throwable {
String methodName = invocation.getMethod().getName();

if (methodName.equals("equals") && invocation.getArguments().length == 1) {
Object other = invocation.getArguments()[0];
return Objects.equals(tenantIdentifier, ((TenantAwareEqualsHashCodeAdvice) target).tenantIdentifier) && target.equals(other);
}

if (methodName.equals("hashCode") && invocation.getArguments().length == 0) {
return Objects.hash(target.hashCode(), tenantIdentifier);
}

return invocation.proceed();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.springframework.batch.core.scope.context;

import org.apache.fineract.infrastructure.jobs.TenantAwareEqualsHashCodeAdvice;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.lang.Nullable;

// Temporary solution until spring-batch fixes the concurrency issue
// https://github.com/spring-projects/spring-batch/issues/4774
// Mostly copy from spring-batch
@SuppressWarnings({ "HideUtilityClassConstructor" })
public class StepSynchronizationManager {

private static final SynchronizationManagerSupport<StepExecution, StepContext> manager = new SynchronizationManagerSupport<>() {

@Override
protected StepContext createNewContext(StepExecution execution) {
return new StepContext(execution);
}

@Override
protected void close(StepContext context) {
context.close();
}
};

@Nullable
public static StepContext getContext() {
return manager.getContext();
}

public static StepContext register(StepExecution stepExecution) {
ProxyFactory factory = new ProxyFactory(stepExecution);
factory.setProxyTargetClass(true);
factory.addAdvice(new TenantAwareEqualsHashCodeAdvice(stepExecution));
return manager.register((StepExecution) factory.getProxy());
}

public static void close() {
manager.close();
}

public static void release() {
manager.release();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private void executeLoanCOBDayByDayUntilCOBBusinessDate(LocalDate oldestCOBProce
JobParameterDTO jobParameterCatchUpDTO = new JobParameterDTO(LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
Set<JobParameterDTO> jobParameters = new HashSet<>();
Collections.addAll(jobParameters, jobParameterDTO, jobParameterCatchUpDTO);
jobStarter.run(job, scheduledJobDetail, jobParameters);
jobStarter.run(job, scheduledJobDetail, jobParameters, ThreadLocalContextUtil.getTenant().getTenantIdentifier());
executingBusinessDate = executingBusinessDate.plusDays(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private JobDetail createJobDetail(final ScheduledJobDetail scheduledJobDetail, S
jobDetailFactoryBean.setGroup(scheduledJobDetail.getGroupName());
jobDetailFactoryBean.setConcurrent(false);

jobDetailFactoryBean.setArguments(job, scheduledJobDetail, jobParameterDTOSet);
jobDetailFactoryBean.setArguments(job, scheduledJobDetail, jobParameterDTOSet, tenant.getTenantIdentifier());
jobDetailFactoryBean.afterPropertiesSet();
return jobDetailFactoryBean.getObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.fineract.infrastructure.jobs.service;

import java.time.LocalDate;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -27,11 +28,19 @@
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
import org.apache.fineract.infrastructure.businessdate.service.BusinessDateReadPlatformService;
import org.apache.fineract.infrastructure.core.domain.ActionContext;
import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.core.service.tenant.TenantDetailsService;
import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
import org.apache.fineract.infrastructure.jobs.domain.JobParameterRepository;
import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
import org.apache.fineract.infrastructure.jobs.service.jobname.JobNameService;
import org.apache.fineract.infrastructure.jobs.service.jobparameterprovider.JobParameterProvider;
import org.apache.fineract.useradministration.domain.AppUser;
import org.apache.fineract.useradministration.domain.AppUserRepositoryWrapper;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
Expand All @@ -45,6 +54,8 @@
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -57,24 +68,41 @@ public class JobStarter {
private final JobParameterRepository jobParameterRepository;
private final List<JobParameterProvider<?>> jobParameterProviders;
private final JobNameService jobNameService;
private final TenantDetailsService tenantDetailsService;
private final AppUserRepositoryWrapper userRepository;
private final BusinessDateReadPlatformService businessDateReadPlatformService;

public static final List<BatchStatus> FAILED_STATUSES = List.of(BatchStatus.FAILED, BatchStatus.ABANDONED, BatchStatus.STOPPED,
BatchStatus.STOPPING, BatchStatus.UNKNOWN);

public JobExecution run(Job job, ScheduledJobDetail scheduledJobDetail, Set<JobParameterDTO> jobParameterDTOSet)
throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException,
JobRestartException, JobExecutionException {
Map<String, JobParameter<?>> jobParameterMap = getJobParameter(scheduledJobDetail);
JobParameters jobParameters = new JobParametersBuilder(jobExplorer).getNextJobParameters(job)
.addJobParameters(new JobParameters(jobParameterMap))
.addJobParameters(new JobParameters(provideCustomJobParameters(
jobNameService.getJobByHumanReadableName(scheduledJobDetail.getJobName()).getEnumStyleName(), jobParameterDTOSet)))
.toJobParameters();
JobExecution result = jobLauncher.run(job, jobParameters);
if (FAILED_STATUSES.contains(result.getStatus())) {
throw new JobExecutionException(result.getExitStatus().toString());
public JobExecution run(Job job, ScheduledJobDetail scheduledJobDetail, Set<JobParameterDTO> jobParameterDTOSet,
String tenantIdentifier) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
JobParametersInvalidException, JobRestartException, JobExecutionException {
try {
FineractPlatformTenant tenant = tenantDetailsService.loadTenantById(tenantIdentifier);
ThreadLocalContextUtil.setTenant(tenant);
AppUser user = this.userRepository.fetchSystemUser();
UsernamePasswordAuthenticationToken auth = new UsernamePasswordAuthenticationToken(user, user.getPassword(),
user.getAuthorities());
SecurityContextHolder.getContext().setAuthentication(auth);
HashMap<BusinessDateType, LocalDate> businessDates = businessDateReadPlatformService.getBusinessDates();
ThreadLocalContextUtil.setActionContext(ActionContext.DEFAULT);
ThreadLocalContextUtil.setBusinessDates(businessDates);
Map<String, JobParameter<?>> jobParameterMap = getJobParameter(scheduledJobDetail);
JobParameters jobParameters = new JobParametersBuilder(jobExplorer).getNextJobParameters(job)
.addJobParameters(new JobParameters(jobParameterMap))
.addJobParameters(new JobParameters(provideCustomJobParameters(
jobNameService.getJobByHumanReadableName(scheduledJobDetail.getJobName()).getEnumStyleName(),
jobParameterDTOSet)))
.toJobParameters();
JobExecution result = jobLauncher.run(job, jobParameters);
if (FAILED_STATUSES.contains(result.getStatus())) {
throw new JobExecutionException(result.getExitStatus().toString());
}
return result;
} finally {
ThreadLocalContextUtil.reset();
}
return result;
}

protected Map<String, org.springframework.batch.core.JobParameter<?>> getJobParameter(ScheduledJobDetail scheduledJobDetail) {
Expand Down
Loading

0 comments on commit a7629be

Please sign in to comment.