• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.94
/source/dnode/mnode/impl/src/mndFunc.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndFunc.h"
18
#include "mndPrivilege.h"
19
#include "mndShow.h"
20
#include "mndSync.h"
21
#include "mndTrans.h"
22
#include "mndUser.h"
23

24
#define SDB_FUNC_VER          2
25
#define SDB_FUNC_RESERVE_SIZE 64
26

27
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
28
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
29
static int32_t  mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
30
static int32_t  mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
31
static int32_t  mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew);
32
static int32_t  mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCreate);
33
static int32_t  mndDropFunc(SMnode *pMnode, SRpcMsg *pReq, SFuncObj *pFunc);
34
static int32_t  mndProcessCreateFuncReq(SRpcMsg *pReq);
35
static int32_t  mndProcessDropFuncReq(SRpcMsg *pReq);
36
static int32_t  mndProcessRetrieveFuncReq(SRpcMsg *pReq);
37
static int32_t  mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
38
static void     mndCancelGetNextFunc(SMnode *pMnode, void *pIter);
39

40
int32_t mndInitFunc(SMnode *pMnode) {
13✔
41
  SSdbTable table = {
13✔
42
      .sdbType = SDB_FUNC,
43
      .keyType = SDB_KEY_BINARY,
44
      .encodeFp = (SdbEncodeFp)mndFuncActionEncode,
45
      .decodeFp = (SdbDecodeFp)mndFuncActionDecode,
46
      .insertFp = (SdbInsertFp)mndFuncActionInsert,
47
      .updateFp = (SdbUpdateFp)mndFuncActionUpdate,
48
      .deleteFp = (SdbDeleteFp)mndFuncActionDelete,
49
  };
50

51
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNC, mndProcessCreateFuncReq);
13✔
52
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
13✔
53
  mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);
13✔
54

55
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
13✔
56
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);
13✔
57

58
  return sdbSetTable(pMnode->pSdb, table);
13✔
59
}
60

61
void mndCleanupFunc(SMnode *pMnode) {}
13✔
62

UNCOV
63
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
×
UNCOV
64
  int32_t code = 0;
×
UNCOV
65
  int32_t lino = 0;
×
UNCOV
66
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
67

UNCOV
68
  int32_t  size = pFunc->commentSize + pFunc->codeSize + sizeof(SFuncObj) + SDB_FUNC_RESERVE_SIZE;
×
UNCOV
69
  SSdbRaw *pRaw = sdbAllocRaw(SDB_FUNC, SDB_FUNC_VER, size);
×
UNCOV
70
  if (pRaw == NULL) goto _OVER;
×
71

UNCOV
72
  int32_t dataPos = 0;
×
UNCOV
73
  SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, _OVER)
×
UNCOV
74
  SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime, _OVER)
×
UNCOV
75
  SDB_SET_INT8(pRaw, dataPos, pFunc->funcType, _OVER)
×
UNCOV
76
  SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType, _OVER)
×
UNCOV
77
  SDB_SET_INT8(pRaw, dataPos, pFunc->align, _OVER)
×
UNCOV
78
  SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, _OVER)
×
UNCOV
79
  SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, _OVER)
×
UNCOV
80
  SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, _OVER)
×
UNCOV
81
  SDB_SET_INT64(pRaw, dataPos, pFunc->signature, _OVER)
×
UNCOV
82
  SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, _OVER)
×
UNCOV
83
  SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, _OVER)
×
UNCOV
84
  if (pFunc->commentSize > 0) {
×
UNCOV
85
    SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
×
86
  }
UNCOV
87
  SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
×
UNCOV
88
  SDB_SET_INT32(pRaw, dataPos, pFunc->funcVersion, _OVER)
×
UNCOV
89
  SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
×
UNCOV
90
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);
×
91

UNCOV
92
  terrno = 0;
×
93

UNCOV
94
_OVER:
×
UNCOV
95
  if (terrno != 0) {
×
96
    mError("func:%s, failed to encode to raw:%p since %s", pFunc->name, pRaw, terrstr());
×
97
    sdbFreeRaw(pRaw);
×
98
    return NULL;
×
99
  }
100

UNCOV
101
  mTrace("func:%s, encode to raw:%p, row:%p", pFunc->name, pRaw, pFunc);
×
UNCOV
102
  return pRaw;
×
103
}
104

UNCOV
105
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
×
UNCOV
106
  int32_t code = 0;
×
UNCOV
107
  int32_t lino = 0;
×
UNCOV
108
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
109
  SSdbRow  *pRow = NULL;
×
UNCOV
110
  SFuncObj *pFunc = NULL;
×
111

UNCOV
112
  int8_t sver = 0;
×
UNCOV
113
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
×
114

UNCOV
115
  if (sver != 1 && sver != 2) {
×
116
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
117
    goto _OVER;
×
118
  }
119

UNCOV
120
  pRow = sdbAllocRow(sizeof(SFuncObj));
×
UNCOV
121
  if (pRow == NULL) goto _OVER;
×
122

UNCOV
123
  pFunc = sdbGetRowObj(pRow);
×
UNCOV
124
  if (pFunc == NULL) goto _OVER;
×
125

UNCOV
126
  int32_t dataPos = 0;
×
UNCOV
127
  SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, _OVER)
×
UNCOV
128
  SDB_GET_INT64(pRaw, dataPos, &pFunc->createdTime, _OVER)
×
UNCOV
129
  SDB_GET_INT8(pRaw, dataPos, &pFunc->funcType, _OVER)
×
UNCOV
130
  SDB_GET_INT8(pRaw, dataPos, &pFunc->scriptType, _OVER)
×
UNCOV
131
  SDB_GET_INT8(pRaw, dataPos, &pFunc->align, _OVER)
×
UNCOV
132
  SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, _OVER)
×
UNCOV
133
  SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, _OVER)
×
UNCOV
134
  SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, _OVER)
×
UNCOV
135
  SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, _OVER)
×
UNCOV
136
  SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, _OVER)
×
UNCOV
137
  SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, _OVER)
×
138

UNCOV
139
  if (pFunc->commentSize > 0) {
×
UNCOV
140
    pFunc->pComment = taosMemoryCalloc(1, pFunc->commentSize);
×
UNCOV
141
    if (pFunc->pComment == NULL) {
×
142
      goto _OVER;
×
143
    }
UNCOV
144
    SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
×
145
  }
146

UNCOV
147
  pFunc->pCode = taosMemoryCalloc(1, pFunc->codeSize);
×
UNCOV
148
  if (pFunc->pCode == NULL) {
×
149
    goto _OVER;
×
150
  }
UNCOV
151
  SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
×
152

UNCOV
153
  if (sver >= 2) {
×
UNCOV
154
    SDB_GET_INT32(pRaw, dataPos, &pFunc->funcVersion, _OVER)
×
155
  }
156

UNCOV
157
  SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
×
158

UNCOV
159
  taosInitRWLatch(&pFunc->lock);
×
160

UNCOV
161
  terrno = 0;
×
162

UNCOV
163
_OVER:
×
UNCOV
164
  if (terrno != 0) {
×
165
    mError("func:%s, failed to decode from raw:%p since %s", pFunc == NULL ? "null" : pFunc->name, pRaw, terrstr());
×
166
    taosMemoryFreeClear(pRow);
×
167
    return NULL;
×
168
  }
169

UNCOV
170
  mTrace("func:%s, decode from raw:%p, row:%p", pFunc->name, pRaw, pFunc);
×
UNCOV
171
  return pRow;
×
172
}
173

UNCOV
174
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) {
×
UNCOV
175
  mTrace("func:%s, perform insert action, row:%p", pFunc->name, pFunc);
×
UNCOV
176
  return 0;
×
177
}
178

UNCOV
179
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
×
UNCOV
180
  mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc);
×
UNCOV
181
  taosMemoryFreeClear(pFunc->pCode);
×
UNCOV
182
  taosMemoryFreeClear(pFunc->pComment);
×
UNCOV
183
  return 0;
×
184
}
185

UNCOV
186
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
×
UNCOV
187
  int32_t code = 0;
×
UNCOV
188
  mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
×
189

UNCOV
190
  taosWLockLatch(&pOld->lock);
×
191

UNCOV
192
  pOld->align = pNew->align;
×
UNCOV
193
  pOld->bufSize = pNew->bufSize;
×
UNCOV
194
  pOld->codeSize = pNew->codeSize;
×
UNCOV
195
  pOld->commentSize = pNew->commentSize;
×
UNCOV
196
  pOld->createdTime = pNew->createdTime;
×
UNCOV
197
  pOld->funcType = pNew->funcType;
×
UNCOV
198
  pOld->funcVersion = pNew->funcVersion;
×
UNCOV
199
  pOld->outputLen = pNew->outputLen;
×
UNCOV
200
  pOld->outputType = pNew->outputType;
×
201

UNCOV
202
  if (pOld->pComment != NULL) {
×
UNCOV
203
    taosMemoryFree(pOld->pComment);
×
UNCOV
204
    pOld->pComment = NULL;
×
205
  }
UNCOV
206
  if (pNew->commentSize > 0 && pNew->pComment != NULL) {
×
UNCOV
207
    pOld->commentSize = pNew->commentSize;
×
UNCOV
208
    pOld->pComment = taosMemoryMalloc(pOld->commentSize);
×
UNCOV
209
    if (pOld->pComment == NULL) {
×
210
      code = terrno;
×
211
      taosWUnLockLatch(&pOld->lock);
×
212
      return code;
×
213
    }
UNCOV
214
    (void)memcpy(pOld->pComment, pNew->pComment, pOld->commentSize);
×
215
  }
216

UNCOV
217
  if (pOld->pCode != NULL) {
×
UNCOV
218
    taosMemoryFree(pOld->pCode);
×
UNCOV
219
    pOld->pCode = NULL;
×
220
  }
UNCOV
221
  if (pNew->codeSize > 0 && pNew->pCode != NULL) {
×
UNCOV
222
    pOld->codeSize = pNew->codeSize;
×
UNCOV
223
    pOld->pCode = taosMemoryMalloc(pOld->codeSize);
×
UNCOV
224
    if (pOld->pCode == NULL) {
×
225
      code = terrno;
×
226
      taosWUnLockLatch(&pOld->lock);
×
227
      return code;
×
228
    }
UNCOV
229
    (void)memcpy(pOld->pCode, pNew->pCode, pOld->codeSize);
×
230
  }
231

UNCOV
232
  pOld->scriptType = pNew->scriptType;
×
UNCOV
233
  pOld->signature = pNew->signature;
×
234

UNCOV
235
  taosWUnLockLatch(&pOld->lock);
×
236

UNCOV
237
  return 0;
×
238
}
239

UNCOV
240
static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) {
×
UNCOV
241
  terrno = 0;
×
UNCOV
242
  SSdb     *pSdb = pMnode->pSdb;
×
UNCOV
243
  SFuncObj *pFunc = sdbAcquire(pSdb, SDB_FUNC, funcName);
×
UNCOV
244
  if (pFunc == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
UNCOV
245
    terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
×
246
  }
UNCOV
247
  return pFunc;
×
248
}
249

UNCOV
250
static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) {
×
UNCOV
251
  SSdb *pSdb = pMnode->pSdb;
×
UNCOV
252
  sdbRelease(pSdb, pFunc);
×
UNCOV
253
}
×
254

UNCOV
255
static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCreate) {
×
UNCOV
256
  int32_t code = -1;
×
UNCOV
257
  STrans *pTrans = NULL;
×
258

UNCOV
259
  if ((code = grantCheck(TSDB_GRANT_USER)) < 0) {
×
260
    return code;
×
261
  }
262

UNCOV
263
  SFuncObj func = {0};
×
UNCOV
264
  (void)memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN);
×
UNCOV
265
  func.createdTime = taosGetTimestampMs();
×
UNCOV
266
  func.funcType = pCreate->funcType;
×
UNCOV
267
  func.scriptType = pCreate->scriptType;
×
UNCOV
268
  func.outputType = pCreate->outputType;
×
UNCOV
269
  func.outputLen = pCreate->outputLen;
×
UNCOV
270
  func.bufSize = pCreate->bufSize;
×
UNCOV
271
  func.signature = pCreate->signature;
×
UNCOV
272
  if (NULL != pCreate->pComment) {
×
UNCOV
273
    func.commentSize = strlen(pCreate->pComment) + 1;
×
UNCOV
274
    func.pComment = taosMemoryMalloc(func.commentSize);
×
UNCOV
275
    if (func.pComment == NULL) {
×
276
      code = terrno;
×
277
      goto _OVER;
×
278
    }
279
  }
UNCOV
280
  func.codeSize = pCreate->codeLen;
×
UNCOV
281
  func.pCode = taosMemoryMalloc(func.codeSize);
×
UNCOV
282
  if (func.pCode == NULL || func.pCode == NULL) {
×
283
    code = terrno;
×
284
    goto _OVER;
×
285
  }
286

UNCOV
287
  if (func.commentSize > 0) {
×
UNCOV
288
    (void)memcpy(func.pComment, pCreate->pComment, func.commentSize);
×
289
  }
UNCOV
290
  (void)memcpy(func.pCode, pCreate->pCode, func.codeSize);
×
291

UNCOV
292
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func");
×
UNCOV
293
  if (pTrans == NULL) {
×
294
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
295
    if (terrno != 0) code = terrno;
×
296
    goto _OVER;
×
297
  }
UNCOV
298
  mInfo("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
×
299

UNCOV
300
  SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name);
×
UNCOV
301
  if (pCreate->orReplace == 1 && oldFunc != NULL) {
×
UNCOV
302
    func.funcVersion = oldFunc->funcVersion + 1;
×
UNCOV
303
    func.createdTime = oldFunc->createdTime;
×
304

UNCOV
305
    SSdbRaw *pRedoRaw = mndFuncActionEncode(oldFunc);
×
UNCOV
306
    if (pRedoRaw == NULL) {
×
307
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
308
      if (terrno != 0) code = terrno;
×
309
      goto _OVER;
×
310
    }
UNCOV
311
    TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
×
UNCOV
312
    TAOS_CHECK_GOTO(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY), NULL, _OVER);
×
313

UNCOV
314
    SSdbRaw *pUndoRaw = mndFuncActionEncode(oldFunc);
×
UNCOV
315
    if (pUndoRaw == NULL) {
×
316
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
317
      if (terrno != 0) code = terrno;
×
318
      goto _OVER;
×
319
    }
UNCOV
320
    TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
×
UNCOV
321
    TAOS_CHECK_GOTO(sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY), NULL, _OVER);
×
322

UNCOV
323
    SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
×
UNCOV
324
    if (pCommitRaw == NULL) {
×
325
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
326
      if (terrno != 0) code = terrno;
×
327
      goto _OVER;
×
328
    }
UNCOV
329
    TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
×
UNCOV
330
    TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
×
331
  } else {
UNCOV
332
    SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
×
UNCOV
333
    if (pRedoRaw == NULL) {
×
334
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
335
      if (terrno != 0) code = terrno;
×
336
      goto _OVER;
×
337
    }
UNCOV
338
    TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
×
UNCOV
339
    TAOS_CHECK_GOTO(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING), NULL, _OVER);
×
340

UNCOV
341
    SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
×
UNCOV
342
    if (pUndoRaw == NULL) {
×
343
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
344
      if (terrno != 0) code = terrno;
×
345
      goto _OVER;
×
346
    }
UNCOV
347
    TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
×
UNCOV
348
    TAOS_CHECK_GOTO(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED), NULL, _OVER);
×
349

UNCOV
350
    SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
×
UNCOV
351
    if (pCommitRaw == NULL) {
×
352
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
353
      if (terrno != 0) code = terrno;
×
354
      goto _OVER;
×
355
    }
UNCOV
356
    TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
×
UNCOV
357
    TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
×
358
  }
359

UNCOV
360
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
361

UNCOV
362
  code = 0;
×
363

UNCOV
364
_OVER:
×
UNCOV
365
  if (oldFunc != NULL) {
×
UNCOV
366
    mndReleaseFunc(pMnode, oldFunc);
×
367
  }
368

UNCOV
369
  taosMemoryFree(func.pCode);
×
UNCOV
370
  taosMemoryFree(func.pComment);
×
UNCOV
371
  mndTransDrop(pTrans);
×
UNCOV
372
  TAOS_RETURN(code);
×
373
}
374

UNCOV
375
static int32_t mndDropFunc(SMnode *pMnode, SRpcMsg *pReq, SFuncObj *pFunc) {
×
UNCOV
376
  int32_t code = -1;
×
UNCOV
377
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "drop-func");
×
UNCOV
378
  if (pTrans == NULL) {
×
379
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
380
    if (terrno != 0) code = terrno;
×
381
    goto _OVER;
×
382
  }
383

UNCOV
384
  mInfo("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
×
385

UNCOV
386
  SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc);
×
UNCOV
387
  if (pRedoRaw == NULL) {
×
388
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
389
    if (terrno != 0) code = terrno;
×
390
    goto _OVER;
×
391
  }
UNCOV
392
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
×
UNCOV
393
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING), NULL, _OVER);
×
394

UNCOV
395
  SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc);
×
UNCOV
396
  if (pUndoRaw == NULL) {
×
397
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
398
    if (terrno != 0) code = terrno;
×
399
    goto _OVER;
×
400
  }
UNCOV
401
  TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
×
UNCOV
402
  TAOS_CHECK_GOTO(sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY), NULL, _OVER);
×
403

UNCOV
404
  SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc);
×
UNCOV
405
  if (pCommitRaw == NULL) {
×
406
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
407
    if (terrno != 0) code = terrno;
×
408
    goto _OVER;
×
409
  }
UNCOV
410
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
×
UNCOV
411
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED), NULL, _OVER);
×
412

UNCOV
413
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
414

UNCOV
415
  code = 0;
×
416

UNCOV
417
_OVER:
×
UNCOV
418
  mndTransDrop(pTrans);
×
UNCOV
419
  return code;
×
420
}
421

UNCOV
422
static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
×
UNCOV
423
  SMnode        *pMnode = pReq->info.node;
×
UNCOV
424
  int32_t        code = -1;
×
UNCOV
425
  SFuncObj      *pFunc = NULL;
×
UNCOV
426
  SCreateFuncReq createReq = {0};
×
427

UNCOV
428
  TAOS_CHECK_GOTO(tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
×
429

430
#ifdef WINDOWS
431
  code = TSDB_CODE_MND_INVALID_PLATFORM;
432
  goto _OVER;
433
#endif
UNCOV
434
  mInfo("func:%s, start to create, size:%d", createReq.name, createReq.codeLen);
×
UNCOV
435
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC), NULL, _OVER);
×
436

UNCOV
437
  pFunc = mndAcquireFunc(pMnode, createReq.name);
×
UNCOV
438
  if (pFunc != NULL) {
×
UNCOV
439
    if (createReq.igExists) {
×
UNCOV
440
      mInfo("func:%s, already exist, ignore exist is set", createReq.name);
×
UNCOV
441
      code = 0;
×
UNCOV
442
      goto _OVER;
×
UNCOV
443
    } else if (createReq.orReplace) {
×
UNCOV
444
      mInfo("func:%s, replace function is set", createReq.name);
×
UNCOV
445
      code = 0;
×
446
    } else {
UNCOV
447
      code = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
×
UNCOV
448
      goto _OVER;
×
449
    }
UNCOV
450
  } else if (terrno == TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
×
451
    goto _OVER;
×
452
  }
453

UNCOV
454
  if (createReq.name[0] == 0) {
×
UNCOV
455
    code = TSDB_CODE_MND_INVALID_FUNC_NAME;
×
UNCOV
456
    goto _OVER;
×
457
  }
458

UNCOV
459
  if (createReq.pCode == NULL) {
×
UNCOV
460
    code = TSDB_CODE_MND_INVALID_FUNC_CODE;
×
UNCOV
461
    goto _OVER;
×
462
  }
463

UNCOV
464
  if (createReq.codeLen <= 1) {
×
UNCOV
465
    code = TSDB_CODE_MND_INVALID_FUNC_CODE;
×
UNCOV
466
    goto _OVER;
×
467
  }
468

UNCOV
469
  if (createReq.bufSize < 0 || createReq.bufSize > TSDB_FUNC_BUF_SIZE) {
×
UNCOV
470
    code = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
×
UNCOV
471
    goto _OVER;
×
472
  }
473

UNCOV
474
  code = mndCreateFunc(pMnode, pReq, &createReq);
×
UNCOV
475
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
476

477
_OVER:
×
UNCOV
478
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
479
    mError("func:%s, failed to create since %s", createReq.name, tstrerror(code));
×
480
  }
481

UNCOV
482
  mndReleaseFunc(pMnode, pFunc);
×
UNCOV
483
  tFreeSCreateFuncReq(&createReq);
×
UNCOV
484
  TAOS_RETURN(code);
×
485
}
486

UNCOV
487
static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
×
UNCOV
488
  SMnode      *pMnode = pReq->info.node;
×
UNCOV
489
  int32_t      code = -1;
×
UNCOV
490
  SFuncObj    *pFunc = NULL;
×
UNCOV
491
  SDropFuncReq dropReq = {0};
×
492

UNCOV
493
  TAOS_CHECK_GOTO(tDeserializeSDropFuncReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
×
494

UNCOV
495
  mInfo("func:%s, start to drop", dropReq.name);
×
UNCOV
496
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC), NULL, _OVER);
×
497

UNCOV
498
  if (dropReq.name[0] == 0) {
×
UNCOV
499
    code = TSDB_CODE_MND_INVALID_FUNC_NAME;
×
UNCOV
500
    goto _OVER;
×
501
  }
502

UNCOV
503
  pFunc = mndAcquireFunc(pMnode, dropReq.name);
×
UNCOV
504
  if (pFunc == NULL) {
×
UNCOV
505
    if (dropReq.igNotExists) {
×
UNCOV
506
      mInfo("func:%s, not exist, ignore not exist is set", dropReq.name);
×
UNCOV
507
      code = 0;
×
UNCOV
508
      goto _OVER;
×
509
    } else {
UNCOV
510
      code = TSDB_CODE_MND_FUNC_NOT_EXIST;
×
UNCOV
511
      goto _OVER;
×
512
    }
513
  }
514

UNCOV
515
  code = mndDropFunc(pMnode, pReq, pFunc);
×
UNCOV
516
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
517

518
_OVER:
×
UNCOV
519
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
UNCOV
520
    mError("func:%s, failed to drop since %s", dropReq.name, tstrerror(code));
×
521
  }
522

UNCOV
523
  mndReleaseFunc(pMnode, pFunc);
×
UNCOV
524
  TAOS_RETURN(code);
×
525
}
526

UNCOV
527
static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
×
UNCOV
528
  SMnode          *pMnode = pReq->info.node;
×
UNCOV
529
  int32_t          code = -1;
×
UNCOV
530
  SRetrieveFuncReq retrieveReq = {0};
×
UNCOV
531
  SRetrieveFuncRsp retrieveRsp = {0};
×
532

UNCOV
533
  if (tDeserializeSRetrieveFuncReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
×
534
    code = TSDB_CODE_INVALID_MSG;
×
535
    goto RETRIEVE_FUNC_OVER;
×
536
  }
537

UNCOV
538
  if (retrieveReq.numOfFuncs <= 0 || retrieveReq.numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
×
UNCOV
539
    code = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
×
UNCOV
540
    goto RETRIEVE_FUNC_OVER;
×
541
  }
542

UNCOV
543
  retrieveRsp.numOfFuncs = retrieveReq.numOfFuncs;
×
UNCOV
544
  retrieveRsp.pFuncInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncInfo));
×
UNCOV
545
  if (retrieveRsp.pFuncInfos == NULL) {
×
546
    code = terrno;
×
547
    goto RETRIEVE_FUNC_OVER;
×
548
  }
549

UNCOV
550
  retrieveRsp.pFuncExtraInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncExtraInfo));
×
UNCOV
551
  if (retrieveRsp.pFuncExtraInfos == NULL) {
×
552
    code = terrno;
×
553
    goto RETRIEVE_FUNC_OVER;
×
554
  }
555

UNCOV
556
  for (int32_t i = 0; i < retrieveReq.numOfFuncs; ++i) {
×
UNCOV
557
    char *funcName = taosArrayGet(retrieveReq.pFuncNames, i);
×
558

UNCOV
559
    SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
×
UNCOV
560
    if (pFunc == NULL) {
×
UNCOV
561
      if (terrno != 0) code = terrno;
×
UNCOV
562
      goto RETRIEVE_FUNC_OVER;
×
563
    }
564

UNCOV
565
    SFuncInfo funcInfo = {0};
×
UNCOV
566
    (void)memcpy(funcInfo.name, pFunc->name, TSDB_FUNC_NAME_LEN);
×
UNCOV
567
    funcInfo.funcType = pFunc->funcType;
×
UNCOV
568
    funcInfo.scriptType = pFunc->scriptType;
×
UNCOV
569
    funcInfo.outputType = pFunc->outputType;
×
UNCOV
570
    funcInfo.outputLen = pFunc->outputLen;
×
UNCOV
571
    funcInfo.bufSize = pFunc->bufSize;
×
UNCOV
572
    funcInfo.signature = pFunc->signature;
×
UNCOV
573
    if (retrieveReq.ignoreCodeComment) {
×
UNCOV
574
      funcInfo.commentSize = 0;
×
UNCOV
575
      funcInfo.codeSize = 0;
×
576
    } else {
UNCOV
577
      funcInfo.commentSize = pFunc->commentSize;
×
UNCOV
578
      funcInfo.codeSize = pFunc->codeSize;
×
UNCOV
579
      funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize);
×
UNCOV
580
      if (funcInfo.pCode == NULL) {
×
581
        terrno = terrno;
×
582
        goto RETRIEVE_FUNC_OVER;
×
583
      }
UNCOV
584
      (void)memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
×
UNCOV
585
      if (funcInfo.commentSize > 0) {
×
UNCOV
586
        funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize);
×
UNCOV
587
        if (funcInfo.pComment == NULL) {
×
588
          terrno = terrno;
×
589
          goto RETRIEVE_FUNC_OVER;
×
590
        }
UNCOV
591
        (void)memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
×
592
      }
593
    }
UNCOV
594
    if (taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo) == NULL) {
×
595
      terrno = terrno;
×
596
      goto RETRIEVE_FUNC_OVER;
×
597
    }
UNCOV
598
    SFuncExtraInfo extraInfo = {0};
×
UNCOV
599
    extraInfo.funcVersion = pFunc->funcVersion;
×
UNCOV
600
    extraInfo.funcCreatedTime = pFunc->createdTime;
×
UNCOV
601
    if (taosArrayPush(retrieveRsp.pFuncExtraInfos, &extraInfo) == NULL) {
×
602
      terrno = terrno;
×
603
      goto RETRIEVE_FUNC_OVER;
×
604
    }
605

UNCOV
606
    mndReleaseFunc(pMnode, pFunc);
×
607
  }
608

UNCOV
609
  int32_t contLen = tSerializeSRetrieveFuncRsp(NULL, 0, &retrieveRsp);
×
UNCOV
610
  void   *pRsp = rpcMallocCont(contLen);
×
UNCOV
611
  if (pRsp == NULL) {
×
612
    code = terrno;
×
613
    goto RETRIEVE_FUNC_OVER;
×
614
  }
615

UNCOV
616
  if ((contLen = tSerializeSRetrieveFuncRsp(pRsp, contLen, &retrieveRsp)) <= 0) {
×
617
    code = contLen;
×
618
    goto RETRIEVE_FUNC_OVER;
×
619
  }
620

UNCOV
621
  pReq->info.rsp = pRsp;
×
UNCOV
622
  pReq->info.rspLen = contLen;
×
623

UNCOV
624
  code = 0;
×
625

UNCOV
626
RETRIEVE_FUNC_OVER:
×
UNCOV
627
  tFreeSRetrieveFuncReq(&retrieveReq);
×
UNCOV
628
  tFreeSRetrieveFuncRsp(&retrieveRsp);
×
629

UNCOV
630
  TAOS_RETURN(code);
×
631
}
632

UNCOV
633
static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int32_t len) {
×
UNCOV
634
  char *msg = "unknown";
×
UNCOV
635
  if (type >= sizeof(tDataTypes) / sizeof(tDataTypes[0])) {
×
636
    return msg;
×
637
  }
638

UNCOV
639
  if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_BINARY ||
×
640
      type == TSDB_DATA_TYPE_GEOMETRY) {
UNCOV
641
    int32_t bytes = len > 0 ? (int32_t)(len - VARSTR_HEADER_SIZE) : len;
×
642

UNCOV
643
    (void)snprintf(buf, buflen - 1, "%s(%d)", tDataTypes[type].name, type == TSDB_DATA_TYPE_NCHAR ? bytes / 4 : bytes);
×
UNCOV
644
    buf[buflen - 1] = 0;
×
645

UNCOV
646
    return buf;
×
647
  }
648

UNCOV
649
  return tDataTypes[type].name;
×
650
}
651

UNCOV
652
static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
UNCOV
653
  SMnode   *pMnode = pReq->info.node;
×
UNCOV
654
  SSdb     *pSdb = pMnode->pSdb;
×
UNCOV
655
  int32_t   numOfRows = 0;
×
UNCOV
656
  SFuncObj *pFunc = NULL;
×
UNCOV
657
  int32_t   cols = 0;
×
UNCOV
658
  int32_t   code = 0;
×
659
  char      buf[TSDB_TYPE_STR_MAX_LEN];
660

UNCOV
661
  while (numOfRows < rows) {
×
UNCOV
662
    pShow->pIter = sdbFetch(pSdb, SDB_FUNC, pShow->pIter, (void **)&pFunc);
×
UNCOV
663
    if (pShow->pIter == NULL) break;
×
664

UNCOV
665
    cols = 0;
×
666

UNCOV
667
    char b1[tListLen(pFunc->name) + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
668
    STR_WITH_MAXSIZE_TO_VARSTR(b1, pFunc->name, pShow->pMeta->pSchemas[cols].bytes);
×
669

UNCOV
670
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
671
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)b1, false), pSdb, pFunc);
×
672

UNCOV
673
    if (pFunc->pComment) {
×
UNCOV
674
      char *b2 = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
×
UNCOV
675
      STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->pMeta->pSchemas[cols].bytes);
×
676

UNCOV
677
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
678
      code = colDataSetVal(pColInfo, numOfRows, (const char *)b2, false);
×
UNCOV
679
      if (code != 0) {
×
680
        sdbRelease(pSdb, pFunc);
×
681
        taosMemoryFree(b2);
×
682
        TAOS_RETURN(code);
×
683
      }
UNCOV
684
      taosMemoryFree(b2);
×
685
    } else {
UNCOV
686
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
687
      TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, NULL, true), pSdb, pFunc);
×
UNCOV
688
      if (code != 0) {
×
689
        sdbRelease(pSdb, pFunc);
×
690
        TAOS_RETURN(code);
×
691
      }
692
    }
693

UNCOV
694
    int32_t isAgg = (pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE) ? 1 : 0;
×
695

UNCOV
696
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
697
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&isAgg, false), pSdb, pFunc);
×
UNCOV
698
    char b3[TSDB_TYPE_STR_MAX_LEN + 1] = {0};
×
UNCOV
699
    STR_WITH_MAXSIZE_TO_VARSTR(b3, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen),
×
700
                               pShow->pMeta->pSchemas[cols].bytes);
701

UNCOV
702
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
703
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)b3, false), pSdb, pFunc);
×
704

UNCOV
705
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
706
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->createdTime, false), pSdb,
×
707
                                   pFunc);
708

UNCOV
709
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
710
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->codeSize, false), pSdb,
×
711
                                   pFunc);
712

UNCOV
713
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
714
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->bufSize, false), pSdb,
×
715
                                   pFunc);
716

UNCOV
717
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
718
    char *language = "";
×
UNCOV
719
    if (pFunc->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
×
UNCOV
720
      language = "C";
×
UNCOV
721
    } else if (pFunc->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
×
UNCOV
722
      language = "Python";
×
723
    }
UNCOV
724
    char varLang[TSDB_TYPE_STR_MAX_LEN + 1] = {0};
×
UNCOV
725
    varDataSetLen(varLang, strlen(language));
×
UNCOV
726
    tstrncpy(varDataVal(varLang), language, sizeof(varLang) - VARSTR_HEADER_SIZE);
×
UNCOV
727
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)varLang, false), pSdb, pFunc);
×
728

UNCOV
729
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
730
    int32_t varCodeLen = (pFunc->codeSize + VARSTR_HEADER_SIZE) > TSDB_MAX_BINARY_LEN
×
731
                             ? TSDB_MAX_BINARY_LEN
UNCOV
732
                             : pFunc->codeSize + VARSTR_HEADER_SIZE;
×
UNCOV
733
    char   *b4 = taosMemoryMalloc(varCodeLen);
×
UNCOV
734
    if (b4 == NULL) {
×
735
      code = terrno;
×
736
      sdbRelease(pSdb, pFunc);
×
737
      TAOS_RETURN(code);
×
738
    }
UNCOV
739
    (void)memcpy(varDataVal(b4), pFunc->pCode, varCodeLen - VARSTR_HEADER_SIZE);
×
UNCOV
740
    varDataSetLen(b4, varCodeLen - VARSTR_HEADER_SIZE);
×
UNCOV
741
    code = colDataSetVal(pColInfo, numOfRows, (const char *)b4, false);
×
UNCOV
742
    if (code < 0) {
×
743
      sdbRelease(pSdb, pFunc);
×
744
      taosMemoryFree(b4);
×
745
      TAOS_RETURN(code);
×
746
    }
UNCOV
747
    taosMemoryFree(b4);
×
748

UNCOV
749
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
UNCOV
750
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->funcVersion, false), pSdb,
×
751
                                   pFunc);
752

UNCOV
753
    numOfRows++;
×
UNCOV
754
    sdbRelease(pSdb, pFunc);
×
755
  }
756

UNCOV
757
  pShow->numOfRows += numOfRows;
×
UNCOV
758
  return numOfRows;
×
759
}
760

761
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) {
×
762
  SSdb *pSdb = pMnode->pSdb;
×
763
  sdbCancelFetchByType(pSdb, pIter, SDB_FUNC);
×
764
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc