Skip to content

Commit

Permalink
Feat/rcp sync (#113)
Browse files Browse the repository at this point in the history
* refactor rpc
  • Loading branch information
shihuili1218 authored Oct 23, 2024
1 parent dc6d9d5 commit 819e564
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@
*/
package com.ofcoder.klein.consensus.paxos;

import java.io.Serializable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ofcoder.klein.consensus.facade.Result;
import com.ofcoder.klein.consensus.facade.config.ConsensusProp;
import com.ofcoder.klein.consensus.paxos.core.MasterState;
Expand All @@ -29,7 +24,12 @@
import com.ofcoder.klein.consensus.paxos.rpc.vo.RedirectRes;
import com.ofcoder.klein.rpc.facade.Endpoint;
import com.ofcoder.klein.rpc.facade.RpcClient;
import com.ofcoder.klein.rpc.facade.exception.ConnectionException;
import com.ofcoder.klein.spi.ExtensionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;

import static com.ofcoder.klein.consensus.paxos.rpc.vo.RedirectReq.TRANSACTION_REQUEST;

Expand Down Expand Up @@ -74,11 +74,17 @@ public <D extends Serializable> Result<D> propose(final Proposal data, final boo
.proposal(data)
.apply(apply)
.build();
RedirectRes res = this.client.sendRequestSync(master, req, this.prop.getRoundTimeout() * this.prop.getRetry() + client.requestTimeout());
if (res == null) {
return builder.state(Result.State.UNKNOWN).build();
try {
RedirectRes res = this.client.sendRequestSync(master, req, this.prop.getRoundTimeout() * this.prop.getRetry() + client.requestTimeout());
return (Result<D>) res.getProposeResult();
} catch (Exception e) {
if (e instanceof ConnectionException) {
return builder.state(Result.State.FAILURE).build();
} else {
LOG.error("redirect request fail, {}: {}", e.getClass().getName(), e.getMessage());
return builder.state(Result.State.UNKNOWN).build();
}
}
return (Result<D>) res.getProposeResult();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@
*/
package com.ofcoder.klein.consensus.paxos;

import java.io.Serializable;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ofcoder.klein.consensus.facade.Consensus;
import com.ofcoder.klein.consensus.facade.MemberConfiguration;
import com.ofcoder.klein.consensus.facade.Result;
Expand Down Expand Up @@ -54,6 +47,12 @@
import com.ofcoder.klein.spi.ExtensionLoader;
import com.ofcoder.klein.spi.Join;
import com.ofcoder.klein.storage.facade.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;
import java.util.Set;

/**
* Paxos Consensus.
Expand Down Expand Up @@ -155,9 +154,13 @@ private void joinCluster() {
req.setOp(ElasticReq.LAUNCH);

for (Endpoint endpoint : MemberRegistry.getInstance().getMemberConfiguration().getAllMembers()) {
ElasticRes res = client.sendRequestSync(endpoint, req);
if (res != null && res.isResult()) {
return;
try {
ElasticRes res = client.sendRequestSync(endpoint, req);
if (res.isResult()) {
return;
}
} catch (Exception e) {
LOG.warn("join cluster fail, {}", e.getMessage());
}
}
}
Expand All @@ -169,9 +172,13 @@ private void exitCluster() {
req.setOp(ElasticReq.SHUTDOWN);

for (Endpoint endpoint : MemberRegistry.getInstance().getMemberConfiguration().getAllMembers()) {
ElasticRes res = client.sendRequestSync(endpoint, req);
if (res.isResult()) {
return;
try {
ElasticRes res = client.sendRequestSync(endpoint, req);
if (res.isResult()) {
return;
}
} catch (Exception e) {
LOG.warn("exit cluster fail, {}", e.getMessage());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@
*/
package com.ofcoder.klein.consensus.paxos.core;

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ofcoder.klein.common.util.KleinThreadFactory;
import com.ofcoder.klein.consensus.facade.AbstractInvokeCallback;
import com.ofcoder.klein.consensus.facade.config.ConsensusProp;
Expand All @@ -44,6 +34,15 @@
import com.ofcoder.klein.storage.facade.Instance;
import com.ofcoder.klein.storage.facade.LogManager;
import com.ofcoder.klein.storage.facade.Snap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Predicate;

public class DataAligner {
private static final Logger LOG = LoggerFactory.getLogger(DataAligner.class);
Expand Down Expand Up @@ -176,18 +175,17 @@ private void _snapSync(final Endpoint target, final LearnCallback callback) {
.build();
SnapSyncRes res = client.sendRequestSync(target, req, 1000);

if (res != null) {
RuntimeAccessor.getLearner().loadSnapSync(res.getImages());
checkpoint = res.getImages().values().stream().max(Comparator.comparingLong(Snap::getCheckpoint)).orElse(new Snap(checkpoint, null)).getCheckpoint();
RuntimeAccessor.getLearner().loadSnapSync(res.getImages());
checkpoint = res.getImages().values().stream().max(Comparator.comparingLong(Snap::getCheckpoint)).orElse(new Snap(checkpoint, null)).getCheckpoint();

long finalCheckpoint = checkpoint;
Predicate<Task> successPredicate = it -> it.priority != Task.HIGH_PRIORITY && it.priority < finalCheckpoint;
this.learnQueue.stream().filter(successPredicate).forEach(it -> it.callback.learned(true));
this.learnQueue.removeIf(successPredicate);
result = true;

long finalCheckpoint = checkpoint;
Predicate<Task> successPredicate = it -> it.priority != Task.HIGH_PRIORITY && it.priority < finalCheckpoint;
this.learnQueue.stream().filter(successPredicate).forEach(it -> it.callback.learned(true));
this.learnQueue.removeIf(successPredicate);
result = true;
}
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
LOG.error("pull snap from {} fail. {}", target, e.getMessage());
} finally {
callback.learned(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,13 @@ private void apply(final long instanceId) {

final long lastApplyId = getLastAppliedInstanceId();
final long expectConfirmId = lastApplyId + 1;
LOG.debug("start apply, instanceId: {}, curAppliedInstanceId: {}, lastCheckpoint: {}", instanceId, getLastAppliedInstanceId(), getLastCheckpoint());

if (instanceId <= lastApplyId) {
// the instance has been applied.
return;
}

LOG.debug("start apply, instanceId: {}, curAppliedInstanceId: {}, lastCheckpoint: {}", instanceId, getLastAppliedInstanceId(), getLastCheckpoint());
if (instanceId > expectConfirmId) {
registerApplyCallback(instanceId - 1, Lists.newArrayList(new ProposeDone() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,16 @@ public void searchMaster() {
.proposalNo(self.getCurProposalNo())
.build();
for (Endpoint it : memberConfig.getMembersWithout(self.getSelf().getId())) {
PreElectRes res = client.sendRequestSync(it, req);
LOG.debug("looking for master, node-{}: {}", it.getId(), res);
if (res != null && res.getMaster() != null) {
try {
PreElectRes res = client.sendRequestSync(it, req);
LOG.debug("looking for master, node-{}: {}", it.getId(), res);
if (res != null && res.getMaster() != null) {
// restartWaitHb();
changeMaster(res.getMaster().getId());
return;
changeMaster(res.getMaster().getId());
return;
}
} catch (Exception e) {
LOG.warn("looking for master fail, {}", e.getMessage());
}
}
// there is no master in the cluster, do elect.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;

import com.ofcoder.klein.rpc.facade.exception.ConnectionException;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -90,7 +91,7 @@ public boolean put(final Integer value) throws UnsupportedEncodingException {
throw new IllegalArgumentException("seq: " + req.getSeq() + ", put: " + value + " on node: " + endpoint.getId() + ", " + result);
}
return true;
} catch (IllegalArgumentException e) {
} catch (IllegalArgumentException | ConnectionException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException("seq: " + req.getSeq() + ", put: " + value + " on node: " + endpoint.getId() + ", result: UNKNOWN");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,7 @@ default <R> R sendRequestSync(Endpoint target, Serializable request, long timeou
.service(request.getClass().getSimpleName())
.method(RpcProcessor.KLEIN)
.data(ByteBuffer.wrap(Hessian2Util.serialize(request))).build();
try {
return sendRequestSync(target, param, timeoutMs);
} catch (Exception e) {
LOG.warn(e.getMessage());
return null;
}
return sendRequestSync(target, param, timeoutMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@
*/
package com.ofcoder.klein.rpc.grpc;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.DynamicMessage;
import com.ofcoder.klein.common.serialization.Hessian2Util;
import com.ofcoder.klein.common.util.ThreadExecutor;
Expand All @@ -38,7 +28,6 @@
import com.ofcoder.klein.rpc.facade.exception.InvokeTimeoutException;
import com.ofcoder.klein.rpc.facade.exception.RpcException;
import com.ofcoder.klein.spi.Join;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ConnectivityState;
Expand All @@ -47,6 +36,16 @@
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Grpc Client.
Expand Down Expand Up @@ -154,6 +153,12 @@ public void complete(final ByteBuffer result) {
} catch (final TimeoutException e) {
future.cancel(true);
throw new InvokeTimeoutException(e.getMessage(), e);
} catch (ExecutionException e) {
if (e.getCause() instanceof ConnectionException) {
throw new ConnectionException(e.getMessage());
} else {
throw new RpcException(e.getMessage(), e);
}
} catch (final Throwable t) {
future.cancel(true);
throw new RpcException(t.getMessage(), t);
Expand Down Expand Up @@ -188,9 +193,8 @@ private ManagedChannel getChannel(final Endpoint endpoint, final boolean createI
private void invokeAsync(final Endpoint endpoint, final InvokeParam invokeParam, final InvokeCallback callback, final long timeoutMs) {
final Channel ch = getCheckedChannel(endpoint);
if (ch == null) {
ThreadExecutor.execute(() -> {
callback.error(new ConnectionException(String.format("connection not available, %s", endpoint)));
});
ThreadExecutor.execute(() ->
callback.error(new ConnectionException(String.format("connection not available, %s", endpoint))));
return;
}

Expand Down

0 comments on commit 819e564

Please sign in to comment.