• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5047

07 May 2026 07:27AM UTC coverage: 73.152% (+0.008%) from 73.144%
#5047

push

travis-ci

web-flow
feat(parser): add support for QUARTER duration alias and related tests (#35268)

6 of 6 new or added lines in 2 files covered. (100.0%)

597 existing lines in 127 files now uncovered.

277691 of 379607 relevant lines covered (73.15%)

132499712.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.55
/source/dnode/vnode/src/tq/tqInterface.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include <string.h>
18
#include "osDef.h"
19
#include "taoserror.h"
20
#include "stream.h"
21
#include "vnd.h"
22

23
#define MAX_LOOP 100
24

25
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return pHandle != NULL ? TMQ_HANDLE_STATUS_EXEC == pHandle->status : true; }
307,944,379✔
26
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
306,699,296✔
27
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
306,661,360✔
28

29
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
17,328,336✔
30
  if (pLeft == NULL || pRight == NULL) {
17,328,336✔
31
    return false;
×
32
  }
33
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
34,332,425✔
34
         pLeft->val.version == pRight->val.version;
17,004,452✔
35
}
36

37
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
17,634,478✔
38
  if (pTq == NULL) {
17,634,478✔
39
    return TSDB_CODE_INVALID_PARA;
×
40
  }
41
  SMqVgOffset vgOffset = {0};
17,634,478✔
42
  int32_t     vgId = TD_VID(pTq->pVnode);
17,634,478✔
43

44
  int32_t  code = 0;
17,634,478✔
45
  SDecoder decoder = {0};
17,634,478✔
46
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
17,634,478✔
47
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
17,634,478✔
48
    code = TSDB_CODE_INVALID_MSG;
×
49
    goto end;
×
50
  }
51

52
  tDecoderClear(&decoder);
17,634,478✔
53

54
  STqOffset* pOffset = &vgOffset.offset;
17,634,123✔
55

56
  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
17,634,123✔
57
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
331,534✔
58
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
59
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
17,302,944✔
60
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
17,302,944✔
61
            pOffset->val.version);
62
  } else {
63
    tqError("invalid commit offset type:%d", pOffset->val.type);
×
64
    code = TSDB_CODE_INVALID_MSG;
×
65
    goto end;
×
66
  }
67

68
  STqOffset* pSavedOffset = NULL;
17,634,478✔
69
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
17,634,478✔
70
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
17,633,785✔
71
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
10,478✔
72
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
73
    goto end;  // no need to update the offset value
10,478✔
74
  }
75

76
  // if (pOffset->val.type == TMQ_OFFSET__LOG && vgOffset.markWal) {
77
  //   int32_t ret = walSetKeepVersion(pTq->pVnode->pWal, pOffset->val.version);
78
  //   tqDebug("set wal reader keep version to %" PRId64 " for vgId:%d sub:%s, code:%d", pOffset->val.version, vgId,
79
  //           pOffset->subKey, ret);
80
  // }
81
  // save the new offset value
82
  code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
17,623,265✔
83
  if (code != 0) {
17,623,675✔
84
    goto end;
×
85
  }
86

87
  return 0;
17,623,675✔
88
end:
10,478✔
89
  tDecoderClear(&decoder);
10,478✔
90
  tOffsetDestroy(&vgOffset.offset.val);
10,478✔
91
  return code;
10,478✔
92
}
93

94
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
306,719,747✔
95
  if (pTq == NULL || pMsg == NULL) {
306,719,747✔
96
    return TSDB_CODE_INVALID_PARA;
×
97
  }
98
  SMqPollReq req = {0};
306,725,748✔
99
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
306,726,238✔
100
  if (code < 0) {
306,673,342✔
101
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
102
    code = TSDB_CODE_INVALID_MSG;
×
103
    goto END;
×
104
  }
105
  if (req.rawData == 1){
306,673,342✔
106
    req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
107
    if (req.uidHash == NULL) {
×
108
      tqError("tq poll rawData taosHashInit failed");
×
109
      code = terrno;
×
110
      goto END;
×
111
    }
112
  }
113
  int64_t      consumerId = req.consumerId;
306,673,342✔
114
  int32_t      reqEpoch = req.epoch;
306,673,342✔
115
  int32_t      vgId = TD_VID(pTq->pVnode);
306,673,342✔
116
  STqHandle*   pHandle = NULL;
306,707,119✔
117

118
  while (1) {
21,489✔
119
    taosWLockLatch(&pTq->lock);
306,601,577✔
120
    // 1. find handle
121
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
306,704,668✔
122
    if (code != TDB_CODE_SUCCESS) {
306,690,320✔
UNCOV
123
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found, msg:%s", consumerId, vgId, req.subKey, tstrerror(code));
×
UNCOV
124
      taosWUnLockLatch(&pTq->lock);
×
UNCOV
125
      return code;
×
126
    }
127

128
    // 2. check rebalance status
129
    if (pHandle->consumerId != consumerId) {
306,690,320✔
130
      tqWarn("tmq poll: consumer:0x%" PRIx64" vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
1,607✔
131
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
132
      code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
1,607✔
133
      taosWUnLockLatch(&pTq->lock);
1,607✔
134
      goto END;
1,607✔
135
    }
136

137
    bool exec = tqIsHandleExec(pHandle);
306,684,293✔
138
    if (!exec) {
306,708,948✔
139
      tqSetHandleExec(pHandle);
306,699,296✔
140
      //      qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
141
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
306,703,236✔
142
              req.subKey, pHandle);
143
      taosWUnLockLatch(&pTq->lock);
306,803,221✔
144
      break;
306,743,986✔
145
    }
146
    taosWUnLockLatch(&pTq->lock);
10,245✔
147

148
    tqDebug("tmq poll: consumer:0x%" PRIx64
21,489✔
149
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
150
            consumerId, vgId, req.subKey, pHandle);
151
    taosMsleep(10);
21,489✔
152
  }
153

154
  // 3. update the epoch value
155
  if (pHandle->epoch < reqEpoch) {
306,743,986✔
156
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
499,017✔
157
            reqEpoch);
158
    pHandle->epoch = reqEpoch;
499,017✔
159
  }
160

161
  char buf[TSDB_OFFSET_LEN] = {0};
306,743,986✔
162
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &req.reqOffset);
306,742,368✔
163
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, QID:0x%" PRIx64,
306,741,769✔
164
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
165

166
  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
306,742,851✔
167
  tqSetHandleIdle(pHandle);
306,661,360✔
168

169
  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
306,692,432✔
170
          req.subKey, pHandle);
171

172
END:
306,733,098✔
173
  tDestroySMqPollReq(&req);
306,744,520✔
174
  return code;
306,744,134✔
175
}
176

177
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
342✔
178
  if (pTq == NULL || pMsg == NULL) {
342✔
179
    return TSDB_CODE_INVALID_PARA;
×
180
  }
181
  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
342✔
182
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
342✔
183

184
  SMqVgOffset vgOffset = {0};
342✔
185

186
  SDecoder decoder;
342✔
187
  tDecoderInit(&decoder, (uint8_t*)data, len);
342✔
188
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
342✔
189
    tDecoderClear(&decoder);
×
190
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
191
    return terrno;
×
192
  }
193

194
  tDecoderClear(&decoder);
342✔
195

196
  STqOffset* pSavedOffset = NULL;
342✔
197
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
342✔
198
  if (code != 0) {
342✔
UNCOV
199
    return TSDB_CODE_TMQ_NO_COMMITTED;
×
200
  }
201
  vgOffset.offset = *pSavedOffset;
342✔
202

203
  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
342✔
204
  if (code < 0) {
342✔
205
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
206
  }
207

208
  void* buf = rpcMallocCont(len);
342✔
209
  if (buf == NULL) {
342✔
210
    return terrno;
×
211
  }
212
  SEncoder encoder = {0};
342✔
213
  tEncoderInit(&encoder, buf, len);
342✔
214
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
342✔
215
  tEncoderClear(&encoder);
342✔
216
  if (code < 0) {
342✔
217
    rpcFreeCont(buf);
×
218
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
219
  }
220

221
  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
342✔
222

223
  tmsgSendRsp(&rsp);
342✔
224
  return 0;
342✔
225
}
226

227
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
684✔
228
  if (pTq == NULL || pMsg == NULL) {
684✔
229
    return TSDB_CODE_INVALID_PARA;
×
230
  }
231
  int32_t    code = 0;
684✔
232
  SMqPollReq req = {0};
684✔
233
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
684✔
234
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
235
    return TSDB_CODE_INVALID_MSG;
×
236
  }
237

238
  int64_t      consumerId = req.consumerId;
684✔
239
  STqOffsetVal reqOffset = req.reqOffset;
684✔
240
  int32_t      vgId = TD_VID(pTq->pVnode);
684✔
241

242
  // 1. find handle
243
  taosRLockLatch(&pTq->lock);
684✔
244
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
684✔
245
  if (pHandle == NULL) {
684✔
246
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
×
247
    taosRUnLockLatch(&pTq->lock);
×
248
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
249
  }
250

251
  // 2. check rebalance status
252
  if (pHandle->consumerId != consumerId) {
684✔
253
    tqError("%s consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
254
            __func__, consumerId, vgId, req.subKey, pHandle->consumerId);
255
    taosRUnLockLatch(&pTq->lock);
×
256
    return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
257
  }
258

259
  int64_t sver = 0, ever = 0;
684✔
260
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
684✔
261
  taosRUnLockLatch(&pTq->lock);
684✔
262

263
  SMqDataRsp dataRsp = {0};
684✔
264
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
684✔
265
  if (code != 0) {
684✔
266
    return code;
×
267
  }
268

269
  if (req.useSnapshot == true) {
684✔
270
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
×
271
    code = TSDB_CODE_INVALID_PARA;
×
272
    goto END;
×
273
  }
274

275
  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
684✔
276

277
  if (reqOffset.type == TMQ_OFFSET__LOG) {
684✔
278
    dataRsp.rspOffset.version = reqOffset.version;
342✔
279
  } else if (reqOffset.type < 0) {
342✔
280
    STqOffset* pOffset = NULL;
342✔
281
    code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
342✔
282
    if (code == 0) {
342✔
283
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
342✔
284
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
×
285
        code = TSDB_CODE_INVALID_PARA;
×
286
        goto END;
×
287
      }
288

289
      dataRsp.rspOffset.version = pOffset->val.version;
342✔
290
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
342✔
291
             req.subKey, dataRsp.rspOffset.version);
292
    } else {
293
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
×
294
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
×
295
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
×
296
        dataRsp.rspOffset.version = ever;
×
297
      }
298
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
×
299
             dataRsp.rspOffset.version);
300
    }
301
  } else {
302
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
×
303
            reqOffset.type);
304
    code = TSDB_CODE_INVALID_PARA;
×
305
    goto END;
×
306
  }
307

308
  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
684✔
309

310
END:
684✔
311
  tDeleteMqDataRsp(&dataRsp);
684✔
312
  return code;
684✔
313
}
314

315
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
323,309✔
316
  if (pTq == NULL || msg == NULL) {
323,309✔
317
    return TSDB_CODE_INVALID_PARA;
×
318
  }
319
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
323,309✔
320
  int32_t        vgId = TD_VID(pTq->pVnode);
323,309✔
321

322
  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
323,309✔
323
  int32_t code = 0;
323,309✔
324

325
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
323,309✔
326
  if (pHandle) {
323,309✔
327
    while (1) {
2,792✔
328
      taosWLockLatch(&pTq->lock);
324,521✔
329
      bool exec = tqIsHandleExec(pHandle);
324,521✔
330

331
      if (exec) {
324,521✔
332
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
2,792✔
333
               pHandle->subKey, pHandle);
334
        taosWUnLockLatch(&pTq->lock);
2,792✔
335
        taosMsleep(10);
2,792✔
336
        continue;
2,792✔
337
      }
338
      // tqUnregisterPushHandle(pTq, pHandle);
339
      code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
321,729✔
340
      if (code != 0) {
321,729✔
341
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
×
342
      }
343
      taosWUnLockLatch(&pTq->lock);
321,729✔
344
      break;
321,729✔
345
    }
346
  }
347

348
  taosWLockLatch(&pTq->lock);
323,309✔
349
  if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey)) != 0) {
323,003✔
350
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
96,411✔
351
  }
352
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) {
323,686✔
353
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
300,904✔
354
  }
355

356
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) {
322,909✔
357
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
×
358
  }
359
  taosWUnLockLatch(&pTq->lock);
321,863✔
360

361
  return 0;
322,258✔
362
}
363

364
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
1,372,825✔
365
  if (pTq == NULL || msg == NULL) {
1,372,825✔
366
    return TSDB_CODE_INVALID_PARA;
×
367
  }
368
  int         ret = 0;
1,373,228✔
369
  SMqRebVgReq req = {0};
1,373,228✔
370
  SDecoder    dc = {0};
1,373,228✔
371

372
  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
1,373,228✔
373
  ret = tDecodeSMqRebVgReq(&dc, &req);
1,373,228✔
374
  if (ret < 0) {
1,372,461✔
375
    goto end;
×
376
  }
377

378
  tqInfo("vgId:%d, tmq subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
1,372,461✔
379
         req.oldConsumerId, req.newConsumerId);
380

381
  taosRLockLatch(&pTq->lock);  
1,372,814✔
382
  STqHandle* pHandle = NULL;
1,373,228✔
383
  int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
1,373,228✔
384
  if (code != 0){
1,372,158✔
385
    tqInfo("vgId:%d, tmq subscribe req:%s, no such handle, create new one, msg:%s", pTq->pVnode->config.vgId, req.subKey, tstrerror(code));
436,593✔
386
  }
387
  taosRUnLockLatch(&pTq->lock);
1,373,147✔
388
  if (pHandle == NULL) {
1,373,228✔
389
    if (req.oldConsumerId != -1) {
437,663✔
390
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
×
391
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
392
    }
393
    if (req.newConsumerId == -1) {
437,663✔
394
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64, req.vgId, req.newConsumerId);
×
395
      ret = TSDB_CODE_INVALID_PARA;
×
396
      goto end;
×
397
    }
398
    STqHandle handle = {0};
437,663✔
399
    ret = tqMetaCreateHandle(pTq, &req, &handle);
437,663✔
400
    if (ret < 0) {
437,663✔
401
      tqDestroySTqHandle(&handle);
×
402
      goto end;
×
403
    }
404
    taosWLockLatch(&pTq->lock);
437,663✔
405
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
437,663✔
406
    taosWUnLockLatch(&pTq->lock);
435,716✔
407
  } else {
408
    int maxLoop = MAX_LOOP;
935,565✔
409
    while (maxLoop-- > 0) {
935,565✔
410
      taosWLockLatch(&pTq->lock);
935,565✔
411
      bool exec = tqIsHandleExec(pHandle);
935,565✔
412
      if (exec) {
935,565✔
413
        tqInfo("vgId:%d, topic:%s, subscribe is executing, subscribe wait for 10ms and retry, pHandle:%p",
×
414
               pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
415
        taosWUnLockLatch(&pTq->lock);
×
416
        taosMsleep(10);
×
417
        continue;
×
418
      }
419
      if (req.subType == TOPIC_SUB_TYPE__COLUMN && strcmp(req.qmsg, pHandle->execHandle.execCol.qmsg) != 0) {
938,582✔
420
        tqInfo("vgId:%d, topic:%s, subscribe qmsg changed from %s to %s, need recreate handle", pTq->pVnode->config.vgId,
3,017✔
421
               pHandle->subKey, pHandle->execHandle.execCol.qmsg, req.qmsg);
422
        // tqUnregisterPushHandle(pTq, pHandle);
423
        STqHandle handle = {0};
3,017✔
424
        ret = tqMetaCreateHandle(pTq, &req, &handle);
3,017✔
425
        if (ret < 0) {
3,017✔
426
          tqDestroySTqHandle(&handle);
×
427
          taosWUnLockLatch(&pTq->lock);
×
428
          goto end;
×
429
        }
430
        ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
3,017✔
431
        tqInfo("vgId:%d, reload subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
3,017✔
432
         req.oldConsumerId, req.newConsumerId);
433
      } else if (pHandle->consumerId == req.newConsumerId) {  // do nothing
932,548✔
434
        tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
49,017✔
435
      } else {
436
        tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
883,531✔
437
               req.newConsumerId);
438

439
        atomic_store_64(&pHandle->consumerId, req.newConsumerId);
883,531✔
440
        atomic_store_32(&pHandle->epoch, 0);
883,531✔
441
        // tqUnregisterPushHandle(pTq, pHandle);
442
        ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
883,531✔
443
      }
444
      taosWUnLockLatch(&pTq->lock);
933,029✔
445
      break;
934,561✔
446
    }
447
  }
448

449
end:
1,366,685✔
450
  if (req.schema.pSchema != NULL) {
1,370,197✔
451
    taosMemoryFree(req.schema.pSchema);
783,921✔
452
  }
453
  tDecoderClear(&dc);
1,366,796✔
454
  return ret;
1,364,679✔
455
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc