Skip to content

Commit

Permalink
bugfix: duplicate image row for update join (apache#5004)
Browse files Browse the repository at this point in the history
  • Loading branch information
renliangyu857 authored Oct 25, 2022
1 parent 83ac9f3 commit 9e6a3b5
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 4 deletions.
3 changes: 3 additions & 0 deletions changes/en-us/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#4479](https://github.com/seata/seata/pull/4479)] TCC mode supports tcc annotation marked on both interface and implementation class
- [[#4877](https://github.com/seata/seata/pull/4877)] seata client support jdk17
- [[#4468](https://github.com/seata/seata/pull/4968)] support kryo 5.3.0
- [[#4914](https://github.com/seata/seata/pull/4914)] support mysql update join sql


### bugfix:
Expand All @@ -24,6 +25,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#4928](https://github.com/seata/seata/pull/4928)] fix rpcContext.getClientRMHolderMap NPE
- [[#4953](https://github.com/seata/seata/pull/4953)] fix InsertOnDuplicateUpdate bypass modify pk
- [[#4978](https://github.com/seata/seata/pull/4978)] fix kryo support circular reference
- [[#5004](https://github.com/seata/seata/pull/5004)] fix duplicate image row for update join


### optimize:
Expand Down Expand Up @@ -77,5 +79,6 @@ Thanks to these contributors for their code commits. Please report an unintended
- [tuwenlin](https://github.com/tuwenlin)
- [CrazyLionLi](https://github.com/JavaLionLi)
- [whxxxxx](https://github.com/whxxxxx)
- [renliangyu857](https://github.com/renliangyu857)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
3 changes: 3 additions & 0 deletions changes/zh-cn/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [[#4479](https://github.com/seata/seata/pull/4479)] TCC注解支持添加在实现类及其方法上也生效
- [[#4877](https://github.com/seata/seata/pull/4877)] seata client支持jdk17
- [[#4468](https://github.com/seata/seata/pull/4968)] 支持kryo 5.3.0
- [[#4914](https://github.com/seata/seata/pull/4914)] 支持mysql的update join联表更新语法



Expand All @@ -26,6 +27,7 @@
- [[#4953](https://github.com/seata/seata/pull/4953)] 修复InsertOnDuplicateUpdate可绕过修改主键的问题
- [[#4978](https://github.com/seata/seata/pull/4978)] 修复 kryo 支持循环依赖
- [[#4985](https://github.com/seata/seata/pull/4985)] 修复 undo_log id重复的问题
- [[#5004](https://github.com/seata/seata/pull/5004)] 修复mysql update join行数据重复的问题


### optimize:
Expand Down Expand Up @@ -77,5 +79,6 @@
- [tuwenlin](https://github.com/tuwenlin)
- [CrazyLionLi](https://github.com/JavaLionLi)
- [whxxxxx](https://github.com/whxxxxx)
- [renliangyu857](https://github.com/renliangyu857)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
9 changes: 9 additions & 0 deletions core/src/main/java/io/seata/core/protocol/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ public static long convertVersion(String version) throws IncompatibleVersionExce
return result;
}

public static long convertVersionNotThrowException(String version) {
try {
return convertVersion(version);
} catch (Exception e) {
LOGGER.error("convert version error,version:{}",version,e);
}
return -1;
}

private static long calculatePartValue(String partNumeric, int size, int index) {
return Long.parseLong(partNumeric) * Double.valueOf(Math.pow(100, size - index)).longValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package io.seata.rm.datasource;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -26,6 +28,7 @@
import io.seata.common.thread.NamedThreadFactory;
import io.seata.config.ConfigurationFactory;
import io.seata.common.ConfigurationKeys;
import io.seata.core.constants.DBType;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import io.seata.core.model.Resource;
Expand Down Expand Up @@ -60,6 +63,8 @@ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource

private String userName;

private String version;

/**
* Enable the table meta checker
*/
Expand Down Expand Up @@ -109,6 +114,7 @@ private void init(DataSource dataSource, String resourceGroupId) {
} else if (JdbcConstants.MARIADB.equals(dbType)) {
dbType = JdbcConstants.MYSQL;
}
version = selectDbVersion(connection);
} catch (SQLException e) {
throw new IllegalStateException("can not init dataSource", e);
}
Expand Down Expand Up @@ -268,4 +274,22 @@ private void initPGResourceId() {
public BranchType getBranchType() {
return BranchType.AT;
}

public String getVersion() {
return version;
}

private String selectDbVersion(Connection connection) {
if (DBType.MYSQL.name().equalsIgnoreCase(dbType)) {
try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT VERSION()");
ResultSet versionResult = preparedStatement.executeQuery()) {
if (versionResult.next()) {
return versionResult.getString("VERSION()");
}
} catch (Exception e) {
LOGGER.error("get mysql version fail error: {}", e.getMessage());
}
}
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends Ba

protected static final String WHERE = " WHERE ";

protected static final String GROUP_BY = " GROUP BY ";


/**
* Instantiates a new Abstract dml base executor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.StringJoiner;

import io.seata.common.util.CollectionUtils;
import io.seata.core.protocol.Version;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory;
import io.seata.rm.datasource.undo.SQLUndoLog;
Expand All @@ -42,16 +43,21 @@
import io.seata.rm.datasource.sql.struct.TableRecords;
import io.seata.sqlparser.SQLRecognizer;
import io.seata.sqlparser.SQLUpdateRecognizer;

import io.seata.sqlparser.util.ColumnUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* @author renliangyu857
*/
public class MySQLUpdateJoinExecutor<T, S extends Statement> extends UpdateExecutor<T, S> {
private static final Logger LOGGER = LoggerFactory.getLogger(MySQLUpdateJoinExecutor.class);
private static final String DOT = ".";
private final Map<String, TableRecords> beforeImagesMap = new LinkedHashMap<>(4);
private final Map<String, TableRecords> afterImagesMap = new LinkedHashMap<>(4);
private final boolean isLowerSupportGroupByPksVersion = Version.convertVersionNotThrowException(getDbVersion()) < Version.convertVersionNotThrowException("5.7.5");
private String sqlMode = "";

/**
* Instantiates a new Update executor.
Expand Down Expand Up @@ -89,6 +95,7 @@ protected TableRecords beforeImage() throws SQLException {
private String buildBeforeImageSQL(String joinTable, String itemTable, List<String> itemTableUpdateColumns,
ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
TableMeta itemTableMeta = getTableMeta(itemTable);
StringBuilder prefix = new StringBuilder("SELECT ");
StringBuilder suffix = new StringBuilder(" FROM ").append(joinTable);
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
Expand All @@ -103,9 +110,13 @@ private String buildBeforeImageSQL(String joinTable, String itemTable, List<Stri
if (StringUtils.isNotBlank(limitCondition)) {
suffix.append(" ").append(limitCondition);
}
//maybe duplicate row for select join sql.remove duplicate row by 'group by' condition
suffix.append(GROUP_BY);
List<String> pkColumnNames = getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName());
List<String> needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns);
suffix.append(buildGroupBy(pkColumnNames,needUpdateColumns));
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
List<String> needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns);
for (String needUpdateColumn : needUpdateColumns) {
selectSQLJoin.add(needUpdateColumn);
}
Expand Down Expand Up @@ -143,11 +154,15 @@ private String buildAfterImageSQL(String joinTable, String itemTable,
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer;
TableMeta itemTableMeta = getTableMeta(itemTable);
StringBuilder prefix = new StringBuilder("SELECT ");
String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName()), beforeImage.pkRows().size(), getDbType());
List<String> pkColumns = getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName());
String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(pkColumns, beforeImage.pkRows().size(), getDbType());
String suffix = " FROM " + joinTable + " WHERE " + whereSql;
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
//maybe duplicate row for select join sql.remove duplicate row by 'group by' condition
suffix += GROUP_BY;
List<String> itemTableUpdateColumns = getItemUpdateColumns(itemTableMeta, recognizer.getUpdateColumns());
List<String> needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns);
suffix += buildGroupBy(pkColumns,needUpdateColumns);
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
for (String needUpdateColumn : needUpdateColumns) {
selectSQLJoiner.add(needUpdateColumn);
}
Expand Down Expand Up @@ -214,4 +229,47 @@ protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterI
sqlUndoLog.setAfterImage(afterImage);
return sqlUndoLog;
}

/**
* build group by condition which used for removing duplicate row in select join sql"
*
* @param pkColumns pkColumnsList
* @param allSelectColumns allSelectColumns
* @return return group by condition string.
*/
private String buildGroupBy(List<String> pkColumns,List<String> allSelectColumns) {
boolean groupByPks = true;
//only pks group by is valid when db version >= 5.7.5
try {
if (isLowerSupportGroupByPksVersion) {
if (StringUtils.isEmpty(sqlMode)) {
try (PreparedStatement preparedStatement = statementProxy.getConnection().prepareStatement("SELECT @@SQL_MODE");
ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
sqlMode = resultSet.getString("@@SQL_MODE");
}
}
}
if (sqlMode.contains("ONLY_FULL_GROUP_BY")) {
groupByPks = false;
}
}
} catch (Exception e) {
groupByPks = false;
LOGGER.warn("determine group by pks or all columns error:{}",e.getMessage());
}
List<String> groupByColumns = groupByPks ? pkColumns : allSelectColumns;
StringBuilder groupByStr = new StringBuilder();
for (int i = 0; i < groupByColumns.size(); i++) {
if (i > 0) {
groupByStr.append(",");
}
groupByStr.append(ColumnUtils.addEscape(groupByColumns.get(i),getDbType()));
}
return groupByStr.toString();
}

private String getDbVersion() {
return statementProxy.getConnectionProxy().getDataSourceProxy().getVersion();
}
}

0 comments on commit 9e6a3b5

Please sign in to comment.