• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4113

17 May 2025 06:43AM UTC coverage: 62.054% (-0.8%) from 62.857%
#4113

push

travis-ci

web-flow
Merge pull request #31115 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

154737 of 318088 branches covered (48.65%)

Branch coverage included in aggregate %.

175 of 225 new or added lines in 20 files covered. (77.78%)

5853 existing lines in 216 files now uncovered.

239453 of 317147 relevant lines covered (75.5%)

15121865.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.51
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tsched.h"
21
#include "tstream.h"
22
#include "ttimer.h"
23
#include "wal.h"
24

25
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
26

27
int32_t streamBackendId = 0;
28
int32_t streamBackendCfWrapperId = 0;
29
int32_t taskDbWrapperId = 0;
30
int32_t streamTaskRefPool = 0;
31

32
static int32_t streamMetaBegin(SStreamMeta* pMeta);
33
static void    streamMetaCloseImpl(void* arg);
34

35
// Process-wide registry of heap-allocated refId holders (see metaRefMgtAdd/metaRefMgtRemove).
typedef struct {
36
  // held around every insert into / removal from pTable
  TdThreadMutex mutex;
37
  // entries are keyed by the holder's pointer value and store that same pointer;
  // metaRefMgtCleanup frees every holder still present
  SHashObj*     pTable;
38
} SMetaRefMgt;
39

40
SMetaRefMgt gMetaRefMgt;
41

42
int32_t metaRefMgtInit();
43
void    metaRefMgtCleanup();
44

45
static void streamMetaEnvInit() {
2,619✔
46
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
2,619✔
47
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
2,619✔
48
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
2,619✔
49

50
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
2,619✔
51
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
2,619✔
52

53
  int32_t code = metaRefMgtInit();
2,619✔
54
  if (code) {
2,619!
55
    stError("failed to init stream meta mgmt env, start failed");
×
56
    return;
×
57
  }
58

59
  code = streamTimerInit();
2,619✔
60
  if (code) {
2,619!
61
    stError("failed to init stream meta env, start failed");
×
62
  }
63
}
64

65
void streamMetaInit() {
2,621✔
66
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
2,621✔
67
  if (code) {
2,621!
68
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
69
  }
70
}
2,621✔
71

72
void streamMetaCleanup() {
2,618✔
73
  taosCloseRef(streamBackendId);
2,618✔
74
  taosCloseRef(streamBackendCfWrapperId);
2,618✔
75
  taosCloseRef(streamMetaRefPool);
2,618✔
76
  taosCloseRef(streamTaskRefPool);
2,618✔
77

78
  metaRefMgtCleanup();
2,618✔
79
  streamTimerCleanUp();
2,618✔
80
}
2,618✔
81

82
int32_t metaRefMgtInit() {
2,619✔
83
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
2,619✔
84
  if (code) {
2,619!
85
    return code;
×
86
  }
87

88
  if (code == 0) {
2,619!
89
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
2,619✔
90
  }
91

92
  if (gMetaRefMgt.pTable == NULL) {
2,619!
93
    return terrno;
×
94
  } else {
95
    return code;
2,619✔
96
  }
97
}
98

99
void metaRefMgtCleanup() {
2,618✔
100
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
2,618✔
101
  while (pIter) {
30,429✔
102
    int64_t* p = *(int64_t**)pIter;
27,811✔
103
    taosMemoryFree(p);
27,811!
104
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
27,811✔
105
  }
106

107
  taosHashCleanup(gMetaRefMgt.pTable);
2,618✔
108
  streamMutexDestroy(&gMetaRefMgt.mutex);
2,618✔
109
}
2,618✔
110

111
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
49,862✔
112
  int32_t code = 0;
49,862✔
113
  void*   p = NULL;
49,862✔
114

115
  streamMutexLock(&gMetaRefMgt.mutex);
49,862✔
116

117
  p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
49,906✔
118
  if (p == NULL) {
49,906!
119
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
49,906✔
120
    if (code) {
49,906!
121
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
×
122
              tstrerror(code));
123
      return code;
×
124
    } else {  // not
125
              //      stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
126
    }
127
  } else {
128
    stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
×
129
  }
130

131
  streamMutexUnlock(&gMetaRefMgt.mutex);
49,906✔
132
  return code;
49,906✔
133
}
134

135
void metaRefMgtRemove(int64_t* pRefId) {
22,087✔
136
  streamMutexLock(&gMetaRefMgt.mutex);
22,087✔
137

138
  int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
22,087✔
139
  taosMemoryFree(pRefId);
22,087!
140
  streamMutexUnlock(&gMetaRefMgt.mutex);
22,087✔
141
}
22,087✔
142

143
// Opens the tdb environment at pMeta->path and the two tables kept in it:
// "task.db" (keys of STREAM_TASK_KEY_LEN bytes) and "checkpoint.db" (int32_t keys).
// Returns 0 on success and -1 as soon as any of the three opens fails; handles
// opened before the failure are left in pMeta for the caller to close.
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t rc = tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL);
  if (rc < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
    return -1;
  }

  rc = tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0);
  if (rc < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  rc = tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0);
  if (rc < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  return 0;
}
161

162
//
163
// impl later
164
//
165
// Result of streamMetaCheckBackendCompatible(): how the stored task meta relates
// to the format this build expects.
enum STREAM_STATE_VER {
166
  // stored msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER; streamMetaMayCvtDbFormat() fails with -1
  STREAM_STATA_NO_COMPATIBLE,
167
  // default verdict; nothing to do
  STREAM_STATA_COMPATIBLE,
168
  // stored msgVer >= SSTREAM_TASK_NEED_CONVERT_VER; streamMetaCvtDbFormat() is run
  STREAM_STATA_NEED_CONVERT,
169
};
170

171
// Inspects task.db and classifies the stored data by the msgVer of the first
// record whose checkpoint info can be decoded.
//
// Returns one of enum STREAM_STATE_VER. STREAM_STATA_COMPATIBLE is also returned
// when the cursor cannot be opened or positioned, when the table is empty, or when
// no record can be decoded.
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  verdict = STREAM_STATA_COMPATIBLE;
  TBC*    pCursor = NULL;
  void*   pKey = NULL;
  int32_t kLen = 0;
  void*   pVal = NULL;
  int32_t vLen = 0;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {  // no task info, no stream
    return verdict;
  }

  int32_t code = tdbTbcMoveToFirst(pCursor);
  if (code) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCursor);
    return verdict;
  }

  while (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);

    // records that fail to decode are skipped; the first decodable one decides
    const bool decoded = (tDecodeStreamTaskChkInfo(&decoder, &info) >= 0);
    if (decoded) {
      if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
        verdict = STREAM_STATA_NO_COMPATIBLE;
      } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
        verdict = STREAM_STATA_NEED_CONVERT;
      }
    }

    tDecoderClear(&decoder);
    if (decoded) {
      break;
    }
  }

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);
  return verdict;
}
220

221
// Converts the on-disk stream state of this meta to the current backend format.
//
// Looks up the latest checkpoint id; when no backend data exists for it, returns
// terrno as left by streamBackendDataIsExist() (terrno is reset to 0 beforehand, so
// "nothing to convert" yields 0). Otherwise opens the backend, converts every entry
// of pBackend->cfInst, releases the backend, and on success removes "<path>/state".
//
// @return 0 on success, otherwise the first error code encountered
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (exist == false) {
    code = terrno;
    return code;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("vgId:%d failed to cvt data", pMeta->vgId);
      // NOTE(review): the iteration is abandoned here without being cancelled; the table
      // is released right below by streamBackendCleanup() - confirm that is sufficient.
      goto _EXIT;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

_EXIT:
  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    int32_t len = strlen(pMeta->path) + 32;
    char*   state = taosMemoryCalloc(1, len);
    if (state != NULL) {
      (void)snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // code is 0 in this branch, so report the allocation failure through terrno;
      // the return value is left untouched (conversion itself succeeded)
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
267

268
// Runs the compatibility check and acts on its verdict: nothing to do (0),
// convert the backend format (result of streamMetaCvtDbFormat), or refuse with -1
// when the stored data is incompatible. Any other verdict yields 0.
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t compatible = streamMetaCheckBackendCompatible(pMeta);

  switch (compatible) {
    case STREAM_STATA_COMPATIBLE: {
      return 0;
    }
    case STREAM_STATA_NEED_CONVERT: {
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);
    }
    case STREAM_STATA_NO_COMPATIBLE: {
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;
    }
    default: {
      return 0;
    }
  }
}
286

287
// Returns 1 when the task's fillHistory field equals 2, otherwise 0.
int8_t streamTaskShouldRecalated(SStreamTask* pTask) {
  if (pTask->info.fillHistory == 2) {
    return 1;
  }
  return 0;
}
×
288

289
// Attaches a task-db backend to pTask, sharing an already opened one when possible.
//
// Under pMeta->backendMutex: if pMeta->pTaskDbUnique already holds a backend for
// `key`, a reference is taken on it and it is reused. Otherwise the backend is
// opened with taskDbOpen(), retrying once per second (with the mutex released while
// sleeping) until it succeeds, registered in taskDbWrapperId and stored in
// pTaskDbUnique. When taskDbOpen() reports a processed version, the task's
// checkpoint versions are aligned with it.
//
// @param key        key into pMeta->pTaskDbUnique (NUL-terminated)
// @param recalated  non-zero: store the backend in pTask->pRecalBackend,
//                   zero: store it in pTask->pBackend
// @return 0 on success, TSDB_CODE_FAILED when the existing backend cannot be referenced.
//         A failure to insert into pTaskDbUnique is only logged.
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key, uint8_t recalated) {
  const int64_t checkpointId = pTask->chkInfo.checkpointId;

  streamMutexLock(&pMeta->backendMutex);

  // fast path: a backend for this key is already open
  void** ppExisting = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
  if (ppExisting != NULL && *ppExisting != NULL) {
    if (taskDbAddRef(*ppExisting) == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pShared = *ppExisting;
    pShared->pMeta = pMeta;
    if (recalated) {
      pTask->pRecalBackend = pShared;
    } else {
      pTask->pBackend = pShared;
    }

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pShared);
    return 0;
  }

  // slow path: open the backend, waiting for it as long as necessary
  STaskDbWrapper* pOpened = NULL;
  int64_t         processVer = -1;
  while (taskDbOpen(pMeta->path, key, checkpointId, &processVer, &pOpened) != 0) {
    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);
  }

  int64_t backendRef = taosAddRef(taskDbWrapperId, pOpened);
  if (recalated) {
    pTask->pRecalBackend = pOpened;
  } else {
    pTask->pBackend = pOpened;
  }
  pOpened->refId = backendRef;
  pOpened->pTask = pTask;
  pOpened->pMeta = pMeta;

  if (processVer != -1) {
    if (pTask->chkInfo.processedVer == processVer) {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    } else {
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    }
  }

  if (taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pOpened, sizeof(void*)) != 0) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  if (recalated) {
    stDebug("s-task:0x%x set recalated backend %p", pTask->id.taskId, pOpened);
  } else {
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pOpened);
  }
  return 0;
}
371

372
void streamMetaRemoveDB(void* arg, char* key) {
4,432✔
373
  if (arg == NULL || key == NULL) return;
4,432!
374

375
  SStreamMeta* pMeta = arg;
4,433✔
376
  streamMutexLock(&pMeta->backendMutex);
4,433✔
377
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
4,433✔
378
  if (code) {
4,433!
379
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
380
  }
381

382
  streamMutexUnlock(&pMeta->backendMutex);
4,433✔
383
}
384

385
// Creates the pTasks hash table of a task-update record.
// Returns TSDB_CODE_SUCCESS, or terrno when the table cannot be allocated.
int32_t streamMetaUpdateInfoInit(STaskUpdateInfo* pInfo) {
  pInfo->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  return (pInfo->pTasks != NULL) ? TSDB_CODE_SUCCESS : terrno;
}
395

396
// Releases the pTasks table of a task-update record and clears the pointer so a
// repeated cleanup does not see the stale handle.
void streamMetaUpdateInfoCleanup(STaskUpdateInfo* pInfo) {
  SHashObj* pTasks = pInfo->pTasks;

  taosHashCleanup(pTasks);
  pInfo->pTasks = NULL;
}
13,711✔
400

401
// Allocates and opens the stream meta of one vnode.
//
// Opens the tdb store under "<path>/stream", converts the backend format when
// required, initializes the meta lock, begins the meta transaction, creates the
// task containers and the checkpoint backend manager, and finally registers the
// meta in streamMetaRefPool and creates its heartbeat info.
//
// @param path          parent directory; the meta lives in "<path>/stream"
// @param ahandle       opaque handle stored in the meta
// @param buildTaskFn   stored in the meta
// @param expandTaskFn  stored in the meta
// @param vgId          vnode group id; SNODE_HANDLE makes the meta start with role NODE_ROLE_LEADER
// @param stage         stored in the meta
// @param fn            stored as startInfo.completeFn
// @param p             out: receives the new meta on success
// @return 0 on success, otherwise an error code; on failure everything allocated
//         here is released and *p is not written by this function
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
  QRY_PARAM_CHECK(p);
  int32_t code = 0;
  int32_t lino = 0;

  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
    return terrno;
  }

  // set first: the helpers called below read pMeta->vgId for their log messages
  pMeta->vgId = vgId;

  int32_t len = strlen(path) + 64;
  char*   tpath = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);

  (void)snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
  pMeta->path = tpath;

  code = streamMetaOpenTdb(pMeta);
  TSDB_CHECK_CODE(code, lino, _err);

  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
            tstrerror(terrno));
    TSDB_CHECK_CODE(code, lino, _err);
  }

  // set the attribute when running on Linux OS
  TdThreadRwlockAttr attr;
  code = taosThreadRwlockAttrInit(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

#ifdef LINUX
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
  TSDB_CHECK_CODE(code, lino, _err);
#endif

  code = taosThreadRwlockInit(&pMeta->lock, &attr);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadRwlockAttrDestroy(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

  // keep the real error code: the comparison must not be part of the assignment
  code = streamMetaBegin(pMeta);
  if (code < 0) {
    stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
    lino = __LINE__;
    goto _err;
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);

  code = streamMetaUpdateInfoInit(&pMeta->updateInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  code = streamMetaInitStartInfo(&pMeta->startInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);

  pMeta->scanInfo.scanSentinel = 0;
  pMeta->scanInfo.lastScanTs = 0;
  pMeta->scanInfo.tickCounter = 0;

  pMeta->ahandle = ahandle;
  pMeta->buildTaskFn = buildTaskFn;
  pMeta->expandTaskFn = expandTaskFn;
  pMeta->stage = stage;
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
  pMeta->updateInfo.activeTransId = -1;
  pMeta->updateInfo.completeTransId = -1;

  pMeta->startInfo.completeFn = fn;
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);

  pMeta->numOfPausedTasks = 0;
  pMeta->numOfStreamTasks = 0;
  pMeta->closeFlag = false;

  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);

  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _err);

  // add refId at the end of initialization function
  // NOTE(review): a failure after this point frees pMeta in _err while it is still
  // registered in streamMetaRefPool - confirm how the pool entry should be withdrawn.
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);

  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);

  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));

  code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code != 0) {
    // the registry did not take the holder, so nobody else will free it
    taosMemoryFree(pRid);
  }
  TSDB_CHECK_CODE(code, lino, _err);

  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);

  TSDB_CHECK_CODE(code, lino, _err);

  *p = pMeta;
  return code;

_err:
  // NOTE(review): pMeta->lock and pMeta->backendMutex are not destroyed on this path
  taosMemoryFree(pMeta->path);
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
  if (pMeta->pTaskDbUnique) taosHashCleanup(pMeta->pTaskDbUnique);
  if (pMeta->pTaskDb) {
    tdbTbClose(pMeta->pTaskDb);
    pMeta->pTaskDb = NULL;
  }
  if (pMeta->pCheckpointDb) {
    tdbTbClose(pMeta->pCheckpointDb);
  }
  if (pMeta->db) {
    tdbClose(pMeta->db);
  }

  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);

  if (pMeta->startInfo.pStagesList) taosArrayDestroy(pMeta->startInfo.pStagesList);
  taosMemoryFree(pMeta);

  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
  return code;
}
538

539
// todo refactor: the lock should be restricted to one function
540
#ifdef BUILD_NO_CALL
541
// Opens the stream backend of this meta, retrying every 100ms until it succeeds,
// then registers it in streamBackendId and loads the checkpoint info.
// NOTE(review): when the first attempt fails, the function returns with the meta
// write lock still held (the successful retry breaks out before unlocking); when
// the first attempt succeeds, the lock is never taken - confirm the intended contract.
void streamMetaInitBackend(SStreamMeta* pMeta) {
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);

  while (pMeta->streamBackend == NULL) {
    streamMetaWLock(pMeta);
    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
    if (pMeta->streamBackend != NULL) {
      break;
    }

    streamMetaWUnLock(pMeta);
    stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
    taosMsleep(100);
  }

  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
  streamBackendLoadCheckpointInfo(pMeta);
}
560
#endif
561

562
// Drops every task and all bookkeeping of this meta while keeping the meta itself
// (and its containers) alive.
//
// For each task in pTasksMap that can still be acquired: stops its delay-schedule
// timer when one is armed, removes the task from streamTaskRefPool and releases the
// reference acquired here. Then removes the stream backend ref (when set), empties
// the task containers, and resets the counters and the start-info timestamps.
void streamMetaClear(SStreamMeta* pMeta) {
  // remove all existed tasks in this vnode
  int64_t startMs = taosGetTimestampMs();

  for (void* pEntry = taosHashIterate(pMeta->pTasksMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasksMap, pEntry)) {
    int64_t      refId = *(int64_t*)pEntry;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    // release the ref by timer
    if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {  // one more ref in timer
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }

    // pairs with the taosAcquireRef() above
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, refId);
    }
  }

  int64_t tasksDoneMs = taosGetTimestampMs();
  stDebug("vgId:%d clear task map, elapsed time:%.2fs", pMeta->vgId, (tasksDoneMs - startMs)/1000.0);

  if (pMeta->streamBackendRid != 0) {
    if (taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
      stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
    }
  }

  int64_t backendDoneMs = taosGetTimestampMs();
  stDebug("vgId:%d clear backend completed, elapsed time:%.2fs", pMeta->vgId, (backendDoneMs - tasksDoneMs)/1000.0);

  taosHashClear(pMeta->pTasksMap);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // NOTE: the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  taosArrayClear(pMeta->startInfo.pStagesList);
  taosArrayClear(pMeta->startInfo.pRecvChkptIdTasks);

  pMeta->startInfo.readyTs = 0;
  pMeta->startInfo.elapsedTime = 0;
  pMeta->startInfo.startTs = 0;
}
13,733✔
624

625
// Requests the close of a stream meta by removing its entry from streamMetaRefPool
// (that pool is opened with streamMetaCloseImpl as its callback). NULL is ignored.
void streamMetaClose(SStreamMeta* pMeta) {
  if (pMeta != NULL) {
    stDebug("vgId:%d start to close stream meta", pMeta->vgId);

    int32_t rc = taosRemoveRef(streamMetaRefPool, pMeta->rid);
    if (rc != 0) {
      stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(rc));
    }
  }
}
636

637
void streamMetaCloseImpl(void* arg) {
13,712✔
638
  SStreamMeta* pMeta = arg;
13,712✔
639
  if (pMeta == NULL) {
13,712!
UNCOV
640
    return;
×
641
  }
642

643
  int32_t code = 0;
13,712✔
644
  int32_t vgId = pMeta->vgId;
13,712✔
645
  stDebug("vgId:%d start to do-close stream meta", vgId);
13,712✔
646

647
  streamMetaWLock(pMeta);
13,712✔
648
  streamMetaClear(pMeta);
13,712✔
649
  streamMetaWUnLock(pMeta);
13,712✔
650

651
  // already log the error, ignore here
652
  tdbAbort(pMeta->db, pMeta->txn);
13,712✔
653
  tdbTbClose(pMeta->pTaskDb);
13,711✔
654
  tdbTbClose(pMeta->pCheckpointDb);
13,711✔
655
  tdbClose(pMeta->db);
13,712✔
656

657
  taosArrayDestroy(pMeta->pTaskList);
13,711✔
658
  taosArrayDestroy(pMeta->chkpSaved);
13,711✔
659
  taosArrayDestroy(pMeta->chkpInUse);
13,712✔
660

661
  taosHashCleanup(pMeta->pTasksMap);
13,712✔
662
  taosHashCleanup(pMeta->pTaskDbUnique);
13,712✔
663

664
  streamMetaUpdateInfoCleanup(&pMeta->updateInfo);
13,712✔
665
  streamMetaClearStartInfo(&pMeta->startInfo);
13,711✔
666

667
  destroyMetaHbInfo(pMeta->pHbInfo);
13,712✔
668
  pMeta->pHbInfo = NULL;
13,712✔
669

670
  taosMemoryFree(pMeta->path);
13,712!
671
  streamMutexDestroy(&pMeta->backendMutex);
13,710✔
672

673
  bkdMgtDestroy(pMeta->bkdChkptMgt);
13,711✔
674

675
  pMeta->role = NODE_ROLE_UNINIT;
13,709✔
676
  code = taosThreadRwlockDestroy(&pMeta->lock);
13,709✔
677
  if (code) {
13,712!
UNCOV
678
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
×
679
  }
680

681
  taosMemoryFree(pMeta);
13,712!
682
  stDebug("vgId:%d end to close stream meta", vgId);
13,710✔
683
}
684

685
// todo let's check the status for each task
686
// todo let's check the status for each task
//
// Serializes pTask and upserts it into pMeta->pTaskDb under the key
// {streamId, taskId}, inside the meta's current transaction (pMeta->txn).
//
// A task whose ver is below SSTREAM_TASK_SUBTABLE_CHANGED_VER is stamped with
// SSTREAM_TASK_VER before it is encoded. When the upsert fails, the task is removed
// from streamTaskRefPool.
//
// @return 0 on success; -1 when the encoded size cannot be computed; terrno on
//         allocation or upsert failure; TSDB_CODE_INVALID_MSG when encoding fails
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t vgId = pTask->pMeta->vgId;
  void*   buf = NULL;
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return terrno;
  }

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  // any negative result is a failure (same test as for the size computation above)
  if (code < 0) {
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    taosMemoryFree(buf);  // the buffer is owned by this function on every path
    return TSDB_CODE_INVALID_MSG;
  }

  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};

  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
  if (code != TSDB_CODE_SUCCESS) {
    code = terrno;
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
            vgId, pTask->id.refId, tstrerror(code));

    // read the refId first: removing the ref may release the task
    int64_t refId = pTask->id.refId;
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
    if (ret != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
    }
  } else {
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
  }

  taosMemoryFree(buf);
  return code;
}
735

736
// Deletes the record keyed by {streamId, taskId} from pMeta->pTaskDb inside the
// meta's current transaction and returns the result of tdbTbDelete().
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
  int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};

  const int32_t rc = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (rc == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
            tstrerror(terrno));
  }

  return rc;
}
748

749
// add to the ready tasks hash map, not the restored tasks hash map
750
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
11,825✔
751
  *pAdded = false;
11,825✔
752

753
  int32_t code = 0;
11,825✔
754
  int64_t refId = 0;
11,825✔
755
  STaskId id = streamTaskGetTaskId(pTask);
11,825✔
756
  void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
11,823✔
757

758
  if (p != NULL) {
11,832!
UNCOV
759
    stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
×
UNCOV
760
    tFreeStreamTask(pTask);
×
UNCOV
761
    return code;
×
762
  }
763

764
  if ((code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver)) != 0) {
11,832!
UNCOV
765
    tFreeStreamTask(pTask);
×
UNCOV
766
    return code;
×
767
  }
768

769
  p = taosArrayPush(pMeta->pTaskList, &pTask->id);
11,843✔
770
  if (p == NULL) {
11,843!
771
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
×
UNCOV
772
    tFreeStreamTask(pTask);
×
773
    return terrno;
×
774
  }
775

776
  pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask);
11,843✔
777
  code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
11,843✔
778
  if (code) {
11,843!
UNCOV
779
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
×
UNCOV
780
    void* pUnused = taosArrayPop(pMeta->pTaskList);
×
781

782
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
UNCOV
783
    if (ret != 0) {
×
784
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
×
785
    }
786
    return code;
×
787
  }
788

789
  if ((code = streamMetaSaveTaskInMeta(pMeta, pTask)) != 0) {
11,843!
UNCOV
790
    int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
×
UNCOV
791
    void*   pUnused = taosArrayPop(pMeta->pTaskList);
×
792

793
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
UNCOV
794
    if (ret) {
×
795
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
×
796
    }
797
    return code;
×
798
  }
799

800
  if ((code = streamMetaCommit(pMeta)) != 0) {
11,838!
UNCOV
801
    int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
×
UNCOV
802
    void*   pUnused = taosArrayPop(pMeta->pTaskList);
×
803

UNCOV
804
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
UNCOV
805
    if (ret) {
×
UNCOV
806
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
×
807
    }
808

UNCOV
809
    return code;
×
810
  }
811

812
  if (pTask->info.fillHistory == 0) {
11,843✔
813
    int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
8,158✔
814
  }
815

816
  *pAdded = true;
11,843✔
817
  return code;
11,843✔
818
}
819

820
// Return the number of entries in pMeta->pTasksMap. The size of pMeta->pTaskList is compared against
// it and a mismatch is reported in the error log, but the map size is returned regardless.
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
  int32_t numInMap = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t numInList = taosArrayGetSize(pMeta->pTaskList);

  if (numInMap != numInList) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, numInList, numInMap);
  }

  return numInMap;
}
829

830
// Look up the refId of task {streamId, taskId} in pMeta->pTasksMap and acquire the task from
// streamTaskRefPool. This function takes no meta lock itself (streamMetaAcquireTask() wraps it with the
// read lock).
//
// On success *pTask holds the acquired task, which is handed back with streamMetaReleaseTask().
// TSDB_CODE_STREAM_TASK_NOT_EXIST is returned when the id is not in the map, the ref cannot be
// acquired, the acquired task carries a different refId, or the task should stop; in the last two
// cases the ref acquired here is released again before returning.
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  STaskId  id = {.streamId = streamId, .taskId = taskId};
  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // work on a copy so every step below refers to the same refId
  const int64_t refId = *pTaskRefId;

  SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId);
  if (p == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (p->id.refId != refId) {
    // "task refId" is the id stored in the task, "try acquire" the one read from the map
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, p->id.refId,
            refId);
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId);
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (streamTaskShouldStop(p)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", p->id.idStr);
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stTrace("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
  *pTask = p;
  return TSDB_CODE_SUCCESS;
}
868

869
// Acquire the task identified by *pId from streamTaskRefPool, using the refId stored in
// pMeta->pTasksMap. Unlike streamMetaAcquireTaskNoLock() there is no streamTaskShouldStop() check, so a
// stopped task can still be acquired. No meta lock is taken here.
//
// On success *pTask holds the acquired task, which is handed back with streamMetaReleaseTask().
// TSDB_CODE_STREAM_TASK_NOT_EXIST is returned when the id is not in the map, the ref cannot be
// acquired, or the acquired task carries a different refId (the ref is released again in that case).
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // work on a copy so every step below refers to the same refId
  const int64_t refId = *pTaskRefId;

  SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId);
  if (p == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (p->id.refId != refId) {
    // "task refId" is the id stored in the task, "try acquire" the one read from the map
    stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
            p->id.refId, refId);
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, refId);
    }

    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stTrace("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
  *pTask = p;
  return TSDB_CODE_SUCCESS;
}
899

900
// Locked variant of streamMetaAcquireTaskNoLock(): the lookup runs between streamMetaRLock() and
// streamMetaRUnLock(), and its result code is returned as is.
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t ret;

  streamMetaRLock(pMeta);
  ret = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return ret;
}
906

907
// Release one reference on pTask in streamTaskRefPool. A NULL task is ignored; pMeta is not used.
// taskId and refId are copied first so that the error log does not touch the task after the release.
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    int64_t refId = pTask->id.refId;
    int32_t taskId = pTask->id.taskId;

    stTrace("s-task:0x%x release task, refId:%" PRId64, taskId, refId);
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId);
    }
  }
}
920

921
// Remove the first entry among the leading `num` elements of pTaskList whose streamId and taskId match
// *id. If no entry matches, an internal error is logged and the list is left untouched.
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  int32_t idx = 0;

  while (idx < num) {
    SStreamTaskId* pCur = taosArrayGet(pTaskList, idx);
    if (pCur->streamId == id->streamId && pCur->taskId == id->taskId) {
      taosArrayRemove(pTaskList, idx);
      return;
    }
    idx += 1;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
9,700✔
936

937
// Callback passed to streamTaskHandleEventAsync() for the dropping event (see streamMetaUnregisterTask).
// A source-level task first sends its checkpoint-source rsp; afterwards any non-sink task that owns an
// executor has its query killed. `param` is not used.
// The returned code is the result of qKillTask() when it ran, otherwise the result of sending the rsp
// (0 if neither step applied).
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t ret = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ret = streamTaskSendCheckpointSourceRsp(pTask);
    if (ret != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
              tstrerror(ret));
    }
  }

  // nothing to kill for sink tasks or tasks without an executor
  if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->exec.pExecutor == NULL) {
    return ret;
  }

  // let's kill the query procedure within stream, to end it ASAP.
  ret = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
  if (ret != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(ret));
  }

  return ret;
}
957

958
// Drop task {streamId, taskId} from the meta: post the dropping event, remove the task from pTasksMap,
// pTaskList and the task table, stop its delay-sched timer if one is armed, and remove its ref from
// streamTaskRefPool. Failures of the individual steps are only logged; the function always returns 0,
// also when the task cannot be found. No meta lock is taken here.
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t      vgId = pMeta->vgId;
  STaskId      id = {.streamId = streamId, .taskId = taskId};
  SStreamTask* pTask = NULL;

  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    return 0;
  }

  // a paused task no longer counts as paused once it is dropped
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // tasks with fillHistory == 0 are counted in numOfStreamTasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  // the result of the map removal is not inspected
  (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

  code = streamMetaRemoveTaskInMeta(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  // sanity check: map and list must hold the same number of tasks
  int32_t numInMap = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t numInList = taosArrayGetSize(pMeta->pTaskList);
  if (numInList != numInMap) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, numInList, numInMap);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // remove the ref held by the meta, then give back the one acquired at the top of this function
  int64_t refId = pTask->id.refId;
  if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
1017

UNCOV
1018
// Currently a placeholder: it only takes and releases the meta write lock and returns 0. The
// unregister/commit logic is disabled and kept below for reference; the locals it would use stay
// declared.
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t      code = 0;
  int32_t      numOfTasks = 0;
  int32_t      vgId = pMeta->vgId;
  SStreamTask* pTask = NULL;

  streamMetaWLock(pMeta);

  // disabled:
  //  code = streamMetaUnregisterTask(pMeta, streamId, taskId);
  //  numOfTasks = streamMetaGetNumOfTasks(pMeta);
  //  if (code) {
  //    stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
  //  }
  //
  //  code = streamMetaCommit(pMeta);
  //  if (code) {
  //    stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code));
  //  } else {
  //    stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks);
  //  }

  streamMetaWUnLock(pMeta);

  return code;
}
1043

1044
// Open a new write transaction (TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) on pMeta->db and store it in
// pMeta->txn. The call is made while holding the meta write lock. A failure is recorded with
// streamSetFatalError() and the tdbBegin() code is returned.
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t ret = 0;

  streamMetaWLock(pMeta);

  ret = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                 TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (ret != 0) {
    streamSetFatalError(pMeta, ret, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return ret;
}
1054

1055
// Commit the current meta transaction and open the next one: tdbCommit(), tdbPostCommit(), tdbBegin().
// Every failing step is recorded with streamSetFatalError() and logged at fatal level.
//
// Returns the code of tdbPostCommit() if that step fails, otherwise the code of tdbBegin().
// NOTE(review): a tdbCommit() failure does not stop the sequence and its code is overwritten by the
// later steps, so the caller can see 0 after a failed commit - confirm this is intended.
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t ret = tdbCommit(pMeta->db, pMeta->txn);
  if (ret != 0) {
    streamSetFatalError(pMeta, ret, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(ret),
            pMeta->fatalInfo.line);
  }

  ret = tdbPostCommit(pMeta->db, pMeta->txn);
  if (ret != 0) {
    streamSetFatalError(pMeta, ret, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(ret),
            pMeta->fatalInfo.line);
    return ret;
  }

  // start the transaction that collects the next round of changes
  ret = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                 TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (ret == 0) {
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
    return ret;
  }

  streamSetFatalError(pMeta, ret, __func__, __LINE__);
  stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(ret), pMeta->fatalInfo.line);
  return ret;
}
1082

1083
// Walk all records of pMeta->pTaskDb and return the largest checkpointId that could be decoded from
// them. Records that fail to decode are skipped. 0 is returned when the cursor cannot be opened or
// positioned, or when no record yields a larger value.
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t  maxChkptId = 0;
  TBC*     pCur = NULL;
  void*    pKey = NULL;
  void*    pVal = NULL;
  int32_t  kLen = 0;
  int32_t  vLen = 0;
  SDecoder decoder;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return maxChkptId;
  }

  if (tdbTbcMoveToFirst(pCur) != 0) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCur);
    return maxChkptId;
  }

  for (;;) {
    if (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) != 0) {
      break;
    }

    // an empty value ends the scan
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decoded = tDecodeStreamTaskChkInfo(&decoder, &info);
    tDecoderClear(&decoder);

    if (decoded < 0) {
      continue;  // skip records that cannot be decoded
    }

    maxChkptId = TMAX(maxChkptId, info.checkpointId);
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, maxChkptId);

  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);
  return maxChkptId;
}
1129

1130
// Scan pMeta->pTaskList for fill-history tasks (info.fillHistory == 1) whose related stream task
// (task->streamTaskId) is not present in pMeta->pTasksMap and that are not already in dropping status.
// Each such task is appended to pRecycleList and removed from pTasksMap and pTaskList.
//
// The refId and taskId of the current entry are copied before the list is modified: pTaskId points into
// the list, so after taosArrayRemove() it no longer refers to the entry that was acquired.
// NOTE(review): this relies on the refId stored in the list entries being valid; the registration code
// pushes the id into pTaskList before assigning refId - verify the entries carry a usable refId.
static void dropHistoryTaskIfNoStreamTask(SStreamMeta* pMeta, SArray*  pRecycleList) {
  int32_t i = 0;
  while (i < taosArrayGetSize(pMeta->pTaskList)) {
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
    if (pTaskId == NULL) {
      i++;
      continue;
    }

    const int64_t refId = pTaskId->refId;
    SStreamTask*  task = taosAcquireRef(streamTaskRefPool, refId);
    if (task == NULL) {
      i++;
      continue;
    }

    const int32_t taskId = task->id.taskId;
    bool          removed = false;

    bool orphan = (task->info.fillHistory == 1) &&
                  (taosHashGet(pMeta->pTasksMap, &task->streamTaskId, sizeof(STaskId)) == NULL) &&
                  (task->status.taskStatus != TASK_STATUS__DROPPING);
    if (orphan) {
      STaskId id = streamTaskGetTaskId(task);
      if (taosArrayPush(pRecycleList, &id) == NULL) {
        stError("%s s-task:0x%x failed to add into pRecycleList list due to:%d", __FUNCTION__, taskId, terrno);
      } else {
        int32_t total = taosArrayGetSize(pRecycleList);
        stInfo("%s s-task:0x%x is already dropped, add into recycle list, total:%d", __FUNCTION__, taskId, total);
      }

      int32_t code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(STaskId));
      if (code == 0) {
        // the following entries shift down, so index i must be visited again
        taosArrayRemove(pMeta->pTaskList, i);
        removed = true;
      } else {
        stError("%s s-task:0x%x failed to remove task from taskmap, code:%d", __FUNCTION__, taskId, code);
      }
    }

    // give back the ref acquired above, using the saved refId rather than the list entry
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("%s s-task:0x%x failed to release refId:%" PRId64, __FUNCTION__, taskId, refId);
    }

    if (!removed) {
      i++;
    }
  }
}
13,736✔
1168

1169
// not allowed to return error code
1170
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
13,736✔
1171
  TBC*     pCur = NULL;
13,736✔
1172
  void*    pKey = NULL;
13,736✔
1173
  int32_t  kLen = 0;
13,736✔
1174
  void*    pVal = NULL;
13,736✔
1175
  int32_t  vLen = 0;
13,736✔
1176
  SDecoder decoder;
1177
  int32_t  vgId = 0;
13,736✔
1178
  int32_t  code = 0;
13,736✔
1179
  SArray*  pRecycleList = NULL;
13,736✔
1180

1181
  if (pMeta == NULL) {
13,736!
UNCOV
1182
    return;
×
1183
  }
1184

1185
  vgId = pMeta->vgId;
13,736✔
1186
  pRecycleList = taosArrayInit(4, sizeof(STaskId));
13,736✔
1187
  if (pRecycleList == NULL) {
13,736!
1188
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
×
1189
    return;
×
1190
  }
1191

1192
  stInfo("vgId:%d load stream tasks from meta files", vgId);
13,736!
1193

1194
  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
13,736✔
1195
  if (code != TSDB_CODE_SUCCESS) {
13,736!
1196
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
×
1197
    taosArrayDestroy(pRecycleList);
×
UNCOV
1198
    return;
×
1199
  }
1200

1201
  code = tdbTbcMoveToFirst(pCur);
13,736✔
1202
  if (code) {
13,736!
UNCOV
1203
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
×
UNCOV
1204
    taosArrayDestroy(pRecycleList);
×
UNCOV
1205
    tdbTbcClose(pCur);
×
UNCOV
1206
    return;
×
1207
  }
1208

1209
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
14,347✔
1210
    if (pVal == NULL || vLen == 0) {
618!
1211
      break;
1212
    }
1213

1214
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
618!
1215
    if (pTask == NULL) {
619!
UNCOV
1216
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
1217
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
×
UNCOV
1218
      break;
×
1219
    }
1220

1221
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
619✔
1222
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
619✔
1223
      tDecoderClear(&decoder);
8✔
1224
      tFreeStreamTask(pTask);
8✔
1225
      stError(
8!
1226
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
1227
          "stream manually",
1228
          vgId, tsDataDir);
1229
      break;
8✔
1230
    }
1231
    tDecoderClear(&decoder);
611✔
1232

1233
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
611!
1234
      int32_t taskId = pTask->id.taskId;
×
1235
      STaskId id = streamTaskGetTaskId(pTask);
×
1236

UNCOV
1237
      tFreeStreamTask(pTask);
×
UNCOV
1238
      void* px = taosArrayPush(pRecycleList, &id);
×
UNCOV
1239
      if (px == NULL) {
×
UNCOV
1240
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
×
1241
      }
1242

UNCOV
1243
      int32_t total = taosArrayGetSize(pRecycleList);
×
UNCOV
1244
      stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
×
UNCOV
1245
      continue;
×
1246
    }
1247

1248
    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
611✔
1249
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
1250

1251
    // do duplicate task check.
1252
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
611✔
1253
    void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
611✔
1254
    if (p == NULL) {
611!
1255
      code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
611✔
1256
      if (code < 0) {
611!
UNCOV
1257
        stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
×
UNCOV
1258
        tFreeStreamTask(pTask);
×
1259
        continue;
×
1260
      }
1261

1262
      void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
611✔
1263
      if (px == NULL) {
611!
UNCOV
1264
        stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
×
1265
      }
1266
    } else {
1267
      // todo this should replace the existed object put by replay creating stream task msg from mnode
1268
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
×
UNCOV
1269
      tFreeStreamTask(pTask);
×
UNCOV
1270
      continue;
×
1271
    }
1272

1273
    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);
611✔
1274

1275
    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
611!
1276
      int64_t refId = pTask->id.refId;
×
UNCOV
1277
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
×
1278
              pTask->id.taskId, tstrerror(terrno), refId);
1279

UNCOV
1280
      void*   px = taosArrayPop(pMeta->pTaskList);
×
UNCOV
1281
      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
UNCOV
1282
      if (ret != 0) {
×
UNCOV
1283
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
×
1284
      }
1285
      continue;
×
1286
    }
1287

1288
    stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
611!
1289
    if (pTask->info.fillHistory == 0) {
611✔
1290
      int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
490✔
1291
    }
1292

1293
    if (streamTaskShouldPause(pTask)) {
611!
UNCOV
1294
      int32_t val = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
×
1295
    }
1296
  }
1297

1298
  tdbFree(pKey);
13,737✔
1299
  tdbFree(pVal);
13,736✔
1300

1301
  tdbTbcClose(pCur);
13,736✔
1302

1303
  dropHistoryTaskIfNoStreamTask(pMeta, pRecycleList);
13,736✔
1304

1305
  if (taosArrayGetSize(pRecycleList) > 0) {
13,736!
UNCOV
1306
    for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
×
UNCOV
1307
      STaskId* pId = taosArrayGet(pRecycleList, i);
×
UNCOV
1308
      code = streamMetaRemoveTaskInMeta(pMeta, pId);
×
UNCOV
1309
      if (code) {
×
UNCOV
1310
        stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
×
1311
      }
1312
    }
1313
  }
1314

1315
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
13,736✔
1316
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
13,736✔
1317
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
1318

1319
  taosArrayDestroy(pRecycleList);
13,736✔
1320
  code = streamMetaCommit(pMeta);
13,736✔
1321
  if (code) {
13,736!
UNCOV
1322
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
×
1323
  }
1324
}
1325

1326
// Prepare the stream meta for closing the vnode: set closeFlag, wait for the hb timer to quit, then,
// under the meta read lock, stop every task that can still be acquired and remove its ref from
// streamTaskRefPool. The scan timer is stopped and cleared at the end.
//
// A failure to build the task list is logged; the task list is then not usable and no task is stopped,
// but the remaining shutdown steps still run.
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  pMeta->closeFlag = true;
  streamMetaWaitForHbTmrQuit(pMeta);

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // previously ignored silently; report it so a skipped task shutdown is visible in the log
    stError("vgId:%d failed to prepare the task list before closing tasks, code:%s", vgId, tstrerror(code));
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    // copy the refId first: the task is released before its ref is removed
    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);

  if (pMeta->scanInfo.scanTimer != NULL) {
    streamTmrStop(pMeta->scanInfo.scanTimer);
    pMeta->scanInfo.scanTimer = NULL;
  }

  streamMetaRUnLock(pMeta);
}
20,990✔
1384

1385
// Start the meta heartbeat: a heap copy of pMeta->rid is registered with metaRefMgtAdd() and then
// passed to streamMetaHbToMnode(). Nothing is started if the allocation or the registration fails.
// NOTE(review): pRid is not freed here when metaRefMgtAdd() fails - confirm who owns it on that path.
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  if (pRid == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRid = pMeta->rid;

  if (metaRefMgtAdd(pMeta->vgId, pRid) == 0) {
    streamMetaHbToMnode(pRid, NULL);
  }
}
1400

1401
// Hand a duplicate of pMeta->pTaskList to the caller through *pList and, if
// pMeta->sendMsgBeforeClosing is set, mark the checkpoint of every acquirable task as failed and send
// one hb through streamMetaSendHbHelper(); the flag is cleared afterwards.
//
// Returns terrno when the list cannot be duplicated, otherwise TSDB_CODE_SUCCESS (the result of the hb
// helper is not propagated). The caller destroys *pList.
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);

  int32_t code = 0;
  SArray* pDup = taosArrayDup(pMeta->pTaskList, NULL);
  if (pDup == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pDup;

  if (!pMeta->sendMsgBeforeClosing) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t total = taosArrayGetSize(pDup);
  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamTaskId* pTaskId = taosArrayGet(pDup, idx);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code == TSDB_CODE_SUCCESS) {
      streamTaskSetCheckpointFailed(pTask);
      streamMetaReleaseTask(pMeta, pTask);
    }
    // tasks that cannot be acquired are skipped; this error is ignored
  }

  code = streamMetaSendHbHelper(pMeta);
  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // always return true
}
1440

1441
// Record the new term (stored in pMeta->stage) and role of the vnode.
//
// Under the meta write lock:
//  - sendMsgBeforeClosing is set when the previous role was leader;
//  - when the role changed between follower and leader and the meta has tasks, the first task of
//    pTaskList is used to add this vgId into the node-update list;
//  - a non-leader gets its start info reset; a leader that was a follower gets tasksWillRestart set.
// After the lock is released a new leader starts the meta heartbeat.
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t term, bool isLeader) {
  streamMetaWLock(pMeta);

  int64_t prevTerm = pMeta->stage;
  int32_t prevRole = pMeta->role;

  pMeta->stage = term;
  pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;

  // mark the sign to send msg before close all tasks
  // 1. for a leader vnode, always send msg before closing itself
  // 2. for a follower vnode, if it's changed from a leader, also sending msg before closing.
  if (prevRole == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  if ((prevRole == NODE_ROLE_FOLLOWER || prevRole == NODE_ROLE_LEADER) && (prevRole != pMeta->role) &&
      (taosArrayGetSize(pMeta->pTaskList) > 0)) {
    SStreamTask*   pTask = NULL;
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, 0);

    if (pTaskId != NULL) {
      // pTaskList holds SStreamTaskId entries; build a proper STaskId as the map key instead of
      // reinterpreting the list entry, as the other readers of pTaskList do
      STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};

      int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
      if (code == 0) {
        stInfo("vgId:%d role changed, added into nodeUpdate list, use s-task:0x%s", pMeta->vgId, pTask->id.idStr);
        (void)streamTaskAddIntoNodeUpdateList(pTask, pMeta->vgId);
        streamMetaReleaseTask(pMeta, pTask);
      }
    }
  }

  if (!isLeader) {
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
  } else {  // wait for nodeep update if become leader from follower
    if (prevRole == NODE_ROLE_FOLLOWER) {
      pMeta->startInfo.tasksWillRestart = 1;
    }
  }

  streamMetaWUnLock(pMeta);

  if (isLeader) {
    if (prevRole == NODE_ROLE_FOLLOWER) {
      stInfo("vgId:%d update term:%" PRId64 ", prevTerm:%" PRId64
             " prevRole:%d leader:%d, start to send Hb, rid:%" PRId64 " restart after nodeEp being updated",
             pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->rid);
    } else {
      stInfo("vgId:%d update term:%" PRId64 ", prevTerm:%" PRId64
             " prevRole:%d leader:%d, start to send Hb, rid:%" PRId64,
             pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->rid);
    }
    streamMetaStartHb(pMeta);
  } else {
    stInfo("vgId:%d update term:%" PRId64 " prevTerm:%" PRId64 " prevRole:%d leader:%d sendMsg beforeClosing:%d",
           pMeta->vgId, term, prevTerm, prevRole, isLeader, pMeta->sendMsgBeforeClosing);
  }
}
17,038✔
1496

1497
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
40✔
1498
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
40✔
1499
  for (int32_t i = 0; i < num; ++i) {
181✔
1500
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
148✔
1501
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
148✔
1502
    SStreamTask*   pTask = NULL;
148✔
1503
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
148✔
1504

1505
    if (code == 0) {
148!
1506
      if (pTask->status.downstreamReady == 0) {
148✔
1507
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
7✔
1508
        return false;
7✔
1509
      }
1510
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
141✔
1511
    }
1512
  }
1513

1514
  return true;
33✔
1515
}
1516

1517
/*
 * Call streamTaskResetStatus() on every task listed in pMeta->pTaskList.
 *
 * Tasks are looked up through streamMetaAcquireTaskUnsafe() and released right after the reset;
 * entries that cannot be acquired are skipped.
 *
 * @param pMeta  stream meta whose tasks are reset
 * @return TSDB_CODE_SUCCESS when the list is empty, 0 after the list has been walked
 */
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);

  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t idx = 0;
  while (idx < total) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    idx += 1;

    STaskId      key = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask* pCur = NULL;
    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pCur) != 0) {
      continue;  // nothing was acquired for this entry, so nothing to reset or release
    }

    streamTaskResetStatus(pCur);
    streamMetaReleaseTask(pMeta, pCur);
  }

  return 0;
}
1538

1539
/*
 * Record that pTask (and, when given, its related pHTask) has been handled for the nodeEp update
 * identified by transId, by inserting one STaskUpdateEntry per task into pMeta->updateInfo.pTasks.
 *
 * Insert failures are logged and otherwise ignored; nothing is returned to the caller.
 *
 * @param pMeta    stream meta that owns the update-info hash
 * @param pTask    task to record; must not be NULL
 * @param pHTask   optional second task to record under the same transId; may be NULL
 * @param transId  transaction id of the nodeEp update
 * @param startTs  start timestamp in ms, used only to report the elapsed time in the log
 */
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
                                     int64_t startTs) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = pMeta->vgId;

  // keep the already updated info
  STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
  int32_t          code = taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
  if (code != 0) {
    stError("s-task:%s failed to put updateTask into update list, code:%d", id, code);
  }

  int64_t el = taosGetTimestampMs() - startTs;
  if (pHTask == NULL) {
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
            id, vgId, transId, el);
    return;
  }

  STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
  code = taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0);
  if (code != 0) {
    // name the hTask explicitly so this failure can be told apart from the pTask insert above
    stError("s-task:%s failed to put hTask:%s into update list, code:%d", id, pHTask->id.idStr, code);
  } else {
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
            " ms",
            id, vgId, transId, el);
  }
}
70✔
1568

1569
/*
 * Mark the currently active nodeEp update as complete.
 *
 * Empties pMeta->updateInfo.pTasks, copies activeTransId into completeTransId, resets
 * activeTransId to -1 and stamps completeTs with the current time in ms.
 *
 * @param pMeta  stream meta whose update info is finalized
 */
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpdate = &pMeta->updateInfo;

  // capture the values the log line reports before they are overwritten below
  int32_t numOfUpdated = taosHashGetSize(pUpdate->pTasks);
  int32_t prevCompleteId = pUpdate->completeTransId;

  taosHashClear(pUpdate->pTasks);

  pUpdate->completeTransId = pUpdate->activeTransId;
  pUpdate->activeTransId = -1;
  pUpdate->completeTs = taosGetTimestampMs();

  stInfo("vgId:%d set the nodeEp update complete, ts:%" PRId64
          ", complete transId:%d->%d, update Tasks:%d reset active transId",
          pMeta->vgId, pUpdate->completeTs, prevCompleteId, pUpdate->completeTransId, numOfUpdated);
}
65✔
1584

1585
/*
 * Decide whether an incoming epset (nodeEp) update with the given transId should be processed,
 * and update pMeta->updateInfo accordingly.
 *
 * Outcomes, relative to the recorded completeTransId / activeTransId:
 *   - transId == completeTransId             : already handled, rejected
 *   - transId <  completeTransId             : out of order, rejected
 *   - no active transaction (active == -1)   : task hash cleared, transId becomes active, accepted
 *   - transId == activeTransId               : accepted, nothing changes
 *   - transId <  activeTransId               : out of order, rejected
 *   - transId >  activeTransId               : task hash cleared, transId replaces the active one, accepted
 *
 * When a transaction becomes active while startInfo.startAllTasks == 1,
 * startInfo.partialTasksStarted is set to true.
 *
 * @param pMeta            stream meta that owns the update info
 * @param transId          transaction id carried by the update message
 * @param pUpdateTaskList  tasks named by the update; only its size is used, for logging
 * @return true if the update should be processed, false if it must be discarded
 */
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId, SArray* pUpdateTaskList) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;
  int32_t          numOfTasks = taosArrayGetSize(pUpdateTaskList);

  if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  }

  if (transId < pInfo->completeTransId) {
    // report the completed transId here: that is what the message says and what was compared against
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }

  // from here on: transId > pInfo->completeTransId
  if (pInfo->activeTransId == -1) {
    taosHashClear(pInfo->pTasks);
    pInfo->activeTransId = transId;

    // interrupt the start all tasks procedure, only partial tasks will be started
    // the completion of this procedure is based on the partially started tasks.
    if (pMeta->startInfo.startAllTasks == 1) {
      int32_t num = taosArrayGetSize(pMeta->startInfo.pRecvChkptIdTasks);
      pMeta->startInfo.partialTasksStarted = true;
      stInfo(
          "vgId:%d set the active epset update transId:%d for %d tasks, prev complete transId:%d, start all "
          "interrupted, only %d tasks were started",
          pMeta->vgId, transId, numOfTasks, pInfo->completeTransId, num);
    } else {
      stInfo("vgId:%d set the active epset update transId:%d for %d tasks, prev complete transId:%d", pMeta->vgId,
             transId, numOfTasks, pInfo->completeTransId);
    }
    return true;
  }

  if (pInfo->activeTransId == transId) {
    // same transaction is already active, nothing to do
    return true;
  }

  if (transId < pInfo->activeTransId) {
    stError("vgId:%d invalid(out of order) epset update transId:%d, active transId:%d, complete transId:%d, discard",
            pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
    return false;
  }

  // transId > pInfo->activeTransId: the newer transaction replaces the active one
  taosHashClear(pInfo->pTasks);
  int32_t prev = pInfo->activeTransId;
  pInfo->activeTransId = transId;

  stInfo(
      "vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d, reqUpdate tasks:%d",
      pMeta->vgId, prev, transId, pInfo->completeTransId, numOfTasks);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc