• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.45
/source/dnode/mnode/impl/src/mndTrans.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndTrans.h"
18
#include "mndDb.h"
19
#include "mndPrivilege.h"
20
#include "mndShow.h"
21
#include "mndStb.h"
22
#include "mndSubscribe.h"
23
#include "mndSync.h"
24
#include "mndUser.h"
25
#include "mndVgroup.h"
26
#include "osTime.h"
27

28
#define TRANS_VER1_NUMBER  1
29
#define TRANS_VER2_NUMBER  2
30
#define TRANS_ARRAY_SIZE   8
31
#define TRANS_RESERVE_SIZE 42
32

33
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
34
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
35
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc);
36

37
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
38
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
39
static void    mndTransDropLogs(SArray *pArray);
40
static void    mndTransDropActions(SArray *pArray);
41

42
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf);
43
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf);
44
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf);
45
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
46
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
47
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
48
static bool    mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
49
static bool    mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
50
static bool    mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
51
static bool    mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
52
static bool    mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
53
static bool    mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
54
static bool    mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
55
static bool    mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
56

57
static inline bool mndTransIsInSyncContext(bool topHalf) { return !topHalf; }
512,751✔
58

59
static bool mndCannotExecuteTrans(SMnode *pMnode, bool topHalf) {
428,192✔
60
  bool isLeader = mndIsLeader(pMnode);
428,192✔
61
  bool ret = (!pMnode->deploy && !isLeader) || mndTransIsInSyncContext(topHalf);
428,192✔
62
  if (ret) mDebug("cannot execute trans action, deploy:%d, isLeader:%d, topHalf:%d", pMnode->deploy, isLeader, topHalf);
428,192✔
63
  return ret;
428,192✔
64
}
65

66
static inline char *mndStrExecutionContext(bool topHalf) { return topHalf ? "transContext" : "syncContext"; }
529,165✔
67

68
static void    mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
69
static int32_t mndProcessTransTimer(SRpcMsg *pReq);
70
static int32_t mndProcessTtl(SRpcMsg *pReq);
71
static int32_t mndProcessKillTransReq(SRpcMsg *pReq);
72

73
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
74
static void    mndCancelGetNextTrans(SMnode *pMnode, void *pIter);
75
static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
76
static int32_t tsMaxTransId = 0;
77

78
int32_t mndInitTrans(SMnode *pMnode) {
1,748✔
79
  SSdbTable table = {
1,748✔
80
      .sdbType = SDB_TRANS,
81
      .keyType = SDB_KEY_INT32,
82
      .encodeFp = (SdbEncodeFp)mndTransEncode,
83
      .decodeFp = (SdbDecodeFp)mndTransDecode,
84
      .insertFp = (SdbInsertFp)mndTransActionInsert,
85
      .updateFp = (SdbUpdateFp)mndTransActionUpdate,
86
      .deleteFp = (SdbDeleteFp)mndTransDelete,
87
  };
88

89
  mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransTimer);
1,748✔
90
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
1,748✔
91

92
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
1,748✔
93
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans);
1,748✔
94
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANSACTION_DETAIL, mndRetrieveTransDetail);
1,748✔
95
  return sdbSetTable(pMnode->pSdb, table);
1,748✔
96
}
97

98
void mndCleanupTrans(SMnode *pMnode) {}
1,747✔
99

100
static int32_t mndTransGetActionsSize(SArray *pArray) {
614,156✔
101
  int32_t actionNum = taosArrayGetSize(pArray);
614,156✔
102
  int32_t rawDataLen = 0;
614,156✔
103

104
  for (int32_t i = 0; i < actionNum; ++i) {
1,838,126✔
105
    STransAction *pAction = taosArrayGet(pArray, i);
1,223,970✔
106
    if (pAction->actionType == TRANS_ACTION_RAW) {
1,223,970✔
107
      rawDataLen += (sizeof(STransAction) + sdbGetRawTotalSize(pAction->pRaw));
745,874✔
108
    } else if (pAction->actionType == TRANS_ACTION_MSG) {
478,096!
109
      rawDataLen += (sizeof(STransAction) + pAction->contLen);
478,096✔
110
    } else {
111
      // empty
112
    }
113
    rawDataLen += sizeof(int8_t);
1,223,970✔
114
  }
115

116
  return rawDataLen;
614,156✔
117
}
118

119
static int32_t mndTransEncodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionsNum) {
614,156✔
120
  int32_t code = 0;
614,156✔
121
  int32_t lino = 0;
614,156✔
122
  int32_t dataPos = *offset;
614,156✔
123
  int8_t  unused = 0;
614,156✔
124
  int32_t ret = -1;
614,156✔
125

126
  for (int32_t i = 0; i < actionsNum; ++i) {
1,838,126✔
127
    STransAction *pAction = taosArrayGet(pActions, i);
1,223,970✔
128
    SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
1,223,970!
129
    SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
1,223,970!
130
    SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
1,223,970!
131
    SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
1,223,970!
132
    SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
1,223,970!
133
    SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
1,223,970!
134
    SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
1,223,970!
135
    if (pAction->actionType == TRANS_ACTION_RAW) {
1,223,970✔
136
      int32_t len = sdbGetRawTotalSize(pAction->pRaw);
745,874✔
137
      SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
745,874!
138
      SDB_SET_INT32(pRaw, dataPos, len, _OVER)
745,874!
139
      SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
745,874!
140
    } else if (pAction->actionType == TRANS_ACTION_MSG) {
478,096!
141
      SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
478,096!
142
      SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
478,096!
143
      SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
478,096!
144
      SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
478,096!
145
      SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
478,096!
146
      SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
478,096!
147
    } else {
148
      // nothing
149
    }
150
  }
151
  ret = 0;
614,156✔
152

153
_OVER:
614,156✔
154
  *offset = dataPos;
614,156✔
155
  return ret;
614,156✔
156
}
157

158
SSdbRaw *mndTransEncode(STrans *pTrans) {
153,539✔
159
  int32_t code = 0;
153,539✔
160
  int32_t lino = 0;
153,539✔
161
  terrno = TSDB_CODE_INVALID_MSG;
153,539✔
162
  int8_t sver = TRANS_VER2_NUMBER;
153,539✔
163

164
  int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen;
153,539✔
165
  rawDataLen += mndTransGetActionsSize(pTrans->prepareActions);
153,539✔
166
  rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
153,539✔
167
  rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
153,539✔
168
  rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
153,539✔
169

170
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, sver, rawDataLen);
153,539✔
171
  if (pRaw == NULL) {
153,539!
172
    mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
×
173
    return NULL;
×
174
  }
175

176
  int32_t dataPos = 0;
153,539✔
177
  SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER)
153,539!
178
  SDB_SET_INT8(pRaw, dataPos, pTrans->stage, _OVER)
153,539!
179
  SDB_SET_INT8(pRaw, dataPos, pTrans->policy, _OVER)
153,539!
180
  SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER)
153,539!
181
  SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
153,539!
182
  SDB_SET_INT8(pRaw, dataPos, pTrans->oper, _OVER)
153,539!
183
  SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
153,539!
184
  SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER)
153,539!
185
  SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
153,539!
186
  SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
153,539!
187
  SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
153,539!
188
  SDB_SET_INT32(pRaw, dataPos, pTrans->actionPos, _OVER)
153,539!
189

190
  int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions);
153,539✔
191
  int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
153,539✔
192
  int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
153,539✔
193
  int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions);
153,539✔
194

195
  if (sver > TRANS_VER1_NUMBER) {
153,539!
196
    SDB_SET_INT32(pRaw, dataPos, prepareActionNum, _OVER)
153,539!
197
  }
198
  SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER)
153,539!
199
  SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
153,539!
200
  SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER)
153,539!
201

202
  if (mndTransEncodeAction(pRaw, &dataPos, pTrans->prepareActions, prepareActionNum) < 0) goto _OVER;
153,539!
203
  if (mndTransEncodeAction(pRaw, &dataPos, pTrans->redoActions, redoActionNum) < 0) goto _OVER;
153,539!
204
  if (mndTransEncodeAction(pRaw, &dataPos, pTrans->undoActions, undoActionNum) < 0) goto _OVER;
153,539!
205
  if (mndTransEncodeAction(pRaw, &dataPos, pTrans->commitActions, commitActionNum) < 0) goto _OVER;
153,539!
206

207
  SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER)
153,539!
208
  SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, _OVER)
153,539!
209
  SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, _OVER)
153,539!
210
  if (pTrans->param != NULL) {
153,539!
211
    SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, _OVER)
×
212
  }
213

214
  SDB_SET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER)
153,539!
215

216
  int32_t arbGroupNum = taosHashGetSize(pTrans->arbGroupIds);
153,539✔
217
  SDB_SET_INT32(pRaw, dataPos, arbGroupNum, _OVER)
153,539!
218
  void *pIter = NULL;
153,539✔
219
  pIter = taosHashIterate(pTrans->arbGroupIds, NULL);
153,539✔
220
  while (pIter) {
153,575✔
221
    int32_t arbGroupId = *(int32_t *)pIter;
36✔
222
    SDB_SET_INT32(pRaw, dataPos, arbGroupId, _OVER)
36!
223
    pIter = taosHashIterate(pTrans->arbGroupIds, pIter);
36✔
224
  }
225

226
  if (sver > TRANS_VER1_NUMBER) {
153,539!
227
    SDB_SET_INT8(pRaw, dataPos, pTrans->ableToBeKilled, _OVER)
153,539!
228
    SDB_SET_INT32(pRaw, dataPos, pTrans->killMode, _OVER)
153,539!
229
  }
230

231
  SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
153,539!
232
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)
153,539!
233

234
  terrno = 0;
153,539✔
235

236
_OVER:
153,539✔
237
  if (terrno != 0) {
153,539!
238
    mError("trans:%d, failed to encode to raw:%p maxlen:%d len:%d since %s", pTrans->id, pRaw, sdbGetRawTotalSize(pRaw),
×
239
           dataPos, terrstr());
240
    sdbFreeRaw(pRaw);
×
241
    return NULL;
×
242
  }
243

244
  mTrace("trans:%d, encode to raw:%p, row:%p len:%d", pTrans->id, pRaw, pTrans, dataPos);
153,539✔
245
  return pRaw;
153,539✔
246
}
247

248
static int32_t mndTransDecodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionNum) {
1,159,624✔
249
  int32_t      code = 0;
1,159,624✔
250
  int32_t      lino = 0;
1,159,624✔
251
  STransAction action = {0};
1,159,624✔
252
  int32_t      dataPos = *offset;
1,159,624✔
253
  int8_t       unused = 0;
1,159,624✔
254
  int8_t       stage = 0;
1,159,624✔
255
  int8_t       actionType = 0;
1,159,624✔
256
  int32_t      dataLen = 0;
1,159,624✔
257
  int32_t      ret = -1;
1,159,624✔
258

259
  for (int32_t i = 0; i < actionNum; ++i) {
3,431,536✔
260
    memset(&action, 0, sizeof(action));
2,271,912✔
261
    SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
2,271,912!
262
    SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
2,271,912!
263
    SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
2,271,912!
264
    SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
2,271,912!
265
    SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
2,271,912!
266
    action.actionType = actionType;
2,271,912✔
267
    SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
2,271,912!
268
    action.stage = stage;
2,271,912✔
269
    SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
2,271,912!
270
    if (action.actionType == TRANS_ACTION_RAW) {
2,271,912✔
271
      SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
1,351,843!
272
      SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
1,351,843!
273
      action.pRaw = taosMemoryMalloc(dataLen);
1,351,843!
274
      if (action.pRaw == NULL) goto _OVER;
1,351,843!
275
      mTrace("raw:%p, is created", action.pRaw);
1,351,843✔
276
      SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
1,351,843!
277
      if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
1,351,843!
278
      action.pRaw = NULL;
1,351,843✔
279
    } else if (action.actionType == TRANS_ACTION_MSG) {
920,069!
280
      SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
920,069!
281
      tmsgUpdateDnodeEpSet(&action.epSet);
920,069✔
282
      SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
920,069!
283
      SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
920,069!
284
      SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
920,069!
285
      SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
920,069!
286
      action.pCont = taosMemoryMalloc(action.contLen);
920,069!
287
      if (action.pCont == NULL) goto _OVER;
920,069!
288
      SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
920,069!
289
      if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
920,069!
290
      action.pCont = NULL;
920,069✔
291
    } else {
292
      if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
×
293
    }
294
  }
295
  ret = 0;
1,159,624✔
296

297
_OVER:
1,159,624✔
298
  *offset = dataPos;
1,159,624✔
299
  taosMemoryFreeClear(action.pCont);
1,159,624!
300
  return ret;
1,159,624✔
301
}
302

303
SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
289,906✔
304
  terrno = TSDB_CODE_INVALID_MSG;
289,906✔
305
  int32_t  code = 0;
289,906✔
306
  int32_t  lino = 0;
289,906✔
307
  SSdbRow *pRow = NULL;
289,906✔
308
  STrans  *pTrans = NULL;
289,906✔
309
  char    *pData = NULL;
289,906✔
310
  int32_t  dataLen = 0;
289,906✔
311
  int8_t   sver = 0;
289,906✔
312
  int32_t  prepareActionNum = 0;
289,906✔
313
  int32_t  redoActionNum = 0;
289,906✔
314
  int32_t  undoActionNum = 0;
289,906✔
315
  int32_t  commitActionNum = 0;
289,906✔
316
  int32_t  dataPos = 0;
289,906✔
317
  int32_t  arbgroupIdNum = 0;
289,906✔
318

319
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
289,906!
320

321
  if (sver > TRANS_VER2_NUMBER) {
289,906!
322
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
323
    goto _OVER;
×
324
  }
325

326
  pRow = sdbAllocRow(sizeof(STrans));
289,906✔
327
  if (pRow == NULL) goto _OVER;
289,906!
328

329
  pTrans = sdbGetRowObj(pRow);
289,906✔
330
  if (pTrans == NULL) goto _OVER;
289,906!
331

332
  SDB_GET_INT32(pRaw, dataPos, &pTrans->id, _OVER)
289,906!
333

334
  int8_t stage = 0;
289,906✔
335
  int8_t policy = 0;
289,906✔
336
  int8_t conflict = 0;
289,906✔
337
  int8_t exec = 0;
289,906✔
338
  int8_t oper = 0;
289,906✔
339
  int8_t reserved = 0;
289,906✔
340
  int8_t actionType = 0;
289,906✔
341
  SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
289,906!
342
  SDB_GET_INT8(pRaw, dataPos, &policy, _OVER)
289,906!
343
  SDB_GET_INT8(pRaw, dataPos, &conflict, _OVER)
289,906!
344
  SDB_GET_INT8(pRaw, dataPos, &exec, _OVER)
289,906!
345
  SDB_GET_INT8(pRaw, dataPos, &oper, _OVER)
289,906!
346
  SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
289,906!
347
  pTrans->stage = stage;
289,906✔
348
  pTrans->policy = policy;
289,906✔
349
  pTrans->conflict = conflict;
289,906✔
350
  pTrans->exec = exec;
289,906✔
351
  pTrans->oper = oper;
289,906✔
352
  SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER)
289,906!
353
  SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
289,906!
354
  SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
289,906!
355
  SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
289,906!
356
  SDB_GET_INT32(pRaw, dataPos, &pTrans->actionPos, _OVER)
289,906!
357

358
  if (sver > TRANS_VER1_NUMBER) {
289,906!
359
    SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER)
289,906!
360
  }
361
  SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
289,906!
362
  SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
289,906!
363
  SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER)
289,906!
364

365
  pTrans->prepareActions = taosArrayInit(prepareActionNum, sizeof(STransAction));
289,906✔
366
  pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction));
289,906✔
367
  pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction));
289,906✔
368
  pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction));
289,906✔
369

370
  if (pTrans->prepareActions == NULL) goto _OVER;
289,906!
371
  if (pTrans->redoActions == NULL) goto _OVER;
289,906!
372
  if (pTrans->undoActions == NULL) goto _OVER;
289,906!
373
  if (pTrans->commitActions == NULL) goto _OVER;
289,906!
374

375
  if (mndTransDecodeAction(pRaw, &dataPos, pTrans->prepareActions, prepareActionNum) < 0) goto _OVER;
289,906!
376
  if (mndTransDecodeAction(pRaw, &dataPos, pTrans->redoActions, redoActionNum) < 0) goto _OVER;
289,906!
377
  if (mndTransDecodeAction(pRaw, &dataPos, pTrans->undoActions, undoActionNum) < 0) goto _OVER;
289,906!
378
  if (mndTransDecodeAction(pRaw, &dataPos, pTrans->commitActions, commitActionNum) < 0) goto _OVER;
289,906!
379

380
  SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER)
289,906!
381
  SDB_GET_INT32(pRaw, dataPos, &pTrans->stopFunc, _OVER)
289,906!
382
  SDB_GET_INT32(pRaw, dataPos, &pTrans->paramLen, _OVER)
289,906!
383
  if (pTrans->paramLen != 0) {
289,906!
384
    pTrans->param = taosMemoryMalloc(pTrans->paramLen);
×
385
    if (pTrans->param == NULL) goto _OVER;
×
386
    SDB_GET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, _OVER);
×
387
  }
388

389
  SDB_GET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER);
289,906!
390

391
  pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
289,906✔
392

393
  SDB_GET_INT32(pRaw, dataPos, &arbgroupIdNum, _OVER)
289,906!
394
  for (int32_t i = 0; i < arbgroupIdNum; ++i) {
289,966✔
395
    int32_t arbGroupId = 0;
60✔
396
    SDB_GET_INT32(pRaw, dataPos, &arbGroupId, _OVER)
60!
397
    if ((terrno = taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0)) != 0) goto _OVER;
60!
398
  }
399

400
  int8_t ableKill = 0;
289,906✔
401
  int8_t killMode = 0;
289,906✔
402
  SDB_GET_INT8(pRaw, dataPos, &ableKill, _OVER)
289,906!
403
  SDB_GET_INT8(pRaw, dataPos, &killMode, _OVER)
289,906!
404
  pTrans->ableToBeKilled = ableKill;
289,906✔
405
  pTrans->killMode = killMode;
289,906✔
406

407
  SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
289,906!
408

409
  terrno = 0;
289,906✔
410

411
_OVER:
289,906✔
412
  if (terrno != 0 && pTrans != NULL) {
289,906!
413
    mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
×
414
    mndTransDropData(pTrans);
×
415
    taosMemoryFreeClear(pRow);
×
416
    return NULL;
×
417
  }
418

419
  if (pTrans != NULL) {
289,906!
420
    mTrace("trans:%d, decode from raw:%p, row:%p", pTrans->id, pRaw, pTrans);
289,906✔
421
  }
422
  return pRow;
289,906✔
423
}
424

425
static const char *mndTransStr(ETrnStage stage) {
2,307,620✔
426
  switch (stage) {
2,307,620!
427
    case TRN_STAGE_PREPARE:
146,199✔
428
      return "prepare";
146,199✔
429
    case TRN_STAGE_REDO_ACTION:
1,006,887✔
430
      return "redoAction";
1,006,887✔
431
    case TRN_STAGE_ROLLBACK:
19✔
432
      return "rollback";
19✔
433
    case TRN_STAGE_UNDO_ACTION:
45,550✔
434
      return "undoAction";
45,550✔
435
    case TRN_STAGE_COMMIT:
244,909✔
436
      return "commit";
244,909✔
437
    case TRN_STAGE_COMMIT_ACTION:
502,610✔
438
      return "commitAction";
502,610✔
439
    case TRN_STAGE_FINISH:
361,427✔
440
      return "finished";
361,427✔
441
    case TRN_STAGE_PRE_FINISH:
19✔
442
      return "pre-finish";
19✔
443
    default:
×
444
      return "invalid";
×
445
  }
446
}
447

448
static const char *mndTransTypeStr(ETrnAct actionType) {
×
449
  switch (actionType) {
×
450
    case TRANS_ACTION_MSG:
×
451
      return "msg";
×
452
    case TRANS_ACTION_RAW:
×
453
      return "sdb";
×
454
    default:
×
455
      return "invalid";
×
456
  }
457
}
458

459
static void mndSetTransLastAction(STrans *pTrans, STransAction *pAction) {
683,467✔
460
  if (pAction != NULL) {
683,467✔
461
    if (pAction->errCode != TSDB_CODE_ACTION_IN_PROGRESS) {
558,657✔
462
      pTrans->lastAction = pAction->id;
358,339✔
463
      pTrans->lastMsgType = pAction->msgType;
358,339✔
464
      pTrans->lastEpset = pAction->epSet;
358,339✔
465
      pTrans->lastErrorNo = pAction->errCode;
358,339✔
466
    }
467
  } else {
468
    pTrans->lastAction = 0;
124,810✔
469
    pTrans->lastMsgType = 0;
124,810✔
470
    memset(&pTrans->lastEpset, 0, sizeof(pTrans->lastEpset));
124,810✔
471
    pTrans->lastErrorNo = 0;
124,810✔
472
  }
473
}
683,467✔
474

475
static void mndTransTestStartFunc(SMnode *pMnode, void *param, int32_t paramLen) {
×
476
  mInfo("test trans start, param:%s, len:%d", (char *)param, paramLen);
×
477
}
×
478

479
static void mndTransTestStopFunc(SMnode *pMnode, void *param, int32_t paramLen) {
×
480
  mInfo("test trans stop, param:%s, len:%d", (char *)param, paramLen);
×
481
}
×
482

483
static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
2,064✔
484
  switch (ftype) {
2,064!
485
    case TRANS_START_FUNC_TEST:
×
486
      return mndTransTestStartFunc;
×
487
    case TRANS_STOP_FUNC_TEST:
×
488
      return mndTransTestStopFunc;
×
489
    case TRANS_START_FUNC_MQ_REB:
1,032✔
490
      return mndRebCntInc;
1,032✔
491
    case TRANS_STOP_FUNC_MQ_REB:
1,032✔
492
      return mndRebCntDec;
1,032✔
493
    default:
×
494
      return NULL;
×
495
  }
496
}
497

498
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
54,536✔
499
  mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans,
54,536!
500
        mndTransStr(pTrans->stage), pTrans->startFunc);
501

502
  (void)taosThreadMutexInit(&pTrans->mutex, NULL);
54,536✔
503

504
  if (pTrans->startFunc > 0) {
54,536✔
505
    TransCbFp fp = mndTransGetCbFp(pTrans->startFunc);
1,032✔
506
    if (fp) {
1,032!
507
      (*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen);
1,032✔
508
    }
509
    // pTrans->startFunc = 0;
510
  }
511

512
  if (pTrans->stage == TRN_STAGE_COMMIT) {
54,536✔
513
    pTrans->stage = TRN_STAGE_COMMIT_ACTION;
4✔
514
    mInfo("trans:%d, stage from commit to commitAction since perform update action", pTrans->id);
4!
515
  }
516

517
  if (pTrans->stage == TRN_STAGE_ROLLBACK) {
54,536!
UNCOV
518
    pTrans->stage = TRN_STAGE_UNDO_ACTION;
×
UNCOV
519
    mInfo("trans:%d, stage from rollback to undoAction since perform update action", pTrans->id);
×
520
  }
521

522
  if (pTrans->stage == TRN_STAGE_PRE_FINISH) {
54,536!
523
    pTrans->stage = TRN_STAGE_FINISH;
×
524
    mInfo("trans:%d, stage from pre-finish to finished since perform update action", pTrans->id);
×
525
  }
526

527
  return 0;
54,536✔
528
}
529

530
void mndTransDropData(STrans *pTrans) {
335,698✔
531
  if (pTrans->prepareActions != NULL) {
335,698!
532
    mndTransDropActions(pTrans->prepareActions);
335,698✔
533
    pTrans->prepareActions = NULL;
335,698✔
534
  }
535
  if (pTrans->redoActions != NULL) {
335,698!
536
    mndTransDropActions(pTrans->redoActions);
335,698✔
537
    pTrans->redoActions = NULL;
335,698✔
538
  }
539
  if (pTrans->undoActions != NULL) {
335,698!
540
    mndTransDropActions(pTrans->undoActions);
335,698✔
541
    pTrans->undoActions = NULL;
335,698✔
542
  }
543
  if (pTrans->commitActions != NULL) {
335,698!
544
    mndTransDropActions(pTrans->commitActions);
335,698✔
545
    pTrans->commitActions = NULL;
335,698✔
546
  }
547
  if (pTrans->arbGroupIds != NULL) {
335,698!
548
    taosHashCleanup(pTrans->arbGroupIds);
335,698✔
549
  }
550
  if (pTrans->pRpcArray != NULL) {
335,698✔
551
    taosArrayDestroy(pTrans->pRpcArray);
45,792✔
552
    pTrans->pRpcArray = NULL;
45,792✔
553
  }
554
  if (pTrans->rpcRsp != NULL) {
335,698✔
555
    taosMemoryFree(pTrans->rpcRsp);
12,379!
556
    pTrans->rpcRsp = NULL;
12,379✔
557
    pTrans->rpcRspLen = 0;
12,379✔
558
  }
559
  if (pTrans->param != NULL) {
335,698!
560
    taosMemoryFree(pTrans->param);
×
561
    pTrans->param = NULL;
×
562
    pTrans->paramLen = 0;
×
563
  }
564
  (void)taosThreadMutexDestroy(&pTrans->mutex);
335,698✔
565
}
335,698✔
566

567
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
172,192✔
568
  mInfo("trans:%d, perform delete action, row:%p stage:%s callfunc:%d, stopFunc:%d", pTrans->id, pTrans,
172,192!
569
        mndTransStr(pTrans->stage), callFunc, pTrans->stopFunc);
570

571
  if (pTrans->stopFunc > 0 && callFunc) {
172,192✔
572
    TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc);
1,032✔
573
    if (fp) {
1,032!
574
      (*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen);
1,032✔
575
    }
576
    // pTrans->stopFunc = 0;
577
  }
578

579
  mndTransDropData(pTrans);
172,192✔
580
  return 0;
172,192✔
581
}
582

583
static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
253,376✔
584
  for (int32_t i = 0; i < taosArrayGetSize(pOldArray); ++i) {
781,551✔
585
    STransAction *pOldAction = taosArrayGet(pOldArray, i);
528,175✔
586
    STransAction *pNewAction = taosArrayGet(pNewArray, i);
528,175✔
587
    pOldAction->rawWritten = pNewAction->rawWritten;
528,175✔
588
    pOldAction->msgSent = pNewAction->msgSent;
528,175✔
589
    pOldAction->msgReceived = pNewAction->msgReceived;
528,175✔
590
    pOldAction->errCode = pNewAction->errCode;
528,175✔
591
  }
592
}
253,376✔
593

594
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
63,344✔
595
  mInfo("trans:%d, perform update action, old row:%p stage:%s create:%" PRId64 ", new row:%p stage:%s create:%" PRId64,
63,344!
596
        pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage), pNew->createdTime);
597

598
  if (pOld->createdTime != pNew->createdTime) {
63,344!
599
    mError("trans:%d, failed to perform update action since createTime not match, old row:%p stage:%s create:%" PRId64
×
600
           ", new row:%p stage:%s create:%" PRId64,
601
           pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
602
           pNew->createdTime);
603
    // only occured while sync timeout
604
    TAOS_RETURN(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT);
×
605
  }
606

607
  mndTransUpdateActions(pOld->prepareActions, pNew->prepareActions);
63,344✔
608
  mndTransUpdateActions(pOld->redoActions, pNew->redoActions);
63,344✔
609
  mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
63,344✔
610
  mndTransUpdateActions(pOld->commitActions, pNew->commitActions);
63,344✔
611
  pOld->stage = pNew->stage;
63,344✔
612
  pOld->actionPos = pNew->actionPos;
63,344✔
613

614
  if (pOld->stage == TRN_STAGE_COMMIT) {
63,344✔
615
    pOld->stage = TRN_STAGE_COMMIT_ACTION;
54,306✔
616
    mInfo("trans:%d, stage from commit to commitAction since perform update action", pNew->id);
54,306!
617
  }
618

619
  if (pOld->stage == TRN_STAGE_ROLLBACK) {
63,344✔
620
    pOld->stage = TRN_STAGE_UNDO_ACTION;
5✔
621
    mInfo("trans:%d, stage from rollback to undoAction since perform update action", pNew->id);
5!
622
  }
623

624
  if (pOld->stage == TRN_STAGE_PRE_FINISH) {
63,344✔
625
    pOld->stage = TRN_STAGE_FINISH;
5✔
626
    mInfo("trans:%d, stage from pre-finish to finished since perform update action", pNew->id);
5!
627
  }
628

629
  return 0;
63,344✔
630
}
631

632
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
272,135✔
633
  STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
272,135✔
634
  if (pTrans == NULL) {
272,135✔
635
    terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
4,668✔
636
  }
637
  return pTrans;
272,135✔
638
}
639

640
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
267,468✔
641
  SSdb *pSdb = pMnode->pSdb;
267,468✔
642
  if (pTrans != NULL) mInfo("vgId:1, trans:%d, release transaction", pTrans->id);
267,468!
643
  sdbRelease(pSdb, pTrans);
267,468✔
644
}
267,468✔
645

646
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq,
45,792✔
647
                       const char *opername) {
648
  STrans *pTrans = taosMemoryCalloc(1, sizeof(STrans));
45,792!
649
  if (pTrans == NULL) {
45,792!
650
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
651
    mError("failed to create transaction since %s", terrstr());
×
652
    return NULL;
×
653
  }
654

655
  if (opername != NULL) {
45,792!
656
    tstrncpy(pTrans->opername, opername, TSDB_TRANS_OPER_LEN);
45,792✔
657
  }
658

659
  int32_t sdbMaxId = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
45,792✔
660
  sdbReadLock(pMnode->pSdb, SDB_TRANS);
45,792✔
661
  pTrans->id = TMAX(sdbMaxId, tsMaxTransId + 1);
45,792✔
662
  sdbUnLock(pMnode->pSdb, SDB_TRANS);
45,792✔
663
  pTrans->stage = TRN_STAGE_PREPARE;
45,792✔
664
  pTrans->policy = policy;
45,792✔
665
  pTrans->conflict = conflict;
45,792✔
666
  pTrans->exec = TRN_EXEC_PARALLEL;
45,792✔
667
  pTrans->ableToBeKilled = false;
45,792✔
668
  pTrans->createdTime = taosGetTimestampMs();
45,792✔
669
  pTrans->prepareActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
45,792✔
670
  pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
45,792✔
671
  pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
45,792✔
672
  pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
45,792✔
673
  pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
45,792✔
674
  pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
45,792✔
675
  pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64();
45,792✔
676
  taosInitRWLatch(&pTrans->lockRpcArray);
45,792✔
677
  (void)taosThreadMutexInit(&pTrans->mutex, NULL);
45,792✔
678

679
  if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
45,792!
680
      pTrans->pRpcArray == NULL) {
45,792!
681
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
682
    mError("failed to create transaction since %s", terrstr());
×
683
    mndTransDrop(pTrans);
×
684
    return NULL;
×
685
  }
686

687
  if (pReq != NULL) {
45,792✔
688
    if (taosArrayPush(pTrans->pRpcArray, &pReq->info) == NULL) {
57,118!
689
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
690
      return NULL;
×
691
    }
692
    pTrans->originRpcType = pReq->msgType;
28,559✔
693
  }
694

695
  mInfo("trans:%d, create transaction:%s, origin:%s", pTrans->id, pTrans->opername, opername);
45,792!
696

697
  mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
45,792✔
698
  return pTrans;
45,792✔
699
}
700

701
static void mndTransDropActions(SArray *pArray) {
1,342,792✔
702
  int32_t size = taosArrayGetSize(pArray);
1,342,792✔
703
  for (int32_t i = 0; i < size; ++i) {
3,967,517✔
704
    STransAction *pAction = taosArrayGet(pArray, i);
2,624,725✔
705
    if (pAction->actionType == TRANS_ACTION_RAW) {
2,624,725✔
706
      taosMemoryFreeClear(pAction->pRaw);
1,578,879!
707
    } else if (pAction->actionType == TRANS_ACTION_MSG) {
1,045,846!
708
      taosMemoryFreeClear(pAction->pCont);
1,045,846!
709
    } else {
710
      // nothing
711
    }
712
  }
713

714
  taosArrayDestroy(pArray);
1,342,792✔
715
}
1,342,792✔
716

717
void mndTransDrop(STrans *pTrans) {
46,702✔
718
  if (pTrans != NULL) {
46,702✔
719
    mndTransDropData(pTrans);
45,792✔
720
    mTrace("trans:%d, local object is freed, data:%p", pTrans->id, pTrans);
45,792✔
721
    taosMemoryFreeClear(pTrans);
45,792!
722
  }
723
}
46,702✔
724

725
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
352,813✔
726
  pAction->id = taosArrayGetSize(pArray);
352,813✔
727

728
  void *ptr = taosArrayPush(pArray, pAction);
352,813✔
729
  if (ptr == NULL) {
352,813!
730
    TAOS_RETURN(terrno);
×
731
  }
732

733
  return 0;
352,813✔
734
}
735

736
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
1,620✔
737
  STransAction action = {
1,620✔
738
      .stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw, .mTraceId = pTrans->mTraceId};
1,620✔
739
  return mndTransAppendAction(pTrans->redoActions, &action);
1,620✔
740
}
741

742
int32_t mndTransAppendNullLog(STrans *pTrans) {
×
743
  STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_NULL};
×
744
  return mndTransAppendAction(pTrans->redoActions, &action);
×
745
}
746

747
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
13,326✔
748
  STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
13,326✔
749
  return mndTransAppendAction(pTrans->undoActions, &action);
13,326✔
750
}
751

752
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
191,348✔
753
  STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
191,348✔
754
  return mndTransAppendAction(pTrans->commitActions, &action);
191,348✔
755
}
756

757
int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw) {
20,742✔
758
  STransAction action = {
20,742✔
759
      .pRaw = pRaw, .stage = TRN_STAGE_PREPARE, .actionType = TRANS_ACTION_RAW, .mTraceId = pTrans->mTraceId};
20,742✔
760
  return mndTransAppendAction(pTrans->prepareActions, &action);
20,742✔
761
}
762

763
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
93,676✔
764
  pAction->stage = TRN_STAGE_REDO_ACTION;
93,676✔
765
  pAction->actionType = TRANS_ACTION_MSG;
93,676✔
766
  pAction->mTraceId = pTrans->mTraceId;
93,676✔
767
  return mndTransAppendAction(pTrans->redoActions, pAction);
93,676✔
768
}
769

770
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
32,101✔
771
  pAction->stage = TRN_STAGE_UNDO_ACTION;
32,101✔
772
  pAction->actionType = TRANS_ACTION_MSG;
32,101✔
773
  return mndTransAppendAction(pTrans->undoActions, pAction);
32,101✔
774
}
775

776
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
12,379✔
777
  pTrans->rpcRsp = pCont;
12,379✔
778
  pTrans->rpcRspLen = contLen;
12,379✔
779
}
12,379✔
780

781
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen) {
1,020✔
782
  pTrans->startFunc = startFunc;
1,020✔
783
  pTrans->stopFunc = stopFunc;
1,020✔
784
  pTrans->param = param;
1,020✔
785
  pTrans->paramLen = paramLen;
1,020✔
786
}
1,020✔
787

788
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname) {
×
789
  STrans *pTrans = NULL;
×
790
  void   *pIter = NULL;
×
791
  int32_t code = -1;
×
792

793
  while (1) {
794
    pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
×
795
    if (pIter == NULL) break;
×
796

797
    if (pTrans->oper == oper) {
×
798
      if (taosStrcasecmp(dbname, pTrans->dbname) == 0) {
×
799
        mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
×
800
        taosWLockLatch(&pTrans->lockRpcArray);
×
801
        if (pTrans->pRpcArray == NULL) {
×
802
          pTrans->pRpcArray = taosArrayInit(4, sizeof(SRpcHandleInfo));
×
803
        }
804
        if (pTrans->pRpcArray != NULL && taosArrayPush(pTrans->pRpcArray, &pMsg->info) != NULL) {
×
805
          code = 0;
×
806
        }
807
        taosWUnLockLatch(&pTrans->lockRpcArray);
×
808

809
        sdbRelease(pMnode->pSdb, pTrans);
×
810
        break;
×
811
      }
812
    }
813

814
    sdbRelease(pMnode->pSdb, pTrans);
×
815
  }
816
  return code;
×
817
}
818

819
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) {
30,324✔
820
  if (dbname != NULL) {
30,324!
821
    tstrncpy(pTrans->dbname, dbname, TSDB_DB_FNAME_LEN);
30,324✔
822
  }
823
  if (stbname != NULL) {
30,324✔
824
    tstrncpy(pTrans->stbname, stbname, TSDB_TABLE_FNAME_LEN);
22,911✔
825
  }
826
}
30,324✔
827

828
void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) {
12✔
829
  if (taosHashPut(pTrans->arbGroupIds, &groupId, sizeof(int32_t), NULL, 0) != 0) {
12!
830
    mError("trans:%d, failed to put groupid into hash, groupId:%d", pTrans->id, groupId);
×
831
  }
832
}
12✔
833

834
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
2,820✔
835

836
void mndTransSetBeKilled(STrans *pTrans, bool ableToBeKilled) { pTrans->ableToBeKilled = ableToBeKilled; }
×
837

838
void mndTransSetKillMode(STrans *pTrans, ETrnKillMode killMode) {
8✔
839
  pTrans->ableToBeKilled = true; 
8✔
840
  pTrans->killMode = killMode; 
8✔
841
}
8✔
842

843
void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
×
844

845
void mndTransSetChangeless(STrans *pTrans) { pTrans->changeless = true; }
34✔
846

847
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
3,871✔
848

849
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
98,922✔
850
  int32_t  code = 0;
98,922✔
851
  SSdbRaw *pRaw = mndTransEncode(pTrans);
98,922✔
852
  if (pRaw == NULL) {
98,922!
853
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
854
    if (terrno != 0) code = terrno;
×
855
    mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, tstrerror(code));
×
856
    TAOS_RETURN(code);
×
857
  }
858
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
98,922!
859

860
  mInfo("trans:%d, sync to other mnodes, stage:%s createTime:%" PRId64, pTrans->id, mndTransStr(pTrans->stage),
98,922!
861
        pTrans->createdTime);
862
  code = mndSyncPropose(pMnode, pRaw, pTrans->id);
98,922✔
863
  if (code != 0) {
98,922✔
864
    mError("trans:%d, failed to sync, errno:%s code:0x%x createTime:%" PRId64 " saved trans:%d", pTrans->id,
22!
865
           tstrerror(code), code, pTrans->createdTime, pMnode->syncMgmt.transId);
866
    sdbFreeRaw(pRaw);
22✔
867
    TAOS_RETURN(code);
22✔
868
  }
869

870
  sdbFreeRaw(pRaw);
98,900✔
871
  mInfo("trans:%d, sync finished, createTime:%" PRId64, pTrans->id, pTrans->createdTime);
98,900!
872
  TAOS_RETURN(code);
98,900✔
873
}
874

875
static bool mndCheckDbConflict(const char *conflict, STrans *pTrans) {
3,325✔
876
  if (conflict[0] == 0) return false;
3,325!
877
  if (taosStrcasecmp(conflict, pTrans->dbname) == 0) return true;
3,325✔
878
  return false;
3,042✔
879
}
880

881
static bool mndCheckStbConflict(const char *conflict, STrans *pTrans) {
20,417✔
882
  if (conflict[0] == 0) return false;
20,417✔
883
  if (taosStrcasecmp(conflict, pTrans->stbname) == 0) return true;
19,052✔
884
  return false;
19,030✔
885
}
886

887
static void mndTransLogConflict(STrans *pNew, STrans *pTrans, bool conflict, bool *globalConflict) {
23,742✔
888
  if (conflict) {
23,742✔
889
    mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d", pNew->id,
305!
890
           pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname, pTrans->conflict);
891
    *globalConflict = true;
305✔
892
  } else {
893
    mInfo("trans:%d, db:%s stb:%s type:%d, not conflict with trans:%d db:%s stb:%s type:%d", pNew->id, pNew->dbname,
23,437!
894
          pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname, pTrans->conflict);
895
  }
896
}
23,742✔
897

898
static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
130,659✔
899
  STrans *pTrans = NULL;
130,659✔
900
  void   *pIter = NULL;
130,659✔
901
  bool    conflict = false;
130,659✔
902

903
  if (pNew->conflict == TRN_CONFLICT_NOTHING) return conflict;
130,659✔
904

905
  int32_t size = sdbGetSize(pMnode->pSdb, SDB_TRANS);
85,535✔
906
  mInfo("trans:%d, trans hash size %d", pNew->id, size);
85,535!
907

908
  while (1) {
909
    pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
107,705✔
910
    if (pIter == NULL) break;
107,705✔
911

912
    if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
22,170!
913

914
    if (pNew->conflict == TRN_CONFLICT_DB) {
22,170✔
915
      if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
2,994✔
916
      if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
2,994✔
917
        mndTransLogConflict(pNew, pTrans, mndCheckDbConflict(pNew->dbname, pTrans), &conflict);
1,917✔
918
        mndTransLogConflict(pNew, pTrans, mndCheckStbConflict(pNew->stbname, pTrans), &conflict);
1,917✔
919
      }
920
    }
921

922
    if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
22,170✔
923
      if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
19,173✔
924
      if (pTrans->conflict == TRN_CONFLICT_DB) {
19,173✔
925
        mndTransLogConflict(pNew, pTrans, mndCheckDbConflict(pNew->dbname, pTrans), &conflict);
1,408✔
926
        mndTransLogConflict(pNew, pTrans, mndCheckStbConflict(pNew->stbname, pTrans), &conflict);
1,408✔
927
      }
928
      if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
19,173✔
929
        mndTransLogConflict(pNew, pTrans, mndCheckStbConflict(pNew->stbname, pTrans), &conflict);  // for stb
17,092✔
930
      }
931
    }
932

933
    if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
22,170✔
934
      if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
3!
935
      if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
3!
936
        void *pGidIter = taosHashIterate(pNew->arbGroupIds, NULL);
×
937
        while (pGidIter != NULL) {
×
938
          int32_t groupId = *(int32_t *)pGidIter;
×
939
          if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) {
×
940
            taosHashCancelIterate(pNew->arbGroupIds, pGidIter);
×
941
            mndTransLogConflict(pNew, pTrans, true, &conflict);
×
942
            break;
×
943
          } else {
944
            mndTransLogConflict(pNew, pTrans, false, &conflict);
×
945
          }
946
          pGidIter = taosHashIterate(pNew->arbGroupIds, pGidIter);
×
947
        }
948
      }
949
    }
950

951
    if (pNew->conflict == TRN_CONFLICT_TSMA) {
22,170!
952
      if (pTrans->conflict == TRN_CONFLICT_GLOBAL || pTrans->conflict == TRN_CONFLICT_TSMA) {
×
953
        mndTransLogConflict(pNew, pTrans, true, &conflict);
×
954
      } else {
955
        mndTransLogConflict(pNew, pTrans, false, &conflict);
×
956
      }
957
    }
958

959
    sdbRelease(pMnode->pSdb, pTrans);
22,170✔
960
  }
961

962
  return conflict;
85,535✔
963
}
964

965
int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
130,659✔
966
  int32_t code = 0;
130,659✔
967
  if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
130,659✔
968
    if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) {
76,219!
969
      code = TSDB_CODE_MND_TRANS_CONFLICT;
×
970
      mError("trans:%d, failed to check tran conflict since db not set", pTrans->id);
×
971
      TAOS_RETURN(code);
×
972
    }
973
  }
974

975
  if (mndCheckTransConflict(pMnode, pTrans)) {
130,659✔
976
    code = TSDB_CODE_MND_TRANS_CONFLICT;
328✔
977
    mError("trans:%d, failed to check tran conflict since %s", pTrans->id, tstrerror(code));
328!
978
    TAOS_RETURN(code);
328✔
979
  }
980

981
  TAOS_RETURN(code);
130,331✔
982
}
983

984
int32_t mndTransCheckConflictWithCompact(SMnode *pMnode, STrans *pTrans) {
279✔
985
  int32_t      code = 0;
279✔
986
  void        *pIter = NULL;
279✔
987
  bool         conflict = false;
279✔
988
  SCompactObj *pCompact = NULL;
279✔
989

990
  while (1) {
4✔
991
    bool thisConflict = false;
283✔
992
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
283✔
993
    if (pIter == NULL) break;
283✔
994

995
    if (pTrans->conflict == TRN_CONFLICT_GLOBAL) {
4✔
996
      thisConflict = true;
2✔
997
    }
998
    if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
4!
999
      if (taosStrcasecmp(pTrans->dbname, pCompact->dbname) == 0) thisConflict = true;
2!
1000
    }
1001

1002
    if (thisConflict) {
4!
1003
      mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with compact:%d db:%s", pTrans->id,
4!
1004
             pTrans->dbname, pTrans->stbname, pTrans->conflict, pCompact->compactId, pCompact->dbname);
1005
      conflict = true;
4✔
1006
    } else {
1007
      mInfo("trans:%d, db:%s stb:%s type:%d, not conflict with compact:%d db:%s", pTrans->id, pTrans->dbname,
×
1008
            pTrans->stbname, pTrans->conflict, pCompact->compactId, pCompact->dbname);
1009
    }
1010
    sdbRelease(pMnode->pSdb, pCompact);
4✔
1011
  }
1012

1013
  if (conflict) {
279✔
1014
    code = TSDB_CODE_MND_TRANS_CONFLICT_COMPACT;
4✔
1015
    mError("trans:%d, failed to check tran conflict with compact since %s", pTrans->id, tstrerror(code));
4!
1016
    TAOS_RETURN(code);
4✔
1017
  }
1018

1019
  TAOS_RETURN(code);
275✔
1020
}
1021

1022
static bool mndTransActionsOfSameType(SArray *pActions) {
103,320✔
1023
  int32_t size = taosArrayGetSize(pActions);
103,320✔
1024
  ETrnAct lastActType = TRANS_ACTION_NULL;
103,320✔
1025
  bool    same = true;
103,320✔
1026
  for (int32_t i = 0; i < size; ++i) {
399,005✔
1027
    STransAction *pAction = taosArrayGet(pActions, i);
295,685✔
1028
    if (i > 0) {
295,685✔
1029
      if (lastActType != pAction->actionType) {
218,061!
1030
        same = false;
×
1031
        break;
×
1032
      }
1033
    }
1034
    lastActType = pAction->actionType;
295,685✔
1035
  }
1036
  return same;
103,320✔
1037
}
1038

1039
static int32_t mndTransCheckParallelActions(SMnode *pMnode, STrans *pTrans) {
45,432✔
1040
  int32_t code = 0;
45,432✔
1041
  if (pTrans->exec == TRN_EXEC_PARALLEL) {
45,432✔
1042
    if (mndTransActionsOfSameType(pTrans->redoActions) == false) {
42,676!
1043
      code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
×
1044
      mError("trans:%d, types of parallel redo actions are not the same", pTrans->id);
×
1045
      TAOS_RETURN(code);
×
1046
    }
1047

1048
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
42,676✔
1049
      if (mndTransActionsOfSameType(pTrans->undoActions) == false) {
15,212!
1050
        code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
×
1051
        mError("trans:%d, types of parallel undo actions are not the same", pTrans->id);
×
1052
        TAOS_RETURN(code);
×
1053
      }
1054
    }
1055
  }
1056

1057
  TAOS_RETURN(code);
45,432✔
1058
}
1059

1060
static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) {
45,432✔
1061
  int32_t code = 0;
45,432✔
1062
  if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
45,432!
1063
    code = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
×
1064
    mError("trans:%d, commit actions of non-changeless trans are empty", pTrans->id);
×
1065
    TAOS_RETURN(code);
×
1066
  }
1067
  if (mndTransActionsOfSameType(pTrans->commitActions) == false) {
45,432!
1068
    code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
×
1069
    mError("trans:%d, types of commit actions are not the same", pTrans->id);
×
1070
    TAOS_RETURN(code);
×
1071
  }
1072

1073
  TAOS_RETURN(code);
45,432✔
1074
}
1075

1076
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
45,432✔
1077
  int32_t code = 0;
45,432✔
1078
  if (pTrans == NULL) {
45,432!
1079
    return TSDB_CODE_INVALID_PARA;
×
1080
  }
1081

1082
  mInfo("trans:%d, action list:", pTrans->id);
45,432!
1083
  int32_t index = 0;
45,432✔
1084
  for (int32_t i = 0; i < taosArrayGetSize(pTrans->prepareActions); ++i, ++index) {
66,155✔
1085
    STransAction *pAction = taosArrayGet(pTrans->prepareActions, i);
20,723✔
1086
    mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index,
20,723!
1087
          mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
1088
  }
1089

1090
  for (int32_t i = 0; i < taosArrayGetSize(pTrans->redoActions); ++i, ++index) {
140,425✔
1091
    STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
94,993✔
1092
    mInfo("trans:%d, action:%d, %s:%d msgType:%s", pTrans->id, index,
94,993!
1093
          mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType));;
1094
  }
1095

1096
  for (int32_t i = 0; i < taosArrayGetSize(pTrans->commitActions); ++i, ++index) {
236,735✔
1097
    STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
191,303✔
1098
    mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index,
191,303!
1099
          mndTransStr(pAction->stage), i, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
1100
  }
1101

1102
  for (int32_t i = 0; i < taosArrayGetSize(pTrans->undoActions); ++i, ++index) {
90,859✔
1103
    STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
45,427✔
1104
    if(pAction->actionType == TRANS_ACTION_MSG){
45,427✔
1105
      mInfo("trans:%d, action:%d, %s:%d msgType:%s", pTrans->id, index,
32,101!
1106
            mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType));;
1107
    }
1108
    else{
1109
      mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index,
13,326!
1110
            mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
1111
    }
1112
  }
1113

1114

1115
  TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
45,432!
1116

1117
  TAOS_CHECK_RETURN(mndTransCheckParallelActions(pMnode, pTrans));
45,432!
1118

1119
  TAOS_CHECK_RETURN(mndTransCheckCommitActions(pMnode, pTrans));
45,432!
1120

1121
  mInfo("trans:%d, prepare transaction", pTrans->id);
45,432!
1122
  if ((code = mndTransSync(pMnode, pTrans)) != 0) {
45,432✔
1123
    mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
12!
1124
    sdbWriteLock(pMnode->pSdb, SDB_TRANS);
12✔
1125
    tsMaxTransId = TMAX(pTrans->id, tsMaxTransId);
12✔
1126
    sdbUnLock(pMnode->pSdb, SDB_TRANS);
12✔
1127
    TAOS_RETURN(code);
12✔
1128
  }
1129
  mInfo("trans:%d, prepare finished", pTrans->id);
45,420!
1130

1131
  STrans *pNew = mndAcquireTrans(pMnode, pTrans->id);
45,420✔
1132
  if (pNew == NULL) {
45,420!
1133
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1134
    if (terrno != 0) code = terrno;
×
1135
    mError("trans:%d, failed to read from sdb since %s", pTrans->id, tstrerror(code));
×
1136
    TAOS_RETURN(code);
×
1137
  }
1138

1139
  pNew->pRpcArray = pTrans->pRpcArray;
45,420✔
1140
  pNew->rpcRsp = pTrans->rpcRsp;
45,420✔
1141
  pNew->rpcRspLen = pTrans->rpcRspLen;
45,420✔
1142
  pNew->mTraceId = pTrans->mTraceId;
45,420✔
1143
  pTrans->pRpcArray = NULL;
45,420✔
1144
  pTrans->rpcRsp = NULL;
45,420✔
1145
  pTrans->rpcRspLen = 0;
45,420✔
1146

1147
  mndTransExecute(pMnode, pNew);
45,420✔
1148
  mndReleaseTrans(pMnode, pNew);
45,420✔
1149
  // TDOD change to TAOS_RETURN(code);
1150
  return 0;
45,420✔
1151
}
1152

1153
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
45,431✔
1154
  int32_t code = 0;
45,431✔
1155
  mInfo("trans:%d, commit transaction", pTrans->id);
45,431!
1156
  if ((code = mndTransSync(pMnode, pTrans)) != 0) {
45,431✔
1157
    mError("trans:%d, failed to commit since %s", pTrans->id, tstrerror(code));
10!
1158
    TAOS_RETURN(code);
10✔
1159
  }
1160
  mInfo("trans:%d, commit finished", pTrans->id);
45,421!
1161
  TAOS_RETURN(code);
45,421✔
1162
}
1163

1164
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
3✔
1165
  int32_t code = 0;
3✔
1166
  mInfo("trans:%d, rollback transaction", pTrans->id);
3!
1167
  if ((code = mndTransSync(pMnode, pTrans)) != 0) {
3!
UNCOV
1168
    mError("trans:%d, failed to rollback since %s", pTrans->id, tstrerror(code));
×
UNCOV
1169
    TAOS_RETURN(code);
×
1170
  }
1171
  mInfo("trans:%d, rollback finished", pTrans->id);
3!
1172
  TAOS_RETURN(code);
3✔
1173
}
1174

1175
static int32_t mndTransPreFinish(SMnode *pMnode, STrans *pTrans) {
3✔
1176
  int32_t code = 0;
3✔
1177
  mInfo("trans:%d, pre-finish transaction", pTrans->id);
3!
1178
  if ((code = mndTransSync(pMnode, pTrans)) != 0) {
3!
1179
    mError("trans:%d, failed to pre-finish since %s", pTrans->id, tstrerror(code));
×
1180
    TAOS_RETURN(code);
×
1181
  }
1182
  mInfo("trans:%d, pre-finish finished", pTrans->id);
3!
1183
  TAOS_RETURN(code);
3✔
1184
}
1185

1186
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
265,855✔
1187
  bool    sendRsp = false;
265,855✔
1188
  int32_t code = pTrans->code;
265,855✔
1189

1190
  if (pTrans->stage == TRN_STAGE_FINISH) {
265,855✔
1191
    sendRsp = true;
99,737✔
1192
  }
1193

1194
  if (pTrans->policy == TRN_POLICY_ROLLBACK) {
265,855✔
1195
    if (pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) {
75,444!
1196
      if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
18✔
1197
      sendRsp = true;
18✔
1198
    }
1199
  } else {
1200
    if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
190,411✔
1201
      if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_APP_IS_STARTING ||
126,339!
1202
          code == TSDB_CODE_SYN_PROPOSE_NOT_READY) {
1203
        if (pTrans->failedTimes > 60) sendRsp = true;
×
1204
      } else {
1205
        if (pTrans->failedTimes > 6) sendRsp = true;
126,339✔
1206
      }
1207
      if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
126,339✔
1208
    }
1209
  }
1210

1211
  if (!sendRsp) {
265,855✔
1212
    return;
166,098✔
1213
  } else {
1214
    mInfo("vgId:1, trans:%d, start to send rsp, stage:%s failedTimes:%d code:0x%x", pTrans->id,
99,757!
1215
          mndTransStr(pTrans->stage), pTrans->failedTimes, code);
1216
  }
1217

1218
  mInfo("vgId:1, trans:%d, start to lock rpc array", pTrans->id);
99,757!
1219
  taosWLockLatch(&pTrans->lockRpcArray);
99,757✔
1220
  int32_t size = taosArrayGetSize(pTrans->pRpcArray);
99,757✔
1221
  if (size <= 0) {
99,757✔
1222
    taosWUnLockLatch(&pTrans->lockRpcArray);
71,584✔
1223
    return;
71,584✔
1224
  }
1225

1226
  for (int32_t i = 0; i < size; ++i) {
56,346✔
1227
    SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
28,173✔
1228
    if (pInfo->handle != NULL) {
28,173✔
1229
      if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
26,669!
1230
        code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
1✔
1231
      }
1232
      if (code == TSDB_CODE_SYN_TIMEOUT) {
26,669!
1233
        code = TSDB_CODE_MND_TRANS_SYNC_TIMEOUT;
×
1234
      }
1235

1236
      if (i != 0 && code == 0) {
26,669!
1237
        code = TSDB_CODE_MNODE_NOT_FOUND;
×
1238
      }
1239
      mInfo("vgId:1, trans:%d, client:%d start to send rsp, code:0x%x stage:%s app:%p", pTrans->id, i, code,
26,669!
1240
            mndTransStr(pTrans->stage), pInfo->ahandle);
1241

1242
      SRpcMsg rspMsg = {.code = code, .info = *pInfo};
26,669✔
1243

1244
      if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
26,669✔
1245
        mInfo("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
3,855!
1246
        SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname);
3,855✔
1247
        if (pDb != NULL) {
3,855!
1248
          for (int32_t j = 0; j < 12; j++) {
4,651✔
1249
            bool ready = mndIsDbReady(pMnode, pDb);
4,647✔
1250
            if (!ready) {
4,647✔
1251
              mInfo("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname, j);
796!
1252
              taosMsleep(1000);
796✔
1253
            } else {
1254
              break;
3,851✔
1255
            }
1256
          }
1257
        }
1258
        mndReleaseDb(pMnode, pDb);
3,855✔
1259
      } else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) {
22,814✔
1260
        void   *pCont = NULL;
7,145✔
1261
        int32_t contLen = 0;
7,145✔
1262
        if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen)) {
7,145✔
1263
          mndTransSetRpcRsp(pTrans, pCont, contLen);
7,142✔
1264
        }
1265
      } else if (pTrans->originRpcType == TDMT_MND_CREATE_INDEX) {
15,669✔
1266
        void   *pCont = NULL;
473✔
1267
        int32_t contLen = 0;
473✔
1268
        if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen)) {
473!
1269
          mndTransSetRpcRsp(pTrans, pCont, contLen);
473✔
1270
        }
1271
      }
1272

1273
      if (pTrans->rpcRspLen != 0) {
26,669✔
1274
        void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
12,379✔
1275
        if (rpcCont != NULL) {
12,379!
1276
          memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
12,379✔
1277
          rspMsg.pCont = rpcCont;
12,379✔
1278
          rspMsg.contLen = pTrans->rpcRspLen;
12,379✔
1279
        }
1280
      }
1281

1282
      tmsgSendRsp(&rspMsg);
26,669✔
1283

1284
      mInfo("vgId:1, trans:%d, client:%d send rsp finished, code:0x%x stage:%s app:%p", pTrans->id, i, code,
26,669!
1285
            mndTransStr(pTrans->stage), pInfo->ahandle);
1286
    }
1287
  }
1288
  taosArrayClear(pTrans->pRpcArray);
28,173✔
1289
  taosWUnLockLatch(&pTrans->lockRpcArray);
28,173✔
1290
}
1291

1292
int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
97,798✔
1293
  int32_t code = 0;
97,798✔
1294
  SMnode *pMnode = pRsp->info.node;
97,798✔
1295
#ifndef TD_ASTRA_32
1296
  int64_t signature = (int64_t)(pRsp->info.ahandle);
97,798✔
1297
  int32_t transId = (int32_t)(signature >> 32);
97,798✔
1298
  int32_t action = (int32_t)((signature << 32) >> 32);
97,798✔
1299
#else
1300
  int32_t transId = (int32_t)(pRsp->info.ahandle);
1301
  int32_t action = (int32_t)(pRsp->info.ahandleEx);
1302
#endif
1303
  STrans *pTrans = mndAcquireTrans(pMnode, transId);
97,798✔
1304
  if (pTrans == NULL) {
97,798!
1305
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1306
    if (terrno != 0) code = terrno;
×
1307
    mError("trans:%d, failed to get transId from vnode rsp since %s", transId, tstrerror(code));
×
1308
    goto _OVER;
×
1309
  }
1310

1311
  SArray *pArray = NULL;
97,798✔
1312
  if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
97,798✔
1313
    pArray = pTrans->redoActions;
97,786✔
1314
  } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
12!
1315
    pArray = pTrans->undoActions;
12✔
1316
  } else {
1317
    mError("trans:%d, invalid trans stage:%d while recv action rsp", pTrans->id, pTrans->stage);
×
1318
    goto _OVER;
×
1319
  }
1320

1321
  if (pArray == NULL) {
97,798!
1322
    mError("trans:%d, invalid trans stage:%d", transId, pTrans->stage);
×
1323
    goto _OVER;
×
1324
  }
1325

1326
  int32_t actionNum = taosArrayGetSize(pArray);
97,798✔
1327
  if (action < 0 || action >= actionNum) {
97,798!
1328
    mError("trans:%d, invalid action:%d", transId, action);
×
1329
    goto _OVER;
×
1330
  }
1331

1332
  STransAction *pAction = taosArrayGet(pArray, action);
97,798✔
1333
  if (pAction != NULL) {
97,798!
1334
    pAction->msgReceived = 1;
97,798✔
1335
    pAction->errCode = pRsp->code;
97,798✔
1336
    pAction->endTime = taosGetTimestampMs();
97,798✔
1337

1338
    // pTrans->lastErrorNo = pRsp->code;
1339
    mndSetTransLastAction(pTrans, pAction);
97,798✔
1340

1341
    mInfo("trans:%d, %s:%d response is received, received code:0x%x(%s), accept:0x%x(%s) retry:0x%x(%s)", transId,
97,798!
1342
          mndTransStr(pAction->stage), action, pRsp->code, tstrerror(pRsp->code), pAction->acceptableCode,
1343
          tstrerror(pAction->acceptableCode), pAction->retryCode, tstrerror(pAction->retryCode));
1344
  } else {
1345
    mInfo("trans:%d, invalid action, index:%d, code:0x%x", transId, action, pRsp->code);
×
1346
  }
1347

1348
  mndTransExecute(pMnode, pTrans);
97,798✔
1349

1350
_OVER:
97,798✔
1351
  mndReleaseTrans(pMnode, pTrans);
97,798✔
1352
  TAOS_RETURN(code);
97,798✔
1353
}
1354

1355
static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
4,311✔
1356
  pAction->rawWritten = 0;
4,311✔
1357
  pAction->msgSent = 0;
4,311✔
1358
  pAction->msgReceived = 0;
4,311✔
1359
  if (pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR || pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR ||
4,311!
1360
      pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
4,311✔
1361
    pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
14✔
1362
    mInfo("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
14!
1363
          pAction->id, pAction->epSet.inUse);
1364
  } else {
1365
    mInfo("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
4,297!
1366
  }
1367
  pAction->errCode = 0;
4,311✔
1368
}
4,311✔
1369

1370
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
3✔
1371
  int32_t numOfActions = taosArrayGetSize(pArray);
3✔
1372

1373
  for (int32_t action = 0; action < numOfActions; ++action) {
15✔
1374
    STransAction *pAction = taosArrayGet(pArray, action);
12✔
1375
    if (pAction->msgSent && pAction->msgReceived &&
12!
1376
        (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode))
12!
1377
      continue;
9✔
1378
    if (pAction->rawWritten && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue;
3!
1379

1380
    mndTransResetAction(pMnode, pTrans, pAction);
3✔
1381
  }
1382
}
3✔
1383

1384
// execute in sync context
1385
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
431,910✔
1386
  if (pAction->rawWritten) return 0;
431,910✔
1387
  if (topHalf) {
239,098!
1388
    TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
×
1389
  }
1390

1391
  if (pAction->pRaw->type >= SDB_MAX) {
239,098!
1392
    pAction->rawWritten = true;
×
1393
    pAction->errCode = 0;
×
1394
    mndSetTransLastAction(pTrans, pAction);
×
1395
    mInfo("skip sdb raw type:%d since it is not supported", pAction->pRaw->type);
×
1396
    TAOS_RETURN(TSDB_CODE_SUCCESS);
×
1397
  }
1398

1399
  int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
239,098✔
1400
  if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
239,098!
1401
    pAction->rawWritten = true;
239,098✔
1402
    pAction->errCode = 0;
239,098✔
1403
    code = 0;
239,098✔
1404
    mInfo("trans:%d, %s:%d write to sdb, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
239,098!
1405
          sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
1406

1407
    mndSetTransLastAction(pTrans, pAction);
239,098✔
1408
  } else {
1409
    pAction->errCode = (terrno != 0) ? terrno : code;
×
1410
    mError("trans:%d, %s:%d failed to write sdb since %s, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage),
×
1411
           pAction->id, tstrerror(code), sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
1412
    mndSetTransLastAction(pTrans, pAction);
×
1413
  }
1414

1415
  TAOS_RETURN(code);
239,098✔
1416
}
1417

1418
// execute in trans context
1419
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
674,907✔
1420
  if (pAction->msgSent) return 0;
674,907✔
1421
  if (mndCannotExecuteTrans(pMnode, topHalf)) {
136,237✔
1422
    TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
38,385✔
1423
  }
1424

1425
#ifndef TD_ASTRA_32
1426
  int64_t signature = pTrans->id;
97,852✔
1427
  signature = (signature << 32);
97,852✔
1428
  signature += pAction->id;
97,852✔
1429

1430
  SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
97,852✔
1431
#else
1432
  SRpcMsg rpcMsg = {.msgType = pAction->msgType,
1433
                    .contLen = pAction->contLen,
1434
                    .info.ahandle = (void *)pTrans->id,
1435
                    .info.ahandleEx = (void *)pAction->id};
1436
#endif
1437
  rpcMsg.pCont = rpcMallocCont(pAction->contLen);
97,852✔
1438
  if (rpcMsg.pCont == NULL) {
97,852!
1439
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1440
    return -1;
×
1441
  }
1442
  rpcMsg.info.traceId.rootId = pTrans->mTraceId;
97,852✔
1443
  rpcMsg.info.notFreeAhandle = 1;
97,852✔
1444

1445
  memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
97,852✔
1446

1447
  char    detail[1024] = {0};
97,852✔
1448
  int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d", TMSG_INFO(pAction->msgType),
97,852!
1449
                          pAction->epSet.numOfEps, pAction->epSet.inUse);
97,852✔
1450
  for (int32_t i = 0; i < pAction->epSet.numOfEps; ++i) {
201,420✔
1451
    len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, pAction->epSet.eps[i].fqdn,
103,568✔
1452
                     pAction->epSet.eps[i].port);
103,568✔
1453
  }
1454

1455
  int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
97,852✔
1456
  if (code == 0) {
97,852✔
1457
    pAction->msgSent = 1;
97,847✔
1458
    // pAction->msgReceived = 0;
1459
    pAction->errCode = TSDB_CODE_ACTION_IN_PROGRESS;
97,847✔
1460
    pAction->startTime = taosGetTimestampMs();
97,847✔
1461
    pAction->endTime = 0;
97,847✔
1462
    mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
97,847!
1463

1464
    mndSetTransLastAction(pTrans, pAction);
97,847✔
1465
  } else {
1466
    pAction->msgSent = 0;
5✔
1467
    pAction->msgReceived = 0;
5✔
1468
    pAction->errCode = (terrno != 0) ? terrno : code;
5!
1469
    mError("trans:%d, %s:%d not send since %s, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr(),
5!
1470
           detail);
1471

1472
    mndSetTransLastAction(pTrans, pAction);
5✔
1473
  }
1474

1475
  TAOS_RETURN(code);
97,852✔
1476
}
1477

1478
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
×
1479
  if (!topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
×
1480
  pAction->rawWritten = 0;
×
1481
  pAction->errCode = 0;
×
1482
  mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
×
1483

1484
  mndSetTransLastAction(pTrans, pAction);
×
1485
  return 0;
×
1486
}
1487

1488
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
1,106,817✔
1489
  if (pAction->actionType == TRANS_ACTION_RAW) {
1,106,817✔
1490
    return mndTransWriteSingleLog(pMnode, pTrans, pAction, topHalf);
431,910✔
1491
  } else if (pAction->actionType == TRANS_ACTION_MSG) {
674,907!
1492
    return mndTransSendSingleMsg(pMnode, pTrans, pAction, topHalf);
674,907✔
1493
  } else {
1494
    return mndTransExecNullMsg(pMnode, pTrans, pAction, topHalf);
×
1495
  }
1496
}
1497

1498
static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) {
244,865✔
1499
  int32_t numOfActions = taosArrayGetSize(pArray);
244,865✔
1500
  int32_t code = 0;
244,865✔
1501

1502
  for (int32_t action = 0; action < numOfActions; ++action) {
1,259,917✔
1503
    STransAction *pAction = taosArrayGet(pArray, action);
1,046,170✔
1504
    code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
1,046,170✔
1505
    if (code != 0) {
1,046,170✔
1506
      mInfo("trans:%d, action:%d not executed since %s. numOfActions:%d", pTrans->id, action, tstrerror(code),
31,118!
1507
            numOfActions);
1508
      break;
31,118✔
1509
    }
1510
  }
1511

1512
  return code;
244,865✔
1513
}
1514

1515
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) {
282,796✔
1516
  int32_t numOfActions = taosArrayGetSize(pArray);
282,796✔
1517
  int32_t code = 0;
282,796✔
1518
  if (numOfActions == 0) return 0;
282,796✔
1519

1520
  if ((code = mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf)) != 0) {
244,865✔
1521
    return code;
31,118✔
1522
  }
1523

1524
  int32_t       numOfExecuted = 0;
213,747✔
1525
  int32_t       errCode = 0;
213,747✔
1526
  STransAction *pErrAction = NULL;
213,747✔
1527
  for (int32_t action = 0; action < numOfActions; ++action) {
1,228,799✔
1528
    STransAction *pAction = taosArrayGet(pArray, action);
1,015,052✔
1529
    if (pAction->msgReceived || pAction->rawWritten) {
1,015,052✔
1530
      numOfExecuted++;
709,339✔
1531
      if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
709,339✔
1532
        errCode = pAction->errCode;
28✔
1533
        pErrAction = pAction;
28✔
1534
      }
1535
    } else {
1536
      pErrAction = pAction;
305,713✔
1537
    }
1538
  }
1539

1540
  mndSetTransLastAction(pTrans, pErrAction);
213,747✔
1541

1542
  if (numOfExecuted == numOfActions) {
213,747✔
1543
    if (errCode == 0) {
124,813✔
1544
      mInfo("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
124,810!
1545
      return 0;
124,810✔
1546
    } else {
1547
      mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF);
3!
1548
      mndTransResetActions(pMnode, pTrans, pArray);
3✔
1549
      terrno = errCode;
3✔
1550
      return errCode;
3✔
1551
    }
1552
  } else {
1553
    mInfo("trans:%d, %d of %d actions executed", pTrans->id, numOfExecuted, numOfActions);
88,934!
1554

1555
    for (int32_t action = 0; action < numOfActions; ++action) {
614,169✔
1556
      STransAction *pAction = taosArrayGet(pArray, action);
525,235✔
1557
      mDebug("trans:%d, %s:%d Sent:%d, Received:%d, errCode:0x%x, acceptableCode:0x%x, retryCode:0x%x", pTrans->id,
525,235✔
1558
             mndTransStr(pAction->stage), pAction->id, pAction->msgSent, pAction->msgReceived, pAction->errCode,
1559
             pAction->acceptableCode, pAction->retryCode);
1560
      if (pAction->msgSent) {
525,235!
1561
        if (pAction->msgReceived) {
525,235✔
1562
          if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
219,522✔
1563
            mndTransResetAction(pMnode, pTrans, pAction);
25✔
1564
            mInfo("trans:%d, %s:%d reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
25!
1565
          }
1566
        }
1567
      }
1568
    }
1569
    return TSDB_CODE_ACTION_IN_PROGRESS;
88,934✔
1570
  }
1571
}
1572

1573
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
183,047✔
1574
  int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
183,047✔
1575
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
183,047✔
1576
    mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, in %s", pTrans->id, terrstr(), terrno,
8!
1577
           mndStrExecutionContext(topHalf));
1578
  }
1579
  return code;
183,047✔
1580
}
1581

1582
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
21✔
1583
  int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
21✔
1584
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
21!
UNCOV
1585
    mError("trans:%d, failed to execute undoActions since %s. in %s", pTrans->id, terrstr(),
×
1586
           mndStrExecutionContext(topHalf));
1587
  }
1588
  return code;
21✔
1589
}
1590

1591
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
99,728✔
1592
  int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
99,728✔
1593
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
99,728!
1594
    mError("trans:%d, failed to execute commitActions since %s. in %s", pTrans->id, terrstr(),
×
1595
           mndStrExecutionContext(topHalf));
1596
  }
1597
  return code;
99,728✔
1598
}
1599

1600
static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArray *pActions, bool topHalf) {
28,478✔
1601
  int32_t code = 0;
28,478✔
1602
  int32_t numOfActions = taosArrayGetSize(pActions);
28,478✔
1603
  if (numOfActions == 0) return code;
28,478✔
1604

1605
  if (pTrans->actionPos >= numOfActions) {
28,476✔
1606
    return code;
3,069✔
1607
  }
1608

1609
  mInfo("trans:%d, execute %d actions serial, begin at action:%d, stage:%s", pTrans->id, numOfActions,
25,407!
1610
        pTrans->actionPos, mndTransStr(pTrans->stage));
1611

1612
  for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
37,726✔
1613
    STransAction *pAction = taosArrayGet(pActions, action);
34,972✔
1614

1615
    mInfo("trans:%d, current action:%d, stage:%s, actionType(1:msg,2:log):%d, msgSent:%d, msgReceived:%d", 
34,972!
1616
          pTrans->id, pTrans->actionPos, mndTransStr(pAction->stage), pAction->actionType, pAction->msgSent,
1617
          pAction->msgReceived);
1618

1619
    code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
34,972✔
1620
    if (code == 0) {
34,972✔
1621
      if (pAction->msgSent) {
27,700✔
1622
        if (pAction->msgReceived) {
24,362✔
1623
          if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
10,825✔
1624
            code = pAction->errCode;
4,283✔
1625
            mndTransResetAction(pMnode, pTrans, pAction);
4,283✔
1626
          } else {
1627
            mInfo("trans:%d, %s:%d execute successfully", pTrans->id, mndTransStr(pAction->stage), action);
6,542!
1628
          }
1629
        } else {
1630
          code = TSDB_CODE_ACTION_IN_PROGRESS;
13,537✔
1631
        }
1632
      } else if (pAction->rawWritten) {
3,338!
1633
        if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
3,338!
1634
          code = pAction->errCode;
×
1635
        } else {
1636
          mInfo("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action);
3,338!
1637
        }
1638
      } else {
1639
      }
1640
    }
1641

1642
    if (code == 0) {
34,972✔
1643
      pTrans->failedTimes = 0;
9,880✔
1644
    }
1645
    mndSetTransLastAction(pTrans, pAction);
34,972✔
1646

1647
    if (mndCannotExecuteTrans(pMnode, topHalf)) {
34,972✔
1648
      pTrans->lastErrorNo = code;
9,099✔
1649
      pTrans->code = code;
9,099✔
1650
      mInfo("trans:%d, %s:%d, cannot execute next action in %s, code:%s", pTrans->id, mndTransStr(pAction->stage),
9,099!
1651
            action, mndStrExecutionContext(topHalf), tstrerror(code));
1652
      break;
9,099✔
1653
    }
1654

1655
    if (code == 0) {
25,873✔
1656
      pTrans->code = 0;
8,053✔
1657
      pTrans->actionPos++;
8,053✔
1658
      mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
8,053!
1659
            pAction->id);
1660
      (void)taosThreadMutexUnlock(&pTrans->mutex);
8,053✔
1661
      code = mndTransSync(pMnode, pTrans);
8,053✔
1662
      (void)taosThreadMutexLock(&pTrans->mutex);
8,053✔
1663
      if (code != 0) {
8,053!
1664
        pTrans->actionPos--;
×
1665
        pTrans->code = terrno;
×
1666
        mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id,
×
1667
               mndTransStr(pAction->stage), pAction->id, terrstr());
1668
        break;
×
1669
      }
1670
    } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
17,820✔
1671
      mInfo("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
13,537!
1672
      break;
13,537✔
1673
    } else if (code == pAction->retryCode || code == TSDB_CODE_SYN_PROPOSE_NOT_READY ||
4,283!
1674
               code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_SYN_NOT_LEADER) {
30✔
1675
      mInfo("trans:%d, %s:%d receive code:0x%x(%s) and retry", pTrans->id, mndTransStr(pAction->stage), pAction->id,
4,266!
1676
            code, tstrerror(code));
1677
      pTrans->lastErrorNo = code;
4,266✔
1678
      taosMsleep(300);
4,266✔
1679
      action--;
4,266✔
1680
      continue;
4,266✔
1681
    } else {
1682
      terrno = code;
17✔
1683
      pTrans->lastErrorNo = code;
17✔
1684
      pTrans->code = code;
17✔
1685
      mInfo("trans:%d, %s:%d receive code:0x%x(%s) and wait another schedule, failedTimes:%d", pTrans->id,
17!
1686
            mndTransStr(pAction->stage), pAction->id, code, tstrerror(code), pTrans->failedTimes);
1687
      break;
17✔
1688
    }
1689
  }
1690

1691
  return code;
25,407✔
1692
}
1693

1694
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
28,478✔
1695
  int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
28,478✔
1696
  (void)taosThreadMutexLock(&pTrans->mutex);
28,478✔
1697
  if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
28,478!
1698
    code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->redoActions, topHalf);
28,478✔
1699
  }
1700
  (void)taosThreadMutexUnlock(&pTrans->mutex);
28,478✔
1701
  return code;
28,478✔
1702
}
1703

1704
static int32_t mndTransExecuteUndoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
×
1705
  int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1706
  (void)taosThreadMutexLock(&pTrans->mutex);
×
1707
  if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
×
1708
    code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->undoActions, topHalf);
×
1709
  }
1710
  (void)taosThreadMutexUnlock(&pTrans->mutex);
×
1711
  return code;
×
1712
}
1713

1714
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
54,369✔
1715
  bool    continueExec = true;
54,369✔
1716
  int32_t code = 0;
54,369✔
1717
  terrno = 0;
54,369✔
1718

1719
  int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
54,369✔
1720
  if (numOfActions == 0) goto _OVER;
54,369✔
1721

1722
  mInfo("trans:%d, execute %d prepare actions.", pTrans->id, numOfActions);
16,209!
1723

1724
  for (int32_t action = 0; action < numOfActions; ++action) {
41,884✔
1725
    STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
25,675✔
1726
    code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
25,675✔
1727
    if (code != 0) {
25,675!
1728
      terrno = code;
×
1729
      mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d, since %s", pTrans->id, action,
×
1730
             numOfActions, tstrerror(code));
1731
      return false;
×
1732
    }
1733
  }
1734

1735
_OVER:
16,209✔
1736
  pTrans->stage = TRN_STAGE_REDO_ACTION;
54,369✔
1737
  mInfo("trans:%d, stage from prepare to redoAction", pTrans->id);
54,369!
1738
  return continueExec;
54,369✔
1739
}
1740

1741
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
211,525✔
1742
  bool    continueExec = true;
211,525✔
1743
  int32_t code = 0;
211,525✔
1744
  terrno = 0;
211,525✔
1745

1746
  if (pTrans->exec == TRN_EXEC_SERIAL) {
211,525✔
1747
    code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf);
28,478✔
1748
  } else {
1749
    code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
183,047✔
1750
  }
1751

1752
  if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH && mndTransIsInSyncContext(topHalf)) {
211,525!
1753
    pTrans->lastErrorNo = code;
×
1754
    pTrans->code = code;
×
1755
    mInfo(
×
1756
        "trans:%d, failed to execute, will retry redo action stage in 100 ms , in %s, "
1757
        "continueExec:%d, code:%s",
1758
        pTrans->id, mndStrExecutionContext(topHalf), continueExec, tstrerror(code));
1759
    taosMsleep(100);
×
1760
    return true;
×
1761
  } else {
1762
    if (mndCannotExecuteTrans(pMnode, topHalf)) {
211,525✔
1763
      mInfo("trans:%d, cannot continue to execute redo action stage in %s, continueExec:%d, code:%s", pTrans->id,
63,611!
1764
            mndStrExecutionContext(topHalf), continueExec, tstrerror(code));
1765
      return false;
63,611✔
1766
    }
1767
  }
1768
  terrno = code;
147,914✔
1769

1770
  if (code == 0) {
147,914✔
1771
    pTrans->code = 0;
45,431✔
1772
    pTrans->stage = TRN_STAGE_COMMIT;
45,431✔
1773
    mInfo("trans:%d, stage from redoAction to commit", pTrans->id);
45,431!
1774
    continueExec = true;
45,431✔
1775
  } else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) {
102,483!
1776
    mInfo("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
102,458!
1777
    continueExec = false;
102,458✔
1778
  } else {
1779
    pTrans->failedTimes++;
25✔
1780
    pTrans->code = terrno;
25✔
1781
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
25✔
1782
      if (pTrans->lastAction != 0) {
3!
1783
        STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction);
3✔
1784
        if (pAction->retryCode != 0 && pAction->retryCode == pAction->errCode) {
3!
1785
          if (pTrans->failedTimes < 6) {
×
1786
            mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id,
×
1787
                   pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes);
1788
            taosMsleep(1000);
×
1789
            continueExec = true;
×
1790
            return true;
×
1791
          }
1792
        }
1793
      }
1794

1795
      pTrans->stage = TRN_STAGE_ROLLBACK;
3✔
1796
      pTrans->actionPos = 0;
3✔
1797
      mError("trans:%d, stage from redoAction to rollback since %s, and set actionPos to %d", pTrans->id, terrstr(),
3!
1798
             pTrans->actionPos);
1799
      continueExec = true;
3✔
1800
    } else {
1801
      mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
22!
1802
      continueExec = false;
22✔
1803
    }
1804
  }
1805

1806
  return continueExec;
147,914✔
1807
}
1808

1809
// execute in trans context
1810
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
45,431✔
1811
  if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
45,431!
1812

1813
  bool    continueExec = true;
45,431✔
1814
  int32_t code = mndTransCommit(pMnode, pTrans);
45,431✔
1815

1816
  if (code == 0) {
45,431✔
1817
    pTrans->code = 0;
45,421✔
1818
    pTrans->stage = TRN_STAGE_COMMIT_ACTION;
45,421✔
1819
    mInfo("trans:%d, stage from commit to commitAction", pTrans->id);
45,421!
1820
    continueExec = true;
45,421✔
1821
  } else {
1822
    pTrans->code = terrno;
10✔
1823
    pTrans->failedTimes++;
10✔
1824
    mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
10!
1825
    continueExec = false;
10✔
1826
  }
1827

1828
  return continueExec;
45,431✔
1829
}
1830

1831
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
99,728✔
1832
  bool    continueExec = true;
99,728✔
1833
  int32_t code = mndTransExecuteCommitActions(pMnode, pTrans, topHalf);
99,728✔
1834

1835
  if (code == 0) {
99,728!
1836
    pTrans->code = 0;
99,728✔
1837
    pTrans->stage = TRN_STAGE_FINISH;  // TRN_STAGE_PRE_FINISH is not necessary
99,728✔
1838
    mInfo("trans:%d, stage from commitAction to finished", pTrans->id);
99,728!
1839
    continueExec = true;
99,728✔
1840
  } else if (code == TSDB_CODE_MND_TRANS_CTX_SWITCH && topHalf) {
×
1841
    pTrans->code = 0;
×
1842
    pTrans->stage = TRN_STAGE_COMMIT;
×
1843
    mInfo("trans:%d, back to commit stage", pTrans->id);
×
1844
    continueExec = true;
×
1845
  } else {
1846
    pTrans->code = terrno;
×
1847
    pTrans->failedTimes++;
×
1848
    mError("trans:%d, stage keep on commitAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
×
1849
    continueExec = false;
×
1850
  }
1851

1852
  return continueExec;
99,728✔
1853
}
1854

1855
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
21✔
1856
  bool    continueExec = true;
21✔
1857
  int32_t code = 0;
21✔
1858

1859
  if (pTrans->exec == TRN_EXEC_SERIAL) {
21!
1860
    code = mndTransExecuteUndoActionsSerial(pMnode, pTrans, topHalf);
×
1861
  } else {
1862
    code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
21✔
1863
  }
1864

1865
  if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
21✔
1866
  terrno = code;
16✔
1867

1868
  if (code == 0) {
16✔
1869
    pTrans->stage = TRN_STAGE_PRE_FINISH;
3✔
1870
    mInfo("trans:%d, stage from undoAction to pre-finish", pTrans->id);
3!
1871
    continueExec = true;
3✔
1872
  } else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) {
13!
1873
    mInfo("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
13!
1874
    continueExec = false;
13✔
1875
  } else {
UNCOV
1876
    pTrans->failedTimes++;
×
UNCOV
1877
    mError("trans:%d, stage keep on undoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
×
UNCOV
1878
    continueExec = false;
×
1879
  }
1880

1881
  return continueExec;
16✔
1882
}
1883

1884
// in trans context
1885
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
3✔
1886
  if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
3!
1887

1888
  bool    continueExec = true;
3✔
1889
  int32_t code = mndTransRollback(pMnode, pTrans);
3✔
1890

1891
  if (code == 0) {
3!
1892
    pTrans->stage = TRN_STAGE_UNDO_ACTION;
3✔
1893
    continueExec = true;
3✔
1894
  } else {
UNCOV
1895
    pTrans->failedTimes++;
×
UNCOV
1896
    mError("trans:%d, stage keep on rollback since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
×
UNCOV
1897
    continueExec = false;
×
1898
  }
1899

1900
  return continueExec;
3✔
1901
}
1902

1903
// excute in trans context
1904
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
3✔
1905
  if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
3!
1906

1907
  bool    continueExec = true;
3✔
1908
  int32_t code = mndTransPreFinish(pMnode, pTrans);
3✔
1909

1910
  if (code == 0) {
3!
1911
    pTrans->stage = TRN_STAGE_FINISH;
3✔
1912
    mInfo("trans:%d, stage from pre-finish to finish", pTrans->id);
3!
1913
    continueExec = true;
3✔
1914
  } else {
1915
    pTrans->failedTimes++;
×
1916
    mError("trans:%d, stage keep on pre-finish since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
×
1917
    continueExec = false;
×
1918
  }
1919

1920
  return continueExec;
3✔
1921
}
1922

1923
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
99,736✔
1924
  bool continueExec = false;
99,736✔
1925
  if (topHalf) return continueExec;
99,736✔
1926

1927
  SSdbRaw *pRaw = mndTransEncode(pTrans);
54,312✔
1928
  if (pRaw == NULL) {
54,312!
1929
    mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr());
×
1930
    return false;
×
1931
  }
1932
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
54,312!
1933

1934
  int32_t code = sdbWrite(pMnode->pSdb, pRaw);
54,312✔
1935
  if (code != 0) {
54,312!
1936
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
×
1937
  }
1938

1939
  mInfo("trans:%d, execute finished, code:0x%x, failedTimes:%d createTime:%" PRId64, pTrans->id, pTrans->code,
54,312!
1940
        pTrans->failedTimes, pTrans->createdTime);
1941
  return continueExec;
54,312✔
1942
}
1943

1944
void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
265,855✔
1945
  bool continueExec = true;
265,855✔
1946

1947
  while (continueExec) {
722,302✔
1948
    mInfo("trans:%d, continue to execute stage:%s in %s, createTime:%" PRId64 "", pTrans->id,
456,447!
1949
          mndTransStr(pTrans->stage), mndStrExecutionContext(topHalf), pTrans->createdTime);
1950
    pTrans->lastExecTime = taosGetTimestampMs();
456,447✔
1951
    switch (pTrans->stage) {
456,447!
1952
      case TRN_STAGE_PREPARE:
×
1953
        continueExec = mndTransPerformPrepareStage(pMnode, pTrans, topHalf);
×
1954
        break;
×
1955
      case TRN_STAGE_REDO_ACTION:
211,525✔
1956
        continueExec = mndTransPerformRedoActionStage(pMnode, pTrans, topHalf);
211,525✔
1957
        break;
211,525✔
1958
      case TRN_STAGE_COMMIT:
45,431✔
1959
        continueExec = mndTransPerformCommitStage(pMnode, pTrans, topHalf);
45,431✔
1960
        break;
45,431✔
1961
      case TRN_STAGE_COMMIT_ACTION:
99,728✔
1962
        continueExec = mndTransPerformCommitActionStage(pMnode, pTrans, topHalf);
99,728✔
1963
        break;
99,728✔
1964
      case TRN_STAGE_ROLLBACK:
3✔
1965
        continueExec = mndTransPerformRollbackStage(pMnode, pTrans, topHalf);
3✔
1966
        break;
3✔
1967
      case TRN_STAGE_UNDO_ACTION:
21✔
1968
        continueExec = mndTransPerformUndoActionStage(pMnode, pTrans, topHalf);
21✔
1969
        break;
21✔
1970
      case TRN_STAGE_PRE_FINISH:
3✔
1971
        continueExec = mndTransPerformPreFinishStage(pMnode, pTrans, topHalf);
3✔
1972
        break;
3✔
1973
      case TRN_STAGE_FINISH:
99,736✔
1974
        continueExec = mndTransPerformFinishStage(pMnode, pTrans, topHalf);
99,736✔
1975
        break;
99,736✔
1976
      default:
×
1977
        continueExec = false;
×
1978
        break;
×
1979
    }
1980
  }
1981

1982
  mndTransSendRpcRsp(pMnode, pTrans);
265,855✔
1983
}
265,855✔
1984

1985
// start trans, pullup, receive rsp, kill
1986
void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
148,141✔
1987
  bool topHalf = true;
148,141✔
1988
  mndTransExecuteImp(pMnode, pTrans, topHalf);
148,141✔
1989
}
148,141✔
1990

1991
// update trans
1992
void mndTransRefresh(SMnode *pMnode, STrans *pTrans) {
117,714✔
1993
  bool topHalf = false;
117,714✔
1994
  mndTransExecuteImp(pMnode, pTrans, topHalf);
117,714✔
1995
}
117,714✔
1996

1997
static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
34,888✔
1998
  mTrace("start to process trans timer");
34,888✔
1999
  mndTransPullup(pReq->info.node);
34,888✔
2000
  return 0;
34,888✔
2001
}
2002

2003
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
×
2004
  SArray *pArray = NULL;
×
2005
  if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
×
2006
    pArray = pTrans->redoActions;
×
2007
  } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
×
2008
    pArray = pTrans->undoActions;
×
2009
  } else {
2010
    TAOS_RETURN(TSDB_CODE_MND_TRANS_INVALID_STAGE);
×
2011
  }
2012

2013
  if(pTrans->ableToBeKilled == false){
×
2014
    return TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED;
×
2015
  }
2016
  
2017
  if(pTrans->killMode == TRN_KILL_MODE_SKIP){
×
2018
    for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
×
2019
      STransAction *pAction = taosArrayGet(pArray, i);
×
2020
      mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
×
2021
            mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
2022
      pAction->msgSent = 1;
×
2023
      pAction->msgReceived = 1;
×
2024
      pAction->errCode = 0;
×
2025
    }
2026
  }
2027
  else if(pTrans->killMode == TRN_KILL_MODE_INTERUPT){
×
2028
    pTrans->stage = TRN_STAGE_PRE_FINISH;
×
2029
  }
2030
  else{
2031
    return TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED;
×
2032
  }
2033

2034
  mndTransExecute(pMnode, pTrans);
×
2035
  return 0;
×
2036
}
2037

2038
static int32_t mndProcessKillTransReq(SRpcMsg *pReq) {
1✔
2039
  SMnode       *pMnode = pReq->info.node;
1✔
2040
  SKillTransReq killReq = {0};
1✔
2041
  int32_t       code = -1;
1✔
2042
  STrans       *pTrans = NULL;
1✔
2043

2044
  if (tDeserializeSKillTransReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
1!
2045
    code = TSDB_CODE_INVALID_MSG;
×
2046
    goto _OVER;
×
2047
  }
2048

2049
  mInfo("trans:%d, start to kill", killReq.transId);
1!
2050
  if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS)) != 0) {
1!
2051
    goto _OVER;
1✔
2052
  }
2053

2054
  pTrans = mndAcquireTrans(pMnode, killReq.transId);
×
2055
  if (pTrans == NULL) {
×
2056
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
2057
    if (terrno != 0) code = terrno;
×
2058
    goto _OVER;
×
2059
  }
2060

2061
  code = mndKillTrans(pMnode, pTrans);
×
2062

2063
_OVER:
1✔
2064
  if (code != 0) {
1!
2065
    mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
1!
2066
  }
2067

2068
  mndReleaseTrans(pMnode, pTrans);
1✔
2069
  TAOS_RETURN(code);
1✔
2070
}
2071

2072
static int32_t mndCompareTransId(int32_t *pTransId1, int32_t *pTransId2) { return *pTransId1 >= *pTransId2 ? 1 : 0; }
250✔
2073

2074
void mndTransPullup(SMnode *pMnode) {
35,373✔
2075
  SSdb   *pSdb = pMnode->pSdb;
35,373✔
2076
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_TRANS), sizeof(int32_t));
35,373✔
2077
  if (pArray == NULL) return;
35,373!
2078

2079
  void *pIter = NULL;
35,373✔
2080
  while (1) {
4,923✔
2081
    STrans *pTrans = NULL;
40,296✔
2082
    pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
40,296✔
2083
    if (pIter == NULL) break;
40,296✔
2084
    if (taosArrayPush(pArray, &pTrans->id) == NULL) {
9,846!
2085
      mError("failed to put trans into array, trans:%d, but pull up will continute", pTrans->id);
×
2086
    }
2087
    sdbRelease(pSdb, pTrans);
4,923✔
2088
  }
2089

2090
  taosArraySort(pArray, (__compar_fn_t)mndCompareTransId);
35,373✔
2091

2092
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
40,296✔
2093
    int32_t *pTransId = taosArrayGet(pArray, i);
4,923✔
2094
    STrans  *pTrans = mndAcquireTrans(pMnode, *pTransId);
4,923✔
2095
    if (pTrans != NULL) {
4,923!
2096
      mndTransExecute(pMnode, pTrans);
4,923✔
2097
    }
2098
    mndReleaseTrans(pMnode, pTrans);
4,923✔
2099
  }
2100
  taosArrayDestroy(pArray);
35,373✔
2101
}
2102

2103
static char *formatTimestamp(char *buf, int64_t val, int precision) {
11,955✔
2104
  time_t tt;
2105
  if (precision == TSDB_TIME_PRECISION_MICRO) {
11,955!
2106
    tt = (time_t)(val / 1000000);
×
2107
  }
2108
  if (precision == TSDB_TIME_PRECISION_NANO) {
11,955!
2109
    tt = (time_t)(val / 1000000000);
×
2110
  } else {
2111
    tt = (time_t)(val / 1000);
11,955✔
2112
  }
2113

2114
  struct tm tm;
2115
  if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
11,955!
2116
    mError("failed to get local time");
×
2117
    return NULL;
×
2118
  }
2119
  size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
11,956✔
2120

2121
  if (precision == TSDB_TIME_PRECISION_MICRO) {
11,956!
2122
    sprintf(buf + pos, ".%06d", (int)(val % 1000000));
×
2123
  } else if (precision == TSDB_TIME_PRECISION_NANO) {
11,956!
2124
    sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
×
2125
  } else {
2126
    sprintf(buf + pos, ".%03d", (int)(val % 1000));
11,956✔
2127
  }
2128

2129
  return buf;
11,956✔
2130
}
2131

2132
static void mndTransLogAction(STrans *pTrans) {
176✔
2133
  char    detail[512] = {0};
176✔
2134
  int32_t len = 0;
176✔
2135
  int32_t index = 0;
176✔
2136

2137
  if (pTrans->stage == TRN_STAGE_PREPARE) {
176!
2138
    for (int32_t i = 0; i < taosArrayGetSize(pTrans->prepareActions); ++i, ++index) {
×
2139
      len = 0;
×
2140
      STransAction *pAction = taosArrayGet(pTrans->prepareActions, i);
×
2141
      len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index,
×
2142
                      mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type),
×
2143
                      sdbStatusName(pAction->pRaw->status));
×
2144
      mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail);
×
2145
    }
2146
  }
2147

2148
  if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
176!
2149
    for (int32_t i = 0; i < taosArrayGetSize(pTrans->redoActions); ++i, ++index) {
7,565✔
2150
      len = 0;
7,388✔
2151
      STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
7,388✔
2152
      if (pAction->actionType == TRANS_ACTION_MSG) {
7,387✔
2153
        char bufStart[40] = {0};
5,977✔
2154
        (void)formatTimestamp(bufStart, pAction->startTime, TSDB_TIME_PRECISION_MILLI);
5,977✔
2155

2156
        char endStart[40] = {0};
5,978✔
2157
        (void)formatTimestamp(endStart, pAction->endTime, TSDB_TIME_PRECISION_MILLI);
5,978✔
2158
        len += snprintf(detail + len, sizeof(detail) - len,
11,956!
2159
                        "action:%d, %s:%d msgType:%s,"
2160
                        "sent:%d, received:%d, startTime:%s, endTime:%s, ",
2161
                        index, mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType), pAction->msgSent,
5,978✔
2162
                        pAction->msgReceived, bufStart, endStart);
5,978✔
2163

2164
        SEpSet epset = pAction->epSet;
5,978✔
2165
        if (epset.numOfEps > 0) {
5,978!
2166
          len += snprintf(detail + len, sizeof(detail) - len, "numOfEps:%d inUse:%d ", epset.numOfEps, epset.inUse);
5,978✔
2167
          for (int32_t i = 0; i < epset.numOfEps; ++i) {
13,862✔
2168
            len +=
7,884✔
2169
                snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
7,884✔
2170
          }
2171
        }
2172

2173
        len += snprintf(detail + len, sizeof(detail) - len, ", errCode:0x%x(%s)\n", pAction->errCode & 0xFFFF,
5,978✔
2174
                        tstrerror(pAction->errCode));
2175
      } else {
2176
        len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s, written:%d\n",
1,410✔
2177
                        index, mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type),
1,410✔
2178
                        sdbStatusName(pAction->pRaw->status), pAction->rawWritten);
1,410✔
2179
      }
2180
      mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail);
7,388✔
2181
    }
2182
  }
2183

2184
  if (pTrans->stage == TRN_STAGE_COMMIT_ACTION) {
176!
UNCOV
2185
    for (int32_t i = 0; i < taosArrayGetSize(pTrans->commitActions); ++i, ++index) {
×
UNCOV
2186
      len = 0;
×
UNCOV
2187
      STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
×
UNCOV
2188
      len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index,
×
UNCOV
2189
                      mndTransStr(pAction->stage), i, sdbTableName(pAction->pRaw->type),
×
UNCOV
2190
                      sdbStatusName(pAction->pRaw->status));
×
UNCOV
2191
      mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail);
×
2192
    }
2193

UNCOV
2194
    for (int32_t i = 0; i < taosArrayGetSize(pTrans->undoActions); ++i, ++index) {
×
2195
      len = 0;
×
2196
      STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
×
2197
      if (pAction->actionType == TRANS_ACTION_MSG) {
×
2198
        len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d msgType:%s\n", index,
×
2199
                        mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType));
×
2200
        ;
2201
      } else {
2202
        len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index,
×
2203
                        mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type),
×
2204
                        sdbStatusName(pAction->pRaw->status));
×
2205
      }
2206
      mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail);
×
2207
    }
2208
  }
2209
}
176✔
2210

2211
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
7,466✔
2212
  SMnode *pMnode = pReq->info.node;
7,466✔
2213
  SSdb   *pSdb = pMnode->pSdb;
7,466✔
2214
  int32_t numOfRows = 0;
7,466✔
2215
  STrans *pTrans = NULL;
7,466✔
2216
  int32_t cols = 0;
7,466✔
2217
  int32_t code = 0;
7,466✔
2218
  int32_t lino = 0;
7,466✔
2219

2220
  while (numOfRows < rows) {
7,643!
2221
    pShow->pIter = sdbFetch(pSdb, SDB_TRANS, pShow->pIter, (void **)&pTrans);
7,643✔
2222
    if (pShow->pIter == NULL) break;
7,649✔
2223

2224
    cols = 0;
177✔
2225

2226
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
177✔
2227
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->id, false), pTrans, &lino, _OVER);
177!
2228

2229
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
177✔
2230
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->createdTime, false), pTrans, &lino,
177!
2231
                        _OVER);
2232

2233
    char stage[TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE] = {0};
177✔
2234
    STR_WITH_MAXSIZE_TO_VARSTR(stage, mndTransStr(pTrans->stage), pShow->pMeta->pSchemas[cols].bytes);
177✔
2235
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
177✔
2236
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stage, false), pTrans, &lino, _OVER);
176!
2237

2238
    char opername[TSDB_TRANS_OPER_LEN + VARSTR_HEADER_SIZE] = {0};
176✔
2239
    STR_WITH_MAXSIZE_TO_VARSTR(opername, pTrans->opername, pShow->pMeta->pSchemas[cols].bytes);
176✔
2240
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
176✔
2241
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)opername, false), pTrans, &lino, _OVER);
176!
2242

2243
    char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
177✔
2244
    STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pTrans->dbname), pShow->pMeta->pSchemas[cols].bytes);
177✔
2245
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
176✔
2246
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false), pTrans, &lino, _OVER);
177!
2247

2248
    char stbname[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
177✔
2249
    STR_WITH_MAXSIZE_TO_VARSTR(stbname, mndGetDbStr(pTrans->stbname), pShow->pMeta->pSchemas[cols].bytes);
177✔
2250
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
177✔
2251
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbname, false), pTrans, &lino, _OVER);
176!
2252

2253
    const char *killableStr = pTrans->ableToBeKilled ? "yes" : "no";
177!
2254
    char        killableVstr[10 + VARSTR_HEADER_SIZE] = {0};
177✔
2255
    STR_WITH_MAXSIZE_TO_VARSTR(killableVstr, killableStr, 10 + VARSTR_HEADER_SIZE);
177✔
2256
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
177✔
2257
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)killableVstr, false), pTrans, &lino, _OVER);
177!
2258

2259
    /*
2260
    const char *killModeStr = pTrans->killMode == TRN_KILL_MODE_SKIP ? "skip" : "interrupt";
2261
    char        killModeVstr[10 + VARSTR_HEADER_SIZE] = {0};
2262
    STR_WITH_MAXSIZE_TO_VARSTR(killModeVstr, killModeStr, 24);
2263
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
2264
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)killModeVstr, false), pTrans, &lino, _OVER);
2265
    */
2266

2267
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
177✔
2268
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false), pTrans, &lino,
177!
2269
                        _OVER);
2270

2271
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
177✔
2272
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false), pTrans, &lino,
177!
2273
                        _OVER);
2274

2275
    char    lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
177✔
2276
    char    detail[TSDB_TRANS_ERROR_LEN + 1] = {0};
177✔
2277
    int32_t len = tsnprintf(detail, sizeof(detail), "action:%d code:0x%x(%s) ", pTrans->lastAction,
530✔
2278
                            pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo));
177✔
2279
    SEpSet  epset = pTrans->lastEpset;
176✔
2280
    if (epset.numOfEps > 0) {
176✔
2281
      len += tsnprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ",
320!
2282
                       TMSG_INFO(pTrans->lastMsgType), epset.numOfEps, epset.inUse);
320✔
2283
      for (int32_t i = 0; i < pTrans->lastEpset.numOfEps; ++i) {
440✔
2284
        len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
280✔
2285
      }
2286
    }
2287
    STR_WITH_MAXSIZE_TO_VARSTR(lastInfo, detail, pShow->pMeta->pSchemas[cols].bytes);
176✔
2288
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
176✔
2289
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)lastInfo, false), pTrans, &lino, _OVER);
177!
2290

2291
    mndTransLogAction(pTrans);
176✔
2292

2293
    numOfRows++;
177✔
2294
    sdbRelease(pSdb, pTrans);
177✔
2295
  }
2296

2297
_OVER:
×
2298
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
7,472!
2299
  pShow->numOfRows += numOfRows;
7,469✔
2300
  return numOfRows;
7,469✔
2301
}
2302

2303
static int32_t mndShowTransCommonColumns(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction,
×
2304
                                         int32_t transactionId, int32_t curActionId, int32_t numOfRows, int32_t *cols) {
2305
  int32_t code = 0;
×
2306
  int32_t lino = 0;
×
2307
  int32_t len = 0;
×
2308

2309
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, (*cols)++);
×
2310
  TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&transactionId, false), &lino, _OVER);
×
2311

2312
  char action[30 + 1] = {0};
×
2313
  if (curActionId == pAction->id) {
×
2314
    len += snprintf(action + len, sizeof(action) - len, "%s:%d(%s)<-last", mndTransStr(pAction->stage), pAction->id,
×
2315
                    mndTransTypeStr(pAction->actionType));
2316
  } else {
2317
    len += snprintf(action + len, sizeof(action) - len, "%s:%d(%s)", mndTransStr(pAction->stage), pAction->id,
×
2318
                    mndTransTypeStr(pAction->actionType));
2319
  }
2320
  char actionVStr[30 + VARSTR_HEADER_SIZE] = {0};
×
2321
  STR_WITH_MAXSIZE_TO_VARSTR(actionVStr, action, pShow->pMeta->pSchemas[*cols].bytes);
×
2322
  pColInfo = taosArrayGet(pBlock->pDataBlock, (*cols)++);
×
2323
  TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)actionVStr, false), &lino, _OVER);
×
2324
_OVER:
×
2325
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
×
2326
  return code;
×
2327
}
2328

2329
static void mndShowTransAction(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, int32_t transactionId,
×
2330
                               int32_t curActionId, int32_t rows, int32_t numOfRows) {
2331
  int32_t code = 0;
×
2332
  int32_t lino = 0;
×
2333
  int32_t len = 0;
×
2334
  int32_t cols = 0;
×
2335

2336
  cols = 0;
×
2337

2338
  if (mndShowTransCommonColumns(pShow, pBlock, pAction, transactionId, curActionId, numOfRows, &cols) != 0) return;
×
2339

2340
  if (pAction->actionType == TRANS_ACTION_MSG) {
×
2341
    int32_t len = 0;
×
2342

2343
    char objType[TSDB_TRANS_OBJTYPE_LEN + 1] = {0};
×
2344
    len += snprintf(objType + len, sizeof(objType) - len, "%s(s:%d,r:%d)", TMSG_INFO(pAction->msgType),
×
2345
                    pAction->msgSent, pAction->msgReceived);
×
2346
    char objTypeVStr[TSDB_TRANS_OBJTYPE_LEN + VARSTR_HEADER_SIZE] = {0};
×
2347
    STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes);
×
2348
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2349
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER);
×
2350

2351
    char result[TSDB_TRANS_RESULT_LEN + 1] = {0};
×
2352
    len = 0;
×
2353
    len += snprintf(result + len, sizeof(result) - len, "errCode:0x%x(%s)", pAction->errCode & 0xFFFF,
×
2354
                    tstrerror(pAction->errCode));
2355
    char resultVStr[TSDB_TRANS_RESULT_LEN + VARSTR_HEADER_SIZE] = {0};
×
2356
    STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes);
×
2357
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2358
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER);
×
2359

2360
    char target[TSDB_TRANS_TARGET_LEN] = {0};
×
2361
    len = 0;
×
2362
    SEpSet epset = pAction->epSet;
×
2363
    if (epset.numOfEps > 0) {
×
2364
      for (int32_t i = 0; i < epset.numOfEps; ++i) {
×
2365
        len += snprintf(target + len, sizeof(target) - len, "ep:%d-%s:%u,", i, epset.eps[i].fqdn, epset.eps[i].port);
×
2366
      }
2367
      len += snprintf(target + len, sizeof(target) - len, "(%d:%d) ", epset.numOfEps, epset.inUse);
×
2368
    }
2369
    char targetVStr[TSDB_TRANS_TARGET_LEN + VARSTR_HEADER_SIZE] = {0};
×
2370
    STR_WITH_MAXSIZE_TO_VARSTR(targetVStr, target, pShow->pMeta->pSchemas[cols].bytes);
×
2371
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2372
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)targetVStr, false), &lino, _OVER);
×
2373

2374
    char detail[TSDB_TRANS_DETAIL_LEN] = {0};
×
2375
    len = 0;
×
2376
    char bufStart[40] = {0};
×
2377
    if (pAction->startTime > 0) (void)formatTimestamp(bufStart, pAction->startTime, TSDB_TIME_PRECISION_MILLI);
×
2378
    char bufEnd[40] = {0};
×
2379
    if (pAction->endTime > 0) (void)formatTimestamp(bufEnd, pAction->endTime, TSDB_TIME_PRECISION_MILLI);
×
2380
    len += snprintf(detail + len, sizeof(detail) - len, "startTime:%s, endTime:%s, ", bufStart, bufEnd);
×
2381
    char detailVStr[TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE] = {0};
×
2382
    STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes);
×
2383
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2384
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER);
×
2385

2386
  } else {
2387
    int32_t len = 0;
×
2388

2389
    char objType[TSDB_TRANS_OBJTYPE_LEN + 1] = {0};
×
2390
    if (pAction->pRaw->type == SDB_VGROUP) {
×
2391
      SSdbRow *pRow = mndVgroupActionDecode(pAction->pRaw);
×
2392
      SVgObj  *pVgroup = sdbGetRowObj(pRow);
×
2393
      len += snprintf(objType + len, sizeof(objType) - len, "%s(%d)", sdbTableName(pAction->pRaw->type), pVgroup->vgId);
×
2394
      taosMemoryFreeClear(pRow);
×
2395
    } else {
2396
      strcpy(objType, sdbTableName(pAction->pRaw->type));
×
2397
    }
2398
    char objTypeVStr[TSDB_TRANS_OBJTYPE_LEN + VARSTR_HEADER_SIZE] = {0};
×
2399
    STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes);
×
2400
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2401
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER);
×
2402

2403
    char result[TSDB_TRANS_RESULT_LEN + 1] = {0};
×
2404
    len = 0;
×
2405
    len += snprintf(result + len, sizeof(result) - len, "rawWritten:%d", pAction->rawWritten);
×
2406
    char resultVStr[TSDB_TRANS_RESULT_LEN + VARSTR_HEADER_SIZE] = {0};
×
2407
    STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes);
×
2408
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2409
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER);
×
2410

2411
    char target[TSDB_TRANS_TARGET_LEN] = "";
×
2412
    char targetVStr[TSDB_TRANS_TARGET_LEN + VARSTR_HEADER_SIZE] = {0};
×
2413
    STR_WITH_MAXSIZE_TO_VARSTR(targetVStr, target, pShow->pMeta->pSchemas[cols].bytes);
×
2414
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2415
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)targetVStr, false), &lino, _OVER);
×
2416

2417
    char detail[TSDB_TRANS_DETAIL_LEN] = {0};
×
2418
    len = 0;
×
2419
    len += snprintf(detail + len, sizeof(detail) - len, "sdbStatus:%s", sdbStatusName(pAction->pRaw->status));
×
2420
    char detailVStr[TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE] = {0};
×
2421
    STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes);
×
2422
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
2423
    TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER);
×
2424
  }
2425

2426
_OVER:
×
2427
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
×
2428
}
2429

2430
static SArray *mndTransGetAction(STrans *pTrans, ETrnStage stage) {
×
2431
  if (stage == TRN_STAGE_PREPARE) {
×
2432
    return pTrans->prepareActions;
×
2433
  }
2434
  if (stage == TRN_STAGE_REDO_ACTION) {
×
2435
    return pTrans->redoActions;
×
2436
  }
2437
  if (stage == TRN_STAGE_COMMIT_ACTION) {
×
2438
    return pTrans->commitActions;
×
2439
  }
2440
  if (stage == TRN_STAGE_UNDO_ACTION) {
×
2441
    return pTrans->undoActions;
×
2442
  }
2443
  return NULL;
×
2444
}
2445

2446
typedef struct STransDetailIter {
2447
  void     *pIter;
2448
  STrans   *pTrans;
2449
  ETrnStage stage;
2450
  int32_t   num;
2451
} STransDetailIter;
2452

2453
static void mndTransShowActions(SSdb *pSdb, STransDetailIter *pShowIter, SShowObj *pShow, SSDataBlock *pBlock,
×
2454
                                int32_t rows, int32_t *numOfRows, SArray *pActions, int32_t end, int32_t start) {
2455
  int32_t actionNum = taosArrayGetSize(pActions);
×
2456
  mInfo("stage:%s, Actions num:%d", mndTransStr(pShowIter->stage), actionNum);
×
2457

2458
  for (int32_t i = start; i < actionNum; ++i) {
×
2459
    STransAction *pAction = taosArrayGet(pShowIter->pTrans->redoActions, i);
×
2460
    mndShowTransAction(pShow, pBlock, pAction, pShowIter->pTrans->id, pShowIter->pTrans->lastAction, rows, *numOfRows);
×
2461
    (*numOfRows)++;
×
2462
    if (*numOfRows >= rows) break;
×
2463
  }
2464

2465
  if (*numOfRows == end) {
×
2466
    sdbRelease(pSdb, pShowIter->pTrans);
×
2467
    pShowIter->pTrans = NULL;
×
2468
    pShowIter->num = 0;
×
2469
  } else {
2470
    pShowIter->pTrans = pShowIter->pTrans;
×
2471
    pShowIter->stage = pShowIter->pTrans->stage;
×
2472
    pShowIter->num += (*numOfRows);
×
2473
  }
2474
}
×
2475

2476
static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
2477
  SMnode *pMnode = pReq->info.node;
×
2478
  SSdb   *pSdb = pMnode->pSdb;
×
2479
  int32_t numOfRows = 0;
×
2480

2481
  int32_t code = 0;
×
2482
  int32_t lino = 0;
×
2483

2484
  mInfo("start to mndRetrieveTransDetail, rows:%d, pShow->numOfRows:%d, pShow->pIter:%p", rows, pShow->numOfRows,
×
2485
        pShow->pIter);
2486

2487
  if (pShow->pIter == NULL) {
×
2488
    pShow->pIter = taosMemoryMalloc(sizeof(STransDetailIter));
×
2489
    if (pShow->pIter == NULL) {
×
2490
      mError("failed to malloc for pShow->pIter");
×
2491
      return 0;
×
2492
    }
2493
    memset(pShow->pIter, 0, sizeof(STransDetailIter));
×
2494
  }
2495

2496
  STransDetailIter *pShowIter = (STransDetailIter *)pShow->pIter;
×
2497

2498
  while (numOfRows < rows) {
×
2499
    if (pShowIter->pTrans == NULL) {
×
2500
      pShowIter->pIter = sdbFetch(pSdb, SDB_TRANS, pShowIter->pIter, (void **)&(pShowIter->pTrans));
×
2501
      mDebug("retrieve trans detail from fetch, pShow->pIter:%p, pTrans:%p", pShowIter->pIter, pShowIter->pTrans);
×
2502
      if (pShowIter->pIter == NULL) break;
×
2503
      mInfo("retrieve trans detail from fetch, id:%d, trans stage:%d, IterNum:%d", pShowIter->pTrans->id,
×
2504
            pShowIter->pTrans->stage, pShowIter->num);
2505

2506
      SArray *pActions = mndTransGetAction(pShowIter->pTrans, pShowIter->pTrans->stage);
×
2507

2508
      mndTransShowActions(pSdb, pShowIter, pShow, pBlock, rows, &numOfRows, pActions, taosArrayGetSize(pActions), 0);
×
2509
      break;
×
2510
    } else {
2511
      mInfo("retrieve trans detail from iter, id:%d, iterStage:%d, IterNum:%d", pShowIter->pTrans->id, pShowIter->stage,
×
2512
            pShowIter->num);
2513
      SArray *pActions = mndTransGetAction(pShowIter->pTrans, pShowIter->stage);
×
2514

2515
      mndTransShowActions(pSdb, pShowIter, pShow, pBlock, rows, &numOfRows, pActions,
×
2516
                          taosArrayGetSize(pActions) - pShowIter->num, pShowIter->num);
×
2517
      break;
×
2518
    }
2519
  }
2520

2521
_OVER:
×
2522
  pShow->numOfRows += numOfRows;
×
2523

2524
  if (code != 0) {
×
2525
    mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
×
2526
  } else {
2527
    mInfo("retrieve trans detail, numOfRows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows)
×
2528
  }
2529
  if (numOfRows == 0) {
×
2530
    taosMemoryFree(pShow->pIter);
×
2531
    pShow->pIter = NULL;
×
2532
  }
2533
  return numOfRows;
×
2534
}
2535

2536
static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter) {
×
2537
  SSdb *pSdb = pMnode->pSdb;
×
2538
  sdbCancelFetchByType(pSdb, pIter, SDB_TRANS);
×
2539
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc