Skip to content

Commit

Permalink
optimize: optimize transaction timeout judgment (apache#4445)
Browse files Browse the repository at this point in the history
  • Loading branch information
miaoxueyu authored Sep 21, 2022
1 parent a986644 commit 0cdaa08
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 29 deletions.
13 changes: 10 additions & 3 deletions server/src/main/java/io/seata/server/coordinator/DefaultCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.seata.server.session.GlobalSession;
import io.seata.server.session.SessionHelper;
import io.seata.server.session.SessionHolder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand Down Expand Up @@ -128,8 +129,7 @@ public BranchStatus branchRollback(GlobalSession globalSession, BranchSession br
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

Expand All @@ -141,12 +141,20 @@ public String begin(String applicationId, String transactionServiceGroup, String
return session.getXid();
}



@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}

if (globalSession.isTimeout()) {
LOGGER.info("TC detected timeout, xid = {}", globalSession.getXid());
return GlobalStatus.TimeoutRollbacking;
}

globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus

Expand Down Expand Up @@ -388,5 +396,4 @@ private boolean isXaerNotaTimeout(GlobalSession globalSession, BranchStatus bran
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.eventbus.Subscribe;
import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.StringUtils;
Expand All @@ -36,17 +35,19 @@
import io.seata.core.event.EventBus;
import io.seata.core.event.GuavaEventBus;
import io.seata.core.model.GlobalLockConfig;
import io.seata.rm.GlobalLockExecutor;
import io.seata.rm.GlobalLockTemplate;
import io.seata.spring.event.DegradeCheckEvent;
import io.seata.tm.TransactionManagerHolder;
import io.seata.tm.api.DefaultFailureHandlerImpl;
import io.seata.tm.api.FailureHandler;
import io.seata.rm.GlobalLockExecutor;
import io.seata.rm.GlobalLockTemplate;
import io.seata.tm.api.TransactionalExecutor;
import io.seata.tm.api.TransactionalTemplate;
import io.seata.tm.api.transaction.NoRollbackRule;
import io.seata.tm.api.transaction.RollbackRule;
import io.seata.tm.api.transaction.TransactionInfo;

import com.google.common.eventbus.Subscribe;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.slf4j.Logger;
Expand Down Expand Up @@ -260,6 +261,9 @@ public TransactionInfo getTransactionInfo() {
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
case TimeoutRollback:
failureHandler.onTimeoutRollback(e.getTransaction(), e.getOriginalException());
throw e.getCause();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class MockGlobalTransaction implements GlobalTransaction {

private String xid;
private GlobalStatus status;
private long createTime;

private static SpringJvmUUIDSeqGenerator uuidSeqGenerator = new SpringJvmUUIDSeqGenerator();

Expand All @@ -52,6 +53,7 @@ public void begin() throws TransactionException {

@Override
public void begin(int timeout) throws TransactionException {
this.createTime = System.currentTimeMillis();
status = GlobalStatus.Begin;
xid = uuidSeqGenerator.generate(null).toString();
RootContext.bind(xid);
Expand Down Expand Up @@ -109,4 +111,8 @@ public GlobalTransactionRole getGlobalTransactionRole() {
return null;
}

@Override
public long getCreateTime() {
return createTime;
}
}
12 changes: 9 additions & 3 deletions tm/src/main/java/io/seata/tm/api/DefaultFailureHandlerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.core.exception.TransactionException;
import io.seata.core.logger.StackTraceLogger;
import io.seata.core.model.GlobalStatus;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,6 +76,11 @@ public void onRollbackRetrying(GlobalTransaction tx, Throwable originalException
TimeUnit.SECONDS);
}

@Override
public void onTimeoutRollback(GlobalTransaction tx, Throwable originalException) {
StackTraceLogger.warn(LOGGER, originalException, "Transaction timeout rollback[{}]", new String[] {tx.getXid()});
}

protected class CheckTimerTask implements TimerTask {

private final GlobalTransaction tx;
Expand Down
14 changes: 14 additions & 0 deletions tm/src/main/java/io/seata/tm/api/DefaultGlobalTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.seata.core.model.TransactionManager;
import io.seata.tm.TransactionManagerHolder;
import io.seata.tm.api.transaction.SuspendedResourcesHolder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -50,6 +51,13 @@ public class DefaultGlobalTransaction implements GlobalTransaction {

private GlobalTransactionRole role;

/**
* Used to calculate the timeout
*
* @see System#currentTimeMillis();
*/
private long createTime;

private static final int COMMIT_RETRY_COUNT = ConfigurationFactory.getInstance().getInt(
ConfigurationKeys.CLIENT_TM_COMMIT_RETRY_COUNT, DEFAULT_TM_COMMIT_RETRY_COUNT);

Expand Down Expand Up @@ -89,6 +97,7 @@ public void begin(int timeout) throws TransactionException {

@Override
public void begin(int timeout, String name) throws TransactionException {
this.createTime = System.currentTimeMillis();
if (role != GlobalTransactionRole.Launcher) {
assertXIDNotNull();
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -250,6 +259,11 @@ public GlobalTransactionRole getGlobalTransactionRole() {
return role;
}

@Override
public long getCreateTime() {
return createTime;
}

private void assertXIDNotNull() {
if (xid == null) {
throw new IllegalStateException();
Expand Down
7 changes: 7 additions & 0 deletions tm/src/main/java/io/seata/tm/api/FailureHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,11 @@ public interface FailureHandler {
* @param originalException the originalException
*/
void onRollbackRetrying(GlobalTransaction tx, Throwable originalException);

/**
* On timeout rollback
* @param tx the tx
* @param originalException the originalException
*/
void onTimeoutRollback(GlobalTransaction tx, Throwable originalException);
}
6 changes: 6 additions & 0 deletions tm/src/main/java/io/seata/tm/api/GlobalTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,10 @@ public interface GlobalTransaction {
*/
GlobalTransactionRole getGlobalTransactionRole();

/**
* get create time
*
* @return create time
*/
long getCreateTime();
}
8 changes: 7 additions & 1 deletion tm/src/main/java/io/seata/tm/api/TransactionalExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ enum Code {
* Rollback retrying code.
*/
//
RollbackRetrying
RollbackRetrying,

/**
* Timeout Rollback code.
*/
//
TimeoutRollback
}

/**
Expand Down
71 changes: 52 additions & 19 deletions tm/src/main/java/io/seata/tm/api/TransactionalTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package io.seata.tm.api;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.core.context.GlobalLockConfigHolder;
Expand All @@ -27,6 +29,7 @@
import io.seata.tm.api.transaction.TransactionHook;
import io.seata.tm.api.transaction.TransactionHookManager;
import io.seata.tm.api.transaction.TransactionInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -91,8 +94,8 @@ public Object execute(TransactionalExecutor business) throws Throwable {
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
Expand Down Expand Up @@ -132,7 +135,7 @@ public Object execute(TransactionalExecutor business) throws Throwable {
}

// 4. everything is fine, commit.
commitTransaction(tx);
commitTransaction(tx, txInfo);

return rs;
} finally {
Expand All @@ -149,6 +152,19 @@ public Object execute(TransactionalExecutor business) throws Throwable {
}
}

/**
* Judge whether timeout
*
* @param beginTime the beginTime
* @param txInfo the transaction info
* @return is timeout
*/
private boolean isTimeout(long beginTime, TransactionInfo txInfo) {

return (System.currentTimeMillis() - beginTime) > txInfo.getTimeOut();
}


private boolean existingTransaction(GlobalTransaction tx) {
return tx != null;
}
Expand All @@ -172,41 +188,58 @@ private void resumeGlobalLockConfig(GlobalLockConfig config) {
}
}

private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)
throws TransactionalExecutor.ExecutionException, TransactionException {
//roll back
if (txInfo != null && txInfo.rollbackOn(originalException)) {
try {
rollbackTransaction(tx, originalException);
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, originalException);
}
rollbackTransaction(tx, originalException);
} else {
// not roll back on this exception, so commit
commitTransaction(tx);
commitTransaction(tx, txInfo);
}
}

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
throws TransactionalExecutor.ExecutionException, TransactionException {
if (isTimeout(tx.getCreateTime(), txInfo)) {
// business execution timeout
LOGGER.info("TM detected timeout, xid = {}", tx.getXid());
tx.rollback();
return;
}

try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
TransactionalExecutor.Code.CommitFailure);
}

if (Arrays.asList(GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbacked).contains(tx.getLocalStatus())) {
throw new TransactionalExecutor.ExecutionException(tx,
new TimeoutException(String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid())),
TransactionalExecutor.Code.TimeoutRollback);
}
}

private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();

try {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
} catch (TransactionException txe) {
// Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, originalException);
}

// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
}

private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
Expand All @@ -216,7 +249,7 @@ private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) thro
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
TransactionalExecutor.Code.BeginFailure);

}
}
Expand Down

0 comments on commit 0cdaa08

Please sign in to comment.