• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.02
/source/libs/function/src/tudf.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#ifdef USE_UDF
16
#include "uv.h"
17

18
#include "os.h"
19

20
#include "builtinsimpl.h"
21
#include "fnLog.h"
22
#include "functionMgt.h"
23
#include "querynodes.h"
24
#include "tarray.h"
25
#include "tdatablock.h"
26
#include "tglobal.h"
27
#include "tudf.h"
28
#include "tudfInt.h"
29

30
#ifdef _TD_DARWIN_64
31
#include <mach-o/dyld.h>
32
#endif
33

34
// State shared between the dnode-side start/stop entry points (udfStartUdfd / udfStopUdfd)
// and the watcher thread (udfWatchUdfd) that spawns and supervises the taosudf child process.
typedef struct SUdfdData {
35
  bool         startCalled;  // set by udfStartUdfd; later start calls return immediately
36
  bool         needCleanUp;  // true after a successful start; udfStopUdfd does nothing unless set
37
  uv_loop_t    loop;         // libuv loop initialised and run by udfWatchUdfd
38
  uv_thread_t  thread;       // watcher thread created by udfStartUdfd, joined on stop
39
  uv_barrier_t barrier;      // initialised for 2 parties: udfStartUdfd and the watcher thread
40
  uv_process_t process;      // handle passed to uv_spawn for the taosudf child
41
#ifdef WINDOWS
42
  HANDLE jobHandle;  // job object the child is assigned to (configured with KILL_ON_JOB_CLOSE)
43
#endif
44
  int32_t    spawnErr;    // 0 when startup succeeded, otherwise the terrno recorded by the watcher
45
  uv_pipe_t  ctrlPipe;    // pipe installed as the child's stdio slot 0
46
  uv_async_t stopAsync;   // async handle whose callback (udfUdfdStopAsyncCb) stops the loop
47
  int32_t    stopCalled;  // set to 1 by udfStopUdfd; udfUdfdExit will not respawn once set
48

49
  int32_t dnodeId;  // dnode id exported to the child through the DNODE_ID environment entry
50
} SUdfdData;
51

52
SUdfdData udfdGlobal = {0};
53

54
int32_t udfStartUdfd(int32_t startDnodeId);
55
void    udfStopUdfd();
56

57
extern char **environ;
58

59
static int32_t udfSpawnUdfd(SUdfdData *pData);
60
void           udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
61
static void    udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
62
static void    udfUdfdStopAsyncCb(uv_async_t *async);
63
static void    udfWatchUdfd(void *args);
64

65
/*
 * Exit callback installed in uv_process_options_t.exit_cb for the taosudf child.
 *
 * process    - libuv process handle; its data field is expected to point at the SUdfdData.
 * exitStatus - exit status reported by libuv.
 * termSignal - signal reported by libuv.
 *
 * The child is respawned via udfSpawnUdfd() unless it exited with status 0 and
 * signal 0, or a stop has already been requested (stopCalled is non-zero).
 */
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
  TAOS_UDF_CHECK_PTR_RVOID(process);
  fnInfo("taosudf process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);

  SUdfdData *udfd = process->data;
  if (udfd == NULL) {
    fnError("taosudf process data is NULL");
    return;
  }

  // The stop flag is only consulted when the exit was not a clean one.
  bool exitedCleanly = (exitStatus == 0) && (termSignal == 0);
  if (exitedCleanly || atomic_load_32(&udfd->stopCalled)) {
    fnInfo("taosudf process exit due to SIGINT or dnode-mgmt called stop");
    return;
  }

  fnInfo("taosudf process restart");
  int32_t rc = udfSpawnUdfd(udfd);
  if (rc != 0) {
    fnError("taosudf process restart failed with code:%d", rc);
  }
}
83

84
/*
 * Spawn the taosudf child process on pData->loop.
 *
 * Steps, in order:
 *   1. Build the executable path from tsProcPath (or the running module's
 *      location on Windows/macOS) and append the taosudf binary name.
 *   2. Initialise pData->ctrlPipe and wire it as the child's stdio slot 0;
 *      slot 1 is ignored and slot 2 inherits fd 2.
 *   3. Build the extra environment entries DNODE_ID, UV_THREADPOOL_SIZE,
 *      LD_LIBRARY_PATH and (when set in the parent) TAOS_FQDN, and append them
 *      to a deep copy of `environ`.
 *   4. Call uv_spawn() with udfUdfdExit as the exit callback.
 *
 * Returns 0 on success; otherwise the uv_spawn() result, TSDB_CODE_OUT_OF_MEMORY,
 * or terrno when the TAOS_FQDN buffer cannot be allocated.
 *
 * NOTE(review): udfUdfdExit calls this function again to restart the child, which
 * re-runs uv_pipe_init() on the same ctrlPipe without closing it first - confirm
 * against the libuv handle lifecycle rules.
 */
static int32_t udfSpawnUdfd(SUdfdData *pData) {
  fnInfo("start to init taosudf");
  TAOS_UDF_CHECK_PTR_RCODE(pData);

  int32_t              err = 0;
  uv_process_options_t options = {0};

  char path[PATH_MAX] = {0};
  if (tsProcPath == NULL) {
    path[0] = '.';
#ifdef WINDOWS
    GetModuleFileName(NULL, path, PATH_MAX);
    TAOS_DIRNAME(path);
#elif defined(_TD_DARWIN_64)
    uint32_t pathSize = sizeof(path);
    _NSGetExecutablePath(path, &pathSize);
    TAOS_DIRNAME(path);
#endif
  } else {
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
    TAOS_DIRNAME(path);
  }
#ifdef WINDOWS
  if (strlen(path) == 0) {
    TAOS_STRCAT(path, "C:\\TDengine");
  }
  TAOS_STRCAT(path, "\\taosudf.exe");
#else
  if (strlen(path) == 0) {
    TAOS_STRCAT(path, "/usr/bin");
  }
  TAOS_STRCAT(path, "/taosudf");
#endif
  char *argsUdfd[] = {path, "-c", configDir, NULL};
  options.args = argsUdfd;
  options.file = path;

  options.exit_cb = udfUdfdExit;

  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));

  uv_stdio_container_t child_stdio[3];
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
  child_stdio[1].flags = UV_IGNORE;
  child_stdio[2].flags = UV_INHERIT_FD;
  child_stdio[2].data.fd = 2;
  options.stdio_count = 3;
  options.stdio = child_stdio;

  options.flags = UV_PROCESS_DETACHED;

  char dnodeIdEnvItem[32] = {0};
  char thrdPoolSizeEnvItem[32] = {0};
  snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);

  // Falls back to 4 cores when the query fails; the pool size is twice the core count, at least 4.
  float   numCpuCores = 4;
  int32_t code = taosGetCpuCores(&numCpuCores, false);
  if (code != 0) {
    fnError("failed to get cpu cores, code:0x%x", code);
  }
  numCpuCores = TMAX(numCpuCores, 2);
  snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);

  char    pathTaosdLdLib[512] = {0};
  size_t  taosdLdLibPathLen = sizeof(pathTaosdLdLib);
  int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
  if (ret != UV_ENOBUFS) {
    // On UV_ENOBUFS the length reported by libuv is kept so the size check below reports the overflow.
    taosdLdLibPathLen = strlen(pathTaosdLdLib);
  }

  char   udfdPathLdLib[1024] = {0};
  size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
  tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, sizeof(udfdPathLdLib));

  // Append ":<parent LD_LIBRARY_PATH>" only while the separator and the terminator
  // still fit. Indexing with the unclamped source length would otherwise write past
  // the end of udfdPathLdLib when the configured path is 1023 characters or longer.
  if (udfdLdLibPathLen + 1 < sizeof(udfdPathLdLib)) {
    udfdPathLdLib[udfdLdLibPathLen] = ':';
    tstrncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1);
  }
  if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
    fnInfo("[UDFD]taosudf LD_LIBRARY_PATH: %s", udfdPathLdLib);
  } else {
    fnError("[UDFD]can not set correct taosudf LD_LIBRARY_PATH");
  }
  char ldLibPathEnvItem[1024 + 32] = {0};
  snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);

  char *taosFqdnEnvItem = NULL;
  char *taosFqdn = getenv("TAOS_FQDN");
  if (taosFqdn != NULL) {
    int32_t subLen = strlen(taosFqdn);
    int32_t len = strlen("TAOS_FQDN=") + subLen + 1;
    taosFqdnEnvItem = taosMemoryMalloc(len);
    if (taosFqdnEnvItem != NULL) {
      tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len);
      TAOS_STRNCAT(taosFqdnEnvItem, taosFqdn, subLen);
      fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
    } else {
      fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
      return terrno;
    }
  }

  // The trailing NULL terminates the list; taosFqdnEnvItem may itself be NULL.
  char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};

  char **envUdfdWithPEnv = NULL;
  if (environ != NULL) {
    int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
    int32_t numEnviron = 0;
    while (environ[numEnviron] != NULL) {
      numEnviron++;
    }

    // Zero-filled, so the cleanup loop at _OVER stops at the first slot that was never populated.
    envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
    if (envUdfdWithPEnv == NULL) {
      err = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    for (int32_t i = 0; i < numEnviron; i++) {
      int32_t len = strlen(environ[i]) + 1;
      envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
      if (envUdfdWithPEnv[i] == NULL) {
        err = TSDB_CODE_OUT_OF_MEMORY;
        goto _OVER;
      }

      tstrncpy(envUdfdWithPEnv[i], environ[i], len);
    }

    for (int32_t i = 0; i < lenEnvUdfd; i++) {
      if (envUdfd[i] != NULL) {
        int32_t len = strlen(envUdfd[i]) + 1;
        envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
        if (envUdfdWithPEnv[numEnviron + i] == NULL) {
          err = TSDB_CODE_OUT_OF_MEMORY;
          goto _OVER;
        }

        tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
      }
    }
    envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;

    options.env = envUdfdWithPEnv;
  } else {
    options.env = envUdfd;
  }

  err = uv_spawn(&pData->loop, &pData->process, &options);
  pData->process.data = (void *)pData;

#ifdef WINDOWS
  // End taosudf.exe by Job.
  if (pData->jobHandle != NULL) {
    CloseHandle(pData->jobHandle);
    pData->jobHandle = NULL;
  }
  // Only a successfully spawned process has a handle that can be assigned to a job.
  if (err == 0) {
    pData->jobHandle = CreateJobObject(NULL, NULL);
    bool add_job_ok =
        (pData->jobHandle != NULL) && AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
    if (!add_job_ok) {
      fnError("Assign taosudf to job failed.");
    } else {
      JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
      memset(&limit_info, 0x0, sizeof(limit_info));
      limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
      bool set_auto_kill_ok =
          SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
      if (!set_auto_kill_ok) {
        fnError("Set job auto kill taosudf failed.");
      }
    }
  }
#endif

  if (err != 0) {
    fnError("can not spawn taosudf. path: %s, error: %s", path, uv_strerror(err));
  } else {
    fnInfo("taosudf is initialized");
  }

_OVER:
  if (taosFqdnEnvItem) {
    taosMemoryFree(taosFqdnEnvItem);
  }

  if (envUdfdWithPEnv != NULL) {
    int32_t i = 0;
    while (envUdfdWithPEnv[i] != NULL) {
      taosMemoryFree(envUdfdWithPEnv[i]);
      i++;
    }
    taosMemoryFree(envUdfdWithPEnv);
  }

  return err;
}
275

276
/*
 * uv_walk() callback used while tearing down the watcher loop: requests close
 * (with no close callback) on every handle that is not already closing.
 * `arg` is unused.
 */
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  TAOS_UDF_CHECK_PTR_RVOID(handle);
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
282

283
/*
 * Callback of the stopAsync handle: stops the watcher loop that owns it.
 * async->data is set to the owning SUdfdData in udfWatchUdfd().
 */
static void udfUdfdStopAsyncCb(uv_async_t *async) {
  TAOS_UDF_CHECK_PTR_RVOID(async);
  SUdfdData *udfd = async->data;
  uv_loop_t *watchLoop = &udfd->loop;
  uv_stop(watchLoop);
}
288

289
static void udfWatchUdfd(void *args) {
2,157✔
290
  TAOS_UDF_CHECK_PTR_RVOID(args);
4,314!
291
  SUdfdData *pData = args;
2,157✔
292
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
2,157!
293
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb));
2,157!
294
  pData->stopAsync.data = pData;
2,157✔
295
  TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
2,157✔
296
  atomic_store_32(&pData->spawnErr, 0);
2,149✔
297
  (void)uv_barrier_wait(&pData->barrier);
2,149✔
298
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
2,149✔
299
  fnInfo("taosudf loop exit with %d active handles, line:%d", num, __LINE__);
2,149!
300

301
  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
2,149✔
302
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
2,149✔
303
  fnInfo("taosudf loop exit with %d active handles, line:%d", num, __LINE__);
2,149!
304
  if (uv_loop_close(&pData->loop) != 0) {
2,149!
305
    fnError("taosudf loop close failed, lino:%d", __LINE__);
×
306
  }
307
  return;
2,149✔
308

309
_exit:
8✔
310
  if (terrno != 0) {
8!
311
    (void)uv_barrier_wait(&pData->barrier);
8✔
312
    atomic_store_32(&pData->spawnErr, terrno);
8✔
313
    if (uv_loop_close(&pData->loop) != 0) {
8!
314
      fnError("taosudf loop close failed, lino:%d", __LINE__);
8!
315
    }
316
    fnError("taosudf thread exit with code:%d lino:%d", terrno, terrln);
8!
317
    terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE;
8✔
318
  }
319
  return;
8✔
320
}
321

322
int32_t udfStartUdfd(int32_t startDnodeId) {
2,157✔
323
  int32_t code = 0, lino = 0;
2,157✔
324
  if (!tsStartUdfd) {
2,157!
325
    fnInfo("start taosudf is disabled.") return 0;
×
326
  }
327
  SUdfdData *pData = &udfdGlobal;
2,157✔
328
  if (pData->startCalled) {
2,157!
329
    fnInfo("dnode start taosudf already called");
×
330
    return 0;
×
331
  }
332
  pData->startCalled = true;
2,157✔
333
  char dnodeId[8] = {0};
2,157✔
334
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
2,157✔
335
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
2,157!
336
  pData->dnodeId = startDnodeId;
2,157✔
337

338
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
2,157!
339
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit);
2,157!
340
  (void)uv_barrier_wait(&pData->barrier);
2,157✔
341
  int32_t err = atomic_load_32(&pData->spawnErr);
2,157✔
342
  if (err != 0) {
2,157✔
343
    uv_barrier_destroy(&pData->barrier);
8✔
344
    if (uv_async_send(&pData->stopAsync) != 0) {
8!
345
      fnError("start taosudf: failed to send stop async");
×
346
    }
347
    if (uv_thread_join(&pData->thread) != 0) {
8!
348
      fnError("start taosudf: failed to join taosudf thread");
×
349
    }
350
    pData->needCleanUp = false;
8✔
351
    fnInfo("taosudf is cleaned up after spawn err");
8!
352
    TAOS_CHECK_GOTO(err, &lino, _exit);
8!
353
  } else {
354
    pData->needCleanUp = true;
2,149✔
355
  }
356
_exit:
2,157✔
357
  if (code != 0) {
2,157✔
358
    fnError("taosudf start failed with code:%d, lino:%d", code, lino);
8!
359
  }
360
  return code;
2,157✔
361
}
362

363
void udfStopUdfd() {
2,157✔
364
  SUdfdData *pData = &udfdGlobal;
2,157✔
365
  fnInfo("taosudf start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
2,157!
366
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
2,157!
367
    return;
8✔
368
  }
369
  atomic_store_32(&pData->stopCalled, 1);
2,149✔
370
  pData->needCleanUp = false;
2,149✔
371
  uv_barrier_destroy(&pData->barrier);
2,149✔
372
  if (uv_async_send(&pData->stopAsync) != 0) {
2,149!
373
    fnError("stop taosudf: failed to send stop async");
×
374
  }
375
  if (uv_thread_join(&pData->thread) != 0) {
2,149!
376
    fnError("stop taosudf: failed to join taosudf thread");
×
377
  }
378

379
#ifdef WINDOWS
380
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
381
#endif
382
  fnInfo("taosudf is cleaned up");
2,149!
383
  return;
2,149✔
384
}
385

386
/*
387
int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
388
  SUdfdData *pData = &udfdGlobal;
389
  if (pData->spawnErr) {
390
    return pData->spawnErr;
391
  }
392
  uv_pid_t pid = uv_process_get_pid(&pData->process);
393
  if (pUdfdPid) {
394
    *pUdfdPid = (int32_t)pid;
395
  }
396
  return TSDB_CODE_SUCCESS;
397
}
398
*/
399

400
//==============================================================================================
401
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
402
 * The QUEUE is copied from queue.h under libuv
403
 * */
404

405
typedef void *QUEUE[2];
406

407
/* Private macros. */
408
#define QUEUE_NEXT(q)      (*(QUEUE **)&((*(q))[0]))
409
#define QUEUE_PREV(q)      (*(QUEUE **)&((*(q))[1]))
410
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
411
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
412

413
/* Public macros. */
414
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field)))
415

416
/* Important note: mutating the list while QUEUE_FOREACH is
417
 * iterating over its elements results in undefined behavior.
418
 */
419
#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
420

421
#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q))
422

423
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
424

425
#define QUEUE_INIT(q)    \
426
  do {                   \
427
    QUEUE_NEXT(q) = (q); \
428
    QUEUE_PREV(q) = (q); \
429
  } while (0)
430

431
#define QUEUE_ADD(h, n)                 \
432
  do {                                  \
433
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
434
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
435
    QUEUE_PREV(h) = QUEUE_PREV(n);      \
436
    QUEUE_PREV_NEXT(h) = (h);           \
437
  } while (0)
438

439
#define QUEUE_SPLIT(h, q, n)       \
440
  do {                             \
441
    QUEUE_PREV(n) = QUEUE_PREV(h); \
442
    QUEUE_PREV_NEXT(n) = (n);      \
443
    QUEUE_NEXT(n) = (q);           \
444
    QUEUE_PREV(h) = QUEUE_PREV(q); \
445
    QUEUE_PREV_NEXT(h) = (h);      \
446
    QUEUE_PREV(q) = (n);           \
447
  } while (0)
448

449
#define QUEUE_MOVE(h, n)        \
450
  do {                          \
451
    if (QUEUE_EMPTY(h))         \
452
      QUEUE_INIT(n);            \
453
    else {                      \
454
      QUEUE *q = QUEUE_HEAD(h); \
455
      QUEUE_SPLIT(h, q, n);     \
456
    }                           \
457
  } while (0)
458

459
#define QUEUE_INSERT_HEAD(h, q)    \
460
  do {                             \
461
    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
462
    QUEUE_PREV(q) = (h);           \
463
    QUEUE_NEXT_PREV(q) = (q);      \
464
    QUEUE_NEXT(h) = (q);           \
465
  } while (0)
466

467
#define QUEUE_INSERT_TAIL(h, q)    \
468
  do {                             \
469
    QUEUE_NEXT(q) = (h);           \
470
    QUEUE_PREV(q) = QUEUE_PREV(h); \
471
    QUEUE_PREV_NEXT(q) = (q);      \
472
    QUEUE_PREV(h) = (q);           \
473
  } while (0)
474

475
#define QUEUE_REMOVE(q)                 \
476
  do {                                  \
477
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
478
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
479
  } while (0)
480

481
enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
482

483
int64_t gUdfTaskSeqNum = 0;
484
typedef struct SUdfcFuncStub {
485
  char           udfName[TSDB_FUNC_NAME_LEN + 1];
486
  UdfcFuncHandle handle;
487
  int32_t        refCount;
488
  int64_t        createTime;
489
} SUdfcFuncStub;
490

491
typedef struct SUdfcProxy {
492
  char         udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
493
  uv_barrier_t initBarrier;
494

495
  uv_loop_t   uvLoop;
496
  uv_thread_t loopThread;
497
  uv_async_t  loopTaskAync;
498

499
  uv_async_t loopStopAsync;
500

501
  uv_mutex_t taskQueueMutex;
502
  int8_t     udfcState;
503
  QUEUE      taskQueue;
504
  QUEUE      uvProcTaskQueue;
505

506
  uv_mutex_t udfStubsMutex;
507
  SArray    *udfStubs;         // SUdfcFuncStub
508
  SArray    *expiredUdfStubs;  // SUdfcFuncStub
509

510
  uv_mutex_t udfcUvMutex;
511
  int8_t     initialized;
512
} SUdfcProxy;
513

514
SUdfcProxy gUdfcProxy = {0};
515

516
typedef struct SUdfcUvSession {
517
  SUdfcProxy *udfc;
518
  int64_t     severHandle;
519
  uv_pipe_t  *udfUvPipe;
520

521
  int8_t  outputType;
522
  int32_t bytes;
523
  int32_t bufSize;
524

525
  char udfName[TSDB_FUNC_NAME_LEN + 1];
526
} SUdfcUvSession;
527

528
typedef struct SClientUvTaskNode {
529
  SUdfcProxy *udfc;
530
  int8_t      type;
531
  int32_t     errCode;
532

533
  uv_pipe_t *pipe;
534

535
  int64_t  seqNum;
536
  uv_buf_t reqBuf;
537

538
  uv_sem_t taskSem;
539
  uv_buf_t rspBuf;
540

541
  QUEUE recvTaskQueue;
542
  QUEUE procTaskQueue;
543
  QUEUE connTaskQueue;
544
} SClientUvTaskNode;
545

546
typedef struct SClientUdfTask {
547
  int8_t type;
548

549
  SUdfcUvSession *session;
550

551
  union {
552
    struct {
553
      SUdfSetupRequest  req;
554
      SUdfSetupResponse rsp;
555
    } _setup;
556
    struct {
557
      SUdfCallRequest  req;
558
      SUdfCallResponse rsp;
559
    } _call;
560
    struct {
561
      SUdfTeardownRequest  req;
562
      SUdfTeardownResponse rsp;
563
    } _teardown;
564
  };
565

566
} SClientUdfTask;
567

568
typedef struct SClientConnBuf {
569
  char   *buf;
570
  int32_t len;
571
  int32_t cap;
572
  int32_t total;
573
} SClientConnBuf;
574

575
typedef struct SClientUvConn {
576
  uv_pipe_t      *pipe;
577
  QUEUE           taskQueue;
578
  SClientConnBuf  readBuf;
579
  SUdfcUvSession *session;
580
} SClientUvConn;
581

582
// Client proxy lifecycle states.
// NOTE(review): presumably the values stored in SUdfcProxy.udfcState - confirm against its users.
enum {
583
  UDFC_STATE_INITAL = 0,  // initial state
584
  UDFC_STATE_STARTNG,     // starting after udfcOpen
585
  UDFC_STATE_READY,       // started and ready to receive requests
586
  UDFC_STATE_STOPPING,    // stopping after udfcClose
587
};
588

589
void    getUdfdPipeName(char *pipeName, int32_t size);
590
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
591
void   *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
592
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
593
void   *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state);
594
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
595
void   *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call);
596
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
597
void   *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown);
598
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request);
599
void   *decodeUdfRequest(const void *buf, SUdfRequest *request);
600
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
601
void   *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp);
602
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
603
void   *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp);
604
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp);
605
void   *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse);
606
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp);
607
void   *decodeUdfResponse(const void *buf, SUdfResponse *rsp);
608
void    freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
609
void    freeUdfColumn(SUdfColumn *col);
610
void    freeUdfDataDataBlock(SUdfDataBlock *block);
611
void    freeUdfInterBuf(SUdfInterBuf *buf);
612
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
613
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
614
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
615
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
616

617
void getUdfdPipeName(char *pipeName, int32_t size) {
4,317✔
618
  char    dnodeId[8] = {0};
4,317✔
619
  size_t  dnodeIdSize = sizeof(dnodeId);
4,317✔
620
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
4,317✔
621
  if (err != 0) {
4,317!
622
    fnError("failed to get dnodeId from env since %s", uv_err_name(err));
×
623
    dnodeId[0] = '1';
×
624
  }
625
#ifdef _WIN32
626
  snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir)),
627
           dnodeId);
628
#else
629
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
4,317✔
630
#endif
631
  fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName);
4,317!
632
}
4,317✔
633

634
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
314✔
635
  int32_t len = 0;
314✔
636
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
314✔
637
  return len;
314✔
638
}
639

640
/*
 * Decode a setup request: reads TSDB_FUNC_NAME_LEN bytes into request->udfName.
 * Returns the read position handed back by the decoder.
 */
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request) {
  const void *next = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
  return (void *)next;
}
644

645
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state) {
418✔
646
  int32_t len = 0;
418✔
647
  len += taosEncodeFixedI8(buf, state->numOfResult);
418✔
648
  len += taosEncodeFixedI32(buf, state->bufLen);
418✔
649
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
418✔
650
  return len;
418✔
651
}
652

653
/*
 * Decode an intermediate buffer in the order written by encodeUdfInterBuf():
 * numOfResult (i8), bufLen (i32), then bufLen bytes whose storage is handed
 * back through state->buf. Returns the read position after the last field.
 */
void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI8(cursor, &state->numOfResult);
  cursor = taosDecodeFixedI32(cursor, &state->bufLen);
  cursor = taosDecodeBinary(cursor, (void **)&state->buf, state->bufLen);
  return (void *)cursor;
}
659

660
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
53,872✔
661
  int32_t len = 0;
53,872✔
662
  len += taosEncodeFixedI64(buf, call->udfHandle);
53,872✔
663
  len += taosEncodeFixedI8(buf, call->callType);
53,872✔
664
  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
53,872✔
665
    len += tEncodeDataBlock(buf, &call->block);
53,653✔
666
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
219✔
667
    len += taosEncodeFixedI8(buf, call->initFirst);
156✔
668
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
141✔
669
    len += tEncodeDataBlock(buf, &call->block);
92✔
670
    len += encodeUdfInterBuf(buf, &call->interBuf);
92✔
671
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
49!
672
    // len += encodeUdfInterBuf(buf, &call->interBuf);
673
    // len += encodeUdfInterBuf(buf, &call->interBuf2);
674
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
78!
675
    len += encodeUdfInterBuf(buf, &call->interBuf);
78✔
676
  }
677
  return len;
53,781✔
678
}
679

680
/*
 * Decode a call request written by encodeUdfCallRequest(): udfHandle (i64),
 * callType (i8), then the callType-specific payload. AGG_MERGE and unknown
 * call types have no payload. Returns the read position after the last field.
 */
void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
  const void *cursor = taosDecodeFixedI64(buf, &call->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &call->callType);

  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
    cursor = taosDecodeFixedI8(cursor, &call->initFirst);
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  }
  return (void *)cursor;
}
704

705
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
268✔
706
  int32_t len = 0;
268✔
707
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
268✔
708
  return len;
268✔
709
}
710

711
/*
 * Decode a teardown request: reads the udfHandle (i64).
 * Returns the read position after the handle.
 */
void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown) {
  const void *next = taosDecodeFixedI64(buf, &teardown->udfHandle);
  return (void *)next;
}
715

716
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request) {
54,469✔
717
  int32_t len = 0;
54,469✔
718
  if (buf == NULL) {
54,469✔
719
    len += sizeof(request->msgLen);
27,224✔
720
  } else {
721
    *(int32_t *)(*buf) = request->msgLen;
27,245✔
722
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
27,245✔
723
  }
724
  len += taosEncodeFixedI64(buf, request->seqNum);
54,469✔
725
  len += taosEncodeFixedI8(buf, request->type);
54,469✔
726
  if (request->type == UDF_TASK_SETUP) {
54,469✔
727
    len += encodeUdfSetupRequest(buf, &request->setup);
314✔
728
  } else if (request->type == UDF_TASK_CALL) {
54,155✔
729
    len += encodeUdfCallRequest(buf, &request->call);
53,925✔
730
  } else if (request->type == UDF_TASK_TEARDOWN) {
230!
731
    len += encodeUdfTeardownRequest(buf, &request->teardown);
268✔
732
  }
733
  return len;
54,340✔
734
}
735

736
/*
 * Decode a full request written by encodeUdfRequest(): msgLen (raw int32),
 * seqNum (i64), type (i8), then the body selected by the decoded type.
 * Unknown types have no body. Returns the read position after the last field.
 */
void *decodeUdfRequest(const void *buf, SUdfRequest *request) {
  request->msgLen = *(int32_t *)(buf);
  const void *cursor = POINTER_SHIFT(buf, sizeof(request->msgLen));

  cursor = taosDecodeFixedI64(cursor, &request->seqNum);
  cursor = taosDecodeFixedI8(cursor, &request->type);

  switch (request->type) {
    case UDF_TASK_SETUP:
      cursor = decodeUdfSetupRequest(cursor, &request->setup);
      break;
    case UDF_TASK_CALL:
      cursor = decodeUdfCallRequest(cursor, &request->call);
      break;
    case UDF_TASK_TEARDOWN:
      cursor = decodeUdfTeardownRequest(cursor, &request->teardown);
      break;
    default:
      break;
  }
  return (void *)cursor;
}
752

753
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
314✔
754
  int32_t len = 0;
314✔
755
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
314✔
756
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
314✔
757
  len += taosEncodeFixedI32(buf, setupRsp->bytes);
314✔
758
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
314✔
759
  return len;
314✔
760
}
761

762
/*
 * Decode a setup response in the order written by encodeUdfSetupResponse():
 * udfHandle (i64), outputType (i8), bytes (i32), bufSize (i32).
 * Returns the read position after the last field.
 */
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI64(cursor, &setupRsp->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &setupRsp->outputType);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bytes);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bufSize);
  return (void *)cursor;
}
769

770
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
53,714✔
771
  int32_t len = 0;
53,714✔
772
  len += taosEncodeFixedI8(buf, callRsp->callType);
53,714✔
773
  switch (callRsp->callType) {
53,714!
774
    case TSDB_UDF_CALL_SCALA_PROC:
53,547✔
775
      len += tEncodeDataBlock(buf, &callRsp->resultData);
53,547✔
776
      break;
53,353✔
777
    case TSDB_UDF_CALL_AGG_INIT:
78✔
778
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
78✔
779
      break;
78✔
780
    case TSDB_UDF_CALL_AGG_PROC:
92✔
781
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
92✔
782
      break;
92✔
783
    // case TSDB_UDF_CALL_AGG_MERGE:
784
    //   len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
785
    //   break;
786
    case TSDB_UDF_CALL_AGG_FIN:
78✔
787
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
78✔
788
      break;
78✔
789
  }
790
  return len;
53,520✔
791
}
792

793
/**
 * Deserialize a UDF call response written by encodeUdfCallResponse().
 *
 * Reads the i8 call type, then the payload that matches it: a data block for scalar calls,
 * an inter-buffer for aggregate init/process/finalize. Other call types have no payload.
 *
 * @param buf     start of the encoded response
 * @param callRsp receives the call type and the decoded payload
 * @return the cursor returned by the last decoder that ran
 */
void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
  const void *cursor = taosDecodeFixedI8(buf, &callRsp->callType);

  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      cursor = tDecodeDataBlock(cursor, &callRsp->resultData);
      break;
    // udf todo: TSDB_UDF_CALL_AGG_MERGE would share the inter-buffer decoding below
    case TSDB_UDF_CALL_AGG_INIT:
    case TSDB_UDF_CALL_AGG_PROC:
    case TSDB_UDF_CALL_AGG_FIN:
      cursor = decodeUdfInterBuf(cursor, &callRsp->resultBuf);
      break;
    default:
      break;
  }
  return (void *)cursor;
}
814

815
/**
 * Serialize a UDF teardown response. The response has no payload, so nothing is written.
 *
 * @return always 0 (no bytes encoded)
 */
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp) {
  return 0;
}
268✔
816

817
/**
 * Deserialize a UDF teardown response. The response has no payload, so nothing is consumed.
 *
 * @return buf, unchanged
 */
void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse) {
  return (void *)buf;
}
134✔
818

819
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) {
54,300✔
820
  int32_t len = 0;
54,300✔
821
  len += sizeof(rsp->msgLen);
54,300✔
822
  if (buf != NULL) {
54,300✔
823
    *(int32_t *)(*buf) = rsp->msgLen;
27,241✔
824
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
27,241✔
825
  }
826

827
  len += sizeof(rsp->seqNum);
54,300✔
828
  if (buf != NULL) {
54,300✔
829
    *(int64_t *)(*buf) = rsp->seqNum;
27,251✔
830
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
27,251✔
831
  }
832

833
  len += taosEncodeFixedI64(buf, rsp->seqNum);
54,300✔
834
  len += taosEncodeFixedI8(buf, rsp->type);
54,300✔
835
  len += taosEncodeFixedI32(buf, rsp->code);
54,300✔
836

837
  switch (rsp->type) {
54,300!
838
    case UDF_TASK_SETUP:
314✔
839
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
314✔
840
      break;
314✔
841
    case UDF_TASK_CALL:
53,794✔
842
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
53,794✔
843
      break;
53,575✔
844
    case UDF_TASK_TEARDOWN:
268✔
845
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
268✔
846
      break;
268✔
UNCOV
847
    default:
×
UNCOV
848
      fnError("encode udf response, invalid udf response type %d", rsp->type);
×
849
      break;
×
850
  }
851
  return len;
54,157✔
852
}
853

854
/**
 * Deserialize a complete UDF response message produced by encodeUdfResponse().
 *
 * Reads the raw msgLen and seqNum header, then seqNum again through taosDecodeFixedI64,
 * type, code, and the type-specific body.
 *
 * An unknown type sets rsp->code to TSDB_CODE_UDF_INTERNAL_ERROR. If any decoder yields a
 * NULL cursor, rsp->code is overwritten with terrno.
 *
 * @param buf start of the encoded message
 * @param rsp receives the decoded fields
 * @return cursor after the decoded message, or NULL when decoding failed
 */
void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
  const void *cursor = buf;

  // Fixed-size header stored as raw values.
  rsp->msgLen = *(int32_t *)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(rsp->msgLen));
  rsp->seqNum = *(int64_t *)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(rsp->seqNum));

  cursor = taosDecodeFixedI64(cursor, &rsp->seqNum);
  cursor = taosDecodeFixedI8(cursor, &rsp->type);
  cursor = taosDecodeFixedI32(cursor, &rsp->code);

  if (rsp->type == UDF_TASK_SETUP) {
    cursor = decodeUdfSetupResponse(cursor, &rsp->setupRsp);
  } else if (rsp->type == UDF_TASK_CALL) {
    cursor = decodeUdfCallResponse(cursor, &rsp->callRsp);
  } else if (rsp->type == UDF_TASK_TEARDOWN) {
    cursor = decodeUdfTeardownResponse(cursor, &rsp->teardownRsp);
  } else {
    rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR;
    fnError("decode udf response, invalid udf response type %d", rsp->type);
  }

  if (cursor == NULL) {
    rsp->code = terrno;
    fnError("decode udf response failed, code:0x%x", rsp->code);
  }
  return (void *)cursor;
}
884

885
/**
 * Release the buffers held by a UDF column's data and reset the freed pointers to NULL.
 *
 * Which pair of buffers is released depends on meta->type: offsets + payload for
 * variable-length types, null bitmap + data otherwise. The SUdfColumnData itself is not freed.
 *
 * @param data column data whose buffers are released
 * @param meta column meta that selects the variable/fixed layout
 */
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
  TAOS_UDF_CHECK_PTR_RVOID(data, meta);

  if (!IS_VAR_DATA_TYPE(meta->type)) {
    taosMemoryFree(data->fixLenCol.nullBitmap);
    data->fixLenCol.nullBitmap = NULL;
    taosMemoryFree(data->fixLenCol.data);
    data->fixLenCol.data = NULL;
    return;
  }

  taosMemoryFree(data->varLenCol.varOffsets);
  data->varLenCol.varOffsets = NULL;
  taosMemoryFree(data->varLenCol.payload);
  data->varLenCol.payload = NULL;
}
899

900
/**
 * Release the data buffers owned by a UDF column. The SUdfColumn struct itself is left
 * for the caller to free.
 *
 * @param col column whose colData buffers are released
 */
void freeUdfColumn(SUdfColumn *col) {
  TAOS_UDF_CHECK_PTR_RVOID(col);
  SUdfColumnData *colData = &col->colData;
  SUdfColumnMeta *colMeta = &col->colMeta;
  freeUdfColumnData(colData, colMeta);
}
904

905
/**
 * Release every column of a UDF data block together with the column pointer array.
 *
 * For each of the numOfCols entries the column buffers are freed, then the SUdfColumn itself,
 * and the slot is reset to NULL. Finally the udfCols array is freed and reset to NULL.
 * The SUdfDataBlock struct itself is not freed.
 *
 * A block whose udfCols array was never allocated is accepted and left untouched; this is the
 * state convertDataBlockToUdfDataBlock() leaves behind when allocating the array fails, because
 * it sets numOfCols before the allocation.
 *
 * @param block block to release
 */
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  TAOS_UDF_CHECK_PTR_RVOID(block);
  if (block->udfCols == NULL) {
    // Nothing was allocated; indexing udfCols[i] below would dereference NULL.
    return;
  }
  for (int32_t i = 0; i < block->numOfCols; ++i) {
    freeUdfColumn(block->udfCols[i]);
    taosMemoryFree(block->udfCols[i]);
    block->udfCols[i] = NULL;
  }
  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}
915

916
/**
 * Release the memory referenced by an inter-buffer and reset its pointer to NULL.
 * The SUdfInterBuf struct itself and its other fields are left untouched.
 *
 * @param buf inter-buffer whose storage is released
 */
void freeUdfInterBuf(SUdfInterBuf *buf) {
  TAOS_UDF_CHECK_PTR_RVOID(buf);
  void *storage = buf->buf;
  buf->buf = NULL;
  taosMemoryFree(storage);
}
921

922
/**
 * Copy an SSDataBlock into a freshly allocated SUdfDataBlock.
 *
 * The source block is validated with blockDataCheck() first. For every column the meta
 * (type, bytes, scale, precision), row count and hasNull flag are copied, and the column
 * buffers are duplicated:
 *   - variable-length types: the offset array plus the payload. When the source column is
 *     flagged `reassigned`, each row is copied individually from pData + offset[row] and the
 *     rows are packed back to back; otherwise the payload is copied in one piece.
 *   - other types: the null bitmap plus the data buffer.
 *
 * On an allocation failure the function returns immediately; buffers already attached to
 * udfBlock are not released here.
 *
 * @param block    source data block
 * @param udfBlock destination; numOfRows, numOfCols and udfCols are overwritten
 * @return TSDB_CODE_SUCCESS, the blockDataCheck() error, or terrno on allocation failure
 */
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  TAOS_UDF_CHECK_PTR_RCODE(block, udfBlock);
  int32_t code = blockDataCheck(block);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  udfBlock->numOfRows = block->info.rows;
  udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
  udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
  if ((udfBlock->udfCols) == NULL) {
    return terrno;
  }
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
    udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
    if (udfBlock->udfCols[i] == NULL) {
      return terrno;
    }
    SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
    SUdfColumn      *udfCol = udfBlock->udfCols[i];
    udfCol->colMeta.type = col->info.type;
    udfCol->colMeta.bytes = col->info.bytes;
    udfCol->colMeta.scale = col->info.scale;
    udfCol->colMeta.precision = col->info.precision;
    udfCol->colData.numOfRows = udfBlock->numOfRows;
    udfCol->hasNull = col->hasNull;
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
      if (udfCol->colData.varLenCol.varOffsets == NULL) {
        return terrno;
      }
      memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
      if (udfCol->colData.varLenCol.payload == NULL) {
        return terrno;
      }
      if (col->reassigned) {
        // Walk a separate write cursor: varLenCol.payload must keep pointing at the start of
        // the allocation, since it is what gets encoded and later passed to taosMemoryFree.
        char *dst = udfCol->colData.varLenCol.payload;
        for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
          char   *pColData = col->pData + col->varmeta.offset[row];
          int32_t colSize = 0;
          if (col->info.type == TSDB_DATA_TYPE_JSON) {
            colSize = getJsonValueLen(pColData);
          } else {
            colSize = varDataTLen(pColData);
          }
          memcpy(dst, pColData, colSize);
          dst += colSize;
        }
      } else {
        memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
      }
    } else {
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
      int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
      if (udfCol->colData.fixLenCol.nullBitmap == NULL) {
        return terrno;
      }
      char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
      memcpy(bitmap, col->nullbitmap, bitmapLen);
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
      int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
      if (NULL == udfCol->colData.fixLenCol.data) {
        return terrno;
      }
      char *data = udfCol->colData.fixLenCol.data;
      memcpy(data, col->pData, dataLen);
    }
  }
  return TSDB_CODE_SUCCESS;
}
995

996
/**
 * Build a single-column SSDataBlock from a UDF column.
 *
 * A column described by the UDF column meta is appended to `block`, capacity for
 * colData.numOfRows rows is ensured, and every row is copied into the column at index 0
 * (NULL rows are marked with colDataSetNULL). block->info.rows is set to the row count and
 * the result is validated with blockDataCheck().
 *
 * @param udfCol source UDF column
 * @param block  destination block; the values are written into its column 0
 * @return 0 on success, otherwise the error code of the step that failed
 */
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  TAOS_UDF_CHECK_PTR_RCODE(udfCol, block);
  int32_t          code = 0, lino = 0;
  SUdfColumnMeta  *meta = &udfCol->colMeta;
  SColumnInfoData *col = NULL;

  SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
  code = blockDataAppendColInfo(block, &colInfoData);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = bdGetColumnInfoData(block, 0, &col);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) {
    if (udfColDataIsNull(udfCol, i)) {
      colDataSetNULL(col, i);
    } else {
      char *data = udfColDataGetData(udfCol, i);
      code = colDataSetVal(col, i, data, false);
      TAOS_CHECK_GOTO(code, &lino, _exit);
    }
  }
  block->info.rows = udfCol->colData.numOfRows;

  code = blockDataCheck(block);
  TAOS_CHECK_GOTO(code, &lino, _exit);
_exit:
  if (code != 0) {
    fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino);
  }
  // Propagate the failure; returning success here would hand callers a half-built block.
  return code;
}
1031

1032
/**
 * Assemble an SSDataBlock from an array of scalar parameters.
 *
 * The output row count is the largest numOfRows among the inputs. One column is appended to
 * `output` per input (copying only the column info), capacity is ensured, and each input's
 * rows are assigned into its column. An input with fewer rows than the maximum is padded by
 * repeating its last row (NULL or value) up to the maximum.
 *
 * @param input     array of numOfCols scalar parameters
 * @param numOfCols number of entries in input
 * @param output    destination block; info.rows is set on success
 * @return 0 on success, otherwise the error code of the step that failed
 */
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  TAOS_UDF_CHECK_PTR_RCODE(input, output);
  int32_t code = 0, lino = 0;

  // The widest parameter decides how many rows the block gets.
  int32_t maxRows = 0;
  for (int32_t c = 0; c < numOfCols; ++c) {
    if (input[c].numOfRows > maxRows) {
      maxRows = input[c].numOfRows;
    }
  }

  // create the basic block info structure
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData colDesc = {0};
    colDesc.info = input[c].columnData->info;

    TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &colDesc), &lino, _exit);
  }

  TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, maxRows), &lino, _exit);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData *pDest = taosArrayGet(output->pDataBlock, c);
    SColumnInfoData *pSrc = input[c].columnData;

    TAOS_CHECK_GOTO(colDataAssign(pDest, pSrc, input[c].numOfRows, &output->info), &lino, _exit);

    if (input[c].numOfRows >= maxRows) {
      continue;
    }

    // Short column: replicate its last row into the remaining slots.
    int32_t firstPadRow = input[c].numOfRows;
    int32_t padRows = maxRows - firstPadRow;
    bool    lastIsNull = colDataIsNull_s(pSrc, (input + c)->numOfRows - 1);
    if (lastIsNull) {
      colDataSetNNULL(pDest, firstPadRow, padRows);
      continue;
    }

    char *lastVal = colDataGetData(pSrc, (input + c)->numOfRows - 1);
    for (int32_t k = 0; k < padRows; ++k) {
      TAOS_CHECK_GOTO(colDataSetVal(pDest, firstPadRow + k, lastVal, false), &lino, _exit);
    }
  }

  output->info.rows = maxRows;
_exit:
  if (code != 0) {
    fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino);
  }
  return code;
}
1079

1080
/**
 * Wrap the single column of a data block into a scalar parameter.
 *
 * A new SColumnInfoData is allocated and filled with a struct copy of the block's column 0
 * (the copy shares whatever buffers that column points to). output->colAlloced is set to
 * true to record that output->columnData was allocated here.
 *
 * @param input  data block that must contain exactly one column
 * @param output receives numOfRows, columnData and colAlloced
 * @return 0 on success, TSDB_CODE_UDF_INVALID_OUTPUT_TYPE when the block does not have
 *         exactly one column, or terrno when the allocation fails
 */
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
  TAOS_UDF_CHECK_PTR_RCODE(input, output);
  if (taosArrayGetSize(input->pDataBlock) != 1) {
    fnError("scalar function only support one column");
    // Report the failure instead of success, so callers do not treat an unfilled output as valid.
    return TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  }
  output->numOfRows = input->info.rows;

  output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
  if (output->columnData == NULL) {
    return terrno;
  }
  memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
  output->colAlloced = true;

  return 0;
}
1097

1098
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
1099
// memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
1100
// Header of the per-result-row buffer used by the aggregate UDF wrappers (udfAggInit,
// udfAggProcess, udfAggFinalize). The final result and the intermediate state are stored
// directly behind this header in the same buffer.
typedef struct SUdfAggRes {
  int8_t  finalResNum;     // numOfResult of the finalize call's result buffer
  int8_t  interResNum;     // numOfResult of the most recent intermediate state
  int32_t interResBufLen;  // bufLen of the most recent intermediate state
  char   *finalResBuf;     // set by the wrappers to (char *)this + sizeof(SUdfAggRes)
  char   *interResBuf;     // set by the wrappers to finalResBuf + session->bytes
} SUdfAggRes;
1107

1108
void    onUdfcPipeClose(uv_handle_t *handle);
1109
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
1110
void    udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
1111
bool    isUdfcUvMsgComplete(SClientConnBuf *connBuf);
1112
void    udfcUvHandleRsp(SClientUvConn *conn);
1113
void    udfcUvHandleError(SClientUvConn *conn);
1114
void    onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
1115
void    onUdfcPipeWrite(uv_write_t *write, int32_t status);
1116
void    onUdfcPipeConnect(uv_connect_t *connect, int32_t status);
1117
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
1118
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
1119
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
1120
void    udfcAsyncTaskCb(uv_async_t *async);
1121
void    cleanUpUvTasks(SUdfcProxy *udfc);
1122
void    udfStopAsyncCb(uv_async_t *async);
1123
void    constructUdfService(void *argsThread);
1124
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
1125
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
1126
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2);
1127
int32_t doTeardownUdf(UdfcFuncHandle handle);
1128

1129
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
1130
                SSDataBlock *output, SUdfInterBuf *newState);
1131
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
1132
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
1133
// udf todo:  aggmerge
1134
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
1135
//                           SUdfInterBuf *resultBuf);
1136
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
1137
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1138
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1139

1140
int32_t udfcOpen();
1141
int32_t udfcClose();
1142

1143
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
1144
void    releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
1145
int32_t cleanUpUdfs();
1146

1147
bool    udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
1148
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
1149
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
1150
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
1151

1152
void    cleanupNotExpiredUdfs();
1153
void    cleanupExpiredUdfs();
1154
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
78,132✔
1155
  SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
78,132✔
1156
  SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
78,132✔
1157
  return strcmp(stub1->udfName, stub2->udfName);
78,132✔
1158
}
1159

1160
/**
 * Obtain a handle for the named UDF, reusing a cached one when possible.
 *
 * Under udfStubsMutex the stub array is searched by name:
 *   - a stub younger than 10 seconds with a non-NULL handle whose udfUvPipe is set is reused:
 *     its refCount is incremented and the handle returned;
 *   - a stub younger than 10 seconds with an unusable handle is removed from the array;
 *   - a stub that is 10 seconds old or more is moved to the expired array (it stays in place
 *     if pushing to the expired array fails).
 * In every case but the first, the mutex is released and a new handle is created with
 * doSetupUdf(); on success a stub with refCount 1 is inserted and the array re-sorted.
 *
 * @param udfName name of the UDF
 * @param pHandle receives the handle, or NULL when setup fails
 * @return 0 on success, otherwise the doSetupUdf() error code
 */
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
  TAOS_UDF_CHECK_PTR_RCODE(udfName, pHandle);
  int32_t code = 0, line = 0;

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  SUdfcFuncStub key = {0};
  tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
  int32_t stubIndex = taosArraySearchIdx(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
  if (stubIndex != -1) {
    SUdfcFuncStub *cached = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
    UdfcFuncHandle cachedHandle = cached->handle;
    int64_t        nowUs = taosGetTimestampUs();
    bool           expired = (nowUs - cached->createTime) >= 10 * 1000 * 1000;

    if (expired) {
      fnDebug("udf handle expired for %s, will setup udf. move it to expired list", udfName);
      if (taosArrayPush(gUdfcProxy.expiredUdfStubs, cached) == NULL) {
        fnError("acquireUdfFuncHandle: failed to push udf stub to array");
      } else {
        taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
        taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
      }
    } else if (cachedHandle != NULL && ((SUdfcUvSession *)cachedHandle)->udfUvPipe != NULL) {
      // Fast path: hand out the live cached handle.
      *pHandle = cached->handle;
      ++cached->refCount;
      uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
      return 0;
    } else {
      fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
             cached->refCount, cached->createTime);
      taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
    }
  }
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);

  // Slow path: set the UDF up again, outside the stub lock.
  *pHandle = NULL;
  code = doSetupUdf(udfName, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    *pHandle = NULL;
    return code;
  }

  SUdfcFuncStub fresh = {0};
  tstrncpy(fresh.udfName, udfName, TSDB_FUNC_NAME_LEN);
  fresh.handle = *pHandle;
  ++fresh.refCount;
  fresh.createTime = taosGetTimestampUs();

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  if (taosArrayPush(gUdfcProxy.udfStubs, &fresh) == NULL) {
    fnError("acquireUdfFuncHandle: failed to push udf stub to array");
  } else {
    taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
  }
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);

  return code;
}
1218

1219
/**
 * Drop one reference to a UDF handle obtained from acquireUdfFuncHandle().
 *
 * Under udfStubsMutex the name is looked up in both the active and the expired stub arrays.
 * For each match that carries the same handle and has a positive refCount, the refCount is
 * decremented. Nothing happens when the name is in neither array.
 *
 * @param udfName name of the UDF
 * @param handle  handle being released
 */
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
  TAOS_UDF_CHECK_PTR_RVOID(udfName);
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);

  SUdfcFuncStub key = {0};
  tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
  SUdfcFuncStub *active = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
  SUdfcFuncStub *expired = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ);

  if (active != NULL || expired != NULL) {
    if (active != NULL && active->handle == handle && active->refCount > 0) {
      --active->refCount;
    }
    if (expired != NULL && expired->handle == handle && expired->refCount > 0) {
      --expired->refCount;
    }
  }

  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
}
1238

1239
void cleanupExpiredUdfs() {
8,148✔
1240
  int32_t i = 0;
8,148✔
1241
  SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
8,148✔
1242
  if (expiredUdfStubs == NULL) {
8,148!
1243
    fnError("cleanupExpiredUdfs: failed to init array");
×
1244
    return;
×
1245
  }
1246
  while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
52,484✔
1247
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
44,336✔
1248
    if (stub->refCount == 0) {
44,336!
1249
      fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle,
×
1250
             stub->refCount);
1251
      (void)doTeardownUdf(stub->handle);
×
1252
    } else {
1253
      fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p",
44,336!
1254
             stub->udfName, stub->refCount, stub->createTime, stub->handle);
1255
      UdfcFuncHandle handle = stub->handle;
44,336✔
1256
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
44,336!
1257
        if (taosArrayPush(expiredUdfStubs, stub) == NULL) {
44,336!
1258
          fnError("cleanupExpiredUdfs: failed to push udf stub to array");
×
1259
        }
1260
      } else {
1261
        fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
×
1262
               stub->udfName, stub->refCount, stub->createTime);
1263
      }
1264
    }
1265
    ++i;
44,336✔
1266
  }
1267
  taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
8,148✔
1268
  gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
8,148✔
1269
}
1270

1271
void cleanupNotExpiredUdfs() {
8,148✔
1272
  SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
8,148✔
1273
  if (udfStubs == NULL) {
8,148!
1274
    fnError("cleanupNotExpiredUdfs: failed to init array");
×
1275
    return;
×
1276
  }
1277
  int32_t i = 0;
8,148✔
1278
  while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
30,280✔
1279
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
22,132✔
1280
    if (stub->refCount == 0) {
22,132✔
1281
      fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
134!
1282
      (void)doTeardownUdf(stub->handle);
134✔
1283
    } else {
1284
      fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
21,998!
1285
             stub->refCount, stub->createTime, stub->handle);
1286
      UdfcFuncHandle handle = stub->handle;
21,998✔
1287
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
21,998!
1288
        if (taosArrayPush(udfStubs, stub) == NULL) {
21,998!
1289
          fnError("cleanupNotExpiredUdfs: failed to push udf stub to array");
×
1290
        }
1291
      } else {
1292
        fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName,
×
1293
               stub->refCount, stub->createTime);
1294
      }
1295
    }
1296
    ++i;
22,132✔
1297
  }
1298
  taosArrayDestroy(gUdfcProxy.udfStubs);
8,148✔
1299
  gUdfcProxy.udfStubs = udfStubs;
8,148✔
1300
}
1301

1302
int32_t cleanUpUdfs() {
10,611,039✔
1303
  int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
10,611,039✔
1304
  if (!initialized) {
10,611,411✔
1305
    return TSDB_CODE_SUCCESS;
73,780✔
1306
  }
1307

1308
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
10,537,631✔
1309
  if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
10,541,413!
1310
      (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
10,533,265!
1311
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
10,533,265✔
1312
    return TSDB_CODE_SUCCESS;
10,533,248✔
1313
  }
1314

1315
  cleanupNotExpiredUdfs();
8,148✔
1316
  cleanupExpiredUdfs();
8,148✔
1317

1318
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
8,148✔
1319
  return 0;
8,148✔
1320
}
1321

1322
/**
 * Run a scalar UDF by name.
 *
 * Acquires a handle, invokes the UDF, then checks that an output column was produced and
 * that its type and byte width match what the session reported. The handle is released on
 * every path after a successful acquire.
 *
 * @param udfName   name of the UDF
 * @param input     array of numOfCols input parameters
 * @param numOfCols number of input parameters
 * @param output    receives the result column
 * @return 0 on success; the acquire or call error code; or TSDB_CODE_UDF_INVALID_OUTPUT_TYPE
 *         when the output is missing or does not match the session's type/bytes
 */
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  TAOS_UDF_CHECK_PTR_RCODE(udfName, input, output);
  UdfcFuncHandle handle = NULL;
  int32_t        code = acquireUdfFuncHandle(udfName, &handle);
  if (code != 0) {
    return code;
  }

  SUdfcUvSession *session = handle;
  code = doCallUdfScalarFunc(handle, input, numOfCols, output);

  if (code != TSDB_CODE_SUCCESS) {
    fnError("udfc scalar function execution failure");
  } else if (output->columnData == NULL) {
    fnError("udfc scalar function calculate error. no column data");
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  } else if (session->outputType != output->columnData->info.type ||
             session->bytes != output->columnData->info.bytes) {
    fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
            session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes);
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  }

  releaseUdfFuncHandle(udfName, handle);
  return code;
}
1351

1352
/**
 * Report the per-result memory an aggregate UDF needs.
 *
 * The size is the SUdfAggRes header plus the result width plus the UDF's buffer size.
 *
 * @param pFunc function node describing the UDF
 * @param pEnv  receives calcMemSize
 * @return false when an argument is NULL or the function is a scalar function, true otherwise
 */
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
  if (pFunc == NULL || pEnv == NULL) {
    fnError("udfAggGetEnv: invalid input lint: %d", __LINE__);
    return false;
  }

  bool isAggregate = !fmIsScalarFunc(pFunc->funcId);
  if (isAggregate) {
    pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
  }
  return isAggregate;
}
1363

1364
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
39✔
1365
  TAOS_UDF_CHECK_PTR_RCODE(pCtx, pResultCellInfo);
117!
1366
  if (pResultCellInfo->initialized) {
39!
1367
    return TSDB_CODE_SUCCESS;
×
1368
  }
1369
  if (functionSetup(pCtx, pResultCellInfo) != TSDB_CODE_SUCCESS) {
39!
1370
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1371
  }
1372
  UdfcFuncHandle handle;
1373
  int32_t        udfCode = 0;
39✔
1374
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
39!
1375
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
×
1376
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1377
  }
1378
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
39✔
1379
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
39✔
1380
  int32_t         envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize;
39✔
1381
  memset(udfRes, 0, envSize);
39✔
1382

1383
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
39✔
1384
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
39✔
1385

1386
  SUdfInterBuf buf = {0};
39✔
1387
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
39!
1388
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
×
1389
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
1390
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1391
  }
1392
  if (buf.bufLen <= session->bufSize) {
39!
1393
    memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
39✔
1394
    udfRes->interResBufLen = buf.bufLen;
39✔
1395
    udfRes->interResNum = buf.numOfResult;
39✔
1396
  } else {
1397
    fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
×
1398
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
1399
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1400
  }
1401
  releaseUdfFuncHandle(pCtx->udfName, handle);
39✔
1402
  freeUdfInterBuf(&buf);
39✔
1403
  return TSDB_CODE_SUCCESS;
39✔
1404
}
1405

1406
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
46✔
1407
  TAOS_UDF_CHECK_PTR_RCODE(pCtx);
92!
1408
  int32_t        udfCode = 0;
46✔
1409
  UdfcFuncHandle handle = 0;
46✔
1410
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
46!
1411
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
×
1412
    return udfCode;
×
1413
  }
1414

1415
  SUdfcUvSession *session = handle;
46✔
1416
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
46✔
1417
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
46✔
1418
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
46✔
1419

1420
  SInputColumnInfoData *pInput = &pCtx->input;
46✔
1421
  int32_t               numOfCols = pInput->numOfInputCols;
46✔
1422
  int32_t               start = pInput->startRowIndex;
46✔
1423
  int32_t               numOfRows = pInput->numOfRows;
46✔
1424
  SSDataBlock          *pTempBlock = NULL;
46✔
1425
  int32_t               code = createDataBlock(&pTempBlock);
46✔
1426

1427
  if (code) {
46!
1428
    return code;
×
1429
  }
1430

1431
  pTempBlock->info.rows = pInput->totalRows;
46✔
1432
  pTempBlock->info.id.uid = pInput->uid;
46✔
1433
  for (int32_t i = 0; i < numOfCols; ++i) {
100✔
1434
    if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) {
54!
1435
      fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode);
×
1436
      blockDataDestroy(pTempBlock);
×
1437
      return udfCode;
×
1438
    }
1439
  }
1440

1441
  SSDataBlock *inputBlock = NULL;
46✔
1442
  code = blockDataExtractBlock(pTempBlock, start, numOfRows, &inputBlock);
46✔
1443
  if (code) {
46!
1444
    return code;
×
1445
  }
1446

1447
  SUdfInterBuf state = {
46✔
1448
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
46✔
1449
  SUdfInterBuf newState = {0};
46✔
1450

1451
  udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
46✔
1452
  if (udfCode != 0) {
46!
1453
    fnError("udfAggProcess error. code: %d", udfCode);
×
1454
    newState.numOfResult = 0;
×
1455
  } else {
1456
    if (newState.bufLen <= session->bufSize) {
46!
1457
      memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
46✔
1458
      udfRes->interResBufLen = newState.bufLen;
46✔
1459
      udfRes->interResNum = newState.numOfResult;
46✔
1460
    } else {
1461
      fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
×
1462
      udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
×
1463
    }
1464
  }
1465

1466
  GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum;
46✔
1467

1468
  blockDataDestroy(inputBlock);
46✔
1469

1470
  taosArrayDestroy(pTempBlock->pDataBlock);
46✔
1471
  taosMemoryFree(pTempBlock);
46!
1472

1473
  releaseUdfFuncHandle(pCtx->udfName, handle);
46✔
1474
  freeUdfInterBuf(&newState);
46✔
1475
  return udfCode;
46✔
1476
}
1477

1478
/**
 * Produce the final value of an aggregate UDF and write it into the result block.
 *
 * Acquires the UDF handle, rewires the SUdfAggRes buffer pointers, and calls the UDF's
 * finalize step with the current intermediate state. The result is copied into the final
 * result buffer when it reports at least one result and fits into session->bytes; otherwise
 * the result count is set to 0. functionFinalizeWithResultBuf() is then invoked with the
 * final result buffer in every case, and the handle is released.
 *
 * @param pCtx   function context; pCtx->udfName names the UDF
 * @param pBlock block receiving the final result
 * @return the acquire error code if the handle cannot be obtained; otherwise the value
 *         returned by functionFinalizeWithResultBuf() when the finalize call succeeded, else
 *         the finalize error code (TSDB_CODE_UDF_INVALID_OUTPUT_TYPE for an oversized result)
 */
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
  TAOS_UDF_CHECK_PTR_RCODE(pCtx, pBlock);
  int32_t        udfCode = 0;
  UdfcFuncHandle handle = 0;
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    // The message previously named udfAggProcess, which pointed log readers at the wrong step.
    fnError("udfAggFinalize error. step acquireUdfFuncHandle. udf code: %d", udfCode);
    return udfCode;
  }

  SUdfcUvSession *session = handle;
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;

  SUdfInterBuf resultBuf = {0};
  SUdfInterBuf state = {
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
  int32_t udfCallCode = 0;
  udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
  if (udfCallCode != 0) {
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else {
    if (resultBuf.numOfResult == 0) {
      udfRes->finalResNum = 0;
      GET_RES_INFO(pCtx)->numOfRes = 0;
    } else {
      if (resultBuf.bufLen <= session->bytes) {
        memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
        udfRes->finalResNum = resultBuf.numOfResult;
        GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
      } else {
        fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
        GET_RES_INFO(pCtx)->numOfRes = 0;
        udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
      }
    }
  }

  freeUdfInterBuf(&resultBuf);

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  releaseUdfFuncHandle(pCtx->udfName, handle);
  return udfCallCode == 0 ? numOfResults : udfCallCode;
}
1523

1524
/**
 * libuv close callback for a client pipe.
 *
 * Detaches the owning session from the pipe, wakes every task still queued on
 * the connection, then frees the read buffer, the connection and the pipe.
 *
 * @param handle  the pipe being closed; handle->data is its SClientUvConn.
 */
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *conn = handle->data;

  // Detach the session before waking anybody, so that a woken waiter can no
  // longer reach the pipe through the session while it is being released.
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  if (conn->session != NULL) {
    conn->session->udfUvPipe = NULL;
  }
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);

  // Drain the whole queue: the connection is freed below, so no task may stay
  // linked on it and no waiter may be left blocked on its semaphore.
  while (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE             *h = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
    // The disconnect request itself succeeds; any other pending request can
    // never receive its response on this pipe.
    task->errCode = (task->type == UV_TASK_DISCONNECT) ? 0 : TSDB_CODE_UDF_PIPE_READ_ERR;
    QUEUE_REMOVE(&task->connTaskQueue);
    QUEUE_REMOVE(&task->procTaskQueue);
    uv_sem_post(&task->taskSem);
  }

  taosMemoryFree(conn->readBuf.buf);
  taosMemoryFree(conn);
  taosMemoryFree((uv_pipe_t *)handle);
}
134✔
1542

1543
/**
 * Extract the outcome of a finished uv task into the udf task.
 *
 * For a request/response task with a response buffer, the response is decoded,
 * its payload is copied into the matching slot of `task`, and the buffer is
 * released. In every other case the result is the uv task's errCode.
 *
 * @param task    udf task receiving the decoded response.
 * @param uvTask  finished uv task.
 * @return the response code or the uv task error code; 0 on success.
 */
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);

  int32_t result = 0;
  switch (uvTask->type) {
    case UV_TASK_REQ_RSP: {
      if (uvTask->rspBuf.base == NULL) {
        // No response arrived; report whatever error the event loop recorded.
        result = uvTask->errCode;
        if (result != 0) {
          fnError("udfc get udf task result failure. code: %d, line:%d", result, __LINE__);
        }
        break;
      }

      SUdfResponse rsp = {0};
      (void)decodeUdfResponse(uvTask->rspBuf.base, &rsp);
      result = rsp.code;
      if (result != 0) {
        fnError("udfc get udf task result failure. code: %d", result);
      }

      if (task->type == UDF_TASK_SETUP) {
        task->_setup.rsp = rsp.setupRsp;
      } else if (task->type == UDF_TASK_CALL) {
        task->_call.rsp = rsp.callRsp;
      } else if (task->type == UDF_TASK_TEARDOWN) {
        task->_teardown.rsp = rsp.teardownRsp;
      }

      // TODO: the call buffer is setup and freed by udf invocation
      taosMemoryFreeClear(uvTask->rspBuf.base);
      break;
    }
    case UV_TASK_CONNECT: {
      result = uvTask->errCode;
      if (result != 0) {
        fnError("udfc get udf task result failure. code: %d, line:%d", result, __LINE__);
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      result = uvTask->errCode;
      if (result != 0) {
        fnError("udfc get udf task result failure. code: %d, line:%d", result, __LINE__);
      }
      break;
    }
    default: {
      break;
    }
  }
  return result;
}
1594

1595
/**
 * libuv alloc callback for the client pipe.
 *
 * The connection read buffer is filled in two stages: first the fixed header
 * (int32 message length followed by int64 sequence number), then the rest of
 * the message once the total length is known. On allocation failure an empty
 * uv_buf_t (base NULL, len 0) is handed back.
 *
 * @param handle         pipe handle; handle->data is its SClientUvConn.
 * @param suggestedSize  size hint from libuv (not used).
 * @param buf            out: the region libuv may read into.
 */
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SClientUvConn  *conn = handle->data;
  SClientConnBuf *connBuf = &conn->readBuf;
  int32_t         msgHeadSize = sizeof(int32_t) + sizeof(int64_t);

  buf->base = NULL;
  buf->len = 0;

  if (connBuf->cap == 0) {
    // Fresh message: allocate just enough for the header.
    connBuf->buf = taosMemoryMalloc(msgHeadSize);
    if (connBuf->buf) {
      connBuf->len = 0;
      connBuf->cap = msgHeadSize;
      connBuf->total = -1;

      buf->base = connBuf->buf;
      buf->len = connBuf->cap;
    } else {
      fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
    }
  } else if (connBuf->total == -1 && connBuf->len < msgHeadSize) {
    // Header only partially received: continue filling it.
    buf->base = connBuf->buf + connBuf->len;
    buf->len = msgHeadSize - connBuf->len;
  } else {
    // Grow to the announced message size. The new capacity is committed only
    // after the reallocation succeeded, so cap never exceeds the real buffer.
    int32_t newCap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
    void   *resultBuf = taosMemoryRealloc(connBuf->buf, newCap);
    if (resultBuf) {
      connBuf->buf = resultBuf;
      connBuf->cap = newCap;
      buf->base = connBuf->buf + connBuf->len;
      buf->len = connBuf->cap - connBuf->len;
    } else {
      fnError("udfc re-allocate buffer failure. size: %d", newCap);
    }
  }

  fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
}
74,083✔
1633

1634
/**
 * Check whether the connection buffer holds one complete message.
 *
 * As soon as the leading int32 is available it is latched into
 * connBuf->total. The message is complete when the buffer is full and its
 * capacity equals that total.
 */
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *)(connBuf->buf);
  }

  bool complete = (connBuf->len == connBuf->cap) && (connBuf->total == connBuf->cap);
  if (!complete) {
    return false;
  }

  fnDebug("udfc complete message is received, now handle it");
  return true;
}
1644

1645
/**
 * Dispatch a complete response sitting in the connection read buffer.
 *
 * The sequence number stored after the int32 length selects the waiting task.
 * Ownership of the buffer moves to that task and the task is woken. When no
 * task matches, the message is dropped. In every case the read buffer is
 * reset, so the next message starts from an empty buffer.
 *
 * @param conn  connection whose readBuf holds a complete message.
 */
void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;

  // msglen then seqnum; copied out because the offset is not 8-byte aligned.
  int64_t seqNum = 0;
  memcpy(&seqNum, connBuf->buf + sizeof(int32_t), sizeof(seqNum));

  SClientUvTaskNode *taskFound = NULL;
  if (QUEUE_EMPTY(&conn->taskQueue)) {
    fnError("udfc no task waiting on connection. response seqnum:%" PRId64, seqNum);
  } else {
    for (QUEUE *h = QUEUE_NEXT(&conn->taskQueue); h != &conn->taskQueue; h = QUEUE_NEXT(h)) {
      SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
      fnDebug("udfc handle response iterate through queue. uvTask:%" PRId64 "-%p", task->seqNum, task);
      if (task->seqNum != seqNum) {
        continue;
      }
      if (taskFound == NULL) {
        taskFound = task;
      } else {
        // Keep the first match; the iterator still advances to the next node.
        fnError("udfc more than one task waiting for the same response");
      }
    }
    if (taskFound == NULL) {
      fnError("no task is waiting for the response.");
    }
  }

  if (taskFound != NULL) {
    // The task now owns the buffer.
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    uv_sem_post(&taskFound->taskSem);
  } else {
    // Nobody takes the message: release it instead of leaking it.
    taosMemoryFree(connBuf->buf);
  }

  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
1686

1687
/**
 * Fail every task queued on the connection with TSDB_CODE_UDF_PIPE_READ_ERR,
 * wake its waiter, and start closing the pipe unless it is already closing.
 */
void udfcUvHandleError(SClientUvConn *conn) {
  fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);

  for (;;) {
    if (QUEUE_EMPTY(&conn->taskQueue)) {
      break;
    }
    QUEUE             *node = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *pending = QUEUE_DATA(node, SClientUvTaskNode, connTaskQueue);
    pending->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
    QUEUE_REMOVE(&pending->connTaskQueue);
    QUEUE_REMOVE(&pending->procTaskQueue);
    uv_sem_post(&pending->taskSem);
  }

  uv_handle_t *pipeHandle = (uv_handle_t *)conn->pipe;
  if (uv_is_closing(pipeHandle)) {
    return;
  }
  uv_close(pipeHandle, onUdfcPipeClose);
}
×
1701

1702
/**
 * libuv read callback for the client pipe.
 *
 * A positive nread extends the connection buffer and, once a whole message is
 * present, dispatches it. A negative nread is handled as a connection error.
 * A zero nread is ignored.
 */
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
  if (nread == 0) {
    return;
  }

  SClientUvConn *conn = client->data;

  if (nread < 0) {
    fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
    if (nread == UV_EOF) {
      fnError("\tudfc client pipe %p closed", client);
    }
    udfcUvHandleError(conn);
    return;
  }

  SClientConnBuf *readBuf = &conn->readBuf;
  readBuf->len += nread;
  if (isUdfcUvMsgComplete(readBuf)) {
    udfcUvHandleRsp(conn);
  }
}
1722

1723
/**
 * libuv write-completion callback. A negative status is treated as a
 * connection error; the write request is released in every case.
 */
void onUdfcPipeWrite(uv_write_t *write, int32_t status) {
  SClientUvConn *conn = write->data;

  if (status >= 0) {
    fnDebug("udfc client connection %p write succeed", conn);
  } else {
    fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
    udfcUvHandleError(conn);
  }

  taosMemoryFree(write);
}
27,293✔
1733

1734
/**
 * libuv connect callback for the pipe created by a UV_TASK_CONNECT task.
 *
 * On a successful connect, reading is started on the pipe. If either the
 * connect or the read start fails, the error is recorded on the task and the
 * pipe is closed, which releases the pipe and its connection through
 * onUdfcPipeClose. The waiting task is woken in every case.
 *
 * @param connect  connect request; connect->data is the SClientUvTaskNode.
 * @param status   0 on success, otherwise a libuv error code.
 */
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) {
  SClientUvTaskNode *uvTask = connect->data;
  int32_t            code = status;

  if (status != 0) {
    fnError("client connect error, task seq: %" PRId64 ", code:%s", uvTask->seqNum, uv_strerror(status));
  } else {
    // Only start reading on a pipe that actually connected, so a connect
    // failure is reported as such rather than as a read-start failure.
    code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
    if (code != 0) {
      fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code));
    }
  }
  uvTask->errCode = code;

  if (code != 0 && uvTask->pipe != NULL) {
    // Nobody will ever own this pipe; close it so it and its conn are freed.
    if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) {
      uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
    }
    uvTask->pipe = NULL;
  }

  taosMemoryFree(connect);
  QUEUE_REMOVE(&uvTask->procTaskQueue);
  uv_sem_post(&uvTask->taskSem);
}
157✔
1750

1751
/**
 * Prepare a uv task node for the given udf task.
 *
 * For UV_TASK_REQ_RSP the request is built from the udf task, given a fresh
 * sequence number, and encoded into uvTask->reqBuf. For UV_TASK_DISCONNECT
 * the session pipe is recorded. The task semaphore is initialised last.
 *
 * @param task        udf task describing the request.
 * @param uvTaskType  UV_TASK_CONNECT, UV_TASK_REQ_RSP or UV_TASK_DISCONNECT.
 * @param uvTask      node to fill in.
 * @return 0 on success; TSDB_CODE_UDF_UV_EXEC_FAILURE or terrno on failure.
 */
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) {
  uvTask->type = uvTaskType;
  uvTask->udfc = task->session->udfc;

  if (uvTaskType == UV_TASK_CONNECT) {
    // Nothing to prepare for a connect task.
  } else if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfUvPipe;

    // Zero-initialised so that no indeterminate bytes are ever encoded.
    SUdfRequest request = {0};
    request.type = task->type;

    if (task->type == UDF_TASK_SETUP) {
      request.setup = task->_setup.req;
    } else if (task->type == UDF_TASK_CALL) {
      request.call = task->_call.req;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.teardown = task->_teardown.req;
    } else {
      // An unknown type has no payload to encode; fail instead of sending garbage.
      fnError("udfc create uv task, invalid task type : %d", task->type);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);

    // First pass with a NULL buffer only computes the encoded size.
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    if (bufLen <= 0) {
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }
    request.msgLen = bufLen;

    void *bufBegin = taosMemoryMalloc(bufLen);
    if (bufBegin == NULL) {
      fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen);
      return terrno;
    }
    void *buf = bufBegin;
    if (encodeUdfRequest(&buf, &request) <= 0) {
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
      taosMemoryFree(bufBegin);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }

    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfUvPipe;
  }

  if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
    if (uvTaskType == UV_TASK_REQ_RSP) {
      taosMemoryFreeClear(uvTask->reqBuf.base);
    }
    fnError("udfc create uv task, init semaphore failed.");
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  return 0;
}
1807

1808
/**
 * Hand a uv task to the event loop and block until it has finished.
 *
 * The task is appended to the proxy task queue under the queue mutex, the
 * loop is notified through its async handle, and the caller waits on the task
 * semaphore, which is destroyed afterwards.
 *
 * @return 0 once the task finished; TSDB_CODE_UDF_UV_EXEC_FAILURE when the
 *         loop could not be notified.
 */
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
  fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
  SUdfcProxy *proxy = uvTask->udfc;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_INSERT_TAIL(&proxy->taskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  int32_t sendRc = uv_async_send(&proxy->loopTaskAync);
  if (sendRc == 0) {
    uv_sem_wait(&uvTask->taskSem);
    fnDebug("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
    uv_sem_destroy(&uvTask->taskSem);
    return 0;
  }

  fnError("udfc queue uv task to event loop failed. code:%s", uv_strerror(sendRc));
  return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
1826

1827
/**
 * Start a uv task on the event loop thread.
 *
 * - UV_TASK_CONNECT: allocates the pipe, its connection and the connect
 *   request, then issues the asynchronous connect.
 * - UV_TASK_REQ_RSP: writes the encoded request and queues the task on the
 *   connection so that the response can be matched to it.
 * - UV_TASK_DISCONNECT: queues the task on the connection and closes the pipe.
 *
 * @param uvTask  task to start.
 * @return 0 when the task was started and will be completed by a callback;
 *         non-zero when it could not be started (nothing is left queued or
 *         allocated on behalf of the task in that case).
 */
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
  fnDebug("event loop start uv task. uvTask: %" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
  int32_t code = 0;

  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
      // Do every plain allocation before uv_pipe_init, so that no failure path
      // has to free a handle that was already initialised on the loop.
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
      if (pipe == NULL) {
        fnError("udfc event loop start connect task malloc pipe failed.");
        return terrno;
      }

      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
      if (conn == NULL) {
        code = terrno;  // captured before the frees below
        fnError("udfc event loop start connect task malloc conn failed.");
        taosMemoryFree(pipe);
        return code;
      }

      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
      if (connReq == NULL) {
        code = terrno;  // captured before the frees below
        fnError("udfc event loop start connect task malloc connReq failed.");
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        return code;
      }

      if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) {
        fnError("udfc event loop start connect task uv_pipe_init failed.");
        taosMemoryFree(connReq);
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        return TSDB_CODE_UDF_UV_EXEC_FAILURE;
      }

      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = 0;
      conn->readBuf.total = -1;
      QUEUE_INIT(&conn->taskQueue);
      pipe->data = conn;

      uvTask->pipe = pipe;
      connReq->data = uvTask;
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
      code = 0;
      break;
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
        break;
      }

      uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
      if (write == NULL) {
        fnError("udfc event loop start req_rsp task malloc write failed.");
        return terrno;
      }
      write->data = pipe->data;

      int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
      if (err != 0) {
        taosMemoryFree(write);
        fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code:%s", uvTask, uv_strerror(err));
      } else {
        // Queue the task only once the write was accepted. A task that failed
        // to start is released by its owner and must not stay linked here.
        QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
        QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
      }
      code = err;
      break;
    }
    case UV_TASK_DISCONNECT: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
      } else {
        SClientUvConn *conn = pipe->data;
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
        if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) {
          uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
        }
        code = 0;
      }
      break;
    }
    default: {
      // No callback will ever complete an unknown task; report failure so the
      // waiter is woken by the caller instead of blocking forever.
      fnError("udfc event loop unknown task type.");
      code = TSDB_CODE_UDF_UV_EXEC_FAILURE;
      break;
    }
  }

  return code;
}
1915

1916
/**
 * Async callback that starts every task submitted to the proxy.
 *
 * The shared task queue is moved into a local queue under the mutex. Each
 * task is then started: a started task is tracked in uvProcTaskQueue, a task
 * that failed to start gets its error recorded and its waiter woken.
 */
void udfcAsyncTaskCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;
  QUEUE       pending;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_MOVE(&proxy->taskQueue, &pending);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  for (; !QUEUE_EMPTY(&pending);) {
    QUEUE *node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *uvTask = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);

    int32_t rc = udfcStartUvTask(uvTask);
    if (rc != 0) {
      uvTask->errCode = rc;
      uv_sem_post(&uvTask->taskSem);
      continue;
    }
    QUEUE_INSERT_TAIL(&proxy->uvProcTaskQueue, &uvTask->procTaskQueue);
  }
}
23,479✔
1937

1938
/**
 * Wake every waiter of tasks that were submitted but not started, and of
 * tasks that were started but not finished. While the proxy is stopping, each
 * such task is marked with TSDB_CODE_UDF_STOPPING.
 */
void cleanUpUvTasks(SUdfcProxy *udfc) {
  fnDebug("clean up uv tasks");
  QUEUE pending;

  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &pending);
  uv_mutex_unlock(&udfc->taskQueueMutex);

  // Tasks that never reached the event loop.
  for (;;) {
    if (QUEUE_EMPTY(&pending)) {
      break;
    }
    QUEUE *node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *uvTask = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      uvTask->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&uvTask->taskSem);
  }

  // Tasks that were started and are still in flight.
  for (;;) {
    if (QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
      break;
    }
    QUEUE *node = QUEUE_HEAD(&udfc->uvProcTaskQueue);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *uvTask = QUEUE_DATA(node, SClientUvTaskNode, procTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      uvTask->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&uvTask->taskSem);
  }
}
2,125✔
1965

1966
/**
 * Async callback for the stop request: releases all outstanding tasks and,
 * when the proxy is in the stopping state, stops the event loop.
 */
void udfStopAsyncCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;

  cleanUpUvTasks(proxy);
  if (proxy->udfcState != UDFC_STATE_STOPPING) {
    return;
  }
  uv_stop(&proxy->uvLoop);
}
2,125✔
1973

1974
void constructUdfService(void *argsThread) {
2,125✔
1975
  int32_t     code = 0, lino = 0;
2,125✔
1976
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
2,125✔
1977
  code = uv_loop_init(&udfc->uvLoop);
2,125✔
1978
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
1979

1980
  code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
2,125✔
1981
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
1982
  udfc->loopTaskAync.data = udfc;
2,125✔
1983
  code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
2,125✔
1984
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
1985
  udfc->loopStopAsync.data = udfc;
2,125✔
1986
  code = uv_mutex_init(&udfc->taskQueueMutex);
2,125✔
1987
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
1988
  QUEUE_INIT(&udfc->taskQueue);
2,125✔
1989
  QUEUE_INIT(&udfc->uvProcTaskQueue);
2,125✔
1990
  (void)uv_barrier_wait(&udfc->initBarrier);
2,125✔
1991
  // TODO return value of uv_run
1992
  int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
2,125✔
1993
  fnInfo("udfc uv loop exit. active handle num: %d", num);
2,125!
1994
  (void)uv_loop_close(&udfc->uvLoop);
2,125✔
1995

1996
  uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL);
2,125✔
1997
  num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
2,125✔
1998
  fnInfo("udfc uv loop exit. active handle num: %d", num);
2,125!
1999

2000
  (void)uv_loop_close(&udfc->uvLoop);
2,125✔
2001
_exit:
2,125✔
2002
  if (code != 0) {
2,125!
2003
    fnError("udfc construct error. code: %d, line: %d", code, lino);
×
2004
  }
2005
  fnInfo("udfc construct finished");
2,125!
2006
}
2,125✔
2007

2008
int32_t udfcOpen() {
2,692✔
2009
  int32_t code = 0, lino = 0;
2,692✔
2010
  int8_t  old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
2,692✔
2011
  if (old == 1) {
2,692✔
2012
    return 0;
567✔
2013
  }
2014
  SUdfcProxy *proxy = &gUdfcProxy;
2,125✔
2015
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
2,125✔
2016
  proxy->udfcState = UDFC_STATE_STARTNG;
2,125✔
2017
  code = uv_barrier_init(&proxy->initBarrier, 2);
2,125✔
2018
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
2019
  code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
2,125✔
2020
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
2021
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
2,125✔
2022
  proxy->udfcState = UDFC_STATE_READY;
2,125✔
2023
  (void)uv_barrier_wait(&proxy->initBarrier);
2,125✔
2024
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
2025
  code = uv_mutex_init(&proxy->udfStubsMutex);
2,125✔
2026
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
2027
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
2,125✔
2028
  if (proxy->udfStubs == NULL) {
2,125!
2029
    fnError("udfc init failed. udfStubs: %p", proxy->udfStubs);
×
2030
    return -1;
×
2031
  }
2032
  proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
2,125✔
2033
  if (proxy->expiredUdfStubs == NULL) {
2,125!
2034
    taosArrayDestroy(proxy->udfStubs);
×
2035
    fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs);
×
2036
    return -1;
×
2037
  }
2038
  code = uv_mutex_init(&proxy->udfcUvMutex);
2,125✔
2039
  TAOS_CHECK_GOTO(code, &lino, _exit);
2,125!
2040
_exit:
2,125✔
2041
  if (code != 0) {
2,125!
2042
    fnError("udfc open error. code: %d, line: %d", code, lino);
×
2043
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
2044
  }
2045
  fnInfo("udfc initialized");
2,125!
2046
  return 0;
2,125✔
2047
}
2048

2049
int32_t udfcClose() {
2,157✔
2050
  int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
2,157✔
2051
  if (old == 0) {
2,157✔
2052
    return 0;
32✔
2053
  }
2054

2055
  SUdfcProxy *udfc = &gUdfcProxy;
2,125✔
2056
  udfc->udfcState = UDFC_STATE_STOPPING;
2,125✔
2057
  if (uv_async_send(&udfc->loopStopAsync) != 0) {
2,125!
2058
    fnError("udfc close error to send stop async");
×
2059
  }
2060
  if (uv_thread_join(&udfc->loopThread) != 0) {
2,125!
2061
    fnError("udfc close errir to join loop thread");
×
2062
  }
2063
  uv_mutex_destroy(&udfc->taskQueueMutex);
2,125✔
2064
  uv_barrier_destroy(&udfc->initBarrier);
2,125✔
2065
  taosArrayDestroy(udfc->expiredUdfStubs);
2,125✔
2066
  taosArrayDestroy(udfc->udfStubs);
2,125✔
2067
  uv_mutex_destroy(&udfc->udfStubsMutex);
2,125✔
2068
  uv_mutex_destroy(&udfc->udfcUvMutex);
2,125✔
2069
  udfc->udfcState = UDFC_STATE_INITAL;
2,125✔
2070
  fnInfo("udfc is cleaned up");
2,125!
2071
  return 0;
2,125✔
2072
}
2073

2074
/**
 * Run one uv task for a udf task, synchronously.
 *
 * Creates the uv task node, initialises it, queues it to the event loop and
 * waits for it, then collects its result. After a successful connect the new
 * pipe and the session are linked to each other. The node and its request
 * buffer are always released before returning.
 *
 * @param task        udf task to execute.
 * @param uvTaskType  UV_TASK_CONNECT, UV_TASK_REQ_RSP or UV_TASK_DISCONNECT.
 * @return 0 on success, otherwise the first error encountered.
 */
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  int32_t code = 0, lino = 0;

  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
  if (!uvTask) {
    fnError("udfc client task: %p failed to allocate memory for uvTask", task);
    return terrno;
  }
  fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);

  code = udfcInitializeUvTask(task, uvTaskType, uvTask);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcQueueUvTask(uvTask);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcGetUdfTaskResultFromUvTask(task, uvTask);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  if (uvTaskType == UV_TASK_CONNECT) {
    // Link the freshly connected pipe and the session both ways.
    SClientUvConn *conn = uvTask->pipe->data;
    task->session->udfUvPipe = uvTask->pipe;
    conn->session = task->session;
  }

_exit:
  if (code != 0) {
    fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, uvTask, code, lino);
  }
  taosMemoryFree(uvTask->reqBuf.base);
  taosMemoryFree(uvTask);
  return code;
}
2105

2106
/**
 * Create a session for the named udf: connect a pipe, send the setup request
 * and record the handle and output description returned in the response.
 *
 * @param udfName     name of the udf to set up.
 * @param funcHandle  out: the new session on success (owned by the caller).
 * @return 0 on success, otherwise an error code; *funcHandle is not written
 *         on failure.
 */
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("doSetupUdf, failed to allocate memory for task");
    return terrno;
  }
  task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
  if (task->session == NULL) {
    code = terrno;  // captured before the free below
    fnError("doSetupUdf, failed to allocate memory for session");
    taosMemoryFree(task);
    return code;
  }
  task->session->udfc = &gUdfcProxy;
  task->type = UDF_TASK_SETUP;

  SUdfSetupRequest *req = &task->_setup.req;
  tstrncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);

  code = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  SUdfSetupResponse *rsp = &task->_setup.rsp;
  task->session->severHandle = rsp->udfHandle;
  task->session->outputType = rsp->outputType;
  task->session->bytes = rsp->bytes;
  task->session->bufSize = rsp->bufSize;
  tstrncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
  fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
  *funcHandle = task->session;
  taosMemoryFree(task);
  return 0;

_exit:
  if (code != 0) {
    fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino);
  }
  // If the connect succeeded, the connection still points at this session.
  // Detach it under the uv mutex before the session is freed, so the pipe
  // close callback cannot write through a dangling session pointer.
  // NOTE(review): a pipe that is still open here is not disconnected - confirm
  // whether the failure path should also issue UV_TASK_DISCONNECT.
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  if (task->session->udfUvPipe != NULL && task->session->udfUvPipe->data != NULL) {
    SClientUvConn *conn = task->session->udfUvPipe->data;
    conn->session = NULL;
  }
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);

  taosMemoryFree(task->session);
  taosMemoryFree(task);
  return code;
}
2150

2151
/**
 * Send one call request to the udf behind `handle` and copy back its result.
 *
 * Which inputs are sent and which output is written depends on callType:
 *   - TSDB_UDF_CALL_AGG_INIT:   no input;          writes *newState
 *   - TSDB_UDF_CALL_AGG_PROC:   *input and *state; writes *newState
 *   - TSDB_UDF_CALL_AGG_FIN:    *state;            writes *newState
 *   - TSDB_UDF_CALL_SCALA_PROC: *input;            writes *output
 * state2 is not used (aggregate merge is not enabled).
 *
 * @return 0 on success, TSDB_CODE_UDF_PIPE_NOT_EXIST when the session has no
 *         pipe, terrno on allocation failure, or the request error code.
 */
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
                SSDataBlock *output, SUdfInterBuf *newState) {
  fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
  if (session->udfUvPipe == NULL) {
    fnError("No pipe to taosudf");
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("udfc call udf. failed to allocate memory for task");
    return terrno;
  }
  task->session = (SUdfcUvSession *)handle;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *req = &task->_call.req;
  req->udfHandle = task->session->severHandle;
  req->callType = callType;

  // Fill in the request payload for this call type.
  if (callType == TSDB_UDF_CALL_AGG_INIT) {
    req->initFirst = 1;
  } else if (callType == TSDB_UDF_CALL_AGG_PROC) {
    req->block = *input;
    req->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_AGG_FIN) {
    req->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_SCALA_PROC) {
    req->block = *input;
  }

  int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  if (code == 0) {
    SUdfCallResponse *rsp = &task->_call.rsp;
    switch (callType) {
      case TSDB_UDF_CALL_AGG_INIT:
      case TSDB_UDF_CALL_AGG_PROC:
      case TSDB_UDF_CALL_AGG_FIN: {
        *newState = rsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_SCALA_PROC: {
        *output = rsp->resultData;
        break;
      }
      default: {
        break;
      }
    }
  } else {
    fnError("call udf failure. udfcRunUdfUvTask err: %d", code);
  }

  taosMemoryFree(task);
  return code;
}
2227

2228
// Issue the aggregate-init call; the initial state is written to *interBuf.
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_INIT, NULL, NULL, NULL, NULL, interBuf);
}
2235

2236
// input: block, state
2237
// output: interbuf,
2238
// Feed one data block plus the current state to the aggregate udf; the
// updated state is written to *newState.
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_PROC, block, state, NULL, NULL, newState);
}
2243

2244
// input: interbuf1, interbuf2
2245
// output: resultBuf
2246
// udf todo:  aggmerge
2247
// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
2248
//                           SUdfInterBuf *resultBuf) {
2249
//   int8_t  callType = TSDB_UDF_CALL_AGG_MERGE;
2250
//   int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
2251
//   return err;
2252
// }
2253

2254
// input: interBuf
2255
// output: resultData
2256
// Issues the TSDB_UDF_CALL_AGG_FIN call for the UDF behind `handle`.
//   interBuf   - intermediate buffer forwarded to callUdf as the input state
//   resultData - receives the response's result buffer when callUdf succeeds
// Returns whatever callUdf returns (0 on success).
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
  const int8_t kind = TSDB_UDF_CALL_AGG_FIN;
  return callUdf(handle, kind, NULL, interBuf, NULL, NULL, resultData);
}
2261

2262
// Runs a scalar UDF call (TSDB_UDF_CALL_SCALA_PROC) through `handle`.
//   input     - array of scalar parameters, converted into a temporary data block
//   numOfCols - number of entries in `input`
//   output    - receives the UDF result, converted back from the result block
// Returns 0 on success; otherwise the first non-zero code from the parameter
// conversion, from callUdf, or from the result conversion.
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  SSDataBlock srcBlock = {0};
  SSDataBlock dstBlock = {0};

  int32_t code = convertScalarParamToDataBlock(input, numOfCols, &srcBlock);
  if (code != 0) {
    fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code);
    return code;
  }

  code = callUdf(handle, TSDB_UDF_CALL_SCALA_PROC, &srcBlock, NULL, NULL, &dstBlock, NULL);
  if (code == 0) {
    // Only the column array of the result block is destroyed here, after its
    // content has been handed to convertDataBlockToScalarParm.
    code = convertDataBlockToScalarParm(&dstBlock, output);
    taosArrayDestroy(dstBlock.pDataBlock);
  }

  // The temporary input block is released whether or not the call succeeded.
  blockDataFreeRes(&srcBlock);
  return code;
}
2280

2281
int32_t doTeardownUdf(UdfcFuncHandle handle) {
134✔
2282
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
134✔
2283
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
134✔
2284

2285
  if (session->udfUvPipe == NULL) {
134!
2286
    fnError("tear down udf. pipe to taosudf does not exist. udf name: %s", session->udfName);
×
2287
    taosMemoryFree(session);
×
2288
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
2289
  }
2290

2291
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
134!
2292
  if (task == NULL) {
134!
2293
    fnError("doTeardownUdf, failed to allocate memory for task");
×
2294
    taosMemoryFree(session);
×
2295
    return terrno;
×
2296
  }
2297
  task->session = session;
134✔
2298
  task->type = UDF_TASK_TEARDOWN;
134✔
2299

2300
  SUdfTeardownRequest *req = &task->_teardown.req;
134✔
2301
  req->udfHandle = task->session->severHandle;
134✔
2302

2303
  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
134✔
2304
  TAOS_CHECK_GOTO(code, &lino, _exit);
134!
2305

2306
  code = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
134✔
2307
  TAOS_CHECK_GOTO(code, &lino, _exit);
134!
2308

2309
  fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
134!
2310
  // TODO: synchronization refactor between libuv event loop and request thread
2311
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
134✔
2312
  if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
134!
2313
    SClientUvConn *conn = session->udfUvPipe->data;
×
2314
    conn->session = NULL;
×
2315
  }
2316
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
134✔
2317

2318
_exit:
134✔
2319
  if (code != 0) {
134!
2320
    fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino);
×
2321
  }
2322
  taosMemoryFree(session);
134!
2323
  taosMemoryFree(task);
134!
2324

2325
  return code;
134✔
2326
}
2327
#else
2328
#include "tudf.h"
2329

2330
// Stub for the #else variant of this file (presumably builds without UDF
// support - confirm against the enclosing #if): nothing to clean up, always 0.
int32_t cleanUpUdfs() {
  const int32_t code = 0;
  return code;
}
2331
// Stub for the #else variant of this file (presumably builds without UDF
// support - confirm against the enclosing #if): opens nothing, always 0.
int32_t udfcOpen() {
  const int32_t code = 0;
  return code;
}
2332
// Stub for the #else variant of this file (presumably builds without UDF
// support - confirm against the enclosing #if): closes nothing, always 0.
int32_t udfcClose() {
  const int32_t code = 0;
  return code;
}
2333
// Stub for the #else variant of this file (presumably builds without UDF
// support - confirm against the enclosing #if): no process is started and
// startDnodeId is ignored; always returns 0.
int32_t udfStartUdfd(int32_t startDnodeId) {
  const int32_t code = 0;
  return code;
}
2334
// Stub for the #else variant of this file (presumably builds without UDF
// support - confirm against the enclosing #if): there is nothing to stop.
void udfStopUdfd() {
  // intentionally empty
}
2335
// Stub for the #else variant of this file: no UDF is invoked, every argument
// is ignored, and the call is always rejected with TSDB_CODE_OPS_NOT_SUPPORT.
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  const int32_t code = TSDB_CODE_OPS_NOT_SUPPORT;
  return code;
}
2338
#endif
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc