
taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%

push | travis-ci | web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.
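
As a rough illustration of that source-scan step, here is a minimal, self-contained C sketch. All names in it (WalRecord, vtable_source_scan, send_downstream, refUids) are hypothetical and not the TDengine implementation; it only shows the idea of filtering WAL records down to the original tables that back a virtual table and handing the matches to the downstream merge task.

    #include <inttypes.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Hypothetical WAL record: one row written to an original (source) table. */
    typedef struct {
      int64_t tbUid;  /* uid of the original table that produced the row */
      int64_t ts;     /* row timestamp */
      double  val;    /* payload column */
    } WalRecord;

    /* Hypothetical hand-off to the downstream merge task. */
    static void send_downstream(const WalRecord *rec) {
      printf("forward uid:%" PRId64 " ts:%" PRId64 " val:%f\n", rec->tbUid, rec->ts, rec->val);
    }

    /* Source-scan step: keep only rows whose original table backs the virtual
     * table (its uid appears in refUids), then push them downstream. */
    static void vtable_source_scan(const WalRecord *wal, int nRecs,
                                   const int64_t *refUids, int nRefs) {
      for (int i = 0; i < nRecs; i++) {
        for (int j = 0; j < nRefs; j++) {
          if (wal[i].tbUid == refUids[j]) {
            send_downstream(&wal[i]);
            break;
          }
        }
      }
    }

    int main(void) {
      const int64_t   refUids[] = {1001};  /* original tables referenced by the vtable */
      const WalRecord wal[] = {{1001, 1, 1.5}, {2002, 2, 2.5}, {1001, 3, 3.5}};
      vtable_source_scan(wal, 3, refUids, 1);  /* forwards the two rows of table 1001 */
      return 0;
    }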

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing. Introduced memory limit handling during the merge process,
with configurable behavior when the memory limit is reached.
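
To make the merge step concrete, the following self-contained C sketch performs a timestamp-ordered k-way merge over per-table cursors. It is illustrative only: SourceCursor and merge_sources are made-up names, the next row is picked with a simple O(k) linear scan rather than the loser tree used by the actual merge task, and the memory-limit handling mentioned above is omitted.

    #include <stdint.h>
    #include <stdio.h>

    /* Hypothetical cursor over one original table's rows, already sorted by timestamp. */
    typedef struct {
      const int64_t *ts;   /* timestamps */
      const double  *val;  /* column values */
      int            len;  /* number of rows */
      int            pos;  /* next row to emit */
    } SourceCursor;

    /* Merge rows from k original tables into one timestamp-ordered virtual-table
     * stream. A loser tree would replace the O(k) scan below with an O(log k)
     * replay per emitted row, which matters when k is large. */
    static void merge_sources(SourceCursor *cur, int k) {
      for (;;) {
        int best = -1;
        for (int i = 0; i < k; i++) {
          if (cur[i].pos < cur[i].len &&
              (best < 0 || cur[i].ts[cur[i].pos] < cur[best].ts[cur[best].pos])) {
            best = i;
          }
        }
        if (best < 0) break;  /* every input is drained */
        printf("vtable row ts:%lld from source %d val:%f\n",
               (long long)cur[best].ts[cur[best].pos], best, cur[best].val[cur[best].pos]);
        cur[best].pos++;      /* advance only the cursor that produced the row */
      }
    }

    int main(void) {
      const int64_t tsA[] = {1, 4, 7};
      const double  vA[]  = {0.1, 0.4, 0.7};
      const int64_t tsB[] = {2, 3, 9};
      const double  vB[]  = {0.2, 0.3, 0.9};
      SourceCursor  cur[] = {{tsA, vA, 3, 0}, {tsB, vB, 3, 0}};
      merge_sources(cur, 2);  /* emits rows in ts order: 1, 2, 3, 4, 7, 9 */
      return 0;
    }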

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File

62.23% /source/dnode/vnode/src/tq/tq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "osDef.h"
18
#include "taoserror.h"
19
#include "tqCommon.h"
20
#include "tstream.h"
21
#include "vnd.h"
22

23
// 0: not init
24
// 1: already inited
25
// 2: wait to be inited or cleanup
26
static int32_t tqInitialize(STQ* pTq);
27

28
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return pHandle != NULL ? TMQ_HANDLE_STATUS_EXEC == pHandle->status : true; }
3,176,633!
29
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
3,175,855✔
30
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
3,176,653✔
31

32
void tqDestroyTqHandle(void* data) {
4,191✔
33
  if (data == NULL) return;
4,191!
34
  STqHandle* pData = (STqHandle*)data;
4,191✔
35
  qDestroyTask(pData->execHandle.task);
4,191✔
36

37
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
4,191✔
38
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
3,853!
39
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
338✔
40
    tqReaderClose(pData->execHandle.pTqReader);
254✔
41
    walCloseReader(pData->pWalReader);
254✔
42
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
254✔
43
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
84!
44
    walCloseReader(pData->pWalReader);
84✔
45
    tqReaderClose(pData->execHandle.pTqReader);
84✔
46
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
84!
47
    nodesDestroyNode(pData->execHandle.execTb.node);
84✔
48
  }
49
  if (pData->msg != NULL) {
4,190!
50
    rpcFreeCont(pData->msg->pCont);
×
51
    taosMemoryFree(pData->msg);
×
52
    pData->msg = NULL;
×
53
  }
54
  if (pData->block != NULL) {
4,190!
55
    blockDataDestroy(pData->block);
×
56
  }
57
  if (pData->pRef) {
4,190✔
58
    walCloseRef(pData->pRef->pWal, pData->pRef->refId);
4,153✔
59
  }
60
  taosHashCleanup(pData->tableCreateTimeHash);
4,191✔
61
}
62

63
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
11,423✔
64
  if (pLeft == NULL || pRight == NULL) {
11,423!
65
    return false;
×
66
  }
67
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
22,461✔
68
         pLeft->val.version == pRight->val.version;
11,038✔
69
}
70

71
int32_t tqOpen(const char* path, SVnode* pVnode) {
11,889✔
72
  if (path == NULL || pVnode == NULL) {
11,889!
73
    return TSDB_CODE_INVALID_PARA;
×
74
  }
75
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
11,906!
76
  if (pTq == NULL) {
11,904!
77
    return terrno;
×
78
  }
79

80
  pVnode->pTq = pTq;
11,904✔
81
  pTq->pVnode = pVnode;
11,904✔
82

83
  pTq->path = taosStrdup(path);
11,904!
84
  if (pTq->path == NULL) {
11,900!
85
    return terrno;
×
86
  }
87

88
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
11,900✔
89
  if (pTq->pHandle == NULL) {
11,901!
90
    return terrno;
×
91
  }
92
  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
11,901✔
93

94
  taosInitRWLatch(&pTq->lock);
11,900✔
95

96
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
11,900✔
97
  if (pTq->pPushMgr == NULL) {
11,900!
98
    return terrno;
×
99
  }
100

101
  pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
11,900✔
102
  if (pTq->pCheckInfo == NULL) {
11,902!
103
    return terrno;
×
104
  }
105
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
11,902✔
106

107
  pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK);
11,902✔
108
  if (pTq->pOffset == NULL) {
11,902!
109
    return terrno;
×
110
  }
111
  taosHashSetFreeFp(pTq->pOffset, (FDelete)tDeleteSTqOffset);
11,902✔
112

113
  return tqInitialize(pTq);
11,902✔
114
}
115

116
int32_t tqInitialize(STQ* pTq) {
11,902✔
117
  if (pTq == NULL) {
11,902!
118
    return TSDB_CODE_INVALID_PARA;
×
119
  }
120
  int32_t vgId = TD_VID(pTq->pVnode);
11,902✔
121
  int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1,
11,902✔
122
                                tqStartTaskCompleteCallback, &pTq->pStreamMeta);
123
  if (code != TSDB_CODE_SUCCESS) {
11,907!
124
    return code;
×
125
  }
126

127
  streamMetaLoadAllTasks(pTq->pStreamMeta);
11,907✔
128
  return tqMetaOpen(pTq);
11,907✔
129
}
130

131
void tqClose(STQ* pTq) {
11,904✔
132
  qDebug("start to close tq");
11,904✔
133
  if (pTq == NULL) {
11,906!
134
    return;
×
135
  }
136

137
  int32_t vgId = 0;
11,906✔
138
  if (pTq->pVnode != NULL) {
11,906✔
139
    vgId = TD_VID(pTq->pVnode);
11,904✔
140
  } else if (pTq->pStreamMeta != NULL) {
2!
141
    vgId = pTq->pStreamMeta->vgId;
×
142
  }
143

144
  // close the stream meta firstly
145
  streamMetaClose(pTq->pStreamMeta);
11,906✔
146

147
  void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
11,908✔
148
  while (pIter) {
11,952✔
149
    STqHandle* pHandle = *(STqHandle**)pIter;
44✔
150
    if (pHandle->msg != NULL) {
44!
151
      tqPushEmptyDataRsp(pHandle, vgId);
44✔
152
      rpcFreeCont(pHandle->msg->pCont);
44✔
153
      taosMemoryFree(pHandle->msg);
44!
154
      pHandle->msg = NULL;
44✔
155
    }
156
    pIter = taosHashIterate(pTq->pPushMgr, pIter);
44✔
157
  }
158

159
  taosHashCleanup(pTq->pHandle);
11,908✔
160
  taosHashCleanup(pTq->pPushMgr);
11,908✔
161
  taosHashCleanup(pTq->pCheckInfo);
11,908✔
162
  taosHashCleanup(pTq->pOffset);
11,908✔
163
  taosMemoryFree(pTq->path);
11,908!
164
  tqMetaClose(pTq);
11,908✔
165
  qDebug("vgId:%d end to close tq", vgId);
11,908✔
166

167
#if 0
168
  streamMetaFreeTQDuringScanWalError(pTq);
169
#endif
170

171
  taosMemoryFree(pTq);
11,908!
172
}
173

174
void tqNotifyClose(STQ* pTq) {
17,925✔
175
  if (pTq == NULL) {
17,925!
176
    return;
×
177
  }
178

179
  if (pTq->pStreamMeta != NULL) {
17,925!
180
    streamMetaNotifyClose(pTq->pStreamMeta);
17,925✔
181
  }
182
}
183

184
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
765✔
185
  if (pHandle == NULL) {
765!
186
    return;
×
187
  }
188
  int32_t    code = 0;
765✔
189
  SMqPollReq req = {0};
765✔
190
  code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req);
765✔
191
  if (code < 0) {
765!
192
    tqError("tDeserializeSMqPollReq %d failed, code:%d", pHandle->msg->contLen, code);
×
193
    return;
×
194
  }
195

196
  SMqDataRsp dataRsp = {0};
765✔
197
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
765✔
198
  if (code != 0) {
765!
199
    tqError("tqInitDataRsp failed, code:%d", code);
×
200
    return;
×
201
  }
202
  dataRsp.blockNum = 0;
765✔
203
  char buf[TSDB_OFFSET_LEN] = {0};
765✔
204
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
765✔
205
  tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, QID:0x%" PRIx64, req.consumerId, vgId, buf,
765!
206
         req.reqId);
207

208
  code = tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
765✔
209
  if (code != 0) {
765!
210
    tqError("tqSendDataRsp failed, code:%d", code);
×
211
  }
212
  tDeleteMqDataRsp(&dataRsp);
765✔
213
}
214

215
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp, int32_t type,
2,843,022✔
216
                      int32_t vgId) {
217
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
2,843,022!
218
    return TSDB_CODE_INVALID_PARA;
×
219
  }
220
  int64_t sver = 0, ever = 0;
2,843,025✔
221
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
2,843,025✔
222

223
  char buf1[TSDB_OFFSET_LEN] = {0};
2,843,014✔
224
  char buf2[TSDB_OFFSET_LEN] = {0};
2,843,014✔
225
  (void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &(pRsp->reqOffset));
2,843,014✔
226
  (void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &(pRsp->rspOffset));
2,843,027✔
227

228
  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) start to send rsp, block num:%d, req:%s, rsp:%s, QID:0x%" PRIx64,
2,843,022✔
229
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
230

231
  return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
2,843,027✔
232
}
233

234
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
12,418✔
235
  if (pTq == NULL) {
12,418!
236
    return TSDB_CODE_INVALID_PARA;
×
237
  }
238
  SMqVgOffset vgOffset = {0};
12,418✔
239
  int32_t     vgId = TD_VID(pTq->pVnode);
12,418✔
240

241
  int32_t  code = 0;
12,418✔
242
  SDecoder decoder;
243
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
12,418✔
244
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
12,425!
245
    code = TSDB_CODE_INVALID_MSG;
×
246
    goto end;
×
247
  }
248

249
  tDecoderClear(&decoder);
12,423✔
250

251
  STqOffset* pOffset = &vgOffset.offset;
12,416✔
252

253
  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
12,416!
254
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
446!
255
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
256
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
11,970!
257
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
11,971✔
258
            pOffset->val.version);
259
  } else {
UNCOV
260
    tqError("invalid commit offset type:%d", pOffset->val.type);
×
261
    code = TSDB_CODE_INVALID_MSG;
×
262
    goto end;
×
263
  }
264

265
  STqOffset* pSavedOffset = NULL;
12,430✔
266
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
12,430✔
267
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
12,430✔
268
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
7!
269
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
270
    goto end;  // no need to update the offset value
7✔
271
  }
272

273
  // save the new offset value
274
  if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))) {
12,423!
275
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
276
    return -1;
×
277
  }
278

279
  if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg,
12,423!
280
                     msgLen >= sizeof(vgOffset.consumerId) ? msgLen - sizeof(vgOffset.consumerId) : 0) < 0) {
281
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
282
    return -1;
×
283
  }
284

285
  return 0;
12,422✔
286
end:
7✔
287
  tOffsetDestroy(&vgOffset.offset.val);
7✔
288
  return code;
7✔
289
}
290

291
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
28✔
292
  if (pTq == NULL || pMsg == NULL) {
28!
293
    return TSDB_CODE_INVALID_PARA;
×
294
  }
295
  SMqSeekReq req = {0};
28✔
296
  int32_t    vgId = TD_VID(pTq->pVnode);
28✔
297
  SRpcMsg    rsp = {.info = pMsg->info};
28✔
298
  int        code = 0;
28✔
299

300
  if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
28!
301
    code = TSDB_CODE_OUT_OF_MEMORY;
×
302
    goto end;
×
303
  }
304

305
  tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
28✔
306
  taosWLockLatch(&pTq->lock);
28✔
307

308
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
28✔
309
  if (pHandle == NULL) {
28!
310
    tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
×
311
    code = 0;
×
312
    taosWUnLockLatch(&pTq->lock);
×
313
    goto end;
×
314
  }
315

316
  // 2. check consumer-vg assignment status
317
  if (pHandle->consumerId != req.consumerId) {
28!
318
    tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
319
            req.consumerId, vgId, req.subKey, pHandle->consumerId);
320
    taosWUnLockLatch(&pTq->lock);
×
321
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
322
    goto end;
×
323
  }
324

325
  // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
326
  // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
327
  tqUnregisterPushHandle(pTq, pHandle);
28✔
328
  taosWUnLockLatch(&pTq->lock);
28✔
329

330
end:
28✔
331
  rsp.code = code;
28✔
332
  tmsgSendRsp(&rsp);
28✔
333
  return 0;
28✔
334
}
335

336
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
191✔
337
  if (pTq == NULL) {
191!
338
    return TSDB_CODE_INVALID_PARA;
×
339
  }
340
  void* pIter = NULL;
191✔
341

342
  while (1) {
11✔
343
    pIter = taosHashIterate(pTq->pCheckInfo, pIter);
202✔
344
    if (pIter == NULL) {
202✔
345
      break;
156✔
346
    }
347

348
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
46✔
349

350
    if (pCheck->ntbUid == tbUid) {
46!
351
      int32_t sz = taosArrayGetSize(pCheck->colIdList);
46✔
352
      for (int32_t i = 0; i < sz; i++) {
168✔
353
        int16_t* pForbidColId = taosArrayGet(pCheck->colIdList, i);
157✔
354
        if (pForbidColId == NULL) {
157!
355
          continue;
×
356
        }
357

358
        if ((*pForbidColId) == colId) {
157✔
359
          taosHashCancelIterate(pTq->pCheckInfo, pIter);
35✔
360
          return -1;
35✔
361
        }
362
      }
363
    }
364
  }
365

366
  return 0;
156✔
367
}
368

369
int32_t tqProcessPollPush(STQ* pTq) {
27,245✔
370
  if (pTq == NULL) {
27,245!
371
    return TSDB_CODE_INVALID_PARA;
×
372
  }
373
  int32_t vgId = TD_VID(pTq->pVnode);
27,245✔
374
  taosWLockLatch(&pTq->lock);
27,245✔
375
  if (taosHashGetSize(pTq->pPushMgr) > 0) {
27,245!
376
    void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
27,245✔
377

378
    while (pIter) {
56,416✔
379
      STqHandle* pHandle = *(STqHandle**)pIter;
29,171✔
380
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
29,171✔
381

382
      if (pHandle->msg == NULL) {
29,171!
383
        tqError("pHandle->msg should not be null");
×
384
        taosHashCancelIterate(pTq->pPushMgr, pIter);
×
385
        break;
×
386
      } else {
387
        SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
29,171✔
388
                       .pCont = pHandle->msg->pCont,
29,171✔
389
                       .contLen = pHandle->msg->contLen,
29,171✔
390
                       .info = pHandle->msg->info};
29,171✔
391
        if (tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg) != 0){
29,171!
392
          tqError("vgId:%d tmsgPutToQueue failed, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
×
393
        }
394
        taosMemoryFree(pHandle->msg);
29,171!
395
        pHandle->msg = NULL;
29,171✔
396
      }
397

398
      pIter = taosHashIterate(pTq->pPushMgr, pIter);
29,171✔
399
    }
400

401
    taosHashClear(pTq->pPushMgr);
27,245✔
402
  }
403
  taosWUnLockLatch(&pTq->lock);
27,245✔
404
  return 0;
27,245✔
405
}
406

407
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
3,177,901✔
408
  if (pTq == NULL || pMsg == NULL) {
3,177,901!
409
    return TSDB_CODE_INVALID_PARA;
×
410
  }
411
  SMqPollReq req = {0};
3,178,692✔
412
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
3,178,692✔
413
  if (code < 0) {
3,176,602!
414
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
415
    terrno = TSDB_CODE_INVALID_MSG;
×
416
    goto END;
×
417
  }
418
  if (req.rawData == 1){
3,176,602✔
419
    req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
19,291✔
420
    if (req.uidHash == NULL) {
21,392!
421
      tqError("tq poll rawData taosHashInit failed");
×
422
      code = terrno;
×
423
      goto END;
×
424
    }
425
  }
426
  int64_t      consumerId = req.consumerId;
3,178,703✔
427
  int32_t      reqEpoch = req.epoch;
3,178,703✔
428
  STqOffsetVal reqOffset = req.reqOffset;
3,178,703✔
429
  int32_t      vgId = TD_VID(pTq->pVnode);
3,178,703✔
430
  STqHandle*   pHandle = NULL;
3,178,703✔
431

432
  while (1) {
11✔
433
    taosWLockLatch(&pTq->lock);
3,178,714✔
434
    // 1. find handle
435
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
3,178,783✔
436
    if (code != TDB_CODE_SUCCESS) {
3,176,555✔
437
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
2,552!
438
      terrno = TSDB_CODE_INVALID_MSG;
2,552✔
439
      taosWUnLockLatch(&pTq->lock);
2,552✔
440
      return -1;
3,346✔
441
    }
442

443
    // 2. check rebalance status
444
    if (pHandle->consumerId != consumerId) {
3,174,003✔
445
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
15!
446
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
447
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
448
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
15✔
449
      taosWUnLockLatch(&pTq->lock);
15✔
450
      code = -1;
15✔
451
      goto END;
15✔
452
    }
453

454
    bool exec = tqIsHandleExec(pHandle);
3,173,988✔
455
    if (!exec) {
3,173,988!
456
      tqSetHandleExec(pHandle);
3,175,855!
457
      //      qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
458
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
3,175,855✔
459
              req.subKey, pHandle);
460
      taosWUnLockLatch(&pTq->lock);
3,176,750✔
461
      break;
3,176,846✔
462
    }
463
    taosWUnLockLatch(&pTq->lock);
×
464

465
    tqDebug("tmq poll: consumer:0x%" PRIx64
11!
466
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
467
            consumerId, vgId, req.subKey, pHandle);
468
    taosMsleep(10);
11✔
469
  }
470

471
  // 3. update the epoch value
472
  if (pHandle->epoch < reqEpoch) {
3,176,846✔
473
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
1,444✔
474
            reqEpoch);
475
    pHandle->epoch = reqEpoch;
1,444✔
476
  }
477

478
  char buf[TSDB_OFFSET_LEN] = {0};
3,176,846✔
479
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
3,176,846✔
480
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, QID:0x%" PRIx64,
3,176,840✔
481
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
482

483
  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
3,176,854✔
484
  tqSetHandleIdle(pHandle);
3,176,653✔
485

486
  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
3,176,653✔
487
          req.subKey, pHandle);
488

489
END:
310✔
490
  tDestroySMqPollReq(&req);
3,176,886✔
491
  return code;
3,176,827✔
492
}
493

494
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
5✔
495
  if (pTq == NULL || pMsg == NULL) {
5!
496
    return TSDB_CODE_INVALID_PARA;
×
497
  }
498
  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
5✔
499
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
5✔
500

501
  SMqVgOffset vgOffset = {0};
5✔
502

503
  SDecoder decoder;
504
  tDecoderInit(&decoder, (uint8_t*)data, len);
5✔
505
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
5!
506
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
507
    return terrno;
×
508
  }
509

510
  tDecoderClear(&decoder);
5✔
511

512
  STqOffset* pSavedOffset = NULL;
5✔
513
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
5✔
514
  if (code != 0) {
5✔
515
    return TSDB_CODE_TMQ_NO_COMMITTED;
4✔
516
  }
517
  vgOffset.offset = *pSavedOffset;
1✔
518

519
  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
1!
520
  if (code < 0) {
1!
521
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
522
  }
523

524
  void* buf = rpcMallocCont(len);
1✔
525
  if (buf == NULL) {
1!
526
    return terrno;
×
527
  }
528
  SEncoder encoder = {0};
1✔
529
  tEncoderInit(&encoder, buf, len);
1✔
530
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
1✔
531
  tEncoderClear(&encoder);
1✔
532
  if (code < 0) {
1!
533
    rpcFreeCont(buf);
×
534
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
535
  }
536

537
  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
1✔
538

539
  tmsgSendRsp(&rsp);
1✔
540
  return 0;
1✔
541
}
542

543
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
43✔
544
  if (pTq == NULL || pMsg == NULL) {
43!
545
    return TSDB_CODE_INVALID_PARA;
×
546
  }
547
  int32_t    code = 0;
43✔
548
  SMqPollReq req = {0};
43✔
549
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
43!
550
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
551
    return TSDB_CODE_INVALID_MSG;
×
552
  }
553

554
  int64_t      consumerId = req.consumerId;
43✔
555
  STqOffsetVal reqOffset = req.reqOffset;
43✔
556
  int32_t      vgId = TD_VID(pTq->pVnode);
43✔
557

558
  // 1. find handle
559
  taosRLockLatch(&pTq->lock);
43✔
560
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
43✔
561
  if (pHandle == NULL) {
43!
562
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
×
563
    taosRUnLockLatch(&pTq->lock);
×
564
    return TSDB_CODE_INVALID_MSG;
×
565
  }
566

567
  // 2. check rebalance status
568
  if (pHandle->consumerId != consumerId) {
43!
569
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
570
            consumerId, vgId, req.subKey, pHandle->consumerId);
571
    taosRUnLockLatch(&pTq->lock);
×
572
    return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
573
  }
574

575
  int64_t sver = 0, ever = 0;
43✔
576
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
43✔
577
  taosRUnLockLatch(&pTq->lock);
43✔
578

579
  SMqDataRsp dataRsp = {0};
43✔
580
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
43✔
581
  if (code != 0) {
43!
582
    return code;
×
583
  }
584

585
  if (req.useSnapshot == true) {
43!
586
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
×
587
    code = TSDB_CODE_INVALID_PARA;
×
588
    goto END;
×
589
  }
590

591
  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
43✔
592

593
  if (reqOffset.type == TMQ_OFFSET__LOG) {
43✔
594
    dataRsp.rspOffset.version = reqOffset.version;
17✔
595
  } else if (reqOffset.type < 0) {
26!
596
    STqOffset* pOffset = NULL;
26✔
597
    code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
26✔
598
    if (code == 0) {
26✔
599
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
10!
600
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
×
601
        code = TSDB_CODE_INVALID_PARA;
×
602
        goto END;
×
603
      }
604

605
      dataRsp.rspOffset.version = pOffset->val.version;
10✔
606
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
10!
607
             req.subKey, dataRsp.rspOffset.version);
608
    } else {
609
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
16✔
610
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
13✔
611
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
3!
612
        dataRsp.rspOffset.version = ever;
3✔
613
      }
614
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
16!
615
             dataRsp.rspOffset.version);
616
    }
617
  } else {
618
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
×
619
            reqOffset.type);
620
    code = TSDB_CODE_INVALID_PARA;
×
621
    goto END;
×
622
  }
623

624
  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
43✔
625

626
END:
43✔
627
  tDeleteMqDataRsp(&dataRsp);
43✔
628
  return code;
43✔
629
}
630

631
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
925✔
632
  if (pTq == NULL || msg == NULL) {
925!
633
    return TSDB_CODE_INVALID_PARA;
×
634
  }
635
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
925✔
636
  int32_t        vgId = TD_VID(pTq->pVnode);
925✔
637

638
  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
925!
639
  int32_t code = 0;
925✔
640

641
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
925✔
642
  if (pHandle) {
927✔
643
    while (1) {
×
644
      taosWLockLatch(&pTq->lock);
924✔
645
      bool exec = tqIsHandleExec(pHandle);
924✔
646

647
      if (exec) {
924!
648
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
×
649
               pHandle->subKey, pHandle);
650
        taosWUnLockLatch(&pTq->lock);
×
651
        taosMsleep(10);
×
652
        continue;
×
653
      }
654
      tqUnregisterPushHandle(pTq, pHandle);
924✔
655
      code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
924✔
656
      if (code != 0) {
924!
657
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
×
658
      }
659
      taosWUnLockLatch(&pTq->lock);
924✔
660
      break;
924✔
661
    }
662
  }
663

664
  taosWLockLatch(&pTq->lock);
927✔
665
  if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey)) != 0) {
927✔
666
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
257!
667
  }
668
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) {
927✔
669
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
254!
670
  }
671

672
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) {
927!
673
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
×
674
  }
675
  taosWUnLockLatch(&pTq->lock);
926✔
676

677
  return 0;
927✔
678
}
679

680
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
122✔
681
  if (pTq == NULL || msg == NULL) {
122!
682
    return TSDB_CODE_INVALID_PARA;
×
683
  }
684
  STqCheckInfo info = {0};
122✔
685
  int32_t      code = tqMetaDecodeCheckInfo(&info, msg, msgLen >= 0 ? msgLen : 0);
122✔
686
  if (code != 0) {
122!
687
    return code;
×
688
  }
689

690
  code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
122✔
691
  if (code != 0) {
122!
692
    tDeleteSTqCheckInfo(&info);
×
693
    return code;
×
694
  }
695

696
  return tqMetaSaveInfo(pTq, pTq->pCheckStore, info.topic, strlen(info.topic), msg, msgLen >= 0 ? msgLen : 0);
122✔
697
}
698

699
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
10✔
700
  if (pTq == NULL || msg == NULL) {
10!
701
    return TSDB_CODE_INVALID_PARA;
×
702
  }
703
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
10!
704
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
705
  }
706
  return tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, strlen(msg));
10✔
707
}
708

709
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
3,161✔
710
  if (pTq == NULL || msg == NULL) {
3,161!
711
    return TSDB_CODE_INVALID_PARA;
×
712
  }
713
  int         ret = 0;
3,162✔
714
  SMqRebVgReq req = {0};
3,162✔
715
  SDecoder    dc = {0};
3,162✔
716

717
  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
3,162✔
718
  ret = tDecodeSMqRebVgReq(&dc, &req);
3,159✔
719
  if (ret < 0) {
3,157!
720
    goto end;
×
721
  }
722

723
  tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
3,157!
724
         req.oldConsumerId, req.newConsumerId);
725

726
  taosRLockLatch(&pTq->lock);
3,165✔
727
  STqHandle* pHandle = NULL;
3,165✔
728
  int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
3,165✔
729
  if (code != 0){
3,163✔
730
    tqInfo("vgId:%d, tq process sub req:%s, no such handle, create new one", pTq->pVnode->config.vgId, req.subKey);
1,443!
731
  }
732
  taosRUnLockLatch(&pTq->lock);
3,164✔
733
  if (pHandle == NULL) {
3,165✔
734
    if (req.oldConsumerId != -1) {
1,444✔
735
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
1!
736
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
737
    }
738
    if (req.newConsumerId == -1) {
1,444✔
739
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
1!
740
      ret = TSDB_CODE_INVALID_PARA;
1✔
741
      goto end;
1✔
742
    }
743
    STqHandle handle = {0};
1,443✔
744
    ret = tqMetaCreateHandle(pTq, &req, &handle);
1,443✔
745
    if (ret < 0) {
1,443!
746
      tqDestroyTqHandle(&handle);
×
747
      goto end;
×
748
    }
749
    taosWLockLatch(&pTq->lock);
1,443✔
750
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
1,443✔
751
    taosWUnLockLatch(&pTq->lock);
1,443✔
752
  } else {
753
    while (1) {
×
754
      taosWLockLatch(&pTq->lock);
1,721✔
755
      bool exec = tqIsHandleExec(pHandle);
1,721!
756
      if (exec) {
1,721!
757
        tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p",
×
758
               pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
759
        taosWUnLockLatch(&pTq->lock);
×
760
        taosMsleep(10);
×
761
        continue;
×
762
      }
763
      if (pHandle->consumerId == req.newConsumerId) {  // do nothing
1,721✔
764
        tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
98!
765
      } else {
766
        tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
1,623!
767
               req.newConsumerId);
768

769
        atomic_store_64(&pHandle->consumerId, req.newConsumerId);
1,623✔
770
        atomic_store_32(&pHandle->epoch, 0);
1,623✔
771
        tqUnregisterPushHandle(pTq, pHandle);
1,623✔
772
        ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
1,622✔
773
      }
774
      taosWUnLockLatch(&pTq->lock);
1,720✔
775
      break;
1,721✔
776
    }
777
  }
778

779
end:
3,165✔
780
  tDecoderClear(&dc);
3,165✔
781
  return ret;
3,164✔
782
}
783

784
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
38,377!
785

786
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
14,600✔
787
  STQ*             pTq = (STQ*)pTqObj;
14,600✔
788
  int32_t          vgId = TD_VID(pTq->pVnode);
14,600✔
789
  SCheckpointInfo* pChkInfo = NULL;
14,600✔
790

791
  tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
14,600✔
792

793
  int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer);
14,600✔
794
  if (code != TSDB_CODE_SUCCESS) {
14,597!
795
    return code;
×
796
  }
797

798
  pTask->pBackend = NULL;
14,597✔
799

800
  // sink
801
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,597✔
802
  if (pOutputInfo->type == TASK_OUTPUT__SMA) {
14,597!
UNCOV
803
    pOutputInfo->smaSink.vnode = pTq->pVnode;
×
UNCOV
804
    pOutputInfo->smaSink.smaSink = smaHandleRes;
×
805
  } else if (pOutputInfo->type == TASK_OUTPUT__TABLE) {
14,597✔
806
    pOutputInfo->tbSink.vnode = pTq->pVnode;
7,293✔
807
    pOutputInfo->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
7,293✔
808

809
    int32_t   ver1 = 1;
7,293✔
810
    SMetaInfo info = {0};
7,293✔
811
    code = metaGetInfo(pTq->pVnode->pMeta, pOutputInfo->tbSink.stbUid, &info, NULL);
7,293✔
812
    if (code == TSDB_CODE_SUCCESS) {
7,285✔
813
      ver1 = info.skmVer;
6,777✔
814
    }
815

816
    SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper;
7,285✔
817
    pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
7,285✔
818
    if (pOutputInfo->tbSink.pTSchema == NULL) {
7,281!
819
      return terrno;
×
820
    }
821

822
    pOutputInfo->tbSink.pTbInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
7,281✔
823
    if (pOutputInfo->tbSink.pTbInfo == NULL) {
7,302!
824
      tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
×
825
      return terrno;
×
826
    }
827

828
    tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTbInfo, freePtr);
7,302✔
829
  }
830

831
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
14,607✔
832
    bool scanDropCtb = pTask->subtableWithoutMd5 ? true : false;
7,395✔
833
    SWalFilterCond cond = {.deleteMsg = 1, .scanDropCtb = scanDropCtb};  // delete msg also extract from wal files
7,395✔
834
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
7,395✔
835
    if (pTask->exec.pWalReader == NULL) {
7,394!
836
      tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno));
×
837
      return terrno;
×
838
    }
839
  }
840

841
  streamTaskResetUpstreamStageInfo(pTask);
14,607✔
842

843
  pChkInfo = &pTask->chkInfo;
14,616✔
844
  tqSetRestoreVersionInfo(pTask);
14,616✔
845

846
  char*       p = streamTaskGetStatus(pTask).name;
14,611✔
847
  const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);
14,609✔
848

849
  if (pTask->info.fillHistory) {
14,606✔
850
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
4,887!
851
           " nextProcessVer:%" PRId64
852
           " child id:%d, level:%d, cur-status:%s, next-status:%s taskType:%d, related stream task:0x%x "
853
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
854
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
855
           pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
856
           (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
857
  } else {
858
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
9,719!
859
           " nextProcessVer:%" PRId64
860
           " child id:%d, level:%d, cur-status:%s next-status:%s taskType:%d, related helper-task:0x%x "
861
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
862
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
863
           pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
864
           (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
865

866
    if (pChkInfo->checkpointVer > pChkInfo->nextProcessVer) {
9,731!
867
      tqError("vgId:%d build stream task, s-task:%s, checkpointVer:%" PRId64 " > nextProcessVer:%" PRId64, vgId,
×
868
              pTask->id.idStr, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
869
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
870
    }
871
  }
872

873
  return 0;
14,619✔
874
}
875

876
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
22,545✔
877

878
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
22,963✔
879
  return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
22,963✔
880
}
881

882
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
14,067✔
883
  return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, sversion, msg, msgLen,
14,082✔
884
                                      vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored);
14,067✔
885
}
886

887
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
2,347✔
888
  const char*    id = pTask->id.idStr;
2,347✔
889
  int64_t        nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
2,347✔
890
  SVersionRange* pStep2Range = &pTask->step2Range;
2,347✔
891
  int32_t        vgId = pTask->pMeta->vgId;
2,347✔
892

893
  // if it's a source task, extract the last version in wal.
894
  bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
2,347✔
895
  pTask->execInfo.step2Start = taosGetTimestampMs();
2,347✔
896

897
  if (done) {
2,347✔
898
    qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id,
1,201✔
899
           pStep2Range->minVer, pStep2Range->maxVer, 0.0);
900
    int32_t code = streamTaskPutTranstateIntoInputQ(pTask);  // todo: msg lost.
1,201✔
901
    if (code) {
1,201!
902
      qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
×
903
    }
904
    (void)streamExecTask(pTask);  // exec directly
1,201✔
905
  } else {
906
    STimeWindow* pWindow = &pTask->dataRange.window;
1,146✔
907
    tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
1,146✔
908
            ", do secondary scan-history from WAL after halt the related stream task:%s",
909
            id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
910
            pStreamTask->id.idStr);
911
    if (pTask->status.schedStatus != TASK_SCHED_STATUS__WAITING) {
1,146!
912
      tqError("s-task:%s level:%d unexpected sched-status:%d", id, pTask->info.taskLevel, pTask->status.schedStatus);
×
913
    }
914

915
    int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
1,146✔
916
    if (code) {
1,145!
917
      tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel);
×
918
    }
919

920
    int64_t dstVer = pStep2Range->minVer;
1,145✔
921
    pTask->chkInfo.nextProcessVer = dstVer;
1,145✔
922

923
    walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
1,145✔
924
    tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
1,146✔
925
            pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
926

927
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
1,146✔
928

929
    // now the fill-history task starts to scan data from wal files.
930
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
1,146✔
931
  }
932
}
2,347✔
933

934
int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
2,347✔
935
  STQ* pTq = param;
2,347✔
936

937
  SStreamMeta* pMeta = pStreamTask->pMeta;
2,347✔
938
  STaskId      hId = pStreamTask->hTaskInfo.id;
2,347✔
939
  SStreamTask* pTask = NULL;
2,347✔
940
  int32_t      code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask);
2,347✔
941
  if (pTask == NULL) {
2,347!
942
    tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)hId.taskId);
×
943
    return TSDB_CODE_SUCCESS;
×
944
  }
945

946
  doStartFillhistoryStep2(pTask, pStreamTask, pTq);
2,347✔
947

948
  streamMetaReleaseTask(pMeta, pTask);
2,347✔
949
  return TSDB_CODE_SUCCESS;
2,347✔
950
}
951

952
// this function should be executed by only one thread, so we set a sentinel to protect this function
953
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
2,527✔
954
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
2,527✔
955
  SStreamMeta*           pMeta = pTq->pStreamMeta;
2,527✔
956
  int32_t                code = TSDB_CODE_SUCCESS;
2,527✔
957
  SStreamTask*           pTask = NULL;
2,527✔
958
  SStreamTask*           pStreamTask = NULL;
2,527✔
959
  char*                  pStatus = NULL;
2,527✔
960
  int32_t                taskType = 0;
2,527✔
961

962
  code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
2,527✔
963
  if (pTask == NULL) {
2,527✔
964
    tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
2!
965
            pMeta->vgId, pReq->taskId);
966
    return code;
2✔
967
  }
968

969
  // do recovery step1
970
  const char* id = pTask->id.idStr;
2,525✔
971
  streamMutexLock(&pTask->lock);
2,525✔
972

973
  SStreamTaskState s = streamTaskGetStatus(pTask);
2,525✔
974
  pStatus = s.name;
2,525✔
975
  taskType = pTask->info.fillHistory;
2,525✔
976

977
  if ((s.state != TASK_STATUS__SCAN_HISTORY && taskType == STREAM_HISTORY_TASK) ||
2,525!
978
      (s.state != TASK_STATUS__READY && taskType == STREAM_RECALCUL_TASK) ||
2,525!
979
      (pTask->status.downstreamReady == 0)) {
2,525!
980
    tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not allowed/ready for scan-history data, quit", id,
×
981
            pMeta->vgId, s.name, pTask->status.downstreamReady);
982

UNCOV
983
    streamMutexUnlock(&pTask->lock);
×
UNCOV
984
    streamMetaReleaseTask(pMeta, pTask);
×
985
    return 0;
×
986
  }
987

988
  if (pTask->exec.pExecutor == NULL) {
2,525!
989
    tqError("s-task:%s vgId:%d executor is null, not executor scan history", id, pMeta->vgId);
×
990

UNCOV
991
    streamMutexUnlock(&pTask->lock);
×
UNCOV
992
    streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
993
    return 0;
×
994
  }
995

996
  streamMutexUnlock(&pTask->lock);
2,525✔
997

998
  // avoid multi-thread exec
999
  while (1) {
×
1000
    int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
2,525✔
1001
    if (sentinel != 0) {
2,525!
UNCOV
1002
      tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
×
UNCOV
1003
      taosMsleep(100);
×
1004
    } else {
1005
      break;
2,525✔
1006
    }
1007
  }
1008

1009
  // let's decide which step should be executed now
1010
  if (pTask->execInfo.step1Start == 0) {
2,525✔
1011
    int64_t ts = taosGetTimestampMs();
2,349✔
1012
    pTask->execInfo.step1Start = ts;
2,349✔
1013
    tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
2,349✔
1014
  } else {
1015
    if (pTask->execInfo.step2Start == 0) {
176✔
1016
      tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
140!
1017
              id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
1018
    } else {
1019
      tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id,
36!
1020
              pTask->execInfo.step2Start);
1021

1022
      atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
36✔
1023
      streamMetaReleaseTask(pMeta, pTask);
36✔
1024
      return 0;
36✔
1025
    }
1026
  }
1027

1028
  // we have to continue retrying to successfully execute the scan history task.
1029
  if (!streamTaskSetSchedStatusWait(pTask)) {
2,489!
1030
    tqError(
×
1031
        "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
1032
        "sched-status:%d",
1033
        id, pTask->status.schedStatus);
UNCOV
1034
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
×
UNCOV
1035
    streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
1036
    return 0;
×
1037
  }
1038

1039
  int64_t              st = taosGetTimestampMs();
2,489✔
1040
  SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
2,489✔
1041

1042
  double el = (taosGetTimestampMs() - st) / 1000.0;
2,489✔
1043
  pTask->execInfo.step1El += el;
2,489✔
1044

1045
  if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
2,489✔
1046
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
142✔
1047
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
142✔
1048

1049
    if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
142✔
1050
      streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
140✔
1051
    } else {
1052
      SStreamTaskState p = streamTaskGetStatus(pTask);
2✔
1053
      ETaskStatus      localStatus = p.state;
2✔
1054

1055
      if (localStatus == TASK_STATUS__PAUSE) {
2!
UNCOV
1056
        tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", id, el,
×
1057
                pTask->execInfo.step1El, status);
1058
      } else if (localStatus == TASK_STATUS__STOP || localStatus == TASK_STATUS__DROPPING) {
2!
1059
        tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", id, p.name,
2!
1060
                pTask->execInfo.step1El);
1061
      }
1062
    }
1063

1064
    streamMetaReleaseTask(pMeta, pTask);
142✔
1065
    return 0;
142✔
1066
  }
1067

1068
  // the following procedure should be executed no matter whether the status is stop/pause or not
1069
  if (taskType == STREAM_HISTORY_TASK) {
2,347!
1070
    tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
2,347✔
UNCOV
1071
  } else if (taskType == STREAM_RECALCUL_TASK) {
×
UNCOV
1072
    tqDebug("s-task:%s recalculate ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
×
1073
  } else {
UNCOV
1074
    tqError("s-task:%s fill-history is disabled, unexpected", id);
×
UNCOV
1075
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1076
  }
1077

1078
  // 1. get the related stream task
1079
  code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
2,347✔
1080
  if (pStreamTask == NULL) {
2,347!
UNCOV
1081
    tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
×
1082
            pTask->streamTaskId.taskId, pTask->id.idStr);
1083

UNCOV
1084
    tqDebug("s-task:%s fill-history task set status to be dropping", id);
×
UNCOV
1085
    code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
×
1086

1087
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
×
1088
    streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
1089
    return code;
×
1090
  }
1091

1092
  if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
2,347!
UNCOV
1093
    tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel);
×
UNCOV
1094
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1095
  }
1096

1097
  if (taskType == STREAM_HISTORY_TASK) {
2,347!
1098
    code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
2,347✔
UNCOV
1099
  } else if (taskType == STREAM_RECALCUL_TASK) {
×
1100
    // send recalculate end block
UNCOV
1101
    code = streamCreateAddRecalculateEndBlock(pStreamTask);
×
UNCOV
1102
    if (code) {
×
UNCOV
1103
      tqError("s-task:%s failed to create-add recalculate end block, code:%s", id, tstrerror(code));
×
1104
    }
UNCOV
1105
    streamTaskSetSchedStatusInactive(pTask);
×
1106
  }
1107

1108
  streamMetaReleaseTask(pMeta, pStreamTask);
2,347✔
1109

1110
  atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
2,347✔
1111
  streamMetaReleaseTask(pMeta, pTask);
2,347✔
1112
  return code;
2,347✔
1113
}
1114

1115
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
385,568✔
1116
  int32_t  code = 0;
385,568✔
1117
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
385,568✔
1118
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
385,568✔
1119
  SDecoder decoder;
1120

1121
  SStreamTaskRunReq req = {0};
385,568✔
1122
  tDecoderInit(&decoder, (uint8_t*)msg, len);
385,568✔
1123
  if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
385,521!
UNCOV
1124
    tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
×
UNCOV
1125
    tDecoderClear(&decoder);
×
UNCOV
1126
    return TSDB_CODE_SUCCESS;
×
1127
  }
1128

1129
  tDecoderClear(&decoder);
385,432✔
1130

1131
  // extracted submit data from wal files for all tasks
1132
  if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
385,548✔
1133
    return tqScanWal(pTq);
288,985✔
1134
  } else {
1135
    code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
96,563✔
1136
    if (code) {
96,553✔
1137
      tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
13!
1138
    }
1139

1140
    return code;
96,557✔
1141
  }
1142
}
1143

1144
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
58,512✔
1145
  return tqStreamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg);
58,512✔
1146
}
1147

1148
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
58,445✔
1149
  return tqStreamTaskProcessDispatchRsp(pTq->pStreamMeta, pMsg);
58,445✔
1150
}
1151

1152
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
6,943✔
1153
  return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen);
6,943✔
1154
}
1155

1156
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
5,574✔
1157
  return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg);
5,574✔
1158
}
1159

1160
int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg) {
228✔
1161
  return tqStreamTaskProcessConsenChkptIdReq(pTq->pStreamMeta, pMsg);
228✔
1162
}
1163

1164
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
1,395✔
1165
  return tqStreamTaskProcessTaskPauseReq(pTq->pStreamMeta, msg);
1,395✔
1166
}
1167

1168
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
2,558✔
1169
  return tqStreamTaskProcessTaskResumeReq(pTq, sversion, msg, true);
2,558✔
1170
}
1171

1172
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
534✔
1173
  return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg);
534✔
1174
}
1175

1176
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; }
420✔
1177

1178
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
×
UNCOV
1179
  char*               msgStr = pMsg->pCont;
×
1180
  char*               msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
×
1181
  int32_t             msgLen = pMsg->contLen - sizeof(SMsgHead);
×
1182
  int32_t             code = 0;
×
1183
  SStreamProgressReq  req;
1184
  char*               pRspBuf = taosMemoryCalloc(1, sizeof(SMsgHead) + sizeof(SStreamProgressRsp));
×
1185
  SStreamProgressRsp* pRsp = POINTER_SHIFT(pRspBuf, sizeof(SMsgHead));
×
1186
  if (!pRspBuf) {
×
1187
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1188
    code = -1;
×
UNCOV
1189
    goto _OVER;
×
1190
  }
1191

1192
  code = tDeserializeStreamProgressReq(msgBody, msgLen, &req);
×
1193
  if (code == TSDB_CODE_SUCCESS) {
×
1194
    code = tqGetStreamExecInfo(pTq->pVnode, req.streamId, &pRsp->progressDelay, &pRsp->fillHisFinished);
×
1195
  }
UNCOV
1196
  if (code == TSDB_CODE_SUCCESS) {
×
1197
    pRsp->fetchIdx = req.fetchIdx;
×
1198
    pRsp->subFetchIdx = req.subFetchIdx;
×
1199
    pRsp->vgId = req.vgId;
×
UNCOV
1200
    pRsp->streamId = req.streamId;
×
1201
    code = tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp);
×
UNCOV
1202
    if (code) {
×
UNCOV
1203
      goto _OVER;
×
1204
    }
1205

UNCOV
1206
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
×
UNCOV
1207
    rsp.pCont = pRspBuf;
×
UNCOV
1208
    pRspBuf = NULL;
×
UNCOV
1209
    rsp.contLen = sizeof(SMsgHead) + sizeof(SStreamProgressRsp);
×
UNCOV
1210
    tmsgSendRsp(&rsp);
×
1211
  }
1212

UNCOV
1213
_OVER:
×
UNCOV
1214
  if (pRspBuf) {
×
UNCOV
1215
    taosMemoryFree(pRspBuf);
×
1216
  }
UNCOV
1217
  return code;
×
1218
}
1219

1220
// always return success to mnode
1221
// todo: handle failure to build and send the msg to mnode
1222
static void doSendChkptSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t code,
63✔
1223
                                 int32_t taskId) {
1224
  SRpcMsg rsp = {0};
63✔
1225
  int32_t ret = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &rsp, code);
63✔
1226
  if (ret) {  // suppress the error in build checkpoint source rsp
63!
UNCOV
1227
    tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", taskId, tstrerror(ret));
×
1228
  }
1229
  tmsgSendRsp(&rsp);  // error occurs
63✔
1230
}
63✔
1231

1232
// no matter what kind of error happened, make sure the mnode receives the success execution code.
1233
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
3,352✔
1234
  int32_t                    vgId = TD_VID(pTq->pVnode);
3,352✔
1235
  SStreamMeta*               pMeta = pTq->pStreamMeta;
3,352✔
1236
  char*                      msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
3,352✔
1237
  int32_t                    len = pMsg->contLen - sizeof(SMsgHead);
3,352✔
1238
  int32_t                    code = 0;
3,352✔
1239
  SStreamCheckpointSourceReq req = {0};
3,352✔
1240
  SDecoder                   decoder = {0};
3,352✔
1241
  SStreamTask*               pTask = NULL;
3,352✔
1242
  int64_t                    checkpointId = 0;
3,352✔
1243

1244
  // disable auto rsp to mnode
1245
  pRsp->info.handle = NULL;
3,352✔
1246

1247
  tDecoderInit(&decoder, (uint8_t*)msg, len);
3,352✔
1248
  if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
3,357!
UNCOV
1249
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
UNCOV
1250
    tDecoderClear(&decoder);
×
UNCOV
1251
    tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
×
UNCOV
1252
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
UNCOV
1253
    return TSDB_CODE_SUCCESS;  // always return success to mnode,
×
1254
  }
1255

1256
  tDecoderClear(&decoder);
3,352✔
1257

1258
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
3,350✔
1259
    tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
15!
1260
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
15✔
1261
    return TSDB_CODE_SUCCESS;  // always return success to mnode
15✔
1262
  }
1263

1264
  if (!pTq->pVnode->restored) {
3,337✔
1265
    tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
48!
1266
            ", transId:%d s-task:0x%x ignore it",
1267
            vgId, req.checkpointId, req.transId, req.taskId);
1268
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
48✔
1269
    return TSDB_CODE_SUCCESS;  // always return success to mnode
48✔
1270
  }
1271

  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
  if (pTask == NULL || code != 0) {
    tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
            " transId:%d, it may have been destroyed",
            vgId, req.taskId, req.checkpointId, req.transId);
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->status.downstreamReady != 1) {
    // record the latest failed checkpoint id
    streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId);
    tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
            ", transId:%d, set it as failed",
            pTask->id.idStr, req.checkpointId, req.transId);

    streamMetaReleaseTask(pMeta, pTask);
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
    return TSDB_CODE_SUCCESS;  // todo: retry on handling error
  }

  // todo: save the failed checkpoint info
  streamMutexLock(&pTask->lock);
  ETaskStatus status = streamTaskGetStatus(pTask).state;

  if (req.mndTrigger == 1) {
    if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
      tqError("s-task:%s not ready for checkpoint, since it is halted, ignore checkpointId:%" PRId64 ", set it as failed",
              pTask->id.idStr, req.checkpointId);

      streamMutexUnlock(&pTask->lock);
      streamMetaReleaseTask(pMeta, pTask);
      doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
      return TSDB_CODE_SUCCESS;
    }
  } else {
    if (status != TASK_STATUS__HALT) {
      tqError("s-task:%s should be in halt status, let's halt it directly", pTask->id.idStr);
      //      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
    }
  }

  // check whether this checkpoint msg has already been handled or not.
  if (status == TASK_STATUS__CK) {
    streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId);

    tqWarn("s-task:%s repeatedly recv checkpoint-source msg checkpointId:%" PRId64
           " transId:%d already handled, ignore msg and continue processing the checkpoint",
           pTask->id.idStr, checkpointId, req.transId);

    streamMutexUnlock(&pTask->lock);
    streamMetaReleaseTask(pMeta, pTask);
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SYN_PROPOSE_NOT_READY, req.taskId);
    return TSDB_CODE_SUCCESS;
  } else {  // checkpoint already finished, and the task is not in checkpoint status
    if (req.checkpointId <= pTask->chkInfo.checkpointId) {
      tqWarn("s-task:%s repeatedly recv checkpoint-source msg checkpointId:%" PRId64
             " transId:%d already handled, return success",
             pTask->id.idStr, req.checkpointId, req.transId);

      streamMutexUnlock(&pTask->lock);
      streamMetaReleaseTask(pMeta, pTask);
      doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
      return TSDB_CODE_SUCCESS;
    }
  }

  code = streamProcessCheckpointSourceReq(pTask, &req);
  streamMutexUnlock(&pTask->lock);

  if (code) {
    qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
           tstrerror(code));
    streamMetaReleaseTask(pMeta, pTask);
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
    return TSDB_CODE_SUCCESS;
  }

  if (req.mndTrigger) {
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d",
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
  } else {
    const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask));
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
           ", transId:%d after transfer-state, prev status:%s",
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
  }

  code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
  if (code != TSDB_CODE_SUCCESS) {
    streamTaskSetCheckpointFailed(pTask);  // mark the checkpoint as failed
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return TSDB_CODE_SUCCESS;
}
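
/* Locking discipline worth noting in the handler above: once streamMutexLock(&pTask->lock)
 * is taken, every return path must unlock the task and drop the meta reference before
 * acking the mnode. A minimal sketch of the pattern, reusing the helpers already used
 * above (the bail-out condition is illustrative only):
 *
 *   streamMutexLock(&pTask->lock);
 *   if (shouldBailOut) {
 *     streamMutexUnlock(&pTask->lock);
 *     streamMetaReleaseTask(pMeta, pTask);
 *     doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
 *     return TSDB_CODE_SUCCESS;
 *   }
 *   code = streamProcessCheckpointSourceReq(pTask, &req);
 *   streamMutexUnlock(&pTask->lock);
 */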

// the downstream task has completed the stream task checkpoint procedure; start handling the rsp by executing the task
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont;
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
    tqError("vgId:%d not leader, ignore the checkpoint-ready msg from 0x%x", vgId,
            (int32_t)pReq->downstreamTaskId);
    return TSDB_CODE_STREAM_NOT_LEADER;
  }

  return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg);
}

int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
  return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg,
                                      pTq->pVnode->restored, (pTq->pStreamMeta->role == NODE_ROLE_LEADER));
}

int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
  return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg->pCont);
}

int32_t tqProcessAllTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
  return tqStreamTaskProcessAllTaskStopReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg);
}

int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  if (!vnodeIsRoleLeader(pTq->pVnode)) {
    SRetrieveChkptTriggerReq req = {0};

    char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
    int32_t  len = pMsg->contLen - sizeof(SMsgHead);
    SDecoder decoder = {0};

    tDecoderInit(&decoder, (uint8_t*)msg, len);
    if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
      tDecoderClear(&decoder);
      tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId);
      return TSDB_CODE_INVALID_MSG;
    }
    tDecoderClear(&decoder);

    tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%" PRId64, vgId,
            req.downstreamTaskId);
    return TSDB_CODE_STREAM_NOT_LEADER;
  }

  return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg);
}
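
/* The decode guard used above (and in the checkpoint-source handler) follows the same
 * shape everywhere: init the decoder over the payload after SMsgHead, decode, and make
 * sure tDecoderClear() runs on both the failure and the success path. A minimal sketch,
 * assuming a request type TReq with a tDecodeTReq() helper (both names illustrative):
 *
 *   SDecoder decoder = {0};
 *   TReq     req = {0};
 *   char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
 *   int32_t  len = pMsg->contLen - sizeof(SMsgHead);
 *
 *   tDecoderInit(&decoder, (uint8_t*)msg, len);
 *   if (tDecodeTReq(&decoder, &req) < 0) {
 *     tDecoderClear(&decoder);
 *     return TSDB_CODE_INVALID_MSG;
 *   }
 *   tDecoderClear(&decoder);
 */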

int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
  return tqStreamTaskProcessRetrieveTriggerRsp(pTq->pStreamMeta, pMsg);
}

// this function is needed, do not try to remove it.
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); }

int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
  return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
}

int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
  return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg);
}

int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
  return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
}
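
/* These handlers are thin per-vnode wrappers: each one forwards to a shared tqStream*
 * helper and only supplies vnode-local context such as pTq->pStreamMeta. A minimal
 * sketch of another wrapper in the same style (handler and helper names here are
 * illustrative only, not existing functions):
 *
 *   int32_t tqProcessSomeStreamRsp(STQ* pTq, SRpcMsg* pMsg) {
 *     return tqStreamProcessSomeRsp(pTq->pStreamMeta, pMsg);
 *   }
 */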
