• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4761

28 Sep 2025 10:49AM UTC coverage: 57.837% (-1.0%) from 58.866%
#4761

push

travis-ci

web-flow
merge: set version (#33122)

136913 of 302095 branches covered (45.32%)

Branch coverage included in aggregate %.

207750 of 293830 relevant lines covered (70.7%)

5673932.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.45
/source/dnode/mnode/impl/src/mndMount.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifdef USE_MOUNT
16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "command.h"
19
#include "mndArbGroup.h"
20
#include "mndCluster.h"
21
#include "mndConfig.h"
22
#include "mndDb.h"
23
#include "mndDnode.h"
24
#include "mndIndex.h"
25
#include "mndIndexComm.h"
26
#include "mndMnode.h"
27
#include "mndMount.h"
28
#include "mndPrivilege.h"
29
#include "mndShow.h"
30
#include "mndSma.h"
31
#include "mndStb.h"
32
#include "mndStream.h"
33
#include "mndSubscribe.h"
34
#include "mndTopic.h"
35
#include "mndTrans.h"
36
#include "mndUser.h"
37
#include "mndVgroup.h"
38
#include "mndView.h"
39
#include "systable.h"
40
#include "thttp.h"
41
#include "tjson.h"
42

43
#define MND_MOUNT_VER_NUMBER 1
44

45
static int32_t  mndMountActionInsert(SSdb *pSdb, SMountObj *pObj);
46
static int32_t  mndMountActionDelete(SSdb *pSdb, SMountObj *pObj);
47
static int32_t  mndMountActionUpdate(SSdb *pSdb, SMountObj *pOld, SMountObj *pNew);
48
static int32_t  mndNewMountActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
49

50
static int32_t mndProcessCreateMountReq(SRpcMsg *pReq);
51
static int32_t mndProcessDropMountReq(SRpcMsg *pReq);
52
static int32_t mndProcessExecuteMountReq(SRpcMsg *pReq);
53
static int32_t mndProcessRetrieveMountPathRsp(SRpcMsg *pRsp);
54
static int32_t mndRetrieveMounts(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
55
static void    mndCancelGetNextMount(SMnode *pMnode, void *pIter);
56

57
// Registers the mount message handlers, the show/retrieve handlers and the
// SDB_MOUNT table callbacks on the given mnode.
// Returns whatever sdbSetTable() returns for the mount table.
int32_t mndInitMount(SMnode *pMnode) {
  // Request/response handlers for the mount workflow.
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MOUNT, mndProcessCreateMountReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MOUNT, mndProcessDropMountReq);
  mndSetMsgHandle(pMnode, TDMT_MND_EXECUTE_MOUNT, mndProcessExecuteMountReq);
  mndSetMsgHandle(pMnode, TDMT_DND_RETRIEVE_MOUNT_PATH_RSP, mndProcessRetrieveMountPathRsp);
  mndSetMsgHandle(pMnode, TDMT_DND_MOUNT_VNODE_RSP, mndTransProcessRsp);

  // Handlers backing the mount system table.
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MOUNT, mndRetrieveMounts);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MOUNT, mndCancelGetNextMount);

  // Describe how mount rows are keyed, (de)serialized and mutated in the sdb.
  SSdbTable table = {0};
  table.sdbType = SDB_MOUNT;
  table.keyType = SDB_KEY_BINARY;
  table.encodeFp = (SdbEncodeFp)mndMountActionEncode;
  table.decodeFp = (SdbDecodeFp)mndMountActionDecode;
  table.insertFp = (SdbInsertFp)mndMountActionInsert;
  table.updateFp = (SdbUpdateFp)mndMountActionUpdate;
  table.deleteFp = (SdbDeleteFp)mndMountActionDelete;
  table.validateFp = (SdbValidateFp)mndNewMountActionValidate;

  return sdbSetTable(pMnode->pSdb, table);
}
80

81
// Counterpart of mndInitMount(); currently performs no work.
void mndCleanupMount(SMnode *pMnode) {
}
1,943✔
82

83
// Releases the heap members owned by a mount object (dnodeIds, dbObj and the
// paths array with each of its nMounts entries). The object itself is NOT
// freed. A NULL pObj is accepted and ignored.
void mndMountFreeObj(SMountObj *pObj) {
  if (pObj == NULL) return;

  taosMemoryFreeClear(pObj->dnodeIds);
  taosMemoryFreeClear(pObj->dbObj);

  if (pObj->paths == NULL) return;

  // Free every path string before the array that holds them.
  for (int32_t idx = 0; idx < pObj->nMounts; idx++) {
    taosMemoryFreeClear(pObj->paths[idx]);
  }
  taosMemoryFreeClear(pObj->paths);
}
12✔
95

96
// Frees the members of a mount object via mndMountFreeObj() and then the
// object itself. A NULL pObj is accepted and ignored.
void mndMountDestroyObj(SMountObj *pObj) {
  if (pObj == NULL) return;

  mndMountFreeObj(pObj);
  taosMemoryFree(pObj);
}
×
102

103
static int32_t tSerializeSMountObj(void *buf, int32_t bufLen, const SMountObj *pObj) {
20✔
104
  int32_t  code = 0, lino = 0;
20✔
105
  int32_t  tlen = 0;
20✔
106
  SEncoder encoder = {0};
20✔
107
  tEncoderInit(&encoder, buf, bufLen);
20✔
108

109
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
20!
110

111
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
40!
112
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->acct));
40!
113
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
40!
114
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
40!
115
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
40!
116
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
40!
117
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nMounts));
40!
118
  for (int16_t i = 0; i < pObj->nMounts; ++i) {
40✔
119
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->dnodeIds[i]));
40!
120
    TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->paths[i]));
40!
121
  }
122
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nDbs));
40!
123
  for (int16_t i = 0; i < pObj->nDbs; ++i) {
20!
124
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->dbObj[i].uid));
×
125
    TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbObj[i].name));
×
126
  }
127
  tEndEncode(&encoder);
20✔
128

129
  tlen = encoder.pos;
20✔
130
_exit:
20✔
131
  tEncoderClear(&encoder);
20✔
132
  if (code < 0) {
20!
133
    mError("mount, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
134
    TAOS_RETURN(code);
×
135
  }
136

137
  return tlen;
20✔
138
}
139

140
// Decodes a mount object from buf (bufLen bytes) into pObj, in the same field
// order used by tSerializeSMountObj().
//
// Allocates pObj->dnodeIds, pObj->paths (plus one string per path) and
// pObj->dbObj when the corresponding counts are positive. On failure the
// partially filled pObj is left for the caller to release (the decode action
// calls mndMountFreeObj() on it).
//
// Returns 0 on success or a negative error code.
static int32_t tDeserializeSMountObj(void *buf, int32_t bufLen, SMountObj *pObj) {
  int32_t  code = 0, lino = 0;
  SDecoder decoder = {0};
  tDecoderInit(&decoder, buf, bufLen);

  TAOS_CHECK_EXIT(tStartDecode(&decoder));

  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->acct));
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nMounts));
  if (pObj->nMounts > 0) {
    if (!(pObj->dnodeIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nMounts))) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (!(pObj->paths = taosMemoryMalloc(sizeof(char *) * pObj->nMounts))) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    // mndMountFreeObj() frees paths[0..nMounts-1]. Clear every slot first so a
    // decode failure part-way through never leaves uninitialized pointers
    // behind for it to free.
    for (int16_t i = 0; i < pObj->nMounts; ++i) {
      pObj->paths[i] = NULL;
    }
    for (int16_t i = 0; i < pObj->nMounts; ++i) {
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->dnodeIds[i]));
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pObj->paths[i]));
    }
  }
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nDbs));
  if (pObj->nDbs > 0) {
    if (!(pObj->dbObj = taosMemoryMalloc(sizeof(SMountDbObj) * pObj->nDbs))) {
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
    }
    for (int16_t i = 0; i < pObj->nDbs; ++i) {
      TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbObj[i].uid));
      TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbObj[i].name));
    }
  }

_exit:
  tEndDecode(&decoder);
  tDecoderClear(&decoder);
  if (code < 0) {
    mError("mount, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
  }
  TAOS_RETURN(code);
}
185

186
// Builds the sdb raw record for a mount object: a 4-byte length followed by
// the bytes produced by tSerializeSMountObj().
// Returns the raw record, or NULL with terrno set when encoding fails.
SSdbRaw *mndMountActionEncode(SMountObj *pObj) {
  int32_t  code = 0, lino = 0;
  int32_t  dataPos = 0;
  int32_t  rawSize = 0;
  int32_t  tlen = 0;
  SSdbRaw *pRaw = NULL;
  void    *buf = NULL;

  // Pass 1: measure the serialized size.
  tlen = tSerializeSMountObj(NULL, 0, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  rawSize = sizeof(int32_t) + tlen;
  pRaw = sdbAllocRaw(SDB_MOUNT, MND_MOUNT_VER_NUMBER, rawSize);
  if (!pRaw) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  buf = taosMemoryMalloc(tlen);
  if (!buf) {
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Pass 2: serialize into the scratch buffer.
  tlen = tSerializeSMountObj(buf, tlen, pObj);
  if (tlen < 0) {
    TAOS_CHECK_EXIT(tlen);
  }

  // Copy "length + payload" into the raw record.
  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
  SDB_SET_DATALEN(pRaw, dataPos, _exit);

_exit:
  taosMemoryFreeClear(buf);
  if (code == TSDB_CODE_SUCCESS) {
    mTrace("mount, encode to raw:%p, row:%p", pRaw, pObj);
    return pRaw;
  }

  terrno = code;
  mError("mount, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
  sdbFreeRaw(pRaw);
  return NULL;
}
228

229
// Rebuilds a mount row from an sdb raw record written by
// mndMountActionEncode(): checks the stored version, reads the 4-byte length
// and the payload, and deserializes the payload into a freshly allocated row.
//
// Returns the row on success. On failure returns NULL; when an error code was
// recorded, terrno is set to it and any partially decoded members are freed.
SSdbRow *mndMountActionDecode(SSdbRaw *pRaw) {
  int32_t    code = 0, lino = 0;
  SSdbRow   *pRow = NULL;
  SMountObj *pObj = NULL;
  void      *buf = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    goto _exit;
  }

  if (sver != MND_MOUNT_VER_NUMBER) {
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
    lino = __LINE__;
    mError("mount read invalid ver, data ver: %d, curr ver: %d", sver, MND_MOUNT_VER_NUMBER);
    goto _exit;
  }

  if (!(pRow = sdbAllocRow(sizeof(SMountObj)))) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _exit;
  }

  if (!(pObj = sdbGetRowObj(pRow))) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _exit;
  }

  int32_t tlen;
  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
  buf = taosMemoryMalloc(tlen + 1);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _exit;
  }
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);

  // Propagate the deserializer's own error code instead of reporting every
  // failure as out-of-memory.
  int32_t ret = tDeserializeSMountObj(buf, tlen, pObj);
  if (ret < 0) {
    code = ret;
    lino = __LINE__;
    goto _exit;
  }

  taosInitRWLatch(&pObj->lock);

_exit:
  taosMemoryFreeClear(buf);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    mError("mount, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
    mndMountFreeObj(pObj);
    taosMemoryFreeClear(pRow);
    return NULL;
  }
  mTrace("mount, decode from raw:%p, row:%p", pRaw, pObj);
  return pRow;
}
285

286
// sdb validate callback for mount rows: only traces the raw pointer and
// always returns 0.
static int32_t mndNewMountActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
  mTrace("mount, validate new mount action, raw:%p", pRaw);

  return 0;
}
290

291
// sdb insert callback for mount rows: only traces the row and always
// returns 0.
static int32_t mndMountActionInsert(SSdb *pSdb, SMountObj *pObj) {
  mTrace("mount:%s, perform insert action, row:%p", pObj->name, pObj);

  return 0;
}
295

296
// sdb delete callback for mount rows: traces the row, releases the members it
// owns through mndMountFreeObj() and always returns 0.
static int32_t mndMountActionDelete(SSdb *pSdb, SMountObj *pObj) {
  mTrace("mount:%s, perform delete action, row:%p", pObj->name, pObj);

  mndMountFreeObj(pObj);

  return 0;
}
301

302
// sdb update callback for mount rows: copies updateTime from the new row into
// the old one while holding the old row's write latch. Always returns 0.
static int32_t mndMountActionUpdate(SSdb *pSdb, SMountObj *pOld, SMountObj *pNew) {
  mTrace("mount:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  // updateTime is the only field carried over from the new row.
  taosWLockLatch(&pOld->lock);
  pOld->updateTime = pNew->updateTime;
  taosWUnLockLatch(&pOld->lock);

  return 0;
}
309

310
// Acquires the mount named mountName from the sdb.
// Returns the object on success. On failure returns NULL and rewrites terrno
// from the sdb-level code to the mount-level one:
//   SDB_OBJ_NOT_THERE -> MND_MOUNT_NOT_EXIST
//   SDB_OBJ_CREATING  -> MND_MOUNT_IN_CREATING
//   SDB_OBJ_DROPPING  -> MND_MOUNT_IN_DROPPING
//   anything else     -> APP_ERROR (also logged as fatal)
SMountObj *mndAcquireMount(SMnode *pMnode, const char *mountName) {
  SMountObj *pObj = sdbAcquire(pMnode->pSdb, SDB_MOUNT, mountName);
  if (pObj != NULL) {
    return pObj;
  }

  int32_t sdbCode = terrno;
  if (sdbCode == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_MOUNT_NOT_EXIST;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_CREATING) {
    terrno = TSDB_CODE_MND_MOUNT_IN_CREATING;
  } else if (sdbCode == TSDB_CODE_SDB_OBJ_DROPPING) {
    terrno = TSDB_CODE_MND_MOUNT_IN_DROPPING;
  } else {
    // terrno is replaced before logging, so terrstr() reports the app error.
    terrno = TSDB_CODE_APP_ERROR;
    mFatal("mount:%s, failed to acquire mount since %s", mountName, terrstr());
  }

  return NULL;
}
327

328
// Hands a mount object obtained from mndAcquireMount() back to the sdb.
// pObj is forwarded to sdbRelease() as-is (callers in this file also pass
// NULL).
void mndReleaseMount(SMnode *pMnode, SMountObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
15✔
332

333
// Reports whether a mount named mountName can currently be acquired.
// The acquired object is released again before returning. When the mount
// cannot be acquired, terrno is left as set by mndAcquireMount().
bool mndMountIsExist(SMnode *pMnode, const char *mountName) {
  SMountObj *pObj = mndAcquireMount(pMnode, mountName);
  bool       found = (pObj != NULL);

  if (found) {
    mndReleaseMount(pMnode, pObj);
  }
  return found;
}
341

342
// Serializes a SRetrieveMountPathReq for the given mount name/path/dnode into
// a buffer obtained from rpcMallocCont().
//
// The request carries a pointer to pMsg->info and its size as an opaque
// value (pVal/valLen).
//
// Returns the buffer and stores its length in *pContLen on success. On
// failure returns NULL, sets terrno, and leaves *pContLen untouched.
void *mndBuildRetrieveMountPathReq(SMnode *pMnode, SRpcMsg *pMsg, const char *mountName, const char *mountPath,
                                   int32_t dnodeId, int32_t *pContLen) {
  int32_t code = 0, lino = 0;
  int32_t contLen = 0;
  void   *pBuf = NULL;

  SRetrieveMountPathReq req = {
      .dnodeId = dnodeId,
      .pVal = &pMsg->info,
      .valLen = sizeof(pMsg->info),
  };
  TAOS_UNUSED(snprintf(req.mountName, TSDB_MOUNT_NAME_LEN, "%s", mountName));
  TAOS_UNUSED(snprintf(req.mountPath, TSDB_MOUNT_PATH_LEN, "%s", mountPath));

  // Measure first, then allocate and serialize for real.
  contLen = tSerializeSRetrieveMountPathReq(NULL, 0, &req);
  TAOS_CHECK_EXIT(contLen);
  pBuf = rpcMallocCont(contLen);
  TSDB_CHECK_NULL(pBuf, code, lino, _exit, terrno);
  TAOS_CHECK_EXIT(tSerializeSRetrieveMountPathReq(pBuf, contLen, &req));

_exit:
  if (code >= 0) {
    *pContLen = contLen;
    return pBuf;
  }

  rpcFreeCont(pBuf);
  terrno = code;
  return NULL;
}
367

368
#if 0
// Disabled: would append a drop-vnode undo action to pTrans for every vnode
// replica of every vgroup in pVgroups.
static int32_t mndSetCreateMountUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
  int32_t code = 0;

  for (int32_t vgIdx = 0; vgIdx < pDb->cfg.numOfVgroups; ++vgIdx) {
    SVgObj *pVgroup = &pVgroups[vgIdx];

    for (int32_t vnIdx = 0; vnIdx < pVgroup->replica; ++vnIdx) {
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vnIdx];
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false));
    }
  }

  TAOS_RETURN(code);
}
#endif
383

384

385
#ifndef TD_ENTERPRISE
386
// Stub compiled when TD_ENTERPRISE is not defined: creating a mount is
// reported as an unsupported operation.
int32_t mndCreateMount(SMnode *pMnode, SRpcMsg *pReq, SMountInfo *pInfo, SUserObj *pUser) {
  return TSDB_CODE_OPS_NOT_SUPPORT;
}
389
#endif
390

391
// Sends a TDMT_DND_RETRIEVE_MOUNT_PATH request for the first dnode/path of the
// create request to that dnode.
//
// Fails with terrno when the dnode cannot be acquired or the request cannot be
// built, and with TSDB_CODE_DNODE_OFFLINE when the dnode is not online.
// After a successful send, pMsg->info.handle is cleared so that no automatic
// response goes back to the client.
static int32_t mndRetrieveMountInfo(SMnode *pMnode, SRpcMsg *pMsg, SCreateMountReq *pReq) {
  int32_t    code = 0, lino = 0;
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pReq->dnodeIds[0]);
  if (pDnode == NULL) TAOS_RETURN(terrno);

  // Capture what is needed from the dnode, then release it right away.
  const bool offline = (pDnode->offlineReason != DND_REASON_ONLINE);
  SEpSet     epSet = {0};
  if (!offline) {
    epSet = mndGetDnodeEpset(pDnode);
  }
  mndReleaseDnode(pMnode, pDnode);
  if (offline) {
    TAOS_RETURN(TSDB_CODE_DNODE_OFFLINE);
  }

  int32_t bufLen = 0;
  void   *pBuf =
      mndBuildRetrieveMountPathReq(pMnode, pMsg, pReq->mountName, pReq->mountPaths[0], pReq->dnodeIds[0], &bufLen);
  if (pBuf == NULL) TAOS_RETURN(terrno);

  SRpcMsg rpcMsg = {.msgType = TDMT_DND_RETRIEVE_MOUNT_PATH, .pCont = pBuf, .contLen = bufLen};
  TAOS_CHECK_EXIT(tmsgSendReq(&epSet, &rpcMsg));

  pMsg->info.handle = NULL;  // disable auto rsp to client
_exit:
  TAOS_RETURN(code);
}
414

415
// Handles the dnode's TDMT_DND_RETRIEVE_MOUNT_PATH_RSP.
//
// Step 1: decode the SMountInfo carried by the response and recover the
//         original client handle from mntInfo.pVal.
// Step 2: after checking the cluster id and that the mount does not exist yet,
//         re-serialize the info and forward it to the mnode as
//         TDMT_MND_EXECUTE_MOUNT.
//
// The create request disabled the automatic client response, so every path
// that does not forward the request answers the client here (errors, and the
// "already exists + ignoreExist" case with code 0). The forwarded request is
// answered by the execute handler.
static int32_t mndProcessRetrieveMountPathRsp(SRpcMsg *pRsp) {
  int32_t         code = 0, lino = 0;
  SMnode         *pMnode = pRsp->info.node;
  const STraceId *trace = &pRsp->info.traceId;  // presumably consumed by the mGInfo macro below
  SMountInfo      mntInfo = {0};
  SDecoder        decoder = {0};
  SRpcMsg         rsp = {0};
  SMountObj      *pObj = NULL;
  void           *pBuf = NULL;
  int32_t         bufLen = 0;
  bool            rspToClient = false;
  bool            forwarded = false;

  // step 1: decode and preprocess in mnode read thread
  tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
  TAOS_CHECK_EXIT(tDeserializeSMountInfo(&decoder, &mntInfo, false));
  if (mntInfo.pVal != NULL) {
    // pVal carries the SRpcHandleInfo of the original client request.
    rsp.info = *(SRpcHandleInfo *)mntInfo.pVal;
    rspToClient = true;
  }
  if (pRsp->code != 0) {
    TAOS_CHECK_EXIT(pRsp->code);
  }

  // wait for all retrieve response received
  // TODO: ...
  // make sure the clusterId from all rsp is the same, but not with the clusterId of the host cluster
  if (mntInfo.clusterId == pMnode->clusterId) {
    mError("mount:%s, clusterId:%" PRIi64 " from dnode is identical to the host cluster's id:%" PRIi64,
           mntInfo.mountName, mntInfo.clusterId, pMnode->clusterId);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_CLUSTER_EXIST);
  }

  // step 2: collect the responses from dnodes, process and push to mnode write thread to run as transaction
  // TODO: multiple retrieve dnodes and paths supported later
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mntInfo)) >= 0, code, lino, _exit, bufLen);
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), code, lino, _exit, terrno);
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mntInfo)) >= 0, code, lino, _exit, bufLen);
  SEpSet mnodeEpset = {0};
  mndGetMnodeEpSet(pMnode, &mnodeEpset);

  if ((pObj = mndAcquireMount(pMnode, mntInfo.mountName))) {
    mndReleaseMount(pMnode, pObj);
    if (mntInfo.ignoreExist) {
      mInfo("mount:%s, already exist, ignore exist is set", mntInfo.mountName);
      code = 0;
      goto _exit;
    } else {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_ALREADY_EXIST);
    }
  } else {
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
      // continue
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
      TAOS_CHECK_EXIT(code);
    }
  }

  SRpcMsg rpcMsg = {.pCont = pBuf, .contLen = bufLen, .msgType = TDMT_MND_EXECUTE_MOUNT, .info.noResp = 1};
  pBuf = NULL;  // the buffer now belongs to rpcMsg / the send path
  TAOS_CHECK_EXIT(tmsgSendReq(&mnodeEpset, &rpcMsg));
  forwarded = true;
_exit:
  if (code == 0) {
    mGInfo("mount:%s, msg:%p, retrieve mount path rsp with code:%d", mntInfo.mountName, pRsp, pRsp->code);
  } else {
    mError("mount:%s, msg:%p, failed at line %d to retrieve mount path rsp since %s", mntInfo.mountName, pRsp, lino,
           tstrerror(code));
  }
  // Answer the client unless the request was handed over successfully.
  if (rspToClient && (code != 0 || !forwarded)) {
    rsp.code = code;
    tmsgSendRsp(&rsp);
  }
  // Release the serialized request if it was never handed to tmsgSendReq().
  if (pBuf != NULL) {
    rpcFreeCont(pBuf);
  }
  tDecoderClear(&decoder);
  tFreeMountInfo(&mntInfo, false);
  TAOS_RETURN(code);
}
491

492
// Handles TDMT_MND_CREATE_MOUNT.
//
// Rejects the request when the mount already exists (unless ignoreExist is
// set, in which case it succeeds immediately), when the privilege/grant checks
// fail, or when a database named "<acctId>.<mountName>" exists. Otherwise it
// asks the target dnode for the mount path information and returns
// TSDB_CODE_ACTION_IN_PROGRESS.
static int32_t mndProcessCreateMountReq(SRpcMsg *pReq) {
  int32_t         code = 0, lino = 0;
  SMnode         *pMnode = pReq->info.node;
  SDbObj         *pDb = NULL;
  SMountObj      *pObj = NULL;
  SUserObj       *pUser = NULL;
  SCreateMountReq createReq = {0};

  TAOS_CHECK_EXIT(tDeserializeSCreateMountReq(pReq->pCont, pReq->contLen, &createReq));
  mInfo("mount:%s, start to create on dnode %d from %s", createReq.mountName, *createReq.dnodeIds,
        createReq.mountPaths[0]);  // TODO: mutiple mounts

  pObj = mndAcquireMount(pMnode, createReq.mountName);
  if (pObj != NULL) {
    if (createReq.ignoreExist) {
      mInfo("mount:%s, already exist, ignore exist is set", createReq.mountName);
      code = 0;
    } else {
      code = TSDB_CODE_MND_MOUNT_ALREADY_EXIST;
    }
    goto _exit;
  }

  // Only "not exist" lets the creation continue; the other outcomes are
  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
  code = terrno;
  if (code != TSDB_CODE_MND_MOUNT_NOT_EXIST) {
    goto _exit;
  }

  // mount operation share the privileges of db
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MOUNT, (SDbObj *)pObj));
  TAOS_CHECK_EXIT(grantCheck(TSDB_GRANT_MOUNT));
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));

  // A database with the account-qualified mount name must not exist.
  char fullMountName[TSDB_MOUNT_NAME_LEN + 32] = {0};
  (void)snprintf(fullMountName, sizeof(fullMountName), "%d.%s", pUser->acctId, createReq.mountName);
  pDb = mndAcquireDb(pMnode, fullMountName);
  if (pDb != NULL) {
    mndReleaseDb(pMnode, pDb);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_DB_NAME_EXIST);
  }

  TAOS_CHECK_EXIT(mndRetrieveMountInfo(pMnode, pReq, &createReq));
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

  auditRecord(pReq, pMnode->clusterId, "createMount", createReq.mountName, "", createReq.sql, createReq.sqlLen);

_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("mount:%s, dnode:%d, path:%s, failed to create at line:%d since %s", createReq.mountName,
           createReq.dnodeIds ? createReq.dnodeIds[0] : 0, createReq.mountPaths ? createReq.mountPaths[0] : "", lino,
           tstrerror(code));  // TODO: mutiple mounts
  }

  mndReleaseMount(pMnode, pObj);
  mndReleaseUser(pMnode, pUser);
  tFreeSCreateMountReq(&createReq);

  TAOS_RETURN(code);
}
549

550
// Handles TDMT_MND_EXECUTE_MOUNT, the request forwarded by
// mndProcessRetrieveMountPathRsp().
//
// Decodes the SMountInfo, re-checks that neither a database nor a mount with
// that name exists, checks the grant, and calls mndCreateMount(). Once the
// info has been decoded, the original client (whose handle travels in
// mntInfo.pVal) is answered exactly once: with the failure code, or with 0
// when the mount creation was started or skipped because of ignoreExist.
//
// Returns TSDB_CODE_ACTION_IN_PROGRESS when mndCreateMount() returned 0.
static int32_t mndProcessExecuteMountReq(SRpcMsg *pReq) {
  int32_t    code = 0, lino = 0;
  SMnode    *pMnode = pReq->info.node;
  SDbObj    *pDb = NULL;
  SMountObj *pObj = NULL;
  SUserObj  *pUser = NULL;
  SMountInfo mntInfo = {0};
  SDecoder   decoder = {0};
  SRpcMsg    rsp = {0};
  bool       rspToClient = false;

  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  TAOS_CHECK_EXIT(tDeserializeSMountInfo(&decoder, &mntInfo, true));
  rspToClient = true;
  mInfo("mount:%s, start to execute on mnode", mntInfo.mountName);

  if ((pDb = mndAcquireDb(pMnode, mntInfo.mountName))) {
    mndReleaseDb(pMnode, pDb);
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_DB_NAME_EXIST);
  }

  if ((pObj = mndAcquireMount(pMnode, mntInfo.mountName))) {
    if (mntInfo.ignoreExist) {
      mInfo("mount:%s, already exist, ignore exist is set", mntInfo.mountName);
      code = 0;
      goto _exit;
    } else {
      TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_ALREADY_EXIST);
    }
  } else {
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
      // continue
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
      TAOS_CHECK_EXIT(code);
    }
  }
  // mount operation share the privileges of db
  TAOS_CHECK_EXIT(grantCheck(TSDB_GRANT_MOUNT));  // TODO: implement when the plan is ready
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));

  TAOS_CHECK_EXIT(mndCreateMount(pMnode, pReq, &mntInfo, pUser));
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_exit:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    // TODO: mutiple path mount
    rsp.code = code;
    mError("mount:%s, dnode:%d, path:%s, failed to create at line:%d since %s", mntInfo.mountName, mntInfo.dnodeId,
           mntInfo.mountPath, lino, tstrerror(code));
  }
  // Reply to the original client once; the handle info is only available when
  // the decoded message actually carried it.
  if (rspToClient && mntInfo.pVal != NULL) {
    rsp.info = *(SRpcHandleInfo *)mntInfo.pVal;
    tmsgSendRsp(&rsp);
  }
  mndReleaseMount(pMnode, pObj);
  mndReleaseUser(pMnode, pUser);
  tDecoderClear(&decoder);
  tFreeMountInfo(&mntInfo, true);

  TAOS_RETURN(code);
}
611

612
// Builds a serialized SDropMountRsp.
//
// pObj         - mount being dropped; when NULL the response keeps a zeroed
//                name and uid.
// pRspLen      - receives the serialized length on success.
// ppRsp        - receives the buffer on success.
// useRpcMalloc - allocate with rpcMallocCont() instead of taosMemoryMalloc().
//
// Returns 0 on success. On failure returns the negative serialize result or
// terrno (allocation failure), leaves the outputs untouched and frees any
// buffer it allocated.
int32_t mndBuildDropMountRsp(SMountObj *pObj, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
  int32_t       code = 0;
  SDropMountRsp dropRsp = {0};
  if (pObj != NULL) {
    (void)memcpy(dropRsp.name, pObj->name, TSDB_MOUNT_NAME_LEN);
    dropRsp.uid = pObj->uid;
  }

  // Sizing pass; a negative result is an error and must not reach the allocator.
  int32_t rspLen = tSerializeSDropMountRsp(NULL, 0, &dropRsp);
  if (rspLen < 0) return rspLen;

  void *pRsp = NULL;
  if (useRpcMalloc) {
    pRsp = rpcMallocCont(rspLen);
  } else {
    pRsp = taosMemoryMalloc(rspLen);
  }

  if (pRsp == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  int32_t ret = 0;
  if ((ret = tSerializeSDropMountRsp(pRsp, rspLen, &dropRsp)) < 0) {
    // Release the buffer with the allocator family it came from.
    if (useRpcMalloc) {
      rpcFreeCont(pRsp);
    } else {
      taosMemoryFree(pRsp);
    }
    return ret;
  }
  *pRspLen = rspLen;
  *ppRsp = pRsp;
  TAOS_RETURN(code);
}
639

640
// Reports whether any mount in the sdb lists dnodeId among its dnodeIds.
bool mndHasMountOnDnode(SMnode *pMnode, int32_t dnodeId) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SMountObj *pMount = NULL;
    pIter = sdbFetch(pSdb, SDB_MOUNT, pIter, (void **)&pMount);
    if (pIter == NULL) break;

    for (int32_t i = 0; i < pMount->nMounts; ++i) {
      if (pMount->dnodeIds[i] == dnodeId) {
        sdbRelease(pSdb, pMount);
        // Leaving the fetch loop early: cancel the in-flight iterator as well,
        // the same way mndCancelGetNextMount() does for the show iterator.
        sdbCancelFetchByType(pSdb, pIter, SDB_MOUNT);
        return true;
      }
    }
    sdbRelease(pSdb, pMount);
  }
  return false;
}
659

660
#ifndef TD_ENTERPRISE
661
// Stub compiled when TD_ENTERPRISE is not defined: dropping a mount is
// reported as an unsupported operation.
int32_t mndDropMount(SMnode *pMnode, SRpcMsg *pReq, SMountObj *pObj) {
  return TSDB_CODE_OPS_NOT_SUPPORT;
}
662
#endif
663

664
// Handles TDMT_MND_DROP_MOUNT.
//
// When the mount cannot be acquired the request fails with terrno (or
// TSDB_CODE_MND_RETURN_VALUE_NULL if terrno is 0), unless ignoreNotExists is
// set, in which case an empty drop response is built and its result returned.
// Otherwise the privilege is checked and mndDropMount() is called; a
// successful call is reported as TSDB_CODE_ACTION_IN_PROGRESS.
static int32_t mndProcessDropMountReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       code = -1;
  SMountObj    *pObj = NULL;
  SDropMountReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropMountReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);

  mInfo("mount:%s, start to drop", dropReq.mountName);

  pObj = mndAcquireMount(pMnode, dropReq.mountName);
  if (pObj == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (dropReq.ignoreNotExists) {
      // pObj is NULL here, so the response is built without a name/uid.
      code = mndBuildDropMountRsp(pObj, &pReq->info.rspLen, &pReq->info.rsp, true);
    }
    goto _exit;
  }

  // mount operation share the privileges of db
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MOUNT, (SDbObj *)pObj), NULL, _exit);

  code = mndDropMount(pMnode, pReq, pObj);
  if (code == TSDB_CODE_SUCCESS) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

  // SName name = {0};
  // if (tNameFromString(&name, dropReq.mountName, T_NAME_ACCT | T_NAME_DB) < 0)
  //   mError("mount:%s, failed to parse db name", dropReq.mountName);

  auditRecord(pReq, pMnode->clusterId, "dropMount", dropReq.mountName, "", dropReq.sql, dropReq.sqlLen);

_exit:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("mount:%s, failed to drop since %s", dropReq.mountName, tstrerror(code));
  }

  mndReleaseMount(pMnode, pObj);
  tFreeSDropMountReq(&dropReq);
  TAOS_RETURN(code);
}
707

708
// Show/retrieve handler for the mount system table.
//
// On the first call for a show object (pShow->numOfRows < 1) it walks every
// mount in the sdb and writes one row per mount: name, first dnode id, created
// time and first path. Later calls add no rows.
//
// Returns the number of rows written, or a negative error code.
//
// NOTE(review): rowsCapacity is never consulted, so all mounts are written in
// a single pass - confirm the block is always large enough.
static int32_t mndRetrieveMounts(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
  SMnode          *pMnode = pReq->info.node;
  SSdb            *pSdb = pMnode->pSdb;
  SColumnInfoData *pColInfo = NULL;
  void            *pIter = NULL;
  int32_t          code = 0, lino = 0;
  int32_t          numOfRows = 0;
  int32_t          cols = 0;
  int32_t          tmpLen = 0;
  char             tmp[512];
  // Scratch var-string: header first, payload right after it.
  char            *pBuf = tmp;
  int32_t          bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
  char            *qBuf = NULL;

  if (pShow->numOfRows < 1) {
    SMountObj *pObj = NULL;
    int32_t    index = 0;
    for (;;) {
      pIter = sdbFetch(pSdb, SDB_MOUNT, pIter, (void **)&pObj);
      if (pIter == NULL) break;

      // column 0: mount name
      cols = 0;
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
      qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);

      // column 1: first dnode id
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        // TODO: support mutiple dnodes
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->dnodeIds[0], false, pObj, pIter, _exit);
      }

      // column 2: created time
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
      }

      // column 3: first mount path
      pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols);
      if (pColInfo != NULL) {
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->paths[0]));  // TODO: support mutiple paths
        varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
      }

      sdbRelease(pSdb, pObj);
      ++numOfRows;
    }
  }

  pShow->numOfRows += numOfRows;

_exit:
  if (code < 0) {
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    TAOS_RETURN(code);
  }
  return numOfRows;
}
766

767
// Free-iterator handler for the mount system table: cancels an sdb fetch
// iterator over SDB_MOUNT.
static void mndCancelGetNextMount(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_MOUNT);
}
×
771

772
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc