• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.58
/source/dnode/mnode/impl/src/mndIndex.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndIndex.h"
18
#include "mndDb.h"
19
#include "mndDnode.h"
20
#include "mndIndexComm.h"
21
#include "mndInfoSchema.h"
22
#include "mndMnode.h"
23
#include "mndPrivilege.h"
24
#include "mndScheduler.h"
25
#include "mndShow.h"
26
#include "mndStb.h"
27
#include "mndStream.h"
28
#include "mndTrans.h"
29
#include "mndUser.h"
30
#include "mndVgroup.h"
31
#include "parser.h"
32
#include "tname.h"
33

34
#define TSDB_IDX_VER_NUMBER   1
35
#define TSDB_IDX_RESERVE_SIZE 64
36

37
static SSdbRaw *mndIdxActionEncode(SIdxObj *pSma);
38
static SSdbRow *mndIdxActionDecode(SSdbRaw *pRaw);
39
static int32_t  mndIdxActionInsert(SSdb *pSdb, SIdxObj *pIdx);
40
static int32_t  mndIdxActionDelete(SSdb *pSdb, SIdxObj *pIdx);
41
static int32_t  mndIdxActionUpdate(SSdb *pSdb, SIdxObj *pOld, SIdxObj *pNew);
42
static int32_t  mndProcessCreateIdxReq(SRpcMsg *pReq);
43
// static int32_t  mndProcessDropIdxReq(SRpcMsg *pReq);
44
static int32_t mndProcessGetIdxReq(SRpcMsg *pReq);
45
static int32_t mndProcessGetTbIdxReq(SRpcMsg *pReq);
46
// static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
47
//  static void    mndCancelGetNextIdx(SMnode *pMnode, void *pIter);
48
static void mndDestroyIdxObj(SIdxObj *pIdxObj);
49

50
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb);
51

52
/**
 * Registers the tag-index (SDB_IDX) table with the sdb and installs the
 * message handlers owned by this module.
 *
 * Only TDMT_MND_CREATE_INDEX and the two vnode index responses are installed
 * here; handlers for DROP_INDEX, the SMA messages, GET_INDEX, GET_TABLE_INDEX
 * and the show-retrieve callbacks are not registered by this function.
 *
 * @param pMnode mnode whose sdb and handler table are updated.
 * @return the value returned by sdbSetTable().
 */
int32_t mndInitIdx(SMnode *pMnode) {
  SSdbTable idxTable = {0};
  idxTable.sdbType = SDB_IDX;
  idxTable.keyType = SDB_KEY_BINARY;
  idxTable.encodeFp = (SdbEncodeFp)mndIdxActionEncode;
  idxTable.decodeFp = (SdbDecodeFp)mndIdxActionDecode;
  idxTable.insertFp = (SdbInsertFp)mndIdxActionInsert;
  idxTable.updateFp = (SdbUpdateFp)mndIdxActionUpdate;
  idxTable.deleteFp = (SdbDeleteFp)mndIdxActionDelete;

  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIdxReq);

  // Both vnode responses are routed to mndTransProcessRsp.
  mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp);
  mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp);

  return sdbSetTable(pMnode->pSdb, idxTable);
}
81

82
static int32_t mndFindSuperTableTagId(const SStbObj *pStb, const char *tagName, int8_t *hasIdx) {
2,015✔
83
  for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
11,005!
84
    if (taosStrcasecmp(pStb->pTags[tag].name, tagName) == 0) {
11,005✔
85
      if (IS_IDX_ON(&pStb->pTags[tag])) {
2,015✔
86
        *hasIdx = 1;
1,069✔
87
      }
88
      return tag;
2,015✔
89
    }
90
  }
91

92
  return TSDB_CODE_MND_TAG_NOT_EXIST;
×
93
}
94

95
/**
 * Appends one TDMT_VND_CREATE_INDEX redo action to pTrans for every vgroup
 * that belongs to pDb. The request body is produced by mndBuildVCreateStbReq()
 * from pStb.
 *
 * @return 0 on success; on failure the error from terrno (or
 *         TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the request
 *         could not be built, or the code from mndTransAppendRedoAction().
 */
int mndSetCreateIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVg = NULL;
  void   *pCursor = NULL;
  int32_t code = 0;
  int32_t reqLen;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (pCursor == NULL) break;

    if (mndVgroupInDb(pVg, pDb->uid)) {
      void *pBody = mndBuildVCreateStbReq(pMnode, pVg, pStb, &reqLen, NULL, 0);
      if (pBody == NULL) {
        // stop the iteration and drop the vgroup reference before bailing out
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pVg);
        code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        TAOS_RETURN(code);
      }

      STransAction action = {0};
      action.epSet = mndGetVgroupEpset(pMnode, pVg);
      action.pCont = pBody;
      action.contLen = reqLen;
      action.msgType = TDMT_VND_CREATE_INDEX;

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        // the body was not appended, so it is released here
        taosMemoryFree(pBody);
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pVg);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pVg);
  }

  return 0;
}
134
/**
 * Builds the message sent to a vnode to drop a tag index: an SMsgHead
 * followed by the serialized SDropIndexReq taken from pIdx.
 *
 * @param pVgroup  vgroup whose vgId is written into the message head.
 * @param pIdx     index object providing colName, stb, dbUid and stbUid.
 * @param contLen  receives the total message length; written only on success.
 * @return a buffer allocated with taosMemoryCalloc() that the caller owns, or
 *         NULL on failure with terrno set.
 */
static void *mndBuildDropIdxReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStbObj, SIdxObj *pIdx, int32_t *contLen) {
  SDropIndexReq req = {0};
  memcpy(req.colName, pIdx->colName, sizeof(pIdx->colName));
  memcpy(req.stb, pIdx->stb, sizeof(pIdx->stb));
  req.dbUid = pIdx->dbUid;
  req.stbUid = pIdx->stbUid;

  mInfo("idx: %s start to build drop index req", pIdx->name);

  // First pass with a NULL buffer only computes the encoded size.
  int32_t bodyLen = tSerializeSDropIdxReq(NULL, 0, &req);
  if (bodyLen < 0) {
    terrno = bodyLen;  // report the serializer's error instead of a stale terrno
    return NULL;
  }

  int32_t   len = bodyLen + (int32_t)sizeof(SMsgHead);
  SMsgHead *pHead = taosMemoryCalloc(1, len);
  if (pHead == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pHead->contLen = htonl(len);
  pHead->vgId = htonl(pVgroup->vgId);

  void   *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
  int32_t ret = tSerializeSDropIdxReq(pBuf, bodyLen, &req);
  if (ret < 0) {
    // the message buffer must not outlive a failed serialization
    taosMemoryFree(pHead);
    terrno = ret;
    return NULL;
  }

  *contLen = len;
  return pHead;
}
172
/**
 * Appends one TDMT_VND_DROP_INDEX redo action to pTrans for every vgroup that
 * belongs to pDb. The request body comes from mndBuildDropIdxReq().
 *
 * @return 0 on success; on failure the error from terrno (or
 *         TSDB_CODE_MND_RETURN_VALUE_NULL when terrno is 0) if the request
 *         could not be built, or the code from mndTransAppendRedoAction().
 */
int mndSetDropIdxRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) {
  SSdb   *pSdb = pMnode->pSdb;
  SVgObj *pVg = NULL;
  void   *pCursor = NULL;
  int32_t code = 0;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_VGROUP, pCursor, (void **)&pVg);
    if (pCursor == NULL) break;

    if (mndVgroupInDb(pVg, pDb->uid)) {
      int32_t reqLen;
      void   *pBody = mndBuildDropIdxReq(pMnode, pVg, pStb, pIdx, &reqLen);
      if (pBody == NULL) {
        // stop the iteration and drop the vgroup reference before bailing out
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pVg);
        code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
        return code;
      }

      STransAction action = {0};
      action.epSet = mndGetVgroupEpset(pMnode, pVg);
      action.pCont = pBody;
      action.contLen = reqLen;
      action.msgType = TDMT_VND_DROP_INDEX;

      code = mndTransAppendRedoAction(pTrans, &action);
      if (code != 0) {
        // the body was not appended, so it is released here
        taosMemoryFree(pBody);
        sdbCancelFetch(pSdb, pCursor);
        sdbRelease(pSdb, pVg);
        return code;
      }
    }

    sdbRelease(pSdb, pVg);
  }

  TAOS_RETURN(code);
}
212

213
/** Counterpart of mndInitIdx(); this module currently has nothing to tear down. */
void mndCleanupIdx(SMnode *pMnode) {
  (void)pMnode;
}
217

218
/**
 * Serializes an index object into a freshly allocated SDB_IDX raw record.
 *
 * Fields are written in a fixed order (name, stb, db, dstTbName, colName,
 * createdTime, uid, stbUid, dbUid, reserved area); mndIdxActionDecode() reads
 * them back in the same order.
 *
 * @return the raw record, or NULL on failure with terrno left non-zero.
 */
static SSdbRaw *mndIdxActionEncode(SIdxObj *pIdx) {
  // `code` and `lino` are kept in scope for the SDB_SET_* macros.
  int32_t code = 0;
  int32_t lino = 0;

  // Pessimistic default: terrno is cleared only after every field was written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  rawSize = sizeof(SIdxObj) + TSDB_IDX_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_IDX, TSDB_IDX_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_BINARY(pRaw, dataPos, pIdx->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pIdx->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pIdx->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pIdx->dstTbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pIdx->colName, TSDB_COL_NAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pIdx->createdTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pIdx->uid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pIdx->stbUid, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pIdx->dbUid, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_IDX_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("idx:%s, encode to raw:%p, row:%p", pIdx->name, pRaw, pIdx);
    return pRaw;
  }

  mError("idx:%s, failed to encode to raw:%p since %s", pIdx->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
256

257
/**
 * Rebuilds an index row from a raw SDB_IDX record.
 *
 * Only records whose soft version equals TSDB_IDX_VER_NUMBER are accepted;
 * other versions fail with TSDB_CODE_SDB_INVALID_DATA_VER. Fields are read in
 * the order mndIdxActionEncode() writes them.
 *
 * @return the decoded row, or NULL on failure with terrno left non-zero.
 */
static SSdbRow *mndIdxActionDecode(SSdbRaw *pRaw) {
  // `code` and `lino` are kept in scope for the SDB_GET_* macros.
  int32_t  code = 0;
  int32_t  lino = 0;
  SSdbRow *pRow = NULL;
  SIdxObj *pIdx = NULL;
  int8_t   sver = 0;

  // Pessimistic default: terrno is cleared only after every field was read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != TSDB_IDX_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SIdxObj));
  if (pRow == NULL) goto _OVER;

  pIdx = sdbGetRowObj(pRow);
  if (pIdx == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_BINARY(pRaw, dataPos, pIdx->name, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pIdx->stb, TSDB_TABLE_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pIdx->db, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pIdx->dstTbName, TSDB_DB_FNAME_LEN, _OVER)
  SDB_GET_BINARY(pRaw, dataPos, pIdx->colName, TSDB_COL_NAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pIdx->createdTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pIdx->uid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pIdx->stbUid, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pIdx->dbUid, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_IDX_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("idx:%s, decode from raw:%p, row:%p", pIdx->name, pRaw, pIdx);
    return pRow;
  }

  taosMemoryFree(pRow);
  return NULL;
}
303

304
/** sdb insert callback for index rows: only traces the event and reports success. */
static int32_t mndIdxActionInsert(SSdb *pSdb, SIdxObj *pIdx) {
  (void)pSdb;
  mTrace("idx:%s, perform insert action, row:%p", pIdx->name, pIdx);
  return 0;
}
308

309
/** sdb delete callback for index rows: only traces the event and reports success. */
static int32_t mndIdxActionDelete(SSdb *pSdb, SIdxObj *pIdx) {
  (void)pSdb;
  mTrace("idx:%s, perform delete action, row:%p", pIdx->name, pIdx);
  return 0;
}
313

314
/**
 * sdb update callback for index rows. The only field carried over from the
 * new row is colName, and only when it differs from the old one.
 *
 * NOTE(review): no lock is taken around the copy; confirm whether the sdb
 * already serializes update callbacks.
 *
 * @return always 0.
 */
static int32_t mndIdxActionUpdate(SSdb *pSdb, SIdxObj *pOld, SIdxObj *pNew) {
  int colNameChanged = strncmp(pOld->colName, pNew->colName, TSDB_COL_NAME_LEN) != 0;
  if (colNameChanged) {
    memcpy(pOld->colName, pNew->colName, sizeof(pNew->colName));
  }

  mTrace("idx:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
  return 0;
}
322

323
/**
 * Acquires the index object stored under idxName in the sdb.
 *
 * When the sdb reports TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_TAG_INDEX_NOT_EXIST; other errors are left as they are.
 *
 * @return the acquired object (release it with mndReleaseIdx()), or NULL.
 */
SIdxObj *mndAcquireIdx(SMnode *pMnode, char *idxName) {
  SIdxObj *pIdx = sdbAcquire(pMnode->pSdb, SDB_IDX, idxName);
  if (pIdx != NULL) {
    return pIdx;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_TAG_INDEX_NOT_EXIST;
  }
  return NULL;
}
331

332
/** Hands an index object back to the sdb via sdbRelease(). */
void mndReleaseIdx(SMnode *pMnode, SIdxObj *pIdx) {
  sdbRelease(pMnode->pSdb, pIdx);
}
336

337
/**
 * Acquires the database that a fully qualified index name belongs to.
 *
 * The name is parsed as acct.db.table; the full db name derived from it is
 * then passed to mndAcquireDb().
 *
 * @return the db object, or NULL when parsing fails (terrno holds the parse
 *         result) or when mndAcquireDb() returns NULL.
 */
SDbObj *mndAcquireDbByIdx(SMnode *pMnode, const char *idxName) {
  SName parsed = {0};

  terrno = tNameFromString(&parsed, idxName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (terrno < 0) {
    return NULL;
  }

  char dbFName[TSDB_TABLE_FNAME_LEN] = {0};
  (void)tNameGetFullDbName(&parsed, dbFName);

  return mndAcquireDb(pMnode, dbFName);
}
346

347
/**
 * Encodes pIdx and appends it to pTrans as a prepare log with status
 * SDB_STATUS_CREATING.
 *
 * @return 0 on success, otherwise the error code of the failing step
 *         (terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding failed
 *         without setting terrno).
 */
int32_t mndSetCreateIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // propagate the real error code instead of a bare -1
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
    // the raw was not appended, so it is freed here (same as mndSetAlterIdxPrepareLogs)
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));

  TAOS_RETURN(code);
}
360

361
/**
 * Encodes pIdx and appends it to pTrans as a commit log with status
 * SDB_STATUS_READY.
 *
 * @return 0 on success, otherwise the error code of the failing step
 *         (terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding failed
 *         without setting terrno).
 */
int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndIdxActionEncode(pIdx);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // propagate the real error code instead of a bare -1
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    // the raw was not appended, so it is freed here (same as mndSetAlterIdxCommitLogs)
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
374

375
/**
 * Encodes pIdx and appends it to pTrans as a prepare log with status
 * SDB_STATUS_READY (used when an existing index is altered).
 *
 * @return 0 on success, otherwise the error code of the failing step
 *         (terrno, or TSDB_CODE_MND_RETURN_VALUE_NULL when encoding failed
 *         without setting terrno).
 */
int32_t mndSetAlterIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    // propagate the real error code instead of a bare -1
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
    // the raw was not appended, so it is freed here
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
391

392
/**
 * Encodes pIdx and appends it to pTrans as a commit log with status
 * SDB_STATUS_READY (used when an existing index is altered).
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t mndSetAlterIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
  int32_t  code = 0;
  SSdbRaw *pEncoded = mndIdxActionEncode(pIdx);

  if (pEncoded == NULL) {
    // fall back to a generic code when the encoder left terrno at 0
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pEncoded);
  if (code != 0) {
    // the raw was not appended, so it is freed here
    sdbFreeRaw(pEncoded);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pEncoded, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
408

409
/**
 * Encodes pVgroup and appends it to pTrans as a redo log with status
 * SDB_STATUS_CREATING.
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndSetCreateIdxVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendRedolog(pTrans, pVgRaw)) != 0) {
    // the raw was not appended, so it is freed here (same as the mndSetAlterIdx* helpers)
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(code);
}
421

422
/**
 * Encodes pVgroup and appends it to pTrans as a commit log with status
 * SDB_STATUS_READY.
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
static int32_t mndSetCreateIdxVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
  int32_t  code = 0;
  SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
  if (pVgRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pVgRaw)) != 0) {
    // the raw was not appended, so it is freed here (same as the mndSetAlterIdx* helpers)
    sdbFreeRaw(pVgRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pVgRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
434

435
// static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
436
//   SStbObj stbObj = {0};
437
//   taosRLockLatch(&pStb->lock);
438
//   memcpy(&stbObj, pStb, sizeof(SStbObj));
439
//   taosRUnLockLatch(&pStb->lock);
440
//   stbObj.numOfColumns = 0;
441
//   stbObj.pColumns = NULL;
442
//   stbObj.numOfTags = 0;
443
//  stbObj.pTags = NULL;
444
//   stbObj.numOfFuncs = 0;
445
//   stbObj.pFuncs = NULL;
446
//   stbObj.updateTime = taosGetTimestampMs();
447
//   stbObj.lock = 0;
448
//   stbObj.tagVer++;
449

450
// SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj);
451
// if (pCommitRaw == NULL) return -1;
452
// if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
453
// if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
454
//
455
// return 0;
456
//}
457

458
/** Placeholder destructor: an SIdxObj currently needs no cleanup here. */
static void mndDestroyIdxObj(SIdxObj *pIdxObj) {
  (void)pIdxObj;
}
463

464
/**
 * Handler for TDMT_MND_CREATE_INDEX.
 *
 * Deserializes the request, acquires the db and super table it refers to,
 * rejects the request when an index of that name already exists, checks the
 * caller's MND_OPER_WRITE_DB privilege and then hands over to mndAddIndex().
 *
 * Every exit after the objects were acquired goes through _OVER so that the
 * stb, index and db references are released on all paths.
 *
 * @return TSDB_CODE_ACTION_IN_PROGRESS when mndAddIndex() succeeded,
 *         otherwise an error code.
 */
static int32_t mndProcessCreateIdxReq(SRpcMsg *pReq) {
  SMnode            *pMnode = pReq->info.node;
  int32_t            code = -1;
  SStbObj           *pStb = NULL;
  SIdxObj           *pIdx = NULL;
  SDbObj            *pDb = NULL;
  SCreateTagIndexReq createReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSCreateTagIdxReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

  mInfo("idx:%s start to create", createReq.idxName);

  pDb = mndAcquireDbByStb(pMnode, createReq.stbName);
  if (pDb == NULL) {
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  pStb = mndAcquireStb(pMnode, createReq.stbName);
  if (pStb == NULL) {
    mError("idx:%s, failed to create since stb:%s not exist", createReq.idxName, createReq.stbName);
    // NOTE(review): the fallback is a db-level code although the stb is missing; confirm intended code.
    code = TSDB_CODE_MND_DB_NOT_EXIST;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  SSIdx idx = {0};
  if ((code = mndAcquireGlobalIdx(pMnode, createReq.idxName, SDB_IDX, &idx)) != 0) {
    goto _OVER;
  }
  pIdx = idx.pIdx;
  if (pIdx != NULL) {
    code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndAddIndex(pMnode, pReq, &createReq, pDb, pStb);
  if (terrno == TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST || terrno == TSDB_CODE_MND_TAG_NOT_EXIST) {
    // Same return value as before, but routed through the cleanup path:
    // returning directly from here left pStb and pDb acquired.
    code = terrno;
    goto _OVER;
  }
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("idx:%s, failed to create since %s", createReq.idxName, tstrerror(code));
  }

  mndReleaseStb(pMnode, pStb);
  mndReleaseIdx(pMnode, pIdx);
  mndReleaseDb(pMnode, pDb);

  TAOS_RETURN(code);
}
525

526
/**
 * Encodes pIdx and appends it to pTrans as a prepare log with status
 * SDB_STATUS_DROPPING.
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t mndSetDropIdxPrepareLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndIdxActionEncode(pIdx);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) {
    // the raw was not appended, so it is freed here (same as mndSetAlterIdxPrepareLogs)
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(code);
}
539

540
/**
 * Encodes pIdx and appends it to pTrans as a commit log with status
 * SDB_STATUS_DROPPED.
 *
 * @return 0 on success, otherwise the error code of the failing step.
 */
int32_t mndSetDropIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndIdxActionEncode(pIdx);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    // the raw was not appended, so it is freed here (same as mndSetAlterIdxCommitLogs)
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
553

554
/** Stub handler for "get table index" requests: does nothing and returns 0. */
static int32_t mndProcessGetTbIdxReq(SRpcMsg *pReq) {
  (void)pReq;
  return 0;
}
558

559
/**
 * Fills pBlock with up to `rows` tag-index rows for a show/retrieve request.
 *
 * When pShow->db is non-empty only indexes whose dbUid matches that database
 * are emitted. The scan position is kept in the SSmaAndTagIter stored in
 * pShow->pIter, so repeated calls continue where the previous one stopped.
 *
 * Columns written per row, in order: index name, db name, stable name,
 * a NULL column, created time, column name, and the literal "tag_index".
 *
 * @return the number of rows written in this call (also added to
 *         pShow->numOfRows). An error while building a row ends the scan
 *         and is logged; rows written so far are still counted.
 */
int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  SSdb    *pSdb = pMnode->pSdb;
  SIdxObj *pIdx = NULL;
  SDbObj  *pDb = NULL;
  int32_t  numOfRows = 0;
  int32_t  code = 0;
  int32_t  lino = 0;

  if (strlen(pShow->db) > 0) {
    pDb = mndAcquireDb(pMnode, pShow->db);
    if (pDb == NULL) return 0;
  }

  SSmaAndTagIter *pIter = pShow->pIter;
  while (numOfRows < rows) {
    pIter->pIdxIter = sdbFetch(pSdb, SDB_IDX, pIter->pIdxIter, (void **)&pIdx);
    if (pIter->pIdxIter == NULL) break;

    // skip indexes that live in another database
    if (pDb != NULL && pIdx->dbUid != pDb->uid) {
      sdbRelease(pSdb, pIdx);
      continue;
    }

    int32_t colIdx = 0;

    SName idxName = {0};
    code = tNameFromString(&idxName, pIdx->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code != 0) {
      sdbRelease(pSdb, pIdx);
      goto _OVER;
    }
    char idxNameVar[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(idxNameVar, (char *)tNameGetTableName(&idxName));

    char dbNameVar[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(dbNameVar, (char *)mndGetDbStr(pIdx->db));

    SName stbName = {0};
    code = tNameFromString(&stbName, pIdx->stb, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
    if (code != 0) {
      sdbRelease(pSdb, pIdx);
      goto _OVER;
    }
    char stbNameVar[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(stbNameVar, (char *)tNameGetTableName(&stbName));

    // index name
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)idxNameVar, false), pIdx, &lino, _OVER);

    // db name
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)dbNameVar, false), pIdx, &lino, _OVER);

    // super table name
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbNameVar, false), pIdx, &lino, _OVER);

    // this column is always written as NULL for tag indexes
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, NULL, true), pIdx, &lino, _OVER);

    // created time
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pIdx->createdTime, false), pIdx, &lino,
                        _OVER);

    // indexed column name
    char colNameVar[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(colNameVar, (char *)pIdx->colName);
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)colNameVar, false), pIdx, &lino, _OVER);

    // index type, always the literal "tag_index"
    pColInfo = taosArrayGet(pBlock->pDataBlock, colIdx++);
    char typeVar[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(typeVar, (char *)"tag_index");
    RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)typeVar, false), pIdx, &lino, _OVER);

    numOfRows++;
    sdbRelease(pSdb, pIdx);
  }

_OVER:
  if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));

  mndReleaseDb(pMnode, pDb);
  pShow->numOfRows += numOfRows;
  return numOfRows;
}
645

646
// static void mndCancelGetNextIdx(SMnode *pMnode, void *pIter) {
647
//   SSdb *pSdb = pMnode->pSdb;
648
//
649
//  sdbCancelFetch(pSdb, pIter);
650
//}
651
/** Request validation hook; not implemented yet, so every request is accepted. */
static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
  (void)pReq;
  return TSDB_CODE_SUCCESS;
}
655

656
/**
 * Prepares an updated copy of a super table with the index flag of one tag
 * switched on or off, and appends that copy to pTrans as a commit log.
 *
 * pNew is first filled with a copy of pOld taken under pOld's read latch;
 * its schema arrays are then allocated separately by mndAllocStbSchemas().
 * Those arrays stay allocated when this function returns, including on error
 * after the allocation, so the caller must free them (mndAddIndexImpl does
 * this in its cleanup path).
 *
 * @param tagName tag whose index flag is changed.
 * @param on      1 to switch the index on, any other value to switch it off.
 * @return 0 on success;
 *         TSDB_CODE_MND_TAG_NOT_EXIST when the tag is unknown;
 *         TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST when switching on a tag other
 *         than the first one that already has an index;
 *         TSDB_CODE_MND_SMA_NOT_EXIST when switching off a tag that has none;
 *         otherwise the error code of the failing step.
 */
static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pOld, SStbObj *pNew, char *tagName,
                                            int on) {
  int32_t code = 0;

  taosRLockLatch(&pOld->lock);
  memcpy(pNew, pOld, sizeof(SStbObj));
  taosRUnLockLatch(&pOld->lock);

  // The memcpy copied pOld's array pointers; clear them so pNew never aliases pOld's schema memory.
  pNew->pTags = NULL;
  pNew->pColumns = NULL;
  pNew->pCmpr = NULL;
  pNew->pExtSchemas = NULL;
  pNew->updateTime = taosGetTimestampMs();
  pNew->lock = 0;

  int8_t  hasIdx = 0;
  int32_t tag = mndFindSuperTableTagId(pOld, tagName, &hasIdx);
  if (tag < 0) {
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(mndAllocStbSchemas(pOld, pNew));
  SSchema *pTag = pNew->pTags + tag;

  if (on == 1) {
    // An existing index is only an error for tags other than the first one.
    if (hasIdx && tag != 0) {
      code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
      TAOS_RETURN(code);
    }
    SSCHMEA_SET_IDX_ON(pTag);
  } else {
    if (hasIdx == 0) {
      // Nothing to switch off: fail before any commit log is appended.
      code = TSDB_CODE_MND_SMA_NOT_EXIST;
      TAOS_RETURN(code);
    }
    SSCHMEA_SET_IDX_OFF(pTag);
    // NOTE(review): this clears every flag of the tag, not only the index bit; confirm intended.
    pTag->flags = 0;
  }
  pNew->tagVer++;

  SSdbRaw *pCommitRaw = mndStbActionEncode(pNew);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }
  if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
    // the raw was not appended, so it is freed here
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));

  TAOS_RETURN(code);
}
708
/**
 * Creates and prepares the "create-stb-index" transaction for a new tag index.
 *
 * Steps, in order: create the transaction (retry policy, db-inside conflict
 * scope), check for conflicts, mark it serial, append the index prepare and
 * commit logs, append the updated super table commit log (index flag on),
 * append the per-vgroup redo actions, and prepare the transaction.
 *
 * @return 0 when the transaction was prepared, -1 when it could not be
 *         created, otherwise the error code of the failing step.
 */
int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, SIdxObj *pIdx) {
  int32_t code = -1;
  SStbObj stbCopy = {0};  // updated super table; its schema arrays are owned by this function

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "create-stb-index");
  if (pTrans == NULL) goto _OVER;

  mndTransSetDbName(pTrans, pDb->name, pStb->name);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);

  TAOS_CHECK_GOTO(mndSetCreateIdxPrepareLogs(pMnode, pTrans, pIdx), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateIdxCommitLogs(pMnode, pTrans, pIdx), NULL, _OVER);

  // The vnode request is built from the updated copy, not from the original stb.
  TAOS_CHECK_GOTO(mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &stbCopy, pIdx->colName, 1), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateIdxRedoActions(pMnode, pTrans, pDb, &stbCopy, pIdx), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  // pTags is non-NULL only after the schema arrays of the copy were allocated.
  if (stbCopy.pTags != NULL) {
    taosMemoryFree(stbCopy.pTags);
    taosMemoryFree(stbCopy.pColumns);
    taosMemoryFree(stbCopy.pCmpr);
    taosMemoryFreeClear(stbCopy.pExtSchemas);
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
742
/**
 * Reports whether the sdb already holds an index on the same super table
 * (stbUid) and the same column (colName) as pIdxObj.
 *
 * When pIdxObj->db is non-empty the scan is limited to indexes of that
 * database; if that database cannot be acquired the function returns 0.
 *
 * @return 1 when such an index exists, 0 otherwise.
 */
int8_t mndCheckIndexNameByTagName(SMnode *pMnode, SIdxObj *pIdxObj) {
  SSdb    *pSdb = pMnode->pSdb;
  SDbObj  *pDb = NULL;
  SIdxObj *pCur = NULL;
  void    *pCursor = NULL;
  int8_t   found = 0;

  if (strlen(pIdxObj->db) > 0) {
    pDb = mndAcquireDb(pMnode, pIdxObj->db);
    if (pDb == NULL) return 0;
  }

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_IDX, pCursor, (void **)&pCur);
    if (pCursor == NULL) break;

    int sameDb = (pDb == NULL) || (pCur->dbUid == pDb->uid);
    if (sameDb && pIdxObj->stbUid == pCur->stbUid &&
        strncmp(pIdxObj->colName, pCur->colName, TSDB_COL_NAME_LEN) == 0) {
      // match: end the iteration early and drop the row reference
      sdbCancelFetch(pSdb, pCursor);
      sdbRelease(pSdb, pCur);
      found = 1;
      break;
    }

    sdbRelease(pSdb, pCur);
  }

  mndReleaseDb(pMnode, pDb);
  return found;
}
778
/*
 * Build an index object from a create-tag-index request and hand it to
 * mndAddIndexImpl().
 *
 * Fails with TSDB_CODE_MND_TAG_NOT_EXIST when mndFindSuperTableTagId() returns
 * a negative position for req->colName, and with
 * TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST when the tag is reported as already
 * indexed (for position 0 only if mndCheckIndexNameByTagName() also finds an
 * index object). Otherwise returns the result of mndAddIndexImpl().
 */
static int32_t mndAddIndex(SMnode *pMnode, SRpcMsg *pReq, SCreateTagIndexReq *req, SDbObj *pDb, SStbObj *pStb) {
  int32_t code = -1;

  SIdxObj idxObj = {0};
  memcpy(idxObj.name, req->idxName, TSDB_INDEX_FNAME_LEN);
  memcpy(idxObj.stb, pStb->name, TSDB_TABLE_FNAME_LEN);
  memcpy(idxObj.db, pDb->name, TSDB_DB_FNAME_LEN);
  memcpy(idxObj.colName, req->colName, TSDB_COL_NAME_LEN);
  idxObj.createdTime = taosGetTimestampMs();
  idxObj.uid = mndGenerateUid(req->idxName, strlen(req->idxName));
  idxObj.stbUid = pStb->uid;
  idxObj.dbUid = pStb->dbUid;

  int8_t  alreadyIndexed = 0;
  int32_t tagPos = mndFindSuperTableTagId(pStb, req->colName, &alreadyIndexed);
  if (tagPos < 0) {
    code = TSDB_CODE_MND_TAG_NOT_EXIST;
    TAOS_RETURN(code);
  }

  if (alreadyIndexed == 1) {
    // For tag position 0 the flag alone is not decisive: the request is only
    // rejected if an index object for that column really exists.
    if (tagPos != 0 || mndCheckIndexNameByTagName(pMnode, &idxObj)) {
      code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST;
      TAOS_RETURN(code);
    }
  }

  code = mndAddIndexImpl(pMnode, pReq, pDb, pStb, &idxObj);
  TAOS_RETURN(code);
}
812

813
/*
 * Create and prepare the "drop-index" transaction for pIdx.
 *
 * Acquires the super table named by pIdx->stb, creates a serial transaction
 * (TRN_POLICY_RETRY / TRN_CONFLICT_DB), appends the drop-index prepare and
 * commit logs, the super-table update commit log and the drop-index redo
 * actions, then calls mndTransPrepare().
 *
 * Returns 0 on success. If the super table or the transaction cannot be
 * obtained, returns terrno when it is set and
 * TSDB_CODE_MND_RETURN_VALUE_NULL otherwise; any failing step returns that
 * step's error code.
 */
static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *pIdx) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  SStbObj stbCopy = {0};  // filled by mndSetUpdateIdxStbCommitLogs(); its arrays are freed below

  SStbObj *pStb = mndAcquireStb(pMnode, pIdx->stb);
  if (pStb == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-index");
  if (pTrans == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }

  mInfo("trans:%d, used to drop idx:%s", pTrans->id, pIdx->name);
  mndTransSetDbName(pTrans, pDb->name, pStb->name);
  TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);

  mndTransSetSerial(pTrans);

  // Raw logs for the index object itself.
  TAOS_CHECK_GOTO(mndSetDropIdxPrepareLogs(pMnode, pTrans, pIdx), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx), NULL, _OVER);

  // Updated super table, followed by the redo actions that use it.
  TAOS_CHECK_GOTO(mndSetUpdateIdxStbCommitLogs(pMnode, pTrans, pStb, &stbCopy, pIdx->colName, 0), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropIdxRedoActions(pMnode, pTrans, pDb, &stbCopy, pIdx), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  taosMemoryFree(stbCopy.pTags);
  taosMemoryFree(stbCopy.pColumns);
  taosMemoryFree(stbCopy.pCmpr);
  taosMemoryFreeClear(stbCopy.pExtSchemas);

  mndTransDrop(pTrans);
  mndReleaseStb(pMnode, pStb);
  TAOS_RETURN(code);
}
858
/*
 * Handle a drop-tag-index request.
 *
 * Deserializes the request, looks the index up by name, acquires the owning
 * database, checks MND_OPER_WRITE_DB privilege for the requesting user and
 * then calls mndDropIdx().
 *
 * Returns TSDB_CODE_ACTION_IN_PROGRESS when mndDropIdx() succeeds, 0 when the
 * index does not exist and req.igNotExists is set,
 * TSDB_CODE_MND_TAG_INDEX_NOT_EXIST when it does not exist otherwise, or the
 * error code of the failing step.
 */
int32_t mndProcessDropTagIdxReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = -1;
  SDbObj          *pDb = NULL;
  SIdxObj         *pIdx = NULL;
  SDropTagIndexReq req = {0};
  SSIdx            idx = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropTagIdxReq(pReq->pCont, pReq->contLen, &req), NULL, _OVER);
  mInfo("idx:%s, start to drop", req.name);

  code = mndAcquireGlobalIdx(pMnode, req.name, SDB_IDX, &idx);
  if (code != 0) {
    goto _OVER;
  }
  pIdx = idx.pIdx;

  if (pIdx == NULL) {
    // The lookup succeeded but produced no index object.
    if (req.igNotExists) {
      mInfo("idx:%s, not exist, ignore not exist is set", req.name);
      code = 0;
    } else {
      code = TSDB_CODE_MND_TAG_INDEX_NOT_EXIST;
    }
    goto _OVER;
  }

  pDb = mndAcquireDbByIdx(pMnode, req.name);
  if (pDb == NULL) {
    terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
    if (terrno != 0) {
      code = terrno;
    }
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb), NULL, _OVER);

  code = mndDropIdx(pMnode, pReq, pDb, pIdx);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  // Success (0) and "in progress" are not failures; everything else is logged.
  if (!(code == 0 || code == TSDB_CODE_ACTION_IN_PROGRESS)) {
    mError("idx:%s, failed to drop since %s", req.name, tstrerror(code));
  }
  mndReleaseIdx(pMnode, pIdx);
  mndReleaseDb(pMnode, pDb);
  TAOS_RETURN(code);
}
UNCOV
904
/*
 * Placeholder handler: ignores the request and always returns 0.
 */
static int32_t mndProcessGetIdxReq(SRpcMsg *pReq) {
  (void)pReq;  // intentionally unused
  return 0;
}
908

909
/*
 * Append a drop-index commit log to pTrans for every SDB_IDX entry whose
 * stbUid equals pStb->uid.
 *
 * pDb is not used by this function.
 *
 * Returns 0 when all matching entries were processed, or the first non-zero
 * code returned by mndSetDropIdxCommitLogs(); in that case the iteration is
 * cancelled and the current entry released before returning.
 */
int32_t mndDropIdxsByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  code = 0;
  void    *pIter = NULL;
  SIdxObj *pIdx = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_IDX, pIter, (void **)&pIdx)) != NULL) {
    if (pIdx->stbUid == pStb->uid) {
      code = mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx);
      if (code != 0) {
        sdbCancelFetch(pSdb, pIter);
        sdbRelease(pSdb, pIdx);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pIdx);
    pIdx = NULL;  // start the next fetch from a clean pointer
  }

  TAOS_RETURN(code);
}
932

933
/*
 * Find the index object of super table pStb whose colName equals tagName
 * (compared with taosStrcasecmp()).
 *
 * On a match the object is copied byte-for-byte into *idx and 0 is returned.
 * Returns -1 when no SDB_IDX entry matches; *idx is not written in that case.
 */
int32_t mndGetIdxsByTagName(SMnode *pMnode, SStbObj *pStb, char *tagName, SIdxObj *idx) {
  SSdb    *pSdb = pMnode->pSdb;
  void    *pIter = NULL;
  SIdxObj *pCand = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_IDX, pIter, (void **)&pCand)) != NULL) {
    int hit = (pCand->stbUid == pStb->uid) && (taosStrcasecmp(pCand->colName, tagName) == 0);
    if (hit) {
      // Copy while the reference is still held.
      memcpy(idx, pCand, sizeof(SIdxObj));
    }

    sdbRelease(pSdb, pCand);

    if (hit) {
      sdbCancelFetch(pSdb, pIter);
      return 0;
    }
    pCand = NULL;
  }

  return -1;
}
954
/*
 * Append a drop-index commit log to pTrans for every SDB_IDX entry whose
 * dbUid equals pDb->uid.
 *
 * Returns 0 when all matching entries were processed, or the first non-zero
 * code returned by mndSetDropIdxCommitLogs(); in that case the current entry
 * is released and the iteration cancelled before returning.
 */
int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
  SSdb    *pSdb = pMnode->pSdb;
  int32_t  code = 0;
  void    *pIter = NULL;
  SIdxObj *pIdx = NULL;

  while ((pIter = sdbFetch(pSdb, SDB_IDX, pIter, (void **)&pIdx)) != NULL) {
    if (pIdx->dbUid == pDb->uid) {
      code = mndSetDropIdxCommitLogs(pMnode, pTrans, pIdx);
      if (code != 0) {
        sdbRelease(pSdb, pIdx);
        sdbCancelFetch(pSdb, pIter);
        TAOS_RETURN(code);
      }
    }

    sdbRelease(pSdb, pIdx);
    pIdx = NULL;  // start the next fetch from a clean pointer
  }

  TAOS_RETURN(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc