• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3549

06 Dec 2024 09:44AM UTC coverage: 59.948% (+0.1%) from 59.846%
#3549

push

travis-ci

web-flow
Merge pull request #29057 from taosdata/docs/TD-33031-3.0

docs: description of user privileges

118833 of 254191 branches covered (46.75%)

Branch coverage included in aggregate %.

199893 of 277480 relevant lines covered (72.04%)

19006119.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.13
/source/dnode/vnode/src/tq/tq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "osDef.h"
18
#include "taoserror.h"
19
#include "tqCommon.h"
20
#include "tstream.h"
21
#include "vnd.h"
22

23
// 0: not init
24
// 1: already inited
25
// 2: wait to be inited or cleanup
26
static int32_t tqInitialize(STQ* pTq);
27

28
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
861✔
29
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
56,849✔
30
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
56,863✔
31

32
void tqDestroyTqHandle(void* data) {
1,677✔
33
  STqHandle* pData = (STqHandle*)data;
1,677✔
34
  qDestroyTask(pData->execHandle.task);
1,677✔
35

36
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1,676✔
37
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
1,289!
38
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
387✔
39
    tqReaderClose(pData->execHandle.pTqReader);
301✔
40
    walCloseReader(pData->pWalReader);
300✔
41
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
301✔
42
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
86!
43
    walCloseReader(pData->pWalReader);
86✔
44
    tqReaderClose(pData->execHandle.pTqReader);
86✔
45
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
86!
46
    nodesDestroyNode(pData->execHandle.execTb.node);
86✔
47
  }
48
  if (pData->msg != NULL) {
1,678!
49
    rpcFreeCont(pData->msg->pCont);
×
50
    taosMemoryFree(pData->msg);
×
51
    pData->msg = NULL;
×
52
  }
53
  if (pData->block != NULL) {
1,678!
54
    blockDataDestroy(pData->block);
×
55
  }
56
  if (pData->pRef) {
1,678✔
57
    walCloseRef(pData->pRef->pWal, pData->pRef->refId);
1,635✔
58
  }
59
}
1,678✔
60

61
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
3,962✔
62
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
7,605✔
63
         pLeft->val.version == pRight->val.version;
3,643✔
64
}
65

66
/*
 * Allocates the TQ of a vnode, publishes it as pVnode->pTq and builds its in-memory tables:
 *   pHandle    - subscription key -> STqHandle (freed with tqDestroyTqHandle)
 *   pPushMgr   - handles that have a parked poll request (values are STqHandle pointers)
 *   pCheckInfo - topic name -> STqCheckInfo (freed with tDeleteSTqCheckInfo)
 *   pOffset    - subscription key -> STqOffset (freed with tDeleteSTqOffset)
 * and finally opens stream meta and tq meta through tqInitialize().
 *
 * Returns 0 on success, terrno when an allocation fails, or tqInitialize()'s error.
 * NOTE(review): on failure the partially built TQ is not released here; it stays reachable
 * through pVnode->pTq.
 */
int32_t tqOpen(const char* path, SVnode* pVnode) {
  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) {
    return terrno;
  }
  pVnode->pTq = pTq;

  if ((pTq->path = taosStrdup(path)) == NULL) {
    return terrno;
  }
  pTq->pVnode = pVnode;

  if ((pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK)) == NULL) {
    return terrno;
  }
  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);

  taosInitRWLatch(&pTq->lock);

  if ((pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK)) == NULL) {
    return terrno;
  }

  if ((pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                                      HASH_ENTRY_LOCK)) == NULL) {
    return terrno;
  }
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  if ((pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true,
                                   HASH_ENTRY_LOCK)) == NULL) {
    return terrno;
  }
  taosHashSetFreeFp(pTq->pOffset, (FDelete)tDeleteSTqOffset);

  return tqInitialize(pTq);
}
105

106
/*
 * Second stage of tqOpen(): opens the stream meta of this vnode (registering the task
 * build/expand/start-complete callbacks defined in this file), loads its stream tasks with
 * streamMetaLoadAllTasks(), and then opens the tq meta store.
 *
 * Returns 0 on success, streamMetaOpen()'s error code, or tqMetaOpen()'s result.
 *
 * The definition is marked `static` to match the file-scope forward declaration; the
 * function already had internal linkage, so this only makes that visible.
 */
static int32_t tqInitialize(STQ* pTq) {
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1,
                                tqStartTaskCompleteCallback, &pTq->pStreamMeta);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  streamMetaLoadAllTasks(pTq->pStreamMeta);
  return tqMetaOpen(pTq);
}
117

118
/*
 * Tears down a TQ created by tqOpen(); a NULL pTq is ignored.
 *
 * Every handle still parked in the push manager first receives an empty poll response
 * (presumably so its waiting consumer is not left hanging) and its parked message is freed.
 * Then the hash tables, the path, the tq meta store and the stream meta are released, and
 * finally the TQ itself.
 */
void tqClose(STQ* pTq) {
  qDebug("start to close tq");
  if (pTq == NULL) {
    return;
  }

  for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
    STqHandle* pParked = *(STqHandle**)pIter;
    if (pParked->msg == NULL) {
      continue;
    }

    tqPushEmptyDataRsp(pParked, TD_VID(pTq->pVnode));
    rpcFreeCont(pParked->msg->pCont);
    taosMemoryFree(pParked->msg);
    pParked->msg = NULL;
  }

  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosHashCleanup(pTq->pOffset);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);

  // read the vgId before the stream meta that holds it is closed
  int32_t closingVgId = pTq->pStreamMeta->vgId;
  streamMetaClose(pTq->pStreamMeta);

  qDebug("vgId:%d end to close tq", closingVgId);
  taosMemoryFree(pTq);
}
151

152
/* Forwards the close notification to this TQ's stream meta; a NULL pTq is ignored. */
void tqNotifyClose(STQ* pTq) {
  if (pTq != NULL) {
    streamMetaNotifyClose(pTq->pStreamMeta);
  }
}
158

159
/*
 * Answers the poll request parked in pHandle->msg with an empty data response
 * (blockNum = 0) that echoes the request offset.
 *
 * pHandle->msg must be non-NULL. The parked message itself is NOT freed here; callers
 * (e.g. tqClose) release it afterwards. Errors are only logged.
 *
 * The decoded request is released with tDestroySMqPollReq on every path, as
 * tqProcessPollReq does; previously its decoded offset payload was leaked.
 */
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
  int32_t    code = 0;
  SMqPollReq req = {0};
  code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req);
  if (code < 0) {
    tqError("tDeserializeSMqPollReq %d failed, code:%d", pHandle->msg->contLen, code);
    tDestroySMqPollReq(&req);  // release anything decoded before the failure
    return;
  }

  SMqDataRsp dataRsp = {0};
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
  if (code != 0) {
    tqError("tqInitDataRsp failed, code:%d", code);
    tDestroySMqPollReq(&req);
    return;
  }
  dataRsp.blockNum = 0;

  char buf[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
  tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s,QID:0x%" PRIx64, req.consumerId, vgId, buf,
         req.reqId);

  code = tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
  if (code != 0) {
    tqError("tqSendDataRsp failed, code:%d", code);
  }

  tDeleteMqDataRsp(&dataRsp);
  tDestroySMqPollReq(&req);
}
186

187
/*
 * Sends pRsp as the reply to the poll request that arrived in pMsg.
 *
 * The reply carries the version range reported by walReaderValidVersionRange() for the
 * handle's WAL reader. type selects the response message kind. Returns tqDoSendDataRsp()'s
 * result.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type,
                      int32_t vgId) {
  int64_t firstVer = 0;
  int64_t lastVer = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &firstVer, &lastVer);

  char reqOffsetStr[TSDB_OFFSET_LEN] = {0};
  char rspOffsetStr[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(reqOffsetStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  (void)tFormatOffset(rspOffsetStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);

  return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, firstVer, lastVer);
}
202

203
/*
 * Applies a committed consumer offset (an encoded SMqVgOffset in msg/msgLen) to this vnode.
 *
 * - Snapshot (data/meta) and log offsets are accepted; any other type is rejected with
 *   TSDB_CODE_INVALID_MSG.
 * - A log offset equal to the one already stored is ignored (returns 0).
 * - Otherwise the offset is cached in pTq->pOffset and persisted with tqMetaSaveInfo.
 *
 * Ownership: once taosHashPut succeeds, the hash holds a shallow copy of the decoded offset
 * and owns its payload, so the payload is destroyed here only while it is still ours.
 *
 * Returns 0 on success, otherwise a TSDB error code (terrno is also set when caching or
 * persisting fails).
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  int32_t  code = 0;
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tDecoderClear(&decoder);  // the decoder must be released on the error path too
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  STqOffset* pSavedOffset = NULL;
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    goto end;  // no need to update the offset value (code is 0 here)
  }

  // save the new offset value; on success the hash takes over the decoded payload
  code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
  if (code != 0) {
    terrno = code;
    goto end;  // not stored, so the decoded payload is still ours to release
  }

  code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg,
                        msgLen - sizeof(vgOffset.consumerId));
  if (code < 0) {
    terrno = code;
    return code;  // the payload now belongs to pTq->pOffset; do not destroy it here
  }

  return 0;

end:
  tOffsetDestroy(&vgOffset.offset.val);
  return code;
}
256

257
/*
 * Handles a consumer seek request on this vnode.
 *
 * If the subscription exists and is still assigned to req.consumerId, the handle is
 * unregistered from the push manager. Per the original comment, this pushes an empty
 * response to a parked poll so the consumer's vg moves from TMQ_VG_STATUS__WAIT to
 * TMQ_VG_STATUS__IDLE; otherwise polling fails after the seek.
 *
 * The outcome is always reported through a response message:
 *   - malformed request          -> TSDB_CODE_INVALID_MSG (was wrongly reported as OOM)
 *   - unknown subscription key   -> 0
 *   - consumer mismatch          -> TSDB_CODE_TMQ_CONSUMER_MISMATCH
 * The function itself always returns 0.
 */
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqSeekReq req = {0};
  int32_t    vgId = TD_VID(pTq->pVnode);
  SRpcMsg    rsp = {.info = pMsg->info};
  int        code = 0;

  if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
  taosWLockLatch(&pTq->lock);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
    code = 0;
  } else if (pHandle->consumerId != req.consumerId) {
    // check consumer-vg assignment status
    tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            req.consumerId, vgId, req.subKey, pHandle->consumerId);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
  } else {
    tqUnregisterPushHandle(pTq, pHandle);
  }

  taosWUnLockLatch(&pTq->lock);

end:
  rsp.code = code;
  tmsgSendRsp(&rsp);
  return 0;
}
298

299
/*
 * Reports whether column colId of table tbUid may be modified.
 *
 * Returns -1 when any check info registered in pTq->pCheckInfo for tbUid lists colId,
 * and 0 otherwise.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numOfCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numOfCols; ++idx) {
      int16_t* pColId = taosArrayGet(pCheck->colIdList, idx);
      if (pColId != NULL && *pColId == colId) {
        // leaving the iteration early, so it must be cancelled explicitly
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}
328

329
/*
 * Re-submits every parked poll in pTq->pPushMgr to the query queue as a
 * TDMT_VND_TMQ_CONSUME message, frees each SRpcMsg wrapper (the body pCont is handed to the
 * queue, presumably transferring ownership), then empties the push manager.
 * Stops early if a parked handle unexpectedly has no message.
 * Caller must hold pTq->lock in write mode.
 */
static void tqResubmitParkedPolls(STQ* pTq, int32_t vgId) {
  for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
    STqHandle* pParked = *(STqHandle**)pIter;
    tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pParked, pParked->consumerId);

    if (pParked->msg == NULL) {
      tqError("pHandle->msg should not be null");
      taosHashCancelIterate(pTq->pPushMgr, pIter);
      break;
    }

    SRpcMsg consumeMsg = {.msgType = TDMT_VND_TMQ_CONSUME,
                          .pCont = pParked->msg->pCont,
                          .contLen = pParked->msg->contLen,
                          .info = pParked->msg->info};
    if (tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &consumeMsg) != 0) {
      tqError("vgId:%d tmsgPutToQueue failed, consumer:0x%" PRIx64, vgId, pParked->consumerId);
    }
    taosMemoryFree(pParked->msg);
    pParked->msg = NULL;
  }

  taosHashClear(pTq->pPushMgr);
}

/*
 * Handles a push trigger: under the TQ write lock, re-dispatches every poll request
 * parked in the push manager so it is executed again. pMsg is not used. Always returns 0.
 */
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);
  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    tqResubmitParkedPolls(pTq, vgId);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}
363

364
/*
 * Handles a consumer poll request.
 *
 * Steps:
 *   1. find the handle for req.subKey; fail with terrno = TSDB_CODE_INVALID_MSG if missing;
 *   2. check that req.consumerId still owns it (TSDB_CODE_TMQ_CONSUMER_MISMATCH otherwise);
 *   3. mark the handle as executing; if another poll is executing on it, release the lock,
 *      sleep 10ms and retry;
 *   4. raise the handle epoch to the request epoch when it is lower;
 *   5. extract data with tqExtractDataForMq and mark the handle idle again.
 *
 * Returns tqExtractDataForMq()'s result, the decode error, or -1 (with terrno set) when the
 * handle is missing or owned by another consumer. The decoded request is released with
 * tDestroySMqPollReq on every path; the "not found" path used to return directly and leak it.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  SMqPollReq req = {0};
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
  if (code < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    terrno = TSDB_CODE_INVALID_MSG;
    goto END;
  }

  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
    if (code != TDB_CODE_SUCCESS) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
      terrno = TSDB_CODE_INVALID_MSG;
      taosWUnLockLatch(&pTq->lock);
      code = -1;
      goto END;  // also release the decoded request
    }

    // 2. check rebalance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      code = -1;
      goto END;
    }

    // 3. claim the handle unless another poll is already executing on it
    bool exec = tqIsHandleExec(pHandle);
    if (!exec) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 4. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s,QID:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  // 5. extract data, then release the handle
  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);

END:
  tDestroySMqPollReq(&req);
  return code;
}
440

441
/*
 * Replies with the committed offset stored for the subscription named in the request.
 *
 * The request body (after the SMsgHead) is an encoded SMqVgOffset; only its subscription
 * key is used. The stored offset is encoded back as an SMqVgOffset in the response.
 *
 * Returns:
 *   - 0 after sending the response;
 *   - TSDB_CODE_INVALID_MSG when the message is too short or cannot be decoded
 *     (decoding failures used to be reported as OOM);
 *   - TSDB_CODE_TMQ_NO_COMMITTED when no offset is stored;
 *   - terrno or TSDB_CODE_INVALID_PARA when building the response fails.
 */
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  // guard the length subtraction below against a truncated message
  if (pMsg->contLen < (int32_t)sizeof(SMsgHead)) {
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - sizeof(SMsgHead);

  SMqVgOffset vgOffset = {0};

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)data, len);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    tDecoderClear(&decoder);
    tOffsetDestroy(&vgOffset.offset.val);  // release anything decoded before the failure
    terrno = TSDB_CODE_INVALID_MSG;
    return terrno;
  }

  tDecoderClear(&decoder);

  STqOffset* pSavedOffset = NULL;
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);

  // Only the request's subscription key was needed; release the decoded offset payload
  // before vgOffset.offset is discarded or overwritten (it used to leak here).
  tOffsetDestroy(&vgOffset.offset.val);
  if (code != 0) {
    return TSDB_CODE_TMQ_NO_COMMITTED;
  }

  // Shallow copy: the payload remains owned by the saved offset, so vgOffset must not be
  // destroyed afterwards.
  vgOffset.offset = *pSavedOffset;

  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
  }

  void* buf = rpcMallocCont(len);
  if (buf == NULL) {
    return terrno;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(buf);
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
  }

  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};

  tmsgSendRsp(&rsp);
  return 0;
}
486

487
/*
 * Replies with WAL position information for a consumer's subscription on this vnode
 * (a TMQ_MSG_TYPE__WALINFO_RSP).
 *
 * The response offset is always a log offset. Its version is chosen as follows:
 *   - request offset of log type      -> the requested version;
 *   - negative (reset) request offset -> the committed version when a log offset is stored;
 *     otherwise the first valid WAL version for RESET_EARLIEST or the last one for
 *     RESET_LATEST.
 * Snapshot requests and any other offset type are rejected with TSDB_CODE_INVALID_PARA.
 *
 * Returns the send result or an error code. The decoded request is released with
 * tDestroySMqPollReq on every path (it was previously never released).
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t    code = 0;
  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    code = TSDB_CODE_INVALID_MSG;
    goto FREE_REQ;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);

  // 1. find handle
  taosRLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    taosRUnLockLatch(&pTq->lock);
    code = TSDB_CODE_INVALID_MSG;
    goto FREE_REQ;
  }

  // 2. check rebalance status
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    taosRUnLockLatch(&pTq->lock);
    code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    goto FREE_REQ;
  }

  int64_t sver = 0, ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  taosRUnLockLatch(&pTq->lock);

  SMqDataRsp dataRsp = {0};
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
  if (code != 0) {
    goto FREE_REQ;
  }

  if (req.useSnapshot == true) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

  if (reqOffset.type == TMQ_OFFSET__LOG) {
    dataRsp.rspOffset.version = reqOffset.version;
  } else if (reqOffset.type < 0) {
    STqOffset* pOffset = NULL;
    code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
    if (code == 0) {
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
        code = TSDB_CODE_INVALID_PARA;
        goto END;
      }

      dataRsp.rspOffset.version = pOffset->val.version;
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
             req.subKey, dataRsp.rspOffset.version);
    } else {
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
        dataRsp.rspOffset.version = ever;
      }
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
             dataRsp.rspOffset.version);
    }
  } else {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
            reqOffset.type);
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);

END:
  tDeleteMqDataRsp(&dataRsp);
FREE_REQ:
  tDestroySMqPollReq(&req);
  return code;
}
571

572
/*
 * Removes a subscription (SMqVDeleteReq in msg) from this vnode.
 *
 * If a handle exists, waits (retrying every 10ms) until no poll is executing on it,
 * unregisters it from the push manager and removes it from pTq->pHandle. Then it deletes
 * the cached offset, the persisted offset and the persisted exec entry for the key.
 * Failures are only logged; always returns 0.
 *
 * NOTE(review): the handle is looked up before pTq->lock is taken. This assumes delete and
 * subscribe requests for the vnode do not run concurrently — confirm.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);
  size_t         keyLen = strlen(pReq->subKey);

  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
  int32_t code = 0;

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, keyLen);
  if (pHandle) {
    while (1) {
      taosWLockLatch(&pTq->lock);
      bool exec = tqIsHandleExec(pHandle);

      if (exec) {
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
               pHandle->subKey, pHandle);
        taosWUnLockLatch(&pTq->lock);
        taosMsleep(10);
        continue;
      }
      tqUnregisterPushHandle(pTq, pHandle);
      code = taosHashRemove(pTq->pHandle, pReq->subKey, keyLen);
      if (code != 0) {
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
      }
      taosWUnLockLatch(&pTq->lock);
      break;
    }
  }

  taosWLockLatch(&pTq->lock);
  if (taosHashRemove(pTq->pOffset, pReq->subKey, keyLen) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
  }
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, keyLen) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  // this entry lives in the exec store, not the offset store
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, keyLen) < 0) {
    tqError("cannot process tq delete req %s, since no such exec info in tdb", pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}
617

618
/*
 * Registers a check info (decoded from msg) under its topic name in pTq->pCheckInfo and
 * persists the raw message in the check store.
 *
 * If the hash insert fails, the decoded info is released here; once inserted, the hash owns
 * it (its free callback is tDeleteSTqCheckInfo). Returns the first failing step's code, or
 * tqMetaSaveInfo()'s result.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  STqCheckInfo info = {0};

  int32_t code = tqMetaDecodeCheckInfo(&info, msg, msgLen);
  if (code == 0) {
    size_t topicLen = strlen(info.topic);
    code = taosHashPut(pTq->pCheckInfo, info.topic, topicLen, &info, sizeof(STqCheckInfo));
    if (code == 0) {
      return tqMetaSaveInfo(pTq, pTq->pCheckStore, info.topic, topicLen, msg, msgLen);
    }
    tDeleteSTqCheckInfo(&info);
  }

  return code;
}
633

634
/*
 * Removes the check info registered under the NUL-terminated topic name in msg, first from
 * the cache and then from the check store. msgLen is not used.
 * Returns TSDB_CODE_TSC_INTERNAL_ERROR if the cache removal fails, otherwise
 * tqMetaDeleteInfo()'s result.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  size_t topicLen = strlen(msg);

  if (taosHashRemove(pTq->pCheckInfo, msg, topicLen) < 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  return tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, topicLen);
}
640

641
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
3,107✔
642
  int         ret = 0;
3,107✔
643
  SMqRebVgReq req = {0};
3,107✔
644
  SDecoder    dc = {0};
3,107✔
645

646
  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
3,107✔
647
  ret = tDecodeSMqRebVgReq(&dc, &req);
3,107✔
648
  if (ret < 0) {
3,106!
649
    goto end;
×
650
  }
651

652
  tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
3,106!
653
         req.oldConsumerId, req.newConsumerId);
654

655
  taosRLockLatch(&pTq->lock);
3,112✔
656
  STqHandle* pHandle = NULL;
3,112✔
657
  int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
3,112✔
658
  if (code != 0){
3,111✔
659
    tqInfo("vgId:%d, tq process sub req:%s, no such handle, create new one", pTq->pVnode->config.vgId, req.subKey);
1,415!
660
  }
661
  taosRUnLockLatch(&pTq->lock);
3,112✔
662
  if (pHandle == NULL) {
3,112✔
663
    if (req.oldConsumerId != -1) {
1,416✔
664
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
2!
665
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
666
    }
667
    if (req.newConsumerId == -1) {
1,416✔
668
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
2!
669
      ret = TSDB_CODE_INVALID_PARA;
2✔
670
      goto end;
2✔
671
    }
672
    STqHandle handle = {0};
1,414✔
673
    ret = tqMetaCreateHandle(pTq, &req, &handle);
1,414✔
674
    if (ret < 0) {
1,414!
675
      tqDestroyTqHandle(&handle);
×
676
      goto end;
×
677
    }
678
    taosWLockLatch(&pTq->lock);
1,414✔
679
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
1,414✔
680
    taosWUnLockLatch(&pTq->lock);
1,414✔
681
  } else {
682
    while (1) {
×
683
      taosWLockLatch(&pTq->lock);
1,696✔
684
      bool exec = tqIsHandleExec(pHandle);
1,696✔
685
      if (exec) {
1,696!
686
        tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p",
×
687
               pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
688
        taosWUnLockLatch(&pTq->lock);
×
689
        taosMsleep(10);
×
690
        continue;
×
691
      }
692
      if (pHandle->consumerId == req.newConsumerId) {  // do nothing
1,696✔
693
        tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
99!
694
      } else {
695
        tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
1,597!
696
               req.newConsumerId);
697

698
        atomic_store_64(&pHandle->consumerId, req.newConsumerId);
1,597✔
699
        atomic_store_32(&pHandle->epoch, 0);
1,597✔
700
        tqUnregisterPushHandle(pTq, pHandle);
1,597✔
701
        ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
1,597✔
702
      }
703
      taosWUnLockLatch(&pTq->lock);
1,696✔
704
      break;
1,696✔
705
    }
706
  }
707

708
end:
3,112✔
709
  tDecoderClear(&dc);
3,112✔
710
  return ret;
3,111✔
711
}
712

713
/* Free callback for the table-sink info hash: each value slot stores a heap pointer; free its target. */
static void freePtr(void* ptr) {
  void** pSlot = (void**)ptr;
  taosMemoryFree(*pSlot);
}
714

715
/*
 * Build callback passed to streamMetaOpen(): finishes construction of a stream task that
 * belongs to this vnode.
 *
 * Steps:
 *   - initialise the task with streamTaskInit();
 *   - attach its sink (SMA or table) to this vnode; a table sink also gets a schema and a
 *     per-table info cache;
 *   - source-level tasks open a WAL reader (delete messages are extracted too);
 *   - reset upstream stage info, restore version info, and log the task state.
 *
 * Returns 0 on success, streamTaskInit()'s error, terrno when a sink/WAL allocation fails,
 * or TSDB_CODE_STREAM_INTERNAL_ERROR when a non-fill-history task has
 * checkpointVer > nextProcessVer.
 */
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
  STQ*    pTq = (STQ*)pTqObj;
  SVnode* pVnode = pTq->pVnode;
  int32_t vgId = TD_VID(pVnode);

  tqDebug("s-task:0x%x start to build task", pTask->id.taskId);

  int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pVnode->msgCb, nextProcessVer);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pTask->pBackend = NULL;

  // attach the output sink to this vnode
  STaskOutputInfo* pOut = &pTask->outputInfo;
  switch (pOut->type) {
    case TASK_OUTPUT__SMA:
      pOut->smaSink.vnode = pVnode;
      pOut->smaSink.smaSink = smaHandleRes;
      break;

    case TASK_OUTPUT__TABLE: {
      pOut->tbSink.vnode = pVnode;
      pOut->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;

      // use the super table's schema version when its meta can be read, otherwise 1
      int32_t   schemaVer = 1;
      SMetaInfo stbMeta = {0};
      if (metaGetInfo(pVnode->pMeta, pOut->tbSink.stbUid, &stbMeta, NULL) == TSDB_CODE_SUCCESS) {
        schemaVer = stbMeta.skmVer;
      }

      SSchemaWrapper* pSchemaWrap = pOut->tbSink.pSchemaWrapper;
      pOut->tbSink.pTSchema = tBuildTSchema(pSchemaWrap->pSchema, pSchemaWrap->nCols, schemaVer);
      if (pOut->tbSink.pTSchema == NULL) {
        return terrno;
      }

      pOut->tbSink.pTbInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
      if (pOut->tbSink.pTbInfo == NULL) {
        tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
        return terrno;
      }

      tSimpleHashSetFreeFp(pOut->tbSink.pTbInfo, freePtr);
      break;
    }

    default:
      break;
  }

  // source tasks read their input from the WAL; delete messages are extracted as well
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    SWalFilterCond cond = {.deleteMsg = 1, .scanDropCtb = pTask->subtableWithoutMd5 ? true : false};
    pTask->exec.pWalReader = walOpenReader(pVnode->pWal, &cond, pTask->id.taskId);
    if (pTask->exec.pWalReader == NULL) {
      tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno));
      return terrno;
    }
  }

  streamTaskResetUpstreamStageInfo(pTask);

  SCheckpointInfo* pChkInfo = &pTask->chkInfo;
  tqSetRestoreVersionInfo(pTask);

  char*       pCurStatus = streamTaskGetStatus(pTask).name;
  const char* pNextStatus = streamTaskGetStatusStr(pTask->status.taskStatus);

  if (!pTask->info.fillHistory) {
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
           " nextProcessVer:%" PRId64
           " child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x "
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
           pTask->info.selfChildId, pTask->info.taskLevel, pCurStatus, pNextStatus, pTask->info.fillHistory,
           (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);

    // a checkpoint must never be ahead of the next version to process
    if (pChkInfo->checkpointVer > pChkInfo->nextProcessVer) {
      tqError("vgId:%d build stream task, s-task:%s, checkpointVer:%" PRId64 " > nextProcessVer:%" PRId64, vgId,
              pTask->id.idStr, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
    return 0;
  }

  tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
         " nextProcessVer:%" PRId64
         " child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x "
         "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
         vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
         pTask->info.selfChildId, pTask->info.taskLevel, pCurStatus, pNextStatus, pTask->info.fillHistory,
         (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
  return 0;
}
804

805
/* Delegates a stream-task check request to the common handler, using this TQ's stream meta. */
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  return tqStreamTaskProcessCheckReq(pTq->pStreamMeta, pMsg);
}
806

807
/* Delegates a stream-task check response to the common handler, passing whether this vnode is leader. */
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
  bool isLeader = vnodeIsRoleLeader(pTq->pVnode);
  return tqStreamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, isLeader);
}
810

811
/*
 * Delegates a stream-task deploy request to the common handler. The handler also receives
 * this vnode's message callbacks, its leader role, and its restored flag.
 */
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SVnode* pVnode = pTq->pVnode;
  bool    isLeader = vnodeIsRoleLeader(pVnode);

  return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pVnode->msgCb, sversion, msg, msgLen, isLeader,
                                      pVnode->restored);
}
815

816
// Starts step 2 of a fill-history task `pTask`: scanning the WAL starting from the halt version recorded on the
// related stream task (pStreamTask->hTaskInfo.haltVer).
//
// When streamHistoryTaskSetVerRangeStep2() returns true (the step-2 WAL scan is reported as already ended), a
// trans-state message is put into the task's input queue and the task is executed directly.
//
// Otherwise the following happens in order:
//  - the step-2 version range and time window are installed on the stream scanner;
//  - the WAL reader is positioned at the start of the range;
//  - the task's sched-status is set to inactive;
//  - TASK_EVENT_SCANHIST_DONE is raised and, on success, an asynchronous WAL scan is launched via tqScanWalAsync().
//
// Failures along the way are only logged; the procedure is not aborted.
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
  const char*    id = pTask->id.idStr;
  int64_t        nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
  SVersionRange* pStep2Range = &pTask->step2Range;
  int32_t        vgId = pTask->pMeta->vgId;

  // if it's a source task, extract the last version in wal.
  bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
  pTask->execInfo.step2Start = taosGetTimestampMs();

  if (done) {
    qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id,
           pStep2Range->minVer, pStep2Range->maxVer, 0.0);
    int32_t code = streamTaskPutTranstateIntoInputQ(pTask);  // todo: msg lost.
    if (code) {
      qError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
    }
    (void)streamExecTask(pTask);  // exec directly
  } else {
    STimeWindow* pWindow = &pTask->dataRange.window;
    tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
            ", do secondary scan-history from WAL after halt the related stream task:%s",
            id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
            pStreamTask->id.idStr);
    if (pTask->status.schedStatus != TASK_SCHED_STATUS__WAITING) {
      tqError("s-task:%s level:%d unexpected sched-status:%d", id, pTask->info.taskLevel, pTask->status.schedStatus);
    }

    int32_t code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
    if (code) {
      tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel);
    }

    int64_t dstVer = pStep2Range->minVer;
    pTask->chkInfo.nextProcessVer = dstVer;

    walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
    tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
            pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);

    // the previous sched-status returned by the call is not needed here
    (void)streamTaskSetSchedStatusInactive(pTask);

    // now the fill-history task starts to scan data from wal files.
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
    if (code == TSDB_CODE_SUCCESS) {
      code = tqScanWalAsync(pTq, false);
      if (code) {
        tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
      }
    }
  }
}
868

869
// Callback passed to streamTaskHandleEventAsync() together with TASK_EVENT_HALT for the stream task (see
// tqProcessTaskScanHistory). `param` is the owning STQ.
//
// Acquires the fill-history task whose id is stored in pStreamTask->hTaskInfo.id and starts its step-2 WAL scan via
// doStartFillhistoryStep2(), then releases it. If the fill-history task cannot be acquired, only a warning is logged.
// Always returns TSDB_CODE_SUCCESS.
int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
  SStreamMeta* pMeta = pStreamTask->pMeta;
  STaskId      fillHistoryId = pStreamTask->hTaskInfo.id;
  SStreamTask* pFillTask = NULL;

  // the acquire result code is not needed: success is judged by whether a task was returned
  (void)streamMetaAcquireTask(pMeta, fillHistoryId.streamId, fillHistoryId.taskId, &pFillTask);

  if (pFillTask != NULL) {
    doStartFillhistoryStep2(pFillTask, pStreamTask, (STQ*)param);
    streamMetaReleaseTask(pMeta, pFillTask);
  } else {
    tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)fillHistoryId.taskId);
  }

  return TSDB_CODE_SUCCESS;
}
886

887
// this function should be executed by only one thread, so we set an sentinel to protect this function
888
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
2,568✔
889
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
2,568✔
890
  SStreamMeta*           pMeta = pTq->pStreamMeta;
2,568✔
891
  int32_t                code = TSDB_CODE_SUCCESS;
2,568✔
892
  SStreamTask*           pTask = NULL;
2,568✔
893
  SStreamTask*           pStreamTask = NULL;
2,568✔
894

895
  code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
2,568✔
896
  if (pTask == NULL) {
2,568!
897
    tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
×
898
            pMeta->vgId, pReq->taskId);
899
    return code;
×
900
  }
901

902
  // do recovery step1
903
  const char* id = pTask->id.idStr;
2,568✔
904
  char*       pStatus = streamTaskGetStatus(pTask).name;
2,568✔
905

906
  // avoid multi-thread exec
907
  while (1) {
×
908
    int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
2,568✔
909
    if (sentinel != 0) {
2,568!
910
      tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
×
911
      taosMsleep(100);
×
912
    } else {
913
      break;
2,568✔
914
    }
915
  }
916

917
  // let's decide which step should be executed now
918
  if (pTask->execInfo.step1Start == 0) {
2,568✔
919
    int64_t ts = taosGetTimestampMs();
2,385✔
920
    pTask->execInfo.step1Start = ts;
2,385✔
921
    tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
2,385✔
922
  } else {
923
    if (pTask->execInfo.step2Start == 0) {
183✔
924
      tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
180!
925
              id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
926
    } else {
927
      tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id,
3!
928
              pTask->execInfo.step2Start);
929

930
      atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
3✔
931
      streamMetaReleaseTask(pMeta, pTask);
3✔
932
      return 0;
3✔
933
    }
934
  }
935

936
  // we have to continue retrying to successfully execute the scan history task.
937
  if (!streamTaskSetSchedStatusWait(pTask)) {
2,565!
938
    tqError(
×
939
        "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
940
        "sched-status:%d",
941
        id, pTask->status.schedStatus);
942
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
×
943
    streamMetaReleaseTask(pMeta, pTask);
×
944
    return 0;
×
945
  }
946

947
  int64_t              st = taosGetTimestampMs();
2,565✔
948
  SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
2,565✔
949

950
  double el = (taosGetTimestampMs() - st) / 1000.0;
2,565✔
951
  pTask->execInfo.step1El += el;
2,565✔
952

953
  if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
2,565✔
954
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
190✔
955
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
190✔
956

957
    if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
190✔
958
      streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
181✔
959
    } else {
960
      SStreamTaskState p = streamTaskGetStatus(pTask);
9✔
961
      ETaskStatus      s = p.state;
9✔
962

963
      if (s == TASK_STATUS__PAUSE) {
9!
964
        tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", id, el,
×
965
                pTask->execInfo.step1El, status);
966
      } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) {
9!
967
        tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", id, p.name,
9!
968
                pTask->execInfo.step1El);
969
      }
970
    }
971

972
    streamMetaReleaseTask(pMeta, pTask);
190✔
973
    return 0;
190✔
974
  }
975

976
  // the following procedure should be executed, no matter status is stop/pause or not
977
  tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
2,375✔
978

979
  if (pTask->info.fillHistory != 1) {
2,375!
980
    tqError("s-task:%s fill-history is disabled, unexpected", id);
×
981
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
982
  }
983

984
  // 1. get the related stream task
985
  code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
2,375✔
986
  if (pStreamTask == NULL) {
2,375✔
987
    tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
4!
988
            pTask->streamTaskId.taskId, pTask->id.idStr);
989

990
    tqDebug("s-task:%s fill-history task set status to be dropping", id);
4!
991
    code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
4✔
992

993
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
4✔
994
    streamMetaReleaseTask(pMeta, pTask);
4✔
995
    return code;
4✔
996
  }
997

998
  if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
2,371!
999
    tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel);
×
1000
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1001
  }
1002

1003
  code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
2,371✔
1004
  streamMetaReleaseTask(pMeta, pStreamTask);
2,371✔
1005

1006
  atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
2,371✔
1007
  streamMetaReleaseTask(pMeta, pTask);
2,371✔
1008
  return code;
2,371✔
1009
}
1010

1011
// Handles a stream task run request.
//
// The body after the SMsgHead is decoded into an SStreamTaskRunReq. A decode failure is logged and reported as
// success.
//
// STREAM_EXEC_T_EXTRACT_WAL_DATA requests trigger a WAL scan for all tasks via tqScanWal(). Every other request is
// passed to tqStreamTaskProcessRunReq(). If that succeeds and the request type is non-negative or
// STREAM_EXEC_T_RESUME_TASK, an asynchronous WAL scan is additionally started; its failure is logged and returned.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq req = {0};
  SDecoder          decoder;
  char*             pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t           bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&decoder, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskRunReq(&decoder, &req);
  if (code < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
    tDecoderClear(&decoder);
    return TSDB_CODE_SUCCESS;
  }
  tDecoderClear(&decoder);

  // extracted submit data from wal files for all tasks
  if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
    return tqScanWal(pTq);
  }

  code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
    return code;
  }

  // only these request types continue scanning data in the wal files
  bool scanWalAfterRun = (req.reqType >= 0) || (req.reqType == STREAM_EXEC_T_RESUME_TASK);
  if (!scanWalAfterRun) {
    return code;
  }

  code = tqScanWalAsync(pTq, false);  // it's ok to failed
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
  }
  return code;
}
1048

1049
// Delegates a task dispatch request to tqStreamTaskProcessDispatchReq() using this TQ's stream meta.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessDispatchReq(pMeta, pMsg);
}
1052

1053
// Delegates a task dispatch response to tqStreamTaskProcessDispatchRsp() using this TQ's stream meta.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessDispatchRsp(pMeta, pMsg);
}
1056

1057
// Delegates a task drop request (raw message and its length) to tqStreamTaskProcessDropReq().
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessDropReq(pMeta, msg, msgLen);
}
1060

1061
// Delegates a checkpoint-update request to tqStreamTaskProcessUpdateCheckpointReq(), passing the vnode's `restored`
// flag. msgLen is not forwarded.
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
  (void)msgLen;
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessUpdateCheckpointReq(pMeta, pTq->pVnode->restored, msg);
}
1064

1065
// Delegates a consensus-checkpoint-id request to tqStreamTaskProcessConsenChkptIdReq() using this TQ's stream meta.
int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessConsenChkptIdReq(pMeta, pMsg);
}
1068

1069
// Delegates a task pause request to tqStreamTaskProcessTaskPauseReq(); sversion and msgLen are not forwarded.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  (void)sversion;
  (void)msgLen;
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessTaskPauseReq(pMeta, msg);
}
1072

1073
// Delegates a task resume request to tqStreamTaskProcessTaskResumeReq() with the last argument set to true;
// msgLen is not forwarded.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  (void)msgLen;
  return tqStreamTaskProcessTaskResumeReq(pTq, sversion, msg, true);
}
1076

1077
// Delegates a task retrieve request to tqStreamTaskProcessRetrieveReq() using this TQ's stream meta.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessRetrieveReq(pMeta, pMsg);
}
1080

1081
// Task retrieve responses need no handling on this side; always reports success.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return TSDB_CODE_SUCCESS;
}
1082

1083
// Answers a stream progress query.
//
// The request body (after the SMsgHead) is decoded with tDeserializeStreamProgressReq(). The progress delay and the
// fill-history completion flag of the requested stream come from tqGetStreamExecInfo(). The reply content is a
// zeroed SMsgHead followed by the serialized SStreamProgressRsp.
//
// Returns 0 after the reply has been handed to tmsgSendRsp(); otherwise returns the first error code encountered,
// and no reply is sent.
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamProgressReq req = {0};
  SStreamProgressRsp rsp = {0};
  SRpcMsg            rspMsg = {0};
  char*              pRspBuf = NULL;
  int32_t            bodyLen = 0;
  int32_t            tlen = 0;
  int32_t            code = tDeserializeStreamProgressReq(msgBody, msgLen, &req);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  code = tqGetStreamExecInfo(pTq->pVnode, req.streamId, &rsp.progressDelay, &rsp.fillHisFinished);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  rsp.fetchIdx = req.fetchIdx;
  rsp.subFetchIdx = req.subFetchIdx;
  rsp.vgId = req.vgId;
  rsp.streamId = req.streamId;

  // Encode from a separate struct into a buffer sized by the encoder itself. Encoding a struct into its own storage
  // overwrites fields before they are read, and the encoded form is not guaranteed to fit in
  // sizeof(SStreamProgressRsp) bytes. A NULL buffer makes the serializer only compute the required length.
  bodyLen = tSerializeStreamProgressRsp(NULL, 0, &rsp);
  if (bodyLen < 0) {
    code = bodyLen;
    goto _OVER;
  }

  // tmsgSendRsp() takes ownership of pCont, so the content is allocated with rpcMallocCont() rather than
  // taosMemoryCalloc(). NOTE(review): rpcMallocCont() is assumed to zero the buffer, keeping the SMsgHead zeroed
  // as before — confirm.
  pRspBuf = rpcMallocCont(sizeof(SMsgHead) + bodyLen);
  if (pRspBuf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  // the serializer may return the encoded length on success, so only a negative value is an error
  tlen = tSerializeStreamProgressRsp(POINTER_SHIFT(pRspBuf, sizeof(SMsgHead)), bodyLen, &rsp);
  if (tlen < 0) {
    code = tlen;
    goto _OVER;
  }

  code = TSDB_CODE_SUCCESS;
  rspMsg.info = pMsg->info;
  rspMsg.code = 0;
  rspMsg.pCont = pRspBuf;
  rspMsg.contLen = sizeof(SMsgHead) + bodyLen;
  pRspBuf = NULL;  // ownership moves to tmsgSendRsp()
  tmsgSendRsp(&rspMsg);

_OVER:
  if (pRspBuf != NULL) {
    rpcFreeCont(pRspBuf);
  }
  return code;
}
1124

1125
// no matter what kinds of error happened, make sure the mnode will receive the success execution code.
1126
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
3,630✔
1127
  int32_t      vgId = TD_VID(pTq->pVnode);
3,630✔
1128
  SStreamMeta* pMeta = pTq->pStreamMeta;
3,630✔
1129
  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
3,630✔
1130
  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
3,630✔
1131
  int32_t      code = 0;
3,630✔
1132

1133
  // disable auto rsp to mnode
1134
  pRsp->info.handle = NULL;
3,630✔
1135

1136
  SStreamCheckpointSourceReq req = {0};
3,630✔
1137
  SDecoder                   decoder;
1138
  tDecoderInit(&decoder, (uint8_t*)msg, len);
3,630✔
1139
  if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
3,624!
1140
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
1141
    tDecoderClear(&decoder);
×
1142
    tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
×
1143

1144
    SRpcMsg rsp = {0};
×
1145
    int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
×
1146
    if (ret) {  // suppress the error in build checkpointsource rsp
×
1147
      tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
×
1148
    }
1149

1150
    tmsgSendRsp(&rsp);         // error occurs
×
1151
    return TSDB_CODE_SUCCESS;  // always return success to mnode, todo: handle failure of build and send msg to mnode
×
1152
  }
1153
  tDecoderClear(&decoder);
3,611✔
1154

1155
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
3,631✔
1156
    tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId);
11!
1157
    SRpcMsg rsp = {0};
11✔
1158
    int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
11✔
1159
    if (ret) {  // suppress the error in build checkpointsource rsp
11!
1160
      tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
×
1161
    }
1162

1163
    tmsgSendRsp(&rsp);         // error occurs
11✔
1164
    return TSDB_CODE_SUCCESS;  // always return success to mnode, todo: handle failure of build and send msg to mnode
11✔
1165
  }
1166

1167
  if (!pTq->pVnode->restored) {
3,618✔
1168
    tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
394✔
1169
            ", transId:%d s-task:0x%x ignore it",
1170
            vgId, req.checkpointId, req.transId, req.taskId);
1171
    SRpcMsg rsp = {0};
394✔
1172
    int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
394✔
1173
    if (ret) {  // suppress the error in build checkpointsource rsp
389!
1174
      tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code));
×
1175
    }
1176

1177
    tmsgSendRsp(&rsp);         // error occurs
389✔
1178
    return TSDB_CODE_SUCCESS;  // always return success to mnode, , todo: handle failure of build and send msg to mnode
390✔
1179
  }
1180

1181
  SStreamTask* pTask = NULL;
3,224✔
1182
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
3,224✔
1183
  if (pTask == NULL || code != 0) {
3,225!
1184
    tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
2!
1185
            " transId:%d it may have been destroyed",
1186
            vgId, req.taskId, req.checkpointId, req.transId);
1187
    SRpcMsg rsp = {0};
2✔
1188
    int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
2✔
1189
    if (ret) {  // suppress the error in build checkpointsource rsp
×
1190
      tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
1191
    }
1192
    tmsgSendRsp(&rsp);  // error occurs
×
1193
    return TSDB_CODE_SUCCESS;
×
1194
  }
1195

1196
  if (pTask->status.downstreamReady != 1) {
3,223!
1197
    streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId);  // record the latest failed checkpoint id
×
1198
    tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
×
1199
            ", transId:%d set it failed",
1200
            pTask->id.idStr, req.checkpointId, req.transId);
1201
    streamMetaReleaseTask(pMeta, pTask);
×
1202

1203
    SRpcMsg rsp = {0};
×
1204
    int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
×
1205
    if (ret) {  // suppress the error in build checkpointsource rsp
×
1206
      tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
1207
    }
1208

1209
    tmsgSendRsp(&rsp);         // error occurs
×
1210
    return TSDB_CODE_SUCCESS;  // todo retry handle error
×
1211
  }
1212

1213
  // todo save the checkpoint failed info
1214
  streamMutexLock(&pTask->lock);
3,223✔
1215
  ETaskStatus status = streamTaskGetStatus(pTask).state;
3,230✔
1216

1217
  if (req.mndTrigger == 1) {
3,230✔
1218
    if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
910!
1219
      tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure",
×
1220
              pTask->id.idStr, req.checkpointId);
1221

1222
      streamMutexUnlock(&pTask->lock);
×
1223
      streamMetaReleaseTask(pMeta, pTask);
×
1224

1225
      SRpcMsg rsp = {0};
×
1226
      int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
×
1227
      if (ret) {  // suppress the error in build checkpointsource rsp
×
1228
        tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
1229
      }
1230

1231
      tmsgSendRsp(&rsp);  // error occurs
×
1232
      return TSDB_CODE_SUCCESS;
×
1233
    }
1234
  } else {
1235
    if (status != TASK_STATUS__HALT) {
2,320!
1236
      tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
×
1237
      //      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
1238
    }
1239
  }
1240

1241
  // check if the checkpoint msg already sent or not.
1242
  if (status == TASK_STATUS__CK) {
3,222!
1243
    int64_t checkpointId = 0;
×
1244
    streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId);
×
1245

1246
    tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
×
1247
           " transId:%d already handled, ignore msg and continue process checkpoint",
1248
           pTask->id.idStr, checkpointId, req.transId);
1249

1250
    streamMutexUnlock(&pTask->lock);
×
1251
    streamMetaReleaseTask(pMeta, pTask);
×
1252

1253
    return TSDB_CODE_SUCCESS;
1✔
1254
  } else {  // checkpoint already finished, and not in checkpoint status
1255
    if (req.checkpointId <= pTask->chkInfo.checkpointId) {
3,222!
1256
      tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
×
1257
             " transId:%d already handled, return success",
1258
             pTask->id.idStr, req.checkpointId, req.transId);
1259

1260
      streamMutexUnlock(&pTask->lock);
×
1261
      streamMetaReleaseTask(pMeta, pTask);
×
1262

1263
      SRpcMsg rsp = {0};
×
1264
      int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
×
1265
      if (ret) {  // suppress the error in build checkpointsource rsp
×
1266
        tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
1267
      }
1268

1269
      tmsgSendRsp(&rsp);  // error occurs
×
1270

1271
      return TSDB_CODE_SUCCESS;
×
1272
    }
1273
  }
1274

1275
  code = streamProcessCheckpointSourceReq(pTask, &req);
3,222✔
1276
  streamMutexUnlock(&pTask->lock);
3,230✔
1277

1278
  if (code) {
3,229!
1279
    qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
×
1280
           tstrerror(code));
1281
    return code;
×
1282
  }
1283

1284
  if (req.mndTrigger) {
3,229✔
1285
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ",
911!
1286
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
1287
  } else {
1288
    const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask));
2,318✔
1289
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
2,322!
1290
           ", transId:%d after transfer-state, prev status:%s",
1291
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
1292
  }
1293

1294
  code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
3,234✔
1295
  if (code != TSDB_CODE_SUCCESS) {
3,235!
1296
    SRpcMsg rsp = {0};
×
1297
    int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS);
×
1298
    if (ret) {  // suppress the error in build checkpointsource rsp
×
1299
      tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code));
×
1300
    }
1301
    tmsgSendRsp(&rsp);  // error occurs
×
1302
    return TSDB_CODE_SUCCESS;
×
1303
  }
1304

1305
  streamMetaReleaseTask(pMeta, pTask);
3,235✔
1306
  return TSDB_CODE_SUCCESS;
3,233✔
1307
}
1308

1309
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
1310
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
8,053✔
1311
  int32_t vgId = TD_VID(pTq->pVnode);
8,053✔
1312

1313
  SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont;
8,053✔
1314
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
8,053!
1315
    tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
×
1316
            (int32_t)pReq->downstreamTaskId);
1317
    return TSDB_CODE_STREAM_NOT_LEADER;
×
1318
  }
1319

1320
  return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg);
8,061✔
1321
}
1322

1323
// Delegates a task update request to tqStreamTaskProcessUpdateReq().
// Besides the message, the callee receives the vnode's message callbacks and its `restored` flag.
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessUpdateReq(pMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored);
}
1326

1327
// Delegates a task reset request (the raw message content) to tqStreamTaskProcessTaskResetReq().
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessTaskResetReq(pMeta, pMsg->pCont);
}
1330

1331
// Handles a request from a downstream task to retrieve the checkpoint-trigger message.
//
// Only the leader vnode serves it. On a follower, the request is decoded solely to log the requesting downstream
// task, and TSDB_CODE_STREAM_NOT_LEADER is returned (TSDB_CODE_INVALID_MSG if decoding fails). On the leader, the
// message is handed to tqStreamTaskProcessRetrieveTriggerReq().
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  if (!vnodeIsRoleLeader(pTq->pVnode)) {
    SRetrieveChkptTriggerReq req = {0};

    char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
    int32_t  len = pMsg->contLen - sizeof(SMsgHead);
    SDecoder decoder = {0};

    tDecoderInit(&decoder, (uint8_t*)msg, len);
    if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
      tDecoderClear(&decoder);
      tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId);
      return TSDB_CODE_INVALID_MSG;
    }
    tDecoderClear(&decoder);

    // print the task id in hex, matching the "s-task:0x%x" convention used for task ids elsewhere in this file
    // (the format previously paired a "0x" prefix with a decimal conversion)
    tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%x", vgId,
            (int32_t)req.downstreamTaskId);
    return TSDB_CODE_STREAM_NOT_LEADER;
  }

  return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg);
}
1356

1357
// Delegates a retrieve-checkpoint-trigger response to tqStreamTaskProcessRetrieveTriggerRsp().
int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessRetrieveTriggerRsp(pMeta, pMsg);
}
1360

1361
// this function is needed, do not try to remove it.
1362
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); }
21,619✔
1363

1364
// Delegates a request-checkpoint response to tqStreamProcessReqCheckpointRsp().
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamProcessReqCheckpointRsp(pMeta, pMsg);
}
1367

1368
// Delegates a checkpoint-ready response to tqStreamProcessCheckpointReadyRsp().
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamProcessCheckpointReadyRsp(pMeta, pMsg);
}
1371

1372
// Delegates a checkpoint-report response to tqStreamProcessChkptReportRsp().
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamProcessChkptReportRsp(pMeta, pMsg);
}
1375

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc