• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4348

21 Jun 2025 07:48AM UTC coverage: 62.366% (+1.8%) from 60.571%
#4348

push

travis-ci

web-flow
docs: add OpenMetrics support and configuration details to taosAdapter documentation (#31427)

* docs: add OpenMetrics support and configuration details to taosAdapter documentation

* docs: enhance OpenMetrics section in taosAdapter documentation

156282 of 319947 branches covered (48.85%)

Branch coverage included in aggregate %.

242147 of 318911 relevant lines covered (75.93%)

6151642.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.43
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tglobal.h"
19
#include "tmisce.h"
20
#include "tref.h"
21
#include "tsched.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25

26
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
27

28
int32_t streamBackendId = 0;
29
int32_t streamBackendCfWrapperId = 0;
30
int32_t taskDbWrapperId = 0;
31
int32_t streamTaskRefPool = 0;
32

33
static int32_t streamMetaBegin(SStreamMeta* pMeta);
34
static void    streamMetaCloseImpl(void* arg);
35

36
typedef struct {
37
  TdThreadMutex mutex;
38
  SHashObj*     pTable;
39
} SMetaRefMgt;
40

41
SMetaRefMgt gMetaRefMgt;
42

43
int32_t metaRefMgtInit();
44
void    metaRefMgtCleanup();
45

46
static void streamMetaEnvInit() {
2,661✔
47
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
2,661✔
48
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
2,661✔
49
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
2,661✔
50

51
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
2,661✔
52
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
2,661✔
53

54
  int32_t code = metaRefMgtInit();
2,661✔
55
  if (code) {
2,661!
56
    stError("failed to init stream meta mgmt env, start failed");
×
57
    return;
×
58
  }
59

60
  code = streamTimerInit();
2,661✔
61
  if (code) {
2,661!
62
    stError("failed to init stream meta env, start failed");
×
63
  }
64
}
65

66
void streamMetaInit() {
2,663✔
67
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
2,663✔
68
  if (code) {
2,663!
69
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
70
  }
71
}
2,663✔
72

73
void streamMetaCleanup() {
2,659✔
74
  taosCloseRef(streamBackendId);
2,659✔
75
  taosCloseRef(streamBackendCfWrapperId);
2,659✔
76
  taosCloseRef(streamMetaRefPool);
2,659✔
77
  taosCloseRef(streamTaskRefPool);
2,659✔
78

79
  metaRefMgtCleanup();
2,659✔
80
  streamTimerCleanUp();
2,659✔
81
}
2,659✔
82

83
int32_t metaRefMgtInit() {
2,661✔
84
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
2,661✔
85
  if (code) {
2,661!
86
    return code;
×
87
  }
88

89
  if (code == 0) {
2,661!
90
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
2,661✔
91
  }
92

93
  if (gMetaRefMgt.pTable == NULL) {
2,661!
94
    return terrno;
×
95
  } else {
96
    return code;
2,661✔
97
  }
98
}
99

100
void metaRefMgtCleanup() {
2,659✔
101
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
2,659✔
102
  while (pIter) {
28,304✔
103
    int64_t* p = *(int64_t**)pIter;
25,645✔
104
    taosMemoryFree(p);
25,645!
105
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
25,645✔
106
  }
107

108
  taosHashCleanup(gMetaRefMgt.pTable);
2,659✔
109
  streamMutexDestroy(&gMetaRefMgt.mutex);
2,659✔
110
}
2,659✔
111

112
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
37,210✔
113
  int32_t code = 0;
37,210✔
114
  void*   p = NULL;
37,210✔
115

116
  streamMutexLock(&gMetaRefMgt.mutex);
37,210✔
117

118
  p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
37,210✔
119
  if (p == NULL) {
37,210!
120
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
37,210✔
121
    if (code) {
37,210!
122
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
×
123
              tstrerror(code));
124
      return code;
×
125
    } else {  // not
126
              //      stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
127
    }
128
  } else {
129
    stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
×
130
  }
131

132
  streamMutexUnlock(&gMetaRefMgt.mutex);
37,210✔
133
  return code;
37,210✔
134
}
135

136
void metaRefMgtRemove(int64_t* pRefId) {
11,497✔
137
  streamMutexLock(&gMetaRefMgt.mutex);
11,497✔
138

139
  int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
11,497✔
140
  taosMemoryFree(pRefId);
11,497!
141
  streamMutexUnlock(&gMetaRefMgt.mutex);
11,497✔
142
}
11,497✔
143

144
// Opens the tdb environment located at pMeta->path together with its two
// tables: "task.db" and "checkpoint.db".
// Returns 0 on success and -1 as soon as one of the three opens fails.
// Handles opened before a failure are left in pMeta for the caller to close.
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t ret = tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL);
  if (ret < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
    return -1;
  }

  ret = tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  ret = tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  return 0;
}
162

163
//
164
// impl later
165
//
166
// Result of the on-disk format check done by streamMetaCheckBackendCompatible().
enum STREAM_STATE_VER {
  STREAM_STATA_NO_COMPATIBLE = 0,  // stored data can not be used by this version
  STREAM_STATA_COMPATIBLE = 1,     // stored data can be used as it is
  STREAM_STATA_NEED_CONVERT = 2,   // stored data must be converted first
};
171

172
// Inspects the first decodable record of the task table to decide whether the
// stored data is compatible with this version.
// Returns one of the STREAM_STATE_VER values. STREAM_STATA_COMPATIBLE is also
// returned when the cursor cannot be opened or positioned, or when no record
// could be decoded.
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  state = STREAM_STATA_COMPATIBLE;
  TBC*    pCursor = NULL;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int32_t kLen = 0;
  int32_t vLen = 0;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {  // no task info, no stream
    return state;
  }

  int32_t code = tdbTbcMoveToFirst(pCursor);
  if (code != 0) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCursor);
    return state;
  }

  // stop at the end of the table or at the first empty value
  while (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) == 0 && pVal != NULL && vLen != 0) {
    SDecoder        decoder;
    SCheckpointInfo info;

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
      // undecodable record, try the next one
      tDecoderClear(&decoder);
      continue;
    }

    if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
      state = STREAM_STATA_NO_COMPATIBLE;
    } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
      state = STREAM_STATA_NEED_CONVERT;
    }

    // only the first decodable record is examined
    tDecoderClear(&decoder);
    break;
  }

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);
  return state;
}
221

222
// Converts the data of the legacy stream backend, if any exists for the latest
// checkpoint, and removes the legacy "state" directory once every column
// family has been converted.
//
// Returns terrno (0 when nothing is wrong) if no legacy backend data exists,
// the error of streamBackendInit()/streamStateCvtDataFormat() on failure, and
// 0 on success. Removing the "state" directory is best effort: a failure to
// build its path is logged but does not change the returned code.
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  // reset terrno so that "not exist" can be told apart from a real error
  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (exist == false) {
    code = terrno;
    return code;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("vgId:%d failed to cvt data", pMeta->vgId);
      goto _EXIT;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

_EXIT:
  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    int32_t len = strlen(pMeta->path) + 32;
    char*   state = taosMemoryCalloc(1, len);
    if (state != NULL) {
      (void)snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // code is 0 on this path, so the allocation failure has to be read from terrno
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
268

269
// Checks the format of the stored stream data and converts it when required.
// Returns 0 when the data is compatible (or the check result is unknown), the
// result of streamMetaCvtDbFormat() when a conversion is needed, and -1 when
// the data is incompatible.
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t state = streamMetaCheckBackendCompatible(pMeta);

  switch (state) {
    case STREAM_STATA_NEED_CONVERT: {
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);
    }

    case STREAM_STATA_NO_COMPATIBLE: {
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;
    }

    case STREAM_STATA_COMPATIBLE:
    default:
      return 0;
  }
}
287

288
// Returns 1 when the task's fillHistory field equals 2, otherwise 0.
// NOTE(review): presumably 2 marks a re-calculate task - confirm against the
// definition of info.fillHistory.
int8_t streamTaskShouldRecalated(SStreamTask* pTask) {
  if (pTask->info.fillHistory == 2) {
    return 1;
  }
  return 0;
}
×
289

290
// Attaches a task-db backend to pTask, sharing the instance already registered
// under `key` in pMeta->pTaskDbUnique or opening (and registering) a new one.
//
// pMeta:     stream meta that owns the backend map
// pTask:     task that receives the backend
// key:       NUL-terminated lookup key of the backend
// recalated: non-zero stores the backend in pTask->pRecalBackend, zero stores
//            it in pTask->pBackend
//
// Returns 0 on success, TSDB_CODE_FAILED if an existing backend cannot be
// referenced. Opening a new backend is retried once per second until it
// succeeds; pMeta->backendMutex is released while sleeping.
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key, uint8_t recalated) {
  int32_t code = 0;
  int64_t chkpId = pTask->chkInfo.checkpointId;
  size_t  keyLen = strlen(key);

  streamMutexLock(&pMeta->backendMutex);

  // streamId--taskId
  void** ppExisting = taosHashGet(pMeta->pTaskDbUnique, key, keyLen);
  if (ppExisting != NULL && *ppExisting != NULL) {
    // share the backend that is already open for this key
    if (taskDbAddRef(*ppExisting) == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pShared = *ppExisting;
    pShared->pMeta = pMeta;
    if (recalated) {
      pTask->pRecalBackend = pShared;
    } else {
      pTask->pBackend = pShared;
    }

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pShared);
    return 0;
  }

  // no backend registered yet: open one, retrying until it becomes available
  STaskDbWrapper* pBackend = NULL;
  int64_t         processVer = -1;
  while ((code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend)) != 0) {
    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);
  }

  int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
  if (recalated) {
    pTask->pRecalBackend = pBackend;
  } else {
    pTask->pBackend = pBackend;
  }
  pBackend->refId = tref;
  pBackend->pTask = pTask;
  pBackend->pMeta = pMeta;

  // a version reported by the opened backend overrides the one kept in the task
  if (processVer != -1) {
    if (pTask->chkInfo.processedVer == processVer) {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    } else {
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    }
  }

  // a failure to register the backend is logged only; the function still returns 0
  code = taosHashPut(pMeta->pTaskDbUnique, key, keyLen, &pBackend, sizeof(void*));
  if (code != 0) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  if (recalated) {
    stDebug("s-task:0x%x set recalated backend %p", pTask->id.taskId, pBackend);
  } else {
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
  }
  return 0;
}
372

373
void streamMetaRemoveDB(void* arg, char* key) {
2,613✔
374
  if (arg == NULL || key == NULL) return;
2,613!
375

376
  SStreamMeta* pMeta = arg;
2,613✔
377
  streamMutexLock(&pMeta->backendMutex);
2,613✔
378
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
2,613✔
379
  if (code) {
2,613!
380
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
381
  }
382

383
  streamMutexUnlock(&pMeta->backendMutex);
2,613✔
384
}
385

386
// Creates the hash table that backs pInfo->pTasks.
// Returns TSDB_CODE_SUCCESS, or terrno when the table cannot be allocated.
int32_t streamMetaUpdateInfoInit(STaskUpdateInfo* pInfo) {
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  pInfo->pTasks = taosHashInit(64, hashFn, false, HASH_NO_LOCK);

  return (pInfo->pTasks != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
396

397
// Destroys the task table of pInfo and resets the field to NULL.
void streamMetaUpdateInfoCleanup(STaskUpdateInfo* pInfo) {
  SHashObj* pTasks = pInfo->pTasks;

  pInfo->pTasks = NULL;
  taosHashCleanup(pTasks);
}
13,095✔
401

402
// Creates and initializes the stream meta of one vnode.
//
// path:         directory of the vnode; the meta lives in "<path>/stream"
// ahandle:      opaque handle stored in the meta
// buildTaskFn:  callback used to build a task when it is registered
// expandTaskFn: callback stored in the meta
// vgId:         vnode id; SNODE_HANDLE starts with the leader role
// stage:        stage value stored in the meta
// fn:           callback stored as startInfo.completeFn
// p:            receives the new meta on success
//
// Returns 0 on success. On failure the resources released under _err are
// freed and the error code is returned.
// NOTE(review): the _err path does not destroy pMeta->lock/backendMutex and
// does not remove the ref added by taosAddRef() - confirm whether failures
// after those steps need extra handling.
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
  QRY_PARAM_CHECK(p);
  int32_t code = 0;
  int32_t lino = 0;

  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
    return terrno;
  }

  // set the vgId first: the helpers and log lines below read pMeta->vgId
  pMeta->vgId = vgId;

  int32_t len = strlen(path) + 64;
  char*   tpath = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);

  (void)snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
  pMeta->path = tpath;

  code = streamMetaOpenTdb(pMeta);
  TSDB_CHECK_CODE(code, lino, _err);

  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
            tstrerror(terrno));
    TSDB_CHECK_CODE(code, lino, _err);
  }

  // set the attribute when running on Linux OS
  TdThreadRwlockAttr attr;
  code = taosThreadRwlockAttrInit(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

#ifdef LINUX
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
  TSDB_CHECK_CODE(code, lino, _err);
#endif

  code = taosThreadRwlockInit(&pMeta->lock, &attr);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadRwlockAttrDestroy(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

  // the assignment needs its own parentheses, otherwise code receives the
  // result of the comparison instead of the error code
  if ((code = streamMetaBegin(pMeta)) < 0) {
    stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
    TSDB_CHECK_CODE(code, lino, _err);
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);

  code = streamMetaUpdateInfoInit(&pMeta->updateInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  code = streamMetaInitStartInfo(&pMeta->startInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);

  pMeta->scanInfo.scanSentinel = 0;
  pMeta->scanInfo.lastScanTs = 0;
  pMeta->scanInfo.tickCounter = 0;

  pMeta->ahandle = ahandle;
  pMeta->buildTaskFn = buildTaskFn;
  pMeta->expandTaskFn = expandTaskFn;
  pMeta->stage = stage;
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
  pMeta->updateInfo.activeTransId = -1;
  pMeta->updateInfo.completeTransId = -1;

  pMeta->startInfo.completeFn = fn;
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);

  pMeta->numOfPausedTasks = 0;
  pMeta->numOfStreamTasks = 0;
  pMeta->closeFlag = false;

  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);

  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _err);

  // add refId at the end of initialization function
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);

  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);

  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));

  code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code) {
    // the buffer was not registered, so nobody else is going to free it
    taosMemoryFree(pRid);
  }
  TSDB_CHECK_CODE(code, lino, _err);

  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);

  TSDB_CHECK_CODE(code, lino, _err);

  *p = pMeta;
  return code;

_err:
  taosMemoryFree(pMeta->path);
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
  if (pMeta->pTaskDbUnique) taosHashCleanup(pMeta->pTaskDbUnique);
  if (pMeta->pTaskDb) {
    tdbTbClose(pMeta->pTaskDb);
    pMeta->pTaskDb = NULL;
  }
  if (pMeta->pCheckpointDb) {
    tdbTbClose(pMeta->pCheckpointDb);
  }
  if (pMeta->db) {
    tdbClose(pMeta->db);
  }

  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);

  if (pMeta->startInfo.pStagesList) taosArrayDestroy(pMeta->startInfo.pStagesList);
  taosMemoryFree(pMeta);

  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
  return code;
}
539

540
// todo refactor: the lock should be restricted to one function
541
#ifdef BUILD_NO_CALL
// Compiled only when BUILD_NO_CALL is defined.
// Initializes the stream backend of pMeta, retrying every 100ms until it
// succeeds, then registers it in the backend ref pool and loads the
// checkpoint info.
// NOTE: when the first attempt fails and a retry succeeds, the function
// leaves the retry loop with the meta write lock still held.
void streamMetaInitBackend(SStreamMeta* pMeta) {
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);

  while (pMeta->streamBackend == NULL) {
    streamMetaWLock(pMeta);
    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
    if (pMeta->streamBackend == NULL) {
      streamMetaWUnLock(pMeta);
      stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
      taosMsleep(100);
    }
  }

  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
  streamBackendLoadCheckpointInfo(pMeta);
}
#endif
562

563
// Drops every task and the per-run bookkeeping kept in pMeta: removes the task
// refs, removes the stream backend ref (if any), empties the task map/list and
// the checkpoint arrays, and resets the counters and start-info timestamps.
// The meta object itself stays usable.
void streamMetaClear(SStreamMeta* pMeta) {
  int64_t startTs = taosGetTimestampMs();

  // remove all existed tasks in this vnode
  for (void* pIter = taosHashIterate(pMeta->pTasksMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pMeta->pTasksMap, pIter)) {
    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    // release the ref by timer
    if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {  // one more ref in timer
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }

    // give back the reference acquired above
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, refId);
    }
  }

  int64_t tasksDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear task map, elapsed time:%.2fs", pMeta->vgId, (tasksDoneTs - startTs)/1000.0);

  if (pMeta->streamBackendRid != 0) {
    if (taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
      stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
    }
  }

  int64_t backendDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear backend completed, elapsed time:%.2fs", pMeta->vgId, (backendDoneTs - tasksDoneTs)/1000.0);

  taosHashClear(pMeta->pTasksMap);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // NOTE: the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  taosArrayClear(pMeta->startInfo.pStagesList);
  taosArrayClear(pMeta->startInfo.pRecvChkptIdTasks);

  pMeta->startInfo.readyTs = 0;
  pMeta->startInfo.elapsedTime = 0;
  pMeta->startInfo.startTs = 0;
}
13,106✔
625

626
// Starts closing a stream meta by removing its reference from
// streamMetaRefPool; the pool was opened with streamMetaCloseImpl() as its
// destructor. A NULL meta is ignored.
void streamMetaClose(SStreamMeta* pMeta) {
  if (pMeta != NULL) {
    // keep copies for logging, so pMeta is not read after the ref is removed
    int32_t vgId = pMeta->vgId;
    int64_t rid = pMeta->rid;

    stDebug("vgId:%d start to close stream meta", vgId);

    int32_t code = taosRemoveRef(streamMetaRefPool, rid);
    if (code != 0) {
      stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", vgId, rid, tstrerror(code));
    }
  }
}
637

638
void streamMetaCloseImpl(void* arg) {
13,095✔
639
  SStreamMeta* pMeta = arg;
13,095✔
640
  if (pMeta == NULL) {
13,095!
641
    return;
×
642
  }
643

644
  int32_t code = 0;
13,095✔
645
  int32_t vgId = pMeta->vgId;
13,095✔
646
  stDebug("vgId:%d start to do-close stream meta", vgId);
13,095✔
647

648
  streamMetaWLock(pMeta);
13,095✔
649
  streamMetaClear(pMeta);
13,095✔
650
  streamMetaWUnLock(pMeta);
13,095✔
651

652
  // already log the error, ignore here
653
  tdbAbort(pMeta->db, pMeta->txn);
13,095✔
654
  tdbTbClose(pMeta->pTaskDb);
13,091✔
655
  tdbTbClose(pMeta->pCheckpointDb);
13,093✔
656
  tdbClose(pMeta->db);
13,095✔
657

658
  taosArrayDestroy(pMeta->pTaskList);
13,095✔
659
  taosArrayDestroy(pMeta->chkpSaved);
13,095✔
660
  taosArrayDestroy(pMeta->chkpInUse);
13,095✔
661

662
  taosHashCleanup(pMeta->pTasksMap);
13,095✔
663
  taosHashCleanup(pMeta->pTaskDbUnique);
13,095✔
664

665
  streamMetaUpdateInfoCleanup(&pMeta->updateInfo);
13,095✔
666
  streamMetaClearStartInfo(&pMeta->startInfo);
13,095✔
667

668
  destroyMetaHbInfo(pMeta->pHbInfo);
13,095✔
669
  pMeta->pHbInfo = NULL;
13,092✔
670

671
  taosMemoryFree(pMeta->path);
13,092!
672
  streamMutexDestroy(&pMeta->backendMutex);
13,092✔
673

674
  bkdMgtDestroy(pMeta->bkdChkptMgt);
13,094✔
675

676
  pMeta->role = NODE_ROLE_UNINIT;
13,094✔
677
  code = taosThreadRwlockDestroy(&pMeta->lock);
13,094✔
678
  if (code) {
13,095!
679
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
×
680
  }
681

682
  taosMemoryFree(pMeta);
13,095!
683
  stDebug("vgId:%d end to close stream meta", vgId);
13,095✔
684
}
685

686
// todo let's check the status for each task
687
// Serializes pTask and upserts it into the task table of pMeta, keyed by
// {streamId, taskId}.
//
// Returns 0 on success, -1 if the encoded size cannot be computed, terrno on
// allocation or upsert failure, and TSDB_CODE_INVALID_MSG if encoding fails.
// When the upsert fails, the task ref is also removed from streamTaskRefPool.
// The encode buffer is released on every path.
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t vgId = pTask->pMeta->vgId;
  void*   buf = NULL;
  int32_t len = 0;
  int32_t code = 0;

  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return terrno;
  }

  // tasks older than the sub-table change are saved with the current version
  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  // same failure test as the size pass above
  if (code < 0) {
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    taosMemoryFree(buf);
    return TSDB_CODE_INVALID_MSG;
  }

  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};

  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
  if (code != TSDB_CODE_SUCCESS) {
    code = terrno;
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
            vgId, pTask->id.refId, tstrerror(code));

    // copy the ref id first: the task is not read again after the ref is removed
    int64_t refId = pTask->id.refId;
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
    if (ret != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
    }
  } else {
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
  }

  taosMemoryFree(buf);
  return code;
}
736

737
// Deletes the record of the task identified by pTaskId from the task table.
// Returns the result of tdbTbDelete().
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
  int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};

  int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (code == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
            tstrerror(terrno));
  }

  return code;
}
749

750
// add to the ready tasks hash map, not the restored tasks hash map
751
// Registers pTask in pMeta: builds it through pMeta->buildTaskFn, appends it
// to the task list, adds it to the task ref pool and the task map, and then
// saves and commits it to the meta store.
//
// pAdded is set to true only when the task has been registered by this call.
// A task that already exists in the map is freed and 0 is returned. If the
// build or the list insertion fails, the task is freed and the error is
// returned. Later failures undo the list/map insertion and remove the task ref.
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
  *pAdded = false;

  int32_t code = 0;
  STaskId id = streamTaskGetTaskId(pTask);

  if (taosHashGet(pMeta->pTasksMap, &id, sizeof(id)) != NULL) {
    stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
    tFreeStreamTask(pTask);
    return code;
  }

  code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver);
  if (code != 0) {
    tFreeStreamTask(pTask);
    return code;
  }

  if (taosArrayPush(pMeta->pTaskList, &pTask->id) == NULL) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
    tFreeStreamTask(pTask);
    return terrno;
  }

  // from here on the task is owned by the ref pool
  int64_t refId = taosAddRef(streamTaskRefPool, pTask);
  pTask->id.refId = refId;

  code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
  if (code != 0) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
    }
    return code;
  }

  // persist the task; the commit is attempted only when the save succeeded
  code = streamMetaSaveTaskInMeta(pMeta, pTask);
  if (code == 0) {
    code = streamMetaCommit(pMeta);
  }

  if (code != 0) {
    // undo the in-memory registration
    (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }
    return code;
  }

  if (pTask->info.fillHistory == 0) {
    (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  *pAdded = true;
  return code;
}
820

821
// Returns the number of tasks held in the task map. A mismatch between the
// map and the task list is logged as an error.
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
  int32_t numInMap = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t numInList = taosArrayGetSize(pMeta->pTaskList);

  if (numInList != numInMap) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, numInList, numInMap);
  }

  return numInMap;
}
830

831
// Looks up the task {streamId, taskId} in the task map and acquires a
// reference to it without taking the meta lock.
//
// On success *pTask is set and TSDB_CODE_SUCCESS is returned; the reference
// has to be given back with streamMetaReleaseTask().
// TSDB_CODE_STREAM_TASK_NOT_EXIST is returned when the task is not in the map,
// its ref can not be acquired, its ref id does not match the one stored in the
// map, or the task should stop.
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  STaskId  id = {.streamId = streamId, .taskId = taskId};
  int64_t* pStoredRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (pStoredRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  int64_t      refId = *pStoredRefId;
  SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId);
  if (p == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // decide whether the acquired task has to be handed back right away
  bool rejected = false;
  if (p->id.refId != refId) {
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, refId,
            p->id.refId);
    rejected = true;
  } else if (streamTaskShouldStop(p)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", p->id.idStr);
    rejected = true;
  }

  if (rejected) {
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stTrace("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
  *pTask = p;
  return TSDB_CODE_SUCCESS;
}
869

870
// Same lookup as streamMetaAcquireTaskNoLock(), keyed by a ready-made STaskId,
// but without the "task should stop" check: a task that is in the map and
// whose ref can be acquired is returned even if it is stopping.
// Returns TSDB_CODE_SUCCESS with *pTask set, or
// TSDB_CODE_STREAM_TASK_NOT_EXIST.
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  int64_t* pStoredRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
  if (pStoredRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  int64_t      refId = *pStoredRefId;
  SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId);
  if (p == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (p->id.refId == refId) {
    stTrace("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
    *pTask = p;
    return TSDB_CODE_SUCCESS;
  }

  // the acquired object does not carry the ref id stored in the map
  stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
          refId, p->id.refId);
  if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
    stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, refId);
  }

  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
900

901
// Acquires the task {streamId, taskId} while holding the meta read lock.
// See streamMetaAcquireTaskNoLock() for the returned codes.
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t ret;

  streamMetaRLock(pMeta);
  ret = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return ret;
}
907

908
// Release one reference on pTask in streamTaskRefPool. A NULL pTask is ignored; pMeta is unused.
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    // copy the ids first, so that the failure log below never reads pTask after the release call
    int32_t tid = pTask->id.taskId;
    int64_t rid = pTask->id.refId;

    stTrace("s-task:0x%x release task, refId:%" PRId64, tid, rid);

    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, tid, rid);
    }
  }
}
921

922
// Remove the first entry of pTaskList whose streamId and taskId both match *id.
// Only the first num entries are inspected; an error is logged when none of them matches.
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  int32_t idx = 0;

  while (idx < num) {
    SStreamTaskId* pCur = taosArrayGet(pTaskList, idx);
    if (pCur->streamId == id->streamId && pCur->taskId == id->taskId) {
      taosArrayRemove(pTaskList, idx);
      return;  // found and removed, nothing more to do
    }
    idx += 1;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
4,486✔
937

938
// Callback passed to streamTaskHandleEventAsync() for the dropping event (param is unused).
//  - a source task sends its checkpoint-source rsp
//  - any non-sink task that owns an executor gets its query killed
// The returned code is the result of the last of these two calls that ran, or 0 when neither ran.
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t     ret = 0;
  const char* idStr = pTask->id.idStr;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ret = streamTaskSendCheckpointSourceRsp(pTask);
    if (ret != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", idStr, pTask->pMeta->vgId,
              tstrerror(ret));
    }
  }

  // let's kill the query procedure within stream, to end it ASAP.
  bool hasExecutor = (pTask->exec.pExecutor != NULL);
  if (hasExecutor && pTask->info.taskLevel != TASK_LEVEL__SINK) {
    ret = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
    if (ret != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to kill task related query handle, code:%s", idStr, tstrerror(ret));
    }
  }

  return ret;
}
958

959
// Unregister the task {streamId, taskId} from pMeta:
//  1. acquire the task (no lock is taken here) and fix up the paused-task counter
//  2. fire the dropping event asynchronously
//  3. remove the task from pTasksMap, pTaskList and the meta store
//  4. stop its delay-sched timer if one is armed, then drop the pool reference
//
// Every failure on the way is logged and the function still returns 0, including the case where the task
// cannot be found.
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  SStreamTask* pTask = NULL;
  int32_t      vgId = pMeta->vgId;
  STaskId      id = {.streamId = streamId, .taskId = taskId};

  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    return 0;
  }

  // decrease the paused task counter
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // numOfStreamTasks is only decreased for a task whose fillHistory flag is 0
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  // report a failed map removal instead of silently overwriting its result
  code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 " from task map, code:%s", vgId, id.taskId, tstrerror(code));
  }

  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

  code = streamMetaRemoveTaskInMeta(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  // the map and the list must describe the same set of tasks
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
  if (sizeInList != size) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // remove the pool entry first, then give back the reference acquired at the top of this function
  int64_t refId = pTask->id.refId;
  int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
  if (ret != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
1018

1019
// Stub: the real implementation is currently disabled. The function only takes and releases the meta
// write lock and returns 0; streamId and taskId are not used.
//
// TODO: the disabled flow was: streamMetaUnregisterTask() -> streamMetaCommit() -> log the number of
// remaining tasks, with errors of the first two steps logged. Restore it or remove this function.
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  (void)streamId;
  (void)taskId;

  streamMetaWLock(pMeta);
  streamMetaWUnLock(pMeta);

  return 0;
}
1044

1045
// Begin a tdb transaction (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) on pMeta->db under the meta write
// lock and store it in pMeta->txn. A failure is recorded through streamSetFatalError() and returned.
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t ret;

  streamMetaWLock(pMeta);

  ret = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                 TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (ret != 0) {
    streamSetFatalError(pMeta, ret, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return ret;
}
1055

1056
// Commit the current transaction of pMeta, run the post-commit step and open the next transaction.
//
// Each failing step is recorded with streamSetFatalError() and logged. A failed tdbCommit() does not stop
// the sequence, but its code is remembered and returned, so the caller is never told "success" when the
// commit itself failed. A failed tdbPostCommit() returns immediately without opening a new transaction.
//
// Returns 0 when all three steps succeed, otherwise the code of the first step that failed.
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t commitCode = tdbCommit(pMeta->db, pMeta->txn);
  if (commitCode != 0) {
    streamSetFatalError(pMeta, commitCode, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(commitCode),
            pMeta->fatalInfo.line);
  }

  int32_t code = tdbPostCommit(pMeta->db, pMeta->txn);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
            pMeta->fatalInfo.line);
    return (commitCode != 0) ? commitCode : code;
  }

  code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                  TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line);
  } else if (commitCode == 0) {
    // only claim completion when the commit step really succeeded
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
  }

  return (commitCode != 0) ? commitCode : code;
}
1083

1084
// Scan every record reachable through a cursor on pMeta->pTaskDb, decode it as a task checkpoint info and
// return the largest checkpointId seen. Records that fail to decode are skipped.
// Returns 0 when the cursor cannot be opened or positioned, or when no record decodes.
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t maxChkptId = 0;
  TBC*    pCursor = NULL;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return maxChkptId;
  }

  if (tdbTbcMoveToFirst(pCursor) != 0) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCursor);
    return maxChkptId;
  }

  void*   pKey = NULL;
  void*   pVal = NULL;
  int32_t kLen = 0;
  int32_t vLen = 0;

  while (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo chkInfo;

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decoded = tDecodeStreamTaskChkInfo(&decoder, &chkInfo);
    tDecoderClear(&decoder);

    if (decoded < 0) {
      continue;  // not decodable, ignore this record
    }

    maxChkptId = TMAX(maxChkptId, chkInfo.checkpointId);
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, maxChkptId);

  // the key/value buffers handed out by the cursor are released here, after the loop
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);

  return maxChkptId;
}
1130

1131
// Walk pMeta->pTaskList and drop every fill-history task (info.fillHistory == 1) whose related stream task
// (task->streamTaskId) is no longer in pMeta->pTasksMap and which is not already in dropping status.
// Such a task is appended to pRecycleList and removed from both pTasksMap and pTaskList.
//
// pMeta         stream meta that owns the task list and the task map
// pRecycleList  array of STaskId that receives the ids of the dropped tasks
static void dropHistoryTaskIfNoStreamTask(SStreamMeta* pMeta, SArray* pRecycleList) {
  int32_t i = 0;
  while (i < taosArrayGetSize(pMeta->pTaskList)) {
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
    if (pTaskId == NULL) {
      i++;
      continue;
    }

    // pTaskId points into pTaskList, and removing entry i below changes what that slot holds.
    // Copy the refId now so that the release at the end targets the task acquired here.
    int64_t      refId = pTaskId->refId;
    SStreamTask* task = taosAcquireRef(streamTaskRefPool, refId);
    if (task == NULL) {
      i++;
      continue;
    }

    int32_t taskId = task->id.taskId;
    bool    removed = false;

    if (task->info.fillHistory == 1 &&
        taosHashGet(pMeta->pTasksMap, &task->streamTaskId, sizeof(STaskId)) == NULL &&
        task->status.taskStatus != TASK_STATUS__DROPPING) {
      STaskId id = streamTaskGetTaskId(task);
      if (taosArrayPush(pRecycleList, &id) == NULL) {
        stError("%s s-task:0x%x failed to add into pRecycleList list due to:%d", __FUNCTION__, taskId, terrno);
      } else {
        int32_t total = taosArrayGetSize(pRecycleList);
        stInfo("%s s-task:0x%x is already dropped, add into recycle list, total:%d", __FUNCTION__, taskId, total);
      }

      int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(STaskId));
      if (code == 0) {
        // entry i is gone, so the next entry now sits at index i
        taosArrayRemove(pMeta->pTaskList, i);
        removed = true;
      } else {
        stError("%s s-task:0x%x failed to remove task from taskmap, code:%d", __FUNCTION__, taskId, code);
      }
    }

    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("%s s-task:0x%x failed to release refId:%" PRId64, __FUNCTION__, taskId, refId);
    }

    if (!removed) {
      i++;
    }
  }
}
13,134✔
1169

1170
// not allowed to return error code
1171
// Load all stream tasks stored in pMeta->pTaskDb into memory. Errors are logged, never returned.
//
// For every record read through the cursor:
//  - the record is decoded into a freshly allocated SStreamTask; an allocation or decode failure stops
//    the scan
//  - a task stored in dropping status is freed and its id is put into a recycle list
//  - otherwise the task is built with pMeta->buildTaskFn, appended to pTaskList, registered in
//    streamTaskRefPool and put into pTasksMap; the stream/paused task counters are updated
// After the scan, orphan fill-history tasks are dropped, every recycled id is removed from the meta store,
// and the result is committed.
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
  TBC*    pCur = NULL;
  void*   pKey = NULL;
  int32_t kLen = 0;
  void*   pVal = NULL;
  int32_t vLen = 0;
  int32_t code = 0;

  if (pMeta == NULL) {
    return;
  }

  int32_t vgId = pMeta->vgId;
  SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId));
  if (pRecycleList == NULL) {
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
    return;
  }

  stInfo("vgId:%d load stream tasks from meta files", vgId);

  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
    taosArrayDestroy(pRecycleList);
    return;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
    taosArrayDestroy(pRecycleList);
    tdbTbcClose(pCur);
    return;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
      break;
    }

    SDecoder decoder;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
      tDecoderClear(&decoder);
      tFreeStreamTask(pTask);
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
          "stream manually",
          vgId, tsDataDir);
      break;
    }
    tDecoderClear(&decoder);

    // a task persisted in dropping status is not loaded, only scheduled for removal from the meta store
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
      int32_t taskId = pTask->id.taskId;
      STaskId id = streamTaskGetTaskId(pTask);

      tFreeStreamTask(pTask);
      if (taosArrayPush(pRecycleList, &id) == NULL) {
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
      }

      int32_t total = taosArrayGetSize(pRecycleList);
      stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
      continue;
    }

    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);

    // do duplicate task check.
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    if (taosHashGet(pMeta->pTasksMap, &id, sizeof(id)) != NULL) {
      // todo this should replace the existed object put by replay creating stream task msg from mnode
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
      tFreeStreamTask(pTask);
      continue;
    }

    code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
    if (code < 0) {
      stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
      tFreeStreamTask(pTask);
      continue;
    }

    // NOTE(review): when this push fails the task is still registered in the map below, so the list and
    // the map disagree afterwards -- confirm whether the task should be released instead
    if (taosArrayPush(pMeta->pTaskList, &pTask->id) == NULL) {
      stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
    }

    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);

    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
      int64_t refId = pTask->id.refId;
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
              pTask->id.taskId, tstrerror(terrno), refId);

      // undo the list append and the pool registration done above
      (void)taosArrayPop(pMeta->pTaskList);
      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
      if (ret != 0) {
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
      }
      continue;
    }

    stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
    if (pTask->info.fillHistory == 0) {
      (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
    }

    if (streamTaskShouldPause(pTask)) {
      (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
    }
  }

  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);

  dropHistoryTaskIfNoStreamTask(pMeta, pRecycleList);

  // remove every recycled task from the meta store
  for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
    STaskId* pId = taosArrayGet(pRecycleList, i);
    code = streamMetaRemoveTaskInMeta(pMeta, pId);
    if (code) {
      stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
    }
  }

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);

  taosArrayDestroy(pRecycleList);
  code = streamMetaCommit(pMeta);
  if (code) {
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
  }
}
1326

1327
// Notify all stream tasks of pMeta that the vnode is closing:
//  1. set pMeta->closeFlag and, unless streams are disabled, wait for the hb timer to quit
//  2. under the meta read lock, build the task list via streamMetaSendMsgBeforeCloseTasks(), then stop
//     every task that can still be acquired and remove its reference from streamTaskRefPool
//  3. stop the scan timer if one exists
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;
  int32_t numOfTasks = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  pMeta->closeFlag = true;

  if (!tsDisableStream) {  // stream is disabled, no need to wait for the timer out
    streamMetaWaitForHbTmrQuit(pMeta);
  }

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // without a task list no task can be stopped below; report it instead of swallowing the error
    stError("vgId:%d failed to prepare the task list before closing tasks, code:%s", vgId, tstrerror(code));
  }

  // pTaskList stays NULL when the call above failed
  numOfTasks = (pTaskList != NULL) ? (int32_t)taosArrayGetSize(pTaskList) : 0;
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    // copy the refId while the task is still referenced; it is needed after the release below
    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);

  if (pMeta->scanInfo.scanTimer != NULL) {
    streamTmrStop(pMeta->scanInfo.scanTimer);
    pMeta->scanInfo.scanTimer = NULL;
  }

  streamMetaRUnLock(pMeta);
}
20,500✔
1389

1390
// Start the meta heartbeat to the mnode: allocate a heap copy of pMeta->rid, register it through
// metaRefMgtAdd() and hand it to streamMetaHbToMnode(). On any failure the heartbeat is not started.
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  if (pRid == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRid = pMeta->rid;
  int32_t code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code) {
    stError("vgId:%d failed to register rid:%" PRId64 " for metaHb, hbMsg will not started, code:%s", pMeta->vgId,
            *pRid, tstrerror(code));
    // NOTE(review): assumes metaRefMgtAdd() does not keep pRid when it fails, so it must be freed here
    // to avoid a leak -- confirm against metaRefMgtAdd()
    taosMemoryFree(pRid);
    return;
  }

  streamMetaHbToMnode(pRid, NULL);
}
1405

1406
// Duplicate pMeta->pTaskList into *pList and, when pMeta->sendMsgBeforeClosing is set, mark the checkpoint
// of every acquirable task as failed, send a hb msg through streamMetaSendHbHelper() and clear the flag.
// No meta lock is taken in this function.
//
// pMeta  stream meta
// pList  [out] receives the duplicated task list; the caller destroys it
//
// Returns terrno when the list cannot be duplicated (*pList is then not set here), otherwise
// TSDB_CODE_SUCCESS. Failures while notifying are logged and deliberately not returned.
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);  // NOTE(review): presumably rejects a NULL pList and resets *pList -- confirm

  int32_t code = 0;
  SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
  if (pTaskList == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pTaskList;

  bool sendMsg = pMeta->sendMsgBeforeClosing;
  if (!sendMsg) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {  // this error is ignored
      continue;
    }

    streamTaskSetCheckpointFailed(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  code = streamMetaSendHbHelper(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    // best effort only: closing goes on, but the failure is made visible in the log
    stError("vgId:%d failed to send hbMsg to mnode before closing tasks, code:%s", pMeta->vgId, tstrerror(code));
  }

  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // always report success, see above
}
1445

1446
// Update the term (stage) and the role of pMeta, then start the heartbeat when this node is the leader.
//
// Under the meta write lock:
//  - pMeta->stage and pMeta->role are set from term / isLeader
//  - sendMsgBeforeClosing is set when the previous role was leader
//  - when the role changed between follower and leader and tasks exist, the first task of pTaskList is
//    used to add this vgId into the nodeUpdate list
//  - a non-leader gets its start info reset; a follower that became leader is marked tasksWillRestart
// After unlocking, the change is logged and, for a leader with streams enabled, streamMetaStartHb() runs.
//
// pMeta     stream meta to update
// term      new term, stored in pMeta->stage
// isLeader  whether this node is the leader now
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t term, bool isLeader) {
  streamMetaWLock(pMeta);

  int64_t prevTerm = pMeta->stage;
  int32_t prevRole = pMeta->role;

  pMeta->stage = term;
  pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;

  // mark the sign to send msg before close all tasks
  // 1. for a leader vnode, always send msg before closing itself
  // 2. for a follower vnode, if it's changed from a leader, also sending msg before closing.
  if (prevRole == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  if ((prevRole == NODE_ROLE_FOLLOWER || prevRole == NODE_ROLE_LEADER) && (prevRole != pMeta->role) &&
      (taosArrayGetSize(pMeta->pTaskList) > 0)) {
    // pTaskList holds SStreamTaskId entries (32-bit taskId), while the task map is keyed by STaskId
    // (64-bit taskId); build the key explicitly instead of reinterpreting the list entry
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, 0);
    STaskId        id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
    SStreamTask*   pTask = NULL;

    int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
    if (code == 0) {
      stInfo("vgId:%d role changed, added into nodeUpdate list, use s-task:%s", pMeta->vgId, pTask->id.idStr);
      (void)streamTaskAddIntoNodeUpdateList(pTask, pMeta->vgId);  // result intentionally ignored
      streamMetaReleaseTask(pMeta, pTask);
    }
  }

  if (!isLeader) {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
  } else {  // wait for nodeEp update if become leader from follower
    if (prevRole == NODE_ROLE_FOLLOWER) {
      pMeta->startInfo.tasksWillRestart = 1;
    }
  }

  streamMetaWUnLock(pMeta);

  if (!tsDisableStream) {
    if (isLeader) {
      if (prevRole == NODE_ROLE_FOLLOWER) {
        stInfo("vgId:%d update term:%" PRId64 ", prevTerm:%" PRId64
               " prevRole:%d leader:%d, start to send Hb, rid:%" PRId64 " restart after nodeEp being updated",
               pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->rid);
      } else {
        stInfo("vgId:%d update term:%" PRId64 ", prevTerm:%" PRId64
               " prevRole:%d leader:%d, start to send Hb, rid:%" PRId64,
               pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->rid);
      }
      streamMetaStartHb(pMeta);
    } else {
      stInfo("vgId:%d update term:%" PRId64 " prevTerm:%" PRId64 " prevRole:%d leader:%d sendMsg beforeClosing:%d",
             pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->sendMsgBeforeClosing);
    }
  } else {
    stInfo("vgId:%d stream is disabled, not start the Hb", pMeta->vgId);
  }
}
16,510✔
1505

1506
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
21✔
1507
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
21✔
1508
  for (int32_t i = 0; i < num; ++i) {
66✔
1509
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
48✔
1510
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
48✔
1511
    SStreamTask*   pTask = NULL;
48✔
1512
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
48✔
1513

1514
    if (code == 0) {
48!
1515
      if (pTask->status.downstreamReady == 0) {
48✔
1516
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
3✔
1517
        return false;
3✔
1518
      }
1519
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
45✔
1520
    }
1521
  }
1522

1523
  return true;
18✔
1524
}
1525

1526
// Call streamTaskResetStatus() on every task of pMeta->pTaskList that can be acquired.
// Tasks that cannot be acquired are skipped; the function does not report them.
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t idx = 0;
  while (idx < total) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask) == 0) {
      streamTaskResetStatus(pTask);
      streamMetaReleaseTask(pMeta, pTask);
    }

    idx += 1;
  }

  return 0;
}
1547

1548
// Record in pMeta->updateInfo.pTasks that pTask has been handled for the nodeEp update transId, then log
// the time elapsed since startTs (same clock as taosGetTimestampMs()).
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, int32_t transId, int64_t startTs) {
  const char* idStr = pTask->id.idStr;

  // keep the already updated info
  STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;
  stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
          idStr, pMeta->vgId, transId, elapsed);
}
31✔
1564

1565
// Finish the active nodeEp update: clear the per-transaction task set, promote activeTransId to
// completeTransId, reset activeTransId to -1 and stamp completeTs with the current time.
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpdate = &pMeta->updateInfo;

  // the count must be taken before the set is cleared, it is only used for the log below
  int32_t numOfUpdated = taosHashGetSize(pUpdate->pTasks);
  taosHashClear(pUpdate->pTasks);

  int32_t prevComplete = pUpdate->completeTransId;

  pUpdate->completeTransId = pUpdate->activeTransId;
  pUpdate->activeTransId = -1;
  pUpdate->completeTs = taosGetTimestampMs();

  stInfo("vgId:%d set the nodeEp update complete, ts:%" PRId64
          ", complete transId:%d->%d, update Tasks:%d reset active transId",
          pMeta->vgId, pUpdate->completeTs, prevComplete, pUpdate->completeTransId, numOfUpdated);
}
17✔
1580

1581
// Decide whether the nodeEp (epset) update transaction transId may be processed, and make it the active
// transaction when it is new.
//
// pMeta            stream meta holding the update info
// transId          id of the incoming update transaction
// pUpdateTaskList  tasks requested to update; only its size is used, for logging
//
// Returns true when transId is the active transaction after this call (newly activated, already active,
// or replacing an older active one). Returns false when transId is not newer than the completed
// transaction or is older than the active one.
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId, SArray* pUpdateTaskList) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;
  int32_t          numOfTasks = taosArrayGetSize(pUpdateTaskList);

  if (transId < pInfo->completeTransId) {
    // report the completed transId here, this is the value the incoming transId was compared against
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }

  if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  }

  // from here on: transId > pInfo->completeTransId
  if (pInfo->activeTransId == -1) {
    taosHashClear(pInfo->pTasks);
    pInfo->activeTransId = transId;

    // interrupt the start all tasks procedure, only partial tasks will be started
    // the completion of this processed is based on the partial started tasks.
    if (pMeta->startInfo.startAllTasks == 1) {
      int32_t num = taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
      pMeta->startInfo.partialTasksStarted = true;
      stInfo(
          "vgId:%d set the active epset update transId:%d for %d tasks, prev complete transId:%d, start all "
          "interrupted, only %d tasks were started",
          pMeta->vgId, transId, numOfTasks, pInfo->completeTransId, num);
    } else {
      stInfo("vgId:%d set the active epset update transId:%d for %d tasks, prev complete transId:%d", pMeta->vgId,
             transId, numOfTasks, pInfo->completeTransId);
    }
    return true;
  }

  if (pInfo->activeTransId == transId) {
    // already the active transaction, do nothing
    return true;
  }

  if (transId < pInfo->activeTransId) {
    stError("vgId:%d invalid(out of order) epset update transId:%d, active transId:%d, complete transId:%d, discard",
            pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
    return false;
  }

  // transId > pInfo->activeTransId: the newer transaction replaces the active one
  taosHashClear(pInfo->pTasks);
  int32_t prev = pInfo->activeTransId;
  pInfo->activeTransId = transId;

  stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d, reqUpdate tasks:%d",
         pMeta->vgId, prev, transId, pInfo->completeTransId, numOfTasks);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc