• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3661

17 Mar 2025 05:39AM UTC coverage: 62.007% (-0.03%) from 62.039%
#3661

push

travis-ci

web-flow
tests: add tdb ut (#30093)

* fix: compile warnings

* tests: add tdb ut

* test(tdb): fix return code

* test: recover ut

* fix: minor changes

* fix: enable test

* fix: ut errors

---------

Co-authored-by: Minglei Jin <mljin@taosdata.com>

153829 of 317582 branches covered (48.44%)

Branch coverage included in aggregate %.

240310 of 318051 relevant lines covered (75.56%)

19602636.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.55
/source/libs/stream/src/streamCheckpoint.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "rsync.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tcs.h"
20

21
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
22
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
23
#ifdef BUILD_NO_CALL
24
static int32_t deleteCheckpoint(const char* id);
25
#endif
26
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
27
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
28
                                          int32_t transId, int32_t srcTaskId);
29
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
30
static void    checkpointTriggerMonitorFn(void* param, void* tmrId);
31

32
/**
 * Create a checkpoint queue item (checkpoint-trigger or checkpoint) holding exactly one SSDataBlock.
 *
 * The embedded data block encodes the checkpoint: info.version carries the checkpointId and
 * info.window.skey/ekey carry the transId.
 *
 * @param pTask          owning task; its selfChildId is stamped into the data block.
 * @param checkpointType queue item type (e.g. STREAM_INPUT__CHECKPOINT_TRIGGER, STREAM_INPUT__CHECKPOINT).
 * @param checkpointId   checkpoint id to carry.
 * @param transId        transaction id to carry.
 * @param srcTaskId      upstream task id; must be > 0 for a checkpoint-trigger on a non-source task.
 * @param pRes           [out] the created item on success (caller takes ownership); untouched on failure.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a missing upstream id, or an allocation error code.
 */
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                int32_t srcTaskId, SStreamDataBlock** pRes) {
  SStreamDataBlock* pChkpoint = NULL;
  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint);
  if (code) {
    return code;
  }

  pChkpoint->type = checkpointType;
  if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
    pChkpoint->srcTaskId = srcTaskId;
    if (srcTaskId <= 0) {
      stDebug("s-task:%s invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId);
      taosFreeQitem(pChkpoint);  // the queue item was leaked on this path before
      return TSDB_CODE_INVALID_PARA;
    }
  }

  // taosArrayPush copies the element by value, so a stack-local template is sufficient; no heap round-trip needed.
  SSDataBlock block = {0};
  block.info.type = STREAM_CHECKPOINT;
  block.info.version = checkpointId;
  block.info.window.ekey = block.info.window.skey = transId;  // NOTE: set the transId
  block.info.rows = 1;
  block.info.childId = pTask->info.selfChildId;

  pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));
  if (pChkpoint->blocks == NULL) {
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  if (taosArrayPush(pChkpoint->blocks, &block) == NULL) {
    taosArrayDestroy(pChkpoint->blocks);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  *pRes = pChkpoint;
  return TSDB_CODE_SUCCESS;
}
81

82
// this message must be put into inputq successfully, continue retrying until it succeeds
83
// todo must be success
84
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
6,420✔
85
                                   int32_t srcTaskId) {
86
  SStreamDataBlock* pCheckpoint = NULL;
6,420✔
87
  int32_t code = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId, &pCheckpoint);
6,420✔
88
  if (code != TSDB_CODE_SUCCESS) {
6,426!
89
    return code;
×
90
  }
91

92
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
6,426!
93
    return TSDB_CODE_OUT_OF_MEMORY;
×
94
  }
95

96
  return streamTrySchedExec(pTask, true);
6,417✔
97
}
98

99
/**
 * Start a checkpoint on a source task in response to a checkpoint-source request.
 *
 * The task state machine is moved into checkpoint status. The active checkpoint/transaction ids, the start time and
 * the checkpoint counter are recorded. A checkpoint-trigger block is then appended to the inputQ, so that every item
 * already queued ahead of it is handled first.
 *
 * @return TSDB_CODE_INVALID_MSG for non-source tasks, the state-machine error if the event is rejected,
 *         otherwise the result of appendCheckpointIntoInputQ.
 */
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
  bool isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  if (!isSourceTask) {
    return TSDB_CODE_INVALID_MSG;
  }

  // Step 1: switch into checkpoint status via TASK_EVENT_GEN_CHECKPOINT.
  // todo: this status may not be set here.
  int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
  if (ret != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr);
    return ret;
  }

  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->transId = pReq->transId;
  pActive->activeId = pReq->checkpointId;

  pTask->chkInfo.startTs = taosGetTimestampMs();
  pTask->execInfo.checkpoint += 1;

  // Step 2: the trigger block becomes the last item of the inputQ.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
}
121

122
/**
 * Handle the checkpoint-trigger response that an upstream task sent back to this (non-source) task.
 *
 * A failed response is only logged. A successful one is accepted only while the task is in checkpoint status and the
 * response matches the active checkpointId/transId. Accepted responses are turned into a checkpoint-trigger block in
 * the inputQ; duplicate detection is left to the downstream handling of that block.
 *
 * @return TSDB_CODE_INVALID_MSG on a source task, TSDB_CODE_STREAM_TASK_IVLD_STATUS when the task is not in
 *         checkpoint status or the ids do not match, otherwise the result of appendCheckpointIntoInputQ
 *         (TSDB_CODE_SUCCESS for a failed response).
 */
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  const char*            id = pTask->id.idStr;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s retrieve checkpoint-trigger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId,
            tstrerror(pRsp->rspCode));
    return TSDB_CODE_SUCCESS;
  }

  streamMutexLock(&pTask->lock);
  SStreamTaskState status = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (status.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // snapshot the active ids under the lock so that the log below reports exactly what was compared
  streamMutexLock(&pInfo->lock);
  int64_t activeId = pInfo->activeId;
  int32_t activeTransId = pInfo->transId;
  streamMutexUnlock(&pInfo->lock);

  if (activeId != pRsp->checkpointId || activeTransId != pRsp->transId) {
    stError("s-task:%s active checkpointId:%" PRId64
            " transId:%d, recv checkpoint-trigger rsp from upstream:0x%x with mismatched checkpointId:%" PRId64
            " transId:%d, discard",
            id, activeId, activeTransId, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
                                    pRsp->upstreamTaskId);
}
161

162
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
1✔
163
                                           SRpcHandleInfo* pRpcInfo, int32_t code) {
164
  int32_t  ret = 0;
1✔
165
  int32_t  tlen = 0;
1✔
166
  void*    buf = NULL;
1✔
167
  SEncoder encoder;
168

169
  SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId,
1✔
170
                               .upstreamTaskId = pTask->id.taskId,
1✔
171
                               .taskId = dstTaskId,
172
                               .rspCode = code};
173

174
  if (code == TSDB_CODE_SUCCESS) {
1!
175
    req.checkpointId = pTask->chkInfo.pActiveInfo->activeId;
1✔
176
    req.transId = pTask->chkInfo.pActiveInfo->transId;
1✔
177
  } else {
178
    req.checkpointId = -1;
×
179
    req.transId = -1;
×
180
  }
181

182
  tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret);
1!
183
  if (ret < 0) {
1!
184
    stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code));
×
185
    return ret;
×
186
  }
187

188
  buf = rpcMallocCont(tlen + sizeof(SMsgHead));
1✔
189
  if (buf == NULL) {
1!
190
    stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId);
×
191
    return terrno;
×
192
  }
193

194
  ((SMsgHead*)buf)->vgId = htonl(downstreamNodeId);
1✔
195
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
1✔
196

197
  tEncoderInit(&encoder, abuf, tlen);
1✔
198
  if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) {
1!
199
    rpcFreeCont(buf);
×
200
    tEncoderClear(&encoder);
×
201
    stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code));
×
202
    return ret;
×
203
  }
204
  tEncoderClear(&encoder);
1✔
205

206
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo};
1✔
207
  tmsgSendRsp(&rspMsg);
1✔
208

209
  return ret;
1✔
210
}
211

212
/**
 * Stamp the checkpoint-trigger block with this task as its source, put it into the outputQ and dispatch it downstream.
 *
 * Takes ownership of pBlock. It is either placed into the outputQ or freed here; callers never free it afterwards.
 *
 * @return 0 if the trigger was already dispatched (block dropped), the dispatch result, or the outputQ write error.
 */
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
    stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
    // the block is owned here and is not queued anywhere; release it instead of leaking it
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return 0;
  }

  int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (code == 0) {
    code = streamDispatchStreamBlock(pTask);
  } else {
    stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
    streamFreeQitem((SStreamQueueItem*)pBlock);
  }

  return code;
}
231

232
/**
 * Decide whether a checkpoint-trigger block may be handled by this task. Called with pTask->lock held by
 * streamProcessCheckpointTriggerBlock.
 *
 * The block is rejected with TSDB_CODE_STREAM_INVLD_CHKPT when:
 *  - its checkpointId is older than the task's latest checkpointId;
 *  - its checkpointId is at or below the recorded failed checkpointId;
 *  - its checkpointId equals the latest checkpointId. A checkpoint-ready msg is re-sent to the upstream task first;
 *    TSDB_CODE_STREAM_TASK_NOT_EXIST is returned if that upstream is unknown;
 *  - the task is in checkpoint status and any of the following holds:
 *      - the active id differs;
 *      - all upstream triggers were already received;
 *      - (sink/agg only) this upstream's trigger is already in pReadyMsgList.
 *
 * @return TSDB_CODE_SUCCESS when the block should be handled; TSDB_CODE_INVALID_PARA on a broken ready list.
 */
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
                                        int32_t transId) {
  const char*            idStr = pTask->id.idStr;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                level = pTask->info.taskLevel;
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  // stale trigger: a newer checkpoint has already completed
  if (checkpointId < pTask->chkInfo.checkpointId) {
    stError("s-task:%s vgId:%d current checkpointId:%" PRId64
            " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
            idStr, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // this checkpoint (or a later one) has been marked as failed
  if (checkpointId <= pActive->failedId) {
    stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
            " discard the checkpoint-trigger block",
            idStr, vgId, checkpointId, transId, pActive->failedId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // already finished locally: re-send checkpoint-ready so the upstream task can resume its interrupted checkpoint
  if (checkpointId == pTask->chkInfo.checkpointId) {
    SStreamUpstreamEpInfo* pUpstream = NULL;
    streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pUpstream);
    if (pUpstream == NULL) {
      return TSDB_CODE_STREAM_TASK_NOT_EXIST;
    }

    SRpcMsg readyMsg = {0};
    int32_t ret = initCheckpointReadyMsg(pTask, pUpstream->nodeId, pBlock->srcTaskId, pUpstream->childId,
                                         checkpointId, &readyMsg);
    if (ret == TSDB_CODE_SUCCESS) {
      ret = tmsgSendReq(&pUpstream->epSet, &readyMsg);
      if (ret != 0) {
        stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", idStr, vgId, tstrerror(ret));
      }
    }

    stWarn(
        "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
        "the interrupted checkpoint",
        idStr, vgId, pBlock->srcTaskId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // not in checkpoint status yet: nothing else to verify
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    return TSDB_CODE_SUCCESS;
  }

  if (pActive->activeId != checkpointId) {
    stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
            " discard",
            idStr, vgId, pActive->activeId, checkpointId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  if (pActive->allUpstreamTriggerRecv == 1) {
    stDebug(
        "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
        "checkpointId:%" PRId64 " transId:%d",
        idStr, vgId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  if (level != TASK_LEVEL__SINK && level != TASK_LEVEL__AGG) {
    return TSDB_CODE_SUCCESS;
  }

  // sink/agg: discard a duplicated trigger from an upstream task that has already been recorded
  size_t numOfReady = taosArrayGetSize(pActive->pReadyMsgList);
  for (size_t k = 0; k < numOfReady; ++k) {
    STaskCheckpointReadyInfo* pReady = taosArrayGet(pActive->pReadyMsgList, k);
    if (pReady == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    if (pReady->upstreamTaskId != pBlock->srcTaskId) {
      continue;
    }

    stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64
           ", prev recvTs:%" PRId64 " discard",
           idStr, pReady->upstreamTaskId, pReady->upstreamNodeId, pReady->checkpointId, pReady->recvTs);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  return TSDB_CODE_SUCCESS;
}
316

317
/**
 * Process a checkpoint-trigger block taken from the task's inputQ.
 *
 * Takes ownership of pBlock. Every path either frees it or hands it to the outputQ through
 * continueDispatchCheckpointTriggerBlock.
 *
 * - Source tasks: flush executor state, then dispatch the trigger downstream. Without a dispatch output, append a
 *   STREAM_INPUT__CHECKPOINT item to the task's own inputQ instead.
 * - Sink/agg tasks: record the upstream's trigger. Once every upstream trigger has arrived:
 *   - sink tasks build the checkpoint;
 *   - agg tasks flush executor state and forward the trigger downstream.
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  const char*            id = pTask->id.idStr;
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                taskLevel = pTask->info.taskLevel;
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
  if (pDataBlock == NULL) {
    streamFreeQitem((SStreamQueueItem*)pBlock);  // we own the block; do not leak it on a malformed item
    return TSDB_CODE_INVALID_PARA;
  }

  // createChkptTriggerBlock stores the checkpointId in info.version and the transId in info.window.skey
  int64_t checkpointId = pDataBlock->info.version;
  int32_t transId = (int32_t)pDataBlock->info.window.skey;

  streamMutexLock(&pTask->lock);
  code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
  streamMutexUnlock(&pTask->lock);
  if (code) {
    if (taskLevel != TASK_LEVEL__SOURCE) {  // the checkpoint-trigger is discarded, open the inputQ for upstream tasks
      streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
    }
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64
          " transId:%d, latest checkpointId:%" PRId64 " ver:%" PRId64,
          id, vgId, checkpointId, transId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);

  // first trigger of this checkpoint: set task status and start the trigger-recv monitor
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    pActiveInfo->activeId = checkpointId;
    pActiveInfo->transId = transId;

    if (pTask->chkInfo.startTs == 0) {
      pTask->chkInfo.startTs = taosGetTimestampMs();
      pTask->execInfo.checkpoint += 1;
    }

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    // if previous launched timer not started yet, not start a new timer
    // todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch
    //  a new checkpoint-trigger timer right now.
    //  And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
    //  procedure to be stucked.
    SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
    int8_t          old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
    if (old == 0) {
      stDebug("s-task:%s start checkpoint-trigger monitor in 10s", pTask->id.idStr);

      int64_t* pTaskRefId = NULL;
      code = streamTaskAllocRefId(pTask, &pTaskRefId);
      if (code == 0) {
        streamTmrStart(checkpointTriggerMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId,
                       "trigger-recv-monitor");
        pTmrInfo->launchChkptId = pActiveInfo->activeId;
      } else {
        // No timer was started, so nothing will ever clear the flag: release it here, otherwise every later
        // checkpoint would find the monitor "already launched" and never start one.
        stError("s-task:%s failed to alloc refId for checkpoint-trigger monitor, code:%s", id, tstrerror(code));
        (void)atomic_val_compare_exchange_8(&pTmrInfo->isActive, 1, 0);
      }
    } else {  // already launched, do nothing
      stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
    }
  }

#if 0
  taosMsleep(20*1000);
#endif

  if (taskLevel == TASK_LEVEL__SOURCE) {
    int8_t type = pTask->outputInfo.type;
    pActiveInfo->allUpstreamTriggerRecv = 1;

    // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
    // The transfer of state may generate new data that need to dispatch to downstream tasks,
    // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
    // before the next checkpoint.
    code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
    if (code) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

#if 0
    chkptFailedByRetrieveReqToSource(pTask, checkpointId);
#endif

    if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH ||
        type == TASK_OUTPUT__VTABLE_MAP) {
      stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);  // todo handle this failure
    } else {  // only one task exists, no need to dispatch downstream info
      code =
          appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
    // todo: handle this
    // update the child Id for downstream tasks
    code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);

    // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
    if (pActiveInfo->allUpstreamTriggerRecv != 1) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    int32_t num = streamTaskGetNumOfUpstream(pTask);
    if (taskLevel == TASK_LEVEL__SINK) {
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
      streamFreeQitem((SStreamQueueItem*)pBlock);
      code = streamTaskBuildCheckpoint(pTask);  // todo: not handle error yet
    } else {  // agg tasks need to forward the checkpoint msg downwards
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
      code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
      if (code) {
        streamFreeQitem((SStreamQueueItem*)pBlock);  // was leaked here; matches the source-task path
        return code;
      }

      // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
      // this task already. And then, dispatch check point msg to all downstream tasks
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
    }
  }

  return code;
}
450

451
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
452
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
8,158✔
453
                                          int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
454
                                          const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) {
455
  *alreadyRecv = false;
8,158✔
456
  int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
8,158✔
457
  for (int32_t i = 0; i < size; ++i) {
16,560✔
458
    STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
8,402✔
459
    if (p == NULL) {
8,402!
460
      return TSDB_CODE_INVALID_PARA;
×
461
    }
462

463
    if (p->downstreamTaskId == downstreamTaskId) {
8,402!
464
      (*alreadyRecv) = true;
×
465
      break;
×
466
    }
467
  }
468

469
  if (*alreadyRecv) {
8,158!
470
    stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
×
471
            downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
472
            numOfDownstream);
473
  } else {
474
    STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
8,158✔
475
                                     .downstreamTaskId = downstreamTaskId,
476
                                     .checkpointId = pInfo->activeId,
8,158✔
477
                                     .transId = pInfo->transId,
8,158✔
478
                                     .streamId = streamId,
479
                                     .downstreamNodeId = downstreamNodeId};
480
    void*                    p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
8,158✔
481
    if (p == NULL) {
8,158!
482
      stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
×
483
      return terrno;
×
484
    }
485
  }
486

487
  *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
8,158✔
488
  *pTransId = pInfo->transId;
8,158✔
489
  return 0;
8,158✔
490
}
491

492
/**
493
 * All down stream tasks have successfully completed the check point task.
494
 * Current stream task is allowed to start to do checkpoint things in ASYNC model.
495
 */
496
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
8,158✔
497
                                        int32_t downstreamTaskId) {
498
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
8,158✔
499

500
  const char* id = pTask->id.idStr;
8,158✔
501
  int32_t     total = streamTaskGetNumOfDownstream(pTask);
8,158✔
502
  int32_t     code = 0;
8,157✔
503
  int32_t     notReady = 0;
8,157✔
504
  int32_t     transId = 0;
8,157✔
505
  bool        alreadyHandled = false;
8,157✔
506

507
  // 1. not in checkpoint status now
508
  SStreamTaskState pStat = streamTaskGetStatus(pTask);
8,157✔
509
  if (pStat.state != TASK_STATUS__CK) {
8,158!
510
    stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
×
511
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
512
  }
513

514
  // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
515
  if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
8,158!
516
    stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
×
517
            ") from task:0x%x, expired and discard",
518
            id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
519
    return TSDB_CODE_INVALID_MSG;
×
520
  }
521

522
  streamMutexLock(&pInfo->lock);
8,158✔
523
  code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, &notReady,
8,158✔
524
                                    &transId, &alreadyHandled);
525
  streamMutexUnlock(&pInfo->lock);
8,158✔
526

527
  if (alreadyHandled) {
8,158!
528
    stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again",
×
529
            id, checkpointId, downstreamTaskId);
530
  } else {
531
    if ((notReady == 0) && (code == 0) && (!alreadyHandled)) {
8,158!
532
      stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
3,103✔
533
      code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
3,103✔
534
    }
535
  }
536

537
  return code;
8,158✔
538
}
539

540
/**
 * Record that an upstream task confirmed receipt of our checkpoint-ready msg.
 *
 * Under pInfo->lock, the first pReadyMsgList entry that matches (upstreamTaskId, checkpointId) is marked
 * sendCompleted. The number of confirmed entries is then counted and logged.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            idStr = pTask->id.idStr;
  int64_t                nowTs = taosGetTimestampMs();
  int32_t                confirmed = 0;

  streamMutexLock(&pActive->lock);

  int32_t numOfMsgs = (int32_t)taosArrayGetSize(pActive->pReadyMsgList);

  // pass 1: mark the matching entry as confirmed
  for (int32_t k = 0; k < numOfMsgs; ++k) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, k);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              idStr, k);
      continue;
    }

    bool matched = (pEntry->upstreamTaskId == upstreamTaskId) && (pEntry->checkpointId == checkpointId);
    if (!matched) {
      continue;
    }

    pEntry->sendCompleted = 1;
    stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
            idStr, upstreamTaskId, checkpointId, nowTs);
    break;
  }

  // pass 2: count the confirmed entries
  for (int32_t k = 0; k < numOfMsgs; ++k) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, k);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              idStr, k);
      continue;
    }

    confirmed += (pEntry->sendCompleted == 1) ? 1 : 0;
  }

  stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, idStr, confirmed,
          checkpointId);

  streamMutexUnlock(&pActive->lock);
  return TSDB_CODE_SUCCESS;
}
581

582
/**
 * Reset the per-checkpoint bookkeeping of a task.
 *
 * Clears the recorded checkpoint start time and re-opens the inputQ for all upstream tasks. Under the active-info lock
 * it clears the active checkpoint info and, if clearChkpReadyMsg is true, the checkpoint-ready msg list.
 */
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            idStr = pTask->id.idStr;

  // forget the start time and let every upstream task feed the inputQ again
  pTask->chkInfo.startTs = 0;
  streamTaskOpenAllUpstreamInput(pTask);

  streamMutexLock(&pActive->lock);
  {
    streamTaskClearActiveInfo(pActive);
    if (clearChkpReadyMsg) {
      streamClearChkptReadyMsg(pActive);
    }
  }
  streamMutexUnlock(&pActive->lock);

  stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
          idStr, pActive->failedId, pTask->chkInfo.checkpointId);
}
598

599
// The checkpointInfo can be updated in the following three cases:
600
// 1. follower tasks; 2. leader task with status of TASK_STATUS__CK; 3. restore not completed
601
static int32_t doUpdateCheckpointInfoCheck(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq,
5,417✔
602
                                           bool* pContinue) {
603
  SStreamMeta*     pMeta = pTask->pMeta;
5,417✔
604
  int32_t          vgId = pMeta->vgId;
5,417✔
605
  int32_t          code = 0;
5,417✔
606
  const char*      id = pTask->id.idStr;
5,417✔
607
  SCheckpointInfo* pInfo = &pTask->chkInfo;
5,417✔
608

609
  *pContinue = true;
5,417✔
610

611
  // not update the checkpoint info if the checkpointId is less than the failed checkpointId
612
  if (pReq->checkpointId < pInfo->pActiveInfo->failedId) {
5,417!
613
    stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64
×
614
           " is less than the failed checkpointId:%" PRId64 ", discard",
615
           id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId);
616

617
    *pContinue = false;
×
618
    return TSDB_CODE_SUCCESS;
×
619
  }
620

621
  // it's an expired checkpointInfo update msg, we still try to drop the required drop fill-history task.
622
  if (pReq->checkpointId <= pInfo->checkpointId) {
5,417✔
623
    stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
24!
624
            " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
625
            id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
626
            pReq->transId);
627

628
    { // destroy the related fill-history tasks
629
      if (pReq->dropRelHTask) {
24!
630
        if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
×
631
          code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
×
632

633
          int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
×
634
          stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
×
635
                  id, vgId, pReq->taskId, numOfTasks);
636

637
          // todo: task may not exist, commit anyway, optimize this later
638
          code = streamMetaCommit(pMeta);
×
639
        }
640
      }
641
    }
642

643
    *pContinue = false;
24✔
644
    // always return true
645
    return TSDB_CODE_SUCCESS;
24✔
646
  }
647

648
  SStreamTaskState status = streamTaskGetStatus(pTask);
5,393✔
649

650
  if (!restored) {  // during restore procedure, do update checkpoint-info
5,392✔
651
    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
6!
652
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
653
            id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
654
            pInfo->checkpointTime, pReq->checkpointTs);
655
  } else {  // not in restore status, must be in checkpoint status
656
    if (((status.state == TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) ||
5,386!
657
        (pMeta->role == NODE_ROLE_FOLLOWER)) {
×
658
      stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64
5,386✔
659
              "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
660
              id, vgId, status.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
661
              pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
662
    } else {
663
      stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
×
664
              " checkpointVer:%" PRId64 "->%" PRId64,
665
              id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
666
              pReq->checkpointVer);
667
    }
668
  }
669

670
  bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
10,788✔
671
                pInfo->processedVer <= pReq->checkpointVer);
5,393!
672

673
  if (!valid) {
5,395!
674
    // invalid update checkpoint info for leader, since the processedVer is greater than the checkpointVer
675
    // It is possible for follower tasks that the processedVer is greater than the checkpointVer, and the processed info
676
    // in follower tasks will be discarded, since the leader/follower switch happens before the checkpoint of the
677
    // processedVer being generated.
678
    if (pMeta->role == NODE_ROLE_LEADER) {
×
679

680
      stFatal("s-task:%s checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
×
681
              " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it",
682
              id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
683
              pReq->checkpointVer);
684

685
      *pContinue = false;
×
686
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
687
    } else {
688
      stInfo("s-task:%s vgId:%d follower recv checkpointId update info, current checkpointId:%" PRId64
×
689
             " checkpointVer:%" PRId64 " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64,
690
             id, pMeta->vgId, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
691
             pReq->checkpointVer);
692
    }
693
  }
694

695
  return TSDB_CODE_SUCCESS;
5,391✔
696
}
697

698
/*
 * Apply an update-checkpoint-info request to a task and persist it.
 *
 * Under pTask->lock:
 *   - validate the request with doUpdateCheckpointInfoCheck();
 *   - when the task is in checkpoint status, is still restoring, or this node is a follower, copy
 *     checkpointId/checkpointVer/checkpointTs from the request (a restored leader also fires
 *     TASK_EVENT_CHECKPOINT_DONE);
 *   - clear the active checkpoint info;
 *   - set the persistent status to READY and save the task into meta.
 *
 * Outside pTask->lock, when pReq->dropRelHTask is set:
 *   - unregister the related fill-history task; or
 *   - for STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE streams, turn it into a recalculate task (free its state backend and
 *     executor, then call pMeta->expandTaskFn on it).
 * Finally commit the meta.
 *
 * Returns the validation result when the request is discarded, otherwise TSDB_CODE_SUCCESS. Failures of the
 * individual persistence steps are logged, not returned.
 */
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
  SStreamMeta*     pMeta = pTask->pMeta;
  int32_t          vgId = pMeta->vgId;
  int32_t          code = 0;
  const char*      id = pTask->id.idStr;
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  bool             continueUpdate = true;

  streamMutexLock(&pTask->lock);
  code = doUpdateCheckpointInfoCheck(pTask, restored, pReq, &continueUpdate);

  if (!continueUpdate) {
    streamMutexUnlock(&pTask->lock);
    return code;
  }

  SStreamTaskState pStatus = streamTaskGetStatus(pTask);

  // update only it is in checkpoint status, or during restore procedure.
  if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
    pInfo->checkpointId = pReq->checkpointId;
    pInfo->checkpointVer = pReq->checkpointVer;
    pInfo->checkpointTime = pReq->checkpointTs;

    if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
      code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:%s vgId:%d failed to handle checkpoint-done event, checkpointId:%" PRId64 ", code:%s", id,
                vgId, pReq->checkpointId, tstrerror(code));
      }
    }
  }

  streamTaskClearCheckInfo(pTask, true);

  if (pReq->dropRelHTask) {
    if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
      stInfo("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
              pReq->taskId, vgId, pReq->hTaskId);
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
    } else {
      stInfo("s-task:0x%x vgId:%d update the related fill-history task:0x%" PRIx64" to be recalculate task",
             pReq->taskId, vgId, pReq->hTaskId);
    }
  }

  stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
          streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask).name);

  pTask->status.taskStatus = TASK_STATUS__READY;

  code = streamMetaSaveTaskInMeta(pMeta, pTask);
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
            vgId, pReq->checkpointId, tstrerror(code));
    return TSDB_CODE_SUCCESS;
  }

  // drop task should not in the meta-lock, and drop the related fill-history task now
  if (pReq->dropRelHTask) {
    if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
      code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
      if (code != TSDB_CODE_SUCCESS) {
        stWarn("s-task:%s vgId:%d failed to drop related fill-history task:0x%x, code:%s", id, vgId,
               (int32_t)pReq->hTaskId, tstrerror(code));
      }

      int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
      stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
              (int32_t)pReq->hTaskId, numOfTasks);
    } else {
      STaskId      hTaskId = {.streamId = pReq->hStreamId, .taskId = pReq->hTaskId};
      SStreamTask* pHTask = NULL;

      code = streamMetaAcquireTaskUnsafe(pMeta, &hTaskId, &pHTask);
      if (code == 0 && pHTask != NULL) {
        stInfo("s-task:0x%x fill-history updated to recalculate task, reset step2Start ts, stream task:0x%x",
               (int32_t)hTaskId.taskId, pReq->taskId);

        streamMutexLock(&pHTask->lock);

        pHTask->info.fillHistory = STREAM_RECALCUL_TASK;
        pHTask->execInfo.step2Start = 0;  // clear the step2start timestamp

        SStreamTaskState status = streamTaskGetStatus(pHTask);
        if (status.state == TASK_STATUS__SCAN_HISTORY) {
          code = streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
          if (code != TSDB_CODE_SUCCESS) {
            stError("s-task:0x%x vgId:%d failed to handle scan-history-done event, code:%s", (int32_t)hTaskId.taskId,
                    vgId, tstrerror(code));
          }
        }

        // drop the old state backend and executor, then rebuild the task as a recalculate task
        if (pHTask->pBackend != NULL) {
          streamFreeTaskState(pHTask, TASK_STATUS__READY);
          pHTask->pBackend = NULL;
        }

        if (pHTask->exec.pExecutor != NULL) {
          qDestroyTask(pHTask->exec.pExecutor);
          pHTask->exec.pExecutor = NULL;
        }

        pMeta->expandTaskFn(pHTask);

        streamMutexUnlock(&pHTask->lock);

        code = streamMetaSaveTaskInMeta(pMeta, pHTask);
        if (code != TSDB_CODE_SUCCESS) {
          stError("s-task:0x%x vgId:%d failed to save recalculate task info, code:%s", (int32_t)hTaskId.taskId, vgId,
                  tstrerror(code));
        }
        streamMetaReleaseTask(pMeta, pHTask);
      } else {
        stWarn("s-task:%s vgId:%d failed to acquire related fill-history task:0x%x, not updated to recalculate task, "
               "code:%s",
               id, vgId, (int32_t)hTaskId.taskId, tstrerror(code));
      }
    }
  }

  code = streamMetaCommit(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to commit meta after update checkpoint info, checkpointId:%" PRId64 ", code:%s",
            id, vgId, pReq->checkpointId, tstrerror(code));
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Record failedId as the task's failed checkpoint id.
 *
 * Non-positive ids are rejected with a warning. The stored failed id only ever moves forward: an id that is not
 * larger than the current one is ignored. The callers in this file invoke this with pTask->lock held.
 */
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
  struct SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*                   idStr = pTask->id.idStr;

  if (failedId <= 0) {
    stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
           " activeId:%" PRId64,
           idStr, pActive->failedId, pActive->activeId);
    return;
  }

  if (failedId <= pActive->failedId) {
    stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, idStr, pActive->failedId, failedId);
    return;
  }

  stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
          " prev failedId:%" PRId64,
          idStr, failedId, pActive->transId, pActive->activeId, pActive->failedId);
  pActive->failedId = failedId;
}

/*
 * If the task is currently in checkpoint status, mark its active checkpoint id as failed.
 * Takes pTask->lock for the whole status check and update.
 */
void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  bool inCheckpoint = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK);
  if (inCheckpoint) {
    int64_t activeId = pTask->chkInfo.pActiveInfo->activeId;
    streamTaskSetFailedCheckpointId(pTask, activeId);
  }

  streamMutexUnlock(&pTask->lock);
}

/*
 * Download the remote "META" file of checkpoint `id` into <path>/META_TMP.
 * Then let remoteChkpGetDelFile() extract from it the files to delete, appending them to `list`.
 *
 * Returns 0 on success. It also returns 0 when the delete list cannot be extracted (best-effort, only logged).
 * Returns an error code when the local file path cannot be built or the META file cannot be downloaded.
 * The temporary path buffer is released on every path.
 */
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
  int32_t code = 0;
  int32_t cap = (int32_t)(strlen(path) + 64);

  char* filePath = taosMemoryCalloc(1, cap);
  if (filePath == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(filePath);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  code = downloadCheckpointDataByName(id, "META", filePath);
  if (code != 0) {
    stError("%s chkp failed to download meta file:%s", id, filePath);
    taosMemoryFree(filePath);
    return code;
  }

  code = remoteChkpGetDelFile(filePath, list);
  if (code != 0) {
    // best-effort: failing to read the delete list does not fail the checkpoint upload
    stError("%s chkp failed to get to del:%s", id, filePath);
  }

  // previously leaked on the success path
  taosMemoryFree(filePath);
  return 0;
}

/*
 * Generate the checkpoint data of pTask for checkpointId in the local backend and upload it to the remote store
 * selected by `type`.
 *
 * The list of redundant remote files is filled by taskDbGenChkpUploadData() and, for DATA_UPLOAD_S3, by
 * getCheckpointDataMeta(). After a successful upload those files are deleted on a best-effort basis: a failed
 * deletion stops further deletions but does not turn the upload into a failure. The local directory size is computed
 * only for logging, and a failure to compute it is not reported to the caller either.
 *
 * dbRefId is currently unused.
 * Returns TSDB_CODE_SUCCESS when the checkpoint data has been generated and uploaded, otherwise the error code of the
 * failed step.
 */
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
  int32_t      code = 0;
  char*        path = NULL;
  int64_t      chkptSize = 0;
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  idStr = pTask->id.idStr;
  int64_t      now = taosGetTimestampMs();

  SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
  if (toDelFiles == NULL) {
    stError("s-task:%s failed to prepare array list during upload checkpoint, code:%s", pTask->id.idStr,
            tstrerror(terrno));
    return terrno;
  }

  if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
                                      pTask->id.idStr)) != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
  }

  if (type == DATA_UPLOAD_S3) {
    if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskUploadCheckpoint(idStr, path, checkpointId);
    if (code == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
    } else {
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path,
              tstrerror(code));
    }
  }

  int32_t num = taosArrayGetSize(toDelFiles);
  if (code == TSDB_CODE_SUCCESS && num > 0) {
    stDebug("s-task:%s remove redundant %d files", idStr, num);

    // Best-effort cleanup: the checkpoint data is already uploaded, so the result of the deletion is kept apart from
    // `code` and does not affect the value returned to the caller.
    for (int32_t i = 0; i < num; i++) {
      char*   pName = taosArrayGetP(toDelFiles, i);
      int32_t ret = deleteCheckpointFile(idStr, pName);
      if (ret != 0) {
        stDebug("s-task:%s failed to remove file: %s", idStr, pName);
        break;
      }
    }

    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
  }

  taosArrayDestroyP(toDelFiles, NULL);
  double el = (taosGetTimestampMs() - now) / 1000.0;

  if (code == TSDB_CODE_SUCCESS) {
    // the directory size is only used for the log below; do not let a failure here mask the successful upload
    int32_t ret = taosGetDirSize(path, &chkptSize);
    if (ret != 0) {
      stWarn("s-task:%s failed to get size of local checkpoint dir:%s, code:%s", idStr, path, tstrerror(ret));
    }

    stDebug("s-task:%s complete upload checkpointId:%" PRId64
            ", elapsed time:%.2fs, checkpointSize:%.2fKiB local dir:%s",
            idStr, checkpointId, el, SIZE_IN_KiB(chkptSize), path);
  } else {
    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " elapsed time:%.2fs, checkpointSize:%.2fKiB", idStr,
            checkpointId, el, SIZE_IN_KiB(chkptSize));
  }

  taosMemoryFree(path);
  return code;
}

/*
 * Back up the data of checkpoint `checkpointId` to the configured remote store, if any.
 *
 * Returns 0 when:
 *   - pTask is NULL;
 *   - backup is disabled (DATA_UPLOAD_DISABLE);
 *   - the task has no state backend.
 * Returns -1 if the backend db cannot be acquired. Otherwise returns the result of uploadCheckpointData().
 * The db reference is held for the duration of the upload.
 */
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
  // check pTask before it is dereferenced by the log below
  if (pTask == NULL) {
    return 0;
  }

  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
  if (type == DATA_UPLOAD_DISABLE) {
    stDebug("s-task:%s not config to backup checkpoint data at snode, checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
    return 0;
  }

  if (pTask->pBackend == NULL) {
    return 0;
  }

  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
  void*   pBackend = taskAcquireDb(dbRefId);
  if (pBackend == NULL) {
    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
            pTask->id.idStr);
    return -1;
  }

  int32_t code = uploadCheckpointData(pTask, checkpointId, dbRefId, type);
  taskReleaseDb(dbRefId);

  return code;
}

/*
 * Build the checkpoint identified by the task's active checkpoint id.
 *
 * Steps, each run only if the previous one succeeded:
 *   1. Non-sink tasks generate a local checkpoint in their state backend.
 *   2. Source tasks send the checkpoint-source rsp; all other tasks send the checkpoint-ready msg.
 *   3. The data is backed up remotely (streamTaskRemoteBackupCheckpoint()).
 *   4. When pTask->pMsgCb is set, the checkpoint report msg is sent.
 *
 * On failure before step 4, the checkpoint id is recorded as failed and TASK_EVENT_CHECKPOINT_DONE is fired to clear
 * the checkpoint status. In that case the function returns the result of that event. The final log line reports
 * whether the checkpoint itself succeeded.
 */
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  bool         succ = false;  // whether the checkpoint itself succeeded; used for the final log line
  int64_t      startTs = pTask->chkInfo.startTs;
  int64_t      ckId = pTask->chkInfo.pActiveInfo->activeId;
  const char*  id = pTask->id.idStr;
  SStreamMeta* pMeta = pTask->pMeta;

  streamMutexLock(&pTask->lock);
  bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  streamMutexUnlock(&pTask->lock);

  // sink task does not need to save the status, and generated the checkpoint
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);

    int64_t ver = pTask->chkInfo.processedVer;
    code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
    }

    int64_t et = taosGetTimestampMs();
    stDebug("s-task:%s gen local checkpoint completed, elapsed time:%.2fs", id, (et - startTs) / 1000.0);
  }

  // TODO: monitoring the checkpoint-source msg
  // send check point response to upstream task
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
      code = streamTaskSendCheckpointSourceRsp(pTask);
    } else {
      code = streamTaskSendCheckpointReadyMsg(pTask);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
    }
  } else {
    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  }

  // TODO: monitoring the checkpoint-report msg
  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->pMsgCb != NULL) {
      code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
    }
    succ = (code == TSDB_CODE_SUCCESS);
  } else {  // clear the checkpoint info if failed
    // set failed checkpoint id before clear the checkpoint info
    streamMutexLock(&pTask->lock);
    streamTaskSetFailedCheckpointId(pTask, ckId);
    streamMutexUnlock(&pTask->lock);

    // The failure is handled here. As before, the returned value is the result of this state transition, while
    // `succ` (still false) keeps the final log line from reporting a failed checkpoint as "succ".
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
  }

  double el = (taosGetTimestampMs() - startTs) / 1000.0;
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
         pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, succ ? "succ" : "failed");

  return code;
}

/*
 * Decide whether the checkpoint-trigger monitor timer should keep running.
 *
 * Returns -1 after cleaning up the timer (streamCleanBeforeQuitTmr) in any of these cases:
 *   - the trigger msgs from all upstream tasks have been received;
 *   - the timer was launched for a different checkpoint id than the active one;
 *   - the active checkpoint info has been cleared.
 * Returns 0 otherwise.
 */
static int32_t doChkptStatusCheck(SStreamTask* pTask, void* param) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTmrInfo*        pTmr = &pActive->chkptTriggerMsgTmr;
  const char*            idStr = pTask->id.idStr;
  int32_t                nodeId = pTask->pMeta->vgId;

  if (pActive->allUpstreamTriggerRecv) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger", idStr, nodeId);
    return -1;
  }

  bool launchedForOtherChkpt = (pTmr->launchChkptId != pActive->activeId);
  if (launchedForOtherChkpt) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
           ", quit",
           idStr, nodeId, pTmr->launchChkptId);
    return -1;
  }

  bool activeInfoCleared = (pActive->activeId == 0) || (pActive->transId == 0) || (pTask->chkInfo.startTs == 0);
  if (activeInfoCleared) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr", idStr,
           nodeId);
    return -1;
  }

  return 0;
}

/*
 * Collect the upstream tasks whose node has no entry in the active checkpoint-ready msg list.
 *
 * pList holds pointers to SStreamUpstreamEpInfo. Each upstream that is found missing gets its input queue re-opened
 * and is copied by value into a new array, which is returned through *ppNotSendList (the caller owns it).
 * Returns 0 on success. On allocation failure it returns terrno and *ppNotSendList is left untouched.
 */
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
  const char*            idStr = pTask->id.idStr;
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  SArray* pResult = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
  if (pResult == NULL) {
    stDebug("s-task:%s start to triggerMonitor, reason:%s", idStr, tstrerror(terrno));
    return terrno;
  }

  for (size_t k = 0; k < taosArrayGetSize(pList); ++k) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pList, k);
    size_t                 numOfReady = taosArrayGetSize(pActive->pReadyMsgList);

    // an upstream counts as received once a ready entry from its node exists
    bool found = false;
    for (size_t r = 0; r < numOfReady && !found; ++r) {
      STaskCheckpointReadyInfo* pReady = taosArrayGet(pActive->pReadyMsgList, r);
      found = (pReady != NULL) && (pReady->upstreamNodeId == pUpstream->nodeId);
    }

    if (found) {
      continue;
    }

    // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
    streamTaskOpenUpstreamInput(pTask, pUpstream->taskId);
    if (taosArrayPush(pResult, pUpstream) == NULL) {
      stError("s-task:%s failed to record not send info, code: out of memory", idStr);
      taosArrayDestroy(pResult);
      return terrno;
    }
  }

  *ppNotSendList = pResult;
  return 0;
}

/*
 * One round of the checkpoint-trigger monitor, executed under the active-info lock by checkpointTriggerMonitorFn.
 *
 * Validates that the monitor should still run, then collects the upstream tasks whose trigger is missing into
 * *ppNotSendList, then asks those upstream tasks to resend the trigger.
 * Returns the non-zero status-check result or list-building error. A failure to send the retrieve msgs is only
 * logged, and 0 is returned.
 */
int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) {
  SStreamTmrInfo* pTmr = &pTask->chkInfo.pActiveInfo->chkptTriggerMsgTmr;

  int32_t ret = doChkptStatusCheck(pTask, param);
  if (ret != 0) {
    return ret;
  }

  ret = doFindNotSendUpstream(pTask, pTask->upstreamInfo.pList, ppNotSendList);
  if (ret != 0) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", pTask->id.idStr, tstrerror(ret));
    return ret;
  }

  // do send retrieve checkpoint trigger msg to upstream
  ret = doSendRetrieveTriggerMsg(pTask, *ppNotSendList);
  if (ret != 0) {
    stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
            tstrerror(ret));
  }

  return 0;
}

// Common exit path of checkpointTriggerMonitorFn: free the not-send list (callers may pass NULL before it is built)
// and release the task reference acquired at the start of the timer callback.
static void doCleanup(SStreamTask* pTask, SArray* pList) {
  SStreamMeta* pMeta = pTask->pMeta;

  taosArrayDestroy(pList);
  streamMetaReleaseTask(pMeta, pTask);
}

void checkpointTriggerMonitorFn(void* param, void* tmrId) {
112,359✔
1139
  int32_t      code = 0;
112,359✔
1140
  int32_t      numOfNotSend = 0;
112,359✔
1141
  SArray*      pNotSendList = NULL;
112,359✔
1142
  int64_t      taskRefId = *(int64_t*)param;
112,359✔
1143
  int64_t      now = taosGetTimestampMs();
112,359✔
1144

1145
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
112,359✔
1146
  if (pTask == NULL) {
112,359✔
1147
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
7!
1148
    streamTaskFreeRefId(param);
7✔
1149
    return;
112,359✔
1150
  }
1151

1152
  int32_t                vgId = pTask->pMeta->vgId;
112,352✔
1153
  const char*            id = pTask->id.idStr;
112,352✔
1154
  SArray*                pList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
112,352✔
1155
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
112,352✔
1156
  SStreamTmrInfo*        pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
112,352✔
1157

1158
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
112,352!
1159
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1160
    stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id);
×
1161
    doCleanup(pTask, pNotSendList);
×
1162
    return;
×
1163
  }
1164

1165
  // check the status every 100ms
1166
  if (streamTaskShouldStop(pTask)) {
112,352✔
1167
    streamCleanBeforeQuitTmr(pTmrInfo, param);
70✔
1168
    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger", id, vgId);
70!
1169
    doCleanup(pTask, pNotSendList);
70✔
1170
    return;
70✔
1171
  }
1172

1173
  if (++pTmrInfo->activeCounter < 50) {
112,282✔
1174
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
110,599✔
1175
                   "trigger-recv-monitor");
1176
    doCleanup(pTask, pNotSendList);
110,599✔
1177
    return;
110,599✔
1178
  }
1179

1180
  pTmrInfo->activeCounter = 0;
1,683✔
1181
  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
1,683✔
1182

1183
  streamMutexLock(&pTask->lock);
1,683✔
1184
  SStreamTaskState state = streamTaskGetStatus(pTask);
1,683✔
1185
  streamMutexUnlock(&pTask->lock);
1,683✔
1186

1187
  if (state.state != TASK_STATUS__CK) {
1,683!
1188
    streamCleanBeforeQuitTmr(pTmrInfo, param);
1,683✔
1189
    stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger", id,
1,683✔
1190
            vgId, state.name);
1191
    doCleanup(pTask, pNotSendList);
1,683✔
1192
    return;
1,683✔
1193
  }
1194

1195
  streamMutexLock(&pActiveInfo->lock);
×
1196
  code = chkptTriggerRecvMonitorHelper(pTask, param, &pNotSendList);
×
1197
  streamMutexUnlock(&pActiveInfo->lock);
×
1198

1199
  if (code != TSDB_CODE_SUCCESS) {
×
1200
    doCleanup(pTask, pNotSendList);
×
1201
    return;
×
1202
  }
1203

1204
  // check every 100ms
1205
  numOfNotSend = taosArrayGetSize(pNotSendList);
×
1206
  if (numOfNotSend > 0) {
×
1207
    stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
×
1208
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
×
1209
                   "trigger-recv-monitor");
1210
  } else {
1211
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1212
    stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr", id);
×
1213
  }
1214

1215
  doCleanup(pTask, pNotSendList);
×
1216
}
1217

1218
/*
 * Send a TDMT_STREAM_RETRIEVE_TRIGGER request for the active checkpoint to every upstream task in pNotSendList
 * (an array of SStreamUpstreamEpInfo stored by value).
 *
 * Each message is an SMsgHead (vgId = upstream node id, network byte order) followed by the encoded
 * SRetrieveChkptTriggerReq.
 *
 * An upstream whose message cannot be sized, allocated or encoded is skipped; the remaining ones are still processed.
 * Returns TSDB_CODE_INVALID_PARA on an invalid list entry. Otherwise returns the status of the last upstream
 * processed, or 0 when the list is empty.
 */
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;
  const char* pId = pTask->id.idStr;
  int32_t     size = taosArrayGetSize(pNotSendList);
  int32_t     numOfUpstream = streamTaskGetNumOfUpstream(pTask);
  int64_t     checkpointId = pTask->chkInfo.pActiveInfo->activeId;

  if (size <= 0) {
    stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
    return code;
  }

  stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId, size,
          numOfUpstream);

  for (int32_t i = 0; i < size; i++) {
    SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
    if (pUpstreamTask == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    int32_t  ret = 0;
    int32_t  tlen = 0;
    void*    buf = NULL;
    SRpcMsg  rpcMsg = {0};
    SEncoder encoder;

    SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId,
                                    .downstreamTaskId = pTask->id.taskId,
                                    .downstreamNodeId = vgId,
                                    .upstreamTaskId = pUpstreamTask->taskId,
                                    .upstreamNodeId = pUpstreamTask->nodeId,
                                    .checkpointId = checkpointId};

    tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
    if (ret < 0) {
      // the message length is unknown, so no valid message can be built for this upstream: skip it
      stError("s-task:%s encode retrieve checkpoint-trigger msg failed, code:%s", pId, tstrerror(ret));
      code = ret;
      continue;
    }

    buf = rpcMallocCont(tlen + sizeof(SMsgHead));
    if (buf == NULL) {
      stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId);
      continue;
    }

    // the message starts with an SMsgHead, followed by the encoded request
    ((SMsgHead*)buf)->vgId = htonl(pUpstreamTask->nodeId);
    void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

    tEncoderInit(&encoder, abuf, tlen);
    if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) {
      rpcFreeCont(buf);
      tEncoderClear(&encoder);
      stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code));
      continue;
    }
    tEncoderClear(&encoder);

    initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead));

    code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
    if (code == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId,
              vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    } else {
      stError("s-task:%s vgId:%d failed to send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64,
              pId, vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    }
  }

  return code;
}

/*
 * Check whether a checkpoint-trigger message has already been dispatched to the
 * downstream node `downstreamNodeId` for the active checkpoint.
 *
 * This function does not lock; as its name says it expects the caller to hold
 * pTask->chkInfo.pActiveInfo->lock (streamTaskAlreadySendTrigger does so).
 *
 * Returns false when no trigger has been dispatched (dispatchTrigger unset) or no
 * entry of pDispatchTriggerList targets downstreamNodeId; otherwise logs when the
 * trigger was sent (and whether it was confirmed) and returns true.
 */
static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) {
  int64_t                now = taosGetTimestampMs();
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  if (!pInfo->dispatchTrigger) {
    return false;
  }

  int32_t num = taosArrayGetSize(pInfo->pDispatchTriggerList);
  for (int32_t i = 0; i < num; ++i) {
    STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (pSendInfo == NULL) {
      stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, i, num);
      continue;
    }

    if (pSendInfo->nodeId != downstreamNodeId) {
      continue;
    }

    // the trigger has already been sent to this downstream node; report how long ago (seconds)
    double before = (now - pSendInfo->sendTs) / 1000.0;
    if (pSendInfo->recved) {
      stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64
             "(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
             id, pSendInfo->sendTs, before, pSendInfo->taskId, pInfo->activeId, pInfo->transId);
    } else {
      stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64
             ", transId:%d",
             id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
    }

    return true;
  }

  return false;
}
1330

1331
/*
 * Report whether a checkpoint-trigger has already been dispatched to the
 * downstream node `downstreamNodeId`.
 *
 * Returns false unless the task is currently in checkpoint status
 * (TASK_STATUS__CK). Otherwise the dispatch list is scanned while holding
 * pActiveInfo->lock.
 */
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  SStreamTaskState       pStatus = streamTaskGetStatus(pTask);

  if (pStatus.state != TASK_STATUS__CK) {
    return false;
  }

  streamMutexLock(&pInfo->lock);
  bool send = isAlreadySendTriggerNoLock(pTask, downstreamNodeId);
  streamMutexUnlock(&pInfo->lock);

  return send;
}
1347

1348
/*
 * Report checkpoint-trigger progress for the task.
 *   *pRecved - number of entries currently in pActiveInfo->pReadyMsgList
 *   *pTotal  - 1 for a source task, otherwise the number of upstream tasks
 */
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  bool                   isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  *pRecved = taosArrayGetSize(pActive->pReadyMsgList);
  *pTotal = isSourceTask ? 1 : streamTaskGetNumOfUpstream(pTask);
}
2✔
1357

1358
// record the dispatch checkpoint trigger info in the list
1359
// memory insufficient may cause the stream computing stopped
1360
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) {
3,140✔
1361
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
3,140✔
1362
  int64_t                now = taosGetTimestampMs();
3,140✔
1363
  int32_t                code = 0;
3,140✔
1364

1365
  streamMutexLock(&pInfo->lock);
3,140✔
1366

1367
  if (sendingChkptId > pInfo->failedId) {
3,140!
1368
    pInfo->dispatchTrigger = true;
3,140✔
1369
    if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
3,140✔
1370
      STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
524✔
1371

1372
      STaskTriggerSendInfo p = {
524✔
1373
          .sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
524✔
1374
      void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
524✔
1375
      if (px == NULL) {  // pause the stream task, if memory not enough
524!
1376
        code = terrno;
×
1377
      }
1378
    } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
2,616!
1379
      for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
×
1380
        STaskDispatcherFixed* pAddr = taosArrayGet(pTask->outputInfo.vtableMapDispatcher.taskInfos, i);
×
1381
        if (pAddr == NULL) {
×
1382
          continue;
×
1383
        }
1384

1385
        STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pAddr->nodeId, .taskId = pAddr->taskId};
×
1386
        void*                px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
×
1387
        if (px == NULL) {  // pause the stream task, if memory not enough
×
1388
          code = terrno;
×
1389
          break;
×
1390
        }
1391
      }
1392
    } else {
1393
      for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
10,353✔
1394
        SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
7,737✔
1395
        if (pVgInfo == NULL) {
7,737!
1396
          continue;
×
1397
        }
1398

1399
        STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
7,737✔
1400
        void*                px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
7,737✔
1401
        if (px == NULL) {  // pause the stream task, if memory not enough
7,737!
1402
          code = terrno;
×
1403
          break;
×
1404
        }
1405
      }
1406
    }
1407
  }
1408

1409
  streamMutexUnlock(&pInfo->lock);
3,140✔
1410

1411
  return code;
3,140✔
1412
}
1413

1414
/*
 * Count the entries of pInfo->pDispatchTriggerList whose trigger has been
 * confirmed by the downstream task (recved is set). NULL entries are skipped.
 *
 * No locking is done here; streamTaskSetTriggerDispatchConfirmed calls it while
 * holding pInfo->lock.
 */
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
  int32_t confirmed = 0;
  int32_t size = taosArrayGetSize(pInfo->pDispatchTriggerList);

  for (int32_t idx = 0; idx < size; ++idx) {
    STaskTriggerSendInfo* pEntry = taosArrayGet(pInfo->pDispatchTriggerList, idx);
    if (pEntry != NULL && pEntry->recved) {
      confirmed += 1;
    }
  }

  return confirmed;
}
1428

1429
/*
 * Mark the checkpoint-trigger dispatched to downstream node `vgId` as confirmed.
 *
 * Under pActiveInfo->lock, the first entry of pDispatchTriggerList with
 * nodeId == vgId is located:
 *   - if not yet confirmed, it is flagged recved and stamped with the receive time;
 *   - if already confirmed, a warning is logged and the entry is left untouched.
 * If no entry matches vgId, an error is logged.
 */
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  int64_t now = taosGetTimestampMs();
  int32_t taskId = 0;
  int32_t total = streamTaskGetNumOfDownstream(pTask);
  bool    found = false;  // an entry for vgId exists in the dispatch list
  bool    alreadyRecv = false;

  streamMutexLock(&pInfo->lock);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
    STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (p == NULL) {
      continue;
    }

    if (p->nodeId == vgId) {
      found = true;
      if (p->recved) {
        stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64
               " discard",
               pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs);
        alreadyRecv = true;
      } else {
        p->recved = true;
        p->recvTs = taosGetTimestampMs();
        taskId = p->taskId;
      }
      break;
    }
  }

  int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
  streamMutexUnlock(&pInfo->lock);

  // Decide on `found`, not on taskId == 0: a duplicate confirmation leaves taskId at 0 and
  // was therefore also misreported as an invalid confirm. It has already been warned above.
  if (!found) {
    stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
  } else if (!alreadyRecv) {
    stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
            pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
  }
}
8,255✔
1472

1473
int32_t uploadCheckpointToS3(const char* id, const char* path) {
1✔
1474
  int32_t code = 0;
1✔
1475
  int32_t nBytes = 0;
1✔
1476
  /*
1477
  if (s3Init() != 0) {
1478
    return TSDB_CODE_THIRDPARTY_ERROR;
1479
  }
1480
  */
1481
  TdDirPtr pDir = taosOpenDir(path);
1✔
1482
  if (pDir == NULL) {
1!
1483
    return terrno;
×
1484
  }
1485

1486
  TdDirEntryPtr de = NULL;
1✔
1487
  while ((de = taosReadDir(pDir)) != NULL) {
5✔
1488
    char* name = taosGetDirEntryName(de);
4✔
1489
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
4!
1490

1491
    char filename[PATH_MAX] = {0};
2✔
1492
    if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
2!
1493
      nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
×
1494
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1495
        code = TSDB_CODE_OUT_OF_RANGE;
×
1496
        break;
×
1497
      }
1498
    } else {
1499
      nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
2✔
1500
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
2!
1501
        code = TSDB_CODE_OUT_OF_RANGE;
×
1502
        break;
×
1503
      }
1504
    }
1505

1506
    char object[PATH_MAX] = {0};
2✔
1507
    nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
2✔
1508
    if (nBytes <= 0 || nBytes >= sizeof(object)) {
2!
1509
      code = TSDB_CODE_OUT_OF_RANGE;
×
1510
      break;
×
1511
    }
1512

1513
    code = tcsPutObjectFromFile2(filename, object, 0);
2✔
1514
    if (code != 0) {
2!
1515
      stError("[tcs] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
2!
1516
    } else {
1517
      stDebug("[tcs] upload checkpoint:%s", filename);
×
1518
    }
1519
  }
1520

1521
  int32_t ret = taosCloseDir(&pDir);
1✔
1522
  if (code == 0 && ret != 0) {
1!
1523
    code = ret;
×
1524
  }
1525

1526
  return code;
1✔
1527
}
1528

1529
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
1✔
1530
  int32_t nBytes;
1531
  int32_t cap = strlen(id) + strlen(dstName) + 16;
1✔
1532

1533
  char* buf = taosMemoryCalloc(1, cap);
1!
1534
  if (buf == NULL) {
1!
1535
    return terrno;
×
1536
  }
1537

1538
  nBytes = snprintf(buf, cap, "%s/%s", id, fname);
1✔
1539
  if (nBytes <= 0 || nBytes >= cap) {
1!
1540
    taosMemoryFree(buf);
×
1541
    return TSDB_CODE_OUT_OF_RANGE;
×
1542
  }
1543
  int32_t code = tcsGetObjectToFile(buf, dstName);
1✔
1544
  if (code != 0) {
1!
1545
    taosMemoryFree(buf);
1!
1546
    return TAOS_SYSTEM_ERROR(ERRNO);
1✔
1547
  }
1548
  taosMemoryFree(buf);
×
1549
  return 0;
×
1550
}
1551

1552
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
6,162✔
1553
  if (strlen(tsSnodeAddress) != 0) {
6,162✔
1554
    return DATA_UPLOAD_RSYNC;
1✔
1555
  } else if (tsS3StreamEnabled) {
6,161!
1556
    return DATA_UPLOAD_S3;
×
1557
  } else {
1558
    return DATA_UPLOAD_DISABLE;
6,161✔
1559
  }
1560
}
1561

1562
int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) {
2✔
1563
  int32_t code = 0;
2✔
1564
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
2!
1565
    stError("invalid parameters in upload checkpoint, %s", id);
1!
1566
    return TSDB_CODE_INVALID_CFG;
1✔
1567
  }
1568

1569
  if (strlen(tsSnodeAddress) != 0) {
1!
1570
    code = uploadByRsync(id, path, checkpointId);
×
1571
    if (code != 0) {
×
1572
      return TAOS_SYSTEM_ERROR(ERRNO);
×
1573
    }
1574
  } else if (tsS3StreamEnabled) {
1!
1575
    return uploadCheckpointToS3(id, path);
×
1576
  }
1577

1578
  return 0;
1✔
1579
}
1580

1581
// fileName:  CURRENT
1582
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
1✔
1583
  if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
1!
1584
    stError("down load checkpoint data parameters invalid");
×
1585
    return TSDB_CODE_INVALID_PARA;
×
1586
  }
1587

1588
  if (strlen(tsSnodeAddress) != 0) {
1!
1589
    return 0;
×
1590
  } else if (tsS3StreamEnabled) {
1!
1591
    return downloadCheckpointByNameS3(id, fname, dstName);
×
1592
  }
1593

1594
  return 0;
1✔
1595
}
1596

1597
int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) {
1✔
1598
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
1!
1599
    stError("down checkpoint data parameters invalid");
×
1600
    return -1;
×
1601
  }
1602

1603
  if (strlen(tsSnodeAddress) != 0) {
1!
1604
    return downloadByRsync(id, path, checkpointId);
1✔
1605
  } else if (tsS3StreamEnabled) {
×
1606
    return tcsGetObjectsByPrefix(id, path);
×
1607
  }
1608

1609
  return 0;
×
1610
}
1611

1612
#ifdef BUILD_NO_CALL
1613
int32_t deleteCheckpoint(const char* id) {
1614
  if (id == NULL || strlen(id) == 0) {
1615
    stError("deleteCheckpoint parameters invalid");
1616
    return TSDB_CODE_INVALID_PARA;
1617
  }
1618
  if (strlen(tsSnodeAddress) != 0) {
1619
    return deleteRsync(id);
1620
  } else if (tsS3StreamEnabled) {
1621
    tcsDeleteObjectsByPrefix(id);
1622
  }
1623
  return 0;
1624
}
1625
#endif
1626

1627
int32_t deleteCheckpointFile(const char* id, const char* name) {
1✔
1628
  char object[128] = {0};
1✔
1629

1630
  int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
1✔
1631
  if (nBytes <= 0 || nBytes >= sizeof(object)) {
1!
1632
    return TSDB_CODE_OUT_OF_RANGE;
×
1633
  }
1634

1635
  char*   tmp = object;
1✔
1636
  int32_t code = tcsDeleteObjects((const char**)&tmp, 1);
1✔
1637
  if (code != 0) {
1!
1638
    return TSDB_CODE_THIRDPARTY_ERROR;
1✔
1639
  }
1640
  return code;
×
1641
}
1642

1643
/*
 * Prepare the task for checkpoint-id negotiation. No message is sent in this
 * function itself.
 *
 * Steps:
 *   1. Under pTask->lock, capture the current status and call
 *      streamTaskSetReqConsenChkptId with the current timestamp.
 *   2. Release the task's backend state, if any.
 *   3. Under the meta write lock, destroy the task's executor, if any.
 *
 * Always returns 0.
 */
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
  ETaskStatus status;

  streamMutexLock(&pTask->lock);
  status = streamTaskGetStatus(pTask).state;
  streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
  streamMutexUnlock(&pTask->lock);

  if (pTask->pBackend != NULL) {
    streamFreeTaskState(pTask, status);
    pTask->pBackend = NULL;
  }

  streamMetaWLock(pTask->pMeta);
  if (pTask->exec.pExecutor != NULL) {
    qDestroyTask(pTask->exec.pExecutor);
    pTask->exec.pExecutor = NULL;
  }
  streamMetaWUnLock(pTask->pMeta);

  return 0;
}
1663

1664
/*
 * For a source task currently in checkpoint status (TASK_STATUS__CK), send the
 * checkpoint-source response via streamTaskSendCheckpointSourceRsp. The status
 * check and the send both happen under pTask->lock.
 *
 * Non-source tasks, and source tasks in any other state, return 0 without sending.
 */
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  int32_t ret = 0;

  streamMutexLock(&pTask->lock);
  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    ret = streamTaskSendCheckpointSourceRsp(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  return ret;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc