• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3951

28 Apr 2025 05:42AM UTC coverage: 62.445% (-0.4%) from 62.853%
#3951

push

travis-ci

web-flow
fix: mnode-status-case (#30871)

155256 of 317429 branches covered (48.91%)

Branch coverage included in aggregate %.

240626 of 316539 relevant lines covered (76.02%)

6221630.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.14
/source/dnode/vnode/src/tq/tq.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include "osDef.h"
18
#include "taoserror.h"
19
#include "tqCommon.h"
20
#include "tstream.h"
21
#include "vnd.h"
22

23
// 0: not init
24
// 1: already inited
25
// 2: wait to be inited or cleanup
26
static int32_t tqInitialize(STQ* pTq);
27

28
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return pHandle != NULL ? TMQ_HANDLE_STATUS_EXEC == pHandle->status : true; }
3,346,736!
29
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
3,346,106✔
30
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
3,346,386✔
31

32
void tqDestroyTqHandle(void* data) {
1,633✔
33
  if (data == NULL) return;
1,633!
34
  STqHandle* pData = (STqHandle*)data;
1,633✔
35
  qDestroyTask(pData->execHandle.task);
1,633✔
36

37
  if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1,633✔
38
    taosMemoryFreeClear(pData->execHandle.execCol.qmsg);
1,273!
39
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) {
360✔
40
    tqReaderClose(pData->execHandle.pTqReader);
288✔
41
    walCloseReader(pData->pWalReader);
288✔
42
    taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid);
288✔
43
  } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
72!
44
    walCloseReader(pData->pWalReader);
72✔
45
    tqReaderClose(pData->execHandle.pTqReader);
72✔
46
    taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
72!
47
    nodesDestroyNode(pData->execHandle.execTb.node);
72✔
48
  }
49
  if (pData->msg != NULL) {
1,633!
50
    rpcFreeCont(pData->msg->pCont);
×
51
    taosMemoryFree(pData->msg);
×
52
    pData->msg = NULL;
×
53
  }
54
  if (pData->block != NULL) {
1,633!
55
    blockDataDestroy(pData->block);
×
56
  }
57
  if (pData->pRef) {
1,633✔
58
    walCloseRef(pData->pRef->pWal, pData->pRef->refId);
1,587✔
59
  }
60
  taosHashCleanup(pData->tableCreateTimeHash);
1,633✔
61
}
62

63
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
8,092✔
64
  if (pLeft == NULL || pRight == NULL) {
8,092!
65
    return false;
×
66
  }
67
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
15,811✔
68
         pLeft->val.version == pRight->val.version;
7,719✔
69
}
70

71
/*
 * Allocates the vnode's TQ state, attaches it to pVnode->pTq and initializes it:
 *   - pHandle: subscription handles keyed by subKey, freed with tqDestroyTqHandle
 *   - pPushMgr: handles waiting for pushed data (created without a lock)
 *   - pCheckInfo: column check info keyed by topic, freed with tDeleteSTqCheckInfo
 *   - pOffset: committed offsets keyed by subKey, freed with tDeleteSTqOffset
 * It finishes with tqInitialize, which opens the stream meta and the TQ meta store.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, terrno when an allocation
 * fails, and otherwise the result of tqInitialize.
 *
 * NOTE(review): pVnode->pTq is set before any of the fallible steps, and the
 * error returns do not free pTq, so on failure a partially built STQ stays
 * attached to the vnode. Confirm that the caller cleans it up.
 */
int32_t tqOpen(const char* path, SVnode* pVnode) {
  if (pVnode == NULL || path == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
  if (pTq == NULL) return terrno;

  pVnode->pTq = pTq;
  pTq->pVnode = pVnode;

  pTq->path = taosStrdup(path);
  if (pTq->path == NULL) return terrno;

  pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
  if (pTq->pHandle == NULL) return terrno;
  taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);

  taosInitRWLatch(&pTq->lock);

  pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pTq->pPushMgr == NULL) return terrno;

  pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (pTq->pCheckInfo == NULL) return terrno;
  taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);

  pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK);
  if (pTq->pOffset == NULL) return terrno;
  taosHashSetFreeFp(pTq->pOffset, (FDelete)tDeleteSTqOffset);

  return tqInitialize(pTq);
}
115

116
/*
 * Opens the stream meta for this vnode, loads all of its stream tasks, and
 * then opens the TQ meta store.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL pTq. Otherwise it returns the
 * streamMetaOpen error, or the result of tqMetaOpen.
 */
int32_t tqInitialize(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, TD_VID(pTq->pVnode), -1,
                                tqStartTaskCompleteCallback, &pTq->pStreamMeta);
  if (code == TSDB_CODE_SUCCESS) {
    streamMetaLoadAllTasks(pTq->pStreamMeta);
    code = tqMetaOpen(pTq);
  }
  return code;
}
130

131
/*
 * Tears down the vnode's TQ state in this order:
 *   1. closes the stream meta;
 *   2. answers every handle in pPushMgr that still holds a poll message with
 *      an empty data response, then frees that message;
 *   3. cleans up the handle, push, check-info and offset hashes, the path,
 *      and the TQ meta store;
 *   4. frees pTq itself.
 * A NULL pTq is a no-op.
 */
void tqClose(STQ* pTq) {
  qDebug("start to close tq");
  if (pTq == NULL) {
    return;
  }

  // vgId is only used for logging; prefer the vnode and fall back to the stream meta
  int32_t vgId = 0;
  if (pTq->pVnode != NULL) {
    vgId = TD_VID(pTq->pVnode);
  } else if (pTq->pStreamMeta != NULL) {
    vgId = pTq->pStreamMeta->vgId;
  }

  // the stream meta is closed before any of the hashes below are released
  streamMetaClose(pTq->pStreamMeta);

  for (void* pIter = taosHashIterate(pTq->pPushMgr, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pPushMgr, pIter)) {
    STqHandle* pHandle = *(STqHandle**)pIter;
    if (pHandle->msg == NULL) {
      continue;
    }
    tqPushEmptyDataRsp(pHandle, vgId);
    rpcFreeCont(pHandle->msg->pCont);
    taosMemoryFree(pHandle->msg);
    pHandle->msg = NULL;
  }

  taosHashCleanup(pTq->pHandle);
  taosHashCleanup(pTq->pPushMgr);
  taosHashCleanup(pTq->pCheckInfo);
  taosHashCleanup(pTq->pOffset);
  taosMemoryFree(pTq->path);
  tqMetaClose(pTq);
  qDebug("vgId:%d end to close tq", vgId);

  taosMemoryFree(pTq);
}
173

174
/* Forwards a close notification to the stream meta, if there is one. A NULL pTq is a no-op. */
void tqNotifyClose(STQ* pTq) {
  bool hasStreamMeta = (pTq != NULL) && (pTq->pStreamMeta != NULL);
  if (hasStreamMeta) {
    streamMetaNotifyClose(pTq->pStreamMeta);
  }
}
183

184
/*
 * Answers the poll request stored in pHandle->msg with an empty data
 * response (blockNum = 0). The response echoes the request's offset.
 *
 * The stored message is re-deserialized to recover the consumer id, epoch,
 * request id and offset. pHandle->msg itself is NOT freed here; callers such
 * as tqClose release it afterwards.
 *
 * Does nothing when pHandle is NULL or holds no pending message. Errors are
 * logged and swallowed.
 */
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
  // The body dereferences pHandle->msg unconditionally, so a handle without a
  // pending message has nothing to answer.
  if (pHandle == NULL || pHandle->msg == NULL) {
    return;
  }
  int32_t    code = 0;
  SMqPollReq req = {0};
  code = tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req);
  if (code < 0) {
    tqError("tDeserializeSMqPollReq %d failed, code:%d", pHandle->msg->contLen, code);
    return;
  }

  SMqDataRsp dataRsp = {0};
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
  if (code != 0) {
    tqError("tqInitDataRsp failed, code:%d", code);
    return;
  }
  dataRsp.blockNum = 0;
  char buf[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
  tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, QID:0x%" PRIx64, req.consumerId, vgId, buf,
         req.reqId);

  code = tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
  if (code != 0) {
    tqError("tqSendDataRsp failed, code:%d", code);
  }
  tDeleteMqDataRsp(&dataRsp);
}
214

215
/*
 * Sends pRsp back to the requester identified by pMsg->info.
 *
 * The WAL version range [sver, ever] is read from the handle's reader and
 * attached to the response. The request/response offsets are logged at
 * debug level.
 *
 * Returns TSDB_CODE_INVALID_PARA if any pointer argument is NULL; otherwise
 * the result of tqDoSendDataRsp.
 */
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp, int32_t type,
                      int32_t vgId) {
  bool badArgs = pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL;
  if (badArgs) {
    return TSDB_CODE_INVALID_PARA;
  }

  int64_t sver = 0;
  int64_t ever = 0;
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);

  char reqOffsetStr[TSDB_OFFSET_LEN] = {0};
  char rspOffsetStr[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(reqOffsetStr, TSDB_OFFSET_LEN, &pRsp->reqOffset);
  (void)tFormatOffset(rspOffsetStr, TSDB_OFFSET_LEN, &pRsp->rspOffset);

  tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) start to send rsp, block num:%d, req:%s, rsp:%s, QID:0x%" PRIx64,
          vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, reqOffsetStr, rspOffsetStr, pReq->reqId);

  return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
}
233

234
/*
 * Applies a committed consumer offset carried by an encoded SMqVgOffset.
 *
 * Flow:
 *   - Only snapshot-data, snapshot-meta and log offsets are accepted.
 *   - An offset equal to the stored one (see tqOffsetEqual) is skipped.
 *   - Otherwise the offset is put into pTq->pOffset and persisted to
 *     pTq->pOffsetStore.
 *
 * Ownership: after a successful taosHashPut, the decoded offset value belongs
 * to pTq->pOffset, whose free callback is tDeleteSTqOffset. On every path
 * where it was not stored, it is released with tOffsetDestroy.
 *
 * Returns:
 *   - 0 on success or when no update is needed;
 *   - TSDB_CODE_INVALID_PARA for bad arguments;
 *   - TSDB_CODE_INVALID_MSG for an undecodable message or an unknown offset type;
 *   - TSDB_CODE_OUT_OF_MEMORY (also stored in terrno) when storing fails.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (pTq == NULL || msg == NULL || msgLen < 0) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);
  int32_t     code = 0;

  // The decoder is cleared on every path so that a decode failure does not
  // leak decoder-owned memory.
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t decodeCode = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  STqOffset* pSavedOffset = NULL;
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    code = 0;
    goto end;  // no need to update the offset value
  }

  // On failure the hash did not take ownership, so the value is released at `end`.
  if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)) != 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    terrno = code;
    goto end;
  }

  // Compare in signed arithmetic: a size_t comparison would let a negative
  // msgLen wrap around and produce a bogus length.
  int32_t consumerIdLen = (int32_t)sizeof(vgOffset.consumerId);
  int32_t valueLen = (msgLen >= consumerIdLen) ? (msgLen - consumerIdLen) : 0;
  if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, valueLen) < 0) {
    // The offset now belongs to pTq->pOffset and must not be destroyed here.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return 0;

end:
  tOffsetDestroy(&vgOffset.offset.val);
  return code;
}
290

291
/*
 * Handles a consumer seek request.
 *
 * If the subscription handle exists and belongs to the requesting consumer,
 * the handle is unregistered from the push manager. This lets a consumer
 * that was waiting for pushed data poll again after the seek.
 *
 * A response is always sent:
 *   - code 0 on success, or when the subKey is unknown;
 *   - TSDB_CODE_TMQ_CONSUMER_MISMATCH when the handle belongs to another consumer;
 *   - TSDB_CODE_OUT_OF_MEMORY when the request cannot be deserialized.
 *
 * Returns 0 once the response is sent, or TSDB_CODE_INVALID_PARA for NULL arguments.
 */
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
  if (pTq == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqSeekReq req = {0};
  int32_t    vgId = TD_VID(pTq->pVnode);
  SRpcMsg    rsp = {.info = pMsg->info};
  int        code = 0;

  if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
    taosWLockLatch(&pTq->lock);

    STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
    if (pHandle == NULL) {
      tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
      code = 0;
    } else if (pHandle->consumerId != req.consumerId) {
      // the consumer-vg assignment changed; refuse to touch another consumer's handle
      tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              req.consumerId, vgId, req.subKey, pHandle->consumerId);
      code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
    } else {
      // Unregistering pushes an empty response to a waiting consumer so that its
      // vg status moves from TMQ_VG_STATUS__WAIT to TMQ_VG_STATUS__IDLE;
      // otherwise polling would fail after the seek.
      tqUnregisterPushHandle(pTq, pHandle);
    }

    taosWUnLockLatch(&pTq->lock);
  }

  rsp.code = code;
  tmsgSendRsp(&rsp);
  return 0;
}
335

336
/*
 * Checks whether column colId of table tbUid may be modified.
 *
 * Scans every STqCheckInfo in pTq->pCheckInfo. If an entry for tbUid lists
 * colId in its colIdList, the column is treated as forbidden.
 *
 * Returns -1 when the column is forbidden (the hash iteration is cancelled
 * before returning), 0 when it is modifiable, and TSDB_CODE_INVALID_PARA for
 * a NULL pTq.
 */
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  for (void* pIter = taosHashIterate(pTq->pCheckInfo, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pCheckInfo, pIter)) {
    STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
    if (pCheck->ntbUid != tbUid) {
      continue;
    }

    int32_t numCols = taosArrayGetSize(pCheck->colIdList);
    for (int32_t idx = 0; idx < numCols; ++idx) {
      int16_t* pColId = taosArrayGet(pCheck->colIdList, idx);
      if (pColId != NULL && *pColId == colId) {
        taosHashCancelIterate(pTq->pCheckInfo, pIter);
        return -1;
      }
    }
  }

  return 0;
}
368

369
/*
 * Re-dispatches every poll message held in pTq->pPushMgr.
 *
 * Under the write latch, each handle's saved message is copied into a
 * TDMT_VND_TMQ_CONSUME message and queued on QUERY_QUEUE. The handle's
 * message wrapper is then freed; the payload pointer travels with the queued
 * message. Finally the push manager is cleared.
 *
 * If a handle without a message is found, iteration stops early, but the
 * manager is still cleared.
 *
 * NOTE(review): when tmsgPutToQueue fails, only an error is logged. Confirm
 * that the payload is not leaked and that the consumer is answered elsewhere.
 *
 * Returns 0, or TSDB_CODE_INVALID_PARA for a NULL pTq.
 */
int32_t tqProcessPollPush(STQ* pTq) {
  if (pTq == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t vgId = TD_VID(pTq->pVnode);

  taosWLockLatch(&pTq->lock);
  if (taosHashGetSize(pTq->pPushMgr) > 0) {
    void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
    while (pIter != NULL) {
      STqHandle* pHandle = *(STqHandle**)pIter;
      tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);

      if (pHandle->msg == NULL) {
        tqError("pHandle->msg should not be null");
        taosHashCancelIterate(pTq->pPushMgr, pIter);
        break;
      }

      SRpcMsg consumeMsg = {.msgType = TDMT_VND_TMQ_CONSUME,
                            .pCont = pHandle->msg->pCont,
                            .contLen = pHandle->msg->contLen,
                            .info = pHandle->msg->info};
      if (tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &consumeMsg) != 0) {
        tqError("vgId:%d tmsgPutToQueue failed, consumer:0x%" PRIx64, vgId, pHandle->consumerId);
      }
      taosMemoryFree(pHandle->msg);
      pHandle->msg = NULL;

      pIter = taosHashIterate(pTq->pPushMgr, pIter);
    }

    taosHashClear(pTq->pPushMgr);
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
406

407
/*
 * Handles a consumer poll request.
 *
 * Steps:
 *   1. Deserializes the request. For raw-data polls, it also allocates
 *      req.uidHash.
 *   2. Looks up the subscription handle under the write latch and verifies
 *      that it belongs to the requesting consumer.
 *   3. Waits in 10 ms steps while the handle is executing, then marks it
 *      executing.
 *   4. Raises the handle epoch if the request carries a newer one, extracts
 *      data via tqExtractDataForMq, and marks the handle idle again.
 *
 * Every path that got past deserialization releases the request through
 * tDestroySMqPollReq. This includes the handle-not-found path; returning
 * early there would leak req.uidHash.
 *
 * Returns the tqExtractDataForMq result on success. Otherwise it returns
 * TSDB_CODE_INVALID_PARA, TSDB_CODE_INVALID_MSG, terrno from a failed hash
 * init, the tqMetaGetHandle error, or TSDB_CODE_TMQ_CONSUMER_MISMATCH.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  if (pTq == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqPollReq req = {0};
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
  if (code < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    code = TSDB_CODE_INVALID_MSG;
    goto END;
  }
  if (req.rawData == 1) {
    req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (req.uidHash == NULL) {
      tqError("tq poll rawData taosHashInit failed");
      code = terrno;
      goto END;
    }
  }
  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
    if (code != TDB_CODE_SUCCESS) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found, msg:%s", consumerId, vgId, req.subKey,
              tstrerror(code));
      taosWUnLockLatch(&pTq->lock);
      goto END;  // release req (including uidHash) before returning the lookup error
    }

    // 2. check rebalance status
    if (pHandle->consumerId != consumerId) {
      tqError("ERROR tmq poll: consumer:0x%" PRIx64
              " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      goto END;
    }

    bool exec = tqIsHandleExec(pHandle);
    if (!exec) {
      tqSetHandleExec(pHandle);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    // another request is executing on this handle: back off and retry
    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  char buf[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, QID:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);

END:
  tDestroySMqPollReq(&req);
  return code;
}
491

492
/*
 * Replies with the committed offset stored for a subscription key.
 *
 * The request payload follows an SMsgHead and holds an encoded SMqVgOffset;
 * only its subKey is used. If an offset is stored for that key, it is encoded
 * into a freshly allocated rpc buffer and sent back with code 0.
 *
 * Returns:
 *   - 0 after sending the response;
 *   - TSDB_CODE_INVALID_PARA for NULL arguments or an encode failure;
 *   - TSDB_CODE_INVALID_MSG for a short or undecodable message;
 *   - TSDB_CODE_TMQ_NO_COMMITTED when nothing is stored for the key;
 *   - terrno when the response buffer cannot be allocated.
 */
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  if (pTq == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // A message shorter than its header would make the payload length negative
  // and send the decoder out of bounds.
  if (pMsg->pCont == NULL || pMsg->contLen < (int32_t)sizeof(SMsgHead)) {
    return TSDB_CODE_INVALID_MSG;
  }
  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t len = pMsg->contLen - (int32_t)sizeof(SMsgHead);

  SMqVgOffset vgOffset = {0};

  // The decoder is cleared on both the success and the failure path.
  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)data, len);
  int32_t decodeCode = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);
  if (decodeCode < 0) {
    tOffsetDestroy(&vgOffset.offset.val);
    return TSDB_CODE_INVALID_MSG;
  }

  STqOffset* pSavedOffset = NULL;
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
  // Only the subKey of the decoded offset is needed; release its value before
  // it is either abandoned or overwritten below.
  tOffsetDestroy(&vgOffset.offset.val);
  if (code != 0) {
    return TSDB_CODE_TMQ_NO_COMMITTED;
  }
  // Shallow struct copy of the stored offset: vgOffset must not be destroyed after this point.
  vgOffset.offset = *pSavedOffset;

  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
  }

  void* buf = rpcMallocCont(len);
  if (buf == NULL) {
    return terrno;
  }
  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(buf);
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
  }

  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};

  tmsgSendRsp(&rsp);
  return 0;
}
540

541
/*
 * Replies with WAL position information for a consumer's subscription.
 *
 * The handle must exist and belong to the requesting consumer. Its WAL
 * version range [sver, ever] is read under the read latch.
 *
 * The response offset is always of type TMQ_OFFSET__LOG. Its version is
 * chosen as follows:
 *   - a log request offset uses its own version;
 *   - a reset request (negative type) uses the stored committed log offset
 *     if there is one, otherwise sver for RESET_EARLIEST or ever for
 *     RESET_LATEST;
 *   - snapshot requests and other offset types are rejected with
 *     TSDB_CODE_INVALID_PARA.
 *
 * Returns the tqDoSendDataRsp result on success, or an error code.
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  if (pTq == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    return TSDB_CODE_INVALID_MSG;
  }

  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  int64_t      sver = 0;
  int64_t      ever = 0;

  // find the handle, validate ownership and read the WAL range under the read latch
  taosRLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    taosRUnLockLatch(&pTq->lock);
    return TSDB_CODE_INVALID_MSG;
  }
  if (pHandle->consumerId != consumerId) {
    tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            consumerId, vgId, req.subKey, pHandle->consumerId);
    taosRUnLockLatch(&pTq->lock);
    return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
  }
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  taosRUnLockLatch(&pTq->lock);

  SMqDataRsp dataRsp = {0};
  int32_t    code = tqInitDataRsp(&dataRsp, req.reqOffset);
  if (code != 0) {
    return code;
  }

  if (req.useSnapshot == true) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

  if (reqOffset.type == TMQ_OFFSET__LOG) {
    dataRsp.rspOffset.version = reqOffset.version;
  } else if (reqOffset.type < 0) {
    STqOffset* pOffset = NULL;
    if (tqMetaGetOffset(pTq, req.subKey, &pOffset) == 0) {
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
        code = TSDB_CODE_INVALID_PARA;
        goto END;
      }
      dataRsp.rspOffset.version = pOffset->val.version;
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
             req.subKey, dataRsp.rspOffset.version);
    } else {
      // nothing committed yet: derive the position from the reset policy
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
        dataRsp.rspOffset.version = sver;
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
        dataRsp.rspOffset.version = ever;
      }
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
             dataRsp.rspOffset.version);
    }
  } else {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
            reqOffset.type);
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);

END:
  tDeleteMqDataRsp(&dataRsp);
  return code;
}
628

629
/*
 * Removes a subscription from this vnode. msg is interpreted as an
 * SMqVDeleteReq.
 *
 * If a handle exists for the subKey, this waits (in 10 ms steps, releasing
 * the latch while sleeping) until the handle is not executing. It then
 * unregisters the handle from the push manager and removes it from
 * pTq->pHandle.
 *
 * Afterwards, under the write latch, the key is removed from the offset hash,
 * the offset store and the exec store. Failures are logged only.
 *
 * Returns 0, or TSDB_CODE_INVALID_PARA for NULL arguments.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (pTq == NULL || msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
  int32_t code = 0;

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
  if (pHandle != NULL) {
    taosWLockLatch(&pTq->lock);
    while (tqIsHandleExec(pHandle)) {
      tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
             pHandle->subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      taosMsleep(10);
      taosWLockLatch(&pTq->lock);
    }
    tqUnregisterPushHandle(pTq, pHandle);
    code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
    if (code != 0) {
      tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
    }
    taosWUnLockLatch(&pTq->lock);
  }

  taosWLockLatch(&pTq->lock);
  if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey)) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
  }
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }
  // NOTE(review): this failure concerns pExecStore, but the log text says "offset".
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}
677

678
/*
 * Decodes an STqCheckInfo from msg, stores it in pTq->pCheckInfo keyed by
 * topic, and persists the raw message to pTq->pCheckStore.
 *
 * A negative msgLen is treated as 0. If the hash insert fails, the decoded
 * info is released.
 *
 * Returns the decode error, the hash-insert error, or the result of
 * tqMetaSaveInfo. Returns TSDB_CODE_INVALID_PARA for NULL arguments.
 */
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (pTq == NULL || msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t payloadLen = (msgLen >= 0) ? msgLen : 0;

  STqCheckInfo info = {0};
  int32_t      code = tqMetaDecodeCheckInfo(&info, msg, payloadLen);
  if (code != 0) {
    return code;
  }

  size_t topicLen = strlen(info.topic);
  code = taosHashPut(pTq->pCheckInfo, info.topic, topicLen, &info, sizeof(STqCheckInfo));
  if (code != 0) {
    tDeleteSTqCheckInfo(&info);
    return code;
  }

  return tqMetaSaveInfo(pTq, pTq->pCheckStore, info.topic, topicLen, msg, payloadLen);
}
696

697
/*
 * Removes the check info keyed by msg from pTq->pCheckInfo and from
 * pTq->pCheckStore. The key length is taken from strlen(msg), so msg is
 * assumed to be NUL-terminated (TODO confirm with the caller).
 *
 * Returns:
 *   - TSDB_CODE_INVALID_PARA for NULL arguments;
 *   - TSDB_CODE_TSC_INTERNAL_ERROR if the hash removal reports failure;
 *   - otherwise the result of tqMetaDeleteInfo.
 */
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (pTq == NULL || msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  size_t keyLen = strlen(msg);
  if (taosHashRemove(pTq->pCheckInfo, msg, keyLen) < 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  return tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, keyLen);
}
706

707
/*
 * Applies a rebalance (SMqRebVgReq) to this vnode.
 *
 * If a handle for req.subKey already exists, this waits (in 10 ms steps,
 * releasing the latch while sleeping) until it is not executing. Then:
 *   - if the handle already belongs to req.newConsumerId, nothing changes;
 *   - otherwise the consumer id is switched, the epoch is reset to 0, the
 *     handle is unregistered from the push manager, and it is saved.
 *
 * If no handle exists, a new one is built for req.newConsumerId and saved.
 * A newConsumerId of -1 is rejected with TSDB_CODE_INVALID_PARA.
 *
 * Returns the decode error, TSDB_CODE_INVALID_PARA, or the result of the
 * create/save calls. Returns 0 when nothing had to change.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (pTq == NULL || msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int         ret = 0;
  SMqRebVgReq req = {0};
  SDecoder    dc = {0};
  STqHandle*  pHandle = NULL;

  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
  ret = tDecodeSMqRebVgReq(&dc, &req);
  if (ret < 0) {
    goto end;
  }

  tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
         req.oldConsumerId, req.newConsumerId);

  taosRLockLatch(&pTq->lock);
  int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
  if (code != 0) {
    tqInfo("vgId:%d, tq process sub req:%s, no such handle, create new one, msg:%s", pTq->pVnode->config.vgId,
           req.subKey, tstrerror(code));
  }
  taosRUnLockLatch(&pTq->lock);

  if (pHandle != NULL) {
    // existing handle: wait until it is idle, then (re)assign it to the new consumer
    taosWLockLatch(&pTq->lock);
    while (tqIsHandleExec(pHandle)) {
      tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p",
             pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      taosMsleep(10);
      taosWLockLatch(&pTq->lock);
    }

    if (pHandle->consumerId == req.newConsumerId) {
      tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);
      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, 0);
      tqUnregisterPushHandle(pTq, pHandle);
      ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    }
    taosWUnLockLatch(&pTq->lock);
  } else {
    // no handle yet: build one for the new consumer
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64, req.vgId, req.newConsumerId);
      ret = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    STqHandle handle = {0};
    ret = tqMetaCreateHandle(pTq, &req, &handle);
    if (ret < 0) {
      tqDestroyTqHandle(&handle);
      goto end;
    }
    taosWLockLatch(&pTq->lock);
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
    taosWUnLockLatch(&pTq->lock);
  }

end:
  tDecoderClear(&dc);
  return ret;
}
781

782
// Free callback installed on the sink table-info hash (see tSimpleHashSetFreeFp in tqBuildStreamTask).
// `ptr` addresses a stored value slot that itself holds a pointer; that inner pointer is released.
static void freePtr(void* ptr) {
  void** ppSlot = (void**)ptr;
  taosMemoryFree(*ppSlot);
}
783

784
// Finishes construction of a stream task on this vnode: runs streamTaskInit, wires up the sink (SMA or table),
// opens a WAL reader for source-level tasks and restores version info from the checkpoint.
//
// pTqObj         - the owning STQ, passed as void* and cast back here.
// pTask          - task initialized in place.
// nextProcessVer - forwarded to streamTaskInit and logged as inputVer.
//
// Returns 0 on success; the streamTaskInit error code; terrno when the sink schema, sink table-info hash or WAL
// reader cannot be created; TSDB_CODE_STREAM_INTERNAL_ERROR when a non-fill-history task's checkpointVer is ahead of
// its nextProcessVer.
int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) {
  STQ*             pTq = (STQ*)pTqObj;
  int32_t          vgId = TD_VID(pTq->pVnode);
  SCheckpointInfo* pChkInfo = NULL;

  tqDebug("s-task:0x%x start to build task", pTask->id.taskId);

  int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  pTask->pBackend = NULL;

  // sink
  STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
  if (pOutputInfo->type == TASK_OUTPUT__SMA) {
    pOutputInfo->smaSink.vnode = pTq->pVnode;
    pOutputInfo->smaSink.smaSink = smaHandleRes;
  } else if (pOutputInfo->type == TASK_OUTPUT__TABLE) {
    pOutputInfo->tbSink.vnode = pTq->pVnode;
    pOutputInfo->tbSink.tbSinkFunc = tqSinkDataIntoDstTable;

    // Use the destination super table's current schema version when it is known; fall back to version 1.
    int32_t   ver1 = 1;
    SMetaInfo info = {0};
    code = metaGetInfo(pTq->pVnode->pMeta, pOutputInfo->tbSink.stbUid, &info, NULL);
    if (code == TSDB_CODE_SUCCESS) {
      ver1 = info.skmVer;
    }

    SSchemaWrapper* pschemaWrapper = pOutputInfo->tbSink.pSchemaWrapper;
    pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1);
    if (pOutputInfo->tbSink.pTSchema == NULL) {
      // log like the other initialization failures below instead of failing silently
      tqError("vgId:%d failed to build sink table schema, code:%s", vgId, tstrerror(terrno));
      return terrno;
    }

    pOutputInfo->tbSink.pTbInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
    if (pOutputInfo->tbSink.pTbInfo == NULL) {
      tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno));
      return terrno;
    }

    tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTbInfo, freePtr);
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    bool scanDropCtb = pTask->subtableWithoutMd5 ? true : false;
    SWalFilterCond cond = {.deleteMsg = 1, .scanDropCtb = scanDropCtb};  // delete msg also extract from wal files
    pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId);
    if (pTask->exec.pWalReader == NULL) {
      tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno));
      return terrno;
    }
  }

  streamTaskResetUpstreamStageInfo(pTask);

  pChkInfo = &pTask->chkInfo;
  tqSetRestoreVersionInfo(pTask);

  char*       p = streamTaskGetStatus(pTask).name;
  const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);

  if (pTask->info.fillHistory) {
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
           " nextProcessVer:%" PRId64
           " child id:%d, level:%d, cur-status:%s, next-status:%s taskType:%d, related stream task:0x%x "
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
           pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
           (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
  } else {
    tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
           " nextProcessVer:%" PRId64
           " child id:%d, level:%d, cur-status:%s next-status:%s taskType:%d, related helper-task:0x%x "
           "delaySched:%" PRId64 " ms, inputVer:%" PRId64,
           vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
           pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
           (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);

    // a stream task must never have checkpointed past the version it is about to process
    if (pChkInfo->checkpointVer > pChkInfo->nextProcessVer) {
      tqError("vgId:%d build stream task, s-task:%s, checkpointVer:%" PRId64 " > nextProcessVer:%" PRId64, vgId,
              pTask->id.idStr, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }
  }

  return 0;
}
873

874
// Forwards a task-check request to this vnode's stream meta.
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessCheckReq(pMeta, pMsg);
}
875

876
// Forwards a task-check response to this vnode's stream meta, along with whether this vnode is the leader.
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessCheckRsp(pMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
}
879

880
// Forwards a task-deploy request (with its WAL version) to this vnode's stream meta, together with the vnode's
// message callbacks, leader role and restored flag.
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessDeployReq(pMeta, &pTq->pVnode->msgCb, sversion, msg, msgLen,
                                      vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored);
}
884

885
// Starts step 2 (the WAL scan) of a fill-history task after its related stream task has been halted.
//
// pTask       - the fill-history task.
// pStreamTask - the related stream task; its hTaskInfo.haltVer is the version step 2 starts from.
// pTq         - not used here; kept for signature compatibility.
//
// If streamHistoryTaskSetVerRangeStep2 returns true (logged as step 2 having ended), a trans-state block is queued
// and the task is executed directly. Otherwise the scanner is configured for step2Range, the WAL reader is moved to
// its minVer, the sched-status is reset to inactive and TASK_EVENT_SCANHIST_DONE is raised on the task.
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
  const char*    id = pTask->id.idStr;
  int64_t        nextProcessedVer = pStreamTask->hTaskInfo.haltVer;
  SVersionRange* pStep2Range = &pTask->step2Range;
  int32_t        code = 0;

  (void)pTq;

  // if it's an source task, extract the last version in wal.
  bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer);
  pTask->execInfo.step2Start = taosGetTimestampMs();

  if (done) {
    tqDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id,
            pStep2Range->minVer, pStep2Range->maxVer, 0.0);
    code = streamTaskPutTranstateIntoInputQ(pTask);  // todo: msg lost.
    if (code) {
      tqError("s-task:%s failed put trans-state into inputQ, code:%s", id, tstrerror(code));
    }
    (void)streamExecTask(pTask);  // exec directly
    return;
  }

  STimeWindow* pWindow = &pTask->dataRange.window;
  tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64
          ", do secondary scan-history from WAL after halt the related stream task:%s",
          id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey,
          pStreamTask->id.idStr);
  if (pTask->status.schedStatus != TASK_SCHED_STATUS__WAITING) {
    tqError("s-task:%s level:%d unexpected sched-status:%d", id, pTask->info.taskLevel, pTask->status.schedStatus);
  }

  code = streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
  if (code) {
    tqError("s-task:%s level:%d failed to set step2 param", id, pTask->info.taskLevel);
  }

  int64_t dstVer = pStep2Range->minVer;
  pTask->chkInfo.nextProcessVer = dstVer;

  walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
  tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer,
          pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE);

  (void)streamTaskSetSchedStatusInactive(pTask);

  // now the fill-history task starts to scan data from wal files.
  code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
  if (code) {
    // previously ignored; surface it so a stuck step 2 can be diagnosed
    tqError("s-task:%s failed to handle event:scan-history-done, code:%s", id, tstrerror(code));
  }
}
931

932
// Callback handed to streamTaskHandleEventAsync(..., TASK_EVENT_HALT, ...) by tqProcessTaskScanHistory.
// Acquires the fill-history task referenced by pStreamTask->hTaskInfo.id and starts its step 2.
// `param` is the owning STQ. Always returns TSDB_CODE_SUCCESS; a missing fill-history task is only logged.
int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
  STQ*         pTq = param;
  SStreamMeta* pMeta = pStreamTask->pMeta;
  STaskId      hId = pStreamTask->hTaskInfo.id;
  SStreamTask* pHistTask = NULL;

  (void)streamMetaAcquireTask(pMeta, hId.streamId, hId.taskId, &pHistTask);

  if (pHistTask != NULL) {
    doStartFillhistoryStep2(pHistTask, pStreamTask, pTq);
    streamMetaReleaseTask(pMeta, pHistTask);
  } else {
    tqWarn("s-task:0x%x failed to acquired it to exec step 2, scan wal quit", (int32_t)hId.taskId);
  }

  return TSDB_CODE_SUCCESS;
}
949

950
// this function should be executed by only one thread, so we set a sentinel to protect this function
951
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
933✔
952
  SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
933✔
953
  SStreamMeta*           pMeta = pTq->pStreamMeta;
933✔
954
  int32_t                code = TSDB_CODE_SUCCESS;
933✔
955
  SStreamTask*           pTask = NULL;
933✔
956
  SStreamTask*           pStreamTask = NULL;
933✔
957
  char*                  pStatus = NULL;
933✔
958
  int32_t                taskType = 0;
933✔
959

960
  code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
933✔
961
  if (pTask == NULL) {
933✔
962
    tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed",
1!
963
            pMeta->vgId, pReq->taskId);
964
    return code;
1✔
965
  }
966

967
  // do recovery step1
968
  const char* id = pTask->id.idStr;
932✔
969
  streamMutexLock(&pTask->lock);
932✔
970

971
  SStreamTaskState s = streamTaskGetStatus(pTask);
932✔
972
  pStatus = s.name;
932✔
973
  taskType = pTask->info.fillHistory;
932✔
974

975
  if ((s.state != TASK_STATUS__SCAN_HISTORY && taskType == STREAM_HISTORY_TASK) ||
932!
976
      (s.state != TASK_STATUS__READY && taskType == STREAM_RECALCUL_TASK) ||
932!
977
      (pTask->status.downstreamReady == 0)) {
932!
978
    tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not allowed/ready for scan-history data, quit", id,
×
979
            pMeta->vgId, s.name, pTask->status.downstreamReady);
980

981
    streamMutexUnlock(&pTask->lock);
×
982
    streamMetaReleaseTask(pMeta, pTask);
×
983
    return 0;
×
984
  }
985

986
  if (pTask->exec.pExecutor == NULL) {
932!
987
    tqError("s-task:%s vgId:%d executor is null, not executor scan history", id, pMeta->vgId);
×
988

989
    streamMutexUnlock(&pTask->lock);
×
990
    streamMetaReleaseTask(pMeta, pTask);
×
991
    return 0;
×
992
  }
993

994
  streamMutexUnlock(&pTask->lock);
932✔
995

996
  // avoid multi-thread exec
997
  while (1) {
×
998
    int32_t sentinel = atomic_val_compare_exchange_32(&pTask->status.inScanHistorySentinel, 0, 1);
932✔
999
    if (sentinel != 0) {
932!
1000
      tqDebug("s-task:%s already in scan-history func, wait for 100ms, and try again", id);
×
1001
      taosMsleep(100);
×
1002
    } else {
1003
      break;
932✔
1004
    }
1005
  }
1006

1007
  // let's decide which step should be executed now
1008
  if (pTask->execInfo.step1Start == 0) {
932✔
1009
    int64_t ts = taosGetTimestampMs();
893✔
1010
    pTask->execInfo.step1Start = ts;
893✔
1011
    tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
893✔
1012
  } else {
1013
    if (pTask->execInfo.step2Start == 0) {
39!
1014
      tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64 ", already elapsed:%.2fs",
39!
1015
              id, pTask->execInfo.step1Start, pTask->execInfo.step1El);
1016
    } else {
1017
      tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id,
×
1018
              pTask->execInfo.step2Start);
1019

1020
      atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
×
1021
      streamMetaReleaseTask(pMeta, pTask);
×
1022
      return 0;
×
1023
    }
1024
  }
1025

1026
  // we have to continue retrying to successfully execute the scan history task.
1027
  if (!streamTaskSetSchedStatusWait(pTask)) {
932!
1028
    tqError(
×
1029
        "s-task:%s failed to start scan-history in first stream time window since already started, unexpected "
1030
        "sched-status:%d",
1031
        id, pTask->status.schedStatus);
1032
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
×
1033
    streamMetaReleaseTask(pMeta, pTask);
×
1034
    return 0;
×
1035
  }
1036

1037
  int64_t              st = taosGetTimestampMs();
932✔
1038
  SScanhistoryDataInfo retInfo = streamScanHistoryData(pTask, st);
932✔
1039

1040
  double el = (taosGetTimestampMs() - st) / 1000.0;
932✔
1041
  pTask->execInfo.step1El += el;
932✔
1042

1043
  if (retInfo.ret == TASK_SCANHISTORY_QUIT || retInfo.ret == TASK_SCANHISTORY_REXEC) {
932✔
1044
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
40✔
1045
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
40✔
1046

1047
    if (retInfo.ret == TASK_SCANHISTORY_REXEC) {
40✔
1048
      streamExecScanHistoryInFuture(pTask, retInfo.idleTime);
39✔
1049
    } else {
1050
      SStreamTaskState p = streamTaskGetStatus(pTask);
1✔
1051
      ETaskStatus      localStatus = p.state;
1✔
1052

1053
      if (localStatus == TASK_STATUS__PAUSE) {
1!
1054
        tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", id, el,
×
1055
                pTask->execInfo.step1El, status);
1056
      } else if (localStatus == TASK_STATUS__STOP || localStatus == TASK_STATUS__DROPPING) {
1!
1057
        tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", id, p.name,
1!
1058
                pTask->execInfo.step1El);
1059
      }
1060
    }
1061

1062
    streamMetaReleaseTask(pMeta, pTask);
40✔
1063
    return 0;
40✔
1064
  }
1065

1066
  // the following procedure should be executed, no matter status is stop/pause or not
1067
  if (taskType == STREAM_HISTORY_TASK) {
892!
1068
    tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
892✔
1069
  } else if (taskType == STREAM_RECALCUL_TASK) {
×
1070
    tqDebug("s-task:%s recalculate ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
×
1071
  } else {
1072
    tqError("s-task:%s fill-history is disabled, unexpected", id);
×
1073
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1074
  }
1075

1076
  // 1. get the related stream task
1077
  code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask);
892✔
1078
  if (pStreamTask == NULL) {
892!
1079

1080
    int32_t ret = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
×
1081
    if (ret == 0 && pStreamTask != NULL) {
×
1082
      tqWarn("s-task:0x%" PRIx64 " stopped, not ready for related task:%s scan-history work, do nothing",
×
1083
             pTask->streamTaskId.taskId, pTask->id.idStr);
1084
      streamMetaReleaseTask(pMeta, pStreamTask);
×
1085
    } else {
1086
      tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
×
1087
              pTask->streamTaskId.taskId, pTask->id.idStr);
1088

1089
      tqDebug("s-task:%s fill-history task set status to be dropping", id);
×
1090
      code = streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
×
1091
    }
1092

1093
    atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
×
1094
    streamMetaReleaseTask(pMeta, pTask);
×
1095
    return code;
×
1096
  }
1097

1098
  if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
892!
1099
    tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel);
×
1100
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1101
  }
1102

1103
  if (taskType == STREAM_HISTORY_TASK) {
892!
1104
    code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
892✔
1105
  } else if (taskType == STREAM_RECALCUL_TASK) {
×
1106
    // send recalculate end block
1107
    code = streamCreateAddRecalculateEndBlock(pStreamTask);
×
1108
    if (code) {
×
1109
      tqError("s-task:%s failed to create-add recalculate end block, code:%s", id, tstrerror(code));
×
1110
    }
1111
    streamTaskSetSchedStatusInactive(pTask);
×
1112
  }
1113

1114
  streamMetaReleaseTask(pMeta, pStreamTask);
892✔
1115

1116
  atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
892✔
1117
  streamMetaReleaseTask(pMeta, pTask);
892✔
1118
  return code;
892✔
1119
}
1120

1121
// Decodes an SStreamTaskRunReq that follows the SMsgHead in pMsg and dispatches it.
// A decode failure is logged and reported as TSDB_CODE_SUCCESS. STREAM_EXEC_T_EXTRACT_WAL_DATA requests scan the
// WAL for all tasks via tqScanWal; every other request type is handed to tqStreamTaskProcessRunReq.
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamTaskRunReq req = {0};
  SDecoder          dec;
  char*             pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t           bodyLen = pMsg->contLen - sizeof(SMsgHead);

  tDecoderInit(&dec, (uint8_t*)pBody, bodyLen);
  int32_t code = tDecodeStreamTaskRunReq(&dec, &req);
  if (code < 0) {
    tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
    tDecoderClear(&dec);
    return TSDB_CODE_SUCCESS;
  }
  tDecoderClear(&dec);

  // extracted submit data from wal files for all tasks
  if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
    return tqScanWal(pTq);
  }

  code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
  if (code) {
    tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
  }
  return code;
}
1149

1150
// Forwards a data-dispatch request to this vnode's stream meta.
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessDispatchReq(pMeta, pMsg);
}
1153

1154
// Forwards a data-dispatch response to this vnode's stream meta.
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessDispatchRsp(pMeta, pMsg);
}
1157

1158
// Forwards a task-drop request (raw message and length) to this vnode's stream meta.
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessDropReq(pMeta, msg, msgLen);
}
1161

1162
// Forwards an update-checkpoint request to this vnode's stream meta together with the vnode's restored flag.
// msgLen is not forwarded.
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
  (void)msgLen;
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessUpdateCheckpointReq(pMeta, pTq->pVnode->restored, msg);
}
1165

1166
// Forwards a consensus-checkpoint-id request to this vnode's stream meta.
int32_t tqProcessTaskConsenChkptIdReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessConsenChkptIdReq(pMeta, pMsg);
}
1169

1170
// Forwards a task-pause request to this vnode's stream meta; only the message body is passed on.
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  (void)sversion;
  (void)msgLen;
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessTaskPauseReq(pMeta, msg);
}
1173

1174
// Forwards a task-resume request (with its WAL version) to the common resume handler; msgLen is not forwarded.
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  (void)msgLen;
  return tqStreamTaskProcessTaskResumeReq(pTq, sversion, msg, true);
}
1177

1178
// Forwards a retrieve request to this vnode's stream meta.
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessRetrieveReq(pMeta, pMsg);
}
1181

1182
// Retrieve responses are not processed here; the handler ignores its arguments and returns 0.
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
  (void)pTq;
  (void)pMsg;
  return 0;
}
1183

1184
// Handles a stream-progress query. The request is deserialized from the body after the SMsgHead, the stream's
// execution progress on this vnode is looked up via tqGetStreamExecInfo, and the reply is sent through tmsgSendRsp.
// The reply is a serialized SStreamProgressRsp placed after an SMsgHead-sized prefix.
//
// Returns TSDB_CODE_SUCCESS once the reply has been handed to tmsgSendRsp. Otherwise no reply is sent and the
// error code is returned.
//
// Fixes vs. the previous version:
//   - the response struct was serialized onto its own storage (source and destination overlapped);
//   - the buffer capacity passed to the encoder exceeded the space actually available after the header;
//   - a positive encoded length returned by the serializer was treated as failure;
//   - pointer arithmetic was done on the buffer before the NULL check.
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
  char*              msgBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t            msgLen = pMsg->contLen - sizeof(SMsgHead);
  int32_t            code = 0;
  int32_t            tlen = 0;
  int32_t            contLen = 0;
  char*              pRspBuf = NULL;
  SStreamProgressReq req = {0};
  SStreamProgressRsp rspData = {0};
  SRpcMsg            rsp = {0};

  code = tDeserializeStreamProgressReq(msgBody, msgLen, &req);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  code = tqGetStreamExecInfo(pTq->pVnode, req.streamId, &rspData.progressDelay, &rspData.fillHisFinished);
  if (code != TSDB_CODE_SUCCESS) {
    goto _OVER;
  }

  rspData.fetchIdx = req.fetchIdx;
  rspData.subFetchIdx = req.subFetchIdx;
  rspData.vgId = req.vgId;
  rspData.streamId = req.streamId;

  // A NULL buffer makes the encoder only compute the encoded length.
  tlen = tSerializeStreamProgressRsp(NULL, 0, &rspData);
  if (tlen < 0) {
    code = tlen;
    goto _OVER;
  }

  contLen = sizeof(SMsgHead) + tlen;
  // NOTE(review): this buffer is handed to tmsgSendRsp; confirm the transport does not require
  // rpcMallocCont-allocated memory for response payloads.
  pRspBuf = taosMemoryCalloc(1, contLen);
  if (pRspBuf == NULL) {
    code = terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  // Encode from the stack copy into the area after the header, so source and destination never overlap.
  tlen = tSerializeStreamProgressRsp(POINTER_SHIFT(pRspBuf, sizeof(SMsgHead)), tlen, &rspData);
  if (tlen < 0) {
    code = tlen;
    goto _OVER;
  }

  rsp.info = pMsg->info;
  rsp.code = 0;
  rsp.pCont = pRspBuf;
  rsp.contLen = contLen;
  pRspBuf = NULL;  // ownership moves with the response
  tmsgSendRsp(&rsp);
  code = TSDB_CODE_SUCCESS;

_OVER:
  if (pRspBuf) {
    taosMemoryFree(pRspBuf);
  }
  return code;
}
1225

1226
// always return success to mnode
1227
//todo: handle failure of build and send msg to mnode
1228
static void doSendChkptSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t code,
15✔
1229
                                 int32_t taskId) {
1230
  SRpcMsg rsp = {0};
15✔
1231
  int32_t ret = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &rsp, code);
15✔
1232
  if (ret) {  // suppress the error in build checkpoint source rsp
15!
1233
    tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", taskId, tstrerror(ret));
×
1234
  }
1235
  tmsgSendRsp(&rsp);  // error occurs
15✔
1236
}
15✔
1237

1238
// no matter what kinds of error happened, make sure the mnode will receive the success execution code.
1239
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
870✔
1240
  int32_t                    vgId = TD_VID(pTq->pVnode);
870✔
1241
  SStreamMeta*               pMeta = pTq->pStreamMeta;
870✔
1242
  char*                      msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
870✔
1243
  int32_t                    len = pMsg->contLen - sizeof(SMsgHead);
870✔
1244
  int32_t                    code = 0;
870✔
1245
  SStreamCheckpointSourceReq req = {0};
870✔
1246
  SDecoder                   decoder = {0};
870✔
1247
  SStreamTask*               pTask = NULL;
870✔
1248
  int64_t                    checkpointId = 0;
870✔
1249

1250
  // disable auto rsp to mnode
1251
  pRsp->info.handle = NULL;
870✔
1252

1253
  tDecoderInit(&decoder, (uint8_t*)msg, len);
870✔
1254
  if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
870!
1255
    code = TSDB_CODE_MSG_DECODE_ERROR;
×
1256
    tDecoderClear(&decoder);
×
1257
    tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
×
1258
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1259
    return TSDB_CODE_SUCCESS;  // always return success to mnode,
×
1260
  }
1261

1262
  tDecoderClear(&decoder);
870✔
1263

1264
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
870✔
1265
    tqDebug("vgId:%d not leader, ignore checkpoint-source msg, checkpontId:%" PRId64 ", s-task:0x%x", vgId,
15!
1266
            req.checkpointId, req.taskId);
1267
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
15✔
1268
    return TSDB_CODE_SUCCESS;  // always return success to mnode
15✔
1269
  }
1270

1271
  if (!pTq->pVnode->restored) {
855!
1272
    tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
×
1273
            ", transId:%d s-task:0x%x ignore it",
1274
            vgId, req.checkpointId, req.transId, req.taskId);
1275
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1276
    return TSDB_CODE_SUCCESS;  // always return success to mnode
×
1277
  }
1278

1279
  code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
855✔
1280
  if (pTask == NULL || code != 0) {
855!
1281
    tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
×
1282
            " transId:%d it may have been destroyed",
1283
            vgId, req.taskId, req.checkpointId, req.transId);
1284
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1285
    return TSDB_CODE_SUCCESS;
×
1286
  }
1287

1288
  if (pTask->status.downstreamReady != 1) {
855!
1289
    // record the latest failed checkpoint id
1290
    streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId);
×
1291
    tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
×
1292
            ", transId:%d set it failed",
1293
            pTask->id.idStr, req.checkpointId, req.transId);
1294

1295
    streamMetaReleaseTask(pMeta, pTask);
×
1296
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1297
    return TSDB_CODE_SUCCESS;  // todo retry handle error
×
1298
  }
1299

1300
  // todo save the checkpoint failed info
1301
  streamMutexLock(&pTask->lock);
855✔
1302
  ETaskStatus status = streamTaskGetStatus(pTask).state;
855✔
1303

1304
  if (req.mndTrigger == 1) {
855✔
1305
    if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
35!
1306
      tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure",
×
1307
              pTask->id.idStr, req.checkpointId);
1308

1309
      streamMutexUnlock(&pTask->lock);
×
1310
      streamMetaReleaseTask(pMeta, pTask);
×
1311
      doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1312
      return TSDB_CODE_SUCCESS;
×
1313
    }
1314
  } else {
1315
    if (status != TASK_STATUS__HALT) {
820!
1316
      tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
×
1317
      //      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
1318
    }
1319
  }
1320

1321
  // check if the checkpoint msg already sent or not.
1322
  if (status == TASK_STATUS__CK) {
855!
1323
    streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId);
×
1324

1325
    tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
×
1326
           " transId:%d already handled, ignore msg and continue process checkpoint",
1327
           pTask->id.idStr, checkpointId, req.transId);
1328

1329
    streamMutexUnlock(&pTask->lock);
×
1330
    streamMetaReleaseTask(pMeta, pTask);
×
1331
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SYN_PROPOSE_NOT_READY, req.taskId);
×
1332
    return TSDB_CODE_SUCCESS;
×
1333
  } else {  // checkpoint already finished, and not in checkpoint status
1334
    if (req.checkpointId <= pTask->chkInfo.checkpointId) {
855!
1335
      tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64
×
1336
             " transId:%d already handled, return success",
1337
             pTask->id.idStr, req.checkpointId, req.transId);
1338

1339
      streamMutexUnlock(&pTask->lock);
×
1340
      streamMetaReleaseTask(pMeta, pTask);
×
1341
      doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1342
      return TSDB_CODE_SUCCESS;
×
1343
    }
1344
  }
1345

1346
  code = streamProcessCheckpointSourceReq(pTask, &req);
855✔
1347
  streamMutexUnlock(&pTask->lock);
855✔
1348

1349
  if (code) {
855!
1350
    qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId,
×
1351
           tstrerror(code));
1352
    streamMetaReleaseTask(pMeta, pTask);
×
1353
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1354
    return TSDB_CODE_SUCCESS;
×
1355
  }
1356

1357
  if (req.mndTrigger) {
855✔
1358
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ",
35!
1359
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
1360
  } else {
1361
    const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask));
820✔
1362
    tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
819!
1363
           ", transId:%d after transfer-state, prev status:%s",
1364
           pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
1365
  }
1366

1367
  code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
855✔
1368
  if (code != TSDB_CODE_SUCCESS) {
855!
1369
    streamTaskSetCheckpointFailed(pTask);  // set the checkpoint failed
×
1370
    doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId);
×
1371
  }
1372

1373
  streamMetaReleaseTask(pMeta, pTask);
855✔
1374
  return TSDB_CODE_SUCCESS;
855✔
1375
}
1376

1377
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
1378
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
2,439✔
1379
  int32_t vgId = TD_VID(pTq->pVnode);
2,439✔
1380

1381
  if (!vnodeIsRoleLeader(pTq->pVnode)) {
2,439!
1382
    char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
×
1383
    int32_t  len = pMsg->contLen - sizeof(SMsgHead);
×
1384
    int32_t  code = 0;
×
1385
    SDecoder decoder;
1386

1387
    SStreamCheckpointReadyMsg req = {0};
×
1388
    tDecoderInit(&decoder, (uint8_t*)msg, len);
×
1389
    if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
×
1390
      code = TSDB_CODE_MSG_DECODE_ERROR;
×
1391
      tDecoderClear(&decoder);
×
1392
      return code;
×
1393
    }
1394
    tDecoderClear(&decoder);
×
1395

1396
    tqError("vgId:%d not leader, s-task:0x%x ignore the retrieve checkpoint-trigger msg from s-task:0x%x vgId:%d", vgId,
×
1397
            req.upstreamTaskId, req.downstreamTaskId, req.downstreamNodeId);
1398

1399
    return TSDB_CODE_STREAM_NOT_LEADER;
×
1400
  }
1401

1402
  return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg);
2,439✔
1403
}
1404

1405
// Forwards a task-update request to this vnode's stream meta. It passes the vnode's message callbacks and restored
// flag, and whether the stream meta's role is NODE_ROLE_LEADER.
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessUpdateReq(pMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored,
                                      (pMeta->role == NODE_ROLE_LEADER));
}
1409

1410
// Forwards a task-reset request (raw message content) to this vnode's stream meta.
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessTaskResetReq(pMeta, pMsg->pCont);
}
1413

1414
// Forwards a stop-all-tasks request, with the vnode's message callbacks, to this vnode's stream meta.
int32_t tqProcessAllTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessAllTaskStopReq(pMeta, &pTq->pVnode->msgCb, pMsg);
}
1417

1418
// Handles a retrieve-checkpoint-trigger request. On the leader it is forwarded to
// tqStreamTaskProcessRetrieveTriggerReq. On a follower the request is decoded only for logging, and
// TSDB_CODE_STREAM_NOT_LEADER is returned (TSDB_CODE_INVALID_MSG if decoding fails).
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
  int32_t vgId = TD_VID(pTq->pVnode);

  if (!vnodeIsRoleLeader(pTq->pVnode)) {
    SRetrieveChkptTriggerReq req = {0};

    char*    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
    int32_t  len = pMsg->contLen - sizeof(SMsgHead);
    SDecoder decoder = {0};

    tDecoderInit(&decoder, (uint8_t*)msg, len);
    if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
      tDecoderClear(&decoder);
      tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId);
      return TSDB_CODE_INVALID_MSG;
    }
    tDecoderClear(&decoder);

    // "0x%" PRId64 printed a decimal number behind a hex prefix; print the task id in hex like the rest of the file.
    tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%x", vgId,
            (int32_t)req.downstreamTaskId);
    return TSDB_CODE_STREAM_NOT_LEADER;
  }

  return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg);
}
1443

1444
// Forwards a retrieve-checkpoint-trigger response to this vnode's stream meta.
int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamTaskProcessRetrieveTriggerRsp(pMeta, pMsg);
}
1447

1448
// this function is needed, do not try to remove it.
1449
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); }
4,884✔
1450

1451
// Forwards a request-checkpoint response to this vnode's stream meta.
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamProcessReqCheckpointRsp(pMeta, pMsg);
}
1454

1455
// Forwards a checkpoint-ready response to this vnode's stream meta.
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamProcessCheckpointReadyRsp(pMeta, pMsg);
}
1458

1459
// Forwards a checkpoint-report response to this vnode's stream meta.
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
  SStreamMeta* pMeta = pTq->pStreamMeta;
  return tqStreamProcessChkptReportRsp(pMeta, pMsg);
}
1462

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc