• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5059

17 May 2026 01:15AM UTC coverage: 73.443% (+0.06%) from 73.387%
#5059

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281870 of 383795 relevant lines covered (73.44%)

135516561.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.19
/source/dnode/vnode/src/tq/tqInterface.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include <string.h>
18
#include "osDef.h"
19
#include "taoserror.h"
20
#include "stream.h"
21
#include "vnd.h"
22

23
#define MAX_LOOP 100
24

25
// Tells whether the handle is currently marked as executing.
// A NULL handle is reported as executing.
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) {
  if (pHandle == NULL) {
    return true;
  }
  return pHandle->status == TMQ_HANDLE_STATUS_EXEC;
}
300,956,920✔
26
// Marks the handle as executing; does nothing for a NULL handle.
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {
  if (pHandle == NULL) {
    return;
  }
  pHandle->status = TMQ_HANDLE_STATUS_EXEC;
}
299,600,355✔
27
// Marks the handle as idle; does nothing for a NULL handle.
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {
  if (pHandle == NULL) {
    return;
  }
  pHandle->status = TMQ_HANDLE_STATUS_IDLE;
}
299,568,667✔
28

29
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
12,855,909✔
30
  if (pLeft == NULL || pRight == NULL) {
12,855,909✔
31
    return false;
×
32
  }
33
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
25,361,072✔
34
         pLeft->val.version == pRight->val.version;
12,504,785✔
35
}
36

37
/**
 * Handle an offset-commit message for a subscription on this vnode.
 *
 * The message is decoded into an SMqVgOffset. When the offset already stored for
 * the same subKey is a log offset with the same version, nothing is written.
 * Otherwise the decoded offset is put into pTq->pOffset keyed by subKey.
 *
 * @param pTq      tq instance; must not be NULL.
 * @param sversion not read by this function.
 * @param msg      encoded SMqVgOffset; must not be NULL.
 * @param msgLen   length of msg in bytes.
 * @return 0 when the offset was stored or no update was needed;
 *         TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_INVALID_MSG when decoding fails or the offset type is not
 *         snapshot-data, snapshot-meta or log;
 *         otherwise the non-zero code returned by taosHashPut.
 */
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  // msg is handed straight to the decoder below, so reject NULL up front in the
  // same way tqProcessDeleteSubReq and tqProcessSubscribeReq do.
  if (pTq == NULL || msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqVgOffset vgOffset = {0};
  int32_t     vgId = TD_VID(pTq->pVnode);

  int32_t  code = 0;
  SDecoder decoder = {0};
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  tDecoderClear(&decoder);

  STqOffset* pOffset = &vgOffset.offset;

  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
            pOffset->val.version);
  } else {
    tqError("invalid commit offset type:%d", pOffset->val.type);
    code = TSDB_CODE_INVALID_MSG;
    goto end;
  }

  // Skip the write only when a stored offset exists and tqOffsetEqual reports it
  // identical; a failed lookup simply falls through to the store below.
  STqOffset* pSavedOffset = NULL;
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
    goto end;  // no need to update the offset value
  }

  // if (pOffset->val.type == TMQ_OFFSET__LOG && vgOffset.markWal) {
  //   int32_t ret = walSetKeepVersion(pTq->pVnode->pWal, pOffset->val.version);
  //   tqDebug("set wal reader keep version to %" PRId64 " for vgId:%d sub:%s, code:%d", pOffset->val.version, vgId,
  //           pOffset->subKey, ret);
  // }
  // save the new offset value
  code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
  if (code != 0) {
    goto end;
  }

  // The stored copy is a byte copy of *pOffset, so the decoded value is not
  // destroyed on this path.
  return 0;
end:
  // NOTE(review): on paths taken after a successful decode the decoder has
  // already been cleared once; this relies on tDecoderClear being safe to call
  // again - confirm.
  tDecoderClear(&decoder);
  tOffsetDestroy(&vgOffset.offset.val);
  return code;
}
93

94
/**
 * Handle a consumer poll request for a subscription on this vnode.
 *
 * Steps:
 *   1. Deserialize the request; when req.rawData is 1, create req.uidHash.
 *   2. Under the write latch, look up the handle for req.subKey and verify that
 *      it belongs to the requesting consumer.
 *   3. Mark the handle as executing. While it is already marked executing, release
 *      the latch, sleep 10ms and retry from the lookup.
 *   4. Raise the handle epoch when the request carries a larger one.
 *   5. Call tqExtractDataForMq, then mark the handle idle again.
 *
 * Every exit after deserialization goes through END so that the request is
 * released with tDestroySMqPollReq.
 *
 * @param pTq  tq instance; must not be NULL.
 * @param pMsg rpc message carrying the serialized SMqPollReq; must not be NULL.
 * @return TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_INVALID_MSG when deserialization fails;
 *         terrno when the uid hash cannot be created;
 *         the code from tqMetaGetHandle when the subkey is not found;
 *         TSDB_CODE_TMQ_CONSUMER_MISMATCH when the handle belongs to another consumer;
 *         otherwise the result of tqExtractDataForMq.
 */
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
  if (pTq == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqPollReq req = {0};
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
  if (code < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    code = TSDB_CODE_INVALID_MSG;
    goto END;
  }
  if (req.rawData == 1){
    req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (req.uidHash == NULL) {
      tqError("tq poll rawData taosHashInit failed");
      code = terrno;
      goto END;
    }
  }
  int64_t      consumerId = req.consumerId;
  int32_t      reqEpoch = req.epoch;
  int32_t      vgId = TD_VID(pTq->pVnode);
  STqHandle*   pHandle = NULL;

  while (1) {
    taosWLockLatch(&pTq->lock);
    // 1. find handle
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
    if (code != TDB_CODE_SUCCESS) {
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found, msg:%s", consumerId, vgId, req.subKey, tstrerror(code));
      taosWUnLockLatch(&pTq->lock);
      // Leave through END like every other failure path so that req (and
      // req.uidHash when rawData is set) is released instead of being dropped.
      goto END;
    }

    // 2. check rebalance status
    if (pHandle->consumerId != consumerId) {
      tqWarn("tmq poll: consumer:0x%" PRIx64" vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
      code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
      taosWUnLockLatch(&pTq->lock);
      goto END;
    }

    // Claim the handle while still holding the latch; only one poll may own it.
    bool exec = tqIsHandleExec(pHandle);
    if (!exec) {
      tqSetHandleExec(pHandle);
      //      qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
              req.subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      break;
    }
    taosWUnLockLatch(&pTq->lock);

    tqDebug("tmq poll: consumer:0x%" PRIx64
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
            consumerId, vgId, req.subKey, pHandle);
    taosMsleep(10);
  }

  // 3. update the epoch value
  if (pHandle->epoch < reqEpoch) {
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
            reqEpoch);
    pHandle->epoch = reqEpoch;
  }

  // The formatted offset is only used for the debug line below.
  char buf[TSDB_OFFSET_LEN] = {0};
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &req.reqOffset);
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, QID:0x%" PRIx64,
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);

  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
  tqSetHandleIdle(pHandle);

  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
          req.subKey, pHandle);

END:
  tDestroySMqPollReq(&req);
  return code;
}
176

177
/**
 * Answer a request for the committed offset of a subscription.
 *
 * The request body (located after the SMsgHead) is decoded into an SMqVgOffset
 * to obtain the subKey. The offset stored for that subKey is then encoded into
 * a freshly allocated rpc buffer and sent back with tmsgSendRsp.
 *
 * @param pTq  tq instance; must not be NULL.
 * @param pMsg rpc request; must not be NULL.
 * @return 0 after the response was handed to tmsgSendRsp;
 *         TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_OUT_OF_MEMORY when decoding the request fails;
 *         TSDB_CODE_TMQ_NO_COMMITTED when no stored offset is found;
 *         TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA) when encoding fails;
 *         terrno when the response buffer cannot be allocated.
 */
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  if (pTq == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // The payload starts right after the message head.
  void*   pBody = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t payloadLen = pMsg->contLen - sizeof(SMsgHead);

  SMqVgOffset vgOffset = {0};
  SDecoder    decoder;
  tDecoderInit(&decoder, (uint8_t*)pBody, payloadLen);
  int32_t decodeRet = tDecodeMqVgOffset(&decoder, &vgOffset);
  tDecoderClear(&decoder);
  if (decodeRet < 0) {
    // NOTE(review): a decode failure is reported as out-of-memory here, while
    // tqProcessOffsetCommitReq reports TSDB_CODE_INVALID_MSG - confirm intent.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  STqOffset* pSavedOffset = NULL;
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
  if (code != 0) {
    return TSDB_CODE_TMQ_NO_COMMITTED;
  }

  // Reply with the stored offset rather than the one carried by the request.
  vgOffset.offset = *pSavedOffset;

  // payloadLen is reused: from here on it holds the encoded response size.
  tEncodeSize(tEncodeMqVgOffset, &vgOffset, payloadLen, code);
  if (code < 0) {
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
  }

  void* pRspBuf = rpcMallocCont(payloadLen);
  if (pRspBuf == NULL) {
    return terrno;
  }

  SEncoder encoder = {0};
  tEncoderInit(&encoder, pRspBuf, payloadLen);
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
  tEncoderClear(&encoder);
  if (code < 0) {
    rpcFreeCont(pRspBuf);
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
  }

  SRpcMsg rsp = {0};
  rsp.info = pMsg->info;
  rsp.pCont = pRspBuf;
  rsp.contLen = payloadLen;
  rsp.code = 0;

  tmsgSendRsp(&rsp);
  return 0;
}
226

227
/**
 * Answer a request for the wal position of a subscription.
 *
 * The request is deserialized as an SMqPollReq. Under the read latch the handle
 * for req.subKey is looked up, checked against the requesting consumer, and the
 * valid wal version range [sver, ever] is read from the handle's wal reader.
 * The response offset is always of type TMQ_OFFSET__LOG; its version comes from
 *   - the request itself when the request offset is a log offset,
 *   - the stored offset when the request offset type is negative and a stored
 *     log offset exists,
 *   - sver / ever for TMQ_OFFSET__RESET_EARLIEST / TMQ_OFFSET__RESET_LATEST when
 *     no stored offset is found.
 *
 * NOTE(review): req is not passed to tDestroySMqPollReq on any path of this
 * function, unlike tqProcessPollReq - confirm whether that is intended.
 *
 * @param pTq  tq instance; must not be NULL.
 * @param pMsg rpc request; must not be NULL.
 * @return TSDB_CODE_INVALID_PARA for NULL arguments, snapshot requests, a stored
 *         offset that is not a log offset, or an unsupported request offset type;
 *         TSDB_CODE_INVALID_MSG when deserialization fails;
 *         TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST when the subkey has no handle;
 *         TSDB_CODE_TMQ_CONSUMER_MISMATCH when the handle belongs to another consumer;
 *         the code from tqInitDataRsp when it fails;
 *         otherwise the result of tqDoSendDataRsp.
 */
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
  if (pTq == NULL || pMsg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SMqPollReq req = {0};
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
    return TSDB_CODE_INVALID_MSG;
  }

  int32_t      code = 0;
  int64_t      consumerId = req.consumerId;
  STqOffsetVal reqOffset = req.reqOffset;
  int32_t      vgId = TD_VID(pTq->pVnode);
  int64_t      sver = 0;
  int64_t      ever = 0;

  // 1. find handle
  taosRLockLatch(&pTq->lock);
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
    taosRUnLockLatch(&pTq->lock);
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
  }

  // 2. check rebalance status
  if (pHandle->consumerId != consumerId) {
    tqError("%s consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
            __func__, consumerId, vgId, req.subKey, pHandle->consumerId);
    taosRUnLockLatch(&pTq->lock);
    return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
  }

  // Read the version range while the handle is still protected by the latch.
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
  taosRUnLockLatch(&pTq->lock);

  SMqDataRsp dataRsp = {0};
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
  if (code != 0) {
    return code;
  }

  if (req.useSnapshot == true) {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;

  if (reqOffset.type == TMQ_OFFSET__LOG) {
    dataRsp.rspOffset.version = reqOffset.version;
  } else if (reqOffset.type < 0) {
    STqOffset* pStored = NULL;
    code = tqMetaGetOffset(pTq, req.subKey, &pStored);
    if (code != 0) {
      // Nothing stored yet: derive the position from the reset policy. Any
      // other negative type leaves the version at its initial value.
      switch (reqOffset.type) {
        case TMQ_OFFSET__RESET_EARLIEST:
          dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
          break;
        case TMQ_OFFSET__RESET_LATEST:
          dataRsp.rspOffset.version = ever;
          break;
        default:
          break;
      }
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
             dataRsp.rspOffset.version);
    } else {
      if (pStored->val.type != TMQ_OFFSET__LOG) {
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
        code = TSDB_CODE_INVALID_PARA;
        goto END;
      }

      dataRsp.rspOffset.version = pStored->val.version;
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
             req.subKey, dataRsp.rspOffset.version);
    }
  } else {
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
            reqOffset.type);
    code = TSDB_CODE_INVALID_PARA;
    goto END;
  }

  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);

END:
  tDeleteMqDataRsp(&dataRsp);
  return code;
}
314

315
/**
 * Remove everything kept for a subscription on this vnode.
 *
 * If a handle exists for the subKey, it is removed from pTq->pHandle once it is
 * no longer marked executing (the latch is released and the check repeated every
 * 10ms while it is). Afterwards the offset is removed from pTq->pOffset and the
 * entries for the subKey are deleted from the offset store and the exec store.
 * Failures of the individual removals are logged and do not abort the request.
 *
 * NOTE(review): the handle pointer is fetched from pTq->pHandle before the write
 * latch is taken - confirm nothing can replace or remove the entry in between.
 *
 * @param pTq      tq instance; must not be NULL.
 * @param sversion not read by this function.
 * @param msg      an SMqVDeleteReq; must not be NULL.
 * @param msgLen   not read by this function.
 * @return TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0.
 */
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (pTq == NULL || msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
  int32_t        vgId = TD_VID(pTq->pVnode);

  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
  int32_t code = 0;

  // The key is not modified below, so its length is computed once.
  size_t keyLen = strlen(pReq->subKey);

  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, keyLen);
  if (pHandle) {
    while (1) {
      taosWLockLatch(&pTq->lock);
      bool exec = tqIsHandleExec(pHandle);

      if (exec) {
        // A poll currently owns the handle; back off without holding the latch.
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
               pHandle->subKey, pHandle);
        taosWUnLockLatch(&pTq->lock);
        taosMsleep(10);
        continue;
      }
      // tqUnregisterPushHandle(pTq, pHandle);
      code = taosHashRemove(pTq->pHandle, pReq->subKey, keyLen);
      if (code != 0) {
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
      }
      taosWUnLockLatch(&pTq->lock);
      break;
    }
  }

  taosWLockLatch(&pTq->lock);
  if (taosHashRemove(pTq->pOffset, pReq->subKey, keyLen) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
  }
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, keyLen) != 0) {
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
  }

  // This deletion targets the exec store, so the message names the handle
  // rather than repeating the offset-store message.
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, keyLen) < 0) {
    tqError("cannot process tq delete req %s, since no such handle in tdb", pReq->subKey);
  }
  taosWUnLockLatch(&pTq->lock);

  return 0;
}
363

364
/**
 * Apply a subscribe/rebalance request (SMqRebVgReq) to this vnode.
 *
 * When no handle exists for req.subKey, a new one is created and saved.
 * When a handle exists, up to MAX_LOOP attempts are made to find it not marked
 * executing (10ms apart); then one of the following is done under the write latch:
 *   - column subscription whose qmsg differs from the handle's: build a new
 *     handle from the request and save it;
 *   - handle already owned by req.newConsumerId: nothing is changed;
 *   - otherwise: switch the handle to req.newConsumerId, reset its epoch to 0 and
 *     save it.
 *
 * NOTE(review): if the handle is still marked executing after MAX_LOOP attempts
 * the loop simply ends and the decode result is returned without any change to
 * the handle - confirm this is intended.
 *
 * @param pTq      tq instance; must not be NULL.
 * @param sversion not read by this function.
 * @param msg      encoded SMqRebVgReq; must not be NULL.
 * @param msgLen   length of msg in bytes.
 * @return TSDB_CODE_INVALID_PARA for NULL arguments or when a new handle is
 *         requested with newConsumerId == -1; a negative value when decoding or
 *         tqMetaCreateHandle fails; otherwise the result of the last
 *         tqMetaSaveHandle call, or the decode result when nothing was saved.
 */
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
  if (pTq == NULL || msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int         rc = 0;
  int32_t     lookupCode = 0;
  SMqRebVgReq req = {0};
  SDecoder    decoder = {0};
  STqHandle*  pHandle = NULL;

  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  rc = tDecodeSMqRebVgReq(&decoder, &req);
  if (rc < 0) {
    goto end;
  }

  tqInfo("vgId:%d, tmq subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
         req.oldConsumerId, req.newConsumerId);

  // Look the handle up under the read latch; a miss is not an error here.
  taosRLockLatch(&pTq->lock);
  lookupCode = tqMetaGetHandle(pTq, req.subKey, &pHandle);
  if (lookupCode != 0) {
    tqInfo("vgId:%d, tmq subscribe req:%s, no such handle, create new one, msg:%s", pTq->pVnode->config.vgId, req.subKey, tstrerror(lookupCode));
  }
  taosRUnLockLatch(&pTq->lock);

  if (pHandle == NULL) {
    // No handle yet: build one from the request and persist it.
    if (req.oldConsumerId != -1) {
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
    }
    if (req.newConsumerId == -1) {
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64, req.vgId, req.newConsumerId);
      rc = TSDB_CODE_INVALID_PARA;
      goto end;
    }

    STqHandle created = {0};
    rc = tqMetaCreateHandle(pTq, &req, &created);
    if (rc < 0) {
      tqDestroySTqHandle(&created);
      goto end;
    }
    taosWLockLatch(&pTq->lock);
    rc = tqMetaSaveHandle(pTq, req.subKey, &created);
    taosWUnLockLatch(&pTq->lock);
    goto end;
  }

  // Existing handle: wait (bounded) until no poll is executing on it.
  for (int attempt = 0; attempt < MAX_LOOP; attempt++) {
    taosWLockLatch(&pTq->lock);
    if (tqIsHandleExec(pHandle)) {
      tqInfo("vgId:%d, topic:%s, subscribe is executing, subscribe wait for 10ms and retry, pHandle:%p",
             pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
      taosWUnLockLatch(&pTq->lock);
      taosMsleep(10);
      continue;
    }

    // strcmp is only evaluated for column subscriptions.
    bool qmsgChanged =
        (req.subType == TOPIC_SUB_TYPE__COLUMN) && (strcmp(req.qmsg, pHandle->execHandle.execCol.qmsg) != 0);

    if (qmsgChanged) {
      tqInfo("vgId:%d, topic:%s, subscribe qmsg changed from %s to %s, need recreate handle", pTq->pVnode->config.vgId,
             pHandle->subKey, pHandle->execHandle.execCol.qmsg, req.qmsg);
      // tqUnregisterPushHandle(pTq, pHandle);
      STqHandle rebuilt = {0};
      rc = tqMetaCreateHandle(pTq, &req, &rebuilt);
      if (rc < 0) {
        tqDestroySTqHandle(&rebuilt);
        taosWUnLockLatch(&pTq->lock);
        goto end;
      }
      rc = tqMetaSaveHandle(pTq, req.subKey, &rebuilt);
      tqInfo("vgId:%d, reload subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
             req.oldConsumerId, req.newConsumerId);
    } else if (pHandle->consumerId == req.newConsumerId) {  // do nothing
      tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
    } else {
      tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
             req.newConsumerId);

      atomic_store_64(&pHandle->consumerId, req.newConsumerId);
      atomic_store_32(&pHandle->epoch, 0);
      // tqUnregisterPushHandle(pTq, pHandle);
      rc = tqMetaSaveHandle(pTq, req.subKey, pHandle);
    }

    taosWUnLockLatch(&pTq->lock);
    break;
  }

end:
  if (req.schema.pSchema != NULL) {
    taosMemoryFree(req.schema.pSchema);
  }
  tDecoderClear(&decoder);
  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc