• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.54
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tsched.h"
21
#include "tstream.h"
22
#include "ttimer.h"
23
#include "wal.h"
24

25
// One-time guard: streamMetaInit() passes it to taosThreadOnce() so that
// streamMetaEnvInit() runs at most once.
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;

// Reference-pool ids. Each one is assigned from taosOpenRef() in streamMetaEnvInit().
int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t taskDbWrapperId = 0;
int32_t streamTaskRefPool = 0;

static int32_t streamMetaBegin(SStreamMeta* pMeta);
static void    streamMetaCloseImpl(void* arg);

// Global registry of heap-allocated refId holders (int64_t*).
//   mutex  - taken by metaRefMgtAdd()/metaRefMgtRemove() around every table access
//   pTable - hash table keyed by the holder's pointer value; the stored value is the same pointer
typedef struct {
  TdThreadMutex mutex;
  SHashObj*     pTable;
} SMetaRefMgt;

SMetaRefMgt gMetaRefMgt;

int32_t metaRefMgtInit();
void    metaRefMgtCleanup();
44

45
static void streamMetaEnvInit() {
1,880✔
46
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
1,880✔
47
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
1,880✔
48
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
1,880✔
49

50
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
1,880✔
51
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
1,880✔
52

53
  int32_t code = metaRefMgtInit();
1,880✔
54
  if (code) {
1,880!
55
    stError("failed to init stream meta mgmt env, start failed");
×
56
    return;
×
57
  }
58

59
  code = streamTimerInit();
1,880✔
60
  if (code) {
1,880!
61
    stError("failed to init stream meta env, start failed");
×
62
  }
63
}
64

65
void streamMetaInit() {
1,880✔
66
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
1,880✔
67
  if (code) {
1,880!
68
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
69
  }
70
}
1,880✔
71

72
// Tears down what streamMetaEnvInit() created: closes the reference pools,
// then destroys the refId registry and stops the stream timer.
// NOTE(review): taskDbWrapperId is opened in streamMetaEnvInit() but is not
// closed here - confirm whether that is intentional.
void streamMetaCleanup() {
  taosCloseRef(streamBackendId);
  taosCloseRef(streamBackendCfWrapperId);
  taosCloseRef(streamMetaRefPool);
  taosCloseRef(streamTaskRefPool);

  metaRefMgtCleanup();
  streamTimerCleanUp();
}
1,879✔
81

82
int32_t metaRefMgtInit() {
1,880✔
83
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
1,880✔
84
  if (code) {
1,880!
85
    return code;
×
86
  }
87

88
  if (code == 0) {
1,880!
89
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
1,880✔
90
  }
91

92
  if (gMetaRefMgt.pTable == NULL) {
1,880!
93
    return terrno;
×
94
  } else {
95
    return code;
1,880✔
96
  }
97
}
98

99
void metaRefMgtCleanup() {
1,879✔
100
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
1,879✔
101
  while (pIter) {
25,481✔
102
    int64_t* p = *(int64_t**)pIter;
23,602✔
103
    taosMemoryFree(p);
23,602!
104
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
23,602✔
105
  }
106

107
  taosHashCleanup(gMetaRefMgt.pTable);
1,879✔
108
  streamMutexDestroy(&gMetaRefMgt.mutex);
1,879✔
109
}
1,879✔
110

111
/**
 * Register a refId holder in the global registry.
 *
 * The table is keyed by the pointer value `rid` itself (not by `*rid`), and
 * the stored value is that same pointer.
 *
 * @param vgId  vnode group id, used for logging only
 * @param rid   heap-allocated holder of the refId
 * @return 0 on success (also when the holder was already registered, which is
 *         logged as fatal); otherwise the taosHashPut() error code
 */
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
  int32_t code = 0;

  streamMutexLock(&gMetaRefMgt.mutex);

  void* p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
  if (p != NULL) {
    stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
  } else {
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
    if (code) {
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
              tstrerror(code));
    }
  }

  // single exit point: the mutex is released on every path, including the
  // taosHashPut() failure path that previously returned while still holding it
  streamMutexUnlock(&gMetaRefMgt.mutex);
  return code;
}
134

135
// Unregisters a refId holder (keyed by its pointer value) and frees it.
// The result of the table removal is deliberately ignored.
void metaRefMgtRemove(int64_t* pRefId) {
  streamMutexLock(&gMetaRefMgt.mutex);

  (void)taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
  taosMemoryFree(pRefId);

  streamMutexUnlock(&gMetaRefMgt.mutex);
}
30,107✔
142

143
// Opens the tdb environment at pMeta->path and its two tables, "task.db" and
// "checkpoint.db". Returns 0 on success, -1 on the first failure; handles that
// were already opened are left in pMeta for the caller to close.
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t rc = tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL);
  if (rc < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
    return -1;
  }

  rc = tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0);
  if (rc < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  rc = tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0);
  if (rc < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  return 0;
}
161

162
//
163
// impl later
164
//
165
// Result of streamMetaCheckBackendCompatible(), consumed by streamMetaMayCvtDbFormat().
enum STREAM_STATE_VER {
  STREAM_STATA_NO_COMPATIBLE,  // stored msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER: opening the meta fails
  STREAM_STATA_COMPATIBLE,     // default: nothing to do
  STREAM_STATA_NEED_CONVERT,   // stored msgVer >= SSTREAM_TASK_NEED_CONVERT_VER: run streamMetaCvtDbFormat()
};
170

171
/**
 * Inspect the first decodable record of the task table and classify the
 * on-disk format by its msgVer.
 *
 * @param pMeta  stream meta whose pTaskDb is already open
 * @return one of enum STREAM_STATE_VER. STREAM_STATA_COMPATIBLE is also
 *         returned when the cursor cannot be opened or positioned, or when the
 *         table holds no usable record.
 */
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  ret = STREAM_STATA_COMPATIBLE;
  TBC*    pCur = NULL;
  int32_t code = 0;
  void*   pKey = NULL;
  int32_t kLen = 0;
  void*   pVal = NULL;
  int32_t vLen = 0;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {  // no task info, no stream
    return ret;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCur);
    return ret;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
      // release the decoder before trying the next record; the original
      // skipped this on the failure path
      tDecoderClear(&decoder);
      continue;
    }

    if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
      ret = STREAM_STATA_NO_COMPATIBLE;
    } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
      ret = STREAM_STATA_NEED_CONVERT;
    }

    tDecoderClear(&decoder);
    // only the first successfully decoded record is examined
    break;
  }

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCur);
  return ret;
}
219

220
/**
 * Convert the legacy state backend of this meta to the current data format.
 *
 * Does nothing when no backend data exists for the latest checkpoint. After a
 * successful conversion of every column-family instance, the "<path>/state"
 * directory is removed on a best-effort basis.
 *
 * @param pMeta  stream meta (path and vgId are read)
 * @return 0 on success; terrno when the backend data does not exist; otherwise
 *         the error from streamBackendInit() or streamStateCvtDataFormat()
 */
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  // cleared first so that a "not exist" answer without an error yields 0
  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (exist == false) {
    code = terrno;
    return code;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("failed to cvt data");
      break;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    size_t cap = strlen(pMeta->path) + 32;
    char*  state = taosMemoryCalloc(1, cap);
    if (state != NULL) {
      // bounded by the allocation size instead of an unchecked sprintf
      (void)snprintf(state, cap, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // code is 0 on this path, so report the allocation error from terrno;
      // the removal is best effort and the function still returns success
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
265

266
// Runs the compatibility check and acts on it: nothing for a compatible store,
// a format conversion when required, and -1 for an incompatible store.
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t state = streamMetaCheckBackendCompatible(pMeta);

  switch (state) {
    case STREAM_STATA_COMPATIBLE:
      return 0;

    case STREAM_STATA_NEED_CONVERT:
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);

    case STREAM_STATA_NO_COMPATIBLE:
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;

    default:
      // any other value is treated as "nothing to do"
      return 0;
  }
}
284

285
// Attaches a task-db backend to pTask.
//
// If pMeta->pTaskDbUnique already holds a backend for `key`, one more reference
// is taken on it and it is shared. Otherwise a new backend is opened (retrying
// once per second until taskDbOpen() succeeds), registered in taskDbWrapperId
// and stored in the map. All map access happens under pMeta->backendMutex.
//
// Returns 0, or TSDB_CODE_FAILED when the existing backend cannot be referenced.
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) {
  int32_t code = 0;
  int64_t chkpId = pTask->chkInfo.checkpointId;

  streamMutexLock(&pMeta->backendMutex);
  void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
  if ((ppBackend != NULL) && (*ppBackend != NULL)) {
    // fast path: reuse the backend that is already open for this key
    void* p = taskDbAddRef(*ppBackend);
    if (p == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pBackend = *ppBackend;
    pBackend->pMeta = pMeta;
    pTask->pBackend = pBackend;

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
    return 0;
  }

  STaskDbWrapper* pBackend = NULL;
  int64_t         processVer = -1;
  // The mutex is released while sleeping and re-taken before the next attempt.
  // NOTE(review): the map is not re-checked after the lock is re-acquired, and
  // the loop has no upper bound - confirm both are intended.
  while (1) {
    code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pBackend);
    if (code == 0) {
      break;
    }

    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);
  }

  // NOTE(review): the value returned by taosAddRef() is stored without being checked
  int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
  pTask->pBackend = pBackend;
  pBackend->refId = tref;
  pBackend->pTask = pTask;
  pBackend->pMeta = pMeta;

  // processVer stays -1 unless taskDbOpen() filled it in; when it did, the
  // task's version fields are aligned with it
  if (processVer != -1) {
    if (pTask->chkInfo.processedVer != processVer) {
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    } else {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    }
  }

  // NOTE(review): a failed insert is only logged; the function still returns 0
  code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
  if (code) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
  return 0;
}
352

353
void streamMetaRemoveDB(void* arg, char* key) {
4,453✔
354
  if (arg == NULL || key == NULL) return;
4,453!
355

356
  SStreamMeta* pMeta = arg;
4,453✔
357
  streamMutexLock(&pMeta->backendMutex);
4,453✔
358
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
4,454✔
359
  if (code) {
4,454!
360
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
361
  }
362

363
  streamMutexUnlock(&pMeta->backendMutex);
4,454✔
364
}
365

366
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
10,112✔
367
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
368
  QRY_PARAM_CHECK(p);
10,112!
369
  int32_t code = 0;
10,112✔
370
  int32_t lino = 0;
10,112✔
371

372
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
10,112!
373
  if (pMeta == NULL) {
10,186!
374
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
×
375
    return terrno;
×
376
  }
377

378
  int32_t len = strlen(path) + 64;
10,186✔
379
  char*   tpath = taosMemoryCalloc(1, len);
10,186!
380
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);
10,186!
381

382
  sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
10,186✔
383
  pMeta->path = tpath;
10,186✔
384

385
  code = streamMetaOpenTdb(pMeta);
10,186✔
386
  TSDB_CHECK_CODE(code, lino, _err);
10,184!
387

388
  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
10,184!
389
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
×
390
            tstrerror(terrno));
391
    TSDB_CHECK_CODE(code, lino, _err);
×
392
  }
393

394
  if ((code = streamMetaBegin(pMeta) < 0)) {
10,184!
395
    stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
×
396
    goto _err;
×
397
  }
398

399
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
10,186✔
400
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
10,185✔
401
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);
10,185!
402

403
  pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
10,185✔
404
  TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno);
10,186!
405

406
  code = streamMetaInitStartInfo(&pMeta->startInfo);
10,186✔
407
  TSDB_CHECK_CODE(code, lino, _err);
10,185!
408

409
  // task list
410
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
10,185✔
411
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);
10,186!
412

413
  pMeta->scanInfo.scanCounter = 0;
10,186✔
414
  pMeta->vgId = vgId;
10,186✔
415
  pMeta->ahandle = ahandle;
10,186✔
416
  pMeta->buildTaskFn = buildTaskFn;
10,186✔
417
  pMeta->expandTaskFn = expandTaskFn;
10,186✔
418
  pMeta->stage = stage;
10,186✔
419
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
10,186✔
420
  pMeta->updateInfo.activeTransId = -1;
10,186✔
421
  pMeta->updateInfo.completeTransId = -1;
10,186✔
422

423
  pMeta->startInfo.completeFn = fn;
10,186✔
424
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
10,186✔
425
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);
10,186!
426

427
  pMeta->numOfPausedTasks = 0;
10,186✔
428
  pMeta->numOfStreamTasks = 0;
10,186✔
429
  pMeta->closeFlag = false;
10,186✔
430

431
  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);
10,186!
432

433
  // set the attribute when running on Linux OS
434
  TdThreadRwlockAttr attr;
435
  code = taosThreadRwlockAttrInit(&attr);
10,186✔
436
  TSDB_CHECK_CODE(code, lino, _err);
10,186!
437

438
#ifdef LINUX
439
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
10,186✔
440
  TSDB_CHECK_CODE(code, lino, _err);
10,186!
441
#endif
442

443
  code = taosThreadRwlockInit(&pMeta->lock, &attr);
10,186✔
444
  TSDB_CHECK_CODE(code, lino, _err);
10,186!
445

446
  code = taosThreadRwlockAttrDestroy(&attr);
10,186✔
447
  TSDB_CHECK_CODE(code, lino, _err);
10,186!
448

449
  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
10,186✔
450
  TSDB_CHECK_CODE(code, lino, _err);
10,186!
451

452
  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
10,186✔
453
  TSDB_CHECK_CODE(code, lino, _err);
10,185!
454

455
  // add refId at the end of initialization function
456
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);
10,185✔
457

458
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
10,186!
459
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);
10,186!
460

461
  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
10,186✔
462

463
  code = metaRefMgtAdd(pMeta->vgId, pRid);
10,186✔
464
  TSDB_CHECK_CODE(code, lino, _err);
10,186!
465

466
  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
10,186✔
467

468
  TSDB_CHECK_CODE(code, lino, _err);
10,186!
469

470
  *p = pMeta;
10,186✔
471
  return code;
10,186✔
472

473
_err:
×
474
  taosMemoryFree(pMeta->path);
×
475
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
×
476
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
×
477
  if (pMeta->pTaskDb) {
×
478
    tdbTbClose(pMeta->pTaskDb);
×
479
    pMeta->pTaskDb = NULL;
×
480
  }
481
  if (pMeta->pCheckpointDb) {
×
482
    tdbTbClose(pMeta->pCheckpointDb);
×
483
  }
484
  if (pMeta->db) {
×
485
    tdbClose(pMeta->db);
×
486
  }
487

488
  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
×
489
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
×
490
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
×
491
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
×
492
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);
×
493

494
  taosMemoryFree(pMeta);
×
495

496
  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
×
497
  return code;
×
498
}
499

500
// todo refactor: the lock should be restricted in one function
501
#ifdef BUILD_NO_CALL
502
// Only compiled when BUILD_NO_CALL is defined.
// Initialises pMeta->streamBackend, retrying every 100ms until it succeeds,
// then registers it in streamBackendId and loads the checkpoint info.
// When a retry succeeds the loop is left with the meta write lock still held.
void streamMetaInitBackend(SStreamMeta* pMeta) {
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);

  while (pMeta->streamBackend == NULL) {
    streamMetaWLock(pMeta);
    pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
    if (pMeta->streamBackend != NULL) {
      break;
    }

    streamMetaWUnLock(pMeta);
    stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
    taosMsleep(100);
  }

  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
  streamBackendLoadCheckpointInfo(pMeta);
}
521
#endif
522

523
// Drops every task registered in this meta and resets the counters and
// bookkeeping containers. The meta object itself stays usable.
void streamMetaClear(SStreamMeta* pMeta) {
  // remove all existed tasks in this vnode
  for (void* pEntry = taosHashIterate(pMeta->pTasksMap, NULL); pEntry != NULL;
       pEntry = taosHashIterate(pMeta->pTasksMap, pEntry)) {
    int64_t      rid = *(int64_t*)pEntry;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, rid);
    if (pTask == NULL) {
      continue;
    }

    // release the ref by timer
    const bool hasSchedTimer = (pTask->info.delaySchedParam != 0) && (pTask->info.fillHistory == 0);
    if (hasSchedTimer) {  // one more ref in timer
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    if (taosRemoveRef(streamTaskRefPool, rid) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, rid);
    }

    // give back the reference acquired at the top of this iteration
    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, rid);
    }
  }

  if (pMeta->streamBackendRid != 0) {
    if (taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
      stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
    }
  }

  taosHashClear(pMeta->pTasksMap);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  pMeta->startInfo.readyTs = 0;
}
10,220✔
572

573
/**
 * Request closing of a stream meta by removing its entry from
 * streamMetaRefPool. The pool is opened with streamMetaCloseImpl() as its
 * destructor, so the actual teardown happens there.
 *
 * @param pMeta  meta to close; NULL is a no-op
 */
void streamMetaClose(SStreamMeta* pMeta) {
  // must come before the first use of pMeta: the debug log below reads pMeta->vgId
  if (pMeta == NULL) {
    return;
  }

  stDebug("vgId:%d start to close stream meta", pMeta->vgId);

  int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid);
  if (code) {
    stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
  }
}
583

584
// Destructor registered for streamMetaRefPool (see streamMetaEnvInit()).
// `arg` is the SStreamMeta to destroy; NULL is ignored.
// Clears all tasks under the write lock, aborts the open transaction and closes
// the tdb handles, then releases every container, lock and finally the object.
void streamMetaCloseImpl(void* arg) {
  SStreamMeta* pMeta = arg;
  if (pMeta == NULL) {
    return;
  }

  int32_t code = 0;
  // cached: used for logging after pMeta has been freed
  int32_t vgId = pMeta->vgId;
  stDebug("vgId:%d start to do-close stream meta", vgId);

  streamMetaWLock(pMeta);
  streamMetaClear(pMeta);
  streamMetaWUnLock(pMeta);

  // already log the error, ignore here
  tdbAbort(pMeta->db, pMeta->txn);
  tdbTbClose(pMeta->pTaskDb);
  tdbTbClose(pMeta->pCheckpointDb);
  tdbClose(pMeta->db);

  taosArrayDestroy(pMeta->pTaskList);
  taosArrayDestroy(pMeta->chkpSaved);
  taosArrayDestroy(pMeta->chkpInUse);

  taosHashCleanup(pMeta->pTasksMap);
  taosHashCleanup(pMeta->pTaskDbUnique);
  taosHashCleanup(pMeta->updateInfo.pTasks);

  streamMetaClearStartInfo(&pMeta->startInfo);

  destroyMetaHbInfo(pMeta->pHbInfo);
  pMeta->pHbInfo = NULL;

  taosMemoryFree(pMeta->path);
  streamMutexDestroy(&pMeta->backendMutex);

  bkdMgtDestroy(pMeta->bkdChkptMgt);

  pMeta->role = NODE_ROLE_UNINIT;
  code = taosThreadRwlockDestroy(&pMeta->lock);
  if (code) {
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
  }

  taosMemoryFree(pMeta);
  stDebug("vgId:%d end to close stream meta", vgId);
}
631

632
// todo let's check the status for each task
633
/**
 * Encode a task and upsert it into the task table inside the meta's current
 * transaction. The key is {streamId, taskId}.
 *
 * Tasks whose ver is below SSTREAM_TASK_SUBTABLE_CHANGED_VER are bumped to
 * SSTREAM_TASK_VER before encoding. When the upsert fails, the task's entry in
 * streamTaskRefPool is removed.
 *
 * @param pMeta  stream meta owning the task table and transaction
 * @param pTask  task to persist
 * @return 0 on success; -1 when the encoded size cannot be computed; terrno on
 *         allocation or upsert failure; TSDB_CODE_INVALID_MSG on encode failure
 */
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t vgId = pTask->pMeta->vgId;
  void*   buf = NULL;
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return terrno;
  }

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  // same `< 0` failure test as the size computation above
  if (code < 0) {
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    // the encode buffer used to leak on this path
    taosMemoryFree(buf);
    return TSDB_CODE_INVALID_MSG;
  }

  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};

  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
  if (code != TSDB_CODE_SUCCESS) {
    code = terrno;
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
            vgId, pTask->id.refId, tstrerror(code));

    // copied before the removal; used for the log below
    int64_t refId = pTask->id.refId;
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
    if (ret != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
    }
  } else {
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
  }

  taosMemoryFree(buf);
  return code;
}
682

683
// Deletes the {streamId, taskId} record from the task table inside the meta's
// current transaction. Returns the tdbTbDelete() result.
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {
  int64_t dbKey[2] = {pTaskId->streamId, pTaskId->taskId};

  int32_t ret = tdbTbDelete(pMeta->pTaskDb, dbKey, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (ret == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
            tstrerror(terrno));
  }

  return ret;
}
695

696
// add to the ready tasks hash map, not the restored tasks hash map
697
// Registers pTask in the meta: builds it, appends its id to the task list,
// adds it to streamTaskRefPool and the task map, then persists and commits it.
// *pAdded is true only when the task was newly registered. A task that is
// already present is freed and 0 is returned. Every failure undoes the steps
// completed so far and returns the error code.
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
  *pAdded = false;

  STaskId taskKey = streamTaskGetTaskId(pTask);
  if (taosHashGet(pMeta->pTasksMap, &taskKey, sizeof(taskKey)) != NULL) {
    stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", taskKey.taskId);
    tFreeStreamTask(pTask);
    return 0;
  }

  int32_t code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver);
  if (code != 0) {
    tFreeStreamTask(pTask);
    return code;
  }

  if (taosArrayPush(pMeta->pTaskList, &pTask->id) == NULL) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", taskKey.taskId);
    tFreeStreamTask(pTask);
    return terrno;
  }

  int64_t rid = taosAddRef(streamTaskRefPool, pTask);
  pTask->id.refId = rid;

  code = taosHashPut(pMeta->pTasksMap, &taskKey, sizeof(taskKey), &pTask->id.refId, sizeof(int64_t));
  if (code != 0) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", taskKey.taskId);
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)taskKey.taskId, rid);
    }
    return code;
  }

  // persist, then commit; a failure of either one is rolled back the same way
  code = streamMetaSaveTask(pMeta, pTask);
  if (code == 0) {
    code = streamMetaCommit(pMeta);
  }

  if (code != 0) {
    (void)taosHashRemove(pMeta->pTasksMap, &taskKey, sizeof(taskKey));
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, rid) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, rid);
    }
    return code;
  }

  // fill-history tasks are not counted as stream tasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  *pAdded = true;
  return code;
}
766

767
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
12,831,873✔
768
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
12,831,873✔
769
  int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
12,831,943✔
770
  if (sizeInList != size) {
12,831,954!
771
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, sizeInList, size);
×
772
  }
773

774
  return size;
12,831,964✔
775
}
776

777
// Looks up {streamId, taskId} in the task map and acquires a reference on the
// task without taking the meta lock itself. On success *pTask is set and
// TSDB_CODE_SUCCESS returned; the caller must hand the task back through
// streamMetaReleaseTask(). A missing, already released, inconsistent or
// stopping task yields TSDB_CODE_STREAM_TASK_NOT_EXIST.
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  STaskId  key = {.streamId = streamId, .taskId = taskId};
  int64_t* pStored = (int64_t*)taosHashGet(pMeta->pTasksMap, &key, sizeof(key));
  if (pStored == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  const int64_t rid = *pStored;
  SStreamTask*  pFound = taosAcquireRef(streamTaskRefPool, rid);
  if (pFound == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, rid);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pFound->id.refId != rid) {
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, rid,
            pFound->id.refId);
  } else if (streamTaskShouldStop(pFound)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", pFound->id.idStr);
  } else {
    stDebug("s-task:%s acquire task, refId:%" PRId64, pFound->id.idStr, pFound->id.refId);
    *pTask = pFound;
    return TSDB_CODE_SUCCESS;
  }

  // rejected: give back the reference taken above
  if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
    stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, rid);
  }
  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
815

816
// Same lookup-and-acquire as streamMetaAcquireTaskNoLock(), keyed by a ready
// made STaskId and without the "task should stop" rejection. No meta lock is
// taken here.
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  int64_t* pStored = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
  if (pStored == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  const int64_t rid = *pStored;
  SStreamTask*  pFound = taosAcquireRef(streamTaskRefPool, rid);
  if (pFound == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            rid);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pFound->id.refId == rid) {
    stDebug("s-task:%s acquire task, refId:%" PRId64, pFound->id.idStr, pFound->id.refId);
    *pTask = pFound;
    return TSDB_CODE_SUCCESS;
  }

  // the pooled object does not carry the refId stored in the map: reject it
  stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
          rid, pFound->id.refId);
  if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
    stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, rid);
  }

  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
846

847
// Locked variant: runs streamMetaAcquireTaskNoLock() under the meta read lock
// and returns its result.
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t ret;

  streamMetaRLock(pMeta);
  ret = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return ret;
}
853

854
/**
 * Release one reference on a task that was obtained through one of the acquire functions.
 *
 * @param pMeta  unused
 * @param pTask  task to release; NULL is accepted and ignored
 */
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    // copy both ids up front so that nothing reads pTask after the release call
    const int64_t rid = pTask->id.refId;
    const int32_t tid = pTask->id.taskId;

    stDebug("s-task:0x%x release task, refId:%" PRId64, tid, rid);

    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, tid, rid);
    }
  }
}
867

868
/**
 * Remove the first entry of pTaskList whose streamId and taskId both match `id`.
 *
 * @param pTaskList  array of SStreamTaskId
 * @param num        number of leading entries to examine
 * @param id         id to look for
 *
 * Logs an error when no matching entry is found among the first `num` entries.
 */
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  int32_t idx = 0;

  while (idx < num) {
    SStreamTaskId* pCur = taosArrayGet(pTaskList, idx);
    if (pCur->streamId == id->streamId && pCur->taskId == id->taskId) {
      taosArrayRemove(pTaskList, idx);
      return;  // only the first match is removed
    }
    ++idx;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
11,426✔
883

884
/**
 * Callback passed to streamTaskHandleEventAsync() for the dropping event.
 *
 * For a source task, sends the checkpoint-source rsp. For any non-sink task that owns an executor,
 * kills the query task so it ends as soon as possible.
 *
 * @param pTask  the task being dropped
 * @param param  unused
 * @return the result of the last call made (qKillTask if it ran, otherwise the rsp result, otherwise 0)
 */
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t ret = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    ret = streamTaskSendCheckpointSourceRsp(pTask);
    if (ret != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
              tstrerror(ret));
    }
  }

  // nothing to kill for sink tasks or tasks without an executor
  if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->exec.pExecutor == NULL) {
    return ret;
  }

  // kill the query procedure inside the stream task, so that it stops ASAP
  ret = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
  if (ret != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(ret));
  }

  return ret;
}
903

904
/**
 * Drop a task from the stream meta: mark it as dropping, remove it from the task map, the task list
 * and the meta store, stop its delay-sched timer and remove its entry from streamTaskRefPool.
 *
 * The task is looked up with streamMetaAcquireTaskUnsafe(), i.e. no meta lock is taken here.
 * NOTE(review): callers presumably hold the meta write lock -- confirm.
 *
 * Failures of the individual steps are logged and do not abort the procedure.
 *
 * @param pMeta     stream meta that owns the task
 * @param streamId  stream id of the task
 * @param taskId    task id of the task
 * @return always 0, also when the task is not found
 */
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  SStreamTask* pTask = NULL;
  int32_t      vgId = pMeta->vgId;
  int32_t      code = 0;
  STaskId      id = {.streamId = streamId, .taskId = taskId};

  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    return 0;
  }

  // desc the paused task counter
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // not a fill-history task: it was counted as a stream task, so decrease that counter
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  if (code) {
    stError("vgId:%d s-task:0x%x failed to remove from task map, code:%s", vgId, taskId, tstrerror(code));
  }

  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

  code = streamMetaRemoveTask(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  // the map and the list must describe the same set of tasks
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
  if (sizeInList != size) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // remove the pool entry first, then give back the reference acquired at the top of this function
  int64_t refId = pTask->id.refId;
  int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
  if (ret != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
963

964
/**
 * Begin a tdb transaction (write + read-uncommitted) on pMeta->db and store it in pMeta->txn.
 * The call is made under the meta write lock; a failure is recorded through streamSetFatalError().
 *
 * @return the code returned by tdbBegin()
 */
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t ret = 0;

  streamMetaWLock(pMeta);

  ret = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                 TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (ret != 0) {
    streamSetFatalError(pMeta, ret, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return ret;
}
974

975
/**
 * Commit the current meta transaction, run the post-commit step and open a new transaction.
 *
 * Every failing step is recorded with streamSetFatalError() and logged as fatal.
 *
 * @return the post-commit error if that step fails (no new transaction is started then),
 *         otherwise the code returned by the final tdbBegin()
 */
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t ret = tdbCommit(pMeta->db, pMeta->txn);
  if (ret != 0) {
    // a commit failure is reported only; the post-commit step below still runs
    streamSetFatalError(pMeta, ret, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(ret),
            pMeta->fatalInfo.line);
  }

  ret = tdbPostCommit(pMeta->db, pMeta->txn);
  if (ret != 0) {
    streamSetFatalError(pMeta, ret, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(ret),
            pMeta->fatalInfo.line);
    return ret;
  }

  ret = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                 TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (ret == 0) {
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
    return ret;
  }

  streamSetFatalError(pMeta, ret, __func__, __LINE__);
  stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(ret), pMeta->fatalInfo.line);
  return ret;
}
1002

1003
/**
 * Scan every record of pMeta->pTaskDb and return the largest checkpointId found.
 *
 * Records whose checkpoint info can not be decoded are skipped.
 *
 * @return the maximum checkpointId, or 0 when the cursor can not be opened/positioned or
 *         no record yields a larger value
 */
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t checkpointId = 0;
  int32_t code = 0;

  TBC* pCur = NULL;
  if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return checkpointId;
  }

  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCur);
    return checkpointId;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    int32_t ret = tDecodeStreamTaskChkInfo(&decoder, &info);

    // clear the decoder on both the success and the failure path before it is re-initialized
    tDecoderClear(&decoder);
    if (ret < 0) {
      continue;
    }

    checkpointId = TMAX(checkpointId, info.checkpointId);
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, checkpointId);

  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);
  return checkpointId;
}
1048

1049
// not allowed to return error code
1050
/**
 * Load every stream task stored in pMeta->pTaskDb into memory.
 *
 * For each record the task is decoded and then:
 *   - a task whose status is TASK_STATUS__DROPPING is freed and its id is collected in a recycle
 *     list; those ids are removed from the meta store once the scan is finished;
 *   - a task that is already present in pMeta->pTasksMap is freed and skipped;
 *   - otherwise the task is built through pMeta->buildTaskFn, appended to pMeta->pTaskList, added to
 *     streamTaskRefPool and its refId is stored in pMeta->pTasksMap; the stream-task and paused-task
 *     counters are updated accordingly.
 *
 * A task that can not be appended to the list or added to the ref pool is freed and skipped, so
 * that the list and the map keep describing the same set of tasks.
 *
 * The scan stops at the first record that can not be decoded or when memory for a task can not be
 * allocated. The function ends with streamMetaCommit(). No error is returned to the caller; all
 * failures are logged.
 */
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
  TBC*     pCur = NULL;
  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;
  int32_t  vgId = 0;
  int32_t  code = 0;
  SArray*  pRecycleList = NULL;

  if (pMeta == NULL) {
    return;
  }

  vgId = pMeta->vgId;
  pRecycleList = taosArrayInit(4, sizeof(STaskId));
  if (pRecycleList == NULL) {
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
    return;
  }

  stInfo("vgId:%d load stream tasks from meta files", vgId);

  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
    taosArrayDestroy(pRecycleList);
    return;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
    taosArrayDestroy(pRecycleList);
    tdbTbcClose(pCur);
    return;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
      break;
    }

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
      tDecoderClear(&decoder);
      tFreeStreamTask(pTask);
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
          "stream manually",
          vgId, tsDataDir);
      break;
    }
    tDecoderClear(&decoder);

    // a task persisted in dropping status is not loaded, its record is recycled after the scan
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
      int32_t taskId = pTask->id.taskId;
      STaskId id = streamTaskGetTaskId(pTask);

      tFreeStreamTask(pTask);
      void* px = taosArrayPush(pRecycleList, &id);
      if (px == NULL) {
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
      }

      int32_t total = taosArrayGetSize(pRecycleList);
      stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
      continue;
    }

    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);

    // do duplicate task check.
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
    if (p != NULL) {
      // todo this should replace the existed object put by replay creating stream task msg from mnode
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
      tFreeStreamTask(pTask);
      continue;
    }

    code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
    if (code < 0) {
      stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
      tFreeStreamTask(pTask);
      continue;
    }

    void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
    if (px == NULL) {
      stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
      // do not register a task in the map that is missing from the list
      tFreeStreamTask(pTask);
      continue;
    }

    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);
    if (pTask->id.refId < 0) {
      stError("s-task:0x%x failed to add task ref, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
      (void)taosArrayPop(pMeta->pTaskList);  // undo the push above
      tFreeStreamTask(pTask);
      continue;
    }

    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
      int64_t refId = pTask->id.refId;
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
              pTask->id.taskId, tstrerror(terrno), refId);

      (void)taosArrayPop(pMeta->pTaskList);  // undo the push above
      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
      if (ret != 0) {
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
      }
      continue;
    }

    stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
    if (pTask->info.fillHistory == 0) {
      (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
    }

    if (streamTaskShouldPause(pTask)) {
      (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
    }
  }

  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);

  // remove the records of the tasks that were found in dropping status
  int32_t numOfRecycled = (int32_t)taosArrayGetSize(pRecycleList);
  for (int32_t i = 0; i < numOfRecycled; ++i) {
    STaskId* pId = taosArrayGet(pRecycleList, i);
    code = streamMetaRemoveTask(pMeta, pId);
    if (code) {
      stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
    }
  }

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);

  taosArrayDestroy(pRecycleList);
  code = streamMetaCommit(pMeta);
  if (code) {
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
  }
}
1203

1204
/**
 * Notify all tasks of this meta that the vnode is closing.
 *
 * Waits for the hb timer to quit, sets pMeta->closeFlag, then, under the meta read lock, takes the
 * task list produced by streamMetaSendMsgBeforeCloseTasks(), stops each task that can still be
 * acquired and removes its entry from streamTaskRefPool.
 */
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  streamMetaWaitForHbTmrQuit(pMeta);
  pMeta->closeFlag = true;

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // report the failure instead of dropping it silently; the rest of the procedure is unchanged
    stError("vgId:%d failed to prepare the task list before closing tasks, code:%s", vgId, tstrerror(code));
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    // keep the refId, it is needed after the task has been released
    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);
  streamMetaRUnLock(pMeta);
}
10,185✔
1256

1257
/**
 * Start the meta heartbeat to the mnode.
 *
 * Allocates a heap copy of pMeta->rid, registers it with metaRefMgtAdd() and hands it to
 * streamMetaHbToMnode(). Nothing is started when the allocation or the registration fails.
 */
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
  if (pRefId == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRefId = pMeta->rid;

  // NOTE(review): pRefId is not freed here when the registration fails -- confirm who owns it in that case
  if (metaRefMgtAdd(pMeta->vgId, pRefId) != 0) {
    return;
  }

  streamMetaHbToMnode(pRefId, NULL);
}
1272

1273
/**
 * Produce a copy of the task list for the closing procedure and, if pMeta->sendMsgBeforeClosing is
 * set, mark the checkpoint of every task as failed and send a hb msg to the mnode.
 *
 * The tasks are acquired with streamMetaAcquireTaskNoLock(), i.e. no meta lock is taken here.
 *
 * @param pMeta  stream meta
 * @param pList  output; receives the duplicated task list, to be destroyed by the caller
 * @return terrno when the task list can not be duplicated, otherwise TSDB_CODE_SUCCESS.
 *         A failure to send the hb msg is logged and not returned.
 */
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);

  int32_t code = 0;
  SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
  if (pTaskList == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pTaskList;

  bool sendMsg = pMeta->sendMsgBeforeClosing;
  if (!sendMsg) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {  // this error is ignored
      continue;
    }

    streamTaskSetCheckpointFailed(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  code = streamMetaSendHbHelper(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    stError("vgId:%d failed to send hb msg to mnode before closing tasks, code:%s", pMeta->vgId, tstrerror(code));
  }

  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // the hb result does not affect the closing procedure, always report success
}
1312

1313
/**
 * Update the stage and the role (leader/follower) of the stream meta.
 *
 * The update is done under the meta write lock. When the node becomes leader, the meta heartbeat
 * is started after the lock has been released.
 *
 * @param pMeta     stream meta
 * @param stage     new stage value
 * @param isLeader  true if this node is the leader now
 */
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
  streamMetaWLock(pMeta);

  int64_t prevStage = pMeta->stage;
  pMeta->stage = stage;

  // if the node was leader before this update, mark it so that a msg is sent before all tasks are
  // closed; this also covers the node that is changed from leader to follower.
  if (pMeta->role == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER;
  streamMetaWUnLock(pMeta);

  // the first value printed is the new stage, the second one is the previous stage
  if (isLeader) {
    stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
           pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
    streamMetaStartHb(pMeta);
  } else {
    stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
           stage, prevStage, isLeader, pMeta->sendMsgBeforeClosing);
  }
}
12,301✔
1338

1339
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
1,752✔
1340
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
1,752✔
1341
  for (int32_t i = 0; i < num; ++i) {
4,800✔
1342
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
3,098✔
1343
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
3,098✔
1344
    SStreamTask*   pTask = NULL;
3,098✔
1345
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
3,098✔
1346

1347
    if (code == 0) {
3,098!
1348
      if (pTask->status.downstreamReady == 0) {
3,098✔
1349
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
50✔
1350
        return false;
50✔
1351
      }
1352
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
3,048✔
1353
    }
1354
  }
1355

1356
  return true;
1,702✔
1357
}
1358

1359
/**
 * Call streamTaskResetStatus() on every task in pMeta->pTaskList that can be acquired.
 *
 * Tasks are acquired with streamMetaAcquireTaskUnsafe(), i.e. no meta lock is taken here.
 * A task that can not be acquired is skipped.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);

  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask) != 0) {
      continue;
    }

    streamTaskResetStatus(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  return TSDB_CODE_SUCCESS;
}
1380

1381
/**
 * Record in pMeta->updateInfo.pTasks that the nodeEp update of transaction `transId` has been
 * handled for pTask and, if given, for pHTask.
 *
 * @param pMeta    stream meta
 * @param pTask    the task
 * @param pHTask   second task to record under the same transId, may be NULL
 * @param transId  id of the update transaction
 * @param startTs  start timestamp in ms, only used to log the elapsed time
 */
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
                                     int64_t startTs) {
  const char* idStr = pTask->id.idStr;
  int32_t     vgId = pMeta->vgId;

  // keep the already updated info
  STaskUpdateEntry taskEntry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &taskEntry, sizeof(taskEntry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;

  if (pHTask == NULL) {
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
            idStr, vgId, transId, elapsed);
    return;
  }

  STaskUpdateEntry hTaskEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &hTaskEntry, sizeof(hTaskEntry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", idStr);
    return;
  }

  stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
          " ms",
          idStr, vgId, transId, elapsed);
}
83✔
1410

1411
/**
 * Mark the active nodeEp update transaction as completed.
 *
 * Clears the updated-task table, moves activeTransId into completeTransId, resets activeTransId
 * to -1 and stamps completeTs with the current time in ms.
 */
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpdate = &pMeta->updateInfo;
  int32_t          prevComplete = pUpdate->completeTransId;  // only kept for the log line below

  taosHashClear(pUpdate->pTasks);

  pUpdate->completeTransId = pUpdate->activeTransId;
  pUpdate->activeTransId = -1;
  pUpdate->completeTs = taosGetTimestampMs();

  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64 ", complete transId:%d->%d, reset active transId",
          pMeta->vgId, pUpdate->completeTs, prevComplete, pUpdate->completeTransId);
}
43✔
1424

1425
/**
 * Decide whether a nodeEp update msg with the given transId should be handled, and set it as the
 * active update transaction if needed.
 *
 * Rules, with complete = updateInfo.completeTransId and active = updateInfo.activeTransId:
 *   - transId <  complete : disordered msg, discard
 *   - transId == complete : already handled, ignore
 *   - transId >  complete :
 *       - active == -1      : clear the updated-task table and make transId the active one
 *       - active == transId : nothing to do
 *       - transId <  active : out of order, discard
 *       - transId >  active : clear the updated-task table and replace the active transId
 *
 * @return true if the update msg should be handled, false if it must be discarded
 */
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;

  if (transId < pInfo->completeTransId) {
    // the value labelled "prev completed" is the completed transId
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }

  if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  }

  // from here on: transId > pInfo->completeTransId
  if (pInfo->activeTransId == -1) {
    taosHashClear(pInfo->pTasks);
    pInfo->activeTransId = transId;

    stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId,
           pInfo->completeTransId);
    return true;
  }

  if (pInfo->activeTransId == transId) {
    // do nothing
    return true;
  }

  if (transId < pInfo->activeTransId) {
    stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard",
            pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
    return false;
  }

  // transId > pInfo->activeTransId: a newer update transaction replaces the active one
  taosHashClear(pInfo->pTasks);
  int32_t prev = pInfo->activeTransId;
  pInfo->activeTransId = transId;

  stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId,
         prev, transId, pInfo->completeTransId);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc