diff --git a/tests/BusiGrpcService/Services/BusiApiService.cs b/tests/BusiGrpcService/Services/BusiApiService.cs index df9e1a2..34d2f1e 100644 --- a/tests/BusiGrpcService/Services/BusiApiService.cs +++ b/tests/BusiGrpcService/Services/BusiApiService.cs @@ -5,6 +5,7 @@ using System.Text.Json; using Dapper; using System.Data.Common; +using DtmCommon; using DtmSERedisBarrier; namespace BusiGrpcService.Services @@ -140,6 +141,18 @@ public override async Task QueryPrepared(BusiReq request, ServerCallC throw Dtmgrpc.DtmGImp.Utils.DtmError2GrpcError(ex); } + // real mysql query prepared demo, just copy it! + public override async Task QueryPreparedMySqlReal(BusiReq request, ServerCallContext context) + { + BranchBarrier barrier = _barrierFactory.CreateBranchBarrier(context); + string result = await barrier.QueryPrepared(this.GetBarrierConn()); + + Exception ex = Dtmgrpc.DtmGImp.Utils.String2DtmError(result); + if (ex != null) + throw Dtmgrpc.DtmGImp.Utils.DtmError2GrpcError(ex); + return new Empty(); + } + public override async Task TransInRedis(BusiReq request, ServerCallContext context) { _logger.LogInformation("TransInRedis req={req}", JsonSerializer.Serialize(request)); diff --git a/tests/Dtmgrpc.IntegrationTests/Dtmgrpc.IntegrationTests.csproj b/tests/Dtmgrpc.IntegrationTests/Dtmgrpc.IntegrationTests.csproj index 42a896c..7774af3 100644 --- a/tests/Dtmgrpc.IntegrationTests/Dtmgrpc.IntegrationTests.csproj +++ b/tests/Dtmgrpc.IntegrationTests/Dtmgrpc.IntegrationTests.csproj @@ -19,6 +19,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all + diff --git a/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs b/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs index fa6a10d..d19ae00 100644 --- a/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs +++ b/tests/Dtmgrpc.IntegrationTests/MsgGrpcTest.cs @@ -1,6 +1,11 @@ using Microsoft.Extensions.DependencyInjection; using System; +using System.Data.Common; using System.Threading.Tasks; +using System.Transactions; +using Dapper; +using Grpc.Core; +using MySqlConnector; using Xunit; namespace Dtmgrpc.IntegrationTests @@ -27,5 +32,100 @@ public async Task Submit_Should_Succeed() var status = await ITTestHelper.GetTranStatus(gid); Assert.Equal("succeed", status); } + + [Fact] + public async Task DoAndSubmit_Should_DbTrans_Exception() + { + var provider = ITTestHelper.AddDtmGrpc(); + var transFactory = provider.GetRequiredService(); + + var gid = "msgTestGid" + Guid.NewGuid().ToString(); + var msg = transFactory.NewMsgGrpc(gid); + var req = ITTestHelper.GenBusiReq(false, false); + var busiGrpc = ITTestHelper.BuisgRPCUrl; + + msg.Add(busiGrpc + "/busi.Busi/TransIn", req); + // do TransOut local, then TransIn with DTM. + await Assert.ThrowsAsync(async () => + { + // System.InvalidOperationException: A TransactionScope must be disposed on the same thread that it was created. + // + // System.InvalidOperationException + // A TransactionScope must be disposed on the same thread that it was created. + // at Dtmgrpc.MsgGrpc.DoAndSubmit(String queryPrepared, Func`2 busiCall, CancellationToken cancellationToken) in /home/yunjin/Data/projects/github/dtm-labs/client-csharp/src/Dtmgrpc/Msg/MsgGrpc.cs:line 110 + + await msg.DoAndSubmit(busiGrpc + "/busi.Busi/QueryPreparedMySqlReal", async branchBarrier => + { + MySqlConnection conn = getBarrierMySqlConnection(); + await branchBarrier.Call(conn, () => + { + Task task = this.LocalAdjustBalance(conn, TransOutUID, -req.Amount, "SUCCESS"); + return task; + }, + TransactionScopeOption.Required, + IsolationLevel.ReadCommitted + // , default TransactionScopeAsyncFlowOption.Suppress + ); + }); + }); + + await Task.Delay(4000); + var status = await ITTestHelper.GetTranStatus(gid); + // The exception did not affect the local transaction commit + Assert.Equal("succeed", status); + } + + [Fact] + public async Task DoAndSubmit_Should_Succeed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var transFactory = provider.GetRequiredService(); + + var gid = "msgTestGid" + Guid.NewGuid().ToString(); + var msg = transFactory.NewMsgGrpc(gid); + var req = ITTestHelper.GenBusiReq(false, false); + var busiGrpc = ITTestHelper.BuisgRPCUrl; + + msg.Add(busiGrpc + "/busi.Busi/TransIn", req); + // do TransOut local, then TransIn with DTM. + + await msg.DoAndSubmit(busiGrpc + "/busi.Busi/QueryPreparedMySqlReal", async branchBarrier => + { + MySqlConnection conn = getBarrierMySqlConnection(); + await branchBarrier.Call(conn, () => + { + Task task = this.LocalAdjustBalance(conn, TransOutUID, -req.Amount, "SUCCESS"); + return task; + }, + TransactionScopeOption.Required, + IsolationLevel.ReadCommitted, + TransactionScopeAsyncFlowOption.Enabled); + }); + + await Task.Delay(2000); + var status = await ITTestHelper.GetTranStatus(gid); + Assert.Equal("succeed", status); + } + + private static readonly int TransOutUID = 1; + + private static readonly int TransInUID = 2; + + private MySqlConnection getBarrierMySqlConnection() => new("Server=localhost;port=3306;User ID=root;Password=123456;Database=dtm_barrier"); + + private async Task LocalAdjustBalance(DbConnection conn, int uid, long amount, string result) + { + // _logger.LogInformation("AdjustBalanceLocal uid={uid}, amount={amount}, result={result}", uid, amount, result); + + if (result.Equals("FAILURE")) + { + throw new RpcException(new Status(StatusCode.Aborted, "FAILURE")); + } + + await conn.ExecuteAsync( + sql: "update dtm_busi.user_account set balance = balance + @balance where user_id = @user_id", + param: new { balance = amount, user_id = uid } + ); + } } } diff --git a/tests/protos/busi.proto b/tests/protos/busi.proto index 86932ea..932ef1e 100644 --- a/tests/protos/busi.proto +++ b/tests/protos/busi.proto @@ -45,6 +45,6 @@ service Busi { rpc TransOutRevertRedis(BusiReq) returns (google.protobuf.Empty) {} rpc QueryPrepared(BusiReq) returns (BusiReply) {} - rpc QueryPreparedB(BusiReq) returns (google.protobuf.Empty) {} + rpc QueryPreparedMySqlReal(BusiReq) returns (google.protobuf.Empty) {} rpc QueryPreparedRedis(BusiReq) returns (google.protobuf.Empty) {} } \ No newline at end of file