• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4774

01 Oct 2025 04:06AM UTC coverage: 58.357% (-0.3%) from 58.689%
#4774

push

travis-ci

web-flow
Merge pull request #33171 from taosdata/merge/3.3.6tomain

merge: from 3.3.6 to main branch

138553 of 302743 branches covered (45.77%)

Branch coverage included in aggregate %.

15 of 20 new or added lines in 2 files covered. (75.0%)

2558 existing lines in 138 files now uncovered.

209925 of 294403 relevant lines covered (71.31%)

5595496.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

48.96
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr);
20

21
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
546✔
22
  if (pHandle == NULL || pHead == NULL) {
546!
23
    return false;
×
24
  }
25
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
546!
26
    return true;
546✔
27
  }
28

29
  int16_t msgType = pHead->msgType;
×
30
  char*   body = pHead->body;
×
31
  int32_t bodyLen = pHead->bodyLen;
×
32

33
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
×
34
  int64_t  realTbSuid = 0;
×
35
  SDecoder dcoder = {0};
×
36
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
×
37
  int32_t  len = bodyLen - sizeof(SMsgHead);
×
38
  tDecoderInit(&dcoder, data, len);
×
39

40
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
×
41
    SVCreateStbReq req = {0};
×
42
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
×
43
      goto end;
×
44
    }
45
    realTbSuid = req.suid;
×
46
  } else if (msgType == TDMT_VND_DROP_STB) {
×
47
    SVDropStbReq req = {0};
×
48
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
49
      goto end;
×
50
    }
51
    realTbSuid = req.suid;
×
52
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
×
53
    SVCreateTbBatchReq req = {0};
×
54
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
×
55
      goto end;
×
56
    }
57

58
    int32_t        needRebuild = 0;
×
59
    SVCreateTbReq* pCreateReq = NULL;
×
60
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
61
      pCreateReq = req.pReqs + iReq;
×
62
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
63
        needRebuild++;
×
64
      }
65
    }
66
    if (needRebuild == 0) {
×
67
      // do nothing
68
    } else if (needRebuild == req.nReqs) {
×
69
      realTbSuid = tbSuid;
×
70
    } else {
71
      realTbSuid = tbSuid;
×
72
      SVCreateTbBatchReq reqNew = {0};
×
73
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
74
      if (reqNew.pArray == NULL) {
×
75
        tDeleteSVCreateTbBatchReq(&req);
×
76
        goto end;
×
77
      }
78
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
79
        pCreateReq = req.pReqs + iReq;
×
80
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
81
          reqNew.nReqs++;
×
82
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
83
            taosArrayDestroy(reqNew.pArray);
×
84
            tDeleteSVCreateTbBatchReq(&req);
×
85
            goto end;
×
86
          }
87
        }
88
      }
89

90
      int     tlen = 0;
×
91
      int32_t ret = 0;
×
92
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
93
      void* buf = taosMemoryMalloc(tlen);
×
94
      if (NULL == buf) {
×
95
        taosArrayDestroy(reqNew.pArray);
×
96
        tDeleteSVCreateTbBatchReq(&req);
×
97
        goto end;
×
98
      }
99
      SEncoder coderNew = {0};
×
100
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
101
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
102
      tEncoderClear(&coderNew);
×
103
      if (ret < 0) {
×
104
        taosMemoryFree(buf);
×
105
        taosArrayDestroy(reqNew.pArray);
×
106
        tDeleteSVCreateTbBatchReq(&req);
×
107
        goto end;
×
108
      }
109
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
110
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
111
      taosMemoryFree(buf);
×
112
      taosArrayDestroy(reqNew.pArray);
×
113
    }
114

115
    tDeleteSVCreateTbBatchReq(&req);
×
116
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
117
    SVAlterTbReq req = {0};
×
118

119
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
120
      goto end;
×
121
    }
122

123
    SMetaReader mr = {0};
×
124
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
125

126
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
127
      metaReaderClear(&mr);
×
128
      goto end;
×
129
    }
130
    realTbSuid = mr.me.ctbEntry.suid;
×
131
    metaReaderClear(&mr);
×
132
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
133
    SVDropTbBatchReq req = {0};
×
134

135
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
136
      goto end;
×
137
    }
138

139
    int32_t      needRebuild = 0;
×
140
    SVDropTbReq* pDropReq = NULL;
×
141
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
142
      pDropReq = req.pReqs + iReq;
×
143

144
      if (pDropReq->suid == tbSuid) {
×
145
        needRebuild++;
×
146
      }
147
    }
148
    if (needRebuild == 0) {
×
149
      // do nothing
150
    } else if (needRebuild == req.nReqs) {
×
151
      realTbSuid = tbSuid;
×
152
    } else {
153
      realTbSuid = tbSuid;
×
154
      SVDropTbBatchReq reqNew = {0};
×
155
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
156
      if (reqNew.pArray == NULL) {
×
157
        goto end;
×
158
      }
159
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
160
        pDropReq = req.pReqs + iReq;
×
161
        if (pDropReq->suid == tbSuid) {
×
162
          reqNew.nReqs++;
×
163
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
164
            taosArrayDestroy(reqNew.pArray);
×
165
            goto end;
×
166
          }
167
        }
168
      }
169

170
      int     tlen = 0;
×
171
      int32_t ret = 0;
×
172
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
173
      void* buf = taosMemoryMalloc(tlen);
×
174
      if (NULL == buf) {
×
175
        taosArrayDestroy(reqNew.pArray);
×
176
        goto end;
×
177
      }
178
      SEncoder coderNew = {0};
×
179
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
180
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
181
      tEncoderClear(&coderNew);
×
182
      if (ret != 0) {
×
183
        taosMemoryFree(buf);
×
184
        taosArrayDestroy(reqNew.pArray);
×
185
        goto end;
×
186
      }
187
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
188
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
189
      taosMemoryFree(buf);
×
190
      taosArrayDestroy(reqNew.pArray);
×
191
    }
192
  } else if (msgType == TDMT_VND_DELETE) {
×
193
    SDeleteRes req = {0};
×
194
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
195
      goto end;
×
196
    }
197
    realTbSuid = req.suid;
×
198
  }
199

200
end:
×
201
  tDecoderClear(&dcoder);
×
202
  bool tmp = tbSuid == realTbSuid;
×
203
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
×
204
  return tmp;
×
205
}
206

207
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
141,017✔
208
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
141,017!
209
    return -1;
×
210
  }
211
  int32_t code = -1;
141,214✔
212
  int32_t vgId = TD_VID(pTq->pVnode);
141,214✔
213
  int64_t id = pHandle->pWalReader->readerId;
141,214✔
214

215
  int64_t offset = *fetchOffset;
141,214✔
216
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
141,214✔
217
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
141,268✔
218
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
141,303✔
219

220
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
141,362!
221
          ", 0x%" PRIx64,
222
          vgId, offset, lastVer, committedVer, appliedVer, id);
223

224
  while (offset <= appliedVer) {
149,192✔
225
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
142,218!
226
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
227
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
228
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
229
      goto END;
×
230
    }
231

232
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
142,216!
233
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
234

235
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
142,216✔
236
      code = walFetchBody(pHandle->pWalReader);
134,086✔
237
      goto END;
134,067✔
238
    } else {
239
      if (pHandle->fetchMeta != WITH_DATA) {
8,130✔
240
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
687✔
241
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
687✔
242
          code = walFetchBody(pHandle->pWalReader);
546✔
243
          if (code < 0) {
546!
244
            goto END;
×
245
          }
246

247
          pHead = &(pHandle->pWalReader->pHead->head);
546✔
248
          if (isValValidForTable(pHandle, pHead)) {
546!
249
            code = 0;
546✔
250
            goto END;
546✔
251
          } else {
252
            offset++;
×
253
            code = -1;
×
254
            continue;
×
255
          }
256
        }
257
      }
258
      code = walSkipFetchBody(pHandle->pWalReader);
7,584✔
259
      if (code < 0) {
7,583!
260
        goto END;
×
261
      }
262
      offset++;
7,583✔
263
    }
264
    code = -1;
7,583✔
265
  }
266

267
END:
6,974✔
268
  *fetchOffset = offset;
141,587✔
269
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
141,587✔
270
          ", applied:%" PRId64 ", 0x%" PRIx64,
271
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
272
  return code;
141,605✔
273
}
274

275
bool tqGetTablePrimaryKey(STqReader* pReader) {
19,035✔
276
  if (pReader == NULL) {
19,035!
277
    return false;
×
278
  }
279
  return pReader->hasPrimaryKey;
19,035✔
280
}
281

282
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
134✔
283
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);
134!
284

285
  if (pReader == NULL) {
134!
286
    return;
×
287
  }
288
  bool            ret = false;
134✔
289
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL, 0);
134✔
290
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
133!
291
    ret = true;
×
292
  }
293
  tDeleteSchemaWrapper(schema);
294
  pReader->hasPrimaryKey = ret;
133✔
295
}
296

297
STqReader* tqReaderOpen(SVnode* pVnode) {
1,285✔
298
  tqDebug("%s:%p", __FUNCTION__, pVnode);
1,285!
299
  if (pVnode == NULL) {
1,302!
300
    return NULL;
×
301
  }
302
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
1,302!
303
  if (pReader == NULL) {
1,302!
304
    return NULL;
×
305
  }
306

307
  pReader->pWalReader = walOpenReader(pVnode->pWal, 0);
1,302✔
308
  if (pReader->pWalReader == NULL) {
1,302!
309
    taosMemoryFree(pReader);
×
310
    return NULL;
×
311
  }
312

313
  pReader->pVnodeMeta = pVnode->pMeta;
1,302✔
314
  pReader->pColIdList = NULL;
1,302✔
315
  pReader->cachedSchemaVer = 0;
1,302✔
316
  pReader->cachedSchemaSuid = 0;
1,302✔
317
  pReader->pSchemaWrapper = NULL;
1,302✔
318
  pReader->tbIdHash = NULL;
1,302✔
319
  pReader->pResBlock = NULL;
1,302✔
320

321
  int32_t code = createDataBlock(&pReader->pResBlock);
1,302✔
322
  if (code) {
1,302!
323
    terrno = code;
×
324
  }
325

326
  return pReader;
1,302✔
327
}
328

329
void tqReaderClose(STqReader* pReader) {
1,318✔
330
  tqDebug("%s:%p", __FUNCTION__, pReader);
1,318!
331
  if (pReader == NULL) return;
1,319✔
332

333
  // close wal reader
334
  if (pReader->pWalReader) {
1,302!
335
    walCloseReader(pReader->pWalReader);
1,302✔
336
  }
337

338
  if (pReader->pSchemaWrapper) {
1,302✔
339
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
537!
340
  }
341

342
  taosMemoryFree(pReader->extSchema);
1,302!
343
  if (pReader->pColIdList) {
1,302✔
344
    taosArrayDestroy(pReader->pColIdList);
1,004✔
345
  }
346

347
  // free hash
348
  blockDataDestroy(pReader->pResBlock);
1,302✔
349
  taosHashCleanup(pReader->tbIdHash);
1,302✔
350
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
1,302✔
351

352
  taosHashCleanup(pReader->vtSourceScanInfo.pVirtualTables);
1,301✔
353
  taosHashCleanup(pReader->vtSourceScanInfo.pPhysicalTables);
1,301✔
354
  taosLRUCacheCleanup(pReader->vtSourceScanInfo.pPhyTblSchemaCache);
1,301✔
355
  taosMemoryFree(pReader);
1,302!
356
}
357

358
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
1,591✔
359
  if (pReader == NULL) {
1,591!
360
    return TSDB_CODE_INVALID_PARA;
×
361
  }
362
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
1,591✔
363
    return terrno;
81✔
364
  }
365
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
1,510!
366
  return 0;
1,510✔
367
}
368

369
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
247,709✔
370
  if (pReader == NULL) {
247,709!
371
    return false;
×
372
  }
373
  SWalReader* pWalReader = pReader->pWalReader;
247,709✔
374

375
  int64_t st = taosGetTimestampMs();
247,709✔
376
  while (1) {
268,353✔
377
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
516,062✔
378
    while (pReader->nextBlk < numOfBlocks) {
565,611✔
379
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
268,407!
380
              pReader->msg.ver);
381

382
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
268,407✔
383
      if (pSubmitTbData == NULL) {
268,399!
UNCOV
384
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
385
                pReader->msg.ver);
386
        return false;
247,691✔
387
      }
388
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
268,399✔
389
        pReader->nextBlk += 1;
106✔
390
        continue;
106✔
391
      }
392
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
268,290!
393
        tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
218,845!
394
        SSDataBlock* pRes = NULL;
218,845✔
395
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
218,845✔
396
        if (code == TSDB_CODE_SUCCESS) {
218,841!
397
          return true;
218,844✔
398
        }
399
      } else {
400
        pReader->nextBlk += 1;
49,455✔
401
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
49,455!
402
      }
403
    }
404

405
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
297,204✔
406
    pReader->msg.msgStr = NULL;
297,205✔
407

408
    int64_t elapsed = taosGetTimestampMs() - st;
297,205✔
409
    if (elapsed > 1000 || elapsed < 0) {
297,205!
UNCOV
410
      return false;
×
411
    }
412

413
    // try next message in wal file
414
    if (walNextValidMsg(pWalReader, false) < 0) {
297,206✔
415
      return false;
28,848✔
416
    }
417

418
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
268,353✔
419
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
268,353✔
420
    int64_t ver = pWalReader->pHead->head.version;
268,353✔
421
    SDecoder decoder = {0};
268,353✔
422
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder) != 0) {
268,353!
423
      tDecoderClear(&decoder);
×
424
      return false;
×
425
    }
426
    tDecoderClear(&decoder);
268,326✔
427
    pReader->nextBlk = 0;
268,353✔
428
  }
429
}
430

431
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
402,425✔
432
  if (pReader == NULL) {
402,425!
433
    return TSDB_CODE_INVALID_PARA;
×
434
  }
435
  pReader->msg.msgStr = msgStr;
402,425✔
436
  pReader->msg.msgLen = msgLen;
402,425✔
437
  pReader->msg.ver = ver;
402,425✔
438

439
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
402,425!
440

441
  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);
402,425✔
442
  int32_t code = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
402,412✔
443

444
  if (code != 0) {
402,353!
445
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
446
  }
447

448
  return code;
402,354✔
449
}
450

451
void tqReaderClearSubmitMsg(STqReader* pReader) {
267,605✔
452
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
267,605✔
453
  pReader->nextBlk = 0;
268,018✔
454
  pReader->msg.msgStr = NULL;
268,018✔
455
}
268,018✔
456

457
SWalReader* tqGetWalReader(STqReader* pReader) {
279,442✔
458
  if (pReader == NULL) {
279,442!
459
    return NULL;
×
460
  }
461
  return pReader->pWalReader;
279,442✔
462
}
463

464
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
247,680✔
465
  if (pReader == NULL) {
247,680!
466
    return NULL;
×
467
  }
468
  return pReader->pResBlock;
247,680✔
469
}
470

471
int64_t tqGetResultBlockTime(STqReader* pReader) {
247,672✔
472
  if (pReader == NULL) {
247,672!
473
    return 0;
×
474
  }
475
  return pReader->lastTs;
247,672✔
476
}
477

478
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
69,908✔
479
  int32_t code = false;
69,908✔
480
  int32_t lino = 0;
69,908✔
481
  int64_t uid = 0;
69,908✔
482
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
69,908!
483
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
69,908!
484
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);
69,908!
485

486
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
69,908✔
487
  while (pReader->nextBlk < blockSz) {
73,429✔
488
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
36,737✔
489
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
36,734!
490
    uid = pSubmitTbData->uid;
36,734✔
491
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
36,734✔
492
    TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
36,735✔
493

494
    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
3,513!
495
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
496
    pReader->nextBlk++;
3,513✔
497
  }
498

499
  tqReaderClearSubmitMsg(pReader);
36,692✔
500
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
36,717!
501

502
END:
36,717✔
503
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
69,939!
504
  return code;
69,930✔
505
}
506

507
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
194,282✔
508
  int32_t code = false;
194,282✔
509
  int32_t lino = 0;
194,282✔
510
  int64_t uid = 0;
194,282✔
511

512
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
194,282!
513
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
194,282!
514
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);
194,282!
515

516
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
194,282✔
517
  while (pReader->nextBlk < blockSz) {
194,297✔
518
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
97,410✔
519
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
97,406!
520
    uid = pSubmitTbData->uid;
97,406✔
521
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
97,406✔
522
    TSDB_CHECK_NULL(ret, code, lino, END, true);
97,415!
523
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
×
524
    pReader->nextBlk++;
×
525
  }
526
  tqReaderClearSubmitMsg(pReader);
96,887✔
527
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);
97,175!
528

529
END:
97,175✔
530
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
194,590!
531
  return code;
194,510✔
532
}
533

534
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
130,241✔
535
                    SExtSchema* extSrc) {
536
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
130,241!
537
    return TSDB_CODE_INVALID_PARA;
×
538
  }
539
  int32_t code = 0;
130,388✔
540

541
  int32_t cnt = 0;
130,388✔
542
  for (int32_t i = 0; i < pSrc->nCols; i++) {
741,588✔
543
    cnt += mask[i];
611,200✔
544
  }
545

546
  pDst->nCols = cnt;
130,388✔
547
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
130,388!
548
  if (pDst->pSchema == NULL) {
130,364!
549
    return TAOS_GET_TERRNO(terrno);
×
550
  }
551

552
  int32_t j = 0;
130,364✔
553
  for (int32_t i = 0; i < pSrc->nCols; i++) {
740,942✔
554
    if (mask[i]) {
610,362!
555
      pDst->pSchema[j++] = pSrc->pSchema[i];
610,376✔
556
      SColumnInfoData colInfo =
557
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
610,376✔
558
      if (extSrc != NULL) {
611,024✔
559
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
218✔
560
      }
561
      code = blockDataAppendColInfo(pBlock, &colInfo);
611,024✔
562
      if (code != 0) {
610,592!
563
        return code;
×
564
      }
565
    }
566
  }
567
  return 0;
130,580✔
568
}
569

570
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
394✔
571
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
394!
572
    return TSDB_CODE_INVALID_PARA;
×
573
  }
574
  SSDataBlock* pBlock = pReader->pResBlock;
394✔
575
  if (blockDataGetNumOfCols(pBlock) > 0) {
394✔
576
    blockDataDestroy(pBlock);
2✔
577
    int32_t code = createDataBlock(&pReader->pResBlock);
2✔
578
    if (code) {
2!
579
      return code;
×
580
    }
581
    pBlock = pReader->pResBlock;
2✔
582

583
    pBlock->info.id.uid = pReader->cachedSchemaUid;
2✔
584
    pBlock->info.version = pReader->msg.ver;
2✔
585
  }
586

587
  int32_t numOfCols = taosArrayGetSize(pColIdList);
394✔
588

589
  if (numOfCols == 0) {  // all columns are required
394!
590
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
591
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
592
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
593

594
      if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
×
595
        decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
596
      }
597
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
598
      if (code != TSDB_CODE_SUCCESS) {
×
599
        blockDataFreeRes(pBlock);
×
600
        return terrno;
×
601
      }
602
    }
603
  } else {
604
    if (numOfCols > pSchema->nCols) {
394✔
605
      numOfCols = pSchema->nCols;
2✔
606
    }
607

608
    int32_t i = 0;
394✔
609
    int32_t j = 0;
394✔
610
    while (i < pSchema->nCols && j < numOfCols) {
3,204✔
611
      SSchema* pColSchema = &pSchema->pSchema[i];
2,809✔
612
      col_id_t colIdSchema = pColSchema->colId;
2,809✔
613

614
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
2,809✔
615
      if (pColIdNeed == NULL) {
2,809!
616
        break;
×
617
      }
618
      if (colIdSchema < *pColIdNeed) {
2,809✔
619
        i++;
3✔
620
      } else if (colIdSchema > *pColIdNeed) {
2,806!
621
        j++;
×
622
      } else {
623
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
2,806✔
624
        if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) {
2,807!
625
          decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
132✔
626
        }
627
        int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
2,807✔
628
        if (code != TSDB_CODE_SUCCESS) {
2,807!
629
          return -1;
×
630
        }
631
        i++;
2,807✔
632
        j++;
2,807✔
633
      }
634
    }
635
  }
636

637
  return TSDB_CODE_SUCCESS;
395✔
638
}
639

640
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
×
641
  int32_t code = 0;
×
642
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
×
643
    return TSDB_CODE_INVALID_PARA;
×
644
  }
645
  // TODO(yhDeng)
646
  if (COL_VAL_IS_VALUE(pColVal)) {
×
647
    char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
×
648
    if (val == NULL) {
×
649
      return terrno;
×
650
    }
651

652
    uint64_t seq = 0;
×
653
    int32_t  len = 0;
×
654
    if (pColVal->value.pData != NULL) {
×
655
      if (tGetU64(pColVal->value.pData, &seq) < 0){
×
656
        TAOS_CHECK_RETURN(TSDB_CODE_INVALID_PARA);
×
657
      }
658
      SBlobItem item = {0};
×
659
      code = tBlobSetGet(pBlobRow2, seq, &item);
×
660
      if (code != 0) {
×
661
        taosMemoryFree(val);
×
662
        terrno = code;
×
663
        uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
×
664
        return code;
×
665
      }
666

667
      val = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
×
668
      (void)memcpy(blobDataVal(val), item.data, item.len);
×
669
      len = item.len;
×
670
    }
671

672
    blobDataSetLen(val, len);
×
673
    code = colDataSetVal(pColumnInfoData, idx, val, false);
×
674

675
    taosMemoryFree(val);
×
676
  } else {
677
    colDataSetNULL(pColumnInfoData, idx);
×
678
  }
679
  return code;
×
680
}
681
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
29,672,905✔
682
  int32_t code = TSDB_CODE_SUCCESS;
29,672,905✔
683

684
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
29,672,905!
685
    if (COL_VAL_IS_VALUE(pColVal)) {
5,976,006!
686
      char val[65535 + 2] = {0};
6,485,189✔
687
      if (pColVal->value.pData != NULL) {
6,485,189!
688
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
6,630,365✔
689
      }
690
      varDataSetLen(val, pColVal->value.nData);
6,485,189✔
691
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
6,485,189✔
692
    } else {
693
      colDataSetNULL(pColumnInfoData, rowIndex);
×
694
    }
695
  } else {
696
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
23,696,899!
697
                         !COL_VAL_IS_VALUE(pColVal));
23,696,899✔
698
  }
699

700
  return code;
29,904,010✔
701
}
702

703
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
218,837✔
704
  if (pReader == NULL || pRes == NULL) {
218,837!
705
    return TSDB_CODE_INVALID_PARA;
×
706
  }
707
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
218,848!
708
  int32_t        code = 0;
218,852✔
709
  int32_t        line = 0;
218,852✔
710
  STSchema*      pTSchema = NULL;
218,852✔
711
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
218,852✔
712
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
218,858!
713
  SSDataBlock* pBlock = pReader->pResBlock;
218,858✔
714
  *pRes = pBlock;
218,858✔
715

716
  blockDataCleanup(pBlock);
218,858✔
717

718
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
218,829✔
719
  int32_t sversion = pSubmitTbData->sver;
218,829✔
720
  int64_t suid = pSubmitTbData->suid;
218,829✔
721
  int64_t uid = pSubmitTbData->uid;
218,829✔
722
  pReader->lastTs = pSubmitTbData->ctimeMs;
218,829✔
723

724
  pBlock->info.id.uid = uid;
218,829✔
725
  pBlock->info.version = pReader->msg.ver;
218,829✔
726

727
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
218,829✔
728
      (pReader->cachedSchemaVer != sversion)) {
218,440✔
729
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
390✔
730
    taosMemoryFree(pReader->extSchema);
394!
731
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema, 0);
394✔
732
    if (pReader->pSchemaWrapper == NULL) {
394!
733
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
×
734
             "version %d, possibly dropped table",
735
             vgId, suid, uid, pReader->cachedSchemaVer);
736
      pReader->cachedSchemaSuid = 0;
×
737
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
738
    }
739

740
    pReader->cachedSchemaUid = uid;
394✔
741
    pReader->cachedSchemaSuid = suid;
394✔
742
    pReader->cachedSchemaVer = sversion;
394✔
743

744
    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
394!
745
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
×
746
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
747
      return TSDB_CODE_TQ_INTERNAL_ERROR;
×
748
    }
749
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
394✔
750
    TSDB_CHECK_CODE(code, line, END);
394!
751
    pBlock = pReader->pResBlock;
394✔
752
    *pRes = pBlock;
394✔
753
  }
754

755
  int32_t numOfRows = 0;
218,833✔
756
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
218,833✔
757
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
4✔
758
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
4!
759
    numOfRows = pCol->nVal;
4✔
760
  } else {
761
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
218,829✔
762
  }
763

764
  code = blockDataEnsureCapacity(pBlock, numOfRows);
218,832✔
765
  TSDB_CHECK_CODE(code, line, END);
218,829!
766
  pBlock->info.rows = numOfRows;
218,829✔
767
  int32_t colActual = blockDataGetNumOfCols(pBlock);
218,829✔
768

769
  // convert and scan one block
770
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
218,832✔
771
    SArray* pCols = pSubmitTbData->aCol;
4✔
772
    int32_t numOfCols = taosArrayGetSize(pCols);
4✔
773
    int32_t targetIdx = 0;
4✔
774
    int32_t sourceIdx = 0;
4✔
775
    while (targetIdx < colActual) {
18✔
776
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
14✔
777
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
14!
778
      if (sourceIdx >= numOfCols) {
14✔
779
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
4!
780
        colDataSetNNULL(pColData, 0, numOfRows);
4!
781
        targetIdx++;
4✔
782
        continue;
4✔
783
      }
784

785
      uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;
10!
786

787
      SColData* pCol = taosArrayGet(pCols, sourceIdx);
10✔
788
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
10!
789
      SColVal colVal = {0};
10✔
790
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
10!
791
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
792
      if (pCol->cid < pColData->info.colId) {
10✔
793
        sourceIdx++;
4✔
794
      } else if (pCol->cid == pColData->info.colId) {
6✔
795
        for (int32_t i = 0; i < pCol->nVal; i++) {
12✔
796
          code = tColDataGetValue(pCol, i, &colVal);
8✔
797
          TSDB_CHECK_CODE(code, line, END);
8!
798

799
          if (isBlob == 0) {
8!
800
            code = doSetVal(pColData, i, &colVal);
8✔
801
          } else {
802
            code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobSet);
×
803
          }
804
          TSDB_CHECK_CODE(code, line, END);
8!
805
        }
806
        sourceIdx++;
4✔
807
        targetIdx++;
4✔
808
      } else {
809
        colDataSetNNULL(pColData, 0, numOfRows);
2!
810
        targetIdx++;
2✔
811
      }
812
    }
813
  } else {
814
    SArray*         pRows = pSubmitTbData->aRowP;
218,828✔
815
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
218,828✔
816
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
218,828✔
817
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);
218,833✔
818

819
    for (int32_t i = 0; i < numOfRows; i++) {
6,194,418✔
820
      SRow* pRow = taosArrayGetP(pRows, i);
5,731,795✔
821
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
5,706,788✔
822
      int32_t sourceIdx = 0;
5,706,528✔
823
      for (int32_t j = 0; j < colActual; j++) {
25,314,973✔
824
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
19,339,387✔
825
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);
19,204,354!
826

827
        uint8_t isBlob = IS_STR_DATA_BLOB(pColData->info.type) ? 1 : 0;
19,236,020!
828
        while (1) {
×
829
          SColVal colVal = {0};
19,236,020✔
830
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
19,236,020✔
831
          TSDB_CHECK_CODE(code, line, END);
19,444,444!
832

833
          if (colVal.cid < pColData->info.colId) {
19,444,444!
834
            sourceIdx++;
×
835
            continue;
×
836
          } else if (colVal.cid == pColData->info.colId) {
19,444,444!
837
            if (isBlob == 0) {
19,480,238!
838
              code = doSetVal(pColData, i, &colVal);
19,489,884✔
839
            } else {
840
              code = doSetBlobVal(pColData, i, &colVal, pSubmitTbData->pBlobSet);
×
841
            }
842

843
            TSDB_CHECK_CODE(code, line, END);
19,644,239!
844

845
            sourceIdx++;
19,644,239✔
846
            break;
19,608,445✔
847
          } else {
848
            colDataSetNULL(pColData, i);
×
849
            break;
×
850
          }
851
        }
852
      }
853
    }
854
  }
855

856
END:
462,623✔
857
  if (code != 0) {
462,627!
858
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
×
859
  }
860
  taosMemoryFreeClear(pTSchema);
218,829!
861
  return code;
218,853✔
862
}
863

864
#define PROCESS_VAL                                      \
865
  if (curRow == 0) {                                     \
866
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
867
    buildNew = true;                                     \
868
  } else {                                               \
869
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
870
    if (currentRowAssigned != assigned[j]) {             \
871
      assigned[j] = currentRowAssigned;                  \
872
      buildNew = true;                                   \
873
    }                                                    \
874
  }
875

876
#define SET_DATA                                                                                    \
877
  if (colVal.cid < pColData->info.colId) {                                                          \
878
    sourceIdx++;                                                                                    \
879
  } else if (colVal.cid == pColData->info.colId) {                                                  \
880
    if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
881
      TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
882
    } else {                                                                                        \
883
      TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
884
    }                                                                                               \
885
    sourceIdx++;                                                                                    \
886
    targetIdx++;                                                                                    \
887
  } else {                                                                                          \
888
    colDataSetNULL(pColData, curRow - lastRow);                                                     \
889
    targetIdx++;                                                                                    \
890
  }
891

892
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
130,164✔
893
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
894
  int32_t         code = 0;
130,164✔
895
  SSchemaWrapper* pSW = NULL;
130,164✔
896
  SSDataBlock*    block = NULL;
130,164✔
897
  if (taosArrayGetSize(blocks) > 0) {
130,164!
898
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
899
    TQ_NULL_GO_TO_END(pLastBlock);
×
900
    pLastBlock->info.rows = curRow - *lastRow;
×
901
    *lastRow = curRow;
×
902
  }
903

904
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
130,271!
905
  TQ_NULL_GO_TO_END(block);
130,347!
906

907
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
130,347!
908
  TQ_NULL_GO_TO_END(pSW);
130,382!
909

910
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
130,382!
911
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
130,402!
912
          (int32_t)taosArrayGetSize(block->pDataBlock));
913

914
  block->info.id.uid = pSubmitTbData->uid;
130,402✔
915
  block->info.version = pReader->msg.ver;
130,402✔
916
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
130,402!
917
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
130,361!
918
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
130,343!
919
  pSW = NULL;
130,343✔
920

921
  taosMemoryFreeClear(block);
130,343!
922

923
END:
×
924
  if (code != 0) {
130,411!
925
    tqError("processBuildNew failed, code:%d", code);
×
926
  }
927
  tDeleteSchemaWrapper(pSW);
130,411!
928
  blockDataFreeRes(block);
130,309✔
929
  taosMemoryFree(block);
130,252!
930
  return code;
130,388✔
931
}
932
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
106✔
933
  int32_t code = 0;
106✔
934
  int32_t curRow = 0;
106✔
935
  int32_t lastRow = 0;
106✔
936

937
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
106✔
938
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
106!
939
  TQ_NULL_GO_TO_END(assigned);
106!
940

941
  SArray*   pCols = pSubmitTbData->aCol;
106✔
942
  SColData* pCol = taosArrayGet(pCols, 0);
106✔
943
  TQ_NULL_GO_TO_END(pCol);
106!
944
  int32_t numOfRows = pCol->nVal;
106✔
945
  int32_t numOfCols = taosArrayGetSize(pCols);
106✔
946
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
106!
947
          numOfRows);
948
  for (int32_t i = 0; i < numOfRows; i++) {
302✔
949
    bool buildNew = false;
196✔
950

951
    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
1,006✔
952
      int32_t k = 0;
810✔
953
      for (; k < numOfCols; k++) {
2,165!
954
        pCol = taosArrayGet(pCols, k);
2,165✔
955
        TQ_NULL_GO_TO_END(pCol);
2,165!
956
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
2,165✔
957
          SColVal colVal = {0};
810✔
958
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
810!
959
          PROCESS_VAL
810!
960
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
810!
961
          break;
810✔
962
        }
963
      }
964
      if (k >= numOfCols) {
810!
965
        // this column is not in the current row, so we set it to NULL
966
        assigned[j] = 0;
×
967
        buildNew = true;
×
968
      }
969
    }
970

971
    if (buildNew) {
196✔
972
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
106!
973
    }
974

975
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
196✔
976
    TQ_NULL_GO_TO_END(pBlock);
196!
977

978
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
196!
979
            (int32_t)taosArrayGetSize(blocks));
980

981
    int32_t targetIdx = 0;
196✔
982
    int32_t sourceIdx = 0;
196✔
983
    int32_t colActual = blockDataGetNumOfCols(pBlock);
196✔
984
    while (targetIdx < colActual && sourceIdx < numOfCols) {
1,000!
985
      pCol = taosArrayGet(pCols, sourceIdx);
804✔
986
      TQ_NULL_GO_TO_END(pCol);
804!
987
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
804✔
988
      TQ_NULL_GO_TO_END(pColData);
804!
989
      SColVal colVal = {0};
804✔
990
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
804!
991
      SET_DATA
804!
992
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
804!
993
    }
994

995
    curRow++;
196✔
996
  }
997
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
106✔
998
  pLastBlock->info.rows = curRow - lastRow;
106✔
999
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
106!
1000
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
1001
END:
106✔
1002
  if (code != TSDB_CODE_SUCCESS) {
106!
1003
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1004
  }
1005
  taosMemoryFree(assigned);
106!
1006
  return code;
106✔
1007
}
1008

1009
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
130,153✔
1010
  int32_t   code = 0;
130,153✔
1011
  STSchema* pTSchema = NULL;
130,153✔
1012

1013
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
130,153✔
1014
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
130,153!
1015
  TQ_NULL_GO_TO_END(assigned);
130,331!
1016

1017
  int32_t curRow = 0;
130,331✔
1018
  int32_t lastRow = 0;
130,331✔
1019
  SArray* pRows = pSubmitTbData->aRowP;
130,331✔
1020
  int32_t numOfRows = taosArrayGetSize(pRows);
130,331✔
1021
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
130,235✔
1022
  TQ_NULL_GO_TO_END(pTSchema);
130,290!
1023
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
130,290!
1024

1025
  for (int32_t i = 0; i < numOfRows; i++) {
2,531,472✔
1026
    bool  buildNew = false;
2,309,440✔
1027
    SRow* pRow = taosArrayGetP(pRows, i);
2,309,440✔
1028
    TQ_NULL_GO_TO_END(pRow);
2,297,998!
1029

1030
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
12,382,246✔
1031
      SColVal colVal = {0};
10,049,448✔
1032
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
10,049,448!
1033
      PROCESS_VAL
10,074,563!
1034
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
10,074,563!
1035
    }
1036

1037
    if (buildNew) {
2,332,798✔
1038
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
130,305!
1039
    }
1040

1041
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
2,332,739✔
1042
    TQ_NULL_GO_TO_END(pBlock);
2,298,756!
1043

1044
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
2,298,756!
1045
            (int32_t)taosArrayGetSize(blocks));
1046

1047
    int32_t targetIdx = 0;
2,298,756✔
1048
    int32_t sourceIdx = 0;
2,298,756✔
1049
    int32_t colActual = blockDataGetNumOfCols(pBlock);
2,298,756✔
1050
    while (targetIdx < colActual && sourceIdx < pTSchema->numOfCols) {
12,608,347!
1051
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
10,206,995✔
1052
      TQ_NULL_GO_TO_END(pColData);
10,142,800!
1053
      SColVal          colVal = {0};
10,142,800✔
1054
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
10,142,800!
1055
      SET_DATA
10,135,957!
1056
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
10,314,385!
1057
    }
1058

1059
    curRow++;
2,401,352✔
1060
  }
1061
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
222,032✔
1062
  pLastBlock->info.rows = curRow - lastRow;
130,131✔
1063

1064
  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
130,131!
1065
          (int)taosArrayGetSize(blocks));
1066
END:
130,131✔
1067
  if (code != TSDB_CODE_SUCCESS) {
130,118!
1068
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1069
  }
1070
  taosMemoryFreeClear(pTSchema);
130,118!
1071
  taosMemoryFree(assigned);
130,296!
1072
  return code;
130,286✔
1073
}
1074

1075
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
30✔
1076
  int32_t code = 0;
30✔
1077
  int32_t lino = 0;
30✔
1078
  void*   createReq = NULL;
30✔
1079
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
30!
1080
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
30!
1081

1082
  if (pRsp->createTableNum == 0) {
30✔
1083
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
16✔
1084
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
16!
1085
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
16✔
1086
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
16!
1087
  }
1088

1089
  uint32_t len = 0;
30✔
1090
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
30!
1091
  TSDB_CHECK_CODE(code, lino, END);
30!
1092
  createReq = taosMemoryCalloc(1, len);
30!
1093
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
30!
1094

1095
  SEncoder encoder = {0};
30✔
1096
  tEncoderInit(&encoder, createReq, len);
30✔
1097
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
30✔
1098
  tEncoderClear(&encoder);
30✔
1099
  TSDB_CHECK_CODE(code, lino, END);
30!
1100
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
60!
1101
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
60!
1102
  pRsp->createTableNum++;
30✔
1103
  tqTrace("build create table info msg success");
30!
1104

1105
END:
30✔
1106
  if (code != 0) {
30!
1107
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1108
    taosMemoryFree(createReq);
×
1109
  }
1110
  return code;
30✔
1111
}
1112

1113
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
130,637✔
1114
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1115
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
130,637!
1116
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
130,637✔
1117
  if (pSubmitTbData == NULL) {
130,634!
1118
    return terrno;
×
1119
  }
1120
  pReader->nextBlk++;
130,641✔
1121

1122
  if (pSubmitTbDataRet) {
130,641✔
1123
    *pSubmitTbDataRet = pSubmitTbData;
130,636✔
1124
  }
1125

1126
  if (fetchMeta == ONLY_META) {
130,641✔
1127
    if (pSubmitTbData->pCreateTbReq != NULL) {
22✔
1128
      if (pRsp->createTableReq == NULL) {
4✔
1129
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
1✔
1130
        if (pRsp->createTableReq == NULL) {
1!
1131
          return terrno;
×
1132
        }
1133
      }
1134
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
8!
1135
        return terrno;
×
1136
      }
1137
      pSubmitTbData->pCreateTbReq = NULL;
4✔
1138
    }
1139
    return 0;
22✔
1140
  }
1141

1142
  int32_t sversion = pSubmitTbData->sver;
130,619✔
1143
  int64_t uid = pSubmitTbData->uid;
130,619✔
1144
  pReader->lastBlkUid = uid;
130,619✔
1145

1146
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
130,619✔
1147
  taosMemoryFreeClear(pReader->extSchema);
130,613!
1148
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema, 0);
130,613✔
1149
  if (pReader->pSchemaWrapper == NULL) {
130,463✔
1150
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
146!
1151
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1152
    pReader->cachedSchemaSuid = 0;
152✔
1153
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
152✔
1154
  }
1155

1156
  if (pSubmitTbData->pCreateTbReq != NULL) {
130,317✔
1157
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
30✔
1158
    if (code != 0) {
30!
1159
      return code;
×
1160
    }
1161
  } else if (rawList != NULL) {
130,287!
1162
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1163
      return terrno;
×
1164
    }
1165
    pReader->pSchemaWrapper = NULL;
×
1166
    return 0;
×
1167
  }
1168

1169
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
130,317✔
1170
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
106✔
1171
  } else {
1172
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
130,211✔
1173
  }
1174
}
1175

1176
int32_t tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList, const char* id) {
1,003✔
1177
  if (pReader == NULL) {
1,003!
1178
    return TSDB_CODE_SUCCESS;
×
1179
  }
1180
  pReader->pColIdList = pColIdList;
1,003✔
1181
  return tqCollectPhysicalTables(pReader, id);
1,003✔
1182
}
1183

1184
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
1,077✔
1185
  if (pReader == NULL || tbUidList == NULL) {
1,077!
1186
    return TSDB_CODE_SUCCESS;
×
1187
  }
1188
  if (pReader->tbIdHash) {
1,077✔
1189
    taosHashClear(pReader->tbIdHash);
14✔
1190
  } else {
1191
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
1,063✔
1192
    if (pReader->tbIdHash == NULL) {
1,065!
1193
      tqError("s-task:%s failed to init hash table", id);
×
1194
      return terrno;
×
1195
    }
1196
  }
1197

1198
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
28,625✔
1199
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
27,535✔
1200
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
27,514!
1201
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
1!
1202
      continue;
×
1203
    }
1204
  }
1205

1206
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
1,079!
1207
  return TSDB_CODE_SUCCESS;
1,079✔
1208
}
1209

1210
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
233✔
1211
  if (pReader == NULL || pTableUidList == NULL) {
233!
1212
    return;
×
1213
  }
1214
  if (pReader->tbIdHash == NULL) {
233!
1215
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
1216
    if (pReader->tbIdHash == NULL) {
×
1217
      tqError("failed to init hash table");
×
1218
      return;
×
1219
    }
1220
  }
1221

1222
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
233✔
1223
  for (int i = 0; i < numOfTables; i++) {
459✔
1224
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
226✔
1225
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
226!
1226
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
1227
      continue;
×
1228
    }
1229
  }
1230
}
1231

1232
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
×
1233
  if (pReader == NULL) {
×
1234
    return false;
×
1235
  }
1236
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
×
1237
}
1238

1239
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1240
  if (pReader == NULL) {
×
1241
    return false;
×
1242
  }
1243
  return pReader->msg.msgStr == NULL;
×
1244
}
1245

1246
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
3✔
1247
  if (pReader == NULL || tbUidList == NULL) {
3!
1248
    return;
×
1249
  }
1250
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
6✔
1251
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
3✔
1252
    if (pKey && taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)) != 0) {
3!
1253
      tqError("failed to remove table uid:%" PRId64 " from hash", *pKey);
×
1254
    }
1255
  }
1256
}
1257

1258
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
154,914✔
1259
  if (pTq == NULL) {
154,914!
1260
    return 0;  // mounted vnode may have no tq
×
1261
  }
1262
  if (tbUidList == NULL) {
154,914!
1263
    return TSDB_CODE_INVALID_PARA;
×
1264
  }
1265
  void*   pIter = NULL;
154,914✔
1266
  int32_t vgId = TD_VID(pTq->pVnode);
154,914✔
1267

1268
  // update the table list for each consumer handle
1269
  taosWLockLatch(&pTq->lock);
154,914✔
1270
  while (1) {
3,403✔
1271
    pIter = taosHashIterate(pTq->pHandle, pIter);
158,320✔
1272
    if (pIter == NULL) {
158,320✔
1273
      break;
154,916✔
1274
    }
1275

1276
    STqHandle* pTqHandle = (STqHandle*)pIter;
3,404✔
1277
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
3,404✔
1278
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
236✔
1279
      if (code != 0) {
236!
1280
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
1281
        continue;
×
1282
      }
1283
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
3,168✔
1284
      if (!isAdd) {
3,153✔
1285
        int32_t sz = taosArrayGetSize(tbUidList);
1,060✔
1286
        for (int32_t i = 0; i < sz; i++) {
1,060!
1287
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
×
1288
          if (tbUid &&
×
1289
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
×
1290
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
1291
            continue;
×
1292
          }
1293
        }
1294
      }
1295
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
15✔
1296
      if (isAdd) {
14!
1297
        SArray* list = NULL;
14✔
1298
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
14✔
1299
                                    &list, pTqHandle->execHandle.task);
1300
        if (ret == 0) {
14!
1301
          ret = tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
14✔
1302
        }                            
1303
        if (ret != TDB_CODE_SUCCESS) {
14!
1304
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1305
                  pTqHandle->consumerId);
1306
          taosArrayDestroy(list);
×
1307
          taosHashCancelIterate(pTq->pHandle, pIter);
×
1308
          taosWUnLockLatch(&pTq->lock);
×
1309

1310
          return ret;
×
1311
        }
1312
        taosArrayDestroy(list);
14✔
1313
      } else {
1314
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1315
      }
1316
    }
1317
  }
1318
  taosWUnLockLatch(&pTq->lock);
154,916✔
1319

1320
  // update the table list handle for each stream scanner/wal reader
1321
/* STREAMTODO
1322
  streamMetaWLock(pTq->pStreamMeta);
1323
  while (1) {
1324
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
1325
    if (pIter == NULL) {
1326
      break;
1327
    }
1328

1329
    int64_t      refId = *(int64_t*)pIter;
1330
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
1331
    if (pTask != NULL) {
1332
      int32_t taskId = pTask->id.taskId;
1333

1334
      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
1335
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
1336
        if (code != 0) {
1337
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
1338
        }
1339
      }
1340
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
1341
      if (ret) {
1342
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
1343
      }
1344
    }
1345
  }
1346

1347
  streamMetaWUnLock(pTq->pStreamMeta);
1348
*/  
1349
  return 0;
154,908✔
1350
}
1351

1352
static void destroySourceScanTables(void* ptr) {
×
1353
  SArray** pTables = ptr;
×
1354
  if (pTables && *pTables) {
×
1355
    taosArrayDestroy(*pTables);
×
1356
    *pTables = NULL;
×
1357
  }
1358
}
×
1359

1360
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1361
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1362
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1363
  if (pCol1->vColId == pCol2->vColId) {
×
1364
    return 0;
×
1365
  } else if (pCol1->vColId < pCol2->vColId) {
×
1366
    return -1;
×
1367
  } else {
1368
    return 1;
×
1369
  }
1370
}
1371

1372
int32_t tqReaderSetVtableInfo(STqReader* pReader, void* vnode, void* ptr, SSHashObj* pVtableInfos,
×
1373
                              SSDataBlock** ppResBlock, const char* idstr) {
1374
  int32_t            code = TSDB_CODE_SUCCESS;
×
1375
  int32_t            lino = 0;
×
1376
  SStorageAPI*       pAPI = ptr;
×
1377
  SVTSourceScanInfo* pScanInfo = NULL;
×
1378
  SHashObj*          pVirtualTables = NULL;
×
1379
  SMetaReader        metaReader = {0};
×
1380
  SVTColInfo         colInfo = {0};
×
1381
  SSchemaWrapper*    schema = NULL;
×
1382

1383
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1384
  TSDB_CHECK_NULL(vnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1385
  TSDB_CHECK_NULL(pAPI, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1386

1387
  pScanInfo = &pReader->vtSourceScanInfo;
×
1388
  taosHashCleanup(pScanInfo->pVirtualTables);
×
1389
  pScanInfo->pVirtualTables = NULL;
×
1390

1391
  if (tSimpleHashGetSize(pVtableInfos) == 0) {
×
1392
    goto _end;
×
1393
  }
1394

1395
  pVirtualTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1396
  TSDB_CHECK_NULL(pVirtualTables, code, lino, _end, terrno);
×
1397
  taosHashSetFreeFp(pVirtualTables, destroySourceScanTables);
×
1398

1399
  int32_t iter = 0;
×
1400
  void*   px = tSimpleHashIterate(pVtableInfos, NULL, &iter);
×
1401
  while (px != NULL) {
×
1402
    int64_t vTbUid = *(int64_t*)tSimpleHashGetKey(px, NULL);
×
1403
    SArray* pColInfos = taosArrayInit(8, sizeof(SVTColInfo));
×
1404
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, terrno);
×
1405
    code = taosHashPut(pVirtualTables, &vTbUid, sizeof(int64_t), &pColInfos, POINTER_BYTES);
×
1406
    TSDB_CHECK_CODE(code, lino, _end);
×
1407

1408
    SSHashObj* pPhysicalTables = *(SSHashObj**)px;
×
1409
    int32_t    iterIn = 0;
×
1410
    void*      pxIn = tSimpleHashIterate(pPhysicalTables, NULL, &iterIn);
×
1411
    while (pxIn != NULL) {
×
1412
      char* physicalTableName = tSimpleHashGetKey(pxIn, NULL);
×
1413
      pAPI->metaReaderFn.clearReader(&metaReader);
×
1414
      pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1415
      code = pAPI->metaReaderFn.getTableEntryByName(&metaReader, physicalTableName);
×
1416
      TSDB_CHECK_CODE(code, lino, _end);
×
1417
      pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1418
      colInfo.pTbUid = metaReader.me.uid;
×
1419

1420
      switch (metaReader.me.type) {
×
1421
        case TSDB_CHILD_TABLE: {
×
1422
          int64_t suid = metaReader.me.ctbEntry.suid;
×
1423
          pAPI->metaReaderFn.clearReader(&metaReader);
×
1424
          pAPI->metaReaderFn.initReader(&metaReader, vnode, META_READER_LOCK, &pAPI->metaFn);
×
1425
          code = pAPI->metaReaderFn.getTableEntryByUid(&metaReader, suid);
×
1426
          TSDB_CHECK_CODE(code, lino, _end);
×
1427
          pAPI->metaReaderFn.readerReleaseLock(&metaReader);
×
1428
          schema = &metaReader.me.stbEntry.schemaRow;
×
1429
          break;
×
1430
        }
1431
        case TSDB_NORMAL_TABLE: {
×
1432
          schema = &metaReader.me.ntbEntry.schemaRow;
×
1433
          break;
×
1434
        }
1435
        default: {
×
1436
          tqError("invalid table type: %d", metaReader.me.type);
×
1437
          code = TSDB_CODE_INVALID_PARA;
×
1438
          TSDB_CHECK_CODE(code, lino, _end);
×
1439
        }
1440
      }
1441

1442
      SArray* pCols = *(SArray**)pxIn;
×
1443
      int32_t ncols = taosArrayGetSize(pCols);
×
1444
      for (int32_t i = 0; i < ncols; ++i) {
×
1445
        SColIdName* pCol = taosArrayGet(pCols, i);
×
1446
        colInfo.vColId = pCol->colId;
×
1447

1448
        for (int32_t j = 0; j < schema->nCols; ++j) {
×
1449
          if (strncmp(pCol->colName, schema->pSchema[j].name, strlen(schema->pSchema[j].name)) == 0) {
×
1450
            colInfo.pColId = schema->pSchema[j].colId;
×
1451
            void* px = taosArrayPush(pColInfos, &colInfo);
×
1452
            TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1453
            break;
×
1454
          }
1455
        }
1456
      }
1457

1458
      taosArraySort(pColInfos, compareSVTColInfo);
×
1459
      pxIn = tSimpleHashIterate(pPhysicalTables, pxIn, &iterIn);
×
1460
    }
1461

1462
    px = tSimpleHashIterate(pVtableInfos, px, &iter);
×
1463
  }
1464

1465
  pScanInfo->pVirtualTables = pVirtualTables;
×
1466
  pVirtualTables = NULL;
×
1467

1468
  // set the result data block
1469
  if (pReader->pResBlock) {
×
1470
    blockDataDestroy(pReader->pResBlock);
×
1471
  }
1472
  pReader->pResBlock = *ppResBlock;
×
1473
  *ppResBlock = NULL;
×
1474

1475
  // update reader callback for vtable source scan
1476
  pAPI->tqReaderFn.tqNextBlockImpl = tqNextVTableSourceBlockImpl;
×
1477
  pAPI->tqReaderFn.tqReaderIsQueriedTable = tqReaderIsQueriedSourceTable;
×
1478

1479
_end:
×
1480
  if (code != TSDB_CODE_SUCCESS) {
×
1481
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1482
  }
1483
  pAPI->metaReaderFn.clearReader(&metaReader);
×
1484
  if (pVirtualTables != NULL) {
×
1485
    taosHashCleanup(pVirtualTables);
×
1486
  }
1487
  return code;
×
1488
}
1489

1490
static int32_t tqCollectPhysicalTables(STqReader* pReader, const char* idstr) {
1,004✔
1491
  int32_t            code = TSDB_CODE_SUCCESS;
1,004✔
1492
  int32_t            lino = 0;
1,004✔
1493
  SVTSourceScanInfo* pScanInfo = NULL;
1,004✔
1494
  SHashObj*          pVirtualTables = NULL;
1,004✔
1495
  SHashObj*          pPhysicalTables = NULL;
1,004✔
1496
  void*              pIter = NULL;
1,004✔
1497
  void*              px = NULL;
1,004✔
1498

1499
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
1,004!
1500

1501
  pScanInfo = &pReader->vtSourceScanInfo;
1,004✔
1502
  taosHashCleanup(pScanInfo->pPhysicalTables);
1,004✔
1503
  pScanInfo->pPhysicalTables = NULL;
1,001✔
1504
  taosLRUCacheCleanup(pScanInfo->pPhyTblSchemaCache);
1,001✔
1505
  pScanInfo->pPhyTblSchemaCache = NULL;
1,001✔
1506
  pScanInfo->nextVirtualTableIdx = -1;
1,001✔
1507
  pScanInfo->metaFetch = 0;
1,001✔
1508
  pScanInfo->cacheHit = 0;
1,001✔
1509

1510
  pVirtualTables = pScanInfo->pVirtualTables;
1,001✔
1511
  if (taosHashGetSize(pVirtualTables) == 0 || taosArrayGetSize(pReader->pColIdList) == 0) {
1,001!
1512
    goto _end;
1,001✔
1513
  }
1514

1515
  pPhysicalTables = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
×
1516
  TSDB_CHECK_NULL(pPhysicalTables, code, lino, _end, terrno);
×
1517
  taosHashSetFreeFp(pPhysicalTables, destroySourceScanTables);
×
1518

1519
  pIter = taosHashIterate(pVirtualTables, NULL);
×
1520
  while (pIter != NULL) {
×
1521
    int64_t vTbUid = *(int64_t*)taosHashGetKey(pIter, NULL);
×
1522
    SArray* pColInfos = *(SArray**)pIter;
×
1523
    TSDB_CHECK_NULL(pColInfos, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
×
1524

1525
    // Traverse all required columns and collect corresponding physical tables
1526
    int32_t nColInfos = taosArrayGetSize(pColInfos);
×
1527
    int32_t nOutputCols = taosArrayGetSize(pReader->pColIdList);
×
1528
    for (int32_t i = 0, j = 0; i < nColInfos && j < nOutputCols;) {
×
1529
      SVTColInfo* pCol = taosArrayGet(pColInfos, i);
×
1530
      col_id_t    colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j);
×
1531
      if (pCol->vColId < colIdNeed) {
×
1532
        i++;
×
1533
      } else if (pCol->vColId > colIdNeed) {
×
1534
        j++;
×
1535
      } else {
1536
        SArray* pRelatedVTs = NULL;
×
1537
        px = taosHashGet(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t));
×
1538
        if (px == NULL) {
×
1539
          pRelatedVTs = taosArrayInit(8, sizeof(int64_t));
×
1540
          TSDB_CHECK_NULL(pRelatedVTs, code, lino, _end, terrno);
×
1541
          code = taosHashPut(pPhysicalTables, &pCol->pTbUid, sizeof(int64_t), &pRelatedVTs, POINTER_BYTES);
×
1542
          if (code != TSDB_CODE_SUCCESS) {
×
1543
            taosArrayDestroy(pRelatedVTs);
×
1544
            TSDB_CHECK_CODE(code, lino, _end);
×
1545
          }
1546
        } else {
1547
          pRelatedVTs = *(SArray**)px;
×
1548
        }
1549
        if (taosArrayGetSize(pRelatedVTs) == 0 || *(int64_t*)taosArrayGetLast(pRelatedVTs) != vTbUid) {
×
1550
          px = taosArrayPush(pRelatedVTs, &vTbUid);
×
1551
          TSDB_CHECK_NULL(px, code, lino, _end, terrno);
×
1552
        }
1553
        i++;
×
1554
        j++;
×
1555
      }
1556
    }
1557
    pIter = taosHashIterate(pVirtualTables, pIter);
×
1558
  }
1559

1560
  pScanInfo->pPhysicalTables = pPhysicalTables;
×
1561
  pPhysicalTables = NULL;
×
1562

1563
  if (taosHashGetSize(pScanInfo->pPhysicalTables) > 0) {
×
1564
    pScanInfo->pPhyTblSchemaCache = taosLRUCacheInit(1024 * 128, -1, .5);
×
1565
    TSDB_CHECK_NULL(pScanInfo->pPhyTblSchemaCache, code, lino, _end, terrno);
×
1566
  }
1567

1568
_end:
×
1569
  if (code != TSDB_CODE_SUCCESS) {
1,001!
1570
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1571
  }
1572
  if (pIter != NULL) {
1,001!
1573
    taosHashCancelIterate(pReader->tbIdHash, pIter);
×
1574
  }
1575
  if (pPhysicalTables != NULL) {
1,001!
1576
    taosHashCleanup(pPhysicalTables);
×
1577
  }
1578
  return code;
1,001✔
1579
}
1580

1581
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1582
  if (value) {
×
1583
    SSchemaWrapper* pSchemaWrapper = value;
×
1584
    tDeleteSchemaWrapper(pSchemaWrapper);
1585
  }
1586
}
×
1587

1588
bool tqNextVTableSourceBlockImpl(STqReader* pReader, const char* idstr) {
×
1589
  int32_t            code = TSDB_CODE_SUCCESS;
×
1590
  int32_t            lino = 0;
×
1591
  SVTSourceScanInfo* pScanInfo = NULL;
×
1592

1593
  TSDB_CHECK_NULL(pReader, code, lino, _end, TSDB_CODE_INVALID_PARA);
×
1594

1595
  pScanInfo = &pReader->vtSourceScanInfo;
×
1596
  if (pReader->msg.msgStr == NULL || taosHashGetSize(pScanInfo->pPhysicalTables) == 0) {
×
1597
    return false;
×
1598
  }
1599

1600
  if (pScanInfo->nextVirtualTableIdx >= 0) {
×
1601
    // The data still needs to be converted into the virtual table result block
1602
    return true;
×
1603
  }
1604

1605
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
×
1606
  while (pReader->nextBlk < blockSz) {
×
1607
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
1608
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, _end, terrno);
×
1609
    int64_t pTbUid = pSubmitTbData->uid;
×
1610
    void*   px = taosHashGet(pScanInfo->pPhysicalTables, &pTbUid, sizeof(int64_t));
×
1611
    if (px != NULL) {
×
1612
      SArray* pRelatedVTs = *(SArray**)px;
×
1613
      if (taosArrayGetSize(pRelatedVTs) > 0) {
×
1614
        pScanInfo->nextVirtualTableIdx = 0;
×
1615
        return true;
×
1616
      }
1617
    }
1618
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, pTbUid);
×
1619
    pReader->nextBlk++;
×
1620
  }
1621

1622
  tqReaderClearSubmitMsg(pReader);
×
1623
  tqTrace("iterator data block end, total block num:%d", blockSz);
×
1624

1625
_end:
×
1626
  if (code != TSDB_CODE_SUCCESS) {
×
1627
    tqError("%s failed at line %d since %s, id: %s", __func__, lino, tstrerror(code), idstr);
×
1628
  }
1629
  return false;
×
1630
}
1631

1632
bool tqReaderIsQueriedSourceTable(STqReader* pReader, uint64_t uid) {
×
1633
  if (pReader == NULL) {
×
1634
    return false;
×
1635
  }
1636
  return taosHashGet(pReader->vtSourceScanInfo.pPhysicalTables, &uid, sizeof(uint64_t)) != NULL;
×
1637
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc