• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3825

01 Apr 2025 11:58AM UTC coverage: 34.067% (+0.003%) from 34.064%
#3825

push

travis-ci

happyguoxy
test:alter gcda dir

148492 of 599532 branches covered (24.77%)

Branch coverage included in aggregate %.

222504 of 489471 relevant lines covered (45.46%)

762290.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.53
/source/libs/function/src/tudf.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifdef USE_UDF
16
#include "uv.h"
17

18
#include "os.h"
19

20
#include "builtinsimpl.h"
21
#include "fnLog.h"
22
#include "functionMgt.h"
23
#include "querynodes.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tglobal.h"
27
#include "tudf.h"
28
#include "tudfInt.h"
29

30
#ifdef _TD_DARWIN_64
31
#include <mach-o/dyld.h>
32
#endif
33

34
typedef struct SUdfdData {
35
  bool         startCalled;
36
  bool         needCleanUp;
37
  uv_loop_t    loop;
38
  uv_thread_t  thread;
39
  uv_barrier_t barrier;
40
  uv_process_t process;
41
#ifdef WINDOWS
42
  HANDLE jobHandle;
43
#endif
44
  int32_t    spawnErr;
45
  uv_pipe_t  ctrlPipe;
46
  uv_async_t stopAsync;
47
  int32_t    stopCalled;
48

49
  int32_t dnodeId;
50
} SUdfdData;
51

52
SUdfdData udfdGlobal = {0};
53

54
int32_t udfStartUdfd(int32_t startDnodeId);
55
void    udfStopUdfd();
56

57
extern char **environ;
58

59
static int32_t udfSpawnUdfd(SUdfdData *pData);
60
void           udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
61
static void    udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
62
static void    udfUdfdStopAsyncCb(uv_async_t *async);
63
static void    udfWatchUdfd(void *args);
64

65
/*
 * Exit callback installed on the taosudf process handle (options.exit_cb).
 *
 * A clean exit (status 0 and no signal) or an exit that follows a stop request
 * is only logged. Any other exit triggers a respawn through udfSpawnUdfd().
 */
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
  TAOS_UDF_CHECK_PTR_RVOID(process);
  fnInfo("taosudf process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);

  SUdfdData *udfd = process->data;
  if (udfd == NULL) {
    fnError("taosudf process data is NULL");
    return;
  }

  // stopCalled is only consulted when the exit was not a clean one (short-circuit).
  bool cleanExit = (exitStatus == 0) && (termSignal == 0);
  if (cleanExit || atomic_load_32(&udfd->stopCalled)) {
    fnInfo("taosudf process exit due to SIGINT or dnode-mgmt called stop");
    return;
  }

  fnInfo("taosudf process restart");
  int32_t rc = udfSpawnUdfd(udfd);
  if (rc != 0) {
    fnError("taosudf process restart failed with code:%d", rc);
  }
}
83

84
/*
 * Spawn the taosudf executable as a detached child of pData->loop.
 *
 * Steps:
 *   1. Locate the executable next to the running binary (tsProcPath or the
 *      platform lookup), falling back to a fixed install directory.
 *   2. Create the control pipe used as the child's stdio slot 0; slot 1 is
 *      ignored and slot 2 inherits fd 2.
 *   3. Build the child's environment: a copy of `environ` followed by DNODE_ID,
 *      UV_THREADPOOL_SIZE, LD_LIBRARY_PATH and, when set, TAOS_FQDN.
 *   4. Call uv_spawn() with udfUdfdExit as the exit callback.
 *
 * Returns 0 on success; otherwise the uv_spawn() error, TSDB_CODE_OUT_OF_MEMORY,
 * or terrno when the TAOS_FQDN buffer cannot be allocated.
 * NOTE(review): TAOS_UDF_CHECK_PTR_RCODE and TAOS_UV_LIB_ERROR_RET presumably
 * return early with their own codes; their definitions are outside this file section.
 */
static int32_t udfSpawnUdfd(SUdfdData *pData) {
  fnInfo("start to init taosudf");
  TAOS_UDF_CHECK_PTR_RCODE(pData);

  int32_t              err = 0;
  uv_process_options_t options = {0};

  char path[PATH_MAX] = {0};
  if (tsProcPath == NULL) {
    path[0] = '.';
#ifdef WINDOWS
    GetModuleFileName(NULL, path, PATH_MAX);
    TAOS_DIRNAME(path);
#elif defined(_TD_DARWIN_64)
    uint32_t pathSize = sizeof(path);
    _NSGetExecutablePath(path, &pathSize);
    TAOS_DIRNAME(path);
#endif
  } else {
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
    TAOS_DIRNAME(path);
  }
#ifdef WINDOWS
  if (strlen(path) == 0) {
    TAOS_STRCAT(path, "C:\\TDengine");
  }
  TAOS_STRCAT(path, "\\taosudf.exe");
#else
  if (strlen(path) == 0) {
    TAOS_STRCAT(path, "/usr/bin");
  }
  TAOS_STRCAT(path, "/taosudf");
#endif
  char *argsUdfd[] = {path, "-c", configDir, NULL};
  options.args = argsUdfd;
  options.file = path;

  options.exit_cb = udfUdfdExit;

  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));

  uv_stdio_container_t child_stdio[3];
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
  child_stdio[1].flags = UV_IGNORE;
  child_stdio[2].flags = UV_INHERIT_FD;
  child_stdio[2].data.fd = 2;
  options.stdio_count = 3;
  options.stdio = child_stdio;

  options.flags = UV_PROCESS_DETACHED;

  char dnodeIdEnvItem[32] = {0};
  char thrdPoolSizeEnvItem[32] = {0};
  snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);

  // Thread pool size is twice the core count, with at least 2 cores assumed.
  float   numCpuCores = 4;
  int32_t code = taosGetCpuCores(&numCpuCores, false);
  if (code != 0) {
    fnError("failed to get cpu cores, code:0x%x", code);
  }
  numCpuCores = TMAX(numCpuCores, 2);
  snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);

  // On UV_ENOBUFS the buffer is left as the zero-initialised empty string.
  char    pathTaosdLdLib[512] = {0};
  size_t  taosdLdLibPathLen = sizeof(pathTaosdLdLib);
  int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);

  // Compose "<tsUdfdLdLibPath>:<inherited LD_LIBRARY_PATH>" with a single bounded
  // write. snprintf never writes past the buffer and its return value tells us
  // whether the result was truncated, so no index derived from an unbounded
  // strlen() is ever used.
  char udfdPathLdLib[1024] = {0};
  int  ldLibLen = snprintf(udfdPathLdLib, sizeof(udfdPathLdLib), "%s:%s", tsUdfdLdLibPath, pathTaosdLdLib);
  if (ret != UV_ENOBUFS && ldLibLen >= 0 && (size_t)ldLibLen < sizeof(udfdPathLdLib)) {
    fnInfo("[UDFD]taosudf LD_LIBRARY_PATH: %s", udfdPathLdLib);
  } else {
    fnError("[UDFD]can not set correct taosudf LD_LIBRARY_PATH");
  }
  char ldLibPathEnvItem[1024 + 32] = {0};
  snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);

  char *taosFqdnEnvItem = NULL;
  char *taosFqdn = getenv("TAOS_FQDN");
  if (taosFqdn != NULL) {
    int32_t subLen = strlen(taosFqdn);
    int32_t len = strlen("TAOS_FQDN=") + subLen + 1;
    taosFqdnEnvItem = taosMemoryMalloc(len);
    if (taosFqdnEnvItem != NULL) {
      tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len);
      TAOS_STRNCAT(taosFqdnEnvItem, taosFqdn, subLen);
      fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
    } else {
      fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
      // Nothing has been heap-allocated yet, so a direct return leaks nothing.
      return terrno;
    }
  }

  // taosFqdnEnvItem may be NULL; the merge loop below skips NULL entries.
  char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};

  char **envUdfdWithPEnv = NULL;
  if (environ != NULL) {
    int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
    int32_t numEnviron = 0;
    while (environ[numEnviron] != NULL) {
      numEnviron++;
    }

    // Zero-filled, so every slot that is not populated acts as a terminator
    // for the cleanup loop at _OVER.
    envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
    if (envUdfdWithPEnv == NULL) {
      err = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    for (int32_t i = 0; i < numEnviron; i++) {
      int32_t len = strlen(environ[i]) + 1;
      envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
      if (envUdfdWithPEnv[i] == NULL) {
        err = TSDB_CODE_OUT_OF_MEMORY;
        goto _OVER;
      }

      tstrncpy(envUdfdWithPEnv[i], environ[i], len);
    }

    for (int32_t i = 0; i < lenEnvUdfd; i++) {
      if (envUdfd[i] != NULL) {
        int32_t len = strlen(envUdfd[i]) + 1;
        envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
        if (envUdfdWithPEnv[numEnviron + i] == NULL) {
          err = TSDB_CODE_OUT_OF_MEMORY;
          goto _OVER;
        }

        tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
      }
    }
    envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;

    options.env = envUdfdWithPEnv;
  } else {
    options.env = envUdfd;
  }

  err = uv_spawn(&pData->loop, &pData->process, &options);
  // udfUdfdExit reads this back through process->data.
  pData->process.data = (void *)pData;

#ifdef WINDOWS
  // End taosudf.exe by Job.
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
  pData->jobHandle = CreateJobObject(NULL, NULL);
  bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
  if (!add_job_ok) {
    fnError("Assign taosudf to job failed.");
  } else {
    JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
    memset(&limit_info, 0x0, sizeof(limit_info));
    limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    bool set_auto_kill_ok =
        SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
    if (!set_auto_kill_ok) {
      fnError("Set job auto kill taosudf failed.");
    }
  }
#endif

  if (err != 0) {
    fnError("can not spawn taosudf. path: %s, error: %s", path, uv_strerror(err));
  } else {
    fnInfo("taosudf is initialized");
  }

_OVER:
  if (taosFqdnEnvItem) {
    taosMemoryFree(taosFqdnEnvItem);
  }

  if (envUdfdWithPEnv != NULL) {
    int32_t i = 0;
    while (envUdfdWithPEnv[i] != NULL) {
      taosMemoryFree(envUdfdWithPEnv[i]);
      i++;
    }
    taosMemoryFree(envUdfdWithPEnv);
  }

  return err;
}
275

276
/* uv_walk() callback: close every handle that is not already closing. `arg` is unused. */
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  TAOS_UDF_CHECK_PTR_RVOID(handle);
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
282

283
/* Async callback for SUdfdData.stopAsync: stops the loop owned by the SUdfdData stored in async->data. */
static void udfUdfdStopAsyncCb(uv_async_t *async) {
  TAOS_UDF_CHECK_PTR_RVOID(async);
  SUdfdData *udfd = async->data;
  uv_stop(&udfd->loop);
}
288

289
static void udfWatchUdfd(void *args) {
133✔
290
  TAOS_UDF_CHECK_PTR_RVOID(args);
266!
291
  SUdfdData *pData = args;
133✔
292
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
133!
293
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb));
133!
294
  pData->stopAsync.data = pData;
133✔
295
  TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
133✔
296
  atomic_store_32(&pData->spawnErr, 0);
61✔
297
  (void)uv_barrier_wait(&pData->barrier);
61✔
298
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
61✔
299
  fnInfo("taosudf loop exit with %d active handles, line:%d", num, __LINE__);
61!
300

301
  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
61✔
302
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
61✔
303
  fnInfo("taosudf loop exit with %d active handles, line:%d", num, __LINE__);
61!
304
  if (uv_loop_close(&pData->loop) != 0) {
61!
305
    fnError("taosudf loop close failed, lino:%d", __LINE__);
×
306
  }
307
  return;
61✔
308

309
_exit:
8✔
310
  if (terrno != 0) {
8!
311
    (void)uv_barrier_wait(&pData->barrier);
8✔
312
    atomic_store_32(&pData->spawnErr, terrno);
8✔
313
    if (uv_loop_close(&pData->loop) != 0) {
8!
314
      fnError("taosudf loop close failed, lino:%d", __LINE__);
8!
315
    }
316
    fnError("taosudf thread exit with code:%d lino:%d", terrno, terrln);
8!
317
    terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE;
8✔
318
  }
319
  return;
8✔
320
}
321

322
int32_t udfStartUdfd(int32_t startDnodeId) {
134✔
323
  int32_t code = 0, lino = 0;
134✔
324
  if (!tsStartUdfd) {
134✔
325
    fnInfo("start taosudf is disabled.") return 0;
1!
326
  }
327
  SUdfdData *pData = &udfdGlobal;
133✔
328
  if (pData->startCalled) {
133!
329
    fnInfo("dnode start taosudf already called");
×
330
    return 0;
×
331
  }
332
  pData->startCalled = true;
133✔
333
  char dnodeId[8] = {0};
133✔
334
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
133✔
335
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
133!
336
  pData->dnodeId = startDnodeId;
133✔
337

338
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
133!
339
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit);
133!
340
  (void)uv_barrier_wait(&pData->barrier);
133✔
341
  int32_t err = atomic_load_32(&pData->spawnErr);
69✔
342
  if (err != 0) {
69✔
343
    uv_barrier_destroy(&pData->barrier);
8✔
344
    if (uv_async_send(&pData->stopAsync) != 0) {
8!
345
      fnError("start taosudf: failed to send stop async");
×
346
    }
347
    if (uv_thread_join(&pData->thread) != 0) {
8!
348
      fnError("start taosudf: failed to join taosudf thread");
×
349
    }
350
    pData->needCleanUp = false;
8✔
351
    fnInfo("taosudf is cleaned up after spawn err");
8!
352
    TAOS_CHECK_GOTO(err, &lino, _exit);
8!
353
  } else {
354
    pData->needCleanUp = true;
61✔
355
  }
356
_exit:
69✔
357
  if (code != 0) {
69✔
358
    fnError("taosudf start failed with code:%d, lino:%d", code, lino);
8!
359
  }
360
  return code;
69✔
361
}
362

363
void udfStopUdfd() {
70✔
364
  SUdfdData *pData = &udfdGlobal;
70✔
365
  fnInfo("taosudf start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
70!
366
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
70!
367
    return;
9✔
368
  }
369
  atomic_store_32(&pData->stopCalled, 1);
61✔
370
  pData->needCleanUp = false;
61✔
371
  uv_barrier_destroy(&pData->barrier);
61✔
372
  if (uv_async_send(&pData->stopAsync) != 0) {
61!
373
    fnError("stop taosudf: failed to send stop async");
×
374
  }
375
  if (uv_thread_join(&pData->thread) != 0) {
61!
376
    fnError("stop taosudf: failed to join taosudf thread");
×
377
  }
378

379
#ifdef WINDOWS
380
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
381
#endif
382
  fnInfo("taosudf is cleaned up");
61!
383
  return;
61✔
384
}
385

386
/*
387
int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
388
  SUdfdData *pData = &udfdGlobal;
389
  if (pData->spawnErr) {
390
    return pData->spawnErr;
391
  }
392
  uv_pid_t pid = uv_process_get_pid(&pData->process);
393
  if (pUdfdPid) {
394
    *pUdfdPid = (int32_t)pid;
395
  }
396
  return TSDB_CODE_SUCCESS;
397
}
398
*/
399

400
//==============================================================================================
401
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
402
 * The QUEUE is copied from queue.h under libuv
403
 * */
404

405
/* Intrusive circular doubly-linked list node: slot 0 is the next link, slot 1 the previous link. */
typedef void *QUEUE[2];

/* Private macros. */
#define QUEUE_NEXT(q)      (*(QUEUE **)&((*(q))[0]))
#define QUEUE_PREV(q)      (*(QUEUE **)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */

/* Recover the enclosing `type` object from a pointer to its embedded QUEUE `field`. */
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field)))

/* Important note: mutating the list while QUEUE_FOREACH is
 * iterating over its elements results in undefined behavior.
 */
#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

/* A list is empty when its head links to itself. */
#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q))

/* First element after the head `q`. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))

/* Make `q` an empty list (both links point at itself). */
#define QUEUE_INIT(q)    \
  do {                   \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  } while (0)

/* Append the elements of list `n` to the tail of list `h`. */
#define QUEUE_ADD(h, n)                 \
  do {                                  \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV(h) = QUEUE_PREV(n);      \
    QUEUE_PREV_NEXT(h) = (h);           \
  } while (0)

/* Split list `h` at element `q`: `q` and everything after it move to list `n`. */
#define QUEUE_SPLIT(h, q, n)       \
  do {                             \
    QUEUE_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(n) = (n);      \
    QUEUE_NEXT(n) = (q);           \
    QUEUE_PREV(h) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(h) = (h);      \
    QUEUE_PREV(q) = (n);           \
  } while (0)

/* Move every element of `h` to `n`. Declares a local named `q` inside the expansion. */
#define QUEUE_MOVE(h, n)        \
  do {                          \
    if (QUEUE_EMPTY(h))         \
      QUEUE_INIT(n);            \
    else {                      \
      QUEUE *q = QUEUE_HEAD(h); \
      QUEUE_SPLIT(h, q, n);     \
    }                           \
  } while (0)

/* Insert `q` right after the head `h`. */
#define QUEUE_INSERT_HEAD(h, q)    \
  do {                             \
    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
    QUEUE_PREV(q) = (h);           \
    QUEUE_NEXT_PREV(q) = (q);      \
    QUEUE_NEXT(h) = (q);           \
  } while (0)

/* Insert `q` right before the head `h`, i.e. at the tail. */
#define QUEUE_INSERT_TAIL(h, q)    \
  do {                             \
    QUEUE_NEXT(q) = (h);           \
    QUEUE_PREV(q) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(q) = (q);      \
    QUEUE_PREV(h) = (q);           \
  } while (0)

/* Unlink `q` from its list; the links stored in `q` itself are left untouched. */
#define QUEUE_REMOVE(q)                 \
  do {                                  \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
  } while (0)
480

481
enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
482

483
int64_t gUdfTaskSeqNum = 0;
484
typedef struct SUdfcFuncStub {
485
  char           udfName[TSDB_FUNC_NAME_LEN + 1];
486
  UdfcFuncHandle handle;
487
  int32_t        refCount;
488
  int64_t        createTime;
489
} SUdfcFuncStub;
490

491
typedef struct SUdfcProxy {
492
  char         udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
493
  uv_barrier_t initBarrier;
494

495
  uv_loop_t   uvLoop;
496
  uv_thread_t loopThread;
497
  uv_async_t  loopTaskAync;
498

499
  uv_async_t loopStopAsync;
500

501
  uv_mutex_t taskQueueMutex;
502
  int8_t     udfcState;
503
  QUEUE      taskQueue;
504
  QUEUE      uvProcTaskQueue;
505

506
  uv_mutex_t udfStubsMutex;
507
  SArray    *udfStubs;         // SUdfcFuncStub
508
  SArray    *expiredUdfStubs;  // SUdfcFuncStub
509

510
  uv_mutex_t udfcUvMutex;
511
  int8_t     initialized;
512
} SUdfcProxy;
513

514
SUdfcProxy gUdfcProxy = {0};
515

516
typedef struct SUdfcUvSession {
517
  SUdfcProxy *udfc;
518
  int64_t     severHandle;
519
  uv_pipe_t  *udfUvPipe;
520

521
  int8_t  outputType;
522
  int32_t bytes;
523
  int32_t bufSize;
524

525
  char udfName[TSDB_FUNC_NAME_LEN + 1];
526
} SUdfcUvSession;
527

528
typedef struct SClientUvTaskNode {
529
  SUdfcProxy *udfc;
530
  int8_t      type;
531
  int32_t     errCode;
532

533
  uv_pipe_t *pipe;
534

535
  int64_t  seqNum;
536
  uv_buf_t reqBuf;
537

538
  uv_sem_t taskSem;
539
  uv_buf_t rspBuf;
540

541
  QUEUE recvTaskQueue;
542
  QUEUE procTaskQueue;
543
  QUEUE connTaskQueue;
544
} SClientUvTaskNode;
545

546
typedef struct SClientUdfTask {
547
  int8_t type;
548

549
  SUdfcUvSession *session;
550

551
  union {
552
    struct {
553
      SUdfSetupRequest  req;
554
      SUdfSetupResponse rsp;
555
    } _setup;
556
    struct {
557
      SUdfCallRequest  req;
558
      SUdfCallResponse rsp;
559
    } _call;
560
    struct {
561
      SUdfTeardownRequest  req;
562
      SUdfTeardownResponse rsp;
563
    } _teardown;
564
  };
565

566
} SClientUdfTask;
567

568
typedef struct SClientConnBuf {
569
  char   *buf;
570
  int32_t len;
571
  int32_t cap;
572
  int32_t total;
573
} SClientConnBuf;
574

575
typedef struct SClientUvConn {
576
  uv_pipe_t      *pipe;
577
  QUEUE           taskQueue;
578
  SClientConnBuf  readBuf;
579
  SUdfcUvSession *session;
580
} SClientUvConn;
581

582
enum {
583
  UDFC_STATE_INITAL = 0,  // initial state
584
  UDFC_STATE_STARTNG,     // starting after udfcOpen
585
  UDFC_STATE_READY,       // started and begin to receive quests
586
  UDFC_STATE_STOPPING,    // stopping after udfcClose
587
};
588

589
void    getUdfdPipeName(char *pipeName, int32_t size);
590
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
591
void   *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
592
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
593
void   *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state);
594
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
595
void   *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call);
596
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
597
void   *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown);
598
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request);
599
void   *decodeUdfRequest(const void *buf, SUdfRequest *request);
600
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
601
void   *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp);
602
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
603
void   *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp);
604
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp);
605
void   *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse);
606
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp);
607
void   *decodeUdfResponse(const void *buf, SUdfResponse *rsp);
608
void    freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
609
void    freeUdfColumn(SUdfColumn *col);
610
void    freeUdfDataDataBlock(SUdfDataBlock *block);
611
void    freeUdfInterBuf(SUdfInterBuf *buf);
612
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
613
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
614
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
615
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
616

617
void getUdfdPipeName(char *pipeName, int32_t size) {
141✔
618
  char    dnodeId[8] = {0};
141✔
619
  size_t  dnodeIdSize = sizeof(dnodeId);
141✔
620
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
141✔
621
  if (err != 0) {
141✔
622
    fnError("failed to get dnodeId from env since %s", uv_err_name(err));
1!
623
    dnodeId[0] = '1';
1✔
624
  }
625
#ifdef _WIN32
626
  snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir)),
627
           dnodeId);
628
#else
629
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
141✔
630
#endif
631
  fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName);
141!
632
}
141✔
633

634
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
682✔
635
  int32_t len = 0;
682✔
636
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
682✔
637
  return len;
682✔
638
}
639

640
/* Decode a setup request into request->udfName. Returns the position after the consumed bytes. */
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request) {
  const void *cursor = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
  return (void *)cursor;
}
644

645
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state) {
6,726✔
646
  int32_t len = 0;
6,726✔
647
  len += taosEncodeFixedI8(buf, state->numOfResult);
6,726✔
648
  len += taosEncodeFixedI32(buf, state->bufLen);
6,726✔
649
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
6,726✔
650
  return len;
6,726✔
651
}
652

653
/*
 * Decode an intermediate buffer written by encodeUdfInterBuf().
 * Fills numOfResult and bufLen, then lets taosDecodeBinary() set state->buf
 * from the next bufLen bytes. Returns the position after the consumed bytes.
 */
void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI8(cursor, &state->numOfResult);
  cursor = taosDecodeFixedI32(cursor, &state->bufLen);
  cursor = taosDecodeBinary(cursor, (void **)&state->buf, state->bufLen);
  return (void *)cursor;
}
659

660
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
4,893✔
661
  int32_t len = 0;
4,893✔
662
  len += taosEncodeFixedI64(buf, call->udfHandle);
4,893✔
663
  len += taosEncodeFixedI8(buf, call->callType);
4,893✔
664
  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
4,893✔
665
    len += tEncodeDataBlock(buf, &call->block);
1,192✔
666
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
3,701✔
667
    len += taosEncodeFixedI8(buf, call->initFirst);
1,356✔
668
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
3,023✔
669
    len += tEncodeDataBlock(buf, &call->block);
2,410✔
670
    len += encodeUdfInterBuf(buf, &call->interBuf);
2,410✔
671
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
613!
672
    // len += encodeUdfInterBuf(buf, &call->interBuf);
673
    // len += encodeUdfInterBuf(buf, &call->interBuf2);
674
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
614!
675
    len += encodeUdfInterBuf(buf, &call->interBuf);
614✔
676
  }
677
  return len;
4,892✔
678
}
679

680
/*
 * Decode a call request written by encodeUdfCallRequest(): udfHandle, callType,
 * then the callType-specific payload. Call types without a payload consume
 * nothing more. Returns the position after the consumed bytes.
 */
void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
  const void *cursor = taosDecodeFixedI64(buf, &call->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &call->callType);

  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
    cursor = taosDecodeFixedI8(cursor, &call->initFirst);
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  }
  return (void *)cursor;
}
704

705
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
612✔
706
  int32_t len = 0;
612✔
707
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
612✔
708
  return len;
612✔
709
}
710

711
/* Decode a teardown request into teardown->udfHandle. Returns the position after it. */
void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown) {
  const void *cursor = taosDecodeFixedI64(buf, &teardown->udfHandle);
  return (void *)cursor;
}
715

716
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request) {
6,187✔
717
  int32_t len = 0;
6,187✔
718
  if (buf == NULL) {
6,187✔
719
    len += sizeof(request->msgLen);
3,094✔
720
  } else {
721
    *(int32_t *)(*buf) = request->msgLen;
3,093✔
722
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
3,093✔
723
  }
724
  len += taosEncodeFixedI64(buf, request->seqNum);
6,187✔
725
  len += taosEncodeFixedI8(buf, request->type);
6,187✔
726
  if (request->type == UDF_TASK_SETUP) {
6,187✔
727
    len += encodeUdfSetupRequest(buf, &request->setup);
682✔
728
  } else if (request->type == UDF_TASK_CALL) {
5,505✔
729
    len += encodeUdfCallRequest(buf, &request->call);
4,893✔
730
  } else if (request->type == UDF_TASK_TEARDOWN) {
612!
731
    len += encodeUdfTeardownRequest(buf, &request->teardown);
612✔
732
  }
733
  return len;
6,187✔
734
}
735

736
/*
 * Decode a request written by encodeUdfRequest(): raw int32 msgLen, seqNum,
 * type, then the type-specific payload. Unknown types consume no payload.
 * Returns the position after the consumed bytes.
 */
void *decodeUdfRequest(const void *buf, SUdfRequest *request) {
  const void *cursor = buf;

  request->msgLen = *(int32_t *)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(request->msgLen));

  cursor = taosDecodeFixedI64(cursor, &request->seqNum);
  cursor = taosDecodeFixedI8(cursor, &request->type);

  switch (request->type) {
    case UDF_TASK_SETUP:
      cursor = decodeUdfSetupRequest(cursor, &request->setup);
      break;
    case UDF_TASK_CALL:
      cursor = decodeUdfCallRequest(cursor, &request->call);
      break;
    case UDF_TASK_TEARDOWN:
      cursor = decodeUdfTeardownRequest(cursor, &request->teardown);
      break;
    default:
      break;
  }
  return (void *)cursor;
}
752

753
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
682✔
754
  int32_t len = 0;
682✔
755
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
682✔
756
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
682✔
757
  len += taosEncodeFixedI32(buf, setupRsp->bytes);
682✔
758
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
682✔
759
  return len;
682✔
760
}
761

762
/* Decode a setup response in the field order used by encodeUdfSetupResponse(). */
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI64(cursor, &setupRsp->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &setupRsp->outputType);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bytes);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bufSize);
  return (void *)cursor;
}
769

770
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
4,893✔
771
  int32_t len = 0;
4,893✔
772
  len += taosEncodeFixedI8(buf, callRsp->callType);
4,893✔
773
  switch (callRsp->callType) {
4,893!
774
    case TSDB_UDF_CALL_SCALA_PROC:
1,192✔
775
      len += tEncodeDataBlock(buf, &callRsp->resultData);
1,192✔
776
      break;
1,192✔
777
    case TSDB_UDF_CALL_AGG_INIT:
678✔
778
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
678✔
779
      break;
677✔
780
    case TSDB_UDF_CALL_AGG_PROC:
2,410✔
781
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
2,410✔
782
      break;
2,410✔
783
    // case TSDB_UDF_CALL_AGG_MERGE:
784
    //   len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
785
    //   break;
786
    case TSDB_UDF_CALL_AGG_FIN:
614✔
787
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
614✔
788
      break;
614✔
789
  }
790
  return len;
4,892✔
791
}
792

793
/*
 * Decode a call response written by encodeUdfCallResponse(): callType, then
 * the data block (scalar) or the intermediate buffer (aggregate stages).
 * Returns the position after the consumed bytes.
 */
void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
  const void *cursor = taosDecodeFixedI8(buf, &callRsp->callType);

  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      cursor = tDecodeDataBlock(cursor, &callRsp->resultData);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
    case TSDB_UDF_CALL_AGG_PROC:
    case TSDB_UDF_CALL_AGG_FIN:
      cursor = decodeUdfInterBuf(cursor, &callRsp->resultBuf);
      break;
    default:
      break;
  }
  return (void *)cursor;
}
814

815
/* A teardown response has no payload: nothing is written and the length is 0. */
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp) {
  return 0;
}
816

817
/* A teardown response has no payload: the cursor is returned unchanged. */
void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse) {
  return (void *)buf;
}
818

819
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) {
6,188✔
820
  int32_t len = 0;
6,188✔
821
  len += sizeof(rsp->msgLen);
6,188✔
822
  if (buf != NULL) {
6,188✔
823
    *(int32_t *)(*buf) = rsp->msgLen;
3,094✔
824
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
3,094✔
825
  }
826

827
  len += sizeof(rsp->seqNum);
6,188✔
828
  if (buf != NULL) {
6,188✔
829
    *(int64_t *)(*buf) = rsp->seqNum;
3,094✔
830
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
3,094✔
831
  }
832

833
  len += taosEncodeFixedI64(buf, rsp->seqNum);
6,188✔
834
  len += taosEncodeFixedI8(buf, rsp->type);
6,188✔
835
  len += taosEncodeFixedI32(buf, rsp->code);
6,188✔
836

837
  switch (rsp->type) {
6,188✔
838
    case UDF_TASK_SETUP:
682✔
839
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
682✔
840
      break;
682✔
841
    case UDF_TASK_CALL:
4,893✔
842
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
4,893✔
843
      break;
4,893✔
844
    case UDF_TASK_TEARDOWN:
612✔
845
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
612✔
846
      break;
612✔
847
    default:
1✔
848
      fnError("encode udf response, invalid udf response type %d", rsp->type);
1!
849
      break;
×
850
  }
851
  return len;
6,187✔
852
}
853

854
/*
 * Deserialize a UDF response produced by encodeUdfResponse.
 *
 * buf - start of the encoded message.
 * rsp - receives msgLen, seqNum, type, code and the type-specific body.
 *
 * Returns the cursor just past the decoded data. If a body decoder returns
 * NULL, rsp->code is set from terrno and NULL is returned. An unknown type
 * sets rsp->code to TSDB_CODE_UDF_INTERNAL_ERROR and leaves the cursor after
 * the header.
 */
void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
  const void *cur = buf;

  // Raw fixed-size header (seqNum is present twice on the wire: raw here,
  // then once more through taosDecodeFixedI64).
  rsp->msgLen = *(int32_t *)(cur);
  cur = POINTER_SHIFT(cur, sizeof(rsp->msgLen));
  rsp->seqNum = *(int64_t *)(cur);
  cur = POINTER_SHIFT(cur, sizeof(rsp->seqNum));

  cur = taosDecodeFixedI64(cur, &rsp->seqNum);
  cur = taosDecodeFixedI8(cur, &rsp->type);
  cur = taosDecodeFixedI32(cur, &rsp->code);

  if (rsp->type == UDF_TASK_SETUP) {
    cur = decodeUdfSetupResponse(cur, &rsp->setupRsp);
  } else if (rsp->type == UDF_TASK_CALL) {
    cur = decodeUdfCallResponse(cur, &rsp->callRsp);
  } else if (rsp->type == UDF_TASK_TEARDOWN) {
    cur = decodeUdfTeardownResponse(cur, &rsp->teardownRsp);
  } else {
    rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR;
    fnError("decode udf response, invalid udf response type %d", rsp->type);
  }

  if (cur == NULL) {
    rsp->code = terrno;
    fnError("decode udf response failed, code:0x%x", rsp->code);
  }
  return (void *)cur;
}
884

885
/*
 * Release the buffers owned by one column's data and reset the pointers to
 * NULL. meta->type decides which union arm is in use: variable-length
 * columns own varOffsets + payload, fixed-length columns own
 * nullBitmap + data. The SUdfColumnData struct itself is not freed.
 */
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
  TAOS_UDF_CHECK_PTR_RVOID(data, meta);

  if (!(IS_VAR_DATA_TYPE(meta->type))) {
    taosMemoryFree(data->fixLenCol.nullBitmap);
    data->fixLenCol.nullBitmap = NULL;
    taosMemoryFree(data->fixLenCol.data);
    data->fixLenCol.data = NULL;
    return;
  }

  taosMemoryFree(data->varLenCol.varOffsets);
  data->varLenCol.varOffsets = NULL;
  taosMemoryFree(data->varLenCol.payload);
  data->varLenCol.payload = NULL;
}
899

900
// Release the data buffers held by a UDF column (see freeUdfColumnData).
// The SUdfColumn object itself stays allocated; the caller frees it.
void freeUdfColumn(SUdfColumn *col) {
  TAOS_UDF_CHECK_PTR_RVOID(col);
  SUdfColumnData *payload = &col->colData;
  SUdfColumnMeta *schema = &col->colMeta;
  freeUdfColumnData(payload, schema);
}
904

905
/*
 * Release every column of a UDF data block, then the column pointer array.
 *
 * The SUdfDataBlock struct itself is not freed and numOfCols is left as is.
 * A block whose column array was never allocated (udfCols == NULL) is treated
 * as already released regardless of numOfCols, so calling this on a
 * half-built block or calling it twice is safe. Unpopulated (NULL) slots in
 * the array are skipped.
 */
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  TAOS_UDF_CHECK_PTR_RVOID(block);
  if (block->udfCols == NULL) {
    // Nothing was allocated; indexing udfCols[i] below would dereference NULL.
    return;
  }
  for (int32_t i = 0; i < block->numOfCols; ++i) {
    if (block->udfCols[i] == NULL) {
      continue;
    }
    freeUdfColumn(block->udfCols[i]);
    taosMemoryFree(block->udfCols[i]);
    block->udfCols[i] = NULL;
  }
  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}
915

916
// Free the heap buffer owned by an intermediate-result buffer and clear the
// pointer. Only the buf pointer is touched; the SUdfInterBuf struct itself
// and its other fields are left as they are.
void freeUdfInterBuf(SUdfInterBuf *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(buf);

  taosMemoryFree(buf->buf);
  buf->buf = NULL;
}
921

922
/*
 * Deep-copy an SSDataBlock into the SUdfDataBlock layout.
 *
 * For every source column a SUdfColumn is allocated, its meta
 * (type/bytes/scale/precision) and hasNull flag are copied, and the data is
 * duplicated:
 *   - variable-length columns: the offset array plus the payload bytes;
 *   - fixed-length columns: the null bitmap plus the value bytes.
 *
 * block    - source block; validated with blockDataCheck first.
 * udfBlock - destination; numOfRows, numOfCols and udfCols are overwritten.
 *
 * Returns TSDB_CODE_SUCCESS, the blockDataCheck error, or terrno after an
 * allocation failure. On an allocation failure everything allocated so far is
 * released and udfBlock is left with udfCols == NULL and numOfCols == 0, so
 * nothing leaks and the block can safely be freed or ignored by the caller.
 */
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  TAOS_UDF_CHECK_PTR_RCODE(block, udfBlock);
  int32_t code = blockDataCheck(block);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  udfBlock->numOfRows = block->info.rows;
  udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
  udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
  if ((udfBlock->udfCols) == NULL) {
    code = terrno;
    // Keep the block self-consistent: no column array means no columns.
    udfBlock->numOfCols = 0;
    return code;
  }
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
    udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
    if (udfBlock->udfCols[i] == NULL) {
      code = terrno;
      goto _fail;
    }
    SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
    SUdfColumn      *udfCol = udfBlock->udfCols[i];
    udfCol->colMeta.type = col->info.type;
    udfCol->colMeta.bytes = col->info.bytes;
    udfCol->colMeta.scale = col->info.scale;
    udfCol->colMeta.precision = col->info.precision;
    udfCol->colData.numOfRows = udfBlock->numOfRows;
    udfCol->hasNull = col->hasNull;
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
      if (udfCol->colData.varLenCol.varOffsets == NULL) {
        code = terrno;
        goto _fail;
      }
      memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
      if (udfCol->colData.varLenCol.payload == NULL) {
        code = terrno;
        goto _fail;
      }
      if (col->reassigned) {
        // Gather each row's value into the new payload. A separate write
        // cursor is used so that varLenCol.payload keeps pointing at the start
        // of the allocation; advancing the field itself would leave it past
        // the data and make the later taosMemoryFree() receive a non-base
        // pointer.
        // NOTE(review): varOffsets above are copied verbatim from the source
        // column while the payload is packed row by row here - confirm the
        // consumer expects source-relative offsets in this case.
        char *dst = udfCol->colData.varLenCol.payload;
        for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
          char   *pColData = col->pData + col->varmeta.offset[row];
          int32_t colSize = 0;
          if (col->info.type == TSDB_DATA_TYPE_JSON) {
            colSize = getJsonValueLen(pColData);
          } else {
            colSize = varDataTLen(pColData);
          }
          memcpy(dst, pColData, colSize);
          dst += colSize;
        }
      } else {
        memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
      }
    } else {
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
      int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
      if (udfCol->colData.fixLenCol.nullBitmap == NULL) {
        code = terrno;
        goto _fail;
      }
      char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
      memcpy(bitmap, col->nullbitmap, bitmapLen);
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
      int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
      if (NULL == udfCol->colData.fixLenCol.data) {
        code = terrno;
        goto _fail;
      }
      char *data = udfCol->colData.fixLenCol.data;
      memcpy(data, col->pData, dataLen);
    }
  }
  return TSDB_CODE_SUCCESS;

_fail:
  // Undo the partial conversion. The column array came from calloc, so slots
  // that were never reached are NULL and are skipped; a reached column has
  // its type set and its not-yet-allocated buffers still NULL.
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
    if (udfBlock->udfCols[i] != NULL) {
      freeUdfColumn(udfBlock->udfCols[i]);
      taosMemoryFree(udfBlock->udfCols[i]);
      udfBlock->udfCols[i] = NULL;
    }
  }
  taosMemoryFree(udfBlock->udfCols);
  udfBlock->udfCols = NULL;
  udfBlock->numOfCols = 0;
  return code;
}
995

996
/*
 * Append the contents of one UDF result column to `block` as a new column.
 *
 * A column described by udfCol->colMeta (type, bytes) is appended, capacity
 * for udfCol->colData.numOfRows rows is ensured, and each row is copied into
 * the block's column 0 (NULL rows via colDataSetNULL, others via
 * colDataSetVal). block->info.rows is then set and the block is validated
 * with blockDataCheck.
 *
 * Returns 0 on success, otherwise the error code of the step that failed.
 * The failure is also logged together with the failing source line.
 */
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  TAOS_UDF_CHECK_PTR_RCODE(udfCol, block);
  int32_t         code = 0, lino = 0;
  SUdfColumnMeta *meta = &udfCol->colMeta;

  SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
  code = blockDataAppendColInfo(block, &colInfoData);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  SColumnInfoData *col = NULL;
  code = bdGetColumnInfoData(block, 0, &col);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) {
    if (udfColDataIsNull(udfCol, i)) {
      colDataSetNULL(col, i);
    } else {
      char *data = udfColDataGetData(udfCol, i);
      code = colDataSetVal(col, i, data, false);
      TAOS_CHECK_GOTO(code, &lino, _exit);
    }
  }
  block->info.rows = udfCol->colData.numOfRows;

  code = blockDataCheck(block);
  TAOS_CHECK_GOTO(code, &lino, _exit);
_exit:
  if (code != 0) {
    fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino);
  }
  // Propagate the failure: reporting success here would hand the caller a
  // partially filled block as if it were valid.
  return code;
}
1031

1032
/*
 * Pack an array of scalar parameters into one data block.
 *
 * The block gets one column per parameter (same column info) and as many rows
 * as the longest parameter. Parameters with fewer rows are padded by
 * repeating their last row: NULLs if that row is NULL, otherwise its value.
 *
 * input     - array of numOfCols scalar parameters.
 * numOfCols - number of entries in input.
 * output    - block to populate; columns are appended to it.
 *
 * Returns 0 on success or the error code of the failing step (also logged
 * together with the failing source line).
 */
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  TAOS_UDF_CHECK_PTR_RCODE(input, output);
  int32_t code = 0, lino = 0;

  // The widest parameter decides the row count of the block.
  int32_t maxRows = 0;
  for (int32_t c = 0; c < numOfCols; ++c) {
    if (input[c].numOfRows > maxRows) {
      maxRows = input[c].numOfRows;
    }
  }

  // Declare the columns: only the column info is taken from the source.
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData colDesc = {0};
    colDesc.info = input[c].columnData->info;

    TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &colDesc), &lino, _exit);
  }

  TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, maxRows), &lino, _exit);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SScalarParam    *param = input + c;
    SColumnInfoData *dst = taosArrayGet(output->pDataBlock, c);
    SColumnInfoData *src = param->columnData;

    TAOS_CHECK_GOTO(colDataAssign(dst, src, param->numOfRows, &output->info), &lino, _exit);

    if (!(param->numOfRows < maxRows)) {
      continue;
    }

    // Short column: extend it with copies of its last row.
    int32_t firstPadRow = param->numOfRows;
    int32_t padRows = maxRows - firstPadRow;
    if (colDataIsNull_s(src, param->numOfRows - 1)) {
      colDataSetNNULL(dst, firstPadRow, padRows);
      continue;
    }

    char *lastValue = colDataGetData(src, param->numOfRows - 1);
    for (int32_t k = 0; k < padRows; ++k) {
      TAOS_CHECK_GOTO(colDataSetVal(dst, firstPadRow + k, lastValue, false), &lino, _exit);
    }
  }

  output->info.rows = maxRows;
_exit:
  if (code != 0) {
    fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino);
  }
  return code;
}
1079

1080
/*
 * Expose a single-column data block as a scalar parameter.
 *
 * output->columnData receives a freshly allocated shallow copy of the
 * block's only SColumnInfoData (the struct is copied, not the buffers it
 * points to), output->numOfRows is taken from the block, and
 * output->colAlloced is set so the copy is known to be heap-allocated.
 *
 * Returns terrno if the allocation fails, 0 otherwise.
 * NOTE(review): a block that does not have exactly one column is only
 * logged; the function still returns 0 and leaves output untouched -
 * confirm callers detect this through output->columnData.
 */
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
  TAOS_UDF_CHECK_PTR_RCODE(input, output);

  size_t columnCount = taosArrayGetSize(input->pDataBlock);
  if (columnCount != 1) {
    fnError("scalar function only support one column");
    return 0;
  }

  output->numOfRows = input->info.rows;
  output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
  if (output->columnData == NULL) {
    return terrno;
  }

  const SColumnInfoData *onlyColumn = taosArrayGet(input->pDataBlock, 0);
  memcpy(output->columnData, onlyColumn, sizeof(SColumnInfoData));
  output->colAlloced = true;
  return 0;
}
1097

1098
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
1099
// memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
1100
/*
 * Header of the per-result-row scratch area used by UDF aggregates.
 * The final-result bytes and then the intermediate-state bytes follow this
 * header in the same allocation; the two pointers are re-derived from the
 * header address at the start of udfAggInit/udfAggProcess/udfAggFinalize.
 */
typedef struct SUdfAggRes {
  int8_t  finalResNum;     // result count reported by the finalize call
  int8_t  interResNum;     // result count carried by the intermediate state
  int32_t interResBufLen;  // bytes of intermediate state currently stored
  char   *finalResBuf;     // header address + sizeof(SUdfAggRes)
  char   *interResBuf;     // finalResBuf + the session's output size (bytes)
} SUdfAggRes;
1107

1108
void    onUdfcPipeClose(uv_handle_t *handle);
1109
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
1110
void    udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
1111
bool    isUdfcUvMsgComplete(SClientConnBuf *connBuf);
1112
void    udfcUvHandleRsp(SClientUvConn *conn);
1113
void    udfcUvHandleError(SClientUvConn *conn);
1114
void    onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
1115
void    onUdfcPipeWrite(uv_write_t *write, int32_t status);
1116
void    onUdfcPipeConnect(uv_connect_t *connect, int32_t status);
1117
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
1118
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
1119
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
1120
void    udfcAsyncTaskCb(uv_async_t *async);
1121
void    cleanUpUvTasks(SUdfcProxy *udfc);
1122
void    udfStopAsyncCb(uv_async_t *async);
1123
void    constructUdfService(void *argsThread);
1124
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
1125
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
1126
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2);
1127
int32_t doTeardownUdf(UdfcFuncHandle handle);
1128

1129
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
1130
                SSDataBlock *output, SUdfInterBuf *newState);
1131
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
1132
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
1133
// udf todo:  aggmerge
1134
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
1135
//                           SUdfInterBuf *resultBuf);
1136
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
1137
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1138
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1139

1140
int32_t udfcOpen();
1141
int32_t udfcClose();
1142

1143
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
1144
void    releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
1145
int32_t cleanUpUdfs();
1146

1147
bool    udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
1148
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
1149
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
1150
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
1151

1152
void    cleanupNotExpiredUdfs();
1153
void    cleanupExpiredUdfs();
1154
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
20,598✔
1155
  SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
20,598✔
1156
  SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
20,598✔
1157
  return strcmp(stub1->udfName, stub2->udfName);
20,598✔
1158
}
1159

1160
/*
 * Obtain a handle for the named UDF, reusing a cached one when possible.
 *
 * Under udfStubsMutex the stub cache is searched by name:
 *   - stub older than 10 * 1000 * 1000 (taosGetTimestampUs units): it is moved
 *     to the expired list and a new setup follows;
 *   - stub fresh and its pipe still attached: its refCount is bumped and its
 *     handle returned;
 *   - stub fresh but handle/pipe gone: it is dropped from the cache and a new
 *     setup follows.
 * The setup itself (doSetupUdf) runs with the mutex released; on success a new
 * stub with refCount 1 is inserted and the cache re-sorted.
 *
 * udfName - UDF name (copied, bounded by TSDB_FUNC_NAME_LEN).
 * pHandle - receives the handle, or NULL when setup fails.
 *
 * Returns 0 on success or the doSetupUdf error code.
 */
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
  TAOS_UDF_CHECK_PTR_RCODE(udfName, pHandle);

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  SUdfcFuncStub lookup = {0};
  tstrncpy(lookup.udfName, udfName, TSDB_FUNC_NAME_LEN);
  int32_t idx = taosArraySearchIdx(gUdfcProxy.udfStubs, &lookup, compareUdfcFuncSub, TD_EQ);
  if (idx != -1) {
    SUdfcFuncStub *cached = taosArrayGet(gUdfcProxy.udfStubs, idx);
    UdfcFuncHandle cachedHandle = cached->handle;
    int64_t        nowUs = taosGetTimestampUs();

    if ((nowUs - cached->createTime) >= 10 * 1000 * 1000) {
      fnDebug("udf handle expired for %s, will setup udf. move it to expired list", udfName);
      if (taosArrayPush(gUdfcProxy.expiredUdfStubs, cached) != NULL) {
        taosArrayRemove(gUdfcProxy.udfStubs, idx);
        taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
      } else {
        // Could not park it on the expired list: leave it cached.
        fnError("acquireUdfFuncHandle: failed to push udf stub to array");
      }
    } else if (cachedHandle != NULL && ((SUdfcUvSession *)cachedHandle)->udfUvPipe != NULL) {
      // Cache hit on a live session.
      *pHandle = cached->handle;
      ++cached->refCount;
      uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
      return 0;
    } else {
      fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
             cached->refCount, cached->createTime);
      taosArrayRemove(gUdfcProxy.udfStubs, idx);
    }
  }
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);

  // No usable cached handle: set the UDF up from scratch.
  *pHandle = NULL;
  int32_t code = doSetupUdf(udfName, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    *pHandle = NULL;
    return code;
  }

  SUdfcFuncStub fresh = {0};
  tstrncpy(fresh.udfName, udfName, TSDB_FUNC_NAME_LEN);
  fresh.handle = *pHandle;
  ++fresh.refCount;
  fresh.createTime = taosGetTimestampUs();

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  if (taosArrayPush(gUdfcProxy.udfStubs, &fresh) != NULL) {
    taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
  } else {
    // The handle is still returned to the caller, it is just not cached.
    fnError("acquireUdfFuncHandle: failed to push udf stub to array");
  }
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);

  return code;
}
1218

1219
/*
 * Drop one reference on the stub that owns `handle`.
 *
 * Both the active and the expired stub lists are searched by name under
 * udfStubsMutex. A matching stub is decremented only if it holds exactly this
 * handle and its refCount is still positive; if nothing matches, the call is
 * a no-op.
 */
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
  TAOS_UDF_CHECK_PTR_RVOID(udfName);

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);

  SUdfcFuncStub lookup = {0};
  tstrncpy(lookup.udfName, udfName, TSDB_FUNC_NAME_LEN);
  SUdfcFuncStub *active = taosArraySearch(gUdfcProxy.udfStubs, &lookup, compareUdfcFuncSub, TD_EQ);
  SUdfcFuncStub *retired = taosArraySearch(gUdfcProxy.expiredUdfStubs, &lookup, compareUdfcFuncSub, TD_EQ);

  if (active != NULL && active->handle == handle && active->refCount > 0) {
    --active->refCount;
  }
  if (retired != NULL && retired->handle == handle && retired->refCount > 0) {
    --retired->refCount;
  }

  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
}
1238

1239
void cleanupExpiredUdfs() {
1,238✔
1240
  int32_t i = 0;
1,238✔
1241
  SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
1,238✔
1242
  if (expiredUdfStubs == NULL) {
1,238!
1243
    fnError("cleanupExpiredUdfs: failed to init array");
×
1244
    return;
×
1245
  }
1246
  while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
1,304✔
1247
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
66✔
1248
    if (stub->refCount == 0) {
66!
1249
      fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle,
×
1250
             stub->refCount);
1251
      (void)doTeardownUdf(stub->handle);
×
1252
    } else {
1253
      fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p",
66!
1254
             stub->udfName, stub->refCount, stub->createTime, stub->handle);
1255
      UdfcFuncHandle handle = stub->handle;
66✔
1256
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
66!
1257
        if (taosArrayPush(expiredUdfStubs, stub) == NULL) {
66!
1258
          fnError("cleanupExpiredUdfs: failed to push udf stub to array");
×
1259
        }
1260
      } else {
1261
        fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
×
1262
               stub->udfName, stub->refCount, stub->createTime);
1263
      }
1264
    }
1265
    ++i;
66✔
1266
  }
1267
  taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
1,238✔
1268
  gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
1,238✔
1269
}
1270

1271
void cleanupNotExpiredUdfs() {
1,238✔
1272
  SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
1,238✔
1273
  if (udfStubs == NULL) {
1,238!
1274
    fnError("cleanupNotExpiredUdfs: failed to init array");
×
1275
    return;
×
1276
  }
1277
  int32_t i = 0;
1,238✔
1278
  while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
16,336✔
1279
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
15,098✔
1280
    if (stub->refCount == 0) {
15,098✔
1281
      fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
306!
1282
      (void)doTeardownUdf(stub->handle);
306✔
1283
    } else {
1284
      fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
14,792!
1285
             stub->refCount, stub->createTime, stub->handle);
1286
      UdfcFuncHandle handle = stub->handle;
14,792✔
1287
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
14,792!
1288
        if (taosArrayPush(udfStubs, stub) == NULL) {
14,792!
1289
          fnError("cleanupNotExpiredUdfs: failed to push udf stub to array");
×
1290
        }
1291
      } else {
1292
        fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName,
×
1293
               stub->refCount, stub->createTime);
1294
      }
1295
    }
1296
    ++i;
15,098✔
1297
  }
1298
  taosArrayDestroy(gUdfcProxy.udfStubs);
1,238✔
1299
  gUdfcProxy.udfStubs = udfStubs;
1,238✔
1300
}
1301

1302
int32_t cleanUpUdfs() {
259,534✔
1303
  int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
259,534✔
1304
  if (!initialized) {
259,558!
1305
    return TSDB_CODE_SUCCESS;
×
1306
  }
1307

1308
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
259,558✔
1309
  if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
259,733!
1310
      (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
258,495!
1311
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
258,495✔
1312
    return TSDB_CODE_SUCCESS;
258,495✔
1313
  }
1314

1315
  cleanupNotExpiredUdfs();
1,238✔
1316
  cleanupExpiredUdfs();
1,238✔
1317

1318
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
1,238✔
1319
  return 0;
1,238✔
1320
}
1321

1322
/*
 * Run a scalar UDF over `input` and validate what comes back.
 *
 * udfName   - name of the UDF to call.
 * input     - array of numOfCols scalar parameters.
 * numOfCols - number of entries in input.
 * output    - receives the result column.
 *
 * Returns the acquire/call error code on failure, or
 * TSDB_CODE_UDF_INVALID_OUTPUT_TYPE when the call succeeded but produced no
 * column or a column whose type/bytes differ from the session's declared
 * output. The handle is released on every path after a successful acquire.
 */
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  TAOS_UDF_CHECK_PTR_RCODE(udfName, input, output);

  UdfcFuncHandle handle = NULL;
  int32_t        code = acquireUdfFuncHandle(udfName, &handle);
  if (code != 0) {
    return code;
  }
  SUdfcUvSession *session = handle;

  code = doCallUdfScalarFunc(handle, input, numOfCols, output);
  if (code != TSDB_CODE_SUCCESS) {
    fnError("udfc scalar function execution failure");
  } else if (output->columnData == NULL) {
    fnError("udfc scalar function calculate error. no column data");
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  } else if (session->outputType != output->columnData->info.type ||
             session->bytes != output->columnData->info.bytes) {
    fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
            session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes);
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  }

  releaseUdfFuncHandle(udfName, handle);
  return code;
}
1351

1352
/*
 * Report how much per-row scratch memory a UDF aggregate needs:
 * the SUdfAggRes header, plus the function's result size, plus its
 * intermediate buffer size.
 *
 * Returns false (and leaves pEnv untouched) for NULL arguments or when the
 * function id denotes a scalar function; true otherwise.
 */
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
  if (pFunc == NULL || pEnv == NULL) {
    fnError("udfAggGetEnv: invalid input lint: %d", __LINE__);
    return false;
  }

  bool isAggregate = !fmIsScalarFunc(pFunc->funcId);
  if (isAggregate) {
    pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
  }
  return isAggregate;
}
1363

1364
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
355✔
1365
  TAOS_UDF_CHECK_PTR_RCODE(pCtx, pResultCellInfo);
1,065!
1366
  if (pResultCellInfo->initialized) {
355!
1367
    return TSDB_CODE_SUCCESS;
×
1368
  }
1369
  if (functionSetup(pCtx, pResultCellInfo) != TSDB_CODE_SUCCESS) {
355!
1370
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1371
  }
1372
  UdfcFuncHandle handle;
1373
  int32_t        udfCode = 0;
355✔
1374
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
355✔
1375
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
16!
1376
    return TSDB_CODE_FUNC_SETUP_ERROR;
16✔
1377
  }
1378
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
339✔
1379
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
339✔
1380
  int32_t         envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize;
339✔
1381
  memset(udfRes, 0, envSize);
339✔
1382

1383
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
339✔
1384
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
339✔
1385

1386
  SUdfInterBuf buf = {0};
339✔
1387
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
339✔
1388
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
28!
1389
    releaseUdfFuncHandle(pCtx->udfName, handle);
28✔
1390
    return TSDB_CODE_FUNC_SETUP_ERROR;
28✔
1391
  }
1392
  if (buf.bufLen <= session->bufSize) {
311!
1393
    memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
311✔
1394
    udfRes->interResBufLen = buf.bufLen;
311✔
1395
    udfRes->interResNum = buf.numOfResult;
311✔
1396
  } else {
1397
    fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
×
1398
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
1399
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1400
  }
1401
  releaseUdfFuncHandle(pCtx->udfName, handle);
311✔
1402
  freeUdfInterBuf(&buf);
311✔
1403
  return TSDB_CODE_SUCCESS;
311✔
1404
}
1405

1406
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
1,205✔
1407
  TAOS_UDF_CHECK_PTR_RCODE(pCtx);
2,410!
1408
  int32_t        udfCode = 0;
1,205✔
1409
  UdfcFuncHandle handle = 0;
1,205✔
1410
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
1,205!
1411
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
×
1412
    return udfCode;
×
1413
  }
1414

1415
  SUdfcUvSession *session = handle;
1,205✔
1416
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
1,205✔
1417
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
1,205✔
1418
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
1,205✔
1419

1420
  SInputColumnInfoData *pInput = &pCtx->input;
1,205✔
1421
  int32_t               numOfCols = pInput->numOfInputCols;
1,205✔
1422
  int32_t               start = pInput->startRowIndex;
1,205✔
1423
  int32_t               numOfRows = pInput->numOfRows;
1,205✔
1424
  SSDataBlock          *pTempBlock = NULL;
1,205✔
1425
  int32_t               code = createDataBlock(&pTempBlock);
1,205✔
1426

1427
  if (code) {
1,205!
1428
    return code;
×
1429
  }
1430

1431
  pTempBlock->info.rows = pInput->totalRows;
1,205✔
1432
  pTempBlock->info.id.uid = pInput->uid;
1,205✔
1433
  for (int32_t i = 0; i < numOfCols; ++i) {
2,450✔
1434
    if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) {
1,245!
1435
      fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode);
×
1436
      blockDataDestroy(pTempBlock);
×
1437
      return udfCode;
×
1438
    }
1439
  }
1440

1441
  SSDataBlock *inputBlock = NULL;
1,205✔
1442
  code = blockDataExtractBlock(pTempBlock, start, numOfRows, &inputBlock);
1,205✔
1443
  if (code) {
1,205!
1444
    return code;
×
1445
  }
1446

1447
  SUdfInterBuf state = {
1,205✔
1448
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
1,205✔
1449
  SUdfInterBuf newState = {0};
1,205✔
1450

1451
  udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
1,205✔
1452
  if (udfCode != 0) {
1,205!
1453
    fnError("udfAggProcess error. code: %d", udfCode);
×
1454
    newState.numOfResult = 0;
×
1455
  } else {
1456
    if (newState.bufLen <= session->bufSize) {
1,205!
1457
      memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
1,205✔
1458
      udfRes->interResBufLen = newState.bufLen;
1,205✔
1459
      udfRes->interResNum = newState.numOfResult;
1,205✔
1460
    } else {
1461
      fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
×
1462
      udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
×
1463
    }
1464
  }
1465

1466
  GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum;
1,205✔
1467

1468
  blockDataDestroy(inputBlock);
1,205✔
1469

1470
  taosArrayDestroy(pTempBlock->pDataBlock);
1,205✔
1471
  taosMemoryFree(pTempBlock);
1,205!
1472

1473
  releaseUdfFuncHandle(pCtx->udfName, handle);
1,205✔
1474
  freeUdfInterBuf(&newState);
1,205✔
1475
  return udfCode;
1,205✔
1476
}
1477

1478
/*
 * Produce the final value of a UDF aggregate and write it to pBlock.
 *
 * The UDF's finalize routine is called with the stored intermediate state.
 * If it reports at least one result and the result fits into the session's
 * output size (bytes), the bytes are copied into the scratch area's final
 * result buffer; otherwise the result count is set to 0. The value is then
 * emitted through functionFinalizeWithResultBuf, which runs even when the
 * finalize call failed.
 *
 * Returns the finalize-call error (or TSDB_CODE_UDF_INVALID_OUTPUT_TYPE for an
 * oversized result) when there is one, otherwise whatever
 * functionFinalizeWithResultBuf returned.
 */
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
  TAOS_UDF_CHECK_PTR_RCODE(pCtx, pBlock);
  int32_t        udfCode = 0;
  UdfcFuncHandle handle = 0;
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    // Name this function in the log so the failure is not attributed to
    // udfAggProcess.
    fnError("udfAggFinalize error. step acquireUdfFuncHandle. udf code: %d", udfCode);
    return udfCode;
  }

  SUdfcUvSession *session = handle;
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;

  SUdfInterBuf resultBuf = {0};
  SUdfInterBuf state = {
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
  int32_t udfCallCode = 0;
  udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
  if (udfCallCode != 0) {
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else {
    if (resultBuf.numOfResult == 0) {
      udfRes->finalResNum = 0;
      GET_RES_INFO(pCtx)->numOfRes = 0;
    } else {
      if (resultBuf.bufLen <= session->bytes) {
        memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
        udfRes->finalResNum = resultBuf.numOfResult;
        GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
      } else {
        fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
        GET_RES_INFO(pCtx)->numOfRes = 0;
        udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
      }
    }
  }

  freeUdfInterBuf(&resultBuf);

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  releaseUdfFuncHandle(pCtx->udfName, handle);
  return udfCallCode == 0 ? numOfResults : udfCallCode;
}
1523

1524
// Close callback installed via uv_close() for a udfc client pipe.
// Wakes the task at the head of the connection's task queue (if any), clears
// the session's pipe pointer under udfcUvMutex, then frees the read buffer,
// the connection object and the pipe handle.
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *pConn = handle->data;

  if (!QUEUE_EMPTY(&pConn->taskQueue)) {
    QUEUE             *head = QUEUE_HEAD(&pConn->taskQueue);
    SClientUvTaskNode *waiter = QUEUE_DATA(head, SClientUvTaskNode, connTaskQueue);
    // Only the head task is completed here; it is reported as success.
    waiter->errCode = 0;
    QUEUE_REMOVE(&waiter->procTaskQueue);
    uv_sem_post(&waiter->taskSem);
  }

  // The session may already have been detached (conn->session == NULL).
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  SUdfcUvSession *pSession = pConn->session;
  if (pSession != NULL) {
    pSession->udfUvPipe = NULL;
  }
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);

  taosMemoryFree(pConn->readBuf.buf);
  taosMemoryFree(pConn);
  taosMemoryFree((uv_pipe_t *)handle);
}
306✔
1542

1543
// Translates the outcome of a finished uv task into the client task.
// For a request/response task with a response buffer, the response is decoded,
// the matching rsp member of `task` is filled in and the buffer is freed; the
// returned code is the code carried in the response. In every other case the
// returned code is uvTask->errCode.
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  int32_t code = 0;
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);

  switch (uvTask->type) {
    case UV_TASK_REQ_RSP: {
      if (uvTask->rspBuf.base == NULL) {
        // No response arrived: fall back to the transport-level error code.
        code = uvTask->errCode;
        if (code != 0) {
          fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
        }
        break;
      }

      SUdfResponse rsp = {0};
      (void)decodeUdfResponse(uvTask->rspBuf.base, &rsp);
      code = rsp.code;
      if (code != 0) {
        fnError("udfc get udf task result failure. code: %d", code);
      }

      // Copy the payload that corresponds to the kind of request that was sent.
      if (task->type == UDF_TASK_SETUP) {
        task->_setup.rsp = rsp.setupRsp;
      } else if (task->type == UDF_TASK_CALL) {
        task->_call.rsp = rsp.callRsp;
      } else if (task->type == UDF_TASK_TEARDOWN) {
        task->_teardown.rsp = rsp.teardownRsp;
      }

      // TODO: the call buffer is setup and freed by udf invocation
      taosMemoryFreeClear(uvTask->rspBuf.base);
      break;
    }
    case UV_TASK_CONNECT: {
      code = uvTask->errCode;
      if (code != 0) {
        fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      code = uvTask->errCode;
      if (code != 0) {
        fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
      }
      break;
    }
    default: {
      break;
    }
  }

  return code;
}
1594

1595
// Allocation callback handed to uv_read_start() for the udfc client pipe.
// Three situations are distinguished on the connection's read buffer:
//   1. no buffer yet (cap == 0): allocate room for the message head
//      (int32 length + int64 seqnum);
//   2. head still incomplete: hand out the unread remainder of the head;
//   3. otherwise: grow the buffer to `total` (if larger) and hand out the
//      space behind the bytes already received.
// On allocation failure libuv receives an empty buffer (base NULL, len 0).
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SClientUvConn  *pConn = handle->data;
  SClientConnBuf *rb = &pConn->readBuf;
  int32_t         headLen = sizeof(int32_t) + sizeof(int64_t);

  // Start from the failure result; overwritten below on every success path.
  buf->base = NULL;
  buf->len = 0;

  if (rb->cap == 0) {
    rb->buf = taosMemoryMalloc(headLen);
    if (rb->buf == NULL) {
      fnError("udfc allocate buffer failure. size: %d", headLen);
    } else {
      rb->len = 0;
      rb->cap = headLen;
      rb->total = -1;

      buf->base = rb->buf;
      buf->len = rb->cap;
    }
  } else if (rb->total == -1 && rb->len < headLen) {
    buf->base = rb->buf + rb->len;
    buf->len = headLen - rb->len;
  } else {
    if (rb->total > rb->cap) {
      rb->cap = rb->total;
    }
    void *grown = taosMemoryRealloc(rb->buf, rb->cap);
    if (grown == NULL) {
      fnError("udfc re-allocate buffer failure. size: %d", rb->cap);
    } else {
      rb->buf = grown;
      buf->base = rb->buf + rb->len;
      buf->len = rb->cap - rb->len;
    }
  }

  fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", rb->cap, rb->len, rb->total);
}
9,263✔
1633

1634
// Reports whether the connection buffer holds one complete message.
// As a side effect, once at least the int32 length prefix has been received
// and `total` is still unknown (-1), `total` is read from the start of the buffer.
// The message is complete when len, cap and total all agree.
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *)(connBuf->buf);
  }

  bool complete = (connBuf->len == connBuf->cap) && (connBuf->total == connBuf->cap);
  if (complete) {
    fnDebug("udfc complete message is received, now handle it");
  }
  return complete;
}
1644

1645
// Dispatches a completely received response to the task waiting for it.
//
// The message layout is: int32 message length, then int64 sequence number.
// The first task in the connection's queue with a matching seqNum takes
// ownership of the read buffer (as rspBuf), is unlinked from both queues and
// is woken. The connection's read buffer is always reset afterwards so the
// next message starts from a fresh head allocation.
//
// Fixes over the previous version:
//  - a duplicate seqNum no longer spins forever (the old `continue` skipped
//    the iterator advance);
//  - when nobody is waiting for the response, the buffer is freed instead of
//    being leaked, and the read buffer is reset on that path as well;
//  - the sequence number is read with memcpy instead of a cast to int64_t*.
void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;
  int64_t         seqNum = 0;
  memcpy(&seqNum, connBuf->buf + sizeof(int32_t), sizeof(seqNum));  // msglen then seqnum

  SClientUvTaskNode *taskFound = NULL;

  if (QUEUE_EMPTY(&conn->taskQueue)) {
    fnError("udfc no task waiting on connection. response seqnum:%" PRId64, seqNum);
  } else {
    for (QUEUE *h = QUEUE_NEXT(&conn->taskQueue); h != &conn->taskQueue; h = QUEUE_NEXT(h)) {
      SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
      fnDebug("udfc handle response iterate through queue. uvTask:%" PRId64 "-%p", task->seqNum, task);
      if (task->seqNum != seqNum) {
        continue;
      }
      if (taskFound == NULL) {
        taskFound = task;
      } else {
        // Keep the first match; just report the duplicate.
        fnError("udfc more than one task waiting for the same response");
      }
    }
    if (taskFound == NULL) {
      fnError("no task is waiting for the response.");
    }
  }

  if (taskFound != NULL) {
    // Ownership of the buffer moves to the task; it is freed after decoding.
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    uv_sem_post(&taskFound->taskSem);
  } else {
    // Nobody will consume this response: drop it.
    taosMemoryFree(connBuf->buf);
  }

  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
1686

1687
// Fails every task queued on the connection with TSDB_CODE_UDF_PIPE_READ_ERR,
// waking each waiter, and then closes the pipe unless a close is already in
// progress.
void udfcUvHandleError(SClientUvConn *conn) {
  fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);

  for (;;) {
    if (QUEUE_EMPTY(&conn->taskQueue)) {
      break;
    }
    QUEUE             *head = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *pending = QUEUE_DATA(head, SClientUvTaskNode, connTaskQueue);
    pending->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
    QUEUE_REMOVE(&pending->connTaskQueue);
    QUEUE_REMOVE(&pending->procTaskQueue);
    uv_sem_post(&pending->taskSem);
  }

  uv_handle_t *pipeHandle = (uv_handle_t *)conn->pipe;
  if (uv_is_closing(pipeHandle)) {
    return;
  }
  uv_close(pipeHandle, onUdfcPipeClose);
}
×
1701

1702
// Read callback of the udfc client pipe.
// nread == 0 is ignored; nread > 0 extends the connection buffer and
// dispatches the response once a full message is present; nread < 0
// (including UV_EOF) is treated as a connection error.
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
  if (nread == 0) {
    return;
  }

  SClientUvConn *pConn = client->data;

  if (nread > 0) {
    SClientConnBuf *rb = &pConn->readBuf;
    rb->len += nread;
    if (isUdfcUvMsgComplete(rb)) {
      udfcUvHandleRsp(pConn);
    }
  } else {
    fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
    if (nread == UV_EOF) {
      fnError("\tudfc client pipe %p closed", client);
    }
    udfcUvHandleError(pConn);
  }
}
1722

1723
// Write-completion callback for a request written to the udfc pipe.
// A negative status is handled as a connection error; the write request
// object is freed in either case.
void onUdfcPipeWrite(uv_write_t *write, int32_t status) {
  SClientUvConn *pConn = write->data;

  if (status >= 0) {
    fnDebug("udfc client connection %p write succeed", pConn);
  } else {
    fnError("udfc client connection %p write failed. status: %d(%s)", pConn, status, uv_strerror(status));
    udfcUvHandleError(pConn);
  }

  taosMemoryFree(write);
}
3,094✔
1733

1734
// Connect callback for a UV_TASK_CONNECT task.
//
// On a successful connect, reading is started on the pipe. The task's errCode
// is the connect status, or the uv_read_start() error if that step fails.
// The connect request is freed, the task is taken off the processing queue
// and its waiter is woken.
//
// Fixes over the previous version:
//  - uv_read_start() is no longer attempted on a pipe whose connect failed,
//    so the real connect error is reported instead of being overwritten by
//    the follow-up read-start failure;
//  - on any failure the pipe is closed through onUdfcPipeClose, which frees
//    the pipe and its connection object; previously both were left allocated
//    because the failed task never published the pipe to a session.
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) {
  SClientUvTaskNode *uvTask = connect->data;
  int32_t            code = status;

  if (status != 0) {
    fnError("client connect error, task seq: %" PRId64 ", code:%s", uvTask->seqNum, uv_strerror(status));
  } else {
    code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
    if (code != 0) {
      fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code));
    }
  }
  uvTask->errCode = code;

  if (code != 0) {
    // The handle must be released via uv_close(); its memory is freed in the
    // close callback. The task must not keep a pointer to it afterwards.
    uv_handle_t *failedPipe = (uv_handle_t *)uvTask->pipe;
    uvTask->pipe = NULL;
    if (!uv_is_closing(failedPipe)) {
      uv_close(failedPipe, onUdfcPipeClose);
    }
  }

  taosMemoryFree(connect);
  QUEUE_REMOVE(&uvTask->procTaskQueue);
  uv_sem_post(&uvTask->taskSem);
}
433✔
1750

1751
// Prepares a uv task node for the given client task.
//
//  - UV_TASK_CONNECT:    nothing beyond type/udfc is set here.
//  - UV_TASK_REQ_RSP:    the request matching task->type is encoded into a
//                        freshly allocated reqBuf and a new sequence number
//                        is assigned.
//  - UV_TASK_DISCONNECT: the session's pipe is recorded on the task.
// Finally the task semaphore is initialised.
//
// Returns 0 on success, terrno on allocation failure, or
// TSDB_CODE_UDF_UV_EXEC_FAILURE on encode/semaphore failure or an invalid
// task type. On failure no request buffer is left allocated by this function.
//
// Fixes over the previous version: the request is zero-initialised, and an
// invalid task type is rejected instead of encoding and sending a request
// whose payload was never filled in.
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) {
  uvTask->type = uvTaskType;
  uvTask->udfc = task->session->udfc;

  if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfUvPipe;

    SUdfRequest request = {0};
    request.type = task->type;
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);

    if (task->type == UDF_TASK_SETUP) {
      request.setup = task->_setup.req;
      request.type = UDF_TASK_SETUP;
    } else if (task->type == UDF_TASK_CALL) {
      request.call = task->_call.req;
      request.type = UDF_TASK_CALL;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.teardown = task->_teardown.req;
      request.type = UDF_TASK_TEARDOWN;
    } else {
      fnError("udfc create uv task, invalid task type : %d", task->type);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }

    // First pass with a NULL buffer only computes the encoded length.
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    if (bufLen <= 0) {
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }
    request.msgLen = bufLen;

    void *bufBegin = taosMemoryMalloc(bufLen);
    if (bufBegin == NULL) {
      fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen);
      return terrno;
    }

    // encodeUdfRequest advances the cursor it is given, so keep bufBegin.
    void *buf = bufBegin;
    if (encodeUdfRequest(&buf, &request) <= 0) {
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
      taosMemoryFree(bufBegin);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }

    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfUvPipe;
  }

  if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
    if (uvTaskType == UV_TASK_REQ_RSP) {
      taosMemoryFreeClear(uvTask->reqBuf.base);
    }
    fnError("udfc create uv task, init semaphore failed.");
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  return 0;
}
1807

1808
// Hands a prepared uv task to the event loop and blocks until it completes.
// The task is appended to the proxy's task queue under taskQueueMutex, the
// loop is signalled through loopTaskAync, and the caller then waits on the
// task semaphore, which is destroyed once the wait returns.
// Returns TSDB_CODE_UDF_UV_EXEC_FAILURE if the loop could not be signalled,
// otherwise 0.
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
  fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
  SUdfcProxy *proxy = uvTask->udfc;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_INSERT_TAIL(&proxy->taskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  int32_t sendRc = uv_async_send(&proxy->loopTaskAync);
  if (sendRc != 0) {
    fnError("udfc queue uv task to event loop failed. code:%s", uv_strerror(sendRc));
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  uv_sem_wait(&uvTask->taskSem);
  fnDebug("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
  uv_sem_destroy(&uvTask->taskSem);

  return 0;
}
1826

1827
// Starts a queued uv task (called from udfcAsyncTaskCb).
//
//  - UV_TASK_CONNECT:    creates the pipe, its connection object and the
//                        connect request, then starts connecting to
//                        udfdPipeName; completion is reported by
//                        onUdfcPipeConnect.
//  - UV_TASK_REQ_RSP:    links the task into the connection's task queue and
//                        writes the encoded request to the pipe.
//  - UV_TASK_DISCONNECT: links the task into the connection's task queue and
//                        closes the pipe unless it is already closing.
//
// Returns 0 when the task was started; any other value means the task was
// not started and the caller has to complete it with that error.
//
// Fixes over the previous version:
//  - all connect-side allocations happen before uv_pipe_init(), so no error
//    path frees a pipe handle that has already been initialised on the loop,
//    and uvTask->pipe is only set once nothing can fail any more;
//  - a failed uv_write() unlinks the task from the connection queue again
//    (the caller frees the task right after the error is reported);
//  - an unknown task type now yields an error instead of 0, which used to
//    park the task on the processing queue with nothing to ever wake it.
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
  fnDebug("event loop start uv task. uvTask: %" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
  int32_t code = 0;

  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
      if (pipe == NULL) {
        fnError("udfc event loop start connect task malloc pipe failed.");
        return terrno;
      }

      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
      if (conn == NULL) {
        code = terrno;  // capture before anything else can touch terrno
        fnError("udfc event loop start connect task malloc conn failed.");
        taosMemoryFree(pipe);
        return code;
      }

      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
      if (connReq == NULL) {
        code = terrno;
        fnError("udfc event loop start connect task malloc connReq failed.");
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        return code;
      }

      if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) {
        fnError("udfc event loop start connect task uv_pipe_init failed.");
        taosMemoryFree(connReq);
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        return TSDB_CODE_UDF_UV_EXEC_FAILURE;
      }

      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
      QUEUE_INIT(&conn->taskQueue);

      pipe->data = conn;
      uvTask->pipe = pipe;

      connReq->data = uvTask;
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
      code = 0;
      break;
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
      } else {
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
        if (write == NULL) {
          fnError("udfc event loop start req_rsp task malloc write failed.");
          return terrno;
        }
        write->data = pipe->data;
        QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
        QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
        int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
        if (err != 0) {
          // The write was never queued: no callback will run, so undo the
          // queue insertion and release the request here.
          QUEUE_REMOVE(&uvTask->connTaskQueue);
          taosMemoryFree(write);
          fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code:%s", uvTask, uv_strerror(err));
        }
        code = err;
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
      } else {
        SClientUvConn *conn = pipe->data;
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
        if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) {
          uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
        }
        code = 0;
      }
      break;
    }
    default: {
      fnError("udfc event loop unknown task type.");
      code = TSDB_CODE_UDF_UV_EXEC_FAILURE;
      break;
    }
  }

  return code;
}
1915

1916
// Async callback registered for loopTaskAync: drains the proxy's task queue.
// The shared queue is moved into a local one under taskQueueMutex; each task
// is then started. Started tasks are tracked on uvProcTaskQueue, tasks that
// fail to start are completed immediately with the start error.
void udfcAsyncTaskCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;
  QUEUE       pending;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_MOVE(&proxy->taskQueue, &pending);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  while (!QUEUE_EMPTY(&pending)) {
    QUEUE *node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *uvTask = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);

    int32_t startRc = udfcStartUvTask(uvTask);
    if (startRc != 0) {
      uvTask->errCode = startRc;
      uv_sem_post(&uvTask->taskSem);
      continue;
    }
    QUEUE_INSERT_TAIL(&proxy->uvProcTaskQueue, &uvTask->procTaskQueue);
  }
}
3,795✔
1937

1938
// Wakes every task that is still queued or in flight.
// First the not-yet-started tasks (taskQueue, taken under taskQueueMutex),
// then the started ones (uvProcTaskQueue). While the proxy is stopping, each
// task is marked with TSDB_CODE_UDF_STOPPING before its waiter is released.
void cleanUpUvTasks(SUdfcProxy *udfc) {
  QUEUE notStarted;
  fnDebug("clean up uv tasks");

  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &notStarted);
  uv_mutex_unlock(&udfc->taskQueueMutex);

  while (!QUEUE_EMPTY(&notStarted)) {
    QUEUE *node = QUEUE_HEAD(&notStarted);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *queued = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      queued->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&queued->taskSem);
  }

  while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
    QUEUE *node = QUEUE_HEAD(&udfc->uvProcTaskQueue);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *inFlight = QUEUE_DATA(node, SClientUvTaskNode, procTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      inFlight->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&inFlight->taskSem);
  }
}
70✔
1965

1966
// Async callback registered for loopStopAsync: releases all outstanding
// tasks and, if the proxy is in the stopping state, stops the event loop.
void udfStopAsyncCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;

  cleanUpUvTasks(proxy);

  if (proxy->udfcState != UDFC_STATE_STOPPING) {
    return;
  }
  uv_stop(&proxy->uvLoop);
}
70✔
1973

1974
void constructUdfService(void *argsThread) {
70✔
1975
  int32_t     code = 0, lino = 0;
70✔
1976
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
70✔
1977
  code = uv_loop_init(&udfc->uvLoop);
70✔
1978
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
1979

1980
  code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
70✔
1981
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
1982
  udfc->loopTaskAync.data = udfc;
70✔
1983
  code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
70✔
1984
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
1985
  udfc->loopStopAsync.data = udfc;
70✔
1986
  code = uv_mutex_init(&udfc->taskQueueMutex);
70✔
1987
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
1988
  QUEUE_INIT(&udfc->taskQueue);
70✔
1989
  QUEUE_INIT(&udfc->uvProcTaskQueue);
70✔
1990
  (void)uv_barrier_wait(&udfc->initBarrier);
70✔
1991
  // TODO return value of uv_run
1992
  int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
70✔
1993
  fnInfo("udfc uv loop exit. active handle num: %d", num);
70!
1994
  (void)uv_loop_close(&udfc->uvLoop);
70✔
1995

1996
  uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL);
70✔
1997
  num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
70✔
1998
  fnInfo("udfc uv loop exit. active handle num: %d", num);
70!
1999

2000
  (void)uv_loop_close(&udfc->uvLoop);
70✔
2001
_exit:
70✔
2002
  if (code != 0) {
70!
2003
    fnError("udfc construct error. code: %d, line: %d", code, lino);
×
2004
  }
2005
  fnInfo("udfc construct finished");
70!
2006
}
70✔
2007

2008
int32_t udfcOpen() {
79✔
2009
  int32_t code = 0, lino = 0;
79✔
2010
  int8_t  old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
79✔
2011
  if (old == 1) {
79✔
2012
    return 0;
9✔
2013
  }
2014
  SUdfcProxy *proxy = &gUdfcProxy;
70✔
2015
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
70✔
2016
  proxy->udfcState = UDFC_STATE_STARTNG;
70✔
2017
  code = uv_barrier_init(&proxy->initBarrier, 2);
70✔
2018
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
2019
  code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
70✔
2020
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
2021
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
70✔
2022
  proxy->udfcState = UDFC_STATE_READY;
70✔
2023
  (void)uv_barrier_wait(&proxy->initBarrier);
70✔
2024
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
2025
  code = uv_mutex_init(&proxy->udfStubsMutex);
70✔
2026
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
2027
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
70✔
2028
  if (proxy->udfStubs == NULL) {
70!
2029
    fnError("udfc init failed. udfStubs: %p", proxy->udfStubs);
×
2030
    return -1;
×
2031
  }
2032
  proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
70✔
2033
  if (proxy->expiredUdfStubs == NULL) {
70!
2034
    taosArrayDestroy(proxy->udfStubs);
×
2035
    fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs);
×
2036
    return -1;
×
2037
  }
2038
  code = uv_mutex_init(&proxy->udfcUvMutex);
70✔
2039
  TAOS_CHECK_GOTO(code, &lino, _exit);
70!
2040
_exit:
70✔
2041
  if (code != 0) {
70!
2042
    fnError("udfc open error. code: %d, line: %d", code, lino);
×
2043
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
2044
  }
2045
  fnInfo("udfc initialized");
70!
2046
  return 0;
70✔
2047
}
2048

2049
int32_t udfcClose() {
70✔
2050
  int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
70✔
2051
  if (old == 0) {
70!
2052
    return 0;
×
2053
  }
2054

2055
  SUdfcProxy *udfc = &gUdfcProxy;
70✔
2056
  udfc->udfcState = UDFC_STATE_STOPPING;
70✔
2057
  if (uv_async_send(&udfc->loopStopAsync) != 0) {
70!
2058
    fnError("udfc close error to send stop async");
×
2059
  }
2060
  if (uv_thread_join(&udfc->loopThread) != 0) {
70!
2061
    fnError("udfc close errir to join loop thread");
×
2062
  }
2063
  uv_mutex_destroy(&udfc->taskQueueMutex);
70✔
2064
  uv_barrier_destroy(&udfc->initBarrier);
70✔
2065
  taosArrayDestroy(udfc->expiredUdfStubs);
70✔
2066
  taosArrayDestroy(udfc->udfStubs);
70✔
2067
  uv_mutex_destroy(&udfc->udfStubsMutex);
70✔
2068
  uv_mutex_destroy(&udfc->udfcUvMutex);
70✔
2069
  udfc->udfcState = UDFC_STATE_INITAL;
70✔
2070
  fnInfo("udfc is cleaned up");
70!
2071
  return 0;
70✔
2072
}
2073

2074
// Runs one uv task of the given type for a client task, synchronously.
// A task node is allocated, initialised, queued to the event loop (blocking
// until it finishes) and its result is transferred into `task`. After a
// successful connect, the new pipe is published on the session and the
// connection is linked back to the session.
// Returns 0 on success or the first error encountered; the task node and its
// request buffer are released on every path.
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  int32_t code = 0, lino = 0;

  SClientUvTaskNode *node = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
  if (node == NULL) {
    fnError("udfc client task: %p failed to allocate memory for uvTask", task);
    return terrno;
  }
  fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, node, task->session->udfUvPipe);

  code = udfcInitializeUvTask(task, uvTaskType, node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcQueueUvTask(node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcGetUdfTaskResultFromUvTask(task, node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  if (uvTaskType == UV_TASK_CONNECT) {
    SClientUvConn *conn = node->pipe->data;
    task->session->udfUvPipe = node->pipe;
    conn->session = task->session;
  }

_exit:
  if (code != 0) {
    fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, node, code, lino);
  }
  taosMemoryFree(node->reqBuf.base);
  taosMemoryFree(node);
  return code;
}
2105

2106
// Frees the session owned by `task`.
// If the session still has a pipe with a connection attached, the
// connection's back-pointer to the session is cleared first, under
// udfcUvMutex, so the connection never refers to freed memory.
static void freeTaskSession(SClientUdfTask *task) {
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  uv_pipe_t *sessionPipe = task->session->udfUvPipe;
  if (sessionPipe != NULL && sessionPipe->data != NULL) {
    SClientUvConn *attached = sessionPipe->data;
    attached->session = NULL;
  }
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);

  taosMemoryFreeClear(task->session);
}
406✔
2115

2116
// Sets up a UDF by name and returns its function handle.
// A new session is created, a pipe is connected, and a setup request is sent.
// On success the session is filled from the setup response (server handle,
// output type, byte widths, name) and returned through `funcHandle`; the
// caller then owns the session. On failure the session is freed and the
// error code is returned.
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
  int32_t code = TSDB_CODE_SUCCESS, lino = 0;

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("doSetupUdf, failed to allocate memory for task");
    return terrno;
  }

  task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
  if (task->session == NULL) {
    fnError("doSetupUdf, failed to allocate memory for session");
    taosMemoryFree(task);
    return terrno;
  }
  task->session->udfc = &gUdfcProxy;
  task->type = UDF_TASK_SETUP;

  tstrncpy(task->_setup.req.udfName, udfName, TSDB_FUNC_NAME_LEN);

  code = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  SUdfSetupResponse *setupRsp = &task->_setup.rsp;
  SUdfcUvSession    *session = task->session;
  session->severHandle = setupRsp->udfHandle;
  session->outputType = setupRsp->outputType;
  session->bytes = setupRsp->bytes;
  session->bufSize = setupRsp->bufSize;
  tstrncpy(session->udfName, udfName, TSDB_FUNC_NAME_LEN);
  fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);

  // Hand the session over to the caller; only the task wrapper is freed.
  *funcHandle = task->session;
  taosMemoryFree(task);
  return 0;

_exit:
  if (code != 0) {
    fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino);
  }
  freeTaskSession(task);
  taosMemoryFree(task);
  return code;
}
2160

2161
// Issues one UDF call of the given call type over the session's pipe.
// Inputs are copied into the request depending on callType:
//   AGG_INIT   -> initFirst flag
//   AGG_PROC   -> input block and state
//   AGG_FIN    -> state
//   SCALA_PROC -> input block
// On success the aggregate call types store the response buffer in
// *newState and the scalar call type stores the response block in *output.
// `state2` is not used (aggregate merge is not wired up here).
// Returns 0 on success, TSDB_CODE_UDF_PIPE_NOT_EXIST if the session has no
// pipe, terrno on allocation failure, or the error of the request round trip.
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
                SSDataBlock *output, SUdfInterBuf *newState) {
  fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);

  SUdfcUvSession *session = (SUdfcUvSession *)handle;
  if (session->udfUvPipe == NULL) {
    fnError("No pipe to taosudf");
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("udfc call udf. failed to allocate memory for task");
    return terrno;
  }
  task->session = (SUdfcUvSession *)handle;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *callReq = &task->_call.req;
  callReq->udfHandle = task->session->severHandle;
  callReq->callType = callType;

  if (callType == TSDB_UDF_CALL_AGG_INIT) {
    callReq->initFirst = 1;
  } else if (callType == TSDB_UDF_CALL_AGG_PROC) {
    callReq->block = *input;
    callReq->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_AGG_FIN) {
    callReq->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_SCALA_PROC) {
    callReq->block = *input;
  }

  int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  if (code != 0) {
    fnError("call udf failure. udfcRunUdfUvTask err: %d", code);
  } else {
    SUdfCallResponse *callRsp = &task->_call.rsp;
    switch (callType) {
      case TSDB_UDF_CALL_AGG_INIT:
      case TSDB_UDF_CALL_AGG_PROC:
      case TSDB_UDF_CALL_AGG_FIN: {
        *newState = callRsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_SCALA_PROC: {
        *output = callRsp->resultData;
        break;
      }
      default: {
        break;
      }
    }
  }

  taosMemoryFree(task);
  return code;
}
2237

2238
// Aggregate init call: no input; the initial state is returned in `interBuf`.
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_INIT, NULL, NULL, NULL, NULL, interBuf);
}
2245

2246
// input: block, state
2247
// output: interbuf,
2248
// Aggregate process call: feeds `block` together with the current `state`
// and returns the updated state in `newState`.
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_PROC, block, state, NULL, NULL, newState);
}
2253

2254
// input: interbuf1, interbuf2
2255
// output: resultBuf
2256
// udf todo:  aggmerge
2257
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
2258
//                           SUdfInterBuf *resultBuf) {
2259
//   int8_t  callType = TSDB_UDF_CALL_AGG_MERGE;
2260
//   int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
2261
//   return err;
2262
// }
2263

2264
// input: interBuf
2265
// output: resultData
2266
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
307✔
2267
  int8_t  callType = TSDB_UDF_CALL_AGG_FIN;
307✔
2268
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
307✔
2269
  return err;
307✔
2270
}
2271

2272
// Invokes a scalar UDF.
// input:  handle    - UDF function handle, forwarded to callUdf
//         input     - scalar parameters, converted into a temporary data block
//         numOfCols - passed to convertScalarParamToDataBlock together with input
// output: output    - filled by convertDataBlockToScalarParm when callUdf succeeds
// return: the first non-zero code from conversion, callUdf, or result
//         conversion; 0 when every step succeeds
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  SSDataBlock inputBlock = {0};
  SSDataBlock resultBlock = {0};

  int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
  if (code != 0) {
    // NOTE(review): inputBlock is not released on this path - confirm the
    // converter leaves nothing allocated in it when it fails.
    fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code);
    return code;
  }

  const int8_t kind = TSDB_UDF_CALL_SCALA_PROC;
  code = callUdf(handle, kind, &inputBlock, NULL, NULL, &resultBlock, NULL);
  if (code == 0) {
    // Only a successful call yields a result block to convert and tear down.
    code = convertDataBlockToScalarParm(&resultBlock, output);
    taosArrayDestroy(resultBlock.pDataBlock);
  }

  // The temporary input block is released whether or not the call succeeded.
  blockDataFreeRes(&inputBlock);
  return code;
}
2290

2291
// Tears down a UDF session.
// input:  handle - UDF function handle; used as an SUdfcUvSession pointer
// return: TSDB_CODE_UDF_PIPE_NOT_EXIST when the session has no pipe,
//         terrno when the task cannot be allocated, otherwise the code of
//         the first failing udfcRunUdfUvTask step (0 when both succeed)
//
// The session is freed directly on the two early-return paths. On every
// other path freeTaskSession(task) is called and the task is freed at _exit.
int32_t doTeardownUdf(UdfcFuncHandle handle) {
  // `code` and `lino` keep their names: they are shared with TAOS_CHECK_GOTO
  // and the _exit handler below.
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SUdfcUvSession *pSession = (SUdfcUvSession *)handle;

  if (NULL == pSession->udfUvPipe) {
    fnError("tear down udf. pipe to taosudf does not exist. udf name: %s", pSession->udfName);
    taosMemoryFree(pSession);
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
  }

  SClientUdfTask *pTask = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (NULL == pTask) {
    fnError("doTeardownUdf, failed to allocate memory for task");
    taosMemoryFree(pSession);
    return terrno;
  }

  // Build the teardown request from the session's server-side handle.
  pTask->type = UDF_TASK_TEARDOWN;
  pTask->session = pSession;
  pTask->_teardown.req.udfHandle = pSession->severHandle;

  // Step 1: request/response round trip for the teardown task.
  code = udfcRunUdfUvTask(pTask, UV_TASK_REQ_RSP);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  // Step 2: disconnect; skipped when step 1 failed.
  code = udfcRunUdfUvTask(pTask, UV_TASK_DISCONNECT);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  fnInfo("tear down udf. udf name: %s, udf func handle: %p", pSession->udfName, handle);
  // TODO: synchronization refactor between libuv event loop and request thread
  // uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  // if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
  //   SClientUvConn *conn = session->udfUvPipe->data;
  //   conn->session = NULL;
  // }
  // uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);

_exit:
  if (0 != code) {
    fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", pSession->udfName, code, lino);
  }
  freeTaskSession(pTask);
  taosMemoryFree(pTask);

  return code;
}
2337
#else
2338
#include "tudf.h"
2339

2340
// Stub for builds without USE_UDF: nothing to clean up, always returns 0.
int32_t cleanUpUdfs() {
  return 0;
}
2341
// Stub for builds without USE_UDF: opens nothing, always returns 0.
int32_t udfcOpen() {
  return 0;
}
2342
// Stub for builds without USE_UDF: closes nothing, always returns 0.
int32_t udfcClose() {
  return 0;
}
2343
// Stub for builds without USE_UDF: ignores startDnodeId and always returns 0.
int32_t udfStartUdfd(int32_t startDnodeId) {
  (void)startDnodeId;
  return 0;
}
2344
// Stub for builds without USE_UDF: does nothing.
void udfStopUdfd() {
}
2345
// Stub for builds without USE_UDF: scalar UDF calls are unavailable, so all
// arguments are ignored and TSDB_CODE_OPS_NOT_SUPPORT is returned.
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  (void)udfName;
  (void)input;
  (void)numOfCols;
  (void)output;
  return TSDB_CODE_OPS_NOT_SUPPORT;
}
2348
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc