• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5053

13 May 2026 12:00PM UTC coverage: 73.397% (+0.06%) from 73.338%
#5053

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

627 existing lines in 131 files now uncovered.

281694 of 383795 relevant lines covered (73.4%)

132505311.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.88
/source/client/src/clientMain.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "clientStmt.h"
22
#include "clientStmt2.h"
23
#include "functionMgt.h"
24
#include "os.h"
25
#include "query.h"
26
#include "scheduler.h"
27
#include "tcompare.h"
28
#include "tconv.h"
29
#include "tdatablock.h"
30
#include "tglobal.h"
31
#include "tmisce.h"
32
#include "tmsg.h"
33
#include "tref.h"
34
#include "trpc.h"
35
#include "ttime.h"
36
#include "tversion.h"
37
#include "version.h"
38

39
#define CLIENT_CLEANUP_WAIT_TIMEOUT_MS 10000
40

41
#ifdef TAOSD_INTEGRATED
42
extern void shellStopDaemon();
43
#endif
44

45
static void instanceRpcGlobalCleanup(void);
46

47
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper);
48

49
static int32_t waitRefSetToBaseCount(int32_t rsetId, const char *name, int64_t startMs, int64_t timeoutMs) {
3,338,846✔
50
  if (rsetId < 0) {
3,338,846✔
51
    return TSDB_CODE_SUCCESS;
144✔
52
  }
53

54
  while (true) {
1,022,866,627✔
55
    int32_t count = 0;
1,026,205,329✔
56
    int32_t code = taosGetRefSetCount(rsetId, &count);
1,026,205,329✔
57
    if (code != TSDB_CODE_SUCCESS) {
1,026,205,329✔
58
      tscWarn("failed to inspect %s ref pool:%d before cleanup, code:%s", name, rsetId, tstrerror(code));
×
59
      return code;
34,370✔
60
    }
61

62
    if (count <= 1) {
1,026,205,329✔
63
      return TSDB_CODE_SUCCESS;
3,096,143✔
64
    }
65

66
    if (timeoutMs >= 0 && taosGetTimestampMs() - startMs >= timeoutMs) {
2,046,218,372✔
67
      tscWarn("timeout waiting for %s ref pool:%d to drain, count:%d", name, rsetId, count);
242,559✔
68
      return TSDB_CODE_TIMEOUT_ERROR;
242,559✔
69
    }
70

71
    taosMsleep(1);
1,022,866,627✔
72
  }
73
}
74

75
/*
 * Set a process-wide client option.
 *
 * Calls are serialized with a function-local spin lock so that concurrent
 * callers never run taos_options_imp() at the same time.
 *
 * option  option identifier
 * arg     option value, interpreted as a C string; must not be NULL
 *
 * Returns TSDB_CODE_INVALID_PARA when arg is NULL, otherwise the result of
 * taos_options_imp().
 */
int taos_options(TSDB_OPTION option, const void *arg, ...) {
  if (arg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  static int32_t lock = 0;

  // The spin counter is unsigned: a signed counter would overflow (undefined
  // behaviour) if the lock stayed contended for more than INT_MAX iterations.
  for (uint32_t spins = 1; atomic_val_compare_exchange_32(&lock, 0, 1) != 0; ++spins) {
    if (spins % 1000 == 0) {
      // Give the lock holder a chance to run instead of burning the CPU.
      (void)sched_yield();
    }
  }

  int ret = taos_options_imp(option, (const char *)arg);
  atomic_store_32(&lock, 0);
  return ret;
}
91

92
#if !defined(WINDOWS) && !defined(TD_ASTRA)
93
static void freeTz(void *p) {
3,454✔
94
  timezone_t tz = *(timezone_t *)p;
3,454✔
95
  tzfree(tz);
3,454✔
96
}
3,454✔
97
#endif
98

99
int32_t tzInit() {
1,669,351✔
100
#if !defined(WINDOWS) && !defined(TD_ASTRA)
101
  pTimezoneMap = taosHashInit(0, MurmurHash3_32, false, HASH_ENTRY_LOCK);
1,669,351✔
102
  if (pTimezoneMap == NULL) {
1,669,351✔
103
    return terrno;
×
104
  }
105
  taosHashSetFreeFp(pTimezoneMap, freeTz);
1,669,351✔
106

107
  pTimezoneNameMap = taosHashInit(0, taosIntHash_64, false, HASH_ENTRY_LOCK);
1,669,351✔
108
  if (pTimezoneNameMap == NULL) {
1,669,351✔
109
    return terrno;
×
110
  }
111
#endif
112
  return 0;
1,669,351✔
113
}
114

115
void tzCleanup() {
1,669,423✔
116
#if !defined(WINDOWS) && !defined(TD_ASTRA)
117
  taosHashCleanup(pTimezoneMap);
1,669,423✔
118
  taosHashCleanup(pTimezoneNameMap);
1,669,423✔
119
#endif
120
}
1,669,423✔
121

122
#if !defined(WINDOWS) && !defined(TD_ASTRA)
123
static timezone_t setConnnectionTz(const char *val) {
5,724✔
124
  timezone_t  tz = NULL;
5,724✔
125
  timezone_t *tmp = taosHashGet(pTimezoneMap, val, strlen(val));
5,724✔
126
  if (tmp != NULL && *tmp != NULL) {
5,724✔
127
    tz = *tmp;
2,270✔
128
    goto END;
2,270✔
129
  }
130

131
  tscDebug("set timezone to %s", val);
3,454✔
132
  tz = tzalloc(val);
3,454✔
133
  if (tz == NULL) {
3,454✔
134
    tscWarn("%s unknown timezone %s change to UTC", __func__, val);
296✔
135
    tz = tzalloc("UTC");
296✔
136
    if (tz == NULL) {
296✔
137
      tscError("%s set timezone UTC error", __func__);
×
138
      terrno = TAOS_SYSTEM_ERROR(ERRNO);
×
139
      goto END;
×
140
    }
141
  }
142
  int32_t code = taosHashPut(pTimezoneMap, val, strlen(val), &tz, sizeof(timezone_t));
3,454✔
143
  if (code != 0) {
3,454✔
144
    tscError("%s put timezone to tz map error:%d", __func__, code);
×
145
    tzfree(tz);
×
146
    tz = NULL;
×
147
    goto END;
×
148
  }
149

150
  time_t tx1 = taosGetTimestampSec();
3,454✔
151
  char   output[TD_TIMEZONE_LEN] = {0};
3,454✔
152
  code = taosFormatTimezoneStr(tx1, val, tz, output);
3,454✔
153
  if (code == 0) {
3,454✔
154
    code = taosHashPut(pTimezoneNameMap, &tz, sizeof(timezone_t), output, strlen(output) + 1);
3,454✔
155
  }
156
  if (code != 0) {
3,454✔
157
    tscError("failed to put timezone %s to map", val);
×
158
  }
159

160
END:
3,454✔
161
  return tz;
5,724✔
162
}
163
#endif
164

165
/*
 * Apply (or clear) one per-connection option.
 *
 * taos    connection handle (pointer to the connection's reference id)
 * option  option to set; TSDB_OPTION_CONNECTION_CLEAR resets every option
 * val     new value; NULL resets the selected option
 *
 * Returns 0 on success or an error code; terrno is set to the same value on
 * every path that reaches the end of the function.
 */
static int32_t setConnectionOption(TAOS *taos, TSDB_OPTION_CONNECTION option, const char *val) {
  if (taos == NULL) {
    return terrno = TSDB_CODE_INVALID_PARA;
  }

#ifdef WINDOWS
  if (option == TSDB_OPTION_CONNECTION_TIMEZONE) {
    return terrno = TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
  }
#endif

  if (option < TSDB_OPTION_CONNECTION_CLEAR || option >= TSDB_MAX_OPTIONS_CONNECTION) {
    return terrno = TSDB_CODE_INVALID_PARA;
  }

  // Make sure the global client configuration is initialized.
  int32_t code = taos_init();
  if (code != 0) {
    return terrno = code;
  }

  const int64_t connRid = *(int64_t *)taos;
  STscObj      *pObj = acquireTscObj(connRid);
  if (pObj == NULL) {
    tscError("invalid parameter for %s", __func__);
    return terrno;
  }

  // CLEAR behaves like "set every option to NULL".
  const bool clearAll = (option == TSDB_OPTION_CONNECTION_CLEAR);
  if (clearAll) {
    val = NULL;
  }

#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if (clearAll || option == TSDB_OPTION_CONNECTION_CHARSET) {
    if (val == NULL) {
      pObj->optionInfo.charsetCxt = NULL;
    } else {
      if (!taosValidateEncodec(val)) {
        code = terrno;
        goto END;
      }
      void *convCxt = taosConvInit(val);
      if (convCxt == NULL) {
        code = terrno;
        goto END;
      }
      pObj->optionInfo.charsetCxt = convCxt;
    }
  }
#endif
  if (clearAll || option == TSDB_OPTION_CONNECTION_TIMEZONE) {
#if !defined(WINDOWS) && !defined(TD_ASTRA)
    if (val == NULL) {
      pObj->optionInfo.timezone = NULL;
    } else {
      // An empty timezone string means UTC.
      if (val[0] == 0) {
        val = "UTC";
      }
      timezone_t tz = setConnnectionTz(val);
      if (tz == NULL) {
        code = terrno;
        goto END;
      }
      pObj->optionInfo.timezone = tz;
    }
#endif
  }

  if (clearAll || option == TSDB_OPTION_CONNECTION_USER_APP) {
    if (val == NULL) {
      pObj->optionInfo.userApp[0] = 0;
    } else {
      tstrncpy(pObj->optionInfo.userApp, val, sizeof(pObj->optionInfo.userApp));
    }
  }

  if (clearAll || option == TSDB_OPTION_CONNECTION_CONNECTOR_INFO) {
    if (val == NULL) {
      pObj->optionInfo.cInfo[0] = 0;
    } else {
      tstrncpy(pObj->optionInfo.cInfo, val, sizeof(pObj->optionInfo.cInfo));
    }
  }

  if (clearAll || option == TSDB_OPTION_CONNECTION_USER_IP) {
    SIpRange emptyRange = {0};
    if (val == NULL) {
      pObj->optionInfo.userIp = INADDR_NONE;
      pObj->optionInfo.userDualIp = emptyRange;
    } else {
      pObj->optionInfo.userIp = taosInetAddr(val);
      SIpAddr resolved = {0};
      code = taosGetIpFromFqdn(tsEnableIpv6, val, &resolved);
      if (code == 0) {
        code = tIpStrToUint(&resolved, &pObj->optionInfo.userDualIp);
      }
      if (code != 0) {
        // An unresolvable user IP is not fatal: log it, reset the fields and carry on.
        tscError("ipv6 flag %d failed to convert user ip %s to dual ip since %s", tsEnableIpv6 ? 1 : 0, val,
                 tstrerror(code));
        pObj->optionInfo.userIp = INADDR_NONE;
        pObj->optionInfo.userDualIp = emptyRange;
        code = 0;
      }
    }
  }

END:
  releaseTscObj(connRid);
  return terrno = code;
}
274

275
/*
 * Public entry point for per-connection options. `arg` is interpreted as a
 * C string and forwarded to setConnectionOption(); the variadic tail is not
 * read.
 */
int taos_options_connection(TAOS *taos, TSDB_OPTION_CONNECTION option, const void *arg, ...) {
  const char *value = (const char *)arg;
  return setConnectionOption(taos, option, value);
}
278

279
// this function may be called by user or system, or by both simultaneously.
280
void taos_cleanup(void) {
1,670,862✔
281
  tscInfo("start to cleanup client environment");
1,670,862✔
282
  if (!beginAsyncWorkShutdown()) {
1,670,862✔
283
    return;
1,439✔
284
  }
285

286
  int64_t cleanupStartMs = taosGetTimestampMs();
1,669,423✔
287

288
  if (TSDB_CODE_SUCCESS !=
1,669,423✔
289
      waitRefSetToBaseCount(clientReqRefPool, "request", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) {
1,669,423✔
290
    tscWarn("request ref pool did not drain cleanly before cleanup continues");
116,027✔
291
  }
292

293
  monitorClose();
1,669,423✔
294
  tscStopCrashReport();
1,669,423✔
295

296
  hbMgrCleanUp();
1,669,423✔
297

298
  catalogDestroy();
1,669,423✔
299
  schedulerDestroy();
1,669,423✔
300

301
  fmFuncMgtDestroy();
1,669,423✔
302
  qCleanupKeywordsTable();
1,669,423✔
303

304
#if !defined(WINDOWS) && !defined(TD_ASTRA)
305
  tzCleanup();
1,669,423✔
306
#endif
307
  tmqMgmtClose();
1,669,423✔
308

309
  int32_t id = clientReqRefPool;
1,669,423✔
310
  clientReqRefPool = -1;
1,669,423✔
311
  taosCloseRef(id);
1,669,423✔
312

313
  if (TSDB_CODE_SUCCESS !=
1,669,423✔
314
      waitRefSetToBaseCount(clientConnRefPool, "connection", cleanupStartMs, CLIENT_CLEANUP_WAIT_TIMEOUT_MS)) {
1,669,423✔
315
    tscWarn("connection ref pool did not drain cleanly before cleanup continues");
126,532✔
316
  }
317

318
  id = clientConnRefPool;
1,669,423✔
319
  clientConnRefPool = -1;
1,669,423✔
320
  taosCloseRef(id);
1,669,423✔
321

322
  nodesDestroyAllocatorSet();
1,669,423✔
323
  cleanupAppInfo();
1,669,423✔
324
  instanceRpcGlobalCleanup();
1,669,423✔
325
  rpcCleanup();
1,669,423✔
326
  tscDebug("rpc cleanup");
1,669,423✔
327

328
  if (TSDB_CODE_SUCCESS != cleanupTaskQueue()) {
1,669,423✔
329
    tscWarn("failed to cleanup task queue");
×
330
  }
331

332
  sessMgtDestroy();
1,669,423✔
333

334
  taosConvDestroy();
1,669,423✔
335
  DestroyRegexCache();
1,669,423✔
336
#ifdef TAOSD_INTEGRATED
337
  shellStopDaemon();
338
#endif
339
  tscInfo("all local resources released");
1,669,423✔
340
  taosCleanupCfg();
1,669,423✔
341
#ifndef TAOSD_INTEGRATED
342
  taosCloseLog();
1,669,423✔
343
#endif
344
}
345

346
static setConfRet taos_set_config_imp(const char *config) {
19✔
347
  setConfRet ret = {SET_CONF_RET_SUCC, {0}};
19✔
348
  // TODO: need re-implementation
349
  return ret;
19✔
350
}
351

352
// Public wrapper around taos_set_config_imp().
// TODO: serialize callers with a mutex (setConfMutex) once the implementation is real.
setConfRet taos_set_config(const char *config) {
  return taos_set_config_imp(config);
}
358

359
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
100,682,423✔
360
  tscInfo("try to connect to %s:%u, user:%s db:%s", ip, port, user, db);
100,682,423✔
361
  if (user == NULL) {
100,682,676✔
362
    user = TSDB_DEFAULT_USER;
221,470✔
363
  }
364

365
  if (pass == NULL) {
100,682,676✔
366
    pass = TSDB_DEFAULT_PASS;
221,470✔
367
  }
368

369
  STscObj *pObj = NULL;
100,682,676✔
370
  int32_t  code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj);
100,682,676✔
371
  if (TSDB_CODE_SUCCESS == code) {
100,664,606✔
372
    int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
100,652,976✔
373
    if (NULL == rid) {
100,655,305✔
374
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
375
      return NULL;
×
376
    }
377
    *rid = pObj->id;
100,655,305✔
378
    return (TAOS *)rid;
100,655,333✔
379
  } else {
380
    terrno = code;
19,123✔
381
  }
382

383
  return NULL;
19,323✔
384
}
385

386
/*
 * Append one key/value pair to an OPTIONS set.
 *
 * The pointers are stored as-is (no copy is made). On a NULL argument or a
 * full set, terrno is set to TSDB_CODE_INVALID_PARA and nothing is stored.
 */
void taos_set_option(OPTIONS *options, const char *key, const char *value) {
  if (NULL == options || NULL == key || NULL == value) {
    terrno = TSDB_CODE_INVALID_PARA;
    tscError("taos_set_option invalid parameter, options: %p, key: %p, value: %p", options, key, value);
    return;
  }

  const size_t capacity = sizeof(options->keys) / sizeof(options->keys[0]);
  const size_t used = (size_t)options->count;
  if (used >= capacity) {
    terrno = TSDB_CODE_INVALID_PARA;
    tscError("taos_set_option overflow, count: %zu, reached capacity: %zu", used, capacity);
    return;
  }

  options->keys[used] = key;
  options->values[used] = value;
  options->count = (uint16_t)(used + 1);
}
405

406
/*
 * Apply one connection option if a value was supplied.
 *
 * A NULL value is a no-op. If applying the option fails, the connection is
 * closed and the error code is returned; the caller must not use `taos`
 * afterwards.
 */
static int set_connection_option_or_close(TAOS *taos, TSDB_OPTION_CONNECTION option, const char *value) {
  if (value == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int rc = taos_options_connection(taos, option, value);
  if (rc == TSDB_CODE_SUCCESS) {
    return TSDB_CODE_SUCCESS;
  }

  tscError("failed to set option(%d): %s", (int)option, value);
  taos_close(taos);
  return rc;
}
416

417
TAOS *taos_connect_with(const OPTIONS *options) {
884✔
418
  const char *ip = NULL;
884✔
419
  const char *user = NULL;
884✔
420
  const char *pass = NULL;
884✔
421
  const char *db = NULL;
884✔
422
  uint16_t    port = 0;
884✔
423

424
  const char *charset = NULL;
884✔
425
  const char *timezone = NULL;
884✔
426
  const char *userIp = NULL;
884✔
427
  const char *userApp = NULL;
884✔
428
  const char *connectorInfo = NULL;
884✔
429

430
  if (options && options->count > 0) {
884✔
431
    size_t count = (size_t)options->count;
786✔
432
    for (size_t i = 0; i < count; ++i) {
4,522✔
433
      const char *key = options->keys[i];
3,736✔
434
      const char *value = options->values[i];
3,736✔
435
      if (key == NULL || value == NULL) {
3,736✔
436
        tscWarn("taos_connect_with option key or value is NULL, index: %zu", i);
294✔
437
        continue;
294✔
438
      }
439

440
      if (strcmp(key, "ip") == 0) {
3,442✔
441
        ip = value;
688✔
442
      } else if (strcmp(key, "user") == 0) {
2,754✔
443
        user = value;
394✔
444
      } else if (strcmp(key, "pass") == 0) {
2,360✔
445
        pass = value;
394✔
446
      } else if (strcmp(key, "db") == 0) {
1,966✔
447
        db = value;
98✔
448
      } else if (strcmp(key, "port") == 0) {
1,868✔
449
        port = (uint16_t)taosStr2Int32(value, NULL, 10);
590✔
450
      } else if (strcmp(key, "charset") == 0) {
1,278✔
451
        charset = value;
196✔
452
      } else if (strcmp(key, "timezone") == 0) {
1,082✔
453
        timezone = value;
98✔
454
      } else if (strcmp(key, "userIp") == 0) {
984✔
455
        userIp = value;
98✔
456
      } else if (strcmp(key, "userApp") == 0) {
886✔
457
        userApp = value;
98✔
458
      } else if (strcmp(key, "connectorInfo") == 0) {
788✔
459
        connectorInfo = value;
98✔
460
      } else {
461
        tscWarn("taos_connect_with unknown option key: %s", key);
690✔
462
      }
463
    }
464
  }
465

466
  TAOS *taos = taos_connect(ip, user, pass, db, port);
884✔
467
  if (taos == NULL) return NULL;
884✔
468

469
  if (set_connection_option_or_close(taos, TSDB_OPTION_CONNECTION_CHARSET, charset) != TSDB_CODE_SUCCESS) return NULL;
786✔
470
  if (set_connection_option_or_close(taos, TSDB_OPTION_CONNECTION_TIMEZONE, timezone) != TSDB_CODE_SUCCESS) return NULL;
688✔
471
  if (set_connection_option_or_close(taos, TSDB_OPTION_CONNECTION_USER_IP, userIp) != TSDB_CODE_SUCCESS) return NULL;
688✔
472
  if (set_connection_option_or_close(taos, TSDB_OPTION_CONNECTION_USER_APP, userApp) != TSDB_CODE_SUCCESS) return NULL;
688✔
473
  if (set_connection_option_or_close(taos, TSDB_OPTION_CONNECTION_CONNECTOR_INFO, connectorInfo) != TSDB_CODE_SUCCESS)
688✔
474
    return NULL;
×
475

476
  return taos;
688✔
477
}
478

479
TAOS *taos_connect_with_dsn(const char *dsn) {
98✔
480
  terrno = TSDB_CODE_OPS_NOT_SUPPORT;
98✔
481
  tscError("taos_connect_with_dsn not supported");
98✔
482
  return NULL;
98✔
483
}
484

485
/*
 * Register a notification callback on a connection.
 *
 * taos   connection handle
 * fp     callback to store (stored as given)
 * param  opaque pointer stored alongside the callback
 * type   one of the TAOS_NOTIFY_* kinds handled below
 *
 * Returns 0 on success. On failure returns an error code and sets terrno:
 * TSDB_CODE_INVALID_PARA for a NULL handle or unknown type,
 * TSDB_CODE_TSC_DISCONNECTED when the connection cannot be acquired, or the
 * mutex lock/unlock error.
 *
 * The connection reference acquired here is released on every path,
 * including mutex failures.
 */
int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type) {
  if (taos == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  const int64_t connRid = *(int64_t *)taos;
  STscObj      *pObj = acquireTscObj(connRid);
  if (NULL == pObj) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    tscError("invalid parameter for %s", __func__);
    return terrno;
  }

  int32_t code = taosThreadMutexLock(&pObj->mutex);
  if (code == TSDB_CODE_SUCCESS) {
    switch (type) {
      case TAOS_NOTIFY_PASSVER:
        pObj->passInfo.fp = fp;
        pObj->passInfo.param = param;
        break;
      case TAOS_NOTIFY_WHITELIST_VER:
        pObj->whiteListInfo.fp = fp;
        pObj->whiteListInfo.param = param;
        break;
      case TAOS_NOTIFY_USER_DROPPED:
        pObj->userDroppedInfo.fp = fp;
        pObj->userDroppedInfo.param = param;
        break;
      case TAOS_NOTIFY_DATETIME_WHITELIST_VER:
        pObj->dateTimeWhiteListInfo.fp = fp;
        pObj->dateTimeWhiteListInfo.param = param;
        break;
      case TAOS_NOTIFY_TOKEN:
        pObj->tokenNotifyInfo.fp = fp;
        pObj->tokenNotifyInfo.param = param;
        break;
      default:
        code = TSDB_CODE_INVALID_PARA;
        break;
    }

    // Always unlock; keep the first error if one was already recorded.
    int32_t unlockCode = taosThreadMutexUnlock(&pObj->mutex);
    if (code == TSDB_CODE_SUCCESS) {
      code = unlockCode;
    }
  }

  releaseTscObj(connRid);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
  }
  return code;
}
544

545
typedef struct SFetchWhiteListInfo {
546
  int64_t                     connId;
547
  __taos_async_whitelist_fn_t userCbFn;
548
  void                       *userParam;
549
} SFetchWhiteListInfo;
550

551
/*
 * Response handler for TDMT_MND_GET_USER_IP_WHITELIST.
 *
 * Decodes the reply, packs every entry as (mask << 32) | ip into a uint64_t
 * array and passes it to the user callback. The user callback is invoked
 * exactly once on every path: with the entries on success, or with the error
 * code and an empty list on failure.
 *
 * param  SFetchWhiteListInfo allocated by taos_fetch_whitelist_a(); freed here
 * pMsg   reply buffer; pData and pEpSet are freed here
 * code   transport/server result code
 *
 * Returns the final result code.
 */
int32_t fetchWhiteListCallbackFn(void *param, SDataBuf *pMsg, int32_t code) {
  SFetchWhiteListInfo   *pInfo = (SFetchWhiteListInfo *)param;
  TAOS                  *taos = &pInfo->connId;
  SGetUserIpWhiteListRsp wlRsp = {0};  // zeroed so it can be freed safely on every path
  uint64_t              *pWhiteLists = NULL;
  int32_t                numWhiteLists = 0;

  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  code = tDeserializeSGetUserIpWhiteListRsp(pMsg->pData, pMsg->len, &wlRsp);
  if (code != TSDB_CODE_SUCCESS) {
    // Prefer terrno (the value previously returned here) when it is set.
    if (terrno != TSDB_CODE_SUCCESS) {
      code = terrno;
    }
    goto _exit;
  }

  // Only allocate when there is something to return: a zero-size allocation
  // may legitimately yield NULL and must not be mistaken for out-of-memory.
  if (wlRsp.numWhiteLists > 0) {
    pWhiteLists = taosMemoryMalloc((size_t)wlRsp.numWhiteLists * sizeof(uint64_t));
    if (pWhiteLists == NULL) {
      code = terrno;
      goto _exit;
    }

    for (int32_t i = 0; i < wlRsp.numWhiteLists; ++i) {
      pWhiteLists[i] = ((uint64_t)wlRsp.pWhiteLists[i].mask << 32) | wlRsp.pWhiteLists[i].ip;
    }
    numWhiteLists = wlRsp.numWhiteLists;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL);
  } else {
    pInfo->userCbFn(pInfo->userParam, code, taos, numWhiteLists, pWhiteLists);
  }

  taosMemoryFree(pWhiteLists);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pInfo);
  tFreeSGetUserIpWhiteListRsp(&wlRsp);
  return code;
}
593

594
/*
 * Asynchronously fetch the IP whitelist of the connection's user.
 *
 * taos   connection handle
 * fp     completion callback; must not be NULL
 * param  opaque pointer passed back to fp
 *
 * If the request cannot be built, fp is invoked synchronously with an error
 * code and an empty list. Otherwise the request is handed to
 * asyncSendMsgToServer() with fetchWhiteListCallbackFn as response handler.
 */
void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param) {
  if (NULL == taos) {
    fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
    return;
  }

  int64_t connId = *(int64_t *)taos;

  STscObj *pTsc = acquireTscObj(connId);
  if (NULL == pTsc) {
    fp(param, TSDB_CODE_TSC_DISCONNECTED, taos, 0, NULL);
    return;
  }

  int32_t              code = TSDB_CODE_SUCCESS;
  void                *pReq = NULL;
  SFetchWhiteListInfo *pParam = NULL;
  SMsgSendInfo        *pSendInfo = NULL;

  // Zero the request so that no uninitialized field is ever serialized.
  SGetUserWhiteListReq req = {0};
  (void)memcpy(req.user, pTsc->user, TSDB_USER_LEN);

  // First pass computes the encoded size, second pass writes the buffer.
  int32_t msgLen = tSerializeSGetUserWhiteListReq(NULL, 0, &req);
  if (msgLen < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  pReq = taosMemoryMalloc(msgLen);
  if (pReq == NULL) {
    code = terrno;
    goto _fail;
  }

  if (tSerializeSGetUserWhiteListReq(pReq, msgLen, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  pParam = taosMemoryMalloc(sizeof(SFetchWhiteListInfo));
  if (pParam == NULL) {
    code = terrno;
    goto _fail;
  }
  pParam->connId = connId;
  pParam->userCbFn = fp;
  pParam->userParam = param;

  pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    code = terrno;
    goto _fail;
  }

  pSendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = msgLen, .handle = NULL};
  pSendInfo->requestId = generateRequestId();
  pSendInfo->requestObjRefId = 0;
  pSendInfo->param = pParam;
  pSendInfo->fp = fetchWhiteListCallbackFn;
  pSendInfo->msgType = TDMT_MND_GET_USER_IP_WHITELIST;

  SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
  if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) {
    // NOTE(review): fp is not invoked on this path; presumably the send layer
    // owns pSendInfo (and its param) once called — confirm.
    tscWarn("failed to async send msg to server");
  }
  releaseTscObj(connId);
  return;

_fail:
  // Nothing was sent: report the error to the caller and undo local allocations.
  fp(param, code, taos, 0, NULL);
  taosMemoryFree(pParam);
  taosMemoryFree(pReq);
  releaseTscObj(connId);
}
666

667
typedef struct SFetchIpWhiteListInfo {
668
  int64_t connId;
669
  bool    supportNeg;
670
  void   *userParam;
671

672
  __taos_async_ip_whitelist_fn_t userCbFn;
673
} SFetchIpWhiteListInfo;
674

675
/*
 * Response handler for TDMT_MND_GET_USER_IP_WHITELIST_DUAL.
 *
 * Decodes the reply and renders every range as text: "<addr>/<mask>", or
 * "<+|-> <addr>/<mask>" when the requester supports negated ranges. Negated
 * ranges are skipped when it does not. The user callback is invoked exactly
 * once on every path: with the rendered list on success, or with the error
 * code and an empty list on failure.
 *
 * param  SFetchIpWhiteListInfo allocated by taosFetchIpWhiteList(); freed here
 * pMsg   reply buffer; pData and pEpSet are freed here
 * code   transport/server result code
 *
 * Returns the final result code.
 */
int32_t fetchIpWhiteListCallbackFn(void *param, SDataBuf *pMsg, int32_t code) {
  char  **pWhiteLists = NULL;
  // Declared before any jump to _exit so the cleanup loop never reads an
  // indeterminate count.
  int32_t numWhiteLists = 0;

  SGetUserIpWhiteListRsp wlRsp = {0};

  SFetchIpWhiteListInfo *pInfo = (SFetchIpWhiteListInfo *)param;
  TAOS                  *taos = &pInfo->connId;

  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  code = tDeserializeSGetUserIpWhiteListDualRsp(pMsg->pData, pMsg->len, &wlRsp);
  if (code != TSDB_CODE_SUCCESS) {
    goto _exit;
  }

  // Only allocate when there is something to return: a zero-size allocation
  // may legitimately yield NULL and must not be mistaken for out-of-memory.
  if (wlRsp.numWhiteLists > 0) {
    pWhiteLists = taosMemoryMalloc((size_t)wlRsp.numWhiteLists * sizeof(char *));
    if (pWhiteLists == NULL) {
      code = terrno;
      goto _exit;
    }
  }

  for (int32_t i = 0; i < wlRsp.numWhiteLists; i++) {
    SIpRange *pIpRange = &wlRsp.pWhiteListsDual[i];
    if (!pInfo->supportNeg && pIpRange->neg) {
      continue;
    }

    SIpAddr ipAddr = {0};
    code = tIpUintToStr(pIpRange, &ipAddr);
    if (code != TSDB_CODE_SUCCESS) {
      goto _exit;
    }

    char *ip = taosMemCalloc(1, IP_RESERVE_CAP);
    if (ip == NULL) {
      code = terrno;
      goto _exit;
    }

    const char *addrText = ipAddr.ipv4;
    if (ipAddr.type != 0) {
      if (ipAddr.ipv6[0] == 0) {
        // Copy the terminating NUL too, so the text is a valid string no
        // matter what the buffer held before.
        memcpy(ipAddr.ipv6, "::", sizeof("::"));
      }
      addrText = ipAddr.ipv6;
    }

    if (pInfo->supportNeg) {
      snprintf(ip, IP_RESERVE_CAP, "%c %s/%d", pIpRange->neg ? '-' : '+', addrText, ipAddr.mask);
    } else {
      snprintf(ip, IP_RESERVE_CAP, "%s/%d", addrText, ipAddr.mask);
    }
    pWhiteLists[numWhiteLists++] = ip;
  }

_exit:
  if (code != TSDB_CODE_SUCCESS) {
    pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL);
  } else {
    pInfo->userCbFn(pInfo->userParam, code, taos, numWhiteLists, pWhiteLists);
  }

  if (pWhiteLists != NULL) {
    for (int32_t i = 0; i < numWhiteLists; i++) {
      taosMemFree(pWhiteLists[i]);
    }
    taosMemoryFree(pWhiteLists);
  }
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pInfo);
  tFreeSGetUserIpWhiteListDualRsp(&wlRsp);
  return code;
}
748

749
/*
 * Asynchronously fetch the dual-stack (IPv4/IPv6) whitelist of the
 * connection's user.
 *
 * taos        connection handle
 * fp          completion callback; must not be NULL
 * param       opaque pointer passed back to fp
 * supportNeg  whether the caller understands negated ranges (see
 *             fetchIpWhiteListCallbackFn)
 *
 * If the request cannot be built, fp is invoked synchronously with an error
 * code and an empty list. Otherwise the request is handed to
 * asyncSendMsgToServer() with fetchIpWhiteListCallbackFn as response handler.
 */
static void taosFetchIpWhiteList(TAOS *taos, __taos_async_whitelist_dual_stack_fn_t fp, void *param, bool supportNeg) {
  if (NULL == taos) {
    fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
    return;
  }
  int64_t connId = *(int64_t *)taos;

  STscObj *pTsc = acquireTscObj(connId);
  if (NULL == pTsc) {
    fp(param, TSDB_CODE_TSC_DISCONNECTED, taos, 0, NULL);
    return;
  }

  int32_t                code = TSDB_CODE_SUCCESS;
  void                  *pReq = NULL;
  SFetchIpWhiteListInfo *pParam = NULL;
  SMsgSendInfo          *pSendInfo = NULL;

  // Zero the request so that no uninitialized field is ever serialized.
  SGetUserWhiteListReq req = {0};
  (void)memcpy(req.user, pTsc->user, TSDB_USER_LEN);

  // First pass computes the encoded size, second pass writes the buffer.
  int32_t msgLen = tSerializeSGetUserWhiteListReq(NULL, 0, &req);
  if (msgLen < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  pReq = taosMemoryMalloc(msgLen);
  if (pReq == NULL) {
    code = terrno;
    goto _fail;
  }

  if (tSerializeSGetUserWhiteListReq(pReq, msgLen, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  pParam = taosMemoryMalloc(sizeof(SFetchIpWhiteListInfo));
  if (pParam == NULL) {
    code = terrno;
    goto _fail;
  }
  pParam->connId = connId;
  pParam->supportNeg = supportNeg;
  pParam->userCbFn = fp;
  pParam->userParam = param;

  pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    code = terrno;
    goto _fail;
  }

  pSendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = msgLen, .handle = NULL};
  pSendInfo->requestId = generateRequestId();
  pSendInfo->requestObjRefId = 0;
  pSendInfo->param = pParam;
  pSendInfo->fp = fetchIpWhiteListCallbackFn;
  pSendInfo->msgType = TDMT_MND_GET_USER_IP_WHITELIST_DUAL;

  SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
  if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) {
    // NOTE(review): fp is not invoked on this path; presumably the send layer
    // owns pSendInfo (and its param) once called — confirm.
    tscWarn("failed to async send msg to server");
  }
  releaseTscObj(connId);
  return;

_fail:
  // Nothing was sent: report the error to the caller and undo local allocations.
  fp(param, code, taos, 0, NULL);
  taosMemoryFree(pParam);
  taosMemoryFree(pReq);
  releaseTscObj(connId);
}
821

822
// Async dual-stack whitelist fetch for callers that do not understand
// negated ranges: such ranges are left out of the result.
void taos_fetch_whitelist_dual_stack_a(TAOS *taos, __taos_async_whitelist_dual_stack_fn_t fp, void *param) {
  const bool supportNeg = false;
  taosFetchIpWhiteList(taos, fp, param, supportNeg);
}
×
825

826
// Async dual-stack whitelist fetch for callers that understand negated
// ranges: every entry is reported with a leading '+' or '-'.
void taos_fetch_ip_whitelist_a(TAOS *taos, __taos_async_ip_whitelist_fn_t fp, void *param) {
  const bool supportNeg = true;
  taosFetchIpWhiteList(taos, fp, param, supportNeg);
}
×
829

830
typedef struct SFetchDateTimeWhiteListInfo {
831
  int64_t                              connId;
832
  void                                *userParam;
833
  __taos_async_datetime_whitelist_fn_t userCbFn;
834
} SFetchDateTimeWhiteListInfo;
835

836
static const char *weekdays[] = {"SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"};
837
int32_t            fetchDateTimeWhiteListCallbackFn(void *param, SDataBuf *pMsg, int32_t code) {
×
838
  int32_t lino = 0;
×
839
  char  **pWhiteLists = NULL;
×
840

841
  SUserDateTimeWhiteList wlRsp = {0};
×
842

843
  SFetchDateTimeWhiteListInfo *pInfo = (SFetchDateTimeWhiteListInfo *)param;
×
844
  TAOS                        *taos = &pInfo->connId;
×
845

846
  if (code != TSDB_CODE_SUCCESS) {
×
847
    pInfo->userCbFn(pInfo->userParam, code, taos, 0, NULL);
×
848
    TAOS_CHECK_GOTO(code, &lino, _error);
×
849
  }
850

851
  if ((code = tDeserializeSUserDateTimeWhiteList(pMsg->pData, pMsg->len, &wlRsp)) != TSDB_CODE_SUCCESS) {
×
852
    TAOS_CHECK_GOTO(code, &lino, _error);
×
853
  }
854

855
  pWhiteLists = taosMemoryMalloc(wlRsp.numWhiteLists * sizeof(char *));
×
856
  if (pWhiteLists == NULL) {
×
857
    code = terrno;
×
858
    TAOS_CHECK_GOTO(code, &lino, _error);
×
859
  }
860

861
  int32_t numWhiteLists = 0;
×
862
  for (int32_t i = 0; i < wlRsp.numWhiteLists; i++) {
×
863
    SDateTimeWhiteListItem *item = &wlRsp.pWhiteLists[i];
×
864

865
    char *p = taosMemCalloc(1, 128);
×
866
    if (p == NULL) {
×
867
      code = terrno;
×
868
      TAOS_CHECK_GOTO(code, &lino, _error);
×
869
    }
870

871
    int duration = item->duration / 60;
×
872

873
    if (item->absolute) {
×
874
      struct STm tm;
×
875
      (void)taosTs2Tm(item->start, TSDB_TIME_PRECISION_SECONDS, &tm, NULL);
×
876
      snprintf(p, 128, "%c %04d-%02d-%02d %02d:%02d %d", item->neg ? '-' : '+', tm.tm.tm_year + 1900, tm.tm.tm_mon + 1,
×
877
                          tm.tm.tm_mday, tm.tm.tm_hour, tm.tm.tm_min, duration);
878
    } else {
879
      int day = item->start / 86400;
×
880
      int hour = (item->start % 86400) / 3600;
×
881
      int minute = (item->start % 3600) / 60;
×
882
      snprintf(p, 128, "%c %s %02d:%02d %d", item->neg ? '-' : '+', weekdays[day], hour, minute, duration);
×
883
    }
884
    pWhiteLists[numWhiteLists++] = p;
×
885
  }
886

887
  pInfo->userCbFn(pInfo->userParam, code, taos, numWhiteLists, pWhiteLists);
×
888
_error:
×
889
  if (pWhiteLists != NULL) {
×
890
    for (int32_t i = 0; i < numWhiteLists; i++) {
×
891
      taosMemFree(pWhiteLists[i]);
×
892
    }
893
    taosMemoryFree(pWhiteLists);
×
894
  }
895
  taosMemoryFree(pMsg->pData);
×
896
  taosMemoryFree(pMsg->pEpSet);
×
897
  taosMemoryFree(pInfo);
×
898
  tFreeSUserDateTimeWhiteList(&wlRsp);
×
899
  return code;
×
900
}
901

902
/**
 * Asynchronously request the date-time whitelist of the user that owns `taos`.
 *
 * Builds a TDMT_MND_GET_USER_DATETIME_WHITELIST message and hands it to
 * asyncSendMsgToServer(); the reply is handled by fetchDateTimeWhiteListCallbackFn,
 * which receives `fp` and `param` through a heap-allocated SFetchDateTimeWhiteListInfo.
 *
 * Every failure detected before the message is sent is reported synchronously
 * through `fp(param, <error code>, taos, 0, NULL)`.
 *
 * @param taos   connection handle; NULL is reported as TSDB_CODE_INVALID_PARA
 * @param fp     user callback; it is invoked on every early error path
 * @param param  opaque pointer handed back to `fp`
 */
void taos_fetch_datetime_whitelist_a(TAOS *taos, __taos_async_datetime_whitelist_fn_t fp, void *param) {
  if (NULL == taos) {
    fp(param, TSDB_CODE_INVALID_PARA, taos, 0, NULL);
    return;
  }

  int64_t  connId = *(int64_t *)taos;
  STscObj *pTsc = acquireTscObj(connId);
  if (NULL == pTsc) {
    fp(param, TSDB_CODE_TSC_DISCONNECTED, taos, 0, NULL);
    return;
  }

  int32_t                      code = TSDB_CODE_SUCCESS;
  int32_t                      msgLen = 0;
  void                        *pReq = NULL;
  SFetchDateTimeWhiteListInfo *pParam = NULL;
  SMsgSendInfo                *pSendInfo = NULL;

  // Zero the whole request first so no indeterminate stack bytes get serialized.
  SGetUserWhiteListReq req = {0};
  (void)memcpy(req.user, pTsc->user, TSDB_USER_LEN);

  // First pass with a NULL buffer only computes the encoded size.
  msgLen = tSerializeSGetUserWhiteListReq(NULL, 0, &req);
  if (msgLen < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto _failed;
  }

  pReq = taosMemoryMalloc(msgLen);
  if (pReq == NULL) {
    code = terrno;
    goto _failed;
  }

  if (tSerializeSGetUserWhiteListReq(pReq, msgLen, &req) < 0) {
    code = TSDB_CODE_INVALID_PARA;
    goto _failed;
  }

  pParam = taosMemoryMalloc(sizeof(SFetchDateTimeWhiteListInfo));
  if (pParam == NULL) {
    code = terrno;
    goto _failed;
  }
  pParam->connId = connId;
  pParam->userCbFn = fp;
  pParam->userParam = param;

  pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    code = terrno;
    goto _failed;
  }

  pSendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = msgLen, .handle = NULL};
  pSendInfo->requestId = generateRequestId();
  pSendInfo->requestObjRefId = 0;
  pSendInfo->param = pParam;
  pSendInfo->fp = fetchDateTimeWhiteListCallbackFn;
  pSendInfo->msgType = TDMT_MND_GET_USER_DATETIME_WHITELIST;

  {
    SEpSet epSet = getEpSet_s(&pTsc->pAppInfo->mgmtEp);
    // NOTE(review): on send failure only a warning is logged and `fp` is not called here;
    // confirm that asyncSendMsgToServer disposes of pSendInfo / notifies the callback itself.
    if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pTsc->pAppInfo->pTransporter, &epSet, NULL, pSendInfo)) {
      tscWarn("failed to async send msg to server");
    }
  }
  releaseTscObj(connId);
  return;

_failed:
  // Report first, then drop whatever was allocated so far (pSendInfo is never set on this path).
  fp(param, code, taos, 0, NULL);
  taosMemoryFree(pParam);
  taosMemoryFree(pReq);
  releaseTscObj(connId);
}
973

974
void taos_close_internal(void *taos) {
100,841,979✔
975
  if (taos == NULL) {
100,841,979✔
976
    return;
373✔
977
  }
978
  int32_t code = 0;
100,841,606✔
979

980
  STscObj *pTscObj = (STscObj *)taos;
100,841,606✔
981
  tscDebug("conn:0x%" PRIx64 ", try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
100,841,606✔
982

983
  SSessParam para = {.type = SESSION_PER_USER, .value = -1, .noCheck = 1};
100,841,606✔
984

985
  code = tscUpdateSessMetric(pTscObj, &para);
100,841,606✔
986
  if (code != TSDB_CODE_SUCCESS) {
100,842,331✔
987
    tscWarn("conn:0x%" PRIx64 ", failed to update user:%s metric when close connection, code:%d", pTscObj->id,
×
988
            pTscObj->user, code);
989
  }
990

991
  code = tscUnrefSessMetric(pTscObj);
100,842,331✔
992
  if (TSDB_CODE_SUCCESS != taosRemoveRef(clientConnRefPool, pTscObj->id)) {
100,841,297✔
993
    tscError("conn:0x%" PRIx64 ", failed to remove ref from conn pool", pTscObj->id);
×
994
  }
995
}
996

997
/**
 * Close a user-level connection handle.
 *
 * The handle stores the connection's reference id; if that id still resolves
 * to a connection object it is closed via taos_close_internal(). The handle
 * memory itself is always freed.
 *
 * @param taos  connection handle; NULL is a no-op
 */
void taos_close(TAOS *taos) {
  if (NULL == taos) {
    return;
  }

  int64_t  rid = *(int64_t *)taos;
  STscObj *pConn = acquireTscObj(rid);
  if (pConn != NULL) {
    taos_close_internal(pConn);
    releaseTscObj(rid);
  }

  taosMemoryFree(taos);
}
1012

1013
/**
 * Return the error code associated with a result handle.
 *
 * @param res  result handle, may be NULL
 * @return terrno for NULL and for TMQ raw / meta / batch-meta results,
 *         0 for TMQ data / metadata results, otherwise the request's `code`
 */
int taos_errno(TAOS_RES *res) {
  if (NULL == res) {
    return terrno;
  }
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    return terrno;
  }
  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    return 0;
  }

  SRequestObj *pReq = (SRequestObj *)res;
  return pReq->code;
}
1024

1025
/**
 * Return a human-readable error string for a result handle.
 *
 * For NULL and TMQ raw / meta / batch-meta results the text comes from
 * taosGetErrMsg() when that buffer is non-empty (copied into the buffer
 * returned by taosGetErrMsgReturn()), otherwise from tstrerror(terrno).
 * TMQ data / metadata results always yield "success". For query results the
 * request's msgBuf is preferred when it is non-empty or the code is
 * TSDB_CODE_RPC_FQDN_ERROR; otherwise tstrerror(code) is returned.
 */
const char *taos_errstr(TAOS_RES *res) {
  bool noRequest = (res == NULL) || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (noRequest) {
    if (*(taosGetErrMsg()) != 0) {
      (void)snprintf(taosGetErrMsgReturn(), ERR_MSG_LEN, "%s", taosGetErrMsg());
      return (const char *)taosGetErrMsgReturn();
    }
    return (const char *)tstrerror(terrno);
  }

  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    return "success";
  }

  SRequestObj *pReq = (SRequestObj *)res;
  bool         useMsgBuf =
      (pReq->msgBuf != NULL) && (strlen(pReq->msgBuf) > 0 || pReq->code == TSDB_CODE_RPC_FQDN_ERROR);
  if (useMsgBuf) {
    return pReq->msgBuf;
  }
  return (const char *)tstrerror(pReq->code);
}
1046

1047
/**
 * Release a result handle.
 *
 * Query results are destroyed through destroyRequest(). Every other handle is
 * treated as an SMqRspObj: the payload matching its TMQ kind is deleted and
 * the object itself is freed.
 *
 * @param res  result handle; NULL is a no-op
 */
void taos_free_result(TAOS_RES *res) {
  if (res == NULL) {
    return;
  }

  tscTrace("res:%p, will be freed", res);

  if (TD_RES_QUERY(res)) {
    SRequestObj *pReq = (SRequestObj *)res;
    tscDebug("QID:0x%" PRIx64 ", call taos_free_result to free query, res:%p", pReq->requestId, res);
    destroyRequest(pReq);
    return;
  }

  SMqRspObj *pMqRsp = (SMqRspObj *)res;
  if (TD_RES_TMQ(res)) {
    tDeleteMqDataRsp(&pMqRsp->dataRsp);
    doFreeReqResultInfo(&pMqRsp->resInfo);
  } else if (TD_RES_TMQ_METADATA(res)) {
    tDeleteSTaosxRsp(&pMqRsp->dataRsp);
    doFreeReqResultInfo(&pMqRsp->resInfo);
  } else if (TD_RES_TMQ_META(res)) {
    tDeleteMqMetaRsp(&pMqRsp->metaRsp);
  } else if (TD_RES_TMQ_BATCH_META(res)) {
    tDeleteMqBatchMetaRsp(&pMqRsp->batchMetaRsp);
  } else if (TD_RES_TMQ_RAW(res)) {
    tDeleteMqRawDataRsp(&pMqRsp->dataRsp);
  }

  // The response object is freed whichever payload branch (if any) matched.
  taosMemoryFree(pMqRsp);
}
1077

1078
/**
 * Stop all requests that are currently registered on a connection.
 *
 * The connection reference is released only when it was actually acquired,
 * so a failed acquire no longer triggers an unbalanced release (this matches
 * how the other connection-level functions in this file behave).
 *
 * @param taos  connection handle; NULL is a no-op
 */
void taos_kill_query(TAOS *taos) {
  if (NULL == taos) {
    return;
  }

  int64_t  rid = *(int64_t *)taos;
  STscObj *pTscObj = acquireTscObj(rid);
  if (NULL == pTscObj) {
    return;
  }

  stopAllRequests(pTscObj->pRequests);
  releaseTscObj(rid);
}
1090

1091
/**
 * Number of columns in the current result set.
 *
 * @return 0 for NULL and for TMQ raw / meta / batch-meta results, otherwise
 *         numOfCols of the current result info
 */
int taos_field_count(TAOS_RES *res) {
  bool hasNoColumns = (res == NULL) || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (hasNoColumns) {
    return 0;
  }

  return tscGetCurResInfo(res)->numOfCols;
}
1099

1100
/** Alias of taos_field_count(). */
int taos_num_fields(TAOS_RES *res) {
  int fieldCnt = taos_field_count(res);
  return fieldCnt;
}
2,147,483,647✔
1101

1102
/**
 * Return the user-visible field descriptors of the current result set.
 *
 * @return NULL when the result has no fields or is a TMQ raw / meta /
 *         batch-meta result, otherwise the `userFields` array
 */
TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
  // taos_num_fields() is evaluated first; it already copes with a NULL handle.
  if (taos_num_fields(res) == 0) {
    return NULL;
  }
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    return NULL;
  }

  return tscGetCurResInfo(res)->userFields;
}
1110

1111
/** Run `sql` on `taos` through taosQueryImpl() with validateOnly = false, tagged TD_REQ_FROM_APP. */
TAOS_RES *taos_query(TAOS *taos, const char *sql) {
  TAOS_RES *pRes = taosQueryImpl(taos, sql, false, TD_REQ_FROM_APP);
  return pRes;
}
1,144,199,748✔
1112
/** Same as taos_query(), but forwards the caller-supplied request id `reqid`. */
TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqid) {
  TAOS_RES *pRes = taosQueryImplWithReqid(taos, sql, false, reqid);
  return pRes;
}
1115

1116
/**
 * Return the extended field descriptors (`fields`) of the current result set.
 *
 * @return NULL when the result has no fields or is a TMQ meta / batch-meta
 *         result, otherwise the `fields` array of the current result info
 */
TAOS_FIELD_E *taos_fetch_fields_e(TAOS_RES *res) {
  bool unavailable = (taos_num_fields(res) == 0) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (unavailable) {
    return NULL;
  }

  return tscGetCurResInfo(res)->fields;
}
1123

1124
/**
 * Fetch the next row of a result.
 *
 * Query results: returns NULL when the request was killed (code is set to
 * TSDB_CODE_TSC_QUERY_KILLED), when there is nothing to fetch (empty result,
 * insert, failed request, no fields), or when called from inside the query
 * callback (terrno = TSDB_CODE_TSC_INVALID_OPERATION); otherwise delegates to
 * doAsyncFetchRows().
 *
 * TMQ data / metadata results: walks the rows of the current block and moves
 * to the next block through tmqGetNextResInfo() when the block is exhausted.
 *
 * TMQ meta / batch-meta results yield NULL; any other handle kind is rejected
 * with terrno = TSDB_CODE_TMQ_INVALID_DATA.
 */
TAOS_ROW taos_fetch_row(TAOS_RES *res) {
  if (NULL == res) {
    return NULL;
  }

  if (TD_RES_QUERY(res)) {
    SRequestObj *pRequest = (SRequestObj *)res;

    if (pRequest->killed) {
      tscInfo("query has been killed, can not fetch more row.");
      pRequest->code = TSDB_CODE_TSC_QUERY_KILLED;
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
      return NULL;
    }

    bool nothingToFetch = pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
                          pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0;
    if (nothingToFetch) {
      // Only the explicit empty-result case advances the request phase.
      if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
        CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
      }
      return NULL;
    }

    if (pRequest->inCallback) {
      tscError("can not call taos_fetch_row before query callback ends.");
      terrno = TSDB_CODE_TSC_INVALID_OPERATION;
      return NULL;
    }

    return doAsyncFetchRows(pRequest, true, true);
  }

  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    SMqRspObj      *pMqRsp = (SMqRspObj *)res;
    SReqResultInfo *pBlock = NULL;

    if (pMqRsp->resIter != -1) {
      pBlock = tmqGetCurResInfo(res);
    } else if (tmqGetNextResInfo(res, true, &pBlock) != 0) {
      // resIter == -1: no block loaded yet and none could be loaded
      return NULL;
    }

    // Current block exhausted: advance to the next one, or stop if there is none.
    if (pBlock->current >= pBlock->numOfRows) {
      if (tmqGetNextResInfo(res, true, &pBlock) != 0) {
        return NULL;
      }
    }

    doSetOneRowPtr(pBlock);
    pBlock->current += 1;
    return pBlock->row;
  }

  if (TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    return NULL;
  }

  tscError("invalid result passed to taos_fetch_row");
  terrno = TSDB_CODE_TMQ_INVALID_DATA;
  return NULL;
}
1185

1186
/**
 * Format one row into `str` without an explicit buffer limit: forwards to
 * taos_print_row_with_size() with INT32_MAX as the size.
 */
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
  const uint32_t unlimited = INT32_MAX;
  return taos_print_row_with_size(str, unlimited, row, fields, num_fields);
}
1189
/**
 * Format one row as space-separated text into `str`, writing at most `size`
 * bytes including the terminating NUL.
 *
 * Fixes over the previous version:
 *  - a NULL field no longer skips the end-of-iteration truncation check, so a
 *    truncated snprintf() can no longer leave `len > size` and make the next
 *    `size - len` wrap around to a huge bound;
 *  - `size == 0` (or a NULL `str`) returns 0 instead of wrapping `size - 1`;
 *  - the returned length is clamped to what actually fits, so `str[ret]` is
 *    always the terminating NUL.
 *
 * @param str         destination buffer
 * @param size        capacity of `str` in bytes
 * @param row         row values; a NULL entry is printed as TSDB_DATA_NULL_STR
 * @param fields      field descriptors, one per column
 * @param num_fields  number of columns to print
 * @return number of bytes written, not counting the terminating NUL
 */
int taos_print_row_with_size(char *str, uint32_t size, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
  if (str == NULL || size == 0) {
    return 0;
  }

  int32_t len = 0;
  for (int i = 0; i < num_fields; ++i) {
    if (i > 0 && len < size - 1) {
      str[len++] = ' ';
    }

    if (row[i] == NULL) {
      len += snprintf(str + len, size - len, "%s", TSDB_DATA_NULL_STR);
    } else {
      switch (fields[i].type) {
        case TSDB_DATA_TYPE_TINYINT:
          len += snprintf(str + len, size - len, "%d", *((int8_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_UTINYINT:
          len += snprintf(str + len, size - len, "%u", *((uint8_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_SMALLINT:
          len += snprintf(str + len, size - len, "%d", *((int16_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_USMALLINT:
          len += snprintf(str + len, size - len, "%u", *((uint16_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_INT:
          len += snprintf(str + len, size - len, "%d", *((int32_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_UINT:
          len += snprintf(str + len, size - len, "%u", *((uint32_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_BIGINT:
          len += snprintf(str + len, size - len, "%" PRId64, *((int64_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_UBIGINT:
          len += snprintf(str + len, size - len, "%" PRIu64, *((uint64_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_FLOAT: {
          float fv = 0;
          fv = GET_FLOAT_VAL(row[i]);
          len += snprintf(str + len, size - len, "%.*g", FLT_DIG, fv);
        } break;

        case TSDB_DATA_TYPE_DOUBLE: {
          double dv = 0;
          dv = GET_DOUBLE_VAL(row[i]);
          len += snprintf(str + len, size - len, "%.*g", DBL_DIG, dv);
        } break;

        case TSDB_DATA_TYPE_VARBINARY: {
          // Printed as hex text; the converted buffer is allocated by taosAscii2Hex.
          void    *data = NULL;
          uint32_t tmp = 0;
          int32_t  charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
          if (taosAscii2Hex(row[i], charLen, &data, &tmp) < 0) {
            break;
          }
          uint32_t copyLen = TMIN(size - len - 1, tmp);
          (void)memcpy(str + len, data, copyLen);
          len += copyLen;
          taosMemoryFree(data);
        } break;

        case TSDB_DATA_TYPE_BINARY:
        case TSDB_DATA_TYPE_NCHAR:
        case TSDB_DATA_TYPE_GEOMETRY: {
          // The length header sits immediately before the payload.
          int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
          if (fields[i].type == TSDB_DATA_TYPE_BINARY || fields[i].type == TSDB_DATA_TYPE_VARBINARY ||
              fields[i].type == TSDB_DATA_TYPE_GEOMETRY) {
            if (charLen > fields[i].bytes || charLen < 0) {
              tscError("taos_print_row error binary. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes);
              break;
            }
          } else {
            if (charLen > fields[i].bytes * TSDB_NCHAR_SIZE || charLen < 0) {
              tscError("taos_print_row error. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes);
              break;
            }
          }

          uint32_t copyLen = TMIN(size - len - 1, charLen);
          (void)memcpy(str + len, row[i], copyLen);
          len += copyLen;
        } break;

        case TSDB_DATA_TYPE_BLOB:
        case TSDB_DATA_TYPE_MEDIUMBLOB: {
          void    *data = NULL;
          uint32_t tmp = 0;
          int32_t  charLen = blobDataLen((char *)row[i] - BLOBSTR_HEADER_SIZE);
          if (taosAscii2Hex(row[i], charLen, &data, &tmp) < 0) {
            break;
          }

          uint32_t copyLen = TMIN(size - len - 1, tmp);
          (void)memcpy(str + len, data, copyLen);
          len += copyLen;

          taosMemoryFree(data);
        } break;

        case TSDB_DATA_TYPE_TIMESTAMP:
          len += snprintf(str + len, size - len, "%" PRId64, *((int64_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_BOOL:
          len += snprintf(str + len, size - len, "%d", *((int8_t *)row[i]));
          break;

        case TSDB_DATA_TYPE_DECIMAL64:
        case TSDB_DATA_TYPE_DECIMAL: {
          // The value is read with strlen(), i.e. treated as a NUL-terminated string.
          uint32_t decimalLen = strlen(row[i]);
          uint32_t copyLen = TMIN(size - len - 1, decimalLen);
          (void)memcpy(str + len, row[i], copyLen);
          len += copyLen;
        } break;

        default:
          break;
      }
    }

    // Reached on every path (including NULL fields): stop as soon as the buffer is full.
    // This guarantees len < size - 1 at the top of the next iteration.
    if (len >= size - 1) {
      break;
    }
  }

  // snprintf() reports the would-be length on truncation; clamp to what fits.
  if (len >= size) {
    len = (int32_t)(size - 1);
  }
  str[len] = 0;

  return len;
}
1323

1324
/**
 * Return the `length` array of the current result info.
 *
 * @return NULL for NULL and for TMQ raw / meta / batch-meta results
 */
int *taos_fetch_lengths(TAOS_RES *res) {
  if (NULL == res) {
    return NULL;
  }
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    return NULL;
  }

  return tscGetCurResInfo(res)->length;
}
1332

1333
/**
 * Return the address of the `row` member of the current result info.
 *
 * @return NULL with terrno = TSDB_CODE_INVALID_PARA for NULL and TMQ raw /
 *         meta / batch-meta results; NULL (terrno untouched) for results
 *         without fields
 */
TAOS_ROW *taos_result_block(TAOS_RES *res) {
  bool invalid = (res == NULL) || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (invalid) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  if (taos_is_update_query(res)) {
    return NULL;
  }

  SReqResultInfo *pInfo = tscGetCurResInfo(res);
  return &pInfo->row;
}
1346

1347
// TODO: integrate with tDataTypes
1348
const char *taos_data_type(int type) {
×
1349
  switch (type) {
×
1350
    case TSDB_DATA_TYPE_NULL:
×
1351
      return "TSDB_DATA_TYPE_NULL";
×
1352
    case TSDB_DATA_TYPE_BOOL:
×
1353
      return "TSDB_DATA_TYPE_BOOL";
×
1354
    case TSDB_DATA_TYPE_TINYINT:
×
1355
      return "TSDB_DATA_TYPE_TINYINT";
×
1356
    case TSDB_DATA_TYPE_SMALLINT:
×
1357
      return "TSDB_DATA_TYPE_SMALLINT";
×
1358
    case TSDB_DATA_TYPE_INT:
×
1359
      return "TSDB_DATA_TYPE_INT";
×
1360
    case TSDB_DATA_TYPE_BIGINT:
×
1361
      return "TSDB_DATA_TYPE_BIGINT";
×
1362
    case TSDB_DATA_TYPE_FLOAT:
×
1363
      return "TSDB_DATA_TYPE_FLOAT";
×
1364
    case TSDB_DATA_TYPE_DOUBLE:
×
1365
      return "TSDB_DATA_TYPE_DOUBLE";
×
1366
    case TSDB_DATA_TYPE_VARCHAR:
×
1367
      return "TSDB_DATA_TYPE_VARCHAR";
×
1368
      //    case TSDB_DATA_TYPE_BINARY:          return "TSDB_DATA_TYPE_VARCHAR";
1369
    case TSDB_DATA_TYPE_TIMESTAMP:
×
1370
      return "TSDB_DATA_TYPE_TIMESTAMP";
×
1371
    case TSDB_DATA_TYPE_NCHAR:
×
1372
      return "TSDB_DATA_TYPE_NCHAR";
×
1373
    case TSDB_DATA_TYPE_JSON:
×
1374
      return "TSDB_DATA_TYPE_JSON";
×
1375
    case TSDB_DATA_TYPE_GEOMETRY:
×
1376
      return "TSDB_DATA_TYPE_GEOMETRY";
×
1377
    case TSDB_DATA_TYPE_UTINYINT:
×
1378
      return "TSDB_DATA_TYPE_UTINYINT";
×
1379
    case TSDB_DATA_TYPE_USMALLINT:
×
1380
      return "TSDB_DATA_TYPE_USMALLINT";
×
1381
    case TSDB_DATA_TYPE_UINT:
×
1382
      return "TSDB_DATA_TYPE_UINT";
×
1383
    case TSDB_DATA_TYPE_UBIGINT:
×
1384
      return "TSDB_DATA_TYPE_UBIGINT";
×
1385
    case TSDB_DATA_TYPE_VARBINARY:
×
1386
      return "TSDB_DATA_TYPE_VARBINARY";
×
1387
    case TSDB_DATA_TYPE_DECIMAL:
×
1388
      return "TSDB_DATA_TYPE_DECIMAL";
×
1389
    case TSDB_DATA_TYPE_BLOB:
×
1390
      return "TSDB_DATA_TYPE_BLOB";
×
1391
    case TSDB_DATA_TYPE_MEDIUMBLOB:
×
1392
      return "TSDB_DATA_TYPE_MEDIUMBLOB";
×
1393
    default:
×
1394
      return "UNKNOWN";
×
1395
  }
1396
}
1397

1398
/** Return the client version string (`td_version`). */
const char *taos_get_client_info() {
  const char *clientVersion = td_version;
  return clientVersion;
}
1,125,967✔
1399

1400
// return int32_t
1401
/**
 * Row count stored in the request's result info, truncated to int.
 *
 * @return 0 for NULL and for every TMQ result kind
 */
int taos_affected_rows(TAOS_RES *res) {
  if (NULL == res) {
    return 0;
  }
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
      TD_RES_TMQ_BATCH_META(res)) {
    return 0;
  }

  SRequestObj *pReq = (SRequestObj *)res;
  return (int)pReq->body.resInfo.numOfRows;
}
1411

1412
// return int64_t
1413
/**
 * 64-bit variant of taos_affected_rows(): row count stored in the request's
 * result info, without narrowing.
 *
 * @return 0 for NULL and for every TMQ result kind
 */
int64_t taos_affected_rows64(TAOS_RES *res) {
  if (NULL == res) {
    return 0;
  }
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
      TD_RES_TMQ_BATCH_META(res)) {
    return 0;
  }

  SRequestObj *pReq = (SRequestObj *)res;
  return pReq->body.resInfo.numOfRows;
}
1423

1424
/**
 * Timestamp precision of a result.
 *
 * @return the precision stored in the result info for query and TMQ data /
 *         metadata results; TSDB_TIME_PRECISION_MILLI for everything else
 */
int taos_result_precision(TAOS_RES *res) {
  bool noPrecision = (res == NULL) || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res);
  if (noPrecision) {
    return TSDB_TIME_PRECISION_MILLI;
  }

  if (TD_RES_QUERY(res)) {
    return ((SRequestObj *)res)->body.resInfo.precision;
  }

  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    return tmqGetCurResInfo(res)->precision;
  }

  return TSDB_TIME_PRECISION_MILLI;
}
1438

1439
/**
 * Switch the current database of a connection by executing "use <db>".
 *
 * Fixes over the previous version: a NULL `taos` is rejected instead of being
 * dereferenced, and the connection reference is no longer released when it
 * could not be acquired in the first place.
 *
 * @param taos  connection handle
 * @param db    database name; must be non-NULL and non-empty
 * @return TSDB_CODE_INVALID_PARA for a NULL handle, TSDB_CODE_TSC_DISCONNECTED
 *         when the connection cannot be acquired, TSDB_CODE_TSC_INVALID_INPUT
 *         for a NULL/empty name (terrno is set in all three cases), otherwise
 *         the error code of the "use" statement
 */
int taos_select_db(TAOS *taos, const char *db) {
  if (taos == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  int64_t  rid = *(int64_t *)taos;
  STscObj *pObj = acquireTscObj(rid);
  if (pObj == NULL) {
    // Nothing was acquired, so there is nothing to release.
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return TSDB_CODE_TSC_DISCONNECTED;
  }

  if (db == NULL || strlen(db) == 0) {
    releaseTscObj(rid);
    tscError("invalid parameter for %s", db == NULL ? "db is NULL" : "db is empty");
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    return terrno;
  }

  char sql[256] = {0};
  (void)snprintf(sql, tListLen(sql), "use %s", db);

  TAOS_RES *pRequest = taos_query(taos, sql);
  int32_t   code = taos_errno(pRequest);

  taos_free_result(pRequest);
  releaseTscObj(rid);
  return code;
}
1464

1465
/**
 * Stop all queries of a request through stopAllQueries().
 * NULL handles and every TMQ result kind are ignored.
 */
void taos_stop_query(TAOS_RES *res) {
  if (NULL == res) {
    return;
  }

  bool isTmqResult = TD_RES_TMQ_RAW(res) || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res) ||
                     TD_RES_TMQ_BATCH_META(res);
  if (isTmqResult) {
    return;
  }

  SRequestObj *pReq = (SRequestObj *)res;
  stopAllQueries(pReq);
}
1473

1474
/**
 * Tell whether cell (`row`, `col`) of the current result block is NULL.
 *
 * @return true for NULL / TMQ raw / meta / batch-meta handles and for
 *         out-of-range coordinates; otherwise the cell's NULL state
 *         (offset == -1 for variable-length columns, colDataIsNull_f() for
 *         the rest)
 */
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
  if (NULL == res) {
    return true;
  }
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    return true;
  }

  SReqResultInfo *pInfo = tscGetCurResInfo(res);

  bool colOutOfRange = (col < 0) || (col >= pInfo->numOfCols);
  bool rowOutOfRange = (row < 0) || (row >= pInfo->numOfRows);
  if (colOutOfRange || rowOutOfRange) {
    return true;
  }

  SResultColumn *pColumn = &pInfo->pCol[col];
  if (IS_VAR_DATA_TYPE(pInfo->fields[col].type)) {
    return (pColumn->offset[row] == -1);
  }
  return colDataIsNull_f(pColumn, row);
}
1490

1491
/** A result with zero fields is treated as coming from an update-type statement. */
bool taos_is_update_query(TAOS_RES *res) {
  int fieldCnt = taos_num_fields(res);
  return fieldCnt == 0;
}
456,885✔
1492

1493
/**
 * Fetch the next block of rows.
 *
 * Thin wrapper over taos_fetch_block_s(): the status code is stored in terrno
 * and the number of rows is returned.
 */
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
  int32_t rowCnt = 0;
  terrno = taos_fetch_block_s(res, &rowCnt, rows);
  return rowCnt;
}
1498

1499
/**
 * Fetch the next block of rows and report its size through `numOfRows`.
 *
 * @param res        result handle
 * @param numOfRows  out: number of rows in the block
 * @param rows       out: the block's row pointer array
 * @return 0 (outputs untouched) for NULL and TMQ raw / meta / batch-meta
 *         handles; the request's code for query results; 0 or the error from
 *         tmqGetNextResInfo() for TMQ data / metadata results;
 *         TSDB_CODE_TMQ_INVALID_DATA for any other handle kind
 */
int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
  if (NULL == res) {
    return 0;
  }
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    return 0;
  }

  if (TD_RES_QUERY(res)) {
    SRequestObj *pRequest = (SRequestObj *)res;
    *rows = NULL;
    *numOfRows = 0;

    bool nothingToFetch = pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
                          pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0;
    if (nothingToFetch) {
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
      return pRequest->code;
    }

    (void)doAsyncFetchRows(pRequest, false, true);

    // TODO refactor
    // The whole block is handed out at once, so mark it as fully consumed.
    SReqResultInfo *pInfo = &pRequest->body.resInfo;
    pInfo->current = pInfo->numOfRows;

    *rows = pInfo->row;
    *numOfRows = pInfo->numOfRows;
    return pRequest->code;
  }

  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    SReqResultInfo *pInfo = NULL;
    int32_t         rc = tmqGetNextResInfo(res, true, &pInfo);
    if (rc != 0) {
      return rc;
    }

    pInfo->current = pInfo->numOfRows;
    *rows = pInfo->row;
    *numOfRows = pInfo->numOfRows;
    return 0;
  }

  tscError("taos_fetch_block_s invalid res type");
  return TSDB_CODE_TMQ_INVALID_DATA;
}
1538

1539
/**
 * Fetch the next block in its raw form.
 *
 * Fix over the previous version: the request phase update is guarded by the
 * request *type* being TSDB_SQL_RETRIEVE_EMPTY_RESULT. The old code compared
 * the error `code` field against that SQL-type constant; taos_fetch_row()
 * performs the same check on `type`.
 *
 * @param res        result handle
 * @param numOfRows  out: number of rows in the block (0 when nothing is fetched)
 * @param pData      out: raw block data (NULL when nothing is fetched)
 * @return 0 for NULL / TMQ handles (also when no further TMQ block exists),
 *         otherwise the request's code
 */
int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
  *numOfRows = 0;
  *pData = NULL;

  if (res == NULL || TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    return 0;
  }

  if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
    SReqResultInfo *pResultInfo = NULL;
    int32_t         code = tmqGetNextResInfo(res, false, &pResultInfo);
    if (code != 0) {
      // A failure to advance is reported as "no rows", not as an error.
      (*numOfRows) = 0;
      return 0;
    }

    pResultInfo->current = pResultInfo->numOfRows;
    (*numOfRows) = pResultInfo->numOfRows;
    (*pData) = (void *)pResultInfo->pData;
    return 0;
  }

  SRequestObj *pRequest = (SRequestObj *)res;

  if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
      pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
    if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
    }
    return pRequest->code;
  }

  (void)doAsyncFetchRows(pRequest, false, false);

  SReqResultInfo *pResultInfo = &pRequest->body.resInfo;

  // The whole block is handed out at once, so mark it as fully consumed.
  pResultInfo->current = pResultInfo->numOfRows;
  (*numOfRows) = pResultInfo->numOfRows;
  (*pData) = (void *)pResultInfo->pData;

  return pRequest->code;
}
1581

1582
/**
 * Return the offset array of a variable-length column in the current block.
 *
 * @return NULL for NULL / TMQ raw / meta / batch-meta handles, for an
 *         out-of-range `columnIndex`, and for columns whose type is not
 *         variable-length
 */
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
  if (NULL == res) {
    return NULL;
  }
  if (TD_RES_TMQ_RAW(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
    return NULL;
  }

  int32_t fieldCnt = taos_num_fields(res);
  bool    badIndex = (columnIndex < 0) || (columnIndex >= fieldCnt) || (fieldCnt == 0);
  if (badIndex) {
    return NULL;
  }

  SReqResultInfo *pInfo = tscGetCurResInfo(res);
  if (!IS_VAR_DATA_TYPE(pInfo->userFields[columnIndex].type)) {
    return NULL;
  }

  return pInfo->pCol[columnIndex].offset;
}
1600

1601
/**
 * Fill `result[0 .. *rows-1]` with the NULL state of each row of one column.
 *
 * @param res          result handle
 * @param columnIndex  zero-based column index
 * @param result       out: one flag per row, true when the cell is NULL
 * @param rows         in/out: number of rows requested; lowered to the
 *                     block's row count when it is larger
 * @return 0 on success, TSDB_CODE_INVALID_PARA for invalid arguments or
 *         unsupported result kinds
 */
int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows) {
  if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || columnIndex < 0 || TD_RES_TMQ_META(res) ||
      TD_RES_TMQ_RAW(res) || TD_RES_TMQ_BATCH_META(res)) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t fieldCnt = taos_num_fields(res);
  if (columnIndex >= fieldCnt || fieldCnt == 0) {
    return TSDB_CODE_INVALID_PARA;
  }

  SReqResultInfo *pInfo = tscGetCurResInfo(res);
  TAOS_FIELD     *pFieldDesc = &pInfo->userFields[columnIndex];
  SResultColumn  *pColumn = &pInfo->pCol[columnIndex];

  if (*rows > pInfo->numOfRows) {
    *rows = pInfo->numOfRows;
  }

  const int  rowCnt = *rows;
  const bool isVarLen = IS_VAR_DATA_TYPE(pFieldDesc->type);

  for (int i = 0; i < rowCnt; i++) {
    if (isVarLen) {
      // Variable-length columns mark NULL with an offset of -1.
      result[i] = (pColumn->offset[i] == -1);
    } else {
      result[i] = colDataIsNull_f(pColumn, i) ? true : false;
    }
  }

  return 0;
}
1638

1639
/**
 * Validate a SQL statement: runs it through taosQueryImpl() with the
 * validate-only flag set and returns the resulting error code.
 */
int taos_validate_sql(TAOS *taos, const char *sql) {
  TAOS_RES *pRes = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP);
  int       rc = taos_errno(pRes);
  taos_free_result(pRes);
  return rc;
}
1647

1648
/**
 * Reset the current database of a connection via resetConnectDB().
 * Sets terrno = TSDB_CODE_TSC_DISCONNECTED when the connection cannot be acquired.
 */
void taos_reset_current_db(TAOS *taos) {
  int64_t  rid = *(int64_t *)taos;
  STscObj *pConn = acquireTscObj(rid);
  if (NULL == pConn) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return;
  }

  resetConnectDB(pConn);
  releaseTscObj(rid);
}
1659

1660
/**
 * Return the server version string stored on the connection (`sDetailVer`).
 *
 * @return NULL with terrno = TSDB_CODE_TSC_DISCONNECTED when the connection
 *         cannot be acquired
 *
 * NOTE(review): the returned pointer refers into the connection object and is
 * handed out after the reference has been released - confirm that callers
 * only use it while they still hold the connection.
 */
const char *taos_get_server_info(TAOS *taos) {
  int64_t  rid = *(int64_t *)taos;
  STscObj *pConn = acquireTscObj(rid);
  if (NULL == pConn) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  releaseTscObj(rid);
  return pConn->sDetailVer;
}
1671

1672
/**
 * Copy the connection's current database name into `database`.
 *
 * @param taos      connection handle
 * @param database  destination buffer, may be NULL
 * @param len       capacity of `database`
 * @param required  optional out: bytes needed including the terminator;
 *                  written only when the buffer is missing or too small
 * @return TSDB_CODE_TSC_DISCONNECTED when the connection cannot be acquired,
 *         TSDB_CODE_INVALID_PARA when the buffer is missing or too small (a
 *         too-small buffer still receives a truncated copy), 0 on success
 */
int taos_get_current_db(TAOS *taos, char *database, int len, int *required) {
  STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
  if (NULL == pTscObj) {
    return TSDB_CODE_TSC_DISCONNECTED;
  }

  int code = TSDB_CODE_SUCCESS;
  (void)taosThreadMutexLock(&pTscObj->mutex);

  // The name is read while holding the connection mutex.
  size_t needed = strlen(pTscObj->db) + 1;

  if (database == NULL || len <= 0) {
    if (required != NULL) {
      *required = needed;
    }
    TSC_ERR_JRET(TSDB_CODE_INVALID_PARA);
  }

  tstrncpy(database, pTscObj->db, len);
  if ((size_t)len < needed) {
    if (required != NULL) {
      *required = needed;
    }
    TSC_ERR_JRET(TSDB_CODE_INVALID_PARA);
  }
  code = 0;

_return:
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
  releaseTscObj(*(int64_t *)taos);
  return code;
}
1696

1697
// buffer is allocated by caller, len is in/out parameter, input is buffer length, output is actual length.
1698
// because this is a general purpose api, buffer is not null-terminated string even for string info, and
1699
// the return length is the actual length of the info, not including null-terminator.
1700
int taos_get_connection_info(TAOS *taos, TSDB_CONNECTION_INFO info, char *buffer, int *len) {
196✔
1701
  if (len == NULL) {
196✔
1702
    return TSDB_CODE_INVALID_PARA;
×
1703
  }
1704

1705
  STscObj *pTscObj = acquireTscObj(*(int64_t *)taos);
196✔
1706
  if (pTscObj == NULL) {
196✔
1707
    return TSDB_CODE_TSC_DISCONNECTED;
×
1708
  }
1709

1710
  int code = TSDB_CODE_SUCCESS;
196✔
1711
  (void)taosThreadMutexLock(&pTscObj->mutex);
196✔
1712

1713
  switch (info) {
196✔
1714
    case TSDB_CONNECTION_INFO_USER: {
98✔
1715
      int userLen = strlen(pTscObj->user);
98✔
1716
      if (buffer == NULL || *len < userLen) {
98✔
1717
        *len = userLen;
×
1718
        TSC_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1719
      } else {
1720
        *len = userLen;
98✔
1721
        (void)memcpy(buffer, pTscObj->user, userLen);
98✔
1722
      }
1723
      break;
98✔
1724
    }
1725

1726
    case TSDB_CONNECTION_INFO_TOKEN: {
98✔
1727
      int tokenLen = strlen(pTscObj->tokenName);
98✔
1728
      if (tokenLen == 0) {
98✔
1729
        *len = 0;
×
1730
      } else if (buffer == NULL || *len < tokenLen) {
98✔
1731
        *len = tokenLen;
×
1732
        TSC_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1733
      } else {
1734
        *len = tokenLen;
98✔
1735
        (void)memcpy(buffer, pTscObj->tokenName, tokenLen);
98✔
1736
      }
1737
      break;
98✔
1738
    }
1739

1740
    default:
×
1741
      TSC_ERR_JRET(TSDB_CODE_INVALID_PARA);
×
1742
  }
1743

1744
_return:
×
1745
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
196✔
1746
  releaseTscObj(*(int64_t *)taos);
196✔
1747
  return code;
196✔
1748
}
1749

1750
/*
 * Releases a callback wrapper together with the catalog request and parse context it holds.
 * A NULL wrapper is accepted and ignored.
 */
void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) {
  if (pWrapper != NULL) {
    // catalog request: tear down its content first, then the container itself
    destoryCatalogReq(pWrapper->pCatalogReq);
    taosMemoryFree(pWrapper->pCatalogReq);

    qDestroyParseContext(pWrapper->pParseCtx);

    taosMemoryFree(pWrapper);
  }
}
1759

1760
/*
 * Drops the per-execution state attached to a request: the scheduler job, the parsed query
 * and the callback wrapper. The query and wrapper pointers are reset to NULL afterwards.
 */
void destroyCtxInRequest(SRequestObj *pRequest) {
  schedulerFreeJob(&pRequest->body.queryJob, 0);

  // parsed query
  qDestroyQuery(pRequest->pQuery);
  pRequest->pQuery = NULL;

  // callback wrapper (catalog request + parse context)
  destorySqlCallbackWrapper(pRequest->pWrapper);
  pRequest->pWrapper = NULL;
}
3,879,548✔
1767

1768
/*
 * Catalog callback for the analyse stage.
 *
 * pResultMeta: metadata delivered by the catalog.
 * param:       the SSqlCallbackWrapper of the request.
 * code:        result of the catalog fetch.
 *
 * Records the catalog/analyse timings, runs the semantic analysis and the AST-level security
 * check when the previous step succeeded, and forwards the outcome to handleQueryAnslyseRes().
 */
static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) {
  SSqlCallbackWrapper *pCb = (SSqlCallbackWrapper *)param;
  SRequestObj         *pReq = pCb->pRequest;
  SQuery              *pQuery = pReq->pQuery;

  qDebug("req:0x%" PRIx64 ", start to semantic analysis, QID:0x%" PRIx64, pReq->self, pReq->requestId);

  // everything since ctgStart is accounted as catalog cost
  int64_t tsBegin = taosGetTimestampUs();
  pReq->metric.ctgCostUs = tsBegin - pReq->metric.ctgStart;
  pCb->pParseCtx->parseOnly = pReq->parseOnly;

  if (TSDB_CODE_SUCCESS == code) {
    code = qAnalyseSqlSemantic(pCb->pParseCtx, pCb->pCatalogReq, pResultMeta, pQuery);
    if (TSDB_CODE_SUCCESS == code) {
      code = sqlSecurityCheckASTLevel(pReq, pQuery);
    }
  }

  pReq->metric.analyseCostUs += taosGetTimestampUs() - tsBegin;

  if (pReq->parseOnly) {
    // move the metadata into the request and blank the source so it is not shared
    (void)memcpy(&pReq->parseMeta, pResultMeta, sizeof(*pResultMeta));
    (void)memset(pResultMeta, 0, sizeof(*pResultMeta));
  }

  handleQueryAnslyseRes(pCb, pResultMeta, code);
}
501,452,863✔
1796

1797
/*
 * Builds a copy of a catalog request.
 *
 * ppTarget: receives the newly allocated copy; left untouched when the allocation fails.
 * pSrc:     request to copy from.
 *
 * Every array member is duplicated with taosArrayDup(), the scalar flags are copied and the
 * copy is marked as `cloned`. Results of the individual taosArrayDup() calls are not checked.
 * Returns TSDB_CODE_SUCCESS, or terrno when the container allocation fails.
 */
int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) {
  SCatalogReq *pCopy = taosMemoryCalloc(1, sizeof(SCatalogReq));
  if (pCopy == NULL) {
    return terrno;
  }

  // array members
  pCopy->pDbVgroup = taosArrayDup(pSrc->pDbVgroup, NULL);
  pCopy->pDbCfg = taosArrayDup(pSrc->pDbCfg, NULL);
  pCopy->pDbInfo = taosArrayDup(pSrc->pDbInfo, NULL);
  pCopy->pTableMeta = taosArrayDup(pSrc->pTableMeta, NULL);
  pCopy->pTableHash = taosArrayDup(pSrc->pTableHash, NULL);
  pCopy->pUdf = taosArrayDup(pSrc->pUdf, NULL);
  pCopy->pIndex = taosArrayDup(pSrc->pIndex, NULL);
  pCopy->pUser = taosArrayDup(pSrc->pUser, NULL);
  pCopy->pTableIndex = taosArrayDup(pSrc->pTableIndex, NULL);
  pCopy->pTableCfg = taosArrayDup(pSrc->pTableCfg, NULL);
  pCopy->pTableTag = taosArrayDup(pSrc->pTableTag, NULL);
  pCopy->pView = taosArrayDup(pSrc->pView, NULL);
  pCopy->pTableTSMAs = taosArrayDup(pSrc->pTableTSMAs, NULL);
  pCopy->pTSMAs = taosArrayDup(pSrc->pTSMAs, NULL);
  pCopy->pVStbRefDbs = taosArrayDup(pSrc->pVStbRefDbs, NULL);

  // scalar flags
  pCopy->qNodeRequired = pSrc->qNodeRequired;
  pCopy->dNodeRequired = pSrc->dNodeRequired;
  pCopy->svrVerRequired = pSrc->svrVerRequired;
  pCopy->forceUpdate = pSrc->forceUpdate;
  pCopy->cloned = true;

  *ppTarget = pCopy;
  return TSDB_CODE_SUCCESS;
}
1829

1830
/*
 * Starts the analysis of a "previous" statement (`pRoot`) in a newly built request that is
 * linked to the request of `pWrapper`.
 *
 * pWrapper:    wrapper of the current request.
 * pResultMeta: catalog metadata, passed on unchanged.
 * pRoot:       statement root the caller detached from its query; owned by this function
 *              until it is attached to the new request's query.
 *
 * On success the new request is analysed via doAsyncQueryFromAnalyse(). On any failure the
 * error is reported through handleQueryAnslyseRes() on the ORIGINAL wrapper, and a root that
 * was never attached to the new query is destroyed here so it is not leaked.
 *
 * NOTE(review): when a step after buildPreviousRequest() fails, the new request itself is
 * not released here - confirm who owns it on that path.
 */
void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode *pRoot) {
  SRequestObj         *pNewRequest = NULL;
  SSqlCallbackWrapper *pNewWrapper = NULL;
  int32_t              code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest);

  if (TSDB_CODE_SUCCESS == code) {
    pNewRequest->pQuery = NULL;
    code = nodesMakeNode(QUERY_NODE_QUERY, (SNode **)&pNewRequest->pQuery);
    if (pNewRequest->pQuery) {
      // ownership of the root moves to the new query
      pNewRequest->pQuery->pRoot = pRoot;
      pRoot = NULL;
      pNewRequest->pQuery->execStage = QUERY_EXEC_STAGE_ANALYSE;
    }
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = prepareAndParseSqlSyntax(&pNewWrapper, pNewRequest, false);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = cloneCatalogReq(&pNewWrapper->pCatalogReq, pWrapper->pCatalogReq);
  }

  if (TSDB_CODE_SUCCESS == code) {
    doAsyncQueryFromAnalyse(pResultMeta, pNewWrapper, code);
    return;
  }

  // failure: pRoot is non-NULL only if it was never handed to the new query
  nodesDestroyNode(pRoot);
  handleQueryAnslyseRes(pWrapper, pResultMeta, code);
}
1860

1861
/*
 * Consumes the outcome of the analyse stage.
 *
 * pWrapper:    wrapper of the request that was analysed.
 * pResultMeta: catalog metadata to pass on to the launch step.
 * code:        result of the analysis.
 *
 * Success: a pending previous root is handed to handleSubQueryFromAnalyse(); otherwise the
 * statement attributes and result schema are copied into the request and the query is launched.
 * Failure: wrapper and query are destroyed, then the request is either restarted (when the
 * client can handle the error and no stmt binding is involved) or returned to the user.
 */
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) {
  SRequestObj *pRequest = pWrapper->pRequest;
  SQuery      *pQuery = pRequest->pQuery;

  if (TSDB_CODE_SUCCESS == code) {
    SNode *pPrev = pQuery->pPrevRoot;
    if (pPrev) {
      // detach the previous root before handing it over
      pQuery->pPrevRoot = NULL;
      handleSubQueryFromAnalyse(pWrapper, pResultMeta, pPrev);
      return;
    }

    pRequest->stableQuery = pQuery->stableQuery;

    SNode *pRoot = pQuery->pRoot;
    if (pRoot) {
      pRequest->stmtType = pRoot->type;
      if (nodeType(pRoot) == QUERY_NODE_DELETE_STMT) {
        pRequest->secureDelete = ((SDeleteStmt *)pRoot)->secureDelete;
      }
    }

    if (pQuery->haveResultSet) {
      code = setResSchemaInfo(&pRequest->body.resInfo, pQuery->pResSchema, pQuery->numOfResCols, pQuery->pResExtSchema,
                              pRequest->stmtBindVersion > 0);
      setResPrecision(&pRequest->body.resInfo, pQuery->precision);
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
    qDestroyQuery(pRequest->pQuery);
    pRequest->pQuery = NULL;

    if (NEED_CLIENT_HANDLE_ERROR(code) && pRequest->stmtBindVersion == 0) {
      tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%d - %s, tryCount:%d, QID:0x%" PRIx64,
               pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
      restartAsyncQuery(pRequest, code);
      return;
    }

    // return to app directly
    tscError("req:0x%" PRIx64 ", error occurs, code:%s, return to user app, QID:0x%" PRIx64, pRequest->self,
             tstrerror(code), pRequest->requestId);
    pRequest->code = code;
    returnToUser(pRequest);
    return;
  }

  // hand the lists collected by the parser over to the request, then schedule the query
  TSWAP(pRequest->dbList, (pQuery)->pDbList);
  TSWAP(pRequest->tableList, (pQuery)->pTableList);
  TSWAP(pRequest->targetTableList, (pQuery)->pTargetTableList);

  launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper);
}
1914

1915
/*
 * Issues the asynchronous catalog fetch for everything listed in the wrapper's catalog request.
 *
 * pWrapper: wrapper of the request; also passed to `fp` as its parameter.
 * fp:       callback invoked by the catalog.
 *
 * Stamps metric.ctgStart right before the call and returns catalogAsyncGetAllMeta()'s result.
 */
static int32_t getAllMetaAsync(SSqlCallbackWrapper *pWrapper, catalogCallback fp) {
  SParseContext *pCtx = pWrapper->pParseCtx;
  SRequestObj   *pReq = pWrapper->pRequest;

  SRequestConnInfo conn = {.pTrans = pCtx->pTransporter,
                           .requestId = pCtx->requestId,
                           .requestObjRefId = pCtx->requestRid,
                           .mgmtEps = pCtx->mgmtEpSet};

  pReq->metric.ctgStart = taosGetTimestampUs();

  return catalogAsyncGetAllMeta(pCtx->pCatalog, &conn, pWrapper->pCatalogReq, fp, pWrapper, &pReq->body.queryJob);
}
1926

1927
static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t code);
1928

1929
/*
 * Advances a request according to the execution stage recorded in its query:
 *   PARSE    -> fetch catalog metadata, continue in doAsyncQueryFromParse()
 *   ANALYSE  -> fetch catalog metadata, continue in doAsyncQueryFromAnalyse()
 *   SCHEDULE -> launch the query directly
 * Any other stage is a no-op. Returns the catalog call's result, or TSDB_CODE_SUCCESS.
 */
static int32_t phaseAsyncQuery(SSqlCallbackWrapper *pWrapper) {
  SRequestObj *pRequest = pWrapper->pRequest;

  switch (pRequest->pQuery->execStage) {
    case QUERY_EXEC_STAGE_PARSE: {
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_CATALOG);
      return getAllMetaAsync(pWrapper, doAsyncQueryFromParse);
    }
    case QUERY_EXEC_STAGE_ANALYSE: {
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_CATALOG);
      return getAllMetaAsync(pWrapper, doAsyncQueryFromAnalyse);
    }
    case QUERY_EXEC_STAGE_SCHEDULE: {
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_SCHEDULE);
      launchAsyncQuery(pRequest, pRequest->pQuery, NULL, pWrapper);
      return TSDB_CODE_SUCCESS;
    }
    default:
      return TSDB_CODE_SUCCESS;
  }
}
1954

1955
/*
 * Catalog callback for the parse stage.
 *
 * pResultMeta: metadata delivered by the catalog.
 * param:       the SSqlCallbackWrapper of the request.
 * code:        result of the catalog fetch.
 *
 * Adds the elapsed catalog time to the metrics, resumes parsing with the fetched metadata and
 * moves on to the next phase. On any failure the wrapper is destroyed and the request's
 * callback is invoked with the error.
 */
static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t code) {
  SSqlCallbackWrapper *pCb = (SSqlCallbackWrapper *)param;
  SRequestObj         *pRequest = pCb->pRequest;
  SQuery              *pQuery = pRequest->pQuery;

  pRequest->metric.ctgCostUs += taosGetTimestampUs() - pRequest->metric.ctgStart;
  qDebug("req:0x%" PRIx64 ", continue parse query, QID:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId,
         tstrerror(code));

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinueParseSql(pCb->pParseCtx, pCb->pCatalogReq, pResultMeta, pQuery);
    if (TSDB_CODE_SUCCESS == code) {
      code = phaseAsyncQuery(pCb);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    return;
  }

  tscError("req:0x%" PRIx64 ", error happens, code:%d - %s, QID:0x%" PRIx64, pCb->pRequest->self, code,
           tstrerror(code), pCb->pRequest->requestId);
  destorySqlCallbackWrapper(pCb);
  pRequest->pWrapper = NULL;
  terrno = code;
  pRequest->code = code;
  doRequestCallback(pRequest, code);
}
8,590,829✔
1983

1984
/*
 * Re-enters the syntax parser for a request (qParseSqlSyntax on the existing parse context)
 * and, when that succeeds, advances to the next phase.
 * On failure the wrapper is destroyed and the request's callback is invoked with the error.
 */
void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest) {
  int32_t rc = qParseSqlSyntax(pWrapper->pParseCtx, &pRequest->pQuery, pWrapper->pCatalogReq);
  if (TSDB_CODE_SUCCESS == rc) {
    rc = phaseAsyncQuery(pWrapper);
  }

  if (TSDB_CODE_SUCCESS == rc) {
    return;
  }

  tscError("req:0x%" PRIx64 ", error happens, code:%d - %s, QID:0x%" PRIx64, pWrapper->pRequest->self, rc,
           tstrerror(rc), pWrapper->pRequest->requestId);
  destorySqlCallbackWrapper(pWrapper);
  pRequest->pWrapper = NULL;
  terrno = rc;
  pRequest->code = rc;
  doRequestCallback(pRequest, rc);
}
13,590✔
2000

2001
/*
 * Asynchronous query entry point: forwards the connection id stored in `taos`, the SQL text and
 * the user callback to taosAsyncQueryImpl(), tagged as an application request.
 */
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
  taosAsyncQueryImpl(*(int64_t *)taos, sql, fp, param, false, TD_REQ_FROM_APP);
}
152,413✔
2005

2006
/*
 * Same as taos_query_a(), but the caller supplies the request id that is passed on to
 * taosAsyncQueryImplWithReqid().
 */
void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, int64_t reqid) {
  taosAsyncQueryImplWithReqid(*(int64_t *)taos, sql, fp, param, false, reqid);
}
×
2010

2011
/*
 * Allocates a parse context for `pRequest` and fills it from the request and its connection.
 *
 * pRequest: request to parse.
 * pCxt:     receives the new context (set to NULL when the allocation fails).
 * pWrapper: callback wrapper; stored as parseSqlParam, and its current parse context (if any)
 *           supplies privInfo.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when the allocation fails.
 */
int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper) {
  const STscObj *pTscObj = pRequest->pTscObj;

  // store through *pCxt first: the initializer below reads pWrapper->pParseCtx afterwards,
  // exactly as it would if the caller passed &pWrapper->pParseCtx as pCxt
  SParseContext *pNew = taosMemoryCalloc(1, sizeof(SParseContext));
  *pCxt = pNew;
  if (pNew == NULL) {
    return terrno;
  }

  *pNew = (SParseContext){
      // request identity and SQL text
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .db = pRequest->pDb,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pEffectiveUser = pRequest->effectiveUser,
      .allocatorId = pRequest->allocatorRefId,
      // connection / user attributes
      .acctId = pTscObj->acctId,
      .pUser = pTscObj->user,
      .userId = pTscObj->userId,
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
      .enableSysInfo = pTscObj->sysInfo,
      .svrVer = pTscObj->sVer,
      .timezone = pTscObj->optionInfo.timezone,
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
      // application instance attributes
      .pTransporter = pTscObj->pAppInfo->pTransporter,
      .sodInitial = pTscObj->pAppInfo->serverCfg.sodInitial,
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
      // fixed settings and callbacks
      .topicQuery = false,
      .pStmtCb = NULL,
      .async = true,
      .privInfo = pWrapper->pParseCtx ? pWrapper->pParseCtx->privInfo : 0,
      .parseSqlFp = clientParseSql,
      .parseSqlParam = pWrapper,
      .setQueryFp = setQueryRequest};

  int8_t curBiMode = atomic_load_8(&((STscObj *)pTscObj)->biMode);
  pNew->biMode = curBiMode;
  pNew->minSecLevel = pTscObj->minSecLevel;
  pNew->maxSecLevel = pTscObj->maxSecLevel;
  pNew->macMode = pTscObj->pAppInfo->serverCfg.macActive;
  return TSDB_CODE_SUCCESS;
}
2052

2053
/*
 * Creates the callback wrapper and parse context for a request and, when the request has no
 * query yet, runs the syntax parser.
 *
 * ppWrapper:       receives the new wrapper (untouched when its allocation fails).
 * pRequest:        request to prepare; its pWrapper field is pointed at the new wrapper.
 * updateMetaForce: copied into the catalog request's forceUpdate flag.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered. The time spent between the
 * catalog-request allocation and the end of syntax parsing is added to metric.parseCostUs.
 */
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) {
  STscObj *pTscObj = pRequest->pTscObj;
  int32_t  code = TSDB_CODE_SUCCESS;

  SSqlCallbackWrapper *pNew = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
  if (pNew != NULL) {
    // link wrapper and request in both directions before anything can fail
    pNew->pRequest = pRequest;
    pRequest->pWrapper = pNew;
    *ppWrapper = pNew;
  } else {
    code = terrno;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = createParseContext(pRequest, &pNew->pParseCtx, pNew);
  }

  if (TSDB_CODE_SUCCESS == code) {
    pNew->pParseCtx->mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pNew->pParseCtx->pCatalog);
  }

  // nothing left to do on error, or when the request already carries a query
  if (TSDB_CODE_SUCCESS != code || NULL != pRequest->pQuery) {
    return code;
  }

  int64_t tsBegin = taosGetTimestampUs();

  pNew->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq));
  if (pNew->pCatalogReq != NULL) {
    pNew->pCatalogReq->forceUpdate = updateMetaForce;
    TSC_ERR_RET(qnodeRequired(pRequest, &pNew->pCatalogReq->qNodeRequired));
    code = qParseSqlSyntax(pNew->pParseCtx, &pRequest->pQuery, pNew->pCatalogReq);
  } else {
    code = terrno;
  }

  pRequest->metric.parseCostUs += taosGetTimestampUs() - tsBegin;
  return code;
}
2091

2092
/*
 * Runs (or re-runs) a request through parse -> catalog -> schedule.
 *
 * pRequest:        request to execute; its retry counter is incremented on every call.
 * updateMetaForce: forwarded to prepareAndParseSqlSyntax().
 *
 * Once the retry counter has passed REQUEST_TOTAL_EXEC_TIMES the request's callback is invoked
 * with pRequest->prevCode. A failure that the client can handle (and that involves no stmt
 * binding) triggers refreshMeta() followed by another attempt; any other failure is delivered
 * to the request's callback.
 */
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
  SSqlCallbackWrapper *pWrapper = NULL;
  int32_t              code = TSDB_CODE_SUCCESS;

  CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_PARSE);

  if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
    // retries exhausted: report the error remembered from the previous attempt
    code = pRequest->prevCode;
    terrno = code;
    pRequest->code = code;
    tscDebug("req:0x%" PRIx64 ", call sync query cb with code:%s", pRequest->self, tstrerror(code));
    doRequestCallback(pRequest, code);
    return;
  }

  code = prepareAndParseSqlSyntax(&pWrapper, pRequest, updateMetaForce);
  if (TSDB_CODE_SUCCESS == code) {
    pRequest->stmtType = pRequest->pQuery->pRoot->type;
    code = phaseAsyncQuery(pWrapper);
  }

  if (TSDB_CODE_SUCCESS == code) {
    return;
  }

  // prefer the detailed message left in msgBuf over the generic error text
  if (NULL != pRequest->msgBuf && strlen(pRequest->msgBuf) > 0) {
    tscError("req:0x%" PRIx64 ", error happens, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code, pRequest->msgBuf,
             pRequest->requestId);
  } else {
    tscError("req:0x%" PRIx64 ", error happens, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
             pRequest->requestId);
  }

  destorySqlCallbackWrapper(pWrapper);
  pRequest->pWrapper = NULL;
  qDestroyQuery(pRequest->pQuery);
  pRequest->pQuery = NULL;

  if (NEED_CLIENT_HANDLE_ERROR(code) && pRequest->stmtBindVersion == 0) {
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%d - %s, tryCount:%d, QID:0x%" PRIx64,
             pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
    code = refreshMeta(pRequest->pTscObj, pRequest);
    if (code != 0) {
      tscWarn("req:0x%" PRIx64 ", refresh meta failed, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
              tstrerror(code), pRequest->requestId);
    }
    // NOTE(review): prevCode receives the refreshMeta() result here, not the original error - confirm intent
    pRequest->prevCode = code;
    doAsyncQuery(pRequest, true);
    return;
  }

  terrno = code;
  pRequest->code = code;
  doRequestCallback(pRequest, code);
}
2148

2149
/*
 * Restarts the user-level request that `pRequest` belongs to after a recoverable error.
 *
 * pRequest: request on which the error occurred (may be a sub request).
 * code:     error to remember as prevCode on the user request.
 *
 * Follows relation.nextRefId from `pRequest` until it reaches the request whose own id equals
 * relation.userRefId (or whose userRefId is 0). That request gets its execution context
 * destroyed, its relation block cleared, and is run again through doAsyncQuery() with a forced
 * metadata update. When the chain ends without finding it, an error is logged and the sub
 * requests are removed.
 */
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
  tscInfo("restart request:%s p:%p", pRequest->sqlstr, pRequest);
  SRequestObj *pUserReq = pRequest;
  (void)acquireRequest(pRequest->self);
  while (pUserReq) {
    if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) {
      break;
    }

    // read the link before dropping the reference on the current node
    int64_t nextRefId = pUserReq->relation.nextRefId;
    (void)releaseRequest(pUserReq->self);
    // end of chain: clear the cursor so the loop terminates instead of revisiting
    // (and releasing again) the request that was just released
    pUserReq = (nextRefId != 0) ? acquireRequest(nextRefId) : NULL;
  }

  bool hasSubRequest = pUserReq != pRequest || pRequest->relation.prevRefId != 0;
  if (pUserReq) {
    destroyCtxInRequest(pUserReq);
    pUserReq->prevCode = code;
    (void)memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
  } else {
    tscError("User req is missing");
    (void)removeFromMostPrevReq(pRequest);
    return;
  }

  if (hasSubRequest) {
    (void)removeFromMostPrevReq(pRequest);
  } else {
    // balance the acquireRequest() taken at the top
    (void)releaseRequest(pUserReq->self);
  }
  doAsyncQuery(pUserReq, true);
}
2180

2181
typedef struct SAsyncFetchParam {
2182
  SRequestObj      *pReq;
2183
  __taos_async_fn_t fp;
2184
  void             *param;
2185
} SAsyncFetchParam;
2186

2187
static int32_t doAsyncFetch(void *pParam) {
324,511,056✔
2188
  SAsyncFetchParam *param = pParam;
324,511,056✔
2189
  taosAsyncFetchImpl(param->pReq, param->fp, param->param);
324,511,056✔
2190
  taosMemoryFree(param);
324,510,834✔
2191
  return TSDB_CODE_SUCCESS;
324,510,875✔
2192
}
2193

2194
/*
 * Asynchronously fetches the next rows of a query result.
 *
 * res:   result handle; must be a query result.
 * fp:    callback invoked with (param, res, code-or-row-count).
 * param: opaque user argument for `fp`.
 *
 * Nothing is called back when `res` or `fp` is NULL. A non-query result, an empty-result
 * request, an allocation failure and a scheduling failure all invoke `fp` directly from this
 * function; otherwise the fetch is handed to taosAsyncExec() and executed by doAsyncFetch().
 */
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
  if (res == NULL || fp == NULL) {
    tscError("taos_fetch_rows_a invalid paras");
    return;
  }
  if (!TD_RES_QUERY(res)) {
    tscError("taos_fetch_rows_a res is NULL");
    fp(param, res, TSDB_CODE_APP_ERROR);
    return;
  }

  SRequestObj *pRequest = res;

  // requests flagged as "empty result" complete immediately with zero rows
  if (TSDB_SQL_RETRIEVE_EMPTY_RESULT == pRequest->type) {
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
    fp(param, res, 0);
    return;
  }

  SAsyncFetchParam *pTask = taosMemoryCalloc(1, sizeof(SAsyncFetchParam));
  if (pTask == NULL) {
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
    fp(param, res, terrno);
    return;
  }

  // each fetch call sets the phase to IN_PROGRESS
  CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_IN_PROGRESS);
  pTask->pReq = pRequest;
  pTask->fp = fp;
  pTask->param = param;

  int32_t rc = taosAsyncExec(doAsyncFetch, pTask, NULL);
  if (TSDB_CODE_SUCCESS != rc) {
    // the task was never queued, so this function still owns the parameter block
    taosMemoryFree(pTask);
    fp(param, res, rc);
  }
}
2233

2234
/*
 * Asynchronous raw-block fetch: disables UCS4 conversion on the request's result info and then
 * delegates to taos_fetch_rows_a().
 *
 * Returns without invoking `fp` when `res` or `fp` is NULL, or when `res` is not a query result.
 */
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
  if (res == NULL || fp == NULL) {
    tscError("taos_fetch_raw_block_a invalid paras");
    return;
  }
  if (!TD_RES_QUERY(res)) {
    tscError("taos_fetch_raw_block_a res is NULL");
    return;
  }

  SRequestObj *pRequest = res;

  CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_IN_PROGRESS);

  // raw blocks are handed out as-is, without character-set conversion
  pRequest->body.resInfo.convertUcs4 = false;

  taos_fetch_rows_a(pRequest, fp, param);
}
2253

2254
/*
 * Returns the `pData` pointer stored in the result info of a query result.
 * Returns NULL (after logging) when `res` is NULL or is not a query result.
 */
const void *taos_get_raw_block(TAOS_RES *res) {
  if (res == NULL) {
    tscError("taos_get_raw_block invalid paras");
    return NULL;
  }
  if (!TD_RES_QUERY(res)) {
    tscError("taos_get_raw_block res is NULL");
    return NULL;
  }

  return ((SRequestObj *)res)->body.resInfo.pData;
}
2267

2268
/*
 * Looks up the vgroup routing information of a database through the catalog.
 *
 * taos:   connection handle.
 * db:     database name (without account prefix).
 * dbInfo: output, filled by catalogGetDBVgInfo().
 *
 * Returns TSDB_CODE_SUCCESS or an error code; terrno is set to the same value.
 * A NULL handle yields TSDB_CODE_TSC_DISCONNECTED, a NULL `db`/`dbInfo` yields
 * TSDB_CODE_TSC_INVALID_INPUT.
 */
int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return terrno;
  }

  if (NULL == db || NULL == dbInfo) {
    tscError("invalid input param, db:%p, dbInfo:%p", db, dbInfo);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    return terrno;
  }

  // a temporary request object provides the ids the catalog call needs
  SRequestObj *pRequest = NULL;
  char        *sql = "taos_get_db_route_info";
  int32_t      code = buildRequest(*(int64_t *)taos, sql, strlen(sql), NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return terrno;
  }

  STscObj  *pTscObj = pRequest->pTscObj;
  SCatalog *pCtg = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS == code) {
    SRequestConnInfo conn = {
        .pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
    conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

    // full database name is "<acctId>.<db>"
    char dbFName[TSDB_DB_FNAME_LEN] = {0};
    (void)snprintf(dbFName, sizeof(dbFName), "%d.%s", pTscObj->acctId, db);

    code = catalogGetDBVgInfo(pCtg, &conn, dbFName, dbInfo);
  }

  terrno = code;

  destroyRequest(pRequest);
  return code;
}
2316

2317
/*
 * Resolves the vgroup id a table hashes to.
 *
 * taos:  connection handle.
 * db:    database name.
 * table: table name.
 * vgId:  output; written only on success.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; terrno is set to the same value.
 * A NULL handle yields TSDB_CODE_TSC_DISCONNECTED, a NULL `db`/`table`/`vgId` yields
 * TSDB_CODE_TSC_INVALID_INPUT.
 */
int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return terrno;
  }

  if (NULL == db || NULL == table || NULL == vgId) {
    tscError("invalid input param, db:%p, table:%p, vgId:%p", db, table, vgId);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    return terrno;
  }

  int64_t      connId = *(int64_t *)taos;
  SRequestObj *pRequest = NULL;
  char        *sql = "taos_get_table_vgId";
  int32_t      code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    // report the failure that actually happened rather than whatever terrno held before
    terrno = code;
    return code;
  }

  pRequest->syncQuery = true;

  STscObj  *pTscObj = pRequest->pTscObj;
  SCatalog *pCtg = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (code != TSDB_CODE_SUCCESS) {
    goto _return;
  }

  SRequestConnInfo conn = {
      .pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};

  conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  SName tableName = {0};
  toName(pTscObj->acctId, db, table, &tableName);

  SVgroupInfo vgInfo;
  code = catalogGetTableHashVgroup(pCtg, &conn, &tableName, &vgInfo);
  if (code) {
    goto _return;
  }

  *vgId = vgInfo.vgId;

_return:

  terrno = code;

  destroyRequest(pRequest);
  return code;
}
2369

2370
/*
 * Resolves the vgroup ids for a list of tables in one call.
 *
 * taos:     connection handle.
 * db:       database name.
 * table:    array of `tableNum` table names.
 * tableNum: number of entries in `table`; must be > 0.
 * vgId:     output array, passed to catalogGetTablesHashVgId() together with `tableNum`.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; terrno is set to the same value.
 * A NULL handle yields TSDB_CODE_TSC_DISCONNECTED, invalid arguments yield
 * TSDB_CODE_TSC_INVALID_INPUT.
 */
int taos_get_tables_vgId(TAOS *taos, const char *db, const char *table[], int tableNum, int *vgId) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return terrno;
  }

  if (NULL == db || NULL == table || NULL == vgId || tableNum <= 0) {
    tscError("invalid input param, db:%p, table:%p, vgId:%p, tbNum:%d", db, table, vgId, tableNum);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    return terrno;
  }

  int64_t      connId = *(int64_t *)taos;
  SRequestObj *pRequest = NULL;
  // NOTE(review): this label is the single-table function's name - looks like a copy/paste leftover; confirm
  char        *sql = "taos_get_table_vgId";
  int32_t      code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    // report the failure that actually happened rather than whatever terrno held before
    terrno = code;
    return code;
  }

  pRequest->syncQuery = true;

  STscObj  *pTscObj = pRequest->pTscObj;
  SCatalog *pCtg = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (code != TSDB_CODE_SUCCESS) {
    goto _return;
  }

  SRequestConnInfo conn = {
      .pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};

  conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  code = catalogGetTablesHashVgId(pCtg, &conn, pTscObj->acctId, db, table, tableNum, vgId);

_return:

  terrno = code;

  destroyRequest(pRequest);
  return code;
}
2416

2417
/**
 * Ask the catalog to fetch metadata for the tables named in `tableNameList`.
 *
 * A NULL or empty list is a no-op that returns TSDB_CODE_SUCCESS. Lists longer than 12MB are
 * rejected with TSDB_CODE_TSC_INVALID_OPERATION. Otherwise the list is converted with
 * transferTableNameList(), catalogAsyncGetAllMeta() is started with syncCatalogFn as callback,
 * and the call blocks on the request's semaphore.
 *
 * @return TSDB_CODE_SUCCESS or the error code of the first step that failed.
 */
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return terrno;
  }

  int64_t       connId = *(int64_t *)taos;
  const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024;  // 12MB list
  int32_t       code = 0;
  SRequestObj  *pRequest = NULL;
  SCatalogReq   catalogReq = {0};

  if (NULL == tableNameList) {
    return TSDB_CODE_SUCCESS;
  }

  // Keep the length as size_t: narrowing to int32_t could turn a >2GB list into a negative
  // number that slips past the limit check below.
  size_t length = strlen(tableNameList);
  if (0 == length) {
    return TSDB_CODE_SUCCESS;
  } else if (length > (size_t)MAX_TABLE_NAME_LENGTH) {
    tscError("tableNameList too long, length:%zu, maximum allowed:%d", length, MAX_TABLE_NAME_LENGTH);
    return TSDB_CODE_TSC_INVALID_OPERATION;
  }

  char *sql = "taos_load_table_info";
  code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    goto _return;
  }

  pRequest->syncQuery = true;

  STscObj *pTscObj = pRequest->pTscObj;
  code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
  if (code) {
    goto _return;
  }

  SCatalog *pCtg = NULL;
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (code != TSDB_CODE_SUCCESS) {
    goto _return;
  }

  SRequestConnInfo conn = {
      .pTrans = pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};

  conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, pRequest->body.interParam, NULL);
  if (code) {
    goto _return;
  }

  // Block until the semaphore in the request's sync parameter is posted.
  SSyncQueryParam *pParam = pRequest->body.interParam;
  code = tsem_wait(&pParam->sem);
  if (code) {
    tscError("tsem wait failed, code:%d - %s", code, tstrerror(code));
    goto _return;
  }
_return:
  destoryCatalogReq(&catalogReq);
  destroyRequest(pRequest);
  return code;
}
2483

2484
/**
 * Create a statement handle on connection `taos` with request id 0 and no options.
 *
 * @return the handle from stmtInit(), or NULL with terrno set when `taos` is NULL
 *         (TSDB_CODE_INVALID_PARA) or the connection cannot be acquired (TSDB_CODE_TSC_DISCONNECTED).
 */
TAOS_STMT *taos_stmt_init(TAOS *taos) {
  // Reject NULL before dereferencing the handle, as taos_stmt2_init does.
  if (NULL == taos) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int64_t  connId = *(int64_t *)taos;
  STscObj *pObj = acquireTscObj(connId);
  if (NULL == pObj) {
    tscError("invalid parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  TAOS_STMT *pStmt = stmtInit(pObj, 0, NULL);
  if (NULL == pStmt) {
    tscError("stmt init failed, errcode:%s", terrstr());
  }
  releaseTscObj(connId);

  return pStmt;
}
2500

2501
/**
 * Create a statement handle on connection `taos`, passing `reqid` to stmtInit() and no options.
 *
 * @return the handle from stmtInit(), or NULL with terrno set when `taos` is NULL
 *         (TSDB_CODE_INVALID_PARA) or the connection cannot be acquired (TSDB_CODE_TSC_DISCONNECTED).
 */
TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid) {
  // Reject NULL before dereferencing the handle, as taos_stmt2_init does.
  if (NULL == taos) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int64_t  connId = *(int64_t *)taos;
  STscObj *pObj = acquireTscObj(connId);
  if (NULL == pObj) {
    tscError("invalid parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  TAOS_STMT *pStmt = stmtInit(pObj, reqid, NULL);
  if (NULL == pStmt) {
    tscError("stmt init failed, errcode:%s", terrstr());
  }
  releaseTscObj(connId);

  return pStmt;
}
2517

2518
/**
 * Create a statement handle on connection `taos` using `options`.
 *
 * A NULL `options` is handled like taos_stmt_init(): request id 0 and NULL options are passed
 * to stmtInit().
 *
 * @return the handle from stmtInit(), or NULL with terrno set when `taos` is NULL
 *         (TSDB_CODE_INVALID_PARA) or the connection cannot be acquired (TSDB_CODE_TSC_DISCONNECTED).
 */
TAOS_STMT *taos_stmt_init_with_options(TAOS *taos, TAOS_STMT_OPTIONS *options) {
  // Reject NULL before dereferencing the handle, as taos_stmt2_init does.
  if (NULL == taos) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int64_t  connId = *(int64_t *)taos;
  STscObj *pObj = acquireTscObj(connId);
  if (NULL == pObj) {
    tscError("invalid parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  // Do not dereference a NULL options struct; fall back to the request id taos_stmt_init() uses.
  int64_t    reqId = (options != NULL) ? options->reqId : 0;
  TAOS_STMT *pStmt = stmtInit(pObj, reqId, options);
  if (NULL == pStmt) {
    tscError("stmt init failed, errcode:%s", terrstr());
  }
  releaseTscObj(connId);

  return pStmt;
}
2534

2535
/**
 * Validate the arguments and forward to stmtPrepare().
 * A NULL `stmt` or `sql` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length) {
  if (NULL != stmt && NULL != sql) {
    return stmtPrepare(stmt, sql, length);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2544

2545
/**
 * Set the table name on a statement and, when `tags` is non-NULL, its tag values.
 * A NULL `stmt` or `name` yields TSDB_CODE_INVALID_PARA; a stmtSetTbName() failure is
 * returned as-is without touching the tags.
 */
int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags) {
  if (NULL == stmt || NULL == name) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  int32_t rc = stmtSetTbName(stmt, name);
  if (rc) {
    return rc;
  }

  return (NULL != tags) ? stmtSetTbTags(stmt, tags) : TSDB_CODE_SUCCESS;
}
2563

2564
/**
 * Validate the arguments and forward to stmtSetTbName().
 * A NULL `stmt` or `name` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
  if (NULL != stmt && NULL != name) {
    return stmtSetTbName(stmt, name);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2573

2574
/**
 * Validate the arguments and forward to stmtSetTbTags().
 * A NULL `stmt` or `tags` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
  if (NULL != stmt && NULL != tags) {
    return stmtSetTbTags(stmt, tags);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2583

2584
/** Alias of taos_stmt_set_tbname(): same arguments, same result. */
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) {
  return taos_stmt_set_tbname(stmt, name);
}
396✔
2585

2586
/**
 * Validate the arguments and forward to stmtGetTagFields().
 * A NULL `stmt` or `fieldNum` yields TSDB_CODE_INVALID_PARA; `fields` is passed through unchecked.
 */
int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields) {
  if (NULL != stmt && NULL != fieldNum) {
    return stmtGetTagFields(stmt, fieldNum, fields);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2595

2596
/**
 * Validate the arguments and forward to stmtGetColFields().
 * A NULL `stmt` or `fieldNum` yields TSDB_CODE_INVALID_PARA; `fields` is passed through unchecked.
 */
int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields) {
  if (NULL != stmt && NULL != fieldNum) {
    return stmtGetColFields(stmt, fieldNum, fields);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2605

2606
// let stmt to reclaim TAOS_FIELD_E that was allocated by `taos_stmt_get_tag_fields`/`taos_stmt_get_col_fields`
2607
void taos_stmt_reclaim_fields(TAOS_STMT *stmt, TAOS_FIELD_E *fields) {
×
2608
  (void)stmt;
2609
  if (!fields) return;
×
2610
  taosMemoryFree(fields);
×
2611
}
2612

2613
/**
 * Bind a single row of parameters: forwards to stmtBindBatch() with column index -1.
 * A NULL argument yields TSDB_CODE_INVALID_PARA; `bind->num > 1` yields
 * TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR.
 */
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
  if (NULL == stmt || NULL == bind) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  if (bind->num <= 1) {
    return stmtBindBatch(stmt, bind, -1);
  }

  tscError("invalid bind number %d for %s", bind->num, __FUNCTION__);
  terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
  return terrno;
}
2628

2629
/**
 * Bind a batch of rows: forwards to stmtBindBatch() with column index -1.
 *
 * Rejects NULL arguments (TSDB_CODE_INVALID_PARA), a row count outside 1..INT16_MAX, and more
 * than one row when stmtIsInsert() reports a non-insert statement (both
 * TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR). A stmtIsInsert() failure is returned as-is.
 */
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
  if (NULL == stmt || NULL == bind) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  if (bind->num <= 0 || bind->num > INT16_MAX) {
    tscError("invalid bind num %d", bind->num);
    terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
    return terrno;
  }

  int32_t isInsert = 0;
  int32_t rc = stmtIsInsert(stmt, &isInsert);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("stmt insert failed, errcode:%s", tstrerror(rc));
    return rc;
  }

  // A query may only be bound with a single row.
  if (isInsert == 0 && bind->num > 1) {
    tscError("only one row data allowed for query");
    terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
    return terrno;
  }

  return stmtBindBatch(stmt, bind, -1);
}
2656

2657
/**
 * Bind a batch of values for one column: forwards to stmtBindBatch() with `colIdx`.
 *
 * Rejects NULL arguments and a negative `colIdx` (TSDB_CODE_INVALID_PARA), and more than one
 * row when stmtIsInsert() reports a non-insert statement (TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR).
 * A stmtIsInsert() failure is returned as-is.
 */
int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx) {
  if (NULL == stmt || NULL == bind) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  if (colIdx < 0) {
    tscError("invalid bind column idx %d", colIdx);
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  int32_t isInsert = 0;
  int32_t rc = stmtIsInsert(stmt, &isInsert);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("stmt insert failed, errcode:%s", tstrerror(rc));
    return rc;
  }

  // A query may only be bound with a single row.
  if (isInsert == 0 && bind->num > 1) {
    tscError("only one row data allowed for query");
    terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
    return terrno;
  }

  return stmtBindBatch(stmt, bind, colIdx);
}
2684

2685
/** Forward to stmtAddBatch(); a NULL `stmt` yields TSDB_CODE_INVALID_PARA (also stored in terrno). */
int taos_stmt_add_batch(TAOS_STMT *stmt) {
  if (NULL != stmt) {
    return stmtAddBatch(stmt);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2694

2695
/** Forward to stmtExec(); a NULL `stmt` yields TSDB_CODE_INVALID_PARA (also stored in terrno). */
int taos_stmt_execute(TAOS_STMT *stmt) {
  if (NULL != stmt) {
    return stmtExec(stmt);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2704

2705
/**
 * Forward to stmtIsInsert(), which receives `insert` as its output argument.
 * A NULL `stmt` or `insert` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
  if (NULL != stmt && NULL != insert) {
    return stmtIsInsert(stmt, insert);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2714

2715
/**
 * Forward to stmtGetParamNum(), which receives `nums` as its output argument.
 * A NULL `stmt` or `nums` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
  if (NULL != stmt && NULL != nums) {
    return stmtGetParamNum(stmt, nums);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2724

2725
/**
 * Forward to stmtGetParam() for parameter `idx`, with `type` and `bytes` as output arguments.
 * A NULL pointer argument or a negative `idx` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
  bool argsOk = (stmt != NULL) && (type != NULL) && (bytes != NULL) && (idx >= 0);
  if (argsOk) {
    return stmtGetParam(stmt, idx, type, bytes);
  }

  tscError("invalid parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2734

2735
/** Forward to stmtUseResult(); a NULL `stmt` returns NULL with terrno = TSDB_CODE_INVALID_PARA. */
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
  if (NULL != stmt) {
    return stmtUseResult(stmt);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return NULL;
}
2744

2745
/** Return the string produced by stmtErrstr() for `stmt`, cast to a non-const pointer. */
char *taos_stmt_errstr(TAOS_STMT *stmt) {
  return (char *)stmtErrstr(stmt);
}
14,883✔
2746

2747
/** Forward to stmtAffectedRows(); a NULL `stmt` returns 0 with terrno = TSDB_CODE_INVALID_PARA. */
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
  if (NULL != stmt) {
    return stmtAffectedRows(stmt);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return 0;
}
2756

2757
/** Forward to stmtAffectedRowsOnce(); a NULL `stmt` returns 0 with terrno = TSDB_CODE_INVALID_PARA. */
int taos_stmt_affected_rows_once(TAOS_STMT *stmt) {
  if (NULL != stmt) {
    return stmtAffectedRowsOnce(stmt);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return 0;
}
2766

2767
/** Forward to stmtClose(); a NULL `stmt` yields TSDB_CODE_INVALID_PARA (also stored in terrno). */
int taos_stmt_close(TAOS_STMT *stmt) {
  if (NULL != stmt) {
    return stmtClose(stmt);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2776

2777
/**
 * Create a stmt2 handle on connection `taos` via stmtInit2().
 *
 * @return the handle from stmtInit2(), or NULL with terrno set when `taos` is NULL
 *         (TSDB_CODE_INVALID_PARA) or the connection cannot be acquired (TSDB_CODE_TSC_DISCONNECTED).
 */
TAOS_STMT2 *taos_stmt2_init(TAOS *taos, TAOS_STMT2_OPTION *option) {
  if (taos == NULL) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  STscObj *pTsc = acquireTscObj(*(int64_t *)taos);
  if (pTsc == NULL) {
    tscError("invalid parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  // The connection reference is only held for the duration of stmtInit2().
  TAOS_STMT2 *pNewStmt = stmtInit2(pTsc, option);
  releaseTscObj(*(int64_t *)taos);
  return pNewStmt;
}
2796

2797
/**
 * Validate the arguments and forward to stmtPrepare2().
 * A NULL `stmt` or `sql` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt2_prepare(TAOS_STMT2 *stmt, const char *sql, unsigned long length) {
  if (NULL != stmt && NULL != sql) {
    return stmtPrepare2(stmt, sql, length);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2806

2807
/**
 * Bind table names, tags and column data from `bindv` to a stmt2 handle.
 *
 * For each of the `bindv->count` entries of a non-query statement: the table name (if given) is
 * set, then tags are set or checked, then the column bind (if given) is passed to
 * stmtBindBatch2() together with `col_idx`. A query must bind exactly one entry with one row.
 *
 * @return TSDB_CODE_SUCCESS, or the first error encountered (also stored in terrno).
 *         TSDB_CODE_INVALID_PARA for a NULL `stmt`/`bindv`, TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR
 *         for an invalid row count, TSDB_CODE_TSC_STMT_API_ERROR while an async bind is running.
 */
int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx) {
  // bindv is dereferenced unconditionally below, so it must be validated with stmt.
  if (stmt == NULL || bindv == NULL) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  STscStmt2 *pStmt = (STscStmt2 *)stmt;
  int32_t    code = TSDB_CODE_SUCCESS;
  STMT2_DLOG_E("start to bind param");

  // check query bind number
  bool isQuery = (STMT_TYPE_QUERY == pStmt->sql.type || (pStmt->sql.type == 0 && stmt2IsSelect(stmt)));
  if (isQuery) {
    // Guard bind_cols / bind_cols[0] the same way the loop below does before reading ->num.
    if (bindv->count != 1 || bindv->bind_cols == NULL || bindv->bind_cols[0] == NULL ||
        bindv->bind_cols[0]->num != 1) {
      terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
      STMT2_ELOG_E("query only support one table and one row bind");
      return terrno;
    }
  }

  if (atomic_load_8((int8_t *)&pStmt->asyncBindParam.asyncBindNum) > 1) {
    STMT2_ELOG_E("async bind param is still working, please try again later");
    terrno = TSDB_CODE_TSC_STMT_API_ERROR;
    return terrno;
  }

  // With an async exec callback configured, wait once on asyncExecSem before binding.
  if (pStmt->options.asyncExecFn && !pStmt->execSemWaited) {
    if (tsem_wait(&pStmt->asyncExecSem) != 0) {
      STMT2_ELOG_E("bind param wait asyncExecSem failed");
    }
    pStmt->execSemWaited = true;
  }

  for (int i = 0; i < bindv->count; ++i) {
    SVCreateTbReq *pCreateTbReq = NULL;
    if (!isQuery) {
      STMT2_TLOG("start to bind %dth table", i);
      if (bindv->tbnames && bindv->tbnames[i]) {
        code = stmtSetTbName2(stmt, bindv->tbnames[i]);
        if (code) {
          terrno = code;
          STMT2_ELOG("set tbname failed, code:%s", stmt2Errstr(stmt));
          return terrno;
        }
      }

      if (bindv->tags && bindv->tags[i]) {
        code = stmtSetTbTags2(stmt, bindv->tags[i], &pCreateTbReq);
      } else if (pStmt->bInfo.tbNameFlag & IS_FIXED_TAG) {
        code = stmtCheckTags2(stmt, &pCreateTbReq);
      } else if (pStmt->sql.autoCreateTbl) {
        code = stmtSetTbTags2(stmt, NULL, &pCreateTbReq);
      }

      if (code) {
        terrno = code;
        STMT2_ELOG("set tags failed, code:%s", stmt2Errstr(stmt));
        if (pCreateTbReq) {
          tdDestroySVCreateTbReq(pCreateTbReq);
          taosMemoryFreeClear(pCreateTbReq);
        }
        return terrno;
      }
    }

    if (bindv->bind_cols && bindv->bind_cols[i]) {
      TAOS_STMT2_BIND *bind = bindv->bind_cols[i];

      if (bind->num <= 0 || bind->num > INT16_MAX) {
        STMT2_ELOG("bind num:%d must > 0 and < INT16_MAX", bind->num);
        code = terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
        if (pCreateTbReq) {
          tdDestroySVCreateTbReq(pCreateTbReq);
          taosMemoryFreeClear(pCreateTbReq);
        }
        return terrno;
      }

      code = stmtBindBatch2(stmt, bind, col_idx, pCreateTbReq);
      if (TSDB_CODE_SUCCESS != code) {
        terrno = code;
        STMT2_ELOG("bind batch failed, code:%s", stmt2Errstr(stmt));
        if (pCreateTbReq) {
          tdDestroySVCreateTbReq(pCreateTbReq);
          taosMemoryFreeClear(pCreateTbReq);
        }
        return terrno;
      }
    }
  }

  return code;
}
2901

2902
/**
 * Asynchronous variant of taos_stmt2_bind_param(): packs the arguments into a ThreadArgs and
 * hands it to taosStmt2AsyncBind() with stmtAsyncBindThreadFunc as the worker.
 *
 * Only one async bind may be in flight per statement; a second call while asyncBindNum > 0
 * fails with TSDB_CODE_TSC_STMT_API_ERROR.
 *
 * @return TSDB_CODE_SUCCESS when the bind was scheduled, otherwise an error code (also in terrno).
 */
int taos_stmt2_bind_param_a(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col_idx, __taos_async_fn_t fp,
                            void *param) {
  if (stmt == NULL || bindv == NULL || fp == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  STscStmt2 *pStmt = (STscStmt2 *)stmt;

  ThreadArgs *args = (ThreadArgs *)taosMemoryMalloc(sizeof(ThreadArgs));
  if (args == NULL) {
    // Allocation failed: bail out before writing through the NULL pointer.
    terrno = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to allocate async bind args for %s", __FUNCTION__);
    return terrno;
  }
  args->stmt = stmt;
  args->bindv = bindv;
  args->col_idx = col_idx;
  args->fp = fp;
  args->param = param;

  (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
  if (atomic_load_8((int8_t *)&pStmt->asyncBindParam.asyncBindNum) > 0) {
    (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
    // args was never handed to a worker on this path, so it must be released here.
    taosMemoryFree(args);
    tscError("async bind param is still working, please try again later");
    terrno = TSDB_CODE_TSC_STMT_API_ERROR;
    return terrno;
  }
  (void)atomic_add_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
  (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));

  int code_s = taosStmt2AsyncBind(stmtAsyncBindThreadFunc, (void *)args);
  if (code_s != TSDB_CODE_SUCCESS) {
    terrno = code_s;
    // Undo the in-flight counter and signal waitCond, since no worker will run.
    (void)taosThreadMutexLock(&(pStmt->asyncBindParam.mutex));
    (void)taosThreadCondSignal(&(pStmt->asyncBindParam.waitCond));
    (void)atomic_sub_fetch_8(&pStmt->asyncBindParam.asyncBindNum, 1);
    (void)taosThreadMutexUnlock(&(pStmt->asyncBindParam.mutex));
    tscError("async bind failed, code:%d , %s", code_s, tstrerror(code_s));
    // NOTE(review): args is presumably leaked here unless taosStmt2AsyncBind releases it on
    // failure; confirm its ownership contract before adding a free.
  }

  return code_s;
}
2940

2941
/**
 * Forward to stmtExec2(), passing `affected_rows` through unchecked.
 * A NULL `stmt` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {
  if (NULL != stmt) {
    return stmtExec2(stmt, affected_rows);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2950

2951
/** Forward to stmtClose2(); a NULL `stmt` yields TSDB_CODE_INVALID_PARA (also stored in terrno). */
int taos_stmt2_close(TAOS_STMT2 *stmt) {
  if (NULL != stmt) {
    return stmtClose2(stmt);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2960

2961
/**
 * Store the result of stmt2IsInsert() in `*insert`.
 * A NULL `stmt` or `insert` yields TSDB_CODE_INVALID_PARA (also stored in terrno).
 */
int taos_stmt2_is_insert(TAOS_STMT2 *stmt, int *insert) {
  if (NULL != stmt && NULL != insert) {
    *insert = stmt2IsInsert(stmt);
    return TSDB_CODE_SUCCESS;
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return terrno;
}
2970

2971
/**
 * Fetch the bindable fields of a stmt2 handle.
 *
 * Insert statements go to stmtGetStbColFields2(), queries to stmtGetParamNum2(). When the
 * statement type is still 0, stmt2IsInsert()/stmt2IsSelect() decide. Anything else returns
 * TSDB_CODE_PAR_SYNTAX_ERROR. A NULL `stmt` or `count` yields TSDB_CODE_INVALID_PARA.
 */
int taos_stmt2_get_fields(TAOS_STMT2 *stmt, int *count, TAOS_FIELD_ALL **fields) {
  if (NULL == stmt || NULL == count) {
    tscError("NULL parameter for %s", __FUNCTION__);
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  STscStmt2 *pStmt = (STscStmt2 *)stmt;
  STMT2_DLOG_E("start to get fields");

  bool insertStmt = STMT_TYPE_INSERT == pStmt->sql.type || STMT_TYPE_MULTI_INSERT == pStmt->sql.type ||
                    (pStmt->sql.type == 0 && stmt2IsInsert(stmt));
  if (insertStmt) {
    return stmtGetStbColFields2(stmt, count, fields);
  }

  bool queryStmt = STMT_TYPE_QUERY == pStmt->sql.type || (pStmt->sql.type == 0 && stmt2IsSelect(stmt));
  if (queryStmt) {
    return stmtGetParamNum2(stmt, count, fields);
  }

  tscError("Invalid sql for stmt %s", pStmt->sql.sqlStr);
  return TSDB_CODE_PAR_SYNTAX_ERROR;
}
2992

2993
/** Free a TAOS_FIELD_ALL array with taosMemoryFree(). `stmt` is unused; a NULL `fields` is ignored. */
DLL_EXPORT void taos_stmt2_free_fields(TAOS_STMT2 *stmt, TAOS_FIELD_ALL *fields) {
  (void)stmt;
  if (fields != NULL) {
    taosMemoryFree(fields);
  }
}
2998

2999
/** Forward to stmtUseResult2(); a NULL `stmt` returns NULL with terrno = TSDB_CODE_INVALID_PARA. */
TAOS_RES *taos_stmt2_result(TAOS_STMT2 *stmt) {
  if (NULL != stmt) {
    return stmtUseResult2(stmt);
  }

  tscError("NULL parameter for %s", __FUNCTION__);
  terrno = TSDB_CODE_INVALID_PARA;
  return NULL;
}
3008

3009
/** Return the string produced by stmt2Errstr() for `stmt`, cast to a non-const pointer. */
char *taos_stmt2_error(TAOS_STMT2 *stmt) {
  return (char *)stmt2Errstr(stmt);
}
7,437✔
3010

3011
/**
 * Set a per-connection mode flag.
 *
 * Only TAOS_CONN_MODE_BI is handled: `value` is stored atomically into the connection's biMode.
 * Any other `mode` returns TSDB_CODE_INVALID_PARA. A NULL `taos` yields TSDB_CODE_INVALID_PARA
 * and an unknown connection TSDB_CODE_TSC_DISCONNECTED (both also stored in terrno).
 */
int taos_set_conn_mode(TAOS *taos, int mode, int value) {
  int32_t rc = 0;
  if (NULL == taos) {
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  STscObj *pTsc = acquireTscObj(*(int64_t *)taos);
  if (pTsc == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    tscError("invalid parameter for %s", __func__);
    return terrno;
  }

  if (mode == TAOS_CONN_MODE_BI) {
    atomic_store_8(&pTsc->biMode, value);
  } else {
    tscError("not supported mode.");
    rc = TSDB_CODE_INVALID_PARA;
  }

  releaseTscObj(*(int64_t *)taos);
  return rc;
}
3035

3036
/** Return the td_buildinfo string. */
char *getBuildInfo() {
  return td_buildinfo;
}
×
3037

3038
/**
 * Check a connection through tscCheckConnSessionMetric().
 *
 * @return 1 when the check returns 0, 0 when it fails. For a NULL `taos` or an unknown
 *         connection the error code itself (TSDB_CODE_INVALID_PARA / TSDB_CODE_TSC_DISCONNECTED)
 *         is returned and stored in terrno.
 */
int32_t taos_connect_is_alive(TAOS *taos) {
  int32_t code = 0, lino = 0;
  if (NULL == taos) {
    terrno = TSDB_CODE_INVALID_PARA;
    return terrno;
  }

  STscObj *pTsc = acquireTscObj(*(int64_t *)taos);
  if (pTsc == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    tscError("invalid parameter for %s", __func__);
    return terrno;
  }

  code = tscCheckConnSessionMetric(pTsc);
  TAOS_CHECK_GOTO(code, &lino, _error);

_error:
  releaseTscObj(*(int64_t *)taos);

  if (code == 0) {
    return 1;
  }

  tscError("taos conn failed to check alive, code:%d - %s", code, tstrerror(code));
  return 0;
}
3064
static int32_t buildInstanceRegisterSql(const SInstanceRegisterReq *req, char **ppSql, uint32_t *pLen) {
×
3065
  const char *action = (req->expire < 0) ? "UNREGISTER" : "REGISTER";
×
3066
  int32_t     len = 0;
×
3067

3068
  len += snprintf(NULL, 0, "%s INSTANCE '%s'", action, req->id);
×
3069
  if (req->type[0] != 0) {
×
3070
    len += snprintf(NULL, 0, " TYPE '%s'", req->type);
×
3071
  }
3072
  if (req->desc[0] != 0) {
×
3073
    len += snprintf(NULL, 0, " DESC '%s'", req->desc);
×
3074
  }
3075
  if (req->expire >= 0) {
×
3076
    len += snprintf(NULL, 0, " EXPIRE %d", req->expire);
×
3077
  }
3078

3079
  char *sql = taosMemoryMalloc((size_t)len + 1);
×
3080
  if (sql == NULL) {
×
3081
    return terrno;
×
3082
  }
3083

3084
  int32_t offset = snprintf(sql, (size_t)len + 1, "%s INSTANCE '%s'", action, req->id);
×
3085
  if (req->type[0] != 0) {
×
3086
    offset += snprintf(sql + offset, (size_t)len + 1 - (size_t)offset, " TYPE '%s'", req->type);
×
3087
  }
3088
  if (req->desc[0] != 0) {
×
3089
    offset += snprintf(sql + offset, (size_t)len + 1 - (size_t)offset, " DESC '%s'", req->desc);
×
3090
  }
3091
  if (req->expire >= 0) {
×
3092
    (void)snprintf(sql + offset, (size_t)len + 1 - (size_t)offset, " EXPIRE %d", req->expire);
×
3093
  }
3094

3095
  *ppSql = sql;
×
3096
  if (pLen != NULL) {
×
3097
    *pLen = (uint32_t)len;
×
3098
  }
3099
  return TSDB_CODE_SUCCESS;
×
3100
}
3101

3102
/**
 * Send a TDMT_MND_REGISTER_INSTANCE message for `req` over the connection `pObj` and wait
 * on the request's response semaphore.
 *
 * Steps: create the request, attach the SQL text from buildInstanceRegisterSql(), serialize
 * `req` into a freshly allocated buffer, wrap it with buildMsgInfoImpl(), send it with
 * asyncSendMsgToServer() to the app's mgmt endpoint set, then wait on body.rspSem.
 *
 * @return pRequest->code after the wait, or the error code of the step that failed.
 */
static int32_t sendInstanceRegisterReq(STscObj *pObj, const SInstanceRegisterReq *req) {
  SRequestObj *pRequest = NULL;
  int32_t      code = createRequest(pObj->id, TDMT_MND_REGISTER_INSTANCE, 0, &pRequest);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    return code;
  }

  code = buildInstanceRegisterSql(req, &pRequest->sqlstr, (uint32_t *)&pRequest->sqlLen);
  if (TSDB_CODE_SUCCESS != code) {
    goto _cleanup;
  }

  // First call with a NULL buffer only computes the serialized size.
  int32_t bufLen = tSerializeSInstanceRegisterReq(NULL, 0, (SInstanceRegisterReq *)req);
  if (bufLen <= 0) {
    code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
    goto _cleanup;
  }

  void *pBuf = taosMemoryMalloc(bufLen);
  if (NULL == pBuf) {
    code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _cleanup;
  }

  if (tSerializeSInstanceRegisterReq(pBuf, bufLen, (SInstanceRegisterReq *)req) < 0) {
    code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
    taosMemoryFree(pBuf);
    goto _cleanup;
  }

  pRequest->type = TDMT_MND_REGISTER_INSTANCE;
  pRequest->body.requestMsg = (SDataBuf){.pData = pBuf, .len = bufLen, .handle = NULL};

  SMsgSendInfo *pSendInfo = buildMsgInfoImpl(pRequest);
  if (NULL == pSendInfo) {
    code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
    // Free the payload here and clear the request's pointer to it before cleanup.
    taosMemoryFree(pBuf);
    pRequest->body.requestMsg.pData = NULL;
    goto _cleanup;
  }

  SEpSet mgmtEpSet = getEpSet_s(&pObj->pAppInfo->mgmtEp);
  code = asyncSendMsgToServer(pObj->pAppInfo->pTransporter, &mgmtEpSet, NULL, pSendInfo);
  if (TSDB_CODE_SUCCESS != code) {
    destroySendMsgInfo(pSendInfo);
    pRequest->body.requestMsg = (SDataBuf){0};
    goto _cleanup;
  }

  code = tsem_wait(&pRequest->body.rspSem);
  if (TSDB_CODE_SUCCESS != code) {
    code = terrno != 0 ? terrno : code;
    goto _cleanup;
  }

  code = pRequest->code;
  terrno = code;

_cleanup:
  destroyRequest(pRequest);
  return code;
}
3165

3166
/**
 * Retry predicate installed as `rfp` on the instance RPC client.
 *
 * @return true for codes matching NEED_REDIRECT_ERROR() and for the queue-out-of-memory and
 *         sync stall / not-ready / restoring codes listed below (the latter are logged at
 *         debug level); false for every other code.
 */
static bool instanceRegisterRpcRfp(int32_t code, tmsg_t msgType) {
  if (NEED_REDIRECT_ERROR(code)) {
    return true;
  }

  bool retryable = (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) || (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) ||
                   (code == TSDB_CODE_SYN_WRITE_STALL) || (code == TSDB_CODE_SYN_PROPOSE_NOT_READY) ||
                   (code == TSDB_CODE_SYN_RESTORING);
  if (!retryable) {
    return false;
  }

  tscDebug("client msg type %s should retry since %s", TMSG_INFO(msgType), tstrerror(code));
  return true;
}
3178

3179
/** Build epSet from firstEp and secondEp in config. pCfg must have valid firstEp. */
3180
static int32_t instanceBuildEpSetFromCfg(SConfig *pCfg, SEpSet *pEpSet) {
39,368✔
3181
  SConfigItem *pFirstEpItem = cfgGetItem(pCfg, "firstEp");
39,368✔
3182
  if (pFirstEpItem == NULL || pFirstEpItem->str == NULL || pFirstEpItem->str[0] == 0) {
39,368✔
3183
    return TSDB_CODE_CFG_NOT_FOUND;
×
3184
  }
3185
  SEp     firstEp = {0};
39,368✔
3186
  int32_t code = taosGetFqdnPortFromEp(pFirstEpItem->str, &firstEp);
39,368✔
3187
  if (code != TSDB_CODE_SUCCESS) {
39,368✔
3188
    return code;
×
3189
  }
3190
  pEpSet->inUse = 0;
39,368✔
3191
  pEpSet->numOfEps = 1;
39,368✔
3192
  tstrncpy(pEpSet->eps[0].fqdn, firstEp.fqdn, TSDB_FQDN_LEN);
39,368✔
3193
  pEpSet->eps[0].port = firstEp.port;
39,368✔
3194

3195
  SConfigItem *pSecondEpItem = cfgGetItem(pCfg, "secondEp");
39,368✔
3196
  if (pSecondEpItem != NULL && pSecondEpItem->str != NULL && pSecondEpItem->str[0] != 0) {
39,368✔
3197
    SEp secondEp = {0};
39,368✔
3198
    if (taosGetFqdnPortFromEp(pSecondEpItem->str, &secondEp) == TSDB_CODE_SUCCESS) {
39,368✔
3199
      tstrncpy(pEpSet->eps[1].fqdn, secondEp.fqdn, TSDB_FQDN_LEN);
39,368✔
3200
      pEpSet->eps[1].port = secondEp.port;
39,368✔
3201
      pEpSet->numOfEps = 2;
39,368✔
3202
    }
3203
  }
3204
  return TSDB_CODE_SUCCESS;
39,368✔
3205
}
3206

3207
/** Init and open instance RPC client. label e.g. "INST" or "LIST". Returns handle or NULL. */
3208
static void *instanceOpenRpcClient(const char *label) {
296✔
3209
  SRpcInit rpcInit = {0};
296✔
3210
  rpcInit.label = (char *)label;
296✔
3211
  rpcInit.numOfThreads = 1;
296✔
3212
  rpcInit.cfp = NULL;
296✔
3213
  rpcInit.sessions = 16;
296✔
3214
  rpcInit.connType = TAOS_CONN_CLIENT;
296✔
3215
  rpcInit.idleTime = tsShellActivityTimer * 1000;
296✔
3216
  rpcInit.compressSize = tsCompressMsgSize;
296✔
3217
  rpcInit.user = TSDB_DEFAULT_USER;
296✔
3218
  rpcInit.rfp = instanceRegisterRpcRfp;
296✔
3219
  rpcInit.retryMinInterval = tsRedirectPeriod;
296✔
3220
  rpcInit.retryStepFactor = tsRedirectFactor;
296✔
3221
  rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
296✔
3222
  rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
296✔
3223

3224
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
296✔
3225
  connLimitNum = TMAX(connLimitNum, 10);
296✔
3226
  connLimitNum = TMIN(connLimitNum, 500);
296✔
3227
  rpcInit.connLimitNum = connLimitNum;
296✔
3228
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
296✔
3229
  rpcInit.readTimeout = tsReadTimeout;
296✔
3230
  rpcInit.ipv6 = tsEnableIpv6;
296✔
3231
  rpcInit.enableSSL = tsEnableTLS;
296✔
3232

3233
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
296✔
3234
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
296✔
3235
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
296✔
3236
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
296✔
3237
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
296✔
3238

3239
  int32_t code = taosVersionStrToInt(td_version, &rpcInit.compatibilityVer);
296✔
3240
  if (code != TSDB_CODE_SUCCESS) {
296✔
3241
    tscError("failed to convert taos version from str to int, errcode:%s", terrstr(code));
×
3242
    terrno = code;
×
3243
    return NULL;
×
3244
  }
3245

3246
  void *clientRpc = rpcOpen(&rpcInit);
296✔
3247
  if (clientRpc == NULL) {
296✔
3248
    tscError("failed to init instance rpc client since %s", tstrerror(terrno));
×
3249
  }
3250
  return clientRpc;
296✔
3251
}
3252

3253
/** Client-side rate limit: fixed 100 calls per 1s (register + list combined, protect mnode). */
3254
#define TSC_INSTANCE_API_RL_WINDOW_MS   1000
3255
#define TSC_INSTANCE_API_RL_MAX_PER_SEC 100
3256

3257
static TdThreadOnce  gInstRlOnce = PTHREAD_ONCE_INIT;
3258
static TdThreadMutex gInstRlMutex;
3259
static int32_t       gInstRlMutexInited = 0;
3260
static int64_t       gInstRlWindowStartMs = 0;
3261
static int32_t       gInstRlCountInWindow = 0;
3262

3263
static void instRlMutexInit(void) {
296✔
3264
  if (taosThreadMutexInit(&gInstRlMutex, NULL) == TSDB_CODE_SUCCESS) {
296✔
3265
    gInstRlMutexInited = 1;
296✔
3266
  }
3267
}
296✔
3268

3269
/** Call before instance RPC; shared by register and list. */
3270
static int32_t instanceApiRateLimitTry(void) {
39,368✔
3271
  int32_t c = taosThreadOnce(&gInstRlOnce, instRlMutexInit);
39,368✔
3272
  if (c != TSDB_CODE_SUCCESS) {
39,368✔
3273
    terrno = c;
×
3274
    return c;
×
3275
  }
3276
  if (!gInstRlMutexInited) {
39,368✔
3277
    tscError("instance API rate limiter init failed, block request");
×
3278
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
×
3279
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3280
  }
3281

3282
  int64_t now = taosGetTimestampMs();
39,368✔
3283
  (void)taosThreadMutexLock(&gInstRlMutex);
39,368✔
3284
  /* Reset window if first use, window elapsed, or clock moved backwards (NTP). */
3285
  if (gInstRlWindowStartMs == 0 || now < gInstRlWindowStartMs ||
39,368✔
3286
      (now - gInstRlWindowStartMs) >= (int64_t)TSC_INSTANCE_API_RL_WINDOW_MS) {
39,072✔
3287
    gInstRlWindowStartMs = now;
888✔
3288
    gInstRlCountInWindow = 0;
888✔
3289
  }
3290
  if (gInstRlCountInWindow >= TSC_INSTANCE_API_RL_MAX_PER_SEC) {
39,368✔
3291
    (void)taosThreadMutexUnlock(&gInstRlMutex);
296✔
3292
    tscWarn("instance API rate limit exceeded (max %d calls per %d ms, register and list combined)",
296✔
3293
            TSC_INSTANCE_API_RL_MAX_PER_SEC, TSC_INSTANCE_API_RL_WINDOW_MS);
3294
    terrno = TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT;
296✔
3295
    return TSDB_CODE_TSC_INSTANCE_API_RATE_LIMIT;
296✔
3296
  }
3297
  gInstRlCountInWindow++;
39,072✔
3298
  (void)taosThreadMutexUnlock(&gInstRlMutex);
39,072✔
3299
  return TSDB_CODE_SUCCESS;
39,072✔
3300
}
3301

3302
/** Process-wide singleton: connectionless instance APIs share one rpcOpen; closed in taos_cleanup. */
3303
static TdThreadOnce     gInstRpcOnce = PTHREAD_ONCE_INIT;
3304
static TdThreadMutex    gInstRpcMutex;
3305
static TdThreadCond     gInstRpcCond;
3306
static volatile int32_t gInstRpcMutexReady = 0;
3307
static volatile int32_t gInstRpcCondReady = 0;
3308
static void            *gInstRpc = NULL;
3309
static int32_t          gInstRpcRef = 0;
3310
static int32_t          gInstRpcClosing = 0;
3311

3312
static void instRpcMutexInit(void) {
296✔
3313
  if (taosThreadMutexInit(&gInstRpcMutex, NULL) == TSDB_CODE_SUCCESS) {
296✔
3314
    if (taosThreadCondInit(&gInstRpcCond, NULL) == TSDB_CODE_SUCCESS) {
296✔
3315
      gInstRpcCondReady = 1;
296✔
3316
      gInstRpcMutexReady = 1;
296✔
3317
      return;
296✔
3318
    }
3319
    (void)taosThreadMutexDestroy(&gInstRpcMutex);
×
3320
  }
3321
}
3322

3323
static int32_t instanceRpcAcquire(void **ppRpc) {
39,072✔
3324
  int32_t code = taosThreadOnce(&gInstRpcOnce, instRpcMutexInit);
39,072✔
3325
  if (code != TSDB_CODE_SUCCESS) {
39,072✔
3326
    terrno = code;
×
3327
    return code;
×
3328
  }
3329
  if (!gInstRpcMutexReady || !gInstRpcCondReady) {
39,072✔
3330
    tscError("instance RPC singleton not ready, block request");
×
3331
    terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
×
3332
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3333
  }
3334
  code = taosThreadMutexLock(&gInstRpcMutex);
39,072✔
3335
  if (code != TSDB_CODE_SUCCESS) {
39,072✔
3336
    return code;
×
3337
  }
3338
  if (gInstRpcClosing) {
39,072✔
3339
    (void)taosThreadMutexUnlock(&gInstRpcMutex);
×
3340
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
3341
  }
3342
  if (gInstRpc == NULL) {
39,072✔
3343
    gInstRpc = instanceOpenRpcClient("INST");
296✔
3344
    if (gInstRpc == NULL) {
296✔
3345
      code = terrno;
×
3346
      (void)taosThreadMutexUnlock(&gInstRpcMutex);
×
3347
      return code;
×
3348
    }
3349
    tscInfo("instance RPC singleton opened, handle:%p (search this line to count rpcOpen)", gInstRpc);
296✔
3350
  }
3351
  gInstRpcRef++;
39,072✔
3352
  *ppRpc = gInstRpc;
39,072✔
3353
  (void)taosThreadMutexUnlock(&gInstRpcMutex);
39,072✔
3354
  return TSDB_CODE_SUCCESS;
39,072✔
3355
}
3356

3357
static void instanceRpcRelease(void) {
39,072✔
3358
  if (!gInstRpcMutexReady || !gInstRpcCondReady) {
39,072✔
3359
    return;
×
3360
  }
3361
  (void)taosThreadMutexLock(&gInstRpcMutex);
39,072✔
3362
  if (gInstRpcRef > 0) {
39,072✔
3363
    gInstRpcRef--;
39,072✔
3364
    if (gInstRpcClosing && gInstRpcRef == 0) {
39,072✔
3365
      (void)taosThreadCondSignal(&gInstRpcCond);
×
3366
    }
3367
  }
3368
  (void)taosThreadMutexUnlock(&gInstRpcMutex);
39,072✔
3369
}
3370

3371
static void instanceRpcGlobalCleanup(void) {
1,669,423✔
3372
  if (!gInstRpcMutexReady || !gInstRpcCondReady) {
1,669,423✔
3373
    return;
1,669,127✔
3374
  }
3375
  (void)taosThreadMutexLock(&gInstRpcMutex);
296✔
3376
  gInstRpcClosing = 1;
296✔
3377
  while (gInstRpcRef > 0) {
296✔
3378
    (void)taosThreadCondWait(&gInstRpcCond, &gInstRpcMutex);
×
3379
  }
3380
  if (gInstRpc != NULL) {
296✔
3381
    tscInfo("instance RPC singleton closing, handle:%p (search this line to count rpcClose)", gInstRpc);
296✔
3382
    rpcClose(gInstRpc);
296✔
3383
    gInstRpc = NULL;
296✔
3384
  }
3385
  gInstRpcCondReady = 0;
296✔
3386
  gInstRpcMutexReady = 0;
296✔
3387
  (void)taosThreadMutexUnlock(&gInstRpcMutex);
296✔
3388
}
3389

3390
int32_t taos_register_instance(const char *id, const char *type, const char *desc, int32_t expire) {
35,224✔
3391
  if (id == NULL || id[0] == 0) {
35,224✔
3392
    return terrno = TSDB_CODE_INVALID_PARA;
×
3393
  }
3394

3395
  // Validate string lengths
3396
  size_t idLen = strlen(id);
35,224✔
3397
  if (idLen >= TSDB_INSTANCE_ID_LEN) {
35,224✔
3398
    tscError("instance id length %zu exceeds limit %d", idLen, TSDB_INSTANCE_ID_LEN - 1);
×
3399
    return terrno = TSDB_CODE_INVALID_PARA;
×
3400
  }
3401

3402
  if (type != NULL && type[0] != 0) {
35,224✔
3403
    size_t typeLen = strlen(type);
32,856✔
3404
    if (typeLen >= TSDB_INSTANCE_TYPE_LEN) {
32,856✔
3405
      tscError("instance type length %zu exceeds limit %d", typeLen, TSDB_INSTANCE_TYPE_LEN - 1);
×
3406
      return terrno = TSDB_CODE_INVALID_PARA;
×
3407
    }
3408
  }
3409

3410
  if (desc != NULL && desc[0] != 0) {
35,224✔
3411
    size_t descLen = strlen(desc);
32,856✔
3412
    if (descLen >= TSDB_INSTANCE_DESC_LEN) {
32,856✔
3413
      tscError("instance desc length %zu exceeds limit %d", descLen, TSDB_INSTANCE_DESC_LEN - 1);
×
3414
      return terrno = TSDB_CODE_INVALID_PARA;
×
3415
    }
3416
  }
3417

3418
  int32_t code = taos_init();
35,224✔
3419
  if (code != TSDB_CODE_SUCCESS) {
35,224✔
3420
    return code;
×
3421
  }
3422

3423
  SConfig *pCfg = taosGetCfg();
35,224✔
3424
  if (pCfg == NULL) {
35,224✔
3425
    return terrno = TSDB_CODE_CFG_NOT_FOUND;
×
3426
  }
3427

3428
  SEpSet epSet = {0};
35,224✔
3429
  code = instanceBuildEpSetFromCfg(pCfg, &epSet);
35,224✔
3430
  if (code != TSDB_CODE_SUCCESS) {
35,224✔
3431
    return terrno = code;
×
3432
  }
3433

3434
  code = instanceApiRateLimitTry();
35,224✔
3435
  if (code != TSDB_CODE_SUCCESS) {
35,224✔
3436
    return code;
296✔
3437
  }
3438

3439
  void *clientRpc = NULL;
34,928✔
3440
  code = instanceRpcAcquire(&clientRpc);
34,928✔
3441
  if (code != TSDB_CODE_SUCCESS) {
34,928✔
3442
    terrno = code;
×
3443
    return code;
×
3444
  }
3445

3446
  SRpcMsg rpcMsg = {0};
34,928✔
3447
  SRpcMsg rpcRsp = {0};
34,928✔
3448

3449
  // Prepare request
3450
  SInstanceRegisterReq req = {0};
34,928✔
3451
  tstrncpy(req.id, id, sizeof(req.id));
34,928✔
3452
  if (type != NULL && type[0] != 0) {
34,928✔
3453
    tstrncpy(req.type, type, sizeof(req.type));
32,560✔
3454
  }
3455
  if (desc != NULL && desc[0] != 0) {
34,928✔
3456
    tstrncpy(req.desc, desc, sizeof(req.desc));
32,560✔
3457
  }
3458
  req.expire = expire;
34,928✔
3459

3460
  int32_t contLen = tSerializeSInstanceRegisterReq(NULL, 0, &req);
34,928✔
3461
  if (contLen <= 0) {
34,928✔
3462
    code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
×
3463
    goto _register_inst_end;
×
3464
  }
3465

3466
  void *pCont = rpcMallocCont(contLen);
34,928✔
3467
  if (pCont == NULL) {
34,928✔
3468
    code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
3469
    goto _register_inst_end;
×
3470
  }
3471

3472
  if (tSerializeSInstanceRegisterReq(pCont, contLen, &req) < 0) {
34,928✔
3473
    code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
×
3474
    rpcFreeCont(pCont);
×
3475
    goto _register_inst_end;
×
3476
  }
3477

3478
  rpcMsg.pCont = pCont;
34,928✔
3479
  rpcMsg.contLen = contLen;
34,928✔
3480
  rpcMsg.msgType = TDMT_MND_REGISTER_INSTANCE;
34,928✔
3481
  rpcMsg.info.ahandle = (void *)0x9528;  // Different magic number from server status
34,928✔
3482
  rpcMsg.info.notFreeAhandle = 1;
34,928✔
3483

3484
  code = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
34,928✔
3485
  if (TSDB_CODE_SUCCESS != code) {
34,928✔
3486
    tscError("failed to send instance register req since %s", tstrerror(code));
×
3487
    goto _register_inst_end;
×
3488
  }
3489

3490
  if (rpcRsp.code != 0) {
34,928✔
3491
    code = rpcRsp.code;
×
3492
    tscError("instance register failed, code:%s", tstrerror(code));
×
3493
  } else {
3494
    code = TSDB_CODE_SUCCESS;
34,928✔
3495
  }
3496

3497
  if (rpcRsp.pCont != NULL) {
34,928✔
3498
    rpcFreeCont(rpcRsp.pCont);
34,928✔
3499
  }
3500

3501
_register_inst_end:
×
3502
  instanceRpcRelease();
34,928✔
3503
  terrno = code;
34,928✔
3504
  return code;
34,928✔
3505
}
3506

3507
int32_t taos_list_instances(const char *filter_type, char ***pList, int32_t *pCount) {
4,144✔
3508
  if (pList == NULL || pCount == NULL) {
4,144✔
3509
    return TSDB_CODE_INVALID_PARA;
×
3510
  }
3511

3512
  int32_t code = taos_init();
4,144✔
3513
  if (code != TSDB_CODE_SUCCESS) {
4,144✔
3514
    terrno = code;
×
3515
    return code;
×
3516
  }
3517

3518
  SConfig *pCfg = taosGetCfg();
4,144✔
3519
  if (pCfg == NULL) {
4,144✔
3520
    terrno = TSDB_CODE_CFG_NOT_FOUND;
×
3521
    return TSDB_CODE_CFG_NOT_FOUND;
×
3522
  }
3523

3524
  SEpSet epSet = {0};
4,144✔
3525
  code = instanceBuildEpSetFromCfg(pCfg, &epSet);
4,144✔
3526
  if (code != TSDB_CODE_SUCCESS) {
4,144✔
3527
    terrno = code;
×
3528
    return code;
×
3529
  }
3530

3531
  code = instanceApiRateLimitTry();
4,144✔
3532
  if (code != TSDB_CODE_SUCCESS) {
4,144✔
3533
    terrno = code;
×
3534
    return code;
×
3535
  }
3536

3537
  void *clientRpc = NULL;
4,144✔
3538
  code = instanceRpcAcquire(&clientRpc);
4,144✔
3539
  if (code != TSDB_CODE_SUCCESS) {
4,144✔
3540
    terrno = code;
×
3541
    return code;
×
3542
  }
3543

3544
  SRpcMsg rpcMsg = {0};
4,144✔
3545
  SRpcMsg rpcRsp = {0};
4,144✔
3546

3547
  SInstanceListReq req = {0};
4,144✔
3548
  if (filter_type != NULL && filter_type[0] != 0) {
4,144✔
3549
    tstrncpy(req.filter_type, filter_type, sizeof(req.filter_type));
2,368✔
3550
  }
3551

3552
  int32_t contLen = tSerializeSInstanceListReq(NULL, 0, &req);
4,144✔
3553
  if (contLen <= 0) {
4,144✔
3554
    code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
×
3555
    goto _list_inst_end;
×
3556
  }
3557

3558
  void *pCont = rpcMallocCont(contLen);
4,144✔
3559
  if (pCont == NULL) {
4,144✔
3560
    code = terrno != 0 ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
3561
    goto _list_inst_end;
×
3562
  }
3563

3564
  if (tSerializeSInstanceListReq(pCont, contLen, &req) < 0) {
4,144✔
3565
    code = terrno != 0 ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
×
3566
    rpcFreeCont(pCont);
×
3567
    goto _list_inst_end;
×
3568
  }
3569

3570
  rpcMsg.pCont = pCont;
4,144✔
3571
  rpcMsg.contLen = contLen;
4,144✔
3572
  rpcMsg.msgType = TDMT_MND_LIST_INSTANCES;
4,144✔
3573
  rpcMsg.info.ahandle = (void *)0x9529;  // Different magic number from register
4,144✔
3574
  rpcMsg.info.notFreeAhandle = 1;
4,144✔
3575

3576
  code = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
4,144✔
3577
  if (TSDB_CODE_SUCCESS != code) {
4,144✔
3578
    tscError("failed to send instance list req since %s", tstrerror(code));
×
3579
    goto _list_inst_end;
×
3580
  }
3581

3582
  if (rpcRsp.code != 0) {
4,144✔
3583
    code = rpcRsp.code;
×
3584
    tscError("instance list failed, code:%s", tstrerror(code));
×
3585
    if (rpcRsp.pCont != NULL) {
×
3586
      rpcFreeCont(rpcRsp.pCont);
×
3587
    }
3588
    goto _list_inst_end;
×
3589
  }
3590

3591
  if (rpcRsp.pCont != NULL && rpcRsp.contLen > 0) {
4,144✔
3592
    SInstanceListRsp rsp = {0};
4,144✔
3593
    code = tDeserializeSInstanceListRsp(rpcRsp.pCont, rpcRsp.contLen, &rsp);
4,144✔
3594
    if (code != TSDB_CODE_SUCCESS) {
4,144✔
3595
      tscError("failed to deserialize instance list rsp, code:%s", tstrerror(code));
×
3596
      if (rsp.ids != NULL) {
×
3597
        for (int32_t i = 0; i < rsp.count; i++) {
×
3598
          if (rsp.ids[i] != NULL) {
×
3599
            taosMemoryFree(rsp.ids[i]);
×
3600
          }
3601
        }
3602
        taosMemoryFree(rsp.ids);
×
3603
        rsp.ids = NULL;
×
3604
      }
3605
      rsp.count = 0;
×
3606
      rpcFreeCont(rpcRsp.pCont);
×
3607
      goto _list_inst_end;
×
3608
    }
3609
    *pList = rsp.ids;
4,144✔
3610
    *pCount = rsp.count;
4,144✔
3611
  } else {
3612
    *pList = NULL;
×
3613
    *pCount = 0;
×
3614
  }
3615

3616
  if (rpcRsp.pCont != NULL) {
4,144✔
3617
    rpcFreeCont(rpcRsp.pCont);
4,144✔
3618
  }
3619
  code = TSDB_CODE_SUCCESS;
4,144✔
3620

3621
_list_inst_end:
4,144✔
3622
  instanceRpcRelease();
4,144✔
3623
  terrno = code;
4,144✔
3624
  return code;
4,144✔
3625
}
3626

3627
/**
 * Release a list obtained from taos_list_instances.
 * Frees each non-NULL string among the first `count` entries, then the array
 * itself, and resets *pList to NULL.
 *
 * @param pList  address of the list pointer; nothing happens if it or *pList is NULL.
 * @param count  number of entries in the list; nothing happens if it is <= 0.
 */
void taos_free_instances(char ***pList, int32_t count) {
  if (pList == NULL) {
    return;
  }

  char **items = *pList;
  if (items == NULL || count <= 0) {
    return;
  }

  /* Release the individual id strings first. */
  for (int32_t idx = 0; idx < count; ++idx) {
    if (items[idx] == NULL) {
      continue;
    }
    taosMemoryFree(items[idx]);
    items[idx] = NULL;
  }

  /* Then the array that held them, and clear the caller's pointer. */
  taosMemoryFree(items);
  *pList = NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc