• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5036

28 Apr 2026 02:11PM UTC coverage: 73.055% (-0.005%) from 73.06%
#5036

push

travis-ci

web-flow
func/regexp_extract: new scalar func and test cases (#35191)

46 of 59 new or added lines in 1 file covered. (77.97%)

5784 existing lines in 151 files now uncovered.

276105 of 377940 relevant lines covered (73.06%)

133708549.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.1
/source/dnode/mnode/impl/src/mndMount.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifdef USE_MOUNT
16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "command.h"
19
#include "mndArbGroup.h"
20
#include "mndCluster.h"
21
#include "mndConfig.h"
22
#include "mndDb.h"
23
#include "mndDnode.h"
24
#include "mndIndex.h"
25
#include "mndIndexComm.h"
26
#include "mndMnode.h"
27
#include "mndMount.h"
28
#include "mndPrivilege.h"
29
#include "mndShow.h"
30
#include "mndSma.h"
31
#include "mndStb.h"
32
#include "mndStream.h"
33
#include "mndSubscribe.h"
34
#include "mndTopic.h"
35
#include "mndTrans.h"
36
#include "mndUser.h"
37
#include "mndVgroup.h"
38
#include "mndView.h"
39
#include "systable.h"
40
#include "thttp.h"
41
#include "tjson.h"
42

43
#define MND_MOUNT_VER_NUMBER 1
44

45
static int32_t  mndMountActionInsert(SSdb *pSdb, SMountObj *pObj);
46
static int32_t  mndMountActionDelete(SSdb *pSdb, SMountObj *pObj);
47
static int32_t  mndMountActionUpdate(SSdb *pSdb, SMountObj *pOld, SMountObj *pNew);
48
static int32_t  mndNewMountActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
49

50
static int32_t mndProcessCreateMountReq(SRpcMsg *pReq);
51
static int32_t mndProcessDropMountReq(SRpcMsg *pReq);
52
static int32_t mndProcessExecuteMountReq(SRpcMsg *pReq);
53
static int32_t mndProcessRetrieveMountPathRsp(SRpcMsg *pRsp);
54
static int32_t mndRetrieveMounts(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
55
static void    mndCancelGetNextMount(SMnode *pMnode, void *pIter);
56

57
int32_t mndInitMount(SMnode *pMnode) {
483,771✔
58
  SSdbTable table = {
483,771✔
59
      .sdbType = SDB_MOUNT,
60
      .keyType = SDB_KEY_BINARY,
61
      .encodeFp = (SdbEncodeFp)mndMountActionEncode,
62
      .decodeFp = (SdbDecodeFp)mndMountActionDecode,
63
      .insertFp = (SdbInsertFp)mndMountActionInsert,
64
      .updateFp = (SdbUpdateFp)mndMountActionUpdate,
65
      .deleteFp = (SdbDeleteFp)mndMountActionDelete,
66
      .validateFp = (SdbValidateFp)mndNewMountActionValidate,
67
  };
68

69
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MOUNT, mndProcessCreateMountReq);
483,771✔
70
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MOUNT, mndProcessDropMountReq);
483,771✔
71
  mndSetMsgHandle(pMnode, TDMT_MND_EXECUTE_MOUNT, mndProcessExecuteMountReq);
483,771✔
72
  mndSetMsgHandle(pMnode, TDMT_DND_RETRIEVE_MOUNT_PATH_RSP, mndProcessRetrieveMountPathRsp);
483,771✔
73
  mndSetMsgHandle(pMnode, TDMT_DND_MOUNT_VNODE_RSP, mndTransProcessRsp);
483,771✔
74

75
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MOUNT, mndRetrieveMounts);
483,771✔
76
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MOUNT, mndCancelGetNextMount);
483,771✔
77

78
  return sdbSetTable(pMnode->pSdb, table);
483,771✔
79
}
80

81
void mndCleanupMount(SMnode *pMnode) {}
483,710✔
82

83
void mndMountFreeObj(SMountObj *pObj) {
1,725✔
84
  if (pObj) {
1,725✔
85
    taosMemoryFreeClear(pObj->dnodeIds);
1,725✔
86
    taosMemoryFreeClear(pObj->dbObj);
1,725✔
87
    if (pObj->paths) {
1,725✔
88
      for (int32_t i = 0; i < pObj->nMounts; ++i) {
3,450✔
89
        taosMemoryFreeClear(pObj->paths[i]);
1,725✔
90
      }
91
      taosMemoryFreeClear(pObj->paths);
1,725✔
92
    }
93
  }
94
}
1,725✔
95

96
void mndMountDestroyObj(SMountObj *pObj) {
×
97
  if (pObj) {
×
98
    mndMountFreeObj(pObj);
×
99
    taosMemoryFree(pObj);
×
100
  }
101
}
×
102

103
static int32_t tSerializeSMountObj(void *buf, int32_t bufLen, const SMountObj *pObj) {
2,878✔
104
  int32_t  code = 0, lino = 0;
2,878✔
105
  int32_t  tlen = 0;
2,878✔
106
  SEncoder encoder = {0};
2,878✔
107
  tEncoderInit(&encoder, buf, bufLen);
2,878✔
108

109
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
2,878✔
110

111
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
5,756✔
112
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->acct));
5,756✔
113
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
5,756✔
114
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
5,756✔
115
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
5,756✔
116
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
5,756✔
117
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nMounts));
5,756✔
118
  for (int16_t i = 0; i < pObj->nMounts; ++i) {
5,756✔
119
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->dnodeIds[i]));
5,756✔
120
    TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->paths[i]));
5,756✔
121
  }
122
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nDbs));
5,756✔
123
  for (int16_t i = 0; i < pObj->nDbs; ++i) {
2,878✔
124
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->dbObj[i].uid));
×
125
    TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbObj[i].name));
×
126
  }
127
  tEndEncode(&encoder);
2,878✔
128

129
  tlen = encoder.pos;
2,878✔
130
_exit:
2,878✔
131
  tEncoderClear(&encoder);
2,878✔
132
  if (code < 0) {
2,878✔
133
    mError("mount, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
134
    TAOS_RETURN(code);
×
135
  }
136

137
  return tlen;
2,878✔
138
}
139

140
static int32_t tDeserializeSMountObj(void *buf, int32_t bufLen, SMountObj *pObj) {
1,436✔
141
  int32_t  code = 0, lino = 0;
1,436✔
142
  SDecoder decoder = {0};
1,436✔
143
  tDecoderInit(&decoder, buf, bufLen);
1,436✔
144

145
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
1,436✔
146

147
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
1,436✔
148
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->acct));
1,436✔
149
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));
1,436✔
150
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
2,872✔
151
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
2,872✔
152
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
2,872✔
153
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nMounts));
2,872✔
154
  if (pObj->nMounts > 0) {
1,436✔
155
    if (!(pObj->dnodeIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nMounts))) {
1,436✔
156
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
157
    }
158
    if (!(pObj->paths = taosMemoryMalloc(sizeof(char *) * pObj->nMounts))) {
1,436✔
159
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
160
    }
161
    for (int16_t i = 0; i < pObj->nMounts; ++i) {
2,872✔
162
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->dnodeIds[i]));
2,872✔
163
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pObj->paths[i]));
2,872✔
164
    }
165
  }
166
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nDbs));
2,872✔
167
  if (pObj->nDbs > 0) {
1,436✔
168
    if (!(pObj->dbObj = taosMemoryMalloc(sizeof(SMountDbObj) * pObj->nDbs))) {
×
169
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
170
    }
171
    for (int16_t i = 0; i < pObj->nDbs; ++i) {
×
172
      TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbObj[i].uid));
×
173
      TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbObj[i].name));
×
174
    }
175
  }
176

177
_exit:
1,436✔
178
  tEndDecode(&decoder);
1,436✔
179
  tDecoderClear(&decoder);
1,436✔
180
  if (code < 0) {
1,436✔
181
    mError("mount, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
×
182
  }
183
  TAOS_RETURN(code);
1,436✔
184
}
185

186
SSdbRaw *mndMountActionEncode(SMountObj *pObj) {
1,439✔
187
  int32_t  code = 0, lino = 0;
1,439✔
188
  void    *buf = NULL;
1,439✔
189
  SSdbRaw *pRaw = NULL;
1,439✔
190
  int32_t  tlen = tSerializeSMountObj(NULL, 0, pObj);
1,439✔
191
  if (tlen < 0) {
1,439✔
192
    TAOS_CHECK_EXIT(tlen);
×
193
  }
194

195
  int32_t size = sizeof(int32_t) + tlen;
1,439✔
196
  pRaw = sdbAllocRaw(SDB_MOUNT, MND_MOUNT_VER_NUMBER, size);
1,439✔
197
  if (pRaw == NULL) {
1,439✔
198
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
199
  }
200

201
  buf = taosMemoryMalloc(tlen);
1,439✔
202
  if (buf == NULL) {
1,439✔
203
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
204
  }
205

206
  tlen = tSerializeSMountObj(buf, tlen, pObj);
1,439✔
207
  if (tlen < 0) {
1,439✔
208
    TAOS_CHECK_EXIT(tlen);
×
209
  }
210

211
  int32_t dataPos = 0;
1,439✔
212
  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
1,439✔
213
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
1,439✔
214
  SDB_SET_DATALEN(pRaw, dataPos, _exit);
1,439✔
215

216
_exit:
1,439✔
217
  taosMemoryFreeClear(buf);
1,439✔
218
  if (code != TSDB_CODE_SUCCESS) {
1,439✔
219
    terrno = code;
×
220
    mError("mount, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
×
221
    sdbFreeRaw(pRaw);
×
222
    return NULL;
×
223
  }
224

225
  mTrace("mount, encode to raw:%p, row:%p", pRaw, pObj);
1,439✔
226
  return pRaw;
1,439✔
227
}
228

229
SSdbRow *mndMountActionDecode(SSdbRaw *pRaw) {
1,436✔
230
  int32_t    code = 0, lino = 0;
1,436✔
231
  SSdbRow   *pRow = NULL;
1,436✔
232
  SMountObj *pObj = NULL;
1,436✔
233
  void      *buf = NULL;
1,436✔
234

235
  int8_t sver = 0;
1,436✔
236
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
1,436✔
237
    goto _exit;
×
238
  }
239

240
  if (sver != MND_MOUNT_VER_NUMBER) {
1,436✔
241
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
×
242
    mError("mount read invalid ver, data ver: %d, curr ver: %d", sver, MND_MOUNT_VER_NUMBER);
×
243
    goto _exit;
×
244
  }
245

246
  if (!(pRow = sdbAllocRow(sizeof(SMountObj)))) {
1,436✔
247
    code = TSDB_CODE_OUT_OF_MEMORY;
×
248
    goto _exit;
×
249
  }
250

251
  if (!(pObj = sdbGetRowObj(pRow))) {
1,436✔
252
    code = TSDB_CODE_OUT_OF_MEMORY;
×
253
    goto _exit;
×
254
  }
255

256
  int32_t tlen;
1,436✔
257
  int32_t dataPos = 0;
1,436✔
258
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
1,436✔
259
  buf = taosMemoryMalloc(tlen + 1);
1,436✔
260
  if (buf == NULL) {
1,436✔
261
    code = TSDB_CODE_OUT_OF_MEMORY;
×
262
    goto _exit;
×
263
  }
264
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);
1,436✔
265

266
  if (tDeserializeSMountObj(buf, tlen, pObj) < 0) {
1,436✔
267
    code = TSDB_CODE_OUT_OF_MEMORY;
×
268
    goto _exit;
×
269
  }
270

271
  taosInitRWLatch(&pObj->lock);
1,436✔
272

273
_exit:
1,436✔
274
  taosMemoryFreeClear(buf);
1,436✔
275
  if (code != TSDB_CODE_SUCCESS) {
1,436✔
276
    terrno = code;
×
277
    mError("mount, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
×
278
    mndMountFreeObj(pObj);
×
279
    taosMemoryFreeClear(pRow);
×
280
    return NULL;
×
281
  }
282
  mTrace("mount, decode from raw:%p, row:%p", pRaw, pObj);
1,436✔
283
  return pRow;
1,436✔
284
}
285

286
static int32_t mndNewMountActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
289✔
287
  mTrace("mount, validate new mount action, raw:%p", pRaw);
289✔
288
  return 0;
289✔
289
}
290

291
static int32_t mndMountActionInsert(SSdb *pSdb, SMountObj *pObj) {
575✔
292
  mTrace("mount:%s, perform insert action, row:%p", pObj->name, pObj);
575✔
293
  return 0;
575✔
294
}
295

296
static int32_t mndMountActionDelete(SSdb *pSdb, SMountObj *pObj) {
1,436✔
297
  mTrace("mount:%s, perform delete action, row:%p", pObj->name, pObj);
1,436✔
298
  mndMountFreeObj(pObj);
1,436✔
299
  return 0;
1,436✔
300
}
301

302
static int32_t mndMountActionUpdate(SSdb *pSdb, SMountObj *pOld, SMountObj *pNew) {
575✔
303
  mTrace("mount:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
575✔
304
  taosWLockLatch(&pOld->lock);
575✔
305
  pOld->updateTime = pNew->updateTime;
575✔
306
  taosWUnLockLatch(&pOld->lock);
575✔
307
  return 0;
575✔
308
}
309

310
SMountObj *mndAcquireMount(SMnode *pMnode, const char *mountName) {
2,747✔
311
  SSdb      *pSdb = pMnode->pSdb;
2,747✔
312
  SMountObj *pObj = sdbAcquire(pSdb, SDB_MOUNT, mountName);
2,747✔
313
  if (pObj == NULL) {
2,747✔
314
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
2,172✔
315
      terrno = TSDB_CODE_MND_MOUNT_NOT_EXIST;
2,172✔
316
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
×
317
      terrno = TSDB_CODE_MND_MOUNT_IN_CREATING;
×
318
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
×
319
      terrno = TSDB_CODE_MND_MOUNT_IN_DROPPING;
×
320
    } else {
321
      terrno = TSDB_CODE_APP_ERROR;
×
322
      mFatal("mount:%s, failed to acquire mount since %s", mountName, terrstr());
×
323
    }
324
  }
325
  return pObj;
2,747✔
326
}
327

328
void mndReleaseMount(SMnode *pMnode, SMountObj *pObj) {
2,458✔
329
  SSdb *pSdb = pMnode->pSdb;
2,458✔
330
  sdbRelease(pSdb, pObj);
2,458✔
331
}
2,458✔
332

333
bool mndMountIsExist(SMnode *pMnode, const char *mountName) {
×
334
  SMountObj *pObj = mndAcquireMount(pMnode, mountName);
×
335
  if (pObj == NULL) {
×
336
    return false;
×
337
  }
338
  mndReleaseMount(pMnode, pObj);
×
339
  return true;
×
340
}
341

342
void *mndBuildRetrieveMountPathReq(SMnode *pMnode, SRpcMsg *pMsg, const char *mountName, const char *mountPath,
1,014✔
343
                                   int32_t dnodeId, int32_t *pContLen) {
344
  int32_t code = 0, lino = 0;
1,014✔
345
  void   *pBuf = NULL;
1,014✔
346

347
  SRetrieveMountPathReq req = {0};
1,014✔
348
  req.dnodeId = dnodeId;
1,014✔
349
  req.pVal = &pMsg->info;
1,014✔
350
  req.valLen = sizeof(pMsg->info);
1,014✔
351
  TAOS_UNUSED(snprintf(req.mountName, TSDB_MOUNT_NAME_LEN, "%s", mountName));
1,014✔
352
  TAOS_UNUSED(snprintf(req.mountPath, TSDB_MOUNT_PATH_LEN, "%s", mountPath));
1,014✔
353

354
  int32_t contLen = tSerializeSRetrieveMountPathReq(NULL, 0, &req);
1,014✔
355
  TAOS_CHECK_EXIT(contLen);
1,014✔
356
  TSDB_CHECK_NULL((pBuf = rpcMallocCont(contLen)), code, lino, _exit, terrno);
1,014✔
357
  TAOS_CHECK_EXIT(tSerializeSRetrieveMountPathReq(pBuf, contLen, &req));
1,014✔
358
_exit:
1,014✔
359
  if (code < 0) {
1,014✔
360
    rpcFreeCont(pBuf);
×
361
    terrno = code;
×
362
    return NULL;
×
363
  }
364
  *pContLen = contLen;
1,014✔
365
  return pBuf;
1,014✔
366
}
367

368
#if 0
369
static int32_t mndSetCreateMountUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
370
  int32_t code = 0;
371
  for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
372
    SVgObj *pVgroup = pVgroups + vg;
373

374
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
375
      SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
376
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false));
377
    }
378
  }
379

380
  TAOS_RETURN(code);
381
}
382
#endif
383

384

385
#ifndef TD_ENTERPRISE
386
int32_t mndCreateMount(SMnode *pMnode, SRpcMsg *pReq, SMountInfo *pInfo, SUserObj *pUser) {
387
  return TSDB_CODE_OPS_NOT_SUPPORT;
388
}
389
#endif
390

391
static int32_t mndRetrieveMountInfo(SMnode *pMnode, SRpcMsg *pMsg, SCreateMountReq *pReq) {
1,014✔
392
  int32_t    code = 0, lino = 0;
1,014✔
393
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pReq->dnodeIds[0]);
1,014✔
394
  if (pDnode == NULL) TAOS_RETURN(terrno);
1,014✔
395
  if (pDnode->offlineReason != DND_REASON_ONLINE) {
1,014✔
396
    mndReleaseDnode(pMnode, pDnode);
×
397
    TAOS_RETURN(TSDB_CODE_DNODE_OFFLINE);
×
398
  }
399
  SEpSet epSet = mndGetDnodeEpset(pDnode);
1,014✔
400
  mndReleaseDnode(pMnode, pDnode);
1,014✔
401

402
  int32_t bufLen = 0;
1,014✔
403
  void   *pBuf =
404
      mndBuildRetrieveMountPathReq(pMnode, pMsg, pReq->mountName, pReq->mountPaths[0], pReq->dnodeIds[0], &bufLen);
1,014✔
405
  if (pBuf == NULL) TAOS_RETURN(terrno);
1,014✔
406

407
  SRpcMsg rpcMsg = {.msgType = TDMT_DND_RETRIEVE_MOUNT_PATH, .pCont = pBuf, .contLen = bufLen};
1,014✔
408
  TAOS_CHECK_EXIT(tmsgSendReq(&epSet, &rpcMsg));
1,014✔
409

410
  pMsg->info.handle = NULL;  // disable auto rsp to client
1,014✔
411
_exit:
1,014✔
412
  TAOS_RETURN(code);
1,014✔
413
}
414

415
static int32_t mndProcessRetrieveMountPathRsp(SRpcMsg *pRsp) {
1,014✔
416
  int32_t    code = 0, lino = 0;
1,014✔
417
  int32_t    rspCode = 0;
1,014✔
418
  SMnode    *pMnode = pRsp->info.node;
1,014✔
419
  SMountInfo mntInfo = {0};
1,014✔
420
  SDecoder   decoder = {0};
1,014✔
421
  void      *pBuf = NULL;
1,014✔
422
  int32_t    bufLen = 0;
1,014✔
423
  bool       rspToClient = false;
1,014✔
424

425
  // step 1: decode and preprocess in mnode read thread
426
  tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
1,014✔
427
  TAOS_CHECK_EXIT(tDeserializeSMountInfo(&decoder, &mntInfo, false));
1,014✔
428
  const STraceId *trace = &pRsp->info.traceId;
1,014✔
429
  SRpcMsg         rsp = {
1,014✔
430
      // .code = pRsp->code,
431
      // .pCont = pRsp->info.rsp,
432
      // .contLen = pRsp->info.rspLen,
433
              .info = *(SRpcHandleInfo *)mntInfo.pVal,
1,014✔
434
  };
435
  rspToClient = true;
1,014✔
436
  if (pRsp->code != 0) {
1,014✔
437
    TAOS_CHECK_EXIT(pRsp->code);
725✔
438
  }
439

440
  // wait for all retrieve response received
441
  // TODO: ...
442
  // make sure the clusterId from all rsp is the same, but not with the clusterId of the host cluster
443
  if (mntInfo.clusterId == pMnode->clusterId) {
289✔
444
    mError("mount:%s, clusterId:%" PRIi64 " from dnode is identical to the host cluster's id:%" PRIi64,
×
445
           mntInfo.mountName, mntInfo.clusterId, pMnode->clusterId);
446
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_CLUSTER_EXIST);
×
447
  }
448

449
  // step 2: collect the responses from dnodes, process and push to mnode write thread to run as transaction
450
  // TODO: multiple retrieve dnodes and paths supported later
451
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mntInfo)) >= 0, code, lino, _exit, bufLen);
289✔
452
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), code, lino, _exit, terrno);
289✔
453
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mntInfo)) >= 0, code, lino, _exit, bufLen);
289✔
454
  SRpcMsg rpcMsg = {.pCont = pBuf, .contLen = bufLen, .msgType = TDMT_MND_EXECUTE_MOUNT, .info.noResp = 1};
289✔
455
  SEpSet  mnodeEpset = {0};
289✔
456
  mndGetMnodeEpSet(pMnode, &mnodeEpset);
289✔
457

458
  SMountObj *pObj = NULL;
289✔
459
  if ((pObj = mndAcquireMount(pMnode, mntInfo.mountName))) {
289✔
460
    mndReleaseMount(pMnode, pObj);
×
461
    if (mntInfo.ignoreExist) {
×
462
      mInfo("mount:%s, already exist, ignore exist is set", mntInfo.mountName);
×
463
      code = 0;
×
464
      goto _exit;
×
465
    } else {
466
      TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_ALREADY_EXIST);
×
467
    }
468
  } else {
469
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
289✔
470
      // continue
471
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
472
      TAOS_CHECK_EXIT(code);
×
473
    }
474
  }
475
  TAOS_CHECK_EXIT(tmsgSendReq(&mnodeEpset, &rpcMsg));
289✔
476
_exit:
1,014✔
477
  if (code == 0) {
1,014✔
478
    mGInfo("mount:%s, msg:%p, retrieve mount path rsp with code:%d", mntInfo.mountName, pRsp, pRsp->code);
289✔
479
  } else {
480
    mError("mount:%s, msg:%p, failed at line %d to retrieve mount path rsp since %s", mntInfo.mountName, pRsp, lino,
725✔
481
           tstrerror(code));
482
    if (rspToClient) {
725✔
483
      rsp.code = code;
725✔
484
      tmsgSendRsp(&rsp);
725✔
485
    }
486
  }
487
  tDecoderClear(&decoder);
1,014✔
488
  tFreeMountInfo(&mntInfo, false);
1,014✔
489
  TAOS_RETURN(code);
1,014✔
490
}
491

492
static int32_t mndProcessCreateMountReq(SRpcMsg *pReq) {
1,593✔
493
  int32_t         code = 0, lino = 0;
1,593✔
494
  SMnode         *pMnode = pReq->info.node;
1,593✔
495
  SDbObj         *pDb = NULL;
1,593✔
496
  SMountObj      *pObj = NULL;
1,593✔
497
  SUserObj       *pUser = NULL;
1,593✔
498
  SCreateMountReq createReq = {0};
1,593✔
499
  int64_t         tss = taosGetTimestampMs();
1,593✔
500

501
  TAOS_CHECK_EXIT(tDeserializeSCreateMountReq(pReq->pCont, pReq->contLen, &createReq));
1,593✔
502
  mInfo("mount:%s, start to create on dnode %d from %s", createReq.mountName, *createReq.dnodeIds,
1,593✔
503
        createReq.mountPaths[0]);  // TODO: mutiple mounts
504

505
  if ((pObj = mndAcquireMount(pMnode, createReq.mountName))) {
1,593✔
506
    if (createReq.ignoreExist) {
289✔
507
      mInfo("mount:%s, already exist, ignore exist is set", createReq.mountName);
×
508
      code = 0;
×
509
      goto _exit;
×
510
    } else {
511
      code = TSDB_CODE_MND_MOUNT_ALREADY_EXIST;
289✔
512
      goto _exit;
289✔
513
    }
514
  } else {
515
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
1,304✔
516
      // continue
517
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
518
      goto _exit;
×
519
    }
520
  }
521
  TAOS_CHECK_EXIT(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_CREATE_MOUNT));
1,304✔
522
  TAOS_CHECK_EXIT(grantCheck(TSDB_GRANT_MOUNT));
1,159✔
523
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
1,159✔
524
  char fullMountName[TSDB_MOUNT_NAME_LEN + 32] = {0};
1,159✔
525
  (void)snprintf(fullMountName, sizeof(fullMountName), "%d.%s", pUser->acctId, createReq.mountName);
1,159✔
526
  if ((pDb = mndAcquireDb(pMnode, fullMountName))) {
1,159✔
527
    mndReleaseDb(pMnode, pDb);
145✔
528
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_DB_NAME_EXIST);
145✔
529
  }
530

531
  TAOS_CHECK_EXIT(mndRetrieveMountInfo(pMnode, pReq, &createReq));
1,014✔
532
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
1,014✔
533

534
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
1,014✔
535
    int64_t tse = taosGetTimestampMs();
1,014✔
536
    double  duration = (double)(tse - tss);
1,014✔
537
    duration = duration / 1000;
1,014✔
538
    auditRecord(pReq, pMnode->clusterId, "createMount", createReq.mountName, "", createReq.sql, createReq.sqlLen,
1,014✔
539
                duration, 0);
540
  }
541

542
_exit:
1,593✔
543
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,593✔
544
    mError("mount:%s, dnode:%d, path:%s, failed to create at line:%d since %s", createReq.mountName,
579✔
545
           createReq.dnodeIds ? createReq.dnodeIds[0] : 0, createReq.mountPaths ? createReq.mountPaths[0] : "", lino,
546
           tstrerror(code));  // TODO: mutiple mounts
547
  }
548

549
  mndReleaseMount(pMnode, pObj);
1,593✔
550
  mndReleaseUser(pMnode, pUser);
1,593✔
551
  tFreeSCreateMountReq(&createReq);
1,593✔
552

553
  TAOS_RETURN(code);
1,593✔
554
}
555

556
static int32_t mndProcessExecuteMountReq(SRpcMsg *pReq) {
289✔
557
  int32_t    code = 0, lino = 0;
289✔
558
  SMnode    *pMnode = pReq->info.node;
289✔
559
  SDbObj    *pDb = NULL;
289✔
560
  SMountObj *pObj = NULL;
289✔
561
  SUserObj  *pUser = NULL;
289✔
562
  SMountInfo mntInfo = {0};
289✔
563
  SDecoder   decoder = {0};
289✔
564
  SRpcMsg    rsp = {0};
289✔
565
  bool       rspToClient = false;
289✔
566

567
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
289✔
568

569
  TAOS_CHECK_EXIT(tDeserializeSMountInfo(&decoder, &mntInfo, true));
289✔
570
  rspToClient = true;
289✔
571
  mInfo("mount:%s, start to execute on mnode", mntInfo.mountName);
289✔
572

573
  if ((pDb = mndAcquireDb(pMnode, mntInfo.mountName))) {
289✔
UNCOV
574
    mndReleaseDb(pMnode, pDb);
×
575
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_DB_NAME_EXIST);
×
576
  }
577

578
  if ((pObj = mndAcquireMount(pMnode, mntInfo.mountName))) {
289✔
UNCOV
579
    if (mntInfo.ignoreExist) {
×
580
      mInfo("mount:%s, already exist, ignore exist is set", mntInfo.mountName);
×
581
      code = 0;
×
582
      goto _exit;
×
583
    } else {
UNCOV
584
      TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_ALREADY_EXIST);
×
585
    }
586
  } else {
587
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
289✔
588
      // continue
589
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
UNCOV
590
      TAOS_CHECK_EXIT(code);
×
591
    }
592
  }
593
  // mount operation share the privileges of db
594
  TAOS_CHECK_EXIT(grantCheck(TSDB_GRANT_MOUNT));  // TODO: implement when the plan is ready
289✔
595
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pUser));
289✔
596

597
  TAOS_CHECK_EXIT(mndCreateMount(pMnode, pReq, &mntInfo, pUser));
289✔
598
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
289✔
599
_exit:
289✔
600
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
289✔
601
    // TODO: mutiple path mount
UNCOV
602
    rsp.code = code;
×
603
    mError("mount:%s, dnode:%d, path:%s, failed to create at line:%d since %s", mntInfo.mountName, mntInfo.dnodeId,
×
604
           mntInfo.mountPath, lino, tstrerror(code));
605
  }
606
  if (rspToClient) {
289✔
607
    rsp.info = *(SRpcHandleInfo *)mntInfo.pVal;
289✔
608
    tmsgSendRsp(&rsp);
289✔
609
  }
610
  mndReleaseMount(pMnode, pObj);
289✔
611
  mndReleaseUser(pMnode, pUser);
289✔
612
  tDecoderClear(&decoder);
289✔
613
  tFreeMountInfo(&mntInfo, true);
289✔
614

615
  TAOS_RETURN(code);
289✔
616
}
617

618
int32_t mndBuildDropMountRsp(SMountObj *pObj, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
286✔
619
  int32_t       code = 0;
286✔
620
  SDropMountRsp dropRsp = {0};
286✔
621
  if (pObj != NULL) {
286✔
622
    (void)memcpy(dropRsp.name, pObj->name, TSDB_MOUNT_NAME_LEN);
286✔
623
    dropRsp.uid = pObj->uid;
286✔
624
  }
625

626
  int32_t rspLen = tSerializeSDropMountRsp(NULL, 0, &dropRsp);
286✔
627
  void   *pRsp = NULL;
286✔
628
  if (useRpcMalloc) {
286✔
UNCOV
629
    pRsp = rpcMallocCont(rspLen);
×
630
  } else {
631
    pRsp = taosMemoryMalloc(rspLen);
286✔
632
  }
633

634
  if (pRsp == NULL) {
286✔
UNCOV
635
    code = terrno;
×
636
    TAOS_RETURN(code);
×
637
  }
638

639
  int32_t ret = 0;
286✔
640
  if ((ret = tSerializeSDropMountRsp(pRsp, rspLen, &dropRsp)) < 0) return ret;
286✔
641
  *pRspLen = rspLen;
286✔
642
  *ppRsp = pRsp;
286✔
643
  TAOS_RETURN(code);
286✔
644
}
645

646
bool mndHasMountOnDnode(SMnode *pMnode, int32_t dnodeId) {
12,129✔
647
  SSdb *pSdb = pMnode->pSdb;
12,129✔
648
  void *pIter = NULL;
12,129✔
649

UNCOV
650
  while (1) {
×
651
    SMountObj *pMount = NULL;
12,129✔
652
    pIter = sdbFetch(pSdb, SDB_MOUNT, pIter, (void **)&pMount);
12,129✔
653
    if (pIter == NULL) break;
12,129✔
654

UNCOV
655
    for (int32_t i = 0; i < pMount->nMounts; ++i) {
×
656
      if (pMount->dnodeIds[i] == dnodeId) {
×
657
        sdbRelease(pSdb, pMount);
×
658
        return true;
×
659
      }
660
    }
UNCOV
661
    sdbRelease(pSdb, pMount);
×
662
  }
663
  return false;
12,129✔
664
}
665

666
#ifndef TD_ENTERPRISE
667
int32_t mndDropMount(SMnode *pMnode, SRpcMsg *pReq, SMountObj *pObj) { return TSDB_CODE_OPS_NOT_SUPPORT; }
668
#endif
669

670
static int32_t mndProcessDropMountReq(SRpcMsg *pReq) {
576✔
671
  SMnode       *pMnode = pReq->info.node;
576✔
672
  int32_t       code = -1;
576✔
673
  SMountObj    *pObj = NULL;
576✔
674
  SDropMountReq dropReq = {0};
576✔
675
  int64_t       tss = taosGetTimestampMs();
576✔
676

677
  TAOS_CHECK_GOTO(tDeserializeSDropMountReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);
576✔
678

679
  mInfo("mount:%s, start to drop", dropReq.mountName);
576✔
680

681
  pObj = mndAcquireMount(pMnode, dropReq.mountName);
576✔
682
  if (pObj == NULL) {
576✔
683
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
290✔
684
    if (terrno != 0) code = terrno;
290✔
685
    if (dropReq.ignoreNotExists) {
290✔
UNCOV
686
      code = mndBuildDropMountRsp(pObj, &pReq->info.rspLen, &pReq->info.rsp, true);
×
687
    }
688
    goto _exit;
290✔
689
  }
690

691
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_DROP_MOUNT), NULL, _exit);
286✔
692

693
  code = mndDropMount(pMnode, pReq, pObj);
286✔
694
  if (code == TSDB_CODE_SUCCESS) {
286✔
695
    code = TSDB_CODE_ACTION_IN_PROGRESS;
286✔
696
  }
697

698
  // SName name = {0};
699
  // if (tNameFromString(&name, dropReq.mountName, T_NAME_ACCT | T_NAME_DB) < 0)
700
  //   mError("mount:%s, failed to parse db name", dropReq.mountName);
701

702
  if (tsAuditLevel >= AUDIT_LEVEL_CLUSTER) {
286✔
703
    int64_t tse = taosGetTimestampMs();
286✔
704
    double  duration = (double)(tse - tss);
286✔
705
    duration = duration / 1000;
286✔
706
    auditRecord(pReq, pMnode->clusterId, "dropMount", dropReq.mountName, "", dropReq.sql, dropReq.sqlLen, duration, 0);
286✔
707
  }
708

709
_exit:
576✔
710
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
576✔
711
    mError("mount:%s, failed to drop since %s", dropReq.mountName, tstrerror(code));
290✔
712
  }
713

714
  mndReleaseMount(pMnode, pObj);
576✔
715
  tFreeSDropMountReq(&dropReq);
576✔
716
  TAOS_RETURN(code);
576✔
717
}
718

719
static int32_t mndRetrieveMounts(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
1,443✔
720
  SMnode          *pMnode = pReq->info.node;
1,443✔
721
  int32_t          code = 0, lino = 0;
1,443✔
722
  int32_t          numOfRows = 0;
1,443✔
723
  int32_t          cols = 0;
1,443✔
724
  char             tmp[512];
1,443✔
725
  int32_t          tmpLen = 0;
1,443✔
726
  int32_t          bufLen = 0;
1,443✔
727
  char            *pBuf = NULL;
1,443✔
728
  char            *qBuf = NULL;
1,443✔
729
  void            *pIter = NULL;
1,443✔
730
  SSdb            *pSdb = pMnode->pSdb;
1,443✔
731
  SColumnInfoData *pColInfo = NULL;
1,443✔
732

733
  pBuf = tmp;
1,443✔
734
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
1,443✔
735
  if (pShow->numOfRows < 1) {
1,443✔
736
    SMountObj *pObj = NULL;
1,443✔
737
    int32_t    index = 0;
1,443✔
738
    while ((pIter = sdbFetch(pSdb, SDB_MOUNT, pIter, (void **)&pObj))) {
2,021✔
739
      cols = 0;
578✔
740
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
578✔
741
      qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
578✔
742
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
578✔
743
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
578✔
744
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
578✔
745

746
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
578✔
747
        // TAOS_UNUSED(snprintf(pBuf, bufLen, "%d", *(int32_t *)pObj->dnodeIds));  // TODO: support mutiple dnodes
748
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->dnodeIds[0], false, pObj, pIter, _exit);
578✔
749
      }
750

751
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
578✔
752
        // TAOS_UNUSED(snprintf(pBuf, bufLen, "%" PRIi64, pObj->createdTime));
753
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
578✔
754
      }
755

756
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
578✔
757
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
578✔
758
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->paths[0]));  // TODO: support mutiple paths
578✔
759
        varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
578✔
760
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
578✔
761
      }
762

763
      sdbRelease(pSdb, pObj);
578✔
764
      ++numOfRows;
578✔
765
    }
766
  }
767

768
  pShow->numOfRows += numOfRows;
1,443✔
769

770
_exit:
1,443✔
771
  if (code < 0) {
1,443✔
UNCOV
772
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
UNCOV
773
    TAOS_RETURN(code);
×
774
  }
775
  return numOfRows;
1,443✔
776
}
777

UNCOV
778
static void mndCancelGetNextMount(SMnode *pMnode, void *pIter) {
×
UNCOV
779
  SSdb *pSdb = pMnode->pSdb;
×
780
  sdbCancelFetchByType(pSdb, pIter, SDB_MOUNT);
×
781
}
×
782

783
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc