• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3798

31 Mar 2025 10:39AM UTC coverage: 9.424% (-20.9%) from 30.372%
#3798

push

travis-ci

happyguoxy
test:add test cases

21549 of 307601 branches covered (7.01%)

Branch coverage included in aggregate %.

36084 of 303967 relevant lines covered (11.87%)

58620.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/stream/src/streamMeta.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamBackendRocksdb.h"
17
#include "streamInt.h"
18
#include "tmisce.h"
19
#include "tref.h"
20
#include "tsched.h"
21
#include "tstream.h"
22
#include "ttimer.h"
23
#include "wal.h"
24

25
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
26

27
int32_t streamBackendId = 0;
28
int32_t streamBackendCfWrapperId = 0;
29
int32_t taskDbWrapperId = 0;
30
int32_t streamTaskRefPool = 0;
31

32
static int32_t streamMetaBegin(SStreamMeta* pMeta);
33
static void    streamMetaCloseImpl(void* arg);
34

35
typedef struct {
36
  TdThreadMutex mutex;
37
  SHashObj*     pTable;
38
} SMetaRefMgt;
39

40
SMetaRefMgt gMetaRefMgt;
41

42
int32_t metaRefMgtInit();
43
void    metaRefMgtCleanup();
44

45
static void streamMetaEnvInit() {
×
46
  streamBackendId = taosOpenRef(64, streamBackendCleanup);
×
47
  streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
×
48
  taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
×
49

50
  streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
×
51
  streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
×
52

53
  int32_t code = metaRefMgtInit();
×
54
  if (code) {
×
55
    stError("failed to init stream meta mgmt env, start failed");
×
56
    return;
×
57
  }
58

59
  code = streamTimerInit();
×
60
  if (code) {
×
61
    stError("failed to init stream meta env, start failed");
×
62
  }
63
}
64

65
void streamMetaInit() {
×
66
  int32_t code = taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);
×
67
  if (code) {
×
68
    stError("failed to init stream Meta model, code:%s", tstrerror(code));
×
69
  }
70
}
×
71

72
void streamMetaCleanup() {
×
73
  taosCloseRef(streamBackendId);
×
74
  taosCloseRef(streamBackendCfWrapperId);
×
75
  taosCloseRef(streamMetaRefPool);
×
76
  taosCloseRef(streamTaskRefPool);
×
77

78
  metaRefMgtCleanup();
×
79
  streamTimerCleanUp();
×
80
}
×
81

82
int32_t metaRefMgtInit() {
×
83
  int32_t code = taosThreadMutexInit(&(gMetaRefMgt.mutex), NULL);
×
84
  if (code) {
×
85
    return code;
×
86
  }
87

88
  if (code == 0) {
×
89
    gMetaRefMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
×
90
  }
91

92
  if (gMetaRefMgt.pTable == NULL) {
×
93
    return terrno;
×
94
  } else {
95
    return code;
×
96
  }
97
}
98

99
void metaRefMgtCleanup() {
×
100
  void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
×
101
  while (pIter) {
×
102
    int64_t* p = *(int64_t**)pIter;
×
103
    taosMemoryFree(p);
×
104
    pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
×
105
  }
106

107
  taosHashCleanup(gMetaRefMgt.pTable);
×
108
  streamMutexDestroy(&gMetaRefMgt.mutex);
×
109
}
×
110

111
// Register the heap-allocated ref-id holder `rid` in the global ref-id table.
// The table is keyed by the pointer value itself and also stores that pointer
// as the value. `vgId` is used for logging only.
//
// Returns 0 on success (also when the pointer was already registered, which is
// reported with stFatal), or the taosHashPut error code. The manager mutex is
// released on every path.
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
  int32_t code = 0;

  streamMutexLock(&gMetaRefMgt.mutex);

  void* p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
  if (p != NULL) {
    stFatal("try to add refId:%" PRId64 " vgId:%d, %p that already added into mgt", *rid, (int32_t)vgId, rid);
  } else {
    code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
    if (code) {
      // fall through to the unlock below instead of returning with the mutex held
      stError("vgId:%d failed to put into refId mgt, refId:%" PRId64 " %p, code:%s", (int32_t)vgId, *rid, rid,
              tstrerror(code));
    }
  }

  streamMutexUnlock(&gMetaRefMgt.mutex);
  return code;
}
134

135
// Drop `pRefId` from the global ref-id table (keyed by the pointer value) and
// free it, all under the manager mutex. The result of the removal is ignored.
void metaRefMgtRemove(int64_t* pRefId) {
  streamMutexLock(&gMetaRefMgt.mutex);

  (void)taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
  taosMemoryFree(pRefId);

  streamMutexUnlock(&gMetaRefMgt.mutex);
}
×
142

143
// Open the tdb environment at pMeta->path together with its two tables,
// "task.db" and "checkpoint.db". Returns 0 on success and -1 as soon as any of
// the three open calls reports a negative result.
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
  int32_t ret = tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL);
  if (ret < 0) {
    stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
    return -1;
  }

  ret = tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  ret = tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0);
  if (ret < 0) {
    stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId);
    return -1;
  }

  return 0;
}
161

162
//
163
// impl later
164
//
165
// Result of the backend compatibility check on the stored task meta.
enum STREAM_STATE_VER {
  STREAM_STATA_NO_COMPATIBLE = 0,  // stored data cannot be used; opening the meta fails
  STREAM_STATA_COMPATIBLE = 1,     // nothing to do
  STREAM_STATA_NEED_CONVERT = 2,   // backend data has to be converted first
};
170

171
// Inspect the task table and classify the stored data by the msgVer of the
// first record that decodes successfully. Returns one of the STREAM_STATA_*
// values; STREAM_STATA_COMPATIBLE is the answer whenever the cursor cannot be
// opened or positioned, or no record yields a different verdict.
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
  int8_t  verdict = STREAM_STATA_COMPATIBLE;
  TBC*    pCursor = NULL;
  void*   pKey = NULL;
  void*   pVal = NULL;
  int32_t kLen = 0;
  int32_t vLen = 0;

  // no task info, no stream
  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {
    return verdict;
  }

  int32_t code = tdbTbcMoveToFirst(pCursor);
  if (code) {
    stError("vgId:%d failed to open stream meta file cursor, not perform compatible check, code:%s", pMeta->vgId,
            tstrerror(code));
    tdbTbcClose(pCursor);
    return verdict;
  }

  while (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SDecoder        decoder;
    SCheckpointInfo info;
    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);

    // records that fail to decode are skipped, the next one is tried instead
    if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
      tDecoderClear(&decoder);
      continue;
    }

    if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) {
      verdict = STREAM_STATA_NO_COMPATIBLE;
    } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) {
      verdict = STREAM_STATA_NEED_CONVERT;
    }

    // the first decodable record decides
    tDecoderClear(&decoder);
    break;
  }

  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);
  return verdict;
}
220

221
// Convert the state backend data under pMeta->path for the latest checkpoint.
//
// If no backend data exists, terrno (reset to 0 before the probe) is returned.
// Otherwise the backend is opened, streamStateCvtDataFormat is run on every
// entry of its cfInst table, and the backend is closed again. After a fully
// successful conversion the "<path>/state" directory is removed; that removal
// is best effort and does not change the return value.
//
// Returns 0 on success or the first error code encountered.
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
  int32_t          code = 0;
  SBackendWrapper* pBackend = NULL;
  int64_t          chkpId = streamMetaGetLatestCheckpointId(pMeta);

  terrno = 0;
  bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
  if (!exist) {
    return terrno;
  }

  code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend);
  if (code) {
    return code;
  }

  void* pIter = taosHashIterate(pBackend->cfInst, NULL);
  while (pIter) {
    void* key = taosHashGetKey(pIter, NULL);
    code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
    if (code != 0) {
      stError("vgId:%d failed to cvt data", pMeta->vgId);
      goto _EXIT;
    }

    pIter = taosHashIterate(pBackend->cfInst, pIter);
  }

_EXIT:
  streamBackendCleanup((void*)pBackend);

  if (code == 0) {
    int32_t len = strlen(pMeta->path) + 32;
    char*   state = taosMemoryCalloc(1, len);
    if (state != NULL) {
      (void)snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
      taosRemoveDir(state);
      taosMemoryFree(state);
    } else {
      // code is 0 in this branch, so report the allocation failure kept in terrno
      stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(terrno));
    }
  }

  return code;
}
267

268
// Run the backend compatibility check and act on its verdict: convert the data
// when required, fail with -1 on incompatible data, and return 0 otherwise.
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
  int8_t verdict = streamMetaCheckBackendCompatible(pMeta);

  switch (verdict) {
    case STREAM_STATA_NEED_CONVERT:
      stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
      return streamMetaCvtDbFormat(pMeta);

    case STREAM_STATA_NO_COMPATIBLE:
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
          "manually",
          pMeta->vgId, tsDataDir);
      return -1;

    case STREAM_STATA_COMPATIBLE:
    default:
      return 0;
  }
}
286

287
// Returns 1 when the task's fillHistory field equals 2, otherwise 0.
int8_t streamTaskShouldRecalated(SStreamTask* pTask) {
  if (pTask->info.fillHistory == 2) {
    return 1;
  }
  return 0;
}
×
288

289
// Attach a task-db backend to pTask, stored in pTask->pRecalBackend when
// `recalated` is non-zero and in pTask->pBackend otherwise.
//
// If pMeta->pTaskDbUnique already holds a backend for `key`, an extra reference
// is taken on it and it is shared. Otherwise taskDbOpen is retried once per
// second until it succeeds, the new backend is registered in the ref pool and
// in pTaskDbUnique, and the task's checkpoint versions are aligned with the
// processed version reported by the backend (when one is reported).
//
// Returns 0, or TSDB_CODE_FAILED if the existing backend could not be referenced.
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key, uint8_t recalated) {
  int32_t code = 0;
  int64_t chkpId = pTask->chkInfo.checkpointId;

  streamMutexLock(&pMeta->backendMutex);

  // streamId--taskId
  void** ppExisting = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
  if (ppExisting != NULL && *ppExisting != NULL) {
    void* pRef = taskDbAddRef(*ppExisting);
    if (pRef == NULL) {
      stError("s-task:0x%x failed to ref backend", pTask->id.taskId);
      streamMutexUnlock(&pMeta->backendMutex);
      return TSDB_CODE_FAILED;
    }

    STaskDbWrapper* pShared = *ppExisting;
    pShared->pMeta = pMeta;
    if (recalated) {
      pTask->pRecalBackend = pShared;
    } else {
      pTask->pBackend = pShared;
    }

    streamMutexUnlock(&pMeta->backendMutex);
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pShared);
    return 0;
  }

  STaskDbWrapper* pOpened = NULL;
  int64_t         processVer = -1;
  for (;;) {
    code = taskDbOpen(pMeta->path, key, chkpId, &processVer, &pOpened);
    if (code == 0) {
      break;
    }

    // give up the mutex while waiting before the next attempt
    streamMutexUnlock(&pMeta->backendMutex);
    taosMsleep(1000);

    stDebug("backend held by other task, restart later, path:%s, key:%s", pMeta->path, key);
    streamMutexLock(&pMeta->backendMutex);
  }

  int64_t tref = taosAddRef(taskDbWrapperId, pOpened);
  if (recalated) {
    pTask->pRecalBackend = pOpened;
  } else {
    pTask->pBackend = pOpened;
  }
  pOpened->refId = tref;
  pOpened->pTask = pTask;
  pOpened->pMeta = pMeta;

  if (processVer != -1) {
    if (pTask->chkInfo.processedVer == processVer) {
      stInfo("s-task:%s vgId:%d processedVer:%" PRId64
             " in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
    } else {
      stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
             pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
      pTask->chkInfo.processedVer = processVer;
      pTask->chkInfo.checkpointVer = processVer;
      pTask->chkInfo.nextProcessVer = processVer + 1;
    }
  }

  // a failure to record the backend in the map is logged only
  code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pOpened, sizeof(void*));
  if (code) {
    stError("s-task:0x%x failed to put taskDb backend, code:out of memory", pTask->id.taskId);
  }
  streamMutexUnlock(&pMeta->backendMutex);

  if (recalated) {
    stDebug("s-task:0x%x set recalated backend %p", pTask->id.taskId, pOpened);
  } else {
    stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pOpened);
  }
  return 0;
}
371

372
void streamMetaRemoveDB(void* arg, char* key) {
×
373
  if (arg == NULL || key == NULL) return;
×
374

375
  SStreamMeta* pMeta = arg;
×
376
  streamMutexLock(&pMeta->backendMutex);
×
377
  int32_t code = taosHashRemove(pMeta->pTaskDbUnique, key, strlen(key));
×
378
  if (code) {
×
379
    stError("vgId:%d failed to remove key:%s in taskDbUnique map", pMeta->vgId, key);
×
380
  }
381

382
  streamMutexUnlock(&pMeta->backendMutex);
×
383
}
384

385
// Allocate the two containers of a task update-info object: the pTasks hash
// table and the pTaskList array of int32_t. Returns terrno if either allocation
// fails, TSDB_CODE_SUCCESS otherwise.
int32_t streamMetaUpdateInfoInit(STaskUpdateInfo* pInfo) {
  pInfo->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
  if (pInfo->pTasks == NULL) {
    return terrno;
  }

  pInfo->pTaskList = taosArrayInit(4, sizeof(int32_t));
  if (pInfo->pTaskList == NULL) {
    return terrno;
  }

  return TSDB_CODE_SUCCESS;
}
400

401
// Release both containers of a task update-info object and reset the pointers.
void streamMetaUpdateInfoCleanup(STaskUpdateInfo* pInfo) {
  taosHashCleanup(pInfo->pTasks);
  pInfo->pTasks = NULL;

  taosArrayDestroy(pInfo->pTaskList);
  pInfo->pTaskList = NULL;
}
×
407

408

409

410
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
×
411
                       int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
412
  QRY_PARAM_CHECK(p);
×
413
  int32_t code = 0;
×
414
  int32_t lino = 0;
×
415

416
  SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
×
417
  if (pMeta == NULL) {
×
418
    stError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta));
×
419
    return terrno;
×
420
  }
421

422
  int32_t len = strlen(path) + 64;
×
423
  char*   tpath = taosMemoryCalloc(1, len);
×
424
  TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);
×
425

426
  (void)snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
×
427
  pMeta->path = tpath;
×
428

429
  code = streamMetaOpenTdb(pMeta);
×
430
  TSDB_CHECK_CODE(code, lino, _err);
×
431

432
  if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) {
×
433
    stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId,
×
434
            tstrerror(terrno));
435
    TSDB_CHECK_CODE(code, lino, _err);
×
436
  }
437

438
  // set the attribute when running on Linux OS
439
  TdThreadRwlockAttr attr;
440
  code = taosThreadRwlockAttrInit(&attr);
×
441
  TSDB_CHECK_CODE(code, lino, _err);
×
442

443
#ifdef LINUX
444
  code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
×
445
  TSDB_CHECK_CODE(code, lino, _err);
×
446
#endif
447

448
  code = taosThreadRwlockInit(&pMeta->lock, &attr);
×
449
  TSDB_CHECK_CODE(code, lino, _err);
×
450

451
  code = taosThreadRwlockAttrDestroy(&attr);
×
452
  TSDB_CHECK_CODE(code, lino, _err);
×
453

454
  if ((code = streamMetaBegin(pMeta) < 0)) {
×
455
    stError("vgId:%d begin trans for stream meta failed", pMeta->vgId);
×
456
    goto _err;
×
457
  }
458

459
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
×
460
  pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
×
461
  TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);
×
462

463
  code = streamMetaUpdateInfoInit(&pMeta->updateInfo);
×
464
  TSDB_CHECK_CODE(code, lino, _err);
×
465

466
  code = streamMetaInitStartInfo(&pMeta->startInfo);
×
467
  TSDB_CHECK_CODE(code, lino, _err);
×
468

469
  // task list
470
  pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
×
471
  TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);
×
472

473
  pMeta->scanInfo.scanSentinel = 0;
×
474
  pMeta->scanInfo.lastScanTs = 0;
×
475
  pMeta->scanInfo.tickCounter = 0;
×
476

477
  pMeta->vgId = vgId;
×
478
  pMeta->ahandle = ahandle;
×
479
  pMeta->buildTaskFn = buildTaskFn;
×
480
  pMeta->expandTaskFn = expandTaskFn;
×
481
  pMeta->stage = stage;
×
482
  pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
×
483
  pMeta->updateInfo.activeTransId = -1;
×
484
  pMeta->updateInfo.completeTransId = -1;
×
485

486
  pMeta->startInfo.completeFn = fn;
×
487
  pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
488
  TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno);
×
489

490
  pMeta->numOfPausedTasks = 0;
×
491
  pMeta->numOfStreamTasks = 0;
×
492
  pMeta->closeFlag = false;
×
493

494
  stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);
×
495

496
  code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
×
497
  TSDB_CHECK_CODE(code, lino, _err);
×
498

499
  code = taosThreadMutexInit(&pMeta->backendMutex, NULL);
×
500
  TSDB_CHECK_CODE(code, lino, _err);
×
501

502
  // add refId at the end of initialization function
503
  pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);
×
504

505
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
×
506
  TSDB_CHECK_NULL(pRid, code, lino, _err, terrno);
×
507

508
  memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
×
509

510
  code = metaRefMgtAdd(pMeta->vgId, pRid);
×
511
  TSDB_CHECK_CODE(code, lino, _err);
×
512

513
  code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
×
514

515
  TSDB_CHECK_CODE(code, lino, _err);
×
516

517
  *p = pMeta;
×
518
  return code;
×
519

520
_err:
×
521
  taosMemoryFree(pMeta->path);
×
522
  if (pMeta->pTasksMap) taosHashCleanup(pMeta->pTasksMap);
×
523
  if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
×
524
  if (pMeta->pTaskDb) {
×
525
    tdbTbClose(pMeta->pTaskDb);
×
526
    pMeta->pTaskDb = NULL;
×
527
  }
528
  if (pMeta->pCheckpointDb) {
×
529
    tdbTbClose(pMeta->pCheckpointDb);
×
530
  }
531
  if (pMeta->db) {
×
532
    tdbClose(pMeta->db);
×
533
  }
534

535
  if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
×
536
  if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
×
537
  if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
×
538
  if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
×
539
  if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt);
×
540

541
  if (pMeta->startInfo.pStagesList) taosArrayDestroy(pMeta->startInfo.pStagesList);
×
542
  taosMemoryFree(pMeta);
×
543

544
  stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code));
×
545
  return code;
×
546
}
547

548
// todo refactor: the lock should be restricted in one function
549
#ifdef BUILD_NO_CALL
550
void streamMetaInitBackend(SStreamMeta* pMeta) {
551
  pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
552
  if (pMeta->streamBackend == NULL) {
553
    while (1) {
554
      streamMetaWLock(pMeta);
555
      pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
556
      if (pMeta->streamBackend != NULL) {
557
        break;
558
      }
559

560
      streamMetaWUnLock(pMeta);
561
      stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId);
562
      taosMsleep(100);
563
    }
564
  }
565

566
  pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
567
  streamBackendLoadCheckpointInfo(pMeta);
568
}
569
#endif
570

571
// Drop every task registered in this meta and reset the in-memory bookkeeping
// (task map/list, checkpoint arrays, counters and start info). For each task
// the sched timer is stopped when applicable, then the task ref is removed and
// the reference acquired here is released. The stream backend ref is removed
// as well when one is set.
void streamMetaClear(SStreamMeta* pMeta) {
  // remove all existed tasks in this vnode
  int64_t startTs = taosGetTimestampMs();

  for (void* pIter = taosHashIterate(pMeta->pTasksMap, NULL); pIter != NULL;
       pIter = taosHashIterate(pMeta->pTasksMap, pIter)) {
    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    // release the ref by timer
    if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {  // one more ref in timer
      stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
      streamTmrStop(pTask->schedInfo.pDelayTimer);
      pTask->info.delaySchedParam = 0;
    }

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }

    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("vgId:%d failed to release refId:%" PRId64, pMeta->vgId, refId);
    }
  }

  int64_t tasksDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear task map, elapsed time:%.2fs", pMeta->vgId, (tasksDoneTs - startTs)/1000.0);

  if (pMeta->streamBackendRid != 0) {
    if (taosRemoveRef(streamBackendId, pMeta->streamBackendRid) != 0) {
      stError("vgId:%d remove stream backend Ref failed, rid:%" PRId64, pMeta->vgId, pMeta->streamBackendRid);
    }
  }

  int64_t backendDoneTs = taosGetTimestampMs();
  stDebug("vgId:%d clear backend completed, elapsed time:%.2fs", pMeta->vgId, (backendDoneTs - tasksDoneTs)/1000.0);

  taosHashClear(pMeta->pTasksMap);

  taosArrayClear(pMeta->pTaskList);
  taosArrayClear(pMeta->chkpSaved);
  taosArrayClear(pMeta->chkpInUse);

  pMeta->numOfStreamTasks = 0;
  pMeta->numOfPausedTasks = 0;

  // the willrestart/starting flag can NOT be cleared
  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);

  taosArrayClear(pMeta->startInfo.pStagesList);
  pMeta->startInfo.readyTs = 0;
}
×
630

631
// Request the close of a stream meta by removing its reference from
// streamMetaRefPool (streamMetaCloseImpl is the destructor registered for that
// pool). A NULL pMeta is ignored; a failed removal is logged.
void streamMetaClose(SStreamMeta* pMeta) {
  if (pMeta != NULL) {
    stDebug("vgId:%d start to close stream meta", pMeta->vgId);

    int32_t ret = taosRemoveRef(streamMetaRefPool, pMeta->rid);
    if (ret != 0) {
      stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(ret));
    }
  }
}
642

643
void streamMetaCloseImpl(void* arg) {
×
644
  SStreamMeta* pMeta = arg;
×
645
  if (pMeta == NULL) {
×
646
    return;
×
647
  }
648

649
  int32_t code = 0;
×
650
  int32_t vgId = pMeta->vgId;
×
651
  stDebug("vgId:%d start to do-close stream meta", vgId);
×
652

653
  streamMetaWLock(pMeta);
×
654
  streamMetaClear(pMeta);
×
655
  streamMetaWUnLock(pMeta);
×
656

657
  // already log the error, ignore here
658
  tdbAbort(pMeta->db, pMeta->txn);
×
659
  tdbTbClose(pMeta->pTaskDb);
×
660
  tdbTbClose(pMeta->pCheckpointDb);
×
661
  tdbClose(pMeta->db);
×
662

663
  taosArrayDestroy(pMeta->pTaskList);
×
664
  taosArrayDestroy(pMeta->chkpSaved);
×
665
  taosArrayDestroy(pMeta->chkpInUse);
×
666

667
  taosHashCleanup(pMeta->pTasksMap);
×
668
  taosHashCleanup(pMeta->pTaskDbUnique);
×
669

670
  streamMetaUpdateInfoCleanup(&pMeta->updateInfo);
×
671
  streamMetaClearStartInfo(&pMeta->startInfo);
×
672

673
  destroyMetaHbInfo(pMeta->pHbInfo);
×
674
  pMeta->pHbInfo = NULL;
×
675

676
  taosMemoryFree(pMeta->path);
×
677
  streamMutexDestroy(&pMeta->backendMutex);
×
678

679
  bkdMgtDestroy(pMeta->bkdChkptMgt);
×
680

681
  pMeta->role = NODE_ROLE_UNINIT;
×
682
  code = taosThreadRwlockDestroy(&pMeta->lock);
×
683
  if (code) {
×
684
    stError("vgId:%d destroy rwlock, code:%s", vgId, tstrerror(code));
×
685
  }
686

687
  taosMemoryFree(pMeta);
×
688
  stDebug("vgId:%d end to close stream meta", vgId);
×
689
}
690

691
// todo let's check the status for each task
692
// Encode pTask and upsert it into the task table of pMeta, keyed by
// {streamId, taskId}, inside the current meta transaction.
//
// Returns 0 on success; -1 if the encoded size cannot be computed; terrno if
// the buffer cannot be allocated; TSDB_CODE_INVALID_MSG if encoding fails; or
// terrno if the upsert fails, in which case the task ref is also removed from
// streamTaskRefPool. The encode buffer is released on every path.
int32_t streamMetaSaveTaskInMeta(SStreamMeta* pMeta, SStreamTask* pTask) {
  int32_t vgId = pTask->pMeta->vgId;
  void*   buf = NULL;
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeStreamTask, pTask, len, code);
  if (code < 0) {
    return -1;
  }

  buf = taosMemoryCalloc(1, len);
  if (buf == NULL) {
    return terrno;
  }

  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
    pTask->ver = SSTREAM_TASK_VER;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, buf, len);
  code = tEncodeStreamTask(&encoder, pTask);
  tEncoderClear(&encoder);

  if (code == -1) {
    stError("s-task:%s vgId:%d task meta encode failed, code:%s", pTask->id.idStr, vgId, tstrerror(code));
    taosMemoryFree(buf);  // do not leak the encode buffer on this early return
    return TSDB_CODE_INVALID_MSG;
  }

  int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};

  code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
  if (code != TSDB_CODE_SUCCESS) {
    code = terrno;
    stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
            vgId, pTask->id.refId, tstrerror(code));

    // copied first: the id is still needed for the log after the ref is removed
    int64_t refId = pTask->id.refId;
    int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
    if (ret != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id[1], refId);
    }
  } else {
    stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
  }

  taosMemoryFree(buf);
  return code;
}
741

742
// Delete the record keyed by {streamId, taskId} from the task table inside the
// current meta transaction. Returns the tdbTbDelete result.
int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pTaskId) {
  int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};

  int32_t ret = tdbTbDelete(pMeta->pTaskDb, key, STREAM_TASK_KEY_LEN, pMeta->txn);
  if (ret == 0) {
    stDebug("vgId:%d remove task:0x%x from metastore", pMeta->vgId, (int32_t)pTaskId->taskId);
  } else {
    stError("vgId:%d failed to remove task:0x%x from metastore, code:%s", pMeta->vgId, (int32_t)pTaskId->taskId,
            tstrerror(terrno));
  }

  return ret;
}
754

755
// add to the ready tasks hash map, not the restored tasks hash map
756
// Register pTask in the meta: build it through pMeta->buildTaskFn, append its
// id to pTaskList, add it to streamTaskRefPool, record the ref id in pTasksMap,
// then persist and commit the task meta. *pAdded is set to true only when every
// step succeeded. A task that is already present in pTasksMap is freed and 0 is
// returned with *pAdded left false. On a later failure the steps already
// performed are rolled back and the error code is returned.
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
  *pAdded = false;

  int32_t code = 0;
  STaskId id = streamTaskGetTaskId(pTask);

  if (taosHashGet(pMeta->pTasksMap, &id, sizeof(id)) != NULL) {
    stDebug("s-task:0x%" PRIx64 " already exist in meta, no need to register", id.taskId);
    tFreeStreamTask(pTask);
    return code;
  }

  code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver);
  if (code != 0) {
    tFreeStreamTask(pTask);
    return code;
  }

  if (taosArrayPush(pMeta->pTaskList, &pTask->id) == NULL) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
    tFreeStreamTask(pTask);
    return terrno;
  }

  int64_t refId = taosAddRef(streamTaskRefPool, pTask);
  pTask->id.refId = refId;

  code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
    }
    return code;
  }

  // persist, then commit; both failures are undone in exactly the same way
  code = streamMetaSaveTaskInMeta(pMeta, pTask);
  if (code == 0) {
    code = streamMetaCommit(pMeta);
  }

  if (code != 0) {
    (void)taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
    (void)taosArrayPop(pMeta->pTaskList);

    if (taosRemoveRef(streamTaskRefPool, refId)) {
      stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
    }
    return code;
  }

  // only tasks with fillHistory == 0 are counted in numOfStreamTasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  *pAdded = true;
  return code;
}
825

826
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
×
827
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
×
828
  int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
×
829
  if (sizeInList != size) {
×
830
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, sizeInList, size);
×
831
  }
832

833
  return size;
×
834
}
835

836
// Look up the task {streamId, taskId} in pTasksMap and acquire a reference to
// it from streamTaskRefPool. No meta lock is taken in this function.
//
// On success *pTask receives the task and TSDB_CODE_SUCCESS is returned; the
// reference stays acquired. TSDB_CODE_STREAM_TASK_NOT_EXIST is returned when
// the task is not in the map, the ref cannot be acquired, the task's own refId
// differs from the one in the map, or streamTaskShouldStop() is true for it; in
// the last two cases the acquired reference is released again.
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);
  STaskId  key = {.streamId = streamId, .taskId = taskId};
  int64_t* pRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &key, sizeof(key));
  if (pRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  SStreamTask* pFound = taosAcquireRef(streamTaskRefPool, *pRefId);
  if (pFound == NULL) {
    stDebug("s-task:%x failed to acquire task refId:%" PRId64 ", may have been destoried", taskId, *pRefId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pFound->id.refId != *pRefId) {
    stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, *pRefId,
            pFound->id.refId);
  } else if (streamTaskShouldStop(pFound)) {
    stDebug("s-task:%s is stopped, failed to acquire it now", pFound->id.idStr);
  } else {
    stDebug("s-task:%s acquire task, refId:%" PRId64, pFound->id.idStr, pFound->id.refId);
    *pTask = pFound;
    return TSDB_CODE_SUCCESS;
  }

  // rejected: give back the reference taken above
  if (taosReleaseRef(streamTaskRefPool, *pRefId)) {
    stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pRefId);
  }

  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
874

875
// Same lookup-and-acquire as streamMetaAcquireTaskNoLock, keyed by *pId, but
// without the streamTaskShouldStop() check. On success *pTask receives the task
// with its reference still acquired; otherwise TSDB_CODE_STREAM_TASK_NOT_EXIST
// is returned.
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  int64_t* pRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
  if (pRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  SStreamTask* pFound = taosAcquireRef(streamTaskRefPool, *pRefId);
  if (pFound == NULL) {
    stDebug("s-task:%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            *pRefId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  if (pFound->id.refId == *pRefId) {
    stDebug("s-task:%s acquire task, refId:%" PRId64, pFound->id.idStr, pFound->id.refId);
    *pTask = pFound;
    return TSDB_CODE_SUCCESS;
  }

  // the map and the task disagree on the ref id: report it and drop the reference
  stFatal("s-task:%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
          *pRefId, pFound->id.refId);
  if (taosReleaseRef(streamTaskRefPool, *pRefId)) {
    stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, *pRefId);
  }

  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
905

906
// Read-locked wrapper around streamMetaAcquireTaskNoLock; returns its result.
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
  int32_t ret;

  streamMetaRLock(pMeta);
  ret = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId, pTask);
  streamMetaRUnLock(pMeta);

  return ret;
}
912

913
/*
 * Drop one reference on pTask in streamTaskRefPool. A NULL pTask is ignored.
 * The task id and refId are copied out before the release so that the error log
 * does not read from pTask after the reference has been given back.
 */
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
  if (pTask != NULL) {
    const int32_t tid = pTask->id.taskId;
    const int64_t rid = pTask->id.refId;

    stDebug("s-task:0x%x release task, refId:%" PRId64, tid, rid);

    if (taosReleaseRef(streamTaskRefPool, rid) != 0) {
      stError("s-task:0x%x failed to release task refId:%" PRId64, tid, rid);
    }
  }
}
926

927
/*
 * Remove the first entry of pTaskList (scanning indices [0, num)) whose streamId
 * and taskId both match *id. Logs an error when no entry matches.
 */
static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) {
  int32_t idx = 0;

  while (idx < num) {
    SStreamTaskId* pCur = taosArrayGet(pTaskList, idx);
    if (pCur->streamId == id->streamId && pCur->taskId == id->taskId) {
      taosArrayRemove(pTaskList, idx);
      return;  // only the first match is removed
    }
    ++idx;
  }

  stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
×
942

943
/*
 * Callback passed to streamTaskHandleEventAsync() for the dropping event.
 * For a source-level task it sends the checkpoint-source response; for any
 * non-sink task that has an executor it kills the running query.
 *
 * @param pTask  the task being dropped
 * @param param  unused
 * @return the code of the last operation performed (0 when nothing was done or
 *         everything succeeded); a failed checkpoint-source response code is
 *         overwritten by the result of the kill step when that step runs.
 */
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
  int32_t rc = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    rc = streamTaskSendCheckpointSourceRsp(pTask);
    if (rc != 0) {
      stError("s-task:%s vgId:%d failed to send checkpoint-source rsp, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
              tstrerror(rc));
    }
  }

  // nothing to kill for sink tasks or tasks without an executor
  if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->exec.pExecutor == NULL) {
    return rc;
  }

  // stop the query inside the stream task so that it ends as soon as possible
  rc = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
  if (rc != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(rc));
  }

  return rc;
}
963

964
/*
 * Remove a task from the in-memory meta (task map + task list), from the meta
 * store (via streamMetaRemoveTaskInMeta) and from streamTaskRefPool.
 *
 * The function does not take the meta lock itself.
 * NOTE(review): callers presumably hold the meta write lock -- confirm.
 *
 * @param pMeta     stream meta that owns the task
 * @param streamId  stream id of the task
 * @param taskId    task id of the task
 * @return always 0; individual step failures are logged and not propagated.
 */
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  SStreamTask* pTask = NULL;
  int32_t      vgId = pMeta->vgId;
  STaskId      id = {.streamId = streamId, .taskId = taskId};

  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code != 0) {
    stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
    return 0;
  }

  // desc the paused task counter
  if (streamTaskShouldPause(pTask)) {
    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
    stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
  }

  // handle the dropping event
  code = streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
  if (code) {
    stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
  }

  stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);

  // only tasks that are not fill-history tasks are counted in numOfStreamTasks
  if (pTask->info.fillHistory == 0) {
    (void)atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
  }

  // drop the id from the map; a failure is reported instead of being silently overwritten
  code = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
  if (code) {
    stError("vgId:%d failed to remove task:0x%x from the task map, code:%s", vgId, taskId, tstrerror(code));
  }

  doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

  code = streamMetaRemoveTaskInMeta(pMeta, &id);
  if (code) {
    stError("vgId:%d failed to remove task:0x%" PRIx64 ", code:%s", pMeta->vgId, id.taskId, tstrerror(code));
  }

  // the map and the list are expected to describe the same set of tasks
  int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
  int32_t sizeInList = (int32_t)taosArrayGetSize(pMeta->pTaskList);
  if (sizeInList != size) {
    stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
  }

  if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
    stDebug("s-task:%s stop schedTimer", pTask->id.idStr);
    streamTmrStop(pTask->schedInfo.pDelayTimer);
    pTask->info.delaySchedParam = 0;
  }

  // remove the task from the ref pool, then drop the reference acquired at the top
  int64_t refId = pTask->id.refId;
  int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
  if (ret != 0) {
    stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
  }

  streamMetaReleaseTask(pMeta, pTask);
  return 0;
}
1023

1024
/*
 * Currently a no-op apart from taking and releasing the meta write lock: the
 * unregister-task / commit sequence that used to run between the lock and the
 * unlock is disabled. streamId and taskId are not used.
 *
 * @return always 0.
 */
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  streamMetaWLock(pMeta);
  streamMetaWUnLock(pMeta);

  return 0;
}
1049

1050
/*
 * Begin a new write transaction on pMeta->db (stored in pMeta->txn) while
 * holding the meta write lock. A failure is recorded through
 * streamSetFatalError() and its code is returned.
 */
int32_t streamMetaBegin(SStreamMeta* pMeta) {
  int32_t rc;

  streamMetaWLock(pMeta);

  rc = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (rc != 0) {
    streamSetFatalError(pMeta, rc, __func__, __LINE__);
  }

  streamMetaWUnLock(pMeta);
  return rc;
}
1060

1061
/*
 * Commit the current meta transaction, run the post-commit step and open a new
 * write transaction in pMeta->txn.
 *
 * The three steps run in the same order as before. A tdbCommit failure no longer
 * gets lost: if the later steps succeed, the commit error is still returned, and
 * the "commit completed" message is only logged when the commit really succeeded.
 *
 * @return 0 on success; otherwise the code of the post-commit or begin failure if
 *         one of them failed, else the code of the failed commit.
 */
int32_t streamMetaCommit(SStreamMeta* pMeta) {
  int32_t commitCode = tdbCommit(pMeta->db, pMeta->txn);
  if (commitCode != 0) {
    streamSetFatalError(pMeta, commitCode, __func__, __LINE__);
    stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(commitCode),
            pMeta->fatalInfo.line);
  }

  int32_t code = tdbPostCommit(pMeta->db, pMeta->txn);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
            pMeta->fatalInfo.line);
    return code;
  }

  code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
                  TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
  if (code != 0) {
    streamSetFatalError(pMeta, code, __func__, __LINE__);
    stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line);
    return code;
  }

  if (commitCode == 0) {
    stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
  }

  // report the earlier commit failure (if any) instead of masking it with the later success
  return commitCode;
}
1088

1089
/*
 * Scan every record of pMeta->pTaskDb and return the largest checkpointId found
 * in the decoded checkpoint info. Records that fail to decode are skipped.
 *
 * @return the maximum checkpointId, or 0 when the cursor cannot be opened or
 *         positioned, or when no record yields a larger value.
 */
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
  int64_t  maxChkptId = 0;
  TBC*     pCursor = NULL;
  void*    pKey = NULL;
  void*    pVal = NULL;
  int32_t  kLen = 0;
  int32_t  vLen = 0;
  SDecoder dec;

  if (tdbTbcOpen(pMeta->pTaskDb, &pCursor, NULL) < 0) {
    stError("failed to open stream meta file, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    return maxChkptId;
  }

  if (tdbTbcMoveToFirst(pCursor) != 0) {
    stError("failed to move stream meta file cursor, the latest checkpointId is 0, vgId:%d", pMeta->vgId);
    tdbTbcClose(pCursor);
    return maxChkptId;
  }

  for (;;) {
    if (tdbTbcNext(pCursor, &pKey, &kLen, &pVal, &vLen) != 0) {
      break;
    }

    // an empty value ends the scan
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SCheckpointInfo info;
    tDecoderInit(&dec, (uint8_t*)pVal, vLen);
    int32_t rc = tDecodeStreamTaskChkInfo(&dec, &info);
    tDecoderClear(&dec);

    if (rc < 0) {
      continue;  // undecodable record, ignore it
    }

    if (info.checkpointId > maxChkptId) {
      maxChkptId = info.checkpointId;
    }
  }

  stDebug("vgId:%d get max checkpointId:%" PRId64, pMeta->vgId, maxChkptId);

  // release the key/value buffers handed out by the cursor, then the cursor itself
  tdbFree(pKey);
  tdbFree(pVal);
  tdbTbcClose(pCursor);

  return maxChkptId;
}
1135

1136
// not allowed to return error code
1137
/*
 * Load every task stored in pMeta->pTaskDb into memory.
 *
 * For each record:
 *   - decode it into a freshly allocated SStreamTask (the scan stops on an
 *     allocation or decode failure);
 *   - a task whose stored status is dropping is not loaded; its id is queued so
 *     that the record is removed from the meta store after the scan;
 *   - otherwise the task is built via pMeta->buildTaskFn, appended to
 *     pMeta->pTaskList, registered in streamTaskRefPool and put into
 *     pMeta->pTasksMap, and the stream/paused task counters are updated.
 *
 * A task that cannot be appended to pMeta->pTaskList is discarded instead of
 * being registered in the map only, so the list and the map stay consistent and
 * the rollback in the hash-put failure path always pops this task's own entry.
 *
 * The function finishes by committing the meta store. It returns nothing; all
 * failures are logged.
 */
void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
  TBC*     pCur = NULL;
  void*    pKey = NULL;
  int32_t  kLen = 0;
  void*    pVal = NULL;
  int32_t  vLen = 0;
  SDecoder decoder;
  int32_t  vgId = 0;
  int32_t  code = 0;
  SArray*  pRecycleList = NULL;

  if (pMeta == NULL) {
    return;
  }

  vgId = pMeta->vgId;
  pRecycleList = taosArrayInit(4, sizeof(STaskId));
  if (pRecycleList == NULL) {
    stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
    return;
  }

  stInfo("vgId:%d load stream tasks from meta files", vgId);

  code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
    taosArrayDestroy(pRecycleList);
    return;
  }

  code = tdbTbcMoveToFirst(pCur);
  if (code) {
    stError("vgId:%d failed to open stream meta cursor, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
    taosArrayDestroy(pRecycleList);
    tdbTbcClose(pCur);
    return;
  }

  while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
    if (pVal == NULL || vLen == 0) {
      break;
    }

    SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
    if (pTask == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
      break;
    }

    tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
    if (tDecodeStreamTask(&decoder, pTask) < 0) {
      tDecoderClear(&decoder);
      tFreeStreamTask(pTask);
      stError(
          "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
          "stream manually",
          vgId, tsDataDir);
      break;
    }
    tDecoderClear(&decoder);

    // a task persisted in dropping state is not loaded; remember it for removal from the meta store
    if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
      int32_t taskId = pTask->id.taskId;
      STaskId id = streamTaskGetTaskId(pTask);

      tFreeStreamTask(pTask);
      void* px = taosArrayPush(pRecycleList, &id);
      if (px == NULL) {
        stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
      } else {
        int32_t total = taosArrayGetSize(pRecycleList);
        stDebug("s-task:0x%x is already dropped, add into recycle list, total:%d", taskId, total);
      }
      continue;
    }

    stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
            pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);

    // do duplicate task check.
    STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
    void*   p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
    if (p != NULL) {
      // todo this should replace the existed object put by replay creating stream task msg from mnode
      stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
      tFreeStreamTask(pTask);
      continue;
    }

    code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
    if (code < 0) {
      stError("failed to load s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
      tFreeStreamTask(pTask);
      continue;
    }

    void* px = taosArrayPush(pMeta->pTaskList, &pTask->id);
    if (px == NULL) {
      stFatal("s-task:0x%x failed to add into task list due to out of memory", pTask->id.taskId);
      // do not register a task in the map that is missing from the list
      tFreeStreamTask(pTask);
      continue;
    }

    pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);

    if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
      int64_t refId = pTask->id.refId;
      stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
              pTask->id.taskId, tstrerror(terrno), refId);

      // roll back: drop the list entry appended above and the ref registered above
      (void)taosArrayPop(pMeta->pTaskList);
      int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
      if (ret != 0) {
        stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
      }
      continue;
    }

    stInfo("s-task:0x%x vgId:%d set refId:%" PRId64, (int32_t)id.taskId, vgId, pTask->id.refId);
    if (pTask->info.fillHistory == 0) {
      (void)atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
    }

    if (streamTaskShouldPause(pTask)) {
      (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
    }
  }

  tdbFree(pKey);
  tdbFree(pVal);

  tdbTbcClose(pCur);

  // remove the records of the tasks found in dropping state
  int32_t numOfRecycled = (int32_t)taosArrayGetSize(pRecycleList);
  for (int32_t i = 0; i < numOfRecycled; ++i) {
    STaskId* pId = taosArrayGet(pRecycleList, i);
    code = streamMetaRemoveTaskInMeta(pMeta, pId);
    if (code) {
      stError("s-task:0x%" PRIx64 " failed to remove task, code:%s", pId->taskId, tstrerror(code));
    }
  }

  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
          pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);

  taosArrayDestroy(pRecycleList);
  code = streamMetaCommit(pMeta);
  if (code) {
    stError("vgId:%d failed to commit, code:%s", pMeta->vgId, tstrerror(code));
  }
}
1290

1291
/*
 * Prepare the stream meta for the vnode being closed:
 *   1. set pMeta->closeFlag and wait for the hb timer to quit;
 *   2. under the meta read lock, obtain a copy of the task list via
 *      streamMetaSendMsgBeforeCloseTasks(), then stop every task in it and
 *      remove its ref from streamTaskRefPool;
 *   3. stop the scan timer, if one is set.
 *
 * Failures of individual steps are logged; nothing is returned.
 */
void streamMetaNotifyClose(SStreamMeta* pMeta) {
  int32_t vgId = pMeta->vgId;
  int64_t startTs = 0;
  int32_t sendCount = 0;

  streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
  stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
         vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);

  // wait for the stream meta hb function stopping
  pMeta->closeFlag = true;
  streamMetaWaitForHbTmrQuit(pMeta);

  stDebug("vgId:%d start to check all tasks for closing", vgId);
  int64_t st = taosGetTimestampMs();

  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // keep going: the loop below simply runs over whatever list was produced
    stError("vgId:%d failed to prepare the task list before closing tasks, code:%s", vgId, tstrerror(code));
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    // remember the refId before the reference is released
    int64_t refId = pTask->id.refId;
    int32_t ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
    ret = taosRemoveRef(streamTaskRefPool, refId);
    if (ret) {
      stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
    }
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);

  // NOTE(review): scanTimer is reset while only the read lock is held -- confirm this is intended
  if (pMeta->scanInfo.scanTimer != NULL) {
    streamTmrStop(pMeta->scanInfo.scanTimer);
    pMeta->scanInfo.scanTimer = NULL;
  }

  streamMetaRUnLock(pMeta);
}
×
1349

1350
/*
 * Start the meta heartbeat to the mnode: allocate a heap copy of pMeta->rid,
 * register it through metaRefMgtAdd() and hand it to streamMetaHbToMnode().
 * Both failure paths are logged and leave the heartbeat not started.
 */
void streamMetaStartHb(SStreamMeta* pMeta) {
  int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
  if (pRid == NULL) {
    stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId);
    return;
  }

  *pRid = pMeta->rid;
  int32_t code = metaRefMgtAdd(pMeta->vgId, pRid);
  if (code) {
    // NOTE(review): pRid is left untouched here because it is not visible from this
    // function whether metaRefMgtAdd() keeps the pointer on failure -- confirm ownership
    stError("vgId:%d failed to register the meta rid:%" PRId64 ", hbMsg will not started, code:%s", pMeta->vgId,
            pMeta->rid, tstrerror(code));
    return;
  }

  streamMetaHbToMnode(pRid, NULL);
}
1365

1366
/*
 * Produce a copy of pMeta->pTaskList for the caller and, when
 * pMeta->sendMsgBeforeClosing is set, mark the checkpoint of every task as
 * failed and send one heartbeat to the mnode before the tasks get closed.
 *
 * @param pMeta  stream meta
 * @param pList  output; receives the duplicated task list, which the caller
 *               must destroy
 * @return terrno when the list cannot be duplicated; TSDB_CODE_SUCCESS otherwise.
 *         A failure to send the heartbeat is logged but not returned.
 */
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
  QRY_PARAM_CHECK(pList);

  int32_t code = 0;
  SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
  if (pTaskList == NULL) {
    stError("failed to generate the task list during send hbMsg to mnode, vgId:%d, code: out of memory", pMeta->vgId);
    return terrno;
  }

  *pList = pTaskList;

  bool sendMsg = pMeta->sendMsgBeforeClosing;
  if (!sendMsg) {
    stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
    return TSDB_CODE_SUCCESS;
  }

  stDebug("vgId:%d send msg to mnode before closing all tasks", pMeta->vgId);

  // send hb msg to mnode before closing all tasks.
  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {  // this error is ignored
      continue;
    }

    streamTaskSetCheckpointFailed(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  code = streamMetaSendHbHelper(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    // best effort only: closing the tasks must go on even if the hb could not be sent
    stError("vgId:%d failed to send hb msg to mnode before closing tasks, code:%s", pMeta->vgId, tstrerror(code));
  }

  pMeta->sendMsgBeforeClosing = false;
  return TSDB_CODE_SUCCESS;  // always report success once the list has been produced
}
1405

1406
/*
 * Record the new stage and role of the vnode in pMeta (under the meta write
 * lock) and, when the vnode is the leader, start the heartbeat to the mnode.
 *
 * sendMsgBeforeClosing is set when the role stored before this update was
 * leader, i.e. for a leader, or for a follower that has just been a leader.
 *
 * NOTE(review): prevStage holds the previous pMeta->stage value, yet it is
 * compared against NODE_ROLE_FOLLOWER, a role constant -- confirm whether the
 * previous role was meant. The comparison is kept as is.
 */
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
  int64_t prevStage;

  streamMetaWLock(pMeta);

  prevStage = pMeta->stage;
  pMeta->stage = stage;

  // must be evaluated before the role is overwritten below
  if (pMeta->role == NODE_ROLE_LEADER) {
    pMeta->sendMsgBeforeClosing = true;
  }

  if (isLeader) {
    pMeta->role = NODE_ROLE_LEADER;
    // wait for nodeEp update if become leader from follower
    if (prevStage == NODE_ROLE_FOLLOWER) {
      pMeta->startInfo.tasksWillRestart = 1;
    }
  } else {
    pMeta->role = NODE_ROLE_FOLLOWER;
    streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
  }

  streamMetaWUnLock(pMeta);

  if (!isLeader) {
    stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
           stage, prevStage, isLeader, pMeta->sendMsgBeforeClosing);
    return;
  }

  if (prevStage == NODE_ROLE_FOLLOWER) {
    stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64
           " restart after nodeEp being updated",
           pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
  } else {
    stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
           pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
  }

  streamMetaStartHb(pMeta);
}
×
1445

1446
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
×
1447
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
×
1448
  for (int32_t i = 0; i < num; ++i) {
×
1449
    SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
×
1450
    STaskId        id = {.streamId = pId->streamId, .taskId = pId->taskId};
×
1451
    SStreamTask*   pTask = NULL;
×
1452
    int32_t        code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
×
1453

1454
    if (code == 0) {
×
1455
      if (pTask->status.downstreamReady == 0) {
×
1456
        streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
×
1457
        return false;
×
1458
      }
1459
      streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
×
1460
    }
1461
  }
1462

1463
  return true;
×
1464
}
1465

1466
/*
 * Call streamTaskResetStatus() on every task of pMeta->pTaskList that can be
 * acquired; tasks that cannot be acquired are skipped.
 *
 * @return TSDB_CODE_SUCCESS for an empty list, 0 otherwise.
 */
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
  int32_t total = taosArrayGetSize(pMeta->pTaskList);

  stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, total);
  if (total == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t idx = 0;
  while (idx < total) {
    SStreamTaskId* pEntry = taosArrayGet(pMeta->pTaskList, idx);
    STaskId        key = {.streamId = pEntry->streamId, .taskId = pEntry->taskId};
    SStreamTask*   pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask) == 0) {
      streamTaskResetStatus(pTask);
      streamMetaReleaseTask(pMeta, pTask);
    }

    ++idx;
  }

  return 0;
}
1487

1488
/*
 * Record in pMeta->updateInfo.pTasks that pTask (and pHTask, when given) has
 * been handled for the update transaction transId, and log the time elapsed
 * since startTs.
 *
 * @param pMeta    stream meta holding the update info
 * @param pTask    the task to record
 * @param pHTask   optional second task to record alongside pTask; may be NULL
 * @param transId  id of the update transaction
 * @param startTs  start timestamp in ms, used only for the elapsed-time log
 */
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
                                     int64_t startTs) {
  const char* id = pTask->id.idStr;
  int32_t     vgId = pMeta->vgId;

  // keep the already updated info
  STaskUpdateEntry entry = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", id);
  }

  int64_t el = taosGetTimestampMs() - startTs;

  if (pHTask == NULL) {
    stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms",
            id, vgId, transId, el);
    return;
  }

  STaskUpdateEntry hEntry = {.streamId = pHTask->id.streamId, .taskId = pHTask->id.taskId, .transId = transId};
  if (taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0) != 0) {
    stError("s-task:%s failed to put updateTask into update list", id);
    return;
  }

  stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64
          " ms",
          id, vgId, transId, el);
}
×
1517

1518
/*
 * Mark the active update transaction as completed: clear the recorded tasks,
 * move activeTransId into completeTransId, reset activeTransId to -1 and stamp
 * completeTs with the current time.
 */
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
  STaskUpdateInfo* pUpdate = &pMeta->updateInfo;

  // captured before the list is cleared, for the log below
  int32_t numOfUpdated = taosArrayGetSize(pUpdate->pTaskList);

  taosHashClear(pUpdate->pTasks);
  taosArrayClear(pUpdate->pTaskList);

  int32_t prevComplete = pUpdate->completeTransId;

  pUpdate->completeTransId = pUpdate->activeTransId;
  pUpdate->activeTransId = -1;
  pUpdate->completeTs = taosGetTimestampMs();

  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64
          ", complete transId:%d->%d, update Tasks:%d reset active transId",
          pMeta->vgId, pUpdate->completeTs, prevComplete, pUpdate->completeTransId, numOfUpdated);
}
×
1534

1535
/*
 * Decide whether an epset update with the given transId should be processed,
 * and update the active transaction id accordingly.
 *
 *   - transId <  completeTransId : out-of-order message, rejected
 *   - transId == completeTransId : already handled, rejected
 *   - transId >  completeTransId :
 *       - no active trans (-1)   : becomes the active trans, accepted
 *       - equals the active trans: accepted, nothing changes
 *       - older than active trans: rejected
 *       - newer than active trans: replaces the active trans, accepted
 *
 * Whenever a new active transaction is installed, the recorded task set
 * (pInfo->pTasks) is cleared.
 *
 * @return true when the update should be processed, false when it must be discarded.
 */
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
  STaskUpdateInfo* pInfo = &pMeta->updateInfo;

  if (transId < pInfo->completeTransId) {
    // report the completed transId that this message is behind of
    stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
            pMeta->vgId, pInfo->completeTransId, transId);
    return false;
  }

  if (transId == pInfo->completeTransId) {
    stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
            pInfo->completeTs);
    return false;
  }

  // from here on: transId > pInfo->completeTransId
  if (pInfo->activeTransId == -1) {
    taosHashClear(pInfo->pTasks);
    pInfo->activeTransId = transId;

    stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId,
           pInfo->completeTransId);
    return true;
  }

  if (pInfo->activeTransId == transId) {
    // do nothing
    return true;
  }

  if (transId < pInfo->activeTransId) {
    stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard",
            pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
    return false;
  }

  // transId > pInfo->activeTransId: a newer transaction supersedes the active one
  taosHashClear(pInfo->pTasks);
  int32_t prev = pInfo->activeTransId;
  pInfo->activeTransId = transId;

  stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId, prev,
         transId, pInfo->completeTransId);
  return true;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc