• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

42.5
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
typedef struct STaskQueue {
28
  SQueryAutoQWorkerPool wrokrerPool;
29
  STaosQueue* pTaskQueue;
30
} STaskQueue;
31

32
int32_t getAsofJoinReverseOp(EOperatorType op) {
×
33
  switch (op) {
×
34
    case OP_TYPE_GREATER_THAN:
×
35
      return OP_TYPE_LOWER_THAN;
×
36
    case OP_TYPE_GREATER_EQUAL:
×
37
      return OP_TYPE_LOWER_EQUAL;
×
38
    case OP_TYPE_LOWER_THAN:
×
39
      return OP_TYPE_GREATER_THAN;
×
40
    case OP_TYPE_LOWER_EQUAL:
×
41
      return OP_TYPE_GREATER_EQUAL;
×
42
    case OP_TYPE_EQUAL:
×
43
      return OP_TYPE_EQUAL;
×
44
    default:
×
45
      break;
×
46
  }
47

48
  return -1;
×
49
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
1,824✔
62
  if (!pSchema) {
1,824!
63
    return false;
×
64
  }
65
  int32_t rowLen = 0;
1,824✔
66

67
  for (int32_t i = 0; i < numOfCols; ++i) {
20,989✔
68
    // 1. valid types
69
    if (!isValidDataType(pSchema[i].type)) {
19,164!
70
      qError("The %d col/tag data type error, type:%d", i, pSchema[i].type);
×
71
      return false;
×
72
    }
73

74
    // 2. valid length for each type
75
    if (pSchema[i].type == TSDB_DATA_TYPE_BINARY || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY) {
19,165✔
76
      if (pSchema[i].bytes > TSDB_MAX_BINARY_LEN) {
2,030!
77
        qError("The %d col/tag var data len error, type:%d, len:%d", i, pSchema[i].type, pSchema[i].bytes);
×
78
        return false;
×
79
      }
80
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
17,135✔
81
      if (pSchema[i].bytes > TSDB_MAX_NCHAR_LEN) {
793!
82
        qError("The %d col/tag nchar data len error, len:%d", i, pSchema[i].bytes);
×
83
        return false;
×
84
      }
85
    } else if (pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
16,342✔
86
      if (pSchema[i].bytes > TSDB_MAX_GEOMETRY_LEN) {
287!
87
        qError("The %d col/tag geometry data len error, len:%d", i, pSchema[i].bytes);
×
88
        return false;
×
89
      }
90
    } else {
91
      if (pSchema[i].bytes != tDataTypes[pSchema[i].type].bytes) {
16,055!
92
        qError("The %d col/tag data len error, type:%d, len:%d", i, pSchema[i].type, pSchema[i].bytes);
×
93
        return false;
×
94
      }
95
    }
96

97
    // 3. valid column names
98
    for (int32_t j = i + 1; j < numOfCols; ++j) {
16,835,670✔
99
      if (strncmp(pSchema[i].name, pSchema[j].name, sizeof(pSchema[i].name) - 1) == 0) {
16,816,505!
100
        qError("The %d col/tag name %s is same with %d col/tag name %s", i, pSchema[i].name, j, pSchema[j].name);
×
101
        return false;
×
102
      }
103
    }
104

105
    rowLen += pSchema[i].bytes;
19,165✔
106
  }
107

108
  return rowLen <= maxLen;
1,825✔
109
}
110

111
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags) {
912✔
112
  if (!pSchema || !VALIDNUMOFCOLS(numOfCols)) {
912!
113
    qError("invalid numOfCols: %d", numOfCols);
×
114
    return false;
×
115
  }
116

117
  if (!VALIDNUMOFTAGS(numOfTags)) {
912!
118
    qError("invalid numOfTags: %d", numOfTags);
×
119
    return false;
×
120
  }
121

122
  /* first column must be the timestamp, which is a primary key */
123
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
912!
124
    qError("invalid first column type: %d", pSchema[0].type);
×
125
    return false;
×
126
  }
127

128
  if (!doValidateSchema(pSchema, numOfCols, TSDB_MAX_BYTES_PER_ROW)) {
912!
129
    qError("validate schema columns failed");
×
130
    return false;
×
131
  }
132

133
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
912!
134
    qError("validate schema tags failed");
×
135
    return false;
×
136
  }
137

138
  return true;
912✔
139
}
140

141
static STaskQueue taskQueue = {0};
142

143
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
178,291✔
144
  if(!pSchedMsg || !pSchedMsg->ahandle) return;
178,291!
145
  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
178,291✔
146
  (void)execFn(pSchedMsg->thandle);
178,291✔
147
  taosFreeQitem(pSchedMsg);
178,282✔
148
}
149

150
int32_t initTaskQueue() {
404✔
151
  memset(&taskQueue, 0, sizeof(taskQueue));
404✔
152
  
153
  taskQueue.wrokrerPool.name = "taskWorkPool";
404✔
154
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
404✔
155
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
404✔
156
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
404✔
157
  if (TSDB_CODE_SUCCESS != coce) {
404!
158
    qError("failed to init task thread pool");
×
159
    return -1;
×
160
  }
161

162
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
404✔
163
  if (NULL == taskQueue.pTaskQueue) {
404!
164
    qError("failed to init task queue");
×
165
    return -1;
×
166
  }
167

168
  qInfo("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
404!
169
  return 0;
404✔
170
}
171

172
int32_t cleanupTaskQueue() {
404✔
173
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
404✔
174
  return 0;
404✔
175
}
176

177
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
178,275✔
178
  SSchedMsg* pSchedMsg; 
179
  int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
178,275✔
180
  if (rc) return rc;
178,276!
181
  pSchedMsg->fp = NULL;
178,276✔
182
  pSchedMsg->ahandle = execFn;
178,276✔
183
  pSchedMsg->thandle = execParam;
178,276✔
184
  pSchedMsg->msg = code;
178,276✔
185

186
  return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
178,276✔
187
}
188

189
int32_t taosAsyncWait() {
×
190
  if (!taskQueue.wrokrerPool.pCb) {
×
191
    qError("query task thread pool callback function is null");
×
192
    return -1;
×
193
  }
194
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
×
195
}
196

197
int32_t taosAsyncRecover() {
×
198
  if (!taskQueue.wrokrerPool.pCb) {
×
199
    qError("query task thread pool callback function is null");
×
200
    return -1;
×
201
  }
202
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
×
203
}
204

205
int32_t taosStmt2AsyncBind(__async_exec_fn_t bindFn, void* bindParam) {
×
206
  SSchedMsg* pSchedMsg;
207
  int32_t rc = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void **)&pSchedMsg);
×
208
  if (rc) return rc;
×
209
  pSchedMsg->fp = NULL;
×
210
  pSchedMsg->ahandle = bindFn;
×
211
  pSchedMsg->thandle = bindParam;
×
212
  // pSchedMsg->msg = code;
213

214
  return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
×
215
}
216

217
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
201,327✔
218
  if (NULL == pMsgBody) {
201,327!
219
    return;
×
220
  }
221

222
  
223
  qDebug("ahandle %p freed, QID:0x%" PRIx64, pMsgBody, pMsgBody->requestId);
201,327✔
224
  
225
  taosMemoryFreeClear(pMsgBody->target.dbFName);
201,328!
226
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
201,325!
227
  if (pMsgBody->paramFreeFp) {
201,318✔
228
    (*pMsgBody->paramFreeFp)(pMsgBody->param);
195,674✔
229
  }
230
  taosMemoryFreeClear(pMsgBody);
201,312!
231
}
232
void destroyAhandle(void *ahandle) {
8,713✔
233
  SMsgSendInfo *pSendInfo = ahandle;
8,713✔
234
  if (pSendInfo == NULL) return;
8,713!
235

236
  destroySendMsgInfo(pSendInfo);
8,713✔
237
}
238

239
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
161,325✔
240
                                bool persistHandle, void* rpcCtx) {                         
241
  QUERY_PARAM_CHECK(pTransporter);
161,325!
242
  QUERY_PARAM_CHECK(epSet);
161,325!
243
  QUERY_PARAM_CHECK(pInfo);
161,325!
244

245
  char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
161,325✔
246
  if (NULL == pMsg) {
161,338!
247
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
×
248
    destroySendMsgInfo(pInfo);
×
249
    return terrno;
×
250
  }
251

252
  memcpy(pMsg, pInfo->msgInfo.pData, pInfo->msgInfo.len);
161,338✔
253
  SRpcMsg rpcMsg = {
161,338✔
254
    .msgType = pInfo->msgType,
161,338✔
255
    .pCont = pMsg,
256
    .contLen = pInfo->msgInfo.len,
161,338✔
257
    .info.ahandle = (void*)pInfo,
258
    .info.handle = pInfo->msgInfo.handle,
161,338✔
259
    .info.persistHandle = persistHandle,
260
    .code = 0
261
  };
262
  TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
161,338✔
263

264
  int code = rpcSendRequestWithCtx(pTransporter, epSet, &rpcMsg, pTransporterId, rpcCtx);
161,338✔
265
  if (code) {
161,371✔
266
    destroySendMsgInfo(pInfo);
24✔
267
  }
268
  return code;
161,377✔
269
}
270

271
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
22,876✔
272
  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL);
22,876✔
273
}
274
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
1,637✔
275
  QUERY_PARAM_CHECK(pTransporter);
1,637!
276
  return rpcFreeConnById(pTransporter, pid);
1,637✔
277
}
278

279
char* jobTaskStatusStr(int32_t status) {
462,448✔
280
  switch (status) {
462,448!
281
    case JOB_TASK_STATUS_NULL:
49,962✔
282
      return "NULL";
49,962✔
283
    case JOB_TASK_STATUS_INIT:
101,194✔
284
      return "INIT";
101,194✔
285
    case JOB_TASK_STATUS_EXEC:
151,985✔
286
      return "EXECUTING";
151,985✔
287
    case JOB_TASK_STATUS_PART_SUCC:
100,996✔
288
      return "PARTIAL_SUCCEED";
100,996✔
289
    case JOB_TASK_STATUS_FETCH:
3,332✔
290
      return "FETCHING";
3,332✔
291
    case JOB_TASK_STATUS_SUCC:
4,241✔
292
      return "SUCCEED";
4,241✔
293
    case JOB_TASK_STATUS_FAIL:
591✔
294
      return "FAILED";
591✔
295
    case JOB_TASK_STATUS_DROP:
50,147✔
296
      return "DROPPING";
50,147✔
297
    default:
×
298
      break;
×
299
  }
300

301
  return "UNKNOWN";
×
302
}
303

304
#if 0
305
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
306
  SSchema s = {0};
307
  s.type = type;
308
  s.bytes = bytes;
309
  s.colId = colId;
310

311
  tstrncpy(s.name, name, tListLen(s.name));
312
  return s;
313
}
314
#endif
315

316
void freeSTableMetaRspPointer(void *p) {
5,449✔
317
  tFreeSTableMetaRsp(*(void**)p);
5,449✔
318
  taosMemoryFreeClear(*(void**)p);
5,448!
319
}
5,449✔
320

321
void destroyQueryExecRes(SExecResult* pRes) {
340,069✔
322
  if (NULL == pRes || NULL == pRes->res) {
340,069!
323
    return;
229,451✔
324
  }
325

326
  switch (pRes->msgType) {
110,618✔
327
    case TDMT_VND_CREATE_TABLE: {
3,470✔
328
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
3,470✔
329
      break;
3,470✔
330
    }
331
    case TDMT_MND_CREATE_STB:
302✔
332
    case TDMT_VND_ALTER_TABLE:
333
    case TDMT_MND_ALTER_STB: {
334
      tFreeSTableMetaRsp(pRes->res);
302✔
335
      taosMemoryFreeClear(pRes->res);
302!
336
      break;
302✔
337
    }
338
    case TDMT_VND_SUBMIT: {
101,323✔
339
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
101,323✔
340
      taosMemoryFreeClear(pRes->res);
101,334!
341
      break;
101,337✔
342
    }
343
    case TDMT_SCH_QUERY:
5,510✔
344
    case TDMT_SCH_MERGE_QUERY: {
345
      taosArrayDestroy((SArray*)pRes->res);
5,510✔
346
      break;
5,510✔
347
    }
348
    default:
13✔
349
      qError("invalid exec result for request type:%d", pRes->msgType);
13!
350
  }
351
}
352
// clang-format on
353
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
×
354
  QUERY_PARAM_CHECK(str);
×
355
  QUERY_PARAM_CHECK(buf);
×
356
  int32_t n = 0;
×
357

358
  switch (type) {
×
359
    case TSDB_DATA_TYPE_NULL:
×
360
      n = tsnprintf(str, capacity, "null");
×
361
      break;
×
362

363
    case TSDB_DATA_TYPE_BOOL:
×
364
      n = tsnprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
×
365
      break;
×
366

367
    case TSDB_DATA_TYPE_TINYINT:
×
368
      n = tsnprintf(str, capacity, "%d", *(int8_t*)buf);
×
369
      break;
×
370

371
    case TSDB_DATA_TYPE_SMALLINT:
×
372
      n = tsnprintf(str, capacity, "%d", *(int16_t*)buf);
×
373
      break;
×
374

375
    case TSDB_DATA_TYPE_INT:
×
376
      n = tsnprintf(str, capacity, "%d", *(int32_t*)buf);
×
377
      break;
×
378

379
    case TSDB_DATA_TYPE_BIGINT:
×
380
    case TSDB_DATA_TYPE_TIMESTAMP:
381
      n = tsnprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
×
382
      break;
×
383

384
    case TSDB_DATA_TYPE_FLOAT:
×
385
      n = tsnprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
×
386
      break;
×
387

388
    case TSDB_DATA_TYPE_DOUBLE:
×
389
      n = tsnprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
×
390
      break;
×
391

392
    case TSDB_DATA_TYPE_VARBINARY: {
×
393
      if (bufSize < 0) {
×
394
        //        tscError("invalid buf size");
395
        return TSDB_CODE_TSC_INVALID_VALUE;
×
396
      }
397
      void*    data = NULL;
×
398
      uint32_t size = 0;
×
399
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
400
        return TSDB_CODE_OUT_OF_MEMORY;
×
401
      }
402
      *str = '"';
×
403
      memcpy(str + 1, data, size);
×
404
      *(str + size + 1) = '"';
×
405
      n = size + 2;
×
406
      taosMemoryFree(data);
×
407
      break;
×
408
    }
409
    case TSDB_DATA_TYPE_BINARY:
×
410
    case TSDB_DATA_TYPE_GEOMETRY:
411
      if (bufSize < 0) {
×
412
        //        tscError("invalid buf size");
413
        return TSDB_CODE_TSC_INVALID_VALUE;
×
414
      }
415

416
      *str = '"';
×
417
      memcpy(str + 1, buf, bufSize);
×
418
      *(str + bufSize + 1) = '"';
×
419
      n = bufSize + 2;
×
420
      break;
×
421
    case TSDB_DATA_TYPE_NCHAR:
×
422
      if (bufSize < 0) {
×
423
        //        tscError("invalid buf size");
424
        return TSDB_CODE_TSC_INVALID_VALUE;
×
425
      }
426

427
      *str = '"';
×
428
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1, NULL);
×
429
      if (length <= 0) {
×
430
        return TSDB_CODE_TSC_INVALID_VALUE;
×
431
      }
432
      *(str + length + 1) = '"';
×
433
      n = length + 2;
×
434
      break;
×
435
    case TSDB_DATA_TYPE_UTINYINT:
×
436
      n = tsnprintf(str, capacity, "%d", *(uint8_t*)buf);
×
437
      break;
×
438

439
    case TSDB_DATA_TYPE_USMALLINT:
×
440
      n = tsnprintf(str, capacity, "%d", *(uint16_t*)buf);
×
441
      break;
×
442

443
    case TSDB_DATA_TYPE_UINT:
×
444
      n = tsnprintf(str, capacity, "%u", *(uint32_t*)buf);
×
445
      break;
×
446

447
    case TSDB_DATA_TYPE_UBIGINT:
×
448
      n = tsnprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
×
449
      break;
×
450

451
    default:
×
452
      //      tscError("unsupported type:%d", type);
453
      return TSDB_CODE_TSC_INVALID_VALUE;
×
454
  }
455

456
  if (len) *len = n;
×
457

458
  return TSDB_CODE_SUCCESS;
×
459
}
460

461
void parseTagDatatoJson(void* p, char** jsonStr, void *charsetCxt) {
4✔
462
  if (!p || !jsonStr) {
4!
463
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
464
    return;
×
465
  }
466
  char*   string = NULL;
4✔
467
  SArray* pTagVals = NULL;
4✔
468
  cJSON*  json = NULL;
4✔
469
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
4!
470
    goto end;
×
471
  }
472

473
  int16_t nCols = taosArrayGetSize(pTagVals);
4✔
474
  if (nCols == 0) {
4!
475
    goto end;
×
476
  }
477
  char tagJsonKey[256] = {0};
4✔
478
  json = cJSON_CreateObject();
4✔
479
  if (json == NULL) {
4!
480
    goto end;
×
481
  }
482
  for (int j = 0; j < nCols; ++j) {
8✔
483
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
4✔
484
    if (pTagVal == NULL) {
4!
485
      continue;
×
486
    }
487
    // json key  encode by binary
488
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
4✔
489
    // json value
490
    char type = pTagVal->type;
4✔
491
    if (type == TSDB_DATA_TYPE_NULL) {
4!
492
      cJSON* value = cJSON_CreateNull();
×
493
      if (value == NULL) {
×
494
        goto end;
×
495
      }
496
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
×
497
        goto end;
×
498
      }
499
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
4!
500
      cJSON* value = NULL;
4✔
501
      if (pTagVal->nData > 0) {
4!
502
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
4!
503
        if (tagJsonValue == NULL) {
4!
504
          goto end;
×
505
        }
506
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue, charsetCxt);
4✔
507
        if (length < 0) {
4!
508
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC,
×
509
                 charsetCxt != NULL ? ((SConvInfo *)(charsetCxt))->charset : tsCharset,
510
                 pTagVal->pData);
511
          taosMemoryFree(tagJsonValue);
×
512
          goto end;
×
513
        }
514
        value = cJSON_CreateString(tagJsonValue);
4✔
515
        taosMemoryFree(tagJsonValue);
4!
516
        if (value == NULL) {
4!
517
          goto end;
×
518
        }
519
      } else if (pTagVal->nData == 0) {
×
520
        value = cJSON_CreateString("");
×
521
        if (value == NULL) {
×
522
          goto end;
×
523
        }
524
      } else {
525
        goto end;
×
526
      }
527

528
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
4!
529
        goto end;
×
530
      }
531
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
×
532
      double jsonVd = *(double*)(&pTagVal->i64);
×
533
      cJSON* value = cJSON_CreateNumber(jsonVd);
×
534
      if (value == NULL) {
×
535
        goto end;
×
536
      }
537
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
×
538
        goto end;
×
539
      }
540
    } else if (type == TSDB_DATA_TYPE_BOOL) {
×
541
      char   jsonVd = *(char*)(&pTagVal->i64);
×
542
      cJSON* value = cJSON_CreateBool(jsonVd);
×
543
      if (value == NULL) {
×
544
        goto end;
×
545
      }
546
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
×
547
        goto end;
×
548
      }
549
    } else {
550
      goto end;
×
551
    }
552
  }
553
  string = cJSON_PrintUnformatted(json);
4✔
554
end:
4✔
555
  cJSON_Delete(json);
4✔
556
  taosArrayDestroy(pTagVals);
4✔
557
  if (string == NULL) {
4!
558
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
×
559
    if(string == NULL) {
×
560
      qError("failed to strdup null string");
×
561
    }
562
  }
563
  *jsonStr = string;
4✔
564
}
565

566
int32_t setColRef(SColRef* colRef, col_id_t colId, char* refColName, char* refTableName, char* refDbName) {
×
567
  colRef->id = colId;
×
568
  colRef->hasRef = true;
×
569
  tstrncpy(colRef->refDbName, refDbName, TSDB_DB_NAME_LEN);
×
570
  tstrncpy(colRef->refTableName, refTableName, TSDB_TABLE_NAME_LEN);
×
571
  tstrncpy(colRef->refColName, refColName, TSDB_COL_NAME_LEN);
×
572
  return TSDB_CODE_SUCCESS;
×
573
}
574

575
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
13,341✔
576
  QUERY_PARAM_CHECK(pDst);
13,341!
577
  if (NULL == pSrc) {
13,341✔
578
    *pDst = NULL;
9✔
579
    return TSDB_CODE_SUCCESS;
9✔
580
  }
581

582
  int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
13,332✔
583
  if (numOfField > TSDB_MAX_COL_TAG_NUM || numOfField < TSDB_MIN_COLUMNS) {
13,332!
584
    *pDst = NULL;
×
585
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
×
586
    return TSDB_CODE_INVALID_PARA;
×
587
  }
588

589
  int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
13,332✔
590
  int32_t schemaExtSize = 0;
13,332✔
591
  int32_t colRefSize = 0;
13,332✔
592
  if (withExtSchema(pSrc->tableType) && pSrc->schemaExt) {
13,332!
593
    schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
13,312✔
594
  }
595
  if (hasRefCol(pSrc->tableType) && pSrc->colRef) {
13,332!
596
    colRefSize = pSrc->numOfColRefs * sizeof(SColRef);
×
597
  }
598
  *pDst = taosMemoryMalloc(metaSize + schemaExtSize + colRefSize);
13,332!
599
  if (NULL == *pDst) {
13,332!
600
    return terrno;
×
601
  }
602
  memcpy(*pDst, pSrc, metaSize);
13,332✔
603
  if (withExtSchema(pSrc->tableType) && pSrc->schemaExt) {
13,332!
604
    (*pDst)->schemaExt = (SSchemaExt*)((char*)*pDst + metaSize);
13,312✔
605
    memcpy((*pDst)->schemaExt, pSrc->schemaExt, schemaExtSize);
13,312✔
606
  } else {
607
    (*pDst)->schemaExt = NULL;
20✔
608
  }
609
  if (hasRefCol(pSrc->tableType) && pSrc->colRef) {
13,332!
610
    (*pDst)->colRef = (SColRef*)((char*)*pDst + metaSize + schemaExtSize);
×
611
    memcpy((*pDst)->colRef, pSrc->colRef, colRefSize);
×
612
  } else {
613
    (*pDst)->colRef = NULL;
13,332✔
614
  }
615

616
  return TSDB_CODE_SUCCESS;
13,332✔
617
}
618

619
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
8✔
620
  if(!pMeta || !pName || !pType) return;
8!
621
  int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;
8✔
622
  for (int32_t i = 0; i < nums; ++i) {
32!
623
    if (0 == strcmp(pName, pMeta->schema[i].name)) {
32✔
624
      *pType = (i < pMeta->tableInfo.numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
8!
625
      return;
8✔
626
    }
627
  }
628

629
  *pType = TCOL_TYPE_NONE;
×
630
}
631

632
void freeVgInfo(SDBVgInfo* vgInfo) {
4,363✔
633
  if (NULL == vgInfo) {
4,363✔
634
    return;
1,705✔
635
  }
636

637
  taosHashCleanup(vgInfo->vgHash);
2,658✔
638
  taosArrayDestroy(vgInfo->vgArray);
2,659✔
639

640
  taosMemoryFreeClear(vgInfo);
2,659!
641
}
642

643
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
192✔
644
  QUERY_PARAM_CHECK(pDst);
192!
645
  if (NULL == pSrc) {
192!
646
    *pDst = NULL;
×
647
    return TSDB_CODE_SUCCESS;
×
648
  }
649

650
  *pDst = taosMemoryMalloc(sizeof(*pSrc));
192!
651
  if (NULL == *pDst) {
192!
652
    return terrno;
×
653
  }
654
  memcpy(*pDst, pSrc, sizeof(*pSrc));
192✔
655
  (*pDst)->vgArray = NULL;
192✔
656

657
  if (pSrc->vgHash) {
192✔
658
    (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
169✔
659
                                   HASH_ENTRY_LOCK);
660
    if (NULL == (*pDst)->vgHash) {
169!
661
      taosMemoryFreeClear(*pDst);
×
662
      return terrno;
×
663
    }
664

665
    SVgroupInfo* vgInfo = NULL;
169✔
666
    void*        pIter = taosHashIterate(pSrc->vgHash, NULL);
169✔
667
    while (pIter) {
563✔
668
      vgInfo = pIter;
394✔
669
      int32_t* vgId = taosHashGetKey(pIter, NULL);
394✔
670

671
      if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
394!
672
        qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
×
673
        taosHashCancelIterate(pSrc->vgHash, pIter);
×
674
        freeVgInfo(*pDst);
×
675
        return terrno;
×
676
      }
677

678
      pIter = taosHashIterate(pSrc->vgHash, pIter);
394✔
679
    }
680
  }
681

682
  return TSDB_CODE_SUCCESS;
192✔
683
}
684

685
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
761✔
686
  QUERY_PARAM_CHECK(pDst);
761!
687
  if (NULL == pSrc) {
761✔
688
    *pDst = NULL;
155✔
689
    return TSDB_CODE_SUCCESS;
155✔
690
  }
691

692
  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
606!
693
  if (NULL == *pDst) {
605!
694
    return terrno;
×
695
  }
696

697
  (*pDst)->flags = pSrc->flags;
605✔
698
  if (pSrc->name) {
605!
699
    (*pDst)->name = taosStrdup(pSrc->name);
605!
700
    if (NULL == (*pDst)->name) goto _exit;
606!
701
  }
702
  (*pDst)->uid = pSrc->uid;
606✔
703
  (*pDst)->btime = pSrc->btime;
606✔
704
  (*pDst)->ttl = pSrc->ttl;
606✔
705
  (*pDst)->commentLen = pSrc->commentLen;
606✔
706
  if (pSrc->comment) {
606!
707
    (*pDst)->comment = taosStrdup(pSrc->comment);
×
708
    if (NULL == (*pDst)->comment) goto _exit;
×
709
  }
710
  (*pDst)->type = pSrc->type;
606✔
711

712
  if (pSrc->type == TSDB_CHILD_TABLE) {
606!
713
    if (pSrc->ctb.stbName) {
606!
714
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
606!
715
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
606!
716
    }
717
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
606✔
718
    (*pDst)->ctb.suid = pSrc->ctb.suid;
606✔
719
    if (pSrc->ctb.tagName) {
606!
720
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
606✔
721
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
606!
722
    }
723
    STag* pTag = (STag*)pSrc->ctb.pTag;
606✔
724
    if (pTag) {
606!
725
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
606!
726
      if(NULL == (*pDst)->ctb.pTag) goto _exit;
606!
727
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
606✔
728
    }
729
  } else {
730
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
×
731
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.nCols;
×
732
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
×
733
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
×
734
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
×
735
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
×
736
    }
737
  }
738

739
  return TSDB_CODE_SUCCESS;
606✔
740

741
_exit:
×
742
  tdDestroySVCreateTbReq(*pDst);
×
743
  taosMemoryFree(*pDst);
×
744
  *pDst = NULL;
×
745
  return terrno;
×
746
}
747

748
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
1,446✔
749
  if (pInfo) {
1,446✔
750
    taosArrayDestroy(pInfo->pRetensions);
298✔
751
  }
752
  taosMemoryFree(pInfo);
1,446!
753
}
1,446✔
754

755
void* getTaskPoolWorkerCb() {
112,136✔
756
  return taskQueue.wrokrerPool.pCb;
112,136✔
757
}
758

759

760
void tFreeStreamVtbOtbInfo(void* param);
761
void tFreeStreamVtbVtbInfo(void* param);
762
void tFreeStreamVtbDbVgInfo(void* param);
763

764

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc