• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4921

08 Jan 2026 11:50AM UTC coverage: 65.541%. Remained the same
#4921

push

travis-ci

web-flow
merge: from main to 3.0 branch #34215

16 of 22 new or added lines in 3 files covered. (72.73%)

305 existing lines in 2 files now uncovered.

198770 of 303278 relevant lines covered (65.54%)

127655036.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.24
/source/libs/function/src/tudf.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifdef USE_UDF
16
#include "uv.h"
17

18
#include "os.h"
19

20
#include "builtinsimpl.h"
21
#include "fnLog.h"
22
#include "functionMgt.h"
23
#include "querynodes.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tglobal.h"
27
#include "tudf.h"
28
#include "tudfInt.h"
29

30
#ifdef _TD_DARWIN_64
31
#include <mach-o/dyld.h>
32
#endif
33

34
typedef struct SUdfdData {
35
  bool         startCalled;
36
  bool         needCleanUp;
37
  uv_loop_t    loop;
38
  uv_thread_t  thread;
39
  uv_barrier_t barrier;
40
  uv_process_t process;
41
#ifdef WINDOWS
42
  HANDLE jobHandle;
43
#endif
44
  int32_t    spawnErr;
45
  uv_pipe_t  ctrlPipe;
46
  uv_async_t stopAsync;
47
  int32_t    stopCalled;
48

49
  int32_t dnodeId;
50
} SUdfdData;
51

52
SUdfdData udfdGlobal = {0};
53

54
int32_t udfStartUdfd(int32_t startDnodeId);
55
void    udfStopUdfd();
56

57
extern char **environ;
58

59
static int32_t udfSpawnUdfd(SUdfdData *pData);
60
void           udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
61
static void    udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
62
static void    udfUdfdStopAsyncCb(uv_async_t *async);
63
static void    udfWatchUdfd(void *args);
64

65
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
1,131✔
66
  TAOS_UDF_CHECK_PTR_RVOID(process);
2,262✔
67
  fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
1,131✔
68
  SUdfdData *pData = process->data;
1,131✔
69
  if (pData == NULL) {
1,131✔
70
    fnError("udfd process data is NULL");
×
71
    return;
×
72
  }
73
  if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
1,131✔
74
    fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
×
75
  } else {
76
    fnInfo("udfd process restart");
1,131✔
77
    int32_t code = udfSpawnUdfd(pData);
1,131✔
78
    if (code != 0) {
1,131✔
79
      fnError("udfd process restart failed with code:%d", code);
×
80
    }
81
  }
82
}
83

84
static int32_t udfSpawnUdfd(SUdfdData *pData) {
556,611✔
85
  fnInfo("start to init udfd");
556,611✔
86
  TAOS_UDF_CHECK_PTR_RCODE(pData);
1,113,222✔
87

88
  int32_t              err = 0;
556,611✔
89
  uv_process_options_t options = {0};
556,611✔
90

91
  char path[PATH_MAX] = {0};
556,611✔
92
  if (tsProcPath == NULL) {
556,611✔
93
    path[0] = '.';
×
94
#ifdef WINDOWS
95
    GetModuleFileName(NULL, path, PATH_MAX);
96
    TAOS_DIRNAME(path);
97
#elif defined(_TD_DARWIN_64)
98
    uint32_t pathSize = sizeof(path);
99
    _NSGetExecutablePath(path, &pathSize);
100
    TAOS_DIRNAME(path);
101
#endif
102
  } else {
103
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
556,611✔
104
    TAOS_DIRNAME(path);
556,611✔
105
  }
106

107
#ifdef WINDOWS
108
  if (strlen(path) == 0) {
109
    TAOS_STRCAT(path, "C:\\TDengine");
110
  }
111
  TAOS_STRCAT(path, "\\" CUS_PROMPT "udf.exe");
112
#else
113
  if (strlen(path) == 0) {
556,611✔
114
    TAOS_STRCAT(path, "/usr/bin");
×
115
  }
116
  TAOS_STRCAT(path, "/" CUS_PROMPT "udf");
556,611✔
117
#endif
118
  char *argsUdfd[] = {path, "-c", configDir, NULL};
556,611✔
119
  options.args = argsUdfd;
556,611✔
120
  options.file = path;
556,611✔
121

122
  options.exit_cb = udfUdfdExit;
556,611✔
123

124
  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
556,611✔
125

126
  uv_stdio_container_t child_stdio[3];
556,395✔
127
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
556,611✔
128
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
556,611✔
129
  child_stdio[1].flags = UV_IGNORE;
556,611✔
130
  child_stdio[2].flags = UV_INHERIT_FD;
556,611✔
131
  child_stdio[2].data.fd = 2;
556,611✔
132
  options.stdio_count = 3;
556,611✔
133
  options.stdio = child_stdio;
556,611✔
134

135
  options.flags = UV_PROCESS_DETACHED;
556,611✔
136

137
  char dnodeIdEnvItem[32] = {0};
556,611✔
138
  char thrdPoolSizeEnvItem[32] = {0};
556,611✔
139
  snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
556,611✔
140

141
  float   numCpuCores = 4;
556,611✔
142
  int32_t code = taosGetCpuCores(&numCpuCores, false);
556,611✔
143
  if (code != 0) {
556,611✔
144
    fnError("failed to get cpu cores, code:0x%x", code);
×
145
  }
146
  numCpuCores = TMAX(numCpuCores, 2);
556,611✔
147
  snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);
556,611✔
148

149
  char    pathTaosdLdLib[512] = {0};
556,611✔
150
  size_t  taosdLdLibPathLen = sizeof(pathTaosdLdLib);
556,611✔
151
  int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
556,611✔
152
  if (ret != UV_ENOBUFS) {
556,611✔
153
    taosdLdLibPathLen = strlen(pathTaosdLdLib);
556,611✔
154
  }
155

156
  char   udfdPathLdLib[1024] = {0};
556,611✔
157
  size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
556,611✔
158
  tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, sizeof(udfdPathLdLib));
556,611✔
159

160
  udfdPathLdLib[udfdLdLibPathLen] = ':';
556,611✔
161
  tstrncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1);
556,611✔
162
  if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
556,611✔
163
    fnInfo("udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
556,611✔
164
  } else {
165
    fnError("can not set correct udfd LD_LIBRARY_PATH");
×
166
  }
167
  char ldLibPathEnvItem[1024 + 32] = {0};
556,611✔
168
  snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);
556,611✔
169

170
  char *taosFqdnEnvItem = NULL;
556,611✔
171
  char *taosFqdn = getenv("TAOS_FQDN");
556,611✔
172
  if (taosFqdn != NULL) {
556,611✔
173
    int32_t subLen = strlen(taosFqdn);
×
174
    int32_t len = strlen("TAOS_FQDN=") + subLen + 1;
×
175
    taosFqdnEnvItem = taosMemoryMalloc(len);
×
176
    if (taosFqdnEnvItem != NULL) {
×
177
      tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len);
×
178
      TAOS_STRNCAT(taosFqdnEnvItem, taosFqdn, subLen);
×
179
      fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
×
180
    } else {
181
      fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
×
182
      return terrno;
×
183
    }
184
  }
185

186
  char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};
556,611✔
187

188
  char **envUdfdWithPEnv = NULL;
556,611✔
189
  if (environ != NULL) {
556,611✔
190
    int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
556,611✔
191
    int32_t numEnviron = 0;
556,611✔
192
    while (environ[numEnviron] != NULL) {
16,841,331✔
193
      numEnviron++;
16,284,720✔
194
    }
195

196
    envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
556,611✔
197
    if (envUdfdWithPEnv == NULL) {
556,611✔
198
      err = TSDB_CODE_OUT_OF_MEMORY;
×
199
      goto _OVER;
×
200
    }
201

202
    for (int32_t i = 0; i < numEnviron; i++) {
16,841,331✔
203
      int32_t len = strlen(environ[i]) + 1;
16,284,720✔
204
      envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
16,284,720✔
205
      if (envUdfdWithPEnv[i] == NULL) {
16,284,720✔
206
        err = TSDB_CODE_OUT_OF_MEMORY;
×
207
        goto _OVER;
×
208
      }
209

210
      tstrncpy(envUdfdWithPEnv[i], environ[i], len);
16,284,720✔
211
    }
212

213
    for (int32_t i = 0; i < lenEnvUdfd; i++) {
3,339,666✔
214
      if (envUdfd[i] != NULL) {
2,783,055✔
215
        int32_t len = strlen(envUdfd[i]) + 1;
1,669,833✔
216
        envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
1,669,833✔
217
        if (envUdfdWithPEnv[numEnviron + i] == NULL) {
1,669,833✔
218
          err = TSDB_CODE_OUT_OF_MEMORY;
×
219
          goto _OVER;
×
220
        }
221

222
        tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
1,669,833✔
223
      }
224
    }
225
    envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;
556,611✔
226

227
    options.env = envUdfdWithPEnv;
556,611✔
228
  } else {
229
    options.env = envUdfd;
×
230
  }
231

232
  err = uv_spawn(&pData->loop, &pData->process, &options);
556,611✔
233
  pData->process.data = (void *)pData;
556,611✔
234

235
#ifdef WINDOWS
236
  // End udfd.exe by Job.
237
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
238
  pData->jobHandle = CreateJobObject(NULL, NULL);
239
  bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
240
  if (!add_job_ok) {
241
    fnError("Assign udfd to job failed.");
242
  } else {
243
    JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
244
    memset(&limit_info, 0x0, sizeof(limit_info));
245
    limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
246
    bool set_auto_kill_ok =
247
        SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
248
    if (!set_auto_kill_ok) {
249
      fnError("Set job auto kill udfd failed.");
250
    }
251
  }
252
#endif
253

254
  if (err != 0) {
556,611✔
255
    fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
×
256
  } else {
257
    fnInfo("udfd is initialized");
556,611✔
258
  }
259

260
_OVER:
556,395✔
261
  if (taosFqdnEnvItem) {
556,611✔
262
    taosMemoryFree(taosFqdnEnvItem);
×
263
  }
264

265
  if (envUdfdWithPEnv != NULL) {
556,611✔
266
    int32_t i = 0;
556,611✔
267
    while (envUdfdWithPEnv[i] != NULL) {
18,511,164✔
268
      taosMemoryFree(envUdfdWithPEnv[i]);
17,954,553✔
269
      i++;
17,954,553✔
270
    }
271
    taosMemoryFree(envUdfdWithPEnv);
556,611✔
272
  }
273

274
  return err;
556,611✔
275
}
276

277
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) {
2,772,358✔
278
  TAOS_UDF_CHECK_PTR_RVOID(handle);
5,544,716✔
279
  if (!uv_is_closing(handle)) {
2,772,358✔
280
    uv_close(handle, NULL);
2,772,358✔
281
  }
282
}
283

284
static void udfUdfdStopAsyncCb(uv_async_t *async) {
555,480✔
285
  TAOS_UDF_CHECK_PTR_RVOID(async);
1,110,960✔
286
  SUdfdData *pData = async->data;
555,480✔
287
  uv_stop(&pData->loop);
555,480✔
288
}
289

290
static void udfWatchUdfd(void *args) {
555,480✔
291
  TAOS_UDF_CHECK_PTR_RVOID(args);
1,110,960✔
292
  SUdfdData *pData = args;
555,480✔
293
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
555,480✔
294
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb));
555,480✔
295
  pData->stopAsync.data = pData;
555,480✔
296
  TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
555,480✔
297
  atomic_store_32(&pData->spawnErr, 0);
555,480✔
298
  (void)uv_barrier_wait(&pData->barrier);
555,480✔
299
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
555,480✔
300
  fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
555,480✔
301

302
  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
555,480✔
303
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
555,480✔
304
  fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
555,480✔
305
  if (uv_loop_close(&pData->loop) != 0) {
555,480✔
306
    fnError("udfd loop close failed, lino:%d", __LINE__);
×
307
  }
308
  return;
555,480✔
309

310
_exit:
×
311
  if (terrno != 0) {
×
312
    (void)uv_barrier_wait(&pData->barrier);
×
313
    atomic_store_32(&pData->spawnErr, terrno);
×
314
    if (uv_loop_close(&pData->loop) != 0) {
×
315
      fnError("udfd loop close failed, lino:%d", __LINE__);
×
316
    }
317
    fnError("udfd thread exit with code:%d lino:%d", terrno, terrln);
×
318
    terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
319
  }
320
  return;
×
321
}
322

323
int32_t udfStartUdfd(int32_t startDnodeId) {
555,912✔
324
  int32_t code = 0, lino = 0;
555,912✔
325
  if (!tsStartUdfd) {
555,912✔
326
    fnInfo("start udfd is disabled.") return 0;
432✔
327
  }
328
  SUdfdData *pData = &udfdGlobal;
555,480✔
329
  if (pData->startCalled) {
555,480✔
330
    fnInfo("dnode start udfd already called");
×
331
    return 0;
×
332
  }
333
  pData->startCalled = true;
555,480✔
334
  char dnodeId[8] = {0};
555,480✔
335
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
555,480✔
336
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
555,480✔
337
  pData->dnodeId = startDnodeId;
555,480✔
338

339
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
555,480✔
340
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit);
555,480✔
341
  (void)uv_barrier_wait(&pData->barrier);
555,480✔
342
  int32_t err = atomic_load_32(&pData->spawnErr);
555,480✔
343
  if (err != 0) {
555,480✔
344
    uv_barrier_destroy(&pData->barrier);
×
345
    if (uv_async_send(&pData->stopAsync) != 0) {
×
346
      fnError("start udfd: failed to send stop async");
×
347
    }
348
    if (uv_thread_join(&pData->thread) != 0) {
×
349
      fnError("start udfd: failed to join udfd thread");
×
350
    }
351
    pData->needCleanUp = false;
×
352
    fnInfo("udfd is cleaned up after spawn err");
×
353
    TAOS_CHECK_GOTO(err, &lino, _exit);
×
354
  } else {
355
    pData->needCleanUp = true;
555,480✔
356
  }
357
_exit:
555,480✔
358
  if (code != 0) {
555,480✔
359
    fnError("udfd start failed with code:%d, lino:%d", code, lino);
×
360
  }
361
  return code;
555,480✔
362
}
363

364
void udfStopUdfd() {
555,912✔
365
  SUdfdData *pData = &udfdGlobal;
555,912✔
366
  fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
555,912✔
367
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
555,912✔
368
    return;
432✔
369
  }
370
  atomic_store_32(&pData->stopCalled, 1);
555,480✔
371
  pData->needCleanUp = false;
555,480✔
372
  uv_barrier_destroy(&pData->barrier);
555,480✔
373
  if (uv_async_send(&pData->stopAsync) != 0) {
555,480✔
374
    fnError("stop udfd: failed to send stop async");
×
375
  }
376
  if (uv_thread_join(&pData->thread) != 0) {
555,480✔
377
    fnError("stop udfd: failed to join udfd thread");
×
378
  }
379

380
#ifdef WINDOWS
381
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
382
#endif
383
  fnInfo("udfd is cleaned up");
555,480✔
384
  return;
555,480✔
385
}
386

387
/*
388
int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
389
  SUdfdData *pData = &udfdGlobal;
390
  if (pData->spawnErr) {
391
    return pData->spawnErr;
392
  }
393
  uv_pid_t pid = uv_process_get_pid(&pData->process);
394
  if (pUdfdPid) {
395
    *pUdfdPid = (int32_t)pid;
396
  }
397
  return TSDB_CODE_SUCCESS;
398
}
399
*/
400

401
//==============================================================================================
402
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
403
 * The QUEUE is copied from queue.h under libuv
404
 * */
405

406
typedef void *QUEUE[2];
407

408
/* Private macros. */
409
#define QUEUE_NEXT(q)      (*(QUEUE **)&((*(q))[0]))
410
#define QUEUE_PREV(q)      (*(QUEUE **)&((*(q))[1]))
411
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
412
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
413

414
/* Public macros. */
415
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr)-offsetof(type, field)))
416

417
/* Important note: mutating the list while QUEUE_FOREACH is
418
 * iterating over its elements results in undefined behavior.
419
 */
420
#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
421

422
#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q))
423

424
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
425

426
#define QUEUE_INIT(q)    \
427
  do {                   \
428
    QUEUE_NEXT(q) = (q); \
429
    QUEUE_PREV(q) = (q); \
430
  } while (0)
431

432
#define QUEUE_ADD(h, n)                 \
433
  do {                                  \
434
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
435
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
436
    QUEUE_PREV(h) = QUEUE_PREV(n);      \
437
    QUEUE_PREV_NEXT(h) = (h);           \
438
  } while (0)
439

440
#define QUEUE_SPLIT(h, q, n)       \
441
  do {                             \
442
    QUEUE_PREV(n) = QUEUE_PREV(h); \
443
    QUEUE_PREV_NEXT(n) = (n);      \
444
    QUEUE_NEXT(n) = (q);           \
445
    QUEUE_PREV(h) = QUEUE_PREV(q); \
446
    QUEUE_PREV_NEXT(h) = (h);      \
447
    QUEUE_PREV(q) = (n);           \
448
  } while (0)
449

450
#define QUEUE_MOVE(h, n)        \
451
  do {                          \
452
    if (QUEUE_EMPTY(h))         \
453
      QUEUE_INIT(n);            \
454
    else {                      \
455
      QUEUE *q = QUEUE_HEAD(h); \
456
      QUEUE_SPLIT(h, q, n);     \
457
    }                           \
458
  } while (0)
459

460
#define QUEUE_INSERT_HEAD(h, q)    \
461
  do {                             \
462
    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
463
    QUEUE_PREV(q) = (h);           \
464
    QUEUE_NEXT_PREV(q) = (q);      \
465
    QUEUE_NEXT(h) = (q);           \
466
  } while (0)
467

468
#define QUEUE_INSERT_TAIL(h, q)    \
469
  do {                             \
470
    QUEUE_NEXT(q) = (h);           \
471
    QUEUE_PREV(q) = QUEUE_PREV(h); \
472
    QUEUE_PREV_NEXT(q) = (q);      \
473
    QUEUE_PREV(h) = (q);           \
474
  } while (0)
475

476
#define QUEUE_REMOVE(q)                 \
477
  do {                                  \
478
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
479
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
480
  } while (0)
481

482
enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
483

484
int64_t gUdfTaskSeqNum = 0;
485
typedef struct SUdfcFuncStub {
486
  char           udfName[TSDB_FUNC_NAME_LEN + 1];
487
  UdfcFuncHandle handle;
488
  int32_t        refCount;
489
  int64_t        createTime;
490
} SUdfcFuncStub;
491

492
typedef struct SUdfcProxy {
493
  char         udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
494
  uv_barrier_t initBarrier;
495

496
  uv_loop_t   uvLoop;
497
  uv_thread_t loopThread;
498
  uv_async_t  loopTaskAync;
499

500
  uv_async_t loopStopAsync;
501

502
  uv_mutex_t taskQueueMutex;
503
  int8_t     udfcState;
504
  QUEUE      taskQueue;
505
  QUEUE      uvProcTaskQueue;
506

507
  uv_mutex_t udfStubsMutex;
508
  SArray    *udfStubs;         // SUdfcFuncStub
509
  SArray    *expiredUdfStubs;  // SUdfcFuncStub
510

511
  uv_mutex_t udfcUvMutex;
512
  int8_t     initialized;
513
} SUdfcProxy;
514

515
SUdfcProxy gUdfcProxy = {0};
516

517
typedef struct SUdfcUvSession {
518
  SUdfcProxy *udfc;
519
  int64_t     severHandle;
520
  uv_pipe_t  *udfUvPipe;
521

522
  int8_t  outputType;
523
  int32_t bytes;
524
  int32_t bufSize;
525

526
  char udfName[TSDB_FUNC_NAME_LEN + 1];
527
} SUdfcUvSession;
528

529
typedef struct SClientUvTaskNode {
530
  SUdfcProxy *udfc;
531
  int8_t      type;
532
  int32_t     errCode;
533

534
  uv_pipe_t *pipe;
535

536
  int64_t  seqNum;
537
  uv_buf_t reqBuf;
538

539
  uv_sem_t taskSem;
540
  uv_buf_t rspBuf;
541

542
  QUEUE recvTaskQueue;
543
  QUEUE procTaskQueue;
544
  QUEUE connTaskQueue;
545
} SClientUvTaskNode;
546

547
typedef struct SClientUdfTask {
548
  int8_t type;
549

550
  SUdfcUvSession *session;
551

552
  union {
553
    struct {
554
      SUdfSetupRequest  req;
555
      SUdfSetupResponse rsp;
556
    } _setup;
557
    struct {
558
      SUdfCallRequest  req;
559
      SUdfCallResponse rsp;
560
    } _call;
561
    struct {
562
      SUdfTeardownRequest  req;
563
      SUdfTeardownResponse rsp;
564
    } _teardown;
565
  };
566

567
} SClientUdfTask;
568

569
typedef struct SClientConnBuf {
570
  char   *buf;
571
  int32_t len;
572
  int32_t cap;
573
  int32_t total;
574
} SClientConnBuf;
575

576
typedef struct SClientUvConn {
577
  uv_pipe_t      *pipe;
578
  QUEUE           taskQueue;
579
  SClientConnBuf  readBuf;
580
  SUdfcUvSession *session;
581
} SClientUvConn;
582

583
enum {
584
  UDFC_STATE_INITAL = 0,  // initial state
585
  UDFC_STATE_STARTNG,     // starting after udfcOpen
586
  UDFC_STATE_READY,       // started and begin to receive quests
587
  UDFC_STATE_STOPPING,    // stopping after udfcClose
588
};
589

590
void    getUdfdPipeName(char *pipeName, int32_t size);
591
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
592
void   *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
593
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
594
void   *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state);
595
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
596
void   *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call);
597
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
598
void   *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown);
599
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request);
600
void   *decodeUdfRequest(const void *buf, SUdfRequest *request);
601
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
602
void   *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp);
603
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
604
void   *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp);
605
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp);
606
void   *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse);
607
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp);
608
void   *decodeUdfResponse(const void *buf, SUdfResponse *rsp);
609
void    freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
610
void    freeUdfColumn(SUdfColumn *col);
611
void    freeUdfDataDataBlock(SUdfDataBlock *block);
612
void    freeUdfInterBuf(SUdfInterBuf *buf);
613
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
614
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
615
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
616
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
617

618
void getUdfdPipeName(char *pipeName, int32_t size) {
1,125,009✔
619
  char    dnodeId[8] = {0};
1,125,009✔
620
  size_t  dnodeIdSize = sizeof(dnodeId);
1,125,009✔
621
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
1,125,009✔
622
  if (err != 0) {
1,125,009✔
623
    fnError("failed to get dnodeId from env since %s", uv_err_name(err));
432✔
624
    dnodeId[0] = '1';
432✔
625
  }
626
#ifdef _WIN32
627
  snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir)),
628
           dnodeId);
629
#else
630
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
1,125,009✔
631
#endif
632
  fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName);
1,125,009✔
633
}
1,125,009✔
634

635
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
402,340✔
636
  int32_t len = 0;
402,340✔
637
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
402,340✔
638
  return len;
402,340✔
639
}
640

641
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request) {
219,936✔
642
  buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
219,936✔
643
  return (void *)buf;
219,936✔
644
}
645

646
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state) {
3,138,192✔
647
  int32_t len = 0;
3,138,192✔
648
  len += taosEncodeFixedI8(buf, state->numOfResult);
3,138,192✔
649
  len += taosEncodeFixedI32(buf, state->bufLen);
3,138,192✔
650
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
3,138,192✔
651
  return len;
3,138,192✔
652
}
653

654
void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state) {
1,603,848✔
655
  buf = taosDecodeFixedI8(buf, &state->numOfResult);
1,603,848✔
656
  buf = taosDecodeFixedI32(buf, &state->bufLen);
1,603,848✔
657
  buf = taosDecodeBinary(buf, (void **)&state->buf, state->bufLen);
1,603,848✔
658
  return (void *)buf;
1,603,848✔
659
}
660

661
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
2,428,264✔
662
  int32_t len = 0;
2,428,264✔
663
  len += taosEncodeFixedI64(buf, call->udfHandle);
2,428,264✔
664
  len += taosEncodeFixedI8(buf, call->callType);
2,428,264✔
665
  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
2,428,264✔
666
    len += tEncodeDataBlock(buf, &call->block);
514,728✔
667
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
1,913,536✔
668
    len += taosEncodeFixedI8(buf, call->initFirst);
877,800✔
669
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
1,474,636✔
670
    len += tEncodeDataBlock(buf, &call->block);
1,035,736✔
671
    len += encodeUdfInterBuf(buf, &call->interBuf);
1,035,736✔
672
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
438,900✔
673
    // len += encodeUdfInterBuf(buf, &call->interBuf);
674
    // len += encodeUdfInterBuf(buf, &call->interBuf2);
675
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
438,900✔
676
    len += encodeUdfInterBuf(buf, &call->interBuf);
438,900✔
677
  }
678
  return len;
2,428,264✔
679
}
680

681
void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
2,553,052✔
682
  buf = taosDecodeFixedI64(buf, &call->udfHandle);
2,553,052✔
683
  buf = taosDecodeFixedI8(buf, &call->callType);
2,553,052✔
684
  switch (call->callType) {
2,553,052✔
685
    case TSDB_UDF_CALL_SCALA_PROC:
1,721,330✔
686
      buf = tDecodeDataBlock(buf, &call->block);
1,721,330✔
687
      break;
1,721,218✔
688
    case TSDB_UDF_CALL_AGG_INIT:
184,698✔
689
      buf = taosDecodeFixedI8(buf, &call->initFirst);
184,698✔
690
      break;
184,698✔
691
    case TSDB_UDF_CALL_AGG_PROC:
463,579✔
692
      buf = tDecodeDataBlock(buf, &call->block);
463,579✔
693
      buf = decodeUdfInterBuf(buf, &call->interBuf);
463,579✔
694
      break;
463,467✔
695
    // case TSDB_UDF_CALL_AGG_MERGE:
696
    //   buf = decodeUdfInterBuf(buf, &call->interBuf);
697
    //   buf = decodeUdfInterBuf(buf, &call->interBuf2);
698
    //   break;
699
    case TSDB_UDF_CALL_AGG_FIN:
183,501✔
700
      buf = decodeUdfInterBuf(buf, &call->interBuf);
183,501✔
701
      break;
183,501✔
702
  }
703
  return (void *)buf;
2,552,828✔
704
}
705

706
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
394,800✔
707
  int32_t len = 0;
394,800✔
708
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
394,800✔
709
  return len;
394,800✔
710
}
711

712
void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown) {
218,865✔
713
  buf = taosDecodeFixedI64(buf, &teardown->udfHandle);
218,865✔
714
  return (void *)buf;
218,865✔
715
}
716

717
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request) {
3,225,404✔
718
  int32_t len = 0;
3,225,404✔
719
  if (buf == NULL) {
3,225,404✔
720
    len += sizeof(request->msgLen);
1,612,702✔
721
  } else {
722
    *(int32_t *)(*buf) = request->msgLen;
1,612,702✔
723
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
1,612,702✔
724
  }
725
  len += taosEncodeFixedI64(buf, request->seqNum);
3,225,404✔
726
  len += taosEncodeFixedI8(buf, request->type);
3,225,404✔
727
  if (request->type == UDF_TASK_SETUP) {
3,225,404✔
728
    len += encodeUdfSetupRequest(buf, &request->setup);
402,340✔
729
  } else if (request->type == UDF_TASK_CALL) {
2,823,064✔
730
    len += encodeUdfCallRequest(buf, &request->call);
2,428,264✔
731
  } else if (request->type == UDF_TASK_TEARDOWN) {
394,800✔
732
    len += encodeUdfTeardownRequest(buf, &request->teardown);
394,800✔
733
  }
734
  return len;
3,225,404✔
735
}
736

737
void *decodeUdfRequest(const void *buf, SUdfRequest *request) {
2,991,909✔
738
  request->msgLen = *(int32_t *)(buf);
2,991,909✔
739
  buf = POINTER_SHIFT(buf, sizeof(request->msgLen));
2,991,909✔
740

741
  buf = taosDecodeFixedI64(buf, &request->seqNum);
2,991,909✔
742
  buf = taosDecodeFixedI8(buf, &request->type);
2,991,909✔
743

744
  if (request->type == UDF_TASK_SETUP) {
2,991,909✔
745
    buf = decodeUdfSetupRequest(buf, &request->setup);
219,936✔
746
  } else if (request->type == UDF_TASK_CALL) {
2,771,973✔
747
    buf = decodeUdfCallRequest(buf, &request->call);
2,553,220✔
748
  } else if (request->type == UDF_TASK_TEARDOWN) {
218,753✔
749
    buf = decodeUdfTeardownRequest(buf, &request->teardown);
218,865✔
750
  }
751
  return (void *)buf;
2,991,797✔
752
}
753

754
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
439,872✔
755
  int32_t len = 0;
439,872✔
756
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
439,872✔
757
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
439,872✔
758
  len += taosEncodeFixedI32(buf, setupRsp->bytes);
439,872✔
759
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
439,872✔
760
  return len;
439,872✔
761
}
762

763
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
201,170✔
764
  buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
201,170✔
765
  buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
201,170✔
766
  buf = taosDecodeFixedI32(buf, &setupRsp->bytes);
201,170✔
767
  buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
201,170✔
768
  return (void *)buf;
201,170✔
769
}
770

771
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
5,103,360✔
772
  int32_t len = 0;
5,103,360✔
773
  len += taosEncodeFixedI8(buf, callRsp->callType);
5,103,360✔
774
  switch (callRsp->callType) {
5,103,360✔
775
    case TSDB_UDF_CALL_SCALA_PROC:
3,440,532✔
776
      len += tEncodeDataBlock(buf, &callRsp->resultData);
3,440,532✔
777
      break;
3,437,617✔
778
    case TSDB_UDF_CALL_AGG_INIT:
369,396✔
779
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
369,396✔
780
      break;
369,396✔
781
    case TSDB_UDF_CALL_AGG_PROC:
927,158✔
782
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
927,158✔
783
      break;
927,158✔
784
    // case TSDB_UDF_CALL_AGG_MERGE:
785
    //   len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
786
    //   break;
787
    case TSDB_UDF_CALL_AGG_FIN:
367,002✔
788
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
367,002✔
789
      break;
367,002✔
790
  }
791
  return len;
5,100,445✔
792
}
793

794
void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
1,213,378✔
795
  buf = taosDecodeFixedI8(buf, &callRsp->callType);
1,213,378✔
796
  switch (callRsp->callType) {
1,213,378✔
797
    case TSDB_UDF_CALL_SCALA_PROC:
256,610✔
798
      buf = tDecodeDataBlock(buf, &callRsp->resultData);
256,610✔
799
      break;
256,610✔
800
    case TSDB_UDF_CALL_AGG_INIT:
219,450✔
801
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
219,450✔
802
      break;
219,450✔
803
    case TSDB_UDF_CALL_AGG_PROC:
517,868✔
804
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
517,868✔
805
      break;
517,868✔
806
    // case TSDB_UDF_CALL_AGG_MERGE:
807
    //   buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
808
    //   break;
809
    case TSDB_UDF_CALL_AGG_FIN:
219,450✔
810
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
219,450✔
811
      break;
219,450✔
812
  }
813
  return (void *)buf;
1,213,378✔
814
}
815

816
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp) { return 0; }
437,730✔
817

818
void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse) { return (void *)buf; }
197,400✔
819

820
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) {
5,981,410✔
821
  int32_t len = 0;
5,981,410✔
822
  len += sizeof(rsp->msgLen);
5,981,410✔
823
  if (buf != NULL) {
5,981,410✔
824
    *(int32_t *)(*buf) = rsp->msgLen;
2,991,629✔
825
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
2,991,629✔
826
  }
827

828
  len += sizeof(rsp->seqNum);
5,981,410✔
829
  if (buf != NULL) {
5,981,410✔
830
    *(int64_t *)(*buf) = rsp->seqNum;
2,991,125✔
831
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
2,991,125✔
832
  }
833

834
  len += taosEncodeFixedI64(buf, rsp->seqNum);
5,981,410✔
835
  len += taosEncodeFixedI8(buf, rsp->type);
5,981,410✔
836
  len += taosEncodeFixedI32(buf, rsp->code);
5,981,410✔
837

838
  switch (rsp->type) {
5,981,410✔
839
    case UDF_TASK_SETUP:
439,872✔
840
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
439,872✔
841
      break;
439,872✔
842
    case UDF_TASK_CALL:
5,103,248✔
843
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
5,103,248✔
844
      break;
5,101,733✔
845
    case UDF_TASK_TEARDOWN:
437,730✔
846
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
437,730✔
847
      break;
437,730✔
848
    default:
560✔
849
      fnError("encode udf response, invalid udf response type %d", rsp->type);
560✔
850
      break;
×
851
  }
852
  return len;
5,979,335✔
853
}
854

855
void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
1,612,702✔
856
  rsp->msgLen = *(int32_t *)(buf);
1,612,702✔
857
  buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));
1,612,702✔
858
  rsp->seqNum = *(int64_t *)(buf);
1,612,702✔
859
  buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum));
1,612,702✔
860
  buf = taosDecodeFixedI64(buf, &rsp->seqNum);
1,612,702✔
861
  buf = taosDecodeFixedI8(buf, &rsp->type);
1,612,702✔
862
  buf = taosDecodeFixedI32(buf, &rsp->code);
1,612,702✔
863

864
  switch (rsp->type) {
1,612,702✔
865
    case UDF_TASK_SETUP:
201,170✔
866
      buf = decodeUdfSetupResponse(buf, &rsp->setupRsp);
201,170✔
867
      break;
201,170✔
868
    case UDF_TASK_CALL:
1,214,132✔
869
      if (rsp->code) {
1,214,132✔
870
        fnError("udf response failed, code:0x%x", rsp->code);
754✔
871

872
        return NULL;
754✔
873
      }
874

875
      buf = decodeUdfCallResponse(buf, &rsp->callRsp);
1,213,378✔
876
      break;
1,213,378✔
877
    case UDF_TASK_TEARDOWN:
197,400✔
878
      buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
197,400✔
879
      break;
197,400✔
UNCOV
880
    default:
×
UNCOV
881
      rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR;
×
UNCOV
882
      fnError("decode udf response, invalid udf response type %d", rsp->type);
×
UNCOV
883
      break;
×
884
  }
885
  if (buf == NULL) {
1,611,948✔
886
    rsp->code = terrno;
×
887
    fnError("decode udf response failed, code:0x%x", rsp->code);
×
888
  }
889
  return (void *)buf;
1,611,948✔
890
}
891

892
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
3,976,469✔
893
  TAOS_UDF_CHECK_PTR_RVOID(data, meta);
11,929,519✔
894
  if (IS_VAR_DATA_TYPE(meta->type)) {
3,976,469✔
895
    taosMemoryFree(data->varLenCol.varOffsets);
15,252✔
896
    data->varLenCol.varOffsets = NULL;
15,528✔
897
    taosMemoryFree(data->varLenCol.payload);
15,528✔
898
    data->varLenCol.payload = NULL;
15,528✔
899
  } else {
900
    taosMemoryFree(data->fixLenCol.nullBitmap);
3,961,217✔
901
    data->fixLenCol.nullBitmap = NULL;
3,960,269✔
902
    taosMemoryFree(data->fixLenCol.data);
3,960,269✔
903
    data->fixLenCol.data = NULL;
3,960,997✔
904
  }
905
}
906

907
void freeUdfColumn(SUdfColumn *col) {
3,976,189✔
908
  TAOS_UDF_CHECK_PTR_RVOID(col);
7,952,938✔
909
  freeUdfColumnData(&col->colData, &col->colMeta);
3,976,189✔
910
}
911

912
void freeUdfDataDataBlock(SUdfDataBlock *block) {
2,183,453✔
913
  TAOS_UDF_CHECK_PTR_RVOID(block);
4,367,466✔
914
  for (int32_t i = 0; i < block->numOfCols; ++i) {
4,439,267✔
915
    freeUdfColumn(block->udfCols[i]);
2,255,366✔
916
    taosMemoryFree(block->udfCols[i]);
2,255,758✔
917
    block->udfCols[i] = NULL;
2,255,814✔
918
  }
919
  taosMemoryFree(block->udfCols);
2,183,901✔
920
  block->udfCols = NULL;
2,184,405✔
921
}
922

923
void freeUdfInterBuf(SUdfInterBuf *buf) {
2,435,626✔
924
  TAOS_UDF_CHECK_PTR_RVOID(buf);
4,871,252✔
925
  taosMemoryFree(buf->buf);
2,435,626✔
926
  buf->buf = NULL;
2,435,626✔
927
}
928

929
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
2,184,573✔
930
  TAOS_UDF_CHECK_PTR_RCODE(block, udfBlock);
6,553,657✔
931
  int32_t code = blockDataCheck(block);
2,184,573✔
932
  if (code != TSDB_CODE_SUCCESS) {
2,184,461✔
UNCOV
933
    return code;
×
934
  }
935
  udfBlock->numOfRows = block->info.rows;
2,184,461✔
936
  udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
2,184,461✔
937
  udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
2,184,797✔
938
  if ((udfBlock->udfCols) == NULL) {
2,184,797✔
939
    return terrno;
×
940
  }
941
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
4,440,838✔
942
    udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
2,256,153✔
943
    if (udfBlock->udfCols[i] == NULL) {
2,255,982✔
UNCOV
944
      return terrno;
×
945
    }
946
    SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
2,255,982✔
947
    SUdfColumn      *udfCol = udfBlock->udfCols[i];
2,255,590✔
948
    udfCol->colMeta.type = col->info.type;
2,255,590✔
949
    udfCol->colMeta.bytes = col->info.bytes;
2,255,590✔
950
    udfCol->colMeta.scale = col->info.scale;
2,255,590✔
951
    udfCol->colMeta.precision = col->info.precision;
2,255,590✔
952
    udfCol->colData.numOfRows = udfBlock->numOfRows;
2,255,590✔
953
    udfCol->hasNull = col->hasNull;
2,255,590✔
954
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
2,255,590✔
955
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
9,410✔
956
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
9,410✔
957
      if (udfCol->colData.varLenCol.varOffsets == NULL) {
9,628✔
UNCOV
958
        return terrno;
×
959
      }
960
      memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
9,628✔
961
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
9,628✔
962
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
9,628✔
963
      if (udfCol->colData.varLenCol.payload == NULL) {
9,628✔
964
        return terrno;
×
965
      }
966
      if (col->reassigned) {
9,628✔
UNCOV
967
        for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
×
UNCOV
968
          char   *pColData = col->pData + col->varmeta.offset[row];
×
UNCOV
969
          int32_t colSize = 0;
×
970
          if (col->info.type == TSDB_DATA_TYPE_JSON) {
×
UNCOV
971
            colSize = getJsonValueLen(pColData);
×
UNCOV
972
          } else if (IS_STR_DATA_BLOB(col->info.type)) {
×
973
            colSize = blobDataTLen(pColData);
×
974
          } else {
975
            colSize = varDataTLen(pColData);
×
976
          }
977
          memcpy(udfCol->colData.varLenCol.payload, pColData, colSize);
×
978
          udfCol->colData.varLenCol.payload += colSize;
×
979
        }
980
      } else {
981
        memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
9,628✔
982
      }
983
    } else {
984
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
2,246,180✔
985
      int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
2,246,180✔
986
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
2,246,180✔
987
      if (udfCol->colData.fixLenCol.nullBitmap == NULL) {
2,246,746✔
UNCOV
988
        return terrno;
×
989
      }
990
      char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
2,246,746✔
991
      memcpy(bitmap, col->nullbitmap, bitmapLen);
2,246,746✔
992
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
2,246,746✔
993
      int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
2,246,183✔
994
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
2,246,183✔
995
      if (NULL == udfCol->colData.fixLenCol.data) {
2,246,413✔
UNCOV
996
        return terrno;
×
997
      }
998
      char *data = udfCol->colData.fixLenCol.data;
2,246,413✔
999
      memcpy(data, col->pData, dataLen);
2,246,413✔
1000
    }
1001
  }
1002
  return TSDB_CODE_SUCCESS;
2,184,685✔
1003
}
1004

1005
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
1,718,306✔
1006
  TAOS_UDF_CHECK_PTR_RCODE(udfCol, block);
5,154,470✔
1007
  int32_t         code = 0, lino = 0;
1,718,306✔
1008
  SUdfColumnMeta *meta = &udfCol->colMeta;
1,718,306✔
1009

1010
  SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
1,718,306✔
1011
  code = blockDataAppendColInfo(block, &colInfoData);
1,718,698✔
1012
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,719,538✔
1013

1014
  code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
1,719,538✔
1015
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,719,258✔
1016

1017
  SColumnInfoData *col = NULL;
1,719,258✔
1018
  code = bdGetColumnInfoData(block, 0, &col);
1,719,258✔
1019
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,719,815✔
1020

1021
  for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) {
419,258,057✔
1022
    if (udfColDataIsNull(udfCol, i)) {
418,174,461✔
1023
      colDataSetNULL(col, i);
41,488,448✔
1024
    } else {
1025
      char *data = udfColDataGetData(udfCol, i);
376,686,013✔
1026
      code = colDataSetVal(col, i, data, false);
376,686,013✔
1027
      TAOS_CHECK_GOTO(code, &lino, _exit);
376,049,794✔
1028
    }
1029
  }
1030
  block->info.rows = udfCol->colData.numOfRows;
1,083,596✔
1031

1032
  code = blockDataCheck(block);
1,083,596✔
1033
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,719,538✔
1034
_exit:
1,719,538✔
1035
  if (code != 0) {
1,719,538✔
UNCOV
1036
    fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino);
×
1037
  }
1038
  return TSDB_CODE_SUCCESS;
1,718,922✔
1039
}
1040

1041
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
257,364✔
1042
  TAOS_UDF_CHECK_PTR_RCODE(input, output);
772,092✔
1043
  int32_t code = 0, lino = 0;
257,364✔
1044
  int32_t numOfRows = 0;
257,364✔
1045
  for (int32_t i = 0; i < numOfCols; ++i) {
519,316✔
1046
    numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows;
261,952✔
1047
  }
1048

1049
  // create the basic block info structure
1050
  for (int32_t i = 0; i < numOfCols; ++i) {
519,316✔
1051
    SColumnInfoData *pInfo = input[i].columnData;
261,952✔
1052
    SColumnInfoData  d = {0};
261,952✔
1053
    d.info = pInfo->info;
261,952✔
1054

1055
    TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &d), &lino, _exit);
261,952✔
1056
  }
1057

1058
  TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit);
257,364✔
1059

1060
  for (int32_t i = 0; i < numOfCols; ++i) {
519,316✔
1061
    SColumnInfoData *pDest = taosArrayGet(output->pDataBlock, i);
261,952✔
1062

1063
    SColumnInfoData *pColInfoData = input[i].columnData;
261,952✔
1064
    TAOS_CHECK_GOTO(colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info), &lino, _exit);
261,952✔
1065

1066
    if (input[i].numOfRows < numOfRows) {
261,952✔
UNCOV
1067
      int32_t startRow = input[i].numOfRows;
×
UNCOV
1068
      int32_t expandRows = numOfRows - startRow;
×
UNCOV
1069
      bool    isNull = colDataIsNull_s(pColInfoData, (input + i)->numOfRows - 1);
×
UNCOV
1070
      if (isNull) {
×
UNCOV
1071
        colDataSetNNULL(pDest, startRow, expandRows);
×
1072
      } else {
1073
        char *src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
×
1074
        for (int32_t j = 0; j < expandRows; ++j) {
×
1075
          TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow + j, src, false), &lino, _exit);
×
1076
        }
1077
      }
1078
    }
1079
  }
1080

1081
  output->info.rows = numOfRows;
257,364✔
1082
_exit:
257,364✔
1083
  if (code != 0) {
257,364✔
UNCOV
1084
    fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino);
×
1085
  }
1086
  return code;
257,364✔
1087
}
1088

1089
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
256,610✔
1090
  TAOS_UDF_CHECK_PTR_RCODE(input, output);
769,830✔
1091
  if (taosArrayGetSize(input->pDataBlock) != 1) {
256,610✔
UNCOV
1092
    fnError("scalar function only support one column");
×
UNCOV
1093
    return 0;
×
1094
  }
1095
  output->numOfRows = input->info.rows;
256,610✔
1096

1097
  output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
256,610✔
1098
  if (output->columnData == NULL) {
256,610✔
1099
    return terrno;
×
1100
  }
1101
  memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
256,610✔
1102
  output->colAlloced = true;
256,610✔
1103

1104
  return 0;
256,610✔
1105
}
1106

1107
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
1108
// memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
1109
typedef struct SUdfAggRes {
1110
  int8_t  finalResNum;
1111
  int8_t  interResNum;
1112
  int32_t interResBufLen;
1113
  char   *finalResBuf;
1114
  char   *interResBuf;
1115
} SUdfAggRes;
1116

1117
void    onUdfcPipeClose(uv_handle_t *handle);
1118
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
1119
void    udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
1120
bool    isUdfcUvMsgComplete(SClientConnBuf *connBuf);
1121
void    udfcUvHandleRsp(SClientUvConn *conn);
1122
void    udfcUvHandleError(SClientUvConn *conn);
1123
void    onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
1124
void    onUdfcPipeWrite(uv_write_t *write, int32_t status);
1125
void    onUdfcPipeConnect(uv_connect_t *connect, int32_t status);
1126
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
1127
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
1128
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
1129
void    udfcAsyncTaskCb(uv_async_t *async);
1130
void    cleanUpUvTasks(SUdfcProxy *udfc);
1131
void    udfStopAsyncCb(uv_async_t *async);
1132
void    constructUdfService(void *argsThread);
1133
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
1134
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
1135
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2);
1136
int32_t doTeardownUdf(UdfcFuncHandle handle);
1137

1138
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
1139
                SSDataBlock *output, SUdfInterBuf *newState);
1140
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
1141
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
1142
// udf todo:  aggmerge
1143
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
1144
//                           SUdfInterBuf *resultBuf);
1145
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
1146
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1147
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1148

1149
int32_t udfcOpen();
1150
int32_t udfcClose();
1151

1152
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
1153
void    releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
1154
int32_t cleanUpUdfs();
1155

1156
bool    udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
1157
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
1158
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
1159
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
1160

1161
void    cleanupNotExpiredUdfs();
1162
void    cleanupExpiredUdfs();
1163
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
3,508,822✔
1164
  SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
3,508,822✔
1165
  SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
3,508,822✔
1166
  return strcmp(stub1->udfName, stub2->udfName);
3,508,822✔
1167
}
1168

1169
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
1,217,902✔
1170
  TAOS_UDF_CHECK_PTR_RCODE(udfName, pHandle);
3,653,706✔
1171
  int32_t code = 0, line = 0;
1,217,902✔
1172
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
1,217,902✔
1173
  SUdfcFuncStub key = {0};
1,217,902✔
1174
  tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
1,217,902✔
1175
  int32_t stubIndex = taosArraySearchIdx(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
1,217,902✔
1176
  if (stubIndex != -1) {
1,217,902✔
1177
    SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
1,016,732✔
1178
    UdfcFuncHandle handle = foundStub->handle;
1,016,732✔
1179
    int64_t        currUs = taosGetTimestampUs();
1,016,732✔
1180
    bool           expired = (currUs - foundStub->createTime) >= 10 * 1000 * 1000;
1,016,732✔
1181
    if (!expired) {
1,016,732✔
1182
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
1,016,732✔
1183
        *pHandle = foundStub->handle;
1,016,732✔
1184
        ++foundStub->refCount;
1,016,732✔
1185
        uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
1,016,732✔
1186
        return 0;
1,016,732✔
1187
      } else {
UNCOV
1188
        fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
×
1189
               foundStub->refCount, foundStub->createTime);
UNCOV
1190
        taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
×
1191
      }
1192
    } else {
UNCOV
1193
      fnDebug("udf handle expired for %s, will setup udf. move it to expired list", udfName);
×
1194
      if (taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub) == NULL) {
×
UNCOV
1195
        fnError("acquireUdfFuncHandle: failed to push udf stub to array");
×
1196
      } else {
UNCOV
1197
        taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
×
UNCOV
1198
        taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
×
1199
      }
1200
    }
1201
  }
1202
  //uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
1203
  *pHandle = NULL;
201,170✔
1204
  code = doSetupUdf(udfName, pHandle);
201,170✔
1205
  if (code == TSDB_CODE_SUCCESS) {
201,170✔
1206
    SUdfcFuncStub stub = {0};
197,400✔
1207
    tstrncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN);
197,400✔
1208
    stub.handle = *pHandle;
197,400✔
1209
    ++stub.refCount;
197,400✔
1210
    stub.createTime = taosGetTimestampUs();
197,400✔
1211
    //uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
1212
    if (taosArrayPush(gUdfcProxy.udfStubs, &stub) == NULL) {
394,800✔
UNCOV
1213
      fnError("acquireUdfFuncHandle: failed to push udf stub to array");
×
UNCOV
1214
      uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
×
UNCOV
1215
      goto _exit;
×
1216
    } else {
1217
      taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
197,400✔
1218
    }
1219
  } else {
1220
    *pHandle = NULL;
3,770✔
1221
  }
1222

1223
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
201,170✔
1224

1225
_exit:
201,170✔
1226
  return code;
201,170✔
1227
}
1228

1229
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
1,214,132✔
1230
  TAOS_UDF_CHECK_PTR_RVOID(udfName);
2,428,264✔
1231
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
1,214,132✔
1232
  SUdfcFuncStub key = {0};
1,214,132✔
1233
  tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
1,214,132✔
1234
  SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
1,214,132✔
1235
  SUdfcFuncStub *expiredStub = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ);
1,214,132✔
1236
  if (!foundStub && !expiredStub) {
1,214,132✔
UNCOV
1237
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
×
UNCOV
1238
    return;
×
1239
  }
1240
  if (foundStub != NULL && foundStub->handle == handle && foundStub->refCount > 0) {
1,214,132✔
1241
    --foundStub->refCount;
1,214,132✔
1242
  }
1243
  if (expiredStub != NULL && expiredStub->handle == handle && expiredStub->refCount > 0) {
1,214,132✔
1244
    --expiredStub->refCount;
×
1245
  }
1246
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
1,214,132✔
1247
}
1248

1249
void cleanupExpiredUdfs() {
142,445✔
1250
  int32_t i = 0;
142,445✔
1251
  SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
142,445✔
1252
  if (expiredUdfStubs == NULL) {
142,445✔
UNCOV
1253
    fnError("cleanupExpiredUdfs: failed to init array");
×
UNCOV
1254
    return;
×
1255
  }
1256
  while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
142,445✔
UNCOV
1257
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
×
UNCOV
1258
    if (stub->refCount == 0) {
×
UNCOV
1259
      fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle,
×
1260
             stub->refCount);
1261
      (void)doTeardownUdf(stub->handle);
×
1262
    } else {
UNCOV
1263
      fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p",
×
1264
             stub->udfName, stub->refCount, stub->createTime, stub->handle);
1265
      UdfcFuncHandle handle = stub->handle;
×
1266
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
×
UNCOV
1267
        if (taosArrayPush(expiredUdfStubs, stub) == NULL) {
×
1268
          fnError("cleanupExpiredUdfs: failed to push udf stub to array");
×
1269
        }
1270
      } else {
UNCOV
1271
        fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
×
1272
               stub->udfName, stub->refCount, stub->createTime);
1273
      }
1274
    }
1275
    ++i;
×
1276
  }
1277
  taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
142,445✔
1278
  gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
142,445✔
1279
}
1280

1281
void cleanupNotExpiredUdfs() {
142,445✔
1282
  SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
142,445✔
1283
  if (udfStubs == NULL) {
142,445✔
UNCOV
1284
    fnError("cleanupNotExpiredUdfs: failed to init array");
×
UNCOV
1285
    return;
×
1286
  }
1287
  int32_t i = 0;
142,445✔
1288
  while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
342,979✔
1289
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
200,534✔
1290
    if (stub->refCount == 0) {
200,534✔
1291
      fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
197,400✔
1292
      (void)doTeardownUdf(stub->handle);
197,400✔
1293
    } else {
1294
      fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
3,134✔
1295
             stub->refCount, stub->createTime, stub->handle);
1296
      UdfcFuncHandle handle = stub->handle;
3,134✔
1297
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
3,134✔
1298
        if (taosArrayPush(udfStubs, stub) == NULL) {
3,134✔
UNCOV
1299
          fnError("cleanupNotExpiredUdfs: failed to push udf stub to array");
×
1300
        }
1301
      } else {
UNCOV
1302
        fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName,
×
1303
               stub->refCount, stub->createTime);
1304
      }
1305
    }
1306
    ++i;
200,534✔
1307
  }
1308
  taosArrayDestroy(gUdfcProxy.udfStubs);
142,445✔
1309
  gUdfcProxy.udfStubs = udfStubs;
142,445✔
1310
}
1311

1312
int32_t cleanUpUdfs() {
558,480,772✔
1313
  int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
558,480,772✔
1314
  if (!initialized) {
558,244,536✔
1315
    return TSDB_CODE_SUCCESS;
18,715✔
1316
  }
1317

1318
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
558,225,821✔
1319
  if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
558,660,583✔
1320
      (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
558,518,138✔
1321
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
558,518,138✔
1322
    return TSDB_CODE_SUCCESS;
558,518,138✔
1323
  }
1324

1325
  cleanupNotExpiredUdfs();
142,445✔
1326
  cleanupExpiredUdfs();
142,445✔
1327

1328
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
142,445✔
1329
  return 0;
142,445✔
1330
}
1331

1332
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
261,134✔
1333
  TAOS_UDF_CHECK_PTR_RCODE(udfName, input, output);
1,044,536✔
1334
  UdfcFuncHandle handle = NULL;
261,134✔
1335
  int32_t        code = acquireUdfFuncHandle(udfName, &handle);
261,134✔
1336
  if (code != 0) {
261,134✔
1337
    return code;
3,770✔
1338
  }
1339

1340
  SUdfcUvSession *session = handle;
257,364✔
1341
  code = doCallUdfScalarFunc(handle, input, numOfCols, output);
257,364✔
1342
  if (code != TSDB_CODE_SUCCESS) {
257,364✔
1343
    fnError("udfc scalar function execution failure");
754✔
1344
    releaseUdfFuncHandle(udfName, handle);
754✔
1345
    return code;
754✔
1346
  }
1347

1348
  if (output->columnData == NULL) {
256,610✔
UNCOV
1349
    fnError("udfc scalar function calculate error. no column data");
×
UNCOV
1350
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
×
1351
  } else {
1352
    if (session->outputType != output->columnData->info.type || session->bytes != output->columnData->info.bytes) {
256,610✔
UNCOV
1353
      fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
×
1354
              session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes);
UNCOV
1355
      code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
×
1356
    }
1357
  }
1358
  releaseUdfFuncHandle(udfName, handle);
256,610✔
1359
  return code;
256,610✔
1360
}
1361

1362
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
95,299✔
1363
  if (pFunc == NULL || pEnv == NULL) {
95,299✔
UNCOV
1364
    fnError("udfAggGetEnv: invalid input lint: %d", __LINE__);
×
UNCOV
1365
    return false;
×
1366
  }
1367
  if (fmIsScalarFunc(pFunc->funcId)) {
95,299✔
UNCOV
1368
    return false;
×
1369
  }
1370
  pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
95,299✔
1371
  return true;
95,299✔
1372
}
1373

1374
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
219,450✔
1375
  TAOS_UDF_CHECK_PTR_RCODE(pCtx, pResultCellInfo);
658,350✔
1376
  if (pResultCellInfo->initialized) {
219,450✔
UNCOV
1377
    return TSDB_CODE_SUCCESS;
×
1378
  }
1379
  if (functionSetup(pCtx, pResultCellInfo) != TSDB_CODE_SUCCESS) {
219,450✔
UNCOV
1380
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1381
  }
1382
  UdfcFuncHandle handle;
219,450✔
1383
  int32_t        udfCode = 0;
219,450✔
1384
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
219,450✔
UNCOV
1385
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
×
UNCOV
1386
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1387
  }
1388
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
219,450✔
1389
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
219,450✔
1390
  int32_t         envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize;
219,450✔
1391
  memset(udfRes, 0, envSize);
219,450✔
1392

1393
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
219,450✔
1394
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
219,450✔
1395

1396
  SUdfInterBuf buf = {0};
219,450✔
1397
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
219,450✔
UNCOV
1398
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
×
UNCOV
1399
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
UNCOV
1400
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1401
  }
1402
  if (buf.bufLen <= session->bufSize) {
219,450✔
1403
    memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
219,450✔
1404
    udfRes->interResBufLen = buf.bufLen;
219,450✔
1405
    udfRes->interResNum = buf.numOfResult;
219,450✔
1406
  } else {
1407
    fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
×
UNCOV
1408
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
UNCOV
1409
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1410
  }
1411
  releaseUdfFuncHandle(pCtx->udfName, handle);
219,450✔
1412
  freeUdfInterBuf(&buf);
219,450✔
1413
  return TSDB_CODE_SUCCESS;
219,450✔
1414
}
1415

1416
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
517,868✔
1417
  TAOS_UDF_CHECK_PTR_RCODE(pCtx);
1,035,736✔
1418
  int32_t        udfCode = 0;
517,868✔
1419
  UdfcFuncHandle handle = 0;
517,868✔
1420
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
517,868✔
UNCOV
1421
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
×
UNCOV
1422
    return udfCode;
×
1423
  }
1424

1425
  SUdfcUvSession *session = handle;
517,868✔
1426
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
517,868✔
1427
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
517,868✔
1428
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
517,868✔
1429

1430
  SInputColumnInfoData *pInput = &pCtx->input;
517,868✔
1431
  int32_t               numOfCols = pInput->numOfInputCols;
517,868✔
1432
  int32_t               start = pInput->startRowIndex;
517,868✔
1433
  int32_t               numOfRows = pInput->numOfRows;
517,868✔
1434
  SSDataBlock          *pTempBlock = NULL;
517,868✔
1435
  int32_t               code = createDataBlock(&pTempBlock);
517,868✔
1436

1437
  if (code) {
517,868✔
UNCOV
1438
    return code;
×
1439
  }
1440

1441
  pTempBlock->info.rows = pInput->totalRows;
517,868✔
1442
  pTempBlock->info.id.uid = pInput->uid;
517,868✔
1443
  for (int32_t i = 0; i < numOfCols; ++i) {
1,082,058✔
1444
    if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) {
564,190✔
1445
      fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode);
×
UNCOV
1446
      blockDataDestroy(pTempBlock);
×
UNCOV
1447
      return udfCode;
×
1448
    }
1449
  }
1450

1451
  SSDataBlock *inputBlock = NULL;
517,868✔
1452
  code = blockDataExtractBlock(pTempBlock, start, numOfRows, &inputBlock);
517,868✔
1453
  if (code) {
517,868✔
1454
    return code;
×
1455
  }
1456

1457
  SUdfInterBuf state = {
517,868✔
1458
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
517,868✔
1459
  SUdfInterBuf newState = {0};
517,868✔
1460

1461
  udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
517,868✔
1462
  if (udfCode != 0) {
517,868✔
UNCOV
1463
    fnError("udfAggProcess error. code: %d", udfCode);
×
UNCOV
1464
    newState.numOfResult = 0;
×
1465
  } else {
1466
    if (newState.bufLen <= session->bufSize) {
517,868✔
1467
      memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
517,868✔
1468
      udfRes->interResBufLen = newState.bufLen;
517,868✔
1469
      udfRes->interResNum = newState.numOfResult;
517,868✔
1470
    } else {
1471
      fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
×
UNCOV
1472
      udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
×
1473
    }
1474
  }
1475

1476
  GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum;
517,868✔
1477

1478
  blockDataDestroy(inputBlock);
517,868✔
1479

1480
  taosArrayDestroy(pTempBlock->pDataBlock);
517,868✔
1481
  taosMemoryFree(pTempBlock);
517,868✔
1482

1483
  releaseUdfFuncHandle(pCtx->udfName, handle);
517,868✔
1484
  freeUdfInterBuf(&newState);
517,868✔
1485
  return udfCode;
517,868✔
1486
}
1487

1488
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
219,450✔
1489
  TAOS_UDF_CHECK_PTR_RCODE(pCtx, pBlock);
658,350✔
1490
  int32_t        udfCode = 0;
219,450✔
1491
  UdfcFuncHandle handle = 0;
219,450✔
1492
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
219,450✔
UNCOV
1493
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
×
UNCOV
1494
    return udfCode;
×
1495
  }
1496

1497
  SUdfcUvSession *session = handle;
219,450✔
1498
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
219,450✔
1499
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
219,450✔
1500
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
219,450✔
1501

1502
  SUdfInterBuf resultBuf = {0};
219,450✔
1503
  SUdfInterBuf state = {
219,450✔
1504
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
219,450✔
1505
  int32_t udfCallCode = 0;
219,450✔
1506
  udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
219,450✔
1507
  if (udfCallCode != 0) {
219,450✔
UNCOV
1508
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
×
UNCOV
1509
    GET_RES_INFO(pCtx)->numOfRes = 0;
×
1510
  } else {
1511
    if (resultBuf.numOfResult == 0) {
219,450✔
UNCOV
1512
      udfRes->finalResNum = 0;
×
UNCOV
1513
      GET_RES_INFO(pCtx)->numOfRes = 0;
×
1514
    } else {
1515
      if (resultBuf.bufLen <= session->bytes) {
219,450✔
1516
        memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
219,450✔
1517
        udfRes->finalResNum = resultBuf.numOfResult;
219,450✔
1518
        GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
219,450✔
1519
      } else {
1520
        fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
×
UNCOV
1521
        GET_RES_INFO(pCtx)->numOfRes = 0;
×
UNCOV
1522
        udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
×
1523
      }
1524
    }
1525
  }
1526

1527
  freeUdfInterBuf(&resultBuf);
219,450✔
1528

1529
  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
219,450✔
1530
  releaseUdfFuncHandle(pCtx->udfName, handle);
219,450✔
1531
  return udfCallCode == 0 ? numOfResults : udfCallCode;
219,450✔
1532
}
1533

1534
void onUdfcPipeClose(uv_handle_t *handle) {
201,170✔
1535
  SClientUvConn *conn = handle->data;
201,170✔
1536
  if (!QUEUE_EMPTY(&conn->taskQueue)) {
201,170✔
1537
    QUEUE             *h = QUEUE_HEAD(&conn->taskQueue);
197,400✔
1538
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
197,400✔
1539
    task->errCode = 0;
197,400✔
1540
    QUEUE_REMOVE(&task->procTaskQueue);
197,400✔
1541
    uv_sem_post(&task->taskSem);
197,400✔
1542
  }
1543
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
201,170✔
1544
  if (conn->session != NULL) {
201,170✔
1545
    conn->session->udfUvPipe = NULL;
179,216✔
1546
  }
1547
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
201,170✔
1548
  taosMemoryFree(conn->readBuf.buf);
201,170✔
1549
  taosMemoryFree(conn);
201,170✔
1550
  taosMemoryFree((uv_pipe_t *)handle);
201,170✔
1551
}
201,170✔
1552

1553
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
2,011,272✔
1554
  int32_t code = 0;
2,011,272✔
1555
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
2,011,272✔
1556
  if (uvTask->type == UV_TASK_REQ_RSP) {
2,011,272✔
1557
    if (uvTask->rspBuf.base != NULL) {
1,612,702✔
1558
      SUdfResponse rsp = {0};
1,612,702✔
1559
      void        *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
1,612,702✔
1560
      code = rsp.code;
1,612,702✔
1561
      if (code != 0) {
1,612,702✔
1562
        fnError("udfc get udf task result failure. code: %d", code);
4,524✔
1563
      }
1564

1565
      switch (task->type) {
1,612,702✔
1566
        case UDF_TASK_SETUP: {
201,170✔
1567
          task->_setup.rsp = rsp.setupRsp;
201,170✔
1568
          break;
201,170✔
1569
        }
1570
        case UDF_TASK_CALL: {
1,214,132✔
1571
          task->_call.rsp = rsp.callRsp;
1,214,132✔
1572
          break;
1,214,132✔
1573
        }
1574
        case UDF_TASK_TEARDOWN: {
197,400✔
1575
          task->_teardown.rsp = rsp.teardownRsp;
1576
          break;
197,400✔
1577
        }
UNCOV
1578
        default: {
×
UNCOV
1579
          break;
×
1580
        }
1581
      }
1582

1583
      // TODO: the call buffer is setup and freed by udf invocation
1584
      taosMemoryFreeClear(uvTask->rspBuf.base);
1,612,702✔
1585
    } else {
1586
      code = uvTask->errCode;
×
UNCOV
1587
      if (code != 0) {
×
UNCOV
1588
        fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
×
1589
      }
1590
    }
1591
  } else if (uvTask->type == UV_TASK_CONNECT) {
398,570✔
1592
    code = uvTask->errCode;
201,170✔
1593
    if (code != 0) {
201,170✔
1594
      fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
×
1595
    }
1596
  } else if (uvTask->type == UV_TASK_DISCONNECT) {
197,400✔
1597
    code = uvTask->errCode;
197,400✔
1598
    if (code != 0) {
197,400✔
UNCOV
1599
      fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
×
1600
    }
1601
  }
1602
  return code;
2,011,272✔
1603
}
1604

1605
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
4,841,876✔
1606
  SClientUvConn  *conn = handle->data;
4,841,876✔
1607
  SClientConnBuf *connBuf = &conn->readBuf;
4,841,876✔
1608

1609
  int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
4,841,876✔
1610
  if (connBuf->cap == 0) {
4,841,876✔
1611
    connBuf->buf = taosMemoryMalloc(msgHeadSize);
1,813,872✔
1612
    if (connBuf->buf) {
1,813,872✔
1613
      connBuf->len = 0;
1,813,872✔
1614
      connBuf->cap = msgHeadSize;
1,813,872✔
1615
      connBuf->total = -1;
1,813,872✔
1616

1617
      buf->base = connBuf->buf;
1,813,872✔
1618
      buf->len = connBuf->cap;
1,813,872✔
1619
    } else {
UNCOV
1620
      fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
×
UNCOV
1621
      buf->base = NULL;
×
UNCOV
1622
      buf->len = 0;
×
1623
    }
1624
  } else if (connBuf->total == -1 && connBuf->len < msgHeadSize) {
3,028,004✔
1625
    buf->base = connBuf->buf + connBuf->len;
1,415,302✔
1626
    buf->len = msgHeadSize - connBuf->len;
1,415,302✔
1627
  } else {
1628
    connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
1,612,702✔
1629
    void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap);
1,612,702✔
1630
    if (resultBuf) {
1,612,702✔
1631
      connBuf->buf = resultBuf;
1,612,702✔
1632
      buf->base = connBuf->buf + connBuf->len;
1,612,702✔
1633
      buf->len = connBuf->cap - connBuf->len;
1,612,702✔
1634
    } else {
UNCOV
1635
      fnError("udfc re-allocate buffer failure. size: %d", connBuf->cap);
×
UNCOV
1636
      buf->base = NULL;
×
UNCOV
1637
      buf->len = 0;
×
1638
    }
1639
  }
1640

1641
  fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
4,841,876✔
1642
}
4,841,876✔
1643

1644
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
3,225,404✔
1645
  if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
3,225,404✔
1646
    connBuf->total = *(int32_t *)(connBuf->buf);
1,612,702✔
1647
  }
1648
  if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
3,225,404✔
1649
    fnDebug("udfc complete message is received, now handle it");
1,612,702✔
1650
    return true;
1,612,702✔
1651
  }
1652
  return false;
1,612,702✔
1653
}
1654

1655
void udfcUvHandleRsp(SClientUvConn *conn) {
1,612,702✔
1656
  SClientConnBuf *connBuf = &conn->readBuf;
1,612,702✔
1657
  int64_t         seqNum = *(int64_t *)(connBuf->buf + sizeof(int32_t));  // msglen then seqnum
1,612,702✔
1658

1659
  if (QUEUE_EMPTY(&conn->taskQueue)) {
1,612,702✔
UNCOV
1660
    fnError("udfc no task waiting on connection. response seqnum:%" PRId64, seqNum);
×
UNCOV
1661
    return;
×
1662
  }
1663
  bool               found = false;
1,612,702✔
1664
  SClientUvTaskNode *taskFound = NULL;
1,612,702✔
1665
  QUEUE             *h = QUEUE_NEXT(&conn->taskQueue);
1,612,702✔
1666
  SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
1,612,702✔
1667

1668
  while (h != &conn->taskQueue) {
3,226,166✔
1669
    fnDebug("udfc handle response iterate through queue. uvTask:%" PRId64 "-%p", task->seqNum, task);
1,613,464✔
1670
    if (task->seqNum == seqNum) {
1,613,464✔
1671
      if (found == false) {
1,612,702✔
1672
        found = true;
1,612,702✔
1673
        taskFound = task;
1,612,702✔
1674
      } else {
UNCOV
1675
        fnError("udfc more than one task waiting for the same response");
×
UNCOV
1676
        continue;
×
1677
      }
1678
    }
1679
    h = QUEUE_NEXT(h);
1,613,464✔
1680
    task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
1,613,464✔
1681
  }
1682

1683
  if (taskFound) {
1,612,702✔
1684
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
1,612,702✔
1685
    QUEUE_REMOVE(&taskFound->connTaskQueue);
1,612,702✔
1686
    QUEUE_REMOVE(&taskFound->procTaskQueue);
1,612,702✔
1687
    uv_sem_post(&taskFound->taskSem);
1,612,702✔
1688
  } else {
UNCOV
1689
    fnError("no task is waiting for the response.");
×
1690
  }
1691
  connBuf->buf = NULL;
1,612,702✔
1692
  connBuf->total = -1;
1,612,702✔
1693
  connBuf->len = 0;
1,612,702✔
1694
  connBuf->cap = 0;
1,612,702✔
1695
}
1696

1697
void udfcUvHandleError(SClientUvConn *conn) {
3,770✔
1698
  fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);
3,770✔
1699
  while (!QUEUE_EMPTY(&conn->taskQueue)) {
3,770✔
UNCOV
1700
    QUEUE             *h = QUEUE_HEAD(&conn->taskQueue);
×
UNCOV
1701
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
×
UNCOV
1702
    task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
×
UNCOV
1703
    QUEUE_REMOVE(&task->connTaskQueue);
×
UNCOV
1704
    QUEUE_REMOVE(&task->procTaskQueue);
×
UNCOV
1705
    uv_sem_post(&task->taskSem);
×
1706
  }
1707
  if (!uv_is_closing((uv_handle_t *)conn->pipe)) {
3,770✔
1708
    uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose);
3,770✔
1709
  }
1710
}
3,770✔
1711

1712
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
4,841,876✔
1713
  fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
4,841,876✔
1714
  if (nread == 0) return;
4,841,876✔
1715

1716
  SClientUvConn  *conn = client->data;
3,229,174✔
1717
  SClientConnBuf *connBuf = &conn->readBuf;
3,229,174✔
1718
  if (nread > 0) {
3,229,174✔
1719
    connBuf->len += nread;
3,225,404✔
1720
    if (isUdfcUvMsgComplete(connBuf)) {
3,225,404✔
1721
      udfcUvHandleRsp(conn);
1,612,702✔
1722
    }
1723
  }
1724
  if (nread < 0) {
3,229,174✔
1725
    fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
3,770✔
1726
    if (nread == UV_EOF) {
3,770✔
1727
      fnError("\tudfc client pipe %p closed", client);
3,770✔
1728
    }
1729
    udfcUvHandleError(conn);
3,770✔
1730
  }
1731
}
1732

1733
void onUdfcPipeWrite(uv_write_t *write, int32_t status) {
1,612,702✔
1734
  SClientUvConn *conn = write->data;
1,612,702✔
1735
  if (status < 0) {
1,612,702✔
UNCOV
1736
    fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
×
UNCOV
1737
    udfcUvHandleError(conn);
×
1738
  } else {
1739
    fnDebug("udfc client connection %p write succeed", conn);
1,612,702✔
1740
  }
1741
  taosMemoryFree(write);
1,612,702✔
1742
}
1,612,702✔
1743

1744
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) {
201,170✔
1745
  SClientUvTaskNode *uvTask = connect->data;
201,170✔
1746
  if (status != 0) {
201,170✔
UNCOV
1747
    fnError("client connect error, task seq: %" PRId64 ", code:%s", uvTask->seqNum, uv_strerror(status));
×
1748
  }
1749
  uvTask->errCode = status;
201,170✔
1750

1751
  int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
201,170✔
1752
  if (code != 0) {
201,170✔
UNCOV
1753
    fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code));
×
1754
    uvTask->errCode = code;
×
1755
  }
1756
  taosMemoryFree(connect);
201,170✔
1757
  QUEUE_REMOVE(&uvTask->procTaskQueue);
201,170✔
1758
  uv_sem_post(&uvTask->taskSem);
201,170✔
1759
}
201,170✔
1760

1761
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) {
2,011,272✔
1762
  uvTask->type = uvTaskType;
2,011,272✔
1763
  uvTask->udfc = task->session->udfc;
2,011,272✔
1764

1765
  if (uvTaskType == UV_TASK_CONNECT) {
2,011,272✔
1766
  } else if (uvTaskType == UV_TASK_REQ_RSP) {
1,810,102✔
1767
    uvTask->pipe = task->session->udfUvPipe;
1,612,702✔
1768
    SUdfRequest request;
1,612,702✔
1769
    request.type = task->type;
1,612,702✔
1770
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);
1,612,702✔
1771

1772
    if (task->type == UDF_TASK_SETUP) {
1,612,702✔
1773
      request.setup = task->_setup.req;
201,170✔
1774
      request.type = UDF_TASK_SETUP;
201,170✔
1775
    } else if (task->type == UDF_TASK_CALL) {
1,411,532✔
1776
      request.call = task->_call.req;
1,214,132✔
1777
      request.type = UDF_TASK_CALL;
1,214,132✔
1778
    } else if (task->type == UDF_TASK_TEARDOWN) {
197,400✔
1779
      request.teardown = task->_teardown.req;
197,400✔
1780
      request.type = UDF_TASK_TEARDOWN;
197,400✔
1781
    } else {
UNCOV
1782
      fnError("udfc create uv task, invalid task type : %d", task->type);
×
1783
    }
1784
    int32_t bufLen = encodeUdfRequest(NULL, &request);
1,612,702✔
1785
    if (bufLen <= 0) {
1,612,702✔
UNCOV
1786
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
×
UNCOV
1787
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1788
    }
1789
    request.msgLen = bufLen;
1,612,702✔
1790
    void *bufBegin = taosMemoryMalloc(bufLen);
1,612,702✔
1791
    if (bufBegin == NULL) {
1,612,702✔
UNCOV
1792
      fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen);
×
1793
      return terrno;
×
1794
    }
1795
    void *buf = bufBegin;
1,612,702✔
1796
    if (encodeUdfRequest(&buf, &request) <= 0) {
1,612,702✔
UNCOV
1797
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
×
UNCOV
1798
      taosMemoryFree(bufBegin);
×
1799
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1800
    }
1801

1802
    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
1,612,702✔
1803
    uvTask->seqNum = request.seqNum;
1,612,702✔
1804
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
197,400✔
1805
    uvTask->pipe = task->session->udfUvPipe;
197,400✔
1806
  }
1807
  if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
2,011,272✔
UNCOV
1808
    if (uvTaskType == UV_TASK_REQ_RSP) {
×
UNCOV
1809
      taosMemoryFreeClear(uvTask->reqBuf.base);
×
1810
    }
UNCOV
1811
    fnError("udfc create uv task, init semaphore failed.");
×
UNCOV
1812
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1813
  }
1814

1815
  return 0;
2,011,272✔
1816
}
1817

1818
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
2,011,272✔
1819
  fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
2,011,272✔
1820
  SUdfcProxy *udfc = uvTask->udfc;
2,011,272✔
1821
  uv_mutex_lock(&udfc->taskQueueMutex);
2,011,272✔
1822
  QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
2,011,272✔
1823
  uv_mutex_unlock(&udfc->taskQueueMutex);
2,011,272✔
1824
  int32_t code = uv_async_send(&udfc->loopTaskAync);
2,011,272✔
1825
  if (code != 0) {
2,011,272✔
UNCOV
1826
    fnError("udfc queue uv task to event loop failed. code:%s", uv_strerror(code));
×
UNCOV
1827
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1828
  }
1829

1830
  uv_sem_wait(&uvTask->taskSem);
2,011,272✔
1831
  fnDebug("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
2,011,272✔
1832
  uv_sem_destroy(&uvTask->taskSem);
2,011,272✔
1833

1834
  return 0;
2,011,272✔
1835
}
1836

1837
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
2,011,272✔
1838
  fnDebug("event loop start uv task. uvTask: %" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
2,011,272✔
1839
  int32_t code = 0;
2,011,272✔
1840

1841
  switch (uvTask->type) {
2,011,272✔
1842
    case UV_TASK_CONNECT: {
201,170✔
1843
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
201,170✔
1844
      if (pipe == NULL) {
201,170✔
UNCOV
1845
        fnError("udfc event loop start connect task malloc pipe failed.");
×
UNCOV
1846
        return terrno;
×
1847
      }
1848
      if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) {
201,170✔
UNCOV
1849
        fnError("udfc event loop start connect task uv_pipe_init failed.");
×
UNCOV
1850
        taosMemoryFree(pipe);
×
UNCOV
1851
        return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1852
      }
1853
      uvTask->pipe = pipe;
201,170✔
1854

1855
      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
201,170✔
1856
      if (conn == NULL) {
201,170✔
1857
        fnError("udfc event loop start connect task malloc conn failed.");
×
1858
        taosMemoryFree(pipe);
×
UNCOV
1859
        return terrno;
×
1860
      }
1861
      conn->pipe = pipe;
201,170✔
1862
      conn->readBuf.len = 0;
201,170✔
1863
      conn->readBuf.cap = 0;
201,170✔
1864
      conn->readBuf.buf = 0;
201,170✔
1865
      conn->readBuf.total = -1;
201,170✔
1866
      QUEUE_INIT(&conn->taskQueue);
201,170✔
1867

1868
      pipe->data = conn;
201,170✔
1869

1870
      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
201,170✔
1871
      if (connReq == NULL) {
201,170✔
UNCOV
1872
        fnError("udfc event loop start connect task malloc connReq failed.");
×
UNCOV
1873
        taosMemoryFree(pipe);
×
UNCOV
1874
        taosMemoryFree(conn);
×
UNCOV
1875
        return terrno;
×
1876
      }
1877
      connReq->data = uvTask;
201,170✔
1878
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
201,170✔
1879
      code = 0;
201,170✔
1880
      break;
201,170✔
1881
    }
1882
    case UV_TASK_REQ_RSP: {
1,612,702✔
1883
      uv_pipe_t *pipe = uvTask->pipe;
1,612,702✔
1884
      if (pipe == NULL) {
1,612,702✔
UNCOV
1885
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
1886
      } else {
1887
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
1,612,702✔
1888
        if (write == NULL) {
1,612,702✔
UNCOV
1889
          fnError("udfc event loop start req_rsp task malloc write failed.");
×
UNCOV
1890
          return terrno;
×
1891
        }
1892
        write->data = pipe->data;
1,612,702✔
1893
        QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
1,612,702✔
1894
        QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
1,612,702✔
1895
        int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
1,612,702✔
1896
        if (err != 0) {
1,612,702✔
1897
          taosMemoryFree(write);
×
UNCOV
1898
          fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code:%s", uvTask, uv_strerror(err));
×
1899
        }
1900
        code = err;
1,612,702✔
1901
      }
1902
      break;
1,612,702✔
1903
    }
1904
    case UV_TASK_DISCONNECT: {
197,400✔
1905
      uv_pipe_t *pipe = uvTask->pipe;
197,400✔
1906
      if (pipe == NULL) {
197,400✔
UNCOV
1907
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
1908
      } else {
1909
        SClientUvConn *conn = pipe->data;
197,400✔
1910
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
197,400✔
1911
        if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) {
197,400✔
1912
          uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
197,400✔
1913
        }
1914
        code = 0;
197,400✔
1915
      }
1916
      break;
197,400✔
1917
    }
UNCOV
1918
    default: {
×
UNCOV
1919
      fnError("udfc event loop unknown task type.") break;
×
1920
    }
1921
  }
1922

1923
  return code;
2,011,272✔
1924
}
1925

1926
void udfcAsyncTaskCb(uv_async_t *async) {
2,011,272✔
1927
  SUdfcProxy *udfc = async->data;
2,011,272✔
1928
  QUEUE       wq;
2,011,272✔
1929

1930
  uv_mutex_lock(&udfc->taskQueueMutex);
2,011,272✔
1931
  QUEUE_MOVE(&udfc->taskQueue, &wq);
2,011,272✔
1932
  uv_mutex_unlock(&udfc->taskQueueMutex);
2,011,272✔
1933

1934
  while (!QUEUE_EMPTY(&wq)) {
4,022,544✔
1935
    QUEUE *h = QUEUE_HEAD(&wq);
2,011,272✔
1936
    QUEUE_REMOVE(h);
2,011,272✔
1937
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
2,011,272✔
1938
    int32_t            code = udfcStartUvTask(task);
2,011,272✔
1939
    if (code == 0) {
2,011,272✔
1940
      QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
2,011,272✔
1941
    } else {
UNCOV
1942
      task->errCode = code;
×
UNCOV
1943
      uv_sem_post(&task->taskSem);
×
1944
    }
1945
  }
1946
}
2,011,272✔
1947

1948
void cleanUpUvTasks(SUdfcProxy *udfc) {
552,959✔
1949
  fnDebug("clean up uv tasks") QUEUE wq;
552,959✔
1950

1951
  uv_mutex_lock(&udfc->taskQueueMutex);
552,959✔
1952
  QUEUE_MOVE(&udfc->taskQueue, &wq);
552,959✔
1953
  uv_mutex_unlock(&udfc->taskQueueMutex);
552,959✔
1954

1955
  while (!QUEUE_EMPTY(&wq)) {
552,959✔
UNCOV
1956
    QUEUE *h = QUEUE_HEAD(&wq);
×
UNCOV
1957
    QUEUE_REMOVE(h);
×
UNCOV
1958
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
×
UNCOV
1959
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
×
UNCOV
1960
      task->errCode = TSDB_CODE_UDF_STOPPING;
×
1961
    }
UNCOV
1962
    uv_sem_post(&task->taskSem);
×
1963
  }
1964

1965
  while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
552,959✔
1966
    QUEUE *h = QUEUE_HEAD(&udfc->uvProcTaskQueue);
×
1967
    QUEUE_REMOVE(h);
×
UNCOV
1968
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
×
1969
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
×
UNCOV
1970
      task->errCode = TSDB_CODE_UDF_STOPPING;
×
1971
    }
UNCOV
1972
    uv_sem_post(&task->taskSem);
×
1973
  }
1974
}
552,959✔
1975

1976
void udfStopAsyncCb(uv_async_t *async) {
552,959✔
1977
  SUdfcProxy *udfc = async->data;
552,959✔
1978
  cleanUpUvTasks(udfc);
552,959✔
1979
  if (udfc->udfcState == UDFC_STATE_STOPPING) {
552,959✔
1980
    uv_stop(&udfc->uvLoop);
552,959✔
1981
  }
1982
}
552,959✔
1983

1984
void constructUdfService(void *argsThread) {
552,959✔
1985
  int32_t     code = 0, lino = 0;
552,959✔
1986
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
552,959✔
1987
  code = uv_loop_init(&udfc->uvLoop);
552,959✔
1988
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
1989

1990
  code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
552,959✔
1991
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
1992
  udfc->loopTaskAync.data = udfc;
552,959✔
1993
  code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
552,959✔
1994
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
1995
  udfc->loopStopAsync.data = udfc;
552,959✔
1996
  code = uv_mutex_init(&udfc->taskQueueMutex);
552,959✔
1997
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
1998
  QUEUE_INIT(&udfc->taskQueue);
552,959✔
1999
  QUEUE_INIT(&udfc->uvProcTaskQueue);
552,959✔
2000
  (void)uv_barrier_wait(&udfc->initBarrier);
552,959✔
2001
  // TODO return value of uv_run
2002
  int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
552,959✔
2003
  fnInfo("udfc uv loop exit. active handle num: %d", num);
552,959✔
2004
  (void)uv_loop_close(&udfc->uvLoop);
552,959✔
2005

2006
  uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL);
552,959✔
2007
  num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
552,959✔
2008
  fnInfo("udfc uv loop exit. active handle num: %d", num);
552,959✔
2009

2010
  (void)uv_loop_close(&udfc->uvLoop);
552,959✔
2011
_exit:
552,959✔
2012
  if (code != 0) {
552,959✔
UNCOV
2013
    fnError("udfc construct error. code: %d, line: %d", code, lino);
×
2014
  }
2015
  fnInfo("udfc construct finished");
552,959✔
2016
}
552,959✔
2017

2018
int32_t udfcOpen() {
615,697✔
2019
  int32_t code = 0, lino = 0;
615,697✔
2020
  int8_t  old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
615,697✔
2021
  if (old == 1) {
615,697✔
2022
    return 0;
62,738✔
2023
  }
2024
  SUdfcProxy *proxy = &gUdfcProxy;
552,959✔
2025
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
552,959✔
2026
  proxy->udfcState = UDFC_STATE_STARTNG;
552,959✔
2027
  code = uv_barrier_init(&proxy->initBarrier, 2);
552,959✔
2028
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
2029
  code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
552,959✔
2030
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
2031
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
552,959✔
2032
  proxy->udfcState = UDFC_STATE_READY;
552,959✔
2033
  (void)uv_barrier_wait(&proxy->initBarrier);
552,959✔
2034
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
2035
  code = uv_mutex_init(&proxy->udfStubsMutex);
552,959✔
2036
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
2037
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
552,959✔
2038
  if (proxy->udfStubs == NULL) {
552,959✔
UNCOV
2039
    fnError("udfc init failed. udfStubs: %p", proxy->udfStubs);
×
UNCOV
2040
    return -1;
×
2041
  }
2042
  proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
552,959✔
2043
  if (proxy->expiredUdfStubs == NULL) {
552,959✔
UNCOV
2044
    taosArrayDestroy(proxy->udfStubs);
×
UNCOV
2045
    fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs);
×
2046
    return -1;
×
2047
  }
2048
  code = uv_mutex_init(&proxy->udfcUvMutex);
552,959✔
2049
  TAOS_CHECK_GOTO(code, &lino, _exit);
552,959✔
2050
_exit:
552,959✔
2051
  if (code != 0) {
552,959✔
2052
    fnError("udfc open error. code: %d, line: %d", code, lino);
×
2053
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
2054
  }
2055
  fnInfo("udfc initialized");
552,959✔
2056
  return 0;
552,959✔
2057
}
2058

2059
int32_t udfcClose() {
555,912✔
2060
  int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
555,912✔
2061
  if (old == 0) {
555,912✔
2062
    return 0;
2,953✔
2063
  }
2064

2065
  SUdfcProxy *udfc = &gUdfcProxy;
552,959✔
2066
  udfc->udfcState = UDFC_STATE_STOPPING;
552,959✔
2067
  if (uv_async_send(&udfc->loopStopAsync) != 0) {
552,959✔
UNCOV
2068
    fnError("udfc close error to send stop async");
×
2069
  }
2070
  if (uv_thread_join(&udfc->loopThread) != 0) {
552,959✔
UNCOV
2071
    fnError("udfc close errir to join loop thread");
×
2072
  }
2073
  uv_mutex_destroy(&udfc->taskQueueMutex);
552,959✔
2074
  uv_barrier_destroy(&udfc->initBarrier);
552,959✔
2075
  taosArrayDestroy(udfc->expiredUdfStubs);
552,959✔
2076
  taosArrayDestroy(udfc->udfStubs);
552,959✔
2077
  uv_mutex_destroy(&udfc->udfStubsMutex);
552,959✔
2078
  uv_mutex_destroy(&udfc->udfcUvMutex);
552,959✔
2079
  udfc->udfcState = UDFC_STATE_INITAL;
552,959✔
2080
  fnInfo("udfc is cleaned up");
552,959✔
2081
  return 0;
552,959✔
2082
}
2083

2084
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
2,011,272✔
2085
  int32_t            code = 0, lino = 0;
2,011,272✔
2086
  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
2,011,272✔
2087
  if (uvTask == NULL) {
2,011,272✔
UNCOV
2088
    fnError("udfc client task: %p failed to allocate memory for uvTask", task);
×
UNCOV
2089
    return terrno;
×
2090
  }
2091
  fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);
2,011,272✔
2092

2093
  code = udfcInitializeUvTask(task, uvTaskType, uvTask);
2,011,272✔
2094
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,011,272✔
2095
  code = udfcQueueUvTask(uvTask);
2,011,272✔
2096
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,011,272✔
2097
  code = udfcGetUdfTaskResultFromUvTask(task, uvTask);
2,011,272✔
2098
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,011,272✔
2099
  if (uvTaskType == UV_TASK_CONNECT) {
2,006,748✔
2100
    task->session->udfUvPipe = uvTask->pipe;
201,170✔
2101
    SClientUvConn *conn = uvTask->pipe->data;
201,170✔
2102
    conn->session = task->session;
201,170✔
2103
  }
2104

2105
_exit:
2,011,272✔
2106
  if (code != 0) {
2,011,272✔
2107
    fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, uvTask, code, lino);
4,524✔
2108
  }
2109
  taosMemoryFree(uvTask->reqBuf.base);
2,011,272✔
2110
  uvTask->reqBuf.base = NULL;
2,011,272✔
2111
  taosMemoryFree(uvTask);
2,011,272✔
2112
  uvTask = NULL;
2,011,272✔
2113
  return code;
2,011,272✔
2114
}
2115

2116
static void freeTaskSession(SClientUdfTask *task) {
201,170✔
2117
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
201,170✔
2118
  if (task->session->udfUvPipe != NULL && task->session->udfUvPipe->data != NULL) {
201,170✔
2119
    SClientUvConn *conn = task->session->udfUvPipe->data;
21,954✔
2120
    conn->session = NULL;
21,954✔
2121
  }
2122
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
201,170✔
2123
  taosMemoryFreeClear(task->session);
201,170✔
2124
}
201,170✔
2125

2126
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
201,170✔
2127
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
201,170✔
2128
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
201,170✔
2129
  if (task == NULL) {
201,170✔
UNCOV
2130
    fnError("doSetupUdf, failed to allocate memory for task");
×
UNCOV
2131
    return terrno;
×
2132
  }
2133
  task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
201,170✔
2134
  if (task->session == NULL) {
201,170✔
UNCOV
2135
    fnError("doSetupUdf, failed to allocate memory for session");
×
UNCOV
2136
    taosMemoryFree(task);
×
2137
    return terrno;
×
2138
  }
2139
  task->session->udfc = &gUdfcProxy;
201,170✔
2140
  task->type = UDF_TASK_SETUP;
201,170✔
2141

2142
  SUdfSetupRequest *req = &task->_setup.req;
201,170✔
2143
  tstrncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
201,170✔
2144

2145
  code = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
201,170✔
2146
  TAOS_CHECK_GOTO(code, &lino, _exit);
201,170✔
2147

2148
  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
201,170✔
2149
  TAOS_CHECK_GOTO(code, &lino, _exit);
201,170✔
2150

2151
  SUdfSetupResponse *rsp = &task->_setup.rsp;
197,400✔
2152
  task->session->severHandle = rsp->udfHandle;
197,400✔
2153
  task->session->outputType = rsp->outputType;
197,400✔
2154
  task->session->bytes = rsp->bytes;
197,400✔
2155
  task->session->bufSize = rsp->bufSize;
197,400✔
2156
  tstrncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
197,400✔
2157
  fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
197,400✔
2158
  *funcHandle = task->session;
197,400✔
2159
  taosMemoryFree(task);
197,400✔
2160
  return 0;
197,400✔
2161

2162
_exit:
3,770✔
2163
  if (code != 0) {
3,770✔
2164
    fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino);
3,770✔
2165
  }
2166
  freeTaskSession(task);
3,770✔
2167
  taosMemoryFree(task);
3,770✔
2168
  return code;
3,770✔
2169
}
2170

2171
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
1,214,132✔
2172
                SSDataBlock *output, SUdfInterBuf *newState) {
2173
  fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
1,214,132✔
2174
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
1,214,132✔
2175
  if (session->udfUvPipe == NULL) {
1,214,132✔
UNCOV
2176
    fnError("No pipe to udfd");
×
UNCOV
2177
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
2178
  }
2179
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
1,214,132✔
2180
  if (task == NULL) {
1,214,132✔
UNCOV
2181
    fnError("udfc call udf. failed to allocate memory for task");
×
UNCOV
2182
    return terrno;
×
2183
  }
2184
  task->session = (SUdfcUvSession *)handle;
1,214,132✔
2185
  task->type = UDF_TASK_CALL;
1,214,132✔
2186

2187
  SUdfCallRequest *req = &task->_call.req;
1,214,132✔
2188
  req->udfHandle = task->session->severHandle;
1,214,132✔
2189
  req->callType = callType;
1,214,132✔
2190

2191
  switch (callType) {
1,214,132✔
2192
    case TSDB_UDF_CALL_AGG_INIT: {
219,450✔
2193
      req->initFirst = 1;
219,450✔
2194
      break;
219,450✔
2195
    }
2196
    case TSDB_UDF_CALL_AGG_PROC: {
517,868✔
2197
      req->block = *input;
517,868✔
2198
      req->interBuf = *state;
517,868✔
2199
      break;
517,868✔
2200
    }
2201
    // case TSDB_UDF_CALL_AGG_MERGE: {
2202
    //   req->interBuf = *state;
2203
    //   req->interBuf2 = *state2;
2204
    //   break;
2205
    // }
2206
    case TSDB_UDF_CALL_AGG_FIN: {
219,450✔
2207
      req->interBuf = *state;
219,450✔
2208
      break;
219,450✔
2209
    }
2210
    case TSDB_UDF_CALL_SCALA_PROC: {
257,364✔
2211
      req->block = *input;
257,364✔
2212
      break;
257,364✔
2213
    }
2214
  }
2215

2216
  int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
1,214,132✔
2217
  if (code != 0) {
1,214,132✔
2218
    fnError("call udf failure. udfcRunUdfUvTask err: %d", code);
754✔
2219
  } else {
2220
    SUdfCallResponse *rsp = &task->_call.rsp;
1,213,378✔
2221
    switch (callType) {
1,213,378✔
2222
      case TSDB_UDF_CALL_AGG_INIT: {
219,450✔
2223
        *newState = rsp->resultBuf;
219,450✔
2224
        break;
219,450✔
2225
      }
2226
      case TSDB_UDF_CALL_AGG_PROC: {
517,868✔
2227
        *newState = rsp->resultBuf;
517,868✔
2228
        break;
517,868✔
2229
      }
2230
      // case TSDB_UDF_CALL_AGG_MERGE: {
2231
      //   *newState = rsp->resultBuf;
2232
      //   break;
2233
      // }
2234
      case TSDB_UDF_CALL_AGG_FIN: {
219,450✔
2235
        *newState = rsp->resultBuf;
219,450✔
2236
        break;
219,450✔
2237
      }
2238
      case TSDB_UDF_CALL_SCALA_PROC: {
256,610✔
2239
        *output = rsp->resultData;
256,610✔
2240
        break;
256,610✔
2241
      }
2242
    }
2243
  }
2244
  taosMemoryFree(task);
1,214,132✔
2245
  return code;
1,214,132✔
2246
}
2247

2248
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
219,450✔
2249
  int8_t callType = TSDB_UDF_CALL_AGG_INIT;
219,450✔
2250

2251
  int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf);
219,450✔
2252

2253
  return err;
219,450✔
2254
}
2255

2256
// input: block, state
2257
// output: interbuf,
2258
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
517,868✔
2259
  int8_t  callType = TSDB_UDF_CALL_AGG_PROC;
517,868✔
2260
  int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
517,868✔
2261
  return err;
517,868✔
2262
}
2263

2264
// input: interbuf1, interbuf2
2265
// output: resultBuf
2266
// udf todo:  aggmerge
2267
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
2268
//                           SUdfInterBuf *resultBuf) {
2269
//   int8_t  callType = TSDB_UDF_CALL_AGG_MERGE;
2270
//   int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
2271
//   return err;
2272
// }
2273

2274
// input: interBuf
2275
// output: resultData
2276
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
219,450✔
2277
  int8_t  callType = TSDB_UDF_CALL_AGG_FIN;
219,450✔
2278
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
219,450✔
2279
  return err;
219,450✔
2280
}
2281

2282
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
257,364✔
2283
  int8_t      callType = TSDB_UDF_CALL_SCALA_PROC;
257,364✔
2284
  SSDataBlock inputBlock = {0};
257,364✔
2285
  int32_t     code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
257,364✔
2286
  if (code != 0) {
257,364✔
UNCOV
2287
    fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code);
×
UNCOV
2288
    return code;
×
2289
  }
2290
  SSDataBlock resultBlock = {0};
257,364✔
2291
  int32_t     err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
257,364✔
2292
  if (err == 0) {
257,364✔
2293
    err = convertDataBlockToScalarParm(&resultBlock, output);
256,610✔
2294
  }
2295
  taosArrayDestroy(resultBlock.pDataBlock);
257,364✔
2296

2297
  blockDataFreeRes(&inputBlock);
257,364✔
2298
  return err;
257,364✔
2299
}
2300

2301
int32_t doTeardownUdf(UdfcFuncHandle handle) {
197,400✔
2302
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
197,400✔
2303
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
197,400✔
2304

2305
  if (session->udfUvPipe == NULL) {
197,400✔
UNCOV
2306
    fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
×
UNCOV
2307
    taosMemoryFree(session);
×
UNCOV
2308
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
2309
  }
2310

2311
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
197,400✔
2312
  if (task == NULL) {
197,400✔
2313
    fnError("doTeardownUdf, failed to allocate memory for task");
×
2314
    taosMemoryFree(session);
×
2315
    return terrno;
×
2316
  }
2317
  task->session = session;
197,400✔
2318
  task->type = UDF_TASK_TEARDOWN;
197,400✔
2319

2320
  SUdfTeardownRequest *req = &task->_teardown.req;
197,400✔
2321
  req->udfHandle = task->session->severHandle;
197,400✔
2322

2323
  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
197,400✔
2324
  TAOS_CHECK_GOTO(code, &lino, _exit);
197,400✔
2325

2326
  code = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
197,400✔
2327
  TAOS_CHECK_GOTO(code, &lino, _exit);
197,400✔
2328

2329
  fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
197,400✔
2330
  // TODO: synchronization refactor between libuv event loop and request thread
2331
  // uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
2332
  // if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
2333
  //   SClientUvConn *conn = session->udfUvPipe->data;
2334
  //   conn->session = NULL;
2335
  // }
2336
  // uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
2337

2338
_exit:
197,400✔
2339
  if (code != 0) {
197,400✔
UNCOV
2340
    fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino);
×
2341
  }
2342
  freeTaskSession(task);
197,400✔
2343
  taosMemoryFree(task);
197,400✔
2344

2345
  return code;
197,400✔
2346
}
2347
#else
2348
#include "tudf.h"
2349

2350
int32_t cleanUpUdfs() { return 0; }
2351
int32_t udfcOpen() { return 0; }
2352
int32_t udfcClose() { return 0; }
2353
int32_t udfStartUdfd(int32_t startDnodeId) { return 0; }
2354
void    udfStopUdfd() { return; }
2355
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
2356
  return TSDB_CODE_OPS_NOT_SUPPORT;
2357
}
2358
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc