• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.44
/source/libs/qworker/src/qwMsg.c
1
#include "qwMsg.h"
2
#include "dataSinkMgt.h"
3
#include "executor.h"
4
#include "planner.h"
5
#include "query.h"
6
#include "qwInt.h"
7
#include "qworker.h"
8
#include "tcommon.h"
9
#include "tmsg.h"
10
#include "tname.h"
11
#include "tgrant.h"
12

13
/**
 * Allocate or resize a fetch-response buffer so it holds one SRetrieveTableRsp
 * header followed by `length` payload bytes.
 *
 * @param rpcMalloc non-zero: resize through rpcReallocCont; zero: through taosMemoryRealloc
 * @param length    number of payload bytes that follow the header
 * @param rsp       in/out buffer pointer; *rsp may be NULL, in which case the header
 *                  of the new buffer is zero-filled
 * @return TSDB_CODE_SUCCESS, or terrno (through QW_RET) when the allocator returns NULL.
 *         On failure *rsp is left untouched.
 */
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
  int32_t            totalSize = sizeof(SRetrieveTableRsp) + length;
  SRetrieveTableRsp *pNew = NULL;

  if (rpcMalloc) {
    pNew = (SRetrieveTableRsp *)rpcReallocCont(*rsp, totalSize);
  } else {
    pNew = (SRetrieveTableRsp *)taosMemoryRealloc(*rsp, totalSize);
  }

  if (NULL == pNew) {
    qError("rpcMallocCont %d failed", totalSize);
    QW_RET(terrno);
  }

  // Only a first-time allocation gets a cleared header; a resized buffer keeps its content.
  if (NULL == *rsp) {
    TAOS_MEMSET(pNew, 0, sizeof(SRetrieveTableRsp));
  }

  *rsp = pNew;
  return TSDB_CODE_SUCCESS;
}
31

32
/**
 * Fill the SRetrieveTableRsp header located at `msg` from an output-data descriptor.
 *
 * @param msg        buffer that starts with an SRetrieveTableRsp header
 * @param input      source of useconds, precision, compressed, numOfRows, numOfCols, numOfBlocks
 * @param len        value stored (htonl) in compLen
 * @param rawDataLen value stored (htonl) in payloadLen
 * @param qComplete  value stored in completed
 */
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, int32_t rawDataLen, bool qComplete) {
  SRetrieveTableRsp *pHdr = (SRetrieveTableRsp *)msg;

  // fields copied without byte-order conversion
  pHdr->completed = qComplete;
  pHdr->precision = input->precision;
  pHdr->compressed = input->compressed;

  // lengths
  pHdr->payloadLen = htonl(rawDataLen);
  pHdr->compLen = htonl(len);

  // counters, converted with htobe64 / htonl
  pHdr->useconds = htobe64(input->useconds);
  pHdr->numOfRows = htobe64(input->numOfRows);
  pHdr->numOfCols = htonl(input->numOfCols);
  pHdr->numOfBlocks = htonl(input->numOfBlocks);
}
533,119,151✔
45

46
/**
 * Release a fetch-response buffer with rpcFreeCont.
 *
 * Nothing is done when `msg` is NULL, or when a task context is supplied and
 * its localExec flag is set.
 */
void qwFreeFetchRsp(SQWTaskCtx *ctx, void *msg) {
  if (NULL == msg || (NULL != ctx && ctx->localExec)) {
    return;
  }

  rpcFreeCont(msg);
}
55

56

57
/**
 * Produce a private copy of a saved sub-query response item.
 *
 * @param ctx     task context; its localExec flag selects the allocator and
 *                ctx->subQRes.code is the value returned on the non-failure path
 * @param ppRes   out: the cloned buffer, or NULL when the item holds no response
 * @param dataLen out: payload length of the clone (0 when *ppRes is NULL)
 * @param toFetch optional out flag; set to false when non-NULL
 * @param pItem   saved item to clone
 * @return the allocation error when the clone buffer cannot be allocated,
 *         otherwise ctx->subQRes.code
 */
int32_t qwCloneSubQRsp(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppRes, int32_t* dataLen, bool* toFetch, SQWRspItem* pItem) {
  SRetrieveTableRsp* pRsp = NULL;

  // An item may legitimately carry rsp == NULL (qwSaveSubQueryFetchRsp stores NULL when it
  // is given no response); copying from it would read through a NULL pointer.
  if (NULL != pItem->rsp) {
    int32_t tcode = qwMallocFetchRsp(!ctx->localExec, pItem->dataLen, &pRsp);
    if (tcode) {
      QW_TASK_ELOG("qwMallocFetchRsp size %d, localExec:%d failed, error:%s", pItem->dataLen, ctx->localExec, tstrerror(tcode));
      return tcode;
    }

    memcpy(pRsp, pItem->rsp, sizeof(*pRsp) + pItem->dataLen);
  }

  *ppRes = pRsp;
  *dataLen = (NULL != pRsp) ? pItem->dataLen : 0;
  int32_t code = ctx->subQRes.code;
  
  QW_TASK_DLOG("subQ task got res from rspList, rsp:%p, dataLen:%d, code:%d", *ppRes, *dataLen, code);

  if (toFetch) {
    *toFetch = false;
  }

  return code;
}
79

80
/**
 * Store a copy of a sub-query fetch response into `pItem`.
 *
 * A non-zero `code` is recorded in ctx->subQRes.code. pItem->dataLen is always
 * set to `dataLen`. When `rsp` is NULL or `dataLen` is negative, pItem->rsp
 * becomes NULL; otherwise it points to a freshly allocated copy of the header
 * plus `dataLen` payload bytes.
 *
 * @return the allocation error if the copy buffer cannot be allocated
 *         (pItem->rsp is not written in that case), otherwise `code`
 */
int32_t qwSaveSubQueryFetchRsp(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWRspItem* pItem, void* rsp, int32_t dataLen, int32_t code) {
  if (0 != code) {
    ctx->subQRes.code = code;
  }

  pItem->dataLen = dataLen;

  // Nothing to copy: remember an empty item.
  if (NULL == rsp || dataLen < 0) {
    pItem->rsp = NULL;
    return code;
  }

  SRetrieveTableRsp* pCopy = NULL;
  int32_t            tcode = qwMallocFetchRsp(!ctx->localExec, dataLen, &pCopy);
  if (tcode) {
    QW_TASK_ELOG("qwMallocFetchRsp size %d, localExec:%d failed, error:%s", dataLen, ctx->localExec, tstrerror(tcode));
    return tcode;
  }

  memcpy(pCopy, rsp, sizeof(*pCopy) + dataLen);
  pItem->rsp = pCopy;

  return code;
}
103

104
/**
 * Validate and save the response of a scalar sub-query.
 *
 * The response is accepted only when the query has ended, `rsp` is non-NULL and it
 * reports at most one row; otherwise `code` is replaced by
 * TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS. The response is then saved into
 * ctx->subQRes.scalarRsp and ctx->subQRes.fetchDone is set to `queryEnd`.
 *
 * @return the value returned by qwSaveSubQueryFetchRsp
 */
int32_t qwChkSaveScalarSubQRsp(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void* rsp, int32_t dataLen, int32_t code, bool queryEnd) {
  SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)rsp;
  bool               acceptable = queryEnd && (NULL != rsp) && (be64toh(pRsp->numOfRows) <= 1);

  if (!acceptable) {
    QW_TASK_ELOG("invalid subQ rsp, queryEnd:%d, numOfRows:%" PRId64, queryEnd, pRsp ? be64toh(pRsp->numOfRows) : 0);
    code = TSDB_CODE_PAR_INVALID_SCALAR_SUBQ_RES_ROWS;
  }

  int32_t saveCode = qwSaveSubQueryFetchRsp(QW_FPARAMS(), ctx, &ctx->subQRes.scalarRsp, rsp, dataLen, code);

  ctx->subQRes.fetchDone = queryEnd;

  return saveCode;
}
117

118
/**
 * Append a new response item to ctx->subQRes.rspList and answer every request
 * parked in ctx->subQRes.waitList. The whole operation runs while holding
 * ctx->subQRes.lock.
 *
 * - If the push fails, the item's buffer is released through qwFreeFetchRsp and the
 *   failure is recorded in ctx->subQRes.code (unless an error is already recorded).
 * - If ctx->subQRes.code is an error, every waiter receives that error with no payload.
 * - Otherwise each waiter receives a clone of the item selected by its blockIdx;
 *   a blockIdx with no matching item yields TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 *
 * The wait list is cleared in all cases.
 *
 * @param newItem   item to append (copied into the list by taosArrayPush)
 * @param fetchDone stored into ctx->subQRes.fetchDone
 * @return ctx->subQRes.code as observed just before the lock is released
 */
int32_t qwSaveAndHandleWailtList(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWRspItem* newItem, bool fetchDone) {
  int32_t code = 0;
  
  taosWWaitLockLatch(&ctx->subQRes.lock);
  if (NULL == taosArrayPush(ctx->subQRes.rspList, newItem)) {
    // Capture terrno first: the calls below could overwrite it.
    code = terrno;
    qwFreeFetchRsp(ctx, newItem->rsp);
    QW_TASK_ELOG("taosArrayPush SQWRspItem to list failed, error:%s", tstrerror(code));
    if (TSDB_CODE_SUCCESS == ctx->subQRes.code) {
      ctx->subQRes.code = code;
    }
  }

  ctx->subQRes.fetchDone = fetchDone;

  QW_TASK_DLOG("%s code:%d, fetchDone:%d, rspListSize:%d", __func__, ctx->subQRes.code, fetchDone, (int32_t)taosArrayGetSize(ctx->subQRes.rspList));

  int32_t waitNum = taosArrayGetSize(ctx->subQRes.waitList);
  if (TSDB_CODE_SUCCESS != ctx->subQRes.code) {
    // Already failed: every waiter just gets the recorded error.
    for (int32_t i = 0; i < waitNum; ++i) {
      SQWWaitItem* pWaitItem = taosArrayGet(ctx->subQRes.waitList, i);
      code = qwBuildAndSendFetchRsp(ctx, pWaitItem->reqMsgType + 1, &pWaitItem->connInfo, NULL, 0, ctx->subQRes.code);
    }

    taosArrayClear(ctx->subQRes.waitList);

    int32_t failCode = ctx->subQRes.code;
    taosWUnLockLatch(&ctx->subQRes.lock);

    return failCode;
  }

  for (int32_t i = 0; i < waitNum; ++i) {
    SQWWaitItem* pWaitItem = taosArrayGet(ctx->subQRes.waitList, i);
    SQWRspItem* pRspItem = taosArrayGet(ctx->subQRes.rspList, pWaitItem->blockIdx);
    if (NULL == pRspItem) {
      ctx->subQRes.code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      QW_TASK_ELOG("subQ srcTask 0x%" PRIx64 " with invalid blockIdx %" PRIu64 ", currentBlockNum:%d", 
        pWaitItem->srcTaskId, pWaitItem->blockIdx, (int32_t)taosArrayGetSize(ctx->subQRes.rspList));
        
      code = qwBuildAndSendFetchRsp(ctx, pWaitItem->reqMsgType + 1, &pWaitItem->connInfo, NULL, 0, ctx->subQRes.code);
      if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == ctx->subQRes.code) {
        ctx->subQRes.code = code;
      }

      continue;
    }

    // Per-iteration outputs: if the clone fails before writing them, we must not reuse
    // the buffer that a previous iteration already passed to qwBuildAndSendFetchRsp.
    void*   pRsp = NULL;
    int32_t dataLen = 0;

    code = qwCloneSubQRsp(QW_FPARAMS(), ctx, &pRsp, &dataLen, NULL, pRspItem);
    if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == ctx->subQRes.code) {
      ctx->subQRes.code = code;
    }
    code = qwBuildAndSendFetchRsp(ctx, pWaitItem->reqMsgType + 1, &pWaitItem->connInfo, (SRetrieveTableRsp*)pRsp, dataLen, ctx->subQRes.code);
    if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == ctx->subQRes.code) {
      ctx->subQRes.code = code;
    }
  }

  taosArrayClear(ctx->subQRes.waitList);

  int32_t resCode = ctx->subQRes.code;
  taosWUnLockLatch(&ctx->subQRes.lock);

  return resCode;
}
183

184
/**
 * Save one response block of a non-scalar sub-query and serve pending waiters.
 *
 * When `rsp` is NULL or `code` is not success, `code` is stored in ctx->subQRes.code
 * and returned. Otherwise ctx->subQRes.rspList is created on first use (initial
 * capacity 32), the response is copied into a new item, and the item is passed to
 * qwSaveAndHandleWailtList together with `queryEnd`.
 *
 * @return `code` on the early exit, terrno if the list cannot be created, the copy
 *         error if saving fails, otherwise the result of qwSaveAndHandleWailtList
 */
int32_t qwChkSaveNonScalarSubQRsp(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void* rsp, int32_t dataLen, int32_t code, bool queryEnd) {
  bool usable = (NULL != rsp) && (TSDB_CODE_SUCCESS == code);
  if (!usable) {
    ctx->subQRes.code = code;
    return code;
  }

  // Lazily create the response list.
  if (NULL == ctx->subQRes.rspList) {
    ctx->subQRes.rspList = taosArrayInit(32, sizeof(SQWRspItem));
  }
  if (NULL == ctx->subQRes.rspList) {
    QW_TASK_ELOG("taosArrayInit SQWRspItem list failed, error:%s", tstrerror(terrno));
    return terrno;
  }

  SQWRspItem item = {0};
  int32_t    saveCode = qwSaveSubQueryFetchRsp(QW_FPARAMS(), ctx, &item, rsp, dataLen, code);
  if (saveCode) {
    return saveCode;
  }

  return qwSaveAndHandleWailtList(QW_FPARAMS(), ctx, &item, queryEnd);
}
206

207
/**
 * Dispatch a sub-query fetch response to the scalar or non-scalar save routine,
 * depending on QW_IS_SCALAR_SUBQ(ctx). Returns whatever the chosen routine returns.
 */
int32_t qwChkSaveSubQFetchRsp(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void* rsp, int32_t dataLen, int32_t code, bool queryEnd) {
  if (QW_IS_SCALAR_SUBQ(ctx)) {
    return qwChkSaveScalarSubQRsp(QW_FPARAMS(), ctx, rsp, dataLen, code, queryEnd);
  }

  return qwChkSaveNonScalarSubQRsp(QW_FPARAMS(), ctx, rsp, dataLen, code, queryEnd);
}
210

211

212
/**
 * Send a body-less response of type `rspType` carrying `code` on connection `pConn`.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) {
  SRpcMsg errRsp = {0};

  errRsp.msgType = rspType;
  errRsp.pCont = NULL;
  errRsp.contLen = 0;
  errRsp.code = code;
  errRsp.info = *pConn;

  tmsgSendRsp(&errRsp);

  return TSDB_CODE_SUCCESS;
}
225

226
/**
 * Serialize an SQueryTableRsp and send it as the response to a query request.
 *
 * @param rspType message type of the response
 * @param pConn   connection info copied into the outgoing message
 * @param code    result code, placed both in the body and in the rpc envelope
 * @param ctx     task context supplying affectedRows and tbInfo; may be NULL, in which
 *                case both are left zero-initialised
 * @return TSDB_CODE_SUCCESS after the message is handed to tmsgSendRsp; a negative
 *         serializer result or terrno (through QW_RET) on failure
 */
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
  SQueryTableRsp rsp = {0};
  rsp.code = code;
  if (NULL != ctx) {
    // ctx is optional: only read from it when it is present.
    rsp.affectedRows = ctx->affectedRows;
    rsp.tbVerInfo = ctx->tbInfo;
  }

  // First pass computes the encoded size.
  int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp);
  if (msgSize < 0) {
    qError("tSerializeSQueryTableRsp failed");
    QW_RET(msgSize);
  }
  
  void *pRsp = rpcMallocCont(msgSize);
  if (NULL == pRsp) {
    qError("rpcMallocCont %d failed", msgSize);
    QW_RET(terrno);
  }

  msgSize = tSerializeSQueryTableRsp(pRsp, msgSize, &rsp);
  if (msgSize < 0) {
    qError("tSerializeSQueryTableRsp %d failed", msgSize);
    // The buffer is never sent on this path, so release it here.
    rpcFreeCont(pRsp);
    QW_RET(msgSize);
  }

  SRpcMsg rpcRsp = {
      .msgType = rspType,
      .pCont = pRsp,
      .contLen = msgSize,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}
263

264
/**
 * Serialize the explain execution info list and send it as TDMT_SCH_EXPLAIN_RSP.
 *
 * @param pConn     connection info copied into the outgoing message (ahandle is cleared)
 * @param pExecList array of SExplainExecInfo; its size becomes numOfPlans
 * @return TSDB_CODE_SUCCESS after the message is handed to tmsgSendRsp, otherwise
 *         terrno (through QW_RET)
 */
int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SArray *pExecList) {
  SExplainExecInfo *pInfo = taosArrayGet(pExecList, 0);
  SExplainRsp       rsp = {.numOfPlans = taosArrayGetSize(pExecList), .subplanInfo = pInfo};

  // First pass computes the encoded size.
  int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
  if (contLen < 0) {
    qError("tSerializeSExplainRsp failed, error: %x", terrno);
    QW_RET(terrno);
  }
  void   *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    QW_RET(terrno);
  }
  contLen = tSerializeSExplainRsp(pRsp, contLen, &rsp);
  if (contLen < 0) {
    int32_t errCode = terrno;  // keep the serializer's error across the free below
    qError("tSerializeSExplainRsp second failed, error: %x", errCode);
    // The buffer is never sent on this path, so release it here.
    rpcFreeCont(pRsp);
    QW_RET(errCode);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_EXPLAIN_RSP,
      .pCont = pRsp,
      .contLen = contLen,
      .code = 0,
      .info = *pConn,
  };

  rpcRsp.info.ahandle = NULL;
  tmsgSendRsp(&rpcRsp);
  return TSDB_CODE_SUCCESS;
}
295

296
/**
 * Serialize a scheduler heartbeat status and send it as TDMT_SCH_QUERY_HEARTBEAT_RSP.
 *
 * @param pConn   connection info copied into the outgoing message
 * @param pStatus heartbeat response body to serialize
 * @param code    result code placed in the rpc envelope
 * @return TSDB_CODE_SUCCESS after the message is handed to tmsgSendRsp, otherwise
 *         terrno (through QW_RET)
 */
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
  // First pass computes the encoded size.
  int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
  if (contLen < 0) {
    qError("tSerializeSSchedulerHbRsp failed, error: %x", terrno);
    QW_RET(terrno);
  }

  void   *pRsp = rpcMallocCont(contLen);
  if (NULL == pRsp) {
    QW_RET(terrno);
  }
  contLen = tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
  if (contLen < 0) {
    int32_t errCode = terrno;  // keep the serializer's error across the free below
    qError("tSerializeSSchedulerHbRsp second failed, error: %x", errCode);
    // The buffer is never sent on this path, so release it here.
    rpcFreeCont(pRsp);
    QW_RET(errCode);
  }

  SRpcMsg rpcRsp = {
      .msgType = TDMT_SCH_QUERY_HEARTBEAT_RSP,
      .contLen = contLen,
      .pCont = pRsp,
      .code = code,
      .info = *pConn,
  };

  tmsgSendRsp(&rpcRsp);

  return TSDB_CODE_SUCCESS;
}
325

326
/**
 * Send a fetch response on connection `pConn`.
 *
 * When `pRsp` is NULL, a zero-filled SRetrieveTableRsp header is allocated with
 * rpcMallocCont and sent with no payload. The envelope's compressed flag is taken
 * from the response header. If `ctx` is non-NULL its lastAckTs is refreshed after
 * the send.
 *
 * @return TSDB_CODE_SUCCESS, or terrno (through QW_RET) if the empty header cannot
 *         be allocated
 */
int32_t qwBuildAndSendFetchRsp(SQWTaskCtx *ctx, int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
                               int32_t code) {
  SRetrieveTableRsp *pBody = pRsp;
  int32_t            payloadLen = dataLength;

  if (NULL == pBody) {
    // No response supplied: build an empty header-only body.
    pBody = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
    if (NULL == pBody) {
      QW_RET(terrno);
    }
    TAOS_MEMSET(pBody, 0, sizeof(SRetrieveTableRsp));
    payloadLen = 0;
  }

  SRpcMsg rpcRsp = {0};
  rpcRsp.msgType = rspType;
  rpcRsp.pCont = pBody;
  rpcRsp.contLen = sizeof(*pBody) + payloadLen;
  rpcRsp.code = code;
  rpcRsp.info = *pConn;
  rpcRsp.info.compressed = pBody->compressed;

  tmsgSendRsp(&rpcRsp);

  if (NULL != ctx) {
    ctx->lastAckTs = taosGetTimestampSec();
  }

  return TSDB_CODE_SUCCESS;
}
354

355
#if 0
356
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
357
  STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
358
  if (NULL == pRsp) {
359
    QW_RET(terrno);
360
  }
361
  pRsp->code = code;
362

363
  SRpcMsg rpcRsp = {
364
      .msgType = TDMT_SCH_CANCEL_TASK_RSP,
365
      .pCont = pRsp,
366
      .contLen = sizeof(*pRsp),
367
      .code = code,
368
      .info = *pConn,
369
  };
370

371
  tmsgSendRsp(&rpcRsp);
372
  return TSDB_CODE_SUCCESS;
373
}
374

375
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
376
  STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
377
  if (NULL == pRsp) {
378
    QW_RET(terrno);
379
  }
380
  pRsp->code = code;
381

382
  SRpcMsg rpcRsp = {
383
      .msgType = TDMT_SCH_DROP_TASK_RSP,
384
      .pCont = pRsp,
385
      .contLen = sizeof(*pRsp),
386
      .code = code,
387
      .info = *pConn,
388
  };
389

390
  tmsgSendRsp(&rpcRsp);
391
  return TSDB_CODE_SUCCESS;
392
}
393
#endif
394

395
/**
 * Build a TDMT_SCH_DROP_TASK request for the task identified by the QW_FPARAMS
 * ids and put it on this worker's FETCH_QUEUE.
 *
 * @param pConn connection info copied into the queued message
 * @return TSDB_CODE_SUCCESS when the message is queued; a negative serializer result,
 *         terrno, or the tmsgPutToQueue error otherwise
 */
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  // Zero-initialise so that any field not assigned below is serialized as 0
  // rather than as an indeterminate value.
  STaskDropReq qMsg = {0};
  qMsg.header.vgId = mgmt->nodeId;
  qMsg.header.contLen = 0;
  qMsg.sId = sId;
  qMsg.queryId = qId;
  qMsg.clientId = cId;
  qMsg.taskId = tId;
  qMsg.refId = rId;
  qMsg.execId = eId;
  
  // First pass computes the encoded size.
  int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
  if (msgSize < 0) {
    QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
    QW_ERR_RET(msgSize);
  }
  
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(terrno);
  }

  msgSize = tSerializeSTaskDropReq(msg, msgSize, &qMsg);
  if (msgSize < 0) {
    QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
    rpcFreeCont(msg);
    QW_ERR_RET(msgSize);
  }

  SRpcMsg pNewMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = msg,
      .contLen = msgSize,
      .code = 0,
      .info = *pConn,
  };

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg);
  if (TSDB_CODE_SUCCESS != code) {
    QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}
443

444
/**
 * Build a TDMT_SCH_QUERY_CONTINUE request for the task identified by the QW_FPARAMS
 * ids and put it on this worker's QUERY_QUEUE.
 *
 * @param pConn connection info copied into the queued message
 * @return TSDB_CODE_SUCCESS when the message is queued; terrno if the request buffer
 *         cannot be allocated, or the tmsgPutToQueue error
 */
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
  if (NULL == req) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
    QW_ERR_RET(terrno);
  }

  // Task identity
  req->sId = sId;
  req->queryId = qId;
  req->clientId = cId;
  req->taskId = tId;
  req->execId = eId;
  req->header.vgId = mgmt->nodeId;

  SRpcMsg pNewMsg = {0};
  pNewMsg.msgType = TDMT_SCH_QUERY_CONTINUE;
  pNewMsg.pCont = req;
  pNewMsg.contLen = sizeof(SQueryContinueReq);
  pNewMsg.code = 0;
  pNewMsg.info = *pConn;

  int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
  if (TSDB_CODE_SUCCESS != code) {
    QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
    QW_ERR_RET(code);
  }

  QW_SCH_TASK_DLOG("query continue msg put to queue, vgId:%d", mgmt->nodeId);

  return TSDB_CODE_SUCCESS;
}
476

477
/**
 * Register a pre-built TDMT_SCH_DROP_TASK message (code TSDB_CODE_RPC_BROKEN_LINK)
 * as the broken-link argument for connection `pConn`, using the task ids from
 * QW_FPARAMS.
 *
 * @return TSDB_CODE_SUCCESS after tmsgRegisterBrokenLinkArg is called; a negative
 *         serializer result or terrno on failure
 */
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
  // Zero-initialise so that any field not assigned below is serialized as 0
  // rather than as an indeterminate value.
  STaskDropReq qMsg = {0};
  qMsg.header.vgId = mgmt->nodeId;
  qMsg.header.contLen = 0;
  qMsg.sId = sId;
  qMsg.queryId = qId;
  qMsg.clientId = cId;
  qMsg.taskId = tId;
  qMsg.refId = rId;
  qMsg.execId = eId;
  
  // First pass computes the encoded size.
  int32_t msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg);
  if (msgSize < 0) {
    QW_SCH_TASK_ELOG("tSerializeSTaskDropReq get size, msgSize:%d", msgSize);
    QW_ERR_RET(msgSize);
  }
  
  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_TASK_ELOG("rpcMallocCont %d failed", msgSize);
    QW_ERR_RET(terrno);
  }

  msgSize = tSerializeSTaskDropReq(msg, msgSize, &qMsg);
  if (msgSize < 0) {
    QW_SCH_TASK_ELOG("tSerializeSTaskDropReq failed, msgSize:%d", msgSize);
    rpcFreeCont(msg);
    QW_ERR_RET(msgSize);
  }

  SRpcMsg brokenMsg = {
      .msgType = TDMT_SCH_DROP_TASK,
      .pCont = msg,
      .contLen = msgSize,
      .code = TSDB_CODE_RPC_BROKEN_LINK,
      .info = *pConn,
  };

  tmsgRegisterBrokenLinkArg(&brokenMsg);

  return TSDB_CODE_SUCCESS;
}
519

520
/**
 * Register a pre-built TDMT_SCH_QUERY_HEARTBEAT message (code
 * TSDB_CODE_RPC_BROKEN_LINK) as the broken-link argument for connection `pConn`.
 *
 * @param mgmt     worker whose nodeId is written into the request header
 * @param clientId client id written into the request
 * @param pConn    connection info copied into the registered message
 * @return TSDB_CODE_SUCCESS after tmsgRegisterBrokenLinkArg is called; a negative
 *         serializer result or terrno on failure
 */
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t clientId, SRpcHandleInfo *pConn) {
  SSchedulerHbReq req = {0};
  req.clientId = clientId;
  req.header.vgId = mgmt->nodeId;

  // First pass computes the encoded size.
  int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    QW_ERR_RET(msgSize);
  }

  void *msg = rpcMallocCont(msgSize);
  if (NULL == msg) {
    QW_SCH_ELOG("calloc %d failed", msgSize);
    QW_ERR_RET(terrno);
  }

  // Second pass writes into the buffer; release it if encoding fails.
  msgSize = tSerializeSSchedulerHbReq(msg, msgSize, &req);
  if (msgSize < 0) {
    QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
    rpcFreeCont(msg);
    QW_ERR_RET(msgSize);
  }

  SRpcMsg brokenMsg = {0};
  brokenMsg.msgType = TDMT_SCH_QUERY_HEARTBEAT;
  brokenMsg.pCont = msg;
  brokenMsg.contLen = msgSize;
  brokenMsg.code = TSDB_CODE_RPC_BROKEN_LINK;
  brokenMsg.info = *pConn;

  tmsgRegisterBrokenLinkArg(&brokenMsg);

  return TSDB_CODE_SUCCESS;
}
555

556
/**
 * Decode a sub-query message, optionally verify grants, and run qwPreprocessQuery.
 *
 * @param qWorkerMgmt worker handle (SQWorker *); must be non-NULL
 * @param pMsg        incoming rpc message; must be non-NULL
 * @param chkGrant    when true, grants are checked unless the message mask has the
 *                    show-rewrite bit set; view/audit grants are checked only when
 *                    the matching mask bit is set
 * @param qType       out: receives msg.taskType; written only once decoding and grant
 *                    checks have passed, and dereferenced without a NULL check
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a decode failure, the
 *         negative taosGranted result on a grant failure, otherwise the result of
 *         qwPreprocessQuery
 */
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant, int32_t* qType) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  SSubQueryMsg  msg = {0};
  if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
    QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  if (chkGrant && !TEST_SHOW_REWRITE_MASK(msg.msgMask)) {
    code = taosGranted(TSDB_GRANT_ALL);
    if (code < 0) {
      QW_ELOG("query failed cause of grant expired, msgMask:%d", msg.msgMask);
      tFreeSSubQueryMsg(&msg);
      QW_ERR_RET(code);
    }

    if (TEST_VIEW_MASK(msg.msgMask)) {
      code = taosGranted(TSDB_GRANT_VIEW);
      if (code < 0) {
        QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask);
        tFreeSSubQueryMsg(&msg);
        QW_ERR_RET(code);
      }
    }

    if (TEST_AUDIT_MASK(msg.msgMask)) {
      code = taosGranted(TSDB_GRANT_AUDIT);
      if (code < 0) {
        QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask);
        tFreeSSubQueryMsg(&msg);
        QW_ERR_RET(code);
      }
    }
  }

  // Task identity consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg.sId;
  uint64_t qId = msg.queryId;
  uint64_t cId = msg.clientId;
  uint64_t tId = msg.taskId;
  int64_t  rId = msg.refId;
  int32_t  eId = msg.execId;

  *qType = msg.taskType;  // task type: TASK_TYPE_HQUERY or TASK_TYPE_QUERY

  SQWMsg qwMsg = {
      .msgType = pMsg->msgType,
      .msg = msg.msg,
      .msgLen = msg.msgLen,
      .connInfo = pMsg->info,
      .msgMask = msg.msgMask,
      .subQType = msg.subQType,
  };

  QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p, SQL:%s", pMsg->info.handle, msg.sql);
  code = qwPreprocessQuery(QW_FPARAMS(), &qwMsg);
  QW_SCH_TASK_DLOG("prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);

  tFreeSSubQueryMsg(&msg);

  return code;
}
609

610
/**
 * Decode a sub-query message and run qwAbortPrerocessQuery for the task it names.
 *
 * The result of qwAbortPrerocessQuery is only logged; it is not returned.
 *
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a decode failure,
 *         otherwise TSDB_CODE_SUCCESS
 */
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
  if (NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  SSubQueryMsg  msg = {0};
  if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
    QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Task identity consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg.sId;
  uint64_t qId = msg.queryId;
  uint64_t cId = msg.clientId;
  uint64_t tId = msg.taskId;
  int64_t  rId = msg.refId;
  int32_t  eId = msg.execId;

  QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle);
  int32_t code = qwAbortPrerocessQuery(QW_FPARAMS());
  QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code);

  tFreeSSubQueryMsg(&msg);

  return TSDB_CODE_SUCCESS;
}
638

639
/**
 * Queue handler for a query message: decode the sub-query request and run
 * qwProcessQuery on it.
 *
 * msg.sql is passed to qwProcessQuery and then cleared locally before
 * tFreeSSubQueryMsg runs. The result of qwProcessQuery is only logged.
 *
 * @param ts value forwarded to qwUpdateTimeInQueue for QUERY_QUEUE
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a decode failure, the
 *         qwUpdateTimeInQueue error if any, otherwise TSDB_CODE_SUCCESS
 */
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t       code = 0;
  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;

  QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
  QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1);

  SSubQueryMsg  msg = {0};
  if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) {
    QW_ELOG("tDeserializeSSubQueryMsg failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Task identity consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg.sId;
  uint64_t qId = msg.queryId;
  uint64_t cId = msg.clientId;
  uint64_t tId = msg.taskId;
  int64_t  rId = msg.refId;
  int32_t  eId = msg.execId;

  SQWMsg qwMsg = {
      .node = node,
      .msg = msg.msg,
      .msgLen = msg.msgLen,
      .connInfo = pMsg->info,
      .msgType = pMsg->msgType,
      .code = pMsg->code,
      .subEndPoints = &msg.subEndPoints,
  };
  qwMsg.msgInfo.taskType = msg.taskType;
  qwMsg.msgInfo.explain = msg.explain;
  qwMsg.msgInfo.compressMsg = msg.compress;
  qwMsg.msgInfo.needFetch = msg.needFetch;

  QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, compress:%d, handle:%p, SQL:%s, code:0x%x, subEndPointsNum:%d", 
    node, TMSG_INFO(pMsg->msgType), msg.compress, pMsg->info.handle, msg.sql, qwMsg.code, (int32_t)taosArrayGetSize(*qwMsg.subEndPoints));

  code = qwProcessQuery(QW_FPARAMS(), &qwMsg, msg.sql);
  // Clear the local pointer so tFreeSSubQueryMsg below does not see the sql string.
  msg.sql = NULL;

  QW_SCH_TASK_DLOG("processQuery end, node:%p, code:%x", node, code);
  tFreeSSubQueryMsg(&msg);

  return TSDB_CODE_SUCCESS;
}
680

681
/**
 * Queue handler for a query-continue message: validate the fixed-size
 * SQueryContinueReq body and run qwProcessCQuery for the task it names.
 *
 * The result of qwProcessCQuery is only logged.
 *
 * @param ts value forwarded to qwUpdateTimeInQueue for QUERY_QUEUE
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a missing/short body, the
 *         qwUpdateTimeInQueue error if any, otherwise TSDB_CODE_SUCCESS
 */
int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t            code = 0;
  SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
  SQWorker          *mgmt = (SQWorker *)qWorkerMgmt;

  QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE));
  QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1);

  // Compare as signed: against an unsigned sizeof, a negative contLen would be
  // converted to a huge value and slip past the length check.
  if (NULL == msg || pMsg->contLen < (int32_t)sizeof(*msg)) {
    QW_ELOG("invalid cquery msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Task identity consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = msg->sId;
  uint64_t qId = msg->queryId;
  uint64_t cId = msg->clientId;
  uint64_t tId = msg->taskId;
  int64_t  rId = 0;
  int32_t  eId = msg->execId;

  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};

  QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);

  code = qwProcessCQuery(QW_FPARAMS(), &qwMsg);

  QW_SCH_TASK_DLOG("processCQuery end, node:%p, code:0x%x", node, code);

  return TSDB_CODE_SUCCESS;
}
719

720
/**
 * Queue handler for a fetch message: decode the SResFetchReq and run
 * qwProcessFetch for the task it names.
 *
 * The result of qwProcessFetch is only logged; the decoded request is destroyed
 * before returning.
 *
 * @param ts value forwarded to qwUpdateTimeInQueue for FETCH_QUEUE
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or a decode failure, the
 *         qwUpdateTimeInQueue error if any, otherwise TSDB_CODE_SUCCESS
 */
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker     *mgmt = (SQWorker *)qWorkerMgmt;
  SResFetchReq  req = {0};

  QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
  QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1);

  if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    QW_ELOG("tDeserializeSResFetchReq %d failed", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Task identity consumed by QW_FPARAMS() and the QW_SCH_TASK_* log macros.
  uint64_t sId = req.sId;
  uint64_t qId = req.queryId;
  uint64_t cId = req.clientId;
  uint64_t tId = req.taskId;
  int64_t  rId = 0;
  int32_t  eId = req.execId;

  SQWMsg qwMsg = {
      .req = &req,
      .node = node,
      .msg = req.pOpParam,
      .msgLen = 0,
      .connInfo = pMsg->info,
      .msgType = pMsg->msgType,
  };

  QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);

  int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg);

  QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code);

  tDestroySResFetchReq(&req);

  return TSDB_CODE_SUCCESS;
}
755

756
/**
 * Queue handler for a response message: pass it to qProcessRspMsg and then clear
 * pMsg->pCont.
 *
 * When a worker handle is supplied, the FETCH_QUEUE time is updated and the
 * rspProcessed counter is incremented first.
 *
 * @return the qwUpdateTimeInQueue error if any, otherwise TSDB_CODE_SUCCESS
 */
int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;

  if (NULL != mgmt) {
    QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
    QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1);
  }

  qProcessRspMsg(NULL, pMsg, NULL);
  pMsg->pCont = NULL;

  return TSDB_CODE_SUCCESS;
}
767

768
#if 0
769
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
770
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
771
    return TSDB_CODE_QRY_INVALID_INPUT;
772
  }
773

774
  SQWorker       *mgmt = (SQWorker *)qWorkerMgmt;
775
  int32_t         code = 0;
776
  STaskCancelReq *msg = pMsg->pCont;
777

778
  QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
779
  QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1);
780

781
  if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
782
    qError("invalid task cancel msg");
783
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
784
  }
785

786
  msg->sId = be64toh(msg->sId);
787
  msg->queryId = be64toh(msg->queryId);
788
  msg->clientId = be64toh(msg->clientId);
789
  msg->taskId = be64toh(msg->taskId);
790
  msg->refId = be64toh(msg->refId);
791
  msg->execId = ntohl(msg->execId);
792

793
  uint64_t sId = msg->sId;
794
  uint64_t qId = msg->queryId;
795
  uint64_t cId = msg->clientId;
796
  uint64_t tId = msg->taskId;
797
  int64_t  rId = msg->refId;
798
  int32_t  eId = msg->execId;
799

800
  SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
801

802
  // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
803

804
_return:
805

806
  QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
807
  QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
808

809
  return TSDB_CODE_SUCCESS;
810
}
811
#endif
812

813
/**
 * Handle a task-drop request.
 *
 * Decodes an STaskDropReq from the message payload and hands it to
 * qwProcessDrop(). The result of qwProcessDrop() is only logged; once the
 * request has been decoded this function reports TSDB_CODE_SUCCESS.
 *
 * @param node         Opaque node pointer stored in the SQWMsg; must not be NULL.
 * @param qWorkerMgmt  Query worker handle (SQWorker *); must not be NULL.
 * @param pMsg         Incoming RPC message; must not be NULL.
 * @param ts           Timestamp forwarded to qwUpdateTimeInQueue().
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or an undecodable
 *         payload, whatever QW_ERR_RET propagates from qwUpdateTimeInQueue(),
 *         otherwise TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
  int32_t   code = 0;

  QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
  QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1);

  STaskDropReq dropReq = {0};
  int32_t      decoded = tDeserializeSTaskDropReq(pMsg->pCont, pMsg->contLen, &dropReq);
  if (decoded < 0) {
    QW_ELOG("tDeserializeSTaskDropReq failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Task identifiers. They are not all referenced explicitly below; presumably
  // QW_FPARAMS() and the QW_SCH_TASK_DLOG macro consume them by name, so the
  // names must stay as they are - verify against the macro definitions.
  uint64_t sId = dropReq.sId;
  uint64_t qId = dropReq.queryId;
  uint64_t cId = dropReq.clientId;
  uint64_t tId = dropReq.taskId;
  int64_t  rId = dropReq.refId;
  int32_t  eId = dropReq.execId;

  // Zero-initialise, then fill in the fields that carry information.
  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.code = pMsg->code;
  qwMsg.connInfo = pMsg->info;

  if (pMsg->code == TSDB_CODE_RPC_BROKEN_LINK) {
    QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);

  code = qwProcessDrop(QW_FPARAMS(), &qwMsg);

  QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code);

  // The drop result is only logged above; it is not returned to the caller.
  return TSDB_CODE_SUCCESS;
}
851

852
/**
 * Handle a task-notify request.
 *
 * Decodes an STaskNotifyReq from the message payload and hands it, together
 * with the notification type, to qwProcessNotify(). The result of
 * qwProcessNotify() is only logged; once the request has been decoded this
 * function reports TSDB_CODE_SUCCESS.
 *
 * @param node         Opaque node pointer stored in the SQWMsg; must not be NULL.
 * @param qWorkerMgmt  Query worker handle (SQWorker *); must not be NULL.
 * @param pMsg         Incoming RPC message; must not be NULL.
 * @param ts           Timestamp forwarded to qwUpdateTimeInQueue().
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments or an undecodable
 *         payload, whatever QW_ERR_RET propagates from qwUpdateTimeInQueue(),
 *         otherwise TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
  int32_t   code = 0;

  QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
  QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1);

  STaskNotifyReq notifyReq = {0};
  int32_t        decoded = tDeserializeSTaskNotifyReq(pMsg->pCont, pMsg->contLen, &notifyReq);
  if (decoded < 0) {
    QW_ELOG("tDeserializeSTaskNotifyReq failed, contLen:%d", pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Task identifiers. They are not all referenced explicitly below; presumably
  // QW_FPARAMS() and the QW_SCH_TASK_DLOG macro consume them by name, so the
  // names must stay as they are - verify against the macro definitions.
  uint64_t sId = notifyReq.sId;
  uint64_t qId = notifyReq.queryId;
  uint64_t cId = notifyReq.clientId;
  uint64_t tId = notifyReq.taskId;
  int64_t  rId = notifyReq.refId;
  int32_t  eId = notifyReq.execId;

  // Zero-initialise, then fill in the fields that carry information.
  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.code = pMsg->code;
  qwMsg.connInfo = pMsg->info;
  qwMsg.msgType = notifyReq.type;

  QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle);

  code = qwProcessNotify(QW_FPARAMS(), &qwMsg);

  QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code);

  // The notify result is only logged above; it is not returned to the caller.
  return TSDB_CODE_SUCCESS;
}
886

887

888
/**
 * Handle a scheduler heartbeat request.
 *
 * Decodes an SSchedulerHbReq from the message payload and hands it to
 * qwProcessHb(). The result of qwProcessHb() is only logged; once the request
 * has been decoded this function reports TSDB_CODE_SUCCESS.
 *
 * @param node         Opaque node pointer stored in the SQWMsg; must not be NULL.
 * @param qWorkerMgmt  Query worker handle (SQWorker *); must not be NULL.
 * @param pMsg         Incoming RPC message; must not be NULL and must carry a payload.
 * @param ts           Timestamp forwarded to qwUpdateTimeInQueue().
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments, a missing payload or
 *         an undecodable payload, whatever QW_ERR_RET propagates from
 *         qwUpdateTimeInQueue(), otherwise TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) {
  if (!(node && qWorkerMgmt && pMsg)) {
    return TSDB_CODE_QRY_INVALID_INPUT;
  }

  SQWorker       *mgmt = (SQWorker *)qWorkerMgmt;
  SSchedulerHbReq hbReq = {0};
  // clientId is not referenced explicitly after assignment; presumably the
  // QW_SCH_DLOG macro consumes it by name - verify against the macro definition.
  uint64_t        clientId = 0;
  int32_t         code = 0;

  QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
  QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);

  // A heartbeat without a payload cannot be decoded.
  if (NULL == pMsg->pCont) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  // Any non-zero result from the deserializer is treated as a failure; the
  // request is released before bailing out.
  if (0 != tDeserializeSSchedulerHbReq(pMsg->pCont, pMsg->contLen, &hbReq)) {
    QW_ELOG("invalid hb msg, msg:%p, msgLen:%d", pMsg->pCont, pMsg->contLen);
    tFreeSSchedulerHbReq(&hbReq);
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  clientId = hbReq.clientId;

  // Zero-initialise, then fill in the fields that carry information.
  SQWMsg qwMsg = {0};
  qwMsg.node = node;
  qwMsg.code = pMsg->code;
  qwMsg.connInfo = pMsg->info;

  if (pMsg->code == TSDB_CODE_RPC_BROKEN_LINK) {
    QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
  }

  QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);

  // NOTE(review): hbReq is not released here after the call; presumably
  // qwProcessHb() is responsible for it - confirm.
  code = qwProcessHb(mgmt, &qwMsg, &hbReq);

  QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code);

  // The heartbeat result is only logged above; it is not returned to the caller.
  return TSDB_CODE_SUCCESS;
}
927

928
/**
 * Handle a delete request.
 *
 * Decodes an SVDeleteReq from the message payload, copies `source` and
 * `secureDelete` from the request into pRes, and runs the delete through
 * qwProcessDelete(). The decoded `sql` and `msg` buffers are released here.
 *
 * @param node         Opaque node pointer stored in the SQWMsg; must not be NULL.
 * @param qWorkerMgmt  Query worker handle (SQWorker *); must not be NULL.
 * @param pMsg         Incoming RPC message; must not be NULL.
 * @param pRes         Output delete result; must not be NULL.
 * @return TSDB_CODE_QRY_INVALID_INPUT for NULL arguments, the error from
 *         tDeserializeSVDeleteReq() or qwProcessDelete() on failure, otherwise
 *         TSDB_CODE_SUCCESS.
 */
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes) {
  // pRes is written to unconditionally below, so it is validated together with
  // the other required arguments.
  if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == pRes) {
    QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
  }

  int32_t     code = 0;
  SVDeleteReq req = {0};
  SQWorker   *mgmt = (SQWorker *)qWorkerMgmt;

  QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1);

  // NOTE(review): on a decode failure this returns without releasing req.sql /
  // req.msg; whether the deserializer cleans up after itself is not visible
  // here - confirm.
  QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req));

  // Task identifiers. They are not all referenced explicitly below; presumably
  // QW_FPARAMS() and the QW_SCH_TASK_DLOG macro consume them by name - verify
  // against the macro definitions.
  uint64_t sId = req.sId;
  uint64_t qId = req.queryId;
  uint64_t cId = req.clientId;
  uint64_t tId = req.taskId;
  int64_t  rId = 0;
  int32_t  eId = -1;
  pRes->source = req.source;
  pRes->secureDelete = req.secureDelete;

  SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info};
  QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql);
  // The SQL text is only needed for the log line above.
  taosMemoryFreeClear(req.sql);

  // QW_ERR_JRET is expected to store the result in `code` and jump to _return
  // on failure, which is why the payload is released after the label.
  QW_ERR_JRET(qwProcessDelete(QW_FPARAMS(), &qwMsg, pRes));

  QW_SCH_TASK_DLOG("processDelete end, node:%p", node);

_return:

  // Release the decoded payload on both the success and the failure path;
  // previously it was skipped (leaked) whenever qwProcessDelete() failed.
  taosMemoryFreeClear(req.msg);

  QW_RET(code);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc