• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5082

17 May 2026 01:15AM UTC coverage: 73.328% (-0.04%) from 73.368%
#5082

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281429 of 383795 relevant lines covered (73.33%)

133921458.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.11
/source/dnode/mnode/impl/src/mndTopic.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *f
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndTopic.h"
17
#include "audit.h"
18
#include "mndConsumer.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndMnode.h"
22
#include "mndPrivilege.h"
23
#include "mndShow.h"
24
#include "mndStb.h"
25
#include "mndSubscribe.h"
26
#include "mndTrans.h"
27
#include "mndUser.h"
28
#include "mndVgroup.h"
29
#include "osMemPool.h"
30
#include "parser.h"
31
#include "tlockfree.h"
32
#include "tname.h"
33

34
#define MND_TOPIC_VER_SUPPORT_OWNER 4
35
#define MND_TOPIC_VER_NUMBER        MND_TOPIC_VER_SUPPORT_OWNER
36
#define MND_TOPIC_RESERVE_SIZE      64
37

38
SHashObj *topicsToReload = NULL;
39

40
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
41
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
42

43
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
44
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
45
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic);
46
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
47
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
48

49
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
50
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
51
static int32_t processAst(SMqTopicObj *topicObj, const char *ast);
52

53
int32_t mndInitTopic(SMnode *pMnode) {
527,025✔
54
  SSdbTable table = {
527,025✔
55
      .sdbType = SDB_TOPIC,
56
      .keyType = SDB_KEY_BINARY,
57
      .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
58
      .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
59
      .insertFp = (SdbInsertFp)mndTopicActionInsert,
60
      .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
61
      .deleteFp = (SdbDeleteFp)mndTopicActionDelete,
62
  };
63

64
  if (pMnode == NULL) {
527,025✔
65
    return TSDB_CODE_INVALID_PARA;
×
66
  }
67
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CREATE_TOPIC, mndProcessCreateTopicReq);
527,025✔
68
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_TOPIC, mndProcessDropTopicReq);
527,025✔
69

70
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
527,025✔
71
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
527,025✔
72

73
  return sdbSetTable(pMnode->pSdb, table);
527,025✔
74
}
75

76
void mndCleanupTopic(SMnode *pMnode) {}
526,963✔
77

78
void mndTopicGetShowName(const char *fullTopic, char *topic) {
175,462✔
79
  if (fullTopic == NULL) {
175,462✔
80
    return;
×
81
  }
82
  char *tmp = strchr(fullTopic, '.');
175,462✔
83
  if (tmp == NULL) {
175,462✔
84
    tstrncpy(topic, fullTopic, TSDB_TOPIC_FNAME_LEN);
×
85
  } else {
86
    tstrncpy(topic, tmp + 1, TSDB_TOPIC_FNAME_LEN);
175,462✔
87
  }
88
}
89
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
360,568✔
90
  if (pTopic == NULL) {
360,568✔
91
    return NULL;
×
92
  }
93
  int32_t code = 0;
360,568✔
94
  int32_t lino = 0;
360,568✔
95
  terrno = TSDB_CODE_OUT_OF_MEMORY;
360,568✔
96

97
  void *  swBuf = NULL;
360,568✔
98
  int32_t physicalPlanLen = 0;
360,568✔
99
  if (pTopic->physicalPlan) {
360,568✔
100
    physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
360,568✔
101
  }
102

103
  int32_t schemaLen = 0;
360,568✔
104
  if (pTopic->schema.nCols) {
360,568✔
105
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
554,326✔
106
  }
107

108
  int32_t  size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
360,568✔
109
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
360,568✔
110
  if (pRaw == NULL) {
360,568✔
111
    goto TOPIC_ENCODE_OVER;
×
112
  }
113

114
  int32_t dataPos = 0;
360,568✔
115
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
360,568✔
116
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
360,568✔
117
  SDB_SET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_ENCODE_OVER);
360,568✔
118
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
360,568✔
119
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
360,568✔
120
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
360,568✔
121
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
360,568✔
122
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
360,568✔
123
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
360,568✔
124
  SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);
360,568✔
125

126
  SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
360,568✔
127
  SDB_SET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER);
360,568✔
128
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
360,568✔
129
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
360,568✔
130
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
360,568✔
131
  if (physicalPlanLen) {
360,568✔
132
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
360,568✔
133
  }
134
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
360,568✔
135
  if (schemaLen) {
360,568✔
136
    swBuf = taosMemoryMalloc(schemaLen);
277,163✔
137
    if (swBuf == NULL) {
277,163✔
138
      goto TOPIC_ENCODE_OVER;
×
139
    }
140
    void *aswBuf = swBuf;
277,163✔
141
    if (taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema) < 0) {
554,326✔
142
      goto TOPIC_ENCODE_OVER;
×
143
    }
144
    SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
277,163✔
145
  }
146
  SDB_SET_INT64(pRaw, dataPos, pTopic->ownerId, TOPIC_ENCODE_OVER); // since ver 4
360,568✔
147
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
360,568✔
148
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
360,568✔
149

150
  terrno = TSDB_CODE_SUCCESS;
360,568✔
151

152
TOPIC_ENCODE_OVER:
360,568✔
153
  if (swBuf) taosMemoryFree(swBuf);
360,568✔
154
  if (terrno != TSDB_CODE_SUCCESS) {
360,568✔
155
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
×
156
    sdbFreeRaw(pRaw);
×
157
    return NULL;
×
158
  }
159

160
  mDebug("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
360,568✔
161
  return pRaw;
360,568✔
162
}
163

164
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
306,549✔
165
  if (pRaw == NULL) return NULL;
306,549✔
166
  int32_t code = 0;
306,549✔
167
  int32_t lino = 0;
306,549✔
168
  terrno = TSDB_CODE_OUT_OF_MEMORY;
306,549✔
169
  SSdbRow *    pRow = NULL;
306,549✔
170
  SMqTopicObj *pTopic = NULL;
306,549✔
171
  void *       buf = NULL;
306,549✔
172
  char*        ast = NULL;
306,549✔
173

174
  int8_t sver = 0;
306,549✔
175
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
306,549✔
176

177
  if (sver < 1 || sver > MND_TOPIC_VER_NUMBER) {
306,549✔
178
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
179
    goto TOPIC_DECODE_OVER;
×
180
  }
181

182
  pRow = sdbAllocRow(sizeof(SMqTopicObj));
306,549✔
183
  if (pRow == NULL) goto TOPIC_DECODE_OVER;
306,549✔
184

185
  pTopic = sdbGetRowObj(pRow);
306,549✔
186
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
306,549✔
187

188
  int32_t len = 0;
306,549✔
189
  int32_t dataPos = 0;
306,549✔
190
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
306,549✔
191
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
306,549✔
192
  if (sver >= 2) {
306,549✔
193
    SDB_GET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_DECODE_OVER);
306,549✔
194
  }
195
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
306,549✔
196
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
306,549✔
197
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
306,549✔
198
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
306,549✔
199
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
306,549✔
200
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
306,549✔
201
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
306,549✔
202

203
  SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
306,549✔
204
  if (sver >= 3) {
306,549✔
205
    SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
306,549✔
206
  }
207
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
306,549✔
208
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
306,549✔
209
  if (pTopic->sql == NULL) {
306,549✔
210
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
211
    goto TOPIC_DECODE_OVER;
×
212
  }
213
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
306,549✔
214

215
  if (sver < MND_TOPIC_VER_SUPPORT_OWNER) {
306,549✔
216
    int32_t astLen = 0;
×
217
    SDB_GET_INT32(pRaw, dataPos, &astLen, TOPIC_DECODE_OVER);
×
218
    if (astLen) {
×
219
      ast = taosMemoryCalloc(astLen, sizeof(char));
×
220
      if (ast == NULL) {
×
221
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
222
        goto TOPIC_DECODE_OVER;
×
223
      }
224
      SDB_GET_BINARY(pRaw, dataPos, ast, astLen, TOPIC_DECODE_OVER);
×
225
      terrno = processAst(pTopic, ast);
×
226
      if (terrno != TSDB_CODE_SUCCESS) {
×
227
        goto TOPIC_DECODE_OVER;
×
228
      }
229
    }
230
  } else {
231
    SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
306,549✔
232
    if (len) {
306,549✔
233
      pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
306,549✔
234
      if (pTopic->physicalPlan == NULL) {
306,549✔
235
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
236
        goto TOPIC_DECODE_OVER;
×
237
      }
238
      SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
306,549✔
239
    } else {
240
      pTopic->physicalPlan = NULL;
×
241
    }
242

243
    SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
306,549✔
244
    if (len) {
306,549✔
245
      buf = taosMemoryMalloc(len);
237,998✔
246
      if (buf == NULL) {
237,998✔
247
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
248
        goto TOPIC_DECODE_OVER;
×
249
      }
250
      SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
237,998✔
251
      if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
475,996✔
252
        goto TOPIC_DECODE_OVER;
×
253
      }
254
    } else {
255
      pTopic->schema.nCols = 0;
68,551✔
256
      pTopic->schema.version = 0;
68,551✔
257
      pTopic->schema.pSchema = NULL;
68,551✔
258
    }
259
    SDB_GET_INT64(pRaw, dataPos, &pTopic->ownerId, TOPIC_DECODE_OVER);
306,549✔
260
  }
261

262
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
306,549✔
263
  terrno = TSDB_CODE_SUCCESS;
306,549✔
264

265
TOPIC_DECODE_OVER:
306,549✔
266
  taosMemoryFreeClear(buf);
306,549✔
267
  taosMemoryFreeClear(ast);
306,549✔
268

269
  if (terrno != TSDB_CODE_SUCCESS) {
306,549✔
270
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic == NULL ? "null" : pTopic->name, pRaw, terrstr());
×
271
    taosMemoryFreeClear(pRow);
×
272
    return NULL;
×
273
  }
274

275
  mDebug("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
306,549✔
276
  return pRow;
306,549✔
277
}
278

279
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
182,475✔
280
  mDebug("topic:%s perform insert action", pTopic != NULL ? pTopic->name : "null");
182,475✔
281
  return 0;
182,475✔
282
}
283

284
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
306,549✔
285
  if (pTopic == NULL) return 0;
306,549✔
286
  mDebug("%p topic:%s perform delete action", pTopic, pTopic->name);
306,549✔
287
  taosMemoryFreeClear(pTopic->sql);
306,549✔
288
  taosMemoryFreeClear(pTopic->physicalPlan);
306,549✔
289
  if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
306,549✔
290
  return 0;
306,549✔
291
}
292

293
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
2,485✔
294
  if (pOldTopic == NULL || pNewTopic == NULL) return 0;
2,485✔
295
  mDebug("topic:%s perform update action", pOldTopic->name);
2,485✔
296
  taosWLockLatch(&pOldTopic->lock);
2,485✔
297
  SMqTopicObj tmpTopic = *pOldTopic;
2,485✔
298
  (void)memcpy(pOldTopic, pNewTopic, offsetof(SMqTopicObj, lock));
2,485✔
299
  *pNewTopic = tmpTopic;
2,485✔
300
  taosWUnLockLatch(&pOldTopic->lock);
2,485✔
301
  return 0;
2,485✔
302
}
303

304
int32_t mndAcquireTopic(SMnode *pMnode, const char *topicName, SMqTopicObj **pTopic) {
2,827,622✔
305
  if (pMnode == NULL || topicName == NULL || pTopic == NULL) {
2,827,622✔
306
    return TSDB_CODE_INVALID_PARA;
×
307
  }
308
  SSdb *pSdb = pMnode->pSdb;
2,827,622✔
309
  *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
2,827,622✔
310
  if (*pTopic == NULL) {
2,827,622✔
311
    return TSDB_CODE_MND_TOPIC_NOT_EXIST;
208,701✔
312
  }
313
  return TSDB_CODE_SUCCESS;
2,618,921✔
314
}
315

316
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
3,087,745✔
317
  if (pMnode == NULL) return;
3,087,745✔
318
  SSdb *pSdb = pMnode->pSdb;
3,087,745✔
319
  sdbRelease(pSdb, pTopic);
3,087,745✔
320
}
321

322
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
162,609✔
323
  if (pCreate == NULL) return TSDB_CODE_INVALID_PARA;
162,609✔
324
  if (pCreate->sql == NULL) return TSDB_CODE_MND_INVALID_TOPIC;
162,609✔
325

326
  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
162,609✔
327
    if (pCreate->ast == NULL || pCreate->ast[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
117,796✔
328
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
44,813✔
329
    if (pCreate->subStbName[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
15,109✔
330
  } else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
29,704✔
331
    if (pCreate->subDbName[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
29,704✔
332
  }
333

334
  return 0;
162,609✔
335
}
336

337
static int32_t processAst(SMqTopicObj *topicObj, const char *ast) {
158,819✔
338
  SNode *     pAst = NULL;
158,819✔
339
  SQueryPlan *pPlan = NULL;
158,819✔
340
  int32_t     code = TSDB_CODE_SUCCESS;
158,819✔
341
  int32_t     lino = 0;
158,819✔
342

343
  PRINT_LOG_START
158,819✔
344
  if (ast == NULL) {
158,819✔
345
    topicObj->physicalPlan = taosStrdup("");
36,115✔
346
    goto END;
36,115✔
347
  }
348
  qDebugL("%s topic:%s ast %s", __func__, topicObj->name, ast);
122,704✔
349
  MND_TMQ_RETURN_CHECK(nodesStringToNode(ast, &pAst));
122,704✔
350
  MND_TMQ_RETURN_CHECK(qExtractResultSchema(pAst, &topicObj->schema.nCols, &topicObj->schema.pSchema));
122,704✔
351

352
  SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
122,704✔
353
  MND_TMQ_RETURN_CHECK(qCreateQueryPlan(&cxt, &pPlan, NULL));
122,704✔
354
  if (pPlan == NULL) {
122,704✔
355
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
356
    goto END;
×
357
  }
358
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
122,704✔
359
  if (levelNum != 1) {
122,704✔
360
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
361
    goto END;
×
362
  }
363

364
  SNodeListNode *pNodeListNode = (SNodeListNode *)nodesListGetNode(pPlan->pSubplans, 0);
122,704✔
365
  MND_TMQ_NULL_CHECK(pNodeListNode);
122,704✔
366
  int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
122,704✔
367
  if (opNum != 1) {
122,704✔
368
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
369
    goto END;
×
370
  }
371

372
  code = nodesNodeToString(nodesListGetNode(pNodeListNode->pNodeList, 0), false, &topicObj->physicalPlan, NULL);
122,704✔
373

374
END:
158,819✔
375
  nodesDestroyNode(pAst);
158,819✔
376
  qDestroyQueryPlan(pPlan);
158,819✔
377
  PRINT_LOG_END
158,819✔
378
  return code;
158,819✔
379
}
380

381
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
156,334✔
382
                              SUserObj *pOperUser) {
383
  if (pMnode == NULL || pReq == NULL || pCreate == NULL || pDb == NULL || pOperUser == NULL)
156,334✔
384
    return TSDB_CODE_INVALID_PARA;
×
385
  STrans *    pTrans = NULL;
156,334✔
386
  int32_t     code = 0;
156,334✔
387
  int32_t     lino = 0;
156,334✔
388
  SMqTopicObj topicObj = {0};
156,334✔
389

390
  PRINT_LOG_START
156,334✔
391
  mInfo("start to create topic:%s", pCreate->name);
156,334✔
392
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-topic");
156,334✔
393
  MND_TMQ_NULL_CHECK(pTrans);
156,334✔
394
  mndTransSetDbName(pTrans, pDb->name, NULL);
156,334✔
395
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
156,334✔
396

397
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
156,334✔
398
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
156,334✔
399
  tstrncpy(topicObj.createUser, pOperUser->name, TSDB_USER_LEN);
156,334✔
400
  topicObj.ownerId = pOperUser->uid;
156,334✔
401

402
  // MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_CREATE_TOPIC, &topicObj));
403
  if (pDb) {
156,334✔
404
    // already checked in parser, just check db use privilege here
405
    MND_TMQ_RETURN_CHECK(
156,334✔
406
        mndCheckDbPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_TOPIC, pDb));
407
  }
408

409
  topicObj.createTime = taosGetTimestampMs();
156,334✔
410
  topicObj.updateTime = topicObj.createTime;
156,334✔
411
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
156,334✔
412
  topicObj.dbUid = pDb->uid;
156,334✔
413
  topicObj.version = 1;
156,334✔
414
  topicObj.sql = taosStrdup(pCreate->sql);
156,334✔
415
  MND_TMQ_NULL_CHECK(topicObj.sql);
156,334✔
416
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
156,334✔
417
  topicObj.subType = pCreate->subType;
156,334✔
418
  topicObj.withMeta = pCreate->withMeta;
156,334✔
419
  taosInitRWLatch(&topicObj.lock);
156,334✔
420

421
  MND_TMQ_RETURN_CHECK(processAst(&topicObj, pCreate->ast));
156,334✔
422

423
  if (pCreate->subStbName[0] != 0) {
156,334✔
424
    tstrncpy(topicObj.stbName, pCreate->subStbName, TSDB_TABLE_FNAME_LEN);
116,314✔
425
    SStbObj *pStb = mndAcquireStb(pMnode, topicObj.stbName);
116,314✔
426
    MND_TMQ_NULL_CHECK(pStb);
116,314✔
427
    char stbName[TSDB_TABLE_NAME_LEN] = {0};
115,552✔
428
    mndExtractTbNameFromStbFullName(pStb->name, stbName, TSDB_TABLE_NAME_LEN);
115,552✔
429
    MND_TMQ_RETURN_CHECK(
115,552✔
430
        mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_TOPIC_CREATE, PRIV_OBJ_DB, pStb->ownerId, pDb->name, NULL));
431
    MND_TMQ_RETURN_CHECK(
115,552✔
432
        mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_TBL_SELECT, PRIV_OBJ_TBL, pStb->ownerId, pDb->name, stbName));
433
    topicObj.stbUid = pStb->uid;
115,552✔
434
    mndReleaseStb(pMnode, pStb);
115,552✔
435
  }
436

437
  if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
155,572✔
438
    MND_TMQ_RETURN_CHECK(
28,530✔
439
        mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_TOPIC_CREATE, PRIV_OBJ_DB, pDb->ownerId, pDb->name, NULL));
440
    MND_TMQ_RETURN_CHECK(mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_TBL_SELECT, PRIV_OBJ_TBL, 0, pDb->name, "*"));
28,530✔
441
  } else if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
127,042✔
442
    // TODO: check privilege on table
443
  }
444

445
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
155,572✔
446
  MND_TMQ_NULL_CHECK(pCommitRaw);
155,572✔
447
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
155,572✔
448
  if (code != 0) {
155,572✔
449
    sdbFreeRaw(pCommitRaw);
×
450
    goto END;
×
451
  }
452

453
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
155,572✔
454
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
155,572✔
455

456
END:
156,334✔
457
  PRINT_LOG_END
156,334✔
458
  taosMemoryFreeClear(topicObj.sql);
156,334✔
459
  taosMemoryFreeClear(topicObj.physicalPlan);
156,334✔
460
  if (topicObj.schema.nCols) {
156,334✔
461
    taosMemoryFreeClear(topicObj.schema.pSchema);
120,219✔
462
  }
463
  mndTransDrop(pTrans);
156,334✔
464
  return code;
156,334✔
465
}
466

467
static int32_t mndReloadTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
2,485✔
468
                              const char *userName, SMqTopicObj *topicObjOri) {
469
  if (pMnode == NULL || pReq == NULL || pCreate == NULL || pDb == NULL || userName == NULL)
2,485✔
470
    return TSDB_CODE_INVALID_PARA;
×
471
  STrans *    pTrans = NULL;
2,485✔
472
  int32_t     code = 0;
2,485✔
473
  int32_t     lino = 0;
2,485✔
474
  SMqTopicObj topicObj = {0};
2,485✔
475

476
  PRINT_LOG_START
2,485✔
477
  mInfo("start to reload topic:%s", pCreate->name);
2,485✔
478
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "reload-topic");
2,485✔
479
  MND_TMQ_NULL_CHECK(pTrans);
2,485✔
480
  mndTransSetDbName(pTrans, pDb->name, NULL);
2,485✔
481
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
2,485✔
482

483
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
2,485✔
484
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
2,485✔
485
  tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
2,485✔
486

487
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_TOPIC, &topicObj));
2,485✔
488

489
  taosRLockLatch(&topicObjOri->lock);
2,485✔
490
  topicObj.createTime = topicObjOri->createTime;
2,485✔
491
  topicObj.updateTime = taosGetTimestampMs();
2,485✔
492
  topicObj.uid = topicObjOri->uid;
2,485✔
493
  topicObj.dbUid = pDb->uid;
2,485✔
494
  topicObj.version = topicObjOri->version + 1;
2,485✔
495
  topicObj.sql = taosStrdup(pCreate->sql);
2,485✔
496
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
2,485✔
497
  topicObj.subType = pCreate->subType;
2,485✔
498
  topicObj.withMeta = pCreate->withMeta;
2,485✔
499
  taosInitRWLatch(&topicObj.lock);
2,485✔
500
  taosRUnLockLatch(&topicObjOri->lock);
2,485✔
501

502
  MND_TMQ_RETURN_CHECK(processAst(&topicObj, pCreate->ast));
2,485✔
503

504
  if (pCreate->subStbName[0] != 0) {
2,485✔
505
    tstrncpy(topicObj.stbName, pCreate->subStbName, TSDB_TABLE_FNAME_LEN);
1,420✔
506
    SStbObj *pStb = mndAcquireStb(pMnode, topicObj.stbName);
1,420✔
507
    MND_TMQ_NULL_CHECK(pStb);
1,420✔
508
    topicObj.stbUid = pStb->uid;
1,420✔
509
    mndReleaseStb(pMnode, pStb);
1,420✔
510
  }
511

512
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
2,485✔
513
  MND_TMQ_NULL_CHECK(pCommitRaw);
2,485✔
514
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
2,485✔
515
  if (code != 0) {
2,485✔
516
    sdbFreeRaw(pCommitRaw);
×
517
    goto END;
×
518
  }
519

520
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
2,485✔
521
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
2,485✔
522

523
END:
2,485✔
524
  PRINT_LOG_END
2,485✔
525
  taosMemoryFreeClear(topicObj.sql);
2,485✔
526
  taosMemoryFreeClear(topicObj.physicalPlan);
2,485✔
527
  if (topicObj.schema.nCols) {
2,485✔
528
    taosMemoryFreeClear(topicObj.schema.pSchema);
2,485✔
529
  }
530
  mndTransDrop(pTrans);
2,485✔
531
  return code;
2,485✔
532
}
533

534
static int32_t creatTopic(SRpcMsg *pReq, SCMCreateTopicReq *createTopicReq, SUserObj *pOperUser) {
159,414✔
535
  SMqTopicObj *pTopic = NULL;
159,414✔
536
  SDbObj *     pDb = NULL;
159,414✔
537
  int32_t      code = TSDB_CODE_SUCCESS;
159,414✔
538
  int32_t      lino = 0;
159,414✔
539
  SMnode *     pMnode = pReq->info.node;
159,414✔
540
  int64_t      tss = taosGetTimestampMs();
159,414✔
541

542
  PRINT_LOG_START
159,414✔
543
  mInfo("topic:%s start to create, sql:%s", createTopicReq->name, createTopicReq->sql);
159,414✔
544
  code = mndAcquireTopic(pMnode, createTopicReq->name, &pTopic);
159,414✔
545
  if (code == TSDB_CODE_SUCCESS) {
159,414✔
546
    mndReleaseTopic(pMnode, pTopic);
676✔
547
    if (createTopicReq->igExists) {
676✔
548
      mInfo("topic:%s already exist, ignore exist is set", createTopicReq->name);
676✔
549
    } else {
550
      code = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
×
551
    }
552
    goto END;
676✔
553
  } else if (code != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
158,738✔
554
    goto END;
×
555
  }
556

557
  pDb = mndAcquireDb(pMnode, createTopicReq->subDbName);
158,738✔
558
  MND_TMQ_NULL_CHECK(pDb);
158,738✔
559

560
  if (pDb->cfg.walRetentionPeriod == 0) {
157,976✔
561
    code = TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO;
×
562
    goto END;
×
563
  }
564

565
  if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum) {
157,976✔
566
    code = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE;
1,642✔
567
    goto END;
1,642✔
568
  }
569

570
  MND_TMQ_RETURN_CHECK(grantCheck(TSDB_GRANT_SUBSCRIPTION));
156,334✔
571
  MND_TMQ_RETURN_CHECK(mndCreateTopic(pMnode, pReq, createTopicReq, pDb, pOperUser));
156,334✔
572
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
155,572✔
573
    int64_t tse = taosGetTimestampMs();
155,572✔
574
    double  duration = (double)(tse - tss);
155,572✔
575
    duration = duration / 1000;
155,572✔
576
    auditRecord(pReq, pMnode->clusterId, "createTopic", createTopicReq->subDbName, createTopicReq->name,
155,572✔
577
                createTopicReq->sql, strlen(createTopicReq->sql), duration, 0);
155,572✔
578
  }
579
  code = TSDB_CODE_ACTION_IN_PROGRESS;
155,572✔
580

581
END:
159,414✔
582
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
159,414✔
583
    mError("%s failed, topic:%s since %s", __func__, createTopicReq->name, tstrerror(code));
3,166✔
584
  } else {
585
    mInfo("topic:%s create successfully", createTopicReq->name);
156,248✔
586
  }
587
  mndReleaseDb(pMnode, pDb);
159,414✔
588
  return code;
159,414✔
589
}
590

591
static int32_t reloadTopic(SRpcMsg *pReq, SCMCreateTopicReq *createTopicReq) {
3,195✔
592
  SMnode *     pMnode = pReq->info.node;
3,195✔
593
  int32_t      code = TSDB_CODE_SUCCESS;
3,195✔
594
  int32_t      lino = 0;
3,195✔
595
  SDbObj *     pDb = NULL;
3,195✔
596
  SMqTopicObj *pTopic = NULL;
3,195✔
597
  int64_t      tss = taosGetTimestampMs();
3,195✔
598

599
  PRINT_LOG_START
3,195✔
600
  code = mndAcquireTopic(pMnode, createTopicReq->name, &pTopic);
3,195✔
601
  if (code != 0) {
3,195✔
602
    if (createTopicReq->igExists) {
710✔
603
      mInfo("topic:%s, not exist, ignore not exist is set", createTopicReq->name);
355✔
604
      code = 0;
355✔
605
      goto END;
355✔
606
    } else {
607
      mError("topic:%s, failed to reload since %s", createTopicReq->name, tstrerror(code));
355✔
608
      goto END;
355✔
609
    }
610
  }
611

612
  pDb = mndAcquireDb(pMnode, createTopicReq->subDbName);
2,485✔
613
  MND_TMQ_NULL_CHECK(pDb);
2,485✔
614

615
  MND_TMQ_RETURN_CHECK(grantCheck(TSDB_GRANT_SUBSCRIPTION));
2,485✔
616
  MND_TMQ_RETURN_CHECK(mndReloadTopic(pMnode, pReq, createTopicReq, pDb, RPC_MSG_USER(pReq), pTopic));
2,485✔
617

618
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
2,485✔
619
    int64_t tse = taosGetTimestampMs();
2,485✔
620
    double  duration = (double)(tse - tss);
2,485✔
621
    duration = duration / 1000;
2,485✔
622
    auditRecord(pReq, pMnode->clusterId, "reloadTopic", createTopicReq->subDbName, createTopicReq->name,
2,485✔
623
                createTopicReq->sql, strlen(createTopicReq->sql), duration, 0);
2,485✔
624
  }
625

626
  code = TSDB_CODE_ACTION_IN_PROGRESS;
2,485✔
627

628
  if (topicsToReload == NULL) {
2,485✔
629
    topicsToReload = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
355✔
630
    MND_TMQ_NULL_CHECK(topicsToReload);
355✔
631
  }
632
  MND_TMQ_RETURN_CHECK(
2,485✔
633
      taosHashPut(topicsToReload, createTopicReq->name, strlen(createTopicReq->name), createTopicReq->name, 1));
634
  mInfo("topic:%s, marked to reload", createTopicReq->name);
2,485✔
635

636
END:
3,195✔
637
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
3,195✔
638
    mError("%s failed, topic:%s since %s", __func__, createTopicReq->name, tstrerror(code));
355✔
639
  } else {
640
    mInfo("topic:%s create successfully", createTopicReq->name);
2,840✔
641
  }
642
  mndReleaseTopic(pMnode, pTopic);
3,195✔
643
  mndReleaseDb(pMnode, pDb);
3,195✔
644

645
  return code;
3,195✔
646
}
647

648
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
162,609✔
649
  if (pReq == NULL || pReq->contLen <= 0) {
162,609✔
650
    return TSDB_CODE_INVALID_MSG;
×
651
  }
652
  SMnode *pMnode = pReq->info.node;
162,609✔
653
  SUserObj *pOperUser = NULL;
162,609✔
654
  int32_t code = TSDB_CODE_SUCCESS;
162,609✔
655
  int32_t lino = 0;
162,609✔
656

657
  SCMCreateTopicReq createTopicReq = {0};
162,609✔
658

659
  PRINT_LOG_START
162,609✔
660
  MND_TMQ_RETURN_CHECK(tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq));
162,609✔
661

662
  if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser)) != 0) goto END;
162,609✔
663

664
  mInfo("topic:%s start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
162,609✔
665

666
  MND_TMQ_RETURN_CHECK(mndCheckCreateTopicReq(&createTopicReq));
162,609✔
667

668
  if (createTopicReq.reload) {
162,609✔
669
    MND_TMQ_RETURN_CHECK(reloadTopic(pReq, &createTopicReq));
3,195✔
670
  } else {
671
    MND_TMQ_RETURN_CHECK(creatTopic(pReq, &createTopicReq, pOperUser));
159,414✔
672
  }
673

674
END:
162,495✔
675
  tFreeSCMCreateTopicReq(&createTopicReq);
162,609✔
676
  mndReleaseUser(pMnode, pOperUser);
162,609✔
677
  return code;
162,609✔
678
}
679

680
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
120,943✔
681
  if (pMnode == NULL || pTrans == NULL || pReq == NULL || pTopic == NULL) {
120,943✔
682
    return TSDB_CODE_INVALID_MSG;
×
683
  }
684
  int32_t  code = 0;
120,943✔
685
  int32_t  lino = 0;
120,943✔
686
  SSdbRaw *pCommitRaw = NULL;
120,943✔
687
  PRINT_LOG_START
120,943✔
688
  char topicFName[TSDB_TOPIC_FNAME_LEN + 1] = {0};                       // 1.topic
120,943✔
689
  mndTopicGetShowName(pTopic->name, topicFName);
120,943✔
690
  char topicDbFName[TSDB_DB_NAME_LEN + TSDB_TOPIC_FNAME_LEN + 1] = {0};  // 1.db.topic
120,943✔
691
  (void)snprintf(topicDbFName, sizeof(topicDbFName), "%s.%s", pTopic->db, topicFName);
120,943✔
692
  MND_TMQ_RETURN_CHECK(mndUserRemoveTopic(pMnode, pTrans, topicDbFName));
120,943✔
693
  pCommitRaw = mndTopicActionEncode(pTopic);
120,943✔
694
  MND_TMQ_NULL_CHECK(pCommitRaw);
120,943✔
695
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
120,943✔
696
  if (code != 0) {
120,943✔
697
    sdbFreeRaw(pCommitRaw);
×
698
    goto END;
×
699
  }
700
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
120,943✔
701
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
120,943✔
702

703
END:
120,943✔
704
  PRINT_LOG_END
120,943✔
705
  return code;
120,943✔
706
}
707

708
bool checkTopic(SArray *topics, char *topicName) {
111,372✔
709
  if (topics == NULL || topicName == NULL) {
111,372✔
710
    return false;
×
711
  }
712
  int32_t sz = taosArrayGetSize(topics);
111,372✔
713
  for (int32_t i = 0; i < sz; i++) {
112,123✔
714
    char *name = taosArrayGetP(topics, i);
1,934✔
715
    if (name && strcmp(name, topicName) == 0) {
1,934✔
716
      return true;
1,183✔
717
    }
718
  }
719
  return false;
110,189✔
720
}
721

722
static int32_t checkConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName) {
36,413✔
723
  int32_t         code = 0;
36,413✔
724
  int32_t         lino = 0;
36,413✔
725
  SMqConsumerObj *pConsumerNew = NULL;
36,413✔
726

727
  taosRLockLatch(&pConsumer->lock);
36,413✔
728
  bool found1 = checkTopic(pConsumer->assignedTopics, topicName);
36,413✔
729
  bool found2 = checkTopic(pConsumer->rebRemovedTopics, topicName);
36,413✔
730
  bool found3 = checkTopic(pConsumer->rebNewTopics, topicName);
36,413✔
731
  if (found1 || found2 || found3) {
36,413✔
732
    if (deleteConsumer) {
802✔
733
      MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
802✔
734
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
802✔
735
      tDeleteSMqConsumerObj(pConsumerNew);
802✔
736
      pConsumerNew = NULL;
802✔
737
    } else {
738
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
×
739
             pConsumer->consumerId, pConsumer->cgroup);
740
      code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
×
741
      goto END;
×
742
    }
743
  }
744
END:
36,413✔
745
  taosRUnLockLatch(&pConsumer->lock);
36,413✔
746
  tDeleteSMqConsumerObj(pConsumerNew);
36,413✔
747
  return code;
36,413✔
748
}
749

750
static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName, bool deleteConsumer) {
120,943✔
751
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) {
120,943✔
752
    return TSDB_CODE_INVALID_MSG;
×
753
  }
754
  int32_t         code = 0;
120,943✔
755
  int32_t         lino = 0;
120,943✔
756
  SSdb *          pSdb = pMnode->pSdb;
120,943✔
757
  void *          pIter = NULL;
120,943✔
758
  SMqConsumerObj *pConsumer = NULL;
120,943✔
759

760
  PRINT_LOG_START
120,943✔
761
  while (1) {
762
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
157,356✔
763
    if (pIter == NULL) {
157,356✔
764
      break;
120,943✔
765
    }
766

767
    MND_TMQ_RETURN_CHECK(checkConsumer(pTrans, pConsumer, deleteConsumer, topicName));
36,413✔
768
    sdbRelease(pSdb, pConsumer);
36,413✔
769
  }
770

771
END:
120,943✔
772
  PRINT_LOG_END
120,943✔
773
  sdbRelease(pSdb, pConsumer);
120,943✔
774
  sdbCancelFetch(pSdb, pIter);
120,943✔
775
  return code;
120,943✔
776
}
777

778
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
130,571✔
779
  if (pReq == NULL) {
130,571✔
780
    return TSDB_CODE_INVALID_MSG;
×
781
  }
782
  SMnode *       pMnode = pReq->info.node;
130,571✔
783
  SMDropTopicReq dropReq = {0};
130,571✔
784
  int32_t        code = 0;
130,571✔
785
  int32_t        lino = 0;
130,571✔
786
  SMqTopicObj *  pTopic = NULL;
130,571✔
787
  STrans *       pTrans = NULL;
130,571✔
788
  SUserObj      *pOperUser = NULL;
130,571✔
789
  int64_t        tss = taosGetTimestampMs();
130,571✔
790

791
  PRINT_LOG_START
130,571✔
792
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq));
130,571✔
793

794
  code = mndAcquireTopic(pMnode, dropReq.name, &pTopic);
130,571✔
795
  if (code != 0) {
130,571✔
796
    if (dropReq.igNotExists) {
9,086✔
797
      mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
9,086✔
798
      code = 0;
9,086✔
799
    }
800
    goto END;
9,086✔
801
  }
802
  taosRLockLatch(&pTopic->lock);
121,485✔
803

804
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic");
121,485✔
805
  MND_TMQ_NULL_CHECK(pTrans);
121,485✔
806

807
  mndTransSetDbName(pTrans, pTopic->db, NULL);
121,485✔
808
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
121,485✔
809
  mInfo("trans:%d, used to drop topic:%s, force:%d", pTrans->id, pTopic->name, dropReq.force);
121,485✔
810

811
  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
121,485✔
812

813
  // MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, RPC_MSG_USER(pReq), MND_OPER_DROP_TOPIC, pTopic));
814
  // MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), MND_OPER_READ_DB, pTopic->db));
815
  MND_TMQ_RETURN_CHECK(
121,485✔
816
      mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pTopic->db, true));
817
  MND_TMQ_RETURN_CHECK(mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_DROP, PRIV_OBJ_TOPIC, pTopic->ownerId,
121,317✔
818
                                                pTopic->db, mndGetDbStr(pTopic->name)));
819
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
120,943✔
820
  MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
120,943✔
821
  MND_TMQ_RETURN_CHECK(mndDropTopic(pMnode, pTrans, pReq, pTopic));
120,943✔
822
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
120,943✔
823
    int64_t tse = taosGetTimestampMs();
120,943✔
824
    double  duration = (double)(tse - tss);
120,943✔
825
    duration = duration / 1000;
120,943✔
826
    auditRecord(pReq, pMnode->clusterId, "dropTopic", pTopic->db, dropReq.name, dropReq.sql, dropReq.sqlLen, duration,
120,943✔
827
                0);
828
  }
829

830
  code = TSDB_CODE_ACTION_IN_PROGRESS;
120,943✔
831

832
END:
130,571✔
833
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
130,571✔
834
    mError("%s failed, topic:%s since %s", __func__, dropReq.name, tstrerror(code));
542✔
835
  } else {
836
    mInfo("topic:%s dropped successfully", dropReq.name);
130,029✔
837
  }
838
  if (pTopic != NULL) {
130,571✔
839
    taosRUnLockLatch(&pTopic->lock);
121,485✔
840
  }
841
  mndReleaseTopic(pMnode, pTopic);
130,571✔
842
  mndReleaseUser(pMnode, pOperUser);
130,571✔
843
  mndTransDrop(pTrans);
130,571✔
844
  tFreeSMDropTopicReq(&dropReq);
130,571✔
845
  return code;
130,571✔
846
}
847

848
int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
129,428✔
849
  if (pMnode == NULL || dbName == NULL || pNumOfTopics == NULL) {
129,428✔
850
    return TSDB_CODE_INVALID_MSG;
×
851
  }
852
  *pNumOfTopics = 0;
129,428✔
853

854
  SSdb *  pSdb = pMnode->pSdb;
129,428✔
855
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
129,428✔
856
  if (pDb == NULL) {
129,428✔
857
    return TSDB_CODE_MND_DB_NOT_SELECTED;
×
858
  }
859

860
  int32_t numOfTopics = 0;
129,428✔
861
  void *  pIter = NULL;
129,428✔
862
  while (1) {
×
863
    SMqTopicObj *pTopic = NULL;
129,428✔
864
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
129,428✔
865
    if (pIter == NULL) {
129,428✔
866
      break;
129,428✔
867
    }
868
    taosRLockLatch(&pTopic->lock);
×
869
    if (pTopic->dbUid == pDb->uid) {
×
870
      numOfTopics++;
×
871
    }
872
    taosRUnLockLatch(&pTopic->lock);
×
873

874
    sdbRelease(pSdb, pTopic);
×
875
  }
876

877
  *pNumOfTopics = numOfTopics;
129,428✔
878
  mndReleaseDb(pMnode, pDb);
129,428✔
879
  return 0;
129,428✔
880
}
881

882
static void schemaToJson(SSchema *schema, int32_t nCols, char *schemaJson) {
57,407✔
883
  if (schema == NULL || schemaJson == NULL) {
57,407✔
884
    return;
×
885
  }
886
  char *  string = NULL;
57,407✔
887
  int32_t code = 0;
57,407✔
888
  int32_t lino = 0;
57,407✔
889

890
  cJSON *cbytes = NULL;
57,407✔
891
  cJSON *ctype = NULL;
57,407✔
892
  cJSON *cname = NULL;
57,407✔
893
  cJSON *column = NULL;
57,407✔
894
  cJSON *columns = cJSON_CreateArray();
57,407✔
895
  MND_TMQ_NULL_CHECK(columns);
57,407✔
896
  for (int i = 0; i < nCols; i++) {
644,151✔
897
    column = cJSON_CreateObject();
586,744✔
898
    MND_TMQ_NULL_CHECK(column);
586,744✔
899
    SSchema *s = schema + i;
586,744✔
900
    cname = cJSON_CreateString(s->name);
586,744✔
901
    MND_TMQ_NULL_CHECK(cname);
586,744✔
902
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "name", cname), 0);
586,744✔
903
    cname = NULL;  // ownership transferred to column object
586,744✔
904

905
    ctype = cJSON_CreateString(tDataTypes[s->type].name);
586,744✔
906
    MND_TMQ_NULL_CHECK(ctype);
586,744✔
907
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "type", ctype), 0);
586,744✔
908
    ctype = NULL;  // ownership transferred to column object
586,744✔
909

910
    int32_t length = 0;
586,744✔
911
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
586,744✔
912
      length = s->bytes - VARSTR_HEADER_SIZE;
110,198✔
913
    } else if (s->type == TSDB_DATA_TYPE_NCHAR || s->type == TSDB_DATA_TYPE_JSON) {
476,546✔
914
      length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
73,314✔
915
    } else {
916
      length = s->bytes;
403,232✔
917
    }
918
    cbytes = cJSON_CreateNumber(length);
586,744✔
919
    MND_TMQ_NULL_CHECK(cbytes);
586,744✔
920
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "length", cbytes), 0);
586,744✔
921
    cbytes = NULL;  // ownership transferred to column object
586,744✔
922

923
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToArray(columns, column), 0);
586,744✔
924
    column = NULL;  // ownership transferred to columns array
586,744✔
925
  }
926

927
END:
57,407✔
928
  string = cJSON_PrintUnformatted(columns);
57,407✔
929
  cJSON_Delete(columns);
57,407✔
930
  cJSON_Delete(column);
57,407✔
931
  cJSON_Delete(cname);
57,407✔
932
  cJSON_Delete(ctype);
57,407✔
933
  cJSON_Delete(cbytes);
57,407✔
934

935
  size_t len = strlen(string);
57,407✔
936
  if (string && len <= TSDB_SHOW_SCHEMA_JSON_LEN) {
57,407✔
937
    STR_TO_VARSTR(schemaJson, string);
57,407✔
938
  } else {
939
    mError("mndRetrieveTopic build schema error json:%p, json len:%zu", string, len);
×
940
    STR_TO_VARSTR(schemaJson, "NULL");
×
941
  }
942
  taosMemoryFree(string);
57,407✔
943
}
944

945
static int32_t buildResult(SMqTopicObj *pTopic, int32_t *numOfRows, SMnode *pMnode, SSDataBlock *pBlock) {
59,061✔
946
  SColumnInfoData *pColInfo = NULL;
59,061✔
947
  SName            n = {0};
59,061✔
948
  int32_t          cols = 0;
59,061✔
949
  char *           schemaJson = NULL;
59,061✔
950
  char *           sql = NULL;
59,061✔
951
  int32_t          code = 0;
59,061✔
952
  int32_t          lino = 0;
59,061✔
953

954
  taosRLockLatch(&pTopic->lock);
59,061✔
955

956
  char        topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
59,061✔
957
  const char *pName = mndGetDbStr(pTopic->name);
59,061✔
958
  STR_TO_VARSTR(topicName, pName);
59,061✔
959

960
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
59,061✔
961
  MND_TMQ_NULL_CHECK(pColInfo);
59,061✔
962
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topicName, false));
59,061✔
963

964
  char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
59,061✔
965
  MND_TMQ_RETURN_CHECK(tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB));
59,061✔
966
  MND_TMQ_RETURN_CHECK(tNameGetDbName(&n, varDataVal(dbName)));
59,061✔
967
  varDataSetLen(dbName, strlen(varDataVal(dbName)));
59,061✔
968

969
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
59,061✔
970
  MND_TMQ_NULL_CHECK(pColInfo);
59,061✔
971
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)dbName, false));
59,061✔
972

973
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
59,061✔
974
  MND_TMQ_NULL_CHECK(pColInfo);
59,061✔
975
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pTopic->createTime, false));
59,061✔
976

977
  sql = taosMemoryMalloc(strlen(pTopic->sql) + VARSTR_HEADER_SIZE);
59,061✔
978
  MND_TMQ_NULL_CHECK(sql);
59,061✔
979
  STR_TO_VARSTR(sql, pTopic->sql);
59,061✔
980

981
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
59,061✔
982
  MND_TMQ_NULL_CHECK(pColInfo);
59,061✔
983
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)sql, false));
59,061✔
984

985
  taosMemoryFreeClear(sql);
59,061✔
986

987
  schemaJson = taosMemoryMalloc(TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE);
59,061✔
988
  MND_TMQ_NULL_CHECK(schemaJson);
59,061✔
989
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
59,061✔
990
    schemaToJson(pTopic->schema.pSchema, pTopic->schema.nCols, schemaJson);
57,016✔
991
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
2,045✔
992
    SStbObj *pStb = mndAcquireStb(pMnode, pTopic->stbName);
391✔
993
    if (pStb == NULL) {
391✔
994
      STR_TO_VARSTR(schemaJson, "NULL");
×
995
      mError("mndRetrieveTopic mndAcquireStb null stbName:%s", pTopic->stbName);
×
996
    } else {
997
      schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
391✔
998
      mndReleaseStb(pMnode, pStb);
391✔
999
    }
1000
  } else {
1001
    STR_TO_VARSTR(schemaJson, "NULL");
1,654✔
1002
  }
1003

1004
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
59,061✔
1005
  MND_TMQ_NULL_CHECK(pColInfo);
59,061✔
1006
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)schemaJson, false));
59,061✔
1007
  taosMemoryFreeClear(schemaJson);
59,061✔
1008

1009
  (*numOfRows)++;
59,061✔
1010

1011
END:
59,061✔
1012
  taosRUnLockLatch(&pTopic->lock);
59,061✔
1013

1014
  taosMemoryFreeClear(sql);
59,061✔
1015
  taosMemoryFreeClear(schemaJson);
59,061✔
1016
  return code;
59,061✔
1017
}
1018

1019
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
22,246✔
1020
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
22,246✔
1021
    return TSDB_CODE_INVALID_MSG;
×
1022
  }
1023
  SMnode      *pMnode = pReq->info.node;
22,246✔
1024
  SSdb        *pSdb = pMnode->pSdb;
22,246✔
1025
  int32_t      numOfRows = 0;
22,246✔
1026
  SMqTopicObj *pTopic = NULL;
22,246✔
1027
  SUserObj    *pOperUser = NULL;
22,246✔
1028
  int32_t      code = 0, lino = 0;
22,246✔
1029
  char        *sql = NULL;
22,246✔
1030
  char        *schemaJson = NULL;
22,246✔
1031
  char         objFName[TSDB_OBJ_FNAME_LEN + 1] = {0};
22,246✔
1032
  bool         showAll = false;
22,246✔
1033

1034
  MND_TMQ_RETURN_CHECK(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser));
22,246✔
1035
  (void)snprintf(objFName, sizeof(objFName), "%d.*", pOperUser->acctId);
22,246✔
1036
  int32_t objLevel = privObjGetLevel(PRIV_OBJ_TOPIC);
22,246✔
1037
  showAll =
22,246✔
1038
      (0 == mndCheckSysObjPrivilege(pMnode, pOperUser, RPC_MSG_TOKEN(pReq), PRIV_CM_SHOW, PRIV_OBJ_TOPIC, 0, objFName,
22,246✔
1039
                                    objLevel == 0 ? NULL : "*"));  // 1.*.*
1040

1041
  PRINT_LOG_START
22,246✔
1042

1043
  while (numOfRows < rowsCapacity) {
81,811✔
1044
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
81,811✔
1045
    if (pShow->pIter == NULL) break;
81,811✔
1046

1047
    if (!showAll) {
59,565✔
1048
      if (mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_TOPIC, pTopic->ownerId, pTopic->db,
1,344✔
1049
                                   objLevel == 0 ? NULL : mndGetDbStr(pTopic->name)) != 0) {  // 1.topic1
672✔
1050
        sdbRelease(pSdb, pTopic);
504✔
1051
        continue;
504✔
1052
      }
1053
    }
1054

1055
    MND_TMQ_RETURN_CHECK(buildResult(pTopic, &numOfRows, pMnode, pBlock));
59,061✔
1056
    sdbRelease(pSdb, pTopic);
59,061✔
1057
    pTopic = NULL;
59,061✔
1058
  }
1059
  pShow->numOfRows += numOfRows;
22,246✔
1060

1061
END:
22,246✔
1062
  sdbCancelFetch(pSdb, pShow->pIter);
22,246✔
1063
  sdbRelease(pSdb, pTopic);
22,246✔
1064
  mndReleaseUser(pMnode, pOperUser);
22,246✔
1065
  if (code != TSDB_CODE_SUCCESS) {
22,246✔
1066
    mError("%s failed since %s", __func__, tstrerror(code));
×
1067
    return code;
×
1068
  } else {
1069
    mDebug("%s retrieved %d rows successfully", __func__, numOfRows);
22,246✔
1070
    return numOfRows;
22,246✔
1071
  }
1072
}
1073

1074
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
×
1075
  if (pMnode == NULL) return;
×
1076
  SSdb *pSdb = pMnode->pSdb;
×
1077
  sdbCancelFetchByType(pSdb, pIter, SDB_TOPIC);
×
1078
}
1079

1080
bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
817,044✔
1081
  if (pMnode == NULL || pDb == NULL) {
817,044✔
1082
    return false;
×
1083
  }
1084
  SSdb *       pSdb = pMnode->pSdb;
817,044✔
1085
  void *       pIter = NULL;
817,044✔
1086
  SMqTopicObj *pTopic = NULL;
817,044✔
1087

1088
  while (1) {
62,616✔
1089
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
879,660✔
1090
    if (pIter == NULL) {
879,660✔
1091
      break;
816,634✔
1092
    }
1093

1094
    taosRLockLatch(&pTopic->lock);
63,026✔
1095
    bool found = pTopic->dbUid == pDb->uid;
63,026✔
1096
    taosRUnLockLatch(&pTopic->lock);
63,026✔
1097

1098
    if (found) {
63,026✔
1099
      sdbRelease(pSdb, pTopic);
410✔
1100
      sdbCancelFetch(pSdb, pIter);
410✔
1101
      return true;
410✔
1102
    }
1103

1104
    sdbRelease(pSdb, pTopic);
62,616✔
1105
  }
1106

1107
  return false;
816,634✔
1108
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc