• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5071

17 May 2026 01:15AM UTC coverage: 63.054% (-10.3%) from 73.326%
#5071

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

238317 of 377957 relevant lines covered (63.05%)

130539817.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.71
/source/dnode/vnode/src/tq/tqInterface.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tq.h"
17
#include <string.h>
18
#include "osDef.h"
19
#include "taoserror.h"
20
#include "stream.h"
21
#include "vnd.h"
22

23
#define MAX_LOOP 100
24

25
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return pHandle != NULL ? TMQ_HANDLE_STATUS_EXEC == pHandle->status : true; }
188,842,978✔
26
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
187,793,540✔
27
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { if (pHandle != NULL) pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
187,743,964✔
28

29
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
1,483,182✔
30
  if (pLeft == NULL || pRight == NULL) {
1,483,182✔
31
    return false;
×
32
  }
33
  return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
2,897,682✔
34
         pLeft->val.version == pRight->val.version;
1,414,874✔
35
}
36

37
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
1,618,947✔
38
  if (pTq == NULL) {
1,618,947✔
39
    return TSDB_CODE_INVALID_PARA;
×
40
  }
41
  SMqVgOffset vgOffset = {0};
1,618,947✔
42
  int32_t     vgId = TD_VID(pTq->pVnode);
1,618,947✔
43

44
  int32_t  code = 0;
1,618,947✔
45
  SDecoder decoder = {0};
1,618,947✔
46
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
1,618,947✔
47
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
1,618,947✔
48
    code = TSDB_CODE_INVALID_MSG;
×
49
    goto end;
×
50
  }
51

52
  tDecoderClear(&decoder);
1,618,947✔
53

54
  STqOffset* pOffset = &vgOffset.offset;
1,618,947✔
55

56
  if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
1,618,947✔
57
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
72,880✔
58
            pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
59
  } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
1,546,067✔
60
    tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
1,546,067✔
61
            pOffset->val.version);
62
  } else {
63
    tqError("invalid commit offset type:%d", pOffset->val.type);
×
64
    code = TSDB_CODE_INVALID_MSG;
×
65
    goto end;
×
66
  }
67

68
  STqOffset* pSavedOffset = NULL;
1,618,947✔
69
  code = tqMetaGetOffset(pTq, pOffset->subKey, &pSavedOffset);
1,618,947✔
70
  if (code == 0 && tqOffsetEqual(pOffset, pSavedOffset)) {
1,618,947✔
71
    tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
×
72
           vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
73
    goto end;  // no need to update the offset value
×
74
  }
75

76
  // if (pOffset->val.type == TMQ_OFFSET__LOG && vgOffset.markWal) {
77
  //   int32_t ret = walSetKeepVersion(pTq->pVnode->pWal, pOffset->val.version);
78
  //   tqDebug("set wal reader keep version to %" PRId64 " for vgId:%d sub:%s, code:%d", pOffset->val.version, vgId,
79
  //           pOffset->subKey, ret);
80
  // }
81
  // save the new offset value
82
  code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
1,618,573✔
83
  if (code != 0) {
1,618,539✔
84
    goto end;
×
85
  }
86

87
  return 0;
1,618,539✔
88
end:
×
89
  tDecoderClear(&decoder);
×
90
  tOffsetDestroy(&vgOffset.offset.val);
×
91
  return code;
×
92
}
93

94
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
187,814,729✔
95
  if (pTq == NULL || pMsg == NULL) {
187,814,729✔
96
    return TSDB_CODE_INVALID_PARA;
×
97
  }
98
  SMqPollReq req = {0};
187,819,953✔
99
  int        code = tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req);
187,820,349✔
100
  if (code < 0) {
187,756,440✔
101
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
102
    code = TSDB_CODE_INVALID_MSG;
×
103
    goto END;
×
104
  }
105
  if (req.rawData == 1){
187,756,440✔
106
    req.uidHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
×
107
    if (req.uidHash == NULL) {
×
108
      tqError("tq poll rawData taosHashInit failed");
×
109
      code = terrno;
×
110
      goto END;
×
111
    }
112
  }
113
  int64_t      consumerId = req.consumerId;
187,756,440✔
114
  int32_t      reqEpoch = req.epoch;
187,756,440✔
115
  int32_t      vgId = TD_VID(pTq->pVnode);
187,756,440✔
116
  STqHandle*   pHandle = NULL;
187,796,968✔
117

118
  while (1) {
2,884✔
119
    taosWLockLatch(&pTq->lock);
187,703,547✔
120
    // 1. find handle
121
    code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
187,799,626✔
122
    if (code != TDB_CODE_SUCCESS) {
187,771,031✔
123
      tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found, msg:%s", consumerId, vgId, req.subKey, tstrerror(code));
3,870✔
124
      taosWUnLockLatch(&pTq->lock);
3,870✔
125
      return code;
3,870✔
126
    }
127

128
    // 2. check rebalance status
129
    if (pHandle->consumerId != consumerId) {
187,767,161✔
130
      tqWarn("tmq poll: consumer:0x%" PRIx64" vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
131
              consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
132
      code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
133
      taosWUnLockLatch(&pTq->lock);
×
134
      goto END;
×
135
    }
136

137
    bool exec = tqIsHandleExec(pHandle);
187,768,195✔
138
    if (!exec) {
187,771,666✔
139
      tqSetHandleExec(pHandle);
187,793,540✔
140
      //      qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
141
      tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
187,743,855✔
142
              req.subKey, pHandle);
143
      taosWUnLockLatch(&pTq->lock);
187,814,864✔
144
      break;
187,831,120✔
145
    }
146
    taosWUnLockLatch(&pTq->lock);
×
147

148
    tqDebug("tmq poll: consumer:0x%" PRIx64
2,884✔
149
            " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p",
150
            consumerId, vgId, req.subKey, pHandle);
151
    taosMsleep(10);
2,884✔
152
  }
153

154
  // 3. update the epoch value
155
  if (pHandle->epoch < reqEpoch) {
187,831,120✔
156
    tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch,
368,969✔
157
            reqEpoch);
158
    pHandle->epoch = reqEpoch;
368,969✔
159
  }
160

161
  char buf[TSDB_OFFSET_LEN] = {0};
187,831,939✔
162
  (void)tFormatOffset(buf, TSDB_OFFSET_LEN, &req.reqOffset);
187,831,128✔
163
  tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, QID:0x%" PRIx64,
187,831,939✔
164
          consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
165

166
  code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
187,827,876✔
167
  tqSetHandleIdle(pHandle);
187,743,964✔
168

169
  tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
187,788,139✔
170
          req.subKey, pHandle);
171

172
END:
187,820,339✔
173
  tDestroySMqPollReq(&req);
187,830,715✔
174
  return code;
187,825,893✔
175
}
176

177
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
1,536✔
178
  if (pTq == NULL || pMsg == NULL) {
1,536✔
179
    return TSDB_CODE_INVALID_PARA;
×
180
  }
181
  void*   data = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
1,536✔
182
  int32_t len = pMsg->contLen - sizeof(SMsgHead);
1,536✔
183

184
  SMqVgOffset vgOffset = {0};
1,536✔
185

186
  SDecoder decoder;
1,536✔
187
  tDecoderInit(&decoder, (uint8_t*)data, len);
1,536✔
188
  if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
1,536✔
189
    tDecoderClear(&decoder);
×
190
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
191
    return terrno;
×
192
  }
193

194
  tDecoderClear(&decoder);
1,536✔
195

196
  STqOffset* pSavedOffset = NULL;
1,536✔
197
  int32_t    code = tqMetaGetOffset(pTq, vgOffset.offset.subKey, &pSavedOffset);
1,536✔
198
  if (code != 0) {
1,536✔
199
    return TSDB_CODE_TMQ_NO_COMMITTED;
×
200
  }
201
  vgOffset.offset = *pSavedOffset;
1,536✔
202

203
  tEncodeSize(tEncodeMqVgOffset, &vgOffset, len, code);
1,536✔
204
  if (code < 0) {
1,536✔
205
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
206
  }
207

208
  void* buf = rpcMallocCont(len);
1,536✔
209
  if (buf == NULL) {
1,536✔
210
    return terrno;
×
211
  }
212
  SEncoder encoder = {0};
1,536✔
213
  tEncoderInit(&encoder, buf, len);
1,536✔
214
  code = tEncodeMqVgOffset(&encoder, &vgOffset);
1,536✔
215
  tEncoderClear(&encoder);
1,536✔
216
  if (code < 0) {
1,536✔
217
    rpcFreeCont(buf);
×
218
    return TAOS_GET_TERRNO(TSDB_CODE_INVALID_PARA);
×
219
  }
220

221
  SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0};
1,536✔
222

223
  tmsgSendRsp(&rsp);
1,536✔
224
  return 0;
1,536✔
225
}
226

227
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
×
228
  if (pTq == NULL || pMsg == NULL) {
×
229
    return TSDB_CODE_INVALID_PARA;
×
230
  }
231
  int32_t    code = 0;
×
232
  SMqPollReq req = {0};
×
233
  if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
×
234
    tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
×
235
    return TSDB_CODE_INVALID_MSG;
×
236
  }
237

238
  int64_t      consumerId = req.consumerId;
×
239
  STqOffsetVal reqOffset = req.reqOffset;
×
240
  int32_t      vgId = TD_VID(pTq->pVnode);
×
241

242
  // 1. find handle
243
  taosRLockLatch(&pTq->lock);
×
244
  STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
×
245
  if (pHandle == NULL) {
×
246
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
×
247
    taosRUnLockLatch(&pTq->lock);
×
248
    return TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
×
249
  }
250

251
  // 2. check rebalance status
252
  if (pHandle->consumerId != consumerId) {
×
253
    tqError("%s consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
×
254
            __func__, consumerId, vgId, req.subKey, pHandle->consumerId);
255
    taosRUnLockLatch(&pTq->lock);
×
256
    return TSDB_CODE_TMQ_CONSUMER_MISMATCH;
×
257
  }
258

259
  int64_t sver = 0, ever = 0;
×
260
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
×
261
  taosRUnLockLatch(&pTq->lock);
×
262

263
  SMqDataRsp dataRsp = {0};
×
264
  code = tqInitDataRsp(&dataRsp, req.reqOffset);
×
265
  if (code != 0) {
×
266
    return code;
×
267
  }
268

269
  if (req.useSnapshot == true) {
×
270
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
×
271
    code = TSDB_CODE_INVALID_PARA;
×
272
    goto END;
×
273
  }
274

275
  dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
×
276

277
  if (reqOffset.type == TMQ_OFFSET__LOG) {
×
278
    dataRsp.rspOffset.version = reqOffset.version;
×
279
  } else if (reqOffset.type < 0) {
×
280
    STqOffset* pOffset = NULL;
×
281
    code = tqMetaGetOffset(pTq, req.subKey, &pOffset);
×
282
    if (code == 0) {
×
283
      if (pOffset->val.type != TMQ_OFFSET__LOG) {
×
284
        tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
×
285
        code = TSDB_CODE_INVALID_PARA;
×
286
        goto END;
×
287
      }
288

289
      dataRsp.rspOffset.version = pOffset->val.version;
×
290
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId,
×
291
             req.subKey, dataRsp.rspOffset.version);
292
    } else {
293
      if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
×
294
        dataRsp.rspOffset.version = sver;  // not consume yet, set the earliest position
×
295
      } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
×
296
        dataRsp.rspOffset.version = ever;
×
297
      }
298
      tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey,
×
299
             dataRsp.rspOffset.version);
300
    }
301
  } else {
302
    tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
×
303
            reqOffset.type);
304
    code = TSDB_CODE_INVALID_PARA;
×
305
    goto END;
×
306
  }
307

308
  code = tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
×
309

310
END:
×
311
  tDeleteMqDataRsp(&dataRsp);
×
312
  return code;
×
313
}
314

315
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
165,020✔
316
  if (pTq == NULL || msg == NULL) {
165,020✔
317
    return TSDB_CODE_INVALID_PARA;
×
318
  }
319
  SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
165,020✔
320
  int32_t        vgId = TD_VID(pTq->pVnode);
165,020✔
321

322
  tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
165,020✔
323
  int32_t code = 0;
165,020✔
324

325
  STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
165,020✔
326
  if (pHandle) {
165,020✔
327
    while (1) {
×
328
      taosWLockLatch(&pTq->lock);
165,020✔
329
      bool exec = tqIsHandleExec(pHandle);
165,020✔
330

331
      if (exec) {
165,020✔
332
        tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
×
333
               pHandle->subKey, pHandle);
334
        taosWUnLockLatch(&pTq->lock);
×
335
        taosMsleep(10);
×
336
        continue;
×
337
      }
338
      // tqUnregisterPushHandle(pTq, pHandle);
339
      code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
165,020✔
340
      if (code != 0) {
165,020✔
341
        tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
×
342
      }
343
      taosWUnLockLatch(&pTq->lock);
165,020✔
344
      break;
165,020✔
345
    }
346
  }
347

348
  taosWLockLatch(&pTq->lock);
165,020✔
349
  if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey)) != 0) {
164,636✔
350
    tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
77,720✔
351
  }
352
  if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) {
165,020✔
353
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
160,466✔
354
  }
355

356
  if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) {
164,272✔
357
    tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
×
358
  }
359
  taosWUnLockLatch(&pTq->lock);
163,803✔
360

361
  return 0;
164,645✔
362
}
363

364
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
1,175,080✔
365
  if (pTq == NULL || msg == NULL) {
1,175,080✔
366
    return TSDB_CODE_INVALID_PARA;
×
367
  }
368
  int         ret = 0;
1,175,080✔
369
  SMqRebVgReq req = {0};
1,175,080✔
370
  SDecoder    dc = {0};
1,175,080✔
371

372
  tDecoderInit(&dc, (uint8_t*)msg, msgLen);
1,175,080✔
373
  ret = tDecodeSMqRebVgReq(&dc, &req);
1,175,080✔
374
  if (ret < 0) {
1,175,080✔
375
    goto end;
×
376
  }
377

378
  tqInfo("vgId:%d, tmq subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
1,175,080✔
379
         req.oldConsumerId, req.newConsumerId);
380

381
  taosRLockLatch(&pTq->lock);  
1,175,852✔
382
  STqHandle* pHandle = NULL;
1,175,080✔
383
  int32_t code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
1,175,080✔
384
  if (code != 0){
1,173,904✔
385
    tqInfo("vgId:%d, tmq subscribe req:%s, no such handle, create new one, msg:%s", pTq->pVnode->config.vgId, req.subKey, tstrerror(code));
264,141✔
386
  }
387
  taosRUnLockLatch(&pTq->lock);
1,175,080✔
388
  if (pHandle == NULL) {
1,175,080✔
389
    if (req.oldConsumerId != -1) {
265,317✔
390
      tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
×
391
              req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
392
    }
393
    if (req.newConsumerId == -1) {
265,317✔
394
      tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64, req.vgId, req.newConsumerId);
×
395
      ret = TSDB_CODE_INVALID_PARA;
×
396
      goto end;
×
397
    }
398
    STqHandle handle = {0};
265,317✔
399
    ret = tqMetaCreateHandle(pTq, &req, &handle);
265,317✔
400
    if (ret < 0) {
264,525✔
401
      tqDestroySTqHandle(&handle);
×
402
      goto end;
×
403
    }
404
    taosWLockLatch(&pTq->lock);
264,525✔
405
    ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
265,317✔
406
    taosWUnLockLatch(&pTq->lock);
264,943✔
407
  } else {
408
    int maxLoop = MAX_LOOP;
909,763✔
409
    while (maxLoop-- > 0) {
909,763✔
410
      taosWLockLatch(&pTq->lock);
909,763✔
411
      bool exec = tqIsHandleExec(pHandle);
909,763✔
412
      if (exec) {
909,763✔
413
        tqInfo("vgId:%d, topic:%s, subscribe is executing, subscribe wait for 10ms and retry, pHandle:%p",
×
414
               pTq->pVnode->config.vgId, pHandle->subKey, pHandle);
415
        taosWUnLockLatch(&pTq->lock);
×
416
        taosMsleep(10);
×
417
        continue;
×
418
      }
419
      if (req.subType == TOPIC_SUB_TYPE__COLUMN && strcmp(req.qmsg, pHandle->execHandle.execCol.qmsg) != 0) {
910,513✔
420
        tqInfo("vgId:%d, topic:%s, subscribe qmsg changed from %s to %s, need recreate handle", pTq->pVnode->config.vgId,
750✔
421
               pHandle->subKey, pHandle->execHandle.execCol.qmsg, req.qmsg);
422
        // tqUnregisterPushHandle(pTq, pHandle);
423
        STqHandle handle = {0};
750✔
424
        ret = tqMetaCreateHandle(pTq, &req, &handle);
750✔
425
        if (ret < 0) {
750✔
426
          tqDestroySTqHandle(&handle);
×
427
          taosWUnLockLatch(&pTq->lock);
×
428
          goto end;
×
429
        }
430
        ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
750✔
431
        tqInfo("vgId:%d, reload subscribe req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
750✔
432
         req.oldConsumerId, req.newConsumerId);
433
      } else if (pHandle->consumerId == req.newConsumerId) {  // do nothing
909,013✔
434
        tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
×
435
      } else {
436
        tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
909,013✔
437
               req.newConsumerId);
438

439
        atomic_store_64(&pHandle->consumerId, req.newConsumerId);
909,013✔
440
        atomic_store_32(&pHandle->epoch, 0);
909,013✔
441
        // tqUnregisterPushHandle(pTq, pHandle);
442
        ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
909,013✔
443
      }
444
      taosWUnLockLatch(&pTq->lock);
906,583✔
445
      break;
908,228✔
446
    }
447
  }
448

449
end:
1,172,755✔
450
  if (req.schema.pSchema != NULL) {
1,172,739✔
451
    taosMemoryFree(req.schema.pSchema);
801,609✔
452
  }
453
  tDecoderClear(&dc);
1,170,330✔
454
  return ret;
1,170,675✔
455
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc