diff --git a/dtmcli-core/pom.xml b/dtmcli-core/pom.xml index 4734091..bbf6d6c 100644 --- a/dtmcli-core/pom.xml +++ b/dtmcli-core/pom.xml @@ -50,6 +50,13 @@ 8.0.33 test + + + com.microsoft.sqlserver + mssql-jdbc + 13.2.1.jre8 + test + diff --git a/dtmcli-core/sql/busi.serversql.sql b/dtmcli-core/sql/busi.serversql.sql new file mode 100644 index 0000000..74eefeb --- /dev/null +++ b/dtmcli-core/sql/busi.serversql.sql @@ -0,0 +1,36 @@ +IF DB_ID('dtm_busi') IS NULL +BEGIN + CREATE DATABASE dtm_busi; +END +GO + +USE dtm_busi; +GO + +IF OBJECT_ID('dbo.user_account', 'U') IS NOT NULL + DROP TABLE dbo.user_account; +GO + +CREATE TABLE dbo.user_account ( + id INT IDENTITY(1,1) PRIMARY KEY, + user_id INT NOT NULL UNIQUE, + balance DECIMAL(10, 2) NOT NULL DEFAULT 0, + trading_balance DECIMAL(10, 2) NOT NULL DEFAULT 0, + create_time DATETIME2 DEFAULT GETDATE(), + update_time DATETIME2 DEFAULT GETDATE() +); +GO + +CREATE NONCLUSTERED INDEX IX_user_account_create_time ON dbo.user_account (create_time); +CREATE NONCLUSTERED INDEX IX_user_account_update_time ON dbo.user_account (update_time); +GO + +MERGE dbo.user_account AS target +USING (VALUES (1, 10000), (2, 10000)) AS source (user_id, balance) +ON target.user_id = source.user_id +WHEN MATCHED THEN + UPDATE SET balance = source.balance +WHEN NOT MATCHED THEN + INSERT (user_id, balance) + VALUES (source.user_id, source.balance); +GO \ No newline at end of file diff --git a/dtmcli-core/sql/dtmcli.barrier.sqlserver.sql b/dtmcli-core/sql/dtmcli.barrier.sqlserver.sql new file mode 100644 index 0000000..022fbca --- /dev/null +++ b/dtmcli-core/sql/dtmcli.barrier.sqlserver.sql @@ -0,0 +1,37 @@ +IF NOT EXISTS(SELECT * FROM sys.databases WHERE name = 'dtm_busi') +BEGIN + CREATE DATABASE dtm_barrier +END + +GO + +USE dtm_busi +GO + +IF EXISTS (SELECT * FROM sysobjects WHERE id = object_id(N'[dbo].[barrier]') and OBJECTPROPERTY(id, N'IsUserTable') = 1) +BEGIN + DROP TABLE [dbo].[barrier] +END + +GO + +CREATE TABLE [dbo].[barrier] +( + [id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY, + [trans_type] varchar(45) NOT NULL DEFAULT(''), + [gid] varchar(128) NOT NULL DEFAULT(''), + [branch_id] varchar(128) NOT NULL DEFAULT(''), + [op] varchar(45) NOT NULL DEFAULT(''), + [barrier_id] varchar(45) NOT NULL DEFAULT(''), + [reason] varchar(45) NOT NULL DEFAULT(''), + [create_time] datetime NOT NULL DEFAULT(getdate()) , + [update_time] datetime NOT NULL DEFAULT(getdate()) +) + +GO + +CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier] + ([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC) +WITH(IGNORE_DUP_KEY = ON) + +GO \ No newline at end of file diff --git a/dtmcli-core/src/main/java/pub/dtm/client/barrier/itfc/impl/BarrierSqlServerOperator.java b/dtmcli-core/src/main/java/pub/dtm/client/barrier/itfc/impl/BarrierSqlServerOperator.java new file mode 100644 index 0000000..dcbfeb2 --- /dev/null +++ b/dtmcli-core/src/main/java/pub/dtm/client/barrier/itfc/impl/BarrierSqlServerOperator.java @@ -0,0 +1,78 @@ +package pub.dtm.client.barrier.itfc.impl; + +import pub.dtm.client.barrier.itfc.BarrierDBOperator; +import pub.dtm.client.constant.ParamFieldConstants; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Objects; + +/** + * BarrierDBOperator for SqlServer + * + * @author yxou + * @date 2026-01-07 + */ +public class BarrierSqlServerOperator implements BarrierDBOperator { + private Object connection; + + public BarrierSqlServerOperator(Object connection) { + this.connection = connection; + } + + @Override + public boolean insertBarrier(String transType, String gid, String branchId, String op, int barrierId) throws Exception { + if (Objects.isNull(connection)) { + return false; + } + Connection conn = (Connection)this.connection; + conn.setAutoCommit(false); + PreparedStatement preparedStatement = null; + try { + String sql = "insert into dbo.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)"; + preparedStatement = conn.prepareStatement(sql); + preparedStatement.setString(1, transType); + preparedStatement.setString(2, gid); + preparedStatement.setString(3, branchId); + preparedStatement.setString(4, op); + preparedStatement.setString(5, String.format("%02d", barrierId)); + preparedStatement.setString(6, op); + + if (preparedStatement.executeUpdate() == 0) { + return false; + } + if (ParamFieldConstants.CANCEL.equals(op)) { + int opIndex = 4; + preparedStatement.setString(opIndex, ParamFieldConstants.TRY); + if (preparedStatement.executeUpdate() > 0) { + return false; + } + } + } finally { + if (Objects.nonNull(preparedStatement)) { + preparedStatement.close(); + } + } + return true; + } + + @Override + public void commit() throws Exception { + if (Objects.isNull(connection)) { + return; + } + Connection conn = (Connection)this.connection; + conn.commit(); + conn.setAutoCommit(true); + } + + @Override + public void rollback() throws Exception { + if (Objects.isNull(connection)) { + return; + } + Connection conn = (Connection)this.connection; + conn.rollback(); + conn.setAutoCommit(true); + } +} \ No newline at end of file diff --git a/dtmcli-core/src/test/java/pub/dtm/client/barrier/BranchBarrierTest.java b/dtmcli-core/src/test/java/pub/dtm/client/barrier/BranchBarrierTest.java index 0d32ddc..ba4bfe7 100644 --- a/dtmcli-core/src/test/java/pub/dtm/client/barrier/BranchBarrierTest.java +++ b/dtmcli-core/src/test/java/pub/dtm/client/barrier/BranchBarrierTest.java @@ -2,7 +2,9 @@ import org.junit.jupiter.api.Test; import pub.dtm.client.barrier.itfc.impl.BarrierMysqlOperator; +import pub.dtm.client.barrier.itfc.impl.BarrierSqlServerOperator; import pub.dtm.client.busi.BusiUtil; +import pub.dtm.client.busi.DbType; import pub.dtm.client.busi.TransReq; import pub.dtm.client.enums.TransTypeEnum; @@ -37,4 +39,32 @@ void mysql_call2() throws Exception { BusiUtil.adjustTrading(connection, new TransReq(1, 30)); }); } + + @Test + void sqlserver_call1() throws Exception { + BranchBarrier branchBarrier = new BranchBarrier(); + branchBarrier.setGid("gid-unit-test1-" + System.currentTimeMillis()); + branchBarrier.setTransTypeEnum(TransTypeEnum.TCC); + branchBarrier.setBranchId("branch-1"); + branchBarrier.setOp("try"); + + Connection connection = BusiUtil.getSqlServerConnection(); + branchBarrier.call(new BarrierSqlServerOperator(connection), (barrier) -> { + BusiUtil.adjustBalance(connection, new TransReq(1, 30), DbType.SQLSERVER); + }); + } + + @Test + void sqlserver_call2() throws Exception { + BranchBarrier branchBarrier = new BranchBarrier(); + branchBarrier.setGid("gid-unit-test2-" + System.currentTimeMillis()); + branchBarrier.setTransTypeEnum(TransTypeEnum.TCC); + branchBarrier.setBranchId("branch-1"); + branchBarrier.setOp("try"); + + Connection connection = BusiUtil.getSqlServerConnection(); + branchBarrier.call(new BarrierSqlServerOperator(connection), (barrier) -> { + BusiUtil.adjustTrading(connection, new TransReq(1, 30), DbType.SQLSERVER); + }); + } } \ No newline at end of file diff --git a/dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java b/dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java index 38c3bc9..1033333 100644 --- a/dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java +++ b/dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java @@ -23,8 +23,23 @@ public static Connection getConnection() throws Exception { * @throws SQLException */ public static void adjustTrading(Connection connection, TransReq transReq) throws Exception { - String sql = "update dtm_busi.user_account set trading_balance=trading_balance+?" + adjustTrading(connection, transReq, DbType.MYSQL); + } + + /** + * 更新交易金额 + * + * @param connection + * @param transReq + * @param dbType + * @throws SQLException + */ + public static void adjustTrading(Connection connection, TransReq transReq, DbType dbType) throws Exception { + + String sql = "update %s set trading_balance=trading_balance+?" + " where user_id=? and trading_balance + ? + balance >= 0"; + sql = String.format(sql, getTableName(dbType)); + PreparedStatement preparedStatement = null; try { preparedStatement = connection.prepareStatement(sql); @@ -48,9 +63,23 @@ public static void adjustTrading(Connection connection, TransReq transReq) throw * 更新余额 */ public static void adjustBalance(Connection connection, TransReq transReq) throws SQLException { + adjustBalance(connection, transReq, DbType.MYSQL); + } + + /** + * 更新余额 + * + * @param connection + * @param transReq + * @param dbType + * @throws SQLException + */ + public static void adjustBalance(Connection connection, TransReq transReq, DbType dbType) throws SQLException { PreparedStatement preparedStatement = null; try { - String sql = "update dtm_busi.user_account set trading_balance=trading_balance-?,balance=balance+? where user_id=?"; + String sql = "update %s set trading_balance=trading_balance-?,balance=balance+? where user_id=?"; + sql = String.format(sql, getTableName(dbType)); + preparedStatement = connection.prepareStatement(sql); preparedStatement.setInt(1, transReq.amount); preparedStatement.setInt(2, transReq.amount); @@ -64,4 +93,39 @@ public static void adjustBalance(Connection connection, TransReq transReq) throw } } } + + /** + * 获取MsSql连接 + * + * @return + * @throws Exception + */ + public static Connection getSqlServerConnection() throws Exception { + String url = "jdbc:sqlserver://localhost:1433;DatabaseName=dtm_busi;encrypt=true;trustServerCertificate=true;"; + String user = "sa"; + String password = ""; + return java.sql.DriverManager.getConnection(url, user, password); + } + + /** + * 根据数据库类型获取表名 + * + * @param dbType + * @return + */ + private static String getTableName(DbType dbType) { + String tableName; + switch (dbType){ + case MYSQL: + tableName="dtm_busi.user_account"; + break; + case SQLSERVER: + tableName="dtm_busi.dbo.user_account"; + break; + default: + tableName="dtm_busi.user_account"; + break; + } + return tableName; + } } diff --git a/dtmcli-core/src/test/java/pub/dtm/client/busi/DbType.java b/dtmcli-core/src/test/java/pub/dtm/client/busi/DbType.java new file mode 100644 index 0000000..c08dda4 --- /dev/null +++ b/dtmcli-core/src/test/java/pub/dtm/client/busi/DbType.java @@ -0,0 +1,6 @@ +package pub.dtm.client.busi; + +public enum DbType { + MYSQL, + SQLSERVER; +}