• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.7
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tsched.h"
21
#include "tstream.h"
22
#include "ttimer.h"
23
#include "wal.h"
24

25
// Once-guard handed to taosThreadOnce() by streamMetaInit() so that streamMetaEnvInit() is the
// routine run under that guard.
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
26

27
// Ids of the reference pools; each one is assigned from taosOpenRef() in streamMetaEnvInit()
// and stays 0 until that function has run.
int32_t streamBackendId = 0;
28
int32_t streamBackendCfWrapperId = 0;
29
int32_t taskDbWrapperId = 0;
30
int32_t streamTaskRefPool = 0;
31

32
// Forward declarations: both functions are referenced in this file before their definitions.
static int32_t streamMetaBegin(SStreamMeta* pMeta);
33
static void    streamMetaCloseImpl(void* arg);
34

35
// Registry of heap-allocated refId holders (int64_t*), see metaRefMgtAdd()/metaRefMgtRemove().
typedef struct {
36
  // locked around every access to pTable in metaRefMgtAdd()/metaRefMgtRemove()
  TdThreadMutex mutex;
37
  // hash table keyed by the holder pointer value; the stored value is the same pointer
  SHashObj*     pTable;
38
} SMetaRefMgt;
39

40
// Single process-wide instance, set up by metaRefMgtInit() and torn down by metaRefMgtCleanup().
SMetaRefMgt gMetaRefMgt;
41

42
// Defined further down in this file.
int32_t metaRefMgtInit();
43
void    metaRefMgtCleanup();
44

45
static void streamMetaEnvInit() {
2,440✔
46
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
2,440✔
47
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
2,440✔
48
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
2,440✔
49

50
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
2,440✔
51
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
2,440✔
52

53
  int32_t code = metaRefMgtInit();
2,440✔
54
  if (code) {
2,440!
55
    stError("failed to init stream meta mgmt env, start failed");
×
56
    return;
×
57
  }
58

59
  code = streamTimerInit();
2,440✔
60
  if (code) {
2,440!
61
    stError("failed to init stream meta env, start failed");
×
62
  }
63
}
64

65
void streamMetaInit() {
2,440✔
66
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
2,440✔
67
  if (code) {
2,440!
68
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
69
  }
70
}
2,440✔
71

72
void streamMetaCleanup() {
2,440✔
73
  taosCloseRef(streamBackendId);
2,440✔
74
  taosCloseRef(streamBackendCfWrapperId);
2,440✔
75
  taosCloseRef(streamMetaRefPool);
2,440✔
76
  taosCloseRef(streamTaskRefPool);
2,440✔
77

78
  metaRefMgtCleanup();
2,440✔
79
  streamTimerCleanUp();
2,440✔
80
}
2,440✔
81

82
int32_t metaRefMgtInit() {
2,440✔
83
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
2,440✔
84
  if (code) {
2,440!
85
    return code;
×
86
  }
87

88
  if (code == 0) {
2,440!
89
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
2,440✔
90
  }
91

92
  if (gMetaRefMgt.pTable == NULL) {
2,440!
93
    return terrno;
×
94
  } else {
95
    return code;
2,440✔
96
  }
97
}
98

99
void metaRefMgtCleanup() {
2,440✔
100
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
2,440✔
101
  while (pIter) {
34,119✔
102
    int64_t* p = *(int64_t**) pIter;
31,679✔
103
    taosMemoryFree(p);
31,679✔
104
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
31,679✔
105
  }
106

107
  taosHashCleanup(gMetaRefMgt.pTable);
2,440✔
108
  streamMutexDestroy(&gMetaRefMgt.mutex);
2,440✔
109
}
2,440✔
110

111
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
70,643✔
112
  int32_t code = 0;
70,643✔
113
  void*   p = NULL;
70,643✔
114

115
  streamMutexLock(&gMetaRefMgt.mutex);
70,643✔
116

117
  p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
70,685✔
118
  if (p == NULL) {
70,685!
119
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
70,685✔
120
    if (code) {
70,685!
121
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64" %p, code:%s", (int32_t)vgId, *rid, rid,
×
122
              tstrerror(code));
123
      return code;
×
124
    } else { // not
125
//      stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
126
    }
127
  } else {
128
    stFatal("try to add refId:%"PRId64" vgId:%d, %p that already added into mgt", *rid, (int32_t) vgId, rid);
×
129
  }
130

131
  streamMutexUnlock(&gMetaRefMgt.mutex);
70,685✔
132
  return code;
70,685✔
133
}
134

135
void metaRefMgtRemove(int64_t* pRefId) {
39,006✔
136
  streamMutexLock(&gMetaRefMgt.mutex);
39,006✔
137

138
  int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
39,006✔
139
  taosMemoryFree(pRefId);
39,006✔
140
  streamMutexUnlock(&gMetaRefMgt.mutex);
39,006✔
141
}
39,006✔
142

143
// Open the tdb environment under pMeta->path together with its two tables, "task.db"
// and "checkpoint.db". Returns 0 on success and -1 as soon as one open fails; handles
// opened before the failure are left in pMeta for the caller to close.
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t ret = tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL);
  if (ret < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
    return -1;
  }

  ret = tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  ret = tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  return 0;
}
161

162
//
163
// impl later
164
//
165
// Outcome of the on-disk format check done by streamMetaCheckBackendCompatible() and
// consumed by streamMetaMayCvtDbFormat().
enum STREAM_STATE_VER {
166
  // task msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER: streamMetaMayCvtDbFormat() fails with -1
  STREAM_STATA_NO_COMPATIBLE,
167
  // default result: nothing to convert
  STREAM_STATA_COMPATIBLE,
168
  // task msgVer >= SSTREAM_TASK_NEED_CONVERT_VER: streamMetaCvtDbFormat() is invoked
  STREAM_STATA_NEED_CONVERT,
169
};
170

171
// Inspect the first decodable task record in pMeta->pTaskDb and classify the stored
// format from its msgVer.
//
// Returns one of the STREAM_STATE_VER values:
//   STREAM_STATA_COMPATIBLE    - default; also returned when the cursor cannot be opened
//                                or positioned, or when no record can be decoded
//   STREAM_STATA_NO_COMPATIBLE - msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER
//   STREAM_STATA_NEED_CONVERT  - msgVer >= SSTREAM_TASK_NEED_CONVERT_VER
//
// Fix: the decoder is now cleared before moving on to the next record when decoding
// fails (previously tDecoderClear() was skipped on that path), and `info` is
// zero-initialized so that it is never read while indeterminate.
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  ret = STREAM_STATA_COMPATIBLE;
  TBC*    pCur = NULL;
  int32_t code = 0;
  void*   pKey = NULL;
  int32_t kLen = 0;
  void*   pVal = NULL;
  int32_t vLen = 0;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {  // no task info, no stream
    return ret;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCur);
    return ret;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo info = {0};
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
      // undecodable record: release the decoder and try the next one
      tDecoderClear(&decoder);
      continue;
    }

    if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
      ret = STREAM_STATA_NO_COMPATIBLE;
    } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
      ret = STREAM_STATA_NEED_CONVERT;
    }

    // only the first decodable record is examined
    tDecoderClear(&decoder);
    break;
  }

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  return ret;
}
219

220
// Convert the data of every column-family instance of the legacy stream backend found
// under pMeta->path, then remove the "<path>/state" directory when all conversions
// succeeded.
//
// Returns terrno (reset to 0 beforehand) when no backend data exists for the latest
// checkpoint, the error of streamBackendInit()/streamStateCvtDataFormat() on failure,
// and 0 otherwise. Failing to allocate the directory name is logged but does not change
// the return value.
//
// Fix: the allocation-failure branch used to report tstrerror(code) with code == 0, i.e.
// it logged a success reason for a failure; it now reports terrno. The path is built with
// a bounded snprintf() instead of sprintf().
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (exist == false) {
    code = terrno;
    return code;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("failed to cvt data");
      goto _EXIT;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

_EXIT:
  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    size_t stateLen = strlen(pMeta->path) + 32;
    char*  state = taosMemoryCalloc(1, stateLen);
    if (state != NULL) {
      snprintf(state, stateLen, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // code is 0 here; the allocation failure reason is carried by terrno
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
265

266
// Run the format check and act on it: convert the backend data when required, fail
// with -1 when the stored data is incompatible, and return 0 when nothing has to be done.
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t state = streamMetaCheckBackendCompatible(pMeta);

  switch (state) {
    case STREAM_STATA_NEED_CONVERT:
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);

    case STREAM_STATA_NO_COMPATIBLE:
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;

    case STREAM_STATA_COMPATIBLE:
    default:
      return 0;
  }
}
284

285
// Attach a taskDb backend to pTask. Backends are shared through pMeta->pTaskDbUnique,
// keyed by `key`: an existing backend gets one more reference, otherwise a new one is
// opened (retrying every second until taskDbOpen() succeeds) and registered in the map.
//
// When taskDbOpen() reports a processed version (!= -1) that differs from the one in the
// task meta, the task's processedVer/checkpointVer/nextProcessVer are updated from it.
//
// Returns 0 on success and TSDB_CODE_FAILED when the existing backend cannot be referenced.
// A failure to insert the new backend into the map is only logged.
//
// Fix: pMeta->backendMutex is now released before returning TSDB_CODE_FAILED; the
// previous code left the mutex locked on that path.
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) {
  int32_t code = 0;
  int64_t chkpId = pTask->chkInfo.checkpointId;

  streamMutexLock(&pMeta->backendMutex);
  void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
  if ((ppBackend != NULL) && (*ppBackend != NULL)) {
    void* p = taskDbAddRef(*ppBackend);
    if (p == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pBackend = *ppBackend;
    pBackend->pMeta = pMeta;
    pTask->pBackend = pBackend;

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
    return 0;
  }

  STaskDbWrapper* pBackend = NULL;
  int64_t         processVer = -1;
  while (1) {
    code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend);
    if (code == 0) {
      break;
    }

    // do not hold the lock while sleeping; retry once per second
    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);
  }

  int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
  pTask->pBackend = pBackend;
  pBackend->refId = tref;
  pBackend->pTask = pTask;
  pBackend->pMeta = pMeta;

  if (processVer != -1) {
    if (pTask->chkInfo.processedVer != processVer) {
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    } else {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    }
  }

  code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
  if (code) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
  return 0;
}
351

352
void streamMetaRemoveDB(void* arg, char* key) {
4,591✔
353
  if (arg == NULL || key == NULL) return;
4,591!
354

355
  SStreamMeta* pMeta = arg;
4,591✔
356
  streamMutexLock(&pMeta->backendMutex);
4,591✔
357
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
4,591✔
358
  if (code) {
4,591!
359
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
360
  }
361

362
  streamMutexUnlock(&pMeta->backendMutex);
4,591✔
363
}
364

365
// Create and initialize the stream meta of one vnode/snode.
//
// path:         parent directory; the meta lives in "<path>/stream"
// ahandle:      opaque handle stored in the meta (passed to buildTaskFn by this file)
// buildTaskFn / expandTaskFn / fn: callbacks stored in the meta
// vgId, stage:  identity and stage stored in the meta
// p:            out parameter, receives the new meta on success
//
// Returns 0 on success; on failure everything acquired here and tracked in the _err
// block is released and an error code is returned.
//
// Fixes relative to the previous version:
//  - `(code = streamMetaBegin(pMeta) < 0)` assigned the comparison result (0/1) to code
//    because `<` binds tighter than `=`; the real return code is now kept.
//  - pMeta->vgId is assigned right after allocation. It used to be set only after
//    streamMetaOpenTdb()/streamMetaMayCvtDbFormat() ran, so those helpers logged and
//    used vgId 0.
//  - pMeta->pTaskDbUnique is released on the error path.
//  - the refId holder is freed when metaRefMgtAdd() fails.
//  - the path is built with a bounded snprintf().
//
// NOTE(review): when a failure happens after taosAddRef(streamMetaRefPool, pMeta), the
// meta is freed here while still registered in the ref pool; the rwlock and the backend
// mutex are not destroyed on the error path either. Both need a separate, careful fix.
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
  QRY_PARAM_CHECK(p);
  int32_t code = 0;
  int32_t lino = 0;

  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
    return terrno;
  }

  // the helpers called below read pMeta->vgId, so it must be valid from here on
  pMeta->vgId = vgId;

  int32_t len = strlen(path) + 64;
  char*   tpath = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);

  snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
  pMeta->path = tpath;

  code = streamMetaOpenTdb(pMeta);
  TSDB_CHECK_CODE(code, lino, _err);

  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
            tstrerror(terrno));
    TSDB_CHECK_CODE(code, lino, _err);
  }

  if ((code = streamMetaBegin(pMeta)) < 0) {
    stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
    lino = __LINE__;
    goto _err;
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);

  pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno);

  code = streamMetaInitStartInfo(&pMeta->startInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);

  pMeta->scanInfo.scanCounter = 0;
  pMeta->ahandle = ahandle;
  pMeta->buildTaskFn = buildTaskFn;
  pMeta->expandTaskFn = expandTaskFn;
  pMeta->stage = stage;
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
  pMeta->updateInfo.activeTransId = -1;
  pMeta->updateInfo.completeTransId = -1;

  pMeta->startInfo.completeFn = fn;
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);

  pMeta->numOfPausedTasks = 0;
  pMeta->numOfStreamTasks = 0;
  pMeta->closeFlag = false;

  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);

  // set the attribute when running on Linux OS
  TdThreadRwlockAttr attr;
  code = taosThreadRwlockAttrInit(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

#ifdef LINUX
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
  TSDB_CHECK_CODE(code, lino, _err);
#endif

  code = taosThreadRwlockInit(&pMeta->lock, &attr);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadRwlockAttrDestroy(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _err);

  // add refId at the end of initialization function
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);

  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);

  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));

  code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code != 0) {
    // the holder was not stored in the registry, so nobody else will free it
    taosMemoryFree(pRid);
    TSDB_CHECK_CODE(code, lino, _err);
  }

  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);

  TSDB_CHECK_CODE(code, lino, _err);

  *p = pMeta;
  return code;

_err:
  taosMemoryFree(pMeta->path);
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
  if (pMeta->pTaskDb) {
    tdbTbClose(pMeta->pTaskDb);
    pMeta->pTaskDb = NULL;
  }
  if (pMeta->pCheckpointDb) {
    tdbTbClose(pMeta->pCheckpointDb);
  }
  if (pMeta->db) {
    tdbClose(pMeta->db);
  }

  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
  if (pMeta->pTaskDbUnique) taosHashCleanup(pMeta->pTaskDbUnique);
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);

  taosMemoryFree(pMeta);

  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
  return code;
}
498

499
// todo refactor: the lock should be restricted in one function
500
#ifdef BUILD_NO_CALL
501
// Only compiled when BUILD_NO_CALL is defined.
// Initializes pMeta->streamBackend, retrying every 100ms until streamBackendInit() returns
// a non-NULL backend, then registers it in the streamBackendId ref pool and loads the
// checkpoint info.
// The first failure path unlocks the meta write lock before retrying, so the caller
// presumably holds that lock on entry (not visible here - confirm against callers); the
// lock is re-acquired before each retry and is held when the loop exits.
// NOTE(review): streamBackendInit() is called here with three arguments and its result is
// used as a pointer, whereas streamMetaCvtDbFormat() in this file calls it with four
// arguments and an int32_t result; this block looks stale.
void streamMetaInitBackend(SStreamMeta* pMeta) {
502
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
503
  if (pMeta->streamBackend == NULL) {
504
    // drop the write lock so it is not held while sleeping between retries
    streamMetaWUnLock(pMeta);
505

506
    while (1) {
507
      streamMetaWLock(pMeta);
508
      pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
509
      if (pMeta->streamBackend != NULL) {
510
        // leave the loop with the write lock held
        break;
511
      }
512

513
      streamMetaWUnLock(pMeta);
514
      stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
515
      taosMsleep(100);
516
    }
517
  }
518

519
  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
520
  streamBackendLoadCheckpointInfo(pMeta);
521
}
522
#endif
523

524
// Drop every task registered in this meta and reset the task containers and counters.
// For each task that can still be acquired: stop its delay-schedule timer when one is
// armed (non fill-history tasks only), remove the task reference, then release the
// reference acquired here. The stream backend reference is removed when one is set.
// Locking is left to the caller (streamMetaCloseImpl() calls this under the write lock).
void streamMetaClear(SStreamMeta* pMeta) {
  // remove all existed tasks in this vnode
  for (void* pEntry = taosHashIterate(pMeta->pTasksMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasksMap, pEntry)) {
    int64_t      taskRefId = *(int64_t*)pEntry;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
    if (pTask == NULL) {
      continue;
    }

    // release the ref by timer: one more ref is held in the timer
    bool timerArmed = (pTask->info.delaySchedParam != 0) && (pTask->info.fillHistory == 0);
    if (timerArmed) {
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    if (taosRemoveRef(streamTaskRefPool, taskRefId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, taskRefId);
    }

    if (taosReleaseRef(streamTaskRefPool, taskRefId) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, taskRefId);
    }
  }

  if (pMeta->streamBackendRid != 0) {
    if (taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
      stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
    }
  }

  // containers
  taosHashClear(pMeta->pTasksMap);
  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  // counters
  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  pMeta->startInfo.readyTs = 0;
}
13,977✔
573

574
// Request the close of a stream meta by removing its reference from streamMetaRefPool
// (the pool was opened with streamMetaCloseImpl as its destructor). A NULL pMeta is ignored.
//
// Fix: the NULL check now precedes the first use of pMeta; the previous version read
// pMeta->vgId for the debug log before testing the pointer.
void streamMetaClose(SStreamMeta* pMeta) {
  if (pMeta == NULL) {
    return;
  }

  stDebug("vgId:%d start to close stream meta", pMeta->vgId);

  int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid);
  if (code) {
    stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
  }
}
584

585
void streamMetaCloseImpl(void* arg) {
13,938✔
586
  SStreamMeta* pMeta = arg;
13,938✔
587
  if (pMeta == NULL) {
13,938!
588
    return;
×
589
  }
590

591
  int32_t code = 0;
13,938✔
592
  int32_t vgId = pMeta->vgId;
13,938✔
593
  stDebug("vgId:%d start to do-close stream meta", vgId);
13,938✔
594

595
  streamMetaWLock(pMeta);
13,938✔
596
  streamMetaClear(pMeta);
13,938✔
597
  streamMetaWUnLock(pMeta);
13,938✔
598

599
  // already log the error, ignore here
600
  tdbAbort(pMeta->db, pMeta->txn);
13,938✔
601
  tdbTbClose(pMeta->pTaskDb);
13,935✔
602
  tdbTbClose(pMeta->pCheckpointDb);
13,938✔
603
  tdbClose(pMeta->db);
13,938✔
604

605
  taosArrayDestroy(pMeta->pTaskList);
13,937✔
606
  taosArrayDestroy(pMeta->chkpSaved);
13,938✔
607
  taosArrayDestroy(pMeta->chkpInUse);
13,937✔
608

609
  taosHashCleanup(pMeta->pTasksMap);
13,937✔
610
  taosHashCleanup(pMeta->pTaskDbUnique);
13,938✔
611
  taosHashCleanup(pMeta->updateInfo.pTasks);
13,938✔
612

613
  streamMetaClearStartInfo(&pMeta->startInfo);
13,938✔
614

615
  destroyMetaHbInfo(pMeta->pHbInfo);
13,938✔
616
  pMeta->pHbInfo = NULL;
13,938✔
617

618
  taosMemoryFree(pMeta->path);
13,938✔
619
  streamMutexDestroy(&pMeta->backendMutex);
13,938✔
620

621
  bkdMgtDestroy(pMeta->bkdChkptMgt);
13,936✔
622

623
  pMeta->role = NODE_ROLE_UNINIT;
13,936✔
624
  code = taosThreadRwlockDestroy(&pMeta->lock);
13,936✔
625
  if (code) {
13,937!
626
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
×
627
  }
628

629
  taosMemoryFree(pMeta);
13,937✔
630
  stDebug("vgId:%d end to close stream meta", vgId);
13,937✔
631
}
632

633
// todo let's check the status for each task
634
// Encode pTask and upsert it into pMeta->pTaskDb under the key {streamId, taskId},
// inside the current transaction (pMeta->txn).
//
// Returns 0 on success; -1 when the encoded size cannot be computed; terrno when the
// buffer cannot be allocated or the upsert fails; TSDB_CODE_INVALID_MSG when encoding
// fails. When the upsert fails, the task reference is also removed from streamTaskRefPool.
//
// Side effect: a task whose ver is below SSTREAM_TASK_SUBTABLE_CHANGED_VER is upgraded
// to SSTREAM_TASK_VER before it is encoded.
//
// Fixes: the encode buffer is now freed when encoding fails (it used to leak), and the
// failure test is `< 0`, consistent with the size-computation check above it.
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t vgId = pTask->pMeta->vgId;
  void*   buf = NULL;
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return terrno;
  }

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code < 0) {
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    taosMemoryFree(buf);
    return TSDB_CODE_INVALID_MSG;
  }

  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};

  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
  if (code != TSDB_CODE_SUCCESS) {
    code = terrno;
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
            vgId, pTask->id.refId, tstrerror(code));

    // cache the refId for the log below before the reference is removed
    int64_t refId = pTask->id.refId;
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
    if (ret != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id[1], refId);
    }
  } else {
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
  }

  taosMemoryFree(buf);
  return code;
}
683

684
// Delete the record keyed by {streamId, taskId} from pMeta->pTaskDb inside the current
// transaction. Returns the result of tdbTbDelete(); the outcome is logged either way.
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
  int64_t dbKey[2] = {pTaskId->streamId, pTaskId->taskId};

  int32_t code = tdbTbDelete(pMeta->pTaskDb, dbKey, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (code == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
            tstrerror(terrno));
  }

  return code;
}
696

697
// add to the ready tasks hash map, not the restored tasks hash map
698
// Register pTask in the meta: build it through pMeta->buildTaskFn, append its id to the
// task list, add it to the task ref pool and the task map, then persist and commit it.
//
// *pAdded is set to true only when the task was newly registered. A task whose id is
// already in the map is freed and 0 is returned with *pAdded == false.
// On any failure the steps already done are rolled back and the error code is returned;
// the task is freed directly (early failures) or through the removal of its reference.
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
  *pAdded = false;

  STaskId taskKey = streamTaskGetTaskId(pTask);
  if (taosHashGet(pMeta->pTasksMap, &taskKey, sizeof(taskKey)) != NULL) {
    stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", taskKey.taskId);
    tFreeStreamTask(pTask);
    return 0;
  }

  int32_t code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver);
  if (code != 0) {
    tFreeStreamTask(pTask);
    return code;
  }

  if (taosArrayPush(pMeta->pTaskList, &pTask->id) == NULL) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", taskKey.taskId);
    tFreeStreamTask(pTask);
    return terrno;
  }

  int64_t refId = taosAddRef(streamTaskRefPool, pTask);
  pTask->id.refId = refId;

  code = taosHashPut(pMeta->pTasksMap, &taskKey, sizeof(taskKey), &pTask->id.refId, sizeof(int64_t));
  if (code != 0) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", taskKey.taskId);

    // roll back: the id is not in the map yet, only in the list and the ref pool
    (void)taosArrayPop(pMeta->pTaskList);
    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) taskKey.taskId, refId);
    }
    return code;
  }

  // persist, then commit; both failures share the same rollback
  code = streamMetaSaveTask(pMeta, pTask);
  if (code == 0) {
    code = streamMetaCommit(pMeta);
  }

  if (code != 0) {
    (void)taosHashRemove(pMeta->pTasksMap, &taskKey, sizeof(taskKey));
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }
    return code;
  }

  // fill-history tasks are not counted as stream tasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  *pAdded = true;
  return code;
}
767

768
// Number of tasks registered in the meta, taken from the task map. A mismatch with the
// size of the task list is logged as an error but does not change the result.
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
  int32_t numInMap = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t numInList = taosArrayGetSize(pMeta->pTaskList);

  if (numInMap != numInList) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, numInList, numInMap);
  }

  return numInMap;
}
777

778
// Look up the task {streamId, taskId} in pMeta->pTasksMap and acquire a reference to it.
// This function takes no lock itself (streamMetaAcquireTask() wraps it in the read lock).
//
// On success *pTask is set and TSDB_CODE_SUCCESS is returned; the acquired reference is
// released with taosReleaseRef(), see streamMetaReleaseTask().
// TSDB_CODE_STREAM_TASK_NOT_EXIST is returned, with no reference held, when the id is
// unknown, the reference can no longer be acquired, the refId stored in the task differs
// from the one in the map, or the task should stop.
//
// Fix: the fatal "inconsistent refId" log passed its two refIds in swapped order; "task
// refId" now prints the id stored in the task and "try acquire" the id used for the lookup.
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);
  STaskId  id = {.streamId = streamId, .taskId = taskId};
  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
  if (p == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%"PRId64", may have been destoried", taskId, *pTaskRefId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (p->id.refId != *pTaskRefId) {
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, p->id.refId,
            *pTaskRefId);
    int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId);
    if (ret) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId);
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (streamTaskShouldStop(p)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", p->id.idStr);
    // give back the reference taken above: a stopping task is reported as not existing
    int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId);
    if (ret) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stDebug("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
  *pTask = p;
  return TSDB_CODE_SUCCESS;
}
816

817
// Same lookup as streamMetaAcquireTaskNoLock(), keyed by a ready-made STaskId, but without
// the streamTaskShouldStop() check: a stopping task can still be acquired here. No lock
// is taken in this function.
//
// On success *pTask is set and TSDB_CODE_SUCCESS is returned; otherwise
// TSDB_CODE_STREAM_TASK_NOT_EXIST is returned and no reference is held.
//
// Fix: the fatal "inconsistent refId" log passed its two refIds in swapped order; "task
// refId" now prints the id stored in the task and "try acquire" the id used for the lookup.
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);
  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));

  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
  if (p == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            *pTaskRefId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (p->id.refId != *pTaskRefId) {
    stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
            p->id.refId, *pTaskRefId);
    int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId);
    if (ret) {
      stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, *pTaskRefId);
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stDebug("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
  *pTask = p;
  return TSDB_CODE_SUCCESS;
}
847

848
// Locked variant of streamMetaAcquireTaskNoLock(): performs the lookup while holding the
// meta read lock and returns its result unchanged.
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t ret;

  streamMetaRLock(pMeta);
  ret = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return ret;
}
854

855
/*
 * Give back one reference on pTask to streamTaskRefPool. A NULL task is ignored.
 * The meta handle is not used by this function.
 */
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    // cache the ids first: they are still needed for logging after the release call
    const int32_t tid = pTask->id.taskId;
    const int64_t rid = pTask->id.refId;

    stDebug("s-task:0x%x release task, refId:%" PRId64, tid, rid);

    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, tid, rid);
    }
  }
}
868

869
/*
 * Remove the first entry of pTaskList whose (streamId, taskId) equals *id.
 * Only the first `num` entries are inspected; an error is logged when no entry matches.
 */
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  int32_t idx = 0;

  while (idx < num) {
    SStreamTaskId* pEntry = taosArrayGet(pTaskList, idx);
    if (pEntry->streamId == id->streamId && pEntry->taskId == id->taskId) {
      taosArrayRemove(pTaskList, idx);
      return;  // at most one entry is removed
    }
    idx += 1;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
11,825✔
884

885
/*
 * Callback passed to streamTaskHandleEventAsync() for the dropping event.
 *  - source tasks send the checkpoint-source rsp;
 *  - non-sink tasks that own an executor get their query killed.
 * `param` is not used. The returned code is that of the last step executed (0 if none ran).
 */
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t code = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    code = streamTaskSendCheckpointSourceRsp(pTask);
    if (code != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
              tstrerror(code));
    }
  }

  // nothing to kill for sink tasks or tasks without an executor
  if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->exec.pExecutor == NULL) {
    return code;
  }

  // kill the query procedure within the stream so that it ends ASAP
  code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
  }

  return code;
}
904

905
/*
 * Unregister (drop) the task identified by (streamId, taskId) from pMeta.
 *
 * Under the meta write lock this:
 *   1. acquires the task; if it cannot be found the call is a no-op,
 *   2. fixes up the paused / stream task counters,
 *   3. posts TASK_EVENT_DROPPING to the task state machine,
 *   4. removes the id from pTasksMap, pTaskList and (via streamMetaRemoveTask) the meta store,
 *   5. stops the delay-sched timer if one is armed,
 *   6. removes the task ref from streamTaskRefPool and releases the reference taken in step 1.
 *
 * Failures of the individual steps are logged but not propagated.
 *
 * @return always 0
 */
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  SStreamTask* pTask = NULL;
  int32_t      vgId = pMeta->vgId;
  int32_t      code = 0;
  STaskId      id = {.streamId = streamId, .taskId = taskId};

  streamMetaWLock(pMeta);

  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  // decrease the paused task counter
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // only tasks that are NOT fill-history tasks are counted in numOfStreamTasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  // the map entry was found a moment ago under the same write lock, so a failure here is unexpected
  code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  if (code) {
    stError("vgId:%d s-task:0x%x failed to remove task from the task map", vgId, taskId);
  }

  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

  code = streamMetaRemoveTask(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  // sanity check: the list and the map must describe the same set of tasks
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t sizeInList = (int32_t)taosArrayGetSize(pMeta->pTaskList);
  if (sizeInList != size) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // read the refId before any ref is dropped; pTask must not be touched after the final release
  int64_t refId = pTask->id.refId;
  int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
  if (ret != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  streamMetaWUnLock(pMeta);

  return 0;
}
969

970
/*
 * Start a tdb transaction (write + read-uncommitted) on pMeta->db while holding the meta write lock.
 * The transaction handle is stored in pMeta->txn. A failure is recorded through streamSetFatalError().
 *
 * @return the code returned by tdbBegin()
 */
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t rc;

  streamMetaWLock(pMeta);

  rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc != 0) {
    streamSetFatalError(pMeta, rc, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return rc;
}
980

981
/*
 * Commit the current meta transaction, run the post-commit step, then open a fresh transaction
 * so that pMeta->txn is usable again.
 *
 * NOTE(review): a tdbCommit() failure is recorded and logged but does not stop the sequence,
 * whereas a tdbPostCommit() failure returns immediately - confirm this asymmetry is intended.
 *
 * @return 0 on success; otherwise the code of the failing tdbPostCommit()/tdbBegin() call
 */
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t rc = tdbCommit(pMeta->db, pMeta->txn);
  if (rc != 0) {
    streamSetFatalError(pMeta, rc, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(rc),
            pMeta->fatalInfo.line);
  }

  rc = tdbPostCommit(pMeta->db, pMeta->txn);
  if (rc != 0) {
    streamSetFatalError(pMeta, rc, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(rc),
            pMeta->fatalInfo.line);
    return rc;
  }

  // re-open a transaction for subsequent meta updates
  rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc == 0) {
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
    return rc;
  }

  streamSetFatalError(pMeta, rc, __func__, __LINE__);
  stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(rc), pMeta->fatalInfo.line);
  return rc;
}
1008

1009
/*
 * Scan every record of pMeta->pTaskDb and return the largest checkpointId found.
 *
 * Records that fail to decode are skipped. If the cursor cannot be opened or positioned, or the
 * table holds no decodable record, 0 is returned.
 *
 * @param pMeta stream meta whose task db is scanned
 * @return the maximum checkpointId seen, or 0
 */
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t checkpointId = 0;

  TBC* pCur = NULL;
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return checkpointId;
  }

  void*   pKey = NULL;
  int32_t kLen = 0;
  void*   pVal = NULL;
  int32_t vLen = 0;

  int32_t code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCur);
    return checkpointId;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SCheckpointInfo info;
    SDecoder        decoder;

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t ret = tDecodeStreamTaskChkInfo(&decoder, &info);
    // always pair tDecoderInit with tDecoderClear, including when decoding failed
    tDecoderClear(&decoder);
    if (ret < 0) {
      continue;
    }

    checkpointId = TMAX(checkpointId, info.checkpointId);
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, checkpointId);

  // release the key/value buffers handed out by tdbTbcNext
  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);
  return checkpointId;
}
1054

1055
// not allowed to return error code
1056
/*
 * Load every stream task stored in pMeta->pTaskDb into memory.
 *
 * For each record:
 *   - the task is decoded; an undecodable record aborts the scan,
 *   - tasks persisted in DROPPING status are not loaded, their ids are collected and the
 *     records are removed from the meta store after the scan,
 *   - otherwise the task is built through pMeta->buildTaskFn, appended to pMeta->pTaskList,
 *     registered in streamTaskRefPool and indexed in pMeta->pTasksMap by its refId,
 *   - the stream-task / paused-task counters are updated.
 *
 * Finally the meta transaction is committed. Errors are logged only; nothing is returned.
 */
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
  TBC*    pCur = NULL;
  void*   pKey = NULL;
  int32_t kLen = 0;
  void*   pVal = NULL;
  int32_t vLen = 0;
  int32_t vgId = 0;
  int32_t code = 0;
  SArray* pRecycleList = NULL;

  if (pMeta == NULL) {
    return;
  }

  vgId = pMeta->vgId;
  pRecycleList = taosArrayInit(4, sizeof(STaskId));
  if (pRecycleList == NULL) {
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
    return;
  }

  stInfo("vgId:%d load stream tasks from meta files", vgId);

  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    // report the code of the call that failed rather than whatever terrno currently holds
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(code));
    taosArrayDestroy(pRecycleList);
    return;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(code));
    taosArrayDestroy(pRecycleList);
    tdbTbcClose(pCur);
    return;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
      break;
    }

    SDecoder decoder;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
      tDecoderClear(&decoder);
      tFreeStreamTask(pTask);
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
          "stream manually",
          vgId, tsDataDir);
      break;
    }
    tDecoderClear(&decoder);

    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
      // copy what is needed for logging/recycling before the task object is freed
      int32_t taskId = pTask->id.taskId;
      STaskId id = streamTaskGetTaskId(pTask);

      tFreeStreamTask(pTask);
      void* px = taosArrayPush(pRecycleList, &id);
      if (px == NULL) {
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
      }

      int32_t total = taosArrayGetSize(pRecycleList);
      stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
      continue;
    }

    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);

    // do duplicate task check.
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
    if (p == NULL) {
      code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
      if (code < 0) {
        stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(code));
        tFreeStreamTask(pTask);
        continue;
      }

      void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
      if (px == NULL) {
        // NOTE(review): the task is still registered in the map below although it is missing from the list
        stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
      }
    } else {
      // todo this should replace the existed object put by replay creating stream task msg from mnode
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
      tFreeStreamTask(pTask);
      continue;
    }

    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);
    if (pTask->id.refId < 0) {
      // never index an invalid refId: undo the list insertion and drop the task instead
      int32_t taskId = pTask->id.taskId;
      stError("s-task:0x%x vgId:%d failed to add task into ref pool, refId:%" PRId64 ", skip it", taskId, vgId,
              pTask->id.refId);

      (void)taosArrayPop(pMeta->pTaskList);
      tFreeStreamTask(pTask);
      continue;
    }

    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
      int64_t refId = pTask->id.refId;
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
              pTask->id.taskId, tstrerror(terrno), refId);

      // roll back the list insertion and the ref registration done above
      (void)taosArrayPop(pMeta->pTaskList);
      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
      if (ret != 0) {
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
      }
      continue;
    }

    stInfo("s-task:0x%x vgId:%d set refId:%"PRId64, (int32_t) id.taskId, vgId, pTask->id.refId);
    if (pTask->info.fillHistory == 0) {
      (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
    }

    if (streamTaskShouldPause(pTask)) {
      (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
    }
  }

  // release the key/value buffers handed out by tdbTbcNext
  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);

  // remove the records of the tasks that were persisted in dropping status
  if (taosArrayGetSize(pRecycleList) > 0) {
    for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
      STaskId* pId = taosArrayGet(pRecycleList, i);
      code = streamMetaRemoveTask(pMeta, pId);
      if (code) {
        stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
      }
    }
  }

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);

  taosArrayDestroy(pRecycleList);
  code = streamMetaCommit(pMeta);
  if (code) {
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
  }
}
1209

1210
/*
 * Notify all stream tasks of pMeta that the vnode is closing.
 *
 * Waits for the meta hb timer to quit, sets pMeta->closeFlag, then - under the meta read lock -
 * runs streamMetaSendMsgBeforeCloseTasks() and, for every task in the returned snapshot list,
 * stops the task and removes its ref from streamTaskRefPool. Tasks that cannot be acquired are
 * skipped. Failures are logged only.
 */
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  streamMetaWaitForHbTmrQuit(pMeta);
  pMeta->closeFlag = true;

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // do not swallow the failure silently; the loop below simply sees whatever list was produced
    stError("vgId:%d failed to prepare the task list before closing tasks, code:%s", vgId, tstrerror(code));
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    // remember the refId: pTask must not be dereferenced after it has been released
    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);
  streamMetaRUnLock(pMeta);
}
13,938✔
1262

1263
/*
 * Start sending the meta heartbeat to the mnode.
 *
 * A heap copy of pMeta->rid is registered through metaRefMgtAdd() and then handed to
 * streamMetaHbToMnode(). If the allocation or the registration fails, no heartbeat is started.
 */
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  if (pRid == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRid = pMeta->rid;
  int32_t code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code) {
    // NOTE(review): pRid is not freed here because its ownership after a failed metaRefMgtAdd()
    // cannot be determined from this function - confirm whether this path leaks.
    stError("vgId:%d failed to add rid:%" PRId64 " into meta ref mgt, hbMsg will not started, code:%s", pMeta->vgId,
            pMeta->rid, tstrerror(code));
    return;
  }

  streamMetaHbToMnode(pRid, NULL);
}
1278

1279
/*
 * Produce a snapshot of pMeta->pTaskList for the caller and, when pMeta->sendMsgBeforeClosing is
 * set, mark the checkpoint of every task as failed and send one hb message to the mnode.
 *
 * @param pMeta  stream meta
 * @param pList  output: duplicate of pMeta->pTaskList; the caller destroys it
 * @return terrno when the list cannot be duplicated, TSDB_CODE_SUCCESS otherwise. A failure to
 *         send the hb message is logged but not returned.
 */
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);

  int32_t code = 0;
  SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
  if (pTaskList == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pTaskList;

  bool sendMsg = pMeta->sendMsgBeforeClosing;
  if (!sendMsg) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {  // this error is ignored
      continue;
    }

    streamTaskSetCheckpointFailed(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  code = streamMetaSendHbHelper(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    stError("vgId:%d failed to send hbMsg to mnode before closing tasks, code:%s", pMeta->vgId, tstrerror(code));
  }

  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // the hb result does not affect the caller; always report success here
}
1318

1319
/*
 * Update the stage and the role (leader/follower) of pMeta under the meta write lock.
 *
 * If the node was leader before this call, pMeta->sendMsgBeforeClosing is set so that a message is
 * sent to the mnode before the tasks are closed. When the new role is leader, the meta hb is
 * started after the lock has been released.
 *
 * @param pMeta    stream meta to update
 * @param stage    new stage value
 * @param isLeader true if the node is now the leader
 */
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
  streamMetaWLock(pMeta);

  int64_t prevStage = pMeta->stage;
  pMeta->stage = stage;

  // mark the sign to send msg before close all tasks
  // 1. for leader vnode, always send msg before closing
  // 2. for follower vnode, if it is changed from leader, also send msg before closing.
  if (pMeta->role == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;

  // snapshot the flag while the lock is still held; it is only needed for logging below
  bool sendMsgBeforeClosing = pMeta->sendMsgBeforeClosing;
  streamMetaWUnLock(pMeta);

  if (isLeader) {
    // argument order follows the format string: new stage first, previous stage second
    stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
           pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
    streamMetaStartHb(pMeta);
  } else {
    stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
           stage, prevStage, isLeader, sendMsgBeforeClosing);
  }
}
16,796✔
1344

1345
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
1,795✔
1346
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
1,795✔
1347
  for (int32_t i = 0; i < num; ++i) {
4,868✔
1348
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
3,124✔
1349
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
3,124✔
1350
    SStreamTask*   pTask = NULL;
3,124✔
1351
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
3,124✔
1352

1353
    if (code == 0) {
3,124!
1354
      if (pTask->status.downstreamReady == 0) {
3,124✔
1355
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
51✔
1356
        return false;
51✔
1357
      }
1358
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
3,073✔
1359
    }
1360
  }
1361

1362
  return true;
1,744✔
1363
}
1364

1365
/*
 * Call streamTaskResetStatus() on every task of pMeta->pTaskList that can be acquired.
 * Tasks that cannot be acquired are skipped.
 *
 * @return always 0 (TSDB_CODE_SUCCESS)
 */
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);

  // an empty list simply skips the loop
  int32_t idx = 0;
  while (idx < total) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask) == 0) {
      streamTaskResetStatus(pTask);
      streamMetaReleaseTask(pMeta, pTask);
    }

    idx += 1;
  }

  return TSDB_CODE_SUCCESS;
}
1386

1387
/*
 * Record in pMeta->updateInfo.pTasks that pTask - and pHTask when it is not NULL - has been
 * updated for transaction transId, then log the time elapsed since startTs.
 * A failed insertion is logged only.
 */
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
                                     int64_t startTs) {
  const char* idStr = pTask->id.idStr;
  int32_t     vgId = pMeta->vgId;

  // keep the already updated info
  STaskUpdateEntry taskEntry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &taskEntry, sizeof(taskEntry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;

  if (pHTask == NULL) {
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
            idStr, vgId, transId, elapsed);
    return;
  }

  // the paired task passed as pHTask gets its own entry
  STaskUpdateEntry hTaskEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &hTaskEntry, sizeof(hTaskEntry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
    return;
  }

  stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
          " ms",
          idStr, vgId, transId, elapsed);
}
90✔
1416

1417
/*
 * Mark the active epset-update transaction as completed: clear the updated-task set, move
 * activeTransId into completeTransId, reset activeTransId to -1 and stamp completeTs with the
 * current time in milliseconds.
 */
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpd = &pMeta->updateInfo;
  int32_t          lastCompleted = pUpd->completeTransId;

  taosHashClear(pUpd->pTasks);

  pUpd->completeTransId = pUpd->activeTransId;
  pUpd->activeTransId = -1;
  pUpd->completeTs = taosGetTimestampMs();

  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64 ", complete transId:%d->%d, reset active transId",
          pMeta->vgId, pUpd->completeTs, lastCompleted, pUpd->completeTransId);
}
46✔
1430

1431
/*
 * Decide whether an epset-update message carrying transId should be processed, and update the
 * active transaction of pMeta->updateInfo accordingly.
 *
 *   transId <= completeTransId                      -> already handled / out of order: rejected
 *   no active transaction (activeTransId == -1)     -> transId becomes active: accepted
 *   transId == activeTransId                        -> accepted, nothing to change
 *   transId <  activeTransId                        -> out of order: rejected
 *   transId >  activeTransId                        -> replaces the active transaction: accepted
 *
 * Whenever the active transaction changes, the set of already-updated tasks is cleared.
 *
 * @return true if the message should be handled, false if it must be discarded
 */
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;

  if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  }

  if (transId < pInfo->completeTransId) {
    // report the completed transId, which is what the message text describes
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }

  // from here on: transId > pInfo->completeTransId
  if (pInfo->activeTransId == -1) {
    taosHashClear(pInfo->pTasks);
    pInfo->activeTransId = transId;

    stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId,
           pInfo->completeTransId);
    return true;
  }

  if (pInfo->activeTransId == transId) {
    // do nothing
    return true;
  }

  if (transId < pInfo->activeTransId) {
    stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard",
            pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
    return false;
  }

  // transId > pInfo->activeTransId: a newer transaction supersedes the active one
  taosHashClear(pInfo->pTasks);
  int32_t prev = pInfo->activeTransId;
  pInfo->activeTransId = transId;

  // "from" is the previous active transId, "to" is the new one
  stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId,
         prev, transId, pInfo->completeTransId);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc