diff --git a/changes/en-us/develop.md b/changes/en-us/develop.md index 115a669cca7..f6756f95edc 100644 --- a/changes/en-us/develop.md +++ b/changes/en-us/develop.md @@ -9,6 +9,7 @@ Add changes here for all PR submitted to the develop branch. - [[#4479](https://github.com/seata/seata/pull/4479)] TCC mode supports tcc annotation marked on both interface and implementation class - [[#4877](https://github.com/seata/seata/pull/4877)] seata client support jdk17 - [[#4468](https://github.com/seata/seata/pull/4968)] support kryo 5.3.0 +- [[#4914](https://github.com/seata/seata/pull/4914)] support mysql update join sql ### bugfix: @@ -24,6 +25,7 @@ Add changes here for all PR submitted to the develop branch. - [[#4928](https://github.com/seata/seata/pull/4928)] fix rpcContext.getClientRMHolderMap NPE - [[#4953](https://github.com/seata/seata/pull/4953)] fix InsertOnDuplicateUpdate bypass modify pk - [[#4978](https://github.com/seata/seata/pull/4978)] fix kryo support circular reference +- [[#5004](https://github.com/seata/seata/pull/5004)] fix duplicate image row for update join ### optimize: @@ -77,5 +79,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [tuwenlin](https://github.com/tuwenlin) - [CrazyLionLi](https://github.com/JavaLionLi) - [whxxxxx](https://github.com/whxxxxx) +- [renliangyu857](https://github.com/renliangyu857) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/develop.md b/changes/zh-cn/develop.md index 0a95fe0167f..94ba7cfc86d 100644 --- a/changes/zh-cn/develop.md +++ b/changes/zh-cn/develop.md @@ -9,6 +9,7 @@ - [[#4479](https://github.com/seata/seata/pull/4479)] TCC注解支持添加在实现类及其方法上也生效 - [[#4877](https://github.com/seata/seata/pull/4877)] seata client支持jdk17 - [[#4468](https://github.com/seata/seata/pull/4968)] 支持kryo 5.3.0 +- [[#4914](https://github.com/seata/seata/pull/4914)] 支持mysql的update join联表更新语法 @@ -26,6 +27,7 @@ - [[#4953](https://github.com/seata/seata/pull/4953)] 修复InsertOnDuplicateUpdate可绕过修改主键的问题 - [[#4978](https://github.com/seata/seata/pull/4978)] 修复 kryo 支持循环依赖 - [[#4985](https://github.com/seata/seata/pull/4985)] 修复 undo_log id重复的问题 +- [[#5004](https://github.com/seata/seata/pull/5004)] 修复mysql update join行数据重复的问题 ### optimize: @@ -77,5 +79,6 @@ - [tuwenlin](https://github.com/tuwenlin) - [CrazyLionLi](https://github.com/JavaLionLi) - [whxxxxx](https://github.com/whxxxxx) +- [renliangyu857](https://github.com/renliangyu857) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/core/src/main/java/io/seata/core/protocol/Version.java b/core/src/main/java/io/seata/core/protocol/Version.java index 722eda734bf..63f6ccfb215 100644 --- a/core/src/main/java/io/seata/core/protocol/Version.java +++ b/core/src/main/java/io/seata/core/protocol/Version.java @@ -136,6 +136,15 @@ public static long convertVersion(String version) throws IncompatibleVersionExce return result; } + public static long convertVersionNotThrowException(String version) { + try { + return convertVersion(version); + } catch (Exception e) { + LOGGER.error("convert version error,version:{}",version,e); + } + return -1; + } + private static long calculatePartValue(String partNumeric, int size, int index) { return Long.parseLong(partNumeric) * Double.valueOf(Math.pow(100, size - index)).longValue(); } diff --git a/rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java b/rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java index 43b8510a1aa..d0bcad362fd 100644 --- a/rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java +++ b/rm-datasource/src/main/java/io/seata/rm/datasource/DataSourceProxy.java @@ -16,6 +16,8 @@ package io.seata.rm.datasource; import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -26,6 +28,7 @@ import io.seata.common.thread.NamedThreadFactory; import io.seata.config.ConfigurationFactory; import io.seata.common.ConfigurationKeys; +import io.seata.core.constants.DBType; import io.seata.core.context.RootContext; import io.seata.core.model.BranchType; import io.seata.core.model.Resource; @@ -60,6 +63,8 @@ public class DataSourceProxy extends AbstractDataSourceProxy implements Resource private String userName; + private String version; + /** * Enable the table meta checker */ @@ -109,6 +114,7 @@ private void init(DataSource dataSource, String resourceGroupId) { } else if (JdbcConstants.MARIADB.equals(dbType)) { dbType = JdbcConstants.MYSQL; } + version = selectDbVersion(connection); } catch (SQLException e) { throw new IllegalStateException("can not init dataSource", e); } @@ -268,4 +274,22 @@ private void initPGResourceId() { public BranchType getBranchType() { return BranchType.AT; } + + public String getVersion() { + return version; + } + + private String selectDbVersion(Connection connection) { + if (DBType.MYSQL.name().equalsIgnoreCase(dbType)) { + try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT VERSION()"); + ResultSet versionResult = preparedStatement.executeQuery()) { + if (versionResult.next()) { + return versionResult.getString("VERSION()"); + } + } catch (Exception e) { + LOGGER.error("get mysql version fail error: {}", e.getMessage()); + } + } + return ""; + } } diff --git a/rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java b/rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java index 64a612a5691..f1d5ebbce0a 100644 --- a/rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java +++ b/rm-datasource/src/main/java/io/seata/rm/datasource/exec/AbstractDMLBaseExecutor.java @@ -48,6 +48,8 @@ public abstract class AbstractDMLBaseExecutor extends Ba protected static final String WHERE = " WHERE "; + protected static final String GROUP_BY = " GROUP BY "; + /** * Instantiates a new Abstract dml base executor. diff --git a/rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java b/rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java index 1895968174c..4ae946815d6 100644 --- a/rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java +++ b/rm-datasource/src/main/java/io/seata/rm/datasource/exec/mysql/MySQLUpdateJoinExecutor.java @@ -27,6 +27,7 @@ import java.util.StringJoiner; import io.seata.common.util.CollectionUtils; +import io.seata.core.protocol.Version; import io.seata.rm.datasource.ConnectionProxy; import io.seata.rm.datasource.sql.struct.TableMetaCacheFactory; import io.seata.rm.datasource.undo.SQLUndoLog; @@ -42,16 +43,21 @@ import io.seata.rm.datasource.sql.struct.TableRecords; import io.seata.sqlparser.SQLRecognizer; import io.seata.sqlparser.SQLUpdateRecognizer; - +import io.seata.sqlparser.util.ColumnUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author renliangyu857 */ public class MySQLUpdateJoinExecutor extends UpdateExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(MySQLUpdateJoinExecutor.class); private static final String DOT = "."; private final Map beforeImagesMap = new LinkedHashMap<>(4); private final Map afterImagesMap = new LinkedHashMap<>(4); + private final boolean isLowerSupportGroupByPksVersion = Version.convertVersionNotThrowException(getDbVersion()) < Version.convertVersionNotThrowException("5.7.5"); + private String sqlMode = ""; /** * Instantiates a new Update executor. @@ -89,6 +95,7 @@ protected TableRecords beforeImage() throws SQLException { private String buildBeforeImageSQL(String joinTable, String itemTable, List itemTableUpdateColumns, ArrayList> paramAppenderList) { SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer; + TableMeta itemTableMeta = getTableMeta(itemTable); StringBuilder prefix = new StringBuilder("SELECT "); StringBuilder suffix = new StringBuilder(" FROM ").append(joinTable); String whereCondition = buildWhereCondition(recognizer, paramAppenderList); @@ -103,9 +110,13 @@ private String buildBeforeImageSQL(String joinTable, String itemTable, List pkColumnNames = getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName()); + List needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns); + suffix.append(buildGroupBy(pkColumnNames,needUpdateColumns)); suffix.append(" FOR UPDATE"); StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString()); - List needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns); for (String needUpdateColumn : needUpdateColumns) { selectSQLJoin.add(needUpdateColumn); } @@ -143,11 +154,15 @@ private String buildAfterImageSQL(String joinTable, String itemTable, SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer) sqlRecognizer; TableMeta itemTableMeta = getTableMeta(itemTable); StringBuilder prefix = new StringBuilder("SELECT "); - String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName()), beforeImage.pkRows().size(), getDbType()); + List pkColumns = getColumnNamesWithTablePrefixList(itemTable, recognizer.getTableAlias(itemTable), itemTableMeta.getPrimaryKeyOnlyName()); + String whereSql = SqlGenerateUtils.buildWhereConditionByPKs(pkColumns, beforeImage.pkRows().size(), getDbType()); String suffix = " FROM " + joinTable + " WHERE " + whereSql; - StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix); + //maybe duplicate row for select join sql.remove duplicate row by 'group by' condition + suffix += GROUP_BY; List itemTableUpdateColumns = getItemUpdateColumns(itemTableMeta, recognizer.getUpdateColumns()); List needUpdateColumns = getNeedUpdateColumns(itemTable, recognizer.getTableAlias(itemTable), itemTableUpdateColumns); + suffix += buildGroupBy(pkColumns,needUpdateColumns); + StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix); for (String needUpdateColumn : needUpdateColumns) { selectSQLJoiner.add(needUpdateColumn); } @@ -214,4 +229,47 @@ protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterI sqlUndoLog.setAfterImage(afterImage); return sqlUndoLog; } + + /** + * build group by condition which used for removing duplicate row in select join sql" + * + * @param pkColumns pkColumnsList + * @param allSelectColumns allSelectColumns + * @return return group by condition string. + */ + private String buildGroupBy(List pkColumns,List allSelectColumns) { + boolean groupByPks = true; + //only pks group by is valid when db version >= 5.7.5 + try { + if (isLowerSupportGroupByPksVersion) { + if (StringUtils.isEmpty(sqlMode)) { + try (PreparedStatement preparedStatement = statementProxy.getConnection().prepareStatement("SELECT @@SQL_MODE"); + ResultSet resultSet = preparedStatement.executeQuery()) { + if (resultSet.next()) { + sqlMode = resultSet.getString("@@SQL_MODE"); + } + } + } + if (sqlMode.contains("ONLY_FULL_GROUP_BY")) { + groupByPks = false; + } + } + } catch (Exception e) { + groupByPks = false; + LOGGER.warn("determine group by pks or all columns error:{}",e.getMessage()); + } + List groupByColumns = groupByPks ? pkColumns : allSelectColumns; + StringBuilder groupByStr = new StringBuilder(); + for (int i = 0; i < groupByColumns.size(); i++) { + if (i > 0) { + groupByStr.append(","); + } + groupByStr.append(ColumnUtils.addEscape(groupByColumns.get(i),getDbType())); + } + return groupByStr.toString(); + } + + private String getDbVersion() { + return statementProxy.getConnectionProxy().getDataSourceProxy().getVersion(); + } }