• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5029

21 Apr 2026 10:00AM UTC coverage: 73.003% (+0.02%) from 72.986%
#5029

push

travis-ci

web-flow
fix(tmq): balance vgroup error (#35183)

273843 of 375111 relevant lines covered (73.0%)

134074995.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.84
/source/dnode/vnode/src/tq/tqInterface.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include <string.h>
18
#include "osDef.h"
19
#include "taoserror.h"
20
#include "stream.h"
21
#include "vnd.h"
22

23
#define MAX_LOOP 100
24

25
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return pHandle != NULL ? TMQ_HANDLE_STATUS_EXEC == pHandle->status : true; }
306,791,721✔
26
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
305,503,642✔
27
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
305,473,170✔
28

29
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
9,421,282✔
30
  if (pLeft == NULL || pRight == NULL) {
9,421,282✔
31
    return false;
×
32
  }
33
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
18,530,742✔
34
         pLeft->val.version == pRight->val.version;
9,110,109✔
35
}
36

37
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
9,716,551✔
38
  if (pTq == NULL) {
9,716,551✔
39
    return TSDB_CODE_INVALID_PARA;
×
40
  }
41
  SMqVgOffset vgOffset = {0};
9,716,551✔
42
  int32_t     vgId = TD_VID(pTq->pVnode);
9,716,551✔
43

44
  int32_t  code = 0;
9,716,551✔
45
  SDecoder decoder = {0};
9,716,551✔
46
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
9,716,551✔
47
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
9,716,551✔
48
    code = TSDB_CODE_INVALID_MSG;
×
49
    goto end;
×
50
  }
51

52
  tDecoderClear(&decoder);
9,716,198✔
53

54
  STqOffset* pOffset = &vgOffset.offset;
9,716,551✔
55

56
  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
9,716,551✔
57
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
319,459✔
58
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
59
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
9,397,092✔
60
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
9,397,092✔
61
            pOffset->val.version);
62
  } else {
63
    tqError("invalid commit offset type:%d", pOffset->val.type);
×
64
    code = TSDB_CODE_INVALID_MSG;
×
65
    goto end;
×
66
  }
67

68
  STqOffset* pSavedOffset = NULL;
9,716,551✔
69
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
9,716,551✔
70
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
9,716,551✔
71
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
10,442✔
72
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
73
    goto end;  // no need to update the offset value
10,442✔
74
  }
75

76
  // if (pOffset->val.type == TMQ_OFFSET__LOG && vgOffset.markWal) {
77
  //   int32_t ret = walSetKeepVersion(pTq->pVnode->pWal, pOffset->val.version);
78
  //   tqDebug("set wal reader keep version to %" PRId64 " for vgId:%d sub:%s, code:%d", pOffset->val.version, vgId,
79
  //           pOffset->subKey, ret);
80
  // }
81
  // save the new offset value
82
  code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
9,705,460✔
83
  if (code != 0) {
9,706,109✔
84
    goto end;
×
85
  }
86

87
  return 0;
9,706,109✔
88
end:
10,442✔
89
  tDecoderClear(&decoder);
10,442✔
90
  tOffsetDestroy(&vgOffset.offset.val);
10,442✔
91
  return code;
10,442✔
92
}
93

94
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
305,526,736✔
95
  if (pTq == NULL || pMsg == NULL) {
305,526,736✔
96
    return TSDB_CODE_INVALID_PARA;
×
97
  }
98
  SMqPollReq req = {0};
305,528,908✔
99
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
305,529,908✔
100
  if (code < 0) {
305,491,186✔
101
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
102
    code = TSDB_CODE_INVALID_MSG;
×
103
    goto END;
×
104
  }
105
  if (req.rawData == 1){
305,491,186✔
106
    req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
107
    if (req.uidHash == NULL) {
×
108
      tqError("tq poll rawData taosHashInit failed");
×
109
      code = terrno;
×
110
      goto END;
×
111
    }
112
  }
113
  int64_t      consumerId = req.consumerId;
305,491,186✔
114
  int32_t      reqEpoch = req.epoch;
305,491,186✔
115
  int32_t      vgId = TD_VID(pTq->pVnode);
305,491,186✔
116
  STqHandle*   pHandle = NULL;
305,510,372✔
117

118
  while (1) {
58,203✔
119
    taosWLockLatch(&pTq->lock);
305,484,674✔
120
    // 1. find handle
121
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
305,559,905✔
122
    if (code != TDB_CODE_SUCCESS) {
305,553,696✔
123
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found, msg:%s", consumerId, vgId, req.subKey, tstrerror(code));
3,455✔
124
      taosWUnLockLatch(&pTq->lock);
3,455✔
125
      return code;
3,455✔
126
    }
127

128
    // 2. check rebalance status
129
    if (pHandle->consumerId != consumerId) {
305,550,241✔
130
      tqWarn("tmq poll: consumer:0x%" PRIx64" vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
311✔
131
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
132
      code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
311✔
133
      taosWUnLockLatch(&pTq->lock);
311✔
134
      goto END;
311✔
135
    }
136

137
    bool exec = tqIsHandleExec(pHandle);
305,553,100✔
138
    if (!exec) {
305,555,151✔
139
      tqSetHandleExec(pHandle);
305,503,642✔
140
      //      qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
141
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
305,506,419✔
142
              req.subKey, pHandle);
143
      taosWUnLockLatch(&pTq->lock);
305,560,758✔
144
      break;
305,537,019✔
145
    }
146
    taosWUnLockLatch(&pTq->lock);
51,509✔
147

148
    tqDebug("tmq poll: consumer:0x%" PRIx64
58,203✔
149
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
150
            consumerId, vgId, req.subKey, pHandle);
151
    taosMsleep(10);
58,203✔
152
  }
153

154
  // 3. update the epoch value
155
  if (pHandle->epoch < reqEpoch) {
305,537,019✔
156
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
490,211✔
157
            reqEpoch);
158
    pHandle->epoch = reqEpoch;
490,211✔
159
  }
160

161
  char buf[TSDB_OFFSET_LEN] = {0};
305,537,019✔
162
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &req.reqOffset);
305,535,381✔
163
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, QID:0x%" PRIx64,
305,535,628✔
164
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
165

166
  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
305,536,343✔
167
  tqSetHandleIdle(pHandle);
305,473,170✔
168

169
  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
305,482,023✔
170
          req.subKey, pHandle);
171

172
END:
305,526,979✔
173
  tDestroySMqPollReq(&req);
305,537,663✔
174
  return code;
305,536,950✔
175
}
176

177
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
323✔
178
  if (pTq == NULL || pMsg == NULL) {
323✔
179
    return TSDB_CODE_INVALID_PARA;
×
180
  }
181
  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
323✔
182
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
323✔
183

184
  SMqVgOffset vgOffset = {0};
323✔
185

186
  SDecoder decoder;
323✔
187
  tDecoderInit(&decoder, (uint8_t*)data, len);
323✔
188
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
323✔
189
    tDecoderClear(&decoder);
×
190
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
191
    return terrno;
×
192
  }
193

194
  tDecoderClear(&decoder);
323✔
195

196
  STqOffset* pSavedOffset = NULL;
323✔
197
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
323✔
198
  if (code != 0) {
323✔
199
    return TSDB_CODE_TMQ_NO_COMMITTED;
×
200
  }
201
  vgOffset.offset = *pSavedOffset;
323✔
202

203
  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
323✔
204
  if (code < 0) {
323✔
205
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
206
  }
207

208
  void* buf = rpcMallocCont(len);
323✔
209
  if (buf == NULL) {
323✔
210
    return terrno;
×
211
  }
212
  SEncoder encoder = {0};
323✔
213
  tEncoderInit(&encoder, buf, len);
323✔
214
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
323✔
215
  tEncoderClear(&encoder);
323✔
216
  if (code < 0) {
323✔
217
    rpcFreeCont(buf);
×
218
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
219
  }
220

221
  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
323✔
222

223
  tmsgSendRsp(&rsp);
323✔
224
  return 0;
323✔
225
}
226

227
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
646✔
228
  if (pTq == NULL || pMsg == NULL) {
646✔
229
    return TSDB_CODE_INVALID_PARA;
×
230
  }
231
  int32_t    code = 0;
646✔
232
  SMqPollReq req = {0};
646✔
233
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
646✔
234
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
235
    return TSDB_CODE_INVALID_MSG;
×
236
  }
237

238
  int64_t      consumerId = req.consumerId;
646✔
239
  STqOffsetVal reqOffset = req.reqOffset;
646✔
240
  int32_t      vgId = TD_VID(pTq->pVnode);
646✔
241

242
  // 1. find handle
243
  taosRLockLatch(&pTq->lock);
646✔
244
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
646✔
245
  if (pHandle == NULL) {
646✔
246
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
×
247
    taosRUnLockLatch(&pTq->lock);
×
248
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
249
  }
250

251
  // 2. check rebalance status
252
  if (pHandle->consumerId != consumerId) {
646✔
253
    tqError("%s consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
254
            __func__, consumerId, vgId, req.subKey, pHandle->consumerId);
255
    taosRUnLockLatch(&pTq->lock);
×
256
    return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
257
  }
258

259
  int64_t sver = 0, ever = 0;
646✔
260
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
646✔
261
  taosRUnLockLatch(&pTq->lock);
646✔
262

263
  SMqDataRsp dataRsp = {0};
646✔
264
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
646✔
265
  if (code != 0) {
646✔
266
    return code;
×
267
  }
268

269
  if (req.useSnapshot == true) {
646✔
270
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
×
271
    code = TSDB_CODE_INVALID_PARA;
×
272
    goto END;
×
273
  }
274

275
  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
646✔
276

277
  if (reqOffset.type == TMQ_OFFSET__LOG) {
646✔
278
    dataRsp.rspOffset.version = reqOffset.version;
323✔
279
  } else if (reqOffset.type < 0) {
323✔
280
    STqOffset* pOffset = NULL;
323✔
281
    code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
323✔
282
    if (code == 0) {
323✔
283
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
323✔
284
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
×
285
        code = TSDB_CODE_INVALID_PARA;
×
286
        goto END;
×
287
      }
288

289
      dataRsp.rspOffset.version = pOffset->val.version;
323✔
290
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
323✔
291
             req.subKey, dataRsp.rspOffset.version);
292
    } else {
293
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
×
294
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
×
295
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
×
296
        dataRsp.rspOffset.version = ever;
×
297
      }
298
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
×
299
             dataRsp.rspOffset.version);
300
    }
301
  } else {
302
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
×
303
            reqOffset.type);
304
    code = TSDB_CODE_INVALID_PARA;
×
305
    goto END;
×
306
  }
307

308
  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
646✔
309

310
END:
646✔
311
  tDeleteMqDataRsp(&dataRsp);
646✔
312
  return code;
646✔
313
}
314

315
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
306,971✔
316
  if (pTq == NULL || msg == NULL) {
306,971✔
317
    return TSDB_CODE_INVALID_PARA;
×
318
  }
319
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
306,971✔
320
  int32_t        vgId = TD_VID(pTq->pVnode);
306,971✔
321

322
  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
306,971✔
323
  int32_t code = 0;
306,971✔
324

325
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
306,971✔
326
  if (pHandle) {
306,971✔
327
    while (1) {
×
328
      taosWLockLatch(&pTq->lock);
306,056✔
329
      bool exec = tqIsHandleExec(pHandle);
306,056✔
330

331
      if (exec) {
306,056✔
332
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
×
333
               pHandle->subKey, pHandle);
334
        taosWUnLockLatch(&pTq->lock);
×
335
        taosMsleep(10);
×
336
        continue;
×
337
      }
338
      // tqUnregisterPushHandle(pTq, pHandle);
339
      code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
306,056✔
340
      if (code != 0) {
305,702✔
341
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
×
342
      }
343
      taosWUnLockLatch(&pTq->lock);
305,702✔
344
      break;
305,381✔
345
    }
346
  }
347

348
  taosWLockLatch(&pTq->lock);
306,296✔
349
  if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey)) != 0) {
306,971✔
350
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
88,621✔
351
  }
352
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) {
306,971✔
353
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
286,575✔
354
  }
355

356
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) {
306,932✔
357
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
×
358
  }
359
  taosWUnLockLatch(&pTq->lock);
306,609✔
360

361
  return 0;
306,971✔
362
}
363

364
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
1,354,832✔
365
  if (pTq == NULL || msg == NULL) {
1,354,832✔
366
    return TSDB_CODE_INVALID_PARA;
×
367
  }
368
  int         ret = 0;
1,355,019✔
369
  SMqRebVgReq req = {0};
1,355,019✔
370
  SDecoder    dc = {0};
1,355,019✔
371

372
  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
1,355,019✔
373
  ret = tDecodeSMqRebVgReq(&dc, &req);
1,355,019✔
374
  if (ret < 0) {
1,354,316✔
375
    goto end;
×
376
  }
377

378
  tqInfo("vgId:%d, tmq subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
1,354,316✔
379
         req.oldConsumerId, req.newConsumerId);
380

381
  taosRLockLatch(&pTq->lock);  
1,355,004✔
382
  STqHandle* pHandle = NULL;
1,355,019✔
383
  int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
1,355,019✔
384
  if (code != 0){
1,354,396✔
385
    tqInfo("vgId:%d, tmq subscribe req:%s, no such handle, create new one, msg:%s", pTq->pVnode->config.vgId, req.subKey, tstrerror(code));
421,831✔
386
  }
387
  taosRUnLockLatch(&pTq->lock);
1,355,019✔
388
  if (pHandle == NULL) {
1,355,019✔
389
    if (req.oldConsumerId != -1) {
422,454✔
390
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
×
391
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
392
    }
393
    if (req.newConsumerId == -1) {
422,454✔
394
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64, req.vgId, req.newConsumerId);
×
395
      ret = TSDB_CODE_INVALID_PARA;
×
396
      goto end;
×
397
    }
398
    STqHandle handle = {0};
422,454✔
399
    ret = tqMetaCreateHandle(pTq, &req, &handle);
422,454✔
400
    if (ret < 0) {
422,454✔
401
      tqDestroySTqHandle(&handle);
×
402
      goto end;
×
403
    }
404
    taosWLockLatch(&pTq->lock);
422,454✔
405
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
422,454✔
406
    taosWUnLockLatch(&pTq->lock);
421,438✔
407
  } else {
408
    int maxLoop = MAX_LOOP;
932,565✔
409
    while (maxLoop-- > 0) {
932,565✔
410
      taosWLockLatch(&pTq->lock);
932,565✔
411
      bool exec = tqIsHandleExec(pHandle);
932,565✔
412
      if (exec) {
932,565✔
413
        tqInfo("vgId:%d, topic:%s, subscribe is executing, subscribe wait for 10ms and retry, pHandle:%p",
×
414
               pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
415
        taosWUnLockLatch(&pTq->lock);
×
416
        taosMsleep(10);
×
417
        continue;
×
418
      }
419
      if (req.subType == TOPIC_SUB_TYPE__COLUMN && strcmp(req.qmsg, pHandle->execHandle.execCol.qmsg) != 0) {
935,429✔
420
        tqInfo("vgId:%d, topic:%s, subscribe qmsg changed from %s to %s, need recreate handle", pTq->pVnode->config.vgId,
2,864✔
421
               pHandle->subKey, pHandle->execHandle.execCol.qmsg, req.qmsg);
422
        // tqUnregisterPushHandle(pTq, pHandle);
423
        STqHandle handle = {0};
2,864✔
424
        ret = tqMetaCreateHandle(pTq, &req, &handle);
2,864✔
425
        if (ret < 0) {
2,864✔
426
          tqDestroySTqHandle(&handle);
×
427
          taosWUnLockLatch(&pTq->lock);
×
428
          goto end;
×
429
        }
430
        ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
2,864✔
431
        tqInfo("vgId:%d, reload subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
2,864✔
432
         req.oldConsumerId, req.newConsumerId);
433
      } else if (pHandle->consumerId == req.newConsumerId) {  // do nothing
929,701✔
434
        tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
46,520✔
435
      } else {
436
        tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
883,181✔
437
               req.newConsumerId);
438

439
        atomic_store_64(&pHandle->consumerId, req.newConsumerId);
883,181✔
440
        atomic_store_32(&pHandle->epoch, 0);
883,181✔
441
        // tqUnregisterPushHandle(pTq, pHandle);
442
        ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
883,181✔
443
      }
444
      taosWUnLockLatch(&pTq->lock);
931,724✔
445
      break;
932,204✔
446
    }
447
  }
448

449
end:
1,353,327✔
450
  if (req.schema.pSchema != NULL) {
1,353,790✔
451
    taosMemoryFree(req.schema.pSchema);
786,459✔
452
  }
453
  tDecoderClear(&dc);
1,353,714✔
454
  return ret;
1,350,456✔
455
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc