• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3558

17 Dec 2024 06:05AM UTC coverage: 59.778% (+1.6%) from 58.204%
#3558

push

travis-ci

web-flow
Merge pull request #29179 from taosdata/merge/mainto3.0

merge: form main to 3.0 branch

132787 of 287595 branches covered (46.17%)

Branch coverage included in aggregate %.

104 of 191 new or added lines in 5 files covered. (54.45%)

6085 existing lines in 168 files now uncovered.

209348 of 284746 relevant lines covered (73.52%)

8164844.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.2
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
559✔
20
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
559✔
21
    return true;
555✔
22
  }
23

24
  int16_t msgType = pHead->msgType;
4✔
25
  char*   body = pHead->body;
4✔
26
  int32_t bodyLen = pHead->bodyLen;
4✔
27

28
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
4✔
29
  int64_t  realTbSuid = 0;
4✔
30
  SDecoder dcoder = {0};
4✔
31
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
4✔
32
  int32_t  len = bodyLen - sizeof(SMsgHead);
4✔
33
  tDecoderInit(&dcoder, data, len);
4✔
34

35
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
4!
36
    SVCreateStbReq req = {0};
2✔
37
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
2!
38
      goto end;
×
39
    }
40
    realTbSuid = req.suid;
2✔
41
  } else if (msgType == TDMT_VND_DROP_STB) {
2!
42
    SVDropStbReq req = {0};
×
43
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
44
      goto end;
×
45
    }
46
    realTbSuid = req.suid;
×
47
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
2!
48
    SVCreateTbBatchReq req = {0};
2✔
49
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
2!
50
      goto end;
×
51
    }
52

53
    int32_t        needRebuild = 0;
2✔
54
    SVCreateTbReq* pCreateReq = NULL;
2✔
55
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
4✔
56
      pCreateReq = req.pReqs + iReq;
2✔
57
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
2!
58
        needRebuild++;
×
59
      }
60
    }
61
    if (needRebuild == 0) {
2!
62
      // do nothing
63
    } else if (needRebuild == req.nReqs) {
×
64
      realTbSuid = tbSuid;
×
65
    } else {
66
      realTbSuid = tbSuid;
×
67
      SVCreateTbBatchReq reqNew = {0};
×
68
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
69
      if (reqNew.pArray == NULL) {
×
70
        tDeleteSVCreateTbBatchReq(&req);
×
71
        goto end;
×
72
      }
73
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
74
        pCreateReq = req.pReqs + iReq;
×
75
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
76
          reqNew.nReqs++;
×
77
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
78
            taosArrayDestroy(reqNew.pArray);
×
79
            tDeleteSVCreateTbBatchReq(&req);
×
80
            goto end;
×
81
          }
82
        }
83
      }
84

85
      int     tlen = 0;
×
86
      int32_t ret = 0;
×
87
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
88
      void* buf = taosMemoryMalloc(tlen);
×
89
      if (NULL == buf) {
×
90
        taosArrayDestroy(reqNew.pArray);
×
91
        tDeleteSVCreateTbBatchReq(&req);
×
92
        goto end;
×
93
      }
94
      SEncoder coderNew = {0};
×
95
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
96
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
97
      tEncoderClear(&coderNew);
×
98
      if (ret < 0) {
×
99
        taosMemoryFree(buf);
×
100
        taosArrayDestroy(reqNew.pArray);
×
101
        tDeleteSVCreateTbBatchReq(&req);
×
102
        goto end;
×
103
      }
104
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
105
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
106
      taosMemoryFree(buf);
×
107
      taosArrayDestroy(reqNew.pArray);
×
108
    }
109

110
    tDeleteSVCreateTbBatchReq(&req);
2✔
111
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
112
    SVAlterTbReq req = {0};
×
113

114
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
115
      goto end;
×
116
    }
117

118
    SMetaReader mr = {0};
×
119
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
120

121
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
122
      metaReaderClear(&mr);
×
123
      goto end;
×
124
    }
125
    realTbSuid = mr.me.ctbEntry.suid;
×
126
    metaReaderClear(&mr);
×
127
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
128
    SVDropTbBatchReq req = {0};
×
129

130
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
131
      goto end;
×
132
    }
133

134
    int32_t      needRebuild = 0;
×
135
    SVDropTbReq* pDropReq = NULL;
×
136
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
137
      pDropReq = req.pReqs + iReq;
×
138

139
      if (pDropReq->suid == tbSuid) {
×
140
        needRebuild++;
×
141
      }
142
    }
143
    if (needRebuild == 0) {
×
144
      // do nothing
145
    } else if (needRebuild == req.nReqs) {
×
146
      realTbSuid = tbSuid;
×
147
    } else {
148
      realTbSuid = tbSuid;
×
149
      SVDropTbBatchReq reqNew = {0};
×
150
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
151
      if (reqNew.pArray == NULL) {
×
152
        goto end;
×
153
      }
154
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
155
        pDropReq = req.pReqs + iReq;
×
156
        if (pDropReq->suid == tbSuid) {
×
157
          reqNew.nReqs++;
×
158
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
159
            taosArrayDestroy(reqNew.pArray);
×
160
            goto end;
×
161
          }
162
        }
163
      }
164

165
      int     tlen = 0;
×
166
      int32_t ret = 0;
×
167
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
168
      void* buf = taosMemoryMalloc(tlen);
×
169
      if (NULL == buf) {
×
170
        taosArrayDestroy(reqNew.pArray);
×
171
        goto end;
×
172
      }
173
      SEncoder coderNew = {0};
×
174
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
175
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
176
      tEncoderClear(&coderNew);
×
177
      if (ret != 0) {
×
178
        taosMemoryFree(buf);
×
179
        taosArrayDestroy(reqNew.pArray);
×
180
        goto end;
×
181
      }
182
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
183
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
184
      taosMemoryFree(buf);
×
185
      taosArrayDestroy(reqNew.pArray);
×
186
    }
187
  } else if (msgType == TDMT_VND_DELETE) {
×
188
    SDeleteRes req = {0};
×
189
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
190
      goto end;
×
191
    }
192
    realTbSuid = req.suid;
×
193
  }
194

195
end:
×
196
  tDecoderClear(&dcoder);
4✔
197
  return tbSuid == realTbSuid;
4✔
198
}
199

200
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
130,792✔
201
  int32_t code = -1;
130,792✔
202
  int32_t vgId = TD_VID(pTq->pVnode);
130,792✔
203
  int64_t id = pHandle->pWalReader->readerId;
130,792✔
204

205
  int64_t offset = *fetchOffset;
130,792✔
206
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
130,792✔
207
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
130,849✔
208
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
130,871✔
209

210
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
130,875!
211
          ", 0x%" PRIx64,
212
          vgId, offset, lastVer, committedVer, appliedVer, id);
213

214
  while (offset <= appliedVer) {
138,800✔
215
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
129,900!
216
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
217
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
218
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
219
      goto END;
×
220
    }
221

222
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
129,894!
223
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
224

225
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
129,901✔
226
      code = walFetchBody(pHandle->pWalReader);
121,551✔
227
      goto END;
121,537✔
228
    } else {
229
      if (pHandle->fetchMeta != WITH_DATA) {
8,350✔
230
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
714✔
231
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
714✔
232
          code = walFetchBody(pHandle->pWalReader);
559✔
233
          if (code < 0) {
559!
234
            goto END;
×
235
          }
236

237
          pHead = &(pHandle->pWalReader->pHead->head);
559✔
238
          if (isValValidForTable(pHandle, pHead)) {
559✔
239
            code = 0;
556✔
240
            goto END;
556✔
241
          } else {
242
            offset++;
3✔
243
            code = -1;
3✔
244
            continue;
3✔
245
          }
246
        }
247
      }
248
      code = walSkipFetchBody(pHandle->pWalReader);
7,791✔
249
      if (code < 0) {
7,792!
250
        goto END;
×
251
      }
252
      offset++;
7,792✔
253
    }
254
    code = -1;
7,792✔
255
  }
256

257
END:
8,900✔
258
  *fetchOffset = offset;
130,993✔
259
  return code;
130,993✔
260
}
261

262
bool tqGetTablePrimaryKey(STqReader* pReader) { return pReader->hasPrimaryKey; }
19,997✔
263

264
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
176✔
265
  bool            ret = false;
176✔
266
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
176✔
267
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
176!
268
    ret = true;
3✔
269
  }
270
  tDeleteSchemaWrapper(schema);
271
  pReader->hasPrimaryKey = ret;
176✔
272
}
176✔
273

274
STqReader* tqReaderOpen(SVnode* pVnode) {
8,240✔
275
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
8,240!
276
  if (pReader == NULL) {
8,251!
277
    return NULL;
×
278
  }
279

280
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
8,251✔
281
  if (pReader->pWalReader == NULL) {
8,246!
282
    taosMemoryFree(pReader);
×
283
    return NULL;
×
284
  }
285

286
  pReader->pVnodeMeta = pVnode->pMeta;
8,246✔
287
  pReader->pColIdList = NULL;
8,246✔
288
  pReader->cachedSchemaVer = 0;
8,246✔
289
  pReader->cachedSchemaSuid = 0;
8,246✔
290
  pReader->pSchemaWrapper = NULL;
8,246✔
291
  pReader->tbIdHash = NULL;
8,246✔
292
  pReader->pResBlock = NULL;
8,246✔
293

294
  int32_t code = createDataBlock(&pReader->pResBlock);
8,246✔
295
  if (code) {
8,249!
UNCOV
296
    terrno = code;
×
297
  }
298

299
  return pReader;
8,250✔
300
}
301

302
void tqReaderClose(STqReader* pReader) {
8,165✔
303
  if (pReader == NULL) return;
8,165✔
304

305
  // close wal reader
306
  if (pReader->pWalReader) {
8,125!
307
    walCloseReader(pReader->pWalReader);
8,126✔
308
  }
309

310
  if (pReader->pSchemaWrapper) {
8,123✔
311
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
1,878!
312
  }
313

314
  if (pReader->pColIdList) {
8,124✔
315
    taosArrayDestroy(pReader->pColIdList);
7,803✔
316
  }
317

318
  // free hash
319
  blockDataDestroy(pReader->pResBlock);
8,126✔
320
  taosHashCleanup(pReader->tbIdHash);
8,125✔
321
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
8,126✔
322
  taosMemoryFree(pReader);
8,126!
323
}
324

325
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
5,663✔
326
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
5,663✔
327
    return -1;
140✔
328
  }
329
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
5,521!
330
  return 0;
5,523✔
331
}
332

333
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
435,191✔
334
  int32_t code = 0;
435,191✔
335

336
  while (1) {
7,859✔
337
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));
443,050✔
338

339
    SWalCont* pCont = &pReader->pHead->head;
379,603✔
340
    int64_t   ver = pCont->version;
379,603✔
341
    if (ver > maxVer) {
379,603✔
342
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
257✔
343
      return TSDB_CODE_SUCCESS;
257✔
344
    }
345

346
    if (pCont->msgType == TDMT_VND_SUBMIT) {
379,346✔
347
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
368,864✔
348
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
368,864✔
349

350
      void* data = taosMemoryMalloc(len);
368,864!
351
      if (data == NULL) {
368,864!
352
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
353
        // retry
354
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
×
355
        return terrno;
×
356
      }
357

358
      (void)memcpy(data, pBody, len);
368,864✔
359
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
368,864✔
360

361
      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
368,864✔
362
      if (code != 0) {
368,872!
363
        tqError("%s failed to create data submit for stream since out of memory", id);
×
364
        return code;
×
365
      }
366
    } else if (pCont->msgType == TDMT_VND_DELETE) {
10,482✔
367
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
10,302✔
368
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
10,302✔
369
      EStreamType blockType = STREAM_DELETE_DATA;
10,302✔
370
      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
10,302✔
371
      if (code == TSDB_CODE_SUCCESS) {
10,300!
372
        if (*pItem == NULL) {
10,300✔
373
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
7,859✔
374
          // we need to continue check next data in the wal files.
375
          continue;
7,859✔
376
        } else {
377
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
2,441✔
378
        }
379
      } else {
380
        terrno = code;
×
381
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
×
382
        return code;
×
383
      }
384

385
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
365!
386
      void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
185✔
387
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
185✔
388
      code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
185✔
389
      if (TSDB_CODE_SUCCESS == code) {
185!
390
        if (!*pItem) {
185!
391
          continue;
×
392
        } else {
393
          tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
185!
394
        }
395
      } else {
396
        terrno = code;
×
397
        return code;
×
398
      }
399
    } else {
400
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
×
401
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
402
    }
403

404
    return code;
371,497✔
405
  }
406
}
407

408
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
544,394✔
409
  SWalReader* pWalReader = pReader->pWalReader;
544,394✔
410

411
  int64_t st = taosGetTimestampMs();
544,395✔
412
  while (1) {
506,109✔
413
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
1,050,504✔
414
    while (pReader->nextBlk < numOfBlocks) {
1,316,497✔
415
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
752,264✔
416
              pReader->msg.ver);
417

418
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
752,264✔
419
      if (pSubmitTbData == NULL) {
752,258!
420
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
421
                pReader->msg.ver);
422
        return false;
×
423
      }
424
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
752,263✔
425
        pReader->nextBlk += 1;
110✔
426
        continue;
110✔
427
      }
428
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
752,152!
429
        tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
486,266✔
430
        SSDataBlock* pRes = NULL;
486,266✔
431
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
486,266✔
432
        if (code == TSDB_CODE_SUCCESS) {
486,260!
433
          return true;
486,261✔
434
        }
435
      } else {
436
        pReader->nextBlk += 1;
265,916✔
437
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
265,916!
438
      }
439
    }
440

441
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
564,233✔
442
    pReader->msg.msgStr = NULL;
564,243✔
443

444
    int64_t elapsed = taosGetTimestampMs() - st;
564,242✔
445
    if (elapsed > 1000 || elapsed < 0) {
564,242!
446
      return false;
×
447
    }
448

449
    // try next message in wal file
450
    if (walNextValidMsg(pWalReader) < 0) {
564,244✔
451
      return false;
58,126✔
452
    }
453

454
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
506,097✔
455
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
506,097✔
456
    int64_t ver = pWalReader->pHead->head.version;
506,097✔
457
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) {
506,097!
458
      return false;
×
459
    }
460
    pReader->nextBlk = 0;
506,109✔
461
  }
462
}
463

464
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
996,593✔
465
  pReader->msg.msgStr = msgStr;
996,593✔
466
  pReader->msg.msgLen = msgLen;
996,593✔
467
  pReader->msg.ver = ver;
996,593✔
468

469
  tqDebug("tq reader set msg %p %d", msgStr, msgLen);
996,593✔
470
  SDecoder decoder = {0};
996,654✔
471

472
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
996,654✔
473
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit);
996,627✔
474
  if (code != 0) {
996,533!
475
    tDecoderClear(&decoder);
×
476
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
477
    return code;
×
478
  }
479

480
  tDecoderClear(&decoder);
996,533✔
481
  return 0;
996,616✔
482
}
483

484
SWalReader* tqGetWalReader(STqReader* pReader) { return pReader->pWalReader; }
609,546✔
485

486
SSDataBlock* tqGetResultBlock(STqReader* pReader) { return pReader->pResBlock; }
544,380✔
487

488
int64_t tqGetResultBlockTime(STqReader* pReader) { return pReader->lastTs; }
544,372✔
489

490
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
737,708✔
491
  if (pReader->msg.msgStr == NULL) {
737,708!
492
    return false;
×
493
  }
494

495
  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
737,708✔
496
  while (pReader->nextBlk < numOfBlocks) {
839,581✔
497
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
433,978✔
498
            (pReader->nextBlk + 1), numOfBlocks, idstr);
499

500
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
433,982✔
501
    if (pSubmitTbData == NULL) {
433,975!
502
      return false;
×
503
    }
504
    if (pReader->tbIdHash == NULL) {
433,975!
505
      return true;
×
506
    }
507

508
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
433,975✔
509
    if (ret != NULL) {
434,021✔
510
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
332,164✔
511
      return true;
332,164✔
512
    } else {
513
      tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
101,857✔
514
              taosHashGetSize(pReader->tbIdHash), idstr);
515
    }
516

517
    pReader->nextBlk++;
101,870✔
518
  }
519

520
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
405,603✔
521
  pReader->nextBlk = 0;
405,658✔
522
  pReader->msg.msgStr = NULL;
405,658✔
523

524
  return false;
405,658✔
525
}
526

527
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
169,661✔
528
  if (pReader->msg.msgStr == NULL) return false;
169,661!
529

530
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
169,661✔
531
  while (pReader->nextBlk < blockSz) {
169,643✔
532
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
84,931✔
533
    if (pSubmitTbData == NULL) return false;
84,929!
534
    if (filterOutUids == NULL) return true;
84,929!
535

536
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
84,929✔
537
    if (ret == NULL) {
84,925!
538
      return true;
84,926✔
539
    }
UNCOV
540
    pReader->nextBlk++;
×
541
  }
542

543
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
84,712✔
544
  pReader->nextBlk = 0;
84,828✔
545
  pReader->msg.msgStr = NULL;
84,828✔
546

547
  return false;
84,828✔
548
}
549

550
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
117,534✔
551
  int32_t code = 0;
117,534✔
552

553
  int32_t cnt = 0;
117,534✔
554
  for (int32_t i = 0; i < pSrc->nCols; i++) {
700,852✔
555
    cnt += mask[i];
583,318✔
556
  }
557

558
  pDst->nCols = cnt;
117,534✔
559
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
117,534!
560
  if (pDst->pSchema == NULL) {
117,547!
561
    return TAOS_GET_TERRNO(terrno);
×
562
  }
563

564
  int32_t j = 0;
117,547✔
565
  for (int32_t i = 0; i < pSrc->nCols; i++) {
700,088✔
566
    if (mask[i]) {
582,497✔
567
      pDst->pSchema[j++] = pSrc->pSchema[i];
582,477✔
568
      SColumnInfoData colInfo =
569
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
582,477✔
570
      code = blockDataAppendColInfo(pBlock, &colInfo);
583,198✔
571
      if (code != 0) {
582,521!
572
        return code;
×
573
      }
574
    }
575
  }
576
  return 0;
117,591✔
577
}
578

579
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
1,755✔
580
  SSDataBlock* pBlock = pReader->pResBlock;
1,755✔
581
  if (blockDataGetNumOfCols(pBlock) > 0) {
1,755✔
582
      blockDataDestroy(pBlock);
1✔
583
      int32_t code = createDataBlock(&pReader->pResBlock);
1✔
584
      if (code) {
1!
585
        return code;
×
586
      }
587
      pBlock = pReader->pResBlock;
1✔
588

589
      pBlock->info.id.uid = pReader->cachedSchemaUid;
1✔
590
      pBlock->info.version = pReader->msg.ver;
1✔
591
  }
592

593
  int32_t numOfCols = taosArrayGetSize(pColIdList);
1,756✔
594

595
  if (numOfCols == 0) {  // all columns are required
1,757!
596
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
597
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
598
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
599

600
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
601
      if (code != TSDB_CODE_SUCCESS) {
×
602
        blockDataFreeRes(pBlock);
×
603
        return terrno;
×
604
      }
605
    }
606
  } else {
607
    if (numOfCols > pSchema->nCols) {
1,757✔
608
      numOfCols = pSchema->nCols;
1✔
609
    }
610

611
    int32_t i = 0;
1,757✔
612
    int32_t j = 0;
1,757✔
613
    while (i < pSchema->nCols && j < numOfCols) {
14,481✔
614
      SSchema* pColSchema = &pSchema->pSchema[i];
12,714✔
615
      col_id_t colIdSchema = pColSchema->colId;
12,714✔
616

617
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
12,714✔
618
      if (pColIdNeed == NULL) {
12,716!
619
        break;
×
620
      }
621
      if (colIdSchema < *pColIdNeed) {
12,716✔
622
        i++;
1,631✔
623
      } else if (colIdSchema > *pColIdNeed) {
11,085!
624
        j++;
×
625
      } else {
626
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
11,085✔
627
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
11,120✔
628
        if (code != TSDB_CODE_SUCCESS) {
11,093!
629
          return -1;
×
630
        }
631
        i++;
11,093✔
632
        j++;
11,093✔
633
      }
634
    }
635
  }
636

637
  return TSDB_CODE_SUCCESS;
1,767✔
638
}
639

640
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
163,365,875✔
641
  int32_t code = TSDB_CODE_SUCCESS;
163,365,875✔
642

643
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
172,063,978!
644
    char val[65535 + 2] = {0};
8,161,388✔
645
    if (COL_VAL_IS_VALUE(pColVal)) {
8,161,388!
646
      if (pColVal->value.pData != NULL) {
8,710,038!
647
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
8,711,692✔
648
      }
649
      varDataSetLen(val, pColVal->value.nData);
8,710,038✔
650
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
8,710,038✔
651
    } else {
652
      colDataSetNULL(pColumnInfoData, rowIndex);
×
653
    }
654
  } else {
655
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
155,204,487✔
656
  }
657

658
  return code;
163,486,065✔
659
}
660

661
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
785,235✔
662
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
785,235✔
663
  int32_t        code = 0;
785,235✔
664
  int32_t        line = 0;
785,235✔
665
  STSchema*      pTSchema = NULL;
785,235✔
666
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
785,235✔
667
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
785,230!
668
  SSDataBlock* pBlock = pReader->pResBlock;
785,230✔
669
  *pRes = pBlock;
785,230✔
670

671
  blockDataCleanup(pBlock);
785,230✔
672

673
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
785,190✔
674
  int32_t sversion = pSubmitTbData->sver;
785,190✔
675
  int64_t suid = pSubmitTbData->suid;
785,190✔
676
  int64_t uid = pSubmitTbData->uid;
785,190✔
677
  pReader->lastTs = pSubmitTbData->ctimeMs;
785,190✔
678

679
  pBlock->info.id.uid = uid;
785,190✔
680
  pBlock->info.version = pReader->msg.ver;
785,190✔
681

682
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
785,190✔
683
      (pReader->cachedSchemaVer != sversion)) {
783,429✔
684
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
1,783✔
685

686
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
1,765✔
687
    if (pReader->pSchemaWrapper == NULL) {
1,766✔
688
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
9✔
689
             "version %d, possibly dropped table",
690
             vgId, suid, uid, pReader->cachedSchemaVer);
691
      pReader->cachedSchemaSuid = 0;
8✔
692
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
8✔
693
    }
694

695
    pReader->cachedSchemaUid = uid;
1,757✔
696
    pReader->cachedSchemaSuid = suid;
1,757✔
697
    pReader->cachedSchemaVer = sversion;
1,757✔
698

699
    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
1,757!
700
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
×
701
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
702
      return TSDB_CODE_TQ_INTERNAL_ERROR;
×
703
    }
704
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
1,757✔
705
    TSDB_CHECK_CODE(code, line, END);
1,757!
706
    pBlock = pReader->pResBlock;
1,757✔
707
    *pRes = pBlock;
1,757✔
708
  }
709

710
  int32_t numOfRows = 0;
785,164✔
711
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
785,164✔
712
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
4✔
713
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
4!
714
    numOfRows = pCol->nVal;
4✔
715
  } else {
716
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
785,160✔
717
  }
718

719
  code = blockDataEnsureCapacity(pBlock, numOfRows);
785,165✔
720
  TSDB_CHECK_CODE(code, line, END);
785,169!
721
  pBlock->info.rows = numOfRows;
785,169✔
722
  int32_t colActual = blockDataGetNumOfCols(pBlock);
785,169✔
723

724
  // convert and scan one block
725
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
785,169✔
726
    SArray* pCols = pSubmitTbData->aCol;
4✔
727
    int32_t numOfCols = taosArrayGetSize(pCols);
4✔
728
    int32_t targetIdx = 0;
4✔
729
    int32_t sourceIdx = 0;
4✔
730
    while (targetIdx < colActual) {
18✔
731
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
14✔
732
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
14!
733
      if (sourceIdx >= numOfCols) {
14✔
734
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
4!
735
        colDataSetNNULL(pColData, 0, numOfRows);
4!
736
        targetIdx++;
4✔
737
        continue;
4✔
738
      }
739

740
      SColData* pCol = taosArrayGet(pCols, sourceIdx);
10✔
741
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
10!
742
      SColVal colVal = {0};
10✔
743
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
10!
744
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
745
      if (pCol->cid < pColData->info.colId) {
10✔
746
        sourceIdx++;
4✔
747
      } else if (pCol->cid == pColData->info.colId) {
6✔
748
        for (int32_t i = 0; i < pCol->nVal; i++) {
12✔
749
          tColDataGetValue(pCol, i, &colVal);
8✔
750
          code = doSetVal(pColData, i, &colVal);
8✔
751
          TSDB_CHECK_CODE(code, line, END);
8!
752
        }
753
        sourceIdx++;
4✔
754
        targetIdx++;
4✔
755
      } else {
756
        colDataSetNNULL(pColData, 0, numOfRows);
2!
757
        targetIdx++;
2✔
758
      }
759
    }
760
  } else {
761
    SArray*         pRows = pSubmitTbData->aRowP;
785,165✔
762
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
785,165✔
763
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
785,165✔
764
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);
785,194!
765

766
    for (int32_t i = 0; i < numOfRows; i++) {
54,954,408✔
767
      SRow* pRow = taosArrayGetP(pRows, i);
54,532,207✔
768
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
54,501,215✔
769
      int32_t sourceIdx = 0;
54,499,882✔
770
      for (int32_t j = 0; j < colActual; j++) {
199,834,714✔
771
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
145,665,500✔
772
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);
145,582,842!
773

774
        while (1) {
19,973,315✔
775
          SColVal colVal = {0};
165,556,157✔
776
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
165,556,157✔
777
          TSDB_CHECK_CODE(code, line, END);
165,146,062!
778

779
          if (colVal.cid < pColData->info.colId) {
165,146,062✔
780
            sourceIdx++;
19,973,315✔
781
            continue;
19,973,315✔
782
          } else if (colVal.cid == pColData->info.colId) {
145,172,747!
783
            code = doSetVal(pColData, i, &colVal);
145,740,681✔
784
            TSDB_CHECK_CODE(code, line, END);
145,902,766!
785
            sourceIdx++;
145,902,766✔
786
            break;
145,334,832✔
787
          } else {
788
            colDataSetNULL(pColData, i);
×
789
            break;
×
790
          }
791
        }
792
      }
793
    }
794
  }
795

796
END:
422,201✔
797
  if (code != 0) {
422,205!
798
    tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code);
×
799
  }
800
  taosMemoryFreeClear(pTSchema);
785,195!
801
  return code;
785,232✔
802
}
803

804
#define PROCESS_VAL                                      \
805
  if (curRow == 0) {                                     \
806
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
807
    buildNew = true;                                     \
808
  } else {                                               \
809
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
810
    if (currentRowAssigned != assigned[j]) {             \
811
      assigned[j] = currentRowAssigned;                  \
812
      buildNew = true;                                   \
813
    }                                                    \
814
  }
815

816
#define SET_DATA                                                     \
817
  if (colVal.cid < pColData->info.colId) {                           \
818
    sourceIdx++;                                                     \
819
  } else if (colVal.cid == pColData->info.colId) {                   \
820
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
821
    sourceIdx++;                                                     \
822
    targetIdx++;                                                     \
823
  }
824

825
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
117,459✔
826
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
827
                               int32_t* lastRow) {
828
  int32_t         code = 0;
117,459✔
829
  SSchemaWrapper* pSW = NULL;
117,459✔
830
  SSDataBlock*    block = NULL;
117,459✔
831
  if (taosArrayGetSize(blocks) > 0) {
117,459!
832
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
833
    TQ_NULL_GO_TO_END(pLastBlock);
×
834
    pLastBlock->info.rows = curRow - *lastRow;
×
835
    *lastRow = curRow;
×
836
  }
837

838
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
117,484!
839
  TQ_NULL_GO_TO_END(block);
117,568!
840

841
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
117,568!
842
  TQ_NULL_GO_TO_END(pSW);
117,564!
843

844
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
117,564!
845
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
117,561!
846
          (int32_t)taosArrayGetSize(block->pDataBlock));
847

848
  block->info.id.uid = pSubmitTbData->uid;
117,561✔
849
  block->info.version = pReader->msg.ver;
117,561✔
850
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
117,561!
851
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
117,540!
852
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
117,502!
853
  pSW = NULL;
117,502✔
854
  taosMemoryFreeClear(block);
117,502!
855

856
END:
×
857
  tDeleteSchemaWrapper(pSW);
117,571!
858
  blockDataFreeRes(block);
117,560✔
859
  taosMemoryFree(block);
117,549!
860
  return code;
117,553✔
861
}
862
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
113✔
863
  int32_t code = 0;
113✔
864
  int32_t curRow = 0;
113✔
865
  int32_t lastRow = 0;
113✔
866

867
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
113✔
868
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
113!
869
  TQ_NULL_GO_TO_END(assigned);
113!
870

871
  SArray*   pCols = pSubmitTbData->aCol;
113✔
872
  SColData* pCol = taosArrayGet(pCols, 0);
113✔
873
  TQ_NULL_GO_TO_END(pCol);
113!
874
  int32_t numOfRows = pCol->nVal;
113✔
875
  int32_t numOfCols = taosArrayGetSize(pCols);
113✔
876
  for (int32_t i = 0; i < numOfRows; i++) {
298✔
877
    bool buildNew = false;
185✔
878

879
    for (int32_t j = 0; j < numOfCols; j++) {
952✔
880
      pCol = taosArrayGet(pCols, j);
766✔
881
      TQ_NULL_GO_TO_END(pCol);
765!
882
      SColVal colVal = {0};
765✔
883
      tColDataGetValue(pCol, i, &colVal);
765✔
884
      PROCESS_VAL
767!
885
    }
886

887
    if (buildNew) {
186✔
888
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
113!
889
                                       curRow, &lastRow));
890
    }
891

892
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
186✔
893
    TQ_NULL_GO_TO_END(pBlock);
185!
894

895
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
185!
896
            (int32_t)taosArrayGetSize(blocks));
897

898
    int32_t targetIdx = 0;
185✔
899
    int32_t sourceIdx = 0;
185✔
900
    int32_t colActual = blockDataGetNumOfCols(pBlock);
185✔
901
    while (targetIdx < colActual) {
946✔
902
      pCol = taosArrayGet(pCols, sourceIdx);
761✔
903
      TQ_NULL_GO_TO_END(pCol);
761!
904
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
761✔
905
      TQ_NULL_GO_TO_END(pColData);
761!
906
      SColVal colVal = {0};
761✔
907
      tColDataGetValue(pCol, i, &colVal);
761✔
908
      SET_DATA
761!
909
    }
910

911
    curRow++;
185✔
912
  }
913
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
113✔
914
  pLastBlock->info.rows = curRow - lastRow;
113✔
915

916
END:
113✔
917
  taosMemoryFree(assigned);
113!
918
  return code;
113✔
919
}
920

921
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
117,476✔
922
  int32_t   code = 0;
117,476✔
923
  STSchema* pTSchema = NULL;
117,476✔
924

925
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
117,476✔
926
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
117,476!
927
  TQ_NULL_GO_TO_END(assigned);
117,490!
928

929
  int32_t curRow = 0;
117,490✔
930
  int32_t lastRow = 0;
117,490✔
931
  SArray* pRows = pSubmitTbData->aRowP;
117,490✔
932
  int32_t numOfRows = taosArrayGetSize(pRows);
117,490✔
933
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
117,464✔
934

935
  for (int32_t i = 0; i < numOfRows; i++) {
3,890,520✔
936
    bool  buildNew = false;
3,773,216✔
937
    SRow* pRow = taosArrayGetP(pRows, i);
3,773,216✔
938
    TQ_NULL_GO_TO_END(pRow);
3,764,794!
939

940
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
21,389,862✔
941
      SColVal colVal = {0};
17,649,745✔
942
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
17,649,745!
943
      PROCESS_VAL
17,625,703!
944
    }
945

946
    if (buildNew) {
3,740,117✔
947
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
117,370!
948
                                       curRow, &lastRow));
949
    }
950

951
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
3,740,174✔
952
    TQ_NULL_GO_TO_END(pBlock);
3,761,311!
953

954
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
3,761,311!
955
            (int32_t)taosArrayGetSize(blocks));
956

957
    int32_t targetIdx = 0;
3,761,311✔
958
    int32_t sourceIdx = 0;
3,761,311✔
959
    int32_t colActual = blockDataGetNumOfCols(pBlock);
3,761,311✔
960
    while (targetIdx < colActual) {
21,360,018✔
961
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
17,557,449✔
962
      SColVal          colVal = {0};
17,521,569✔
963
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
17,521,569!
964
      SET_DATA
17,439,871!
965
    }
966

967
    curRow++;
3,802,569✔
968
  }
969
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
117,304✔
970
  pLastBlock->info.rows = curRow - lastRow;
117,283✔
971

972
END:
117,283✔
973
  taosMemoryFreeClear(pTSchema);
117,283!
974
  taosMemoryFree(assigned);
117,449!
975
  return code;
117,398✔
976
}
977

978
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
118,115✔
979
  tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
118,115!
980
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
118,115✔
981
  if (pSubmitTbData == NULL) {
118,111!
982
    return terrno;
×
983
  }
984
  pReader->nextBlk++;
118,111✔
985

986
  if (pSubmitTbDataRet) {
118,111!
987
    *pSubmitTbDataRet = pSubmitTbData;
118,111✔
988
  }
989

990
  int32_t sversion = pSubmitTbData->sver;
118,111✔
991
  int64_t uid = pSubmitTbData->uid;
118,111✔
992
  pReader->lastBlkUid = uid;
118,111✔
993

994
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
118,111✔
995
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
118,112✔
996
  if (pReader->pSchemaWrapper == NULL) {
118,099✔
997
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
504✔
998
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
999
    pReader->cachedSchemaSuid = 0;
488✔
1000
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
488✔
1001
  }
1002

1003
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
117,595✔
1004
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
113✔
1005
  } else {
1006
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
117,482✔
1007
  }
1008
}
1009

1010
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
7,921✔
1011

1012
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
8,007✔
1013
  if (pReader->tbIdHash) {
8,007✔
1014
    taosHashClear(pReader->tbIdHash);
15✔
1015
  } else {
1016
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
7,992✔
1017
    if (pReader->tbIdHash == NULL) {
7,996!
1018
      tqError("s-task:%s failed to init hash table", id);
×
1019
      return;
×
1020
    }
1021
  }
1022

1023
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
432,016✔
1024
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
423,997✔
1025
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
423,984!
1026
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
×
1027
      continue;
×
1028
    }
1029
  }
1030

1031
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
8,008✔
1032
}
1033

1034
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
19,134✔
1035
  if (pReader->tbIdHash == NULL) {
19,134!
1036
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
1037
    if (pReader->tbIdHash == NULL) {
×
1038
      tqError("failed to init hash table");
×
1039
      return;
×
1040
    }
1041
  }
1042

1043
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
19,134✔
1044
  for (int i = 0; i < numOfTables; i++) {
20,015✔
1045
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
881✔
1046
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
881!
1047
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
1048
      continue;
×
1049
    }
1050
  }
1051
}
1052

1053
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
2,462✔
1054
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
2,462✔
1055
}
1056

1057
bool tqCurrentBlockConsumed(const STqReader* pReader) { return pReader->msg.msgStr == NULL; }
693,180✔
1058

1059
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
721✔
1060
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
721!
UNCOV
1061
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
×
UNCOV
1062
    if (pKey && taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)) != 0) {
×
UNCOV
1063
      tqError("failed to remove table uid:%" PRId64 " from hash", *pKey);
×
1064
    }
1065
  }
1066
}
721✔
1067

1068
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
91,145✔
1069
  void*   pIter = NULL;
91,145✔
1070
  int32_t vgId = TD_VID(pTq->pVnode);
91,145✔
1071

1072
  // update the table list for each consumer handle
1073
  taosWLockLatch(&pTq->lock);
91,145✔
1074
  while (1) {
4,137✔
1075
    pIter = taosHashIterate(pTq->pHandle, pIter);
95,284✔
1076
    if (pIter == NULL) {
95,283✔
1077
      break;
91,146✔
1078
    }
1079

1080
    STqHandle* pTqHandle = (STqHandle*)pIter;
4,137✔
1081
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
4,137✔
1082
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
967✔
1083
      if (code != 0) {
967!
1084
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
1085
        continue;
×
1086
      }
1087
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
3,170✔
1088
      if (!isAdd) {
3,155✔
1089
        int32_t sz = taosArrayGetSize(tbUidList);
1,062✔
1090
        for (int32_t i = 0; i < sz; i++) {
1,062!
UNCOV
1091
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
×
UNCOV
1092
          if (tbUid &&
×
UNCOV
1093
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
×
1094
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
1095
            continue;
×
1096
          }
1097
        }
1098
      }
1099
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
15!
1100
      if (isAdd) {
15!
1101
        SArray* list = NULL;
15✔
1102
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
15✔
1103
                                    &list, pTqHandle->execHandle.task);
1104
        if (ret != TDB_CODE_SUCCESS) {
15!
1105
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1106
                  pTqHandle->consumerId);
1107
          taosArrayDestroy(list);
×
1108
          taosHashCancelIterate(pTq->pHandle, pIter);
×
1109
          taosWUnLockLatch(&pTq->lock);
×
1110

1111
          return ret;
×
1112
        }
1113
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
15✔
1114
        taosArrayDestroy(list);
15✔
1115
      } else {
1116
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1117
      }
1118
    }
1119
  }
1120
  taosWUnLockLatch(&pTq->lock);
91,146✔
1121

1122
  // update the table list handle for each stream scanner/wal reader
1123
  streamMetaWLock(pTq->pStreamMeta);
91,146✔
1124
  while (1) {
38,914✔
1125
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
130,061✔
1126
    if (pIter == NULL) {
130,049✔
1127
      break;
91,145✔
1128
    }
1129

1130
    int64_t      refId = *(int64_t*)pIter;
38,904✔
1131
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
38,904✔
1132
    if (pTask != NULL) {
38,911!
1133
      int32_t taskId = pTask->id.taskId;
38,911✔
1134

1135
      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
38,911✔
1136
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
18,942✔
1137
        if (code != 0) {
18,938✔
1138
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
63!
1139
        }
1140
      }
1141
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
38,907✔
1142
      if (ret) {
38,914!
1143
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
×
1144
      }
1145
    }
1146
  }
1147

1148
  streamMetaWUnLock(pTq->pStreamMeta);
91,145✔
1149
  return 0;
91,145✔
1150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc