Skip to content

Commit

Permalink
fix cloudProvider not supply correct
Browse files Browse the repository at this point in the history
  • Loading branch information
LioRoger committed Jan 24, 2025
1 parent 80baae5 commit 3614d9a
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,14 @@ public static Map<String, String> buildK8sEnv(JobContext context) {
}

public static JobCaller buildK8sJobCaller(PodConfig podConfig, JobContext context,
ResourceManager resourceManager) {
ResourceManager resourceManager, String resourceType) {
Map<String, String> environments = buildK8sEnv(context);
// common environment variables
environments.put(JobEnvKeyConstants.ODC_LOG_DIRECTORY, podConfig.getMountPath());
// do encryption for sensitive information
JobUtils.encryptEnvironments(environments);

podConfig.setEnvironments(environments);
return new K8sJobCaller(podConfig, resourceManager);
return new K8sJobCaller(podConfig, resourceManager, resourceType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.oceanbase.odc.service.resource.ResourceState;
import com.oceanbase.odc.service.resource.ResourceWithID;
import com.oceanbase.odc.service.task.exception.JobException;
import com.oceanbase.odc.service.task.resource.DefaultResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.K8sPodResource;
import com.oceanbase.odc.service.task.resource.K8sResourceContext;
import com.oceanbase.odc.service.task.resource.PodConfig;
Expand All @@ -46,10 +45,12 @@ public class K8sJobCaller extends BaseJobCaller {
*/
private final PodConfig defaultPodConfig;
private final ResourceManager resourceManager;
private final String resourceType;

public K8sJobCaller(PodConfig podConfig, ResourceManager resourceManager) {
public K8sJobCaller(PodConfig podConfig, ResourceManager resourceManager, String resourceType) {
this.defaultPodConfig = podConfig;
this.resourceManager = resourceManager;
this.resourceType = resourceType;
}

@Override
Expand All @@ -73,7 +74,7 @@ protected K8sResourceContext buildK8sResourceContext(JobContext context, Resourc
String jobName = JobUtils.generateExecutorName(context.getJobIdentity());
return new K8sResourceContext(defaultPodConfig, jobName, resourceLocation.getRegion(),
resourceLocation.getGroup(),
DefaultResourceOperatorBuilder.CLOUD_K8S_POD_TYPE, context);
resourceType, context);
}

protected ResourceLocation buildResourceLocation(JobContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.oceanbase.odc.service.resource.ResourceLocation;
import com.oceanbase.odc.service.task.config.K8sProperties;
import com.oceanbase.odc.service.task.config.TaskFrameworkProperties;
import com.oceanbase.odc.service.task.resource.DefaultResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.AbstractK8sResourceOperatorBuilder;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -78,7 +78,7 @@ public static ResourceID getResourceID(ExecutorIdentifier executorIdentifier,
String region = checkAndGetJobProperties(jobProperties, REGION_PROP_NAME, DEFAULT_PROP_VALUE);
String group = checkAndGetJobProperties(jobProperties, GROUP_PROP_NAME, DEFAULT_PROP_VALUE);
String type = checkAndGetJobProperties(jobProperties, RESOURCE_TYPE_PROP_NAME,
DefaultResourceOperatorBuilder.CLOUD_K8S_POD_TYPE);
AbstractK8sResourceOperatorBuilder.CLOUD_K8S_POD_TYPE);
// if namespace has saved, that new logic, use saved namespace that should equals to
// podConfig.namespace
String savedNamespace = checkAndGetJobProperties(jobProperties, RESOURCE_NAMESPACE_PROP_NAME, namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.oceanbase.odc.service.task.exception.JobException;
import com.oceanbase.odc.service.task.jasypt.DefaultJasyptEncryptorConfigProperties;
import com.oceanbase.odc.service.task.jasypt.JasyptEncryptorConfigProperties;
import com.oceanbase.odc.service.task.resource.DefaultResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.DefaultNativeK8sOperatorBuilder;
import com.oceanbase.odc.service.task.schedule.DefaultJobCredentialProvider;
import com.oceanbase.odc.service.task.schedule.JobCredentialProvider;
import com.oceanbase.odc.service.task.schedule.JobDefinition;
Expand Down Expand Up @@ -107,10 +107,10 @@ public TaskFrameworkEnabledProperties taskFrameworkEnabledProperties(

@Bean
@ConditionalOnProperty(value = "odc.task-framework.enable-k8s-local-debug-mode", havingValue = "true")
public DefaultResourceOperatorBuilder localDebugK8sOperatorBuilder(
public DefaultNativeK8sOperatorBuilder localDebugK8sOperatorBuilder(
@Autowired TaskFrameworkProperties taskFrameworkProperties,
@Autowired ResourceRepository resourceRepository) throws IOException {
return new DefaultResourceOperatorBuilder(taskFrameworkProperties, resourceRepository);
return new DefaultNativeK8sOperatorBuilder(taskFrameworkProperties, resourceRepository);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import com.oceanbase.odc.service.task.constants.JobEnvKeyConstants;
import com.oceanbase.odc.service.task.enums.TaskRunMode;
import com.oceanbase.odc.service.task.exception.JobException;
import com.oceanbase.odc.service.task.resource.AbstractK8sResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.DefaultNativeK8sOperatorBuilder;
import com.oceanbase.odc.service.task.resource.PodConfig;
import com.oceanbase.odc.service.task.schedule.DefaultJobContextBuilder;
import com.oceanbase.odc.service.task.schedule.JobIdentity;
Expand Down Expand Up @@ -106,7 +108,13 @@ private JobCaller getJobCaller(JobIdentity ji, JobContext context) {
if (StringUtils.isNotBlank(regionName)) {
podConfig.setRegion(regionName);
}
return JobCallerBuilder.buildK8sJobCaller(podConfig, context, resourceManager);
String resourceType = AbstractK8sResourceOperatorBuilder.CLOUD_K8S_POD_TYPE;
// TODO(tianke): this is ugly code, resourceType should be dispatched before job started
// task frame work should know where job should be dispatched to
if (config.getTaskFrameworkProperties().isEnableK8sLocalDebugMode()) {
resourceType = DefaultNativeK8sOperatorBuilder.NATIVE_K8S_POD_TYPE;
}
return JobCallerBuilder.buildK8sJobCaller(podConfig, context, resourceManager, resourceType);
} else {
return JobCallerBuilder.buildProcessCaller(context,
new JobEnvironmentFactory().build(context, TaskRunMode.PROCESS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.oceanbase.odc.service.task.caller.ProcessJobCaller;
import com.oceanbase.odc.service.task.caller.ResourceIDUtil;
import com.oceanbase.odc.service.task.exception.JobException;
import com.oceanbase.odc.service.task.resource.DefaultResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.AbstractK8sResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.K8sPodResource;
import com.oceanbase.odc.service.task.resource.K8sResourceContext;
import com.oceanbase.odc.service.task.resource.client.K8sJobClient;
Expand Down Expand Up @@ -79,7 +79,7 @@ private JobContext getJobContext(Object extraData) {
@Override
public Optional<K8sPodResource> get(String namespace, String arn) throws JobException {
K8sPodResource ret = new K8sPodResource(ResourceIDUtil.REGION_PROP_NAME,
ResourceIDUtil.GROUP_PROP_NAME, DefaultResourceOperatorBuilder.CLOUD_K8S_POD_TYPE,
ResourceIDUtil.GROUP_PROP_NAME, AbstractK8sResourceOperatorBuilder.CLOUD_K8S_POD_TYPE,
namespace, arn, ResourceState.AVAILABLE,
"127.0.0.1", new Date(System.currentTimeMillis()));
return Optional.of(ret);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@
import com.oceanbase.odc.service.resource.ResourceOperatorBuilder;
import com.oceanbase.odc.service.task.config.K8sProperties;
import com.oceanbase.odc.service.task.config.TaskFrameworkProperties;
import com.oceanbase.odc.service.task.dummy.LocalMockK8sJobClient;
import com.oceanbase.odc.service.task.resource.client.DefaultK8sJobClientSelector;
import com.oceanbase.odc.service.task.resource.client.K8sJobClientSelector;
import com.oceanbase.odc.service.task.resource.client.NativeK8sJobClient;
import com.oceanbase.odc.service.task.resource.client.NullK8sJobClientSelector;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -41,46 +37,30 @@
* @date 2024/9/2 17:33
*/
@Slf4j
public class DefaultResourceOperatorBuilder implements ResourceOperatorBuilder<K8sResourceContext, K8sPodResource> {
public abstract class AbstractK8sResourceOperatorBuilder
implements ResourceOperatorBuilder<K8sResourceContext, K8sPodResource> {
public static final String CLOUD_K8S_POD_TYPE = "cloudK8sPod";
protected K8sJobClientSelector k8sJobClientSelector;
protected K8sProperties k8sProperties;
protected ResourceRepository resourceRepository;
protected String operatorType;

public DefaultResourceOperatorBuilder(TaskFrameworkProperties taskFrameworkProperties,
ResourceRepository resourceRepository) throws IOException {
public AbstractK8sResourceOperatorBuilder(TaskFrameworkProperties taskFrameworkProperties,
ResourceRepository resourceRepository, String typeName) throws IOException {
this.k8sProperties = taskFrameworkProperties.getK8sProperties();
this.resourceRepository = resourceRepository;
this.k8sJobClientSelector = buildK8sJobSelector(taskFrameworkProperties);
this.operatorType = CLOUD_K8S_POD_TYPE;
this.operatorType = typeName;
}

/**
* build k8s job selector
* create k8s client selector
*
* @param taskFrameworkProperties
* @return
*/
protected K8sJobClientSelector buildK8sJobSelector(
TaskFrameworkProperties taskFrameworkProperties) throws IOException {
K8sProperties k8sProperties = taskFrameworkProperties.getK8sProperties();
K8sJobClientSelector k8sJobClientSelector;
if (taskFrameworkProperties.isEnableK8sLocalDebugMode()) {
// k8s use in local debug mode
log.info("local debug k8s cluster enabled.");
k8sJobClientSelector = new LocalMockK8sJobClient();
} else if (StringUtils.isBlank(k8sProperties.getKubeUrl())) {
log.info("local task k8s cluster is not enabled.");
k8sJobClientSelector = new NullK8sJobClientSelector();
} else {
// normal mode
log.info("build k8sJobClientSelector, kubeUrl={}, namespace={}",
k8sProperties.getKubeUrl(), k8sProperties.getNamespace());
NativeK8sJobClient nativeK8sJobClient = new NativeK8sJobClient(k8sProperties);
k8sJobClientSelector = new DefaultK8sJobClientSelector(nativeK8sJobClient);
}
return k8sJobClientSelector;
}


protected abstract K8sJobClientSelector buildK8sJobSelector(TaskFrameworkProperties taskFrameworkProperties)
throws IOException;

@Override
public K8sResourceOperator build(ResourceLocation resourceLocation) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.task.resource;

import java.io.IOException;

import org.apache.commons.lang3.StringUtils;

import com.oceanbase.odc.metadb.resource.ResourceRepository;
import com.oceanbase.odc.service.task.config.K8sProperties;
import com.oceanbase.odc.service.task.config.TaskFrameworkProperties;
import com.oceanbase.odc.service.task.dummy.LocalMockK8sJobClient;
import com.oceanbase.odc.service.task.resource.client.DefaultK8sJobClientSelector;
import com.oceanbase.odc.service.task.resource.client.K8sJobClientSelector;
import com.oceanbase.odc.service.task.resource.client.NativeK8sJobClient;
import com.oceanbase.odc.service.task.resource.client.NullK8sJobClientSelector;

import lombok.extern.slf4j.Slf4j;

/**
* native k8s operator
*
* @author longpeng.zlp
* @date 2025/1/22 13:57
*/
@Slf4j
public class DefaultNativeK8sOperatorBuilder extends AbstractK8sResourceOperatorBuilder {
public static final String NATIVE_K8S_POD_TYPE = "nativeK8sPod";

public DefaultNativeK8sOperatorBuilder(TaskFrameworkProperties taskFrameworkProperties,
ResourceRepository resourceRepository) throws IOException {
super(taskFrameworkProperties, resourceRepository, NATIVE_K8S_POD_TYPE);
}

/**
* build k8s job selector
*/
protected K8sJobClientSelector buildK8sJobSelector(
TaskFrameworkProperties taskFrameworkProperties) throws IOException {
K8sProperties k8sProperties = taskFrameworkProperties.getK8sProperties();
K8sJobClientSelector k8sJobClientSelector;
if (taskFrameworkProperties.isEnableK8sLocalDebugMode()) {
// k8s use in local debug mode
log.info("local debug k8s cluster enabled.");
k8sJobClientSelector = new LocalMockK8sJobClient();
} else if (StringUtils.isBlank(k8sProperties.getKubeUrl())) {
log.info("local task k8s cluster is not enabled.");
k8sJobClientSelector = new NullK8sJobClientSelector();
} else {
// normal mode
log.info("build k8sJobClientSelector, kubeUrl={}, namespace={}",
k8sProperties.getKubeUrl(), k8sProperties.getNamespace());
NativeK8sJobClient nativeK8sJobClient = new NativeK8sJobClient(k8sProperties);
k8sJobClientSelector = new DefaultK8sJobClientSelector(nativeK8sJobClient);
}
return k8sJobClientSelector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
import com.oceanbase.odc.service.task.listener.DefaultJobProcessUpdateEvent;
import com.oceanbase.odc.service.task.listener.JobTerminateEvent;
import com.oceanbase.odc.service.task.processor.result.ResultProcessor;
import com.oceanbase.odc.service.task.resource.DefaultResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.AbstractK8sResourceOperatorBuilder;
import com.oceanbase.odc.service.task.schedule.JobDefinition;
import com.oceanbase.odc.service.task.schedule.JobIdentity;
import com.oceanbase.odc.service.task.state.JobStatusFsm;
Expand Down Expand Up @@ -189,7 +189,7 @@ public Page<ResourceEntity> findAbandonedResource(int page, int size) {
Specification<ResourceEntity> condition = Specification.where(specification)
.and(SpecificationUtil.columnEqual(ResourceEntity.STATUS, ResourceState.ABANDONED))
.and(SpecificationUtil.columnEqual(ResourceEntity.TYPE,
DefaultResourceOperatorBuilder.CLOUD_K8S_POD_TYPE));
AbstractK8sResourceOperatorBuilder.CLOUD_K8S_POD_TYPE));
return resourceRepository.findAll(condition, PageRequest.of(page, size));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.oceanbase.odc.service.resource.ResourceLocation;
import com.oceanbase.odc.service.resource.ResourceState;
import com.oceanbase.odc.service.task.exception.JobException;
import com.oceanbase.odc.service.task.resource.DefaultResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.AbstractK8sResourceOperatorBuilder;
import com.oceanbase.odc.service.task.resource.K8sPodResource;
import com.oceanbase.odc.service.task.resource.K8sResourceContext;
import com.oceanbase.odc.service.task.resource.K8sResourceOperator;
Expand All @@ -45,7 +45,7 @@ public class K8sResourceOperatorTest {
private final String resourceName = "myResource";
private final String regionName = "region";
private final String groupName = "group";
private final String defaultType = DefaultResourceOperatorBuilder.CLOUD_K8S_POD_TYPE;
private final String defaultType = AbstractK8sResourceOperatorBuilder.CLOUD_K8S_POD_TYPE;
private K8sResourceOperatorContext context;
private MockK8sJobClient mockK8sJobClient;
private MockK8sJobSelector mockK8sJobSelector;
Expand Down Expand Up @@ -155,7 +155,7 @@ public String delete(String namespace, String arn) throws JobException {

private K8sPodResource buildByK8sContext(K8sResourceContext k8sResourceContext) {
return new K8sPodResource(k8sResourceContext.region(), k8sResourceContext.getGroup(),
DefaultResourceOperatorBuilder.CLOUD_K8S_POD_TYPE,
AbstractK8sResourceOperatorBuilder.CLOUD_K8S_POD_TYPE,
k8sResourceContext.resourceNamespace(), k8sResourceContext.getResourceName(),
ResourceState.CREATING,
"localhost:8080", new Date(1024));
Expand Down

0 comments on commit 3614d9a

Please sign in to comment.