• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add basic test for alter table option.

* Fix(keep): memory leak.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stb keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/function/src/udfd.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#ifdef USE_UDF
17
// clang-format off
18
#include "uv.h"
19
#include "os.h"
20
#include "fnLog.h"
21
#include "thash.h"
22

23
#include "tudf.h"
24
#include "tudfInt.h"
25
#include "version.h"
26

27
#include "tdatablock.h"
28
#include "tdataformat.h"
29
#include "tglobal.h"
30
#include "tmsg.h"
31
#include "trpc.h"
32
#include "tmisce.h"
33
#include "tversion.h"
34
// clang-format on
35

36
#define UDFD_MAX_SCRIPT_PLUGINS 64
37
#define UDFD_MAX_SCRIPT_TYPE    1
38
#define UDFD_MAX_PLUGIN_FUNCS   9
39

40
typedef struct SUdfCPluginCtx {
41
  uv_lib_t lib;
42

43
  TUdfScalarProcFunc scalarProcFunc;
44

45
  TUdfAggStartFunc   aggStartFunc;
46
  TUdfAggProcessFunc aggProcFunc;
47
  TUdfAggFinishFunc  aggFinishFunc;
48
  TUdfAggMergeFunc   aggMergeFunc;
49

50
  TUdfInitFunc    initFunc;
51
  TUdfDestroyFunc destroyFunc;
52
} SUdfCPluginCtx;
53

UNCOV
54
// Plugin-level open hook of the native C plugin. Both arguments are ignored and
// the call always reports success (0).
int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) {
  (void)items;
  (void)numItems;
  return 0;
}
×
55

UNCOV
56
// Plugin-level close hook of the native C plugin. Does nothing and always
// reports success (0).
int32_t udfdCPluginClose() {
  const int32_t rc = 0;
  return rc;
}
×
57

UNCOV
58
// Looks up "<udfName>_init" and "<udfName>_destroy" in the library held by
// udfCtx and stores the addresses in udfCtx->initFunc / udfCtx->destroyFunc.
// Returns 0 when both symbols were resolved. A failing lookup is handed to
// TAOS_CHECK_RETURN (presumably returning the non-zero uv_dlsym result - confirm
// against the macro definition).
int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);

  char *initSuffix = "_init";
  char *destroySuffix = "_destroy";

  // Buffers are sized for the function name plus the suffix and terminating NUL.
  char initSym[TSDB_FUNC_NAME_LEN + 6] = {0};
  char destroySym[TSDB_FUNC_NAME_LEN + 9] = {0};

  snprintf(initSym, sizeof(initSym), "%s%s", udfName, initSuffix);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, initSym, (void **)(&udfCtx->initFunc)));

  snprintf(destroySym, sizeof(destroySym), "%s%s", udfName, destroySuffix);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, destroySym, (void **)(&udfCtx->destroyFunc)));

  return 0;
}
71

UNCOV
72
// Resolves the entry points of an aggregate UDF from the library held by udfCtx:
//   <udfName>         -> udfCtx->aggProcFunc
//   <udfName>_start   -> udfCtx->aggStartFunc
//   <udfName>_finish  -> udfCtx->aggFinishFunc
//   <udfName>_merge   -> udfCtx->aggMergeFunc (optional)
// The first three lookups go through TAOS_CHECK_RETURN; a missing merge function
// is only logged. Returns 0 once the required symbols are loaded.
int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);
  char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
  snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)));

  char  startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
  char *startSuffix = "_start";
  snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)));

  char  finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0};
  char *finishSuffix = "_finish";
  snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix);
  TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)));

  char  mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
  char *mergeSuffix = "_merge";
  snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix);
  int ret = uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc));
  if (ret != 0) {
    // uv_dlsym signals failure with -1 rather than a libuv error code, so the
    // readable reason has to come from uv_dlerror(), not uv_strerror().
    fnInfo("uv_dlsym function %s. error: %s", mergeFuncName, uv_dlerror(&udfCtx->lib));
  }
  return 0;
}
97

UNCOV
98
// Creates the per-UDF context of a native C UDF.
//
// Opens the shared library at udf->path, resolves the init/destroy functions and,
// depending on udf->funcType, either the scalar process function or the aggregate
// function set, then calls the UDF's init function.
//
// On success stores the new context in *pUdfCtx and returns 0. On failure the
// library is closed, the context is freed, *pUdfCtx is left untouched and a
// non-zero code is returned: terrno for allocation failure,
// TSDB_CODE_UDF_LOAD_UDF_FAILURE for load/symbol errors, or the value returned by
// the UDF's own init function.
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(udf, pUdfCtx);
  int32_t         err = 0;
  SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
  if (NULL == udfCtx) {
    return terrno;
  }

  err = uv_dlopen(udf->path, &udfCtx->lib);
  if (err != 0) {
    // uv_dlopen returns -1 on failure; the message is obtained with uv_dlerror().
    fnError("can not load library %s. error: %s", udf->path, uv_dlerror(&udfCtx->lib));
    err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
    // uv_dlclose in _exit also releases the error message kept in the lib object.
    goto _exit;
  }
  const char *udfName = udf->name;

  err = udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName);
  if (err != 0) {
    fnError("can not load init/destroy functions. error: %d", err);
    err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
    goto _exit;
  }

  if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
    char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
    snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
    err = uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc));
    if (err != 0) {
      fnError("can not load library function %s. error: %s", processFuncName, uv_dlerror(&udfCtx->lib));
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
      goto _exit;
    }
  } else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
    err = udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName);
    if (err != 0) {
      fnError("can not load aggregation functions. error: %d", err);
      err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
      goto _exit;
    }
  }

  if (udfCtx->initFunc) {
    err = (udfCtx->initFunc)();
    if (err != 0) {
      fnError("udf init function failed. error: %d", err);
      goto _exit;
    }
  }
  *pUdfCtx = udfCtx;
  return 0;

_exit:
  uv_dlclose(&udfCtx->lib);
  taosMemoryFree(udfCtx);
  return err;
}
151

UNCOV
152
int32_t udfdCPluginUdfDestroy(void *udfCtx) {
×
UNCOV
153
  TAOS_UDF_CHECK_PTR_RCODE(udfCtx);
×
UNCOV
154
  SUdfCPluginCtx *ctx = udfCtx;
×
UNCOV
155
  int32_t         code = 0;
×
UNCOV
156
  if (ctx->destroyFunc) {
×
UNCOV
157
    code = (ctx->destroyFunc)();
×
158
  }
UNCOV
159
  uv_dlclose(&ctx->lib);
×
UNCOV
160
  taosMemoryFree(ctx);
×
UNCOV
161
  return code;
×
162
}
163

UNCOV
164
// Forwards a scalar call to the UDF's process function stored in the context.
// Returns that function's result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
// context has no scalar process function.
int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(block, resultCol, udfCtx);
  SUdfCPluginCtx *pluginCtx = (SUdfCPluginCtx *)udfCtx;

  if (pluginCtx->scalarProcFunc == NULL) {
    fnError("taosudf c plugin scalar proc not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->scalarProcFunc(block, resultCol);
}
174

UNCOV
175
// Forwards the aggregate "start" step to the UDF's start function.
// Returns that function's result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
// context has no start function.
int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(buf, udfCtx);
  SUdfCPluginCtx *pluginCtx = (SUdfCPluginCtx *)udfCtx;

  if (pluginCtx->aggStartFunc == NULL) {
    fnError("taosudf c plugin aggregation start not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggStartFunc(buf);
}
186

UNCOV
187
// Forwards one aggregate "process" step to the UDF's process function.
// Returns that function's result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
// context has no aggregate process function.
int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(block, interBuf, newInterBuf, udfCtx);
  SUdfCPluginCtx *pluginCtx = (SUdfCPluginCtx *)udfCtx;

  if (pluginCtx->aggProcFunc == NULL) {
    fnError("taosudf c plugin aggregation process not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggProcFunc(block, interBuf, newInterBuf);
}
197

198
// int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
199
//                                void *udfCtx) {
200
//   TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx);
201
//   SUdfCPluginCtx *ctx = udfCtx;
202
//   if (ctx->aggMergeFunc) {
203
//     return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
204
//   } else {
205
//     fnError("taosudf c plugin aggregation merge not implemented");
206
//     return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
207
//   }
208
// }
209

UNCOV
210
// Forwards the aggregate "finish" step to the UDF's finish function.
// Returns that function's result, or TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the
// context has no finish function.
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
  TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx);
  SUdfCPluginCtx *pluginCtx = (SUdfCPluginCtx *)udfCtx;

  if (pluginCtx->aggFinishFunc == NULL) {
    fnError("taosudf c plugin aggregation finish not implemented");
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }
  return pluginCtx->aggFinishFunc(buf, resultData);
}
221

222
// for c, the function pointer are filled directly and libloaded = true;
223
// for others, dlopen/dlsym to find function pointers
224
typedef struct SUdfScriptPlugin {
225
  int8_t scriptType;
226

227
  char     libPath[PATH_MAX];
228
  bool     libLoaded;
229
  uv_lib_t lib;
230

231
  TScriptUdfScalarProcFunc udfScalarProcFunc;
232
  TScriptUdfAggStartFunc   udfAggStartFunc;
233
  TScriptUdfAggProcessFunc udfAggProcFunc;
234
  TScriptUdfAggMergeFunc   udfAggMergeFunc;
235
  TScriptUdfAggFinishFunc  udfAggFinishFunc;
236

237
  TScriptUdfInitFunc    udfInitFunc;
238
  TScriptUdfDestoryFunc udfDestroyFunc;
239

240
  TScriptOpenFunc  openFunc;
241
  TScriptCloseFunc closeFunc;
242
} SUdfScriptPlugin;
243

244
typedef struct SUdfdContext {
245
  uv_loop_t  *loop;
246
  uv_pipe_t   ctrlPipe;
247
  uv_signal_t intrSignal;
248
  char        listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
249
  uv_pipe_t   listeningPipe;
250

251
  void     *clientRpc;
252
  SCorEpSet mgmtEp;
253

254
  uv_mutex_t udfsMutex;
255
  SHashObj  *udfsHash;
256

257
  uv_mutex_t        scriptPluginsMutex;
258
  SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS];
259

260
  SArray *residentFuncs;
261

262
  char udfDataDir[PATH_MAX];
263
  bool printVersion;
264
} SUdfdContext;
265

266
SUdfdContext global;
267

268
struct SUdfdUvConn;
269
struct SUvUdfWork;
270

271
typedef struct SUdfdUvConn {
272
  uv_stream_t *client;
273
  char        *inputBuf;
274
  int32_t      inputLen;
275
  int32_t      inputCap;
276
  int32_t      inputTotal;
277

278
  struct SUvUdfWork *pWorkList;  // head of work list
279
} SUdfdUvConn;
280

281
typedef struct SUvUdfWork {
282
  SUdfdUvConn *conn;
283
  uv_buf_t     input;
284
  uv_buf_t     output;
285

286
  struct SUvUdfWork *pWorkNext;
287
} SUvUdfWork;
288

289
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState;
290

291
typedef struct SUdf {
292
  char    name[TSDB_FUNC_NAME_LEN + 1];
293
  int32_t version;
294
  int64_t createdTime;
295

296
  int8_t  funcType;
297
  int8_t  scriptType;
298
  int8_t  outputType;
299
  int32_t outputLen;
300
  int32_t bufSize;
301

302
  char path[PATH_MAX];
303

304
  int32_t    refCount;
305
  EUdfState  state;
306
  uv_mutex_t lock;
307
  uv_cond_t  condReady;
308
  bool       resident;
309

310
  SUdfScriptPlugin *scriptPlugin;
311
  void             *scriptUdfCtx;
312

313
  int64_t lastFetchTime;  // last fetch time in milliseconds
314
  bool    expired;
315
} SUdf;
316

317
typedef struct SUdfcFuncHandle {
318
  SUdf *udf;
319
} SUdfcFuncHandle;
320

321
typedef enum EUdfdRpcReqRspType {
322
  UDFD_RPC_MNODE_CONNECT = 0,
323
  UDFD_RPC_RETRIVE_FUNC,
324
} EUdfdRpcReqRspType;
325

326
typedef struct SUdfdRpcSendRecvInfo {
327
  EUdfdRpcReqRspType rpcType;
328
  int32_t            code;
329
  void              *param;
330
  uv_sem_t           resultSem;
331
} SUdfdRpcSendRecvInfo;
332

333
static void    udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
334
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
335
static int32_t udfdConnectToMnode();
336
static bool    udfdRpcRfp(int32_t code, tmsg_t msgType);
337
static int     initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
338
static int32_t udfdOpenClientRpc();
339
static void    udfdCloseClientRpc();
340

341
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
342
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
343
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
344
static void udfdProcessRequest(uv_work_t *req);
345
static void udfdOnWrite(uv_write_t *req, int status);
346
static void udfdSendResponse(uv_work_t *work, int status);
347
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
348
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
349
static void udfdHandleRequest(SUdfdUvConn *conn);
350
static void udfdPipeCloseCb(uv_handle_t *pipe);
UNCOV
351
static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
×
352
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
353
static void udfdOnNewConnection(uv_stream_t *server, int status);
354

355
static void    udfdIntrSignalHandler(uv_signal_t *handle, int signum);
356
static void    removeListeningPipe();
357

358
static void    udfdPrintVersion();
359
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
360
static int32_t udfdInitLog();
361

362
static void    udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
363
static void    udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
364
static int32_t udfdUvInit();
365
static void    udfdCloseWalkCb(uv_handle_t *handle, void *arg);
366
static void    udfdRun();
367
static void    udfdConnectMnodeThreadFunc(void *args);
368

369
int32_t udfdNewUdf(SUdf **pUdf,  const char *udfName);
370
void  udfdGetFuncBodyPath(const SUdf *udf, char *path);
371

UNCOV
372
// Fills a script-plugin descriptor with the built-in C plugin callbacks and then
// invokes the plugin's open function with a single LD_LIBRARY_PATH environment
// item. Returns the result of the open function (0 on success).
// The aggregate merge callback is intentionally left unset.
int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RCODE(plugin);
  plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;

  // plugin lifecycle
  plugin->openFunc = udfdCPluginOpen;
  plugin->closeFunc = udfdCPluginClose;

  // per-udf lifecycle
  plugin->udfInitFunc = udfdCPluginUdfInit;
  plugin->udfDestroyFunc = udfdCPluginUdfDestroy;

  // execution callbacks
  plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc;
  plugin->udfAggStartFunc = udfdCPluginUdfAggStart;
  plugin->udfAggProcFunc = udfdCPluginUdfAggProc;
  // plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
  plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;

  SScriptUdfEnvItem envItems[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
  int32_t           rc = plugin->openFunc(envItems, 1);
  return rc;
}
390

UNCOV
391
// Opens the shared library libPath into pLib and resolves numOfFuncs symbols.
//
// funcName[i] is looked up and its address written through func[i]. A symbol that
// cannot be resolved is only logged (best effort): the function still returns 0
// and the corresponding target is left as it was.
//
// Returns 0 when the library was opened, TSDB_CODE_UDF_LOAD_UDF_FAILURE when
// uv_dlopen failed. In the failure case pLib has already been closed here.
int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) {
  TAOS_UDF_CHECK_PTR_RCODE(libPath, pLib, funcName, func);
  int err = uv_dlopen(libPath, pLib);
  if (err != 0) {
    // uv_dlopen returns -1 on failure; the reason is available through uv_dlerror().
    fnError("can not load library %s. error: %s", libPath, uv_dlerror(pLib));
    // uv_dlclose releases the error message stored in the lib object.
    uv_dlclose(pLib);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }

  for (int i = 0; i < numOfFuncs; ++i) {
    err = uv_dlsym(pLib, funcName[i], func[i]);
    if (err != 0) {
      fnError("load library function failed. lib %s function %s", libPath, funcName[i]);
    }
  }
  return 0;
}
407

UNCOV
408
// Loads the Python UDF plugin library ("libtaospyudf.so"), binds its entry points
// into the plugin descriptor and calls the plugin's open function.
//
// The open function receives two environment items: PYTHONPATH, built as
// "<global.udfDataDir><sep><tsUdfdLdLibPath>" (sep is ';' on WINDOWS, ':'
// otherwise), and LOGDIR set to tsLogDir.
//
// Returns 0 on success and marks the plugin as loaded. On failure returns the
// error from loading the library, terrno on allocation failure, or the open
// function's error code; the library is closed on the latter two paths.
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RCODE(plugin);
  plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
  // todo: windows support
  snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
  plugin->libLoaded = false;

  // funcName[i] is bound to funcs[i]; keep both tables in the same order.
  const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen",         "pyClose",         "pyUdfInit",
                                                 "pyUdfDestroy",   "pyUdfScalarProc", "pyUdfAggStart",
                                                 "pyUdfAggFinish", "pyUdfAggProc",    "pyUdfAggMerge"};
  void      **funcs[UDFD_MAX_PLUGIN_FUNCS] = {
           (void **)&plugin->openFunc,         (void **)&plugin->closeFunc,         (void **)&plugin->udfInitFunc,
           (void **)&plugin->udfDestroyFunc,   (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc,
           (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc,    (void **)&plugin->udfAggMergeFunc};
  int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS);
  if (err != 0) {
    fnError("can not load python plugin. lib path %s", plugin->libPath);
    return err;
  }

  if (plugin->openFunc) {
    // Both directories, one separator and the terminating NUL. size_t keeps the
    // sum from being narrowed before it is used as an allocation size.
    size_t lenPythonPath = strlen(global.udfDataDir) + 1 + strlen(tsUdfdLdLibPath) + 1;
    char  *pythonPath = taosMemoryMalloc(lenPythonPath);
    if (pythonPath == NULL) {
      uv_dlclose(&plugin->lib);
      return terrno;
    }
#ifdef WINDOWS
    snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath);
#else
    snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath);
#endif
    SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}};
    err = plugin->openFunc(items, 2);
    taosMemoryFree(pythonPath);
  }
  if (err != 0) {
    fnError("udf script python plugin open func failed. error: %d", err);
    uv_dlclose(&plugin->lib);
    return err;
  }
  plugin->libLoaded = true;

  return 0;
}
453

UNCOV
454
// Shuts down the C plugin: calls its close function when one is set (a failure is
// only logged) and clears every callback pointer in the descriptor.
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RVOID(plugin);

  if (plugin->closeFunc != NULL && plugin->closeFunc() != 0) {
    fnError("udf script c plugin close func failed.line:%d", __LINE__);
  }

  // plugin lifecycle
  plugin->openFunc = NULL;
  plugin->closeFunc = NULL;
  // per-udf lifecycle
  plugin->udfInitFunc = NULL;
  plugin->udfDestroyFunc = NULL;
  // execution callbacks
  plugin->udfScalarProcFunc = NULL;
  plugin->udfAggStartFunc = NULL;
  plugin->udfAggProcFunc = NULL;
  plugin->udfAggMergeFunc = NULL;
  plugin->udfAggFinishFunc = NULL;
}
472

UNCOV
473
// Shuts down the Python plugin: calls its close function when one is set (a
// failure is only logged), closes the plugin library, marks it as not loaded and
// clears every callback pointer in the descriptor.
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
  TAOS_UDF_CHECK_PTR_RVOID(plugin);

  if (plugin->closeFunc != NULL && plugin->closeFunc() != 0) {
    fnError("udf script python plugin close func failed.line:%d", __LINE__);
  }

  uv_dlclose(&plugin->lib);
  plugin->libLoaded = false;

  // plugin lifecycle
  plugin->openFunc = NULL;
  plugin->closeFunc = NULL;
  // per-udf lifecycle
  plugin->udfInitFunc = NULL;
  plugin->udfDestroyFunc = NULL;
  // execution callbacks
  plugin->udfScalarProcFunc = NULL;
  plugin->udfAggStartFunc = NULL;
  plugin->udfAggProcFunc = NULL;
  plugin->udfAggMergeFunc = NULL;
  plugin->udfAggFinishFunc = NULL;
}
494

UNCOV
495
int32_t udfdInitScriptPlugin(int8_t scriptType) {
×
UNCOV
496
  SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
×
497
  if (plugin == NULL) {
×
UNCOV
498
    return terrno;
×
499
  }
UNCOV
500
  int32_t err = 0;
×
UNCOV
501
  switch (scriptType) {
×
UNCOV
502
    case TSDB_FUNC_SCRIPT_BIN_LIB:
×
UNCOV
503
      err = udfdInitializeCPlugin(plugin);
×
504
      if (err != 0) {
×
505
        fnError("udf script c plugin init failed. error: %d", err);
×
506
        taosMemoryFree(plugin);
×
UNCOV
507
        return err;
×
508
      }
UNCOV
509
      break;
×
UNCOV
510
    case TSDB_FUNC_SCRIPT_PYTHON: {
×
UNCOV
511
      err = udfdInitializePythonPlugin(plugin);
×
512
      if (err != 0) {
×
513
        fnError("udf script python plugin init failed. error: %d", err);
×
514
        taosMemoryFree(plugin);
×
UNCOV
515
        return err;
×
516
      }
UNCOV
517
      break;
×
518
    }
519
    default:
×
520
      fnError("udf script type %d not supported", scriptType);
×
521
      taosMemoryFree(plugin);
×
UNCOV
522
      return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
×
523
  }
524

UNCOV
525
  global.scriptPlugins[scriptType] = plugin;
×
UNCOV
526
  return TSDB_CODE_SUCCESS;
×
527
}
528

UNCOV
529
void udfdDeinitScriptPlugins() {
×
UNCOV
530
  SUdfScriptPlugin *plugin = NULL;
×
UNCOV
531
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON];
×
UNCOV
532
  if (plugin != NULL) {
×
UNCOV
533
    udfdDeinitPythonPlugin(plugin);
×
UNCOV
534
    taosMemoryFree(plugin);
×
UNCOV
535
    global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON] = NULL;
×
536
  }
537

UNCOV
538
  plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB];
×
UNCOV
539
  if (plugin != NULL) {
×
UNCOV
540
    udfdDeinitCPlugin(plugin);
×
UNCOV
541
    taosMemoryFree(plugin);
×
UNCOV
542
    global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB] = NULL;
×
543
  }
UNCOV
544
  return;
×
545
}
546

UNCOV
547
// Work callback: decodes the request stored in the work item's input buffer and
// dispatches it to the setup / call / teardown handler by request type.
// A request that cannot be decoded has its input buffer freed and is dropped;
// unknown request types are ignored.
void udfdProcessRequest(uv_work_t *req) {
  TAOS_UDF_CHECK_PTR_RVOID(req);

  SUvUdfWork *work = (SUvUdfWork *)(req->data);
  if (work == NULL) {
    fnError("udf work is NULL");
    return;
  }

  SUdfRequest request = {0};
  if (decodeUdfRequest(work->input.base, &request) == NULL) {
    taosMemoryFreeClear(work->input.base);
    fnError("udf request decode failed");
    return;
  }

  if (request.type == UDF_TASK_SETUP) {
    udfdProcessSetupRequest(work, &request);
  } else if (request.type == UDF_TASK_CALL) {
    udfdProcessCallRequest(work, &request);
  } else if (request.type == UDF_TASK_TEARDOWN) {
    udfdProcessTeardownRequest(work, &request);
  }
}
581

UNCOV
582
// Copies the descriptive fields of udf into udfInfo for the script plugin.
// name and path are shared by pointer, not duplicated. funcType is translated
// from the TSDB_FUNC_TYPE_* value to the UDF_FUNC_TYPE_* value and left
// untouched for any other function type.
static void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
  // identity
  udfInfo->name = udf->name;
  udfInfo->path = udf->path;
  udfInfo->version = udf->version;
  udfInfo->createdTime = udf->createdTime;
  udfInfo->scriptType = udf->scriptType;

  // function kind
  if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
    udfInfo->funcType = UDF_FUNC_TYPE_AGG;
  } else if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
    udfInfo->funcType = UDF_FUNC_TYPE_SCALAR;
  }

  // output description
  udfInfo->outputType = udf->outputType;
  udfInfo->outputLen = udf->outputLen;
  udfInfo->bufSize = udf->bufSize;
}
×
597

UNCOV
598
// Loads a UDF so that it is ready to execute.
//
// Fetches the UDF's metadata from the mnode into udf, makes sure the script
// plugin for udf->scriptType exists (creating it on first use under
// global.scriptPluginsMutex), and calls the plugin's per-udf init function, which
// fills udf->scriptUdfCtx.
//
// Returns 0 on success. Returns TSDB_CODE_UDF_LOAD_UDF_FAILURE when the metadata
// cannot be retrieved or the plugin has no init function,
// TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED for a script type outside
// [0, UDFD_MAX_SCRIPT_TYPE], or the error from plugin / udf initialization.
static int32_t udfdInitUdf(char *udfName, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(udfName, udf);
  int32_t err = 0;
  err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
  if (err != 0) {
    fnError("can not retrieve udf from mnode. udf name %s", udfName);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }
  // scriptType is a signed 8-bit value used as an array index below, so both
  // bounds must be checked.
  if (udf->scriptType < 0 || udf->scriptType > UDFD_MAX_SCRIPT_TYPE) {
    fnError("udf name %s script type %d not supported", udfName, udf->scriptType);
    return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
  }

  uv_mutex_lock(&global.scriptPluginsMutex);
  if (global.scriptPlugins[udf->scriptType] == NULL) {
    err = udfdInitScriptPlugin(udf->scriptType);
    if (err != 0) {
      fnError("udf name %s init script plugin failed. error %d", udfName, err);
      uv_mutex_unlock(&global.scriptPluginsMutex);
      return err;
    }
  }
  // Read the slot while still holding the mutex that guards the plugin table.
  SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType];
  uv_mutex_unlock(&global.scriptPluginsMutex);

  // Plugin symbol loading tolerates missing symbols, so the init callback may be
  // absent; refuse to call through a NULL pointer.
  if (scriptPlugin == NULL || scriptPlugin->udfInitFunc == NULL) {
    fnError("udf name %s script plugin has no init function", udfName);
    return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
  }
  udf->scriptPlugin = scriptPlugin;

  SScriptUdfInfo info = {0};
  convertUdf2UdfInfo(udf, &info);
  err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx);
  if (err != 0) {
    fnError("udf name %s init failed. error %d", udfName, err);
    return err;
  }

  fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx);
  return 0;
}
635

UNCOV
636
// Allocates a new SUdf named udfName in state UDF_STATE_INIT with a reference
// count of 1, and initializes its mutex and condition variable. The UDF is marked
// resident when its name appears in global.residentFuncs.
//
// On success stores the object in *pUdf and returns 0. On failure returns terrno
// (allocation) or TSDB_CODE_UDF_UV_EXEC_FAILURE (mutex / condition variable
// initialization), frees everything allocated here and leaves *pUdf untouched.
int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(pUdf, udfName);
  SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
  if (NULL == udfNew) {
    return terrno;
  }
  udfNew->refCount = 1;
  udfNew->lastFetchTime = taosGetTimestampMs();
  tstrncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);

  udfNew->state = UDF_STATE_INIT;
  if (uv_mutex_init(&udfNew->lock) != 0) {
    taosMemoryFree(udfNew);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }
  if (uv_cond_init(&udfNew->condReady) != 0) {
    // The mutex was already created; release it before dropping the object.
    uv_mutex_destroy(&udfNew->lock);
    taosMemoryFree(udfNew);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  udfNew->resident = false;
  udfNew->expired = false;
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
    char *funcName = taosArrayGet(global.residentFuncs, i);
    if (strcmp(udfName, funcName) == 0) {
      udfNew->resident = true;
      break;
    }
  }
  *pUdf = udfNew;
  return 0;
}
662

663
void udfdFreeUdf(void *pData) {
×
664
  SUdf *pSudf = (SUdf *)pData;
×
665
  if (pSudf == NULL) {
×
UNCOV
666
    return;
×
667
  }
668

669
  if (pSudf->scriptPlugin != NULL) {
×
670
    if(pSudf->scriptPlugin->udfDestroyFunc(pSudf->scriptUdfCtx) != 0) {
×
UNCOV
671
      fnError("udfdFreeUdf: taosudf destroy udf %s failed", pSudf->name);
×
672
    }
673
  }
674

675
  uv_mutex_destroy(&pSudf->lock);
×
676
  uv_cond_destroy(&pSudf->condReady);
×
UNCOV
677
  taosMemoryFree(pSudf);
×
678
}
679

UNCOV
680
// Returns the SUdf registered under udfName in global.udfsHash, creating one if
// needed. All hash access happens under global.udfsMutex.
//
// A cached entry fetched within the last 10 seconds is reused: its reference
// count is incremented and it is returned through *ppUdf. An older entry is
// flagged expired and removed from the hash, after which a fresh SUdf is created
// with udfdNewUdf and inserted.
//
// Returns 0 on success, otherwise the error from udfdNewUdf or taosHashPut.
int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) {
  TAOS_UDF_CHECK_PTR_RCODE(ppUdf, udfName);

  uv_mutex_lock(&global.udfsMutex);
  SUdf  **ppCached = taosHashGet(global.udfsHash, udfName, strlen(udfName));
  int64_t now = taosGetTimestampMs();

  if (ppCached != NULL) {
    SUdf *cached = *ppCached;
    bool  stillFresh = !(now - cached->lastFetchTime > 10 * 1000);  // 10s
    if (stillFresh) {
      ++cached->refCount;
      *ppUdf = cached;
      uv_mutex_unlock(&global.udfsMutex);
      fnInfo("taosudf reuse existing udf. udf  %s udf version %d, udf created time %" PRIx64, (*ppUdf)->name, (*ppUdf)->version,
             (*ppUdf)->createdTime);
      return 0;
    }

    cached->expired = true;
    fnInfo("taosudf expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
           cached->name, cached->version, cached->createdTime);
    if (taosHashRemove(global.udfsHash, udfName, strlen(udfName)) != 0) {
      fnError("udfdGetOrCreateUdf: taosudf remove udf %s failed", udfName);
    }
  }

  int32_t rc = udfdNewUdf(ppUdf, udfName);
  if (rc == 0) {
    rc = taosHashPut(global.udfsHash, udfName, strlen(udfName), ppUdf, POINTER_BYTES);
  }
  uv_mutex_unlock(&global.udfsMutex);

  if (rc != 0) {
    return rc;
  }
  return 0;
}
719

UNCOV
720
// Handles a UDF setup request.
//
// Looks up (or creates) the UDF named in the request, loads it if it is still in
// UDF_STATE_INIT (other threads wait on condReady while it is loading), allocates
// a handle that points to the UDF and encodes a setup response into
// uvUdf->output. The response code is 0 on success and
// TSDB_CODE_UDF_FUNC_EXEC_FAILURE for any failure; when no UDF object is
// available the output type/length/buffer size fields are sent as zero.
// The request's input buffer is freed once the response has been encoded.
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  // TODO: tracable id from client. connect, setup, call, teardown
  fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);

  SUdfSetupRequest *setup = &request->setup;
  int32_t           code = TSDB_CODE_SUCCESS;
  SUdf             *udf = NULL;
  SUdfcFuncHandle  *handle = NULL;  // stays NULL on the early-failure path
  SUdfResponse      rsp = {0};

  code = udfdGetOrCreateUdf(&udf, setup->udfName);
  if (code != 0) {
    fnError("udfdGetOrCreateUdf failed. udf name %s", setup->udfName);
    if (udf != NULL) {
      // A udf returned together with an error was created but could not be
      // registered in the hash, so nothing else references it.
      udfdFreeUdf(udf);
      udf = NULL;
    }
    goto _send;
  }

  uv_mutex_lock(&udf->lock);
  if (udf->state == UDF_STATE_INIT) {
    udf->state = UDF_STATE_LOADING;
    code = udfdInitUdf(setup->udfName, udf);
    if (code == 0) {
      udf->state = UDF_STATE_READY;
    } else {
      udf->state = UDF_STATE_INIT;
    }
    uv_cond_broadcast(&udf->condReady);
    uv_mutex_unlock(&udf->lock);
  } else {
    while (udf->state == UDF_STATE_LOADING) {
      uv_cond_wait(&udf->condReady, &udf->lock);
    }
    uv_mutex_unlock(&udf->lock);
  }

  handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
  if (handle == NULL) {
    fnError("udfdProcessSetupRequest: malloc failed.");
    code = terrno;
    if (code == 0) {
      code = TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
    }
  } else {
    handle->udf = udf;
  }

_send:
  rsp.seqNum = request->seqNum;
  rsp.type = request->type;
  rsp.code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
  rsp.setupRsp.udfHandle = (int64_t)(handle);
  if (udf != NULL) {
    rsp.setupRsp.outputType = udf->outputType;
    rsp.setupRsp.bytes = udf->outputLen;
    rsp.setupRsp.bufSize = udf->bufSize;
  }

  // First pass with a NULL buffer only computes the encoded length.
  int32_t len = encodeUdfResponse(NULL, &rsp);
  if (len < 0) {
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
    return;
  }
  rsp.msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessSetupRequest: malloc failed. len %d", len);
    return;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, &rsp) < 0) {
    fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    return;
  }

  uvUdf->output = uv_buf_init(bufBegin, len);

  taosMemoryFreeClear(uvUdf->input.base);
  return;
}
792

UNCOV
793
// Validates the column produced by a scalar UDF against the input block.
//
// block  - input data block the UDF was invoked on
// output - column returned by the UDF
//
// Returns TSDB_CODE_SUCCESS when checking is disabled (safety level NEVER) or
// every enabled check passes; TSDB_CODE_UDF_FUNC_EXEC_FAILURE when the row
// counts differ. The per-row checks go through TAOS_UDF_CHECK_CONDITION with
// the same failure code.
static int32_t checkUDFScalaResult(SSDataBlock *block, SUdfColumn *output) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }

  // The output must contain exactly one row per input row.
  if (output->colData.numOfRows != block->info.rows) {
    fnError("udf scala result num of rows %d not equal to input rows %" PRId64, output->colData.numOfRows, block->info.rows);
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }

  // Per-row inspection only happens at the BYROW safety level.
  if (tsSafetyCheckLevel != TSDB_SAFETY_CHECK_LEVELL_BYROW) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t row = 0; row < output->colData.numOfRows; ++row) {
    if (udfColDataIsNull(output, row)) {
      continue;
    }

    if (!IS_VAR_DATA_TYPE(output->colMeta.type)) {
      // Fixed-length column: one size check covers all rows, so stop after
      // the first non-null row.
      TAOS_UDF_CHECK_CONDITION(
          output->colMeta.bytes * output->colData.numOfRows <= output->colData.fixLenCol.dataLen,
          TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
      break;
    }

    // Variable-length column: the payload must exist and this row's offset
    // must fall inside it.
    TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.payload != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
    TAOS_UDF_CHECK_CONDITION(output->colData.varLenCol.varOffsets[row] >= 0 &&
                                 output->colData.varLenCol.varOffsets[row] < output->colData.varLenCol.payloadLen,
                             TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
  }

  return TSDB_CODE_SUCCESS;
}
822

UNCOV
823
// Validates the intermediate buffer produced by an aggregate UDF step.
//
// block  - input data block (not inspected here)
// output - buffer returned by the UDF
//
// Returns TSDB_CODE_SUCCESS when checking is disabled (safety level NEVER) or
// the buffer looks sane; TSDB_CODE_UDF_FUNC_EXEC_FAILURE when numOfResult is
// neither 0 nor 1. Buffer pointer and length are checked through
// TAOS_UDF_CHECK_CONDITION with the same failure code.
static int32_t checkUDFAggResult(SSDataBlock *block, SUdfInterBuf *output) {
  if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
    return TSDB_CODE_SUCCESS;
  }

  // An aggregate step may yield at most one result.
  if (!(output->numOfResult == 1 || output->numOfResult == 0)) {
    fnError("udf agg result num of rows %d not equal to 1", output->numOfResult);
    return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
  }

  TAOS_UDF_CHECK_CONDITION(output->buf != NULL, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
  TAOS_UDF_CHECK_CONDITION(output->bufLen > 0, TSDB_CODE_UDF_FUNC_EXEC_FAILURE);
  return TSDB_CODE_SUCCESS;
}
835

UNCOV
836
// Executes one UDF call request (scalar proc, aggregate init/proc/finish) and
// stores the encoded response in uvUdf->output.
//
// uvUdf   - work item; its input buffer is released before returning and its
//           output buffer is set only when the response was encoded
// request - decoded request; request->call selects the call type and carries
//           the input block / intermediate buffer
//
// Any non-zero status from the plugin, the converters or the result checks is
// reported to the client as TSDB_CODE_UDF_FUNC_EXEC_FAILURE.
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  SUdfCallRequest *call = &request->call;
  fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
          request->seqNum);
  SUdfcFuncHandle  *handle = (SUdfcFuncHandle *)(call->udfHandle);
  SUdf             *udf = handle->udf;
  SUdfResponse      response = {0};
  SUdfResponse     *rsp = &response;
  SUdfCallResponse *subRsp = &rsp->callRsp;

  int32_t code = TSDB_CODE_SUCCESS;
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      SUdfColumn output = {0};
      output.colMeta.bytes = udf->outputLen;
      output.colMeta.type = udf->outputType;
      output.colMeta.precision = 0;
      output.colMeta.scale = 0;
      // Propagate a capacity failure instead of answering "success" with an
      // empty result block.
      code = udfColEnsureCapacity(&output, call->block.info.rows);
      if (code == TSDB_CODE_SUCCESS) {
        SUdfDataBlock input = {0};
        code = convertDataBlockToUdfDataBlock(&call->block, &input);
        if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
        freeUdfDataDataBlock(&input);
        if (code == TSDB_CODE_SUCCESS) code = checkUDFScalaResult(&call->block, &output);
        if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
      }
      freeUdfColumn(&output);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx);
        // Publish the buffer only when it was allocated, so the response never
        // carries a NULL buffer with a non-zero length.
        subRsp->resultBuf = outBuf;
      } else {
        code = terrno;
      }
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      SUdfDataBlock input = {0};
      // Propagate a conversion failure instead of answering "success".
      code = convertDataBlockToUdfDataBlock(&call->block, &input);
      if (code == TSDB_CODE_SUCCESS) {
        SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
        if (outBuf.buf != NULL) {
          code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
          if (code == TSDB_CODE_SUCCESS) code = checkUDFAggResult(&call->block, &outBuf);
          subRsp->resultBuf = outBuf;
        } else {
          code = terrno;
        }
      }
      // Release the incoming intermediate buffer exactly once on every path,
      // including the failure paths that previously leaked it.
      freeUdfInterBuf(&call->interBuf);
      freeUdfDataDataBlock(&input);

      break;
    }
    // case TSDB_UDF_CALL_AGG_MERGE: {
    //   SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
    //   if (outBuf.buf != NULL) {
    //     code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
    //     freeUdfInterBuf(&call->interBuf);
    //     freeUdfInterBuf(&call->interBuf2);
    //     subRsp->resultBuf = outBuf;
    //   } else {
    //     code = terrno;
    //   }
    //
    //   break;
    // }
    case TSDB_UDF_CALL_AGG_FIN: {
      SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
      if (outBuf.buf != NULL) {
        code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx);
        subRsp->resultBuf = outBuf;
      } else {
        code = terrno;
      }
      // Released on both paths; the allocation-failure path used to leak it.
      freeUdfInterBuf(&call->interBuf);

      break;
    }
    default:
      break;
  }

  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
  subRsp->callType = call->callType;

  // First pass computes the encoded length, second pass fills the buffer.
  int32_t len = encodeUdfResponse(NULL, rsp);
  if(len < 0) {
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
    goto _exit;
  }
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if (bufBegin == NULL) {
    fnError("udfdProcessCallRequest: malloc failed. len %d", len);
    goto _exit;
  }
  void *buf = bufBegin;
  if(encodeUdfResponse(&buf, rsp) < 0) {
    fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    goto _exit;
  }

  uvUdf->output = uv_buf_init(bufBegin, len);

_exit:
  // Release per-call-type request/response resources.
  switch (call->callType) {
    case TSDB_UDF_CALL_SCALA_PROC: {
      blockDataFreeRes(&call->block);
      blockDataFreeRes(&subRsp->resultData);
      break;
    }
    case TSDB_UDF_CALL_AGG_INIT: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    case TSDB_UDF_CALL_AGG_PROC: {
      blockDataFreeRes(&call->block);
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    // case TSDB_UDF_CALL_AGG_MERGE: {
    //   freeUdfInterBuf(&subRsp->resultBuf);
    //   break;
    // }
    case TSDB_UDF_CALL_AGG_FIN: {
      freeUdfInterBuf(&subRsp->resultBuf);
      break;
    }
    default:
      break;
  }

  taosMemoryFreeClear(uvUdf->input.base);
  return;
}
978

UNCOV
979
// Handles a teardown request: drops one reference on the UDF behind the
// handle, unloads the UDF when it is no longer referenced (and is not resident
// or has expired), frees the handle and encodes the response into
// uvUdf->output.
//
// uvUdf   - work item; its input buffer is released on every path
// request - decoded request; request->teardown.udfHandle identifies the handle
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
  TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
  SUdfTeardownRequest *teardown = &request->teardown;
  fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
  SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
  SUdf            *udf = handle->udf;
  bool             unloadUdf = false;
  int32_t          code = TSDB_CODE_SUCCESS;

  // The reference count and the hash are both updated under the global mutex.
  uv_mutex_lock(&global.udfsMutex);
  udf->refCount--;
  if (udf->refCount == 0 && (!udf->resident || udf->expired)) {
    unloadUdf = true;
    code = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
    if (code != 0) {
      fnError("udf name %s remove from hash failed, err:%0x %s", udf->name, code, tstrerror(code));
      uv_mutex_unlock(&global.udfsMutex);
      goto _send;
    }
  }
  uv_mutex_unlock(&global.udfsMutex);
  if (unloadUdf) {
    fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx));
    uv_cond_destroy(&udf->condReady);
    uv_mutex_destroy(&udf->lock);
    code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
    fnDebug("taosudf destroy function returns %d", code);
    taosMemoryFree(udf);
  }

_send:
  taosMemoryFree(handle);
  SUdfResponse  response = {0};
  SUdfResponse *rsp = &response;
  rsp->seqNum = request->seqNum;
  rsp->type = request->type;
  rsp->code = code;
  // First pass computes the encoded length, second pass fills the buffer.
  int32_t len = encodeUdfResponse(NULL, rsp);
  if (len < 0) {
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
    goto _exit;
  }
  rsp->msgLen = len;
  void *bufBegin = taosMemoryMalloc(len);
  if(bufBegin == NULL) {
    fnError("udfdProcessTeardownRequest: malloc failed. len %d", len);
    goto _exit;
  }
  void *buf = bufBegin;
  if (encodeUdfResponse(&buf, rsp) < 0) {
    fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
    taosMemoryFree(bufBegin);
    goto _exit;
  }
  uvUdf->output = uv_buf_init(bufBegin, len);

_exit:
  // Single exit: the request buffer is released on the error paths as well
  // (they used to return without freeing it), and the pointer is cleared so it
  // cannot be reused after the free.
  taosMemoryFreeClear(uvUdf->input.base);
  return;
}
1038

UNCOV
1039
void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
×
UNCOV
1040
  TAOS_UDF_CHECK_PTR_RVOID(udf, path);
×
UNCOV
1041
  if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
×
1042
#ifdef WINDOWS
1043
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
1044
#else
UNCOV
1045
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version,
×
UNCOV
1046
             udf->createdTime);
×
1047
#endif
UNCOV
1048
  } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
×
1049
#ifdef WINDOWS
1050
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
1051
#else
UNCOV
1052
    snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime);
×
1053
#endif
1054
  } else {
1055
#ifdef WINDOWS
1056
    snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
1057
#else
UNCOV
1058
    snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime);
×
1059
#endif
1060
  }
1061
}
1062

UNCOV
1063
// Writes the UDF body carried by pFuncInfo to its file (see
// udfdGetFuncBodyPath) and records the path in udf->path.
//
// An already existing file at that path is reused without rewriting.
//
// Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_NO_DISKSPACE (via terrno)
// when no data space is available, and TSDB_CODE_FILE_CORRUPTED when the file
// cannot be opened, written completely, or closed.
int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(pFuncInfo, udf);
  if (!osDataSpaceAvailable()) {
    terrno = TSDB_CODE_NO_DISKSPACE;
    fnError("taosudf create shared library failed since %s", terrstr());
    return terrno;
  }

  char path[PATH_MAX] = {0};
  udfdGetFuncBodyPath(udf, path);
  bool fileExist = !(taosStatFile(path, NULL, NULL, NULL) < 0);
  if (fileExist) {
    tstrncpy(udf->path, path, PATH_MAX);
    fnInfo("taosudf func body file. reuse existing file %s", path);
    return TSDB_CODE_SUCCESS;
  }

  TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
  if (file == NULL) {
    fnError("taosudf write udf shared library: %s failed, error: %d %s", path, ERRNO, strerror(ERRNO));
    return TSDB_CODE_FILE_CORRUPTED;
  }
  int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
  if (count != pFuncInfo->codeSize) {
    fnError("taosudf write udf shared library failed");
    // Close the handle on the failure path too; it used to leak here.
    if (taosCloseFile(&file) != 0) {
      fnError("udfdSaveFuncBodyToFile, taosudf close file failed");
    }
    // NOTE(review): the partially written file stays on disk and would be
    // picked up by the "reuse existing file" branch on the next call; consider
    // removing it here.
    return TSDB_CODE_FILE_CORRUPTED;
  }
  if(taosCloseFile(&file) != 0) {
    fnError("udfdSaveFuncBodyToFile, taosudf close file failed");
    return TSDB_CODE_FILE_CORRUPTED;
  }

  tstrncpy(udf->path, path, PATH_MAX);
  return TSDB_CODE_SUCCESS;
}
1098

UNCOV
1099
// RPC response callback for requests sent by taosudf.
//
// parent - RPC parent pointer (only checked for NULL here)
// pMsg   - response; pMsg->info.ahandle is the SUdfdRpcSendRecvInfo of the
//          waiting sender, pMsg->pCont is freed before returning
// pEpSet - optional updated endpoint set
//
// The outcome is stored in msgInfo->code and the sender is woken through
// msgInfo->resultSem on every path.
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  TAOS_UDF_CHECK_PTR_RVOID(parent, pMsg);
  SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;

  if (pEpSet) {
    if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&global.mgmtEp, pEpSet);
    }
  }

  if (pMsg->code != TSDB_CODE_SUCCESS) {
    fnError("taosudf rpc error, code:%s", tstrerror(pMsg->code));
    msgInfo->code = pMsg->code;
    goto _return;
  }

  if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
    SConnectRsp connectRsp = {0};
    if(tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp) < 0){
      fnError("taosudf deserialize connect response failed");
      // Report the failure to the waiter; the code used to stay untouched,
      // which a zero-initialised msgInfo reads as success.
      msgInfo->code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
      goto _return;
    }

    // Reject servers whose clock differs from ours by more than 900 seconds.
    int32_t now = taosGetTimestampSec();
    int32_t delta = abs(now - connectRsp.svrTimestamp);
    if (delta > 900) {
      msgInfo->code = TSDB_CODE_TIME_UNSYNCED;
      goto _return;
    }

    if (connectRsp.epSet.numOfEps == 0) {
      msgInfo->code = TSDB_CODE_APP_ERROR;
      goto _return;
    }

    if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
      updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
    }
    msgInfo->code = 0;
  } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
    SRetrieveFuncRsp retrieveRsp = {0};
    if(tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp) < 0){
      fnError("taosudf deserialize retrieve func response failed");
      // Same as above: make the failure visible to the waiter.
      msgInfo->code = (terrno != 0) ? terrno : TSDB_CODE_APP_ERROR;
      goto _return;
    }

    SFuncInfo      *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
    SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0);
    SUdf           *udf = msgInfo->param;
    if (pFuncInfo == NULL || pFuncExtraInfo == NULL) {
      // Guard against a response without a first element instead of
      // dereferencing a NULL entry.
      fnError("taosudf retrieve func response contains no function info");
      msgInfo->code = TSDB_CODE_APP_ERROR;
    } else {
      udf->funcType = pFuncInfo->funcType;
      udf->scriptType = pFuncInfo->scriptType;
      udf->outputType = pFuncInfo->outputType;
      udf->outputLen = pFuncInfo->outputLen;
      udf->bufSize = pFuncInfo->bufSize;

      udf->version = pFuncExtraInfo->funcVersion;
      udf->createdTime = pFuncExtraInfo->funcCreatedTime;
      msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf);
      if (msgInfo->code != 0) {
        udf->lastFetchTime = 0;
      }
    }
    if (pFuncInfo != NULL) {
      tFreeSFuncInfo(pFuncInfo);
    }
    taosArrayDestroy(retrieveRsp.pFuncInfos);
    taosArrayDestroy(retrieveRsp.pFuncExtraInfos);
  }

_return:
  rpcFreeCont(pMsg->pCont);
  uv_sem_post(&msgInfo->resultSem);
  return;
}
1170

UNCOV
1171
// Sends a TDMT_MND_RETRIEVE_FUNC request for `udfName` over `clientRpc` and
// blocks until udfdProcessRpcRsp signals the result semaphore; that callback
// receives `udf` through msgInfo->param.
//
// Returns 0 on success, otherwise an error code (terrno from the failing
// step, TSDB_CODE_UDF_UV_EXEC_FAILURE when the semaphore cannot be created,
// the rpcSendRequest status, or the code stored by the response callback).
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
  TAOS_UDF_CHECK_PTR_RCODE(clientRpc, udfName, udf);
  SRetrieveFuncReq retrieveReq = {0};
  retrieveReq.numOfFuncs = 1;
  retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
  if(taosArrayPush(retrieveReq.pFuncNames, udfName) == NULL) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }

  // First pass computes the serialized length, second pass fills the buffer.
  int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
  if(contLen < 0) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    return terrno;
  }
  void   *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    // The allocation result was previously used unchecked.
    taosArrayDestroy(retrieveReq.pFuncNames);
    return (terrno != 0) ? terrno : TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }
  if(tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq)  < 0) {
    taosArrayDestroy(retrieveReq.pFuncNames);
    rpcFreeCont(pReq);
    return terrno;
  }
  taosArrayDestroy(retrieveReq.pFuncNames);

  SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
  if(NULL == msgInfo) {
    rpcFreeCont(pReq);  // the request was never handed to the RPC layer
    return terrno;
  }
  msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
  msgInfo->param = udf;
  if(uv_sem_init(&msgInfo->resultSem, 0)  != 0) {
    rpcFreeCont(pReq);  // the request was never handed to the RPC layer
    taosMemoryFree(msgInfo);
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  SRpcMsg rpcMsg = {0};
  rpcMsg.pCont = pReq;
  rpcMsg.contLen = contLen;
  rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
  rpcMsg.info.ahandle = msgInfo;
  int32_t code = rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
  if (code == 0) {
    // Wait for the response callback to store the outcome.
    uv_sem_wait(&msgInfo->resultSem);
    code = msgInfo->code;
  }
  // Destroy the semaphore whether or not the send succeeded.
  uv_sem_destroy(&msgInfo->resultSem);
  taosMemoryFree(msgInfo);
  return code;
}
1219

1220
// Retry filter installed as rpcInit.rfp: returns true when `code` is one of
// the listed transient RPC/sync/startup errors and `msgType` is not one of the
// scheduler query/fetch/notify messages; false otherwise.
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
  bool transientCode = code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK ||
                       code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
                       code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND ||
                       code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING;
  if (!transientCode) {
    return false;
  }

  bool schedulerMsg = msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
                      msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_TASK_NOTIFY;
  return !schedulerMsg;
}
1233

UNCOV
1234
// Fills pEpSet from the configured first/second endpoint strings.
//
// firstEp  - optional; a parse failure is fatal
// secondEp - optional; a parse failure is only logged
// pEpSet   - receives version 0 and the parsed endpoints
//
// Returns 0 when at least one endpoint was parsed. On failure terrno is set
// to TSDB_CODE_TSC_INVALID_FQDN and the return value is -1, except for a
// firstEp parse failure, which returns terrno itself.
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  pEpSet->version = 0;

  // Start from an empty mnode endpoint set.
  SEpSet *eps = &(pEpSet->epSet);
  eps->numOfEps = 0;
  eps->inUse = 0;

  bool hasFirst = (firstEp != NULL) && (firstEp[0] != 0);
  if (hasFirst) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    if (taosGetFqdnPortFromEp(firstEp, &eps->eps[0]) != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    eps->numOfEps++;
  }

  bool hasSecond = (secondEp != NULL) && (secondEp[0] != 0);
  if (hasSecond) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // The second endpoint goes into the next free slot.
    if (taosGetFqdnPortFromEp(secondEp, &eps->eps[eps->numOfEps]) == TSDB_CODE_SUCCESS) {
      eps->numOfEps++;
    } else {
      fnError("invalid ep %s", secondEp);
    }
  }

  if (eps->numOfEps != 0) {
    return 0;
  }

  terrno = TSDB_CODE_TSC_INVALID_FQDN;
  return -1;
}
1278

UNCOV
1279
int32_t udfdOpenClientRpc() {
×
UNCOV
1280
  SRpcInit rpcInit = {0};
×
UNCOV
1281
  rpcInit.label = "UDFD";
×
UNCOV
1282
  rpcInit.numOfThreads = 1;
×
UNCOV
1283
  rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
×
UNCOV
1284
  rpcInit.sessions = 1024;
×
UNCOV
1285
  rpcInit.connType = TAOS_CONN_CLIENT;
×
UNCOV
1286
  rpcInit.idleTime = tsShellActivityTimer * 1000;
×
UNCOV
1287
  rpcInit.user = TSDB_DEFAULT_USER;
×
UNCOV
1288
  rpcInit.parent = &global;
×
UNCOV
1289
  rpcInit.rfp = udfdRpcRfp;
×
UNCOV
1290
  rpcInit.compressSize = tsCompressMsgSize;
×
1291

UNCOV
1292
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
×
UNCOV
1293
  connLimitNum = TMAX(connLimitNum, 10);
×
UNCOV
1294
  connLimitNum = TMIN(connLimitNum, 500);
×
UNCOV
1295
  rpcInit.connLimitNum = connLimitNum;
×
UNCOV
1296
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
×
UNCOV
1297
  TAOS_CHECK_RETURN(taosVersionStrToInt(td_version, &rpcInit.compatibilityVer));
×
UNCOV
1298
  global.clientRpc = rpcOpen(&rpcInit);
×
1299
  if (global.clientRpc == NULL) {
×
1300
    fnError("failed to init dnode rpc client");
×
UNCOV
1301
    return terrno;
×
1302
  }
UNCOV
1303
  return 0;
×
1304
}
1305

UNCOV
1306
void udfdCloseClientRpc() {
×
UNCOV
1307
  fnInfo("taosudf begin closing rpc");
×
UNCOV
1308
  rpcClose(global.clientRpc);
×
UNCOV
1309
  fnInfo("taosudf finish closing rpc");
×
UNCOV
1310
}
×
1311

UNCOV
1312
// libuv write-completion callback for a UDF response.
//
// req    - write request; req->data is the SUvUdfWork that was answered
// status - libuv status of the write (negative on error)
//
// Unlinks the work item from its connection's work list (when the connection
// still exists) and frees the output buffer, the work item and the request.
void udfdOnWrite(uv_write_t *req, int status) {
  TAOS_UDF_CHECK_PTR_RVOID(req);
  SUvUdfWork *finished = (SUvUdfWork *)req->data;
  if (status < 0) {
    fnError("taosudf send response error, length:%zu code:%s", finished->output.len, uv_err_name(status));
  }

  if (finished->conn != NULL) {
    // Walk the singly linked list through the link pointers so the head and
    // interior nodes are unlinked the same way.
    SUvUdfWork **link = &finished->conn->pWorkList;
    while (*link != NULL && *link != finished) {
      link = &((*link)->pWorkNext);
    }
    if (*link != finished) {
      fnError("work not in conn any more");
    } else {
      *link = finished->pWorkNext;
    }
  }

  taosMemoryFree(finished->output.base);
  taosMemoryFree(finished);
  taosMemoryFree(req);
}
1333

UNCOV
1334
void udfdSendResponse(uv_work_t *work, int status) {
×
UNCOV
1335
  TAOS_UDF_CHECK_PTR_RVOID(work);
×
UNCOV
1336
  SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
×
1337

UNCOV
1338
  if (udfWork->conn != NULL) {
×
UNCOV
1339
    uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
×
1340
    if(write_req == NULL) {
×
1341
      fnError("taosudf send response error, malloc failed");
×
1342
      taosMemoryFree(work);
×
UNCOV
1343
      return;
×
1344
    }
UNCOV
1345
    write_req->data = udfWork;
×
UNCOV
1346
    int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
×
1347
    if (code != 0) {
×
1348
      fnError("taosudf send response error %s", uv_strerror(code));
×
UNCOV
1349
      taosMemoryFree(write_req);
×
1350
   }
1351
  }
UNCOV
1352
  taosMemoryFree(work);
×
1353
}
1354

UNCOV
1355
// libuv alloc callback for a client pipe: hands libuv the region of the
// connection's input buffer the next read should fill.
//
// handle        - pipe handle; handle->data is the SUdfdUvConn
// suggestedSize - libuv's suggestion (ignored)
// buf           - receives base/len; base NULL and len 0 on allocation failure
//
// Three phases: allocate a header-sized buffer when none exists, keep filling
// the header while the total length is unknown, then grow the buffer to the
// announced total length and fill the remainder.
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(handle, buf);
  SUdfdUvConn *ctx = handle->data;
  // Message header: int32 total length followed by an int64.
  int32_t      msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (ctx->inputCap == 0) {
    ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
    if (ctx->inputBuf) {
      ctx->inputLen = 0;
      ctx->inputCap = msgHeadSize;
      ctx->inputTotal = -1;

      buf->base = ctx->inputBuf;
      buf->len = ctx->inputCap;
    } else {
      fnError("taosudf can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
    }
  } else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) {
    buf->base = ctx->inputBuf + ctx->inputLen;
    buf->len = msgHeadSize - ctx->inputLen;
  } else {
    // Commit the new capacity only after the reallocation succeeded, so a
    // failure leaves inputBuf and inputCap consistent with each other.
    void *inputBuf =
        taosMemoryRealloc(ctx->inputBuf, ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap);
    if (inputBuf) {
      ctx->inputBuf = inputBuf;
      if (ctx->inputTotal > ctx->inputCap) {
        ctx->inputCap = ctx->inputTotal;
      }
      buf->base = ctx->inputBuf + ctx->inputLen;
      buf->len = ctx->inputCap - ctx->inputLen;
    } else {
      fnError("taosudf can not allocate enough memory");
      buf->base = NULL;
      buf->len = 0;
    }
  }
}
1388

UNCOV
1389
// Reports whether the connection's input buffer holds one complete request.
//
// Once at least sizeof(int32_t) bytes are buffered and the total length is
// still unknown (-1), the total is read from the start of the buffer. The
// message is complete when the buffered length, the capacity and the total
// length are all equal. A NULL `pipe` is logged and reported as incomplete.
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
  if (pipe == NULL) {
    fnError("taosudf pipe is NULL, LINE:%d", __LINE__);
    return false;
  }

  bool totalUnknown = (pipe->inputTotal == -1);
  if (totalUnknown && pipe->inputLen >= sizeof(int32_t)) {
    pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
  }

  bool complete = (pipe->inputLen == pipe->inputCap) && (pipe->inputTotal == pipe->inputCap);
  if (!complete) {
    return false;
  }

  fnDebug("receive request complete. length %d", pipe->inputLen);
  return true;
}
1403

UNCOV
1404
// Wraps the complete request buffered on `conn` into a work item and queues
// it on the libuv thread pool (udfdProcessRequest runs it, udfdSendResponse
// sends the answer).
//
// Ownership of the input buffer moves from the connection to the work item;
// the connection's input state is reset for the next request.
void udfdHandleRequest(SUdfdUvConn *conn) {
  TAOS_UDF_CHECK_PTR_RVOID(conn);
  char   *inputBuf = conn->inputBuf;
  int32_t inputLen = conn->inputLen;

  uv_work_t  *work = taosMemoryMalloc(sizeof(uv_work_t));
  if(work == NULL) {
    fnError("taosudf malloc work failed");
    return;
  }
  SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
  if(udfWork == NULL) {
    fnError("taosudf malloc udf work failed");
    taosMemoryFree(work);
    return;
  }
  udfWork->conn = conn;
  udfWork->pWorkNext = conn->pWorkList;
  conn->pWorkList = udfWork;
  udfWork->input = uv_buf_init(inputBuf, inputLen);
  // The work item comes from malloc: give the output buffer a defined empty
  // value so later reads/frees never see garbage when request processing
  // fails before producing a response.
  udfWork->output = uv_buf_init(NULL, 0);
  conn->inputBuf = NULL;
  conn->inputLen = 0;
  conn->inputCap = 0;
  conn->inputTotal = -1;
  work->data = udfWork;
  if(uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse) != 0)
  {
    fnError("taosudf queue work failed");
    // udfWork was pushed at the head of the list above; unlink it before
    // freeing so the list does not keep a dangling pointer, and release the
    // request buffer it took over from the connection.
    conn->pWorkList = udfWork->pWorkNext;
    taosMemoryFree(udfWork->input.base);
    taosMemoryFree(work);
    taosMemoryFree(udfWork);
  }
}
1436

UNCOV
1437
// libuv close callback for a client pipe.
//
// pipe - closed handle; pipe->data is the SUdfdUvConn
//
// Clears the connection pointer of every work item still linked to the
// connection, then frees the client handle, the pending input buffer and the
// connection itself.
void udfdPipeCloseCb(uv_handle_t *pipe) {
  TAOS_UDF_CHECK_PTR_RVOID(pipe);
  SUdfdUvConn *conn = pipe->data;

  // In-flight work must not reference the connection after it is freed.
  for (SUvUdfWork *item = conn->pWorkList; item != NULL; item = item->pWorkNext) {
    item->conn = NULL;
  }

  taosMemoryFree(conn->client);
  taosMemoryFree(conn->inputBuf);
  taosMemoryFree(conn);
}
1450

UNCOV
1451
// libuv read callback for a client pipe.
//
// client - stream that was read; client->data is the SUdfdUvConn
// nread  - bytes read (> 0), 0 for "nothing to do", or a negative libuv error
// buf    - buffer handed out by the alloc callback (only checked for NULL)
//
// Accumulates the received bytes and dispatches the request once it is
// complete; on EOF or error the connection is torn down via
// udfdUvHandleError.
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(client, buf);
  fnDebug("taosudf read %zd bytes from client", nread);
  if (nread == 0) return;

  SUdfdUvConn *conn = client->data;

  if (nread < 0) {
    if (nread != UV_EOF) {
      fnError("Receive error %s", uv_err_name(nread));
    } else {
      fnInfo("taosudf pipe read EOF");
    }
    udfdUvHandleError(conn);
    return;
  }

  // nread > 0: account for the new bytes and dispatch when complete; an
  // incomplete message simply waits for the next read.
  conn->inputLen += nread;
  if (isUdfdUvMsgComplete(conn)) {
    udfdHandleRequest(conn);
  }
}
1477

UNCOV
1478
void udfdOnNewConnection(uv_stream_t *server, int status) {
×
UNCOV
1479
  TAOS_UDF_CHECK_PTR_RVOID(server);
×
1480
  if (status < 0) {
×
1481
    fnError("taosudf new connection error, code:%s", uv_strerror(status));
×
UNCOV
1482
    return;
×
1483
  }
UNCOV
1484
  int32_t code = 0;
×
1485

UNCOV
1486
  uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
×
1487
  if(client == NULL) {
×
1488
    fnError("taosudf pipe malloc failed");
×
UNCOV
1489
    return;
×
1490
  }
UNCOV
1491
  code = uv_pipe_init(global.loop, client, 0);
×
1492
  if (code) {
×
1493
    fnError("taosudf pipe init error %s", uv_strerror(code));
×
1494
    taosMemoryFree(client);
×
UNCOV
1495
    return;
×
1496
  }
UNCOV
1497
  if (uv_accept(server, (uv_stream_t *)client) == 0) {
×
UNCOV
1498
    SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
×
1499
    if(ctx == NULL) {
×
1500
      fnError("taosudf conn malloc failed");
×
UNCOV
1501
      goto _exit;
×
1502
    }
UNCOV
1503
    ctx->pWorkList = NULL;
×
UNCOV
1504
    ctx->client = (uv_stream_t *)client;
×
UNCOV
1505
    ctx->inputBuf = 0;
×
UNCOV
1506
    ctx->inputLen = 0;
×
UNCOV
1507
    ctx->inputCap = 0;
×
UNCOV
1508
    client->data = ctx;
×
UNCOV
1509
    ctx->client = (uv_stream_t *)client;
×
UNCOV
1510
    code = uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
×
1511
    if (code) {
×
1512
      fnError("taosudf read start error %s", uv_strerror(code));
×
1513
      udfdUvHandleError(ctx);
×
1514
      taosMemoryFree(ctx);
×
UNCOV
1515
      taosMemoryFree(client);
×
1516
    }
UNCOV
1517
    return;
×
1518
  }
1519
_exit:
×
1520
    uv_close((uv_handle_t *)client, NULL);
×
UNCOV
1521
    taosMemoryFree(client);
×
1522
}
1523

1524
// Signal callback: removes the listening pipe file, stops the signal watcher
// and stops the event loop.
//
// handle - signal handle that fired
// signum - received signal number (logged only)
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
  TAOS_UDF_CHECK_PTR_RVOID(handle);
  fnInfo("taosudf signal received: %d\n", signum);
  uv_fs_t req;
  // NULL callback: the unlink runs synchronously.
  int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
  // Release whatever libuv attached to the request, as removeListeningPipe
  // does for the same call.
  uv_fs_req_cleanup(&req);
  if(code) {
    fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(code), __LINE__);
  }
  code = uv_signal_stop(handle);
  if(code) {
    fnError("stop signal handler failed, reason:%s", uv_strerror(code));
  }
  uv_stop(global.loop);
}
1538

UNCOV
1539
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
×
UNCOV
1540
  for (int32_t i = 1; i < argc; ++i) {
×
UNCOV
1541
    if (strcmp(argv[i], "-c") == 0) {
×
UNCOV
1542
      if (i < argc - 1) {
×
1543
        if (strlen(argv[++i]) >= PATH_MAX) {
×
1544
          (void)printf("config file path overflow");
×
UNCOV
1545
          return -1;
×
1546
        }
UNCOV
1547
        tstrncpy(configDir, argv[i], PATH_MAX);
×
1548
      } else {
1549
        (void)printf("'-c' requires a parameter, default is %s\n", configDir);
×
UNCOV
1550
        return -1;
×
1551
      }
1552
    } else if (strcmp(argv[i], "-V") == 0) {
×
UNCOV
1553
      global.printVersion = true;
×
1554
    } else {
1555
    }
1556
  }
1557

UNCOV
1558
  return 0;
×
1559
}
1560

1561
static void udfdPrintVersion() {
×
1562
  (void)printf("taosudf version: %s compatible_version: %s\n", td_version, td_compatible_version);
×
1563
  (void)printf("git: %s\n", td_gitinfo);
×
1564
  (void)printf("build: %s\n", td_buildinfo);
×
UNCOV
1565
}
×
1566

UNCOV
1567
static int32_t udfdInitLog() {
×
UNCOV
1568
  const char *logName = "udfdlog";
×
UNCOV
1569
  TAOS_CHECK_RETURN(taosInitLogOutput(&logName));
×
UNCOV
1570
  return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
×
1571
}
1572

UNCOV
1573
// libuv allocation callback for the control pipe.
//
// Allocates suggested_size bytes for the next read. On allocation failure the
// buffer is handed back with base == NULL and len == 0, so the length is never
// left indeterminate; libuv documents that a NULL base or a zero length makes
// the read callback receive UV_ENOBUFS.
//
// handle:         the handle the read is for (unused).
// suggested_size: buffer size proposed by libuv.
// buf:            output buffer descriptor to fill in.
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(buf);
  buf->base = taosMemoryMalloc(suggested_size);
  if (buf->base == NULL) {
    buf->len = 0;
    fnError("taosudf ctrl pipe alloc buffer failed");
    return;
  }
  buf->len = suggested_size;
}
1582

UNCOV
1583
// libuv read callback for the control pipe.
//
// On a read error (nread < 0) the error is logged, the read buffer is freed,
// the stream is closed and the event loop is asked to stop. Otherwise the
// number of bytes read is logged and the buffer is freed; the payload itself
// is not inspected.
//
// q:     the control pipe stream.
// nread: bytes read, or a negative libuv error code.
// buf:   buffer previously produced by the allocation callback.
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(q, buf);
  if (nread < 0) {
    // uv_err_name() takes an int; negative nread values are libuv error codes.
    fnError("taosudf ctrl pipe read error. %s", uv_err_name((int)nread));
    taosMemoryFree(buf->base);
    uv_close((uv_handle_t *)q, NULL);
    uv_stop(global.loop);
    return;
  }
  // nread is a signed ssize_t, so it needs %zd rather than %zu.
  fnError("taosudf ctrl pipe read %zd bytes", nread);
  taosMemoryFree(buf->base);
}
1595

UNCOV
1596
static void removeListeningPipe() {
×
1597
  uv_fs_t req;
UNCOV
1598
  int     err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
×
UNCOV
1599
  uv_fs_req_cleanup(&req);
×
UNCOV
1600
  if(err) {
×
UNCOV
1601
    fnInfo("remove listening pipe %s : %s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__);
×
1602
  }
UNCOV
1603
}
×
1604

UNCOV
1605
static int32_t udfdUvInit() {
×
UNCOV
1606
  TAOS_CHECK_RETURN(uv_loop_init(global.loop));
×
1607

UNCOV
1608
  if (tsStartUdfd) {  // taosudf is started by taosd, which shall exit when taosd exit
×
UNCOV
1609
    TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
×
UNCOV
1610
    TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
×
UNCOV
1611
    TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
×
1612
  }
UNCOV
1613
  getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
×
1614

UNCOV
1615
  removeListeningPipe();
×
1616

UNCOV
1617
  TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.listeningPipe, 0));
×
1618

UNCOV
1619
  TAOS_CHECK_RETURN(uv_signal_init(global.loop, &global.intrSignal));
×
UNCOV
1620
  TAOS_CHECK_RETURN(uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT));
×
1621

1622
  int r;
UNCOV
1623
  fnInfo("bind to pipe %s", global.listenPipeName);
×
1624
  if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
×
1625
    fnError("Bind error %s", uv_err_name(r));
×
1626
    removeListeningPipe();
×
UNCOV
1627
    return -2;
×
1628
  }
1629
  if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
×
1630
    fnError("Listen error %s", uv_err_name(r));
×
1631
    removeListeningPipe();
×
UNCOV
1632
    return -3;
×
1633
  }
UNCOV
1634
  return 0;
×
1635
}
1636

UNCOV
1637
// uv_walk() callback: requests close of every handle that is not already
// closing. No close callback is registered; arg is unused.
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
×
1642

UNCOV
1643
static int32_t udfdGlobalDataInit() {
×
UNCOV
1644
  uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
×
1645
  if (loop == NULL) {
×
1646
    fnError("taosudf init uv loop failed, mem overflow");
×
UNCOV
1647
    return terrno;
×
1648
  }
UNCOV
1649
  global.loop = loop;
×
1650

1651
  if (uv_mutex_init(&global.scriptPluginsMutex) != 0) {
×
1652
    fnError("taosudf init script plugins mutex failed");
×
UNCOV
1653
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1654
  }
1655

UNCOV
1656
  global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
×
1657
  if (global.udfsHash == NULL) {
×
UNCOV
1658
    return terrno;
×
1659
  }
1660
  // taosHashSetFreeFp(global.udfsHash, udfdFreeUdf);
1661

1662
  if (uv_mutex_init(&global.udfsMutex) != 0) {
×
1663
    fnError("taosudf init udfs mutex failed");
×
UNCOV
1664
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1665
  }
1666

UNCOV
1667
  return 0;
×
1668
}
1669

UNCOV
1670
static void udfdGlobalDataDeinit() {
×
UNCOV
1671
  uv_mutex_destroy(&global.udfsMutex);
×
UNCOV
1672
  uv_mutex_destroy(&global.scriptPluginsMutex);
×
UNCOV
1673
  taosMemoryFreeClear(global.loop);
×
UNCOV
1674
  fnInfo("taosudf global data deinit");
×
UNCOV
1675
}
×
1676

UNCOV
1677
static void udfdRun() {
×
UNCOV
1678
  fnInfo("start taosudf event loop");
×
UNCOV
1679
  int32_t code = uv_run(global.loop, UV_RUN_DEFAULT);
×
UNCOV
1680
  if(code != 0) {
×
UNCOV
1681
    fnError("taosudf event loop still has active handles or requests.");
×
1682
  }
UNCOV
1683
  fnInfo("taosudf event loop stopped.");
×
1684

UNCOV
1685
  (void)uv_loop_close(global.loop);
×
1686

UNCOV
1687
  uv_walk(global.loop, udfdCloseWalkCb, NULL);
×
UNCOV
1688
  code = uv_run(global.loop, UV_RUN_DEFAULT);
×
1689
  if(code != 0) {
×
UNCOV
1690
    fnError("taosudf event loop still has active handles or requests.");
×
1691
  }
UNCOV
1692
  (void)uv_loop_close(global.loop);
×
UNCOV
1693
}
×
1694

UNCOV
1695
int32_t udfdInitResidentFuncs() {
×
UNCOV
1696
  if (strlen(tsUdfdResFuncs) == 0) {
×
UNCOV
1697
    return TSDB_CODE_SUCCESS;
×
1698
  }
1699

1700
  global.residentFuncs = taosArrayInit(2, TSDB_FUNC_NAME_LEN);
×
UNCOV
1701
  char *pSave = tsUdfdResFuncs;
×
1702
  char *token;
1703
  while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
×
1704
    char func[TSDB_FUNC_NAME_LEN + 1] = {0};
×
1705
    tstrncpy(func, token, TSDB_FUNC_NAME_LEN);
×
1706
    fnInfo("taosudf add resident function %s", func);
×
UNCOV
1707
    if(taosArrayPush(global.residentFuncs, func) == NULL)
×
1708
    {
1709
      taosArrayDestroy(global.residentFuncs);
×
UNCOV
1710
      return terrno;
×
1711
    }
1712
  }
1713

UNCOV
1714
  return TSDB_CODE_SUCCESS;
×
1715
}
1716

UNCOV
1717
void udfdDeinitResidentFuncs() {
×
1718
  for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
×
1719
    char  *funcName = taosArrayGet(global.residentFuncs, i);
×
1720
    SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
×
1721
    if (udfInHash) {
×
1722
      SUdf   *udf = *udfInHash;
×
1723
      int32_t code = 0;
×
1724
      if (udf->scriptPlugin->udfDestroyFunc) {
×
1725
        code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
×
UNCOV
1726
        fnDebug("taosudf %s destroy function returns %d", funcName, code);
×
1727
      }
UNCOV
1728
      if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0)
×
1729
      {
UNCOV
1730
        fnError("taosudf remove resident function %s failed", funcName);
×
1731
      }
UNCOV
1732
      taosMemoryFree(udf);
×
1733
    }
1734
  }
UNCOV
1735
  taosHashCleanup(global.udfsHash);
×
UNCOV
1736
  taosArrayDestroy(global.residentFuncs);
×
UNCOV
1737
  fnInfo("taosudf resident functions are deinit");
×
UNCOV
1738
}
×
1739

UNCOV
1740
int32_t udfdCreateUdfSourceDir() {
×
UNCOV
1741
  snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir);
×
UNCOV
1742
  int32_t code = taosMkDir(global.udfDataDir);
×
1743
  if (code != TSDB_CODE_SUCCESS) {
×
1744
    snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir);
×
UNCOV
1745
    code = taosMkDir(global.udfDataDir);
×
1746
  }
UNCOV
1747
  fnInfo("taosudf create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code));
×
1748

UNCOV
1749
  return code;
×
1750
}
1751

UNCOV
1752
void udfdDestroyUdfSourceDir() {
×
UNCOV
1753
  fnInfo("destory udf source directory %s", global.udfDataDir);
×
UNCOV
1754
  taosRemoveDir(global.udfDataDir);
×
UNCOV
1755
}
×
1756

UNCOV
1757
int main(int argc, char *argv[]) {
×
UNCOV
1758
  int  code = 0;
×
UNCOV
1759
  bool logInitialized = false;
×
UNCOV
1760
  bool cfgInitialized = false;
×
UNCOV
1761
  bool openClientRpcFinished = false;
×
UNCOV
1762
  bool residentFuncsInited = false;
×
UNCOV
1763
  bool udfSourceDirInited = false;
×
UNCOV
1764
  bool globalDataInited = false;
×
1765

1766
  if (!taosCheckSystemIsLittleEnd()) {
×
1767
    (void)printf("failed to start since on non-little-end machines\n");
×
UNCOV
1768
    return -1;
×
1769
  }
1770

1771
  if (udfdParseArgs(argc, argv) != 0) {
×
1772
    (void)printf("failed to start since parse args error\n");
×
UNCOV
1773
    return -1;
×
1774
  }
1775

1776
  if (global.printVersion) {
×
1777
    udfdPrintVersion();
×
UNCOV
1778
    return 0;
×
1779
  }
1780

UNCOV
1781
  if (udfdInitLog() != 0) {
×
1782
    // ignore create log failed, because this error no matter
UNCOV
1783
    (void)printf("failed to init taosudf log.");
×
1784
  } else {
UNCOV
1785
    logInitialized = true;  // log is initialized
×
1786
  }
1787

1788
  if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
×
1789
    fnError("failed to start since read config error");
×
1790
    code = -2;
×
UNCOV
1791
    goto _exit;
×
1792
  }
UNCOV
1793
  cfgInitialized = true;  // cfg is initialized
×
UNCOV
1794
  fnInfo("taosudf start with config file %s", configDir);
×
1795

1796
  if (initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp) != 0) {
×
1797
    fnError("init ep set from cfg failed");
×
1798
    code = -3;
×
UNCOV
1799
    goto _exit;
×
1800
  }
UNCOV
1801
  fnInfo("taosudf start with mnode ep %s", global.mgmtEp.epSet.eps[0].fqdn);
×
1802
  if (udfdOpenClientRpc() != 0) {
×
1803
    fnError("open rpc connection to mnode failed");
×
1804
    code = -4;
×
UNCOV
1805
    goto _exit;
×
1806
  }
UNCOV
1807
  fnInfo("taosudf rpc client is opened");
×
UNCOV
1808
  openClientRpcFinished = true;  // rpc is opened
×
1809

1810
  if (udfdCreateUdfSourceDir() != 0) {
×
1811
    fnError("create udf source directory failed");
×
1812
    code = -5;
×
UNCOV
1813
    goto _exit;
×
1814
  }
UNCOV
1815
  udfSourceDirInited = true;  // udf source dir is created
×
UNCOV
1816
  fnInfo("taosudf udf source directory is created");
×
1817

1818
  if (udfdGlobalDataInit() != 0) {
×
1819
    fnError("init global data failed");
×
1820
    code = -6;
×
UNCOV
1821
    goto _exit;
×
1822
  }
UNCOV
1823
  globalDataInited = true;  // global data is inited
×
UNCOV
1824
  fnInfo("taosudf global data is inited");
×
1825

1826
  if (udfdUvInit() != 0) {
×
1827
    fnError("uv init failure");
×
1828
    code = -7;
×
UNCOV
1829
    goto _exit;
×
1830
  }
UNCOV
1831
  fnInfo("taosudf uv is inited");
×
1832

1833
  if (udfdInitResidentFuncs() != 0) {
×
1834
    fnError("init resident functions failed");
×
1835
    code = -8;
×
UNCOV
1836
    goto _exit;
×
1837
  }
UNCOV
1838
  residentFuncsInited = true;  // resident functions are inited
×
UNCOV
1839
  fnInfo("taosudf resident functions are inited");
×
1840

UNCOV
1841
  udfdRun();
×
UNCOV
1842
  fnInfo("taosudf exit normally");
×
1843

UNCOV
1844
  removeListeningPipe();
×
1845

UNCOV
1846
_exit:
×
UNCOV
1847
  if (residentFuncsInited) {
×
UNCOV
1848
    udfdDeinitResidentFuncs();
×
1849
  }
UNCOV
1850
  udfdDeinitScriptPlugins();
×
UNCOV
1851
  if (globalDataInited) {
×
UNCOV
1852
    udfdGlobalDataDeinit();
×
1853
  }
UNCOV
1854
  if (udfSourceDirInited) {
×
UNCOV
1855
    udfdDestroyUdfSourceDir();
×
1856
  }
UNCOV
1857
  if (openClientRpcFinished) {
×
UNCOV
1858
    udfdCloseClientRpc();
×
1859
  }
UNCOV
1860
  if (cfgInitialized) {
×
UNCOV
1861
    taosCleanupCfg();
×
1862
  }
UNCOV
1863
  if (logInitialized) {
×
UNCOV
1864
    taosCloseLog();
×
1865
  }
1866

UNCOV
1867
  return code;
×
1868
}
1869
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc