• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4933

20 Jan 2026 10:44AM UTC coverage: 66.671% (+0.03%) from 66.646%
#4933

push

travis-ci

web-flow
merge: from main to 3.0 #34340

73 of 178 new or added lines in 9 files covered. (41.01%)

1199 existing lines in 124 files now uncovered.

203121 of 304663 relevant lines covered (66.67%)

132228377.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.06
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
typedef struct STaskQueue {
28
  SQueryAutoQWorkerPool wrokrerPool;
29
  STaosQueue* pTaskQueue;
30
} STaskQueue;
31

32
/**
 * Map a comparison operator to its mirrored counterpart.
 *
 * GREATER_THAN <-> LOWER_THAN, GREATER_EQUAL <-> LOWER_EQUAL, EQUAL maps to itself.
 *
 * @param op operator to mirror
 * @return the mirrored operator, or -1 when op is none of the five operators above
 */
int32_t getAsofJoinReverseOp(EOperatorType op) {
  int32_t mirrored = -1;

  switch (op) {
    case OP_TYPE_GREATER_THAN:
      mirrored = OP_TYPE_LOWER_THAN;
      break;
    case OP_TYPE_GREATER_EQUAL:
      mirrored = OP_TYPE_LOWER_EQUAL;
      break;
    case OP_TYPE_LOWER_THAN:
      mirrored = OP_TYPE_GREATER_THAN;
      break;
    case OP_TYPE_LOWER_EQUAL:
      mirrored = OP_TYPE_GREATER_EQUAL;
      break;
    case OP_TYPE_EQUAL:
      mirrored = OP_TYPE_EQUAL;
      break;
    default:
      break;
  }

  return mirrored;
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
/**
 * Validate a run of schema entries (either the columns or the tags of a table).
 *
 * For every entry, in this order: the data type must be valid, the declared byte
 * length must fit the type, and the name must differ from every later entry.
 * Finally the sum of all byte lengths must not exceed maxLen.
 *
 * @param pSchema   first entry to check; NULL is rejected
 * @param numOfCols number of entries to check
 * @param maxLen    upper bound for the summed byte length
 * @return true when every check passes, false otherwise (the reason is logged)
 */
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
  if (NULL == pSchema) {
    return false;
  }

  int32_t totalBytes = 0;

  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SSchema* pCur = pSchema + idx;

    // step 1: the type itself must be known
    if (!isValidDataType(pCur->type)) {
      qError("The %d col/tag data type error, type:%d", idx, pCur->type);
      return false;
    }

    // step 2: the declared length must fit the type
    if (TSDB_DATA_TYPE_BINARY == pCur->type || TSDB_DATA_TYPE_VARBINARY == pCur->type) {
      if (pCur->bytes > TSDB_MAX_BINARY_LEN) {
        qError("The %d col/tag var data len error, type:%d, len:%d", idx, pCur->type, pCur->bytes);
        return false;
      }
    } else if (TSDB_DATA_TYPE_NCHAR == pCur->type) {
      if (pCur->bytes > TSDB_MAX_NCHAR_LEN) {
        qError("The %d col/tag nchar data len error, len:%d", idx, pCur->bytes);
        return false;
      }
    } else if (TSDB_DATA_TYPE_GEOMETRY == pCur->type) {
      if (pCur->bytes > TSDB_MAX_GEOMETRY_LEN) {
        qError("The %d col/tag geometry data len error, len:%d", idx, pCur->bytes);
        return false;
      }
    } else if (IS_STR_DATA_BLOB(pCur->type)) {
      // note: unlike the branches above, this limit is exclusive (>=)
      if (pCur->bytes >= TSDB_MAX_BLOB_LEN) {
        qError("The %d col/tag blob data len error, len:%d", idx, pCur->bytes);
        return false;
      }
    } else if (pCur->bytes != tDataTypes[pCur->type].bytes) {
      // every remaining type must match the length recorded in tDataTypes exactly
      qError("The %d col/tag data len error, type:%d, len:%d", idx, pCur->type, pCur->bytes);
      return false;
    }

    // step 3: the name must not reappear in any later entry
    for (int32_t other = idx + 1; other < numOfCols; ++other) {
      if (0 == strncmp(pCur->name, pSchema[other].name, sizeof(pCur->name) - 1)) {
        qError("The %d col/tag name %s is same with %d col/tag name %s", idx, pCur->name, other, pSchema[other].name);
        return false;
      }
    }

    totalBytes += pCur->bytes;
  }

  return totalBytes <= maxLen;
}
116

117
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags, bool isVirtual) {
33,792,319✔
118
  if (!pSchema) {
33,792,319✔
119
    qError("invalid numOfCols: %d", numOfCols);
×
120
    return false;
×
121
  }
122

123
  if ((isVirtual && !VALIDNUMOFCOLSVIRTUAL(numOfCols)) || (!isVirtual && !VALIDNUMOFCOLS(numOfCols))) {
33,792,319✔
124
    qError("invalid numOfCols: %d", numOfCols);
×
125
    return false;
×
126
  }
127

128
  if (!VALIDNUMOFTAGS(numOfTags)) {
33,792,939✔
129
    qError("invalid numOfTags: %d", numOfTags);
620✔
130
    return false;
×
131
  }
132

133
  /* first column must be the timestamp, which is a primary key */
134
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
33,792,319✔
135
    qError("invalid first column type: %d", pSchema[0].type);
×
136
    return false;
×
137
  }
138

139
  if (!doValidateSchema(pSchema, numOfCols, isVirtual ? TSDB_MAX_BYTES_PER_ROW_VIRTUAL : TSDB_MAX_BYTES_PER_ROW)) {
33,792,319✔
140
    qError("validate schema columns failed");
×
141
    return false;
×
142
  }
143

144
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
33,792,939✔
145
    qError("validate schema tags failed");
×
146
    return false;
×
147
  }
148

149
  return true;
33,792,277✔
150
}
151

152
static STaskQueue taskQueue = {0};
153

154
/**
 * Queue consumer callback: run the function stored in the item and release the item.
 *
 * The item's ahandle holds the function to run and thandle its argument (both are
 * filled in by taosAsyncExec / taosStmt2AsyncBind). The function's return value is
 * discarded.
 *
 * @param pInfo     queue info supplied by the worker (unused)
 * @param pSchedMsg dequeued item; NULL is ignored
 */
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
  if (!pSchedMsg) return;

  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
  if (execFn) {
    (void)execFn(pSchedMsg->thandle);
  }

  // release the item even when it carried no function; it was previously leaked in that case
  taosFreeQitem(pSchedMsg);
}
160

161
int32_t initTaskQueue() {
1,256,971✔
162
  memset(&taskQueue, 0, sizeof(taskQueue));
1,256,971✔
163
  
164
  taskQueue.wrokrerPool.name = "taskWorkPool";
1,256,971✔
165
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
1,256,971✔
166
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
1,256,971✔
167
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
1,256,971✔
168
  if (TSDB_CODE_SUCCESS != coce) {
1,256,971✔
169
    qError("failed to init task thread pool");
×
170
    return -1;
×
171
  }
172

173
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
1,256,971✔
174
  if (NULL == taskQueue.pTaskQueue) {
1,256,971✔
175
    qError("failed to init task queue");
×
176
    return -1;
×
177
  }
178

179
  qInfo("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
1,256,971✔
180
  return 0;
1,256,971✔
181
}
182

183
int32_t cleanupTaskQueue() {
1,257,019✔
184
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
1,257,019✔
185
  return 0;
1,257,019✔
186
}
187

188
/**
 * Schedule execFn(execParam) on the global task queue.
 *
 * @param execFn    function the worker will invoke (stored in the item's ahandle)
 * @param execParam argument handed to execFn (stored in the item's thandle)
 * @param code      stored in the item's msg field; processTaskQueue() does not read it
 * @return the allocation error when the queue item cannot be created, otherwise
 *         the result of taosWriteQitem()
 */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
  SSchedMsg* pItem;
  int32_t    allocCode = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void**)&pItem);
  if (allocCode != 0) {
    return allocCode;
  }

  pItem->fp = NULL;
  pItem->ahandle = execFn;
  pItem->thandle = execParam;
  pItem->msg = code;

  // NOTE(review): if taosWriteQitem() fails, pItem is not released here - confirm who owns it in that case
  return taosWriteQitem(taskQueue.pTaskQueue, pItem);
}
199

200
int32_t taosAsyncWait() {
29,667✔
201
  if (!taskQueue.wrokrerPool.pCb) {
29,667✔
202
    qError("query task thread pool callback function is null");
×
203
    return -1;
×
204
  }
205
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
29,667✔
206
}
207

208
int32_t taosAsyncRecover() {
29,667✔
209
  if (!taskQueue.wrokrerPool.pCb) {
29,667✔
210
    qError("query task thread pool callback function is null");
×
211
    return -1;
×
212
  }
213
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
29,667✔
214
}
215

216
/**
 * Schedule bindFn(bindParam) on the global task queue.
 *
 * Same mechanism as taosAsyncExec(), without a result-code pointer.
 *
 * @param bindFn    function the worker will invoke (stored in the item's ahandle)
 * @param bindParam argument handed to bindFn (stored in the item's thandle)
 * @return the allocation error when the queue item cannot be created, otherwise
 *         the result of taosWriteQitem()
 */
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
  SSchedMsg* pSchedMsg;
  int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
  if (rc) return rc;
  pSchedMsg->fp = NULL;
  pSchedMsg->ahandle = bindFn;
  pSchedMsg->thandle = bindParam;
  // no result-code pointer for bind tasks; set explicitly so the field never holds an indeterminate value
  pSchedMsg->msg = NULL;

  return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
}
227

228
/**
 * Release a send-info object together with what it owns: the target db name,
 * the message payload and, through paramFreeFp when set, the user parameter.
 *
 * @param pMsgBody object to release; NULL is a no-op
 */
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  if (pMsgBody != NULL) {
    qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);

    taosMemoryFreeClear(pMsgBody->target.dbFName);
    taosMemoryFreeClear(pMsgBody->msgInfo.pData);

    // the parameter is released only when a destructor was registered for it
    if (pMsgBody->paramFreeFp) {
      (*pMsgBody->paramFreeFp)(pMsgBody->param);
    }

    taosMemoryFreeClear(pMsgBody);
  }
}
243
void destroyAhandle(void *ahandle) {
455,490,760✔
244
  SMsgSendInfo *pSendInfo = ahandle;
455,490,760✔
245
  if (pSendInfo == NULL) return;
455,490,760✔
246

247
  if (pSendInfo->streamAHandle) {
455,490,760✔
248
    qDebug("stream ahandle %p freed", pSendInfo);
29,461,418✔
249
  }
250

251
  destroySendMsgInfo(pSendInfo);
455,592,101✔
252
}
253

254
/**
 * Copy the payload of pInfo into an RPC buffer and send it asynchronously.
 *
 * pInfo is destroyed here on every failure path (invalid arguments, buffer
 * allocation failure, send failure). After a successful send pInfo is not
 * touched again by this function.
 *
 * @param pTransporter   transport handle, must not be NULL
 * @param epSet          endpoint set, must not be NULL
 * @param pTransporterId passed through to rpcSendRequestWithCtx()
 * @param pInfo          send info carrying payload, message type and request id
 * @param persistHandle  copied into the RPC message info
 * @param rpcCtx         passed through to rpcSendRequestWithCtx()
 * @return TSDB_CODE_TSC_INVALID_INPUT for NULL arguments, terrno when the RPC
 *         buffer cannot be allocated, otherwise the result of the send call
 */
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* rpcCtx) {
  if (pTransporter == NULL || epSet == NULL || pInfo == NULL) {
    destroySendMsgInfo(pInfo);
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  char* pCont = rpcMallocCont(pInfo->msgInfo.len);
  if (pCont == NULL) {
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
    destroySendMsgInfo(pInfo);
    return terrno;
  }
  memcpy(pCont, pInfo->msgInfo.pData, pInfo->msgInfo.len);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = pInfo->msgType;
  rpcMsg.pCont = pCont;
  rpcMsg.contLen = pInfo->msgInfo.len;
  rpcMsg.info.ahandle = (void*)pInfo;
  rpcMsg.info.handle = pInfo->msgInfo.handle;
  rpcMsg.info.persistHandle = persistHandle;
  rpcMsg.code = 0;

  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
  TRACE_SET_MSGID(&rpcMsg.info.traceId, tGenIdPI64());

  // captured before the send so the success log below does not need to read pInfo
  int32_t msgType = pInfo->msgType;

  int32_t code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
  if (code == 0) {
    qDebug("msg %s sent, 0x%" PRIx64 ":0x%" PRIx64, TMSG_INFO(msgType), TRACE_GET_ROOTID(&rpcMsg.info.traceId), TRACE_GET_MSGID(&rpcMsg.info.traceId));
  } else {
    destroySendMsgInfo(pInfo);
  }

  return code;
}
290

291
/**
 * Convenience wrapper around asyncSendMsgToServerExt() with persistHandle = false
 * and no RPC context.
 *
 * @return whatever asyncSendMsgToServerExt() returns
 */
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  const bool persistHandle = false;
  void*      noRpcCtx = NULL;

  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, persistHandle, noRpcCtx);
}
294

295
/**
 * Forward to rpcFreeConnById() after validating the transport handle with
 * QUERY_PARAM_CHECK.
 *
 * @param pTransporter transport handle
 * @param pid          id handed to rpcFreeConnById()
 * @return the result of rpcFreeConnById()
 */
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
  QUERY_PARAM_CHECK(pTransporter);

  int32_t code = rpcFreeConnById(pTransporter, pid);
  return code;
}
299

300
char* jobTaskStatusStr(int32_t status) {
2,147,483,647✔
301
  switch (status) {
2,147,483,647✔
302
    case JOB_TASK_STATUS_NULL:
841,543,565✔
303
      return "NULL";
841,543,565✔
304
    case JOB_TASK_STATUS_INIT:
1,781,752,657✔
305
      return "INIT";
1,781,752,657✔
306
    case JOB_TASK_STATUS_EXEC:
2,147,483,647✔
307
      return "EXECUTING";
2,147,483,647✔
308
    case JOB_TASK_STATUS_PART_SUCC:
1,810,494,208✔
309
      return "PARTIAL_SUCCEED";
1,810,494,208✔
310
    case JOB_TASK_STATUS_FETCH:
202,488,695✔
311
      return "FETCHING";
202,488,695✔
312
    case JOB_TASK_STATUS_SUCC:
416,588,743✔
313
      return "SUCCEED";
416,588,743✔
314
    case JOB_TASK_STATUS_FAIL:
15,927,659✔
315
      return "FAILED";
15,927,659✔
316
    case JOB_TASK_STATUS_DROP:
851,395,747✔
317
      return "DROPPING";
851,395,747✔
UNCOV
318
    default:
×
UNCOV
319
      break;
×
320
  }
321

UNCOV
322
  return "UNKNOWN";
×
323
}
324

325
#if 0
326
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
327
  SSchema s = {0};
328
  s.type = type;
329
  s.bytes = bytes;
330
  s.colId = colId;
331

332
  tstrncpy(s.name, name, tListLen(s.name));
333
  return s;
334
}
335
#endif
336

337
/**
 * Element destructor for arrays that store pointers to table-meta responses:
 * frees the response contents, then the response itself, and clears the slot.
 *
 * @param p address of the array slot holding the response pointer
 */
void freeSTableMetaRspPointer(void *p) {
  void **ppRsp = (void **)p;

  tFreeSTableMetaRsp(*ppRsp);
  taosMemoryFreeClear(*ppRsp);
}
42,970,857✔
341

342
/**
 * Release the payload stored in an execution result according to its message type.
 *
 *  - TDMT_VND_CREATE_TABLE: array of table-meta response pointers
 *  - TDMT_MND_CREATE_STB / TDMT_VND_ALTER_TABLE / TDMT_MND_ALTER_STB: single table-meta response
 *  - TDMT_VND_SUBMIT: submit response
 *  - TDMT_SCH_QUERY / TDMT_SCH_MERGE_QUERY: plain array
 *
 * pRes->res is reset to NULL in every handled case so that a repeated call is a
 * no-op; the array cases previously left a dangling pointer behind. Unknown
 * message types are logged and left untouched.
 *
 * @param pRes result to clean; NULL or an empty result is a no-op
 */
void destroyQueryExecRes(SExecResult* pRes) {
  if (NULL == pRes || NULL == pRes->res) {
    return;
  }

  switch (pRes->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
      pRes->res = NULL;
      break;
    }
    case TDMT_MND_CREATE_STB:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      tFreeSTableMetaRsp(pRes->res);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_VND_SUBMIT: {
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      qDebug("query execRes %p freed", pRes->res);
      taosArrayDestroy((SArray*)pRes->res);
      pRes->res = NULL;
      break;
    }
    default:
      qError("invalid exec result for request type:%d", pRes->msgType);
  }
}
374
// clang-format on
375
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
60,362✔
376
  QUERY_PARAM_CHECK(str);
60,362✔
377
  QUERY_PARAM_CHECK(buf);
60,362✔
378
  int32_t n = 0;
60,362✔
379

380
  switch (type) {
60,362✔
381
    case TSDB_DATA_TYPE_NULL:
×
382
      n = tsnprintf(str, capacity, "null");
×
383
      break;
×
384

385
    case TSDB_DATA_TYPE_BOOL:
2,267✔
386
      n = tsnprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
2,267✔
387
      break;
2,267✔
388

389
    case TSDB_DATA_TYPE_TINYINT:
167✔
390
      n = tsnprintf(str, capacity, "%d", *(int8_t*)buf);
167✔
391
      break;
167✔
392

393
    case TSDB_DATA_TYPE_SMALLINT:
167✔
394
      n = tsnprintf(str, capacity, "%d", *(int16_t*)buf);
167✔
395
      break;
167✔
396

397
    case TSDB_DATA_TYPE_INT:
14,016✔
398
      n = tsnprintf(str, capacity, "%d", *(int32_t*)buf);
14,016✔
399
      break;
14,016✔
400

401
    case TSDB_DATA_TYPE_BIGINT:
1,075✔
402
    case TSDB_DATA_TYPE_TIMESTAMP:
403
      n = tsnprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
1,075✔
404
      break;
1,075✔
405

406
    case TSDB_DATA_TYPE_FLOAT:
1,526✔
407
      n = tsnprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
1,526✔
408
      break;
1,526✔
409

410
    case TSDB_DATA_TYPE_DOUBLE:
785✔
411
      n = tsnprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
785✔
412
      break;
785✔
413

414
    case TSDB_DATA_TYPE_VARBINARY: {
×
415
      if (bufSize < 0) {
×
416
        //        tscError("invalid buf size");
417
        return TSDB_CODE_TSC_INVALID_VALUE;
×
418
      }
419
      void*    data = NULL;
×
420
      uint32_t size = 0;
×
421
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
422
        return TSDB_CODE_OUT_OF_MEMORY;
×
423
      }
424
      *str = '"';
×
425
      memcpy(str + 1, data, size);
×
426
      *(str + size + 1) = '"';
×
427
      n = size + 2;
×
428
      taosMemoryFree(data);
×
429
      break;
×
430
    }
431
    case TSDB_DATA_TYPE_BINARY:
4,245✔
432
    case TSDB_DATA_TYPE_GEOMETRY:
433
      if (bufSize < 0) {
4,245✔
434
        //        tscError("invalid buf size");
435
        return TSDB_CODE_TSC_INVALID_VALUE;
×
436
      }
437

438
      *str = '"';
4,245✔
439
      memcpy(str + 1, buf, bufSize);
4,245✔
440
      *(str + bufSize + 1) = '"';
4,245✔
441
      n = bufSize + 2;
4,245✔
442
      break;
4,245✔
443
    case TSDB_DATA_TYPE_NCHAR:
32,422✔
444
      if (bufSize < 0) {
32,422✔
445
        //        tscError("invalid buf size");
446
        return TSDB_CODE_TSC_INVALID_VALUE;
×
447
      }
448

449
      *str = '"';
32,422✔
450
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1, NULL);
32,422✔
451
      if (length < 0) {
32,422✔
452
        return TSDB_CODE_TSC_INVALID_VALUE;
×
453
      }
454
      *(str + length + 1) = '"';
32,422✔
455
      n = length + 2;
32,422✔
456
      break;
32,422✔
457
    case TSDB_DATA_TYPE_UTINYINT:
167✔
458
      n = tsnprintf(str, capacity, "%d", *(uint8_t*)buf);
167✔
459
      break;
167✔
460

461
    case TSDB_DATA_TYPE_USMALLINT:
167✔
462
      n = tsnprintf(str, capacity, "%d", *(uint16_t*)buf);
167✔
463
      break;
167✔
464

465
    case TSDB_DATA_TYPE_UINT:
167✔
466
      n = tsnprintf(str, capacity, "%u", *(uint32_t*)buf);
167✔
467
      break;
167✔
468

469
    case TSDB_DATA_TYPE_UBIGINT:
3,191✔
470
      n = tsnprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
3,191✔
471
      break;
3,191✔
472

473
    default:
×
474
      //      tscError("unsupported type:%d", type);
475
      return TSDB_CODE_TSC_INVALID_VALUE;
×
476
  }
477

478
  if (len) *len = n;
60,362✔
479

480
  return TSDB_CODE_SUCCESS;
60,362✔
481
}
482

483
void parseTagDatatoJson(void* p, char** jsonStr, void* charsetCxt) {
335,656✔
484
  if (!p || !jsonStr) {
335,656✔
485
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
486
    return;
×
487
  }
488
  char*   string = NULL;
335,656✔
489
  SArray* pTagVals = NULL;
335,656✔
490
  cJSON*  json = NULL;
335,656✔
491
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
335,656✔
492
    goto end;
×
493
  }
494

495
  int16_t nCols = taosArrayGetSize(pTagVals);
335,656✔
496
  if (nCols == 0) {
335,656✔
497
    goto end;
127,432✔
498
  }
499
  char tagJsonKey[256] = {0};
208,224✔
500
  json = cJSON_CreateObject();
208,224✔
501
  if (json == NULL) {
208,224✔
502
    goto end;
×
503
  }
504
  for (int j = 0; j < nCols; ++j) {
660,176✔
505
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
451,952✔
506
    if (pTagVal == NULL) {
451,952✔
507
      continue;
×
508
    }
509
    // json key  encode by binary
510
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
451,952✔
511
    // json value
512
    char type = pTagVal->type;
451,952✔
513
    if (type == TSDB_DATA_TYPE_NULL) {
451,952✔
514
      cJSON* value = cJSON_CreateNull();
27,694✔
515
      if (value == NULL) {
27,694✔
516
        goto end;
×
517
      }
518
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
27,694✔
519
        goto end;
×
520
      }
521
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
424,258✔
522
      cJSON* value = NULL;
331,319✔
523
      if (pTagVal->nData > 0) {
331,319✔
524
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
309,527✔
525
        if (tagJsonValue == NULL) {
309,527✔
526
          goto end;
×
527
        }
528
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue, charsetCxt);
309,527✔
529
        if (length < 0) {
309,527✔
530
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC,
×
531
                 charsetCxt != NULL ? ((SConvInfo*)(charsetCxt))->charset : tsCharset, pTagVal->pData);
532
          taosMemoryFree(tagJsonValue);
×
533
          goto end;
×
534
        }
535
        value = cJSON_CreateString(tagJsonValue);
309,527✔
536
        taosMemoryFree(tagJsonValue);
309,527✔
537
        if (value == NULL) {
309,527✔
538
          goto end;
×
539
        }
540
      } else if (pTagVal->nData == 0) {
21,792✔
541
        value = cJSON_CreateString("");
21,792✔
542
        if (value == NULL) {
21,792✔
543
          goto end;
×
544
        }
545
      } else {
546
        goto end;
×
547
      }
548

549
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
331,319✔
550
        goto end;
×
551
      }
552
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
92,939✔
553
      double jsonVd = *(double*)(&pTagVal->i64);
64,337✔
554
      cJSON* value = cJSON_CreateNumber(jsonVd);
64,337✔
555
      if (value == NULL) {
64,337✔
556
        goto end;
×
557
      }
558
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
64,337✔
559
        goto end;
×
560
      }
561
    } else if (type == TSDB_DATA_TYPE_BOOL) {
28,602✔
562
      char   jsonVd = *(char*)(&pTagVal->i64);
28,602✔
563
      cJSON* value = cJSON_CreateBool(jsonVd);
28,602✔
564
      if (value == NULL) {
28,602✔
565
        goto end;
×
566
      }
567
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
28,602✔
568
        goto end;
×
569
      }
570
    } else {
571
      goto end;
×
572
    }
573
  }
574
  string = cJSON_PrintUnformatted(json);
208,224✔
575
end:
335,656✔
576
  cJSON_Delete(json);
335,656✔
577
  taosArrayDestroy(pTagVals);
335,656✔
578
  if (string == NULL) {
335,656✔
579
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
127,432✔
580
    if (string == NULL) {
127,432✔
581
      qError("failed to strdup null string");
×
582
    }
583
  }
584
  *jsonStr = string;
335,656✔
585
}
586

587
/**
 * Fill a column reference: record the column id, mark it as referencing, and
 * copy the referenced database, table and column names (each truncated to its
 * field length by tstrncpy).
 *
 * @param colRef       reference to fill
 * @param colId        id of the referencing column
 * @param refColName   name of the referenced column
 * @param refTableName name of the referenced table
 * @param refDbName    name of the referenced database
 * @return always TSDB_CODE_SUCCESS
 */
int32_t setColRef(SColRef* colRef, col_id_t colId, char* refColName, char* refTableName, char* refDbName) {
  colRef->id = colId;
  colRef->hasRef = true;

  tstrncpy(colRef->refDbName, refDbName, TSDB_DB_NAME_LEN);
  tstrncpy(colRef->refTableName, refTableName, TSDB_TABLE_NAME_LEN);
  tstrncpy(colRef->refColName, refColName, TSDB_COL_NAME_LEN);

  return TSDB_CODE_SUCCESS;
}
595

596
/**
 * Deep-copy a table meta into one contiguous allocation.
 *
 * Layout of the copy: the STableMeta header with its schema entries, followed
 * by the extended schema (when the table type has one and pSrc carries it),
 * followed by the column references (same condition for reference columns).
 * The schemaExt / colRef pointers of the copy point into that allocation, or
 * are NULL when the respective part is absent.
 *
 * @param pSrc meta to copy; NULL yields *pDst = NULL and success
 * @param pDst receives the copy, must not be NULL
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the column + tag count
 *         is outside [TSDB_MIN_COLUMNS, TSDB_MAX_COL_TAG_NUM], or terrno when
 *         the allocation fails
 */
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
  if (numOfField > TSDB_MAX_COL_TAG_NUM || numOfField < TSDB_MIN_COLUMNS) {
    *pDst = NULL;
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
    return TSDB_CODE_INVALID_PARA;
  }

  // decide once which optional trailing sections are present in the source
  const bool copyExt = withExtSchema(pSrc->tableType) && pSrc->schemaExt;
  const bool copyRef = hasRefCol(pSrc->tableType) && pSrc->colRef;

  int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
  int32_t schemaExtSize = 0;
  int32_t colRefSize = 0;
  if (copyExt) {
    schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
  }
  if (copyRef) {
    colRefSize = pSrc->numOfColRefs * sizeof(SColRef);
  }

  STableMeta* pClone = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize);
  *pDst = pClone;
  if (NULL == pClone) {
    return terrno;
  }

  memcpy(pClone, pSrc, metaSize);

  // the optional sections are laid out directly behind the header + schema
  char* pTail = (char*)pClone + metaSize;
  if (copyExt) {
    pClone->schemaExt = (SSchemaExt*)pTail;
    memcpy(pClone->schemaExt, pSrc->schemaExt, schemaExtSize);
  } else {
    pClone->schemaExt = NULL;
  }

  if (copyRef) {
    pClone->colRef = (SColRef*)(pTail + schemaExtSize);
    memcpy(pClone->colRef, pSrc->colRef, colRefSize);
  } else {
    pClone->colRef = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
639

640
/**
 * Classify a name as column, tag or neither by searching the schema of pMeta.
 *
 * The schema array holds the columns first and the tags after them, so the
 * position of the first exact name match decides the result.
 *
 * @param pMeta table meta to search
 * @param pName name to look up
 * @param pType receives TCOL_TYPE_COLUMN, TCOL_TYPE_TAG or TCOL_TYPE_NONE;
 *              left untouched when any argument is NULL
 */
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
  if (NULL == pMeta || NULL == pName || NULL == pType) {
    return;
  }

  int32_t total = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
  int32_t hit = -1;

  for (int32_t pos = 0; pos < total; ++pos) {
    if (strcmp(pName, pMeta->schema[pos].name) == 0) {
      hit = pos;
      break;
    }
  }

  if (hit < 0) {
    *pType = TCOL_TYPE_NONE;
  } else if (hit < pMeta->tableInfo.numOfColumns) {
    *pType = TCOL_TYPE_COLUMN;
  } else {
    *pType = TCOL_TYPE_TAG;
  }
}
652

653
/**
 * Release a database vgroup info object together with its hash and array.
 *
 * @param vgInfo object to release; NULL is a no-op
 */
void freeVgInfo(SDBVgInfo* vgInfo) {
  if (vgInfo != NULL) {
    taosHashCleanup(vgInfo->vgHash);
    taosArrayDestroy(vgInfo->vgArray);
    taosMemoryFreeClear(vgInfo);
  }
}
663

664
/**
 * Copy a database vgroup info object.
 *
 * The scalar fields are copied bytewise, vgArray is NOT copied (the copy gets
 * NULL), and vgHash - when present - is rebuilt entry by entry in a new hash
 * table.
 *
 * On every failure path *pDst is left NULL; the hash-insert failure path
 * previously freed the copy but left *pDst pointing at it. The error code is
 * captured before cleanup so that the cleanup calls cannot overwrite it.
 *
 * @param pSrc object to copy; NULL yields *pDst = NULL and success
 * @param pDst receives the copy, must not be NULL
 * @return TSDB_CODE_SUCCESS, or terrno as set by the failing allocation / insert
 */
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == *pDst) {
    return terrno;
  }
  memcpy(*pDst, pSrc, sizeof(*pSrc));
  (*pDst)->vgArray = NULL;

  if (pSrc->vgHash) {
    (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
                                   HASH_ENTRY_LOCK);
    if (NULL == (*pDst)->vgHash) {
      int32_t code = terrno;
      taosMemoryFreeClear(*pDst);
      return code;
    }

    SVgroupInfo* vgInfo = NULL;
    void*        pIter = taosHashIterate(pSrc->vgHash, NULL);
    while (pIter) {
      vgInfo = pIter;
      int32_t* vgId = taosHashGetKey(pIter, NULL);

      if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
        int32_t code = terrno;
        qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
        // the iteration is abandoned midway, so it has to be cancelled explicitly
        taosHashCancelIterate(pSrc->vgHash, pIter);
        freeVgInfo(*pDst);
        *pDst = NULL;
        return code;
      }

      pIter = taosHashIterate(pSrc->vgHash, pIter);
    }
  }

  return TSDB_CODE_SUCCESS;
}
705

706
/**
 * Deep-copy a create-table request.
 *
 * Strings, the tag-name array, the tag blob (child tables) and the schema rows
 * (all other table types) are duplicated so the copy owns its own memory.
 *
 * Fixes over the previous version:
 *  - ntb.schemaRow.version is copied from the source version; it was
 *    mistakenly assigned from nCols;
 *  - the error code is captured before the partially built copy is destroyed,
 *    so cleanup cannot overwrite it.
 *
 * @param pSrc request to copy; NULL yields *pDst = NULL and success
 * @param pDst receives the copy, must not be NULL; set to NULL on failure
 * @return TSDB_CODE_SUCCESS, or terrno as set by the failing allocation
 */
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == *pDst) {
    return terrno;
  }

  (*pDst)->flags = pSrc->flags;
  if (pSrc->name) {
    (*pDst)->name = taosStrdup(pSrc->name);
    if (NULL == (*pDst)->name) goto _exit;
  }
  (*pDst)->uid = pSrc->uid;
  (*pDst)->btime = pSrc->btime;
  (*pDst)->ttl = pSrc->ttl;
  (*pDst)->commentLen = pSrc->commentLen;
  if (pSrc->comment) {
    (*pDst)->comment = taosStrdup(pSrc->comment);
    if (NULL == (*pDst)->comment) goto _exit;
  }
  (*pDst)->type = pSrc->type;

  if (pSrc->type == TSDB_CHILD_TABLE) {
    if (pSrc->ctb.stbName) {
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
    }
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
    (*pDst)->ctb.suid = pSrc->ctb.suid;
    if (pSrc->ctb.tagName) {
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
    }
    // the tag blob records its own total length in its header
    STag* pTag = (STag*)pSrc->ctb.pTag;
    if (pTag) {
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
      if (NULL == (*pDst)->ctb.pTag) goto _exit;
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
    }
  } else {
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.version;
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
    }
  }

  return TSDB_CODE_SUCCESS;

_exit: {
  int32_t code = terrno;
  tdDestroySVCreateTbReq(*pDst);
  taosMemoryFree(*pDst);
  *pDst = NULL;
  return code;
}
}
768

769
/**
 * Release a database config info object and its retention array.
 *
 * @param pInfo object to release; for NULL only taosMemoryFree(NULL) is called
 */
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
  if (NULL != pInfo) {
    SArray* pRetentions = pInfo->pRetensions;
    taosArrayDestroy(pRetentions);
  }

  taosMemoryFree(pInfo);
}
12,638,614✔
775

776
/**
 * Expose the callback table of the global task worker pool.
 *
 * @return the pool's pCb pointer as void* (whatever value the pool currently holds)
 */
void* getTaskPoolWorkerCb() {
  void* pCb = taskQueue.wrokrerPool.pCb;
  return pCb;
}
761,190,137✔
777

778
void tFreeStreamVtbOtbInfo(void* param);
779
void tFreeStreamVtbVtbInfo(void* param);
780
void tFreeStreamVtbDbVgInfo(void* param);
781

782
/**
 * Release a parsed-tags info object: the tag-name array, the payload of every
 * variable-length tag value, the tag-value array, the tag index and finally the
 * object itself.
 *
 * @param pInfo object to release; NULL is a no-op
 */
void destroySTagsInfo(STagsInfo* pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->STagNames);

  // only variable-length values own a separately allocated payload
  size_t numOfVals = taosArrayGetSize(pInfo->pTagVals);
  for (size_t idx = 0; idx < numOfVals; ++idx) {
    STagVal* pVal = (STagVal*)taosArrayGet(pInfo->pTagVals, idx);
    if (IS_VAR_DATA_TYPE(pVal->type)) {
      taosMemoryFreeClear(pVal->pData);
    }
  }

  taosArrayDestroy(pInfo->pTagVals);
  taosMemoryFreeClear(pInfo->pTagIndex);
  taosMemoryFreeClear(pInfo);
}
798

799
void qDestroyBoundColInfo(void* pInfo) {
1,777,065,206✔
800
  if (NULL == pInfo) {
1,777,065,206✔
801
    return;
751,973✔
802
  }
803
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
1,776,313,233✔
804

805
  taosMemoryFreeClear(pBoundInfo->pColIndex);
1,776,313,233✔
806
  destroySTagsInfo(pBoundInfo->parseredTags);
1,776,329,244✔
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc