• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3526

10 Nov 2024 03:50AM UTC coverage: 60.225% (-0.6%) from 60.818%
#3526

push

travis-ci

web-flow
Merge pull request #28709 from taosdata/main

merge: from main to 3.0 branch

117031 of 249004 branches covered (47.0%)

Branch coverage included in aggregate %.

130 of 169 new or added lines in 23 files covered. (76.92%)

4149 existing lines in 176 files now uncovered.

197577 of 273386 relevant lines covered (72.27%)

5840219.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.61
/source/libs/stream/src/streamStartTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tmisce.h"
20
#include "tref.h"
21
#include "tsched.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25

26
// Launch result of one task. Stored as the value in the ready/failed task hash sets of STaskStartInfo
// (see streamMetaAddTaskLaunchResult).
typedef struct STaskInitTs {
  int64_t start;    // timestamp passed as the launch start (startTs)
  int64_t end;      // timestamp passed as the launch end (endTs)
  bool    success;  // true if the task was reported as ready, false if it failed
} STaskInitTs;
31

32
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
33
static bool    allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
34
static void    displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
35

36
// restore the checkpoint id by negotiating the latest consensus checkpoint id
37
// Start every task registered in pMeta.
//
// The task-id list is snapshotted by prepareBeforeStartTasks(), then processed in two passes:
//   1. expand the backend of fill-history tasks and of tasks that have a related fill-history task;
//   2. start each non-fill-history task: launch the related fill-history task if the downstream is already
//      ready, send the init event for tasks with a related fill-history task, and negotiate the consensus
//      checkpoint id for the remaining ones.
//
// Returns TSDB_CODE_SUCCESS if there are no tasks or if the preparation step fails (that failure is ignored
// on purpose); otherwise the code of the last operation executed in the second pass.
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t vgId = pMeta->vgId;
  int64_t now = taosGetTimestampMs();
  SArray* pTaskList = NULL;

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);

  if (numOfTasks == 0) {
    stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
  if (code != TSDB_CODE_SUCCESS) {
    // the task list may already have been duplicated before the failure happened, do not leak it
    if (pTaskList != NULL) {
      taosArrayDestroy(pTaskList);
    }
    return TSDB_CODE_SUCCESS;  // ignore the error and return directly
  }

  // broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
  numOfTasks = taosArrayGetSize(pTaskList);

  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
  // initialization, when the operation of check downstream tasks status is executed far quickly.
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if ((pTask == NULL) || (code != 0)) {
      stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId);
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
      if (ret) {
        stError("s-task:0x%x add check downstream failed, code:%s", pTaskId->taskId, tstrerror(ret));
      }
      continue;
    }

    if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
      code = pMeta->expandTaskFn(pTask);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
        streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
      }
    }

    streamMetaReleaseTask(pMeta, pTask);
  }

  // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if ((pTask == NULL) || (code != 0)) {
      stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
      if (ret) {
        stError("s-task:0x%x failed add check downstream failed, code:%s", pTaskId->taskId, tstrerror(ret));
      }

      continue;
    }

    STaskExecStatisInfo* pInfo = &pTask->execInfo;

    // fill-history task can only be launched by related stream tasks.
    if (pTask->info.fillHistory == 1) {
      stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    // ready now, start the related fill-history task
    if (pTask->status.downstreamReady == 1) {
      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
        stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
                pTask->id.idStr);
        code = streamLaunchFillHistoryTask(pTask);  // todo: how about retry launch fill-history task?
        if (code) {
          stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
        }
      }

      // the downstream is ready already, record this task as successfully launched
      code = streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
                                           true);
      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
      int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
      if (ret != TSDB_CODE_SUCCESS) {
        stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
        code = ret;

        // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
        if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
          streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
        }
      }

      streamMetaReleaseTask(pMeta, pTask);
      continue;
    }

    // negotiate the consensus checkpoint id for current task
    code = streamTaskSendNegotiateChkptIdMsg(pTask);

    // this task may has no checkpoint, but others tasks may generate checkpoint already?
    streamMetaReleaseTask(pMeta, pTask);
  }

  stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
  taosArrayDestroy(pTaskList);
  return code;
}
156

157
// Snapshot the task-id list and reset the start-all-tasks bookkeeping, all between streamMetaWLock() and
// streamMetaWUnLock().
//
// pMeta: stream meta whose task list is duplicated
// pList: [out] receives the duplicated task-id list; the caller destroys it
// now:   timestamp stored as pMeta->startInfo.startTs
//
// Returns TSDB_CODE_FAILED if pMeta->closeFlag is set, terrno if the list cannot be duplicated, otherwise
// the result of streamMetaResetTaskStatus(). The meta lock is released on every return path.
int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
  streamMetaWLock(pMeta);

  if (pMeta->closeFlag) {
    streamMetaWUnLock(pMeta);
    stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
    return TSDB_CODE_FAILED;
  }

  *pList = taosArrayDup(pMeta->pTaskList, NULL);
  if (*pList == NULL) {
    int32_t dupCode = terrno;  // capture the error before any other call may change terrno
    streamMetaWUnLock(pMeta);  // the lock must not stay held on this failure path
    return dupCode;
  }

  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  pMeta->startInfo.startTs = now;

  int32_t code = streamMetaResetTaskStatus(pMeta);
  streamMetaWUnLock(pMeta);

  return code;
}
180

181
// Clear both result hash sets and zero the restart counters/timestamps of pStartInfo, including the
// startAllTasks sentinel. startTs is not modified here. vgId is only used for logging.
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
  taosHashClear(pStartInfo->pReadyTaskSet);
  taosHashClear(pStartInfo->pFailedTaskSet);
  pStartInfo->tasksWillRestart = 0;
  pStartInfo->readyTs = 0;
  pStartInfo->elapsedTime = 0;

  // reset the sentinel flag value to be 0
  pStartInfo->startAllTasks = 0;
  stDebug("vgId:%d clear start-all-task info", vgId);
}
1,041✔
192

193
// Record the launch result of one task while a start-all-tasks procedure is running.
//
// streamId/taskId: identify the task
// startTs/endTs:   launch start and end timestamps, stored in the result entry
// ready:           true puts the entry in the ready set, false in the failed set
//
// Nothing is recorded (and 0 is returned) when the task does not exist in pMeta or when
// startInfo.startAllTasks is not 1. Once every task in pMeta->pTaskList has an entry in one of the two
// sets, the summary is logged, the start info is reset, and startInfo.completeFn is invoked after the
// meta lock has been released.
//
// Returns 0, the error from taosHashPut(), or the result of completeFn().
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                      int64_t endTs, bool ready) {
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  STaskId         id = {.streamId = streamId, .taskId = taskId};
  int32_t         vgId = pMeta->vgId;
  bool            allRsp = true;
  SStreamTask*    p = NULL;

  streamMetaWLock(pMeta);
  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &p);
  if (code != 0) {  // task does not exist in current vnode, not record the complete info
    stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  // the task was acquired only to verify that it exists
  streamMetaReleaseTask(pMeta, p);

  if (pStartInfo->startAllTasks != 1) {
    int64_t el = endTs - startTs;
    stDebug(
        "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
        "time:%" PRId64 "ms",
        vgId, taskId, ready, el);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
  SHashObj*   pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
  code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
  if (code) {
    if (code == TSDB_CODE_DUP_KEY) {
      stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
              " already exist start results in meta start task result hashmap",
              vgId, id.taskId);
    } else {
      stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
    }
    streamMetaWUnLock(pMeta);
    return code;
  }

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
  int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);

  allRsp = allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal);
  if (allRsp) {
    pStartInfo->readyTs = taosGetTimestampMs();
    pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;

    stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
                ", readyTs:%" PRId64 " total elapsed time:%.2fs",
            vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
            pStartInfo->elapsedTime / 1000.0);

    // print the initialization elapsed time and info
    displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
    displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
    streamMetaResetStartInfo(pStartInfo, vgId);
    streamMetaWUnLock(pMeta);

    // the completion callback runs without the meta lock held
    code = pStartInfo->completeFn(pMeta);
  } else {
    streamMetaWUnLock(pMeta);
    stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
            numOfRecv, numOfTotal);
  }

  return code;
}
264

265
// check all existed tasks are received rsp
266
// Return true if each of the first numOfTotal entries of pMeta->pTaskList has a result in either the
// ready set or the failed set of pStartInfo. Entries that taosArrayGet() cannot return are skipped.
bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
  for (int32_t i = 0; i < numOfTotal; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
    if (pTaskId == NULL) {
      continue;
    }

    STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
    void*   px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
    if (px == NULL) {
      px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
      if (px == NULL) {
        // no result recorded for this task in either set yet
        return false;
      }
    }
  }

  return true;
}
285

286
// Log the number of entries in pTaskSet and, for each entry, the recorded init timestamps and result.
// Tasks are looked up with streamMetaAcquireTaskUnsafe(); entries whose task cannot be acquired are
// logged as dropped. succ only selects the "success"/"failed" wording.
void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
  int32_t vgId = pMeta->vgId;
  void*   pIter = NULL;
  size_t  keyLen = 0;

  stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
         succ ? "success" : "failed");

  while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
    STaskInitTs* pInfo = pIter;
    void*        key = taosHashGetKey(pIter, &keyLen);
    SStreamTask* pTask = NULL;
    int32_t      code = streamMetaAcquireTaskUnsafe(pMeta, key, &pTask);
    if (code == 0) {
      stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", pTask->id.idStr,
             pTask->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
      streamMetaReleaseTask(pMeta, pTask);
    } else {
      stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
    }
  }
}
2,082✔
308

309
// Allocate the ready and failed task hash sets of pStartInfo.
//
// Returns 0 on success, or terrno if either allocation fails. If the failed set cannot be allocated, the
// ready set created just before is cleaned up and its pointer reset to NULL, so nothing stays allocated.
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);

  pStartInfo->pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
  if (pStartInfo->pReadyTaskSet == NULL) {
    return terrno;
  }

  pStartInfo->pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
  if (pStartInfo->pFailedTaskSet == NULL) {
    int32_t code = terrno;  // capture the error before the cleanup call below
    taosHashCleanup(pStartInfo->pReadyTaskSet);
    pStartInfo->pReadyTaskSet = NULL;
    return code;
  }

  return 0;
}
324

325
// Release both task hash sets of pStartInfo and zero all of its counters and timestamps. The set pointers
// are reset to NULL so that no dangling pointer is left behind.
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
  taosHashCleanup(pStartInfo->pReadyTaskSet);
  pStartInfo->pReadyTaskSet = NULL;

  taosHashCleanup(pStartInfo->pFailedTaskSet);
  pStartInfo->pFailedTaskSet = NULL;

  pStartInfo->readyTs = 0;
  pStartInfo->elapsedTime = 0;
  pStartInfo->startTs = 0;
  pStartInfo->startAllTasks = 0;
  pStartInfo->tasksWillRestart = 0;
  pStartInfo->restartCount = 0;
}
12,277✔
335

336
// Start a single stream task: expand its backend if it has none yet, then send TASK_EVENT_INIT.
//
// Returns:
//   TSDB_CODE_STREAM_TASK_IVLD_STATUS  the task cannot be acquired, or is not in the uninit status
//   TSDB_CODE_SUCCESS                  the task is a fill-history task (not started here), or it was started
//   TSDB_CODE_STREAM_INTERNAL_ERROR    the downstream is unexpectedly marked ready
//   otherwise                          the error from expandTaskFn() or streamTaskHandleEvent()
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t      code = 0;
  int32_t      vgId = pMeta->vgId;
  SStreamTask* pTask = NULL;
  bool         continueExec = true;

  stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);

  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
    int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId);
    if (ret) {
      stError("s-task:0x%x add check downstream failed, code:%s", taskId, tstrerror(ret));
    }

    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // fill-history task can only be launched by related stream tasks.
  STaskExecStatisInfo* pInfo = &pTask->execInfo;
  if (pTask->info.fillHistory == 1) {
    stError("s-task:0x%x vgId:%d fill-history task, not start here", taskId, vgId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  // the start all tasks procedure may happen to start the newly deployed stream task, and results in the
  // concurrently start this task by two threads.
  streamMutexLock(&pTask->lock);

  SStreamTaskState status = streamTaskGetStatus(pTask);
  if (status.state != TASK_STATUS__UNINIT) {
    stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
    continueExec = false;
  } else {
    continueExec = true;
  }
  streamMutexUnlock(&pTask->lock);

  if (!continueExec) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  if (pTask->status.downstreamReady != 0) {
    stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  // avoid initialization and destroy running concurrently.
  streamMutexLock(&pTask->lock);
  if (pTask->pBackend == NULL) {
    code = pMeta->expandTaskFn(pTask);
    streamMutexUnlock(&pTask->lock);

    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:0x%x vgId:%d failed to expand stream backend", taskId, vgId);
      streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
    }
  } else {
    streamMutexUnlock(&pTask->lock);
  }

  // concurrently start task may cause the latter started task be failed, and also failed to added into meta result.
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId,
              tstrerror(code));

      // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
      if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
        streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
      }
    }
  }

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
417

418
// Stop every task in pMeta while holding the meta read lock.
//
// streamMetaSendMsgBeforeCloseTasks() is called first and supplies the list of task ids to stop. Tasks
// that cannot be acquired are skipped, and a failure of streamTaskStop() is only logged.
//
// Returns TSDB_CODE_SUCCESS when there are no tasks, the error of streamMetaSendMsgBeforeCloseTasks() if
// it fails, otherwise the code of the last task acquisition. The read lock is released on every return path.
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);

  if (num == 0) {
    stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
    streamMetaRUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampMs();

  // send hb msg to mnode before closing all tasks.
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // NOTE(review): whether pTaskList can be allocated on this failure path is not visible here -- confirm
    streamMetaRUnLock(pMeta);  // the read lock must not stay held on this failure path
    return code;
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    if (pTaskId == NULL) {
      continue;
    }

    SStreamTask* pTask = NULL;
    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);

  streamMetaRUnLock(pMeta);
  return code;
}
466

467
// Decide whether a consensus-checkpointId request for pTask should be flagged at time ts.
//
// Returns 1 when the status is TASK_CONSEN_CHKPT_REQ (the status then becomes TASK_CONSEN_CHKPT_SEND), or
// when the status is TASK_CONSEN_CHKPT_SEND and more than 60 seconds have passed since statusTs. In both
// cases statusTs is updated to ts. Returns 0 otherwise.
//
// ts and statusTs are treated as milliseconds (the difference is divided by 1000 to obtain seconds).
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo;

  int32_t vgId = pTask->pMeta->vgId;
  if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) {
    // mark the sending of req consensus checkpoint request.
    pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND;
    pConChkptInfo->statusTs = ts;
    stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr,
            vgId, pConChkptInfo->statusTs);
    return 1;
  } else {
    int32_t el = (ts - pConChkptInfo->statusTs) / 1000;

    // not recv consensus-checkpoint rsp for 60sec, send it again in hb to mnode
    if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) {
      pConChkptInfo->statusTs = ts;

      stWarn(
          "s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64,
          pTask->id.idStr, vgId, el, pConChkptInfo->statusTs);
      return 1;
    }
  }

  return 0;
}
494

495
// Mark the consensus-checkpointId info of pTask as received: store transId, set the status to
// TASK_CONSEN_CHKPT_RECV and set statusTs to ts.
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) {
  SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
  pInfo->consenChkptTransId = transId;
  pInfo->status = TASK_CONSEN_CHKPT_RECV;
  pInfo->statusTs = ts;

  stInfo("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
}
10✔
503

504
// Flag pTask as requiring a consensus checkpointId: set the status to TASK_CONSEN_CHKPT_REQ, set statusTs
// to ts and clear the stored transId. The previous transId is only logged.
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
  int32_t           prevTrans = pInfo->consenChkptTransId;

  pInfo->status = TASK_CONSEN_CHKPT_REQ;
  pInfo->statusTs = ts;
  pInfo->consenChkptTransId = 0;

  stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts);
}
10✔
514

515
// Record the task identified by streamId/taskId as failed to start and, if it has a related fill-history
// task, record that task as failed too.
//
// The task is looked up under the meta read lock; the lock is released before the results are recorded,
// because streamMetaAddTaskLaunchResult() acquires the meta write lock itself.
//
// Returns TSDB_CODE_STREAM_TASK_NOT_EXIST if the task cannot be found. Otherwise returns the first error
// reported while recording the results, so a failure for the main task is not masked by a later success
// for the fill-history task.
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int64_t      now = taosGetTimestampMs();
  int64_t      startTs = 0;
  bool         hasFillhistoryTask = false;
  STaskId      hId = {0};
  STaskId      id = {.streamId = streamId, .taskId = taskId};
  SStreamTask* pTask = NULL;

  stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);

  streamMetaRLock(pMeta);

  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    // copy what is needed from the task, so it can be released before the lock is dropped
    startTs = pTask->taskCheckInfo.startTs;
    hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(pTask);
    hId = pTask->hTaskInfo.id;
    streamMetaReleaseTask(pMeta, pTask);

    streamMetaRUnLock(pMeta);

    // add the failed task info, along with the related fill-history task info into tasks list.
    code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
    if (hasFillhistoryTask) {
      int32_t hCode = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
      if (code == TSDB_CODE_SUCCESS) {
        code = hCode;
      }
    }
  } else {
    streamMetaRUnLock(pMeta);

    stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
            streamId, taskId, pMeta->vgId);
    code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  return code;
}
552

UNCOV
553
// Record pTask as failed to start, using pTask->execInfo.checkTs as the start timestamp and failedTs as
// the end timestamp. If pTask has a related fill-history task, that task is recorded as failed as well.
// Errors from recording are only logged.
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
  // keep the full 64-bit timestamp: it is passed on as an int64_t, an int32_t would truncate it
  int64_t startTs = pTask->execInfo.checkTs;
  int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
  if (code) {
    stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
  }

  // automatically set the related fill-history task to be failed.
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    STaskId* pId = &pTask->hTaskInfo.id;
    code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
    if (code) {
      stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
    }
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc