• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.15
/source/libs/qworker/src/qwUtil.c
1
#include "dataSinkMgt.h"
2
#include "executor.h"
3
#include "planner.h"
4
#include "query.h"
5
#include "qwInt.h"
6
#include "qwMsg.h"
7
#include "qworker.h"
8
#include "tcommon.h"
9
#include "tmsg.h"
10
#include "tname.h"
11

12
char *qwPhaseStr(int32_t phase) {
12,299,944✔
13
  switch (phase) {
12,299,944!
14
    case QW_PHASE_PRE_QUERY:
2,945,039✔
15
      return "PRE_QUERY";
2,945,039✔
16
    case QW_PHASE_POST_QUERY:
2,944,943✔
17
      return "POST_QUERY";
2,944,943✔
18
    case QW_PHASE_PRE_FETCH:
3,128,807✔
19
      return "PRE_FETCH";
3,128,807✔
20
    case QW_PHASE_POST_FETCH:
3,128,805✔
21
      return "POST_FETCH";
3,128,805✔
22
    case QW_PHASE_PRE_CQUERY:
109,629✔
23
      return "PRE_CQUERY";
109,629✔
24
    case QW_PHASE_POST_CQUERY:
42,844✔
25
      return "POST_CQUERY";
42,844✔
26
    default:
×
27
      break;
×
28
  }
29

30
  return "UNKNOWN";
×
31
}
32

33
char *qwBufStatusStr(int32_t bufStatus) {
101,670✔
34
  switch (bufStatus) {
101,670!
35
    case DS_BUF_LOW:
8,418✔
36
      return "LOW";
8,418✔
37
    case DS_BUF_FULL:
×
38
      return "FULL";
×
39
    case DS_BUF_EMPTY:
93,252✔
40
      return "EMPTY";
93,252✔
41
    default:
×
42
      break;
×
43
  }
44

45
  return "UNKNOWN";
×
46
}
47

48
// Move task->status to `status` with a compare-and-exchange retry loop.
// Each attempt re-reads the current status and passes the transition to
// qwDbgValidateStatus(); an error from the validator is returned to the caller,
// and when the validator sets its `ignore` flag the status is left untouched.
int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status, bool dynamicTask) {
  int32_t code = 0;
  int8_t  prevStatus = 0;
  bool    skip = false;

  for (;;) {
    prevStatus = atomic_load_8(&task->status);

    QW_ERR_RET(qwDbgValidateStatus(QW_FPARAMS(), prevStatus, status, &skip, dynamicTask));
    if (skip) {
      break;
    }

    // The exchange returns the value it found; a match means our write went in.
    if (prevStatus == atomic_val_compare_exchange_8(&task->status, prevStatus, status)) {
      QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(prevStatus), jobTaskStatusStr(status));
      break;
    }
    // Someone else changed the status in between: re-read and retry.
  }

  return TSDB_CODE_SUCCESS;
}
72

73
// Insert a fresh scheduler status entry for clientId into mgmt->schHash.
// A private tasks hash is created up front. If the put reports that the entry
// already exists, the private hash is discarded and success is returned; any
// other put failure is logged and returned.
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t clientId, int32_t rwType) {
  SQWSchStatus newSch = {0};
  newSch.tasksHash =
      taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  newSch.hbBrokenTs = taosGetTimestampMs();

  if (NULL == newSch.tasksHash) {
    QW_SCH_ELOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
    QW_ERR_RET(terrno);
  }

  QW_LOCK(QW_WRITE, &mgmt->schLock);
  int32_t code = taosHashPut(mgmt->schHash, &clientId, sizeof(clientId), &newSch, sizeof(newSch));
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      // Entry is already present: our copy was not stored, so drop its hash.
      taosHashCleanup(newSch.tasksHash);
    } else {
      QW_UNLOCK(QW_WRITE, &mgmt->schLock);

      QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", ERRNO);
      taosHashCleanup(newSch.tasksHash);
      QW_ERR_RET(code);
    }
  }
  QW_UNLOCK(QW_WRITE, &mgmt->schLock);

  return TSDB_CODE_SUCCESS;
}
101

102
// Look up the scheduler status for clientId under mgmt->schLock (taken in rwType mode).
// On success *sch points at the entry and the lock is still held; the caller
// releases it (see qwReleaseScheduler). When the entry is missing the lock is
// dropped and nOpt decides what happens next:
//   QW_NOT_EXIST_ADD      - add the entry, then retry once in RET_ERR mode
//   QW_NOT_EXIST_RET_ERR  - return TSDB_CODE_QRY_SCH_NOT_EXIST
//   anything else         - log and return TSDB_CODE_APP_ERROR
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
  for (;;) {
    QW_LOCK(rwType, &mgmt->schLock);
    *sch = taosHashGet(mgmt->schHash, &clientId, sizeof(clientId));
    if (NULL != (*sch)) {
      break;
    }

    QW_UNLOCK(rwType, &mgmt->schLock);

    if (QW_NOT_EXIST_ADD == nOpt) {
      QW_ERR_RET(qwAddSchedulerImpl(mgmt, clientId, rwType));

      // Entry should exist now; a second miss is reported as an error.
      nOpt = QW_NOT_EXIST_RET_ERR;
      continue;
    } else if (QW_NOT_EXIST_RET_ERR == nOpt) {
      QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
    } else {
      QW_SCH_ELOG("unknown notExistOpt:%d", nOpt);
      QW_ERR_RET(TSDB_CODE_APP_ERROR);
    }

    break;
  }

  return TSDB_CODE_SUCCESS;
}
128

129
// Acquire the scheduler status for clientId, creating it first when it is missing.
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch) {
  int32_t notExistOpt = QW_NOT_EXIST_ADD;
  return qwAcquireSchedulerImpl(mgmt, clientId, rwType, sch, notExistOpt);
}
132

133
// Acquire the scheduler status for clientId; a missing entry is reported as an error.
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch) {
  int32_t notExistOpt = QW_NOT_EXIST_RET_ERR;
  return qwAcquireSchedulerImpl(mgmt, clientId, rwType, sch, notExistOpt);
}
136

137
// Drop mgmt->schLock, which a successful scheduler acquire left held in rwType mode.
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) {
  QW_UNLOCK(rwType, &mgmt->schLock);
}
8,900,819✔
138

139
// Find the task status entry keyed by (qId, cId, tId, eId) in sch->tasksHash.
// sch->tasksLock is taken in rwType mode; it stays held on success (released
// through qwReleaseTaskStatus) and is dropped here when the entry is missing,
// in which case TSDB_CODE_QRY_TASK_NOT_EXIST is returned.
int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
  char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
  QW_SET_QTID(id, qId, cId, tId, eId);

  QW_LOCK(rwType, &sch->tasksLock);
  *task = taosHashGet(sch->tasksHash, id, sizeof(id));
  if (NULL != (*task)) {
    return TSDB_CODE_SUCCESS;
  }

  QW_UNLOCK(rwType, &sch->tasksLock);
  QW_TASK_ELOG_E("task status not exists");
  QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);

  return TSDB_CODE_SUCCESS;
}
153

154
// Insert a task status entry (initial `status`, refId = rId) into sch->tasksHash.
// When both rwType and task are non-zero the entry is additionally acquired via
// qwAcquireTaskStatus() - also when the entry already existed. Without an
// acquire request an existing entry yields TSDB_CODE_QRY_TASK_ALREADY_EXIST.
int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
  int32_t code = 0;
  bool    wantAcquire = (rwType && task);

  char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
  QW_SET_QTID(id, qId, cId, tId, eId);

  SQWTaskStatus ntask = {0};
  ntask.status = status;
  ntask.refId = rId;

  // The write lock only needs to cover the put itself.
  QW_LOCK(QW_WRITE, &sch->tasksLock);
  code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
  QW_UNLOCK(QW_WRITE, &sch->tasksLock);

  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code));
      QW_ERR_RET(code);
    } else if (wantAcquire) {
      QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
    } else {
      QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status));
      QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
    }
  }

  QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status));

  if (wantAcquire) {
    QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task));
  }

  return TSDB_CODE_SUCCESS;
}
190

191
// Add a task status entry with the given initial status under the scheduler
// identified by cId; the scheduler entry is created when it does not exist yet.
// The scheduler is held in read mode only for the duration of the insert.
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
  int32_t       code = 0;
  SQWSchStatus *tsch = NULL;

  QW_ERR_RET(qwAcquireAddScheduler(mgmt, cId, QW_READ, &tsch));

  // rwType 0 / task NULL: insert only, do not acquire the new entry.
  QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));

_return:

  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(code);
}
204

205
// Add a task status entry and hand it back through `task`; thin forwarder to
// qwAddTaskStatusImpl() with the caller's rwType.
int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, int32_t status,
                               SQWTaskStatus **task) {
  int32_t rc = qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
  return rc;
}
209

210
// Drop sch->tasksLock, which a successful qwAcquireTaskStatus() left held in rwType mode.
void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
  QW_UNLOCK(rwType, &sch->tasksLock);
}
6,345,742✔
211

212
// Acquire the task context keyed by (qId, cId, tId, eId) from mgmt->ctxHash via
// taosHashAcquire(); pair with qwReleaseTaskCtx(). A missing context is logged
// at debug level and reported with QW_CTX_NOT_EXISTS_ERR_CODE(mgmt).
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
  QW_SET_QTID(id, qId, cId, tId, eId);

  *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
  if (NULL != (*ctx)) {
    return TSDB_CODE_SUCCESS;
  }

  QW_TASK_DLOG_E("acquired task ctx not exist, may be dropped");
  QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));

  return TSDB_CODE_SUCCESS;
}
224

225
// Look up the task context keyed by (qId, cId, tId, eId) in mgmt->ctxHash with
// taosHashGet() (no acquire/release pairing). A missing context is logged at
// debug level and reported with QW_CTX_NOT_EXISTS_ERR_CODE(mgmt).
int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
  QW_SET_QTID(id, qId, cId, tId, eId);

  *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL != (*ctx)) {
    return TSDB_CODE_SUCCESS;
  }

  QW_TASK_DLOG_E("get task ctx not exist, may be dropped");
  QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));

  return TSDB_CODE_SUCCESS;
}
237

238
// Insert a zero-initialised task context keyed by (qId, cId, tId, eId) into
// mgmt->ctxHash. When `ctx` is non-NULL the stored context is returned through
// it - acquired when `acquire` is true, plainly looked up otherwise - and this
// also applies when the context already existed. With ctx == NULL an existing
// context yields TSDB_CODE_QRY_TASK_ALREADY_EXIST.
int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) {
  char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
  QW_SET_QTID(id, qId, cId, tId, eId);

  SQWTaskCtx nctx = {0};

  int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code);
      QW_ERR_RET(code);
    } else if (NULL == ctx) {
      QW_TASK_ELOG_E("task ctx already exist");
      QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST);
    } else if (acquire) {
      QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
    } else {
      QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
    }
  }

  // Only a genuinely new context counts as an initialised task.
  (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1);

  if (NULL == ctx) {
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx));
  }

  QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx));
}
271

272
// Add a task context without returning it to the caller.
int32_t qwAddTaskCtx(QW_FPARAMS_DEF) {
  QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL));
}
1,591,437!
273

274
// Add a task context and acquire it into *ctx; pair with qwReleaseTaskCtx().
int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
  return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx);
}
×
275

276
// Give back a context previously obtained with taosHashAcquire() on mgmt->ctxHash.
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) {
  taosHashRelease(mgmt->ctxHash, ctx);
}
11,400,845✔
277

278
// Detach and destroy the executor task handle stored in ctx, if any.
// The handle is claimed with a compare-and-exchange to NULL, so only the
// caller whose exchange succeeds goes on to destroy it.
void qwFreeTaskHandle(SQWTaskCtx *ctx) {
  // Note: free/kill may in RC
  qTaskInfo_t claimed = atomic_load_ptr(&ctx->taskHandle);
  if (!claimed) {
    return;
  }

  if (claimed != atomic_val_compare_exchange_ptr(&ctx->taskHandle, claimed, NULL)) {
    return;
  }

  // Destruction runs with the task's memory pool session enabled.
  taosEnableMemPoolUsage(ctx->memPoolSession);
  qDestroyTask(claimed);
  taosDisableMemPoolUsage();

  (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskExecDestroyNum, 1);

  qDebug("task handle destroyed");
}
3,164,900✔
291

292
// Detach and destroy the data sink handle stored in ctx, if any.
// The handle is claimed with a compare-and-exchange to NULL, so only the
// caller whose exchange succeeds goes on to destroy it.
void qwFreeSinkHandle(SQWTaskCtx *ctx) {
  // Note: free/kill may in RC
  void *claimed = atomic_load_ptr(&ctx->sinkHandle);
  if (!claimed) {
    return;
  }

  if (claimed != atomic_val_compare_exchange_ptr(&ctx->sinkHandle, claimed, NULL)) {
    return;
  }

  QW_SINK_ENABLE_MEMPOOL(ctx);
  dsDestroyDataSinker(claimed);
  QW_SINK_DISABLE_MEMPOOL();

  (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskSinkDestroyNum, 1);

  qDebug("sink handle destroyed");
}
3,114,583✔
305

306
// Ask the executor to kill the task attached to ctx, passing rspCode along.
// The handle is temporarily taken out of ctx (exchanged with NULL) while
// qAsyncKillTask() runs and is stored back afterwards, so a concurrent
// qwFreeTaskHandle() cannot destroy it in the middle of the kill request.
// Returns the qAsyncKillTask() result, or 0 when there was no handle to kill.
int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
  int32_t code = 0;

  // Note: free/kill may in RC
  qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);

  // The exchange returns the value it found. Only an exact match means we own
  // the handle; any other non-NULL return is a failed exchange and must not be
  // treated as success (same check as qwFreeTaskHandle()).
  if (taskHandle && taskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
    qDebug("start to kill task");
    code = qAsyncKillTask(taskHandle, rspCode);
    atomic_store_ptr(&ctx->taskHandle, taskHandle);
  }

  QW_RET(code);
}
319

320
// Release everything a task context owns: the control connection handle, the
// executor task handle, the sink handle, the table-info array and, when a
// memory pool is in use, the task's memory pool session.
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  if (NULL != ctx->ctrlConnInfo.handle) {
    tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER, ctx->rspCode);
  }

  ctx->ctrlConnInfo.handle = NULL;
  ctx->ctrlConnInfo.refId = -1;

  // NO need to release dataConnInfo

  qwFreeTaskHandle(ctx);
  qwFreeSinkHandle(ctx);
  taosArrayDestroy(ctx->tbInfo);

  if (!gMemPoolHandle || !ctx->memPoolSession) {
    return;
  }

  qwDestroySession(QW_FPARAMS(), ctx->pJobInfo, ctx->memPoolSession, true);
  ctx->memPoolSession = NULL;
}
1,598,931✔
341

342
static void freeExplainExecItem(void *param) {
58,780✔
343
  SExplainExecInfo *pInfo = param;
58,780✔
344
  taosMemoryFree(pInfo->verboseInfo);
58,780!
345
}
58,783✔
346

347
// Collect the explain execution info of ctx->taskHandle and deliver it.
//  - local execution (ctx->localExec): the items are copied into a freshly
//    allocated array that is appended to ctx->explainRes as an SExplainLocalRsp;
//  - otherwise: the list is sent over the control connection with
//    qwBuildAndSendExplainRsp().
// ctx->explainRsped is set before anything can fail. Returns
// TSDB_CODE_SUCCESS or the first error encountered.
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
  int32_t     code = TSDB_CODE_SUCCESS;
  qTaskInfo_t taskHandle = ctx->taskHandle;

  ctx->explainRsped = true;

  SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo));
  if (NULL == execInfoList) {
    QW_ERR_JRET(terrno);
  }

  QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList));

  if (ctx->localExec) {
    SExplainLocalRsp localRsp = {0};
    localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList);
    SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo));
    if (NULL == pExec) {
      QW_ERR_JRET(terrno);
    }
    // Shallow copy: the per-item verboseInfo pointers move to pExec once the
    // response has been stored successfully.
    (void)memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo));
    localRsp.rsp.subplanInfo = pExec;
    localRsp.qId = qId;
    localRsp.cId = cId;
    localRsp.tId = tId;
    localRsp.rId = rId;
    localRsp.eId = eId;
    if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) {
      // Nobody took ownership of pExec: free the copy here. The verboseInfo
      // buffers are still owned by execInfoList and are released at _return.
      int32_t pushCode = terrno;
      taosMemoryFree(pExec);
      QW_ERR_JRET(pushCode);
    }

    // Items now belong to the stored response; free only the array shell.
    taosArrayDestroy(execInfoList);
    execInfoList = NULL;
  } else {
    SRpcHandleInfo connInfo = ctx->ctrlConnInfo;
    connInfo.ahandle = NULL;
    code = qwBuildAndSendExplainRsp(&connInfo, execInfoList);
    taosArrayDestroyEx(execInfoList, freeExplainExecItem);
    execInfoList = NULL;

    QW_ERR_RET(code);
  }

_return:

  // execInfoList is NULL here on the success paths; on error paths it still
  // owns its items and is destroyed together with them.
  taosArrayDestroyEx(execInfoList, freeExplainExecItem);

  return code;
}
396

397
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
1,591,794✔
398
  char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
1,591,794✔
399
  QW_SET_QTID(id, qId, cId, tId, eId);
1,591,794✔
400
  SQWTaskCtx octx;
401
  int32_t code = TSDB_CODE_SUCCESS;
1,591,794✔
402

403
  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
1,591,794✔
404
  if (NULL == ctx) {
1,591,805!
405
    QW_TASK_DLOG_E("drop task ctx not exist, may be dropped");
×
406
    QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
×
407
  }
408

409
  octx = *ctx;
1,591,805✔
410

411
  if (ctx->pJobInfo && TSDB_CODE_SUCCESS != ctx->pJobInfo->errCode) {
1,591,805!
412
    QW_UPDATE_RSP_CODE(ctx, ctx->pJobInfo->errCode);
×
413
  } else {
414
    QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED);
1,591,805✔
415
  }
416

417
  atomic_store_ptr(&ctx->taskHandle, NULL);
1,591,802✔
418
  atomic_store_ptr(&ctx->sinkHandle, NULL);
1,591,799✔
419
  atomic_store_ptr(&ctx->pJobInfo, NULL);
1,591,801✔
420
  atomic_store_ptr(&ctx->memPoolSession, NULL);
1,591,802✔
421

422
  QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
1,591,808✔
423

424
  if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
1,591,808!
425
    QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
×
426
    code = QW_CTX_NOT_EXISTS_ERR_CODE(mgmt);
×
427
  }
428

429
  qwFreeTaskCtx(QW_FPARAMS(), &octx);
1,591,807✔
430
  ctx->tbInfo = NULL;
1,591,802✔
431

432
  QW_TASK_DLOG_E("task ctx dropped");
1,591,802✔
433
  
434
  (void)atomic_add_fetch_64(&gQueryMgmt.stat.taskDestroyNum, 1);
1,591,809✔
435

436
  return code;
1,591,809✔
437
}
438

439
int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
1,591,801✔
440
  SQWSchStatus  *sch = NULL;
1,591,801✔
441
  SQWTaskStatus *task = NULL;
1,591,801✔
442
  int32_t        code = 0;
1,591,801✔
443

444
  char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
1,591,801✔
445
  QW_SET_QTID(id, qId, cId, tId, eId);
1,591,801✔
446

447
  if (qwAcquireScheduler(mgmt, cId, QW_WRITE, &sch)) {
1,591,801!
448
    QW_TASK_WLOG_E("scheduler does not exist");
×
449
    return TSDB_CODE_SUCCESS;
×
450
  }
451

452
  if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
1,591,809!
453
    qwReleaseScheduler(QW_WRITE, mgmt);
×
454

455
    QW_TASK_WLOG_E("task does not exist");
×
456
    return TSDB_CODE_SUCCESS;
×
457
  }
458

459
  if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
1,591,797!
460
    QW_TASK_ELOG_E("taosHashRemove task from hash failed");
×
461
    QW_ERR_JRET(TSDB_CODE_APP_ERROR);
1!
462
  }
463

464
  QW_TASK_DLOG_E("task status dropped");
1,591,806✔
465

466
_return:
119,445✔
467

468
  if (task) {
1,591,808✔
469
    qwReleaseTaskStatus(QW_WRITE, sch);
1,591,807✔
470
  }
471
  qwReleaseScheduler(QW_WRITE, mgmt);
1,591,802✔
472

473
  QW_RET(code);
1,591,807!
474
}
475

476
// Set the status of the task keyed by (qId, cId, tId, eId) to `status`.
// Scheduler and task status are held in read mode while qwSetTaskStatus()
// performs the update, and are released again on every path.
int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask) {
  int32_t        code = 0;
  SQWTaskStatus *task = NULL;
  SQWSchStatus  *sch = NULL;

  QW_ERR_RET(qwAcquireScheduler(mgmt, cId, QW_READ, &sch));
  QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));

  QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status, dynamicTask));

_return:

  // task stays NULL when acquiring it failed; its lock is not held then.
  if (NULL != task) {
    qwReleaseTaskStatus(QW_READ, sch);
  }
  qwReleaseScheduler(QW_READ, mgmt);

  QW_RET(code);
}
495

496
int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) {
1,591,793✔
497
  char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
1,591,793✔
498
  QW_SET_QTID(id, qId, cId, tId, eId);
1,591,793✔
499
  SQWTaskCtx octx;
500

501
  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
1,591,793✔
502
  if (NULL == ctx) {
1,591,809✔
503
    QW_TASK_DLOG_E("task ctx not exist, may be dropped");
2!
504
    QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
2!
505
  }
506

507
  if (!ctx->dynamicTask) {
1,591,807✔
508
    return TSDB_CODE_SUCCESS;
1,559,233✔
509
  }
510

511
  QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
32,574!
512

513
  QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
32,574!
514

515
  return TSDB_CODE_SUCCESS;
32,573✔
516
}
517

518
int32_t qwDropTask(QW_FPARAMS_DEF) {
1,591,792✔
519
  QW_ERR_RET(qwHandleDynamicTaskEnd(QW_FPARAMS()));
1,591,792!
520
  QW_ERR_RET(qwDropTaskStatus(QW_FPARAMS()));
1,591,806!
521
  QW_ERR_RET(qwDropTaskCtx(QW_FPARAMS()));
1,591,800!
522

523
  QW_TASK_DLOG_E("task is dropped");
1,591,809✔
524

525
  return TSDB_CODE_SUCCESS;
1,591,806✔
526
}
527

528
// Reserve the next slot of the global gQwMgmt.param array, fill it with the
// current gQwMgmt.qwRef and the given refId, and return it through *pParam.
// The slot index is advanced with a compare-and-exchange loop; once the
// counter has reached the array length it restarts, handing out slot 0.
void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
  int32_t slot = 0;
  int32_t nextSlot = 0;

  do {
    slot = atomic_load_32(&gQwMgmt.paramIdx);
    nextSlot = (slot == tListLen(gQwMgmt.param)) ? 1 : (slot + 1);
  } while (slot != atomic_val_compare_exchange_32(&gQwMgmt.paramIdx, slot, nextSlot));

  // A counter value equal to the array length stands for slot 0.
  if (slot == tListLen(gQwMgmt.param)) {
    slot = 0;
  }

  SQWHbParam *entry = &gQwMgmt.param[slot];
  entry->qwrId = gQwMgmt.qwRef;
  entry->refId = refId;

  *pParam = entry;
}
14,831✔
554

555
// Collect the schema/tag/rsma versions of every table the task queries and
// append them to ctx->tbInfo (created lazily on the first entry).
// Tables are fetched by increasing index until qGetQueryTableSchemaVersion()
// fails or reports that there is no table at that index.
int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
  char       dbFName[TSDB_DB_FNAME_LEN];
  char       tbName[TSDB_TABLE_NAME_LEN];
  STbVerInfo tbInfo;
  int32_t    code = TSDB_CODE_SUCCESS;
  bool       tbGet = false;

  for (int32_t i = 0;; ++i) {
    tbGet = false;
    code = qGetQueryTableSchemaVersion(pTaskInfo, dbFName, TSDB_DB_FNAME_LEN, tbName, TSDB_TABLE_NAME_LEN,
                                       &tbInfo.sversion, &tbInfo.tversion, &tbInfo.rversion, i, &tbGet);
    if (TSDB_CODE_SUCCESS != code || !tbGet) {
      break;
    }

    // The full name is "<db>.<table>"; it is left empty when either part is missing.
    if (dbFName[0] && tbName[0]) {
      (void)snprintf(tbInfo.tbFName, sizeof(tbInfo.tbFName), "%s.%s", dbFName, tbName);
    } else {
      tbInfo.tbFName[0] = 0;
    }

    if (NULL == ctx->tbInfo) {
      ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo));
      if (NULL == ctx->tbInfo) {
        QW_ERR_RET(terrno);
      }
    }

    if (NULL == taosArrayPush(ctx->tbInfo, &tbInfo)) {
      QW_ERR_RET(terrno);
    }
  }

  QW_RET(code);
}
593

594
void qwCloseRef(void) {
14,831✔
595
  taosWLockLatch(&gQwMgmt.lock);
14,831✔
596
  if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
14,831!
597
    taosCloseRef(gQwMgmt.qwRef);  // ignore error
3,255✔
598
    gQwMgmt.qwRef = -1;
3,255✔
599

600
    taosHashCleanup(gQueryMgmt.pJobInfo);
3,255✔
601
    gQueryMgmt.pJobInfo = NULL;
3,255✔
602
  }
603
  taosWUnLockLatch(&gQwMgmt.lock);
14,831✔
604
}
14,831✔
605

606
// Free the tasks hash owned by a scheduler status entry.
void qwDestroySchStatus(SQWSchStatus *pStatus) {
  taosHashCleanup(pStatus->tasksHash);
}
9,258✔
607

608
void qwDestroyImpl(void *pMgmt) {
14,830✔
609
  SQWorker *mgmt = (SQWorker *)pMgmt;
14,830✔
610
  int8_t    nodeType = mgmt->nodeType;
14,830✔
611
  int32_t   nodeId = mgmt->nodeId;
14,830✔
612

613
  int32_t taskCount = 0;
14,830✔
614
  int32_t schStatusCount = 0;
14,830✔
615
  qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt);
14,830✔
616

617
  if (taosTmrStop(mgmt->hbTimer)) {
14,830!
618
    qTrace("stop qworker hb timer may failed");
14,831✔
619
  }
620

621
  mgmt->hbTimer = NULL;
14,831✔
622
  taosTmrCleanUp(mgmt->timer);
14,831✔
623

624
  uint64_t qId, cId, tId, sId;
625
  int32_t  eId;
626
  int64_t  rId = 0;
14,831✔
627
  void    *pIter = taosHashIterate(mgmt->ctxHash, NULL);
14,831✔
628

629
  while (pIter) {
14,953✔
630
    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
122✔
631
    void       *key = taosHashGetKey(pIter, NULL);
122✔
632
    QW_GET_QTID(key, qId, cId, tId, eId);
122✔
633
    sId = ctx->sId;
122✔
634

635
    qwFreeTaskCtx(QW_FPARAMS(), ctx);
122✔
636
    QW_TASK_DLOG_E("task ctx freed");
122✔
637
    pIter = taosHashIterate(mgmt->ctxHash, pIter);
122✔
638
    taskCount++;
122✔
639
  }
640
  taosHashCleanup(mgmt->ctxHash);
14,831✔
641

642
  pIter = taosHashIterate(mgmt->schHash, NULL);
14,831✔
643
  while (pIter) {
24,088✔
644
    SQWSchStatus *sch = (SQWSchStatus *)pIter;
9,257✔
645
    qwDestroySchStatus(sch);
9,257✔
646

647
    pIter = taosHashIterate(mgmt->schHash, pIter);
9,257✔
648
    schStatusCount++;
9,257✔
649
  }
650
  taosHashCleanup(mgmt->schHash);
14,831✔
651

652
  *mgmt->destroyed = 1;
14,831✔
653

654
  taosMemoryFree(mgmt);
14,831!
655

656
  (void)atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
14,831✔
657

658
  qwCloseRef();
14,831✔
659

660
  qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt,
14,831✔
661
         taskCount, schStatusCount);
662
}
14,831✔
663

664
int32_t qwOpenRef(void) {
14,824✔
665
  taosWLockLatch(&gQwMgmt.lock);
14,824✔
666
  if (gQwMgmt.qwRef < 0) {
14,831✔
667
    gQwMgmt.qwRef = taosOpenRef(100, qwDestroyImpl);
3,255✔
668
    if (gQwMgmt.qwRef < 0) {
3,255!
669
      taosWUnLockLatch(&gQwMgmt.lock);
×
670
      qError("init qworker ref failed");
×
671
      QW_RET(gQwMgmt.qwRef);
×
672
    }
673
  }
674
  taosWUnLockLatch(&gQwMgmt.lock);
14,831✔
675

676
  return TSDB_CODE_SUCCESS;
14,831✔
677
}
678

679
// Account the time a message spent queued: adds (now - ts) to the wait-time
// statistics of the given queue type (slot 0 for QUERY_QUEUE, slot 1 for
// FETCH_QUEUE). A non-positive ts is ignored; an unsupported queue type is
// logged and reported as TSDB_CODE_APP_ERROR.
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type) {
  if (ts <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  int64_t     duration = taosGetTimestampUs() - ts;
  SQWTimeInQ *pStat = NULL;

  switch (type) {
    case QUERY_QUEUE:
      pStat = &mgmt->stat.msgStat.waitTime[0];
      break;
    case FETCH_QUEUE:
      pStat = &mgmt->stat.msgStat.waitTime[1];
      break;
    default:
      qError("unsupported queue type %d", type);
      return TSDB_CODE_APP_ERROR;
  }

  ++pStat->num;
  pStat->total += duration;

  return TSDB_CODE_SUCCESS;
}
701

702
// Return the average queue wait time (total / num) recorded for the given
// queue type, 0 when nothing has been recorded yet, or -1 for an unsupported
// queue type (which is also logged).
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
  SQWTimeInQ *pStat = NULL;

  switch (type) {
    case QUERY_QUEUE:
      pStat = &mgmt->stat.msgStat.waitTime[0];
      break;
    case FETCH_QUEUE:
      pStat = &mgmt->stat.msgStat.waitTime[1];
      break;
    default:
      qError("unsupported queue type %d", type);
      return -1;
  }

  return pStat->num ? (pStat->total / pStat->num) : 0;
}
718

719
// Remove the scheduler entries listed in pExpiredSch (an array of client ids)
// from mgmt->schHash. An entry is destroyed only when its tasks hash is empty.
// Processing stops at the first unreadable array slot and skips clients whose
// scheduler cannot be acquired.
void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t num = taosArrayGetSize(pExpiredSch);

  for (int32_t i = 0; i < num; ++i) {
    SQWSchStatus *pSch = NULL;
    uint64_t     *clientId = taosArrayGet(pExpiredSch, i);
    if (NULL == clientId) {
      qError("get the %dth client failed, code:%x", i, terrno);
      break;
    }

    code = qwAcquireScheduler(mgmt, *clientId, QW_WRITE, &pSch);
    if (TSDB_CODE_SUCCESS != code) {
      qError("acquire client %" PRIx64 " failed, code:%x", *clientId, code);
      continue;
    }

    // Only schedulers without any remaining task are torn down.
    if (taosHashGetSize(pSch->tasksHash) <= 0) {
      qwDestroySchStatus(pSch);
      code = taosHashRemove(mgmt->schHash, clientId, sizeof(*clientId));
      qDebug("client %" PRIx64 " destroy result code:%x", *clientId, code);
    }

    qwReleaseScheduler(QW_WRITE, mgmt);
  }
}
1✔
745

746
void qwDestroyJobInfo(void *job) {
668,604✔
747
  if (NULL == job) {
668,604!
748
    return;
×
749
  }
750

751
  SQWJobInfo *pJob = (SQWJobInfo *)job;
668,604✔
752

753
  taosMemoryFreeClear(pJob->memInfo);
668,604!
754
  taosHashCleanup(pJob->pSessions);
668,604✔
755
  pJob->pSessions = NULL;
668,604✔
756
}
757

758
// Stop a task with the given error code, holding ctx->lock in write mode.
// Depending on the task state:
//   - query running : ask the executor to kill the task asynchronously
//   - fetch running : record errCode; with forceStop also mark DROP as received
//   - idle, forceStop : record errCode and drop the whole task
//   - idle, otherwise : record errCode and free the task and sink handles
// Without forceStop, a task that is already being dropped is left alone.
// Returns true when the task's resources were freed by this call.
bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx    *ctx, bool forceStop, int32_t errCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  bool    freed = false;

  QW_LOCK(QW_WRITE, &ctx->lock);

  QW_TASK_DLOG("start to stop task, forceStop:%d, error:%s", forceStop, tstrerror(errCode));

  if ((!forceStop) && (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP))) {
    QW_TASK_WLOG_E("task already dropping");
    QW_UNLOCK(QW_WRITE, &ctx->lock);

    return freed;
  }

  if (QW_QUERY_RUNNING(ctx)) {
    code = qwKillTaskHandle(ctx, errCode);
    if (TSDB_CODE_SUCCESS == code) {
      QW_TASK_DLOG_E("task running, async killed");
    } else {
      QW_TASK_ELOG("task running, async kill failed, error: %x", code);
    }
  } else if (QW_FETCH_RUNNING(ctx)) {
    QW_TASK_DLOG_E("task fetching");
    QW_UPDATE_RSP_CODE(ctx, errCode);
    if (forceStop) {
      QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
      QW_TASK_DLOG_E("update drop received");
    }
  } else if (forceStop) {
    QW_UPDATE_RSP_CODE(ctx, errCode);
    code = qwDropTask(QW_FPARAMS());
    if (TSDB_CODE_SUCCESS == code) {
      QW_TASK_DLOG_E("task dropped");
      freed = true;
    } else {
      QW_TASK_ELOG("task drop failed, error: %x", code);
    }
  } else {
    QW_UPDATE_RSP_CODE(ctx, errCode);

    qwFreeTaskHandle(ctx);
    qwFreeSinkHandle(ctx);

    freed = true;

    QW_TASK_DLOG_E("task resources freed");
  }

  QW_UNLOCK(QW_WRITE, &ctx->lock);

  return freed;
}
811

812
// Retire a single task: acquire its context, stop it without force using
// errCode, and release the context again. Returns false when the context
// cannot be acquired, otherwise whatever qwStopTask() reports.
bool qwRetireTask(QW_FPARAMS_DEF, int32_t errCode) {
  SQWTaskCtx *ctx = NULL;

  if (TSDB_CODE_SUCCESS != qwAcquireTaskCtx(QW_FPARAMS(), &ctx)) {
    return false;
  }

  bool retired = qwStopTask(QW_FPARAMS(), ctx, false, errCode);
  qwReleaseTaskCtx(mgmt, ctx);

  return retired;
}
826

827
// Retire every task session of a job using the job's error code.
// Returns true only when all sessions were retired; a NULL job yields false.
bool qwRetireJob(SQWJobInfo *pJob) {
  if (NULL == pJob) {
    return false;
  }

  bool allRetired = true;

  for (void *pIter = taosHashIterate(pJob->pSessions, NULL); pIter; pIter = taosHashIterate(pJob->pSessions, pIter)) {
    SQWSessionInfo *pSession = (SQWSessionInfo *)pIter;

    bool done = qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId,
                             pSession->rId, pSession->eId, pJob->errCode);
    if (!done) {
      allRetired = false;
    }
  }

  return allRetired;
}
846

847

848
// Force-stop every task that still has a context in mgmt->ctxHash, using
// TSDB_CODE_VND_STOPPED as the error code.
void qwStopAllTasks(SQWorker *mgmt) {
  // Names below are the ones QW_FPARAMS() refers to.
  uint64_t qId, cId, tId, sId;
  int32_t  eId;
  int64_t  rId = 0;

  for (void *pIter = taosHashIterate(mgmt->ctxHash, NULL); pIter; pIter = taosHashIterate(mgmt->ctxHash, pIter)) {
    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
    void       *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, cId, tId, eId);

    sId = ctx->sId;

    (void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_VND_STOPPED);
  }
}
12,409✔
866

867

868
// Scan all task contexts and force-stop those that either already received a
// DROP event or whose last ack is older than tsQueryNoFetchTimeoutSec relative
// to currTs. Tasks with a pending DROP are stopped with their own rspCode,
// timed-out ones with TSDB_CODE_QRY_NO_FETCH_TIMEOUT.
void qwChkDropTimeoutQuery(SQWorker *mgmt, int32_t currTs) {
  // Names below are the ones QW_FPARAMS() refers to.
  uint64_t qId, cId, tId, sId;
  int32_t  eId;
  int64_t  rId = 0;

  for (void *pIter = taosHashIterate(mgmt->ctxHash, NULL); pIter; pIter = taosHashIterate(mgmt->ctxHash, pIter)) {
    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;

    // A task never acked (lastAckTs <= 0) is not subject to the timeout.
    bool withinTimeout = (ctx->lastAckTs <= 0) || ((currTs - ctx->lastAckTs) < tsQueryNoFetchTimeoutSec);
    if (withinTimeout && (!QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP))) {
      continue;
    }

    void *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, cId, tId, eId);

    sId = ctx->sId;

    (void)qwStopTask(QW_FPARAMS(), ctx, true, (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) ? ctx->rspCode : TSDB_CODE_QRY_NO_FETCH_TIMEOUT);
  }
}
×
891

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc