Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dtmcli-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
<version>8.0.33</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>13.2.1.jre8</version>
<scope>test</scope>
</dependency>
</dependencies>


Expand Down
36 changes: 36 additions & 0 deletions dtmcli-core/sql/busi.serversql.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
IF DB_ID('dtm_busi') IS NULL
BEGIN
CREATE DATABASE dtm_busi;
END
GO

USE dtm_busi;
GO

IF OBJECT_ID('dbo.user_account', 'U') IS NOT NULL
DROP TABLE dbo.user_account;
GO

CREATE TABLE dbo.user_account (
id INT IDENTITY(1,1) PRIMARY KEY,
user_id INT NOT NULL UNIQUE,
balance DECIMAL(10, 2) NOT NULL DEFAULT 0,
trading_balance DECIMAL(10, 2) NOT NULL DEFAULT 0,
create_time DATETIME2 DEFAULT GETDATE(),
update_time DATETIME2 DEFAULT GETDATE()
);
GO

CREATE NONCLUSTERED INDEX IX_user_account_create_time ON dbo.user_account (create_time);
CREATE NONCLUSTERED INDEX IX_user_account_update_time ON dbo.user_account (update_time);
GO

MERGE dbo.user_account AS target
USING (VALUES (1, 10000), (2, 10000)) AS source (user_id, balance)
ON target.user_id = source.user_id
WHEN MATCHED THEN
UPDATE SET balance = source.balance
WHEN NOT MATCHED THEN
INSERT (user_id, balance)
VALUES (source.user_id, source.balance);
GO
37 changes: 37 additions & 0 deletions dtmcli-core/sql/dtmcli.barrier.sqlserver.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
IF NOT EXISTS(SELECT * FROM sys.databases WHERE name = 'dtm_busi')
BEGIN
CREATE DATABASE dtm_barrier
END

GO

USE dtm_busi
GO

IF EXISTS (SELECT * FROM sysobjects WHERE id = object_id(N'[dbo].[barrier]') and OBJECTPROPERTY(id, N'IsUserTable') = 1)
BEGIN
DROP TABLE [dbo].[barrier]
END

GO

CREATE TABLE [dbo].[barrier]
(
[id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY,
[trans_type] varchar(45) NOT NULL DEFAULT(''),
[gid] varchar(128) NOT NULL DEFAULT(''),
[branch_id] varchar(128) NOT NULL DEFAULT(''),
[op] varchar(45) NOT NULL DEFAULT(''),
[barrier_id] varchar(45) NOT NULL DEFAULT(''),
[reason] varchar(45) NOT NULL DEFAULT(''),
[create_time] datetime NOT NULL DEFAULT(getdate()) ,
[update_time] datetime NOT NULL DEFAULT(getdate())
)

GO

CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier]
([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC)
WITH(IGNORE_DUP_KEY = ON)

GO
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package pub.dtm.client.barrier.itfc.impl;

import pub.dtm.client.barrier.itfc.BarrierDBOperator;
import pub.dtm.client.constant.ParamFieldConstants;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Objects;

/**
* BarrierDBOperator for SqlServer
*
* @author yxou
* @date 2026-01-07
*/
public class BarrierSqlServerOperator implements BarrierDBOperator {
private Object connection;

public BarrierSqlServerOperator(Object connection) {
this.connection = connection;
}

@Override
public boolean insertBarrier(String transType, String gid, String branchId, String op, int barrierId) throws Exception {
if (Objects.isNull(connection)) {
return false;
}
Connection conn = (Connection)this.connection;
conn.setAutoCommit(false);
PreparedStatement preparedStatement = null;
try {
String sql = "insert into dbo.barrier(trans_type, gid, branch_id, op, barrier_id, reason) values(?,?,?,?,?,?)";
preparedStatement = conn.prepareStatement(sql);
preparedStatement.setString(1, transType);
preparedStatement.setString(2, gid);
preparedStatement.setString(3, branchId);
preparedStatement.setString(4, op);
preparedStatement.setString(5, String.format("%02d", barrierId));
preparedStatement.setString(6, op);

if (preparedStatement.executeUpdate() == 0) {
return false;
}
if (ParamFieldConstants.CANCEL.equals(op)) {
int opIndex = 4;
preparedStatement.setString(opIndex, ParamFieldConstants.TRY);
if (preparedStatement.executeUpdate() > 0) {
return false;
}
}
} finally {
if (Objects.nonNull(preparedStatement)) {
preparedStatement.close();
}
}
return true;
}

@Override
public void commit() throws Exception {
if (Objects.isNull(connection)) {
return;
}
Connection conn = (Connection)this.connection;
conn.commit();
conn.setAutoCommit(true);
}

@Override
public void rollback() throws Exception {
if (Objects.isNull(connection)) {
return;
}
Connection conn = (Connection)this.connection;
conn.rollback();
conn.setAutoCommit(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import org.junit.jupiter.api.Test;
import pub.dtm.client.barrier.itfc.impl.BarrierMysqlOperator;
import pub.dtm.client.barrier.itfc.impl.BarrierSqlServerOperator;
import pub.dtm.client.busi.BusiUtil;
import pub.dtm.client.busi.DbType;
import pub.dtm.client.busi.TransReq;
import pub.dtm.client.enums.TransTypeEnum;

Expand Down Expand Up @@ -37,4 +39,32 @@ void mysql_call2() throws Exception {
BusiUtil.adjustTrading(connection, new TransReq(1, 30));
});
}

@Test
void sqlserver_call1() throws Exception {
BranchBarrier branchBarrier = new BranchBarrier();
branchBarrier.setGid("gid-unit-test1-" + System.currentTimeMillis());
branchBarrier.setTransTypeEnum(TransTypeEnum.TCC);
branchBarrier.setBranchId("branch-1");
branchBarrier.setOp("try");

Connection connection = BusiUtil.getSqlServerConnection();
branchBarrier.call(new BarrierSqlServerOperator(connection), (barrier) -> {
BusiUtil.adjustBalance(connection, new TransReq(1, 30), DbType.SQLSERVER);
});
}

@Test
void sqlserver_call2() throws Exception {
BranchBarrier branchBarrier = new BranchBarrier();
branchBarrier.setGid("gid-unit-test2-" + System.currentTimeMillis());
branchBarrier.setTransTypeEnum(TransTypeEnum.TCC);
branchBarrier.setBranchId("branch-1");
branchBarrier.setOp("try");

Connection connection = BusiUtil.getSqlServerConnection();
branchBarrier.call(new BarrierSqlServerOperator(connection), (barrier) -> {
BusiUtil.adjustTrading(connection, new TransReq(1, 30), DbType.SQLSERVER);
});
}
}
68 changes: 66 additions & 2 deletions dtmcli-core/src/test/java/pub/dtm/client/busi/BusiUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,23 @@ public static Connection getConnection() throws Exception {
* @throws SQLException
*/
public static void adjustTrading(Connection connection, TransReq transReq) throws Exception {
String sql = "update dtm_busi.user_account set trading_balance=trading_balance+?"
adjustTrading(connection, transReq, DbType.MYSQL);
}

/**
* 更新交易金额
*
* @param connection
* @param transReq
* @param dbType
* @throws SQLException
*/
public static void adjustTrading(Connection connection, TransReq transReq, DbType dbType) throws Exception {

String sql = "update %s set trading_balance=trading_balance+?"
+ " where user_id=? and trading_balance + ? + balance >= 0";
sql = String.format(sql, getTableName(dbType));

PreparedStatement preparedStatement = null;
try {
preparedStatement = connection.prepareStatement(sql);
Expand All @@ -48,9 +63,23 @@ public static void adjustTrading(Connection connection, TransReq transReq) throw
* 更新余额
*/
public static void adjustBalance(Connection connection, TransReq transReq) throws SQLException {
adjustBalance(connection, transReq, DbType.MYSQL);
}

/**
* 更新余额
*
* @param connection
* @param transReq
* @param dbType
* @throws SQLException
*/
public static void adjustBalance(Connection connection, TransReq transReq, DbType dbType) throws SQLException {
PreparedStatement preparedStatement = null;
try {
String sql = "update dtm_busi.user_account set trading_balance=trading_balance-?,balance=balance+? where user_id=?";
String sql = "update %s set trading_balance=trading_balance-?,balance=balance+? where user_id=?";
sql = String.format(sql, getTableName(dbType));

preparedStatement = connection.prepareStatement(sql);
preparedStatement.setInt(1, transReq.amount);
preparedStatement.setInt(2, transReq.amount);
Expand All @@ -64,4 +93,39 @@ public static void adjustBalance(Connection connection, TransReq transReq) throw
}
}
}

/**
* 获取MsSql连接
*
* @return
* @throws Exception
*/
public static Connection getSqlServerConnection() throws Exception {
String url = "jdbc:sqlserver://localhost:1433;DatabaseName=dtm_busi;encrypt=true;trustServerCertificate=true;";
String user = "sa";
String password = "";
return java.sql.DriverManager.getConnection(url, user, password);
}

/**
* 根据数据库类型获取表名
*
* @param dbType
* @return
*/
private static String getTableName(DbType dbType) {
String tableName;
switch (dbType){
case MYSQL:
tableName="dtm_busi.user_account";
break;
case SQLSERVER:
tableName="dtm_busi.dbo.user_account";
break;
default:
tableName="dtm_busi.user_account";
break;
}
return tableName;
}
}
6 changes: 6 additions & 0 deletions dtmcli-core/src/test/java/pub/dtm/client/busi/DbType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package pub.dtm.client.busi;

public enum DbType {
MYSQL,
SQLSERVER;
}