Skip to content

Commit

Permalink
optimize: redis mode support sorted set by timeout (apache#4582)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Nov 4, 2022
1 parent 810c304 commit 2a20f0f
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 141 deletions.
6 changes: 3 additions & 3 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
<apache-zookeeper.version>3.5.9</apache-zookeeper.version>
<curator-test.version>2.9.1</curator-test.version>
<spring-context-support.version>1.0.2</spring-context-support.version>
<mock-jedis.version>0.3.0</mock-jedis.version>
<apollo-client.version>2.0.1</apollo-client.version>
<mock-jedis.version>0.1.16</mock-jedis.version>
<eureka-clients.version>1.10.17</eureka-clients.version>
<consul-clients.version>1.4.2</consul-clients.version>
<nacos-client.version>1.4.2</nacos-client.version>
Expand Down Expand Up @@ -346,8 +346,8 @@
<version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>com.github.fppt</groupId>
<artifactId>jedis-mock</artifactId>
<groupId>com.github.microwww</groupId>
<artifactId>redis-server</artifactId>
<version>${mock-jedis.version}</version>
</dependency>
<dependency>
Expand Down
4 changes: 2 additions & 2 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@
</dependency>

<dependency>
<groupId>com.github.fppt</groupId>
<artifactId>jedis-mock</artifactId>
<groupId>com.github.microwww</groupId>
<artifactId>redis-server</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public GlobalStatus getStatus() {
*/
public void setStatus(GlobalStatus status) {
this.status = status;
this.statuses = new GlobalStatus[] {status};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public GlobalSession readSession(String xid, boolean withBranchSessions) {
return getGlobalSession(globalTransactionDO, branchTransactionDOs);
}

@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
return readSession(new GlobalStatus[] {GlobalStatus.Begin}, withBranchSessions);
}

/**
* Read session list.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ public boolean acquireLock(DistributedLockDO distributedLockDO) {
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
//Don't need retry,if can't acquire the lock,let the other get the lock
String result = jedis.set(distributedLockDO.getLockKey(), distributedLockDO.getLockValue(), SetParams.setParams().nx().px(distributedLockDO.getExpireTime()));
if (SUCCESS.equalsIgnoreCase(result)) {
return true;
}
return false;
return SUCCESS.equalsIgnoreCase(result);
} catch (Exception ex) {
LOGGER.error("The {} acquired the {} distributed lock failed.", distributedLockDO.getLockValue(), distributedLockDO.getLockKey(), ex);
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,15 @@ public Collection<GlobalSession> allSessions() {
if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
} else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying, GlobalStatus.Committing}));
return findGlobalSessions(new SessionCondition(GlobalStatus.CommitRetrying, GlobalStatus.Committing));
} else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
return findGlobalSessions(new SessionCondition(GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking,
GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying));
} else {
// all data
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.UnKnown, GlobalStatus.Begin,
GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying,
GlobalStatus.AsyncCommitting}));
return findGlobalSessions(new SessionCondition(GlobalStatus.UnKnown, GlobalStatus.Begin, GlobalStatus.Committing,
GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.List;
import java.util.Map;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.Collections;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import io.seata.config.Configuration;
Expand Down Expand Up @@ -86,6 +88,9 @@ public class RedisTransactionStoreManager extends AbstractTransactionStoreManage
/**the prefix of the global transaction status*/
private static final String REDIS_SEATA_STATUS_PREFIX = "SEATA_STATUS_";

/**the key of global transaction status for begin*/
private static final String REDIS_SEATA_BEGIN_TRANSACTIONS_KEY = "SEATA_BEGIN_TRANSACTIONS";

private static volatile RedisTransactionStoreManager instance;

private static final String OK = "OK";
Expand Down Expand Up @@ -257,7 +262,10 @@ private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionD
globalTransactionDO.setGmtCreate(now);
globalTransactionDO.setGmtModified(now);
pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO));
pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), globalTransactionDO.getXid());
String xid = globalTransactionDO.getXid();
pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid);
pipelined.zadd(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY,
globalTransactionDO.getBeginTime() + globalTransactionDO.getTimeout(), globalKey);
pipelined.sync();
return true;
} catch (Exception ex) {
Expand Down Expand Up @@ -286,6 +294,10 @@ private boolean deleteGlobalTransactionDO(GlobalTransactionDO globalTransactionD
try (Pipeline pipelined = jedis.pipelined()) {
pipelined.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, globalTransactionDO.getXid());
pipelined.del(globalKey);
if (GlobalStatus.Begin.getCode() == globalTransactionDO.getStatus()
|| GlobalStatus.UnKnown.getCode() == globalTransactionDO.getStatus()) {
pipelined.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey);
}
pipelined.sync();
}
return true;
Expand Down Expand Up @@ -331,9 +343,10 @@ private boolean updateGlobalTransactionDO(GlobalTransactionDO globalTransactionD
Map<String,String> map = new HashMap<>(2);
map.put(REDIS_KEY_GLOBAL_STATUS,String.valueOf(globalTransactionDO.getStatus()));
map.put(REDIS_KEY_GLOBAL_GMT_MODIFIED,String.valueOf((new Date()).getTime()));
multi.hmset(globalKey,map);
multi.lrem(buildGlobalStatus(Integer.valueOf(previousStatus)),0, xid);
multi.hmset(globalKey, map);
multi.lrem(buildGlobalStatus(Integer.valueOf(previousStatus)), 0, xid);
multi.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), xid);
multi.zrem(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, globalKey);
List<Object> exec = multi.exec();
if (CollectionUtils.isEmpty(exec)) {
//The data has changed by another tc, so we still think the modification is successful.
Expand Down Expand Up @@ -361,10 +374,10 @@ private boolean updateGlobalTransactionDO(GlobalTransactionDO globalTransactionD
}
}
if (lrem > 0) {
jedis.rpush(buildGlobalStatus(Integer.valueOf(previousStatus)),xid);
jedis.rpush(buildGlobalStatus(Integer.valueOf(previousStatus)), xid);
}
if (rpush > 0) {
jedis.lrem(buildGlobalStatus(globalTransactionDO.getStatus()),0,xid);
jedis.lrem(buildGlobalStatus(globalTransactionDO.getStatus()), 0, xid);
}
return false;
}
Expand Down Expand Up @@ -419,10 +432,8 @@ public GlobalSession readSession(String xid) {
*/
@Override
public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) {

List<GlobalSession> globalSessions = Collections.synchronizedList(new ArrayList<>());
List<String> statusKeys = convertStatusKeys(statuses);

Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys);
if (targetMap.size() == 0 || logQueryLimit <= 0) {
return globalSessions;
Expand All @@ -445,6 +456,45 @@ public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBran
return globalSessions;
}

@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
List<GlobalSession> list = Collections.emptyList();
List<String> statusKeys = convertStatusKeys(GlobalStatus.Begin);
Map<String, Integer> targetMap = calculateStatuskeysHasData(statusKeys);
if (targetMap.size() == 0 || logQueryLimit <= 0) {
return list;
}
final long countGlobalSessions = targetMap.values().stream().collect(Collectors.summarizingInt(Integer::intValue)).getSum();
// queryCount
final long queryCount = Math.min(logQueryLimit, countGlobalSessions);
try (Jedis jedis = JedisPooledFactory.getJedisInstance()) {
Set<String> values =
jedis.zrangeByScore(REDIS_SEATA_BEGIN_TRANSACTIONS_KEY, 0, System.currentTimeMillis(), 0,
(int) queryCount);
List<Map<String, String>> rep;
try (Pipeline pipeline = jedis.pipelined()) {
for (String value : values) {
pipeline.hgetAll(value);
}
rep = (List<Map<String, String>>) (List) pipeline.syncAndReturnAll();
}
list = rep.stream().map(map -> {
GlobalTransactionDO globalTransactionDO = (GlobalTransactionDO) BeanUtils.mapToObject(map,
GlobalTransactionDO.class);
if (globalTransactionDO != null) {
String xid = globalTransactionDO.getXid();
List<BranchTransactionDO> branchTransactionDOs = new ArrayList<>();
if (withBranchSessions) {
branchTransactionDOs = this.readBranchSessionByXid(jedis, xid);
}
return getGlobalSession(globalTransactionDO, branchTransactionDOs, withBranchSessions);
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());
}
return list;
}

/**
* get everyone keys limit
*
Expand Down Expand Up @@ -482,9 +532,11 @@ public List<GlobalSession> readSession(SessionCondition sessionCondition) {
}
return globalSessions;
} else if (CollectionUtils.isNotEmpty(sessionCondition.getStatuses())) {
return readSession(sessionCondition.getStatuses(), !sessionCondition.isLazyLoadBranch());
} else if (sessionCondition.getStatus() != null) {
return readSession(new GlobalStatus[] {sessionCondition.getStatus()}, !sessionCondition.isLazyLoadBranch());
if (sessionCondition.getStatuses().length == 1 && sessionCondition.getStatuses()[0] == GlobalStatus.Begin) {
return this.readSortByTimeoutBeginSessions(!sessionCondition.isLazyLoadBranch());
} else {
return readSession(sessionCondition.getStatuses(), !sessionCondition.isLazyLoadBranch());
}
}
return null;
}
Expand Down Expand Up @@ -698,7 +750,7 @@ public Long countByGlobalSessions(GlobalStatus[] values) {
}
}

private List<String> convertStatusKeys(GlobalStatus[] statuses) {
private List<String> convertStatusKeys(GlobalStatus... statuses) {
List<String> statusKeys = new ArrayList<>();
for (int i = 0; i < statuses.length; i++) {
statusKeys.add(buildGlobalStatus(statuses[i].getCode()));
Expand Down Expand Up @@ -743,9 +795,7 @@ private void dogetXidsForTargetMapRecursive(Map<String, Integer> targetMap, long
if (list.size() > 0) {
listList.add(list);
} else {
if (list.size() == 0) {
iterator.remove();
}
iterator.remove();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionCondition;

import java.util.Collections;
import java.util.List;

/**
Expand All @@ -38,14 +39,19 @@ public GlobalSession readSession(String xid, boolean withBranchSessions) {
return null;
}

@Override
public List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions) {
return Collections.emptyList();
}

@Override
public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBranchSessions) {
return null;
return Collections.emptyList();
}

@Override
public List<GlobalSession> readSession(SessionCondition sessionCondition) {
return null;
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ public interface TransactionStoreManager {
*/
GlobalSession readSession(String xid, boolean withBranchSessions);

/**
* Read session global session by sort by timeout begin status.
*
* @param withBranchSessions the withBranchSessions
* @return the global session
*/
List<GlobalSession> readSortByTimeoutBeginSessions(boolean withBranchSessions);
/**
* Read session global session.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,30 @@

import java.io.IOException;

import com.github.fppt.jedismock.RedisServer;
import com.github.microwww.redis.RedisServer;
import io.seata.core.exception.TransactionException;
import io.seata.core.lock.Locker;
import io.seata.server.lock.LockManager;
import io.seata.server.session.BranchSession;
import io.seata.server.storage.redis.JedisPooledFactory;
import io.seata.server.session.redis.MockRedisServer;
import io.seata.server.storage.redis.lock.RedisLockManager;
import io.seata.server.storage.redis.lock.RedisLocker;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
* @author funkye
*/
@SpringBootTest
public class RedisLockManagerTest {
static RedisServer server = null;
static LockManager lockManager = null;

@BeforeAll
public static void start(ApplicationContext context) throws IOException {
server = RedisServer.newRedisServer(6789);
server.start();
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMinIdle(1);
poolConfig.setMaxIdle(10);
JedisPooledFactory.getJedisPoolInstance(new JedisPool(poolConfig, "127.0.0.1", 6789, 60000));
MockRedisServer.getInstance();
lockManager = new RedisLockManagerForTest();
}

Expand Down Expand Up @@ -94,12 +85,6 @@ public void isLockable() throws TransactionException {
branchSession2.getLockKey()));
}

@AfterAll
public static void after() {
server.stop();
server = null;
}

public static class RedisLockManagerForTest extends RedisLockManager {

public RedisLockManagerForTest() {}
Expand Down
Loading

0 comments on commit 2a20f0f

Please sign in to comment.