• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3541

26 Nov 2024 03:56AM UTC coverage: 60.776% (-0.07%) from 60.846%
#3541

push

travis-ci

web-flow
Merge pull request #28920 from taosdata/fix/TD-33008-3.0

fix(query)[TD-33008]. fix error handling in tsdbCacheRead

120076 of 252763 branches covered (47.51%)

Branch coverage included in aggregate %.

0 of 2 new or added lines in 1 file covered. (0.0%)

1395 existing lines in 154 files now uncovered.

200995 of 275526 relevant lines covered (72.95%)

19612328.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.78
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndCompact.h"
16
#include "audit.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tmisce.h"
25
#include "tmsgcb.h"
26

27
#define MND_COMPACT_VER_NUMBER 1
28
#define MND_COMPACT_ID_LEN     11
29

30
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
31

32
int32_t mndInitCompact(SMnode *pMnode) {
1,996✔
33
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
1,996✔
34
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
1,996✔
35
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
1,996✔
36
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
1,996✔
37
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);
1,996✔
38

39
  SSdbTable table = {
1,996✔
40
      .sdbType = SDB_COMPACT,
41
      .keyType = SDB_KEY_INT32,
42
      .encodeFp = (SdbEncodeFp)mndCompactActionEncode,
43
      .decodeFp = (SdbDecodeFp)mndCompactActionDecode,
44
      .insertFp = (SdbInsertFp)mndCompactActionInsert,
45
      .updateFp = (SdbUpdateFp)mndCompactActionUpdate,
46
      .deleteFp = (SdbDeleteFp)mndCompactActionDelete,
47
  };
48

49
  return sdbSetTable(pMnode->pSdb, table);
1,996✔
50
}
51

52
void mndCleanupCompact(SMnode *pMnode) { mDebug("mnd compact cleanup"); }
1,995✔
53

54
void tFreeCompactObj(SCompactObj *pCompact) {}
12✔
55

56
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
36✔
57
  SEncoder encoder = {0};
36✔
58
  int32_t  code = 0;
36✔
59
  int32_t  lino;
60
  int32_t  tlen;
61
  tEncoderInit(&encoder, buf, bufLen);
36✔
62

63
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
36!
64
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId));
72!
65
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
72!
66
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
72!
67

68
  tEndEncode(&encoder);
36✔
69

70
_exit:
36✔
71
  if (code) {
36!
72
    tlen = code;
×
73
  } else {
74
    tlen = encoder.pos;
36✔
75
  }
76
  tEncoderClear(&encoder);
36✔
77
  return tlen;
36✔
78
}
79

80
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
12✔
81
  int32_t  code = 0;
12✔
82
  int32_t  lino;
83
  SDecoder decoder = {0};
12✔
84
  tDecoderInit(&decoder, buf, bufLen);
12✔
85

86
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
12!
87
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactId));
24!
88
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
12!
89
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
24!
90

91
  tEndDecode(&decoder);
12✔
92

93
_exit:
12✔
94
  tDecoderClear(&decoder);
12✔
95
  return code;
12✔
96
}
97

98
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
18✔
99
  int32_t code = 0;
18✔
100
  int32_t lino = 0;
18✔
101
  terrno = TSDB_CODE_SUCCESS;
18✔
102

103
  void    *buf = NULL;
18✔
104
  SSdbRaw *pRaw = NULL;
18✔
105

106
  int32_t tlen = tSerializeSCompactObj(NULL, 0, pCompact);
18✔
107
  if (tlen < 0) {
18!
108
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
109
    goto OVER;
×
110
  }
111

112
  int32_t size = sizeof(int32_t) + tlen;
18✔
113
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, size);
18✔
114
  if (pRaw == NULL) {
18!
115
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
116
    goto OVER;
×
117
  }
118

119
  buf = taosMemoryMalloc(tlen);
18✔
120
  if (buf == NULL) {
18!
121
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
122
    goto OVER;
×
123
  }
124

125
  tlen = tSerializeSCompactObj(buf, tlen, pCompact);
18✔
126
  if (tlen < 0) {
18!
127
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
128
    goto OVER;
×
129
  }
130

131
  int32_t dataPos = 0;
18✔
132
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
18!
133
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
18!
134
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
18!
135

136
OVER:
18✔
137
  taosMemoryFreeClear(buf);
18!
138
  if (terrno != TSDB_CODE_SUCCESS) {
18!
139
    mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
140
    sdbFreeRaw(pRaw);
×
141
    return NULL;
×
142
  }
143

144
  mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
18!
145
  return pRaw;
18✔
146
}
147

148
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
12✔
149
  int32_t      code = 0;
12✔
150
  int32_t      lino = 0;
12✔
151
  SSdbRow     *pRow = NULL;
12✔
152
  SCompactObj *pCompact = NULL;
12✔
153
  void        *buf = NULL;
12✔
154
  terrno = TSDB_CODE_SUCCESS;
12✔
155

156
  int8_t sver = 0;
12✔
157
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
12!
158
    goto OVER;
×
159
  }
160

161
  if (sver != MND_COMPACT_VER_NUMBER) {
12!
162
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
163
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
×
164
    goto OVER;
×
165
  }
166

167
  pRow = sdbAllocRow(sizeof(SCompactObj));
12✔
168
  if (pRow == NULL) {
12!
169
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
170
    goto OVER;
×
171
  }
172

173
  pCompact = sdbGetRowObj(pRow);
12✔
174
  if (pCompact == NULL) {
12!
175
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
176
    goto OVER;
×
177
  }
178

179
  int32_t tlen;
180
  int32_t dataPos = 0;
12✔
181
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
12!
182
  buf = taosMemoryMalloc(tlen + 1);
12✔
183
  if (buf == NULL) {
12!
184
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
185
    goto OVER;
×
186
  }
187
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
12!
188

189
  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
12!
190
    goto OVER;
×
191
  }
192

193
OVER:
12✔
194
  taosMemoryFreeClear(buf);
12!
195
  if (terrno != TSDB_CODE_SUCCESS) {
12!
196
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
197
    taosMemoryFreeClear(pRow);
×
198
    return NULL;
×
199
  }
200

201
  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
12!
202
  return pRow;
12✔
203
}
204

205
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
9✔
206
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
9!
207
  return 0;
9✔
208
}
209

210
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
12✔
211
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
12!
212
  tFreeCompactObj(pCompact);
12✔
213
  return 0;
12✔
214
}
215

216
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
×
217
  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact,
×
218
         pNewCompact);
219

220
  return 0;
×
221
}
222

223
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
21✔
224
  SSdb        *pSdb = pMnode->pSdb;
21✔
225
  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId);
21✔
226
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
21!
227
    terrno = TSDB_CODE_SUCCESS;
×
228
  }
229
  return pCompact;
21✔
230
}
231

232
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
21✔
233
  SSdb *pSdb = pMnode->pSdb;
21✔
234
  sdbRelease(pSdb, pCompact);
21✔
235
  pCompact = NULL;
21✔
236
}
21✔
237

238
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) {
9✔
239
  int32_t      code = 0;
9✔
240
  SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
9✔
241
  if (pCompact == NULL) {
9!
242
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
243
    if (terrno != 0) code = terrno;
×
244
    TAOS_RETURN(code);
×
245
  }
246

247
  (void)strncpy(dbname, pCompact->dbname, len);
9✔
248
  mndReleaseCompact(pMnode, pCompact);
9✔
249
  TAOS_RETURN(code);
9✔
250
}
251

252
// compact db
253
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
9✔
254
  int32_t code = 0;
9✔
255
  pCompact->compactId = tGenIdPI32();
9✔
256

257
  (void)strcpy(pCompact->dbname, pDb->name);
9✔
258

259
  pCompact->startTime = taosGetTimestampMs();
9✔
260

261
  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
9✔
262
  if (pVgRaw == NULL) {
9!
263
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
264
    if (terrno != 0) code = terrno;
×
265
    TAOS_RETURN(code);
×
266
  }
267
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
9!
268
    sdbFreeRaw(pVgRaw);
×
269
    TAOS_RETURN(code);
×
270
  }
271

272
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
9!
273
    sdbFreeRaw(pVgRaw);
×
274
    TAOS_RETURN(code);
×
275
  }
276

277
  rsp->compactId = pCompact->compactId;
9✔
278

279
  return 0;
9✔
280
}
281

282
// retrieve compact
283
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
2,214✔
284
  SMnode      *pMnode = pReq->info.node;
2,214✔
285
  SSdb        *pSdb = pMnode->pSdb;
2,214✔
286
  int32_t      numOfRows = 0;
2,214✔
287
  SCompactObj *pCompact = NULL;
2,214✔
288
  char        *sep = NULL;
2,214✔
289
  SDbObj      *pDb = NULL;
2,214✔
290
  int32_t      code = 0;
2,214✔
291
  int32_t      lino = 0;
2,214✔
292

293
  if (strlen(pShow->db) > 0) {
2,214!
294
    sep = strchr(pShow->db, '.');
×
295
    if (sep &&
×
296
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
297
      sep++;
×
298
    } else {
299
      pDb = mndAcquireDb(pMnode, pShow->db);
×
300
      if (pDb == NULL) return terrno;
×
301
    }
302
  }
303

304
  while (numOfRows < rows) {
2,265!
305
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
2,265✔
306
    if (pShow->pIter == NULL) break;
2,265✔
307

308
    SColumnInfoData *pColInfo;
309
    SName            n;
310
    int32_t          cols = 0;
51✔
311

312
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
51✔
313

314
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
51✔
315
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
51!
316
                        _OVER);
317

318
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
51✔
319
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
102!
320
      SName name = {0};
51✔
321
      TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
51!
322
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
51✔
323
    } else {
324
      (void)strncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
×
325
    }
326
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
51✔
327
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);
51!
328

329
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
51✔
330
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
51!
331
                        _OVER);
332

333
    numOfRows++;
51✔
334
    sdbRelease(pSdb, pCompact);
51✔
335
  }
336

UNCOV
337
_OVER:
×
338
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
2,214!
339
  pShow->numOfRows += numOfRows;
2,214✔
340
  mndReleaseDb(pMnode, pDb);
2,214✔
341
  return numOfRows;
2,213✔
342
}
343

344
// kill compact
345
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
×
346
                                    int32_t dnodeid) {
347
  SVKillCompactReq req = {0};
×
348
  req.compactId = compactId;
×
349
  req.vgId = pVgroup->vgId;
×
350
  req.dnodeId = dnodeid;
×
351

352
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
×
353
  int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
×
354
  if (contLen < 0) {
×
355
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
356
    return NULL;
×
357
  }
358
  contLen += sizeof(SMsgHead);
×
359

360
  void *pReq = taosMemoryMalloc(contLen);
×
361
  if (pReq == NULL) {
×
362
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
363
    return NULL;
×
364
  }
365

366
  SMsgHead *pHead = pReq;
×
367
  pHead->contLen = htonl(contLen);
×
368
  pHead->vgId = htonl(pVgroup->vgId);
×
369

370
  if ((contLen = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
×
371
    terrno = contLen;
×
372
    return NULL;
×
373
  }
374
  *pContLen = contLen;
×
375
  return pReq;
×
376
}
377

378
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
×
379
                                       int32_t dnodeid) {
380
  int32_t      code = 0;
×
381
  STransAction action = {0};
×
382

383
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
×
384
  if (pDnode == NULL) {
×
385
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
386
    if (terrno != 0) code = terrno;
×
387
    TAOS_RETURN(code);
×
388
  }
389
  action.epSet = mndGetDnodeEpset(pDnode);
×
390
  mndReleaseDnode(pMnode, pDnode);
×
391

392
  int32_t contLen = 0;
×
393
  void   *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid);
×
394
  if (pReq == NULL) {
×
395
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
396
    if (terrno != 0) code = terrno;
×
397
    TAOS_RETURN(code);
×
398
  }
399

400
  action.pCont = pReq;
×
401
  action.contLen = contLen;
×
402
  action.msgType = TDMT_VND_KILL_COMPACT;
×
403

404
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
405
    taosMemoryFree(pReq);
×
406
    TAOS_RETURN(code);
×
407
  }
408

409
  return 0;
×
410
}
411

412
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
×
413
  int32_t code = 0;
×
414
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
×
415
  if (pTrans == NULL) {
×
416
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
×
417
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
418
    if (terrno != 0) code = terrno;
×
419
    TAOS_RETURN(code);
×
420
  }
421
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
×
422

423
  mndTransSetDbName(pTrans, pCompact->dbname, NULL);
×
424

425
  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
×
426
  if (pCommitRaw == NULL) {
×
427
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
428
    if (terrno != 0) code = terrno;
×
429
    mndTransDrop(pTrans);
×
430
    TAOS_RETURN(code);
×
431
  }
432
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
433
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
434
    mndTransDrop(pTrans);
×
435
    TAOS_RETURN(code);
×
436
  }
437
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
×
438
    mndTransDrop(pTrans);
×
439
    TAOS_RETURN(code);
×
440
  }
441

442
  void *pIter = NULL;
×
443
  while (1) {
×
444
    SCompactDetailObj *pDetail = NULL;
×
445
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
446
    if (pIter == NULL) break;
×
447

448
    if (pDetail->compactId == pCompact->compactId) {
×
449
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
×
450
      if (pVgroup == NULL) {
×
451
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
452
        sdbCancelFetch(pMnode->pSdb, pIter);
×
453
        sdbRelease(pMnode->pSdb, pDetail);
×
454
        mndTransDrop(pTrans);
×
455
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
456
        if (terrno != 0) code = terrno;
×
457
        TAOS_RETURN(code);
×
458
      }
459

460
      if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) {
×
461
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
462
        sdbCancelFetch(pMnode->pSdb, pIter);
×
463
        sdbRelease(pMnode->pSdb, pDetail);
×
464
        mndTransDrop(pTrans);
×
465
        TAOS_RETURN(code);
×
466
      }
467

468
      mndReleaseVgroup(pMnode, pVgroup);
×
469

470
      /*
471
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
472
      if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
473
        mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
474
        mndTransDrop(pTrans);
475
        return -1;
476
      }
477
      sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
478
      */
479
    }
480

481
    sdbRelease(pMnode->pSdb, pDetail);
×
482
  }
483

484
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
×
485
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
486
    mndTransDrop(pTrans);
×
487
    TAOS_RETURN(code);
×
488
  }
489

490
  mndTransDrop(pTrans);
×
491
  return 0;
×
492
}
493

494
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
×
495
  int32_t         code = 0;
×
496
  int32_t         lino = 0;
×
497
  SKillCompactReq killCompactReq = {0};
×
498

499
  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) {
×
500
    TAOS_RETURN(code);
×
501
  }
502

503
  mInfo("start to kill compact:%" PRId32, killCompactReq.compactId);
×
504

505
  SMnode      *pMnode = pReq->info.node;
×
506
  SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId);
×
507
  if (pCompact == NULL) {
×
508
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
×
509
    tFreeSKillCompactReq(&killCompactReq);
×
510
    TAOS_RETURN(code);
×
511
  }
512

513
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB), &lino, _OVER);
×
514

515
  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);
×
516

517
  code = TSDB_CODE_ACTION_IN_PROGRESS;
×
518

519
  char obj[TSDB_INT32_ID_LEN] = {0};
×
520
  (void)sprintf(obj, "%d", pCompact->compactId);
×
521

522
  auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql, killCompactReq.sqlLen);
×
523

524
_OVER:
×
525
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
526
    mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
×
527
  }
528

529
  tFreeSKillCompactReq(&killCompactReq);
×
530
  mndReleaseCompact(pMnode, pCompact);
×
531

532
  TAOS_RETURN(code);
×
533
}
534

535
// update progress
536
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
22✔
537
                                        SQueryCompactProgressRsp *rsp) {
538
  int32_t code = 0;
22✔
539

540
  void *pIter = NULL;
22✔
541
  while (1) {
51✔
542
    SCompactDetailObj *pDetail = NULL;
73✔
543
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
73✔
544
    if (pIter == NULL) break;
73!
545

546
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
73!
547
      pDetail->newNumberFileset = rsp->numberFileset;
22✔
548
      pDetail->newFinished = rsp->finished;
22✔
549

550
      sdbCancelFetch(pMnode->pSdb, pIter);
22✔
551
      sdbRelease(pMnode->pSdb, pDetail);
22✔
552

553
      TAOS_RETURN(code);
22✔
554
    }
555

556
    sdbRelease(pMnode->pSdb, pDetail);
51✔
557
  }
558

559
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
×
560
}
561

562
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
22✔
563
  int32_t                  code = 0;
22✔
564
  SQueryCompactProgressRsp req = {0};
22✔
565
  if (pReq->code != 0) {
22!
566
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
×
567
    TAOS_RETURN(pReq->code);
×
568
  }
569
  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
22✔
570
  if (code != 0) {
22!
571
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
572
           pReq->contLen);
573
    TAOS_RETURN(code);
×
574
  }
575

576
  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
22✔
577
         req.vgId, req.dnodeId, req.numberFileset, req.finished);
578

579
  SMnode *pMnode = pReq->info.node;
22✔
580

581
  code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
22✔
582
  if (code != 0) {
22!
583
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
×
584
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
585
    TAOS_RETURN(code);
×
586
  }
587

588
  TAOS_RETURN(code);
22✔
589
}
590

591
// timer
592
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
9✔
593
  void *pIter = NULL;
9✔
594

595
  while (1) {
38✔
596
    SCompactDetailObj *pDetail = NULL;
47✔
597
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
47✔
598
    if (pIter == NULL) break;
47✔
599

600
    if (pDetail->compactId == pCompact->compactId) {
38!
601
      SEpSet epSet = {0};
38✔
602

603
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
38✔
604
      if (pDnode == NULL) break;
38!
605
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
38!
606
        sdbRelease(pMnode->pSdb, pDetail);
×
607
        continue;
16✔
608
      }
609
      mndReleaseDnode(pMnode, pDnode);
38✔
610

611
      SQueryCompactProgressReq req;
612
      req.compactId = pDetail->compactId;
38✔
613
      req.vgId = pDetail->vgId;
38✔
614
      req.dnodeId = pDetail->dnodeId;
38✔
615

616
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
38✔
617
      if (contLen < 0) {
38!
618
        sdbRelease(pMnode->pSdb, pDetail);
×
619
        continue;
×
620
      }
621

622
      contLen += sizeof(SMsgHead);
38✔
623

624
      SMsgHead *pHead = rpcMallocCont(contLen);
38✔
625
      if (pHead == NULL) {
38!
626
        sdbRelease(pMnode->pSdb, pDetail);
×
627
        continue;
×
628
      }
629

630
      pHead->contLen = htonl(contLen);
38✔
631
      pHead->vgId = htonl(pDetail->vgId);
38✔
632

633
      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
38!
634
        sdbRelease(pMnode->pSdb, pDetail);
×
635
        continue;
×
636
      }
637

638
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
38✔
639

640
      rpcMsg.pCont = pHead;
38✔
641

642
      char    detail[1024] = {0};
38✔
643
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
76!
644
                             TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
76✔
645
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
76✔
646
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
38✔
647
      }
648

649
      mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);
38✔
650

651
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
38✔
652
        sdbRelease(pMnode->pSdb, pDetail);
16✔
653
        continue;
16✔
654
      }
655
    }
656

657
    sdbRelease(pMnode->pSdb, pDetail);
22✔
658
  }
659
}
9✔
660

661
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
9✔
662
  int32_t code = 0;
9✔
663
  bool    needSave = false;
9✔
664
  void   *pIter = NULL;
9✔
665
  while (1) {
38✔
666
    SCompactDetailObj *pDetail = NULL;
47✔
667
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
47✔
668
    if (pIter == NULL) break;
47✔
669

670
    if (pDetail->compactId == compactId) {
38!
671
      mDebug(
38✔
672
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
673
          "newNumberFileset:%d, newFinished:%d",
674
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
675
          pDetail->newNumberFileset, pDetail->newFinished);
676

677
      // these 2 number will jump back after dnode restart, so < is not used here
678
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
38!
679
        needSave = true;
6✔
680
    }
681

682
    sdbRelease(pMnode->pSdb, pDetail);
38✔
683
  }
684

685
  char dbname[TSDB_TABLE_FNAME_LEN] = {0};
9✔
686
  TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN));
9!
687

688
  if (!mndDbIsExist(pMnode, dbname)) {
9!
689
    needSave = true;
×
690
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
×
691
  }
692

693
  if (!needSave) {
9✔
694
    mDebug("compact:%" PRId32 ", no need to save", compactId);
6✔
695
    TAOS_RETURN(code);
6✔
696
  }
697

698
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
3✔
699
  if (pTrans == NULL) {
3!
700
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
701
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
702
    if (terrno != 0) code = terrno;
×
703
    TAOS_RETURN(code);
×
704
  }
705
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
3!
706

707
  mndTransSetDbName(pTrans, dbname, NULL);
3✔
708

709
  pIter = NULL;
3✔
710
  while (1) {
6✔
711
    SCompactDetailObj *pDetail = NULL;
9✔
712
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
9✔
713
    if (pIter == NULL) break;
9✔
714

715
    if (pDetail->compactId == compactId) {
6!
716
      mInfo(
6!
717
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
718
          "newNumberFileset:%d, newFinished:%d",
719
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
720
          pDetail->newNumberFileset, pDetail->newFinished);
721

722
      pDetail->numberFileset = pDetail->newNumberFileset;
6✔
723
      pDetail->finished = pDetail->newFinished;
6✔
724

725
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
6✔
726
      if (pCommitRaw == NULL) {
6!
727
        sdbCancelFetch(pMnode->pSdb, pIter);
×
728
        sdbRelease(pMnode->pSdb, pDetail);
×
729
        mndTransDrop(pTrans);
×
730
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
731
        if (terrno != 0) code = terrno;
×
732
        TAOS_RETURN(code);
×
733
      }
734
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
6!
735
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
×
736
        sdbCancelFetch(pMnode->pSdb, pIter);
×
737
        sdbRelease(pMnode->pSdb, pDetail);
×
738
        mndTransDrop(pTrans);
×
739
        TAOS_RETURN(code);
×
740
      }
741
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
6!
742
        sdbCancelFetch(pMnode->pSdb, pIter);
×
743
        sdbRelease(pMnode->pSdb, pDetail);
×
744
        mndTransDrop(pTrans);
×
745
        TAOS_RETURN(code);
×
746
      }
747
    }
748

749
    sdbRelease(pMnode->pSdb, pDetail);
6✔
750
  }
751

752
  bool allFinished = true;
3✔
753
  pIter = NULL;
3✔
754
  while (1) {
6✔
755
    SCompactDetailObj *pDetail = NULL;
9✔
756
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
9✔
757
    if (pIter == NULL) break;
9✔
758

759
    if (pDetail->compactId == compactId) {
6!
760
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
6!
761
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
762

763
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
6!
764
        allFinished = false;
×
765
        sdbCancelFetch(pMnode->pSdb, pIter);
×
766
        sdbRelease(pMnode->pSdb, pDetail);
×
767
        break;
×
768
      }
769
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
6!
770
        allFinished = false;
×
771
        sdbCancelFetch(pMnode->pSdb, pIter);
×
772
        sdbRelease(pMnode->pSdb, pDetail);
×
773
        break;
×
774
      }
775
    }
776

777
    sdbRelease(pMnode->pSdb, pDetail);
6✔
778
  }
779

780
  if (!mndDbIsExist(pMnode, dbname)) {
3!
781
    allFinished = true;
×
782
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
×
783
  }
784

785
  if (allFinished) {
3!
786
    mInfo("compact:%d, all finished", compactId);
3!
787
    pIter = NULL;
3✔
788
    while (1) {
6✔
789
      SCompactDetailObj *pDetail = NULL;
9✔
790
      pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
9✔
791
      if (pIter == NULL) break;
9✔
792

793
      if (pDetail->compactId == compactId) {
6!
794
        SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
6✔
795
        if (pCommitRaw == NULL) {
6!
796
          mndTransDrop(pTrans);
×
797
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
798
          if (terrno != 0) code = terrno;
×
799
          TAOS_RETURN(code);
×
800
        }
801
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
6!
802
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
×
803
                 terrstr());
804
          sdbCancelFetch(pMnode->pSdb, pIter);
×
805
          sdbRelease(pMnode->pSdb, pDetail);
×
806
          mndTransDrop(pTrans);
×
807
          TAOS_RETURN(code);
×
808
        }
809
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
6!
810
          sdbCancelFetch(pMnode->pSdb, pIter);
×
811
          sdbRelease(pMnode->pSdb, pDetail);
×
812
          mndTransDrop(pTrans);
×
813
          TAOS_RETURN(code);
×
814
        }
815
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
6!
816
      }
817

818
      sdbRelease(pMnode->pSdb, pDetail);
6✔
819
    }
820

821
    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
3✔
822
    if (pCompact == NULL) {
3!
823
      mndTransDrop(pTrans);
×
824
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
825
      if (terrno != 0) code = terrno;
×
826
      TAOS_RETURN(code);
×
827
    }
828
    SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
3✔
829
    mndReleaseCompact(pMnode, pCompact);
3✔
830
    if (pCommitRaw == NULL) {
3!
831
      mndTransDrop(pTrans);
×
832
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
833
      if (terrno != 0) code = terrno;
×
834
      TAOS_RETURN(code);
×
835
    }
836
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
3!
837
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
838
      mndTransDrop(pTrans);
×
839
      TAOS_RETURN(code);
×
840
    }
841
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
3!
842
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
843
      mndTransDrop(pTrans);
×
844
      TAOS_RETURN(code);
×
845
    }
846
    mInfo("compact:%d, add drop compact action", pCompact->compactId);
3!
847
  }
848

849
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
3!
850
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
×
851
    mndTransDrop(pTrans);
×
852
    TAOS_RETURN(code);
×
853
  }
854

855
  mndTransDrop(pTrans);
3✔
856
  return 0;
3✔
857
}
858

859
void mndCompactPullup(SMnode *pMnode) {
5,879✔
860
  int32_t code = 0;
5,879✔
861
  SSdb   *pSdb = pMnode->pSdb;
5,879✔
862
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
5,879✔
863
  if (pArray == NULL) return;
5,879!
864

865
  void *pIter = NULL;
5,879✔
866
  while (1) {
9✔
867
    SCompactObj *pCompact = NULL;
5,888✔
868
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
5,888✔
869
    if (pIter == NULL) break;
5,888✔
870
    if (taosArrayPush(pArray, &pCompact->compactId) == NULL) {
18!
871
      mError("failed to push compact id:%d into array, but continue pull up", pCompact->compactId);
×
872
    }
873
    sdbRelease(pSdb, pCompact);
9✔
874
  }
875

876
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
5,888✔
877
    mInfo("begin to pull up");
9!
878
    int32_t     *pCompactId = taosArrayGet(pArray, i);
9✔
879
    SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
9✔
880
    if (pCompact != NULL) {
9!
881
      mInfo("compact:%d, begin to pull up", pCompact->compactId);
9!
882
      mndCompactSendProgressReq(pMnode, pCompact);
9✔
883
      if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
9!
884
        mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
×
885
      }
886
      mndReleaseCompact(pMnode, pCompact);
9✔
887
    }
888
  }
889
  taosArrayDestroy(pArray);
5,879✔
890
}
891

892
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
5,879✔
893
  mTrace("start to process compact timer");
5,879✔
894
  mndCompactPullup(pReq->info.node);
5,879✔
895
  return 0;
5,879✔
896
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc