Skip to content

Commit

Permalink
FINERACT-2194: Batch jobs arre failing in a multitenant environment
Browse files Browse the repository at this point in the history
  • Loading branch information
galovics committed Feb 27, 2025
1 parent 8c5606e commit ad0674b
Show file tree
Hide file tree
Showing 11 changed files with 325 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.infrastructure.jobs;

import java.lang.reflect.Method;
import java.util.Objects;
import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.springframework.cglib.proxy.Factory;
import org.springframework.cglib.proxy.MethodInterceptor;
import org.springframework.cglib.proxy.MethodProxy;

public class TenantAwareEqualsHashCodeAdvice implements MethodInterceptor {

private final Object target;
private final String tenantIdentifier;

public TenantAwareEqualsHashCodeAdvice(Object target) {
this.target = target;
FineractPlatformTenant tenant = ThreadLocalContextUtil.getTenant();
this.tenantIdentifier = tenant != null ? tenant.getTenantIdentifier() : null;
}

@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
String methodName = method.getName();

if ("equals".equals(methodName) && args.length == 1) {
Object other = args[0];

if (other instanceof Factory) {

TenantAwareEqualsHashCodeAdvice otherProxy = (TenantAwareEqualsHashCodeAdvice) ((Factory) other).getCallback(0);
return Objects.equals(target, otherProxy.target) && Objects.equals(tenantIdentifier, otherProxy.tenantIdentifier);
}
return false;
}

if ("hashCode".equals(methodName) && args.length == 0) {
return Objects.hash(target.hashCode(), tenantIdentifier);
}

return proxy.invoke(target, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.springframework.batch.core.scope.context;

import org.apache.fineract.infrastructure.jobs.TenantAwareEqualsHashCodeAdvice;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.lang.Nullable;

// Temporary solution until spring-batch fixes the concurrency issue
// https://github.com/spring-projects/spring-batch/issues/4774
// Mostly copy from spring-batch
@SuppressWarnings({ "HideUtilityClassConstructor" })
public class JobSynchronizationManager {

private static final SynchronizationManagerSupport<JobExecution, JobContext> manager = new SynchronizationManagerSupport<>() {

@Override
protected JobContext createNewContext(JobExecution execution) {
return new JobContext(execution);
}

@Override
protected void close(JobContext context) {
context.close();
}
};

@Nullable
public static JobContext getContext() {
return manager.getContext();
}

public static JobContext register(JobExecution jobExecution) {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(JobExecution.class);
enhancer.setCallback(new TenantAwareEqualsHashCodeAdvice(jobExecution));
return manager.register((JobExecution) enhancer.create(new Class[] { JobInstance.class, Long.class, JobParameters.class },
new Object[] { jobExecution.getJobInstance(), jobExecution.getId(), jobExecution.getJobParameters() }));
}

public static void close() {
manager.close();
}

public static void release() {
manager.release();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.springframework.batch.core.scope.context;

import org.apache.fineract.infrastructure.jobs.TenantAwareEqualsHashCodeAdvice;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.lang.Nullable;

// Temporary solution until spring-batch fixes the concurrency issue
// https://github.com/spring-projects/spring-batch/issues/4774
// Mostly copy from spring-batch
@SuppressWarnings({ "HideUtilityClassConstructor" })
public class StepSynchronizationManager {

private static final SynchronizationManagerSupport<StepExecution, StepContext> manager = new SynchronizationManagerSupport<>() {

@Override
protected StepContext createNewContext(StepExecution execution) {
return new StepContext(execution);
}

@Override
protected void close(StepContext context) {
context.close();
}
};

@Nullable
public static StepContext getContext() {
return manager.getContext();
}

public static StepContext register(StepExecution stepExecution) {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(StepExecution.class);
enhancer.setCallback(new TenantAwareEqualsHashCodeAdvice(stepExecution));
return manager.register((StepExecution) enhancer.create(new Class[] { String.class, JobExecution.class },
new Object[] { stepExecution.getStepName(), stepExecution.getJobExecution() }));
}

public static void close() {
manager.close();
}

public static void release() {
manager.release();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void executeLoanCOBCatchUpAsync(FineractContext context) {
? loanIdAndLastClosedBusinessDate.get(0).getLastClosedBusinessDate()
: cobBusinessDate;
if (DateUtils.isBefore(oldestCOBProcessedDate, cobBusinessDate)) {
executeLoanCOBDayByDayUntilCOBBusinessDate(oldestCOBProcessedDate, cobBusinessDate);
executeLoanCOBDayByDayUntilCOBBusinessDate(oldestCOBProcessedDate, cobBusinessDate, context);
}
} catch (NoSuchJobException e) {
// Throwing an error here is useless as it will be swallowed hence it is async method
Expand All @@ -94,19 +94,21 @@ public void executeLoanCOBCatchUpAsync(FineractContext context) {
}
}

private void executeLoanCOBDayByDayUntilCOBBusinessDate(LocalDate oldestCOBProcessedDate, LocalDate cobBusinessDate)
throws NoSuchJobException, JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
private void executeLoanCOBDayByDayUntilCOBBusinessDate(LocalDate oldestCOBProcessedDate, LocalDate cobBusinessDate,
FineractContext context) throws NoSuchJobException, JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
JobParametersInvalidException, JobRestartException, JobExecutionException {
Job job = jobLocator.getJob(LoanCOBConstant.JOB_NAME);
ScheduledJobDetail scheduledJobDetail = scheduledJobDetailRepository.findByJobName(LoanCOBConstant.JOB_HUMAN_READABLE_NAME);
LocalDate executingBusinessDate = oldestCOBProcessedDate.plusDays(1);
while (!DateUtils.isAfter(executingBusinessDate, cobBusinessDate)) {
// Need to reinitialize the thread-local tenant info because after running the job, it resets the thread
ThreadLocalContextUtil.init(context);
JobParameterDTO jobParameterDTO = new JobParameterDTO(LoanCOBConstant.BUSINESS_DATE_PARAMETER_NAME,
executingBusinessDate.format(DateTimeFormatter.ISO_DATE));
JobParameterDTO jobParameterCatchUpDTO = new JobParameterDTO(LoanCOBConstant.IS_CATCH_UP_PARAMETER_NAME, "true");
Set<JobParameterDTO> jobParameters = new HashSet<>();
Collections.addAll(jobParameters, jobParameterDTO, jobParameterCatchUpDTO);
jobStarter.run(job, scheduledJobDetail, jobParameters);
jobStarter.run(job, scheduledJobDetail, jobParameters, ThreadLocalContextUtil.getTenant().getTenantIdentifier());
executingBusinessDate = executingBusinessDate.plusDays(1);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandle
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {

log.error("Exception message - {}", throwable.getMessage());
log.error("Exception", throwable);
log.error("Method name - {}", method.getName());
for (Object param : obj) {
log.error("Parameter value - {}", param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private JobDetail createJobDetail(final ScheduledJobDetail scheduledJobDetail, S
jobDetailFactoryBean.setGroup(scheduledJobDetail.getGroupName());
jobDetailFactoryBean.setConcurrent(false);

jobDetailFactoryBean.setArguments(job, scheduledJobDetail, jobParameterDTOSet);
jobDetailFactoryBean.setArguments(job, scheduledJobDetail, jobParameterDTOSet, tenant.getTenantIdentifier());
jobDetailFactoryBean.afterPropertiesSet();
return jobDetailFactoryBean.getObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.fineract.infrastructure.jobs.service;

import java.time.LocalDate;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -27,11 +28,19 @@
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.fineract.infrastructure.businessdate.domain.BusinessDateType;
import org.apache.fineract.infrastructure.businessdate.service.BusinessDateReadPlatformService;
import org.apache.fineract.infrastructure.core.domain.ActionContext;
import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.core.service.tenant.TenantDetailsService;
import org.apache.fineract.infrastructure.jobs.data.JobParameterDTO;
import org.apache.fineract.infrastructure.jobs.domain.JobParameterRepository;
import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
import org.apache.fineract.infrastructure.jobs.service.jobname.JobNameService;
import org.apache.fineract.infrastructure.jobs.service.jobparameterprovider.JobParameterProvider;
import org.apache.fineract.useradministration.domain.AppUser;
import org.apache.fineract.useradministration.domain.AppUserRepositoryWrapper;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
Expand All @@ -45,6 +54,8 @@
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -57,24 +68,41 @@ public class JobStarter {
private final JobParameterRepository jobParameterRepository;
private final List<JobParameterProvider<?>> jobParameterProviders;
private final JobNameService jobNameService;
private final TenantDetailsService tenantDetailsService;
private final AppUserRepositoryWrapper userRepository;
private final BusinessDateReadPlatformService businessDateReadPlatformService;

public static final List<BatchStatus> FAILED_STATUSES = List.of(BatchStatus.FAILED, BatchStatus.ABANDONED, BatchStatus.STOPPED,
BatchStatus.STOPPING, BatchStatus.UNKNOWN);

public JobExecution run(Job job, ScheduledJobDetail scheduledJobDetail, Set<JobParameterDTO> jobParameterDTOSet)
throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException,
JobRestartException, JobExecutionException {
Map<String, JobParameter<?>> jobParameterMap = getJobParameter(scheduledJobDetail);
JobParameters jobParameters = new JobParametersBuilder(jobExplorer).getNextJobParameters(job)
.addJobParameters(new JobParameters(jobParameterMap))
.addJobParameters(new JobParameters(provideCustomJobParameters(
jobNameService.getJobByHumanReadableName(scheduledJobDetail.getJobName()).getEnumStyleName(), jobParameterDTOSet)))
.toJobParameters();
JobExecution result = jobLauncher.run(job, jobParameters);
if (FAILED_STATUSES.contains(result.getStatus())) {
throw new JobExecutionException(result.getExitStatus().toString());
public JobExecution run(Job job, ScheduledJobDetail scheduledJobDetail, Set<JobParameterDTO> jobParameterDTOSet,
String tenantIdentifier) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException,
JobParametersInvalidException, JobRestartException, JobExecutionException {
try {
FineractPlatformTenant tenant = tenantDetailsService.loadTenantById(tenantIdentifier);
ThreadLocalContextUtil.setTenant(tenant);
AppUser user = this.userRepository.fetchSystemUser();
UsernamePasswordAuthenticationToken auth = new UsernamePasswordAuthenticationToken(user, user.getPassword(),
user.getAuthorities());
SecurityContextHolder.getContext().setAuthentication(auth);
HashMap<BusinessDateType, LocalDate> businessDates = businessDateReadPlatformService.getBusinessDates();
ThreadLocalContextUtil.setActionContext(ActionContext.DEFAULT);
ThreadLocalContextUtil.setBusinessDates(businessDates);
Map<String, JobParameter<?>> jobParameterMap = getJobParameter(scheduledJobDetail);
JobParameters jobParameters = new JobParametersBuilder(jobExplorer).getNextJobParameters(job)
.addJobParameters(new JobParameters(jobParameterMap))
.addJobParameters(new JobParameters(provideCustomJobParameters(
jobNameService.getJobByHumanReadableName(scheduledJobDetail.getJobName()).getEnumStyleName(),
jobParameterDTOSet)))
.toJobParameters();
JobExecution result = jobLauncher.run(job, jobParameters);
if (FAILED_STATUSES.contains(result.getStatus())) {
throw new JobExecutionException(result.getExitStatus().toString());
}
return result;
} finally {
ThreadLocalContextUtil.reset();
}
return result;
}

protected Map<String, org.springframework.batch.core.JobParameter<?>> getJobParameter(ScheduledJobDetail scheduledJobDetail) {
Expand Down
Loading

0 comments on commit ad0674b

Please sign in to comment.