• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/stream/src/streamCheckpoint.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "rsync.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tcs.h"
20

21
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
22
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
23
#ifdef BUILD_NO_CALL
24
static int32_t deleteCheckpoint(const char* id);
25
#endif
26
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
27
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
28
                                          int32_t transId, int32_t srcTaskId);
29
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
30
static void    checkpointTriggerMonitorFn(void* param, void* tmrId);
31

UNCOV
32
/**
 * Build a checkpoint (or checkpoint-trigger) queue item for the given task.
 *
 * The item holds one SSDataBlock of type STREAM_CHECKPOINT whose info.version carries the
 * checkpointId and whose info.window.skey/ekey both carry the transId.
 *
 * @param pTask           task the item is built for; its taskLevel, selfChildId and idStr are read
 * @param checkpointType  stored as the item's type
 * @param checkpointId    stored in the data block's info.version
 * @param transId         stored in the data block's info.window.skey and ekey
 * @param srcTaskId       source task id; must be > 0 when a STREAM_INPUT__CHECKPOINT_TRIGGER item
 *                        is built for a non-source task, otherwise it is not used
 * @param pRes            set to the new item on success; left untouched on failure
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for an invalid srcTaskId, the code
 *         returned by taosAllocateQitem, or terrno when a later allocation fails
 */
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                int32_t srcTaskId, SStreamDataBlock** pRes) {
  SStreamDataBlock* pChkpoint = NULL;
  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint);
  if (code) {
    return code;
  }

  pChkpoint->type = checkpointType;
  if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
    if (srcTaskId <= 0) {
      stDebug("s-task:%s invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId);
      // the queue item is owned by this function until it is handed out through pRes, so it
      // must be released here; returning without the free leaks it
      taosFreeQitem(pChkpoint);
      return TSDB_CODE_INVALID_PARA;
    }

    pChkpoint->srcTaskId = srcTaskId;
  }

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  pBlock->info.type = STREAM_CHECKPOINT;
  pBlock->info.version = checkpointId;
  pBlock->info.window.ekey = pBlock->info.window.skey = transId;  // NOTE: set the transId
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));
  if (pChkpoint->blocks == NULL) {
    taosMemoryFree(pBlock);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  void* p = taosArrayPush(pChkpoint->blocks, pBlock);
  if (p == NULL) {
    taosArrayDestroy(pChkpoint->blocks);
    taosMemoryFree(pBlock);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  *pRes = pChkpoint;

  // the array was created with element size sizeof(SSDataBlock), so the temporary block is
  // no longer needed once it has been pushed
  taosMemoryFree(pBlock);
  return TSDB_CODE_SUCCESS;
}
81

82
// this message must be put into inputq successfully, continue retrying until it succeeds
83
// todo must be success
UNCOV
84
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
×
85
                                   int32_t srcTaskId) {
UNCOV
86
  SStreamDataBlock* pCheckpoint = NULL;
×
UNCOV
87
  int32_t code = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId, &pCheckpoint);
×
UNCOV
88
  if (code != TSDB_CODE_SUCCESS) {
×
89
    return code;
×
90
  }
91

UNCOV
92
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
×
93
    return TSDB_CODE_OUT_OF_MEMORY;
×
94
  }
95

UNCOV
96
  return streamTrySchedExec(pTask);
×
97
}
98

UNCOV
99
/**
 * Start the checkpoint procedure on a source task for a checkpoint-source request.
 *
 * @param pTask  target task; must be a TASK_LEVEL__SOURCE task
 * @param pReq   request carrying the checkpointId and transId to activate
 * @return TSDB_CODE_INVALID_MSG when the task is not a source task, the error from the
 *         TASK_EVENT_GEN_CHECKPOINT state transition, or the result of appending the
 *         checkpoint-trigger item to the inputQ
 */
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
  const bool isSourceTask = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  if (!isSourceTask) {
    return TSDB_CODE_INVALID_MSG;
  }

  // todo this status may not be set here.
  // Step 1: switch the task into the checkpoint-preparing state, after which no data is allowed
  // to be put into the inputQ.
  int32_t rc = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
  if (rc != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr);
    return rc;
  }

  // record the identifiers of the checkpoint that is now active, and its start time
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->transId = pReq->transId;
  pActive->activeId = pReq->checkpointId;
  pTask->chkInfo.startTs = taosGetTimestampMs();
  pTask->execInfo.checkpoint += 1;

  // Step 2: append the checkpoint-trigger item to the inputQ so that it is the last item and all
  // blocks with a smaller version have been handled by this task before it.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
}
121

UNCOV
122
/**
 * Handle the response of a retrieve-checkpoint-trigger request sent to an upstream task.
 *
 * When the response is valid and matches the currently active checkpoint, a checkpoint-trigger
 * item tagged with the upstream task id is appended to the inputQ of this task.
 *
 * @param pTask  task that receives the response; must not be a source task
 * @param pRsp   response from the upstream task
 * @return TSDB_CODE_INVALID_MSG for a source task; TSDB_CODE_SUCCESS when the upstream reported a
 *         failure (the response is only logged); TSDB_CODE_STREAM_TASK_IVLD_STATUS when the task is
 *         not in checkpoint status or the response does not match the active checkpoint; otherwise
 *         the result of appendCheckpointIntoInputQ
 */
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  const char*            id = pTask->id.idStr;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId,
            tstrerror(pRsp->rspCode));
    return TSDB_CODE_SUCCESS;
  }

  streamMutexLock(&pTask->lock);
  SStreamTaskState status = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (status.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // snapshot the active checkpoint identifiers under the lock, so the values that are compared
  // are also the values that are logged
  streamMutexLock(&pInfo->lock);
  int64_t activeId = pInfo->activeId;
  int32_t activeTransId = pInfo->transId;
  streamMutexUnlock(&pInfo->lock);

  if (activeId != pRsp->checkpointId || activeTransId != pRsp->transId) {
    // the task IS in checkpoint status here; the rsp is rejected because it belongs to another
    // checkpoint/transaction, so report that instead of a misleading status message
    stError("s-task:%s status:%s checkpoint-trigger rsp not match active checkpoint, active checkpointId:%" PRId64
            " transId:%d, rsp checkpointId:%" PRId64 " transId:%d, discard the checkpoint-trigger msg",
            id, status.name, activeId, activeTransId, (int64_t)pRsp->checkpointId, (int32_t)pRsp->transId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
                                    pRsp->upstreamTaskId);
}
161

UNCOV
162
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
×
163
                                           SRpcHandleInfo* pRpcInfo, int32_t code) {
UNCOV
164
  int32_t  ret = 0;
×
UNCOV
165
  int32_t  tlen = 0;
×
UNCOV
166
  void*    buf = NULL;
×
167
  SEncoder encoder;
168

UNCOV
169
  SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId,
×
UNCOV
170
                               .upstreamTaskId = pTask->id.taskId,
×
171
                               .taskId = dstTaskId,
172
                               .rspCode = code};
173

UNCOV
174
  if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
175
    req.checkpointId = pTask->chkInfo.pActiveInfo->activeId;
×
UNCOV
176
    req.transId = pTask->chkInfo.pActiveInfo->transId;
×
177
  } else {
UNCOV
178
    req.checkpointId = -1;
×
UNCOV
179
    req.transId = -1;
×
180
  }
181

UNCOV
182
  tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret);
×
UNCOV
183
  if (ret < 0) {
×
184
    stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code));
×
185
    return ret;
×
186
  }
187

UNCOV
188
  buf = rpcMallocCont(tlen + sizeof(SMsgHead));
×
UNCOV
189
  if (buf == NULL) {
×
190
    stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId);
×
191
    return terrno;
×
192
  }
193

UNCOV
194
  ((SMsgHead*)buf)->vgId = htonl(downstreamNodeId);
×
UNCOV
195
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
×
196

UNCOV
197
  tEncoderInit(&encoder, abuf, tlen);
×
UNCOV
198
  if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) {
×
199
    rpcFreeCont(buf);
×
200
    tEncoderClear(&encoder);
×
201
    stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code));
×
202
    return ret;
×
203
  }
UNCOV
204
  tEncoderClear(&encoder);
×
205

UNCOV
206
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo};
×
UNCOV
207
  tmsgSendRsp(&rspMsg);
×
208

UNCOV
209
  return ret;
×
210
}
211

UNCOV
212
/**
 * Stamp the checkpoint-trigger block with this task as its source, put it into the outputQ and
 * start dispatching.
 *
 * @param pBlock  checkpoint-trigger item to forward; released here when the outputQ write fails
 * @param pTask   task that forwards the block
 * @return 0 when the trigger has already been dispatched, the outputQ write error, or the result
 *         of streamDispatchStreamBlock
 */
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  // NOTE(review): pBlock is not released on this early-return path — confirm the owner.
  if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
    stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
    return 0;
  }

  int32_t rc = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (rc != 0) {
    stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(rc));
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return rc;
  }

  return streamDispatchStreamBlock(pTask);
}
231

UNCOV
232
/**
 * Decide whether a received checkpoint-trigger block should be handled or discarded.
 *
 * @param pTask         task that received the block
 * @param checkpointId  checkpoint id carried by the block
 * @param pBlock        the checkpoint-trigger block; only its srcTaskId is read
 * @param transId       transaction id carried by the block (used for logging)
 * @return TSDB_CODE_SUCCESS when the block should be handled; TSDB_CODE_STREAM_INVLD_CHKPT when
 *         it is expired, marked failed, already finished, mismatched or duplicated;
 *         TSDB_CODE_STREAM_TASK_NOT_EXIST when the upstream endpoint is unknown;
 *         TSDB_CODE_INVALID_PARA when the ready-msg list yields a NULL entry
 */
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
                                        int32_t transId) {
  const char*            id = pTask->id.idStr;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                level = pTask->info.taskLevel;
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  // case 1: the trigger is older than the latest completed checkpoint
  if (pTask->chkInfo.checkpointId > checkpointId) {
    stError("s-task:%s vgId:%d current checkpointId:%" PRId64
            " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
            id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // case 2: this checkpoint (or a later one) has already been marked as failed
  if (pActive->failedId >= checkpointId) {
    stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
            " discard the checkpoint-trigger block",
            id, vgId, checkpointId, transId, pActive->failedId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // case 3: this checkpoint is already finished on this task; send checkpoint-ready msg to
  // upstream and discard the trigger
  if (pTask->chkInfo.checkpointId == checkpointId) {
    SRpcMsg                readyMsg = {0};
    SStreamUpstreamEpInfo* pEpInfo = NULL;

    streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pEpInfo);
    if (pEpInfo == NULL) {
      return TSDB_CODE_STREAM_TASK_NOT_EXIST;
    }

    int32_t rc = initCheckpointReadyMsg(pTask, pEpInfo->nodeId, pBlock->srcTaskId, pEpInfo->childId, checkpointId,
                                        &readyMsg);
    if (rc == TSDB_CODE_SUCCESS) {
      rc = tmsgSendReq(&pEpInfo->epSet, &readyMsg);
      if (rc) {
        stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(rc));
      }
    }

    stWarn(
        "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
        "the interrupted checkpoint",
        id, vgId, pBlock->srcTaskId);

    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // the remaining checks only apply while the task is in checkpoint status
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    return TSDB_CODE_SUCCESS;
  }

  if (pActive->activeId != checkpointId) {
    stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
            " discard",
            id, vgId, pActive->activeId, checkpointId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // from here on: checkpointId == pActive->activeId
  if (pActive->allUpstreamTriggerRecv == 1) {
    stDebug(
        "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
        "checkpointId:%" PRId64 " transId:%d",
        id, vgId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  if (level != TASK_LEVEL__SINK && level != TASK_LEVEL__AGG) {
    return TSDB_CODE_SUCCESS;
  }

  // a trigger from the same upstream task that is already recorded is a duplicate, discard it
  for (int32_t idx = 0; idx < taosArrayGetSize(pActive->pReadyMsgList); ++idx) {
    STaskCheckpointReadyInfo* pReady = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pReady == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    if (pReady->upstreamTaskId != pBlock->srcTaskId) {
      continue;
    }

    stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64
           ", prev recvTs:%" PRId64 " discard",
           pTask->id.idStr, pReady->upstreamTaskId, pReady->upstreamNodeId, pReady->checkpointId, pReady->recvTs);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  return TSDB_CODE_SUCCESS;
}
316

UNCOV
317
/**
 * Handle a checkpoint-trigger block for the given task.
 *
 * The checkpointId is read from the first data block's info.version and the transId from its
 * info.window.skey. After the validity checks, the task is moved into checkpoint status (when it
 * is not there yet) and a checkpoint-trigger monitor timer is started. Then, depending on the
 * task level:
 *   - source: flush executor state, then dispatch the trigger downstream, or append a checkpoint
 *     item to the own inputQ when the output type is not a dispatch type;
 *   - sink/agg: record the checkpoint-ready msg for the upstream task and, once all upstream
 *     triggers have been received, build the checkpoint (sink) or flush and forward (agg).
 *
 * pBlock is released with streamFreeQitem on the discard/error paths below, or handed to
 * continueDispatchCheckpointTriggerBlock when it is forwarded.
 *
 * @param pTask   task that handles the block
 * @param pBlock  checkpoint-trigger item
 * @return TSDB_CODE_SUCCESS or the first error code encountered
 */
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int64_t                checkpointId = 0;
  int32_t                transId = 0;
  const char*            id = pTask->id.idStr;
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                taskLevel = pTask->info.taskLevel;
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
  if (pDataBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  checkpointId = pDataBlock->info.version;
  transId = pDataBlock->info.window.skey;

  streamMutexLock(&pTask->lock);
  code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
  streamMutexUnlock(&pTask->lock);
  if (code) {
    if (taskLevel != TASK_LEVEL__SOURCE) {  // the checkpoint-trigger is discard, open the inputQ for upstream tasks
      streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
    }
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
          ", transId:%d current active checkpointId:%" PRId64,
          id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);

  // set task status
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    pActiveInfo->activeId = checkpointId;
    pActiveInfo->transId = transId;

    if (pTask->chkInfo.startTs == 0) {
      pTask->chkInfo.startTs = taosGetTimestampMs();
      pTask->execInfo.checkpoint += 1;
    }

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    // if previous launched timer not started yet, not start a new timer
    // todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch
    //  a new checkpoint-trigger timer right now.
    //  And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
    //  procedure to be stucked.
    SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
    int8_t          old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
    if (old == 0) {
      stDebug("s-task:%s start checkpoint-trigger monitor in 10s", pTask->id.idStr);

      int64_t* pTaskRefId = NULL;
      code = streamTaskAllocRefId(pTask, &pTaskRefId);
      if (code == 0) {
        streamTmrStart(checkpointTriggerMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId,
                       "trigger-recv-monitor");
        pTmrInfo->launchChkptId = pActiveInfo->activeId;
      }
    } else {  // already launched, do nothing
      stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
    }
  }

#if 0
  taosMsleep(20*1000);
#endif

  if (taskLevel == TASK_LEVEL__SOURCE) {
    int8_t type = pTask->outputInfo.type;
    pActiveInfo->allUpstreamTriggerRecv = 1;

    // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
    // The transfer of state may generate new data that need to dispatch to downstream tasks,
    // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
    // before the next checkpoint.
    code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
    if (code) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

#if 0
    chkptFailedByRetrieveReqToSource(pTask, checkpointId);
#endif

    if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);  // todo handle this failure
    } else {  // only one task exists, no need to dispatch downstream info
      code =
          appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
    // todo: handle this
    // update the child Id for downstream tasks
    code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);

    // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
    if (pActiveInfo->allUpstreamTriggerRecv != 1) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    int32_t num = streamTaskGetNumOfUpstream(pTask);
    if (taskLevel == TASK_LEVEL__SINK) {
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
      streamFreeQitem((SStreamQueueItem*)pBlock);
      code = streamTaskBuildCheckpoint(pTask);  // todo: not handle error yet
    } else {                                    // source & agg tasks need to forward the checkpoint msg downwards
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
      code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
      if (code) {
        // release the block on failure, exactly as the source-task branch above does; without
        // this free the block is leaked
        streamFreeQitem((SStreamQueueItem*)pBlock);
        return code;
      }

      // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
      // this task already. And then, dispatch check point msg to all downstream tasks
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
    }
  }

  return code;
}
449

450
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
UNCOV
451
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
×
452
                                          int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
453
                                          const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) {
UNCOV
454
  *alreadyRecv = false;
×
UNCOV
455
  int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
×
UNCOV
456
  for (int32_t i = 0; i < size; ++i) {
×
UNCOV
457
    STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
×
UNCOV
458
    if (p == NULL) {
×
459
      return TSDB_CODE_INVALID_PARA;
×
460
    }
461

UNCOV
462
    if (p->downstreamTaskId == downstreamTaskId) {
×
463
      (*alreadyRecv) = true;
×
464
      break;
×
465
    }
466
  }
467

UNCOV
468
  if (*alreadyRecv) {
×
469
    stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
×
470
            downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
471
            numOfDownstream);
472
  } else {
UNCOV
473
    STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
×
474
                                     .downstreamTaskId = downstreamTaskId,
UNCOV
475
                                     .checkpointId = pInfo->activeId,
×
UNCOV
476
                                     .transId = pInfo->transId,
×
477
                                     .streamId = streamId,
478
                                     .downstreamNodeId = downstreamNodeId};
UNCOV
479
    void*                    p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
×
UNCOV
480
    if (p == NULL) {
×
481
      stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
×
482
      return terrno;
×
483
    }
484
  }
485

UNCOV
486
  *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
×
UNCOV
487
  *pTransId = pInfo->transId;
×
UNCOV
488
  return 0;
×
489
}
490

491
/**
492
 * All down stream tasks have successfully completed the check point task.
493
 * Current stream task is allowed to start to do checkpoint things in ASYNC model.
494
 */
UNCOV
495
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
×
496
                                        int32_t downstreamTaskId) {
UNCOV
497
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
×
498

UNCOV
499
  const char* id = pTask->id.idStr;
×
UNCOV
500
  int32_t     total = streamTaskGetNumOfDownstream(pTask);
×
UNCOV
501
  int32_t     code = 0;
×
UNCOV
502
  int32_t     notReady = 0;
×
UNCOV
503
  int32_t     transId = 0;
×
UNCOV
504
  bool        alreadyHandled = false;
×
505

506
  // 1. not in checkpoint status now
UNCOV
507
  SStreamTaskState pStat = streamTaskGetStatus(pTask);
×
UNCOV
508
  if (pStat.state != TASK_STATUS__CK) {
×
509
    stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
×
510
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
×
511
  }
512

513
  // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
UNCOV
514
  if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
×
515
    stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
×
516
            ") from task:0x%x, expired and discard",
517
            id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
518
    return TSDB_CODE_INVALID_MSG;
×
519
  }
520

UNCOV
521
  streamMutexLock(&pInfo->lock);
×
UNCOV
522
  code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, &notReady,
×
523
                                    &transId, &alreadyHandled);
UNCOV
524
  streamMutexUnlock(&pInfo->lock);
×
525

UNCOV
526
  if (alreadyHandled) {
×
527
    stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again",
×
528
            id, checkpointId, downstreamTaskId);
529
  } else {
UNCOV
530
    if ((notReady == 0) && (code == 0) && (!alreadyHandled)) {
×
UNCOV
531
      stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
×
UNCOV
532
      code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
×
533
    }
534
  }
535

UNCOV
536
  return code;
×
537
}
538

UNCOV
539
/**
 * Mark the checkpoint-ready msg sent to one upstream task as confirmed.
 *
 * The first entry of the ready-msg list matching both upstreamTaskId and checkpointId gets its
 * sendCompleted flag set; afterwards the number of confirmed entries is counted and logged. The
 * whole operation runs under the active checkpoint info lock.
 *
 * @param pTask           task that sent the checkpoint-ready msg
 * @param upstreamTaskId  upstream task that confirmed the msg
 * @param checkpointId    checkpoint id of the confirmed msg
 * @return always TSDB_CODE_SUCCESS
 */
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  int64_t                ts = taosGetTimestampMs();
  int32_t                confirmed = 0;

  streamMutexLock(&pActive->lock);

  // pass 1: flag the matching entry as confirmed
  for (int32_t idx = 0; idx < taosArrayGetSize(pActive->pReadyMsgList); ++idx) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, idx);
      continue;
    }

    if (pEntry->upstreamTaskId != upstreamTaskId || pEntry->checkpointId != checkpointId) {
      continue;
    }

    pEntry->sendCompleted = 1;
    stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
            pTask->id.idStr, upstreamTaskId, checkpointId, ts);
    break;
  }

  // pass 2: count all confirmed entries
  for (int32_t idx = 0; idx < taosArrayGetSize(pActive->pReadyMsgList); ++idx) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, idx);
      continue;
    }

    confirmed += (pEntry->sendCompleted == 1) ? 1 : 0;
  }

  stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr,
          confirmed, checkpointId);

  streamMutexUnlock(&pActive->lock);
  return TSDB_CODE_SUCCESS;
}
580

UNCOV
581
// Reset the checkpoint bookkeeping of the task: clear the recorded start time, re-open the inputQ of all upstream
// tasks, and clear the active checkpoint info under its lock. The checkpoint-ready msg list is cleared as well when
// clearChkpReadyMsg is true.
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  // forget the recorded start time
  pTask->chkInfo.startTs = 0;

  // open inputQ for all upstream tasks
  streamTaskOpenAllUpstreamInput(pTask);

  streamMutexLock(&pActive->lock);
  {
    streamTaskClearActiveInfo(pActive);
    if (clearChkpReadyMsg) {
      streamClearChkptReadyMsg(pActive);
    }
  }
  streamMutexUnlock(&pActive->lock);

  stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
          pTask->id.idStr, pActive->failedId, pTask->chkInfo.checkpointId);
}
×
597

UNCOV
598
// Apply the checkpoint info carried by pReq to the task.
//
// pTask    : the task whose checkpoint info (pTask->chkInfo) is updated
// restored : false while the restore procedure is still running; in that case the info is always applied
// pReq     : the update request (checkpointId/checkpointVer/checkpointTs, and the related fill-history task to drop)
//
// The request is discarded (TSDB_CODE_SUCCESS is still returned) when its checkpointId is less than the failed
// checkpointId, or not greater than the current checkpointId. In the latter case the related fill-history task is
// still dropped if requested.
// Returns TSDB_CODE_STREAM_INTERNAL_ERROR when the request would move checkpointId/checkpointVer backwards or
// checkpointVer is less than the processedVer; TSDB_CODE_SUCCESS in all other cases, including a failure to save
// the task in meta (which is only logged).
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
  SStreamMeta*     pMeta = pTask->pMeta;
  int32_t          vgId = pMeta->vgId;
  int32_t          code = 0;
  const char*      id = pTask->id.idStr;
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  streamMutexLock(&pTask->lock);

  // not update the checkpoint info if the checkpointId is less than the failed checkpointId
  if (pReq->checkpointId < pInfo->pActiveInfo->failedId) {
    stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64
           " is less than the failed checkpointId:%" PRId64 ", discard the update info",
           id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId);
    streamMutexUnlock(&pTask->lock);

    // always return true
    return TSDB_CODE_SUCCESS;
  }

  if (pReq->checkpointId <= pInfo->checkpointId) {
    stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
            " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
            id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
            pReq->transId);
    streamMutexUnlock(&pTask->lock);

    // destroy the related fill-history task; this is done after the task lock has been released.
    if (pReq->dropRelHTask) {
      code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
      int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);

      // report the id of the fill-history task that is dropped, not the id of the current task
      stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
              id, vgId, (int32_t)pReq->hTaskId, numOfTasks);

      code = streamMetaCommit(pMeta);
    }

    // always return true
    return TSDB_CODE_SUCCESS;
  }

  SStreamTaskState pStatus = streamTaskGetStatus(pTask);

  if (!restored) {  // during restore procedure, do update checkpoint-info
    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
            id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
            pInfo->checkpointTime, pReq->checkpointTs);
  } else {  // not in restore status, must be in checkpoint status
    if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
      stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
              " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
              id, vgId, pStatus.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
              pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
    } else {
      stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
              " checkpointVer:%" PRId64 "->%" PRId64,
              id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
              pReq->checkpointVer);
    }
  }

  // the checkpoint id/version must never move backwards, and the version must cover what has been processed
  bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
                pInfo->processedVer <= pReq->checkpointVer);

  if (!valid) {
    stFatal("s-task:%s invalid checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
            " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it",
            id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
            pReq->checkpointVer);
    streamMutexUnlock(&pTask->lock);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  // update only it is in checkpoint status, or during restore procedure.
  if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
    pInfo->checkpointId = pReq->checkpointId;
    pInfo->checkpointVer = pReq->checkpointVer;
    pInfo->checkpointTime = pReq->checkpointTs;

    if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
      code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    }
  }

  streamTaskClearCheckInfo(pTask, true);

  if (pReq->dropRelHTask) {
    stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
            pReq->taskId, vgId, pReq->hTaskId);
    CLEAR_RELATED_FILLHISTORY_TASK(pTask);
  }

  stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
          streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask).name);

  pTask->status.taskStatus = TASK_STATUS__READY;

  code = streamMetaSaveTaskInMeta(pMeta, pTask);
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    // the failure is only logged, the caller still gets TSDB_CODE_SUCCESS
    stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
            vgId, pReq->checkpointId, terrstr());
    return TSDB_CODE_SUCCESS;
  }

  // drop task should not in the meta-lock, and drop the related fill-history task now
  if (pReq->dropRelHTask) {
    code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
            (int32_t)pReq->hTaskId, numOfTasks);
  }

  code = streamMetaCommit(pMeta);
  return TSDB_CODE_SUCCESS;
}
720

UNCOV
721
// Record failedId as the failed checkpointId of the task.
// The update is skipped (and only logged) when failedId is not positive, or when it is not greater than the failed
// checkpointId recorded already. No lock is taken here.
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
  struct SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  // guard 1: a non-positive id is never recorded
  if (failedId <= 0) {
    stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
           " activeId:%" PRId64,
           pTask->id.idStr, pActive->failedId, pActive->activeId);
    return;
  }

  // guard 2: the recorded failed id only moves forward
  if (failedId <= pActive->failedId) {
    stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, pTask->id.idStr, pActive->failedId, failedId);
    return;
  }

  stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
          " prev failedId:%" PRId64,
          pTask->id.idStr, failedId, pActive->transId, pActive->activeId, pActive->failedId);
  pActive->failedId = failedId;
}
×
739

UNCOV
740
// Mark the active checkpoint of the task as failed, only if the task is in checkpoint (TASK_STATUS__CK) status.
// The status check and the update are both done while holding the task lock.
void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  bool inCheckpoint = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK);
  if (inCheckpoint) {
    int64_t activeChkptId = pTask->chkInfo.pActiveInfo->activeId;
    streamTaskSetFailedCheckpointId(pTask, activeChkptId);
  }

  streamMutexUnlock(&pTask->lock);
}
×
748

UNCOV
749
// Download the remote "META" file into "<path>/META_TMP", and pass the downloaded file together with list to
// remoteChkpGetDelFile.
//
// id   : id string used in the logs and for the download
// path : local directory where the META_TMP file is placed
// list : array handed to remoteChkpGetDelFile
//
// Returns terrno if the buffer for the file name can not be allocated, TSDB_CODE_OUT_OF_RANGE if the file name does
// not fit in the buffer, or the error code of the download. A failure of remoteChkpGetDelFile is only logged and 0 is
// returned, same as for the success case.
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
  int32_t code = 0;
  int32_t cap = strlen(path) + 64;

  char* filePath = taosMemoryCalloc(1, cap);
  if (filePath == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(filePath);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  code = downloadCheckpointDataByName(id, "META", filePath);
  if (code != 0) {
    stError("%s chkp failed to download meta file:%s", id, filePath);
    taosMemoryFree(filePath);
    return code;
  }

  code = remoteChkpGetDelFile(filePath, list);
  if (code != 0) {
    stError("%s chkp failed to get to del:%s", id, filePath);
  }

  // the file name buffer is owned by this function, release it on the success path as well
  taosMemoryFree(filePath);
  return 0;
}
778

UNCOV
779
// Generate the checkpoint data to upload for checkpointId, upload it, and remove the redundant files afterwards.
// For the DATA_UPLOAD_S3 type, getCheckpointDataMeta is invoked before the upload with the same to-delete list.
// The local checkpoint data in path is removed only if all steps succeed, otherwise it is kept.
// dbRefId is not referenced in this function.
// Returns terrno if the to-delete list can not be created, otherwise the code of the first failed step, or
// TSDB_CODE_SUCCESS.
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
  int32_t      code = 0;
  char*        path = NULL;
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  idStr = pTask->id.idStr;
  int64_t      startTs = taosGetTimestampMs();

  SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
  if (toDelFiles == NULL) {
    return terrno;
  }

  // step 1: generate the data to be uploaded
  code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles, idStr);
  if (code != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
  }

  // step 2: fetch the meta for the S3 backup type
  if ((type == DATA_UPLOAD_S3) && (code == TSDB_CODE_SUCCESS)) {
    code = getCheckpointDataMeta(idStr, path, toDelFiles);
    if (code != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
              tstrerror(code));
    }
  }

  // step 3: do upload
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskUploadCheckpoint(idStr, path, checkpointId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path,
              tstrerror(code));
    } else {
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
    }
  }

  // step 4: remove the redundant files, stop at the first failure
  if (code == TSDB_CODE_SUCCESS) {
    int32_t numOfDel = taosArrayGetSize(toDelFiles);
    stDebug("s-task:%s remove redundant %d files", idStr, numOfDel);

    for (int k = 0; k < numOfDel; k++) {
      char* pFileName = taosArrayGetP(toDelFiles, k);

      code = deleteCheckpointFile(idStr, pFileName);
      if (code != 0) {
        stDebug("s-task:%s failed to remove file: %s", idStr, pFileName);
        break;
      }
    }

    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
  }

  taosArrayDestroyP(toDelFiles, NULL);
  double elapsed = (taosGetTimestampMs() - startTs) / 1000.0;

  // step 5: local data is dropped only after everything above succeeds
  if (code != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr,
            checkpointId, elapsed);
  } else {
    stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
            idStr, checkpointId, elapsed, path);
    taosRemoveDir(path);
  }

  taosMemoryFree(path);
  return code;
}
845

UNCOV
846
// Upload the checkpoint data of checkpointId to the remote backup, if the upload is enabled.
//
// Returns 0 if nothing needs to be done (pTask is NULL, the upload is disabled, or the task has no backend),
// -1 if the backend db can not be acquired, otherwise the result of uploadCheckpointData.
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
  // pTask is dereferenced by the log below, so it has to be validated first
  if (pTask == NULL) {
    return 0;
  }

  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
  if (type == DATA_UPLOAD_DISABLE) {
    stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
    return 0;
  }

  if (pTask->pBackend == NULL) {
    return 0;
  }

  // hold a reference on the backend db during the upload
  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
  void*   pBackend = taskAcquireDb(dbRefId);
  if (pBackend == NULL) {
    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
            pTask->id.idStr);
    return -1;
  }

  int32_t code = uploadCheckpointData(pTask, checkpointId, dbRefId, type);
  taskReleaseDb(dbRefId);

  return code;
}
870

UNCOV
871
// Build the checkpoint for the active checkpointId of the task:
//  1. generate the checkpoint in the backend (skipped for sink tasks),
//  2. send the checkpoint-source rsp (source task) or the checkpoint-ready msg (other levels),
//  3. back up the checkpoint data to the remote storage,
//  4. send the checkpoint-report msg if pMsgCb is set.
// If any of step 1-3 fails, the active checkpointId is recorded as failed and the TASK_EVENT_CHECKPOINT_DONE event
// is handled to clear the checkpoint flag.
//
// Returns the result of step 4 if step 1-3 succeed, otherwise the result of handling the
// TASK_EVENT_CHECKPOINT_DONE event.
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int64_t      startTs = pTask->chkInfo.startTs;
  int64_t      ckId = pTask->chkInfo.pActiveInfo->activeId;
  const char*  id = pTask->id.idStr;
  bool         dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  SStreamMeta* pMeta = pTask->pMeta;

  // sink task does not need to save the status, and generated the checkpoint
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);

    int64_t ver = pTask->chkInfo.processedVer;
    code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver);
    if (code != TSDB_CODE_SUCCESS) {
      // report the code returned by the backend
      stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
    }
  }

  // TODO: monitoring the checkpoint-source msg
  // send check point response to upstream task
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
      code = streamTaskSendCheckpointSourceRsp(pTask);
    } else {
      code = streamTaskSendCheckpointReadyMsg(pTask);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
    }
  } else {
    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  }

  // remember the outcome of the steps above; code is overwritten in the failure branch below, and the summary log
  // must not report a failed checkpoint as succeeded.
  bool chkptDone = (code == TSDB_CODE_SUCCESS);

  // TODO: monitoring the checkpoint-report msg
  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (chkptDone) {
    if (pTask->pMsgCb != NULL) {
      code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
    }
  } else {  // clear the checkpoint info if failed
    // set failed checkpoint id before clear the checkpoint info
    streamMutexLock(&pTask->lock);
    streamTaskSetFailedCheckpointId(pTask, ckId);
    streamMutexUnlock(&pTask->lock);

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
  }

  double el = (taosGetTimestampMs() - startTs) / 1000.0;
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
         pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
         (chkptDone && (code == TSDB_CODE_SUCCESS)) ? "succ" : "failed");

  return code;
}
938

UNCOV
939
// Check whether the checkpoint-trigger monitor should keep running for the task.
// Returns 0 to continue. Returns -1, after streamCleanBeforeQuitTmr has been invoked with param, when one of the
// following holds:
//  - the checkpoint-trigger of all upstream tasks is received,
//  - the timer was launched for a checkpointId other than the active one,
//  - the active checkpoint info looks cleared (activeId, transId or startTs is 0).
static int32_t doChkptStatusCheck(SStreamTask* pTask, void* param) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTmrInfo*        pTmr = &pActive->chkptTriggerMsgTmr;
  const char*            idStr = pTask->id.idStr;
  int32_t                vgId = pTask->pMeta->vgId;

  // checkpoint-trigger recv flag is set, quit
  if (pActive->allUpstreamTriggerRecv) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger", idStr, vgId);
    return -1;
  }

  // the timer belongs to another checkpoint procedure, quit
  bool staleTmr = (pTmr->launchChkptId != pActive->activeId);
  if (staleTmr) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
           ", quit",
           idStr, vgId, pTmr->launchChkptId);
    return -1;
  }

  // active checkpoint info is cleared for now
  bool cleared = (pActive->activeId == 0) || (pActive->transId == 0) || (pTask->chkInfo.startTs == 0);
  if (cleared) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr", idStr,
           vgId);
    return -1;
  }

  return 0;
}
970

UNCOV
971
// Collect the upstream tasks in pList whose nodeId does not match the upstreamNodeId of any entry in the ready-msg
// list. The inputQ of each such upstream task is (re)opened, and a copy of its ep info is appended to a newly
// created array.
// On success 0 is returned and the array is handed over to the caller through *ppNotSendList. On allocation failure
// terrno is returned and *ppNotSendList is left untouched.
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            idStr = pTask->id.idStr;

  SArray* pResult = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
  if (pResult == NULL) {
    stDebug("s-task:%s start to triggerMonitor, reason:%s", idStr, tstrerror(terrno));
    return terrno;
  }

  for (int32_t u = 0; u < taosArrayGetSize(pList); ++u) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pList, u);

    // look for an entry from the same node in the ready-msg list
    bool found = false;
    for (int32_t r = 0; (!found) && (r < taosArrayGetSize(pActive->pReadyMsgList)); ++r) {
      STaskCheckpointReadyInfo* pReady = taosArrayGet(pActive->pReadyMsgList, r);
      if (pReady != NULL && pUpstream->nodeId == pReady->upstreamNodeId) {
        found = true;
      }
    }

    if (found) {
      continue;
    }

    // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
    streamTaskOpenUpstreamInput(pTask, pUpstream->taskId);

    if (taosArrayPush(pResult, pUpstream) == NULL) {
      stError("s-task:%s failed to record not send info, code: out of memory", idStr);
      taosArrayDestroy(pResult);
      return terrno;
    }
  }

  *ppNotSendList = pResult;
  return 0;
}
1011

UNCOV
1012
// One round of the checkpoint-trigger monitor: check whether the monitor should go on, build the list of upstream
// tasks to ask again (returned through *ppNotSendList), and send the retrieve msg to each of them.
// Returns the non-zero code of the status check or of the list building if one of them fails. A failure to send the
// retrieve msg is only logged, and 0 is returned in that case.
int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) {
  SArray*         pUpstreamList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
  SStreamTmrInfo* pTmr = &pTask->chkInfo.pActiveInfo->chkptTriggerMsgTmr;
  const char*     idStr = pTask->id.idStr;
  int32_t         vgId = pTask->pMeta->vgId;

  int32_t status = doChkptStatusCheck(pTask, param);
  if (status != 0) {
    return status;
  }

  status = doFindNotSendUpstream(pTask, pUpstreamList, ppNotSendList);
  if (status != 0) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", idStr, tstrerror(status));
    return status;
  }

  // do send retrieve checkpoint trigger msg to upstream
  status = doSendRetrieveTriggerMsg(pTask, *ppNotSendList);
  if (status != 0) {
    stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(status));
  }

  // the send failure is not propagated
  return 0;
}
1040

UNCOV
1041
// Common exit path of the checkpoint-trigger monitor: release the task, and destroy the array pList.
static void doCleanup(SStreamTask* pTask, SArray* pList) {
  SStreamMeta* pMeta = pTask->pMeta;

  streamMetaReleaseTask(pMeta, pTask);
  taosArrayDestroy(pList);
}
×
1045

UNCOV
1046
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
×
UNCOV
1047
  int32_t      code = 0;
×
UNCOV
1048
  int32_t      numOfNotSend = 0;
×
UNCOV
1049
  SArray*      pNotSendList = NULL;
×
UNCOV
1050
  int64_t      taskRefId = *(int64_t*)param;
×
UNCOV
1051
  int64_t      now = taosGetTimestampMs();
×
1052

UNCOV
1053
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
×
UNCOV
1054
  if (pTask == NULL) {
×
UNCOV
1055
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
×
UNCOV
1056
    streamTaskFreeRefId(param);
×
UNCOV
1057
    return;
×
1058
  }
1059

UNCOV
1060
  int32_t                vgId = pTask->pMeta->vgId;
×
UNCOV
1061
  const char*            id = pTask->id.idStr;
×
UNCOV
1062
  SArray*                pList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
×
UNCOV
1063
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
×
UNCOV
1064
  SStreamTmrInfo*        pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
×
1065

UNCOV
1066
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
×
1067
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1068
    stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id);
×
1069
    doCleanup(pTask, pNotSendList);
×
1070
    return;
×
1071
  }
1072

1073
  // check the status every 100ms
UNCOV
1074
  if (streamTaskShouldStop(pTask)) {
×
UNCOV
1075
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
UNCOV
1076
    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger", id, vgId);
×
UNCOV
1077
    doCleanup(pTask, pNotSendList);
×
UNCOV
1078
    return;
×
1079
  }
1080

UNCOV
1081
  if (++pTmrInfo->activeCounter < 50) {
×
UNCOV
1082
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
×
1083
                   "trigger-recv-monitor");
UNCOV
1084
    doCleanup(pTask, pNotSendList);
×
UNCOV
1085
    return;
×
1086
  }
1087

UNCOV
1088
  pTmrInfo->activeCounter = 0;
×
UNCOV
1089
  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
×
1090

UNCOV
1091
  streamMutexLock(&pTask->lock);
×
UNCOV
1092
  SStreamTaskState state = streamTaskGetStatus(pTask);
×
UNCOV
1093
  streamMutexUnlock(&pTask->lock);
×
1094

UNCOV
1095
  if (state.state != TASK_STATUS__CK) {
×
UNCOV
1096
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
UNCOV
1097
    stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger", id,
×
1098
            vgId, state.name);
UNCOV
1099
    doCleanup(pTask, pNotSendList);
×
UNCOV
1100
    return;
×
1101
  }
1102

UNCOV
1103
  streamMutexLock(&pActiveInfo->lock);
×
UNCOV
1104
  code = chkptTriggerRecvMonitorHelper(pTask, param, &pNotSendList);
×
UNCOV
1105
  streamMutexUnlock(&pActiveInfo->lock);
×
1106

UNCOV
1107
  if (code != TSDB_CODE_SUCCESS) {
×
1108
    doCleanup(pTask, pNotSendList);
×
1109
    return;
×
1110
  }
1111

1112
  // check every 100ms
UNCOV
1113
  numOfNotSend = taosArrayGetSize(pNotSendList);
×
UNCOV
1114
  if (numOfNotSend > 0) {
×
UNCOV
1115
    stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
×
UNCOV
1116
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
×
1117
                   "trigger-recv-monitor");
1118
  } else {
1119
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1120
    stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr", id);
×
1121
  }
1122

UNCOV
1123
  doCleanup(pTask, pNotSendList);
×
1124
}
1125

UNCOV
1126
// Send the retrieve checkpoint-trigger msg (TDMT_STREAM_RETRIEVE_TRIGGER) for the active checkpointId to every
// upstream task in pNotSendList.
//
// pTask        : the downstream task that sends the msg
// pNotSendList : array of SStreamUpstreamEpInfo, one entry for each upstream task to ask
//
// A failure for one upstream task (size calculation, allocation, encoding, sending) is logged and the next upstream
// task is handled. Returns TSDB_CODE_INVALID_PARA if an entry of the list can not be fetched, 0 if the list is
// empty, otherwise the code of the last encode/send step that has been executed.
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;
  const char* pId = pTask->id.idStr;
  int32_t     size = taosArrayGetSize(pNotSendList);
  int32_t     numOfUpstream = streamTaskGetNumOfUpstream(pTask);
  int64_t     checkpointId = pTask->chkInfo.pActiveInfo->activeId;

  if (size <= 0) {
    stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
    return code;
  }

  stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId, size,
          numOfUpstream);

  for (int32_t i = 0; i < size; i++) {
    SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
    if (pUpstreamTask == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    int32_t  ret = 0;
    int32_t  tlen = 0;
    void*    buf = NULL;
    SRpcMsg  rpcMsg = {0};
    SEncoder encoder;

    SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId,
                                    .downstreamTaskId = pTask->id.taskId,
                                    .downstreamNodeId = vgId,
                                    .upstreamTaskId = pUpstreamTask->taskId,
                                    .upstreamNodeId = pUpstreamTask->nodeId,
                                    .checkpointId = checkpointId};

    tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
    if (ret < 0) {
      // the length is not valid in this case: report the failure of this step and skip this upstream task,
      // instead of allocating and encoding with it.
      code = ret;
      stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(code));
      continue;
    }

    buf = rpcMallocCont(tlen + sizeof(SMsgHead));
    if (buf == NULL) {
      stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId);
      continue;
    }

    // msg head first, the encoded req is placed right after it
    ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId);
    void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

    tEncoderInit(&encoder, abuf, tlen);
    if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) {
      rpcFreeCont(buf);
      tEncoderClear(&encoder);
      stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code));
      continue;
    }
    tEncoderClear(&encoder);

    initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead));

    code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
    if (code == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId,
              vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    } else {
      stError("s-task:%s vgId:%d failed to send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64,
              pId, vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    }
  }

  return code;
}
1198

UNCOV
1199
// Check whether the checkpoint-trigger has been dispatched to the downstream node already.
// Only the first entry of the dispatch-trigger list with a matching nodeId is examined; a warning with the send
// time is logged for it. No lock is taken in this function.
// Returns true if such an entry exists; false if the trigger is not dispatched yet or no entry matches.
static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) {
  int64_t                ts = taosGetTimestampMs();
  const char*            idStr = pTask->id.idStr;
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTaskState       curState = streamTaskGetStatus(pTask);

  if (!pActive->dispatchTrigger) {
    return false;
  }

  int32_t total = taosArrayGetSize(pActive->pDispatchTriggerList);
  for (int32_t k = 0; k < total; ++k) {
    STaskTriggerSendInfo* pItem = taosArrayGet(pActive->pDispatchTriggerList, k);
    if (pItem == NULL) {
      stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", idStr, k,
              total);
      continue;
    }

    if (pItem->nodeId == downstreamNodeId) {
      // has send trigger msg to downstream node,
      double before = (ts - pItem->sendTs) / 1000.0;
      if (!pItem->recved) {
        stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64
               ", transId:%d",
               idStr, pItem->sendTs, before, pActive->activeId, pActive->transId);
      } else {
        stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64
               "(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
               idStr, pItem->sendTs, before, pItem->taskId, pActive->activeId, pActive->transId);
      }

      return true;
    }
  }

  return false;
}
1238

UNCOV
1239
// Check whether the checkpoint-trigger has been sent to the given downstream node.
// Returns false right away if the task is not in checkpoint (TASK_STATUS__CK) status; otherwise the dispatch-trigger
// list is examined while holding the active checkpoint info lock.
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
  int64_t                ts = taosGetTimestampMs();
  const char*            idStr = pTask->id.idStr;
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTaskState       curState = streamTaskGetStatus(pTask);

  if (curState.state != TASK_STATUS__CK) {
    return false;
  }

  bool alreadySent = false;

  streamMutexLock(&pActive->lock);
  alreadySent = isAlreadySendTriggerNoLock(pTask, downstreamNodeId);
  streamMutexUnlock(&pActive->lock);

  return alreadySent;
}
1255

UNCOV
1256
// Report the checkpoint-trigger receiving progress of the task.
// *pRecved is set to the size of the ready-msg list; *pTotal is set to 1 for a source task, or to the number of
// upstream tasks for the other levels.
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
  *pRecved = taosArrayGetSize(pTask->chkInfo.pActiveInfo->pReadyMsgList);

  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  *pTotal = isSource ? 1 : streamTaskGetNumOfUpstream(pTask);
}
×
1265

1266
// record the dispatch checkpoint trigger info in the list
1267
// memory insufficient may cause the stream computing stopped
UNCOV
1268
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) {
×
UNCOV
1269
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
×
UNCOV
1270
  int64_t                now = taosGetTimestampMs();
×
UNCOV
1271
  int32_t                code = 0;
×
1272

UNCOV
1273
  streamMutexLock(&pInfo->lock);
×
1274

UNCOV
1275
  if (sendingChkptId > pInfo->failedId) {
×
UNCOV
1276
    pInfo->dispatchTrigger = true;
×
UNCOV
1277
    if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
×
UNCOV
1278
      STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
×
1279

UNCOV
1280
      STaskTriggerSendInfo p = {
×
UNCOV
1281
          .sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
×
UNCOV
1282
      void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
×
UNCOV
1283
      if (px == NULL) {  // pause the stream task, if memory not enough
×
1284
        code = terrno;
×
1285
      }
1286
    } else {
UNCOV
1287
      for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
×
UNCOV
1288
        SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
×
UNCOV
1289
        if (pVgInfo == NULL) {
×
1290
          continue;
×
1291
        }
1292

UNCOV
1293
        STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
×
UNCOV
1294
        void*                px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
×
UNCOV
1295
        if (px == NULL) {  // pause the stream task, if memory not enough
×
1296
          code = terrno;
×
1297
          break;
×
1298
        }
1299
      }
1300
    }
1301
  }
1302

UNCOV
1303
  streamMutexUnlock(&pInfo->lock);
×
1304

UNCOV
1305
  return code;
×
1306
}
1307

UNCOV
1308
// Count the entries of pInfo->pDispatchTriggerList whose `recved` flag is set.
// No lock is taken here.
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
  int32_t confirmed = 0;
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pInfo->pDispatchTriggerList)) {
    STaskTriggerSendInfo* pSend = taosArrayGet(pInfo->pDispatchTriggerList, idx++);
    if (pSend != NULL && pSend->recved) {
      confirmed += 1;
    }
  }

  return confirmed;
}
1322

UNCOV
1323
// Mark the checkpoint-trigger msg sent to downstream vgroup `vgId` as confirmed.
//
// The first entry of pInfo->pDispatchTriggerList whose nodeId equals vgId is looked up under the
// active checkpoint info lock:
//   - not yet confirmed: set `recved` and `recvTs`, then log the confirmation progress;
//   - already confirmed: log a warning and discard the duplicate rsp;
//   - no matching entry: log an error, the confirmation does not belong to any recorded downstream.
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  int64_t now = taosGetTimestampMs();
  int32_t taskId = 0;
  int32_t total = streamTaskGetNumOfDownstream(pTask);
  bool    found = false;  // tracked explicitly: a duplicate rsp must not be reported as an invalid one
  bool    alreadyRecv = false;

  streamMutexLock(&pInfo->lock);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
    STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (p == NULL || p->nodeId != vgId) {
      continue;
    }

    found = true;
    taskId = p->taskId;

    if (p->recved) {
      stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64
             " discard",
             pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs);
      alreadyRecv = true;
    } else {
      p->recved = true;
      p->recvTs = taosGetTimestampMs();
    }
    break;
  }

  // counted while the lock is still held, so the number matches the update above
  int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
  streamMutexUnlock(&pInfo->lock);

  if (!found) {
    stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
  } else if (!alreadyRecv) {
    stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
            pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
  }
}
×
1366

UNCOV
1367
int32_t uploadCheckpointToS3(const char* id, const char* path) {
×
UNCOV
1368
  int32_t code = 0;
×
UNCOV
1369
  int32_t nBytes = 0;
×
1370
  /*
1371
  if (s3Init() != 0) {
1372
    return TSDB_CODE_THIRDPARTY_ERROR;
1373
  }
1374
  */
UNCOV
1375
  TdDirPtr pDir = taosOpenDir(path);
×
UNCOV
1376
  if (pDir == NULL) {
×
1377
    return terrno;
×
1378
  }
1379

UNCOV
1380
  TdDirEntryPtr de = NULL;
×
UNCOV
1381
  while ((de = taosReadDir(pDir)) != NULL) {
×
UNCOV
1382
    char* name = taosGetDirEntryName(de);
×
UNCOV
1383
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
×
1384

UNCOV
1385
    char filename[PATH_MAX] = {0};
×
UNCOV
1386
    if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
×
1387
      nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
×
1388
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1389
        code = TSDB_CODE_OUT_OF_RANGE;
×
1390
        break;
×
1391
      }
1392
    } else {
UNCOV
1393
      nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
×
UNCOV
1394
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1395
        code = TSDB_CODE_OUT_OF_RANGE;
×
1396
        break;
×
1397
      }
1398
    }
1399

UNCOV
1400
    char object[PATH_MAX] = {0};
×
UNCOV
1401
    nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
×
UNCOV
1402
    if (nBytes <= 0 || nBytes >= sizeof(object)) {
×
1403
      code = TSDB_CODE_OUT_OF_RANGE;
×
1404
      break;
×
1405
    }
1406

UNCOV
1407
    code = tcsPutObjectFromFile2(filename, object, 0);
×
UNCOV
1408
    if (code != 0) {
×
UNCOV
1409
      stError("[tcs] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
×
1410
    } else {
1411
      stDebug("[tcs] upload checkpoint:%s", filename);
×
1412
    }
1413
  }
1414

UNCOV
1415
  int32_t ret = taosCloseDir(&pDir);
×
UNCOV
1416
  if (code == 0 && ret != 0) {
×
1417
    code = ret;
×
1418
  }
1419

UNCOV
1420
  return code;
×
1421
}
1422

UNCOV
1423
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
×
1424
  int32_t nBytes;
UNCOV
1425
  int32_t cap = strlen(id) + strlen(dstName) + 16;
×
1426

UNCOV
1427
  char* buf = taosMemoryCalloc(1, cap);
×
UNCOV
1428
  if (buf == NULL) {
×
1429
    return terrno;
×
1430
  }
1431

UNCOV
1432
  nBytes = snprintf(buf, cap, "%s/%s", id, fname);
×
UNCOV
1433
  if (nBytes <= 0 || nBytes >= cap) {
×
1434
    taosMemoryFree(buf);
×
1435
    return TSDB_CODE_OUT_OF_RANGE;
×
1436
  }
UNCOV
1437
  int32_t code = tcsGetObjectToFile(buf, dstName);
×
UNCOV
1438
  if (code != 0) {
×
UNCOV
1439
    taosMemoryFree(buf);
×
UNCOV
1440
    return TAOS_SYSTEM_ERROR(errno);
×
1441
  }
1442
  taosMemoryFree(buf);
×
1443
  return 0;
×
1444
}
1445

UNCOV
1446
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
×
UNCOV
1447
  if (strlen(tsSnodeAddress) != 0) {
×
UNCOV
1448
    return DATA_UPLOAD_RSYNC;
×
UNCOV
1449
  } else if (tsS3StreamEnabled) {
×
1450
    return DATA_UPLOAD_S3;
×
1451
  } else {
UNCOV
1452
    return DATA_UPLOAD_DISABLE;
×
1453
  }
1454
}
1455

UNCOV
1456
int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) {
×
UNCOV
1457
  int32_t code = 0;
×
UNCOV
1458
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
×
UNCOV
1459
    stError("invalid parameters in upload checkpoint, %s", id);
×
UNCOV
1460
    return TSDB_CODE_INVALID_CFG;
×
1461
  }
1462

UNCOV
1463
  if (strlen(tsSnodeAddress) != 0) {
×
1464
    code = uploadByRsync(id, path, checkpointId);
×
1465
    if (code != 0) {
×
1466
      return TAOS_SYSTEM_ERROR(errno);
×
1467
    }
UNCOV
1468
  } else if (tsS3StreamEnabled) {
×
1469
    return uploadCheckpointToS3(id, path);
×
1470
  }
1471

UNCOV
1472
  return 0;
×
1473
}
1474

1475
// fileName:  CURRENT
UNCOV
1476
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
×
UNCOV
1477
  if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
×
1478
    stError("down load checkpoint data parameters invalid");
×
1479
    return TSDB_CODE_INVALID_PARA;
×
1480
  }
1481

UNCOV
1482
  if (strlen(tsSnodeAddress) != 0) {
×
1483
    return 0;
×
UNCOV
1484
  } else if (tsS3StreamEnabled) {
×
1485
    return downloadCheckpointByNameS3(id, fname, dstName);
×
1486
  }
1487

UNCOV
1488
  return 0;
×
1489
}
1490

UNCOV
1491
int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) {
×
UNCOV
1492
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
×
1493
    stError("down checkpoint data parameters invalid");
×
1494
    return -1;
×
1495
  }
1496

UNCOV
1497
  if (strlen(tsSnodeAddress) != 0) {
×
UNCOV
1498
    return downloadByRsync(id, path, checkpointId);
×
1499
  } else if (tsS3StreamEnabled) {
×
1500
    return tcsGetObjectsByPrefix(id, path);
×
1501
  }
1502

1503
  return 0;
×
1504
}
1505

1506
#ifdef BUILD_NO_CALL
1507
int32_t deleteCheckpoint(const char* id) {
1508
  if (id == NULL || strlen(id) == 0) {
1509
    stError("deleteCheckpoint parameters invalid");
1510
    return TSDB_CODE_INVALID_PARA;
1511
  }
1512
  if (strlen(tsSnodeAddress) != 0) {
1513
    return deleteRsync(id);
1514
  } else if (tsS3StreamEnabled) {
1515
    tcsDeleteObjectsByPrefix(id);
1516
  }
1517
  return 0;
1518
}
1519
#endif
1520

UNCOV
1521
int32_t deleteCheckpointFile(const char* id, const char* name) {
×
UNCOV
1522
  char object[128] = {0};
×
1523

UNCOV
1524
  int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
×
UNCOV
1525
  if (nBytes <= 0 || nBytes >= sizeof(object)) {
×
1526
    return TSDB_CODE_OUT_OF_RANGE;
×
1527
  }
1528

UNCOV
1529
  char*   tmp = object;
×
UNCOV
1530
  int32_t code = tcsDeleteObjects((const char**)&tmp, 1);
×
UNCOV
1531
  if (code != 0) {
×
UNCOV
1532
    return TSDB_CODE_THIRDPARTY_ERROR;
×
1533
  }
1534
  return code;
×
1535
}
1536

UNCOV
1537
// Flag the task as requesting a consensus checkpoint id, stamped with the current time, and
// release its backend state if one is attached. Always returns 0.
//
// NOTE: an earlier guard on pInfo->alreadySendChkptId (skip when a request is already in
// flight) is disabled; every call sets the request again.
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
  ETaskStatus state;

  // the status snapshot and the request flag are handled under the task lock
  streamMutexLock(&pTask->lock);
  state = streamTaskGetStatus(pTask).state;
  streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
  streamMutexUnlock(&pTask->lock);

  if (pTask->pBackend == NULL) {
    return 0;
  }

  streamFreeTaskState(pTask, state);
  pTask->pBackend = NULL;
  return 0;
}
1557

UNCOV
1558
// Send the checkpoint-source rsp for a source-level task that is in TASK_STATUS__CK.
//
// Returns 0 when the task is not a source task or is not in checkpoint status; otherwise the
// result of streamTaskSendCheckpointSourceRsp(), which is invoked while holding the task lock.
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  int32_t code = 0;

  streamMutexLock(&pTask->lock);
  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    code = streamTaskSendCheckpointSourceRsp(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc