• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3608

12 Feb 2025 05:57AM UTC coverage: 63.066% (+1.4%) from 61.715%
#3608

push

travis-ci

web-flow
Merge pull request #29746 from taosdata/merge/mainto3.02

merge: from main to 3.0 branch

140199 of 286257 branches covered (48.98%)

Branch coverage included in aggregate %.

89 of 161 new or added lines in 18 files covered. (55.28%)

3211 existing lines in 190 files now uncovered.

218998 of 283298 relevant lines covered (77.3%)

5949310.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.75
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tsched.h"
21
#include "tstream.h"
22
#include "ttimer.h"
23
#include "wal.h"
24

25
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
26

27
int32_t streamBackendId = 0;
28
int32_t streamBackendCfWrapperId = 0;
29
int32_t taskDbWrapperId = 0;
30
int32_t streamTaskRefPool = 0;
31

32
static int32_t streamMetaBegin(SStreamMeta* pMeta);
33
static void    streamMetaCloseImpl(void* arg);
34

35
typedef struct {
36
  TdThreadMutex mutex;
37
  SHashObj*     pTable;
38
} SMetaRefMgt;
39

40
SMetaRefMgt gMetaRefMgt;
41

42
int32_t metaRefMgtInit();
43
void    metaRefMgtCleanup();
44

45
static void streamMetaEnvInit() {
2,187✔
46
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
2,187✔
47
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
2,187✔
48
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
2,187✔
49

50
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
2,187✔
51
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
2,187✔
52

53
  int32_t code = metaRefMgtInit();
2,187✔
54
  if (code) {
2,187!
55
    stError("failed to init stream meta mgmt env, start failed");
×
56
    return;
×
57
  }
58

59
  code = streamTimerInit();
2,187✔
60
  if (code) {
2,187!
61
    stError("failed to init stream meta env, start failed");
×
62
  }
63
}
64

65
void streamMetaInit() {
2,189✔
66
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
2,189✔
67
  if (code) {
2,189!
68
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
69
  }
70
}
2,189✔
71

72
void streamMetaCleanup() {
2,186✔
73
  taosCloseRef(streamBackendId);
2,186✔
74
  taosCloseRef(streamBackendCfWrapperId);
2,186✔
75
  taosCloseRef(streamMetaRefPool);
2,186✔
76
  taosCloseRef(streamTaskRefPool);
2,186✔
77

78
  metaRefMgtCleanup();
2,186✔
79
  streamTimerCleanUp();
2,186✔
80
}
2,186✔
81

82
int32_t metaRefMgtInit() {
2,187✔
83
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
2,187✔
84
  if (code) {
2,187!
85
    return code;
×
86
  }
87

88
  if (code == 0) {
2,187!
89
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
2,187✔
90
  }
91

92
  if (gMetaRefMgt.pTable == NULL) {
2,187!
93
    return terrno;
×
94
  } else {
95
    return code;
2,187✔
96
  }
97
}
98

99
void metaRefMgtCleanup() {
2,186✔
100
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
2,186✔
101
  while (pIter) {
25,394✔
102
    int64_t* p = *(int64_t**)pIter;
23,208✔
103
    taosMemoryFree(p);
23,208!
104
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
23,208✔
105
  }
106

107
  taosHashCleanup(gMetaRefMgt.pTable);
2,186✔
108
  streamMutexDestroy(&gMetaRefMgt.mutex);
2,186✔
109
}
2,186✔
110

111
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
39,657✔
112
  int32_t code = 0;
39,657✔
113
  void*   p = NULL;
39,657✔
114

115
  streamMutexLock(&gMetaRefMgt.mutex);
39,657✔
116

117
  p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
39,657✔
118
  if (p == NULL) {
39,657!
119
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
39,657✔
120
    if (code) {
39,657!
121
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
×
122
              tstrerror(code));
123
      return code;
×
124
    } else {  // not
125
              //      stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
126
    }
127
  } else {
128
    stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
×
129
  }
130

131
  streamMutexUnlock(&gMetaRefMgt.mutex);
39,657✔
132
  return code;
39,657✔
133
}
134

135
void metaRefMgtRemove(int64_t* pRefId) {
16,441✔
136
  streamMutexLock(&gMetaRefMgt.mutex);
16,441✔
137

138
  int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
16,441✔
139
  taosMemoryFree(pRefId);
16,441!
140
  streamMutexUnlock(&gMetaRefMgt.mutex);
16,441✔
141
}
16,441✔
142

143
/*
 * Open the stream meta TDB environment at pMeta->path and its two tables:
 *   - "task.db": keys of STREAM_TASK_KEY_LEN bytes.
 *   - "checkpoint.db": int32_t keys.
 * Returns 0 on success and -1 on the first failure. Handles opened before a failure stay in pMeta,
 * and the caller must close them.
 */
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;

  if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", vgId, pMeta->path);
    return -1;
  }

  if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", vgId);
    return -1;
  }

  if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", vgId);
    return -1;
  }

  return 0;
}
161

162
/*
 * Verdict of streamMetaCheckBackendCompatible() on the persisted stream state: it cannot be used,
 * it can be used as-is, or it must be converted to the current backend format first.
 */
enum STREAM_STATE_VER {
  STREAM_STATA_NO_COMPATIBLE = 0,
  STREAM_STATA_COMPATIBLE = 1,
  STREAM_STATA_NEED_CONVERT = 2,
};
170

171
/*
 * Decide from the first decodable record in task.db whether the persisted stream state matches
 * the current format.
 *
 * Returns STREAM_STATA_COMPATIBLE in any of these cases:
 *   - no cursor can be opened;
 *   - the cursor cannot be positioned;
 *   - the table yields no value;
 *   - no record decodes.
 * Otherwise the verdict is derived from that record's msgVer:
 *   - msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER gives STREAM_STATA_NO_COMPATIBLE;
 *   - msgVer >= SSTREAM_TASK_NEED_CONVERT_VER gives STREAM_STATA_NEED_CONVERT.
 */
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  verdict = STREAM_STATA_COMPATIBLE;
  TBC*    pCursor = NULL;
  void*   pKey = NULL;
  int32_t keyLen = 0;
  void*   pVal = NULL;
  int32_t valLen = 0;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {  // no task info, no stream
    return verdict;
  }

  int32_t code = tdbTbcMoveToFirst(pCursor);
  if (code != 0) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCursor);
    return verdict;
  }

  bool decided = false;
  while (!decided && tdbTbcNext(pCursor, &pKey, &keyLen, &pVal, &valLen) == 0) {
    if (pVal == NULL || valLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo chkInfo;
    tDecoderInit(&decoder, (uint8_t*)pVal, valLen);

    if (tDecodeStreamTaskChkInfo(&decoder, &chkInfo) >= 0) {
      // only the first record that decodes is examined
      if (chkInfo.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
        verdict = STREAM_STATA_NO_COMPATIBLE;
      } else if (chkInfo.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
        verdict = STREAM_STATA_NEED_CONVERT;
      }
      decided = true;
    }

    tDecoderClear(&decoder);
  }

  // tdbTbcNext() fills pKey/pVal; release them together with the cursor
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);
  return verdict;
}
220

221
/*
 * Convert the stream backend state under pMeta->path to the current data format.
 *
 * Conversion runs only when backend data exists for the latest checkpoint id. When no data
 * exists, the function returns terrno as left by streamBackendDataIsExist(); terrno is reset to 0
 * before that call.
 * Each column family in pBackend->cfInst is converted with streamStateCvtDataFormat(). After a
 * fully successful conversion, the old "<path>/state" directory is removed.
 *
 * Returns 0 on success or the first error code encountered.
 */
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (exist == false) {
    code = terrno;
    return code;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("failed to cvt data");
      // Leaving the iteration early: release the iterator before the backend (and its hash) goes away.
      taosHashCancelIterate(pBackend->cfInst, pIter);
      goto _EXIT;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

_EXIT:
  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    int32_t len = strlen(pMeta->path) + 32;
    char*   state = taosMemoryCalloc(1, len);
    if (state != NULL) {
      (void)snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // `code` is always 0 on this path; the allocation failure is recorded in terrno.
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
267

268
/*
 * Run the backend compatibility check and convert the on-disk format when required.
 * Returns:
 *   - 0 when the data is compatible (or the verdict is unrecognized);
 *   - the result of streamMetaCvtDbFormat() when a conversion is needed;
 *   - -1 when the stored data is incompatible.
 */
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t compatible = streamMetaCheckBackendCompatible(pMeta);

  switch (compatible) {
    case STREAM_STATA_NEED_CONVERT:
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);

    case STREAM_STATA_NO_COMPATIBLE:
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;

    case STREAM_STATA_COMPATIBLE:
    default:
      return 0;
  }
}
286

287
/*
 * Attach a task database backend to pTask. All tasks that use the same `key` share one backend.
 *
 * Everything happens under pMeta->backendMutex.
 *   - If pMeta->pTaskDbUnique already maps `key` to a backend, a reference is taken on it and the
 *     backend is assigned to the task.
 *   - Otherwise taskDbOpen() is retried once per second until it succeeds; the mutex is released
 *     while sleeping. The new backend is then registered in taskDbWrapperId and pTaskDbUnique.
 * When the freshly opened backend reports a processed version that differs from the task's, the
 * task's checkpoint versions are realigned to it.
 *
 * Returns 0, or TSDB_CODE_FAILED when no reference can be taken on an existing backend.
 * NOTE(review): a failure of the final taosHashPut() is only logged; the function still returns 0.
 */
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) {
  int64_t chkpId = pTask->chkInfo.checkpointId;

  streamMutexLock(&pMeta->backendMutex);

  // fast path: reuse a backend that is already open for this key
  void** ppShared = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
  if (ppShared != NULL && *ppShared != NULL) {
    if (taskDbAddRef(*ppShared) == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pShared = *ppShared;
    pShared->pMeta = pMeta;
    pTask->pBackend = pShared;

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pShared);
    return 0;
  }

  // slow path: open the backend, waiting for whoever still holds it
  STaskDbWrapper* pBackend = NULL;
  int64_t         processVer = -1;
  while (taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend) != 0) {
    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);
  }

  int64_t backendRef = taosAddRef(taskDbWrapperId, pBackend);
  pTask->pBackend = pBackend;
  pBackend->refId = backendRef;
  pBackend->pTask = pTask;
  pBackend->pMeta = pMeta;

  if (processVer != -1) {
    if (pTask->chkInfo.processedVer == processVer) {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    } else {
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    }
  }

  if (taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)) != 0) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
  return 0;
}
354

355
void streamMetaRemoveDB(void* arg, char* key) {
3,105✔
356
  if (arg == NULL || key == NULL) return;
3,105!
357

358
  SStreamMeta* pMeta = arg;
3,105✔
359
  streamMutexLock(&pMeta->backendMutex);
3,105✔
360
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
3,105✔
361
  if (code) {
3,105!
362
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
363
  }
364

365
  streamMutexUnlock(&pMeta->backendMutex);
3,105✔
366
}
367

368
/*
 * Create and open the stream meta of a vnode or snode.
 *
 * Parameters:
 *   path:     parent directory; the meta lives in "<path>/stream".
 *   ahandle, buildTaskFn, expandTaskFn, fn: stored in the meta for later use.
 *   vgId:     owner id; SNODE_HANDLE makes the meta start in the leader role.
 *   stage:    owner stage.
 *   p:        receives the new meta on success.
 *
 * Steps, in order:
 *   1. Open the TDB tables and convert the backend format if needed.
 *   2. Set up the rwlock and begin a transaction.
 *   3. Create the task containers, the checkpoint manager and the backend mutex.
 *   4. Register the meta in streamMetaRefPool and the global meta ref registry.
 *
 * Returns 0 on success. On failure, logs the failing line and returns the error code.
 */
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
  QRY_PARAM_CHECK(p);
  int32_t code = 0;
  int32_t lino = 0;

  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
  if (pMeta == NULL) {
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
    return terrno;
  }

  // Set the id first: the open/convert steps below log pMeta->vgId and pass it to streamBackendInit().
  pMeta->vgId = vgId;

  int32_t len = strlen(path) + 64;
  char*   tpath = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);

  (void)snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
  pMeta->path = tpath;

  code = streamMetaOpenTdb(pMeta);
  TSDB_CHECK_CODE(code, lino, _err);

  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
            tstrerror(terrno));
    TSDB_CHECK_CODE(code, lino, _err);
  }

  // set the attribute when running on Linux OS
  TdThreadRwlockAttr attr;
  code = taosThreadRwlockAttrInit(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

#ifdef LINUX
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
  if (code != 0) {
    (void)taosThreadRwlockAttrDestroy(&attr);
    TSDB_CHECK_CODE(code, lino, _err);
  }
#endif

  code = taosThreadRwlockInit(&pMeta->lock, &attr);
  if (code != 0) {
    (void)taosThreadRwlockAttrDestroy(&attr);
    TSDB_CHECK_CODE(code, lino, _err);
  }

  code = taosThreadRwlockAttrDestroy(&attr);
  TSDB_CHECK_CODE(code, lino, _err);

  // Keep the error code itself. The former `(code = streamMetaBegin(pMeta) < 0)` stored the result
  // of the comparison (0/1) instead of the code.
  code = streamMetaBegin(pMeta);
  if (code < 0) {
    stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
    lino = __LINE__;
    goto _err;
  }

  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);

  pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno);

  code = streamMetaInitStartInfo(&pMeta->startInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  // task list
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);

  pMeta->scanInfo.scanCounter = 0;
  pMeta->ahandle = ahandle;
  pMeta->buildTaskFn = buildTaskFn;
  pMeta->expandTaskFn = expandTaskFn;
  pMeta->stage = stage;
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
  pMeta->updateInfo.activeTransId = -1;
  pMeta->updateInfo.completeTransId = -1;

  pMeta->startInfo.completeFn = fn;
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);

  pMeta->numOfPausedTasks = 0;
  pMeta->numOfStreamTasks = 0;
  pMeta->closeFlag = false;

  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);

  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
  TSDB_CHECK_CODE(code, lino, _err);

  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
  TSDB_CHECK_CODE(code, lino, _err);

  // add refId at the end of initialization function
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);

  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);

  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));

  code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code != 0) {
    // the registry did not store pRid, so it is still ours to free
    taosMemoryFree(pRid);
    TSDB_CHECK_CODE(code, lino, _err);
  }

  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
  TSDB_CHECK_CODE(code, lino, _err);

  *p = pMeta;
  return code;

_err:
  // NOTE(review): failures after taosAddRef() above free pMeta while it is still registered in
  // streamMetaRefPool; that reference is not dropped here.
  taosMemoryFree(pMeta->path);
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
  if (pMeta->pTaskDbUnique) taosHashCleanup(pMeta->pTaskDbUnique);
  if (pMeta->pTaskDb) {
    tdbTbClose(pMeta->pTaskDb);
    pMeta->pTaskDb = NULL;
  }
  if (pMeta->pCheckpointDb) {
    tdbTbClose(pMeta->pCheckpointDb);
  }
  if (pMeta->db) {
    tdbClose(pMeta->db);
  }

  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);

  taosMemoryFree(pMeta);

  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
  return code;
}
501

502
// todo refactor: the lock should be restricted to one function
#ifdef BUILD_NO_CALL
/*
 * Compiled only with BUILD_NO_CALL. Opens the per-meta stream backend. If the first attempt fails,
 * it retries every 100ms under the meta write lock until streamBackendInit() succeeds. It then
 * registers the backend in streamBackendId and loads its checkpoint info.
 *
 * NOTE(review): when the first attempt fails, the write lock taken by the successful retry is never
 * released in this function.
 * NOTE(review): this calls a 3-argument streamBackendInit() that returns the backend, whereas
 * streamMetaCvtDbFormat() uses a 4-argument form that returns an error code. This code likely no
 * longer compiles if BUILD_NO_CALL is defined.
 */
void streamMetaInitBackend(SStreamMeta* pMeta) {
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
  if (pMeta->streamBackend == NULL) {
    for (;;) {
      streamMetaWLock(pMeta);
      pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
      if (pMeta->streamBackend != NULL) {
        break;
      }

      streamMetaWUnLock(pMeta);
      stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
      taosMsleep(100);
    }
  }

  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
  streamBackendLoadCheckpointInfo(pMeta);
}
#endif
524

525
/*
 * Drop every task from the meta and reset its in-memory bookkeeping:
 *   - the task map and task list;
 *   - the checkpoint arrays;
 *   - the task counters;
 *   - the ready/failed start sets.
 * streamMetaCloseImpl() calls this while holding the meta write lock.
 */
void streamMetaClear(SStreamMeta* pMeta) {
  // remove all existed tasks in this vnode
  for (void* pIter = taosHashIterate(pMeta->pTasksMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pMeta->pTasksMap, pIter)) {
    int64_t      taskRefId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
    if (pTask == NULL) {
      continue;  // the task may already have been destroyed
    }

    // the delayed-schedule timer holds one more reference: stop it
    if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    // remove the task from the pool, then give back the reference acquired above
    if (taosRemoveRef(streamTaskRefPool, taskRefId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, taskRefId);
    }
    if (taosReleaseRef(streamTaskRefPool, taskRefId) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, taskRefId);
    }
  }

  if (pMeta->streamBackendRid != 0 && taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
    stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
  }

  taosHashClear(pMeta->pTasksMap);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  pMeta->startInfo.readyTs = 0;
}
10,808✔
574

575
/*
 * Request that a stream meta be closed by removing it from streamMetaRefPool.
 * The actual teardown is done by streamMetaCloseImpl(), which streamMetaEnvInit() registers as the
 * pool's destructor. A NULL meta is ignored.
 */
void streamMetaClose(SStreamMeta* pMeta) {
  // Check for NULL before anything reads pMeta. The debug log used to dereference it first, which
  // crashed whenever debug logging was on and a NULL meta was passed.
  if (pMeta == NULL) {
    return;
  }

  stDebug("vgId:%d start to close stream meta", pMeta->vgId);

  int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid);
  if (code) {
    stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
  }
}
586

587
void streamMetaCloseImpl(void* arg) {
10,790✔
588
  SStreamMeta* pMeta = arg;
10,790✔
589
  if (pMeta == NULL) {
10,790!
590
    return;
×
591
  }
592

593
  int32_t code = 0;
10,790✔
594
  int32_t vgId = pMeta->vgId;
10,790✔
595
  stDebug("vgId:%d start to do-close stream meta", vgId);
10,790✔
596

597
  streamMetaWLock(pMeta);
10,790✔
598
  streamMetaClear(pMeta);
10,791✔
599
  streamMetaWUnLock(pMeta);
10,791✔
600

601
  // already log the error, ignore here
602
  tdbAbort(pMeta->db, pMeta->txn);
10,791✔
603
  tdbTbClose(pMeta->pTaskDb);
10,788✔
604
  tdbTbClose(pMeta->pCheckpointDb);
10,791✔
605
  tdbClose(pMeta->db);
10,791✔
606

607
  taosArrayDestroy(pMeta->pTaskList);
10,791✔
608
  taosArrayDestroy(pMeta->chkpSaved);
10,791✔
609
  taosArrayDestroy(pMeta->chkpInUse);
10,791✔
610

611
  taosHashCleanup(pMeta->pTasksMap);
10,791✔
612
  taosHashCleanup(pMeta->pTaskDbUnique);
10,791✔
613
  taosHashCleanup(pMeta->updateInfo.pTasks);
10,791✔
614

615
  streamMetaClearStartInfo(&pMeta->startInfo);
10,791✔
616

617
  destroyMetaHbInfo(pMeta->pHbInfo);
10,791✔
618
  pMeta->pHbInfo = NULL;
10,791✔
619

620
  taosMemoryFree(pMeta->path);
10,791!
621
  streamMutexDestroy(&pMeta->backendMutex);
10,790✔
622

623
  bkdMgtDestroy(pMeta->bkdChkptMgt);
10,791✔
624

625
  pMeta->role = NODE_ROLE_UNINIT;
10,791✔
626
  code = taosThreadRwlockDestroy(&pMeta->lock);
10,791✔
627
  if (code) {
10,791!
628
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
×
629
  }
630

631
  taosMemoryFree(pMeta);
10,791!
632
  stDebug("vgId:%d end to close stream meta", vgId);
10,791✔
633
}
634

635
// todo let's check the status for each task
636
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
11,295✔
637
  int32_t vgId = pTask->pMeta->vgId;
11,295✔
638
  void*   buf = NULL;
11,295✔
639
  int32_t len;
640
  int32_t code;
641
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
11,295!
642
  if (code < 0) {
11,294!
643
    return -1;
×
644
  }
645

646
  buf = taosMemoryCalloc(1, len);
11,294!
647
  if (buf == NULL) {
11,294!
648
    return terrno;
×
649
  }
650

651
  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
11,294✔
652
    pTask->ver = SSTREAM_TASK_VER;
22✔
653
  }
654

655
  SEncoder encoder = {0};
11,294✔
656
  tEncoderInit(&encoder, buf, len);
11,294✔
657
  code = tEncodeStreamTask(&encoder, pTask);
11,294✔
658
  tEncoderClear(&encoder);
11,295✔
659

660
  if (code == -1) {
11,294!
661
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
×
662
    return TSDB_CODE_INVALID_MSG;
×
663
  }
664

665
  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};
11,294✔
666

667
  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
11,294✔
668
  if (code != TSDB_CODE_SUCCESS) {
11,295!
669
    code = terrno;
×
670
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
×
671
            vgId, pTask->id.refId, tstrerror(code));
672

673
    int64_t refId = pTask->id.refId;
×
674
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
×
675
    if (ret != 0) {
×
676
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
×
677
    }
678
  } else {
679
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
11,295✔
680
  }
681

682
  taosMemoryFree(buf);
11,295!
683
  return code;
11,295✔
684
}
685

686
/*
 * Delete a task's persisted record from task.db within pMeta->txn.
 * Returns the tdbTbDelete() result (0 on success).
 */
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
  int32_t shortId = (int32_t)pTaskId->taskId;
  int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};

  int32_t code = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (code == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, shortId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, shortId, tstrerror(terrno));
  }

  return code;
}
698

699
// add to the ready tasks hash map, not the restored tasks hash map
700
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
8,813✔
701
  *pAdded = false;
8,813✔
702

703
  int32_t code = 0;
8,813✔
704
  int64_t refId = 0;
8,813✔
705
  STaskId id = streamTaskGetTaskId(pTask);
8,813✔
706
  void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
8,812✔
707

708
  if (p != NULL) {
8,812✔
709
    stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId);
63!
710
    tFreeStreamTask(pTask);
63✔
711
    return code;
63✔
712
  }
713

714
  if ((code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver)) != 0) {
8,749!
715
    tFreeStreamTask(pTask);
×
716
    return code;
×
717
  }
718

719
  p = taosArrayPush(pMeta->pTaskList, &pTask->id);
8,750✔
720
  if (p == NULL) {
8,750!
721
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
×
722
    tFreeStreamTask(pTask);
×
723
    return terrno;
×
724
  }
725

726
  pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask);
8,750✔
727
  code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
8,750✔
728
  if (code) {
8,750!
729
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
×
730
    void* pUnused = taosArrayPop(pMeta->pTaskList);
×
731

732
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
733
    if (ret != 0) {
×
734
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
×
735
    }
736
    return code;
×
737
  }
738

739
  if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
8,750!
740
    int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
×
741
    void*   pUnused = taosArrayPop(pMeta->pTaskList);
×
742

743
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
744
    if (ret) {
×
745
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
×
746
    }
747
    return code;
×
748
  }
749

750
  if ((code = streamMetaCommit(pMeta)) != 0) {
8,750!
751
    int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
×
752
    void*   pUnused = taosArrayPop(pMeta->pTaskList);
×
753

754
    int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
755
    if (ret) {
×
756
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
×
757
    }
758

759
    return code;
×
760
  }
761

762
  if (pTask->info.fillHistory == 0) {
8,750✔
763
    int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
5,730✔
764
  }
765

766
  *pAdded = true;
8,750✔
767
  return code;
8,750✔
768
}
769

770
/*
 * Number of tasks in the meta, taken from the task map. An error is logged when the task list
 * disagrees with the map, but the map's count is returned regardless.
 */
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
  int32_t numInMap = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t numInList = taosArrayGetSize(pMeta->pTaskList);

  if (numInList != numInMap) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, numInList, numInMap);
  }

  return numInMap;
}
779

780
/*
 * Look up a task by {streamId, taskId} and take a reference on it, without locking the meta.
 * On success, *pTask holds the task and the caller must give it back with streamMetaReleaseTask().
 * Returns TSDB_CODE_STREAM_TASK_NOT_EXIST in these cases:
 *   - the task is not in the map;
 *   - the reference cannot be acquired;
 *   - the stored refId does not match the task;
 *   - the task is stopping.
 * In the last two cases the acquired reference is released again.
 */
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  STaskId  key = {.streamId = streamId, .taskId = taskId};
  int64_t* pRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &key, sizeof(key));
  if (pRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  int64_t      refId = *pRefId;
  SStreamTask* pAcquired = taosAcquireRef(streamTaskRefPool, refId);
  if (pAcquired == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pAcquired->id.refId != refId) {
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, refId,
            pAcquired->id.refId);
  } else if (streamTaskShouldStop(pAcquired)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", pAcquired->id.idStr);
  } else {
    stDebug("s-task:%s acquire task, refId:%" PRId64, pAcquired->id.idStr, pAcquired->id.refId);
    *pTask = pAcquired;
    return TSDB_CODE_SUCCESS;
  }

  // the task cannot be handed out: give back the reference taken above
  if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
    stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId);
  }
  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
818

819
/*
 * Look up a task by *pId and take a reference on it, without locking the meta.
 * Unlike streamMetaAcquireTaskNoLock(), a stopping task is still returned.
 * On success, *pTask holds the task and the caller must release it.
 * Returns TSDB_CODE_STREAM_TASK_NOT_EXIST in these cases:
 *   - the task is not in the map;
 *   - the reference cannot be acquired;
 *   - the stored refId does not match the task (the reference is released again).
 */
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  int64_t* pRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
  if (pRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  int64_t      refId = *pRefId;
  SStreamTask* pAcquired = taosAcquireRef(streamTaskRefPool, refId);
  if (pAcquired == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pAcquired->id.refId == refId) {
    stDebug("s-task:%s acquire task, refId:%" PRId64, pAcquired->id.idStr, pAcquired->id.refId);
    *pTask = pAcquired;
    return TSDB_CODE_SUCCESS;
  }

  stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId, refId,
          pAcquired->id.refId);
  if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
    stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, refId);
  }
  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
849

850
/*
 * Locked variant of streamMetaAcquireTaskNoLock(): the lookup runs under the meta read lock.
 */
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t ret;

  streamMetaRLock(pMeta);
  ret = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return ret;
}
856

857
// Drop one reference on pTask obtained from one of the acquire functions; a NULL task is ignored.
// The task id and refId are copied into locals first so that nothing reads pTask after the reference is dropped.
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    const int32_t tid = pTask->id.taskId;
    const int64_t rid = pTask->id.refId;

    stDebug("s-task:0x%x release task, refId:%" PRId64, tid, rid);
    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, tid, rid);
    }
  }
}
870

871
// Remove the first entry among the first `num` elements of pTaskList whose (streamId, taskId) equals *id.
// Logs an internal error if no such entry exists.
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  for (int32_t idx = 0; idx < num; ++idx) {
    SStreamTaskId* pEntry = taosArrayGet(pTaskList, idx);
    if (pEntry->streamId != id->streamId || pEntry->taskId != id->taskId) {
      continue;
    }

    taosArrayRemove(pTaskList, idx);
    return;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
6,522✔
886

887
// Callback run for the TASK_EVENT_DROPPING event.
// Source-level tasks first answer the pending checkpoint-source request. Every non-sink task that owns an
// executor then has its query killed so the stream ends as soon as possible. `param` is unused.
// Returns the code of the last operation attempted (0 when nothing failed).
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t code = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    code = streamTaskSendCheckpointSourceRsp(pTask);
    if (code != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
              tstrerror(code));
    }
  }

  // sink tasks and tasks without an executor have no query to stop
  if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->exec.pExecutor == NULL) {
    return code;
  }

  // let's kill the query procedure within stream, to end it ASAP.
  code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
  }

  return code;
}
906

907
// Remove task (streamId, taskId) from the stream meta.
// NOTE(review): the task is looked up with streamMetaAcquireTaskUnsafe(), i.e. without taking the meta lock here;
// callers presumably hold the meta write lock — confirm at call sites.
//
// If the task is found, this function:
// - decrements the paused-task counter if the task should pause;
// - fires TASK_EVENT_DROPPING asynchronously;
// - decrements numOfStreamTasks for non-fill-history tasks;
// - removes the task from pTasksMap, pTaskList and the meta store;
// - stops the delayed-schedule timer;
// - removes the task from the ref pool and drops the reference acquired here.
//
// Always returns 0. Individual failures, and a task that cannot be found, are only logged.
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  SStreamTask* pTask = NULL;
  int32_t      vgId = pMeta->vgId;
  int32_t      code = 0;
  STaskId      id = {.streamId = streamId, .taskId = taskId};

  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    return 0;
  }

  // desc the paused task counter
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // only non-fill-history tasks are counted in numOfStreamTasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 " from task map, code:%s", vgId, id.taskId, tstrerror(code));
  }

  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
  code = streamMetaRemoveTask(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
  if (sizeInList != size) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // remove the task from the ref pool; the reference acquired above keeps pTask valid until it is released below
  int64_t refId = pTask->id.refId;
  int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
  if (ret != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
966

967
// Start a new read-uncommitted write transaction on the meta db while holding the meta write lock.
// A failure is recorded via streamSetFatalError() and returned to the caller.
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t code;

  streamMetaWLock(pMeta);

  code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                  TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return code;
}
977

978
// Commit the current meta transaction and immediately begin a new one.
//
// The sequence is tdbCommit -> tdbPostCommit -> tdbBegin; every failure is reported via streamSetFatalError()
// and stFatal. As before, a commit failure does not stop the post-commit/begin steps, and a post-commit failure
// returns immediately. Returns the FIRST error encountered, or 0 when all three steps succeed, so a failed
// commit is no longer hidden by later successful steps.
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t firstErr = 0;

  int32_t code = tdbCommit(pMeta->db, pMeta->txn);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
            pMeta->fatalInfo.line);
    firstErr = code;
  }

  code = tdbPostCommit(pMeta->db, pMeta->txn);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
            pMeta->fatalInfo.line);
    return (firstErr != 0) ? firstErr : code;
  }

  code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                  TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line);
    if (firstErr == 0) {
      firstErr = code;
    }
  } else if (firstErr == 0) {
    // only report completion when the commit itself succeeded
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
  }

  return firstErr;
}
1005

1006
// Scan every record of pMeta->pTaskDb and return the largest checkpointId found.
// Records that cannot be decoded as checkpoint info are skipped; an empty value ends the scan.
// Returns 0 if the cursor cannot be opened or positioned, or if no checkpoint is recorded.
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t  maxCheckpointId = 0;
  TBC*     pCur = NULL;
  void*    pKey = NULL;
  void*    pVal = NULL;
  int32_t  kLen = 0;
  int32_t  vLen = 0;
  SDecoder decoder;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return maxCheckpointId;
  }

  if (tdbTbcMoveToFirst(pCur) != 0) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCur);
    return maxCheckpointId;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t decodeRet = tDecodeStreamTaskChkInfo(&decoder, &info);
    tDecoderClear(&decoder);

    if (decodeRet >= 0) {
      maxCheckpointId = TMAX(maxCheckpointId, info.checkpointId);
    }
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, maxCheckpointId);

  // key/value buffers are owned by this function once the cursor loop is done
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);

  return maxCheckpointId;
}
1052

1053
// not allowed to return error code
1054
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
10,810✔
1055
  TBC*     pCur = NULL;
10,810✔
1056
  void*    pKey = NULL;
10,810✔
1057
  int32_t  kLen = 0;
10,810✔
1058
  void*    pVal = NULL;
10,810✔
1059
  int32_t  vLen = 0;
10,810✔
1060
  SDecoder decoder;
1061
  int32_t  vgId = 0;
10,810✔
1062
  int32_t  code = 0;
10,810✔
1063
  SArray*  pRecycleList = NULL;
10,810✔
1064

1065
  if (pMeta == NULL) {
10,810!
1066
    return;
×
1067
  }
1068

1069
  vgId = pMeta->vgId;
10,810✔
1070
  pRecycleList = taosArrayInit(4, sizeof(STaskId));
10,810✔
1071
  if (pRecycleList == NULL) {
10,810!
1072
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
×
1073
    return;
×
1074
  }
1075

1076
  stInfo("vgId:%d load stream tasks from meta files", vgId);
10,810!
1077

1078
  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
10,810✔
1079
  if (code != TSDB_CODE_SUCCESS) {
10,810!
1080
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
×
1081
    taosArrayDestroy(pRecycleList);
×
1082
    return;
×
1083
  }
1084

1085
  code = tdbTbcMoveToFirst(pCur);
10,810✔
1086
  if (code) {
10,809!
1087
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
×
1088
    taosArrayDestroy(pRecycleList);
×
1089
    tdbTbcClose(pCur);
×
1090
    return;
×
1091
  }
1092

1093
  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
11,062✔
1094
    if (pVal == NULL || vLen == 0) {
261!
1095
      break;
1096
    }
1097

1098
    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
261!
1099
    if (pTask == NULL) {
261!
1100
      terrno = TSDB_CODE_OUT_OF_MEMORY;
×
1101
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
×
1102
      break;
×
1103
    }
1104

1105
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
261✔
1106
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
261✔
1107
      tDecoderClear(&decoder);
8✔
1108
      tFreeStreamTask(pTask);
8✔
1109
      stError(
8!
1110
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
1111
          "stream manually",
1112
          vgId, tsDataDir);
1113
      break;
8✔
1114
    }
1115
    tDecoderClear(&decoder);
253✔
1116

1117
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
253!
1118
      int32_t taskId = pTask->id.taskId;
×
1119
      STaskId id = streamTaskGetTaskId(pTask);
×
1120

1121
      tFreeStreamTask(pTask);
×
1122
      void* px = taosArrayPush(pRecycleList, &id);
×
1123
      if (px == NULL) {
×
1124
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
×
1125
      }
1126

1127
      int32_t total = taosArrayGetSize(pRecycleList);
×
1128
      stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
×
1129
      continue;
×
1130
    }
1131

1132
    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
253✔
1133
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
1134

1135
    // do duplicate task check.
1136
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
253✔
1137
    void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
253✔
1138
    if (p == NULL) {
253!
1139
      code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
253✔
1140
      if (code < 0) {
253!
1141
        stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
×
1142
        tFreeStreamTask(pTask);
×
1143
        continue;
×
1144
      }
1145

1146
      void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
253✔
1147
      if (px == NULL) {
253!
1148
        stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
×
1149
      }
1150
    } else {
1151
      // todo this should replace the existed object put by replay creating stream task msg from mnode
1152
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
×
1153
      tFreeStreamTask(pTask);
×
1154
      continue;
×
1155
    }
1156

1157
    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);
253✔
1158

1159
    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
253!
1160
      int64_t refId = pTask->id.refId;
×
1161
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
×
1162
              pTask->id.taskId, tstrerror(terrno), refId);
1163

1164
      void*   px = taosArrayPop(pMeta->pTaskList);
×
1165
      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
×
1166
      if (ret != 0) {
×
1167
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
×
1168
      }
1169
      continue;
×
1170
    }
1171

1172
    stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
253!
1173
    if (pTask->info.fillHistory == 0) {
253✔
1174
      int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
198✔
1175
    }
1176

1177
    if (streamTaskShouldPause(pTask)) {
253!
1178
      int32_t val = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
×
1179
    }
1180
  }
1181

1182
  tdbFree(pKey);
10,809✔
1183
  tdbFree(pVal);
10,809✔
1184

1185
  tdbTbcClose(pCur);
10,809✔
1186

1187
  if (taosArrayGetSize(pRecycleList) > 0) {
10,810!
1188
    for (int32_t i = 0; i < taosArrayGetSize(pRecycleList); ++i) {
×
1189
      STaskId* pId = taosArrayGet(pRecycleList, i);
×
1190
      code = streamMetaRemoveTask(pMeta, pId);
×
1191
      if (code) {
×
1192
        stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
×
1193
      }
1194
    }
1195
  }
1196

1197
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
10,810✔
1198
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
10,810✔
1199
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
1200

1201
  taosArrayDestroy(pRecycleList);
10,810✔
1202
  code = streamMetaCommit(pMeta);
10,810✔
1203
  if (code) {
10,810!
1204
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
×
1205
  }
1206
}
1207

1208
// Notify all stream tasks of this vnode that it is closing.
//
// Steps:
// 1. Wait for the meta heartbeat timer to quit, then set closeFlag.
// 2. Under the meta read lock, take a snapshot of the task list (sending the pre-close message to the mnode
//    if required).
// 3. For every task that can still be acquired: stop it, release it, and remove it from the ref pool.
//
// Failures are only logged.
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  streamMetaWaitForHbTmrQuit(pMeta);
  pMeta->closeFlag = true;

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // best effort: report the failure instead of silently ignoring it. When the task-list snapshot could not be
    // created no list is handed back, so the loop below has nothing to stop.
    stError("vgId:%d failed to prepare closing all tasks, code:%s", vgId, tstrerror(code));
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);
  streamMetaRUnLock(pMeta);
}
16,879✔
1260

1261
// Start the periodic meta heartbeat to mnode.
// A heap copy of pMeta->rid is registered via metaRefMgtAdd() and passed to streamMetaHbToMnode().
// If either the allocation or the registration fails, no heartbeat is started and the failure is logged.
// NOTE(review): ownership of pRid after a failed metaRefMgtAdd() is not visible here, so it is intentionally
// not freed on that path — confirm against metaRefMgtAdd().
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  if (pRid == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRid = pMeta->rid;
  int32_t code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code) {
    stError("vgId:%d failed to register meta refId:%" PRId64 ", hbMsg will not be started, code:%s", pMeta->vgId,
            pMeta->rid, tstrerror(code));
    return;
  }

  streamMetaHbToMnode(pRid, NULL);
}
1276

1277
// Snapshot pMeta->pTaskList into a newly allocated array returned via *pList (the caller destroys it).
// If pMeta->sendMsgBeforeClosing is set, every task in the snapshot that can be acquired is marked as
// checkpoint-failed, one heartbeat is sent to the mnode, and the flag is cleared.
// Returns terrno if the snapshot cannot be allocated (in that case *pList is not assigned by this function);
// otherwise TSDB_CODE_SUCCESS, even if sending the heartbeat fails (that failure is only logged).
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);

  int32_t code = 0;
  SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
  if (pTaskList == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pTaskList;

  bool sendMsg = pMeta->sendMsgBeforeClosing;
  if (!sendMsg) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {  // this error is ignored
      continue;
    }

    streamTaskSetCheckpointFailed(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  code = streamMetaSendHbHelper(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    // best effort: closing continues even if the hb could not be sent, but the failure is no longer silent
    stError("vgId:%d failed to send hb msg to mnode before closing tasks, code:%s", pMeta->vgId, tstrerror(code));
  }

  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // the hb result is deliberately not propagated to the caller
}
1316

1317
// Record the new stage and role of this vnode under the meta write lock.
// If the node was leader before this update, sendMsgBeforeClosing is set so that a message is sent to the
// mnode before the tasks are closed. When the node becomes leader, the meta heartbeat is started after the
// lock is released.
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
  streamMetaWLock(pMeta);

  int64_t oldStage = pMeta->stage;
  pMeta->stage = stage;

  // mark the sign to send msg before close all tasks
  // 1. for leader vnode, always send msg before closing
  // 2. for follower vnode, if it's is changed from leader, also sending msg before closing.
  if (pMeta->role == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  if (isLeader) {
    pMeta->role = NODE_ROLE_LEADER;
  } else {
    pMeta->role = NODE_ROLE_FOLLOWER;
  }

  streamMetaWUnLock(pMeta);

  if (!isLeader) {
    stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
           oldStage, stage, isLeader, pMeta->sendMsgBeforeClosing);
    return;
  }

  stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
         pMeta->vgId, oldStage, stage, isLeader, pMeta->rid);
  streamMetaStartHb(pMeta);
}
13,416✔
1342

1343
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
1,216✔
1344
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
1,216✔
1345
  for (int32_t i = 0; i < num; ++i) {
3,727✔
1346
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
2,527✔
1347
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
2,527✔
1348
    SStreamTask*   pTask = NULL;
2,527✔
1349
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
2,527✔
1350

1351
    if (code == 0) {
2,527!
1352
      if (pTask->status.downstreamReady == 0) {
2,527✔
1353
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
16✔
1354
        return false;
16✔
1355
      }
1356
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
2,511✔
1357
    }
1358
  }
1359

1360
  return true;
1,200✔
1361
}
1362

1363
// Call streamTaskResetStatus() on every task in pMeta->pTaskList that can be acquired (unlocked acquire path).
// Tasks that cannot be acquired are skipped. Always returns TSDB_CODE_SUCCESS / 0.
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        taskKey = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &taskKey, &pTask) != 0) {
      continue;
    }

    streamTaskResetStatus(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  return 0;
}
1384

1385
// Record in pMeta->updateInfo.pTasks that pTask has completed the nodeEp update for transId.
// The fill-history task pHTask, when not NULL, is recorded too. The elapsed time since startTs is logged.
// Failures to insert are only logged.
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
                                     int64_t startTs) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = pMeta->vgId;

  // keep the already updated info
  STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", id);
  }

  int64_t el = taosGetTimestampMs() - startTs;

  if (pHTask == NULL) {
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
            id, vgId, transId, el);
    return;
  }

  STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", id);
    return;
  }

  stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
          " ms",
          id, vgId, transId, el);
}
31✔
1414

1415
// Mark the active nodeEp update as complete:
// - clear the per-task update set;
// - promote activeTransId to completeTransId and reset activeTransId to -1;
// - stamp completeTs with the current time.
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpdate = &pMeta->updateInfo;
  int32_t          prevCompleteId = pUpdate->completeTransId;

  taosHashClear(pUpdate->pTasks);

  pUpdate->completeTransId = pUpdate->activeTransId;
  pUpdate->activeTransId = -1;
  pUpdate->completeTs = taosGetTimestampMs();

  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64 ", complete transId:%d->%d, reset active transId",
          pMeta->vgId, pUpdate->completeTs, prevCompleteId, pUpdate->completeTransId);
}
17✔
1428

1429
// Decide whether a nodeEp (epset) update with transId should be processed, updating activeTransId as needed.
//
// Returns false, without changing state, when transId is not newer than completeTransId, or when it is older
// than the currently active transId.
//
// Returns true in three cases:
// - no update is active: pTasks is cleared and transId becomes active;
// - transId equals the active one: nothing changes;
// - transId is newer than the active one: pTasks is cleared and transId replaces the active one.
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;

  if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  }

  if (transId < pInfo->completeTransId) {
    // report the completed transId (not the active one) that this message is older than
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }

  // from here on: transId > pInfo->completeTransId
  if (pInfo->activeTransId == -1) {
    taosHashClear(pInfo->pTasks);
    pInfo->activeTransId = transId;

    stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId,
           pInfo->completeTransId);
    return true;
  }

  if (pInfo->activeTransId == transId) {
    // do nothing
    return true;
  }

  if (transId < pInfo->activeTransId) {
    stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard",
            pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
    return false;
  }

  // transId > pInfo->activeTransId: a newer update supersedes the active one
  taosHashClear(pInfo->pTasks);
  int32_t prev = pInfo->activeTransId;
  pInfo->activeTransId = transId;

  stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId, prev,
         transId, pInfo->completeTransId);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc