• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4908

30 Dec 2025 10:52AM UTC coverage: 65.386% (-0.2%) from 65.541%
#4908

push

travis-ci

web-flow
enh: drop multi-stream (#33962)

60 of 106 new or added lines in 4 files covered. (56.6%)

1330 existing lines in 113 files now uncovered.

193461 of 295877 relevant lines covered (65.39%)

115765274.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
// State of the shared async task queue used by this file.
// NOTE: the field name `wrokrerPool` is a misspelling of "workerPool"; it is kept because the
// functions below reference it under this exact name.
typedef struct STaskQueue {
  SQueryAutoQWorkerPool wrokrerPool;  // worker pool initialised in initTaskQueue()
  STaosQueue* pTaskQueue;             // queue allocated from the pool in initTaskQueue()
} STaskQueue;
31

32
// Maps a comparison operator to its mirrored counterpart:
//   >  <->  <,   >=  <->  <=,   =  ->  =
// Any other operator yields -1.
int32_t getAsofJoinReverseOp(EOperatorType op) {
  int32_t reversed = -1;

  if (op == OP_TYPE_GREATER_THAN) {
    reversed = OP_TYPE_LOWER_THAN;
  } else if (op == OP_TYPE_GREATER_EQUAL) {
    reversed = OP_TYPE_LOWER_EQUAL;
  } else if (op == OP_TYPE_LOWER_THAN) {
    reversed = OP_TYPE_GREATER_THAN;
  } else if (op == OP_TYPE_LOWER_EQUAL) {
    reversed = OP_TYPE_GREATER_EQUAL;
  } else if (op == OP_TYPE_EQUAL) {
    reversed = OP_TYPE_EQUAL;
  }

  return reversed;
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
// Validates `numOfCols` consecutive schema entries starting at `pSchema`.
// For each entry, in order: the data type must be valid, the byte length must fit the limit of
// its type (fixed-size types must match tDataTypes[] exactly), and its name must differ from
// every later entry. Returns true only if all entries pass and the summed byte length does not
// exceed `maxLen`. A NULL `pSchema` yields false.
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
  if (NULL == pSchema) {
    return false;
  }

  int32_t totalBytes = 0;
  for (int32_t idx = 0; idx < numOfCols; ++idx) {
    SSchema* pCol = &pSchema[idx];

    // 1. valid types
    if (!isValidDataType(pCol->type)) {
      qError("The %d col/tag data type error, type:%d", idx, pCol->type);
      return false;
    }

    // 2. valid length for each type
    if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_VARBINARY) {
      if (pCol->bytes > TSDB_MAX_BINARY_LEN) {
        qError("The %d col/tag var data len error, type:%d, len:%d", idx, pCol->type, pCol->bytes);
        return false;
      }
    } else if (pCol->type == TSDB_DATA_TYPE_NCHAR) {
      if (pCol->bytes > TSDB_MAX_NCHAR_LEN) {
        qError("The %d col/tag nchar data len error, len:%d", idx, pCol->bytes);
        return false;
      }
    } else if (pCol->type == TSDB_DATA_TYPE_GEOMETRY) {
      if (pCol->bytes > TSDB_MAX_GEOMETRY_LEN) {
        qError("The %d col/tag geometry data len error, len:%d", idx, pCol->bytes);
        return false;
      }
    } else if (IS_STR_DATA_BLOB(pCol->type)) {
      // note: the blob limit is exclusive (>=), unlike the other variable-length types
      if (pCol->bytes >= TSDB_MAX_BLOB_LEN) {
        qError("The %d col/tag blob data len error, len:%d", idx, pCol->bytes);
        return false;
      }
    } else if (pCol->bytes != tDataTypes[pCol->type].bytes) {
      qError("The %d col/tag data len error, type:%d, len:%d", idx, pCol->type, pCol->bytes);
      return false;
    }

    // 3. valid column names: no later entry may share this entry's name
    for (int32_t other = idx + 1; other < numOfCols; ++other) {
      if (0 == strncmp(pCol->name, pSchema[other].name, sizeof(pCol->name) - 1)) {
        qError("The %d col/tag name %s is same with %d col/tag name %s", idx, pCol->name, other, pSchema[other].name);
        return false;
      }
    }

    totalBytes += pCol->bytes;
  }

  return totalBytes <= maxLen;
}
116

117
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags, bool isVirtual) {
31,846,743✔
118
  if (!pSchema) {
31,846,743✔
119
    qError("invalid numOfCols: %d", numOfCols);
×
120
    return false;
×
121
  }
122

123
  if ((isVirtual && !VALIDNUMOFCOLSVIRTUAL(numOfCols)) || (!isVirtual && !VALIDNUMOFCOLS(numOfCols))) {
31,846,743✔
124
    qError("invalid numOfCols: %d", numOfCols);
×
125
    return false;
×
126
  }
127

128
  if (!VALIDNUMOFTAGS(numOfTags)) {
31,846,743✔
129
    qError("invalid numOfTags: %d", numOfTags);
548✔
130
    return false;
×
131
  }
132

133
  /* first column must be the timestamp, which is a primary key */
134
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
31,846,195✔
135
    qError("invalid first column type: %d", pSchema[0].type);
×
136
    return false;
×
137
  }
138

139
  if (!doValidateSchema(pSchema, numOfCols, isVirtual ? TSDB_MAX_BYTES_PER_ROW_VIRTUAL : TSDB_MAX_BYTES_PER_ROW)) {
31,846,195✔
140
    qError("validate schema columns failed");
×
141
    return false;
×
142
  }
143

144
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
31,846,743✔
145
    qError("validate schema tags failed");
×
146
    return false;
×
147
  }
148

149
  return true;
31,846,195✔
150
}
151

152
static STaskQueue taskQueue = {0};
153

154
// Queue consumer callback registered in initTaskQueue(): runs the function stored in
// `ahandle` with `thandle` as its argument, then releases the queue item.
//
// An item without a function pointer is still released; previously such an item was
// skipped by the early return and never freed.
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
  if (NULL == pSchedMsg) {
    return;
  }

  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
  if (NULL != execFn) {
    (void)execFn(pSchedMsg->thandle);
  }

  taosFreeQitem(pSchedMsg);
}
160

161
int32_t initTaskQueue() {
1,135,414✔
162
  memset(&taskQueue, 0, sizeof(taskQueue));
1,135,414✔
163
  
164
  taskQueue.wrokrerPool.name = "taskWorkPool";
1,135,414✔
165
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
1,135,414✔
166
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
1,135,414✔
167
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
1,135,414✔
168
  if (TSDB_CODE_SUCCESS != coce) {
1,135,414✔
169
    qError("failed to init task thread pool");
×
170
    return -1;
×
171
  }
172

173
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
1,135,414✔
174
  if (NULL == taskQueue.pTaskQueue) {
1,135,414✔
175
    qError("failed to init task queue");
×
176
    return -1;
×
177
  }
178

179
  qInfo("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
1,135,414✔
180
  return 0;
1,135,414✔
181
}
182

183
int32_t cleanupTaskQueue() {
1,135,428✔
184
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
1,135,428✔
185
  return 0;
1,135,428✔
186
}
187

188
// Schedules `execFn(execParam)` on the shared task queue.
// A queue item is allocated, filled with the function (ahandle), its argument (thandle) and
// the `code` pointer (msg), and written to the queue.
// Returns the allocation error if the item cannot be allocated, otherwise the result of
// taosWriteQitem().
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
  SSchedMsg* pItem;

  int32_t allocCode = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void**)&pItem);
  if (allocCode != 0) {
    return allocCode;
  }

  pItem->fp = NULL;
  pItem->ahandle = execFn;
  pItem->thandle = execParam;
  pItem->msg = code;

  return taosWriteQitem(taskQueue.pTaskQueue, pItem);
}
199

200
int32_t taosAsyncWait() {
29,654✔
201
  if (!taskQueue.wrokrerPool.pCb) {
29,654✔
202
    qError("query task thread pool callback function is null");
×
203
    return -1;
×
204
  }
205
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
29,654✔
206
}
207

208
int32_t taosAsyncRecover() {
29,654✔
209
  if (!taskQueue.wrokrerPool.pCb) {
29,654✔
210
    qError("query task thread pool callback function is null");
×
211
    return -1;
×
212
  }
213
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
29,654✔
214
}
215

216
// Schedules `bindFn(bindParam)` on the shared task queue.
// Returns the allocation error if the queue item cannot be allocated, otherwise the result
// of taosWriteQitem().
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
  SSchedMsg* pSchedMsg = NULL;
  int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
  if (rc) return rc;

  pSchedMsg->fp = NULL;
  pSchedMsg->ahandle = bindFn;
  pSchedMsg->thandle = bindParam;
  // No result-code pointer is passed for bind tasks; set the field explicitly instead of
  // leaving it uninitialized.
  pSchedMsg->msg = NULL;

  return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
}
227

228
// Releases a send-info object: its target db name, its message payload, its user parameter
// (through paramFreeFp when one is set) and finally the object itself. NULL is a no-op.
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  if (pMsgBody == NULL) {
    return;
  }

  qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);

  taosMemoryFreeClear(pMsgBody->target.dbFName);
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);

  if (NULL != pMsgBody->paramFreeFp) {
    pMsgBody->paramFreeFp(pMsgBody->param);
  }

  taosMemoryFreeClear(pMsgBody);
}
243
void destroyAhandle(void *ahandle) {
311,969,024✔
244
  SMsgSendInfo *pSendInfo = ahandle;
311,969,024✔
245
  if (pSendInfo == NULL) return;
311,969,024✔
246

247
  if (pSendInfo->streamAHandle) {
311,969,024✔
248
    qDebug("stream ahandle %p freed", pSendInfo);
26,168,261✔
249
  }
250

251
  destroySendMsgInfo(pSendInfo);
312,060,068✔
252
}
253

254
// Copies the payload of `pInfo` into an RPC buffer and sends it through `pTransporter` to
// `epSet`, with `pInfo` attached as the request's ahandle.
//
// On every failure path visible here (invalid argument, buffer allocation failure, send
// failure) `pInfo` is destroyed with destroySendMsgInfo() before returning, so the caller
// must not use it afterwards.
//
// Returns TSDB_CODE_TSC_INVALID_INPUT for a NULL transporter/epSet/pInfo, terrno when the RPC
// buffer cannot be allocated, otherwise the result of rpcSendRequestWithCtx().
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* rpcCtx) {
  if (NULL == pTransporter || NULL == epSet || NULL == pInfo) {
    // destroySendMsgInfo() accepts NULL, so this is safe when pInfo itself is the NULL argument
    destroySendMsgInfo(pInfo);
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
  if (NULL == pMsg) {
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
    destroySendMsgInfo(pInfo);
    return terrno;
  }

  memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
  SRpcMsg rpcMsg = {
    .msgType = pInfo->msgType,
    .pCont = pMsg,
    .contLen = pInfo->msgInfo.len,
    .info.ahandle = (void*)pInfo,
    .info.handle = pInfo->msgInfo.handle,
    .info.persistHandle = persistHandle,
    .code = 0
  };
  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
  TRACE_SET_MSGID(&rpcMsg.info.traceId, tGenIdPI64());
  // The message type is copied before sending and the success log below uses only this copy
  // and rpcMsg, never pInfo.
  // NOTE(review): presumably pInfo may already be released by the response path once the
  // request is handed off - confirm before touching pInfo after the send.
  int32_t msgType = pInfo->msgType;

  int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
  if (code) {
    destroySendMsgInfo(pInfo);
  } else {
    qDebug("msg %s sent, 0x%" PRIx64 ":0x%" PRIx64, TMSG_INFO(msgType), TRACE_GET_ROOTID(&rpcMsg.info.traceId), TRACE_GET_MSGID(&rpcMsg.info.traceId));
  }
  return code;
}
290

291
// Convenience form of asyncSendMsgToServerExt() with persistHandle = false and no RPC context.
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  const bool persistHandle = false;
  void*      rpcCtx = NULL;

  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, persistHandle, rpcCtx);
}
294

295
// Validates `pTransporter` with QUERY_PARAM_CHECK and forwards to rpcFreeConnById(),
// returning its result.
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
  QUERY_PARAM_CHECK(pTransporter);

  int32_t code = rpcFreeConnById(pTransporter, pid);
  return code;
}
299

300
char* jobTaskStatusStr(int32_t status) {
2,147,483,647✔
301
  switch (status) {
2,147,483,647✔
302
    case JOB_TASK_STATUS_NULL:
675,225,947✔
303
      return "NULL";
675,225,947✔
304
    case JOB_TASK_STATUS_INIT:
1,597,829,177✔
305
      return "INIT";
1,597,829,177✔
306
    case JOB_TASK_STATUS_EXEC:
2,147,483,647✔
307
      return "EXECUTING";
2,147,483,647✔
308
    case JOB_TASK_STATUS_PART_SUCC:
1,608,618,204✔
309
      return "PARTIAL_SUCCEED";
1,608,618,204✔
310
    case JOB_TASK_STATUS_FETCH:
179,293,608✔
311
      return "FETCHING";
179,293,608✔
312
    case JOB_TASK_STATUS_SUCC:
365,239,160✔
313
      return "SUCCEED";
365,239,160✔
314
    case JOB_TASK_STATUS_FAIL:
5,711,961✔
315
      return "FAILED";
5,711,961✔
316
    case JOB_TASK_STATUS_DROP:
689,273,657✔
317
      return "DROPPING";
689,273,657✔
UNCOV
318
    default:
×
UNCOV
319
      break;
×
320
  }
321

UNCOV
322
  return "UNKNOWN";
×
323
}
324

325
#if 0
326
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
327
  SSchema s = {0};
328
  s.type = type;
329
  s.bytes = bytes;
330
  s.colId = colId;
331

332
  tstrncpy(s.name, name, tListLen(s.name));
333
  return s;
334
}
335
#endif
336

337
// Element destructor for arrays whose elements are pointers to table-meta responses:
// `p` points at the stored pointer; the response's contents are released, then the response
// itself is freed and the stored pointer cleared.
void freeSTableMetaRspPointer(void *p) {
  void **ppRsp = (void **)p;

  tFreeSTableMetaRsp(*ppRsp);
  taosMemoryFreeClear(*ppRsp);
}
341

342
// Releases the payload stored in `pRes->res`, choosing the destructor from `pRes->msgType`.
// A NULL `pRes` or NULL payload is a no-op. For an unrecognised message type an error is
// logged and the payload is left untouched.
//
// After a successful release `pRes->res` is always NULL. The array-backed cases previously
// left a dangling pointer there, unlike the other cases, so a second call would free twice.
void destroyQueryExecRes(SExecResult* pRes) {
  if (NULL == pRes || NULL == pRes->res) {
    return;
  }

  switch (pRes->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
      pRes->res = NULL;
      break;
    }
    case TDMT_MND_CREATE_STB:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      tFreeSTableMetaRsp(pRes->res);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_VND_SUBMIT: {
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      qDebug("query execRes %p freed", pRes->res);
      taosArrayDestroy((SArray*)pRes->res);
      pRes->res = NULL;
      break;
    }
    default:
      qError("invalid exec result for request type:%d", pRes->msgType);
      break;
  }
}
374
// clang-format on
375
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
57,063✔
376
  QUERY_PARAM_CHECK(str);
57,063✔
377
  QUERY_PARAM_CHECK(buf);
57,063✔
378
  int32_t n = 0;
57,063✔
379

380
  switch (type) {
57,063✔
381
    case TSDB_DATA_TYPE_NULL:
×
382
      n = tsnprintf(str, capacity, "null");
×
383
      break;
×
384

385
    case TSDB_DATA_TYPE_BOOL:
2,212✔
386
      n = tsnprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
2,212✔
387
      break;
2,212✔
388

389
    case TSDB_DATA_TYPE_TINYINT:
156✔
390
      n = tsnprintf(str, capacity, "%d", *(int8_t*)buf);
156✔
391
      break;
156✔
392

393
    case TSDB_DATA_TYPE_SMALLINT:
156✔
394
      n = tsnprintf(str, capacity, "%d", *(int16_t*)buf);
156✔
395
      break;
156✔
396

397
    case TSDB_DATA_TYPE_INT:
13,541✔
398
      n = tsnprintf(str, capacity, "%d", *(int32_t*)buf);
13,541✔
399
      break;
13,541✔
400

401
    case TSDB_DATA_TYPE_BIGINT:
1,039✔
402
    case TSDB_DATA_TYPE_TIMESTAMP:
403
      n = tsnprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
1,039✔
404
      break;
1,039✔
405

406
    case TSDB_DATA_TYPE_FLOAT:
1,485✔
407
      n = tsnprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
1,485✔
408
      break;
1,485✔
409

410
    case TSDB_DATA_TYPE_DOUBLE:
758✔
411
      n = tsnprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
758✔
412
      break;
758✔
413

414
    case TSDB_DATA_TYPE_VARBINARY: {
×
415
      if (bufSize < 0) {
×
416
        //        tscError("invalid buf size");
417
        return TSDB_CODE_TSC_INVALID_VALUE;
×
418
      }
419
      void*    data = NULL;
×
420
      uint32_t size = 0;
×
421
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
422
        return TSDB_CODE_OUT_OF_MEMORY;
×
423
      }
424
      *str = '"';
×
425
      memcpy(str + 1, data, size);
×
426
      *(str + size + 1) = '"';
×
427
      n = size + 2;
×
428
      taosMemoryFree(data);
×
429
      break;
×
430
    }
431
    case TSDB_DATA_TYPE_BINARY:
4,093✔
432
    case TSDB_DATA_TYPE_GEOMETRY:
433
      if (bufSize < 0) {
4,093✔
434
        //        tscError("invalid buf size");
435
        return TSDB_CODE_TSC_INVALID_VALUE;
×
436
      }
437

438
      *str = '"';
4,093✔
439
      memcpy(str + 1, buf, bufSize);
4,093✔
440
      *(str + bufSize + 1) = '"';
4,093✔
441
      n = bufSize + 2;
4,093✔
442
      break;
4,093✔
443
    case TSDB_DATA_TYPE_NCHAR:
30,311✔
444
      if (bufSize < 0) {
30,311✔
445
        //        tscError("invalid buf size");
446
        return TSDB_CODE_TSC_INVALID_VALUE;
×
447
      }
448

449
      *str = '"';
30,311✔
450
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1, NULL);
30,311✔
451
      if (length < 0) {
30,311✔
452
        return TSDB_CODE_TSC_INVALID_VALUE;
×
453
      }
454
      *(str + length + 1) = '"';
30,311✔
455
      n = length + 2;
30,311✔
456
      break;
30,311✔
457
    case TSDB_DATA_TYPE_UTINYINT:
156✔
458
      n = tsnprintf(str, capacity, "%d", *(uint8_t*)buf);
156✔
459
      break;
156✔
460

461
    case TSDB_DATA_TYPE_USMALLINT:
156✔
462
      n = tsnprintf(str, capacity, "%d", *(uint16_t*)buf);
156✔
463
      break;
156✔
464

465
    case TSDB_DATA_TYPE_UINT:
156✔
466
      n = tsnprintf(str, capacity, "%u", *(uint32_t*)buf);
156✔
467
      break;
156✔
468

469
    case TSDB_DATA_TYPE_UBIGINT:
2,844✔
470
      n = tsnprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
2,844✔
471
      break;
2,844✔
472

473
    default:
×
474
      //      tscError("unsupported type:%d", type);
475
      return TSDB_CODE_TSC_INVALID_VALUE;
×
476
  }
477

478
  if (len) *len = n;
57,063✔
479

480
  return TSDB_CODE_SUCCESS;
57,063✔
481
}
482

483
void parseTagDatatoJson(void* p, char** jsonStr, void* charsetCxt) {
325,925✔
484
  if (!p || !jsonStr) {
325,925✔
485
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
486
    return;
×
487
  }
488
  char*   string = NULL;
325,925✔
489
  SArray* pTagVals = NULL;
325,925✔
490
  cJSON*  json = NULL;
325,925✔
491
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
325,925✔
492
    goto end;
×
493
  }
494

495
  int16_t nCols = taosArrayGetSize(pTagVals);
325,925✔
496
  if (nCols == 0) {
325,925✔
497
    goto end;
124,940✔
498
  }
499
  char tagJsonKey[256] = {0};
200,985✔
500
  json = cJSON_CreateObject();
200,985✔
501
  if (json == NULL) {
200,985✔
502
    goto end;
×
503
  }
504
  for (int j = 0; j < nCols; ++j) {
637,336✔
505
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
436,351✔
506
    if (pTagVal == NULL) {
436,351✔
507
      continue;
×
508
    }
509
    // json key  encode by binary
510
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
436,351✔
511
    // json value
512
    char type = pTagVal->type;
436,351✔
513
    if (type == TSDB_DATA_TYPE_NULL) {
436,351✔
514
      cJSON* value = cJSON_CreateNull();
26,779✔
515
      if (value == NULL) {
26,779✔
516
        goto end;
×
517
      }
518
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
26,779✔
519
        goto end;
×
520
      }
521
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
409,572✔
522
      cJSON* value = NULL;
320,245✔
523
      if (pTagVal->nData > 0) {
320,245✔
524
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
299,173✔
525
        if (tagJsonValue == NULL) {
299,173✔
526
          goto end;
×
527
        }
528
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue, charsetCxt);
299,173✔
529
        if (length < 0) {
299,173✔
530
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC,
×
531
                 charsetCxt != NULL ? ((SConvInfo*)(charsetCxt))->charset : tsCharset, pTagVal->pData);
532
          taosMemoryFree(tagJsonValue);
×
533
          goto end;
×
534
        }
535
        value = cJSON_CreateString(tagJsonValue);
299,173✔
536
        taosMemoryFree(tagJsonValue);
299,173✔
537
        if (value == NULL) {
299,173✔
538
          goto end;
×
539
        }
540
      } else if (pTagVal->nData == 0) {
21,072✔
541
        value = cJSON_CreateString("");
21,072✔
542
        if (value == NULL) {
21,072✔
543
          goto end;
×
544
        }
545
      } else {
546
        goto end;
×
547
      }
548

549
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
320,245✔
550
        goto end;
×
551
      }
552
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
89,327✔
553
      double jsonVd = *(double*)(&pTagVal->i64);
61,670✔
554
      cJSON* value = cJSON_CreateNumber(jsonVd);
61,670✔
555
      if (value == NULL) {
61,670✔
556
        goto end;
×
557
      }
558
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
61,670✔
559
        goto end;
×
560
      }
561
    } else if (type == TSDB_DATA_TYPE_BOOL) {
27,657✔
562
      char   jsonVd = *(char*)(&pTagVal->i64);
27,657✔
563
      cJSON* value = cJSON_CreateBool(jsonVd);
27,657✔
564
      if (value == NULL) {
27,657✔
565
        goto end;
×
566
      }
567
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
27,657✔
568
        goto end;
×
569
      }
570
    } else {
571
      goto end;
×
572
    }
573
  }
574
  string = cJSON_PrintUnformatted(json);
200,985✔
575
end:
325,925✔
576
  cJSON_Delete(json);
325,925✔
577
  taosArrayDestroy(pTagVals);
325,925✔
578
  if (string == NULL) {
325,925✔
579
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
124,940✔
580
    if (string == NULL) {
124,940✔
581
      qError("failed to strdup null string");
×
582
    }
583
  }
584
  *jsonStr = string;
325,925✔
585
}
586

587
// Fills `colRef` as a reference to column `refColName` of table `refTableName` in database
// `refDbName`: the three names are copied (bounded by their respective TSDB_*_NAME_LEN),
// the column id is stored and `hasRef` is set. Always returns TSDB_CODE_SUCCESS.
int32_t setColRef(SColRef* colRef, col_id_t colId, char* refColName, char* refTableName, char* refDbName) {
  SColRef* pRef = colRef;

  pRef->id = colId;
  pRef->hasRef = true;

  tstrncpy(pRef->refDbName, refDbName, TSDB_DB_NAME_LEN);
  tstrncpy(pRef->refTableName, refTableName, TSDB_TABLE_NAME_LEN);
  tstrncpy(pRef->refColName, refColName, TSDB_COL_NAME_LEN);

  return TSDB_CODE_SUCCESS;
}
595

596
// Deep-copies a table meta into one allocation laid out as
//   [STableMeta + schema entries][schemaExt entries (optional)][colRef entries (optional)].
// schemaExt is copied only when withExtSchema(tableType) holds and the source has one;
// colRef only when hasRefCol(tableType) holds and the source has one. Otherwise the
// corresponding pointer in the copy is NULL.
//
// A NULL `pSrc` yields `*pDst = NULL` and success. Returns TSDB_CODE_INVALID_PARA when the
// combined column + tag count is outside [TSDB_MIN_COLUMNS, TSDB_MAX_COL_TAG_NUM], and
// terrno when the allocation fails.
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (pSrc == NULL) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
  if (numOfField < TSDB_MIN_COLUMNS || numOfField > TSDB_MAX_COL_TAG_NUM) {
    *pDst = NULL;
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
    return TSDB_CODE_INVALID_PARA;
  }

  bool copyExt = withExtSchema(pSrc->tableType) && (NULL != pSrc->schemaExt);
  bool copyRef = hasRefCol(pSrc->tableType) && (NULL != pSrc->colRef);

  int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
  int32_t schemaExtSize = 0;
  int32_t colRefSize = 0;
  if (copyExt) {
    schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
  }
  if (copyRef) {
    colRefSize = pSrc->numOfColRefs * sizeof(SColRef);
  }

  STableMeta* pCopy = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize);
  *pDst = pCopy;
  if (pCopy == NULL) {
    return terrno;
  }

  // header and schema entries first; the trailing sections are appended behind them
  memcpy(pCopy, pSrc, metaSize);

  pCopy->schemaExt = NULL;
  if (copyExt) {
    pCopy->schemaExt = (SSchemaExt*)((char*)pCopy + metaSize);
    memcpy(pCopy->schemaExt, pSrc->schemaExt, schemaExtSize);
  }

  pCopy->colRef = NULL;
  if (copyRef) {
    pCopy->colRef = (SColRef*)((char*)pCopy + metaSize + schemaExtSize);
    memcpy(pCopy->colRef, pSrc->colRef, colRefSize);
  }

  return TSDB_CODE_SUCCESS;
}
639

640
// Looks up `pName` among the schema entries of `pMeta` (columns first, then tags) and stores
// in `*pType` whether the first match is a column, a tag, or absent (TCOL_TYPE_NONE).
// Does nothing when any argument is NULL.
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
  if (NULL == pMeta || NULL == pName || NULL == pType) return;

  int32_t          total = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
  ETableColumnType found = TCOL_TYPE_NONE;

  for (int32_t idx = 0; idx < total; ++idx) {
    if (strcmp(pName, pMeta->schema[idx].name) != 0) {
      continue;
    }
    // entries below numOfColumns are columns, the rest are tags
    found = (idx < pMeta->tableInfo.numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
    break;
  }

  *pType = found;
}
652

653
// Releases a vgroup-info object: its hash table, its array, then the object itself.
// NULL is a no-op.
void freeVgInfo(SDBVgInfo* vgInfo) {
  if (vgInfo != NULL) {
    taosHashCleanup(vgInfo->vgHash);
    taosArrayDestroy(vgInfo->vgArray);

    taosMemoryFreeClear(vgInfo);
  }
}
663

664
// Clones a vgroup-info object: scalar fields are copied, `vgArray` is NOT cloned (set to
// NULL in the copy), and `vgHash` is rebuilt entry by entry when the source has one.
//
// A NULL `pSrc` yields `*pDst = NULL` and success. On any failure the partially built copy
// is released, `*pDst` is set to NULL and the error code captured at the point of failure
// is returned. Previously a failed taosHashPut() left `*pDst` pointing at freed memory, and
// terrno was read only after the cleanup calls.
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == *pDst) {
    return terrno;
  }
  memcpy(*pDst, pSrc, sizeof(*pSrc));
  (*pDst)->vgArray = NULL;

  if (NULL == pSrc->vgHash) {
    // the memcpy above already left vgHash NULL in the copy
    return TSDB_CODE_SUCCESS;
  }

  (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
                                 HASH_ENTRY_LOCK);
  if (NULL == (*pDst)->vgHash) {
    int32_t code = terrno;  // capture before cleanup
    taosMemoryFreeClear(*pDst);
    return code;
  }

  void* pIter = taosHashIterate(pSrc->vgHash, NULL);
  while (pIter) {
    SVgroupInfo* vgInfo = pIter;
    int32_t*     vgId = taosHashGetKey(pIter, NULL);

    if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
      int32_t code = terrno;  // capture before cleanup
      qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
      taosHashCancelIterate(pSrc->vgHash, pIter);
      freeVgInfo(*pDst);
      *pDst = NULL;  // do not hand a freed pointer back to the caller
      return code;
    }

    pIter = taosHashIterate(pSrc->vgHash, pIter);
  }

  return TSDB_CODE_SUCCESS;
}
705

706
// Deep-copies the fields of a create-table request handled below: flags, name, uid, btime,
// ttl, comment and type, plus either the child-table section (stbName, tagNum, suid, tagName
// array, tag blob) or, for every other table type, the normal-table schema row.
//
// A NULL `pSrc` yields `*pDst = NULL` and success. If any allocation fails, the partial copy
// is destroyed, `*pDst` is set to NULL and terrno is returned.
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // zero-initialised so the cleanup path can safely release whatever was set so far
  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == *pDst) {
    return terrno;
  }

  (*pDst)->flags = pSrc->flags;
  if (pSrc->name) {
    (*pDst)->name = taosStrdup(pSrc->name);
    if (NULL == (*pDst)->name) goto _exit;
  }
  (*pDst)->uid = pSrc->uid;
  (*pDst)->btime = pSrc->btime;
  (*pDst)->ttl = pSrc->ttl;
  (*pDst)->commentLen = pSrc->commentLen;
  if (pSrc->comment) {
    (*pDst)->comment = taosStrdup(pSrc->comment);
    if (NULL == (*pDst)->comment) goto _exit;
  }
  (*pDst)->type = pSrc->type;

  if (pSrc->type == TSDB_CHILD_TABLE) {
    if (pSrc->ctb.stbName) {
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
    }
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
    (*pDst)->ctb.suid = pSrc->ctb.suid;
    if (pSrc->ctb.tagName) {
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
    }
    STag* pTag = (STag*)pSrc->ctb.pTag;
    if (pTag) {
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
      if (NULL == (*pDst)->ctb.pTag) goto _exit;
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
    }
  } else {
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
    // copy the schema version itself; it was previously overwritten with nCols
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.version;
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
    }
  }

  return TSDB_CODE_SUCCESS;

_exit:
  tdDestroySVCreateTbReq(*pDst);
  taosMemoryFree(*pDst);
  *pDst = NULL;
  return terrno;
}
768

769
// Releases a db-config object: its `pRetensions` array (when the object exists) and then the
// object itself. taosMemoryFree() is invoked even for a NULL `pInfo`.
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
  if (NULL != pInfo) {
    SArray* pRetensions = pInfo->pRetensions;
    taosArrayDestroy(pRetensions);
  }

  taosMemoryFree(pInfo);
}
775

776
// Returns the callback table (`pCb`) of the task queue's worker pool as an untyped pointer.
void* getTaskPoolWorkerCb() {
  void* pCb = taskQueue.wrokrerPool.pCb;
  return pCb;
}
664,834,997✔
777

778
void tFreeStreamVtbOtbInfo(void* param);
779
void tFreeStreamVtbVtbInfo(void* param);
780
void tFreeStreamVtbDbVgInfo(void* param);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc