• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3831

02 Apr 2025 01:14AM UTC coverage: 34.081% (-0.02%) from 34.097%
#3831

push

travis-ci

happyguoxy
test:alter gcda dir

148596 of 599532 branches covered (24.79%)

Branch coverage included in aggregate %.

222550 of 489473 relevant lines covered (45.47%)

1589752.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.74
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tsched.h"
21
#include "tstream.h"
22
#include "ttimer.h"
23
#include "wal.h"
24

25
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
26

27
int32_t streamBackendId = 0;
28
int32_t streamBackendCfWrapperId = 0;
29
int32_t taskDbWrapperId = 0;
30
int32_t streamTaskRefPool = 0;
31

32
static int32_t streamMetaBegin(SStreamMeta* pMeta);
33
static void    streamMetaCloseImpl(void* arg);
34

35
typedef struct {
36
  TdThreadMutex mutex;
37
  SHashObj*     pTable;
38
} SMetaRefMgt;
39

40
SMetaRefMgt gMetaRefMgt;
41

42
int32_t metaRefMgtInit();
43
void    metaRefMgtCleanup();
44

45
static void streamMetaEnvInit() {
405✔
46
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
405✔
47
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
405✔
48
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
405✔
49

50
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
405✔
51
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
405✔
52

53
  int32_t code = metaRefMgtInit();
405✔
54
  if (code) {
405!
55
    stError("failed to init stream meta mgmt env, start failed");
×
56
    return;
×
57
  }
58

59
  code = streamTimerInit();
405✔
60
  if (code) {
405!
61
    stError("failed to init stream meta env, start failed");
×
62
  }
63
}
64

65
void streamMetaInit() {
411✔
66
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
411✔
67
  if (code) {
411!
68
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
69
  }
70
}
411✔
71

72
void streamMetaCleanup() {
210✔
73
  taosCloseRef(streamBackendId);
210✔
74
  taosCloseRef(streamBackendCfWrapperId);
210✔
75
  taosCloseRef(streamMetaRefPool);
210✔
76
  taosCloseRef(streamTaskRefPool);
210✔
77

78
  metaRefMgtCleanup();
210✔
79
  streamTimerCleanUp();
210✔
80
}
210✔
81

82
int32_t metaRefMgtInit() {
405✔
83
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
405✔
84
  if (code) {
405!
85
    return code;
×
86
  }
87

88
  if (code == 0) {
405!
89
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
405✔
90
  }
91

92
  if (gMetaRefMgt.pTable == NULL) {
405!
93
    return terrno;
×
94
  } else {
95
    return code;
405✔
96
  }
97
}
98

99
void metaRefMgtCleanup() {
210✔
100
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
210✔
101
  while (pIter) {
1,740✔
102
    int64_t* p = *(int64_t**)pIter;
1,530✔
103
    taosMemoryFree(p);
1,530!
104
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
1,530✔
105
  }
106

107
  taosHashCleanup(gMetaRefMgt.pTable);
210✔
108
  streamMutexDestroy(&gMetaRefMgt.mutex);
210✔
109
}
210✔
110

111
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
2,631✔
112
  int32_t code = 0;
2,631✔
113
  void*   p = NULL;
2,631✔
114

115
  streamMutexLock(&gMetaRefMgt.mutex);
2,631✔
116

117
  p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
2,631✔
118
  if (p == NULL) {
2,631!
119
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
2,631✔
120
    if (code) {
2,631!
121
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
×
122
              tstrerror(code));
123
      return code;
×
124
    } else {  // not
125
              //      stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
126
    }
127
  } else {
128
    stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
×
129
  }
130

131
  streamMutexUnlock(&gMetaRefMgt.mutex);
2,631✔
132
  return code;
2,631✔
133
}
134

135
void metaRefMgtRemove(int64_t* pRefId) {
1,077✔
136
  streamMutexLock(&gMetaRefMgt.mutex);
1,077✔
137

138
  int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
1,077✔
139
  taosMemoryFree(pRefId);
1,077!
140
  streamMutexUnlock(&gMetaRefMgt.mutex);
1,077✔
141
}
1,077✔
142

143
// Opens the tdb environment at pMeta->path and its two tables, "task.db" and
// "checkpoint.db". Returns 0 on success and -1 as soon as any open fails;
// handles opened before the failure are left in pMeta for the caller.
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t ret = tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL);
  if (ret < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
    return -1;
  }

  ret = tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  ret = tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  return 0;
}
161

162
//
163
// impl later
164
//
165
// Result of streamMetaCheckBackendCompatible(); values match the implicit
// enumeration order (0, 1, 2).
enum STREAM_STATE_VER {
  STREAM_STATA_NO_COMPATIBLE = 0,  // stored task data cannot be used by this version
  STREAM_STATA_COMPATIBLE = 1,     // nothing to do
  STREAM_STATA_NEED_CONVERT = 2,   // backend data must go through streamMetaCvtDbFormat()
};
170

171
// Inspects the task table to decide whether the stored stream state can be used
// as-is. Only the first record that decodes successfully is classified; records
// that fail to decode are skipped. Returns one of enum STREAM_STATE_VER, and
// STREAM_STATA_COMPATIBLE whenever the table cannot be read.
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  result = STREAM_STATA_COMPATIBLE;
  TBC*    pCursor = NULL;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int32_t keyLen = 0;
  int32_t valLen = 0;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {  // no task info, no stream
    return result;
  }

  int32_t code = tdbTbcMoveToFirst(pCursor);
  if (code != 0) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCursor);
    return result;
  }

  for (;;) {
    if (tdbTbcNext(pCursor, &pKey, &keyLen, &pVal, &valLen) != 0) {
      break;
    }

    if (pVal == NULL || valLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, valLen);

    bool decoded = !(tDecodeStreamTaskChkInfo(&decoder, &info) < 0);
    if (decoded) {
      if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
        result = STREAM_STATA_NO_COMPATIBLE;
      } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
        result = STREAM_STATA_NEED_CONVERT;
      }
    }

    tDecoderClear(&decoder);
    if (decoded) {
      break;  // the first decodable record decides the result
    }
  }

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);
  return result;
}
220

221
// Converts the stream state stored under pMeta->path for the latest checkpoint.
//
// If no backend data exists for that checkpoint, returns whatever terrno holds
// after the existence probe (it is reset to 0 before the probe). Otherwise every
// entry of the backend's cfInst table is passed to streamStateCvtDataFormat();
// the first failure stops the loop and its code is returned. After a successful
// conversion the "<path>/state" directory is removed.
//
// Returns 0 on success or the first error code encountered. Failing to allocate
// the path buffer for the directory removal is logged but does not change the
// return value.
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (exist == false) {
    code = terrno;
    return code;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("vgId:%d failed to cvt data", pMeta->vgId);
      break;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    int32_t len = strlen(pMeta->path) + 32;
    char*   state = taosMemoryCalloc(1, len);
    if (state != NULL) {
      (void)snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // code is 0 on this path, so report the allocation error recorded in terrno
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
267

268
// Runs the compatibility check and acts on it: converts the backend format when
// required, returns -1 for incompatible data, and 0 otherwise.
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t state = streamMetaCheckBackendCompatible(pMeta);

  switch (state) {
    case STREAM_STATA_NEED_CONVERT:
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);

    case STREAM_STATA_NO_COMPATIBLE:
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;

    case STREAM_STATA_COMPATIBLE:
    default:
      return 0;
  }
}
286

287
// Returns 1 when the task's fillHistory field equals 2, otherwise 0.
int8_t streamTaskShouldRecalated(SStreamTask* pTask) {
  if (pTask->info.fillHistory == 2) {
    return 1;
  }
  return 0;
}
×
288

289
// Attaches a task-db backend to pTask, sharing an already opened one when the
// key is present in pMeta->pTaskDbUnique and opening a new one otherwise.
//
// key:       lookup key in pTaskDbUnique (streamId--taskId).
// recalated: non-zero stores the backend in pTask->pRecalBackend, zero in
//            pTask->pBackend.
//
// The whole lookup/open/insert sequence runs under pMeta->backendMutex; the
// mutex is dropped only while sleeping between taskDbOpen() retries, which
// continue until the open succeeds. Returns TSDB_CODE_FAILED if the existing
// backend cannot be referenced, otherwise 0 (a failed insert into pTaskDbUnique
// is only logged).
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key, uint8_t recalated) {
  int64_t checkpointId = pTask->chkInfo.checkpointId;

  streamMutexLock(&pMeta->backendMutex);

  // fast path: reuse the backend already registered for this key
  void** ppExisting = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
  if (ppExisting != NULL && *ppExisting != NULL) {
    if (taskDbAddRef(*ppExisting) == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pShared = *ppExisting;
    pShared->pMeta = pMeta;
    if (recalated) {
      pTask->pRecalBackend = pShared;
    } else {
      pTask->pBackend = pShared;
    }

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pShared);
    return 0;
  }

  // slow path: open the task db, retrying once per second until it succeeds
  STaskDbWrapper* pOpened = NULL;
  int64_t         processVer = -1;
  int32_t         code = taskDbOpen(pMeta->path, key, checkpointId, &processVer, &pOpened);
  while (code != 0) {
    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);

    code = taskDbOpen(pMeta->path, key, checkpointId, &processVer, &pOpened);
  }

  int64_t backendRef = taosAddRef(taskDbWrapperId, pOpened);
  if (recalated) {
    pTask->pRecalBackend = pOpened;
  } else {
    pTask->pBackend = pOpened;
  }
  pOpened->refId = backendRef;
  pOpened->pTask = pTask;
  pOpened->pMeta = pMeta;

  // processVer stays -1 unless taskDbOpen() reported a version
  if (processVer != -1) {
    if (pTask->chkInfo.processedVer == processVer) {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    } else {
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    }
  }

  code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pOpened, sizeof(void*));
  if (code != 0) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  if (recalated) {
    stDebug("s-task:0x%x set recalated backend %p", pTask->id.taskId, pOpened);
  } else {
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pOpened);
  }
  return 0;
}
371

372
void streamMetaRemoveDB(void* arg, char* key) {
48✔
373
  if (arg == NULL || key == NULL) return;
48!
374

375
  SStreamMeta* pMeta = arg;
42✔
376
  streamMutexLock(&pMeta->backendMutex);
42✔
377
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
42✔
378
  if (code) {
42!
379
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
380
  }
381

382
  streamMutexUnlock(&pMeta->backendMutex);
42✔
383
}
384

385
// Creates the hash table and the int32_t array of a task-update info.
// Returns terrno when either allocation fails; a table created before a failed
// array allocation is left in pInfo for the caller to release.
int32_t streamMetaUpdateInfoInit(STaskUpdateInfo* pInfo) {
  pInfo->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pInfo->pTasks == NULL) {
    return terrno;
  }

  pInfo->pTaskList = taosArrayInit(4, sizeof(int32_t));
  return (pInfo->pTaskList == NULL) ? terrno : TSDB_CODE_SUCCESS;
}
400

401
// Releases both containers of a task-update info and resets the pointers.
void streamMetaUpdateInfoCleanup(STaskUpdateInfo* pInfo) {
  taosHashCleanup(pInfo->pTasks);
  pInfo->pTasks = NULL;

  taosArrayDestroy(pInfo->pTaskList);
  pInfo->pTaskList = NULL;
}
837✔
407

408

409

410
// Allocates and initializes the stream meta stored under "<path>/stream" and
// returns it through *p.
//
// path:         parent directory; "stream" is appended to form pMeta->path.
// ahandle, buildTaskFn, expandTaskFn, stage, fn: stored in the meta as given.
// vgId:         owner id; SNODE_HANDLE starts in NODE_ROLE_LEADER, anything
//               else in NODE_ROLE_UNINIT.
// p:            out parameter, written only on success.
//
// Steps: open the tdb files, convert the backend format if required, create the
// rwlock, begin the meta transaction, build the in-memory containers, then
// register the meta in streamMetaRefPool and the ref-id registry and create the
// heartbeat info.
//
// Returns 0 on success. On failure the partially built meta is released at
// _err and the failing code is returned.
// NOTE(review): _err frees pMeta directly. It does not destroy pMeta->lock or
// pMeta->backendMutex, and it does not undo taosAddRef() when a later step
// fails -- confirm whether those paths need additional cleanup.
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
  QRY_PARAM_CHECK(p);
  int32_t code = 0;
  int32_t lino = 0;

  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
    return terrno;
  }

  int32_t len = strlen(path) + 64;
  char*   tpath = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);

  (void)snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
  pMeta->path = tpath;

  code = streamMetaOpenTdb(pMeta);
  TSDB_CHECK_CODE(code, lino, _err);

  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
    // pMeta->vgId is not assigned yet at this point, so log the parameter
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", vgId, tstrerror(terrno));
    TSDB_CHECK_CODE(code, lino, _err);
  }

  // set the attribute when running on Linux OS
  TdThreadRwlockAttr attr;
  code = taosThreadRwlockAttrInit(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

#ifdef LINUX
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
  TSDB_CHECK_CODE(code, lino, _err);
#endif

  code = taosThreadRwlockInit(&pMeta->lock, &attr);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadRwlockAttrDestroy(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

  // keep the real error code: assign first, compare afterwards
  code = streamMetaBegin(pMeta);
  if (code < 0) {
    stError("vgId:%d begin trans for stream meta failed", vgId);
    lino = __LINE__;
    goto _err;
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);

  code = streamMetaUpdateInfoInit(&pMeta->updateInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  code = streamMetaInitStartInfo(&pMeta->startInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);

  pMeta->scanInfo.scanSentinel = 0;
  pMeta->scanInfo.lastScanTs = 0;
  pMeta->scanInfo.tickCounter = 0;

  pMeta->vgId = vgId;
  pMeta->ahandle = ahandle;
  pMeta->buildTaskFn = buildTaskFn;
  pMeta->expandTaskFn = expandTaskFn;
  pMeta->stage = stage;
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
  pMeta->updateInfo.activeTransId = -1;
  pMeta->updateInfo.completeTransId = -1;

  pMeta->startInfo.completeFn = fn;
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);

  pMeta->numOfPausedTasks = 0;
  pMeta->numOfStreamTasks = 0;
  pMeta->closeFlag = false;

  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);

  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _err);

  // add refId at the end of initialization function
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);

  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);

  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));

  code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code != 0) {
    // the registry did not take the pointer, so it must be released here
    taosMemoryFree(pRid);
  }
  TSDB_CHECK_CODE(code, lino, _err);

  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);

  TSDB_CHECK_CODE(code, lino, _err);

  *p = pMeta;
  return code;

_err:
  taosMemoryFree(pMeta->path);
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
  if (pMeta->pTaskDbUnique) taosHashCleanup(pMeta->pTaskDbUnique);
  if (pMeta->pTaskDb) {
    tdbTbClose(pMeta->pTaskDb);
    pMeta->pTaskDb = NULL;
  }
  if (pMeta->pCheckpointDb) {
    tdbTbClose(pMeta->pCheckpointDb);
  }
  if (pMeta->db) {
    tdbClose(pMeta->db);
  }

  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
  if (pMeta->updateInfo.pTaskList) taosArrayDestroy(pMeta->updateInfo.pTaskList);
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);

  if (pMeta->startInfo.pStagesList) taosArrayDestroy(pMeta->startInfo.pStagesList);
  if (pMeta->startInfo.pRecvChkptIdTasks) taosArrayDestroy(pMeta->startInfo.pRecvChkptIdTasks);
  taosMemoryFree(pMeta);

  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
  return code;
}
547

548
// todo refactor: the lock should be restricted in one function
549
#ifdef BUILD_NO_CALL
550
// Initializes pMeta->streamBackend, retrying every 100ms until it succeeds, then
// registers it in the streamBackendId pool and loads the checkpoint info.
// When the first attempt fails, the retry that finally succeeds leaves the loop
// with the meta write lock still held.
void streamMetaInitBackend(SStreamMeta* pMeta) {
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);

  while (pMeta->streamBackend == NULL) {
    streamMetaWLock(pMeta);
    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
    if (pMeta->streamBackend != NULL) {
      break;  // exits without unlocking
    }

    streamMetaWUnLock(pMeta);
    stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
    taosMsleep(100);
  }

  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
  streamBackendLoadCheckpointInfo(pMeta);
}
569
#endif
570

571
// Drops every task registered in this meta and resets the in-memory bookkeeping
// (task map/list, checkpoint arrays, counters and start info). The caller
// visible in this file holds the meta write lock around the call.
void streamMetaClear(SStreamMeta* pMeta) {
  int64_t startTs = taosGetTimestampMs();

  // remove all existed tasks in this vnode
  for (void* pEntry = taosHashIterate(pMeta->pTasksMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasksMap, pEntry)) {
    int64_t      refId = *(int64_t*)pEntry;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    // release the ref by timer
    if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {  // one more ref in timer
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }

    // pairs with the taosAcquireRef() above
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, refId);
    }
  }

  int64_t tasksDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear task map, elapsed time:%.2fs", pMeta->vgId, (tasksDoneTs - startTs)/1000.0);

  if (pMeta->streamBackendRid != 0) {
    if (taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
      stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
    }
  }

  int64_t backendDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear backend completed, elapsed time:%.2fs", pMeta->vgId, (backendDoneTs - tasksDoneTs)/1000.0);

  taosHashClear(pMeta->pTasksMap);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // NOTE: the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  taosArrayClear(pMeta->startInfo.pStagesList);
  taosArrayClear(pMeta->startInfo.pRecvChkptIdTasks);

  pMeta->startInfo.readyTs = 0;
  pMeta->startInfo.elapsedTime = 0;
  pMeta->startInfo.startTs = 0;
}
846✔
633

634
// Removes the meta's entry from streamMetaRefPool; that pool was opened with
// streamMetaCloseImpl as its callback. A NULL pMeta is a no-op.
void streamMetaClose(SStreamMeta* pMeta) {
  if (pMeta != NULL) {
    stDebug("vgId:%d start to close stream meta", pMeta->vgId);

    int32_t ret = taosRemoveRef(streamMetaRefPool, pMeta->rid);
    if (ret != 0) {
      stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(ret));
    }
  }
}
645

646
void streamMetaCloseImpl(void* arg) {
837✔
647
  SStreamMeta* pMeta = arg;
837✔
648
  if (pMeta == NULL) {
837!
649
    return;
×
650
  }
651

652
  int32_t code = 0;
837✔
653
  int32_t vgId = pMeta->vgId;
837✔
654
  stDebug("vgId:%d start to do-close stream meta", vgId);
837✔
655

656
  streamMetaWLock(pMeta);
837✔
657
  streamMetaClear(pMeta);
837✔
658
  streamMetaWUnLock(pMeta);
837✔
659

660
  // already log the error, ignore here
661
  tdbAbort(pMeta->db, pMeta->txn);
837✔
662
  tdbTbClose(pMeta->pTaskDb);
837✔
663
  tdbTbClose(pMeta->pCheckpointDb);
837✔
664
  tdbClose(pMeta->db);
837✔
665

666
  taosArrayDestroy(pMeta->pTaskList);
837✔
667
  taosArrayDestroy(pMeta->chkpSaved);
837✔
668
  taosArrayDestroy(pMeta->chkpInUse);
837✔
669

670
  taosHashCleanup(pMeta->pTasksMap);
837✔
671
  taosHashCleanup(pMeta->pTaskDbUnique);
837✔
672

673
  streamMetaUpdateInfoCleanup(&pMeta->updateInfo);
837✔
674
  streamMetaClearStartInfo(&pMeta->startInfo);
837✔
675

676
  destroyMetaHbInfo(pMeta->pHbInfo);
837✔
677
  pMeta->pHbInfo = NULL;
837✔
678

679
  taosMemoryFree(pMeta->path);
837!
680
  streamMutexDestroy(&pMeta->backendMutex);
837✔
681

682
  bkdMgtDestroy(pMeta->bkdChkptMgt);
837✔
683

684
  pMeta->role = NODE_ROLE_UNINIT;
837✔
685
  code = taosThreadRwlockDestroy(&pMeta->lock);
837✔
686
  if (code) {
837!
687
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
×
688
  }
689

690
  taosMemoryFree(pMeta);
837!
691
  stDebug("vgId:%d end to close stream meta", vgId);
837✔
692
}
693

694
// todo let's check the status for each task
695
// Encodes pTask and upserts it into pMeta->pTaskDb under the key
// {streamId, taskId}, inside the current meta transaction (pMeta->txn).
//
// Returns:
//   0                      on success;
//   -1                     when the encoded size cannot be computed;
//   terrno                 when the buffer allocation or the upsert fails;
//   TSDB_CODE_INVALID_MSG  when encoding into the buffer fails.
//
// Side effects: a task whose ver is below SSTREAM_TASK_SUBTABLE_CHANGED_VER is
// bumped to SSTREAM_TASK_VER before encoding. When the upsert fails, the task's
// ref is removed from streamTaskRefPool. The encode buffer is released on every
// path.
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t vgId = pTask->pMeta->vgId;
  void*   buf = NULL;
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return terrno;
  }

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  // same failure test as the sizing pass above; buf must not leak on this path
  if (code < 0) {
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    taosMemoryFree(buf);
    return TSDB_CODE_INVALID_MSG;
  }

  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};

  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
  if (code != TSDB_CODE_SUCCESS) {
    code = terrno;
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
            vgId, pTask->id.refId, tstrerror(code));

    // copy the ref id first: pTask is not touched again after taosRemoveRef()
    int64_t refId = pTask->id.refId;
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
    if (ret != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
    }
  } else {
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
  }

  taosMemoryFree(buf);
  return code;
}
744

745
// Deletes the {streamId, taskId} record from pMeta->pTaskDb inside the current
// meta transaction and returns the tdbTbDelete() result.
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
  int64_t dbKey[2] = {pTaskId->streamId, pTaskId->taskId};

  int32_t ret = tdbTbDelete(pMeta->pTaskDb, dbKey, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (ret == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
            tstrerror(terrno));
  }

  return ret;
}
757

758
// add to the ready tasks hash map, not the restored tasks hash map
759
// Registers pTask in the meta: builds it through pMeta->buildTaskFn, adds it to
// the task list and task map, persists it and commits the meta.
//
// *pAdded is set to true only when every step succeeded. If the task id is
// already in the map, pTask is freed and 0 is returned with *pAdded == false.
// On failure the steps already performed are undone and the error code is
// returned.
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
  *pAdded = false;

  STaskId taskKey = streamTaskGetTaskId(pTask);
  if (taosHashGet(pMeta->pTasksMap, &taskKey, sizeof(taskKey)) != NULL) {
    stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", taskKey.taskId);
    tFreeStreamTask(pTask);
    return 0;
  }

  int32_t code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver);
  if (code != 0) {
    tFreeStreamTask(pTask);
    return code;
  }

  if (taosArrayPush(pMeta->pTaskList, &pTask->id) == NULL) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", taskKey.taskId);
    tFreeStreamTask(pTask);
    return terrno;
  }

  // from here on the task is released through the ref pool, not freed directly
  int64_t refId = taosAddRef(streamTaskRefPool, pTask);
  pTask->id.refId = refId;

  code = taosHashPut(pMeta->pTasksMap, &taskKey, sizeof(taskKey), &pTask->id.refId, sizeof(int64_t));
  if (code != 0) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", taskKey.taskId);
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)taskKey.taskId, refId);
    }
    return code;
  }

  // persist, then commit; both failures share the same rollback
  code = streamMetaSaveTaskInMeta(pMeta, pTask);
  if (code == 0) {
    code = streamMetaCommit(pMeta);
  }

  if (code != 0) {
    (void)taosHashRemove(pMeta->pTasksMap, &taskKey, sizeof(taskKey));
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }
    return code;
  }

  if (pTask->info.fillHistory == 0) {
    (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  *pAdded = true;
  return code;
}
828

829
// Returns the number of tasks in the task map; a mismatch with the size of the
// task list is logged as an error.
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
  int32_t numInMap = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t numInList = taosArrayGetSize(pMeta->pTaskList);

  if (numInMap != numInList) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, numInList, numInMap);
  }

  return numInMap;
}
838

839
// Looks up the task {streamId, taskId} in pMeta->pTasksMap and acquires a ref
// on it from streamTaskRefPool. This function takes no lock itself.
//
// On success *pTask holds the acquired task and TSDB_CODE_SUCCESS is returned.
// TSDB_CODE_STREAM_TASK_NOT_EXIST is returned when the id is not in the map,
// the ref can no longer be acquired, the task's own refId differs from the
// mapped one, or streamTaskShouldStop() reports true; in the last two cases the
// ref acquired here is released again before returning.
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);
  STaskId  id = {.streamId = streamId, .taskId = taskId};
  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
  if (p == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, *pTaskRefId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (p->id.refId != *pTaskRefId) {
    // "task refId" is the id stored in the task, "try acquire" the id from the map
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, p->id.refId,
            *pTaskRefId);
    int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId);
    if (ret) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId);
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (streamTaskShouldStop(p)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", p->id.idStr);
    int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId);
    if (ret) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stDebug("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
  *pTask = p;
  return TSDB_CODE_SUCCESS;
}
877

878
// Same lookup/acquire sequence as streamMetaAcquireTaskNoLock(), keyed by a
// ready-made STaskId and without the streamTaskShouldStop() check. This function
// takes no lock itself.
//
// On success *pTask holds the acquired task and TSDB_CODE_SUCCESS is returned;
// otherwise TSDB_CODE_STREAM_TASK_NOT_EXIST is returned and any ref acquired
// here has been released.
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);
  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));

  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
  if (p == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            *pTaskRefId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (p->id.refId != *pTaskRefId) {
    // "task refId" is the id stored in the task, "try acquire" the id from the map
    stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
            p->id.refId, *pTaskRefId);
    int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId);
    if (ret) {
      stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, *pTaskRefId);
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stDebug("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
  *pTask = p;
  return TSDB_CODE_SUCCESS;
}
908

909
// Read-locked wrapper around streamMetaAcquireTaskNoLock().
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t ret;

  streamMetaRLock(pMeta);
  ret = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return ret;
}
915

916
/*
 * Give back one reference on pTask to streamTaskRefPool.
 * A NULL pTask is accepted and ignored; pMeta is not used. A failed release is only logged.
 */
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    // read the ids up front: they are still needed for logging after the reference is released
    int64_t rid = pTask->id.refId;
    int32_t tid = pTask->id.taskId;

    stDebug("s-task:0x%x release task, refId:%" PRId64, tid, rid);

    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, tid, rid);
    }
  }
}
929

930
/*
 * Remove the first entry of pTaskList whose (streamId, taskId) equals *id.
 * Only the first `num` entries are inspected. If nothing matches, an error is logged.
 */
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  for (int32_t idx = 0; idx < num; ++idx) {
    SStreamTaskId* pCur = taosArrayGet(pTaskList, idx);
    if (pCur->streamId != id->streamId || pCur->taskId != id->taskId) {
      continue;
    }

    // found it: drop the entry and stop, at most one entry is removed
    taosArrayRemove(pTaskList, idx);
    return;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
15✔
945

946
/*
 * Callback passed to streamTaskHandleEventAsync() for the dropping event.
 *
 * For a source task the checkpoint-source response is sent; for any non-sink task that owns an
 * executor the running query is killed. `param` is unused.
 *
 * @return the code of the last operation that was executed (0 if none was executed); an earlier
 *         failure of the checkpoint-source response is overwritten by the qKillTask() result.
 */
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t code = 0;
  bool    isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  bool    hasQuery = (pTask->info.taskLevel != TASK_LEVEL__SINK) && (pTask->exec.pExecutor != NULL);

  if (isSource) {
    code = streamTaskSendCheckpointSourceRsp(pTask);
    if (code != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
              tstrerror(code));
    }
  }

  // stop the query running inside the stream task so that it ends as soon as possible
  if (hasQuery) {
    code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
    }
  }

  return code;
}
966

967
/*
 * Remove a task from the in-memory meta (map + list), from the meta store, and from the ref pool.
 *
 * Steps, in order: acquire the task, fix up the paused/stream task counters, fire the DROPPING
 * event asynchronously, remove the id from pTasksMap and pTaskList, remove the persisted record,
 * stop the delay-schedule timer if one is armed, then remove and release the task reference.
 *
 * No meta lock is taken here; the lookup goes through streamMetaAcquireTaskUnsafe().
 * NOTE(review): presumably the caller holds the meta write lock -- confirm at call sites.
 *
 * @return always 0; individual failures are logged, and a missing task is treated as already dropped.
 */
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  SStreamTask* pTask = NULL;
  int32_t      vgId = pMeta->vgId;
  STaskId      id = {.streamId = streamId, .taskId = taskId};

  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    return 0;
  }

  // desc the paused task counter
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // only non-fill-history tasks are counted in numOfStreamTasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  // the result used to be overwritten without being looked at; report a failed removal
  code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 " from task map, code:%s", vgId, id.taskId, tstrerror(code));
  }

  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

  code = streamMetaRemoveTaskInMeta(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  // sanity check: the map and the list must describe the same set of tasks
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
  if (sizeInList != size) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // remove the ref held by the pool, then give back the ref acquired at the top of this function
  int64_t refId = pTask->id.refId;
  int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
  if (ret != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
1026

1027
/*
 * Currently a no-op: the meta write lock is taken and released, and 0 is returned.
 * streamId and taskId are not used while the body below stays disabled.
 *
 * The original drop-and-commit logic is kept as a comment for reference. The locals it relied on
 * (`vgId`, `numOfTasks`) were removed because they were unused; re-declare them when the logic is
 * re-enabled.
 */
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t code = 0;

  streamMetaWLock(pMeta);

//  int32_t vgId = pMeta->vgId;
//  int32_t numOfTasks = 0;
//
//  code = streamMetaUnregisterTask(pMeta, streamId, taskId);
//  numOfTasks = streamMetaGetNumOfTasks(pMeta);
//  if (code) {
//    stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
//  }
//
//  code = streamMetaCommit(pMeta);
//  if (code) {
//    stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
//  } else {
//    stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
//  }

  streamMetaWUnLock(pMeta);

  return code;
}
1052

1053
/*
 * Open a new write transaction (read-uncommitted) on the meta db and store it in pMeta->txn.
 * The meta write lock is held for the duration of the call. A failure is recorded through
 * streamSetFatalError() and the tdbBegin() code is returned unchanged.
 */
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t ret;

  streamMetaWLock(pMeta);

  ret = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                 TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (ret != 0) {
    streamSetFatalError(pMeta, ret, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return ret;
}
1063

1064
/*
 * Commit the current meta transaction, run the post-commit step, and open the next transaction.
 *
 * Every failing step is recorded through streamSetFatalError() and logged as fatal.
 * As before, a failed tdbCommit() does not stop the sequence: post-commit and begin are still
 * attempted. A failed post-commit returns immediately without opening a new transaction.
 *
 * @return 0 only if all three steps succeeded. Otherwise the code of the failing step; when only
 *         tdbCommit() failed, its code is returned (it used to be silently overwritten by the
 *         results of the later steps, so callers could see success after a failed commit).
 */
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t commitCode = tdbCommit(pMeta->db, pMeta->txn);
  if (commitCode != 0) {
    streamSetFatalError(pMeta, commitCode, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(commitCode),
            pMeta->fatalInfo.line);
  }

  int32_t code = tdbPostCommit(pMeta->db, pMeta->txn);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
            pMeta->fatalInfo.line);
    return code;
  }

  code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                  TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line);
    return code;
  }

  if (commitCode == 0) {
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
  }

  // surface the commit failure even though the later steps went through
  return commitCode;
}
1091

1092
/*
 * Scan every record of pMeta->pTaskDb and return the largest checkpointId found.
 *
 * Records that fail to decode are skipped. Iteration stops at the first record with an empty
 * value. If the cursor cannot be opened or positioned, 0 is returned.
 */
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t  maxChkptId = 0;
  TBC*     pCursor = NULL;
  void*    pKey = NULL;
  void*    pVal = NULL;
  int32_t  kLen = 0;
  int32_t  vLen = 0;
  SDecoder decoder;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return maxChkptId;
  }

  if (tdbTbcMoveToFirst(pCursor) != 0) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCursor);
    return maxChkptId;
  }

  for (;;) {
    if (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) != 0) {
      break;
    }

    if (pVal == NULL || vLen == 0) {
      break;
    }

    SCheckpointInfo chkInfo;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decodeRet = tDecodeStreamTaskChkInfo(&decoder, &chkInfo);
    tDecoderClear(&decoder);

    if (decodeRet < 0) {  // undecodable record, ignore it
      continue;
    }

    if (chkInfo.checkpointId > maxChkptId) {
      maxChkptId = chkInfo.checkpointId;
    }
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, maxChkptId);

  // the key/value buffers handed out by the cursor are freed here, once, after the scan
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);

  return maxChkptId;
}
1138

1139
/*
 * Walk pMeta->pTaskList and recycle every fill-history task whose related stream task
 * (task->streamTaskId) is no longer present in pMeta->pTasksMap and that is not already dropping.
 *
 * Such a task is appended to pRecycleList and removed from pTasksMap; when the map removal
 * succeeds it is removed from pTaskList as well. The reference acquired for the inspection is
 * always released again.
 *
 * @param pMeta         stream meta holding the task list and task map
 * @param pRecycleList  array of STaskId; receives the ids of the recycled tasks
 */
static void dropHistoryTaskIfNoStreamTask(SStreamMeta* pMeta, SArray* pRecycleList) {
  int32_t i = 0;
  while (i < taosArrayGetSize(pMeta->pTaskList)) {
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
    if (pTaskId == NULL) {
      i++;
      continue;
    }

    // Copy the refId out of the list entry now: pTaskId points into the array storage, and the entry
    // may be removed from the array below, after which pTaskId no longer refers to this task.
    int64_t refId = pTaskId->refId;

    SStreamTask* task = taosAcquireRef(streamTaskRefPool, refId);
    if (task == NULL) {
      i++;
      continue;
    }

    // keep the id for logging, the task must not be touched after its reference is released
    int32_t taskId = task->id.taskId;

    bool orphanHistoryTask = (task->info.fillHistory == 1) &&
                             (taosHashGet(pMeta->pTasksMap, &task->streamTaskId, sizeof(STaskId)) == NULL) &&
                             (task->status.taskStatus != TASK_STATUS__DROPPING);

    if (orphanHistoryTask) {
      STaskId id = streamTaskGetTaskId(task);
      if (taosArrayPush(pRecycleList, &id) == NULL) {
        stError("%s s-task:0x%x failed to add into pRecycleList list due to:%d", __FUNCTION__, taskId, terrno);
      } else {
        int32_t total = taosArrayGetSize(pRecycleList);
        stInfo("%s s-task:0x%x is already dropped, add into recycle list, total:%d", __FUNCTION__, taskId, total);
      }

      int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(STaskId));
      if (code == 0) {
        // entry i is gone, the next entry now sits at index i, so i is not advanced
        taosArrayRemove(pMeta->pTaskList, i);
      } else {
        i++;
        stError("%s s-task:0x%x failed to remove task from taskmap, code:%d", __FUNCTION__, taskId, code);
      }
    } else {
      i++;
    }

    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("%s s-task:0x%x failed to release refId:%" PRId64, __FUNCTION__, taskId, refId);
    }
  }
}
846✔
1177

1178
// not allowed to return error code
1179
/*
 * Load every persisted stream task from pMeta->pTaskDb into the in-memory meta.
 *
 * For each record: decode it into a fresh SStreamTask; tasks persisted in DROPPING state are only
 * recorded in a recycle list; all other tasks are built through pMeta->buildTaskFn, registered in
 * streamTaskRefPool, appended to pMeta->pTaskList and inserted into pMeta->pTasksMap, and the
 * stream/paused task counters are updated. Afterwards orphan fill-history tasks are collected,
 * every recycled id is removed from the meta store, and the meta is committed.
 *
 * The function returns nothing; all failures are logged. A NULL pMeta is ignored. An allocation
 * or decode failure stops the scan but the tasks loaded so far are kept.
 */
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
  TBC*     pCur = NULL;
  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;
  int32_t  vgId = 0;
  int32_t  code = 0;
  SArray*  pRecycleList = NULL;

  if (pMeta == NULL) {
    return;
  }

  vgId = pMeta->vgId;
  pRecycleList = taosArrayInit(4, sizeof(STaskId));
  if (pRecycleList == NULL) {
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
    return;
  }

  stInfo("vgId:%d load stream tasks from meta files", vgId);

  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
    taosArrayDestroy(pRecycleList);
    return;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
    taosArrayDestroy(pRecycleList);
    tdbTbcClose(pCur);
    return;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
      break;
    }

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
      tDecoderClear(&decoder);
      tFreeStreamTask(pTask);
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
          "stream manually",
          vgId, tsDataDir);
      break;
    }
    tDecoderClear(&decoder);

    // a task persisted while dropping is not loaded; remember its id so the record can be removed later
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
      int32_t taskId = pTask->id.taskId;
      STaskId dropId = streamTaskGetTaskId(pTask);

      tFreeStreamTask(pTask);
      void* px = taosArrayPush(pRecycleList, &dropId);
      if (px == NULL) {
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
      } else {
        int32_t total = taosArrayGetSize(pRecycleList);
        stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
      }
      continue;
    }

    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);

    // do duplicate task check.
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
    if (p != NULL) {
      // todo this should replace the existed object put by replay creating stream task msg from mnode
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
      tFreeStreamTask(pTask);
      continue;
    }

    code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
    if (code < 0) {
      stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
      tFreeStreamTask(pTask);
      continue;
    }

    // Register the task in the ref pool BEFORE copying pTask->id into pTaskList: the list stores a
    // by-value copy of the id, and dropHistoryTaskIfNoStreamTask() later reads refId from that copy.
    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);

    bool  inList = true;
    void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
    if (px == NULL) {
      inList = false;
      stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
    }

    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
      int64_t refId = pTask->id.refId;
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
              pTask->id.taskId, tstrerror(terrno), refId);

      // undo the list insertion, but only if this task really was appended; otherwise the pop would
      // throw away the entry of a previously loaded task
      if (inList) {
        (void)taosArrayPop(pMeta->pTaskList);
      }

      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
      if (ret != 0) {
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
      }
      continue;
    }

    stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
    if (pTask->info.fillHistory == 0) {
      (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
    }

    if (streamTaskShouldPause(pTask)) {
      (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
    }
  }

  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);

  dropHistoryTaskIfNoStreamTask(pMeta, pRecycleList);

  // remove the persisted records of all recycled tasks
  for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
    STaskId* pId = taosArrayGet(pRecycleList, i);
    code = streamMetaRemoveTaskInMeta(pMeta, pId);
    if (code) {
      stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
    }
  }

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);

  taosArrayDestroy(pRecycleList);
  code = streamMetaCommit(pMeta);
  if (code) {
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
  }
}
1334

1335
/*
 * Prepare the stream meta for closing.
 *
 * Sets pMeta->closeFlag and waits for the heartbeat timer to quit. Then, under the meta read
 * lock: obtains a copy of the task list from streamMetaSendMsgBeforeCloseTasks(), stops every
 * task that can still be acquired, releases it and removes its reference from
 * streamTaskRefPool, and finally stops the scan timer if one is set.
 */
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  pMeta->closeFlag = true;
  streamMetaWaitForHbTmrQuit(pMeta);

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // previously swallowed silently; closing continues with whatever list (possibly none) was produced
    stError("vgId:%d failed to prepare the task list before closing tasks, code:%s", vgId, tstrerror(code));
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    // remember the refId: pTask must not be dereferenced after it has been released
    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);

  if (pMeta->scanInfo.scanTimer != NULL) {
    streamTmrStop(pMeta->scanInfo.scanTimer);
    pMeta->scanInfo.scanTimer = NULL;
  }

  streamMetaRUnLock(pMeta);
}
1,389✔
1393

1394
/*
 * Kick off the meta heartbeat to the mnode.
 * A heap-allocated copy of pMeta->rid is registered through metaRefMgtAdd() and then passed to
 * streamMetaHbToMnode(). Nothing is started if the allocation or the registration fails.
 */
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  if (pRid == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRid = pMeta->rid;

  // NOTE(review): pRid is not freed when the registration fails; whether metaRefMgtAdd() keeps
  // ownership on failure is not visible here -- confirm, this may leak.
  if (metaRefMgtAdd(pMeta->vgId, pRid) != 0) {
    return;
  }

  streamMetaHbToMnode(pRid, NULL);
}
1409

1410
/*
 * Hand out a copy of the meta task list and, if pMeta->sendMsgBeforeClosing is set, mark the
 * checkpoint of every task as failed and send one more heartbeat to the mnode.
 *
 * @param pMeta  stream meta
 * @param pList  [out] receives a duplicate of pMeta->pTaskList; the caller destroys it
 * @return terrno when the list cannot be duplicated, otherwise TSDB_CODE_SUCCESS. A failure of
 *         the heartbeat is logged but not returned. QRY_PARAM_CHECK (defined elsewhere) may
 *         return early for a bad pList.
 */
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);

  int32_t code = 0;
  SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
  if (pTaskList == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pTaskList;

  bool sendMsg = pMeta->sendMsgBeforeClosing;
  if (!sendMsg) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {  // this error is ignored
      continue;
    }

    streamTaskSetCheckpointFailed(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  // best effort: closing must go on even if the heartbeat cannot be sent, so the failure is only logged
  code = streamMetaSendHbHelper(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    stError("vgId:%d failed to send hb msg to mnode before closing tasks, code:%s", pMeta->vgId, tstrerror(code));
  }

  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // always report success
}
1449

1450
/*
 * Record a new term (stage) and role for this vnode's stream meta.
 *
 * Under the meta write lock: stores term/role, arms sendMsgBeforeClosing when the previous role
 * was leader, and -- if the role flipped between leader and follower and there is at least one
 * task -- adds this vnode to the node-update list through the first task in the list. A node that
 * is not leader has its start info reset; a follower that becomes leader is flagged with
 * tasksWillRestart. After the lock is dropped, a leader starts the heartbeat to the mnode.
 *
 * @param pMeta     stream meta
 * @param term      new term, stored in pMeta->stage
 * @param isLeader  true if this vnode is now the leader
 */
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t term, bool isLeader) {
  streamMetaWLock(pMeta);

  int64_t prevTerm = pMeta->stage;
  int32_t prevRole = pMeta->role;

  pMeta->stage = term;
  pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;

  // mark the sign to send msg before close all tasks
  // 1. for a leader vnode, always send msg before closing itself
  // 2. for a follower vnode, if it's changed from a leader, also sending msg before closing.
  if (prevRole == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  if ((prevRole == NODE_ROLE_FOLLOWER || prevRole == NODE_ROLE_LEADER) && (prevRole != pMeta->role) &&
      (taosArrayGetSize(pMeta->pTaskList) > 0)) {
    SStreamTask*   pTask = NULL;
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, 0);

    if (pTaskId != NULL) {
      // pTaskList holds SStreamTaskId entries while pTasksMap is keyed by STaskId; build the key
      // explicitly instead of reinterpreting the list entry as a STaskId.
      STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};

      int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
      if (code == 0) {
        stInfo("vgId:%d role changed, added into nodeUpdate list, use s-task:%s", pMeta->vgId, pTask->id.idStr);
        int32_t unused = streamTaskAddIntoNodeUpdateList(pTask, pMeta->vgId);
        streamMetaReleaseTask(pMeta, pTask);
      }
    }
  }

  if (!isLeader) {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
  } else {  // wait for nodeEp update if become leader from follower
    if (prevRole == NODE_ROLE_FOLLOWER) {
      pMeta->startInfo.tasksWillRestart = 1;
    }
  }

  streamMetaWUnLock(pMeta);

  if (isLeader) {
    if (prevRole == NODE_ROLE_FOLLOWER) {
      stInfo("vgId:%d update term:%" PRId64 ", prevTerm:%" PRId64
             " prevRole:%d leader:%d, start to send Hb, rid:%" PRId64 " restart after nodeEp being updated",
             pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->rid);
    } else {
      stInfo("vgId:%d update term:%" PRId64 ", prevTerm:%" PRId64
             " prevRole:%d leader:%d, start to send Hb, rid:%" PRId64,
             pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->rid);
    }
    streamMetaStartHb(pMeta);
  } else {
    stInfo("vgId:%d update term:%" PRId64 " prevTerm:%" PRId64 " prevRole:%d leader:%d sendMsg beforeClosing:%d",
           pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->sendMsgBeforeClosing);
  }
}
1,044✔
1505

1506
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
12✔
1507
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
12✔
1508
  for (int32_t i = 0; i < num; ++i) {
33✔
1509
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
21✔
1510
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
21✔
1511
    SStreamTask*   pTask = NULL;
21✔
1512
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
21✔
1513

1514
    if (code == 0) {
21!
1515
      if (pTask->status.downstreamReady == 0) {
21!
1516
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
×
1517
        return false;
×
1518
      }
1519
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
21✔
1520
    }
1521
  }
1522

1523
  return true;
12✔
1524
}
1525

1526
/*
 * Call streamTaskResetStatus() on every task of the meta task list that can be acquired.
 * Tasks that cannot be acquired are skipped silently.
 *
 * @return TSDB_CODE_SUCCESS when the list is empty, 0 otherwise.
 */
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t idx = 0;
  while (idx < total) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask) == 0) {
      streamTaskResetStatus(pTask);
      streamMetaReleaseTask(pMeta, pTask);
    }

    ++idx;
  }

  return 0;
}
1547

1548
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
15✔
1549
                                     int64_t startTs) {
1550
  const char* id = pTask->id.idStr;
15✔
1551
  int32_t     vgId = pMeta->vgId;
15✔
1552
  int32_t     code = 0;
15✔
1553

1554
  // keep the already updated info
1555
  STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
15✔
1556
  code = taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
15✔
1557
  if (code != 0) {
15!
1558
    stError("s-task:%s failed to put updateTask into update list", id);
×
1559
  }
1560

1561
  int64_t el = taosGetTimestampMs() - startTs;
15✔
1562
  if (pHTask != NULL) {
15!
1563
    STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
×
1564
    code = taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
×
1565
    if (code != 0) {
×
1566
      stError("s-task:%s failed to put updateTask into update list", id);
×
1567
    } else {
1568
      stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
×
1569
              " ms",
1570
              id, vgId, transId, el);
1571
    }
1572
  } else {
1573
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
15!
1574
            id, vgId, transId, el);
1575
  }
1576
}
15✔
1577

1578
/*
 * Close the active nodeEp update transaction: clear the recorded tasks, promote activeTransId to
 * completeTransId, reset activeTransId to -1 and stamp completeTs with the current time.
 */
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpdate = &pMeta->updateInfo;

  // capture the values that are about to be overwritten, they are only needed for the log below
  int32_t numOfUpdated = taosArrayGetSize(pUpdate->pTaskList);
  int32_t prevComplete = pUpdate->completeTransId;

  taosHashClear(pUpdate->pTasks);
  taosArrayClear(pUpdate->pTaskList);

  pUpdate->completeTransId = pUpdate->activeTransId;
  pUpdate->activeTransId = -1;
  pUpdate->completeTs = taosGetTimestampMs();

  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64
          ", complete transId:%d->%d, update Tasks:%d reset active transId",
          pMeta->vgId, pUpdate->completeTs, prevComplete, pUpdate->completeTransId, numOfUpdated);
}
9✔
1594

1595
/*
 * Decide whether the nodeEp (epset) update transaction transId may be processed, and make it the
 * active transaction if so.
 *
 * - transId <  completeTransId: stale message, rejected.
 * - transId == completeTransId: already handled, rejected.
 * - no active transaction (activeTransId == -1): transId becomes active; if a start-all-tasks
 *   procedure is running it is flagged as only partially started.
 * - transId == activeTransId: accepted, nothing changes.
 * - transId <  activeTransId: out of order, rejected.
 * - transId >  activeTransId: replaces the active transaction; the recorded tasks are cleared.
 *
 * @return true if the update identified by transId should be handled, false if it must be discarded.
 */
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;

  if (transId < pInfo->completeTransId) {
    // log the completed transId here (the active transId used to be printed under this label)
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }

  if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  }

  // from here on: transId > pInfo->completeTransId
  if (pInfo->activeTransId == -1) {
    taosHashClear(pInfo->pTasks);
    pInfo->activeTransId = transId;

    // interrupt the start all tasks procedure, only partial tasks will be started
    // the completion of this processed is based on the partial started tasks.
    if (pMeta->startInfo.startAllTasks == 1) {
      int32_t num = taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
      pMeta->startInfo.partialTasksStarted = true;
      stInfo(
          "vgId:%d set the active epset update transId:%d, prev complete transId:%d, start all interrupted, only %d "
          "tasks were started",
          pMeta->vgId, transId, pInfo->completeTransId, num);
    } else {
      stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId,
             pInfo->completeTransId);
    }
    return true;
  }

  if (pInfo->activeTransId == transId) {
    // do nothing
    return true;
  }

  if (transId < pInfo->activeTransId) {
    stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard",
            pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
    return false;
  }

  // transId > pInfo->activeTransId: a newer transaction supersedes the active one
  taosHashClear(pInfo->pTasks);
  int32_t prev = pInfo->activeTransId;
  pInfo->activeTransId = transId;

  // "from" is the previous active transId, "to" is the new one (the two used to be swapped)
  stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId, prev,
         transId, pInfo->completeTransId);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc