• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4800

16 Oct 2025 09:19AM UTC coverage: 53.935% (-7.1%) from 61.083%
#4800

push

travis-ci

web-flow
Merge b32e3a393 into a190048d5

134724 of 323629 branches covered (41.63%)

Branch coverage included in aggregate %.

184803 of 268802 relevant lines covered (68.75%)

69058627.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.0
/source/dnode/mnode/impl/src/mndMount.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifdef USE_MOUNT
16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "command.h"
19
#include "mndArbGroup.h"
20
#include "mndCluster.h"
21
#include "mndConfig.h"
22
#include "mndDb.h"
23
#include "mndDnode.h"
24
#include "mndIndex.h"
25
#include "mndIndexComm.h"
26
#include "mndMnode.h"
27
#include "mndMount.h"
28
#include "mndPrivilege.h"
29
#include "mndShow.h"
30
#include "mndSma.h"
31
#include "mndStb.h"
32
#include "mndStream.h"
33
#include "mndSubscribe.h"
34
#include "mndTopic.h"
35
#include "mndTrans.h"
36
#include "mndUser.h"
37
#include "mndVgroup.h"
38
#include "mndView.h"
39
#include "systable.h"
40
#include "thttp.h"
41
#include "tjson.h"
42

43
#define MND_MOUNT_VER_NUMBER 1
44

45
static int32_t  mndMountActionInsert(SSdb *pSdb, SMountObj *pObj);
46
static int32_t  mndMountActionDelete(SSdb *pSdb, SMountObj *pObj);
47
static int32_t  mndMountActionUpdate(SSdb *pSdb, SMountObj *pOld, SMountObj *pNew);
48
static int32_t  mndNewMountActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
49

50
static int32_t mndProcessCreateMountReq(SRpcMsg *pReq);
51
static int32_t mndProcessDropMountReq(SRpcMsg *pReq);
52
static int32_t mndProcessExecuteMountReq(SRpcMsg *pReq);
53
static int32_t mndProcessRetrieveMountPathRsp(SRpcMsg *pRsp);
54
static int32_t mndRetrieveMounts(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
55
static void    mndCancelGetNextMount(SMnode *pMnode, void *pIter);
56

57
int32_t mndInitMount(SMnode *pMnode) {
167,873✔
58
  SSdbTable table = {
167,873✔
59
      .sdbType = SDB_MOUNT,
60
      .keyType = SDB_KEY_BINARY,
61
      .encodeFp = (SdbEncodeFp)mndMountActionEncode,
62
      .decodeFp = (SdbDecodeFp)mndMountActionDecode,
63
      .insertFp = (SdbInsertFp)mndMountActionInsert,
64
      .updateFp = (SdbUpdateFp)mndMountActionUpdate,
65
      .deleteFp = (SdbDeleteFp)mndMountActionDelete,
66
      .validateFp = (SdbValidateFp)mndNewMountActionValidate,
67
  };
68

69
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MOUNT, mndProcessCreateMountReq);
167,873✔
70
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MOUNT, mndProcessDropMountReq);
167,873✔
71
  mndSetMsgHandle(pMnode, TDMT_MND_EXECUTE_MOUNT, mndProcessExecuteMountReq);
167,873✔
72
  mndSetMsgHandle(pMnode, TDMT_DND_RETRIEVE_MOUNT_PATH_RSP, mndProcessRetrieveMountPathRsp);
167,873✔
73
  mndSetMsgHandle(pMnode, TDMT_DND_MOUNT_VNODE_RSP, mndTransProcessRsp);
167,873✔
74

75
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MOUNT, mndRetrieveMounts);
167,873✔
76
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MOUNT, mndCancelGetNextMount);
167,873✔
77

78
  return sdbSetTable(pMnode->pSdb, table);
167,873✔
79
}
80

81
void mndCleanupMount(SMnode *pMnode) {}
167,827✔
82

83
void mndMountFreeObj(SMountObj *pObj) {
×
84
  if (pObj) {
×
85
    taosMemoryFreeClear(pObj->dnodeIds);
×
86
    taosMemoryFreeClear(pObj->dbObj);
×
87
    if (pObj->paths) {
×
88
      for (int32_t i = 0; i < pObj->nMounts; ++i) {
×
89
        taosMemoryFreeClear(pObj->paths[i]);
×
90
      }
91
      taosMemoryFreeClear(pObj->paths);
×
92
    }
93
  }
94
}
×
95

96
void mndMountDestroyObj(SMountObj *pObj) {
×
97
  if (pObj) {
×
98
    mndMountFreeObj(pObj);
×
99
    taosMemoryFree(pObj);
×
100
  }
101
}
×
102

103
static int32_t tSerializeSMountObj(void *buf, int32_t bufLen, const SMountObj *pObj) {
×
104
  int32_t  code = 0, lino = 0;
×
105
  int32_t  tlen = 0;
×
106
  SEncoder encoder = {0};
×
107
  tEncoderInit(&encoder, buf, bufLen);
×
108

109
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
110

111
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
×
112
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->acct));
×
113
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
×
114
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
×
115
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
×
116
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
×
117
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nMounts));
×
118
  for (int16_t i = 0; i < pObj->nMounts; ++i) {
×
119
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->dnodeIds[i]));
×
120
    TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->paths[i]));
×
121
  }
122
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nDbs));
×
123
  for (int16_t i = 0; i < pObj->nDbs; ++i) {
×
124
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->dbObj[i].uid));
×
125
    TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbObj[i].name));
×
126
  }
127
  tEndEncode(&encoder);
×
128

129
  tlen = encoder.pos;
×
130
_exit:
×
131
  tEncoderClear(&encoder);
×
132
  if (code < 0) {
×
133
    mError("mount, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
134
    TAOS_RETURN(code);
×
135
  }
136

137
  return tlen;
×
138
}
139

140
static int32_t tDeserializeSMountObj(void *buf, int32_t bufLen, SMountObj *pObj) {
×
141
  int32_t  code = 0, lino = 0;
×
142
  SDecoder decoder = {0};
×
143
  tDecoderInit(&decoder, buf, bufLen);
×
144

145
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
146

147
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
×
148
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->acct));
×
149
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));
×
150
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
×
151
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
×
152
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
×
153
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nMounts));
×
154
  if (pObj->nMounts > 0) {
×
155
    if (!(pObj->dnodeIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nMounts))) {
×
156
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
157
    }
158
    if (!(pObj->paths = taosMemoryMalloc(sizeof(char *) * pObj->nMounts))) {
×
159
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
160
    }
161
    for (int16_t i = 0; i < pObj->nMounts; ++i) {
×
162
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->dnodeIds[i]));
×
163
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pObj->paths[i]));
×
164
    }
165
  }
166
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nDbs));
×
167
  if (pObj->nDbs > 0) {
×
168
    if (!(pObj->dbObj = taosMemoryMalloc(sizeof(SMountDbObj) * pObj->nDbs))) {
×
169
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
170
    }
171
    for (int16_t i = 0; i < pObj->nDbs; ++i) {
×
172
      TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbObj[i].uid));
×
173
      TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbObj[i].name));
×
174
    }
175
  }
176

177
_exit:
×
178
  tEndDecode(&decoder);
×
179
  tDecoderClear(&decoder);
×
180
  if (code < 0) {
×
181
    mError("mount, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
×
182
  }
183
  TAOS_RETURN(code);
×
184
}
185

186
SSdbRaw *mndMountActionEncode(SMountObj *pObj) {
×
187
  int32_t  code = 0, lino = 0;
×
188
  void    *buf = NULL;
×
189
  SSdbRaw *pRaw = NULL;
×
190
  int32_t  tlen = tSerializeSMountObj(NULL, 0, pObj);
×
191
  if (tlen < 0) {
×
192
    TAOS_CHECK_EXIT(tlen);
×
193
  }
194

195
  int32_t size = sizeof(int32_t) + tlen;
×
196
  pRaw = sdbAllocRaw(SDB_MOUNT, MND_MOUNT_VER_NUMBER, size);
×
197
  if (pRaw == NULL) {
×
198
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
199
  }
200

201
  buf = taosMemoryMalloc(tlen);
×
202
  if (buf == NULL) {
×
203
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
204
  }
205

206
  tlen = tSerializeSMountObj(buf, tlen, pObj);
×
207
  if (tlen < 0) {
×
208
    TAOS_CHECK_EXIT(tlen);
×
209
  }
210

211
  int32_t dataPos = 0;
×
212
  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
×
213
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
×
214
  SDB_SET_DATALEN(pRaw, dataPos, _exit);
×
215

216
_exit:
×
217
  taosMemoryFreeClear(buf);
×
218
  if (code != TSDB_CODE_SUCCESS) {
×
219
    terrno = code;
×
220
    mError("mount, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
×
221
    sdbFreeRaw(pRaw);
×
222
    return NULL;
×
223
  }
224

225
  mTrace("mount, encode to raw:%p, row:%p", pRaw, pObj);
×
226
  return pRaw;
×
227
}
228

229
SSdbRow *mndMountActionDecode(SSdbRaw *pRaw) {
×
230
  int32_t    code = 0, lino = 0;
×
231
  SSdbRow   *pRow = NULL;
×
232
  SMountObj *pObj = NULL;
×
233
  void      *buf = NULL;
×
234

235
  int8_t sver = 0;
×
236
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
×
237
    goto _exit;
×
238
  }
239

240
  if (sver != MND_MOUNT_VER_NUMBER) {
×
241
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
×
242
    mError("mount read invalid ver, data ver: %d, curr ver: %d", sver, MND_MOUNT_VER_NUMBER);
×
243
    goto _exit;
×
244
  }
245

246
  if (!(pRow = sdbAllocRow(sizeof(SMountObj)))) {
×
247
    code = TSDB_CODE_OUT_OF_MEMORY;
×
248
    goto _exit;
×
249
  }
250

251
  if (!(pObj = sdbGetRowObj(pRow))) {
×
252
    code = TSDB_CODE_OUT_OF_MEMORY;
×
253
    goto _exit;
×
254
  }
255

256
  int32_t tlen;
×
257
  int32_t dataPos = 0;
×
258
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
×
259
  buf = taosMemoryMalloc(tlen + 1);
×
260
  if (buf == NULL) {
×
261
    code = TSDB_CODE_OUT_OF_MEMORY;
×
262
    goto _exit;
×
263
  }
264
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);
×
265

266
  if (tDeserializeSMountObj(buf, tlen, pObj) < 0) {
×
267
    code = TSDB_CODE_OUT_OF_MEMORY;
×
268
    goto _exit;
×
269
  }
270

271
  taosInitRWLatch(&pObj->lock);
×
272

273
_exit:
×
274
  taosMemoryFreeClear(buf);
×
275
  if (code != TSDB_CODE_SUCCESS) {
×
276
    terrno = code;
×
277
    mError("mount, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
×
278
    mndMountFreeObj(pObj);
×
279
    taosMemoryFreeClear(pRow);
×
280
    return NULL;
×
281
  }
282
  mTrace("mount, decode from raw:%p, row:%p", pRaw, pObj);
×
283
  return pRow;
×
284
}
285

286
static int32_t mndNewMountActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
×
287
  mTrace("mount, validate new mount action, raw:%p", pRaw);
×
288
  return 0;
×
289
}
290

291
static int32_t mndMountActionInsert(SSdb *pSdb, SMountObj *pObj) {
×
292
  mTrace("mount:%s, perform insert action, row:%p", pObj->name, pObj);
×
293
  return 0;
×
294
}
295

296
static int32_t mndMountActionDelete(SSdb *pSdb, SMountObj *pObj) {
×
297
  mTrace("mount:%s, perform delete action, row:%p", pObj->name, pObj);
×
298
  mndMountFreeObj(pObj);
×
299
  return 0;
×
300
}
301

302
static int32_t mndMountActionUpdate(SSdb *pSdb, SMountObj *pOld, SMountObj *pNew) {
×
303
  mTrace("mount:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
×
304
  taosWLockLatch(&pOld->lock);
×
305
  pOld->updateTime = pNew->updateTime;
×
306
  taosWUnLockLatch(&pOld->lock);
×
307
  return 0;
×
308
}
309

310
SMountObj *mndAcquireMount(SMnode *pMnode, const char *mountName) {
×
311
  SSdb      *pSdb = pMnode->pSdb;
×
312
  SMountObj *pObj = sdbAcquire(pSdb, SDB_MOUNT, mountName);
×
313
  if (pObj == NULL) {
×
314
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
315
      terrno = TSDB_CODE_MND_MOUNT_NOT_EXIST;
×
316
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
×
317
      terrno = TSDB_CODE_MND_MOUNT_IN_CREATING;
×
318
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
×
319
      terrno = TSDB_CODE_MND_MOUNT_IN_DROPPING;
×
320
    } else {
321
      terrno = TSDB_CODE_APP_ERROR;
×
322
      mFatal("mount:%s, failed to acquire mount since %s", mountName, terrstr());
×
323
    }
324
  }
325
  return pObj;
×
326
}
327

328
void mndReleaseMount(SMnode *pMnode, SMountObj *pObj) {
×
329
  SSdb *pSdb = pMnode->pSdb;
×
330
  sdbRelease(pSdb, pObj);
×
331
}
×
332

333
bool mndMountIsExist(SMnode *pMnode, const char *mountName) {
×
334
  SMountObj *pObj = mndAcquireMount(pMnode, mountName);
×
335
  if (pObj == NULL) {
×
336
    return false;
×
337
  }
338
  mndReleaseMount(pMnode, pObj);
×
339
  return true;
×
340
}
341

342
void *mndBuildRetrieveMountPathReq(SMnode *pMnode, SRpcMsg *pMsg, const char *mountName, const char *mountPath,
×
343
                                   int32_t dnodeId, int32_t *pContLen) {
344
  int32_t code = 0, lino = 0;
×
345
  void   *pBuf = NULL;
×
346

347
  SRetrieveMountPathReq req = {0};
×
348
  req.dnodeId = dnodeId;
×
349
  req.pVal = &pMsg->info;
×
350
  req.valLen = sizeof(pMsg->info);
×
351
  TAOS_UNUSED(snprintf(req.mountName, TSDB_MOUNT_NAME_LEN, "%s", mountName));
×
352
  TAOS_UNUSED(snprintf(req.mountPath, TSDB_MOUNT_PATH_LEN, "%s", mountPath));
×
353

354
  int32_t contLen = tSerializeSRetrieveMountPathReq(NULL, 0, &req);
×
355
  TAOS_CHECK_EXIT(contLen);
×
356
  TSDB_CHECK_NULL((pBuf = rpcMallocCont(contLen)), code, lino, _exit, terrno);
×
357
  TAOS_CHECK_EXIT(tSerializeSRetrieveMountPathReq(pBuf, contLen, &req));
×
358
_exit:
×
359
  if (code < 0) {
×
360
    rpcFreeCont(pBuf);
×
361
    terrno = code;
×
362
    return NULL;
×
363
  }
364
  *pContLen = contLen;
×
365
  return pBuf;
×
366
}
367

368
#if 0
369
static int32_t mndSetCreateMountUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
370
  int32_t code = 0;
371
  for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
372
    SVgObj *pVgroup = pVgroups + vg;
373

374
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
375
      SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
376
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false));
377
    }
378
  }
379

380
  TAOS_RETURN(code);
381
}
382
#endif
383

384

385
#ifndef TD_ENTERPRISE
386
int32_t mndCreateMount(SMnode *pMnode, SRpcMsg *pReq, SMountInfo *pInfo, SUserObj *pUser) {
387
  return TSDB_CODE_OPS_NOT_SUPPORT;
388
}
389
#endif
390

391
static int32_t mndRetrieveMountInfo(SMnode *pMnode, SRpcMsg *pMsg, SCreateMountReq *pReq) {
×
392
  int32_t    code = 0, lino = 0;
×
393
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pReq->dnodeIds[0]);
×
394
  if (pDnode == NULL) TAOS_RETURN(terrno);
×
395
  if (pDnode->offlineReason != DND_REASON_ONLINE) {
×
396
    mndReleaseDnode(pMnode, pDnode);
×
397
    TAOS_RETURN(TSDB_CODE_DNODE_OFFLINE);
×
398
  }
399
  SEpSet epSet = mndGetDnodeEpset(pDnode);
×
400
  mndReleaseDnode(pMnode, pDnode);
×
401

402
  int32_t bufLen = 0;
×
403
  void   *pBuf =
404
      mndBuildRetrieveMountPathReq(pMnode, pMsg, pReq->mountName, pReq->mountPaths[0], pReq->dnodeIds[0], &bufLen);
×
405
  if (pBuf == NULL) TAOS_RETURN(terrno);
×
406

407
  SRpcMsg rpcMsg = {.msgType = TDMT_DND_RETRIEVE_MOUNT_PATH, .pCont = pBuf, .contLen = bufLen};
×
408
  TAOS_CHECK_EXIT(tmsgSendReq(&epSet, &rpcMsg));
×
409

410
  pMsg->info.handle = NULL;  // disable auto rsp to client
×
411
_exit:
×
412
  TAOS_RETURN(code);
×
413
}
414

415
static int32_t mndProcessRetrieveMountPathRsp(SRpcMsg *pRsp) {
×
416
  int32_t    code = 0, lino = 0;
×
417
  int32_t    rspCode = 0;
×
418
  SMnode    *pMnode = pRsp->info.node;
×
419
  SMountInfo mntInfo = {0};
×
420
  SDecoder   decoder = {0};
×
421
  void      *pBuf = NULL;
×
422
  int32_t    bufLen = 0;
×
423
  bool       rspToClient = false;
×
424

425
  // step 1: decode and preprocess in mnode read thread
426
  tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
×
427
  TAOS_CHECK_EXIT(tDeserializeSMountInfo(&decoder, &mntInfo, false));
×
428
  const STraceId *trace = &pRsp->info.traceId;
×
429
  SRpcMsg         rsp = {
×
430
      // .code = pRsp->code,
431
      // .pCont = pRsp->info.rsp,
432
      // .contLen = pRsp->info.rspLen,
433
              .info = *(SRpcHandleInfo *)mntInfo.pVal,
×
434
  };
435
  rspToClient = true;
×
436
  if (pRsp->code != 0) {
×
437
    TAOS_CHECK_EXIT(pRsp->code);
×
438
  }
439

440
  // wait for all retrieve response received
441
  // TODO: ...
442
  // make sure the clusterId from all rsp is the same, but not with the clusterId of the host cluster
443
  if (mntInfo.clusterId == pMnode->clusterId) {
×
444
    mError("mount:%s, clusterId:%" PRIi64 " from dnode is identical to the host cluster's id:%" PRIi64,
×
445
           mntInfo.mountName, mntInfo.clusterId, pMnode->clusterId);
446
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_CLUSTER_EXIST);
×
447
  }
448

449
  // step 2: collect the responses from dnodes, process and push to mnode write thread to run as transaction
450
  // TODO: multiple retrieve dnodes and paths supported later
451
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mntInfo)) >= 0, code, lino, _exit, bufLen);
×
452
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), code, lino, _exit, terrno);
×
453
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mntInfo)) >= 0, code, lino, _exit, bufLen);
×
454
  SRpcMsg rpcMsg = {.pCont = pBuf, .contLen = bufLen, .msgType = TDMT_MND_EXECUTE_MOUNT, .info.noResp = 1};
×
455
  SEpSet  mnodeEpset = {0};
×
456
  mndGetMnodeEpSet(pMnode, &mnodeEpset);
×
457

458
  SMountObj *pObj = NULL;
×
459
  if ((pObj = mndAcquireMount(pMnode, mntInfo.mountName))) {
×
460
    mndReleaseMount(pMnode, pObj);
×
461
    if (mntInfo.ignoreExist) {
×
462
      mInfo("mount:%s, already exist, ignore exist is set", mntInfo.mountName);
×
463
      code = 0;
×
464
      goto _exit;
×
465
    } else {
466
      TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_ALREADY_EXIST);
×
467
    }
468
  } else {
469
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
×
470
      // continue
471
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
472
      TAOS_CHECK_EXIT(code);
×
473
    }
474
  }
475
  TAOS_CHECK_EXIT(tmsgSendReq(&mnodeEpset, &rpcMsg));
×
476
_exit:
×
477
  if (code == 0) {
×
478
    mGInfo("mount:%s, msg:%p, retrieve mount path rsp with code:%d", mntInfo.mountName, pRsp, pRsp->code);
×
479
  } else {
480
    mError("mount:%s, msg:%p, failed at line %d to retrieve mount path rsp since %s", mntInfo.mountName, pRsp, lino,
×
481
           tstrerror(code));
482
    if (rspToClient) {
×
483
      rsp.code = code;
×
484
      tmsgSendRsp(&rsp);
×
485
    }
486
  }
487
  tDecoderClear(&decoder);
×
488
  tFreeMountInfo(&mntInfo, false);
×
489
  TAOS_RETURN(code);
×
490
}
491

492
static int32_t mndProcessCreateMountReq(SRpcMsg *pReq) {
×
493
  int32_t         code = 0, lino = 0;
×
494
  SMnode         *pMnode = pReq->info.node;
×
495
  SDbObj         *pDb = NULL;
×
496
  SMountObj      *pObj = NULL;
×
497
  SUserObj       *pUser = NULL;
×
498
  SCreateMountReq createReq = {0};
×
499

500
  TAOS_CHECK_EXIT(tDeserializeSCreateMountReq(pReq->pCont, pReq->contLen, &createReq));
×
501
  mInfo("mount:%s, start to create on dnode %d from %s", createReq.mountName, *createReq.dnodeIds,
×
502
        createReq.mountPaths[0]);  // TODO: mutiple mounts
503

504
  if ((pObj = mndAcquireMount(pMnode, createReq.mountName))) {
×
505
    if (createReq.ignoreExist) {
×
506
      mInfo("mount:%s, already exist, ignore exist is set", createReq.mountName);
×
507
      code = 0;
×
508
      goto _exit;
×
509
    } else {
510
      code = TSDB_CODE_MND_MOUNT_ALREADY_EXIST;
×
511
      goto _exit;
×
512
    }
513
  } else {
514
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
×
515
      // continue
516
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
517
      goto _exit;
×
518
    }
519
  }
520
  // mount operation share the privileges of db
521
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MOUNT, (SDbObj *)pObj));
×
522
  TAOS_CHECK_EXIT(grantCheck(TSDB_GRANT_MOUNT));
×
523
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
×
524
  char fullMountName[TSDB_MOUNT_NAME_LEN + 32] = {0};
×
525
  (void)snprintf(fullMountName, sizeof(fullMountName), "%d.%s", pUser->acctId, createReq.mountName);
×
526
  if ((pDb = mndAcquireDb(pMnode, fullMountName))) {
×
527
    mndReleaseDb(pMnode, pDb);
×
528
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_DB_NAME_EXIST);
×
529
  }
530

531
  TAOS_CHECK_EXIT(mndRetrieveMountInfo(pMnode, pReq, &createReq));
×
532
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
533

534
  auditRecord(pReq, pMnode->clusterId, "createMount", createReq.mountName, "", createReq.sql, createReq.sqlLen);
×
535

536
_exit:
×
537
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
538
    mError("mount:%s, dnode:%d, path:%s, failed to create at line:%d since %s", createReq.mountName,
×
539
           createReq.dnodeIds ? createReq.dnodeIds[0] : 0, createReq.mountPaths ? createReq.mountPaths[0] : "", lino,
540
           tstrerror(code));  // TODO: mutiple mounts
541
  }
542

543
  mndReleaseMount(pMnode, pObj);
×
544
  mndReleaseUser(pMnode, pUser);
×
545
  tFreeSCreateMountReq(&createReq);
×
546

547
  TAOS_RETURN(code);
×
548
}
549

550
static int32_t mndProcessExecuteMountReq(SRpcMsg *pReq) {
×
551
  int32_t    code = 0, lino = 0;
×
552
  SMnode    *pMnode = pReq->info.node;
×
553
  SDbObj    *pDb = NULL;
×
554
  SMountObj *pObj = NULL;
×
555
  SUserObj  *pUser = NULL;
×
556
  SMountInfo mntInfo = {0};
×
557
  SDecoder   decoder = {0};
×
558
  SRpcMsg    rsp = {0};
×
559
  bool       rspToClient = false;
×
560

561
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
×
562

563
  TAOS_CHECK_EXIT(tDeserializeSMountInfo(&decoder, &mntInfo, true));
×
564
  rspToClient = true;
×
565
  mInfo("mount:%s, start to execute on mnode", mntInfo.mountName);
×
566

567
  if ((pDb = mndAcquireDb(pMnode, mntInfo.mountName))) {
×
568
    mndReleaseDb(pMnode, pDb);
×
569
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_DB_NAME_EXIST);
×
570
  }
571

572
  if ((pObj = mndAcquireMount(pMnode, mntInfo.mountName))) {
×
573
    if (mntInfo.ignoreExist) {
×
574
      mInfo("mount:%s, already exist, ignore exist is set", mntInfo.mountName);
×
575
      code = 0;
×
576
      goto _exit;
×
577
    } else {
578
      TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_ALREADY_EXIST);
×
579
    }
580
  } else {
581
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
×
582
      // continue
583
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
584
      TAOS_CHECK_EXIT(code);
×
585
    }
586
  }
587
  // mount operation share the privileges of db
588
  TAOS_CHECK_EXIT(grantCheck(TSDB_GRANT_MOUNT));  // TODO: implement when the plan is ready
×
589
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
×
590

591
  TAOS_CHECK_EXIT(mndCreateMount(pMnode, pReq, &mntInfo, pUser));
×
592
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
593
_exit:
×
594
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
595
    // TODO: mutiple path mount
596
    rsp.code = code;
×
597
    mError("mount:%s, dnode:%d, path:%s, failed to create at line:%d since %s", mntInfo.mountName, mntInfo.dnodeId,
×
598
           mntInfo.mountPath, lino, tstrerror(code));
599
  }
600
  if (rspToClient) {
×
601
    rsp.info = *(SRpcHandleInfo *)mntInfo.pVal, tmsgSendRsp(&rsp);
×
602
    tmsgSendRsp(&rsp);
×
603
  }
604
  mndReleaseMount(pMnode, pObj);
×
605
  mndReleaseUser(pMnode, pUser);
×
606
  tDecoderClear(&decoder);
×
607
  tFreeMountInfo(&mntInfo, true);
×
608

609
  TAOS_RETURN(code);
×
610
}
611

612
int32_t mndBuildDropMountRsp(SMountObj *pObj, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
×
613
  int32_t       code = 0;
×
614
  SDropMountRsp dropRsp = {0};
×
615
  if (pObj != NULL) {
×
616
    (void)memcpy(dropRsp.name, pObj->name, TSDB_MOUNT_NAME_LEN);
×
617
    dropRsp.uid = pObj->uid;
×
618
  }
619

620
  int32_t rspLen = tSerializeSDropMountRsp(NULL, 0, &dropRsp);
×
621
  void   *pRsp = NULL;
×
622
  if (useRpcMalloc) {
×
623
    pRsp = rpcMallocCont(rspLen);
×
624
  } else {
625
    pRsp = taosMemoryMalloc(rspLen);
×
626
  }
627

628
  if (pRsp == NULL) {
×
629
    code = terrno;
×
630
    TAOS_RETURN(code);
×
631
  }
632

633
  int32_t ret = 0;
×
634
  if ((ret = tSerializeSDropMountRsp(pRsp, rspLen, &dropRsp)) < 0) return ret;
×
635
  *pRspLen = rspLen;
×
636
  *ppRsp = pRsp;
×
637
  TAOS_RETURN(code);
×
638
}
639

640
bool mndHasMountOnDnode(SMnode *pMnode, int32_t dnodeId) {
6,070✔
641
  SSdb *pSdb = pMnode->pSdb;
6,070✔
642
  void *pIter = NULL;
6,070✔
643

644
  while (1) {
×
645
    SMountObj *pMount = NULL;
6,070✔
646
    pIter = sdbFetch(pSdb, SDB_MOUNT, pIter, (void **)&pMount);
6,070✔
647
    if (pIter == NULL) break;
6,070!
648

649
    for (int32_t i = 0; i < pMount->nMounts; ++i) {
×
650
      if (pMount->dnodeIds[i] == dnodeId) {
×
651
        sdbRelease(pSdb, pMount);
×
652
        return true;
×
653
      }
654
    }
655
    sdbRelease(pSdb, pMount);
×
656
  }
657
  return false;
6,070✔
658
}
659

660
#ifndef TD_ENTERPRISE
661
int32_t mndDropMount(SMnode *pMnode, SRpcMsg *pReq, SMountObj *pObj) { return TSDB_CODE_OPS_NOT_SUPPORT; }
662
#endif
663

664
static int32_t mndProcessDropMountReq(SRpcMsg *pReq) {
×
665
  SMnode       *pMnode = pReq->info.node;
×
666
  int32_t       code = -1;
×
667
  SMountObj    *pObj = NULL;
×
668
  SDropMountReq dropReq = {0};
×
669

670
  TAOS_CHECK_GOTO(tDeserializeSDropMountReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);
×
671

672
  mInfo("mount:%s, start to drop", dropReq.mountName);
×
673

674
  pObj = mndAcquireMount(pMnode, dropReq.mountName);
×
675
  if (pObj == NULL) {
×
676
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
677
    if (terrno != 0) code = terrno;
×
678
    if (dropReq.ignoreNotExists) {
×
679
      code = mndBuildDropMountRsp(pObj, &pReq->info.rspLen, &pReq->info.rsp, true);
×
680
    }
681
    goto _exit;
×
682
  }
683

684
  // mount operation share the privileges of db
685
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MOUNT, (SDbObj *)pObj), NULL, _exit);
×
686

687
  code = mndDropMount(pMnode, pReq, pObj);
×
688
  if (code == TSDB_CODE_SUCCESS) {
×
689
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
690
  }
691

692
  // SName name = {0};
693
  // if (tNameFromString(&name, dropReq.mountName, T_NAME_ACCT | T_NAME_DB) < 0)
694
  //   mError("mount:%s, failed to parse db name", dropReq.mountName);
695

696
  auditRecord(pReq, pMnode->clusterId, "dropMount", dropReq.mountName, "", dropReq.sql, dropReq.sqlLen);
×
697

698
_exit:
×
699
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
700
    mError("mount:%s, failed to drop since %s", dropReq.mountName, tstrerror(code));
×
701
  }
702

703
  mndReleaseMount(pMnode, pObj);
×
704
  tFreeSDropMountReq(&dropReq);
×
705
  TAOS_RETURN(code);
×
706
}
707

708
static int32_t mndRetrieveMounts(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
×
709
  SMnode          *pMnode = pReq->info.node;
×
710
  int32_t          code = 0, lino = 0;
×
711
  int32_t          numOfRows = 0;
×
712
  int32_t          cols = 0;
×
713
  char             tmp[512];
×
714
  int32_t          tmpLen = 0;
×
715
  int32_t          bufLen = 0;
×
716
  char            *pBuf = NULL;
×
717
  char            *qBuf = NULL;
×
718
  void            *pIter = NULL;
×
719
  SSdb            *pSdb = pMnode->pSdb;
×
720
  SColumnInfoData *pColInfo = NULL;
×
721

722
  pBuf = tmp;
×
723
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
×
724
  if (pShow->numOfRows < 1) {
×
725
    SMountObj *pObj = NULL;
×
726
    int32_t    index = 0;
×
727
    while ((pIter = sdbFetch(pSdb, SDB_MOUNT, pIter, (void **)&pObj))) {
×
728
      cols = 0;
×
729
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
×
730
      qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
×
731
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
×
732
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
×
733
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
×
734

735
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
×
736
        // TAOS_UNUSED(snprintf(pBuf, bufLen, "%d", *(int32_t *)pObj->dnodeIds));  // TODO: support mutiple dnodes
737
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->dnodeIds[0], false, pObj, pIter, _exit);
×
738
      }
739

740
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
×
741
        // TAOS_UNUSED(snprintf(pBuf, bufLen, "%" PRIi64, pObj->createdTime));
742
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
×
743
      }
744

745
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
×
746
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
×
747
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->paths[0]));  // TODO: support mutiple paths
×
748
        varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
×
749
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
×
750
      }
751

752
      sdbRelease(pSdb, pObj);
×
753
      ++numOfRows;
×
754
    }
755
  }
756

757
  pShow->numOfRows += numOfRows;
×
758

759
_exit:
×
760
  if (code < 0) {
×
761
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
762
    TAOS_RETURN(code);
×
763
  }
764
  return numOfRows;
×
765
}
766

767
static void mndCancelGetNextMount(SMnode *pMnode, void *pIter) {
×
768
  SSdb *pSdb = pMnode->pSdb;
×
769
  sdbCancelFetchByType(pSdb, pIter, SDB_MOUNT);
×
770
}
×
771

772
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc