• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5084

17 May 2026 01:15AM UTC coverage: 73.366% (-0.01%) from 73.377%
#5084

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281574 of 383795 relevant lines covered (73.37%)

136040085.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.12
/source/dnode/vnode/src/tq/tqScan.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "taoserror.h"
17
#include "tarray.h"
18
#include "tdef.h"
19
#include "thash.h"
20
#include "tmsg.h"
21
#include "tpriv.h"
22
#include "tq.h"
23

24
static int32_t tqRetrieveCols(STqReader *pReader, SSDataBlock *pRes, SHashObj* pCol2SlotId);
25
static int32_t tqAddTableListForStbSub(STqHandle* pTqHandle, STQ* pTq, const SArray* tbUidList, int64_t version);
26
static void    tqAlterTagForStbSub(SVnode *pVnode, const SArray* tbUidList, const SArray* tags, const SArray* tagsArray, STqHandle* pTqHandle, int64_t version);
27
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
28
                                 const SMqMetaRsp* pRsp, int32_t vgId);
29
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
30
                                      const SMqBatchMetaRsp* pRsp, int32_t vgId);
31
                                      
32
static void tqProcessCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STQ* pTq, STqHandle* pHandle, int64_t* realTbSuid, int64_t tbSuid) {
69,068✔
33
  int32_t code = 0;
69,068✔
34
  int32_t lino = 0;
69,068✔
35
  SVCreateTbReq* pCreateReq = NULL;
69,068✔
36
  SVCreateTbBatchReq reqNew = {0};
69,068✔
37
  void* buf = NULL;
69,068✔
38
  SArray *tbUids = NULL;
69,068✔
39
  SVCreateTbBatchReq req = {0};
69,068✔
40
  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
69,068✔
41
  TSDB_CHECK_CODE(code, lino, end);
69,068✔
42

43
  STqExecHandle* pExec = &pHandle->execHandle;
69,068✔
44
  STqReader* pReader = pExec->pTqReader;
69,068✔
45

46
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
69,068✔
47
  TSDB_CHECK_NULL(tbUids, code, lino, end, terrno);
69,068✔
48
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
163,753✔
49
    pCreateReq = req.pReqs + iReq;
94,685✔
50
    if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) && pCreateReq->ctb.suid == tbSuid) {
94,685✔
51
      TSDB_CHECK_NULL(taosArrayPush(tbUids, &pCreateReq->uid), code, lino, end, terrno);
168,170✔
52
    }
53
  }
54
  TSDB_CHECK_CONDITION(taosArrayGetSize(tbUids) != 0, code, lino, end, TSDB_CODE_SUCCESS);
69,068✔
55

56
  taosWLockLatch(&pTq->lock);
59,788✔
57
  tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUids);
59,788✔
58
  code = tqAddTableListForStbSub(pHandle, pTq, tbUids, pHead->version);
59,788✔
59
  taosWUnLockLatch(&pTq->lock);
59,788✔
60
  TSDB_CHECK_CODE(code, lino, end);
59,788✔
61

62
  reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
59,788✔
63
  TSDB_CHECK_NULL(reqNew.pArray, code, lino, end, terrno);
59,788✔
64
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
145,193✔
65
    pCreateReq = req.pReqs + iReq;
85,405✔
66
    if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) &&
85,405✔
67
        pCreateReq->ctb.suid == tbSuid &&
169,490✔
68
        taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
84,085✔
69
      TSDB_CHECK_NULL(taosArrayPush(reqNew.pArray, pCreateReq), code, lino, end, terrno);
135,632✔
70
      reqNew.nReqs++;
67,816✔
71
    }
72
  }
73

74
  TSDB_CHECK_CONDITION(reqNew.nReqs != 0, code, lino, end, TSDB_CODE_SUCCESS);
59,788✔
75
  *realTbSuid = tbSuid;
52,806✔
76

77
  int     tlen = 0;
52,806✔
78
  tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
52,806✔
79
  TSDB_CHECK_CODE(code, lino, end);
52,806✔
80

81
  buf = taosMemoryMalloc(tlen);
52,806✔
82
  TSDB_CHECK_NULL(buf, code, lino, end, terrno);
52,806✔
83

84
  SEncoder coderNew = {0};
52,806✔
85
  tEncoderInit(&coderNew, buf, tlen);
52,806✔
86
  code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
52,806✔
87
  tEncoderClear(&coderNew);
52,806✔
88
  TSDB_CHECK_CODE(code, lino, end);
52,806✔
89

90
  if (tlen + sizeof(SMsgHead) > pHead->bodyLen) {
52,806✔
91
    tqError("vgId:%d, %s failed at %s:%d since buffer overflow", TD_VID(pTq->pVnode), __func__, __FILE__, __LINE__);
×
92
  } else {
93
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
52,806✔
94
    pHead->bodyLen = tlen + sizeof(SMsgHead);
52,806✔
95
  }
96

97
end:
69,068✔
98
  taosMemoryFree(buf);
69,068✔
99
  taosArrayDestroy(reqNew.pArray);
69,068✔
100
  tDeleteSVCreateTbBatchReq(&req);
69,068✔
101
  taosArrayDestroy(tbUids);
69,068✔
102
  if (code < 0) {
69,068✔
103
    tqError("tqProcessCreateTbMsg failed, code:%d, line:%d", code, lino);
×
104
  }
105
}
69,068✔
106

107
static int32_t tqGetUidSuid(SMeta* pMeta, const char* tbName, int64_t* uid, int64_t* suid){
14,461✔
108
  SMetaReader mr = {0};
14,461✔
109

110
  metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
14,461✔
111
  int32_t code = metaGetTableEntryByName(&mr, tbName);
14,461✔
112
  if (code == 0) {
14,461✔
113
    *uid = mr.me.uid;
12,999✔
114
    *suid = mr.me.ctbEntry.suid;
12,999✔
115
  } else {
116
    tqError("tqGetUidSuid failed at %s:%d since table %s not found, code:%d", __FILE__, __LINE__, tbName, code);
1,462✔
117
  }
118

119
  metaReaderClear(&mr);
14,461✔
120
  return code;
14,461✔
121
}
122

123
// Cache entry for uid lookup results, avoiding duplicate tqGetUidSuid calls
124
typedef struct {
125
  int64_t uid;
126
  bool    valid;   // true if tqGetUidSuid succeeded and suid matches tbSuid
127
} SAlterTagUidCache;
128

129
static void tqAlterMultiTag(SVAlterTbReq* req, SWalCont* pHead, STQ* pTq, STqHandle* pHandle, int64_t* realTbSuid, int64_t tbSuid) {
6,943✔
130
  int32_t lino = 0;
6,943✔
131
  int32_t code = 0;
6,943✔
132
  SVAlterTbReq reqNew = {0};
6,943✔
133
  SArray* uidList = NULL;
6,943✔
134
  SArray* tagListArray = NULL;
6,943✔
135
  SAlterTagUidCache* uidCache = NULL;
6,943✔
136
  void* buf = NULL;
6,943✔
137

138
  STqExecHandle* pExec = &pHandle->execHandle;
6,943✔
139
  STqReader* pReader = pExec->pTqReader;
6,943✔
140

141
  int32_t nTables = taosArrayGetSize(req->tables);
6,943✔
142
  uidList = taosArrayInit(nTables, sizeof(tb_uid_t));
6,943✔
143
  TSDB_CHECK_NULL(uidList, code, lino, end, terrno);
6,943✔
144

145
  tagListArray = taosArrayInit(nTables, sizeof(void*));
6,943✔
146
  TSDB_CHECK_NULL(tagListArray, code, lino, end, terrno);
6,943✔
147

148
  // Cache uid lookup results for reuse in the second pass
149
  uidCache = taosMemoryCalloc(nTables, sizeof(SAlterTagUidCache));
6,943✔
150
  TSDB_CHECK_NULL(uidCache, code, lino, end, terrno);
6,943✔
151

152
  // First pass: resolve uid/suid once per table, collect for tag update notification
153
  for (int32_t i = 0; i < nTables; i++) {
16,508✔
154
    SUpdateTableTagVal *pTable = taosArrayGet(req->tables, i);
9,565✔
155
    if (pTable == NULL || pTable->tbName == NULL) {
9,565✔
156
      continue;
×
157
    }
158
    int64_t uid = 0;
9,565✔
159
    int64_t suid = 0;
9,565✔
160
    int32_t ret = tqGetUidSuid(pTq->pVnode->pMeta, pTable->tbName, &uid, &suid);
9,565✔
161
    if (ret != 0) {
9,565✔
162
      tqError("vgId:%d, %s failed at %s:%d since table %s not found", TD_VID(pTq->pVnode), __func__, __FILE__, __LINE__, pTable->tbName);
1,462✔
163
      continue;
1,462✔
164
    }
165
    if (suid != tbSuid) continue;
8,103✔
166

167
    uidCache[i].uid = uid;
8,103✔
168
    uidCache[i].valid = true;
8,103✔
169

170
    if (taosArrayPush(uidList, &uid) == NULL) {
8,103✔
171
      tqError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pTq->pVnode), __func__, __FILE__, __LINE__, tstrerror(terrno));
×
172
      continue;
×
173
    }
174
    if (taosArrayPush(tagListArray, &pTable->tags) == NULL){
16,206✔
175
      void* popRet = taosArrayPop(uidList);  // keep uidList and tagListArray in sync
×
176
      tqError("vgId:%d, %s failed at %s:%d since %s, ret:%p", TD_VID(pTq->pVnode), __func__, __FILE__, __LINE__, tstrerror(terrno), popRet);
×
177
      continue;
×
178
    }
179
  }
180

181
  // tqAlterTagForStbSub may modify pReader->tbIdHash, so must run before the second pass
182
  if (taosArrayGetSize(uidList) > 0) {
6,943✔
183
    tqAlterTagForStbSub(pTq->pVnode, uidList, NULL, tagListArray, pHandle, pHead->version);
6,141✔
184
  }
185

186
  // Build filtered message
187
  reqNew.action = req->action;
6,943✔
188
  reqNew.tbName = req->tbName;
6,943✔
189
  reqNew.tables = taosArrayInit(nTables, sizeof(SUpdateTableTagVal));
6,943✔
190
  TSDB_CHECK_NULL(reqNew.tables, code, lino, end, terrno);
6,943✔
191

192
  // Second pass: filter subscribed tables using cached uid (no repeated tqGetUidSuid calls)
193
  for (int32_t i = 0; i < nTables; i++) {
16,508✔
194
    if (!uidCache[i].valid) continue;
9,565✔
195
    SUpdateTableTagVal* pTable = taosArrayGet(req->tables, i);
8,103✔
196
    if (pTable == NULL) continue;
8,103✔
197

198
    if (taosHashGet(pReader->tbIdHash, &uidCache[i].uid, sizeof(int64_t)) != NULL) {
8,103✔
199
      TSDB_CHECK_NULL(taosArrayPush(reqNew.tables, pTable), code, lino, end, terrno);
12,398✔
200
    }
201
  }
202

203
  TSDB_CHECK_CONDITION(taosArrayGetSize(reqNew.tables) != 0, code, lino, end, TSDB_CODE_SUCCESS);
6,943✔
204

205
  *realTbSuid = tbSuid;
4,975✔
206

207
  // Encode filtered message
208
  int tlen = 0;
4,975✔
209
  tEncodeSize(tEncodeSVAlterTbReq, &reqNew, tlen, code);
4,975✔
210
  TSDB_CHECK_CODE(code, lino, end);
4,975✔
211

212
  buf = taosMemoryMalloc(tlen);
4,975✔
213
  TSDB_CHECK_NULL(buf, code, lino, end, terrno);
4,975✔
214

215
  SEncoder coderNew = {0};
4,975✔
216
  tEncoderInit(&coderNew, buf, tlen);
4,975✔
217
  code = tEncodeSVAlterTbReq(&coderNew, &reqNew);
4,975✔
218
  tEncoderClear(&coderNew);
4,975✔
219
  TSDB_CHECK_CODE(code, lino, end);
4,975✔
220

221
  if (tlen + sizeof(SMsgHead) > pHead->bodyLen) {
4,975✔
222
    tqError("vgId:%d, %s failed at %s:%d since buffer overflow", TD_VID(pTq->pVnode), __func__, __FILE__, __LINE__);
3,577✔
223
  } else {
224
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
1,398✔
225
    pHead->bodyLen = tlen + sizeof(SMsgHead);
1,398✔
226
  }
227

228
end:
6,943✔
229
  taosArrayDestroy(uidList);
6,943✔
230
  taosArrayDestroy(tagListArray);
6,943✔
231
  taosMemoryFree(uidCache);
6,943✔
232
  taosMemoryFree(buf);
6,943✔
233
  taosArrayDestroy(reqNew.tables);
6,943✔
234
  if (code != 0) {
6,943✔
235
    tqError("%s failed, code:%d, line:%d", __func__, code, lino);
×
236
  }
237
}
6,943✔
238

239
static void tqProcessAlterTbMsg(SDecoder* dcoder, SWalCont* pHead, STQ* pTq, STqHandle* pHandle, int64_t* realTbSuid, int64_t tbSuid) {
13,975✔
240
  SVAlterTbReq req = {0};
13,975✔
241
  SArray* uidList = NULL;
13,975✔
242
  int32_t lino = 0;
13,975✔
243
  int32_t code = tDecodeSVAlterTbReq(dcoder, &req);
13,975✔
244
  TSDB_CHECK_CODE(code, lino, end);
13,975✔
245

246
  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
13,975✔
247
    tqAlterMultiTag(&req, pHead, pTq, pHandle, realTbSuid, tbSuid);
6,943✔
248
  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
7,032✔
249
    ETableType tbType = 0;
2,136✔
250
    uint64_t suid = 0;
2,136✔
251
    STREAM_CHECK_RET_GOTO(metaGetTableTypeSuidByName(pTq->pVnode, req.tbName, &tbType, &suid));
2,136✔
252
    if (tbType != TSDB_SUPER_TABLE) {
2,136✔
253
      tqError("%s failed at line:%d since table %s is not super table, code:%d", __func__, lino, req.tbName, code);
×
254
      goto end;
×
255
    }
256
    SNode* pTagCond = getTagCondNodeForStableTmq(pHandle->execHandle.execTb.node);
2,136✔
257
    if (pTagCond != NULL) {
2,136✔
258
      uidList = taosArrayInit(8, sizeof(uint64_t));
660✔
259
      STREAM_CHECK_NULL_GOTO(uidList, terrno);
660✔
260
      STREAM_CHECK_RET_GOTO(vnodeGetCtbIdList(pTq->pVnode, suid, uidList));
660✔
261
      tqAlterTagForStbSub(pTq->pVnode, uidList, req.pMultiTag, NULL, pHandle, pHead->version);
660✔
262
    }
263
    *realTbSuid = suid;
2,136✔
264
  } else {
265
    int64_t uid = 0;
4,896✔
266
    int64_t suid = 0;
4,896✔
267
    code = tqGetUidSuid(pTq->pVnode->pMeta, req.tbName, &uid, &suid);
4,896✔
268
    TSDB_CHECK_CODE(code, lino, end);
4,896✔
269

270
    STqExecHandle* pExec = &pHandle->execHandle;
4,896✔
271
    STqReader* pReader = pExec->pTqReader;
4,896✔
272
    if (taosHashGet(pReader->tbIdHash, &uid, sizeof(int64_t)) != NULL) {
4,896✔
273
      *realTbSuid = suid;
1,632✔
274
    }
275
  }
276

277
end:
13,975✔
278
  if (code != 0) {
13,975✔
279
    tqError("%s failed at line:%d, code:%s, table:%s", __func__, lino, tstrerror(code), req.tbName);
×
280
  }
281
  destroyAlterTbReq(&req);
13,975✔
282
  taosArrayDestroy(uidList);
13,975✔
283
} 
13,975✔
284

285
static void tqProcessDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqHandle* pHandle, int64_t* realTbSuid, int64_t tbSuid) {
1,792✔
286
  SVDropTbBatchReq req = {0};
1,792✔
287
  SVDropTbBatchReq reqNew = {0};
1,792✔
288
  void* buf = NULL;
1,792✔
289
  int32_t lino = 0;
1,792✔
290
  int32_t code = tDecodeSVDropTbBatchReq(dcoder, &req);
1,792✔
291
  TSDB_CHECK_CODE(code, lino, end);
1,792✔
292

293
  STqExecHandle* pExec = &pHandle->execHandle;
1,792✔
294
  STqReader* pReader = pExec->pTqReader;
1,792✔
295

296
  reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
1,792✔
297
  TSDB_CHECK_NULL(reqNew.pArray, code, lino, end, terrno);
1,792✔
298
  SVDropTbReq* pDropReq = NULL;
1,792✔
299
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
4,574✔
300
    pDropReq = req.pReqs + iReq;
2,782✔
301
    if (pDropReq->suid == tbSuid && taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
2,782✔
302
      reqNew.nReqs++;
2,051✔
303
      TSDB_CHECK_NULL(taosArrayPush(reqNew.pArray, pDropReq), code, lino, end, terrno);
4,102✔
304
    }
305
  }
306

307
  TSDB_CHECK_CONDITION(taosArrayGetSize(reqNew.pArray) != 0, code, lino, end, TSDB_CODE_SUCCESS);
1,792✔
308

309
  *realTbSuid = tbSuid;
1,391✔
310
  int     tlen = 0;
1,391✔
311
  tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
1,391✔
312
  TSDB_CHECK_CODE(code, lino, end);
1,391✔
313

314
  buf = taosMemoryMalloc(tlen);
1,391✔
315
  TSDB_CHECK_NULL(buf, code, lino, end, terrno);
1,391✔
316

317
  SEncoder coderNew = {0};
1,391✔
318
  tEncoderInit(&coderNew, buf, tlen);
1,391✔
319
  code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
1,391✔
320
  tEncoderClear(&coderNew);
1,391✔
321
  TSDB_CHECK_CODE(code, lino, end);
1,391✔
322

323
  if (tlen + sizeof(SMsgHead) > pHead->bodyLen) {
1,391✔
324
    tqError("%s failed at %s:%d since buffer overflow", __func__, __FILE__, __LINE__);
×
325
  } else {
326
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
1,391✔
327
    pHead->bodyLen = tlen + sizeof(SMsgHead);
1,391✔
328
  }
329

330
end:
1,792✔
331
  taosMemoryFree(buf);
1,792✔
332
  taosArrayDestroy(reqNew.pArray);
1,792✔
333
  if (code != 0) {
1,792✔
334
    tqError("%s failed, code:%d, line:%d", __func__, code, lino);
×
335
  }
336
}
1,792✔
337

338
static bool tqProcessMetaForStbSub(STQ* pTq, STqHandle* pHandle, SWalCont* pHead) {
309,496✔
339
  int32_t code = 0;
309,496✔
340
  int32_t lino = 0;
309,496✔
341
  if (pHandle == NULL || pHead == NULL) {
309,496✔
342
    return false;
×
343
  }
344
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
309,826✔
345
    return true;
200,448✔
346
  }
347

348
  int16_t msgType = pHead->msgType;
109,048✔
349
  char*   body = pHead->body;
109,048✔
350
  int32_t bodyLen = pHead->bodyLen;
109,048✔
351

352
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
109,048✔
353
  int64_t  realTbSuid = 0;
109,048✔
354
  SDecoder dcoder = {0};
109,048✔
355
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
109,048✔
356
  int32_t  len = bodyLen - sizeof(SMsgHead);
109,048✔
357
  tDecoderInit(&dcoder, data, len);
109,048✔
358

359
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
133,261✔
360
    SVCreateStbReq req = {0};
24,213✔
361
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
24,213✔
362
      goto end;
×
363
    }
364
    realTbSuid = req.suid;
24,213✔
365
  } else if (msgType == TDMT_VND_DROP_STB) {
84,835✔
366
    SVDropStbReq req = {0};
×
367
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
368
      goto end;
×
369
    }
370
    realTbSuid = req.suid;
×
371
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
84,835✔
372
    tqProcessCreateTbMsg(&dcoder, pHead, pTq, pHandle, &realTbSuid, tbSuid);
69,068✔
373
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
15,767✔
374
    tqProcessAlterTbMsg(&dcoder, pHead, pTq, pHandle, &realTbSuid, tbSuid);
13,975✔
375
  } else if (msgType == TDMT_VND_DROP_TABLE) {
1,792✔
376
    tqProcessDropTbMsg(&dcoder, pHead, pHandle, &realTbSuid, tbSuid);
1,792✔
377
  } else if (msgType == TDMT_VND_DELETE) {
×
378
    SDeleteRes req = {0};
×
379
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
380
      goto end;
×
381
    }
382
    realTbSuid = req.suid;
×
383
  }
384

385
end:
109,048✔
386
  tDecoderClear(&dcoder);
109,048✔
387
  bool tmp = tbSuid == realTbSuid;
109,048✔
388
  if (pHandle->fetchMeta == ONLY_DATA){
109,048✔
389
    tmp = false;
77,376✔
390
  }
391
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
109,048✔
392
  return tmp;
109,048✔
393
}
394

395
static int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
213,758,180✔
396
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
213,758,180✔
397
    return -1;
×
398
  }
399
  int32_t code = -1;
213,817,292✔
400
  int32_t vgId = TD_VID(pTq->pVnode);
213,817,292✔
401
  int64_t id = pHandle->pWalReader->readerId;
213,818,507✔
402

403
  int64_t offset = *fetchOffset;
213,823,988✔
404
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
213,819,131✔
405
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
213,800,198✔
406
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
213,805,450✔
407

408
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
213,792,663✔
409
          ", 0x%" PRIx64,
410
          vgId, offset, lastVer, committedVer, appliedVer, id);
411

412
  while (offset <= appliedVer) {
217,332,998✔
413
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
56,183,600✔
414
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
415
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
416
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
417
      goto END;
×
418
    }
419

420
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
56,183,211✔
421
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
422

423
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
56,182,856✔
424
      code = walFetchBody(pHandle->pWalReader);
52,468,430✔
425
      goto END;
52,467,365✔
426
    } else {
427
      if (pHandle->fetchMeta != ONLY_DATA || pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
3,715,170✔
428
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
794,317✔
429
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
794,317✔
430
          code = walFetchBody(pHandle->pWalReader);
309,826✔
431
          if (code < 0) {
309,826✔
432
            goto END;
×
433
          }
434

435
          pHead = &(pHandle->pWalReader->pHead->head);
309,826✔
436
          if (tqProcessMetaForStbSub(pTq, pHandle, pHead)) {
309,826✔
437
            code = 0;
219,430✔
438
            goto END;
219,430✔
439
          } else {
440
            offset++;
90,066✔
441
            code = -1;
90,066✔
442
            continue;
90,066✔
443
          }
444
        }
445
      }
446
      code = walSkipFetchBody(pHandle->pWalReader);
3,405,344✔
447
      if (code < 0) {
3,405,344✔
448
        goto END;
×
449
      }
450
      offset++;
3,405,344✔
451
    }
452
    code = -1;
3,405,344✔
453
  }
454

455
END:
161,149,398✔
456
  *fetchOffset = offset;
213,836,193✔
457
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
213,825,847✔
458
          ", applied:%" PRId64 ", 0x%" PRIx64,
459
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
460
  return code;
213,837,221✔
461
}
462

463
bool tqGetTablePrimaryKey(STqReader* pReader) {
7,015,814✔
464
  if (pReader == NULL) {
7,015,814✔
465
    return false;
×
466
  }
467
  return pReader->hasPrimaryKey;
7,015,814✔
468
}
469

470
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
53,934✔
471
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);
53,934✔
472

473
  if (pReader == NULL) {
53,934✔
474
    return;
×
475
  }
476
  bool            ret = false;
53,934✔
477
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnode->pMeta, uid, -1, 1, NULL, 0, false);
53,934✔
478
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
53,934✔
479
    ret = true;
1,105✔
480
  }
481
  tDeleteSchemaWrapper(schema);
482
  pReader->hasPrimaryKey = ret;
53,567✔
483
}
484

485
static void tqFreeTagCache(void* pData){
1,945,575✔
486
  if (pData == NULL) return;
1,945,575✔
487
  SArray* tagCache = *(SArray**)pData;
1,945,575✔
488
  taosArrayDestroyP(tagCache, taosMemFree);
1,945,575✔
489
}
490

491
STqReader* tqReaderOpen(SVnode* pVnode) {
536,549✔
492
  tqDebug("%s:%p", __FUNCTION__, pVnode);
536,549✔
493
  if (pVnode == NULL) {
536,907✔
494
    return NULL;
×
495
  }
496
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
536,907✔
497
  if (pReader == NULL) {
536,907✔
498
    return NULL;
×
499
  }
500

501
  pReader->pWalReader = walOpenReader(pVnode->pWal, 0);
536,907✔
502
  if (pReader->pWalReader == NULL) {
536,907✔
503
    taosMemoryFree(pReader);
×
504
    return NULL;
×
505
  }
506

507
  pReader->pVnode = pVnode;
536,907✔
508
  pReader->pSchemaWrapper = NULL;
536,907✔
509
  pReader->tbIdHash = NULL;
536,907✔
510
  pReader->pTableTagCacheForTmq = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
536,907✔
511
  if (pReader->pTableTagCacheForTmq == NULL) {
536,907✔
512
    walCloseReader(pReader->pWalReader);
×
513
    taosMemoryFree(pReader);
×
514
    return NULL;
×
515
  }
516
  taosHashSetFreeFp(pReader->pTableTagCacheForTmq, tqFreeTagCache);
536,907✔
517
  taosInitRWLatch(&pReader->tagCachelock);
536,907✔
518

519
  return pReader;
536,907✔
520
}
521

522
void tqReaderClose(STqReader* pReader) {
542,354✔
523
  tqDebug("%s:%p", __FUNCTION__, pReader);
542,354✔
524
  if (pReader == NULL) return;
542,354✔
525

526
  // close wal reader
527
  walCloseReader(pReader->pWalReader);
536,907✔
528
  taosHashCleanup(pReader->pTableTagCacheForTmq);
536,907✔
529
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
536,907✔
530
  taosMemoryFree(pReader->pTSchema);
536,907✔
531
  taosMemoryFree(pReader->extSchema);
536,907✔
532

533
  // free hash
534
  taosHashCleanup(pReader->tbIdHash);
536,907✔
535
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
536,907✔
536

537
  taosMemoryFree(pReader);
536,907✔
538
}
539

540
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
8,286,317✔
541
  if (pReader == NULL) {
8,286,317✔
542
    return TSDB_CODE_INVALID_PARA;
×
543
  }
544
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
8,286,317✔
545
    return terrno;
8,005,908✔
546
  }
547
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
281,191✔
548
  return 0;
281,191✔
549
}
550

551
static int32_t tqGetTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid) {
354,342,224✔
552
  int32_t code = 0;
354,342,224✔
553
  int32_t lino = 0;
354,342,224✔
554

555
  void* data = taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES);
354,342,224✔
556
  if (data == NULL) {
354,360,999✔
557
    SStorageAPI api = {0}; 
44,062,346✔
558
    initStorageAPI(&api);
44,061,608✔
559
    code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &api, uid, 0, &pReader->tagCachelock);
44,062,339✔
560
    TSDB_CHECK_CODE(code, lino, END);
44,063,412✔
561
  }
562

563
  END:
310,298,653✔
564
  if (code != TSDB_CODE_SUCCESS) {
354,340,990✔
565
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
×
566
  }
567
  
568
  return code;
354,333,873✔
569
}
570

571
void tqUpdateTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid, col_id_t colId) {
712,292✔
572
  int32_t code = 0;
712,292✔
573
  int32_t lino = 0;
712,292✔
574

575
  void* data = taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES);
712,292✔
576
  if (data == NULL) {
712,292✔
577
    return;
711,184✔
578
  }
579

580
  SStorageAPI api = {0}; 
1,108✔
581
  initStorageAPI(&api);
1,108✔
582
  code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &api, uid, colId, &pReader->tagCachelock);
1,108✔
583
  TSDB_CHECK_CODE(code, lino, END);
1,108✔
584

585
  END:
1,108✔
586
  if (code != TSDB_CODE_SUCCESS) {
1,108✔
587
    tqError("%s failed at %d, failed to update tag cache code:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
×
588
  }
589
}
590

591
static int32_t tqRetrievePseudoCols(STqReader* pReader, SSDataBlock* pBlock, int32_t numOfRows, int64_t uid, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr) {
354,349,274✔
592
  if (pReader == NULL || pBlock == NULL) {
354,349,274✔
593
    return TSDB_CODE_INVALID_PARA;
×
594
  }
595
  int32_t code = TSDB_CODE_SUCCESS;
354,363,817✔
596
  int32_t lino = 0;
354,363,817✔
597
  
598
  code = tqGetTableTagCache(pReader, pPseudoExpr, numOfPseudoExpr, uid);
354,363,817✔
599
  TSDB_CHECK_CODE(code, lino, END);
354,341,130✔
600

601
  code = fillTag(pReader->pTableTagCacheForTmq, pPseudoExpr, numOfPseudoExpr, uid, pBlock, numOfRows, pBlock->info.rows - numOfRows, 1, &pReader->tagCachelock);
354,341,130✔
602
  TSDB_CHECK_CODE(code, lino, END);
354,353,213✔
603

604
END:
354,352,866✔
605
  if (code != 0) {
354,353,213✔
606
    tqError("tqRetrievePseudoCols failed, line:%d, msg:%s", lino, tstrerror(code));
347✔
607
  }
608
  return code;
354,340,182✔
609
}
610

611
int32_t tqNextBlockInWal(STqReader* pReader, SSDataBlock* pRes, SHashObj* pCol2SlotId, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
192,607,258✔
612
                         int sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
613
  int32_t code = 0;
192,607,258✔
614
  if (pReader == NULL) {
192,607,258✔
615
    return TSDB_CODE_INVALID_PARA;
×
616
  }
617
  SWalReader* pWalReader = pReader->pWalReader;
192,607,258✔
618

619
  int64_t st = taosGetTimestampMs();
192,608,752✔
620
  while (1) {
309,012,356✔
621
    code = walNextValidMsg(pWalReader, false);
501,621,108✔
622
    if (code != 0) {
501,647,212✔
623
      break;
191,760,115✔
624
    }
625

626
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
309,887,097✔
627
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
309,901,437✔
628
    int64_t ver = pWalReader->pHead->head.version;
309,911,072✔
629
    SDecoder decoder = {0};
309,905,646✔
630
    code = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
309,902,670✔
631
    tDecoderClear(&decoder);
309,907,947✔
632
    if (code != 0) {
309,904,228✔
633
      return code;
×
634
    }
635
    pReader->nextBlk = 0;
309,904,228✔
636

637
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
309,906,877✔
638
    while (pReader->nextBlk < numOfBlocks) {
668,006,930✔
639
      tqDebug("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
358,103,514✔
640
              pReader->msg.ver);
641

642
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
358,122,480✔
643
      if (pSubmitTbData == NULL) {
358,135,252✔
644
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
645
                pReader->msg.ver);
646
        return terrno;
×
647
      }
648
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
358,135,252✔
649
        pReader->nextBlk += 1;
36,630✔
650
        continue;
36,630✔
651
      }
652
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
358,099,819✔
653
        tqDebug("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
354,373,946✔
654
        int32_t numOfRows = pRes->info.rows;
354,374,744✔
655
        code = tqRetrieveCols(pReader, pRes, pCol2SlotId);
354,374,744✔
656
        if (code != TSDB_CODE_SUCCESS) {
354,338,433✔
657
          return code;
×
658
        }
659
        code = tqRetrievePseudoCols(pReader, pRes, numOfRows, pSubmitTbData->uid, pPseudoExpr, numOfPseudoExpr);
354,338,433✔
660
        if (code != TSDB_CODE_SUCCESS) {
354,344,449✔
661
          return code;
347✔
662
        }
663

664
      }
665
      pReader->nextBlk += 1;
358,063,649✔
666
    }
667

668
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
309,899,931✔
669
    pReader->msg.msgStr = NULL;
309,885,218✔
670

671
    if (pRes->info.rows >= minPollRows || (enableReplay && pRes->info.rows > 0)){
309,843,507✔
672
      break;
673
    }
674
    int64_t elapsed = taosGetTimestampMs() - st;
309,230,543✔
675
    if (elapsed > timeout || elapsed < 0) {
309,230,543✔
676
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
183,259✔
677
      terrno = code;
183,259✔
678
      break;
189,292✔
679
    }
680
  }
681
  return code;
192,606,791✔
682
}
683

684
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
362,343,278✔
685
  if (pReader == NULL) {
362,343,278✔
686
    return TSDB_CODE_INVALID_PARA;
×
687
  }
688
  pReader->msg.msgStr = msgStr;
362,343,278✔
689
  pReader->msg.msgLen = msgLen;
362,361,802✔
690
  pReader->msg.ver = ver;
362,353,053✔
691

692
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
362,372,889✔
693

694
  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);
362,372,889✔
695
  int32_t code = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
362,332,356✔
696

697
  if (code != 0) {
362,374,478✔
698
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
699
  }
700

701
  return code;
362,374,999✔
702
}
703

704
void tqReaderClearSubmitMsg(STqReader* pReader) {
52,435,330✔
705
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
52,435,330✔
706
  pReader->nextBlk = 0;
52,440,011✔
707
  pReader->msg.msgStr = NULL;
52,455,024✔
708
}
52,441,601✔
709

710
SWalReader* tqGetWalReader(STqReader* pReader) {
397,464,055✔
711
  if (pReader == NULL) {
397,464,055✔
712
    return NULL;
×
713
  }
714
  return pReader->pWalReader;
397,464,055✔
715
}
716

717
int64_t tqGetResultBlockTime(STqReader* pReader) {
192,603,345✔
718
  if (pReader == NULL) {
192,603,345✔
719
    return 0;
×
720
  }
721
  return pReader->lastTs;
192,603,345✔
722
}
723

724
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
51,084,120✔
725
                    SExtSchema* extSrc) {
726
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
51,084,120✔
727
    return TSDB_CODE_INVALID_PARA;
26✔
728
  }
729
  int32_t code = 0;
51,095,484✔
730

731
  int32_t cnt = 0;
51,095,484✔
732
  for (int32_t i = 0; i < pSrc->nCols; i++) {
287,702,770✔
733
    cnt += mask[i];
236,616,340✔
734
  }
735

736
  pDst->nCols = cnt;
51,079,020✔
737
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
51,072,429✔
738
  if (pDst->pSchema == NULL) {
51,072,175✔
739
    return TAOS_GET_TERRNO(terrno);
×
740
  }
741

742
  int32_t j = 0;
51,077,613✔
743
  for (int32_t i = 0; i < pSrc->nCols; i++) {
287,752,747✔
744
    if (mask[i]) {
236,638,797✔
745
      pDst->pSchema[j++] = pSrc->pSchema[i];
236,652,409✔
746
      SColumnInfoData colInfo =
236,658,489✔
747
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
236,655,909✔
748
      if (extSrc != NULL) {
236,657,992✔
749
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
60,060✔
750
      }
751
      code = blockDataAppendColInfo(pBlock, &colInfo);
236,657,992✔
752
      if (code != 0) {
236,663,484✔
753
        return code;
×
754
      }
755
    }
756
  }
757
  return 0;
51,107,997✔
758
}
759

760
static int32_t tqDoSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
×
761
  int32_t code = 0;
×
762
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
×
763
    return TSDB_CODE_INVALID_PARA;
×
764
  }
765
  // TODO(yhDeng)
766
  if (COL_VAL_IS_VALUE(pColVal)) {
×
767
    char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
×
768
    if (val == NULL) {
×
769
      return terrno;
×
770
    }
771

772
    uint64_t seq = 0;
×
773
    int32_t  len = 0;
×
774
    if (pColVal->value.pData != NULL) {
×
775
      if (tGetU64(pColVal->value.pData, &seq) < 0){
×
776
        TAOS_CHECK_RETURN(TSDB_CODE_INVALID_PARA);
×
777
      }
778
      SBlobItem item = {0};
×
779
      code = tBlobSetGet(pBlobRow2, seq, &item);
×
780
      if (code != 0) {
×
781
        taosMemoryFree(val);
×
782
        terrno = code;
×
783
        uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
×
784
        return code;
×
785
      }
786

787
      val = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
×
788
      (void)memcpy(blobDataVal(val), item.data, item.len);
×
789
      len = item.len;
×
790
    }
791

792
    blobDataSetLen(val, len);
×
793
    code = colDataSetVal(pColumnInfoData, idx, val, false);
×
794

795
    taosMemoryFree(val);
×
796
  } else {
797
    colDataSetNULL(pColumnInfoData, idx);
×
798
  }
799
  return code;
×
800
}
801
static int32_t tqDoSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
2,147,483,647✔
802
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
803

804
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
2,147,483,647✔
805
    if (COL_VAL_IS_VALUE(pColVal)) {
2,147,483,647✔
806
      char val[65535 + 2] = {0};
2,147,483,647✔
807
      if (pColVal->value.pData != NULL) {
2,147,483,647✔
808
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
2,147,483,647✔
809
      }
810
      varDataSetLen(val, pColVal->value.nData);
2,147,483,647✔
811
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
2,147,483,647✔
812
    } else {
813
      colDataSetNULL(pColumnInfoData, rowIndex);
×
814
    }
815
  } else {
816
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
2,147,483,647✔
817
                         !COL_VAL_IS_VALUE(pColVal));
2,147,483,647✔
818
  }
819

820
  return code;
2,147,483,647✔
821
}
822

823
static int32_t tqSetBlockData(SSDataBlock* pBlock, int32_t slotId, int32_t rowIndex, SColVal* colVal, SBlobSet* pBlobSet) {
2,147,483,647✔
824
  int32_t        code = 0;
2,147,483,647✔
825
  SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
2,147,483,647✔
826
  if (pColData == NULL) {
2,147,483,647✔
827
    return terrno;
×
828
  }
829

830
  uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;
2,147,483,647✔
831
  if (isBlob == 0) {
2,147,483,647✔
832
    code = tqDoSetVal(pColData, rowIndex, colVal);
2,147,483,647✔
833
  } else {
834
    code = tqDoSetBlobVal(pColData, rowIndex, colVal, pBlobSet);
×
835
  }
836
  return code;
2,147,483,647✔
837
}
838

839
static int32_t tqProcessSubmitRow(SArray*         pRows,
354,367,656✔
840
                                SSDataBlock*    pBlock,
841
                                SHashObj*       pCol2SlotId,
842
                                STqReader*      pReader,
843
                                SBlobSet*       pBlobSet) {
844
  int32_t        code = 0;
354,367,656✔
845
  int32_t        line = 0;
354,367,656✔
846

847
  SArray* pColArray = taosArrayInit(4, INT_BYTES * 2);
354,367,656✔
848
  TSDB_CHECK_NULL(pColArray, code, line, END, terrno);
354,369,263✔
849

850
  int32_t sourceIdx = -1;
354,369,263✔
851
  int32_t rowIndex = 0;
354,369,263✔
852
  SRow* pRow = taosArrayGetP(pRows, rowIndex);
354,369,263✔
853
  TSDB_CHECK_NULL(pRow, code, line, END, terrno);
354,371,262✔
854
  while (++sourceIdx < pReader->pTSchema->numOfCols) {
2,122,135,001✔
855
    SColVal colVal = {0};
1,767,805,639✔
856
    code = tRowGet(pRow, pReader->pTSchema, sourceIdx, &colVal);
1,767,796,201✔
857
    TSDB_CHECK_CODE(code, line, END);
1,767,845,356✔
858
    void* pSlotId = taosHashGet(pCol2SlotId, &colVal.cid, sizeof(colVal.cid));
1,767,845,356✔
859
    if (pSlotId == NULL) {
1,767,954,109✔
860
      continue;
290,391,043✔
861
    }
862
    int32_t pData[2] = {sourceIdx, *(int16_t*)pSlotId};
1,477,563,066✔
863
    TSDB_CHECK_NULL(taosArrayPush(pColArray, pData), code, line, END, terrno);
1,477,463,516✔
864
    code = tqSetBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
1,477,463,516✔
865
    TSDB_CHECK_CODE(code, line, END);
1,477,407,367✔
866
  }
867
  
868
  for (rowIndex = 1; rowIndex < taosArrayGetSize(pRows); rowIndex++) {
2,147,483,647✔
869
    SRow* pRow = taosArrayGetP(pRows, rowIndex);
2,147,483,647✔
870
    TSDB_CHECK_NULL(pRow, code, line, END, terrno);
2,147,483,647✔
871
    for (int32_t j = 0; j < taosArrayGetSize(pColArray); j++) {
2,147,483,647✔
872
      int32_t* pData = taosArrayGet(pColArray, j);
2,147,483,647✔
873
      TSDB_CHECK_NULL(pData, code, line, END, terrno);
2,147,483,647✔
874

875
      SColVal colVal = {0};
2,147,483,647✔
876
      code = tRowGet(pRow, pReader->pTSchema, pData[0], &colVal);
2,147,483,647✔
877
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
878

879
      code = tqSetBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
2,147,483,647✔
880
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
881
    }
882
  }
883

884
  END:
351,948,972✔
885
  taosArrayDestroy(pColArray);
336,593,638✔
886
  return code;
354,355,476✔
887
}
888

889
static int32_t tqProcessSubmitCol(SArray*         pCols,
1,512✔
890
                                SSDataBlock*    pBlock,
891
                                SHashObj*       pCol2SlotId,
892
                                SBlobSet*       pBlobSet) {
893
  int32_t        code = 0;
1,512✔
894
  int32_t        line = 0;
1,512✔
895

896
  for (int32_t i = 0; i < taosArrayGetSize(pCols); i++) {
4,536✔
897
    SColData* pCol = taosArrayGet(pCols, i);
3,024✔
898
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
3,024✔
899
    void* pSlotId = taosHashGet(pCol2SlotId, &pCol->cid, sizeof(pCol->cid));
3,024✔
900
    if (pSlotId == NULL) {
3,024✔
901
      continue;
1,512✔
902
    }
903
    SColVal colVal = {0};
1,512✔
904
    for (int32_t row = 0; row < pCol->nVal; row++) {
4,536✔
905
      code = tColDataGetValue(pCol, row, &colVal);
3,024✔
906
      TSDB_CHECK_CODE(code, line, END);
3,024✔
907

908
      code = tqSetBlockData(pBlock, *(int16_t*)pSlotId, pBlock->info.rows + row, &colVal, pBlobSet);
3,024✔
909
      TSDB_CHECK_CODE(code, line, END);
3,024✔
910
    }
911
  }
912
  
913
  END:
1,512✔
914
  return code;
1,512✔
915
}
916

917
static int32_t tqCheckSchema(STqReader* pReader, SSubmitTbData* pSubmitTbData) {
354,367,985✔
918
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
354,367,985✔
919
  int32_t sversion = pSubmitTbData->sver;
354,373,946✔
920
  int64_t suid = pSubmitTbData->suid;
354,373,149✔
921
  int64_t uid = pSubmitTbData->uid;
354,373,547✔
922
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
354,373,547✔
923
      (pReader->cachedSchemaVer != sversion)) {
354,198,488✔
924
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
107,096✔
925
    taosMemoryFreeClear(pReader->extSchema);
177,644✔
926
    taosMemoryFreeClear(pReader->pTSchema);
177,644✔
927
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0, true);
177,644✔
928
    if (pReader->pSchemaWrapper == NULL) {
177,644✔
929
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d, possibly dropped table",
×
930
              vgId, suid, uid, pReader->cachedSchemaVer);
931
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
932
    }
933
    pReader->pTSchema = tBuildTSchema(pReader->pSchemaWrapper->pSchema, pReader->pSchemaWrapper->nCols, pReader->pSchemaWrapper->version);
176,900✔
934
    if (pReader->pTSchema == NULL) {
177,644✔
935
      tqWarn("vgId:%d, cannot build schema for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d",
×
936
              vgId, suid, uid, pReader->cachedSchemaVer);
937
      return terrno;
×
938
    }
939
    pReader->cachedSchemaUid = uid;
177,230✔
940
    pReader->cachedSchemaSuid = suid;
177,644✔
941
    pReader->cachedSchemaVer = sversion;
177,644✔
942
  }
943
  return TSDB_CODE_SUCCESS;
354,372,749✔
944
}
945

946
static int32_t tqRetrieveCols(STqReader* pReader, SSDataBlock* pBlock, SHashObj* pCol2SlotId) {
354,368,360✔
947
  if (pReader == NULL || pBlock == NULL) {
354,368,360✔
948
    return TSDB_CODE_INVALID_PARA;
×
949
  }
950
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
354,373,946✔
951
  int32_t        code = 0;
354,373,547✔
952
  int32_t        line = 0;
354,373,547✔
953
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
354,373,547✔
954
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
354,374,345✔
955
  pReader->lastTs = pSubmitTbData->ctimeMs;
354,374,345✔
956

957
  int32_t numOfRows = 0;
354,374,345✔
958
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
354,374,345✔
959
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
1,512✔
960
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
1,512✔
961
    numOfRows = pCol->nVal;
1,512✔
962
  } else {
963
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
354,371,237✔
964
  }
965

966
  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfRows);
354,373,946✔
967
  TSDB_CHECK_CODE(code, line, END);
354,373,172✔
968

969
  code = tqCheckSchema(pReader, pSubmitTbData);
354,373,172✔
970
  TSDB_CHECK_CODE(code, line, END);
354,374,369✔
971

972
  // convert and scan one block
973
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
354,374,369✔
974
    SArray* pCols = pSubmitTbData->aCol;
1,512✔
975
    code = tqProcessSubmitCol(pCols, pBlock, pCol2SlotId, pSubmitTbData->pBlobSet);
1,512✔
976
    TSDB_CHECK_CODE(code, line, END);
1,512✔
977
  } else {
978
    SArray*         pRows = pSubmitTbData->aRowP;
354,373,232✔
979
    code = tqProcessSubmitRow(pRows, pBlock, pCol2SlotId, pReader, pSubmitTbData->pBlobSet);
354,371,247✔
980
    TSDB_CHECK_CODE(code, line, END);
354,344,714✔
981
  }
982
  pBlock->info.rows += numOfRows;
354,346,226✔
983
END:
354,362,474✔
984
  if (code != 0) {
354,362,474✔
985
    tqError("tqRetrieveCols failed, line:%d, msg:%s", line, tstrerror(code));
×
986
  }
987
  return code;
354,273,493✔
988
}
989

990
#define PROCESS_VAL                                      \
991
  if (curRow == 0) {                                     \
992
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
993
    buildNew = true;                                     \
994
  } else {                                               \
995
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
996
    if (currentRowAssigned != assigned[j]) {             \
997
      assigned[j] = currentRowAssigned;                  \
998
      buildNew = true;                                   \
999
    }                                                    \
1000
  }
1001

1002
#define SET_DATA                                                                                    \
1003
  if (colVal.cid < pColData->info.colId) {                                                          \
1004
    sourceIdx++;                                                                                    \
1005
  } else if (colVal.cid == pColData->info.colId) {                                                  \
1006
    if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
1007
      TQ_ERR_GO_TO_END(tqDoSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
1008
    } else {                                                                                        \
1009
      TQ_ERR_GO_TO_END(tqDoSetVal(pColData, curRow - lastRow, &colVal));                              \
1010
    }                                                                                               \
1011
    sourceIdx++;                                                                                    \
1012
    targetIdx++;                                                                                    \
1013
  } else {                                                                                          \
1014
    colDataSetNULL(pColData, curRow - lastRow);                                                     \
1015
    targetIdx++;                                                                                    \
1016
  }
1017

1018
static int32_t tqProcessBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
51,080,400✔
1019
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
1020
  int32_t         code = 0;
51,080,400✔
1021
  SSchemaWrapper* pSW = NULL;
51,080,400✔
1022
  SSDataBlock*    block = NULL;
51,090,553✔
1023
  if (taosArrayGetSize(blocks) > 0) {
51,090,553✔
1024
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
1025
    TQ_NULL_GO_TO_END(pLastBlock);
×
1026
    pLastBlock->info.rows = curRow - *lastRow;
×
1027
    *lastRow = curRow;
×
1028
  }
1029

1030
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
51,097,419✔
1031
  TQ_NULL_GO_TO_END(block);
51,071,963✔
1032

1033
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
51,071,963✔
1034
  TQ_NULL_GO_TO_END(pSW);
51,085,010✔
1035

1036
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
51,085,010✔
1037
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
51,106,577✔
1038
          (int32_t)taosArrayGetSize(block->pDataBlock));
1039

1040
  block->info.id.uid = pSubmitTbData->uid;
51,106,577✔
1041
  block->info.version = pReader->msg.ver;
51,098,867✔
1042
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
51,101,381✔
1043
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
51,102,236✔
1044
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
51,101,804✔
1045
  pSW = NULL;
51,101,804✔
1046

1047
  taosMemoryFreeClear(block);
51,101,804✔
1048

1049
END:
51,100,769✔
1050
  if (code != 0) {
51,096,715✔
1051
    tqError("tqProcessBuildNew failed, code:%d", code);
×
1052
  }
1053
  tDeleteSchemaWrapper(pSW);
51,096,715✔
1054
  blockDataFreeRes(block);
51,083,734✔
1055
  taosMemoryFree(block);
51,083,734✔
1056
  return code;
51,097,079✔
1057
}
1058
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
967,749✔
1059
  int32_t code = 0;
967,749✔
1060
  int32_t curRow = 0;
967,749✔
1061
  int32_t lastRow = 0;
967,749✔
1062

1063
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
967,749✔
1064
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
968,112✔
1065
  TQ_NULL_GO_TO_END(assigned);
967,749✔
1066

1067
  SArray*   pCols = pSubmitTbData->aCol;
967,749✔
1068
  SColData* pCol = taosArrayGet(pCols, 0);
967,749✔
1069
  TQ_NULL_GO_TO_END(pCol);
967,749✔
1070
  int32_t numOfRows = pCol->nVal;
967,749✔
1071
  int32_t numOfCols = taosArrayGetSize(pCols);
967,749✔
1072
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
967,749✔
1073
          numOfRows);
1074
  for (int32_t i = 0; i < numOfRows; i++) {
127,147,527✔
1075
    bool buildNew = false;
125,836,816✔
1076

1077
    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
503,865,517✔
1078
      int32_t k = 0;
376,699,083✔
1079
      for (; k < numOfCols; k++) {
752,565,254✔
1080
        pCol = taosArrayGet(pCols, k);
744,985,689✔
1081
        TQ_NULL_GO_TO_END(pCol);
745,463,101✔
1082
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
745,463,101✔
1083
          SColVal colVal = {0};
376,785,922✔
1084
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
376,661,161✔
1085
          PROCESS_VAL
377,965,468✔
1086
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
378,174,376✔
1087
          break;
377,886,744✔
1088
        }
1089
      }
1090
      if (k >= numOfCols) {
378,028,701✔
1091
        // this column is not in the current row, so we set it to NULL
1092
        assigned[j] = 0;
×
1093
        buildNew = true;
×
1094
      }
1095
    }
1096

1097
    if (buildNew) {
125,463,221✔
1098
      TQ_ERR_GO_TO_END(tqProcessBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
968,112✔
1099
    }
1100

1101
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
125,462,857✔
1102
    TQ_NULL_GO_TO_END(pBlock);
125,394,100✔
1103

1104
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
125,394,100✔
1105
            (int32_t)taosArrayGetSize(blocks));
1106

1107
    int32_t targetIdx = 0;
125,394,100✔
1108
    int32_t sourceIdx = 0;
125,394,100✔
1109
    int32_t colActual = blockDataGetNumOfCols(pBlock);
125,394,100✔
1110
    while (targetIdx < colActual && sourceIdx < numOfCols) {
502,676,800✔
1111
      pCol = taosArrayGet(pCols, sourceIdx);
376,497,022✔
1112
      TQ_NULL_GO_TO_END(pCol);
375,859,656✔
1113
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
375,859,656✔
1114
      TQ_NULL_GO_TO_END(pColData);
375,061,903✔
1115
      SColVal colVal = {0};
375,061,903✔
1116
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
376,078,083✔
1117
      SET_DATA
377,841,627✔
1118
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
373,561,992✔
1119
    }
1120

1121
    curRow++;
126,179,778✔
1122
  }
1123
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
1,310,711✔
1124
  pLastBlock->info.rows = curRow - lastRow;
968,112✔
1125
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
967,023✔
1126
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
1127
END:
9,925,558✔
1128
  if (code != TSDB_CODE_SUCCESS) {
968,112✔
1129
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1130
  }
1131
  taosMemoryFree(assigned);
968,112✔
1132
  return code;
968,112✔
1133
}
1134

1135
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
50,104,118✔
1136
  int32_t   code = 0;
50,104,118✔
1137
  STSchema* pTSchema = NULL;
50,104,118✔
1138

1139
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
50,104,118✔
1140
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
50,129,894✔
1141
  TQ_NULL_GO_TO_END(assigned);
50,112,056✔
1142

1143
  int32_t curRow = 0;
50,112,056✔
1144
  int32_t lastRow = 0;
50,112,056✔
1145
  SArray* pRows = pSubmitTbData->aRowP;
50,113,580✔
1146
  int32_t numOfRows = taosArrayGetSize(pRows);
50,132,230✔
1147
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
50,124,571✔
1148
  TQ_NULL_GO_TO_END(pTSchema);
50,123,475✔
1149
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
50,123,475✔
1150

1151
  for (int32_t i = 0; i < numOfRows; i++) {
1,864,974,818✔
1152
    bool  buildNew = false;
1,814,910,696✔
1153
    SRow* pRow = taosArrayGetP(pRows, i);
1,814,910,696✔
1154
    TQ_NULL_GO_TO_END(pRow);
1,813,277,657✔
1155

1156
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
2,147,483,647✔
1157
      SColVal colVal = {0};
2,147,483,647✔
1158
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
2,147,483,647✔
1159
      PROCESS_VAL
2,147,483,647✔
1160
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
2,147,483,647✔
1161
    }
1162

1163
    if (buildNew) {
1,804,809,412✔
1164
      TQ_ERR_GO_TO_END(tqProcessBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
50,140,973✔
1165
    }
1166

1167
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
1,804,793,971✔
1168
    TQ_NULL_GO_TO_END(pBlock);
1,813,260,746✔
1169

1170
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
1,813,260,746✔
1171
            (int32_t)taosArrayGetSize(blocks));
1172

1173
    int32_t targetIdx = 0;
1,813,260,746✔
1174
    int32_t sourceIdx = 0;
1,813,260,746✔
1175
    int32_t colActual = blockDataGetNumOfCols(pBlock);
1,813,260,746✔
1176
    while (targetIdx < colActual && sourceIdx < pTSchema->numOfCols) {
2,147,483,647✔
1177
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
2,147,483,647✔
1178
      TQ_NULL_GO_TO_END(pColData);
2,147,483,647✔
1179
      SColVal          colVal = {0};
2,147,483,647✔
1180
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
2,147,483,647✔
1181
      SET_DATA
2,147,483,647✔
1182
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
2,147,483,647✔
1183
    }
1184

1185
    curRow++;
1,814,863,836✔
1186
  }
1187
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
50,064,122✔
1188
  if (pLastBlock != NULL) {
50,134,668✔
1189
    pLastBlock->info.rows = curRow - lastRow;
50,135,767✔
1190
  }
1191

1192
  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
50,139,175✔
1193
          (int)taosArrayGetSize(blocks));
1194
END:
55,652,407✔
1195
  if (code != TSDB_CODE_SUCCESS) {
50,127,406✔
1196
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1197
  }
1198
  taosMemoryFreeClear(pTSchema);
50,119,532✔
1199
  taosMemoryFree(assigned);
50,107,942✔
1200
  return code;
50,127,208✔
1201
}
1202

1203
static int32_t tqBuildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
11,854✔
1204
  int32_t code = 0;
11,854✔
1205
  int32_t lino = 0;
11,854✔
1206
  void*   createReq = NULL;
11,854✔
1207
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
11,854✔
1208
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
11,854✔
1209

1210
  if (pRsp->createTableNum == 0) {
11,854✔
1211
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
7,816✔
1212
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
7,816✔
1213
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
7,816✔
1214
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
7,816✔
1215
  }
1216

1217
  uint32_t len = 0;
11,854✔
1218
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
11,854✔
1219
  TSDB_CHECK_CODE(code, lino, END);
11,854✔
1220
  createReq = taosMemoryCalloc(1, len);
11,854✔
1221
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
11,854✔
1222

1223
  SEncoder encoder = {0};
11,854✔
1224
  tEncoderInit(&encoder, createReq, len);
11,854✔
1225
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
11,854✔
1226
  tEncoderClear(&encoder);
11,854✔
1227
  TSDB_CHECK_CODE(code, lino, END);
11,854✔
1228
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
23,708✔
1229
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
23,708✔
1230
  pRsp->createTableNum++;
11,854✔
1231
  tqTrace("build create table info msg success");
11,854✔
1232

1233
END:
11,854✔
1234
  if (code != 0) {
11,854✔
1235
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1236
    taosMemoryFree(createReq);
×
1237
  }
1238
  return code;
11,854✔
1239
}
1240

1241

1242

1243
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
417,025✔
1244
  if (pReader == NULL || tbUidList == NULL) {
417,025✔
1245
    return TSDB_CODE_SUCCESS;
×
1246
  }
1247
  if (pReader->tbIdHash) {
417,025✔
1248
    taosHashClear(pReader->tbIdHash);
×
1249
  } else {
1250
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
417,025✔
1251
    if (pReader->tbIdHash == NULL) {
417,025✔
1252
      tqError("s-task:%s failed to init hash table", id);
×
1253
      return terrno;
×
1254
    }
1255
  }
1256

1257
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
11,482,904✔
1258
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
11,065,879✔
1259
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
11,065,879✔
1260
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
×
1261
      continue;
×
1262
    }
1263
  }
1264

1265
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
417,025✔
1266
  return TSDB_CODE_SUCCESS;
417,025✔
1267
}
1268

1269
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
563,311✔
1270
  if (pReader == NULL || pTableUidList == NULL) {
563,311✔
1271
    return;
×
1272
  }
1273
  if (pReader->tbIdHash == NULL) {
563,311✔
1274
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
1275
    if (pReader->tbIdHash == NULL) {
×
1276
      tqError("failed to init hash table");
×
1277
      return;
×
1278
    }
1279
  }
1280

1281
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
563,311✔
1282
  for (int i = 0; i < numOfTables; i++) {
1,130,181✔
1283
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
566,870✔
1284
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
566,870✔
1285
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
1286
      continue;
×
1287
    }
1288
    tqDebug("%s add table uid:%" PRId64 " to hash:%p %p", __func__, *pKey, pReader, pReader->tbIdHash);
566,870✔
1289
  }
1290
}
1291

1292
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
×
1293
  if (pReader == NULL) {
×
1294
    return false;
×
1295
  }
1296
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
×
1297
}
1298

1299
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1300
  if (pReader == NULL) {
×
1301
    return false;
×
1302
  }
1303
  return pReader->msg.msgStr == NULL;
×
1304
}
1305

1306
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
78,541✔
1307
  if (pReader == NULL || tbUidList == NULL) {
78,541✔
1308
    return;
×
1309
  }
1310
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
184,097✔
1311
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
105,556✔
1312
    int32_t code = taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
105,556✔
1313
    if (code != 0) {
105,556✔
1314
      tqWarn("%s failed to remove table uid:%" PRId64 " from hash:%p %p, msg:%s", __func__, pKey != NULL ? *pKey : 0, pReader, pReader->tbIdHash, tstrerror(code));
29,466✔
1315
    }
1316
  }
1317
}
1318

1319
int32_t tqDeleteTbUidList(STQ* pTq, SArray* tbUidList) {
3,027,896✔
1320
  if (pTq == NULL) {
3,027,896✔
1321
    return 0;  // mounted vnode may have no tq
×
1322
  }
1323
  if (tbUidList == NULL) {
3,027,896✔
1324
    return TSDB_CODE_INVALID_PARA;
×
1325
  }
1326
  void*   pIter = NULL;
3,027,896✔
1327
  int32_t vgId = TD_VID(pTq->pVnode);
3,027,896✔
1328

1329
  // update the table list for each consumer handle
1330
  taosWLockLatch(&pTq->lock);
3,027,896✔
1331
  while (1) {
380,216✔
1332
    pIter = taosHashIterate(pTq->pHandle, pIter);
3,408,112✔
1333
    if (pIter == NULL) {
3,408,112✔
1334
      break;
3,027,896✔
1335
    }
1336

1337
    STqHandle* pTqHandle = (STqHandle*)pIter;
380,216✔
1338
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " delete table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);
380,216✔
1339
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
380,216✔
1340
      int32_t code = qDeleteTableListForQuerySub(pTqHandle->execHandle.task, tbUidList);
1,640✔
1341
      if (code != 0) {
1,640✔
1342
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
1343
        continue;
×
1344
      }
1345
    }
1346
  }
1347
  taosWUnLockLatch(&pTq->lock);
3,027,896✔
1348
  return 0;
3,027,896✔
1349
}
1350

1351
static SArray* tqCopyUidList(const SArray* tbUidList) {
74,819✔
1352
  SArray* tbUidListCopy = taosArrayInit(4, sizeof(int64_t));
74,819✔
1353
  if (tbUidListCopy == NULL) {
74,819✔
1354
    return NULL;
×
1355
  }
1356

1357
  if (taosArrayAddAll(tbUidListCopy, tbUidList) == NULL) {
74,819✔
1358
    taosArrayDestroy(tbUidListCopy);
×
1359
    tqError("copy table uid list failed");
×
1360
    return NULL;
×
1361
  }
1362
  return tbUidListCopy;
74,819✔
1363
}
1364

1365
static int32_t tqAddTableListForStbSub(STqHandle* pTqHandle, STQ* pTq, const SArray* tbUidList, int64_t version) {
74,819✔
1366
  int32_t code = 0;
74,819✔
1367
  SArray* tbUidListCopy = tqCopyUidList(tbUidList);
74,819✔
1368
  if (tbUidListCopy == NULL) {
74,819✔
1369
    code = terrno;
×
1370
    goto END;
×
1371
  }
1372
  code = qFilterTableList(pTq->pVnode, tbUidListCopy, version, pTqHandle->execHandle.execTb.node,
74,819✔
1373
                      pTqHandle->execHandle.task, pTqHandle->execHandle.execTb.suid);
74,819✔
1374
  if (code != TDB_CODE_SUCCESS) {
74,819✔
1375
    tqError("%s error:%d handle %s consumer:0x%" PRIx64, __func__, code, pTqHandle->subKey,
×
1376
            pTqHandle->consumerId);
1377
    goto END;
×
1378
  }
1379
  tqDebug("%s handle %s consumer:0x%" PRIx64 " add %d tables to tqReader", __func__, pTqHandle->subKey,
74,819✔
1380
          pTqHandle->consumerId, (int32_t)taosArrayGetSize(tbUidListCopy));
1381
  tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, tbUidListCopy);
74,819✔
1382

1383
END:
74,819✔
1384
  taosArrayDestroy(tbUidListCopy);
74,819✔
1385
  return code;
74,819✔
1386
}
1387

1388
int32_t tqAddTbUidListForQuerySub(STQ* pTq, const SArray* tbUidList) {
66,639,355✔
1389
  if (pTq == NULL) {
66,639,355✔
1390
    return 0;  // mounted vnode may have no tq
×
1391
  }
1392
  if (tbUidList == NULL) {
66,639,355✔
1393
    return TSDB_CODE_INVALID_PARA;
×
1394
  }
1395
  void*   pIter = NULL;
66,639,355✔
1396
  int32_t vgId = TD_VID(pTq->pVnode);
66,639,355✔
1397
  int32_t code = 0;
66,640,212✔
1398

1399
  // update the table list for each consumer handle
1400
  taosWLockLatch(&pTq->lock);
66,640,212✔
1401
  while (1) {
1,244,806✔
1402
    pIter = taosHashIterate(pTq->pHandle, pIter);
67,884,479✔
1403
    if (pIter == NULL) {
67,884,535✔
1404
      break;
66,639,729✔
1405
    }
1406

1407
    STqHandle* pTqHandle = (STqHandle*)pIter;
1,244,806✔
1408
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " add table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);
1,244,806✔
1409
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
1,244,806✔
1410
      code = qAddTableListForQuerySub(pTqHandle->execHandle.task, tbUidList);
486,410✔
1411
      if (code != 0) {
486,410✔
1412
        tqError("add table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
×
1413
        break;
×
1414
      }
1415
    }
1416
  }
1417
  taosHashCancelIterate(pTq->pHandle, pIter);
66,639,729✔
1418
  taosWUnLockLatch(&pTq->lock);
66,639,831✔
1419

1420
  return code;
66,640,212✔
1421
}
1422

1423
int32_t tqUpdateTbUidListForQuerySub(STQ* pTq, const SArray* tbUidList, SArray* cidList, SArray* cidListArray) {
8,598,265✔
1424
  if (pTq == NULL) {
8,598,265✔
1425
    return 0;  // mounted vnode may have no tq
×
1426
  }
1427
  if (tbUidList == NULL) {
8,598,265✔
1428
    return TSDB_CODE_INVALID_PARA;
×
1429
  }
1430
  void*   pIter = NULL;
8,598,265✔
1431
  int32_t vgId = TD_VID(pTq->pVnode);
8,598,265✔
1432
  int32_t code = 0;
8,598,265✔
1433
  // update the table list for each consumer handle
1434
  taosWLockLatch(&pTq->lock);
8,598,265✔
1435
  while (1) {
6,131✔
1436
    pIter = taosHashIterate(pTq->pHandle, pIter);
8,604,396✔
1437
    if (pIter == NULL) {
8,604,396✔
1438
      break;
8,598,265✔
1439
    }
1440

1441
    STqHandle* pTqHandle = (STqHandle*)pIter;
6,131✔
1442
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);
6,131✔
1443
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
6,131✔
1444
      SNode* pTagCond = getTagCondNodeForQueryTmq(pTqHandle->execHandle.task);
3,537✔
1445
      bool ret = checkCidInTagCondition(pTagCond, cidList);
3,537✔
1446
      if (ret){
3,537✔
1447
        code = qUpdateTableListForQuerySub(pTqHandle->execHandle.task, tbUidList);
2,082✔
1448
        if (code != 0) {
2,082✔
1449
          tqError("update table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
×
1450
          break;
×
1451
        }
1452
      }
1453
      qUpdateTableTagCacheForQuerySub(pTqHandle->execHandle.task, tbUidList, cidList, cidListArray);
3,537✔
1454
    }
1455
  }
1456

1457
  taosHashCancelIterate(pTq->pHandle, pIter);
8,598,265✔
1458
  taosWUnLockLatch(&pTq->lock);
8,598,265✔
1459

1460
  return code;
8,598,265✔
1461
}
1462

1463
static int32_t tqUpdateTableListForStbSub(STQ* pTq, const SArray* tbUidList, SArray* cidList, SArray* cidListArray, STqHandle* pTqHandle, int64_t version) {
6,801✔
1464
  if (pTq == NULL) {
6,801✔
1465
    return 0;  // mounted vnode may have no tq
×
1466
  }
1467
  if (tbUidList == NULL) {
6,801✔
1468
    return TSDB_CODE_INVALID_PARA;
×
1469
  }
1470
  void*   pIter = NULL;
6,801✔
1471
  int32_t vgId = TD_VID(pTq->pVnode);
6,801✔
1472
  int32_t code = 0;
6,801✔
1473
  taosWLockLatch(&pTq->lock);
6,801✔
1474
  tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);
6,801✔
1475
  SNode* pTagCond = getTagCondNodeForStableTmq(pTqHandle->execHandle.execTb.node);
6,801✔
1476
  bool ret = checkCidInTagCondition(pTagCond, cidList);
6,801✔
1477
  if (ret){
6,801✔
1478
    tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
3,693✔
1479
    code = tqAddTableListForStbSub(pTqHandle, pTq, tbUidList, version);
3,693✔
1480
    if (code != 0) {
3,693✔
1481
      tqError("update table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
×
1482
    }
1483
  }
1484
  taosWUnLockLatch(&pTq->lock);
6,801✔
1485

1486
  return code;
6,801✔
1487
}
1488

1489
static void tqAlterTagForStbSub(SVnode *pVnode, const SArray* tbUidList, const SArray* tags, const SArray* tagsArray, STqHandle* pTqHandle, int64_t version) {
6,801✔
1490
  int32_t       code = 0;
6,801✔
1491
  int32_t       lino = 0;
6,801✔
1492
  SArray*       cidList = NULL;
6,801✔
1493
  SArray*       cidListArray = NULL;
6,801✔
1494

1495
  code = getCidInfo(tags, tagsArray, &cidList, &cidListArray);
6,801✔
1496
  QUERY_CHECK_CODE(code, lino, end);
6,801✔
1497

1498
  tqDebug("vgId:%d, try to add %d tables in query table list, cidList size:%"PRIzu,
6,801✔
1499
         TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUidList), taosArrayGetSize(cidList));
1500
  code = tqUpdateTableListForStbSub(pVnode->pTq, tbUidList, cidList, cidListArray, pTqHandle, version);
6,801✔
1501
  QUERY_CHECK_CODE(code, lino, end);
6,801✔
1502

1503
end:
6,801✔
1504
  if (code != 0) {
6,801✔
1505
    qError("vgId:%d, failed to alter tags for %d tables since %s",
×
1506
           TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUidList), tstrerror(code));
1507
  }
1508
  taosArrayDestroy(cidList);
6,801✔
1509
  taosArrayDestroyP(cidListArray, (FDelete)taosArrayDestroy);
6,801✔
1510
}
6,801✔
1511

1512
static void tqDestroySourceScanTables(void* ptr) {
×
1513
  SArray** pTables = ptr;
×
1514
  if (pTables && *pTables) {
×
1515
    taosArrayDestroy(*pTables);
×
1516
    *pTables = NULL;
×
1517
  }
1518
}
×
1519

1520
static int32_t tqCompareSVTColInfo(const void* p1, const void* p2) {
×
1521
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1522
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1523
  if (pCol1->vColId == pCol2->vColId) {
×
1524
    return 0;
×
1525
  } else if (pCol1->vColId < pCol2->vColId) {
×
1526
    return -1;
×
1527
  } else {
1528
    return 1;
×
1529
  }
1530
}
1531

1532
static void tqFreeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1533
  if (value) {
×
1534
    SSchemaWrapper* pSchemaWrapper = value;
×
1535
    tDeleteSchemaWrapper(pSchemaWrapper);
1536
  }
1537
}
×
1538

1539
static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t precision) {
×
1540
  int32_t    code = TDB_CODE_SUCCESS;
×
1541
  int32_t    lino = 0;
×
1542
  void*      buf = NULL;
×
1543

1544
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + *(uint32_t *)rawData + INT_BYTES;
×
1545
  buf = taosMemoryCalloc(1, dataStrLen);
×
1546
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);
×
1547

1548
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
×
1549
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_RAW_VERSION;
×
1550
  pRetrieve->precision = precision;
×
1551
  pRetrieve->compressed = 0;
×
1552

1553
  memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES);
×
1554
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
×
1555
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
×
1556
  pRsp->blockDataElementFree = true;
×
1557

1558
  tqTrace("tqAddRawDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
×
1559
  END:
×
1560
  if (code != TSDB_CODE_SUCCESS) {
×
1561
    taosMemoryFree(buf);
×
1562
    tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
×
1563
  }
1564
  return code;
×
1565
}
1566

1567
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, const SSchemaWrapper* pSW, int8_t precision) {
95,136,664✔
1568
  int32_t code = 0;
95,136,664✔
1569
  int32_t lino = 0;
95,136,664✔
1570
  SSchemaWrapper* pSchema = NULL;
95,136,664✔
1571
  
1572
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
95,139,997✔
1573
  int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
95,165,012✔
1574
  void*   buf = taosMemoryCalloc(1, dataStrLen);
95,165,012✔
1575
  TSDB_CHECK_NULL(buf, code, lino, END, terrno);
95,121,425✔
1576

1577
  SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
95,121,425✔
1578
  pRetrieve->version = RETRIEVE_TABLE_RSP_TMQ_VERSION;
95,121,425✔
1579
  pRetrieve->precision = precision;
95,126,324✔
1580
  pRetrieve->compressed = 0;
95,130,148✔
1581
  pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
95,127,941✔
1582

1583
  int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, pSW->nCols);
95,155,214✔
1584
  TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno);
95,123,126✔
1585

1586
  actualLen += sizeof(SRetrieveTableRspForTmq);
95,123,126✔
1587
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
190,281,702✔
1588
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
190,317,517✔
1589
  pSchema = tCloneSSchemaWrapper(pSW);
95,129,767✔
1590
  TSDB_CHECK_NULL(pSchema, code, lino, END, terrno);
95,129,767✔
1591
  TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSchema), code, lino, END, terrno);
190,289,012✔
1592
  pSchema = NULL;
95,159,245✔
1593
  pRsp->blockDataElementFree = true;
95,159,245✔
1594
  tqTrace("tqAddBlockDataToRsp add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
95,149,927✔
1595

1596
END:
95,149,927✔
1597
  tDeleteSchemaWrapper(pSchema);
95,141,165✔
1598
  if (code != TSDB_CODE_SUCCESS){
95,131,673✔
1599
    taosMemoryFree(buf);
×
1600
    tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
×
1601
  }
1602
  return code;
95,131,673✔
1603
}
1604

1605
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
51,047,404✔
1606
  int32_t    code = TDB_CODE_SUCCESS;
51,047,404✔
1607
  int32_t    lino = 0;
51,047,404✔
1608
  SMetaReader mr = {0};
51,047,404✔
1609

1610
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
51,055,389✔
1611
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
51,055,389✔
1612

1613
  metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
51,055,389✔
1614

1615
  code = metaReaderGetTableEntryByUidCache(&mr, uid);
51,058,541✔
1616
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST){
51,025,141✔
1617
    char tbname[TSDB_TABLE_NAME_LEN] = {0};
242,557✔
1618
    code = metaGetTbnameByIdIfTableNotExist(pTq->pVnode->pMeta, uid, tbname);
242,557✔
1619
    TSDB_CHECK_CODE(code, lino, END);
242,557✔
1620

1621
    for (int32_t i = 0; i < n; i++) {
485,114✔
1622
      char* tbName = taosStrdup(tbname);
242,557✔
1623
      TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
242,557✔
1624
      if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
485,114✔
1625
        tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
×
1626
        continue;
×
1627
      }
1628
      tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
242,557✔
1629
    }
1630
    goto END;
242,557✔
1631
  }
1632
  TSDB_CHECK_CODE(code, lino, END);
50,782,584✔
1633

1634
  for (int32_t i = 0; i < n; i++) {
101,601,528✔
1635
    char* tbName = taosStrdup(mr.me.name);
50,779,034✔
1636
    TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
50,805,598✔
1637
    if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
101,628,697✔
1638
      tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
×
1639
      continue;
×
1640
    }
1641
    tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
50,823,099✔
1642
  }
1643

1644
END:
51,065,829✔
1645
  if (code != TSDB_CODE_SUCCESS) {
51,051,949✔
1646
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
×
1647
  }
1648
  metaReaderClear(&mr);
51,051,949✔
1649
  return code;
51,047,536✔
1650
}
1651

1652
int32_t tqGetDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) {
194,673,152✔
1653
  if (task == NULL || pHandle == NULL || res == NULL) {
194,673,152✔
1654
    return TSDB_CODE_INVALID_PARA;
×
1655
  }
1656
  uint64_t ts = 0;
194,703,538✔
1657
  qStreamSetOpen(task);
194,685,790✔
1658

1659
  tqDebug("consumer:0x%" PRIx64 " vgId:%d, tmq one task start execute", pHandle->consumerId, vgId);
194,659,233✔
1660
  int32_t code = qExecTask(task, res, &ts);
194,716,260✔
1661
  if (code != TSDB_CODE_SUCCESS) {
194,722,702✔
1662
    tqError("consumer:0x%" PRIx64 " vgId:%d, task exec error since %s", pHandle->consumerId, vgId, tstrerror(code));
347✔
1663
  }
1664

1665
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq one task end executed, pDataBlock:%p", pHandle->consumerId, vgId, *res);
194,723,824✔
1666
  return code;
194,722,972✔
1667
}
1668

1669
static int32_t tqProcessReplayRsp(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, const SMqPollReq* pRequest, SSDataBlock* pDataBlock, qTaskInfo_t task){
695,688✔
1670
  int32_t code = 0;
695,688✔
1671
  int32_t lino = 0;
695,688✔
1672

1673
  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) {
695,688✔
1674
    blockDataDestroy(pHandle->block);
328✔
1675
    pHandle->block = NULL;
328✔
1676
  }
1677
  if (pHandle->block == NULL) {
695,688✔
1678
    if (pDataBlock == NULL) {
691,752✔
1679
      goto END;
690,112✔
1680
    }
1681

1682
    STqOffsetVal offset = {0};
1,640✔
1683
    code = qStreamExtractOffset(task, &offset);
1,640✔
1684
    TSDB_CHECK_CODE(code, lino, END);
1,640✔
1685

1686
    pHandle->block = NULL;
1,640✔
1687

1688
    code = createOneDataBlock(pDataBlock, true, &pHandle->block);
1,640✔
1689
    TSDB_CHECK_CODE(code, lino, END);
1,640✔
1690

1691
    pHandle->blockTime = offset.ts;
1,640✔
1692
    tOffsetDestroy(&offset);
1,640✔
1693
    int32_t vgId = TD_VID(pTq->pVnode);
1,640✔
1694
    code = tqGetDataBlock(task, pHandle, vgId, &pDataBlock);
1,640✔
1695
    TSDB_CHECK_CODE(code, lino, END);
1,640✔
1696
  }
1697

1698
  const STqExecHandle* pExec = &pHandle->execHandle;
5,576✔
1699
  code = tqAddBlockDataToRsp(pHandle->block, pRsp, &pExec->execCol.pSW, pTq->pVnode->config.tsdbCfg.precision);
5,576✔
1700
  TSDB_CHECK_CODE(code, lino, END);
5,576✔
1701

1702
  pRsp->blockNum++;
5,576✔
1703
  if (pDataBlock == NULL) {
5,576✔
1704
    blockDataDestroy(pHandle->block);
1,312✔
1705
    pHandle->block = NULL;
1,312✔
1706
  } else {
1707
    code = copyDataBlock(pHandle->block, pDataBlock);
4,264✔
1708
    TSDB_CHECK_CODE(code, lino, END);
4,264✔
1709

1710
    STqOffsetVal offset = {0};
4,264✔
1711
    code = qStreamExtractOffset(task, &offset);
4,264✔
1712
    TSDB_CHECK_CODE(code, lino, END);
4,264✔
1713

1714
    pRsp->sleepTime = offset.ts - pHandle->blockTime;
4,264✔
1715
    pHandle->blockTime = offset.ts;
4,264✔
1716
    tOffsetDestroy(&offset);
4,264✔
1717
  }
1718

1719
END:
695,688✔
1720
  if (code != TSDB_CODE_SUCCESS) {
695,688✔
1721
    tqError("%s failed at %d, failed to process replay response:%s", __FUNCTION__, lino, tstrerror(code));
×
1722
  }
1723
  return code;
695,688✔
1724
}
1725

1726
static int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
205,566,723✔
1727
  int32_t code = 0;
205,566,723✔
1728
  int32_t lino = 0;
205,566,723✔
1729
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
205,566,723✔
1730
  TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
205,566,723✔
1731
  TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
205,566,723✔
1732
  TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
205,566,723✔
1733
  TSDB_CHECK_NULL(pRequest, code, lino, END, TSDB_CODE_INVALID_PARA);
205,566,723✔
1734

1735
  int32_t vgId = TD_VID(pTq->pVnode);
205,566,723✔
1736
  int32_t totalRows = 0;
205,556,859✔
1737

1738
  const STqExecHandle* pExec = &pHandle->execHandle;
205,556,859✔
1739
  qTaskInfo_t          task = pExec->task;
205,574,659✔
1740

1741
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
205,574,365✔
1742
  TSDB_CHECK_CODE(code, lino, END);
205,530,075✔
1743

1744
  qStreamSetParams(task, pRequest->sourceExcluded, pRequest->minPollRows, pRequest->timeout, pRequest->enableReplay);
194,665,588✔
1745
  do {
1746
    SSDataBlock* pDataBlock = NULL;
194,602,880✔
1747
    code = tqGetDataBlock(task, pHandle, vgId, &pDataBlock);
194,608,204✔
1748
    TSDB_CHECK_CODE(code, lino, END);
194,717,960✔
1749

1750
    if (pRequest->enableReplay) {
194,717,613✔
1751
      code = tqProcessReplayRsp(pTq, pHandle, pRsp, pRequest, pDataBlock, task);
695,688✔
1752
      TSDB_CHECK_CODE(code, lino, END);
695,688✔
1753
      break;
695,688✔
1754
    }
1755
    if (pDataBlock == NULL) {
194,024,272✔
1756
      break;
150,806,644✔
1757
    }
1758
    code = tqAddBlockDataToRsp(pDataBlock, pRsp, &pExec->execCol.pSW, pTq->pVnode->config.tsdbCfg.precision);
43,217,628✔
1759
    TSDB_CHECK_CODE(code, lino, END);
43,217,268✔
1760

1761
    pRsp->blockNum++;
43,217,268✔
1762
    totalRows += pDataBlock->info.rows;
43,217,628✔
1763
  } while(0);
1764

1765
  tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, totalRows);
194,719,555✔
1766
  code = qStreamExtractOffset(task, &pRsp->rspOffset);
194,716,361✔
1767

1768
END:
205,586,691✔
1769
  if (code != 0) {
205,586,691✔
1770
    tqError("%s failed at %d, tmq task executed error msg:%s", __FUNCTION__, lino, tstrerror(code));
10,864,854✔
1771
  }
1772
  return code;
205,587,053✔
1773
}
1774

1775
static int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
55,569✔
1776
  int32_t code = 0;
55,569✔
1777
  int32_t lino = 0;
55,569✔
1778
  char* tbName = NULL;
55,569✔
1779
  const STqExecHandle* pExec = &pHandle->execHandle;
55,569✔
1780
  qTaskInfo_t          task = pExec->task;
55,569✔
1781
  code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
55,569✔
1782
  TSDB_CHECK_CODE(code, lino, END);
55,569✔
1783

1784
  qStreamSetParams(task, pRequest->sourceExcluded, pRequest->minPollRows, pRequest->timeout, false);
55,569✔
1785

1786
  int32_t rowCnt = 0;
55,569✔
1787
  int64_t st = taosGetTimestampMs();
55,569✔
1788
  while (1) {
1,747,962✔
1789
    SSDataBlock* pDataBlock = NULL;
1,803,531✔
1790
    uint64_t     ts = 0;
1,803,531✔
1791
    tqDebug("tmqsnap task start to execute");
1,803,886✔
1792
    code = qExecTask(task, &pDataBlock, &ts);
1,804,241✔
1793
    TSDB_CHECK_CODE(code, lino, END);
1,803,886✔
1794
    tqDebug("tmqsnap task execute end, get %p", pDataBlock);
1,803,886✔
1795

1796
    if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
1,803,886✔
1797
      if (pRsp->withTbName) {
870,471✔
1798
        tbName = taosStrdup(qExtractTbnameFromTask(task));
870,471✔
1799
        TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
869,051✔
1800
        TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
1,739,522✔
1801
        tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName);
870,471✔
1802
        tbName = NULL;
870,471✔
1803
      }
1804

1805
      code = tqAddBlockDataToRsp(pDataBlock, pRsp, qExtractSchemaFromTask(task), pTq->pVnode->config.tsdbCfg.precision);
870,471✔
1806
      TSDB_CHECK_CODE(code, lino, END);
870,471✔
1807

1808
      pRsp->blockNum++;
870,471✔
1809
      rowCnt += pDataBlock->info.rows;
870,471✔
1810
      if (rowCnt <= pRequest->minPollRows && (taosGetTimestampMs() - st <= pRequest->timeout)) {
1,706,869✔
1811
        continue;
836,043✔
1812
      }
1813
    }
1814

1815
    // get meta
1816
    SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
967,843✔
1817
    if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
967,843✔
1818
      code = qStreamExtractOffset(task, &tmp->rspOffset);
6,979✔
1819
      TSDB_CHECK_CODE(code, lino, END);
6,979✔
1820
      *pBatchMetaRsp = *tmp;
6,979✔
1821
      tqDebug("tmqsnap task get meta");
6,979✔
1822
      break;
6,979✔
1823
    }
1824

1825
    if (pDataBlock == NULL) {
960,864✔
1826
      code = qStreamExtractOffset(task, pOffset);
926,436✔
1827
      TSDB_CHECK_CODE(code, lino, END);
926,436✔
1828

1829
      if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
926,436✔
1830
        continue;
912,274✔
1831
      }
1832

1833
      tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1);
14,162✔
1834
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
14,162✔
1835
      break;
14,162✔
1836
    }
1837

1838
    if (pRsp->blockNum > 0) {
34,428✔
1839
      tqDebug("tmqsnap task exec exited, get data");
34,428✔
1840
      code = qStreamExtractOffset(task, &pRsp->rspOffset);
34,428✔
1841
      break;
34,428✔
1842
    }
1843
  }
1844
  tqDebug("%s:%d success", __FUNCTION__, lino);
55,569✔
1845
END:
55,569✔
1846
  if (code != 0){
55,569✔
1847
    tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
×
1848
  }
1849
  taosMemoryFree(tbName);
55,569✔
1850
  return code;
55,569✔
1851
}
1852

1853
static int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
51,117,803✔
1854
                             SSubmitTbData* pSubmitTbData, SArray* rawList, int8_t fetchMeta) {
1855
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
51,117,803✔
1856
  if (fetchMeta == ONLY_META) {
51,107,855✔
1857
    if (pSubmitTbData->pCreateTbReq != NULL) {
8,334✔
1858
      if (pRsp->createTableReq == NULL) {
2,394✔
1859
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
1,404✔
1860
        if (pRsp->createTableReq == NULL) {
1,404✔
1861
          return terrno;
×
1862
        }
1863
      }
1864
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
4,788✔
1865
        return terrno;
×
1866
      }
1867
      pSubmitTbData->pCreateTbReq = NULL;
2,394✔
1868
    }
1869
    return 0;
8,334✔
1870
  }
1871

1872
  int32_t sversion = pSubmitTbData->sver;
51,099,521✔
1873
  int64_t uid = pSubmitTbData->uid;
51,108,032✔
1874
  pReader->lastBlkUid = uid;
51,108,767✔
1875

1876
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
51,107,669✔
1877
  taosMemoryFreeClear(pReader->extSchema);
51,104,466✔
1878
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0, true);
51,110,170✔
1879
  if (pReader->pSchemaWrapper == NULL) {
51,098,536✔
1880
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
2,105✔
1881
           pReader->pWalReader->pWal->cfg.vgId, uid, sversion);
1882
    pReader->cachedSchemaSuid = 0;
2,105✔
1883
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
2,105✔
1884
  }
1885

1886
  if (pSubmitTbData->pCreateTbReq != NULL && fetchMeta != ONLY_DATA) {
51,087,696✔
1887
    int32_t code = tqBuildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
11,854✔
1888
    if (code != 0) {
11,854✔
1889
      return code;
×
1890
    }
1891
  } else if (rawList != NULL) {
51,072,325✔
1892
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1893
      return terrno;
×
1894
    }
1895
    pReader->pSchemaWrapper = NULL;
×
1896
    return 0;
×
1897
  }
1898

1899
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
51,084,179✔
1900
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
968,112✔
1901
  } else {
1902
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
50,113,950✔
1903
  }
1904
}
1905

1906
static bool tqFilterForStbSub(STQ* pTq, STqHandle* pHandle, SSubmitTbData* pSubmitTbData, int64_t version) {
52,488,441✔
1907
  bool ret = false;
52,488,441✔
1908
  SArray* tbUids = NULL;
52,488,441✔
1909
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
52,488,441✔
1910
    goto end;
38,379,280✔
1911
  }
1912
  if (pSubmitTbData->pCreateTbReq != NULL && (pSubmitTbData->pCreateTbReq->type == TSDB_CHILD_TABLE || pSubmitTbData->pCreateTbReq->type == TSDB_VIRTUAL_CHILD_TABLE)
14,113,075✔
1913
        && pSubmitTbData->pCreateTbReq->ctb.suid == pHandle->execHandle.execTb.suid) {
14,776✔
1914
    tbUids = taosArrayInit(1, sizeof(int64_t));
11,338✔
1915
    if (tbUids == NULL) {
11,338✔
1916
      goto end;
×
1917
    }
1918
    if (taosArrayPush(tbUids, &pSubmitTbData->uid) == NULL) {
22,676✔
1919
      goto end;
×
1920
    }
1921
    
1922
    taosWLockLatch(&pTq->lock);
11,338✔
1923
    tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUids);
11,338✔
1924
    int32_t code = tqAddTableListForStbSub(pHandle, pTq, tbUids, version);
11,338✔
1925
    taosWUnLockLatch(&pTq->lock);
11,338✔
1926
    if (code != 0) {
11,338✔
1927
      goto end;
×
1928
    }
1929
  }
1930
  
1931
  STqExecHandle* pExec = &pHandle->execHandle;
14,112,399✔
1932
  STqReader* pReader = pExec->pTqReader;
14,112,737✔
1933
  if (taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) == NULL) {
14,112,737✔
1934
    tqInfo("iterator submit block in hash continue for stb sub, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
1,379,282✔
1935
            pReader->nextBlk, (int32_t)taosArrayGetSize(pReader->submit.aSubmitTbData), (int32_t)taosHashGetSize(pReader->tbIdHash), pSubmitTbData->uid);
1936
    ret = true;
1,379,670✔
1937
  }
1938

1939
end:
12,733,455✔
1940
  taosArrayDestroy(tbUids);
52,492,405✔
1941
  return ret;
52,489,226✔
1942
}
1943

1944
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows,
52,488,458✔
1945
                             const SMqPollReq* pRequest, SArray* rawList, int64_t version){
1946
  int32_t code = 0;
52,488,458✔
1947
  int32_t lino = 0;
52,488,458✔
1948
  SArray* pBlocks = NULL;
52,488,458✔
1949
  SArray* pSchemas = NULL;
52,488,458✔
1950

1951
  STqExecHandle* pExec = &pHandle->execHandle;
52,488,458✔
1952
  STqReader* pReader = pExec->pTqReader;
52,495,656✔
1953

1954
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
52,491,273✔
1955
  TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, terrno);
52,494,201✔
1956

1957
  if (tqFilterForStbSub(pTq, pHandle, pSubmitTbData, version)) {
52,494,201✔
1958
    goto END;
1,379,670✔
1959
  }
1960

1961
  pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
51,109,911✔
1962
  TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno);
51,111,802✔
1963
  pSchemas = taosArrayInit(0, sizeof(void*));
51,111,802✔
1964
  TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno);
51,118,504✔
1965
  
1966
  code = tqRetrieveTaosxBlock(pReader, pRsp, pBlocks, pSchemas, pSubmitTbData, rawList, pHandle->fetchMeta);
51,118,504✔
1967
  TSDB_CHECK_CODE(code, lino, END);
51,089,242✔
1968
  bool tmp = (pSubmitTbData->flags & pRequest->sourceExcluded) != 0;
51,087,137✔
1969
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
51,096,414✔
1970

1971
  if (pHandle->fetchMeta == ONLY_META){
51,059,784✔
1972
    goto END;
8,334✔
1973
  }
1974

1975
  int32_t blockNum = taosArrayGetSize(pBlocks) == 0 ? 1 : taosArrayGetSize(pBlocks);
51,068,161✔
1976
  if (pRsp->withTbName) {
51,070,725✔
1977
    int64_t uid = pExec->pTqReader->lastBlkUid;
51,067,851✔
1978
    code = tqAddTbNameToRsp(pTq, uid, pRsp, blockNum);
51,069,220✔
1979
    TSDB_CHECK_CODE(code, lino, END);
51,027,132✔
1980
  }
1981

1982
  TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS);
51,025,614✔
1983
  for (int32_t i = 0; i < blockNum; i++) {
102,083,842✔
1984
    if (taosArrayGetSize(pBlocks) == 0){
51,022,446✔
1985
      void* rawData = taosArrayGetP(rawList, pReader->nextBlk);
×
1986
      if (rawData == NULL) {
×
1987
        continue;
×
1988
      }
1989
      if (tqAddRawDataToRsp(rawData, pRsp, pTq->pVnode->config.tsdbCfg.precision) != 0){
×
1990
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
1991
        continue;
×
1992
      }
1993
      *totalRows += *(uint32_t *)rawData + INT_BYTES; // bytes actually
×
1994
    } else {
1995
      SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
51,047,426✔
1996
      if (pBlock == NULL) {
51,046,952✔
1997
        continue;
×
1998
      }
1999

2000
      SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pSchemas, i);
51,046,952✔
2001
      if (tqAddBlockDataToRsp(pBlock, pRsp, pSW, pTq->pVnode->config.tsdbCfg.precision) != 0){
51,053,866✔
2002
        tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
×
2003
        continue;
×
2004
      }
2005
      *totalRows += pBlock->info.rows;
51,040,199✔
2006
    }
2007

2008
    pRsp->blockNum++;
51,056,311✔
2009
  }
2010
  tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
51,061,396✔
2011
END:
51,061,396✔
2012
  if (code != 0) {
52,455,403✔
2013
    tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
2,105✔
2014
  }
2015
  taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
52,455,403✔
2016
  taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
52,459,378✔
2017
}
52,475,383✔
2018

2019
static void tqPreProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){
52,460,167✔
2020
  STqExecHandle* pExec = &pHandle->execHandle;
52,460,167✔
2021
  STqReader* pReader = pExec->pTqReader;
52,464,153✔
2022
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
52,464,863✔
2023
  for (int32_t i = 0; i < blockSz; i++){
104,956,266✔
2024
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, i);
52,489,210✔
2025
    if (pSubmitTbData== NULL){
52,493,825✔
2026
      taosArrayDestroy(*rawList);
×
2027
      *rawList = NULL;
×
2028
      return;
×
2029
    }
2030

2031
    int64_t uid = pSubmitTbData->uid;
52,493,825✔
2032
    if (pRequest->rawData) {
52,494,834✔
2033
      if (taosHashGet(pRequest->uidHash, &uid, LONG_BYTES) != NULL) {
×
2034
        tqDebug("poll rawdata split,uid:%" PRId64 " is already exists", uid);
×
2035
        terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
2036
        return;
×
2037
      } else {
2038
        int32_t code = taosHashPut(pRequest->uidHash, &uid, LONG_BYTES, &uid, LONG_BYTES);
×
2039
        if (code != 0) {
×
2040
          tqError("failed to add table uid to hash, code:%d, uid:%" PRId64, code, uid);
×
2041
        }
2042
      }
2043
    }
2044

2045
    if (pSubmitTbData->pCreateTbReq == NULL){
52,494,915✔
2046
      continue;
51,499,428✔
2047
    }
2048

2049
    int64_t createTime = INT64_MAX;
993,358✔
2050
    int64_t *cTime = (int64_t*)taosHashGet(pHandle->tableCreateTimeHash, &uid, LONG_BYTES);
994,084✔
2051
    if (cTime != NULL){
994,084✔
2052
      createTime = *cTime;
1,122✔
2053
    } else{
2054
      createTime = metaGetTableCreateTime(pReader->pVnode->pMeta, uid, 1);
992,962✔
2055
      if (createTime != INT64_MAX){
991,873✔
2056
        int32_t code = taosHashPut(pHandle->tableCreateTimeHash, &uid, LONG_BYTES, &createTime, LONG_BYTES);
991,797✔
2057
        if (code != 0){
991,796✔
2058
          tqError("failed to add table create time to hash,code:%d, uid:%"PRId64, code, uid);
×
2059
        }
2060
      }
2061
    }
2062
    if (pSubmitTbData->ctimeMs > createTime){
992,994✔
2063
      tDestroySVSubmitCreateTbReq(pSubmitTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
1,122✔
2064
      taosMemoryFreeClear(pSubmitTbData->pCreateTbReq);
1,122✔
2065
    } else if (pHandle->fetchMeta != ONLY_DATA){
992,962✔
2066
      taosArrayDestroy(*rawList);
16,244✔
2067
      *rawList = NULL;
16,244✔
2068
    }
2069
  }
2070
}
2071

2072
static int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
52,468,075✔
2073
  int32_t code = 0;
52,468,075✔
2074
  int32_t lino = 0;
52,468,075✔
2075
  SDecoder decoder = {0};
52,468,075✔
2076
  STqExecHandle* pExec = &pHandle->execHandle;
52,468,430✔
2077
  STqReader* pReader = pExec->pTqReader;
52,468,430✔
2078
  SArray *rawList = NULL;
52,468,430✔
2079
  if (pRequest->rawData){
52,468,075✔
2080
    rawList = taosArrayInit(0, POINTER_BYTES);
×
2081
    TSDB_CHECK_NULL(rawList, code, lino, END, terrno);
×
2082
  }
2083
  code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver, rawList, &decoder);
52,467,687✔
2084
  TSDB_CHECK_CODE(code, lino, END);
52,467,010✔
2085
  tqPreProcessSubmitMsg(pHandle, pRequest, &rawList);
52,467,010✔
2086
  // data could not contains same uid data in rawdata mode
2087
  if (pRequest->rawData != 0 && terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
52,454,495✔
2088
    goto END;
×
2089
  }
2090

2091
  // this submit data is metadata and previous data is rawdata
2092
  if (pRequest->rawData != 0 && *totalRows > 0 && pRsp->createTableNum == 0 && rawList == NULL){
52,466,646✔
2093
    tqDebug("poll rawdata split,vgId:%d, this wal submit data contains metadata and previous data is data", pTq->pVnode->config.vgId);
×
2094
    terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
2095
    goto END;
×
2096
  }
2097

2098
  // this submit data is rawdata and previous data is metadata
2099
  if (pRequest->rawData != 0 && pRsp->createTableNum > 0 && rawList != NULL){
52,457,344✔
2100
    tqDebug("poll rawdata split,vgId:%d, this wal submit data is data and previous data is metadata", pTq->pVnode->config.vgId);
×
2101
    terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
×
2102
    goto END;
×
2103
  }
2104

2105
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
52,457,674✔
2106
  while (pReader->nextBlk < blockSz) {
104,957,550✔
2107
    tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList, submit.ver);
52,495,195✔
2108
    pReader->nextBlk++;
52,448,779✔
2109
  }
2110

2111
END:
52,459,531✔
2112
  tDecoderClear(&decoder);
52,460,607✔
2113
  tqReaderClearSubmitMsg(pReader);
52,462,581✔
2114
  taosArrayDestroy(rawList);
52,437,655✔
2115
  if (code != 0){
52,446,270✔
2116
    tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code));
×
2117
  }
2118
  return code;
52,446,270✔
2119
}
2120

2121
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
161,544,202✔
2122
  int32_t code = TDB_CODE_SUCCESS;
161,544,202✔
2123
  int32_t lino = 0;
161,544,202✔
2124
  tqDebug("%s called", __FUNCTION__);
161,544,202✔
2125
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
161,548,278✔
2126
  tOffsetCopy(&pRsp->reqOffset, &pOffset);
161,548,278✔
2127
  tOffsetCopy(&pRsp->rspOffset, &pOffset);
161,548,263✔
2128

2129
  pRsp->withTbName = 1;
161,548,667✔
2130
  pRsp->withSchema = 1;
161,548,667✔
2131
  pRsp->blockData = taosArrayInit(0, sizeof(void*));
161,548,667✔
2132
  TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);
161,547,878✔
2133

2134
  pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
161,546,824✔
2135
  TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno);
161,547,625✔
2136

2137
  pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
161,542,039✔
2138
  TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno);
161,542,248✔
2139

2140
  pRsp->blockSchema = taosArrayInit(0, sizeof(void*));
161,536,997✔
2141
  TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno);
161,543,138✔
2142

2143
END:
161,541,101✔
2144
  if (code != 0) {
161,541,101✔
2145
    tqError("%s failed at:%d, code:%s", __FUNCTION__, lino, tstrerror(code));
×
2146
    taosArrayDestroy(pRsp->blockData);
×
2147
    taosArrayDestroy(pRsp->blockDataLen);
×
2148
    taosArrayDestroy(pRsp->blockTbName);
×
2149
    taosArrayDestroy(pRsp->blockSchema);
×
2150
  }
2151
  return code;
161,538,307✔
2152
}
2153

2154
static int32_t tqExtractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
5,244,032✔
2155
                                     SRpcMsg* pMsg, bool* pBlockReturned) {
2156
  if (pOffsetVal == NULL || pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pBlockReturned == NULL) {
5,244,032✔
2157
    return TSDB_CODE_INVALID_PARA;
×
2158
  }
2159
  uint64_t   consumerId = pRequest->consumerId;
5,245,883✔
2160
  STqOffset* pOffset = NULL;
5,245,883✔
2161
  int32_t    code = tqMetaGetOffset(pTq, pRequest->subKey, &pOffset);
5,245,516✔
2162
  int32_t    vgId = TD_VID(pTq->pVnode);
5,240,674✔
2163

2164
  *pBlockReturned = false;
5,239,666✔
2165
  // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
2166
  if (code == 0) {
5,236,286✔
2167
    tOffsetCopy(pOffsetVal, &pOffset->val);
112,746✔
2168

2169
    char formatBuf[TSDB_OFFSET_LEN] = {0};
112,746✔
2170
    tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
112,746✔
2171
    tqDebug("tmq poll: consumer:0x%" PRIx64
112,746✔
2172
                ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64,
2173
            consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
2174
    return 0;
112,746✔
2175
  } else {
2176
    // no poll occurs in this vnode for this topic, let's seek to the right offset value.
2177
    if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
5,123,540✔
2178
      if (pRequest->useSnapshot) {
5,098,598✔
2179
        tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
4,818,198✔
2180
                consumerId, pHandle->subKey, vgId);
2181
        if (pHandle->fetchMeta) {
4,825,084✔
2182
          tqOffsetResetToMeta(pOffsetVal, 0);
2183
        } else {
2184
          SValue val = {0};
4,816,761✔
2185
          tqOffsetResetToData(pOffsetVal, 0, 0, val);
×
2186
        }
2187
      } else {
2188
        walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
277,821✔
2189
        tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer);
278,195✔
2190
      }
2191
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
29,754✔
2192
      walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
19,038✔
2193
      SMqDataRsp dataRsp = {0};
19,038✔
2194
      tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1);
19,038✔
2195

2196
      code = tqInitDataRsp(&dataRsp, *pOffsetVal);
19,038✔
2197
      if (code != 0) {
19,038✔
2198
        return code;
×
2199
      }
2200
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
19,038✔
2201
              pHandle->subKey, vgId, dataRsp.rspOffset.version);
2202
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, (pRequest->rawData == 1) ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
19,038✔
2203
      tDeleteMqDataRsp(&dataRsp);
19,038✔
2204

2205
      *pBlockReturned = true;
19,038✔
2206
      return code;
19,038✔
2207
    } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) {
10,716✔
2208
      tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
10,716✔
2209
                  " in vg %d, subkey %s, reset none failed",
2210
              pHandle->subKey, consumerId, vgId, pRequest->subKey);
2211
      return TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
11,104✔
2212
    }
2213
  }
2214

2215
  return 0;
5,102,239✔
2216
}
2217

2218
static int32_t tqExtractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
205,581,337✔
2219
                                                   SRpcMsg* pMsg, STqOffsetVal* pOffset) {
2220
  int32_t    code = TDB_CODE_SUCCESS;
205,581,337✔
2221
  int32_t    lino = 0;
205,581,337✔
2222
  tqDebug("%s called", __FUNCTION__ );
205,581,337✔
2223
  uint64_t consumerId = pRequest->consumerId;
205,581,581✔
2224
  int32_t  vgId = TD_VID(pTq->pVnode);
205,587,084✔
2225
  terrno = 0;
205,587,446✔
2226

2227
  SMqDataRsp dataRsp = {0};
205,587,413✔
2228
  code = tqInitDataRsp(&dataRsp, *pOffset);
205,587,477✔
2229
  TSDB_CHECK_CODE(code, lino, end);
205,580,774✔
2230

2231
  code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
205,580,774✔
2232
  TSDB_CHECK_CODE(code, lino, end);
205,577,245✔
2233

2234
  code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
205,577,245✔
2235
  if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
205,586,404✔
2236
    goto end;
4,763,844✔
2237
  }
2238

2239
  if (terrno == TSDB_CODE_TMQ_FETCH_TIMEOUT && dataRsp.blockNum == 0) {
200,822,560✔
2240
    dataRsp.timeout = true;
×
2241
  }
2242
  
2243
  // reqOffset represents the current date offset, may be changed if wal not exists
2244
  tOffsetCopy(&dataRsp.reqOffset, pOffset);
200,823,574✔
2245
  code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
200,822,470✔
2246

2247
end:
205,564,281✔
2248
  {
2249
    char buf[TSDB_OFFSET_LEN] = {0};
205,567,521✔
2250
    tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
205,569,721✔
2251
    if (code != 0){
205,557,758✔
2252
      tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
4,763,844✔
2253
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
2254
    } else {
2255
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
200,793,914✔
2256
              consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
2257
    }
2258

2259
    tDeleteMqDataRsp(&dataRsp);
205,591,774✔
2260
    return code;
205,583,010✔
2261
  }
2262
}
2263

2264
#define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC)                                               \
2265
  SDecoder decoder = {0};                                                                                  \
2266
  TYPE     req = {0};                                                                                      \
2267
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));                                            \
2268
  int32_t  len = pHead->bodyLen - sizeof(SMsgHead);                                                        \
2269
  tDecoderInit(&decoder, data, len);                                                                       \
2270
  if (DECODE_FUNC(&decoder, &req) == 0 && (req.source & TD_REQ_FROM_TAOX) != 0) {                          \
2271
    tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, jump meta for, vgId:%d offset %" PRId64 \
2272
            " msgType %d",                                                                                 \
2273
            pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);                        \
2274
    fetchVer++;                                                                                            \
2275
    DELETE_FUNC(&req);                                                                                     \
2276
    tDecoderClear(&decoder);                                                                               \
2277
    continue;                                                                                              \
2278
  }                                                                                                        \
2279
  DELETE_FUNC(&req);                                                                                       \
2280
  tDecoderClear(&decoder);
2281

2282
static void tqDeleteCommon(void* parm) {}
90,521✔
2283

2284
#define POLL_RSP_TYPE(pRequest,taosxRsp) \
2285
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : \
2286
(pRequest->rawData == 1 ? TMQ_MSG_TYPE__POLL_RAW_DATA_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP)
2287

2288
static int32_t tqBuildBatchMeta(SMqBatchMetaRsp *btMetaRsp, int16_t type, int32_t bodyLen, void* body){
2,064✔
2289
  int32_t         code = 0;
2,064✔
2290

2291
  if (!btMetaRsp->batchMetaReq) {
2,064✔
2292
    btMetaRsp->batchMetaReq = taosArrayInit(4, POINTER_BYTES);
2,064✔
2293
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaReq);
2,064✔
2294
    btMetaRsp->batchMetaLen = taosArrayInit(4, sizeof(int32_t));
2,064✔
2295
    TQ_NULL_GO_TO_END(btMetaRsp->batchMetaLen);
2,064✔
2296
  }
2297

2298
  SMqMetaRsp tmpMetaRsp = {0};
2,064✔
2299
  tmpMetaRsp.resMsgType = type;
2,064✔
2300
  tmpMetaRsp.metaRspLen = bodyLen;
2,064✔
2301
  tmpMetaRsp.metaRsp = body;
2,064✔
2302
  uint32_t len = 0;
2,064✔
2303
  tEncodeSize(tEncodeMqMetaRsp, &tmpMetaRsp, len, code);
2,064✔
2304
  if (TSDB_CODE_SUCCESS != code) {
2,064✔
2305
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
2306
    goto END;
×
2307
  }
2308
  int32_t tLen = sizeof(SMqRspHead) + len;
2,064✔
2309
  void*   tBuf = taosMemoryCalloc(1, tLen);
2,064✔
2310
  TQ_NULL_GO_TO_END(tBuf);
2,064✔
2311
  void*    metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead));
2,064✔
2312
  SEncoder encoder = {0};
2,064✔
2313
  tEncoderInit(&encoder, metaBuff, len);
2,064✔
2314
  code = tEncodeMqMetaRsp(&encoder, &tmpMetaRsp);
2,064✔
2315
  tEncoderClear(&encoder);
2,064✔
2316

2317
  if (code < 0) {
2,064✔
2318
    tqError("tmq extract meta from log, tEncodeMqMetaRsp error");
×
2319
    goto END;
×
2320
  }
2321
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaReq, &tBuf));
4,128✔
2322
  TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp->batchMetaLen, &tLen));
4,128✔
2323

2324
END:
2,064✔
2325
  return code;
2,064✔
2326
}
2327

2328
static int32_t tqBuildCreateTbBatchReqBinary(SMqDataRsp *taosxRsp, void** pBuf, int32_t *len){
1,404✔
2329
  int32_t code = 0;
1,404✔
2330
  SVCreateTbBatchReq pReq = {0};
1,404✔
2331
  pReq.nReqs = taosArrayGetSize(taosxRsp->createTableReq);
1,404✔
2332
  pReq.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
1,404✔
2333
  TQ_NULL_GO_TO_END(pReq.pArray);
1,404✔
2334
  for (int i = 0; i < taosArrayGetSize(taosxRsp->createTableReq); i++){
3,798✔
2335
    void   *createTableReq = taosArrayGetP(taosxRsp->createTableReq, i);
2,394✔
2336
    TQ_NULL_GO_TO_END(taosArrayPush(pReq.pArray, createTableReq));
4,788✔
2337
  }
2338
  tEncodeSize(tEncodeSVCreateTbBatchReq, &pReq, *len, code);
1,404✔
2339
  if (code < 0) {
1,404✔
2340
    goto END;
×
2341
  }
2342
  *len += sizeof(SMsgHead);
1,404✔
2343
  *pBuf = taosMemoryMalloc(*len);
1,404✔
2344
  TQ_NULL_GO_TO_END(*pBuf);
1,404✔
2345
  SEncoder coder = {0};
1,404✔
2346
  tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), *len);
1,404✔
2347
  code = tEncodeSVCreateTbBatchReq(&coder, &pReq);
1,404✔
2348
  tEncoderClear(&coder);
1,404✔
2349

2350
END:
1,404✔
2351
  taosArrayDestroy(pReq.pArray);
1,404✔
2352
  return code;
1,404✔
2353
}
2354

2355
#define SEND_BATCH_META_RSP \
2356
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);\
2357
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);\
2358
goto END;
2359

2360
#define SEND_DATA_RSP \
2361
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);\
2362
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);\
2363
goto END;
2364

2365
static int32_t tqExtractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
161,546,370✔
2366
                                                  SRpcMsg* pMsg, STqOffsetVal* offset) {
2367
  int32_t         vgId = TD_VID(pTq->pVnode);
161,546,370✔
2368
  SMqDataRsp      taosxRsp = {0};
161,547,128✔
2369
  SMqBatchMetaRsp btMetaRsp = {0};
161,543,596✔
2370
  int32_t         code = 0;
161,547,100✔
2371

2372
  TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
161,547,100✔
2373
  if (offset->type != TMQ_OFFSET__LOG) {
161,535,673✔
2374
    TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
55,569✔
2375

2376
    if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
55,569✔
2377
      code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
6,979✔
2378
      tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
6,979✔
2379
              pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts);
2380
      goto END;
6,979✔
2381
    }
2382

2383
    tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64,
48,590✔
2384
            pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
2385
    if (taosxRsp.blockNum > 0) {
48,590✔
2386
      code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
45,659✔
2387
      goto END;
45,659✔
2388
    } else {
2389
      tOffsetCopy(offset, &taosxRsp.rspOffset);
2,931✔
2390
    }
2391
  }
2392

2393
  if (offset->type == TMQ_OFFSET__LOG) {
161,485,284✔
2394
    walReaderVerifyOffset(pHandle->pWalReader, offset);
161,492,703✔
2395
    int64_t fetchVer = offset->version;
161,487,701✔
2396

2397
    uint64_t st = taosGetTimestampMs();
161,480,420✔
2398
    int      totalRows = 0;
161,480,420✔
2399
    int32_t  totalMetaRows = 0;
161,476,854✔
2400
    while (1) {
52,300,558✔
2401
      int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
213,777,412✔
2402
      if (savedEpoch > pRequest->epoch) {
213,776,430✔
2403
        tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, savedEpoch error, vgId:%d offset %" PRId64,
×
2404
                pRequest->consumerId, pRequest->epoch, vgId, fetchVer);
2405
        code = TSDB_CODE_TQ_INTERNAL_ERROR;
×
2406
        goto END;
×
2407
      }
2408

2409
      if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
213,756,396✔
2410
        if (totalMetaRows > 0) {
161,149,414✔
2411
          SEND_BATCH_META_RSP
1,320✔
2412
        }
2413
        SEND_DATA_RSP
322,297,550✔
2414
      }
2415

2416
      SWalCont* pHead = &pHandle->pWalReader->pHead->head;
52,688,190✔
2417
      tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
52,688,190✔
2418
              pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
2419

2420
      // process meta
2421
      if (pHead->msgType != TDMT_VND_SUBMIT) {
52,688,190✔
2422
        if (totalRows > 0) {
219,760✔
2423
          SEND_DATA_RSP
35,252✔
2424
        }
2425

2426
        if ((pRequest->sourceExcluded & TD_REQ_FROM_TAOX) != 0) {
202,134✔
2427
          if (pHead->msgType == TDMT_VND_CREATE_TABLE) {
177,311✔
2428
            PROCESS_EXCLUDED_MSG(SVCreateTbBatchReq, tDecodeSVCreateTbBatchReq, tDeleteSVCreateTbBatchReq)
57,420✔
2429
          } else if (pHead->msgType == TDMT_VND_ALTER_TABLE) {
119,891✔
2430
            PROCESS_EXCLUDED_MSG(SVAlterTbReq, tDecodeSVAlterTbReq, destroyAlterTbReq)
16,170✔
2431
          } else if (pHead->msgType == TDMT_VND_CREATE_STB || pHead->msgType == TDMT_VND_ALTER_STB) {
130,552✔
2432
            PROCESS_EXCLUDED_MSG(SVCreateStbReq, tDecodeSVCreateStbReq, tqDeleteCommon)
88,871✔
2433
          } else if (pHead->msgType == TDMT_VND_DELETE) {
14,850✔
2434
            PROCESS_EXCLUDED_MSG(SDeleteRes, tDecodeDeleteRes, tqDeleteCommon)
1,650✔
2435
          }
2436
        }
2437

2438
        tqDebug("fetch meta msg, ver:%" PRId64 ", vgId:%d, type:%s, enable batch meta:%d", pHead->version, vgId,
90,264✔
2439
                TMSG_INFO(pHead->msgType), pRequest->enableBatchMeta);
2440
        if (!pRequest->enableBatchMeta && !pRequest->useSnapshot) {
90,264✔
2441
          SMqMetaRsp metaRsp = {0};
89,604✔
2442
          tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer + 1);
89,604✔
2443
          metaRsp.resMsgType = pHead->msgType;
89,604✔
2444
          metaRsp.metaRspLen = pHead->bodyLen;
89,604✔
2445
          metaRsp.metaRsp = pHead->body;
89,604✔
2446
          code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
89,604✔
2447
          goto END;
89,604✔
2448
        }
2449
        code = tqBuildBatchMeta(&btMetaRsp, pHead->msgType, pHead->bodyLen, pHead->body);
660✔
2450
        fetchVer++;
660✔
2451
        if (code != 0){
660✔
2452
          goto END;
×
2453
        }
2454
        totalMetaRows++;
660✔
2455
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= pRequest->minPollRows) || (taosGetTimestampMs() - st > pRequest->timeout)) {
1,320✔
2456
          SEND_BATCH_META_RSP
×
2457
        }
2458
        continue;
660✔
2459
      }
2460

2461
      if (totalMetaRows > 0 && pHandle->fetchMeta != ONLY_META) {
52,467,365✔
2462
        SEND_BATCH_META_RSP
×
2463
      }
2464

2465
      // process data
2466
      SPackedData submit = {
157,404,902✔
2467
          .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
52,467,365✔
2468
          .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
52,468,430✔
2469
          .ver = pHead->version,
52,468,042✔
2470
      };
2471

2472
      TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
52,468,430✔
2473

2474
      if (pHandle->fetchMeta == ONLY_META && taosArrayGetSize(taosxRsp.createTableReq) > 0){
52,424,125✔
2475
        int32_t len = 0;
1,404✔
2476
        void *pBuf = NULL;
1,404✔
2477
        code = tqBuildCreateTbBatchReqBinary(&taosxRsp, &pBuf, &len);
1,404✔
2478
        if (code == 0){
1,404✔
2479
          code = tqBuildBatchMeta(&btMetaRsp, TDMT_VND_CREATE_TABLE, len, pBuf);
1,404✔
2480
        }
2481
        taosMemoryFree(pBuf);
1,404✔
2482
        for (int i = 0; i < taosArrayGetSize(taosxRsp.createTableReq); i++) {
3,798✔
2483
          void* pCreateTbReq = taosArrayGetP(taosxRsp.createTableReq, i);
2,394✔
2484
          if (pCreateTbReq != NULL) {
2,394✔
2485
            tDestroySVSubmitCreateTbReq(pCreateTbReq, TSDB_MSG_FLG_DECODE);
2,394✔
2486
          }
2487
          taosMemoryFree(pCreateTbReq);
2,394✔
2488
        }
2489
        taosArrayDestroy(taosxRsp.createTableReq);
1,404✔
2490
        taosxRsp.createTableReq = NULL;
1,404✔
2491
        fetchVer++;
1,404✔
2492
        if (code != 0){
1,404✔
2493
          goto END;
×
2494
        }
2495
        totalMetaRows++;
1,404✔
2496
        if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= pRequest->minPollRows) ||
1,404✔
2497
            (taosGetTimestampMs() - st > pRequest->timeout) ||
1,404✔
2498
            (!pRequest->enableBatchMeta && !pRequest->useSnapshot)) {
1,404✔
2499
          SEND_BATCH_META_RSP
2,808✔
2500
        }
2501
        continue;
×
2502
      }
2503

2504
      if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
52,446,676✔
2505
          (taosGetTimestampMs() - st > pRequest->timeout) ||
52,202,967✔
2506
          (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
52,209,020✔
2507
                                      terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
×
2508
        if (terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT){
262,602✔
2509
          terrno = 0;
×
2510
        } else{
2511
          fetchVer++;
237,609✔
2512
        }
2513
        SEND_DATA_RSP
475,218✔
2514
      } else {
2515
        fetchVer++;
52,181,491✔
2516
      }
2517
    }
2518
  }
2519

2520
END:
161,517,934✔
2521
  if (code != 0){
161,521,878✔
2522
    tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
×
2523
            pRequest->subKey);
2524
  }
2525
  tDeleteMqBatchMetaRsp(&btMetaRsp);
161,521,878✔
2526
  tDeleteSTaosxRsp(&taosxRsp);
161,506,598✔
2527
  return code;
161,485,920✔
2528
}
2529

2530
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
367,163,363✔
2531
  if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL) {
367,163,363✔
2532
    return TSDB_CODE_TMQ_INVALID_MSG;
×
2533
  }
2534
  int32_t      code = 0;
367,165,782✔
2535
  STqOffsetVal reqOffset = {0};
367,165,782✔
2536
  tOffsetCopy(&reqOffset, &pRequest->reqOffset);
367,165,782✔
2537

2538
  // reset the offset if needed
2539
  if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type)) {
367,163,183✔
2540
    bool blockReturned = false;
5,245,883✔
2541
    code = tqExtractResetOffsetVal(&reqOffset, pTq, pHandle, pRequest, pMsg, &blockReturned);
5,245,883✔
2542
    if (code != 0) {
5,245,483✔
2543
      goto END;
11,104✔
2544
    }
2545

2546
    // empty block returned, quit
2547
    if (blockReturned) {
5,234,379✔
2548
      goto END;
19,038✔
2549
    }
2550
  } else if (reqOffset.type == 0) {  // use the consumer specified offset
361,918,857✔
2551
    uError("req offset type is 0");
×
2552
    code = TSDB_CODE_TMQ_INVALID_MSG;
×
2553
    goto END;
×
2554
  }
2555

2556
  if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
367,134,198✔
2557
    code = tqExtractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
205,585,625✔
2558
  } else {
2559
    code = tqExtractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset);
161,547,878✔
2560
  }
2561

2562
END:
367,103,442✔
2563
  if (code != 0){
367,114,564✔
2564
    uError("failed to extract data for mq, msg:%s", tstrerror(code));
4,774,592✔
2565
  }
2566
  tOffsetDestroy(&reqOffset);
367,114,564✔
2567
  return code;
367,112,378✔
2568
}
2569

2570
static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
9,043✔
2571
                               const SMqBatchMetaRsp* pRsp, int32_t vgId) {
2572
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
9,043✔
2573
    return TSDB_CODE_TMQ_INVALID_MSG;
×
2574
  }
2575
  int32_t len = 0;
9,043✔
2576
  int32_t code = 0;
9,043✔
2577
  tEncodeSize(tEncodeMqBatchMetaRsp, pRsp, len, code);
9,043✔
2578
  if (code < 0) {
9,043✔
2579
    return TAOS_GET_TERRNO(code);
×
2580
  }
2581
  int32_t tlen = sizeof(SMqRspHead) + len;
9,043✔
2582
  void*   buf = rpcMallocCont(tlen);
9,043✔
2583
  if (buf == NULL) {
9,043✔
2584
    return TAOS_GET_TERRNO(terrno);
×
2585
  }
2586

2587
  int64_t sver = 0, ever = 0;
9,043✔
2588
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
9,043✔
2589
  tqInitMqRspHead(buf, TMQ_MSG_TYPE__POLL_BATCH_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
9,043✔
2590

2591
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
9,043✔
2592

2593
  SEncoder encoder = {0};
9,043✔
2594
  tEncoderInit(&encoder, abuf, len);
9,043✔
2595
  code = tEncodeMqBatchMetaRsp(&encoder, pRsp);
9,043✔
2596
  tEncoderClear(&encoder);
9,043✔
2597
  if (code < 0) {
9,043✔
2598
    rpcFreeCont(buf);
×
2599
    return TAOS_GET_TERRNO(code);
×
2600
  }
2601
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
9,043✔
2602

2603
  tmsgSendRsp(&resp);
9,043✔
2604
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type: batch meta, size:%ld offset type:%d",
9,043✔
2605
          vgId, pReq->consumerId, pReq->epoch, taosArrayGetSize(pRsp->batchMetaReq), pRsp->rspOffset.type);
2606

2607
  return 0;
9,043✔
2608
}
2609

2610
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp,
89,604✔
2611
                          int32_t vgId) {
2612
  if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
89,604✔
2613
    return TSDB_CODE_TMQ_INVALID_MSG;
×
2614
  }
2615
  int32_t len = 0;
89,604✔
2616
  int32_t code = 0;
89,604✔
2617
  tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);
89,604✔
2618
  if (code < 0) {
89,604✔
2619
    return TAOS_GET_TERRNO(code);
×
2620
  }
2621
  int32_t tlen = sizeof(SMqRspHead) + len;
89,604✔
2622
  void*   buf = rpcMallocCont(tlen);
89,604✔
2623
  if (buf == NULL) {
89,604✔
2624
    return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
×
2625
  }
2626

2627
  int64_t sver = 0, ever = 0;
89,604✔
2628
  walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
89,604✔
2629
  tqInitMqRspHead(buf, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
89,604✔
2630

2631
  void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
89,604✔
2632

2633
  SEncoder encoder = {0};
89,604✔
2634
  tEncoderInit(&encoder, abuf, len);
89,604✔
2635
  code = tEncodeMqMetaRsp(&encoder, pRsp);
89,604✔
2636
  tEncoderClear(&encoder);
89,604✔
2637
  if (code < 0) {
89,604✔
2638
    rpcFreeCont(buf);
×
2639
    return TAOS_GET_TERRNO(code);
×
2640
  }
2641

2642
  SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
89,604✔
2643

2644
  tmsgSendRsp(&resp);
89,604✔
2645
  tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
89,604✔
2646
          pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
2647

2648
  return 0;
89,604✔
2649
}
2650

2651

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc