• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3663

19 Mar 2025 09:21AM UTC coverage: 61.664% (-0.6%) from 62.28%
#3663

push

travis-ci

web-flow
docs: add defination of tmq_config_res_t & fix spell error (#30271)

153169 of 318241 branches covered (48.13%)

Branch coverage included in aggregate %.

239405 of 318390 relevant lines covered (75.19%)

5762846.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.56
/source/libs/stream/src/streamStartTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tmisce.h"
20
#include "tref.h"
21
#include "tsched.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25

26
// Launch result of one task, stored as the value in the ready/failed task hash sets.
typedef struct STaskInitTs {
  int64_t start;    // timestamp at which the start procedure of the task began
  int64_t end;      // timestamp at which the start procedure of the task finished
  bool    success;  // true if the task was launched successfully
} STaskInitTs;
31

32
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
33
static bool    allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
34
static void    displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
35

36
// restore the checkpoint id by negotiating the latest consensus checkpoint id
37
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
9,709✔
38
  int32_t code = TSDB_CODE_SUCCESS;
9,709✔
39
  int32_t vgId = pMeta->vgId;
9,709✔
40
  int64_t now = taosGetTimestampMs();
9,707✔
41
  SArray* pTaskList = NULL;
9,707✔
42
  int32_t numOfConsensusChkptIdTasks = 0;
9,707✔
43
  int32_t numOfTasks = 0;
9,707✔
44

45
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
9,707✔
46
  if (numOfTasks == 0) {
9,711✔
47
    stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
9,606!
48
    streamMetaResetStartInfo(&pMeta->startInfo, vgId);
9,608✔
49
    return TSDB_CODE_SUCCESS;
9,608✔
50
  }
51

52
  stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
105!
53

54
  code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
105✔
55
  if (code != TSDB_CODE_SUCCESS) {
105!
56
    return TSDB_CODE_SUCCESS;  // ignore the error and return directly
×
57
  }
58

59
  // broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
60
  numOfTasks = taosArrayGetSize(pTaskList);
105✔
61

62
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
63
  // initialization, when the operation of check downstream tasks status is executed far quickly.
64
  for (int32_t i = 0; i < numOfTasks; ++i) {
512✔
65
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
407✔
66
    SStreamTask*   pTask = NULL;
407✔
67

68
    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
407✔
69
    if ((pTask == NULL) || (code != 0)) {
408!
70
      stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId);
×
71
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false);
×
72
      if (ret) {
×
73
        stError("s-task:0x%x add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
×
74
      }
75
      continue;
×
76
    }
77

78
    if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
408!
79
      code = pMeta->expandTaskFn(pTask);
50✔
80
      if (code != TSDB_CODE_SUCCESS) {
50!
81
        stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
×
82
        streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs, false);
×
83
      }
84
    }
85

86
    streamMetaReleaseTask(pMeta, pTask);
408✔
87
  }
88

89
  // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
90
  for (int32_t i = 0; i < numOfTasks; ++i) {
511✔
91
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
403✔
92

93
    SStreamTask* pTask = NULL;
406✔
94
    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
406✔
95
    if ((pTask == NULL) || (code != 0)) {
408!
96
      stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
×
97
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false);
×
98
      if (ret) {
×
99
        stError("s-task:0x%x failed add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
×
100
      }
101

102
      continue;
50✔
103
    }
104

105
    STaskExecStatisInfo* pInfo = &pTask->execInfo;
408✔
106

107
    // fill-history task can only be launched by related stream tasks.
108
    if (pTask->info.fillHistory == 1) {
408✔
109
      stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
25!
110
      streamMetaReleaseTask(pMeta, pTask);
25✔
111
      continue;
25✔
112
    }
113

114
    // ready now, start the related fill-history task
115
    if (pTask->status.downstreamReady == 1) {
383!
116
      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
×
117
        stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
×
118
                pTask->id.idStr);
119
        code = streamLaunchFillHistoryTask(pTask, false);  // todo: how about retry launch fill-history task?
×
120
        if (code) {
×
121
          stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
×
122
        }
123
      }
124

125
      code = streamMetaAddTaskLaunchResultNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs,
×
126
                                                 pInfo->readyTs, true);
127
      streamMetaReleaseTask(pMeta, pTask);
×
128
      continue;
×
129
    }
130

131
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
383✔
132
      int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
25✔
133
      if (ret != TSDB_CODE_SUCCESS) {
25!
134
        stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
×
135
        code = ret;
×
136

137
        // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
138
        if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
×
139
          streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false);
×
140
        }
141
      }
142

143
      streamMetaReleaseTask(pMeta, pTask);
25✔
144
      continue;
25✔
145
    }
146

147
    // negotiate the consensus checkpoint id for current task
148
    code = streamTaskSendNegotiateChkptIdMsg(pTask);
358✔
149
    if (code == 0) {
358!
150
      numOfConsensusChkptIdTasks += 1;
358✔
151
    }
152

153
    // this task may have no checkpoint, but others tasks may generate checkpoint already?
154
    streamMetaReleaseTask(pMeta, pTask);
358✔
155
  }
156

157
  if (numOfConsensusChkptIdTasks > 0) {
108✔
158
    pMeta->startInfo.curStage = START_MARK_REQ_CHKPID;
92✔
159
    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
92✔
160

161
    taosArrayPush(pMeta->startInfo.pStagesList, &info);
92✔
162
    stDebug("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64 " numOfStageHist:%d", pMeta->vgId, numOfConsensusChkptIdTasks,
92✔
163
            info.ts, (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList));
164
  }
165

166
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
167
  // initialization, when the operation of check downstream tasks status is executed far quickly.
168
  stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
108✔
169
  taosArrayDestroy(pTaskList);
108✔
170
  return code;
105✔
171
}
172

173
// Prepare the meta for a "start all tasks" round.
//
// Duplicates the task id list of pMeta into *pList, clears the ready/failed result sets and the stage history,
// records `now` as the start timestamp and resets the status of the tasks.
//
// Returns TSDB_CODE_SUCCESS on success, in which case the caller is responsible for destroying *pList.
// On any failure *pList is left as NULL, so the caller has nothing to release.
int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
  STaskStartInfo* pInfo = &pMeta->startInfo;

  *pList = NULL;
  if (pMeta->closeFlag) {
    stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
    return TSDB_CODE_FAILED;
  }

  *pList = taosArrayDup(pMeta->pTaskList, NULL);
  if (*pList == NULL) {
    stError("vgId:%d failed to dup tasklist, before restart tasks, code:%s", pMeta->vgId, tstrerror(terrno));
    return terrno;
  }

  taosHashClear(pInfo->pReadyTaskSet);
  taosHashClear(pInfo->pFailedTaskSet);
  taosArrayClear(pInfo->pStagesList);

  pInfo->curStage = 0;
  pInfo->startTs = now;

  int32_t code = streamMetaResetTaskStatus(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    // the caller gives up on failure without touching the list, release the duplicate here to avoid a leak
    taosArrayDestroy(*pList);
    *pList = NULL;
  }

  return code;
}
196

197
// Reset the bookkeeping of a "start all tasks" round: empty the ready/failed result sets and the stage history,
// and zero the progress fields. The containers themselves are kept for reuse. vgId is used for logging only.
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
  // drop the recorded results of the previous round
  taosHashClear(pStartInfo->pReadyTaskSet);
  taosHashClear(pStartInfo->pFailedTaskSet);
  taosArrayClear(pStartInfo->pStagesList);

  // zero the progress fields
  pStartInfo->curStage = 0;
  pStartInfo->elapsedTime = 0;
  pStartInfo->readyTs = 0;
  pStartInfo->tasksWillRestart = 0;

  // the sentinel flag is reset last
  pStartInfo->startAllTasks = 0;

  stDebug("vgId:%d clear start-all-task info", vgId);
}
16,091✔
211

212
// Record the completion time of the "start all tasks" round and log a summary: every recorded stage, the total
// elapsed time, and the per-task results of both the ready set and the failed set.
// taskId/ready describe the last task that reported its result; numOfTotal is only used in the log.
static void streamMetaLogLaunchTasksInfo(SStreamMeta* pMeta, int32_t numOfTotal, int32_t taskId, bool ready) {
  STaskStartInfo* pStart = &pMeta->startInfo;

  pStart->readyTs = taosGetTimestampMs();
  if (pStart->startTs != 0) {
    pStart->elapsedTime = pStart->readyTs - pStart->startTs;
  } else {
    pStart->elapsedTime = 0;  // start timestamp was never set, no meaningful duration
  }

  int32_t numOfStages = taosArrayGetSize(pStart->pStagesList);
  for (int32_t k = 0; k < numOfStages; ++k) {
    SStartTaskStageInfo* pStage = taosArrayGet(pStart->pStagesList, k);
    stDebug("vgId:%d start task procedure, stage:%d, ts:%" PRId64, pMeta->vgId, pStage->stage, pStage->ts);
  }

  stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
          ", readyTs:%" PRId64 " total elapsed time:%.2fs",
          pMeta->vgId, numOfTotal, taskId, ready, pStart->startTs, pStart->readyTs, pStart->elapsedTime / 1000.0);

  // print the initialization elapsed time and info
  displayStatusInfo(pMeta, pStart->pReadyTaskSet, true);
  displayStatusInfo(pMeta, pStart->pFailedTaskSet, false);
}
45✔
232

233
// Record the launch result of one task, without taking the meta lock.
//
// Nothing is recorded (and 0 is returned) when the task does not exist in this vnode, or when no "start all
// tasks" round is in progress. Otherwise the result goes into the ready set or the failed set depending on
// `ready`. Once every task in the meta task list has a result, the summary is logged, the start info is reset
// and the completion callback is invoked; its return value is then the result of this function.
int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId,
                                            int64_t startTs, int64_t endTs, bool ready) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;
  STaskId         id = {.streamId = streamId, .taskId = taskId};

  // the acquire is only an existence check, the reference is dropped right away
  SStreamTask* pTask = NULL;
  int32_t      code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {  // task does not exist in current vnode, not record the complete info
    stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
    return 0;
  }
  streamMetaReleaseTask(pMeta, pTask);

  if (pStartInfo->startAllTasks != 1) {
    int64_t elapsed = endTs - startTs;
    stDebug(
        "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
        "time:%" PRId64 "ms",
        vgId, taskId, ready, elapsed);
    return 0;
  }

  STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
  SHashObj*   pResultSet = NULL;
  if (ready) {
    pResultSet = pStartInfo->pReadyTaskSet;
  } else {
    pResultSet = pStartInfo->pFailedTaskSet;
  }

  code = taosHashPut(pResultSet, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
  if (code != 0) {
    if (code != TSDB_CODE_DUP_KEY) {
      stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed, code:%s", vgId,
              id.taskId, tstrerror(code));
    } else {
      // a duplicated result is reported but not treated as a failure
      stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
              " already exist start results in meta start task result hashmap",
              vgId, id.taskId);
      code = 0;
    }
  }

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
  int32_t numOfSucc = taosHashGetSize(pStartInfo->pReadyTaskSet);
  int32_t numOfRecv = numOfSucc + taosHashGetSize(pStartInfo->pFailedTaskSet);

  if (!allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal)) {
    stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
            numOfRecv, numOfTotal);
    return code;
  }

  // every task has reported: log the summary, clear the round and notify the owner
  streamMetaLogLaunchTasksInfo(pMeta, numOfTotal, taskId, ready);
  streamMetaResetStartInfo(pStartInfo, vgId);

  return pStartInfo->completeFn(pMeta);
}
290

291
// Locked variant of streamMetaAddTaskLaunchResultNoLock: the launch result is recorded while holding the write
// lock of the meta. The return value of the NoLock variant is passed through unchanged.
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                      int64_t endTs, bool ready) {
  streamMetaWLock(pMeta);
  int32_t ret = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, endTs, ready);
  streamMetaWUnLock(pMeta);

  return ret;
}
301

302
// check all existed tasks are received rsp
303
bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
201✔
304
  for (int32_t i = 0; i < numOfTotal; ++i) {
552✔
305
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
507✔
306
    if (pTaskId == NULL) {
507!
307
      continue;
×
308
    }
309

310
    STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
507✔
311
    void*   px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
507✔
312
    if (px == NULL) {
507✔
313
      px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
183✔
314
      if (px == NULL) {
183✔
315
        stDebug("vgId:%d s-task:0x%x start result not rsp yet", pMeta->vgId, (int32_t) idx.taskId);
156✔
316
        return false;
156✔
317
      }
318
    }
319
  }
320

321
  return true;
45✔
322
}
323

324
// Log the number of entries in pTaskSet, followed by one line per recorded task with its start/end timestamps.
// Tasks that can no longer be acquired from the meta are reported as already dropped.
// `succ` tells whether pTaskSet is the set of successfully launched tasks or the set of failed ones.
void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
  int32_t vgId = pMeta->vgId;
  size_t  keyLen = 0;

  stInfo("vgId:%d %d tasks complete check-downstream, %s", vgId, taosHashGetSize(pTaskSet),
         succ ? "success" : "failed");

  for (void* pIter = taosHashIterate(pTaskSet, NULL); pIter != NULL; pIter = taosHashIterate(pTaskSet, pIter)) {
    STaskInitTs* pTs = pIter;
    void*        pKey = taosHashGetKey(pIter, &keyLen);

    SStreamTask* pTask = NULL;
    if (streamMetaAcquireTaskUnsafe(pMeta, pKey, &pTask) != 0) {
      stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)pKey)->taskId, succ ? "success" : "failed");
      continue;
    }

    stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", pTask->id.idStr,
           pTask->info.taskLevel, vgId, pTs->start, pTs->end, pTs->success ? "success" : "failed");
    streamMetaReleaseTask(pMeta, pTask);
  }
}
90✔
346

347
// Create the containers of the start info: the ready-task set, the failed-task set and the stage history list.
// Returns 0 on success, or terrno as soon as one of the allocations fails.
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);

  SHashObj* pReady = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
  pStartInfo->pReadyTaskSet = pReady;
  if (pReady == NULL) {
    return terrno;
  }

  SHashObj* pFailed = taosHashInit(4, hashFn, false, HASH_NO_LOCK);
  pStartInfo->pFailedTaskSet = pFailed;
  if (pFailed == NULL) {
    return terrno;
  }

  SArray* pStages = taosArrayInit(4, sizeof(SStartTaskStageInfo));
  pStartInfo->pStagesList = pStages;
  if (pStages == NULL) {
    return terrno;
  }

  return 0;
}
367

368
// Release the containers owned by the start info and zero its bookkeeping fields.
// The container pointers are set to NULL after being released, so that the structure, which outlives this call,
// does not keep dangling pointers around.
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
  taosHashCleanup(pStartInfo->pReadyTaskSet);
  pStartInfo->pReadyTaskSet = NULL;

  taosHashCleanup(pStartInfo->pFailedTaskSet);
  pStartInfo->pFailedTaskSet = NULL;

  taosArrayDestroy(pStartInfo->pStagesList);
  pStartInfo->pStagesList = NULL;

  pStartInfo->readyTs = 0;
  pStartInfo->elapsedTime = 0;
  pStartInfo->startTs = 0;
  pStartInfo->curStage = 0;
  pStartInfo->startAllTasks = 0;
  pStartInfo->tasksWillRestart = 0;
  pStartInfo->restartCount = 0;
}
11,669✔
380

381
// Start a single stream task by expanding it (if it has no backend yet) and feeding the init event to its state
// machine.
//
// Returns:
//   TSDB_CODE_STREAM_TASK_IVLD_STATUS  - the task cannot be acquired, or it is not in uninit status
//   TSDB_CODE_SUCCESS                  - the task is a fill-history task (not started here), or it was started
//   TSDB_CODE_STREAM_INTERNAL_ERROR    - the downstream is unexpectedly marked as ready
//   otherwise the error code of the expand callback or of the init event handling
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t      vgId = pMeta->vgId;
  SStreamTask* pTask = NULL;

  stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);

  int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
  if ((code != 0) || (pTask == NULL)) {
    stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
    int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId, true);
    if (ret) {
      stError("s-task:0x%x add check downstream failed, core:%s", taskId, tstrerror(ret));
    }

    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;

  // fill-history task can only be launched by related stream tasks.
  if (pTask->info.fillHistory == 1) {
    stError("s-task:0x%x vgId:%d fill-history task, not start here", taskId, vgId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  // the start all tasks procedure may happen to start the newly deployed stream task, and results in the
  // concurrent start this task by two threads.
  streamMutexLock(&pTask->lock);

  SStreamTaskState status = streamTaskGetStatus(pTask);
  bool             isUninit = (status.state == TASK_STATUS__UNINIT);
  if (!isUninit) {
    stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
  }

  streamMutexUnlock(&pTask->lock);

  if (!isUninit) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  if (pTask->status.downstreamReady != 0) {
    stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  streamMetaWLock(pMeta);

  // avoid initialization and destroy running concurrently.
  streamMutexLock(&pTask->lock);
  bool needExpand = (pTask->pBackend == NULL);
  if (needExpand) {
    code = pMeta->expandTaskFn(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  if (needExpand && (code != TSDB_CODE_SUCCESS)) {
    streamMetaAddFailedTaskSelf(pTask, pExecInfo->readyTs, false);
  }

  // concurrently start task may cause the latter started task be failed, and also failed to added into meta result.
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId,
              tstrerror(code));

      // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
      if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
        streamMetaAddFailedTaskSelf(pTask, pExecInfo->readyTs, false);
      }
    }
  }

  streamMetaWUnLock(pMeta);
  streamMetaReleaseTask(pMeta, pTask);

  return code;
}
466

467
// Stop every task registered in the meta while holding the read lock of the meta.
//
// Before the tasks are stopped, streamMetaSendMsgBeforeCloseTasks is invoked (send hb msg to mnode); it also
// provides the list of tasks to stop. A failure of that step is returned to the caller. Tasks that cannot be
// acquired any more are skipped, and a failure to stop an individual task is only logged: neither of them makes
// the whole procedure fail, so TSDB_CODE_SUCCESS is returned once the loop has completed.
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);

  if (num == 0) {
    stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
    streamMetaRUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampMs();

  // send hb msg to mnode before closing all tasks.
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    streamMetaRUnLock(pMeta);
    return code;
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;
    if (pTaskId == NULL) {
      continue;
    }

    // use a loop-local code: a task that is gone already must not turn the final result into a failure
    int32_t ret = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (ret != TSDB_CODE_SUCCESS) {
      continue;
    }

    ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);

  streamMetaRUnLock(pMeta);
  return TSDB_CODE_SUCCESS;
}
515

516
// Decide whether the task still has to send its request for the consensus checkpoint id.
//
// Returns 1, and marks the request as sent, only when the meta is in the START_MARK_REQ_CHKPID stage and the
// task is in TASK_CONSEN_CHKPT_REQ status. Returns 0 in every other case. `ts` is not used.
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pChkptInfo = &pTask->status.consenChkptInfo;
  int32_t           vgId = pTask->pMeta->vgId;

  if (pTask->pMeta->startInfo.curStage != START_MARK_REQ_CHKPID) {
    return 0;
  }

  if (pChkptInfo->status == TASK_CONSEN_CHKPT_REQ) {
    // mark the sending of req consensus checkpoint request.
    pChkptInfo->status = TASK_CONSEN_CHKPT_SEND;
    stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, vgId,
            pChkptInfo->statusTs);
    return 1;
  }

  if (pChkptInfo->status == 0) {
    stDebug("vgId:%d s-task:%s not need to set the req checkpointId, current stage:%d", vgId, pTask->id.idStr,
            pChkptInfo->status);
  } else {
    stWarn("vgId:%d, s-task:%s restart procedure expired, start stage:%d", vgId, pTask->id.idStr,
           pChkptInfo->status);
  }

  return 0;
}
538

539
// Mark the task as having received the consensus checkpoint id: store the transaction id and the timestamp, and
// switch the consensus-checkpoint status to TASK_CONSEN_CHKPT_RECV.
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) {
  SConsenChkptInfo* pChkptInfo = &pTask->status.consenChkptInfo;

  pChkptInfo->status = TASK_CONSEN_CHKPT_RECV;
  pChkptInfo->statusTs = ts;
  pChkptInfo->consenChkptTransId = transId;

  stInfo("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
}
154✔
547

548
// Mark the task as requiring a consensus checkpoint id: switch the status to TASK_CONSEN_CHKPT_REQ, store the
// timestamp and clear the transaction id. The previous transaction id is only kept for the debug log.
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pChkptInfo = &pTask->status.consenChkptInfo;
  int32_t           oldTransId = pChkptInfo->consenChkptTransId;

  pChkptInfo->consenChkptTransId = 0;
  pChkptInfo->statusTs = ts;
  pChkptInfo->status = TASK_CONSEN_CHKPT_REQ;

  stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64 ", task created ts:%" PRId64,
          pTask->id.idStr, oldTransId, ts, pTask->execInfo.created);
}
358✔
559

560
// Record a failed launch result for the given task and, when it has one, for its related fill-history task.
//
// `lock` selects whether this function takes the meta locks itself: the read lock while the task is looked up,
// and the locked variant of the result recording afterwards. With lock == false the NoLock variant is used.
// Returns TSDB_CODE_STREAM_TASK_NOT_EXIST if the task cannot be found, otherwise the code of the last recording.
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, bool lock) {
  int64_t      now = taosGetTimestampMs();
  STaskId      id = {.streamId = streamId, .taskId = taskId};
  SStreamTask* pTask = NULL;

  stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);

  if (lock) {
    streamMetaRLock(pMeta);
  }

  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    if (lock) {
      streamMetaRUnLock(pMeta);
    }

    stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
            streamId, taskId, pMeta->vgId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // copy everything needed from the task, so that it can be released before the results are recorded
  int64_t startTs = pTask->taskCheckInfo.startTs;
  bool    hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(pTask);
  STaskId hId = pTask->hTaskInfo.id;
  streamMetaReleaseTask(pMeta, pTask);

  if (!lock) {
    code = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, now, false);
    if (hasFillhistoryTask) {
      code = streamMetaAddTaskLaunchResultNoLock(pMeta, hId.streamId, hId.taskId, startTs, now, false);
    }
    return code;
  }

  streamMetaRUnLock(pMeta);

  // add the failed task info, along with the related fill-history task info into tasks list.
  code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
  if (hasFillhistoryTask) {
    code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
  }

  return code;
}
610

611
// Record a failed launch result for pTask itself and, when it has one, for its related fill-history task.
//
// The check timestamp of the task is used as the start timestamp and failedTs as the end timestamp of the
// result. `lock` selects the locked or the NoLock variant of the result recording. Failures to record the
// result are only logged.
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs, bool lock) {
  // keep the full 64-bit timestamp: the recording functions take an int64_t, a 32-bit local would truncate it
  int64_t startTs = pTask->execInfo.checkTs;
  int32_t code = 0;

  if (lock) {
    code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
  } else {
    code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs,
                                               false);
  }

  if (code) {
    stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
  }

  // automatically set the related fill-history task to be failed.
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    STaskId* pId = &pTask->hTaskInfo.id;

    if (lock) {
      code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
    } else {
      code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
    }

    if (code) {
      stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
    }
  }
}
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc