• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.05
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
typedef struct STaskQueue {
28
  SQueryAutoQWorkerPool wrokrerPool;
29
  STaosQueue* pTaskQueue;
30
} STaskQueue;
31

32
/*
 * Maps a comparison operator to its mirrored counterpart:
 *   GREATER_THAN <-> LOWER_THAN, GREATER_EQUAL <-> LOWER_EQUAL, EQUAL -> EQUAL.
 * Returns -1 for every other operator.
 */
int32_t getAsofJoinReverseOp(EOperatorType op) {
  int32_t mirrored = -1;

  if (OP_TYPE_GREATER_THAN == op) {
    mirrored = OP_TYPE_LOWER_THAN;
  } else if (OP_TYPE_GREATER_EQUAL == op) {
    mirrored = OP_TYPE_LOWER_EQUAL;
  } else if (OP_TYPE_LOWER_THAN == op) {
    mirrored = OP_TYPE_GREATER_THAN;
  } else if (OP_TYPE_LOWER_EQUAL == op) {
    mirrored = OP_TYPE_GREATER_EQUAL;
  } else if (OP_TYPE_EQUAL == op) {
    mirrored = OP_TYPE_EQUAL;
  }

  return mirrored;
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
/*
 * Validates `numOfCols` schema entries starting at `pSchema`:
 *   1. every entry has a valid data type;
 *   2. every entry's byte length is within the limit for its type (types that are not
 *      binary/varbinary/nchar/geometry/blob must match tDataTypes[type].bytes exactly);
 *   3. no two entries share the same name;
 *   4. the sum of all byte lengths does not exceed `maxLen`.
 *
 * Returns true only when all checks pass. Returns false for a NULL schema and when the
 * name hash cannot be created or updated.
 */
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
  if (NULL == pSchema) {
    return false;
  }

  SSHashObj* pSeenNames = tSimpleHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == pSeenNames) {
    return false;
  }

  bool    valid = false;
  int32_t totalBytes = 0;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SSchema* pCol = &pSchema[i];

    // 1. valid types
    if (!isValidDataType(pCol->type)) {
      qError("The %d col/tag data type error, type:%d", i, pCol->type);
      goto _done;
    }

    // 2. valid length for each type
    if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_VARBINARY) {
      if (pCol->bytes > TSDB_MAX_BINARY_LEN) {
        qError("The %d col/tag var data len error, type:%d, len:%d", i, pCol->type, pCol->bytes);
        goto _done;
      }
    } else if (pCol->type == TSDB_DATA_TYPE_NCHAR) {
      if (pCol->bytes > TSDB_MAX_NCHAR_LEN) {
        qError("The %d col/tag nchar data len error, len:%d", i, pCol->bytes);
        goto _done;
      }
    } else if (pCol->type == TSDB_DATA_TYPE_GEOMETRY) {
      if (pCol->bytes > TSDB_MAX_GEOMETRY_LEN) {
        qError("The %d col/tag geometry data len error, len:%d", i, pCol->bytes);
        goto _done;
      }
    } else if (IS_STR_DATA_BLOB(pCol->type)) {
      // unlike the other variable-length types, the blob limit itself is rejected (>=)
      if (pCol->bytes >= TSDB_MAX_BLOB_LEN) {
        qError("The %d col/tag blob data len error, len:%d", i, pCol->bytes);
        goto _done;
      }
    } else if (pCol->bytes != tDataTypes[pCol->type].bytes) {
      qError("The %d col/tag data len error, type:%d, len:%d", i, pCol->type, pCol->bytes);
      goto _done;
    }

    // 3. valid column names: each name maps to the index of the entry that first used it
    size_t   nameLen = strnlen(pCol->name, sizeof(pCol->name));
    int32_t  curIdx = i;
    int32_t* pPrevIdx = tSimpleHashGet(pSeenNames, pCol->name, nameLen);
    if (NULL != pPrevIdx) {
      qError("The %d col/tag name %s is same with %d col/tag name %s", i, pCol->name, *pPrevIdx,
             pSchema[*pPrevIdx].name);
      goto _done;
    }
    if (TSDB_CODE_SUCCESS != tSimpleHashPut(pSeenNames, pCol->name, nameLen, &curIdx, sizeof(curIdx))) {
      goto _done;
    }

    totalBytes += pCol->bytes;
  }

  // 4. total row length
  valid = (totalBytes <= maxLen);

_done:
  tSimpleHashCleanup(pSeenNames);
  return valid;
}
129

130
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags, bool isVirtual) {
36,832,507✔
131
  if (!pSchema) {
36,832,507✔
132
    qError("invalid numOfCols: %d", numOfCols);
×
133
    return false;
×
134
  }
135

136
  if ((isVirtual && !VALIDNUMOFCOLSVIRTUAL(numOfCols)) || (!isVirtual && !VALIDNUMOFCOLS(numOfCols))) {
36,832,507✔
137
    qError("invalid numOfCols: %d", numOfCols);
611✔
138
    return false;
×
139
  }
140

141
  if (!VALIDNUMOFTAGS(numOfTags)) {
36,831,990✔
142
    qError("invalid numOfTags: %d", numOfTags);
94✔
143
    return false;
×
144
  }
145

146
  /* first column must be the timestamp, which is a primary key */
147
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
36,832,507✔
148
    qError("invalid first column type: %d", pSchema[0].type);
×
149
    return false;
×
150
  }
151

152
  if (!doValidateSchema(pSchema, numOfCols, isVirtual ? TSDB_MAX_BYTES_PER_ROW_VIRTUAL : TSDB_MAX_BYTES_PER_ROW)) {
36,832,507✔
153
    qError("validate schema columns failed");
×
154
    return false;
×
155
  }
156

157
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
36,832,507✔
158
    qError("validate schema tags failed");
×
159
    return false;
×
160
  }
161

162
  return true;
36,832,601✔
163
}
164

165
// File-scope task queue instance used by the init/cleanup and async helpers in this file.
static STaskQueue taskQueue = {0};
166

167
/*
 * Queue item handler registered by initTaskQueue().
 *
 * Runs the function stored in `pSchedMsg->ahandle` with `pSchedMsg->thandle` as its argument
 * (the function's return value is ignored) and then releases the queue item.
 *
 * An item without a function to run is still released; previously it was returned from
 * without being freed, which leaked the item.
 *
 * pInfo is unused.
 */
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
  if (NULL == pSchedMsg) {
    return;
  }

  if (NULL != pSchedMsg->ahandle) {
    __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
    (void)execFn(pSchedMsg->thandle);
  }

  taosFreeQitem(pSchedMsg);
}
173

174
int32_t initTaskQueue() {
1,471,660✔
175
  memset(&taskQueue, 0, sizeof(taskQueue));
1,471,660✔
176
  
177
  taskQueue.wrokrerPool.name = "taskWorkPool";
1,471,660✔
178
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
1,471,660✔
179
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
1,471,660✔
180
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
1,471,660✔
181
  if (TSDB_CODE_SUCCESS != coce) {
1,471,660✔
182
    qError("failed to init task thread pool");
×
183
    return -1;
×
184
  }
185

186
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
1,471,660✔
187
  if (NULL == taskQueue.pTaskQueue) {
1,471,660✔
188
    qError("failed to init task queue");
×
189
    return -1;
×
190
  }
191

192
  qInfo("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
1,471,660✔
193
  return 0;
1,471,660✔
194
}
195

196
int32_t cleanupTaskQueue() {
1,471,703✔
197
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
1,471,703✔
198
  return 0;
1,471,703✔
199
}
200

201
/*
 * Queues `execFn(execParam)` on the file-level task queue.
 *
 * A queue item is allocated, filled with the function (ahandle), its argument (thandle) and
 * the `code` pointer (msg), and written to the queue.
 *
 * Returns the allocation error if the item cannot be allocated, otherwise the result of
 * taosWriteQitem().
 */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
  SSchedMsg* pItem = NULL;

  int32_t allocCode = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pItem);
  if (allocCode != 0) {
    return allocCode;
  }

  pItem->fp = NULL;
  pItem->ahandle = execFn;
  pItem->thandle = execParam;
  pItem->msg = code;

  return taosWriteQitem(taskQueue.pTaskQueue, pItem);
}
212

213
int32_t taosAsyncWait() {
529,702✔
214
  if (!taskQueue.wrokrerPool.pCb) {
529,702✔
215
    qError("query task thread pool callback function is null");
×
216
    return -1;
×
217
  }
218
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
529,702✔
219
}
220

221
int32_t taosAsyncRecover() {
529,702✔
222
  if (!taskQueue.wrokrerPool.pCb) {
529,702✔
223
    qError("query task thread pool callback function is null");
×
224
    return -1;
×
225
  }
226
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
529,702✔
227
}
228

229
/*
 * Queues `bindFn(bindParam)` on the file-level task queue.
 *
 * Same flow as taosAsyncExec(), except that the item's `msg` field is not assigned here.
 *
 * Returns the allocation error if the item cannot be allocated, otherwise the result of
 * taosWriteQitem().
 */
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
  SSchedMsg* pItem = NULL;

  int32_t allocCode = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pItem);
  if (allocCode != 0) {
    return allocCode;
  }

  pItem->fp = NULL;
  pItem->ahandle = bindFn;
  pItem->thandle = bindParam;

  return taosWriteQitem(taskQueue.pTaskQueue, pItem);
}
240

241
/*
 * Releases a send-info object and what it owns: the target db name, the message payload,
 * the user parameter (through paramFreeFp, when set) and finally the object itself.
 * A NULL argument is ignored.
 */
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  if (pMsgBody != NULL) {
    qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);

    taosMemoryFreeClear(pMsgBody->target.dbFName);
    taosMemoryFreeClear(pMsgBody->msgInfo.pData);

    if (pMsgBody->paramFreeFp != NULL) {
      (*pMsgBody->paramFreeFp)(pMsgBody->param);
    }

    taosMemoryFreeClear(pMsgBody);
  }
}
256
void destroyAhandle(void *ahandle) {
532,064,231✔
257
  SMsgSendInfo *pSendInfo = ahandle;
532,064,231✔
258
  if (pSendInfo == NULL) return;
532,064,231✔
259

260
  if (pSendInfo->streamAHandle) {
532,063,459✔
261
    qDebug("stream ahandle %p freed", pSendInfo);
38,054,117✔
262
  }
263

264
  destroySendMsgInfo(pSendInfo);
532,175,077✔
265
}
266

267
/*
 * Sends the message described by `pInfo` through `pTransporter` to `epSet`.
 *
 * The payload is copied into a buffer from rpcMallocCont() and the original payload is freed.
 * `pInfo` itself travels with the request as its ahandle. On every failure path in this
 * function `pInfo` is destroyed here.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT for a NULL transporter/epSet/pInfo, terrno when the
 * RPC buffer cannot be allocated, otherwise the result of rpcSendRequestWithCtx().
 */
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* rpcCtx) {
  if (!pTransporter || !epSet || !pInfo) {
    destroySendMsgInfo(pInfo);
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  char* pCont = rpcMallocCont(pInfo->msgInfo.len);
  if (!pCont) {
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
    destroySendMsgInfo(pInfo);
    return terrno;
  }

  memcpy(pCont, pInfo->msgInfo.pData, pInfo->msgInfo.len);
  taosMemoryFreeClear(pInfo->msgInfo.pData);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = pInfo->msgType;
  rpcMsg.pCont = pCont;
  rpcMsg.contLen = pInfo->msgInfo.len;
  rpcMsg.info.ahandle = (void*)pInfo;
  rpcMsg.info.handle = pInfo->msgInfo.handle;
  rpcMsg.info.persistHandle = persistHandle;
  rpcMsg.code = 0;

  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
  TRACE_SET_MSGID(&rpcMsg.info.traceId, tGenIdPI64());

  // read the type before sending so the success log below does not touch pInfo
  int32_t sentType = pInfo->msgType;

  int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
  if (0 == code) {
    qDebug("msg %s sent, 0x%" PRIx64 ":0x%" PRIx64, TMSG_INFO(sentType), TRACE_GET_ROOTID(&rpcMsg.info.traceId), TRACE_GET_MSGID(&rpcMsg.info.traceId));
    return code;
  }

  destroySendMsgInfo(pInfo);
  return code;
}
304

305
/* Shorthand for asyncSendMsgToServerExt() with persistHandle == false and no RPC context. */
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  const bool persistHandle = false;
  void*      rpcCtx = NULL;

  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, persistHandle, rpcCtx);
}
308

309
/*
 * Forwards to rpcFreeConnById() for connection id `pid` on `pTransporter`.
 * `pTransporter` is validated first with QUERY_PARAM_CHECK.
 */
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
  QUERY_PARAM_CHECK(pTransporter);

  int32_t code = rpcFreeConnById(pTransporter, pid);
  return code;
}
313

314
char* jobTaskStatusStr(int32_t status) {
2,147,483,647✔
315
  switch (status) {
2,147,483,647✔
316
    case JOB_TASK_STATUS_NULL:
786,565,021✔
317
      return "NULL";
786,565,021✔
318
    case JOB_TASK_STATUS_INIT:
1,708,871,452✔
319
      return "INIT";
1,708,871,452✔
320
    case JOB_TASK_STATUS_EXEC:
2,147,483,647✔
321
      return "EXECUTING";
2,147,483,647✔
322
    case JOB_TASK_STATUS_PART_SUCC:
1,763,307,684✔
323
      return "PARTIAL_SUCCEED";
1,763,307,684✔
324
    case JOB_TASK_STATUS_FETCH:
215,044,214✔
325
      return "FETCHING";
215,044,214✔
326
    case JOB_TASK_STATUS_SUCC:
464,458,107✔
327
      return "SUCCEED";
464,458,107✔
328
    case JOB_TASK_STATUS_FAIL:
8,226,415✔
329
      return "FAILED";
8,226,415✔
330
    case JOB_TASK_STATUS_DROP:
808,530,707✔
331
      return "DROPPING";
808,530,707✔
UNCOV
332
    default:
×
UNCOV
333
      break;
×
334
  }
335

UNCOV
336
  return "UNKNOWN";
×
337
}
338

339
#if 0
/* Disabled: builds an SSchema value from its type, byte length, column id and name. */
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
  SSchema schema = {0};

  schema.type = type;
  schema.bytes = bytes;
  schema.colId = colId;
  tstrncpy(schema.name, name, tListLen(schema.name));

  return schema;
}
#endif
350

351
/*
 * Element destructor for arrays that store pointers to table-meta responses:
 * `p` points at the stored pointer; the response's contents are released with
 * tFreeSTableMetaRsp(), then the response itself is freed and the slot set to NULL.
 */
void freeSTableMetaRspPointer(void *p) {
  void** ppRsp = (void**)p;

  tFreeSTableMetaRsp(*ppRsp);
  taosMemoryFreeClear(*ppRsp);
}
51,019,259✔
355

356
/*
 * Releases the result object held in `pRes->res`, choosing the destructor by `pRes->msgType`.
 *
 *   - TDMT_VND_CREATE_TABLE: array whose elements are freed with freeSTableMetaRspPointer()
 *   - TDMT_MND_CREATE_STB / TDMT_VND_ALTER_TABLE / TDMT_MND_ALTER_STB: a single table-meta response
 *   - TDMT_VND_SUBMIT: a submit response
 *   - TDMT_SCH_QUERY / TDMT_SCH_MERGE_QUERY: a plain array
 *
 * After a successful release `pRes->res` is always NULL. Previously the two array branches
 * left the freed pointer in place while the others cleared it, so a second call on the same
 * SExecResult would free the array again.
 *
 * A NULL `pRes` or NULL `pRes->res` is ignored. For any other message type an error is
 * logged and `pRes->res` is left untouched.
 */
void destroyQueryExecRes(SExecResult* pRes) {
  if (NULL == pRes || NULL == pRes->res) {
    return;
  }

  switch (pRes->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
      pRes->res = NULL;
      break;
    }
    case TDMT_MND_CREATE_STB:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      tFreeSTableMetaRsp(pRes->res);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_VND_SUBMIT: {
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      qDebug("query execRes %p freed", pRes->res);
      taosArrayDestroy((SArray*)pRes->res);
      pRes->res = NULL;
      break;
    }
    default:
      // unknown owner type: the object cannot be released safely, so it is only reported
      qError("invalid exec result for request type:%d", pRes->msgType);
      break;
  }
}
388
// clang-format on
389
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
64,402✔
390
  QUERY_PARAM_CHECK(str);
64,402✔
391
  QUERY_PARAM_CHECK(buf);
64,402✔
392
  int32_t n = 0;
64,402✔
393

394
  switch (type) {
64,402✔
395
    case TSDB_DATA_TYPE_NULL:
×
396
      n = snprintf(str, capacity, "null");
×
397
      break;
×
398

399
    case TSDB_DATA_TYPE_BOOL:
5,000✔
400
      n = snprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
5,000✔
401
      break;
5,000✔
402

403
    case TSDB_DATA_TYPE_TINYINT:
223✔
404
      n = snprintf(str, capacity, "%d", *(int8_t*)buf);
223✔
405
      break;
223✔
406

407
    case TSDB_DATA_TYPE_SMALLINT:
223✔
408
      n = snprintf(str, capacity, "%d", *(int16_t*)buf);
223✔
409
      break;
223✔
410

411
    case TSDB_DATA_TYPE_INT:
18,953✔
412
      n = snprintf(str, capacity, "%d", *(int32_t*)buf);
18,953✔
413
      break;
18,953✔
414

415
    case TSDB_DATA_TYPE_BIGINT:
1,242✔
416
    case TSDB_DATA_TYPE_TIMESTAMP:
417
      n = snprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
1,242✔
418
      break;
1,242✔
419

420
    case TSDB_DATA_TYPE_FLOAT:
4,204✔
421
      n = snprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
4,204✔
422
      break;
4,204✔
423

424
    case TSDB_DATA_TYPE_DOUBLE:
3,408✔
425
      n = snprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
3,408✔
426
      break;
3,408✔
427

428
    case TSDB_DATA_TYPE_VARBINARY: {
×
429
      if (bufSize < 0) {
×
430
        //        tscError("invalid buf size");
431
        return TSDB_CODE_TSC_INVALID_VALUE;
×
432
      }
433
      void*    data = NULL;
×
434
      uint32_t size = 0;
×
435
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
436
        return TSDB_CODE_OUT_OF_MEMORY;
×
437
      }
438
      *str = '"';
×
439
      memcpy(str + 1, data, size);
×
440
      *(str + size + 1) = '"';
×
441
      n = size + 2;
×
442
      taosMemoryFree(data);
×
443
      break;
×
444
    }
445
    case TSDB_DATA_TYPE_BINARY:
18,653✔
446
    case TSDB_DATA_TYPE_GEOMETRY:
447
      if (bufSize < 0) {
18,653✔
448
        //        tscError("invalid buf size");
449
        return TSDB_CODE_TSC_INVALID_VALUE;
×
450
      }
451

452
      *str = '"';
18,653✔
453
      memcpy(str + 1, buf, bufSize);
18,653✔
454
      *(str + bufSize + 1) = '"';
18,653✔
455
      n = bufSize + 2;
18,653✔
456
      break;
18,653✔
457
    case TSDB_DATA_TYPE_NCHAR:
8,196✔
458
      if (bufSize < 0) {
8,196✔
459
        //        tscError("invalid buf size");
460
        return TSDB_CODE_TSC_INVALID_VALUE;
×
461
      }
462

463
      *str = '"';
8,196✔
464
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1, NULL);
8,196✔
465
      if (length < 0) {
8,196✔
466
        return TSDB_CODE_TSC_INVALID_VALUE;
×
467
      }
468
      *(str + length + 1) = '"';
8,196✔
469
      n = length + 2;
8,196✔
470
      break;
8,196✔
471
    case TSDB_DATA_TYPE_UTINYINT:
223✔
472
      n = snprintf(str, capacity, "%d", *(uint8_t*)buf);
223✔
473
      break;
223✔
474

475
    case TSDB_DATA_TYPE_USMALLINT:
223✔
476
      n = snprintf(str, capacity, "%d", *(uint16_t*)buf);
223✔
477
      break;
223✔
478

479
    case TSDB_DATA_TYPE_UINT:
223✔
480
      n = snprintf(str, capacity, "%u", *(uint32_t*)buf);
223✔
481
      break;
223✔
482

483
    case TSDB_DATA_TYPE_UBIGINT:
3,631✔
484
      n = snprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
3,631✔
485
      break;
3,631✔
486

487
    default:
×
488
      //      tscError("unsupported type:%d", type);
489
      return TSDB_CODE_TSC_INVALID_VALUE;
×
490
  }
491

492
  if (len) *len = n;
64,402✔
493

494
  return TSDB_CODE_SUCCESS;
64,402✔
495
}
496

497
void parseTagDatatoJson(void* p, char** jsonStr, void* charsetCxt) {
350,377✔
498
  if (!p || !jsonStr) {
350,377✔
499
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
500
    return;
×
501
  }
502
  char*   string = NULL;
350,377✔
503
  SArray* pTagVals = NULL;
350,377✔
504
  cJSON*  json = NULL;
350,377✔
505
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
350,377✔
506
    goto end;
×
507
  }
508

509
  int16_t nCols = taosArrayGetSize(pTagVals);
350,377✔
510
  if (nCols == 0) {
350,377✔
511
    goto end;
136,772✔
512
  }
513
  char tagJsonKey[256] = {0};
213,605✔
514
  json = cJSON_CreateObject();
213,605✔
515
  if (json == NULL) {
213,605✔
516
    goto end;
×
517
  }
518
  for (int j = 0; j < nCols; ++j) {
679,435✔
519
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
465,830✔
520
    if (pTagVal == NULL) {
465,830✔
521
      continue;
×
522
    }
523
    // json key  encode by binary
524
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
465,830✔
525
    // json value
526
    char type = pTagVal->type;
465,830✔
527
    if (type == TSDB_DATA_TYPE_NULL) {
465,830✔
528
      cJSON* value = cJSON_CreateNull();
30,134✔
529
      if (value == NULL) {
30,134✔
530
        goto end;
×
531
      }
532
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
30,134✔
533
        goto end;
×
534
      }
535
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
435,696✔
536
      cJSON* value = NULL;
344,800✔
537
      if (pTagVal->nData > 0) {
344,800✔
538
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
321,088✔
539
        if (tagJsonValue == NULL) {
321,088✔
540
          goto end;
×
541
        }
542
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue, charsetCxt);
321,088✔
543
        if (length < 0) {
321,088✔
544
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC,
×
545
                 charsetCxt != NULL ? ((SConvInfo*)(charsetCxt))->charset : tsCharset, pTagVal->pData);
546
          taosMemoryFree(tagJsonValue);
×
547
          goto end;
×
548
        }
549
        value = cJSON_CreateString(tagJsonValue);
321,088✔
550
        taosMemoryFree(tagJsonValue);
321,088✔
551
        if (value == NULL) {
321,088✔
552
          goto end;
×
553
        }
554
      } else if (pTagVal->nData == 0) {
23,712✔
555
        value = cJSON_CreateString("");
23,712✔
556
        if (value == NULL) {
23,712✔
557
          goto end;
×
558
        }
559
      } else {
560
        goto end;
×
561
      }
562

563
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
344,800✔
564
        goto end;
×
565
      }
566
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
90,896✔
567
      double jsonVd = *(double*)(&pTagVal->i64);
59,774✔
568
      cJSON* value = cJSON_CreateNumber(jsonVd);
59,774✔
569
      if (value == NULL) {
59,774✔
570
        goto end;
×
571
      }
572
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
59,774✔
573
        goto end;
×
574
      }
575
    } else if (type == TSDB_DATA_TYPE_BOOL) {
31,122✔
576
      char   jsonVd = *(char*)(&pTagVal->i64);
31,122✔
577
      cJSON* value = cJSON_CreateBool(jsonVd);
31,122✔
578
      if (value == NULL) {
31,122✔
579
        goto end;
×
580
      }
581
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
31,122✔
582
        goto end;
×
583
      }
584
    } else {
585
      goto end;
×
586
    }
587
  }
588
  string = cJSON_PrintUnformatted(json);
213,605✔
589
end:
350,377✔
590
  cJSON_Delete(json);
350,377✔
591
  taosArrayDestroy(pTagVals);
350,377✔
592
  if (string == NULL) {
350,377✔
593
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
136,772✔
594
    if (string == NULL) {
136,772✔
595
      qError("failed to strdup null string");
×
596
    }
597
  }
598
  *jsonStr = string;
350,377✔
599
}
600

601
/*
 * Fills `colRef` as a reference to column `refColName` of table `refTableName` in database
 * `refDbName`: the id is set to `colId`, hasRef to true, and the three reference names are
 * copied with their respective length limits. `colName` is copied when given; otherwise the
 * stored column name becomes the empty string.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t setColRef(SColRef* colRef, col_id_t colId, const char* colName, char* refColName, char* refTableName, char* refDbName) {
  colRef->id = colId;
  colRef->hasRef = true;

  tstrncpy(colRef->refDbName, refDbName, TSDB_DB_NAME_LEN);
  tstrncpy(colRef->refTableName, refTableName, TSDB_TABLE_NAME_LEN);
  tstrncpy(colRef->refColName, refColName, TSDB_COL_NAME_LEN);

  if (NULL == colName) {
    colRef->colName[0] = '\0';
  } else {
    tstrncpy(colRef->colName, colName, TSDB_COL_NAME_LEN);
  }

  return TSDB_CODE_SUCCESS;
}
614

615
/*
 * Makes a copy of `pSrc` in a single allocation and stores it in `*pDst`.
 *
 * Layout of the allocation: the STableMeta header with its column/tag schema entries,
 * followed (when present and applicable to the table type) by the extended schema, the
 * column references and the tag references. The copy's schemaExt/colRef/tagRef pointers
 * refer into that same allocation, or are NULL when the part is not copied; numOfTagRefs
 * is reset to 0 when the tag references are not copied.
 *
 * Returns TSDB_CODE_SUCCESS (with `*pDst` == NULL for a NULL `pSrc`),
 * TSDB_CODE_INVALID_PARA when the total number of columns and tags is out of range,
 * or terrno when the allocation fails.
 */
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
  if (numOfField > TSDB_MAX_COL_TAG_NUM || numOfField < TSDB_MIN_COLUMNS) {
    *pDst = NULL;
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
    return TSDB_CODE_INVALID_PARA;
  }

  // decide once which optional trailing parts are carried over
  const bool copyExt = withExtSchema(pSrc->tableType) && pSrc->schemaExt;
  const bool copyColRef = hasRefCol(pSrc->tableType) && pSrc->colRef;
  const bool copyTagRef = hasRefCol(pSrc->tableType) && pSrc->tagRef;

  int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
  int32_t schemaExtSize = 0;
  int32_t colRefSize = 0;
  int32_t tagRefSize = 0;
  if (copyExt) {
    schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
  }
  if (copyColRef) {
    colRefSize = pSrc->numOfColRefs * sizeof(SColRef);
  }
  if (copyTagRef) {
    tagRefSize = pSrc->numOfTagRefs * sizeof(SColRef);
  }

  STableMeta* pClone = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize + tagRefSize);
  *pDst = pClone;
  if (NULL == pClone) {
    return terrno;
  }

  memcpy(pClone, pSrc, metaSize);

  // the optional parts are laid out back to back right after the header + schema
  char* pTail = (char*)pClone + metaSize;

  if (copyExt) {
    pClone->schemaExt = (SSchemaExt*)pTail;
    memcpy(pClone->schemaExt, pSrc->schemaExt, schemaExtSize);
  } else {
    pClone->schemaExt = NULL;
  }

  if (copyColRef) {
    pClone->colRef = (SColRef*)(pTail + schemaExtSize);
    memcpy(pClone->colRef, pSrc->colRef, colRefSize);
  } else {
    pClone->colRef = NULL;
  }

  if (copyTagRef) {
    pClone->tagRef = (SColRef*)(pTail + schemaExtSize + colRefSize);
    memcpy(pClone->tagRef, pSrc->tagRef, tagRefSize);
    pClone->numOfTagRefs = pSrc->numOfTagRefs;
  } else {
    pClone->tagRef = NULL;
    pClone->numOfTagRefs = 0;
  }

  return TSDB_CODE_SUCCESS;
}
670

671
/*
 * Looks `pName` up among the schema entries of `pMeta` and reports in `*pType` whether it
 * names a column (index below numOfColumns), a tag, or nothing (TCOL_TYPE_NONE).
 * Does nothing when any argument is NULL.
 */
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
  if (NULL == pMeta || NULL == pName || NULL == pType) {
    return;
  }

  int32_t numOfColumns = pMeta->tableInfo.numOfColumns;
  int32_t total = pMeta->tableInfo.numOfTags + numOfColumns;

  ETableColumnType found = TCOL_TYPE_NONE;
  for (int32_t idx = 0; idx < total; ++idx) {
    if (strcmp(pName, pMeta->schema[idx].name) != 0) {
      continue;
    }
    found = (idx < numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
    break;
  }

  *pType = found;
}
683

684
/* Releases a vgroup info object together with its hash and array. A NULL argument is ignored. */
void freeVgInfo(SDBVgInfo* vgInfo) {
  if (vgInfo != NULL) {
    taosHashCleanup(vgInfo->vgHash);
    taosArrayDestroy(vgInfo->vgArray);

    taosMemoryFreeClear(vgInfo);
  }
}
694

695
/*
 * Creates a copy of `pSrc` in `*pDst`.
 *
 * The struct is copied byte-wise, the copy's vgArray is set to NULL, and - when the source
 * has a vgHash - a new hash is created and every entry of the source hash is inserted into
 * it under the same key.
 *
 * Returns TSDB_CODE_SUCCESS (with `*pDst` == NULL for a NULL `pSrc`) or an error code.
 * On every error `*pDst` is NULL and nothing is left allocated.
 *
 * Fixes over the previous version: when inserting into the new hash failed, the copy was
 * freed but `*pDst` still pointed at it; and the error code was read from terrno only after
 * logging and cleanup calls had run. The code is now captured at the point of failure and
 * `*pDst` is cleared.
 */
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == *pDst) {
    return terrno;
  }
  memcpy(*pDst, pSrc, sizeof(*pSrc));
  (*pDst)->vgArray = NULL;

  if (pSrc->vgHash) {
    (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
                                   HASH_ENTRY_LOCK);
    if (NULL == (*pDst)->vgHash) {
      int32_t code = terrno;  // capture before the free below can disturb it
      taosMemoryFreeClear(*pDst);
      return code;
    }

    SVgroupInfo* vgInfo = NULL;
    void*        pIter = taosHashIterate(pSrc->vgHash, NULL);
    while (pIter) {
      vgInfo = pIter;
      int32_t* vgId = taosHashGetKey(pIter, NULL);

      if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
        int32_t code = terrno;  // capture before logging/cleanup
        qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
        taosHashCancelIterate(pSrc->vgHash, pIter);
        freeVgInfo(*pDst);
        *pDst = NULL;  // freeVgInfo() only clears its own copy of the pointer
        return code;
      }

      pIter = taosHashIterate(pSrc->vgHash, pIter);
    }
  }

  return TSDB_CODE_SUCCESS;
}
736

737
/*
 * Deep-copies a create-table request into a newly allocated `*pDst`.
 *
 * Scalar fields are copied; name, comment, super-table name and the tag blob are duplicated;
 * the tag-name array is duplicated with taosArrayDup(). For requests that are not child
 * tables the normal-table schema row (nCols, version and the schema array) is copied instead.
 *
 * Returns TSDB_CODE_SUCCESS (with `*pDst` == NULL for a NULL `pSrc`) or terrno when an
 * allocation fails; on failure the partially built copy is destroyed and `*pDst` is NULL.
 *
 * Fix over the previous version: the schema row's `version` was assigned from the source's
 * `nCols` instead of the source's `version`.
 */
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == *pDst) {
    return terrno;
  }

  (*pDst)->flags = pSrc->flags;
  if (pSrc->name) {
    (*pDst)->name = taosStrdup(pSrc->name);
    if (NULL == (*pDst)->name) goto _exit;
  }
  (*pDst)->uid = pSrc->uid;
  (*pDst)->btime = pSrc->btime;
  (*pDst)->ttl = pSrc->ttl;
  (*pDst)->commentLen = pSrc->commentLen;
  if (pSrc->comment) {
    (*pDst)->comment = taosStrdup(pSrc->comment);
    if (NULL == (*pDst)->comment) goto _exit;
  }
  (*pDst)->type = pSrc->type;

  if (pSrc->type == TSDB_CHILD_TABLE) {
    if (pSrc->ctb.stbName) {
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
    }
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
    (*pDst)->ctb.suid = pSrc->ctb.suid;
    if (pSrc->ctb.tagName) {
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
    }
    STag* pTag = (STag*)pSrc->ctb.pTag;
    if (pTag) {
      // the tag blob carries its own total length in `len`
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
      if (NULL == (*pDst)->ctb.pTag) goto _exit;
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
    }
  } else {
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.version;
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
    }
  }

  return TSDB_CODE_SUCCESS;

_exit:
  tdDestroySVCreateTbReq(*pDst);
  taosMemoryFree(*pDst);
  *pDst = NULL;
  return terrno;
}
799

800
/* Releases a db config info object and its retention array; a NULL argument is accepted. */
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
  if (NULL != pInfo) {
    taosArrayDestroy(pInfo->pRetensions);
  }

  // called unconditionally, as in the original flow (also for a NULL pInfo)
  taosMemoryFree(pInfo);
}
15,441,454✔
806

807
/* Returns the callback table (pCb) of the file-level task worker pool as an untyped pointer. */
void* getTaskPoolWorkerCb() {
  return taskQueue.wrokrerPool.pCb;
}
732,684,252✔
808

809
/* Forward declarations; the definitions are not in this part of the file. */
void tFreeStreamVtbOtbInfo(void* param);
void tFreeStreamVtbVtbInfo(void* param);
void tFreeStreamVtbDbVgInfo(void* param);
812

813
/*
 * Releases a parsed-tags info object: the tag-name array, the data buffer of every
 * variable-length tag value, the tag-value array, the tag index and the object itself.
 * A NULL argument is ignored.
 */
void destroySTagsInfo(STagsInfo* pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->STagNames);

  int idx = 0;
  while (idx < taosArrayGetSize(pInfo->pTagVals)) {
    STagVal* pVal = (STagVal*)taosArrayGet(pInfo->pTagVals, idx);
    // only variable-length values own a separately allocated data buffer
    if (IS_VAR_DATA_TYPE(pVal->type)) {
      taosMemoryFreeClear(pVal->pData);
    }
    ++idx;
  }

  taosArrayDestroy(pInfo->pTagVals);
  taosMemoryFreeClear(pInfo->pTagIndex);
  taosMemoryFreeClear(pInfo);
}
829

830
void qDestroyBoundColInfo(void* pInfo) {
1,567,472,403✔
831
  if (NULL == pInfo) {
1,567,472,403✔
832
    return;
891,021✔
833
  }
834
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
1,566,581,382✔
835

836
  taosMemoryFreeClear(pBoundInfo->pColIndex);
1,566,581,382✔
837
  destroySTagsInfo(pBoundInfo->parseredTags);
1,566,581,858✔
838
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc