• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.36
/source/libs/qcom/src/queryUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "os.h"
17
#include "query.h"
18
#include "tglobal.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21
#include "tsched.h"
22
#include "tworker.h"
23
// clang-format off
24
#include "cJSON.h"
25
#include "queryInt.h"
26

27
// State of the async task executor used in this file: a worker pool plus the queue served by it.
typedef struct STaskQueue {
28
  SQueryAutoQWorkerPool wrokrerPool;  // worker pool; configured and initialised in initTaskQueue()
29
  STaosQueue* pTaskQueue;  // queue allocated from the pool in initTaskQueue(); taosAsyncExec() writes items to it
30
} STaskQueue;
31

32
/**
 * Return the mirrored comparison operator:
 * GREATER_THAN <-> LOWER_THAN, GREATER_EQUAL <-> LOWER_EQUAL, EQUAL -> EQUAL.
 *
 * @param op operator to mirror
 * @return the mirrored operator, or -1 when op is none of the five operators above
 */
int32_t getAsofJoinReverseOp(EOperatorType op) {
  int32_t reversed = -1;

  switch (op) {
    case OP_TYPE_GREATER_THAN:
      reversed = OP_TYPE_LOWER_THAN;
      break;
    case OP_TYPE_GREATER_EQUAL:
      reversed = OP_TYPE_LOWER_EQUAL;
      break;
    case OP_TYPE_LOWER_THAN:
      reversed = OP_TYPE_GREATER_THAN;
      break;
    case OP_TYPE_LOWER_EQUAL:
      reversed = OP_TYPE_GREATER_EQUAL;
      break;
    case OP_TYPE_EQUAL:
      reversed = OP_TYPE_EQUAL;
      break;
    default:
      break;
  }

  return reversed;
}
50

51
const SSchema* tGetTbnameColumnSchema() { 
×
52
  static struct SSchema _s = {
53
      .colId = TSDB_TBNAME_COLUMN_INDEX,
54
      .type = TSDB_DATA_TYPE_BINARY,
55
      .bytes = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE,
56
      .name = "tbname",
57
  };
58
  return &_s; 
×
59
}
60

61
/**
 * Validate an array of column (or tag) schemas.
 *
 * Every entry must have a valid data type and a byte width allowed for that
 * type, and its name must differ from every later entry (compared over the
 * first sizeof(name) - 1 characters). The summed widths must not exceed maxLen.
 *
 * @param pSchema   schema array; NULL is rejected
 * @param numOfCols number of entries in pSchema
 * @param maxLen    upper bound for the sum of all .bytes
 * @return true when every check passes, false otherwise
 */
static bool doValidateSchema(SSchema* pSchema, int32_t numOfCols, int32_t maxLen) {
  if (NULL == pSchema) {
    return false;
  }

  int32_t totalBytes = 0;
  for (int32_t col = 0; col < numOfCols; ++col) {
    const SSchema* pCol = &pSchema[col];

    // 1. the type must be a known one
    if (!isValidDataType(pCol->type)) {
      return false;
    }

    // 2. the width must be allowed for the type
    bool widthOk = false;
    switch (pCol->type) {
      case TSDB_DATA_TYPE_BINARY:
      case TSDB_DATA_TYPE_VARBINARY:
        widthOk = (pCol->bytes <= TSDB_MAX_BINARY_LEN);
        break;
      case TSDB_DATA_TYPE_NCHAR:
        widthOk = (pCol->bytes <= TSDB_MAX_NCHAR_LEN);
        break;
      case TSDB_DATA_TYPE_GEOMETRY:
        widthOk = (pCol->bytes <= TSDB_MAX_GEOMETRY_LEN);
        break;
      default:
        // fixed-width types must match the width table exactly
        widthOk = (pCol->bytes == tDataTypes[pCol->type].bytes);
        break;
    }
    if (!widthOk) {
      return false;
    }

    // 3. the name must not reappear in any later entry
    for (int32_t other = col + 1; other < numOfCols; ++other) {
      if (0 == strncmp(pCol->name, pSchema[other].name, sizeof(pCol->name) - 1)) {
        return false;
      }
    }

    totalBytes += pCol->bytes;
  }

  return totalBytes <= maxLen;
}
104

105
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags) {
61,708✔
106
  if (!pSchema || !VALIDNUMOFCOLS(numOfCols)) {
61,708!
107
    return false;
×
108
  }
109

110
  if (!VALIDNUMOFTAGS(numOfTags)) {
61,708!
111
    return false;
×
112
  }
113

114
  /* first column must be the timestamp, which is a primary key */
115
  if (pSchema[0].type != TSDB_DATA_TYPE_TIMESTAMP) {
61,710!
116
    return false;
×
117
  }
118

119
  if (!doValidateSchema(pSchema, numOfCols, TSDB_MAX_BYTES_PER_ROW)) {
61,710!
120
    return false;
×
121
  }
122

123
  if (!doValidateSchema(&pSchema[numOfCols], numOfTags, TSDB_MAX_TAGS_LEN)) {
61,707!
124
    return false;
×
125
  }
126

127
  return true;
61,709✔
128
}
129

130
// Task queue state private to this file; all members start zeroed and are filled in by initTaskQueue().
static STaskQueue taskQueue = {0};
131

132
/**
 * Queue item handler: run one scheduled task and release its queue item.
 *
 * The function to run is carried in pSchedMsg->ahandle and its argument in
 * pSchedMsg->thandle (both set by taosAsyncExec). The task's return value is
 * ignored.
 *
 * @param pInfo     queue info passed by the worker; not used here
 * @param pSchedMsg queue item describing the task; NULL is ignored
 */
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
  if (NULL == pSchedMsg) {
    return;
  }

  __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
  if (NULL != execFn) {
    (void)execFn(pSchedMsg->thandle);
  }

  // Release the item even when it carries no function; returning early here would leak it.
  taosFreeQitem(pSchedMsg);
}
138

139
int32_t initTaskQueue() {
4,109✔
140
  taskQueue.wrokrerPool.name = "taskWorkPool";
4,109✔
141
  taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
4,109✔
142
  taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
4,109✔
143
  int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
4,109✔
144
  if (TSDB_CODE_SUCCESS != coce) {
4,109!
145
    qError("failed to init task thread pool");
×
146
    return -1;
×
147
  }
148

149
  taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
4,109✔
150
  if (NULL == taskQueue.pTaskQueue) {
4,109!
151
    qError("failed to init task queue");
×
152
    return -1;
×
153
  }
154

155
  qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
4,109✔
156
  return 0;
4,109✔
157
}
158

159
int32_t cleanupTaskQueue() {
4,109✔
160
  tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
4,109✔
161
  return 0;
4,109✔
162
}
163

164
/**
 * Queue a function for asynchronous execution on the task worker pool.
 *
 * @param execFn    function to run; stored in the item's ahandle
 * @param execParam argument handed to execFn; stored in the item's thandle
 * @param code      stored in the item's msg field; not read in this file
 * @return the error from taosAllocateQitem when allocation fails, otherwise
 *         the result of taosWriteQitem
 */
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
  SSchedMsg* pItem = NULL;

  int32_t allocCode = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0, (void**)&pItem);
  if (0 != allocCode) {
    return allocCode;
  }

  pItem->fp = NULL;
  pItem->ahandle = execFn;
  pItem->thandle = execParam;
  pItem->msg = code;

  return taosWriteQitem(taskQueue.pTaskQueue, pItem);
}
175

176
int32_t taosAsyncWait() {
665✔
177
  if (!taskQueue.wrokrerPool.pCb) {
665!
178
    qError("query task thread pool callback function is null");
×
179
    return -1;
×
180
  }
181
  return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
665✔
182
}
183

184
int32_t taosAsyncRecover() {
665✔
185
  if (!taskQueue.wrokrerPool.pCb) {
665!
186
    qError("query task thread pool callback function is null");
×
187
    return -1;
×
188
  }
189
  return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
665✔
190
}
191

192
/**
 * Release a send-message descriptor and what it holds: the target db name,
 * the message payload, the user parameter (through paramFreeFp when one is
 * set) and finally the descriptor itself.
 *
 * @param pMsgBody descriptor to destroy; NULL is ignored
 */
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  if (!pMsgBody) {
    return;
  }

  taosMemoryFreeClear(pMsgBody->target.dbFName);
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);

  // the parameter is released only through the callback supplied with it
  if (NULL != pMsgBody->paramFreeFp) {
    pMsgBody->paramFreeFp(pMsgBody->param);
  }

  taosMemoryFreeClear(pMsgBody);
}
204
void destroyAhandle(void *ahandle) {
1,833,824✔
205
  SMsgSendInfo *pSendInfo = ahandle;
1,833,824✔
206
  if (pSendInfo == NULL) return;
1,833,824!
207

208
  destroySendMsgInfo(pSendInfo);
1,833,824✔
209
}
210

211
/**
 * Copy the payload of pInfo into an RPC buffer and send it with
 * rpcSendRequestWithCtx.
 *
 * pInfo is destroyed here when the RPC buffer cannot be allocated or when the
 * send call reports an error.
 * NOTE(review): QUERY_PARAM_CHECK is defined elsewhere; presumably it returns
 * early on a NULL argument, in which case pInfo is not destroyed — confirm.
 *
 * @param pTransporter   RPC transport handle
 * @param epSet          endpoint set to send to
 * @param pTransporterId passed through to rpcSendRequestWithCtx
 * @param pInfo          message descriptor; also attached to the request as its ahandle
 * @param persistHandle  copied into the request's info.persistHandle
 * @param rpcCtx         passed through to rpcSendRequestWithCtx
 * @return terrno when the RPC buffer allocation fails, otherwise the result of
 *         rpcSendRequestWithCtx
 */
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo,
                                bool persistHandle, void* rpcCtx) {
  QUERY_PARAM_CHECK(pTransporter);
  QUERY_PARAM_CHECK(epSet);
  QUERY_PARAM_CHECK(pInfo);

  char* pCont = rpcMallocCont(pInfo->msgInfo.len);
  if (pCont == NULL) {
    qError("0x%" PRIx64 " msg:%s malloc failed", pInfo->requestId, TMSG_INFO(pInfo->msgType));
    destroySendMsgInfo(pInfo);
    return terrno;
  }
  memcpy(pCont, pInfo->msgInfo.pData, pInfo->msgInfo.len);

  SRpcMsg request = {0};
  request.msgType = pInfo->msgType;
  request.pCont = pCont;
  request.contLen = pInfo->msgInfo.len;
  request.info.ahandle = (void*)pInfo;
  request.info.handle = pInfo->msgInfo.handle;
  request.info.persistHandle = persistHandle;
  request.code = 0;
  TRACE_SET_ROOTID(&request.info.traceId, pInfo->requestId);

  int sendCode = rpcSendRequestWithCtx(pTransporter, epSet, &request, pTransporterId, rpcCtx);
  if (0 != sendCode) {
    destroySendMsgInfo(pInfo);
  }
  return sendCode;
}
242

243
/**
 * Send a message through asyncSendMsgToServerExt with persistHandle set to
 * false and no RPC context.
 *
 * @return the result of asyncSendMsgToServerExt
 */
int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) {
  const bool persistHandle = false;
  void*      rpcCtx = NULL;

  return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, persistHandle, rpcCtx);
}
246
/**
 * Forward to rpcFreeConnById after checking the transport handle with
 * QUERY_PARAM_CHECK.
 *
 * @param pTransporter RPC transport handle
 * @param pid          id passed through to rpcFreeConnById
 * @return the result of rpcFreeConnById
 */
int32_t asyncFreeConnById(void* pTransporter, int64_t pid) {
  QUERY_PARAM_CHECK(pTransporter);

  int32_t code = rpcFreeConnById(pTransporter, pid);
  return code;
}
250

251
char* jobTaskStatusStr(int32_t status) {
30,177,307✔
252
  switch (status) {
30,177,307!
253
    case JOB_TASK_STATUS_NULL:
2,277,050✔
254
      return "NULL";
2,277,050✔
255
    case JOB_TASK_STATUS_INIT:
4,984,591✔
256
      return "INIT";
4,984,591✔
257
    case JOB_TASK_STATUS_EXEC:
10,508,716✔
258
      return "EXECUTING";
10,508,716✔
259
    case JOB_TASK_STATUS_PART_SUCC:
5,107,723✔
260
      return "PARTIAL_SUCCEED";
5,107,723✔
261
    case JOB_TASK_STATUS_FETCH:
1,235,846✔
262
      return "FETCHING";
1,235,846✔
263
    case JOB_TASK_STATUS_SUCC:
1,507,142✔
264
      return "SUCCEED";
1,507,142✔
265
    case JOB_TASK_STATUS_FAIL:
2,111✔
266
      return "FAILED";
2,111✔
267
    case JOB_TASK_STATUS_DROP:
4,554,178✔
268
      return "DROPPING";
4,554,178✔
269
    default:
×
270
      break;
×
271
  }
272

273
  return "UNKNOWN";
×
274
}
275

276
#if 0
277
// Build an SSchema value from its fields; the name is copied with tstrncpy, bounded by the length of the name array.
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) {
  SSchema schema = {.type = type, .bytes = bytes, .colId = colId};

  tstrncpy(schema.name, name, tListLen(schema.name));
  return schema;
}
286
#endif
287

288
/**
 * Element destructor for an array whose elements are pointers to table-meta
 * responses: frees the response contents, then the response itself, and
 * clears the element.
 *
 * @param p address of the element, i.e. a pointer to the response pointer
 */
void freeSTableMetaRspPointer(void *p) {
  void **ppRsp = (void **)p;

  tFreeSTableMetaRsp(*ppRsp);
  taosMemoryFreeClear(*ppRsp);
}
91,546✔
292

293
/**
 * Release the result payload of an executed request according to its message
 * type and reset pRes->res to NULL.
 *
 * For an unrecognised message type only an error is logged and the payload is
 * left untouched.
 *
 * @param pRes execution result; NULL or an empty result is ignored
 */
void destroyQueryExecRes(SExecResult* pRes) {
  if (NULL == pRes || NULL == pRes->res) {
    return;
  }

  switch (pRes->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      taosArrayDestroyEx((SArray*)pRes->res, freeSTableMetaRspPointer);
      // drop the stale pointer so a repeated call returns early, as the branches below already do
      pRes->res = NULL;
      break;
    }
    case TDMT_MND_CREATE_STB:
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      tFreeSTableMetaRsp(pRes->res);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_VND_SUBMIT: {
      tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
      taosMemoryFreeClear(pRes->res);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      taosArrayDestroy((SArray*)pRes->res);
      // same reasoning as for TDMT_VND_CREATE_TABLE
      pRes->res = NULL;
      break;
    }
    default:
      qError("invalid exec result for request type %d", pRes->msgType);
      break;
  }
}
324
// clang-format on
325
int32_t dataConverToStr(char* str, int64_t capacity, int type, void* buf, int32_t bufSize, int32_t* len) {
140✔
326
  QUERY_PARAM_CHECK(str);
140!
327
  QUERY_PARAM_CHECK(buf);
140!
328
  int32_t n = 0;
140✔
329

330
  switch (type) {
140!
331
    case TSDB_DATA_TYPE_NULL:
×
332
      n = tsnprintf(str, capacity, "null");
×
333
      break;
×
334

335
    case TSDB_DATA_TYPE_BOOL:
3✔
336
      n = tsnprintf(str, capacity, (*(int8_t*)buf) ? "true" : "false");
3✔
337
      break;
3✔
338

339
    case TSDB_DATA_TYPE_TINYINT:
1✔
340
      n = tsnprintf(str, capacity, "%d", *(int8_t*)buf);
1✔
341
      break;
1✔
342

343
    case TSDB_DATA_TYPE_SMALLINT:
1✔
344
      n = tsnprintf(str, capacity, "%d", *(int16_t*)buf);
1✔
345
      break;
1✔
346

347
    case TSDB_DATA_TYPE_INT:
17✔
348
      n = tsnprintf(str, capacity, "%d", *(int32_t*)buf);
17✔
349
      break;
17✔
350

351
    case TSDB_DATA_TYPE_BIGINT:
3✔
352
    case TSDB_DATA_TYPE_TIMESTAMP:
353
      n = tsnprintf(str, capacity, "%" PRId64, *(int64_t*)buf);
3✔
354
      break;
3✔
355

356
    case TSDB_DATA_TYPE_FLOAT:
2✔
357
      n = tsnprintf(str, capacity, "%e", GET_FLOAT_VAL(buf));
2✔
358
      break;
2✔
359

360
    case TSDB_DATA_TYPE_DOUBLE:
1✔
361
      n = tsnprintf(str, capacity, "%e", GET_DOUBLE_VAL(buf));
1✔
362
      break;
1✔
363

364
    case TSDB_DATA_TYPE_VARBINARY: {
×
365
      if (bufSize < 0) {
×
366
        //        tscError("invalid buf size");
367
        return TSDB_CODE_TSC_INVALID_VALUE;
×
368
      }
369
      void*    data = NULL;
×
370
      uint32_t size = 0;
×
371
      if (taosAscii2Hex(buf, bufSize, &data, &size) < 0) {
×
372
        return TSDB_CODE_OUT_OF_MEMORY;
×
373
      }
374
      *str = '"';
×
375
      memcpy(str + 1, data, size);
×
376
      *(str + size + 1) = '"';
×
377
      n = size + 2;
×
378
      taosMemoryFree(data);
×
379
      break;
×
380
    }
381
    case TSDB_DATA_TYPE_BINARY:
4✔
382
    case TSDB_DATA_TYPE_GEOMETRY:
383
      if (bufSize < 0) {
4!
384
        //        tscError("invalid buf size");
385
        return TSDB_CODE_TSC_INVALID_VALUE;
×
386
      }
387

388
      *str = '"';
4✔
389
      memcpy(str + 1, buf, bufSize);
4✔
390
      *(str + bufSize + 1) = '"';
4✔
391
      n = bufSize + 2;
4✔
392
      break;
4✔
393
    case TSDB_DATA_TYPE_NCHAR:
104✔
394
      if (bufSize < 0) {
104!
395
        //        tscError("invalid buf size");
396
        return TSDB_CODE_TSC_INVALID_VALUE;
×
397
      }
398

399
      *str = '"';
104✔
400
      int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1);
104✔
401
      if (length <= 0) {
104!
402
        return TSDB_CODE_TSC_INVALID_VALUE;
×
403
      }
404
      *(str + length + 1) = '"';
104✔
405
      n = length + 2;
104✔
406
      break;
104✔
407
    case TSDB_DATA_TYPE_UTINYINT:
1✔
408
      n = tsnprintf(str, capacity, "%d", *(uint8_t*)buf);
1✔
409
      break;
1✔
410

411
    case TSDB_DATA_TYPE_USMALLINT:
1✔
412
      n = tsnprintf(str, capacity, "%d", *(uint16_t*)buf);
1✔
413
      break;
1✔
414

415
    case TSDB_DATA_TYPE_UINT:
1✔
416
      n = tsnprintf(str, capacity, "%u", *(uint32_t*)buf);
1✔
417
      break;
1✔
418

419
    case TSDB_DATA_TYPE_UBIGINT:
1✔
420
      n = tsnprintf(str, capacity, "%" PRIu64, *(uint64_t*)buf);
1✔
421
      break;
1✔
422

423
    default:
×
424
      //      tscError("unsupported type:%d", type);
425
      return TSDB_CODE_TSC_INVALID_VALUE;
×
426
  }
427

428
  if (len) *len = n;
140✔
429

430
  return TSDB_CODE_SUCCESS;
140✔
431
}
432

433
void parseTagDatatoJson(void* p, char** jsonStr) {
1,033✔
434
  if (!p || !jsonStr) {
1,033!
435
    qError("parseTagDatatoJson invalid input, line:%d", __LINE__);
×
436
    return;
×
437
  }
438
  char*   string = NULL;
1,033✔
439
  SArray* pTagVals = NULL;
1,033✔
440
  cJSON*  json = NULL;
1,033✔
441
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
1,033!
442
    goto end;
×
443
  }
444

445
  int16_t nCols = taosArrayGetSize(pTagVals);
1,033✔
446
  if (nCols == 0) {
1,033✔
447
    goto end;
188✔
448
  }
449
  char tagJsonKey[256] = {0};
845✔
450
  json = cJSON_CreateObject();
845✔
451
  if (json == NULL) {
845!
452
    goto end;
×
453
  }
454
  for (int j = 0; j < nCols; ++j) {
2,802✔
455
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
1,957✔
456
    if (pTagVal == NULL) {
1,957!
457
      continue;
×
458
    }
459
    // json key  encode by binary
460
    tstrncpy(tagJsonKey, pTagVal->pKey, sizeof(tagJsonKey));
1,957✔
461
    // json value
462
    char type = pTagVal->type;
1,957✔
463
    if (type == TSDB_DATA_TYPE_NULL) {
1,957✔
464
      cJSON* value = cJSON_CreateNull();
200✔
465
      if (value == NULL) {
200!
466
        goto end;
×
467
      }
468
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
200!
469
        goto end;
×
470
      }
471
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
1,757✔
472
      cJSON* value = NULL;
1,095✔
473
      if (pTagVal->nData > 0) {
1,095✔
474
        char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
935✔
475
        if (tagJsonValue == NULL) {
935!
476
          goto end;
×
477
        }
478
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
935✔
479
        if (length < 0) {
935!
480
          qError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
×
481
                 pTagVal->pData);
482
          taosMemoryFree(tagJsonValue);
×
483
          goto end;
×
484
        }
485
        value = cJSON_CreateString(tagJsonValue);
935✔
486
        taosMemoryFree(tagJsonValue);
935✔
487
        if (value == NULL) {
935!
488
          goto end;
×
489
        }
490
      } else if (pTagVal->nData == 0) {
160!
491
        value = cJSON_CreateString("");
160✔
492
        if (value == NULL) {
160!
493
          goto end;
×
494
        }
495
      } else {
496
        goto end;
×
497
      }
498

499
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
1,095!
500
        goto end;
×
501
      }
502
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
662✔
503
      double jsonVd = *(double*)(&pTagVal->i64);
446✔
504
      cJSON* value = cJSON_CreateNumber(jsonVd);
446✔
505
      if (value == NULL) {
446!
506
        goto end;
×
507
      }
508
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
446!
509
        goto end;
×
510
      }
511
    } else if (type == TSDB_DATA_TYPE_BOOL) {
216!
512
      char   jsonVd = *(char*)(&pTagVal->i64);
216✔
513
      cJSON* value = cJSON_CreateBool(jsonVd);
216✔
514
      if (value == NULL) {
216!
515
        goto end;
×
516
      }
517
      if (!cJSON_AddItemToObject(json, tagJsonKey, value)) {
216!
518
        goto end;
×
519
      }
520
    } else {
521
      goto end;
×
522
    }
523
  }
524
  string = cJSON_PrintUnformatted(json);
845✔
525
end:
1,033✔
526
  cJSON_Delete(json);
1,033✔
527
  taosArrayDestroy(pTagVals);
1,033✔
528
  if (string == NULL) {
1,033✔
529
    string = taosStrdup(TSDB_DATA_NULL_STR_L);
188✔
530
    if(string == NULL) {
188!
531
      qError("failed to strdup null string");
×
532
    }
533
  }
534
  *jsonStr = string;
1,033✔
535
}
536

537
/**
 * Duplicate a table meta into one allocation: the fixed part and the schema
 * array first, followed by the schemaExt array when useCompress() holds for
 * the table type and the source has one.
 *
 * @param pSrc table meta to copy; NULL yields *pDst = NULL and success
 * @param pDst receives the copy
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_INVALID_PARA when the number of columns
 *         plus tags is outside [TSDB_MIN_COLUMNS, TSDB_MAX_COL_TAG_NUM];
 *         terrno when the allocation fails
 */
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
  if (numOfField < TSDB_MIN_COLUMNS || numOfField > TSDB_MAX_COL_TAG_NUM) {
    *pDst = NULL;
    qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
    return TSDB_CODE_INVALID_PARA;
  }

  const bool copyExt = useCompress(pSrc->tableType) && pSrc->schemaExt;
  int32_t    metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
  int32_t    schemaExtSize = copyExt ? (int32_t)(pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt)) : 0;

  STableMeta* pCopy = taosMemoryMalloc(metaSize + schemaExtSize);
  *pDst = pCopy;
  if (NULL == pCopy) {
    return terrno;
  }

  memcpy(pCopy, pSrc, metaSize);
  if (copyExt) {
    // the ext array lives in the same allocation, right behind the schema array
    pCopy->schemaExt = (SSchemaExt*)((char*)pCopy + metaSize);
    memcpy(pCopy->schemaExt, pSrc->schemaExt, schemaExtSize);
  } else {
    pCopy->schemaExt = NULL;
  }

  return TSDB_CODE_SUCCESS;
}
570

571
/**
 * Classify a name as a column, a tag or neither by looking it up in the
 * table meta's schema array (columns come first, tags after them).
 *
 * @param pMeta table meta to search
 * @param pName name to look up; compared with strcmp
 * @param pType receives TCOL_TYPE_COLUMN, TCOL_TYPE_TAG or TCOL_TYPE_NONE;
 *              left untouched when any argument is NULL
 */
void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* pType) {
  if (NULL == pMeta || NULL == pName || NULL == pType) {
    return;
  }

  ETableColumnType found = TCOL_TYPE_NONE;
  int32_t          total = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns;

  for (int32_t idx = 0; idx < total; ++idx) {
    if (0 != strcmp(pName, pMeta->schema[idx].name)) {
      continue;
    }
    found = (idx < pMeta->tableInfo.numOfColumns) ? TCOL_TYPE_COLUMN : TCOL_TYPE_TAG;
    break;
  }

  *pType = found;
}
583

584
/**
 * Release a vgroup info object together with its hash and its array.
 *
 * @param vgInfo object to free; NULL is ignored
 */
void freeVgInfo(SDBVgInfo* vgInfo) {
  if (vgInfo != NULL) {
    taosHashCleanup(vgInfo->vgHash);
    taosArrayDestroy(vgInfo->vgArray);

    taosMemoryFreeClear(vgInfo);
  }
}
594

595
/**
 * Duplicate a vgroup info object. The struct is copied byte-wise, vgArray is
 * reset to NULL in the copy, and vgHash (when present) is rebuilt as a new
 * hash holding a copy of every entry.
 *
 * @param pSrc object to copy; NULL yields *pDst = NULL and success
 * @param pDst receives the copy; set to NULL when the hash cannot be created
 *             or populated
 * @return TSDB_CODE_SUCCESS; terrno when an allocation or the hash creation
 *         fails; TSDB_CODE_OUT_OF_MEMORY when an entry cannot be inserted
 */
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == *pDst) {
    return terrno;
  }
  memcpy(*pDst, pSrc, sizeof(*pSrc));
  (*pDst)->vgArray = NULL;

  if (pSrc->vgHash) {
    (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
                                   HASH_ENTRY_LOCK);
    if (NULL == (*pDst)->vgHash) {
      taosMemoryFreeClear(*pDst);
      return terrno;
    }

    SVgroupInfo* vgInfo = NULL;
    void*        pIter = taosHashIterate(pSrc->vgHash, NULL);
    while (pIter) {
      vgInfo = pIter;
      int32_t* vgId = taosHashGetKey(pIter, NULL);

      if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
        qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
        taosHashCancelIterate(pSrc->vgHash, pIter);
        freeVgInfo(*pDst);
        // freeVgInfo only clears its own parameter; reset the caller's pointer so it does not dangle
        *pDst = NULL;
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pIter = taosHashIterate(pSrc->vgHash, pIter);
    }
  }

  return TSDB_CODE_SUCCESS;
}
636

637
/**
 * Deep-copy a create-table request.
 *
 * Strings, the tag-name array, the tag blob (child tables) and the schema
 * array (other table types) are duplicated; scalar fields are copied.
 *
 * @param pSrc request to copy; NULL yields *pDst = NULL and success
 * @param pDst receives the copy; set to NULL when any duplication fails
 * @return TSDB_CODE_SUCCESS, or terrno when an allocation or duplication fails
 */
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
  QUERY_PARAM_CHECK(pDst);
  if (NULL == pSrc) {
    *pDst = NULL;
    return TSDB_CODE_SUCCESS;
  }

  // zero-initialised so that _exit can destroy a partially filled copy
  *pDst = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
  if (NULL == *pDst) {
    return terrno;
  }

  (*pDst)->flags = pSrc->flags;
  if (pSrc->name) {
    (*pDst)->name = taosStrdup(pSrc->name);
    if (NULL == (*pDst)->name) goto _exit;
  }
  (*pDst)->uid = pSrc->uid;
  (*pDst)->btime = pSrc->btime;
  (*pDst)->ttl = pSrc->ttl;
  (*pDst)->commentLen = pSrc->commentLen;
  if (pSrc->comment) {
    (*pDst)->comment = taosStrdup(pSrc->comment);
    if (NULL == (*pDst)->comment) goto _exit;
  }
  (*pDst)->type = pSrc->type;

  if (pSrc->type == TSDB_CHILD_TABLE) {
    if (pSrc->ctb.stbName) {
      (*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
      if (NULL == (*pDst)->ctb.stbName) goto _exit;
    }
    (*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
    (*pDst)->ctb.suid = pSrc->ctb.suid;
    if (pSrc->ctb.tagName) {
      (*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
      if (NULL == (*pDst)->ctb.tagName) goto _exit;
    }
    STag* pTag = (STag*)pSrc->ctb.pTag;
    if (pTag) {
      (*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
      if (NULL == (*pDst)->ctb.pTag) goto _exit;
      memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
    }
  } else {
    (*pDst)->ntb.schemaRow.nCols = pSrc->ntb.schemaRow.nCols;
    // copy the schema version itself; it was previously overwritten with nCols
    (*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.version;
    if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
      (*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
      if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
      memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
    }
  }

  return TSDB_CODE_SUCCESS;

_exit:
  tdDestroySVCreateTbReq(*pDst);
  taosMemoryFree(*pDst);
  *pDst = NULL;
  return terrno;
}
699

700
/**
 * Release a database config object and its pRetensions array.
 *
 * @param pInfo object to free; when NULL only taosMemoryFree(NULL) is called
 */
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
  if (NULL != pInfo) {
    taosArrayDestroy(pInfo->pRetensions);
  }

  taosMemoryFree(pInfo);
}
25,564✔
706

707
void* getTaskPoolWorkerCb() {
10,525,616✔
708
  return taskQueue.wrokrerPool.pCb;
10,525,616✔
709
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc