Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jul 29, 2024
1 parent 6aa4948 commit 388e75e
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,20 @@
import org.apache.doris.job.extensions.cdc.utils.CdcLoadConstants;
import org.apache.doris.job.extensions.cdc.utils.RestService;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PStartCdcScannerResult;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.resource.Tag;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -75,9 +80,10 @@
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.concurrent.Future;

public class CdcDatabaseJob extends AbstractJob<CdcDatabaseTask, Map<Object, Object>> {
public static final String BINLOG_SPLIT_ID = "binlog-split";
Expand Down Expand Up @@ -346,7 +352,8 @@ public void initialize() throws JobException {
super.initialize();
if (!remainingTables.isEmpty()) {
Backend backend = createCdcProcess();
startSplitAsync(backend);
System.out.println(backend);
//startSplitAsync(backend);
}
}

Expand All @@ -366,42 +373,27 @@ public void onStatusChanged(JobStatus oldStatus, JobStatus newStatus) throws Job

private Backend createCdcProcess() throws JobException {
Backend backend = selectBackend();
// TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
// String params = generateParams();
// int cdcPort = startCdcJob(address, params);
// cdcPort = 10000;

int cdcPort = 10000;
LOG.info("Cdc server started on backend: " + backend.getHost() + ":" + cdcPort);
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
startCdcScanner(address, "");
LOG.info("Cdc server started on backend: " + backend.getHost());
return backend;
}

// private int startCdcJob(TNetworkAddress address, String params) throws JobException {
// InternalService.PCdcJobStartRequest request =
// InternalService.PCdcJobStartRequest.newBuilder().setParams(params).build();
// InternalService.PCdcJobStartResult result = null;
// try {
// Future<InternalService.PCdcJobStartResult> future =
// BackendServiceProxy.getInstance().startCdcJobAsync(address, request);
// result = future.get();
// TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
// if (code != TStatusCode.OK) {
// throw new JobException("Failed to start cdc server on backend: " + result.getStatus().getErrorMsgs(0));
// }
// return result.getPort();
// } catch (RpcException | ExecutionException | InterruptedException ex) {
// throw new JobException(ex);
// }
// }

private String generateParams() {
Long jobId = getJobId();
List<Frontend> frontends = Env.getCurrentEnv().getFrontends(null);
int httpPort = Env.getCurrentEnv().getMasterHttpPort();
List<String> fenodes = frontends.stream().filter(Frontend::isAlive).map(v -> v.getHost() + ":" + httpPort)
.collect(Collectors.toList());
ParamsBuilder paramsBuilder = new ParamsBuilder(jobId.toString(), String.join(",", fenodes), config);
return paramsBuilder.buildParams();
private void startCdcScanner(TNetworkAddress address, String params) throws JobException {
InternalService.PStartCdcScannerRequest request =
InternalService.PStartCdcScannerRequest.newBuilder().setParams(params).build();
InternalService.PStartCdcScannerResult result = null;
try {
Future<PStartCdcScannerResult> future =
BackendServiceProxy.getInstance().startCdcScanner(address, request);
result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new JobException("Failed to start cdc server on backend: " + result.getStatus().getErrorMsgs(0));
}
} catch (RpcException | ExecutionException | InterruptedException ex) {
throw new JobException(ex);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

public enum DataSourceType {

MYSQL("mysql", "com.mysql.cj.jdbc.Driver", "mysql-connector-java-8.0.25.jar");
MYSQL("mysql", "com.mysql.cj.jdbc.Driver", "mysql-connector-java-8.0.26.jar");

private String type;
private String driverClass;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,8 @@ public Future<InternalService.PJdbcTestConnectionResult> testJdbcConnection(
return stub.testJdbcConnection(request);
}

public Future<InternalService.PCdcJobStartResult> startCdcJobAsync(
InternalService.PCdcJobStartRequest request) {
return stub.startCdcJobAsync(request);
public Future<InternalService.PStartCdcScannerResult> startCdcScanner(InternalService.PStartCdcScannerRequest request) {
return stub.startCdcScanner(request);
}

public Future<InternalService.PCacheResponse> updateCache(InternalService.PUpdateCacheRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,11 @@ public Future<InternalService.PJdbcTestConnectionResult> testJdbcConnection(
}
}

public Future<InternalService.PCdcJobStartResult> startCdcJobAsync(
TNetworkAddress address, InternalService.PCdcJobStartRequest request) throws RpcException {
public Future<InternalService.PStartCdcScannerResult> startCdcScanner(
TNetworkAddress address, InternalService.PStartCdcScannerRequest request) throws RpcException {
try {
final BackendServiceClient client = getProxy(address);
return client.startCdcJobAsync(request);
return client.startCdcScanner(request);
} catch (Throwable e) {
LOG.warn("start cdc job catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
Expand Down

0 comments on commit 388e75e

Please sign in to comment.