• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5071

17 May 2026 01:15AM UTC coverage: 63.054% (-10.3%) from 73.326%
#5071

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

238317 of 377957 relevant lines covered (63.05%)

130539817.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.03
/source/dnode/mnode/impl/src/mndDef.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#define _DEFAULT_SOURCE
16
#include "mndConsumer.h"
17
#include "mndDef.h"
18
#include "mndStream.h"
19
#include "taoserror.h"
20
#include "tunit.h"
21

22
/*
 * Serialize a stream object.
 *
 * Wire order: name, create request, mainSnodeId, userStopped, createTime,
 * updateTime, createUser, ownerId. tDecodeSStreamObj() consumes the fields in
 * the same order.
 *
 * Returns pEncoder->pos once the encode frame is closed; any failing step is
 * propagated through TAOS_CHECK_RETURN.
 */
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));

  // identity + original create request
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tSerializeSCMCreateStreamReqImpl(pEncoder, pObj->pCreate));

  // runtime state
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->mainSnodeId));
  TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->userStopped));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->createTime));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->updateTime));

  // trailing fields; the decoder treats them as optional
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->createUser));
  TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->ownerId));

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
38

39
/*
 * Deserialize a stream object previously written by tEncodeSStreamObj().
 *
 * pObj->pCreate is allocated here and filled by the create-request decoder:
 * the current decoder when sver == MND_STREAM_VER_NUMBER, otherwise the "Old"
 * decoder. createUser and ownerId are read only if the decoder is not already
 * at its end.
 *
 * Returns 0 on success. Decode failures are propagated by TAOS_CHECK_RETURN;
 * an allocation failure exits through _exit with terrno as the code.
 */
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
  int32_t code = 0;
  int32_t lino = 0;
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  pObj->pCreate = taosMemoryCalloc(1, sizeof(*pObj->pCreate));
  // Check the allocation result itself. The previous test was against pObj,
  // which has already been dereferenced above and so could never catch a
  // failed calloc.
  if (NULL == pObj->pCreate) {
    TAOS_CHECK_EXIT(terrno);
  }

  if (MND_STREAM_VER_NUMBER == sver) {
    TAOS_CHECK_RETURN(tDeserializeSCMCreateStreamReqImpl(pDecoder, pObj->pCreate));
  } else {
    TAOS_CHECK_RETURN(
      tDeserializeSCMCreateStreamReqImplOld(pDecoder, pObj->pCreate, 21));
  }

  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->mainSnodeId));
  TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pObj->userStopped));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->createTime));
  TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->updateTime));
  // Trailing fields may be absent from the payload.
  if (!tDecodeIsEnd(pDecoder)) {
    TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->createUser));
    TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->ownerId));
  }

_exit:

  tEndDecode(pDecoder);
  tDecoderClear(pDecoder);

  TAOS_RETURN(code);
}
73

74
/*
 * Release the create request owned by a stream object and reset the pointer.
 * The SStreamObj itself is not freed. A NULL pStream is ignored, matching the
 * other free/clear helpers in this file.
 */
void tFreeStreamObj(SStreamObj *pStream) {
  if (pStream == NULL) return;
  tFreeSCMCreateStreamReq(pStream->pCreate);
  taosMemoryFreeClear(pStream->pCreate);
}
236,460✔
78

79
void freeSMqConsumerEp(void* data) {
362,695✔
80
  if (data == NULL) return;
362,695✔
81
  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
362,695✔
82
  taosArrayDestroy(pConsumerEp->vgs);
362,695✔
83
  pConsumerEp->vgs = NULL;
362,695✔
84
  taosArrayDestroy(pConsumerEp->offsetRows);
362,695✔
85
  pConsumerEp->offsetRows = NULL;
362,695✔
86
}
87

88
static void *tDecodeSMqVgEp(const void *buf, int32_t *vgId, int8_t sver) {
×
89
  buf = taosDecodeFixedI32(buf, vgId);
×
90
  if (sver == 1) {
×
91
    uint64_t size = 0;
×
92
    buf = taosDecodeVariantU64(buf, &size);
×
93
    buf = POINTER_SHIFT(buf, size);
×
94
  }
95
  if (sver <= 3) {
×
96
    SEpSet epSet = {0};
×
97
    buf = taosDecodeSEpSet(buf, &epSet);
×
98
  }
99
  return (void *)buf;
×
100
}
101

102
// Element copier handed to taosArrayDup(): duplicates one topic-name string.
static void *topicNameDup(void *p) {
  char *topicName = (char *)p;
  return taosStrdup(topicName);
}
128,842✔
103

104
int32_t tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t updateType, char *topic, SCMSubscribeReq *subscribe,
522,519✔
105
                           SMqConsumerObj **ppConsumer) {
106
  int32_t         code = 0;
522,519✔
107
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
522,519✔
108
  if (pConsumer == NULL) {
522,519✔
109
    code = terrno;
×
110
    goto END;
×
111
  }
112

113
  pConsumer->consumerId = consumerId;
522,519✔
114
  (void)memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);
522,519✔
115

116
  pConsumer->epoch = 0;
522,519✔
117
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
522,519✔
118
  pConsumer->hbStatus = 0;
522,519✔
119
  pConsumer->pollStatus = 0;
522,519✔
120

121
  taosInitRWLatch(&pConsumer->lock);
522,519✔
122
  pConsumer->createTime = taosGetTimestampMs();
522,519✔
123
  pConsumer->updateType = updateType;
522,519✔
124

125
  if (updateType == CONSUMER_ADD_REB) {
522,519✔
126
    pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
131,201✔
127
    if (pConsumer->rebNewTopics == NULL) {
131,201✔
128
      code = terrno;
×
129
      goto END;
×
130
    }
131

132
    char *topicTmp = taosStrdup(topic);
131,201✔
133
    if (taosArrayPush(pConsumer->rebNewTopics, &topicTmp) == NULL) {
262,402✔
134
      code = terrno;
×
135
      goto END;
×
136
    }
137
  } else if (updateType == CONSUMER_REMOVE_REB) {
391,318✔
138
    pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
90,066✔
139
    if (pConsumer->rebRemovedTopics == NULL) {
90,066✔
140
      code = terrno;
×
141
      goto END;
×
142
    }
143
    char *topicTmp = taosStrdup(topic);
90,066✔
144
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topicTmp) == NULL) {
180,132✔
145
      code = terrno;
×
146
      goto END;
×
147
    }
148
  } else if (updateType == CONSUMER_INSERT_SUB) {
301,252✔
149
    tstrncpy(pConsumer->clientId, subscribe->clientId, tListLen(pConsumer->clientId));
126,408✔
150
    pConsumer->withTbName = subscribe->withTbName;
126,408✔
151
    pConsumer->autoCommit = subscribe->autoCommit;
126,408✔
152
    pConsumer->autoCommitInterval = subscribe->autoCommitInterval;
126,408✔
153
    pConsumer->resetOffsetCfg = subscribe->resetOffsetCfg;
126,408✔
154
    pConsumer->maxPollIntervalMs = subscribe->maxPollIntervalMs;
126,408✔
155
    pConsumer->sessionTimeoutMs = subscribe->sessionTimeoutMs;
126,408✔
156
    tstrncpy(pConsumer->user, subscribe->user, TSDB_USER_LEN);
126,408✔
157
    tstrncpy(pConsumer->fqdn, subscribe->fqdn, TSDB_FQDN_LEN);
126,408✔
158

159
    pConsumer->rebNewTopics = taosArrayDup(subscribe->topicNames, topicNameDup);
126,408✔
160
    if (pConsumer->rebNewTopics == NULL) {
126,408✔
161
      code = terrno;
×
162
      goto END;
×
163
    }
164
    pConsumer->assignedTopics = subscribe->topicNames;
126,408✔
165
    subscribe->topicNames = NULL;
126,408✔
166
  } else if (updateType == CONSUMER_UPDATE_SUB) {
174,844✔
167
    pConsumer->assignedTopics = subscribe->topicNames;
92,306✔
168
    subscribe->topicNames = NULL;
92,306✔
169
  }
170

171
  *ppConsumer = pConsumer;
522,519✔
172
  return 0;
522,519✔
173

174
END:
×
175
  tDeleteSMqConsumerObj(pConsumer);
×
176
  return code;
×
177
}
178

179
/*
 * Destroy the four topic lists owned by a consumer object (current,
 * rebalance-new, rebalance-removed, assigned). The object itself is not
 * freed and the list pointers are not reset. NULL is ignored.
 */
void tClearSMqConsumerObj(SMqConsumerObj *pConsumer) {
  if (pConsumer != NULL) {
    taosArrayDestroyP(pConsumer->currentTopics, NULL);
    taosArrayDestroyP(pConsumer->rebNewTopics, NULL);
    taosArrayDestroyP(pConsumer->rebRemovedTopics, NULL);
    taosArrayDestroyP(pConsumer->assignedTopics, NULL);
  }
}
186

187
/*
 * Destroy a heap-allocated consumer object: first its topic lists via
 * tClearSMqConsumerObj(), then the object's own storage.
 */
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  // release owned topic lists
  tClearSMqConsumerObj(pConsumer);
  // release the object itself
  taosMemoryFree(pConsumer);
}
1,488,836✔
191

192
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
1,288,818✔
193
  int32_t tlen = 0;
1,288,818✔
194
  int32_t sz;
195
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
1,288,818✔
196
  tlen += taosEncodeString(buf, pConsumer->clientId);
1,288,818✔
197
  tlen += taosEncodeString(buf, pConsumer->cgroup);
1,288,818✔
198
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
1,288,818✔
199
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
1,288,818✔
200
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
1,288,818✔
201

202
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
1,288,818✔
203
  tlen += taosEncodeFixedI64(buf, pConsumer->createTime);
1,288,818✔
204
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
1,288,818✔
205
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
1,288,818✔
206

207
  // current topics
208
  if (pConsumer->currentTopics) {
1,288,818✔
209
    sz = taosArrayGetSize(pConsumer->currentTopics);
254,050✔
210
    tlen += taosEncodeFixedI32(buf, sz);
254,050✔
211
    for (int32_t i = 0; i < sz; i++) {
405,354✔
212
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
151,304✔
213
      tlen += taosEncodeString(buf, topic);
151,304✔
214
    }
215
  } else {
216
    tlen += taosEncodeFixedI32(buf, 0);
1,034,768✔
217
  }
218

219
  // reb new topics
220
  if (pConsumer->rebNewTopics) {
1,288,818✔
221
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
943,610✔
222
    tlen += taosEncodeFixedI32(buf, sz);
943,610✔
223
    for (int32_t i = 0; i < sz; i++) {
1,555,432✔
224
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
611,822✔
225
      tlen += taosEncodeString(buf, topic);
611,822✔
226
    }
227
  } else {
228
    tlen += taosEncodeFixedI32(buf, 0);
345,208✔
229
  }
230

231
  // reb removed topics
232
  if (pConsumer->rebRemovedTopics) {
1,288,818✔
233
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
608,524✔
234
    tlen += taosEncodeFixedI32(buf, sz);
608,524✔
235
    for (int32_t i = 0; i < sz; i++) {
962,358✔
236
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
353,834✔
237
      tlen += taosEncodeString(buf, topic);
353,834✔
238
    }
239
  } else {
240
    tlen += taosEncodeFixedI32(buf, 0);
680,294✔
241
  }
242

243
  // lost topics
244
  if (pConsumer->assignedTopics) {
1,288,818✔
245
    sz = taosArrayGetSize(pConsumer->assignedTopics);
681,208✔
246
    tlen += taosEncodeFixedI32(buf, sz);
681,208✔
247
    for (int32_t i = 0; i < sz; i++) {
1,181,932✔
248
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
500,724✔
249
      tlen += taosEncodeString(buf, topic);
500,724✔
250
    }
251
  } else {
252
    tlen += taosEncodeFixedI32(buf, 0);
607,610✔
253
  }
254

255
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
1,288,818✔
256
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
1,288,818✔
257
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
1,288,818✔
258
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
1,288,818✔
259
  tlen += taosEncodeFixedI32(buf, pConsumer->maxPollIntervalMs);
1,288,818✔
260
  tlen += taosEncodeFixedI32(buf, pConsumer->sessionTimeoutMs);
1,288,818✔
261
  tlen += taosEncodeString(buf, pConsumer->user);
1,288,818✔
262
  tlen += taosEncodeString(buf, pConsumer->fqdn);
1,288,818✔
263
  return tlen;
1,288,818✔
264
}
265

266
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
520,071✔
267
  int32_t sz;
520,071✔
268
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
520,071✔
269
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
520,071✔
270
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
520,071✔
271
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
520,071✔
272
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
520,071✔
273
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
520,071✔
274

275
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
520,071✔
276
  if (sver <= 3) {
520,071✔
277
    SEpSet ep = {0};
×
278
    buf = taosDecodeSEpSet(buf, &ep);
×
279
  }
280
  buf = taosDecodeFixedI64(buf, &pConsumer->createTime);
520,071✔
281
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
520,071✔
282
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
1,040,142✔
283

284
  // current topics
285
  buf = taosDecodeFixedI32(buf, &sz);
520,071✔
286
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
520,071✔
287
  if (pConsumer->currentTopics == NULL) {
520,071✔
288
    return NULL;
×
289
  }
290
  for (int32_t i = 0; i < sz; i++) {
522,755✔
291
    char *topic;
2,684✔
292
    buf = taosDecodeString(buf, &topic);
2,684✔
293
    if (taosArrayPush(pConsumer->currentTopics, &topic) == NULL) {
5,368✔
294
      return NULL;
×
295
    }
296
  }
297

298
  // reb new topics
299
  buf = taosDecodeFixedI32(buf, &sz);
520,071✔
300
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
520,071✔
301
  for (int32_t i = 0; i < sz; i++) {
782,868✔
302
    char *topic;
262,797✔
303
    buf = taosDecodeString(buf, &topic);
262,797✔
304
    if (taosArrayPush(pConsumer->rebNewTopics, &topic) == NULL) {
525,594✔
305
      return NULL;
×
306
    }
307
  }
308

309
  // reb removed topics
310
  buf = taosDecodeFixedI32(buf, &sz);
520,071✔
311
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
520,071✔
312
  for (int32_t i = 0; i < sz; i++) {
696,988✔
313
    char *topic;
176,917✔
314
    buf = taosDecodeString(buf, &topic);
176,917✔
315
    if (taosArrayPush(pConsumer->rebRemovedTopics, &topic) == NULL) {
353,834✔
316
      return NULL;
×
317
    }
318
  }
319

320
  // reb removed topics
321
  buf = taosDecodeFixedI32(buf, &sz);
520,071✔
322
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
520,071✔
323
  for (int32_t i = 0; i < sz; i++) {
654,351✔
324
    char *topic;
134,280✔
325
    buf = taosDecodeString(buf, &topic);
134,280✔
326
    if (taosArrayPush(pConsumer->assignedTopics, &topic) == NULL) {
268,560✔
327
      return NULL;
×
328
    }
329
  }
330

331
  if (sver > 1) {
520,071✔
332
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
520,071✔
333
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
520,071✔
334
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
520,071✔
335
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
1,040,142✔
336
  }
337
  if (sver > 2) {
520,071✔
338
    buf = taosDecodeFixedI32(buf, &pConsumer->maxPollIntervalMs);
520,071✔
339
    buf = taosDecodeFixedI32(buf, &pConsumer->sessionTimeoutMs);
520,071✔
340
    buf = taosDecodeStringTo(buf, pConsumer->user);
520,071✔
341
    buf = taosDecodeStringTo(buf, pConsumer->fqdn);
1,040,142✔
342
  } else {
343
    pConsumer->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
×
344
    pConsumer->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
×
345
  }
346

347
  return (void *)buf;
520,071✔
348
}
349

350
/*
 * Serialize an array of OffsetRows.
 *
 * Layout: int32 entry count, then per entry vgId, rows, offset.type, a
 * type-dependent payload (uid + ts for the two snapshot types, version for
 * the log type, nothing otherwise), and finally ever.
 *
 * Returns the accumulated length reported by the encoders.
 */
int32_t tEncodeOffRows(void **buf, SArray *offsetRows) {
  int32_t nRows = taosArrayGetSize(offsetRows);
  int32_t total = taosEncodeFixedI32(buf, nRows);

  for (int32_t idx = 0; idx < nRows; ++idx) {
    OffsetRows *pRow = taosArrayGet(offsetRows, idx);

    total += taosEncodeFixedI32(buf, pRow->vgId);
    total += taosEncodeFixedI64(buf, pRow->rows);
    total += taosEncodeFixedI8(buf, pRow->offset.type);

    switch (pRow->offset.type) {
      case TMQ_OFFSET__SNAPSHOT_DATA:
      case TMQ_OFFSET__SNAPSHOT_META:
        total += taosEncodeFixedI64(buf, pRow->offset.uid);
        total += taosEncodeFixedI64(buf, pRow->offset.ts);
        break;
      case TMQ_OFFSET__LOG:
        total += taosEncodeFixedI64(buf, pRow->offset.version);
        break;
      default:
        // other offset types carry no payload
        break;
    }

    total += taosEncodeFixedI64(buf, pRow->ever);
  }

  return total;
}
372

373
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
422,462✔
374
  int32_t tlen = 0;
422,462✔
375
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
422,462✔
376
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
422,462✔
377
  tlen += taosEncodeFixedI32(buf, sz);
422,462✔
378
  for (int32_t i = 0; i < sz; i++) {
1,335,378✔
379
    int32_t* vgId = (int32_t*)taosArrayGet(pConsumerEp->vgs, i);
912,916✔
380
    if (vgId == NULL) continue;
912,916✔
381
    tlen += taosEncodeFixedI32(buf, *vgId);
1,825,832✔
382
  }
383

384
  return tlen + tEncodeOffRows(buf, pConsumerEp->offsetRows);
422,462✔
385
}
386

387
/*
 * Deserialize an array of OffsetRows written by tEncodeOffRows().
 *
 * *offsetRows is created only when the encoded count is positive; with a
 * count of zero it is left untouched. The trailing `ever` field is read only
 * for sver > 2.
 *
 * Returns the cursor after the array, or NULL when the array or one of its
 * slots cannot be allocated. An array created before a failure stays in
 * *offsetRows for the owner to release.
 */
void *tDecodeOffRows(const void *buf, SArray **offsetRows, int8_t sver) {
  int32_t szVgs = 0;
  buf = taosDecodeFixedI32(buf, &szVgs);
  if (szVgs > 0) {
    *offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
    if (NULL == *offsetRows) return NULL;
    for (int32_t j = 0; j < szVgs; ++j) {
      OffsetRows *offRows = taosArrayReserve(*offsetRows, 1);
      // The reserve result is checked the same way in tDecodeSMqConsumerEp();
      // without this a failed reserve is dereferenced right below.
      if (offRows == NULL) return NULL;
      buf = taosDecodeFixedI32(buf, &offRows->vgId);
      buf = taosDecodeFixedI64(buf, &offRows->rows);
      buf = taosDecodeFixedI8(buf, &offRows->offset.type);
      if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
        buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
      } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
        buf = taosDecodeFixedI64(buf, &offRows->offset.version);
      } else {
        // other offset types carry no payload
      }
      if (sver > 2) {
        buf = taosDecodeFixedI64(buf, &offRows->ever);
      }
    }
  }
  return (void *)buf;
}
413

414
/*
 * Deserialize one consumer endpoint written by tEncodeSMqConsumerEp().
 *
 * For sver <= 3 each vgroup entry uses the older layout handled by
 * tDecodeSMqVgEp(); newer payloads store a plain int32 id. Offset rows are
 * present only for sver > 1.
 *
 * Returns the cursor after the endpoint, or NULL if the vgroup array or one
 * of its slots cannot be allocated.
 */
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  int32_t nVgs = 0;

  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  buf = taosDecodeFixedI32(buf, &nVgs);

  pConsumerEp->vgs = taosArrayInit(nVgs, sizeof(int32_t));
  if (pConsumerEp->vgs == NULL) return NULL;

  for (int32_t idx = 0; idx < nVgs; idx++) {
    int32_t *pSlot = taosArrayReserve(pConsumerEp->vgs, 1);
    if (pSlot == NULL) return NULL;

    if (sver > 3) {
      buf = taosDecodeFixedI32(buf, pSlot);
    } else {
      buf = tDecodeSMqVgEp(buf, pSlot, sver);
    }
  }

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pConsumerEp->offsetRows, sver);
  }
  return (void *)buf;
}
439

440
/*
 * Allocate a subscribe object for `key` with an empty consumer hash and an
 * empty unassigned-vgroup array.
 *
 * On success *ppSub receives the object and 0 is returned. On failure the
 * partially built object is fully released, *ppSub is left untouched and the
 * code set by the MND_TMQ_* check macros is returned.
 */
int32_t tNewSubscribeObj(const char *key, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  int32_t          lino = 0;
  PRINT_LOG_START
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubObj);

  (void)memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubObj->lock);
  pSubObj->vgNum = 0;
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubObj->consumerHash);
  taosHashSetFreeFp(pSubObj->consumerHash, freeSMqConsumerEp);

  pSubObj->unassignedVgs = taosArrayInit(0, sizeof(int32_t));
  MND_TMQ_NULL_CHECK(pSubObj->unassignedVgs);
  *ppSub = pSubObj;
  pSubObj = NULL;  // ownership handed to the caller; cleanup below becomes a no-op

END:
  // tDeleteSubscribeObj() only releases the members, so the struct has to be
  // freed here as well or it leaks on every failure path. Both calls are
  // harmless when pSubObj is NULL (success, or the first allocation failed).
  tDeleteSubscribeObj(pSubObj);
  taosMemoryFree(pSubObj);
  PRINT_LOG_END
  return code;
}
464

465
/*
 * Create a copy of a subscribe object while holding the source's read latch.
 *
 * Scalar fields, key and dbName are copied; every consumer entry is re-created
 * with its own copy of the vgroup-id array (per-consumer offsetRows are not
 * copied); unassignedVgs and the object-level offsetRows are duplicated.
 *
 * *ppSub is assigned as soon as the new object is allocated, so after a
 * failure it refers to a partially built clone that the caller is expected to
 * release. Returns 0 on success or the code set by the MND_TMQ_* macros.
 */
int32_t tCloneSubscribeObj(SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) {
  int32_t          code = 0;
  int32_t          lino = 0;
  PRINT_LOG_START
  taosRLockLatch(&pSub->lock);

  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  MND_TMQ_NULL_CHECK(pSubNew);
  *ppSub = pSubNew;

  (void)memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);

  pSubNew->dbUid = pSub->dbUid;
  pSubNew->stbUid = pSub->stbUid;
  pSubNew->subType = pSub->subType;
  pSubNew->withMeta = pSub->withMeta;

  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  MND_TMQ_NULL_CHECK(pSubNew->consumerHash);
  taosHashSetFreeFp(pSubNew->consumerHash, freeSMqConsumerEp);

  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
  // NOTE(review): the early exits inside this loop leave the hash iteration
  // unfinished; confirm whether the iterator needs to be cancelled explicitly.
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
        .vgs = taosArrayDup(pConsumerEp->vgs, NULL),
    };
    // A failed duplicate must not be stored as an entry with no vgroups.
    MND_TMQ_CONDITION_CHECK(pConsumerEp->vgs == NULL || newEp.vgs != NULL, terrno);

    code = taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
    if (code != 0) {
      // The hash did not take the entry, so its array would otherwise leak.
      taosArrayDestroy(newEp.vgs);
    }
    MND_TMQ_RETURN_CHECK(code);
  }
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL);
  MND_TMQ_NULL_CHECK(pSubNew->unassignedVgs);

  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
  // A NULL result is only an error when the source actually had offset rows.
  MND_TMQ_CONDITION_CHECK(pSub->offsetRows == NULL || pSubNew->offsetRows != NULL, terrno);
  (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);

END:
  taosRUnLockLatch(&pSub->lock);
  return code;
}
511

512
/*
 * Release everything a subscribe object owns (consumer hash, unassigned
 * vgroup array, offset rows) and reset those pointers. The SMqSubscribeObj
 * storage itself is NOT freed here. NULL is ignored.
 */
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub != NULL) {
    taosHashCleanup(pSub->consumerHash);
    pSub->consumerHash = NULL;

    taosArrayDestroy(pSub->unassignedVgs);
    pSub->unassignedVgs = NULL;

    taosArrayDestroy(pSub->offsetRows);
    pSub->offsetRows = NULL;
  }
}
521

522
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
722,272✔
523
  int32_t tlen = 0;
722,272✔
524
  tlen += taosEncodeString(buf, pSub->key);
722,272✔
525
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
722,272✔
526
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
722,272✔
527
  tlen += taosEncodeFixedI8(buf, pSub->subType);
722,272✔
528
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
722,272✔
529
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
722,272✔
530

531
  void   *pIter = NULL;
722,272✔
532
  int32_t sz = taosHashGetSize(pSub->consumerHash);
722,272✔
533
  tlen += taosEncodeFixedI32(buf, sz);
722,272✔
534

535
  int32_t cnt = 0;
722,272✔
536
  while (1) {
422,462✔
537
    pIter = taosHashIterate(pSub->consumerHash, pIter);
1,144,734✔
538
    if (pIter == NULL) break;
1,144,734✔
539
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
422,462✔
540
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
422,462✔
541
    cnt++;
422,462✔
542
  }
543
  if (cnt != sz) return -1;
722,272✔
544
  int32_t len = taosArrayGetSize(pSub->unassignedVgs);
722,272✔
545
  tlen += taosEncodeFixedI32(buf, len);
722,272✔
546
  for (int32_t i = 0; i < len; i++) {
1,733,118✔
547
    int32_t* vgId = (int32_t*)taosArrayGet(pSub->unassignedVgs, i);
1,010,846✔
548
    if (vgId == NULL) continue;
1,010,846✔
549
    tlen += taosEncodeFixedI32(buf, *vgId);
2,021,692✔
550
  }
551
  tlen += taosEncodeString(buf, pSub->dbName);
722,272✔
552

553
  tlen += tEncodeOffRows(buf, pSub->offsetRows);
722,272✔
554
  return tlen;
722,272✔
555
}
556

557
/*
 * Deserialize a subscribe object written by tEncodeSubscribeObj().
 *
 * The consumer hash and the unassigned-vgroup array are created here. For
 * sver <= 3 the unassigned vgroup entries use the older layout handled by
 * tDecodeSMqVgEp(); offset rows are present only for sver > 1.
 *
 * Returns the cursor after the object, or NULL on allocation/insert/decode
 * failure. Containers created before a failure remain attached to pSub.
 */
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) {
    return NULL;
  }
  taosHashSetFreeFp(pSub->consumerHash, freeSMqConsumerEp);

  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    // tDecodeSMqConsumerEp() reports failure with a NULL cursor. Stop here
    // rather than decode from NULL, and release whatever the entry allocated,
    // since the hash has not taken ownership of it.
    if (buf == NULL ||
        taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp)) !=
            0) {
      freeSMqConsumerEp(&consumerEp);
      return NULL;
    }
  }

  int32_t len = 0;
  buf = taosDecodeFixedI32(buf, &len);
  pSub->unassignedVgs = taosArrayInit(len, sizeof(int32_t));
  if (pSub->unassignedVgs == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < len; i++) {
    int32_t* vgId = taosArrayReserve(pSub->unassignedVgs, 1);
    if (vgId == NULL) {
      return NULL;
    }
    if (sver <= 3) {
      buf = tDecodeSMqVgEp(buf, vgId, sver);
    } else {
      buf = taosDecodeFixedI32(buf, vgId);
    }
  }

  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    buf = tDecodeOffRows(buf, &pSub->offsetRows, sver);
  }
  return (void *)buf;
}
608

609
/*
 * Initialise a config object from a config item: copies the name and dtype,
 * then the value field that matches the dtype. String-like values are
 * duplicated, so pObj owns its own copy of the text.
 *
 * pObj is caller-owned storage and is never released here; the caller decides
 * how to dispose of it when an error is returned.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the string value
 * cannot be duplicated.
 */
int32_t mndInitConfigObj(SConfigItem *pItem, SConfigObj *pObj) {
  tstrncpy(pObj->name, pItem->name, CFG_NAME_MAX_LEN);
  pObj->dtype = pItem->dtype;
  switch (pItem->dtype) {
    case CFG_DTYPE_NONE:
      break;
    case CFG_DTYPE_BOOL:
      pObj->bval = pItem->bval;
      break;
    case CFG_DTYPE_INT32:
      pObj->i32 = pItem->i32;
      break;
    case CFG_DTYPE_INT64:
      pObj->i64 = pItem->i64;
      break;
    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      pObj->fval = pItem->fval;
      break;
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      pObj->str = taosStrdup(pItem->str);
      if (pObj->str == NULL) {
        // Do not free pObj: this function did not allocate it, and it may not
        // be heap memory at all.
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      break;
    default:
      break;
  }
  return TSDB_CODE_SUCCESS;
}
642

643
/*
 * Parse `value` according to pObjNew->dtype and store it in the matching
 * field of pObjNew. `name` is not used by this function.
 *
 * Booleans are true when the text equals "true" (case-insensitive) or parses
 * as a base-10 integer greater than zero. String-like types store a duplicate
 * of `value`.
 *
 * Returns 0 on success, the parser's error for numeric types, terrno when the
 * string cannot be duplicated, or TSDB_CODE_INVALID_CFG for an unknown dtype.
 */
int32_t mndUpdateObj(SConfigObj *pObjNew, const char *name, char *value) {
  int32_t code = 0;

  switch (pObjNew->dtype) {
    case CFG_DTYPE_NONE:
      break;

    case CFG_DTYPE_BOOL: {
      bool isTrueWord = (strcasecmp(value, "true") == 0);
      bool isPositive = (taosStr2Int32(value, NULL, 10) > 0);
      pObjNew->bval = isTrueWord || isPositive;
      break;
    }

    case CFG_DTYPE_INT32: {
      int32_t parsed32;
      TAOS_CHECK_RETURN(taosStrHumanToInt32(value, &parsed32));
      pObjNew->i32 = parsed32;
      break;
    }

    case CFG_DTYPE_INT64: {
      int64_t parsed64;
      TAOS_CHECK_RETURN(taosStrHumanToInt64(value, &parsed64));
      pObjNew->i64 = parsed64;
      break;
    }

    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE: {
      float parsedReal = 0;
      TAOS_CHECK_RETURN(parseCfgReal(value, &parsedReal));
      pObjNew->fval = parsedReal;
      break;
    }

    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE: {
      pObjNew->str = taosStrdup(value);
      if (pObjNew->str == NULL) {
        return terrno;
      }
      break;
    }

    default:
      code = TSDB_CODE_INVALID_CFG;
      break;
  }

  return code;
}
696

697
SConfigObj mndInitConfigVersion() {
310,748✔
698
  SConfigObj obj;
309,940✔
699
  memset(&obj, 0, sizeof(SConfigObj));
310,748✔
700

701
  tstrncpy(obj.name, "tsmmConfigVersion", CFG_NAME_MAX_LEN);
310,748✔
702
  obj.dtype = CFG_DTYPE_INT32;
310,748✔
703
  obj.i32 = 0;
310,748✔
704
  return obj;
310,748✔
705
}
706

707
/*
 * Serialize a config object: name, dtype (as int32), then the value in the
 * representation that matches the dtype. A NULL string value is written as an
 * empty string; dtypes without a case here write no value.
 *
 * Returns pEncoder->pos once the encode frame is closed; encoder failures are
 * propagated through TAOS_CHECK_RETURN.
 */
int32_t tEncodeSConfigObj(SEncoder *pEncoder, const SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartEncode(pEncoder));
  TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pObj->name));
  TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->dtype));

  switch (pObj->dtype) {
    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pObj->bval));
      break;

    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pObj->i32));
      break;

    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pObj->i64));
      break;

    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tEncodeFloat(pEncoder, pObj->fval));
      break;

    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE: {
      const char *text = (pObj->str != NULL) ? pObj->str : "";
      TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, text));
      break;
    }

    default:
      break;
  }

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
743

744
/*
 * Deserialize a config object written by tEncodeSConfigObj(): name, dtype,
 * then the value field selected by the dtype. String-like values are decoded
 * with tDecodeCStrAlloc() into pObj->str.
 *
 * Returns TSDB_CODE_SUCCESS; decoder failures are propagated through
 * TAOS_CHECK_RETURN.
 */
int32_t tDecodeSConfigObj(SDecoder *pDecoder, SConfigObj *pObj) {
  TAOS_CHECK_RETURN(tStartDecode(pDecoder));

  // header: name + data type tag
  TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pObj->name));
  TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pObj->dtype));

  // value, selected by the tag just read
  switch (pObj->dtype) {
    case CFG_DTYPE_NONE:
      break;

    case CFG_DTYPE_BOOL:
      TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pObj->bval));
      break;

    case CFG_DTYPE_INT32:
      TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pObj->i32));
      break;

    case CFG_DTYPE_INT64:
      TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pObj->i64));
      break;

    case CFG_DTYPE_FLOAT:
    case CFG_DTYPE_DOUBLE:
      TAOS_CHECK_RETURN(tDecodeFloat(pDecoder, &pObj->fval));
      break;

    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      TAOS_CHECK_RETURN(tDecodeCStrAlloc(pDecoder, &pObj->str));
      break;
  }

  tEndDecode(pDecoder);
  TAOS_RETURN(TSDB_CODE_SUCCESS);
}
775

776
/**
 * Release the heap string held by a config object, if its dtype is one of
 * the string-backed kinds (STRING, DIR, LOCALE, CHARSET, TIMEZONE).
 *
 * Only obj->str is freed; the SConfigObj itself is not. For every other
 * dtype the function does nothing. A NULL obj is accepted and ignored.
 *
 * @param obj  config object whose string payload should be released
 */
void tFreeSConfigObj(SConfigObj *obj) {
  if (obj == NULL) {
    return;
  }

  switch (obj->dtype) {
    case CFG_DTYPE_STRING:
    case CFG_DTYPE_DIR:
    case CFG_DTYPE_LOCALE:
    case CFG_DTYPE_CHARSET:
    case CFG_DTYPE_TIMEZONE:
      taosMemoryFree(obj->str);
      // Clear the pointer so a repeated call on the same object (or a later
      // read of obj->str) does not touch freed memory.
      obj->str = NULL;
      break;
    default:
      break;
  }
}
785

786
// SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
787
//   SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
788
//   if (pEntryNew == NULL) return NULL;
789
//   pEntryNew->epoch = pEntry->epoch;
790
//   pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
791
//   return pEntryNew;
792
// }
793
//
794
// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
795
//   taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
796
// }
797

798
// int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
799
//   int32_t tlen = 0;
800
//   tlen += taosEncodeFixedI32(buf, pEntry->epoch);
801
//   tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
802
//   return tlen;
803
// }
804
//
805
// void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
806
//   buf = taosDecodeFixedI32(buf, &pEntry->epoch);
807
//   buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
808
//   return (void *)buf;
809
// }
810

811
// SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
812
//   SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
813
//   if (pLogNew == NULL) return pLogNew;
814
//   memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
815
//   pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
816
//   return pLogNew;
817
// }
818
//
819
// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
820
//   taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
821
// }
822

823
// int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
824
//   int32_t tlen = 0;
825
//   tlen += taosEncodeString(buf, pLog->key);
826
//   tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
827
//   return tlen;
828
// }
829
//
830
// void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
831
//   buf = taosDecodeStringTo(buf, pLog->key);
832
//   buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
833
//   return (void *)buf;
834
// }
835
//
836
// int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
837
//   int32_t tlen = 0;
838
//   tlen += taosEncodeString(buf, pOffset->key);
839
//   tlen += taosEncodeFixedI64(buf, pOffset->offset);
840
//   return tlen;
841
// }
842
//
843
// void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
844
//   buf = taosDecodeStringTo(buf, pOffset->key);
845
//   buf = taosDecodeFixedI64(buf, &pOffset->offset);
846
//   return buf;
847
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc