• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4899

27 Dec 2025 07:32AM UTC coverage: 65.534% (+0.5%) from 65.061%
#4899

push

travis-ci

web-flow
test: remove semaphore test (#34071)

189567 of 289265 relevant lines covered (65.53%)

114701701.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.86
/source/dnode/mnode/impl/src/mndTopic.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *f
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "mndTopic.h"
17
#include "audit.h"
18
#include "mndConsumer.h"
19
#include "mndDb.h"
20
#include "mndDnode.h"
21
#include "mndMnode.h"
22
#include "mndPrivilege.h"
23
#include "mndShow.h"
24
#include "mndStb.h"
25
#include "mndSubscribe.h"
26
#include "mndTrans.h"
27
#include "mndUser.h"
28
#include "mndVgroup.h"
29
#include "osMemPool.h"
30
#include "parser.h"
31
#include "tlockfree.h"
32
#include "tname.h"
33

34
#define MND_TOPIC_VER_NUMBER   4
35
#define MND_TOPIC_RESERVE_SIZE 64
36

37
SHashObj *topicsToReload = NULL;
38

39
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
40
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
41

42
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
43
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
44
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic);
45
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
46
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
47

48
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
49
static void    mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
50
static int32_t processAst(SMqTopicObj *topicObj, const char *ast);
51

52
int32_t mndInitTopic(SMnode *pMnode) {
385,438✔
53
  SSdbTable table = {
385,438✔
54
      .sdbType = SDB_TOPIC,
55
      .keyType = SDB_KEY_BINARY,
56
      .encodeFp = (SdbEncodeFp)mndTopicActionEncode,
57
      .decodeFp = (SdbDecodeFp)mndTopicActionDecode,
58
      .insertFp = (SdbInsertFp)mndTopicActionInsert,
59
      .updateFp = (SdbUpdateFp)mndTopicActionUpdate,
60
      .deleteFp = (SdbDeleteFp)mndTopicActionDelete,
61
  };
62

63
  if (pMnode == NULL) {
385,438✔
64
    return TSDB_CODE_INVALID_PARA;
×
65
  }
66
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CREATE_TOPIC, mndProcessCreateTopicReq);
385,438✔
67
  mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_TOPIC, mndProcessDropTopicReq);
385,438✔
68

69
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
385,438✔
70
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
385,438✔
71

72
  return sdbSetTable(pMnode->pSdb, table);
385,438✔
73
}
74

75
void mndCleanupTopic(SMnode *pMnode) {}
385,375✔
76

77
void mndTopicGetShowName(const char *fullTopic, char *topic) {
67,530✔
78
  if (fullTopic == NULL) {
67,530✔
79
    return;
×
80
  }
81
  char *tmp = strchr(fullTopic, '.');
67,530✔
82
  if (tmp == NULL) {
67,530✔
83
    tstrncpy(topic, fullTopic, TSDB_TOPIC_FNAME_LEN);
×
84
  } else {
85
    tstrncpy(topic, tmp + 1, TSDB_TOPIC_FNAME_LEN);
67,530✔
86
  }
87
}
88

89
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
263,848✔
90
  if (pTopic == NULL) {
263,848✔
91
    return NULL;
×
92
  }
93
  int32_t code = 0;
263,848✔
94
  int32_t lino = 0;
263,848✔
95
  terrno = TSDB_CODE_OUT_OF_MEMORY;
263,848✔
96

97
  void *  swBuf = NULL;
263,848✔
98
  int32_t physicalPlanLen = 0;
263,848✔
99
  if (pTopic->physicalPlan) {
263,848✔
100
    physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
263,848✔
101
  }
102

103
  int32_t schemaLen = 0;
263,848✔
104
  if (pTopic->schema.nCols) {
263,848✔
105
    schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
411,222✔
106
  }
107

108
  int32_t  size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
263,848✔
109
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
263,848✔
110
  if (pRaw == NULL) {
263,848✔
111
    goto TOPIC_ENCODE_OVER;
×
112
  }
113

114
  int32_t dataPos = 0;
263,848✔
115
  SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
263,848✔
116
  SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER);
263,848✔
117
  SDB_SET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_ENCODE_OVER);
263,848✔
118
  SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER);
263,848✔
119
  SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER);
263,848✔
120
  SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER);
263,848✔
121
  SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER);
263,848✔
122
  SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
263,848✔
123
  SDB_SET_INT8(pRaw, dataPos, pTopic->subType, TOPIC_ENCODE_OVER);
263,848✔
124
  SDB_SET_INT8(pRaw, dataPos, pTopic->withMeta, TOPIC_ENCODE_OVER);
263,848✔
125

126
  SDB_SET_INT64(pRaw, dataPos, pTopic->stbUid, TOPIC_ENCODE_OVER);
263,848✔
127
  SDB_SET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER);
263,848✔
128
  SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
263,848✔
129
  SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
263,848✔
130
  SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
263,848✔
131
  if (physicalPlanLen) {
263,848✔
132
    SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
263,848✔
133
  }
134
  SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
263,848✔
135
  if (schemaLen) {
263,848✔
136
    swBuf = taosMemoryMalloc(schemaLen);
205,611✔
137
    if (swBuf == NULL) {
205,611✔
138
      goto TOPIC_ENCODE_OVER;
×
139
    }
140
    void *aswBuf = swBuf;
205,611✔
141
    if (taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema) < 0) {
411,222✔
142
      goto TOPIC_ENCODE_OVER;
×
143
    }
144
    SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
205,611✔
145
  }
146

147
  SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
263,848✔
148
  SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
263,848✔
149

150
  terrno = TSDB_CODE_SUCCESS;
263,848✔
151

152
TOPIC_ENCODE_OVER:
263,848✔
153
  if (swBuf) taosMemoryFree(swBuf);
263,848✔
154
  if (terrno != TSDB_CODE_SUCCESS) {
263,848✔
155
    mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr());
×
156
    sdbFreeRaw(pRaw);
×
157
    return NULL;
×
158
  }
159

160
  mDebug("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic);
263,848✔
161
  return pRaw;
263,848✔
162
}
163

164
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
228,498✔
165
  if (pRaw == NULL) return NULL;
228,498✔
166
  int32_t code = 0;
228,498✔
167
  int32_t lino = 0;
228,498✔
168
  terrno = TSDB_CODE_OUT_OF_MEMORY;
228,498✔
169
  SSdbRow *    pRow = NULL;
228,498✔
170
  SMqTopicObj *pTopic = NULL;
228,498✔
171
  void *       buf = NULL;
228,498✔
172
  char*        ast = NULL;
228,498✔
173

174
  int8_t sver = 0;
228,498✔
175
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
228,498✔
176

177
  if (sver < 1 || sver > MND_TOPIC_VER_NUMBER) {
228,498✔
178
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
179
    goto TOPIC_DECODE_OVER;
×
180
  }
181

182
  pRow = sdbAllocRow(sizeof(SMqTopicObj));
228,498✔
183
  if (pRow == NULL) goto TOPIC_DECODE_OVER;
228,498✔
184

185
  pTopic = sdbGetRowObj(pRow);
228,498✔
186
  if (pTopic == NULL) goto TOPIC_DECODE_OVER;
228,498✔
187

188
  int32_t len = 0;
228,498✔
189
  int32_t dataPos = 0;
228,498✔
190
  SDB_GET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_DECODE_OVER);
228,498✔
191
  SDB_GET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_DECODE_OVER);
228,498✔
192
  if (sver >= 2) {
228,498✔
193
    SDB_GET_BINARY(pRaw, dataPos, pTopic->createUser, TSDB_USER_LEN, TOPIC_DECODE_OVER);
228,498✔
194
  }
195
  SDB_GET_INT64(pRaw, dataPos, &pTopic->createTime, TOPIC_DECODE_OVER);
228,498✔
196
  SDB_GET_INT64(pRaw, dataPos, &pTopic->updateTime, TOPIC_DECODE_OVER);
228,498✔
197
  SDB_GET_INT64(pRaw, dataPos, &pTopic->uid, TOPIC_DECODE_OVER);
228,498✔
198
  SDB_GET_INT64(pRaw, dataPos, &pTopic->dbUid, TOPIC_DECODE_OVER);
228,498✔
199
  SDB_GET_INT32(pRaw, dataPos, &pTopic->version, TOPIC_DECODE_OVER);
228,498✔
200
  SDB_GET_INT8(pRaw, dataPos, &pTopic->subType, TOPIC_DECODE_OVER);
228,498✔
201
  SDB_GET_INT8(pRaw, dataPos, &pTopic->withMeta, TOPIC_DECODE_OVER);
228,498✔
202

203
  SDB_GET_INT64(pRaw, dataPos, &pTopic->stbUid, TOPIC_DECODE_OVER);
228,498✔
204
  if (sver >= 3) {
228,498✔
205
    SDB_GET_BINARY(pRaw, dataPos, pTopic->stbName, TSDB_TABLE_FNAME_LEN, TOPIC_DECODE_OVER);
228,498✔
206
  }
207
  SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
228,498✔
208
  pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
228,498✔
209
  if (pTopic->sql == NULL) {
228,498✔
210
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
211
    goto TOPIC_DECODE_OVER;
×
212
  }
213
  SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER);
228,498✔
214

215
  if (sver < 4) {
228,498✔
216
    int32_t astLen = 0;
×
217
    SDB_GET_INT32(pRaw, dataPos, &astLen, TOPIC_DECODE_OVER);
×
218
    if (astLen) {
×
219
      ast = taosMemoryCalloc(astLen, sizeof(char));
×
220
      if (ast == NULL) {
×
221
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
222
        goto TOPIC_DECODE_OVER;
×
223
      }
224
      SDB_GET_BINARY(pRaw, dataPos, ast, astLen, TOPIC_DECODE_OVER);
×
225
      terrno = processAst(pTopic, ast);
×
226
      if (terrno != TSDB_CODE_SUCCESS) {
×
227
        goto TOPIC_DECODE_OVER;
×
228
      }
229
    }
230
  } else {
231
    SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
228,498✔
232
    if (len) {
228,498✔
233
      pTopic->physicalPlan = taosMemoryCalloc(len, sizeof(char));
228,498✔
234
      if (pTopic->physicalPlan == NULL) {
228,498✔
235
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
236
        goto TOPIC_DECODE_OVER;
×
237
      }
238
      SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
228,498✔
239
    } else {
240
      pTopic->physicalPlan = NULL;
×
241
    }
242

243
    SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
228,498✔
244
    if (len) {
228,498✔
245
      buf = taosMemoryMalloc(len);
179,812✔
246
      if (buf == NULL) {
179,812✔
247
        terrno = TSDB_CODE_OUT_OF_MEMORY;
×
248
        goto TOPIC_DECODE_OVER;
×
249
      }
250
      SDB_GET_BINARY(pRaw, dataPos, buf, len, TOPIC_DECODE_OVER);
179,812✔
251
      if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
359,624✔
252
        goto TOPIC_DECODE_OVER;
×
253
      }
254
    } else {
255
      pTopic->schema.nCols = 0;
48,686✔
256
      pTopic->schema.version = 0;
48,686✔
257
      pTopic->schema.pSchema = NULL;
48,686✔
258
    }
259
  }
260

261
  SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
228,498✔
262
  terrno = TSDB_CODE_SUCCESS;
228,498✔
263

264
TOPIC_DECODE_OVER:
228,498✔
265
  taosMemoryFreeClear(buf);
228,498✔
266
  taosMemoryFreeClear(ast);
228,498✔
267

268
  if (terrno != TSDB_CODE_SUCCESS) {
228,498✔
269
    mError("topic:%s, failed to decode from raw:%p since %s", pTopic == NULL ? "null" : pTopic->name, pRaw, terrstr());
×
270
    taosMemoryFreeClear(pRow);
×
271
    return NULL;
×
272
  }
273

274
  mDebug("topic:%s, decode from raw:%p, row:%p", pTopic->name, pRaw, pTopic);
228,498✔
275
  return pRow;
228,498✔
276
}
277

278
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
135,773✔
279
  mDebug("topic:%s perform insert action", pTopic != NULL ? pTopic->name : "null");
135,773✔
280
  return 0;
135,773✔
281
}
282

283
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
228,498✔
284
  if (pTopic == NULL) return 0;
228,498✔
285
  mDebug("%p topic:%s perform delete action", pTopic, pTopic->name);
228,498✔
286
  taosMemoryFreeClear(pTopic->sql);
228,498✔
287
  taosMemoryFreeClear(pTopic->physicalPlan);
228,498✔
288
  if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
228,498✔
289
  return 0;
228,498✔
290
}
291

292
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
×
293
  if (pOldTopic == NULL || pNewTopic == NULL) return 0;
×
294
  mDebug("topic:%s perform update action", pOldTopic->name);
×
295
  taosWLockLatch(&pOldTopic->lock);
×
296
  SMqTopicObj tmpTopic = *pOldTopic;
×
297
  (void)memcpy(pOldTopic, pNewTopic, offsetof(SMqTopicObj, lock));
×
298
  *pNewTopic = tmpTopic;
×
299
  taosWUnLockLatch(&pOldTopic->lock);
×
300
  return 0;
×
301
}
302

303
int32_t mndAcquireTopic(SMnode *pMnode, const char *topicName, SMqTopicObj **pTopic) {
2,123,365✔
304
  if (pMnode == NULL || topicName == NULL || pTopic == NULL) {
2,123,365✔
305
    return TSDB_CODE_INVALID_PARA;
×
306
  }
307
  SSdb *pSdb = pMnode->pSdb;
2,123,365✔
308
  *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
2,123,365✔
309
  if (*pTopic == NULL) {
2,123,365✔
310
    return TSDB_CODE_MND_TOPIC_NOT_EXIST;
126,664✔
311
  }
312
  return TSDB_CODE_SUCCESS;
1,996,701✔
313
}
314

315
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
2,201,931✔
316
  if (pMnode == NULL) return;
2,201,931✔
317
  SSdb *pSdb = pMnode->pSdb;
2,201,931✔
318
  sdbRelease(pSdb, pTopic);
2,201,931✔
319
}
320

321
static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
119,093✔
322
  if (pCreate == NULL) return TSDB_CODE_INVALID_PARA;
119,093✔
323
  if (pCreate->sql == NULL) return TSDB_CODE_MND_INVALID_TOPIC;
119,093✔
324

325
  if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
119,093✔
326
    if (pCreate->ast == NULL || pCreate->ast[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
88,148✔
327
  } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
30,945✔
328
    if (pCreate->subStbName[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
8,730✔
329
  } else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {
22,215✔
330
    if (pCreate->subDbName[0] == 0) return TSDB_CODE_MND_INVALID_TOPIC;
22,215✔
331
  }
332

333
  return 0;
119,093✔
334
}
335

336
static int32_t processAst(SMqTopicObj *topicObj, const char *ast) {
115,797✔
337
  SNode *     pAst = NULL;
115,797✔
338
  SQueryPlan *pPlan = NULL;
115,797✔
339
  int32_t     code = TSDB_CODE_SUCCESS;
115,797✔
340
  int32_t     lino = 0;
115,797✔
341

342
  PRINT_LOG_START
115,797✔
343
  if (ast == NULL) {
115,797✔
344
    topicObj->physicalPlan = taosStrdup("");
25,600✔
345
    goto END;
25,600✔
346
  }
347
  qDebugL("%s topic:%s ast %s", __func__, topicObj->name, ast);
90,197✔
348
  MND_TMQ_RETURN_CHECK(nodesStringToNode(ast, &pAst));
90,197✔
349
  MND_TMQ_RETURN_CHECK(qExtractResultSchema(pAst, &topicObj->schema.nCols, &topicObj->schema.pSchema));
90,197✔
350

351
  SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
90,197✔
352
  MND_TMQ_RETURN_CHECK(qCreateQueryPlan(&cxt, &pPlan, NULL));
90,197✔
353
  if (pPlan == NULL) {
90,197✔
354
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
355
    goto END;
×
356
  }
357
  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
90,197✔
358
  if (levelNum != 1) {
90,197✔
359
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
360
    goto END;
×
361
  }
362

363
  SNodeListNode *pNodeListNode = (SNodeListNode *)nodesListGetNode(pPlan->pSubplans, 0);
90,197✔
364
  MND_TMQ_NULL_CHECK(pNodeListNode);
90,197✔
365
  int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList);
90,197✔
366
  if (opNum != 1) {
90,197✔
367
    code = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
×
368
    goto END;
×
369
  }
370

371
  code = nodesNodeToString(nodesListGetNode(pNodeListNode->pNodeList, 0), false, &topicObj->physicalPlan, NULL);
90,197✔
372

373
END:
115,797✔
374
  nodesDestroyNode(pAst);
115,797✔
375
  qDestroyQueryPlan(pPlan);
115,797✔
376
  PRINT_LOG_END
115,797✔
377
  return code;
115,797✔
378
}
379

380
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
116,502✔
381
                              const char *userName) {
382
  if (pMnode == NULL || pReq == NULL || pCreate == NULL || pDb == NULL || userName == NULL)
116,502✔
383
    return TSDB_CODE_INVALID_PARA;
×
384
  STrans *    pTrans = NULL;
116,502✔
385
  int32_t     code = 0;
116,502✔
386
  int32_t     lino = 0;
116,502✔
387
  SMqTopicObj topicObj = {0};
116,502✔
388

389
  PRINT_LOG_START
116,502✔
390
  mInfo("start to create topic:%s", pCreate->name);
116,502✔
391
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-topic");
116,502✔
392
  MND_TMQ_NULL_CHECK(pTrans);
116,502✔
393
  mndTransSetDbName(pTrans, pDb->name, NULL);
116,502✔
394
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
116,502✔
395

396
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
116,502✔
397
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
116,502✔
398
  tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
116,502✔
399

400
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_TOPIC, &topicObj));
116,502✔
401

402
  topicObj.createTime = taosGetTimestampMs();
115,797✔
403
  topicObj.updateTime = topicObj.createTime;
115,797✔
404
  topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
115,797✔
405
  topicObj.dbUid = pDb->uid;
115,797✔
406
  topicObj.version = 1;
115,797✔
407
  topicObj.sql = taosStrdup(pCreate->sql);
115,797✔
408
  MND_TMQ_NULL_CHECK(topicObj.sql);
115,797✔
409
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
115,797✔
410
  topicObj.subType = pCreate->subType;
115,797✔
411
  topicObj.withMeta = pCreate->withMeta;
115,797✔
412
  taosInitRWLatch(&topicObj.lock);
115,797✔
413

414
  MND_TMQ_RETURN_CHECK(processAst(&topicObj, pCreate->ast));
115,797✔
415

416
  if (pCreate->subStbName[0] != 0) {
115,797✔
417
    tstrncpy(topicObj.stbName, pCreate->subStbName, TSDB_TABLE_FNAME_LEN);
86,484✔
418
    SStbObj *pStb = mndAcquireStb(pMnode, topicObj.stbName);
86,484✔
419
    MND_TMQ_NULL_CHECK(pStb);
86,484✔
420
    topicObj.stbUid = pStb->uid;
85,152✔
421
    mndReleaseStb(pMnode, pStb);
85,152✔
422
  }
423

424
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
114,465✔
425
  MND_TMQ_NULL_CHECK(pCommitRaw);
114,465✔
426
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
114,465✔
427
  if (code != 0) {
114,465✔
428
    sdbFreeRaw(pCommitRaw);
×
429
    goto END;
×
430
  }
431

432
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
114,465✔
433
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
114,465✔
434

435
END:
116,502✔
436
  PRINT_LOG_END
116,502✔
437
  taosMemoryFreeClear(topicObj.sql);
116,502✔
438
  taosMemoryFreeClear(topicObj.physicalPlan);
116,502✔
439
  if (topicObj.schema.nCols) {
116,502✔
440
    taosMemoryFreeClear(topicObj.schema.pSchema);
90,197✔
441
  }
442
  mndTransDrop(pTrans);
116,502✔
443
  return code;
116,502✔
444
}
445

446
static int32_t mndReloadTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
×
447
                              const char *userName, SMqTopicObj *topicObjOri) {
448
  if (pMnode == NULL || pReq == NULL || pCreate == NULL || pDb == NULL || userName == NULL)
×
449
    return TSDB_CODE_INVALID_PARA;
×
450
  STrans *    pTrans = NULL;
×
451
  int32_t     code = 0;
×
452
  int32_t     lino = 0;
×
453
  SMqTopicObj topicObj = {0};
×
454

455
  PRINT_LOG_START
×
456
  mInfo("start to reload topic:%s", pCreate->name);
×
457
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "reload-topic");
×
458
  MND_TMQ_NULL_CHECK(pTrans);
×
459
  mndTransSetDbName(pTrans, pDb->name, NULL);
×
460
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
×
461

462
  tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
×
463
  tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
×
464
  tstrncpy(topicObj.createUser, userName, TSDB_USER_LEN);
×
465

466
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_TOPIC, &topicObj));
×
467

468
  taosRLockLatch(&topicObjOri->lock);
×
469
  topicObj.createTime = topicObjOri->createTime;
×
470
  topicObj.updateTime = taosGetTimestampMs();
×
471
  topicObj.uid = topicObjOri->uid;
×
472
  topicObj.dbUid = pDb->uid;
×
473
  topicObj.version = topicObjOri->version + 1;
×
474
  topicObj.sql = taosStrdup(pCreate->sql);
×
475
  topicObj.sqlLen = strlen(pCreate->sql) + 1;
×
476
  topicObj.subType = pCreate->subType;
×
477
  topicObj.withMeta = pCreate->withMeta;
×
478
  taosInitRWLatch(&topicObj.lock);
×
479
  taosRUnLockLatch(&topicObjOri->lock);
×
480

481
  MND_TMQ_RETURN_CHECK(processAst(&topicObj, pCreate->ast));
×
482

483
  if (pCreate->subStbName[0] != 0) {
×
484
    tstrncpy(topicObj.stbName, pCreate->subStbName, TSDB_TABLE_FNAME_LEN);
×
485
    SStbObj *pStb = mndAcquireStb(pMnode, topicObj.stbName);
×
486
    MND_TMQ_NULL_CHECK(pStb);
×
487
    topicObj.stbUid = pStb->uid;
×
488
    mndReleaseStb(pMnode, pStb);
×
489
  }
490

491
  SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
×
492
  MND_TMQ_NULL_CHECK(pCommitRaw);
×
493
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
×
494
  if (code != 0) {
×
495
    sdbFreeRaw(pCommitRaw);
×
496
    goto END;
×
497
  }
498

499
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
500
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
×
501

502
END:
×
503
  PRINT_LOG_END
×
504
  taosMemoryFreeClear(topicObj.sql);
×
505
  taosMemoryFreeClear(topicObj.physicalPlan);
×
506
  if (topicObj.schema.nCols) {
×
507
    taosMemoryFreeClear(topicObj.schema.pSchema);
×
508
  }
509
  mndTransDrop(pTrans);
×
510
  return code;
×
511
}
512

513
static int32_t creatTopic(SRpcMsg *pReq, SCMCreateTopicReq *createTopicReq) {
119,093✔
514
  SMqTopicObj *pTopic = NULL;
119,093✔
515
  SDbObj *     pDb = NULL;
119,093✔
516
  int32_t      code = TSDB_CODE_SUCCESS;
119,093✔
517
  int32_t      lino = 0;
119,093✔
518
  SMnode *     pMnode = pReq->info.node;
119,093✔
519
  int64_t      tss = taosGetTimestampMs();
119,093✔
520

521
  PRINT_LOG_START
119,093✔
522
  mInfo("topic:%s start to create, sql:%s", createTopicReq->name, createTopicReq->sql);
119,093✔
523
  code = mndAcquireTopic(pMnode, createTopicReq->name, &pTopic);
119,093✔
524
  if (code == TSDB_CODE_SUCCESS) {
119,093✔
525
    mndReleaseTopic(pMnode, pTopic);
656✔
526
    if (createTopicReq->igExists) {
656✔
527
      mInfo("topic:%s already exist, ignore exist is set", createTopicReq->name);
656✔
528
    } else {
529
      code = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
×
530
    }
531
    goto END;
656✔
532
  } else if (code != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
118,437✔
533
    goto END;
×
534
  }
535

536
  pDb = mndAcquireDb(pMnode, createTopicReq->subDbName);
118,437✔
537
  MND_TMQ_NULL_CHECK(pDb);
118,437✔
538

539
  if (pDb->cfg.walRetentionPeriod == 0) {
117,831✔
540
    code = TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO;
×
541
    goto END;
×
542
  }
543

544
  if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum) {
117,831✔
545
    code = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE;
1,329✔
546
    goto END;
1,329✔
547
  }
548

549
  MND_TMQ_RETURN_CHECK(grantCheck(TSDB_GRANT_SUBSCRIPTION));
116,502✔
550
  MND_TMQ_RETURN_CHECK(mndCreateTopic(pMnode, pReq, createTopicReq, pDb, RPC_MSG_USER(pReq)));
116,502✔
551
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
114,465✔
552
    int64_t tse = taosGetTimestampMs();
114,465✔
553
    double  duration = (double)(tse - tss);
114,465✔
554
    duration = duration / 1000;
114,465✔
555
    auditRecord(pReq, pMnode->clusterId, "createTopic", createTopicReq->subDbName, createTopicReq->name,
114,465✔
556
                createTopicReq->sql, strlen(createTopicReq->sql), duration, 0);
114,465✔
557
  }
558
  code = TSDB_CODE_ACTION_IN_PROGRESS;
114,465✔
559

560
END:
119,093✔
561
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
119,093✔
562
    mError("%s failed, topic:%s since %s", __func__, createTopicReq->name, tstrerror(code));
3,972✔
563
  } else {
564
    mInfo("topic:%s create successfully", createTopicReq->name);
115,121✔
565
  }
566
  mndReleaseDb(pMnode, pDb);
119,093✔
567
  return code;
119,093✔
568
}
569

570
static int32_t reloadTopic(SRpcMsg *pReq, SCMCreateTopicReq *createTopicReq) {
×
571
  SMnode *     pMnode = pReq->info.node;
×
572
  int32_t      code = TSDB_CODE_SUCCESS;
×
573
  int32_t      lino = 0;
×
574
  SDbObj *     pDb = NULL;
×
575
  SMqTopicObj *pTopic = NULL;
×
576
  int64_t      tss = taosGetTimestampMs();
×
577

578
  PRINT_LOG_START
×
579
  code = mndAcquireTopic(pMnode, createTopicReq->name, &pTopic);
×
580
  if (code != 0) {
×
581
    if (createTopicReq->igExists) {
×
582
      mInfo("topic:%s, not exist, ignore not exist is set", createTopicReq->name);
×
583
      code = 0;
×
584
      goto END;
×
585
    } else {
586
      mError("topic:%s, failed to reload since %s", createTopicReq->name, tstrerror(code));
×
587
      goto END;
×
588
    }
589
  }
590

591
  pDb = mndAcquireDb(pMnode, createTopicReq->subDbName);
×
592
  MND_TMQ_NULL_CHECK(pDb);
×
593

594
  MND_TMQ_RETURN_CHECK(grantCheck(TSDB_GRANT_SUBSCRIPTION));
×
595
  MND_TMQ_RETURN_CHECK(mndReloadTopic(pMnode, pReq, createTopicReq, pDb, RPC_MSG_USER(pReq), pTopic));
×
596

597
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
×
598
    int64_t tse = taosGetTimestampMs();
×
599
    double  duration = (double)(tse - tss);
×
600
    duration = duration / 1000;
×
601
    auditRecord(pReq, pMnode->clusterId, "reloadTopic", createTopicReq->subDbName, createTopicReq->name,
×
602
                createTopicReq->sql, strlen(createTopicReq->sql), duration, 0);
×
603
  }
604

605
  code = TSDB_CODE_ACTION_IN_PROGRESS;
×
606

607
  if (topicsToReload == NULL) {
×
608
    topicsToReload = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
×
609
    MND_TMQ_NULL_CHECK(topicsToReload);
×
610
  }
611
  MND_TMQ_RETURN_CHECK(
×
612
      taosHashPut(topicsToReload, createTopicReq->name, strlen(createTopicReq->name), createTopicReq->name, 1));
613
  mInfo("topic:%s, marked to reload", createTopicReq->name);
×
614

615
END:
×
616
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
617
    mError("%s failed, topic:%s since %s", __func__, createTopicReq->name, tstrerror(code));
×
618
  } else {
619
    mInfo("topic:%s create successfully", createTopicReq->name);
×
620
  }
621
  mndReleaseTopic(pMnode, pTopic);
×
622
  mndReleaseDb(pMnode, pDb);
×
623

624
  return code;
×
625
}
626

627
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
119,093✔
628
  if (pReq == NULL || pReq->contLen <= 0) {
119,093✔
629
    return TSDB_CODE_INVALID_MSG;
×
630
  }
631
  SMnode *pMnode = pReq->info.node;
119,093✔
632
  int32_t code = TSDB_CODE_SUCCESS;
119,093✔
633
  int32_t lino = 0;
119,093✔
634

635
  SCMCreateTopicReq createTopicReq = {0};
119,093✔
636

637
  PRINT_LOG_START
119,093✔
638
  MND_TMQ_RETURN_CHECK(tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq));
119,093✔
639

640
  mInfo("topic:%s start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
119,093✔
641

642
  MND_TMQ_RETURN_CHECK(mndCheckCreateTopicReq(&createTopicReq));
119,093✔
643

644
  if (createTopicReq.reload) {
119,093✔
645
    MND_TMQ_RETURN_CHECK(reloadTopic(pReq, &createTopicReq));
×
646
  } else {
647
    MND_TMQ_RETURN_CHECK(creatTopic(pReq, &createTopicReq));
119,093✔
648
  }
649

650
END:
119,039✔
651
  tFreeSCMCreateTopicReq(&createTopicReq);
119,093✔
652
  return code;
119,093✔
653
}
654

655
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
92,233✔
656
  if (pMnode == NULL || pTrans == NULL || pReq == NULL || pTopic == NULL) {
92,233✔
657
    return TSDB_CODE_INVALID_MSG;
×
658
  }
659
  int32_t  code = 0;
92,233✔
660
  int32_t  lino = 0;
92,233✔
661
  SSdbRaw *pCommitRaw = NULL;
92,233✔
662
  PRINT_LOG_START
92,233✔
663
  MND_TMQ_RETURN_CHECK(mndUserRemoveTopic(pMnode, pTrans, pTopic->name));
92,233✔
664
  pCommitRaw = mndTopicActionEncode(pTopic);
92,233✔
665
  MND_TMQ_NULL_CHECK(pCommitRaw);
92,233✔
666
  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
92,233✔
667
  if (code != 0) {
92,233✔
668
    sdbFreeRaw(pCommitRaw);
×
669
    goto END;
×
670
  }
671
  MND_TMQ_RETURN_CHECK(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
92,233✔
672
  MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
92,233✔
673

674
END:
92,233✔
675
  PRINT_LOG_END
92,233✔
676
  return code;
92,233✔
677
}
678

679
bool checkTopic(SArray *topics, char *topicName) {
90,237✔
680
  if (topics == NULL || topicName == NULL) {
90,237✔
681
    return false;
×
682
  }
683
  int32_t sz = taosArrayGetSize(topics);
90,237✔
684
  for (int32_t i = 0; i < sz; i++) {
90,816✔
685
    char *name = taosArrayGetP(topics, i);
1,522✔
686
    if (name && strcmp(name, topicName) == 0) {
1,522✔
687
      return true;
943✔
688
    }
689
  }
690
  return false;
89,294✔
691
}
692

693
static int32_t checkConsumer(STrans *pTrans, SMqConsumerObj *pConsumer, bool deleteConsumer, char *topicName) {
29,534✔
694
  int32_t         code = 0;
29,534✔
695
  int32_t         lino = 0;
29,534✔
696
  SMqConsumerObj *pConsumerNew = NULL;
29,534✔
697

698
  taosRLockLatch(&pConsumer->lock);
29,534✔
699
  bool found1 = checkTopic(pConsumer->assignedTopics, topicName);
29,534✔
700
  bool found2 = checkTopic(pConsumer->rebRemovedTopics, topicName);
29,534✔
701
  bool found3 = checkTopic(pConsumer->rebNewTopics, topicName);
29,534✔
702
  if (found1 || found2 || found3) {
29,534✔
703
    if (deleteConsumer) {
640✔
704
      MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, CONSUMER_CLEAR, NULL, NULL, &pConsumerNew));
640✔
705
      MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
640✔
706
      tDeleteSMqConsumerObj(pConsumerNew);
640✔
707
      pConsumerNew = NULL;
640✔
708
    } else {
709
      mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", topicName,
×
710
             pConsumer->consumerId, pConsumer->cgroup);
711
      code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
×
712
      goto END;
×
713
    }
714
  }
715
END:
29,534✔
716
  taosRUnLockLatch(&pConsumer->lock);
29,534✔
717
  tDeleteSMqConsumerObj(pConsumerNew);
29,534✔
718
  return code;
29,534✔
719
}
720

721
static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName, bool deleteConsumer) {
92,233✔
722
  if (pMnode == NULL || pTrans == NULL || topicName == NULL) {
92,233✔
723
    return TSDB_CODE_INVALID_MSG;
×
724
  }
725
  int32_t         code = 0;
92,233✔
726
  int32_t         lino = 0;
92,233✔
727
  SSdb *          pSdb = pMnode->pSdb;
92,233✔
728
  void *          pIter = NULL;
92,233✔
729
  SMqConsumerObj *pConsumer = NULL;
92,233✔
730

731
  PRINT_LOG_START
92,233✔
732
  while (1) {
733
    pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
121,767✔
734
    if (pIter == NULL) {
121,767✔
735
      break;
92,233✔
736
    }
737

738
    MND_TMQ_RETURN_CHECK(checkConsumer(pTrans, pConsumer, deleteConsumer, topicName));
29,534✔
739
    sdbRelease(pSdb, pConsumer);
29,534✔
740
  }
741

742
END:
92,233✔
743
  PRINT_LOG_END
92,233✔
744
  sdbRelease(pSdb, pConsumer);
92,233✔
745
  sdbCancelFetch(pSdb, pIter);
92,233✔
746
  return code;
92,233✔
747
}
748

749
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
100,235✔
750
  if (pReq == NULL) {
100,235✔
751
    return TSDB_CODE_INVALID_MSG;
×
752
  }
753
  SMnode *       pMnode = pReq->info.node;
100,235✔
754
  SMDropTopicReq dropReq = {0};
100,235✔
755
  int32_t        code = 0;
100,235✔
756
  int32_t        lino = 0;
100,235✔
757
  SMqTopicObj *  pTopic = NULL;
100,235✔
758
  STrans *       pTrans = NULL;
100,235✔
759
  int64_t        tss = taosGetTimestampMs();
100,235✔
760

761
  PRINT_LOG_START
100,235✔
762
  MND_TMQ_RETURN_CHECK(tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq));
100,235✔
763

764
  code = mndAcquireTopic(pMnode, dropReq.name, &pTopic);
100,235✔
765
  if (code != 0) {
100,235✔
766
    if (dropReq.igNotExists) {
6,745✔
767
      mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
6,745✔
768
      code = 0;
6,745✔
769
    }
770
    goto END;
6,745✔
771
  }
772
  taosRLockLatch(&pTopic->lock);
93,490✔
773

774
  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic");
93,490✔
775
  MND_TMQ_NULL_CHECK(pTrans);
93,490✔
776

777
  mndTransSetDbName(pTrans, pTopic->db, NULL);
93,490✔
778
  MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
93,490✔
779
  mInfo("trans:%d, used to drop topic:%s, force:%d", pTrans->id, pTopic->name, dropReq.force);
93,490✔
780

781
  MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_TOPIC, pTopic));
93,490✔
782
  MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_READ_DB, pTopic->db));
92,233✔
783
  MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
92,233✔
784
  MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
92,233✔
785
  MND_TMQ_RETURN_CHECK(mndDropTopic(pMnode, pTrans, pReq, pTopic));
92,233✔
786
  if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) {
92,233✔
787
    int64_t tse = taosGetTimestampMs();
92,233✔
788
    double  duration = (double)(tse - tss);
92,233✔
789
    duration = duration / 1000;
92,233✔
790
    auditRecord(pReq, pMnode->clusterId, "dropTopic", pTopic->db, dropReq.name, dropReq.sql, dropReq.sqlLen, duration,
92,233✔
791
                0);
792
  }
793

794
  code = TSDB_CODE_ACTION_IN_PROGRESS;
92,233✔
795

796
END:
100,235✔
797
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
100,235✔
798
    mError("%s failed, topic:%s since %s", __func__, dropReq.name, tstrerror(code));
1,257✔
799
  } else {
800
    mInfo("topic:%s dropped successfully", dropReq.name);
98,978✔
801
  }
802
  if (pTopic != NULL) {
100,235✔
803
    taosRUnLockLatch(&pTopic->lock);
93,490✔
804
  }
805
  mndReleaseTopic(pMnode, pTopic);
100,235✔
806
  mndTransDrop(pTrans);
100,235✔
807
  tFreeSMDropTopicReq(&dropReq);
100,235✔
808
  return code;
100,235✔
809
}
810

811
int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
108,159✔
812
  if (pMnode == NULL || dbName == NULL || pNumOfTopics == NULL) {
108,159✔
813
    return TSDB_CODE_INVALID_MSG;
×
814
  }
815
  *pNumOfTopics = 0;
108,159✔
816

817
  SSdb *  pSdb = pMnode->pSdb;
108,159✔
818
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
108,159✔
819
  if (pDb == NULL) {
108,159✔
820
    return TSDB_CODE_MND_DB_NOT_SELECTED;
×
821
  }
822

823
  int32_t numOfTopics = 0;
108,159✔
824
  void *  pIter = NULL;
108,159✔
825
  while (1) {
×
826
    SMqTopicObj *pTopic = NULL;
108,159✔
827
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
108,159✔
828
    if (pIter == NULL) {
108,159✔
829
      break;
108,159✔
830
    }
831
    taosRLockLatch(&pTopic->lock);
×
832
    if (pTopic->dbUid == pDb->uid) {
×
833
      numOfTopics++;
×
834
    }
835
    taosRUnLockLatch(&pTopic->lock);
×
836

837
    sdbRelease(pSdb, pTopic);
×
838
  }
839

840
  *pNumOfTopics = numOfTopics;
108,159✔
841
  mndReleaseDb(pMnode, pDb);
108,159✔
842
  return 0;
108,159✔
843
}
844

845
static void schemaToJson(SSchema *schema, int32_t nCols, char *schemaJson) {
45,550✔
846
  if (schema == NULL || schemaJson == NULL) {
45,550✔
847
    return;
×
848
  }
849
  char *  string = NULL;
45,550✔
850
  int32_t code = 0;
45,550✔
851
  int32_t lino = 0;
45,550✔
852

853
  cJSON *cbytes = NULL;
45,550✔
854
  cJSON *ctype = NULL;
45,550✔
855
  cJSON *cname = NULL;
45,550✔
856
  cJSON *column = NULL;
45,550✔
857
  cJSON *columns = cJSON_CreateArray();
45,550✔
858
  MND_TMQ_NULL_CHECK(columns);
45,550✔
859
  for (int i = 0; i < nCols; i++) {
510,170✔
860
    column = cJSON_CreateObject();
464,620✔
861
    MND_TMQ_NULL_CHECK(column);
464,620✔
862
    SSchema *s = schema + i;
464,620✔
863
    cname = cJSON_CreateString(s->name);
464,620✔
864
    MND_TMQ_NULL_CHECK(cname);
464,620✔
865
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "name", cname), 0);
464,620✔
866
    cname = NULL;  // ownership transferred to column object
464,620✔
867

868
    ctype = cJSON_CreateString(tDataTypes[s->type].name);
464,620✔
869
    MND_TMQ_NULL_CHECK(ctype);
464,620✔
870
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "type", ctype), 0);
464,620✔
871
    ctype = NULL;  // ownership transferred to column object
464,620✔
872

873
    int32_t length = 0;
464,620✔
874
    if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
464,620✔
875
      length = s->bytes - VARSTR_HEADER_SIZE;
86,579✔
876
    } else if (s->type == TSDB_DATA_TYPE_NCHAR || s->type == TSDB_DATA_TYPE_JSON) {
378,041✔
877
      length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
59,098✔
878
    } else {
879
      length = s->bytes;
318,943✔
880
    }
881
    cbytes = cJSON_CreateNumber(length);
464,620✔
882
    MND_TMQ_NULL_CHECK(cbytes);
464,620✔
883
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToObject(column, "length", cbytes), 0);
464,620✔
884
    cbytes = NULL;  // ownership transferred to column object
464,620✔
885

886
    MND_TMQ_CONDITION_CHECK(cJSON_AddItemToArray(columns, column), 0);
464,620✔
887
    column = NULL;  // ownership transferred to columns array
464,620✔
888
  }
889

890
END:
45,550✔
891
  string = cJSON_PrintUnformatted(columns);
45,550✔
892
  cJSON_Delete(columns);
45,550✔
893
  cJSON_Delete(column);
45,550✔
894
  cJSON_Delete(cname);
45,550✔
895
  cJSON_Delete(ctype);
45,550✔
896
  cJSON_Delete(cbytes);
45,550✔
897

898
  size_t len = strlen(string);
45,550✔
899
  if (string && len <= TSDB_SHOW_SCHEMA_JSON_LEN) {
45,550✔
900
    STR_TO_VARSTR(schemaJson, string);
45,550✔
901
  } else {
902
    mError("mndRetrieveTopic build schema error json:%p, json len:%zu", string, len);
×
903
    STR_TO_VARSTR(schemaJson, "NULL");
×
904
  }
905
  taosMemoryFree(string);
45,550✔
906
}
907

908
static int32_t buildResult(SMqTopicObj *pTopic, int32_t *numOfRows, SMnode *pMnode, SSDataBlock *pBlock) {
46,875✔
909
  SColumnInfoData *pColInfo = NULL;
46,875✔
910
  SName            n = {0};
46,875✔
911
  int32_t          cols = 0;
46,875✔
912
  char *           schemaJson = NULL;
46,875✔
913
  char *           sql = NULL;
46,875✔
914
  int32_t          code = 0;
46,875✔
915
  int32_t          lino = 0;
46,875✔
916

917
  taosRLockLatch(&pTopic->lock);
46,875✔
918

919
  char        topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
46,875✔
920
  const char *pName = mndGetDbStr(pTopic->name);
46,875✔
921
  STR_TO_VARSTR(topicName, pName);
46,875✔
922

923
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
46,875✔
924
  MND_TMQ_NULL_CHECK(pColInfo);
46,875✔
925
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)topicName, false));
46,875✔
926

927
  char dbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
46,875✔
928
  MND_TMQ_RETURN_CHECK(tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB));
46,875✔
929
  MND_TMQ_RETURN_CHECK(tNameGetDbName(&n, varDataVal(dbName)));
46,875✔
930
  varDataSetLen(dbName, strlen(varDataVal(dbName)));
46,875✔
931

932
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
46,875✔
933
  MND_TMQ_NULL_CHECK(pColInfo);
46,875✔
934
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)dbName, false));
46,875✔
935

936
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
46,875✔
937
  MND_TMQ_NULL_CHECK(pColInfo);
46,875✔
938
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)&pTopic->createTime, false));
46,875✔
939

940
  sql = taosMemoryMalloc(strlen(pTopic->sql) + VARSTR_HEADER_SIZE);
46,875✔
941
  MND_TMQ_NULL_CHECK(sql);
46,875✔
942
  STR_TO_VARSTR(sql, pTopic->sql);
46,875✔
943

944
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
46,875✔
945
  MND_TMQ_NULL_CHECK(pColInfo);
46,875✔
946
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)sql, false));
46,875✔
947

948
  taosMemoryFreeClear(sql);
46,875✔
949

950
  schemaJson = taosMemoryMalloc(TSDB_SHOW_SCHEMA_JSON_LEN + VARSTR_HEADER_SIZE);
46,875✔
951
  MND_TMQ_NULL_CHECK(schemaJson);
46,875✔
952
  if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) {
46,875✔
953
    schemaToJson(pTopic->schema.pSchema, pTopic->schema.nCols, schemaJson);
45,236✔
954
  } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
1,639✔
955
    SStbObj *pStb = mndAcquireStb(pMnode, pTopic->stbName);
314✔
956
    if (pStb == NULL) {
314✔
957
      STR_TO_VARSTR(schemaJson, "NULL");
×
958
      mError("mndRetrieveTopic mndAcquireStb null stbName:%s", pTopic->stbName);
×
959
    } else {
960
      schemaToJson(pStb->pColumns, pStb->numOfColumns, schemaJson);
314✔
961
      mndReleaseStb(pMnode, pStb);
314✔
962
    }
963
  } else {
964
    STR_TO_VARSTR(schemaJson, "NULL");
1,325✔
965
  }
966

967
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
46,875✔
968
  MND_TMQ_NULL_CHECK(pColInfo);
46,875✔
969
  MND_TMQ_RETURN_CHECK(colDataSetVal(pColInfo, *numOfRows, (const char *)schemaJson, false));
46,875✔
970
  taosMemoryFreeClear(schemaJson);
46,875✔
971

972
  (*numOfRows)++;
46,875✔
973

974
END:
46,875✔
975
  taosRUnLockLatch(&pTopic->lock);
46,875✔
976

977
  taosMemoryFreeClear(sql);
46,875✔
978
  taosMemoryFreeClear(schemaJson);
46,875✔
979
  return code;
46,875✔
980
}
981

982
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
16,545✔
983
  if (pReq == NULL || pShow == NULL || pBlock == NULL) {
16,545✔
984
    return TSDB_CODE_INVALID_MSG;
×
985
  }
986
  SMnode *     pMnode = pReq->info.node;
16,545✔
987
  SSdb *       pSdb = pMnode->pSdb;
16,545✔
988
  int32_t      numOfRows = 0;
16,545✔
989
  SMqTopicObj *pTopic = NULL;
16,545✔
990
  int32_t      code = 0;
16,545✔
991
  int32_t      lino = 0;
16,545✔
992
  PRINT_LOG_START
16,545✔
993

994
  while (numOfRows < rowsCapacity) {
63,420✔
995
    pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
63,420✔
996
    if (pShow->pIter == NULL) break;
63,420✔
997

998
    MND_TMQ_RETURN_CHECK(buildResult(pTopic, &numOfRows, pMnode, pBlock));
46,875✔
999
    sdbRelease(pSdb, pTopic);
46,875✔
1000
    pTopic = NULL;
46,875✔
1001
  }
1002

1003
  pShow->numOfRows += numOfRows;
16,545✔
1004

1005
END:
16,545✔
1006
  sdbCancelFetch(pSdb, pShow->pIter);
16,545✔
1007
  sdbRelease(pSdb, pTopic);
16,545✔
1008
  if (code != TSDB_CODE_SUCCESS) {
16,545✔
1009
    mError("%s failed since %s", __func__, tstrerror(code));
×
1010
    return code;
×
1011
  } else {
1012
    mDebug("%s retrieved %d rows successfully", __func__, numOfRows);
16,545✔
1013
    return numOfRows;
16,545✔
1014
  }
1015
}
1016

1017
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
×
1018
  if (pMnode == NULL) return;
×
1019
  SSdb *pSdb = pMnode->pSdb;
×
1020
  sdbCancelFetchByType(pSdb, pIter, SDB_TOPIC);
×
1021
}
1022

1023
bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
586,351✔
1024
  if (pMnode == NULL || pDb == NULL) {
586,351✔
1025
    return false;
×
1026
  }
1027
  SSdb *       pSdb = pMnode->pSdb;
586,351✔
1028
  void *       pIter = NULL;
586,351✔
1029
  SMqTopicObj *pTopic = NULL;
586,351✔
1030

1031
  while (1) {
46,150✔
1032
    pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
632,501✔
1033
    if (pIter == NULL) {
632,501✔
1034
      break;
586,019✔
1035
    }
1036

1037
    taosRLockLatch(&pTopic->lock);
46,482✔
1038
    bool found = pTopic->dbUid == pDb->uid;
46,482✔
1039
    taosRUnLockLatch(&pTopic->lock);
46,482✔
1040

1041
    if (found) {
46,482✔
1042
      sdbRelease(pSdb, pTopic);
332✔
1043
      sdbCancelFetch(pSdb, pIter);
332✔
1044
      return true;
332✔
1045
    }
1046

1047
    sdbRelease(pSdb, pTopic);
46,150✔
1048
  }
1049

1050
  return false;
586,019✔
1051
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc