• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.68
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
typedef struct STaskQueue {
28
  SQueryAutoQWorkerPool wrokrerPool;
29
  STaosQueue* pTaskQueue;
30
} STaskQueue;
31

32
/*
 * Map a comparison operator to its mirrored counterpart:
 * '>' <-> '<', '>=' <-> '<=', and '=' stays '='.
 *
 * Returns -1 for any operator outside that set.
 */
int32_t getAsofJoinReverseOp(EOperatorType op) {
  int32_t reversed = -1;

  if (OP_TYPE_GREATER_THAN == op) {
    reversed = OP_TYPE_LOWER_THAN;
  } else if (OP_TYPE_GREATER_EQUAL == op) {
    reversed = OP_TYPE_LOWER_EQUAL;
  } else if (OP_TYPE_LOWER_THAN == op) {
    reversed = OP_TYPE_GREATER_THAN;
  } else if (OP_TYPE_LOWER_EQUAL == op) {
    reversed = OP_TYPE_GREATER_EQUAL;
  } else if (OP_TYPE_EQUAL == op) {
    reversed = OP_TYPE_EQUAL;
  }

  return reversed;
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
/*
 * True when any of the top 8 bits of `bytes` (viewed as unsigned 32-bit) is set.
 * NOTE(review): presumably those bits carry legacy decimal precision/scale - confirm against the encoder.
 */
bool hasDecimalBytesTypeInfo(int32_t bytes) {
  const uint32_t topByteMask = 0xFF000000u;
  return ((uint32_t)bytes & topByteMask) != 0;
}
×
62

63
/*
64
 * Build a comparable ref-column type from schema metadata.
65
 *
66
 * typeMod carries decimal precision/scale when schemaExt is present. When it is
67
 * absent, fall back to the legacy decimal metadata encoded in schema bytes.
68
 */
69
void schemaToRefDataType(const SSchema* pSchema, STypeMod typeMod, SDataType* pType) {
376,910,100✔
70
  if (NULL == pSchema || NULL == pType) {
376,910,100✔
71
    return;
×
72
  }
73

74
  memset(pType, 0, sizeof(*pType));
376,910,748✔
75
  pType->type = pSchema->type;
376,910,748✔
76
  pType->bytes = pSchema->bytes;
376,913,332✔
77
  if (IS_DECIMAL_TYPE(pSchema->type)) {
376,911,396✔
78
    if (typeMod != 0) {
410,563✔
79
      fillTypeFromTypeMod(pType, typeMod);
410,563✔
80
    } else if ((((uint32_t)pSchema->bytes) >> 24) != 0) {
×
81
      extractDecimalTypeInfoFromBytes(&pType->bytes, &pType->precision, &pType->scale);
×
82
    }
83
  }
84
}
85

86
bool isSameRefDataType(const SDataType* pLeft, const SDataType* pRight) {
143,014,907✔
87
  if (pLeft->type != pRight->type) {
143,014,907✔
88
    return false;
7,038✔
89
  }
90
  if (!IS_VAR_DATA_TYPE(pLeft->type) && pLeft->bytes != pRight->bytes) {
143,007,869✔
91
    return false;
×
92
  }
93
  if (IS_DECIMAL_TYPE(pLeft->type) &&
143,007,869✔
94
      (pLeft->precision != pRight->precision || pLeft->scale != pRight->scale)) {
16,952✔
95
    return false;
3,260✔
96
  }
97

98
  return true;
143,004,609✔
99
}
100

101
int32_t getNormalColSchemaIndex(const STableMeta* pTableMeta, const char* pColName) {
265,430,224✔
102
  if (NULL == pTableMeta || NULL == pColName) {
265,430,224✔
103
    return -1;
×
104
  }
105

106
  for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) {
2,147,483,647✔
107
    if (0 == strcmp(pColName, pTableMeta->schema[i].name)) {
2,147,483,647✔
108
      return i;
265,426,057✔
109
    }
110
  }
111

112
  return -1;
4,167✔
113
}
114

115
/*
 * Validate numOfCols schema entries starting at pSchema.
 *
 * Each entry must have a valid data type, a byte length within the limit for
 * that type (exactly tDataTypes[type].bytes for the remaining types), and a
 * name not used by an earlier entry. The sum of all byte lengths must not
 * exceed maxLen.
 *
 * Returns false on the first violation, on a NULL schema, or when the name
 * hash cannot be created or updated.
 */
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
  if (NULL == pSchema) {
    return false;
  }

  SSHashObj* pSeenNames = tSimpleHashInit(numOfCols, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
  if (NULL == pSeenNames) {
    return false;
  }

  bool    valid = false;
  int32_t totalBytes = 0;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SSchema* pCol = &pSchema[i];

    // 1. the data type itself must be known
    if (!isValidDataType(pCol->type)) {
      qError("The %d col/tag data type error, type:%d", i, pCol->type);
      goto _done;
    }

    // 2. the declared length must fit the type
    if (TSDB_DATA_TYPE_BINARY == pCol->type || TSDB_DATA_TYPE_VARBINARY == pCol->type) {
      if (pCol->bytes > TSDB_MAX_BINARY_LEN) {
        qError("The %d col/tag var data len error, type:%d, len:%d", i, pCol->type, pCol->bytes);
        goto _done;
      }
    } else if (TSDB_DATA_TYPE_NCHAR == pCol->type) {
      if (pCol->bytes > TSDB_MAX_NCHAR_LEN) {
        qError("The %d col/tag nchar data len error, len:%d", i, pCol->bytes);
        goto _done;
      }
    } else if (TSDB_DATA_TYPE_GEOMETRY == pCol->type) {
      if (pCol->bytes > TSDB_MAX_GEOMETRY_LEN) {
        qError("The %d col/tag geometry data len error, len:%d", i, pCol->bytes);
        goto _done;
      }
    } else if (IS_STR_DATA_BLOB(pCol->type)) {
      // unlike the limits above, the blob limit itself is rejected (>=)
      if (pCol->bytes >= TSDB_MAX_BLOB_LEN) {
        qError("The %d col/tag blob data len error, len:%d", i, pCol->bytes);
        goto _done;
      }
    } else if (pCol->bytes != tDataTypes[pCol->type].bytes) {
      qError("The %d col/tag data len error, type:%d, len:%d", i, pCol->type, pCol->bytes);
      goto _done;
    }

    // 3. the name must not repeat an earlier entry; the hash stores name -> index
    size_t   keyLen = strnlen(pCol->name, sizeof(pCol->name));
    int32_t  selfIdx = i;
    int32_t* pPrevIdx = tSimpleHashGet(pSeenNames, pCol->name, keyLen);
    if (NULL != pPrevIdx) {
      qError("The %d col/tag name %s is same with %d col/tag name %s", i, pCol->name, *pPrevIdx,
             pSchema[*pPrevIdx].name);
      goto _done;
    }
    if (TSDB_CODE_SUCCESS != tSimpleHashPut(pSeenNames, pCol->name, keyLen, &selfIdx, sizeof(selfIdx))) {
      goto _done;
    }

    totalBytes += pCol->bytes;
  }

  valid = (totalBytes <= maxLen);

_done:
  tSimpleHashCleanup(pSeenNames);
  return valid;
}
183

184
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags, bool isVirtual) {
39,767,388✔
185
  if (!pSchema) {
39,767,388✔
186
    qError("invalid numOfCols: %d", numOfCols);
×
187
    return false;
×
188
  }
189

190
  if ((isVirtual && !VALIDNUMOFCOLSVIRTUAL(numOfCols)) || (!isVirtual && !VALIDNUMOFCOLS(numOfCols))) {
39,767,388✔
191
    qError("invalid numOfCols: %d", numOfCols);
×
192
    return false;
×
193
  }
194

195
  if (!VALIDNUMOFTAGS(numOfTags)) {
39,767,388✔
196
    qError("invalid numOfTags: %d", numOfTags);
50✔
197
    return false;
×
198
  }
199

200
  /* first column must be the timestamp, which is a primary key */
201
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
39,767,338✔
202
    qError("invalid first column type: %d", pSchema[0].type);
×
203
    return false;
×
204
  }
205

206
  if (!doValidateSchema(pSchema, numOfCols, isVirtual ? TSDB_MAX_BYTES_PER_ROW_VIRTUAL : TSDB_MAX_BYTES_PER_ROW)) {
39,767,374✔
207
    qError("validate schema columns failed");
×
208
    return false;
×
209
  }
210

211
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
39,767,374✔
212
    qError("validate schema tags failed");
×
213
    return false;
×
214
  }
215

216
  return true;
39,767,374✔
217
}
218

219
/* File-level task queue; (re)initialised by initTaskQueue(). */
static STaskQueue taskQueue = {0};

/* 0 while async work may still be created; switched to 1 by beginAsyncWorkShutdown(). */
static int32_t shutdownSentinel = 0;
221

222
bool beginAsyncWorkShutdown() {
1,581,717✔
223
  return atomic_val_compare_exchange_32(&shutdownSentinel, 0, 1) == 0;
1,581,717✔
224
}
225

226
/* True while shutdownSentinel is still 0, i.e. beginAsyncWorkShutdown() has not flipped it. */
bool mayCreateAsyncWork() {
  int32_t sentinel = atomic_load_32(&shutdownSentinel);
  return sentinel == 0;
}
2,147,483,647✔
227

228
/*
 * Queue consumer callback: run the function stored in pSchedMsg->ahandle with
 * pSchedMsg->thandle as its argument, then release the queue item.
 *
 * pInfo is unused. A NULL pSchedMsg is ignored. An item whose ahandle is NULL
 * is released without running anything; the earlier code returned without
 * freeing it, leaking the item.
 */
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
  if (NULL == pSchedMsg) {
    return;
  }

  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
  if (NULL != execFn) {
    (void)execFn(pSchedMsg->thandle);
  }

  taosFreeQitem(pSchedMsg);
}
234

235
int32_t initTaskQueue() {
1,582,400✔
236
  memset(&taskQueue, 0, sizeof(taskQueue));
1,582,400✔
237
  
238
  taskQueue.wrokrerPool.name = "taskWorkPool";
1,582,400✔
239
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
1,582,400✔
240
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
1,582,400✔
241
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
1,582,400✔
242
  if (TSDB_CODE_SUCCESS != coce) {
1,582,400✔
UNCOV
243
    qError("failed to init task thread pool");
×
UNCOV
244
    return -1;
×
245
  }
246

247
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
1,582,400✔
248
  if (NULL == taskQueue.pTaskQueue) {
1,582,400✔
UNCOV
249
    qError("failed to init task queue");
×
250
    return -1;
×
251
  }
252

253
  qInfo("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
1,582,400✔
254
  return 0;
1,582,400✔
255
}
256

257
int32_t cleanupTaskQueue() {
1,582,451✔
258
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
1,582,451✔
259
  return 0;
1,582,451✔
260
}
261

262
/*
 * Queue execFn(execParam) for execution by the task worker pool.
 *
 * The `code` pointer is stored in the queued item's msg field; this function
 * itself never writes through it.
 *
 * Returns TSDB_CODE_APP_IS_STOPPING once shutdown has begun, the allocation
 * error if no queue item could be obtained, or the result of taosWriteQitem().
 * The item is released here when it could not be enqueued.
 */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
  if (!mayCreateAsyncWork()) {
    return TSDB_CODE_APP_IS_STOPPING;
  }

  SSchedMsg* pItem;
  int32_t    status = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void**)&pItem);
  if (status != 0) {
    return status;
  }

  pItem->fp = NULL;
  pItem->ahandle = execFn;
  pItem->thandle = execParam;
  pItem->msg = code;

  status = taosWriteQitem(taskQueue.pTaskQueue, pItem);
  if (status != 0) {
    taosFreeQitem(pItem);
  }

  return status;
}
284

285
int32_t taosAsyncWait() {
548,485✔
286
  if (!taskQueue.wrokrerPool.pCb) {
548,485✔
UNCOV
287
    qError("query task thread pool callback function is null");
×
288
    return -1;
×
289
  }
290
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
548,485✔
291
}
292

293
int32_t taosAsyncRecover() {
548,485✔
294
  if (!taskQueue.wrokrerPool.pCb) {
548,485✔
UNCOV
295
    qError("query task thread pool callback function is null");
×
UNCOV
296
    return -1;
×
297
  }
298
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
548,485✔
299
}
300

NEW
301
/*
 * Queue execFn(execParam) for execution by the task worker pool.
 *
 * Same flow as taosAsyncExec() but without a result-code pointer: the item's
 * msg field is explicitly set to NULL so that it never holds an indeterminate
 * value (it was previously left unassigned).
 *
 * Returns TSDB_CODE_APP_IS_STOPPING once shutdown has begun, the allocation
 * error if no queue item could be obtained, or the result of taosWriteQitem().
 * The item is released here when it could not be enqueued.
 */
int32_t taosStmt2AsyncBind(__async_exec_fn_t execFn, void* execParam) {
  if (!mayCreateAsyncWork()) {
    return TSDB_CODE_APP_IS_STOPPING;
  }

  SSchedMsg* pSchedMsg = NULL;
  int32_t    rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void**)&pSchedMsg);
  if (rc) {
    return rc;
  }

  pSchedMsg->fp = NULL;
  pSchedMsg->ahandle = execFn;
  pSchedMsg->thandle = execParam;
  pSchedMsg->msg = NULL;

  rc = taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
  if (rc) {
    taosFreeQitem(pSchedMsg);
  }

  return rc;
}
324

325
/*
 * Release a send-message descriptor and what it owns: target.dbFName,
 * msgInfo.pData, the user parameter (through paramFreeFp when one is set),
 * and finally the descriptor itself. NULL is ignored.
 */
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  if (pMsgBody != NULL) {
    qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);

    taosMemoryFreeClear(pMsgBody->target.dbFName);
    taosMemoryFreeClear(pMsgBody->msgInfo.pData);

    if (pMsgBody->paramFreeFp != NULL) {
      pMsgBody->paramFreeFp(pMsgBody->param);
    }

    taosMemoryFreeClear(pMsgBody);
  }
}
340
void destroyAhandle(void *ahandle) {
568,218,249✔
341
  SMsgSendInfo *pSendInfo = ahandle;
568,218,249✔
342
  if (pSendInfo == NULL) return;
568,218,249✔
343

344
  if (pSendInfo->streamAHandle) {
568,218,249✔
345
    qDebug("stream ahandle %p freed", pSendInfo);
43,382,742✔
346
  }
347

348
  destroySendMsgInfo(pSendInfo);
568,283,516✔
349
}
350

351
/*
 * Copy pInfo's payload into an RPC buffer and send it through pTransporter.
 *
 * pInfo is handed to the request as its ahandle. On every failure path in this
 * function (invalid input, shutdown in progress, buffer allocation failure,
 * send failure) pInfo is destroyed here, so the caller must not touch it after
 * a non-zero return. The original payload (msgInfo.pData) is freed once copied.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT, TSDB_CODE_APP_IS_STOPPING, terrno on
 * allocation failure, or the result of rpcSendRequestWithCtx().
 */
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* rpcCtx) {
  const bool badInput = (pTransporter == NULL) || (epSet == NULL) || (pInfo == NULL);
  if (badInput) {
    destroySendMsgInfo(pInfo);
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  if (!mayCreateAsyncWork()) {
    destroySendMsgInfo(pInfo);
    return TSDB_CODE_APP_IS_STOPPING;
  }

  char* pCont = rpcMallocCont(pInfo->msgInfo.len);
  if (pCont == NULL) {
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
    destroySendMsgInfo(pInfo);
    return terrno;
  }

  memcpy(pCont, pInfo->msgInfo.pData, pInfo->msgInfo.len);
  taosMemoryFreeClear(pInfo->msgInfo.pData);

  SRpcMsg rpcMsg = {
    .msgType = pInfo->msgType,
    .pCont = pCont,
    .contLen = pInfo->msgInfo.len,
    .info.ahandle = (void*)pInfo,
    .info.handle = pInfo->msgInfo.handle,
    .info.persistHandle = persistHandle,
    .code = 0
  };
  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
  TRACE_SET_MSGID(&rpcMsg.info.traceId, tGenIdPI64());

  // Keep a copy for the log line below so pInfo is not read again after the send.
  // NOTE(review): presumably pInfo may already be released by the RPC layer at that point - confirm.
  const int32_t msgType = pInfo->msgType;

  int sendCode = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
  if (sendCode == 0) {
    qDebug("msg %s sent, 0x%" PRIx64 ":0x%" PRIx64, TMSG_INFO(msgType), TRACE_GET_ROOTID(&rpcMsg.info.traceId), TRACE_GET_MSGID(&rpcMsg.info.traceId));
    return sendCode;
  }

  destroySendMsgInfo(pInfo);
  return sendCode;
}
393

394
/* Convenience form of asyncSendMsgToServerExt() with persistHandle = false and no RPC context. */
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  const bool persistHandle = false;
  void*      rpcCtx = NULL;

  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, persistHandle, rpcCtx);
}
397

398
/*
 * Forward to rpcFreeConnById() for the connection identified by pid, after
 * QUERY_PARAM_CHECK has validated pTransporter.
 */
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
  QUERY_PARAM_CHECK(pTransporter);

  int32_t code = rpcFreeConnById(pTransporter, pid);
  return code;
}
402

403
char* jobTaskStatusStr(int32_t status) {
2,147,483,647✔
404
  switch (status) {
2,147,483,647✔
405
    case JOB_TASK_STATUS_NULL:
840,375,753✔
406
      return "NULL";
840,375,753✔
407
    case JOB_TASK_STATUS_INIT:
1,804,382,996✔
408
      return "INIT";
1,804,382,996✔
409
    case JOB_TASK_STATUS_EXEC:
2,147,483,647✔
410
      return "EXECUTING";
2,147,483,647✔
411
    case JOB_TASK_STATUS_PART_SUCC:
1,854,487,887✔
412
      return "PARTIAL_SUCCEED";
1,854,487,887✔
413
    case JOB_TASK_STATUS_FETCH:
227,896,368✔
414
      return "FETCHING";
227,896,368✔
415
    case JOB_TASK_STATUS_SUCC:
494,579,905✔
416
      return "SUCCEED";
494,579,905✔
417
    case JOB_TASK_STATUS_FAIL:
9,844,826✔
418
      return "FAILED";
9,844,826✔
419
    case JOB_TASK_STATUS_DROP:
880,216,283✔
420
      return "DROPPING";
880,216,283✔
421
    default:
24✔
422
      break;
24✔
423
  }
424

425
  return "UNKNOWN";
24✔
426
}
427

428
#if 0
/* Compiled out. Builds an SSchema value from its parts; the name is copied with truncation to the field size. */
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
  SSchema schema = {.type = type, .bytes = bytes, .colId = colId};

  tstrncpy(schema.name, name, tListLen(schema.name));
  return schema;
}
#endif
439

440
/*
 * Element destructor for arrays whose elements are pointers to table-meta
 * responses: p is the address of the element, i.e. a pointer to the pointer.
 * The response's contents are released, then the response itself, and the
 * element is cleared.
 */
void freeSTableMetaRspPointer(void *p) {
  void **ppRsp = (void **)p;

  tFreeSTableMetaRsp(*ppRsp);
  taosMemoryFreeClear(*ppRsp);
}
53,795,051✔
444

445
/*
 * Release pRes->res according to pRes->msgType.
 *
 * - TDMT_VND_CREATE_TABLE: array of table-meta response pointers
 * - TDMT_MND_CREATE_STB / TDMT_VND_ALTER_TABLE / TDMT_MND_ALTER_STB: one table-meta response
 * - TDMT_VND_SUBMIT: a submit response
 * - TDMT_SCH_QUERY / TDMT_SCH_MERGE_QUERY: a plain array
 *
 * After a successful release pRes->res is NULL in every branch. The array
 * branches used to leave the freed pointer in place, so calling this twice on
 * the same result freed it twice; the other branches already cleared it.
 * An unrecognised msgType is only logged and pRes->res is left untouched.
 * A NULL pRes or NULL pRes->res is ignored.
 */
void destroyQueryExecRes(SExecResult* pRes) {
  if (NULL == pRes || NULL == pRes->res) {
    return;
  }

  switch (pRes->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
      pRes->res = NULL;
      break;
    }
    case TDMT_MND_CREATE_STB:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      tFreeSTableMetaRsp(pRes->res);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_VND_SUBMIT: {
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      qDebug("query execRes %p freed", pRes->res);
      taosArrayDestroy((SArray*)pRes->res);
      pRes->res = NULL;
      break;
    }
    default:
      qError("invalid exec result for request type:%d", pRes->msgType);
      break;
  }
}
477
// clang-format on
478
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
102,797✔
479
  QUERY_PARAM_CHECK(str);
102,797✔
480
  QUERY_PARAM_CHECK(buf);
102,797✔
481
  int32_t n = 0;
102,797✔
482

483
  switch (type) {
102,797✔
UNCOV
484
    case TSDB_DATA_TYPE_NULL:
×
UNCOV
485
      n = snprintf(str, capacity, "null");
×
UNCOV
486
      break;
×
487

488
    case TSDB_DATA_TYPE_BOOL:
5,164✔
489
      n = snprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
5,164✔
490
      break;
5,164✔
491

492
    case TSDB_DATA_TYPE_TINYINT:
237✔
493
      n = snprintf(str, capacity, "%d", *(int8_t*)buf);
237✔
494
      break;
237✔
495

496
    case TSDB_DATA_TYPE_SMALLINT:
237✔
497
      n = snprintf(str, capacity, "%d", *(int16_t*)buf);
237✔
498
      break;
237✔
499

500
    case TSDB_DATA_TYPE_INT:
23,936✔
501
      n = snprintf(str, capacity, "%d", *(int32_t*)buf);
23,936✔
502
      break;
23,936✔
503

504
    case TSDB_DATA_TYPE_BIGINT:
1,289✔
505
    case TSDB_DATA_TYPE_TIMESTAMP:
506
      n = snprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
1,289✔
507
      break;
1,289✔
508

509
    case TSDB_DATA_TYPE_FLOAT:
4,349✔
510
      n = snprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
4,349✔
511
      break;
4,349✔
512

513
    case TSDB_DATA_TYPE_DOUBLE:
3,534✔
514
      n = snprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
3,534✔
515
      break;
3,534✔
516

UNCOV
517
    case TSDB_DATA_TYPE_VARBINARY: {
×
UNCOV
518
      if (bufSize < 0) {
×
519
        //        tscError("invalid buf size");
520
        return TSDB_CODE_TSC_INVALID_VALUE;
×
521
      }
UNCOV
522
      void*    data = NULL;
×
UNCOV
523
      uint32_t size = 0;
×
UNCOV
524
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
UNCOV
525
        return TSDB_CODE_OUT_OF_MEMORY;
×
526
      }
UNCOV
527
      *str = '"';
×
UNCOV
528
      memcpy(str + 1, data, size);
×
UNCOV
529
      *(str + size + 1) = '"';
×
UNCOV
530
      n = size + 2;
×
UNCOV
531
      taosMemoryFree(data);
×
UNCOV
532
      break;
×
533
    }
534
    case TSDB_DATA_TYPE_BINARY:
19,118✔
535
    case TSDB_DATA_TYPE_GEOMETRY:
536
      if (bufSize < 0) {
19,118✔
537
        //        tscError("invalid buf size");
UNCOV
538
        return TSDB_CODE_TSC_INVALID_VALUE;
×
539
      }
540

541
      *str = '"';
19,118✔
542
      memcpy(str + 1, buf, bufSize);
19,118✔
543
      *(str + bufSize + 1) = '"';
19,118✔
544
      n = bufSize + 2;
19,118✔
545
      break;
19,118✔
546
    case TSDB_DATA_TYPE_NCHAR:
40,385✔
547
      if (bufSize < 0) {
40,385✔
548
        //        tscError("invalid buf size");
UNCOV
549
        return TSDB_CODE_TSC_INVALID_VALUE;
×
550
      }
551

552
      *str = '"';
40,385✔
553
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1, NULL);
40,385✔
554
      if (length < 0) {
40,385✔
555
        return TSDB_CODE_TSC_INVALID_VALUE;
×
556
      }
557
      *(str + length + 1) = '"';
40,385✔
558
      n = length + 2;
40,385✔
559
      break;
40,385✔
560
    case TSDB_DATA_TYPE_UTINYINT:
237✔
561
      n = snprintf(str, capacity, "%d", *(uint8_t*)buf);
237✔
562
      break;
237✔
563

564
    case TSDB_DATA_TYPE_USMALLINT:
237✔
565
      n = snprintf(str, capacity, "%d", *(uint16_t*)buf);
237✔
566
      break;
237✔
567

568
    case TSDB_DATA_TYPE_UINT:
237✔
569
      n = snprintf(str, capacity, "%u", *(uint32_t*)buf);
237✔
570
      break;
237✔
571

572
    case TSDB_DATA_TYPE_UBIGINT:
3,837✔
573
      n = snprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
3,837✔
574
      break;
3,837✔
575

UNCOV
576
    default:
×
577
      //      tscError("unsupported type:%d", type);
UNCOV
578
      return TSDB_CODE_TSC_INVALID_VALUE;
×
579
  }
580

581
  if (len) *len = n;
102,797✔
582

583
  return TSDB_CODE_SUCCESS;
102,797✔
584
}
585

586
void parseTagDatatoJson(void* p, char** jsonStr, void* charsetCxt) {
371,365✔
587
  if (!p || !jsonStr) {
371,365✔
UNCOV
588
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
UNCOV
589
    return;
×
590
  }
591
  char*   string = NULL;
371,365✔
592
  SArray* pTagVals = NULL;
371,365✔
593
  cJSON*  json = NULL;
371,365✔
594
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
371,365✔
UNCOV
595
    goto end;
×
596
  }
597

598
  int16_t nCols = taosArrayGetSize(pTagVals);
371,365✔
599
  if (nCols == 0) {
371,365✔
600
    goto end;
140,240✔
601
  }
602
  char tagJsonKey[256] = {0};
231,125✔
603
  json = cJSON_CreateObject();
231,125✔
604
  if (json == NULL) {
231,125✔
UNCOV
605
    goto end;
×
606
  }
607
  for (int j = 0; j < nCols; ++j) {
733,075✔
608
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
501,950✔
609
    if (pTagVal == NULL) {
501,950✔
UNCOV
610
      continue;
×
611
    }
612
    // json key  encode by binary
613
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
501,950✔
614
    // json value
615
    char type = pTagVal->type;
501,950✔
616
    if (type == TSDB_DATA_TYPE_NULL) {
501,950✔
617
      cJSON* value = cJSON_CreateNull();
31,110✔
618
      if (value == NULL) {
31,110✔
UNCOV
619
        goto end;
×
620
      }
621
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
31,110✔
UNCOV
622
        goto end;
×
623
      }
624
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
470,840✔
625
      cJSON* value = NULL;
366,250✔
626
      if (pTagVal->nData > 0) {
366,250✔
627
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
341,770✔
628
        if (tagJsonValue == NULL) {
341,770✔
UNCOV
629
          goto end;
×
630
        }
631
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue, charsetCxt);
341,770✔
632
        if (length < 0) {
341,770✔
UNCOV
633
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC,
×
634
                 charsetCxt != NULL ? ((SConvInfo*)(charsetCxt))->charset : tsCharset, pTagVal->pData);
UNCOV
635
          taosMemoryFree(tagJsonValue);
×
UNCOV
636
          goto end;
×
637
        }
638
        value = cJSON_CreateString(tagJsonValue);
341,770✔
639
        taosMemoryFree(tagJsonValue);
341,770✔
640
        if (value == NULL) {
341,770✔
UNCOV
641
          goto end;
×
642
        }
643
      } else if (pTagVal->nData == 0) {
24,480✔
644
        value = cJSON_CreateString("");
24,480✔
645
        if (value == NULL) {
24,480✔
UNCOV
646
          goto end;
×
647
        }
648
      } else {
UNCOV
649
        goto end;
×
650
      }
651

652
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
366,250✔
UNCOV
653
        goto end;
×
654
      }
655
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
104,590✔
656
      double jsonVd = *(double*)(&pTagVal->i64);
72,460✔
657
      cJSON* value = cJSON_CreateNumber(jsonVd);
72,460✔
658
      if (value == NULL) {
72,460✔
UNCOV
659
        goto end;
×
660
      }
661
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
72,460✔
UNCOV
662
        goto end;
×
663
      }
664
    } else if (type == TSDB_DATA_TYPE_BOOL) {
32,130✔
665
      char   jsonVd = *(char*)(&pTagVal->i64);
32,130✔
666
      cJSON* value = cJSON_CreateBool(jsonVd);
32,130✔
667
      if (value == NULL) {
32,130✔
668
        goto end;
×
669
      }
670
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
32,130✔
671
        goto end;
×
672
      }
673
    } else {
UNCOV
674
      goto end;
×
675
    }
676
  }
677
  string = cJSON_PrintUnformatted(json);
231,125✔
678
end:
371,365✔
679
  cJSON_Delete(json);
371,365✔
680
  taosArrayDestroy(pTagVals);
371,365✔
681
  if (string == NULL) {
371,365✔
682
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
140,240✔
683
    if (string == NULL) {
140,240✔
684
      qError("failed to strdup null string");
×
685
    }
686
  }
687
  *jsonStr = string;
371,365✔
688
}
689

690
/*
 * Fill colRef as a reference to refDbName.refTableName.refColName for column
 * colId and mark it as having a reference. colName is optional: when NULL the
 * stored column name becomes the empty string. Names are copied with
 * truncation to their field sizes. Always returns TSDB_CODE_SUCCESS.
 *
 * colRef and the three ref* names are not checked for NULL.
 */
int32_t setColRef(SColRef* colRef, col_id_t colId, const char* colName, char* refColName, char* refTableName, char* refDbName) {
  colRef->id = colId;
  colRef->hasRef = true;

  tstrncpy(colRef->refDbName, refDbName, TSDB_DB_NAME_LEN);
  tstrncpy(colRef->refTableName, refTableName, TSDB_TABLE_NAME_LEN);
  tstrncpy(colRef->refColName, refColName, TSDB_COL_NAME_LEN);

  if (NULL == colName) {
    colRef->colName[0] = '\0';
  } else {
    tstrncpy(colRef->colName, colName, TSDB_COL_NAME_LEN);
  }

  return TSDB_CODE_SUCCESS;
}
703

704
/*
 * Duplicate a table meta into a newly allocated block of TABLE_META_FULL_SIZE
 * bytes and fix up its internal pointers with tableMetaResetPointers().
 *
 * A NULL pSrc yields *pDst = NULL and success. When the combined column and
 * tag count is outside [TSDB_MIN_COLUMNS, TSDB_MAX_COL_TAG_NUM] the call fails
 * with TSDB_CODE_INVALID_PARA; allocation failure returns terrno. The caller
 * owns *pDst.
 */
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (pSrc == NULL) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  const int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
  const bool    countInRange = (numOfField >= TSDB_MIN_COLUMNS) && (numOfField <= TSDB_MAX_COL_TAG_NUM);
  if (!countInRange) {
    *pDst = NULL;
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t     metaSize = TABLE_META_FULL_SIZE(pSrc);
  STableMeta* pCopy = taosMemoryMalloc(metaSize);
  *pDst = pCopy;
  if (pCopy == NULL) {
    return terrno;
  }

  memcpy(pCopy, pSrc, metaSize);
  tableMetaResetPointers(pCopy);

  return TSDB_CODE_SUCCESS;
}
728

729
/*
 * Classify the field named pName within pMeta.
 *
 * *pType becomes TCOL_TYPE_COLUMN when the first matching schema entry lies
 * within the first numOfColumns entries, TCOL_TYPE_TAG when it lies after
 * them, and TCOL_TYPE_NONE when no entry matches. Nothing is written when any
 * argument is NULL.
 */
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
  if (NULL == pMeta || NULL == pName || NULL == pType) {
    return;
  }

  const int32_t    nCols = pMeta->tableInfo.numOfColumns;
  const int32_t    nFields = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
  ETableColumnType kind = TCOL_TYPE_NONE;

  for (int32_t idx = 0; idx < nFields; ++idx) {
    if (strcmp(pName, pMeta->schema[idx].name) == 0) {
      kind = (idx < nCols) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
      break;
    }
  }

  *pType = kind;
}
741

742
/* Release a vgroup-info object together with its hash and array. NULL is ignored. */
void freeVgInfo(SDBVgInfo* vgInfo) {
  if (vgInfo != NULL) {
    taosHashCleanup(vgInfo->vgHash);
    taosArrayDestroy(vgInfo->vgArray);

    taosMemoryFreeClear(vgInfo);
  }
}
752

753
/*
 * Duplicate a database vgroup-info object.
 *
 * The scalar fields are copied; vgArray is NOT copied (the copy's vgArray is
 * NULL); when the source has a vgHash, a fresh hash is created and every entry
 * is copied into it.
 *
 * A NULL pSrc yields *pDst = NULL and success. On any failure *pDst is NULL on
 * return and the error captured at the point of failure is returned.
 * Previously a taosHashPut() failure freed the copy but left *pDst pointing at
 * it, and terrno was read only after the cleanup calls had run.
 * The caller owns *pDst and releases it with freeVgInfo().
 */
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == *pDst) {
    return terrno;
  }
  memcpy(*pDst, pSrc, sizeof(*pSrc));
  (*pDst)->vgArray = NULL;

  if (pSrc->vgHash) {
    // replaces the source's hash pointer that the memcpy above carried over
    (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
                                   HASH_ENTRY_LOCK);
    if (NULL == (*pDst)->vgHash) {
      int32_t code = terrno;
      taosMemoryFreeClear(*pDst);
      return code;
    }

    SVgroupInfo* vgInfo = NULL;
    void*        pIter = taosHashIterate(pSrc->vgHash, NULL);
    while (pIter) {
      vgInfo = pIter;
      int32_t* vgId = taosHashGetKey(pIter, NULL);

      if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
        int32_t code = terrno;  // capture before logging/cleanup can overwrite it
        qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
        taosHashCancelIterate(pSrc->vgHash, pIter);
        freeVgInfo(*pDst);
        *pDst = NULL;
        return code;
      }

      pIter = taosHashIterate(pSrc->vgHash, pIter);
    }
  }

  return TSDB_CODE_SUCCESS;
}
794

795
/*
 * Deep-copy a create-table request.
 *
 * Strings (name, comment, ctb.stbName), the tag-name array and the tag blob
 * are duplicated for child tables; for every other table type the schema row
 * (nCols, version, pSchema) is duplicated.
 *
 * A NULL pSrc yields *pDst = NULL and success. On allocation failure the
 * partial copy is destroyed, *pDst is set to NULL and terrno is returned.
 * The caller owns *pDst.
 *
 * Fix: ntb.schemaRow.version is now copied from the source's version; it was
 * mistakenly assigned the source's nCols.
 */
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == *pDst) {
    return terrno;
  }

  (*pDst)->flags = pSrc->flags;
  if (pSrc->name) {
    (*pDst)->name = taosStrdup(pSrc->name);
    if (NULL == (*pDst)->name) goto _exit;
  }
  (*pDst)->uid = pSrc->uid;
  (*pDst)->btime = pSrc->btime;
  (*pDst)->ttl = pSrc->ttl;
  (*pDst)->commentLen = pSrc->commentLen;
  if (pSrc->comment) {
    (*pDst)->comment = taosStrdup(pSrc->comment);
    if (NULL == (*pDst)->comment) goto _exit;
  }
  (*pDst)->type = pSrc->type;

  if (pSrc->type == TSDB_CHILD_TABLE) {
    if (pSrc->ctb.stbName) {
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
    }
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
    (*pDst)->ctb.suid = pSrc->ctb.suid;
    if (pSrc->ctb.tagName) {
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
    }
    STag* pTag = (STag*)pSrc->ctb.pTag;
    if (pTag) {
      // the tag blob records its own total length in pTag->len
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
      if (NULL == (*pDst)->ctb.pTag) goto _exit;
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
    }
  } else {
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.version;
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
    }
  }

  return TSDB_CODE_SUCCESS;

_exit:
  tdDestroySVCreateTbReq(*pDst);
  taosMemoryFree(*pDst);
  *pDst = NULL;
  return terrno;
}
857

858
/* Release a database config object and its pRetensions array. A NULL pInfo is passed straight to taosMemoryFree(). */
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
  if (NULL != pInfo) {
    taosArrayDestroy(pInfo->pRetensions);
  }

  taosMemoryFree(pInfo);
}
16,504,893✔
864

865
/* Expose the callback table (pCb) of the task worker pool as an opaque pointer; may be NULL. */
void* getTaskPoolWorkerCb() {
  void* pCb = taskQueue.wrokrerPool.pCb;
  return pCb;
}
903,278,164✔
866

867
/* Forward declarations; the definitions are not in the visible part of this file. */
void tFreeStreamVtbOtbInfo(void* param);
void tFreeStreamVtbVtbInfo(void* param);
void tFreeStreamVtbDbVgInfo(void* param);
870

871
/*
 * Release a parsed-tags object: the tag-name array, the payload of every
 * variable-length tag value, the tag-value array, the tag index and the
 * object itself. NULL is ignored.
 */
void destroySTagsInfo(STagsInfo* pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->STagNames);

  // only variable-length values have their pData released here
  const int32_t nVals = (int32_t)taosArrayGetSize(pInfo->pTagVals);
  for (int32_t idx = 0; idx < nVals; ++idx) {
    STagVal* pVal = (STagVal*)taosArrayGet(pInfo->pTagVals, idx);
    if (IS_VAR_DATA_TYPE(pVal->type)) {
      taosMemoryFreeClear(pVal->pData);
    }
  }

  taosArrayDestroy(pInfo->pTagVals);
  taosMemoryFreeClear(pInfo->pTagIndex);
  taosMemoryFreeClear(pInfo);
}
887

888
void qDestroyBoundColInfo(void* pInfo) {
2,088,194,401✔
889
  if (NULL == pInfo) {
2,088,194,401✔
890
    return;
979,345✔
891
  }
892
  SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
2,087,215,056✔
893

894
  taosMemoryFreeClear(pBoundInfo->pColIndex);
2,087,215,056✔
895
  destroySTagsInfo(pBoundInfo->parseredTags);
2,087,323,280✔
896
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc