• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.16
/source/dnode/mnode/impl/src/mndCompact.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "mndCompact.h"
16
#include "audit.h"
17
#include "mndCompactDetail.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndPrivilege.h"
21
#include "mndShow.h"
22
#include "mndTrans.h"
23
#include "mndVgroup.h"
24
#include "tmisce.h"
25
#include "tmsgcb.h"
26

27
#define MND_COMPACT_VER_NUMBER 1
28
#define MND_COMPACT_ID_LEN     11
29

30
static int32_t mndProcessCompactTimer(SRpcMsg *pReq);
31

32
int32_t mndInitCompact(SMnode *pMnode) {
716✔
33
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact);
716✔
34
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq);
716✔
35
  mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp);
716✔
36
  mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer);
716✔
37
  mndSetMsgHandle(pMnode, TDMT_VND_KILL_COMPACT_RSP, mndTransProcessRsp);
716✔
38

39
  SSdbTable table = {
716✔
40
      .sdbType = SDB_COMPACT,
41
      .keyType = SDB_KEY_INT32,
42
      .encodeFp = (SdbEncodeFp)mndCompactActionEncode,
43
      .decodeFp = (SdbDecodeFp)mndCompactActionDecode,
44
      .insertFp = (SdbInsertFp)mndCompactActionInsert,
45
      .updateFp = (SdbUpdateFp)mndCompactActionUpdate,
46
      .deleteFp = (SdbDeleteFp)mndCompactActionDelete,
47
  };
48

49
  return sdbSetTable(pMnode->pSdb, table);
716✔
50
}
51

52
void mndCleanupCompact(SMnode *pMnode) { mDebug("mnd compact cleanup"); }
715✔
53

UNCOV
54
void tFreeCompactObj(SCompactObj *pCompact) {}
×
55

UNCOV
56
int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) {
×
UNCOV
57
  SEncoder encoder = {0};
×
UNCOV
58
  int32_t  code = 0;
×
59
  int32_t  lino;
60
  int32_t  tlen;
UNCOV
61
  tEncoderInit(&encoder, buf, bufLen);
×
62

UNCOV
63
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
UNCOV
64
  TAOS_CHECK_EXIT(tEncodeI32(&encoder, pObj->compactId));
×
UNCOV
65
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbname));
×
UNCOV
66
  TAOS_CHECK_EXIT(tEncodeI64(&encoder, pObj->startTime));
×
67

UNCOV
68
  tEndEncode(&encoder);
×
69

UNCOV
70
_exit:
×
UNCOV
71
  if (code) {
×
72
    tlen = code;
×
73
  } else {
UNCOV
74
    tlen = encoder.pos;
×
75
  }
UNCOV
76
  tEncoderClear(&encoder);
×
UNCOV
77
  return tlen;
×
78
}
79

UNCOV
80
int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) {
×
UNCOV
81
  int32_t  code = 0;
×
82
  int32_t  lino;
UNCOV
83
  SDecoder decoder = {0};
×
UNCOV
84
  tDecoderInit(&decoder, buf, bufLen);
×
85

UNCOV
86
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
UNCOV
87
  TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pObj->compactId));
×
UNCOV
88
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbname));
×
UNCOV
89
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pObj->startTime));
×
90

UNCOV
91
  tEndDecode(&decoder);
×
92

UNCOV
93
_exit:
×
UNCOV
94
  tDecoderClear(&decoder);
×
UNCOV
95
  return code;
×
96
}
97

UNCOV
98
SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) {
×
UNCOV
99
  int32_t code = 0;
×
UNCOV
100
  int32_t lino = 0;
×
UNCOV
101
  terrno = TSDB_CODE_SUCCESS;
×
102

UNCOV
103
  void    *buf = NULL;
×
UNCOV
104
  SSdbRaw *pRaw = NULL;
×
105

UNCOV
106
  int32_t tlen = tSerializeSCompactObj(NULL, 0, pCompact);
×
UNCOV
107
  if (tlen < 0) {
×
108
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
109
    goto OVER;
×
110
  }
111

UNCOV
112
  int32_t size = sizeof(int32_t) + tlen;
×
UNCOV
113
  pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, size);
×
UNCOV
114
  if (pRaw == NULL) {
×
115
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
116
    goto OVER;
×
117
  }
118

UNCOV
119
  buf = taosMemoryMalloc(tlen);
×
UNCOV
120
  if (buf == NULL) {
×
121
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
122
    goto OVER;
×
123
  }
124

UNCOV
125
  tlen = tSerializeSCompactObj(buf, tlen, pCompact);
×
UNCOV
126
  if (tlen < 0) {
×
127
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
128
    goto OVER;
×
129
  }
130

UNCOV
131
  int32_t dataPos = 0;
×
UNCOV
132
  SDB_SET_INT32(pRaw, dataPos, tlen, OVER);
×
UNCOV
133
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER);
×
UNCOV
134
  SDB_SET_DATALEN(pRaw, dataPos, OVER);
×
135

UNCOV
136
OVER:
×
UNCOV
137
  taosMemoryFreeClear(buf);
×
UNCOV
138
  if (terrno != TSDB_CODE_SUCCESS) {
×
139
    mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
140
    sdbFreeRaw(pRaw);
×
141
    return NULL;
×
142
  }
143

UNCOV
144
  mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
×
UNCOV
145
  return pRaw;
×
146
}
147

UNCOV
148
SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) {
×
UNCOV
149
  int32_t      code = 0;
×
UNCOV
150
  int32_t      lino = 0;
×
UNCOV
151
  SSdbRow     *pRow = NULL;
×
UNCOV
152
  SCompactObj *pCompact = NULL;
×
UNCOV
153
  void        *buf = NULL;
×
UNCOV
154
  terrno = TSDB_CODE_SUCCESS;
×
155

UNCOV
156
  int8_t sver = 0;
×
UNCOV
157
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
×
158
    goto OVER;
×
159
  }
160

UNCOV
161
  if (sver != MND_COMPACT_VER_NUMBER) {
×
162
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
163
    mError("compact read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER);
×
164
    goto OVER;
×
165
  }
166

UNCOV
167
  pRow = sdbAllocRow(sizeof(SCompactObj));
×
UNCOV
168
  if (pRow == NULL) {
×
169
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
170
    goto OVER;
×
171
  }
172

UNCOV
173
  pCompact = sdbGetRowObj(pRow);
×
UNCOV
174
  if (pCompact == NULL) {
×
175
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
176
    goto OVER;
×
177
  }
178

179
  int32_t tlen;
UNCOV
180
  int32_t dataPos = 0;
×
UNCOV
181
  SDB_GET_INT32(pRaw, dataPos, &tlen, OVER);
×
UNCOV
182
  buf = taosMemoryMalloc(tlen + 1);
×
UNCOV
183
  if (buf == NULL) {
×
184
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
185
    goto OVER;
×
186
  }
UNCOV
187
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER);
×
188

UNCOV
189
  if ((terrno = tDeserializeSCompactObj(buf, tlen, pCompact)) < 0) {
×
190
    goto OVER;
×
191
  }
192

UNCOV
193
OVER:
×
UNCOV
194
  taosMemoryFreeClear(buf);
×
UNCOV
195
  if (terrno != TSDB_CODE_SUCCESS) {
×
196
    mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", pCompact->compactId, pRaw, terrstr());
×
197
    taosMemoryFreeClear(pRow);
×
198
    return NULL;
×
199
  }
200

UNCOV
201
  mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact);
×
UNCOV
202
  return pRow;
×
203
}
204

UNCOV
205
int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) {
×
UNCOV
206
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
×
UNCOV
207
  return 0;
×
208
}
209

UNCOV
210
int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) {
×
UNCOV
211
  mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId);
×
UNCOV
212
  tFreeCompactObj(pCompact);
×
UNCOV
213
  return 0;
×
214
}
215

216
int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) {
×
217
  mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", pOldCompact->compactId, pOldCompact,
×
218
         pNewCompact);
219

220
  return 0;
×
221
}
222

UNCOV
223
SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) {
×
UNCOV
224
  SSdb        *pSdb = pMnode->pSdb;
×
UNCOV
225
  SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId);
×
UNCOV
226
  if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
227
    terrno = TSDB_CODE_SUCCESS;
×
228
  }
UNCOV
229
  return pCompact;
×
230
}
231

UNCOV
232
void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) {
×
UNCOV
233
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
234
  sdbRelease(pSdb, pCompact);
×
UNCOV
235
  pCompact = NULL;
×
UNCOV
236
}
×
237

UNCOV
238
int32_t mndCompactGetDbName(SMnode *pMnode, int32_t compactId, char *dbname, int32_t len) {
×
UNCOV
239
  int32_t      code = 0;
×
UNCOV
240
  SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
×
UNCOV
241
  if (pCompact == NULL) {
×
242
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
243
    if (terrno != 0) code = terrno;
×
244
    TAOS_RETURN(code);
×
245
  }
246

UNCOV
247
  (void)strncpy(dbname, pCompact->dbname, len);
×
UNCOV
248
  mndReleaseCompact(pMnode, pCompact);
×
UNCOV
249
  TAOS_RETURN(code);
×
250
}
251

252
// compact db
UNCOV
253
int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompact, SDbObj *pDb, SCompactDbRsp *rsp) {
×
UNCOV
254
  int32_t code = 0;
×
UNCOV
255
  pCompact->compactId = tGenIdPI32();
×
256

UNCOV
257
  (void)strcpy(pCompact->dbname, pDb->name);
×
258

UNCOV
259
  pCompact->startTime = taosGetTimestampMs();
×
260

UNCOV
261
  SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact);
×
UNCOV
262
  if (pVgRaw == NULL) {
×
263
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
264
    if (terrno != 0) code = terrno;
×
265
    TAOS_RETURN(code);
×
266
  }
UNCOV
267
  if ((code = mndTransAppendPrepareLog(pTrans, pVgRaw)) != 0) {
×
268
    sdbFreeRaw(pVgRaw);
×
269
    TAOS_RETURN(code);
×
270
  }
271

UNCOV
272
  if ((code = sdbSetRawStatus(pVgRaw, SDB_STATUS_READY)) != 0) {
×
273
    sdbFreeRaw(pVgRaw);
×
274
    TAOS_RETURN(code);
×
275
  }
276

UNCOV
277
  rsp->compactId = pCompact->compactId;
×
278

UNCOV
279
  return 0;
×
280
}
281

282
// retrieve compact
283
int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
284
  SMnode      *pMnode = pReq->info.node;
×
285
  SSdb        *pSdb = pMnode->pSdb;
×
286
  int32_t      numOfRows = 0;
×
287
  SCompactObj *pCompact = NULL;
×
288
  char        *sep = NULL;
×
289
  SDbObj      *pDb = NULL;
×
290
  int32_t      code = 0;
×
291
  int32_t      lino = 0;
×
292

293
  if (strlen(pShow->db) > 0) {
×
294
    sep = strchr(pShow->db, '.');
×
295
    if (sep &&
×
296
        ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) {
×
297
      sep++;
×
298
    } else {
299
      pDb = mndAcquireDb(pMnode, pShow->db);
×
300
      if (pDb == NULL) return terrno;
×
301
    }
302
  }
303

304
  while (numOfRows < rows) {
×
305
    pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact);
×
306
    if (pShow->pIter == NULL) break;
×
307

308
    SColumnInfoData *pColInfo;
309
    SName            n;
310
    int32_t          cols = 0;
×
311

312
    char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
×
313

314
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
315
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false), pCompact, &lino,
×
316
                        _OVER);
317

318
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
319
    if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
×
320
      SName name = {0};
×
321
      TAOS_CHECK_GOTO(tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB), &lino, _OVER);
×
322
      (void)tNameGetDbName(&name, varDataVal(tmpBuf));
×
323
    } else {
324
      (void)strncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
×
325
    }
326
    varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
×
327
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false), pCompact, &lino, _OVER);
×
328

329
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
330
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false), pCompact, &lino,
×
331
                        _OVER);
332

333
    numOfRows++;
×
334
    sdbRelease(pSdb, pCompact);
×
335
  }
336

337
_OVER:
×
338
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
×
339
  pShow->numOfRows += numOfRows;
×
340
  mndReleaseDb(pMnode, pDb);
×
341
  return numOfRows;
×
342
}
343

344
// kill compact
345
static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId,
×
346
                                    int32_t dnodeid) {
347
  SVKillCompactReq req = {0};
×
348
  req.compactId = compactId;
×
349
  req.vgId = pVgroup->vgId;
×
350
  req.dnodeId = dnodeid;
×
351

352
  mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId);
×
353
  int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req);
×
354
  if (contLen < 0) {
×
355
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
356
    return NULL;
×
357
  }
358
  contLen += sizeof(SMsgHead);
×
359

360
  void *pReq = taosMemoryMalloc(contLen);
×
361
  if (pReq == NULL) {
×
362
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
363
    return NULL;
×
364
  }
365

366
  SMsgHead *pHead = pReq;
×
367
  pHead->contLen = htonl(contLen);
×
368
  pHead->vgId = htonl(pVgroup->vgId);
×
369

370
  if ((contLen = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req)) < 0) {
×
371
    terrno = contLen;
×
372
    return NULL;
×
373
  }
374
  *pContLen = contLen;
×
375
  return pReq;
×
376
}
377

378
static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId,
×
379
                                       int32_t dnodeid) {
380
  int32_t      code = 0;
×
381
  STransAction action = {0};
×
382

383
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeid);
×
384
  if (pDnode == NULL) {
×
385
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
386
    if (terrno != 0) code = terrno;
×
387
    TAOS_RETURN(code);
×
388
  }
389
  action.epSet = mndGetDnodeEpset(pDnode);
×
390
  mndReleaseDnode(pMnode, pDnode);
×
391

392
  int32_t contLen = 0;
×
393
  void   *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId, dnodeid);
×
394
  if (pReq == NULL) {
×
395
    code = TSDB_CODE_SDB_OBJ_NOT_THERE;
×
396
    if (terrno != 0) code = terrno;
×
397
    TAOS_RETURN(code);
×
398
  }
399

400
  action.pCont = pReq;
×
401
  action.contLen = contLen;
×
402
  action.msgType = TDMT_VND_KILL_COMPACT;
×
403

404
  if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) {
×
405
    taosMemoryFree(pReq);
×
406
    TAOS_RETURN(code);
×
407
  }
408

409
  return 0;
×
410
}
411

412
static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) {
×
413
  int32_t code = 0;
×
414
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "kill-compact");
×
415
  if (pTrans == NULL) {
×
416
    mError("compact:%" PRId32 ", failed to drop since %s", pCompact->compactId, terrstr());
×
417
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
418
    if (terrno != 0) code = terrno;
×
419
    TAOS_RETURN(code);
×
420
  }
421
  mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId);
×
422

423
  mndTransSetDbName(pTrans, pCompact->dbname, NULL);
×
424

425
  SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
×
426
  if (pCommitRaw == NULL) {
×
427
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
428
    if (terrno != 0) code = terrno;
×
429
    mndTransDrop(pTrans);
×
430
    TAOS_RETURN(code);
×
431
  }
432
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
433
    mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
×
434
    mndTransDrop(pTrans);
×
435
    TAOS_RETURN(code);
×
436
  }
437
  if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
×
438
    mndTransDrop(pTrans);
×
439
    TAOS_RETURN(code);
×
440
  }
441

442
  void *pIter = NULL;
×
443
  while (1) {
×
444
    SCompactDetailObj *pDetail = NULL;
×
445
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
446
    if (pIter == NULL) break;
×
447

448
    if (pDetail->compactId == pCompact->compactId) {
×
449
      SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId);
×
450
      if (pVgroup == NULL) {
×
451
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
452
        sdbCancelFetch(pMnode->pSdb, pIter);
×
453
        sdbRelease(pMnode->pSdb, pDetail);
×
454
        mndTransDrop(pTrans);
×
455
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
456
        if (terrno != 0) code = terrno;
×
457
        TAOS_RETURN(code);
×
458
      }
459

460
      if ((code = mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId, pDetail->dnodeId)) != 0) {
×
461
        mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr());
×
462
        sdbCancelFetch(pMnode->pSdb, pIter);
×
463
        sdbRelease(pMnode->pSdb, pDetail);
×
464
        mndTransDrop(pTrans);
×
465
        TAOS_RETURN(code);
×
466
      }
467

468
      mndReleaseVgroup(pMnode, pVgroup);
×
469

470
      /*
471
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
472
      if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
473
        mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
474
        mndTransDrop(pTrans);
475
        return -1;
476
      }
477
      sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
478
      */
479
    }
480

481
    sdbRelease(pMnode->pSdb, pDetail);
×
482
  }
483

484
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
×
485
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
×
486
    mndTransDrop(pTrans);
×
487
    TAOS_RETURN(code);
×
488
  }
489

490
  mndTransDrop(pTrans);
×
491
  return 0;
×
492
}
493

494
int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
×
495
  int32_t         code = 0;
×
496
  int32_t         lino = 0;
×
497
  SKillCompactReq killCompactReq = {0};
×
498

499
  if ((code = tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq)) != 0) {
×
500
    TAOS_RETURN(code);
×
501
  }
502

503
  mInfo("start to kill compact:%" PRId32, killCompactReq.compactId);
×
504

505
  SMnode      *pMnode = pReq->info.node;
×
506
  SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId);
×
507
  if (pCompact == NULL) {
×
508
    code = TSDB_CODE_MND_INVALID_COMPACT_ID;
×
509
    tFreeSKillCompactReq(&killCompactReq);
×
510
    TAOS_RETURN(code);
×
511
  }
512

513
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB), &lino, _OVER);
×
514

515
  TAOS_CHECK_GOTO(mndKillCompact(pMnode, pReq, pCompact), &lino, _OVER);
×
516

517
  code = TSDB_CODE_ACTION_IN_PROGRESS;
×
518

519
  char obj[TSDB_INT32_ID_LEN] = {0};
×
520
  (void)sprintf(obj, "%d", pCompact->compactId);
×
521

522
  auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql, killCompactReq.sqlLen);
×
523

524
_OVER:
×
525
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
526
    mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr());
×
527
  }
528

529
  tFreeSKillCompactReq(&killCompactReq);
×
530
  mndReleaseCompact(pMnode, pCompact);
×
531

532
  TAOS_RETURN(code);
×
533
}
534

535
// update progress
UNCOV
536
static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId,
×
537
                                        SQueryCompactProgressRsp *rsp) {
UNCOV
538
  int32_t code = 0;
×
539

UNCOV
540
  void *pIter = NULL;
×
UNCOV
541
  while (1) {
×
UNCOV
542
    SCompactDetailObj *pDetail = NULL;
×
UNCOV
543
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
UNCOV
544
    if (pIter == NULL) break;
×
545

UNCOV
546
    if (pDetail->compactId == compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) {
×
UNCOV
547
      pDetail->newNumberFileset = rsp->numberFileset;
×
UNCOV
548
      pDetail->newFinished = rsp->finished;
×
549

UNCOV
550
      sdbCancelFetch(pMnode->pSdb, pIter);
×
UNCOV
551
      sdbRelease(pMnode->pSdb, pDetail);
×
552

UNCOV
553
      TAOS_RETURN(code);
×
554
    }
555

UNCOV
556
    sdbRelease(pMnode->pSdb, pDetail);
×
557
  }
558

559
  return TSDB_CODE_MND_COMPACT_DETAIL_NOT_EXIST;
×
560
}
561

UNCOV
562
int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq) {
×
UNCOV
563
  int32_t                  code = 0;
×
UNCOV
564
  SQueryCompactProgressRsp req = {0};
×
UNCOV
565
  if (pReq->code != 0) {
×
566
    mError("received wrong compact response, req code is %s", tstrerror(pReq->code));
×
567
    TAOS_RETURN(pReq->code);
×
568
  }
UNCOV
569
  code = tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req);
×
UNCOV
570
  if (code != 0) {
×
571
    mError("failed to deserialize vnode-query-compact-progress-rsp, ret:%d, pCont:%p, len:%d", code, pReq->pCont,
×
572
           pReq->contLen);
573
    TAOS_RETURN(code);
×
574
  }
575

UNCOV
576
  mDebug("compact:%d, receive query response, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
×
577
         req.vgId, req.dnodeId, req.numberFileset, req.finished);
578

UNCOV
579
  SMnode *pMnode = pReq->info.node;
×
580

UNCOV
581
  code = mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req);
×
UNCOV
582
  if (code != 0) {
×
583
    mError("compact:%d, failed to update progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d", req.compactId,
×
584
           req.vgId, req.dnodeId, req.numberFileset, req.finished);
585
    TAOS_RETURN(code);
×
586
  }
587

UNCOV
588
  TAOS_RETURN(code);
×
589
}
590

591
// timer
UNCOV
592
void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
×
UNCOV
593
  void *pIter = NULL;
×
594

UNCOV
595
  while (1) {
×
UNCOV
596
    SCompactDetailObj *pDetail = NULL;
×
UNCOV
597
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
UNCOV
598
    if (pIter == NULL) break;
×
599

UNCOV
600
    if (pDetail->compactId == pCompact->compactId) {
×
UNCOV
601
      SEpSet epSet = {0};
×
602

UNCOV
603
      SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDetail->dnodeId);
×
UNCOV
604
      if (pDnode == NULL) break;
×
UNCOV
605
      if (addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port) != 0) {
×
606
        sdbRelease(pMnode->pSdb, pDetail);
×
607
        continue;
×
608
      }
UNCOV
609
      mndReleaseDnode(pMnode, pDnode);
×
610

611
      SQueryCompactProgressReq req;
UNCOV
612
      req.compactId = pDetail->compactId;
×
UNCOV
613
      req.vgId = pDetail->vgId;
×
UNCOV
614
      req.dnodeId = pDetail->dnodeId;
×
615

UNCOV
616
      int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
×
UNCOV
617
      if (contLen < 0) {
×
618
        sdbRelease(pMnode->pSdb, pDetail);
×
619
        continue;
×
620
      }
621

UNCOV
622
      contLen += sizeof(SMsgHead);
×
623

UNCOV
624
      SMsgHead *pHead = rpcMallocCont(contLen);
×
UNCOV
625
      if (pHead == NULL) {
×
626
        sdbRelease(pMnode->pSdb, pDetail);
×
627
        continue;
×
628
      }
629

UNCOV
630
      pHead->contLen = htonl(contLen);
×
UNCOV
631
      pHead->vgId = htonl(pDetail->vgId);
×
632

UNCOV
633
      if (tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req) <= 0) {
×
634
        sdbRelease(pMnode->pSdb, pDetail);
×
635
        continue;
×
636
      }
637

UNCOV
638
      SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
×
639

UNCOV
640
      rpcMsg.pCont = pHead;
×
641

UNCOV
642
      char    detail[1024] = {0};
×
UNCOV
643
      int32_t len = tsnprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d",
×
UNCOV
644
                             TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), epSet.numOfEps, epSet.inUse);
×
UNCOV
645
      for (int32_t i = 0; i < epSet.numOfEps; ++i) {
×
UNCOV
646
        len += tsnprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
×
647
      }
648

UNCOV
649
      mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);
×
650

UNCOV
651
      if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
×
652
        sdbRelease(pMnode->pSdb, pDetail);
×
653
        continue;
×
654
      }
655
    }
656

UNCOV
657
    sdbRelease(pMnode->pSdb, pDetail);
×
658
  }
UNCOV
659
}
×
660

UNCOV
661
static int32_t mndSaveCompactProgress(SMnode *pMnode, int32_t compactId) {
×
UNCOV
662
  int32_t code = 0;
×
UNCOV
663
  bool    needSave = false;
×
UNCOV
664
  void   *pIter = NULL;
×
UNCOV
665
  while (1) {
×
UNCOV
666
    SCompactDetailObj *pDetail = NULL;
×
UNCOV
667
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
UNCOV
668
    if (pIter == NULL) break;
×
669

UNCOV
670
    if (pDetail->compactId == compactId) {
×
UNCOV
671
      mDebug(
×
672
          "compact:%d, check save progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
673
          "newNumberFileset:%d, newFinished:%d",
674
          pDetail->compactId, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
675
          pDetail->newNumberFileset, pDetail->newFinished);
676

677
      // these 2 number will jump back after dnode restart, so < is not used here
UNCOV
678
      if (pDetail->numberFileset != pDetail->newNumberFileset || pDetail->finished != pDetail->newFinished)
×
679
        needSave = true;
×
680
    }
681

UNCOV
682
    sdbRelease(pMnode->pSdb, pDetail);
×
683
  }
684

UNCOV
685
  char dbname[TSDB_TABLE_FNAME_LEN] = {0};
×
UNCOV
686
  TAOS_CHECK_RETURN(mndCompactGetDbName(pMnode, compactId, dbname, TSDB_TABLE_FNAME_LEN));
×
687

UNCOV
688
  if (!mndDbIsExist(pMnode, dbname)) {
×
689
    needSave = true;
×
690
    mWarn("compact:%" PRId32 ", no db exist, set needSave:%s", compactId, dbname);
×
691
  }
692

UNCOV
693
  if (!needSave) {
×
UNCOV
694
    mDebug("compact:%" PRId32 ", no need to save", compactId);
×
UNCOV
695
    TAOS_RETURN(code);
×
696
  }
697

698
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, NULL, "update-compact-progress");
×
699
  if (pTrans == NULL) {
×
700
    mError("trans:%" PRId32 ", failed to create since %s", pTrans->id, terrstr());
×
701
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
702
    if (terrno != 0) code = terrno;
×
703
    TAOS_RETURN(code);
×
704
  }
705
  mInfo("compact:%d, trans:%d, used to update compact progress.", compactId, pTrans->id);
×
706

707
  mndTransSetDbName(pTrans, dbname, NULL);
×
708

709
  pIter = NULL;
×
710
  while (1) {
×
711
    SCompactDetailObj *pDetail = NULL;
×
712
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
713
    if (pIter == NULL) break;
×
714

715
    if (pDetail->compactId == compactId) {
×
716
      mInfo(
×
717
          "compact:%d, trans:%d, check compact progress, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d, "
718
          "newNumberFileset:%d, newFinished:%d",
719
          pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished,
720
          pDetail->newNumberFileset, pDetail->newFinished);
721

722
      pDetail->numberFileset = pDetail->newNumberFileset;
×
723
      pDetail->finished = pDetail->newFinished;
×
724

725
      SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
×
726
      if (pCommitRaw == NULL) {
×
727
        sdbCancelFetch(pMnode->pSdb, pIter);
×
728
        sdbRelease(pMnode->pSdb, pDetail);
×
729
        mndTransDrop(pTrans);
×
730
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
731
        if (terrno != 0) code = terrno;
×
732
        TAOS_RETURN(code);
×
733
      }
734
      if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
735
        mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id, terrstr());
×
736
        sdbCancelFetch(pMnode->pSdb, pIter);
×
737
        sdbRelease(pMnode->pSdb, pDetail);
×
738
        mndTransDrop(pTrans);
×
739
        TAOS_RETURN(code);
×
740
      }
741
      if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) {
×
742
        sdbCancelFetch(pMnode->pSdb, pIter);
×
743
        sdbRelease(pMnode->pSdb, pDetail);
×
744
        mndTransDrop(pTrans);
×
745
        TAOS_RETURN(code);
×
746
      }
747
    }
748

749
    sdbRelease(pMnode->pSdb, pDetail);
×
750
  }
751

752
  bool allFinished = true;
×
753
  pIter = NULL;
×
754
  while (1) {
×
755
    SCompactDetailObj *pDetail = NULL;
×
756
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
757
    if (pIter == NULL) break;
×
758

759
    if (pDetail->compactId == compactId) {
×
760
      mInfo("compact:%d, trans:%d, check compact finished, vgId:%d, dnodeId:%d, numberFileset:%d, finished:%d",
×
761
            pDetail->compactId, pTrans->id, pDetail->vgId, pDetail->dnodeId, pDetail->numberFileset, pDetail->finished);
762

763
      if (pDetail->numberFileset == -1 && pDetail->finished == -1) {
×
764
        allFinished = false;
×
765
        sdbCancelFetch(pMnode->pSdb, pIter);
×
766
        sdbRelease(pMnode->pSdb, pDetail);
×
767
        break;
×
768
      }
769
      if (pDetail->numberFileset != -1 && pDetail->finished != -1 && pDetail->numberFileset != pDetail->finished) {
×
770
        allFinished = false;
×
771
        sdbCancelFetch(pMnode->pSdb, pIter);
×
772
        sdbRelease(pMnode->pSdb, pDetail);
×
773
        break;
×
774
      }
775
    }
776

777
    sdbRelease(pMnode->pSdb, pDetail);
×
778
  }
779

780
  if (!mndDbIsExist(pMnode, dbname)) {
×
781
    allFinished = true;
×
782
    mWarn("compact:%" PRId32 ", no db exist, set all finished:%s", compactId, dbname);
×
783
  }
784

785
  if (allFinished) {
×
786
    mInfo("compact:%d, all finished", compactId);
×
787
    pIter = NULL;
×
788
    while (1) {
×
789
      SCompactDetailObj *pDetail = NULL;
×
790
      pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail);
×
791
      if (pIter == NULL) break;
×
792

793
      if (pDetail->compactId == compactId) {
×
794
        SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail);
×
795
        if (pCommitRaw == NULL) {
×
796
          mndTransDrop(pTrans);
×
797
          code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
798
          if (terrno != 0) code = terrno;
×
799
          TAOS_RETURN(code);
×
800
        }
801
        if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
802
          mError("compact:%d, trans:%d, failed to append commit log since %s", pDetail->compactId, pTrans->id,
×
803
                 terrstr());
804
          sdbCancelFetch(pMnode->pSdb, pIter);
×
805
          sdbRelease(pMnode->pSdb, pDetail);
×
806
          mndTransDrop(pTrans);
×
807
          TAOS_RETURN(code);
×
808
        }
809
        if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
×
810
          sdbCancelFetch(pMnode->pSdb, pIter);
×
811
          sdbRelease(pMnode->pSdb, pDetail);
×
812
          mndTransDrop(pTrans);
×
813
          TAOS_RETURN(code);
×
814
        }
815
        mInfo("compact:%d, add drop compactdetail action", pDetail->compactDetailId);
×
816
      }
817

818
      sdbRelease(pMnode->pSdb, pDetail);
×
819
    }
820

821
    SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId);
×
822
    if (pCompact == NULL) {
×
823
      mndTransDrop(pTrans);
×
824
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
825
      if (terrno != 0) code = terrno;
×
826
      TAOS_RETURN(code);
×
827
    }
828
    SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact);
×
829
    mndReleaseCompact(pMnode, pCompact);
×
830
    if (pCommitRaw == NULL) {
×
831
      mndTransDrop(pTrans);
×
832
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
833
      if (terrno != 0) code = terrno;
×
834
      TAOS_RETURN(code);
×
835
    }
836
    if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
×
837
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
838
      mndTransDrop(pTrans);
×
839
      TAOS_RETURN(code);
×
840
    }
841
    if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) {
×
842
      mError("compact:%d, trans:%d, failed to append commit log since %s", compactId, pTrans->id, terrstr());
×
843
      mndTransDrop(pTrans);
×
844
      TAOS_RETURN(code);
×
845
    }
846
    mInfo("compact:%d, add drop compact action", pCompact->compactId);
×
847
  }
848

849
  if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
×
850
    mError("compact:%d, trans:%d, failed to prepare since %s", compactId, pTrans->id, terrstr());
×
851
    mndTransDrop(pTrans);
×
852
    TAOS_RETURN(code);
×
853
  }
854

855
  mndTransDrop(pTrans);
×
856
  return 0;
×
857
}
858

859
void mndCompactPullup(SMnode *pMnode) {
4,140✔
860
  int32_t code = 0;
4,140✔
861
  SSdb   *pSdb = pMnode->pSdb;
4,140✔
862
  SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t));
4,140✔
863
  if (pArray == NULL) return;
4,140!
864

865
  void *pIter = NULL;
4,140✔
UNCOV
866
  while (1) {
×
867
    SCompactObj *pCompact = NULL;
4,140✔
868
    pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
4,140✔
869
    if (pIter == NULL) break;
4,140!
UNCOV
870
    if (taosArrayPush(pArray, &pCompact->compactId) == NULL) {
×
871
      mError("failed to push compact id:%d into array, but continue pull up", pCompact->compactId);
×
872
    }
UNCOV
873
    sdbRelease(pSdb, pCompact);
×
874
  }
875

876
  for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
4,140!
UNCOV
877
    mInfo("begin to pull up");
×
UNCOV
878
    int32_t     *pCompactId = taosArrayGet(pArray, i);
×
UNCOV
879
    SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId);
×
UNCOV
880
    if (pCompact != NULL) {
×
UNCOV
881
      mInfo("compact:%d, begin to pull up", pCompact->compactId);
×
UNCOV
882
      mndCompactSendProgressReq(pMnode, pCompact);
×
UNCOV
883
      if ((code = mndSaveCompactProgress(pMnode, pCompact->compactId)) != 0) {
×
884
        mError("compact:%d, failed to save compact progress since %s", pCompact->compactId, tstrerror(code));
×
885
      }
UNCOV
886
      mndReleaseCompact(pMnode, pCompact);
×
887
    }
888
  }
889
  taosArrayDestroy(pArray);
4,140✔
890
}
891

892
static int32_t mndProcessCompactTimer(SRpcMsg *pReq) {
4,140✔
893
  mTrace("start to process compact timer");
4,140!
894
  mndCompactPullup(pReq->info.node);
4,140✔
895
  return 0;
4,140✔
896
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc