• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4983

13 Mar 2026 03:38AM UTC coverage: 68.653% (+0.07%) from 68.587%
#4983

push

travis-ci

web-flow
feat/6641435300-save-audit-in-self (#34738)

434 of 584 new or added lines in 10 files covered. (74.32%)

434 existing lines in 121 files now uncovered.

212745 of 309883 relevant lines covered (68.65%)

134272959.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.99
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
typedef struct STaskQueue {
28
  SQueryAutoQWorkerPool wrokrerPool;
29
  STaosQueue* pTaskQueue;
30
} STaskQueue;
31

32
int32_t getAsofJoinReverseOp(EOperatorType op) {
105,751✔
33
  switch (op) {
105,751✔
34
    case OP_TYPE_GREATER_THAN:
3,104✔
35
      return OP_TYPE_LOWER_THAN;
3,104✔
36
    case OP_TYPE_GREATER_EQUAL:
3,104✔
37
      return OP_TYPE_LOWER_EQUAL;
3,104✔
38
    case OP_TYPE_LOWER_THAN:
5,116✔
39
      return OP_TYPE_GREATER_THAN;
5,116✔
40
    case OP_TYPE_LOWER_EQUAL:
6,596✔
41
      return OP_TYPE_GREATER_EQUAL;
6,596✔
42
    case OP_TYPE_EQUAL:
87,831✔
43
      return OP_TYPE_EQUAL;
87,831✔
44
    default:
×
45
      break;
×
46
  }
47

48
  return -1;
×
49
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
71,359,075✔
62
  if (!pSchema) {
71,359,075✔
63
    return false;
×
64
  }
65
  int32_t    rowLen = 0;
71,359,075✔
66
  SSHashObj* pNameHash = tSimpleHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
71,359,075✔
67
  if (!pNameHash) {
71,359,088✔
68
    return false;
×
69
  }
70

71
  for (int32_t i = 0; i < numOfCols; ++i) {
518,718,488✔
72
    // 1. valid types
73
    if (!isValidDataType(pSchema[i].type)) {
447,359,413✔
74
      qError("The %d col/tag data type error, type:%d", i, pSchema[i].type);
×
75
      goto _error;
×
76
    }
77

78
    // 2. valid length for each type
79
    if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY) {
447,359,413✔
80
      if (pSchema[i].bytes > TSDB_MAX_BINARY_LEN) {
28,725,921✔
81
        qError("The %d col/tag var data len error, type:%d, len:%d", i, pSchema[i].type, pSchema[i].bytes);
×
82
        goto _error;
×
83
      }
84
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
418,633,492✔
85
      if (pSchema[i].bytes > TSDB_MAX_NCHAR_LEN) {
36,492,275✔
86
        qError("The %d col/tag nchar data len error, len:%d", i, pSchema[i].bytes);
×
87
        goto _error;
×
88
      }
89
    } else if (pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
382,140,566✔
90
      if (pSchema[i].bytes > TSDB_MAX_GEOMETRY_LEN) {
346,341✔
91
        qError("The %d col/tag geometry data len error, len:%d", i, pSchema[i].bytes);
×
92
        goto _error;
×
93
      }
94
    } else if (IS_STR_DATA_BLOB(pSchema[i].type)) {
381,794,224✔
95
      if (pSchema[i].bytes >= TSDB_MAX_BLOB_LEN) {
28,859✔
96
        qError("The %d col/tag blob data len error, len:%d", i, pSchema[i].bytes);
×
97
        goto _error;
×
98
      } 
99

100
    } else {
101
      if (pSchema[i].bytes != tDataTypes[pSchema[i].type].bytes) {
381,766,667✔
102
        qError("The %d col/tag data len error, type:%d, len:%d", i, pSchema[i].type, pSchema[i].bytes);
×
103
        goto _error;
×
104
      }
105
    }
106

107
    // 3. valid column names
108
    size_t   nameLen = strnlen(pSchema[i].name, sizeof(pSchema[i].name));
447,360,064✔
109
    int32_t  nameIdx = i;
447,359,024✔
110
    int32_t* pIdx = tSimpleHashGet(pNameHash, pSchema[i].name, nameLen);
447,358,762✔
111
    if (pIdx != NULL) {
447,357,191✔
112
      qError("The %d col/tag name %s is same with %d col/tag name %s", i, pSchema[i].name, *pIdx,
×
113
             pSchema[*pIdx].name);
114
      goto _error;
×
115
    }
116
    if (tSimpleHashPut(pNameHash, pSchema[i].name, nameLen, &nameIdx, sizeof(nameIdx)) != TSDB_CODE_SUCCESS) {
447,357,191✔
117
      goto _error;
×
118
    }
119

120
    rowLen += pSchema[i].bytes;
447,358,098✔
121
  }
122

123
  tSimpleHashCleanup(pNameHash);
71,359,075✔
124
  return rowLen <= maxLen;
71,359,088✔
125
_error:
×
126
  tSimpleHashCleanup(pNameHash);
×
127
  return false;
×
128
}
129

130
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags, bool isVirtual) {
35,678,893✔
131
  if (!pSchema) {
35,678,893✔
132
    qError("invalid numOfCols: %d", numOfCols);
×
133
    return false;
×
134
  }
135

136
  if ((isVirtual && !VALIDNUMOFCOLSVIRTUAL(numOfCols)) || (!isVirtual && !VALIDNUMOFCOLS(numOfCols))) {
35,678,893✔
137
    qError("invalid numOfCols: %d", numOfCols);
×
138
    return false;
×
139
  }
140

141
  if (!VALIDNUMOFTAGS(numOfTags)) {
35,678,893✔
142
    qError("invalid numOfTags: %d", numOfTags);
35✔
143
    return false;
×
144
  }
145

146
  /* first column must be the timestamp, which is a primary key */
147
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
35,678,858✔
UNCOV
148
    qError("invalid first column type: %d", pSchema[0].type);
×
149
    return false;
×
150
  }
151

152
  if (!doValidateSchema(pSchema, numOfCols, isVirtual ? TSDB_MAX_BYTES_PER_ROW_VIRTUAL : TSDB_MAX_BYTES_PER_ROW)) {
35,679,544✔
153
    qError("validate schema columns failed");
×
154
    return false;
×
155
  }
156

157
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
35,679,544✔
158
    qError("validate schema tags failed");
×
159
    return false;
×
160
  }
161

162
  return true;
35,679,544✔
163
}
164

165
static STaskQueue taskQueue = {0};
166

167
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
2,147,483,647✔
168
  if(!pSchedMsg || !pSchedMsg->ahandle) return;
2,147,483,647✔
169
  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
2,147,483,647✔
170
  (void)execFn(pSchedMsg->thandle);
2,147,483,647✔
171
  taosFreeQitem(pSchedMsg);
2,147,483,647✔
172
}
173

174
int32_t initTaskQueue() {
1,325,728✔
175
  memset(&taskQueue, 0, sizeof(taskQueue));
1,325,728✔
176
  
177
  taskQueue.wrokrerPool.name = "taskWorkPool";
1,325,728✔
178
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
1,325,728✔
179
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
1,325,728✔
180
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
1,325,728✔
181
  if (TSDB_CODE_SUCCESS != coce) {
1,325,728✔
182
    qError("failed to init task thread pool");
×
183
    return -1;
×
184
  }
185

186
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
1,325,728✔
187
  if (NULL == taskQueue.pTaskQueue) {
1,325,728✔
188
    qError("failed to init task queue");
×
189
    return -1;
×
190
  }
191

192
  qInfo("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
1,325,728✔
193
  return 0;
1,325,728✔
194
}
195

196
int32_t cleanupTaskQueue() {
1,325,783✔
197
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
1,325,783✔
198
  return 0;
1,325,783✔
199
}
200

201
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
2,147,483,647✔
202
  SSchedMsg* pSchedMsg; 
2,147,483,647✔
203
  int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
2,147,483,647✔
204
  if (rc) return rc;
2,147,483,647✔
205
  pSchedMsg->fp = NULL;
2,147,483,647✔
206
  pSchedMsg->ahandle = execFn;
2,147,483,647✔
207
  pSchedMsg->thandle = execParam;
2,147,483,647✔
208
  pSchedMsg->msg = code;
2,147,483,647✔
209

210
  return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
2,147,483,647✔
211
}
212

213
int32_t taosAsyncWait() {
510,556✔
214
  if (!taskQueue.wrokrerPool.pCb) {
510,556✔
215
    qError("query task thread pool callback function is null");
×
216
    return -1;
×
217
  }
218
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
510,556✔
219
}
220

221
int32_t taosAsyncRecover() {
510,556✔
222
  if (!taskQueue.wrokrerPool.pCb) {
510,556✔
223
    qError("query task thread pool callback function is null");
×
224
    return -1;
×
225
  }
226
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
510,556✔
227
}
228

229
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
×
230
  SSchedMsg* pSchedMsg;
×
231
  int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
×
232
  if (rc) return rc;
×
233
  pSchedMsg->fp = NULL;
×
234
  pSchedMsg->ahandle = bindFn;
×
235
  pSchedMsg->thandle = bindParam;
×
236
  // pSchedMsg->msg = code;
237

238
  return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
×
239
}
240

241
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
2,147,483,647✔
242
  if (NULL == pMsgBody) {
2,147,483,647✔
243
    return;
×
244
  }
245

246
  
247
  qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);
2,147,483,647✔
248
  
249
  taosMemoryFreeClear(pMsgBody->target.dbFName);
2,147,483,647✔
250
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
2,147,483,647✔
251
  if (pMsgBody->paramFreeFp) {
2,147,483,647✔
252
    (*pMsgBody->paramFreeFp)(pMsgBody->param);
2,147,483,647✔
253
  }
254
  taosMemoryFreeClear(pMsgBody);
2,147,483,647✔
255
}
256
void destroyAhandle(void *ahandle) {
482,462,560✔
257
  SMsgSendInfo *pSendInfo = ahandle;
482,462,560✔
258
  if (pSendInfo == NULL) return;
482,462,560✔
259

260
  if (pSendInfo->streamAHandle) {
482,461,066✔
261
    qDebug("stream ahandle %p freed", pSendInfo);
34,890,343✔
262
  }
263

264
  destroySendMsgInfo(pSendInfo);
482,474,690✔
265
}
266

267
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
2,147,483,647✔
268
                                bool persistHandle, void* rpcCtx) {                         
269
  if (NULL == pTransporter || NULL == epSet || NULL == pInfo) {
2,147,483,647✔
270
    destroySendMsgInfo(pInfo);
23✔
271
    return TSDB_CODE_TSC_INVALID_INPUT;
×
272
  }
273

274
  char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
2,147,483,647✔
275
  if (NULL == pMsg) {
2,147,483,647✔
276
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
×
277
    destroySendMsgInfo(pInfo);
×
278
    return terrno;
×
279
  }
280

281
  memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
2,147,483,647✔
282
  SRpcMsg rpcMsg = {
2,147,483,647✔
283
    .msgType = pInfo->msgType,
2,147,483,647✔
284
    .pCont = pMsg,
285
    .contLen = pInfo->msgInfo.len,
2,147,483,647✔
286
    .info.ahandle = (void*)pInfo,
287
    .info.handle = pInfo->msgInfo.handle,
2,147,483,647✔
288
    .info.persistHandle = persistHandle,
289
    .code = 0
290
  };
291
  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
2,147,483,647✔
292
  TRACE_SET_MSGID(&rpcMsg.info.traceId, tGenIdPI64());
2,147,483,647✔
293
  int32_t msgType = pInfo->msgType;
2,147,483,647✔
294

295
  int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
2,147,483,647✔
296
  if (code) {
2,147,483,647✔
297
    destroySendMsgInfo(pInfo);
246,113✔
298
  } else {
299
    qDebug("msg %s sent, 0x%" PRIx64 ":0x%" PRIx64, TMSG_INFO(msgType), TRACE_GET_ROOTID(&rpcMsg.info.traceId), TRACE_GET_MSGID(&rpcMsg.info.traceId));
2,147,483,647✔
300
  }
301
  return code;
2,147,483,647✔
302
}
303

304
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
565,448,043✔
305
  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
565,448,043✔
306
}
307

308
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
260,219,813✔
309
  QUERY_PARAM_CHECK(pTransporter);
260,219,813✔
310
  return rpcFreeConnById(pTransporter, pid);
260,219,813✔
311
}
312

313
char* jobTaskStatusStr(int32_t status) {
2,147,483,647✔
314
  switch (status) {
2,147,483,647✔
315
    case JOB_TASK_STATUS_NULL:
861,884,831✔
316
      return "NULL";
861,884,831✔
317
    case JOB_TASK_STATUS_INIT:
1,876,584,615✔
318
      return "INIT";
1,876,584,615✔
319
    case JOB_TASK_STATUS_EXEC:
2,147,483,647✔
320
      return "EXECUTING";
2,147,483,647✔
321
    case JOB_TASK_STATUS_PART_SUCC:
2,062,374,716✔
322
      return "PARTIAL_SUCCEED";
2,062,374,716✔
323
    case JOB_TASK_STATUS_FETCH:
210,131,929✔
324
      return "FETCHING";
210,131,929✔
325
    case JOB_TASK_STATUS_SUCC:
446,650,907✔
326
      return "SUCCEED";
446,650,907✔
327
    case JOB_TASK_STATUS_FAIL:
7,564,833✔
328
      return "FAILED";
7,564,833✔
329
    case JOB_TASK_STATUS_DROP:
882,518,378✔
330
      return "DROPPING";
882,518,378✔
UNCOV
331
    default:
×
UNCOV
332
      break;
×
333
  }
334

UNCOV
335
  return "UNKNOWN";
×
336
}
337

338
#if 0
339
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
340
  SSchema s = {0};
341
  s.type = type;
342
  s.bytes = bytes;
343
  s.colId = colId;
344

345
  tstrncpy(s.name, name, tListLen(s.name));
346
  return s;
347
}
348
#endif
349

350
void freeSTableMetaRspPointer(void *p) {
57,310,677✔
351
  tFreeSTableMetaRsp(*(void**)p);
57,310,677✔
352
  taosMemoryFreeClear(*(void**)p);
57,300,782✔
353
}
57,297,550✔
354

355
void destroyQueryExecRes(SExecResult* pRes) {
2,147,483,647✔
356
  if (NULL == pRes || NULL == pRes->res) {
2,147,483,647✔
357
    return;
2,054,801,232✔
358
  }
359

360
  switch (pRes->msgType) {
856,297,188✔
361
    case TDMT_VND_CREATE_TABLE: {
50,267,822✔
362
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
50,267,822✔
363
      break;
50,261,682✔
364
    }
365
    case TDMT_MND_CREATE_STB:
10,252,642✔
366
    case TDMT_VND_ALTER_TABLE:
367
    case TDMT_MND_ALTER_STB: {
368
      tFreeSTableMetaRsp(pRes->res);
10,252,642✔
369
      taosMemoryFreeClear(pRes->res);
10,252,642✔
370
      break;
10,252,642✔
371
    }
372
    case TDMT_VND_SUBMIT: {
576,778,597✔
373
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
576,778,597✔
374
      taosMemoryFreeClear(pRes->res);
576,772,638✔
375
      break;
576,777,201✔
376
    }
377
    case TDMT_SCH_QUERY:
218,993,804✔
378
    case TDMT_SCH_MERGE_QUERY: {
379
      qDebug("query execRes %p freed", pRes->res);
218,993,804✔
380
      taosArrayDestroy((SArray*)pRes->res);
218,993,804✔
381
      break;
218,993,702✔
382
    }
383
    default:
282✔
384
      qError("invalid exec result for request type:%d", pRes->msgType);
282✔
385
  }
386
}
387
// clang-format on
388
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
84,068✔
389
  QUERY_PARAM_CHECK(str);
84,068✔
390
  QUERY_PARAM_CHECK(buf);
84,068✔
391
  int32_t n = 0;
84,068✔
392

393
  switch (type) {
84,068✔
394
    case TSDB_DATA_TYPE_NULL:
×
395
      n = tsnprintf(str, capacity, "null");
×
396
      break;
×
397

398
    case TSDB_DATA_TYPE_BOOL:
4,862✔
399
      n = tsnprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
4,862✔
400
      break;
4,862✔
401

402
    case TSDB_DATA_TYPE_TINYINT:
214✔
403
      n = tsnprintf(str, capacity, "%d", *(int8_t*)buf);
214✔
404
      break;
214✔
405

406
    case TSDB_DATA_TYPE_SMALLINT:
214✔
407
      n = tsnprintf(str, capacity, "%d", *(int16_t*)buf);
214✔
408
      break;
214✔
409

410
    case TSDB_DATA_TYPE_INT:
17,098✔
411
      n = tsnprintf(str, capacity, "%d", *(int32_t*)buf);
17,098✔
412
      break;
17,098✔
413

414
    case TSDB_DATA_TYPE_BIGINT:
1,195✔
415
    case TSDB_DATA_TYPE_TIMESTAMP:
416
      n = tsnprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
1,195✔
417
      break;
1,195✔
418

419
    case TSDB_DATA_TYPE_FLOAT:
4,095✔
420
      n = tsnprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
4,095✔
421
      break;
4,095✔
422

423
    case TSDB_DATA_TYPE_DOUBLE:
3,328✔
424
      n = tsnprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
3,328✔
425
      break;
3,328✔
426

427
    case TSDB_DATA_TYPE_VARBINARY: {
×
428
      if (bufSize < 0) {
×
429
        //        tscError("invalid buf size");
430
        return TSDB_CODE_TSC_INVALID_VALUE;
×
431
      }
432
      void*    data = NULL;
×
433
      uint32_t size = 0;
×
434
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
435
        return TSDB_CODE_OUT_OF_MEMORY;
×
436
      }
437
      *str = '"';
×
438
      memcpy(str + 1, data, size);
×
439
      *(str + size + 1) = '"';
×
440
      n = size + 2;
×
441
      taosMemoryFree(data);
×
442
      break;
×
443
    }
444
    case TSDB_DATA_TYPE_BINARY:
12,413✔
445
    case TSDB_DATA_TYPE_GEOMETRY:
446
      if (bufSize < 0) {
12,413✔
447
        //        tscError("invalid buf size");
448
        return TSDB_CODE_TSC_INVALID_VALUE;
×
449
      }
450

451
      *str = '"';
12,413✔
452
      memcpy(str + 1, buf, bufSize);
12,413✔
453
      *(str + bufSize + 1) = '"';
12,413✔
454
      n = bufSize + 2;
12,413✔
455
      break;
12,413✔
456
    case TSDB_DATA_TYPE_NCHAR:
36,481✔
457
      if (bufSize < 0) {
36,481✔
458
        //        tscError("invalid buf size");
459
        return TSDB_CODE_TSC_INVALID_VALUE;
×
460
      }
461

462
      *str = '"';
36,481✔
463
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1, NULL);
36,481✔
464
      if (length < 0) {
36,481✔
465
        return TSDB_CODE_TSC_INVALID_VALUE;
×
466
      }
467
      *(str + length + 1) = '"';
36,481✔
468
      n = length + 2;
36,481✔
469
      break;
36,481✔
470
    case TSDB_DATA_TYPE_UTINYINT:
214✔
471
      n = tsnprintf(str, capacity, "%d", *(uint8_t*)buf);
214✔
472
      break;
214✔
473

474
    case TSDB_DATA_TYPE_USMALLINT:
214✔
475
      n = tsnprintf(str, capacity, "%d", *(uint16_t*)buf);
214✔
476
      break;
214✔
477

478
    case TSDB_DATA_TYPE_UINT:
214✔
479
      n = tsnprintf(str, capacity, "%u", *(uint32_t*)buf);
214✔
480
      break;
214✔
481

482
    case TSDB_DATA_TYPE_UBIGINT:
3,526✔
483
      n = tsnprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
3,526✔
484
      break;
3,526✔
485

486
    default:
×
487
      //      tscError("unsupported type:%d", type);
488
      return TSDB_CODE_TSC_INVALID_VALUE;
×
489
  }
490

491
  if (len) *len = n;
84,068✔
492

493
  return TSDB_CODE_SUCCESS;
84,068✔
494
}
495

496
void parseTagDatatoJson(void* p, char** jsonStr, void* charsetCxt) {
348,668✔
497
  if (!p || !jsonStr) {
348,668✔
498
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
499
    return;
×
500
  }
501
  char*   string = NULL;
348,668✔
502
  SArray* pTagVals = NULL;
348,668✔
503
  cJSON*  json = NULL;
348,668✔
504
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
348,668✔
505
    goto end;
×
506
  }
507

508
  int16_t nCols = taosArrayGetSize(pTagVals);
348,668✔
509
  if (nCols == 0) {
348,668✔
510
    goto end;
131,884✔
511
  }
512
  char tagJsonKey[256] = {0};
216,784✔
513
  json = cJSON_CreateObject();
216,784✔
514
  if (json == NULL) {
216,784✔
515
    goto end;
×
516
  }
517
  for (int j = 0; j < nCols; ++j) {
687,824✔
518
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
471,040✔
519
    if (pTagVal == NULL) {
471,040✔
520
      continue;
×
521
    }
522
    // json key  encode by binary
523
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
471,040✔
524
    // json value
525
    char type = pTagVal->type;
471,040✔
526
    if (type == TSDB_DATA_TYPE_NULL) {
471,040✔
527
      cJSON* value = cJSON_CreateNull();
29,158✔
528
      if (value == NULL) {
29,158✔
529
        goto end;
×
530
      }
531
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
29,158✔
532
        goto end;
×
533
      }
534
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
441,882✔
535
      cJSON* value = NULL;
344,249✔
536
      if (pTagVal->nData > 0) {
344,249✔
537
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
321,305✔
538
        if (tagJsonValue == NULL) {
321,305✔
539
          goto end;
×
540
        }
541
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue, charsetCxt);
321,305✔
542
        if (length < 0) {
321,305✔
543
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC,
×
544
                 charsetCxt != NULL ? ((SConvInfo*)(charsetCxt))->charset : tsCharset, pTagVal->pData);
545
          taosMemoryFree(tagJsonValue);
×
546
          goto end;
×
547
        }
548
        value = cJSON_CreateString(tagJsonValue);
321,305✔
549
        taosMemoryFree(tagJsonValue);
321,305✔
550
        if (value == NULL) {
321,305✔
551
          goto end;
×
552
        }
553
      } else if (pTagVal->nData == 0) {
22,944✔
554
        value = cJSON_CreateString("");
22,944✔
555
        if (value == NULL) {
22,944✔
556
          goto end;
×
557
        }
558
      } else {
559
        goto end;
×
560
      }
561

562
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
344,249✔
563
        goto end;
×
564
      }
565
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
97,633✔
566
      double jsonVd = *(double*)(&pTagVal->i64);
67,519✔
567
      cJSON* value = cJSON_CreateNumber(jsonVd);
67,519✔
568
      if (value == NULL) {
67,519✔
569
        goto end;
×
570
      }
571
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
67,519✔
572
        goto end;
×
573
      }
574
    } else if (type == TSDB_DATA_TYPE_BOOL) {
30,114✔
575
      char   jsonVd = *(char*)(&pTagVal->i64);
30,114✔
576
      cJSON* value = cJSON_CreateBool(jsonVd);
30,114✔
577
      if (value == NULL) {
30,114✔
578
        goto end;
×
579
      }
580
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
30,114✔
581
        goto end;
×
582
      }
583
    } else {
584
      goto end;
×
585
    }
586
  }
587
  string = cJSON_PrintUnformatted(json);
216,784✔
588
end:
348,668✔
589
  cJSON_Delete(json);
348,668✔
590
  taosArrayDestroy(pTagVals);
348,668✔
591
  if (string == NULL) {
348,668✔
592
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
131,884✔
593
    if (string == NULL) {
131,884✔
594
      qError("failed to strdup null string");
×
595
    }
596
  }
597
  *jsonStr = string;
348,668✔
598
}
599

600
int32_t setColRef(SColRef* colRef, col_id_t colId, const char* colName, char* refColName, char* refTableName, char* refDbName) {
126,730,915✔
601
  colRef->id = colId;
126,730,915✔
602
  colRef->hasRef = true;
126,730,915✔
603
  tstrncpy(colRef->refDbName, refDbName, TSDB_DB_NAME_LEN);
126,730,915✔
604
  tstrncpy(colRef->refTableName, refTableName, TSDB_TABLE_NAME_LEN);
126,730,915✔
605
  tstrncpy(colRef->refColName, refColName, TSDB_COL_NAME_LEN);
126,730,915✔
606
  if (colName) {
126,730,915✔
607
    tstrncpy(colRef->colName, colName, TSDB_COL_NAME_LEN);
54,798,511✔
608
  } else {
609
    colRef->colName[0] = '\0';
71,932,404✔
610
  }
611
  return TSDB_CODE_SUCCESS;
126,730,915✔
612
}
613

614
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
44,397,780✔
615
  QUERY_PARAM_CHECK(pDst);
44,397,780✔
616
  if (NULL == pSrc) {
44,397,780✔
617
    *pDst = NULL;
80,511✔
618
    return TSDB_CODE_SUCCESS;
80,511✔
619
  }
620

621
  int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
44,317,269✔
622
  if (numOfField > TSDB_MAX_COL_TAG_NUM || numOfField < TSDB_MIN_COLUMNS) {
44,327,041✔
623
    *pDst = NULL;
10,437✔
624
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
×
625
    return TSDB_CODE_INVALID_PARA;
×
626
  }
627

628
  int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
44,316,617✔
629
  int32_t schemaExtSize = 0;
44,316,617✔
630
  int32_t colRefSize = 0;
44,316,617✔
631
  if (withExtSchema(pSrc->tableType) && pSrc->schemaExt) {
44,316,617✔
632
    schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
43,597,314✔
633
  }
634
  if (hasRefCol(pSrc->tableType) && pSrc->colRef) {
44,326,705✔
635
    colRefSize = pSrc->numOfColRefs * sizeof(SColRef);
169,644✔
636
  }
637
  *pDst = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize);
44,328,371✔
638
  if (NULL == *pDst) {
44,289,520✔
639
    return terrno;
×
640
  }
641
  memcpy(*pDst, pSrc, metaSize);
44,296,079✔
642
  if (withExtSchema(pSrc->tableType) && pSrc->schemaExt) {
44,298,676✔
643
    (*pDst)->schemaExt = (SSchemaExt*)((char*)*pDst + metaSize);
43,606,435✔
644
    memcpy((*pDst)->schemaExt, pSrc->schemaExt, schemaExtSize);
43,607,086✔
645
  } else {
646
    (*pDst)->schemaExt = NULL;
722,216✔
647
  }
648
  if (hasRefCol(pSrc->tableType) && pSrc->colRef) {
44,327,377✔
649
    (*pDst)->colRef = (SColRef*)((char*)*pDst + metaSize + schemaExtSize);
169,644✔
650
    memcpy((*pDst)->colRef, pSrc->colRef, colRefSize);
169,644✔
651
  } else {
652
    (*pDst)->colRef = NULL;
44,160,671✔
653
  }
654

655
  return TSDB_CODE_SUCCESS;
44,329,986✔
656
}
657

658
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
7,147✔
659
  if (!pMeta || !pName || !pType) return;
7,147✔
660
  int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
7,147✔
661
  for (int32_t i = 0; i < nums; ++i) {
41,929✔
662
    if (0 == strcmp(pName, pMeta->schema[i].name)) {
41,552✔
663
      *pType = (i < pMeta->tableInfo.numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
6,770✔
664
      return;
6,770✔
665
    }
666
  }
667

668
  *pType = TCOL_TYPE_NONE;
377✔
669
}
670

671
void freeVgInfo(SDBVgInfo* vgInfo) {
43,188,075✔
672
  if (NULL == vgInfo) {
43,188,075✔
673
    return;
10,683,395✔
674
  }
675

676
  taosHashCleanup(vgInfo->vgHash);
32,504,680✔
677
  taosArrayDestroy(vgInfo->vgArray);
32,506,271✔
678

679
  taosMemoryFreeClear(vgInfo);
32,506,271✔
680
}
681

682
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
4,294,520✔
683
  QUERY_PARAM_CHECK(pDst);
4,294,520✔
684
  if (NULL == pSrc) {
4,294,520✔
685
    *pDst = NULL;
×
686
    return TSDB_CODE_SUCCESS;
×
687
  }
688

689
  *pDst = taosMemoryMalloc(sizeof(*pSrc));
4,294,520✔
690
  if (NULL == *pDst) {
4,294,520✔
691
    return terrno;
×
692
  }
693
  memcpy(*pDst, pSrc, sizeof(*pSrc));
4,294,520✔
694
  (*pDst)->vgArray = NULL;
4,294,520✔
695

696
  if (pSrc->vgHash) {
4,294,520✔
697
    (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
3,855,495✔
698
                                   HASH_ENTRY_LOCK);
699
    if (NULL == (*pDst)->vgHash) {
3,855,495✔
700
      taosMemoryFreeClear(*pDst);
×
701
      return terrno;
×
702
    }
703

704
    SVgroupInfo* vgInfo = NULL;
3,855,495✔
705
    void*        pIter = taosHashIterate(pSrc->vgHash, NULL);
3,855,495✔
706
    while (pIter) {
11,555,562✔
707
      vgInfo = pIter;
7,700,067✔
708
      int32_t* vgId = taosHashGetKey(pIter, NULL);
7,700,067✔
709

710
      if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
7,700,067✔
711
        qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
×
712
        taosHashCancelIterate(pSrc->vgHash, pIter);
×
713
        freeVgInfo(*pDst);
×
714
        return terrno;
×
715
      }
716

717
      pIter = taosHashIterate(pSrc->vgHash, pIter);
7,700,067✔
718
    }
719
  }
720

721
  return TSDB_CODE_SUCCESS;
4,294,520✔
722
}
723

724
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
13,912,253✔
725
  QUERY_PARAM_CHECK(pDst);
13,912,253✔
726
  if (NULL == pSrc) {
13,912,253✔
727
    *pDst = NULL;
115,216✔
728
    return TSDB_CODE_SUCCESS;
115,216✔
729
  }
730

731
  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
13,797,060✔
732
  if (NULL == *pDst) {
13,784,499✔
733
    return terrno;
×
734
  }
735

736
  (*pDst)->flags = pSrc->flags;
13,786,116✔
737
  if (pSrc->name) {
13,799,115✔
738
    (*pDst)->name = taosStrdup(pSrc->name);
13,801,374✔
739
    if (NULL == (*pDst)->name) goto _exit;
13,806,156✔
740
  }
741
  (*pDst)->uid = pSrc->uid;
13,797,688✔
742
  (*pDst)->btime = pSrc->btime;
13,803,512✔
743
  (*pDst)->ttl = pSrc->ttl;
13,806,445✔
744
  (*pDst)->commentLen = pSrc->commentLen;
13,802,833✔
745
  if (pSrc->comment) {
13,805,794✔
746
    (*pDst)->comment = taosStrdup(pSrc->comment);
×
747
    if (NULL == (*pDst)->comment) goto _exit;
65✔
748
  }
749
  (*pDst)->type = pSrc->type;
13,805,866✔
750

751
  if (pSrc->type == TSDB_CHILD_TABLE) {
13,805,544✔
752
    if (pSrc->ctb.stbName) {
13,804,879✔
753
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
13,804,536✔
754
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
13,804,636✔
755
    }
756
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
13,797,167✔
757
    (*pDst)->ctb.suid = pSrc->ctb.suid;
13,806,239✔
758
    if (pSrc->ctb.tagName) {
13,804,272✔
759
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
13,803,654✔
760
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
13,809,552✔
761
    }
762
    STag* pTag = (STag*)pSrc->ctb.pTag;
13,809,169✔
763
    if (pTag) {
13,806,586✔
764
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
13,808,271✔
765
      if (NULL == (*pDst)->ctb.pTag) goto _exit;
13,807,518✔
766
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
13,803,969✔
767
    }
768
  } else {
769
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
×
770
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.nCols;
×
771
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
×
772
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
×
773
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
×
774
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
×
775
    }
776
  }
777

778
  return TSDB_CODE_SUCCESS;
13,800,023✔
779

780
_exit:
×
781
  tdDestroySVCreateTbReq(*pDst);
×
782
  taosMemoryFree(*pDst);
×
783
  *pDst = NULL;
×
784
  return terrno;
×
785
}
786

787
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
14,483,682✔
788
  if (pInfo) {
14,483,682✔
789
    taosArrayDestroy(pInfo->pRetensions);
4,778,205✔
790
  }
791
  taosMemoryFree(pInfo);
14,483,682✔
792
}
14,483,682✔
793

794
void* getTaskPoolWorkerCb() { return taskQueue.wrokrerPool.pCb; }
830,154,932✔
795

796
void tFreeStreamVtbOtbInfo(void* param);
797
void tFreeStreamVtbVtbInfo(void* param);
798
void tFreeStreamVtbDbVgInfo(void* param);
799

800
void destroySTagsInfo(STagsInfo* pInfo) {
1,870,167,021✔
801
  if (NULL == pInfo) {
1,870,167,021✔
802
    return;
1,870,181,520✔
803
  }
804
  taosArrayDestroy(pInfo->STagNames);
×
805

806
  for (int i = 0; i < taosArrayGetSize(pInfo->pTagVals); ++i) {
×
807
    STagVal* p = (STagVal*)taosArrayGet(pInfo->pTagVals, i);
×
808
    if (IS_VAR_DATA_TYPE(p->type)) {
×
809
      taosMemoryFreeClear(p->pData);
×
810
    }
811
  }
812
  taosArrayDestroy(pInfo->pTagVals);
×
813
  taosMemoryFreeClear(pInfo->pTagIndex);
×
814
  taosMemoryFreeClear(pInfo);
×
815
}
816

817
void qDestroyBoundColInfo(void* pInfo) {
1,871,030,368✔
818
  if (NULL == pInfo) {
1,871,030,368✔
819
    return;
855,906✔
820
  }
821
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
1,870,174,462✔
822

823
  taosMemoryFreeClear(pBoundInfo->pColIndex);
1,870,174,462✔
824
  destroySTagsInfo(pBoundInfo->parseredTags);
1,870,182,288✔
825
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc