• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5051

13 May 2026 12:00PM UTC coverage: 73.358% (-0.04%) from 73.398%
#5051

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

714 existing lines in 146 files now uncovered.

281543 of 383795 relevant lines covered (73.36%)

135448694.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.84
/source/dnode/vnode/src/tq/tqInterface.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include <string.h>
18
#include "osDef.h"
19
#include "taoserror.h"
20
#include "stream.h"
21
#include "vnd.h"
22

23
#define MAX_LOOP 100
24

25
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return pHandle != NULL ? TMQ_HANDLE_STATUS_EXEC == pHandle->status : true; }
334,493,560✔
26
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
333,129,902✔
27
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
333,082,197✔
28

29
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
13,187,007✔
30
  if (pLeft == NULL || pRight == NULL) {
13,187,007✔
31
    return false;
×
32
  }
33
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
26,017,411✔
34
         pLeft->val.version == pRight->val.version;
12,830,404✔
35
}
36

37
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
13,522,894✔
38
  if (pTq == NULL) {
13,522,894✔
39
    return TSDB_CODE_INVALID_PARA;
×
40
  }
41
  SMqVgOffset vgOffset = {0};
13,522,894✔
42
  int32_t     vgId = TD_VID(pTq->pVnode);
13,522,894✔
43

44
  int32_t  code = 0;
13,522,894✔
45
  SDecoder decoder = {0};
13,522,894✔
46
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
13,522,894✔
47
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
13,522,894✔
48
    code = TSDB_CODE_INVALID_MSG;
×
49
    goto end;
×
50
  }
51

52
  tDecoderClear(&decoder);
13,522,894✔
53

54
  STqOffset* pOffset = &vgOffset.offset;
13,522,894✔
55

56
  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
13,522,894✔
57
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
365,935✔
58
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
59
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
13,156,959✔
60
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
13,156,959✔
61
            pOffset->val.version);
62
  } else {
63
    tqError("invalid commit offset type:%d", pOffset->val.type);
×
64
    code = TSDB_CODE_INVALID_MSG;
×
65
    goto end;
×
66
  }
67

68
  STqOffset* pSavedOffset = NULL;
13,522,894✔
69
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
13,522,894✔
70
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
13,522,519✔
71
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
6,631✔
72
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
73
    goto end;  // no need to update the offset value
6,631✔
74
  }
75

76
  // if (pOffset->val.type == TMQ_OFFSET__LOG && vgOffset.markWal) {
77
  //   int32_t ret = walSetKeepVersion(pTq->pVnode->pWal, pOffset->val.version);
78
  //   tqDebug("set wal reader keep version to %" PRId64 " for vgId:%d sub:%s, code:%d", pOffset->val.version, vgId,
79
  //           pOffset->subKey, ret);
80
  // }
81
  // save the new offset value
82
  code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
13,515,888✔
83
  if (code != 0) {
13,515,125✔
84
    goto end;
×
85
  }
86

87
  return 0;
13,515,125✔
88
end:
6,631✔
89
  tDecoderClear(&decoder);
6,631✔
90
  tOffsetDestroy(&vgOffset.offset.val);
6,631✔
91
  return code;
6,631✔
92
}
93

94
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
333,173,181✔
95
  if (pTq == NULL || pMsg == NULL) {
333,173,181✔
96
    return TSDB_CODE_INVALID_PARA;
×
97
  }
98
  SMqPollReq req = {0};
333,184,855✔
99
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
333,186,560✔
100
  if (code < 0) {
333,039,325✔
101
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
102
    code = TSDB_CODE_INVALID_MSG;
×
103
    goto END;
×
104
  }
105
  if (req.rawData == 1){
333,039,325✔
106
    req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
107
    if (req.uidHash == NULL) {
×
108
      tqError("tq poll rawData taosHashInit failed");
×
109
      code = terrno;
×
110
      goto END;
×
111
    }
112
  }
113
  int64_t      consumerId = req.consumerId;
333,039,325✔
114
  int32_t      reqEpoch = req.epoch;
333,039,325✔
115
  int32_t      vgId = TD_VID(pTq->pVnode);
333,039,325✔
116
  STqHandle*   pHandle = NULL;
333,123,374✔
117

118
  while (1) {
34,207✔
119
    taosWLockLatch(&pTq->lock);
332,983,050✔
120
    // 1. find handle
121
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
333,161,085✔
122
    if (code != TDB_CODE_SUCCESS) {
333,108,456✔
123
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found, msg:%s", consumerId, vgId, req.subKey, tstrerror(code));
7,300✔
124
      taosWUnLockLatch(&pTq->lock);
7,300✔
125
      return code;
7,300✔
126
    }
127

128
    // 2. check rebalance status
129
    if (pHandle->consumerId != consumerId) {
333,101,156✔
130
      tqWarn("tmq poll: consumer:0x%" PRIx64" vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
730✔
131
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
132
      code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
730✔
133
      taosWUnLockLatch(&pTq->lock);
730✔
134
      goto END;
730✔
135
    }
136

137
    bool exec = tqIsHandleExec(pHandle);
333,134,681✔
138
    if (!exec) {
333,139,618✔
139
      tqSetHandleExec(pHandle);
333,129,902✔
140
      //      qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
141
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
333,023,530✔
142
              req.subKey, pHandle);
143
      taosWUnLockLatch(&pTq->lock);
333,188,505✔
144
      break;
333,196,624✔
145
    }
146
    taosWUnLockLatch(&pTq->lock);
12,828✔
147

148
    tqDebug("tmq poll: consumer:0x%" PRIx64
34,207✔
149
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
150
            consumerId, vgId, req.subKey, pHandle);
151
    taosMsleep(10);
34,207✔
152
  }
153

154
  // 3. update the epoch value
155
  if (pHandle->epoch < reqEpoch) {
333,196,624✔
156
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
538,081✔
157
            reqEpoch);
158
    pHandle->epoch = reqEpoch;
538,081✔
159
  }
160

161
  char buf[TSDB_OFFSET_LEN] = {0};
333,197,441✔
162
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &req.reqOffset);
333,193,077✔
163
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, QID:0x%" PRIx64,
333,194,754✔
164
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
165

166
  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
333,198,731✔
167
  tqSetHandleIdle(pHandle);
333,082,197✔
168

169
  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
333,118,709✔
170
          req.subKey, pHandle);
171

172
END:
333,177,632✔
173
  tDestroySMqPollReq(&req);
333,197,005✔
174
  return code;
333,196,163✔
175
}
176

177
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
1,098✔
178
  if (pTq == NULL || pMsg == NULL) {
1,098✔
179
    return TSDB_CODE_INVALID_PARA;
×
180
  }
181
  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
1,098✔
182
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
1,098✔
183

184
  SMqVgOffset vgOffset = {0};
1,098✔
185

186
  SDecoder decoder;
1,098✔
187
  tDecoderInit(&decoder, (uint8_t*)data, len);
1,098✔
188
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
1,098✔
189
    tDecoderClear(&decoder);
×
190
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
191
    return terrno;
×
192
  }
193

194
  tDecoderClear(&decoder);
1,098✔
195

196
  STqOffset* pSavedOffset = NULL;
1,098✔
197
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
1,098✔
198
  if (code != 0) {
1,098✔
199
    return TSDB_CODE_TMQ_NO_COMMITTED;
×
200
  }
201
  vgOffset.offset = *pSavedOffset;
1,098✔
202

203
  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
1,098✔
204
  if (code < 0) {
1,098✔
205
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
206
  }
207

208
  void* buf = rpcMallocCont(len);
1,098✔
209
  if (buf == NULL) {
1,098✔
210
    return terrno;
×
211
  }
212
  SEncoder encoder = {0};
1,098✔
213
  tEncoderInit(&encoder, buf, len);
1,098✔
214
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
1,098✔
215
  tEncoderClear(&encoder);
1,098✔
216
  if (code < 0) {
1,098✔
217
    rpcFreeCont(buf);
×
218
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
219
  }
220

221
  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
1,098✔
222

223
  tmsgSendRsp(&rsp);
1,098✔
224
  return 0;
1,098✔
225
}
226

227
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
740✔
228
  if (pTq == NULL || pMsg == NULL) {
740✔
229
    return TSDB_CODE_INVALID_PARA;
×
230
  }
231
  int32_t    code = 0;
740✔
232
  SMqPollReq req = {0};
740✔
233
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
740✔
234
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
235
    return TSDB_CODE_INVALID_MSG;
×
236
  }
237

238
  int64_t      consumerId = req.consumerId;
740✔
239
  STqOffsetVal reqOffset = req.reqOffset;
740✔
240
  int32_t      vgId = TD_VID(pTq->pVnode);
740✔
241

242
  // 1. find handle
243
  taosRLockLatch(&pTq->lock);
740✔
244
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
740✔
245
  if (pHandle == NULL) {
740✔
246
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
×
247
    taosRUnLockLatch(&pTq->lock);
×
248
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
249
  }
250

251
  // 2. check rebalance status
252
  if (pHandle->consumerId != consumerId) {
740✔
253
    tqError("%s consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
254
            __func__, consumerId, vgId, req.subKey, pHandle->consumerId);
255
    taosRUnLockLatch(&pTq->lock);
×
256
    return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
257
  }
258

259
  int64_t sver = 0, ever = 0;
740✔
260
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
740✔
261
  taosRUnLockLatch(&pTq->lock);
740✔
262

263
  SMqDataRsp dataRsp = {0};
740✔
264
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
740✔
265
  if (code != 0) {
740✔
266
    return code;
×
267
  }
268

269
  if (req.useSnapshot == true) {
740✔
270
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
×
271
    code = TSDB_CODE_INVALID_PARA;
×
272
    goto END;
×
273
  }
274

275
  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
740✔
276

277
  if (reqOffset.type == TMQ_OFFSET__LOG) {
740✔
278
    dataRsp.rspOffset.version = reqOffset.version;
370✔
279
  } else if (reqOffset.type < 0) {
370✔
280
    STqOffset* pOffset = NULL;
370✔
281
    code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
370✔
282
    if (code == 0) {
370✔
283
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
370✔
284
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
×
285
        code = TSDB_CODE_INVALID_PARA;
×
286
        goto END;
×
287
      }
288

289
      dataRsp.rspOffset.version = pOffset->val.version;
370✔
290
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
370✔
291
             req.subKey, dataRsp.rspOffset.version);
292
    } else {
293
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
×
294
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
×
295
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
×
296
        dataRsp.rspOffset.version = ever;
×
297
      }
298
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
×
299
             dataRsp.rspOffset.version);
300
    }
301
  } else {
302
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
×
303
            reqOffset.type);
304
    code = TSDB_CODE_INVALID_PARA;
×
305
    goto END;
×
306
  }
307

308
  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
740✔
309

310
END:
740✔
311
  tDeleteMqDataRsp(&dataRsp);
740✔
312
  return code;
740✔
313
}
314

315
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
348,699✔
316
  if (pTq == NULL || msg == NULL) {
348,699✔
317
    return TSDB_CODE_INVALID_PARA;
×
318
  }
319
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
348,699✔
320
  int32_t        vgId = TD_VID(pTq->pVnode);
348,699✔
321

322
  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
348,699✔
323
  int32_t code = 0;
348,699✔
324

325
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
348,699✔
326
  if (pHandle) {
348,699✔
327
    while (1) {
×
328
      taosWLockLatch(&pTq->lock);
347,983✔
329
      bool exec = tqIsHandleExec(pHandle);
347,983✔
330

331
      if (exec) {
347,983✔
332
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
×
333
               pHandle->subKey, pHandle);
334
        taosWUnLockLatch(&pTq->lock);
×
335
        taosMsleep(10);
×
336
        continue;
×
337
      }
338
      // tqUnregisterPushHandle(pTq, pHandle);
339
      code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
347,983✔
340
      if (code != 0) {
347,220✔
341
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
×
342
      }
343
      taosWUnLockLatch(&pTq->lock);
347,220✔
344
      break;
347,983✔
345
    }
346
  }
347

348
  taosWLockLatch(&pTq->lock);
348,699✔
349
  if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey)) != 0) {
348,699✔
350
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
100,998✔
351
  }
352
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) {
348,279✔
353
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
323,814✔
354
  }
355

356
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) {
348,790✔
357
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
×
358
  }
359
  taosWUnLockLatch(&pTq->lock);
347,882✔
360

361
  return 0;
347,465✔
362
}
363

364
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
1,487,207✔
365
  if (pTq == NULL || msg == NULL) {
1,487,207✔
366
    return TSDB_CODE_INVALID_PARA;
×
367
  }
368
  int         ret = 0;
1,487,207✔
369
  SMqRebVgReq req = {0};
1,487,207✔
370
  SDecoder    dc = {0};
1,487,207✔
371

372
  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
1,487,207✔
373
  ret = tDecodeSMqRebVgReq(&dc, &req);
1,487,207✔
374
  if (ret < 0) {
1,487,207✔
375
    goto end;
×
376
  }
377

378
  tqInfo("vgId:%d, tmq subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
1,487,207✔
379
         req.oldConsumerId, req.newConsumerId);
380

381
  taosRLockLatch(&pTq->lock);  
1,487,267✔
382
  STqHandle* pHandle = NULL;
1,487,207✔
383
  int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
1,487,207✔
384
  if (code != 0){
1,487,147✔
385
    tqInfo("vgId:%d, tmq subscribe req:%s, no such handle, create new one, msg:%s", pTq->pVnode->config.vgId, req.subKey, tstrerror(code));
476,251✔
386
  }
387
  taosRUnLockLatch(&pTq->lock);
1,487,848✔
388
  if (pHandle == NULL) {
1,486,432✔
389
    if (req.oldConsumerId != -1) {
476,311✔
390
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
×
391
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
392
    }
393
    if (req.newConsumerId == -1) {
476,311✔
394
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64, req.vgId, req.newConsumerId);
×
395
      ret = TSDB_CODE_INVALID_PARA;
×
396
      goto end;
×
397
    }
398
    STqHandle handle = {0};
476,311✔
399
    ret = tqMetaCreateHandle(pTq, &req, &handle);
476,311✔
400
    if (ret < 0) {
476,311✔
401
      tqDestroySTqHandle(&handle);
×
402
      goto end;
×
403
    }
404
    taosWLockLatch(&pTq->lock);
476,311✔
405
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
476,311✔
406
    taosWUnLockLatch(&pTq->lock);
474,256✔
407
  } else {
408
    int maxLoop = MAX_LOOP;
1,010,121✔
409
    while (maxLoop-- > 0) {
1,010,121✔
410
      taosWLockLatch(&pTq->lock);
1,010,121✔
411
      bool exec = tqIsHandleExec(pHandle);
1,010,896✔
412
      if (exec) {
1,010,121✔
413
        tqInfo("vgId:%d, topic:%s, subscribe is executing, subscribe wait for 10ms and retry, pHandle:%p",
×
414
               pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
415
        taosWUnLockLatch(&pTq->lock);
×
416
        taosMsleep(10);
×
UNCOV
417
        continue;
×
418
      }
419
      if (req.subType == TOPIC_SUB_TYPE__COLUMN && strcmp(req.qmsg, pHandle->execHandle.execCol.qmsg) != 0) {
1,013,423✔
420
        tqInfo("vgId:%d, topic:%s, subscribe qmsg changed from %s to %s, need recreate handle", pTq->pVnode->config.vgId,
3,302✔
421
               pHandle->subKey, pHandle->execHandle.execCol.qmsg, req.qmsg);
422
        // tqUnregisterPushHandle(pTq, pHandle);
423
        STqHandle handle = {0};
3,302✔
424
        ret = tqMetaCreateHandle(pTq, &req, &handle);
3,302✔
425
        if (ret < 0) {
3,302✔
426
          tqDestroySTqHandle(&handle);
×
427
          taosWUnLockLatch(&pTq->lock);
×
428
          goto end;
×
429
        }
430
        ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
3,302✔
431
        tqInfo("vgId:%d, reload subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
3,302✔
432
         req.oldConsumerId, req.newConsumerId);
433
      } else if (pHandle->consumerId == req.newConsumerId) {  // do nothing
1,007,225✔
434
        tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
54,255✔
435
      } else {
436
        tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
953,339✔
437
               req.newConsumerId);
438

439
        atomic_store_64(&pHandle->consumerId, req.newConsumerId);
953,339✔
440
        atomic_store_32(&pHandle->epoch, 0);
953,339✔
441
        // tqUnregisterPushHandle(pTq, pHandle);
442
        ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
953,339✔
443
      }
444
      taosWUnLockLatch(&pTq->lock);
1,006,984✔
445
      break;
1,007,664✔
446
    }
447
  }
448

449
end:
1,478,851✔
450
  if (req.schema.pSchema != NULL) {
1,483,209✔
451
    taosMemoryFree(req.schema.pSchema);
836,532✔
452
  }
453
  tDecoderClear(&dc);
1,477,428✔
454
  return ret;
1,475,342✔
455
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc