• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5002

24 Mar 2026 01:11AM UTC coverage: 72.671% (+0.4%) from 72.254%
#5002

push

travis-ci

web-flow
fix: possible memory leak in tdb; (#34872)

1 of 20 new or added lines in 2 files covered. (5.0%)

539 existing lines in 124 files now uncovered.

227507 of 313065 relevant lines covered (72.67%)

147556935.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.71
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "taoserror.h"
17
#include "tarray.h"
18
#include "tdef.h"
19
#include "thash.h"
20
#include "tmsg.h"
21
#include "tpriv.h"
22
#include "tq.h"
23

24
static int32_t tqRetrieveCols(STqReader *pReader, SSDataBlock *pRes, SHashObj* pCol2SlotId);
25

26
static void processCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
11,560✔
27
  int32_t code = 0;
11,560✔
28
  int32_t lino = 0;
11,560✔
29
  int32_t        needRebuild = 0;
11,560✔
30
  SVCreateTbReq* pCreateReq = NULL;
11,560✔
31
  SVCreateTbBatchReq reqNew = {0};
11,560✔
32
  void* buf = NULL;
11,560✔
33
  SVCreateTbBatchReq req = {0};
11,560✔
34
  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
11,560✔
35
  if (code < 0) {
11,560✔
36
    lino = __LINE__;
×
37
    goto end;
×
38
  }
39

40
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
23,455✔
41
    pCreateReq = req.pReqs + iReq;
11,895✔
42
    if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) && 
11,895✔
43
         pCreateReq->ctb.suid == tbSuid &&
15,820✔
44
         taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {  
6,450✔
45
      needRebuild++;
4,685✔
46
    }
47
  }
48
  if (needRebuild == 0) {
11,560✔
49
    // do nothing
50
  } else if (needRebuild == req.nReqs) {
4,685✔
51
    *realTbSuid = tbSuid;
4,350✔
52
  } else {
53
    *realTbSuid = tbSuid;
335✔
54
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
335✔
55
    if (reqNew.pArray == NULL) {
335✔
56
      code = terrno;
×
57
      lino = __LINE__;
×
58
      goto end;
×
59
    }
60
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
1,005✔
61
      pCreateReq = req.pReqs + iReq;
670✔
62
      if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) &&
670✔
63
          pCreateReq->ctb.suid == tbSuid &&
1,340✔
64
          taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
670✔
65
        reqNew.nReqs++;
335✔
66
        if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
670✔
67
          code = terrno;
×
68
          lino = __LINE__;
×
69
          goto end;
×
70
        }
71
      }
72
    }
73

74
    int     tlen = 0;
335✔
75
    tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
335✔
76
    buf = taosMemoryMalloc(tlen);
335✔
77
    if (NULL == buf || code < 0) {
335✔
78
      lino = __LINE__;
×
79
      goto end;
×
80
    }
81
    SEncoder coderNew = {0};
335✔
82
    tEncoderInit(&coderNew, buf, tlen);
335✔
83
    code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
335✔
84
    tEncoderClear(&coderNew);
335✔
85
    if (code < 0) {
335✔
86
      lino = __LINE__;
×
87
      goto end;
×
88
    }
89
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
335✔
90
    pHead->bodyLen = tlen + sizeof(SMsgHead);
335✔
91
  }
92

93
end:
11,560✔
94
  taosMemoryFree(buf);
11,560✔
95
  taosArrayDestroy(reqNew.pArray);
11,560✔
96
  tDeleteSVCreateTbBatchReq(&req);
11,560✔
97
  if (code < 0) {
11,560✔
98
    tqError("processCreateTbMsg failed, code:%d, line:%d", code, lino);
×
99
  }
100
}
11,560✔
101

102
/*
 * Decides whether an alter-table WAL message concerns the subscribed super
 * table tbSuid and, for multi-table tag updates, trims the message to the
 * tables present in pReader->tbIdHash.
 *
 *  - TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL ("Type 1"): every listed table
 *    is looked up by name. If none is in scope *realTbSuid is left untouched;
 *    if all are, *realTbSuid = tbSuid; if only some are, *realTbSuid = tbSuid
 *    and the body behind the SMsgHead is re-encoded with the in-scope tables,
 *    updating pHead->bodyLen.
 *  - TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL ("Type 2"): req.tbName is
 *    looked up and, when it is a super table, *realTbSuid is set to its uid.
 *  - Any other action: req.tbName is looked up and, when its uid is in
 *    tbIdHash, *realTbSuid is set to the entry's ctbEntry.suid.
 *
 * Failures are logged; the function returns nothing.
 */
static void processAlterTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  SVAlterTbReq req = {0};
  SVAlterTbReq reqNew = {0};
  SMetaReader  mr = {0};
  void*        buf = NULL;
  int32_t      lino = 0;
  int32_t      code = tDecodeSVAlterTbReq(dcoder, &req);
  if (code < 0) {
    tqError("vgId:%d, processAlterTbMsg failed to decode SVAlterTbReq, code:%s",
            TD_VID(pReader->pVnode), tstrerror(code));
    lino = __LINE__;
    goto end;
  }

  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
    int32_t needRebuild = 0;
    for (int32_t i = 0; i < taosArrayGetSize(req.tables); i++) {
      SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
      if (pTable == NULL || pTable->tbName == NULL) {
        tqWarn("vgId:%d, processAlterTbMsg Type 1 table[%d] has invalid name, skip",
               TD_VID(pReader->pVnode), i);
        continue;
      }

      metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
      code = metaGetTableEntryByName(&mr, pTable->tbName);
      if (code < 0) {
        // Table not found, skip
        tqDebug("vgId:%d, processAlterTbMsg Type 1 table %s not found, skip",
                TD_VID(pReader->pVnode), pTable->tbName);
        metaReaderClear(&mr);
        // A skipped table is not a failure: clear the lookup error so it is not
        // reported as "failed at line:0" once we reach `end`.
        code = 0;
        continue;
      }

      if (mr.me.ctbEntry.suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
        needRebuild++;
      }
      metaReaderClear(&mr);
    }

    if (needRebuild == 0) {
      // No tables in subscription scope, skip message
      tqDebug("vgId:%d, processAlterTbMsg Type 1: 0/%d tables in subscription, skip",
              TD_VID(pReader->pVnode), (int)taosArrayGetSize(req.tables));
    } else if (needRebuild == taosArrayGetSize(req.tables)) {
      // All tables in subscription scope, forward complete message
      *realTbSuid = tbSuid;
      tqDebug("vgId:%d, processAlterTbMsg Type 1: %d/%d tables in subscription, forward all",
              TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));
    } else {
      // Partial match: rebuild message with only subscribed tables
      *realTbSuid = tbSuid;
      tqDebug("vgId:%d, processAlterTbMsg Type 1: %d/%d tables in subscription, rebuild message",
              TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));

      // Build filtered message; entries are shallow copies of req.tables items.
      reqNew.action = req.action;
      reqNew.tbName = req.tbName;
      reqNew.tables = taosArrayInit(needRebuild, sizeof(SUpdateTableTagVal));
      if (reqNew.tables == NULL) {
        code = terrno;
        tqError("vgId:%d, processAlterTbMsg failed to allocate filtered tables array, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      // Collect only subscribed tables
      for (int32_t i = 0; i < taosArrayGetSize(req.tables); i++) {
        SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
        if (pTable == NULL || pTable->tbName == NULL) {
          continue;
        }
        metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
        code = metaGetTableEntryByName(&mr, pTable->tbName);
        if (code < 0) {
          metaReaderClear(&mr);
          code = 0;  // skipped table, not an error
          continue;
        }

        if (mr.me.ctbEntry.suid == tbSuid &&
            taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
          if (taosArrayPush(reqNew.tables, pTable) == NULL) {
            code = terrno;
            tqError("vgId:%d, processAlterTbMsg failed to add table %s to filtered array, code:%s",
                    TD_VID(pReader->pVnode), pTable->tbName, tstrerror(code));
            lino = __LINE__;
            metaReaderClear(&mr);
            goto end;
          }
        }
        metaReaderClear(&mr);
      }

      // Encode filtered message
      int tlen = 0;
      tEncodeSize(tEncodeSVAlterTbReq, &reqNew, tlen, code);
      if (code < 0) {
        tqError("vgId:%d, processAlterTbMsg failed to calculate encode size, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        code = terrno;
        tqError("vgId:%d, processAlterTbMsg failed to allocate encode buffer size:%d, code:%s",
                TD_VID(pReader->pVnode), tlen, tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen);
      code = tEncodeSVAlterTbReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (code != 0) {
        tqError("vgId:%d, processAlterTbMsg failed to encode filtered message, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }
      // Overwrite the payload that follows the message head with the filtered request.
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      tqInfo("vgId:%d, processAlterTbMsg Type 1 rebuilt message with %d/%d tables",
             TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));
    }

  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
    // Type 2: WHERE condition on super table
    metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
    code = metaGetTableEntryByName(&mr, req.tbName);
    if (code < 0) {
      tqError("vgId:%d, processAlterTbMsg Type 2 failed to get super table %s, code:%s",
              TD_VID(pReader->pVnode), req.tbName, tstrerror(code));
      lino = __LINE__;
      metaReaderClear(&mr);
      goto end;
    }

    // For Type 2, the super table is specified in req.tbName
    // The actual filtering happens through tbIdHash which already contains
    // only the child tables matching the topic's WHERE condition
    if (mr.me.type == TSDB_SUPER_TABLE) {
      *realTbSuid = mr.me.uid;
      tqDebug("vgId:%d, processAlterTbMsg Type 2: super table %" PRId64 " (WHERE condition filtering via tbIdHash)",
              TD_VID(pReader->pVnode), mr.me.uid);
    }
    metaReaderClear(&mr);
  } else {
    // Legacy single-table tag modification
    metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
    code = metaGetTableEntryByName(&mr, req.tbName);
    if (code < 0) {
      tqDebug("vgId:%d, processAlterTbMsg legacy type failed to get table %s, code:%s",
              TD_VID(pReader->pVnode), req.tbName ? req.tbName : "(null)", tstrerror(code));
      lino = __LINE__;
      metaReaderClear(&mr);
      goto end;
    }
    if (taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
      *realTbSuid = mr.me.ctbEntry.suid;
    }
    metaReaderClear(&mr);
  }

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.tables);
  destroyAlterTbReq(&req);
  if (code < 0) {
    tqError("vgId:%d, processAlterTbMsg failed at line:%d, code:%s",
            TD_VID(pReader->pVnode), lino, tstrerror(code));
  }
}
279

280
/*
 * Filters a decoded drop-table batch against the reader's table set.
 *
 *  - No request in scope:   *realTbSuid is left untouched.
 *  - Every request in scope: *realTbSuid = tbSuid, message forwarded as is.
 *  - Some requests in scope: *realTbSuid = tbSuid and the message body behind
 *    the SMsgHead is re-encoded in place with only the in-scope requests;
 *    pHead->bodyLen is updated accordingly.
 *
 * A request is in scope when its suid equals tbSuid and its uid is present in
 * pReader->tbIdHash. Failures are logged; the function returns nothing.
 */
static void processDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchReq reqNew = {0};
  void*            buf = NULL;
  int32_t          lino = 0;
  int32_t          code = tDecodeSVDropTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // First pass: count how many requests fall inside the subscription.
  int32_t      needRebuild = 0;
  SVDropTbReq* pDropReq = NULL;
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;

    if (pDropReq->suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }

  if (needRebuild == 0) {
    // do nothing
  } else if (needRebuild == req.nReqs) {
    *realTbSuid = tbSuid;
  } else {
    *realTbSuid = tbSuid;
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
    if (reqNew.pArray == NULL) {
      code = terrno;
      lino = __LINE__;
      goto end;
    }

    // Second pass: collect shallow copies of the in-scope requests.
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;
      if (pDropReq->suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
        reqNew.nReqs++;
        if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
          code = terrno;
          lino = __LINE__;
          goto end;
        }
      }
    }

    int tlen = 0;
    tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
    if (code < 0) {
      // Do not allocate with a length that was never computed.
      lino = __LINE__;
      goto end;
    }

    buf = taosMemoryMalloc(tlen);
    if (NULL == buf) {
      // Record the allocation error so it is reported at `end`.
      code = terrno;
      lino = __LINE__;
      goto end;
    }

    SEncoder coderNew = {0};
    tEncoderInit(&coderNew, buf, tlen);
    code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
    tEncoderClear(&coderNew);
    if (code != 0) {
      lino = __LINE__;
      goto end;
    }

    // Overwrite the payload that follows the message head with the filtered batch.
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
    pHead->bodyLen = tlen + sizeof(SMsgHead);
  }

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  if (code < 0) {
    tqError("processDropTbMsg failed, code:%d, line:%d", code, lino);
  }
}
352

353
/*
 * Reports whether a meta WAL message should be delivered to a handle.
 *
 * Returns false for NULL arguments and true for any handle whose subscription
 * type is not TOPIC_SUB_TYPE__TABLE. For table subscriptions the message body
 * (after the SMsgHead) is decoded according to its type to find the super
 * table it refers to; the result is whether that suid equals the subscribed
 * one. The create/alter/drop-table helpers may rewrite pHead's body in place.
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }

  STqExecHandle* pExec = &pHandle->execHandle;
  if (pExec->subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  STqReader* pReader = pExec->pTqReader;
  int16_t    msgType = pHead->msgType;
  int64_t    tbSuid = pExec->execTb.suid;
  int64_t    realTbSuid = 0;

  // The payload starts right after the message head.
  int32_t  bodyLen = pHead->bodyLen;
  void*    data = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
  SDecoder dcoder = {0};
  tDecoderInit(&dcoder, data, len);

  switch (msgType) {
    case TDMT_VND_CREATE_STB:
    case TDMT_VND_ALTER_STB: {
      SVCreateStbReq stbReq = {0};
      if (tDecodeSVCreateStbReq(&dcoder, &stbReq) < 0) {
        goto end;
      }
      realTbSuid = stbReq.suid;
      break;
    }
    case TDMT_VND_DROP_STB: {
      SVDropStbReq dropStbReq = {0};
      if (tDecodeSVDropStbReq(&dcoder, &dropStbReq) < 0) {
        goto end;
      }
      realTbSuid = dropStbReq.suid;
      break;
    }
    case TDMT_VND_CREATE_TABLE:
      processCreateTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
      break;
    case TDMT_VND_ALTER_TABLE:
      processAlterTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
      break;
    case TDMT_VND_DROP_TABLE:
      processDropTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
      break;
    case TDMT_VND_DELETE: {
      SDeleteRes delRes = {0};
      if (tDecodeDeleteRes(&dcoder, &delRes) < 0) {
        goto end;
      }
      realTbSuid = delRes.suid;
      break;
    }
    default:
      break;
  }

end:
  tDecoderClear(&dcoder);
  bool tmp = tbSuid == realTbSuid;
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
  return tmp;
}
409

410
/*
 * Advances through the WAL from *fetchOffset up to the applied version until a
 * message worth returning is found.
 *
 * A submit message has its body fetched and ends the scan with that fetch's
 * result. When the handle wants meta (fetchMeta != WITH_DATA), a meta message
 * (excluding deletes in ONLY_META mode) has its body fetched and ends the scan
 * with 0 if isValValidForTable() accepts it. Every other message is skipped.
 *
 * *fetchOffset is always updated to the offset reached. Returns -1 for NULL
 * arguments or when nothing was found, otherwise the code described above.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  SWalReader* pWalReader = pHandle->pWalReader;
  int32_t     code = -1;
  int32_t     vgId = TD_VID(pTq->pVnode);
  int64_t     id = pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      goto END;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pWalReader->pHead->head.msgType), reqId, id);

    // Data message: hand it back regardless of the body-fetch outcome.
    if (pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pWalReader);
      goto END;
    }

    bool candidateMeta = false;
    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pHead = &(pWalReader->pHead->head);
      candidateMeta =
          IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);
    }

    if (candidateMeta) {
      code = walFetchBody(pWalReader);
      if (code < 0) {
        goto END;
      }

      // Take the head pointer again after the body has been fetched.
      SWalCont* pHead = &(pWalReader->pHead->head);
      if (isValValidForTable(pHandle, pHead)) {
        code = 0;
        goto END;
      }
    } else {
      code = walSkipFetchBody(pWalReader);
      if (code < 0) {
        goto END;
      }
    }

    // Message not wanted: move on and keep "nothing found" as the result.
    offset++;
    code = -1;
  }

END:
  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
477

478
/* Returns the reader's hasPrimaryKey flag, or false for a NULL reader. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
484

485
/*
 * Looks up the schema of table `uid` and records in pReader->hasPrimaryKey
 * whether its second column carries the COL_IS_KEY flag. A NULL reader is
 * ignored; a missing schema or one with fewer than two columns yields false.
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);

  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, -1, 1, NULL, 0);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    hasKey = (pWrapper->pSchema[1].flags & COL_IS_KEY) != 0;
  }
  tDeleteSchemaWrapper(pWrapper);

  pReader->hasPrimaryKey = hasKey;
}
499

500
static void freeTagCache(void* pData){
1,716,025✔
501
  if (pData == NULL) return;
1,716,025✔
502
  SArray* tagCache = *(SArray**)pData;
1,716,025✔
503
  taosArrayDestroyP(tagCache, taosMemFree);
1,716,025✔
504
}
505

506
/*
 * Allocates a reader for pVnode: opens a WAL reader, creates the tag-cache
 * hash (with freeTagCache as its value destructor) and initialises the tag
 * cache latch. Returns NULL when pVnode is NULL or any step fails; resources
 * acquired so far are released on failure.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }

  STqReader* pNew = taosMemoryCalloc(1, sizeof(STqReader));
  if (pNew == NULL) {
    return NULL;
  }

  pNew->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pNew->pWalReader == NULL) {
    goto FAIL;
  }

  pNew->pVnode = pVnode;
  pNew->pSchemaWrapper = NULL;
  pNew->tbIdHash = NULL;

  pNew->pTableTagCacheForTmq = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (pNew->pTableTagCacheForTmq == NULL) {
    walCloseReader(pNew->pWalReader);
    goto FAIL;
  }
  taosHashSetFreeFp(pNew->pTableTagCacheForTmq, freeTagCache);
  taosInitRWLatch(&pNew->tagCachelock);

  return pNew;

FAIL:
  taosMemoryFree(pNew);
  return NULL;
}
536

537
/*
 * Releases everything owned by the reader and then the reader itself:
 * WAL reader, tag cache, schema wrapper, pTSchema, extSchema, tbIdHash and
 * any decoded submit request. A NULL reader is a no-op (after logging).
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader != NULL) {
    // WAL reader and tag cache
    walCloseReader(pReader->pWalReader);
    taosHashCleanup(pReader->pTableTagCacheForTmq);

    // schema state
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
    taosMemoryFree(pReader->pTSchema);
    taosMemoryFree(pReader->extSchema);

    // table id set and decoded submit request
    taosHashCleanup(pReader->tbIdHash);
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

    taosMemoryFree(pReader);
  }
}
554

555
/*
 * Positions the reader's WAL reader at version `ver`.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, terrno when the seek
 * fails, and 0 on success. `id` is only used in the debug log.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (walReaderSeekVer(pReader->pWalReader, ver) >= 0) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }

  return terrno;
}
565

566
/*
 * Makes sure the tag cache holds an entry for `uid`: when the hash has none,
 * cacheTag() is asked to populate it. Returns 0 when the entry already exists
 * or was cached, otherwise the cacheTag() error (also logged).
 */
static int32_t getTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid) {
  // Cache hit: nothing to do.
  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) != NULL) {
    return 0;
  }

  int32_t     code = 0;
  int32_t     lino = 0;
  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storageApi, uid, 0, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }

  return code;
}
585

586
/*
 * Refreshes the cached tags of table `uid` through cacheTag(), passing colId
 * along. Tables with no entry in the tag cache are left alone. A cacheTag()
 * failure is logged; nothing is returned.
 */
void tqUpdateTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid, col_id_t colId) {
  int32_t code = 0;
  int32_t lino = 0;

  // Only tables that are already cached need refreshing.
  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) == NULL) {
    return;
  }

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);
  code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storageApi, uid, colId, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to update tag cache code:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }
}
605

606
/*
 * Fills the pseudo (tag) columns of the rows that were just appended to
 * pBlock for table `uid`. `numOfRows` is the block's row count before the
 * append; the number of new rows is pBlock->info.rows - numOfRows.
 * Returns TSDB_CODE_INVALID_PARA for NULL reader/block, otherwise the result
 * of ensuring the tag cache entry and of fillTag(); failures are logged.
 */
static int32_t tqRetrievePseudoCols(STqReader* pReader, SSDataBlock* pBlock, int32_t numOfRows, int64_t uid, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = getTableTagCache(pReader, pPseudoExpr, numOfPseudoExpr, uid);
  TSDB_CHECK_CODE(code, lino, END);

  code = fillTag(pReader->pTableTagCacheForTmq, pPseudoExpr, numOfPseudoExpr, uid, pBlock, numOfRows, pBlock->info.rows - numOfRows, 1, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != 0) {
    tqError("tqRetrievePseudoCols failed, line:%d, msg:%s", lino, tstrerror(code));
  }
  return code;
}
625

626
/*
 * Pulls submit messages from the WAL and appends the rows of subscribed
 * tables to pRes until one of the following happens:
 *   - walNextValidMsg() returns non-zero (that code is returned),
 *   - pRes holds at least minPollRows rows, or any rows when enableReplay is set,
 *   - more than `timeout` ms have elapsed (TSDB_CODE_TMQ_FETCH_TIMEOUT, also
 *     stored in terrno),
 *   - decoding or column retrieval fails (that error is returned immediately).
 *
 * Blocks whose flags intersect sourceExcluded are skipped, as are tables not in
 * pReader->tbIdHash when that hash is set. Returns TSDB_CODE_INVALID_PARA for
 * a NULL reader.
 */
int32_t tqNextBlockInWal(STqReader* pReader, SSDataBlock* pRes, SHashObj* pCol2SlotId, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                         int sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
  int32_t code = 0;
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SWalReader* pWalReader = pReader->pWalReader;
  int64_t     startMs = taosGetTimestampMs();

  for (;;) {
    code = walNextValidMsg(pWalReader, false);
    if (code != 0) {
      break;
    }

    // Decode the submit request that follows the SSubmitReq2Msg header.
    void*    pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t  bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t  ver = pWalReader->pHead->head.version;
    SDecoder decoder = {0};
    code = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
    tDecoderClear(&decoder);
    if (code != 0) {
      return code;
    }
    pReader->nextBlk = 0;

    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
    for (; pReader->nextBlk < numOfBlocks; pReader->nextBlk += 1) {
      tqDebug("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return terrno;
      }

      if ((pTbData->flags & sourceExcluded) != 0) {
        continue;
      }

      bool subscribed =
          pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) != NULL;
      if (!subscribed) {
        continue;
      }

      tqDebug("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      int32_t rowsBefore = pRes->info.rows;
      code = tqRetrieveCols(pReader, pRes, pCol2SlotId);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      code = tqRetrievePseudoCols(pReader, pRes, rowsBefore, pTbData->uid, pPseudoExpr, numOfPseudoExpr);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    // Done with this message: drop the decoded request.
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    bool enoughRows = pRes->info.rows >= minPollRows || (enableReplay && pRes->info.rows > 0);
    if (enoughRows) {
      break;
    }

    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > timeout || elapsed < 0) {
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
      terrno = code;
      break;
    }
  }

  return code;
}
698

699
/*
 * Records the raw submit message (pointer, length, version) on the reader and
 * decodes it into pReader->submit using the caller-supplied decoder.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, otherwise the result of
 * tDecodeSubmitReq(); a decode failure is logged.
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;

  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);

  int32_t decodeCode = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
  if (decodeCode != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }
  return decodeCode;
}
718

719
/*
 * Drops the decoded submit request held by the reader and resets the block
 * cursor and the raw message pointer.
 */
void tqReaderClearSubmitMsg(STqReader* pReader) {
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  // Nothing left to iterate.
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;
}
724

725
/* Returns the reader's WAL reader, or NULL for a NULL reader. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
731

732
/* Returns the reader's lastTs value, or 0 for a NULL reader. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return (pReader != NULL) ? pReader->lastTs : 0;
}
738

739
/*
 * Advance pReader->nextBlk to the next table block of the loaded submit message whose uid is
 * present in pReader->tbIdHash.
 *
 * Returns false when the reader is NULL, no message is loaded, a block cannot be fetched, or the
 * message is exhausted (in which case the submit message is cleared). Returns true when the
 * reader has no tbIdHash at all, or when the block at nextBlk belongs to a table in the hash.
 * idstr is not used by this function.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int64_t uid = 0;
  int32_t lino = 0;
  int32_t code = false;  // carries the boolean result; set by the check macros on early exit

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);
    uid = pTbData->uid;

    // a hit in the table-id hash ends the scan with "true"; nextBlk stays on that block
    void* pHit = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(pHit == NULL, code, lino, END, true);

    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
  }

  // every block was skipped: drop the message so the reader is ready for the next one
  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
767

768
/*
 * Advance pReader->nextBlk past every table block whose uid appears in filterOutUids.
 *
 * Returns false when the reader is NULL, no message is loaded, a block cannot be fetched, or the
 * message is exhausted (the submit message is then cleared). Returns true when filterOutUids is
 * NULL, or when the block at nextBlk belongs to a table that is not in filterOutUids.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int64_t uid = 0;
  int32_t lino = 0;
  int32_t code = false;  // carries the boolean result; set by the check macros on early exit

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);
    uid = pTbData->uid;

    // not listed in the filter-out set: this block is wanted, leave with "true"
    void* pFiltered = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(pFiltered, code, lino, END, true);

    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
794

795
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
46,409,659✔
796
                    SExtSchema* extSrc) {
797
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
46,409,659✔
798
    return TSDB_CODE_INVALID_PARA;
×
799
  }
800
  int32_t code = 0;
46,447,263✔
801

802
  int32_t cnt = 0;
46,447,263✔
803
  for (int32_t i = 0; i < pSrc->nCols; i++) {
260,543,026✔
804
    cnt += mask[i];
214,091,337✔
805
  }
806

807
  pDst->nCols = cnt;
46,471,957✔
808
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
46,482,380✔
809
  if (pDst->pSchema == NULL) {
46,422,769✔
810
    return TAOS_GET_TERRNO(terrno);
×
811
  }
812

813
  int32_t j = 0;
46,425,552✔
814
  for (int32_t i = 0; i < pSrc->nCols; i++) {
260,629,664✔
815
    if (mask[i]) {
214,105,571✔
816
      pDst->pSchema[j++] = pSrc->pSchema[i];
214,170,778✔
817
      SColumnInfoData colInfo =
214,168,572✔
818
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
214,175,153✔
819
      if (extSrc != NULL) {
214,134,608✔
820
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
821
      }
822
      code = blockDataAppendColInfo(pBlock, &colInfo);
214,134,608✔
823
      if (code != 0) {
214,202,061✔
824
        return code;
×
825
      }
826
    }
827
  }
828
  return 0;
46,488,352✔
829
}
830

831
/*
 * Store one blob cell into pColumnInfoData at row idx.
 *
 * When pColVal carries a value, its payload is read as a u64 sequence number, the blob item for
 * that sequence is looked up in pBlobRow2, and the item bytes (prefixed with a BlobDataLenT
 * length header) are handed to colDataSetVal(). When pColVal carries no value the cell is set
 * to NULL.
 *
 * The temporary buffer is released on every path, including a failed sequence decode and a
 * failed reallocation.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments or an undecodable sequence number, the
 * allocation error when memory cannot be obtained, the tBlobSetGet() error when the lookup
 * fails, and otherwise the result of colDataSetVal() (0 for the NULL-cell path).
 */
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
  int32_t code = 0;
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // TODO(yhDeng)
  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, idx);
    return code;
  }

  char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
  if (val == NULL) {
    return terrno;
  }

  uint64_t seq = 0;
  int32_t  len = 0;
  if (pColVal->value.pData != NULL) {
    if (tGetU64(pColVal->value.pData, &seq) < 0) {
      // leave through END so that val is not leaked
      code = TSDB_CODE_INVALID_PARA;
      terrno = code;
      goto END;
    }

    SBlobItem item = {0};
    code = tBlobSetGet(pBlobRow2, seq, &item);
    if (code != 0) {
      terrno = code;
      uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
      goto END;
    }

    // keep the old pointer until the reallocation is known to have succeeded
    char* grown = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
    if (grown == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto END;
    }
    val = grown;

    (void)memcpy(blobDataVal(val), item.data, item.len);
    len = item.len;
  }

  blobDataSetLen(val, len);
  code = colDataSetVal(pColumnInfoData, idx, val, false);

END:
  taosMemoryFree(val);
  return code;
}
872
/*
 * Store one non-blob cell into pColumnInfoData at rowIndex.
 *
 * Variable-length values are staged in a stack buffer as <length header><payload> and passed to
 * colDataSetVal(); a variable-length column value that carries no data is set to NULL.
 * Fixed-length values are passed through VALUE_GET_DATUM() together with an "is null" flag.
 *
 * Returns TSDB_CODE_INVALID_PARA when a variable-length payload does not fit the staging
 * buffer (previously this overflowed the stack buffer), otherwise the result of
 * colDataSetVal(), or TSDB_CODE_SUCCESS for the NULL path.
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
    if (COL_VAL_IS_VALUE(pColVal)) {
      // the staging buffer below holds at most 65535 payload bytes after the 2-byte header
      if (pColVal->value.pData != NULL && pColVal->value.nData > 65535) {
        return TSDB_CODE_INVALID_PARA;
      }

      char val[65535 + 2] = {0};
      if (pColVal->value.pData != NULL) {
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      }
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
                         !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}
893

894
/*
 * Write colVal into row rowIndex of the column at slotId in pBlock.
 *
 * Blob-typed target columns go through doSetBlobVal() (which needs pBlobSet); every other type
 * goes through doSetVal(). Returns terrno when the slot cannot be fetched from the block,
 * otherwise the result of the chosen setter.
 */
static int32_t setBlockData(SSDataBlock* pBlock, int32_t slotId, int32_t rowIndex, SColVal* colVal, SBlobSet* pBlobSet) {
  SColumnInfoData* pTargetCol = taosArrayGet(pBlock->pDataBlock, slotId);
  if (pTargetCol == NULL) {
    return terrno;
  }

  if (IS_STR_DATA_BLOB(pTargetCol->info.type)) {
    return doSetBlobVal(pTargetCol, rowIndex, colVal, pBlobSet);
  }
  return doSetVal(pTargetCol, rowIndex, colVal);
}
909

910
static int32_t processSubmitRow(SArray*         pRows,
104,913,527✔
911
                                SSDataBlock*    pBlock,
912
                                SHashObj*       pCol2SlotId,
913
                                STqReader*      pReader,
914
                                SBlobSet*       pBlobSet) {
915
  int32_t        code = 0;
104,913,527✔
916
  int32_t        line = 0;
104,913,527✔
917

918
  SArray* pColArray = taosArrayInit(4, INT_BYTES * 2);
104,913,527✔
919
  TSDB_CHECK_NULL(pColArray, code, line, END, terrno);
104,914,911✔
920

921
  int32_t sourceIdx = -1;
104,914,911✔
922
  int32_t rowIndex = 0;
104,914,911✔
923
  SRow* pRow = taosArrayGetP(pRows, rowIndex);
104,914,911✔
924
  TSDB_CHECK_NULL(pRow, code, line, END, terrno);
104,913,892✔
925
  while (++sourceIdx < pReader->pTSchema->numOfCols) {
826,977,032✔
926
    SColVal colVal = {0};
722,218,969✔
927
    code = tRowGet(pRow, pReader->pTSchema, sourceIdx, &colVal);
722,072,131✔
928
    TSDB_CHECK_CODE(code, line, END);
722,211,107✔
929
    void* pSlotId = taosHashGet(pCol2SlotId, &colVal.cid, sizeof(colVal.cid));
722,211,107✔
930
    if (pSlotId == NULL) {
722,721,154✔
931
      continue;
236,059,194✔
932
    }
933
    int32_t pData[2] = {sourceIdx, *(int16_t*)pSlotId};
486,661,960✔
934
    TSDB_CHECK_NULL(taosArrayPush(pColArray, pData), code, line, END, terrno);
486,634,736✔
935
    code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
486,634,736✔
936
    TSDB_CHECK_CODE(code, line, END);
485,978,442✔
937
  }
938
  
939
  for (rowIndex = 1; rowIndex < taosArrayGetSize(pRows); rowIndex++) {
2,147,483,647✔
940
    SRow* pRow = taosArrayGetP(pRows, rowIndex);
2,147,483,647✔
941
    TSDB_CHECK_NULL(pRow, code, line, END, terrno);
2,147,483,647✔
942
    for (int32_t j = 0; j < taosArrayGetSize(pColArray); j++) {
2,147,483,647✔
943
      int32_t* pData = taosArrayGet(pColArray, j);
2,147,483,647✔
944
      TSDB_CHECK_NULL(pData, code, line, END, terrno);
2,147,483,647✔
945

946
      SColVal colVal = {0};
2,147,483,647✔
947
      code = tRowGet(pRow, pReader->pTSchema, pData[0], &colVal);
2,147,483,647✔
948
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
949

950
      code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
2,147,483,647✔
951
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
952
    }
953
  }
954

955
  END:
104,454,180✔
956
  taosArrayDestroy(pColArray);
94,625,542✔
957
  return code;
104,884,944✔
958
}
959

960
static int32_t processSubmitCol(SArray*         pCols,
1,364✔
961
                                SSDataBlock*    pBlock,
962
                                SHashObj*       pCol2SlotId,
963
                                SBlobSet*       pBlobSet) {
964
  int32_t        code = 0;
1,364✔
965
  int32_t        line = 0;
1,364✔
966

967
  for (int32_t i = 0; i < taosArrayGetSize(pCols); i++) {
4,092✔
968
    SColData* pCol = taosArrayGet(pCols, i);
2,728✔
969
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
2,728✔
970
    void* pSlotId = taosHashGet(pCol2SlotId, &pCol->cid, sizeof(pCol->cid));
2,728✔
971
    if (pSlotId == NULL) {
2,728✔
972
      continue;
1,364✔
973
    }
974
    SColVal colVal = {0};
1,364✔
975
    for (int32_t row = 0; row < pCol->nVal; row++) {
4,092✔
976
      code = tColDataGetValue(pCol, row, &colVal);
2,728✔
977
      TSDB_CHECK_CODE(code, line, END);
2,728✔
978

979
      code = setBlockData(pBlock, *(int16_t*)pSlotId, pBlock->info.rows + row, &colVal, pBlobSet);
2,728✔
980
      TSDB_CHECK_CODE(code, line, END);
2,728✔
981
    }
982
  }
983
  
984
  END:
1,364✔
985
  return code;
1,364✔
986
}
987

988
/*
 * Make sure pReader->pSchemaWrapper / extSchema / pTSchema describe the table and schema version
 * referenced by pSubmitTbData, reloading them from meta when the cached ones do not match.
 *
 * The cache is keyed by suid (when suid != 0) or uid (when suid == 0) plus the schema version.
 * A cache hit additionally requires pTSchema to be present, and the keys are invalidated before
 * a reload is attempted: otherwise a failed reload would leave the old keys in place with the
 * schemas already freed, and a later call with those keys would report success with a NULL
 * pTSchema.
 *
 * Returns TSDB_CODE_SUCCESS when the schemas are usable, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when
 * meta has no schema for the table/version, or terrno when the STSchema cannot be built.
 */
static int32_t checkSchema(STqReader* pReader, SSubmitTbData* pSubmitTbData) {
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;

  bool sameTable = (suid != 0) ? (pReader->cachedSchemaSuid == suid) : (pReader->cachedSchemaUid == uid);
  if (sameTable && pReader->cachedSchemaVer == sversion && pReader->pTSchema != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  taosMemoryFreeClear(pReader->extSchema);
  taosMemoryFreeClear(pReader->pTSchema);

  // nothing is cached any more until the reload below has fully succeeded
  pReader->cachedSchemaUid = 0;
  pReader->cachedSchemaSuid = 0;

  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
  if (pReader->pSchemaWrapper == NULL) {
    // report the version that was asked for, not the previously cached one
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d, possibly dropped table",
            vgId, suid, uid, sversion);
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  pReader->pTSchema = tBuildTSchema(pReader->pSchemaWrapper->pSchema, pReader->pSchemaWrapper->nCols, pReader->pSchemaWrapper->version);
  if (pReader->pTSchema == NULL) {
    tqWarn("vgId:%d, cannot build schema for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d",
            vgId, suid, uid, sversion);
    return terrno;
  }

  pReader->cachedSchemaUid = uid;
  pReader->cachedSchemaSuid = suid;
  pReader->cachedSchemaVer = sversion;
  return TSDB_CODE_SUCCESS;
}
1016

1017
/*
 * Convert the table data at pReader->nextBlk of the loaded submit message into rows of pBlock.
 *
 * Steps: record ctimeMs in pReader->lastTs, determine the row count (nVal of the first column
 * for column format, array size for row format), grow pBlock, refresh the cached schema, copy
 * the requested columns (pCol2SlotId maps column id to slot), then add the row count to
 * pBlock->info.rows. pReader->nextBlk is not advanced here.
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader/block, otherwise 0 or the first failing step's
 * error code (which is also logged).
 */
static int32_t tqRetrieveCols(STqReader* pReader, SSDataBlock* pBlock, SHashObj* pCol2SlotId) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);

  int32_t code = 0;
  int32_t line = 0;

  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  pReader->lastTs = pSubmitTbData->ctimeMs;

  const bool colFormat = (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) != 0;

  int32_t numOfRows = 0;
  if (colFormat) {
    SColData* pFirstCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pFirstCol, code, line, END, terrno);
    numOfRows = pFirstCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfRows);
  TSDB_CHECK_CODE(code, line, END);

  code = checkSchema(pReader, pSubmitTbData);
  TSDB_CHECK_CODE(code, line, END);

  // convert and scan one block
  code = colFormat ? processSubmitCol(pSubmitTbData->aCol, pBlock, pCol2SlotId, pSubmitTbData->pBlobSet)
                   : processSubmitRow(pSubmitTbData->aRowP, pBlock, pCol2SlotId, pReader, pSubmitTbData->pBlobSet);
  TSDB_CHECK_CODE(code, line, END);

  pBlock->info.rows += numOfRows;

END:
  if (code != 0) {
    tqError("tqRetrieveCols failed, line:%d, msg:%s", line, tstrerror(code));
  }
  return code;
}
1060

1061
/*
 * Statement macro used inside the per-column loops of tqProcessColData/tqProcessRowData.
 * Expects these names in the caller's scope: colVal, assigned, j, curRow, buildNew.
 *
 * Records in assigned[j] whether the current row has a value for column j (anything other than
 * "none"). buildNew is raised for the first row (curRow == 0) and whenever the flag differs
 * from the one stored for the previous row. Use it without a trailing semicolon.
 */
#define PROCESS_VAL                                              \
  {                                                              \
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);         \
    if (curRow == 0 || currentRowAssigned != assigned[j]) {      \
      assigned[j] = currentRowAssigned;                          \
      buildNew = true;                                           \
    }                                                            \
  }
1072

1073
/*
 * Statement macro used inside the merge loops of tqProcessColData/tqProcessRowData.
 * Expects these names in the caller's scope: colVal, pColData, sourceIdx, targetIdx, curRow,
 * lastRow, pSubmitTbData, plus whatever TQ_ERR_GO_TO_END needs.
 *
 * Compares the source column id (colVal.cid) with the target block column id:
 *   - source id greater: the target column has no source value, write NULL and advance target;
 *   - ids equal: write the value (blob or regular setter) and advance both;
 *   - source id smaller: the source column is not part of the block, advance source only.
 * The row written is curRow - lastRow. Use it without a trailing semicolon.
 */
#define SET_DATA                                                                                      \
  if (colVal.cid > pColData->info.colId) {                                                            \
    colDataSetNULL(pColData, curRow - lastRow);                                                       \
    targetIdx++;                                                                                      \
  } else {                                                                                            \
    if (colVal.cid == pColData->info.colId) {                                                         \
      if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
        TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
      } else {                                                                                        \
        TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
      }                                                                                               \
      targetIdx++;                                                                                    \
    }                                                                                                 \
    sourceIdx++;                                                                                      \
  }
1088

1089
/*
 * Start a new result block because the set of assigned columns changed at row curRow.
 *
 * If blocks already holds a block, that block is closed first: its row count becomes
 * curRow - *lastRow and *lastRow is moved to curRow. Then a new SSDataBlock and SSchemaWrapper
 * are built from pReader->pSchemaWrapper restricted to the columns flagged in assigned, the
 * block is sized for the remaining numOfRows - curRow rows, and both are appended to
 * blocks / schemas.
 *
 * Ownership: taosArrayPush(blocks, block) stores a shallow copy of the block, so after a
 * successful push the column resources belong to the array entry and only the local shell is
 * freed. If the following push into schemas fails, the entry is popped from blocks again so the
 * resources are released exactly once here and the two arrays keep the same length.
 *
 * Returns 0 on success or the first error encountered.
 */
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pSW = NULL;
  SSDataBlock*    block = NULL;
  bool            blockInArray = false;  // true while blocks holds a copy that has no schema yet
  if (taosArrayGetSize(blocks) > 0) {
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pLastBlock);
    pLastBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(block);

  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pSW);

  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(block->pDataBlock));

  block->info.id.uid = pSubmitTbData->uid;
  block->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
  blockInArray = true;
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
  blockInArray = false;  // both arrays updated: the entry in blocks now owns the column data
  pSW = NULL;

  // only the shell is released; its contents live on in the array entry
  taosMemoryFreeClear(block);

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  if (blockInArray) {
    // undo the half-finished append so the resources freed below are not referenced from blocks
    (void)taosArrayPop(blocks);
  }
  tDeleteSchemaWrapper(pSW);
  blockDataFreeRes(block);
  taosMemoryFree(block);
  return code;
}
1129
/*
 * Convert column-format submit data into one or more result blocks.
 *
 * For every row, assigned[] records per schema column whether the row carries a value. Whenever
 * that pattern changes (and always for the first row) processBuildNew() closes the current
 * block and opens a new block/schema pair containing only the assigned columns. The row is then
 * merged into the last block with SET_DATA.
 *
 * Differences from the previous version:
 *   - a schema column that is absent from the submit columns only triggers a new block for the
 *     first row or when it had been assigned before; previously it forced a new block on every
 *     row, producing one block per row;
 *   - the final row-count update checks the last block for NULL, as tqProcessRowData does, so a
 *     submit with zero rows and an empty blocks array no longer dereferences NULL.
 *
 * Returns 0 on success or the first error encountered (also logged).
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    // refresh assigned[] for this row: look up each schema column among the submitted columns
    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
      int32_t k = 0;
      for (; k < numOfCols; k++) {
        pCol = taosArrayGet(pCols, k);
        TQ_NULL_GO_TO_END(pCol);
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
          SColVal colVal = {0};
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
          PROCESS_VAL
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
          break;
        }
      }
      if (k >= numOfCols) {
        // this column is not in the current row, so we set it to NULL;
        // only a change of state (or the very first row) requires a new block
        if (curRow == 0 || assigned[j] != 0) {
          buildNew = true;
        }
        assigned[j] = 0;
      }
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // merge the submitted columns into the block columns; both are walked in column-id order
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < numOfCols) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
1205

1206
/*
 * Convert row-format submit data into one or more result blocks.
 *
 * An STSchema is built from pReader->pSchemaWrapper for the duration of the call. For every row,
 * PROCESS_VAL updates assigned[] (one flag per schema column) and raises buildNew when the set
 * of present columns changes; processBuildNew() then opens a new block/schema pair. The row is
 * merged into the last block with SET_DATA, and after the loop the last block's row count is
 * set to the number of rows written since it was opened.
 *
 * Returns 0 on success or the first error encountered (also logged).
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  int32_t   curRow = 0;   // row currently being converted; doubles as the loop index
  int32_t   lastRow = 0;  // first row of the block that is currently open
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray* pRowArr = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRowArr);
  pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);

  for (; curRow < numOfRows; curRow++) {
    SRow* pRow = taosArrayGetP(pRowArr, curRow);
    TQ_NULL_GO_TO_END(pRow);

    // pass 1: find out which columns this row provides
    bool buildNew = false;
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pCurBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pCurBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: merge the row's columns into the block's columns
    int32_t colActual = blockDataGetNumOfCols(pCurBlock);
    for (int32_t targetIdx = 0, sourceIdx = 0; targetIdx < colActual && sourceIdx < pTSchema->numOfCols;) {
      SColumnInfoData* pColData = taosArrayGet(pCurBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }
  }

  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1273

1274
/*
 * Encode pCreateTbReq and append it to the response: the encoded length goes into
 * pRsp->createTableLen, the encoded buffer pointer into pRsp->createTableReq, and
 * pRsp->createTableNum is incremented. Both arrays are created when createTableNum is 0.
 *
 * The two arrays are parallel, so if the buffer pointer cannot be appended after the length
 * was, the length entry is removed again; otherwise later entries would be paired with the
 * wrong length. On any failure the encoded buffer is freed here.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, otherwise 0 or the first error encountered
 * (also logged).
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   createReq = NULL;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // first compute the encoded size, then encode into a buffer of exactly that size
  uint32_t len = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
  TSDB_CHECK_CODE(code, lino, END);
  createReq = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);

  SEncoder encoder = {0};
  tEncoderInit(&encoder, createReq, len);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
  if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    // roll back the length that was just appended so the arrays stay index-aligned
    (void)taosArrayPop(pRsp->createTableLen);
    goto END;
  }
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
    taosMemoryFree(createReq);
  }
  return code;
}
1311

1312
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
46,685,238✔
1313
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1314
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
46,685,238✔
1315
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
46,685,238✔
1316
  if (pSubmitTbData == NULL) {
46,696,293✔
1317
    return terrno;
×
1318
  }
1319
  pReader->nextBlk++;
46,696,293✔
1320

1321
  if (pSubmitTbDataRet) {
46,680,858✔
1322
    *pSubmitTbDataRet = pSubmitTbData;
46,691,794✔
1323
  }
1324

1325
  if (fetchMeta == ONLY_META) {
46,684,709✔
1326
    if (pSubmitTbData->pCreateTbReq != NULL) {
954✔
1327
      if (pRsp->createTableReq == NULL) {
954✔
1328
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
954✔
1329
        if (pRsp->createTableReq == NULL) {
954✔
1330
          return terrno;
×
1331
        }
1332
      }
1333
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
1,908✔
1334
        return terrno;
×
1335
      }
1336
      pSubmitTbData->pCreateTbReq = NULL;
954✔
1337
    }
1338
    return 0;
954✔
1339
  }
1340

1341
  int32_t sversion = pSubmitTbData->sver;
46,683,755✔
1342
  int64_t uid = pSubmitTbData->uid;
46,683,814✔
1343
  pReader->lastBlkUid = uid;
46,682,624✔
1344

1345
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
46,688,920✔
1346
  taosMemoryFreeClear(pReader->extSchema);
46,666,885✔
1347
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
46,685,922✔
1348
  if (pReader->pSchemaWrapper == NULL) {
46,650,035✔
1349
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
210,220✔
1350
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1351
    pReader->cachedSchemaSuid = 0;
210,220✔
1352
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
210,220✔
1353
  }
1354

1355
  if (pSubmitTbData->pCreateTbReq != NULL) {
46,444,123✔
1356
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
1,741✔
1357
    if (code != 0) {
1,741✔
1358
      return code;
×
1359
    }
1360
  } else if (rawList != NULL) {
46,439,489✔
1361
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1362
      return terrno;
×
1363
    }
1364
    pReader->pSchemaWrapper = NULL;
×
1365
    return 0;
×
1366
  }
1367

1368
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
46,441,230✔
1369
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
857,110✔
1370
  } else {
1371
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
45,557,133✔
1372
  }
1373
}
1374

1375
/*
 * Replace the set of queried table uids of the reader with the uids in tbUidList.
 *
 * An existing pReader->tbIdHash is cleared, otherwise a new hash is created. Each uid is then
 * inserted as a key without a value; a failed insert is logged and skipped. id is only used in
 * log messages.
 *
 * Returns TSDB_CODE_SUCCESS (also when pReader or tbUidList is NULL, in which case nothing is
 * done) or terrno when the hash cannot be created.
 */
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", id);
      return terrno;
    }
  } else {
    taosHashClear(pReader->tbIdHash);
  }

  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t idx = 0; idx < numOfTables; idx++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pKey == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
  return TSDB_CODE_SUCCESS;
}
1400

1401
// Add every uid in pTableUidList to the reader's table-uid hash, creating the
// hash first if the reader does not have one yet.
//
// pReader        reader whose tbIdHash is extended; NULL makes the call a no-op
// pTableUidList  array read as int64_t uids; NULL makes the call a no-op
//
// The function returns nothing: a failure to create the hash is logged and ends
// the call, and a uid that cannot be fetched or inserted is skipped.
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = (int32_t)taosArrayGetSize(pTableUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    if (pKey == NULL) {
      // same guard as tqReaderSetTbUidList: never hand a NULL key to the hash or
      // dereference it in the log calls below
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
      continue;
    }
    tqDebug("%s add table uid:%" PRId64 " to hash", __func__, *pKey);
  }
}
1423

1424
// Report whether uid has an entry in the reader's table-uid hash.
// A NULL reader yields false.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  if (pReader != NULL) {
    void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
    return pEntry != NULL;
  }
  return false;
}
1430

1431
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1432
  if (pReader == NULL) {
×
1433
    return false;
×
1434
  }
1435
  return pReader->msg.msgStr == NULL;
×
1436
}
1437

1438
// Remove every uid in tbUidList from the reader's table-uid hash.
//
// Either argument being NULL makes the call a no-op. A removal that reports a
// non-zero code is only logged as a warning; the loop carries on with the next uid.
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    int32_t  rc = taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t));
    if (rc != 0) {
      // pUid may be NULL here, so print 0 instead of dereferencing it
      tqWarn("%s failed to remove table uid:%" PRId64 " from hash, msg:%s", __func__, pUid != NULL ? *pUid : 0, tstrerror(rc));
    }
    idx++;
  }
}
1450

1451
// Apply a list of table uids to every consumer handle stored in pTq->pHandle,
// while holding the write latch pTq->lock.
//
// Per handle, depending on execHandle.subType:
//   TOPIC_SUB_TYPE__COLUMN  forward the list to qDeleteTableListForTmqScanner
//   TOPIC_SUB_TYPE__DB      put each uid into execDb.pFilterOutTbUid
//   TOPIC_SUB_TYPE__TABLE   remove the uids from the handle's tq reader
//
// Returns 0 for a NULL pTq and TSDB_CODE_INVALID_PARA for a NULL tbUidList.
// Otherwise it always returns 0: per-handle failures are logged and the
// iteration continues with the next handle.
int32_t tqDeleteTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " delete table list", __func__, pHandle->subKey, pHandle->consumerId);

    switch (pHandle->execHandle.subType) {
      case TOPIC_SUB_TYPE__COLUMN: {
        int32_t rc = qDeleteTableListForTmqScanner(pHandle->execHandle.task, tbUidList);
        if (rc != 0) {
          tqError("update qualified table error for %s", pHandle->subKey);
        }
        break;
      }
      case TOPIC_SUB_TYPE__DB: {
        int32_t numOfUids = taosArrayGetSize(tbUidList);
        for (int32_t k = 0; k < numOfUids; k++) {
          int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, k);
          if (pUid == NULL) {
            continue;
          }
          if (taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, pUid, sizeof(int64_t), NULL, 0) != 0) {
            tqError("failed to add table uid:%" PRId64 " to hash", *pUid);
          }
        }
        break;
      }
      case TOPIC_SUB_TYPE__TABLE:
        tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUidList);
        break;
      default:
        // other subscription types are left untouched
        break;
    }
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
1494

1495
// Build a new int64_t array and append all elements of tbUidList to it.
// Returns the new array, or NULL when either the allocation or the bulk append
// fails (the partially built array is destroyed in the latter case).
// The caller receives the returned array and is expected to destroy it.
static SArray* copyUidList(const SArray* tbUidList) {
  SArray* pCopy = taosArrayInit(4, sizeof(int64_t));
  if (pCopy != NULL && taosArrayAddAll(pCopy, tbUidList) == NULL) {
    taosArrayDestroy(pCopy);
    tqError("copy table uid list failed");
    pCopy = NULL;
  }
  return pCopy;
}
1508

1509
// Filter a private copy of tbUidList through qFilterTableList for the given
// handle and add the surviving uids to the handle's tq reader.
//
// pTqHandle  handle whose execTb.node / task / execTb.suid drive the filter and
//            whose pTqReader receives the uids
// pTq        supplies the vnode passed to qFilterTableList
// tbUidList  input uids; it is copied first, so the caller's array is not the one
//            handed to qFilterTableList
//
// Returns 0 on success, terrno when the copy cannot be made, or the code
// reported by qFilterTableList. The temporary copy is destroyed on every path.
static int32_t addTableListForStableTmq(STqHandle* pTqHandle, STQ* pTq, const SArray* tbUidList) {
  int32_t code = 0;
  SArray* tbUidListCopy = copyUidList(tbUidList);
  if (tbUidListCopy == NULL) {
    code = terrno;
    goto END;
  }
  code = qFilterTableList(pTq->pVnode, tbUidListCopy, pTqHandle->execHandle.execTb.node,
                          pTqHandle->execHandle.task, pTqHandle->execHandle.execTb.suid);
  // compare against the TSDB-level success code used throughout this file rather
  // than the tdb storage-layer constant
  if (code != TSDB_CODE_SUCCESS) {
    tqError("tqAddTbUidList error:%d handle %s consumer:0x%" PRIx64, code, pTqHandle->subKey,
            pTqHandle->consumerId);
    goto END;
  }
  tqDebug("%s handle %s consumer:0x%" PRIx64 " add %d tables to tqReader", __func__, pTqHandle->subKey,
          pTqHandle->consumerId, (int32_t)taosArrayGetSize(tbUidListCopy));
  tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, tbUidListCopy);

END:
  // taosArrayDestroy is also reached with a NULL copy when copyUidList failed
  taosArrayDestroy(tbUidListCopy);
  return code;
}
1531

1532
// Hand a list of newly added table uids to every consumer handle stored in
// pTq->pHandle, while holding the write latch pTq->lock.
//
// Per handle, depending on execHandle.subType:
//   TOPIC_SUB_TYPE__COLUMN  forward the list to qAddTableListForTmqScanner
//   TOPIC_SUB_TYPE__TABLE   filter and add through addTableListForStableTmq
// Handles of any other type are skipped.
//
// Returns 0 for a NULL pTq, TSDB_CODE_INVALID_PARA for a NULL tbUidList, and
// otherwise the code of the first failing handle (iteration stops there) or 0.
int32_t tqAddTbUidList(STQ* pTq, const SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " add table list", __func__, pHandle->subKey, pHandle->consumerId);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      code = qAddTableListForTmqScanner(pHandle->execHandle.task, tbUidList);
      if (code != 0) {
        tqError("add table list for query tmq error for %s, msg:%s", pHandle->subKey, tstrerror(code));
        break;
      }
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      code = addTableListForStableTmq(pHandle, pTq, tbUidList);
      if (code != 0) {
        tqError("add table list for stable tmq error for %s, msg:%s", pHandle->subKey, tstrerror(code));
        break;
      }
    }
  }
  // pIter is still set when the loop stopped early on an error
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1572

1573
// Refresh the table lists of every consumer handle stored in pTq->pHandle,
// while holding the write latch pTq->lock.
//
// Per handle, depending on execHandle.subType:
//   TOPIC_SUB_TYPE__COLUMN  when checkCidInTagCondition accepts cidList for the
//                           task's tag condition, call qUpdateTableListForTmqScanner;
//                           then always call qUpdateTableTagCacheForTmq
//   TOPIC_SUB_TYPE__TABLE   when checkCidInTagCondition accepts cidList for the
//                           node's tag condition, remove the uids from the tq
//                           reader and add them back via addTableListForStableTmq
// Handles of any other type are skipped.
//
// Returns 0 for a NULL pTq, TSDB_CODE_INVALID_PARA for a NULL tbUidList, and
// otherwise the code of the first failing handle (iteration stops there) or 0.
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, SArray* cidList, SArray* cidListArray) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pHandle->subKey, pHandle->consumerId);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SNode* pTagCond = getTagCondNodeForQueryTmq(pHandle->execHandle.task);
      if (checkCidInTagCondition(pTagCond, cidList)) {
        code = qUpdateTableListForTmqScanner(pHandle->execHandle.task, tbUidList);
        if (code != 0) {
          tqError("update table list for query tmq error for %s, msg:%s", pHandle->subKey, tstrerror(code));
          break;
        }
      }
      // the tag cache is updated whether or not the tag condition matched
      qUpdateTableTagCacheForTmq(pHandle->execHandle.task, tbUidList, cidList, cidListArray);
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      SNode* pTagCond = getTagCondNodeForStableTmq(pHandle->execHandle.execTb.node);
      if (checkCidInTagCondition(pTagCond, cidList)) {
        // drop the uids first, then add them back through the filter
        tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUidList);
        code = addTableListForStableTmq(pHandle, pTq, tbUidList);
        if (code != 0) {
          tqError("update table list for stable tmq error for %s, msg:%s", pHandle->subKey, tstrerror(code));
          break;
        }
      }
    }
  }

  // pIter is still set when the loop stopped early on an error
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1623

1624
static void destroySourceScanTables(void* ptr) {
×
1625
  SArray** pTables = ptr;
×
1626
  if (pTables && *pTables) {
×
1627
    taosArrayDestroy(*pTables);
×
1628
    *pTables = NULL;
×
1629
  }
1630
}
×
1631

1632
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1633
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1634
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1635
  if (pCol1->vColId == pCol2->vColId) {
×
1636
    return 0;
×
1637
  } else if (pCol1->vColId < pCol2->vColId) {
×
1638
    return -1;
×
1639
  } else {
1640
    return 1;
×
1641
  }
1642
}
1643

1644
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1645
  if (value) {
×
1646
    SSchemaWrapper* pSchemaWrapper = value;
×
1647
    tDeleteSchemaWrapper(pSchemaWrapper);
1648
  }
1649
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc