• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.85
/source/libs/stream/src/streamCheckpoint.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "rsync.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tcs.h"
20

21
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
22
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
23
#ifdef BUILD_NO_CALL
24
static int32_t deleteCheckpoint(const char* id);
25
#endif
26
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
27
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
28
                                          int32_t transId, int32_t srcTaskId);
29
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
30
static void    checkpointTriggerMonitorFn(void* param, void* tmrId);
31

32
/**
 * Build a checkpoint / checkpoint-trigger queue item that carries a single marker data block.
 *
 * The marker block has type STREAM_CHECKPOINT, keeps the checkpointId in info.version, the transId in
 * both window keys, and the child id of pTask in info.childId. It is copied into a newly created block
 * array that belongs to the returned item.
 *
 * @param pTask           task the item is built for (task level and selfChildId are read)
 * @param checkpointType  value stored into the type field of the item
 * @param checkpointId    checkpoint id recorded in the marker block
 * @param transId         transaction id recorded in the marker block
 * @param srcTaskId       upstream task id; must be > 0 when a checkpoint-trigger is built for a
 *                        non-source task, it is not used otherwise
 * @param pRes            receives the new item on success, left untouched on failure
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for an invalid srcTaskId, or the error code of the
 *         failed allocation
 */
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                int32_t srcTaskId, SStreamDataBlock** pRes) {
  bool needSrcTaskId =
      (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER) && (pTask->info.taskLevel != TASK_LEVEL__SOURCE);

  // validate the input before anything is allocated, so a rejected request can not leak the queue item
  if (needSrcTaskId && srcTaskId <= 0) {
    stDebug("s-task:%s invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId);
    return TSDB_CODE_INVALID_PARA;
  }

  SStreamDataBlock* pChkpoint = NULL;
  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint);
  if (code) {
    return code;
  }

  pChkpoint->type = checkpointType;
  if (needSrcTaskId) {
    pChkpoint->srcTaskId = srcTaskId;
  }

  // the array keeps its own copy of the pushed element, a zeroed stack block is therefore sufficient
  SSDataBlock block = {0};
  block.info.type = STREAM_CHECKPOINT;
  block.info.version = checkpointId;
  block.info.window.ekey = block.info.window.skey = transId;  // NOTE: set the transId
  block.info.rows = 1;
  block.info.childId = pTask->info.selfChildId;

  pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));
  if (pChkpoint->blocks == NULL) {
    code = terrno;  // read the error code before the cleanup calls get a chance to change it
    taosFreeQitem(pChkpoint);
    return code;
  }

  if (taosArrayPush(pChkpoint->blocks, &block) == NULL) {
    code = terrno;
    taosArrayDestroy(pChkpoint->blocks);
    taosFreeQitem(pChkpoint);
    return code;
  }

  *pRes = pChkpoint;
  return TSDB_CODE_SUCCESS;
}
81

82
// this message must be put into inputq successfully, continue retrying until it succeeds
83
// todo must be success
84
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
6,827✔
85
                                   int32_t srcTaskId) {
86
  SStreamDataBlock* pCheckpoint = NULL;
6,827✔
87
  int32_t code = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId, &pCheckpoint);
6,827✔
88
  if (code != TSDB_CODE_SUCCESS) {
6,831!
89
    return code;
×
90
  }
91

92
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
6,831!
93
    return TSDB_CODE_OUT_OF_MEMORY;
×
94
  }
95

96
  return streamTrySchedExec(pTask, true);
6,822✔
97
}
98

99
/**
 * Handle a checkpoint-source request: only valid for source tasks.
 *
 * Fires TASK_EVENT_GEN_CHECKPOINT on the task state machine, records the requested checkpointId/transId as
 * the active checkpoint, stamps the start time, and finally appends a checkpoint-trigger item to the inputQ.
 *
 * @return TSDB_CODE_INVALID_MSG for a non-source task, the event handling error code, or the result of
 *         appendCheckpointIntoInputQ
 */
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return TSDB_CODE_INVALID_MSG;
  }

  // todo this status may not be set here.
  // step 1: set task status to be prepared for check point, no data are allowed to put into inputQ.
  int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
  if (ret != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr);
    return ret;
  }

  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->transId = pReq->transId;
  pActive->activeId = pReq->checkpointId;

  pTask->chkInfo.startTs = taosGetTimestampMs();
  pTask->execInfo.checkpoint += 1;

  // step 2: put the checkpoint block into inputQ, to make sure all blocks with less version have been handled
  // by this task and this is the last item in the inputQ.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
}
121

122
/**
 * Handle the response of a retrieve-checkpoint-trigger request sent to an upstream task.
 *
 * The response is accepted only when this task is not a source task, the task is in TASK_STATUS__CK, and the
 * checkpointId/transId in the response match the active checkpoint. An accepted response is turned into a
 * checkpoint-trigger item that is appended to the inputQ on behalf of the upstream task.
 *
 * @param pTask  task that receives the response
 * @param pRsp   decoded response message
 * @return TSDB_CODE_INVALID_MSG for a source task; TSDB_CODE_SUCCESS when the upstream reported a failure
 *         (the response is only logged); TSDB_CODE_STREAM_TASK_IVLD_STATUS when the task status or the
 *         checkpointId/transId do not qualify; otherwise the result of appendCheckpointIntoInputQ
 */
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  const char*            id = pTask->id.idStr;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId,
            tstrerror(pRsp->rspCode));
    return TSDB_CODE_SUCCESS;
  }

  streamMutexLock(&pTask->lock);
  SStreamTaskState status = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (status.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // take a snapshot of the active checkpoint under the lock, so the values compared are the values logged
  streamMutexLock(&pInfo->lock);
  int64_t activeId = pInfo->activeId;
  int32_t activeTransId = pInfo->transId;
  streamMutexUnlock(&pInfo->lock);

  if (activeId != pRsp->checkpointId || activeTransId != pRsp->transId) {
    // the task is in checkpoint status here; the rsp is rejected because it belongs to another checkpoint
    stError("s-task:%s status:%s checkpoint-trigger rsp checkpointId:%" PRId64
            " transId:%d not match the active checkpointId:%" PRId64 " transId:%d, discard the checkpoint-trigger msg",
            id, status.name, (int64_t)pRsp->checkpointId, (int32_t)pRsp->transId, activeId, activeTransId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
                                    pRsp->upstreamTaskId);
}
161

162
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
1✔
163
                                           SRpcHandleInfo* pRpcInfo, int32_t code) {
164
  int32_t  ret = 0;
1✔
165
  int32_t  tlen = 0;
1✔
166
  void*    buf = NULL;
1✔
167
  SEncoder encoder;
168

169
  SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId,
1✔
170
                               .upstreamTaskId = pTask->id.taskId,
1✔
171
                               .taskId = dstTaskId,
172
                               .rspCode = code};
173

174
  if (code == TSDB_CODE_SUCCESS) {
1!
175
    req.checkpointId = pTask->chkInfo.pActiveInfo->activeId;
1✔
176
    req.transId = pTask->chkInfo.pActiveInfo->transId;
1✔
177
  } else {
178
    req.checkpointId = -1;
×
179
    req.transId = -1;
×
180
  }
181

182
  tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret);
1!
183
  if (ret < 0) {
1!
184
    stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code));
×
185
    return ret;
×
186
  }
187

188
  buf = rpcMallocCont(tlen + sizeof(SMsgHead));
1✔
189
  if (buf == NULL) {
1!
190
    stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId);
×
191
    return terrno;
×
192
  }
193

194
  ((SMsgHead*)buf)->vgId = htonl(downstreamNodeId);
1✔
195
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
1✔
196

197
  tEncoderInit(&encoder, abuf, tlen);
1✔
198
  if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) {
1!
199
    rpcFreeCont(buf);
×
200
    tEncoderClear(&encoder);
×
201
    stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code));
×
202
    return ret;
×
203
  }
204
  tEncoderClear(&encoder);
1✔
205

206
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo};
1✔
207
  tmsgSendRsp(&rspMsg);
1✔
208

209
  return ret;
1✔
210
}
211

212
// Stamp the checkpoint-trigger block with the id and vgId of this task, write it into the outputQ, and start
// dispatching. Nothing is written if the trigger has already been dispatched (0 is returned in that case).
// If the write into the outputQ fails, the block is freed and the error code is returned.
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
    stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
    return 0;
  }

  int32_t ret = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (ret != 0) {
    stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(ret));
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return ret;
  }

  return streamDispatchStreamBlock(pTask);
}
231

232
/**
 * Decide whether a received checkpoint-trigger block should be handled or discarded.
 *
 * The block is discarded (TSDB_CODE_STREAM_INVLD_CHKPT) when
 *  - its checkpointId is older than the checkpointId recorded in the task,
 *  - its checkpointId is not larger than the failed checkpointId,
 *  - its checkpointId equals the recorded checkpointId; in that case a checkpoint-ready msg is sent to the
 *    upstream task first,
 *  - the task is in TASK_STATUS__CK and the block does not match the active checkpoint, all upstream triggers
 *    were received already, or (sink/agg tasks) the same upstream task has sent the trigger before.
 *
 * @return TSDB_CODE_SUCCESS if the block should be handled; TSDB_CODE_STREAM_INVLD_CHKPT if discarded;
 *         TSDB_CODE_STREAM_TASK_NOT_EXIST if the upstream info is not found; TSDB_CODE_INVALID_PARA if an
 *         entry of the ready msg list can not be fetched
 */
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
                                        int32_t transId) {
  const char*            id = pTask->id.idStr;
  int32_t                vgId = pTask->pMeta->vgId;
  SCheckpointInfo*       pChkInfo = &pTask->chkInfo;
  SActiveCheckpointInfo* pActiveInfo = pChkInfo->pActiveInfo;

  // expired trigger
  if (pChkInfo->checkpointId > checkpointId) {
    stError("s-task:%s vgId:%d current checkpointId:%" PRId64
            " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
            id, vgId, pChkInfo->checkpointId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // trigger of a checkpoint that has been marked as failed
  if (pActiveInfo->failedId >= checkpointId) {
    stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
            " discard the checkpoint-trigger block",
            id, vgId, checkpointId, transId, pActiveInfo->failedId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // trigger of the checkpoint that is already recorded: send checkpoint-ready msg to upstream
  if (pChkInfo->checkpointId == checkpointId) {
    SRpcMsg                msg = {0};
    SStreamUpstreamEpInfo* pUpstream = NULL;

    streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pUpstream);
    if (pUpstream == NULL) {
      return TSDB_CODE_STREAM_TASK_NOT_EXIST;
    }

    int32_t code =
        initCheckpointReadyMsg(pTask, pUpstream->nodeId, pBlock->srcTaskId, pUpstream->childId, checkpointId, &msg);
    if (code == TSDB_CODE_SUCCESS) {
      code = tmsgSendReq(&pUpstream->epSet, &msg);
      if (code) {
        stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(code));
      }
    }

    stWarn(
        "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
        "the interrupted checkpoint",
        id, vgId, pBlock->srcTaskId);

    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // the remaining checks only apply to a task that is already doing checkpoint
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    return TSDB_CODE_SUCCESS;
  }

  if (pActiveInfo->activeId != checkpointId) {
    stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
            " discard",
            id, vgId, pActiveInfo->activeId, checkpointId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // from here on: checkpointId == pActiveInfo->activeId
  if (pActiveInfo->allUpstreamTriggerRecv == 1) {
    stDebug(
        "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
        "checkpointId:%" PRId64 " transId:%d",
        id, vgId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  int32_t taskLevel = pTask->info.taskLevel;
  if (taskLevel != TASK_LEVEL__SINK && taskLevel != TASK_LEVEL__AGG) {
    return TSDB_CODE_SUCCESS;
  }

  // check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it
  int32_t numOfReady = (int32_t)taosArrayGetSize(pActiveInfo->pReadyMsgList);
  for (int32_t k = 0; k < numOfReady; ++k) {
    STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, k);
    if (pReady == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    if (pReady->upstreamTaskId != pBlock->srcTaskId) {
      continue;
    }

    stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64
           ", prev recvTs:%" PRId64 " discard",
           pTask->id.idStr, pReady->upstreamTaskId, pReady->upstreamNodeId, pReady->checkpointId, pReady->recvTs);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  return TSDB_CODE_SUCCESS;
}
316

317
/**
 * Process a checkpoint-trigger block taken from the inputQ.
 *
 * Steps:
 *  1. read checkpointId (info.version) and transId (info.window.skey) from the first data block;
 *  2. run doCheckBeforeHandleChkptTrigger under the task lock; a rejected block is freed and, for non-source
 *     tasks, the inputQ of the sending upstream task is re-opened;
 *  3. if the task is not in TASK_STATUS__CK yet, record the active checkpoint, fire TASK_EVENT_GEN_CHECKPOINT
 *     and launch the checkpoint-trigger monitor timer (unless one is still active);
 *  4. source tasks flush the executor state and either dispatch the trigger downstream or, when there is no
 *     dispatch output, append a checkpoint item to their own inputQ;
 *     sink/agg tasks register a checkpoint-ready msg for the upstream task and, once all upstream triggers
 *     have arrived, build the checkpoint (sink) or flush and forward the trigger (agg).
 *
 * Except for the early "no data block" return, every path below frees pBlock or passes it on to
 * continueDispatchCheckpointTriggerBlock.
 *
 * @param pTask   task that handles the trigger
 * @param pBlock  checkpoint-trigger queue item
 * @return TSDB_CODE_SUCCESS or the error code of the failed step
 */
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int64_t                checkpointId = 0;
  int32_t                transId = 0;
  const char*            id = pTask->id.idStr;
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                taskLevel = pTask->info.taskLevel;
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
  if (pDataBlock == NULL) {
    // NOTE(review): pBlock is not released on this path, unlike the other failure paths - confirm with caller
    return TSDB_CODE_INVALID_PARA;
  }

  checkpointId = pDataBlock->info.version;
  transId = pDataBlock->info.window.skey;

  streamMutexLock(&pTask->lock);
  code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
  streamMutexUnlock(&pTask->lock);
  if (code) {
    if (taskLevel != TASK_LEVEL__SOURCE) {  // the checkpoint-trigger is discard, open the inputQ for upstream tasks
      streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
    }
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
          ", transId:%d current active checkpointId:%" PRId64,
          id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);

  // set task status
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    pActiveInfo->activeId = checkpointId;
    pActiveInfo->transId = transId;

    if (pTask->chkInfo.startTs == 0) {
      pTask->chkInfo.startTs = taosGetTimestampMs();
      pTask->execInfo.checkpoint += 1;
    }

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    // if previous launched timer not started yet, not start a new timer
    // todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch
    //  a new checkpoint-trigger timer right now.
    //  And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
    //  procedure to be stucked.
    SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
    int8_t          old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
    if (old == 0) {
      stDebug("s-task:%s start checkpoint-trigger monitor in 10s", pTask->id.idStr);

      int64_t* pTaskRefId = NULL;
      code = streamTaskAllocRefId(pTask, &pTaskRefId);
      if (code == 0) {
        streamTmrStart(checkpointTriggerMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId,
                       "trigger-recv-monitor");
        pTmrInfo->launchChkptId = pActiveInfo->activeId;
      }
    } else {  // already launched, do nothing
      stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
    }
  }

#if 0
  taosMsleep(20*1000);
#endif

  if (taskLevel == TASK_LEVEL__SOURCE) {
    int8_t type = pTask->outputInfo.type;
    pActiveInfo->allUpstreamTriggerRecv = 1;

    // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
    // The transfer of state may generate new data that need to dispatch to downstream tasks,
    // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
    // before the next checkpoint.
    code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
    if (code) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

#if 0
    chkptFailedByRetrieveReqToSource(pTask, checkpointId);
#endif

    if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH ||
        type == TASK_OUTPUT__VTABLE_MAP) {
      stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);  // todo handle this failure
    } else {  // only one task exists, no need to dispatch downstream info
      code =
          appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
    // todo: handle this
    // update the child Id for downstream tasks
    code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);

    // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
    if (pActiveInfo->allUpstreamTriggerRecv != 1) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    int32_t num = streamTaskGetNumOfUpstream(pTask);
    if (taskLevel == TASK_LEVEL__SINK) {
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
      streamFreeQitem((SStreamQueueItem*)pBlock);
      code = streamTaskBuildCheckpoint(pTask);  // todo: not handle error yet
    } else {                                    // source & agg tasks need to forward the checkpoint msg downwards
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
      code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
      if (code) {
        // release the trigger block on failure, exactly as the source-task branch above does
        streamFreeQitem((SStreamQueueItem*)pBlock);
        return code;
      }

      // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
      // this task already. And then, dispatch check point msg to all downstream tasks
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
    }
  }

  return code;
}
450

451
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
452
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
8,492✔
453
                                          int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
454
                                          const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) {
455
  *alreadyRecv = false;
8,492✔
456
  int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
8,492✔
457
  for (int32_t i = 0; i < size; ++i) {
17,069✔
458
    STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
8,577✔
459
    if (p == NULL) {
8,577!
460
      return TSDB_CODE_INVALID_PARA;
×
461
    }
462

463
    if (p->downstreamTaskId == downstreamTaskId) {
8,577!
464
      (*alreadyRecv) = true;
×
465
      break;
×
466
    }
467
  }
468

469
  if (*alreadyRecv) {
8,492!
470
    stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
×
471
            downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
472
            numOfDownstream);
473
  } else {
474
    STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
8,492✔
475
                                     .downstreamTaskId = downstreamTaskId,
476
                                     .checkpointId = pInfo->activeId,
8,492✔
477
                                     .transId = pInfo->transId,
8,492✔
478
                                     .streamId = streamId,
479
                                     .downstreamNodeId = downstreamNodeId};
480
    void*                    p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
8,492✔
481
    if (p == NULL) {
8,492!
482
      stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
×
483
      return terrno;
×
484
    }
485
  }
486

487
  *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
8,492✔
488
  *pTransId = pInfo->transId;
8,492✔
489
  return 0;
8,492✔
490
}
491

492
/**
493
 * All down stream tasks have successfully completed the check point task.
494
 * Current stream task is allowed to start to do checkpoint things in ASYNC model.
495
 */
496
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
8,492✔
497
                                        int32_t downstreamTaskId) {
498
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
8,492✔
499

500
  const char* id = pTask->id.idStr;
8,492✔
501
  int32_t     total = streamTaskGetNumOfDownstream(pTask);
8,492✔
502
  int32_t     code = 0;
8,492✔
503
  int32_t     notReady = 0;
8,492✔
504
  int32_t     transId = 0;
8,492✔
505
  bool        alreadyHandled = false;
8,492✔
506

507
  // 1. not in checkpoint status now
508
  SStreamTaskState pStat = streamTaskGetStatus(pTask);
8,492✔
509
  if (pStat.state != TASK_STATUS__CK) {
8,492!
510
    stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
×
511
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
512
  }
513

514
  // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
515
  if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
8,492!
516
    stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
×
517
            ") from task:0x%x, expired and discard",
518
            id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
519
    return TSDB_CODE_INVALID_MSG;
×
520
  }
521

522
  streamMutexLock(&pInfo->lock);
8,492✔
523
  code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, &notReady,
8,492✔
524
                                    &transId, &alreadyHandled);
525
  streamMutexUnlock(&pInfo->lock);
8,492✔
526

527
  if (alreadyHandled) {
8,492!
528
    stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again",
×
529
            id, checkpointId, downstreamTaskId);
530
  } else {
531
    if ((notReady == 0) && (code == 0) && (!alreadyHandled)) {
8,492!
532
      stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
3,290✔
533
      code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
3,290✔
534
    }
535
  }
536

537
  return code;
8,492✔
538
}
539

540
/**
 * Handle the confirmation that a checkpoint-ready msg has reached an upstream task.
 *
 * Under the lock of the active checkpoint info, the first entry of pReadyMsgList that matches both the
 * upstream task id and the checkpointId is flagged as sendCompleted; afterwards the number of confirmed
 * entries is counted for the debug log.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  int64_t                now = taosGetTimestampMs();
  int32_t                numOfConfirmed = 0;

  streamMutexLock(&pInfo->lock);

  // the list is not modified while the lock is held, so its size is read only once
  int32_t numOfMsgs = (int32_t)taosArrayGetSize(pInfo->pReadyMsgList);

  // pass 1: flag the entry this confirmation belongs to
  for (int32_t k = 0; k < numOfMsgs; ++k) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pInfo->pReadyMsgList, k);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, k);
      continue;
    }

    if (pEntry->upstreamTaskId != upstreamTaskId || pEntry->checkpointId != checkpointId) {
      continue;
    }

    pEntry->sendCompleted = 1;
    stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
            pTask->id.idStr, upstreamTaskId, checkpointId, now);
    break;
  }

  // pass 2: count all entries that are confirmed by now
  for (int32_t k = 0; k < numOfMsgs; ++k) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pInfo->pReadyMsgList, k);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, k);
      continue;
    }

    if (pEntry->sendCompleted == 1) {
      numOfConfirmed += 1;
    }
  }

  stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr,
          numOfConfirmed, checkpointId);

  streamMutexUnlock(&pInfo->lock);
  return TSDB_CODE_SUCCESS;
}
581

582
// Reset the checkpoint bookkeeping of the task: clear the recorded start time, re-open the inputQ for all
// upstream tasks, and clear the active checkpoint info under its lock. The checkpoint-ready msgs are cleared
// as well if clearChkpReadyMsg is true.
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  // clear the recorded start time
  pTask->chkInfo.startTs = 0;

  // open inputQ for all upstream tasks
  streamTaskOpenAllUpstreamInput(pTask);

  streamMutexLock(&pActive->lock);
  {
    streamTaskClearActiveInfo(pActive);
    if (clearChkpReadyMsg) {
      streamClearChkptReadyMsg(pActive);
    }
  }
  streamMutexUnlock(&pActive->lock);

  stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
          pTask->id.idStr, pActive->failedId, pTask->chkInfo.checkpointId);
}
5,736✔
598

599
// The checkpointInfo can be updated in the following three cases:
600
// 1. follower tasks; 2. leader task with status of TASK_STATUS__CK; 3. restore not completed
601
static int32_t doUpdateCheckpointInfoCheck(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq,
5,825✔
602
                                           bool* pContinue) {
603
  SStreamMeta*     pMeta = pTask->pMeta;
5,825✔
604
  int32_t          vgId = pMeta->vgId;
5,825✔
605
  int32_t          code = 0;
5,825✔
606
  const char*      id = pTask->id.idStr;
5,825✔
607
  SCheckpointInfo* pInfo = &pTask->chkInfo;
5,825✔
608

609
  *pContinue = true;
5,825✔
610

611
  // not update the checkpoint info if the checkpointId is less than the failed checkpointId
612
  if (pReq->checkpointId < pInfo->pActiveInfo->failedId) {
5,825!
613
    stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64
×
614
           " is less than the failed checkpointId:%" PRId64 ", discard",
615
           id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId);
616

617
    *pContinue = false;
×
618
    return TSDB_CODE_SUCCESS;
×
619
  }
620

621
  // it's an expired checkpointInfo update msg, we still try to drop the required drop fill-history task.
622
  if (pReq->checkpointId <= pInfo->checkpointId) {
5,825✔
623
    stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
96!
624
            " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
625
            id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
626
            pReq->transId);
627

628
    { // destroy the related fill-history tasks
629
      if (pReq->dropRelHTask) {
96!
630
        if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
×
UNCOV
631
          code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
×
632

633
          int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
×
UNCOV
634
          stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
×
635
                  id, vgId, pReq->taskId, numOfTasks);
636

637
          // todo: task may not exist, commit anyway, optimize this later
UNCOV
638
          code = streamMetaCommit(pMeta);
×
639
        }
640
      }
641
    }
642

643
    *pContinue = false;
96✔
644
    // always return true
645
    return TSDB_CODE_SUCCESS;
96✔
646
  }
647

648
  SStreamTaskState status = streamTaskGetStatus(pTask);
5,729✔
649

650
  if (!restored) {  // during restore procedure, do update checkpoint-info
5,735✔
651
    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
6!
652
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
653
            id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
654
            pInfo->checkpointTime, pReq->checkpointTs);
655
  } else {  // not in restore status, must be in checkpoint status
656
    if (((status.state == TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) ||
5,729!
657
        (pMeta->role == NODE_ROLE_FOLLOWER)) {
27!
658
      stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64
5,729✔
659
              "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
660
              id, vgId, status.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
661
              pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
662
    } else {
UNCOV
663
      stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
×
664
              " checkpointVer:%" PRId64 "->%" PRId64,
665
              id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
666
              pReq->checkpointVer);
667
    }
668
  }
669

670
  bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
11,461!
671
                pInfo->processedVer <= pReq->checkpointVer);
5,729!
672

673
  if (!valid) {
5,732!
674
    // invalid update checkpoint info for leader, since the processedVer is greater than the checkpointVer
675
    // It is possible for follower tasks that the processedVer is greater than the checkpointVer, and the processed info
676
    // in follower tasks will be discarded, since the leader/follower switch happens before the checkpoint of the
677
    // processedVer being generated.
678
    if (pMeta->role == NODE_ROLE_LEADER) {
×
679

UNCOV
680
      stFatal("s-task:%s checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
×
681
              " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it",
682
              id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
683
              pReq->checkpointVer);
684

UNCOV
685
      *pContinue = false;
×
686
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
687
    } else {
UNCOV
688
      stInfo("s-task:%s vgId:%d follower recv checkpointId update info, current checkpointId:%" PRId64
×
689
             " checkpointVer:%" PRId64 " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64,
690
             id, pMeta->vgId, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
691
             pReq->checkpointVer);
692
    }
693
  }
694

695
  return TSDB_CODE_SUCCESS;
5,722✔
696
}
697

698
/**
 * Apply an update-checkpoint-info request to a task and persist the result.
 *
 * The request is validated by doUpdateCheckpointInfoCheck() first. When accepted, the checkpoint
 * id/version/time are recorded, the active checkpoint info is cleared, the task is saved in the meta
 * store, and the related fill-history task is either dropped or converted into a recalculate task.
 *
 * Failures of the individual steps are logged. Apart from the code produced by the validation step,
 * the function always returns TSDB_CODE_SUCCESS, as it did before.
 *
 * @param pTask    the task to update
 * @param restored false while the restore procedure is still running
 * @param pReq     the update-checkpoint-info request
 * @return the validation code when the request is rejected, otherwise TSDB_CODE_SUCCESS
 */
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
  SStreamMeta*     pMeta = pTask->pMeta;
  int32_t          vgId = pMeta->vgId;
  int32_t          code = 0;
  const char*      id = pTask->id.idStr;
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  bool             continueUpdate = true;

  streamMutexLock(&pTask->lock);
  code = doUpdateCheckpointInfoCheck(pTask, restored, pReq, &continueUpdate);

  if (!continueUpdate) {
    streamMutexUnlock(&pTask->lock);
    return code;
  }

  SStreamTaskState pStatus = streamTaskGetStatus(pTask);

  // update only it is in checkpoint status, or during restore procedure.
  if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
    pInfo->checkpointId = pReq->checkpointId;
    pInfo->checkpointVer = pReq->checkpointVer;
    pInfo->checkpointTime = pReq->checkpointTs;

    if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
      code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
      if (code != TSDB_CODE_SUCCESS) {
        // this code is overwritten below, so report it here instead of losing it
        stError("s-task:%s vgId:%d failed to handle checkpoint-done event, checkpointId:%" PRId64 ", code:%s", id,
                vgId, pReq->checkpointId, tstrerror(code));
      }
    }
  }

  streamTaskClearCheckInfo(pTask, true);

  if (pReq->dropRelHTask) {
    if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
      stInfo("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
              pReq->taskId, vgId, pReq->hTaskId);
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
    } else {
      stInfo("s-task:0x%x vgId:%d update the related fill-history task:0x%" PRIx64" to be recalculate task",
             pReq->taskId, vgId, pReq->hTaskId);
    }
  }

  stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
          streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask).name);

  pTask->status.taskStatus = TASK_STATUS__READY;

  code = streamMetaSaveTaskInMeta(pMeta, pTask);
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
            vgId, pReq->checkpointId, tstrerror(code));
    return TSDB_CODE_SUCCESS;
  }

  // drop task should not in the meta-lock, and drop the related fill-history task now
  if (pReq->dropRelHTask) {
    if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
      code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
      if (code != TSDB_CODE_SUCCESS) {
        stWarn("s-task:%s vgId:%d failed to unregister related fill-history task:0x%x, code:%s", id, vgId,
               (int32_t)pReq->hTaskId, tstrerror(code));
      }

      int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
      stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
              (int32_t)pReq->hTaskId, numOfTasks);
    } else {
      STaskId hTaskId = {.streamId = pReq->hStreamId, .taskId = pReq->hTaskId};

      SStreamTask* pHTask = NULL;
      code = streamMetaAcquireTaskUnsafe(pMeta, &hTaskId, &pHTask);
      if (code == 0 && pHTask != NULL) {
        stInfo("s-task:0x%x fill-history updated to recalculate task, reset step2Start ts, stream task:0x%x",
               (int32_t)hTaskId.taskId, pReq->taskId);

        streamMutexLock(&pHTask->lock);

        pHTask->info.fillHistory = STREAM_RECALCUL_TASK;
        pHTask->execInfo.step2Start = 0; // clear the step2start timestamp

        SStreamTaskState status = streamTaskGetStatus(pHTask);
        if (status.state == TASK_STATUS__SCAN_HISTORY) {
          code = streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
          if (code != TSDB_CODE_SUCCESS) {
            stError("s-task:0x%x vgId:%d failed to handle scan-history done event, code:%s",
                    (int32_t)hTaskId.taskId, vgId, tstrerror(code));
          }
        }

        // release the backend state and the executor before the task is expanded again
        if (pHTask->pBackend != NULL) {
          streamFreeTaskState(pHTask, TASK_STATUS__READY);
          pHTask->pBackend = NULL;
        }

        if (pHTask->exec.pExecutor != NULL) {
          qDestroyTask(pHTask->exec.pExecutor);
          pHTask->exec.pExecutor = NULL;
        }

        pMeta->expandTaskFn(pHTask);

        streamMutexUnlock(&pHTask->lock);

        code = streamMetaSaveTaskInMeta(pMeta, pHTask);
        if (code != TSDB_CODE_SUCCESS) {
          stError("s-task:0x%x vgId:%d failed to save recalculate task in meta, code:%s", (int32_t)hTaskId.taskId,
                  vgId, tstrerror(code));
        }

        streamMetaReleaseTask(pMeta, pHTask);
      } else {
        stWarn("s-task:%s vgId:%d failed to acquire related fill-history task:0x%x, not updated to recalculate task",
               id, vgId, (int32_t)hTaskId.taskId);
      }
    }
  }

  code = streamMetaCommit(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to commit meta after update checkpoint info, checkpointId:%" PRId64
            ", code:%s",
            id, vgId, pReq->checkpointId, tstrerror(code));
  }

  // the result of the commit is reported in the log only; callers always receive success here
  return TSDB_CODE_SUCCESS;
}
803

804
/**
 * Record the id of a failed checkpoint in the active checkpoint info of the task.
 *
 * The stored failed id only moves forward: a non-positive id, or an id that is not greater than the
 * currently stored one, is ignored (and logged).
 *
 * @param pTask    the task whose active checkpoint info is updated
 * @param failedId the checkpoint id that failed
 */
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
  struct SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*                   idStr = pTask->id.idStr;

  // non-positive id: nothing to record
  if (failedId <= 0) {
    stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
           " activeId:%" PRId64,
           idStr, pActive->failedId, pActive->activeId);
    return;
  }

  // not newer than the recorded one: keep the recorded one
  if (failedId <= pActive->failedId) {
    stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, idStr, pActive->failedId, failedId);
    return;
  }

  stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
          " prev failedId:%" PRId64,
          idStr, failedId, pActive->transId, pActive->activeId, pActive->failedId);
  pActive->failedId = failedId;
}
4✔
822

823
/**
 * Mark the currently active checkpoint of the task as failed.
 *
 * Only has an effect when the task is in TASK_STATUS__CK; the check and the update are both done
 * while holding the task lock.
 *
 * @param pTask the task whose active checkpoint is marked as failed
 */
void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  SStreamTaskState cur = streamTaskGetStatus(pTask);
  bool             inCheckpoint = (cur.state == TASK_STATUS__CK);
  if (inCheckpoint) {
    int64_t activeChkptId = pTask->chkInfo.pActiveInfo->activeId;
    streamTaskSetFailedCheckpointId(pTask, activeChkptId);
  }

  streamMutexUnlock(&pTask->lock);
}
560✔
831

832
/**
 * Download the remote "META" file of a checkpoint into "<path>/META_TMP" and collect from it the
 * files that should be deleted.
 *
 * @param id   task id string, used for logging and for locating the remote data
 * @param path local checkpoint directory; must not be NULL
 * @param list array that remoteChkpGetDelFile() fills with the files to delete
 * @return terrno when the path buffer cannot be allocated, TSDB_CODE_OUT_OF_RANGE when the path does
 *         not fit, the download error code when the download fails, otherwise 0. A failure to parse
 *         the downloaded meta file is logged only and still yields 0 (unchanged behaviour).
 */
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
  int32_t code = 0;
  int32_t cap = strlen(path) + 64;  // room for the directory separator and the "META_TMP" suffix

  char* filePath = taosMemoryCalloc(1, cap);
  if (filePath == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(filePath);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  code = downloadCheckpointDataByName(id, "META", filePath);
  if (code != 0) {
    stError("%s chkp failed to download meta file:%s", id, filePath);
    taosMemoryFree(filePath);
    return code;
  }

  code = remoteChkpGetDelFile(filePath, list);
  if (code != 0) {
    stError("%s chkp failed to get to del:%s", id, filePath);
  }

  // the path buffer is owned by this function: release it on the success path as well (was leaked)
  taosMemoryFree(filePath);
  return 0;
}
861

862
/**
 * Generate the upload data of a checkpoint, upload it to the remote storage and remove the files
 * that became redundant.
 *
 * @param pTask        the task that owns the checkpoint
 * @param checkpointId id of the checkpoint to upload
 * @param dbRefId      not used by this function
 * @param type         backup type; for DATA_UPLOAD_S3 the remote meta file is consulted as well
 * @return the code of the last step that was executed (terrno when the file list cannot be allocated)
 */
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
  const char*  idStr = pTask->id.idStr;
  SStreamMeta* pMeta = pTask->pMeta;
  int64_t      startMs = taosGetTimestampMs();
  char*        localDir = NULL;
  int64_t      sizeInBytes = 0;
  int32_t      code = 0;

  SArray* pDelList = taosArrayInit(4, POINTER_BYTES);
  if (pDelList == NULL) {
    stError("s-task:%s failed to prepare array list during upload checkpoint, code:%s", pTask->id.idStr,
            tstrerror(terrno));
    return terrno;
  }

  // step 1: generate the data to upload
  code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &localDir, pDelList,
                                 pTask->id.idStr);
  if (code != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
  }

  // step 2: for S3, collect the to-delete files recorded in the remote meta file
  if (code == TSDB_CODE_SUCCESS && type == DATA_UPLOAD_S3) {
    code = getCheckpointDataMeta(idStr, localDir, pDelList);
    if (code != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
              tstrerror(code));
    }
  }

  // step 3: upload
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskUploadCheckpoint(idStr, localDir, checkpointId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, localDir,
              tstrerror(code));
    } else {
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
    }
  }

  // step 4: remove the redundant files, stop at the first failure
  int32_t numOfDel = taosArrayGetSize(pDelList);
  if (numOfDel > 0 && code == TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s remove redundant %d files", idStr, numOfDel);

    int idx = 0;
    while (idx < numOfDel) {
      char* pFileName = taosArrayGetP(pDelList, idx);

      code = deleteCheckpointFile(idStr, pFileName);
      if (code != 0) {
        stDebug("s-task:%s failed to remove file: %s", idStr, pFileName);
        break;
      }

      idx += 1;
    }

    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
  }

  taosArrayDestroyP(pDelList, NULL);
  double elapsedSec = (taosGetTimestampMs() - startMs) / 1000.0;

  if (code != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " elapsed time:%.2fs, checkpointSize:%.2fKiB", idStr,
            checkpointId, elapsedSec, SIZE_IN_KiB(sizeInBytes));
  } else {
    code = taosGetDirSize(localDir, &sizeInBytes);
    stDebug("s-task:%s complete upload checkpointId:%" PRId64
            ", elapsed time:%.2fs, checkpointSize:%.2fKiB local dir:%s",
            idStr, checkpointId, elapsedSec, SIZE_IN_KiB(sizeInBytes), localDir);
  }

  taosMemoryFree(localDir);
  return code;
}
931

932
/**
 * Upload the data of a checkpoint to the remote storage, when remote backup is configured.
 *
 * @param pTask        the task that owns the checkpoint; NULL is tolerated and treated as a no-op
 * @param checkpointId id of the checkpoint to back up
 * @return 0 when there is nothing to do (NULL task, backup disabled, no backend), -1 when the backend
 *         db cannot be acquired, otherwise the result of uploadCheckpointData()
 */
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
  // validate the task before the first dereference (the debug log below reads pTask->id.idStr)
  if (pTask == NULL) {
    return 0;
  }

  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
  if (type == DATA_UPLOAD_DISABLE) {
    stDebug("s-task:%s not config to backup checkpoint data at snode, checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
    return 0;
  }

  if (pTask->pBackend == NULL) {
    return 0;
  }

  // keep the backend db referenced while the upload is running
  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
  void*   pBackend = taskAcquireDb(dbRefId);
  if (pBackend == NULL) {
    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
            pTask->id.idStr);
    return -1;
  }

  // pass the ref id that was actually acquired instead of reading it from the backend a second time
  int32_t code = uploadCheckpointData(pTask, checkpointId, dbRefId, type);
  taskReleaseDb(dbRefId);

  return code;
}
956

957
/**
 * Build the checkpoint of a task: generate the local checkpoint (non-sink tasks only), notify the
 * upstream/mnode, back up the checkpoint data remotely, and finally send the checkpoint-report msg.
 *
 * When any step fails, the active checkpoint id is recorded as failed and the TASK_EVENT_CHECKPOINT_DONE
 * event is handled to leave the checkpoint status.
 *
 * @param pTask the task to build the checkpoint for
 * @return on the success path, the result of sending the checkpoint-report msg (or success when
 *         pMsgCb is NULL); on the failure path, the result of handling TASK_EVENT_CHECKPOINT_DONE
 *         (unchanged behaviour, the failure itself is reported in the log)
 */
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int64_t      startTs = pTask->chkInfo.startTs;
  int64_t      ckId = pTask->chkInfo.pActiveInfo->activeId;
  const char*  id = pTask->id.idStr;
  SStreamMeta* pMeta = pTask->pMeta;
  bool         chkptFailed = false;  // remembers a failed checkpoint after 'code' is reused below

  streamMutexLock(&pTask->lock);
  bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  streamMutexUnlock(&pTask->lock);

  // sink task does not need to save the status, and generated the checkpoint
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);

    int64_t ver = pTask->chkInfo.processedVer;
    code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver);
    if (code != TSDB_CODE_SUCCESS) {
      // report the code returned by the backend, consistent with the other failure logs in this function
      stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
    }

    int64_t et = taosGetTimestampMs();
    stDebug("s-task:%s gen local checkpoint completed, elapsed time:%.2fs", id, (et - startTs) / 1000.0);
  }

  // TODO: monitoring the checkpoint-source msg
  // send check point response to upstream task
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
      code = streamTaskSendCheckpointSourceRsp(pTask);
    } else {
      code = streamTaskSendCheckpointReadyMsg(pTask);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
    }
  } else {
    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  }

  // TODO: monitoring the checkpoint-report msg
  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->pMsgCb != NULL) {
      code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
    }
  } else {  // clear the checkpoint info if failed
    chkptFailed = true;

    // set failed checkpoint id before clear the checkpoint info
    streamMutexLock(&pTask->lock);
    streamTaskSetFailedCheckpointId(pTask, ckId);
    streamMutexUnlock(&pTask->lock);

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
  }

  // 'code' may hold the (successful) result of the event handling above, so the summary must also
  // consult chkptFailed to avoid reporting "succ" for a checkpoint that failed
  double el = (taosGetTimestampMs() - startTs) / 1000.0;
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
         pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
         ((code == TSDB_CODE_SUCCESS) && !chkptFailed) ? "succ" : "failed");

  return code;
}
1030

1031
/**
 * Check whether the checkpoint-trigger monitor timer should keep running.
 *
 * On every "quit" condition the timer is cleaned via streamCleanBeforeQuitTmr() before returning.
 *
 * @param pTask the monitored task
 * @param param the timer parameter, handed over to streamCleanBeforeQuitTmr()
 * @return 0 to continue monitoring, -1 when the timer has been cleaned and the monitor must quit
 */
static int32_t doChkptStatusCheck(SStreamTask* pTask, void* param) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTmrInfo*        pTmr = &pActive->chkptTriggerMsgTmr;
  const char*            idStr = pTask->id.idStr;
  int32_t                vgId = pTask->pMeta->vgId;

  // case 1: every upstream checkpoint-trigger has been received already
  if (pActive->allUpstreamTriggerRecv) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger", idStr, vgId);
    return -1;
  }

  // case 2: this timer was launched by a previous checkpoint procedure
  bool launchedByOther = (pTmr->launchChkptId != pActive->activeId);
  if (launchedByOther) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
           ", quit",
           idStr, vgId, pTmr->launchChkptId);
    return -1;
  }

  // case 3: active checkpoint info is cleared for now
  bool cleared = (pActive->activeId == 0) || (pActive->transId == 0) || (pTask->chkInfo.startTs == 0);
  if (cleared) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr", idStr,
           vgId);
    return -1;
  }

  return 0;
}
1062

1063
/**
 * Build the list of upstream tasks whose node id does not appear in the checkpoint-ready msg list.
 *
 * For every such upstream the inputQ is (re)opened and a copy of its ep info is appended to the
 * result list.
 *
 * @param pTask         the task doing the check
 * @param pList         list of pointers to the upstream ep infos
 * @param ppNotSendList [out] the newly created list; only assigned on success, owned by the caller
 * @return 0 on success, terrno when the list cannot be created or extended
 */
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            idStr = pTask->id.idStr;

  SArray* pPending = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
  if (pPending == NULL) {
    stDebug("s-task:%s start to triggerMonitor, reason:%s", idStr, tstrerror(terrno));
    return terrno;
  }

  for (int32_t upIdx = 0; upIdx < taosArrayGetSize(pList); ++upIdx) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pList, upIdx);

    // look for an entry of the same node in the ready-msg list
    bool found = false;
    for (int32_t rdIdx = 0; rdIdx < taosArrayGetSize(pActive->pReadyMsgList); ++rdIdx) {
      STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pActive->pReadyMsgList, rdIdx);
      if (pReadyInfo != NULL && pUpstream->nodeId == pReadyInfo->upstreamNodeId) {
        found = true;
        break;
      }
    }

    if (found) {
      continue;
    }

    // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
    streamTaskOpenUpstreamInput(pTask, pUpstream->taskId);

    if (taosArrayPush(pPending, pUpstream) == NULL) {
      stError("s-task:%s failed to record not send info, code: out of memory", idStr);
      taosArrayDestroy(pPending);
      return terrno;
    }
  }

  *ppNotSendList = pPending;
  return 0;
}
1103

1104
/**
 * One round of the checkpoint-trigger monitor: verify the monitor is still valid, find the upstream
 * tasks reported by doFindNotSendUpstream(), and send a retrieve-trigger msg to each of them.
 *
 * @param pTask         the monitored task
 * @param param         the timer parameter, handed over to the timer clean-up routine on quit
 * @param ppNotSendList [out] list produced by doFindNotSendUpstream(); owned by the caller
 * @return non-zero when the monitor must quit (status check or list building failed), otherwise 0;
 *         a failure to send the retrieve msg is logged and NOT reported to the caller
 */
int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTmrInfo*        pTmr = &pActive->chkptTriggerMsgTmr;
  SArray*                pUpstreamList = pTask->upstreamInfo.pList;  // candidates for the retrieve msg
  int32_t                vgId = pTask->pMeta->vgId;
  const char*            idStr = pTask->id.idStr;

  int32_t ret = doChkptStatusCheck(pTask, param);
  if (ret != 0) {
    return ret;
  }

  ret = doFindNotSendUpstream(pTask, pUpstreamList, ppNotSendList);
  if (ret != 0) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", idStr, tstrerror(ret));
    return ret;
  }

  // do send retrieve checkpoint trigger msg to upstream
  ret = doSendRetrieveTriggerMsg(pTask, *ppNotSendList);
  if (ret != 0) {
    stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(ret));
  }

  // a send failure does not stop the monitor
  return 0;
}
1132

1133
/**
 * Common exit path of the checkpoint-trigger monitor: release the task through its meta, then
 * destroy the list passed in.
 *
 * @param pTask the task to release
 * @param pList the list to destroy
 */
static void doCleanup(SStreamTask* pTask, SArray* pList) {
  SStreamMeta* pMeta = pTask->pMeta;

  streamMetaReleaseTask(pMeta, pTask);
  taosArrayDestroy(pList);
}
120,123✔
1137

1138
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
120,124✔
1139
  int32_t      code = 0;
120,124✔
1140
  int32_t      numOfNotSend = 0;
120,124✔
1141
  SArray*      pNotSendList = NULL;
120,124✔
1142
  int64_t      taskRefId = *(int64_t*)param;
120,124✔
1143
  int64_t      now = taosGetTimestampMs();
120,124✔
1144

1145
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
120,124✔
1146
  if (pTask == NULL) {
120,124✔
1147
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
1!
1148
    streamTaskFreeRefId(param);
1✔
1149
    return;
120,124✔
1150
  }
1151

1152
  int32_t                vgId = pTask->pMeta->vgId;
120,123✔
1153
  const char*            id = pTask->id.idStr;
120,123✔
1154
  SArray*                pList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
120,123✔
1155
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
120,123✔
1156
  SStreamTmrInfo*        pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
120,123✔
1157

1158
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
120,123!
UNCOV
1159
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1160
    stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id);
×
1161
    doCleanup(pTask, pNotSendList);
×
1162
    return;
×
1163
  }
1164

1165
  // check the status every 100ms
1166
  if (streamTaskShouldStop(pTask)) {
120,123✔
1167
    streamCleanBeforeQuitTmr(pTmrInfo, param);
72✔
1168
    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger", id, vgId);
72!
1169
    doCleanup(pTask, pNotSendList);
72✔
1170
    return;
72✔
1171
  }
1172

1173
  if (++pTmrInfo->activeCounter < 50) {
120,051✔
1174
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
118,209✔
1175
                   "trigger-recv-monitor");
1176
    doCleanup(pTask, pNotSendList);
118,209✔
1177
    return;
118,209✔
1178
  }
1179

1180
  pTmrInfo->activeCounter = 0;
1,842✔
1181
  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
1,842✔
1182

1183
  streamMutexLock(&pTask->lock);
1,842✔
1184
  SStreamTaskState state = streamTaskGetStatus(pTask);
1,842✔
1185
  streamMutexUnlock(&pTask->lock);
1,842✔
1186

1187
  if (state.state != TASK_STATUS__CK) {
1,842!
1188
    streamCleanBeforeQuitTmr(pTmrInfo, param);
1,842✔
1189
    stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger", id,
1,842✔
1190
            vgId, state.name);
1191
    doCleanup(pTask, pNotSendList);
1,842✔
1192
    return;
1,842✔
1193
  }
1194

UNCOV
1195
  streamMutexLock(&pActiveInfo->lock);
×
UNCOV
1196
  code = chkptTriggerRecvMonitorHelper(pTask, param, &pNotSendList);
×
UNCOV
1197
  streamMutexUnlock(&pActiveInfo->lock);
×
1198

UNCOV
1199
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1200
    doCleanup(pTask, pNotSendList);
×
UNCOV
1201
    return;
×
1202
  }
1203

1204
  // check every 100ms
UNCOV
1205
  numOfNotSend = taosArrayGetSize(pNotSendList);
×
UNCOV
1206
  if (numOfNotSend > 0) {
×
UNCOV
1207
    stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
×
UNCOV
1208
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
×
1209
                   "trigger-recv-monitor");
1210
  } else {
UNCOV
1211
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
UNCOV
1212
    stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr", id);
×
1213
  }
1214

1215
  doCleanup(pTask, pNotSendList);
×
1216
}
1217

1218
/**
 * Send a retrieve-checkpoint-trigger request to every upstream task in the given list.
 *
 * A failure for one upstream (encoding, allocation, sending) is logged and the remaining upstream
 * tasks are still processed.
 *
 * @param pTask        the downstream task that asks for the checkpoint-trigger
 * @param pNotSendList array of SStreamUpstreamEpInfo, the upstream tasks to contact
 * @return 0 when the list is empty, TSDB_CODE_INVALID_PARA when an entry of the list cannot be read,
 *         otherwise the code of the last step that assigned it (encode or send)
 */
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;
  const char* pId = pTask->id.idStr;
  int32_t     size = taosArrayGetSize(pNotSendList);
  int32_t     numOfUpstream = streamTaskGetNumOfUpstream(pTask);
  int64_t     checkpointId = pTask->chkInfo.pActiveInfo->activeId;

  if (size <= 0) {
    stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
    return code;
  }

  stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId, size,
          numOfUpstream);

  for (int32_t i = 0; i < size; i++) {
    SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
    if (pUpstreamTask == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    int32_t  ret = 0;
    int32_t  tlen = 0;
    void*    buf = NULL;
    SRpcMsg  rpcMsg = {0};
    SEncoder encoder;

    SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId,
                                    .downstreamTaskId = pTask->id.taskId,
                                    .downstreamNodeId = vgId,
                                    .upstreamTaskId = pUpstreamTask->taskId,
                                    .upstreamNodeId = pUpstreamTask->nodeId,
                                    .checkpointId = checkpointId};

    tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
    if (ret < 0) {
      // the encoded length is not valid: report the real failure and skip this upstream instead of
      // allocating and encoding a msg with it
      code = ret;
      stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(code));
      continue;
    }

    buf = rpcMallocCont(tlen + sizeof(SMsgHead));
    if (buf == NULL) {
      stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId);
      continue;
    }

    // the msg head carries the vgId of the upstream node, the encoded request follows the head
    ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId);
    void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

    tEncoderInit(&encoder, abuf, tlen);
    if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) {
      rpcFreeCont(buf);
      tEncoderClear(&encoder);
      stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code));
      continue;
    }
    tEncoderClear(&encoder);

    initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead));

    code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
    if (code == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId,
              vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    } else {
      stError("s-task:%s vgId:%d failed to send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64,
              pId, vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    }
  }

  return code;
}
1290

1291
// Look through the dispatch-trigger list of the active checkpoint for an entry whose nodeId equals
// downstreamNodeId. Returns true (as int32_t) when such an entry exists, false otherwise, and also
// false when no trigger has been dispatched yet (pInfo->dispatchTrigger not set).
// This function takes no lock itself; the caller visible in this file holds pActiveInfo->lock around it.
static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) {
  int64_t                curTs = taosGetTimestampMs();
  const char*            idStr = pTask->id.idStr;
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTaskState       status = streamTaskGetStatus(pTask);  // value is not used below
  (void)status;

  if (!pActive->dispatchTrigger) {
    return false;
  }

  int32_t total = taosArrayGetSize(pActive->pDispatchTriggerList);
  for (int32_t idx = 0; idx < total; idx++) {
    STaskTriggerSendInfo* pEntry = taosArrayGet(pActive->pDispatchTriggerList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", idStr, idx,
              total);
      continue;
    }

    if (pEntry->nodeId == downstreamNodeId) {
      // a trigger was already sent to this downstream node; report how long ago (in seconds)
      double elapsed = (curTs - pEntry->sendTs) / 1000.0;

      if (!pEntry->recved) {
        stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64
               ", transId:%d",
               idStr, pEntry->sendTs, elapsed, pActive->activeId, pActive->transId);
      } else {
        stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64
               "(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
               idStr, pEntry->sendTs, elapsed, pEntry->taskId, pActive->activeId, pActive->transId);
      }

      return true;
    }
  }

  return false;
}
1330

1331
// Report whether a checkpoint-trigger has already been dispatched to the downstream node
// identified by downstreamNodeId.
// Returns false immediately when the task is not in TASK_STATUS__CK; otherwise the
// dispatch-trigger list is inspected while holding pActiveInfo->lock.
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    return false;
  }

  streamMutexLock(&pInfo->lock);
  bool send = isAlreadySendTriggerNoLock(pTask, downstreamNodeId);
  streamMutexUnlock(&pInfo->lock);

  return send;
}
1347

1348
// Output the number of entries currently in the active checkpoint's pReadyMsgList (*pRecved)
// and the expected total (*pTotal): 1 for a source-level task, otherwise the number of upstream tasks.
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
  *pRecved = taosArrayGetSize(pTask->chkInfo.pActiveInfo->pReadyMsgList);
  *pTotal = (pTask->info.taskLevel == TASK_LEVEL__SOURCE) ? 1 : streamTaskGetNumOfUpstream(pTask);
}
2✔
1357

1358
// record the dispatch checkpoint trigger info in the list
1359
// insufficient memory may cause the stream computing to stop
1360
// Record, under pActiveInfo->lock, one STaskTriggerSendInfo per downstream target in
// pDispatchTriggerList and mark the trigger as dispatched.
// Nothing is recorded when sendingChkptId is not greater than the stored failedId.
// Targets come from the fixed dispatcher, the vtable-map dispatcher's taskInfos, or (for any other
// output type) the shuffle dispatcher's vgroup list.
// Returns 0 on success, or terrno when appending to the list fails.
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  int64_t                ts = taosGetTimestampMs();
  int32_t                code = 0;

  streamMutexLock(&pActive->lock);

  if (sendingChkptId <= pActive->failedId) {
    streamMutexUnlock(&pActive->lock);
    return code;
  }

  pActive->dispatchTrigger = true;

  switch (pTask->outputInfo.type) {
    case TASK_OUTPUT__FIXED_DISPATCH: {
      STaskDispatcherFixed* pFixed = &pTask->outputInfo.fixedDispatcher;

      STaskTriggerSendInfo info = {0};
      info.sendTs = ts;
      info.recved = false;
      info.nodeId = pFixed->nodeId;
      info.taskId = pFixed->taskId;

      if (taosArrayPush(pActive->pDispatchTriggerList, &info) == NULL) {  // pause the stream task, if memory not enough
        code = terrno;
      }
      break;
    }

    case TASK_OUTPUT__VTABLE_MAP: {
      for (int32_t idx = 0; idx < streamTaskGetNumOfDownstream(pTask); idx++) {
        STaskDispatcherFixed* pDown = taosArrayGet(pTask->outputInfo.vtableMapDispatcher.taskInfos, idx);
        if (pDown != NULL) {
          STaskTriggerSendInfo info = {0};
          info.sendTs = ts;
          info.recved = false;
          info.nodeId = pDown->nodeId;
          info.taskId = pDown->taskId;

          if (taosArrayPush(pActive->pDispatchTriggerList, &info) == NULL) {  // pause the stream task, if memory not enough
            code = terrno;
            break;
          }
        }
      }
      break;
    }

    default: {
      for (int32_t idx = 0; idx < streamTaskGetNumOfDownstream(pTask); idx++) {
        SVgroupInfo* pVg = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, idx);
        if (pVg != NULL) {
          STaskTriggerSendInfo info = {0};
          info.sendTs = ts;
          info.recved = false;
          info.nodeId = pVg->vgId;
          info.taskId = pVg->taskId;

          if (taosArrayPush(pActive->pDispatchTriggerList, &info) == NULL) {  // pause the stream task, if memory not enough
            code = terrno;
            break;
          }
        }
      }
      break;
    }
  }

  streamMutexUnlock(&pActive->lock);

  return code;
}
1413

1414
// Count the entries in pInfo->pDispatchTriggerList whose `recved` flag is set.
// This function takes no lock itself; the caller visible in this file invokes it with pInfo->lock held.
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
  int32_t confirmed = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pInfo->pDispatchTriggerList); idx++) {
    const STaskTriggerSendInfo* pEntry = taosArrayGet(pInfo->pDispatchTriggerList, idx);
    if (pEntry != NULL && pEntry->recved) {
      confirmed += 1;
    }
  }

  return confirmed;
}
1428

1429
// Mark the checkpoint-trigger dispatched to downstream node vgId as confirmed.
// The first entry in pDispatchTriggerList whose nodeId equals vgId is looked up under pActiveInfo->lock:
//   - not yet confirmed: set recved/recvTs and log the confirmation progress;
//   - already confirmed: log a warning and discard the duplicate;
//   - no matching entry: log an error.
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  int64_t now = taosGetTimestampMs();
  int32_t taskId = 0;
  int32_t total = streamTaskGetNumOfDownstream(pTask);
  bool    found = false;
  bool    alreadyRecv = false;

  streamMutexLock(&pInfo->lock);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
    STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (p == NULL || p->nodeId != vgId) {
      continue;
    }

    found = true;
    taskId = p->taskId;

    if (p->recved) {
      stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64
             " discard",
             pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs);
      alreadyRecv = true;
    } else {
      p->recved = true;
      p->recvTs = taosGetTimestampMs();
    }
    break;
  }

  int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
  streamMutexUnlock(&pInfo->lock);

  // A duplicate confirm has already been reported by the warning above; only a vgId that matches
  // no entry at all is an invalid confirm.
  if (!found) {
    stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
  } else if (!alreadyRecv) {
    stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
            pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
  }
}
8,599✔
1472

1473
int32_t uploadCheckpointToS3(const char* id, const char* path) {
1✔
1474
  int32_t code = 0;
1✔
1475
  int32_t nBytes = 0;
1✔
1476
  /*
1477
  if (s3Init() != 0) {
1478
    return TSDB_CODE_THIRDPARTY_ERROR;
1479
  }
1480
  */
1481
  TdDirPtr pDir = taosOpenDir(path);
1✔
1482
  if (pDir == NULL) {
1!
UNCOV
1483
    return terrno;
×
1484
  }
1485

1486
  TdDirEntryPtr de = NULL;
1✔
1487
  while ((de = taosReadDir(pDir)) != NULL) {
5✔
1488
    char* name = taosGetDirEntryName(de);
4✔
1489
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
4!
1490

1491
    char filename[PATH_MAX] = {0};
2✔
1492
    if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
2!
UNCOV
1493
      nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
×
UNCOV
1494
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1495
        code = TSDB_CODE_OUT_OF_RANGE;
×
1496
        break;
×
1497
      }
1498
    } else {
1499
      nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
2✔
1500
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
2!
UNCOV
1501
        code = TSDB_CODE_OUT_OF_RANGE;
×
UNCOV
1502
        break;
×
1503
      }
1504
    }
1505

1506
    char object[PATH_MAX] = {0};
2✔
1507
    nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
2✔
1508
    if (nBytes <= 0 || nBytes >= sizeof(object)) {
2!
UNCOV
1509
      code = TSDB_CODE_OUT_OF_RANGE;
×
UNCOV
1510
      break;
×
1511
    }
1512

1513
    code = tcsPutObjectFromFile2(filename, object, 0);
2✔
1514
    if (code != 0) {
2!
1515
      stError("[tcs] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
2!
1516
    } else {
UNCOV
1517
      stDebug("[tcs] upload checkpoint:%s", filename);
×
1518
    }
1519
  }
1520

1521
  int32_t ret = taosCloseDir(&pDir);
1✔
1522
  if (code == 0 && ret != 0) {
1!
UNCOV
1523
    code = ret;
×
1524
  }
1525

1526
  return code;
1✔
1527
}
1528

1529
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
1✔
1530
  int32_t nBytes;
1531
  int32_t cap = strlen(id) + strlen(dstName) + 16;
1✔
1532

1533
  char* buf = taosMemoryCalloc(1, cap);
1!
1534
  if (buf == NULL) {
1!
UNCOV
1535
    return terrno;
×
1536
  }
1537

1538
  nBytes = snprintf(buf, cap, "%s/%s", id, fname);
1✔
1539
  if (nBytes <= 0 || nBytes >= cap) {
1!
1540
    taosMemoryFree(buf);
×
UNCOV
1541
    return TSDB_CODE_OUT_OF_RANGE;
×
1542
  }
1543
  int32_t code = tcsGetObjectToFile(buf, dstName);
1✔
1544
  if (code != 0) {
1!
1545
    taosMemoryFree(buf);
1!
1546
    return TAOS_SYSTEM_ERROR(ERRNO);
1✔
1547
  }
UNCOV
1548
  taosMemoryFree(buf);
×
UNCOV
1549
  return 0;
×
1550
}
1551

1552
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
6,524✔
1553
  if (strlen(tsSnodeAddress) != 0) {
6,524✔
1554
    return DATA_UPLOAD_RSYNC;
1✔
1555
  } else if (tsS3StreamEnabled) {
6,523!
UNCOV
1556
    return DATA_UPLOAD_S3;
×
1557
  } else {
1558
    return DATA_UPLOAD_DISABLE;
6,523✔
1559
  }
1560
}
1561

1562
int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) {
2✔
1563
  int32_t code = 0;
2✔
1564
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
2!
1565
    stError("invalid parameters in upload checkpoint, %s", id);
1!
1566
    return TSDB_CODE_INVALID_CFG;
1✔
1567
  }
1568

1569
  if (strlen(tsSnodeAddress) != 0) {
1!
UNCOV
1570
    code = uploadByRsync(id, path, checkpointId);
×
UNCOV
1571
    if (code != 0) {
×
UNCOV
1572
      return TAOS_SYSTEM_ERROR(ERRNO);
×
1573
    }
1574
  } else if (tsS3StreamEnabled) {
1!
UNCOV
1575
    return uploadCheckpointToS3(id, path);
×
1576
  }
1577

1578
  return 0;
1✔
1579
}
1580

1581
// fileName:  CURRENT
1582
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
1✔
1583
  if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
1!
UNCOV
1584
    stError("down load checkpoint data parameters invalid");
×
UNCOV
1585
    return TSDB_CODE_INVALID_PARA;
×
1586
  }
1587

1588
  if (strlen(tsSnodeAddress) != 0) {
1!
UNCOV
1589
    return 0;
×
1590
  } else if (tsS3StreamEnabled) {
1!
UNCOV
1591
    return downloadCheckpointByNameS3(id, fname, dstName);
×
1592
  }
1593

1594
  return 0;
1✔
1595
}
1596

1597
int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) {
1✔
1598
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
1!
UNCOV
1599
    stError("down checkpoint data parameters invalid");
×
UNCOV
1600
    return -1;
×
1601
  }
1602

1603
  if (strlen(tsSnodeAddress) != 0) {
1!
1604
    return downloadByRsync(id, path, checkpointId);
1✔
1605
  } else if (tsS3StreamEnabled) {
×
1606
    return tcsGetObjectsByPrefix(id, path);
×
1607
  }
1608

UNCOV
1609
  return 0;
×
1610
}
1611

1612
#ifdef BUILD_NO_CALL
1613
int32_t deleteCheckpoint(const char* id) {
1614
  if (id == NULL || strlen(id) == 0) {
1615
    stError("deleteCheckpoint parameters invalid");
1616
    return TSDB_CODE_INVALID_PARA;
1617
  }
1618
  if (strlen(tsSnodeAddress) != 0) {
1619
    return deleteRsync(id);
1620
  } else if (tsS3StreamEnabled) {
1621
    tcsDeleteObjectsByPrefix(id);
1622
  }
1623
  return 0;
1624
}
1625
#endif
1626

1627
int32_t deleteCheckpointFile(const char* id, const char* name) {
1✔
1628
  char object[128] = {0};
1✔
1629

1630
  int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
1✔
1631
  if (nBytes <= 0 || nBytes >= sizeof(object)) {
1!
UNCOV
1632
    return TSDB_CODE_OUT_OF_RANGE;
×
1633
  }
1634

1635
  char*   tmp = object;
1✔
1636
  int32_t code = tcsDeleteObjects((const char**)&tmp, 1);
1✔
1637
  if (code != 0) {
1!
1638
    return TSDB_CODE_THIRDPARTY_ERROR;
1✔
1639
  }
UNCOV
1640
  return code;
×
1641
}
1642

1643
// Under pTask->lock, read the current task state and call streamTaskSetReqConsenChkptId with the
// current timestamp. Afterwards release the task's state backend (if any) and, under the meta
// write lock, destroy its executor (if any). Always returns 0.
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
  ETaskStatus curState;

  streamMutexLock(&pTask->lock);
  curState = streamTaskGetStatus(pTask).state;
  streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
  streamMutexUnlock(&pTask->lock);

  // the backend is released with the state captured above
  if (pTask->pBackend != NULL) {
    streamFreeTaskState(pTask, curState);
    pTask->pBackend = NULL;
  }

  streamMetaWLock(pTask->pMeta);
  if (pTask->exec.pExecutor != NULL) {
    qDestroyTask(pTask->exec.pExecutor);
    pTask->exec.pExecutor = NULL;
  }
  streamMetaWUnLock(pTask->pMeta);

  return 0;
}
1663

1664
// For a source-level task that is currently in TASK_STATUS__CK, send the checkpoint-source response
// while holding pTask->lock and return its result. Returns 0 for non-source tasks and for source
// tasks in any other state.
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  int32_t rspCode = 0;

  streamMutexLock(&pTask->lock);
  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    rspCode = streamTaskSendCheckpointSourceRsp(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  return rspCode;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc