Skip to content

Commit

Permalink
redis session sort by timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes committed May 3, 2022
1 parent d4ce0a5 commit 9452176
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package io.seata.server.session;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import io.seata.core.model.GlobalStatus;

/**
Expand Down Expand Up @@ -89,6 +92,7 @@ public GlobalStatus getStatus() {
*/
public void setStatus(GlobalStatus status) {
this.status = status;
this.statuses = new GlobalStatus[] {status};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public GlobalSession readSession(String xid, boolean withBranchSessions) {
return getGlobalSession(globalTransactionDO, branchTransactionDOs);
}

@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
return readSession(new GlobalStatus[] {GlobalStatus.Begin}, withBranchSessions);
}

/**
* Read session list.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,15 @@ public Collection<GlobalSession> allSessions() {
if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
} else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying, GlobalStatus.Committing}));
return findGlobalSessions(new SessionCondition(GlobalStatus.CommitRetrying, GlobalStatus.Committing));
} else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
return findGlobalSessions(new SessionCondition(GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking,
GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying));
} else {
// all data
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.UnKnown, GlobalStatus.Begin,
GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying,
GlobalStatus.AsyncCommitting}));
return findGlobalSessions(new SessionCondition(GlobalStatus.UnKnown, GlobalStatus.Begin, GlobalStatus.Committing,
GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.List;
import java.util.Map;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.seata.config.Configuration;
Expand Down Expand Up @@ -84,6 +86,9 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
/**the prefix of the global transaction status*/
private static final String REDIS_SEATA_STATUS_PREFIX = "SEATA_STATUS_";

/**the key of global transaction status for begin*/
private static final String REDIS_SEATA_BEGIN_TRANSACTIONS_KEY = "SEATA_BEGIN_TRANSACTIONS";

private static volatile RedisTransactionStoreManager instance;

private static final String OK = "OK";
Expand Down Expand Up @@ -264,7 +269,10 @@ private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionD
globalTransactionDO.setGmtCreate(now);
globalTransactionDO.setGmtModified(now);
pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO));
pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), globalTransactionDO.getXid());
String xid = globalTransactionDO.getXid();
pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid);
pipelined.zadd(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY,
globalTransactionDO.getBeginTime() + globalTransactionDO.getTimeout(), globalKey);
pipelined.sync();
return true;
} catch (Exception ex) {
Expand Down Expand Up @@ -332,9 +340,10 @@ private boolean updateGlobalTransactionDO(GlobalTransactionDO globalTransactionD
Map<String,String> map = new HashMap<>(2);
map.put(REDIS_KEY_GLOBAL_STATUS,String.valueOf(globalTransactionDO.getStatus()));
map.put(REDIS_KEY_GLOBAL_GMT_MODIFIED,String.valueOf((new Date()).getTime()));
multi.hmset(globalKey,map);
multi.lrem(buildGlobalStatus(Integer.valueOf(previousStatus)),0, xid);
multi.hmset(globalKey, map);
multi.lrem(buildGlobalStatus(Integer.valueOf(previousStatus)), 0, xid);
multi.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid);
multi.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey);
List<Object> exec = multi.exec();
if (CollectionUtils.isEmpty(exec)) {
//The data has changed by another tc, so we still think the modification is successful.
Expand Down Expand Up @@ -362,10 +371,10 @@ private boolean updateGlobalTransactionDO(GlobalTransactionDO globalTransactionD
}
}
if (lrem > 0) {
jedis.rpush(buildGlobalStatus(Integer.valueOf(previousStatus)),xid);
jedis.rpush(buildGlobalStatus(Integer.valueOf(previousStatus)), xid);
}
if (rpush > 0) {
jedis.lrem(buildGlobalStatus(globalTransactionDO.getStatus()),0,xid);
jedis.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, xid);
}
return false;
}
Expand Down Expand Up @@ -420,10 +429,8 @@ public GlobalSession readSession(String xid) {
*/
@Override
public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) {

List<GlobalSession> globalSessions = Collections.synchronizedList(new ArrayList<>());
List<String> statusKeys = convertStatusKeys(statuses);

Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys);
if (targetMap.size() == 0 || logQueryLimit <= 0) {
return globalSessions;
Expand All @@ -446,6 +453,46 @@ public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBran
return globalSessions;
}

@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
List<GlobalSession> list = Collections.emptyList();
List<String> statusKeys = convertStatusKeys(GlobalStatus.Begin);
Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys);
if (targetMap.size() == 0 || logQueryLimit <= 0) {
return list;
}
final long countGlobalSessions = targetMap.values().stream().collect(Collectors.summarizingInt(Integer::intValue)).getSum();
// queryCount
final long queryCount = Math.min(logQueryLimit, countGlobalSessions);
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
Set<String> values = jedis.zrange(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, 0, System.currentTimeMillis());
List<Map<String, String>> rep;
try (Pipeline pipeline = jedis.pipelined()) {
for (String value : values) {
pipeline.hgetAll(value);
}
rep = (List<Map<String, String>>) (List) pipeline.syncAndReturnAll();
}
list = rep.stream().map(map -> {
GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO) BeanUtils.mapToObject(map,
GlobalTransactionDO.class);
if (globalTransactionDO != null) {
String xid = globalTransactionDO.getXid();
List<BranchTransactionDO> branchTransactionDOs = new ArrayList<>();
if (withBranchSessions) {
branchTransactionDOs = this.readBranchSessionByXid(jedis, xid);
}
return getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions);
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
if (list.size() > queryCount) {
list = list.subList(0, (int) queryCount);
}
}
return list;
}

/**
* get everyone keys limit
*
Expand Down Expand Up @@ -483,9 +530,11 @@ public List<GlobalSession> readSession(SessionCondition sessionCondition) {
}
return globalSessions;
} else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) {
return readSession(sessionCondition.getStatuses(), !sessionCondition.isLazyLoadBranch());
} else if (sessionCondition.getStatus() != null) {
return readSession(new GlobalStatus[] {sessionCondition.getStatus()}, !sessionCondition.isLazyLoadBranch());
if (sessionCondition.getStatuses().length == 1 && sessionCondition.getStatuses()[0] == GlobalStatus.Begin) {
return this.readSortByTimeoutBeginSessions(!sessionCondition.isLazyLoadBranch());
} else {
return readSession(sessionCondition.getStatuses(), !sessionCondition.isLazyLoadBranch());
}
}
return null;
}
Expand Down Expand Up @@ -699,7 +748,7 @@ public Long countByGlobalSessions(GlobalStatus[] values) {
}
}

private List<String> convertStatusKeys(GlobalStatus[] statuses) {
private List<String> convertStatusKeys(GlobalStatus... statuses) {
List<String> statusKeys = new ArrayList<>();
for (int i = 0; i < statuses.length; i++) {
statusKeys.add(buildGlobalStatus(statuses[i].getCode()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;

import java.util.Collections;
import java.util.List;

/**
Expand All @@ -38,14 +39,19 @@ public GlobalSession readSession(String xid, boolean withBranchSessions) {
return null;
}

@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
return Collections.emptyList();
}

@Override
public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) {
return null;
return Collections.emptyList();
}

@Override
public List<GlobalSession> readSession(SessionCondition sessionCondition) {
return null;
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ public interface TransactionStoreManager {
*/
GlobalSession readSession(String xid, boolean withBranchSessions);

/**
* Read session global session by sort by timeout begin status.
*
* @param withBranchSessions the withBranchSessions
* @return the global session
*/
List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions);
/**
* Read session global session.
*
Expand Down

0 comments on commit 9452176

Please sign in to comment.