• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.36
/source/libs/function/src/tudf.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "uv.h"
16

17
#include "os.h"
18

19
#include "builtinsimpl.h"
20
#include "fnLog.h"
21
#include "functionMgt.h"
22
#include "querynodes.h"
23
#include "tarray.h"
24
#include "tdatablock.h"
25
#include "tglobal.h"
26
#include "tudf.h"
27
#include "tudfInt.h"
28

29
#ifdef _TD_DARWIN_64
30
#include <mach-o/dyld.h>
31
#endif
32

33
typedef struct SUdfdData {
34
  bool         startCalled;
35
  bool         needCleanUp;
36
  uv_loop_t    loop;
37
  uv_thread_t  thread;
38
  uv_barrier_t barrier;
39
  uv_process_t process;
40
#ifdef WINDOWS
41
  HANDLE jobHandle;
42
#endif
43
  int32_t    spawnErr;
44
  uv_pipe_t  ctrlPipe;
45
  uv_async_t stopAsync;
46
  int32_t    stopCalled;
47

48
  int32_t dnodeId;
49
} SUdfdData;
50

51
SUdfdData udfdGlobal = {0};
52

53
int32_t udfStartUdfd(int32_t startDnodeId);
54
void    udfStopUdfd();
55

56
extern char **environ;
57

58
static int32_t udfSpawnUdfd(SUdfdData *pData);
59
void           udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
60
static void    udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
61
static void    udfUdfdStopAsyncCb(uv_async_t *async);
62
static void    udfWatchUdfd(void *args);
63

64
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
17✔
65
  TAOS_UDF_CHECK_PTR_RVOID(process);
34!
66
  fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
17!
67
  SUdfdData *pData = process->data;
17✔
68
  if(pData == NULL) {
17!
69
    fnError("udfd process data is NULL");
×
70
    return;
×
71
  }
72
  if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
17!
73
    fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
×
74
  } else {
75
    fnInfo("udfd process restart");
17!
76
    int32_t code = udfSpawnUdfd(pData);
17✔
77
    if (code != 0) {
17!
78
      fnError("udfd process restart failed with code:%d", code);
×
79
    }
80
  }
81
}
82

83
static int32_t udfSpawnUdfd(SUdfdData *pData) {
1,904✔
84
  fnInfo("start to init udfd");
1,904!
85
  TAOS_UDF_CHECK_PTR_RCODE(pData);
3,808!
86

87
  int32_t              err = 0;
1,904✔
88
  uv_process_options_t options = {0};
1,904✔
89

90
  char path[PATH_MAX] = {0};
1,904✔
91
  if (tsProcPath == NULL) {
1,904✔
92
    path[0] = '.';
16✔
93
#ifdef WINDOWS
94
    GetModuleFileName(NULL, path, PATH_MAX);
95
    TAOS_DIRNAME(path);
96
#elif defined(_TD_DARWIN_64)
97
    uint32_t pathSize = sizeof(path);
98
    _NSGetExecutablePath(path, &pathSize);
99
    TAOS_DIRNAME(path);
100
#endif
101
  } else {
102
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
1,888✔
103
    TAOS_DIRNAME(path);
1,888✔
104
  }
105
#ifdef WINDOWS
106
  if (strlen(path) == 0) {
107
    TAOS_STRCAT(path, "C:\\TDengine");
108
  }
109
  TAOS_STRCAT(path, "\\udfd.exe");
110
#else
111
  if (strlen(path) == 0) {
1,904!
112
    TAOS_STRCAT(path, "/usr/bin");
×
113
  }
114
  TAOS_STRCAT(path, "/udfd");
1,904✔
115
#endif
116
  char *argsUdfd[] = {path, "-c", configDir, NULL};
1,904✔
117
  options.args = argsUdfd;
1,904✔
118
  options.file = path;
1,904✔
119

120
  options.exit_cb = udfUdfdExit;
1,904✔
121

122
  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
1,904!
123

124
  uv_stdio_container_t child_stdio[3];
125
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
1,904✔
126
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
1,904✔
127
  child_stdio[1].flags = UV_IGNORE;
1,904✔
128
  child_stdio[2].flags = UV_INHERIT_FD;
1,904✔
129
  child_stdio[2].data.fd = 2;
1,904✔
130
  options.stdio_count = 3;
1,904✔
131
  options.stdio = child_stdio;
1,904✔
132

133
  options.flags = UV_PROCESS_DETACHED;
1,904✔
134

135
  char dnodeIdEnvItem[32] = {0};
1,904✔
136
  char thrdPoolSizeEnvItem[32] = {0};
1,904✔
137
  snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
1,904✔
138

139
  float   numCpuCores = 4;
1,904✔
140
  int32_t code = taosGetCpuCores(&numCpuCores, false);
1,904✔
141
  if (code != 0) {
1,904!
142
    fnError("failed to get cpu cores, code:0x%x", code);
×
143
  }
144
  numCpuCores = TMAX(numCpuCores, 2);
1,904!
145
  snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);
1,904✔
146

147
  char    pathTaosdLdLib[512] = {0};
1,904✔
148
  size_t  taosdLdLibPathLen = sizeof(pathTaosdLdLib);
1,904✔
149
  int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
1,904✔
150
  if (ret != UV_ENOBUFS) {
1,904!
151
    taosdLdLibPathLen = strlen(pathTaosdLdLib);
1,904✔
152
  }
153

154
  char   udfdPathLdLib[1024] = {0};
1,904✔
155
  size_t udfdLdLibPathLen = strlen(tsUdfdLdLibPath);
1,904✔
156
  tstrncpy(udfdPathLdLib, tsUdfdLdLibPath, sizeof(udfdPathLdLib));
1,904✔
157

158
  udfdPathLdLib[udfdLdLibPathLen] = ':';
1,904✔
159
  tstrncpy(udfdPathLdLib + udfdLdLibPathLen + 1, pathTaosdLdLib, sizeof(udfdPathLdLib) - udfdLdLibPathLen - 1);
1,904✔
160
  if (udfdLdLibPathLen + taosdLdLibPathLen < 1024) {
1,904!
161
    fnInfo("[UDFD]udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
1,904!
162
  } else {
163
    fnError("[UDFD]can not set correct udfd LD_LIBRARY_PATH");
×
164
  }
165
  char ldLibPathEnvItem[1024 + 32] = {0};
1,904✔
166
  snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);
1,904✔
167

168
  char *taosFqdnEnvItem = NULL;
1,904✔
169
  char *taosFqdn = getenv("TAOS_FQDN");
1,904✔
170
  if (taosFqdn != NULL) {
1,904!
171
    int32_t subLen = strlen(taosFqdn);
×
172
    int32_t len = strlen("TAOS_FQDN=") + subLen + 1;
×
173
    taosFqdnEnvItem = taosMemoryMalloc(len);
×
174
    if (taosFqdnEnvItem != NULL) {
×
175
      tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len);
×
176
      TAOS_STRNCAT(taosFqdnEnvItem, taosFqdn, subLen);
×
177
      fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
×
178
    } else {
179
      fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
×
180
      return terrno;
×
181
    }
182
  }
183

184
  char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};
1,904✔
185

186
  char **envUdfdWithPEnv = NULL;
1,904✔
187
  if (environ != NULL) {
1,904!
188
    int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
1,904✔
189
    int32_t numEnviron = 0;
1,904✔
190
    while (environ[numEnviron] != NULL) {
32,532✔
191
      numEnviron++;
30,628✔
192
    }
193

194
    envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
1,904!
195
    if (envUdfdWithPEnv == NULL) {
1,904!
196
      err = TSDB_CODE_OUT_OF_MEMORY;
×
197
      goto _OVER;
×
198
    }
199

200
    for (int32_t i = 0; i < numEnviron; i++) {
32,532✔
201
      int32_t len = strlen(environ[i]) + 1;
30,628✔
202
      envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
30,628!
203
      if (envUdfdWithPEnv[i] == NULL) {
30,628!
204
        err = TSDB_CODE_OUT_OF_MEMORY;
×
205
        goto _OVER;
×
206
      }
207

208
      tstrncpy(envUdfdWithPEnv[i], environ[i], len);
30,628✔
209
    }
210

211
    for (int32_t i = 0; i < lenEnvUdfd; i++) {
11,424✔
212
      if (envUdfd[i] != NULL) {
9,520✔
213
        int32_t len = strlen(envUdfd[i]) + 1;
5,712✔
214
        envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
5,712!
215
        if (envUdfdWithPEnv[numEnviron + i] == NULL) {
5,712!
216
          err = TSDB_CODE_OUT_OF_MEMORY;
×
217
          goto _OVER;
×
218
        }
219

220
        tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
5,712✔
221
      }
222
    }
223
    envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;
1,904✔
224

225
    options.env = envUdfdWithPEnv;
1,904✔
226
  } else {
227
    options.env = envUdfd;
×
228
  }
229

230
  err = uv_spawn(&pData->loop, &pData->process, &options);
1,904✔
231
  pData->process.data = (void *)pData;
1,904✔
232

233
#ifdef WINDOWS
234
  // End udfd.exe by Job.
235
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
236
  pData->jobHandle = CreateJobObject(NULL, NULL);
237
  bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
238
  if (!add_job_ok) {
239
    fnError("Assign udfd to job failed.");
240
  } else {
241
    JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
242
    memset(&limit_info, 0x0, sizeof(limit_info));
243
    limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
244
    bool set_auto_kill_ok =
245
        SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
246
    if (!set_auto_kill_ok) {
247
      fnError("Set job auto kill udfd failed.");
248
    }
249
  }
250
#endif
251

252
  if (err != 0) {
1,904✔
253
    fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
16!
254
  } else {
255
    fnInfo("udfd is initialized");
1,888!
256
  }
257

258
_OVER:
×
259
  if (taosFqdnEnvItem) {
1,904!
260
    taosMemoryFree(taosFqdnEnvItem);
×
261
  }
262

263
  if (envUdfdWithPEnv != NULL) {
1,904!
264
    int32_t i = 0;
1,904✔
265
    while (envUdfdWithPEnv[i] != NULL) {
38,244✔
266
      taosMemoryFree(envUdfdWithPEnv[i]);
36,340!
267
      i++;
36,340✔
268
    }
269
    taosMemoryFree(envUdfdWithPEnv);
1,904!
270
  }
271

272
  return err;
1,904✔
273
}
274

275
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) {
9,327✔
276
  TAOS_UDF_CHECK_PTR_RVOID(handle);
18,654!
277
  if (!uv_is_closing(handle)) {
9,327!
278
    uv_close(handle, NULL);
9,327✔
279
  }
280
}
281

282
static void udfUdfdStopAsyncCb(uv_async_t *async) {
1,871✔
283
  TAOS_UDF_CHECK_PTR_RVOID(async);
3,742!
284
  SUdfdData *pData = async->data;
1,871✔
285
  uv_stop(&pData->loop);
1,871✔
286
}
287

288
static void udfWatchUdfd(void *args) {
1,879✔
289
  TAOS_UDF_CHECK_PTR_RVOID(args);
3,758!
290
  SUdfdData *pData = args;
1,879✔
291
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
1,879!
292
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb));
1,879!
293
  pData->stopAsync.data = pData;
1,879✔
294
  TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
1,879✔
295
  atomic_store_32(&pData->spawnErr, 0);
1,871✔
296
  (void)uv_barrier_wait(&pData->barrier);
1,871✔
297
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
1,871✔
298
  fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
1,871!
299

300
  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
1,871✔
301
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
1,871✔
302
  fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
1,871!
303
  if (uv_loop_close(&pData->loop) != 0) {
1,871!
304
    fnError("udfd loop close failed, lino:%d", __LINE__);
×
305
  }
306
  return;
1,871✔
307

308
_exit:
8✔
309
  if (terrno != 0) {
8!
310
    (void)uv_barrier_wait(&pData->barrier);
8✔
311
    atomic_store_32(&pData->spawnErr, terrno);
8✔
312
    if (uv_loop_close(&pData->loop) != 0) {
8!
313
      fnError("udfd loop close failed, lino:%d", __LINE__);
8!
314
    }
315
    fnError("udfd thread exit with code:%d lino:%d", terrno, terrln);
8!
316
    terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE;
8✔
317
  }
318
  return;
8✔
319
}
320

321
int32_t udfStartUdfd(int32_t startDnodeId) {
1,879✔
322
  int32_t code = 0, lino = 0;
1,879✔
323
  if (!tsStartUdfd) {
1,879!
324
    fnInfo("start udfd is disabled.") return 0;
×
325
  }
326
  SUdfdData *pData = &udfdGlobal;
1,879✔
327
  if (pData->startCalled) {
1,879!
328
    fnInfo("dnode start udfd already called");
×
329
    return 0;
×
330
  }
331
  pData->startCalled = true;
1,879✔
332
  char dnodeId[8] = {0};
1,879✔
333
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
1,879✔
334
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
1,879!
335
  pData->dnodeId = startDnodeId;
1,879✔
336

337
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
1,879!
338
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit);
1,879!
339
  (void)uv_barrier_wait(&pData->barrier);
1,879✔
340
  int32_t err = atomic_load_32(&pData->spawnErr);
1,879✔
341
  if (err != 0) {
1,879✔
342
    uv_barrier_destroy(&pData->barrier);
8✔
343
    if (uv_async_send(&pData->stopAsync) != 0) {
8!
344
      fnError("start udfd: failed to send stop async");
×
345
    }
346
    if (uv_thread_join(&pData->thread) != 0) {
8!
347
      fnError("start udfd: failed to join udfd thread");
×
348
    }
349
    pData->needCleanUp = false;
8✔
350
    fnInfo("udfd is cleaned up after spawn err");
8!
351
    TAOS_CHECK_GOTO(err, &lino, _exit);
8!
352
  } else {
353
    pData->needCleanUp = true;
1,871✔
354
  }
355
_exit:
1,879✔
356
  if (code != 0) {
1,879✔
357
    fnError("udfd start failed with code:%d, lino:%d", code, lino);
8!
358
  }
359
  return code;
1,879✔
360
}
361

362
void udfStopUdfd() {
1,879✔
363
  SUdfdData *pData = &udfdGlobal;
1,879✔
364
  fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
1,879!
365
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
1,879!
366
    return;
8✔
367
  }
368
  atomic_store_32(&pData->stopCalled, 1);
1,871✔
369
  pData->needCleanUp = false;
1,871✔
370
  uv_barrier_destroy(&pData->barrier);
1,871✔
371
  if (uv_async_send(&pData->stopAsync) != 0) {
1,871!
372
    fnError("stop udfd: failed to send stop async");
×
373
  }
374
  if (uv_thread_join(&pData->thread) != 0) {
1,871!
375
    fnError("stop udfd: failed to join udfd thread");
×
376
  }
377

378
#ifdef WINDOWS
379
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
380
#endif
381
  fnInfo("udfd is cleaned up");
1,871!
382
  return;
1,871✔
383
}
384

385
/*
386
int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
387
  SUdfdData *pData = &udfdGlobal;
388
  if (pData->spawnErr) {
389
    return pData->spawnErr;
390
  }
391
  uv_pid_t pid = uv_process_get_pid(&pData->process);
392
  if (pUdfdPid) {
393
    *pUdfdPid = (int32_t)pid;
394
  }
395
  return TSDB_CODE_SUCCESS;
396
}
397
*/
398

399
//==============================================================================================
400
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
401
 * The QUEUE is copied from queue.h under libuv
402
 * */
403

404
typedef void *QUEUE[2];
405

406
/* Private macros. */
407
#define QUEUE_NEXT(q)      (*(QUEUE **)&((*(q))[0]))
408
#define QUEUE_PREV(q)      (*(QUEUE **)&((*(q))[1]))
409
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
410
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
411

412
/* Public macros. */
413
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field)))
414

415
/* Important note: mutating the list while QUEUE_FOREACH is
416
 * iterating over its elements results in undefined behavior.
417
 */
418
#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
419

420
#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q))
421

422
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
423

424
#define QUEUE_INIT(q)    \
425
  do {                   \
426
    QUEUE_NEXT(q) = (q); \
427
    QUEUE_PREV(q) = (q); \
428
  } while (0)
429

430
#define QUEUE_ADD(h, n)                 \
431
  do {                                  \
432
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
433
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
434
    QUEUE_PREV(h) = QUEUE_PREV(n);      \
435
    QUEUE_PREV_NEXT(h) = (h);           \
436
  } while (0)
437

438
#define QUEUE_SPLIT(h, q, n)       \
439
  do {                             \
440
    QUEUE_PREV(n) = QUEUE_PREV(h); \
441
    QUEUE_PREV_NEXT(n) = (n);      \
442
    QUEUE_NEXT(n) = (q);           \
443
    QUEUE_PREV(h) = QUEUE_PREV(q); \
444
    QUEUE_PREV_NEXT(h) = (h);      \
445
    QUEUE_PREV(q) = (n);           \
446
  } while (0)
447

448
#define QUEUE_MOVE(h, n)        \
449
  do {                          \
450
    if (QUEUE_EMPTY(h))         \
451
      QUEUE_INIT(n);            \
452
    else {                      \
453
      QUEUE *q = QUEUE_HEAD(h); \
454
      QUEUE_SPLIT(h, q, n);     \
455
    }                           \
456
  } while (0)
457

458
#define QUEUE_INSERT_HEAD(h, q)    \
459
  do {                             \
460
    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
461
    QUEUE_PREV(q) = (h);           \
462
    QUEUE_NEXT_PREV(q) = (q);      \
463
    QUEUE_NEXT(h) = (q);           \
464
  } while (0)
465

466
#define QUEUE_INSERT_TAIL(h, q)    \
467
  do {                             \
468
    QUEUE_NEXT(q) = (h);           \
469
    QUEUE_PREV(q) = QUEUE_PREV(h); \
470
    QUEUE_PREV_NEXT(q) = (q);      \
471
    QUEUE_PREV(h) = (q);           \
472
  } while (0)
473

474
#define QUEUE_REMOVE(q)                 \
475
  do {                                  \
476
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
477
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
478
  } while (0)
479

480
enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
481

482
int64_t gUdfTaskSeqNum = 0;
483
typedef struct SUdfcFuncStub {
484
  char           udfName[TSDB_FUNC_NAME_LEN + 1];
485
  UdfcFuncHandle handle;
486
  int32_t        refCount;
487
  int64_t        createTime;
488
} SUdfcFuncStub;
489

490
typedef struct SUdfcProxy {
491
  char         udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
492
  uv_barrier_t initBarrier;
493

494
  uv_loop_t   uvLoop;
495
  uv_thread_t loopThread;
496
  uv_async_t  loopTaskAync;
497

498
  uv_async_t loopStopAsync;
499

500
  uv_mutex_t taskQueueMutex;
501
  int8_t     udfcState;
502
  QUEUE      taskQueue;
503
  QUEUE      uvProcTaskQueue;
504

505
  uv_mutex_t udfStubsMutex;
506
  SArray    *udfStubs;         // SUdfcFuncStub
507
  SArray    *expiredUdfStubs;  // SUdfcFuncStub
508

509
  uv_mutex_t udfcUvMutex;
510
  int8_t     initialized;
511
} SUdfcProxy;
512

513
SUdfcProxy gUdfcProxy = {0};
514

515
typedef struct SUdfcUvSession {
516
  SUdfcProxy *udfc;
517
  int64_t     severHandle;
518
  uv_pipe_t  *udfUvPipe;
519

520
  int8_t  outputType;
521
  int32_t bytes;
522
  int32_t bufSize;
523

524
  char udfName[TSDB_FUNC_NAME_LEN + 1];
525
} SUdfcUvSession;
526

527
typedef struct SClientUvTaskNode {
528
  SUdfcProxy *udfc;
529
  int8_t      type;
530
  int32_t     errCode;
531

532
  uv_pipe_t *pipe;
533

534
  int64_t  seqNum;
535
  uv_buf_t reqBuf;
536

537
  uv_sem_t taskSem;
538
  uv_buf_t rspBuf;
539

540
  QUEUE recvTaskQueue;
541
  QUEUE procTaskQueue;
542
  QUEUE connTaskQueue;
543
} SClientUvTaskNode;
544

545
typedef struct SClientUdfTask {
546
  int8_t type;
547

548
  SUdfcUvSession *session;
549

550
  union {
551
    struct {
552
      SUdfSetupRequest  req;
553
      SUdfSetupResponse rsp;
554
    } _setup;
555
    struct {
556
      SUdfCallRequest  req;
557
      SUdfCallResponse rsp;
558
    } _call;
559
    struct {
560
      SUdfTeardownRequest  req;
561
      SUdfTeardownResponse rsp;
562
    } _teardown;
563
  };
564

565
} SClientUdfTask;
566

567
typedef struct SClientConnBuf {
568
  char   *buf;
569
  int32_t len;
570
  int32_t cap;
571
  int32_t total;
572
} SClientConnBuf;
573

574
typedef struct SClientUvConn {
575
  uv_pipe_t      *pipe;
576
  QUEUE           taskQueue;
577
  SClientConnBuf  readBuf;
578
  SUdfcUvSession *session;
579
} SClientUvConn;
580

581
enum {
582
  UDFC_STATE_INITAL = 0,  // initial state
583
  UDFC_STATE_STARTNG,     // starting after udfcOpen
584
  UDFC_STATE_READY,       // started and begin to receive quests
585
  UDFC_STATE_STOPPING,    // stopping after udfcClose
586
};
587

588
void    getUdfdPipeName(char *pipeName, int32_t size);
589
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
590
void   *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
591
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
592
void   *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state);
593
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
594
void   *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call);
595
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
596
void   *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown);
597
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request);
598
void   *decodeUdfRequest(const void *buf, SUdfRequest *request);
599
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
600
void   *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp);
601
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
602
void   *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp);
603
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp);
604
void   *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse);
605
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp);
606
void   *decodeUdfResponse(const void *buf, SUdfResponse *rsp);
607
void    freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
608
void    freeUdfColumn(SUdfColumn *col);
609
void    freeUdfDataDataBlock(SUdfDataBlock *block);
610
void    freeUdfInterBuf(SUdfInterBuf *buf);
611
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
612
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
613
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
614
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
615

616
void getUdfdPipeName(char *pipeName, int32_t size) {
3,853✔
617
  char    dnodeId[8] = {0};
3,853✔
618
  size_t  dnodeIdSize = sizeof(dnodeId);
3,853✔
619
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
3,853✔
620
  if (err != 0) {
3,853!
621
    fnError("failed to get dnodeId from env since %s", uv_err_name(err));
×
622
    dnodeId[0] = '1';
×
623
  }
624
#ifdef _WIN32
625
  snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir)),
626
           dnodeId);
627
#else
628
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
3,853✔
629
#endif
630
  fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName);
3,853!
631
}
3,853✔
632

633
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
314✔
634
  int32_t len = 0;
314✔
635
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
314✔
636
  return len;
314✔
637
}
638

639
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request) {
157✔
640
  buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
157✔
641
  return (void *)buf;
157✔
642
}
643

644
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state) {
418✔
645
  int32_t len = 0;
418✔
646
  len += taosEncodeFixedI8(buf, state->numOfResult);
418✔
647
  len += taosEncodeFixedI32(buf, state->bufLen);
418✔
648
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
418✔
649
  return len;
418✔
650
}
651

652
void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state) {
209✔
653
  buf = taosDecodeFixedI8(buf, &state->numOfResult);
209✔
654
  buf = taosDecodeFixedI32(buf, &state->bufLen);
209!
655
  buf = taosDecodeBinary(buf, (void **)&state->buf, state->bufLen);
209!
656
  return (void *)buf;
209✔
657
}
658

659
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
53,361✔
660
  int32_t len = 0;
53,361✔
661
  len += taosEncodeFixedI64(buf, call->udfHandle);
53,361✔
662
  len += taosEncodeFixedI8(buf, call->callType);
53,361✔
663
  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
53,361✔
664
    len += tEncodeDataBlock(buf, &call->block);
53,107✔
665
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
254✔
666
    len += taosEncodeFixedI8(buf, call->initFirst);
156✔
667
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
176✔
668
    len += tEncodeDataBlock(buf, &call->block);
92✔
669
    len += encodeUdfInterBuf(buf, &call->interBuf);
92✔
670
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
84!
671
    len += encodeUdfInterBuf(buf, &call->interBuf);
×
672
    len += encodeUdfInterBuf(buf, &call->interBuf2);
×
673
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
84✔
674
    len += encodeUdfInterBuf(buf, &call->interBuf);
78✔
675
  }
676
  return len;
53,131✔
677
}
678

679
void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
26,731✔
680
  buf = taosDecodeFixedI64(buf, &call->udfHandle);
26,731!
681
  buf = taosDecodeFixedI8(buf, &call->callType);
26,731✔
682
  switch (call->callType) {
26,731!
683
    case TSDB_UDF_CALL_SCALA_PROC:
26,609✔
684
      buf = tDecodeDataBlock(buf, &call->block);
26,609✔
685
      break;
26,576✔
686
    case TSDB_UDF_CALL_AGG_INIT:
39✔
687
      buf = taosDecodeFixedI8(buf, &call->initFirst);
39✔
688
      break;
39✔
689
    case TSDB_UDF_CALL_AGG_PROC:
46✔
690
      buf = tDecodeDataBlock(buf, &call->block);
46✔
691
      buf = decodeUdfInterBuf(buf, &call->interBuf);
46✔
692
      break;
46✔
693
    case TSDB_UDF_CALL_AGG_MERGE:
×
694
      buf = decodeUdfInterBuf(buf, &call->interBuf);
×
695
      buf = decodeUdfInterBuf(buf, &call->interBuf2);
×
696
      break;
×
697
    case TSDB_UDF_CALL_AGG_FIN:
39✔
698
      buf = decodeUdfInterBuf(buf, &call->interBuf);
39✔
699
      break;
39✔
700
  }
701
  return (void *)buf;
26,635✔
702
}
703

704
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
266✔
705
  int32_t len = 0;
266✔
706
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
266✔
707
  return len;
266✔
708
}
709

710
void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown) {
133✔
711
  buf = taosDecodeFixedI64(buf, &teardown->udfHandle);
133!
712
  return (void *)buf;
133✔
713
}
714

715
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request) {
53,889✔
716
  int32_t len = 0;
53,889✔
717
  if (buf == NULL) {
53,889✔
718
    len += sizeof(request->msgLen);
26,962✔
719
  } else {
720
    *(int32_t *)(*buf) = request->msgLen;
26,927✔
721
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
26,927✔
722
  }
723
  len += taosEncodeFixedI64(buf, request->seqNum);
53,889✔
724
  len += taosEncodeFixedI8(buf, request->type);
53,889✔
725
  if (request->type == UDF_TASK_SETUP) {
53,889✔
726
    len += encodeUdfSetupRequest(buf, &request->setup);
314✔
727
  } else if (request->type == UDF_TASK_CALL) {
53,575✔
728
    len += encodeUdfCallRequest(buf, &request->call);
53,418✔
729
  } else if (request->type == UDF_TASK_TEARDOWN) {
157!
730
    len += encodeUdfTeardownRequest(buf, &request->teardown);
266✔
731
  }
732
  return len;
53,618✔
733
}
734

735
void *decodeUdfRequest(const void *buf, SUdfRequest *request) {
27,018✔
736
  request->msgLen = *(int32_t *)(buf);
27,018✔
737
  buf = POINTER_SHIFT(buf, sizeof(request->msgLen));
27,018✔
738

739
  buf = taosDecodeFixedI64(buf, &request->seqNum);
27,018!
740
  buf = taosDecodeFixedI8(buf, &request->type);
27,018✔
741

742
  if (request->type == UDF_TASK_SETUP) {
27,018✔
743
    buf = decodeUdfSetupRequest(buf, &request->setup);
157✔
744
  } else if (request->type == UDF_TASK_CALL) {
26,861✔
745
    buf = decodeUdfCallRequest(buf, &request->call);
26,735✔
746
  } else if (request->type == UDF_TASK_TEARDOWN) {
126!
747
    buf = decodeUdfTeardownRequest(buf, &request->teardown);
133✔
748
  }
749
  return (void *)buf;
26,954✔
750
}
751

752
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
314✔
753
  int32_t len = 0;
314✔
754
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
314✔
755
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
314✔
756
  len += taosEncodeFixedI32(buf, setupRsp->bytes);
314✔
757
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
314✔
758
  return len;
314✔
759
}
760

761
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
157✔
762
  buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
157!
763
  buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
157✔
764
  buf = taosDecodeFixedI32(buf, &setupRsp->bytes);
157!
765
  buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
157!
766
  return (void *)buf;
157✔
767
}
768

769
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
53,191✔
770
  int32_t len = 0;
53,191✔
771
  len += taosEncodeFixedI8(buf, callRsp->callType);
53,191✔
772
  switch (callRsp->callType) {
53,191!
773
    case TSDB_UDF_CALL_SCALA_PROC:
53,006✔
774
      len += tEncodeDataBlock(buf, &callRsp->resultData);
53,006✔
775
      break;
52,848✔
776
    case TSDB_UDF_CALL_AGG_INIT:
78✔
777
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
78✔
778
      break;
78✔
779
    case TSDB_UDF_CALL_AGG_PROC:
92✔
780
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
92✔
781
      break;
92✔
782
    case TSDB_UDF_CALL_AGG_MERGE:
×
783
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
×
784
      break;
×
785
    case TSDB_UDF_CALL_AGG_FIN:
78✔
786
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
78✔
787
      break;
78✔
788
  }
789
  return len;
53,033✔
790
}
791

792
void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
26,442✔
793
  buf = taosDecodeFixedI8(buf, &callRsp->callType);
26,442✔
794
  switch (callRsp->callType) {
26,442!
795
    case TSDB_UDF_CALL_SCALA_PROC:
26,399✔
796
      buf = tDecodeDataBlock(buf, &callRsp->resultData);
26,399✔
797
      break;
26,444✔
798
    case TSDB_UDF_CALL_AGG_INIT:
39✔
799
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
39✔
800
      break;
39✔
801
    case TSDB_UDF_CALL_AGG_PROC:
46✔
802
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
46✔
803
      break;
46✔
804
    case TSDB_UDF_CALL_AGG_MERGE:
×
805
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
×
806
      break;
×
807
    case TSDB_UDF_CALL_AGG_FIN:
39✔
808
      buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
39✔
809
      break;
39✔
810
  }
811
  return (void *)buf;
26,487✔
812
}
813

814
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp) { return 0; }
266✔
815

816
void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse) { return (void *)buf; }
133✔
817

818
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) {
53,826✔
819
  int32_t len = 0;
53,826✔
820
  len += sizeof(rsp->msgLen);
53,826✔
821
  if (buf != NULL) {
53,826✔
822
    *(int32_t *)(*buf) = rsp->msgLen;
26,995✔
823
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
26,995✔
824
  }
825

826
  len += sizeof(rsp->seqNum);
53,826✔
827
  if (buf != NULL) {
53,826✔
828
    *(int64_t *)(*buf) = rsp->seqNum;
26,985✔
829
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
26,985✔
830
  }
831

832
  len += taosEncodeFixedI64(buf, rsp->seqNum);
53,826✔
833
  len += taosEncodeFixedI8(buf, rsp->type);
53,826✔
834
  len += taosEncodeFixedI32(buf, rsp->code);
53,826✔
835

836
  switch (rsp->type) {
53,826!
837
    case UDF_TASK_SETUP:
314✔
838
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
314✔
839
      break;
314✔
840
    case UDF_TASK_CALL:
53,306✔
841
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
53,306✔
842
      break;
53,050✔
843
    case UDF_TASK_TEARDOWN:
266✔
844
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
266✔
845
      break;
266✔
846
    default:
×
847
      fnError("encode udf response, invalid udf response type %d", rsp->type);
×
848
      break;
×
849
  }
850
  return len;
53,630✔
851
}
852

853
void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
26,733✔
854
  rsp->msgLen = *(int32_t *)(buf);
26,733✔
855
  buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));
26,733✔
856
  rsp->seqNum = *(int64_t *)(buf);
26,733✔
857
  buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum));
26,733✔
858
  buf = taosDecodeFixedI64(buf, &rsp->seqNum);
26,733!
859
  buf = taosDecodeFixedI8(buf, &rsp->type);
26,733✔
860
  buf = taosDecodeFixedI32(buf, &rsp->code);
26,733!
861

862
  switch (rsp->type) {
26,733!
863
    case UDF_TASK_SETUP:
157✔
864
      buf = decodeUdfSetupResponse(buf, &rsp->setupRsp);
157✔
865
      break;
157✔
866
    case UDF_TASK_CALL:
26,687✔
867
      buf = decodeUdfCallResponse(buf, &rsp->callRsp);
26,687✔
868
      break;
26,370✔
869
    case UDF_TASK_TEARDOWN:
133✔
870
      buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp);
133✔
871
      break;
133✔
872
    default:
×
873
      rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR;
×
874
      fnError("decode udf response, invalid udf response type %d", rsp->type);
×
875
      break;
×
876
  }
877
  if (buf == NULL) {
26,660!
878
    rsp->code = terrno;
×
879
    fnError("decode udf response failed, code:0x%x", rsp->code);
×
880
  }
881
  return (void *)buf;
26,642✔
882
}
883

884
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
53,234✔
885
  TAOS_UDF_CHECK_PTR_RVOID(data, meta);
159,570!
886
  if (IS_VAR_DATA_TYPE(meta->type)) {
53,234!
887
    taosMemoryFree(data->varLenCol.varOffsets);
10!
888
    data->varLenCol.varOffsets = NULL;
2✔
889
    taosMemoryFree(data->varLenCol.payload);
2!
890
    data->varLenCol.payload = NULL;
2✔
891
  } else {
892
    taosMemoryFree(data->fixLenCol.nullBitmap);
53,224!
893
    data->fixLenCol.nullBitmap = NULL;
52,917✔
894
    taosMemoryFree(data->fixLenCol.data);
52,917!
895
    data->fixLenCol.data = NULL;
53,201✔
896
  }
897
}
898

899
void freeUdfColumn(SUdfColumn *col) {
53,233✔
900
  TAOS_UDF_CHECK_PTR_RVOID(col);
106,475!
901
  freeUdfColumnData(&col->colData, &col->colMeta);
53,233✔
902
}
903

904
void freeUdfDataDataBlock(SUdfDataBlock *block) {
26,286✔
905
  TAOS_UDF_CHECK_PTR_RVOID(block);
52,662!
906
  for (int32_t i = 0; i < block->numOfCols; ++i) {
52,955✔
907
    freeUdfColumn(block->udfCols[i]);
26,497✔
908
    taosMemoryFree(block->udfCols[i]);
26,632!
909
    block->udfCols[i] = NULL;
26,669✔
910
  }
911
  taosMemoryFree(block->udfCols);
26,458!
912
  block->udfCols = NULL;
26,618✔
913
}
914

915
void freeUdfInterBuf(SUdfInterBuf *buf) {
333✔
916
  TAOS_UDF_CHECK_PTR_RVOID(buf);
666!
917
  taosMemoryFree(buf->buf);
333!
918
  buf->buf = NULL;
333✔
919
}
920

921
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
26,450✔
922
  TAOS_UDF_CHECK_PTR_RCODE(block, udfBlock);
79,478!
923
  int32_t code = blockDataCheck(block);
26,450✔
924
  if (code != TSDB_CODE_SUCCESS) {
26,550!
925
    return code;
×
926
  }
927
  udfBlock->numOfRows = block->info.rows;
26,550✔
928
  udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
26,550✔
929
  udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
26,573!
930
  if ((udfBlock->udfCols) == NULL) {
26,641!
931
    return terrno;
×
932
  }
933
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
53,302✔
934
    udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
26,598!
935
    if (udfBlock->udfCols[i] == NULL) {
26,593!
936
      return terrno;
×
937
    }
938
    SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
26,593✔
939
    SUdfColumn      *udfCol = udfBlock->udfCols[i];
26,568✔
940
    udfCol->colMeta.type = col->info.type;
26,568✔
941
    udfCol->colMeta.bytes = col->info.bytes;
26,568✔
942
    udfCol->colMeta.scale = col->info.scale;
26,568✔
943
    udfCol->colMeta.precision = col->info.precision;
26,568✔
944
    udfCol->colData.numOfRows = udfBlock->numOfRows;
26,568✔
945
    udfCol->hasNull = col->hasNull;
26,568✔
946
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
26,568!
947
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
×
948
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
×
949
      if (udfCol->colData.varLenCol.varOffsets == NULL) {
2!
950
        return terrno;
×
951
      }
952
      memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
2✔
953
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
2✔
954
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
2!
955
      if (udfCol->colData.varLenCol.payload == NULL) {
2!
956
        return terrno;
×
957
      }
958
      if (col->reassigned) {
2!
959
        for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
×
960
          char   *pColData = col->pData + col->varmeta.offset[row];
×
961
          int32_t colSize = 0;
×
962
          if (col->info.type == TSDB_DATA_TYPE_JSON) {
×
963
            colSize = getJsonValueLen(pColData);
×
964
          } else {
965
            colSize = varDataTLen(pColData);
×
966
          }
967
          memcpy(udfCol->colData.varLenCol.payload, pColData, colSize);
×
968
          udfCol->colData.varLenCol.payload += colSize;
×
969
        }
970
      } else {
971
        memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
2✔
972
      }
973
    } else {
974
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
26,630✔
975
      int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
26,630✔
976
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
26,630!
977
      if (udfCol->colData.fixLenCol.nullBitmap == NULL) {
26,639!
978
        return terrno;
×
979
      }
980
      char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
26,639✔
981
      memcpy(bitmap, col->nullbitmap, bitmapLen);
26,639✔
982
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
26,639✔
983
      int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
26,651✔
984
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
26,651!
985
      if (NULL == udfCol->colData.fixLenCol.data) {
26,659!
986
        return terrno;
×
987
      }
988
      char *data = udfCol->colData.fixLenCol.data;
26,659✔
989
      memcpy(data, col->pData, dataLen);
26,659✔
990
    }
991
  }
992
  return TSDB_CODE_SUCCESS;
26,704✔
993
}
994

995
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
26,288✔
996
  TAOS_UDF_CHECK_PTR_RCODE(udfCol, block);
79,045!
997
  int32_t         code = 0, lino = 0;
26,288✔
998
  SUdfColumnMeta *meta = &udfCol->colMeta;
26,288✔
999

1000
  SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
26,288✔
1001
  code = blockDataAppendColInfo(block, &colInfoData);
26,315✔
1002
  TAOS_CHECK_GOTO(code, &lino, _exit);
26,542!
1003

1004
  code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
26,542✔
1005
  TAOS_CHECK_GOTO(code, &lino, _exit);
26,443!
1006

1007
  SColumnInfoData *col = NULL;
26,443✔
1008
  code = bdGetColumnInfoData(block, 0, &col);
26,443✔
1009
  TAOS_CHECK_GOTO(code, &lino, _exit);
26,573!
1010

1011
  for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) {
5,972,095✔
1012
    if (udfColDataIsNull(udfCol, i)) {
5,943,973✔
1013
      colDataSetNULL(col, i);
34!
1014
    } else {
1015
      char *data = udfColDataGetData(udfCol, i);
5,943,939✔
1016
      code = colDataSetVal(col, i, data, false);
5,943,939✔
1017
      TAOS_CHECK_GOTO(code, &lino, _exit);
5,945,488!
1018
    }
1019
  }
1020
  block->info.rows = udfCol->colData.numOfRows;
28,122✔
1021

1022
  code = blockDataCheck(block);
28,122✔
1023
  TAOS_CHECK_GOTO(code, &lino, _exit);
26,586!
1024
_exit:
26,586✔
1025
  if (code != 0) {
26,586!
1026
    fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino);
×
1027
  }
1028
  return TSDB_CODE_SUCCESS;
26,576✔
1029
}
1030

1031
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
26,612✔
1032
  TAOS_UDF_CHECK_PTR_RCODE(input, output);
79,835!
1033
  int32_t code = 0, lino = 0;
26,612✔
1034
  int32_t numOfRows = 0;
26,612✔
1035
  for (int32_t i = 0; i < numOfCols; ++i) {
53,245✔
1036
    numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows;
26,633✔
1037
  }
1038

1039
  // create the basic block info structure
1040
  for (int32_t i = 0; i < numOfCols; ++i) {
53,243✔
1041
    SColumnInfoData *pInfo = input[i].columnData;
26,633✔
1042
    SColumnInfoData  d = {0};
26,633✔
1043
    d.info = pInfo->info;
26,633✔
1044

1045
    TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &d), &lino, _exit);
26,633!
1046
  }
1047

1048
  TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit);
26,610!
1049

1050
  for (int32_t i = 0; i < numOfCols; ++i) {
53,237✔
1051
    SColumnInfoData *pDest = taosArrayGet(output->pDataBlock, i);
26,624✔
1052

1053
    SColumnInfoData *pColInfoData = input[i].columnData;
26,622✔
1054
    TAOS_CHECK_GOTO(colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info), &lino, _exit);
26,622!
1055

1056
    if (input[i].numOfRows < numOfRows) {
26,627✔
1057
      int32_t startRow = input[i].numOfRows;
1✔
1058
      int32_t expandRows = numOfRows - startRow;
1✔
1059
      bool    isNull = colDataIsNull_s(pColInfoData, (input + i)->numOfRows - 1);
1!
1060
      if (isNull) {
1!
1061
        colDataSetNNULL(pDest, startRow, expandRows);
×
1062
      } else {
1063
        char *src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
1!
1064
        for (int32_t j = 0; j < expandRows; ++j) {
7✔
1065
          TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow + j, src, false), &lino, _exit);
6!
1066
        }
1067
      }
1068
    }
1069
  }
1070

1071
  output->info.rows = numOfRows;
26,613✔
1072
_exit:
26,613✔
1073
  if (code != 0) {
26,613!
1074
    fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino);
×
1075
  }
1076
  return code;
26,602✔
1077
}
1078

1079
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
26,569✔
1080
  TAOS_UDF_CHECK_PTR_RCODE(input, output);
79,715!
1081
  if (taosArrayGetSize(input->pDataBlock) != 1) {
26,569!
1082
    fnError("scalar function only support one column");
×
1083
    return 0;
×
1084
  }
1085
  output->numOfRows = input->info.rows;
26,591✔
1086

1087
  output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
26,591✔
1088
  if (output->columnData == NULL) {
26,595!
1089
    return terrno;
×
1090
  }
1091
  memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
26,595✔
1092
  output->colAlloced = true;
26,564✔
1093

1094
  return 0;
26,564✔
1095
}
1096

1097
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
1098
// memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
1099
typedef struct SUdfAggRes {
1100
  int8_t  finalResNum;
1101
  int8_t  interResNum;
1102
  int32_t interResBufLen;
1103
  char   *finalResBuf;
1104
  char   *interResBuf;
1105
} SUdfAggRes;
1106

1107
void    onUdfcPipeClose(uv_handle_t *handle);
1108
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
1109
void    udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
1110
bool    isUdfcUvMsgComplete(SClientConnBuf *connBuf);
1111
void    udfcUvHandleRsp(SClientUvConn *conn);
1112
void    udfcUvHandleError(SClientUvConn *conn);
1113
void    onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
1114
void    onUdfcPipeWrite(uv_write_t *write, int32_t status);
1115
void    onUdfcPipeConnect(uv_connect_t *connect, int32_t status);
1116
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
1117
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
1118
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
1119
void    udfcAsyncTaskCb(uv_async_t *async);
1120
void    cleanUpUvTasks(SUdfcProxy *udfc);
1121
void    udfStopAsyncCb(uv_async_t *async);
1122
void    constructUdfService(void *argsThread);
1123
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
1124
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
1125
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2);
1126
int32_t doTeardownUdf(UdfcFuncHandle handle);
1127

1128
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
1129
                SSDataBlock *output, SUdfInterBuf *newState);
1130
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
1131
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
1132
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
1133
                          SUdfInterBuf *resultBuf);
1134
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
1135
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1136
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1137

1138
int32_t udfcOpen();
1139
int32_t udfcClose();
1140

1141
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
1142
void    releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
1143
int32_t cleanUpUdfs();
1144

1145
bool    udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
1146
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
1147
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
1148
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
1149

1150
void    cleanupNotExpiredUdfs();
1151
void    cleanupExpiredUdfs();
1152
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
77,343✔
1153
  SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
77,343✔
1154
  SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
77,343✔
1155
  return strcmp(stub1->udfName, stub2->udfName);
77,343✔
1156
}
1157

1158
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
26,581✔
1159
  TAOS_UDF_CHECK_PTR_RCODE(udfName, pHandle);
79,780!
1160
  int32_t code = 0, line = 0;
26,581✔
1161
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
26,581✔
1162
  SUdfcFuncStub key = {0};
26,736✔
1163
  tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
26,736✔
1164
  int32_t stubIndex = taosArraySearchIdx(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
26,736✔
1165
  if (stubIndex != -1) {
26,736✔
1166
    SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
26,597✔
1167
    UdfcFuncHandle handle = foundStub->handle;
26,597✔
1168
    int64_t        currUs = taosGetTimestampUs();
26,597✔
1169
    bool           expired = (currUs - foundStub->createTime) >= 10 * 1000 * 1000;
26,597✔
1170
    if (!expired) {
26,597✔
1171
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
26,579!
1172
        *pHandle = foundStub->handle;
26,579✔
1173
        ++foundStub->refCount;
26,579✔
1174
        uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
26,579✔
1175
        return 0;
26,578✔
1176
      } else {
1177
        fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
×
1178
               foundStub->refCount, foundStub->createTime);
1179
        taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
×
1180
      }
1181
    } else {
1182
      fnDebug("udf handle expired for %s, will setup udf. move it to expired list", udfName);
18!
1183
      if (taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub) == NULL) {
36!
1184
        fnError("acquireUdfFuncHandle: failed to push udf stub to array");
×
1185
      } else {
1186
        taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
18✔
1187
        taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
18✔
1188
      }
1189
    }
1190
  }
1191
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
157✔
1192
  *pHandle = NULL;
157✔
1193
  code = doSetupUdf(udfName, pHandle);
157✔
1194
  if (code == TSDB_CODE_SUCCESS) {
157!
1195
    SUdfcFuncStub stub = {0};
157✔
1196
    tstrncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN);
157✔
1197
    stub.handle = *pHandle;
157✔
1198
    ++stub.refCount;
157✔
1199
    stub.createTime = taosGetTimestampUs();
157✔
1200
    uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
157✔
1201
    if (taosArrayPush(gUdfcProxy.udfStubs, &stub) == NULL) {
314!
1202
      fnError("acquireUdfFuncHandle: failed to push udf stub to array");
×
1203
      uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
×
1204
      goto _exit;
×
1205
    } else {
1206
      taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
157✔
1207
    }
1208
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
157✔
1209
  } else {
1210
    *pHandle = NULL;
×
1211
  }
1212

1213
_exit:
157✔
1214
  return code;
157✔
1215
}
1216

1217
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
26,655✔
1218
  TAOS_UDF_CHECK_PTR_RVOID(udfName);
53,314!
1219
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
26,655✔
1220
  SUdfcFuncStub key = {0};
26,736✔
1221
  tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
26,736✔
1222
  SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
26,736✔
1223
  SUdfcFuncStub *expiredStub = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ);
26,736✔
1224
  if (!foundStub && !expiredStub) {
26,736!
1225
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
×
1226
    return;
×
1227
  }
1228
  if (foundStub != NULL && foundStub->handle == handle && foundStub->refCount > 0) {
26,736!
1229
    --foundStub->refCount;
26,711✔
1230
  }
1231
  if (expiredStub != NULL && expiredStub->handle == handle && expiredStub->refCount > 0) {
26,736!
1232
    --expiredStub->refCount;
×
1233
  }
1234
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
26,736✔
1235
}
1236

1237
void cleanupExpiredUdfs() {
8,108✔
1238
  int32_t i = 0;
8,108✔
1239
  SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
8,108✔
1240
  if (expiredUdfStubs == NULL) {
8,108!
1241
    fnError("cleanupExpiredUdfs: failed to init array");
×
1242
    return;
×
1243
  }
1244
  while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
50,679✔
1245
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
42,571✔
1246
    if (stub->refCount == 0) {
42,571!
1247
      fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle,
×
1248
             stub->refCount);
1249
      (void)doTeardownUdf(stub->handle);
×
1250
    } else {
1251
      fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p",
42,571!
1252
             stub->udfName, stub->refCount, stub->createTime, stub->handle);
1253
      UdfcFuncHandle handle = stub->handle;
42,571✔
1254
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
42,571!
1255
        if (taosArrayPush(expiredUdfStubs, stub) == NULL) {
42,571!
1256
          fnError("cleanupExpiredUdfs: failed to push udf stub to array");
×
1257
        }
1258
      } else {
1259
        fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
×
1260
               stub->udfName, stub->refCount, stub->createTime);
1261
      }
1262
    }
1263
    ++i;
42,571✔
1264
  }
1265
  taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
8,108✔
1266
  gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
8,108✔
1267
}
1268

1269
void cleanupNotExpiredUdfs() {
8,108✔
1270
  SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
8,108✔
1271
  if (udfStubs == NULL) {
8,108!
1272
    fnError("cleanupNotExpiredUdfs: failed to init array");
×
1273
    return;
×
1274
  }
1275
  int32_t i = 0;
8,108✔
1276
  while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
32,073✔
1277
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
23,965✔
1278
    if (stub->refCount == 0) {
23,965✔
1279
      fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
133!
1280
      (void)doTeardownUdf(stub->handle);
133✔
1281
    } else {
1282
      fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
23,832!
1283
             stub->refCount, stub->createTime, stub->handle);
1284
      UdfcFuncHandle handle = stub->handle;
23,832✔
1285
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
23,832!
1286
        if (taosArrayPush(udfStubs, stub) == NULL) {
23,832!
1287
          fnError("cleanupNotExpiredUdfs: failed to push udf stub to array");
×
1288
        }
1289
      } else {
1290
        fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName,
×
1291
               stub->refCount, stub->createTime);
1292
      }
1293
    }
1294
    ++i;
23,965✔
1295
  }
1296
  taosArrayDestroy(gUdfcProxy.udfStubs);
8,108✔
1297
  gUdfcProxy.udfStubs = udfStubs;
8,108✔
1298
}
1299

1300
int32_t cleanUpUdfs() {
2,472,995✔
1301
  int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
2,472,995✔
1302
  if (!initialized) {
2,473,228✔
1303
    return TSDB_CODE_SUCCESS;
73,411✔
1304
  }
1305

1306
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
2,399,817✔
1307
  if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
2,400,597!
1308
      (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
2,392,489!
1309
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
2,392,489✔
1310
    return TSDB_CODE_SUCCESS;
2,392,481✔
1311
  }
1312

1313
  cleanupNotExpiredUdfs();
8,108✔
1314
  cleanupExpiredUdfs();
8,108✔
1315

1316
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
8,108✔
1317
  return 0;
8,108✔
1318
}
1319

1320
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
26,452✔
1321
  TAOS_UDF_CHECK_PTR_RCODE(udfName, input, output);
105,924!
1322
  UdfcFuncHandle handle = NULL;
26,452✔
1323
  int32_t        code = acquireUdfFuncHandle(udfName, &handle);
26,452✔
1324
  if (code != 0) {
26,611!
1325
    return code;
×
1326
  }
1327

1328
  SUdfcUvSession *session = handle;
26,611✔
1329
  code = doCallUdfScalarFunc(handle, input, numOfCols, output);
26,611✔
1330
  if (code != TSDB_CODE_SUCCESS) {
26,544!
1331
    fnError("udfc scalar function execution failure");
×
1332
    releaseUdfFuncHandle(udfName, handle);
×
1333
    return code;
×
1334
  }
1335

1336
  if (output->columnData == NULL) {
26,557!
1337
    fnError("udfc scalar function calculate error. no column data");
×
1338
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
×
1339
  } else {
1340
    if (session->outputType != output->columnData->info.type || session->bytes != output->columnData->info.bytes) {
26,557!
1341
      fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
×
1342
              session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes);
1343
      code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
×
1344
    }
1345
  }
1346
  releaseUdfFuncHandle(udfName, handle);
26,561✔
1347
  return code;
26,612✔
1348
}
1349

1350
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
32✔
1351
  if (pFunc == NULL || pEnv == NULL) {
32!
1352
    fnError("udfAggGetEnv: invalid input lint: %d", __LINE__);
×
1353
    return false;
×
1354
  }
1355
  if (fmIsScalarFunc(pFunc->funcId)) {
32!
1356
    return false;
×
1357
  }
1358
  pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
32✔
1359
  return true;
32✔
1360
}
1361

1362
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
39✔
1363
  TAOS_UDF_CHECK_PTR_RCODE(pCtx, pResultCellInfo);
117!
1364
  if (pResultCellInfo->initialized) {
39!
1365
    return TSDB_CODE_SUCCESS;
×
1366
  }
1367
  if (functionSetup(pCtx, pResultCellInfo) != TSDB_CODE_SUCCESS) {
39!
1368
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1369
  }
1370
  UdfcFuncHandle handle;
1371
  int32_t        udfCode = 0;
39✔
1372
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
39!
1373
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
×
1374
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1375
  }
1376
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
39✔
1377
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
39✔
1378
  int32_t         envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize;
39✔
1379
  memset(udfRes, 0, envSize);
39✔
1380

1381
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
39✔
1382
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
39✔
1383

1384
  SUdfInterBuf buf = {0};
39✔
1385
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
39!
1386
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
×
1387
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
1388
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1389
  }
1390
  if (buf.bufLen <= session->bufSize) {
39!
1391
    memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
39✔
1392
    udfRes->interResBufLen = buf.bufLen;
39✔
1393
    udfRes->interResNum = buf.numOfResult;
39✔
1394
  } else {
1395
    fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
×
1396
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
1397
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1398
  }
1399
  releaseUdfFuncHandle(pCtx->udfName, handle);
39✔
1400
  freeUdfInterBuf(&buf);
39✔
1401
  return TSDB_CODE_SUCCESS;
39✔
1402
}
1403

1404
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
46✔
1405
  TAOS_UDF_CHECK_PTR_RCODE(pCtx);
92!
1406
  int32_t        udfCode = 0;
46✔
1407
  UdfcFuncHandle handle = 0;
46✔
1408
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
46!
1409
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
×
1410
    return udfCode;
×
1411
  }
1412

1413
  SUdfcUvSession *session = handle;
46✔
1414
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
46✔
1415
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
46✔
1416
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
46✔
1417

1418
  SInputColumnInfoData *pInput = &pCtx->input;
46✔
1419
  int32_t               numOfCols = pInput->numOfInputCols;
46✔
1420
  int32_t               start = pInput->startRowIndex;
46✔
1421
  int32_t               numOfRows = pInput->numOfRows;
46✔
1422
  SSDataBlock          *pTempBlock = NULL;
46✔
1423
  int32_t               code = createDataBlock(&pTempBlock);
46✔
1424

1425
  if (code) {
46!
1426
    return code;
×
1427
  }
1428

1429
  pTempBlock->info.rows = pInput->totalRows;
46✔
1430
  pTempBlock->info.id.uid = pInput->uid;
46✔
1431
  for (int32_t i = 0; i < numOfCols; ++i) {
100✔
1432
    if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) {
54!
1433
      fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode);
×
1434
      blockDataDestroy(pTempBlock);
×
1435
      return udfCode;
×
1436
    }
1437
  }
1438

1439
  SSDataBlock *inputBlock = NULL;
46✔
1440
  code = blockDataExtractBlock(pTempBlock, start, numOfRows, &inputBlock);
46✔
1441
  if (code) {
46!
1442
    return code;
×
1443
  }
1444

1445
  SUdfInterBuf state = {
46✔
1446
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
46✔
1447
  SUdfInterBuf newState = {0};
46✔
1448

1449
  udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
46✔
1450
  if (udfCode != 0) {
46!
1451
    fnError("udfAggProcess error. code: %d", udfCode);
×
1452
    newState.numOfResult = 0;
×
1453
  } else {
1454
    if (newState.bufLen <= session->bufSize) {
46!
1455
      memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
46✔
1456
      udfRes->interResBufLen = newState.bufLen;
46✔
1457
      udfRes->interResNum = newState.numOfResult;
46✔
1458
    } else {
1459
      fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
×
1460
      udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
×
1461
    }
1462
  }
1463

1464
  GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum;
46✔
1465

1466
  blockDataDestroy(inputBlock);
46✔
1467

1468
  taosArrayDestroy(pTempBlock->pDataBlock);
46✔
1469
  taosMemoryFree(pTempBlock);
46!
1470

1471
  releaseUdfFuncHandle(pCtx->udfName, handle);
46✔
1472
  freeUdfInterBuf(&newState);
46✔
1473
  return udfCode;
46✔
1474
}
1475

1476
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
39✔
1477
  TAOS_UDF_CHECK_PTR_RCODE(pCtx, pBlock);
117!
1478
  int32_t        udfCode = 0;
39✔
1479
  UdfcFuncHandle handle = 0;
39✔
1480
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
39!
1481
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
×
1482
    return udfCode;
×
1483
  }
1484

1485
  SUdfcUvSession *session = handle;
39✔
1486
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
39✔
1487
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
39✔
1488
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
39✔
1489

1490
  SUdfInterBuf resultBuf = {0};
39✔
1491
  SUdfInterBuf state = {
39✔
1492
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
39✔
1493
  int32_t udfCallCode = 0;
39✔
1494
  udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
39✔
1495
  if (udfCallCode != 0) {
39!
1496
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
×
1497
    GET_RES_INFO(pCtx)->numOfRes = 0;
×
1498
  } else {
1499
    if (resultBuf.numOfResult == 0) {
39!
1500
      udfRes->finalResNum = 0;
×
1501
      GET_RES_INFO(pCtx)->numOfRes = 0;
×
1502
    } else {
1503
      if (resultBuf.bufLen <= session->bytes) {
39!
1504
        memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
39✔
1505
        udfRes->finalResNum = resultBuf.numOfResult;
39✔
1506
        GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
39✔
1507
      } else {
1508
        fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
×
1509
        GET_RES_INFO(pCtx)->numOfRes = 0;
×
1510
        udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
×
1511
      }
1512
    }
1513
  }
1514

1515
  freeUdfInterBuf(&resultBuf);
39✔
1516

1517
  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
39✔
1518
  releaseUdfFuncHandle(pCtx->udfName, handle);
39✔
1519
  return udfCallCode == 0 ? numOfResults : udfCallCode;
39!
1520
}
1521

1522
void onUdfcPipeClose(uv_handle_t *handle) {
133✔
1523
  SClientUvConn *conn = handle->data;
133✔
1524
  if (!QUEUE_EMPTY(&conn->taskQueue)) {
133!
1525
    QUEUE             *h = QUEUE_HEAD(&conn->taskQueue);
133✔
1526
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
133✔
1527
    task->errCode = 0;
133✔
1528
    QUEUE_REMOVE(&task->procTaskQueue);
133✔
1529
    uv_sem_post(&task->taskSem);
133✔
1530
  }
1531
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
133✔
1532
  if (conn->session != NULL) {
133!
1533
    conn->session->udfUvPipe = NULL;
133✔
1534
  }
1535
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
133✔
1536
  taosMemoryFree(conn->readBuf.buf);
133!
1537
  taosMemoryFree(conn);
133!
1538
  taosMemoryFree((uv_pipe_t *)handle);
133!
1539
}
133✔
1540

1541
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
26,893✔
1542
  int32_t code = 0;
26,893✔
1543
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
26,893✔
1544
  if (uvTask->type == UV_TASK_REQ_RSP) {
27,030✔
1545
    if (uvTask->rspBuf.base != NULL) {
26,740!
1546
      SUdfResponse rsp = {0};
26,958✔
1547
      void        *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
26,958✔
1548
      code = rsp.code;
26,698✔
1549
      if (code != 0) {
26,698!
1550
        fnError("udfc get udf task result failure. code: %d", code);
×
1551
      }
1552

1553
      switch (task->type) {
26,889!
1554
        case UDF_TASK_SETUP: {
157✔
1555
          task->_setup.rsp = rsp.setupRsp;
157✔
1556
          break;
157✔
1557
        }
1558
        case UDF_TASK_CALL: {
26,599✔
1559
          task->_call.rsp = rsp.callRsp;
26,599✔
1560
          break;
26,599✔
1561
        }
1562
        case UDF_TASK_TEARDOWN: {
133✔
1563
          task->_teardown.rsp = rsp.teardownRsp;
1564
          break;
133✔
1565
        }
1566
        default: {
×
1567
          break;
×
1568
        }
1569
      }
1570

1571
      // TODO: the call buffer is setup and freed by udf invocation
1572
      taosMemoryFreeClear(uvTask->rspBuf.base);
26,889!
1573
    } else {
1574
      code = uvTask->errCode;
×
1575
      if (code != 0) {
×
1576
        fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
×
1577
      }
1578
    }
1579
  } else if (uvTask->type == UV_TASK_CONNECT) {
290✔
1580
    code = uvTask->errCode;
157✔
1581
    if (code != 0) {
157!
1582
      fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
×
1583
    }
1584
  } else if (uvTask->type == UV_TASK_DISCONNECT) {
133!
1585
    code = uvTask->errCode;
133✔
1586
    if (code != 0) {
133!
1587
      fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
×
1588
    }
1589
  }
1590
  return code;
27,274✔
1591
}
1592

1593
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
74,217✔
1594
  SClientUvConn  *conn = handle->data;
74,217✔
1595
  SClientConnBuf *connBuf = &conn->readBuf;
74,217✔
1596

1597
  int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
74,217✔
1598
  if (connBuf->cap == 0) {
74,217✔
1599
    connBuf->buf = taosMemoryMalloc(msgHeadSize);
27,183!
1600
    if (connBuf->buf) {
27,183!
1601
      connBuf->len = 0;
27,183✔
1602
      connBuf->cap = msgHeadSize;
27,183✔
1603
      connBuf->total = -1;
27,183✔
1604

1605
      buf->base = connBuf->buf;
27,183✔
1606
      buf->len = connBuf->cap;
27,183✔
1607
    } else {
1608
      fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
×
1609
      buf->base = NULL;
×
1610
      buf->len = 0;
×
1611
    }
1612
  } else if (connBuf->total == -1 && connBuf->len < msgHeadSize) {
47,034!
1613
    buf->base = connBuf->buf + connBuf->len;
20,008✔
1614
    buf->len = msgHeadSize - connBuf->len;
20,008✔
1615
  } else {
1616
    connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
27,026✔
1617
    void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap);
27,026!
1618
    if (resultBuf) {
27,026!
1619
      connBuf->buf = resultBuf;
27,026✔
1620
      buf->base = connBuf->buf + connBuf->len;
27,026✔
1621
      buf->len = connBuf->cap - connBuf->len;
27,026✔
1622
    } else {
1623
      fnError("udfc re-allocate buffer failure. size: %d", connBuf->cap);
×
1624
      buf->base = NULL;
×
1625
      buf->len = 0;
×
1626
    }
1627
  }
1628

1629
  fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
74,217✔
1630
}
74,217✔
1631

1632
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
54,052✔
1633
  if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) {
54,052!
1634
    connBuf->total = *(int32_t *)(connBuf->buf);
27,026✔
1635
  }
1636
  if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
54,052!
1637
    fnDebug("udfc complete message is received, now handle it");
27,026✔
1638
    return true;
27,026✔
1639
  }
1640
  return false;
27,026✔
1641
}
1642

1643
void udfcUvHandleRsp(SClientUvConn *conn) {
27,026✔
1644
  SClientConnBuf *connBuf = &conn->readBuf;
27,026✔
1645
  int64_t         seqNum = *(int64_t *)(connBuf->buf + sizeof(int32_t));  // msglen then seqnum
27,026✔
1646

1647
  if (QUEUE_EMPTY(&conn->taskQueue)) {
27,026!
1648
    fnError("udfc no task waiting on connection. response seqnum:%" PRId64, seqNum);
×
1649
    return;
×
1650
  }
1651
  bool               found = false;
27,026✔
1652
  SClientUvTaskNode *taskFound = NULL;
27,026✔
1653
  QUEUE             *h = QUEUE_NEXT(&conn->taskQueue);
27,026✔
1654
  SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
27,026✔
1655

1656
  while (h != &conn->taskQueue) {
93,566✔
1657
    fnDebug("udfc handle response iterate through queue. uvTask:%" PRId64 "-%p", task->seqNum, task);
66,540✔
1658
    if (task->seqNum == seqNum) {
66,540✔
1659
      if (found == false) {
27,026!
1660
        found = true;
27,026✔
1661
        taskFound = task;
27,026✔
1662
      } else {
1663
        fnError("udfc more than one task waiting for the same response");
×
1664
        continue;
×
1665
      }
1666
    }
1667
    h = QUEUE_NEXT(h);
66,540✔
1668
    task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
66,540✔
1669
  }
1670

1671
  if (taskFound) {
27,026!
1672
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
27,026✔
1673
    QUEUE_REMOVE(&taskFound->connTaskQueue);
27,026✔
1674
    QUEUE_REMOVE(&taskFound->procTaskQueue);
27,026✔
1675
    uv_sem_post(&taskFound->taskSem);
27,026✔
1676
  } else {
1677
    fnError("no task is waiting for the response.");
×
1678
  }
1679
  connBuf->buf = NULL;
27,026✔
1680
  connBuf->total = -1;
27,026✔
1681
  connBuf->len = 0;
27,026✔
1682
  connBuf->cap = 0;
27,026✔
1683
}
1684

1685
void udfcUvHandleError(SClientUvConn *conn) {
×
1686
  fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);
×
1687
  while (!QUEUE_EMPTY(&conn->taskQueue)) {
×
1688
    QUEUE             *h = QUEUE_HEAD(&conn->taskQueue);
×
1689
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
×
1690
    task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
×
1691
    QUEUE_REMOVE(&task->connTaskQueue);
×
1692
    QUEUE_REMOVE(&task->procTaskQueue);
×
1693
    uv_sem_post(&task->taskSem);
×
1694
  }
1695
  if (!uv_is_closing((uv_handle_t *)conn->pipe)) {
×
1696
    uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose);
×
1697
  }
1698
}
×
1699

1700
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
74,217✔
1701
  fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
74,217✔
1702
  if (nread == 0) return;
74,217✔
1703

1704
  SClientUvConn  *conn = client->data;
54,052✔
1705
  SClientConnBuf *connBuf = &conn->readBuf;
54,052✔
1706
  if (nread > 0) {
54,052!
1707
    connBuf->len += nread;
54,052✔
1708
    if (isUdfcUvMsgComplete(connBuf)) {
54,052✔
1709
      udfcUvHandleRsp(conn);
27,026✔
1710
    }
1711
  }
1712
  if (nread < 0) {
54,052!
1713
    fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
×
1714
    if (nread == UV_EOF) {
×
1715
      fnError("\tudfc client pipe %p closed", client);
×
1716
    }
1717
    udfcUvHandleError(conn);
×
1718
  }
1719
}
1720

1721
void onUdfcPipeWrite(uv_write_t *write, int32_t status) {
27,026✔
1722
  SClientUvConn *conn = write->data;
27,026✔
1723
  if (status < 0) {
27,026!
1724
    fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
×
1725
    udfcUvHandleError(conn);
×
1726
  } else {
1727
    fnDebug("udfc client connection %p write succeed", conn);
27,026✔
1728
  }
1729
  taosMemoryFree(write);
27,026!
1730
}
27,026✔
1731

1732
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) {
157✔
1733
  SClientUvTaskNode *uvTask = connect->data;
157✔
1734
  if (status != 0) {
157!
1735
    fnError("client connect error, task seq: %" PRId64 ", code: %s", uvTask->seqNum, uv_strerror(status));
×
1736
  }
1737
  uvTask->errCode = status;
157✔
1738

1739
  int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
157✔
1740
  if (code != 0) {
157!
1741
    fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code));
×
1742
    uvTask->errCode = code;
×
1743
  }
1744
  taosMemoryFree(connect);
157!
1745
  QUEUE_REMOVE(&uvTask->procTaskQueue);
157✔
1746
  uv_sem_post(&uvTask->taskSem);
157✔
1747
}
157✔
1748

1749
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) {
27,277✔
1750
  uvTask->type = uvTaskType;
27,277✔
1751
  uvTask->udfc = task->session->udfc;
27,277✔
1752

1753
  if (uvTaskType == UV_TASK_CONNECT) {
27,277✔
1754
  } else if (uvTaskType == UV_TASK_REQ_RSP) {
27,125✔
1755
    uvTask->pipe = task->session->udfUvPipe;
27,011✔
1756
    SUdfRequest request;
1757
    request.type = task->type;
27,011✔
1758
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);
27,011✔
1759

1760
    if (task->type == UDF_TASK_SETUP) {
27,021✔
1761
      request.setup = task->_setup.req;
157✔
1762
      request.type = UDF_TASK_SETUP;
157✔
1763
    } else if (task->type == UDF_TASK_CALL) {
26,864✔
1764
      request.call = task->_call.req;
26,732✔
1765
      request.type = UDF_TASK_CALL;
26,732✔
1766
    } else if (task->type == UDF_TASK_TEARDOWN) {
132!
1767
      request.teardown = task->_teardown.req;
133✔
1768
      request.type = UDF_TASK_TEARDOWN;
133✔
1769
    } else {
UNCOV
1770
      fnError("udfc create uv task, invalid task type : %d", task->type);
×
1771
    }
1772
    int32_t bufLen = encodeUdfRequest(NULL, &request);
27,021✔
1773
    if (bufLen <= 0) {
26,869!
1774
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
×
1775
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1776
    }
1777
    request.msgLen = bufLen;
26,869✔
1778
    void *bufBegin = taosMemoryMalloc(bufLen);
26,869!
1779
    if (bufBegin == NULL) {
26,957!
1780
      fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen);
×
1781
      return terrno;
×
1782
    }
1783
    void *buf = bufBegin;
26,957✔
1784
    if (encodeUdfRequest(&buf, &request) <= 0) {
26,957!
1785
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
×
1786
      taosMemoryFree(bufBegin);
×
1787
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1788
    }
1789

1790
    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
26,908✔
1791
    uvTask->seqNum = request.seqNum;
26,837✔
1792
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
114!
1793
    uvTask->pipe = task->session->udfUvPipe;
133✔
1794
  }
1795
  if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
27,103✔
1796
    if (uvTaskType == UV_TASK_REQ_RSP) {
37!
1797
      taosMemoryFreeClear(uvTask->reqBuf.base);
×
1798
    }
1799
    fnError("udfc create uv task, init semaphore failed.");
37!
1800
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1801
  }
1802

1803
  return 0;
27,118✔
1804
}
1805

1806
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
27,129✔
1807
  fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
27,129✔
1808
  SUdfcProxy *udfc = uvTask->udfc;
27,129✔
1809
  uv_mutex_lock(&udfc->taskQueueMutex);
27,129✔
1810
  QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
27,316✔
1811
  uv_mutex_unlock(&udfc->taskQueueMutex);
27,316✔
1812
  int32_t code = uv_async_send(&udfc->loopTaskAync);
27,316✔
1813
  if (code != 0) {
27,297!
1814
    fnError("udfc queue uv task to event loop failed. code: %s", uv_strerror(code));
×
1815
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1816
  }
1817

1818
  uv_sem_wait(&uvTask->taskSem);
27,297✔
1819
  fnDebug("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
26,924✔
1820
  uv_sem_destroy(&uvTask->taskSem);
26,924✔
1821

1822
  return 0;
26,901✔
1823
}
1824

1825
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
27,316✔
1826
  fnDebug("event loop start uv task. uvTask: %" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
27,316✔
1827
  int32_t code = 0;
27,316✔
1828

1829
  switch (uvTask->type) {
27,316!
1830
    case UV_TASK_CONNECT: {
157✔
1831
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
157!
1832
      if (pipe == NULL) {
157!
1833
        fnError("udfc event loop start connect task malloc pipe failed.");
×
1834
        return terrno;
×
1835
      }
1836
      if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) {
157!
1837
        fnError("udfc event loop start connect task uv_pipe_init failed.");
×
1838
        taosMemoryFree(pipe);
×
1839
        return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
1840
      }
1841
      uvTask->pipe = pipe;
157✔
1842

1843
      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
157!
1844
      if (conn == NULL) {
157!
1845
        fnError("udfc event loop start connect task malloc conn failed.");
×
1846
        taosMemoryFree(pipe);
×
1847
        return terrno;
×
1848
      }
1849
      conn->pipe = pipe;
157✔
1850
      conn->readBuf.len = 0;
157✔
1851
      conn->readBuf.cap = 0;
157✔
1852
      conn->readBuf.buf = 0;
157✔
1853
      conn->readBuf.total = -1;
157✔
1854
      QUEUE_INIT(&conn->taskQueue);
157✔
1855

1856
      pipe->data = conn;
157✔
1857

1858
      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
157!
1859
      if (connReq == NULL) {
157!
1860
        fnError("udfc event loop start connect task malloc connReq failed.");
×
1861
        taosMemoryFree(pipe);
×
1862
        taosMemoryFree(conn);
×
1863
        return terrno;
×
1864
      }
1865
      connReq->data = uvTask;
157✔
1866
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
157✔
1867
      code = 0;
157✔
1868
      break;
157✔
1869
    }
1870
    case UV_TASK_REQ_RSP: {
27,026✔
1871
      uv_pipe_t *pipe = uvTask->pipe;
27,026✔
1872
      if (pipe == NULL) {
27,026!
1873
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
1874
      } else {
1875
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
27,026!
1876
        if (write == NULL) {
27,026!
1877
          fnError("udfc event loop start req_rsp task malloc write failed.");
×
1878
          return terrno;
×
1879
        }
1880
        write->data = pipe->data;
27,026✔
1881
        QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
27,026✔
1882
        QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
27,026✔
1883
        int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
27,026✔
1884
        if (err != 0) {
27,026!
1885
          taosMemoryFree(write);
×
1886
          fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
×
1887
        }
1888
        code = err;
27,026✔
1889
      }
1890
      break;
27,026✔
1891
    }
1892
    case UV_TASK_DISCONNECT: {
133✔
1893
      uv_pipe_t *pipe = uvTask->pipe;
133✔
1894
      if (pipe == NULL) {
133!
1895
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
1896
      } else {
1897
        SClientUvConn *conn = pipe->data;
133✔
1898
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
133✔
1899
        if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) {
133!
1900
          uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
133✔
1901
        }
1902
        code = 0;
133✔
1903
      }
1904
      break;
133✔
1905
    }
1906
    default: {
×
1907
      fnError("udfc event loop unknown task type.") break;
×
1908
    }
1909
  }
1910

1911
  return code;
27,316✔
1912
}
1913

1914
void udfcAsyncTaskCb(uv_async_t *async) {
24,046✔
1915
  SUdfcProxy *udfc = async->data;
24,046✔
1916
  QUEUE       wq;
1917

1918
  uv_mutex_lock(&udfc->taskQueueMutex);
24,046✔
1919
  QUEUE_MOVE(&udfc->taskQueue, &wq);
24,046✔
1920
  uv_mutex_unlock(&udfc->taskQueueMutex);
24,046✔
1921

1922
  while (!QUEUE_EMPTY(&wq)) {
51,362✔
1923
    QUEUE *h = QUEUE_HEAD(&wq);
27,316✔
1924
    QUEUE_REMOVE(h);
27,316✔
1925
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
27,316✔
1926
    int32_t            code = udfcStartUvTask(task);
27,316✔
1927
    if (code == 0) {
27,316!
1928
      QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
27,316✔
1929
    } else {
1930
      task->errCode = code;
×
1931
      uv_sem_post(&task->taskSem);
×
1932
    }
1933
  }
1934
}
24,046✔
1935

1936
void cleanUpUvTasks(SUdfcProxy *udfc) {
1,845✔
1937
  fnDebug("clean up uv tasks") QUEUE wq;
1,845✔
1938

1939
  uv_mutex_lock(&udfc->taskQueueMutex);
1,845✔
1940
  QUEUE_MOVE(&udfc->taskQueue, &wq);
1,845!
1941
  uv_mutex_unlock(&udfc->taskQueueMutex);
1,845✔
1942

1943
  while (!QUEUE_EMPTY(&wq)) {
1,845!
1944
    QUEUE *h = QUEUE_HEAD(&wq);
×
1945
    QUEUE_REMOVE(h);
×
1946
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
×
1947
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
×
1948
      task->errCode = TSDB_CODE_UDF_STOPPING;
×
1949
    }
1950
    uv_sem_post(&task->taskSem);
×
1951
  }
1952

1953
  while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
1,845!
1954
    QUEUE *h = QUEUE_HEAD(&udfc->uvProcTaskQueue);
×
1955
    QUEUE_REMOVE(h);
×
1956
    SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
×
1957
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
×
1958
      task->errCode = TSDB_CODE_UDF_STOPPING;
×
1959
    }
1960
    uv_sem_post(&task->taskSem);
×
1961
  }
1962
}
1,845✔
1963

1964
void udfStopAsyncCb(uv_async_t *async) {
1,845✔
1965
  SUdfcProxy *udfc = async->data;
1,845✔
1966
  cleanUpUvTasks(udfc);
1,845✔
1967
  if (udfc->udfcState == UDFC_STATE_STOPPING) {
1,845!
1968
    uv_stop(&udfc->uvLoop);
1,845✔
1969
  }
1970
}
1,845✔
1971

1972
void constructUdfService(void *argsThread) {
1,845✔
1973
  int32_t     code = 0, lino = 0;
1,845✔
1974
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
1,845✔
1975
  code = uv_loop_init(&udfc->uvLoop);
1,845✔
1976
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
1977

1978
  code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
1,845✔
1979
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
1980
  udfc->loopTaskAync.data = udfc;
1,845✔
1981
  code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
1,845✔
1982
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
1983
  udfc->loopStopAsync.data = udfc;
1,845✔
1984
  code = uv_mutex_init(&udfc->taskQueueMutex);
1,845✔
1985
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
1986
  QUEUE_INIT(&udfc->taskQueue);
1,845✔
1987
  QUEUE_INIT(&udfc->uvProcTaskQueue);
1,845✔
1988
  (void)uv_barrier_wait(&udfc->initBarrier);
1,845✔
1989
  // TODO return value of uv_run
1990
  int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
1,845✔
1991
  fnInfo("udfc uv loop exit. active handle num: %d", num);
1,845!
1992
  (void)uv_loop_close(&udfc->uvLoop);
1,845✔
1993

1994
  uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL);
1,845✔
1995
  num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
1,845✔
1996
  fnInfo("udfc uv loop exit. active handle num: %d", num);
1,845!
1997

1998
  (void)uv_loop_close(&udfc->uvLoop);
1,845✔
1999
_exit:
1,845✔
2000
  if (code != 0) {
1,845!
2001
    fnError("udfc construct error. code: %d, line: %d", code, lino);
×
2002
  }
2003
  fnInfo("udfc construct finished");
1,845!
2004
}
1,845✔
2005

2006
int32_t udfcOpen() {
2,383✔
2007
  int32_t code = 0, lino = 0;
2,383✔
2008
  int8_t  old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
2,383✔
2009
  if (old == 1) {
2,383✔
2010
    return 0;
538✔
2011
  }
2012
  SUdfcProxy *proxy = &gUdfcProxy;
1,845✔
2013
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
1,845✔
2014
  proxy->udfcState = UDFC_STATE_STARTNG;
1,845✔
2015
  code = uv_barrier_init(&proxy->initBarrier, 2);
1,845✔
2016
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
2017
  code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
1,845✔
2018
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
2019
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
1,845✔
2020
  proxy->udfcState = UDFC_STATE_READY;
1,845✔
2021
  (void)uv_barrier_wait(&proxy->initBarrier);
1,845✔
2022
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
2023
  code = uv_mutex_init(&proxy->udfStubsMutex);
1,845✔
2024
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
2025
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
1,845✔
2026
  if (proxy->udfStubs == NULL) {
1,845!
2027
    fnError("udfc init failed. udfStubs: %p", proxy->udfStubs);
×
2028
    return -1;
×
2029
  }
2030
  proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
1,845✔
2031
  if (proxy->expiredUdfStubs == NULL) {
1,845!
2032
    taosArrayDestroy(proxy->udfStubs);
×
2033
    fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs);
×
2034
    return -1;
×
2035
  }
2036
  code = uv_mutex_init(&proxy->udfcUvMutex);
1,845✔
2037
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,845!
2038
_exit:
1,845✔
2039
  if (code != 0) {
1,845!
2040
    fnError("udfc open error. code: %d, line: %d", code, lino);
×
2041
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
2042
  }
2043
  fnInfo("udfc initialized");
1,845!
2044
  return 0;
1,845✔
2045
}
2046

2047
int32_t udfcClose() {
1,879✔
2048
  int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
1,879✔
2049
  if (old == 0) {
1,879✔
2050
    return 0;
34✔
2051
  }
2052

2053
  SUdfcProxy *udfc = &gUdfcProxy;
1,845✔
2054
  udfc->udfcState = UDFC_STATE_STOPPING;
1,845✔
2055
  if (uv_async_send(&udfc->loopStopAsync) != 0) {
1,845!
2056
    fnError("udfc close error to send stop async");
×
2057
  }
2058
  if (uv_thread_join(&udfc->loopThread) != 0) {
1,845!
2059
    fnError("udfc close errir to join loop thread");
×
2060
  }
2061
  uv_mutex_destroy(&udfc->taskQueueMutex);
1,845✔
2062
  uv_barrier_destroy(&udfc->initBarrier);
1,845✔
2063
  taosArrayDestroy(udfc->expiredUdfStubs);
1,845✔
2064
  taosArrayDestroy(udfc->udfStubs);
1,845✔
2065
  uv_mutex_destroy(&udfc->udfStubsMutex);
1,845✔
2066
  uv_mutex_destroy(&udfc->udfcUvMutex);
1,845✔
2067
  udfc->udfcState = UDFC_STATE_INITAL;
1,845✔
2068
  fnInfo("udfc is cleaned up");
1,845!
2069
  return 0;
1,845✔
2070
}
2071

2072
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
27,272✔
2073
  int32_t            code = 0, lino = 0;
27,272✔
2074
  SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
27,272!
2075
  if (uvTask == NULL) {
27,307!
2076
    fnError("udfc client task: %p failed to allocate memory for uvTask", task);
×
2077
    return terrno;
×
2078
  }
2079
  fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);
27,307✔
2080

2081
  code = udfcInitializeUvTask(task, uvTaskType, uvTask);
27,307✔
2082
  TAOS_CHECK_GOTO(code, &lino, _exit);
27,127!
2083
  code = udfcQueueUvTask(uvTask);
27,127✔
2084
  TAOS_CHECK_GOTO(code, &lino, _exit);
26,900!
2085
  code = udfcGetUdfTaskResultFromUvTask(task, uvTask);
26,900✔
2086
  TAOS_CHECK_GOTO(code, &lino, _exit);
27,282!
2087
  if (uvTaskType == UV_TASK_CONNECT) {
27,282✔
2088
    task->session->udfUvPipe = uvTask->pipe;
157✔
2089
    SClientUvConn *conn = uvTask->pipe->data;
157✔
2090
    conn->session = task->session;
157✔
2091
  }
2092

2093
_exit:
27,125✔
2094
  if (code != 0) {
27,282!
2095
    fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, uvTask, code, lino);
×
2096
  }
2097
  taosMemoryFree(uvTask->reqBuf.base);
27,282!
2098
  uvTask->reqBuf.base = NULL;
27,280✔
2099
  taosMemoryFree(uvTask);
27,280!
2100
  uvTask = NULL;
27,310✔
2101
  return code;
27,310✔
2102
}
2103

2104
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
157✔
2105
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
157✔
2106
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
157!
2107
  if (task == NULL) {
157!
2108
    fnError("doSetupUdf, failed to allocate memory for task");
×
2109
    return terrno;
×
2110
  }
2111
  task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
157!
2112
  if (task->session == NULL) {
157!
2113
    fnError("doSetupUdf, failed to allocate memory for session");
×
2114
    taosMemoryFree(task);
×
2115
    return terrno;
×
2116
  }
2117
  task->session->udfc = &gUdfcProxy;
157✔
2118
  task->type = UDF_TASK_SETUP;
157✔
2119

2120
  SUdfSetupRequest *req = &task->_setup.req;
157✔
2121
  tstrncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
157✔
2122

2123
  code = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
157✔
2124
  TAOS_CHECK_GOTO(code, &lino, _exit);
157!
2125

2126
  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
157✔
2127
  TAOS_CHECK_GOTO(code, &lino, _exit);
157!
2128

2129
  SUdfSetupResponse *rsp = &task->_setup.rsp;
157✔
2130
  task->session->severHandle = rsp->udfHandle;
157✔
2131
  task->session->outputType = rsp->outputType;
157✔
2132
  task->session->bytes = rsp->bytes;
157✔
2133
  task->session->bufSize = rsp->bufSize;
157✔
2134
  tstrncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
157✔
2135
  fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
157!
2136
  *funcHandle = task->session;
157✔
2137
  taosMemoryFree(task);
157!
2138
  return 0;
157✔
2139

2140
_exit:
×
2141
  if (code != 0) {
×
2142
    fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino);
×
2143
  }
2144
  taosMemoryFree(task->session);
×
2145
  taosMemoryFree(task);
×
2146
  return code;
×
2147
}
2148

2149
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
26,712✔
2150
                SSDataBlock *output, SUdfInterBuf *newState) {
2151
  fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
26,712✔
2152
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
26,716✔
2153
  if (session->udfUvPipe == NULL) {
26,716!
2154
    fnError("No pipe to udfd");
×
2155
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
2156
  }
2157
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
26,716!
2158
  if (task == NULL) {
26,725!
2159
    fnError("udfc call udf. failed to allocate memory for task");
×
2160
    return terrno;
×
2161
  }
2162
  task->session = (SUdfcUvSession *)handle;
26,725✔
2163
  task->type = UDF_TASK_CALL;
26,725✔
2164

2165
  SUdfCallRequest *req = &task->_call.req;
26,725✔
2166
  req->udfHandle = task->session->severHandle;
26,725✔
2167
  req->callType = callType;
26,725✔
2168

2169
  switch (callType) {
26,725!
2170
    case TSDB_UDF_CALL_AGG_INIT: {
39✔
2171
      req->initFirst = 1;
39✔
2172
      break;
39✔
2173
    }
2174
    case TSDB_UDF_CALL_AGG_PROC: {
46✔
2175
      req->block = *input;
46✔
2176
      req->interBuf = *state;
46✔
2177
      break;
46✔
2178
    }
2179
    case TSDB_UDF_CALL_AGG_MERGE: {
×
2180
      req->interBuf = *state;
×
2181
      req->interBuf2 = *state2;
×
2182
      break;
×
2183
    }
2184
    case TSDB_UDF_CALL_AGG_FIN: {
39✔
2185
      req->interBuf = *state;
39✔
2186
      break;
39✔
2187
    }
2188
    case TSDB_UDF_CALL_SCALA_PROC: {
26,588✔
2189
      req->block = *input;
26,588✔
2190
      break;
26,588✔
2191
    }
2192
  }
2193

2194
  int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
26,725✔
2195
  if (code != 0) {
26,699!
2196
    fnError("call udf failure. udfcRunUdfUvTask err: %d", code);
×
2197
  } else {
2198
    SUdfCallResponse *rsp = &task->_call.rsp;
26,699✔
2199
    switch (callType) {
26,699!
2200
      case TSDB_UDF_CALL_AGG_INIT: {
39✔
2201
        *newState = rsp->resultBuf;
39✔
2202
        break;
39✔
2203
      }
2204
      case TSDB_UDF_CALL_AGG_PROC: {
46✔
2205
        *newState = rsp->resultBuf;
46✔
2206
        break;
46✔
2207
      }
2208
      case TSDB_UDF_CALL_AGG_MERGE: {
×
2209
        *newState = rsp->resultBuf;
×
2210
        break;
×
2211
      }
2212
      case TSDB_UDF_CALL_AGG_FIN: {
39✔
2213
        *newState = rsp->resultBuf;
39✔
2214
        break;
39✔
2215
      }
2216
      case TSDB_UDF_CALL_SCALA_PROC: {
26,576✔
2217
        *output = rsp->resultData;
26,576✔
2218
        break;
26,576✔
2219
      }
2220
    }
2221
  };
2222
  taosMemoryFree(task);
26,699!
2223
  return code;
26,732✔
2224
}
2225

2226
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
39✔
2227
  int8_t callType = TSDB_UDF_CALL_AGG_INIT;
39✔
2228

2229
  int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf);
39✔
2230

2231
  return err;
39✔
2232
}
2233

2234
// input: block, state
2235
// output: interbuf,
2236
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
46✔
2237
  int8_t  callType = TSDB_UDF_CALL_AGG_PROC;
46✔
2238
  int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
46✔
2239
  return err;
46✔
2240
}
2241

2242
// input: interbuf1, interbuf2
2243
// output: resultBuf
2244
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
×
2245
                          SUdfInterBuf *resultBuf) {
2246
  int8_t  callType = TSDB_UDF_CALL_AGG_MERGE;
×
2247
  int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
×
2248
  return err;
×
2249
}
2250

2251
// input: interBuf
2252
// output: resultData
2253
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
39✔
2254
  int8_t  callType = TSDB_UDF_CALL_AGG_FIN;
39✔
2255
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
39✔
2256
  return err;
39✔
2257
}
2258

2259
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
26,612✔
2260
  int8_t      callType = TSDB_UDF_CALL_SCALA_PROC;
26,612✔
2261
  SSDataBlock inputBlock = {0};
26,612✔
2262
  int32_t     code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
26,612✔
2263
  if (code != 0) {
26,603!
2264
    fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code);
×
2265
    return code;
×
2266
  }
2267
  SSDataBlock resultBlock = {0};
26,603✔
2268
  int32_t     err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
26,603✔
2269
  if (err == 0) {
26,571!
2270
    err = convertDataBlockToScalarParm(&resultBlock, output);
26,578✔
2271
    taosArrayDestroy(resultBlock.pDataBlock);
26,573✔
2272
  }
2273

2274
  blockDataFreeRes(&inputBlock);
26,596✔
2275
  return err;
26,568✔
2276
}
2277

2278
int32_t doTeardownUdf(UdfcFuncHandle handle) {
133✔
2279
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
133✔
2280
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
133✔
2281

2282
  if (session->udfUvPipe == NULL) {
133!
2283
    fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
×
2284
    taosMemoryFree(session);
×
2285
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
×
2286
  }
2287

2288
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
133!
2289
  if (task == NULL) {
133!
2290
    fnError("doTeardownUdf, failed to allocate memory for task");
×
2291
    taosMemoryFree(session);
×
2292
    return terrno;
×
2293
  }
2294
  task->session = session;
133✔
2295
  task->type = UDF_TASK_TEARDOWN;
133✔
2296

2297
  SUdfTeardownRequest *req = &task->_teardown.req;
133✔
2298
  req->udfHandle = task->session->severHandle;
133✔
2299

2300
  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
133✔
2301
  TAOS_CHECK_GOTO(code, &lino, _exit);
133!
2302

2303
  code = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
133✔
2304
  TAOS_CHECK_GOTO(code, &lino, _exit);
133!
2305

2306
  fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
133!
2307
  // TODO: synchronization refactor between libuv event loop and request thread
2308
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
133✔
2309
  if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
133!
2310
    SClientUvConn *conn = session->udfUvPipe->data;
×
2311
    conn->session = NULL;
×
2312
  }
2313
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
133✔
2314

2315
_exit:
133✔
2316
  if (code != 0) {
133!
2317
    fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino);
×
2318
  }
2319
  taosMemoryFree(session);
133!
2320
  taosMemoryFree(task);
133!
2321

2322
  return code;
133✔
2323
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc