• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.56
/source/libs/stream/src/streamTaskSm.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17
#include "streamsm.h"
18
#include "tmisce.h"
19
#include "tstream.h"
20
#include "ttimer.h"
21
#include "wal.h"
22

23
static int32_t initRes = 0;
24

25
// Map an event id to its printable name; `_ev` is used as a direct index into StreamTaskEventList.
#define GET_EVT_NAME(_ev)  (StreamTaskEventList[(_ev)].name)

// Evaluate `_exec` once; when it yields NULL, record TSDB_CODE_OUT_OF_MEMORY in initRes and return from the
// enclosing void function. There is deliberately no ';' after `while (0)`: the call site supplies it, so the
// macro expands to exactly one statement and stays safe inside an unbraced if/else.
#define CHECK_RET_VAL(_exec)             \
  do {                                   \
    void* p = (_exec);                   \
    if (p == NULL) {                     \
      initRes = TSDB_CODE_OUT_OF_MEMORY; \
      return;                            \
    }                                    \
  } while (0)
34

35
SStreamTaskState StreamTaskStatusList[9] = {
36
    {.state = TASK_STATUS__READY, .name = "ready"},
37
    {.state = TASK_STATUS__DROPPING, .name = "dropped"},
38
    {.state = TASK_STATUS__UNINIT, .name = "uninit"},
39
    {.state = TASK_STATUS__STOP, .name = "stop"},
40
    {.state = TASK_STATUS__SCAN_HISTORY, .name = "scan-history"},
41
    {.state = TASK_STATUS__HALT, .name = "halt"},
42
    {.state = TASK_STATUS__PAUSE, .name = "paused"},
43
    {.state = TASK_STATUS__CK, .name = "checkpoint"},
44
};
45

46
typedef struct SStreamEventInfo {
47
  EStreamTaskEvent event;
48
  const char*      name;
49
} SStreamEventInfo;
50

51
SStreamEventInfo StreamTaskEventList[12] = {
52
    {.event = 0, .name = ""},  // dummy event, place holder
53
    {.event = TASK_EVENT_INIT, .name = "initialize"},
54
    {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-init"},
55
    {.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"},
56
    {.event = TASK_EVENT_STOP, .name = "stopping"},
57
    {.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"},
58
    {.event = TASK_EVENT_CHECKPOINT_DONE, .name = "checkpoint-done"},
59
    {.event = TASK_EVENT_PAUSE, .name = "pausing"},
60
    {.event = TASK_EVENT_RESUME, .name = "resuming"},
61
    {.event = TASK_EVENT_HALT, .name = "halting"},
62
    {.event = TASK_EVENT_DROPPING, .name = "dropping"},
63
};
64

65
static TdThreadOnce streamTaskStateMachineInit = PTHREAD_ONCE_INIT;
66
static SArray*      streamTaskSMTrans = NULL;
67

68
static int32_t streamTaskInitStatus(SStreamTask* pTask);
69
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
70
static int32_t initStateTransferTable();
71
static void    doInitStateTransferTable(void);
72

73
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
74
                                            __state_trans_fn fn, __state_trans_succ_fn succFn,
75
                                            SFutureHandleEventInfo* pEventInfo);
76

77
// No-op transition hook: ignores the task and always reports success. Used by createStateTransform() as the
// stand-in when a transition has no action or no success handler.
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) {
  return TSDB_CODE_SUCCESS;
}
63,382✔
78

79
// Append pEvtInfo to the waiting-event list of the task's state machine, because the event cannot be handled
// in the task's current status.
// Returns TSDB_CODE_SUCCESS on success, or terrno when the push into the list fails.
static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) {
  const char* idStr = pTask->id.idStr;
  char*       curName = streamTaskGetStatus(pTask).name;

  stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", idStr, curName,
          GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);

  SArray* pWaiting = pTask->status.pSM->pWaitingEventList;

  if (taosArrayPush(pWaiting, pEvtInfo) == NULL) {
    stError("s-task:%s failed to add into waiting list, total waiting events:%d, code: out of memory", idStr,
            (int32_t)taosArrayGetSize(pWaiting));
    return terrno;
  }

  stDebug("s-task:%s add into waiting list, total waiting events:%d", idStr, (int32_t)taosArrayGetSize(pWaiting));
  return TSDB_CODE_SUCCESS;
}
98

99
// Action registered for the INIT and INIT_SCANHIST transitions: stamps the time at which the downstream check
// starts and sends the check message for this task. Always returns 0.
int32_t streamTaskInitStatus(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  pTask->execInfo.checkTs = taosGetTimestampMs();
  stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, idStr,
          pTask->execInfo.checkTs);

  streamTaskSendCheckMsg(pTask);

  return 0;
}
107

108
// Success handler of the HALT transitions: for a source-level task, remember the WAL reader's current version
// in hTaskInfo.haltVer, falling back to the lower bound of the task's data range when the reader reports -1.
// Tasks of any other level are left untouched. Always returns TSDB_CODE_SUCCESS.
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    // only reported, the version is still recorded below
    stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr);
  }

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
  if (pTask->hTaskInfo.haltVer != -1) {
    return TSDB_CODE_SUCCESS;
  }

  pTask->hTaskInfo.haltVer = pTask->dataRange.range.minVer;
  return TSDB_CODE_SUCCESS;
}
122

123
// todo check rsp code for handle Event:TASK_EVENT_SCANHIST_DONE
124
// Decide whether handling `event` while in `state` counts as an invalid transfer that deserves an error log.
// Only consulted after the transition table lookup has already failed.
// Returns true for an invalid combination, false otherwise (including events not listed below).
static bool isInvalidStateTransfer(ETaskStatus state, const EStreamTaskEvent event) {
  switch (event) {
    case TASK_EVENT_INIT:
    case TASK_EVENT_INIT_SCANHIST:
      return state != TASK_STATUS__UNINIT;

    case TASK_EVENT_SCANHIST_DONE:
      return state != TASK_STATUS__SCAN_HISTORY;

    case TASK_EVENT_GEN_CHECKPOINT:
      return state != TASK_STATUS__READY;

    case TASK_EVENT_CHECKPOINT_DONE:
      return state != TASK_STATUS__CK;

    case TASK_EVENT_RESUME:
      // todo refactor later
      return true;

    case TASK_EVENT_HALT:
      return state == TASK_STATUS__DROPPING || state == TASK_STATUS__UNINIT || state == TASK_STATUS__STOP ||
             state == TASK_STATUS__SCAN_HISTORY;

    default:
      return false;
  }
}
155

156
// todo optimize the perf of find the trans objs by using hash table
157
// Linear search of the global transition table for the entry whose source status is `state` and whose
// triggering event is `event`; the first match wins.
// Returns the table entry, or NULL when there is none (an error is logged if the combination is invalid).
static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStreamTaskEvent event) {
  const int32_t total = taosArrayGetSize(streamTaskSMTrans);

  int32_t idx = 0;
  while (idx < total) {
    STaskStateTrans* pCand = taosArrayGet(streamTaskSMTrans, idx++);
    if (pCand != NULL && pCand->state.state == state && pCand->event == event) {
      return pCand;
    }
  }

  if (isInvalidStateTransfer(state, event)) {
    stError("invalid state transfer %d, handle event:%s", state, GET_EVT_NAME(event));
  }

  return NULL;
}
176

UNCOV
177
// Try to run the event at the head of the waiting list after the state machine has changed status.
//
// Locking: the caller must hold pTask->lock on entry. This function releases the lock on every path before it
// returns; the transition action and the success handler of the deferred event run without the lock.
//
// pEventName is used for logging only (the event whose handling has just completed).
//
// Returns:
//   - TSDB_CODE_SUCCESS when the head event still cannot run in the current status (it stays queued),
//   - terrno when the head entry cannot be fetched,
//   - TSDB_CODE_STREAM_INVALID_STATETRANS when no transition exists for the deferred event,
//   - otherwise the result of the transition action, or of streamTaskOnHandleEventSuccess() when the
//     transition auto-invokes its end function.
static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, SStreamTask* pTask) {
  int32_t code = TSDB_CODE_SUCCESS;
  int64_t el = (taosGetTimestampMs() - pSM->startTs);
  stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
          pEventName, el, pSM->prev.state.name, pSM->current.name);

  SFutureHandleEventInfo* pHead = taosArrayGet(pSM->pWaitingEventList, 0);
  if (pHead == NULL) {
    code = terrno;
    streamMutexUnlock(&pTask->lock);  // keep the "lock is released on return" contract on this path too
    return code;
  }

  // Work on a private copy: once the entry is removed or the lock is dropped, the array slot may be
  // overwritten, so pHead must not be dereferenced afterwards.
  SFutureHandleEventInfo evtInfo = *pHead;
  const char*            curName = pSM->current.name;

  if (pSM->current.state != evtInfo.status) {
    streamMutexUnlock(&pTask->lock);
    stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
            curName, GET_EVT_NAME(evtInfo.event), StreamTaskStatusList[evtInfo.status].name);
    return code;
  }

  // OK, let's handle the waiting event, since the task has reached the required status now
  stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr, GET_EVT_NAME(evtInfo.event),
          curName);

  // remove exactly the entry that was examined above (index 0)
  taosArrayRemove(pSM->pWaitingEventList, 0);

  STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, evtInfo.event);
  if (pNextTrans == NULL) {
    streamMutexUnlock(&pTask->lock);
    stError("s-task:%s no state transfer found for waiting event:%s, state:%s", pTask->id.idStr,
            GET_EVT_NAME(evtInfo.event), curName);
    return TSDB_CODE_STREAM_INVALID_STATETRANS;
  }

  pSM->pActiveTrans = pNextTrans;
  pSM->startTs = taosGetTimestampMs();
  streamMutexUnlock(&pTask->lock);

  code = pNextTrans->pAction(pSM->pTask);
  if (pNextTrans->autoInvokeEndFn) {
    return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event, evtInfo.callBackFn, evtInfo.pParam);
  }

  return code;
}
217

218
// Remove the first entry for `event` from the waiting list of the task's state machine, if there is one.
// Whether or not an entry was found is only logged; the function always returns TSDB_CODE_SUCCESS.
static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) {
  SStreamTaskSM* pSM = pTask->status.pSM;
  int32_t        num = taosArrayGetSize(pSM->pWaitingEventList);

  for (int32_t i = 0; i < num; ++i) {
    SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
    if (pInfo == NULL || pInfo->event != event) {
      continue;
    }

    taosArrayRemove(pSM->pWaitingEventList, i);

    // pInfo points into the array and no longer refers to the removed entry, so log the name through
    // `event`, which equals the removed entry's event id.
    stDebug("s-task:%s %s event in waiting list not be handled yet, remove it from waiting list, remaining events:%d",
            pTask->id.idStr, GET_EVT_NAME(event), num - 1);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, GET_EVT_NAME(event));
  return TSDB_CODE_SUCCESS;
}
244

245
// Resume a paused task by swapping the state machine back to the status it had before the pause.
//
// When the task is in PAUSE with no active transition, current/prev are swapped, prev.evt is cleared and the
// start timestamp is refreshed; if events are waiting, the head of the waiting list is then processed and its
// result is returned. Otherwise a still-pending PAUSE event is removed from the waiting list and
// TSDB_CODE_FAILED is returned, because there is no paused status to restore.
int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
  SStreamTaskSM* pSM = pTask->status.pSM;
  int32_t        code = 0;

  streamMutexLock(&pTask->lock);

  if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) {
    SStreamTaskState state = pSM->current;
    pSM->current = pSM->prev.state;

    pSM->prev.state = state;
    pSM->prev.evt = 0;

    pSM->startTs = taosGetTimestampMs();

    if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
      stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr,
              pSM->prev.state.name, pSM->current.name);

      // doHandleWaitingEvent() is entered with pTask->lock held and releases it itself when it handles or
      // defers the event, so the lock must not be released a second time here.
      return doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
    }

    stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
  } else {
    code = removeEventInWaitingList(pTask, TASK_EVENT_PAUSE);  // ignore the return value,
    if (code) {
      stError("s-task:%s failed to remove event in waiting list, code:%s", pTask->id.idStr, tstrerror(code));
    }

    code = TSDB_CODE_FAILED;  // failed to restore the status, since it is not in pause status
  }

  streamMutexUnlock(&pTask->lock);
  return code;
}
279

280
// Allocate the state machine of a stream task and attach it to pTask->status.pSM.
//
// Makes sure the shared transition table has been built, then creates a zeroed SStreamTaskSM with an empty
// waiting-event list and the UNINIT status as starting point.
// Returns TSDB_CODE_SUCCESS, the error from the table initialisation, or the terrno of a failed allocation
// (pTask->status.pSM is left untouched on failure).
int32_t streamCreateStateMachine(SStreamTask* pTask) {
  int32_t code = initStateTransferTable();
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  const char* id = pTask->id.idStr;

  SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM));
  if (pSM == NULL) {
    code = terrno;  // capture right away so the logged and the returned code are the same value
    stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
            tstrerror(code));
    return code;
  }

  pSM->pTask = pTask;
  pSM->pWaitingEventList = taosArrayInit(4, sizeof(SFutureHandleEventInfo));
  if (pSM->pWaitingEventList == NULL) {
    code = terrno;  // capture before the free/log calls below get a chance to modify terrno
    taosMemoryFree(pSM);
    stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
            tstrerror(code));
    return code;
  }

  // set the initial state for the state-machine of stream task
  pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
  pSM->startTs = taosGetTimestampMs();

  pTask->status.pSM = pSM;
  return TSDB_CODE_SUCCESS;
}
311

312
// Release a state machine together with its waiting-event list. A NULL pSM is accepted and ignored.
void streamDestroyStateMachine(SStreamTaskSM* pSM) {
  if (pSM != NULL) {
    taosArrayDestroy(pSM->pWaitingEventList);
    taosMemoryFree(pSM);
  }
}
320

321
// Execute transition pTrans for `event` synchronously.
//
// Locking: entered with pTask->lock held; the lock is released before any action runs and before returning.
//
// If the transition carries an attached (deferred) event, that event is queued in the waiting list and this
// function polls every 100ms until the task has reached pTrans->next through pTrans->event, or until the task
// is found in DROPPING/STOP/UNINIT. Otherwise the transition becomes the active one, its action is run, and,
// for auto-invoke transitions, streamTaskOnHandleEventSuccess() completes it.
//
// Returns the action's error code if it failed, else the completion result; TSDB_CODE_SUCCESS once a deferred
// event has been handled; TSDB_CODE_STREAM_INVALID_STATETRANS if the task was dropped/stopped/reset while
// waiting; or the error of attachWaitedEvent().
static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans) {
  SStreamTask* pTask = pSM->pTask;
  const char*  id = pTask->id.idStr;
  int32_t      code = 0;

  if (pTrans->attachEvent.event != 0) {
    code = attachWaitedEvent(pTask, &pTrans->attachEvent);
    streamMutexUnlock(&pTask->lock);
    if (code) {
      return code;
    }

    while (1) {
      // wait for the task to be here; snapshot both the status and the last handled event under the lock,
      // since both are updated together by the thread that completes the transition
      streamMutexLock(&pTask->lock);
      ETaskStatus      s = streamTaskGetStatus(pTask).state;
      EStreamTaskEvent prevEvt = pSM->prev.evt;
      streamMutexUnlock(&pTask->lock);

      if ((s == pTrans->next.state) && (prevEvt == pTrans->event)) {  // this event has been handled already
        stDebug("s-task:%s attached event:%s handled", id, GET_EVT_NAME(pTrans->event));
        return TSDB_CODE_SUCCESS;
      } else if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT) {
        stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", id, GET_EVT_NAME(event));
        taosMsleep(100);
      } else {
        stDebug("s-task:%s is dropped or stopped already, not wait.", id);
        return TSDB_CODE_STREAM_INVALID_STATETRANS;
      }
    }

  } else {  // override current active trans
    pSM->pActiveTrans = pTrans;
    pSM->startTs = taosGetTimestampMs();
    streamMutexUnlock(&pTask->lock);

    code = pTrans->pAction(pTask);

    if (pTrans->autoInvokeEndFn) {
      int32_t c = streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL);
      if (code == TSDB_CODE_SUCCESS) {  // an action failure takes precedence over the completion result
        code = c;
      }
    }
  }

  return code;
}
368

369
// Execute transition pTrans for `event` without waiting for a deferred event to complete.
//
// Locking: entered with pTask->lock held; the lock is released before any action runs and before returning.
//
// A transition with an attached event is only queued (together with callbackFn/param) and the queueing result
// is returned. Otherwise the transition becomes the active one and its action is run; for auto-invoke
// transitions streamTaskOnHandleEventSuccess() is called with callbackFn/param, and its result is returned
// unless the action itself failed.
static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans,
                                  __state_trans_user_fn callbackFn, void* param) {
  SStreamTask* pTask = pSM->pTask;

  if (pTrans->attachEvent.event != 0) {
    SFutureHandleEventInfo deferred = pTrans->attachEvent;
    deferred.pParam = param;
    deferred.callBackFn = callbackFn;

    int32_t attachCode = attachWaitedEvent(pTask, &deferred);
    streamMutexUnlock(&pTask->lock);
    return attachCode;
  }

  // override current active trans
  pSM->pActiveTrans = pTrans;
  pSM->startTs = taosGetTimestampMs();
  streamMutexUnlock(&pTask->lock);

  // todo handle error code;
  int32_t actionCode = pTrans->pAction(pTask);
  if (!pTrans->autoInvokeEndFn) {
    return actionCode;
  }

  int32_t endCode = streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param);
  return (actionCode == TSDB_CODE_SUCCESS) ? endCode : actionCode;
}
398

399
// Handle `event` on the task's state machine, blocking until it can be started.
//
// While another thread is running an auto-invoke transition, this call sleeps 100ms and retries. A pending
// transition that is not auto-invoke is overridden by the new event, except that a second INIT arriving
// during an INIT is rejected.
//
// Returns TSDB_CODE_STREAM_INVALID_STATETRANS when the current status has no transition for `event`,
// TSDB_CODE_STREAM_CONFLICT_EVENT for the concurrent INIT case, otherwise the result of doHandleEvent().
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
  SStreamTask* pTask = pSM->pTask;

  while (1) {
    streamMutexLock(&pTask->lock);

    if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
      // snapshot everything the log line needs while the lock still protects the state machine
      EStreamTaskEvent evt = pSM->pActiveTrans->event;
      const char*      status = pSM->current.name;
      streamMutexUnlock(&pTask->lock);

      stDebug("s-task:%s status:%s handling event:%s by another thread, wait for 100ms and check if completed",
              pTask->id.idStr, status, GET_EVT_NAME(evt));
      taosMsleep(100);
      continue;
    }

    // no active event trans exists, handle this event directly
    STaskStateTrans* pTrans = streamTaskFindTransform(pSM->current.state, event);
    if (pTrans == NULL) {
      stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event),
              pSM->current.name);
      streamMutexUnlock(&pTask->lock);
      return TSDB_CODE_STREAM_INVALID_STATETRANS;
    }

    if (pSM->pActiveTrans != NULL) {
      // not allowed concurrently initialization
      if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) {
        streamMutexUnlock(&pTask->lock);
        stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr);
        return TSDB_CODE_STREAM_CONFLICT_EVENT;
      }

      // currently in some state transfer procedure, not auto invoke transfer, abort it
      stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
              pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
              pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
    }

    // doHandleEvent() is entered with the lock held and releases it
    return doHandleEvent(pSM, event, pTrans);
  }
}
445

446
// Handle `event` on the task's state machine; callbackFn(pTask, param) is passed on so it can be invoked once
// the event has been handled (possibly later, if the event has to be deferred).
//
// While another thread is running an auto-invoke transition, this call sleeps 100ms and retries. A pending
// transition that is not auto-invoke is overridden by the new event.
//
// Returns TSDB_CODE_STREAM_INVALID_STATETRANS when the current status has no transition for `event`,
// otherwise the result of doHandleEventAsync().
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn,
                                   void* param) {
  SStreamTask* pTask = pSM->pTask;

  while (1) {
    streamMutexLock(&pTask->lock);

    if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
      // snapshot everything the log line needs while the lock still protects the state machine
      EStreamTaskEvent evt = pSM->pActiveTrans->event;
      const char*      status = pSM->current.name;
      streamMutexUnlock(&pTask->lock);

      stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
              pTask->id.idStr, status, GET_EVT_NAME(evt));
      taosMsleep(100);
      continue;
    }

    // no active event trans exists, handle this event directly
    STaskStateTrans* pTrans = streamTaskFindTransform(pSM->current.state, event);
    if (pTrans == NULL) {
      stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name);
      streamMutexUnlock(&pTask->lock);
      return TSDB_CODE_STREAM_INVALID_STATETRANS;
    }

    if (pSM->pActiveTrans != NULL) {
      // currently in some state transfer procedure, not auto invoke transfer, quit from this procedure
      stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
              pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
              pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
    }

    // doHandleEventAsync() is entered with the lock held and releases it
    return doHandleEventAsync(pSM, event, pTrans, callbackFn, param);
  }
}
484

485
// Remember where the state machine is coming from: the current status becomes prev.state and the event of the
// active transition becomes prev.evt. pSM->pActiveTrans must not be NULL.
static void keepPrevInfo(SStreamTaskSM* pSM) {
  pSM->prev.evt = pSM->pActiveTrans->event;
  pSM->prev.state = pSM->current;
}
47,827✔
491

492
// Complete the active transition for `event`: move the state machine to the transition's target status, run
// the transition's success action, invoke the optional user callback, and finally try to process the head of
// the waiting-event list.
//
// The task lock is taken internally; it is released while callbackFn runs and before returning.
//
// Returns TSDB_CODE_STREAM_INVALID_STATETRANS when there is no active transition or the active transition was
// started by a different event; otherwise the result of the success action, or of the waiting-event handling
// when the waiting list is not empty. A failing callbackFn is logged but does not change the return value.
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn,
                                       void* param) {
  SStreamTask* pTask = pSM->pTask;
  const char*  id = pTask->id.idStr;
  int32_t      code = 0;

  // do update the task status
  streamMutexLock(&pTask->lock);

  STaskStateTrans* pTrans = pSM->pActiveTrans;
  if (pTrans == NULL) {
    ETaskStatus s = pSM->current.state;
    // when trying to finish current event successfully, another event with high priorities, such as dropping/stop, has
    // interrupted this procedure, and changed the status after freeing the activeTrans, resulting in the failure of
    // processing of current event.
    if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT &&
        s != TASK_STATUS__READY) {
      stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,
              GET_EVT_NAME(pSM->prev.evt));
    }

    // the pSM->prev.evt may be 0, so print string is not appropriate.
    stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event),
            pSM->current.name, GET_EVT_NAME(pSM->prev.evt));

    streamMutexUnlock(&pTask->lock);
    return TSDB_CODE_STREAM_INVALID_STATETRANS;
  }

  if (pTrans->event != event) {
    stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", id, GET_EVT_NAME(event),
           pSM->current.name, GET_EVT_NAME(pTrans->event));
    streamMutexUnlock(&pTask->lock);
    return TSDB_CODE_STREAM_INVALID_STATETRANS;
  }

  // repeat pause will not overwrite the previous pause state
  if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) {
    keepPrevInfo(pSM);
    pSM->current = pTrans->next;
  } else {
    stDebug("s-task:%s repeat pause evt recv, not update prev status", id);
  }

  pSM->pActiveTrans = NULL;
  // todo remove it
  // todo: handle the error code
  // on success callback, add into lock if necessary, or maybe we should add an option for this?
  code = pTrans->pSuccAction(pTask);

  streamMutexUnlock(&pTask->lock);

  // todo: add parameter to control lock
  // after handling the callback function assigned by invoker, go on handling the waiting tasks
  if (callbackFn != NULL) {
    stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event));
    int32_t ret = callbackFn(pSM->pTask, param);
    if (ret != TSDB_CODE_SUCCESS) {
      // the failure does not alter the transition result, but it must not vanish silently
      stError("s-task:%s user-specified callback fn for event:%s failed, code:%s", id, GET_EVT_NAME(pTrans->event),
              tstrerror(ret));
    }

    stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event));
  }

  streamMutexLock(&pTask->lock);

  // tasks in waiting list
  if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
    // doHandleWaitingEvent() is responsible for releasing the lock taken above
    code = doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
  } else {
    streamMutexUnlock(&pTask->lock);

    int64_t el = (taosGetTimestampMs() - pSM->startTs);
    stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", id,
            GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
  }

  return code;
}
570

571
SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask) {
3,649,126✔
572
  return pTask->status.pSM->current;  // copy one obj in case of multi-thread environment
3,649,126✔
573
}
574

575
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) {
12,278✔
576
  return pTask->status.pSM->prev.state.state;
12,278✔
577
}
578

579
// Translate a task status into its printable name.
// Returns "" for a value outside StreamTaskStatusList and for a slot of the table that has no name, so the
// result is always safe to pass to a "%s" format.
const char* streamTaskGetStatusStr(ETaskStatus status) {
  int32_t index = status;

  // valid indexes are [0, len - 1]; index == len is already one past the end of the table
  if (index < 0 || index >= (int32_t)tListLen(StreamTaskStatusList)) {
    return "";
  }

  const char* name = StreamTaskStatusList[index].name;
  return (name != NULL) ? name : "";
}
587

588
// Put the task's state machine back to UNINIT: under the task lock the active transition is dropped and the
// waiting-event list is emptied; afterwards the downstream-ready flag is cleared.
void streamTaskResetStatus(SStreamTask* pTask) {
  SStreamTaskSM* pStateMachine = pTask->status.pSM;

  streamMutexLock(&pTask->lock);

  stDebug("s-task:%s level:%d fill-history:%d vgId:%d set uninit, prev status:%s", pTask->id.idStr,
          pTask->info.taskLevel, pTask->info.fillHistory, pTask->pMeta->vgId, pStateMachine->current.name);

  pStateMachine->pActiveTrans = NULL;
  pStateMachine->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
  taosArrayClear(pStateMachine->pWaitingEventList);

  streamMutexUnlock(&pTask->lock);

  // clear the downstream ready status
  pTask->status.downstreamReady = 0;
}
780✔
603

604
// Force the task's state machine into READY, bypassing the transition table: the current status is saved as
// the previous one (with no triggering event), the active transition is dropped and the waiting-event list is
// emptied. A task in DROPPING is left untouched and an error is logged.
// NOTE(review): no lock is taken here; presumably the caller holds pTask->lock, confirm against callers.
void streamTaskSetStatusReady(SStreamTask* pTask) {
  SStreamTaskSM* pStateMachine = pTask->status.pSM;

  if (pStateMachine->current.state == TASK_STATUS__DROPPING) {
    stError("s-task:%s task in dropping state, cannot be set ready", pTask->id.idStr);
    return;
  }

  pStateMachine->prev.evt = 0;
  pStateMachine->prev.state = pStateMachine->current;

  pStateMachine->current = StreamTaskStatusList[TASK_STATUS__READY];
  pStateMachine->pActiveTrans = NULL;
  pStateMachine->startTs = taosGetTimestampMs();

  taosArrayClear(pStateMachine->pWaitingEventList);
}
619

620
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
7,004✔
621
                                     __state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo) {
622
  STaskStateTrans trans = {0};
7,004✔
623
  trans.state = StreamTaskStatusList[current];
7,004✔
624
  trans.next = StreamTaskStatusList[next];
7,004✔
625
  trans.event = event;
7,004✔
626

627
  if (pEventInfo != NULL) {
7,004✔
628
    trans.attachEvent = *pEventInfo;
824✔
629
  } else {
630
    trans.attachEvent.event = 0;
6,180✔
631
    trans.attachEvent.status = 0;
6,180✔
632
  }
633

634
  trans.pAction = (fn != NULL) ? fn : dummyFn;
7,004✔
635
  trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn;
7,004✔
636
  trans.autoInvokeEndFn = (fn == NULL);
7,004✔
637
  return trans;
7,004✔
638
}
639

640
int32_t initStateTransferTable() {
27,930✔
641
  return taosThreadOnce(&streamTaskStateMachineInit, doInitStateTransferTable);
27,930✔
642
}
643

644
// clang-format off
645
void doInitStateTransferTable(void) {
206✔
646
  streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
206✔
647

648
  // initialization event handle
649
  STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL);
206✔
650
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
651

652
  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanHistoryTaskReady, NULL);
206✔
653
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
654

655
  // scan-history related event
656
  trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL);
206✔
657
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
658

659
  // halt stream task, from other task status
660
  trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
206✔
661
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
662
  trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
206✔
663
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
664

665
  SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
206✔
666

667
  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info);
206✔
668
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
669
  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
206✔
670
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
671

672
  // checkpoint related event
673
  trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
206✔
674
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
675
  trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
206✔
676
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
677
  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL);
206✔
678
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
679

680
  // pause & resume related event handle
681
  trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
206✔
682
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
683
  trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
206✔
684
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
685

686
  info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
206✔
687
  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
206✔
688
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
689
  trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
206✔
690
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
691
  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
206✔
692
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
693

694
  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
206✔
695
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
696
  trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);
206✔
697
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
698
  trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL);
206✔
699
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
700

701
  // resume is completed by restore status of state-machine
702

703
  // stop related event
704
  trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
206✔
705
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
706
  trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
206✔
707
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
708
  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
206✔
709
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
710
  trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
206✔
711
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
712
  trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
206✔
713
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
714
  trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
206✔
715
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
716
  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
206✔
717
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
718
  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
206✔
719
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
720

721
  // dropping related event
722
  trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
206✔
723
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
724
  trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
206✔
725
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
726
  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
206✔
727
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
728
  trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
206✔
729
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
730
  trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
206✔
731
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
732
  trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
206✔
733
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
734
  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
206✔
735
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
736
  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
206✔
737
  CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
412!
738
}
739
// clang-format on
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc