• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.13
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tsched.h"
21
#include "tstream.h"
22
#include "ttimer.h"
23
#include "wal.h"
24

25
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
26

27
int32_t streamBackendId = 0;
28
int32_t streamBackendCfWrapperId = 0;
29
int32_t taskDbWrapperId = 0;
30
int32_t streamTaskRefPool = 0;
31

32
static int32_t streamMetaBegin(SStreamMeta* pMeta);
33
static void    streamMetaCloseImpl(void* arg);
34

35
typedef struct {
36
  TdThreadMutex mutex;
37
  SHashObj*     pTable;
38
} SMetaRefMgt;
39

40
SMetaRefMgt gMetaRefMgt;
41

42
int32_t metaRefMgtInit();
43
void    metaRefMgtCleanup();
44

45
static void streamMetaEnvInit() {
132✔
46
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
132✔
47
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
132✔
48
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
132✔
49

50
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
132✔
51
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
132✔
52

53
  int32_t code = metaRefMgtInit();
132✔
54
  if (code) {
132!
55
    stError("failed to init stream meta mgmt env, start failed");
×
56
    return;
×
57
  }
58

59
  code = streamTimerInit();
132✔
60
  if (code) {
132!
61
    stError("failed to init stream meta env, start failed");
×
62
  }
63
}
64

65
void streamMetaInit() {
134✔
66
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
134✔
67
  if (code) {
134!
68
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
69
  }
70
}
134✔
71

72
void streamMetaCleanup() {
70✔
73
  taosCloseRef(streamBackendId);
70✔
74
  taosCloseRef(streamBackendCfWrapperId);
70✔
75
  taosCloseRef(streamMetaRefPool);
70✔
76
  taosCloseRef(streamTaskRefPool);
70✔
77

78
  metaRefMgtCleanup();
70✔
79
  streamTimerCleanUp();
70✔
80
}
70✔
81

82
int32_t metaRefMgtInit() {
132✔
83
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
132✔
84
  if (code) {
132!
85
    return code;
×
86
  }
87

88
  if (code == 0) {
132!
89
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
132✔
90
  }
91

92
  if (gMetaRefMgt.pTable == NULL) {
132!
93
    return terrno;
×
94
  } else {
95
    return code;
132✔
96
  }
97
}
98

99
void metaRefMgtCleanup() {
70✔
100
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
70✔
101
  while (pIter) {
578✔
102
    int64_t* p = *(int64_t**)pIter;
508✔
103
    taosMemoryFree(p);
508!
104
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
508✔
105
  }
106

107
  taosHashCleanup(gMetaRefMgt.pTable);
70✔
108
  streamMutexDestroy(&gMetaRefMgt.mutex);
70✔
109
}
70✔
110

111
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
873✔
112
  int32_t code = 0;
873✔
113
  void*   p = NULL;
873✔
114

115
  streamMutexLock(&gMetaRefMgt.mutex);
873✔
116

117
  p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
873✔
118
  if (p == NULL) {
873!
119
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
873✔
120
    if (code) {
873!
121
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
×
122
              tstrerror(code));
123
      return code;
×
124
    } else {  // not
125
              //      stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
126
    }
127
  } else {
128
    stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
×
129
  }
130

131
  streamMutexUnlock(&gMetaRefMgt.mutex);
873✔
132
  return code;
873✔
133
}
134

135
void metaRefMgtRemove(int64_t* pRefId) {
357✔
136
  streamMutexLock(&gMetaRefMgt.mutex);
357✔
137

138
  int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
357✔
139
  taosMemoryFree(pRefId);
357!
140
  streamMutexUnlock(&gMetaRefMgt.mutex);
357✔
141
}
357✔
142

143
// Opens the tdb environment at pMeta->path and the "task.db" and "checkpoint.db"
// tables inside it. Returns 0 on success, -1 as soon as any open fails.
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t ret = tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL);
  if (ret < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
    return -1;
  }

  ret = tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  ret = tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  return 0;
}
161

162
//
163
// impl later
164
//
165
// Result of the on-disk format check done by streamMetaCheckBackendCompatible().
enum STREAM_STATE_VER {
  STREAM_STATA_NO_COMPATIBLE = 0,  // stored data cannot be read by this version
  STREAM_STATA_COMPATIBLE = 1,     // nothing to do
  STREAM_STATA_NEED_CONVERT = 2,   // state backend must be converted first
};
170

171
// Reads task records from task.db until one decodes successfully and classifies
// its msgVer. Returns one of enum STREAM_STATE_VER; STREAM_STATA_COMPATIBLE is
// returned when the cursor cannot be opened or positioned, or no record decodes.
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  result = STREAM_STATA_COMPATIBLE;
  TBC*    pCursor = NULL;
  void*   pKey = NULL;
  int32_t keyLen = 0;
  void*   pVal = NULL;
  int32_t valLen = 0;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {  // no task info, no stream
    return result;
  }

  int32_t code = tdbTbcMoveToFirst(pCursor);
  if (code) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCursor);
    return result;
  }

  for (;;) {
    if (tdbTbcNext(pCursor, &pKey, &keyLen, &pVal, &valLen) != 0) {
      break;
    }

    if (pVal == NULL || valLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, valLen);

    bool decoded = (tDecodeStreamTaskChkInfo(&decoder, &info) >= 0);
    if (decoded) {
      if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
        result = STREAM_STATA_NO_COMPATIBLE;
      } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
        result = STREAM_STATA_NEED_CONVERT;
      }
    }

    tDecoderClear(&decoder);

    // the first record that decodes decides the result; undecodable ones are skipped
    if (decoded) {
      break;
    }
  }

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);
  return result;
}
220

221
// Converts the state backend found under pMeta->path for the latest checkpoint,
// one column-family instance at a time, and on success removes the "state"
// directory under pMeta->path.
//
// Returns terrno (reset to 0 just before the check) when no backend data
// exists, the streamBackendInit()/streamStateCvtDataFormat() error on failure,
// and 0 otherwise. An allocation failure while building the "state" path does
// not change the return value; it is only logged. That log now reports terrno,
// whereas it previously formatted `code`, which is always 0 on that path.
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (exist == false) {
    code = terrno;
    return code;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("vgId:%d failed to cvt data", pMeta->vgId);
      goto _EXIT;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

_EXIT:
  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    int32_t len = strlen(pMeta->path) + 32;
    char*   state = taosMemoryCalloc(1, len);
    if (state != NULL) {
      (void)snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // report the allocation error, not `code` (which is 0 in this branch)
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
267

268
// Checks the stored format and converts the backend when required.
// Returns 0 when nothing has to be done, the conversion result when a
// conversion is needed, and -1 when the stored data is incompatible.
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t compatible = streamMetaCheckBackendCompatible(pMeta);

  switch (compatible) {
    case STREAM_STATA_COMPATIBLE:
      return 0;

    case STREAM_STATA_NEED_CONVERT:
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);

    case STREAM_STATA_NO_COMPATIBLE:
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;

    default:
      return 0;
  }
}
286

287
// Returns 1 when the task's fillHistory field equals 2, otherwise 0.
int8_t streamTaskShouldRecalated(SStreamTask* pTask) {
  if (pTask->info.fillHistory == 2) {
    return 1;
  }
  return 0;
}
×
288

289
// Attaches a task-db backend to pTask, stored in pRecalBackend when `recalated`
// is non-zero and in pBackend otherwise.
//
// If a backend is already registered under `key` in pMeta->pTaskDbUnique, it is
// re-referenced and shared. Otherwise a new one is opened (retrying once per
// second, with backendMutex released while sleeping), added to the
// taskDbWrapperId pool and registered under `key`.
//
// Returns TSDB_CODE_FAILED when the existing backend cannot be referenced, and
// 0 in every other case -- including when registering the new backend in the
// map fails, which is only logged.
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key, uint8_t recalated) {
  int32_t code = 0;
  int64_t chkpId = pTask->chkInfo.checkpointId;

  streamMutexLock(&pMeta->backendMutex);

  // streamId--taskId
  void** ppExisting = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
  if (ppExisting != NULL && *ppExisting != NULL) {
    if (taskDbAddRef(*ppExisting) == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pShared = *ppExisting;
    pShared->pMeta = pMeta;
    if (recalated) {
      pTask->pRecalBackend = pShared;
    } else {
      pTask->pBackend = pShared;
    }

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pShared);
    return 0;
  }

  STaskDbWrapper* pBackend = NULL;
  int64_t         processVer = -1;

  while ((code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend)) != 0) {
    // do not hold the mutex while waiting for the retry
    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);
  }

  int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
  if (recalated) {
    pTask->pRecalBackend = pBackend;
  } else {
    pTask->pBackend = pBackend;
  }

  pBackend->refId = tref;
  pBackend->pTask = pTask;
  pBackend->pMeta = pMeta;

  if (processVer != -1) {
    if (pTask->chkInfo.processedVer == processVer) {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    } else {
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    }
  }

  code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
  if (code) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  if (recalated) {
    stDebug("s-task:0x%x set recalated backend %p", pTask->id.taskId, pBackend);
  } else {
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
  }
  return 0;
}
371

372
void streamMetaRemoveDB(void* arg, char* key) {
16✔
373
  if (arg == NULL || key == NULL) return;
16!
374

375
  SStreamMeta* pMeta = arg;
14✔
376
  streamMutexLock(&pMeta->backendMutex);
14✔
377
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
14✔
378
  if (code) {
14!
379
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
380
  }
381

382
  streamMutexUnlock(&pMeta->backendMutex);
14✔
383
}
384

385
// Allocates the task hash and the task-id array of an update-info record.
// Returns terrno when either allocation fails, TSDB_CODE_SUCCESS otherwise.
int32_t streamMetaUpdateInfoInit(STaskUpdateInfo* pInfo) {
  pInfo->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pInfo->pTasks == NULL) {
    return terrno;
  }

  pInfo->pTaskList = taosArrayInit(4, sizeof(int32_t));
  if (pInfo->pTaskList == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
400

401
// Releases both containers of an update-info record and resets the pointers.
void streamMetaUpdateInfoCleanup(STaskUpdateInfo* pInfo) {
  taosHashCleanup(pInfo->pTasks);
  pInfo->pTasks = NULL;

  taosArrayDestroy(pInfo->pTaskList);
  pInfo->pTaskList = NULL;
}
279✔
407

408

409

410
// Creates and initializes the stream meta object for one vnode/snode.
//
// path:         parent directory; the meta lives in "<path><TD_DIRSEP>stream".
// ahandle:      opaque handle stored in the meta and later passed to buildTaskFn.
// buildTaskFn / expandTaskFn: task callbacks stored in the meta.
// vgId:         node id; SNODE_HANDLE makes the initial role NODE_ROLE_LEADER.
// stage:        stored in the meta.
// fn:           stored as startInfo.completeFn.
// p:            receives the new meta on success.
//
// Returns 0 on success. On failure everything acquired so far that the error
// path knows about is released and the error code is returned.
//
// Fixes relative to the previous version:
//  - `code = streamMetaBegin(pMeta) < 0` stored the comparison result (0/1) in
//    `code`, so a failed begin returned 1 instead of the real error code;
//  - the error path now also releases pTaskDbUnique and updateInfo.pTaskList.
//
// NOTE(review): if a step after taosAddRef() fails, the error path frees pMeta
// while the id is still registered in streamMetaRefPool -- confirm how that
// should be unwound.
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
  QRY_PARAM_CHECK(p);
  int32_t code = 0;
  int32_t lino = 0;

  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
    return terrno;
  }

  int32_t len = strlen(path) + 64;
  char*   tpath = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);

  (void)snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
  pMeta->path = tpath;

  code = streamMetaOpenTdb(pMeta);
  TSDB_CHECK_CODE(code, lino, _err);

  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
            tstrerror(terrno));
    TSDB_CHECK_CODE(code, lino, _err);
  }

  // set the attribute when running on Linux OS
  TdThreadRwlockAttr attr;
  code = taosThreadRwlockAttrInit(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

#ifdef LINUX
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
  TSDB_CHECK_CODE(code, lino, _err);
#endif

  code = taosThreadRwlockInit(&pMeta->lock, &attr);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadRwlockAttrDestroy(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

  // keep the real error code: assign first, compare afterwards
  code = streamMetaBegin(pMeta);
  if (code < 0) {
    lino = __LINE__;
    stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
    goto _err;
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);

  code = streamMetaUpdateInfoInit(&pMeta->updateInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  code = streamMetaInitStartInfo(&pMeta->startInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);

  pMeta->scanInfo.scanSentinel = 0;
  pMeta->scanInfo.lastScanTs = 0;
  pMeta->scanInfo.tickCounter = 0;

  pMeta->vgId = vgId;
  pMeta->ahandle = ahandle;
  pMeta->buildTaskFn = buildTaskFn;
  pMeta->expandTaskFn = expandTaskFn;
  pMeta->stage = stage;
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
  pMeta->updateInfo.activeTransId = -1;
  pMeta->updateInfo.completeTransId = -1;

  pMeta->startInfo.completeFn = fn;
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);

  pMeta->numOfPausedTasks = 0;
  pMeta->numOfStreamTasks = 0;
  pMeta->closeFlag = false;

  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);

  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _err);

  // add refId at the end of initialization function
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);

  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);

  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));

  code = metaRefMgtAdd(pMeta->vgId, pRid);
  TSDB_CHECK_CODE(code, lino, _err);

  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);

  TSDB_CHECK_CODE(code, lino, _err);

  *p = pMeta;
  return code;

_err:
  // pMeta was calloc'ed, so members that were never set are still NULL here
  taosMemoryFree(pMeta->path);
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
  if (pMeta->pTaskDbUnique) taosHashCleanup(pMeta->pTaskDbUnique);
  if (pMeta->pTaskDb) {
    tdbTbClose(pMeta->pTaskDb);
    pMeta->pTaskDb = NULL;
  }
  if (pMeta->pCheckpointDb) {
    tdbTbClose(pMeta->pCheckpointDb);
  }
  if (pMeta->db) {
    tdbClose(pMeta->db);
  }

  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
  if (pMeta->updateInfo.pTaskList) taosArrayDestroy(pMeta->updateInfo.pTaskList);
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);

  if (pMeta->startInfo.pStagesList) taosArrayDestroy(pMeta->startInfo.pStagesList);
  taosMemoryFree(pMeta);

  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
  return code;
}
547

548
// todo refactor: the lock should be restricted to one function
549
#ifdef BUILD_NO_CALL
550
// Only compiled when BUILD_NO_CALL is defined.
// Opens the stream backend, retrying every 100ms until it succeeds, then
// registers it in the streamBackendId pool and loads the checkpoint info.
// NOTE(review): when a retry succeeds, the loop is left with the meta write
// lock still held -- see the todo preceding this function.
void streamMetaInitBackend(SStreamMeta* pMeta) {
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);

  while (pMeta->streamBackend == NULL) {
    streamMetaWLock(pMeta);
    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
    if (pMeta->streamBackend != NULL) {
      break;
    }

    streamMetaWUnLock(pMeta);
    stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
    taosMsleep(100);
  }

  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
  streamBackendLoadCheckpointInfo(pMeta);
}
569
#endif
570

571
// Drops every task registered in the meta and resets its bookkeeping
// containers and counters. The containers themselves are kept.
void streamMetaClear(SStreamMeta* pMeta) {
  int64_t startTs = taosGetTimestampMs();

  // remove all existed tasks in this vnode
  for (void* pEntry = taosHashIterate(pMeta->pTasksMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasksMap, pEntry)) {
    int64_t      refId = *(int64_t*)pEntry;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    // release the ref by timer
    if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {  // one more ref in timer
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }

    // pairs with the taosAcquireRef() above
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, refId);
    }
  }

  int64_t tasksDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear task map, elapsed time:%.2fs", pMeta->vgId, (tasksDoneTs - startTs)/1000.0);

  if (pMeta->streamBackendRid != 0) {
    if (taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
      stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
    }
  }

  int64_t backendDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear backend completed, elapsed time:%.2fs", pMeta->vgId, (backendDoneTs - tasksDoneTs)/1000.0);

  taosHashClear(pMeta->pTasksMap);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);

  taosArrayClear(pMeta->startInfo.pStagesList);
  pMeta->startInfo.readyTs = 0;
}
282✔
630

631
// Removes the meta's id from streamMetaRefPool. That pool was opened with
// streamMetaCloseImpl as its callback. NULL is accepted and ignored.
void streamMetaClose(SStreamMeta* pMeta) {
  if (pMeta != NULL) {
    stDebug("vgId:%d start to close stream meta", pMeta->vgId);

    int32_t ret = taosRemoveRef(streamMetaRefPool, pMeta->rid);
    if (ret != 0) {
      stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(ret));
    }
  }
}
642

643
void streamMetaCloseImpl(void* arg) {
279✔
644
  SStreamMeta* pMeta = arg;
279✔
645
  if (pMeta == NULL) {
279!
646
    return;
×
647
  }
648

649
  int32_t code = 0;
279✔
650
  int32_t vgId = pMeta->vgId;
279✔
651
  stDebug("vgId:%d start to do-close stream meta", vgId);
279✔
652

653
  streamMetaWLock(pMeta);
279✔
654
  streamMetaClear(pMeta);
279✔
655
  streamMetaWUnLock(pMeta);
279✔
656

657
  // already log the error, ignore here
658
  tdbAbort(pMeta->db, pMeta->txn);
279✔
659
  tdbTbClose(pMeta->pTaskDb);
279✔
660
  tdbTbClose(pMeta->pCheckpointDb);
279✔
661
  tdbClose(pMeta->db);
279✔
662

663
  taosArrayDestroy(pMeta->pTaskList);
279✔
664
  taosArrayDestroy(pMeta->chkpSaved);
279✔
665
  taosArrayDestroy(pMeta->chkpInUse);
279✔
666

667
  taosHashCleanup(pMeta->pTasksMap);
279✔
668
  taosHashCleanup(pMeta->pTaskDbUnique);
279✔
669

670
  streamMetaUpdateInfoCleanup(&pMeta->updateInfo);
279✔
671
  streamMetaClearStartInfo(&pMeta->startInfo);
279✔
672

673
  destroyMetaHbInfo(pMeta->pHbInfo);
279✔
674
  pMeta->pHbInfo = NULL;
279✔
675

676
  taosMemoryFree(pMeta->path);
279!
677
  streamMutexDestroy(&pMeta->backendMutex);
279✔
678

679
  bkdMgtDestroy(pMeta->bkdChkptMgt);
279✔
680

681
  pMeta->role = NODE_ROLE_UNINIT;
279✔
682
  code = taosThreadRwlockDestroy(&pMeta->lock);
279✔
683
  if (code) {
279!
684
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
×
685
  }
686

687
  taosMemoryFree(pMeta);
279!
688
  stDebug("vgId:%d end to close stream meta", vgId);
279✔
689
}
690

691
// todo let's check the status for each task
692
// Encodes pTask and upserts it into task.db under the key {streamId, taskId},
// using the meta's current transaction.
//
// Returns 0 on success, -1 when the encoded size cannot be computed, terrno
// when the buffer cannot be allocated or the upsert fails, and
// TSDB_CODE_INVALID_MSG when encoding fails. On upsert failure the task's ref
// is removed from streamTaskRefPool.
//
// The encode buffer is now released on the encode-failure path as well; it was
// leaked there before.
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t vgId = pTask->pMeta->vgId;
  void*   buf = NULL;
  int32_t len = 0;
  int32_t code = 0;

  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return terrno;
  }

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code == -1) {
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    taosMemoryFree(buf);
    return TSDB_CODE_INVALID_MSG;
  }

  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};

  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
  if (code != TSDB_CODE_SUCCESS) {
    code = terrno;
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
            vgId, pTask->id.refId, tstrerror(code));

    // copy the id first: it is still needed for the log after the ref is removed
    int64_t refId = pTask->id.refId;
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
    if (ret != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
    }
  } else {
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
  }

  taosMemoryFree(buf);
  return code;
}
741

742
// Deletes the {streamId, taskId} record from task.db within the meta's current
// transaction and returns the tdbTbDelete() result.
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
  int64_t dbKey[2] = {pTaskId->streamId, pTaskId->taskId};

  int32_t code = tdbTbDelete(pMeta->pTaskDb, dbKey, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (code == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
            tstrerror(terrno));
  }

  return code;
}
754

755
// add to the ready tasks hash map, not the restored tasks hash map
756
// Registers pTask in the meta: builds it through buildTaskFn, appends its id
// to pTaskList, adds it to streamTaskRefPool, maps its id to the ref id in
// pTasksMap, persists it and commits.
//
// *pAdded is set to true only when all of those steps succeed. If the task id
// is already present in pTasksMap, pTask is freed and 0 is returned with
// *pAdded == false. On a failure after the ref was added, the in-memory
// registration is rolled back and the ref is removed.
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
  *pAdded = false;

  int32_t code = 0;
  STaskId id = streamTaskGetTaskId(pTask);

  if (taosHashGet(pMeta->pTasksMap, &id, sizeof(id)) != NULL) {
    stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
    tFreeStreamTask(pTask);
    return code;
  }

  code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver);
  if (code != 0) {
    tFreeStreamTask(pTask);
    return code;
  }

  if (taosArrayPush(pMeta->pTaskList, &pTask->id) == NULL) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
    tFreeStreamTask(pTask);
    return terrno;
  }

  int64_t refId = taosAddRef(streamTaskRefPool, pTask);
  pTask->id.refId = refId;

  code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
    }
    return code;
  }

  // persist, then commit; either failure undoes the in-memory registration the same way
  code = streamMetaSaveTaskInMeta(pMeta, pTask);
  if (code == 0) {
    code = streamMetaCommit(pMeta);
  }

  if (code != 0) {
    (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId)) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }
    return code;
  }

  if (pTask->info.fillHistory == 0) {
    (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  *pAdded = true;
  return code;
}
825

826
// Returns the number of entries in pTasksMap and logs an error when pTaskList
// holds a different number of elements.
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
  int32_t numInMap = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t numInList = taosArrayGetSize(pMeta->pTaskList);

  if (numInMap != numInList) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, numInList, numInMap);
  }

  return numInMap;
}
835

836
// Looks up a task by (streamId, taskId) and acquires a reference to it from
// streamTaskRefPool. This function does not take the meta lock itself.
//
// On success *pTask receives the task and TSDB_CODE_SUCCESS is returned. The
// reference is handed back with streamMetaReleaseTask().
// TSDB_CODE_STREAM_TASK_NOT_EXIST is returned when the id is unknown, the ref
// can no longer be acquired, the stored ref id does not match the task's own,
// or the task should stop. In the last two cases the acquired ref is released.
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  STaskId  id = {.streamId = streamId, .taskId = taskId};
  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  SStreamTask* pFound = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
  if (pFound == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, *pTaskRefId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pFound->id.refId != *pTaskRefId) {
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, *pTaskRefId,
            pFound->id.refId);
  } else if (streamTaskShouldStop(pFound)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", pFound->id.idStr);
  } else {
    stDebug("s-task:%s acquire task, refId:%" PRId64, pFound->id.idStr, pFound->id.refId);
    *pTask = pFound;
    return TSDB_CODE_SUCCESS;
  }

  // rejected: hand back the reference taken above
  if (taosReleaseRef(streamTaskRefPool, *pTaskRefId) != 0) {
    stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId);
  }

  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
874

875
// Same lookup as streamMetaAcquireTaskNoLock() but keyed by a ready-made
// STaskId and without the should-stop check. Does not take the meta lock.
//
// Returns TSDB_CODE_SUCCESS with *pTask set, or
// TSDB_CODE_STREAM_TASK_NOT_EXIST when the id is unknown, the ref cannot be
// acquired, or the stored ref id does not match the task's own (in which case
// the acquired ref is released).
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  SStreamTask* pFound = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
  if (pFound == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            *pTaskRefId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pFound->id.refId == *pTaskRefId) {
    stDebug("s-task:%s acquire task, refId:%" PRId64, pFound->id.idStr, pFound->id.refId);
    *pTask = pFound;
    return TSDB_CODE_SUCCESS;
  }

  stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
          *pTaskRefId, pFound->id.refId);

  if (taosReleaseRef(streamTaskRefPool, *pTaskRefId) != 0) {
    stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, *pTaskRefId);
  }

  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
905

906
// Read-locked wrapper around streamMetaAcquireTaskNoLock().
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t code;

  streamMetaRLock(pMeta);
  code = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return code;
}
912

913
// Give back a task reference previously obtained through one of the acquire functions.
// A NULL task is accepted and ignored. The ids are copied to locals before the release so the
// error log does not need to touch the task afterwards.
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    int32_t tid = pTask->id.taskId;
    int64_t rid = pTask->id.refId;

    stDebug("s-task:0x%x release task, refId:%" PRId64, tid, rid);

    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, tid, rid);
    }
  }
}
926

927
// Remove the first entry of pTaskList (scanning indexes [0, num)) whose streamId and taskId both
// match *id. Logs an error when no entry matches.
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  int32_t idx = 0;

  while (idx < num) {
    SStreamTaskId* pCur = taosArrayGet(pTaskList, idx);
    if (pCur->streamId == id->streamId && pCur->taskId == id->taskId) {
      taosArrayRemove(pTaskList, idx);
      return;  // only the first match is removed
    }
    idx += 1;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
5✔
942

943
// Callback passed to streamTaskHandleEventAsync() for the dropping event.
// Source tasks send the checkpoint-source rsp; every non-sink task with an executor then has
// its query killed. The returned code is the one of the last step executed, so a failed rsp is
// overwritten by the result of qKillTask() when that step runs. `param` is not used.
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t rc = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    rc = streamTaskSendCheckpointSourceRsp(pTask);
    if (rc != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
              tstrerror(rc));
    }
  }

  // sink tasks, and tasks without an executor, have no query procedure to stop
  if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->exec.pExecutor == NULL) {
    return rc;
  }

  // let's kill the query procedure within stream, to end it ASAP.
  rc = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
  if (rc != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(rc));
  }

  return rc;
}
963

964
// Remove one task from the stream meta: move it to the dropping state, take it out of the task
// map, the task list and the persisted meta, stop its delay timer, and drop its ref-pool entry.
//
// The task is looked up with streamMetaAcquireTaskUnsafe(), which takes no lock.
// NOTE(review): presumably the caller already holds the meta write lock - confirm.
//
// pMeta     stream meta owning the task
// streamId  stream id of the task
// taskId    task id of the task
//
// Always returns 0; individual failures are logged only. A task that cannot be found is
// treated as already dropped.
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  SStreamTask* pTask = NULL;
  int32_t      vgId = pMeta->vgId;
  int32_t      code = 0;
  STaskId      id = {.streamId = streamId, .taskId = taskId};

  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    return 0;
  }

  // desc the paused task counter
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // only tasks that are not fill-history tasks are counted in numOfStreamTasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  // the result used to be overwritten without being looked at; report it instead
  code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  if (code) {
    stError("vgId:%d s-task:0x%x failed to remove from task map, code:%s", vgId, taskId, tstrerror(code));
  }

  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

  code = streamMetaRemoveTaskInMeta(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  // sanity check: list and map must describe the same set of tasks
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
  if (sizeInList != size) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // remove the pool entry first, then give back the reference taken by the acquire above
  int64_t refId = pTask->id.refId;
  int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
  if (ret != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
1023

1024
// Currently a stub: it only takes and releases the meta write lock and reports success.
// The former implementation (unregister the task, then commit the meta) was disabled by being
// commented out; it has been removed here together with the locals that only it used.
//
// pMeta     stream meta
// streamId  not used at present
// taskId    not used at present
//
// Always returns 0.
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  (void)streamId;
  (void)taskId;

  streamMetaWLock(pMeta);
  streamMetaWUnLock(pMeta);

  return 0;
}
1049

1050
// Open a write / read-uncommitted tdb transaction on pMeta->db and store it in pMeta->txn,
// all under the meta write lock. A failure is recorded through streamSetFatalError() and the
// tdb code is returned as is.
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t rc;

  streamMetaWLock(pMeta);

  rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc != 0) {
    streamSetFatalError(pMeta, rc, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return rc;
}
1060

1061
// Commit the current meta transaction, run the post-commit step, then begin a fresh
// write / read-uncommitted transaction in pMeta->txn.
//
// Every failing step is recorded via streamSetFatalError(), logged as fatal, and its code is
// returned immediately. Previously a failed tdbCommit() fell through to the post-commit step
// and its code was overwritten, so the caller could see success after a failed commit; it now
// returns early, the same way the post-commit failure already did.
//
// Returns 0 on success, otherwise the code of the first failing tdb call.
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t code = tdbCommit(pMeta->db, pMeta->txn);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
            pMeta->fatalInfo.line);
    return code;
  }

  code = tdbPostCommit(pMeta->db, pMeta->txn);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
            pMeta->fatalInfo.line);
    return code;
  }

  code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                  TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line);
  } else {
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
  }

  return code;
}
1088

1089
// Walk every record of pMeta->pTaskDb, decode its checkpoint info and return the largest
// checkpointId seen. Records that fail to decode are skipped. Returns 0 when the cursor cannot
// be opened or positioned, or when no record yields a larger id.
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t maxChkptId = 0;
  TBC*    pCursor = NULL;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return maxChkptId;
  }

  if (tdbTbcMoveToFirst(pCursor) != 0) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCursor);
    return maxChkptId;
  }

  void*   pKey = NULL;
  void*   pVal = NULL;
  int32_t kLen = 0;
  int32_t vLen = 0;

  while (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SCheckpointInfo info;
    SDecoder        decoder;

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decoded = tDecodeStreamTaskChkInfo(&decoder, &info);
    tDecoderClear(&decoder);  // the decoder is cleared on both the success and the failure path

    if (decoded < 0) {
      continue;
    }

    if (info.checkpointId > maxChkptId) {
      maxChkptId = info.checkpointId;
    }
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, maxChkptId);

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);

  return maxChkptId;
}
1135

1136
// not allowed to return error code
1137
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
282✔
1138
  TBC*     pCur = NULL;
282✔
1139
  void*    pKey = NULL;
282✔
1140
  int32_t  kLen = 0;
282✔
1141
  void*    pVal = NULL;
282✔
1142
  int32_t  vLen = 0;
282✔
1143
  SDecoder decoder;
1144
  int32_t  vgId = 0;
282✔
1145
  int32_t  code = 0;
282✔
1146
  SArray*  pRecycleList = NULL;
282✔
1147

1148
  if (pMeta == NULL) {
282!
1149
    return;
×
1150
  }
1151

1152
  vgId = pMeta->vgId;
282✔
1153
  pRecycleList = taosArrayInit(4, sizeof(STaskId));
282✔
1154
  if (pRecycleList == NULL) {
282!
1155
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
×
1156
    return;
×
1157
  }
1158

1159
  stInfo("vgId:%d load stream tasks from meta files", vgId);
282!
1160

1161
  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
282✔
1162
  if (code != TSDB_CODE_SUCCESS) {
282!
1163
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
×
1164
    taosArrayDestroy(pRecycleList);
×
1165
    return;
×
1166
  }
1167

1168
  code = tdbTbcMoveToFirst(pCur);
282✔
1169
  if (code) {
282!
1170
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
×
1171
    taosArrayDestroy(pRecycleList);
×
1172
    tdbTbcClose(pCur);
×
1173
    return;
×
1174
  }
1175

1176
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
290✔
1177
    if (pVal == NULL || vLen == 0) {
8!
1178
      break;
1179
    }
1180

1181
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
8!
1182
    if (pTask == NULL) {
8!
1183
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1184
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
×
1185
      break;
×
1186
    }
1187

1188
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
8✔
1189
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
8!
1190
      tDecoderClear(&decoder);
×
1191
      tFreeStreamTask(pTask);
×
1192
      stError(
×
1193
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
1194
          "stream manually",
1195
          vgId, tsDataDir);
1196
      break;
×
1197
    }
1198
    tDecoderClear(&decoder);
8✔
1199

1200
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
8!
1201
      int32_t taskId = pTask->id.taskId;
×
1202
      STaskId id = streamTaskGetTaskId(pTask);
×
1203

1204
      tFreeStreamTask(pTask);
×
1205
      void* px = taosArrayPush(pRecycleList, &id);
×
1206
      if (px == NULL) {
×
1207
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
×
1208
      }
1209

1210
      int32_t total = taosArrayGetSize(pRecycleList);
×
1211
      stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
×
1212
      continue;
×
1213
    }
1214

1215
    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
8!
1216
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
1217

1218
    // do duplicate task check.
1219
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
8✔
1220
    void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
8✔
1221
    if (p == NULL) {
8!
1222
      code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
8✔
1223
      if (code < 0) {
8!
1224
        stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
×
1225
        tFreeStreamTask(pTask);
×
1226
        continue;
×
1227
      }
1228

1229
      void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
8✔
1230
      if (px == NULL) {
8!
1231
        stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
×
1232
      }
1233
    } else {
1234
      // todo this should replace the existed object put by replay creating stream task msg from mnode
1235
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
×
1236
      tFreeStreamTask(pTask);
×
1237
      continue;
×
1238
    }
1239

1240
    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);
8✔
1241

1242
    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
8!
1243
      int64_t refId = pTask->id.refId;
×
1244
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
×
1245
              pTask->id.taskId, tstrerror(terrno), refId);
1246

1247
      void*   px = taosArrayPop(pMeta->pTaskList);
×
1248
      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
1249
      if (ret != 0) {
×
1250
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
×
1251
      }
1252
      continue;
×
1253
    }
1254

1255
    stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
8!
1256
    if (pTask->info.fillHistory == 0) {
8!
1257
      int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
8✔
1258
    }
1259

1260
    if (streamTaskShouldPause(pTask)) {
8!
1261
      int32_t val = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
×
1262
    }
1263
  }
1264

1265
  tdbFree(pKey);
282✔
1266
  tdbFree(pVal);
282✔
1267

1268
  tdbTbcClose(pCur);
282✔
1269

1270
  if (taosArrayGetSize(pRecycleList) > 0) {
282!
1271
    for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
×
1272
      STaskId* pId = taosArrayGet(pRecycleList, i);
×
1273
      code = streamMetaRemoveTaskInMeta(pMeta, pId);
×
1274
      if (code) {
×
1275
        stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
×
1276
      }
1277
    }
1278
  }
1279

1280
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
282✔
1281
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
282✔
1282
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
1283

1284
  taosArrayDestroy(pRecycleList);
282✔
1285
  code = streamMetaCommit(pMeta);
282✔
1286
  if (code) {
282!
1287
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
×
1288
  }
1289
}
1290

1291
// Notify all stream tasks that the vnode is closing.
//
// Sets pMeta->closeFlag, waits for the meta hb timer to quit, then - while holding the meta read
// lock - obtains a copy of the task list from streamMetaSendMsgBeforeCloseTasks(), stops every
// task that can still be acquired and removes its ref-pool entry, and finally stops the scan
// timer if one is set.
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  pMeta->closeFlag = true;
  streamMetaWaitForHbTmrQuit(pMeta);

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // this branch used to be empty; pTaskList may still be NULL here, in which case the loop
    // below relies on taosArrayGetSize() yielding 0 for it, exactly as before
    stError("vgId:%d failed to prepare the task list before closing tasks, code:%s", vgId, tstrerror(code));
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    // remember the refId before the task is released, it is needed for the removal below
    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);

  if (pMeta->scanInfo.scanTimer != NULL) {
    streamTmrStop(pMeta->scanInfo.scanTimer);
    pMeta->scanInfo.scanTimer = NULL;
  }

  streamMetaRUnLock(pMeta);
}
463✔
1349

1350
// Start the meta heartbeat to the mnode.
//
// A heap copy of pMeta->rid is registered with metaRefMgtAdd() and then handed to
// streamMetaHbToMnode(). Nothing is started when the allocation or the registration fails.
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  if (pRid == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRid = pMeta->rid;
  int32_t code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code) {
    // this failure used to be silent and pRid was leaked
    // NOTE(review): assumes metaRefMgtAdd() does not keep pRid when it fails - confirm
    stError("vgId:%d failed to add rid:%" PRId64 " into meta ref mgt, hbMsg will not be started, code:%s", pMeta->vgId,
            *pRid, tstrerror(code));
    taosMemoryFree(pRid);
    return;
  }

  streamMetaHbToMnode(pRid, NULL);
}
1365

1366
// Produce a copy of pMeta->pTaskList for the caller and, when pMeta->sendMsgBeforeClosing is
// set, mark the checkpoint of every acquirable task as failed and send one hb message to the
// mnode before the tasks are closed.
//
// pMeta  stream meta
// pList  [out] receives the duplicated task list; the caller destroys it
//
// Returns terrno when the list cannot be duplicated (*pList is not assigned by this function in
// that case), otherwise TSDB_CODE_SUCCESS - a failed hb message is logged but deliberately not
// reported to the caller.
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);

  int32_t code = 0;
  SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
  if (pTaskList == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pTaskList;

  bool sendMsg = pMeta->sendMsgBeforeClosing;
  if (!sendMsg) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {  // this error is ignored
      continue;
    }

    streamTaskSetCheckpointFailed(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  code = streamMetaSendHbHelper(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    // best effort only: report the failure, the closing procedure carries on regardless
    stError("vgId:%d failed to send hb msg to mnode before closing tasks, code:%s", pMeta->vgId, tstrerror(code));
  }

  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // always report success
}
1405

1406
// Record a new stage and leader/follower role for the meta.
//
// Under the meta write lock: stores the stage; flags sendMsgBeforeClosing when the role held so
// far is leader; stores the new role; a node that is (or becomes) follower has its start info
// reset, a node that becomes leader after being follower is marked to restart its tasks once
// the nodeEp update arrives. After the lock is released the transition is logged and, for a
// leader, the hb to the mnode is started.
//
// The follower -> leader test used to compare the previous *stage* value with
// NODE_ROLE_FOLLOWER; it now uses the previous *role*, as the surrounding comments describe.
//
// pMeta     stream meta
// stage     new stage value
// isLeader  true when this node is now the leader
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
  streamMetaWLock(pMeta);

  int64_t prevStage = pMeta->stage;
  bool    wasFollower = (pMeta->role == NODE_ROLE_FOLLOWER);  // role held before this update
  pMeta->stage = stage;

  // mark the sign to send msg before close all tasks
  // 1. for leader vnode, always send msg before closing
  // 2. for follower vnode, if it's changed from leader, also sending msg before closing.
  if (pMeta->role == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
  if (!isLeader) {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
  } else if (wasFollower) {  // wait for nodeEp update if become leader from follower
    pMeta->startInfo.tasksWillRestart = 1;
  }

  streamMetaWUnLock(pMeta);

  if (isLeader) {
    if (wasFollower) {
      stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64
             " restart after nodeEp being updated",
             pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
    } else {
      stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
             pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
    }
    streamMetaStartHb(pMeta);
  } else {
    stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
           stage, prevStage, isLeader, pMeta->sendMsgBeforeClosing);
  }
}
345✔
1445

1446
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
4✔
1447
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
4✔
1448
  for (int32_t i = 0; i < num; ++i) {
11✔
1449
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
7✔
1450
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
7✔
1451
    SStreamTask*   pTask = NULL;
7✔
1452
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
7✔
1453

1454
    if (code == 0) {
7!
1455
      if (pTask->status.downstreamReady == 0) {
7!
1456
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
×
1457
        return false;
×
1458
      }
1459
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
7✔
1460
    }
1461
  }
1462

1463
  return true;
4✔
1464
}
1465

1466
// Call streamTaskResetStatus() on every task in pMeta->pTaskList that can still be acquired.
// Tasks are acquired through the unsafe (lock-free) variant. Always returns success.
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);

  // with an empty list the loop body never runs and success is returned right away
  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask) != 0) {
      continue;
    }

    streamTaskResetStatus(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  return TSDB_CODE_SUCCESS;
}
1487

1488
// Record that pTask - and pHTask, when one is given - has handled the nodeEp update identified
// by transId, by inserting a {streamId, taskId, transId} key into pMeta->updateInfo.pTasks.
// The elapsed time since startTs is only used for logging.
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
                                     int64_t startTs) {
  const char* idStr = pTask->id.idStr;
  int32_t     vgId = pMeta->vgId;

  // keep the already updated info
  STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;

  if (pHTask == NULL) {
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
            idStr, vgId, transId, elapsed);
    return;
  }

  // the related task is recorded under the same transId
  STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
    return;
  }

  stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
          " ms",
          idStr, vgId, transId, elapsed);
}
5✔
1517

1518
// Mark the currently active nodeEp update as completed: clear the bookkeeping containers, move
// activeTransId into completeTransId, reset activeTransId to -1 and stamp the completion time.
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpdate = &pMeta->updateInfo;

  // sampled before the list is cleared, for the log line below
  int32_t numInList = taosArrayGetSize(pUpdate->pTaskList);

  taosHashClear(pUpdate->pTasks);
  taosArrayClear(pUpdate->pTaskList);

  int32_t prevComplete = pUpdate->completeTransId;

  pUpdate->completeTransId = pUpdate->activeTransId;
  pUpdate->activeTransId = -1;
  pUpdate->completeTs = taosGetTimestampMs();

  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64
          ", complete transId:%d->%d, update Tasks:%d reset active transId",
          pMeta->vgId, pUpdate->completeTs, prevComplete, pUpdate->completeTransId, numInList);
}
3✔
1534

1535
// Decide whether the epset (nodeEp) update identified by transId should be processed and, if
// so, make it the active one.
//
// - transId older than or equal to the last completed transId: rejected.
// - no active transId (-1): transId becomes active.
// - transId equal to the active one: accepted, nothing changes.
// - transId older than the active one: rejected as out of order.
// - transId newer than the active one: replaces it.
// Whenever a new transId becomes active, pInfo->pTasks is cleared.
//
// Two log messages carried wrong arguments and are corrected here: the "updated from:%d to %d"
// message printed the new id first and the previous one second, and the "prev completed ...
// transId" message printed activeTransId instead of completeTransId.
//
// Returns true when the update should be handled, false when it must be discarded.
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;

  if (transId < pInfo->completeTransId) {
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }

  if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  }

  // from here on: transId > pInfo->completeTransId
  if (pInfo->activeTransId == -1) {
    taosHashClear(pInfo->pTasks);
    pInfo->activeTransId = transId;

    stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId,
           pInfo->completeTransId);
    return true;
  }

  if (pInfo->activeTransId == transId) {
    // do nothing
    return true;
  }

  if (transId < pInfo->activeTransId) {
    stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard",
            pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
    return false;
  }

  // transId > pInfo->activeTransId: the newer update supersedes the active one
  taosHashClear(pInfo->pTasks);
  int32_t prev = pInfo->activeTransId;
  pInfo->activeTransId = transId;

  stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId, prev,
         transId, pInfo->completeTransId);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc