• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4271

10 Jun 2025 09:45AM UTC coverage: 62.985% (+0.002%) from 62.983%
#4271

push

travis-ci

web-flow
Merge pull request #31337 from taosdata/newtest_3.0

fix TD-35057 and TD-35346

158179 of 319671 branches covered (49.48%)

Branch coverage included in aggregate %.

243860 of 318637 relevant lines covered (76.53%)

18624660.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.47
/source/libs/stream/src/streamStartTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tmisce.h"
20
#include "tref.h"
21
#include "tsched.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25

26
// Launch record of a single task; stored by value in the ready/failed task hash sets of the start info.
typedef struct STaskInitTs {
  int64_t start;    // startTs supplied when the launch result was recorded
  int64_t end;      // endTs supplied when the launch result was recorded
  bool    success;  // true when the task was recorded as ready, false when recorded as failed
} STaskInitTs;
31

32
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
33
static bool    allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
34
static void    displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
35

36
// restore the checkpoint id by negotiating the latest consensus checkpoint id
37
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
11,314✔
38
  int32_t code = TSDB_CODE_SUCCESS;
11,314✔
39
  int32_t vgId = pMeta->vgId;
11,314✔
40
  int64_t now = taosGetTimestampMs();
11,313✔
41
  SArray* pTaskList = NULL;
11,313✔
42
  int32_t numOfConsensusChkptIdTasks = 0;
11,313✔
43
  int32_t numOfTasks = 0;
11,313✔
44

45
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
11,313✔
46
  if (numOfTasks == 0) {
11,319✔
47
    stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
11,200✔
48
    streamMetaResetStartInfo(&pMeta->startInfo, vgId);
11,203✔
49
    return TSDB_CODE_SUCCESS;
11,202✔
50
  }
51

52
  stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
119!
53

54
  code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
119✔
55
  if (code != TSDB_CODE_SUCCESS) {
120!
56
    return TSDB_CODE_SUCCESS;  // ignore the error and return directly
×
57
  }
58

59
  // broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
60
  numOfTasks = taosArrayGetSize(pTaskList);
120✔
61

62
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
63
  // initialization, when the operation of check downstream tasks status is executed far quickly.
64
  for (int32_t i = 0; i < numOfTasks; ++i) {
594✔
65
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
473✔
66
    SStreamTask*   pTask = NULL;
472✔
67

68
    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
472✔
69
    if ((pTask == NULL) || (code != 0)) {
474!
70
      stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId);
1!
71
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false);
1✔
72
      if (ret) {
×
73
        stError("s-task:0x%x add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
×
74
      }
75
      continue;
×
76
    }
77

78
    if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
473!
79
      code = pMeta->expandTaskFn(pTask);
122✔
80
      if (code != TSDB_CODE_SUCCESS) {
122!
81
        stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
×
82
        streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs, false);
×
83
      }
84
    }
85

86
    streamMetaReleaseTask(pMeta, pTask);
473✔
87
  }
88

89
  // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
90
  for (int32_t i = 0; i < numOfTasks; ++i) {
595✔
91
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
474✔
92

93
    SStreamTask* pTask = NULL;
474✔
94
    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
474✔
95
    if ((pTask == NULL) || (code != 0)) {
474!
96
      stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
1!
97
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false);
1✔
98
      if (ret) {
×
99
        stError("s-task:0x%x failed add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
×
100
      }
101

102
      continue;
122✔
103
    }
104

105
    STaskExecStatisInfo* pInfo = &pTask->execInfo;
473✔
106

107
    // fill-history task can only be launched by related stream tasks.
108
    if (pTask->info.fillHistory == 1) {
473✔
109
      stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
61!
110
      streamMetaReleaseTask(pMeta, pTask);
61✔
111
      continue;
61✔
112
    }
113

114
    // ready now, start the related fill-history task
115
    if (pTask->status.downstreamReady == 1) {
412!
116
      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
×
117
        stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
×
118
                pTask->id.idStr);
119
        code = streamLaunchFillHistoryTask(pTask, false);  // todo: how about retry launch fill-history task?
×
120
        if (code) {
×
121
          stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
×
122
        }
123
      }
124

125
      code = streamMetaAddTaskLaunchResultNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs,
×
126
                                                 pInfo->readyTs, true);
127
      streamMetaReleaseTask(pMeta, pTask);
×
128
      continue;
×
129
    }
130

131
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
412✔
132
      int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
61✔
133
      if (ret != TSDB_CODE_SUCCESS) {
61!
134
        stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
×
135
        code = ret;
×
136

137
        // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
138
        if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
×
139
          streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false);
×
140
        }
141
      }
142

143
      streamMetaReleaseTask(pMeta, pTask);
61✔
144
      continue;
61✔
145
    }
146

147
    // negotiate the consensus checkpoint id for current task
148
    code = streamTaskSendNegotiateChkptIdMsg(pTask);
351✔
149
    if (code == 0) {
352!
150
      numOfConsensusChkptIdTasks += 1;
352✔
151
    }
152

153
    // this task may have no checkpoint, but others tasks may generate checkpoint already?
154
    streamMetaReleaseTask(pMeta, pTask);
352✔
155
  }
156

157
  if (numOfConsensusChkptIdTasks > 0) {
121✔
158
    pMeta->startInfo.curStage = START_MARK_REQ_CHKPID;
89✔
159
    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
89✔
160

161
    void*   p = taosArrayPush(pMeta->startInfo.pStagesList, &info);
89✔
162
    int32_t num = (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList);
89✔
163

164
    if (p != NULL) {
89!
165
      stDebug("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64 " numOfStageHist:%d", pMeta->vgId,
89✔
166
              numOfConsensusChkptIdTasks, info.ts, num);
167
    } else {
168
      stError("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64
×
169
              " numOfStageHist:%d, FAILED, out of memory",
170
              pMeta->vgId, numOfConsensusChkptIdTasks, info.ts, num);
171
    }
172
  }
173

174
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
175
  // initialization, when the operation of check downstream tasks status is executed far quickly.
176
  stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
120!
177
  taosArrayDestroy(pTaskList);
120✔
178
  return code;
120✔
179
}
180

181
// Prepare the meta for a "start all tasks" round.
//
// pMeta: stream meta whose start info is reset.
// pList: out parameter, receives a duplicate of pMeta->pTaskList.
// now:   timestamp stored as the start ts of this round.
//
// Returns TSDB_CODE_FAILED when the vnode is flagged as closed (*pList is not touched), terrno when the task
// list cannot be duplicated (*pList is NULL), otherwise the result of streamMetaResetTaskStatus(). In the last
// case *pList stays allocated even when an error is returned, so the caller owns it either way.
int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
  if (pMeta->closeFlag) {
    stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
    return TSDB_CODE_FAILED;
  }

  SArray* pCopy = taosArrayDup(pMeta->pTaskList, NULL);
  *pList = pCopy;
  if (pCopy == NULL) {
    stError("vgId:%d failed to dup tasklist, before restart tasks, code:%s", pMeta->vgId, tstrerror(terrno));
    return terrno;
  }

  STaskStartInfo* pStart = &pMeta->startInfo;

  // drop every result left over from a previous round
  taosHashClear(pStart->pReadyTaskSet);
  taosHashClear(pStart->pFailedTaskSet);
  taosArrayClear(pStart->pStagesList);
  taosArrayClear(pStart->pRecvChkptIdTasks);

  pStart->partialTasksStarted = false;
  pStart->curStage = 0;
  pStart->startTs = now;

  return streamMetaResetTaskStatus(pMeta);
}
206

207
// Reset the bookkeeping of a "start all tasks" round: empty the result sets and lists (the containers
// themselves are kept) and clear the scalar state, including the startAllTasks sentinel.
// startTs and restartCount are not modified here.
//
// pStartInfo: start info to reset.
// vgId:       vnode id, only used for logging.
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
  // containers: keep them allocated, just drop the contents
  taosHashClear(pStartInfo->pReadyTaskSet);
  taosHashClear(pStartInfo->pFailedTaskSet);
  taosArrayClear(pStartInfo->pStagesList);
  taosArrayClear(pStartInfo->pRecvChkptIdTasks);

  // progress markers
  pStartInfo->curStage = 0;
  pStartInfo->partialTasksStarted = false;
  pStartInfo->tasksWillRestart = 0;

  // timing info
  pStartInfo->readyTs = 0;
  pStartInfo->elapsedTime = 0;

  // reset the sentinel flag value to be 0
  pStartInfo->startAllTasks = 0;

  stDebug("vgId:%d clear start-all-task info", vgId);
}
18,448✔
223

224
// Stamp the ready time and elapsed time of the current start round into the start info, then log the stage
// history, a summary line, and the per-task results of both the ready and the failed sets.
//
// numOfTotal: number of tasks reported in the summary line.
// taskId:     id of the task whose result completed the round (logging only).
// ready:      launch result of that task (logging only).
static void streamMetaLogLaunchTasksInfo(SStreamMeta* pMeta, int32_t numOfTotal, int32_t taskId, bool ready) {
  STaskStartInfo* pStart = &pMeta->startInfo;

  pStart->readyTs = taosGetTimestampMs();
  if (pStart->startTs != 0) {
    pStart->elapsedTime = pStart->readyTs - pStart->startTs;
  } else {
    pStart->elapsedTime = 0;  // start ts unknown, no meaningful elapsed time
  }

  int32_t numOfStages = taosArrayGetSize(pStart->pStagesList);
  for (int32_t k = 0; k < numOfStages; ++k) {
    SStartTaskStageInfo* pStage = taosArrayGet(pStart->pStagesList, k);
    stDebug("vgId:%d start task procedure, stage:%d, ts:%" PRId64, pMeta->vgId, pStage->stage, pStage->ts);
  }

  stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
          ", readyTs:%" PRId64 " total elapsed time:%.2fs",
          pMeta->vgId, numOfTotal, taskId, ready, pStart->startTs, pStart->readyTs, pStart->elapsedTime / 1000.0);

  // print the initialization elapsed time and info
  displayStatusInfo(pMeta, pStart->pReadyTaskSet, true);
  displayStatusInfo(pMeta, pStart->pFailedTaskSet, false);
}
45✔
244

245
// Record the launch result of one task in the start info of pMeta, without taking the meta lock.
//
// The result is dropped (return 0) when the task is not found in this vnode or when no "start all tasks"
// round is active (startAllTasks != 1). Otherwise the result is put into the ready or failed set; a duplicate
// key is logged and treated as success. Once every expected task has a result, the round is logged, the start
// info is reset and completeFn is invoked.
//
// streamId/taskId: identify the task.
// startTs/endTs:   timestamps stored with the result.
// ready:           true -> ready set, false -> failed set.
//
// Returns 0, the error of taosHashPut (other than a duplicate key), or the result of completeFn.
int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                            int64_t endTs, bool ready) {
  STaskStartInfo* pStart = &pMeta->startInfo;
  STaskId         key = {.streamId = streamId, .taskId = taskId};
  int32_t         vgId = pMeta->vgId;
  SStreamTask*    pExisting = NULL;

  // the acquire/release pair is only an existence check
  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &key, &pExisting);
  if (code != 0) {  // task does not exist in current vnode, not record the complete info
    stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
    return 0;
  }

  streamMetaReleaseTask(pMeta, pExisting);

  if (pStart->startAllTasks != 1) {
    int64_t elapsed = endTs - startTs;
    stDebug(
        "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
        "time:%" PRId64 "ms",
        vgId, taskId, ready, elapsed);
    return 0;
  }

  STaskInitTs result = {.start = startTs, .end = endTs, .success = ready};
  SHashObj*   pTarget = pStart->pFailedTaskSet;
  if (ready) {
    pTarget = pStart->pReadyTaskSet;
  }

  code = taosHashPut(pTarget, &key, sizeof(key), &result, sizeof(STaskInitTs));
  if (code != 0) {
    if (code != TSDB_CODE_DUP_KEY) {
      stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed, code:%s", vgId,
              key.taskId, tstrerror(code));
    } else {
      stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
              " already exist start results in meta start task result hashmap",
              vgId, key.taskId);
      code = 0;  // an existing record is not an error for the caller
    }
  }

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
  int32_t numOfSucc = taosHashGetSize(pStart->pReadyTaskSet);
  int32_t numOfRecv = numOfSucc + taosHashGetSize(pStart->pFailedTaskSet);

  bool done = true;
  if (!pStart->partialTasksStarted) {
    done = allCheckDownstreamRsp(pMeta, pStart, numOfTotal);
  } else {
    // only the tasks listed in pRecvChkptIdTasks are waited for
    int32_t numOfWaited = taosArrayGetSize(pStart->pRecvChkptIdTasks);
    stDebug(
        "vgId:%d start all tasks procedure is interrupted by transId:%d, wait for partial tasks rsp. recv check "
        "downstream results, s-task:0x%x succ:%d, received:%d results, waited for tasks:%d, total tasks:%d",
        vgId, pMeta->updateInfo.activeTransId, taskId, ready, numOfRecv, numOfWaited, numOfTotal);

    done = allCheckDownstreamRspPartial(pStart, numOfWaited, pMeta->vgId);
  }

  if (!done) {
    stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d results, total:%d", vgId, taskId,
            ready, numOfRecv, numOfTotal);
    return code;
  }

  streamMetaLogLaunchTasksInfo(pMeta, numOfTotal, taskId, ready);
  streamMetaResetStartInfo(pStart, vgId);

  return pStart->completeFn(pMeta);
}
313

314
// Locked wrapper of streamMetaAddTaskLaunchResultNoLock(): takes the meta write lock, records the launch
// result and releases the lock again. Parameters and return value are those of the NoLock variant.
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                      int64_t endTs, bool ready) {
  streamMetaWLock(pMeta);
  int32_t ret = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, endTs, ready);
  streamMetaWUnLock(pMeta);

  return ret;
}
324

325
// check all existed tasks are received rsp
326
bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
205✔
327
  for (int32_t i = 0; i < numOfTotal; ++i) {
586✔
328
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
541✔
329
    if (pTaskId == NULL) {
541!
330
      continue;
×
331
    }
332

333
    STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
541✔
334
    void*   px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
541✔
335
    if (px == NULL) {
542✔
336
      px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
218✔
337
      if (px == NULL) {
218✔
338
        stDebug("vgId:%d s-task:0x%x start result not rsp yet", pMeta->vgId, (int32_t)idx.taskId);
161✔
339
        return false;
161✔
340
      }
341
    }
342
  }
343

344
  return true;
45✔
345
}
346

347
// Same check as allCheckDownstreamRsp(), restricted to the first num entries of
// pStartInfo->pRecvChkptIdTasks: every such task must be present in the ready set or the failed set.
// Entries that cannot be fetched are skipped. vgId is used for logging only.
bool allCheckDownstreamRspPartial(STaskStartInfo* pStartInfo, int32_t num, int32_t vgId) {
  for (int32_t k = 0; k < num; ++k) {
    STaskId* pKey = taosArrayGet(pStartInfo->pRecvChkptIdTasks, k);
    if (pKey == NULL) {
      continue;
    }

    if (taosHashGet(pStartInfo->pReadyTaskSet, pKey, sizeof(STaskId)) != NULL) {
      continue;  // reported as ready
    }

    if (taosHashGet(pStartInfo->pFailedTaskSet, pKey, sizeof(STaskId)) != NULL) {
      continue;  // reported as failed
    }

    stDebug("vgId:%d s-task:0x%x start result not rsp yet", vgId, (int32_t)pKey->taskId);
    return false;
  }

  return true;
}
366

367
// Log the content of one launch-result set: a header line with the number of entries, then one line per
// task with its level and recorded timestamps. Tasks that can no longer be acquired are reported as dropped.
//
// pTaskSet: hash set keyed by STaskId with STaskInitTs values.
// succ:     label used in the header and for dropped tasks ("success" / "failed").
void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
  int32_t vgId = pMeta->vgId;
  size_t  keyLen = 0;

  stInfo("vgId:%d %d tasks complete check-downstream, %s", vgId, taosHashGetSize(pTaskSet),
         succ ? "success" : "failed");

  for (void* pIter = taosHashIterate(pTaskSet, NULL); pIter != NULL; pIter = taosHashIterate(pTaskSet, pIter)) {
    STaskInitTs* pRecord = pIter;
    void*        pKey = taosHashGetKey(pIter, &keyLen);
    SStreamTask* pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, pKey, &pTask) != 0) {
      stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)pKey)->taskId, succ ? "success" : "failed");
      continue;
    }

    stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", pTask->id.idStr,
           pTask->info.taskLevel, vgId, pRecord->start, pRecord->end, pRecord->success ? "success" : "failed");
    streamMetaReleaseTask(pMeta, pTask);
  }
}
89✔
389

390
// Allocate the containers of a start info: the ready and failed task sets (unlocked hash tables) and the
// stage-history and received-checkpoint-id task arrays.
//
// Returns 0 on success, or terrno as soon as one allocation fails. Containers allocated before the failure
// are left in pStartInfo.
// NOTE(review): presumably the caller releases them via streamMetaClearStartInfo() - confirm.
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);

  if ((pStartInfo->pReadyTaskSet = taosHashInit(64, hashFn, false, HASH_NO_LOCK)) == NULL) {
    return terrno;
  }

  if ((pStartInfo->pFailedTaskSet = taosHashInit(4, hashFn, false, HASH_NO_LOCK)) == NULL) {
    return terrno;
  }

  if ((pStartInfo->pStagesList = taosArrayInit(4, sizeof(SStartTaskStageInfo))) == NULL) {
    return terrno;
  }

  if ((pStartInfo->pRecvChkptIdTasks = taosArrayInit(4, sizeof(STaskId))) == NULL) {
    return terrno;
  }

  pStartInfo->partialTasksStarted = false;
  return 0;
}
416

417
// Release everything held by a start info: the containers and timestamps via
// streamMetaClearStartInfoPartial(), plus the restart counter and the start/restart flags.
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
  streamMetaClearStartInfoPartial(pStartInfo);

  // flags and counters that the partial clear leaves untouched
  pStartInfo->restartCount = 0;
  pStartInfo->tasksWillRestart = 0;
  pStartInfo->startAllTasks = 0;
}
14,736✔
424

425
// Destroy the four containers of a start info, set their pointers to NULL and zero the timestamps.
// startAllTasks, tasksWillRestart and restartCount are not touched.
void streamMetaClearStartInfoPartial(STaskStartInfo* pStartInfo) {
  // release the containers first ...
  taosHashCleanup(pStartInfo->pReadyTaskSet);
  taosHashCleanup(pStartInfo->pFailedTaskSet);
  taosArrayDestroy(pStartInfo->pStagesList);
  taosArrayDestroy(pStartInfo->pRecvChkptIdTasks);

  // ... then drop the dangling pointers
  pStartInfo->pRecvChkptIdTasks = NULL;
  pStartInfo->pStagesList = NULL;
  pStartInfo->pFailedTaskSet = NULL;
  pStartInfo->pReadyTaskSet = NULL;

  pStartInfo->startTs = 0;
  pStartInfo->readyTs = 0;
  pStartInfo->elapsedTime = 0;
}
14,736✔
440

441
// Start a single stream task: expand its backend if it has none and send it the INIT event.
//
// Returns
//   TSDB_CODE_STREAM_TASK_IVLD_STATUS  when the task cannot be acquired or is not in the uninit status,
//   TSDB_CODE_SUCCESS                  for a fill-history task (it is not started here),
//   TSDB_CODE_STREAM_INTERNAL_ERROR    when the downstream is unexpectedly flagged as ready,
//   otherwise the code of the backend expansion or of the INIT event handling.
// A failed start is recorded as a failed launch result, except when the INIT event is rejected with
// TSDB_CODE_STREAM_CONFLICT_EVENT.
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t      vgId = pMeta->vgId;
  SStreamTask* pTask = NULL;

  stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);

  int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
    int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId, true);
    if (ret) {
      stError("s-task:0x%x add check downstream failed, core:%s", taskId, tstrerror(ret));
    }

    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // fill-history task can only be launched by related stream tasks.
  STaskExecStatisInfo* pExec = &pTask->execInfo;
  if (pTask->info.fillHistory == 1) {
    stError("s-task:0x%x vgId:%d fill-history task, not start here", taskId, vgId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  // the start all tasks procedure may happen to start the newly deployed stream task, and results in the
  // concurrent start this task by two threads.
  streamMutexLock(&pTask->lock);

  SStreamTaskState status = streamTaskGetStatus(pTask);
  bool             isUninit = (status.state == TASK_STATUS__UNINIT);
  if (!isUninit) {
    stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
  }

  streamMutexUnlock(&pTask->lock);

  if (!isUninit) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  if (pTask->status.downstreamReady != 0) {
    stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  streamMetaWLock(pMeta);

  // avoid initialization and destroy running concurrently.
  streamMutexLock(&pTask->lock);
  if (pTask->pBackend == NULL) {
    code = pMeta->expandTaskFn(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    // only reachable when the backend expansion above has failed
    streamMetaAddFailedTaskSelf(pTask, pExec->readyTs, false);
  } else {
    // concurrently start task may cause the latter started task be failed, and also failed to added into meta result.
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId,
              tstrerror(code));

      // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
      if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
        streamMetaAddFailedTaskSelf(pTask, pExec->readyTs, false);
      }
    }
  }

  streamMetaWUnLock(pMeta);
  streamMetaReleaseTask(pMeta, pTask);

  return code;
}
526

527
// Stop every task of pMeta while holding the meta write lock.
//
// streamMetaSendMsgBeforeCloseTasks() is called first and provides the list of tasks to stop; each task
// that can be acquired is then stopped. Tasks that cannot be acquired are skipped, and a failing
// streamTaskStop() is only logged.
//
// Returns TSDB_CODE_SUCCESS when there is no task, the error of streamMetaSendMsgBeforeCloseTasks() when it
// fails, otherwise the code left by the last acquire attempt in the loop.
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
  streamMetaWLock(pMeta);

  SArray* pStopList = NULL;
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);

  if (num == 0) {
    stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  int64_t begin = taosGetTimestampMs();

  // send hb msg to mnode before closing all tasks.
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pStopList);
  if (code != TSDB_CODE_SUCCESS) {
    streamMetaWUnLock(pMeta);
    return code;
  }

  int32_t numOfTasks = taosArrayGetSize(pStopList);
  for (int32_t k = 0; k < numOfTasks; ++k) {
    SStreamTaskId* pId = taosArrayGet(pStopList, k);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pId->streamId, pId->taskId, &pTask);
    if (code == TSDB_CODE_SUCCESS) {
      int32_t ret = streamTaskStop(pTask);
      if (ret) {
        stError("s-task:0x%x failed to stop task, code:%s", pId->taskId, tstrerror(ret));
      }

      streamMetaReleaseTask(pMeta, pTask);
    }
  }

  taosArrayDestroy(pStopList);

  double el = (taosGetTimestampMs() - begin) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);

  streamMetaWUnLock(pMeta);
  return code;
}
575

576
// Decide whether this task should request the consensus checkpoint id now (the log text refers to the hbMsg).
//
// Returns 1 only when the meta is in the START_MARK_REQ_CHKPID stage and the task status is
// TASK_CONSEN_CHKPT_REQ; the status is then advanced to TASK_CONSEN_CHKPT_SEND. Returns 0 in every other case.
// The ts parameter is not used.
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pChkpt = &pTask->status.consenChkptInfo;
  int32_t           vgId = pTask->pMeta->vgId;

  if (pTask->pMeta->startInfo.curStage != START_MARK_REQ_CHKPID) {
    return 0;
  }

  if (pChkpt->status == TASK_CONSEN_CHKPT_REQ) {
    // mark the sending of req consensus checkpoint request.
    pChkpt->status = TASK_CONSEN_CHKPT_SEND;
    stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, vgId,
            pChkpt->statusTs);
    return 1;
  }

  if (pChkpt->status == 0) {
    stDebug("vgId:%d s-task:%s not need to set the req checkpointId, current stage:%d", vgId, pTask->id.idStr,
            pChkpt->status);
  } else {
    stWarn("vgId:%d, s-task:%s restart procedure expired, start stage:%d", vgId, pTask->id.idStr,
           pChkpt->status);
  }

  return 0;
}
598

599
// Mark the task as having received the consensus checkpoint id: store the transaction id, switch the status
// to TASK_CONSEN_CHKPT_RECV and stamp the status time with ts.
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) {
  SConsenChkptInfo* pChkpt = &pTask->status.consenChkptInfo;

  pChkpt->status = TASK_CONSEN_CHKPT_RECV;
  pChkpt->statusTs = ts;
  pChkpt->consenChkptTransId = transId;

  stInfo("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
}
100✔
607

608
// Flag the task as requiring a consensus checkpoint id: switch the status to TASK_CONSEN_CHKPT_REQ, stamp the
// status time with ts and clear the stored transaction id (the previous id is only logged).
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pChkpt = &pTask->status.consenChkptInfo;
  int32_t           oldTransId = pChkpt->consenChkptTransId;  // remembered for the log line below

  pChkpt->consenChkptTransId = 0;
  pChkpt->statusTs = ts;
  pChkpt->status = TASK_CONSEN_CHKPT_REQ;

  stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64 ", task created ts:%" PRId64,
          pTask->id.idStr, oldTransId, ts, pTask->execInfo.created);
}
352✔
619

620
// Record a task, and its related fill-history task if any, as failed to start.
//
// lock: when true, the task lookup runs under the meta read lock and the results are recorded through the
//       locking variant; when false no lock is taken and the NoLock variant is used.
//
// Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when the task cannot be located. Otherwise returns the code of the
// last recorded result: the fill-history task's when one exists, else the task's own.
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, bool lock) {
  int64_t      now = taosGetTimestampMs();
  STaskId      key = {.streamId = streamId, .taskId = taskId};
  SStreamTask* pTask = NULL;

  stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);

  if (lock) {
    streamMetaRLock(pMeta);
  }

  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask);
  if (code != 0) {
    if (lock) {
      streamMetaRUnLock(pMeta);
    }

    stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
            streamId, taskId, pMeta->vgId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // copy what is needed, so the task can be released before the results are recorded
  int64_t checkStartTs = pTask->taskCheckInfo.startTs;
  bool    hasHistoryTask = HAS_RELATED_FILLHISTORY_TASK(pTask);
  STaskId historyId = pTask->hTaskInfo.id;
  streamMetaReleaseTask(pMeta, pTask);

  if (lock) {
    streamMetaRUnLock(pMeta);
  }

  // add the failed task info, along with the related fill-history task info into tasks list.
  if (!lock) {
    code = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, checkStartTs, now, false);
    if (hasHistoryTask) {
      code = streamMetaAddTaskLaunchResultNoLock(pMeta, historyId.streamId, historyId.taskId, checkStartTs, now, false);
    }
  } else {
    code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, checkStartTs, now, false);
    if (hasHistoryTask) {
      code = streamMetaAddTaskLaunchResult(pMeta, historyId.streamId, historyId.taskId, checkStartTs, now, false);
    }
  }

  return code;
}
670

671
// Record pTask itself, and its related fill-history task if any, as failed to start.
//
// pTask:    the task that failed to start.
// failedTs: timestamp stored as the end time of the launch result.
// lock:     true  -> record through streamMetaAddTaskLaunchResult() (takes the meta write lock),
//           false -> record through streamMetaAddTaskLaunchResultNoLock().
//
// Errors from recording are only logged.
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs, bool lock) {
  // Keep the full 64-bit value: checkTs is handed to the launch-result functions as an int64_t start
  // timestamp elsewhere in this file, and an int32_t local would truncate it before it is recorded.
  int64_t startTs = pTask->execInfo.checkTs;
  int32_t code = 0;

  if (lock) {
    code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
  } else {
    code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs,
                                               false);
  }

  if (code) {
    stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
  }

  // automatically set the related fill-history task to be failed.
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    STaskId* pId = &pTask->hTaskInfo.id;

    if (lock) {
      code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
    } else {
      code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
    }

    if (code) {
      stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
    }
  }
}
26✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc