Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch: ✨ Batch Job For Write Back & Write Allocate Strategy (This is Last Piece πŸ€—) #194

Merged
merged 22 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
63b4b39
feat: add key_value record in batch module
psychology50 Nov 7, 2024
264f41a
feat: impl last_message_id reader
psychology50 Nov 7, 2024
47a6f9c
feat: impl last_message_id processor
psychology50 Nov 7, 2024
fd3b124
feat: impl last_message_id writer
psychology50 Nov 7, 2024
3d28975
fix: convert prefix_pattern from member various to static
psychology50 Nov 7, 2024
e387351
feat: last_message_id_job_config setting
psychology50 Nov 7, 2024
136532a
chore: batch module test log setting
psychology50 Nov 7, 2024
2d04359
refactor: remove state from the reader
psychology50 Nov 7, 2024
0bae8d9
test: last_message_id job batch test
psychology50 Nov 7, 2024
803668a
chore: add testcontainer dependency within batch module
psychology50 Nov 7, 2024
1ea65aa
chore: batch integration test setting
psychology50 Nov 7, 2024
c4fa229
chore: add sql script in batch test package due to init job instance …
psychology50 Nov 7, 2024
4e3dc2e
fix: sperate cursor bean from job_config to batch_redis_config
psychology50 Nov 7, 2024
54453a8
chore: insert init sql into batch mysql testcontainer config
psychology50 Nov 7, 2024
6e4f73c
fix: add @spring_batch_test in @batch_integration_test
psychology50 Nov 7, 2024
731b191
rename: add logging in reader, processor, writer
psychology50 Nov 7, 2024
d7b3362
chore: convert reader component to bean in job config
psychology50 Nov 7, 2024
fe7017f
fix: add try-catch about number_format_exception in processor
psychology50 Nov 7, 2024
53f6355
feat: chat_message_statue entity overrid to_string
psychology50 Nov 7, 2024
fb658ac
test: last_message_id integration test
psychology50 Nov 7, 2024
366a1d7
test: add integartion test case for large data
psychology50 Nov 7, 2024
3a38547
chore: last_message_id job scheuling
psychology50 Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pennyway-batch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,12 @@ dependencies {

implementation 'org.springframework.boot:spring-boot-starter-batch:3.3.0'
testImplementation('org.springframework.batch:spring-batch-test:5.1.2')

/* testcontainer */
testImplementation "org.junit.jupiter:junit-jupiter:5.8.1"
testImplementation "org.testcontainers:testcontainers:1.19.7"
testImplementation "org.testcontainers:junit-jupiter:1.19.7"
testImplementation "org.testcontainers:mysql:1.19.7"
testImplementation "com.redis.testcontainers:testcontainers-redis-junit:1.6.4"
testImplementation "org.springframework.cloud:spring-cloud-contract-wiremock:4.1.2"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package kr.co.pennyway.batch.common.dto;

public record KeyValue(
String key,
String value
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package kr.co.pennyway.batch.job;

import kr.co.pennyway.batch.common.dto.KeyValue;
import kr.co.pennyway.batch.processor.LastMessageIdProcessor;
import kr.co.pennyway.batch.reader.LastMessageIdReader;
import kr.co.pennyway.batch.writer.LastMessageIdWriter;
import kr.co.pennyway.domain.domains.chatstatus.domain.ChatMessageStatus;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@RequiredArgsConstructor
public class LastMessageIdJobConfig {
private static final int CHUNK_SIZE = 1000;
private static final String PREFIX_PATTERN = "chat:last_read:*";
private final JobRepository jobRepository;
private final LastMessageIdProcessor processor;
private final LastMessageIdWriter writer;
private final RedisTemplate<String, String> redisTemplate;

@Bean
public Job lastMessageIdJob(PlatformTransactionManager transactionManager) {
return new JobBuilder("lastMessageIdJob", jobRepository)
.start(lastMessageIdStep(transactionManager))
.on("FAILED")
.stopAndRestart(lastMessageIdStep(transactionManager))
.on("*")
.end()
.end()
.build();
}

@Bean
@JobScope
public Step lastMessageIdStep(PlatformTransactionManager transactionManager) {
return new StepBuilder("lastMessageIdStep", jobRepository)
.<KeyValue, ChatMessageStatus>chunk(CHUNK_SIZE, transactionManager)
.reader(lastMessageIdReader())
.processor(processor)
.writer(writer)
.build();
}

@Bean
@StepScope
public LastMessageIdReader lastMessageIdReader() {
ScanOptions options = ScanOptions.scanOptions().match(PREFIX_PATTERN).count(CHUNK_SIZE).build();
Cursor<String> cursor = redisTemplate.scan(options);
return new LastMessageIdReader(redisTemplate, cursor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package kr.co.pennyway.batch.processor;

import kr.co.pennyway.batch.common.dto.KeyValue;
import kr.co.pennyway.domain.domains.chatstatus.domain.ChatMessageStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class LastMessageIdProcessor implements ItemProcessor<KeyValue, ChatMessageStatus> {

@Override
public ChatMessageStatus process(KeyValue item) throws Exception {
log.debug("Processing item - key: {}, value: {}", item.key(), item.value());

String[] parts = item.key().split(":");

if (parts.length != 4) {
log.error("Invalid key format: {}", item.key());
return null;
}

try {
Long roomId = Long.parseLong(parts[2]);
Long userId = Long.parseLong(parts[3]);
Long messageId = Long.parseLong(item.value());
log.debug("Parsed roomId: {}, userId: {}, messageId: {}", roomId, userId, messageId);

return new ChatMessageStatus(userId, roomId, messageId);
} catch (NoSuchFieldError | NumberFormatException e) {
log.error("Failed to parse key: {}", item.key(), e);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package kr.co.pennyway.batch.reader;

import kr.co.pennyway.batch.common.dto.KeyValue;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisTemplate;

@Slf4j
@RequiredArgsConstructor
public class LastMessageIdReader implements ItemReader<KeyValue> {
private final RedisTemplate<String, String> redisTemplate;
private final Cursor<String> cursor;

@Override
public KeyValue read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (!cursor.hasNext()) {
log.debug("No more keys to read cursor: {}", cursor);
return null;
}

String key = cursor.next();
String value = redisTemplate.opsForValue().get(key);
log.debug("Read key: {}, value: {}", key, value);

if (value == null) {
log.warn("Value not found for key: {}", key);
return null;
}

return new KeyValue(key, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@RequiredArgsConstructor
public class SpendingNotifyScheduler {
private final JobLauncher jobLauncher;
private final Job dailyNotificationJob;
private final Job monthlyNotificationJob;
private final Job lastMessageIdJob;

@Scheduled(cron = "0 0 20 * * ?")
public void runDailyNotificationJob() {
Expand Down Expand Up @@ -48,4 +51,18 @@ public void runMonthlyNotificationJob() {
log.error("Failed to run monthlyNotificationJob", e);
}
}

@Scheduled(fixedRate = 30, timeUnit = TimeUnit.MINUTES)
public void runLastMessageIdJob() {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();

try {
jobLauncher.run(lastMessageIdJob, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
log.error("Failed to run lastMessageIdJob", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package kr.co.pennyway.batch.writer;

import kr.co.pennyway.domain.domains.chatstatus.domain.ChatMessageStatus;
import kr.co.pennyway.domain.domains.chatstatus.repository.ChatMessageStatusRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.stream.Collectors;

@Slf4j
@Component
@RequiredArgsConstructor
public class LastMessageIdWriter implements ItemWriter<ChatMessageStatus> {
private final ChatMessageStatusRepository repository;

@Override
public void write(Chunk<? extends ChatMessageStatus> chunk) throws Exception {
log.debug("Writing chunk size: {}", chunk.getItems().size());

Map<Long, Map<Long, Long>> updates = chunk.getItems().stream()
.collect(
Collectors.groupingBy(
ChatMessageStatus::getUserId,
Collectors.toMap(
ChatMessageStatus::getChatRoomId,
ChatMessageStatus::getLastReadMessageId,
Long::max
)
)
);
log.debug("Grouped updates: {}", updates);

updates.forEach((userId, roomUpdates) ->
roomUpdates.forEach((roomId, messageId) -> {
log.debug("Saving - userId: {}, roomId: {}, messageId: {}", userId, roomId, messageId);
repository.saveLastReadMessageIdInBulk(userId, roomId, messageId);
})
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package kr.co.pennyway.batch.config;

import com.redis.testcontainers.RedisContainer;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@ActiveProfiles("test")
public abstract class BatchDBTestConfig {
private static final String REDIS_CONTAINER_IMAGE = "redis:7.4";
private static final String MYSQL_CONTAINER_IMAGE = "mysql:8.0.26";

private static final RedisContainer REDIS_CONTAINER;
private static final MySQLContainer<?> MYSQL_CONTAINER;

static {
REDIS_CONTAINER =
new RedisContainer(DockerImageName.parse(REDIS_CONTAINER_IMAGE))
.withExposedPorts(6379)
.withCommand("redis-server", "--requirepass testpass")
.withReuse(true);
MYSQL_CONTAINER =
new MySQLContainer<>(DockerImageName.parse(MYSQL_CONTAINER_IMAGE))
.withDatabaseName("pennyway")
.withUsername("root")
.withPassword("testpass")
.withCommand("--sql_mode=STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION")
.withInitScript("sql/schema-mysql.sql")
.withReuse(true);

REDIS_CONTAINER.start();
MYSQL_CONTAINER.start();
}

@DynamicPropertySource
public static void setRedisProperties(DynamicPropertyRegistry registry) {
registry.add("spring.data.redis.host", REDIS_CONTAINER::getHost);
registry.add("spring.data.redis.port", () -> String.valueOf(REDIS_CONTAINER.getMappedPort(6379)));
registry.add("spring.data.redis.password", () -> "testpass");
registry.add("spring.datasource.url", () -> String.format("jdbc:mysql://%s:%s/pennyway?serverTimezone=Asia/Seoul&characterEncoding=utf8&postfileSQL=true&logger=Slf4JLogger&rewriteBatchedStatements=true", MYSQL_CONTAINER.getHost(), MYSQL_CONTAINER.getMappedPort(3306)));
registry.add("spring.datasource.username", () -> "root");
registry.add("spring.datasource.password", () -> "testpass");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kr.co.pennyway.batch.config;

import org.springframework.lang.NonNull;
import org.springframework.test.context.ActiveProfilesResolver;

public class BatchIntegrationProfileResolver implements ActiveProfilesResolver {
@Override
@NonNull
public String[] resolve(@NonNull Class<?> testClass) {
return new String[]{"common", "infra", "domain"};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kr.co.pennyway.batch.config;

import org.springframework.batch.test.context.SpringBatchTest;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import java.lang.annotation.*;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@SpringBatchTest
@SpringBootTest(classes = BatchIntegrationTestConfig.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ActiveProfiles(profiles = {"test"}, resolver = BatchIntegrationProfileResolver.class)
@Documented
public @interface BatchIntegrationTest {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package kr.co.pennyway.batch.config;

import kr.co.pennyway.PennywayBatchApplication;
import kr.co.pennyway.common.PennywayCommonApplication;
import kr.co.pennyway.domain.DomainPackageLocation;
import kr.co.pennyway.infra.PennywayInfraApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan(
basePackageClasses = {
PennywayBatchApplication.class,
PennywayInfraApplication.class,
DomainPackageLocation.class,
PennywayCommonApplication.class
}
)
public class BatchIntegrationTestConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package kr.co.pennyway.batch.config;

import com.querydsl.jpa.impl.JPAQueryFactory;
import com.querydsl.sql.MySQLTemplates;
import com.querydsl.sql.SQLTemplates;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;

@TestConfiguration
public class TestJpaConfig {
@PersistenceContext
private EntityManager em;

@Bean
@ConditionalOnMissingBean
public JPAQueryFactory testJpaQueryFactory() {
return new JPAQueryFactory(em);
}

@Bean
@ConditionalOnMissingBean
public SQLTemplates testSqlTemplates() {
return new MySQLTemplates();
}

@Bean
@ConditionalOnMissingBean
public RedisTemplate<String, ?> testRedisTemplate() {
return null;
}
}
Loading
Loading