• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.72
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr);
20

21
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
×
22
  if (pHandle == NULL || pHead == NULL) {
×
23
    return false;
×
24
  }
25
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
×
26
    return true;
×
27
  }
28

29
  int16_t msgType = pHead->msgType;
×
30
  char*   body = pHead->body;
×
31
  int32_t bodyLen = pHead->bodyLen;
×
32

33
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
×
34
  int64_t  realTbSuid = 0;
×
35
  SDecoder dcoder = {0};
×
36
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
×
37
  int32_t  len = bodyLen - sizeof(SMsgHead);
×
38
  tDecoderInit(&dcoder, data, len);
×
39

40
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
×
41
    SVCreateStbReq req = {0};
×
42
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
×
43
      goto end;
×
44
    }
45
    realTbSuid = req.suid;
×
46
  } else if (msgType == TDMT_VND_DROP_STB) {
×
47
    SVDropStbReq req = {0};
×
48
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
49
      goto end;
×
50
    }
51
    realTbSuid = req.suid;
×
52
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
×
53
    SVCreateTbBatchReq req = {0};
×
54
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
×
55
      goto end;
×
56
    }
57

58
    int32_t        needRebuild = 0;
×
59
    SVCreateTbReq* pCreateReq = NULL;
×
60
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
61
      pCreateReq = req.pReqs + iReq;
×
62
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
63
        needRebuild++;
×
64
      }
65
    }
66
    if (needRebuild == 0) {
×
67
      // do nothing
68
    } else if (needRebuild == req.nReqs) {
×
69
      realTbSuid = tbSuid;
×
70
    } else {
71
      realTbSuid = tbSuid;
×
72
      SVCreateTbBatchReq reqNew = {0};
×
73
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
74
      if (reqNew.pArray == NULL) {
×
75
        tDeleteSVCreateTbBatchReq(&req);
×
76
        goto end;
×
77
      }
78
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
79
        pCreateReq = req.pReqs + iReq;
×
80
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
81
          reqNew.nReqs++;
×
82
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
83
            taosArrayDestroy(reqNew.pArray);
×
84
            tDeleteSVCreateTbBatchReq(&req);
×
85
            goto end;
×
86
          }
87
        }
88
      }
89

90
      int     tlen = 0;
×
91
      int32_t ret = 0;
×
92
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
93
      void* buf = taosMemoryMalloc(tlen);
×
94
      if (NULL == buf) {
×
95
        taosArrayDestroy(reqNew.pArray);
×
96
        tDeleteSVCreateTbBatchReq(&req);
×
97
        goto end;
×
98
      }
99
      SEncoder coderNew = {0};
×
100
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
101
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
102
      tEncoderClear(&coderNew);
×
103
      if (ret < 0) {
×
104
        taosMemoryFree(buf);
×
105
        taosArrayDestroy(reqNew.pArray);
×
106
        tDeleteSVCreateTbBatchReq(&req);
×
107
        goto end;
×
108
      }
109
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
110
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
111
      taosMemoryFree(buf);
×
112
      taosArrayDestroy(reqNew.pArray);
×
113
    }
114

115
    tDeleteSVCreateTbBatchReq(&req);
×
116
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
117
    SVAlterTbReq req = {0};
×
118

119
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
120
      goto end;
×
121
    }
122

123
    SMetaReader mr = {0};
×
124
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
125

126
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
127
      metaReaderClear(&mr);
×
128
      goto end;
×
129
    }
130
    realTbSuid = mr.me.ctbEntry.suid;
×
131
    metaReaderClear(&mr);
×
132
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
133
    SVDropTbBatchReq req = {0};
×
134

135
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
136
      goto end;
×
137
    }
138

139
    int32_t      needRebuild = 0;
×
140
    SVDropTbReq* pDropReq = NULL;
×
141
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
142
      pDropReq = req.pReqs + iReq;
×
143

144
      if (pDropReq->suid == tbSuid) {
×
145
        needRebuild++;
×
146
      }
147
    }
148
    if (needRebuild == 0) {
×
149
      // do nothing
150
    } else if (needRebuild == req.nReqs) {
×
151
      realTbSuid = tbSuid;
×
152
    } else {
153
      realTbSuid = tbSuid;
×
154
      SVDropTbBatchReq reqNew = {0};
×
155
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
156
      if (reqNew.pArray == NULL) {
×
157
        goto end;
×
158
      }
159
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
160
        pDropReq = req.pReqs + iReq;
×
161
        if (pDropReq->suid == tbSuid) {
×
162
          reqNew.nReqs++;
×
163
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
164
            taosArrayDestroy(reqNew.pArray);
×
165
            goto end;
×
166
          }
167
        }
168
      }
169

170
      int     tlen = 0;
×
171
      int32_t ret = 0;
×
172
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
173
      void* buf = taosMemoryMalloc(tlen);
×
174
      if (NULL == buf) {
×
175
        taosArrayDestroy(reqNew.pArray);
×
176
        goto end;
×
177
      }
178
      SEncoder coderNew = {0};
×
179
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
180
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
181
      tEncoderClear(&coderNew);
×
182
      if (ret != 0) {
×
183
        taosMemoryFree(buf);
×
184
        taosArrayDestroy(reqNew.pArray);
×
185
        goto end;
×
186
      }
187
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
188
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
189
      taosMemoryFree(buf);
×
190
      taosArrayDestroy(reqNew.pArray);
×
191
    }
192
  } else if (msgType == TDMT_VND_DELETE) {
×
193
    SDeleteRes req = {0};
×
194
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
195
      goto end;
×
196
    }
197
    realTbSuid = req.suid;
×
198
  }
199

200
end:
×
201
  tDecoderClear(&dcoder);
×
202
  bool tmp = tbSuid == realTbSuid;
×
203
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
×
204
  return tmp;
×
205
}
206

207
/*
 * Scan the WAL forward from *fetchOffset until a record the consumer should
 * receive is found, or the applied version is passed.
 *
 * A submit record always stops the scan (its body is fetched). A meta record
 * stops the scan only when the handle asks for meta messages and
 * isValValidForTable() accepts it. Every other record is skipped.
 *
 * @param fetchOffset in: first version to look at; out: version the scan stopped at.
 * @return the result of the last WAL call that ended the scan, 0 for an accepted
 *         meta record, or -1 when nothing deliverable was found or an argument is NULL.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  int32_t     code = -1;
  int32_t     vgId = TD_VID(pTq->pVnode);
  SWalReader* pWalReader = pHandle->pWalReader;
  int64_t     id = pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pWalReader->pHead->head.msgType), reqId, id);

    // Data record: always handed to the caller.
    if (pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pWalReader);
      break;
    }

    SWalCont* pHead = &(pWalReader->pHead->head);
    bool      wantMeta = (pHandle->fetchMeta != WITH_DATA) && IS_META_MSG(pHead->msgType) &&
                    !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);
    if (wantMeta) {
      code = walFetchBody(pWalReader);
      if (code < 0) {
        break;
      }

      // Re-read the head pointer after the body has been fetched.
      pHead = &(pWalReader->pHead->head);
      if (isValValidForTable(pHandle, pHead)) {
        code = 0;
        break;
      }

      // Meta record of another table: move on to the next version.
      offset++;
      code = -1;
      continue;
    }

    // Not interesting for this handle: skip the body and advance.
    code = walSkipFetchBody(pWalReader);
    if (code < 0) {
      break;
    }
    offset++;
    code = -1;
  }

  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
274

275
/* Report the primary-key flag stored on the reader; a NULL reader yields false. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
281

282
/*
 * Look up the schema of table `uid` and record on the reader whether its
 * second column carries the COL_IS_KEY flag. A NULL reader is ignored; a
 * missing schema or one with fewer than two columns records false.
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);

  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    hasKey = (pWrapper->pSchema[1].flags & COL_IS_KEY) != 0;
  }

  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
296

297
/*
 * Allocate a TQ reader for the given vnode: opens a WAL reader on the vnode's
 * WAL, borrows the vnode's meta handle and creates an empty result block.
 *
 * @param pVnode vnode to read from; NULL yields NULL.
 * @return the new reader, or NULL when any allocation step fails. When the
 *         result block cannot be created, terrno holds the failure code.
 *         The reader is released with tqReaderClose().
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  pReader->pResBlock = NULL;

  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    // Do not hand out a reader without a result block: later calls use
    // pResBlock unconditionally. Fail like the other allocation steps do.
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    terrno = code;  // set last so the cleanup calls cannot overwrite it
    return NULL;
  }

  return pReader;
}
328

329
/*
 * Release a reader and everything it owns: WAL reader, cached schema and
 * extended schema, column id list, result block, table-id hash, the decoded
 * submit request and the virtual-table scan caches. NULL is accepted.
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader == NULL) {
    return;
  }

  // WAL reader
  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }

  // cached table schema
  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }
  taosMemoryFree(pReader->extSchema);

  // requested column ids
  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // result block, table filter and the currently decoded submit message
  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  // virtual-table source scan state
  taosHashCleanup(pReader->vtSourceScanInfo.pVirtualTables);
  taosHashCleanup(pReader->vtSourceScanInfo.pPhysicalTables);
  taosLRUCacheCleanup(pReader->vtSourceScanInfo.pPhyTblSchemaCache);

  taosMemoryFree(pReader);
}
357

358
/*
 * Position the reader's WAL cursor at version `ver`.
 *
 * @param id label that is only used in the debug log line.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for a NULL reader, otherwise
 *         terrno as left by the failed seek.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (walReaderSeekVer(pReader->pWalReader, ver) >= 0) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }

  return terrno;
}
368

369
/*
 * Advance to the next table block in the WAL that this reader should deliver
 * and materialise it into the reader's result block.
 *
 * Blocks of the currently decoded submit message are tried first. When they
 * are exhausted the next valid WAL message is decoded and the search goes on.
 * Blocks whose flags intersect `sourceExcluded`, and blocks of tables that are
 * not in tbIdHash (when such a hash is set), are skipped.
 *
 * @return true when a block has been retrieved; false when the WAL has no more
 *         valid message, decoding fails, the reader is NULL, or more than one
 *         second has been spent searching.
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  if (pReader == NULL) {
    return false;
  }
  SWalReader* pWalReader = pReader->pWalReader;
  int64_t     startMs = taosGetTimestampMs();

  for (;;) {
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);

    while (pReader->nextBlk < numOfBlocks) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      // Block written by an excluded source.
      if ((pTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      bool subscribed =
          (pReader->tbIdHash == NULL) || (taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) != NULL);
      if (!subscribed) {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pTbData->uid);
        continue;
      }

      tqTrace("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      SSDataBlock* pRes = NULL;
      // tqRetrieveDataBlock moves nextBlk forward itself, also when it fails,
      // so a failure simply falls through to the following block.
      if (tqRetrieveDataBlock(pReader, &pRes, NULL) == TSDB_CODE_SUCCESS) {
        return true;
      }
    }

    // Current message exhausted: drop it before decoding the next one.
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    // Give up after one second (or if the clock went backwards).
    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > 1000 || elapsed < 0) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWalReader, false) < 0) {
      return false;
    }

    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t ver = pWalReader->pHead->head.version;
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL) != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
427

428
/*
 * Remember a raw submit message on the reader and decode it into
 * pReader->submit.
 *
 * @param msgStr  start of the encoded submit request
 * @param msgLen  length of the encoded request in bytes
 * @param ver     WAL version of the message
 * @param rawList forwarded unchanged to tDecodeSubmitReq
 * @return 0 on success, TSDB_CODE_INVALID_PARA for a NULL reader, otherwise
 *         the decoder's error code.
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  SDecoder dec = {0};
  tDecoderInit(&dec, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t rc = tDecodeSubmitReq(&dec, &pReader->submit, rawList);
  tDecoderClear(&dec);

  if (rc != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }
  return rc;
}
449

450
/*
 * Drop the decoded submit request held by the reader and reset the block
 * cursor and raw message pointer. A NULL reader is ignored, like in the other
 * reader accessors of this file.
 */
void tqReaderClearSubmitMsg(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;
}
×
455

456
/* Return the WAL reader owned by `pReader`, or NULL when `pReader` is NULL. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
462

463
/* Return the reader's result block, or NULL when `pReader` is NULL. */
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pResBlock : NULL;
}
469

470
/*
 * Return pReader->lastTs (set from the ctimeMs of the last retrieved table
 * block), or 0 when `pReader` is NULL.
 */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  if (pReader != NULL) {
    return pReader->lastTs;
  }
  return 0;
}
476

477
/*
 * Move the block cursor of the current submit message to the next block whose
 * table uid is present in pReader->tbIdHash.
 *
 * @return true when the cursor rests on such a block (or when no tbIdHash is
 *         set); false when the reader or its message is missing, a block
 *         cannot be read, or the message is exhausted. In the exhausted case
 *         the submit message is cleared.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t found = false;
  int32_t lino = 0;
  int64_t uid = 0;
  TSDB_CHECK_NULL(pReader, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, found, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, found, lino, END, false);
    uid = pTbData->uid;

    // Leaves the loop with `true` as soon as the uid is found in the hash.
    void* pEntry = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(pEntry == NULL, found, lino, END, true);

    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, found ? "true" : "false", uid);
  return found;
}
504

505
/*
 * Move the block cursor of the current submit message to the next block whose
 * table uid is NOT present in `filterOutUids`.
 *
 * @return true when the cursor rests on such a block (or when no filter hash
 *         is given); false when the reader or its message is missing, a block
 *         cannot be read, or the message is exhausted. In the exhausted case
 *         the submit message is cleared.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int32_t keep = false;
  int32_t lino = 0;
  int64_t uid = 0;

  TSDB_CHECK_NULL(pReader, keep, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, keep, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, keep, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, keep, lino, END, false);
    uid = pTbData->uid;

    // Leaves the loop with `true` as soon as the uid is absent from the filter.
    void* pFiltered = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(pFiltered, keep, lino, END, true);

    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, keep ? "true" : "false", uid);
  return keep;
}
531

532
/*
 * Build a reduced schema and matching block columns from the columns of
 * `pSrc` whose entry in `mask` is non-zero.
 *
 * pDst->pSchema is allocated here and is not released by this function, also
 * not on the error paths. When `extSrc` is given, precision and scale of each
 * selected column are derived from the matching extended schema entry.
 *
 * @param mask one entry per source column; the entries are summed to size the
 *             destination schema.
 * @return 0 on success, TSDB_CODE_INVALID_PARA for a NULL argument (extSrc may
 *         be NULL), or the error of the failed allocation / column append.
 */
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) {
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t numSelected = 0;
  for (int32_t srcIdx = 0; srcIdx < pSrc->nCols; srcIdx++) {
    numSelected += mask[srcIdx];
  }

  pDst->nCols = numSelected;
  pDst->pSchema = taosMemoryCalloc(numSelected, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return TAOS_GET_TERRNO(terrno);
  }

  int32_t dstIdx = 0;
  for (int32_t srcIdx = 0; srcIdx < pSrc->nCols; srcIdx++) {
    if (!mask[srcIdx]) {
      continue;
    }

    const SSchema* pCol = &pSrc->pSchema[srcIdx];
    pDst->pSchema[dstIdx++] = *pCol;

    SColumnInfoData colInfo = createColumnInfoData(pCol->type, pCol->bytes, pCol->colId);
    if (extSrc != NULL) {
      decimalFromTypeMod(extSrc[srcIdx].typeMod, &colInfo.info.precision, &colInfo.info.scale);
    }

    int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code != 0) {
      return code;
    }
  }
  return 0;
}
566

567
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
9✔
568
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
9!
569
    return TSDB_CODE_INVALID_PARA;
×
570
  }
571
  SSDataBlock* pBlock = pReader->pResBlock;
9✔
572
  if (blockDataGetNumOfCols(pBlock) > 0) {
9!
573
    blockDataDestroy(pBlock);
×
574
    int32_t code = createDataBlock(&pReader->pResBlock);
×
575
    if (code) {
×
576
      return code;
×
577
    }
578
    pBlock = pReader->pResBlock;
×
579

580
    pBlock->info.id.uid = pReader->cachedSchemaUid;
×
581
    pBlock->info.version = pReader->msg.ver;
×
582
  }
583

584
  int32_t numOfCols = taosArrayGetSize(pColIdList);
9✔
585

586
  if (numOfCols == 0) {  // all columns are required
9!
587
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
588
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
589
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
590

591
      if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
×
592
        decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
593
      }
594
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
595
      if (code != TSDB_CODE_SUCCESS) {
×
596
        blockDataFreeRes(pBlock);
×
597
        return terrno;
×
598
      }
599
    }
600
  } else {
601
    if (numOfCols > pSchema->nCols) {
9!
602
      numOfCols = pSchema->nCols;
×
603
    }
604

605
    int32_t i = 0;
9✔
606
    int32_t j = 0;
9✔
607
    while (i < pSchema->nCols && j < numOfCols) {
42✔
608
      SSchema* pColSchema = &pSchema->pSchema[i];
33✔
609
      col_id_t colIdSchema = pColSchema->colId;
33✔
610

611
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
33✔
612
      if (pColIdNeed == NULL) {
33!
613
        break;
×
614
      }
615
      if (colIdSchema < *pColIdNeed) {
33✔
616
        i++;
6✔
617
      } else if (colIdSchema > *pColIdNeed) {
27!
618
        j++;
×
619
      } else {
620
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
27✔
621
        if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
27!
622
          decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
623
        }
624
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
27✔
625
        if (code != TSDB_CODE_SUCCESS) {
27!
626
          return -1;
×
627
        }
628
        i++;
27✔
629
        j++;
27✔
630
      }
631
    }
632
  }
633

634
  return TSDB_CODE_SUCCESS;
9✔
635
}
636

637
/*
 * Store one column value at `rowIndex` of a result column.
 *
 * Fixed-size values are written directly (as NULL when the value is not set).
 * Variable-length values are first packed into a length-prefixed buffer, which
 * is what colDataSetVal is handed for var-data columns here.
 *
 * @return the result of colDataSetVal, TSDB_CODE_SUCCESS for a NULL var-data
 *         value, or TSDB_CODE_INVALID_PARA when a var-data payload does not fit
 *         into the staging buffer.
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  // Largest payload the staging buffer below can hold after its 2-byte prefix.
  enum { TQ_VAR_PAYLOAD_MAX = 65535 };

  if (!IS_VAR_DATA_TYPE(pColVal->value.type)) {
    return colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type), !COL_VAL_IS_VALUE(pColVal));
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    // No need to set up the staging buffer for a NULL.
    colDataSetNULL(pColumnInfoData, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  // The copy below used to trust nData blindly; refuse anything that would
  // run past the end of the stack buffer.
  if ((int64_t)pColVal->value.nData < 0 || (int64_t)pColVal->value.nData > TQ_VAR_PAYLOAD_MAX) {
    return TSDB_CODE_INVALID_PARA;
  }

  char val[TQ_VAR_PAYLOAD_MAX + 2] = {0};
  if (pColVal->value.pData != NULL) {
    (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
  }
  varDataSetLen(val, pColVal->value.nData);
  return colDataSetVal(pColumnInfoData, rowIndex, val, false);
}
657

658
/*
 * Convert the table block at the reader's cursor into the reader's result
 * block and advance the cursor by one (the cursor moves even when the
 * conversion fails).
 *
 * The table schema is cached on the reader, keyed by suid (or uid for tables
 * without a super table) and schema version. On a cache miss the schema is
 * reloaded from meta and the result block layout is rebuilt. Any failure while
 * reloading invalidates the cache key so that the next call reloads again
 * instead of using a half-updated state.
 *
 * @param pRes receives the reader-owned result block; do not free it.
 * @param id   unused here.
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be loaded;
 *         TSDB_CODE_TQ_INTERNAL_ERROR on a schema version mismatch; otherwise
 *         the error code of the failing step.
 */
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  if (pReader == NULL || pRes == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  int32_t        code = 0;
  int32_t        line = 0;
  STSchema*      pTSchema = NULL;
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastTs = pSubmitTbData->ctimeMs;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
    taosMemoryFree(pReader->extSchema);
    // Do not keep the freed pointer around in case the lookup below fails
    // without writing to it; tqReaderClose would free it a second time.
    pReader->extSchema = NULL;
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
    if (pReader->pSchemaWrapper == NULL) {
      // Log the version that was actually requested, not the stale cached one.
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             "version %d, possibly dropped table",
             vgId, suid, uid, sversion);
      // Invalidate both halves of the cache key: with only the suid reset, a
      // later block of the previously cached normal table (suid == 0) would
      // skip the reload and dereference the NULL schema wrapper.
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }

    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
      // The result block was not rebuilt for this schema; force a reload next time.
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_INTERNAL_ERROR;
    }
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
    if (code != 0) {
      // Same reasoning: the block layout is incomplete, do not treat it as cached.
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
    }
    TSDB_CHECK_CODE(code, line, END);
    // buildResSDataBlock may have replaced the result block.
    pBlock = pReader->pResBlock;
    *pRes = pBlock;
  }

  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, line, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    // Column format: walk source columns and result columns in step by colId.
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
      if (sourceIdx >= numOfCols) {
        // Source ran out of columns: the remaining result columns are all NULL.
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      SColData* pCol = taosArrayGet(pCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
      if (pCol->cid < pColData->info.colId) {
        // Source column is not part of the result layout.
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          code = tColDataGetValue(pCol, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);
          code = doSetVal(pColData, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // Result column has no counterpart in the source data.
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
      }
    }
  } else {
    // Row format: decode every row against the cached table schema.
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow* pRow = taosArrayGetP(pRows, i);
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
      int32_t sourceIdx = 0;
      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);

        // Advance through the row's columns until the result column's colId is
        // reached or passed.
        while (1) {
          SColVal colVal = {0};
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
            code = doSetVal(pColData, i, &colVal);
            TSDB_CHECK_CODE(code, line, END);
            sourceIdx++;
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
          }
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}
804

805
// Records, for column j, whether the value of the current row is present (i.e. not NONE).
// On the first row (curRow == 0) the flag is always stored and buildNew is raised; on later rows
// buildNew is raised only when the presence flag differs from the one stored in assigned[j].
// Expects colVal, assigned, j, curRow and buildNew to be in scope where the macro is expanded.
#define PROCESS_VAL                                  \
  {                                                  \
    bool valPresent = !COL_VAL_IS_NONE(&colVal);     \
    if (curRow == 0 || valPresent != assigned[j]) {  \
      assigned[j] = valPresent;                      \
      buildNew = true;                               \
    }                                                \
  }
816

817
// Moves one step of the source/target column merge for the row being converted:
//  - source column id below the target's: skip the source column;
//  - ids equal: store colVal at row (curRow - lastRow) of pColData and advance both cursors;
//  - source column id above the target's: the row carries no value for the target column, so the
//    cell is set to NULL and the target cursor advances. Without this branch neither cursor moves
//    and the enclosing `while (targetIdx < colActual)` loop cannot terminate.
// Expects colVal, pColData, sourceIdx, targetIdx, curRow and lastRow in scope, and is used inside
// functions that provide what TQ_ERR_GO_TO_END needs (presumably `code` and an END label).
#define SET_DATA                                                     \
  if (colVal.cid < pColData->info.colId) {                           \
    sourceIdx++;                                                     \
  } else if (colVal.cid == pColData->info.colId) {                   \
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
    sourceIdx++;                                                     \
    targetIdx++;                                                     \
  } else {                                                           \
    colDataSetNULL(pColData, curRow - lastRow);                      \
    targetIdx++;                                                     \
  }
825

826
/*
 * Finishes the block currently being filled (if any) and appends a new empty block, together with
 * its masked schema, to `blocks` / `schemas`.
 *
 * pReader        reader providing the full schema, the ext schema and the message version
 * pSubmitTbData  submit data whose uid is stamped on the new block
 * blocks         array the new block is pushed into (the push copies the struct, see below)
 * schemas        array receiving the SSchemaWrapper* that describes the new block
 * assigned       per-column flags handed to tqMaskBlock() to select the columns of the new block
 * numOfRows      total rows in the submit data; capacity is reserved for numOfRows - curRow rows
 * curRow         index of the row that starts the new block
 * lastRow        in/out: first row of the previous block; set to curRow once that block is closed
 *
 * Returns 0 on success, otherwise the code left by the failing TQ_*_GO_TO_END check.
 */
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow,
                               int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pSW = NULL;
  SSDataBlock*    block = NULL;

  // close the previous block: it holds the rows [*lastRow, curRow)
  if (taosArrayGetSize(blocks) > 0) {
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pLastBlock);
    pLastBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(block);

  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pSW);

  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(block->pDataBlock));

  block->info.id.uid = pSubmitTbData->uid;
  block->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));

  // The array now holds a copy of *block and with it the block's buffers. Release only the shell
  // right away, so that a failure below cannot free buffers the array entry still points to.
  taosMemoryFreeClear(block);

  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
  pSW = NULL;  // ownership moved to `schemas`

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  // on success both pointers are NULL here, exactly as in the original success path
  tDeleteSchemaWrapper(pSW);
  blockDataFreeRes(block);
  taosMemoryFree(block);
  return code;
}
867
/*
 * Converts column-format submit data (pSubmitTbData->aCol) into data blocks appended to `blocks`,
 * with one schema wrapper per block appended to `schemas`.
 *
 * Rows are visited in order. PROCESS_VAL compares, per submitted column, whether the row has a
 * value with what was recorded for the previous row; on the first row and whenever that pattern
 * changes, processBuildNew() starts a new block. The row is then written into the last block with
 * SET_DATA.
 *
 * Returns 0 on success, otherwise the code left by the failing TQ_*_GO_TO_END check.
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    // detect whether the set of columns carrying a value changed for this row
    for (int32_t j = 0; j < numOfCols; j++) {
      pCol = taosArrayGet(pCols, j);
      TQ_NULL_GO_TO_END(pCol);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // merge the submitted columns (source) into the block's columns (target) by column id
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
    }

    curRow++;
  }

  // No block exists when no row was processed; only close the last block if there is one.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
931

932
/*
 * Converts row-format submit data (pSubmitTbData->aRowP) into data blocks appended to `blocks`,
 * with one schema wrapper per block appended to `schemas`.
 *
 * A STSchema is built from the reader's schema wrapper and used to decode every row. PROCESS_VAL
 * compares, per schema column, whether the row has a value with what was recorded for the
 * previous row; on the first row and whenever that pattern changes, processBuildNew() starts a
 * new block. The row is then written into the last block with SET_DATA.
 *
 * Returns 0 on success, otherwise the code left by the failing TQ_*_GO_TO_END check.
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pRows = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRows);
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);

  for (int32_t i = 0; i < numOfRows; i++) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(pRows, i);
    TQ_NULL_GO_TO_END(pRow);

    // detect whether the set of columns carrying a value changed for this row
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // merge the row's columns (source) into the block's columns (target) by column id
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);  // same guard as the column-format path
      SColVal          colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
    }

    curRow++;
  }

  // No block exists when no row was processed; only close the last block if there is one.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
995

996
/*
 * Serializes pCreateTbReq with tEncodeSVCreateTbReq() and appends the encoded buffer and its
 * length to pRsp->createTableReq / pRsp->createTableLen, bumping pRsp->createTableNum.
 * Both arrays are created when createTableNum is still 0.
 *
 * Returns 0 on success. On failure the encoded buffer is freed and the error code is returned
 * (TSDB_CODE_INVALID_PARA for NULL arguments).
 */
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  void*    pEncoded = NULL;
  uint32_t encodedLen = 0;
  SEncoder enc = {0};

  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  // first request for this response: set up the two parallel arrays
  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // pass 1: size only
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, encodedLen, code);
  TSDB_CHECK_CODE(code, lino, END);

  pEncoded = taosMemoryCalloc(1, encodedLen);
  TSDB_CHECK_NULL(pEncoded, code, lino, END, terrno);

  // pass 2: encode into the buffer
  tEncoderInit(&enc, pEncoded, encodedLen);
  code = tEncodeSVCreateTbReq(&enc, pCreateTbReq);
  tEncoderClear(&enc);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &encodedLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &pEncoded), code, lino, END, terrno);
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
    taosMemoryFree(pEncoded);
  }
  return code;
}
1033

1034
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
×
1035
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1036
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
×
1037
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
1038
  if (pSubmitTbData == NULL) {
×
1039
    return terrno;
×
1040
  }
1041
  pReader->nextBlk++;
×
1042

1043
  if (pSubmitTbDataRet) {
×
1044
    *pSubmitTbDataRet = pSubmitTbData;
×
1045
  }
1046

1047
  if (fetchMeta == ONLY_META) {
×
1048
    if (pSubmitTbData->pCreateTbReq != NULL) {
×
1049
      if (pRsp->createTableReq == NULL) {
×
1050
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
×
1051
        if (pRsp->createTableReq == NULL) {
×
1052
          return terrno;
×
1053
        }
1054
      }
1055
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
×
1056
        return terrno;
×
1057
      }
1058
      pSubmitTbData->pCreateTbReq = NULL;
×
1059
    }
1060
    return 0;
×
1061
  }
1062

1063
  int32_t sversion = pSubmitTbData->sver;
×
1064
  int64_t uid = pSubmitTbData->uid;
×
1065
  pReader->lastBlkUid = uid;
×
1066

1067
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
×
1068
  taosMemoryFreeClear(pReader->extSchema);
×
1069
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema);
×
1070
  if (pReader->pSchemaWrapper == NULL) {
×
1071
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
×
1072
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1073
    pReader->cachedSchemaSuid = 0;
×
1074
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
1075
  }
1076

1077
  if (pSubmitTbData->pCreateTbReq != NULL) {
×
1078
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
×
1079
    if (code != 0) {
×
1080
      return code;
×
1081
    }
1082
  } else if (rawList != NULL) {
×
1083
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1084
      return terrno;
×
1085
    }
1086
    pReader->pSchemaWrapper = NULL;
×
1087
    return 0;
×
1088
  }
1089

1090
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
1091
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
×
1092
  } else {
1093
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
×
1094
  }
1095
}
1096

1097
/*
 * Stores pColIdList on the reader and recomputes the physical-table map through
 * tqCollectPhysicalTables(). A NULL reader is accepted and reported as success.
 */
int32_t tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList, const char* id) {
  if (pReader != NULL) {
    pReader->pColIdList = pColIdList;
    return tqCollectPhysicalTables(pReader, id);
  }
  return TSDB_CODE_SUCCESS;
}
1104

1105
/*
 * Replaces the reader's set of table uids (pReader->tbIdHash) with the uids in tbUidList.
 * The hash is created on first use and cleared otherwise. A uid that cannot be inserted is
 * logged and skipped. Returns TSDB_CODE_SUCCESS (also for NULL arguments), or terrno when the
 * hash cannot be created. `id` is only used in log messages.
 */
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", id);
      return terrno;
    }
  } else {
    taosHashClear(pReader->tbIdHash);
  }

  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t idx = 0; idx < numOfTables; idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pUid == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pUid, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pUid);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", id, numOfTables);
  return TSDB_CODE_SUCCESS;
}
1130

1131
/*
 * Adds the uids in pTableUidList to the reader's set of table uids (pReader->tbIdHash), creating
 * the hash when it does not exist yet. Failures are logged; nothing is reported to the caller.
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    // same guard as tqReaderSetTbUidList / tqReaderRemoveTbUidList: never use or print a NULL key
    if (pKey == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
    }
  }
}
1152

1153
// Returns true when uid is present in the reader's table-uid hash; false for a NULL reader.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  return (pReader != NULL) && (taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL);
}
1159

1160
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1161
  if (pReader == NULL) {
×
1162
    return false;
×
1163
  }
1164
  return pReader->msg.msgStr == NULL;
×
1165
}
1166

1167
/*
 * Removes every uid in tbUidList from the reader's table-uid hash. A uid whose removal fails is
 * logged; NULL arguments make the call a no-op.
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t idx = 0; idx < numOfTables; idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pUid == NULL) {
      continue;
    }
    if (taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t)) != 0) {
      tqError("failed to remove table uid:%" PRId64 " from hash", *pUid);
    }
  }
}
1178

1179
/*
 * Propagates a change of table uids to every consumer handle in pTq->pHandle, under the write
 * latch pTq->lock:
 *  - column subscriptions: the executor's table list is updated (added or removed per isAdd);
 *  - database subscriptions: on removal the uids are added to the handle's filter-out hash;
 *  - table subscriptions: on add the table list is re-queried with qGetTableList() and installed
 *    on the handle's reader; on removal the uids are removed from the reader.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, the qGetTableList() error if it fails
 * (iteration is cancelled and the latch released first), and 0 otherwise. Other per-handle
 * failures are logged and do not stop the iteration.
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  if (pTq == NULL || tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while (1) {
    pIter = taosHashIterate(pTq->pHandle, pIter);
    if (pIter == NULL) {
      break;
    }

    STqHandle* pTqHandle = (STqHandle*)pIter;
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
      if (code != 0) {
        tqError("update qualified table error for %s", pTqHandle->subKey);
        continue;
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (!isAdd) {
        int32_t sz = taosArrayGetSize(tbUidList);
        for (int32_t i = 0; i < sz; i++) {
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
          if (tbUid &&
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
            continue;
          }
        }
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      if (isAdd) {
        SArray* list = NULL;
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
                                    &list, pTqHandle->execHandle.task);
        if (ret != TDB_CODE_SUCCESS) {
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
                  pTqHandle->consumerId);
          taosArrayDestroy(list);
          taosHashCancelIterate(pTq->pHandle, pIter);
          taosWUnLockLatch(&pTq->lock);

          return ret;
        }
        // Pass the subscription key as the log id: the callee formats it with %s, so NULL must
        // not be handed in. A failure is logged here instead of being dropped silently.
        int32_t setCode = tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, pTqHandle->subKey);
        if (setCode != 0) {
          tqError("failed to set table uid list for %s since %s", pTqHandle->subKey, tstrerror(setCode));
        }
        taosArrayDestroy(list);
      } else {
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
      }
    }
  }
  taosWUnLockLatch(&pTq->lock);

  // update the table list handle for each stream scanner/wal reader
/* STREAMTODO
  streamMetaWLock(pTq->pStreamMeta);
  while (1) {
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
    if (pIter == NULL) {
      break;
    }

    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask != NULL) {
      int32_t taskId = pTask->id.taskId;

      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
        if (code != 0) {
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
        }
      }
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
      if (ret) {
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
      }
    }
  }

  streamMetaWUnLock(pTq->pStreamMeta);
*/
  return 0;
}
1267

1268
static void destroySourceScanTables(void* ptr) {
×
1269
  SArray** pTables = ptr;
×
1270
  if (pTables && *pTables) {
×
1271
    taosArrayDestroy(*pTables);
×
1272
    *pTables = NULL;
×
1273
  }
1274
}
×
1275

1276
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1277
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1278
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1279
  if (pCol1->vColId == pCol2->vColId) {
×
1280
    return 0;
×
1281
  } else if (pCol1->vColId < pCol2->vColId) {
×
1282
    return -1;
×
1283
  } else {
1284
    return 1;
×
1285
  }
1286
}
1287

1288
int32_t tqReaderSetVtableInfo(STqReader* pReader, void* vnode, void* ptr, SSHashObj* pVtableInfos,
×
1289
                              SSDataBlock** ppResBlock, const char* idstr) {
1290
  int32_t            code = TSDB_CODE_SUCCESS;
×
1291
  int32_t            lino = 0;
×
1292
  SStorageAPI*       pAPI = ptr;
×
1293
  SVTSourceScanInfo* pScanInfo = NULL;
×
1294
  SHashObj*          pVirtualTables = NULL;
×
1295
  SMetaReader        metaReader = {0};
×
1296
  SVTColInfo         colInfo = {0};
×
1297
  SSchemaWrapper*    schema = NULL;
×
1298

1299
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1300
  TSDB_CHECK_NULL(vnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1301
  TSDB_CHECK_NULL(pAPI, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1302

1303
  pScanInfo = &pReader->vtSourceScanInfo;
×
1304
  taosHashCleanup(pScanInfo->pVirtualTables);
×
1305
  pScanInfo->pVirtualTables = NULL;
×
1306

1307
  if (tSimpleHashGetSize(pVtableInfos) == 0) {
×
1308
    goto _end;
×
1309
  }
1310

1311
  pVirtualTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1312
  TSDB_CHECK_NULL(pVirtualTables, code, lino, _end, terrno);
×
1313
  taosHashSetFreeFp(pVirtualTables, destroySourceScanTables);
×
1314

1315
  int32_t iter = 0;
×
1316
  void*   px = tSimpleHashIterate(pVtableInfos, NULL, &iter);
×
1317
  while (px != NULL) {
×
1318
    int64_t vTbUid = *(int64_t*)tSimpleHashGetKey(px, NULL);
×
1319
    SArray* pColInfos = taosArrayInit(8, sizeof(SVTColInfo));
×
1320
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, terrno);
×
1321
    code = taosHashPut(pVirtualTables, &vTbUid, sizeof(int64_t), &pColInfos, POINTER_BYTES);
×
1322
    TSDB_CHECK_CODE(code, lino, _end);
×
1323

1324
    SSHashObj* pPhysicalTables = *(SSHashObj**)px;
×
1325
    int32_t    iterIn = 0;
×
1326
    void*      pxIn = tSimpleHashIterate(pPhysicalTables, NULL, &iterIn);
×
1327
    while (pxIn != NULL) {
×
1328
      char* physicalTableName = tSimpleHashGetKey(pxIn, NULL);
×
1329
      pAPI->metaReaderFn.clearReader(&metaReader);
×
1330
      pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1331
      code = pAPI->metaReaderFn.getTableEntryByName(&metaReader, physicalTableName);
×
1332
      TSDB_CHECK_CODE(code, lino, _end);
×
1333
      pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1334
      colInfo.pTbUid = metaReader.me.uid;
×
1335

1336
      switch (metaReader.me.type) {
×
1337
        case TSDB_CHILD_TABLE: {
×
1338
          int64_t suid = metaReader.me.ctbEntry.suid;
×
1339
          pAPI->metaReaderFn.clearReader(&metaReader);
×
1340
          pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1341
          code = pAPI->metaReaderFn.getTableEntryByUid(&metaReader, suid);
×
1342
          TSDB_CHECK_CODE(code, lino, _end);
×
1343
          pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1344
          schema = &metaReader.me.stbEntry.schemaRow;
×
1345
          break;
×
1346
        }
1347
        case TSDB_NORMAL_TABLE: {
×
1348
          schema = &metaReader.me.ntbEntry.schemaRow;
×
1349
          break;
×
1350
        }
1351
        default: {
×
1352
          tqError("invalid table type: %d", metaReader.me.type);
×
1353
          code = TSDB_CODE_INVALID_PARA;
×
1354
          TSDB_CHECK_CODE(code, lino, _end);
×
1355
        }
1356
      }
1357

1358
      SArray* pCols = *(SArray**)pxIn;
×
1359
      int32_t ncols = taosArrayGetSize(pCols);
×
1360
      for (int32_t i = 0; i < ncols; ++i) {
×
1361
        SColIdName* pCol = taosArrayGet(pCols, i);
×
1362
        colInfo.vColId = pCol->colId;
×
1363

1364
        for (int32_t j = 0; j < schema->nCols; ++j) {
×
1365
          if (strncmp(pCol->colName, schema->pSchema[j].name, strlen(schema->pSchema[j].name)) == 0) {
×
1366
            colInfo.pColId = schema->pSchema[j].colId;
×
1367
            void* px = taosArrayPush(pColInfos, &colInfo);
×
1368
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1369
            break;
×
1370
          }
1371
        }
1372
      }
1373

1374
      taosArraySort(pColInfos, compareSVTColInfo);
×
1375
      pxIn = tSimpleHashIterate(pPhysicalTables, pxIn, &iterIn);
×
1376
    }
1377

1378
    px = tSimpleHashIterate(pVtableInfos, px, &iter);
×
1379
  }
1380

1381
  pScanInfo->pVirtualTables = pVirtualTables;
×
1382
  pVirtualTables = NULL;
×
1383

1384
  // set the result data block
1385
  if (pReader->pResBlock) {
×
1386
    blockDataDestroy(pReader->pResBlock);
×
1387
  }
1388
  pReader->pResBlock = *ppResBlock;
×
1389
  *ppResBlock = NULL;
×
1390

1391
  // update reader callback for vtable source scan
1392
  pAPI->tqReaderFn.tqNextBlockImpl = tqNextVTableSourceBlockImpl;
×
1393
  pAPI->tqReaderFn.tqReaderIsQueriedTable = tqReaderIsQueriedSourceTable;
×
1394

1395
_end:
×
1396
  if (code != TSDB_CODE_SUCCESS) {
×
1397
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1398
  }
1399
  pAPI->metaReaderFn.clearReader(&metaReader);
×
1400
  if (pVirtualTables != NULL) {
×
1401
    taosHashCleanup(pVirtualTables);
×
1402
  }
1403
  return code;
×
1404
}
1405

1406
/*
 * Rebuilds pReader->vtSourceScanInfo.pPhysicalTables: a hash from physical table uid to the array
 * of virtual table uids that need at least one column of that table.
 *
 * The previous map and the schema cache are dropped and the scan counters reset first. Nothing
 * else happens when there are no virtual tables or pReader->pColIdList is empty. Otherwise, for
 * every virtual table its SVTColInfo array and pColIdList are walked in step by column id (both
 * are traversed as ascending sequences); each matching column registers the virtual table under
 * the column's physical table. A schema cache is created when the resulting map is not empty.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the step that failed; idstr is used for logging.
 */
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SVTSourceScanInfo* pScanInfo = NULL;
  SHashObj*          pVirtualTables = NULL;
  SHashObj*          pPhysicalTables = NULL;
  void*              pIter = NULL;
  void*              px = NULL;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pScanInfo = &pReader->vtSourceScanInfo;
  taosHashCleanup(pScanInfo->pPhysicalTables);
  pScanInfo->pPhysicalTables = NULL;
  taosLRUCacheCleanup(pScanInfo->pPhyTblSchemaCache);
  pScanInfo->pPhyTblSchemaCache = NULL;
  pScanInfo->nextVirtualTableIdx = -1;
  pScanInfo->metaFetch = 0;
  pScanInfo->cacheHit = 0;

  pVirtualTables = pScanInfo->pVirtualTables;
  if (taosHashGetSize(pVirtualTables) == 0 || taosArrayGetSize(pReader->pColIdList) == 0) {
    goto _end;
  }

  pPhysicalTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
  TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno);
  taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables);

  pIter = taosHashIterate(pVirtualTables, NULL);
  while (pIter != NULL) {
    int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL);
    SArray* pColInfos = *(SArray**)pIter;
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);

    // Traverse all required columns and collect corresponding physical tables
    int32_t nColInfos = taosArrayGetSize(pColInfos);
    int32_t nOutputCols = taosArrayGetSize(pReader->pColIdList);
    for (int32_t i = 0, j = 0; i < nColInfos && j < nOutputCols;) {
      SVTColInfo* pCol = taosArrayGet(pColInfos, i);
      col_id_t    colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
      if (pCol->vColId < colIdNeed) {
        i++;
      } else if (pCol->vColId > colIdNeed) {
        j++;
      } else {
        SArray* pRelatedVTs = NULL;
        px = taosHashGet(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t));
        if (px == NULL) {
          pRelatedVTs = taosArrayInit(8, sizeof(int64_t));
          TSDB_CHECK_NULL(pRelatedVTs, code, lino, _end, terrno);
          code = taosHashPut(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t), &pRelatedVTs, POINTER_BYTES);
          if (code != TSDB_CODE_SUCCESS) {
            taosArrayDestroy(pRelatedVTs);
            TSDB_CHECK_CODE(code, lino, _end);
          }
        } else {
          pRelatedVTs = *(SArray**)px;
        }
        // register the virtual table once per physical table
        if (taosArrayGetSize(pRelatedVTs) == 0 || *(int64_t*)taosArrayGetLast(pRelatedVTs) != vTbUid) {
          px = taosArrayPush(pRelatedVTs, &vTbUid);
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
        }
        i++;
        j++;
      }
    }
    pIter = taosHashIterate(pVirtualTables, pIter);
  }

  pScanInfo->pPhysicalTables = pPhysicalTables;
  pPhysicalTables = NULL;

  if (taosHashGetSize(pScanInfo->pPhysicalTables) > 0) {
    pScanInfo->pPhyTblSchemaCache = taosLRUCacheInit(1024 * 128, -1, .5);
    TSDB_CHECK_NULL(pScanInfo->pPhyTblSchemaCache, code, lino, _end, terrno);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
  }
  if (pIter != NULL) {
    // pIter walks pVirtualTables, so the iteration has to be cancelled on that hash
    taosHashCancelIterate(pVirtualTables, pIter);
  }
  if (pPhysicalTables != NULL) {
    taosHashCleanup(pPhysicalTables);
  }
  return code;
}
1496

1497
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1498
  if (value) {
×
1499
    SSchemaWrapper* pSchemaWrapper = value;
×
1500
    tDeleteSchemaWrapper(pSchemaWrapper);
1501
  }
1502
}
×
1503

1504
/*
 * Advances the reader to the next submit-table entry whose table uid is a physical table with at
 * least one related virtual table.
 *
 * Returns true when such an entry is found (nextVirtualTableIdx is set to 0 and nextBlk keeps
 * pointing at it), or when nextVirtualTableIdx is already >= 0, i.e. the current entry is still
 * being converted. Returns false when there is no message, no physical table is registered, the
 * entries are exhausted (the submit message is cleared then), or an error occurred (logged with
 * idstr).
 */
bool tqNextVTableSourceBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SVTSourceScanInfo* pInfo = NULL;
  int32_t            numOfBlocks = 0;

  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);

  pInfo = &pReader->vtSourceScanInfo;
  if (pReader->msg.msgStr == NULL || taosHashGetSize(pInfo->pPhysicalTables) == 0) {
    return false;
  }

  // The data still needs to be converted into the virtual table result block
  if (pInfo->nextVirtualTableIdx >= 0) {
    return true;
  }

  numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < numOfBlocks; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, _end, terrno);

    int64_t  physUid = pTbData->uid;
    SArray** ppRelatedVTs = taosHashGet(pInfo->pPhysicalTables, &physUid, sizeof(int64_t));
    if (ppRelatedVTs != NULL && taosArrayGetSize(*ppRelatedVTs) > 0) {
      pInfo->nextVirtualTableIdx = 0;
      return true;
    }

    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, numOfBlocks,
            physUid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d", numOfBlocks);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
  }
  return false;
}
1548

1549
// Returns true when uid is a key of the reader's physical-table hash; false for a NULL reader.
bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) {
  return (pReader != NULL) &&
         (taosHashGet(pReader->vtSourceScanInfo.pPhysicalTables, &uid, sizeof(uint64_t)) != NULL);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc