• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.2
/source/libs/scheduler/src/schUtil.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "command.h"
18
#include "query.h"
19
#include "schInt.h"
20
#include "tglobal.h"
21
#include "tmsg.h"
22
#include "tref.h"
23
#include "trpc.h"
24

25
// Acquire a reference on the job identified by refId.
// On success *ppJob points at the job and TSDB_CODE_SUCCESS is returned;
// on failure *ppJob is NULL and the global terrno is propagated.
FORCE_INLINE int32_t schAcquireJob(int64_t refId, SSchJob** ppJob) {
  qDebug("sch acquire jobId:0x%" PRIx64, refId);

  SSchJob *pJob = (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
  *ppJob = pJob;

  return (pJob != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
34

35
// Release one reference on the job identified by refId.
// A refId of 0 is treated as "nothing to release" and succeeds silently.
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
  if (refId != 0) {
    qDebug("sch release jobId:0x%" PRIx64, refId);
    return taosReleaseRef(schMgmt.jobRef, refId);
  }

  return TSDB_CODE_SUCCESS;
}
43

44
// Release one reference on the job identified by refId, additionally
// reporting through *released whether the underlying object was freed.
// A refId of 0 is treated as "nothing to release" and succeeds silently.
FORCE_INLINE int32_t schReleaseJobEx(int64_t refId, int32_t* released) {
  if (refId != 0) {
    qDebug("sch release ex jobId:0x%" PRIx64, refId);
    return taosReleaseRefEx(schMgmt.jobRef, refId, released);
  }

  return TSDB_CODE_SUCCESS;
}
52

53
// Render an epset as a heap-allocated string "numOfEps:N, inUse:I eps:[fqdn:port]..."
// for logging. On success *ppRes owns the buffer (caller frees); a NULL
// pEpSet yields *ppRes == NULL and success. Returns terrno on alloc failure.
int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) {
  *ppRes = NULL;
  if (NULL == pEpSet) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t maxSize = 1024;
  char   *str = taosMemoryMalloc(maxSize);
  if (NULL == str) {
    return terrno;
  }

  int32_t n = 0;
  n += tsnprintf(str + n, maxSize - n, "numOfEps:%d, inUse:%d eps:", pEpSet->numOfEps, pEpSet->inUse);
  // Stop once the buffer is exhausted: assuming tsnprintf is snprintf-like,
  // truncation can push n past maxSize, and a further call would receive a
  // negative size and an out-of-bounds str + n. (TODO confirm tsnprintf's
  // return-value semantics.)
  for (int32_t i = 0; i < pEpSet->numOfEps && n < maxSize; ++i) {
    SEp *pEp = &pEpSet->eps[i];
    n += tsnprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port);
  }

  *ppRes = str;
  return TSDB_CODE_SUCCESS;
}
75

76
// Map a scheduler operation type to a human-readable name for logging.
// Unrecognized values come back as "UNKNOWN"; the returned pointers are
// string literals and must not be freed.
char *schGetOpStr(SCH_OP_TYPE type) {
  switch (type) {
    case SCH_OP_EXEC:
      return "EXEC";
    case SCH_OP_FETCH:
      return "FETCH";
    case SCH_OP_GET_STATUS:
      return "GET STATUS";
    case SCH_OP_NULL:
      return "NULL";
    default:
      return "UNKNOWN";
  }
}
90

91
// Release the resources owned by one heartbeat connection entry: the
// client-side rpc handle (release failure deliberately ignored) and the
// rpc context built for the heartbeat.
void schFreeHbTrans(SSchHbTrans *pTrans) {
  void *handleId = (void *)pTrans->trans.pHandleId;
  (void)rpcReleaseHandle(handleId, TAOS_CONN_CLIENT);
  schFreeRpcCtx(&pTrans->rpcCtx);
}
96

97
// Tear down every heartbeat connection that was established over the given
// transport instance (pTrans). Called when a whole cluster connection goes
// away; holds the table-wide write lock for the entire scan.
void schCleanClusterHb(void *pTrans) {
  int32_t code = 0;
  SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);

  SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
  while (hb) {
    if (hb->trans.pTrans == pTrans) {
      SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
      // Free the entry's resources, then drop it from the table.
      schFreeHbTrans(hb);
      // NOTE(review): the entry is removed while iteration continues from the
      // same node below; this assumes taosHashIterate tolerates removal of
      // the current node — confirm against the hash implementation.
      code = taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
      if (code) {
        // Removal failure is logged but not propagated (void function).
        qError("taosHashRemove hb connection failed, error:%s", tstrerror(code));
      }
    }

    hb = taosHashIterate(schMgmt.hbConnections, hb);
  }

  SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
}
117

118
// Remove the heartbeat connection for epId once no task references it.
// taskNum is re-checked under the table-wide write lock, so a concurrent
// register (which increments taskNum while holding the read lock) keeps the
// entry alive. Always returns TSDB_CODE_SUCCESS.
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
  int32_t code = 0;

  SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    // Already gone — benign; another deregistering task may have removed it.
    SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
    SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    return TSDB_CODE_SUCCESS;
  }

  int64_t taskNum = atomic_load_64(&hb->taskNum);
  if (taskNum <= 0) {
    // No task uses this connection any more: free its resources and drop the
    // hash entry. A removal failure is only logged, not propagated.
    schFreeHbTrans(hb);
    code = taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
    if (code) {
      SCH_TASK_WLOG("taosHashRemove hb connection failed, error:%s", tstrerror(code));
    }
  }
  SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);

  return TSDB_CODE_SUCCESS;
}
141

142
// Create a heartbeat connection entry for epId, seeded with one task
// reference (the registering task). If another thread inserted the entry
// first, *exist is set true and TSDB_CODE_SUCCESS is returned so the caller
// can retry its lookup; other put failures are logged and returned.
int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  int32_t     code = 0;
  SSchHbTrans hb = {0};

  hb.trans.pTrans = pJob->conn.pTrans;
  hb.taskNum = 1;  // counts the registering task itself

  // Build the rpc context before taking the lock; SCH_ERR_RET returns on failure.
  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));

  SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
  code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &hb, sizeof(SSchHbTrans));
  if (code) {
    SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
    // The table did not take ownership of hb.rpcCtx — release it here.
    schFreeRpcCtx(&hb.rpcCtx);

    if (HASH_NODE_EXIST(code)) {
      // Lost the insert race: not an error, caller re-looks-up the entry.
      *exist = true;
      return TSDB_CODE_SUCCESS;
    }

    qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(code);
  }

  SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);

  return TSDB_CODE_SUCCESS;
}
170

171
// Attach the task to the heartbeat connection of pEpId, creating the
// connection (and sending the first hb message) when it does not exist yet.
// The lookup/create loop handles the race where the entry appears or
// disappears between the read-locked lookup and the write-locked insert.
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *pEpId) {
  SSchHbTrans *hb = NULL;

  while (true) {
    SCH_LOCK(SCH_READ, &schMgmt.hbLock);
    hb = taosHashGet(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
    if (NULL == hb) {
      bool exist = false;
      SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
      SCH_ERR_RET(schAddHbConnection(pJob, pTask, pEpId, &exist));
      if (!exist) {
        // Fresh connection already counts this task (taskNum seeded to 1 in
        // schAddHbConnection); just fire the first heartbeat and return.
        SCH_RET(schBuildAndSendHbMsg(pEpId, NULL));
      }

      // Someone else inserted the entry first — retry the locked lookup.
      continue;
    }

    // Found: break with the read lock still held so the entry cannot be
    // removed before the increment below; unlocked after it.
    break;
  }

  (void)atomic_add_fetch_64(&hb->taskNum, 1);

  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);

  return TSDB_CODE_SUCCESS;
}
197

198
// Detach the task from the heartbeat connection of its current candidate
// endpoint; when this was the last task on that connection, the connection
// itself is removed. No-op when the task never registered a heartbeat.
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
  if (!pTask->registerdHb) {
    return;
  }

  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (NULL == addr) {
    SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    return;
  }

  // Rebuild the endpoint key the task registered with (mirrors the key
  // construction in schEnsureHbConnection).
  SQueryNodeEpId  epId = {0};

  epId.nodeId = addr->nodeId;

  SEp *pEp = SCH_GET_CUR_EP(addr);
  TAOS_STRCPY(epId.ep.fqdn, pEp->fqdn);
  epId.ep.port = pEp->port;

  SCH_LOCK(SCH_READ, &schMgmt.hbLock);
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    SCH_TASK_WLOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
    return;
  }

  int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1);
  if (0 == taskNum) {
    // Last reference: drop the read lock, then remove under the write lock.
    // schRemoveHbConnection re-checks taskNum under the write lock, so a
    // racing register between the unlock and the removal keeps the
    // connection alive.
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    (void)schRemoveHbConnection(pJob, pTask, &epId);
  } else {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
  }

  pTask->registerdHb = false;
}
235

236
// Make sure a heartbeat connection exists for the task's current candidate
// endpoint and that the task is registered on it. A no-op when query
// heartbeats are disabled (tsEnableQueryHb).
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_SCH_INTERNAL_ERROR when the candidate
// address cannot be resolved, or the error from the registration path.
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  if (!tsEnableQueryHb) {
    return TSDB_CODE_SUCCESS;
  }

  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  if (NULL == addr) {
    SCH_TASK_ELOG("fail to get the %dth candidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
    return TSDB_CODE_SCH_INTERNAL_ERROR;
  }

  // Build the endpoint key (nodeId + current fqdn:port) used by the hb table;
  // schDeregisterTaskHb rebuilds the same key on teardown.
  SQueryNodeEpId  epId = {0};

  epId.nodeId = addr->nodeId;

  SEp *pEp = SCH_GET_CUR_EP(addr);
  TAOS_STRCPY(epId.ep.fqdn, pEp->fqdn);
  epId.ep.port = pEp->port;

  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));

  // Remember the registration so schDeregisterTaskHb releases it later.
  pTask->registerdHb = true;

  return TSDB_CODE_SUCCESS;
}
261

262
// Refresh the cached transport info of an existing heartbeat connection
// identified by epId. A missing connection is not an error — the hb may
// already have been torn down — so only a statistics counter is bumped.
// Always returns TSDB_CODE_SUCCESS. (Removed an unused local `code`.)
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *hb = NULL;

  SCH_LOCK(SCH_READ, &schMgmt.hbLock);
  hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    (void)atomic_add_fetch_64(&schMgmt.stat.runtime.hbConnNotFound, 1);
    return TSDB_CODE_SUCCESS;
  }

  // Per-entry write lock protects the trans fields; the table-wide read lock
  // keeps the entry alive while we hold it.
  SCH_LOCK(SCH_WRITE, &hb->lock);
  TAOS_MEMCPY(&hb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &hb->lock);
  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);

  qDebug("hb connection updated, sId:0x%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);

  return TSDB_CODE_SUCCESS;
}
284

285
void schCloseJobRef(void) {
1,993,374✔
286
  if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
1,993,374!
287
    return;
1,993,374✔
288
  }
289

290
  if (schMgmt.jobRef >= 0) {
×
291
    taosCloseRef(schMgmt.jobRef);
×
292
    schMgmt.jobRef = -1;
×
293
  }
294
}
295

296
int32_t initClientId(void) {
3,289✔
297
  int32_t code = taosGetSystemUUIDU64(&schMgmt.clientId);
3,289✔
298
  if (code != TSDB_CODE_SUCCESS) {
3,289!
299
    qError("failed to generate clientId since %s", tstrerror(code));
×
300
    SCH_ERR_RET(code);
×
301
  }
302
  qInfo("initialize");
3,289!
303
  return TSDB_CODE_SUCCESS;
3,289✔
304
}
305

306
// Return the process-wide client id set up by initClientId().
uint64_t getClientId(void) {
  return schMgmt.clientId;
}
307

308
// Produce the next task id via an atomic increment of the global counter.
uint64_t schGenTaskId(void) {
  return atomic_add_fetch_64(&schMgmt.taskId, 1);
}
309

310
#ifdef BUILD_NO_CALL
311
uint64_t schGenUUID(void) {
312
  static uint32_t hashId = 0;
313
  static int32_t  requestSerialId = 0;
314

315
  if (hashId == 0) {
316
    int32_t code = taosGetSystemUUID32(&hashId);
317
    if (code != TSDB_CODE_SUCCESS) {
318
      qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
319
    }
320
  }
321

322
  int64_t  ts = taosGetTimestampMs();
323
  uint64_t pid = taosGetPId();
324
  int32_t  val = atomic_add_fetch_32(&requestSerialId, 1);
325

326
  uint64_t id = ((uint64_t)((hashId & 0x0FFF)) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
327
  return id;
328
}
329
#endif
330

331
void schFreeRpcCtxVal(const void *arg) {
7,135,951✔
332
  if (NULL == arg) {
7,135,951!
333
    return;
×
334
  }
335

336
  SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
7,135,951✔
337
  destroySendMsgInfo(pMsgSendInfo);
7,135,951✔
338
}
339

340
// Release everything owned by an rpc context: each value in the args hash
// (via the context's freeFunc), the hash itself, and the broken value.
// Fix: the loop previously called (*pCtx->freeFunc) unconditionally, while
// the brokenVal path below guards against a NULL freeFunc — so a NULL
// freeFunc with a non-empty args hash was a NULL function-pointer call.
void schFreeRpcCtx(SRpcCtx *pCtx) {
  if (NULL == pCtx) {
    return;
  }
  void *pIter = taosHashIterate(pCtx->args, NULL);
  while (pIter) {
    SRpcCtxVal *ctxVal = (SRpcCtxVal *)pIter;

    if (pCtx->freeFunc) {
      (*pCtx->freeFunc)(ctxVal->val);
    }

    pIter = taosHashIterate(pCtx->args, pIter);
  }

  taosHashCleanup(pCtx->args);

  if (pCtx->freeFunc) {
    (*pCtx->freeFunc)(pCtx->brokenVal.val);
  }
}
359

360
// Look up a task by id in the given task hash table.
// *pTask receives the task pointer, or NULL when the table is empty or the
// id is absent (or maps to a NULL entry).
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
  *pTask = NULL;

  if (taosHashGetSize(pTaskList) <= 0) {
    return;
  }

  SSchTask **ppFound = taosHashGet(pTaskList, &taskId, sizeof(taskId));
  if (ppFound != NULL && *ppFound != NULL) {
    *pTask = *ppFound;
  }
}
375

376
// Validate one subplan of a physical plan before tasks are created for it:
// non-NULL, correct node type, subplan type within the known range, level
// matching the current scheduling level, a usable execNode epset for
// data-source plans, a root node (except MODIFY plans) and a data sink.
// Returns TSDB_CODE_SUCCESS or an error code via SCH_ERR_RET; the order of
// checks determines which error is logged first.
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum) {
  if (NULL == pSubplan) {
    SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", idx, taskNum, level);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
  
  if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pSubplan)) {
    SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", level, nodeType(pSubplan));
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }
  
  // Subplan type must fall inside the known enum range.
  if (pSubplan->subplanType < SUBPLAN_TYPE_MERGE || pSubplan->subplanType > SUBPLAN_TYPE_COMPUTE) {
    SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", pSubplan->subplanType, level, idx);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (pSubplan->level != level) {
    SCH_JOB_ELOG("plan level %d mis-match with current level %d", pSubplan->level, level);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Data-source (node-bound) plans must carry a valid execution epset.
  if (SCH_IS_DATA_BIND_PLAN(pSubplan)) {
    if (pSubplan->execNode.epSet.numOfEps <= 0) {
      SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
      SCH_ERR_RET(TSDB_CODE_SCH_DATA_SRC_EP_MISS);
    }
    if (pSubplan->execNode.epSet.inUse >= pSubplan->execNode.epSet.numOfEps) {
      SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse, pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
      SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
    }
  }
  
  // MODIFY plans may legally lack a root node; everything else needs one.
  if (NULL == pSubplan->pNode && pSubplan->subplanType != SUBPLAN_TYPE_MODIFY) {
    SCH_JOB_ELOG("empty plan root node, level:%d, subplan idx:%d, subplanType:%d", level, idx, pSubplan->subplanType);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (NULL == pSubplan->pDataSink) {
    SCH_JOB_ELOG("empty plan dataSink, level:%d, subplan idx:%d", level, idx);
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;
}
420

421

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc