• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3522

07 Nov 2024 05:59AM UTC coverage: 58.216% (+1.3%) from 56.943%
#3522

push

travis-ci

web-flow
Merge pull request #28663 from taosdata/fix/3_liaohj

fix(stream): stop the underlying scan operations for stream

111884 of 248391 branches covered (45.04%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

1164 existing lines in 134 files now uncovered.

191720 of 273118 relevant lines covered (70.2%)

13088725.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.56
/source/libs/function/src/tudf.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "uv.h"
16

17
#include "os.h"
18

19
#include "builtinsimpl.h"
20
#include "fnLog.h"
21
#include "functionMgt.h"
22
#include "querynodes.h"
23
#include "tarray.h"
24
#include "tdatablock.h"
25
#include "tglobal.h"
26
#include "tudf.h"
27
#include "tudfInt.h"
28

29
#ifdef _TD_DARWIN_64
30
#include <mach-o/dyld.h>
31
#endif
32

33
/* State shared between the dnode thread and the thread that spawns and watches the udfd child process. */
typedef struct SUdfdData {
  bool         startCalled;  // set by udfStartUdfd() so that a second start request becomes a no-op
  bool         needCleanUp;  // true once udfd was spawned successfully; cleared when it is stopped
  uv_loop_t    loop;         // libuv loop initialized and run by udfWatchUdfd()
  uv_thread_t  thread;       // thread executing udfWatchUdfd()
  uv_barrier_t barrier;      // two-party barrier between udfStartUdfd() and udfWatchUdfd()
  uv_process_t process;      // handle of the spawned udfd process
#ifdef WINDOWS
  HANDLE jobHandle;  // job object the udfd process is assigned to (kill-on-job-close)
#endif
  int32_t    spawnErr;    // 0 after a successful spawn, otherwise the error stored by udfWatchUdfd()
  uv_pipe_t  ctrlPipe;    // pipe connected to the child's stdin
  uv_async_t stopAsync;   // async handle whose callback stops the loop
  int32_t    stopCalled;  // set to 1 by udfStopUdfd(); udfUdfdExit() reads it to skip the restart

  int32_t dnodeId;  // dnode id passed to the child through the DNODE_ID environment variable
} SUdfdData;
50

51
SUdfdData udfdGlobal = {0};
52

53
int32_t udfStartUdfd(int32_t startDnodeId);
54
void    udfStopUdfd();
55

56
extern char **environ;
57

58
static int32_t udfSpawnUdfd(SUdfdData *pData);
59
void           udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
60
static void    udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
61
static void    udfUdfdStopAsyncCb(uv_async_t *async);
62
static void    udfWatchUdfd(void *args);
63

64
/*
 * Exit callback installed on the udfd process handle.
 *
 * Logs the exit status and signal. When the child exited cleanly (status 0 and signal 0) or a stop was
 * requested through udfStopUdfd(), nothing else happens; otherwise udfd is spawned again.
 */
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
  fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);

  SUdfdData *pUdfd = process->data;
  bool       cleanExit = (exitStatus == 0) && (termSignal == 0);

  if (cleanExit || atomic_load_32(&pUdfd->stopCalled)) {
    fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
    return;
  }

  fnInfo("udfd process restart");
  int32_t rc = udfSpawnUdfd(pUdfd);
  if (rc != 0) {
    fnError("udfd process restart failed with code:%d", rc);
  }
}
19✔
77

78
/*
 * Spawn the udfd child process on pData->loop.
 *
 * The executable is "udfd" ("udfd.exe" on Windows) located in the directory of tsProcPath, or in a
 * platform-specific fallback directory, and is started with "-c <configDir>". The child's stdin is connected to
 * pData->ctrlPipe, stdout is ignored and stderr is inherited. When `environ` is available the child environment
 * is a copy of it followed by DNODE_ID, UV_THREADPOOL_SIZE, LD_LIBRARY_PATH and, if set in the parent,
 * TAOS_FQDN; otherwise only those items are passed.
 *
 * Returns 0 on success, the uv_spawn() error, TSDB_CODE_OUT_OF_MEMORY when the environment copy cannot be
 * allocated, or terrno when the TAOS_FQDN item cannot be allocated.
 */
static int32_t udfSpawnUdfd(SUdfdData *pData) {
  fnInfo("start to init udfd");

  int32_t              err = 0;
  uv_process_options_t options = {0};

  // Directory that is expected to contain the udfd executable.
  char path[PATH_MAX] = {0};
  if (tsProcPath == NULL) {
    path[0] = '.';
#ifdef WINDOWS
    GetModuleFileName(NULL, path, PATH_MAX);
    TAOS_DIRNAME(path);
#elif defined(_TD_DARWIN_64)
    uint32_t pathSize = sizeof(path);
    _NSGetExecutablePath(path, &pathSize);
    TAOS_DIRNAME(path);
#endif
  } else {
    TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
    TAOS_DIRNAME(path);
  }
#ifdef WINDOWS
  if (strlen(path) == 0) {
    TAOS_STRCAT(path, "C:\\TDengine");
  }
  TAOS_STRCAT(path, "\\udfd.exe");
#else
  if (strlen(path) == 0) {
    TAOS_STRCAT(path, "/usr/bin");
  }
  TAOS_STRCAT(path, "/udfd");
#endif
  char *argsUdfd[] = {path, "-c", configDir, NULL};
  options.args = argsUdfd;
  options.file = path;

  options.exit_cb = udfUdfdExit;

  // NOTE(review): TAOS_UV_LIB_ERROR_RET is defined elsewhere; presumably it returns early on a libuv error.
  TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));

  uv_stdio_container_t child_stdio[3];
  child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
  child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe;
  child_stdio[1].flags = UV_IGNORE;
  child_stdio[2].flags = UV_INHERIT_FD;
  child_stdio[2].data.fd = 2;
  options.stdio_count = 3;
  options.stdio = child_stdio;

  options.flags = UV_PROCESS_DETACHED;

  char dnodeIdEnvItem[32] = {0};
  char thrdPoolSizeEnvItem[32] = {0};
  snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);

  // Thread pool size handed to the child: twice the core count, with at least two cores assumed.
  float   numCpuCores = 4;
  int32_t code = taosGetCpuCores(&numCpuCores, false);
  if (code != 0) {
    fnError("failed to get cpu cores, code:0x%x", code);
  }
  numCpuCores = TMAX(numCpuCores, 2);
  snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);

  // LD_LIBRARY_PATH of the child: tsUdfdLdLibPath followed by the parent's own value.
  char    pathTaosdLdLib[512] = {0};
  size_t  taosdLdLibPathLen = sizeof(pathTaosdLdLib);
  int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);

  // A single bounded snprintf replaces the manual copy + ':' + copy sequence, which indexed the buffer with the
  // untruncated length of tsUdfdLdLibPath and could write past the end of udfdPathLdLib.
  char    udfdPathLdLib[1024] = {0};
  int32_t ldLibLen = snprintf(udfdPathLdLib, sizeof(udfdPathLdLib), "%s:%s", tsUdfdLdLibPath, pathTaosdLdLib);
  bool    ldLibComplete = (ret != UV_ENOBUFS) && (ldLibLen >= 0) && ((size_t)ldLibLen < sizeof(udfdPathLdLib));
  if (ldLibComplete) {
    fnInfo("[UDFD]udfd LD_LIBRARY_PATH: %s", udfdPathLdLib);
  } else {
    // Either the parent's value did not fit into pathTaosdLdLib or the combined value was truncated.
    fnError("[UDFD]can not set correct udfd LD_LIBRARY_PATH");
  }
  char ldLibPathEnvItem[1024 + 32] = {0};
  snprintf(ldLibPathEnvItem, 1024 + 32, "%s=%s", "LD_LIBRARY_PATH", udfdPathLdLib);

  // Forward TAOS_FQDN to the child when the parent has it set.
  char *taosFqdnEnvItem = NULL;
  char *taosFqdn = getenv("TAOS_FQDN");
  if (taosFqdn != NULL) {
    int32_t subLen = strlen(taosFqdn);
    int32_t len = strlen("TAOS_FQDN=") + subLen + 1;
    taosFqdnEnvItem = taosMemoryMalloc(len);
    if (taosFqdnEnvItem != NULL) {
      tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len);
      TAOS_STRNCAT(taosFqdnEnvItem, taosFqdn, subLen);
      fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
    } else {
      fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
      return terrno;
    }
  }

  // taosFqdnEnvItem may be NULL, in which case it simply terminates the list one slot early.
  char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};

  char **envUdfdWithPEnv = NULL;
  if (environ != NULL) {
    int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
    int32_t numEnviron = 0;
    while (environ[numEnviron] != NULL) {
      numEnviron++;
    }

    // Zero-initialized, so the array stays NULL-terminated even if a later allocation fails.
    envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *));
    if (envUdfdWithPEnv == NULL) {
      err = TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }

    for (int32_t i = 0; i < numEnviron; i++) {
      int32_t len = strlen(environ[i]) + 1;
      envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1);
      if (envUdfdWithPEnv[i] == NULL) {
        err = TSDB_CODE_OUT_OF_MEMORY;
        goto _OVER;
      }

      tstrncpy(envUdfdWithPEnv[i], environ[i], len);
    }

    for (int32_t i = 0; i < lenEnvUdfd; i++) {
      if (envUdfd[i] != NULL) {
        int32_t len = strlen(envUdfd[i]) + 1;
        envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1);
        if (envUdfdWithPEnv[numEnviron + i] == NULL) {
          err = TSDB_CODE_OUT_OF_MEMORY;
          goto _OVER;
        }

        tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len);
      }
    }
    envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;

    options.env = envUdfdWithPEnv;
  } else {
    options.env = envUdfd;
  }

  err = uv_spawn(&pData->loop, &pData->process, &options);
  pData->process.data = (void *)pData;

#ifdef WINDOWS
  // End udfd.exe by Job.
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
  pData->jobHandle = CreateJobObject(NULL, NULL);
  bool add_job_ok = AssignProcessToJobObject(pData->jobHandle, pData->process.process_handle);
  if (!add_job_ok) {
    fnError("Assign udfd to job failed.");
  } else {
    JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info;
    memset(&limit_info, 0x0, sizeof(limit_info));
    limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
    bool set_auto_kill_ok =
        SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info));
    if (!set_auto_kill_ok) {
      fnError("Set job auto kill udfd failed.");
    }
  }
#endif

  if (err != 0) {
    fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
  } else {
    fnInfo("udfd is initialized");
  }

_OVER:
  if (taosFqdnEnvItem) {
    taosMemoryFree(taosFqdnEnvItem);
  }

  // Release the private copy of the environment; the list is NULL-terminated.
  if (envUdfdWithPEnv != NULL) {
    int32_t i = 0;
    while (envUdfdWithPEnv[i] != NULL) {
      taosMemoryFree(envUdfdWithPEnv[i]);
      i++;
    }
    taosMemoryFree(envUdfdWithPEnv);
  }

  return err;
}
268

269
/* uv_walk() callback: close the handle unless it is already being closed. `arg` is unused. */
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) {
  if (uv_is_closing(handle)) {
    return;
  }
  uv_close(handle, NULL);
}
5,771✔
274

275
/* Callback of the stop async handle: stops the loop owned by the SUdfdData stored in async->data. */
static void udfUdfdStopAsyncCb(uv_async_t *async) {
  SUdfdData *pUdfd = (SUdfdData *)async->data;

  uv_stop(&pUdfd->loop);
}
1,166✔
279

280
static void udfWatchUdfd(void *args) {
1,166✔
281
  SUdfdData *pData = args;
1,166✔
282
  TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
1,166!
283
  TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb));
1,166!
284
  pData->stopAsync.data = pData;
1,166✔
285
  TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
1,166!
286
  atomic_store_32(&pData->spawnErr, 0);
1,166✔
287
  (void)uv_barrier_wait(&pData->barrier);
1,166✔
288
  int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
1,166✔
289
  fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
1,166!
290

291
  uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
1,166✔
292
  num = uv_run(&pData->loop, UV_RUN_DEFAULT);
1,166✔
293
  fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
1,166!
294
  if (uv_loop_close(&pData->loop) != 0) {
1,166!
295
    fnError("udfd loop close failed, lino:%d", __LINE__);
×
296
  }
297
  return;
1,166✔
298

299
_exit:
×
300
  if (terrno != 0) {
×
301
    (void)uv_barrier_wait(&pData->barrier);
×
302
    atomic_store_32(&pData->spawnErr, terrno);
×
303
    if (uv_loop_close(&pData->loop) != 0) {
×
304
      fnError("udfd loop close failed, lino:%d", __LINE__);
×
305
    }
306
    fnError("udfd thread exit with code:%d lino:%d", terrno, terrln);
×
307
    terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
308
  }
309
  return;
×
310
}
311

312
int32_t udfStartUdfd(int32_t startDnodeId) {
1,166✔
313
  int32_t code = 0, lino = 0;
1,166✔
314
  if (!tsStartUdfd) {
1,166!
315
    fnInfo("start udfd is disabled.") return 0;
×
316
  }
317
  SUdfdData *pData = &udfdGlobal;
1,166✔
318
  if (pData->startCalled) {
1,166!
319
    fnInfo("dnode start udfd already called");
×
320
    return 0;
×
321
  }
322
  pData->startCalled = true;
1,166✔
323
  char dnodeId[8] = {0};
1,166✔
324
  snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
1,166✔
325
  TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
1,166!
326
  pData->dnodeId = startDnodeId;
1,166✔
327

328
  TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
1,166!
329
  TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit);
1,166!
330
  (void)uv_barrier_wait(&pData->barrier);
1,166✔
331
  int32_t err = atomic_load_32(&pData->spawnErr);
1,166✔
332
  if (err != 0) {
1,166!
333
    uv_barrier_destroy(&pData->barrier);
×
334
    if (uv_async_send(&pData->stopAsync) != 0) {
×
335
      fnError("start udfd: failed to send stop async");
×
336
    }
337
    if (uv_thread_join(&pData->thread) != 0) {
×
338
      fnError("start udfd: failed to join udfd thread");
×
339
    }
340
    pData->needCleanUp = false;
×
341
    fnInfo("udfd is cleaned up after spawn err");
×
342
    TAOS_CHECK_GOTO(err, &lino, _exit);
×
343
  } else {
344
    pData->needCleanUp = true;
1,166✔
345
  }
346
_exit:
1,166✔
347
  if (code != 0) {
1,166!
348
    fnError("udfd start failed with code:%d, lino:%d", code, lino);
×
349
  }
350
  return code;
1,166✔
351
}
352

353
void udfStopUdfd() {
1,166✔
354
  SUdfdData *pData = &udfdGlobal;
1,166✔
355
  fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
1,166!
356
  if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
1,166!
357
    return;
×
358
  }
359
  atomic_store_32(&pData->stopCalled, 1);
1,166✔
360
  pData->needCleanUp = false;
1,166✔
361
  uv_barrier_destroy(&pData->barrier);
1,166✔
362
  if (uv_async_send(&pData->stopAsync) != 0) {
1,166!
363
    fnError("stop udfd: failed to send stop async");
×
364
  }
365
  if (uv_thread_join(&pData->thread) != 0) {
1,166!
366
    fnError("stop udfd: failed to join udfd thread");
×
367
  }
368

369
#ifdef WINDOWS
370
  if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
371
#endif
372
  fnInfo("udfd is cleaned up");
1,166!
373
  return;
1,166✔
374
}
375

376
/*
377
int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
378
  SUdfdData *pData = &udfdGlobal;
379
  if (pData->spawnErr) {
380
    return pData->spawnErr;
381
  }
382
  uv_pid_t pid = uv_process_get_pid(&pData->process);
383
  if (pUdfdPid) {
384
    *pUdfdPid = (int32_t)pid;
385
  }
386
  return TSDB_CODE_SUCCESS;
387
}
388
*/
389

390
//==============================================================================================
391
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
392
 * The QUEUE is copied from queue.h under libuv
393
 * */
394

395
/* Intrusive doubly linked circular list node: slot 0 holds the next pointer, slot 1 the previous pointer. */
typedef void *QUEUE[2];

/* Private macros. */
#define QUEUE_NEXT(q)      (*(QUEUE **)&((*(q))[0]))
#define QUEUE_PREV(q)      (*(QUEUE **)&((*(q))[1]))
#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */
/* Recover the enclosing `type` object from a pointer to its embedded QUEUE member `field`. */
#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field)))

/* Important note: mutating the list while QUEUE_FOREACH is
 * iterating over its elements results in undefined behavior.
 */
#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))

/* True when the node's next pointer refers back to the node itself. */
#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q))

/* First element after the head node `q`. */
#define QUEUE_HEAD(q) (QUEUE_NEXT(q))

/* Make `q` an empty list: both links point at `q` itself. */
#define QUEUE_INIT(q)    \
  do {                   \
    QUEUE_NEXT(q) = (q); \
    QUEUE_PREV(q) = (q); \
  } while (0)

/* Splice the elements of list `n` onto the tail of list `h`. */
#define QUEUE_ADD(h, n)                 \
  do {                                  \
    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV(h) = QUEUE_PREV(n);      \
    QUEUE_PREV_NEXT(h) = (h);           \
  } while (0)

/* Move the elements from `q` up to the tail of `h` into list `n`; `h` keeps the elements before `q`. */
#define QUEUE_SPLIT(h, q, n)       \
  do {                             \
    QUEUE_PREV(n) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(n) = (n);      \
    QUEUE_NEXT(n) = (q);           \
    QUEUE_PREV(h) = QUEUE_PREV(q); \
    QUEUE_PREV_NEXT(h) = (h);      \
    QUEUE_PREV(q) = (n);           \
  } while (0)

/* Move every element of `h` into `n`; when `h` is empty, `n` is just initialized as empty. */
#define QUEUE_MOVE(h, n)        \
  do {                          \
    if (QUEUE_EMPTY(h))         \
      QUEUE_INIT(n);            \
    else {                      \
      QUEUE *q = QUEUE_HEAD(h); \
      QUEUE_SPLIT(h, q, n);     \
    }                           \
  } while (0)

/* Link `q` in directly after the head `h`. */
#define QUEUE_INSERT_HEAD(h, q)    \
  do {                             \
    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
    QUEUE_PREV(q) = (h);           \
    QUEUE_NEXT_PREV(q) = (q);      \
    QUEUE_NEXT(h) = (q);           \
  } while (0)

/* Link `q` in directly before the head `h`, i.e. at the tail of the list. */
#define QUEUE_INSERT_TAIL(h, q)    \
  do {                             \
    QUEUE_NEXT(q) = (h);           \
    QUEUE_PREV(q) = QUEUE_PREV(h); \
    QUEUE_PREV_NEXT(q) = (q);      \
    QUEUE_PREV(h) = (q);           \
  } while (0)

/* Unlink `q` from its neighbours; the links stored in `q` itself are left untouched. */
#define QUEUE_REMOVE(q)                 \
  do {                                  \
    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
  } while (0)
470

471
enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
472

473
int64_t gUdfTaskSeqNum = 0;
474
/*
 * Per-UDF stub record kept by the client proxy (see the udfStubs arrays of SUdfcProxy).
 * NOTE(review): field usage is outside this part of the file; the names suggest a named handle with a
 * reference count and creation timestamp — confirm against the code that maintains the stubs.
 */
typedef struct SUdfcFuncStub {
  char           udfName[TSDB_FUNC_NAME_LEN + 1];
  UdfcFuncHandle handle;
  int32_t        refCount;
  int64_t        createTime;
} SUdfcFuncStub;
480

481
/*
 * State of the UDF client proxy; a single global instance (gUdfcProxy) is defined right after this type.
 * NOTE(review): the code using these fields is outside this part of the file — the per-field notes below are
 * derived from names and types only and should be confirmed.
 */
typedef struct SUdfcProxy {
  char         udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];  // presumably filled by getUdfdPipeName()
  uv_barrier_t initBarrier;

  uv_loop_t   uvLoop;
  uv_thread_t loopThread;
  uv_async_t  loopTaskAync;

  uv_async_t loopStopAsync;

  uv_mutex_t taskQueueMutex;
  int8_t     udfcState;  // presumably one of the UDFC_STATE_* values
  QUEUE      taskQueue;
  QUEUE      uvProcTaskQueue;

  uv_mutex_t udfStubsMutex;
  SArray    *udfStubs;         // SUdfcFuncStub
  SArray    *expiredUdfStubs;  // SUdfcFuncStub

  uv_mutex_t udfcUvMutex;
  int8_t     initialized;
} SUdfcProxy;
503

504
SUdfcProxy gUdfcProxy = {0};
505

506
/*
 * One client session with udfd for a single UDF.
 * NOTE(review): outputType, bytes and bufSize have the same names as the fields of SUdfSetupResponse decoded by
 * decodeUdfSetupResponse(); presumably they are copied from it — confirm with the setup code.
 */
typedef struct SUdfcUvSession {
  SUdfcProxy *udfc;
  int64_t     severHandle;
  uv_pipe_t  *udfUvPipe;

  int8_t  outputType;
  int32_t bytes;
  int32_t bufSize;

  char udfName[TSDB_FUNC_NAME_LEN + 1];
} SUdfcUvSession;
517

518
/*
 * A unit of work handed to the client uv loop.
 * NOTE(review): usage is outside this part of the file; `type` is presumably one of the UV_TASK_* values and
 * the three QUEUE members are presumably the links for the different task lists — confirm.
 */
typedef struct SClientUvTaskNode {
  SUdfcProxy *udfc;
  int8_t      type;
  int32_t     errCode;

  uv_pipe_t *pipe;

  int64_t  seqNum;
  uv_buf_t reqBuf;

  uv_sem_t taskSem;
  uv_buf_t rspBuf;

  QUEUE recvTaskQueue;
  QUEUE procTaskQueue;
  QUEUE connTaskQueue;
} SClientUvTaskNode;
535

536
/*
 * A UDF-level task: a request/response pair for setup, call or teardown, bound to a session.
 * NOTE(review): `type` presumably selects which member of the anonymous union is in use — confirm.
 */
typedef struct SClientUdfTask {
  int8_t type;

  SUdfcUvSession *session;

  union {
    struct {
      SUdfSetupRequest  req;
      SUdfSetupResponse rsp;
    } _setup;
    struct {
      SUdfCallRequest  req;
      SUdfCallResponse rsp;
    } _call;
    struct {
      SUdfTeardownRequest  req;
      SUdfTeardownResponse rsp;
    } _teardown;
  };

} SClientUdfTask;
557

558
/*
 * Read buffer of a client connection.
 * NOTE(review): presumably `len` is the number of bytes currently held, `cap` the allocated size and `total`
 * the expected message length — the code maintaining them is not visible here; confirm.
 */
typedef struct SClientConnBuf {
  char   *buf;
  int32_t len;
  int32_t cap;
  int32_t total;
} SClientConnBuf;
564

565
/* Client-side connection: the pipe, a task queue, the read buffer and the session it belongs to. */
typedef struct SClientUvConn {
  uv_pipe_t      *pipe;
  QUEUE           taskQueue;
  SClientConnBuf  readBuf;
  SUdfcUvSession *session;
} SClientUvConn;
571

572
/* Life-cycle states of the UDF client proxy. */
enum {
  UDFC_STATE_INITAL = 0,  // initial state
  UDFC_STATE_STARTNG,     // starting after udfcOpen
  UDFC_STATE_READY,       // started and begin to receive requests
  UDFC_STATE_STOPPING,    // stopping after udfcClose
};
578

579
void    getUdfdPipeName(char *pipeName, int32_t size);
580
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
581
void   *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
582
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
583
void   *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state);
584
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call);
585
void   *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call);
586
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown);
587
void   *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown);
588
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request);
589
void   *decodeUdfRequest(const void *buf, SUdfRequest *request);
590
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp);
591
void   *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp);
592
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp);
593
void   *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp);
594
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp);
595
void   *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse);
596
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp);
597
void   *decodeUdfResponse(const void *buf, SUdfResponse *rsp);
598
void    freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta);
599
void    freeUdfColumn(SUdfColumn *col);
600
void    freeUdfDataDataBlock(SUdfDataBlock *block);
601
void    freeUdfInterBuf(SUdfInterBuf *buf);
602
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
603
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
604
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
605
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
606

607
void getUdfdPipeName(char *pipeName, int32_t size) {
3,651✔
608
  char    dnodeId[8] = {0};
3,651✔
609
  size_t  dnodeIdSize = sizeof(dnodeId);
3,651✔
610
  int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
3,651✔
611
  if (err != 0) {
3,651!
612
    fnError("failed to get dnodeId from env since %s", uv_err_name(err));
×
613
    dnodeId[0] = '1';
×
614
  }
615
#ifdef _WIN32
616
  snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir)),
617
           dnodeId);
618
#else
619
  snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
3,651✔
620
#endif
621
  fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName);
3,651!
622
}
3,651✔
623

624
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
262✔
625
  int32_t len = 0;
262✔
626
  len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
262✔
627
  return len;
262✔
628
}
629

630
/* Decode a setup request into `request`; returns the position just after the consumed bytes. */
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request) {
  const void *cursor = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
  return (void *)cursor;
}
634

635
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state) {
10,228✔
636
  int32_t len = 0;
10,228✔
637
  len += taosEncodeFixedI8(buf, state->numOfResult);
10,228✔
638
  len += taosEncodeFixedI32(buf, state->bufLen);
10,228✔
639
  len += taosEncodeBinary(buf, state->buf, state->bufLen);
10,228✔
640
  return len;
10,228✔
641
}
642

643
/* Decode an intermediate buffer (numOfResult, bufLen, payload) into `state`; returns the next read position. */
void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state) {
  const void *cursor = buf;

  cursor = taosDecodeFixedI8(cursor, &state->numOfResult);
  cursor = taosDecodeFixedI32(cursor, &state->bufLen);
  cursor = taosDecodeBinary(cursor, (void **)&state->buf, state->bufLen);

  return (void *)cursor;
}
649

650
int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
2,270✔
651
  int32_t len = 0;
2,270✔
652
  len += taosEncodeFixedI64(buf, call->udfHandle);
2,270✔
653
  len += taosEncodeFixedI8(buf, call->callType);
2,270✔
654
  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
2,270✔
655
    len += tEncodeDataBlock(buf, &call->block);
334✔
656
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
1,936✔
657
    len += taosEncodeFixedI8(buf, call->initFirst);
932✔
658
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
1,470✔
659
    len += tEncodeDataBlock(buf, &call->block);
1,004✔
660
    len += encodeUdfInterBuf(buf, &call->interBuf);
1,004✔
661
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
466!
662
    len += encodeUdfInterBuf(buf, &call->interBuf);
×
663
    len += encodeUdfInterBuf(buf, &call->interBuf2);
×
664
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
466!
665
    len += encodeUdfInterBuf(buf, &call->interBuf);
466✔
666
  }
667
  return len;
2,270✔
668
}
669

670
/*
 * Decode a call request: udfHandle, callType and the payload belonging to that call type.
 * Unknown call types consume no payload. Returns the next read position.
 */
void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
  const void *cursor = taosDecodeFixedI64(buf, &call->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &call->callType);

  if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
  } else if (call->callType == TSDB_UDF_CALL_AGG_INIT) {
    cursor = taosDecodeFixedI8(cursor, &call->initFirst);
  } else if (call->callType == TSDB_UDF_CALL_AGG_PROC) {
    cursor = tDecodeDataBlock(cursor, &call->block);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
    cursor = decodeUdfInterBuf(cursor, &call->interBuf2);
  } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
    cursor = decodeUdfInterBuf(cursor, &call->interBuf);
  }

  return (void *)cursor;
}
694

695
int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) {
260✔
696
  int32_t len = 0;
260✔
697
  len += taosEncodeFixedI64(buf, teardown->udfHandle);
260✔
698
  return len;
260✔
699
}
700

701
/* Decode a teardown request (the udfHandle) into `teardown`; returns the next read position. */
void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown) {
  const void *cursor = taosDecodeFixedI64(buf, &teardown->udfHandle);
  return (void *)cursor;
}
705

706
int32_t encodeUdfRequest(void **buf, const SUdfRequest *request) {
2,792✔
707
  int32_t len = 0;
2,792✔
708
  if (buf == NULL) {
2,792✔
709
    len += sizeof(request->msgLen);
1,396✔
710
  } else {
711
    *(int32_t *)(*buf) = request->msgLen;
1,396✔
712
    *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen));
1,396✔
713
  }
714
  len += taosEncodeFixedI64(buf, request->seqNum);
2,792✔
715
  len += taosEncodeFixedI8(buf, request->type);
2,792✔
716
  if (request->type == UDF_TASK_SETUP) {
2,792✔
717
    len += encodeUdfSetupRequest(buf, &request->setup);
262✔
718
  } else if (request->type == UDF_TASK_CALL) {
2,530✔
719
    len += encodeUdfCallRequest(buf, &request->call);
2,270✔
720
  } else if (request->type == UDF_TASK_TEARDOWN) {
260!
721
    len += encodeUdfTeardownRequest(buf, &request->teardown);
260✔
722
  }
723
  return len;
2,792✔
724
}
725

726
/*
 * Decode a complete request: raw msgLen header, seqNum, type and the type-specific body.
 * Unknown types consume no body. Returns the next read position.
 */
void *decodeUdfRequest(const void *buf, SUdfRequest *request) {
  const void *cursor = buf;

  request->msgLen = *(int32_t *)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(request->msgLen));

  cursor = taosDecodeFixedI64(cursor, &request->seqNum);
  cursor = taosDecodeFixedI8(cursor, &request->type);

  switch (request->type) {
    case UDF_TASK_SETUP:
      cursor = decodeUdfSetupRequest(cursor, &request->setup);
      break;
    case UDF_TASK_CALL:
      cursor = decodeUdfCallRequest(cursor, &request->call);
      break;
    case UDF_TASK_TEARDOWN:
      cursor = decodeUdfTeardownRequest(cursor, &request->teardown);
      break;
    default:
      break;
  }
  return (void *)cursor;
}
742

743
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
1,470✔
744
  int32_t len = 0;
1,470✔
745
  len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
1,470✔
746
  len += taosEncodeFixedI8(buf, setupRsp->outputType);
1,470✔
747
  len += taosEncodeFixedI32(buf, setupRsp->bytes);
1,470✔
748
  len += taosEncodeFixedI32(buf, setupRsp->bufSize);
1,470✔
749
  return len;
1,470✔
750
}
751

752
/* Decode a setup response (udfHandle, outputType, bytes, bufSize); returns the next read position. */
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
  const void *cursor = buf;

  cursor = taosDecodeFixedI64(cursor, &setupRsp->udfHandle);
  cursor = taosDecodeFixedI8(cursor, &setupRsp->outputType);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bytes);
  cursor = taosDecodeFixedI32(cursor, &setupRsp->bufSize);

  return (void *)cursor;
}
759

760
int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
64,173✔
761
  int32_t len = 0;
64,173✔
762
  len += taosEncodeFixedI8(buf, callRsp->callType);
64,173✔
763
  switch (callRsp->callType) {
64,173!
764
    case TSDB_UDF_CALL_SCALA_PROC:
55,419✔
765
      len += tEncodeDataBlock(buf, &callRsp->resultData);
55,419✔
766
      break;
55,373✔
767
    case TSDB_UDF_CALL_AGG_INIT:
1,834✔
768
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
1,834✔
769
      break;
1,834✔
770
    case TSDB_UDF_CALL_AGG_PROC:
5,142✔
771
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
5,142✔
772
      break;
5,142✔
773
    case TSDB_UDF_CALL_AGG_MERGE:
×
774
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
×
775
      break;
×
776
    case TSDB_UDF_CALL_AGG_FIN:
1,782✔
777
      len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
1,782✔
778
      break;
1,782✔
779
  }
780
  return len;
64,127✔
781
}
782

783
/*
 * Decode a call response: callType followed by the result data block for scalar calls or the result buffer
 * for every aggregate call type. Unknown call types consume no payload. Returns the next read position.
 */
void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
  const void *cursor = taosDecodeFixedI8(buf, &callRsp->callType);

  switch (callRsp->callType) {
    case TSDB_UDF_CALL_SCALA_PROC:
      cursor = tDecodeDataBlock(cursor, &callRsp->resultData);
      break;
    case TSDB_UDF_CALL_AGG_INIT:
    case TSDB_UDF_CALL_AGG_PROC:
    case TSDB_UDF_CALL_AGG_MERGE:
    case TSDB_UDF_CALL_AGG_FIN:
      cursor = decodeUdfInterBuf(cursor, &callRsp->resultBuf);
      break;
    default:
      break;
  }
  return (void *)cursor;
}
804

805
/* A teardown response has no body: nothing is written and the encoded length is 0. */
int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp) {
  return 0;
}
1,354✔
806

807
/* A teardown response has no payload: the read position is returned unchanged. */
void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse) {
  (void)teardownResponse;
  return (void *)buf;
}
130✔
808

809
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) {
67,041✔
810
  int32_t len = 0;
67,041✔
811
  len += sizeof(rsp->msgLen);
67,041✔
812
  if (buf != NULL) {
67,041✔
813
    *(int32_t *)(*buf) = rsp->msgLen;
33,539✔
814
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
33,539✔
815
  }
816

817
  len += sizeof(rsp->seqNum);
67,041✔
818
  if (buf != NULL) {
67,041✔
819
    *(int64_t *)(*buf) = rsp->seqNum;
33,539✔
820
    *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
33,539✔
821
  }
822

823
  len += taosEncodeFixedI64(buf, rsp->seqNum);
67,041✔
824
  len += taosEncodeFixedI8(buf, rsp->type);
67,041✔
825
  len += taosEncodeFixedI32(buf, rsp->code);
67,041✔
826

827
  switch (rsp->type) {
67,041!
828
    case UDF_TASK_SETUP:
1,470✔
829
      len += encodeUdfSetupResponse(buf, &rsp->setupRsp);
1,470✔
830
      break;
1,470✔
831
    case UDF_TASK_CALL:
64,218✔
832
      len += encodeUdfCallResponse(buf, &rsp->callRsp);
64,218✔
833
      break;
64,103✔
834
    case UDF_TASK_TEARDOWN:
1,354✔
835
      len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp);
1,354✔
836
      break;
1,354✔
UNCOV
837
    default:
×
UNCOV
838
      fnError("encode udf response, invalid udf response type %d", rsp->type);
×
839
      break;
×
840
  }
841
  return len;
66,927✔
842
}
843

844
/*
 * Decode a UDF response produced by encodeUdfResponse().
 *
 * Reads the raw msgLen and seqNum, then seqNum again plus type and code via
 * the taosDecode helpers, and finally the payload selected by rsp->type.
 * An unknown type sets rsp->code to TSDB_CODE_UDF_INTERNAL_ERROR. If any
 * decode step yields NULL, rsp->code is set from terrno.
 *
 * Returns the position after the decoded response, or NULL on decode failure.
 */
void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
  const void *cursor = buf;

  // leading fields were written raw by the encoder
  rsp->msgLen = *(int32_t *)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(rsp->msgLen));
  rsp->seqNum = *(int64_t *)(cursor);
  cursor = POINTER_SHIFT(cursor, sizeof(rsp->seqNum));

  cursor = taosDecodeFixedI64(cursor, &rsp->seqNum);
  cursor = taosDecodeFixedI8(cursor, &rsp->type);
  cursor = taosDecodeFixedI32(cursor, &rsp->code);

  if (rsp->type == UDF_TASK_SETUP) {
    cursor = decodeUdfSetupResponse(cursor, &rsp->setupRsp);
  } else if (rsp->type == UDF_TASK_CALL) {
    cursor = decodeUdfCallResponse(cursor, &rsp->callRsp);
  } else if (rsp->type == UDF_TASK_TEARDOWN) {
    cursor = decodeUdfTeardownResponse(cursor, &rsp->teardownRsp);
  } else {
    rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR;
    fnError("decode udf response, invalid udf response type %d", rsp->type);
  }

  if (cursor == NULL) {
    rsp->code = terrno;
    fnError("decode udf response failed, code:0x%x", rsp->code);
  }
  return (void *)cursor;
}
874

875
/*
 * Release the buffers owned by one UDF column and reset the pointers to NULL.
 * meta->type selects which pair of buffers is in use: offsets + payload for
 * variable-length types, null bitmap + data for fixed-length types.
 */
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
  if (!IS_VAR_DATA_TYPE(meta->type)) {
    taosMemoryFree(data->fixLenCol.nullBitmap);
    data->fixLenCol.nullBitmap = NULL;
    taosMemoryFree(data->fixLenCol.data);
    data->fixLenCol.data = NULL;
    return;
  }

  taosMemoryFree(data->varLenCol.varOffsets);
  data->varLenCol.varOffsets = NULL;
  taosMemoryFree(data->varLenCol.payload);
  data->varLenCol.payload = NULL;
}
58,758✔
888

889
/* Free the data buffers of a UDF column; the SUdfColumn itself is not freed. */
void freeUdfColumn(SUdfColumn *col) {
  SUdfColumnData *colData = &col->colData;
  SUdfColumnMeta *colMeta = &col->colMeta;
  freeUdfColumnData(colData, colMeta);
}
58,765✔
890

891
/*
 * Free every column of a UDF data block, then the column pointer array.
 *
 * Tolerates partially built blocks: convertDataBlockToUdfDataBlock() sets
 * numOfCols before allocating, and on an allocation failure it returns with
 * udfCols == NULL or with the remaining entries still NULL. Such blocks are
 * released without dereferencing the missing pieces.
 */
void freeUdfDataDataBlock(SUdfDataBlock *block) {
  if (block == NULL || block->udfCols == NULL) {
    return;
  }

  for (int32_t i = 0; i < block->numOfCols; ++i) {
    if (block->udfCols[i] == NULL) {
      continue;  // column slot was never allocated
    }
    freeUdfColumn(block->udfCols[i]);
    taosMemoryFree(block->udfCols[i]);
    block->udfCols[i] = NULL;
  }
  taosMemoryFree(block->udfCols);
  block->udfCols = NULL;
}
30,318✔
900

901
/* Release the heap buffer held by an inter buffer and clear the pointer. */
void freeUdfInterBuf(SUdfInterBuf *buf) {
  SUdfInterBuf *interBuf = buf;

  taosMemoryFree(interBuf->buf);
  interBuf->buf = NULL;
}
8,809✔
905

906
/*
 * Deep-copy an SSDataBlock into a freshly allocated SUdfDataBlock.
 *
 * For each column the meta data is copied, then either
 *   - variable-length types: the offset array and the payload, or
 *   - fixed-length types:    the null bitmap and the data array.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when an allocation fails. On failure
 * the block is left partially built (numOfCols already set, later column
 * slots still NULL); releasing it is left to the caller.
 */
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
  udfBlock->numOfRows = block->info.rows;
  udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
  udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
  if ((udfBlock->udfCols) == NULL) {
    return terrno;
  }
  for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
    udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
    if (udfBlock->udfCols[i] == NULL) {
      return terrno;
    }
    SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
    SUdfColumn      *udfCol = udfBlock->udfCols[i];
    udfCol->colMeta.type = col->info.type;
    udfCol->colMeta.bytes = col->info.bytes;
    udfCol->colMeta.scale = col->info.scale;
    udfCol->colMeta.precision = col->info.precision;
    udfCol->colData.numOfRows = udfBlock->numOfRows;
    udfCol->hasNull = col->hasNull;
    if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
      udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
      udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
      if (udfCol->colData.varLenCol.varOffsets == NULL) {
        return terrno;
      }
      memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
      udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
      udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
      if (udfCol->colData.varLenCol.payload == NULL) {
        return terrno;
      }
      if (col->reassigned) {
        // Gather the rows one by one through a separate write cursor. The
        // stored payload pointer must keep pointing at the start of the
        // allocation: it is later read from and passed to taosMemoryFree().
        // NOTE(review): varOffsets above is copied unchanged from the source
        // column; confirm it is meant to index this gathered payload.
        char *dst = udfCol->colData.varLenCol.payload;
        for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
          char   *pColData = col->pData + col->varmeta.offset[row];
          int32_t colSize = 0;
          if (col->info.type == TSDB_DATA_TYPE_JSON) {
            colSize = getJsonValueLen(pColData);
          } else {
            colSize = varDataTLen(pColData);
          }
          memcpy(dst, pColData, colSize);
          dst += colSize;
        }
      } else {
        memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
      }
    } else {
      udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
      int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
      udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
      if (udfCol->colData.fixLenCol.nullBitmap == NULL) {
        return terrno;
      }
      char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
      memcpy(bitmap, col->nullbitmap, bitmapLen);
      udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
      int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
      udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
      if (NULL == udfCol->colData.fixLenCol.data) {
        return terrno;
      }
      char *data = udfCol->colData.fixLenCol.data;
      memcpy(data, col->pData, dataLen);
    }
  }
  return TSDB_CODE_SUCCESS;
}
974

975
/*
 * Append one column described by udfCol to block and fill it row by row,
 * copying NULL markers and values from the UDF column. On success
 * block->info.rows is set to the number of rows in the UDF column.
 *
 * Returns 0 on success, otherwise the error code of the step that failed
 * (it is also logged together with the failing line).
 */
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
  int32_t         code = 0, lino = 0;
  SUdfColumnMeta *meta = &udfCol->colMeta;

  SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
  code = blockDataAppendColInfo(block, &colInfoData);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  SColumnInfoData *col = NULL;
  code = bdGetColumnInfoData(block, 0, &col);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) {
    if (udfColDataIsNull(udfCol, i)) {
      colDataSetNULL(col, i);
    } else {
      char *data = udfColDataGetData(udfCol, i);
      code = colDataSetVal(col, i, data, false);
      TAOS_CHECK_GOTO(code, &lino, _exit);
    }
  }
  block->info.rows = udfCol->colData.numOfRows;

_exit:
  if (code != 0) {
    fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino);
  }
  // propagate the failure instead of reporting success for a half-built block
  return code;
}
1007

1008
/*
 * Build a data block from numOfCols scalar parameters.
 *
 * The block gets one column per parameter and as many rows as the longest
 * parameter. A parameter with fewer rows is padded by repeating its last
 * row (value or NULL) until it reaches that height.
 *
 * Returns 0 on success, otherwise the error of the failing step (logged
 * together with the failing line).
 */
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
  int32_t code = 0, lino = 0;

  // height of the output block = tallest input column
  int32_t maxRows = 0;
  for (int32_t c = 0; c < numOfCols; ++c) {
    if (input[c].numOfRows > maxRows) {
      maxRows = input[c].numOfRows;
    }
  }

  // register one column per parameter, copying only its column info
  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData schemaOnly = {0};
    schemaOnly.info = input[c].columnData->info;

    TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &schemaOnly), &lino, _exit);
  }

  TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, maxRows), &lino, _exit);

  for (int32_t c = 0; c < numOfCols; ++c) {
    SColumnInfoData *pDst = taosArrayGet(output->pDataBlock, c);
    SColumnInfoData *pSrc = input[c].columnData;

    TAOS_CHECK_GOTO(colDataAssign(pDst, pSrc, input[c].numOfRows, &output->info), &lino, _exit);

    if (input[c].numOfRows >= maxRows) {
      continue;  // already full height, nothing to pad
    }

    int32_t firstPadRow = input[c].numOfRows;
    int32_t padRows = maxRows - firstPadRow;
    bool    lastIsNull = colDataIsNull_s(pSrc, input[c].numOfRows - 1);
    if (lastIsNull) {
      colDataSetNNULL(pDst, firstPadRow, padRows);
    } else {
      char *lastVal = colDataGetData(pSrc, input[c].numOfRows - 1);
      for (int32_t k = 0; k < padRows; ++k) {
        TAOS_CHECK_GOTO(colDataSetVal(pDst, firstPadRow + k, lastVal, false), &lino, _exit);
      }
    }
  }

  output->info.rows = maxRows;
_exit:
  if (code != 0) {
    fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino);
  }
  return code;
}
1054

1055
/*
 * Turn a single-column data block into a scalar parameter.
 *
 * The column descriptor is copied byte-wise into a newly allocated
 * SColumnInfoData and output->colAlloced is set. A block that does not have
 * exactly one column is logged and leaves output untouched while still
 * returning 0.
 *
 * Returns 0, or terrno when the allocation fails.
 */
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
  if (taosArrayGetSize(input->pDataBlock) != 1) {
    fnError("scalar function only support one column");
    return 0;
  }

  output->numOfRows = input->info.rows;

  SColumnInfoData *colCopy = taosMemoryMalloc(sizeof(SColumnInfoData));
  output->columnData = colCopy;
  if (colCopy == NULL) {
    return terrno;
  }

  // shallow copy: pointers inside the descriptor still refer to input's buffers
  memcpy(colCopy, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
  output->colAlloced = true;
  return 0;
}
1071

1072
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
1073
// memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
// Header placed at the start of an aggregate UDF's result cell. The two
// pointers are re-derived from the header address by udfAggInit,
// udfAggProcess and udfAggFinalize before each use.
typedef struct SUdfAggRes {
  int8_t  finalResNum;     // result count reported by the finalize call
  int8_t  interResNum;     // result count reported by the latest init/process call
  int32_t interResBufLen;  // number of bytes currently used in interResBuf
  char   *finalResBuf;     // points right behind this header
  char   *interResBuf;     // points behind the final-result area (header + session->bytes)
} SUdfAggRes;
1081

1082
void    onUdfcPipeClose(uv_handle_t *handle);
1083
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
1084
void    udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
1085
bool    isUdfcUvMsgComplete(SClientConnBuf *connBuf);
1086
void    udfcUvHandleRsp(SClientUvConn *conn);
1087
void    udfcUvHandleError(SClientUvConn *conn);
1088
void    onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
1089
void    onUdfcPipeWrite(uv_write_t *write, int32_t status);
1090
void    onUdfcPipeConnect(uv_connect_t *connect, int32_t status);
1091
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
1092
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
1093
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
1094
void    udfcAsyncTaskCb(uv_async_t *async);
1095
void    cleanUpUvTasks(SUdfcProxy *udfc);
1096
void    udfStopAsyncCb(uv_async_t *async);
1097
void    constructUdfService(void *argsThread);
1098
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
1099
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
1100
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2);
1101
int32_t doTeardownUdf(UdfcFuncHandle handle);
1102

1103
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
1104
                SSDataBlock *output, SUdfInterBuf *newState);
1105
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
1106
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
1107
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
1108
                          SUdfInterBuf *resultBuf);
1109
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
1110
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1111
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
1112

1113
int32_t udfcOpen();
1114
int32_t udfcClose();
1115

1116
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
1117
void    releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
1118
int32_t cleanUpUdfs();
1119

1120
bool    udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
1121
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
1122
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
1123
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
1124

1125
void    cleanupNotExpiredUdfs();
1126
void    cleanupExpiredUdfs();
1127
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
3,876✔
1128
  SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
3,876✔
1129
  SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
3,876✔
1130
  return strcmp(stub1->udfName, stub2->udfName);
3,876✔
1131
}
1132

1133
/*
 * Obtain a handle for the named UDF, reusing a cached one when possible.
 *
 * Under udfStubsMutex the stub cache is searched by name:
 *   - a stub younger than 10 * 1000 * 1000 timestamp units (microseconds,
 *     going by taosGetTimestampUs) with a live pipe is returned after its
 *     refCount is bumped;
 *   - a young stub whose handle or pipe is gone is dropped from the cache;
 *   - an older stub is moved to the expired list.
 * In the last two cases, and when nothing was cached, the UDF is set up
 * again outside the lock and the new stub (refCount 1) is cached.
 *
 * Returns 0 with *pHandle set on success; on setup failure returns the
 * setup error with *pHandle == NULL.
 */
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
  int32_t code = 0;

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  SUdfcFuncStub key = {0};
  tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
  int32_t stubIndex = taosArraySearchIdx(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
  if (stubIndex != -1) {
    SUdfcFuncStub *cached = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
    UdfcFuncHandle cachedHandle = cached->handle;
    int64_t        nowUs = taosGetTimestampUs();
    bool           expired = (nowUs - cached->createTime) >= 10 * 1000 * 1000;

    if (expired) {
      fnDebug("udf handle expired for %s, will setup udf. move it to expired list", udfName);
      if (taosArrayPush(gUdfcProxy.expiredUdfStubs, cached) == NULL) {
        fnError("acquireUdfFuncHandle: failed to push udf stub to array");
      } else {
        taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
        taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
      }
    } else if (cachedHandle != NULL && ((SUdfcUvSession *)cachedHandle)->udfUvPipe != NULL) {
      // fast path: cached session is still connected
      *pHandle = cached->handle;
      ++cached->refCount;
      uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
      return 0;
    } else {
      fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
             cached->refCount, cached->createTime);
      taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
    }
  }
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);

  // slow path: set the UDF up without holding the cache lock
  *pHandle = NULL;
  code = doSetupUdf(udfName, pHandle);
  if (code != TSDB_CODE_SUCCESS) {
    *pHandle = NULL;
    return code;
  }

  SUdfcFuncStub stub = {0};
  tstrncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN);
  stub.handle = *pHandle;
  ++stub.refCount;
  stub.createTime = taosGetTimestampUs();

  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
  if (taosArrayPush(gUdfcProxy.udfStubs, &stub) == NULL) {
    fnError("acquireUdfFuncHandle: failed to push udf stub to array");
  } else {
    // keep the cache sorted for the search above
    taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
  }
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);

  return code;
}
1190

1191
/*
 * Drop one reference previously taken by acquireUdfFuncHandle().
 *
 * Both the active and the expired stub list are searched by name under
 * udfStubsMutex; a matching stub is decremented only when it owns the given
 * handle and its refCount is still positive.
 */
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);

  SUdfcFuncStub key = {0};
  tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);

  SUdfcFuncStub *active = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
  SUdfcFuncStub *expired = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ);

  if (active != NULL || expired != NULL) {
    if (active != NULL && active->handle == handle && active->refCount > 0) {
      --active->refCount;
    }
    if (expired != NULL && expired->handle == handle && expired->refCount > 0) {
      --expired->refCount;
    }
  }

  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
}
1209

1210
void cleanupExpiredUdfs() {
274✔
1211
  int32_t i = 0;
274✔
1212
  SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
274✔
1213
  if (expiredUdfStubs == NULL) {
274!
1214
    fnError("cleanupExpiredUdfs: failed to init array");
×
1215
    return;
×
1216
  }
1217
  while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
274!
1218
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
×
1219
    if (stub->refCount == 0) {
×
1220
      fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle,
×
1221
             stub->refCount);
1222
      (void)doTeardownUdf(stub->handle);
×
1223
    } else {
1224
      fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p",
×
1225
             stub->udfName, stub->refCount, stub->createTime, stub->handle);
1226
      UdfcFuncHandle handle = stub->handle;
×
1227
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
×
1228
        if (taosArrayPush(expiredUdfStubs, stub) == NULL) {
×
1229
          fnError("cleanupExpiredUdfs: failed to push udf stub to array");
×
1230
        }
1231
      } else {
1232
        fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
×
1233
               stub->udfName, stub->refCount, stub->createTime);
1234
      }
1235
    }
1236
    ++i;
×
1237
  }
1238
  taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
274✔
1239
  gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
274✔
1240
}
1241

1242
void cleanupNotExpiredUdfs() {
274✔
1243
  SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
274✔
1244
  if (udfStubs == NULL) {
274!
1245
    fnError("cleanupNotExpiredUdfs: failed to init array");
×
1246
    return;
×
1247
  }
1248
  int32_t i = 0;
274✔
1249
  while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
617✔
1250
    SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
343✔
1251
    if (stub->refCount == 0) {
343✔
1252
      fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
130!
1253
      (void)doTeardownUdf(stub->handle);
130✔
1254
    } else {
1255
      fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
213!
1256
             stub->refCount, stub->createTime, stub->handle);
1257
      UdfcFuncHandle handle = stub->handle;
213✔
1258
      if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
213!
1259
        if (taosArrayPush(udfStubs, stub) == NULL) {
213!
1260
          fnError("cleanupNotExpiredUdfs: failed to push udf stub to array");
×
1261
        }
1262
      } else {
1263
        fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName,
×
1264
               stub->refCount, stub->createTime);
1265
      }
1266
    }
1267
    ++i;
343✔
1268
  }
1269
  taosArrayDestroy(gUdfcProxy.udfStubs);
274✔
1270
  gUdfcProxy.udfStubs = udfStubs;
274✔
1271
}
1272

1273
int32_t cleanUpUdfs() {
6,257,982✔
1274
  int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
6,257,982✔
1275
  if (!initialized) {
6,258,748✔
1276
    return TSDB_CODE_SUCCESS;
90,232✔
1277
  }
1278

1279
  uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
6,168,516✔
1280
  if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
6,170,165!
1281
      (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
6,169,891!
1282
    uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
6,169,891✔
1283
    return TSDB_CODE_SUCCESS;
6,169,877✔
1284
  }
1285

1286
  cleanupNotExpiredUdfs();
274✔
1287
  cleanupExpiredUdfs();
274✔
1288

1289
  uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
274✔
1290
  return 0;
274✔
1291
}
1292

1293
/*
 * Run a scalar UDF by name: acquire a handle, invoke the function, check
 * that the produced column matches the output type and byte width recorded
 * in the session, then release the handle.
 *
 * Returns 0 on success, the acquire/call error on failure, or
 * TSDB_CODE_UDF_INVALID_OUTPUT_TYPE when the output column is missing or of
 * the wrong type/size.
 */
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  UdfcFuncHandle handle = NULL;
  int32_t        code = acquireUdfFuncHandle(udfName, &handle);
  if (code != 0) {
    return code;
  }

  code = doCallUdfScalarFunc(handle, input, numOfCols, output);
  if (code != TSDB_CODE_SUCCESS) {
    fnError("udfc scalar function execution failure");
  } else if (output->columnData == NULL) {
    fnError("udfc scalar function calculate error. no column data");
    code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
  } else {
    SUdfcUvSession *session = handle;
    bool typeMatches = (session->outputType == output->columnData->info.type);
    bool sizeMatches = (session->bytes == output->columnData->info.bytes);
    if (!typeMatches || !sizeMatches) {
      fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
              session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes);
      code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
    }
  }

  // every path past a successful acquire gives the reference back
  releaseUdfFuncHandle(udfName, handle);
  return code;
}
1321

1322
/*
 * Report the result-cell size an aggregate UDF needs: the SUdfAggRes header,
 * the final result (resType.bytes) and the intermediate buffer (udfBufSize).
 * Returns false without touching pEnv when the function id is a scalar function.
 */
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
  bool isAggregate = !fmIsScalarFunc(pFunc->funcId);
  if (isAggregate) {
    pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
  }
  return isAggregate;
}
1329

1330
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
233✔
1331
  if (pResultCellInfo->initialized) {
233!
1332
    return TSDB_CODE_SUCCESS;
×
1333
  }
1334
  if (functionSetup(pCtx, pResultCellInfo) != TSDB_CODE_SUCCESS) {
233!
1335
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1336
  }
1337
  UdfcFuncHandle handle;
1338
  int32_t        udfCode = 0;
233✔
1339
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
233!
1340
    fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
×
1341
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1342
  }
1343
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
233✔
1344
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
233✔
1345
  int32_t         envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize;
233✔
1346
  memset(udfRes, 0, envSize);
233✔
1347

1348
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
233✔
1349
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
233✔
1350

1351
  SUdfInterBuf buf = {0};
233✔
1352
  if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
233!
1353
    fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
×
1354
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
1355
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1356
  }
1357
  if (buf.bufLen <= session->bufSize) {
233!
1358
    memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
233✔
1359
    udfRes->interResBufLen = buf.bufLen;
233✔
1360
    udfRes->interResNum = buf.numOfResult;
233✔
1361
  } else {
1362
    fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
×
1363
    releaseUdfFuncHandle(pCtx->udfName, handle);
×
1364
    return TSDB_CODE_FUNC_SETUP_ERROR;
×
1365
  }
1366
  releaseUdfFuncHandle(pCtx->udfName, handle);
233✔
1367
  freeUdfInterBuf(&buf);
233✔
1368
  return TSDB_CODE_SUCCESS;
233✔
1369
}
1370

1371
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
502✔
1372
  int32_t        udfCode = 0;
502✔
1373
  UdfcFuncHandle handle = 0;
502✔
1374
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
502!
1375
    fnError("udfAggProcess  error. step acquireUdfFuncHandle. udf code: %d", udfCode);
×
1376
    return udfCode;
×
1377
  }
1378

1379
  SUdfcUvSession *session = handle;
502✔
1380
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
502✔
1381
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
502✔
1382
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
502✔
1383

1384
  SInputColumnInfoData *pInput = &pCtx->input;
502✔
1385
  int32_t               numOfCols = pInput->numOfInputCols;
502✔
1386
  int32_t               start = pInput->startRowIndex;
502✔
1387
  int32_t               numOfRows = pInput->numOfRows;
502✔
1388
  SSDataBlock          *pTempBlock = NULL;
502✔
1389
  int32_t               code = createDataBlock(&pTempBlock);
502✔
1390

1391
  if (code) {
502!
1392
    return code;
×
1393
  }
1394

1395
  pTempBlock->info.rows = pInput->totalRows;
502✔
1396
  pTempBlock->info.id.uid = pInput->uid;
502✔
1397
  for (int32_t i = 0; i < numOfCols; ++i) {
1,054✔
1398
    if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) {
552!
1399
      fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode);
×
1400
      blockDataDestroy(pTempBlock);
×
1401
      return udfCode;
×
1402
    }
1403
  }
1404

1405
  SSDataBlock *inputBlock = NULL;
502✔
1406
  code = blockDataExtractBlock(pTempBlock, start, numOfRows, &inputBlock);
502✔
1407
  if (code) {
502!
1408
    return code;
×
1409
  }
1410

1411
  SUdfInterBuf state = {
502✔
1412
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
502✔
1413
  SUdfInterBuf newState = {0};
502✔
1414

1415
  udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
502✔
1416
  if (udfCode != 0) {
502!
1417
    fnError("udfAggProcess error. code: %d", udfCode);
×
1418
    newState.numOfResult = 0;
×
1419
  } else {
1420
    if (newState.bufLen <= session->bufSize) {
502!
1421
      memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
502✔
1422
      udfRes->interResBufLen = newState.bufLen;
502✔
1423
      udfRes->interResNum = newState.numOfResult;
502✔
1424
    } else {
1425
      fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
×
1426
      udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
×
1427
    }
1428
  }
1429

1430
  GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum;
502✔
1431

1432
  blockDataDestroy(inputBlock);
502✔
1433

1434
  taosArrayDestroy(pTempBlock->pDataBlock);
502✔
1435
  taosMemoryFree(pTempBlock);
502✔
1436

1437
  releaseUdfFuncHandle(pCtx->udfName, handle);
502✔
1438
  freeUdfInterBuf(&newState);
502✔
1439
  return udfCode;
502✔
1440
}
1441

1442
/*
 * Run the finalize step of an aggregate UDF and publish its result.
 *
 * The intermediate state held in the result cell is handed to the UDF; a
 * non-empty result that fits into session->bytes is copied into the cell's
 * final-result area. functionFinalizeWithResultBuf() is then invoked in
 * every case and the handle is released.
 *
 * Returns the value of functionFinalizeWithResultBuf() when the UDF call
 * succeeded, otherwise the UDF error (TSDB_CODE_UDF_INVALID_OUTPUT_TYPE for
 * an oversized result).
 */
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
  int32_t        udfCode = 0;
  UdfcFuncHandle handle = 0;
  if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
    // message names this function (it previously said udfAggProcess)
    fnError("udfAggFinalize error. step acquireUdfFuncHandle. udf code: %d", udfCode);
    return udfCode;
  }

  SUdfcUvSession *session = handle;
  SUdfAggRes     *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
  udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
  udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;

  SUdfInterBuf resultBuf = {0};
  SUdfInterBuf state = {
      .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
  int32_t udfCallCode = 0;
  udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
  if (udfCallCode != 0) {
    fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
    GET_RES_INFO(pCtx)->numOfRes = 0;
  } else {
    if (resultBuf.numOfResult == 0) {
      udfRes->finalResNum = 0;
      GET_RES_INFO(pCtx)->numOfRes = 0;
    } else {
      if (resultBuf.bufLen <= session->bytes) {
        memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
        udfRes->finalResNum = resultBuf.numOfResult;
        GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
      } else {
        fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
        GET_RES_INFO(pCtx)->numOfRes = 0;
        udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
      }
    }
  }

  freeUdfInterBuf(&resultBuf);

  int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
  releaseUdfFuncHandle(pCtx->udfName, handle);
  return udfCallCode == 0 ? numOfResults : udfCallCode;
}
1486

1487
/*
 * libuv close callback for a udfc pipe handle.
 *
 * Wakes the task at the head of the connection's task queue (only that one),
 * clears the owning session's pipe pointer under gUdfcProxy.udfcUvMutex, and
 * then frees the read buffer, the connection object and the pipe itself.
 */
void onUdfcPipeClose(uv_handle_t *handle) {
  SClientUvConn *pConn = handle->data;

  if (!QUEUE_EMPTY(&pConn->taskQueue)) {
    QUEUE             *head = QUEUE_HEAD(&pConn->taskQueue);
    SClientUvTaskNode *waiter = QUEUE_DATA(head, SClientUvTaskNode, connTaskQueue);
    waiter->errCode = 0;
    QUEUE_REMOVE(&waiter->procTaskQueue);
    uv_sem_post(&waiter->taskSem);
  }

  // The session pointer is shared with request threads; touch it only under the mutex.
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  if (pConn->session != NULL) {
    pConn->session->udfUvPipe = NULL;
  }
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);

  taosMemoryFree(pConn->readBuf.buf);
  taosMemoryFree(pConn);
  taosMemoryFree((uv_pipe_t *)handle);
}
130✔
1505

1506
/*
 * Extract the outcome of a finished uv task into the client task.
 *
 * For UV_TASK_REQ_RSP with a response buffer, the response is decoded, its
 * payload is copied into the matching task->_setup/_call/_teardown rsp slot
 * and the response buffer is released. In every other case the uv task's
 * errCode is the result.
 *
 * Returns 0 on success or the non-zero code carried by the response / uv task.
 */
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
  int32_t code = 0;
  fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
  if (uvTask->type == UV_TASK_REQ_RSP) {
    if (uvTask->rspBuf.base != NULL) {
      SUdfResponse rsp = {0};
      // The returned cursor is not needed here; only the decoded struct is used.
      (void)decodeUdfResponse(uvTask->rspBuf.base, &rsp);
      code = rsp.code;
      if (code != 0) {
        fnError("udfc get udf task result failure. code: %d", code);
      }

      switch (task->type) {
        case UDF_TASK_SETUP: {
          task->_setup.rsp = rsp.setupRsp;
          break;
        }
        case UDF_TASK_CALL: {
          task->_call.rsp = rsp.callRsp;
          break;
        }
        case UDF_TASK_TEARDOWN: {
          task->_teardown.rsp = rsp.teardownRsp;
          break;
        }
        default: {
          break;
        }
      }

      // TODO: the call buffer is setup and freed by udf invocation
      taosMemoryFree(uvTask->rspBuf.base);
      uvTask->rspBuf.base = NULL;  // do not leave a dangling pointer in the task node
    } else {
      code = uvTask->errCode;
      if (code != 0) {
        fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
      }
    }
  } else if (uvTask->type == UV_TASK_CONNECT) {
    code = uvTask->errCode;
    if (code != 0) {
      fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
    }
  } else if (uvTask->type == UV_TASK_DISCONNECT) {
    code = uvTask->errCode;
    if (code != 0) {
      fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
    }
  }
  return code;
}
1557

1558
/*
 * libuv alloc callback for the udfc pipe.
 *
 * Three phases, driven by the connection's read buffer state:
 *   1. cap == 0: allocate room for the message head (int32 length + int64 seq).
 *   2. total == -1 and the head is still incomplete: keep filling the head.
 *   3. otherwise: grow the buffer to the announced total and read the remainder.
 *
 * On allocation failure buf is set to {NULL, 0}. The bookkeeping (cap) is only
 * updated once the re-allocation has succeeded, so cap never exceeds the size
 * of the memory actually owned by the buffer.
 */
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
  SClientUvConn  *conn = handle->data;
  SClientConnBuf *connBuf = &conn->readBuf;

  int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
  if (connBuf->cap == 0) {
    connBuf->buf = taosMemoryMalloc(msgHeadSize);
    if (connBuf->buf) {
      connBuf->len = 0;
      connBuf->cap = msgHeadSize;
      connBuf->total = -1;

      buf->base = connBuf->buf;
      buf->len = connBuf->cap;
    } else {
      fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
      buf->base = NULL;
      buf->len = 0;
    }
  } else if (connBuf->total == -1 && connBuf->len < msgHeadSize) {
    buf->base = connBuf->buf + connBuf->len;
    buf->len = msgHeadSize - connBuf->len;
  } else {
    int32_t newCap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
    void   *resultBuf = taosMemoryRealloc(connBuf->buf, newCap);
    if (resultBuf) {
      connBuf->buf = resultBuf;
      connBuf->cap = newCap;
      buf->base = connBuf->buf + connBuf->len;
      buf->len = connBuf->cap - connBuf->len;
    } else {
      // The old block is still valid and still cap bytes long; leave cap untouched.
      fnError("udfc re-allocate buffer failure. size: %d", newCap);
      buf->base = NULL;
      buf->len = 0;
    }
  }

  fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
}
4,188✔
1596

1597
/*
 * Report whether the read buffer holds exactly one complete message.
 *
 * As a side effect, once at least sizeof(int32_t) bytes are present and the
 * total length is still unknown (-1), the total is read from the first four
 * bytes of the buffer.
 */
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
  bool lengthUnknown = (connBuf->total == -1);
  if (lengthUnknown && connBuf->len >= sizeof(int32_t)) {
    connBuf->total = *(int32_t *)(connBuf->buf);
  }

  if (connBuf->len != connBuf->cap || connBuf->total != connBuf->cap) {
    return false;
  }

  fnDebug("udfc complete message is received, now handle it");
  return true;
}
1607

1608
/*
 * Dispatch a fully received response to the task waiting for it.
 *
 * The sequence number is read from the message head (int32 length, then int64
 * seqnum). The first queued task with that sequence number receives ownership
 * of the read buffer and is woken up. If no task takes the buffer it is freed
 * here. In every case the connection's read buffer is reset so the next
 * message starts from a clean state.
 */
void udfcUvHandleRsp(SClientUvConn *conn) {
  SClientConnBuf *connBuf = &conn->readBuf;
  int64_t         seqNum = 0;
  // msglen then seqnum; copy byte-wise because the seqnum sits at offset 4 and is not 8-byte aligned.
  memcpy(&seqNum, connBuf->buf + sizeof(int32_t), sizeof(seqNum));

  SClientUvTaskNode *taskFound = NULL;
  if (QUEUE_EMPTY(&conn->taskQueue)) {
    fnError("udfc no task waiting on connection. response seqnum:%" PRId64, seqNum);
  } else {
    for (QUEUE *h = QUEUE_NEXT(&conn->taskQueue); h != &conn->taskQueue; h = QUEUE_NEXT(h)) {
      SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
      fnDebug("udfc handle response iterate through queue. uvTask:%" PRId64 "-%p", task->seqNum, task);
      if (task->seqNum != seqNum) {
        continue;
      }
      if (taskFound == NULL) {
        taskFound = task;
      } else {
        // Keep the first match; the iterator still advances, so the scan terminates.
        fnError("udfc more than one task waiting for the same response");
      }
    }
    if (taskFound == NULL) {
      fnError("no task is waiting for the response.");
    }
  }

  if (taskFound != NULL) {
    // Ownership of the buffer moves to the task; the waiter frees it.
    taskFound->rspBuf = uv_buf_init(connBuf->buf, connBuf->len);
    QUEUE_REMOVE(&taskFound->connTaskQueue);
    QUEUE_REMOVE(&taskFound->procTaskQueue);
    uv_sem_post(&taskFound->taskSem);
  } else {
    // Nobody took the message: release it instead of leaking it.
    taosMemoryFree(connBuf->buf);
  }

  connBuf->buf = NULL;
  connBuf->total = -1;
  connBuf->len = 0;
  connBuf->cap = 0;
}
1649

1650
/*
 * Fail every task queued on the connection with TSDB_CODE_UDF_PIPE_READ_ERR,
 * wake their waiters, and start closing the pipe unless it is already closing.
 */
void udfcUvHandleError(SClientUvConn *conn) {
  fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);

  while (!QUEUE_EMPTY(&conn->taskQueue)) {
    QUEUE             *head = QUEUE_HEAD(&conn->taskQueue);
    SClientUvTaskNode *failed = QUEUE_DATA(head, SClientUvTaskNode, connTaskQueue);
    failed->errCode = TSDB_CODE_UDF_PIPE_READ_ERR;
    QUEUE_REMOVE(&failed->connTaskQueue);
    QUEUE_REMOVE(&failed->procTaskQueue);
    uv_sem_post(&failed->taskSem);
  }

  uv_handle_t *pipeHandle = (uv_handle_t *)conn->pipe;
  if (uv_is_closing(pipeHandle)) {
    return;
  }
  uv_close(pipeHandle, onUdfcPipeClose);
}
×
1664

1665
/*
 * libuv read callback for the udfc pipe.
 *
 * nread == 0 is ignored. A negative nread (including UV_EOF) is logged and
 * handled as a connection error. A positive nread extends the read buffer and,
 * once a whole message is buffered, dispatches the response.
 */
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
  fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
  if (nread == 0) return;

  SClientUvConn *conn = client->data;

  if (nread < 0) {
    fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
    if (nread == UV_EOF) {
      fnError("\tudfc client pipe %p closed", client);
    }
    udfcUvHandleError(conn);
    return;
  }

  SClientConnBuf *connBuf = &conn->readBuf;
  connBuf->len += nread;
  if (isUdfcUvMsgComplete(connBuf)) {
    udfcUvHandleRsp(conn);
  }
}
1685

1686
/*
 * libuv write-completion callback. A negative status is logged and escalated
 * to the connection error handler; the write request is always freed.
 */
void onUdfcPipeWrite(uv_write_t *write, int32_t status) {
  SClientUvConn *conn = write->data;

  if (status >= 0) {
    fnDebug("udfc client connection %p write succeed", conn);
  } else {
    fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
    udfcUvHandleError(conn);
  }

  taosMemoryFree(write);
}
1,396✔
1696

1697
/*
 * libuv connect callback for a UV_TASK_CONNECT task.
 *
 * Records the connect status in the task. Reading is only started when the
 * connect succeeded, so a connect failure is reported with its own status
 * instead of being overwritten by a follow-up uv_read_start error. The connect
 * request is freed and the waiting thread is woken in every case.
 *
 * NOTE(review): on failure the pipe and its SClientUvConn allocated by the
 * connect task are not released in this function — confirm who owns that cleanup.
 */
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) {
  SClientUvTaskNode *uvTask = connect->data;
  uvTask->errCode = status;

  if (status != 0) {
    fnError("client connect error, task seq: %" PRId64 ", code: %s", uvTask->seqNum, uv_strerror(status));
  } else {
    int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
    if (code != 0) {
      fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code));
      uvTask->errCode = code;
    }
  }

  taosMemoryFree(connect);
  QUEUE_REMOVE(&uvTask->procTaskQueue);
  uv_sem_post(&uvTask->taskSem);
}
131✔
1713

1714
/*
 * Prepare a uv task node for the given client task.
 *
 *   UV_TASK_CONNECT    - nothing besides the common fields.
 *   UV_TASK_REQ_RSP    - encodes the setup/call/teardown request into a freshly
 *                        allocated uvTask->reqBuf and assigns a new sequence number.
 *   UV_TASK_DISCONNECT - records the session's pipe.
 *
 * Finally the task semaphore is initialised. On any failure uvTask->reqBuf.base
 * is either never set or is freed AND reset to NULL, so the caller may
 * unconditionally free it again.
 *
 * Returns 0 on success, terrno on allocation failure, or
 * TSDB_CODE_UDF_UV_EXEC_FAILURE for encode / semaphore / invalid-type errors.
 */
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask) {
  uvTask->type = uvTaskType;
  uvTask->udfc = task->session->udfc;

  if (uvTaskType == UV_TASK_REQ_RSP) {
    uvTask->pipe = task->session->udfUvPipe;
    SUdfRequest request = {0};  // never encode indeterminate bytes
    request.type = task->type;
    request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);

    if (task->type == UDF_TASK_SETUP) {
      request.setup = task->_setup.req;
      request.type = UDF_TASK_SETUP;
    } else if (task->type == UDF_TASK_CALL) {
      request.call = task->_call.req;
      request.type = UDF_TASK_CALL;
    } else if (task->type == UDF_TASK_TEARDOWN) {
      request.teardown = task->_teardown.req;
      request.type = UDF_TASK_TEARDOWN;
    } else {
      fnError("udfc create uv task, invalid task type : %d", task->type);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }

    // First pass with a NULL buffer only computes the encoded size.
    int32_t bufLen = encodeUdfRequest(NULL, &request);
    if (bufLen <= 0) {
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }
    request.msgLen = bufLen;
    void *bufBegin = taosMemoryMalloc(bufLen);
    if (bufBegin == NULL) {
      fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen);
      return terrno;
    }
    void *buf = bufBegin;
    if (encodeUdfRequest(&buf, &request) <= 0) {
      fnError("udfc create uv task, encode request failed. size: %d", bufLen);
      taosMemoryFree(bufBegin);
      return TSDB_CODE_UDF_UV_EXEC_FAILURE;
    }

    uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
    uvTask->seqNum = request.seqNum;
  } else if (uvTaskType == UV_TASK_DISCONNECT) {
    uvTask->pipe = task->session->udfUvPipe;
  }

  if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
    if (uvTaskType == UV_TASK_REQ_RSP) {
      taosMemoryFree(uvTask->reqBuf.base);
      uvTask->reqBuf.base = NULL;  // the caller frees reqBuf.base again on its exit path
    }
    fnError("udfc create uv task, init semaphore failed.");
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  return 0;
}
1770

1771
/*
 * Hand a prepared uv task to the event loop thread and block until it is done.
 *
 * The task is appended to the proxy's task queue under taskQueueMutex, the
 * loop is woken through loopTaskAync, and the caller waits on the task
 * semaphore, which is destroyed once the wait returns.
 *
 * Returns 0 after the task completed, or TSDB_CODE_UDF_UV_EXEC_FAILURE when
 * the wake-up could not be sent.
 *
 * NOTE(review): when uv_async_send fails the task stays linked in the proxy
 * queue and its semaphore is not destroyed — confirm the caller's handling.
 */
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
  fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
  SUdfcProxy *proxy = uvTask->udfc;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_INSERT_TAIL(&proxy->taskQueue, &uvTask->recvTaskQueue);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  int32_t sendRc = uv_async_send(&proxy->loopTaskAync);
  if (sendRc != 0) {
    fnError("udfc queue uv task to event loop failed. code: %s", uv_strerror(sendRc));
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
  }

  uv_sem_wait(&uvTask->taskSem);
  fnDebug("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
  uv_sem_destroy(&uvTask->taskSem);

  return 0;
}
1789

1790
/*
 * Start a queued uv task on the event loop thread.
 *
 *   UV_TASK_CONNECT    - allocates pipe, connection and connect request, then
 *                        initialises the pipe and starts connecting.
 *   UV_TASK_REQ_RSP    - links the task into the connection's queue and writes
 *                        the encoded request to the pipe.
 *   UV_TASK_DISCONNECT - links the task into the connection's queue and closes
 *                        the pipe (unless it is already closing).
 *
 * Returns 0 when the task was started; a non-zero code otherwise, in which
 * case the caller (udfcAsyncTaskCb) wakes the waiter with that code.
 */
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
  fnDebug("event loop start uv task. uvTask: %" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
  int32_t code = 0;

  switch (uvTask->type) {
    case UV_TASK_CONNECT: {
      // Do all plain allocations first: once uv_pipe_init has registered the
      // handle with the loop, its memory must not be released with a bare free.
      uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
      if (pipe == NULL) {
        fnError("udfc event loop start connect task malloc pipe failed.");
        return terrno;
      }
      SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
      if (conn == NULL) {
        code = terrno;
        fnError("udfc event loop start connect task malloc conn failed.");
        taosMemoryFree(pipe);
        return code;
      }
      uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
      if (connReq == NULL) {
        code = terrno;
        fnError("udfc event loop start connect task malloc connReq failed.");
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        return code;
      }
      if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) {
        fnError("udfc event loop start connect task uv_pipe_init failed.");
        taosMemoryFree(connReq);
        taosMemoryFree(conn);
        taosMemoryFree(pipe);
        return TSDB_CODE_UDF_UV_EXEC_FAILURE;
      }
      uvTask->pipe = pipe;

      conn->pipe = pipe;
      conn->readBuf.len = 0;
      conn->readBuf.cap = 0;
      conn->readBuf.buf = NULL;
      conn->readBuf.total = -1;
      QUEUE_INIT(&conn->taskQueue);

      pipe->data = conn;

      connReq->data = uvTask;
      uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
      code = 0;
      break;
    }
    case UV_TASK_REQ_RSP: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
      } else {
        uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
        if (write == NULL) {
          fnError("udfc event loop start req_rsp task malloc write failed.");
          return terrno;
        }
        write->data = pipe->data;
        QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
        QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
        int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
        if (err != 0) {
          // The waiter is woken with an error and frees the task node, so it
          // must not stay linked in the connection's queue.
          QUEUE_REMOVE(&uvTask->connTaskQueue);
          taosMemoryFree(write);
          fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
        }
        code = err;
      }
      break;
    }
    case UV_TASK_DISCONNECT: {
      uv_pipe_t *pipe = uvTask->pipe;
      if (pipe == NULL) {
        code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
      } else {
        SClientUvConn *conn = pipe->data;
        QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
        if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) {
          uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose);
        }
        code = 0;
      }
      break;
    }
    default: {
      fnError("udfc event loop unknown task type.");
      // Nothing will ever complete such a task; report failure so the waiter is released.
      code = TSDB_CODE_UDF_UV_EXEC_FAILURE;
      break;
    }
  }

  return code;
}
1878

1879
/*
 * Async callback run on the event loop when tasks were queued.
 *
 * Moves all pending tasks out of the shared queue under taskQueueMutex, then
 * starts them one by one. Started tasks are tracked in uvProcTaskQueue; tasks
 * that fail to start get their error code set and their waiter woken at once.
 */
void udfcAsyncTaskCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;
  QUEUE       pending;

  uv_mutex_lock(&proxy->taskQueueMutex);
  QUEUE_MOVE(&proxy->taskQueue, &pending);
  uv_mutex_unlock(&proxy->taskQueueMutex);

  while (!QUEUE_EMPTY(&pending)) {
    QUEUE *node = QUEUE_HEAD(&pending);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *uvTask = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);

    int32_t rc = udfcStartUvTask(uvTask);
    if (rc != 0) {
      uvTask->errCode = rc;
      uv_sem_post(&uvTask->taskSem);
      continue;
    }
    QUEUE_INSERT_TAIL(&proxy->uvProcTaskQueue, &uvTask->procTaskQueue);
  }
}
1,657✔
1900

1901
/*
 * Release every waiter of tasks that have not finished yet: first the tasks
 * still sitting in the shared queue, then the tasks already started on the
 * loop. While the proxy is stopping, each task is marked TSDB_CODE_UDF_STOPPING.
 */
void cleanUpUvTasks(SUdfcProxy *udfc) {
  fnDebug("clean up uv tasks");
  QUEUE notStarted;

  uv_mutex_lock(&udfc->taskQueueMutex);
  QUEUE_MOVE(&udfc->taskQueue, &notStarted);
  uv_mutex_unlock(&udfc->taskQueueMutex);

  while (!QUEUE_EMPTY(&notStarted)) {
    QUEUE *node = QUEUE_HEAD(&notStarted);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *queued = QUEUE_DATA(node, SClientUvTaskNode, recvTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      queued->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&queued->taskSem);
  }

  while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) {
    QUEUE *node = QUEUE_HEAD(&udfc->uvProcTaskQueue);
    QUEUE_REMOVE(node);
    SClientUvTaskNode *inFlight = QUEUE_DATA(node, SClientUvTaskNode, procTaskQueue);
    if (udfc->udfcState == UDFC_STATE_STOPPING) {
      inFlight->errCode = TSDB_CODE_UDF_STOPPING;
    }
    uv_sem_post(&inFlight->taskSem);
  }
}
1,136✔
1928

1929
/*
 * Async callback for the stop signal: releases all outstanding tasks and, when
 * the proxy is in the stopping state, stops the event loop.
 */
void udfStopAsyncCb(uv_async_t *async) {
  SUdfcProxy *proxy = async->data;

  cleanUpUvTasks(proxy);
  if (proxy->udfcState != UDFC_STATE_STOPPING) {
    return;
  }
  uv_stop(&proxy->uvLoop);
}
1,136✔
1936

1937
void constructUdfService(void *argsThread) {
1,136✔
1938
  int32_t     code = 0, lino = 0;
1,136✔
1939
  SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
1,136✔
1940
  code = uv_loop_init(&udfc->uvLoop);
1,136✔
1941
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
1942

1943
  code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
1,136✔
1944
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
1945
  udfc->loopTaskAync.data = udfc;
1,136✔
1946
  code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
1,136✔
1947
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
1948
  udfc->loopStopAsync.data = udfc;
1,136✔
1949
  code = uv_mutex_init(&udfc->taskQueueMutex);
1,136✔
1950
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
1951
  QUEUE_INIT(&udfc->taskQueue);
1,136✔
1952
  QUEUE_INIT(&udfc->uvProcTaskQueue);
1,136✔
1953
  (void)uv_barrier_wait(&udfc->initBarrier);
1,136✔
1954
  // TODO return value of uv_run
1955
  int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
1,136✔
1956
  fnInfo("udfc uv loop exit. active handle num: %d", num);
1,136!
1957
  (void)uv_loop_close(&udfc->uvLoop);
1,136✔
1958

1959
  uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL);
1,136✔
1960
  num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
1,136✔
1961
  fnInfo("udfc uv loop exit. active handle num: %d", num);
1,136!
1962

1963
  (void)uv_loop_close(&udfc->uvLoop);
1,136✔
1964
_exit:
1,136✔
1965
  if (code != 0) {
1,136!
1966
    fnError("udfc construct error. code: %d, line: %d", code, lino);
×
1967
  }
1968
  fnInfo("udfc construct finished");
1,136!
1969
}
1,136✔
1970

1971
int32_t udfcOpen() {
1,261✔
1972
  int32_t code = 0, lino = 0;
1,261✔
1973
  int8_t  old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
1,261✔
1974
  if (old == 1) {
1,261✔
1975
    return 0;
125✔
1976
  }
1977
  SUdfcProxy *proxy = &gUdfcProxy;
1,136✔
1978
  getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
1,136✔
1979
  proxy->udfcState = UDFC_STATE_STARTNG;
1,136✔
1980
  code = uv_barrier_init(&proxy->initBarrier, 2);
1,136✔
1981
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
1982
  code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
1,136✔
1983
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
1984
  atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
1,136✔
1985
  proxy->udfcState = UDFC_STATE_READY;
1,136✔
1986
  (void)uv_barrier_wait(&proxy->initBarrier);
1,136✔
1987
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
1988
  code = uv_mutex_init(&proxy->udfStubsMutex);
1,136✔
1989
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
1990
  proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
1,136✔
1991
  if (proxy->udfStubs == NULL) {
1,136!
1992
    fnError("udfc init failed. udfStubs: %p", proxy->udfStubs);
×
1993
    return -1;
×
1994
  }
1995
  proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
1,136✔
1996
  if (proxy->expiredUdfStubs == NULL) {
1,136!
1997
    taosArrayDestroy(proxy->udfStubs);
×
1998
    fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs);
×
1999
    return -1;
×
2000
  }
2001
  code = uv_mutex_init(&proxy->udfcUvMutex);
1,136✔
2002
  TAOS_CHECK_GOTO(code, &lino, _exit);
1,136!
2003
_exit:
1,136✔
2004
  if (code != 0) {
1,136!
2005
    fnError("udfc open error. code: %d, line: %d", code, lino);
×
2006
    return TSDB_CODE_UDF_UV_EXEC_FAILURE;
×
2007
  }
2008
  fnInfo("udfc initialized");
1,136!
2009
  return 0;
1,136✔
2010
}
2011

2012
int32_t udfcClose() {
1,166✔
2013
  int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
1,166✔
2014
  if (old == 0) {
1,166✔
2015
    return 0;
30✔
2016
  }
2017

2018
  SUdfcProxy *udfc = &gUdfcProxy;
1,136✔
2019
  udfc->udfcState = UDFC_STATE_STOPPING;
1,136✔
2020
  if (uv_async_send(&udfc->loopStopAsync) != 0) {
1,136!
2021
    fnError("udfc close error to send stop async");
×
2022
  }
2023
  if (uv_thread_join(&udfc->loopThread) != 0) {
1,136!
2024
    fnError("udfc close errir to join loop thread");
×
2025
  }
2026
  uv_mutex_destroy(&udfc->taskQueueMutex);
1,136✔
2027
  uv_barrier_destroy(&udfc->initBarrier);
1,136✔
2028
  taosArrayDestroy(udfc->expiredUdfStubs);
1,136✔
2029
  taosArrayDestroy(udfc->udfStubs);
1,136✔
2030
  uv_mutex_destroy(&udfc->udfStubsMutex);
1,136✔
2031
  uv_mutex_destroy(&udfc->udfcUvMutex);
1,136✔
2032
  udfc->udfcState = UDFC_STATE_INITAL;
1,136✔
2033
  fnInfo("udfc is cleaned up");
1,136!
2034
  return 0;
1,136✔
2035
}
2036

2037
/*
 * Run one uv task of the given type for a client task, synchronously.
 *
 * Allocates the task node, initialises it, queues it to the event loop and
 * waits, then collects the result. After a successful connect the new pipe is
 * stored in the session and the connection is bound to that session. The node
 * and its request buffer are released on every path.
 *
 * Returns 0 on success, terrno when the node cannot be allocated, or the first
 * failing step's code.
 */
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
  int32_t code = 0, lino = 0;

  SClientUvTaskNode *node = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
  if (node == NULL) {
    fnError("udfc client task: %p failed to allocate memory for uvTask", task);
    return terrno;
  }
  fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, node, task->session->udfUvPipe);

  code = udfcInitializeUvTask(task, uvTaskType, node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcQueueUvTask(node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcGetUdfTaskResultFromUvTask(task, node);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  if (uvTaskType == UV_TASK_CONNECT) {
    SClientUvConn *conn = node->pipe->data;
    task->session->udfUvPipe = node->pipe;
    conn->session = task->session;
  }

_exit:
  if (code != 0) {
    fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, node, code, lino);
  }
  // reqBuf.base is NULL for anything but a request/response task (node is calloc'ed).
  taosMemoryFree(node->reqBuf.base);
  node->reqBuf.base = NULL;
  taosMemoryFree(node);
  return code;
}
2068

2069
/*
 * Create a session for the named UDF: connect to udfd, send the setup request
 * and store the returned handle/output description in the session.
 *
 * On success *funcHandle receives the new session and 0 is returned.
 * On failure nothing is returned through funcHandle. If the pipe had already
 * been connected, it is disconnected and detached from the session before the
 * session memory is released, so the pipe's close callback never touches a
 * freed session and the pipe is not left open.
 */
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("doSetupUdf, failed to allocate memory for task");
    return terrno;
  }
  task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
  if (task->session == NULL) {
    code = terrno;  // capture before any further library call
    fnError("doSetupUdf, failed to allocate memory for session");
    taosMemoryFree(task);
    return code;
  }
  task->session->udfc = &gUdfcProxy;
  task->type = UDF_TASK_SETUP;

  SUdfSetupRequest *req = &task->_setup.req;
  tstrncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);

  code = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  SUdfSetupResponse *rsp = &task->_setup.rsp;
  task->session->severHandle = rsp->udfHandle;
  task->session->outputType = rsp->outputType;
  task->session->bytes = rsp->bytes;
  task->session->bufSize = rsp->bufSize;
  tstrncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
  fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
  *funcHandle = task->session;
  taosMemoryFree(task);
  return 0;

_exit:
  if (code != 0) {
    fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino);
  }
  if (task->session->udfUvPipe != NULL) {
    // The connect step succeeded: close the pipe instead of abandoning it.
    int32_t closeCode = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
    if (closeCode != 0) {
      fnError("failed to disconnect from udfd after setup failure. udfname: %s, err: %d", udfName, closeCode);
    }
  }
  // onUdfcPipeClose dereferences conn->session under this mutex; detach the
  // session first if the close callback has not run yet.
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  if (task->session->udfUvPipe != NULL && task->session->udfUvPipe->data != NULL) {
    SClientUvConn *conn = task->session->udfUvPipe->data;
    conn->session = NULL;
  }
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
  taosMemoryFree(task->session);
  taosMemoryFree(task);
  return code;
}
2113

2114
/*
 * Issue one UDF call of the given call type over the session's pipe.
 *
 * Which arguments are read depends on callType:
 *   AGG_INIT   - no input;                 result -> *newState
 *   AGG_PROC   - *input and *state;        result -> *newState
 *   AGG_MERGE  - *state and *state2;       result -> *newState
 *   AGG_FIN    - *state;                   result -> *newState
 *   SCALA_PROC - *input;                   result -> *output
 *
 * Returns 0 on success, TSDB_CODE_UDF_PIPE_NOT_EXIST when the session has no
 * pipe, terrno on allocation failure, or the code of the failed uv task.
 */
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
                SSDataBlock *output, SUdfInterBuf *newState) {
  fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
  if (session->udfUvPipe == NULL) {
    fnError("No pipe to udfd");
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
  }

  SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    fnError("udfc call udf. failed to allocate memory for task");
    return terrno;
  }
  task->session = session;
  task->type = UDF_TASK_CALL;

  SUdfCallRequest *req = &task->_call.req;
  req->udfHandle = session->severHandle;
  req->callType = callType;

  // Fill in only the request fields the call type uses.
  if (callType == TSDB_UDF_CALL_AGG_INIT) {
    req->initFirst = 1;
  } else if (callType == TSDB_UDF_CALL_AGG_PROC) {
    req->block = *input;
    req->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_AGG_MERGE) {
    req->interBuf = *state;
    req->interBuf2 = *state2;
  } else if (callType == TSDB_UDF_CALL_AGG_FIN) {
    req->interBuf = *state;
  } else if (callType == TSDB_UDF_CALL_SCALA_PROC) {
    req->block = *input;
  }

  int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  if (code != 0) {
    fnError("call udf failure. udfcRunUdfUvTask err: %d", code);
  } else {
    SUdfCallResponse *rsp = &task->_call.rsp;
    switch (callType) {
      case TSDB_UDF_CALL_AGG_INIT:
      case TSDB_UDF_CALL_AGG_PROC:
      case TSDB_UDF_CALL_AGG_MERGE:
      case TSDB_UDF_CALL_AGG_FIN: {
        // All aggregate steps hand back an intermediate/result buffer.
        *newState = rsp->resultBuf;
        break;
      }
      case TSDB_UDF_CALL_SCALA_PROC: {
        *output = rsp->resultData;
        break;
      }
    }
  }

  taosMemoryFree(task);
  return code;
}
2190

2191
// output: interBuf (the state returned by the aggregate-init call)
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
  return callUdf(handle, TSDB_UDF_CALL_AGG_INIT, NULL, NULL, NULL, NULL, interBuf);
}
2198

2199
// input: block, state
2200
// output: interbuf,
2201
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
502✔
2202
  int8_t  callType = TSDB_UDF_CALL_AGG_PROC;
502✔
2203
  int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
502✔
2204
  return err;
502✔
2205
}
2206

2207
// input: interbuf1, interbuf2
2208
// output: resultBuf
2209
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
×
2210
                          SUdfInterBuf *resultBuf) {
2211
  int8_t  callType = TSDB_UDF_CALL_AGG_MERGE;
×
2212
  int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
×
2213
  return err;
×
2214
}
2215

2216
// input: interBuf
2217
// output: resultData
2218
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
233✔
2219
  int8_t  callType = TSDB_UDF_CALL_AGG_FIN;
233✔
2220
  int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
233✔
2221
  return err;
233✔
2222
}
2223

2224
// Run a scalar UDF over a set of scalar parameters.
// input:  handle    - UDF function handle forwarded to callUdf
//         input     - array of scalar parameters, converted into one data block
//         numOfCols - number of entries in input
// output: output    - filled from the result block when the call succeeds
// return: 0 on success; otherwise the first non-zero code reported by
//         convertScalarParamToDataBlock, callUdf or convertDataBlockToScalarParm
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
  SSDataBlock inputBlock = {0};
  SSDataBlock resultBlock = {0};

  int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
  if (code != 0) {
    fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code);
    // Fall through to the shared cleanup instead of returning directly, so a
    // partially populated inputBlock is released too.
    // NOTE(review): assumes the converter does not release the block itself on
    // failure - confirm against convertScalarParamToDataBlock.
    goto _exit;
  }

  code = callUdf(handle, TSDB_UDF_CALL_SCALA_PROC, &inputBlock, NULL, NULL, &resultBlock, NULL);
  if (code == 0) {
    code = convertDataBlockToScalarParm(&resultBlock, output);
    // Only the column array of the result block is destroyed here, whether or
    // not the conversion succeeded.
    taosArrayDestroy(resultBlock.pDataBlock);
  }

_exit:
  blockDataFreeRes(&inputBlock);
  return code;
}
2242

2243
// Tear down a UDF session: send the teardown request, disconnect the pipe,
// then release the session.
// input:  handle - UDF function handle; it is the SUdfcUvSession itself and is
//                  freed on every path, so it must not be used after this call
// return: TSDB_CODE_SUCCESS when both uv tasks succeed;
//         TSDB_CODE_UDF_PIPE_NOT_EXIST when the session has no pipe;
//         terrno when the task cannot be allocated;
//         otherwise the code of the first failing udfcRunUdfUvTask call
int32_t doTeardownUdf(UdfcFuncHandle handle) {
  int32_t         code = TSDB_CODE_SUCCESS, lino = 0;
  SUdfcUvSession *session = (SUdfcUvSession *)handle;
  SClientUdfTask *task = NULL;

  if (session->udfUvPipe == NULL) {
    fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName);
    taosMemoryFree(session);
    return TSDB_CODE_UDF_PIPE_NOT_EXIST;
  }

  task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
  if (task == NULL) {
    // Capture terrno before any further call has a chance to overwrite it.
    code = terrno;
    lino = __LINE__;
    fnError("doTeardownUdf, failed to allocate memory for task");
    goto _exit;
  }
  task->session = session;
  task->type = UDF_TASK_TEARDOWN;

  SUdfTeardownRequest *req = &task->_teardown.req;
  req->udfHandle = task->session->severHandle;

  code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  code = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
  TAOS_CHECK_GOTO(code, &lino, _exit);

  fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);

_exit:
  // TODO: synchronization refactor between libuv event loop and request thread
  // The session is freed below on success and on failure alike, so clear the
  // connection's pointer to it on every path; otherwise a failed teardown
  // leaves conn->session referring to freed memory.
  uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
  if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
    SClientUvConn *conn = session->udfUvPipe->data;
    conn->session = NULL;
  }
  uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);

  if (code != 0) {
    fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino);
  }
  taosMemoryFree(session);
  taosMemoryFree(task);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc