• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4885

15 Dec 2025 03:26AM UTC coverage: 65.258% (+4.6%) from 60.617%
#4885

push

travis-ci

web-flow
feat(tmq): [TS-6379]remove limition for table operation in tmq  (#33834)

872 of 1074 new or added lines in 16 files covered. (81.19%)

659 existing lines in 92 files now uncovered.

177890 of 272597 relevant lines covered (65.26%)

103732965.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.04
/source/dnode/mnode/impl/src/mndTopic.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *f
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndTopic.h"
17
#include "audit.h"
18
#include "mndConsumer.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndMnode.h"
22
#include "mndPrivilege.h"
23
#include "mndShow.h"
24
#include "mndStb.h"
25
#include "mndSubscribe.h"
26
#include "mndTrans.h"
27
#include "mndUser.h"
28
#include "mndVgroup.h"
29
#include "osMemPool.h"
30
#include "parser.h"
31
#include "tlockfree.h"
32
#include "tname.h"
33

34
#define MND_TOPIC_VER_NUMBER   4
35
#define MND_TOPIC_RESERVE_SIZE 64
36

37
SHashObj *topicsToReload = NULL;
38

39
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
40
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
41

42
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
43
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
44
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic);
45
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
46
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
47

48
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
50
static int32_t processAst(SMqTopicObj *topicObj, const char *ast);
51

52
int32_t mndInitTopic(SMnode *pMnode) {
505,964✔
53
  SSdbTable table = {
505,964✔
54
      .sdbType = SDB_TOPIC,
55
      .keyType = SDB_KEY_BINARY,
56
      .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
57
      .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
58
      .insertFp = (SdbInsertFp)mndTopicActionInsert,
59
      .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
60
      .deleteFp = (SdbDeleteFp)mndTopicActionDelete,
61
  };
62

63
  if (pMnode == NULL) {
505,964✔
64
    return TSDB_CODE_INVALID_PARA;
×
65
  }
66
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CREATE_TOPIC, mndProcessCreateTopicReq);
505,964✔
67
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_TOPIC, mndProcessDropTopicReq);
505,964✔
68

69
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
505,964✔
70
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
505,964✔
71

72
  return sdbSetTable(pMnode->pSdb, table);
505,964✔
73
}
74

75
void mndCleanupTopic(SMnode *pMnode) {}
505,845✔
76

77
void mndTopicGetShowName(const char *fullTopic, char *topic) {
21,320✔
78
  if (fullTopic == NULL) {
21,320✔
79
    return;
×
80
  }
81
  char *tmp = strchr(fullTopic, '.');
21,320✔
82
  if (tmp == NULL) {
21,320✔
83
    tstrncpy(topic, fullTopic, TSDB_TOPIC_FNAME_LEN);
×
84
  } else {
85
    tstrncpy(topic, tmp + 1, TSDB_TOPIC_FNAME_LEN);
21,320✔
86
  }
87
}
88

89
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
103,153✔
90
  if (pTopic == NULL) {
103,153✔
91
    return NULL;
×
92
  }
93
  int32_t code = 0;
103,153✔
94
  int32_t lino = 0;
103,153✔
95
  terrno = TSDB_CODE_OUT_OF_MEMORY;
103,153✔
96

97
  void *  swBuf = NULL;
103,153✔
98
  int32_t physicalPlanLen = 0;
103,153✔
99
  if (pTopic->physicalPlan) {
103,153✔
100
    physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
103,153✔
101
  }
102

103
  int32_t schemaLen = 0;
103,153✔
104
  if (pTopic->schema.nCols) {
103,153✔
105
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
180,398✔
106
  }
107

108
  int32_t  size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
103,153✔
109
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
103,153✔
110
  if (pRaw == NULL) {
103,153✔
111
    goto TOPIC_ENCODE_OVER;
×
112
  }
113

114
  int32_t dataPos = 0;
103,153✔
115
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
103,153✔
116
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
103,153✔
117
  SDB_SET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_ENCODE_OVER);
103,153✔
118
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
103,153✔
119
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
103,153✔
120
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
103,153✔
121
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
103,153✔
122
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
103,153✔
123
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
103,153✔
124
  SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);
103,153✔
125

126
  SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
103,153✔
127
  SDB_SET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER);
103,153✔
128
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
103,153✔
129
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
103,153✔
130
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
103,153✔
131
  if (physicalPlanLen) {
103,153✔
132
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
103,153✔
133
  }
134
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
103,153✔
135
  if (schemaLen) {
103,153✔
136
    swBuf = taosMemoryMalloc(schemaLen);
90,199✔
137
    if (swBuf == NULL) {
90,199✔
138
      goto TOPIC_ENCODE_OVER;
×
139
    }
140
    void *aswBuf = swBuf;
90,199✔
141
    if (taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema) < 0) {
180,398✔
142
      goto TOPIC_ENCODE_OVER;
×
143
    }
144
    SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
90,199✔
145
  }
146

147
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
103,153✔
148
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
103,153✔
149

150
  terrno = TSDB_CODE_SUCCESS;
103,153✔
151

152
TOPIC_ENCODE_OVER:
103,153✔
153
  if (swBuf) taosMemoryFree(swBuf);
103,153✔
154
  if (terrno != TSDB_CODE_SUCCESS) {
103,153✔
155
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
×
156
    sdbFreeRaw(pRaw);
×
157
    return NULL;
×
158
  }
159

160
  mDebug("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
103,153✔
161
  return pRaw;
103,153✔
162
}
163

164
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
86,298✔
165
  if (pRaw == NULL) return NULL;
86,298✔
166
  int32_t code = 0;
86,298✔
167
  int32_t lino = 0;
86,298✔
168
  terrno = TSDB_CODE_OUT_OF_MEMORY;
86,298✔
169
  SSdbRow *    pRow = NULL;
86,298✔
170
  SMqTopicObj *pTopic = NULL;
86,298✔
171
  void *       buf = NULL;
86,298✔
172
  char*        ast = NULL;
86,298✔
173

174
  int8_t sver = 0;
86,298✔
175
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
86,298✔
176

177
  if (sver < 1 || sver > MND_TOPIC_VER_NUMBER) {
86,298✔
178
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
179
    goto TOPIC_DECODE_OVER;
×
180
  }
181

182
  pRow = sdbAllocRow(sizeof(SMqTopicObj));
86,298✔
183
  if (pRow == NULL) goto TOPIC_DECODE_OVER;
86,298✔
184

185
  pTopic = sdbGetRowObj(pRow);
86,298✔
186
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
86,298✔
187

188
  int32_t len = 0;
86,298✔
189
  int32_t dataPos = 0;
86,298✔
190
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
86,298✔
191
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
86,298✔
192
  if (sver >= 2) {
86,298✔
193
    SDB_GET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_DECODE_OVER);
86,298✔
194
  }
195
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
86,298✔
196
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
86,298✔
197
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
86,298✔
198
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
86,298✔
199
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
86,298✔
200
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
86,298✔
201
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
86,298✔
202

203
  SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
86,298✔
204
  if (sver >= 3) {
86,298✔
205
    SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
86,298✔
206
  }
207
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
86,298✔
208
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
86,298✔
209
  if (pTopic->sql == NULL) {
86,298✔
210
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
211
    goto TOPIC_DECODE_OVER;
×
212
  }
213
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
86,298✔
214

215
  if (sver < 4) {
86,298✔
NEW
216
    int32_t astLen = 0;
×
NEW
217
    SDB_GET_INT32(pRaw, dataPos, &astLen, TOPIC_DECODE_OVER);
×
NEW
218
    if (astLen) {
×
NEW
219
      ast = taosMemoryCalloc(astLen, sizeof(char));
×
NEW
220
      if (ast == NULL) {
×
NEW
221
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
222
        goto TOPIC_DECODE_OVER;
×
223
      }
NEW
224
      SDB_GET_BINARY(pRaw, dataPos, ast, astLen, TOPIC_DECODE_OVER);
×
NEW
225
      terrno = processAst(pTopic, ast);
×
NEW
226
      if (terrno != TSDB_CODE_SUCCESS) {
×
NEW
227
        goto TOPIC_DECODE_OVER;
×
228
      }
229
    }
230
  } else {
231
    SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
86,298✔
232
    if (len) {
86,298✔
233
      pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
86,298✔
234
      if (pTopic->physicalPlan == NULL) {
86,298✔
NEW
235
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
236
        goto TOPIC_DECODE_OVER;
×
237
      }
238
      SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
86,298✔
239
    } else {
NEW
240
      pTopic->physicalPlan = NULL;
×
241
    }
242

243
    SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
86,298✔
244
    if (len) {
86,298✔
245
      buf = taosMemoryMalloc(len);
75,209✔
246
      if (buf == NULL) {
75,209✔
NEW
247
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
248
        goto TOPIC_DECODE_OVER;
×
249
      }
250
      SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
75,209✔
251
      if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
150,418✔
NEW
252
        goto TOPIC_DECODE_OVER;
×
253
      }
254
    } else {
255
      pTopic->schema.nCols = 0;
11,089✔
256
      pTopic->schema.version = 0;
11,089✔
257
      pTopic->schema.pSchema = NULL;
11,089✔
258
    }
259
  }
260

261
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
86,298✔
262
  terrno = TSDB_CODE_SUCCESS;
86,298✔
263

264
TOPIC_DECODE_OVER:
86,298✔
265
  taosMemoryFreeClear(buf);
86,298✔
266
  taosMemoryFreeClear(ast);
86,298✔
267

268
  if (terrno != TSDB_CODE_SUCCESS) {
86,298✔
269
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic == NULL ? "null" : pTopic->name, pRaw, terrstr());
×
270
    taosMemoryFreeClear(pRow);
×
271
    return NULL;
×
272
  }
273

274
  mDebug("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
86,298✔
275
  return pRow;
86,298✔
276
}
277

278
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
54,732✔
279
  mDebug("topic:%s perform insert action", pTopic != NULL ? pTopic->name : "null");
54,732✔
280
  return 0;
54,732✔
281
}
282

283
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
86,298✔
284
  if (pTopic == NULL) return 0;
86,298✔
285
  mDebug("%p topic:%s perform delete action", pTopic, pTopic->name);
86,298✔
286
  taosMemoryFreeClear(pTopic->sql);
86,298✔
287
  taosMemoryFreeClear(pTopic->physicalPlan);
86,298✔
288
  if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
86,298✔
289
  return 0;
86,298✔
290
}
291

292
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
×
293
  if (pOldTopic == NULL || pNewTopic == NULL) return 0;
×
NEW
294
  mDebug("topic:%s perform update action", pOldTopic->name);
×
NEW
295
  taosWLockLatch(&pOldTopic->lock);
×
NEW
296
  SMqTopicObj tmpTopic = *pOldTopic;
×
NEW
297
  (void)memcpy(pOldTopic, pNewTopic, offsetof(SMqTopicObj, lock));
×
NEW
298
  *pNewTopic = tmpTopic;
×
NEW
299
  taosWUnLockLatch(&pOldTopic->lock);
×
UNCOV
300
  return 0;
×
301
}
302

303
int32_t mndAcquireTopic(SMnode *pMnode, const char *topicName, SMqTopicObj **pTopic) {
808,164✔
304
  if (pMnode == NULL || topicName == NULL || pTopic == NULL) {
808,164✔
305
    return TSDB_CODE_INVALID_PARA;
×
306
  }
307
  SSdb *pSdb = pMnode->pSdb;
808,164✔
308
  *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
808,164✔
309
  if (*pTopic == NULL) {
808,164✔
310
    return TSDB_CODE_MND_TOPIC_NOT_EXIST;
52,800✔
311
  }
312
  return TSDB_CODE_SUCCESS;
755,364✔
313
}
314

315
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
820,657✔
316
  if (pMnode == NULL) return;
820,657✔
317
  SSdb *pSdb = pMnode->pSdb;
820,657✔
318
  sdbRelease(pSdb, pTopic);
820,657✔
319
}
320

321
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
50,284✔
322
  if (pCreate == NULL) return TSDB_CODE_INVALID_PARA;
50,284✔
323
  if (pCreate->sql == NULL) return TSDB_CODE_MND_INVALID_TOPIC;
50,284✔
324

325
  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
50,284✔
326
    if (pCreate->ast == NULL || pCreate->ast[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
38,765✔
327
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
11,519✔
328
    if (pCreate->subStbName[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
5,062✔
329
  } else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
6,457✔
330
    if (pCreate->subDbName[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
6,457✔
331
  }
332

333
  return 0;
50,284✔
334
}
335

336
static int32_t processAst(SMqTopicObj *topicObj, const char *ast) {
46,982✔
337
  SNode *     pAst = NULL;
46,982✔
338
  SQueryPlan *pPlan = NULL;
46,982✔
339
  int32_t     code = TSDB_CODE_SUCCESS;
46,982✔
340
  int32_t     lino = 0;
46,982✔
341

342
  PRINT_LOG_START
46,982✔
343
  if (ast == NULL) {
46,982✔
344
    topicObj->physicalPlan = taosStrdup("");
7,173✔
345
    goto END;
7,173✔
346
  }
347
  qDebugL("%s topic:%s ast %s", __func__, topicObj->name, ast);
39,809✔
348
  MND_TMQ_RETURN_CHECK(nodesStringToNode(ast, &pAst));
39,809✔
349
  MND_TMQ_RETURN_CHECK(qExtractResultSchema(pAst, &topicObj->schema.nCols, &topicObj->schema.pSchema));
39,809✔
350

351
  SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
39,809✔
352
  MND_TMQ_RETURN_CHECK(qCreateQueryPlan(&cxt, &pPlan, NULL));
39,809✔
353
  if (pPlan == NULL) {
39,809✔
NEW
354
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
NEW
355
    goto END;
×
356
  }
357
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
39,809✔
358
  if (levelNum != 1) {
39,809✔
NEW
359
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
NEW
360
    goto END;
×
361
  }
362

363
  SNodeListNode *pNodeListNode = (SNodeListNode *)nodesListGetNode(pPlan->pSubplans, 0);
39,809✔
364
  MND_TMQ_NULL_CHECK(pNodeListNode);
39,809✔
365
  int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
39,809✔
366
  if (opNum != 1) {
39,809✔
NEW
367
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
NEW
368
    goto END;
×
369
  }
370

371
  code = nodesNodeToString(nodesListGetNode(pNodeListNode->pNodeList, 0), false, &topicObj->physicalPlan, NULL);
39,809✔
372

373
END:
46,982✔
374
  nodesDestroyNode(pAst);
46,982✔
375
  qDestroyQueryPlan(pPlan);
46,982✔
376
  PRINT_LOG_END
46,982✔
377
  return code;
46,982✔
378
}
379

380
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
48,720✔
381
                              const char *userName) {
382
  if (pMnode == NULL || pReq == NULL || pCreate == NULL || pDb == NULL || userName == NULL)
48,720✔
NEW
383
    return TSDB_CODE_INVALID_PARA;
×
384
  STrans *    pTrans = NULL;
48,720✔
385
  int32_t     code = 0;
48,720✔
386
  int32_t     lino = 0;
48,720✔
387
  SMqTopicObj topicObj = {0};
48,720✔
388

389
  PRINT_LOG_START
48,720✔
390
  mInfo("start to create topic:%s", pCreate->name);
48,720✔
391
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-topic");
48,720✔
392
  MND_TMQ_NULL_CHECK(pTrans);
48,720✔
393
  mndTransSetDbName(pTrans, pDb->name, NULL);
48,720✔
394
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
48,720✔
395

396
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
48,720✔
397
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
48,720✔
398
  tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
48,720✔
399

400
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj));
48,720✔
401

402
  topicObj.createTime = taosGetTimestampMs();
46,982✔
403
  topicObj.updateTime = topicObj.createTime;
46,982✔
404
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
46,982✔
405
  topicObj.dbUid = pDb->uid;
46,982✔
406
  topicObj.version = 1;
46,982✔
407
  topicObj.sql = taosStrdup(pCreate->sql);
46,982✔
408
  MND_TMQ_NULL_CHECK(topicObj.sql);
46,982✔
409
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
46,982✔
410
  topicObj.subType = pCreate->subType;
46,982✔
411
  topicObj.withMeta = pCreate->withMeta;
46,982✔
412
  taosInitRWLatch(&topicObj.lock);
46,982✔
413

414
  MND_TMQ_RETURN_CHECK(processAst(&topicObj, pCreate->ast));
46,982✔
415

416
  if (pCreate->subStbName[0] != 0) {
46,982✔
417
    tstrncpy(topicObj.stbName, pCreate->subStbName, TSDB_TABLE_FNAME_LEN);
35,942✔
418
    SStbObj *pStb = mndAcquireStb(pMnode, topicObj.stbName);
35,942✔
419
    MND_TMQ_NULL_CHECK(pStb);
35,942✔
420
    topicObj.stbUid = pStb->uid;
34,960✔
421
    mndReleaseStb(pMnode, pStb);
34,960✔
422
  }
423

424
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
46,000✔
425
  MND_TMQ_NULL_CHECK(pCommitRaw);
46,000✔
426
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
46,000✔
427
  if (code != 0) {
46,000✔
428
    sdbFreeRaw(pCommitRaw);
×
429
    goto END;
×
430
  }
431

432
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
46,000✔
433
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
46,000✔
434

435
END:
48,720✔
436
  PRINT_LOG_END
48,720✔
437
  taosMemoryFreeClear(topicObj.sql);
48,720✔
438
  taosMemoryFreeClear(topicObj.physicalPlan);
48,720✔
439
  if (topicObj.schema.nCols) {
48,720✔
440
    taosMemoryFreeClear(topicObj.schema.pSchema);
39,809✔
441
  }
442
  mndTransDrop(pTrans);
48,720✔
443
  return code;
48,720✔
444
}
445

NEW
446
static int32_t mndReloadTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
×
447
                              const char *userName, SMqTopicObj *topicObjOri) {
NEW
448
  if (pMnode == NULL || pReq == NULL || pCreate == NULL || pDb == NULL || userName == NULL)
×
NEW
449
    return TSDB_CODE_INVALID_PARA;
×
NEW
450
  STrans *    pTrans = NULL;
×
NEW
451
  int32_t     code = 0;
×
NEW
452
  int32_t     lino = 0;
×
NEW
453
  SMqTopicObj topicObj = {0};
×
454

UNCOV
455
  PRINT_LOG_START
×
NEW
456
  mInfo("start to reload topic:%s", pCreate->name);
×
NEW
457
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "reload-topic");
×
NEW
458
  MND_TMQ_NULL_CHECK(pTrans);
×
NEW
459
  mndTransSetDbName(pTrans, pDb->name, NULL);
×
NEW
460
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
×
461

NEW
462
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
×
NEW
463
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
×
NEW
464
  tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
×
465

NEW
466
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC, &topicObj));
×
467

NEW
468
  taosRLockLatch(&topicObjOri->lock);
×
NEW
469
  topicObj.createTime = topicObjOri->createTime;
×
NEW
470
  topicObj.updateTime = taosGetTimestampMs();
×
NEW
471
  topicObj.uid = topicObjOri->uid;
×
NEW
472
  topicObj.dbUid = pDb->uid;
×
NEW
473
  topicObj.version = topicObjOri->version + 1;
×
NEW
474
  topicObj.sql = taosStrdup(pCreate->sql);
×
NEW
475
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
×
NEW
476
  topicObj.subType = pCreate->subType;
×
NEW
477
  topicObj.withMeta = pCreate->withMeta;
×
NEW
478
  taosInitRWLatch(&topicObj.lock);
×
NEW
479
  taosRUnLockLatch(&topicObjOri->lock);
×
480

NEW
481
  MND_TMQ_RETURN_CHECK(processAst(&topicObj, pCreate->ast));
×
482

NEW
483
  if (pCreate->subStbName[0] != 0) {
×
NEW
484
    tstrncpy(topicObj.stbName, pCreate->subStbName, TSDB_TABLE_FNAME_LEN);
×
NEW
485
    SStbObj *pStb = mndAcquireStb(pMnode, topicObj.stbName);
×
NEW
486
    MND_TMQ_NULL_CHECK(pStb);
×
NEW
487
    topicObj.stbUid = pStb->uid;
×
NEW
488
    mndReleaseStb(pMnode, pStb);
×
489
  }
490

NEW
491
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
×
NEW
492
  MND_TMQ_NULL_CHECK(pCommitRaw);
×
NEW
493
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
×
NEW
494
  if (code != 0) {
×
NEW
495
    sdbFreeRaw(pCommitRaw);
×
UNCOV
496
    goto END;
×
497
  }
498

NEW
499
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
NEW
500
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
×
501

NEW
502
END:
×
NEW
503
  PRINT_LOG_END
×
NEW
504
  taosMemoryFreeClear(topicObj.sql);
×
NEW
505
  taosMemoryFreeClear(topicObj.physicalPlan);
×
NEW
506
  if (topicObj.schema.nCols) {
×
NEW
507
    taosMemoryFreeClear(topicObj.schema.pSchema);
×
508
  }
NEW
509
  mndTransDrop(pTrans);
×
NEW
510
  return code;
×
511
}
512

513
static int32_t creatTopic(SRpcMsg *pReq, SCMCreateTopicReq *createTopicReq) {
50,284✔
514
  SMqTopicObj *pTopic = NULL;
50,284✔
515
  SDbObj *     pDb = NULL;
50,284✔
516
  int32_t      code = TSDB_CODE_SUCCESS;
50,284✔
517
  int32_t      lino = 0;
50,284✔
518
  SMnode *     pMnode = pReq->info.node;
50,284✔
519
  PRINT_LOG_START
50,284✔
520
  mInfo("topic:%s start to create, sql:%s", createTopicReq->name, createTopicReq->sql);
50,284✔
521
  code = mndAcquireTopic(pMnode, createTopicReq->name, &pTopic);
50,284✔
522
  if (code == TSDB_CODE_SUCCESS) {
50,284✔
523
    mndReleaseTopic(pMnode, pTopic);
215✔
524
    if (createTopicReq->igExists) {
215✔
525
      mInfo("topic:%s already exist, ignore exist is set", createTopicReq->name);
215✔
526
    } else {
527
      code = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
×
528
    }
529
    goto END;
215✔
530
  } else if (code != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
50,069✔
531
    goto END;
×
532
  }
533

534
  pDb = mndAcquireDb(pMnode, createTopicReq->subDbName);
50,069✔
535
  MND_TMQ_NULL_CHECK(pDb);
50,069✔
536

537
  if (pDb->cfg.walRetentionPeriod == 0) {
49,153✔
538
    code = TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO;
×
539
    goto END;
×
540
  }
541

542
  if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum) {
49,153✔
543
    code = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE;
433✔
544
    goto END;
433✔
545
  }
546

547
  MND_TMQ_RETURN_CHECK(grantCheck(TSDB_GRANT_SUBSCRIPTION));
48,720✔
548
  MND_TMQ_RETURN_CHECK(mndCreateTopic(pMnode, pReq, createTopicReq, pDb, pReq->info.conn.user));
48,720✔
549

550
  auditRecord(pReq, pMnode->clusterId, "createTopic", createTopicReq->subDbName, createTopicReq->name,
46,000✔
551
              createTopicReq->sql, strlen(createTopicReq->sql));
46,000✔
552
  code = TSDB_CODE_ACTION_IN_PROGRESS;
46,000✔
553

554
END:
50,284✔
555
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
50,284✔
556
    mError("%s failed, topic:%s since %s", __func__, createTopicReq->name, tstrerror(code));
4,069✔
557
  } else {
558
    mInfo("topic:%s create successfully", createTopicReq->name);
46,215✔
559
  }
560
  mndReleaseDb(pMnode, pDb);
50,284✔
561
  return code;
50,284✔
562
}
563

NEW
564
static int32_t reloadTopic(SRpcMsg *pReq, SCMCreateTopicReq *createTopicReq) {
×
NEW
565
  SMnode *     pMnode = pReq->info.node;
×
NEW
566
  int32_t      code = TSDB_CODE_SUCCESS;
×
NEW
567
  int32_t      lino = 0;
×
NEW
568
  SDbObj *     pDb = NULL;
×
NEW
569
  SMqTopicObj *pTopic = NULL;
×
570

NEW
571
  PRINT_LOG_START
×
NEW
572
  code = mndAcquireTopic(pMnode, createTopicReq->name, &pTopic);
×
NEW
573
  if (code != 0) {
×
NEW
574
    if (createTopicReq->igExists) {
×
NEW
575
      mInfo("topic:%s, not exist, ignore not exist is set", createTopicReq->name);
×
NEW
576
      code = 0;
×
NEW
577
      goto END;
×
578
    } else {
NEW
579
      mError("topic:%s, failed to reload since %s", createTopicReq->name, tstrerror(code));
×
NEW
580
      goto END;
×
581
    }
582
  }
583

NEW
584
  pDb = mndAcquireDb(pMnode, createTopicReq->subDbName);
×
NEW
585
  MND_TMQ_NULL_CHECK(pDb);
×
586

NEW
587
  MND_TMQ_RETURN_CHECK(grantCheck(TSDB_GRANT_SUBSCRIPTION));
×
NEW
588
  MND_TMQ_RETURN_CHECK(mndReloadTopic(pMnode, pReq, createTopicReq, pDb, pReq->info.conn.user, pTopic));
×
589

NEW
590
  auditRecord(pReq, pMnode->clusterId, "reloadTopic", createTopicReq->subDbName, createTopicReq->name,
×
NEW
591
              createTopicReq->sql, strlen(createTopicReq->sql));
×
NEW
592
  code = TSDB_CODE_ACTION_IN_PROGRESS;
×
593

NEW
594
  if (topicsToReload == NULL) {
×
NEW
595
    topicsToReload = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
×
NEW
596
    MND_TMQ_NULL_CHECK(topicsToReload);
×
597
  }
NEW
598
  MND_TMQ_RETURN_CHECK(
×
599
      taosHashPut(topicsToReload, createTopicReq->name, strlen(createTopicReq->name), createTopicReq->name, 1));
NEW
600
  mInfo("topic:%s, marked to reload", createTopicReq->name);
×
601

UNCOV
602
END:
×
UNCOV
603
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
604
    mError("%s failed, topic:%s since %s", __func__, createTopicReq->name, tstrerror(code));
×
605
  } else {
NEW
606
    mInfo("topic:%s create successfully", createTopicReq->name);
×
607
  }
UNCOV
608
  mndReleaseTopic(pMnode, pTopic);
×
UNCOV
609
  mndReleaseDb(pMnode, pDb);
×
610

NEW
611
  return code;
×
612
}
613

614
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
50,284✔
615
  if (pReq == NULL || pReq->contLen <= 0) {
50,284✔
NEW
616
    return TSDB_CODE_INVALID_MSG;
×
617
  }
618
  SMnode *pMnode = pReq->info.node;
50,284✔
619
  int32_t code = TSDB_CODE_SUCCESS;
50,284✔
620
  int32_t lino = 0;
50,284✔
621

622
  SCMCreateTopicReq createTopicReq = {0};
50,284✔
623

624
  PRINT_LOG_START
50,284✔
625
  MND_TMQ_RETURN_CHECK(tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq));
50,284✔
626

627
  mInfo("topic:%s start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
50,284✔
628

629
  MND_TMQ_RETURN_CHECK(mndCheckCreateTopicReq(&createTopicReq));
50,284✔
630

631
  if (createTopicReq.reload) {
50,284✔
NEW
632
    MND_TMQ_RETURN_CHECK(reloadTopic(pReq, &createTopicReq));
×
633
  } else {
634
    MND_TMQ_RETURN_CHECK(creatTopic(pReq, &createTopicReq));
50,284✔
635
  }
636

637
END:
50,281✔
638
  tFreeSCMCreateTopicReq(&createTopicReq);
50,284✔
639
  return code;
50,284✔
640
}
641

642
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
31,566✔
643
  if (pMnode == NULL || pTrans == NULL || pReq == NULL || pTopic == NULL) {
31,566✔
644
    return TSDB_CODE_INVALID_MSG;
×
645
  }
646
  int32_t  code = 0;
31,566✔
647
  int32_t  lino = 0;
31,566✔
648
  SSdbRaw *pCommitRaw = NULL;
31,566✔
649
  PRINT_LOG_START
31,566✔
650
  MND_TMQ_RETURN_CHECK(mndUserRemoveTopic(pMnode, pTrans, pTopic->name));
31,566✔
651
  pCommitRaw = mndTopicActionEncode(pTopic);
31,566✔
652
  MND_TMQ_NULL_CHECK(pCommitRaw);
31,566✔
653
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
31,566✔
654
  if (code != 0) {
31,566✔
655
    sdbFreeRaw(pCommitRaw);
×
656
    goto END;
×
657
  }
658
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
31,566✔
659
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
31,566✔
660

661
END:
31,566✔
662
  PRINT_LOG_END
31,566✔
663
  return code;
31,566✔
664
}
665

666
bool checkTopic(SArray *topics, char *topicName) {
27,033✔
667
  if (topics == NULL || topicName == NULL) {
27,033✔
668
    return false;
×
669
  }
670
  int32_t sz = taosArrayGetSize(topics);
27,033✔
671
  for (int32_t i = 0; i < sz; i++) {
27,510✔
672
    char *name = taosArrayGetP(topics, i);
998✔
673
    if (name && strcmp(name, topicName) == 0) {
998✔
674
      return true;
521✔
675
    }
676
  }
677
  return false;
26,512✔
678
}
679

680
static int32_t checkConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName) {
8,956✔
681
  int32_t         code = 0;
8,956✔
682
  int32_t         lino = 0;
8,956✔
683
  SMqConsumerObj *pConsumerNew = NULL;
8,956✔
684

685
  taosRLockLatch(&pConsumer->lock);
8,956✔
686
  bool found1 = checkTopic(pConsumer->assignedTopics, topicName);
8,956✔
687
  bool found2 = checkTopic(pConsumer->rebRemovedTopics, topicName);
8,956✔
688
  bool found3 = checkTopic(pConsumer->rebNewTopics, topicName);
8,956✔
689
  if (found1 || found2 || found3) {
8,956✔
690
    if (deleteConsumer) {
488✔
691
      MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
488✔
692
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
488✔
693
      tDeleteSMqConsumerObj(pConsumerNew);
488✔
694
      pConsumerNew = NULL;
488✔
695
    } else {
NEW
696
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
×
697
             pConsumer->consumerId, pConsumer->cgroup);
NEW
698
      code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
×
NEW
699
      goto END;
×
700
    }
701
  }
702
END:
8,956✔
703
  taosRUnLockLatch(&pConsumer->lock);
8,956✔
704
  tDeleteSMqConsumerObj(pConsumerNew);
8,956✔
705
  return code;
8,956✔
706
}
707

708
static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName, bool deleteConsumer) {
31,566✔
709
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) {
31,566✔
UNCOV
710
    return TSDB_CODE_INVALID_MSG;
×
711
  }
712
  int32_t         code = 0;
31,566✔
713
  int32_t         lino = 0;
31,566✔
714
  SSdb *          pSdb = pMnode->pSdb;
31,566✔
715
  void *          pIter = NULL;
31,566✔
716
  SMqConsumerObj *pConsumer = NULL;
31,566✔
717

718
  PRINT_LOG_START
31,566✔
719
  while (1) {
720
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
40,522✔
721
    if (pIter == NULL) {
40,522✔
722
      break;
31,566✔
723
    }
724

725
    MND_TMQ_RETURN_CHECK(checkConsumer(pTrans, pConsumer, deleteConsumer, topicName));
8,956✔
726
    sdbRelease(pSdb, pConsumer);
8,956✔
727
  }
728

729
END:
31,566✔
730
  PRINT_LOG_END
31,566✔
731
  sdbRelease(pSdb, pConsumer);
31,566✔
732
  sdbCancelFetch(pSdb, pIter);
31,566✔
733
  return code;
31,566✔
734
}
735

736
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
36,986✔
737
  if (pReq == NULL) {
36,986✔
738
    return TSDB_CODE_INVALID_MSG;
×
739
  }
740
  SMnode *       pMnode = pReq->info.node;
36,986✔
741
  SMDropTopicReq dropReq = {0};
36,986✔
742
  int32_t        code = 0;
36,986✔
743
  int32_t        lino = 0;
36,986✔
744
  SMqTopicObj *  pTopic = NULL;
36,986✔
745
  STrans *       pTrans = NULL;
36,986✔
746

747
  PRINT_LOG_START
36,986✔
748
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq));
36,986✔
749

750
  code = mndAcquireTopic(pMnode, dropReq.name, &pTopic);
36,986✔
751
  if (code != 0) {
36,986✔
752
    if (dropReq.igNotExists) {
2,306✔
753
      mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
2,306✔
754
      code = 0;
2,306✔
755
    }
756
    goto END;
2,306✔
757
  }
758
  taosRLockLatch(&pTopic->lock);
34,680✔
759

760
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic");
34,680✔
761
  MND_TMQ_NULL_CHECK(pTrans);
34,680✔
762

763
  mndTransSetDbName(pTrans, pTopic->db, NULL);
34,680✔
764
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
34,680✔
765
  mInfo("trans:%d, used to drop topic:%s, force:%d", pTrans->id, pTopic->name, dropReq.force);
34,680✔
766

767
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic));
34,680✔
768
  MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db));
31,566✔
769
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
31,566✔
770
  MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
31,566✔
771
  MND_TMQ_RETURN_CHECK(mndDropTopic(pMnode, pTrans, pReq, pTopic));
31,566✔
772
  auditRecord(pReq, pMnode->clusterId, "dropTopic", pTopic->db, dropReq.name, dropReq.sql, dropReq.sqlLen);
31,566✔
773
  code = TSDB_CODE_ACTION_IN_PROGRESS;
31,566✔
774

775
END:
36,986✔
776
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
36,986✔
777
    mError("%s failed, topic:%s since %s", __func__, dropReq.name, tstrerror(code));
3,114✔
778
  } else {
779
    mInfo("topic:%s dropped successfully", dropReq.name);
33,872✔
780
  }
781
  if (pTopic != NULL) {
36,986✔
782
    taosRUnLockLatch(&pTopic->lock);
34,680✔
783
  }
784
  mndReleaseTopic(pMnode, pTopic);
36,986✔
785
  mndTransDrop(pTrans);
36,986✔
786
  tFreeSMDropTopicReq(&dropReq);
36,986✔
787
  return code;
36,986✔
788
}
789

790
int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
116,435✔
791
  if (pMnode == NULL || dbName == NULL || pNumOfTopics == NULL) {
116,435✔
792
    return TSDB_CODE_INVALID_MSG;
×
793
  }
794
  *pNumOfTopics = 0;
116,435✔
795

796
  SSdb *  pSdb = pMnode->pSdb;
116,435✔
797
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
116,435✔
798
  if (pDb == NULL) {
116,435✔
799
    return TSDB_CODE_MND_DB_NOT_SELECTED;
×
800
  }
801

802
  int32_t numOfTopics = 0;
116,435✔
803
  void *  pIter = NULL;
116,435✔
804
  while (1) {
×
805
    SMqTopicObj *pTopic = NULL;
116,435✔
806
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
116,435✔
807
    if (pIter == NULL) {
116,435✔
808
      break;
116,435✔
809
    }
NEW
810
    taosRLockLatch(&pTopic->lock);
×
811
    if (pTopic->dbUid == pDb->uid) {
×
812
      numOfTopics++;
×
813
    }
NEW
814
    taosRUnLockLatch(&pTopic->lock);
×
815

816
    sdbRelease(pSdb, pTopic);
×
817
  }
818

819
  *pNumOfTopics = numOfTopics;
116,435✔
820
  mndReleaseDb(pMnode, pDb);
116,435✔
821
  return 0;
116,435✔
822
}
823

824
static void schemaToJson(SSchema *schema, int32_t nCols, char *schemaJson) {
33,524✔
825
  if (schema == NULL || schemaJson == NULL) {
33,524✔
826
    return;
×
827
  }
828
  char *  string = NULL;
33,524✔
829
  int32_t code = 0;
33,524✔
830
  int32_t lino = 0;
33,524✔
831

832
  cJSON *cbytes = NULL;
33,524✔
833
  cJSON *ctype = NULL;
33,524✔
834
  cJSON *cname = NULL;
33,524✔
835
  cJSON *column = NULL;
33,524✔
836
  cJSON *columns = cJSON_CreateArray();
33,524✔
837
  MND_TMQ_NULL_CHECK(columns);
33,524✔
838
  for (int i = 0; i < nCols; i++) {
366,636✔
839
    column = cJSON_CreateObject();
333,112✔
840
    MND_TMQ_NULL_CHECK(column);
333,112✔
841
    SSchema *s = schema + i;
333,112✔
842
    cname = cJSON_CreateString(s->name);
333,112✔
843
    MND_TMQ_NULL_CHECK(cname);
333,112✔
844
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "name", cname), 0);
333,112✔
845
    cname = NULL;  // ownership transferred to column object
333,112✔
846

847
    ctype = cJSON_CreateString(tDataTypes[s->type].name);
333,112✔
848
    MND_TMQ_NULL_CHECK(ctype);
333,112✔
849
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "type", ctype), 0);
333,112✔
850
    ctype = NULL;  // ownership transferred to column object
333,112✔
851

852
    int32_t length = 0;
333,112✔
853
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
333,112✔
854
      length = s->bytes - VARSTR_HEADER_SIZE;
70,506✔
855
    } else if (s->type == TSDB_DATA_TYPE_NCHAR || s->type == TSDB_DATA_TYPE_JSON) {
262,606✔
856
      length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
26,524✔
857
    } else {
858
      length = s->bytes;
236,082✔
859
    }
860
    cbytes = cJSON_CreateNumber(length);
333,112✔
861
    MND_TMQ_NULL_CHECK(cbytes);
333,112✔
862
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "length", cbytes), 0);
333,112✔
863
    cbytes = NULL;  // ownership transferred to column object
333,112✔
864

865
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToArray(columns, column), 0);
333,112✔
866
    column = NULL;  // ownership transferred to columns array
333,112✔
867
  }
868

869
END:
33,524✔
870
  string = cJSON_PrintUnformatted(columns);
33,524✔
871
  cJSON_Delete(columns);
33,524✔
872
  cJSON_Delete(column);
33,524✔
873
  cJSON_Delete(cname);
33,524✔
874
  cJSON_Delete(ctype);
33,524✔
875
  cJSON_Delete(cbytes);
33,524✔
876

877
  size_t len = strlen(string);
33,524✔
878
  if (string && len <= TSDB_SHOW_SCHEMA_JSON_LEN) {
33,524✔
879
    STR_TO_VARSTR(schemaJson, string);
33,524✔
880
  } else {
881
    mError("mndRetrieveTopic build schema error json:%p, json len:%zu", string, len);
×
NEW
882
    STR_TO_VARSTR(schemaJson, "NULL");
×
883
  }
884
  taosMemoryFree(string);
33,524✔
885
}
886

887
static int32_t buildResult(SMqTopicObj *pTopic, int32_t *numOfRows, SMnode *pMnode, SSDataBlock *pBlock) {
34,983✔
888
  SColumnInfoData *pColInfo = NULL;
34,983✔
889
  SName            n = {0};
34,983✔
890
  int32_t          cols = 0;
34,983✔
891
  char *           schemaJson = NULL;
34,983✔
892
  char *           sql = NULL;
34,983✔
893
  int32_t          code = 0;
34,983✔
894
  int32_t          lino = 0;
34,983✔
895

896
  taosRLockLatch(&pTopic->lock);
34,983✔
897

898
  char        topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
34,983✔
899
  const char *pName = mndGetDbStr(pTopic->name);
34,983✔
900
  STR_TO_VARSTR(topicName, pName);
34,983✔
901

902
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
34,983✔
903
  MND_TMQ_NULL_CHECK(pColInfo);
34,983✔
904
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topicName, false));
34,983✔
905

906
  char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
34,983✔
907
  MND_TMQ_RETURN_CHECK(tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB));
34,983✔
908
  MND_TMQ_RETURN_CHECK(tNameGetDbName(&n, varDataVal(dbName)));
34,983✔
909
  varDataSetLen(dbName, strlen(varDataVal(dbName)));
34,983✔
910

911
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
34,983✔
912
  MND_TMQ_NULL_CHECK(pColInfo);
34,983✔
913
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)dbName, false));
34,983✔
914

915
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
34,983✔
916
  MND_TMQ_NULL_CHECK(pColInfo);
34,983✔
917
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pTopic->createTime, false));
34,983✔
918

919
  sql = taosMemoryMalloc(strlen(pTopic->sql) + VARSTR_HEADER_SIZE);
34,983✔
920
  MND_TMQ_NULL_CHECK(sql);
34,983✔
921
  STR_TO_VARSTR(sql, pTopic->sql);
34,983✔
922

923
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
34,983✔
924
  MND_TMQ_NULL_CHECK(pColInfo);
34,983✔
925
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)sql, false));
34,983✔
926

927
  taosMemoryFreeClear(sql);
34,983✔
928

929
  schemaJson = taosMemoryMalloc(TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE);
34,983✔
930
  MND_TMQ_NULL_CHECK(schemaJson);
34,983✔
931
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
34,983✔
932
    schemaToJson(pTopic->schema.pSchema, pTopic->schema.nCols, schemaJson);
33,430✔
933
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
1,553✔
934
    SStbObj *pStb = mndAcquireStb(pMnode, pTopic->stbName);
94✔
935
    if (pStb == NULL) {
94✔
NEW
936
      STR_TO_VARSTR(schemaJson, "NULL");
×
NEW
937
      mError("mndRetrieveTopic mndAcquireStb null stbName:%s", pTopic->stbName);
×
938
    } else {
939
      schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
94✔
940
      mndReleaseStb(pMnode, pStb);
94✔
941
    }
942
  } else {
943
    STR_TO_VARSTR(schemaJson, "NULL");
1,459✔
944
  }
945

946
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
34,983✔
947
  MND_TMQ_NULL_CHECK(pColInfo);
34,983✔
948
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)schemaJson, false));
34,983✔
949
  taosMemoryFreeClear(schemaJson);
34,983✔
950

951
  (*numOfRows)++;
34,983✔
952

953
END:
34,983✔
954
  taosRUnLockLatch(&pTopic->lock);
34,983✔
955

956
  taosMemoryFreeClear(sql);
34,983✔
957
  taosMemoryFreeClear(schemaJson);
34,983✔
958
  return code;
34,983✔
959
}
960

961
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
26,183✔
962
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
26,183✔
963
    return TSDB_CODE_INVALID_MSG;
×
964
  }
965
  SMnode *     pMnode = pReq->info.node;
26,183✔
966
  SSdb *       pSdb = pMnode->pSdb;
26,183✔
967
  int32_t      numOfRows = 0;
26,183✔
968
  SMqTopicObj *pTopic = NULL;
26,183✔
969
  int32_t      code = 0;
26,183✔
970
  int32_t      lino = 0;
26,183✔
971
  PRINT_LOG_START
26,183✔
972

973
  while (numOfRows < rowsCapacity) {
61,166✔
974
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
61,166✔
975
    if (pShow->pIter == NULL) break;
61,166✔
976

977
    MND_TMQ_RETURN_CHECK(buildResult(pTopic, &numOfRows, pMnode, pBlock));
34,983✔
978
    sdbRelease(pSdb, pTopic);
34,983✔
979
    pTopic = NULL;
34,983✔
980
  }
981

982
  pShow->numOfRows += numOfRows;
26,183✔
983

984
END:
26,183✔
985
  sdbCancelFetch(pSdb, pShow->pIter);
26,183✔
986
  sdbRelease(pSdb, pTopic);
26,183✔
987
  if (code != TSDB_CODE_SUCCESS) {
26,183✔
NEW
988
    mError("%s failed since %s", __func__, tstrerror(code));
×
NEW
989
    return code;
×
990
  } else {
991
    mDebug("%s retrieved %d rows successfully", __func__, numOfRows);
26,183✔
992
    return numOfRows;
26,183✔
993
  }
994
}
995

996
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
×
997
  if (pMnode == NULL) return;
×
998
  SSdb *pSdb = pMnode->pSdb;
×
999
  sdbCancelFetchByType(pSdb, pIter, SDB_TOPIC);
×
1000
}
1001

1002
bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
705,663✔
1003
  if (pMnode == NULL || pDb == NULL) {
705,663✔
1004
    return false;
×
1005
  }
1006
  SSdb *       pSdb = pMnode->pSdb;
705,663✔
1007
  void *       pIter = NULL;
705,663✔
1008
  SMqTopicObj *pTopic = NULL;
705,663✔
1009

1010
  while (1) {
3,204✔
1011
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
708,867✔
1012
    if (pIter == NULL) {
708,867✔
1013
      break;
705,555✔
1014
    }
1015

1016
    taosRLockLatch(&pTopic->lock);
3,312✔
1017
    bool found = pTopic->dbUid == pDb->uid;
3,312✔
1018
    taosRUnLockLatch(&pTopic->lock);
3,312✔
1019

1020
    if (found) {
3,312✔
1021
      sdbRelease(pSdb, pTopic);
108✔
1022
      sdbCancelFetch(pSdb, pIter);
108✔
1023
      return true;
108✔
1024
    }
1025

1026
    sdbRelease(pSdb, pTopic);
3,204✔
1027
  }
1028

1029
  return false;
705,555✔
1030
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc