• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

2.0
/source/dnode/mnode/impl/src/mndMount.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifdef USE_MOUNT
16
#define _DEFAULT_SOURCE
17
#include "audit.h"
18
#include "command.h"
19
#include "mndArbGroup.h"
20
#include "mndCluster.h"
21
#include "mndConfig.h"
22
#include "mndDb.h"
23
#include "mndDnode.h"
24
#include "mndIndex.h"
25
#include "mndIndexComm.h"
26
#include "mndMnode.h"
27
#include "mndMount.h"
28
#include "mndPrivilege.h"
29
#include "mndShow.h"
30
#include "mndSma.h"
31
#include "mndStb.h"
32
#include "mndStream.h"
33
#include "mndSubscribe.h"
34
#include "mndTopic.h"
35
#include "mndTrans.h"
36
#include "mndUser.h"
37
#include "mndVgroup.h"
38
#include "mndView.h"
39
#include "systable.h"
40
#include "thttp.h"
41
#include "tjson.h"
42

43
#define MND_MOUNT_VER_NUMBER 1
44

45
static int32_t  mndMountActionInsert(SSdb *pSdb, SMountObj *pObj);
46
static int32_t  mndMountActionDelete(SSdb *pSdb, SMountObj *pObj);
47
static int32_t  mndMountActionUpdate(SSdb *pSdb, SMountObj *pOld, SMountObj *pNew);
48
static int32_t  mndNewMountActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw);
49

50
static int32_t mndProcessCreateMountReq(SRpcMsg *pReq);
51
static int32_t mndProcessDropMountReq(SRpcMsg *pReq);
52
static int32_t mndProcessExecuteMountReq(SRpcMsg *pReq);
53
static int32_t mndProcessRetrieveMountPathRsp(SRpcMsg *pRsp);
54
static int32_t mndRetrieveMounts(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity);
55
static void    mndCancelGetNextMount(SMnode *pMnode, void *pIter);
56

57
int32_t mndInitMount(SMnode *pMnode) {
121,817✔
58
  SSdbTable table = {
121,817✔
59
      .sdbType = SDB_MOUNT,
60
      .keyType = SDB_KEY_BINARY,
61
      .encodeFp = (SdbEncodeFp)mndMountActionEncode,
62
      .decodeFp = (SdbDecodeFp)mndMountActionDecode,
63
      .insertFp = (SdbInsertFp)mndMountActionInsert,
64
      .updateFp = (SdbUpdateFp)mndMountActionUpdate,
65
      .deleteFp = (SdbDeleteFp)mndMountActionDelete,
66
      .validateFp = (SdbValidateFp)mndNewMountActionValidate,
67
  };
68

69
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MOUNT, mndProcessCreateMountReq);
121,817✔
70
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_MOUNT, mndProcessDropMountReq);
121,817✔
71
  mndSetMsgHandle(pMnode, TDMT_MND_EXECUTE_MOUNT, mndProcessExecuteMountReq);
121,817✔
72
  mndSetMsgHandle(pMnode, TDMT_DND_RETRIEVE_MOUNT_PATH_RSP, mndProcessRetrieveMountPathRsp);
121,817✔
73
  mndSetMsgHandle(pMnode, TDMT_DND_MOUNT_VNODE_RSP, mndTransProcessRsp);
121,817✔
74

75
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MOUNT, mndRetrieveMounts);
121,817✔
76
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MOUNT, mndCancelGetNextMount);
121,817✔
77

78
  return sdbSetTable(pMnode->pSdb, table);
121,817✔
79
}
80

81
void mndCleanupMount(SMnode *pMnode) {}
121,747✔
82

UNCOV
83
void mndMountFreeObj(SMountObj *pObj) {
×
UNCOV
84
  if (pObj) {
×
UNCOV
85
    taosMemoryFreeClear(pObj->dnodeIds);
×
UNCOV
86
    taosMemoryFreeClear(pObj->dbObj);
×
UNCOV
87
    if (pObj->paths) {
×
UNCOV
88
      for (int32_t i = 0; i < pObj->nMounts; ++i) {
×
UNCOV
89
        taosMemoryFreeClear(pObj->paths[i]);
×
90
      }
UNCOV
91
      taosMemoryFreeClear(pObj->paths);
×
92
    }
93
  }
UNCOV
94
}
×
95

96
void mndMountDestroyObj(SMountObj *pObj) {
×
97
  if (pObj) {
×
98
    mndMountFreeObj(pObj);
×
99
    taosMemoryFree(pObj);
×
100
  }
101
}
×
102

UNCOV
103
static int32_t tSerializeSMountObj(void *buf, int32_t bufLen, const SMountObj *pObj) {
×
UNCOV
104
  int32_t  code = 0, lino = 0;
×
UNCOV
105
  int32_t  tlen = 0;
×
UNCOV
106
  SEncoder encoder = {0};
×
UNCOV
107
  tEncoderInit(&encoder, buf, bufLen);
×
108

UNCOV
109
  TAOS_CHECK_EXIT(tStartEncode(&encoder));
×
110

UNCOV
111
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->name));
×
UNCOV
112
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->acct));
×
UNCOV
113
  TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->createUser));
×
UNCOV
114
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->createdTime));
×
UNCOV
115
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->updateTime));
×
UNCOV
116
  TAOS_CHECK_EXIT(tEncodeI64v(&encoder, pObj->uid));
×
UNCOV
117
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nMounts));
×
UNCOV
118
  for (int16_t i = 0; i < pObj->nMounts; ++i) {
×
UNCOV
119
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->dnodeIds[i]));
×
UNCOV
120
    TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->paths[i]));
×
121
  }
UNCOV
122
  TAOS_CHECK_EXIT(tEncodeI16v(&encoder, pObj->nDbs));
×
UNCOV
123
  for (int16_t i = 0; i < pObj->nDbs; ++i) {
×
124
    TAOS_CHECK_EXIT(tEncodeI32v(&encoder, pObj->dbObj[i].uid));
×
125
    TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pObj->dbObj[i].name));
×
126
  }
UNCOV
127
  tEndEncode(&encoder);
×
128

UNCOV
129
  tlen = encoder.pos;
×
UNCOV
130
_exit:
×
UNCOV
131
  tEncoderClear(&encoder);
×
UNCOV
132
  if (code < 0) {
×
133
    mError("mount, %s failed at line %d since %s", __func__, lino, tstrerror(code));
×
134
    TAOS_RETURN(code);
×
135
  }
136

UNCOV
137
  return tlen;
×
138
}
139

UNCOV
140
static int32_t tDeserializeSMountObj(void *buf, int32_t bufLen, SMountObj *pObj) {
×
UNCOV
141
  int32_t  code = 0, lino = 0;
×
UNCOV
142
  SDecoder decoder = {0};
×
UNCOV
143
  tDecoderInit(&decoder, buf, bufLen);
×
144

UNCOV
145
  TAOS_CHECK_EXIT(tStartDecode(&decoder));
×
146

UNCOV
147
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->name));
×
UNCOV
148
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->acct));
×
UNCOV
149
  TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->createUser));
×
UNCOV
150
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->createdTime));
×
UNCOV
151
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->updateTime));
×
UNCOV
152
  TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->uid));
×
UNCOV
153
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nMounts));
×
UNCOV
154
  if (pObj->nMounts > 0) {
×
UNCOV
155
    if (!(pObj->dnodeIds = taosMemoryMalloc(sizeof(int32_t) * pObj->nMounts))) {
×
156
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
157
    }
UNCOV
158
    if (!(pObj->paths = taosMemoryMalloc(sizeof(char *) * pObj->nMounts))) {
×
159
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
160
    }
UNCOV
161
    for (int16_t i = 0; i < pObj->nMounts; ++i) {
×
UNCOV
162
      TAOS_CHECK_EXIT(tDecodeI32v(&decoder, &pObj->dnodeIds[i]));
×
UNCOV
163
      TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &pObj->paths[i]));
×
164
    }
165
  }
UNCOV
166
  TAOS_CHECK_EXIT(tDecodeI16v(&decoder, &pObj->nDbs));
×
UNCOV
167
  if (pObj->nDbs > 0) {
×
168
    if (!(pObj->dbObj = taosMemoryMalloc(sizeof(SMountDbObj) * pObj->nDbs))) {
×
169
      TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
170
    }
171
    for (int16_t i = 0; i < pObj->nDbs; ++i) {
×
172
      TAOS_CHECK_EXIT(tDecodeI64v(&decoder, &pObj->dbObj[i].uid));
×
173
      TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pObj->dbObj[i].name));
×
174
    }
175
  }
176

UNCOV
177
_exit:
×
UNCOV
178
  tEndDecode(&decoder);
×
UNCOV
179
  tDecoderClear(&decoder);
×
UNCOV
180
  if (code < 0) {
×
181
    mError("mount, %s failed at line %d since %s, row:%p", __func__, lino, tstrerror(code), pObj);
×
182
  }
UNCOV
183
  TAOS_RETURN(code);
×
184
}
185

UNCOV
186
SSdbRaw *mndMountActionEncode(SMountObj *pObj) {
×
UNCOV
187
  int32_t  code = 0, lino = 0;
×
UNCOV
188
  void    *buf = NULL;
×
UNCOV
189
  SSdbRaw *pRaw = NULL;
×
UNCOV
190
  int32_t  tlen = tSerializeSMountObj(NULL, 0, pObj);
×
UNCOV
191
  if (tlen < 0) {
×
192
    TAOS_CHECK_EXIT(tlen);
×
193
  }
194

UNCOV
195
  int32_t size = sizeof(int32_t) + tlen;
×
UNCOV
196
  pRaw = sdbAllocRaw(SDB_MOUNT, MND_MOUNT_VER_NUMBER, size);
×
UNCOV
197
  if (pRaw == NULL) {
×
198
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
199
  }
200

UNCOV
201
  buf = taosMemoryMalloc(tlen);
×
UNCOV
202
  if (buf == NULL) {
×
203
    TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
×
204
  }
205

UNCOV
206
  tlen = tSerializeSMountObj(buf, tlen, pObj);
×
UNCOV
207
  if (tlen < 0) {
×
208
    TAOS_CHECK_EXIT(tlen);
×
209
  }
210

UNCOV
211
  int32_t dataPos = 0;
×
UNCOV
212
  SDB_SET_INT32(pRaw, dataPos, tlen, _exit);
×
UNCOV
213
  SDB_SET_BINARY(pRaw, dataPos, buf, tlen, _exit);
×
UNCOV
214
  SDB_SET_DATALEN(pRaw, dataPos, _exit);
×
215

UNCOV
216
_exit:
×
UNCOV
217
  taosMemoryFreeClear(buf);
×
UNCOV
218
  if (code != TSDB_CODE_SUCCESS) {
×
219
    terrno = code;
×
220
    mError("mount, failed at line %d to encode to raw:%p since %s", lino, pRaw, tstrerror(code));
×
221
    sdbFreeRaw(pRaw);
×
222
    return NULL;
×
223
  }
224

UNCOV
225
  mTrace("mount, encode to raw:%p, row:%p", pRaw, pObj);
×
UNCOV
226
  return pRaw;
×
227
}
228

UNCOV
229
SSdbRow *mndMountActionDecode(SSdbRaw *pRaw) {
×
UNCOV
230
  int32_t    code = 0, lino = 0;
×
UNCOV
231
  SSdbRow   *pRow = NULL;
×
UNCOV
232
  SMountObj *pObj = NULL;
×
UNCOV
233
  void      *buf = NULL;
×
234

UNCOV
235
  int8_t sver = 0;
×
UNCOV
236
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
×
237
    goto _exit;
×
238
  }
239

UNCOV
240
  if (sver != MND_MOUNT_VER_NUMBER) {
×
241
    code = TSDB_CODE_SDB_INVALID_DATA_VER;
×
242
    mError("mount read invalid ver, data ver: %d, curr ver: %d", sver, MND_MOUNT_VER_NUMBER);
×
243
    goto _exit;
×
244
  }
245

UNCOV
246
  if (!(pRow = sdbAllocRow(sizeof(SMountObj)))) {
×
247
    code = TSDB_CODE_OUT_OF_MEMORY;
×
248
    goto _exit;
×
249
  }
250

UNCOV
251
  if (!(pObj = sdbGetRowObj(pRow))) {
×
252
    code = TSDB_CODE_OUT_OF_MEMORY;
×
253
    goto _exit;
×
254
  }
255

UNCOV
256
  int32_t tlen;
×
UNCOV
257
  int32_t dataPos = 0;
×
UNCOV
258
  SDB_GET_INT32(pRaw, dataPos, &tlen, _exit);
×
UNCOV
259
  buf = taosMemoryMalloc(tlen + 1);
×
UNCOV
260
  if (buf == NULL) {
×
261
    code = TSDB_CODE_OUT_OF_MEMORY;
×
262
    goto _exit;
×
263
  }
UNCOV
264
  SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _exit);
×
265

UNCOV
266
  if (tDeserializeSMountObj(buf, tlen, pObj) < 0) {
×
267
    code = TSDB_CODE_OUT_OF_MEMORY;
×
268
    goto _exit;
×
269
  }
270

UNCOV
271
  taosInitRWLatch(&pObj->lock);
×
272

UNCOV
273
_exit:
×
UNCOV
274
  taosMemoryFreeClear(buf);
×
UNCOV
275
  if (code != TSDB_CODE_SUCCESS) {
×
276
    terrno = code;
×
277
    mError("mount, failed at line %d to decode from raw:%p since %s", lino, pRaw, tstrerror(code));
×
278
    mndMountFreeObj(pObj);
×
279
    taosMemoryFreeClear(pRow);
×
280
    return NULL;
×
281
  }
UNCOV
282
  mTrace("mount, decode from raw:%p, row:%p", pRaw, pObj);
×
UNCOV
283
  return pRow;
×
284
}
285

UNCOV
286
static int32_t mndNewMountActionValidate(SMnode *pMnode, STrans *pTrans, SSdbRaw *pRaw) {
×
UNCOV
287
  mTrace("mount, validate new mount action, raw:%p", pRaw);
×
UNCOV
288
  return 0;
×
289
}
290

UNCOV
291
static int32_t mndMountActionInsert(SSdb *pSdb, SMountObj *pObj) {
×
UNCOV
292
  mTrace("mount:%s, perform insert action, row:%p", pObj->name, pObj);
×
UNCOV
293
  return 0;
×
294
}
295

UNCOV
296
static int32_t mndMountActionDelete(SSdb *pSdb, SMountObj *pObj) {
×
UNCOV
297
  mTrace("mount:%s, perform delete action, row:%p", pObj->name, pObj);
×
UNCOV
298
  mndMountFreeObj(pObj);
×
UNCOV
299
  return 0;
×
300
}
301

UNCOV
302
static int32_t mndMountActionUpdate(SSdb *pSdb, SMountObj *pOld, SMountObj *pNew) {
×
UNCOV
303
  mTrace("mount:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
×
UNCOV
304
  taosWLockLatch(&pOld->lock);
×
UNCOV
305
  pOld->updateTime = pNew->updateTime;
×
UNCOV
306
  taosWUnLockLatch(&pOld->lock);
×
UNCOV
307
  return 0;
×
308
}
309

UNCOV
310
SMountObj *mndAcquireMount(SMnode *pMnode, const char *mountName) {
×
UNCOV
311
  SSdb      *pSdb = pMnode->pSdb;
×
UNCOV
312
  SMountObj *pObj = sdbAcquire(pSdb, SDB_MOUNT, mountName);
×
UNCOV
313
  if (pObj == NULL) {
×
UNCOV
314
    if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
UNCOV
315
      terrno = TSDB_CODE_MND_MOUNT_NOT_EXIST;
×
316
    } else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
×
317
      terrno = TSDB_CODE_MND_MOUNT_IN_CREATING;
×
318
    } else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
×
319
      terrno = TSDB_CODE_MND_MOUNT_IN_DROPPING;
×
320
    } else {
321
      terrno = TSDB_CODE_APP_ERROR;
×
322
      mFatal("mount:%s, failed to acquire mount since %s", mountName, terrstr());
×
323
    }
324
  }
UNCOV
325
  return pObj;
×
326
}
327

UNCOV
328
void mndReleaseMount(SMnode *pMnode, SMountObj *pObj) {
×
UNCOV
329
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
330
  sdbRelease(pSdb, pObj);
×
UNCOV
331
}
×
332

333
bool mndMountIsExist(SMnode *pMnode, const char *mountName) {
×
334
  SMountObj *pObj = mndAcquireMount(pMnode, mountName);
×
335
  if (pObj == NULL) {
×
336
    return false;
×
337
  }
338
  mndReleaseMount(pMnode, pObj);
×
339
  return true;
×
340
}
341

UNCOV
342
void *mndBuildRetrieveMountPathReq(SMnode *pMnode, SRpcMsg *pMsg, const char *mountName, const char *mountPath,
×
343
                                   int32_t dnodeId, int32_t *pContLen) {
UNCOV
344
  int32_t code = 0, lino = 0;
×
UNCOV
345
  void   *pBuf = NULL;
×
346

UNCOV
347
  SRetrieveMountPathReq req = {0};
×
UNCOV
348
  req.dnodeId = dnodeId;
×
UNCOV
349
  req.pVal = &pMsg->info;
×
UNCOV
350
  req.valLen = sizeof(pMsg->info);
×
UNCOV
351
  TAOS_UNUSED(snprintf(req.mountName, TSDB_MOUNT_NAME_LEN, "%s", mountName));
×
UNCOV
352
  TAOS_UNUSED(snprintf(req.mountPath, TSDB_MOUNT_PATH_LEN, "%s", mountPath));
×
353

UNCOV
354
  int32_t contLen = tSerializeSRetrieveMountPathReq(NULL, 0, &req);
×
UNCOV
355
  TAOS_CHECK_EXIT(contLen);
×
UNCOV
356
  TSDB_CHECK_NULL((pBuf = rpcMallocCont(contLen)), code, lino, _exit, terrno);
×
UNCOV
357
  TAOS_CHECK_EXIT(tSerializeSRetrieveMountPathReq(pBuf, contLen, &req));
×
UNCOV
358
_exit:
×
UNCOV
359
  if (code < 0) {
×
360
    rpcFreeCont(pBuf);
×
361
    terrno = code;
×
362
    return NULL;
×
363
  }
UNCOV
364
  *pContLen = contLen;
×
UNCOV
365
  return pBuf;
×
366
}
367

368
#if 0
369
static int32_t mndSetCreateMountUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
370
  int32_t code = 0;
371
  for (int32_t vg = 0; vg < pDb->cfg.numOfVgroups; ++vg) {
372
    SVgObj *pVgroup = pVgroups + vg;
373

374
    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
375
      SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
376
      TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false));
377
    }
378
  }
379

380
  TAOS_RETURN(code);
381
}
382
#endif
383

384

385
#ifndef TD_ENTERPRISE
386
int32_t mndCreateMount(SMnode *pMnode, SRpcMsg *pReq, SMountInfo *pInfo, SUserObj *pUser) {
387
  return TSDB_CODE_OPS_NOT_SUPPORT;
388
}
389
#endif
390

UNCOV
391
static int32_t mndRetrieveMountInfo(SMnode *pMnode, SRpcMsg *pMsg, SCreateMountReq *pReq) {
×
UNCOV
392
  int32_t    code = 0, lino = 0;
×
UNCOV
393
  SDnodeObj *pDnode = mndAcquireDnode(pMnode, pReq->dnodeIds[0]);
×
UNCOV
394
  if (pDnode == NULL) TAOS_RETURN(terrno);
×
UNCOV
395
  if (pDnode->offlineReason != DND_REASON_ONLINE) {
×
396
    mndReleaseDnode(pMnode, pDnode);
×
397
    TAOS_RETURN(TSDB_CODE_DNODE_OFFLINE);
×
398
  }
UNCOV
399
  SEpSet epSet = mndGetDnodeEpset(pDnode);
×
UNCOV
400
  mndReleaseDnode(pMnode, pDnode);
×
401

UNCOV
402
  int32_t bufLen = 0;
×
403
  void   *pBuf =
UNCOV
404
      mndBuildRetrieveMountPathReq(pMnode, pMsg, pReq->mountName, pReq->mountPaths[0], pReq->dnodeIds[0], &bufLen);
×
UNCOV
405
  if (pBuf == NULL) TAOS_RETURN(terrno);
×
406

UNCOV
407
  SRpcMsg rpcMsg = {.msgType = TDMT_DND_RETRIEVE_MOUNT_PATH, .pCont = pBuf, .contLen = bufLen};
×
UNCOV
408
  TAOS_CHECK_EXIT(tmsgSendReq(&epSet, &rpcMsg));
×
409

UNCOV
410
  pMsg->info.handle = NULL;  // disable auto rsp to client
×
UNCOV
411
_exit:
×
UNCOV
412
  TAOS_RETURN(code);
×
413
}
414

UNCOV
415
static int32_t mndProcessRetrieveMountPathRsp(SRpcMsg *pRsp) {
×
UNCOV
416
  int32_t    code = 0, lino = 0;
×
UNCOV
417
  int32_t    rspCode = 0;
×
UNCOV
418
  SMnode    *pMnode = pRsp->info.node;
×
UNCOV
419
  SMountInfo mntInfo = {0};
×
UNCOV
420
  SDecoder   decoder = {0};
×
UNCOV
421
  void      *pBuf = NULL;
×
UNCOV
422
  int32_t    bufLen = 0;
×
UNCOV
423
  bool       rspToClient = false;
×
424

425
  // step 1: decode and preprocess in mnode read thread
UNCOV
426
  tDecoderInit(&decoder, pRsp->pCont, pRsp->contLen);
×
UNCOV
427
  TAOS_CHECK_EXIT(tDeserializeSMountInfo(&decoder, &mntInfo, false));
×
UNCOV
428
  const STraceId *trace = &pRsp->info.traceId;
×
UNCOV
429
  SRpcMsg         rsp = {
×
430
      // .code = pRsp->code,
431
      // .pCont = pRsp->info.rsp,
432
      // .contLen = pRsp->info.rspLen,
UNCOV
433
              .info = *(SRpcHandleInfo *)mntInfo.pVal,
×
434
  };
UNCOV
435
  rspToClient = true;
×
UNCOV
436
  if (pRsp->code != 0) {
×
UNCOV
437
    TAOS_CHECK_EXIT(pRsp->code);
×
438
  }
439

440
  // wait for all retrieve response received
441
  // TODO: ...
442
  // make sure the clusterId from all rsp is the same, but not with the clusterId of the host cluster
UNCOV
443
  if (mntInfo.clusterId == pMnode->clusterId) {
×
444
    mError("mount:%s, clusterId:%" PRIi64 " from dnode is identical to the host cluster's id:%" PRIi64,
×
445
           mntInfo.mountName, mntInfo.clusterId, pMnode->clusterId);
446
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_CLUSTER_EXIST);
×
447
  }
448

449
  // step 2: collect the responses from dnodes, process and push to mnode write thread to run as transaction
450
  // TODO: multiple retrieve dnodes and paths supported later
UNCOV
451
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(NULL, 0, &mntInfo)) >= 0, code, lino, _exit, bufLen);
×
UNCOV
452
  TSDB_CHECK_CONDITION((pBuf = rpcMallocCont(bufLen)), code, lino, _exit, terrno);
×
UNCOV
453
  TSDB_CHECK_CONDITION((bufLen = tSerializeSMountInfo(pBuf, bufLen, &mntInfo)) >= 0, code, lino, _exit, bufLen);
×
UNCOV
454
  SRpcMsg rpcMsg = {.pCont = pBuf, .contLen = bufLen, .msgType = TDMT_MND_EXECUTE_MOUNT, .info.noResp = 1};
×
UNCOV
455
  SEpSet  mnodeEpset = {0};
×
UNCOV
456
  mndGetMnodeEpSet(pMnode, &mnodeEpset);
×
457

UNCOV
458
  SMountObj *pObj = NULL;
×
UNCOV
459
  if ((pObj = mndAcquireMount(pMnode, mntInfo.mountName))) {
×
460
    mndReleaseMount(pMnode, pObj);
×
461
    if (mntInfo.ignoreExist) {
×
462
      mInfo("mount:%s, already exist, ignore exist is set", mntInfo.mountName);
×
463
      code = 0;
×
464
      goto _exit;
×
465
    } else {
466
      TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_ALREADY_EXIST);
×
467
    }
468
  } else {
UNCOV
469
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
×
470
      // continue
471
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
472
      TAOS_CHECK_EXIT(code);
×
473
    }
474
  }
UNCOV
475
  TAOS_CHECK_EXIT(tmsgSendReq(&mnodeEpset, &rpcMsg));
×
UNCOV
476
_exit:
×
UNCOV
477
  if (code == 0) {
×
UNCOV
478
    mGInfo("mount:%s, msg:%p, retrieve mount path rsp with code:%d", mntInfo.mountName, pRsp, pRsp->code);
×
479
  } else {
UNCOV
480
    mError("mount:%s, msg:%p, failed at line %d to retrieve mount path rsp since %s", mntInfo.mountName, pRsp, lino,
×
481
           tstrerror(code));
UNCOV
482
    if (rspToClient) {
×
UNCOV
483
      rsp.code = code;
×
UNCOV
484
      tmsgSendRsp(&rsp);
×
485
    }
486
  }
UNCOV
487
  tDecoderClear(&decoder);
×
UNCOV
488
  tFreeMountInfo(&mntInfo, false);
×
UNCOV
489
  TAOS_RETURN(code);
×
490
}
491

UNCOV
492
static int32_t mndProcessCreateMountReq(SRpcMsg *pReq) {
×
UNCOV
493
  int32_t         code = 0, lino = 0;
×
UNCOV
494
  SMnode         *pMnode = pReq->info.node;
×
UNCOV
495
  SDbObj         *pDb = NULL;
×
UNCOV
496
  SMountObj      *pObj = NULL;
×
UNCOV
497
  SUserObj       *pUser = NULL;
×
UNCOV
498
  SCreateMountReq createReq = {0};
×
499

UNCOV
500
  TAOS_CHECK_EXIT(tDeserializeSCreateMountReq(pReq->pCont, pReq->contLen, &createReq));
×
UNCOV
501
  mInfo("mount:%s, start to create on dnode %d from %s", createReq.mountName, *createReq.dnodeIds,
×
502
        createReq.mountPaths[0]);  // TODO: mutiple mounts
503

UNCOV
504
  if ((pObj = mndAcquireMount(pMnode, createReq.mountName))) {
×
UNCOV
505
    if (createReq.ignoreExist) {
×
506
      mInfo("mount:%s, already exist, ignore exist is set", createReq.mountName);
×
507
      code = 0;
×
508
      goto _exit;
×
509
    } else {
UNCOV
510
      code = TSDB_CODE_MND_MOUNT_ALREADY_EXIST;
×
UNCOV
511
      goto _exit;
×
512
    }
513
  } else {
UNCOV
514
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
×
515
      // continue
516
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
517
      goto _exit;
×
518
    }
519
  }
520
  // mount operation share the privileges of db
UNCOV
521
  TAOS_CHECK_EXIT(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MOUNT, (SDbObj *)pObj));
×
UNCOV
522
  TAOS_CHECK_EXIT(grantCheck(TSDB_GRANT_MOUNT));
×
UNCOV
523
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
×
UNCOV
524
  char fullMountName[TSDB_MOUNT_NAME_LEN + 32] = {0};
×
UNCOV
525
  (void)snprintf(fullMountName, sizeof(fullMountName), "%d.%s", pUser->acctId, createReq.mountName);
×
UNCOV
526
  if ((pDb = mndAcquireDb(pMnode, fullMountName))) {
×
UNCOV
527
    mndReleaseDb(pMnode, pDb);
×
UNCOV
528
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_DB_NAME_EXIST);
×
529
  }
530

UNCOV
531
  TAOS_CHECK_EXIT(mndRetrieveMountInfo(pMnode, pReq, &createReq));
×
UNCOV
532
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
533

UNCOV
534
  auditRecord(pReq, pMnode->clusterId, "createMount", createReq.mountName, "", createReq.sql, createReq.sqlLen);
×
535

UNCOV
536
_exit:
×
UNCOV
537
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
538
    mError("mount:%s, dnode:%d, path:%s, failed to create at line:%d since %s", createReq.mountName,
×
539
           createReq.dnodeIds ? createReq.dnodeIds[0] : 0, createReq.mountPaths ? createReq.mountPaths[0] : "", lino,
540
           tstrerror(code));  // TODO: mutiple mounts
541
  }
542

UNCOV
543
  mndReleaseMount(pMnode, pObj);
×
UNCOV
544
  mndReleaseUser(pMnode, pUser);
×
UNCOV
545
  tFreeSCreateMountReq(&createReq);
×
546

UNCOV
547
  TAOS_RETURN(code);
×
548
}
549

UNCOV
550
static int32_t mndProcessExecuteMountReq(SRpcMsg *pReq) {
×
UNCOV
551
  int32_t    code = 0, lino = 0;
×
UNCOV
552
  SMnode    *pMnode = pReq->info.node;
×
UNCOV
553
  SDbObj    *pDb = NULL;
×
UNCOV
554
  SMountObj *pObj = NULL;
×
UNCOV
555
  SUserObj  *pUser = NULL;
×
UNCOV
556
  SMountInfo mntInfo = {0};
×
UNCOV
557
  SDecoder   decoder = {0};
×
UNCOV
558
  SRpcMsg    rsp = {0};
×
UNCOV
559
  bool       rspToClient = false;
×
560

UNCOV
561
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
×
562

UNCOV
563
  TAOS_CHECK_EXIT(tDeserializeSMountInfo(&decoder, &mntInfo, true));
×
UNCOV
564
  rspToClient = true;
×
UNCOV
565
  mInfo("mount:%s, start to execute on mnode", mntInfo.mountName);
×
566

UNCOV
567
  if ((pDb = mndAcquireDb(pMnode, mntInfo.mountName))) {
×
568
    mndReleaseDb(pMnode, pDb);
×
569
    TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_DUP_DB_NAME_EXIST);
×
570
  }
571

UNCOV
572
  if ((pObj = mndAcquireMount(pMnode, mntInfo.mountName))) {
×
573
    if (mntInfo.ignoreExist) {
×
574
      mInfo("mount:%s, already exist, ignore exist is set", mntInfo.mountName);
×
575
      code = 0;
×
576
      goto _exit;
×
577
    } else {
578
      TAOS_CHECK_EXIT(TSDB_CODE_MND_MOUNT_ALREADY_EXIST);
×
579
    }
580
  } else {
UNCOV
581
    if ((code = terrno) == TSDB_CODE_MND_MOUNT_NOT_EXIST) {
×
582
      // continue
583
    } else {  // TSDB_CODE_MND_MOUNT_IN_CREATING | TSDB_CODE_MND_MOUNT_IN_DROPPING | TSDB_CODE_APP_ERROR
584
      TAOS_CHECK_EXIT(code);
×
585
    }
586
  }
587
  // mount operation share the privileges of db
UNCOV
588
  TAOS_CHECK_EXIT(grantCheck(TSDB_GRANT_MOUNT));  // TODO: implement when the plan is ready
×
UNCOV
589
  TAOS_CHECK_EXIT(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser));
×
590

UNCOV
591
  TAOS_CHECK_EXIT(mndCreateMount(pMnode, pReq, &mntInfo, pUser));
×
UNCOV
592
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
UNCOV
593
_exit:
×
UNCOV
594
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
595
    // TODO: mutiple path mount
596
    rsp.code = code;
×
597
    mError("mount:%s, dnode:%d, path:%s, failed to create at line:%d since %s", mntInfo.mountName, mntInfo.dnodeId,
×
598
           mntInfo.mountPath, lino, tstrerror(code));
599
  }
UNCOV
600
  if (rspToClient) {
×
UNCOV
601
    rsp.info = *(SRpcHandleInfo *)mntInfo.pVal, tmsgSendRsp(&rsp);
×
UNCOV
602
    tmsgSendRsp(&rsp);
×
603
  }
UNCOV
604
  mndReleaseMount(pMnode, pObj);
×
UNCOV
605
  mndReleaseUser(pMnode, pUser);
×
UNCOV
606
  tDecoderClear(&decoder);
×
UNCOV
607
  tFreeMountInfo(&mntInfo, true);
×
608

UNCOV
609
  TAOS_RETURN(code);
×
610
}
611

UNCOV
612
int32_t mndBuildDropMountRsp(SMountObj *pObj, int32_t *pRspLen, void **ppRsp, bool useRpcMalloc) {
×
UNCOV
613
  int32_t       code = 0;
×
UNCOV
614
  SDropMountRsp dropRsp = {0};
×
UNCOV
615
  if (pObj != NULL) {
×
UNCOV
616
    (void)memcpy(dropRsp.name, pObj->name, TSDB_MOUNT_NAME_LEN);
×
UNCOV
617
    dropRsp.uid = pObj->uid;
×
618
  }
619

UNCOV
620
  int32_t rspLen = tSerializeSDropMountRsp(NULL, 0, &dropRsp);
×
UNCOV
621
  void   *pRsp = NULL;
×
UNCOV
622
  if (useRpcMalloc) {
×
623
    pRsp = rpcMallocCont(rspLen);
×
624
  } else {
UNCOV
625
    pRsp = taosMemoryMalloc(rspLen);
×
626
  }
627

UNCOV
628
  if (pRsp == NULL) {
×
629
    code = terrno;
×
630
    TAOS_RETURN(code);
×
631
  }
632

UNCOV
633
  int32_t ret = 0;
×
UNCOV
634
  if ((ret = tSerializeSDropMountRsp(pRsp, rspLen, &dropRsp)) < 0) return ret;
×
UNCOV
635
  *pRspLen = rspLen;
×
UNCOV
636
  *ppRsp = pRsp;
×
UNCOV
637
  TAOS_RETURN(code);
×
638
}
639

640
bool mndHasMountOnDnode(SMnode *pMnode, int32_t dnodeId) {
774✔
641
  SSdb *pSdb = pMnode->pSdb;
774✔
642
  void *pIter = NULL;
774✔
643

644
  while (1) {
×
645
    SMountObj *pMount = NULL;
774✔
646
    pIter = sdbFetch(pSdb, SDB_MOUNT, pIter, (void **)&pMount);
774✔
647
    if (pIter == NULL) break;
774!
648

649
    for (int32_t i = 0; i < pMount->nMounts; ++i) {
×
650
      if (pMount->dnodeIds[i] == dnodeId) {
×
651
        sdbRelease(pSdb, pMount);
×
652
        return true;
×
653
      }
654
    }
655
    sdbRelease(pSdb, pMount);
×
656
  }
657
  return false;
774✔
658
}
659

660
#ifndef TD_ENTERPRISE
661
int32_t mndDropMount(SMnode *pMnode, SRpcMsg *pReq, SMountObj *pObj) { return TSDB_CODE_OPS_NOT_SUPPORT; }
662
#endif
663

UNCOV
664
static int32_t mndProcessDropMountReq(SRpcMsg *pReq) {
×
UNCOV
665
  SMnode       *pMnode = pReq->info.node;
×
UNCOV
666
  int32_t       code = -1;
×
UNCOV
667
  SMountObj    *pObj = NULL;
×
UNCOV
668
  SDropMountReq dropReq = {0};
×
669

UNCOV
670
  TAOS_CHECK_GOTO(tDeserializeSDropMountReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _exit);
×
671

UNCOV
672
  mInfo("mount:%s, start to drop", dropReq.mountName);
×
673

UNCOV
674
  pObj = mndAcquireMount(pMnode, dropReq.mountName);
×
UNCOV
675
  if (pObj == NULL) {
×
UNCOV
676
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
UNCOV
677
    if (terrno != 0) code = terrno;
×
UNCOV
678
    if (dropReq.ignoreNotExists) {
×
679
      code = mndBuildDropMountRsp(pObj, &pReq->info.rspLen, &pReq->info.rsp, true);
×
680
    }
UNCOV
681
    goto _exit;
×
682
  }
683

684
  // mount operation share the privileges of db
UNCOV
685
  TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MOUNT, (SDbObj *)pObj), NULL, _exit);
×
686

UNCOV
687
  code = mndDropMount(pMnode, pReq, pObj);
×
UNCOV
688
  if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
689
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
690
  }
691

692
  // SName name = {0};
693
  // if (tNameFromString(&name, dropReq.mountName, T_NAME_ACCT | T_NAME_DB) < 0)
694
  //   mError("mount:%s, failed to parse db name", dropReq.mountName);
695

UNCOV
696
  auditRecord(pReq, pMnode->clusterId, "dropMount", dropReq.mountName, "", dropReq.sql, dropReq.sqlLen);
×
697

UNCOV
698
_exit:
×
UNCOV
699
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
700
    mError("mount:%s, failed to drop since %s", dropReq.mountName, tstrerror(code));
×
701
  }
702

UNCOV
703
  mndReleaseMount(pMnode, pObj);
×
UNCOV
704
  tFreeSDropMountReq(&dropReq);
×
UNCOV
705
  TAOS_RETURN(code);
×
706
}
707

UNCOV
708
static int32_t mndRetrieveMounts(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
×
UNCOV
709
  SMnode          *pMnode = pReq->info.node;
×
UNCOV
710
  int32_t          code = 0, lino = 0;
×
UNCOV
711
  int32_t          numOfRows = 0;
×
UNCOV
712
  int32_t          cols = 0;
×
UNCOV
713
  char             tmp[512];
×
UNCOV
714
  int32_t          tmpLen = 0;
×
UNCOV
715
  int32_t          bufLen = 0;
×
UNCOV
716
  char            *pBuf = NULL;
×
UNCOV
717
  char            *qBuf = NULL;
×
UNCOV
718
  void            *pIter = NULL;
×
UNCOV
719
  SSdb            *pSdb = pMnode->pSdb;
×
UNCOV
720
  SColumnInfoData *pColInfo = NULL;
×
721

UNCOV
722
  pBuf = tmp;
×
UNCOV
723
  bufLen = sizeof(tmp) - VARSTR_HEADER_SIZE;
×
UNCOV
724
  if (pShow->numOfRows < 1) {
×
UNCOV
725
    SMountObj *pObj = NULL;
×
UNCOV
726
    int32_t    index = 0;
×
UNCOV
727
    while ((pIter = sdbFetch(pSdb, SDB_MOUNT, pIter, (void **)&pObj))) {
×
UNCOV
728
      cols = 0;
×
UNCOV
729
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
×
UNCOV
730
      qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
×
UNCOV
731
      TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->name));
×
UNCOV
732
      varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
×
UNCOV
733
      COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
×
734

UNCOV
735
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
×
736
        // TAOS_UNUSED(snprintf(pBuf, bufLen, "%d", *(int32_t *)pObj->dnodeIds));  // TODO: support mutiple dnodes
UNCOV
737
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->dnodeIds[0], false, pObj, pIter, _exit);
×
738
      }
739

UNCOV
740
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
×
741
        // TAOS_UNUSED(snprintf(pBuf, bufLen, "%" PRIi64, pObj->createdTime));
UNCOV
742
        COL_DATA_SET_VAL_GOTO((const char *)&pObj->createdTime, false, pObj, pIter, _exit);
×
743
      }
744

UNCOV
745
      if ((pColInfo = taosArrayGet(pBlock->pDataBlock, ++cols))) {
×
UNCOV
746
        qBuf = POINTER_SHIFT(pBuf, VARSTR_HEADER_SIZE);
×
UNCOV
747
        TAOS_UNUSED(snprintf(qBuf, bufLen, "%s", pObj->paths[0]));  // TODO: support mutiple paths
×
UNCOV
748
        varDataSetLen(pBuf, strlen(pBuf + VARSTR_HEADER_SIZE));
×
UNCOV
749
        COL_DATA_SET_VAL_GOTO(pBuf, false, pObj, pIter, _exit);
×
750
      }
751

UNCOV
752
      sdbRelease(pSdb, pObj);
×
UNCOV
753
      ++numOfRows;
×
754
    }
755
  }
756

UNCOV
757
  pShow->numOfRows += numOfRows;
×
758

UNCOV
759
_exit:
×
UNCOV
760
  if (code < 0) {
×
761
    mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
762
    TAOS_RETURN(code);
×
763
  }
UNCOV
764
  return numOfRows;
×
765
}
766

767
static void mndCancelGetNextMount(SMnode *pMnode, void *pIter) {
×
768
  SSdb *pSdb = pMnode->pSdb;
×
769
  sdbCancelFetchByType(pSdb, pIter, SDB_MOUNT);
×
770
}
×
771

772
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc