• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.48
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
typedef struct STaskQueue {
28
  SQueryAutoQWorkerPool wrokrerPool;
29
  STaosQueue* pTaskQueue;
30
} STaskQueue;
31

32
// Returns the comparison operator that results from swapping the two operands:
// ">" <-> "<", ">=" <-> "<=", and "=" maps to itself.
// Any other operator yields -1.
int32_t getAsofJoinReverseOp(EOperatorType op) {
  int32_t reversed = -1;

  switch (op) {
    case OP_TYPE_GREATER_THAN:
      reversed = OP_TYPE_LOWER_THAN;
      break;
    case OP_TYPE_GREATER_EQUAL:
      reversed = OP_TYPE_LOWER_EQUAL;
      break;
    case OP_TYPE_LOWER_THAN:
      reversed = OP_TYPE_GREATER_THAN;
      break;
    case OP_TYPE_LOWER_EQUAL:
      reversed = OP_TYPE_GREATER_EQUAL;
      break;
    case OP_TYPE_EQUAL:
      reversed = OP_TYPE_EQUAL;
      break;
    default:
      break;
  }

  return reversed;
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
// Validates `numOfCols` schema entries starting at pSchema.
// For every entry it checks that:
//   1. the data type is accepted by isValidDataType(),
//   2. the byte length is legal for that type,
//   3. no later entry has the same name (compared over sizeof(name) - 1 bytes).
// Returns true only when every entry passes and the sum of all byte lengths
// does not exceed maxLen. A NULL pSchema yields false.
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
  if (NULL == pSchema) {
    return false;
  }

  int32_t totalBytes = 0;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SSchema* pCol = &pSchema[i];

    // 1. the type must be a known data type
    if (!isValidDataType(pCol->type)) {
      qError("The %d col/tag data type error, type:%d", i, pCol->type);
      return false;
    }

    // 2. the length must fit the type: variable-length types have an upper
    //    bound, every other type must match its fixed size exactly
    if (TSDB_DATA_TYPE_BINARY == pCol->type || TSDB_DATA_TYPE_VARBINARY == pCol->type) {
      if (pCol->bytes > TSDB_MAX_BINARY_LEN) {
        qError("The %d col/tag var data len error, type:%d, len:%d", i, pCol->type, pCol->bytes);
        return false;
      }
    } else if (TSDB_DATA_TYPE_NCHAR == pCol->type) {
      if (pCol->bytes > TSDB_MAX_NCHAR_LEN) {
        qError("The %d col/tag nchar data len error, len:%d", i, pCol->bytes);
        return false;
      }
    } else if (TSDB_DATA_TYPE_GEOMETRY == pCol->type) {
      if (pCol->bytes > TSDB_MAX_GEOMETRY_LEN) {
        qError("The %d col/tag geometry data len error, len:%d", i, pCol->bytes);
        return false;
      }
    } else if (IS_STR_DATA_BLOB(pCol->type)) {
      // note: unlike the other variable-length types, the limit itself is rejected here
      if (pCol->bytes >= TSDB_MAX_BLOB_LEN) {
        qError("The %d col/tag blob data len error, len:%d", i, pCol->bytes);
        return false;
      }
    } else if (pCol->bytes != tDataTypes[pCol->type].bytes) {
      qError("The %d col/tag data len error, type:%d, len:%d", i, pCol->type, pCol->bytes);
      return false;
    }

    // 3. the name must not be reused by any entry that follows
    for (int32_t j = i + 1; j < numOfCols; ++j) {
      SSchema* pOther = &pSchema[j];
      if (0 == strncmp(pCol->name, pOther->name, sizeof(pCol->name) - 1)) {
        qError("The %d col/tag name %s is same with %d col/tag name %s", i, pCol->name, j, pOther->name);
        return false;
      }
    }

    totalBytes += pCol->bytes;
  }

  return totalBytes <= maxLen;
}
116

117
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags) {
8,610✔
118
  if (!pSchema || !VALIDNUMOFCOLS(numOfCols)) {
8,610!
119
    qError("invalid numOfCols: %d", numOfCols);
×
120
    return false;
×
121
  }
122

123
  if (!VALIDNUMOFTAGS(numOfTags)) {
8,610!
124
    qError("invalid numOfTags: %d", numOfTags);
×
125
    return false;
×
126
  }
127

128
  /* first column must be the timestamp, which is a primary key */
129
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
8,610!
130
    qError("invalid first column type: %d", pSchema[0].type);
×
131
    return false;
×
132
  }
133

134
  if (!doValidateSchema(pSchema, numOfCols, TSDB_MAX_BYTES_PER_ROW)) {
8,610!
135
    qError("validate schema columns failed");
×
136
    return false;
×
137
  }
138

139
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
8,610!
140
    qError("validate schema tags failed");
×
141
    return false;
×
142
  }
143

144
  return true;
8,610✔
145
}
146

147
static STaskQueue taskQueue = {0};
148

149
// Worker callback for the task queue: runs the function stored in
// pSchedMsg->ahandle with pSchedMsg->thandle as its argument, then releases
// the queue item.
//   pInfo     - not used by this function
//   pSchedMsg - queue item to process; a NULL item is ignored
// The item is released even when it carries no function, so such an item is
// not leaked.
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
  if (NULL == pSchedMsg) return;

  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
  if (NULL != execFn) {
    // the task's own return value is intentionally discarded
    (void)execFn(pSchedMsg->thandle);
  }

  taosFreeQitem(pSchedMsg);
}
155

156
int32_t initTaskQueue() {
2,065✔
157
  memset(&taskQueue, 0, sizeof(taskQueue));
2,065✔
158
  
159
  taskQueue.wrokrerPool.name = "taskWorkPool";
2,065✔
160
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
2,065✔
161
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
2,065✔
162
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
2,065✔
163
  if (TSDB_CODE_SUCCESS != coce) {
2,065!
164
    qError("failed to init task thread pool");
×
165
    return -1;
×
166
  }
167

168
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
2,065✔
169
  if (NULL == taskQueue.pTaskQueue) {
2,065!
170
    qError("failed to init task queue");
×
171
    return -1;
×
172
  }
173

174
  qInfo("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
2,065!
175
  return 0;
2,065✔
176
}
177

178
int32_t cleanupTaskQueue() {
2,066✔
179
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
2,066✔
180
  return 0;
2,066✔
181
}
182

183
// Schedules execFn(execParam) on the task queue.
//   execFn    - function to run; stored in the item's `ahandle`
//   execParam - argument handed to execFn; stored in `thandle`
//   code      - stored in the item's `msg` field; not otherwise used here
// Returns the allocation error if the queue item cannot be allocated,
// otherwise the result of taosWriteQitem().
// NOTE(review): the item is not released here when taosWriteQitem() fails;
// confirm who owns it on that path.
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
  SSchedMsg* pItem;

  int32_t allocCode = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pItem);
  if (allocCode != 0) {
    return allocCode;
  }

  pItem->msg = code;
  pItem->thandle = execParam;
  pItem->ahandle = execFn;
  pItem->fp = NULL;

  return taosWriteQitem(taskQueue.pTaskQueue, pItem);
}
194

195
int32_t taosAsyncWait() {
17✔
196
  if (!taskQueue.wrokrerPool.pCb) {
17!
197
    qError("query task thread pool callback function is null");
×
198
    return -1;
×
199
  }
200
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
17✔
201
}
202

203
int32_t taosAsyncRecover() {
17✔
204
  if (!taskQueue.wrokrerPool.pCb) {
17!
205
    qError("query task thread pool callback function is null");
×
206
    return -1;
×
207
  }
208
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
17✔
209
}
210

211
// Schedules bindFn(bindParam) on the task queue.
// This is the same operation as taosAsyncExec() without a result-code pointer,
// so it delegates to it; the queue item's `msg` field is thereby set to NULL
// instead of being left unassigned.
// Returns whatever taosAsyncExec() returns.
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
  return taosAsyncExec(bindFn, bindParam, NULL);
}
222

223
// Releases a send-message descriptor and everything it holds:
// target.dbFName, msgInfo.pData, the user parameter (through paramFreeFp when
// one is set), and finally the descriptor itself. NULL is accepted and ignored.
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  if (pMsgBody == NULL) {
    return;
  }

  qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);

  taosMemoryFreeClear(pMsgBody->target.dbFName);
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);

  // the parameter is only released when the owner supplied a destructor
  if (pMsgBody->paramFreeFp != NULL) {
    (*pMsgBody->paramFreeFp)(pMsgBody->param);
  }

  taosMemoryFreeClear(pMsgBody);
}
238
void destroyAhandle(void *ahandle) {
87,578✔
239
  SMsgSendInfo *pSendInfo = ahandle;
87,578✔
240
  if (pSendInfo == NULL) return;
87,578!
241

242
  if (pSendInfo->streamAHandle) {
87,578✔
243
    qDebug("stream ahandle %p freed", pSendInfo);
50,876✔
244
  }
245

246
  destroySendMsgInfo(pSendInfo);
87,578✔
247
}
248

249
// Copies pInfo's payload into an RPC buffer and sends it as a request.
//   pTransporter   - RPC transport handle
//   epSet          - target endpoint set
//   pTransporterId - forwarded to rpcSendRequestWithCtx()
//   pInfo          - message descriptor; becomes the request's `ahandle`
//   persistHandle  - copied into the request info
//   rpcCtx         - forwarded to rpcSendRequestWithCtx()
// pInfo is destroyed here on every failure path (invalid arguments, buffer
// allocation failure, send failure). Returns TSDB_CODE_TSC_INVALID_INPUT for
// NULL arguments, terrno when the buffer cannot be allocated, otherwise the
// send result.
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* rpcCtx) {
  if (pTransporter == NULL || epSet == NULL || pInfo == NULL) {
    destroySendMsgInfo(pInfo);
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  char* pCont = rpcMallocCont(pInfo->msgInfo.len);
  if (pCont == NULL) {
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
    destroySendMsgInfo(pInfo);
    return terrno;
  }
  memcpy(pCont, pInfo->msgInfo.pData, pInfo->msgInfo.len);

  SRpcMsg rpcMsg = {
    .msgType = pInfo->msgType,
    .pCont = pCont,
    .contLen = pInfo->msgInfo.len,
    .info.ahandle = (void*)pInfo,
    .info.handle = pInfo->msgInfo.handle,
    .info.persistHandle = persistHandle,
    .code = 0
  };
  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
  TRACE_SET_MSGID(&rpcMsg.info.traceId, tGenIdPI64());

  // the message type is captured before sending; the log below uses this copy
  // rather than reading pInfo again after the request has been handed over
  int32_t msgType = pInfo->msgType;

  int sendCode = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
  if (0 == sendCode) {
    qDebug("msg %s sent, 0x%" PRIx64 ":0x%" PRIx64, TMSG_INFO(msgType), TRACE_GET_ROOTID(&rpcMsg.info.traceId), TRACE_GET_MSGID(&rpcMsg.info.traceId));
  } else {
    destroySendMsgInfo(pInfo);
  }

  return sendCode;
}
285

286
// Convenience wrapper around asyncSendMsgToServerExt() that sends without a
// persistent handle and without an RPC context.
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  const bool persistHandle = false;
  void*      rpcCtx = NULL;

  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, persistHandle, rpcCtx);
}
289
// Releases the connection identified by `pid` on the given transport by
// forwarding to rpcFreeConnById(). pTransporter is validated with
// QUERY_PARAM_CHECK first.
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
  QUERY_PARAM_CHECK(pTransporter);

  int32_t code = rpcFreeConnById(pTransporter, pid);
  return code;
}
293

294
char* jobTaskStatusStr(int32_t status) {
10,379,298✔
295
  switch (status) {
10,379,298!
296
    case JOB_TASK_STATUS_NULL:
8,438✔
297
      return "NULL";
8,438✔
298
    case JOB_TASK_STATUS_INIT:
2,961,643✔
299
      return "INIT";
2,961,643✔
300
    case JOB_TASK_STATUS_EXEC:
3,007,693✔
301
      return "EXECUTING";
3,007,693✔
302
    case JOB_TASK_STATUS_PART_SUCC:
2,928,525✔
303
      return "PARTIAL_SUCCEED";
2,928,525✔
304
    case JOB_TASK_STATUS_FETCH:
13,149✔
305
      return "FETCHING";
13,149✔
306
    case JOB_TASK_STATUS_SUCC:
1,449,927✔
307
      return "SUCCEED";
1,449,927✔
308
    case JOB_TASK_STATUS_FAIL:
643✔
309
      return "FAILED";
643✔
310
    case JOB_TASK_STATUS_DROP:
9,354✔
311
      return "DROPPING";
9,354✔
312
    default:
×
313
      break;
×
314
  }
315

316
  return "UNKNOWN";
×
317
}
318

319
#if 0
// Compiled out. Builds an SSchema by value from its type, byte length, column
// id and name; the name is copied with tstrncpy bounded by the field size.
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
  SSchema schema = {.type = type, .bytes = bytes, .colId = colId};

  tstrncpy(schema.name, name, tListLen(schema.name));
  return schema;
}
#endif
330

331
// Element destructor for containers that store pointers to table-meta
// responses: `p` is the address of such a pointer. The response's contents
// are released, then the response itself is freed and the stored pointer
// cleared.
void freeSTableMetaRspPointer(void *p) {
  void** ppRsp = (void**)p;

  tFreeSTableMetaRsp(*ppRsp);
  taosMemoryFreeClear(*ppRsp);
}
29,615✔
335

336
// Releases the payload stored in pRes->res according to pRes->msgType and
// leaves pRes->res set to NULL for every handled type, so calling this again
// on the same result is harmless.
//   - TDMT_VND_CREATE_TABLE: array whose elements are released with
//     freeSTableMetaRspPointer()
//   - TDMT_MND_CREATE_STB / TDMT_VND_ALTER_TABLE / TDMT_MND_ALTER_STB:
//     a single table-meta response
//   - TDMT_VND_SUBMIT: a submit response
//   - TDMT_SCH_QUERY / TDMT_SCH_MERGE_QUERY: a plain array
// Any other type is only logged; its payload is left untouched.
// A NULL pRes or a NULL payload is ignored.
void destroyQueryExecRes(SExecResult* pRes) {
  if (NULL == pRes || NULL == pRes->res) {
    return;
  }

  switch (pRes->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
      pRes->res = NULL;  // do not leave a dangling pointer behind
      break;
    }
    case TDMT_MND_CREATE_STB:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      tFreeSTableMetaRsp(pRes->res);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_VND_SUBMIT: {
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      taosArrayDestroy((SArray*)pRes->res);
      pRes->res = NULL;  // do not leave a dangling pointer behind
      break;
    }
    default:
      qError("invalid exec result for request type:%d", pRes->msgType);
  }
}
367
// clang-format on
368
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
107✔
369
  QUERY_PARAM_CHECK(str);
107!
370
  QUERY_PARAM_CHECK(buf);
107!
371
  int32_t n = 0;
107✔
372

373
  switch (type) {
107!
374
    case TSDB_DATA_TYPE_NULL:
×
375
      n = tsnprintf(str, capacity, "null");
×
376
      break;
×
377

378
    case TSDB_DATA_TYPE_BOOL:
1✔
379
      n = tsnprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
1!
380
      break;
1✔
381

382
    case TSDB_DATA_TYPE_TINYINT:
×
383
      n = tsnprintf(str, capacity, "%d", *(int8_t*)buf);
×
384
      break;
×
385

386
    case TSDB_DATA_TYPE_SMALLINT:
×
387
      n = tsnprintf(str, capacity, "%d", *(int16_t*)buf);
×
388
      break;
×
389

390
    case TSDB_DATA_TYPE_INT:
5✔
391
      n = tsnprintf(str, capacity, "%d", *(int32_t*)buf);
5✔
392
      break;
5✔
393

394
    case TSDB_DATA_TYPE_BIGINT:
×
395
    case TSDB_DATA_TYPE_TIMESTAMP:
396
      n = tsnprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
×
397
      break;
×
398

399
    case TSDB_DATA_TYPE_FLOAT:
1✔
400
      n = tsnprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
1✔
401
      break;
1✔
402

403
    case TSDB_DATA_TYPE_DOUBLE:
1✔
404
      n = tsnprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
1✔
405
      break;
1✔
406

407
    case TSDB_DATA_TYPE_VARBINARY: {
×
408
      if (bufSize < 0) {
×
409
        //        tscError("invalid buf size");
410
        return TSDB_CODE_TSC_INVALID_VALUE;
×
411
      }
412
      void*    data = NULL;
×
413
      uint32_t size = 0;
×
414
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
415
        return TSDB_CODE_OUT_OF_MEMORY;
×
416
      }
417
      *str = '"';
×
418
      memcpy(str + 1, data, size);
×
419
      *(str + size + 1) = '"';
×
420
      n = size + 2;
×
421
      taosMemoryFree(data);
×
422
      break;
×
423
    }
424
    case TSDB_DATA_TYPE_BINARY:
2✔
425
    case TSDB_DATA_TYPE_GEOMETRY:
426
      if (bufSize < 0) {
2!
427
        //        tscError("invalid buf size");
428
        return TSDB_CODE_TSC_INVALID_VALUE;
×
429
      }
430

431
      *str = '"';
2✔
432
      memcpy(str + 1, buf, bufSize);
2✔
433
      *(str + bufSize + 1) = '"';
2✔
434
      n = bufSize + 2;
2✔
435
      break;
2✔
436
    case TSDB_DATA_TYPE_NCHAR:
97✔
437
      if (bufSize < 0) {
97!
438
        //        tscError("invalid buf size");
439
        return TSDB_CODE_TSC_INVALID_VALUE;
×
440
      }
441

442
      *str = '"';
97✔
443
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1, NULL);
97✔
444
      if (length <= 0) {
97!
445
        return TSDB_CODE_TSC_INVALID_VALUE;
×
446
      }
447
      *(str + length + 1) = '"';
97✔
448
      n = length + 2;
97✔
449
      break;
97✔
450
    case TSDB_DATA_TYPE_UTINYINT:
×
451
      n = tsnprintf(str, capacity, "%d", *(uint8_t*)buf);
×
452
      break;
×
453

454
    case TSDB_DATA_TYPE_USMALLINT:
×
455
      n = tsnprintf(str, capacity, "%d", *(uint16_t*)buf);
×
456
      break;
×
457

458
    case TSDB_DATA_TYPE_UINT:
×
459
      n = tsnprintf(str, capacity, "%u", *(uint32_t*)buf);
×
460
      break;
×
461

462
    case TSDB_DATA_TYPE_UBIGINT:
×
463
      n = tsnprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
×
464
      break;
×
465

466
    default:
×
467
      //      tscError("unsupported type:%d", type);
468
      return TSDB_CODE_TSC_INVALID_VALUE;
×
469
  }
470

471
  if (len) *len = n;
107✔
472

473
  return TSDB_CODE_SUCCESS;
107✔
474
}
475

476
void parseTagDatatoJson(void* p, char** jsonStr, void* charsetCxt) {
203✔
477
  if (!p || !jsonStr) {
203!
478
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
479
    return;
×
480
  }
481
  char*   string = NULL;
203✔
482
  SArray* pTagVals = NULL;
203✔
483
  cJSON*  json = NULL;
203✔
484
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
203!
485
    goto end;
×
486
  }
487

488
  int16_t nCols = taosArrayGetSize(pTagVals);
203✔
489
  if (nCols == 0) {
203✔
490
    goto end;
173✔
491
  }
492
  char tagJsonKey[256] = {0};
30✔
493
  json = cJSON_CreateObject();
30✔
494
  if (json == NULL) {
30!
495
    goto end;
×
496
  }
497
  for (int j = 0; j < nCols; ++j) {
85✔
498
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
55✔
499
    if (pTagVal == NULL) {
55!
500
      continue;
×
501
    }
502
    // json key  encode by binary
503
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
55✔
504
    // json value
505
    char type = pTagVal->type;
55✔
506
    if (type == TSDB_DATA_TYPE_NULL) {
55!
507
      cJSON* value = cJSON_CreateNull();
×
508
      if (value == NULL) {
×
509
        goto end;
×
510
      }
511
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
×
512
        goto end;
×
513
      }
514
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
55✔
515
      cJSON* value = NULL;
45✔
516
      if (pTagVal->nData > 0) {
45!
517
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
45!
518
        if (tagJsonValue == NULL) {
45!
519
          goto end;
×
520
        }
521
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue, charsetCxt);
45✔
522
        if (length < 0) {
45!
523
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC,
×
524
                 charsetCxt != NULL ? ((SConvInfo*)(charsetCxt))->charset : tsCharset, pTagVal->pData);
525
          taosMemoryFree(tagJsonValue);
×
526
          goto end;
×
527
        }
528
        value = cJSON_CreateString(tagJsonValue);
45✔
529
        taosMemoryFree(tagJsonValue);
45!
530
        if (value == NULL) {
45!
531
          goto end;
×
532
        }
533
      } else if (pTagVal->nData == 0) {
×
534
        value = cJSON_CreateString("");
×
535
        if (value == NULL) {
×
536
          goto end;
×
537
        }
538
      } else {
539
        goto end;
×
540
      }
541

542
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
45!
543
        goto end;
×
544
      }
545
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
10!
546
      double jsonVd = *(double*)(&pTagVal->i64);
10✔
547
      cJSON* value = cJSON_CreateNumber(jsonVd);
10✔
548
      if (value == NULL) {
10!
549
        goto end;
×
550
      }
551
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
10!
552
        goto end;
×
553
      }
554
    } else if (type == TSDB_DATA_TYPE_BOOL) {
×
555
      char   jsonVd = *(char*)(&pTagVal->i64);
×
556
      cJSON* value = cJSON_CreateBool(jsonVd);
×
557
      if (value == NULL) {
×
558
        goto end;
×
559
      }
560
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
×
561
        goto end;
×
562
      }
563
    } else {
564
      goto end;
×
565
    }
566
  }
567
  string = cJSON_PrintUnformatted(json);
30✔
568
end:
203✔
569
  cJSON_Delete(json);
203✔
570
  taosArrayDestroy(pTagVals);
203✔
571
  if (string == NULL) {
203✔
572
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
173!
573
    if (string == NULL) {
173!
574
      qError("failed to strdup null string");
×
575
    }
576
  }
577
  *jsonStr = string;
203✔
578
}
579

580
// Fills a column reference: stores the column id, marks it as having a
// reference, and copies the referenced column, table and database names
// (each bounded by its TSDB_*_NAME_LEN limit). Always returns
// TSDB_CODE_SUCCESS. No argument is checked for NULL.
int32_t setColRef(SColRef* colRef, col_id_t colId, char* refColName, char* refTableName, char* refDbName) {
  colRef->hasRef = true;
  colRef->id = colId;

  tstrncpy(colRef->refColName, refColName, TSDB_COL_NAME_LEN);
  tstrncpy(colRef->refTableName, refTableName, TSDB_TABLE_NAME_LEN);
  tstrncpy(colRef->refDbName, refDbName, TSDB_DB_NAME_LEN);

  return TSDB_CODE_SUCCESS;
}
588

589
// Deep-copies a table meta into one freshly allocated block.
// The block holds, in order: the STableMeta header with its schema entries,
// then the extended schema (when the table type has one and pSrc carries it),
// then the column references (likewise). The copy's schemaExt / colRef
// pointers point into that same block, or are NULL when the part is absent.
//   pSrc - meta to copy; NULL produces *pDst == NULL and success
//   pDst - receives the copy; set to NULL on failure
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA when the combined
// column+tag count is out of range, or terrno when allocation fails.
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (pSrc == NULL) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
  if (numOfField > TSDB_MAX_COL_TAG_NUM || numOfField < TSDB_MIN_COLUMNS) {
    *pDst = NULL;
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
    return TSDB_CODE_INVALID_PARA;
  }

  // decide once which optional trailing parts have to be copied
  bool copyExt = withExtSchema(pSrc->tableType) && pSrc->schemaExt;
  bool copyRef = hasRefCol(pSrc->tableType) && pSrc->colRef;

  int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
  int32_t schemaExtSize = 0;
  int32_t colRefSize = 0;
  if (copyExt) {
    schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
  }
  if (copyRef) {
    colRefSize = pSrc->numOfColRefs * sizeof(SColRef);
  }

  STableMeta* pClone = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize);
  *pDst = pClone;
  if (pClone == NULL) {
    return terrno;
  }

  // header + schema entries; the copied pointer members are fixed up below
  memcpy(pClone, pSrc, metaSize);

  if (copyExt) {
    pClone->schemaExt = (SSchemaExt*)((char*)pClone + metaSize);
    memcpy(pClone->schemaExt, pSrc->schemaExt, schemaExtSize);
  } else {
    pClone->schemaExt = NULL;
  }

  if (copyRef) {
    pClone->colRef = (SColRef*)((char*)pClone + metaSize + schemaExtSize);
    memcpy(pClone->colRef, pSrc->colRef, colRefSize);
  } else {
    pClone->colRef = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
632

633
// Looks up `pName` among the schema entries of `pMeta` and reports through
// `pType` whether it is a column (index below numOfColumns), a tag, or not
// present at all (TCOL_TYPE_NONE). Does nothing if any argument is NULL.
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
  if (pMeta == NULL || pName == NULL || pType == NULL) return;

  int32_t numOfColumns = pMeta->tableInfo.numOfColumns;
  int32_t total = pMeta->tableInfo.numOfTags + numOfColumns;

  ETableColumnType found = TCOL_TYPE_NONE;
  for (int32_t idx = 0; idx < total; ++idx) {
    if (strcmp(pName, pMeta->schema[idx].name) != 0) {
      continue;
    }
    // columns come first in the schema array, tags after them
    found = (idx < numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
    break;
  }

  *pType = found;
}
645

646
// Releases a database vgroup info object: its vgroup hash, its vgroup array,
// and then the object itself. NULL is accepted and ignored.
void freeVgInfo(SDBVgInfo* vgInfo) {
  if (vgInfo != NULL) {
    taosHashCleanup(vgInfo->vgHash);
    taosArrayDestroy(vgInfo->vgArray);

    taosMemoryFreeClear(vgInfo);
  }
}
656

657
// Copies a database vgroup info object.
// The struct is copied bytewise, the copy's vgArray is reset to NULL, and,
// when the source has a vgroup hash, a new hash is built and every entry is
// inserted into it under the same key.
//   pSrc - object to copy; NULL produces *pDst == NULL and success
//   pDst - receives the copy; always NULL when an error is returned
// Returns TSDB_CODE_SUCCESS, or the terrno value observed at the point of
// failure (allocation, hash creation, or hash insertion).
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == *pDst) {
    return terrno;
  }
  memcpy(*pDst, pSrc, sizeof(*pSrc));
  (*pDst)->vgArray = NULL;

  if (pSrc->vgHash) {
    (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
                                   HASH_ENTRY_LOCK);
    if (NULL == (*pDst)->vgHash) {
      int32_t code = terrno;  // read before cleanup can touch it
      taosMemoryFreeClear(*pDst);
      return code;
    }

    SVgroupInfo* vgInfo = NULL;
    void*        pIter = taosHashIterate(pSrc->vgHash, NULL);
    while (pIter) {
      vgInfo = pIter;
      int32_t* vgId = taosHashGetKey(pIter, NULL);

      if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
        int32_t code = terrno;  // read before logging/cleanup can touch it
        qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
        taosHashCancelIterate(pSrc->vgHash, pIter);
        freeVgInfo(*pDst);
        *pDst = NULL;  // the copy is gone; do not hand a dangling pointer back
        return code;
      }

      pIter = taosHashIterate(pSrc->vgHash, pIter);
    }
  }

  return TSDB_CODE_SUCCESS;
}
698

699
// Deep-copies a create-table request.
// Scalar members are copied directly; name, comment and (for child tables)
// the super table name are duplicated; the tag-name array and the tag blob are
// copied for child tables; for every other table type the schema row
// (column count, version, schema entries) is copied.
//   pSrc - request to copy; NULL produces *pDst == NULL and success
//   pDst - receives the copy; set to NULL on failure
// Returns TSDB_CODE_SUCCESS, or the terrno value observed when an allocation
// fails (the partially built copy is destroyed first).
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == *pDst) {
    return terrno;
  }

  (*pDst)->flags = pSrc->flags;
  if (pSrc->name) {
    (*pDst)->name = taosStrdup(pSrc->name);
    if (NULL == (*pDst)->name) goto _exit;
  }
  (*pDst)->uid = pSrc->uid;
  (*pDst)->btime = pSrc->btime;
  (*pDst)->ttl = pSrc->ttl;
  (*pDst)->commentLen = pSrc->commentLen;
  if (pSrc->comment) {
    (*pDst)->comment = taosStrdup(pSrc->comment);
    if (NULL == (*pDst)->comment) goto _exit;
  }
  (*pDst)->type = pSrc->type;

  if (pSrc->type == TSDB_CHILD_TABLE) {
    if (pSrc->ctb.stbName) {
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
    }
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
    (*pDst)->ctb.suid = pSrc->ctb.suid;
    if (pSrc->ctb.tagName) {
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
    }
    STag* pTag = (STag*)pSrc->ctb.pTag;
    if (pTag) {
      // the tag blob carries its own total length
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
      if (NULL == (*pDst)->ctb.pTag) goto _exit;
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
    }
  } else {
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
    // the schema version is its own field; it must not be taken from nCols
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.version;
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
    }
  }

  return TSDB_CODE_SUCCESS;

_exit:;
  int32_t code = terrno;  // read before the cleanup calls can touch it
  tdDestroySVCreateTbReq(*pDst);
  taosMemoryFree(*pDst);
  *pDst = NULL;
  return code;
}
761

762
// Releases a database config object together with its pRetensions array.
// For a NULL pInfo only taosMemoryFree(NULL) is invoked.
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
  if (pInfo != NULL) {
    taosArrayDestroy(pInfo->pRetensions);
  }

  taosMemoryFree(pInfo);
}
29,997✔
768

769
// Returns the callback table of the task queue's worker pool as an opaque pointer.
void* getTaskPoolWorkerCb() {
  return taskQueue.wrokrerPool.pCb;
}
137,749✔
770

771
void tFreeStreamVtbOtbInfo(void* param);
772
void tFreeStreamVtbVtbInfo(void* param);
773
void tFreeStreamVtbDbVgInfo(void* param);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc