• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4917

07 Jan 2026 03:52PM UTC coverage: 65.42% (+0.02%) from 65.402%
#4917

push

travis-ci

web-flow
merge: from main to 3.0 branch #34204

31 of 34 new or added lines in 2 files covered. (91.18%)

819 existing lines in 129 files now uncovered.

202679 of 309814 relevant lines covered (65.42%)

116724351.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.39
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
typedef struct STaskQueue {
28
  SQueryAutoQWorkerPool wrokrerPool;
29
  STaosQueue* pTaskQueue;
30
} STaskQueue;
31

32
int32_t getAsofJoinReverseOp(EOperatorType op) {
93,337✔
33
  switch (op) {
93,337✔
34
    case OP_TYPE_GREATER_THAN:
2,828✔
35
      return OP_TYPE_LOWER_THAN;
2,828✔
36
    case OP_TYPE_GREATER_EQUAL:
2,828✔
37
      return OP_TYPE_LOWER_EQUAL;
2,828✔
38
    case OP_TYPE_LOWER_THAN:
3,618✔
39
      return OP_TYPE_GREATER_THAN;
3,618✔
40
    case OP_TYPE_LOWER_EQUAL:
6,086✔
41
      return OP_TYPE_GREATER_EQUAL;
6,086✔
42
    case OP_TYPE_EQUAL:
77,977✔
43
      return OP_TYPE_EQUAL;
77,977✔
44
    default:
×
45
      break;
×
46
  }
47

48
  return -1;
×
49
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
65,587,328✔
62
  if (!pSchema) {
65,587,328✔
63
    return false;
×
64
  }
65
  int32_t rowLen = 0;
65,587,328✔
66

67
  for (int32_t i = 0; i < numOfCols; ++i) {
470,378,884✔
68
    // 1. valid types
69
    if (!isValidDataType(pSchema[i].type)) {
404,791,556✔
70
      qError("The %d col/tag data type error, type:%d", i, pSchema[i].type);
×
71
      return false;
×
72
    }
73

74
    // 2. valid length for each type
75
    if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY) {
404,791,403✔
76
      if (pSchema[i].bytes > TSDB_MAX_BINARY_LEN) {
26,077,565✔
77
        qError("The %d col/tag var data len error, type:%d, len:%d", i, pSchema[i].type, pSchema[i].bytes);
×
78
        return false;
×
79
      }
80
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
378,714,159✔
81
      if (pSchema[i].bytes > TSDB_MAX_NCHAR_LEN) {
33,426,703✔
82
        qError("The %d col/tag nchar data len error, len:%d", i, pSchema[i].bytes);
×
83
        return false;
×
84
      }
85
    } else if (pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
345,287,535✔
86
      if (pSchema[i].bytes > TSDB_MAX_GEOMETRY_LEN) {
307,456✔
87
        qError("The %d col/tag geometry data len error, len:%d", i, pSchema[i].bytes);
×
88
        return false;
×
89
      }
90
    } else if (IS_STR_DATA_BLOB(pSchema[i].type)) {
344,980,000✔
91
      if (pSchema[i].bytes >= TSDB_MAX_BLOB_LEN) {
16,003✔
92
        qError("The %d col/tag blob data len error, len:%d", i, pSchema[i].bytes);
×
93
        return false;
×
94
      } 
95

96
    }else {
97
      if (pSchema[i].bytes != tDataTypes[pSchema[i].type].bytes) {
344,963,997✔
98
        qError("The %d col/tag data len error, type:%d, len:%d", i, pSchema[i].type, pSchema[i].bytes);
×
99
        return false;
×
100
      }
101
    }
102

103
    // 3. valid column names
104
    for (int32_t j = i + 1; j < numOfCols; ++j) {
2,147,483,647✔
105
      if (strncmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) {
2,147,483,647✔
106
        qError("The %d col/tag name %s is same with %d col/tag name %s", i, pSchema[i].name, j, pSchema[j].name);
×
107
        return false;
×
108
      }
109
    }
110

111
    rowLen += pSchema[i].bytes;
404,793,752✔
112
  }
113

114
  return rowLen <= maxLen;
65,587,328✔
115
}
116

117
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags, bool isVirtual) {
32,793,664✔
118
  if (!pSchema) {
32,793,664✔
119
    qError("invalid numOfCols: %d", numOfCols);
×
120
    return false;
×
121
  }
122

123
  if ((isVirtual && !VALIDNUMOFCOLSVIRTUAL(numOfCols)) || (!isVirtual && !VALIDNUMOFCOLS(numOfCols))) {
32,793,664✔
124
    qError("invalid numOfCols: %d", numOfCols);
×
125
    return false;
×
126
  }
127

128
  if (!VALIDNUMOFTAGS(numOfTags)) {
32,793,664✔
129
    qError("invalid numOfTags: %d", numOfTags);
38✔
130
    return false;
×
131
  }
132

133
  /* first column must be the timestamp, which is a primary key */
134
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
32,793,626✔
135
    qError("invalid first column type: %d", pSchema[0].type);
×
136
    return false;
×
137
  }
138

139
  if (!doValidateSchema(pSchema, numOfCols, isVirtual ? TSDB_MAX_BYTES_PER_ROW_VIRTUAL : TSDB_MAX_BYTES_PER_ROW)) {
32,793,664✔
140
    qError("validate schema columns failed");
×
141
    return false;
×
142
  }
143

144
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
32,793,664✔
145
    qError("validate schema tags failed");
×
146
    return false;
×
147
  }
148

149
  return true;
32,793,664✔
150
}
151

152
static STaskQueue taskQueue = {0};
153

154
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
2,006,332,233✔
155
  if(!pSchedMsg || !pSchedMsg->ahandle) return;
2,006,332,233✔
156
  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
2,006,335,516✔
157
  (void)execFn(pSchedMsg->thandle);
2,006,336,059✔
158
  taosFreeQitem(pSchedMsg);
2,006,134,245✔
159
}
160

161
int32_t initTaskQueue() {
1,186,621✔
162
  memset(&taskQueue, 0, sizeof(taskQueue));
1,186,621✔
163
  
164
  taskQueue.wrokrerPool.name = "taskWorkPool";
1,186,621✔
165
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
1,186,621✔
166
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
1,186,621✔
167
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
1,186,621✔
168
  if (TSDB_CODE_SUCCESS != coce) {
1,186,621✔
169
    qError("failed to init task thread pool");
×
170
    return -1;
×
171
  }
172

173
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
1,186,621✔
174
  if (NULL == taskQueue.pTaskQueue) {
1,186,621✔
175
    qError("failed to init task queue");
×
176
    return -1;
×
177
  }
178

179
  qInfo("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
1,186,621✔
180
  return 0;
1,186,621✔
181
}
182

183
int32_t cleanupTaskQueue() {
1,186,668✔
184
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
1,186,668✔
185
  return 0;
1,186,668✔
186
}
187

188
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
2,006,222,043✔
189
  SSchedMsg* pSchedMsg; 
1,999,432,519✔
190
  int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
2,006,233,791✔
191
  if (rc) return rc;
2,006,272,164✔
192
  pSchedMsg->fp = NULL;
2,006,272,164✔
193
  pSchedMsg->ahandle = execFn;
2,006,272,345✔
194
  pSchedMsg->thandle = execParam;
2,006,294,590✔
195
  pSchedMsg->msg = code;
2,006,289,301✔
196

197
  return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
2,006,294,492✔
198
}
199

200
int32_t taosAsyncWait() {
28,943✔
201
  if (!taskQueue.wrokrerPool.pCb) {
28,943✔
202
    qError("query task thread pool callback function is null");
×
203
    return -1;
×
204
  }
205
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
28,943✔
206
}
207

208
int32_t taosAsyncRecover() {
28,943✔
209
  if (!taskQueue.wrokrerPool.pCb) {
28,943✔
210
    qError("query task thread pool callback function is null");
×
211
    return -1;
×
212
  }
213
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
28,943✔
214
}
215

216
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
×
217
  SSchedMsg* pSchedMsg;
×
218
  int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
×
219
  if (rc) return rc;
×
220
  pSchedMsg->fp = NULL;
×
221
  pSchedMsg->ahandle = bindFn;
×
222
  pSchedMsg->thandle = bindParam;
×
223
  // pSchedMsg->msg = code;
224

225
  return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
×
226
}
227

228
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
2,147,483,647✔
229
  if (NULL == pMsgBody) {
2,147,483,647✔
230
    return;
×
231
  }
232

233
  
234
  qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);
2,147,483,647✔
235
  
236
  taosMemoryFreeClear(pMsgBody->target.dbFName);
2,147,483,647✔
237
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
2,147,483,647✔
238
  if (pMsgBody->paramFreeFp) {
2,147,483,647✔
239
    (*pMsgBody->paramFreeFp)(pMsgBody->param);
2,147,483,647✔
240
  }
241
  taosMemoryFreeClear(pMsgBody);
2,147,483,647✔
242
}
243
void destroyAhandle(void *ahandle) {
368,924,481✔
244
  SMsgSendInfo *pSendInfo = ahandle;
368,924,481✔
245
  if (pSendInfo == NULL) return;
368,924,481✔
246

247
  if (pSendInfo->streamAHandle) {
368,924,481✔
248
    qDebug("stream ahandle %p freed", pSendInfo);
29,219,336✔
249
  }
250

251
  destroySendMsgInfo(pSendInfo);
369,027,991✔
252
}
253

254
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
1,932,561,235✔
255
                                bool persistHandle, void* rpcCtx) {                         
256
  if (NULL == pTransporter || NULL == epSet || NULL == pInfo) {
1,932,561,235✔
257
    destroySendMsgInfo(pInfo);
×
258
    return TSDB_CODE_TSC_INVALID_INPUT;
×
259
  }
260

261
  char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
1,933,142,620✔
262
  if (NULL == pMsg) {
1,931,963,678✔
263
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
×
264
    destroySendMsgInfo(pInfo);
×
265
    return terrno;
×
266
  }
267

268
  memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
1,931,963,678✔
269
  SRpcMsg rpcMsg = {
1,932,213,712✔
270
    .msgType = pInfo->msgType,
1,932,955,575✔
271
    .pCont = pMsg,
272
    .contLen = pInfo->msgInfo.len,
1,933,120,665✔
273
    .info.ahandle = (void*)pInfo,
274
    .info.handle = pInfo->msgInfo.handle,
1,933,022,160✔
275
    .info.persistHandle = persistHandle,
276
    .code = 0
277
  };
278
  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
1,932,801,881✔
279
  TRACE_SET_MSGID(&rpcMsg.info.traceId, tGenIdPI64());
1,932,984,315✔
280
  int32_t msgType = pInfo->msgType;
1,932,565,944✔
281

282
  int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
1,932,609,808✔
283
  if (code) {
1,933,339,780✔
284
    destroySendMsgInfo(pInfo);
222,638✔
285
  } else {
286
    qDebug("msg %s sent, 0x%" PRIx64 ":0x%" PRIx64, TMSG_INFO(msgType), TRACE_GET_ROOTID(&rpcMsg.info.traceId), TRACE_GET_MSGID(&rpcMsg.info.traceId));
1,933,117,142✔
287
  }
288
  return code;
1,933,238,918✔
289
}
290

291
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
358,762,336✔
292
  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
358,762,336✔
293
}
294

295
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
242,011,457✔
296
  QUERY_PARAM_CHECK(pTransporter);
242,011,457✔
297
  return rpcFreeConnById(pTransporter, pid);
242,011,457✔
298
}
299

300
char* jobTaskStatusStr(int32_t status) {
2,147,483,647✔
301
  switch (status) {
2,147,483,647✔
302
    case JOB_TASK_STATUS_NULL:
717,668,421✔
303
      return "NULL";
717,668,421✔
304
    case JOB_TASK_STATUS_INIT:
1,650,053,599✔
305
      return "INIT";
1,650,053,599✔
306
    case JOB_TASK_STATUS_EXEC:
2,147,483,647✔
307
      return "EXECUTING";
2,147,483,647✔
308
    case JOB_TASK_STATUS_PART_SUCC:
1,662,208,428✔
309
      return "PARTIAL_SUCCEED";
1,662,208,428✔
310
    case JOB_TASK_STATUS_FETCH:
186,622,850✔
311
      return "FETCHING";
186,622,850✔
312
    case JOB_TASK_STATUS_SUCC:
380,895,963✔
313
      return "SUCCEED";
380,895,963✔
314
    case JOB_TASK_STATUS_FAIL:
9,083,167✔
315
      return "FAILED";
9,083,167✔
316
    case JOB_TASK_STATUS_DROP:
737,769,434✔
317
      return "DROPPING";
737,769,434✔
UNCOV
318
    default:
×
UNCOV
319
      break;
×
320
  }
321

UNCOV
322
  return "UNKNOWN";
×
323
}
324

325
#if 0
326
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
327
  SSchema s = {0};
328
  s.type = type;
329
  s.bytes = bytes;
330
  s.colId = colId;
331

332
  tstrncpy(s.name, name, tListLen(s.name));
333
  return s;
334
}
335
#endif
336

337
void freeSTableMetaRspPointer(void *p) {
51,476,001✔
338
  tFreeSTableMetaRsp(*(void**)p);
51,476,001✔
339
  taosMemoryFreeClear(*(void**)p);
51,471,485✔
340
}
51,471,694✔
341

342
void destroyQueryExecRes(SExecResult* pRes) {
2,147,483,647✔
343
  if (NULL == pRes || NULL == pRes->res) {
2,147,483,647✔
344
    return;
1,514,931,581✔
345
  }
346

347
  switch (pRes->msgType) {
717,380,343✔
348
    case TDMT_VND_CREATE_TABLE: {
45,263,354✔
349
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
45,263,354✔
350
      break;
45,259,114✔
351
    }
352
    case TDMT_MND_CREATE_STB:
9,658,083✔
353
    case TDMT_VND_ALTER_TABLE:
354
    case TDMT_MND_ALTER_STB: {
355
      tFreeSTableMetaRsp(pRes->res);
9,658,083✔
356
      taosMemoryFreeClear(pRes->res);
9,658,083✔
357
      break;
9,658,083✔
358
    }
359
    case TDMT_VND_SUBMIT: {
501,248,381✔
360
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
501,248,381✔
361
      taosMemoryFreeClear(pRes->res);
501,245,286✔
362
      break;
501,250,369✔
363
    }
364
    case TDMT_SCH_QUERY:
161,208,247✔
365
    case TDMT_SCH_MERGE_QUERY: {
366
      qDebug("query execRes %p freed", pRes->res);
161,208,247✔
367
      taosArrayDestroy((SArray*)pRes->res);
161,208,247✔
368
      break;
161,208,319✔
369
    }
370
    default:
87✔
371
      qError("invalid exec result for request type:%d", pRes->msgType);
87✔
372
  }
373
}
374
// clang-format on
375
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
58,748✔
376
  QUERY_PARAM_CHECK(str);
58,748✔
377
  QUERY_PARAM_CHECK(buf);
58,748✔
378
  int32_t n = 0;
58,748✔
379

380
  switch (type) {
58,748✔
381
    case TSDB_DATA_TYPE_NULL:
×
382
      n = tsnprintf(str, capacity, "null");
×
383
      break;
×
384

385
    case TSDB_DATA_TYPE_BOOL:
2,234✔
386
      n = tsnprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
2,234✔
387
      break;
2,234✔
388

389
    case TSDB_DATA_TYPE_TINYINT:
163✔
390
      n = tsnprintf(str, capacity, "%d", *(int8_t*)buf);
163✔
391
      break;
163✔
392

393
    case TSDB_DATA_TYPE_SMALLINT:
163✔
394
      n = tsnprintf(str, capacity, "%d", *(int16_t*)buf);
163✔
395
      break;
163✔
396

397
    case TSDB_DATA_TYPE_INT:
13,736✔
398
      n = tsnprintf(str, capacity, "%d", *(int32_t*)buf);
13,736✔
399
      break;
13,736✔
400

401
    case TSDB_DATA_TYPE_BIGINT:
1,057✔
402
    case TSDB_DATA_TYPE_TIMESTAMP:
403
      n = tsnprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
1,057✔
404
      break;
1,057✔
405

406
    case TSDB_DATA_TYPE_FLOAT:
1,503✔
407
      n = tsnprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
1,503✔
408
      break;
1,503✔
409

410
    case TSDB_DATA_TYPE_DOUBLE:
772✔
411
      n = tsnprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
772✔
412
      break;
772✔
413

414
    case TSDB_DATA_TYPE_VARBINARY: {
×
415
      if (bufSize < 0) {
×
416
        //        tscError("invalid buf size");
417
        return TSDB_CODE_TSC_INVALID_VALUE;
×
418
      }
419
      void*    data = NULL;
×
420
      uint32_t size = 0;
×
421
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
422
        return TSDB_CODE_OUT_OF_MEMORY;
×
423
      }
424
      *str = '"';
×
425
      memcpy(str + 1, data, size);
×
426
      *(str + size + 1) = '"';
×
427
      n = size + 2;
×
428
      taosMemoryFree(data);
×
429
      break;
×
430
    }
431
    case TSDB_DATA_TYPE_BINARY:
4,161✔
432
    case TSDB_DATA_TYPE_GEOMETRY:
433
      if (bufSize < 0) {
4,161✔
434
        //        tscError("invalid buf size");
435
        return TSDB_CODE_TSC_INVALID_VALUE;
×
436
      }
437

438
      *str = '"';
4,161✔
439
      memcpy(str + 1, buf, bufSize);
4,161✔
440
      *(str + bufSize + 1) = '"';
4,161✔
441
      n = bufSize + 2;
4,161✔
442
      break;
4,161✔
443
    case TSDB_DATA_TYPE_NCHAR:
31,307✔
444
      if (bufSize < 0) {
31,307✔
445
        //        tscError("invalid buf size");
446
        return TSDB_CODE_TSC_INVALID_VALUE;
×
447
      }
448

449
      *str = '"';
31,307✔
450
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1, NULL);
31,307✔
451
      if (length < 0) {
31,307✔
452
        return TSDB_CODE_TSC_INVALID_VALUE;
×
453
      }
454
      *(str + length + 1) = '"';
31,307✔
455
      n = length + 2;
31,307✔
456
      break;
31,307✔
457
    case TSDB_DATA_TYPE_UTINYINT:
163✔
458
      n = tsnprintf(str, capacity, "%d", *(uint8_t*)buf);
163✔
459
      break;
163✔
460

461
    case TSDB_DATA_TYPE_USMALLINT:
163✔
462
      n = tsnprintf(str, capacity, "%d", *(uint16_t*)buf);
163✔
463
      break;
163✔
464

465
    case TSDB_DATA_TYPE_UINT:
163✔
466
      n = tsnprintf(str, capacity, "%u", *(uint32_t*)buf);
163✔
467
      break;
163✔
468

469
    case TSDB_DATA_TYPE_UBIGINT:
3,163✔
470
      n = tsnprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
3,163✔
471
      break;
3,163✔
472

473
    default:
×
474
      //      tscError("unsupported type:%d", type);
475
      return TSDB_CODE_TSC_INVALID_VALUE;
×
476
  }
477

478
  if (len) *len = n;
58,748✔
479

480
  return TSDB_CODE_SUCCESS;
58,748✔
481
}
482

483
void parseTagDatatoJson(void* p, char** jsonStr, void* charsetCxt) {
331,105✔
484
  if (!p || !jsonStr) {
331,105✔
485
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
486
    return;
×
487
  }
488
  char*   string = NULL;
331,105✔
489
  SArray* pTagVals = NULL;
331,105✔
490
  cJSON*  json = NULL;
331,105✔
491
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
331,105✔
492
    goto end;
×
493
  }
494

495
  int16_t nCols = taosArrayGetSize(pTagVals);
331,105✔
496
  if (nCols == 0) {
331,105✔
497
    goto end;
125,732✔
498
  }
499
  char tagJsonKey[256] = {0};
205,373✔
500
  json = cJSON_CreateObject();
205,373✔
501
  if (json == NULL) {
205,373✔
502
    goto end;
×
503
  }
504
  for (int j = 0; j < nCols; ++j) {
651,229✔
505
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
445,856✔
506
    if (pTagVal == NULL) {
445,856✔
507
      continue;
×
508
    }
509
    // json key  encode by binary
510
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
445,856✔
511
    // json value
512
    char type = pTagVal->type;
445,856✔
513
    if (type == TSDB_DATA_TYPE_NULL) {
445,856✔
514
      cJSON* value = cJSON_CreateNull();
27,389✔
515
      if (value == NULL) {
27,389✔
516
        goto end;
×
517
      }
518
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
27,389✔
519
        goto end;
×
520
      }
521
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
418,467✔
522
      cJSON* value = NULL;
326,818✔
523
      if (pTagVal->nData > 0) {
326,818✔
524
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
305,266✔
525
        if (tagJsonValue == NULL) {
305,266✔
526
          goto end;
×
527
        }
528
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue, charsetCxt);
305,266✔
529
        if (length < 0) {
305,266✔
530
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC,
×
531
                 charsetCxt != NULL ? ((SConvInfo*)(charsetCxt))->charset : tsCharset, pTagVal->pData);
532
          taosMemoryFree(tagJsonValue);
×
533
          goto end;
×
534
        }
535
        value = cJSON_CreateString(tagJsonValue);
305,266✔
536
        taosMemoryFree(tagJsonValue);
305,266✔
537
        if (value == NULL) {
305,266✔
538
          goto end;
×
539
        }
540
      } else if (pTagVal->nData == 0) {
21,552✔
541
        value = cJSON_CreateString("");
21,552✔
542
        if (value == NULL) {
21,552✔
543
          goto end;
×
544
        }
545
      } else {
546
        goto end;
×
547
      }
548

549
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
326,818✔
550
        goto end;
×
551
      }
552
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
91,649✔
553
      double jsonVd = *(double*)(&pTagVal->i64);
63,362✔
554
      cJSON* value = cJSON_CreateNumber(jsonVd);
63,362✔
555
      if (value == NULL) {
63,362✔
556
        goto end;
×
557
      }
558
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
63,362✔
559
        goto end;
×
560
      }
561
    } else if (type == TSDB_DATA_TYPE_BOOL) {
28,287✔
562
      char   jsonVd = *(char*)(&pTagVal->i64);
28,287✔
563
      cJSON* value = cJSON_CreateBool(jsonVd);
28,287✔
564
      if (value == NULL) {
28,287✔
565
        goto end;
×
566
      }
567
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
28,287✔
568
        goto end;
×
569
      }
570
    } else {
571
      goto end;
×
572
    }
573
  }
574
  string = cJSON_PrintUnformatted(json);
205,373✔
575
end:
331,105✔
576
  cJSON_Delete(json);
331,105✔
577
  taosArrayDestroy(pTagVals);
331,105✔
578
  if (string == NULL) {
331,105✔
579
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
125,732✔
580
    if (string == NULL) {
125,732✔
581
      qError("failed to strdup null string");
×
582
    }
583
  }
584
  *jsonStr = string;
331,105✔
585
}
586

587
int32_t setColRef(SColRef* colRef, col_id_t colId, char* refColName, char* refTableName, char* refDbName) {
80,837,797✔
588
  colRef->id = colId;
80,837,797✔
589
  colRef->hasRef = true;
80,837,797✔
590
  tstrncpy(colRef->refDbName, refDbName, TSDB_DB_NAME_LEN);
80,837,797✔
591
  tstrncpy(colRef->refTableName, refTableName, TSDB_TABLE_NAME_LEN);
80,837,797✔
592
  tstrncpy(colRef->refColName, refColName, TSDB_COL_NAME_LEN);
80,837,797✔
593
  return TSDB_CODE_SUCCESS;
80,837,797✔
594
}
595

596
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
35,702,915✔
597
  QUERY_PARAM_CHECK(pDst);
35,702,915✔
598
  if (NULL == pSrc) {
35,702,915✔
599
    *pDst = NULL;
67,887✔
600
    return TSDB_CODE_SUCCESS;
67,887✔
601
  }
602

603
  int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
35,635,028✔
604
  if (numOfField > TSDB_MAX_COL_TAG_NUM || numOfField < TSDB_MIN_COLUMNS) {
35,631,814✔
UNCOV
605
    *pDst = NULL;
×
606
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
×
607
    return TSDB_CODE_INVALID_PARA;
×
608
  }
609

610
  int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
35,633,098✔
611
  int32_t schemaExtSize = 0;
35,633,098✔
612
  int32_t colRefSize = 0;
35,633,098✔
613
  if (withExtSchema(pSrc->tableType) && pSrc->schemaExt) {
35,633,098✔
614
    schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
35,098,109✔
615
  }
616
  if (hasRefCol(pSrc->tableType) && pSrc->colRef) {
35,630,859✔
617
    colRefSize = pSrc->numOfColRefs * sizeof(SColRef);
43,627✔
618
  }
619
  *pDst = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize);
35,637,271✔
620
  if (NULL == *pDst) {
35,620,904✔
621
    return terrno;
×
622
  }
623
  memcpy(*pDst, pSrc, metaSize);
35,623,151✔
624
  if (withExtSchema(pSrc->tableType) && pSrc->schemaExt) {
35,623,151✔
625
    (*pDst)->schemaExt = (SSchemaExt*)((char*)*pDst + metaSize);
35,099,706✔
626
    memcpy((*pDst)->schemaExt, pSrc->schemaExt, schemaExtSize);
35,102,916✔
627
  } else {
628
    (*pDst)->schemaExt = NULL;
539,483✔
629
  }
630
  if (hasRefCol(pSrc->tableType) && pSrc->colRef) {
35,640,473✔
631
    (*pDst)->colRef = (SColRef*)((char*)*pDst + metaSize + schemaExtSize);
43,627✔
632
    memcpy((*pDst)->colRef, pSrc->colRef, colRefSize);
43,627✔
633
  } else {
634
    (*pDst)->colRef = NULL;
35,592,957✔
635
  }
636

637
  return TSDB_CODE_SUCCESS;
35,638,559✔
638
}
639

640
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
5,668✔
641
  if (!pMeta || !pName || !pType) return;
5,668✔
642
  int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
5,668✔
643
  for (int32_t i = 0; i < nums; ++i) {
35,484✔
644
    if (0 == strcmp(pName, pMeta->schema[i].name)) {
35,136✔
645
      *pType = (i < pMeta->tableInfo.numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
5,320✔
646
      return;
5,320✔
647
    }
648
  }
649

650
  *pType = TCOL_TYPE_NONE;
348✔
651
}
652

653
void freeVgInfo(SDBVgInfo* vgInfo) {
36,941,083✔
654
  if (NULL == vgInfo) {
36,941,083✔
655
    return;
9,651,850✔
656
  }
657

658
  taosHashCleanup(vgInfo->vgHash);
27,289,233✔
659
  taosArrayDestroy(vgInfo->vgArray);
27,290,047✔
660

661
  taosMemoryFreeClear(vgInfo);
27,290,047✔
662
}
663

664
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
4,051,867✔
665
  QUERY_PARAM_CHECK(pDst);
4,051,867✔
666
  if (NULL == pSrc) {
4,051,867✔
667
    *pDst = NULL;
×
668
    return TSDB_CODE_SUCCESS;
×
669
  }
670

671
  *pDst = taosMemoryMalloc(sizeof(*pSrc));
4,051,867✔
672
  if (NULL == *pDst) {
4,051,867✔
673
    return terrno;
×
674
  }
675
  memcpy(*pDst, pSrc, sizeof(*pSrc));
4,051,867✔
676
  (*pDst)->vgArray = NULL;
4,051,867✔
677

678
  if (pSrc->vgHash) {
4,051,867✔
679
    (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
3,550,512✔
680
                                   HASH_ENTRY_LOCK);
681
    if (NULL == (*pDst)->vgHash) {
3,550,512✔
682
      taosMemoryFreeClear(*pDst);
×
683
      return terrno;
×
684
    }
685

686
    SVgroupInfo* vgInfo = NULL;
3,550,512✔
687
    void*        pIter = taosHashIterate(pSrc->vgHash, NULL);
3,550,512✔
688
    while (pIter) {
10,627,279✔
689
      vgInfo = pIter;
7,076,767✔
690
      int32_t* vgId = taosHashGetKey(pIter, NULL);
7,076,767✔
691

692
      if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
7,076,767✔
693
        qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
×
694
        taosHashCancelIterate(pSrc->vgHash, pIter);
×
695
        freeVgInfo(*pDst);
×
696
        return terrno;
×
697
      }
698

699
      pIter = taosHashIterate(pSrc->vgHash, pIter);
7,076,767✔
700
    }
701
  }
702

703
  return TSDB_CODE_SUCCESS;
4,051,867✔
704
}
705

706
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
7,287,967✔
707
  QUERY_PARAM_CHECK(pDst);
7,287,967✔
708
  if (NULL == pSrc) {
7,287,967✔
709
    *pDst = NULL;
109,843✔
710
    return TSDB_CODE_SUCCESS;
109,843✔
711
  }
712

713
  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
7,178,124✔
714
  if (NULL == *pDst) {
7,170,722✔
715
    return terrno;
×
716
  }
717

718
  (*pDst)->flags = pSrc->flags;
7,171,364✔
719
  if (pSrc->name) {
7,174,619✔
720
    (*pDst)->name = taosStrdup(pSrc->name);
7,175,582✔
721
    if (NULL == (*pDst)->name) goto _exit;
7,179,087✔
722
  }
723
  (*pDst)->uid = pSrc->uid;
7,177,803✔
724
  (*pDst)->btime = pSrc->btime;
7,177,803✔
725
  (*pDst)->ttl = pSrc->ttl;
7,177,482✔
726
  (*pDst)->commentLen = pSrc->commentLen;
7,176,198✔
727
  if (pSrc->comment) {
7,176,198✔
728
    (*pDst)->comment = taosStrdup(pSrc->comment);
×
UNCOV
729
    if (NULL == (*pDst)->comment) goto _exit;
×
730
  }
731
  (*pDst)->type = pSrc->type;
7,176,506✔
732

733
  if (pSrc->type == TSDB_CHILD_TABLE) {
7,178,753✔
734
    if (pSrc->ctb.stbName) {
7,176,185✔
735
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
7,176,198✔
736
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
7,177,109✔
737
    }
738
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
7,177,738✔
739
    (*pDst)->ctb.suid = pSrc->ctb.suid;
7,177,417✔
740
    if (pSrc->ctb.tagName) {
7,178,059✔
741
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
7,177,495✔
742
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
7,178,805✔
743
    }
744
    STag* pTag = (STag*)pSrc->ctb.pTag;
7,177,764✔
745
    if (pTag) {
7,178,085✔
746
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
7,178,150✔
747
      if (NULL == (*pDst)->ctb.pTag) goto _exit;
7,179,126✔
748
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
7,176,237✔
749
    }
750
  } else {
751
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
×
752
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.nCols;
×
753
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
×
754
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
×
755
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
×
756
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
×
757
    }
758
  }
759

760
  return TSDB_CODE_SUCCESS;
7,176,814✔
761

762
_exit:
×
763
  tdDestroySVCreateTbReq(*pDst);
×
764
  taosMemoryFree(*pDst);
×
765
  *pDst = NULL;
×
766
  return terrno;
×
767
}
768

769
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
12,156,112✔
770
  if (pInfo) {
12,156,112✔
771
    taosArrayDestroy(pInfo->pRetensions);
3,725,584✔
772
  }
773
  taosMemoryFree(pInfo);
12,156,112✔
774
}
12,156,112✔
775

776
void* getTaskPoolWorkerCb() { return taskQueue.wrokrerPool.pCb; }
690,554,148✔
777

778
void tFreeStreamVtbOtbInfo(void* param);
779
void tFreeStreamVtbVtbInfo(void* param);
780
void tFreeStreamVtbDbVgInfo(void* param);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc