diff --git a/dtmcli-core/pom.xml b/dtmcli-core/pom.xml
index 4734091..bbf6d6c 100644
--- a/dtmcli-core/pom.xml
+++ b/dtmcli-core/pom.xml
@@ -50,6 +50,13 @@
8.0.33
test
+
+
+ com.microsoft.sqlserver
+ mssql-jdbc
+ 13.2.1.jre8
+ test
+
diff --git a/dtmcli-core/sql/busi.serversql.sql b/dtmcli-core/sql/busi.serversql.sql
new file mode 100644
index 0000000..74eefeb
--- /dev/null
+++ b/dtmcli-core/sql/busi.serversql.sql
@@ -0,0 +1,36 @@
+IF DB_ID('dtm_busi') IS NULL
+BEGIN
+ CREATE DATABASE dtm_busi;
+END
+GO
+
+USE dtm_busi;
+GO
+
+IF OBJECT_ID('dbo.user_account', 'U') IS NOT NULL
+ DROP TABLE dbo.user_account;
+GO
+
+CREATE TABLE dbo.user_account (
+ id INT IDENTITY(1,1) PRIMARY KEY,
+ user_id INT NOT NULL UNIQUE,
+ balance DECIMAL(10, 2) NOT NULL DEFAULT 0,
+ trading_balance DECIMAL(10, 2) NOT NULL DEFAULT 0,
+ create_time DATETIME2 DEFAULT GETDATE(),
+ update_time DATETIME2 DEFAULT GETDATE()
+);
+GO
+
+CREATE NONCLUSTERED INDEX IX_user_account_create_time ON dbo.user_account (create_time);
+CREATE NONCLUSTERED INDEX IX_user_account_update_time ON dbo.user_account (update_time);
+GO
+
+MERGE dbo.user_account AS target
+USING (VALUES (1, 10000), (2, 10000)) AS source (user_id, balance)
+ON target.user_id = source.user_id
+WHEN MATCHED THEN
+ UPDATE SET balance = source.balance
+WHEN NOT MATCHED THEN
+ INSERT (user_id, balance)
+ VALUES (source.user_id, source.balance);
+GO
\ No newline at end of file
diff --git a/dtmcli-core/sql/dtmcli.barrier.sqlserver.sql b/dtmcli-core/sql/dtmcli.barrier.sqlserver.sql
new file mode 100644
index 0000000..022fbca
--- /dev/null
+++ b/dtmcli-core/sql/dtmcli.barrier.sqlserver.sql
@@ -0,0 +1,37 @@
+IF NOT EXISTS(SELECT * FROM sys.databases WHERE name = 'dtm_busi')
+BEGIN
+ CREATE DATABASE dtm_barrier
+END
+
+GO
+
+USE dtm_busi
+GO
+
+IF EXISTS (SELECT * FROM sysobjects WHERE id = object_id(N'[dbo].[barrier]') and OBJECTPROPERTY(id, N'IsUserTable') = 1)
+BEGIN
+ DROP TABLE [dbo].[barrier]
+END
+
+GO
+
+CREATE TABLE [dbo].[barrier]
+(
+ [id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY,
+ [trans_type] varchar(45) NOT NULL DEFAULT(''),
+ [gid] varchar(128) NOT NULL DEFAULT(''),
+ [branch_id] varchar(128) NOT NULL DEFAULT(''),
+ [op] varchar(45) NOT NULL DEFAULT(''),
+ [barrier_id] varchar(45) NOT NULL DEFAULT(''),
+ [reason] varchar(45) NOT NULL DEFAULT(''),
+ [create_time] datetime NOT NULL DEFAULT(getdate()) ,
+ [update_time] datetime NOT NULL DEFAULT(getdate())
+)
+
+GO
+
+CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier]
+ ([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC)
+WITH(IGNORE_DUP_KEY = ON)
+
+GO
\ No newline at end of file
diff --git a/dtmcli-core/src/main/java/pub/dtm/client/barrier/itfc/impl/BarrierSqlServerOperator.java b/dtmcli-core/src/main/java/pub/dtm/client/barrier/itfc/impl/BarrierSqlServerOperator.java
new file mode 100644
index 0000000..dcbfeb2
--- /dev/null
+++ b/dtmcli-core/src/main/java/pub/dtm/client/barrier/itfc/impl/BarrierSqlServerOperator.java
@@ -0,0 +1,78 @@
+package pub.dtm.client.barrier.itfc.impl;
+
+import pub.dtm.client.barrier.itfc.BarrierDBOperator;
+import pub.dtm.client.constant.ParamFieldConstants;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Objects;
+
+/**
+ * BarrierDBOperator for SqlServer
+ *
+ * @author yxou
+ * @date 2026-01-07
+ */
+public class BarrierSqlServerOperator implements BarrierDBOperator {
+ private Object connection;
+
+ public BarrierSqlServerOperator(Object connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public boolean insertBarrier(String transType, String gid, String branchId, String op, int barrierId) throws Exception {
+ if (Objects.isNull(connection)) {
+ return false;
+ }
+ Connection conn = (Connection)this.connection;
+ conn.setAutoCommit(false);
+ PreparedStatement preparedStatement = null;
+ try {
+ String sql = "insert into dbo.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)";
+ preparedStatement = conn.prepareStatement(sql);
+ preparedStatement.setString(1, transType);
+ preparedStatement.setString(2, gid);
+ preparedStatement.setString(3, branchId);
+ preparedStatement.setString(4, op);
+ preparedStatement.setString(5, String.format("%02d", barrierId));
+ preparedStatement.setString(6, op);
+
+ if (preparedStatement.executeUpdate() == 0) {
+ return false;
+ }
+ if (ParamFieldConstants.CANCEL.equals(op)) {
+ int opIndex = 4;
+ preparedStatement.setString(opIndex, ParamFieldConstants.TRY);
+ if (preparedStatement.executeUpdate() > 0) {
+ return false;
+ }
+ }
+ } finally {
+ if (Objects.nonNull(preparedStatement)) {
+ preparedStatement.close();
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void commit() throws Exception {
+ if (Objects.isNull(connection)) {
+ return;
+ }
+ Connection conn = (Connection)this.connection;
+ conn.commit();
+ conn.setAutoCommit(true);
+ }
+
+ @Override
+ public void rollback() throws Exception {
+ if (Objects.isNull(connection)) {
+ return;
+ }
+ Connection conn = (Connection)this.connection;
+ conn.rollback();
+ conn.setAutoCommit(true);
+ }
+}
\ No newline at end of file
diff --git a/dtmcli-core/src/test/java/pub/dtm/client/barrier/BranchBarrierTest.java b/dtmcli-core/src/test/java/pub/dtm/client/barrier/BranchBarrierTest.java
index 0d32ddc..ba4bfe7 100644
--- a/dtmcli-core/src/test/java/pub/dtm/client/barrier/BranchBarrierTest.java
+++ b/dtmcli-core/src/test/java/pub/dtm/client/barrier/BranchBarrierTest.java
@@ -2,7 +2,9 @@
import org.junit.jupiter.api.Test;
import pub.dtm.client.barrier.itfc.impl.BarrierMysqlOperator;
+import pub.dtm.client.barrier.itfc.impl.BarrierSqlServerOperator;
import pub.dtm.client.busi.BusiUtil;
+import pub.dtm.client.busi.DbType;
import pub.dtm.client.busi.TransReq;
import pub.dtm.client.enums.TransTypeEnum;
@@ -37,4 +39,32 @@ void mysql_call2() throws Exception {
BusiUtil.adjustTrading(connection, new TransReq(1, 30));
});
}
+
+ @Test
+ void sqlserver_call1() throws Exception {
+ BranchBarrier branchBarrier = new BranchBarrier();
+ branchBarrier.setGid("gid-unit-test1-" + System.currentTimeMillis());
+ branchBarrier.setTransTypeEnum(TransTypeEnum.TCC);
+ branchBarrier.setBranchId("branch-1");
+ branchBarrier.setOp("try");
+
+ Connection connection = BusiUtil.getSqlServerConnection();
+ branchBarrier.call(new BarrierSqlServerOperator(connection), (barrier) -> {
+ BusiUtil.adjustBalance(connection, new TransReq(1, 30), DbType.SQLSERVER);
+ });
+ }
+
+ @Test
+ void sqlserver_call2() throws Exception {
+ BranchBarrier branchBarrier = new BranchBarrier();
+ branchBarrier.setGid("gid-unit-test2-" + System.currentTimeMillis());
+ branchBarrier.setTransTypeEnum(TransTypeEnum.TCC);
+ branchBarrier.setBranchId("branch-1");
+ branchBarrier.setOp("try");
+
+ Connection connection = BusiUtil.getSqlServerConnection();
+ branchBarrier.call(new BarrierSqlServerOperator(connection), (barrier) -> {
+ BusiUtil.adjustTrading(connection, new TransReq(1, 30), DbType.SQLSERVER);
+ });
+ }
}
\ No newline at end of file
diff --git a/dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java b/dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java
index 38c3bc9..1033333 100644
--- a/dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java
+++ b/dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java
@@ -23,8 +23,23 @@ public static Connection getConnection() throws Exception {
* @throws SQLException
*/
public static void adjustTrading(Connection connection, TransReq transReq) throws Exception {
- String sql = "update dtm_busi.user_account set trading_balance=trading_balance+?"
+ adjustTrading(connection, transReq, DbType.MYSQL);
+ }
+
+ /**
+ * 更新交易金额
+ *
+ * @param connection
+ * @param transReq
+ * @param dbType
+ * @throws SQLException
+ */
+ public static void adjustTrading(Connection connection, TransReq transReq, DbType dbType) throws Exception {
+
+ String sql = "update %s set trading_balance=trading_balance+?"
+ " where user_id=? and trading_balance + ? + balance >= 0";
+ sql = String.format(sql, getTableName(dbType));
+
PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
@@ -48,9 +63,23 @@ public static void adjustTrading(Connection connection, TransReq transReq) throw
* 更新余额
*/
public static void adjustBalance(Connection connection, TransReq transReq) throws SQLException {
+ adjustBalance(connection, transReq, DbType.MYSQL);
+ }
+
+ /**
+ * 更新余额
+ *
+ * @param connection
+ * @param transReq
+ * @param dbType
+ * @throws SQLException
+ */
+ public static void adjustBalance(Connection connection, TransReq transReq, DbType dbType) throws SQLException {
PreparedStatement preparedStatement = null;
try {
- String sql = "update dtm_busi.user_account set trading_balance=trading_balance-?,balance=balance+? where user_id=?";
+ String sql = "update %s set trading_balance=trading_balance-?,balance=balance+? where user_id=?";
+ sql = String.format(sql, getTableName(dbType));
+
preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, transReq.amount);
preparedStatement.setInt(2, transReq.amount);
@@ -64,4 +93,39 @@ public static void adjustBalance(Connection connection, TransReq transReq) throw
}
}
}
+
+ /**
+ * 获取MsSql连接
+ *
+ * @return
+ * @throws Exception
+ */
+ public static Connection getSqlServerConnection() throws Exception {
+ String url = "jdbc:sqlserver://localhost:1433;DatabaseName=dtm_busi;encrypt=true;trustServerCertificate=true;";
+ String user = "sa";
+ String password = "";
+ return java.sql.DriverManager.getConnection(url, user, password);
+ }
+
+ /**
+ * 根据数据库类型获取表名
+ *
+ * @param dbType
+ * @return
+ */
+ private static String getTableName(DbType dbType) {
+ String tableName;
+ switch (dbType){
+ case MYSQL:
+ tableName="dtm_busi.user_account";
+ break;
+ case SQLSERVER:
+ tableName="dtm_busi.dbo.user_account";
+ break;
+ default:
+ tableName="dtm_busi.user_account";
+ break;
+ }
+ return tableName;
+ }
}
diff --git a/dtmcli-core/src/test/java/pub/dtm/client/busi/DbType.java b/dtmcli-core/src/test/java/pub/dtm/client/busi/DbType.java
new file mode 100644
index 0000000..c08dda4
--- /dev/null
+++ b/dtmcli-core/src/test/java/pub/dtm/client/busi/DbType.java
@@ -0,0 +1,6 @@
+package pub.dtm.client.busi;
+
+public enum DbType {
+ MYSQL,
+ SQLSERVER;
+}