Skip to content

Commit

Permalink
Merge pull request #2319 from jsonwan/github_perf/archive
Browse files Browse the repository at this point in the history
perf: 容器化环境支持配置单独的归档库 #2317
  • Loading branch information
wangyu096 authored Aug 8, 2023
2 parents 23481bc + c774fbd commit 5ff9d21
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.tencent.bk.job.backup.archive.JobExecuteArchiveManage;
import com.tencent.bk.job.backup.dao.ExecuteArchiveDAO;
import com.tencent.bk.job.backup.dao.impl.ExecuteArchiveDAOImpl;
import com.tencent.bk.job.backup.dao.impl.FileSourceTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.FileSourceTaskLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseFileAgentTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseScriptAgentTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskIpLogRecordDAO;
Expand Down Expand Up @@ -142,12 +142,12 @@ public GseTaskIpLogRecordDAO gseTaskIpLogRecordDAO(
return new GseTaskIpLogRecordDAO(context, archiveConfig);
}

@Bean(name = "fileSourceTaskRecordDAO")
public FileSourceTaskRecordDAO fileSourceTaskRecordDAO(
@Bean(name = "fileSourceTaskLogRecordDAO")
public FileSourceTaskLogRecordDAO fileSourceTaskLogRecordDAO(
@Qualifier("job-execute-dsl-context") DSLContext context,
ArchiveConfig archiveConfig) {
log.info("Init FileSourceTaskRecordDAO");
return new FileSourceTaskRecordDAO(context, archiveConfig);
log.info("Init FileSourceTaskLogRecordDAO");
return new FileSourceTaskLogRecordDAO(context, archiveConfig);
}

@Bean(name = "gseTaskRecordDAO")
Expand Down Expand Up @@ -218,6 +218,7 @@ public JobExecuteArchiveManage jobExecuteArchiveManage(
@Autowired(required = false) StepInstanceVariableRecordDAO stepInstanceVariableRecordDAO,
@Autowired(required = false) TaskInstanceVariableRecordDAO taskInstanceVariableRecordDAO,
@Autowired(required = false) OperationLogRecordDAO operationLogRecordDAO,
@Autowired(required = false) FileSourceTaskLogRecordDAO fileSourceTaskLogRecordDAO,
@Autowired(required = false) GseTaskLogRecordDAO gseTaskLogRecordDAO,
@Autowired(required = false) GseTaskIpLogRecordDAO gseTaskIpLogRecordDAO,
@Autowired(required = false) GseTaskRecordDAO gseTaskRecordDAO,
Expand All @@ -241,6 +242,7 @@ public JobExecuteArchiveManage jobExecuteArchiveManage(
stepInstanceVariableRecordDAO,
taskInstanceVariableRecordDAO,
operationLogRecordDAO,
fileSourceTaskLogRecordDAO,
gseTaskLogRecordDAO,
gseTaskIpLogRecordDAO,
gseTaskRecordDAO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
Expand Down Expand Up @@ -79,13 +80,15 @@ public ConnectionProvider executeConnectionProvider(@Qualifier("job-execute-sour
@Qualifier("job-execute-archive-source")
@Bean(name = "job-execute-archive-source")
@ConfigurationProperties(prefix = "spring.datasource.job-execute-archive")
@ConditionalOnExpression("${job.execute.archive.enabled:false}")
@ConditionalOnProperty("spring.datasource.job-execute-archive.jdbc-url")
public DataSource executeArchiveDataSource() {
return DataSourceBuilder.create().build();
}

@Qualifier("job-execute-archive-dsl-context")
@Bean(name = "job-execute-archive-dsl-context")
@ConditionalOnExpression("${job.execute.archive.enabled:false}")
@ConditionalOnProperty("spring.datasource.job-execute-archive.jdbc-url")
public DSLContext executeArchiveDslContext(
@Qualifier("job-execute-archive-jooq-conf") org.jooq.Configuration configuration) {
Expand All @@ -94,6 +97,7 @@ public DSLContext executeArchiveDslContext(

@Qualifier("job-execute-archive-jooq-conf")
@Bean(name = "job-execute-archive-jooq-conf")
@ConditionalOnExpression("${job.execute.archive.enabled:false}")
@ConditionalOnProperty("spring.datasource.job-execute-archive.jdbc-url")
public org.jooq.Configuration
executeArchiveJooqConf(@Qualifier("job-execute-archive-conn-provider") ConnectionProvider connectionProvider) {
Expand All @@ -102,6 +106,7 @@ public DSLContext executeArchiveDslContext(

@Qualifier("job-execute-archive-conn-provider")
@Bean(name = "job-execute-archive-conn-provider")
@ConditionalOnExpression("${job.execute.archive.enabled:false}")
@ConditionalOnProperty("spring.datasource.job-execute-archive.jdbc-url")
public ConnectionProvider executeArchiveConnectionProvider(
@Qualifier("job-execute-archive-source") DataSource dataSource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.tencent.bk.job.backup.archive;

import com.tencent.bk.job.backup.archive.impl.FileSourceTaskLogArchivist;
import com.tencent.bk.job.backup.archive.impl.GseFileAgentTaskArchivist;
import com.tencent.bk.job.backup.archive.impl.GseScriptAgentTaskArchivist;
import com.tencent.bk.job.backup.archive.impl.GseTaskArchivist;
Expand All @@ -43,6 +44,7 @@
import com.tencent.bk.job.backup.config.ArchiveConfig;
import com.tencent.bk.job.backup.dao.ExecuteArchiveDAO;
import com.tencent.bk.job.backup.dao.ExecuteRecordDAO;
import com.tencent.bk.job.backup.dao.impl.FileSourceTaskLogRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseFileAgentTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseScriptAgentTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.GseTaskIpLogRecordDAO;
Expand Down Expand Up @@ -84,6 +86,7 @@ public class JobExecuteArchiveManage implements SmartLifecycle {
private final StepInstanceVariableRecordDAO stepInstanceVariableRecordDAO;
private final TaskInstanceVariableRecordDAO taskInstanceVariableRecordDAO;
private final OperationLogRecordDAO operationLogRecordDAO;
private final FileSourceTaskLogRecordDAO fileSourceTaskLogRecordDAO;
private final GseTaskLogRecordDAO gseTaskLogRecordDAO;
private final GseTaskIpLogRecordDAO gseTaskIpLogRecordDAO;
private final GseTaskRecordDAO gseTaskRecordDAO;
Expand All @@ -108,6 +111,7 @@ public JobExecuteArchiveManage(TaskInstanceRecordDAO taskInstanceRecordDAO,
StepInstanceVariableRecordDAO stepInstanceVariableRecordDAO,
TaskInstanceVariableRecordDAO taskInstanceVariableRecordDAO,
OperationLogRecordDAO operationLogRecordDAO,
FileSourceTaskLogRecordDAO fileSourceTaskLogRecordDAO,
GseTaskLogRecordDAO gseTaskLogRecordDAO,
GseTaskIpLogRecordDAO gseTaskIpLogRecordDAO,
GseTaskRecordDAO gseTaskRecordDAO,
Expand All @@ -132,6 +136,7 @@ public JobExecuteArchiveManage(TaskInstanceRecordDAO taskInstanceRecordDAO,
this.stepInstanceVariableRecordDAO = stepInstanceVariableRecordDAO;
this.taskInstanceVariableRecordDAO = taskInstanceVariableRecordDAO;
this.operationLogRecordDAO = operationLogRecordDAO;
this.fileSourceTaskLogRecordDAO = fileSourceTaskLogRecordDAO;
this.gseTaskLogRecordDAO = gseTaskLogRecordDAO;
this.gseTaskIpLogRecordDAO = gseTaskIpLogRecordDAO;
this.gseTaskRecordDAO = gseTaskRecordDAO;
Expand Down Expand Up @@ -257,7 +262,7 @@ private long getLastArchiveId(ExecuteRecordDAO<?> executeRecordDAO) {

private void archive(long maxNeedArchiveTaskInstanceId, long maxNeedArchiveStepInstanceId)
throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(16);
CountDownLatch countDownLatch = new CountDownLatch(17);
log.info("Submitting archive task...");

// task_instance
Expand All @@ -270,6 +275,8 @@ private void archive(long maxNeedArchiveTaskInstanceId, long maxNeedArchiveStepI
addStepInstanceFileArchiveTask(maxNeedArchiveStepInstanceId, countDownLatch);
// step_instance_script
addStepInstanceScriptArchiveTask(maxNeedArchiveStepInstanceId, countDownLatch);
// file_source_task_log
addFileSourceTaskLogArchiveTask(maxNeedArchiveStepInstanceId, countDownLatch);
// gse_task_log
addGseTaskLogArchiveTask(maxNeedArchiveStepInstanceId, countDownLatch);
// gse_task_ip_log
Expand Down Expand Up @@ -360,6 +367,18 @@ private void addStepInstanceScriptArchiveTask(Long maxNeedArchiveStepInstanceId,
.archive());
}

private void addFileSourceTaskLogArchiveTask(Long maxNeedArchiveStepInstanceId, CountDownLatch countDownLatch) {
archiveExecutor.execute(() ->
new FileSourceTaskLogArchivist(
fileSourceTaskLogRecordDAO,
executeArchiveDAO,
archiveProgressService,
archiveConfig,
maxNeedArchiveStepInstanceId,
countDownLatch)
.archive());
}

private void addGseTaskLogArchiveTask(Long maxNeedArchiveStepInstanceId, CountDownLatch countDownLatch) {
archiveExecutor.execute(() ->
new GseTaskLogArchivist(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.tencent.bk.job.backup.archive.AbstractArchivist;
import com.tencent.bk.job.backup.config.ArchiveConfig;
import com.tencent.bk.job.backup.dao.ExecuteArchiveDAO;
import com.tencent.bk.job.backup.dao.impl.FileSourceTaskRecordDAO;
import com.tencent.bk.job.backup.dao.impl.FileSourceTaskLogRecordDAO;
import com.tencent.bk.job.backup.service.ArchiveProgressService;
import org.jooq.generated.tables.records.FileSourceTaskLogRecord;

Expand All @@ -38,7 +38,7 @@
*/
public class FileSourceTaskLogArchivist extends AbstractArchivist<FileSourceTaskLogRecord> {

public FileSourceTaskLogArchivist(FileSourceTaskRecordDAO executeRecordDAO,
public FileSourceTaskLogArchivist(FileSourceTaskLogRecordDAO executeRecordDAO,
ExecuteArchiveDAO executeArchiveDAO,
ArchiveProgressService archiveProgressService,
ArchiveConfig archiveConfig,
Expand All @@ -50,6 +50,6 @@ public FileSourceTaskLogArchivist(FileSourceTaskRecordDAO executeRecordDAO,
archiveConfig,
maxNeedArchiveId,
countDownLatch);
this.deleteIdStepSize = 100_000;
this.deleteIdStepSize = 1_000;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
import org.jooq.generated.tables.records.FileSourceTaskLogRecord;

/**
* file_source_task DAO
* file_source_task_log DAO
*/
public class FileSourceTaskRecordDAO extends AbstractExecuteRecordDAO<FileSourceTaskLogRecord> {
public class FileSourceTaskLogRecordDAO extends AbstractExecuteRecordDAO<FileSourceTaskLogRecord> {

private static final FileSourceTaskLog TABLE = FileSourceTaskLog.FILE_SOURCE_TASK_LOG;

public FileSourceTaskRecordDAO(DSLContext context, ArchiveConfig archiveConfig) {
public FileSourceTaskLogRecordDAO(DSLContext context, ArchiveConfig archiveConfig) {
super(context, archiveConfig);
}

Expand Down
29 changes: 29 additions & 0 deletions support-files/kubernetes/charts/bk-job/VALUES_LOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
# chart values 更新日志
## 0.4.6
1.增加备份服务中的数据归档相关配置
```yaml
## job-backup备份服务配置
backupConfig:
## 数据归档配置
archive:
# 归档使用的MariaDB实例,若开启归档,必须配置该项内容
mariadb:
host: ""
port: ""
username: "job"
password: "job"
connection:
properties: ?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
# job-execute模块的归档配置
execute:
# 是否开启数据归档,默认不开启
enabled: false
# 归档任务运行的cron表达式,默认每天凌晨04:00
cron: 0 0 4 * * *
data:
# 热库中的数据保留时间(天)
keep_days: 30
delete:
# 是否删除热库中的过期老数据,默认不删除
enabled: false
```
## 0.4.5
1.增加 bkDomain 配置
Expand Down
8 changes: 8 additions & 0 deletions support-files/kubernetes/charts/bk-job/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -653,3 +653,11 @@ readinessProbe:
failureThreshold: 1
successThreshold: 1
{{- end -}}


{{/*
Return the Archive MariaDB secret name
*/}}
{{- define "job.archiveMariadb.secretName" -}}
{{ printf "%s-%s" (include "job.fullname" .) "archive-mariadb" }}
{{- end -}}
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,19 @@ data:
idle-timeout: 6000
poolName: "job-execute-db"
validationTimeout: 5000
{{- if .Values.backupConfig.archive.execute.enabled }}
job-execute-archive:
driver-class-name: {{ include "job.jdbcMysqlDriverClass" . }}
type: com.zaxxer.hikari.HikariDataSource
jdbc-url: {{ include "job.jdbcMysqlScheme" . }}://{{- include "job.mariadb.host" . }}:{{- include "job.mariadb.port" . }}/job_execute{{ include "job.mariadb.connection.properties" . }}
username: {{ include "job.mariadb.username" . }}
{{ if .Values.externalMariaDB.existingPasswordSecret }}
password: {{ .Values.externalMariaDB.existingPasswordKey | default "mariadb-password" | printf "${%s}" }}
{{- else -}}
password: ${mariadb-password}
{{- end }}
jdbc-url: {{ include "job.jdbcMysqlScheme" . }}://{{ .Values.backupConfig.archive.mariadb.host }}:{{ .Values.backupConfig.archive.mariadb.port }}/job_execute{{ .Values.backupConfig.archive.mariadb.connection.properties }}
username: {{ .Values.backupConfig.archive.mariadb.username }}
password: ${archive-mariadb-password}
maximum-pool-size: 10
minimum-idle: 2
idle-timeout: 6000
poolName: "job-execute-archive"
validationTimeout: 5000
{{- end }}
redis:
{{- include "job.redis.config" . | indent 8 }}
database: 0
Expand Down Expand Up @@ -108,10 +106,5 @@ data:
repo: {{ .Values.backupConfig.artifactory.repo }}
execute:
archive:
enabled: true
cron: 0 0 4 * * *
data:
keep_days: 30
delete:
enabled: true
{{- toYaml .Values.backupConfig.archive.execute | nindent 10 }}
{{- end }}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ spec:
- name: mariadb
secret:
secretName: {{ include "job.mariadb.secretName" . }}
{{- if .Values.backupConfig.archive.execute.enabled }}
- name: archive-mariadb
secret:
secretName: {{ include "job.archiveMariadb.secretName" . }}
{{- end }}
- name: rabbitmq
secret:
secretName: {{ include "job.rabbitmq.secretName" . }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{{- if .Values.backupConfig.archive.execute.enabled }}
apiVersion: v1
kind: Secret
metadata:
name: {{ include "job.archiveMariadb.secretName" . }}
labels: {{- include "common.labels.standard" . | nindent 4 }}
{{- if .Values.commonLabels }}
{{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }}
{{- end }}
namespace: {{ .Release.Namespace }}
{{- if .Values.commonAnnotations }}
annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }}
{{- end }}
type: Opaque
data:
archive-mariadb-password: {{ .Values.backupConfig.archive.mariadb.password | b64enc | quote }}
{{- end }}
22 changes: 22 additions & 0 deletions support-files/kubernetes/charts/bk-job/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,28 @@ backupConfig:
artifactory:
# 存储后端使用制品库时使用的备份服务仓库代码
repo: backup
## 数据归档配置
archive:
# 归档使用的MariaDB实例,若开启归档,必须配置该项内容
mariadb:
host: ""
port: ""
username: "job"
password: "job"
connection:
properties: ?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
# job-execute模块的归档配置
execute:
# 是否开启数据归档,默认不开启
enabled: false
# 归档任务运行的cron表达式,默认每天凌晨04:00
cron: 0 0 4 * * *
data:
# 热库中的数据保留时间(天)
keep_days: 30
delete:
# 是否删除热库中的过期老数据,默认不删除
enabled: false
image:
registry: hub.bktencent.com
repository: blueking/job-backup
Expand Down

0 comments on commit 5ff9d21

Please sign in to comment.