• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3825

01 Apr 2025 11:58AM UTC coverage: 34.067% (+0.003%) from 34.064%
#3825

push

travis-ci

happyguoxy
test:alter gcda dir

148492 of 599532 branches covered (24.77%)

Branch coverage included in aggregate %.

222504 of 489471 relevant lines covered (45.46%)

762290.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.58
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr);
20

21
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
×
22
  if (pHandle == NULL || pHead == NULL) {
×
23
    return false;
×
24
  }
25
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
×
26
    return true;
×
27
  }
28

29
  int16_t msgType = pHead->msgType;
×
30
  char*   body = pHead->body;
×
31
  int32_t bodyLen = pHead->bodyLen;
×
32

33
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
×
34
  int64_t  realTbSuid = 0;
×
35
  SDecoder dcoder = {0};
×
36
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
×
37
  int32_t  len = bodyLen - sizeof(SMsgHead);
×
38
  tDecoderInit(&dcoder, data, len);
×
39

40
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
×
41
    SVCreateStbReq req = {0};
×
42
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
×
43
      goto end;
×
44
    }
45
    realTbSuid = req.suid;
×
46
  } else if (msgType == TDMT_VND_DROP_STB) {
×
47
    SVDropStbReq req = {0};
×
48
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
49
      goto end;
×
50
    }
51
    realTbSuid = req.suid;
×
52
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
×
53
    SVCreateTbBatchReq req = {0};
×
54
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
×
55
      goto end;
×
56
    }
57

58
    int32_t        needRebuild = 0;
×
59
    SVCreateTbReq* pCreateReq = NULL;
×
60
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
61
      pCreateReq = req.pReqs + iReq;
×
62
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
63
        needRebuild++;
×
64
      }
65
    }
66
    if (needRebuild == 0) {
×
67
      // do nothing
68
    } else if (needRebuild == req.nReqs) {
×
69
      realTbSuid = tbSuid;
×
70
    } else {
71
      realTbSuid = tbSuid;
×
72
      SVCreateTbBatchReq reqNew = {0};
×
73
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
74
      if (reqNew.pArray == NULL) {
×
75
        tDeleteSVCreateTbBatchReq(&req);
×
76
        goto end;
×
77
      }
78
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
79
        pCreateReq = req.pReqs + iReq;
×
80
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
81
          reqNew.nReqs++;
×
82
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
83
            taosArrayDestroy(reqNew.pArray);
×
84
            tDeleteSVCreateTbBatchReq(&req);
×
85
            goto end;
×
86
          }
87
        }
88
      }
89

90
      int     tlen = 0;
×
91
      int32_t ret = 0;
×
92
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
93
      void* buf = taosMemoryMalloc(tlen);
×
94
      if (NULL == buf) {
×
95
        taosArrayDestroy(reqNew.pArray);
×
96
        tDeleteSVCreateTbBatchReq(&req);
×
97
        goto end;
×
98
      }
99
      SEncoder coderNew = {0};
×
100
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
101
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
102
      tEncoderClear(&coderNew);
×
103
      if (ret < 0) {
×
104
        taosMemoryFree(buf);
×
105
        taosArrayDestroy(reqNew.pArray);
×
106
        tDeleteSVCreateTbBatchReq(&req);
×
107
        goto end;
×
108
      }
109
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
110
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
111
      taosMemoryFree(buf);
×
112
      taosArrayDestroy(reqNew.pArray);
×
113
    }
114

115
    tDeleteSVCreateTbBatchReq(&req);
×
116
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
117
    SVAlterTbReq req = {0};
×
118

119
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
120
      goto end;
×
121
    }
122

123
    SMetaReader mr = {0};
×
124
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
125

126
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
127
      metaReaderClear(&mr);
×
128
      goto end;
×
129
    }
130
    realTbSuid = mr.me.ctbEntry.suid;
×
131
    metaReaderClear(&mr);
×
132
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
133
    SVDropTbBatchReq req = {0};
×
134

135
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
136
      goto end;
×
137
    }
138

139
    int32_t      needRebuild = 0;
×
140
    SVDropTbReq* pDropReq = NULL;
×
141
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
142
      pDropReq = req.pReqs + iReq;
×
143

144
      if (pDropReq->suid == tbSuid) {
×
145
        needRebuild++;
×
146
      }
147
    }
148
    if (needRebuild == 0) {
×
149
      // do nothing
150
    } else if (needRebuild == req.nReqs) {
×
151
      realTbSuid = tbSuid;
×
152
    } else {
153
      realTbSuid = tbSuid;
×
154
      SVDropTbBatchReq reqNew = {0};
×
155
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
156
      if (reqNew.pArray == NULL) {
×
157
        goto end;
×
158
      }
159
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
160
        pDropReq = req.pReqs + iReq;
×
161
        if (pDropReq->suid == tbSuid) {
×
162
          reqNew.nReqs++;
×
163
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
164
            taosArrayDestroy(reqNew.pArray);
×
165
            goto end;
×
166
          }
167
        }
168
      }
169

170
      int     tlen = 0;
×
171
      int32_t ret = 0;
×
172
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
173
      void* buf = taosMemoryMalloc(tlen);
×
174
      if (NULL == buf) {
×
175
        taosArrayDestroy(reqNew.pArray);
×
176
        goto end;
×
177
      }
178
      SEncoder coderNew = {0};
×
179
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
180
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
181
      tEncoderClear(&coderNew);
×
182
      if (ret != 0) {
×
183
        taosMemoryFree(buf);
×
184
        taosArrayDestroy(reqNew.pArray);
×
185
        goto end;
×
186
      }
187
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
188
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
189
      taosMemoryFree(buf);
×
190
      taosArrayDestroy(reqNew.pArray);
×
191
    }
192
  } else if (msgType == TDMT_VND_DELETE) {
×
193
    SDeleteRes req = {0};
×
194
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
195
      goto end;
×
196
    }
197
    realTbSuid = req.suid;
×
198
  }
199

200
end:
×
201
  tDecoderClear(&dcoder);
×
202
  bool tmp = tbSuid == realTbSuid;
×
203
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
×
204
  return tmp;
×
205
}
206

207
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
2,309✔
208
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
2,309!
209
    return -1;
×
210
  }
211
  int32_t code = -1;
2,309✔
212
  int32_t vgId = TD_VID(pTq->pVnode);
2,309✔
213
  int64_t id = pHandle->pWalReader->readerId;
2,309✔
214

215
  int64_t offset = *fetchOffset;
2,309✔
216
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
2,309✔
217
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
2,309✔
218
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
2,309✔
219

220
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
2,309!
221
          ", 0x%" PRIx64,
222
          vgId, offset, lastVer, committedVer, appliedVer, id);
223

224
  while (offset <= appliedVer) {
2,312✔
225
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
4!
226
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
227
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
228
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
229
      goto END;
×
230
    }
231

232
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
4!
233
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
234

235
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
4✔
236
      code = walFetchBody(pHandle->pWalReader);
1✔
237
      goto END;
1✔
238
    } else {
239
      if (pHandle->fetchMeta != WITH_DATA) {
3!
240
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
×
241
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
×
242
          code = walFetchBody(pHandle->pWalReader);
×
243
          if (code < 0) {
×
244
            goto END;
×
245
          }
246

247
          pHead = &(pHandle->pWalReader->pHead->head);
×
248
          if (isValValidForTable(pHandle, pHead)) {
×
249
            code = 0;
×
250
            goto END;
×
251
          } else {
252
            offset++;
×
253
            code = -1;
×
254
            continue;
×
255
          }
256
        }
257
      }
258
      code = walSkipFetchBody(pHandle->pWalReader);
3✔
259
      if (code < 0) {
3!
260
        goto END;
×
261
      }
262
      offset++;
3✔
263
    }
264
    code = -1;
3✔
265
  }
266

267
END:
2,308✔
268
  *fetchOffset = offset;
2,309✔
269
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
2,309!
270
          ", applied:%" PRId64 ", 0x%" PRIx64,
271
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
272
  return code;
2,309✔
273
}
274

275
bool tqGetTablePrimaryKey(STqReader* pReader) {
×
276
  if (pReader == NULL) {
×
277
    return false;
×
278
  }
279
  return pReader->hasPrimaryKey;
×
280
}
281

282
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
×
283
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);
×
284

285
  if (pReader == NULL) {
×
286
    return;
×
287
  }
288
  bool            ret = false;
×
289
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
×
290
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
×
291
    ret = true;
×
292
  }
293
  tDeleteSchemaWrapper(schema);
294
  pReader->hasPrimaryKey = ret;
×
295
}
296

297
STqReader* tqReaderOpen(SVnode* pVnode) {
73✔
298
  tqDebug("%s:%p", __FUNCTION__, pVnode);
73!
299
  if (pVnode == NULL) {
73!
300
    return NULL;
×
301
  }
302
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
73!
303
  if (pReader == NULL) {
73!
304
    return NULL;
×
305
  }
306

307
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
73✔
308
  if (pReader->pWalReader == NULL) {
73!
309
    taosMemoryFree(pReader);
×
310
    return NULL;
×
311
  }
312

313
  pReader->pVnodeMeta = pVnode->pMeta;
73✔
314
  pReader->pColIdList = NULL;
73✔
315
  pReader->cachedSchemaVer = 0;
73✔
316
  pReader->cachedSchemaSuid = 0;
73✔
317
  pReader->pSchemaWrapper = NULL;
73✔
318
  pReader->tbIdHash = NULL;
73✔
319
  pReader->pResBlock = NULL;
73✔
320

321
  int32_t code = createDataBlock(&pReader->pResBlock);
73✔
322
  if (code) {
73!
323
    terrno = code;
×
324
  }
325

326
  return pReader;
73✔
327
}
328

329
void tqReaderClose(STqReader* pReader) {
73✔
330
  tqDebug("%s:%p", __FUNCTION__, pReader);
73!
331
  if (pReader == NULL) return;
73!
332

333
  // close wal reader
334
  if (pReader->pWalReader) {
73!
335
    walCloseReader(pReader->pWalReader);
73✔
336
  }
337

338
  if (pReader->pSchemaWrapper) {
73✔
339
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
47!
340
  }
341

342
  taosMemoryFree(pReader->extSchema);
73!
343
  if (pReader->pColIdList) {
73✔
344
    taosArrayDestroy(pReader->pColIdList);
72✔
345
  }
346

347
  // free hash
348
  blockDataDestroy(pReader->pResBlock);
73✔
349
  taosHashCleanup(pReader->tbIdHash);
73✔
350
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
73✔
351

352
  taosHashCleanup(pReader->vtSourceScanInfo.pVirtualTables);
73✔
353
  taosHashCleanup(pReader->vtSourceScanInfo.pPhysicalTables);
73✔
354
  taosLRUCacheCleanup(pReader->vtSourceScanInfo.pPhyTblSchemaCache);
73✔
355
  taosMemoryFree(pReader);
73!
356
}
357

358
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
4,411✔
359
  if (pReader == NULL) {
4,411!
360
    return TSDB_CODE_INVALID_PARA;
×
361
  }
362
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
4,411✔
363
    return terrno;
8✔
364
  }
365
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
4,402!
366
  return 0;
4,403✔
367
}
368

369
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
1,829✔
370
  int32_t code = 0;
1,829✔
371

372
  while (1) {
×
373
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));
1,829✔
374

375
    SWalCont* pCont = &pReader->pHead->head;
1,203✔
376
    int64_t   ver = pCont->version;
1,203✔
377
    if (ver > maxVer) {
1,203!
378
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
×
379
      return TSDB_CODE_SUCCESS;
×
380
    }
381

382
    if (pCont->msgType == TDMT_VND_SUBMIT) {
1,203!
383
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
1,203✔
384
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
1,203✔
385

386
      void* data = taosMemoryMalloc(len);
1,203!
387
      if (data == NULL) {
1,203!
388
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
389
        // retry
390
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
×
391
        return terrno;
×
392
      }
393

394
      (void)memcpy(data, pBody, len);
1,203✔
395
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
1,203✔
396

397
      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
1,203✔
398
      if (code != 0) {
1,203!
399
        tqError("%s failed to create data submit for stream since out of memory", id);
×
400
        return code;
×
401
      }
402
    } else if (pCont->msgType == TDMT_VND_DELETE) {
×
403
      void*       pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
×
404
      int32_t     len = pCont->bodyLen - sizeof(SMsgHead);
×
405
      EStreamType blockType = STREAM_DELETE_DATA;
×
406
      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
×
407
      if (code == TSDB_CODE_SUCCESS) {
×
408
        if (*pItem == NULL) {
×
409
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
×
410
          // we need to continue check next data in the wal files.
411
          continue;
×
412
        } else {
413
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
×
414
        }
415
      } else {
416
        terrno = code;
×
417
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
×
418
        return code;
×
419
      }
420

421
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
×
422
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
×
423
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
×
424
      code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
×
425
      if (TSDB_CODE_SUCCESS == code) {
×
426
        if (!*pItem) {
×
427
          continue;
×
428
        } else {
429
          tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
×
430
        }
431
      } else {
432
        terrno = code;
×
433
        return code;
×
434
      }
435
    } else {
436
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
×
437
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
438
    }
439

440
    return code;
1,203✔
441
  }
442
}
443

444
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
252,822✔
445
  if (pReader == NULL) {
252,822!
446
    return false;
×
447
  }
448
  SWalReader* pWalReader = pReader->pWalReader;
252,822✔
449

450
  int64_t st = taosGetTimestampMs();
252,834✔
451
  while (1) {
306,781✔
452
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
559,615✔
453
    while (pReader->nextBlk < numOfBlocks) {
793,543✔
454
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
475,988✔
455
              pReader->msg.ver);
456

457
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
475,988✔
458
      if (pSubmitTbData == NULL) {
475,958✔
459
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
9!
460
                pReader->msg.ver);
461
        return false;
×
462
      }
463
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
475,949!
464
        pReader->nextBlk += 1;
×
465
        continue;
×
466
      }
467
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
475,945!
468
        tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
241,984✔
469
        SSDataBlock* pRes = NULL;
241,984✔
470
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
241,984✔
471
        if (code == TSDB_CODE_SUCCESS) {
242,044!
472
          return true;
242,048✔
473
        }
474
      } else {
475
        pReader->nextBlk += 1;
234,046✔
476
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
234,046✔
477
      }
478
    }
479

480
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
317,555✔
481
    pReader->msg.msgStr = NULL;
317,565✔
482

483
    int64_t elapsed = taosGetTimestampMs() - st;
317,552✔
484
    if (elapsed > 1000 || elapsed < 0) {
317,552!
485
      return false;
12✔
486
    }
487

488
    // try next message in wal file
489
    if (walNextValidMsg(pWalReader) < 0) {
317,540✔
490
      return false;
10,803✔
491
    }
492

493
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
306,758✔
494
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
306,758✔
495
    int64_t ver = pWalReader->pHead->head.version;
306,758✔
496
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL) != 0) {
306,758!
497
      return false;
×
498
    }
499
    pReader->nextBlk = 0;
306,781✔
500
  }
501
}
502

503
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList) {
307,955✔
504
  if (pReader == NULL) {
307,955!
505
    return TSDB_CODE_INVALID_PARA;
×
506
  }
507
  pReader->msg.msgStr = msgStr;
307,955✔
508
  pReader->msg.msgLen = msgLen;
307,955✔
509
  pReader->msg.ver = ver;
307,955✔
510

511
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
307,955✔
512
  SDecoder decoder = {0};
307,955✔
513

514
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
307,955✔
515
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit, rawList);
307,964✔
516
  tDecoderClear(&decoder);
307,878✔
517

518
  if (code != 0) {
307,989!
519
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
520
  }
521

522
  return code;
307,989✔
523
}
524

525
void tqReaderClearSubmitMsg(STqReader* pReader) {
1,205✔
526
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
1,205✔
527
  pReader->nextBlk = 0;
1,205✔
528
  pReader->msg.msgStr = NULL;
1,205✔
529
}
1,205✔
530

531
SWalReader* tqGetWalReader(STqReader* pReader) {
268,165✔
532
  if (pReader == NULL) {
268,165!
533
    return NULL;
×
534
  }
535
  return pReader->pWalReader;
268,165✔
536
}
537

538
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
252,832✔
539
  if (pReader == NULL) {
252,832!
540
    return NULL;
×
541
  }
542
  return pReader->pResBlock;
252,832✔
543
}
544

545
int64_t tqGetResultBlockTime(STqReader* pReader) {
252,845✔
546
  if (pReader == NULL) {
252,845!
547
    return 0;
×
548
  }
549
  return pReader->lastTs;
252,845✔
550
}
551

552
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
2,054✔
553
  int32_t code = false;
2,054✔
554
  int32_t lino = 0;
2,054✔
555
  int64_t uid = 0;
2,054✔
556
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
2,054!
557
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
2,054!
558
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);
2,054!
559

560
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
2,054✔
561
  while (pReader->nextBlk < blockSz) {
2,524✔
562
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
1,321✔
563
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
1,321!
564
    uid = pSubmitTbData->uid;
1,321✔
565
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
1,321✔
566
    TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
1,321✔
567

568
    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
470✔
569
    pReader->nextBlk++;
470✔
570
  }
571

572
  tqReaderClearSubmitMsg(pReader);
1,203✔
573
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
1,203✔
574

575
END:
1,193✔
576
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
2,054✔
577
  return code;
2,054✔
578
}
579

580
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
2✔
581
  int32_t code = false;
2✔
582
  int32_t lino = 0;
2✔
583
  int64_t uid = 0;
2✔
584

585
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
2!
586
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
2!
587
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);
2!
588

589
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
2✔
590
  while (pReader->nextBlk < blockSz) {
2✔
591
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
1✔
592
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
1!
593
    uid = pSubmitTbData->uid;
1✔
594
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
1✔
595
    TSDB_CHECK_NULL(ret, code, lino, END, true);
1!
596
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
×
597
    pReader->nextBlk++;
×
598
  }
599
  tqReaderClearSubmitMsg(pReader);
1✔
600
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
1!
601

602
END:
1✔
603
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
2!
604
  return code;
2✔
605
}
606

607
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) {
1✔
608
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
1!
609
    return TSDB_CODE_INVALID_PARA;
×
610
  }
611
  int32_t code = 0;
1✔
612

613
  int32_t cnt = 0;
1✔
614
  for (int32_t i = 0; i < pSrc->nCols; i++) {
3✔
615
    cnt += mask[i];
2✔
616
  }
617

618
  pDst->nCols = cnt;
1✔
619
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
1!
620
  if (pDst->pSchema == NULL) {
1!
621
    return TAOS_GET_TERRNO(terrno);
×
622
  }
623

624
  int32_t j = 0;
1✔
625
  for (int32_t i = 0; i < pSrc->nCols; i++) {
3✔
626
    if (mask[i]) {
2!
627
      pDst->pSchema[j++] = pSrc->pSchema[i];
2✔
628
      SColumnInfoData colInfo =
629
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
2✔
630
      if (extSrc != NULL) {
2!
631
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
632
      }
633
      code = blockDataAppendColInfo(pBlock, &colInfo);
2✔
634
      if (code != 0) {
2!
635
        return code;
×
636
      }
637
    }
638
  }
639
  return 0;
1✔
640
}
641

642
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
46✔
643
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
46!
644
    return TSDB_CODE_INVALID_PARA;
×
645
  }
646
  SSDataBlock* pBlock = pReader->pResBlock;
46✔
647
  if (blockDataGetNumOfCols(pBlock) > 0) {
46!
648
    blockDataDestroy(pBlock);
×
649
    int32_t code = createDataBlock(&pReader->pResBlock);
×
650
    if (code) {
×
651
      return code;
×
652
    }
653
    pBlock = pReader->pResBlock;
×
654

655
    pBlock->info.id.uid = pReader->cachedSchemaUid;
×
656
    pBlock->info.version = pReader->msg.ver;
×
657
  }
658

659
  int32_t numOfCols = taosArrayGetSize(pColIdList);
46✔
660

661
  if (numOfCols == 0) {  // all columns are required
46!
662
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
663
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
664
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
665

666
      if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
×
667
        decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
668
      }
669
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
670
      if (code != TSDB_CODE_SUCCESS) {
×
671
        blockDataFreeRes(pBlock);
×
672
        return terrno;
×
673
      }
674
    }
675
  } else {
676
    if (numOfCols > pSchema->nCols) {
46!
677
      numOfCols = pSchema->nCols;
×
678
    }
679

680
    int32_t i = 0;
46✔
681
    int32_t j = 0;
46✔
682
    while (i < pSchema->nCols && j < numOfCols) {
229✔
683
      SSchema* pColSchema = &pSchema->pSchema[i];
183✔
684
      col_id_t colIdSchema = pColSchema->colId;
183✔
685

686
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
183✔
687
      if (pColIdNeed == NULL) {
183!
688
        break;
×
689
      }
690
      if (colIdSchema < *pColIdNeed) {
183✔
691
        i++;
64✔
692
      } else if (colIdSchema > *pColIdNeed) {
119!
693
        j++;
×
694
      } else {
695
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
119✔
696
        if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
119!
697
          decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
698
        }
699
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
119✔
700
        if (code != TSDB_CODE_SUCCESS) {
119!
701
          return -1;
×
702
        }
703
        i++;
119✔
704
        j++;
119✔
705
      }
706
    }
707
  }
708

709
  return TSDB_CODE_SUCCESS;
46✔
710
}
711

712
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
1,981,196✔
713
  int32_t code = TSDB_CODE_SUCCESS;
1,981,196✔
714

715
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
2,037,589!
716
    char val[65535 + 2] = {0};
53,372✔
717
    if (COL_VAL_IS_VALUE(pColVal)) {
53,372!
718
      if (pColVal->value.pData != NULL) {
56,394!
719
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
56,394✔
720
      }
721
      varDataSetLen(val, pColVal->value.nData);
56,394✔
722
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
56,394✔
723
    } else {
724
      colDataSetNULL(pColumnInfoData, rowIndex);
×
725
    }
726
  } else {
727
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type), !COL_VAL_IS_VALUE(pColVal));
1,927,824!
728
  }
729

730
  return code;
1,988,869✔
731
}
732

733
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
242,838✔
734
  if (pReader == NULL || pRes == NULL) {
242,838!
735
    return TSDB_CODE_INVALID_PARA;
736
  }
737
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
242,878!
738
  int32_t        code = 0;
242,893✔
739
  int32_t        line = 0;
242,893✔
740
  STSchema*      pTSchema = NULL;
242,893✔
741
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
242,893✔
742
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
242,905!
743
  SSDataBlock* pBlock = pReader->pResBlock;
242,905✔
744
  *pRes = pBlock;
242,905✔
745

746
  blockDataCleanup(pBlock);
242,905✔
747

748
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
242,889✔
749
  int32_t sversion = pSubmitTbData->sver;
242,889✔
750
  int64_t suid = pSubmitTbData->suid;
242,889✔
751
  int64_t uid = pSubmitTbData->uid;
242,889✔
752
  pReader->lastTs = pSubmitTbData->ctimeMs;
242,889✔
753

754
  pBlock->info.id.uid = uid;
242,889✔
755
  pBlock->info.version = pReader->msg.ver;
242,889✔
756

757
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
242,889✔
758
      (pReader->cachedSchemaVer != sversion)) {
242,841!
759
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
48!
760
    taosMemoryFree(pReader->extSchema);
46!
761
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
46✔
762
    if (pReader->pSchemaWrapper == NULL) {
46!
763
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
×
764
             "version %d, possibly dropped table",
765
             vgId, suid, uid, pReader->cachedSchemaVer);
766
      pReader->cachedSchemaSuid = 0;
×
767
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
768
    }
769

770
    pReader->cachedSchemaUid = uid;
46✔
771
    pReader->cachedSchemaSuid = suid;
46✔
772
    pReader->cachedSchemaVer = sversion;
46✔
773

774
    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
46!
775
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
×
776
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
777
      return TSDB_CODE_TQ_INTERNAL_ERROR;
×
778
    }
779
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
46✔
780
    TSDB_CHECK_CODE(code, line, END);
46!
781
    pBlock = pReader->pResBlock;
46✔
782
    *pRes = pBlock;
46✔
783
  }
784

785
  int32_t numOfRows = 0;
242,887✔
786
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
242,887!
787
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
×
788
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
×
789
    numOfRows = pCol->nVal;
×
790
  } else {
791
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
242,887✔
792
  }
793

794
  code = blockDataEnsureCapacity(pBlock, numOfRows);
242,886✔
795
  TSDB_CHECK_CODE(code, line, END);
242,888!
796
  pBlock->info.rows = numOfRows;
242,888✔
797
  int32_t colActual = blockDataGetNumOfCols(pBlock);
242,888✔
798

799
  // convert and scan one block
800
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
242,890!
801
    SArray* pCols = pSubmitTbData->aCol;
×
802
    int32_t numOfCols = taosArrayGetSize(pCols);
×
803
    int32_t targetIdx = 0;
×
804
    int32_t sourceIdx = 0;
×
805
    while (targetIdx < colActual) {
×
806
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
×
807
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
×
808
      if (sourceIdx >= numOfCols) {
×
809
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
×
810
        colDataSetNNULL(pColData, 0, numOfRows);
×
811
        targetIdx++;
×
812
        continue;
×
813
      }
814

815
      SColData* pCol = taosArrayGet(pCols, sourceIdx);
×
816
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
×
817
      SColVal colVal = {0};
×
818
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
×
819
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
820
      if (pCol->cid < pColData->info.colId) {
×
821
        sourceIdx++;
×
822
      } else if (pCol->cid == pColData->info.colId) {
×
823
        for (int32_t i = 0; i < pCol->nVal; i++) {
×
824
          code = tColDataGetValue(pCol, i, &colVal);
×
825
          TSDB_CHECK_CODE(code, line, END);
×
826
          code = doSetVal(pColData, i, &colVal);
×
827
          TSDB_CHECK_CODE(code, line, END);
×
828
        }
829
        sourceIdx++;
×
830
        targetIdx++;
×
831
      } else {
832
        colDataSetNNULL(pColData, 0, numOfRows);
×
833
        targetIdx++;
×
834
      }
835
    }
836
  } else {
837
    SArray*         pRows = pSubmitTbData->aRowP;
242,890✔
838
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
242,890✔
839
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
242,890✔
840
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);
242,900✔
841

842
    for (int32_t i = 0; i < numOfRows; i++) {
883,868✔
843
      SRow* pRow = taosArrayGetP(pRows, i);
637,052✔
844
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
636,640!
845
      int32_t sourceIdx = 0;
636,919✔
846
      for (int32_t j = 0; j < colActual; j++) {
2,620,427✔
847
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
1,979,457✔
848
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);
1,975,841!
849

850
        while (1) {
564,566✔
851
          SColVal colVal = {0};
2,540,407✔
852
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
2,540,407✔
853
          TSDB_CHECK_CODE(code, line, END);
2,545,337!
854

855
          if (colVal.cid < pColData->info.colId) {
2,545,337✔
856
            sourceIdx++;
564,566✔
857
            continue;
564,566✔
858
          } else if (colVal.cid == pColData->info.colId) {
1,980,771!
859
            code = doSetVal(pColData, i, &colVal);
1,983,334✔
860
            TSDB_CHECK_CODE(code, line, END);
1,986,071!
861
            sourceIdx++;
1,986,071✔
862
            break;
1,983,508✔
863
          } else {
864
            colDataSetNULL(pColData, i);
×
865
            break;
866
          }
867
        }
868
      }
869
    }
870
  }
871

872
END:
246,816✔
873
  if (code != 0) {
246,816!
874
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
×
875
  }
876
  taosMemoryFreeClear(pTSchema);
242,891!
877
  return code;
242,902✔
878
}
879

880
#define PROCESS_VAL                                      \
881
  if (curRow == 0) {                                     \
882
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
883
    buildNew = true;                                     \
884
  } else {                                               \
885
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
886
    if (currentRowAssigned != assigned[j]) {             \
887
      assigned[j] = currentRowAssigned;                  \
888
      buildNew = true;                                   \
889
    }                                                    \
890
  }
891

892
#define SET_DATA                                                     \
893
  if (colVal.cid < pColData->info.colId) {                           \
894
    sourceIdx++;                                                     \
895
  } else if (colVal.cid == pColData->info.colId) {                   \
896
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
897
    sourceIdx++;                                                     \
898
    targetIdx++;                                                     \
899
  }
900

901
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
1✔
902
                               char* assigned, int32_t numOfRows, int32_t curRow,
903
                               int32_t* lastRow) {
904
  int32_t         code = 0;
1✔
905
  SSchemaWrapper* pSW = NULL;
1✔
906
  SSDataBlock*    block = NULL;
1✔
907
  if (taosArrayGetSize(blocks) > 0) {
1!
908
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
909
    TQ_NULL_GO_TO_END(pLastBlock);
×
910
    pLastBlock->info.rows = curRow - *lastRow;
×
911
    *lastRow = curRow;
×
912
  }
913

914
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
1!
915
  TQ_NULL_GO_TO_END(block);
1!
916

917
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
1!
918
  TQ_NULL_GO_TO_END(pSW);
1!
919

920
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
1!
921
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
1!
922
          (int32_t)taosArrayGetSize(block->pDataBlock));
923

924
  block->info.id.uid = pSubmitTbData->uid;
1✔
925
  block->info.version = pReader->msg.ver;
1✔
926
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
1!
927
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
1!
928
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
1!
929
  pSW = NULL;
1✔
930

931
  taosMemoryFreeClear(block);
1!
932

933
END:
×
934
  if (code != 0) {
1!
935
    tqError("processBuildNew failed, code:%d", code);
×
936
  }
937
  tDeleteSchemaWrapper(pSW);
1!
938
  blockDataFreeRes(block);
1✔
939
  taosMemoryFree(block);
1!
940
  return code;
1✔
941
}
942
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
×
943
  int32_t code = 0;
×
944
  int32_t curRow = 0;
×
945
  int32_t lastRow = 0;
×
946

947
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
×
948
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
×
949
  TQ_NULL_GO_TO_END(assigned);
×
950

951
  SArray*   pCols = pSubmitTbData->aCol;
×
952
  SColData* pCol = taosArrayGet(pCols, 0);
×
953
  TQ_NULL_GO_TO_END(pCol);
×
954
  int32_t numOfRows = pCol->nVal;
×
955
  int32_t numOfCols = taosArrayGetSize(pCols);
×
956
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
×
957
          numOfRows);
958
  for (int32_t i = 0; i < numOfRows; i++) {
×
959
    bool buildNew = false;
×
960

961
    for (int32_t j = 0; j < numOfCols; j++) {
×
962
      pCol = taosArrayGet(pCols, j);
×
963
      TQ_NULL_GO_TO_END(pCol);
×
964
      SColVal colVal = {0};
×
965
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
×
966
      PROCESS_VAL
×
967
    }
968

969
    if (buildNew) {
×
970
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows,
×
971
                                       curRow, &lastRow));
972
    }
973

974
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
×
975
    TQ_NULL_GO_TO_END(pBlock);
×
976

977
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
×
978
            (int32_t)taosArrayGetSize(blocks));
979

980
    int32_t targetIdx = 0;
×
981
    int32_t sourceIdx = 0;
×
982
    int32_t colActual = blockDataGetNumOfCols(pBlock);
×
983
    while (targetIdx < colActual) {
×
984
      pCol = taosArrayGet(pCols, sourceIdx);
×
985
      TQ_NULL_GO_TO_END(pCol);
×
986
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
×
987
      TQ_NULL_GO_TO_END(pColData);
×
988
      SColVal colVal = {0};
×
989
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
×
990
      SET_DATA
×
991
    }
992

993
    curRow++;
×
994
  }
995
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
996
  pLastBlock->info.rows = curRow - lastRow;
×
997
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
×
998
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
999
END:
×
1000
  if (code != TSDB_CODE_SUCCESS) {
×
1001
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1002
  }
1003
  taosMemoryFree(assigned);
×
1004
  return code;
×
1005
}
1006

1007
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
1✔
1008
  int32_t   code = 0;
1✔
1009
  STSchema* pTSchema = NULL;
1✔
1010

1011
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
1✔
1012
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
1!
1013
  TQ_NULL_GO_TO_END(assigned);
1!
1014

1015
  int32_t curRow = 0;
1✔
1016
  int32_t lastRow = 0;
1✔
1017
  SArray* pRows = pSubmitTbData->aRowP;
1✔
1018
  int32_t numOfRows = taosArrayGetSize(pRows);
1✔
1019
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
1✔
1020
  TQ_NULL_GO_TO_END(pTSchema);
1!
1021
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
1!
1022

1023
  for (int32_t i = 0; i < numOfRows; i++) {
2✔
1024
    bool  buildNew = false;
1✔
1025
    SRow* pRow = taosArrayGetP(pRows, i);
1✔
1026
    TQ_NULL_GO_TO_END(pRow);
1!
1027

1028
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
3✔
1029
      SColVal colVal = {0};
2✔
1030
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
2!
1031
      PROCESS_VAL
2!
1032
    }
1033

1034
    if (buildNew) {
1!
1035
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows,
1!
1036
                                       curRow, &lastRow));
1037
    }
1038

1039
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
1✔
1040
    TQ_NULL_GO_TO_END(pBlock);
1!
1041

1042
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
1!
1043
            (int32_t)taosArrayGetSize(blocks));
1044

1045
    int32_t targetIdx = 0;
1✔
1046
    int32_t sourceIdx = 0;
1✔
1047
    int32_t colActual = blockDataGetNumOfCols(pBlock);
1✔
1048
    while (targetIdx < colActual) {
3✔
1049
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
2✔
1050
      SColVal          colVal = {0};
2✔
1051
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
2!
1052
      SET_DATA
2!
1053
    }
1054

1055
    curRow++;
1✔
1056
  }
1057
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
1✔
1058
  pLastBlock->info.rows = curRow - lastRow;
1✔
1059

1060
  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
1!
1061
          (int)taosArrayGetSize(blocks));
1062
END:
1✔
1063
  if (code != TSDB_CODE_SUCCESS) {
1!
1064
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1065
  }
1066
  taosMemoryFreeClear(pTSchema);
1!
1067
  taosMemoryFree(assigned);
1!
1068
  return code;
1✔
1069
}
1070

1071
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
×
1072
  int32_t code = 0;
×
1073
  int32_t lino = 0;
×
1074
  void*   createReq = NULL;
×
1075
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
×
1076
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
×
1077

1078
  if (pRsp->createTableNum == 0) {
×
1079
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
×
1080
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
×
1081
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
×
1082
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
×
1083
  }
1084

1085
  uint32_t len = 0;
×
1086
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
×
1087
  TSDB_CHECK_CODE(code, lino, END);
×
1088
  createReq = taosMemoryCalloc(1, len);
×
1089
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
×
1090

1091
  SEncoder encoder = {0};
×
1092
  tEncoderInit(&encoder, createReq, len);
×
1093
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
×
1094
  tEncoderClear(&encoder);
×
1095
  TSDB_CHECK_CODE(code, lino, END);
×
1096
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
×
1097
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
×
1098
  pRsp->createTableNum++;
×
1099
  tqTrace("build create table info msg success");
×
1100

1101
END:
×
1102
  if (code != 0) {
×
1103
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1104
    taosMemoryFree(createReq);
×
1105
  }
1106
  return code;
×
1107
}
1108

1109
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
1✔
1110
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1111
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
1!
1112
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
1✔
1113
  if (pSubmitTbData == NULL) {
1!
1114
    return terrno;
×
1115
  }
1116
  pReader->nextBlk++;
1✔
1117

1118
  if (pSubmitTbDataRet) {
1!
1119
    *pSubmitTbDataRet = pSubmitTbData;
1✔
1120
  }
1121

1122
  if (fetchMeta == ONLY_META) {
1!
1123
    if (pSubmitTbData->pCreateTbReq != NULL) {
×
1124
      if (pRsp->createTableReq == NULL) {
×
1125
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
×
1126
        if (pRsp->createTableReq == NULL) {
×
1127
          return terrno;
×
1128
        }
1129
      }
1130
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
×
1131
        return terrno;
×
1132
      }
1133
      pSubmitTbData->pCreateTbReq = NULL;
×
1134
    }
1135
    return 0;
×
1136
  }
1137

1138
  int32_t sversion = pSubmitTbData->sver;
1✔
1139
  int64_t uid = pSubmitTbData->uid;
1✔
1140
  pReader->lastBlkUid = uid;
1✔
1141

1142
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
1!
1143
  taosMemoryFree(pReader->extSchema);
1!
1144
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
1✔
1145
  if (pReader->pSchemaWrapper == NULL) {
1!
1146
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
×
1147
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1148
    pReader->cachedSchemaSuid = 0;
×
1149
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
1150
  }
1151

1152
  if (pSubmitTbData->pCreateTbReq != NULL) {
1!
1153
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
×
1154
    if (code != 0) {
×
1155
      return code;
×
1156
    }
1157
  } else if (rawList != NULL) {
1!
1158
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1159
      return terrno;
×
1160
    }
1161
    pReader->pSchemaWrapper = NULL;
×
1162
    return 0;
×
1163
  }
1164

1165
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
1!
1166
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
×
1167
  } else {
1168
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
1✔
1169
  }
1170
}
1171

1172
int32_t tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList, const char* id) {
72✔
1173
  if (pReader == NULL) {
72!
1174
    return TSDB_CODE_SUCCESS;
×
1175
  }
1176
  pReader->pColIdList = pColIdList;
72✔
1177
  return tqCollectPhysicalTables(pReader, id);
72✔
1178
}
1179

1180
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
72✔
1181
  if (pReader == NULL || tbUidList == NULL) {
72!
1182
    return TSDB_CODE_SUCCESS;
×
1183
  }
1184
  if (pReader->tbIdHash) {
72!
1185
    taosHashClear(pReader->tbIdHash);
×
1186
  } else {
1187
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
72✔
1188
    if (pReader->tbIdHash == NULL) {
72!
1189
      tqError("s-task:%s failed to init hash table", id);
×
1190
      return terrno;
×
1191
    }
1192
  }
1193

1194
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
1,656✔
1195
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
1,584✔
1196
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
1,584!
1197
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
×
1198
      continue;
×
1199
    }
1200
  }
1201

1202
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
72!
1203
  return TSDB_CODE_SUCCESS;
72✔
1204
}
1205

1206
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
22✔
1207
  if (pReader == NULL || pTableUidList == NULL) {
22!
1208
    return;
×
1209
  }
1210
  if (pReader->tbIdHash == NULL) {
22!
1211
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
1212
    if (pReader->tbIdHash == NULL) {
×
1213
      tqError("failed to init hash table");
×
1214
      return;
×
1215
    }
1216
  }
1217

1218
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
22✔
1219
  for (int i = 0; i < numOfTables; i++) {
22!
1220
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
×
1221
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
×
1222
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
1223
      continue;
×
1224
    }
1225
  }
1226
}
1227

1228
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
×
1229
  if (pReader == NULL) {
×
1230
    return false;
×
1231
  }
1232
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
×
1233
}
1234

1235
bool tqCurrentBlockConsumed(const STqReader* pReader) {
2,369✔
1236
  if (pReader == NULL) {
2,369!
1237
    return false;
×
1238
  }
1239
  return pReader->msg.msgStr == NULL;
2,369✔
1240
}
1241

1242
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
×
1243
  if (pReader == NULL || tbUidList == NULL) {
×
1244
    return;
×
1245
  }
1246
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
×
1247
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
×
1248
    if (pKey && taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)) != 0) {
×
1249
      tqError("failed to remove table uid:%" PRId64 " from hash", *pKey);
×
1250
    }
1251
  }
1252
}
1253

1254
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
2,931✔
1255
  if (pTq == NULL || tbUidList == NULL) {
2,931!
1256
    return TSDB_CODE_INVALID_PARA;
×
1257
  }
1258
  void*   pIter = NULL;
2,931✔
1259
  int32_t vgId = TD_VID(pTq->pVnode);
2,931✔
1260

1261
  // update the table list for each consumer handle
1262
  taosWLockLatch(&pTq->lock);
2,931✔
1263
  while (1) {
2✔
1264
    pIter = taosHashIterate(pTq->pHandle, pIter);
2,933✔
1265
    if (pIter == NULL) {
2,933✔
1266
      break;
2,931✔
1267
    }
1268

1269
    STqHandle* pTqHandle = (STqHandle*)pIter;
2✔
1270
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
2!
1271
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
2✔
1272
      if (code != 0) {
2!
1273
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
1274
        continue;
×
1275
      }
1276
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
×
1277
      if (!isAdd) {
×
1278
        int32_t sz = taosArrayGetSize(tbUidList);
×
1279
        for (int32_t i = 0; i < sz; i++) {
×
1280
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
×
1281
          if (tbUid &&
×
1282
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
×
1283
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
1284
            continue;
×
1285
          }
1286
        }
1287
      }
1288
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
×
1289
      if (isAdd) {
×
1290
        SArray* list = NULL;
×
1291
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
×
1292
                                    &list, pTqHandle->execHandle.task);
1293
        if (ret != TDB_CODE_SUCCESS) {
×
1294
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1295
                  pTqHandle->consumerId);
1296
          taosArrayDestroy(list);
×
1297
          taosHashCancelIterate(pTq->pHandle, pIter);
×
1298
          taosWUnLockLatch(&pTq->lock);
×
1299

1300
          return ret;
×
1301
        }
1302
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
×
1303
        taosArrayDestroy(list);
×
1304
      } else {
1305
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1306
      }
1307
    }
1308
  }
1309
  taosWUnLockLatch(&pTq->lock);
2,931✔
1310

1311
  // update the table list handle for each stream scanner/wal reader
1312
  streamMetaWLock(pTq->pStreamMeta);
2,931✔
1313
  while (1) {
36✔
1314
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
2,967✔
1315
    if (pIter == NULL) {
2,967✔
1316
      break;
2,931✔
1317
    }
1318

1319
    int64_t      refId = *(int64_t*)pIter;
36✔
1320
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
36✔
1321
    if (pTask != NULL) {
36!
1322
      int32_t taskId = pTask->id.taskId;
36✔
1323

1324
      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
36!
1325
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
20✔
1326
        if (code != 0) {
20!
1327
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
×
1328
        }
1329
      }
1330
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
36✔
1331
      if (ret) {
36!
1332
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
×
1333
      }
1334
    }
1335
  }
1336

1337
  streamMetaWUnLock(pTq->pStreamMeta);
2,931✔
1338
  return 0;
2,931✔
1339
}
1340

1341
static void destroySourceScanTables(void* ptr) {
×
1342
  SArray** pTables = ptr;
×
1343
  if (pTables && *pTables) {
×
1344
    taosArrayDestroy(*pTables);
×
1345
    *pTables = NULL;
×
1346
  }
1347
}
×
1348

1349
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1350
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1351
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1352
  if (pCol1->vColId == pCol2->vColId) {
×
1353
    return 0;
×
1354
  } else if (pCol1->vColId < pCol2->vColId) {
×
1355
    return -1;
×
1356
  } else {
1357
    return 1;
×
1358
  }
1359
}
1360

1361
int32_t tqReaderSetVtableInfo(STqReader* pReader, void* vnode, void* ptr, SSHashObj* pVtableInfos,
×
1362
                              SSDataBlock** ppResBlock, const char* idstr) {
1363
  int32_t            code = TSDB_CODE_SUCCESS;
×
1364
  int32_t            lino = 0;
×
1365
  SStorageAPI*       pAPI = ptr;
×
1366
  SVTSourceScanInfo* pScanInfo = NULL;
×
1367
  SHashObj*          pVirtualTables = NULL;
×
1368
  SMetaReader        metaReader = {0};
×
1369
  SVTColInfo         colInfo = {0};
×
1370
  SSchemaWrapper*    schema = NULL;
×
1371

1372
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1373
  TSDB_CHECK_NULL(vnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1374
  TSDB_CHECK_NULL(pAPI, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1375

1376
  pScanInfo = &pReader->vtSourceScanInfo;
×
1377
  taosHashCleanup(pScanInfo->pVirtualTables);
×
1378
  pScanInfo->pVirtualTables = NULL;
×
1379

1380
  if (tSimpleHashGetSize(pVtableInfos) == 0) {
×
1381
    goto _end;
×
1382
  }
1383

1384
  pVirtualTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1385
  TSDB_CHECK_NULL(pVirtualTables, code, lino, _end, terrno);
×
1386
  taosHashSetFreeFp(pVirtualTables, destroySourceScanTables);
×
1387

1388
  int32_t iter = 0;
×
1389
  void*   px = tSimpleHashIterate(pVtableInfos, NULL, &iter);
×
1390
  while (px != NULL) {
×
1391
    int64_t vTbUid = *(int64_t*)tSimpleHashGetKey(px, NULL);
×
1392
    SArray* pColInfos = taosArrayInit(8, sizeof(SVTColInfo));
×
1393
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, terrno);
×
1394
    code = taosHashPut(pVirtualTables, &vTbUid, sizeof(int64_t), &pColInfos, POINTER_BYTES);
×
1395
    TSDB_CHECK_CODE(code, lino, _end);
×
1396

1397
    SSHashObj* pPhysicalTables = *(SSHashObj**)px;
×
1398
    int32_t    iterIn = 0;
×
1399
    void*      pxIn = tSimpleHashIterate(pPhysicalTables, NULL, &iterIn);
×
1400
    while (pxIn != NULL) {
×
1401
      char* physicalTableName = tSimpleHashGetKey(pxIn, NULL);
×
1402
      pAPI->metaReaderFn.clearReader(&metaReader);
×
1403
      pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1404
      code = pAPI->metaReaderFn.getTableEntryByName(&metaReader, physicalTableName);
×
1405
      TSDB_CHECK_CODE(code, lino, _end);
×
1406
      pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1407
      colInfo.pTbUid = metaReader.me.uid;
×
1408

1409
      switch (metaReader.me.type) {
×
1410
        case TSDB_CHILD_TABLE: {
×
1411
          int64_t suid = metaReader.me.ctbEntry.suid;
×
1412
          pAPI->metaReaderFn.clearReader(&metaReader);
×
1413
          pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1414
          code = pAPI->metaReaderFn.getTableEntryByUid(&metaReader, suid);
×
1415
          TSDB_CHECK_CODE(code, lino, _end);
×
1416
          pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1417
          schema = &metaReader.me.stbEntry.schemaRow;
×
1418
          break;
×
1419
        }
1420
        case TSDB_NORMAL_TABLE: {
×
1421
          schema = &metaReader.me.ntbEntry.schemaRow;
×
1422
          break;
×
1423
        }
1424
        default: {
×
1425
          tqError("invalid table type: %d", metaReader.me.type);
×
1426
          code = TSDB_CODE_INVALID_PARA;
×
1427
          TSDB_CHECK_CODE(code, lino, _end);
×
1428
        }
1429
      }
1430

1431
      SArray* pCols = *(SArray**)pxIn;
×
1432
      int32_t ncols = taosArrayGetSize(pCols);
×
1433
      for (int32_t i = 0; i < ncols; ++i) {
×
1434
        SColIdName* pCol = taosArrayGet(pCols, i);
×
1435
        colInfo.vColId = pCol->colId;
×
1436

1437
        for (int32_t j = 0; j < schema->nCols; ++j) {
×
1438
          if (strncmp(pCol->colName, schema->pSchema[j].name, strlen(schema->pSchema[j].name)) == 0) {
×
1439
            colInfo.pColId = schema->pSchema[j].colId;
×
1440
            void* px = taosArrayPush(pColInfos, &colInfo);
×
1441
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1442
            break;
×
1443
          }
1444
        }
1445
      }
1446

1447
      taosArraySort(pColInfos, compareSVTColInfo);
×
1448
      pxIn = tSimpleHashIterate(pPhysicalTables, pxIn, &iterIn);
×
1449
    }
1450

1451
    px = tSimpleHashIterate(pVtableInfos, px, &iter);
×
1452
  }
1453

1454
  pScanInfo->pVirtualTables = pVirtualTables;
×
1455
  pVirtualTables = NULL;
×
1456

1457
  // set the result data block
1458
  if (pReader->pResBlock) {
×
1459
    blockDataDestroy(pReader->pResBlock);
×
1460
  }
1461
  pReader->pResBlock = *ppResBlock;
×
1462
  *ppResBlock = NULL;
×
1463

1464
  // update reader callback for vtable source scan
1465
  pAPI->tqReaderFn.tqRetrieveBlock = tqRetrieveVTableDataBlock;
×
1466
  pAPI->tqReaderFn.tqNextBlockImpl = tqNextVTableSourceBlockImpl;
×
1467
  pAPI->tqReaderFn.tqReaderIsQueriedTable = tqReaderIsQueriedSourceTable;
×
1468

1469
_end:
×
1470
  if (code != TSDB_CODE_SUCCESS) {
×
1471
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1472
  }
1473
  pAPI->metaReaderFn.clearReader(&metaReader);
×
1474
  if (pVirtualTables != NULL) {
×
1475
    taosHashCleanup(pVirtualTables);
×
1476
  }
1477
  return code;
×
1478
}
1479

1480
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
72✔
1481
  int32_t            code = TSDB_CODE_SUCCESS;
72✔
1482
  int32_t            lino = 0;
72✔
1483
  SVTSourceScanInfo* pScanInfo = NULL;
72✔
1484
  SHashObj*          pVirtualTables = NULL;
72✔
1485
  SHashObj*          pPhysicalTables = NULL;
72✔
1486
  void*              pIter = NULL;
72✔
1487
  void*              px = NULL;
72✔
1488

1489
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
72!
1490

1491
  pScanInfo = &pReader->vtSourceScanInfo;
72✔
1492
  taosHashCleanup(pScanInfo->pPhysicalTables);
72✔
1493
  pScanInfo->pPhysicalTables = NULL;
72✔
1494
  taosLRUCacheCleanup(pScanInfo->pPhyTblSchemaCache);
72✔
1495
  pScanInfo->pPhyTblSchemaCache = NULL;
72✔
1496
  pScanInfo->nextVirtualTableIdx = -1;
72✔
1497
  pScanInfo->metaFetch = 0;
72✔
1498
  pScanInfo->cacheHit = 0;
72✔
1499

1500
  pVirtualTables = pScanInfo->pVirtualTables;
72✔
1501
  if (taosHashGetSize(pVirtualTables) == 0 || taosArrayGetSize(pReader->pColIdList) == 0) {
72!
1502
    goto _end;
72✔
1503
  }
1504

1505
  pPhysicalTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1506
  TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno);
×
1507
  taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables);
×
1508

1509
  pIter = taosHashIterate(pVirtualTables, NULL);
×
1510
  while (pIter != NULL) {
×
1511
    int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL);
×
1512
    SArray* pColInfos = *(SArray**)pIter;
×
1513
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1514

1515
    // Traverse all required columns and collect corresponding physical tables
1516
    int32_t nColInfos = taosArrayGetSize(pColInfos);
×
1517
    int32_t nOutputCols = taosArrayGetSize(pReader->pColIdList);
×
1518
    for (int32_t i = 0, j = 0; i < nColInfos && j < nOutputCols;) {
×
1519
      SVTColInfo* pCol = taosArrayGet(pColInfos, i);
×
1520
      col_id_t    colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
×
1521
      if (pCol->vColId < colIdNeed) {
×
1522
        i++;
×
1523
      } else if (pCol->vColId > colIdNeed) {
×
1524
        j++;
×
1525
      } else {
1526
        SArray* pRelatedVTs = NULL;
×
1527
        px = taosHashGet(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t));
×
1528
        if (px == NULL) {
×
1529
          pRelatedVTs = taosArrayInit(8, sizeof(int64_t));
×
1530
          TSDB_CHECK_NULL(pRelatedVTs, code, lino, _end, terrno);
×
1531
          code = taosHashPut(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t), &pRelatedVTs, POINTER_BYTES);
×
1532
          if (code != TSDB_CODE_SUCCESS) {
×
1533
            taosArrayDestroy(pRelatedVTs);
×
1534
            TSDB_CHECK_CODE(code, lino, _end);
×
1535
          }
1536
        } else {
1537
          pRelatedVTs = *(SArray**)px;
×
1538
        }
1539
        if (taosArrayGetSize(pRelatedVTs) == 0 || *(int64_t*)taosArrayGetLast(pRelatedVTs) != vTbUid) {
×
1540
          px = taosArrayPush(pRelatedVTs, &vTbUid);
×
1541
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1542
        }
1543
        i++;
×
1544
        j++;
×
1545
      }
1546
    }
1547
    pIter = taosHashIterate(pVirtualTables, pIter);
×
1548
  }
1549

1550
  pScanInfo->pPhysicalTables = pPhysicalTables;
×
1551
  pPhysicalTables = NULL;
×
1552

1553
  if (taosHashGetSize(pScanInfo->pPhysicalTables) > 0) {
×
1554
    pScanInfo->pPhyTblSchemaCache = taosLRUCacheInit(1024 * 128, -1, .5);
×
1555
    TSDB_CHECK_NULL(pScanInfo->pPhyTblSchemaCache, code, lino, _end, terrno);
×
1556
  }
1557

1558
_end:
×
1559
  if (code != TSDB_CODE_SUCCESS) {
72!
1560
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1561
  }
1562
  if (pIter != NULL) {
72!
1563
    taosHashCancelIterate(pReader->tbIdHash, pIter);
×
1564
  }
1565
  if (pPhysicalTables != NULL) {
72!
1566
    taosHashCleanup(pPhysicalTables);
×
1567
  }
1568
  return code;
72✔
1569
}
1570

1571
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1572
  if (value) {
×
1573
    SSchemaWrapper* pSchemaWrapper = value;
×
1574
    tDeleteSchemaWrapper(pSchemaWrapper);
1575
  }
1576
}
×
1577

1578
int32_t tqRetrieveVTableDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* idstr) {
×
1579
  int32_t            code = TSDB_CODE_SUCCESS;
×
1580
  int32_t            lino = 0;
×
1581
  SVTSourceScanInfo* pScanInfo = NULL;
×
1582
  SSubmitTbData*     pSubmitTbData = NULL;
×
1583
  SSDataBlock*       pBlock = NULL;
×
1584
  void*              px = NULL;
×
1585
  int64_t            vTbUid = 0;
×
1586
  int64_t            pTbUid = 0;
×
1587
  LRUHandle*         h = NULL;
×
1588
  STSchema*          pPhyTblSchema = NULL;
×
1589

1590
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1591
  TSDB_CHECK_NULL(pRes, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1592

1593
  pScanInfo = &pReader->vtSourceScanInfo;
×
1594
  tqDebug("tq reader retrieve vtable data block from %p, nextBlk:%d, vtbIdx:%d, id:%s", pReader->msg.msgStr,
×
1595
          pReader->nextBlk, pScanInfo->nextVirtualTableIdx, idstr);
1596

1597
  *pRes = NULL;
×
1598
  pBlock = pReader->pResBlock;
×
1599
  blockDataCleanup(pBlock);
×
1600

1601
  pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
1602
  TSDB_CHECK_NULL(pSubmitTbData, code, lino, _end, terrno);
×
1603

1604
  pReader->lastTs = pSubmitTbData->ctimeMs;
×
1605

1606
  pTbUid = pSubmitTbData->uid;
×
1607
  px = taosHashGet(pScanInfo->pPhysicalTables, &pTbUid, sizeof(int64_t));
×
1608
  TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1609
  SArray* pRelatedVTs = *(SArray**)px;
×
1610
  vTbUid = *(int64_t*)taosArrayGet(pRelatedVTs, pScanInfo->nextVirtualTableIdx);
×
1611
  px = taosHashGet(pScanInfo->pVirtualTables, &vTbUid, sizeof(int64_t));
×
1612
  TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1613
  SArray* pColInfos = *(SArray**)px;
×
1614
  TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1615

1616
  int32_t nColInfos = taosArrayGetSize(pColInfos);
×
1617
  int32_t nOutputCols = taosArrayGetSize(pBlock->pDataBlock);
×
1618

1619
  int32_t numOfRows = 0;
×
1620
  int32_t nInputCols = 0;
×
1621
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
1622
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
×
1623
    TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
×
1624
    numOfRows = pCol->nVal;
×
1625
    nInputCols = taosArrayGetSize(pSubmitTbData->aCol);
×
1626
  } else {
1627
    // try to get physical table schema from cache
1628
    pScanInfo->metaFetch++;
×
1629
    int64_t         cacheKey = (pSubmitTbData->suid == 0) ? pTbUid : pSubmitTbData->suid;
×
1630
    SSchemaWrapper* pWrapper = NULL;
×
1631
    h = taosLRUCacheLookup(pScanInfo->pPhyTblSchemaCache, &cacheKey, sizeof(int64_t));
×
1632
    if (h != NULL) {
×
1633
      pWrapper = taosLRUCacheValue(pScanInfo->pPhyTblSchemaCache, h);
×
1634
      TSDB_CHECK_NULL(pWrapper, code, lino, _end, terrno);
×
1635
    }
1636

1637
    if (pWrapper != NULL && pWrapper->version != pSubmitTbData->sver) {
×
1638
      // reset outdated schema
1639
      tDeleteSchemaWrapper(pWrapper);
1640
      pWrapper = NULL;
×
1641
      taosLRUCacheUpdate(pScanInfo->pPhyTblSchemaCache, h, pWrapper);
×
1642
    }
1643

1644
    if (pWrapper == NULL) {
×
1645
      // get physical table schema from meta
1646
      pWrapper = metaGetTableSchema(pReader->pVnodeMeta, pTbUid, pSubmitTbData->sver, 1, NULL);
×
1647
      if (pWrapper == NULL) {
×
1648
        tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
×
1649
               "version %d, possibly dropped table",
1650
               pReader->pWalReader->pWal->cfg.vgId, pSubmitTbData->suid, pTbUid, pSubmitTbData->sver);
1651
        TSDB_CHECK_NULL(pWrapper, code, lino, _end, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND);
×
1652
      }
1653
      if (h == NULL) {
×
1654
        // insert schema to cache
1655
        code = taosLRUCacheInsert(pScanInfo->pPhyTblSchemaCache, &cacheKey, sizeof(int64_t), pWrapper, POINTER_BYTES,
×
1656
                                  freeTableSchemaCache, NULL, NULL, TAOS_LRU_PRIORITY_LOW, NULL);
1657
        if (code != TSDB_CODE_SUCCESS) {
×
1658
          tDeleteSchemaWrapper(pWrapper);
1659
        }
1660
        TSDB_CHECK_CODE(code, lino, _end);
×
1661
      } else {
1662
        // update schema in cache
1663
        taosLRUCacheUpdate(pScanInfo->pPhyTblSchemaCache, h, pWrapper);
×
1664
      }
1665
    } else {
1666
      pScanInfo->cacheHit++;
×
1667
    }
1668
    TSDB_CHECK_NULL(pWrapper, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1669
    pPhyTblSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
×
1670
    TSDB_CHECK_NULL(pPhyTblSchema, code, lino, _end, terrno);
×
1671

1672
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
×
1673
    nInputCols = pPhyTblSchema->numOfCols;
×
1674
  }
1675

1676
  code = blockDataEnsureCapacity(pBlock, numOfRows);
×
1677
  TSDB_CHECK_CODE(code, lino, _end);
×
1678

1679
  // convert one block
1680
  for (int32_t i = 0, j = 1; j < nOutputCols;) {
×
1681
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, j);
×
1682
    TSDB_CHECK_NULL(pOutCol, code, lino, _end, terrno);
×
1683
    if (i >= nColInfos) {
×
1684
      tqTrace("%s has %d column info, but vtable column %d is missing, id: %s", __func__, nColInfos,
×
1685
              pOutCol->info.colId, idstr);
1686
      colDataSetNNULL(pOutCol, 0, numOfRows);
×
1687
      j++;
×
1688
      continue;
×
1689
    }
1690

1691
    SVTColInfo* pCol = taosArrayGet(pColInfos, i);
×
1692
    TSDB_CHECK_NULL(pCol, code, lino, _end, terrno);
×
1693
    if (pCol->vColId < pOutCol->info.colId) {
×
1694
      i++;
×
1695
      continue;
×
1696
    } else if (pCol->vColId > pOutCol->info.colId) {
×
1697
      tqTrace("%s does not find column info for vtable column %d, closest vtable column is %d, id: %s", __func__,
×
1698
              pOutCol->info.colId, pCol->vColId, idstr);
1699
      colDataSetNNULL(pOutCol, 0, numOfRows);
×
1700
      j++;
×
1701
      continue;
×
1702
    }
1703

1704
    // skip this column if it is from another physical table
1705
    if (pCol->pTbUid != pTbUid) {
×
1706
      tqTrace("skip column %d of virtual table %" PRId64 " since it is from table %" PRId64
×
1707
              ", current block table %" PRId64 ", id: %s",
1708
              pCol->vColId, vTbUid, pCol->pTbUid, pTbUid, idstr);
1709
      colDataSetNNULL(pOutCol, 0, numOfRows);
×
1710
      i++;
×
1711
      j++;
×
1712
      continue;
×
1713
    }
1714

1715
    // copy data from physical table to the result block of virtual table
1716
    if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
1717
      // try to find the corresponding column data of physical table
1718
      SColData* pColData = NULL;
×
1719
      for (int32_t k = 0; k < nInputCols; ++k) {
×
1720
        pColData = taosArrayGet(pSubmitTbData->aCol, k);
×
1721
        TSDB_CHECK_NULL(pColData, code, lino, _end, terrno);
×
1722
        if (pColData->cid == pCol->pColId) {
×
1723
          break;
×
1724
        }
1725
        pColData = NULL;
×
1726
      }
1727
      if (pColData == NULL) {
×
1728
        tqError("%s does not find data of physical table %" PRId64 " column %d, virtual table: %" PRId64
×
1729
                " column: %d, id: %s",
1730
                __func__, pTbUid, pCol->pColId, vTbUid, pCol->vColId, idstr);
1731
        colDataSetNNULL(pOutCol, 0, numOfRows);
×
1732
        i++;
×
1733
        j++;
×
1734
        continue;
×
1735
      }
1736
      SColVal colVal = {0};
×
1737
      for (int32_t k = 0; k < pColData->nVal; ++k) {
×
1738
        code = tColDataGetValue(pColData, k, &colVal);
×
1739
        TSDB_CHECK_CODE(code, lino, _end);
×
1740
        code = doSetVal(pOutCol, k, &colVal);
×
1741
        TSDB_CHECK_CODE(code, lino, _end);
×
1742
      }
1743
    } else {
1744
      SArray* pRows = pSubmitTbData->aRowP;
×
1745
      TSDB_CHECK_NULL(pPhyTblSchema, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1746

1747
      SColVal colVal = {0};
×
1748
      for (int32_t k = 0; k < numOfRows; ++k) {
×
1749
        SRow* pRow = taosArrayGetP(pRows, k);
×
1750
        TSDB_CHECK_NULL(pRow, code, lino, _end, terrno);
×
1751
        for (int32_t l = 0; l < nInputCols; ++l) {
×
1752
          code = tRowGet(pRow, pPhyTblSchema, l, &colVal);
×
1753
          TSDB_CHECK_CODE(code, lino, _end);
×
1754
          if (colVal.cid == pCol->pColId) {
×
1755
            code = doSetVal(pOutCol, k, &colVal);
×
1756
            TSDB_CHECK_CODE(code, lino, _end);
×
1757
            break;
×
1758
          } else if (colVal.cid > pCol->pColId || l == (nInputCols - 1)) {
×
1759
            colDataSetNULL(pOutCol, k);
×
1760
            break;
×
1761
          }
1762
        }
1763
      }
1764
    }
1765

1766
    i++;
×
1767
    j++;
×
1768
  }
1769

1770
  // enforce to fill the first ts column
1771
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
1772
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, 0);
×
1773
    SColData*        pColData = taosArrayGet(pSubmitTbData->aCol, 0);
×
1774
    TSDB_CHECK_NULL(pColData, code, lino, _end, terrno);
×
1775
    SColVal colVal = {0};
×
1776
    for (int32_t k = 0; k < pColData->nVal; ++k) {
×
1777
      code = tColDataGetValue(pColData, k, &colVal);
×
1778
      TSDB_CHECK_CODE(code, lino, _end);
×
1779
      code = doSetVal(pOutCol, k, &colVal);
×
1780
      TSDB_CHECK_CODE(code, lino, _end);
×
1781
    }
1782
  } else {
1783
    SColumnInfoData* pOutCol = taosArrayGet(pBlock->pDataBlock, 0);
×
1784
    SArray*          pRows = pSubmitTbData->aRowP;
×
1785
    TSDB_CHECK_NULL(pPhyTblSchema, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1786
    SColVal colVal = {0};
×
1787
    for (int32_t k = 0; k < numOfRows; ++k) {
×
1788
      SRow* pRow = taosArrayGetP(pRows, k);
×
1789
      TSDB_CHECK_NULL(pRow, code, lino, _end, terrno);
×
1790
      code = tRowGet(pRow, pPhyTblSchema, 0, &colVal);
×
1791
      TSDB_CHECK_CODE(code, lino, _end);
×
1792
      code = doSetVal(pOutCol, k, &colVal);
×
1793
      TSDB_CHECK_CODE(code, lino, _end);
×
1794
    }
1795
  }
1796

1797
  pBlock->info.rows = numOfRows;
×
1798
  pBlock->info.id.uid = vTbUid;
×
1799
  pBlock->info.id.groupId = pTbUid;
×
1800
  pBlock->info.version = pReader->msg.ver;
×
1801
  pScanInfo->nextVirtualTableIdx++;
×
1802
  if (pScanInfo->nextVirtualTableIdx >= taosArrayGetSize(pRelatedVTs)) {
×
1803
    pReader->nextBlk++;
×
1804
    pScanInfo->nextVirtualTableIdx = -1;
×
1805
  }
1806
  tqDebug("tq reader will retrieve next vtable data block from %p, nextBlk:%d, vtbIdx:%d, id:%s", pReader->msg.msgStr,
×
1807
          pReader->nextBlk, pScanInfo->nextVirtualTableIdx, idstr);
1808

1809
  *pRes = pBlock;
×
1810

1811
_end:
×
1812
  if (code != TSDB_CODE_SUCCESS) {
×
1813
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1814
  }
1815
  if (h != NULL) {
×
1816
    bool bRes = taosLRUCacheRelease(pScanInfo->pPhyTblSchemaCache, h, false);
×
1817
    tqTrace("release LRU cache, res %d, id: %s", bRes, idstr);
×
1818
  }
1819
  if (pPhyTblSchema != NULL) {
×
1820
    taosMemoryFreeClear(pPhyTblSchema);
×
1821
  }
1822
  return code;
×
1823
}
1824

1825
bool tqNextVTableSourceBlockImpl(STqReader* pReader, const char* idstr) {
×
1826
  int32_t            code = TSDB_CODE_SUCCESS;
×
1827
  int32_t            lino = 0;
×
1828
  SVTSourceScanInfo* pScanInfo = NULL;
×
1829

1830
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1831

1832
  pScanInfo = &pReader->vtSourceScanInfo;
×
1833
  if (pReader->msg.msgStr == NULL || taosHashGetSize(pScanInfo->pPhysicalTables) == 0) {
×
1834
    return false;
×
1835
  }
1836

1837
  if (pScanInfo->nextVirtualTableIdx >= 0) {
×
1838
    // The data still needs to be converted into the virtual table result block
1839
    return true;
×
1840
  }
1841

1842
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
×
1843
  while (pReader->nextBlk < blockSz) {
×
1844
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
1845
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, _end, terrno);
×
1846
    int64_t pTbUid = pSubmitTbData->uid;
×
1847
    void*   px = taosHashGet(pScanInfo->pPhysicalTables, &pTbUid, sizeof(int64_t));
×
1848
    if (px != NULL) {
×
1849
      SArray* pRelatedVTs = *(SArray**)px;
×
1850
      if (taosArrayGetSize(pRelatedVTs) > 0) {
×
1851
        pScanInfo->nextVirtualTableIdx = 0;
×
1852
        return true;
×
1853
      }
1854
    }
1855
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz,
×
1856
            pTbUid);
1857
    pReader->nextBlk++;
×
1858
  }
1859

1860
  tqReaderClearSubmitMsg(pReader);
×
1861
  tqTrace("iterator data block end, total block num:%d", blockSz);
×
1862

1863
_end:
×
1864
  if (code != TSDB_CODE_SUCCESS) {
×
1865
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1866
  }
1867
  return false;
×
1868
}
1869

1870
bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) {
×
1871
  if (pReader == NULL) {
×
1872
    return false;
×
1873
  }
1874
  return taosHashGet(pReader->vtSourceScanInfo.pPhysicalTables, &uid, sizeof(uint64_t)) != NULL;
×
1875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc