• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4541

19 Jul 2025 01:13AM UTC coverage: 56.753% (-1.6%) from 58.31%
#4541

push

travis-ci

web-flow
fix: subquery memleak (#32024)

124299 of 282344 branches covered (44.02%)

Branch coverage included in aggregate %.

181106 of 255787 relevant lines covered (70.8%)

24937406.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.49
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr);
20

21
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
546✔
22
  if (pHandle == NULL || pHead == NULL) {
546!
23
    return false;
×
24
  }
25
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
546!
26
    return true;
546✔
27
  }
28

29
  int16_t msgType = pHead->msgType;
×
30
  char*   body = pHead->body;
×
31
  int32_t bodyLen = pHead->bodyLen;
×
32

33
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
×
34
  int64_t  realTbSuid = 0;
×
35
  SDecoder dcoder = {0};
×
36
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
×
37
  int32_t  len = bodyLen - sizeof(SMsgHead);
×
38
  tDecoderInit(&dcoder, data, len);
×
39

40
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
×
41
    SVCreateStbReq req = {0};
×
42
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
×
43
      goto end;
×
44
    }
45
    realTbSuid = req.suid;
×
46
  } else if (msgType == TDMT_VND_DROP_STB) {
×
47
    SVDropStbReq req = {0};
×
48
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
49
      goto end;
×
50
    }
51
    realTbSuid = req.suid;
×
52
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
×
53
    SVCreateTbBatchReq req = {0};
×
54
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
×
55
      goto end;
×
56
    }
57

58
    int32_t        needRebuild = 0;
×
59
    SVCreateTbReq* pCreateReq = NULL;
×
60
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
61
      pCreateReq = req.pReqs + iReq;
×
62
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
63
        needRebuild++;
×
64
      }
65
    }
66
    if (needRebuild == 0) {
×
67
      // do nothing
68
    } else if (needRebuild == req.nReqs) {
×
69
      realTbSuid = tbSuid;
×
70
    } else {
71
      realTbSuid = tbSuid;
×
72
      SVCreateTbBatchReq reqNew = {0};
×
73
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
74
      if (reqNew.pArray == NULL) {
×
75
        tDeleteSVCreateTbBatchReq(&req);
×
76
        goto end;
×
77
      }
78
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
79
        pCreateReq = req.pReqs + iReq;
×
80
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
81
          reqNew.nReqs++;
×
82
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
83
            taosArrayDestroy(reqNew.pArray);
×
84
            tDeleteSVCreateTbBatchReq(&req);
×
85
            goto end;
×
86
          }
87
        }
88
      }
89

90
      int     tlen = 0;
×
91
      int32_t ret = 0;
×
92
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
93
      void* buf = taosMemoryMalloc(tlen);
×
94
      if (NULL == buf) {
×
95
        taosArrayDestroy(reqNew.pArray);
×
96
        tDeleteSVCreateTbBatchReq(&req);
×
97
        goto end;
×
98
      }
99
      SEncoder coderNew = {0};
×
100
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
101
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
102
      tEncoderClear(&coderNew);
×
103
      if (ret < 0) {
×
104
        taosMemoryFree(buf);
×
105
        taosArrayDestroy(reqNew.pArray);
×
106
        tDeleteSVCreateTbBatchReq(&req);
×
107
        goto end;
×
108
      }
109
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
110
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
111
      taosMemoryFree(buf);
×
112
      taosArrayDestroy(reqNew.pArray);
×
113
    }
114

115
    tDeleteSVCreateTbBatchReq(&req);
×
116
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
117
    SVAlterTbReq req = {0};
×
118

119
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
120
      goto end;
×
121
    }
122

123
    SMetaReader mr = {0};
×
124
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
125

126
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
127
      metaReaderClear(&mr);
×
128
      goto end;
×
129
    }
130
    realTbSuid = mr.me.ctbEntry.suid;
×
131
    metaReaderClear(&mr);
×
132
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
133
    SVDropTbBatchReq req = {0};
×
134

135
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
136
      goto end;
×
137
    }
138

139
    int32_t      needRebuild = 0;
×
140
    SVDropTbReq* pDropReq = NULL;
×
141
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
142
      pDropReq = req.pReqs + iReq;
×
143

144
      if (pDropReq->suid == tbSuid) {
×
145
        needRebuild++;
×
146
      }
147
    }
148
    if (needRebuild == 0) {
×
149
      // do nothing
150
    } else if (needRebuild == req.nReqs) {
×
151
      realTbSuid = tbSuid;
×
152
    } else {
153
      realTbSuid = tbSuid;
×
154
      SVDropTbBatchReq reqNew = {0};
×
155
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
156
      if (reqNew.pArray == NULL) {
×
157
        goto end;
×
158
      }
159
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
160
        pDropReq = req.pReqs + iReq;
×
161
        if (pDropReq->suid == tbSuid) {
×
162
          reqNew.nReqs++;
×
163
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
164
            taosArrayDestroy(reqNew.pArray);
×
165
            goto end;
×
166
          }
167
        }
168
      }
169

170
      int     tlen = 0;
×
171
      int32_t ret = 0;
×
172
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
173
      void* buf = taosMemoryMalloc(tlen);
×
174
      if (NULL == buf) {
×
175
        taosArrayDestroy(reqNew.pArray);
×
176
        goto end;
×
177
      }
178
      SEncoder coderNew = {0};
×
179
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
180
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
181
      tEncoderClear(&coderNew);
×
182
      if (ret != 0) {
×
183
        taosMemoryFree(buf);
×
184
        taosArrayDestroy(reqNew.pArray);
×
185
        goto end;
×
186
      }
187
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
188
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
189
      taosMemoryFree(buf);
×
190
      taosArrayDestroy(reqNew.pArray);
×
191
    }
192
  } else if (msgType == TDMT_VND_DELETE) {
×
193
    SDeleteRes req = {0};
×
194
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
195
      goto end;
×
196
    }
197
    realTbSuid = req.suid;
×
198
  }
199

200
end:
×
201
  tDecoderClear(&dcoder);
×
202
  bool tmp = tbSuid == realTbSuid;
×
203
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
×
204
  return tmp;
×
205
}
206

207
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
132,693✔
208
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
132,693!
209
    return -1;
×
210
  }
211
  int32_t code = -1;
132,758✔
212
  int32_t vgId = TD_VID(pTq->pVnode);
132,758✔
213
  int64_t id = pHandle->pWalReader->readerId;
132,758✔
214

215
  int64_t offset = *fetchOffset;
132,758✔
216
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
132,758✔
217
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
132,723✔
218
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
132,726✔
219

220
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
132,750!
221
          ", 0x%" PRIx64,
222
          vgId, offset, lastVer, committedVer, appliedVer, id);
223

224
  while (offset <= appliedVer) {
141,999✔
225
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
131,210!
226
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
227
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
228
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
229
      goto END;
×
230
    }
231

232
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
131,192!
233
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
234

235
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
131,203✔
236
      code = walFetchBody(pHandle->pWalReader);
121,718✔
237
      goto END;
121,706✔
238
    } else {
239
      if (pHandle->fetchMeta != WITH_DATA) {
9,485✔
240
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
679✔
241
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
679✔
242
          code = walFetchBody(pHandle->pWalReader);
546✔
243
          if (code < 0) {
546!
244
            goto END;
×
245
          }
246

247
          pHead = &(pHandle->pWalReader->pHead->head);
546✔
248
          if (isValValidForTable(pHandle, pHead)) {
546!
249
            code = 0;
546✔
250
            goto END;
546✔
251
          } else {
252
            offset++;
×
253
            code = -1;
×
254
            continue;
×
255
          }
256
        }
257
      }
258
      code = walSkipFetchBody(pHandle->pWalReader);
8,939✔
259
      if (code < 0) {
8,939!
260
        goto END;
×
261
      }
262
      offset++;
8,939✔
263
    }
264
    code = -1;
8,939✔
265
  }
266

267
END:
10,789✔
268
  *fetchOffset = offset;
133,041✔
269
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
133,041!
270
          ", applied:%" PRId64 ", 0x%" PRIx64,
271
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
272
  return code;
133,055✔
273
}
274

275
bool tqGetTablePrimaryKey(STqReader* pReader) {
20,392✔
276
  if (pReader == NULL) {
20,392!
277
    return false;
×
278
  }
279
  return pReader->hasPrimaryKey;
20,392✔
280
}
281

282
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
178✔
283
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);
178!
284

285
  if (pReader == NULL) {
178!
286
    return;
×
287
  }
288
  bool            ret = false;
178✔
289
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
178✔
290
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
177!
291
    ret = true;
×
292
  }
293
  tDeleteSchemaWrapper(schema);
294
  pReader->hasPrimaryKey = ret;
178✔
295
}
296

297
STqReader* tqReaderOpen(SVnode* pVnode) {
1,619✔
298
  tqDebug("%s:%p", __FUNCTION__, pVnode);
1,619!
299
  if (pVnode == NULL) {
1,639!
300
    return NULL;
×
301
  }
302
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
1,639!
303
  if (pReader == NULL) {
1,639!
304
    return NULL;
×
305
  }
306

307
  pReader->pWalReader = walOpenReader(pVnode->pWal, 0);
1,639✔
308
  if (pReader->pWalReader == NULL) {
1,638!
309
    taosMemoryFree(pReader);
×
310
    return NULL;
×
311
  }
312

313
  pReader->pVnodeMeta = pVnode->pMeta;
1,638✔
314
  pReader->pColIdList = NULL;
1,638✔
315
  pReader->cachedSchemaVer = 0;
1,638✔
316
  pReader->cachedSchemaSuid = 0;
1,638✔
317
  pReader->pSchemaWrapper = NULL;
1,638✔
318
  pReader->tbIdHash = NULL;
1,638✔
319
  pReader->pResBlock = NULL;
1,638✔
320

321
  int32_t code = createDataBlock(&pReader->pResBlock);
1,638✔
322
  if (code) {
1,638!
323
    terrno = code;
×
324
  }
325

326
  return pReader;
1,638✔
327
}
328

329
void tqReaderClose(STqReader* pReader) {
1,655✔
330
  tqDebug("%s:%p", __FUNCTION__, pReader);
1,655✔
331
  if (pReader == NULL) return;
1,655✔
332

333
  // close wal reader
334
  if (pReader->pWalReader) {
1,639!
335
    walCloseReader(pReader->pWalReader);
1,639✔
336
  }
337

338
  if (pReader->pSchemaWrapper) {
1,639✔
339
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
766!
340
  }
341

342
  taosMemoryFree(pReader->extSchema);
1,639!
343
  if (pReader->pColIdList) {
1,639✔
344
    taosArrayDestroy(pReader->pColIdList);
1,308✔
345
  }
346

347
  // free hash
348
  blockDataDestroy(pReader->pResBlock);
1,639✔
349
  taosHashCleanup(pReader->tbIdHash);
1,639✔
350
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
1,639✔
351

352
  taosHashCleanup(pReader->vtSourceScanInfo.pVirtualTables);
1,639✔
353
  taosHashCleanup(pReader->vtSourceScanInfo.pPhysicalTables);
1,639✔
354
  taosLRUCacheCleanup(pReader->vtSourceScanInfo.pPhyTblSchemaCache);
1,639✔
355
  taosMemoryFree(pReader);
1,639!
356
}
357

358
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
4,159✔
359
  if (pReader == NULL) {
4,159!
360
    return TSDB_CODE_INVALID_PARA;
×
361
  }
362
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
4,159✔
363
    return terrno;
141✔
364
  }
365
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
4,016!
366
  return 0;
4,018✔
367
}
368

369
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
541,611✔
370
  if (pReader == NULL) {
541,611!
371
    return false;
×
372
  }
373
  SWalReader* pWalReader = pReader->pWalReader;
541,611✔
374

375
  int64_t st = taosGetTimestampMs();
541,622✔
376
  while (1) {
487,727✔
377
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
1,029,349✔
378
    while (pReader->nextBlk < numOfBlocks) {
1,265,892✔
379
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
740,890✔
380
              pReader->msg.ver);
381

382
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
740,890✔
383
      if (pSubmitTbData == NULL) {
740,830!
384
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
385
                pReader->msg.ver);
386
        return false;
×
387
      }
388
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
740,835✔
389
        pReader->nextBlk += 1;
106✔
390
        continue;
106✔
391
      }
392
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
740,726!
393
        tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
504,291✔
394
        SSDataBlock* pRes = NULL;
504,291✔
395
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
504,291✔
396
        if (code == TSDB_CODE_SUCCESS) {
504,359!
397
          return true;
504,362✔
398
        }
399
      } else {
400
        pReader->nextBlk += 1;
236,597✔
401
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
236,597!
402
      }
403
    }
404

405
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
525,002✔
406
    pReader->msg.msgStr = NULL;
525,034✔
407

408
    int64_t elapsed = taosGetTimestampMs() - st;
525,002✔
409
    if (elapsed > 1000 || elapsed < 0) {
525,002!
410
      return false;
1✔
411
    }
412

413
    // try next message in wal file
414
    if (walNextValidMsg(pWalReader, false) < 0) {
525,001✔
415
      return false;
37,250✔
416
    }
417

418
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
487,739✔
419
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
487,739✔
420
    int64_t ver = pWalReader->pHead->head.version;
487,739✔
421
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL) != 0) {
487,739!
422
      return false;
×
423
    }
424
    pReader->nextBlk = 0;
487,727✔
425
  }
426
}
427

428
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList) {
609,431✔
429
  if (pReader == NULL) {
609,431!
430
    return TSDB_CODE_INVALID_PARA;
×
431
  }
432
  pReader->msg.msgStr = msgStr;
609,431✔
433
  pReader->msg.msgLen = msgLen;
609,431✔
434
  pReader->msg.ver = ver;
609,431✔
435

436
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
609,431✔
437
  SDecoder decoder = {0};
609,431✔
438

439
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
609,431✔
440
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit, rawList);
609,415✔
441
  tDecoderClear(&decoder);
609,002✔
442

443
  if (code != 0) {
609,440!
444
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
445
  }
446

447
  return code;
609,447✔
448
}
449

450
void tqReaderClearSubmitMsg(STqReader* pReader) {
242,836✔
451
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
242,836✔
452
  pReader->nextBlk = 0;
242,944✔
453
  pReader->msg.msgStr = NULL;
242,944✔
454
}
242,944✔
455

456
SWalReader* tqGetWalReader(STqReader* pReader) {
584,672✔
457
  if (pReader == NULL) {
584,672!
458
    return NULL;
×
459
  }
460
  return pReader->pWalReader;
584,672✔
461
}
462

463
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
541,604✔
464
  if (pReader == NULL) {
541,604!
465
    return NULL;
×
466
  }
467
  return pReader->pResBlock;
541,604✔
468
}
469

470
int64_t tqGetResultBlockTime(STqReader* pReader) {
541,588✔
471
  if (pReader == NULL) {
541,588!
472
    return 0;
×
473
  }
474
  return pReader->lastTs;
541,588✔
475
}
476

477
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
69,920✔
478
  int32_t code = false;
69,920✔
479
  int32_t lino = 0;
69,920✔
480
  int64_t uid = 0;
69,920✔
481
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
69,920!
482
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
69,920!
483
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);
69,920!
484

485
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
69,920✔
486
  while (pReader->nextBlk < blockSz) {
73,422✔
487
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
36,735✔
488
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
36,735!
489
    uid = pSubmitTbData->uid;
36,735✔
490
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
36,735✔
491
    TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
36,736✔
492

493
    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
3,515!
494
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
495
    pReader->nextBlk++;
3,514✔
496
  }
497

498
  tqReaderClearSubmitMsg(pReader);
36,687✔
499
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
36,720!
500

501
END:
36,720✔
502
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
69,941!
503
  return code;
69,928✔
504
}
505

506
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
169,815✔
507
  int32_t code = false;
169,815✔
508
  int32_t lino = 0;
169,815✔
509
  int64_t uid = 0;
169,815✔
510

511
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
169,815!
512
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
169,815!
513
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);
169,815!
514

515
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
169,815✔
516
  while (pReader->nextBlk < blockSz) {
169,792✔
517
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
85,044✔
518
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
85,045!
519
    uid = pSubmitTbData->uid;
85,045✔
520
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
85,045✔
521
    TSDB_CHECK_NULL(ret, code, lino, END, true);
85,045!
522
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
×
523
    pReader->nextBlk++;
×
524
  }
525
  tqReaderClearSubmitMsg(pReader);
84,748✔
526
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
84,949!
527

528
END:
84,949✔
529
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
169,994!
530
  return code;
169,845✔
531
}
532

533
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
117,565✔
534
                    SExtSchema* extSrc) {
535
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
117,565!
536
    return TSDB_CODE_INVALID_PARA;
×
537
  }
538
  int32_t code = 0;
117,611✔
539

540
  int32_t cnt = 0;
117,611✔
541
  for (int32_t i = 0; i < pSrc->nCols; i++) {
698,850✔
542
    cnt += mask[i];
581,239✔
543
  }
544

545
  pDst->nCols = cnt;
117,611✔
546
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
117,611!
547
  if (pDst->pSchema == NULL) {
117,590!
548
    return TAOS_GET_TERRNO(terrno);
×
549
  }
550

551
  int32_t j = 0;
117,590✔
552
  for (int32_t i = 0; i < pSrc->nCols; i++) {
697,659✔
553
    if (mask[i]) {
579,884!
554
      pDst->pSchema[j++] = pSrc->pSchema[i];
579,900✔
555
      SColumnInfoData colInfo =
556
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
579,900✔
557
      if (extSrc != NULL) {
580,931✔
558
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
218✔
559
      }
560
      code = blockDataAppendColInfo(pBlock, &colInfo);
580,931✔
561
      if (code != 0) {
580,085!
562
        return code;
×
563
      }
564
    }
565
  }
566
  return 0;
117,775✔
567
}
568

569
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
589✔
570
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
589!
571
    return TSDB_CODE_INVALID_PARA;
×
572
  }
573
  SSDataBlock* pBlock = pReader->pResBlock;
589✔
574
  if (blockDataGetNumOfCols(pBlock) > 0) {
589✔
575
    blockDataDestroy(pBlock);
2✔
576
    int32_t code = createDataBlock(&pReader->pResBlock);
2✔
577
    if (code) {
2!
578
      return code;
×
579
    }
580
    pBlock = pReader->pResBlock;
2✔
581

582
    pBlock->info.id.uid = pReader->cachedSchemaUid;
2✔
583
    pBlock->info.version = pReader->msg.ver;
2✔
584
  }
585

586
  int32_t numOfCols = taosArrayGetSize(pColIdList);
589✔
587

588
  if (numOfCols == 0) {  // all columns are required
589!
589
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
590
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
591
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
592

593
      if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
×
594
        decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
595
      }
596
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
597
      if (code != TSDB_CODE_SUCCESS) {
×
598
        blockDataFreeRes(pBlock);
×
599
        return terrno;
×
600
      }
601
    }
602
  } else {
603
    if (numOfCols > pSchema->nCols) {
589✔
604
      numOfCols = pSchema->nCols;
2✔
605
    }
606

607
    int32_t i = 0;
589✔
608
    int32_t j = 0;
589✔
609
    while (i < pSchema->nCols && j < numOfCols) {
4,069✔
610
      SSchema* pColSchema = &pSchema->pSchema[i];
3,481✔
611
      col_id_t colIdSchema = pColSchema->colId;
3,481✔
612

613
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
3,481✔
614
      if (pColIdNeed == NULL) {
3,481!
615
        break;
×
616
      }
617
      if (colIdSchema < *pColIdNeed) {
3,481✔
618
        i++;
73✔
619
      } else if (colIdSchema > *pColIdNeed) {
3,408!
620
        j++;
×
621
      } else {
622
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
3,408✔
623
        if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
3,412!
624
          decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
132✔
625
        }
626
        int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
3,412✔
627
        if (code != TSDB_CODE_SUCCESS) {
3,407!
628
          return -1;
×
629
        }
630
        i++;
3,407✔
631
        j++;
3,407✔
632
      }
633
    }
634
  }
635

636
  return TSDB_CODE_SUCCESS;
588✔
637
}
638

639
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobRow2* pBlobRow2) {
×
640
  int32_t code = 0;
×
641
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
×
642
    return TSDB_CODE_INVALID_PARA;
×
643
  }
644
  // TODO(yhDeng)
645
  if (COL_VAL_IS_VALUE(pColVal)) {
×
646
    char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
×
647
    if (val == NULL) {
×
648
      return terrno;
×
649
    }
650

651
    uint64_t seq = 0;
×
652
    int32_t  len = 0;
×
653
    if (pColVal->value.pData != NULL) {
×
654
      tGetU64(pColVal->value.pData, &seq);
×
655
      SBlobItem item = {0};
×
656
      code = tBlobRowGet(pBlobRow2, seq, &item);
×
657
      if (code != 0) {
×
658
        taosMemoryFree(val);
×
659
        terrno = code;
×
660
        uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
×
661
        return code;
×
662
      }
663

664
      val = taosMemRealloc(val, item.dataLen + sizeof(BlobDataLenT));
×
665
      (void)memcpy(blobDataVal(val), item.data, item.dataLen);
×
666
      len = item.dataLen;
×
667
    }
668

669
    blobDataSetLen(val, len);
×
670
    code = colDataSetVal(pColumnInfoData, idx, val, false);
×
671

672
    taosMemoryFree(val);
×
673
  } else {
674
    colDataSetNULL(pColumnInfoData, idx);
×
675
  }
676
  return code;
×
677
}
678
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
44,934,881✔
679
  int32_t code = TSDB_CODE_SUCCESS;
44,934,881✔
680

681
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
44,934,881!
682
    if (COL_VAL_IS_VALUE(pColVal)) {
7,702,631!
683
      char val[65535 + 2] = {0};
8,573,235✔
684
      if (pColVal->value.pData != NULL) {
8,573,235!
685
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
8,667,879✔
686
      }
687
      varDataSetLen(val, pColVal->value.nData);
8,573,235✔
688
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
8,573,235✔
689
    } else {
690
      colDataSetNULL(pColumnInfoData, rowIndex);
×
691
    }
692
  } else {
693
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
37,232,250!
694
                         !COL_VAL_IS_VALUE(pColVal));
37,232,250✔
695
  }
696

697
  return code;
45,069,020✔
698
}
699

700
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
504,266✔
701
  if (pReader == NULL || pRes == NULL) {
504,266!
702
    return TSDB_CODE_INVALID_PARA;
×
703
  }
704
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
504,323!
705
  int32_t        code = 0;
504,375✔
706
  int32_t        line = 0;
504,375✔
707
  STSchema*      pTSchema = NULL;
504,375✔
708
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
504,375✔
709
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
504,383!
710
  SSDataBlock* pBlock = pReader->pResBlock;
504,383✔
711
  *pRes = pBlock;
504,383✔
712

713
  blockDataCleanup(pBlock);
504,383✔
714

715
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
504,352✔
716
  int32_t sversion = pSubmitTbData->sver;
504,352✔
717
  int64_t suid = pSubmitTbData->suid;
504,352✔
718
  int64_t uid = pSubmitTbData->uid;
504,352✔
719
  pReader->lastTs = pSubmitTbData->ctimeMs;
504,352✔
720

721
  pBlock->info.id.uid = uid;
504,352✔
722
  pBlock->info.version = pReader->msg.ver;
504,352✔
723

724
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
504,352✔
725
      (pReader->cachedSchemaVer != sversion)) {
503,765!
726
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
586✔
727
    taosMemoryFree(pReader->extSchema);
589!
728
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
589✔
729
    if (pReader->pSchemaWrapper == NULL) {
589!
730
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
×
731
             "version %d, possibly dropped table",
732
             vgId, suid, uid, pReader->cachedSchemaVer);
733
      pReader->cachedSchemaSuid = 0;
×
734
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
735
    }
736

737
    pReader->cachedSchemaUid = uid;
589✔
738
    pReader->cachedSchemaSuid = suid;
589✔
739
    pReader->cachedSchemaVer = sversion;
589✔
740

741
    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
589!
742
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
×
743
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
744
      return TSDB_CODE_TQ_INTERNAL_ERROR;
×
745
    }
746
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
589✔
747
    TSDB_CHECK_CODE(code, line, END);
589!
748
    pBlock = pReader->pResBlock;
589✔
749
    *pRes = pBlock;
589✔
750
  }
751

752
  int32_t numOfRows = 0;
504,355✔
753
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
504,355✔
754
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
4✔
755
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
4!
756
    numOfRows = pCol->nVal;
4✔
757
  } else {
758
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
504,351✔
759
  }
760

761
  code = blockDataEnsureCapacity(pBlock, numOfRows);
504,355✔
762
  TSDB_CHECK_CODE(code, line, END);
504,355!
763
  pBlock->info.rows = numOfRows;
504,355✔
764
  int32_t colActual = blockDataGetNumOfCols(pBlock);
504,355✔
765

766
  // convert and scan one block
767
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
504,351✔
768
    SArray* pCols = pSubmitTbData->aCol;
4✔
769
    int32_t numOfCols = taosArrayGetSize(pCols);
4✔
770
    int32_t targetIdx = 0;
4✔
771
    int32_t sourceIdx = 0;
4✔
772
    while (targetIdx < colActual) {
18✔
773
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
14✔
774
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
14!
775
      if (sourceIdx >= numOfCols) {
14✔
776
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
4!
777
        colDataSetNNULL(pColData, 0, numOfRows);
4!
778
        targetIdx++;
4✔
779
        continue;
4✔
780
      }
781

782
      uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;
10!
783

784
      SColData* pCol = taosArrayGet(pCols, sourceIdx);
10✔
785
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
10!
786
      SColVal colVal = {0};
10✔
787
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
10!
788
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
789
      if (pCol->cid < pColData->info.colId) {
10✔
790
        sourceIdx++;
4✔
791
      } else if (pCol->cid == pColData->info.colId) {
6✔
792
        for (int32_t i = 0; i < pCol->nVal; i++) {
12✔
793
          code = tColDataGetValue(pCol, i, &colVal);
8✔
794
          TSDB_CHECK_CODE(code, line, END);
8!
795

796
          if (isBlob == 0) {
8!
797
            code = doSetVal(pColData, i, &colVal);
8✔
798
          } else {
799
            code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobRow);
×
800
          }
801
          TSDB_CHECK_CODE(code, line, END);
8!
802
        }
803
        sourceIdx++;
4✔
804
        targetIdx++;
4✔
805
      } else {
806
        colDataSetNNULL(pColData, 0, numOfRows);
2!
807
        targetIdx++;
2✔
808
      }
809
    }
810
  } else {
811
    SArray*         pRows = pSubmitTbData->aRowP;
504,347✔
812
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
504,347✔
813
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
504,347✔
814
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);
504,363!
815

816
    for (int32_t i = 0; i < numOfRows; i++) {
9,758,067✔
817
      SRow* pRow = taosArrayGetP(pRows, i);
8,833,432✔
818
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
8,808,671✔
819
      int32_t sourceIdx = 0;
8,793,705✔
820
      for (int32_t j = 0; j < colActual; j++) {
37,405,935✔
821
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
28,152,231✔
822
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);
27,973,089!
823

824
        uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;
28,017,082!
825
        while (1) {
2,160✔
826
          SColVal colVal = {0};
28,019,242✔
827
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
28,019,242✔
828
          TSDB_CHECK_CODE(code, line, END);
28,130,473!
829

830
          if (colVal.cid < pColData->info.colId) {
28,130,473✔
831
            sourceIdx++;
2,160✔
832
            continue;
2,160✔
833
          } else if (colVal.cid == pColData->info.colId) {
28,128,313!
834
            if (isBlob == 0) {
28,161,717!
835
              code = doSetVal(pColData, i, &colVal);
28,177,412✔
836
            } else {
837
              code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobRow);
×
838
            }
839

840
            TSDB_CHECK_CODE(code, line, END);
28,645,634!
841

842
            sourceIdx++;
28,645,634✔
843
            break;
28,612,230✔
844
          } else {
845
            colDataSetNULL(pColData, i);
×
846
            break;
×
847
          }
848
        }
849
      }
850
    }
851
  }
852

853
END:
924,635✔
854
  if (code != 0) {
924,639!
855
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
×
856
  }
857
  taosMemoryFreeClear(pTSchema);
504,350!
858
  return code;
504,372✔
859
}
860

861
#define PROCESS_VAL                                      \
862
  if (curRow == 0) {                                     \
863
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
864
    buildNew = true;                                     \
865
  } else {                                               \
866
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
867
    if (currentRowAssigned != assigned[j]) {             \
868
      assigned[j] = currentRowAssigned;                  \
869
      buildNew = true;                                   \
870
    }                                                    \
871
  }
872

873
#define SET_DATA                                                                                    \
874
  if (colVal.cid < pColData->info.colId) {                                                          \
875
    sourceIdx++;                                                                                    \
876
  } else if (colVal.cid == pColData->info.colId) {                                                  \
877
    if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
878
      TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobRow)); \
879
    } else {                                                                                        \
880
      TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
881
    }                                                                                               \
882
    sourceIdx++;                                                                                    \
883
    targetIdx++;                                                                                    \
884
  }
885

886
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
117,541✔
887
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
888
  int32_t         code = 0;
117,541✔
889
  SSchemaWrapper* pSW = NULL;
117,541✔
890
  SSDataBlock*    block = NULL;
117,541✔
891
  if (taosArrayGetSize(blocks) > 0) {
117,541!
892
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
893
    TQ_NULL_GO_TO_END(pLastBlock);
×
894
    pLastBlock->info.rows = curRow - *lastRow;
×
895
    *lastRow = curRow;
×
896
  }
897

898
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
117,559!
899
  TQ_NULL_GO_TO_END(block);
117,603!
900

901
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
117,603!
902
  TQ_NULL_GO_TO_END(pSW);
117,626!
903

904
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
117,626!
905
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
117,591!
906
          (int32_t)taosArrayGetSize(block->pDataBlock));
907

908
  block->info.id.uid = pSubmitTbData->uid;
117,591✔
909
  block->info.version = pReader->msg.ver;
117,591✔
910
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
117,591!
911
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
117,564!
912
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
117,509!
913
  pSW = NULL;
117,509✔
914

915
  taosMemoryFreeClear(block);
117,509!
916

917
END:
×
918
  if (code != 0) {
117,633!
919
    tqError("processBuildNew failed, code:%d", code);
×
920
  }
921
  tDeleteSchemaWrapper(pSW);
117,633!
922
  blockDataFreeRes(block);
117,595✔
923
  taosMemoryFree(block);
117,579!
924
  return code;
117,587✔
925
}
926
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
206✔
927
  int32_t code = 0;
206✔
928
  int32_t curRow = 0;
206✔
929
  int32_t lastRow = 0;
206✔
930

931
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
206✔
932
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
206!
933
  TQ_NULL_GO_TO_END(assigned);
206!
934

935
  SArray*   pCols = pSubmitTbData->aCol;
206✔
936
  SColData* pCol = taosArrayGet(pCols, 0);
206✔
937
  TQ_NULL_GO_TO_END(pCol);
206!
938
  int32_t numOfRows = pCol->nVal;
206✔
939
  int32_t numOfCols = taosArrayGetSize(pCols);
206✔
940
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
206!
941
          numOfRows);
942
  for (int32_t i = 0; i < numOfRows; i++) {
119,464✔
943
    bool buildNew = false;
86,955✔
944

945
    for (int32_t j = 0; j < numOfCols; j++) {
242,576✔
946
      pCol = taosArrayGet(pCols, j);
163,398✔
947
      TQ_NULL_GO_TO_END(pCol);
156,353!
948
      SColVal colVal = {0};
156,353✔
949
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
156,353!
950
      PROCESS_VAL
155,621!
951
    }
952

953
    if (buildNew) {
79,178✔
954
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
206!
955
    }
956

957
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
79,178✔
958
    TQ_NULL_GO_TO_END(pBlock);
74,497!
959

960
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
74,497!
961
            (int32_t)taosArrayGetSize(blocks));
962

963
    int32_t targetIdx = 0;
74,497✔
964
    int32_t sourceIdx = 0;
74,497✔
965
    int32_t colActual = blockDataGetNumOfCols(pBlock);
74,497✔
966
    while (targetIdx < colActual) {
299,378✔
967
      pCol = taosArrayGet(pCols, sourceIdx);
180,120✔
968
      TQ_NULL_GO_TO_END(pCol);
173,564!
969
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
173,564✔
970
      TQ_NULL_GO_TO_END(pColData);
161,674!
971
      SColVal colVal = {0};
161,674✔
972
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
161,674!
973
      SET_DATA
152,805!
974
    }
975

976
    curRow++;
119,258✔
977
  }
978
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
32,509✔
979
  pLastBlock->info.rows = curRow - lastRow;
206✔
980
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
206!
981
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
982
END:
206✔
983
  if (code != TSDB_CODE_SUCCESS) {
206!
984
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
985
  }
986
  taosMemoryFree(assigned);
206!
987
  return code;
206✔
988
}
989

990
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
117,394✔
991
  int32_t   code = 0;
117,394✔
992
  STSchema* pTSchema = NULL;
117,394✔
993

994
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
117,394✔
995
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
117,394!
996
  TQ_NULL_GO_TO_END(assigned);
117,458!
997

998
  int32_t curRow = 0;
117,458✔
999
  int32_t lastRow = 0;
117,458✔
1000
  SArray* pRows = pSubmitTbData->aRowP;
117,458✔
1001
  int32_t numOfRows = taosArrayGetSize(pRows);
117,458✔
1002
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
117,415✔
1003
  TQ_NULL_GO_TO_END(pTSchema);
117,428!
1004
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
117,428!
1005

1006
  for (int32_t i = 0; i < numOfRows; i++) {
3,898,076✔
1007
    bool  buildNew = false;
3,654,861✔
1008
    SRow* pRow = taosArrayGetP(pRows, i);
3,654,861✔
1009
    TQ_NULL_GO_TO_END(pRow);
3,646,843!
1010

1011
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
19,575,133✔
1012
      SColVal colVal = {0};
15,851,607✔
1013
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
15,851,607!
1014
      PROCESS_VAL
15,926,662!
1015
    }
1016

1017
    if (buildNew) {
3,723,526✔
1018
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
117,403!
1019
    }
1020

1021
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
3,723,490✔
1022
    TQ_NULL_GO_TO_END(pBlock);
3,638,964!
1023

1024
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
3,638,964!
1025
            (int32_t)taosArrayGetSize(blocks));
1026

1027
    int32_t targetIdx = 0;
3,638,964✔
1028
    int32_t sourceIdx = 0;
3,638,964✔
1029
    int32_t colActual = blockDataGetNumOfCols(pBlock);
3,638,964✔
1030
    while (targetIdx < colActual) {
19,839,007✔
1031
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
16,058,311✔
1032
      SColVal          colVal = {0};
15,981,404✔
1033
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
15,981,404!
1034
      SET_DATA
15,893,182!
1035
    }
1036

1037
    curRow++;
3,780,696✔
1038
  }
1039
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
243,215✔
1040
  pLastBlock->info.rows = curRow - lastRow;
117,176✔
1041

1042
  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
117,176!
1043
          (int)taosArrayGetSize(blocks));
1044
END:
117,176✔
1045
  if (code != TSDB_CODE_SUCCESS) {
117,173!
1046
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1047
  }
1048
  taosMemoryFreeClear(pTSchema);
117,173!
1049
  taosMemoryFree(assigned);
117,372!
1050
  return code;
117,345✔
1051
}
1052

1053
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
29✔
1054
  int32_t code = 0;
29✔
1055
  int32_t lino = 0;
29✔
1056
  void*   createReq = NULL;
29✔
1057
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
29!
1058
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
29!
1059

1060
  if (pRsp->createTableNum == 0) {
29✔
1061
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
15✔
1062
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
15!
1063
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
15✔
1064
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
15!
1065
  }
1066

1067
  uint32_t len = 0;
29✔
1068
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
29!
1069
  TSDB_CHECK_CODE(code, lino, END);
29!
1070
  createReq = taosMemoryCalloc(1, len);
29!
1071
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
29!
1072

1073
  SEncoder encoder = {0};
29✔
1074
  tEncoderInit(&encoder, createReq, len);
29✔
1075
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
29✔
1076
  tEncoderClear(&encoder);
29✔
1077
  TSDB_CHECK_CODE(code, lino, END);
29!
1078
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
58!
1079
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
58!
1080
  pRsp->createTableNum++;
29✔
1081
  tqTrace("build create table info msg success");
29!
1082

1083
END:
29✔
1084
  if (code != 0) {
29!
1085
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1086
    taosMemoryFree(createReq);
×
1087
  }
1088
  return code;
29✔
1089
}
1090

1091
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
118,275✔
1092
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1093
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
118,275!
1094
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
118,275✔
1095
  if (pSubmitTbData == NULL) {
118,269!
1096
    return terrno;
×
1097
  }
1098
  pReader->nextBlk++;
118,270✔
1099

1100
  if (pSubmitTbDataRet) {
118,270✔
1101
    *pSubmitTbDataRet = pSubmitTbData;
118,269✔
1102
  }
1103

1104
  if (fetchMeta == ONLY_META) {
118,270✔
1105
    if (pSubmitTbData->pCreateTbReq != NULL) {
22✔
1106
      if (pRsp->createTableReq == NULL) {
4✔
1107
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
1✔
1108
        if (pRsp->createTableReq == NULL) {
1!
1109
          return terrno;
×
1110
        }
1111
      }
1112
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
8!
1113
        return terrno;
×
1114
      }
1115
      pSubmitTbData->pCreateTbReq = NULL;
4✔
1116
    }
1117
    return 0;
22✔
1118
  }
1119

1120
  int32_t sversion = pSubmitTbData->sver;
118,248✔
1121
  int64_t uid = pSubmitTbData->uid;
118,248✔
1122
  pReader->lastBlkUid = uid;
118,248✔
1123

1124
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
118,248✔
1125
  taosMemoryFreeClear(pReader->extSchema);
118,250!
1126
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
118,250✔
1127
  if (pReader->pSchemaWrapper == NULL) {
118,230✔
1128
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
603✔
1129
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1130
    pReader->cachedSchemaSuid = 0;
561✔
1131
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
561✔
1132
  }
1133

1134
  if (pSubmitTbData->pCreateTbReq != NULL) {
117,627✔
1135
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
29✔
1136
    if (code != 0) {
29!
1137
      return code;
×
1138
    }
1139
  } else if (rawList != NULL) {
117,598!
1140
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1141
      return terrno;
×
1142
    }
1143
    pReader->pSchemaWrapper = NULL;
×
1144
    return 0;
×
1145
  }
1146

1147
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
117,627✔
1148
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
206✔
1149
  } else {
1150
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
117,421✔
1151
  }
1152
}
1153

1154
int32_t tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList, const char* id) {
1,307✔
1155
  if (pReader == NULL) {
1,307!
1156
    return TSDB_CODE_SUCCESS;
×
1157
  }
1158
  pReader->pColIdList = pColIdList;
1,307✔
1159
  return tqCollectPhysicalTables(pReader, id);
1,307✔
1160
}
1161

1162
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
1,383✔
1163
  if (pReader == NULL || tbUidList == NULL) {
1,383!
1164
    return TSDB_CODE_SUCCESS;
×
1165
  }
1166
  if (pReader->tbIdHash) {
1,383✔
1167
    taosHashClear(pReader->tbIdHash);
14✔
1168
  } else {
1169
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1,369✔
1170
    if (pReader->tbIdHash == NULL) {
1,369!
1171
      tqError("s-task:%s failed to init hash table", id);
×
1172
      return terrno;
×
1173
    }
1174
  }
1175

1176
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
33,282✔
1177
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
31,882✔
1178
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
31,879✔
1179
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
3!
1180
      continue;
×
1181
    }
1182
  }
1183

1184
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
1,382✔
1185
  return TSDB_CODE_SUCCESS;
1,383✔
1186
}
1187

1188
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
1,052✔
1189
  if (pReader == NULL || pTableUidList == NULL) {
1,052!
1190
    return;
×
1191
  }
1192
  if (pReader->tbIdHash == NULL) {
1,052!
1193
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
1194
    if (pReader->tbIdHash == NULL) {
×
1195
      tqError("failed to init hash table");
×
1196
      return;
×
1197
    }
1198
  }
1199

1200
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
1,052✔
1201
  for (int i = 0; i < numOfTables; i++) {
1,699✔
1202
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
647✔
1203
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
647!
1204
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
1205
      continue;
×
1206
    }
1207
  }
1208
}
1209

1210
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
×
1211
  if (pReader == NULL) {
×
1212
    return false;
×
1213
  }
1214
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
×
1215
}
1216

1217
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1218
  if (pReader == NULL) {
×
1219
    return false;
×
1220
  }
1221
  return pReader->msg.msgStr == NULL;
×
1222
}
1223

1224
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
3✔
1225
  if (pReader == NULL || tbUidList == NULL) {
3!
1226
    return;
×
1227
  }
1228
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
6✔
1229
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
3✔
1230
    if (pKey && taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)) != 0) {
3!
1231
      tqError("failed to remove table uid:%" PRId64 " from hash", *pKey);
×
1232
    }
1233
  }
1234
}
1235

1236
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
188,648✔
1237
  if (pTq == NULL) {
188,648!
1238
    return 0;  // mounted vnode may have no tq
×
1239
  }
1240
  if (tbUidList == NULL) {
188,648!
1241
    return TSDB_CODE_INVALID_PARA;
×
1242
  }
1243
  void*   pIter = NULL;
188,648✔
1244
  int32_t vgId = TD_VID(pTq->pVnode);
188,648✔
1245

1246
  // update the table list for each consumer handle
1247
  taosWLockLatch(&pTq->lock);
188,648✔
1248
  while (1) {
4,224✔
1249
    pIter = taosHashIterate(pTq->pHandle, pIter);
192,876✔
1250
    if (pIter == NULL) {
192,876✔
1251
      break;
188,651✔
1252
    }
1253

1254
    STqHandle* pTqHandle = (STqHandle*)pIter;
4,225✔
1255
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
4,225✔
1256
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
1,055✔
1257
      if (code != 0) {
1,055!
1258
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
1259
        continue;
×
1260
      }
1261
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
3,170✔
1262
      if (!isAdd) {
3,155✔
1263
        int32_t sz = taosArrayGetSize(tbUidList);
1,062✔
1264
        for (int32_t i = 0; i < sz; i++) {
1,062!
1265
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
×
1266
          if (tbUid &&
×
1267
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
×
1268
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
1269
            continue;
×
1270
          }
1271
        }
1272
      }
1273
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
15✔
1274
      if (isAdd) {
14!
1275
        SArray* list = NULL;
14✔
1276
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
14✔
1277
                                    &list, pTqHandle->execHandle.task);
1278
        if (ret != TDB_CODE_SUCCESS) {
14!
1279
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1280
                  pTqHandle->consumerId);
1281
          taosArrayDestroy(list);
×
1282
          taosHashCancelIterate(pTq->pHandle, pIter);
×
1283
          taosWUnLockLatch(&pTq->lock);
×
1284

1285
          return ret;
×
1286
        }
1287
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
14✔
1288
        taosArrayDestroy(list);
14✔
1289
      } else {
1290
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1291
      }
1292
    }
1293
  }
1294
  taosWUnLockLatch(&pTq->lock);
188,651✔
1295

1296
  // update the table list handle for each stream scanner/wal reader
1297
/* STREAMTODO
1298
  streamMetaWLock(pTq->pStreamMeta);
1299
  while (1) {
1300
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
1301
    if (pIter == NULL) {
1302
      break;
1303
    }
1304

1305
    int64_t      refId = *(int64_t*)pIter;
1306
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
1307
    if (pTask != NULL) {
1308
      int32_t taskId = pTask->id.taskId;
1309

1310
      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
1311
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
1312
        if (code != 0) {
1313
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
1314
        }
1315
      }
1316
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
1317
      if (ret) {
1318
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
1319
      }
1320
    }
1321
  }
1322

1323
  streamMetaWUnLock(pTq->pStreamMeta);
1324
*/  
1325
  return 0;
188,654✔
1326
}
1327

1328
static void destroySourceScanTables(void* ptr) {
×
1329
  SArray** pTables = ptr;
×
1330
  if (pTables && *pTables) {
×
1331
    taosArrayDestroy(*pTables);
×
1332
    *pTables = NULL;
×
1333
  }
1334
}
×
1335

1336
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1337
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1338
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1339
  if (pCol1->vColId == pCol2->vColId) {
×
1340
    return 0;
×
1341
  } else if (pCol1->vColId < pCol2->vColId) {
×
1342
    return -1;
×
1343
  } else {
1344
    return 1;
×
1345
  }
1346
}
1347

1348
int32_t tqReaderSetVtableInfo(STqReader* pReader, void* vnode, void* ptr, SSHashObj* pVtableInfos,
×
1349
                              SSDataBlock** ppResBlock, const char* idstr) {
1350
  int32_t            code = TSDB_CODE_SUCCESS;
×
1351
  int32_t            lino = 0;
×
1352
  SStorageAPI*       pAPI = ptr;
×
1353
  SVTSourceScanInfo* pScanInfo = NULL;
×
1354
  SHashObj*          pVirtualTables = NULL;
×
1355
  SMetaReader        metaReader = {0};
×
1356
  SVTColInfo         colInfo = {0};
×
1357
  SSchemaWrapper*    schema = NULL;
×
1358

1359
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1360
  TSDB_CHECK_NULL(vnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1361
  TSDB_CHECK_NULL(pAPI, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1362

1363
  pScanInfo = &pReader->vtSourceScanInfo;
×
1364
  taosHashCleanup(pScanInfo->pVirtualTables);
×
1365
  pScanInfo->pVirtualTables = NULL;
×
1366

1367
  if (tSimpleHashGetSize(pVtableInfos) == 0) {
×
1368
    goto _end;
×
1369
  }
1370

1371
  pVirtualTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1372
  TSDB_CHECK_NULL(pVirtualTables, code, lino, _end, terrno);
×
1373
  taosHashSetFreeFp(pVirtualTables, destroySourceScanTables);
×
1374

1375
  int32_t iter = 0;
×
1376
  void*   px = tSimpleHashIterate(pVtableInfos, NULL, &iter);
×
1377
  while (px != NULL) {
×
1378
    int64_t vTbUid = *(int64_t*)tSimpleHashGetKey(px, NULL);
×
1379
    SArray* pColInfos = taosArrayInit(8, sizeof(SVTColInfo));
×
1380
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, terrno);
×
1381
    code = taosHashPut(pVirtualTables, &vTbUid, sizeof(int64_t), &pColInfos, POINTER_BYTES);
×
1382
    TSDB_CHECK_CODE(code, lino, _end);
×
1383

1384
    SSHashObj* pPhysicalTables = *(SSHashObj**)px;
×
1385
    int32_t    iterIn = 0;
×
1386
    void*      pxIn = tSimpleHashIterate(pPhysicalTables, NULL, &iterIn);
×
1387
    while (pxIn != NULL) {
×
1388
      char* physicalTableName = tSimpleHashGetKey(pxIn, NULL);
×
1389
      pAPI->metaReaderFn.clearReader(&metaReader);
×
1390
      pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1391
      code = pAPI->metaReaderFn.getTableEntryByName(&metaReader, physicalTableName);
×
1392
      TSDB_CHECK_CODE(code, lino, _end);
×
1393
      pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1394
      colInfo.pTbUid = metaReader.me.uid;
×
1395

1396
      switch (metaReader.me.type) {
×
1397
        case TSDB_CHILD_TABLE: {
×
1398
          int64_t suid = metaReader.me.ctbEntry.suid;
×
1399
          pAPI->metaReaderFn.clearReader(&metaReader);
×
1400
          pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1401
          code = pAPI->metaReaderFn.getTableEntryByUid(&metaReader, suid);
×
1402
          TSDB_CHECK_CODE(code, lino, _end);
×
1403
          pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1404
          schema = &metaReader.me.stbEntry.schemaRow;
×
1405
          break;
×
1406
        }
1407
        case TSDB_NORMAL_TABLE: {
×
1408
          schema = &metaReader.me.ntbEntry.schemaRow;
×
1409
          break;
×
1410
        }
1411
        default: {
×
1412
          tqError("invalid table type: %d", metaReader.me.type);
×
1413
          code = TSDB_CODE_INVALID_PARA;
×
1414
          TSDB_CHECK_CODE(code, lino, _end);
×
1415
        }
1416
      }
1417

1418
      SArray* pCols = *(SArray**)pxIn;
×
1419
      int32_t ncols = taosArrayGetSize(pCols);
×
1420
      for (int32_t i = 0; i < ncols; ++i) {
×
1421
        SColIdName* pCol = taosArrayGet(pCols, i);
×
1422
        colInfo.vColId = pCol->colId;
×
1423

1424
        for (int32_t j = 0; j < schema->nCols; ++j) {
×
1425
          if (strncmp(pCol->colName, schema->pSchema[j].name, strlen(schema->pSchema[j].name)) == 0) {
×
1426
            colInfo.pColId = schema->pSchema[j].colId;
×
1427
            void* px = taosArrayPush(pColInfos, &colInfo);
×
1428
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1429
            break;
×
1430
          }
1431
        }
1432
      }
1433

1434
      taosArraySort(pColInfos, compareSVTColInfo);
×
1435
      pxIn = tSimpleHashIterate(pPhysicalTables, pxIn, &iterIn);
×
1436
    }
1437

1438
    px = tSimpleHashIterate(pVtableInfos, px, &iter);
×
1439
  }
1440

1441
  pScanInfo->pVirtualTables = pVirtualTables;
×
1442
  pVirtualTables = NULL;
×
1443

1444
  // set the result data block
1445
  if (pReader->pResBlock) {
×
1446
    blockDataDestroy(pReader->pResBlock);
×
1447
  }
1448
  pReader->pResBlock = *ppResBlock;
×
1449
  *ppResBlock = NULL;
×
1450

1451
  // update reader callback for vtable source scan
1452
  pAPI->tqReaderFn.tqNextBlockImpl = tqNextVTableSourceBlockImpl;
×
1453
  pAPI->tqReaderFn.tqReaderIsQueriedTable = tqReaderIsQueriedSourceTable;
×
1454

1455
_end:
×
1456
  if (code != TSDB_CODE_SUCCESS) {
×
1457
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1458
  }
1459
  pAPI->metaReaderFn.clearReader(&metaReader);
×
1460
  if (pVirtualTables != NULL) {
×
1461
    taosHashCleanup(pVirtualTables);
×
1462
  }
1463
  return code;
×
1464
}
1465

1466
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
1,308✔
1467
  int32_t            code = TSDB_CODE_SUCCESS;
1,308✔
1468
  int32_t            lino = 0;
1,308✔
1469
  SVTSourceScanInfo* pScanInfo = NULL;
1,308✔
1470
  SHashObj*          pVirtualTables = NULL;
1,308✔
1471
  SHashObj*          pPhysicalTables = NULL;
1,308✔
1472
  void*              pIter = NULL;
1,308✔
1473
  void*              px = NULL;
1,308✔
1474

1475
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
1,308!
1476

1477
  pScanInfo = &pReader->vtSourceScanInfo;
1,308✔
1478
  taosHashCleanup(pScanInfo->pPhysicalTables);
1,308✔
1479
  pScanInfo->pPhysicalTables = NULL;
1,308✔
1480
  taosLRUCacheCleanup(pScanInfo->pPhyTblSchemaCache);
1,308✔
1481
  pScanInfo->pPhyTblSchemaCache = NULL;
1,308✔
1482
  pScanInfo->nextVirtualTableIdx = -1;
1,308✔
1483
  pScanInfo->metaFetch = 0;
1,308✔
1484
  pScanInfo->cacheHit = 0;
1,308✔
1485

1486
  pVirtualTables = pScanInfo->pVirtualTables;
1,308✔
1487
  if (taosHashGetSize(pVirtualTables) == 0 || taosArrayGetSize(pReader->pColIdList) == 0) {
1,308!
1488
    goto _end;
1,308✔
1489
  }
1490

1491
  pPhysicalTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1492
  TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno);
×
1493
  taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables);
×
1494

1495
  pIter = taosHashIterate(pVirtualTables, NULL);
×
1496
  while (pIter != NULL) {
×
1497
    int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL);
×
1498
    SArray* pColInfos = *(SArray**)pIter;
×
1499
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1500

1501
    // Traverse all required columns and collect corresponding physical tables
1502
    int32_t nColInfos = taosArrayGetSize(pColInfos);
×
1503
    int32_t nOutputCols = taosArrayGetSize(pReader->pColIdList);
×
1504
    for (int32_t i = 0, j = 0; i < nColInfos && j < nOutputCols;) {
×
1505
      SVTColInfo* pCol = taosArrayGet(pColInfos, i);
×
1506
      col_id_t    colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
×
1507
      if (pCol->vColId < colIdNeed) {
×
1508
        i++;
×
1509
      } else if (pCol->vColId > colIdNeed) {
×
1510
        j++;
×
1511
      } else {
1512
        SArray* pRelatedVTs = NULL;
×
1513
        px = taosHashGet(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t));
×
1514
        if (px == NULL) {
×
1515
          pRelatedVTs = taosArrayInit(8, sizeof(int64_t));
×
1516
          TSDB_CHECK_NULL(pRelatedVTs, code, lino, _end, terrno);
×
1517
          code = taosHashPut(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t), &pRelatedVTs, POINTER_BYTES);
×
1518
          if (code != TSDB_CODE_SUCCESS) {
×
1519
            taosArrayDestroy(pRelatedVTs);
×
1520
            TSDB_CHECK_CODE(code, lino, _end);
×
1521
          }
1522
        } else {
1523
          pRelatedVTs = *(SArray**)px;
×
1524
        }
1525
        if (taosArrayGetSize(pRelatedVTs) == 0 || *(int64_t*)taosArrayGetLast(pRelatedVTs) != vTbUid) {
×
1526
          px = taosArrayPush(pRelatedVTs, &vTbUid);
×
1527
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1528
        }
1529
        i++;
×
1530
        j++;
×
1531
      }
1532
    }
1533
    pIter = taosHashIterate(pVirtualTables, pIter);
×
1534
  }
1535

1536
  pScanInfo->pPhysicalTables = pPhysicalTables;
×
1537
  pPhysicalTables = NULL;
×
1538

1539
  if (taosHashGetSize(pScanInfo->pPhysicalTables) > 0) {
×
1540
    pScanInfo->pPhyTblSchemaCache = taosLRUCacheInit(1024 * 128, -1, .5);
×
1541
    TSDB_CHECK_NULL(pScanInfo->pPhyTblSchemaCache, code, lino, _end, terrno);
×
1542
  }
1543

1544
_end:
×
1545
  if (code != TSDB_CODE_SUCCESS) {
1,308!
1546
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1547
  }
1548
  if (pIter != NULL) {
1,308!
1549
    taosHashCancelIterate(pReader->tbIdHash, pIter);
×
1550
  }
1551
  if (pPhysicalTables != NULL) {
1,308!
1552
    taosHashCleanup(pPhysicalTables);
×
1553
  }
1554
  return code;
1,308✔
1555
}
1556

1557
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1558
  if (value) {
×
1559
    SSchemaWrapper* pSchemaWrapper = value;
×
1560
    tDeleteSchemaWrapper(pSchemaWrapper);
1561
  }
1562
}
×
1563

1564
bool tqNextVTableSourceBlockImpl(STqReader* pReader, const char* idstr) {
×
1565
  int32_t            code = TSDB_CODE_SUCCESS;
×
1566
  int32_t            lino = 0;
×
1567
  SVTSourceScanInfo* pScanInfo = NULL;
×
1568

1569
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1570

1571
  pScanInfo = &pReader->vtSourceScanInfo;
×
1572
  if (pReader->msg.msgStr == NULL || taosHashGetSize(pScanInfo->pPhysicalTables) == 0) {
×
1573
    return false;
×
1574
  }
1575

1576
  if (pScanInfo->nextVirtualTableIdx >= 0) {
×
1577
    // The data still needs to be converted into the virtual table result block
1578
    return true;
×
1579
  }
1580

1581
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
×
1582
  while (pReader->nextBlk < blockSz) {
×
1583
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
1584
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, _end, terrno);
×
1585
    int64_t pTbUid = pSubmitTbData->uid;
×
1586
    void*   px = taosHashGet(pScanInfo->pPhysicalTables, &pTbUid, sizeof(int64_t));
×
1587
    if (px != NULL) {
×
1588
      SArray* pRelatedVTs = *(SArray**)px;
×
1589
      if (taosArrayGetSize(pRelatedVTs) > 0) {
×
1590
        pScanInfo->nextVirtualTableIdx = 0;
×
1591
        return true;
×
1592
      }
1593
    }
1594
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, pTbUid);
×
1595
    pReader->nextBlk++;
×
1596
  }
1597

1598
  tqReaderClearSubmitMsg(pReader);
×
1599
  tqTrace("iterator data block end, total block num:%d", blockSz);
×
1600

1601
_end:
×
1602
  if (code != TSDB_CODE_SUCCESS) {
×
1603
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1604
  }
1605
  return false;
×
1606
}
1607

1608
bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) {
×
1609
  if (pReader == NULL) {
×
1610
    return false;
×
1611
  }
1612
  return taosHashGet(pReader->vtSourceScanInfo.pPhysicalTables, &uid, sizeof(uint64_t)) != NULL;
×
1613
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc