• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3526

10 Nov 2024 03:50AM UTC coverage: 60.225% (-0.6%) from 60.818%
#3526

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

117031 of 249004 branches covered (47.0%)

Branch coverage included in aggregate %.

130 of 169 new or added lines in 23 files covered. (76.92%)

4149 existing lines in 176 files now uncovered.

197577 of 273386 relevant lines covered (72.27%)

5840219.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.49
/source/libs/function/src/tudf.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "uv.h"
16

17
#include "os.h"
18

19
#include "builtinsimpl.h"
20
#include "fnLog.h"
21
#include "functionMgt.h"
22
#include "querynodes.h"
23
#include "tarray.h"
24
#include "tdatablock.h"
25
#include "tglobal.h"
26
#include "tudf.h"
27
#include "tudfInt.h"
28

29
#ifdef _TD_DARWIN_64
30
#include <mach-o/dyld.h>
31
#endif
32

33
typedef struct SUdfdData {
34
  bool         startCalled;
35
  bool         needCleanUp;
36
  uv_loop_t    loop;
37
  uv_thread_t  thread;
38
  uv_barrier_t barrier;
39
  uv_process_t process;
40
#ifdef WINDOWS
41
  HANDLE jobHandle;
42
#endif
43
  int32_t    spawnErr;
44
  uv_pipe_t  ctrlPipe;
45
  uv_async_t stopAsync;
46
  int32_t    stopCalled;
47

48
  int32_t dnodeId;
49
} SUdfdData;
50

51
SUdfdData udfdGlobal = {0};
52

53
int32_t udfStartUdfd(int32_t startDnodeId);
54
void    udfStopUdfd();
55

56
extern char **environ;
57

58
static int32_t udfSpawnUdfd(SUdfdData *pData);
59
void           udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
60
static void    udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
61
static void    udfUdfdStopAsyncCb(uv_async_t *async);
62
static void    udfWatchUdfd(void *args);
63

64
/*
 * Exit callback for the udfd child process; udfSpawnUdfd installs it as
 * options.exit_cb.
 *
 * A clean exit (status 0, no signal) or an exit after stopCalled was set is
 * only logged. Any other exit triggers a respawn through udfSpawnUdfd().
 *
 * NOTE(review): the respawn runs uv_pipe_init()/uv_spawn() again on the same
 * ctrlPipe/process handles without closing them first -- confirm that libuv
 * tolerates this handle reuse.
 */
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
  fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);

  SUdfdData *pUdfd = process->data;
  if (NULL == pUdfd) {
    fnError("udfd process data is NULL");
    return;
  }

  // stopCalled is consulted only when the exit was not clean (short-circuit).
  if ((exitStatus == 0 && termSignal == 0) || atomic_load_32(&pUdfd->stopCalled)) {
    fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
    return;
  }

  fnInfo("udfd process restart");
  int32_t rc = udfSpawnUdfd(pUdfd);
  if (rc != 0) {
    fnError("udfd process restart failed with code:%d", rc);
  }
}
81

82
/*
 * Spawn the udfd child process on pData->loop.
 *
 * Steps:
 *   1. Build the path of the udfd executable from the directory of tsProcPath
 *      (or a platform fallback when tsProcPath is NULL).
 *   2. Initialise pData->ctrlPipe and hand it to the child as stdin; stdout is
 *      ignored and stderr (fd 2) is inherited.
 *   3. Build the child environment: a deep copy of `environ` plus DNODE_ID,
 *      UV_THREADPOOL_SIZE, LD_LIBRARY_PATH and, when set in this process,
 *      TAOS_FQDN.
 *   4. uv_spawn() the child with udfUdfdExit as exit callback.
 *
 * Returns 0 on success, a libuv error code from uv_spawn(),
 * TSDB_CODE_OUT_OF_MEMORY when the environment copy cannot be allocated, or
 * terrno when the TAOS_FQDN item cannot be allocated.
 */
static int32_t udfSpawnUdfd(SUdfdData *pData) {
  fnInfo("start to init udfd");

  int32_t              err = 0;
  uv_process_options_t options = {0};

  char path[PATH_MAX] = {0};
  if (tsProcPath == NULL) {
    path[0] = '.';
#ifdef WINDOWS
    GetModuleFileName(NULL, path, PATH_MAX);
    TAOS_DIRNAME(path);
#elif defined(_TD_DARWIN_64)
    uint32_t pathSize = sizeof(path);
    _NSGetExecutablePath(path, &pathSize);
    TAOS_DIRNAME(path);
#endif
  } else {
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
    TAOS_DIRNAME(path);
  }
#ifdef WINDOWS
  if (strlen(path) == 0) {
    TAOS_STRCAT(path, "C:\\TDengine");
  }
  TAOS_STRCAT(path, "\\udfd.exe");
#else
  if (strlen(path) == 0) {
    TAOS_STRCAT(path, "/usr/bin");
  }
  TAOS_STRCAT(path, "/udfd");
#endif
  char *argsUdfd[] = {path, "-c", configDir, NULL};
  options.args = argsUdfd;
  options.file = path;

  options.exit_cb = udfUdfdExit;

  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));

  // stdin: control pipe created by libuv; stdout: ignored; stderr: inherited.
  uv_stdio_container_t child_stdio[3];
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
  child_stdio[1].flags = UV_IGNORE;
  child_stdio[2].flags = UV_INHERIT_FD;
  child_stdio[2].data.fd = 2;
  options.stdio_count = 3;
  options.stdio = child_stdio;

  options.flags = UV_PROCESS_DETACHED;

  char dnodeIdEnvItem[32] = {0};
  char thrdPoolSizeEnvItem[32] = {0};
  snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);

  // Thread pool size is twice the core count, with at least 2 cores assumed
  // (4 is used as the starting value if the query fails to overwrite it).
  float   numCpuCores = 4;
  int32_t code = taosGetCpuCores(&numCpuCores, false);
  if (code != 0) {
    fnError("failed to get cpu cores, code:0x%x", code);
  }
  numCpuCores = TMAX(numCpuCores, 2);
  snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);

  // LD_LIBRARY_PATH for udfd = "<tsUdfdLdLibPath>:<LD_LIBRARY_PATH of this process>".
  // pathTaosdLdLib stays an empty string when the variable is unset or does
  // not fit (UV_ENOBUFS), because the buffer is zero-initialised.
  char    pathTaosdLdLib[512] = {0};
  size_t  taosdLdLibPathLen = sizeof(pathTaosdLdLib);
  int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);

  // One bounded snprintf replaces the manual "copy, poke ':', copy" sequence,
  // which indexed the buffer with strlen(tsUdfdLdLibPath) and could therefore
  // write past the end when that path is 1024 bytes or longer.
  char    udfdPathLdLib[1024] = {0};
  int32_t ldLibLen = snprintf(udfdPathLdLib, sizeof(udfdPathLdLib), "%s:%s", tsUdfdLdLibPath, pathTaosdLdLib);
  if (ret != UV_ENOBUFS && ldLibLen >= 0 && (size_t)ldLibLen < sizeof(udfdPathLdLib)) {
    fnInfo("[UDFD]udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
  } else {
    // Either the parent's value did not fit or the combined value was truncated.
    fnError("[UDFD]can not set correct udfd LD_LIBRARY_PATH");
  }
  char ldLibPathEnvItem[1024 + 32] = {0};
  snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);

  char *taosFqdnEnvItem = NULL;
  char *taosFqdn = getenv("TAOS_FQDN");
  if (taosFqdn != NULL) {
    int32_t subLen = strlen(taosFqdn);
    int32_t len = strlen("TAOS_FQDN=") + subLen + 1;
    taosFqdnEnvItem = taosMemoryMalloc(len);
    if (taosFqdnEnvItem != NULL) {
      tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len);
      TAOS_STRNCAT(taosFqdnEnvItem, taosFqdn, subLen);
      fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
    } else {
      fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
      return terrno;
    }
  }

  // taosFqdnEnvItem may be NULL; it sits directly before the terminator so a
  // NULL there simply ends the list one entry earlier.
  char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};

  char **envUdfdWithPEnv = NULL;
  if (environ != NULL) {
    int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
    int32_t numEnviron = 0;
    while (environ[numEnviron] != NULL) {
      numEnviron++;
    }

    // Zero-filled, so the cleanup loop below stops at the first unset slot
    // even when an allocation fails half way through.
    envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
    if (envUdfdWithPEnv == NULL) {
      err = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    for (int32_t i = 0; i < numEnviron; i++) {
      int32_t len = strlen(environ[i]) + 1;
      envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
      if (envUdfdWithPEnv[i] == NULL) {
        err = TSDB_CODE_OUT_OF_MEMORY;
        goto _OVER;
      }

      tstrncpy(envUdfdWithPEnv[i], environ[i], len);
    }

    for (int32_t i = 0; i < lenEnvUdfd; i++) {
      if (envUdfd[i] != NULL) {
        int32_t len = strlen(envUdfd[i]) + 1;
        envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
        if (envUdfdWithPEnv[numEnviron + i] == NULL) {
          err = TSDB_CODE_OUT_OF_MEMORY;
          goto _OVER;
        }

        tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
      }
    }
    envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;

    options.env = envUdfdWithPEnv;
  } else {
    options.env = envUdfd;
  }

  err = uv_spawn(&pData->loop, &pData->process, &options);
  // udfUdfdExit retrieves the SUdfdData through process->data.
  pData->process.data = (void *)pData;

#ifdef WINDOWS
  // End udfd.exe by Job.
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
  pData->jobHandle = CreateJobObject(NULL, NULL);
  bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
  if (!add_job_ok) {
    fnError("Assign udfd to job failed.");
  } else {
    JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
    memset(&limit_info, 0x0, sizeof(limit_info));
    limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    bool set_auto_kill_ok =
        SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
    if (!set_auto_kill_ok) {
      fnError("Set job auto kill udfd failed.");
    }
  }
#endif

  if (err != 0) {
    fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
  } else {
    fnInfo("udfd is initialized");
  }

_OVER:
  if (taosFqdnEnvItem) {
    taosMemoryFree(taosFqdnEnvItem);
  }

  if (envUdfdWithPEnv != NULL) {
    int32_t i = 0;
    while (envUdfdWithPEnv[i] != NULL) {
      taosMemoryFree(envUdfdWithPEnv[i]);
      i++;
    }
    taosMemoryFree(envUdfdWithPEnv);
  }

  return err;
}
272

273
/*
 * uv_walk() callback: request a close on every handle that is not already
 * closing. No close callback is registered; `arg` is unused.
 */
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
11,887✔
278

279
/*
 * Async callback for SUdfdData.stopAsync: stops the loop stored in the
 * SUdfdData that async->data points to.
 */
static void udfUdfdStopAsyncCb(uv_async_t *async) {
  SUdfdData *pUdfd = (SUdfdData *)async->data;
  uv_loop_t *pLoop = &pUdfd->loop;
  uv_stop(pLoop);
}
2,361✔
283

284
static void udfWatchUdfd(void *args) {
2,361✔
285
  SUdfdData *pData = args;
2,361✔
286
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
2,361!
287
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb));
2,361!
288
  pData->stopAsync.data = pData;
2,361✔
289
  TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
2,361!
290
  atomic_store_32(&pData->spawnErr, 0);
2,361✔
291
  (void)uv_barrier_wait(&pData->barrier);
2,361✔
292
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
2,361✔
293
  fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
2,361!
294

295
  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
2,361✔
296
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
2,361✔
297
  fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
2,361!
298
  if (uv_loop_close(&pData->loop) != 0) {
2,361!
UNCOV
299
    fnError("udfd loop close failed, lino:%d", __LINE__);
×
300
  }
301
  return;
2,361✔
302

303
_exit:
×
UNCOV
304
  if (terrno != 0) {
×
UNCOV
305
    (void)uv_barrier_wait(&pData->barrier);
×
UNCOV
306
    atomic_store_32(&pData->spawnErr, terrno);
×
307
    if (uv_loop_close(&pData->loop) != 0) {
×
308
      fnError("udfd loop close failed, lino:%d", __LINE__);
×
309
    }
310
    fnError("udfd thread exit with code:%d lino:%d", terrno, terrln);
×
311
    terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
312
  }
UNCOV
313
  return;
×
314
}
315

316
int32_t udfStartUdfd(int32_t startDnodeId) {
2,362✔
317
  int32_t code = 0, lino = 0;
2,362✔
318
  if (!tsStartUdfd) {
2,362✔
319
    fnInfo("start udfd is disabled.") return 0;
1!
320
  }
321
  SUdfdData *pData = &udfdGlobal;
2,361✔
322
  if (pData->startCalled) {
2,361!
UNCOV
323
    fnInfo("dnode start udfd already called");
×
UNCOV
324
    return 0;
×
325
  }
326
  pData->startCalled = true;
2,361✔
327
  char dnodeId[8] = {0};
2,361✔
328
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
2,361✔
329
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
2,361!
330
  pData->dnodeId = startDnodeId;
2,361✔
331

332
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
2,361!
333
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit);
2,361!
334
  (void)uv_barrier_wait(&pData->barrier);
2,361✔
335
  int32_t err = atomic_load_32(&pData->spawnErr);
2,361✔
336
  if (err != 0) {
2,361!
UNCOV
337
    uv_barrier_destroy(&pData->barrier);
×
UNCOV
338
    if (uv_async_send(&pData->stopAsync) != 0) {
×
UNCOV
339
      fnError("start udfd: failed to send stop async");
×
340
    }
341
    if (uv_thread_join(&pData->thread) != 0) {
×
342
      fnError("start udfd: failed to join udfd thread");
×
343
    }
UNCOV
344
    pData->needCleanUp = false;
×
345
    fnInfo("udfd is cleaned up after spawn err");
×
346
    TAOS_CHECK_GOTO(err, &lino, _exit);
×
347
  } else {
348
    pData->needCleanUp = true;
2,361✔
349
  }
350
_exit:
2,361✔
351
  if (code != 0) {
2,361!
UNCOV
352
    fnError("udfd start failed with code:%d, lino:%d", code, lino);
×
353
  }
354
  return code;
2,361✔
355
}
356

357
void udfStopUdfd() {
2,362✔
358
  SUdfdData *pData = &udfdGlobal;
2,362✔
359
  fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
2,362!
360
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
2,362!
361
    return;
1✔
362
  }
363
  atomic_store_32(&pData->stopCalled, 1);
2,361✔
364
  pData->needCleanUp = false;
2,361✔
365
  uv_barrier_destroy(&pData->barrier);
2,361✔
366
  if (uv_async_send(&pData->stopAsync) != 0) {
2,361!
UNCOV
367
    fnError("stop udfd: failed to send stop async");
×
368
  }
369
  if (uv_thread_join(&pData->thread) != 0) {
2,361!
UNCOV
370
    fnError("stop udfd: failed to join udfd thread");
×
371
  }
372

373
#ifdef WINDOWS
374
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
375
#endif
376
  fnInfo("udfd is cleaned up");
2,361!
377
  return;
2,361✔
378
}
379

380
/*
381
int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
382
  SUdfdData *pData = &udfdGlobal;
383
  if (pData->spawnErr) {
384
    return pData->spawnErr;
385
  }
386
  uv_pid_t pid = uv_process_get_pid(&pData->process);
387
  if (pUdfdPid) {
388
    *pUdfdPid = (int32_t)pid;
389
  }
390
  return TSDB_CODE_SUCCESS;
391
}
392
*/
393

394
//==============================================================================================
395
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
396
 * The QUEUE is copied from queue.h under libuv
397
 * */
398

399
typedef void *QUEUE[2];
400

401
/* Private macros. */
402
#define QUEUE_NEXT(q)      (*(QUEUE **)&((*(q))[0]))
403
#define QUEUE_PREV(q)      (*(QUEUE **)&((*(q))[1]))
404
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
405
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
406

407
/* Public macros. */
408
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field)))
409

410
/* Important note: mutating the list while QUEUE_FOREACH is
411
 * iterating over its elements results in undefined behavior.
412
 */
413
#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
414

415
#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q))
416

417
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
418

419
#define QUEUE_INIT(q)    \
420
  do {                   \
421
    QUEUE_NEXT(q) = (q); \
422
    QUEUE_PREV(q) = (q); \
423
  } while (0)
424

425
#define QUEUE_ADD(h, n)                 \
426
  do {                                  \
427
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
428
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
429
    QUEUE_PREV(h) = QUEUE_PREV(n);      \
430
    QUEUE_PREV_NEXT(h) = (h);           \
431
  } while (0)
432

433
#define QUEUE_SPLIT(h, q, n)       \
434
  do {                             \
435
    QUEUE_PREV(n) = QUEUE_PREV(h); \
436
    QUEUE_PREV_NEXT(n) = (n);      \
437
    QUEUE_NEXT(n) = (q);           \
438
    QUEUE_PREV(h) = QUEUE_PREV(q); \
439
    QUEUE_PREV_NEXT(h) = (h);      \
440
    QUEUE_PREV(q) = (n);           \
441
  } while (0)
442

443
#define QUEUE_MOVE(h, n)        \
444
  do {                          \
445
    if (QUEUE_EMPTY(h))         \
446
      QUEUE_INIT(n);            \
447
    else {                      \
448
      QUEUE *q = QUEUE_HEAD(h); \
449
      QUEUE_SPLIT(h, q, n);     \
450
    }                           \
451
  } while (0)
452

453
#define QUEUE_INSERT_HEAD(h, q)    \
454
  do {                             \
455
    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
456
    QUEUE_PREV(q) = (h);           \
457
    QUEUE_NEXT_PREV(q) = (q);      \
458
    QUEUE_NEXT(h) = (q);           \
459
  } while (0)
460

461
#define QUEUE_INSERT_TAIL(h, q)    \
462
  do {                             \
463
    QUEUE_NEXT(q) = (h);           \
464
    QUEUE_PREV(q) = QUEUE_PREV(h); \
465
    QUEUE_PREV_NEXT(q) = (q);      \
466
    QUEUE_PREV(h) = (q);           \
467
  } while (0)
468

469
#define QUEUE_REMOVE(q)                 \
470
  do {                                  \
471
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
472
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
473
  } while (0)
474

475
enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
476

477
int64_t gUdfTaskSeqNum = 0;
478
typedef struct SUdfcFuncStub {
479
  char           udfName[TSDB_FUNC_NAME_LEN + 1];
480
  UdfcFuncHandle handle;
481
  int32_t        refCount;
482
  int64_t        createTime;
483
} SUdfcFuncStub;
484

485
typedef struct SUdfcProxy {
486
  char         udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
487
  uv_barrier_t initBarrier;
488

489
  uv_loop_t   uvLoop;
490
  uv_thread_t loopThread;
491
  uv_async_t  loopTaskAync;
492

493
  uv_async_t loopStopAsync;
494

495
  uv_mutex_t taskQueueMutex;
496
  int8_t     udfcState;
497
  QUEUE      taskQueue;
498
  QUEUE      uvProcTaskQueue;
499

500
  uv_mutex_t udfStubsMutex;
501
  SArray    *udfStubs;         // SUdfcFuncStub
502
  SArray    *expiredUdfStubs;  // SUdfcFuncStub
503

504
  uv_mutex_t udfcUvMutex;
505
  int8_t     initialized;
506
} SUdfcProxy;
507

508
SUdfcProxy gUdfcProxy = {0};
509

510
typedef struct SUdfcUvSession {
511
  SUdfcProxy *udfc;
512
  int64_t     severHandle;
513
  uv_pipe_t  *udfUvPipe;
514

515
  int8_t  outputType;
516
  int32_t bytes;
517
  int32_t bufSize;
518

519
  char udfName[TSDB_FUNC_NAME_LEN + 1];
520
} SUdfcUvSession;
521

522
typedef struct SClientUvTaskNode {
523
  SUdfcProxy *udfc;
524
  int8_t      type;
525
  int32_t     errCode;
526

527
  uv_pipe_t *pipe;
528

529
  int64_t  seqNum;
530
  uv_buf_t reqBuf;
531

532
  uv_sem_t taskSem;
533
  uv_buf_t rspBuf;
534

535
  QUEUE recvTaskQueue;
536
  QUEUE procTaskQueue;
537
  QUEUE connTaskQueue;
538
} SClientUvTaskNode;
539

540
typedef struct SClientUdfTask {
541
  int8_t type;
542

543
  SUdfcUvSession *session;
544

545
  union {
546
    struct {
547
      SUdfSetupRequest  req;
548
      SUdfSetupResponse rsp;
549
    } _setup;
550
    struct {
551
      SUdfCallRequest  req;
552
      SUdfCallResponse rsp;
553
    } _call;
554
    struct {
555
      SUdfTeardownRequest  req;
556
      SUdfTeardownResponse rsp;
557
    } _teardown;
558
  };
559

560
} SClientUdfTask;
561

562
typedef struct SClientConnBuf {
563
  char   *buf;
564
  int32_t len;
565
  int32_t cap;
566
  int32_t total;
567
} SClientConnBuf;
568

569
typedef struct SClientUvConn {
570
  uv_pipe_t      *pipe;
571
  QUEUE           taskQueue;
572
  SClientConnBuf  readBuf;
573
  SUdfcUvSession *session;
574
} SClientUvConn;
575

576
enum {
577
  UDFC_STATE_INITAL = 0,  // initial state
578
  UDFC_STATE_STARTNG,     // starting after udfcOpen
579
  UDFC_STATE_READY,       // started and begins to receive requests
580
  UDFC_STATE_STOPPING,    // stopping after udfcClose
581
};
582

583
void    getUdfdPipeName(char *pipeName, int32_t size);
584
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
585
void   *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
586
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
587
void   *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state);
588
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
589
void   *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call);
590
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
591
void   *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown);
592
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request);
593
void   *decodeUdfRequest(const void *buf, SUdfRequest *request);
594
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
595
void   *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp);
596
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
597
void   *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp);
598
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp);
599
void   *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse);
600
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp);
601
void   *decodeUdfResponse(const void *buf, SUdfResponse *rsp);
602
void    freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
603
void    freeUdfColumn(SUdfColumn *col);
604
void    freeUdfDataDataBlock(SUdfDataBlock *block);
605
void    freeUdfInterBuf(SUdfInterBuf *buf);
606
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
607
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
608
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
609
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
610

611
void getUdfdPipeName(char *pipeName, int32_t size) {
4,800✔
612
  char    dnodeId[8] = {0};
4,800✔
613
  size_t  dnodeIdSize = sizeof(dnodeId);
4,800✔
614
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
4,800✔
615
  if (err != 0) {
4,800✔
616
    fnError("failed to get dnodeId from env since %s", uv_err_name(err));
1!
617
    dnodeId[0] = '1';
1✔
618
  }
619
#ifdef _WIN32
620
  snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir)),
621
           dnodeId);
622
#else
623
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
4,800✔
624
#endif
625
  fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName);
4,800!
626
}
4,800✔
627

628
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
1,664✔
629
  int32_t len = 0;
1,664✔
630
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
1,664✔
631
  return len;
1,664✔
632
}
633

634
/* Decode a setup request into request->udfName; returns the position after the consumed bytes. */
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request) {
  const void *next = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
  return (void *)next;
}
638

639
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state) {
16,974✔
640
  int32_t len = 0;
16,974✔
641
  len += taosEncodeFixedI8(buf, state->numOfResult);
16,974✔
642
  len += taosEncodeFixedI32(buf, state->bufLen);
16,974✔
643
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
16,974✔
644
  return len;
16,974✔
645
}
646

647
/*
 * Decode an intermediate buffer written by encodeUdfInterBuf: numOfResult,
 * bufLen, then the payload (stored through state->buf). Returns the position
 * after the consumed bytes.
 */
void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state) {
  const void *cursor = buf;

  cursor = taosDecodeFixedI8(cursor, &state->numOfResult);
  cursor = taosDecodeFixedI32(cursor, &state->bufLen);
  // The payload length comes from the bufLen field decoded just above.
  cursor = taosDecodeBinary(cursor, (void **)&state->buf, state->bufLen);

  return (void *)cursor;
}
653

654
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
65,972✔
655
  int32_t len = 0;
65,972✔
656
  len += taosEncodeFixedI64(buf, call->udfHandle);
65,972✔
657
  len += taosEncodeFixedI8(buf, call->callType);
65,972✔
658
  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
65,972✔
659
    len += tEncodeDataBlock(buf, &call->block);
55,526✔
660
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
10,446✔
661
    len += taosEncodeFixedI8(buf, call->initFirst);
4,452✔
662
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
8,220✔
663
    len += tEncodeDataBlock(buf, &call->block);
6,058✔
664
    len += encodeUdfInterBuf(buf, &call->interBuf);
6,058✔
665
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
2,162!
UNCOV
666
    len += encodeUdfInterBuf(buf, &call->interBuf);
×
UNCOV
667
    len += encodeUdfInterBuf(buf, &call->interBuf2);
×
668
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
2,162!
669
    len += encodeUdfInterBuf(buf, &call->interBuf);
2,166✔
670
  }
671
  return len;
65,885✔
672
}
673

674
/*
 * Decode a call request written by encodeUdfCallRequest: udfHandle, callType
 * and the callType-specific payload. Unknown call types consume no payload.
 * Returns the position after the consumed bytes.
 */
void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI64(cursor, &call->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &call->callType);

  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
    cursor = taosDecodeFixedI8(cursor, &call->initFirst);
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf2);
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  }
  return (void *)cursor;
}
698

699
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
1,546✔
700
  int32_t len = 0;
1,546✔
701
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
1,546✔
702
  return len;
1,546✔
703
}
704

705
/* Decode a teardown request (udfHandle); returns the position after the consumed bytes. */
void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown) {
  const void *next = buf;
  next = taosDecodeFixedI64(next, &teardown->udfHandle);
  return (void *)next;
}
709

710
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request) {
69,139✔
711
  int32_t len = 0;
69,139✔
712
  if (buf == NULL) {
69,139✔
713
    len += sizeof(request->msgLen);
34,587✔
714
  } else {
715
    *(int32_t *)(*buf) = request->msgLen;
34,552✔
716
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
34,552✔
717
  }
718
  len += taosEncodeFixedI64(buf, request->seqNum);
69,139✔
719
  len += taosEncodeFixedI8(buf, request->type);
69,139✔
720
  if (request->type == UDF_TASK_SETUP) {
69,139✔
721
    len += encodeUdfSetupRequest(buf, &request->setup);
1,664✔
722
  } else if (request->type == UDF_TASK_CALL) {
67,475✔
723
    len += encodeUdfCallRequest(buf, &request->call);
66,022✔
724
  } else if (request->type == UDF_TASK_TEARDOWN) {
1,453!
725
    len += encodeUdfTeardownRequest(buf, &request->teardown);
1,546✔
726
  }
727
  return len;
69,010✔
728
}
729

730
/*
 * Decode a UDF request: raw msgLen header, seqNum, type, then the body that
 * matches the type. Unknown types consume no body. Returns the position after
 * the consumed bytes.
 */
void *decodeUdfRequest(const void *buf, SUdfRequest *request) {
  const void *cursor = buf;

  request->msgLen = *(int32_t *)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(request->msgLen));

  cursor = taosDecodeFixedI64(cursor, &request->seqNum);
  cursor = taosDecodeFixedI8(cursor, &request->type);

  switch (request->type) {
    case UDF_TASK_SETUP:
      cursor = decodeUdfSetupRequest(cursor, &request->setup);
      break;
    case UDF_TASK_CALL:
      cursor = decodeUdfCallRequest(cursor, &request->call);
      break;
    case UDF_TASK_TEARDOWN:
      cursor = decodeUdfTeardownRequest(cursor, &request->teardown);
      break;
    default:
      break;
  }
  return (void *)cursor;
}
746

747
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
1,488✔
748
  int32_t len = 0;
1,488✔
749
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
1,488✔
750
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
1,488✔
751
  len += taosEncodeFixedI32(buf, setupRsp->bytes);
1,488✔
752
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
1,488✔
753
  return len;
1,488✔
754
}
755

756
/*
 * Decode a setup response in the order written by encodeUdfSetupResponse:
 * udfHandle, outputType, bytes, bufSize. Returns the position after the
 * consumed bytes.
 */
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
  const void *cursor = buf;

  cursor = taosDecodeFixedI64(cursor, &setupRsp->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &setupRsp->outputType);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bytes);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bufSize);

  return (void *)cursor;
}
763

764
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
63,897✔
765
  int32_t len = 0;
63,897✔
766
  len += taosEncodeFixedI8(buf, callRsp->callType);
63,897✔
767
  switch (callRsp->callType) {
63,897!
768
    case TSDB_UDF_CALL_SCALA_PROC:
55,149✔
769
      len += tEncodeDataBlock(buf, &callRsp->resultData);
55,149✔
770
      break;
55,061✔
771
    case TSDB_UDF_CALL_AGG_INIT:
1,834✔
772
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
1,834✔
773
      break;
1,834✔
774
    case TSDB_UDF_CALL_AGG_PROC:
5,142✔
775
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
5,142✔
776
      break;
5,142✔
UNCOV
777
    case TSDB_UDF_CALL_AGG_MERGE:
×
UNCOV
778
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
×
UNCOV
779
      break;
×
780
    case TSDB_UDF_CALL_AGG_FIN:
1,774✔
781
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
1,774✔
782
      break;
1,774✔
783
  }
784
  return len;
63,809✔
785
}
786

787
/*
 * Decode a call response written by encodeUdfCallResponse: callType, then a
 * data block for scalar calls or an intermediate buffer for the aggregate
 * call types. Unknown call types consume no payload. Returns the position
 * after the consumed bytes.
 */
void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
  const void *cursor = taosDecodeFixedI8(buf, &callRsp->callType);

  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      cursor = tDecodeDataBlock(cursor, &callRsp->resultData);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
    case TSDB_UDF_CALL_AGG_PROC:
    case TSDB_UDF_CALL_AGG_MERGE:
    case TSDB_UDF_CALL_AGG_FIN:
      cursor = decodeUdfInterBuf(cursor, &callRsp->resultBuf);
      break;
    default:
      break;
  }
  return (void *)cursor;
}
808

809
/* A teardown response has no body: nothing is written and the length is 0. */
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp) {
  return 0;
}
1,372✔
810

811
/**
 * Teardown responses have no payload: nothing is read and the read position
 * is handed back unchanged.
 */
void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse) {
  (void)teardownResponse;
  return (void *)buf;
}
773✔
812

813
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) {
66,811✔
814
  int32_t len = 0;
66,811✔
815
  len += sizeof(rsp->msgLen);
66,811✔
816
  if (buf != NULL) {
66,811✔
817
    *(int32_t *)(*buf) = rsp->msgLen;
33,452✔
818
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
33,452✔
819
  }
820

821
  len += sizeof(rsp->seqNum);
66,811✔
822
  if (buf != NULL) {
66,811✔
823
    *(int64_t *)(*buf) = rsp->seqNum;
33,456✔
824
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
33,456✔
825
  }
826

827
  len += taosEncodeFixedI64(buf, rsp->seqNum);
66,811✔
828
  len += taosEncodeFixedI8(buf, rsp->type);
66,811✔
829
  len += taosEncodeFixedI32(buf, rsp->code);
66,811✔
830

831
  switch (rsp->type) {
66,811!
832
    case UDF_TASK_SETUP:
1,488✔
833
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
1,488✔
834
      break;
1,488✔
835
    case UDF_TASK_CALL:
63,981✔
836
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
63,981✔
837
      break;
63,753✔
838
    case UDF_TASK_TEARDOWN:
1,372✔
839
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
1,372✔
840
      break;
1,372✔
UNCOV
841
    default:
×
UNCOV
842
      fnError("encode udf response, invalid udf response type %d", rsp->type);
×
UNCOV
843
      break;
×
844
  }
845
  return len;
66,613✔
846
}
847

848
/**
 * Deserialize a UDF response message produced by encodeUdfResponse.
 *
 * Layout read:
 *   raw msgLen | raw seqNum | fixed-i64 seqNum | fixed-i8 type | fixed-i32 code | body
 *
 * The two raw header fields are copied with memcpy instead of being loaded
 * through a cast pointer: the raw seqNum sits sizeof(msgLen) bytes after the
 * start of the message, so a direct int64_t load may be misaligned.
 *
 * @param buf  start of the encoded message
 * @param rsp  response to fill in; rsp->code is overwritten with
 *             TSDB_CODE_UDF_INTERNAL_ERROR for an unknown type, or with
 *             terrno when a body decoder returned NULL
 * @return read position after the message, or NULL when decoding failed
 */
void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
  int32_t msgLen = 0;
  memcpy(&msgLen, buf, sizeof(msgLen));
  rsp->msgLen = msgLen;
  buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));

  int64_t seqNum = 0;
  memcpy(&seqNum, buf, sizeof(seqNum));
  rsp->seqNum = seqNum;
  buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum));

  // the encoder writes seqNum twice; consume the second copy as well
  buf = taosDecodeFixedI64(buf, &rsp->seqNum);
  buf = taosDecodeFixedI8(buf, &rsp->type);
  buf = taosDecodeFixedI32(buf, &rsp->code);

  switch (rsp->type) {
    case UDF_TASK_SETUP:
      buf = decodeUdfSetupResponse(buf, &rsp->setupRsp);
      break;
    case UDF_TASK_CALL:
      buf = decodeUdfCallResponse(buf, &rsp->callRsp);
      break;
    case UDF_TASK_TEARDOWN:
      buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
      break;
    default:
      rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR;
      fnError("decode udf response, invalid udf response type %d", rsp->type);
      break;
  }
  if (buf == NULL) {
    rsp->code = terrno;
    fnError("decode udf response failed, code:0x%x", rsp->code);
  }
  return (void *)buf;
}
878

879
/**
 * Release the two buffers owned by a UDF column and null the pointers.
 *
 * @param data  column storage; which union member is used depends on meta->type
 * @param meta  column metadata; its type selects the var-length or fixed-length layout
 */
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
  if (IS_VAR_DATA_TYPE(meta->type)) {
    // var-length layout: offsets array + payload
    taosMemoryFree(data->varLenCol.varOffsets);
    data->varLenCol.varOffsets = NULL;
    taosMemoryFree(data->varLenCol.payload);
    data->varLenCol.payload = NULL;
    return;
  }

  // fixed-length layout: null bitmap + data
  taosMemoryFree(data->fixLenCol.nullBitmap);
  data->fixLenCol.nullBitmap = NULL;
  taosMemoryFree(data->fixLenCol.data);
  data->fixLenCol.data = NULL;
}
58,579✔
892

893
/** Release the buffers held by a UDF column; the SUdfColumn itself is not freed. */
void freeUdfColumn(SUdfColumn *col) {
  SUdfColumnData *storage = &col->colData;
  SUdfColumnMeta *meta = &col->colMeta;
  freeUdfColumnData(storage, meta);
}
58,551✔
894

895
/**
 * Release every column of a UDF data block and the column pointer array.
 *
 * Tolerates a partially built block: a NULL block, a NULL column array, or
 * NULL entries inside the array are skipped. convertDataBlockToUdfDataBlock
 * leaves exactly that state behind when one of its allocations fails
 * (numOfCols is already set, later entries are still zero from calloc).
 *
 * @param block  block whose columns are freed; the SUdfDataBlock itself is not freed
 */
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  if (block == NULL || block->udfCols == NULL) {
    return;
  }
  for (int32_t i = 0; i < block->numOfCols; ++i) {
    SUdfColumn *col = block->udfCols[i];
    if (col == NULL) {
      continue;
    }
    freeUdfColumn(col);
    taosMemoryFree(col);
    block->udfCols[i] = NULL;
  }
  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}
30,218✔
904

905
/** Free the memory behind an intermediate buffer and null the pointer; the other fields are left untouched. */
void freeUdfInterBuf(SUdfInterBuf *buf) {
  char *owned = buf->buf;
  buf->buf = NULL;
  taosMemoryFree(owned);
}
13,030✔
909

910
/**
 * Deep-copy an SSDataBlock into the UDF-facing SUdfDataBlock representation.
 *
 * For every column the metadata is copied and fresh buffers are allocated:
 * offsets + payload for var-length types, null bitmap + data for fixed-length
 * types.
 *
 * Fix: for reassigned var-length columns the rows are packed one after
 * another into the payload. The previous code advanced the stored `payload`
 * pointer while packing, so afterwards it pointed past the copied bytes and
 * was no longer the pointer that had been allocated. A local write cursor is
 * used instead.
 *
 * @param block     source block
 * @param udfBlock  destination; numOfRows/numOfCols/udfCols are overwritten
 * @return TSDB_CODE_SUCCESS, or terrno when an allocation fails. On failure
 *         the already-allocated parts stay attached to udfBlock and are not
 *         released here.
 */
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  udfBlock->numOfRows = block->info.rows;
  udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
  udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
  if ((udfBlock->udfCols) == NULL) {
    return terrno;
  }
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
    udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
    if (udfBlock->udfCols[i] == NULL) {
      return terrno;
    }
    SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
    SUdfColumn      *udfCol = udfBlock->udfCols[i];
    udfCol->colMeta.type = col->info.type;
    udfCol->colMeta.bytes = col->info.bytes;
    udfCol->colMeta.scale = col->info.scale;
    udfCol->colMeta.precision = col->info.precision;
    udfCol->colData.numOfRows = udfBlock->numOfRows;
    udfCol->hasNull = col->hasNull;
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
      // one int32_t offset per row, copied verbatim from the source column
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
      if (udfCol->colData.varLenCol.varOffsets == NULL) {
        return terrno;
      }
      memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
      if (udfCol->colData.varLenCol.payload == NULL) {
        return terrno;
      }
      if (col->reassigned) {
        // Pack each row's value back to back. `dst` is a private cursor so
        // that `payload` keeps pointing at the start of the allocation.
        // NOTE(review): varOffsets were copied from the source layout above
        // and are not rewritten to match this packed layout - confirm that
        // consumers of reassigned columns expect that.
        char *dst = (char *)udfCol->colData.varLenCol.payload;
        for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
          char   *pColData = col->pData + col->varmeta.offset[row];
          int32_t colSize = 0;
          if (col->info.type == TSDB_DATA_TYPE_JSON) {
            colSize = getJsonValueLen(pColData);
          } else {
            colSize = varDataTLen(pColData);
          }
          memcpy(dst, pColData, colSize);
          dst += colSize;
        }
      } else {
        memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
      }
    } else {
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
      int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
      if (udfCol->colData.fixLenCol.nullBitmap == NULL) {
        return terrno;
      }
      char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
      memcpy(bitmap, col->nullbitmap, bitmapLen);
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
      int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
      if (NULL == udfCol->colData.fixLenCol.data) {
        return terrno;
      }
      char *data = udfCol->colData.fixLenCol.data;
      memcpy(data, col->pData, dataLen);
    }
  }
  return TSDB_CODE_SUCCESS;
}
978

979
/**
 * Append a UDF result column to `block` as its data column and copy every row.
 *
 * Fix: the function used to return TSDB_CODE_SUCCESS unconditionally, so a
 * failure in any step was logged and then reported to the caller as success.
 * The recorded error code is now returned.
 *
 * @param udfCol  source column produced by the UDF
 * @param block   destination block; a column described by udfCol->colMeta is
 *                appended, and info.rows is set only on success
 * @return TSDB_CODE_SUCCESS, or the code of the first step that failed
 */
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  int32_t         code = 0, lino = 0;
  SUdfColumnMeta *meta = &udfCol->colMeta;

  SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
  code = blockDataAppendColInfo(block, &colInfoData);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  // rows are written into column 0 of the block
  SColumnInfoData *col = NULL;
  code = bdGetColumnInfoData(block, 0, &col);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) {
    if (udfColDataIsNull(udfCol, i)) {
      colDataSetNULL(col, i);
    } else {
      char *data = udfColDataGetData(udfCol, i);
      code = colDataSetVal(col, i, data, false);
      TAOS_CHECK_GOTO(code, &lino, _exit);
    }
  }
  block->info.rows = udfCol->colData.numOfRows;

_exit:
  if (code != 0) {
    fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino);
  }
  return code;
}
1011

1012
/**
 * Build one data block out of an array of scalar parameters.
 *
 * The block gets one column per parameter and as many rows as the longest
 * parameter. A shorter parameter is padded by repeating its last row (or by
 * NULLs when that last row is NULL).
 *
 * @param input      array of numOfCols scalar parameters
 * @param numOfCols  number of entries in input
 * @param output     block to append the columns to; info.rows is set on success
 * @return 0 on success, otherwise the code of the first step that failed
 */
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  int32_t code = 0, lino = 0;

  // the widest parameter decides the row count of the block
  int32_t maxRows = 0;
  for (int32_t c = 0; c < numOfCols; ++c) {
    if (input[c].numOfRows > maxRows) {
      maxRows = input[c].numOfRows;
    }
  }

  // describe every column first; only the column info is carried over here
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData colDesc = {0};
    colDesc.info = input[c].columnData->info;

    TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &colDesc), &lino, _exit);
  }

  TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, maxRows), &lino, _exit);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SScalarParam    *param = input + c;
    SColumnInfoData *pDest = taosArrayGet(output->pDataBlock, c);
    SColumnInfoData *pSrc = param->columnData;

    TAOS_CHECK_GOTO(colDataAssign(pDest, pSrc, param->numOfRows, &output->info), &lino, _exit);

    if (param->numOfRows >= maxRows) {
      continue;
    }

    // pad the tail with the parameter's last row
    int32_t firstPadRow = param->numOfRows;
    int32_t padRows = maxRows - firstPadRow;
    int32_t lastRow = param->numOfRows - 1;
    if (colDataIsNull_s(pSrc, lastRow)) {
      colDataSetNNULL(pDest, firstPadRow, padRows);
    } else {
      char *lastVal = colDataGetData(pSrc, lastRow);
      for (int32_t k = 0; k < padRows; ++k) {
        TAOS_CHECK_GOTO(colDataSetVal(pDest, firstPadRow + k, lastVal, false), &lino, _exit);
      }
    }
  }

  output->info.rows = maxRows;
_exit:
  if (code != 0) {
    fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino);
  }
  return code;
}
1058

1059
/**
 * Turn a single-column result block into a scalar parameter.
 *
 * The column descriptor is copied by value into a freshly allocated
 * SColumnInfoData (a shallow copy of the struct held in the block), and
 * output->colAlloced is set to mark that allocation.
 *
 * Fix: a block that does not have exactly one column used to be logged as an
 * error and then reported as success with `output` untouched. It now returns
 * TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, the same code callUdfScalarFunc reports
 * for a result without column data.
 *
 * @param input   result block, expected to hold exactly one column
 * @param output  scalar parameter to fill in
 * @return 0 on success, TSDB_CODE_UDF_INVALID_OUTPUT_TYPE for a wrong column
 *         count, or terrno when the allocation fails
 */
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
  if (taosArrayGetSize(input->pDataBlock) != 1) {
    fnError("scalar function only support one column");
    return TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  }
  output->numOfRows = input->info.rows;

  output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
  if (output->columnData == NULL) {
    return terrno;
  }
  memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
  output->colAlloced = true;

  return 0;
}
1075

1076
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
1077
// memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
1078
/**
 * Header placed at the start of an aggregate UDF's result cell. The final
 * result bytes and then the intermediate result bytes follow it in the same
 * allocation.
 */
typedef struct SUdfAggRes {
  int8_t  finalResNum;     // number of final results
  int8_t  interResNum;     // number of intermediate results
  int32_t interResBufLen;  // bytes currently used in interResBuf
  char   *finalResBuf;     // set by the agg functions to the byte right after this header
  char   *interResBuf;     // set by the agg functions to the byte right after the final result area
} SUdfAggRes;
1085

1086
void    onUdfcPipeClose(uv_handle_t *handle);
1087
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
1088
void    udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
1089
bool    isUdfcUvMsgComplete(SClientConnBuf *connBuf);
1090
void    udfcUvHandleRsp(SClientUvConn *conn);
1091
void    udfcUvHandleError(SClientUvConn *conn);
1092
void    onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
1093
void    onUdfcPipeWrite(uv_write_t *write, int32_t status);
1094
void    onUdfcPipeConnect(uv_connect_t *connect, int32_t status);
1095
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
1096
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
1097
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
1098
void    udfcAsyncTaskCb(uv_async_t *async);
1099
void    cleanUpUvTasks(SUdfcProxy *udfc);
1100
void    udfStopAsyncCb(uv_async_t *async);
1101
void    constructUdfService(void *argsThread);
1102
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
1103
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
1104
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2);
1105
int32_t doTeardownUdf(UdfcFuncHandle handle);
1106

1107
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
1108
                SSDataBlock *output, SUdfInterBuf *newState);
1109
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
1110
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
1111
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
1112
                          SUdfInterBuf *resultBuf);
1113
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
1114
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1115
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1116

1117
int32_t udfcOpen();
1118
int32_t udfcClose();
1119

1120
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
1121
void    releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
1122
int32_t cleanUpUdfs();
1123

1124
bool    udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
1125
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
1126
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
1127
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
1128

1129
void    cleanupNotExpiredUdfs();
1130
void    cleanupExpiredUdfs();
1131
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
107,680✔
1132
  SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
107,680✔
1133
  SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
107,680✔
1134
  return strcmp(stub1->udfName, stub2->udfName);
107,680✔
1135
}
1136

1137
/**
 * Get a handle for the named UDF, reusing a cached one when possible.
 *
 * Under udfStubsMutex the stub cache is searched by name:
 *  - a stub younger than 10 seconds with a live pipe is reused (refCount is
 *    incremented and the function returns immediately);
 *  - a young stub whose handle is NULL or has no pipe is dropped from the cache;
 *  - an older stub is moved to the expired list.
 * In every case but the first, a new UDF is set up with doSetupUdf (outside
 * the mutex) and, on success, cached with refCount 1.
 *
 * @param udfName  name of the UDF
 * @param pHandle  receives the handle, or NULL when setup fails
 * @return 0 on success, otherwise the code returned by doSetupUdf
 */
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
  int32_t code = 0;

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  SUdfcFuncStub probe = {0};
  tstrncpy(probe.udfName, udfName, TSDB_FUNC_NAME_LEN);
  int32_t cachedIdx = taosArraySearchIdx(gUdfcProxy.udfStubs, &probe, compareUdfcFuncSub, TD_EQ);
  if (cachedIdx != -1) {
    SUdfcFuncStub *cached = taosArrayGet(gUdfcProxy.udfStubs, cachedIdx);
    UdfcFuncHandle cachedHandle = cached->handle;
    int64_t        nowUs = taosGetTimestampUs();
    bool           isExpired = (nowUs - cached->createTime) >= 10 * 1000 * 1000;
    if (isExpired) {
      fnDebug("udf handle expired for %s, will setup udf. move it to expired list", udfName);
      if (taosArrayPush(gUdfcProxy.expiredUdfStubs, cached) == NULL) {
        // the stub stays in the active cache when it cannot be moved
        fnError("acquireUdfFuncHandle: failed to push udf stub to array");
      } else {
        taosArrayRemove(gUdfcProxy.udfStubs, cachedIdx);
        taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
      }
    } else if (cachedHandle != NULL && ((SUdfcUvSession *)cachedHandle)->udfUvPipe != NULL) {
      // fast path: fresh stub with a live pipe
      *pHandle = cached->handle;
      ++cached->refCount;
      uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
      return 0;
    } else {
      fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
             cached->refCount, cached->createTime);
      taosArrayRemove(gUdfcProxy.udfStubs, cachedIdx);
    }
  }
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);

  // slow path: set up a new UDF without holding the cache mutex
  *pHandle = NULL;
  code = doSetupUdf(udfName, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    *pHandle = NULL;
    return code;
  }

  SUdfcFuncStub fresh = {0};
  tstrncpy(fresh.udfName, udfName, TSDB_FUNC_NAME_LEN);
  fresh.handle = *pHandle;
  ++fresh.refCount;
  fresh.createTime = taosGetTimestampUs();

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  if (taosArrayPush(gUdfcProxy.udfStubs, &fresh) == NULL) {
    // NOTE(review): success is still returned here although the handle was
    // not cached, so a later release will not find it - confirm this is intended.
    fnError("acquireUdfFuncHandle: failed to push udf stub to array");
  } else {
    // keep the cache sorted for the next search
    taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
  }
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);

  return code;
}
1194

1195
/**
 * Drop one reference on the stub that owns `handle`.
 *
 * Both the active and the expired stub lists are searched by name under
 * udfStubsMutex. A stub is decremented only when its handle matches and its
 * refCount is still positive. Nothing happens when no stub matches.
 *
 * @param udfName  name of the UDF the handle belongs to
 * @param handle   handle previously returned by acquireUdfFuncHandle
 */
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);

  SUdfcFuncStub probe = {0};
  tstrncpy(probe.udfName, udfName, TSDB_FUNC_NAME_LEN);
  SUdfcFuncStub *active = taosArraySearch(gUdfcProxy.udfStubs, &probe, compareUdfcFuncSub, TD_EQ);
  SUdfcFuncStub *expired = taosArraySearch(gUdfcProxy.expiredUdfStubs, &probe, compareUdfcFuncSub, TD_EQ);

  if (active != NULL && active->handle == handle && active->refCount > 0) {
    --active->refCount;
  }
  if (expired != NULL && expired->handle == handle && expired->refCount > 0) {
    --expired->refCount;
  }

  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
}
1213

1214
void cleanupExpiredUdfs() {
10,243✔
1215
  int32_t i = 0;
10,243✔
1216
  SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
10,243✔
1217
  if (expiredUdfStubs == NULL) {
10,243!
UNCOV
1218
    fnError("cleanupExpiredUdfs: failed to init array");
×
UNCOV
1219
    return;
×
1220
  }
1221
  while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
54,417✔
1222
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
44,174✔
1223
    if (stub->refCount == 0) {
44,174!
UNCOV
1224
      fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle,
×
1225
             stub->refCount);
UNCOV
1226
      (void)doTeardownUdf(stub->handle);
×
1227
    } else {
1228
      fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p",
44,174!
1229
             stub->udfName, stub->refCount, stub->createTime, stub->handle);
1230
      UdfcFuncHandle handle = stub->handle;
44,174✔
1231
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
44,174!
1232
        if (taosArrayPush(expiredUdfStubs, stub) == NULL) {
44,174!
UNCOV
1233
          fnError("cleanupExpiredUdfs: failed to push udf stub to array");
×
1234
        }
1235
      } else {
UNCOV
1236
        fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
×
1237
               stub->udfName, stub->refCount, stub->createTime);
1238
      }
1239
    }
1240
    ++i;
44,174✔
1241
  }
1242
  taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
10,243✔
1243
  gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
10,243✔
1244
}
1245

1246
void cleanupNotExpiredUdfs() {
10,243✔
1247
  SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
10,243✔
1248
  if (udfStubs == NULL) {
10,243!
UNCOV
1249
    fnError("cleanupNotExpiredUdfs: failed to init array");
×
UNCOV
1250
    return;
×
1251
  }
1252
  int32_t i = 0;
10,243✔
1253
  while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
46,421✔
1254
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
36,178✔
1255
    if (stub->refCount == 0) {
36,178✔
1256
      fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
773!
1257
      (void)doTeardownUdf(stub->handle);
773✔
1258
    } else {
1259
      fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
35,405!
1260
             stub->refCount, stub->createTime, stub->handle);
1261
      UdfcFuncHandle handle = stub->handle;
35,405✔
1262
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
35,405!
1263
        if (taosArrayPush(udfStubs, stub) == NULL) {
35,404!
UNCOV
1264
          fnError("cleanupNotExpiredUdfs: failed to push udf stub to array");
×
1265
        }
1266
      } else {
1267
        fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName,
1!
1268
               stub->refCount, stub->createTime);
1269
      }
1270
    }
1271
    ++i;
36,178✔
1272
  }
1273
  taosArrayDestroy(gUdfcProxy.udfStubs);
10,243✔
1274
  gUdfcProxy.udfStubs = udfStubs;
10,243✔
1275
}
1276

1277
int32_t cleanUpUdfs() {
2,852,331✔
1278
  int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
2,852,331✔
1279
  if (!initialized) {
2,852,537✔
1280
    return TSDB_CODE_SUCCESS;
88,888✔
1281
  }
1282

1283
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
2,763,649✔
1284
  if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
2,764,633!
1285
      (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
2,754,390!
1286
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
2,754,390✔
1287
    return TSDB_CODE_SUCCESS;
2,754,373✔
1288
  }
1289

1290
  cleanupNotExpiredUdfs();
10,243✔
1291
  cleanupExpiredUdfs();
10,243✔
1292

1293
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
10,243✔
1294
  return 0;
10,243✔
1295
}
1296

1297
/**
 * Run a scalar UDF by name and validate the shape of its output.
 *
 * The handle is acquired, the UDF is called, and the output column's type
 * and byte width are compared with what the session declares. The handle is
 * released on every path after a successful acquire.
 *
 * @param udfName    name of the UDF
 * @param input      array of numOfCols input parameters
 * @param numOfCols  number of entries in input
 * @param output     receives the result column
 * @return 0 on success; the acquire or call error code; or
 *         TSDB_CODE_UDF_INVALID_OUTPUT_TYPE when the output has no column
 *         data or does not match the session's output type/bytes
 */
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  UdfcFuncHandle handle = NULL;
  int32_t        code = acquireUdfFuncHandle(udfName, &handle);
  if (code != 0) {
    return code;
  }

  SUdfcUvSession *session = handle;
  code = doCallUdfScalarFunc(handle, input, numOfCols, output);

  if (code != TSDB_CODE_SUCCESS) {
    fnError("udfc scalar function execution failure");
  } else if (output->columnData == NULL) {
    fnError("udfc scalar function calculate error. no column data");
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  } else if (session->outputType != output->columnData->info.type ||
             session->bytes != output->columnData->info.bytes) {
    fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
            session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes);
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  }

  releaseUdfFuncHandle(udfName, handle);
  return code;
}
1325

1326
/**
 * Report how much per-result memory an aggregate UDF needs: the SUdfAggRes
 * header plus the final result bytes plus the UDF's intermediate buffer.
 *
 * @param pFunc  function node; scalar functions are rejected
 * @param pEnv   receives calcMemSize
 * @return false for scalar functions (pEnv untouched), true otherwise
 */
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
  bool isAggregate = !fmIsScalarFunc(pFunc->funcId);
  if (isAggregate) {
    pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
  }
  return isAggregate;
}
1333

1334
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
1,129✔
1335
  if (pResultCellInfo->initialized) {
1,129!
UNCOV
1336
    return TSDB_CODE_SUCCESS;
×
1337
  }
1338
  if (functionSetup(pCtx, pResultCellInfo) != TSDB_CODE_SUCCESS) {
1,129!
UNCOV
1339
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1340
  }
1341
  UdfcFuncHandle handle;
1342
  int32_t        udfCode = 0;
1,129✔
1343
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
1,129✔
1344
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
16!
1345
    return TSDB_CODE_FUNC_SETUP_ERROR;
16✔
1346
  }
1347
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
1,113✔
1348
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
1,113✔
1349
  int32_t         envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize;
1,113✔
1350
  memset(udfRes, 0, envSize);
1,113✔
1351

1352
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
1,113✔
1353
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
1,113✔
1354

1355
  SUdfInterBuf buf = {0};
1,113✔
1356
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
1,113✔
1357
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
28!
1358
    releaseUdfFuncHandle(pCtx->udfName, handle);
28✔
1359
    return TSDB_CODE_FUNC_SETUP_ERROR;
28✔
1360
  }
1361
  if (buf.bufLen <= session->bufSize) {
1,085!
1362
    memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
1,085✔
1363
    udfRes->interResBufLen = buf.bufLen;
1,085✔
1364
    udfRes->interResNum = buf.numOfResult;
1,085✔
1365
  } else {
UNCOV
1366
    fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
×
UNCOV
1367
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
UNCOV
1368
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1369
  }
1370
  releaseUdfFuncHandle(pCtx->udfName, handle);
1,085✔
1371
  freeUdfInterBuf(&buf);
1,085✔
1372
  return TSDB_CODE_SUCCESS;
1,085✔
1373
}
1374

1375
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
3,029✔
1376
  int32_t        udfCode = 0;
3,029✔
1377
  UdfcFuncHandle handle = 0;
3,029✔
1378
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
3,029!
UNCOV
1379
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
×
UNCOV
1380
    return udfCode;
×
1381
  }
1382

1383
  SUdfcUvSession *session = handle;
3,029✔
1384
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
3,029✔
1385
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
3,029✔
1386
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
3,029✔
1387

1388
  SInputColumnInfoData *pInput = &pCtx->input;
3,029✔
1389
  int32_t               numOfCols = pInput->numOfInputCols;
3,029✔
1390
  int32_t               start = pInput->startRowIndex;
3,029✔
1391
  int32_t               numOfRows = pInput->numOfRows;
3,029✔
1392
  SSDataBlock          *pTempBlock = NULL;
3,029✔
1393
  int32_t               code = createDataBlock(&pTempBlock);
3,029✔
1394

1395
  if (code) {
3,029!
UNCOV
1396
    return code;
×
1397
  }
1398

1399
  pTempBlock->info.rows = pInput->totalRows;
3,029✔
1400
  pTempBlock->info.id.uid = pInput->uid;
3,029✔
1401
  for (int32_t i = 0; i < numOfCols; ++i) {
6,256✔
1402
    if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) {
3,227!
UNCOV
1403
      fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode);
×
UNCOV
1404
      blockDataDestroy(pTempBlock);
×
UNCOV
1405
      return udfCode;
×
1406
    }
1407
  }
1408

1409
  SSDataBlock *inputBlock = NULL;
3,029✔
1410
  code = blockDataExtractBlock(pTempBlock, start, numOfRows, &inputBlock);
3,029✔
1411
  if (code) {
3,029!
UNCOV
1412
    return code;
×
1413
  }
1414

1415
  SUdfInterBuf state = {
3,029✔
1416
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
3,029✔
1417
  SUdfInterBuf newState = {0};
3,029✔
1418

1419
  udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
3,029✔
1420
  if (udfCode != 0) {
3,029!
UNCOV
1421
    fnError("udfAggProcess error. code: %d", udfCode);
×
UNCOV
1422
    newState.numOfResult = 0;
×
1423
  } else {
1424
    if (newState.bufLen <= session->bufSize) {
3,029!
1425
      memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
3,029✔
1426
      udfRes->interResBufLen = newState.bufLen;
3,029✔
1427
      udfRes->interResNum = newState.numOfResult;
3,029✔
1428
    } else {
UNCOV
1429
      fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
×
UNCOV
1430
      udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
×
1431
    }
1432
  }
1433

1434
  GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum;
3,029✔
1435

1436
  blockDataDestroy(inputBlock);
3,029✔
1437

1438
  taosArrayDestroy(pTempBlock->pDataBlock);
3,029✔
1439
  taosMemoryFree(pTempBlock);
3,029✔
1440

1441
  releaseUdfFuncHandle(pCtx->udfName, handle);
3,029✔
1442
  freeUdfInterBuf(&newState);
3,029✔
1443
  return udfCode;
3,029✔
1444
}
1445

1446
/**
 * Ask an aggregate UDF for its final result and write it to the output block.
 *
 * The stored intermediate state is passed to the UDF's finalize step. A
 * result that fits into session->bytes is copied into the final result area
 * of the cell. functionFinalizeWithResultBuf is invoked in every case, also
 * after a failed finalize call.
 *
 * Fix: the log line for a failed handle acquisition named udfAggProcess
 * (copy-paste); it now names udfAggFinalize.
 *
 * @param pCtx    function context holding the result cell
 * @param pBlock  output block
 * @return the finalize call's error code when it is non-zero (including
 *         TSDB_CODE_UDF_INVALID_OUTPUT_TYPE for an oversized result),
 *         otherwise the value returned by functionFinalizeWithResultBuf
 */
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
  int32_t        udfCode = 0;
  UdfcFuncHandle handle = 0;
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    fnError("udfAggFinalize error. step acquireUdfFuncHandle. udf code: %d", udfCode);
    return udfCode;
  }

  SUdfcUvSession *session = handle;
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;

  SUdfInterBuf resultBuf = {0};
  SUdfInterBuf state = {
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
  int32_t udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
  if (udfCallCode != 0) {
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else if (resultBuf.numOfResult == 0) {
    // the UDF produced no result
    udfRes->finalResNum = 0;
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else if (resultBuf.bufLen <= session->bytes) {
    memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
    udfRes->finalResNum = resultBuf.numOfResult;
    GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
  } else {
    fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
    GET_RES_INFO(pCtx)->numOfRes = 0;
    udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  }

  freeUdfInterBuf(&resultBuf);

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  releaseUdfFuncHandle(pCtx->udfName, handle);
  return udfCallCode == 0 ? numOfResults : udfCallCode;
}
1490

1491
// Close callback passed to uv_close() for a client pipe.
// Wakes the task at the head of the connection's task queue (if any) with
// errCode 0, clears the owning session's pipe pointer under udfcUvMutex, and
// releases the read buffer, the connection object and the pipe handle.
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *pConn = handle->data;

  if (!QUEUE_EMPTY(&pConn->taskQueue)) {
    QUEUE             *head = QUEUE_HEAD(&pConn->taskQueue);
    SClientUvTaskNode *waiter = QUEUE_DATA(head, SClientUvTaskNode, connTaskQueue);
    waiter->errCode = 0;
    QUEUE_REMOVE(&waiter->procTaskQueue);
    uv_sem_post(&waiter->taskSem);
  }

  // the session's pipe pointer is only touched while holding udfcUvMutex
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  if (pConn->session != NULL) {
    pConn->session->udfUvPipe = NULL;
  }
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);

  taosMemoryFree(pConn->readBuf.buf);
  taosMemoryFree(pConn);
  taosMemoryFree((uv_pipe_t *)handle);
}
774✔
1509

1510
// Copies the outcome of a finished uv task back into the client task.
//
// For UV_TASK_REQ_RSP with a response buffer, the response is decoded, the
// matching setup/call/teardown response is stored in `task`, and the response
// buffer is freed. In every other case the result is uvTask->errCode.
// Returns the response code or the uv task error code (0 on success).
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  int32_t code = 0;
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);

  switch (uvTask->type) {
    case UV_TASK_REQ_RSP: {
      if (uvTask->rspBuf.base == NULL) {
        // no response arrived: report whatever error the event loop recorded
        code = uvTask->errCode;
        if (code != 0) {
          fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
        }
        break;
      }

      SUdfResponse rsp = {0};
      (void)decodeUdfResponse(uvTask->rspBuf.base, &rsp);
      code = rsp.code;
      if (code != 0) {
        fnError("udfc get udf task result failure. code: %d", code);
      }

      if (task->type == UDF_TASK_SETUP) {
        task->_setup.rsp = rsp.setupRsp;
      } else if (task->type == UDF_TASK_CALL) {
        task->_call.rsp = rsp.callRsp;
      } else if (task->type == UDF_TASK_TEARDOWN) {
        task->_teardown.rsp = rsp.teardownRsp;
      }

      // TODO: the call buffer is setup and freed by udf invocation
      taosMemoryFree(uvTask->rspBuf.base);
      break;
    }
    case UV_TASK_CONNECT: {
      code = uvTask->errCode;
      if (code != 0) {
        fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      code = uvTask->errCode;
      if (code != 0) {
        fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
      }
      break;
    }
    default: {
      break;
    }
  }

  return code;
}
1561

1562
// Allocation callback passed to uv_read_start(): tells libuv where the next
// chunk read from the pipe must be stored.
//
// Three phases, driven by the connection read buffer state:
//   1. cap == 0: start a new message, allocate room for the header
//      (int32 message length followed by int64 sequence number);
//   2. total unknown and header incomplete: keep filling the header;
//   3. otherwise: grow the buffer to the full message size and fill the rest.
// On allocation failure `buf` is set to {NULL, 0}.
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SClientUvConn  *conn = handle->data;
  SClientConnBuf *rb = &conn->readBuf;
  int32_t         headLen = sizeof(int32_t) + sizeof(int64_t);

  if (rb->cap == 0) {
    rb->buf = taosMemoryMalloc(headLen);
    if (rb->buf == NULL) {
      fnError("udfc allocate buffer failure. size: %d", headLen);
      buf->base = NULL;
      buf->len = 0;
    } else {
      rb->len = 0;
      rb->cap = headLen;
      rb->total = -1;

      buf->base = rb->buf;
      buf->len = rb->cap;
    }
  } else if (rb->total == -1 && rb->len < headLen) {
    buf->base = rb->buf + rb->len;
    buf->len = headLen - rb->len;
  } else {
    if (rb->total > rb->cap) {
      rb->cap = rb->total;
    }
    void *grown = taosMemoryRealloc(rb->buf, rb->cap);
    if (grown == NULL) {
      fnError("udfc re-allocate buffer failure. size: %d", rb->cap);
      buf->base = NULL;
      buf->len = 0;
    } else {
      rb->buf = grown;
      buf->base = rb->buf + rb->len;
      buf->len = rb->cap - rb->len;
    }
  }

  fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", rb->cap, rb->len, rb->total);
}
97,200✔
1600

1601
// Reports whether the connection read buffer holds one complete message.
// As a side effect, once at least sizeof(int32_t) bytes are buffered the total
// message length is taken from the first int32 of the buffer.
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *)(connBuf->buf);
  }

  bool bufferFull = (connBuf->len == connBuf->cap);
  if (!bufferFull || connBuf->total != connBuf->cap) {
    return false;
  }

  fnDebug("udfc complete message is received, now handle it");
  return true;
}
1611

1612
// Dispatches a completely received response to the task waiting for it.
//
// The sequence number stored right after the int32 message length is matched
// against every task queued on the connection. The first matching task takes
// ownership of the receive buffer (via rspBuf), is unlinked from the
// connection and processing queues, and its semaphore is posted. Afterwards
// the connection read buffer is reset for the next message.
//
// If the task queue is empty the function only logs and returns; the read
// buffer is left untouched in that case.
void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;
  int64_t         seqNum = 0;
  // msglen then seqnum; the seqnum sits at offset 4, so copy it out instead of
  // dereferencing a misaligned int64_t pointer
  memcpy(&seqNum, connBuf->buf + sizeof(int32_t), sizeof(seqNum));

  if (QUEUE_EMPTY(&conn->taskQueue)) {
    fnError("udfc no task waiting on connection. response seqnum:%" PRId64, seqNum);
    return;
  }

  SClientUvTaskNode *taskFound = NULL;
  QUEUE             *h = QUEUE_NEXT(&conn->taskQueue);
  while (h != &conn->taskQueue) {
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    fnDebug("udfc handle response iterate through queue. uvTask:%" PRId64 "-%p", task->seqNum, task);
    if (task->seqNum == seqNum) {
      if (taskFound == NULL) {
        taskFound = task;
      } else {
        // keep the first match; always fall through to advance the iterator,
        // otherwise the loop would spin forever on this node
        fnError("udfc more than one task waiting for the same response");
      }
    }
    h = QUEUE_NEXT(h);
  }

  if (taskFound != NULL) {
    // the waiting task now owns the buffer and frees it after decoding
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    uv_sem_post(&taskFound->taskSem);
  } else {
    fnError("no task is waiting for the response.");
    // nobody took ownership of the buffer: release it before the pointer is reset
    taosMemoryFree(connBuf->buf);
  }

  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
1653

1654
// Fails every task queued on the connection with TSDB_CODE_UDF_PIPE_READ_ERR,
// wakes their waiters, and closes the pipe unless it is already closing.
void udfcUvHandleError(SClientUvConn *conn) {
  fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);

  while (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE             *head = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *failed = QUEUE_DATA(head, SClientUvTaskNode, connTaskQueue);

    failed->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
    QUEUE_REMOVE(&failed->connTaskQueue);
    QUEUE_REMOVE(&failed->procTaskQueue);
    uv_sem_post(&failed->taskSem);
  }

  uv_handle_t *pipeHandle = (uv_handle_t *)conn->pipe;
  if (!uv_is_closing(pipeHandle)) {
    uv_close(pipeHandle, onUdfcPipeClose);
  }
}
1✔
1668

1669
// Read callback passed to uv_read_start().
// nread > 0: account for the new bytes and dispatch the response once the
//            message is complete.
// nread < 0: log the error (EOF included) and fail the whole connection.
// nread == 0: nothing to do.
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
  if (nread == 0) {
    return;
  }

  SClientUvConn *conn = client->data;

  if (nread < 0) {
    fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
    if (nread == UV_EOF) {
      fnError("\tudfc client pipe %p closed", client);
    }
    udfcUvHandleError(conn);
    return;
  }

  SClientConnBuf *connBuf = &conn->readBuf;
  connBuf->len += nread;
  if (isUdfcUvMsgComplete(connBuf)) {
    udfcUvHandleRsp(conn);
  }
}
1689

1690
// Write-completion callback passed to uv_write().
// A negative status fails the whole connection; in every case the write
// request object is released here.
void onUdfcPipeWrite(uv_write_t *write, int32_t status) {
  SClientUvConn *conn = write->data;

  if (status >= 0) {
    fnDebug("udfc client connection %p write succeed", conn);
  } else {
    fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
    udfcUvHandleError(conn);
  }

  taosMemoryFree(write);
}
34,638✔
1700

1701
// Connect callback passed to uv_pipe_connect().
//
// Records the connect status in the task. Only when the connect succeeded is
// reading started on the pipe; a failure of uv_read_start() then becomes the
// task error. A failed connect keeps its own status as the task error instead
// of being overwritten by a follow-up uv_read_start() failure on a pipe that
// was never connected.
// The connect request is freed, the task is removed from the processing queue
// and its waiter is woken in every case.
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) {
  SClientUvTaskNode *uvTask = connect->data;
  uvTask->errCode = status;

  if (status != 0) {
    fnError("client connect error, task seq: %" PRId64 ", code: %s", uvTask->seqNum, uv_strerror(status));
  } else {
    int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
    if (code != 0) {
      fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code));
      uvTask->errCode = code;
    }
  }

  taosMemoryFree(connect);
  QUEUE_REMOVE(&uvTask->procTaskQueue);
  uv_sem_post(&uvTask->taskSem);
}
924✔
1717

1718
// Prepares a uv task node for the given client task.
//
//   UV_TASK_CONNECT    - nothing to prepare besides the semaphore.
//   UV_TASK_REQ_RSP    - binds the session pipe, builds the request for the
//                        task type, assigns a fresh sequence number and
//                        encodes it into uvTask->reqBuf (heap allocated).
//   UV_TASK_DISCONNECT - binds the session pipe.
//
// Returns 0 on success, terrno on allocation failure, or
// TSDB_CODE_UDF_UV_EXEC_FAILURE on an invalid task type, encoding failure or
// semaphore initialization failure. On failure uvTask->reqBuf.base is either
// NULL or still owned by uvTask, so the caller may free it unconditionally.
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) {
  uvTask->type = uvTaskType;
  uvTask->udfc = task->session->udfc;

  if (uvTaskType == UV_TASK_CONNECT) {
    // the pipe does not exist yet
  } else if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfUvPipe;
    // zero-initialize so that no indeterminate bytes are ever encoded
    SUdfRequest request = {0};
    request.type = task->type;
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);

    if (task->type == UDF_TASK_SETUP) {
      request.setup = task->_setup.req;
      request.type = UDF_TASK_SETUP;
    } else if (task->type == UDF_TASK_CALL) {
      request.call = task->_call.req;
      request.type = UDF_TASK_CALL;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.teardown = task->_teardown.req;
      request.type = UDF_TASK_TEARDOWN;
    } else {
      // there is no request payload for an unknown type: refuse to encode it
      fnError("udfc create uv task, invalid task type : %d", task->type);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }

    // first pass computes the encoded size only
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    if (bufLen <= 0) {
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }
    request.msgLen = bufLen;

    void *bufBegin = taosMemoryMalloc(bufLen);
    if (bufBegin == NULL) {
      fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen);
      return terrno;
    }
    void *buf = bufBegin;
    if (encodeUdfRequest(&buf, &request) <= 0) {
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
      taosMemoryFree(bufBegin);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }

    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfUvPipe;
  }

  if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
    if (uvTaskType == UV_TASK_REQ_RSP) {
      taosMemoryFree(uvTask->reqBuf.base);
      // udfcRunUdfUvTask frees reqBuf.base again on its error path; clear the
      // pointer so that second free is a no-op instead of a double free
      uvTask->reqBuf.base = NULL;
    }
    fnError("udfc create uv task, init semaphore failed.");
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  return 0;
}
1774

1775
// Hands a uv task to the event loop and blocks until it has finished.
// The task is appended to the proxy's task queue under taskQueueMutex, the
// loop is signalled through loopTaskAync, and the caller then waits on the
// task semaphore, which is destroyed once the wait returns.
// Returns 0, or TSDB_CODE_UDF_UV_EXEC_FAILURE if the loop could not be signalled.
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
  fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
  SUdfcProxy *proxy = uvTask->udfc;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_INSERT_TAIL(&proxy->taskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  int32_t rc = uv_async_send(&proxy->loopTaskAync);
  if (rc == 0) {
    uv_sem_wait(&uvTask->taskSem);
    fnDebug("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
    uv_sem_destroy(&uvTask->taskSem);
    return 0;
  }

  fnError("udfc queue uv task to event loop failed. code: %s", uv_strerror(rc));
  return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
1793

1794
// Starts a queued uv task.
//
//   UV_TASK_CONNECT    - creates the pipe, its connection object and the
//                        connect request, then starts connecting to udfd.
//   UV_TASK_REQ_RSP    - queues the task on the connection and writes the
//                        encoded request to the pipe.
//   UV_TASK_DISCONNECT - queues the task on the connection and closes the pipe.
//
// Returns 0 when the task was started; a non-zero code means the task was not
// started and is not linked into any connection queue.
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
  fnDebug("event loop start uv task. uvTask: %" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
  int32_t code = 0;

  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
      // Allocate everything before uv_pipe_init(): an initialized handle must
      // be released through uv_close(), so every failure path below only ever
      // frees memory that libuv has not been told about.
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
      if (pipe == NULL) {
        fnError("udfc event loop start connect task malloc pipe failed.");
        return terrno;
      }

      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
      if (conn == NULL) {
        fnError("udfc event loop start connect task malloc conn failed.");
        code = terrno;  // read before freeing
        taosMemoryFree(pipe);
        return code;
      }

      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
      if (connReq == NULL) {
        fnError("udfc event loop start connect task malloc connReq failed.");
        code = terrno;  // read before freeing
        taosMemoryFree(pipe);
        taosMemoryFree(conn);
        return code;
      }

      if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) {
        fnError("udfc event loop start connect task uv_pipe_init failed.");
        taosMemoryFree(connReq);
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        return TSDB_CODE_UDF_UV_EXEC_FAILURE;
      }

      // publish the pipe only once nothing can fail any more
      uvTask->pipe = pipe;

      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
      QUEUE_INIT(&conn->taskQueue);

      pipe->data = conn;

      connReq->data = uvTask;
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
      code = 0;
      break;
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
      } else {
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
        if (write == NULL) {
          fnError("udfc event loop start req_rsp task malloc write failed.");
          return terrno;
        }
        write->data = pipe->data;
        QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
        QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
        int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
        if (err != 0) {
          // the caller reports the failure and releases the task, so it must
          // not stay linked in the connection's queue
          QUEUE_REMOVE(&uvTask->connTaskQueue);
          taosMemoryFree(write);
          fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
        }
        code = err;
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
      } else {
        SClientUvConn *conn = pipe->data;
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
        if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) {
          uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
        }
        code = 0;
      }
      break;
    }
    default: {
      fnError("udfc event loop unknown task type.");
      // nothing was started, so no callback will ever complete this task;
      // report failure so the waiter is released instead of blocking
      code = TSDB_CODE_UDF_UV_EXEC_FAILURE;
      break;
    }
  }

  return code;
}
1882

1883
// Async callback registered for loopTaskAync: drains the proxy's task queue
// and starts every task. Started tasks are tracked in uvProcTaskQueue; tasks
// that fail to start get their error code set and their waiter woken at once.
void udfcAsyncTaskCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;
  QUEUE       pending;

  // detach the whole queue under the lock, then work on the private copy
  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_MOVE(&proxy->taskQueue, &pending);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  while (!QUEUE_EMPTY(&pending)) {
    QUEUE *node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);

    SClientUvTaskNode *uvTask = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    int32_t            startCode = udfcStartUvTask(uvTask);
    if (startCode != 0) {
      uvTask->errCode = startCode;
      uv_sem_post(&uvTask->taskSem);
      continue;
    }
    QUEUE_INSERT_TAIL(&proxy->uvProcTaskQueue, &uvTask->procTaskQueue);
  }
}
33,303✔
1904

1905
// Releases every waiter of tasks that are still pending: first the tasks that
// were queued but not yet started, then the tasks tracked in uvProcTaskQueue.
// While the proxy is stopping, each task is failed with TSDB_CODE_UDF_STOPPING.
void cleanUpUvTasks(SUdfcProxy *udfc) {
  fnDebug("clean up uv tasks");
  QUEUE notStarted;

  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &notStarted);
  uv_mutex_unlock(&udfc->taskQueueMutex);

  while (!QUEUE_EMPTY(&notStarted)) {
    QUEUE *node = QUEUE_HEAD(&notStarted);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *queued = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      queued->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&queued->taskSem);
  }

  while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
    QUEUE *node = QUEUE_HEAD(&udfc->uvProcTaskQueue);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *inFlight = QUEUE_DATA(node, SClientUvTaskNode, procTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      inFlight->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&inFlight->taskSem);
  }
}
2,327✔
1932

1933
// Async callback registered for loopStopAsync: releases all pending tasks and,
// if the proxy is in the stopping state, stops the event loop.
void udfStopAsyncCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;

  cleanUpUvTasks(proxy);
  if (proxy->udfcState != UDFC_STATE_STOPPING) {
    return;
  }
  uv_stop(&proxy->uvLoop);
}
2,327✔
1940

1941
void constructUdfService(void *argsThread) {
2,327✔
1942
  int32_t     code = 0, lino = 0;
2,327✔
1943
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
2,327✔
1944
  code = uv_loop_init(&udfc->uvLoop);
2,327✔
1945
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
1946

1947
  code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
2,327✔
1948
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
1949
  udfc->loopTaskAync.data = udfc;
2,327✔
1950
  code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
2,327✔
1951
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
1952
  udfc->loopStopAsync.data = udfc;
2,327✔
1953
  code = uv_mutex_init(&udfc->taskQueueMutex);
2,327✔
1954
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
1955
  QUEUE_INIT(&udfc->taskQueue);
2,327✔
1956
  QUEUE_INIT(&udfc->uvProcTaskQueue);
2,327✔
1957
  (void)uv_barrier_wait(&udfc->initBarrier);
2,327✔
1958
  // TODO return value of uv_run
1959
  int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
2,327✔
1960
  fnInfo("udfc uv loop exit. active handle num: %d", num);
2,327!
1961
  (void)uv_loop_close(&udfc->uvLoop);
2,327✔
1962

1963
  uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL);
2,327✔
1964
  num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
2,327✔
1965
  fnInfo("udfc uv loop exit. active handle num: %d", num);
2,327!
1966

1967
  (void)uv_loop_close(&udfc->uvLoop);
2,327✔
1968
_exit:
2,327✔
1969
  if (code != 0) {
2,327!
UNCOV
1970
    fnError("udfc construct error. code: %d, line: %d", code, lino);
×
1971
  }
1972
  fnInfo("udfc construct finished");
2,327!
1973
}
2,327✔
1974

1975
int32_t udfcOpen() {
2,834✔
1976
  int32_t code = 0, lino = 0;
2,834✔
1977
  int8_t  old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
2,834✔
1978
  if (old == 1) {
2,834✔
1979
    return 0;
507✔
1980
  }
1981
  SUdfcProxy *proxy = &gUdfcProxy;
2,327✔
1982
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
2,327✔
1983
  proxy->udfcState = UDFC_STATE_STARTNG;
2,327✔
1984
  code = uv_barrier_init(&proxy->initBarrier, 2);
2,327✔
1985
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
1986
  code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
2,327✔
1987
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
1988
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
2,327✔
1989
  proxy->udfcState = UDFC_STATE_READY;
2,327✔
1990
  (void)uv_barrier_wait(&proxy->initBarrier);
2,327✔
1991
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
1992
  code = uv_mutex_init(&proxy->udfStubsMutex);
2,327✔
1993
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
1994
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
2,327✔
1995
  if (proxy->udfStubs == NULL) {
2,327!
UNCOV
1996
    fnError("udfc init failed. udfStubs: %p", proxy->udfStubs);
×
UNCOV
1997
    return -1;
×
1998
  }
1999
  proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
2,327✔
2000
  if (proxy->expiredUdfStubs == NULL) {
2,327!
2001
    taosArrayDestroy(proxy->udfStubs);
×
UNCOV
2002
    fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs);
×
UNCOV
2003
    return -1;
×
2004
  }
2005
  code = uv_mutex_init(&proxy->udfcUvMutex);
2,327✔
2006
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,327!
2007
_exit:
2,327✔
2008
  if (code != 0) {
2,327!
UNCOV
2009
    fnError("udfc open error. code: %d, line: %d", code, lino);
×
UNCOV
2010
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
2011
  }
2012
  fnInfo("udfc initialized");
2,327!
2013
  return 0;
2,327✔
2014
}
2015

2016
int32_t udfcClose() {
2,362✔
2017
  int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
2,362✔
2018
  if (old == 0) {
2,362✔
2019
    return 0;
35✔
2020
  }
2021

2022
  SUdfcProxy *udfc = &gUdfcProxy;
2,327✔
2023
  udfc->udfcState = UDFC_STATE_STOPPING;
2,327✔
2024
  if (uv_async_send(&udfc->loopStopAsync) != 0) {
2,327!
UNCOV
2025
    fnError("udfc close error to send stop async");
×
2026
  }
2027
  if (uv_thread_join(&udfc->loopThread) != 0) {
2,327!
UNCOV
2028
    fnError("udfc close errir to join loop thread");
×
2029
  }
2030
  uv_mutex_destroy(&udfc->taskQueueMutex);
2,327✔
2031
  uv_barrier_destroy(&udfc->initBarrier);
2,327✔
2032
  taosArrayDestroy(udfc->expiredUdfStubs);
2,327✔
2033
  taosArrayDestroy(udfc->udfStubs);
2,327✔
2034
  uv_mutex_destroy(&udfc->udfStubsMutex);
2,327✔
2035
  uv_mutex_destroy(&udfc->udfcUvMutex);
2,327✔
2036
  udfc->udfcState = UDFC_STATE_INITAL;
2,327✔
2037
  fnInfo("udfc is cleaned up");
2,327!
2038
  return 0;
2,327✔
2039
}
2040

2041
// Runs one uv task for the client task synchronously: allocate the task node,
// initialize it, queue it to the event loop (blocking until it has finished)
// and collect the result. After a successful connect the new pipe is bound to
// the session and the connection is linked back to the session.
// The task node and its request buffer are always released before returning.
// Returns 0 on success or the first failing step's error code.
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  int32_t            code = 0, lino = 0;
  SClientUvTaskNode *node = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
  if (node == NULL) {
    fnError("udfc client task: %p failed to allocate memory for uvTask", task);
    return terrno;
  }
  fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, node, task->session->udfUvPipe);

  code = udfcInitializeUvTask(task, uvTaskType, node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcQueueUvTask(node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcGetUdfTaskResultFromUvTask(task, node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  if (uvTaskType == UV_TASK_CONNECT) {
    SClientUvConn *conn = node->pipe->data;
    task->session->udfUvPipe = node->pipe;
    conn->session = task->session;
  }

_exit:
  if (code != 0) {
    fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, node, code, lino);
  }
  taosMemoryFree(node->reqBuf.base);
  node->reqBuf.base = NULL;
  taosMemoryFree(node);
  node = NULL;
  return code;
}
2072

2073
// Creates a session for the named UDF: connects to udfd, sends the setup
// request and stores the values returned in the setup response in the session.
// On success *funcHandle receives the session and 0 is returned; on failure
// the session is freed and the error code of the failing step is returned.
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("doSetupUdf, failed to allocate memory for task");
    return terrno;
  }

  SUdfcUvSession *session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
  task->session = session;
  if (session == NULL) {
    fnError("doSetupUdf, failed to allocate memory for session");
    taosMemoryFree(task);
    return terrno;
  }
  session->udfc = &gUdfcProxy;
  task->type = UDF_TASK_SETUP;

  tstrncpy(task->_setup.req.udfName, udfName, TSDB_FUNC_NAME_LEN);

  code = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  // copy what the setup response reported into the session
  SUdfSetupResponse *setupRsp = &task->_setup.rsp;
  session->severHandle = setupRsp->udfHandle;
  session->outputType = setupRsp->outputType;
  session->bytes = setupRsp->bytes;
  session->bufSize = setupRsp->bufSize;
  tstrncpy(session->udfName, udfName, TSDB_FUNC_NAME_LEN);
  fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);

  *funcHandle = task->session;
  taosMemoryFree(task);
  return 0;

_exit:
  if (code != 0) {
    fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino);
  }
  taosMemoryFree(task->session);
  taosMemoryFree(task);
  return code;
}
2117

2118
// Sends one call request of the given call type over the session's pipe and
// copies the response to the caller.
//
// Inputs read per call type:
//   AGG_INIT   - none
//   AGG_PROC   - input, state
//   AGG_MERGE  - state, state2
//   AGG_FIN    - state
//   SCALA_PROC - input
// Outputs written on success: newState for the aggregate call types, output
// for SCALA_PROC.
// Returns 0 on success, TSDB_CODE_UDF_PIPE_NOT_EXIST when the session has no
// pipe, terrno on allocation failure, or the error of the request/response task.
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
                SSDataBlock *output, SUdfInterBuf *newState) {
  fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
  if (session->udfUvPipe == NULL) {
    fnError("No pipe to udfd");
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("udfc call udf. failed to allocate memory for task");
    return terrno;
  }
  task->session = (SUdfcUvSession *)handle;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *callReq = &task->_call.req;
  callReq->udfHandle = task->session->severHandle;
  callReq->callType = callType;

  switch (callType) {
    case TSDB_UDF_CALL_AGG_INIT:
      callReq->initFirst = 1;
      break;
    case TSDB_UDF_CALL_AGG_PROC:
      callReq->block = *input;
      callReq->interBuf = *state;
      break;
    case TSDB_UDF_CALL_AGG_MERGE:
      callReq->interBuf = *state;
      callReq->interBuf2 = *state2;
      break;
    case TSDB_UDF_CALL_AGG_FIN:
      callReq->interBuf = *state;
      break;
    case TSDB_UDF_CALL_SCALA_PROC:
      callReq->block = *input;
      break;
    default:
      break;
  }

  int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  if (code == 0) {
    SUdfCallResponse *callRsp = &task->_call.rsp;
    switch (callType) {
      // every aggregate call type hands back an intermediate/result buffer
      case TSDB_UDF_CALL_AGG_INIT:
      case TSDB_UDF_CALL_AGG_PROC:
      case TSDB_UDF_CALL_AGG_MERGE:
      case TSDB_UDF_CALL_AGG_FIN:
        *newState = callRsp->resultBuf;
        break;
      case TSDB_UDF_CALL_SCALA_PROC:
        *output = callRsp->resultData;
        break;
      default:
        break;
    }
  } else {
    fnError("call udf failure. udfcRunUdfUvTask err: %d", code);
  }

  taosMemoryFree(task);
  return code;
}
2194

2195
// Issues a TSDB_UDF_CALL_AGG_INIT call; the buffer from the response is
// stored in interBuf. Returns callUdf's result.
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_INIT, NULL, NULL, NULL, NULL, interBuf);
}
2202

2203
// input: block, state
2204
// output: interbuf,
2205
// Issues a TSDB_UDF_CALL_AGG_PROC call with the data block and current state;
// the buffer from the response is stored in newState. Returns callUdf's result.
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_PROC, block, state, NULL, NULL, newState);
}
2210

2211
// input: interbuf1, interbuf2
2212
// output: resultBuf
UNCOV
2213
// Issues a TSDB_UDF_CALL_AGG_MERGE call with two intermediate buffers; the
// buffer from the response is stored in resultBuf. Returns callUdf's result.
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
                          SUdfInterBuf *resultBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_MERGE, NULL, interBuf1, interBuf2, NULL, resultBuf);
}
2219

2220
// input: interBuf
2221
// output: resultData
2222
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
1,083✔
2223
  int8_t  callType = TSDB_UDF_CALL_AGG_FIN;
1,083✔
2224
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
1,083✔
2225
  return err;
1,083✔
2226
}
2227

2228
// Run a scalar UDF call (TSDB_UDF_CALL_SCALA_PROC) for the UDF behind `handle`.
// input: input (array of scalar params), numOfCols (count handed to the converter)
// output: output, filled from the result block when the call succeeds
//
// Steps: convert `input` into a data block, call the UDF with it, then convert the
// result block into `output`. The input block's resources are released before
// returning, except on the early return taken when the first conversion fails.
// Returns the first non-zero code from the conversion, the call, or the result
// conversion; 0 otherwise.
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  SSDataBlock srcBlock = {0};
  int32_t     rc = convertScalarParamToDataBlock(input, numOfCols, &srcBlock);
  if (rc != 0) {
    fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", rc);
    return rc;
  }

  SSDataBlock dstBlock = {0};
  rc = callUdf(handle, TSDB_UDF_CALL_SCALA_PROC, &srcBlock, NULL, NULL, &dstBlock, NULL);
  if (rc == 0) {
    rc = convertDataBlockToScalarParm(&dstBlock, output);
    // Only the column array of the result block is destroyed here.
    taosArrayDestroy(dstBlock.pDataBlock);
  }

  blockDataFreeRes(&srcBlock);
  return rc;
}
2246

2247
int32_t doTeardownUdf(UdfcFuncHandle handle) {
773✔
2248
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
773✔
2249
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
773✔
2250

2251
  if (session->udfUvPipe == NULL) {
773!
UNCOV
2252
    fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
×
UNCOV
2253
    taosMemoryFree(session);
×
UNCOV
2254
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
2255
  }
2256

2257
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
773✔
2258
  if (task == NULL) {
773!
UNCOV
2259
    fnError("doTeardownUdf, failed to allocate memory for task");
×
UNCOV
2260
    taosMemoryFree(session);
×
UNCOV
2261
    return terrno;
×
2262
  }
2263
  task->session = session;
773✔
2264
  task->type = UDF_TASK_TEARDOWN;
773✔
2265

2266
  SUdfTeardownRequest *req = &task->_teardown.req;
773✔
2267
  req->udfHandle = task->session->severHandle;
773✔
2268

2269
  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
773✔
2270
  TAOS_CHECK_GOTO(code, &lino, _exit);
773!
2271

2272
  code = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
773✔
2273
  TAOS_CHECK_GOTO(code, &lino, _exit);
773!
2274

2275
  fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
773!
2276
  // TODO: synchronization refactor between libuv event loop and request thread
2277
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
773✔
2278
  if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
773!
UNCOV
2279
    SClientUvConn *conn = session->udfUvPipe->data;
×
UNCOV
2280
    conn->session = NULL;
×
2281
  }
2282
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
773✔
2283

2284
_exit:
773✔
2285
  if (code != 0) {
773!
UNCOV
2286
    fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino);
×
2287
  }
2288
  taosMemoryFree(session);
773✔
2289
  taosMemoryFree(task);
773✔
2290

2291
  return code;
773✔
2292
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc