• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.47
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr);
20

21
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
669✔
22
  if (pHandle == NULL || pHead == NULL) {
669!
23
    return false;
×
24
  }
25
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
669✔
26
    return true;
665✔
27
  }
28

29
  int16_t msgType = pHead->msgType;
4✔
30
  char*   body = pHead->body;
4✔
31
  int32_t bodyLen = pHead->bodyLen;
4✔
32

33
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
4✔
34
  int64_t  realTbSuid = 0;
4✔
35
  SDecoder dcoder = {0};
4✔
36
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
4✔
37
  int32_t  len = bodyLen - sizeof(SMsgHead);
4✔
38
  tDecoderInit(&dcoder, data, len);
4✔
39

40
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
4!
41
    SVCreateStbReq req = {0};
2✔
42
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
2!
43
      goto end;
×
44
    }
45
    realTbSuid = req.suid;
2✔
46
  } else if (msgType == TDMT_VND_DROP_STB) {
2!
47
    SVDropStbReq req = {0};
×
48
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
49
      goto end;
×
50
    }
51
    realTbSuid = req.suid;
×
52
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
2!
53
    SVCreateTbBatchReq req = {0};
2✔
54
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
2!
55
      goto end;
×
56
    }
57

58
    int32_t        needRebuild = 0;
2✔
59
    SVCreateTbReq* pCreateReq = NULL;
2✔
60
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
4✔
61
      pCreateReq = req.pReqs + iReq;
2✔
62
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
2!
63
        needRebuild++;
×
64
      }
65
    }
66
    if (needRebuild == 0) {
2!
67
      // do nothing
68
    } else if (needRebuild == req.nReqs) {
×
69
      realTbSuid = tbSuid;
×
70
    } else {
71
      realTbSuid = tbSuid;
×
72
      SVCreateTbBatchReq reqNew = {0};
×
73
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
74
      if (reqNew.pArray == NULL) {
×
75
        tDeleteSVCreateTbBatchReq(&req);
×
76
        goto end;
×
77
      }
78
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
79
        pCreateReq = req.pReqs + iReq;
×
80
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
81
          reqNew.nReqs++;
×
82
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
83
            taosArrayDestroy(reqNew.pArray);
×
84
            tDeleteSVCreateTbBatchReq(&req);
×
85
            goto end;
×
86
          }
87
        }
88
      }
89

90
      int     tlen = 0;
×
91
      int32_t ret = 0;
×
92
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
93
      void* buf = taosMemoryMalloc(tlen);
×
94
      if (NULL == buf) {
×
95
        taosArrayDestroy(reqNew.pArray);
×
96
        tDeleteSVCreateTbBatchReq(&req);
×
97
        goto end;
×
98
      }
99
      SEncoder coderNew = {0};
×
100
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
101
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
102
      tEncoderClear(&coderNew);
×
103
      if (ret < 0) {
×
104
        taosMemoryFree(buf);
×
105
        taosArrayDestroy(reqNew.pArray);
×
106
        tDeleteSVCreateTbBatchReq(&req);
×
107
        goto end;
×
108
      }
109
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
110
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
111
      taosMemoryFree(buf);
×
112
      taosArrayDestroy(reqNew.pArray);
×
113
    }
114

115
    tDeleteSVCreateTbBatchReq(&req);
2✔
116
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
117
    SVAlterTbReq req = {0};
×
118

119
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
120
      goto end;
×
121
    }
122

123
    SMetaReader mr = {0};
×
124
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
125

126
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
127
      metaReaderClear(&mr);
×
128
      goto end;
×
129
    }
130
    realTbSuid = mr.me.ctbEntry.suid;
×
131
    metaReaderClear(&mr);
×
132
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
133
    SVDropTbBatchReq req = {0};
×
134

135
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
136
      goto end;
×
137
    }
138

139
    int32_t      needRebuild = 0;
×
140
    SVDropTbReq* pDropReq = NULL;
×
141
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
142
      pDropReq = req.pReqs + iReq;
×
143

144
      if (pDropReq->suid == tbSuid) {
×
145
        needRebuild++;
×
146
      }
147
    }
148
    if (needRebuild == 0) {
×
149
      // do nothing
150
    } else if (needRebuild == req.nReqs) {
×
151
      realTbSuid = tbSuid;
×
152
    } else {
153
      realTbSuid = tbSuid;
×
154
      SVDropTbBatchReq reqNew = {0};
×
155
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
156
      if (reqNew.pArray == NULL) {
×
157
        goto end;
×
158
      }
159
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
160
        pDropReq = req.pReqs + iReq;
×
161
        if (pDropReq->suid == tbSuid) {
×
162
          reqNew.nReqs++;
×
163
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
164
            taosArrayDestroy(reqNew.pArray);
×
165
            goto end;
×
166
          }
167
        }
168
      }
169

170
      int     tlen = 0;
×
171
      int32_t ret = 0;
×
172
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
173
      void* buf = taosMemoryMalloc(tlen);
×
174
      if (NULL == buf) {
×
175
        taosArrayDestroy(reqNew.pArray);
×
176
        goto end;
×
177
      }
178
      SEncoder coderNew = {0};
×
179
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
180
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
181
      tEncoderClear(&coderNew);
×
182
      if (ret != 0) {
×
183
        taosMemoryFree(buf);
×
184
        taosArrayDestroy(reqNew.pArray);
×
185
        goto end;
×
186
      }
187
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
188
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
189
      taosMemoryFree(buf);
×
190
      taosArrayDestroy(reqNew.pArray);
×
191
    }
192
  } else if (msgType == TDMT_VND_DELETE) {
×
193
    SDeleteRes req = {0};
×
194
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
195
      goto end;
×
196
    }
197
    realTbSuid = req.suid;
×
198
  }
199

200
end:
×
201
  tDecoderClear(&dcoder);
4✔
202
  bool tmp = tbSuid == realTbSuid;
4✔
203
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
4!
204
  return tmp;
4✔
205
}
206

207
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
2,893,846✔
208
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
2,893,846!
209
    return -1;
×
210
  }
211
  int32_t code = -1;
2,893,861✔
212
  int32_t vgId = TD_VID(pTq->pVnode);
2,893,861✔
213
  int64_t id = pHandle->pWalReader->readerId;
2,893,861✔
214

215
  int64_t offset = *fetchOffset;
2,893,861✔
216
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
2,893,861✔
217
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
2,893,858✔
218
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
2,893,857✔
219

220
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
2,893,841!
221
          ", 0x%" PRIx64,
222
          vgId, offset, lastVer, committedVer, appliedVer, id);
223

224
  while (offset <= appliedVer) {
2,903,781✔
225
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
130,333!
226
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
227
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
228
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
229
      goto END;
×
230
    }
231

232
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
130,327!
233
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
234

235
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
130,332✔
236
      code = walFetchBody(pHandle->pWalReader);
119,812✔
237
      goto END;
119,805✔
238
    } else {
239
      if (pHandle->fetchMeta != WITH_DATA) {
10,520✔
240
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
839✔
241
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
839✔
242
          code = walFetchBody(pHandle->pWalReader);
669✔
243
          if (code < 0) {
669!
244
            goto END;
×
245
          }
246

247
          pHead = &(pHandle->pWalReader->pHead->head);
669✔
248
          if (isValValidForTable(pHandle, pHead)) {
669✔
249
            code = 0;
666✔
250
            goto END;
666✔
251
          } else {
252
            offset++;
3✔
253
            code = -1;
3✔
254
            continue;
3✔
255
          }
256
        }
257
      }
258
      code = walSkipFetchBody(pHandle->pWalReader);
9,851✔
259
      if (code < 0) {
9,851!
260
        goto END;
×
261
      }
262
      offset++;
9,851✔
263
    }
264
    code = -1;
9,851✔
265
  }
266

267
END:
2,773,448✔
268
  *fetchOffset = offset;
2,893,919✔
269
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
2,893,919✔
270
          ", applied:%" PRId64 ", 0x%" PRIx64,
271
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
272
  return code;
2,893,928✔
273
}
274

275
bool tqGetTablePrimaryKey(STqReader* pReader) {
20,475✔
276
  if (pReader == NULL) {
20,475!
277
    return false;
×
278
  }
279
  return pReader->hasPrimaryKey;
20,475✔
280
}
281

282
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
180✔
283
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);
180!
284

285
  if (pReader == NULL) {
180!
286
    return;
×
287
  }
288
  bool            ret = false;
180✔
289
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
180✔
290
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
180!
291
    ret = true;
×
292
  }
293
  tDeleteSchemaWrapper(schema);
294
  pReader->hasPrimaryKey = ret;
180✔
295
}
296

297
STqReader* tqReaderOpen(SVnode* pVnode) {
8,772✔
298
  tqDebug("%s:%p", __FUNCTION__, pVnode);
8,772✔
299
  if (pVnode == NULL) {
8,787!
300
    return NULL;
×
301
  }
302
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
8,787!
303
  if (pReader == NULL) {
8,786!
304
    return NULL;
×
305
  }
306

307
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
8,786✔
308
  if (pReader->pWalReader == NULL) {
8,785!
309
    taosMemoryFree(pReader);
×
310
    return NULL;
×
311
  }
312

313
  pReader->pVnodeMeta = pVnode->pMeta;
8,785✔
314
  pReader->pColIdList = NULL;
8,785✔
315
  pReader->cachedSchemaVer = 0;
8,785✔
316
  pReader->cachedSchemaSuid = 0;
8,785✔
317
  pReader->pSchemaWrapper = NULL;
8,785✔
318
  pReader->tbIdHash = NULL;
8,785✔
319
  pReader->pResBlock = NULL;
8,785✔
320

321
  int32_t code = createDataBlock(&pReader->pResBlock);
8,785✔
322
  if (code) {
8,786!
323
    terrno = code;
×
324
  }
325

326
  return pReader;
8,786✔
327
}
328

329
void tqReaderClose(STqReader* pReader) {
8,815✔
330
  tqDebug("%s:%p", __FUNCTION__, pReader);
8,815✔
331
  if (pReader == NULL) return;
8,817✔
332

333
  // close wal reader
334
  if (pReader->pWalReader) {
8,787!
335
    walCloseReader(pReader->pWalReader);
8,787✔
336
  }
337

338
  if (pReader->pSchemaWrapper) {
8,787✔
339
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
2,070!
340
  }
341

342
  taosMemoryFree(pReader->extSchema);
8,787!
343
  if (pReader->pColIdList) {
8,787✔
344
    taosArrayDestroy(pReader->pColIdList);
8,479✔
345
  }
346

347
  // free hash
348
  blockDataDestroy(pReader->pResBlock);
8,787✔
349
  taosHashCleanup(pReader->tbIdHash);
8,787✔
350
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
8,787✔
351

352
  taosHashCleanup(pReader->vtSourceScanInfo.pVirtualTables);
8,787✔
353
  taosHashCleanup(pReader->vtSourceScanInfo.pPhysicalTables);
8,787✔
354
  taosLRUCacheCleanup(pReader->vtSourceScanInfo.pPhyTblSchemaCache);
8,787✔
355
  taosMemoryFree(pReader);
8,787!
356
}
357

358
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
6,706✔
359
  if (pReader == NULL) {
6,706!
UNCOV
360
    return TSDB_CODE_INVALID_PARA;
×
361
  }
362
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
6,706✔
363
    return terrno;
166✔
364
  }
365
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
6,539✔
366
  return 0;
6,540✔
367
}
368

369
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
793,406✔
370
  int32_t code = 0;
793,406✔
371

372
  while (1) {
7,712✔
373
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));
801,118✔
374

375
    SWalCont* pCont = &pReader->pHead->head;
481,085✔
376
    int64_t   ver = pCont->version;
481,085✔
377
    if (ver > maxVer) {
481,085✔
378
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
447✔
379
      return TSDB_CODE_SUCCESS;
447✔
380
    }
381

382
    if (pCont->msgType == TDMT_VND_SUBMIT) {
480,638✔
383
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
470,334✔
384
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
470,334✔
385

386
      void* data = taosMemoryMalloc(len);
470,334!
387
      if (data == NULL) {
470,341!
388
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
389
        // retry
390
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
×
UNCOV
391
        return terrno;
×
392
      }
393

394
      (void)memcpy(data, pBody, len);
470,341✔
395
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
470,341✔
396

397
      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
470,341✔
398
      if (code != 0) {
470,339!
399
        tqError("%s failed to create data submit for stream since out of memory", id);
×
UNCOV
400
        return code;
×
401
      }
402
    } else if (pCont->msgType == TDMT_VND_DELETE) {
10,304✔
403
      void*       pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
10,123✔
404
      int32_t     len = pCont->bodyLen - sizeof(SMsgHead);
10,123✔
405
      EStreamType blockType = STREAM_DELETE_DATA;
10,123✔
406
      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
10,123✔
407
      if (code == TSDB_CODE_SUCCESS) {
10,122!
408
        if (*pItem == NULL) {
10,123✔
409
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
7,712✔
410
          // we need to continue check next data in the wal files.
411
          continue;
7,712✔
412
        } else {
413
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
2,411✔
414
        }
415
      } else {
416
        terrno = code;
×
417
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
×
UNCOV
418
        return code;
×
419
      }
420

421
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
366!
422
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
185✔
423
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
185✔
424
      code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
185✔
425
      if (TSDB_CODE_SUCCESS == code) {
185!
426
        if (!*pItem) {
185!
UNCOV
427
          continue;
×
428
        } else {
429
          tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
185!
430
        }
431
      } else {
432
        terrno = code;
×
UNCOV
433
        return code;
×
434
      }
435
    } else {
436
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
×
UNCOV
437
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
438
    }
439

440
    return code;
472,932✔
441
  }
442
}
443

444
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
575,966✔
445
  if (pReader == NULL) {
575,966!
UNCOV
446
    return false;
×
447
  }
448
  SWalReader* pWalReader = pReader->pWalReader;
575,966✔
449

450
  int64_t st = taosGetTimestampMs();
575,967✔
451
  while (1) {
573,741✔
452
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
1,149,708✔
453
    while (pReader->nextBlk < numOfBlocks) {
1,481,484✔
454
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
812,360✔
455
              pReader->msg.ver);
456

457
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
812,360✔
458
      if (pSubmitTbData == NULL) {
812,297!
UNCOV
459
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
460
                pReader->msg.ver);
UNCOV
461
        return false;
×
462
      }
463
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
812,307✔
464
        pReader->nextBlk += 1;
128✔
465
        continue;
128✔
466
      }
467
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
813,973!
468
        tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
482,302✔
469
        SSDataBlock* pRes = NULL;
482,302✔
470
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
482,302✔
471
        if (code == TSDB_CODE_SUCCESS) {
482,398✔
472
          return true;
480,604✔
473
        }
474
      } else {
475
        pReader->nextBlk += 1;
329,992✔
476
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
329,992!
477
      }
478
    }
479

480
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
669,124✔
481
    pReader->msg.msgStr = NULL;
669,118✔
482

483
    int64_t elapsed = taosGetTimestampMs() - st;
669,103✔
484
    if (elapsed > 1000 || elapsed < 0) {
669,103!
UNCOV
485
      return false;
×
486
    }
487

488
    // try next message in wal file
489
    if (walNextValidMsg(pWalReader) < 0) {
669,107✔
490
      return false;
95,365✔
491
    }
492

493
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
573,738✔
494
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
573,738✔
495
    int64_t ver = pWalReader->pHead->head.version;
573,738✔
496
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL) != 0) {
573,738!
UNCOV
497
      return false;
×
498
    }
499
    pReader->nextBlk = 0;
573,741✔
500
  }
501
}
502

503
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList) {
1,163,994✔
504
  if (pReader == NULL) {
1,163,994!
UNCOV
505
    return TSDB_CODE_INVALID_PARA;
×
506
  }
507
  pReader->msg.msgStr = msgStr;
1,163,994✔
508
  pReader->msg.msgLen = msgLen;
1,163,994✔
509
  pReader->msg.ver = ver;
1,163,994✔
510

511
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
1,163,994✔
512
  SDecoder decoder = {0};
1,163,994✔
513

514
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
1,163,994✔
515
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit, rawList);
1,163,836✔
516
  tDecoderClear(&decoder);
1,163,489✔
517

518
  if (code != 0) {
1,164,054!
UNCOV
519
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
520
  }
521

522
  return code;
1,164,012✔
523
}
524

525
void tqReaderClearSubmitMsg(STqReader* pReader) {
709,949✔
526
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
709,949✔
527
  pReader->nextBlk = 0;
709,918✔
528
  pReader->msg.msgStr = NULL;
709,918✔
529
}
709,918✔
530

531
SWalReader* tqGetWalReader(STqReader* pReader) {
679,400✔
532
  if (pReader == NULL) {
679,400!
UNCOV
533
    return NULL;
×
534
  }
535
  return pReader->pWalReader;
679,400✔
536
}
537

538
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
575,950✔
539
  if (pReader == NULL) {
575,950!
UNCOV
540
    return NULL;
×
541
  }
542
  return pReader->pResBlock;
575,950✔
543
}
544

545
int64_t tqGetResultBlockTime(STqReader* pReader) {
575,940✔
546
  if (pReader == NULL) {
575,940!
UNCOV
547
    return 0;
×
548
  }
549
  return pReader->lastTs;
575,940✔
550
}
551

552
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
937,414✔
553
  int32_t code = false;
937,414✔
554
  int32_t lino = 0;
937,414✔
555
  int64_t uid = 0;
937,414✔
556
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
937,414!
557
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
937,414!
558
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);
937,414!
559

560
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
937,414✔
561
  while (pReader->nextBlk < blockSz) {
1,087,274✔
562
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
580,374✔
563
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
580,370!
564
    uid = pSubmitTbData->uid;
580,370✔
565
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
580,370✔
566
    TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
580,421✔
567

568
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
149,821✔
569
    pReader->nextBlk++;
149,837✔
570
  }
571

572
  tqReaderClearSubmitMsg(pReader);
506,900✔
573
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
506,951✔
574

575
END:
500,521✔
576
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
937,551✔
577
  return code;
937,524✔
578
}
579

580
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
166,554✔
581
  int32_t code = false;
166,554✔
582
  int32_t lino = 0;
166,554✔
583
  int64_t uid = 0;
166,554✔
584

585
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
166,554!
586
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
166,554!
587
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);
166,554!
588

589
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
166,554✔
590
  while (pReader->nextBlk < blockSz) {
166,538✔
591
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
83,389✔
592
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
83,387!
593
    uid = pSubmitTbData->uid;
83,387✔
594
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
83,387✔
595
    TSDB_CHECK_NULL(ret, code, lino, END, true);
83,388!
596
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
×
UNCOV
597
    pReader->nextBlk++;
×
598
  }
599
  tqReaderClearSubmitMsg(pReader);
83,149✔
600
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
83,314!
601

602
END:
83,314✔
603
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
166,702!
604
  return code;
166,565✔
605
}
606

607
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) {
115,747✔
608
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
115,747!
UNCOV
609
    return TSDB_CODE_INVALID_PARA;
×
610
  }
611
  int32_t code = 0;
115,756✔
612

613
  int32_t cnt = 0;
115,756✔
614
  for (int32_t i = 0; i < pSrc->nCols; i++) {
684,852✔
615
    cnt += mask[i];
569,096✔
616
  }
617

618
  pDst->nCols = cnt;
115,756✔
619
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
115,756!
620
  if (pDst->pSchema == NULL) {
115,742!
UNCOV
621
    return TAOS_GET_TERRNO(terrno);
×
622
  }
623

624
  int32_t j = 0;
115,742✔
625
  for (int32_t i = 0; i < pSrc->nCols; i++) {
684,218✔
626
    if (mask[i]) {
568,525✔
627
      pDst->pSchema[j++] = pSrc->pSchema[i];
568,510✔
628
      SColumnInfoData colInfo =
629
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
568,510✔
630
      if (extSrc != NULL) {
568,802✔
631
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
239✔
632
      }
633
      code = blockDataAppendColInfo(pBlock, &colInfo);
568,802✔
634
      if (code != 0) {
568,461!
UNCOV
635
        return code;
×
636
      }
637
    }
638
  }
639
  return 0;
115,693✔
640
}
641

642
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
1,904✔
643
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
1,904!
UNCOV
644
    return TSDB_CODE_INVALID_PARA;
×
645
  }
646
  SSDataBlock* pBlock = pReader->pResBlock;
1,904✔
647
  if (blockDataGetNumOfCols(pBlock) > 0) {
1,904✔
648
    blockDataDestroy(pBlock);
2✔
649
    int32_t code = createDataBlock(&pReader->pResBlock);
2✔
650
    if (code) {
2!
NEW
651
      return code;
×
652
    }
653
    pBlock = pReader->pResBlock;
2✔
654

655
    pBlock->info.id.uid = pReader->cachedSchemaUid;
2✔
656
    pBlock->info.version = pReader->msg.ver;
2✔
657
  }
658

659
  int32_t numOfCols = taosArrayGetSize(pColIdList);
1,905✔
660

661
  if (numOfCols == 0) {  // all columns are required
1,906!
662
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
663
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
664
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
665

UNCOV
666
      if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
×
UNCOV
667
        decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
668
      }
UNCOV
669
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
UNCOV
670
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
671
        blockDataFreeRes(pBlock);
×
UNCOV
672
        return terrno;
×
673
      }
674
    }
675
  } else {
676
    if (numOfCols > pSchema->nCols) {
1,906✔
677
      numOfCols = pSchema->nCols;
2✔
678
    }
679

680
    int32_t i = 0;
1,906✔
681
    int32_t j = 0;
1,906✔
682
    while (i < pSchema->nCols && j < numOfCols) {
15,071✔
683
      SSchema* pColSchema = &pSchema->pSchema[i];
13,159✔
684
      col_id_t colIdSchema = pColSchema->colId;
13,159✔
685

686
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
13,159✔
687
      if (pColIdNeed == NULL) {
13,169!
UNCOV
688
        break;
×
689
      }
690
      if (colIdSchema < *pColIdNeed) {
13,169✔
691
        i++;
1,667✔
692
      } else if (colIdSchema > *pColIdNeed) {
11,502!
UNCOV
693
        j++;
×
694
      } else {
695
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
11,502✔
696
        if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
11,527!
697
          decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
12✔
698
        }
699
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
11,527✔
700
        if (code != TSDB_CODE_SUCCESS) {
11,498!
UNCOV
701
          return -1;
×
702
        }
703
        i++;
11,498✔
704
        j++;
11,498✔
705
      }
706
    }
707
  }
708

709
  return TSDB_CODE_SUCCESS;
1,912✔
710
}
711

712
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
190,631,130✔
713
  int32_t code = TSDB_CODE_SUCCESS;
190,631,130✔
714

715
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
199,009,783!
716
    char val[65535 + 2] = {0};
7,687,365✔
717
    if (COL_VAL_IS_VALUE(pColVal)) {
7,687,365!
718
      if (pColVal->value.pData != NULL) {
8,426,997!
719
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
8,431,923✔
720
      }
721
      varDataSetLen(val, pColVal->value.nData);
8,426,997✔
722
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
8,426,997✔
723
    } else {
UNCOV
724
      colDataSetNULL(pColumnInfoData, rowIndex);
×
725
    }
726
  } else {
727
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type), !COL_VAL_IS_VALUE(pColVal));
182,943,765!
728
  }
729

730
  return code;
190,794,589✔
731
}
732

733
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
879,919✔
734
  if (pReader == NULL || pRes == NULL) {
879,919!
UNCOV
735
    return TSDB_CODE_INVALID_PARA;
×
736
  }
737
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
879,988✔
738
  int32_t        code = 0;
880,014✔
739
  int32_t        line = 0;
880,014✔
740
  STSchema*      pTSchema = NULL;
880,014✔
741
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
880,014✔
742
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
880,017!
743
  SSDataBlock* pBlock = pReader->pResBlock;
880,017✔
744
  *pRes = pBlock;
880,017✔
745

746
  blockDataCleanup(pBlock);
880,017✔
747

748
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
879,967✔
749
  int32_t sversion = pSubmitTbData->sver;
879,967✔
750
  int64_t suid = pSubmitTbData->suid;
879,967✔
751
  int64_t uid = pSubmitTbData->uid;
879,967✔
752
  pReader->lastTs = pSubmitTbData->ctimeMs;
879,967✔
753

754
  pBlock->info.id.uid = uid;
879,967✔
755
  pBlock->info.version = pReader->msg.ver;
879,967✔
756

757
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
879,967✔
758
      (pReader->cachedSchemaVer != sversion)) {
876,241!
759
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
3,725✔
760
    taosMemoryFree(pReader->extSchema);
3,730!
761
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
3,731✔
762
    if (pReader->pSchemaWrapper == NULL) {
3,730✔
763
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
1,827✔
764
             "version %d, possibly dropped table",
765
             vgId, suid, uid, pReader->cachedSchemaVer);
766
      pReader->cachedSchemaSuid = 0;
1,825✔
767
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
1,825✔
768
    }
769

770
    pReader->cachedSchemaUid = uid;
1,903✔
771
    pReader->cachedSchemaSuid = suid;
1,903✔
772
    pReader->cachedSchemaVer = sversion;
1,903✔
773

774
    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
1,903!
UNCOV
775
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
×
776
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
UNCOV
777
      return TSDB_CODE_TQ_INTERNAL_ERROR;
×
778
    }
779
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
1,903✔
780
    TSDB_CHECK_CODE(code, line, END);
1,906!
781
    pBlock = pReader->pResBlock;
1,906✔
782
    *pRes = pBlock;
1,906✔
783
  }
784

785
  int32_t numOfRows = 0;
878,148✔
786
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
878,148✔
787
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
4✔
788
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
4!
789
    numOfRows = pCol->nVal;
4✔
790
  } else {
791
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
878,144✔
792
  }
793

794
  code = blockDataEnsureCapacity(pBlock, numOfRows);
878,149✔
795
  TSDB_CHECK_CODE(code, line, END);
878,150!
796
  pBlock->info.rows = numOfRows;
878,150✔
797
  int32_t colActual = blockDataGetNumOfCols(pBlock);
878,150✔
798

799
  // convert and scan one block
800
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
878,150✔
801
    SArray* pCols = pSubmitTbData->aCol;
4✔
802
    int32_t numOfCols = taosArrayGetSize(pCols);
4✔
803
    int32_t targetIdx = 0;
4✔
804
    int32_t sourceIdx = 0;
4✔
805
    while (targetIdx < colActual) {
18✔
806
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
14✔
807
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
14!
808
      if (sourceIdx >= numOfCols) {
14✔
809
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
4!
810
        colDataSetNNULL(pColData, 0, numOfRows);
4!
811
        targetIdx++;
4✔
812
        continue;
4✔
813
      }
814

815
      SColData* pCol = taosArrayGet(pCols, sourceIdx);
10✔
816
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
10!
817
      SColVal colVal = {0};
10✔
818
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
10!
819
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
820
      if (pCol->cid < pColData->info.colId) {
10✔
821
        sourceIdx++;
4✔
822
      } else if (pCol->cid == pColData->info.colId) {
6✔
823
        for (int32_t i = 0; i < pCol->nVal; i++) {
12✔
824
          code = tColDataGetValue(pCol, i, &colVal);
8✔
825
          TSDB_CHECK_CODE(code, line, END);
8!
826
          code = doSetVal(pColData, i, &colVal);
8✔
827
          TSDB_CHECK_CODE(code, line, END);
8!
828
        }
829
        sourceIdx++;
4✔
830
        targetIdx++;
4✔
831
      } else {
832
        colDataSetNNULL(pColData, 0, numOfRows);
2!
833
        targetIdx++;
2✔
834
      }
835
    }
836
  } else {
837
    SArray*         pRows = pSubmitTbData->aRowP;
878,146✔
838
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
878,146✔
839
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
878,146✔
840
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);
878,173!
841

842
    for (int32_t i = 0; i < numOfRows; i++) {
65,143,867✔
843
      SRow* pRow = taosArrayGetP(pRows, i);
64,262,328✔
844
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
64,216,578✔
845
      int32_t sourceIdx = 0;
64,149,343✔
846
      for (int32_t j = 0; j < colActual; j++) {
237,764,583✔
847
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
173,498,895✔
848
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);
173,114,208!
849

850
        while (1) {
19,966,962✔
851
          SColVal colVal = {0};
193,081,170✔
852
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
193,081,170✔
853
          TSDB_CHECK_CODE(code, line, END);
193,061,585!
854

855
          if (colVal.cid < pColData->info.colId) {
193,061,585✔
856
            sourceIdx++;
19,966,962✔
857
            continue;
19,966,962✔
858
          } else if (colVal.cid == pColData->info.colId) {
173,094,623!
859
            code = doSetVal(pColData, i, &colVal);
173,833,288✔
860
            TSDB_CHECK_CODE(code, line, END);
174,353,905!
861
            sourceIdx++;
174,353,905✔
862
            break;
173,615,240✔
863
          } else {
864
            colDataSetNULL(pColData, i);
×
UNCOV
865
            break;
×
866
          }
867
        }
868
      }
869
    }
870
  }
871

872
END:
881,539✔
873
  if (code != 0) {
881,543!
UNCOV
874
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
×
875
  }
876
  taosMemoryFreeClear(pTSchema);
878,151!
877
  return code;
878,199✔
878
}
879

880
#define PROCESS_VAL                                      \
881
  if (curRow == 0) {                                     \
882
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
883
    buildNew = true;                                     \
884
  } else {                                               \
885
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
886
    if (currentRowAssigned != assigned[j]) {             \
887
      assigned[j] = currentRowAssigned;                  \
888
      buildNew = true;                                   \
889
    }                                                    \
890
  }
891

892
#define SET_DATA                                                     \
893
  if (colVal.cid < pColData->info.colId) {                           \
894
    sourceIdx++;                                                     \
895
  } else if (colVal.cid == pColData->info.colId) {                   \
896
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
897
    sourceIdx++;                                                     \
898
    targetIdx++;                                                     \
899
  }
900

901
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
115,711✔
902
                               char* assigned, int32_t numOfRows, int32_t curRow,
903
                               int32_t* lastRow) {
904
  int32_t         code = 0;
115,711✔
905
  SSchemaWrapper* pSW = NULL;
115,711✔
906
  SSDataBlock*    block = NULL;
115,711✔
907
  if (taosArrayGetSize(blocks) > 0) {
115,711!
UNCOV
908
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
UNCOV
909
    TQ_NULL_GO_TO_END(pLastBlock);
×
UNCOV
910
    pLastBlock->info.rows = curRow - *lastRow;
×
UNCOV
911
    *lastRow = curRow;
×
912
  }
913

914
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
115,721!
915
  TQ_NULL_GO_TO_END(block);
115,749!
916

917
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
115,749!
918
  TQ_NULL_GO_TO_END(pSW);
115,755!
919

920
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
115,755!
921
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
115,734!
922
          (int32_t)taosArrayGetSize(block->pDataBlock));
923

924
  block->info.id.uid = pSubmitTbData->uid;
115,734✔
925
  block->info.version = pReader->msg.ver;
115,734✔
926
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
115,734!
927
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
115,741!
928
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
115,736!
929
  pSW = NULL;
115,736✔
930

931
  taosMemoryFreeClear(block);
115,736!
932

UNCOV
933
END:
×
934
  if (code != 0) {
115,750!
UNCOV
935
    tqError("processBuildNew failed, code:%d", code);
×
936
  }
937
  tDeleteSchemaWrapper(pSW);
115,750!
938
  blockDataFreeRes(block);
115,741✔
939
  taosMemoryFree(block);
115,741!
940
  return code;
115,743✔
941
}
942
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
114✔
943
  int32_t code = 0;
114✔
944
  int32_t curRow = 0;
114✔
945
  int32_t lastRow = 0;
114✔
946

947
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
114✔
948
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
114!
949
  TQ_NULL_GO_TO_END(assigned);
114!
950

951
  SArray*   pCols = pSubmitTbData->aCol;
114✔
952
  SColData* pCol = taosArrayGet(pCols, 0);
114✔
953
  TQ_NULL_GO_TO_END(pCol);
114!
954
  int32_t numOfRows = pCol->nVal;
114✔
955
  int32_t numOfCols = taosArrayGetSize(pCols);
114✔
956
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
114!
957
          numOfRows);
958
  for (int32_t i = 0; i < numOfRows; i++) {
319✔
959
    bool buildNew = false;
204✔
960

961
    for (int32_t j = 0; j < numOfCols; j++) {
1,043✔
962
      pCol = taosArrayGet(pCols, j);
839✔
963
      TQ_NULL_GO_TO_END(pCol);
839!
964
      SColVal colVal = {0};
839✔
965
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
839!
966
      PROCESS_VAL
839!
967
    }
968

969
    if (buildNew) {
204✔
970
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows,
114!
971
                                       curRow, &lastRow));
972
    }
973

974
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
204✔
975
    TQ_NULL_GO_TO_END(pBlock);
204!
976

977
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
204!
978
            (int32_t)taosArrayGetSize(blocks));
979

980
    int32_t targetIdx = 0;
204✔
981
    int32_t sourceIdx = 0;
204✔
982
    int32_t colActual = blockDataGetNumOfCols(pBlock);
204✔
983
    while (targetIdx < colActual) {
1,037✔
984
      pCol = taosArrayGet(pCols, sourceIdx);
832✔
985
      TQ_NULL_GO_TO_END(pCol);
832!
986
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
832✔
987
      TQ_NULL_GO_TO_END(pColData);
832!
988
      SColVal colVal = {0};
832✔
989
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
832!
990
      SET_DATA
834!
991
    }
992

993
    curRow++;
205✔
994
  }
995
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
115✔
996
  pLastBlock->info.rows = curRow - lastRow;
114✔
997
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
114!
998
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
999
END:
114✔
1000
  if (code != TSDB_CODE_SUCCESS) {
114!
UNCOV
1001
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1002
  }
1003
  taosMemoryFree(assigned);
114!
1004
  return code;
114✔
1005
}
1006

1007
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
115,638✔
1008
  int32_t   code = 0;
115,638✔
1009
  STSchema* pTSchema = NULL;
115,638✔
1010

1011
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
115,638✔
1012
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
115,638!
1013
  TQ_NULL_GO_TO_END(assigned);
115,650!
1014

1015
  int32_t curRow = 0;
115,650✔
1016
  int32_t lastRow = 0;
115,650✔
1017
  SArray* pRows = pSubmitTbData->aRowP;
115,650✔
1018
  int32_t numOfRows = taosArrayGetSize(pRows);
115,650✔
1019
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
115,644✔
1020
  TQ_NULL_GO_TO_END(pTSchema);
115,639!
1021
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
115,639!
1022

1023
  for (int32_t i = 0; i < numOfRows; i++) {
3,788,663✔
1024
    bool  buildNew = false;
3,631,228✔
1025
    SRow* pRow = taosArrayGetP(pRows, i);
3,631,228✔
1026
    TQ_NULL_GO_TO_END(pRow);
3,624,331!
1027

1028
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
19,692,732✔
1029
      SColVal colVal = {0};
16,010,109✔
1030
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
16,010,109✔
1031
      PROCESS_VAL
16,067,684!
1032
    }
1033

1034
    if (buildNew) {
3,682,623✔
1035
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows,
115,604!
1036
                                       curRow, &lastRow));
1037
    }
1038

1039
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
3,682,648✔
1040
    TQ_NULL_GO_TO_END(pBlock);
3,579,280!
1041

1042
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
3,579,280!
1043
            (int32_t)taosArrayGetSize(blocks));
1044

1045
    int32_t targetIdx = 0;
3,579,280✔
1046
    int32_t sourceIdx = 0;
3,579,280✔
1047
    int32_t colActual = blockDataGetNumOfCols(pBlock);
3,579,280✔
1048
    while (targetIdx < colActual) {
19,905,172✔
1049
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
16,232,141✔
1050
      SColVal          colVal = {0};
16,189,586✔
1051
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
16,189,586!
1052
      SET_DATA
16,107,163!
1053
    }
1054

1055
    curRow++;
3,673,031✔
1056
  }
1057
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
157,435✔
1058
  pLastBlock->info.rows = curRow - lastRow;
115,618✔
1059

1060
  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
115,618!
1061
          (int)taosArrayGetSize(blocks));
1062
END:
115,618✔
1063
  if (code != TSDB_CODE_SUCCESS) {
115,622✔
1064
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
5!
1065
  }
1066
  taosMemoryFreeClear(pTSchema);
115,622!
1067
  taosMemoryFree(assigned);
115,640!
1068
  return code;
115,640✔
1069
}
1070

1071
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
42✔
1072
  int32_t code = 0;
42✔
1073
  int32_t lino = 0;
42✔
1074
  void*   createReq = NULL;
42✔
1075
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
42!
1076
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
42!
1077

1078
  if (pRsp->createTableNum == 0) {
42✔
1079
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
25✔
1080
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
25!
1081
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
25✔
1082
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
25!
1083
  }
1084

1085
  uint32_t len = 0;
42✔
1086
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
42!
1087
  TSDB_CHECK_CODE(code, lino, END);
42!
1088
  createReq = taosMemoryCalloc(1, len);
42!
1089
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
42!
1090

1091
  SEncoder encoder = {0};
42✔
1092
  tEncoderInit(&encoder, createReq, len);
42✔
1093
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
42✔
1094
  tEncoderClear(&encoder);
42✔
1095
  TSDB_CHECK_CODE(code, lino, END);
42!
1096
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
84!
1097
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
84!
1098
  pRsp->createTableNum++;
42✔
1099
  tqTrace("build create table info msg success");
42!
1100

1101
END:
42✔
1102
  if (code != 0) {
42!
UNCOV
1103
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
×
UNCOV
1104
    taosMemoryFree(createReq);
×
1105
  }
1106
  return code;
42✔
1107
}
1108

1109
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
116,364✔
1110
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1111
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
116,364!
1112
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
116,364✔
1113
  if (pSubmitTbData == NULL) {
116,364!
UNCOV
1114
    return terrno;
×
1115
  }
1116
  pReader->nextBlk++;
116,364✔
1117

1118
  if (pSubmitTbDataRet) {
116,364!
1119
    *pSubmitTbDataRet = pSubmitTbData;
116,364✔
1120
  }
1121

1122
  if (fetchMeta == ONLY_META) {
116,364✔
1123
    if (pSubmitTbData->pCreateTbReq != NULL) {
22✔
1124
      if (pRsp->createTableReq == NULL) {
4✔
1125
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
1✔
1126
        if (pRsp->createTableReq == NULL) {
1!
UNCOV
1127
          return terrno;
×
1128
        }
1129
      }
1130
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
8!
UNCOV
1131
        return terrno;
×
1132
      }
1133
      pSubmitTbData->pCreateTbReq = NULL;
4✔
1134
    }
1135
    return 0;
22✔
1136
  }
1137

1138
  int32_t sversion = pSubmitTbData->sver;
116,342✔
1139
  int64_t uid = pSubmitTbData->uid;
116,342✔
1140
  pReader->lastBlkUid = uid;
116,342✔
1141

1142
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
116,342✔
1143
  taosMemoryFree(pReader->extSchema);
116,347!
1144
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
116,343✔
1145
  if (pReader->pSchemaWrapper == NULL) {
116,349✔
1146
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
571✔
1147
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1148
    pReader->cachedSchemaSuid = 0;
564✔
1149
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
564✔
1150
  }
1151

1152
  if (pSubmitTbData->pCreateTbReq != NULL) {
115,778✔
1153
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
42✔
1154
    if (code != 0) {
42!
UNCOV
1155
      return code;
×
1156
    }
1157
  } else if (rawList != NULL) {
115,736✔
1158
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
34!
UNCOV
1159
      return terrno;
×
1160
    }
1161
    pReader->pSchemaWrapper = NULL;
17✔
1162
    return 0;
17✔
1163
  }
1164

1165
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
115,761✔
1166
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
114✔
1167
  } else {
1168
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
115,647✔
1169
  }
1170
}
1171

1172
int32_t tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList, const char* id) {
8,477✔
1173
  if (pReader == NULL) {
8,477!
NEW
1174
    return TSDB_CODE_SUCCESS;
×
1175
  }
1176
  pReader->pColIdList = pColIdList;
8,477✔
1177
  return tqCollectPhysicalTables(pReader, id);
8,477✔
1178
}
1179

1180
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
8,558✔
1181
  if (pReader == NULL || tbUidList == NULL) {
8,558!
NEW
1182
    return TSDB_CODE_SUCCESS;
×
1183
  }
1184
  if (pReader->tbIdHash) {
8,559✔
1185
    taosHashClear(pReader->tbIdHash);
17✔
1186
  } else {
1187
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
8,542✔
1188
    if (pReader->tbIdHash == NULL) {
8,541!
UNCOV
1189
      tqError("s-task:%s failed to init hash table", id);
×
NEW
1190
      return terrno;
×
1191
    }
1192
  }
1193

1194
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
195,763✔
1195
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
187,176✔
1196
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
187,156✔
1197
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
72!
1198
      continue;
×
1199
    }
1200
  }
1201

1202
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
8,554✔
1203
  return tqCollectPhysicalTables(pReader, id);
8,554✔
1204
}
1205

1206
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
21,553✔
1207
  if (pReader == NULL || pTableUidList == NULL) {
21,553!
UNCOV
1208
    return;
×
1209
  }
1210
  if (pReader->tbIdHash == NULL) {
21,554!
1211
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
UNCOV
1212
    if (pReader->tbIdHash == NULL) {
×
UNCOV
1213
      tqError("failed to init hash table");
×
UNCOV
1214
      return;
×
1215
    }
1216
  }
1217

1218
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
21,554✔
1219
  for (int i = 0; i < numOfTables; i++) {
22,564✔
1220
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
1,011✔
1221
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
1,011!
UNCOV
1222
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
UNCOV
1223
      continue;
×
1224
    }
1225
  }
1226
}
1227

1228
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
2,431✔
1229
  if (pReader == NULL) {
2,431!
UNCOV
1230
    return false;
×
1231
  }
1232
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
2,431✔
1233
}
1234

1235
bool tqCurrentBlockConsumed(const STqReader* pReader) {
863,666✔
1236
  if (pReader == NULL) {
863,666!
UNCOV
1237
    return false;
×
1238
  }
1239
  return pReader->msg.msgStr == NULL;
863,666✔
1240
}
1241

1242
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
760✔
1243
  if (pReader == NULL || tbUidList == NULL) {
760!
1244
    return;
×
1245
  }
1246
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
766✔
1247
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
6✔
1248
    if (pKey && taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)) != 0) {
6!
1249
      tqError("failed to remove table uid:%" PRId64 " from hash", *pKey);
1!
1250
    }
1251
  }
1252
}
1253

1254
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
109,706✔
1255
  if (pTq == NULL || tbUidList == NULL) {
109,706!
UNCOV
1256
    return TSDB_CODE_INVALID_PARA;
×
1257
  }
1258
  void*   pIter = NULL;
109,708✔
1259
  int32_t vgId = TD_VID(pTq->pVnode);
109,708✔
1260

1261
  // update the table list for each consumer handle
1262
  taosWLockLatch(&pTq->lock);
109,708✔
1263
  while (1) {
4,324✔
1264
    pIter = taosHashIterate(pTq->pHandle, pIter);
114,033✔
1265
    if (pIter == NULL) {
114,032✔
1266
      break;
109,707✔
1267
    }
1268

1269
    STqHandle* pTqHandle = (STqHandle*)pIter;
4,325✔
1270
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
4,325✔
1271
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
1,152✔
1272
      if (code != 0) {
1,152!
UNCOV
1273
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
UNCOV
1274
        continue;
×
1275
      }
1276
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
3,173✔
1277
      if (!isAdd) {
3,155✔
1278
        int32_t sz = taosArrayGetSize(tbUidList);
1,062✔
1279
        for (int32_t i = 0; i < sz; i++) {
1,062!
UNCOV
1280
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
×
UNCOV
1281
          if (tbUid &&
×
1282
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
×
UNCOV
1283
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
1284
            continue;
×
1285
          }
1286
        }
1287
      }
1288
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
18✔
1289
      if (isAdd) {
17!
1290
        SArray* list = NULL;
17✔
1291
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
17✔
1292
                                    &list, pTqHandle->execHandle.task);
1293
        if (ret != TDB_CODE_SUCCESS) {
17!
UNCOV
1294
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1295
                  pTqHandle->consumerId);
UNCOV
1296
          taosArrayDestroy(list);
×
UNCOV
1297
          taosHashCancelIterate(pTq->pHandle, pIter);
×
UNCOV
1298
          taosWUnLockLatch(&pTq->lock);
×
1299

UNCOV
1300
          return ret;
×
1301
        }
1302
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
17✔
1303
        taosArrayDestroy(list);
17✔
1304
      } else {
UNCOV
1305
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1306
      }
1307
    }
1308
  }
1309
  taosWUnLockLatch(&pTq->lock);
109,707✔
1310

1311
  // update the table list handle for each stream scanner/wal reader
1312
  streamMetaWLock(pTq->pStreamMeta);
109,707✔
1313
  while (1) {
42,390✔
1314
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
152,099✔
1315
    if (pIter == NULL) {
152,098✔
1316
      break;
109,708✔
1317
    }
1318

1319
    int64_t      refId = *(int64_t*)pIter;
42,390✔
1320
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
42,390✔
1321
    if (pTask != NULL) {
42,392!
1322
      int32_t taskId = pTask->id.taskId;
42,392✔
1323

1324
      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
42,392✔
1325
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
21,168✔
1326
        if (code != 0) {
21,162✔
1327
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
15!
1328
        }
1329
      }
1330
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
42,386✔
1331
      if (ret) {
42,389!
UNCOV
1332
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
×
1333
      }
1334
    }
1335
  }
1336

1337
  streamMetaWUnLock(pTq->pStreamMeta);
109,708✔
1338
  return 0;
109,707✔
1339
}
1340

NEW
1341
static void destroySourceScanTables(void* ptr) {
×
NEW
1342
  SArray** pTables = ptr;
×
NEW
1343
  if (pTables && *pTables) {
×
NEW
1344
    taosArrayDestroy(*pTables);
×
NEW
1345
    *pTables = NULL;
×
1346
  }
NEW
1347
}
×
1348

NEW
1349
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
NEW
1350
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
NEW
1351
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
NEW
1352
  if (pCol1->vColId == pCol2->vColId) {
×
NEW
1353
    return 0;
×
NEW
1354
  } else if (pCol1->vColId < pCol2->vColId) {
×
NEW
1355
    return -1;
×
1356
  } else {
NEW
1357
    return 1;
×
1358
  }
1359
}
1360

NEW
1361
int32_t tqReaderSetVtableInfo(STqReader* pReader, void* vnode, void* ptr, SSHashObj* pVtableInfos,
×
1362
                              SSDataBlock** ppResBlock, const char* idstr) {
NEW
1363
  int32_t            code = TSDB_CODE_SUCCESS;
×
NEW
1364
  int32_t            lino = 0;
×
NEW
1365
  SStorageAPI*       pAPI = ptr;
×
NEW
1366
  SVTSourceScanInfo* pScanInfo = NULL;
×
NEW
1367
  SHashObj*          pVirtualTables = NULL;
×
NEW
1368
  SMetaReader        metaReader = {0};
×
NEW
1369
  SVTColInfo         colInfo = {0};
×
NEW
1370
  SSchemaWrapper*    schema = NULL;
×
1371

NEW
1372
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
NEW
1373
  TSDB_CHECK_NULL(vnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
NEW
1374
  TSDB_CHECK_NULL(pAPI, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1375

NEW
1376
  pScanInfo = &pReader->vtSourceScanInfo;
×
NEW
1377
  taosHashCleanup(pScanInfo->pVirtualTables);
×
NEW
1378
  pScanInfo->pVirtualTables = NULL;
×
1379

NEW
1380
  if (tSimpleHashGetSize(pVtableInfos) == 0) {
×
NEW
1381
    goto _end;
×
1382
  }
1383

NEW
1384
  pVirtualTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
NEW
1385
  TSDB_CHECK_NULL(pVirtualTables, code, lino, _end, terrno);
×
NEW
1386
  taosHashSetFreeFp(pVirtualTables, destroySourceScanTables);
×
1387

NEW
1388
  int32_t iter = 0;
×
NEW
1389
  void*   px = tSimpleHashIterate(pVtableInfos, NULL, &iter);
×
NEW
1390
  while (px != NULL) {
×
NEW
1391
    int64_t vTbUid = *(int64_t*)tSimpleHashGetKey(px, NULL);
×
NEW
1392
    SArray* pColInfos = taosArrayInit(8, sizeof(SVTColInfo));
×
NEW
1393
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, terrno);
×
NEW
1394
    code = taosHashPut(pVirtualTables, &vTbUid, sizeof(int64_t), &pColInfos, POINTER_BYTES);
×
NEW
1395
    TSDB_CHECK_CODE(code, lino, _end);
×
1396

NEW
1397
    SSHashObj* pPhysicalTables = *(SSHashObj**)px;
×
NEW
1398
    int32_t    iterIn = 0;
×
NEW
1399
    void*      pxIn = tSimpleHashIterate(pPhysicalTables, NULL, &iterIn);
×
NEW
1400
    while (pxIn != NULL) {
×
NEW
1401
      char* physicalTableName = tSimpleHashGetKey(pxIn, NULL);
×
NEW
1402
      pAPI->metaReaderFn.clearReader(&metaReader);
×
NEW
1403
      pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
NEW
1404
      code = pAPI->metaReaderFn.getTableEntryByName(&metaReader, physicalTableName);
×
NEW
1405
      TSDB_CHECK_CODE(code, lino, _end);
×
NEW
1406
      pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
NEW
1407
      colInfo.pTbUid = metaReader.me.uid;
×
1408

NEW
1409
      switch (metaReader.me.type) {
×
NEW
1410
        case TSDB_CHILD_TABLE: {
×
NEW
1411
          int64_t suid = metaReader.me.ctbEntry.suid;
×
NEW
1412
          pAPI->metaReaderFn.clearReader(&metaReader);
×
NEW
1413
          pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
NEW
1414
          code = pAPI->metaReaderFn.getTableEntryByUid(&metaReader, suid);
×
NEW
1415
          TSDB_CHECK_CODE(code, lino, _end);
×
NEW
1416
          pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
NEW
1417
          schema = &metaReader.me.stbEntry.schemaRow;
×
NEW
1418
          break;
×
1419
        }
NEW
1420
        case TSDB_NORMAL_TABLE: {
×
NEW
1421
          schema = &metaReader.me.ntbEntry.schemaRow;
×
NEW
1422
          break;
×
1423
        }
NEW
1424
        default: {
×
NEW
1425
          tqError("invalid table type: %d", metaReader.me.type);
×
NEW
1426
          code = TSDB_CODE_INVALID_PARA;
×
NEW
1427
          TSDB_CHECK_CODE(code, lino, _end);
×
1428
        }
1429
      }
1430

NEW
1431
      SArray* pCols = *(SArray**)pxIn;
×
NEW
1432
      int32_t ncols = taosArrayGetSize(pCols);
×
NEW
1433
      for (int32_t i = 0; i < ncols; ++i) {
×
NEW
1434
        SColIdName* pCol = taosArrayGet(pCols, i);
×
NEW
1435
        colInfo.vColId = pCol->colId;
×
1436

NEW
1437
        for (int32_t j = 0; j < schema->nCols; ++j) {
×
NEW
1438
          if (strncmp(pCol->colName, schema->pSchema[j].name, strlen(schema->pSchema[j].name)) == 0) {
×
NEW
1439
            colInfo.pColId = schema->pSchema[j].colId;
×
NEW
1440
            void* px = taosArrayPush(pColInfos, &colInfo);
×
NEW
1441
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
NEW
1442
            break;
×
1443
          }
1444
        }
1445
      }
1446

NEW
1447
      taosArraySort(pColInfos, compareSVTColInfo);
×
NEW
1448
      pxIn = tSimpleHashIterate(pPhysicalTables, pxIn, &iterIn);
×
1449
    }
1450

NEW
1451
    px = tSimpleHashIterate(pVtableInfos, px, &iter);
×
1452
  }
1453

NEW
1454
  pScanInfo->pVirtualTables = pVirtualTables;
×
NEW
1455
  pVirtualTables = NULL;
×
1456

1457
  // set the result data block
NEW
1458
  if (pReader->pResBlock) {
×
NEW
1459
    blockDataDestroy(pReader->pResBlock);
×
1460
  }
NEW
1461
  pReader->pResBlock = *ppResBlock;
×
NEW
1462
  *ppResBlock = NULL;
×
1463

1464
  // update reader callback for vtable source scan
NEW
1465
  pAPI->tqReaderFn.tqRetrieveBlock = tqRetrieveVTableDataBlock;
×
NEW
1466
  pAPI->tqReaderFn.tqNextBlockImpl = tqNextVTableSourceBlockImpl;
×
NEW
1467
  pAPI->tqReaderFn.tqReaderIsQueriedTable = tqReaderIsQueriedSourceTable;
×
1468

NEW
1469
_end:
×
NEW
1470
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1471
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1472
  }
NEW
1473
  pAPI->metaReaderFn.clearReader(&metaReader);
×
NEW
1474
  if (pVirtualTables != NULL) {
×
NEW
1475
    taosHashCleanup(pVirtualTables);
×
1476
  }
NEW
1477
  return code;
×
1478
}
1479

1480
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
17,036✔
1481
  int32_t            code = TSDB_CODE_SUCCESS;
17,036✔
1482
  int32_t            lino = 0;
17,036✔
1483
  SVTSourceScanInfo* pScanInfo = NULL;
17,036✔
1484
  SHashObj*          pVirtualTables = NULL;
17,036✔
1485
  SHashObj*          pPhysicalTables = NULL;
17,036✔
1486
  void*              pIter = NULL;
17,036✔
1487
  void*              px = NULL;
17,036✔
1488

1489
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
17,036!
1490

1491
  pScanInfo = &pReader->vtSourceScanInfo;
17,036✔
1492
  taosHashCleanup(pScanInfo->pPhysicalTables);
17,036✔
1493
  pScanInfo->pPhysicalTables = NULL;
17,032✔
1494
  taosLRUCacheCleanup(pScanInfo->pPhyTblSchemaCache);
17,032✔
1495
  pScanInfo->pPhyTblSchemaCache = NULL;
17,032✔
1496
  pScanInfo->nextVirtualTableIdx = -1;
17,032✔
1497
  pScanInfo->metaFetch = 0;
17,032✔
1498
  pScanInfo->cacheHit = 0;
17,032✔
1499

1500
  pVirtualTables = pScanInfo->pVirtualTables;
17,032✔
1501
  if (taosHashGetSize(pVirtualTables) == 0 || taosHashGetSize(pReader->tbIdHash) == 0 ||
17,032!
NEW
1502
      taosArrayGetSize(pReader->pColIdList) == 0) {
×
1503
    goto _end;
17,033✔
1504
  }
1505

NEW
1506
  pPhysicalTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
NEW
1507
  TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno);
×
NEW
1508
  taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables);
×
1509

NEW
1510
  pIter = taosHashIterate(pReader->tbIdHash, NULL);
×
NEW
1511
  while (pIter != NULL) {
×
NEW
1512
    int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL);
×
1513

NEW
1514
    px = taosHashGet(pVirtualTables, &vTbUid, sizeof(int64_t));
×
NEW
1515
    TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
NEW
1516
    SArray* pColInfos = *(SArray**)px;
×
NEW
1517
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1518

1519
    // Traverse all required columns and collect corresponding physical tables
NEW
1520
    int32_t nColInfos = taosArrayGetSize(pColInfos);
×
NEW
1521
    int32_t nOutputCols = taosArrayGetSize(pReader->pColIdList);
×
NEW
1522
    for (int32_t i = 0, j = 0; i < nColInfos && j < nOutputCols;) {
×
NEW
1523
      SVTColInfo* pCol = taosArrayGet(pColInfos, i);
×
NEW
1524
      col_id_t    colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
×
NEW
1525
      if (pCol->vColId < colIdNeed) {
×
NEW
1526
        i++;
×
NEW
1527
      } else if (pCol->vColId > colIdNeed) {
×
NEW
1528
        j++;
×
1529
      } else {
NEW
1530
        SArray* pRelatedVTs = NULL;
×
NEW
1531
        px = taosHashGet(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t));
×
NEW
1532
        if (px == NULL) {
×
NEW
1533
          pRelatedVTs = taosArrayInit(8, sizeof(int64_t));
×
NEW
1534
          TSDB_CHECK_NULL(pRelatedVTs, code, lino, _end, terrno);
×
NEW
1535
          code = taosHashPut(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t), &pRelatedVTs, POINTER_BYTES);
×
NEW
1536
          if (code != TSDB_CODE_SUCCESS) {
×
NEW
1537
            taosArrayDestroy(pRelatedVTs);
×
NEW
1538
            TSDB_CHECK_CODE(code, lino, _end);
×
1539
          }
1540
        } else {
NEW
1541
          pRelatedVTs = *(SArray**)px;
×
1542
        }
NEW
1543
        if (taosArrayGetSize(pRelatedVTs) == 0 || *(int64_t*)taosArrayGetLast(pRelatedVTs) != vTbUid) {
×
NEW
1544
          px = taosArrayPush(pRelatedVTs, &vTbUid);
×
NEW
1545
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1546
        }
NEW
1547
        i++;
×
NEW
1548
        j++;
×
1549
      }
1550
    }
NEW
1551
    pIter = taosHashIterate(pReader->tbIdHash, pIter);
×
1552
  }
1553

NEW
1554
  pScanInfo->pPhysicalTables = pPhysicalTables;
×
NEW
1555
  pPhysicalTables = NULL;
×
1556

NEW
1557
  if (taosHashGetSize(pScanInfo->pPhysicalTables) > 0) {
×
NEW
1558
    pScanInfo->pPhyTblSchemaCache = taosLRUCacheInit(1024 * 128, -1, .5);
×
NEW
1559
    TSDB_CHECK_NULL(pScanInfo->pPhyTblSchemaCache, code, lino, _end, terrno);
×
1560
  }
1561

NEW
1562
_end:
×
1563
  if (code != TSDB_CODE_SUCCESS) {
17,033!
NEW
1564
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1565
  }
1566
  if (pIter != NULL) {
17,036!
NEW
1567
    taosHashCancelIterate(pReader->tbIdHash, pIter);
×
1568
  }
1569
  if (pPhysicalTables != NULL) {
17,036!
NEW
1570
    taosHashCleanup(pPhysicalTables);
×
1571
  }
1572
  return code;
17,034✔
1573
}
1574

NEW
1575
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
NEW
1576
  if (value) {
×
NEW
1577
    SSchemaWrapper** ppSchemaWrapper = value;
×
NEW
1578
    tDeleteSchemaWrapper(*ppSchemaWrapper);
×
NEW
1579
    *ppSchemaWrapper = NULL;
×
1580
  }
NEW
1581
}
×
1582

NEW
1583
int32_t tqRetrieveVTableDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* idstr) {
×
NEW
1584
  int32_t            code = TSDB_CODE_SUCCESS;
×
NEW
1585
  int32_t            lino = 0;
×
NEW
1586
  SVTSourceScanInfo* pScanInfo = NULL;
×
NEW
1587
  SSubmitTbData*     pSubmitTbData = NULL;
×
NEW
1588
  SSDataBlock*       pBlock = NULL;
×
NEW
1589
  void*              px = NULL;
×
NEW
1590
  int64_t            vTbUid = 0;
×
NEW
1591
  int64_t            pTbUid = 0;
×
NEW
1592
  LRUHandle*         h = NULL;
×
NEW
1593
  STSchema*          pPhyTblSchema = NULL;
×
1594

NEW
1595
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
NEW
1596
  TSDB_CHECK_NULL(pRes, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1597

NEW
1598
  pScanInfo = &pReader->vtSourceScanInfo;
×
NEW
1599
  tqDebug("tq reader retrieve vtable data block from %p, nextBlk:%d, vtbIdx:%d, id:%s", pReader->msg.msgStr,
×
1600
          pReader->nextBlk, pScanInfo->nextVirtualTableIdx, idstr);
1601

NEW
1602
  *pRes = NULL;
×
NEW
1603
  pBlock = pReader->pResBlock;
×
NEW
1604
  blockDataCleanup(pBlock);
×
1605

NEW
1606
  pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
NEW
1607
  TSDB_CHECK_NULL(pSubmitTbData, code, lino, _end, terrno);
×
1608

NEW
1609
  pReader->lastTs = pSubmitTbData->ctimeMs;
×
1610

NEW
1611
  pTbUid = pSubmitTbData->uid;
×
NEW
1612
  px = taosHashGet(pScanInfo->pPhysicalTables, &pTbUid, sizeof(int64_t));
×
NEW
1613
  TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
NEW
1614
  SArray* pRelatedVTs = *(SArray**)px;
×
NEW
1615
  vTbUid = *(int64_t*)taosArrayGet(pRelatedVTs, pScanInfo->nextVirtualTableIdx);
×
NEW
1616
  px = taosHashGet(pScanInfo->pVirtualTables, &vTbUid, sizeof(int64_t));
×
NEW
1617
  TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
NEW
1618
  SArray* pColInfos = *(SArray**)px;
×
NEW
1619
  TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1620

NEW
1621
  int32_t nColInfos = taosArrayGetSize(pColInfos);
×
NEW
1622
  int32_t nOutputCols = taosArrayGetSize(pBlock->pDataBlock);
×
1623

NEW
1624
  int32_t numOfRows = 0;
×
NEW
1625
  int32_t nInputCols = 0;
×
NEW
1626
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
NEW
1627
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
×
NEW
1628
    TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
×
NEW
1629
    numOfRows = pCol->nVal;
×
NEW
1630
    nInputCols = taosArrayGetSize(pSubmitTbData->aCol);
×
1631
  } else {
1632
    // try to get physical table schema from cache
NEW
1633
    pScanInfo->metaFetch++;
×
NEW
1634
    int64_t         cacheKey = (pSubmitTbData->suid == 0) ? pTbUid : pSubmitTbData->suid;
×
NEW
1635
    SSchemaWrapper* pWrapper = NULL;
×
NEW
1636
    h = taosLRUCacheLookup(pScanInfo->pPhyTblSchemaCache, &cacheKey, sizeof(int64_t));
×
NEW
1637
    if (h != NULL) {
×
NEW
1638
      pWrapper = taosLRUCacheValue(pScanInfo->pPhyTblSchemaCache, h);
×
NEW
1639
      TSDB_CHECK_NULL(pWrapper, code, lino, _end, terrno);
×
1640
    }
1641

NEW
1642
    if (pWrapper != NULL && pWrapper->version != pSubmitTbData->sver) {
×
1643
      // reset outdated schema
1644
      tDeleteSchemaWrapper(pWrapper);
NEW
1645
      pWrapper = NULL;
×
NEW
1646
      taosLRUCacheUpdate(pScanInfo->pPhyTblSchemaCache, h, pWrapper);
×
1647
    }
1648

NEW
1649
    if (pWrapper == NULL) {
×
1650
      // get physical table schema from meta
NEW
1651
      pWrapper = metaGetTableSchema(pReader->pVnodeMeta, pTbUid, pSubmitTbData->sver, 1, NULL);
×
NEW
1652
      if (pWrapper == NULL) {
×
NEW
1653
        tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
×
1654
               "version %d, possibly dropped table",
1655
               pReader->pWalReader->pWal->cfg.vgId, pSubmitTbData->suid, pTbUid, pSubmitTbData->sver);
NEW
1656
        TSDB_CHECK_NULL(pWrapper, code, lino, _end, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
×
1657
      }
NEW
1658
      if (h == NULL) {
×
1659
        // insert schema to cache
NEW
1660
        code = taosLRUCacheInsert(pScanInfo->pPhyTblSchemaCache, &cacheKey, sizeof(int64_t), pWrapper, POINTER_BYTES,
×
1661
                                  freeTableSchemaCache, NULL, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
NEW
1662
        if (code != TSDB_CODE_SUCCESS) {
×
1663
          tDeleteSchemaWrapper(pWrapper);
1664
        }
NEW
1665
        TSDB_CHECK_CODE(code, lino, _end);
×
1666
      } else {
1667
        // update schema in cache
NEW
1668
        taosLRUCacheUpdate(pScanInfo->pPhyTblSchemaCache, h, pWrapper);
×
1669
      }
1670
    } else {
NEW
1671
      pScanInfo->cacheHit++;
×
1672
    }
NEW
1673
    TSDB_CHECK_NULL(pWrapper, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
NEW
1674
    pPhyTblSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
×
NEW
1675
    TSDB_CHECK_NULL(pPhyTblSchema, code, lino, _end, terrno);
×
1676

NEW
1677
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
×
NEW
1678
    nInputCols = pPhyTblSchema->numOfCols;
×
1679
  }
1680

NEW
1681
  code = blockDataEnsureCapacity(pBlock, numOfRows);
×
NEW
1682
  TSDB_CHECK_CODE(code, lino, _end);
×
1683

1684
  // convert one block
NEW
1685
  for (int32_t i = 0, j = 1; j < nOutputCols;) {
×
NEW
1686
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, j);
×
NEW
1687
    TSDB_CHECK_NULL(pOutCol, code, lino, _end, terrno);
×
NEW
1688
    if (i >= nColInfos) {
×
NEW
1689
      tqInfo("%s has %d column info, but vtable column %d is missing, id: %s", __func__, nColInfos, pOutCol->info.colId,
×
1690
             idstr);
NEW
1691
      colDataSetNNULL(pOutCol, 0, numOfRows);
×
NEW
1692
      j++;
×
NEW
1693
      continue;
×
1694
    }
1695

NEW
1696
    SVTColInfo* pCol = taosArrayGet(pColInfos, i);
×
NEW
1697
    TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
×
NEW
1698
    if (pCol->vColId < pOutCol->info.colId) {
×
NEW
1699
      i++;
×
NEW
1700
      continue;
×
NEW
1701
    } else if (pCol->vColId > pOutCol->info.colId) {
×
NEW
1702
      tqInfo("%s does not find column info for vtable column %d, closest vtable column is %d, id: %s", __func__,
×
1703
             pOutCol->info.colId, pCol->vColId, idstr);
NEW
1704
      colDataSetNNULL(pOutCol, 0, numOfRows);
×
NEW
1705
      j++;
×
NEW
1706
      continue;
×
1707
    }
1708

1709
    // copy data from physical table to the result block of virtual table
NEW
1710
    if (pCol->pTbUid != pTbUid) {
×
1711
      // skip this column since it is from another physical table
NEW
1712
    } else if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
1713
      // try to find the corresponding column data of physical table
NEW
1714
      SColData* pColData = NULL;
×
NEW
1715
      for (int32_t k = 0; k < nInputCols; ++k) {
×
NEW
1716
        pColData = taosArrayGet(pSubmitTbData->aCol, k);
×
NEW
1717
        TSDB_CHECK_NULL(pColData, code, lino, _end, terrno);
×
NEW
1718
        if (pColData->cid == pCol->pColId) {
×
NEW
1719
          break;
×
1720
        }
NEW
1721
        pColData = NULL;
×
1722
      }
NEW
1723
      if (pColData == NULL) {
×
NEW
1724
        tqError("%s does not find data of physical table %" PRId64 " column %d, virtual table: %" PRId64
×
1725
                " column: %d, id: %s",
1726
                __func__, pTbUid, pCol->pColId, vTbUid, pCol->vColId, idstr);
NEW
1727
        colDataSetNNULL(pOutCol, 0, numOfRows);
×
NEW
1728
        i++;
×
NEW
1729
        j++;
×
NEW
1730
        continue;
×
1731
      }
NEW
1732
      SColVal colVal = {0};
×
NEW
1733
      for (int32_t k = 0; k < pColData->nVal; ++k) {
×
NEW
1734
        code = tColDataGetValue(pColData, k, &colVal);
×
NEW
1735
        TSDB_CHECK_CODE(code, lino, _end);
×
NEW
1736
        code = doSetVal(pOutCol, k, &colVal);
×
NEW
1737
        TSDB_CHECK_CODE(code, lino, _end);
×
1738
      }
1739
    } else {
NEW
1740
      SArray* pRows = pSubmitTbData->aRowP;
×
NEW
1741
      TSDB_CHECK_NULL(pPhyTblSchema, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1742

NEW
1743
      SColVal colVal = {0};
×
NEW
1744
      for (int32_t k = 0; k < numOfRows; ++k) {
×
NEW
1745
        SRow* pRow = taosArrayGetP(pRows, k);
×
NEW
1746
        TSDB_CHECK_NULL(pRow, code, lino, _end, terrno);
×
NEW
1747
        for (int32_t l = 0; l < nInputCols; ++l) {
×
NEW
1748
          code = tRowGet(pRow, pPhyTblSchema, l, &colVal);
×
NEW
1749
          TSDB_CHECK_CODE(code, lino, _end);
×
NEW
1750
          if (colVal.cid == pCol->pColId) {
×
NEW
1751
            code = doSetVal(pOutCol, k, &colVal);
×
NEW
1752
            TSDB_CHECK_CODE(code, lino, _end);
×
NEW
1753
            break;
×
NEW
1754
          } else if (colVal.cid > pCol->pColId || l == (nInputCols - 1)) {
×
NEW
1755
            colDataSetNULL(pOutCol, k);
×
NEW
1756
            break;
×
1757
          }
1758
        }
1759
      }
1760
    }
1761

NEW
1762
    i++;
×
NEW
1763
    j++;
×
1764
  }
1765

1766
  // enforce to fill the first ts column
NEW
1767
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
NEW
1768
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, 0);
×
NEW
1769
    SColData*        pColData = taosArrayGet(pSubmitTbData->aCol, 0);
×
NEW
1770
    TSDB_CHECK_NULL(pColData, code, lino, _end, terrno);
×
NEW
1771
    SColVal colVal = {0};
×
NEW
1772
    for (int32_t k = 0; k < pColData->nVal; ++k) {
×
NEW
1773
      code = tColDataGetValue(pColData, k, &colVal);
×
NEW
1774
      TSDB_CHECK_CODE(code, lino, _end);
×
NEW
1775
      code = doSetVal(pOutCol, k, &colVal);
×
NEW
1776
      TSDB_CHECK_CODE(code, lino, _end);
×
1777
    }
1778
  } else {
NEW
1779
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, 0);
×
NEW
1780
    SArray*          pRows = pSubmitTbData->aRowP;
×
NEW
1781
    TSDB_CHECK_NULL(pPhyTblSchema, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
NEW
1782
    SColVal colVal = {0};
×
NEW
1783
    for (int32_t k = 0; k < numOfRows; ++k) {
×
NEW
1784
      SRow* pRow = taosArrayGetP(pRows, k);
×
NEW
1785
      TSDB_CHECK_NULL(pRow, code, lino, _end, terrno);
×
NEW
1786
      code = tRowGet(pRow, pPhyTblSchema, 0, &colVal);
×
NEW
1787
      TSDB_CHECK_CODE(code, lino, _end);
×
NEW
1788
      code = doSetVal(pOutCol, k, &colVal);
×
NEW
1789
      TSDB_CHECK_CODE(code, lino, _end);
×
1790
    }
1791
  }
1792

NEW
1793
  pBlock->info.rows = numOfRows;
×
NEW
1794
  pBlock->info.id.uid = vTbUid;
×
NEW
1795
  pBlock->info.id.groupId = pTbUid;
×
NEW
1796
  pBlock->info.version = pReader->msg.ver;
×
NEW
1797
  pScanInfo->nextVirtualTableIdx++;
×
NEW
1798
  if (pScanInfo->nextVirtualTableIdx >= taosArrayGetSize(pRelatedVTs)) {
×
NEW
1799
    pReader->nextBlk++;
×
NEW
1800
    pScanInfo->nextVirtualTableIdx = -1;
×
1801
  }
NEW
1802
  tqDebug("tq reader will retrieve next vtable data block from %p, nextBlk:%d, vtbIdx:%d, id:%s", pReader->msg.msgStr,
×
1803
          pReader->nextBlk, pScanInfo->nextVirtualTableIdx, idstr);
1804

NEW
1805
  *pRes = pBlock;
×
1806

NEW
1807
_end:
×
NEW
1808
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1809
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1810
  }
NEW
1811
  if (h != NULL) {
×
NEW
1812
    bool bRes = taosLRUCacheRelease(pScanInfo->pPhyTblSchemaCache, h, false);
×
NEW
1813
    tqTrace("release LRU cache, res %d, id: %s", bRes, idstr);
×
1814
  }
NEW
1815
  if (pPhyTblSchema != NULL) {
×
NEW
1816
    taosMemoryFreeClear(pPhyTblSchema);
×
1817
  }
NEW
1818
  return code;
×
1819
}
1820

NEW
1821
bool tqNextVTableSourceBlockImpl(STqReader* pReader, const char* idstr) {
×
NEW
1822
  int32_t            code = TSDB_CODE_SUCCESS;
×
NEW
1823
  int32_t            lino = 0;
×
NEW
1824
  SVTSourceScanInfo* pScanInfo = NULL;
×
1825

NEW
1826
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1827

NEW
1828
  pScanInfo = &pReader->vtSourceScanInfo;
×
NEW
1829
  if (pReader->msg.msgStr == NULL || taosHashGetSize(pScanInfo->pPhysicalTables) == 0) {
×
NEW
1830
    return false;
×
1831
  }
1832

NEW
1833
  if (pScanInfo->nextVirtualTableIdx >= 0) {
×
1834
    // The data still needs to be converted into the virtual table result block
NEW
1835
    return true;
×
1836
  }
1837

NEW
1838
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
×
NEW
1839
  while (pReader->nextBlk < blockSz) {
×
NEW
1840
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
NEW
1841
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, _end, terrno);
×
NEW
1842
    int64_t pTbUid = pSubmitTbData->uid;
×
NEW
1843
    void*   px = taosHashGet(pScanInfo->pPhysicalTables, &pTbUid, sizeof(int64_t));
×
NEW
1844
    if (px != NULL) {
×
NEW
1845
      SArray* pRelatedVTs = *(SArray**)px;
×
NEW
1846
      if (taosArrayGetSize(pRelatedVTs) > 0) {
×
NEW
1847
        pScanInfo->nextVirtualTableIdx = 0;
×
NEW
1848
        return true;
×
1849
      }
1850
    }
NEW
1851
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz,
×
1852
            pTbUid);
NEW
1853
    pReader->nextBlk++;
×
1854
  }
1855

NEW
1856
  tqReaderClearSubmitMsg(pReader);
×
NEW
1857
  tqTrace("iterator data block end, total block num:%d", blockSz);
×
1858

NEW
1859
_end:
×
NEW
1860
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1861
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1862
  }
NEW
1863
  return (code == TSDB_CODE_SUCCESS);
×
1864
}
1865

NEW
1866
bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) {
×
NEW
1867
  if (pReader == NULL) {
×
NEW
1868
    return false;
×
1869
  }
NEW
1870
  return taosHashGet(pReader->vtSourceScanInfo.pPhysicalTables, &uid, sizeof(uint64_t)) != NULL;
×
1871
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc