• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.15
/source/dnode/vnode/src/tq/tq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "osDef.h"
18
#include "taoserror.h"
19
#include "tqCommon.h"
20
#include "tstream.h"
21
#include "vnd.h"
22

23
// 0: not init
24
// 1: already inited
25
// 2: wait to be inited or cleanup
26
static int32_t tqInitialize(STQ* pTq);
27

28
/* True while a poll request is being executed on this handle (set by tqSetHandleExec). */
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}
29
/* Mark the handle busy; tqProcessPollReq does this while holding pTq->lock. */
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}
30
/* Return the handle to the idle state once a poll request has been served. */
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
31

32
void tqDestroyTqHandle(void* data) {
1,628✔
33
  STqHandle* pData = (STqHandle*)data;
1,628✔
34
  qDestroyTask(pData->execHandle.task);
1,628✔
35

36
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1,628✔
37
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
1,267!
38
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
361✔
39
    tqReaderClose(pData->execHandle.pTqReader);
274✔
40
    walCloseReader(pData->pWalReader);
274✔
41
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
274✔
42
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
87!
43
    walCloseReader(pData->pWalReader);
87✔
44
    tqReaderClose(pData->execHandle.pTqReader);
87✔
45
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
87!
46
    nodesDestroyNode(pData->execHandle.execTb.node);
87✔
47
  }
48
  if (pData->msg != NULL) {
1,627!
49
    rpcFreeCont(pData->msg->pCont);
×
50
    taosMemoryFree(pData->msg);
×
51
    pData->msg = NULL;
×
52
  }
53
  if (pData->block != NULL) {
1,627!
54
    blockDataDestroy(pData->block);
×
55
  }
56
  if (pData->pRef) {
1,627✔
57
    walCloseRef(pData->pRef->pWal, pData->pRef->refId);
1,585✔
58
  }
59
}
1,627✔
60

61
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
6,205✔
62
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
12,063✔
63
         pLeft->val.version == pRight->val.version;
5,858✔
64
}
65

66
/*
 * Create the tq module of a vnode rooted at `path`, attach it to pVnode->pTq, and
 * hand over to tqInitialize for the stream/meta stores.
 *
 * Returns 0 on success, otherwise terrno or the tqInitialize error.
 *
 * NOTE(review): pVnode->pTq is assigned before any step that can fail, so on an
 * error return a partially built STQ stays reachable from the vnode. Presumably
 * the caller's error path releases it; confirm.
 */
int32_t tqOpen(const char* path, SVnode* pVnode) {
  STQ* pNewTq = taosMemoryCalloc(1, sizeof(STQ));
  if (!pNewTq) {
    return terrno;
  }
  pVnode->pTq = pNewTq;

  pNewTq->path = taosStrdup(path);
  if (!pNewTq->path) {
    return terrno;
  }
  pNewTq->pVnode = pVnode;

  // Subscription handles, looked up by subscription key; entries freed via tqDestroyTqHandle.
  pNewTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (!pNewTq->pHandle) {
    return terrno;
  }
  taosHashSetFreeFp(pNewTq->pHandle, tqDestroyTqHandle);

  taosInitRWLatch(&pNewTq->lock);

  // Handles with a parked poll message, re-dispatched by tqProcessPollPush.
  pNewTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (!pNewTq->pPushMgr) {
    return terrno;
  }

  // Column-modification guards, keyed by topic (see tqProcessAddCheckInfoReq).
  pNewTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (!pNewTq->pCheckInfo) {
    return terrno;
  }
  taosHashSetFreeFp(pNewTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  // Committed consumer offsets, keyed by subscription key.
  pNewTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK);
  if (!pNewTq->pOffset) {
    return terrno;
  }
  taosHashSetFreeFp(pNewTq->pOffset, (FDelete)tDeleteSTqOffset);

  return tqInitialize(pNewTq);
}
105

106
/*
 * Second phase of tqOpen: open the stream meta store under pTq->path, load its
 * tasks, then open the tq meta stores.
 *
 * The build/expand/start-complete callbacks from this module are wired into
 * streamMetaOpen. Returns the first non-success code.
 */
int32_t tqInitialize(STQ* pTq) {
  int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, TD_VID(pTq->pVnode), -1,
                                tqStartTaskCompleteCallback, &pTq->pStreamMeta);
  if (code == TSDB_CODE_SUCCESS) {
    streamMetaLoadAllTasks(pTq->pStreamMeta);
    code = tqMetaOpen(pTq);
  }
  return code;
}
117

118
/*
 * Tear down the tq module of a vnode and free pTq. A NULL pTq is a no-op.
 *
 * Steps:
 *   1. Answer every poll request still parked in the push manager with an empty response.
 *   2. Release the hash tables, the path and the tq meta stores.
 *   3. Close the stream meta and free pTq.
 *
 * pTq can be partially built: tqOpen publishes pVnode->pTq before later steps
 * that may fail, and pTq starts zeroed (calloc). pStreamMeta may therefore still
 * be NULL here, so it is checked instead of being dereferenced for the vgId.
 * NOTE(review): tqMetaClose is assumed to tolerate stores that were never opened.
 */
void tqClose(STQ* pTq) {
  qDebug("start to close tq");
  if (pTq == NULL) {
    return;
  }

  void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
  while (pIter) {
    STqHandle* pHandle = *(STqHandle**)pIter;
    int32_t    vgId = TD_VID(pTq->pVnode);

    if (pHandle->msg != NULL) {
      tqPushEmptyDataRsp(pHandle, vgId);
      rpcFreeCont(pHandle->msg->pCont);
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;
    }
    pIter = taosHashIterate(pTq->pPushMgr, pIter);
  }

  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosHashCleanup(pTq->pOffset);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);

  int32_t vgId = -1;
  if (pTq->pStreamMeta != NULL) {
    vgId = pTq->pStreamMeta->vgId;
    streamMetaClose(pTq->pStreamMeta);
    pTq->pStreamMeta = NULL;
  }

  qDebug("vgId:%d end to close tq", vgId);
  taosMemoryFree(pTq);
}
151

152
/* Tell the stream meta that the vnode is closing; does nothing for a NULL tq. */
void tqNotifyClose(STQ* pTq) {
  if (pTq != NULL) {
    streamMetaNotifyClose(pTq->pStreamMeta);
  }
}
158

159
/*
 * Answer the poll request parked on pHandle->msg with an empty data response
 * (blockNum == 0), initialized from the request's offset.
 *
 * pHandle->msg must be non-NULL. The caller keeps ownership of it; tqClose
 * frees it after this call. Errors are only logged.
 *
 * The decoded poll request is always released with tDestroySMqPollReq, on success
 * and on every error path. This matches tqProcessPollReq, so memory allocated while
 * decoding the request is not leaked.
 */
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
  int32_t    code = 0;
  SMqPollReq req = {0};
  SMqDataRsp dataRsp = {0};
  char       buf[TSDB_OFFSET_LEN] = {0};

  code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req);
  if (code < 0) {
    tqError("tDeserializeSMqPollReq %d failed, code:%d", pHandle->msg->contLen, code);
    goto END;
  }

  code = tqInitDataRsp(&dataRsp, req.reqOffset);
  if (code != 0) {
    tqError("tqInitDataRsp failed, code:%d", code);
    goto END;
  }
  dataRsp.blockNum = 0;
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
  tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s,QID:0x%" PRIx64, req.consumerId, vgId, buf,
         req.reqId);

  code = tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
  if (code != 0) {
    tqError("tqSendDataRsp failed, code:%d", code);
  }
  tDeleteMqDataRsp(&dataRsp);

END:
  tDestroySMqPollReq(&req);
}
186

187
/*
 * Send pRsp back on the RPC connection of pMsg as a message of the given type.
 *
 * The WAL version range currently valid for the handle's reader is read here and
 * passed on to tqDoSendDataRsp. Returns tqDoSendDataRsp's result.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type,
                      int32_t vgId) {
  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  char reqOffsetStr[TSDB_OFFSET_LEN] = {0};
  char rspOffsetStr[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(reqOffsetStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  (void)tFormatOffset(rspOffsetStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);

  return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
}
202

203
/*
 * Apply an offset-commit message (an encoded SMqVgOffset in msg/msgLen).
 *
 * Snapshot-data, snapshot-meta and log offsets are accepted; any other type is
 * rejected. A log offset equal to the stored one is ignored. Otherwise the offset
 * is put into pTq->pOffset, which then owns its value (freed by tDeleteSTqOffset),
 * and persisted with tqMetaSaveInfo.
 *
 * Returns:
 *   - 0 on success or when no update is needed;
 *   - TSDB_CODE_INVALID_MSG for an undecodable message or an invalid offset type;
 *   - -1 with terrno set when storing the offset fails.
 *
 * Resource handling:
 *   - The decoder is cleared on every path; it was previously skipped on decode failure.
 *   - The decoded offset value is released whenever it is not handed to the hash,
 *     including when taosHashPut fails.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
  int32_t     code = 0;
  STqOffset*  pOffset = &vgOffset.offset;
  STqOffset*  pSavedOffset = NULL;
  SDecoder    decoder;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    goto end;  // no need to update the offset value; code is 0 here
  }

  // save the new offset value; on success the hash owns vgOffset.offset.val
  if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))) {
    tOffsetDestroy(&vgOffset.offset.val);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg,
                     msgLen - sizeof(vgOffset.consumerId)) < 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;

end:
  tOffsetDestroy(&vgOffset.offset.val);
  return code;
}
256

257
/*
 * Handle a consumer seek request.
 *
 * If the subscription handle exists and belongs to the requesting consumer, it is
 * removed from the push manager. The comment below explains why.
 *
 * A response is always sent:
 *   - code 0 on success or when the handle is missing;
 *   - TSDB_CODE_TMQ_CONSUMER_MISMATCH when another consumer owns the handle;
 *   - TSDB_CODE_INVALID_MSG when the request cannot be decoded. This matches the
 *     other tq request handlers; it was previously reported as OUT_OF_MEMORY.
 *
 * The function itself always returns 0. The write lock is taken and released exactly once.
 */
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqSeekReq req = {0};
  int32_t    vgId = TD_VID(pTq->pVnode);
  SRpcMsg    rsp = {.info = pMsg->info};
  int32_t    code = 0;
  STqHandle* pHandle = NULL;

  if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tmq seek: vgId:%d tDeserializeSMqSeekReq %d failed", vgId, pMsg->contLen);
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
  taosWLockLatch(&pTq->lock);

  pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
    code = 0;
  } else if (pHandle->consumerId != req.consumerId) {
    // 2. check consumer-vg assignment status
    tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            req.consumerId, vgId, req.subKey, pHandle->consumerId);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
  } else {
    // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
    // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
    tqUnregisterPushHandle(pTq, pHandle);
  }

  taosWUnLockLatch(&pTq->lock);

end:
  rsp.code = code;
  tmsgSendRsp(&rsp);
  return 0;
}
298

299
/*
 * Check whether column colId of table tbUid may be modified.
 *
 * Returns -1 if any registered check entry for tbUid lists colId, otherwise 0.
 * The hash iteration is cancelled explicitly when leaving the loop early.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  void* pIter = NULL;

  while ((pIter = taosHashIterate(pTq->pCheckInfo, pIter)) != NULL) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t* pColId = taosArrayGet(pCheck->colIdList, idx);
      if (pColId != NULL && *pColId == colId) {
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}
328

329
/*
 * Re-dispatch every poll request parked in the push manager to the query queue
 * as a TDMT_VND_TMQ_CONSUME message, then empty the push manager.
 *
 * Runs under the tq write lock and always returns 0 (pMsg is unused).
 * Only the parked SRpcMsg wrapper is freed here. Its pCont is passed along in the
 * queued message and is presumably released by the consumer of that queue (confirm).
 * A handle without a parked message aborts the scan; the push manager is still cleared.
 */
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);
  if (taosHashGetSize(pTq->pPushMgr) <= 0) {
    taosWUnLockLatch(&pTq->lock);
    return 0;
  }

  for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
    STqHandle* pHandle = *(STqHandle**)pIter;
    tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

    if (pHandle->msg == NULL) {
      tqError("pHandle->msg should not be null");
      taosHashCancelIterate(pTq->pPushMgr, pIter);
      break;
    }

    SRpcMsg consumeMsg = {.msgType = TDMT_VND_TMQ_CONSUME,
                          .pCont = pHandle->msg->pCont,
                          .contLen = pHandle->msg->contLen,
                          .info = pHandle->msg->info};
    if (tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &consumeMsg) != 0) {
      tqError("vgId:%d tmsgPutToQueue failed, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
    }
    taosMemoryFree(pHandle->msg);
    pHandle->msg = NULL;
  }

  taosHashClear(pTq->pPushMgr);
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
363

364
/*
 * Serve a consumer poll request.
 *
 * Steps:
 *   1. Find the subscription handle.
 *   2. Verify that it belongs to the requesting consumer.
 *   3. Wait (retrying every 10 ms) until no other poll is executing on it, then
 *      mark it executing.
 *   4. Raise the handle's epoch to the request's epoch if it is lower.
 *   5. Extract data with tqExtractDataForMq and mark the handle idle again.
 *
 * Returns:
 *   - the deserialize error code for a malformed message (terrno = INVALID_MSG);
 *   - -1 with terrno set when the handle is missing or owned by another consumer;
 *   - otherwise tqExtractDataForMq's result.
 *
 * Every exit after decoding goes through END so the request is released with
 * tDestroySMqPollReq. The handle-not-found path used to return directly and leaked it.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
  if (code < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    goto END;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
    if (code != TDB_CODE_SUCCESS) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      code = -1;
      goto END;
    }

    // 2. check rebalance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      code = -1;
      goto END;
    }

    bool exec = tqIsHandleExec(pHandle);
    if (!exec) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s,QID:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);

END:
  tDestroySMqPollReq(&req);
  return code;
}
440

441
/*
 * Reply with the committed offset stored for the subscription named in the request.
 *
 * The request body follows an SMsgHead and holds an encoded SMqVgOffset; only its
 * subKey is used. The stored offset is re-encoded into an RPC buffer and sent back.
 *
 * Returns:
 *   - 0 after sending the response;
 *   - TSDB_CODE_TMQ_NO_COMMITTED when no offset is stored;
 *   - TSDB_CODE_INVALID_MSG when the request cannot be decoded. This matches
 *     tqProcessOffsetCommitReq; it was OUT_OF_MEMORY before;
 *   - an encoding or allocation error code otherwise.
 *
 * Resource handling:
 *   - The decoder is cleared on every path.
 *   - The decoded offset value is destroyed before it is replaced by (or abandoned
 *     in favour of) the stored one, so any data decoded into it is not leaked.
 *   - The stored offset is only shallow-copied here and is never destroyed.
 */
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);

  SMqVgOffset vgOffset = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)data, len);
  int32_t code = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);
  if (code < 0) {
    tOffsetDestroy(&vgOffset.offset.val);
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  STqOffset* pSavedOffset = NULL;
  code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
  tOffsetDestroy(&vgOffset.offset.val);
  if (code != 0) {
    return TSDB_CODE_TMQ_NO_COMMITTED;
  }
  vgOffset.offset = *pSavedOffset;  // borrowed shallow copy: must not be destroyed here

  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
  }

  void* buf = rpcMallocCont(len);
  if (buf == NULL) {
    return terrno;
  }
  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(buf);
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
  }

  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};

  tmsgSendRsp(&rsp);
  return 0;
}
486

487
/*
 * Reply with WAL position information (TMQ_MSG_TYPE__WALINFO_RSP) for a subscription.
 *
 * The response offset is always a log offset. Its version is chosen as follows:
 *   - a log request offset: the requested version;
 *   - a negative ("reset") request offset: the stored committed log offset if one
 *     exists, otherwise the earliest (sver) or latest (ever) valid WAL version, as
 *     requested.
 *
 * Snapshot subscriptions and other offset types are rejected with TSDB_CODE_INVALID_PARA.
 *
 * The decoded poll request is now released with tDestroySMqPollReq on every path,
 * as tqProcessPollReq does; previously it was never released. The response is deleted
 * only once tqInitDataRsp has succeeded, as before.
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t      code = 0;
  SMqPollReq   req = {0};
  SMqDataRsp   dataRsp = {0};
  int64_t      consumerId = 0;
  STqOffsetVal reqOffset = {0};
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;
  int64_t      sver = 0, ever = 0;

  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    code = TSDB_CODE_INVALID_MSG;
    goto END_REQ;
  }

  consumerId = req.consumerId;
  reqOffset = req.reqOffset;

  // 1. find handle
  taosRLockLatch(&pTq->lock);
  pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    taosRUnLockLatch(&pTq->lock);
    code = TSDB_CODE_INVALID_MSG;
    goto END_REQ;
  }

  // 2. check rebalance status
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    taosRUnLockLatch(&pTq->lock);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    goto END_REQ;
  }

  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  taosRUnLockLatch(&pTq->lock);

  code = tqInitDataRsp(&dataRsp, req.reqOffset);
  if (code != 0) {
    goto END_REQ;
  }

  if (req.useSnapshot == true) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

  if (reqOffset.type == TMQ_OFFSET__LOG) {
    dataRsp.rspOffset.version = reqOffset.version;
  } else if (reqOffset.type < 0) {
    STqOffset* pOffset = NULL;
    code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
    if (code == 0) {
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
        code = TSDB_CODE_INVALID_PARA;
        goto END;
      }

      dataRsp.rspOffset.version = pOffset->val.version;
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
             req.subKey, dataRsp.rspOffset.version);
    } else {
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
        dataRsp.rspOffset.version = ever;
      }
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
             dataRsp.rspOffset.version);
    }
  } else {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
            reqOffset.type);
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);

END:
  tDeleteMqDataRsp(&dataRsp);
END_REQ:
  tDestroySMqPollReq(&req);
  return code;
}
571

572
/*
 * Drop a subscription from this vnode. msg is an SMqVDeleteReq naming the subKey.
 *
 * Steps:
 *   1. If a handle exists, wait (retrying every 10 ms) until no poll is executing
 *      on it, unregister it from the push manager and remove it from pTq->pHandle.
 *   2. Remove the committed offset from pTq->pOffset and from the offset store.
 *   3. Remove the subscription's entry from the exec store.
 *
 * Failures are only logged; the function always returns 0.
 *
 * Fixes:
 *   - The exec-store failure logged a copy-pasted "no such offset" message; it now
 *     says "no such handle".
 *   - The key length is computed once.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);
  size_t         keyLen = strlen(pReq->subKey);

  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
  int32_t code = 0;

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, keyLen);
  if (pHandle) {
    while (1) {
      taosWLockLatch(&pTq->lock);
      bool exec = tqIsHandleExec(pHandle);

      if (exec) {
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
               pHandle->subKey, pHandle);
        taosWUnLockLatch(&pTq->lock);
        taosMsleep(10);
        continue;
      }
      tqUnregisterPushHandle(pTq, pHandle);
      code = taosHashRemove(pTq->pHandle, pReq->subKey, keyLen);
      if (code != 0) {
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
      }
      taosWUnLockLatch(&pTq->lock);
      break;
    }
  }

  taosWLockLatch(&pTq->lock);
  if (taosHashRemove(pTq->pOffset, pReq->subKey, keyLen) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
  }
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, keyLen) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, keyLen) < 0) {
    tqError("cannot process tq delete req %s, since no such handle in tdb", pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}
617

618
/*
 * Register a column-modification check entry decoded from msg, keyed by its topic.
 *
 * On a successful taosHashPut the hash stores the shallow copy and takes over its
 * resources (its free callback is tDeleteSTqCheckInfo). The raw message is then
 * persisted to the check store under the same key. If the put fails, the decoded
 * entry is freed here.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};

  int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen);
  if (code == 0) {
    code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
    if (code == 0) {
      return tqMetaSaveInfo(pTq, pTq->pCheckStore, info.topic, strlen(info.topic), msg, msgLen);
    }
    tDeleteSTqCheckInfo(&info);
  }
  return code;
}
633

634
/*
 * Remove a column-modification check entry. msg is the NUL-terminated key.
 *
 * The entry is removed from pTq->pCheckInfo and then from the check store.
 * Returns TSDB_CODE_TSC_INTERNAL_ERROR if the in-memory removal fails.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  size_t keyLen = strlen(msg);
  if (taosHashRemove(pTq->pCheckInfo, msg, keyLen) < 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  return tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, keyLen);
}
640

641
/*
 * Apply a rebalance (subscribe) request decoded from msg as an SMqRebVgReq.
 *
 * Existing handle: wait until no poll is executing on it. If the new consumer
 * differs, switch ownership to it:
 *   - store the new consumerId and reset the epoch to 0;
 *   - unregister the handle from the push manager;
 *   - persist the handle.
 *
 * No handle yet: reject a request whose new consumer is -1; otherwise create a
 * new handle and persist it.
 *
 * Returns the decode result, TSDB_CODE_INVALID_PARA, or the create/save result.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqRebVgReq req = {0};
  SDecoder    decoder = {0};
  STqHandle*  pHandle = NULL;
  int32_t     vgId = pTq->pVnode->config.vgId;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int ret = tDecodeSMqRebVgReq(&decoder, &req);
  if (ret < 0) {
    goto end;
  }

  tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, vgId, req.subKey,
         req.oldConsumerId, req.newConsumerId);

  taosRLockLatch(&pTq->lock);
  if (tqMetaGetHandle(pTq, req.subKey, &pHandle) != 0) {
    tqInfo("vgId:%d, tq process sub req:%s, no such handle, create new one", vgId, req.subKey);
  }
  taosRUnLockLatch(&pTq->lock);

  if (pHandle != NULL) {
    // Spin until the handle is idle; the loop exits still holding the write lock.
    for (;;) {
      taosWLockLatch(&pTq->lock);
      if (!tqIsHandleExec(pHandle)) {
        break;
      }
      tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", vgId,
             pHandle->subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      taosMsleep(10);
    }

    if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);

      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, 0);
      tqUnregisterPushHandle(pTq, pHandle);
      ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    }
    taosWUnLockLatch(&pTq->lock);
  } else {
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
      ret = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    STqHandle newHandle = {0};
    ret = tqMetaCreateHandle(pTq, &req, &newHandle);
    if (ret < 0) {
      tqDestroyTqHandle(&newHandle);
      goto end;
    }

    taosWLockLatch(&pTq->lock);
    ret = tqMetaSaveHandle(pTq, req.subKey, &newHandle);
    taosWUnLockLatch(&pTq->lock);
  }

end:
  tDecoderClear(&decoder);
  return ret;
}
712

713
/* Hash free callback: ptr addresses a stored pointer value; release what it points to. */
static void freePtr(void* ptr) {
  void** ppStored = (void**)ptr;
  taosMemoryFree(*ppStored);
}
714

715
/*
 * Build-task callback passed to streamMetaOpen (see tqInitialize).
 *
 * Steps:
 *   1. Initialize the task.
 *   2. Wire up its sink: SMA or table output.
 *   3. Open a WAL reader for source-level tasks.
 *   4. Reset the upstream stage info and restore the version info.
 *   5. Log the resulting task state.
 *
 * For non-fill-history tasks, a checkpoint version beyond the next version to
 * process is rejected with TSDB_CODE_STREAM_INTERNAL_ERROR.
 * Returns 0 on success, or streamTaskInit's code / terrno on failure.
 */
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
  STQ*    pTq = (STQ*)pTqObj;
  int32_t vgId = TD_VID(pTq->pVnode);

  tqDebug("s-task:0x%x start to build task", pTask->id.taskId);

  int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pTask->pBackend = NULL;

  // sink
  STaskOutputInfo* pOut = &pTask->outputInfo;
  if (pOut->type == TASK_OUTPUT__SMA) {
    pOut->smaSink.vnode = pTq->pVnode;
    pOut->smaSink.smaSink = smaHandleRes;
  } else if (pOut->type == TASK_OUTPUT__TABLE) {
    pOut->tbSink.vnode = pTq->pVnode;
    pOut->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;

    // Schema version: taken from the meta info of tbSink.stbUid when it can be read, else 1.
    SMetaInfo stbInfo = {0};
    int32_t   schemaVer = 1;
    if (metaGetInfo(pTq->pVnode->pMeta, pOut->tbSink.stbUid, &stbInfo, NULL) == TSDB_CODE_SUCCESS) {
      schemaVer = stbInfo.skmVer;
    }

    SSchemaWrapper* pWrapper = pOut->tbSink.pSchemaWrapper;
    pOut->tbSink.pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, schemaVer);
    if (pOut->tbSink.pTSchema == NULL) {
      return terrno;
    }

    pOut->tbSink.pTbInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pOut->tbSink.pTbInfo == NULL) {
      tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
      return terrno;
    }

    tSimpleHashSetFreeFp(pOut->tbSink.pTbInfo, freePtr);
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    // delete msg also extract from wal files
    SWalFilterCond cond = {.deleteMsg = 1, .scanDropCtb = pTask->subtableWithoutMd5 ? true : false};
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
    if (pTask->exec.pWalReader == NULL) {
      tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno));
      return terrno;
    }
  }

  streamTaskResetUpstreamStageInfo(pTask);

  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
  tqSetRestoreVersionInfo(pTask);

  char*       pCurStatus = streamTaskGetStatus(pTask).name;
  const char* pNextStatus = streamTaskGetStatusStr(pTask->status.taskStatus);

  if (!pTask->info.fillHistory) {
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
           " nextProcessVer:%" PRId64
           " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x "
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
           pTask->info.selfChildId, pTask->info.taskLevel, pCurStatus, pNextStatus, pTask->info.fillHistory,
           (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);

    if (pChkInfo->checkpointVer > pChkInfo->nextProcessVer) {
      tqError("vgId:%d build stream task, s-task:%s, checkpointVer:%" PRId64 " > nextProcessVer:%" PRId64, vgId,
              pTask->id.idStr, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
  } else {
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
           " nextProcessVer:%" PRId64
           " child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x "
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
           pTask->info.selfChildId, pTask->info.taskLevel, pCurStatus, pNextStatus, pTask->info.fillHistory,
           (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
  }

  return 0;
}
804

805
/* Thin wrapper: forward a stream task check request to this vnode's stream meta handler. */
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessCheckReq(pMeta, pMsg);
}
806

807
/* Forward a stream task check response, along with whether this vnode is currently the leader. */
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
  SVnode* pVnode = pTq->pVnode;
  return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pVnode));
}
810

811
/*
 * Forward a stream task deploy request to the shared handler. The handler also
 * receives the vnode's message callbacks, its leader role and its restored flag.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVnode* pVnode = pTq->pVnode;
  return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pVnode->msgCb, sversion, msg, msgLen,
                                      vnodeIsRoleLeader(pVnode), pVnode->restored);
}
815

816
/*
 * Start step 2 of the fill-history procedure for the fill-history task pTask, whose related stream task is
 * pStreamTask.
 *
 * The step-2 version range is derived from pStreamTask->hTaskInfo.haltVer.
 *  - If streamHistoryTaskSetVerRangeStep2() reports it is done, a trans-state block is queued and the task is
 *    executed directly.
 *  - Otherwise the scanner is configured for the step-2 range and window, and the WAL reader is moved to the start of
 *    the range. The sched-status is then reset to inactive, TASK_EVENT_SCANHIST_DONE is raised and, if that
 *    succeeds, an async WAL scan is started.
 *
 * Failures of the helper calls are logged only; nothing is propagated to the caller.
 */
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
  const char*    id = pTask->id.idStr;
  int64_t        nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
  SVersionRange* pStep2Range = &pTask->step2Range;
  int32_t        vgId = pTask->pMeta->vgId;

  // if it's an source task, extract the last version in wal.
  bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
  pTask->execInfo.step2Start = taosGetTimestampMs();

  if (done) {
    qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id,
           pStep2Range->minVer, pStep2Range->maxVer, 0.0);
    int32_t code = streamTaskPutTranstateIntoInputQ(pTask);  // todo: msg lost.
    if (code) {
      qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
    }
    (void)streamExecTask(pTask);  // exec directly
  } else {
    STimeWindow* pWindow = &pTask->dataRange.window;
    tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
            ", do secondary scan-history from WAL after halt the related stream task:%s",
            id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
            pStreamTask->id.idStr);
    if (pTask->status.schedStatus != TASK_SCHED_STATUS__WAITING) {
      tqError("s-task:%s level:%d unexpected sched-status:%d", id, pTask->info.taskLevel, pTask->status.schedStatus);
    }

    int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
    if (code) {
      tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel);
    }

    int64_t dstVer = pStep2Range->minVer;
    pTask->chkInfo.nextProcessVer = dstVer;

    walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
    tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
            pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);

    // the previous sched-status returned here is not needed on this path
    (void)streamTaskSetSchedStatusInactive(pTask);

    // now the fill-history task starts to scan data from wal files.
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
    if (code == TSDB_CODE_SUCCESS) {
      code = tqScanWalAsync(pTq, false);
      if (code) {
        tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
      }
    }
  }
}
868

869
/*
 * Callback handed to streamTaskHandleEventAsync() together with TASK_EVENT_HALT for the related stream task.
 *
 * pStreamTask is the stream task and param carries the STQ*. The fill-history task referenced by
 * pStreamTask->hTaskInfo.id is acquired, step 2 is started on it, and the reference is released.
 * If the fill-history task cannot be acquired, a warning is logged.
 * The result is always TSDB_CODE_SUCCESS.
 */
int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
  STQ*         pTq = param;
  SStreamMeta* pMeta = pStreamTask->pMeta;
  STaskId      hId = pStreamTask->hTaskInfo.id;
  SStreamTask* pHistoryTask = NULL;

  // only the acquired pointer is inspected; a NULL task means it can't be used any more
  (void)streamMetaAcquireTask(pMeta, hId.streamId, hId.taskId, &pHistoryTask);
  if (pHistoryTask != NULL) {
    doStartFillhistoryStep2(pHistoryTask, pStreamTask, pTq);
    streamMetaReleaseTask(pMeta, pHistoryTask);
  } else {
    tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)hId.taskId);
  }

  return TSDB_CODE_SUCCESS;
}
886

887
// this function should be executed by only one thread, so we set an sentinel to protect this function
888
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
2,486✔
889
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
2,486✔
890
  SStreamMeta*           pMeta = pTq->pStreamMeta;
2,486✔
891
  int32_t                code = TSDB_CODE_SUCCESS;
2,486✔
892
  SStreamTask*           pTask = NULL;
2,486✔
893
  SStreamTask*           pStreamTask = NULL;
2,486✔
894

895
  code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
2,486✔
896
  if (pTask == NULL) {
2,487!
897
    tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
×
898
            pMeta->vgId, pReq->taskId);
899
    return code;
×
900
  }
901

902
  // do recovery step1
903
  const char* id = pTask->id.idStr;
2,487✔
904
  char*       pStatus = streamTaskGetStatus(pTask).name;
2,487✔
905

906
  // avoid multi-thread exec
907
  while (1) {
×
908
    int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
2,486✔
909
    if (sentinel != 0) {
2,486!
910
      tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
×
911
      taosMsleep(100);
×
912
    } else {
913
      break;
2,486✔
914
    }
915
  }
916

917
  // let's decide which step should be executed now
918
  if (pTask->execInfo.step1Start == 0) {
2,486✔
919
    int64_t ts = taosGetTimestampMs();
2,289✔
920
    pTask->execInfo.step1Start = ts;
2,289✔
921
    tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
2,289✔
922
  } else {
923
    if (pTask->execInfo.step2Start == 0) {
197✔
924
      tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
162!
925
              id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
926
    } else {
927
      tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id,
35!
928
              pTask->execInfo.step2Start);
929

930
      atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
35✔
931
      streamMetaReleaseTask(pMeta, pTask);
36✔
932
      return 0;
36✔
933
    }
934
  }
935

936
  // we have to continue retrying to successfully execute the scan history task.
937
  if (!streamTaskSetSchedStatusWait(pTask)) {
2,451!
938
    tqError(
×
939
        "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
940
        "sched-status:%d",
941
        id, pTask->status.schedStatus);
942
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
×
943
    streamMetaReleaseTask(pMeta, pTask);
×
944
    return 0;
×
945
  }
946

947
  int64_t              st = taosGetTimestampMs();
2,450✔
948
  SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
2,450✔
949

950
  double el = (taosGetTimestampMs() - st) / 1000.0;
2,451✔
951
  pTask->execInfo.step1El += el;
2,451✔
952

953
  if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
2,451✔
954
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
169✔
955
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
169✔
956

957
    if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
169✔
958
      streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
162✔
959
    } else {
960
      SStreamTaskState p = streamTaskGetStatus(pTask);
7✔
961
      ETaskStatus      s = p.state;
7✔
962

963
      if (s == TASK_STATUS__PAUSE) {
7!
964
        tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", id, el,
×
965
                pTask->execInfo.step1El, status);
966
      } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
7!
967
        tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", id, p.name,
7!
968
                pTask->execInfo.step1El);
969
      }
970
    }
971

972
    streamMetaReleaseTask(pMeta, pTask);
169✔
973
    return 0;
169✔
974
  }
975

976
  // the following procedure should be executed, no matter status is stop/pause or not
977
  tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
2,282✔
978

979
  if (pTask->info.fillHistory != 1) {
2,282!
980
    tqError("s-task:%s fill-history is disabled, unexpected", id);
×
981
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
982
  }
983

984
  // 1. get the related stream task
985
  code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
2,282✔
986
  if (pStreamTask == NULL) {
2,282✔
987
    tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
4!
988
            pTask->streamTaskId.taskId, pTask->id.idStr);
989

990
    tqDebug("s-task:%s fill-history task set status to be dropping", id);
4!
991
    code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
4✔
992

993
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
4✔
994
    streamMetaReleaseTask(pMeta, pTask);
4✔
995
    return code;
4✔
996
  }
997

998
  if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
2,278!
999
    tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel);
×
1000
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1001
  }
1002

1003
  code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
2,278✔
1004
  streamMetaReleaseTask(pMeta, pStreamTask);
2,278✔
1005

1006
  atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
2,278✔
1007
  streamMetaReleaseTask(pMeta, pTask);
2,278✔
1008
  return code;
2,278✔
1009
}
1010

1011
/*
 * Decode a stream task run request (following the SMsgHead) and dispatch it.
 *
 * Dispatch rules:
 *  - A body that cannot be decoded is logged and reported as TSDB_CODE_SUCCESS.
 *  - STREAM_EXEC_T_EXTRACT_WAL_DATA requests go straight to tqScanWal().
 *  - Every other request goes to tqStreamTaskProcessRunReq(); its failure code is returned.
 *  - Afterwards, for non-negative request types and STREAM_EXEC_T_RESUME_TASK, an async WAL scan is started. Its
 *    result is returned, and a failure is also logged.
 */
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq req = {0};
  SDecoder          decoder;
  char*             pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t           bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskRunReq(&decoder, &req);
  tDecoderClear(&decoder);

  if (code < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
    return TSDB_CODE_SUCCESS;
  }

  // extracted submit data from wal files for all tasks
  if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
    return tqScanWal(pTq);
  }

  bool isLeader = vnodeIsRoleLeader(pTq->pVnode);
  code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, isLeader);
  if (code != 0) {
    tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
    return code;
  }

  // let's continue scan data in the wal files
  bool scanWal = (req.reqType >= 0) || (req.reqType == STREAM_EXEC_T_RESUME_TASK);
  if (!scanWal) {
    return code;
  }

  code = tqScanWalAsync(pTq, false);  // it's ok to failed
  if (code != 0) {
    tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
  }
  return code;
}
1048

1049
// Forward a data-dispatch request to the stream meta of this tq instance.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg);
  return code;
}
1052

1053
// Forward a data-dispatch response to the stream meta of this tq instance.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamTaskProcessDispatchRsp(pTq->pStreamMeta, pMsg);
  return code;
}
1056

1057
// Forward a task-drop request (raw msg and its length) to the stream meta.
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
  int32_t code = tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen);
  return code;
}
1060

1061
// Forward an update-checkpoint request to the stream meta together with the vnode's `restored` flag.
// msgLen is not used here.
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
  int32_t code = tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg);
  return code;
}
1064

1065
// Forward a consensus-checkpoint-id request to the stream meta of this tq instance.
int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamTaskProcessConsenChkptIdReq(pTq->pStreamMeta, pMsg);
  return code;
}
1068

1069
// Forward a task-pause request to the stream meta; sversion and msgLen are not used here.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code = tqStreamTaskProcessTaskPauseReq(pTq->pStreamMeta, msg);
  return code;
}
1072

1073
// Forward a task-resume request. Unlike the other wrappers, the whole STQ is passed on.
// The last argument is always true; its meaning is defined by tqStreamTaskProcessTaskResumeReq (not visible here).
// msgLen is not used.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  int32_t code = tqStreamTaskProcessTaskResumeReq(pTq, sversion, msg, true);
  return code;
}
1076

1077
// Forward a data-retrieve request to the stream meta of this tq instance.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg);
  return code;
}
1080

1081
// Retrieve responses are accepted without any processing on this side.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
1082

1083
/*
 * Answer a stream progress query (SStreamProgressReq, following the SMsgHead in pMsg) with an SStreamProgressRsp.
 *
 * The progress delay and fill-history state come from tqGetStreamExecInfo(). The response is encoded right after a
 * zeroed SMsgHead in a freshly allocated buffer. Once that buffer has been handed to tmsgSendRsp() it is no longer
 * freed here.
 *
 * Returns 0 after the response has been sent. Otherwise it returns the first error encountered, and no response is
 * sent from this function.
 */
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgStr = pMsg->pCont;
  char*              msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t            code = 0;
  SStreamProgressReq req = {0};
  SStreamProgressRsp rspInfo = {0};

  // Encoder capacity. This keeps the limit that was passed to the encoder before (payload + one SMsgHead), but the
  // buffer is now allocated large enough for it, so the encoder cannot write past the allocation.
  int32_t payloadCap = (int32_t)(sizeof(SStreamProgressRsp) + sizeof(SMsgHead));
  char*   pRspBuf = taosMemoryCalloc(1, sizeof(SMsgHead) + payloadCap);

  // check before deriving any pointer from the buffer
  if (pRspBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    code = terrno;
    goto _OVER;
  }

  code = tDeserializeStreamProgressReq(msgBody, msgLen, &req);
  if (code == TSDB_CODE_SUCCESS) {
    code = tqGetStreamExecInfo(pTq->pVnode, req.streamId, &rspInfo.progressDelay, &rspInfo.fillHisFinished);
  }
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  rspInfo.fetchIdx = req.fetchIdx;
  rspInfo.subFetchIdx = req.subFetchIdx;
  rspInfo.vgId = req.vgId;
  rspInfo.streamId = req.streamId;

  // Encode from a separate struct into the payload area. Encoding a struct into its own storage would overwrite
  // fields before the encoder reads them.
  code = tSerializeStreamProgressRsp(POINTER_SHIFT(pRspBuf, sizeof(SMsgHead)), payloadCap, &rspInfo);
  if (code < 0) {
    // Error codes are negative (cf. the `< 0` decode checks elsewhere in this file).
    // Any non-negative result is taken as success.
    goto _OVER;
  }
  code = TSDB_CODE_SUCCESS;

  {
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
    rsp.pCont = pRspBuf;
    // NOTE(review): contLen is unchanged from before; confirm the encoded rsp never exceeds sizeof(SStreamProgressRsp)
    rsp.contLen = sizeof(SMsgHead) + sizeof(SStreamProgressRsp);
    pRspBuf = NULL;  // handed over with rsp, must not be freed below
    tmsgSendRsp(&rsp);
  }

_OVER:
  if (pRspBuf) {
    taosMemoryFree(pRspBuf);
  }
  return code;
}
1124

1125
// always return success to mnode
1126
//todo: handle failure of build and send msg to mnode
1127
static void doSendChkptSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t code,
588✔
1128
                                 int32_t taskId) {
1129
  SRpcMsg rsp = {0};
588✔
1130
  int32_t ret = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &rsp, code);
588✔
1131
  if (ret) {  // suppress the error in build checkpoint source rsp
588!
1132
    tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", taskId, tstrerror(ret));
×
1133
  }
1134
  tmsgSendRsp(&rsp);  // error occurs
588✔
1135
}
588✔
1136

1137
// no matter what kinds of error happened, make sure the mnode will receive the success execution code.
1138
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
2,695✔
1139
  int32_t                    vgId = TD_VID(pTq->pVnode);
2,695✔
1140
  SStreamMeta*               pMeta = pTq->pStreamMeta;
2,695✔
1141
  char*                      msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
2,695✔
1142
  int32_t                    len = pMsg->contLen - sizeof(SMsgHead);
2,695✔
1143
  int32_t                    code = 0;
2,695✔
1144
  SStreamCheckpointSourceReq req = {0};
2,695✔
1145
  SDecoder                   decoder = {0};
2,695✔
1146
  SStreamTask*               pTask = NULL;
2,695✔
1147
  int64_t                    checkpointId = 0;
2,695✔
1148

1149
  // disable auto rsp to mnode
1150
  pRsp->info.handle = NULL;
2,695✔
1151

1152
  tDecoderInit(&decoder, (uint8_t*)msg, len);
2,695✔
1153
  if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
2,696!
1154
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
1155
    tDecoderClear(&decoder);
×
1156
    tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
×
1157
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1158
    return TSDB_CODE_SUCCESS;  // always return success to mnode,
×
1159
  }
1160

1161
  tDecoderClear(&decoder);
2,694✔
1162

1163
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
2,694✔
1164
    tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
9!
1165
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
9✔
1166
    return TSDB_CODE_SUCCESS;  // always return success to mnode
9✔
1167
  }
1168

1169
  if (!pTq->pVnode->restored) {
2,683✔
1170
    tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
579✔
1171
            ", transId:%d s-task:0x%x ignore it",
1172
            vgId, req.checkpointId, req.transId, req.taskId);
1173
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
579✔
1174
    return TSDB_CODE_SUCCESS;  // always return success to mnode
579✔
1175
  }
1176

1177
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
2,104✔
1178
  if (pTask == NULL || code != 0) {
2,116!
1179
    tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
8!
1180
            " transId:%d it may have been destroyed",
1181
            vgId, req.taskId, req.checkpointId, req.transId);
1182
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
8✔
1183
    return TSDB_CODE_SUCCESS;
×
1184
  }
1185

1186
  if (pTask->status.downstreamReady != 1) {
2,108!
1187
    // record the latest failed checkpoint id
1188
    streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId);
×
1189
    tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
×
1190
            ", transId:%d set it failed",
1191
            pTask->id.idStr, req.checkpointId, req.transId);
1192

1193
    streamMetaReleaseTask(pMeta, pTask);
×
1194
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1195
    return TSDB_CODE_SUCCESS;  // todo retry handle error
×
1196
  }
1197

1198
  // todo save the checkpoint failed info
1199
  streamMutexLock(&pTask->lock);
2,108✔
1200
  ETaskStatus status = streamTaskGetStatus(pTask).state;
2,114✔
1201

1202
  if (req.mndTrigger == 1) {
2,108✔
1203
    if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
28!
1204
      tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure",
×
1205
              pTask->id.idStr, req.checkpointId);
1206

1207
      streamMutexUnlock(&pTask->lock);
×
1208
      streamMetaReleaseTask(pMeta, pTask);
×
1209
      doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1210
      return TSDB_CODE_SUCCESS;
×
1211
    }
1212
  } else {
1213
    if (status != TASK_STATUS__HALT) {
2,080!
1214
      tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
×
1215
      //      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
1216
    }
1217
  }
1218

1219
  // check if the checkpoint msg already sent or not.
1220
  if (status == TASK_STATUS__CK) {
2,104!
1221
    streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId);
×
1222

1223
    tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
×
1224
           " transId:%d already handled, ignore msg and continue process checkpoint",
1225
           pTask->id.idStr, checkpointId, req.transId);
1226

1227
    streamMutexUnlock(&pTask->lock);
×
1228
    streamMetaReleaseTask(pMeta, pTask);
×
1229
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SYN_PROPOSE_NOT_READY, req.taskId);
×
1230
    return TSDB_CODE_SUCCESS;
×
1231
  } else {  // checkpoint already finished, and not in checkpoint status
1232
    if (req.checkpointId <= pTask->chkInfo.checkpointId) {
2,104!
1233
      tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
×
1234
             " transId:%d already handled, return success",
1235
             pTask->id.idStr, req.checkpointId, req.transId);
1236

1237
      streamMutexUnlock(&pTask->lock);
×
1238
      streamMetaReleaseTask(pMeta, pTask);
×
1239
      doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1240
      return TSDB_CODE_SUCCESS;
×
1241
    }
1242
  }
1243

1244
  code = streamProcessCheckpointSourceReq(pTask, &req);
2,104✔
1245
  streamMutexUnlock(&pTask->lock);
2,111✔
1246

1247
  if (code) {
2,109!
1248
    qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
×
1249
           tstrerror(code));
1250
    streamMetaReleaseTask(pMeta, pTask);
×
1251
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1252
    return TSDB_CODE_SUCCESS;
×
1253
  }
1254

1255
  if (req.mndTrigger) {
2,109✔
1256
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ",
28!
1257
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
1258
  } else {
1259
    const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask));
2,081✔
1260
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
2,082!
1261
           ", transId:%d after transfer-state, prev status:%s",
1262
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
1263
  }
1264

1265
  code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
2,115✔
1266
  if (code != TSDB_CODE_SUCCESS) {
2,116!
1267
    streamTaskSetCheckpointFailed(pTask);  // set the checkpoint failed
×
1268
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1269
  }
1270

1271
  streamMetaReleaseTask(pMeta, pTask);
2,116✔
1272
  return TSDB_CODE_SUCCESS;
2,116✔
1273
}
1274

1275
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
1276
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
6,253✔
1277
  int32_t vgId = TD_VID(pTq->pVnode);
6,253✔
1278

1279
  SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont;
6,253✔
1280
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
6,253!
1281
    tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
×
1282
            (int32_t)pReq->downstreamTaskId);
1283
    return TSDB_CODE_STREAM_NOT_LEADER;
×
1284
  }
1285

1286
  return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg);
6,252✔
1287
}
1288

1289
// Forward a task-update request to the stream meta with the vnode's msg callbacks and `restored` flag.
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored);
  return code;
}
1292

1293
// Forward a task-reset request (the raw message content) to the stream meta.
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg->pCont);
  return code;
}
1296

1297
/*
 * Handle a retrieve checkpoint-trigger request (SRetrieveChkptTriggerReq) from a downstream task.
 *
 * On the leader vnode the request is forwarded to tqStreamTaskProcessRetrieveTriggerReq().
 * On a follower the body is decoded only to log the sender, and then:
 *  - TSDB_CODE_INVALID_MSG is returned when decoding fails;
 *  - TSDB_CODE_STREAM_NOT_LEADER is returned otherwise.
 */
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  if (!vnodeIsRoleLeader(pTq->pVnode)) {
    SRetrieveChkptTriggerReq req = {0};

    char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
    int32_t  len = pMsg->contLen - sizeof(SMsgHead);
    SDecoder decoder = {0};

    tDecoderInit(&decoder, (uint8_t*)msg, len);
    if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
      tDecoderClear(&decoder);
      tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId);
      return TSDB_CODE_INVALID_MSG;
    }
    tDecoderClear(&decoder);

    // print the task id in hex, consistent with the other "s-task:0x%x" logs (was "0x%" PRId64 = decimal digits)
    tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%x", vgId,
            (int32_t)req.downstreamTaskId);
    return TSDB_CODE_STREAM_NOT_LEADER;
  }

  return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg);
}
1322

1323
// Forward a retrieve checkpoint-trigger response to the stream meta of this tq instance.
int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamTaskProcessRetrieveTriggerRsp(pTq->pStreamMeta, pMsg);
  return code;
}
1326

1327
// this function is needed, do not try to remove it.
1328
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); }
7,535✔
1329

1330
// Forward a request-checkpoint response to the stream meta of this tq instance.
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
  return code;
}
1333

1334
// Forward a checkpoint-ready response to the stream meta of this tq instance.
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg);
  return code;
}
1337

1338
// Forward a checkpoint-report response to the stream meta of this tq instance.
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
  int32_t code = tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
  return code;
}
1341

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc