• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3559

18 Dec 2024 12:59AM UTC coverage: 59.805% (+0.03%) from 59.778%
#3559

push

travis-ci

web-flow
Merge pull request #29187 from taosdata/merge/mainto3.0

merge: main to 3.0 branch

132705 of 287544 branches covered (46.15%)

Branch coverage included in aggregate %.

87 of 95 new or added lines in 19 files covered. (91.58%)

1132 existing lines in 133 files now uncovered.

209591 of 284807 relevant lines covered (73.59%)

8125235.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.0
/source/dnode/vnode/src/tq/tq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "osDef.h"
18
#include "taoserror.h"
19
#include "tqCommon.h"
20
#include "tstream.h"
21
#include "vnd.h"
22

23
// 0: not init
24
// 1: already inited
25
// 2: wait to be inited or cleanup
26
static int32_t tqInitialize(STQ* pTq);
27

28
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
816✔
29
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
67,908✔
30
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
67,916✔
31

32
void tqDestroyTqHandle(void* data) {
1,567✔
33
  STqHandle* pData = (STqHandle*)data;
1,567✔
34
  qDestroyTask(pData->execHandle.task);
1,567✔
35

36
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1,565✔
37
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
1,217!
38
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
348✔
39
    tqReaderClose(pData->execHandle.pTqReader);
261✔
40
    walCloseReader(pData->pWalReader);
261✔
41
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
261✔
42
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
87!
43
    walCloseReader(pData->pWalReader);
87✔
44
    tqReaderClose(pData->execHandle.pTqReader);
87✔
45
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
87!
46
    nodesDestroyNode(pData->execHandle.execTb.node);
87✔
47
  }
48
  if (pData->msg != NULL) {
1,565!
49
    rpcFreeCont(pData->msg->pCont);
×
50
    taosMemoryFree(pData->msg);
×
51
    pData->msg = NULL;
×
52
  }
53
  if (pData->block != NULL) {
1,565!
54
    blockDataDestroy(pData->block);
×
55
  }
56
  if (pData->pRef) {
1,565✔
57
    walCloseRef(pData->pRef->pWal, pData->pRef->refId);
1,522✔
58
  }
59
}
1,566✔
60

61
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
5,830✔
62
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
11,359✔
63
         pLeft->val.version == pRight->val.version;
5,529✔
64
}
65

66
int32_t tqOpen(const char* path, SVnode* pVnode) {
10,042✔
67
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
10,042!
68
  if (pTq == NULL) {
10,046!
69
    return terrno;
×
70
  }
71
  pVnode->pTq = pTq;
10,046✔
72
  pTq->path = taosStrdup(path);
10,046!
73
  if (pTq->path == NULL) {
10,045!
74
    return terrno;
×
75
  }
76
  pTq->pVnode = pVnode;
10,045✔
77

78
  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
10,045✔
79
  if (pTq->pHandle == NULL) {
10,046!
80
    return terrno;
×
81
  }
82
  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
10,046✔
83

84
  taosInitRWLatch(&pTq->lock);
10,049✔
85

86
  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
10,047✔
87
  if (pTq->pPushMgr == NULL) {
10,048!
88
    return terrno;
×
89
  }
90

91
  pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
10,048✔
92
  if (pTq->pCheckInfo == NULL) {
10,049!
93
    return terrno;
×
94
  }
95
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
10,049✔
96

97
  pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK);
10,049✔
98
  if (pTq->pOffset == NULL) {
10,049!
99
    return terrno;
×
100
  }
101
  taosHashSetFreeFp(pTq->pOffset, (FDelete)tDeleteSTqOffset);
10,049✔
102

103
  return tqInitialize(pTq);
10,049✔
104
}
105

106
int32_t tqInitialize(STQ* pTq) {
10,049✔
107
  int32_t vgId = TD_VID(pTq->pVnode);
10,049✔
108
  int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1,
10,049✔
109
                                tqStartTaskCompleteCallback, &pTq->pStreamMeta);
110
  if (code != TSDB_CODE_SUCCESS) {
10,052!
111
    return code;
×
112
  }
113

114
  streamMetaLoadAllTasks(pTq->pStreamMeta);
10,052✔
115
  return tqMetaOpen(pTq);
10,052✔
116
}
117

118
void tqClose(STQ* pTq) {
10,052✔
119
  qDebug("start to close tq");
10,052✔
120
  if (pTq == NULL) {
10,052!
121
    return;
×
122
  }
123

124
  void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
10,052✔
125
  while (pIter) {
10,078✔
126
    STqHandle* pHandle = *(STqHandle**)pIter;
26✔
127
    int32_t    vgId = TD_VID(pTq->pVnode);
26✔
128

129
    if (pHandle->msg != NULL) {
26!
130
      tqPushEmptyDataRsp(pHandle, vgId);
26✔
131
      rpcFreeCont(pHandle->msg->pCont);
26✔
132
      taosMemoryFree(pHandle->msg);
26!
133
      pHandle->msg = NULL;
26✔
134
    }
135
    pIter = taosHashIterate(pTq->pPushMgr, pIter);
26✔
136
  }
137

138
  taosHashCleanup(pTq->pHandle);
10,052✔
139
  taosHashCleanup(pTq->pPushMgr);
10,052✔
140
  taosHashCleanup(pTq->pCheckInfo);
10,052✔
141
  taosHashCleanup(pTq->pOffset);
10,052✔
142
  taosMemoryFree(pTq->path);
10,052!
143
  tqMetaClose(pTq);
10,052✔
144

145
  int32_t vgId = pTq->pStreamMeta->vgId;
10,051✔
146
  streamMetaClose(pTq->pStreamMeta);
10,051✔
147

148
  qDebug("vgId:%d end to close tq", vgId);
10,052✔
149
  taosMemoryFree(pTq);
10,052!
150
}
151

152
void tqNotifyClose(STQ* pTq) {
10,051✔
153
  if (pTq == NULL) {
10,051!
154
    return;
×
155
  }
156
  streamMetaNotifyClose(pTq->pStreamMeta);
10,051✔
157
}
158

159
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
644✔
160
  int32_t    code = 0;
644✔
161
  SMqPollReq req = {0};
644✔
162
  code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req);
644✔
163
  if (code < 0) {
644!
164
    tqError("tDeserializeSMqPollReq %d failed, code:%d", pHandle->msg->contLen, code);
×
165
    return;
×
166
  }
167

168
  SMqDataRsp dataRsp = {0};
644✔
169
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
644✔
170
  if (code != 0) {
644!
171
    tqError("tqInitDataRsp failed, code:%d", code);
×
172
    return;
×
173
  }
174
  dataRsp.blockNum = 0;
644✔
175
  char buf[TSDB_OFFSET_LEN] = {0};
644✔
176
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
644✔
177
  tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s,QID:0x%" PRIx64, req.consumerId, vgId, buf,
644!
178
         req.reqId);
179

180
  code = tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
644✔
181
  if (code != 0) {
644!
182
    tqError("tqSendDataRsp failed, code:%d", code);
×
183
  }
184
  tDeleteMqDataRsp(&dataRsp);
644✔
185
}
186

187
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type,
41,484✔
188
                      int32_t vgId) {
189
  int64_t sver = 0, ever = 0;
41,484✔
190
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
41,484✔
191

192
  char buf1[TSDB_OFFSET_LEN] = {0};
41,483✔
193
  char buf2[TSDB_OFFSET_LEN] = {0};
41,483✔
194
  (void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &(pRsp->reqOffset));
41,483✔
195
  (void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &(pRsp->rspOffset));
41,484✔
196

197
  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64,
41,483!
198
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
199

200
  return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
41,483✔
201
}
202

203
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
6,699✔
204
  SMqVgOffset vgOffset = {0};
6,699✔
205
  int32_t     vgId = TD_VID(pTq->pVnode);
6,699✔
206

207
  int32_t  code = 0;
6,699✔
208
  SDecoder decoder;
209
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
6,699✔
210
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
6,708!
211
    code = TSDB_CODE_INVALID_MSG;
×
212
    goto end;
×
213
  }
214

215
  tDecoderClear(&decoder);
6,711✔
216

217
  STqOffset* pOffset = &vgOffset.offset;
6,699✔
218

219
  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
6,699!
220
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
361!
221
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
222
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
6,338✔
223
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
6,332!
224
            pOffset->val.version);
225
  } else {
226
    tqError("invalid commit offset type:%d", pOffset->val.type);
6!
227
    code = TSDB_CODE_INVALID_MSG;
×
228
    goto end;
×
229
  }
230

231
  STqOffset* pSavedOffset = NULL;
6,707✔
232
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
6,707✔
233
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
6,716✔
234
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
2!
235
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
236
    goto end;  // no need to update the offset value
2✔
237
  }
238

239
  // save the new offset value
240
  if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))) {
6,714!
241
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
242
    return -1;
×
243
  }
244

245
  if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg,
6,713!
246
                     msgLen - sizeof(vgOffset.consumerId)) < 0) {
6,714✔
UNCOV
247
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
248
    return -1;
×
249
  }
250

251
  return 0;
6,713✔
252
end:
2✔
253
  tOffsetDestroy(&vgOffset.offset.val);
2✔
254
  return code;
2✔
255
}
256

257
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
28✔
258
  SMqSeekReq req = {0};
28✔
259
  int32_t    vgId = TD_VID(pTq->pVnode);
28✔
260
  SRpcMsg    rsp = {.info = pMsg->info};
28✔
261
  int        code = 0;
28✔
262

263
  if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
28!
264
    code = TSDB_CODE_OUT_OF_MEMORY;
×
265
    goto end;
×
266
  }
267

268
  tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
28!
269
  taosWLockLatch(&pTq->lock);
28✔
270

271
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
28✔
272
  if (pHandle == NULL) {
28!
273
    tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
×
274
    code = 0;
×
275
    taosWUnLockLatch(&pTq->lock);
×
276
    goto end;
×
277
  }
278

279
  // 2. check consumer-vg assignment status
280
  if (pHandle->consumerId != req.consumerId) {
28!
281
    tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
282
            req.consumerId, vgId, req.subKey, pHandle->consumerId);
283
    taosWUnLockLatch(&pTq->lock);
×
284
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
285
    goto end;
×
286
  }
287

288
  // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
289
  // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
290
  tqUnregisterPushHandle(pTq, pHandle);
28✔
291
  taosWUnLockLatch(&pTq->lock);
28✔
292

293
end:
28✔
294
  rsp.code = code;
28✔
295
  tmsgSendRsp(&rsp);
28✔
296
  return 0;
28✔
297
}
298

299
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
130✔
300
  void* pIter = NULL;
130✔
301

302
  while (1) {
11✔
303
    pIter = taosHashIterate(pTq->pCheckInfo, pIter);
141✔
304
    if (pIter == NULL) {
141✔
305
      break;
95✔
306
    }
307

308
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
46✔
309

310
    if (pCheck->ntbUid == tbUid) {
46!
311
      int32_t sz = taosArrayGetSize(pCheck->colIdList);
46✔
312
      for (int32_t i = 0; i < sz; i++) {
168✔
313
        int16_t* pForbidColId = taosArrayGet(pCheck->colIdList, i);
157✔
314
        if (pForbidColId == NULL) {
157!
315
          continue;
×
316
        }
317

318
        if ((*pForbidColId) == colId) {
157✔
319
          taosHashCancelIterate(pTq->pCheckInfo, pIter);
35✔
320
          return -1;
35✔
321
        }
322
      }
323
    }
324
  }
325

326
  return 0;
95✔
327
}
328

329
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
24,226✔
330
  int32_t vgId = TD_VID(pTq->pVnode);
24,226✔
331
  taosWLockLatch(&pTq->lock);
24,226✔
332
  if (taosHashGetSize(pTq->pPushMgr) > 0) {
24,226!
333
    void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
24,226✔
334

335
    while (pIter) {
49,162✔
336
      STqHandle* pHandle = *(STqHandle**)pIter;
24,936✔
337
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
24,936!
338

339
      if (pHandle->msg == NULL) {
24,936!
340
        tqError("pHandle->msg should not be null");
×
341
        taosHashCancelIterate(pTq->pPushMgr, pIter);
×
342
        break;
×
343
      } else {
344
        SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
24,936✔
345
                       .pCont = pHandle->msg->pCont,
24,936✔
346
                       .contLen = pHandle->msg->contLen,
24,936✔
347
                       .info = pHandle->msg->info};
24,936✔
348
        if (tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg) != 0){
24,936!
349
          tqError("vgId:%d tmsgPutToQueue failed, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
×
350
        }
351
        taosMemoryFree(pHandle->msg);
24,936!
352
        pHandle->msg = NULL;
24,936✔
353
      }
354

355
      pIter = taosHashIterate(pTq->pPushMgr, pIter);
24,936✔
356
    }
357

358
    taosHashClear(pTq->pPushMgr);
24,226✔
359
  }
360
  taosWUnLockLatch(&pTq->lock);
24,226✔
361
  return 0;
24,226✔
362
}
363

364
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
67,934✔
365
  SMqPollReq req = {0};
67,934✔
366
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
67,934✔
367
  if (code < 0) {
67,925!
368
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
369
    terrno = TSDB_CODE_INVALID_MSG;
×
370
    goto END;
×
371
  }
372

373
  int64_t      consumerId = req.consumerId;
67,936✔
374
  int32_t      reqEpoch = req.epoch;
67,936✔
375
  STqOffsetVal reqOffset = req.reqOffset;
67,936✔
376
  int32_t      vgId = TD_VID(pTq->pVnode);
67,936✔
377
  STqHandle*   pHandle = NULL;
67,936✔
378

379
  while (1) {
6✔
380
    taosWLockLatch(&pTq->lock);
67,942✔
381
    // 1. find handle
382
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
67,945✔
383
    if (code != TDB_CODE_SUCCESS) {
67,930✔
384
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
18!
385
      terrno = TSDB_CODE_INVALID_MSG;
18✔
386
      taosWUnLockLatch(&pTq->lock);
18✔
387
      return -1;
18✔
388
    }
389

390
    // 2. check rebalance status
391
    if (pHandle->consumerId != consumerId) {
67,912✔
392
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
6!
393
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
394
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
395
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
6✔
396
      taosWUnLockLatch(&pTq->lock);
6✔
397
      code = -1;
6✔
398
      goto END;
6✔
399
    }
400

401
    bool exec = tqIsHandleExec(pHandle);
67,906✔
402
    if (!exec) {
67,906!
403
      tqSetHandleExec(pHandle);
67,908✔
404
      //      qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
405
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
67,908!
406
              req.subKey, pHandle);
407
      taosWUnLockLatch(&pTq->lock);
67,910✔
408
      break;
67,918✔
409
    }
410
    taosWUnLockLatch(&pTq->lock);
×
411

412
    tqDebug("tmq poll: consumer:0x%" PRIx64
6!
413
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
414
            consumerId, vgId, req.subKey, pHandle);
415
    taosMsleep(10);
6✔
416
  }
417

418
  // 3. update the epoch value
419
  if (pHandle->epoch < reqEpoch) {
67,918✔
420
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
1,367!
421
            reqEpoch);
422
    pHandle->epoch = reqEpoch;
1,367✔
423
  }
424

425
  char buf[TSDB_OFFSET_LEN] = {0};
67,918✔
426
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
67,918✔
427
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s,QID:0x%" PRIx64,
67,918!
428
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
429

430
  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
67,918✔
431
  tqSetHandleIdle(pHandle);
67,916✔
432

433
  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
67,916✔
434
          req.subKey, pHandle);
435

436
END:
1✔
437
  tDestroySMqPollReq(&req);
67,925✔
438
  return code;
67,922✔
439
}
440

441
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
2✔
442
  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
2✔
443
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
2✔
444

445
  SMqVgOffset vgOffset = {0};
2✔
446

447
  SDecoder decoder;
448
  tDecoderInit(&decoder, (uint8_t*)data, len);
2✔
449
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
2!
450
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
451
    return terrno;
×
452
  }
453

454
  tDecoderClear(&decoder);
2✔
455

456
  STqOffset* pSavedOffset = NULL;
2✔
457
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
2✔
458
  if (code != 0) {
2✔
459
    return TSDB_CODE_TMQ_NO_COMMITTED;
1✔
460
  }
461
  vgOffset.offset = *pSavedOffset;
1✔
462

463
  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
1!
464
  if (code < 0) {
1!
465
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
466
  }
467

468
  void* buf = rpcMallocCont(len);
1✔
469
  if (buf == NULL) {
1!
470
    return terrno;
×
471
  }
472
  SEncoder encoder = {0};
1✔
473
  tEncoderInit(&encoder, buf, len);
1✔
474
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
1✔
475
  tEncoderClear(&encoder);
1✔
476
  if (code < 0) {
1!
477
    rpcFreeCont(buf);
×
478
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
479
  }
480

481
  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
1✔
482

483
  tmsgSendRsp(&rsp);
1✔
484
  return 0;
1✔
485
}
486

487
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
13✔
488
  int32_t    code = 0;
13✔
489
  SMqPollReq req = {0};
13✔
490
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
13!
491
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
492
    return TSDB_CODE_INVALID_MSG;
×
493
  }
494

495
  int64_t      consumerId = req.consumerId;
13✔
496
  STqOffsetVal reqOffset = req.reqOffset;
13✔
497
  int32_t      vgId = TD_VID(pTq->pVnode);
13✔
498

499
  // 1. find handle
500
  taosRLockLatch(&pTq->lock);
13✔
501
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
13✔
502
  if (pHandle == NULL) {
13!
503
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
×
504
    taosRUnLockLatch(&pTq->lock);
×
505
    return TSDB_CODE_INVALID_MSG;
×
506
  }
507

508
  // 2. check rebalance status
509
  if (pHandle->consumerId != consumerId) {
13!
510
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
511
            consumerId, vgId, req.subKey, pHandle->consumerId);
512
    taosRUnLockLatch(&pTq->lock);
×
513
    return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
514
  }
515

516
  int64_t sver = 0, ever = 0;
13✔
517
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
13✔
518
  taosRUnLockLatch(&pTq->lock);
13✔
519

520
  SMqDataRsp dataRsp = {0};
13✔
521
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
13✔
522
  if (code != 0) {
13!
523
    return code;
×
524
  }
525

526
  if (req.useSnapshot == true) {
13!
527
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
×
528
    code = TSDB_CODE_INVALID_PARA;
×
529
    goto END;
×
530
  }
531

532
  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
13✔
533

534
  if (reqOffset.type == TMQ_OFFSET__LOG) {
13✔
535
    dataRsp.rspOffset.version = reqOffset.version;
3✔
536
  } else if (reqOffset.type < 0) {
10!
537
    STqOffset* pOffset = NULL;
10✔
538
    code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
10✔
539
    if (code == 0) {
10✔
540
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
1!
541
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
×
542
        code = TSDB_CODE_INVALID_PARA;
×
543
        goto END;
×
544
      }
545

546
      dataRsp.rspOffset.version = pOffset->val.version;
1✔
547
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
1!
548
             req.subKey, dataRsp.rspOffset.version);
549
    } else {
550
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
9✔
551
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
8✔
552
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
1!
553
        dataRsp.rspOffset.version = ever;
1✔
554
      }
555
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
9!
556
             dataRsp.rspOffset.version);
557
    }
558
  } else {
559
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
×
560
            reqOffset.type);
561
    code = TSDB_CODE_INVALID_PARA;
×
562
    goto END;
×
563
  }
564

565
  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
13✔
566

567
END:
13✔
568
  tDeleteMqDataRsp(&dataRsp);
13✔
569
  return code;
13✔
570
}
571

572
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
816✔
573
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
816✔
574
  int32_t        vgId = TD_VID(pTq->pVnode);
816✔
575

576
  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
816!
577
  int32_t code = 0;
816✔
578

579
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
816✔
580
  if (pHandle) {
817✔
581
    while (1) {
×
582
      taosWLockLatch(&pTq->lock);
816✔
583
      bool exec = tqIsHandleExec(pHandle);
816✔
584

585
      if (exec) {
816!
586
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
×
587
               pHandle->subKey, pHandle);
588
        taosWUnLockLatch(&pTq->lock);
×
589
        taosMsleep(10);
×
590
        continue;
×
591
      }
592
      tqUnregisterPushHandle(pTq, pHandle);
816✔
593
      code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
816✔
594
      if (code != 0) {
816!
595
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
×
596
      }
597
      taosWUnLockLatch(&pTq->lock);
816✔
598
      break;
816✔
599
    }
600
  }
601

602
  taosWLockLatch(&pTq->lock);
817✔
603
  if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey)) != 0) {
817✔
604
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
257✔
605
  }
606
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) {
818✔
607
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
256!
608
  }
609

610
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) {
817!
611
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
×
612
  }
613
  taosWUnLockLatch(&pTq->lock);
817✔
614

615
  return 0;
817✔
616
}
617

618
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
116✔
619
  STqCheckInfo info = {0};
116✔
620
  int32_t      code = tqMetaDecodeCheckInfo(&info, msg, msgLen);
116✔
621
  if (code != 0) {
117!
622
    return code;
×
623
  }
624

625
  code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
117✔
626
  if (code != 0) {
117!
627
    tDeleteSTqCheckInfo(&info);
×
628
    return code;
×
629
  }
630

631
  return tqMetaSaveInfo(pTq, pTq->pCheckStore, info.topic, strlen(info.topic), msg, msgLen);
117✔
632
}
633

634
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
12✔
635
  if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
12✔
636
    return TSDB_CODE_TSC_INTERNAL_ERROR;
2✔
637
  }
638
  return tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, strlen(msg));
10✔
639
}
640

641
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
2,949✔
642
  int         ret = 0;
2,949✔
643
  SMqRebVgReq req = {0};
2,949✔
644
  SDecoder    dc = {0};
2,949✔
645

646
  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
2,949✔
647
  ret = tDecodeSMqRebVgReq(&dc, &req);
2,949✔
648
  if (ret < 0) {
2,954!
649
    goto end;
×
650
  }
651

652
  tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
2,954✔
653
         req.oldConsumerId, req.newConsumerId);
654

655
  taosRLockLatch(&pTq->lock);
2,960✔
656
  STqHandle* pHandle = NULL;
2,955✔
657
  int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
2,955✔
658
  if (code != 0){
2,955✔
659
    tqInfo("vgId:%d, tq process sub req:%s, no such handle, create new one", pTq->pVnode->config.vgId, req.subKey);
1,341!
660
  }
661
  taosRUnLockLatch(&pTq->lock);
2,955✔
662
  if (pHandle == NULL) {
2,955✔
663
    if (req.oldConsumerId != -1) {
1,341✔
664
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
2!
665
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
666
    }
667
    if (req.newConsumerId == -1) {
1,341✔
668
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
1!
669
      ret = TSDB_CODE_INVALID_PARA;
1✔
670
      goto end;
1✔
671
    }
672
    STqHandle handle = {0};
1,340✔
673
    ret = tqMetaCreateHandle(pTq, &req, &handle);
1,340✔
674
    if (ret < 0) {
1,340!
675
      tqDestroyTqHandle(&handle);
×
676
      goto end;
×
677
    }
678
    taosWLockLatch(&pTq->lock);
1,340✔
679
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
1,340✔
680
    taosWUnLockLatch(&pTq->lock);
1,339✔
681
  } else {
682
    while (1) {
×
683
      taosWLockLatch(&pTq->lock);
1,614✔
684
      bool exec = tqIsHandleExec(pHandle);
1,614✔
685
      if (exec) {
1,614!
686
        tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p",
×
687
               pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
688
        taosWUnLockLatch(&pTq->lock);
×
689
        taosMsleep(10);
×
690
        continue;
×
691
      }
692
      if (pHandle->consumerId == req.newConsumerId) {  // do nothing
1,614✔
693
        tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
99!
694
      } else {
695
        tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
1,515!
696
               req.newConsumerId);
697

698
        atomic_store_64(&pHandle->consumerId, req.newConsumerId);
1,515✔
699
        atomic_store_32(&pHandle->epoch, 0);
1,515✔
700
        tqUnregisterPushHandle(pTq, pHandle);
1,515✔
701
        ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
1,515✔
702
      }
703
      taosWUnLockLatch(&pTq->lock);
1,613✔
704
      break;
1,614✔
705
    }
706
  }
707

708
end:
2,955✔
709
  tDecoderClear(&dc);
2,955✔
710
  return ret;
2,955✔
711
}
712

713
static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
41,226!
714

715
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
14,037✔
716
  STQ*             pTq = (STQ*)pTqObj;
14,037✔
717
  int32_t          vgId = TD_VID(pTq->pVnode);
14,037✔
718
  SCheckpointInfo* pChkInfo = NULL;
14,037✔
719

720
  tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
14,037✔
721

722
  int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer);
14,038✔
723
  if (code != TSDB_CODE_SUCCESS) {
14,042!
724
    return code;
×
725
  }
726

727
  pTask->pBackend = NULL;
14,042✔
728

729
  // sink
730
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
14,042✔
731
  if (pOutputInfo->type == TASK_OUTPUT__SMA) {
14,042✔
732
    pOutputInfo->smaSink.vnode = pTq->pVnode;
58✔
733
    pOutputInfo->smaSink.smaSink = smaHandleRes;
58✔
734
  } else if (pOutputInfo->type == TASK_OUTPUT__TABLE) {
13,984✔
735
    pOutputInfo->tbSink.vnode = pTq->pVnode;
6,880✔
736
    pOutputInfo->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;
6,880✔
737

738
    int32_t   ver1 = 1;
6,880✔
739
    SMetaInfo info = {0};
6,880✔
740
    code = metaGetInfo(pTq->pVnode->pMeta, pOutputInfo->tbSink.stbUid, &info, NULL);
6,880✔
741
    if (code == TSDB_CODE_SUCCESS) {
6,871✔
742
      ver1 = info.skmVer;
6,387✔
743
    }
744

745
    SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper;
6,871✔
746
    pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
6,871✔
747
    if (pOutputInfo->tbSink.pTSchema == NULL) {
6,881!
748
      return terrno;
×
749
    }
750

751
    pOutputInfo->tbSink.pTbInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
6,881✔
752
    if (pOutputInfo->tbSink.pTbInfo == NULL) {
6,876!
753
      tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
×
754
      return terrno;
×
755
    }
756

757
    tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTbInfo, freePtr);
6,876✔
758
  }
759

760
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
14,035✔
761
    bool scanDropCtb = pTask->subtableWithoutMd5 ? true : false;
7,070✔
762
    SWalFilterCond cond = {.deleteMsg = 1, .scanDropCtb = scanDropCtb};  // delete msg also extract from wal files
7,070✔
763
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
7,070✔
764
    if (pTask->exec.pWalReader == NULL) {
7,071!
765
      tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno));
×
766
      return terrno;
×
767
    }
768
  }
769

770
  streamTaskResetUpstreamStageInfo(pTask);
14,036✔
771

772
  pChkInfo = &pTask->chkInfo;
14,045✔
773
  tqSetRestoreVersionInfo(pTask);
14,045✔
774

775
  char*       p = streamTaskGetStatus(pTask).name;
14,042✔
776
  const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);
14,039✔
777

778
  if (pTask->info.fillHistory) {
14,041✔
779
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
4,994!
780
           " nextProcessVer:%" PRId64
781
           " child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x "
782
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
783
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
784
           pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
785
           (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
786
  } else {
787
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
9,047!
788
           " nextProcessVer:%" PRId64
789
           " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x "
790
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
791
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
792
           pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
793
           (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);
794

795
    if (pChkInfo->checkpointVer > pChkInfo->nextProcessVer) {
9,060!
796
      tqError("vgId:%d build stream task, s-task:%s, checkpointVer:%" PRId64 " > nextProcessVer:%" PRId64, vgId,
×
797
              pTask->id.idStr, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
798
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
799
    }
800
  }
801

802
  return 0;
14,057✔
803
}
804

805
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg); }
20,811✔
806

807
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
20,882✔
808
  return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
20,882✔
809
}
810

811
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
13,603✔
812
  return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, sversion, msg, msgLen,
13,631✔
813
                                      vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored);
13,603✔
814
}
815

816
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
2,344✔
817
  const char*    id = pTask->id.idStr;
2,344✔
818
  int64_t        nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
2,344✔
819
  SVersionRange* pStep2Range = &pTask->step2Range;
2,344✔
820
  int32_t        vgId = pTask->pMeta->vgId;
2,344✔
821

822
  // if it's an source task, extract the last version in wal.
823
  bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
2,344✔
824
  pTask->execInfo.step2Start = taosGetTimestampMs();
2,344✔
825

826
  if (done) {
2,344✔
827
    qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id,
1,504✔
828
           pStep2Range->minVer, pStep2Range->maxVer, 0.0);
829
    int32_t code = streamTaskPutTranstateIntoInputQ(pTask);  // todo: msg lost.
1,504✔
830
    if (code) {
1,504!
831
      qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
×
832
    }
833
    (void)streamExecTask(pTask);  // exec directly
1,504✔
834
  } else {
835
    STimeWindow* pWindow = &pTask->dataRange.window;
840✔
836
    tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
840✔
837
            ", do secondary scan-history from WAL after halt the related stream task:%s",
838
            id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
839
            pStreamTask->id.idStr);
840
    if (pTask->status.schedStatus != TASK_SCHED_STATUS__WAITING) {
840!
841
      tqError("s-task:%s level:%d unexpected sched-status:%d", id, pTask->info.taskLevel, pTask->status.schedStatus);
×
842
    }
843

844
    int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
840✔
845
    if (code) {
840!
846
      tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel);
×
847
    }
848

849
    int64_t dstVer = pStep2Range->minVer;
840✔
850
    pTask->chkInfo.nextProcessVer = dstVer;
840✔
851

852
    walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
840✔
853
    tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
840✔
854
            pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);
855

856
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
840✔
857

858
    // now the fill-history task starts to scan data from wal files.
859
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
840✔
860
    if (code == TSDB_CODE_SUCCESS) {
840!
861
      code = tqScanWalAsync(pTq, false);
840✔
862
      if (code) {
840!
863
        tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
×
864
      }
865
    }
866
  }
867
}
2,344✔
868

869
int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
2,344✔
870
  STQ* pTq = param;
2,344✔
871

872
  SStreamMeta* pMeta = pStreamTask->pMeta;
2,344✔
873
  STaskId      hId = pStreamTask->hTaskInfo.id;
2,344✔
874
  SStreamTask* pTask = NULL;
2,344✔
875
  int32_t      code = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId, &pTask);
2,344✔
876
  if (pTask == NULL) {
2,344!
877
    tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)hId.taskId);
×
878
    return TSDB_CODE_SUCCESS;
×
879
  }
880

881
  doStartFillhistoryStep2(pTask, pStreamTask, pTq);
2,344✔
882

883
  streamMetaReleaseTask(pMeta, pTask);
2,344✔
884
  return TSDB_CODE_SUCCESS;
2,344✔
885
}
886

887
// this function should be executed by only one thread, so we set an sentinel to protect this function
888
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
2,564✔
889
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
2,564✔
890
  SStreamMeta*           pMeta = pTq->pStreamMeta;
2,564✔
891
  int32_t                code = TSDB_CODE_SUCCESS;
2,564✔
892
  SStreamTask*           pTask = NULL;
2,564✔
893
  SStreamTask*           pStreamTask = NULL;
2,564✔
894

895
  code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
2,564✔
896
  if (pTask == NULL) {
2,564!
897
    tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
×
898
            pMeta->vgId, pReq->taskId);
899
    return code;
×
900
  }
901

902
  // do recovery step1
903
  const char* id = pTask->id.idStr;
2,564✔
904
  char*       pStatus = streamTaskGetStatus(pTask).name;
2,564✔
905

906
  // avoid multi-thread exec
907
  while (1) {
×
908
    int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
2,564✔
909
    if (sentinel != 0) {
2,564!
910
      tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
×
911
      taosMsleep(100);
×
912
    } else {
913
      break;
2,564✔
914
    }
915
  }
916

917
  // let's decide which step should be executed now
918
  if (pTask->execInfo.step1Start == 0) {
2,564✔
919
    int64_t ts = taosGetTimestampMs();
2,354✔
920
    pTask->execInfo.step1Start = ts;
2,354✔
921
    tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
2,354✔
922
  } else {
923
    if (pTask->execInfo.step2Start == 0) {
210✔
924
      tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
194!
925
              id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
926
    } else {
927
      tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id,
16!
928
              pTask->execInfo.step2Start);
929

930
      atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
16✔
931
      streamMetaReleaseTask(pMeta, pTask);
16✔
932
      return 0;
16✔
933
    }
934
  }
935

936
  // we have to continue retrying to successfully execute the scan history task.
937
  if (!streamTaskSetSchedStatusWait(pTask)) {
2,548!
938
    tqError(
×
939
        "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
940
        "sched-status:%d",
941
        id, pTask->status.schedStatus);
942
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
×
943
    streamMetaReleaseTask(pMeta, pTask);
×
944
    return 0;
×
945
  }
946

947
  int64_t              st = taosGetTimestampMs();
2,548✔
948
  SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
2,548✔
949

950
  double el = (taosGetTimestampMs() - st) / 1000.0;
2,548✔
951
  pTask->execInfo.step1El += el;
2,548✔
952

953
  if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
2,548✔
954
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
199✔
955
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
199✔
956

957
    if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
199✔
958
      streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
194✔
959
    } else {
960
      SStreamTaskState p = streamTaskGetStatus(pTask);
5✔
961
      ETaskStatus      s = p.state;
5✔
962

963
      if (s == TASK_STATUS__PAUSE) {
5!
964
        tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", id, el,
×
965
                pTask->execInfo.step1El, status);
966
      } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
5!
967
        tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", id, p.name,
5!
968
                pTask->execInfo.step1El);
969
      }
970
    }
971

972
    streamMetaReleaseTask(pMeta, pTask);
199✔
973
    return 0;
199✔
974
  }
975

976
  // the following procedure should be executed, no matter status is stop/pause or not
977
  tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
2,349✔
978

979
  if (pTask->info.fillHistory != 1) {
2,349!
980
    tqError("s-task:%s fill-history is disabled, unexpected", id);
×
981
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
982
  }
983

984
  // 1. get the related stream task
985
  code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
2,349✔
986
  if (pStreamTask == NULL) {
2,349✔
987
    tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
5!
988
            pTask->streamTaskId.taskId, pTask->id.idStr);
989

990
    tqDebug("s-task:%s fill-history task set status to be dropping", id);
5!
991
    code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
5✔
992

993
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
5✔
994
    streamMetaReleaseTask(pMeta, pTask);
5✔
995
    return code;
5✔
996
  }
997

998
  if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
2,344!
999
    tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel);
×
1000
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1001
  }
1002

1003
  code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
2,344✔
1004
  streamMetaReleaseTask(pMeta, pStreamTask);
2,344✔
1005

1006
  atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
2,344✔
1007
  streamMetaReleaseTask(pMeta, pTask);
2,344✔
1008
  return code;
2,344✔
1009
}
1010

1011
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
140,110✔
1012
  int32_t  code = 0;
140,110✔
1013
  char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
140,110✔
1014
  int32_t  len = pMsg->contLen - sizeof(SMsgHead);
140,110✔
1015
  SDecoder decoder;
1016

1017
  SStreamTaskRunReq req = {0};
140,110✔
1018
  tDecoderInit(&decoder, (uint8_t*)msg, len);
140,110✔
1019
  if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
140,174!
1020
    tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
×
1021
    tDecoderClear(&decoder);
×
1022
    return TSDB_CODE_SUCCESS;
×
1023
  }
1024

1025
  tDecoderClear(&decoder);
140,146✔
1026

1027
  // extracted submit data from wal files for all tasks
1028
  if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
140,139✔
1029
    return tqScanWal(pTq);
52,654✔
1030
  }
1031

1032
  code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
87,485✔
1033
  if (code) {
87,516✔
1034
    tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
32!
1035
    return code;
32✔
1036
  }
1037

1038
  // let's continue scan data in the wal files
1039
  if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) {
87,484✔
1040
    code = tqScanWalAsync(pTq, false);  // it's ok to failed
66,615✔
1041
    if (code) {
66,609!
UNCOV
1042
      tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
×
1043
    }
1044
  }
1045

1046
  return code;
87,480✔
1047
}
1048

1049
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
56,485✔
1050
  return tqStreamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg);
56,485✔
1051
}
1052

1053
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
56,497✔
1054
  return tqStreamTaskProcessDispatchRsp(pTq->pStreamMeta, pMsg);
56,497✔
1055
}
1056

1057
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
6,860✔
1058
  return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen);
6,860✔
1059
}
1060

1061
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
3,590✔
1062
  return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg);
3,590✔
1063
}
1064

1065
int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg) {
104✔
1066
  return tqStreamTaskProcessConsenChkptIdReq(pTq->pStreamMeta, pMsg);
104✔
1067
}
1068

1069
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
1,383✔
1070
  return tqStreamTaskProcessTaskPauseReq(pTq->pStreamMeta, msg);
1,383✔
1071
}
1072

1073
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
1,373✔
1074
  return tqStreamTaskProcessTaskResumeReq(pTq, sversion, msg, true);
1,373✔
1075
}
1076

1077
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
539✔
1078
  return tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg);
539✔
1079
}
1080

1081
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; }
428✔
1082

1083
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
×
1084
  char*               msgStr = pMsg->pCont;
×
1085
  char*               msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
×
1086
  int32_t             msgLen = pMsg->contLen - sizeof(SMsgHead);
×
1087
  int32_t             code = 0;
×
1088
  SStreamProgressReq  req;
1089
  char*               pRspBuf = taosMemoryCalloc(1, sizeof(SMsgHead) + sizeof(SStreamProgressRsp));
×
1090
  SStreamProgressRsp* pRsp = POINTER_SHIFT(pRspBuf, sizeof(SMsgHead));
×
1091
  if (!pRspBuf) {
×
1092
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1093
    code = -1;
×
1094
    goto _OVER;
×
1095
  }
1096

1097
  code = tDeserializeStreamProgressReq(msgBody, msgLen, &req);
×
1098
  if (code == TSDB_CODE_SUCCESS) {
×
1099
    code = tqGetStreamExecInfo(pTq->pVnode, req.streamId, &pRsp->progressDelay, &pRsp->fillHisFinished);
×
1100
  }
1101
  if (code == TSDB_CODE_SUCCESS) {
×
1102
    pRsp->fetchIdx = req.fetchIdx;
×
1103
    pRsp->subFetchIdx = req.subFetchIdx;
×
1104
    pRsp->vgId = req.vgId;
×
1105
    pRsp->streamId = req.streamId;
×
1106
    code = tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp);
×
1107
    if (code) {
×
1108
      goto _OVER;
×
1109
    }
1110

1111
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
×
1112
    rsp.pCont = pRspBuf;
×
1113
    pRspBuf = NULL;
×
1114
    rsp.contLen = sizeof(SMsgHead) + sizeof(SStreamProgressRsp);
×
1115
    tmsgSendRsp(&rsp);
×
1116
  }
1117

1118
_OVER:
×
1119
  if (pRspBuf) {
×
1120
    taosMemoryFree(pRspBuf);
×
1121
  }
1122
  return code;
×
1123
}
1124

1125
// always return success to mnode
1126
//todo: handle failure of build and send msg to mnode
1127
static void doSendChkptSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t code,
69✔
1128
                                 int32_t taskId) {
1129
  SRpcMsg rsp = {0};
69✔
1130
  int32_t ret = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &rsp, code);
69✔
1131
  if (ret) {  // suppress the error in build checkpoint source rsp
69!
1132
    tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", taskId, tstrerror(ret));
×
1133
  }
1134
  tmsgSendRsp(&rsp);  // error occurs
69✔
1135
}
69✔
1136

1137
// no matter what kinds of error happened, make sure the mnode will receive the success execution code.
1138
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
2,271✔
1139
  int32_t                    vgId = TD_VID(pTq->pVnode);
2,271✔
1140
  SStreamMeta*               pMeta = pTq->pStreamMeta;
2,271✔
1141
  char*                      msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
2,271✔
1142
  int32_t                    len = pMsg->contLen - sizeof(SMsgHead);
2,271✔
1143
  int32_t                    code = 0;
2,271✔
1144
  SStreamCheckpointSourceReq req = {0};
2,271✔
1145
  SDecoder                   decoder = {0};
2,271✔
1146
  SStreamTask*               pTask = NULL;
2,271✔
1147
  int64_t                    checkpointId = 0;
2,271✔
1148

1149
  // disable auto rsp to mnode
1150
  pRsp->info.handle = NULL;
2,271✔
1151

1152
  tDecoderInit(&decoder, (uint8_t*)msg, len);
2,271✔
1153
  if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
2,271!
1154
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
1155
    tDecoderClear(&decoder);
×
1156
    tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
×
1157
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1158
    return TSDB_CODE_SUCCESS;  // always return success to mnode,
×
1159
  }
1160

1161
  tDecoderClear(&decoder);
2,269✔
1162

1163
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
2,271✔
1164
    tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
11!
1165
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
11✔
1166
    return TSDB_CODE_SUCCESS;  // always return success to mnode
11✔
1167
  }
1168

1169
  if (!pTq->pVnode->restored) {
2,260✔
1170
    tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
58✔
1171
            ", transId:%d s-task:0x%x ignore it",
1172
            vgId, req.checkpointId, req.transId, req.taskId);
1173
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
58✔
1174
    return TSDB_CODE_SUCCESS;  // always return success to mnode
58✔
1175
  }
1176

1177
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
2,202✔
1178
  if (pTask == NULL || code != 0) {
2,203✔
1179
    tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
2!
1180
            " transId:%d it may have been destroyed",
1181
            vgId, req.taskId, req.checkpointId, req.transId);
1182
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
2✔
1183
    return TSDB_CODE_SUCCESS;
×
1184
  }
1185

1186
  if (pTask->status.downstreamReady != 1) {
2,201!
1187
    // record the latest failed checkpoint id
1188
    streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId);
×
1189
    tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
×
1190
            ", transId:%d set it failed",
1191
            pTask->id.idStr, req.checkpointId, req.transId);
1192

1193
    streamMetaReleaseTask(pMeta, pTask);
×
1194
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1195
    return TSDB_CODE_SUCCESS;  // todo retry handle error
×
1196
  }
1197

1198
  // todo save the checkpoint failed info
1199
  streamMutexLock(&pTask->lock);
2,201✔
1200
  ETaskStatus status = streamTaskGetStatus(pTask).state;
2,205✔
1201

1202
  if (req.mndTrigger == 1) {
2,202✔
1203
    if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
34!
1204
      tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure",
×
1205
              pTask->id.idStr, req.checkpointId);
1206

1207
      streamMutexUnlock(&pTask->lock);
×
1208
      streamMetaReleaseTask(pMeta, pTask);
×
1209
      doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1210
      return TSDB_CODE_SUCCESS;
×
1211
    }
1212
  } else {
1213
    if (status != TASK_STATUS__HALT) {
2,168!
1214
      tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
×
1215
      //      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
1216
    }
1217
  }
1218

1219
  // check if the checkpoint msg already sent or not.
1220
  if (status == TASK_STATUS__CK) {
2,200!
1221
    streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId);
×
1222

1223
    tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
×
1224
           " transId:%d already handled, ignore msg and continue process checkpoint",
1225
           pTask->id.idStr, checkpointId, req.transId);
1226

1227
    streamMutexUnlock(&pTask->lock);
×
1228
    streamMetaReleaseTask(pMeta, pTask);
×
1229
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SYN_PROPOSE_NOT_READY, req.taskId);
×
1230
    return TSDB_CODE_SUCCESS;
×
1231
  } else {  // checkpoint already finished, and not in checkpoint status
1232
    if (req.checkpointId <= pTask->chkInfo.checkpointId) {
2,200!
1233
      tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
×
1234
             " transId:%d already handled, return success",
1235
             pTask->id.idStr, req.checkpointId, req.transId);
1236

1237
      streamMutexUnlock(&pTask->lock);
×
1238
      streamMetaReleaseTask(pMeta, pTask);
×
1239
      doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1240
      return TSDB_CODE_SUCCESS;
×
1241
    }
1242
  }
1243

1244
  code = streamProcessCheckpointSourceReq(pTask, &req);
2,200✔
1245
  streamMutexUnlock(&pTask->lock);
2,204✔
1246

1247
  if (code) {
2,204!
1248
    qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
×
1249
           tstrerror(code));
1250
    streamMetaReleaseTask(pMeta, pTask);
×
1251
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1252
    return TSDB_CODE_SUCCESS;
×
1253
  }
1254

1255
  if (req.mndTrigger) {
2,204✔
1256
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ",
34!
1257
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
1258
  } else {
1259
    const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask));
2,170✔
1260
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
2,170!
1261
           ", transId:%d after transfer-state, prev status:%s",
1262
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
1263
  }
1264

1265
  code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
2,205✔
1266
  if (code != TSDB_CODE_SUCCESS) {
2,205!
1267
    streamTaskSetCheckpointFailed(pTask);  // set the checkpoint failed
×
1268
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1269
  }
1270

1271
  streamMetaReleaseTask(pMeta, pTask);
2,205✔
1272
  return TSDB_CODE_SUCCESS;
2,204✔
1273
}
1274

1275
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
1276
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
6,622✔
1277
  int32_t vgId = TD_VID(pTq->pVnode);
6,622✔
1278

1279
  SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont;
6,622✔
1280
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
6,622!
1281
    tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
×
1282
            (int32_t)pReq->downstreamTaskId);
1283
    return TSDB_CODE_STREAM_NOT_LEADER;
×
1284
  }
1285

1286
  return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg);
6,614✔
1287
}
1288

1289
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
58✔
1290
  return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored);
58✔
1291
}
1292

1293
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
×
1294
  return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg->pCont);
×
1295
}
1296

1297
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
×
1298
  int32_t vgId = TD_VID(pTq->pVnode);
×
1299

1300
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
×
1301
    SRetrieveChkptTriggerReq req = {0};
×
1302

1303
    char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
×
1304
    int32_t  len = pMsg->contLen - sizeof(SMsgHead);
×
1305
    SDecoder decoder = {0};
×
1306

1307
    tDecoderInit(&decoder, (uint8_t*)msg, len);
×
1308
    if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
×
1309
      tDecoderClear(&decoder);
×
1310
      tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId);
×
1311
      return TSDB_CODE_INVALID_MSG;
×
1312
    }
1313
    tDecoderClear(&decoder);
×
1314

1315
    tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%" PRId64, vgId,
×
1316
            req.downstreamTaskId);
1317
    return TSDB_CODE_STREAM_NOT_LEADER;
×
1318
  }
1319

1320
  return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg);
×
1321
}
1322

1323
int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
×
1324
  return tqStreamTaskProcessRetrieveTriggerRsp(pTq->pStreamMeta, pMsg);
×
1325
}
1326

1327
// this function is needed, do not try to remove it.
1328
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); }
6,097✔
1329

1330
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
4,445✔
1331
  return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
4,445✔
1332
}
1333

1334
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
6,618✔
1335
  return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg);
6,618✔
1336
}
1337

1338
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
4,263✔
1339
  return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
4,263✔
1340
}
1341

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc