• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3825

01 Apr 2025 11:58AM UTC coverage: 34.067% (+0.003%) from 34.064%
#3825

push

travis-ci

happyguoxy
test:alter gcda dir

148492 of 599532 branches covered (24.77%)

Branch coverage included in aggregate %.

222504 of 489471 relevant lines covered (45.46%)

762290.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.28
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tsched.h"
21
#include "tstream.h"
22
#include "ttimer.h"
23
#include "wal.h"
24

25
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
26

27
int32_t streamBackendId = 0;
28
int32_t streamBackendCfWrapperId = 0;
29
int32_t taskDbWrapperId = 0;
30
int32_t streamTaskRefPool = 0;
31

32
static int32_t streamMetaBegin(SStreamMeta* pMeta);
33
static void    streamMetaCloseImpl(void* arg);
34

35
typedef struct {
36
  TdThreadMutex mutex;
37
  SHashObj*     pTable;
38
} SMetaRefMgt;
39

40
SMetaRefMgt gMetaRefMgt;
41

42
int32_t metaRefMgtInit();
43
void    metaRefMgtCleanup();
44

45
static void streamMetaEnvInit() {
135✔
46
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
135✔
47
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
135✔
48
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
135✔
49

50
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
135✔
51
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
135✔
52

53
  int32_t code = metaRefMgtInit();
135✔
54
  if (code) {
135!
55
    stError("failed to init stream meta mgmt env, start failed");
×
56
    return;
×
57
  }
58

59
  code = streamTimerInit();
135✔
60
  if (code) {
135!
61
    stError("failed to init stream meta env, start failed");
×
62
  }
63
}
64

65
void streamMetaInit() {
137✔
66
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
137✔
67
  if (code) {
137!
68
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
69
  }
70
}
137✔
71

72
void streamMetaCleanup() {
70✔
73
  taosCloseRef(streamBackendId);
70✔
74
  taosCloseRef(streamBackendCfWrapperId);
70✔
75
  taosCloseRef(streamMetaRefPool);
70✔
76
  taosCloseRef(streamTaskRefPool);
70✔
77

78
  metaRefMgtCleanup();
70✔
79
  streamTimerCleanUp();
70✔
80
}
70✔
81

82
int32_t metaRefMgtInit() {
135✔
83
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
135✔
84
  if (code) {
135!
85
    return code;
×
86
  }
87

88
  if (code == 0) {
135!
89
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
135✔
90
  }
91

92
  if (gMetaRefMgt.pTable == NULL) {
135!
93
    return terrno;
×
94
  } else {
95
    return code;
135✔
96
  }
97
}
98

99
void metaRefMgtCleanup() {
70✔
100
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
70✔
101
  while (pIter) {
579✔
102
    int64_t* p = *(int64_t**)pIter;
509✔
103
    taosMemoryFree(p);
509!
104
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
509✔
105
  }
106

107
  taosHashCleanup(gMetaRefMgt.pTable);
70✔
108
  streamMutexDestroy(&gMetaRefMgt.mutex);
70✔
109
}
70✔
110

111
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
849✔
112
  int32_t code = 0;
849✔
113
  void*   p = NULL;
849✔
114

115
  streamMutexLock(&gMetaRefMgt.mutex);
849✔
116

117
  p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
849✔
118
  if (p == NULL) {
849!
119
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
849✔
120
    if (code) {
849!
121
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
×
122
              tstrerror(code));
123
      return code;
×
124
    } else {  // not
125
              //      stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
126
    }
127
  } else {
128
    stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
×
129
  }
130

131
  streamMutexUnlock(&gMetaRefMgt.mutex);
849✔
132
  return code;
849✔
133
}
134

135
void metaRefMgtRemove(int64_t* pRefId) {
332✔
136
  streamMutexLock(&gMetaRefMgt.mutex);
332✔
137

138
  int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
332✔
139
  taosMemoryFree(pRefId);
332!
140
  streamMutexUnlock(&gMetaRefMgt.mutex);
332✔
141
}
332✔
142

143
/*
 * Open the tdb environment under pMeta->path together with its two tables, "task.db"
 * and "checkpoint.db". Returns 0 on success and -1 on the first failure; handles that
 * were already opened are left in pMeta for the caller to close.
 */
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t ret = tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL);
  if (ret < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
    return -1;
  }

  ret = tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  ret = tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  return 0;
}
161

162
//
163
// impl later
164
//
165
enum STREAM_STATE_VER {
166
  STREAM_STATA_NO_COMPATIBLE,
167
  STREAM_STATA_COMPATIBLE,
168
  STREAM_STATA_NEED_CONVERT,
169
};
170

171
/*
 * Inspect the first decodable record of the task table and classify the stored
 * format by its msgVer. Returns one of the STREAM_STATE_VER values; when the table
 * cannot be read, or holds no usable record, STREAM_STATA_COMPATIBLE is returned.
 */
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  result = STREAM_STATA_COMPATIBLE;
  TBC*    pCursor = NULL;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int32_t kLen = 0;
  int32_t vLen = 0;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {  // no task info, no stream
    return result;
  }

  int32_t code = tdbTbcMoveToFirst(pCursor);
  if (code != 0) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCursor);
    return result;
  }

  while (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decoded = tDecodeStreamTaskChkInfo(&decoder, &info);
    tDecoderClear(&decoder);

    if (decoded < 0) {
      continue;  // undecodable record, try the next one
    }

    if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
      result = STREAM_STATA_NO_COMPATIBLE;
    } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
      result = STREAM_STATA_NEED_CONVERT;
    }

    // only the first decodable record is examined
    break;
  }

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);
  return result;
}
220

221
/*
 * Convert the legacy state backend found under pMeta->path to the current format.
 *
 * Every column-family instance of the legacy backend is passed to
 * streamStateCvtDataFormat(). When all conversions succeed, the "state" directory
 * under pMeta->path is removed.
 *
 * @return 0 on success; terrno when no legacy backend data exists for the latest
 *         checkpoint; otherwise the error from backend init or from the conversion.
 */
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (exist == false) {
    code = terrno;
    return code;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("vgId:%d failed to cvt data", pMeta->vgId);
      // leaving the iteration early: the iterator must be cancelled explicitly
      taosHashCancelIterate(pBackend->cfInst, pIter);
      break;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    int32_t len = strlen(pMeta->path) + 32;
    char*   state = taosMemoryCalloc(1, len);
    if (state != NULL) {
      (void)snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // code is 0 on this path; report the allocation failure through terrno instead.
      // The conversion itself succeeded, so the return value stays 0.
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
267

268
/*
 * Check the on-disk format of the stream state and convert it when required.
 * Returns 0 when nothing has to be done, the result of streamMetaCvtDbFormat() when
 * a conversion is needed, and -1 when the stored data is incompatible.
 */
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t state = streamMetaCheckBackendCompatible(pMeta);

  switch (state) {
    case STREAM_STATA_COMPATIBLE:
      return 0;

    case STREAM_STATA_NEED_CONVERT:
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);

    case STREAM_STATA_NO_COMPATIBLE:
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;

    default:
      return 0;
  }
}
286

287
/* Returns 1 when the task's fillHistory field equals 2, otherwise 0. */
int8_t streamTaskShouldRecalated(SStreamTask* pTask) {
  if (pTask->info.fillHistory == 2) {
    return 1;
  }
  return 0;
}
×
288

289
/*
 * Attach a task-db backend to pTask, as pRecalBackend when `recalated` is non-zero
 * and as pBackend otherwise.
 *
 * When a backend is already registered under `key` in pMeta->pTaskDbUnique it is
 * shared after taking one more reference. Otherwise the backend is opened, retrying
 * once per second until taskDbOpen() succeeds, and then registered under `key`.
 * pMeta->backendMutex is held while the map is accessed and released during the
 * retry sleep.
 *
 * Returns TSDB_CODE_FAILED when the reference on an existing backend cannot be
 * taken, otherwise 0 (a failure to insert into the map is only logged).
 */
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key, uint8_t recalated) {
  int32_t code = 0;
  int64_t chkpId = pTask->chkInfo.checkpointId;

  streamMutexLock(&pMeta->backendMutex);

  // key layout: streamId--taskId
  void** ppExist = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
  if (ppExist != NULL && *ppExist != NULL) {
    if (taskDbAddRef(*ppExist) == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pShared = *ppExist;
    pShared->pMeta = pMeta;
    if (!recalated) {
      pTask->pBackend = pShared;
    } else {
      pTask->pRecalBackend = pShared;
    }

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pShared);
    return 0;
  }

  STaskDbWrapper* pBackend = NULL;
  int64_t         processVer = -1;
  for (;;) {
    code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend);
    if (code == 0) {
      break;
    }

    // do not hold the mutex while waiting for the next attempt
    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);
  }

  int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
  if (!recalated) {
    pTask->pBackend = pBackend;
  } else {
    pTask->pRecalBackend = pBackend;
  }
  pBackend->refId = tref;
  pBackend->pTask = pTask;
  pBackend->pMeta = pMeta;

  if (processVer != -1) {
    if (pTask->chkInfo.processedVer == processVer) {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    } else {
      // the version reported by taskDbOpen() overrides the one kept in the task
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    }
  }

  code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
  if (code != 0) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  if (!recalated) {
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
  } else {
    stDebug("s-task:0x%x set recalated backend %p", pTask->id.taskId, pBackend);
  }
  return 0;
}
371

372
void streamMetaRemoveDB(void* arg, char* key) {
16✔
373
  if (arg == NULL || key == NULL) return;
16!
374

375
  SStreamMeta* pMeta = arg;
14✔
376
  streamMutexLock(&pMeta->backendMutex);
14✔
377
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
14✔
378
  if (code) {
14!
379
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
380
  }
381

382
  streamMutexUnlock(&pMeta->backendMutex);
14✔
383
}
384

385
/*
 * Create the containers of a task-update info object: the pTasks hash table and the
 * pTaskList array. Returns TSDB_CODE_SUCCESS, or terrno when either allocation
 * fails; on failure whatever was already created stays in pInfo.
 */
int32_t streamMetaUpdateInfoInit(STaskUpdateInfo* pInfo) {
  pInfo->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pInfo->pTasks == NULL) {
    return terrno;
  }

  pInfo->pTaskList = taosArrayInit(4, sizeof(int32_t));
  return (pInfo->pTaskList != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
400

401
/* Release both containers of a task-update info object and reset the pointers. */
void streamMetaUpdateInfoCleanup(STaskUpdateInfo* pInfo) {
  taosHashCleanup(pInfo->pTasks);
  pInfo->pTasks = NULL;

  taosArrayDestroy(pInfo->pTaskList);
  pInfo->pTaskList = NULL;
}
279✔
407

408

409

410
/*
 * Create and initialize the stream meta object of one vnode/snode.
 *
 * Opens the tdb store under "<path>/stream", converts a legacy state backend when
 * needed, starts the meta transaction, creates the in-memory containers and finally
 * registers the object in streamMetaRefPool.
 *
 * @param path          data directory of the node; "stream" is appended to it
 * @param ahandle       opaque handle stored in the meta, passed to buildTaskFn
 * @param buildTaskFn   task build callback stored in the meta
 * @param expandTaskFn  task expand callback stored in the meta
 * @param vgId          vnode id (SNODE_HANDLE makes the meta start as leader)
 * @param stage         stage value stored in the meta
 * @param fn            start-complete callback stored in startInfo
 * @param p             [out] receives the new meta on success
 * @return 0 on success, otherwise an error code; on failure everything created so
 *         far is released by the _err path.
 */
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
  QRY_PARAM_CHECK(p);
  int32_t code = 0;
  int32_t lino = 0;

  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
    return terrno;
  }

  int32_t len = strlen(path) + 64;
  char*   tpath = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);

  (void)snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
  pMeta->path = tpath;

  code = streamMetaOpenTdb(pMeta);
  TSDB_CHECK_CODE(code, lino, _err);

  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
            tstrerror(terrno));
    TSDB_CHECK_CODE(code, lino, _err);
  }

  // set the attribute when running on Linux OS
  TdThreadRwlockAttr attr;
  code = taosThreadRwlockAttrInit(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

#ifdef LINUX
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
  TSDB_CHECK_CODE(code, lino, _err);
#endif

  code = taosThreadRwlockInit(&pMeta->lock, &attr);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadRwlockAttrDestroy(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

  // keep the real error code: assign first, compare afterwards
  code = streamMetaBegin(pMeta);
  if (code < 0) {
    stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
    lino = __LINE__;
    goto _err;
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);

  code = streamMetaUpdateInfoInit(&pMeta->updateInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  code = streamMetaInitStartInfo(&pMeta->startInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);

  pMeta->scanInfo.scanSentinel = 0;
  pMeta->scanInfo.lastScanTs = 0;
  pMeta->scanInfo.tickCounter = 0;

  pMeta->vgId = vgId;
  pMeta->ahandle = ahandle;
  pMeta->buildTaskFn = buildTaskFn;
  pMeta->expandTaskFn = expandTaskFn;
  pMeta->stage = stage;
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
  pMeta->updateInfo.activeTransId = -1;
  pMeta->updateInfo.completeTransId = -1;

  pMeta->startInfo.completeFn = fn;
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);

  pMeta->numOfPausedTasks = 0;
  pMeta->numOfStreamTasks = 0;
  pMeta->closeFlag = false;

  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);

  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _err);

  // add refId at the end of initialization function
  // NOTE(review): if one of the steps below fails, pMeta is freed by _err while it is
  // still registered in streamMetaRefPool - confirm how the ref should be dropped.
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);

  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);

  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));

  code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code != TSDB_CODE_SUCCESS) {
    // the registry did not take ownership of the holder, release it here
    taosMemoryFree(pRid);
    lino = __LINE__;
    goto _err;
  }

  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);

  TSDB_CHECK_CODE(code, lino, _err);

  *p = pMeta;
  return code;

_err:
  taosMemoryFree(pMeta->path);
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
  if (pMeta->pTaskDbUnique) taosHashCleanup(pMeta->pTaskDbUnique);
  if (pMeta->pTaskDb) {
    tdbTbClose(pMeta->pTaskDb);
    pMeta->pTaskDb = NULL;
  }
  if (pMeta->pCheckpointDb) {
    tdbTbClose(pMeta->pCheckpointDb);
  }
  if (pMeta->db) {
    tdbClose(pMeta->db);
  }

  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
  if (pMeta->updateInfo.pTaskList) taosArrayDestroy(pMeta->updateInfo.pTaskList);
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);

  if (pMeta->startInfo.pStagesList) taosArrayDestroy(pMeta->startInfo.pStagesList);
  taosMemoryFree(pMeta);

  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
  return code;
}
547

548
// todo refactor: the lock should be restricted in one function
549
#ifdef BUILD_NO_CALL
550
/*
 * Initialize pMeta->streamBackend, retrying every 100ms until it succeeds, then add
 * it to the streamBackendId ref pool and load the checkpoint info.
 * Compiled only when BUILD_NO_CALL is defined.
 * NOTE: when the backend is obtained inside the retry loop, the meta write lock
 * taken in that iteration is still held when this function returns.
 */
void streamMetaInitBackend(SStreamMeta* pMeta) {
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);

  while (pMeta->streamBackend == NULL) {
    streamMetaWLock(pMeta);
    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
    if (pMeta->streamBackend != NULL) {
      break;
    }

    streamMetaWUnLock(pMeta);
    stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
    taosMsleep(100);
  }

  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
  streamBackendLoadCheckpointInfo(pMeta);
}
569
#endif
570

571
/*
 * Remove every task of this meta and reset its in-memory bookkeeping.
 *
 * For each task in pTasksMap that can still be acquired: stop its delay-schedule
 * timer when one is armed, remove the task ref and release the ref acquired here.
 * Then drop the stream backend ref (if any) and clear the task map, the task list,
 * the checkpoint arrays and the start-info containers and timestamps.
 */
void streamMetaClear(SStreamMeta* pMeta) {
  // remove all existed tasks in this vnode
  int64_t startTs = taosGetTimestampMs();

  for (void* pIter = taosHashIterate(pMeta->pTasksMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pMeta->pTasksMap, pIter)) {
    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    // release the ref by timer
    if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {  // one more ref in timer
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }

    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, refId);
    }
  }

  int64_t tasksDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear task map, elapsed time:%.2fs", pMeta->vgId, (tasksDoneTs - startTs)/1000.0);

  if (pMeta->streamBackendRid != 0) {
    if (taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
      stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
    }
  }

  int64_t backendDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear backend completed, elapsed time:%.2fs", pMeta->vgId, (backendDoneTs - tasksDoneTs)/1000.0);

  taosHashClear(pMeta->pTasksMap);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // NOTE: the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  taosArrayClear(pMeta->startInfo.pStagesList);
  taosArrayClear(pMeta->startInfo.pRecvChkptIdTasks);

  pMeta->startInfo.readyTs = 0;
  pMeta->startInfo.elapsedTime = 0;
  pMeta->startInfo.startTs = 0;
}
282✔
633

634
/*
 * Request the close of a stream meta by removing its ref from streamMetaRefPool;
 * streamMetaCloseImpl() is the release callback registered for that pool.
 * NULL is accepted and ignored.
 */
void streamMetaClose(SStreamMeta* pMeta) {
  if (pMeta != NULL) {
    stDebug("vgId:%d start to close stream meta", pMeta->vgId);

    int32_t ret = taosRemoveRef(streamMetaRefPool, pMeta->rid);
    if (ret != 0) {
      stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(ret));
    }
  }
}
645

646
void streamMetaCloseImpl(void* arg) {
279✔
647
  SStreamMeta* pMeta = arg;
279✔
648
  if (pMeta == NULL) {
279!
649
    return;
×
650
  }
651

652
  int32_t code = 0;
279✔
653
  int32_t vgId = pMeta->vgId;
279✔
654
  stDebug("vgId:%d start to do-close stream meta", vgId);
279✔
655

656
  streamMetaWLock(pMeta);
279✔
657
  streamMetaClear(pMeta);
279✔
658
  streamMetaWUnLock(pMeta);
279✔
659

660
  // already log the error, ignore here
661
  tdbAbort(pMeta->db, pMeta->txn);
279✔
662
  tdbTbClose(pMeta->pTaskDb);
279✔
663
  tdbTbClose(pMeta->pCheckpointDb);
279✔
664
  tdbClose(pMeta->db);
279✔
665

666
  taosArrayDestroy(pMeta->pTaskList);
279✔
667
  taosArrayDestroy(pMeta->chkpSaved);
279✔
668
  taosArrayDestroy(pMeta->chkpInUse);
279✔
669

670
  taosHashCleanup(pMeta->pTasksMap);
279✔
671
  taosHashCleanup(pMeta->pTaskDbUnique);
279✔
672

673
  streamMetaUpdateInfoCleanup(&pMeta->updateInfo);
279✔
674
  streamMetaClearStartInfo(&pMeta->startInfo);
279✔
675

676
  destroyMetaHbInfo(pMeta->pHbInfo);
279✔
677
  pMeta->pHbInfo = NULL;
279✔
678

679
  taosMemoryFree(pMeta->path);
279!
680
  streamMutexDestroy(&pMeta->backendMutex);
279✔
681

682
  bkdMgtDestroy(pMeta->bkdChkptMgt);
279✔
683

684
  pMeta->role = NODE_ROLE_UNINIT;
279✔
685
  code = taosThreadRwlockDestroy(&pMeta->lock);
279✔
686
  if (code) {
279!
687
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
×
688
  }
689

690
  taosMemoryFree(pMeta);
279!
691
  stDebug("vgId:%d end to close stream meta", vgId);
279✔
692
}
693

694
// todo let's check the status for each task
695
/*
 * Serialize a task and upsert it into the task table inside the current meta
 * transaction (pMeta->txn). The record key is {streamId, taskId}.
 *
 * Tasks whose version is older than SSTREAM_TASK_SUBTABLE_CHANGED_VER are upgraded
 * to SSTREAM_TASK_VER before being encoded.
 *
 * @return 0 on success; -1 when the encoded size cannot be computed; terrno when the
 *         buffer allocation or the upsert fails; TSDB_CODE_INVALID_MSG when encoding
 *         fails. When the upsert fails, the task ref is removed from
 *         streamTaskRefPool as well.
 */
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t vgId = pTask->pMeta->vgId;
  void*   buf = NULL;
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return terrno;
  }

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code < 0) {
    // report the code that is actually returned and do not leak the encode buffer
    code = TSDB_CODE_INVALID_MSG;
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    taosMemoryFree(buf);
    return code;
  }

  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};

  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
  if (code != TSDB_CODE_SUCCESS) {
    code = terrno;
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
            vgId, pTask->id.refId, tstrerror(code));

    // copy the ref id first: it is still needed for logging after the ref is removed
    int64_t refId = pTask->id.refId;
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
    if (ret != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
    }
  } else {
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
  }

  taosMemoryFree(buf);
  return code;
}
744

745
/*
 * Delete the record keyed by {streamId, taskId} from the task table inside the
 * current meta transaction. Returns the result of tdbTbDelete().
 */
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
  int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};

  int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (code == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
            tstrerror(terrno));
  }

  return code;
}
757

758
// add to the ready tasks hash map, not the restored tasks hash map
759
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
23✔
760
  *pAdded = false;
23✔
761

762
  int32_t code = 0;
23✔
763
  int64_t refId = 0;
23✔
764
  STaskId id = streamTaskGetTaskId(pTask);
23✔
765
  void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
23✔
766

767
  if (p != NULL) {
23✔
768
    stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
2!
769
    tFreeStreamTask(pTask);
2✔
770
    return code;
2✔
771
  }
772

773
  if ((code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver)) != 0) {
21!
774
    tFreeStreamTask(pTask);
×
775
    return code;
×
776
  }
777

778
  p = taosArrayPush(pMeta->pTaskList, &pTask->id);
21✔
779
  if (p == NULL) {
21!
780
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
×
781
    tFreeStreamTask(pTask);
×
782
    return terrno;
×
783
  }
784

785
  pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask);
21✔
786
  code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
21✔
787
  if (code) {
21!
788
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
×
789
    void* pUnused = taosArrayPop(pMeta->pTaskList);
×
790

791
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
792
    if (ret != 0) {
×
793
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
×
794
    }
795
    return code;
×
796
  }
797

798
  if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
21!
799
    int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
×
800
    void*   pUnused = taosArrayPop(pMeta->pTaskList);
×
801

802
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
803
    if (ret) {
×
804
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
×
805
    }
806
    return code;
×
807
  }
808

809
  if ((code = streamMetaCommit(pMeta)) != 0) {
21!
810
    int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
×
811
    void*   pUnused = taosArrayPop(pMeta->pTaskList);
×
812

813
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
814
    if (ret) {
×
815
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
×
816
    }
817

818
    return code;
×
819
  }
820

821
  if (pTask->info.fillHistory == 0) {
21✔
822
    int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
16✔
823
  }
824

825
  *pAdded = true;
21✔
826
  return code;
21✔
827
}
828

829
/*
 * Return the number of tasks held in pTasksMap. A mismatch with the size of
 * pTaskList is reported in the log but does not change the returned value.
 */
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
  int32_t numInMap = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t numInList = taosArrayGetSize(pMeta->pTaskList);

  if (numInMap != numInList) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, numInList, numInMap);
  }

  return numInMap;
}
838

839
/*
 * Look up a task by {streamId, taskId} and acquire a reference on it. This function
 * takes no lock itself; streamMetaAcquireTask() wraps it in the meta read lock.
 *
 * On success *pTask holds the task and the caller owns one reference. Returns
 * TSDB_CODE_STREAM_TASK_NOT_EXIST when the id is unknown, the ref cannot be
 * acquired, the stored ref id does not match the task, or the task should stop; in
 * the last two cases the reference acquired here is released again.
 */
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  STaskId  id = {.streamId = streamId, .taskId = taskId};
  int64_t* pStoredRef = (int64_t*)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (pStoredRef == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  int64_t      refId = *pStoredRef;
  SStreamTask* pFound = taosAcquireRef(streamTaskRefPool, refId);
  if (pFound == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pFound->id.refId != refId) {
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, refId,
            pFound->id.refId);
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId);
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (streamTaskShouldStop(pFound)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", pFound->id.idStr);
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stDebug("s-task:%s acquire task, refId:%" PRId64, pFound->id.idStr, pFound->id.refId);
  *pTask = pFound;
  return TSDB_CODE_SUCCESS;
}
877

878
/*
 * Look up a task by id and acquire a reference on it. Unlike
 * streamMetaAcquireTaskNoLock(), the stop state of the task is not checked.
 *
 * On success *pTask holds the task and the caller owns one reference. Returns
 * TSDB_CODE_STREAM_TASK_NOT_EXIST when the id is unknown, the ref cannot be
 * acquired, or the stored ref id does not match the task.
 */
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  int64_t* pStoredRef = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
  if (pStoredRef == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  int64_t      refId = *pStoredRef;
  SStreamTask* pFound = taosAcquireRef(streamTaskRefPool, refId);
  if (pFound == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pFound->id.refId != refId) {
    stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
            refId, pFound->id.refId);
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, refId);
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stDebug("s-task:%s acquire task, refId:%" PRId64, pFound->id.idStr, pFound->id.refId);
  *pTask = pFound;
  return TSDB_CODE_SUCCESS;
}
908

909
/* Acquire a task by {streamId, taskId} while holding the meta read lock. */
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t ret;

  streamMetaRLock(pMeta);
  ret = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return ret;
}
915

916
// Give back one reference on pTask to streamTaskRefPool. A NULL task is ignored.
// The task id and refId are copied to locals first so that the failure log does not have to read
// from pTask after the reference has been released.
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    const int32_t id = pTask->id.taskId;
    const int64_t rid = pTask->id.refId;

    stDebug("s-task:0x%x release task, refId:%" PRId64, id, rid);

    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, id, rid);
    }
  }
}
929

930
// Remove the first entry of pTaskList whose streamId and taskId both match *id.
// Only the first num entries are examined. If no entry matches, an error is logged.
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  int32_t idx = 0;

  while (idx < num) {
    SStreamTaskId* pEntry = taosArrayGet(pTaskList, idx);
    if (pEntry->streamId == id->streamId && pEntry->taskId == id->taskId) {
      taosArrayRemove(pTaskList, idx);
      return;  // at most one entry is removed
    }
    idx += 1;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
5✔
945

946
// Callback passed to streamTaskHandleEventAsync() for the dropping event.
// For a source-level task it sends the checkpoint-source response; for any non-sink task that has an
// executor it kills the running query so the task ends as soon as possible.
// param is not used. The returned code is that of the last operation performed (0 if none ran).
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t rc = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    rc = streamTaskSendCheckpointSourceRsp(pTask);
    if (rc != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
              tstrerror(rc));
    }
  }

  // sink tasks and tasks without an executor have no query to kill
  if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->exec.pExecutor == NULL) {
    return rc;
  }

  // let's kill the query procedure within stream, to end it ASAP.
  rc = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
  if (rc != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(rc));
  }

  return rc;
}
966

967
// Remove the task {streamId, taskId} from the stream meta: fire the dropping event, take the id out of
// pTasksMap and pTaskList, delete the persisted record via streamMetaRemoveTaskInMeta(), stop the delay
// timer if one is armed, and finally remove the task's ref from streamTaskRefPool.
// The task is looked up with streamMetaAcquireTaskUnsafe(), i.e. no meta lock is taken here.
// NOTE(review): presumably the caller holds the meta write lock -- confirm against callers.
//
// Every failure along the way is logged and otherwise ignored; the function always returns 0, also
// when the task cannot be found.
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  SStreamTask* pTask = NULL;
  int32_t      vgId = pMeta->vgId;
  int32_t      code = 0;
  STaskId      id = {.streamId = streamId, .taskId = taskId};

  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    return 0;
  }

  // desc the paused task counter
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // a task whose fillHistory flag is 0 is counted in numOfStreamTasks, so give that count back
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  // report a failed map removal instead of silently overwriting the result below
  code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  if (code) {
    stError("vgId:%d failed to remove task:0x%x from task map, code:%d", vgId, taskId, code);
  }

  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

  code = streamMetaRemoveTaskInMeta(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  // the map and the list are expected to hold the same number of tasks
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
  if (sizeInList != size) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // remove the ref registered for the task, then drop the reference acquired at the top
  int64_t refId = pTask->id.refId;
  int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
  if (ret != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
1026

1027
// Currently a stub: it takes and releases the meta write lock and returns 0.
// The original implementation (unregister the task, then commit the meta) is disabled and kept below
// for reference; re-enabling it requires re-declaring the vgId and numOfTasks locals it uses.
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t code = 0;

  streamMetaWLock(pMeta);

  //  code = streamMetaUnregisterTask(pMeta, streamId, taskId);
  //  numOfTasks = streamMetaGetNumOfTasks(pMeta);
  //  if (code) {
  //    stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
  //  }
  //
  //  code = streamMetaCommit(pMeta);
  //  if (code) {
  //    stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
  //  } else {
  //    stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
  //  }

  streamMetaWUnLock(pMeta);

  return code;
}
1052

1053
// Open a new tdb transaction (write + read-uncommitted) on pMeta->db and store it in pMeta->txn.
// The call runs under the meta write lock. A failure is recorded with streamSetFatalError() and the
// tdbBegin() result code is returned either way.
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t rc;

  streamMetaWLock(pMeta);

  rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc != 0) {
    streamSetFatalError(pMeta, rc, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return rc;
}
1063

1064
// Commit the current meta transaction and immediately open the next one.
// Steps: tdbCommit() -> tdbPostCommit() -> tdbBegin(). Each failure is recorded through
// streamSetFatalError() and logged at fatal level.
//
// Returns the post-commit error if tdbPostCommit() fails, otherwise the result of the final tdbBegin().
// NOTE(review): a tdbCommit() failure is logged but does not stop the sequence, and its code is
// overwritten by the following calls -- confirm this is intended.
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t rc = tdbCommit(pMeta->db, pMeta->txn);
  if (rc != 0) {
    streamSetFatalError(pMeta, rc, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(rc),
            pMeta->fatalInfo.line);
  }

  rc = tdbPostCommit(pMeta->db, pMeta->txn);
  if (rc != 0) {
    streamSetFatalError(pMeta, rc, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(rc),
            pMeta->fatalInfo.line);
    return rc;
  }

  // start the next transaction so that later writes have a txn to work in
  rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc == 0) {
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
    return rc;
  }

  streamSetFatalError(pMeta, rc, __func__, __LINE__);
  stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(rc), pMeta->fatalInfo.line);
  return rc;
}
1091

1092
// Scan every record of pMeta->pTaskDb with a cursor, decode the checkpoint info of each one and
// return the largest checkpointId seen. Records that fail to decode are skipped.
// Returns 0 when the cursor cannot be opened or positioned, or when no record yields a larger id.
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t maxChkptId = 0;
  TBC*    pCursor = NULL;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return maxChkptId;
  }

  void*    pKey = NULL;
  void*    pVal = NULL;
  int32_t  kLen = 0;
  int32_t  vLen = 0;
  SDecoder decoder;

  int32_t code = tdbTbcMoveToFirst(pCursor);
  if (code) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCursor);
    return maxChkptId;
  }

  for (;;) {
    if (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) != 0) {
      break;
    }

    // an empty value ends the scan
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SCheckpointInfo chkInfo;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    bool decodeFailed = (tDecodeStreamTaskChkInfo(&decoder, &chkInfo) < 0);
    tDecoderClear(&decoder);

    if (decodeFailed) {
      continue;
    }

    maxChkptId = TMAX(maxChkptId, chkInfo.checkpointId);
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, maxChkptId);

  // release the key/value buffers handed out by the cursor, then the cursor itself
  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCursor);
  return maxChkptId;
}
1138

1139
// Walk pMeta->pTaskList and, for every fill-history task (info.fillHistory == 1) whose related stream
// task id (task->streamTaskId) is no longer in pMeta->pTasksMap and whose status is not DROPPING:
//   - append the task's id to pRecycleList,
//   - remove the id from pTasksMap and, if that succeeds, remove the entry from pTaskList.
// All other entries are left untouched. Each task is acquired from streamTaskRefPool for inspection
// and released again before moving on.
//
// pMeta         stream meta whose list/map are pruned in place
// pRecycleList  array of STaskId; receives the ids of the tasks that were dropped here
static void dropHistoryTaskIfNoStreamTask(SStreamMeta* pMeta, SArray* pRecycleList) {
  int32_t i = 0;
  while (i < taosArrayGetSize(pMeta->pTaskList)) {
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
    if (pTaskId == NULL) {
      i++;
      continue;
    }

    // Copy the refId now: pTaskId points into pTaskList, and once the entry at index i is removed
    // below it no longer refers to this task, so it must not be dereferenced afterwards.
    int64_t refId = pTaskId->refId;

    SStreamTask* task = taosAcquireRef(streamTaskRefPool, refId);
    if (task != NULL && task->info.fillHistory == 1) {
      if (taosHashGet(pMeta->pTasksMap, &task->streamTaskId, sizeof(STaskId)) == NULL &&
          task->status.taskStatus != TASK_STATUS__DROPPING) {
        STaskId id = streamTaskGetTaskId(task);
        if (taosArrayPush(pRecycleList, &id) == NULL) {
          stError("%s s-task:0x%x failed to add into pRecycleList list due to:%d", __FUNCTION__, task->id.taskId,
                  terrno);
        } else {
          int32_t total = taosArrayGetSize(pRecycleList);
          stInfo("%s s-task:0x%x is already dropped, add into recycle list, total:%d", __FUNCTION__, task->id.taskId,
                 total);
        }

        int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(STaskId));
        if (code == 0) {
          // index i now holds the next entry, so i is intentionally not advanced
          taosArrayRemove(pMeta->pTaskList, i);
        } else {
          i++;
          stError("%s s-task:0x%x failed to remove task from taskmap, code:%d", __FUNCTION__, task->id.taskId, code);
        }

        if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
          stError("%s s-task:0x%x failed to release refId:%" PRId64, __FUNCTION__, task->id.taskId, refId);
        }
        continue;
      }
    }

    if (task != NULL && taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("%s s-task:0x%x failed to release refId:%" PRId64, __FUNCTION__, task->id.taskId, refId);
    }
    i++;
  }
}
282✔
1177

1178
// not allowed to return error code
1179
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
281✔
1180
  TBC*     pCur = NULL;
281✔
1181
  void*    pKey = NULL;
281✔
1182
  int32_t  kLen = 0;
281✔
1183
  void*    pVal = NULL;
281✔
1184
  int32_t  vLen = 0;
281✔
1185
  SDecoder decoder;
1186
  int32_t  vgId = 0;
281✔
1187
  int32_t  code = 0;
281✔
1188
  SArray*  pRecycleList = NULL;
281✔
1189

1190
  if (pMeta == NULL) {
281!
1191
    return;
×
1192
  }
1193

1194
  vgId = pMeta->vgId;
281✔
1195
  pRecycleList = taosArrayInit(4, sizeof(STaskId));
281✔
1196
  if (pRecycleList == NULL) {
281!
1197
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
×
1198
    return;
×
1199
  }
1200

1201
  stInfo("vgId:%d load stream tasks from meta files", vgId);
281!
1202

1203
  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
282✔
1204
  if (code != TSDB_CODE_SUCCESS) {
282!
1205
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
×
1206
    taosArrayDestroy(pRecycleList);
×
1207
    return;
×
1208
  }
1209

1210
  code = tdbTbcMoveToFirst(pCur);
282✔
1211
  if (code) {
282!
1212
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
×
1213
    taosArrayDestroy(pRecycleList);
×
1214
    tdbTbcClose(pCur);
×
1215
    return;
×
1216
  }
1217

1218
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
290✔
1219
    if (pVal == NULL || vLen == 0) {
8!
1220
      break;
1221
    }
1222

1223
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
8!
1224
    if (pTask == NULL) {
8!
1225
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1226
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
×
1227
      break;
×
1228
    }
1229

1230
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
8✔
1231
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
8!
1232
      tDecoderClear(&decoder);
×
1233
      tFreeStreamTask(pTask);
×
1234
      stError(
×
1235
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
1236
          "stream manually",
1237
          vgId, tsDataDir);
1238
      break;
×
1239
    }
1240
    tDecoderClear(&decoder);
8✔
1241

1242
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
8!
1243
      int32_t taskId = pTask->id.taskId;
×
1244
      STaskId id = streamTaskGetTaskId(pTask);
×
1245

1246
      tFreeStreamTask(pTask);
×
1247
      void* px = taosArrayPush(pRecycleList, &id);
×
1248
      if (px == NULL) {
×
1249
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
×
1250
      }
1251

1252
      int32_t total = taosArrayGetSize(pRecycleList);
×
1253
      stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
×
1254
      continue;
×
1255
    }
1256

1257
    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
8!
1258
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
1259

1260
    // do duplicate task check.
1261
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
8✔
1262
    void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
8✔
1263
    if (p == NULL) {
8!
1264
      code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
8✔
1265
      if (code < 0) {
8!
1266
        stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
×
1267
        tFreeStreamTask(pTask);
×
1268
        continue;
×
1269
      }
1270

1271
      void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
8✔
1272
      if (px == NULL) {
8!
1273
        stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
×
1274
      }
1275
    } else {
1276
      // todo this should replace the existed object put by replay creating stream task msg from mnode
1277
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
×
1278
      tFreeStreamTask(pTask);
×
1279
      continue;
×
1280
    }
1281

1282
    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);
8✔
1283

1284
    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
8!
1285
      int64_t refId = pTask->id.refId;
×
1286
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
×
1287
              pTask->id.taskId, tstrerror(terrno), refId);
1288

1289
      void*   px = taosArrayPop(pMeta->pTaskList);
×
1290
      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
1291
      if (ret != 0) {
×
1292
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
×
1293
      }
1294
      continue;
×
1295
    }
1296

1297
    stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
8!
1298
    if (pTask->info.fillHistory == 0) {
8!
1299
      int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
8✔
1300
    }
1301

1302
    if (streamTaskShouldPause(pTask)) {
8!
1303
      int32_t val = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
×
1304
    }
1305
  }
1306

1307
  tdbFree(pKey);
282✔
1308
  tdbFree(pVal);
282✔
1309

1310
  tdbTbcClose(pCur);
282✔
1311

1312
  dropHistoryTaskIfNoStreamTask(pMeta, pRecycleList);
282✔
1313

1314
  if (taosArrayGetSize(pRecycleList) > 0) {
282!
1315
    for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
×
1316
      STaskId* pId = taosArrayGet(pRecycleList, i);
×
1317
      code = streamMetaRemoveTaskInMeta(pMeta, pId);
×
1318
      if (code) {
×
1319
        stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
×
1320
      }
1321
    }
1322
  }
1323

1324
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
282✔
1325
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
282✔
1326
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
1327

1328
  taosArrayDestroy(pRecycleList);
282✔
1329
  code = streamMetaCommit(pMeta);
282✔
1330
  if (code) {
282!
1331
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
×
1332
  }
1333
}
1334

1335
// Shut down the stream side of a closing vnode:
//   1. set pMeta->closeFlag and wait for the meta heartbeat timer to quit,
//   2. under the meta read lock, obtain a snapshot of the task list via
//      streamMetaSendMsgBeforeCloseTasks(),
//   3. stop every task in the snapshot and remove its ref from streamTaskRefPool,
//   4. stop the scan timer if one is set.
// Failures are logged; nothing is returned to the caller.
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  pMeta->closeFlag = true;
  streamMetaWaitForHbTmrQuit(pMeta);

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // report the failure instead of dropping it silently; the code below still runs with whatever
    // pTaskList holds, exactly as before
    stError("vgId:%d failed to prepare the task list before closing tasks, code:%s", vgId, tstrerror(code));
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;  // the task cannot be acquired any more, nothing to stop
    }

    // remember the refId before the reference is released
    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);

  if (pMeta->scanInfo.scanTimer != NULL) {
    streamTmrStop(pMeta->scanInfo.scanTimer);
    pMeta->scanInfo.scanTimer = NULL;
  }

  streamMetaRUnLock(pMeta);
}
463✔
1393

1394
// Start the meta heartbeat: a heap-allocated copy of pMeta->rid is registered with metaRefMgtAdd()
// and then handed to streamMetaHbToMnode(). If the allocation or the registration fails, no
// heartbeat is started.
// NOTE(review): when metaRefMgtAdd() fails the allocated id is not freed here -- confirm whether
// metaRefMgtAdd() takes ownership on failure.
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRefId = taosMemoryMalloc(sizeof(*pRefId));
  if (pRefId == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRefId = pMeta->rid;

  if (metaRefMgtAdd(pMeta->vgId, pRefId) != 0) {
    return;
  }

  streamMetaHbToMnode(pRefId, NULL);
}
1409

1410
// Produce a copy of pMeta->pTaskList in *pList and, when pMeta->sendMsgBeforeClosing is set, mark the
// checkpoint of every task as failed and send one heartbeat through streamMetaSendHbHelper().
// No meta lock is taken here (tasks are acquired with the NoLock variant).
//
// Returns terrno if the list cannot be duplicated, TSDB_CODE_SUCCESS otherwise; the result of the
// heartbeat itself is not propagated. The caller owns *pList.
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);

  SArray* pSnapshot = taosArrayDup(pMeta->pTaskList, NULL);
  if (pSnapshot == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pSnapshot;

  if (!pMeta->sendMsgBeforeClosing) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t total = taosArrayGetSize(pSnapshot);
  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamTaskId* pEntry = taosArrayGet(pSnapshot, idx);
    SStreamTask*   pTask = NULL;

    // a task that cannot be acquired is simply skipped
    if (streamMetaAcquireTaskNoLock(pMeta, pEntry->streamId, pEntry->taskId, &pTask) == TSDB_CODE_SUCCESS) {
      streamTaskSetCheckpointFailed(pTask);
      streamMetaReleaseTask(pMeta, pTask);
    }
  }

  int32_t hbCode = streamMetaSendHbHelper(pMeta);
  (void)hbCode;  // the heartbeat result is deliberately not returned

  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // always return true
}
1449

1450
// Record a new term (stored in pMeta->stage) and role for this vnode's stream meta.
//
// Under the meta write lock:
//   - sendMsgBeforeClosing is set when the previous role was leader,
//   - when the role flips between leader and follower and at least one task exists, the first task
//     in pTaskList is used to call streamTaskAddIntoNodeUpdateList(),
//   - a non-leader resets the start info; a follower that becomes leader sets tasksWillRestart.
// After the lock is released, a leader starts the meta heartbeat via streamMetaStartHb().
//
// pMeta     stream meta to update
// term      new term value
// isLeader  true if the vnode is now the leader, false for follower
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t term, bool isLeader) {
  streamMetaWLock(pMeta);

  int64_t prevTerm = pMeta->stage;
  int32_t prevRole = pMeta->role;

  pMeta->stage = term;
  pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;

  // mark the sign to send msg before close all tasks
  // 1. for a leader vnode, always send msg before closing itself
  // 2. for a follower vnode, if it's changed from a leader, also sending msg before closing.
  if (prevRole == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  if ((prevRole == NODE_ROLE_FOLLOWER || prevRole == NODE_ROLE_LEADER) && (prevRole != pMeta->role) &&
      (taosArrayGetSize(pMeta->pTaskList) > 0)) {
    SStreamTask*   pTask = NULL;
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, 0);

    // pTaskList stores SStreamTaskId entries; build a real STaskId for the hash lookup instead of
    // reinterpreting the entry, so the key bytes never depend on the entry's layout or padding.
    STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};

    int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
    if (code == 0) {
      stInfo("vgId:%d role changed, added into nodeUpdate list, use s-task:0x%s", pMeta->vgId, pTask->id.idStr);
      (void)streamTaskAddIntoNodeUpdateList(pTask, pMeta->vgId);
      streamMetaReleaseTask(pMeta, pTask);
    }
  }

  if (!isLeader) {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
  } else {  // wait for nodeep update if become leader from follower
    if (prevRole == NODE_ROLE_FOLLOWER) {
      pMeta->startInfo.tasksWillRestart = 1;
    }
  }

  streamMetaWUnLock(pMeta);

  if (isLeader) {
    if (prevRole == NODE_ROLE_FOLLOWER) {
      stInfo("vgId:%d update term:%" PRId64 ", prevTerm:%" PRId64
             " prevRole:%d leader:%d, start to send Hb, rid:%" PRId64 " restart after nodeEp being updated",
             pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->rid);
    } else {
      stInfo("vgId:%d update term:%" PRId64 ", prevTerm:%" PRId64
             " prevRole:%d leader:%d, start to send Hb, rid:%" PRId64,
             pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->rid);
    }
    streamMetaStartHb(pMeta);
  } else {
    stInfo("vgId:%d update term:%" PRId64 " prevTerm:%" PRId64 " prevRole:%d leader:%d sendMsg beforeClosing:%d",
           pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->sendMsgBeforeClosing);
  }
}
350✔
1505

1506
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
4✔
1507
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
4✔
1508
  for (int32_t i = 0; i < num; ++i) {
11✔
1509
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
7✔
1510
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
7✔
1511
    SStreamTask*   pTask = NULL;
7✔
1512
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
7✔
1513

1514
    if (code == 0) {
7!
1515
      if (pTask->status.downstreamReady == 0) {
7!
1516
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
×
1517
        return false;
×
1518
      }
1519
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
7✔
1520
    }
1521
  }
1522

1523
  return true;
4✔
1524
}
1525

1526
// Call streamTaskResetStatus() on every task of pMeta->pTaskList that can be acquired.
// Tasks that cannot be acquired are skipped. Always reports success.
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask) != 0) {
      continue;
    }

    streamTaskResetStatus(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  return 0;
}
1547

1548
// Record in pMeta->updateInfo.pTasks that pTask (and pHTask, when given) has been handled for the
// update transaction transId. The hash set is keyed by {streamId, taskId, transId}.
//
// pTask    the task to record
// pHTask   optional second task to record alongside pTask; may be NULL
// transId  id of the update transaction
// startTs  start timestamp in ms, used only to log the elapsed time
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
                                     int64_t startTs) {
  const char* idStr = pTask->id.idStr;
  int32_t     vgId = pMeta->vgId;

  // keep the already updated info
  STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
  int32_t          rc = taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
  if (rc != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;

  if (pHTask == NULL) {
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
            idStr, vgId, transId, elapsed);
    return;
  }

  STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
  rc = taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
  if (rc != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
    return;
  }

  stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
          " ms",
          idStr, vgId, transId, elapsed);
}
5✔
1577

1578
// Close the currently active update transaction: clear the recorded tasks (hash and list), promote
// activeTransId to completeTransId, reset activeTransId to -1 and stamp completeTs with the current
// time in ms.
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpdate = &pMeta->updateInfo;

  // values captured for the log line before they are overwritten
  int32_t numUpdated = taosArrayGetSize(pUpdate->pTaskList);
  int32_t prevComplete = pUpdate->completeTransId;

  taosHashClear(pUpdate->pTasks);
  taosArrayClear(pUpdate->pTaskList);

  pUpdate->completeTransId = pUpdate->activeTransId;
  pUpdate->activeTransId = -1;
  pUpdate->completeTs = taosGetTimestampMs();

  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64
          ", complete transId:%d->%d, update Tasks:%d reset active transId",
          pMeta->vgId, pUpdate->completeTs, prevComplete, pUpdate->completeTransId, numUpdated);
}
3✔
1594

1595
// Decide whether an update message carrying transId should be processed, and make it the active
// transaction when appropriate.
//
// Returns true when the message should be handled:
//   - transId is newer than completeTransId and no transaction is active: it becomes active
//     (and a running "start all tasks" procedure is flagged as partially started),
//   - transId equals the active one: nothing changes,
//   - transId is newer than the active one: it replaces the active one.
// Returns false (message discarded) when transId is older than the active transaction, or is not
// newer than the last completed transaction.
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;

  if (transId > pInfo->completeTransId) {
    if (pInfo->activeTransId == -1) {
      taosHashClear(pInfo->pTasks);
      pInfo->activeTransId = transId;

      // interrupt the start all tasks procedure, only partial tasks will be started
      // the completion of this processed is based on the partial started tasks.
      if (pMeta->startInfo.startAllTasks == 1) {
        int32_t num = taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
        pMeta->startInfo.partialTasksStarted = true;
        stInfo(
            "vgId:%d set the active epset update transId:%d, prev complete transId:%d, start all interrupted, only %d "
            "tasks were started",
            pMeta->vgId, transId, pInfo->completeTransId, num);
      } else {
        stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId,
               pInfo->completeTransId);
      }
      return true;
    } else {
      if (pInfo->activeTransId == transId) {
        // do nothing
        return true;
      } else if (transId < pInfo->activeTransId) {
        stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard",
                pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
        return false;
      } else {  // transId > pInfo->activeTransId
        taosHashClear(pInfo->pTasks);
        int32_t prev = pInfo->activeTransId;
        pInfo->activeTransId = transId;

        // "from" is the previous active id, "to" is the new one
        stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId,
               prev, transId, pInfo->completeTransId);
        return true;
      }
    }
  } else if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  } else {  // pInfo->completeTransId > transId
    // the message text reports the previously completed transId, so print completeTransId
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc