• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3522

07 Nov 2024 05:59AM UTC coverage: 58.216% (+1.3%) from 56.943%
#3522

push

travis-ci

web-flow
Merge pull request #28663 from taosdata/fix/3_liaohj

fix(stream): stop the underlying scan operations for stream

111884 of 248391 branches covered (45.04%)

Branch coverage included in aggregate %.

3 of 4 new or added lines in 1 file covered. (75.0%)

1164 existing lines in 134 files now uncovered.

191720 of 273118 relevant lines covered (70.2%)

13088725.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.23
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
// Decide whether a fetched WAL meta record is relevant to the subscription held by pHandle.
//
// Handles whose subType is not TOPIC_SUB_TYPE__TABLE accept every record. Otherwise the body (past the SMsgHead) is
// decoded according to its message type, and the super-table uid it refers to is compared with the subscribed one
// (pHandle->execHandle.execTb.suid).
//
// For batched create-table / drop-table requests where only some entries belong to the subscribed super table, the
// body inside pHead is re-encoded in place so that it only carries the matching entries, and pHead->bodyLen is
// updated.
//
// pHandle: subscription handle.
// pHead:   WAL record whose body is already loaded; may be rewritten in place.
// Returns true when the suid found in the record equals the subscribed suid (or the subscription is not table-level).
// When decoding or re-encoding fails, the comparison is made with whatever suid was determined up to that point.
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;

  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder dcoder = {0};
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the child-table entries that belong to the subscribed super table
    int32_t        needRebuild = 0;
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // do nothing
    } else if (needRebuild == req.nReqs) {
      realTbSuid = tbSuid;
    } else {
      // mixed batch: keep only the matching entries and re-encode the body in place
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      if (reqNew.pArray == NULL) {
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            tDeleteSVCreateTbBatchReq(&req);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      if (ret != 0) {
        // size computation failed, tlen is not trustworthy
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      SEncoder coderNew = {0};
      // buf holds exactly tlen bytes, so that is the encoder capacity
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret < 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }

    tDeleteSVCreateTbBatchReq(&req);
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    SVAlterTbReq req = {0};

    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
      goto end;
    }

    // look the table up by name to find the super table it belongs to
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    realTbSuid = mr.me.ctbEntry.suid;
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the entries that belong to the subscribed super table
    int32_t      needRebuild = 0;
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

      if (pDropReq->suid == tbSuid) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // do nothing
    } else if (needRebuild == req.nReqs) {
      realTbSuid = tbSuid;
    } else {
      // mixed batch: keep only the matching entries and re-encode the body in place
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      if (reqNew.pArray == NULL) {
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
        if (pDropReq->suid == tbSuid) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      if (ret != 0) {
        // size computation failed, tlen is not trustworthy
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      // buf holds exactly tlen bytes, so that is the encoder capacity
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret != 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

end:
  tDecoderClear(&dcoder);
  return tbSuid == realTbSuid;
}
199

200
// Scan the WAL from *fetchOffset up to the applied version, looking for the next record to hand to the consumer.
//
// A submit record stops the scan with the result of walFetchBody. A meta record stops the scan with 0 when the handle
// asks for meta data (fetchMeta != WITH_DATA) and isValValidForTable() accepts it. Every other record is skipped.
//
// On return *fetchOffset holds the version at which scanning stopped. The return value is -1 when nothing deliverable
// was found, otherwise the code of the last fetch operation.
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t readerId = pHandle->pWalReader->readerId;

  int64_t curVer = *fetchOffset;
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, curVer, lastVer, committedVer, appliedVer, readerId);

  while (curVer <= appliedVer) {
    if (walFetchHead(pHandle->pWalReader, curVer) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, curVer, reqId, readerId);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, curVer, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, readerId);

    // data record: load the body and stop here
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader);
      break;
    }

    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
      bool      skipDelete = (pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);
      if (IS_META_MSG(pHead->msgType) && !skipDelete) {
        code = walFetchBody(pHandle->pWalReader);
        if (code < 0) {
          break;
        }

        // take the head pointer again now that the body has been fetched
        pHead = &(pHandle->pWalReader->pHead->head);
        if (isValValidForTable(pHandle, pHead)) {
          code = 0;
          break;
        }

        // not relevant for this subscription, try the next version
        curVer++;
        code = -1;
        continue;
      }
    }

    // nothing to deliver from this record, skip its body
    code = walSkipFetchBody(pHandle->pWalReader);
    if (code < 0) {
      break;
    }
    curVer++;
    code = -1;
  }

  *fetchOffset = curVer;
  return code;
}
261

262
// Return the hasPrimaryKey flag stored on the reader (set by tqSetTablePrimaryKey).
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return pReader->hasPrimaryKey;
}
350✔
263

264
// Load the schema of table `uid` and record on the reader whether its second column carries the COL_IS_KEY flag.
// The flag is set to false when the schema cannot be loaded or has fewer than two columns.
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);

  bool hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    if (pWrapper->pSchema[1].flags & COL_IS_KEY) {
      hasKey = true;
    }
  }

  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
22✔
273

274
// Allocate a tq reader for pVnode: opens a WAL reader on the vnode's WAL and creates the result data block.
//
// Returns the new reader, or NULL when any step fails. When the result block cannot be created, the partially built
// reader is released and terrno carries the error code, so callers never receive a reader without a result block.
STqReader* tqReaderOpen(SVnode* pVnode) {
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  pReader->pResBlock = NULL;

  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    // set last so the cleanup calls above cannot overwrite it
    terrno = code;
    return NULL;
  }

  return pReader;
}
301

302
// Release a reader created by tqReaderOpen together with everything it owns. A NULL reader is ignored.
void tqReaderClose(STqReader* pReader) {
  if (!pReader) {
    return;
  }

  // WAL reader
  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }

  // cached schema
  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }

  // requested column id list
  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // result block, table id hash and the decoded submit request
  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
324

325
// Position the underlying WAL reader at version `ver`.
// Returns 0 on success and -1 when walReaderSeekVer reports a failure; `id` is only used for logging.
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  const bool failed = walReaderSeekVer(pReader->pWalReader, ver) < 0;
  if (failed) {
    return -1;
  }

  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
  return 0;
}
332

333
// Read the next usable message from the WAL and turn it into a stream input item.
//
// pReader: WAL reader to advance.
// pItem:   output; receives the item built from a submit or a non-empty delete message.
// maxVer:  scanning stops (with success) as soon as a message with a larger version is met.
// id:      task identifier, used for logging only.
//
// Returns TSDB_CODE_SUCCESS when an item was produced or maxVer was reached, the code propagated from
// walNextValidMsg when no further message can be read, or an error code when building the item fails.
// Empty delete messages are discarded and the scan continues with the next message.
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
  int32_t code = 0;

  while (1) {
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));

    SWalCont* pCont = &pReader->pHead->head;
    int64_t   ver = pCont->version;
    if (ver > maxVer) {
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
      return TSDB_CODE_SUCCESS;
    }

    if (pCont->msgType == TDMT_VND_SUBMIT) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);

      // the item keeps its own copy of the payload
      void* data = taosMemoryMalloc(len);
      if (data == NULL) {
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
        // retry
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory",
                pReader->pWal->cfg.vgId);
        return terrno;
      }

      (void)memcpy(data, pBody, len);
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};

      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
      if (code != 0) {
        // NOTE(review): `data` is not released here; confirm whether streamDataSubmitNew frees it on failure.
        tqError("%s failed to create data submit for stream since out of memory", id);
        return code;
      }
    } else if (pCont->msgType == TDMT_VND_DELETE) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);

      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0);
      if (code == TSDB_CODE_SUCCESS) {
        if (*pItem == NULL) {
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
          // we need to continue check next data in the wal files.
          continue;
        } else {
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
        }
      } else {
        terrno = code;
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
        return code;
      }

    } else {
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    return code;
  }
}
393

394
// Advance to the next submit block the reader is interested in, pulling further messages from the WAL when the
// current submit request is exhausted.
//
// Blocks whose flags intersect `sourceExcluded` are skipped, as are blocks whose uid is not in pReader->tbIdHash
// (when that hash is set). For an accepted block tqRetrieveDataBlock is called, and true is returned if it succeeds.
//
// Returns false when a block cannot be fetched from the array, when the elapsed time since entry exceeds 1000 (as
// measured by taosGetTimestampMs) or is negative, when the WAL has no further valid message, or when the next message
// cannot be decoded.
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  SWalReader* pWalReader = pReader->pWalReader;

  int64_t startMs = taosGetTimestampMs();
  for (;;) {
    int32_t total = taosArrayGetSize(pReader->submit.aSubmitTbData);
    while (pReader->nextBlk < total) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, total, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, total, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      // block comes from an excluded source
      if ((pTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      bool wanted = (pReader->tbIdHash == NULL) || (taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) != NULL);
      if (!wanted) {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pTbData->uid);
        continue;
      }

      tqTrace("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      SSDataBlock* pRes = NULL;
      if (tqRetrieveDataBlock(pReader, &pRes, NULL) == TSDB_CODE_SUCCESS) {
        return true;
      }
    }

    // current submit request is used up
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed < 0 || elapsed > 1000) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWalReader) < 0) {
      return false;
    }

    int64_t ver = pWalReader->pHead->head.version;
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
449

450
// Attach a raw submit message to the reader and decode it into pReader->submit.
// Returns 0 on success, or the non-zero code reported by tDecodeSubmitReq.
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
  pReader->msg.ver = ver;
  pReader->msg.msgLen = msgLen;
  pReader->msg.msgStr = msgStr;

  tqDebug("tq reader set msg %p %d", msgStr, msgLen);

  SDecoder dec = {0};
  tDecoderInit(&dec, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t code = tDecodeSubmitReq(&dec, &pReader->submit);
  tDecoderClear(&dec);

  if (code != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
    return code;
  }
  return 0;
}
469

470
// Return the WAL reader held by this tq reader.
SWalReader* tqGetWalReader(STqReader* pReader) {
  return pReader->pWalReader;
}
33,786✔
471

472
// Return the result data block held by this tq reader.
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  return pReader->pResBlock;
}
30,872✔
473

474
// Return pReader->lastTs, which tqRetrieveDataBlock sets from the ctimeMs of the last retrieved submit block.
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return pReader->lastTs;
}
30,870✔
475

476
// Move pReader->nextBlk to the next submit block whose uid is present in pReader->tbIdHash.
//
// Returns true when such a block is found (nextBlk then indexes it), or immediately when no hash is set.
// Returns false when no message is attached, when a block cannot be fetched from the array, or when all blocks have
// been examined; in the last case the decoded submit request is released and the reader is reset.
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  if (pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t total = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < total; pReader->nextBlk++) {
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
            (pReader->nextBlk + 1), total, idstr);

    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pTbData == NULL) {
      return false;
    }
    if (pReader->tbIdHash == NULL) {
      return true;
    }

    if (taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) != NULL) {
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pTbData->uid, idstr);
      return true;
    }

    tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pTbData->uid,
            taosHashGetSize(pReader->tbIdHash), idstr);
  }

  // every block of the current message has been looked at
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->msg.msgStr = NULL;
  pReader->nextBlk = 0;

  return false;
}
512

513
// Move pReader->nextBlk to the next submit block whose uid is NOT present in `filterOutUids`.
//
// Returns true when such a block is found, or immediately when `filterOutUids` is NULL.
// Returns false when no message is attached, when a block cannot be fetched from the array, or when all blocks have
// been examined; in the last case the decoded submit request is released and the reader is reset.
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  if (pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t total = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < total; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pTbData == NULL) {
      return false;
    }
    if (filterOutUids == NULL) {
      return true;
    }

    // a uid missing from the filter set means the block is kept
    if (taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t)) == NULL) {
      return true;
    }
  }

  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->msg.msgStr = NULL;
  pReader->nextBlk = 0;

  return false;
}
535

536
// Build a reduced schema and the matching block columns from the columns of pSrc selected by `mask`.
//
// pDst:   output schema wrapper; nCols and a newly allocated pSchema array are filled in.
// pBlock: data block that receives one column info per selected column.
// pSrc:   source schema.
// mask:   one byte per source column; a non-zero byte selects the column.
//
// Returns 0 on success, the terrno-derived code when the schema array cannot be allocated, or the code returned by
// blockDataAppendColInfo. pDst->pSchema stays allocated on the latter failure.
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
  int32_t code = 0;

  // count with the same predicate that the fill loop uses, so the array size always matches the number of writes
  int32_t cnt = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (mask[i]) {
      cnt++;
    }
  }

  pDst->nCols = cnt;
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return TAOS_GET_TERRNO(terrno);
  }

  int32_t j = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (mask[i]) {
      pDst->pSchema[j++] = pSrc->pSchema[i];
      SColumnInfoData colInfo =
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
      code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != 0) {
        return code;
      }
    }
  }
  return 0;
}
564

565
// (Re)build the column layout of pReader->pResBlock from `pSchema`.
//
// If the block already has columns it is destroyed and created anew, and its uid/version are taken from the reader.
// When pColIdList is empty every schema column is appended. Otherwise the schema columns and pColIdList are walked
// together in step and only columns whose id appears in both are appended (the walk assumes both are ordered by
// ascending column id).
//
// Returns TSDB_CODE_SUCCESS, or the error code of the failing createDataBlock / blockDataAppendColInfo call.
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  SSDataBlock* pBlock = pReader->pResBlock;
  if (blockDataGetNumOfCols(pBlock) > 0) {
    blockDataDestroy(pBlock);
    // do not keep a pointer to the destroyed block around if re-creation fails
    pReader->pResBlock = NULL;
    int32_t code = createDataBlock(&pReader->pResBlock);
    if (code) {
      return code;
    }
    pBlock = pReader->pResBlock;

    pBlock->info.id.uid = pReader->cachedSchemaUid;
    pBlock->info.version = pReader->msg.ver;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        return code;
      }
    }
  } else {
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    int32_t i = 0;  // index into the schema columns
    int32_t j = 0;  // index into the requested column ids
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
      if (pColIdNeed == NULL) {
        break;
      }
      if (colIdSchema < *pColIdNeed) {
        i++;
      } else if (colIdSchema > *pColIdNeed) {
        j++;
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
625

626
// Store one column value into row `rowIndex` of pColumnInfoData.
//
// Fixed-length values are handed to colDataSetVal directly, flagged as NULL when the value is not a real value.
// Variable-length values are first packed into a local buffer using varDataVal/varDataSetLen and then stored; a
// variable-length non-value is stored as NULL.
//
// Returns TSDB_CODE_SUCCESS or the code returned by colDataSetVal.
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  if (!IS_VAR_DATA_TYPE(pColVal->value.type)) {
    return colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  // NOTE(review): nData is not checked against the buffer size here - confirm callers bound it.
  char packed[65535 + 2] = {0};
  if (pColVal->value.pData != NULL) {
    (void)memcpy(varDataVal(packed), pColVal->value.pData, pColVal->value.nData);
  }
  varDataSetLen(packed, pColVal->value.nData);

  return colDataSetVal(pColumnInfoData, rowIndex, packed, false);
}
646

647
// Convert the submit block at pReader->nextBlk into pReader->pResBlock and advance nextBlk.
//
// The table schema is cached on the reader and reloaded when the block's suid/uid or schema version differs from the
// cached one; a reload also rebuilds the column layout of the result block. The rows are then copied either from the
// column-format payload (SUBMIT_REQ_COLUMN_DATA_FORMAT) or from the row-format payload; result columns without a
// matching source column are filled with NULL.
//
// pReader: reader with a decoded submit message attached.
// pRes:    output; set to the reader's result block.
// id:      not used by this function.
//
// Returns 0 on success, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be loaded,
// TSDB_CODE_TQ_INTERNAL_ERROR when the loaded schema has a different version than requested, or the code of the
// failing helper. When the schema reload fails, the cached schema identity is cleared so that the next call reloads
// it instead of reusing a missing or mismatched schema wrapper.
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  int32_t        code = 0;
  int32_t        line = 0;
  STSchema*      pTSchema = NULL;
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastTs = pSubmitTbData->ctimeMs;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  // reload the schema when the cached one belongs to another table or another schema version
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);

    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
    if (pReader->pSchemaWrapper == NULL) {
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             "version %d, possibly dropped table",
             vgId, suid, uid, sversion);
      // no schema is cached any more: clear the whole cache key so no later lookup can match it
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }

    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
      // the result block was not rebuilt for this schema: force a reload on the next call
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_INTERNAL_ERROR;
    }
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
    TSDB_CHECK_CODE(code, line, END);
    // buildResSDataBlock may have replaced the result block
    pBlock = pReader->pResBlock;
    *pRes = pBlock;
  }

  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, line, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
      if (sourceIdx >= numOfCols) {
        // source columns exhausted: the remaining target columns become NULL
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      SColData* pCol = taosArrayGet(pCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
      if (pCol->cid < pColData->info.colId) {
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          tColDataGetValue(pCol, i, &colVal);
          code = doSetVal(pColData, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // the target column has no counterpart in the source
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow* pRow = taosArrayGetP(pRows, i);
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
      int32_t sourceIdx = 0;
      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);

        // advance through the row's columns until the one matching the target column id is reached
        while (1) {
          SColVal colVal = {0};
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
            code = doSetVal(pColData, i, &colVal);
            TSDB_CHECK_CODE(code, line, END);
            sourceIdx++;
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
          }
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code);
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}
789

790
/*
 * PROCESS_VAL - detect a change in which columns carry a value.
 *
 * Expands in place and uses these names from the expansion site: `colVal`
 * (the value just fetched), `assigned` (per-column flags), `j` (index of the
 * current column), `curRow` (row counter) and `buildNew` (output flag).
 *
 * assigned[j] is refreshed and buildNew is raised for every column of the
 * first row (curRow == 0), and afterwards whenever the column's
 * "is not NONE" state differs from the recorded flag. Existing call sites
 * write the macro without a trailing semicolon.
 */
#define PROCESS_VAL                                   \
  {                                                   \
    bool hasValue = !COL_VAL_IS_NONE(&colVal);        \
    if (curRow == 0 || hasValue != assigned[j]) {     \
      assigned[j] = hasValue;                         \
      buildNew = true;                                \
    }                                                 \
  }
801

802
/*
 * SET_DATA - one merge step between the source value `colVal` and the target
 * column `pColData`.
 *
 * Expands in place and uses these names from the expansion site: `colVal`,
 * `pColData`, `sourceIdx`, `targetIdx`, `curRow`, `lastRow`, plus whatever
 * TQ_ERR_GO_TO_END requires. The row written is `curRow - lastRow`, i.e. the
 * row offset inside the block currently being filled.
 *
 *   - source cid <  target colId: skip the source column.
 *   - source cid == target colId: copy the value and advance both cursors.
 *   - source cid >  target colId: the source has no value for this target
 *     column, so store NULL and advance the target. Without this branch
 *     neither cursor moves and the enclosing `while (targetIdx < colActual)`
 *     loop never terminates.
 */
#define SET_DATA                                                     \
  if (colVal.cid < pColData->info.colId) {                           \
    sourceIdx++;                                                     \
  } else if (colVal.cid == pColData->info.colId) {                   \
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
    sourceIdx++;                                                     \
    targetIdx++;                                                     \
  } else {                                                           \
    colDataSetNULL(pColData, curRow - lastRow);                      \
    targetIdx++;                                                     \
  }
810

811
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
2,844✔
812
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
813
                               int32_t* lastRow) {
814
  int32_t         code = 0;
2,844✔
815
  SSchemaWrapper* pSW = NULL;
2,844✔
816
  SSDataBlock*    block = NULL;
2,844✔
817
  if (taosArrayGetSize(blocks) > 0) {
2,844!
818
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
819
    TQ_NULL_GO_TO_END(pLastBlock);
×
820
    pLastBlock->info.rows = curRow - *lastRow;
×
821
    *lastRow = curRow;
×
822
  }
823

824
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
2,844✔
825
  TQ_NULL_GO_TO_END(block);
2,845!
826

827
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
2,845✔
828
  TQ_NULL_GO_TO_END(pSW);
2,845!
829

830
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
2,845!
831
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
2,845!
832
          (int32_t)taosArrayGetSize(block->pDataBlock));
833

834
  block->info.id.uid = pSubmitTbData->uid;
2,845✔
835
  block->info.version = pReader->msg.ver;
2,845✔
836
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
2,845!
837
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
2,845!
838
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
2,844!
839
  pSW = NULL;
2,844✔
840
  taosMemoryFreeClear(block);
2,844!
841

842
END:
×
843
  tDeleteSchemaWrapper(pSW);
2,845!
844
  blockDataFreeRes(block);
2,845✔
845
  taosMemoryFree(block);
2,845✔
846
  return code;
2,845✔
847
}
848
/*
 * Convert column-format submit data (pSubmitTbData->aCol) into one or more
 * data blocks appended to `blocks`, with matching schema wrappers appended to
 * `schemas`.
 *
 * The row count is taken from the first column's nVal. For every row the set
 * of columns holding a value is compared with the previous row (PROCESS_VAL);
 * when it changes, processBuildNew() closes the current block and opens a new
 * one. The row's values are then merged into the last block by column id
 * (SET_DATA).
 *
 * Returns 0 on success, otherwise the code set by the failing
 * TQ_*_GO_TO_END check.
 *
 * NOTE(review): `assigned` holds pSchemaWrapper->nCols flags but is indexed by
 * the column count of the submit data; this assumes the submit never carries
 * more columns than the schema - confirm.
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    // Pass 1: find out whether this row needs a new block.
    for (int32_t j = 0; j < numOfCols; j++) {
      pCol = taosArrayGet(pCols, j);
      TQ_NULL_GO_TO_END(pCol);
      SColVal colVal = {0};
      tColDataGetValue(pCol, i, &colVal);
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // Pass 2: merge this row's values into the current block by column id.
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      tColDataGetValue(pCol, i, &colVal);
      SET_DATA
    }

    curRow++;
  }

  // With zero rows no block may exist; fail through END instead of
  // dereferencing a NULL pointer.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  TQ_NULL_GO_TO_END(pLastBlock);
  pLastBlock->info.rows = curRow - lastRow;

END:
  taosMemoryFree(assigned);
  return code;
}
906

907
/*
 * Convert row-format submit data (pSubmitTbData->aRowP) into one or more data
 * blocks appended to `blocks`, with matching schema wrappers appended to
 * `schemas`.
 *
 * A STSchema is built from the reader's schema wrapper. For every row the set
 * of columns holding a value is compared with the previous row (PROCESS_VAL);
 * when it changes, processBuildNew() closes the current block and opens a new
 * one. The row's values are then merged into the last block by column id
 * (SET_DATA).
 *
 * Returns 0 on success, otherwise the code set by the failing
 * TQ_*_GO_TO_END check.
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pRows = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRows);
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  // The schema is dereferenced below; bail out if it could not be built.
  TQ_NULL_GO_TO_END(pTSchema);

  for (int32_t i = 0; i < numOfRows; i++) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(pRows, i);
    TQ_NULL_GO_TO_END(pRow);

    // Pass 1: find out whether this row needs a new block.
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // Pass 2: merge this row's values into the current block by column id.
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
    }

    curRow++;
  }

  // With zero rows no block may exist; fail through END instead of
  // dereferencing a NULL pointer.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  TQ_NULL_GO_TO_END(pLastBlock);
  pLastBlock->info.rows = curRow - lastRow;

END:
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
963

964
/*
 * Fetch the next per-table submit entry from the reader and convert it into
 * data blocks.
 *
 * Takes entry `pReader->nextBlk` of pReader->submit.aSubmitTbData, advances
 * `nextBlk`, records the table uid in `lastBlkUid`, and replaces the reader's
 * schema wrapper with the one metaGetTableSchema() returns for that uid and
 * the entry's schema version. Depending on SUBMIT_REQ_COLUMN_DATA_FORMAT the
 * entry is handed to tqProcessColData() or tqProcessRowData().
 *
 * pSubmitTbDataRet  optional; receives the submit entry that was consumed.
 * createTime        forwarded unchanged to metaGetTableSchema().
 *
 * Returns terrno when the entry cannot be fetched,
 * TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when no schema is available, otherwise
 * the result of the row/column conversion.
 */
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet,
                             int64_t* createTime) {
  tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  if (pSubmitTbData == NULL) {
    return terrno;
  }
  pReader->nextBlk++;

  if (pSubmitTbDataRet) {
    *pSubmitTbDataRet = pSubmitTbData;
  }

  int32_t sversion = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
  if (pReader->pSchemaWrapper == NULL) {
    // Report the uid and schema version that were actually looked up; the
    // cached schema version is not involved in this lookup.
    tqWarn("vgId:%d, cannot found schema wrapper for table: uid:%" PRId64 ", version %d, possibly dropped table",
           pReader->pWalReader->pWal->cfg.vgId, uid, sversion);
    pReader->cachedSchemaSuid = 0;
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
  } else {
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
  }
}
995

996
/*
 * Attach a column-id list to the reader. The pointer is stored as given; this
 * function makes no copy.
 */
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) {
  pReader->pColIdList = pColIdList;
}
3,823✔
997

998
/*
 * Replace the reader's set of queried table uids with `tbUidList`.
 *
 * An existing hash is cleared; otherwise a new one is created. Each uid in the
 * list is then inserted; a failed insert is logged and the remaining uids are
 * still processed. If the hash cannot be created the function logs and
 * returns without touching the list.
 *
 * id  label used in log messages only; may be NULL (one caller in this file
 *     passes NULL), in which case a placeholder is logged instead of handing
 *     a NULL pointer to "%s".
 */
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  const char* idStr = (id != NULL) ? id : "(null)";

  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", idStr);
      return;
    }
  }

  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", idStr, *pKey);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", idStr, numOfTables);
}
1019

1020
/*
 * Add every uid in `pTableUidList` to the reader's set of queried tables,
 * creating the hash first if the reader does not have one yet.
 *
 * A failed insert is logged and the remaining uids are still processed. If
 * the hash cannot be created the function logs and returns.
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    // Same guard as the set/remove counterparts: never pass or dereference a
    // NULL element pointer.
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
    }
  }
}
1038

1039
/*
 * Report whether `uid` is present in the reader's table-uid hash.
 */
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
  return pEntry != NULL;
}
1042

1043
/*
 * True when the reader holds no message body (msg.msgStr is NULL).
 */
bool tqCurrentBlockConsumed(const STqReader* pReader) {
  if (pReader->msg.msgStr != NULL) {
    return false;
  }
  return true;
}
566,198✔
1044

1045
/*
 * Remove every uid in `tbUidList` from the reader's table-uid hash. A removal
 * that reports failure is logged; processing continues with the next uid.
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  for (int32_t idx = 0; idx < taosArrayGetSize(tbUidList); ++idx) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pUid == NULL) {
      continue;
    }

    if (taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t)) == 0) {
      continue;
    }

    tqError("failed to remove table uid:%" PRId64 " from hash", *pUid);
  }
}
10✔
1053

1054
/*
 * Propagate a table-uid change to every consumer handle and every stream task
 * of this TQ instance.
 *
 * Pass 1 (under pTq->lock, write-locked) walks pTq->pHandle:
 *   - COLUMN subscriptions forward the list to the handle's scanner task;
 *   - DB subscriptions, on removal only, record the uids in pFilterOutTbUid;
 *   - TABLE subscriptions, on add, rebuild the reader's uid set from
 *     qGetTableList(); on removal they drop the uids from the reader.
 *   A qGetTableList() failure cancels the iteration, releases the lock and
 *   returns that error code.
 *
 * Pass 2 (under the stream-meta write lock) walks pStreamMeta->pTasksMap and
 * forwards the list to each source-level task that has an executor. Failures
 * in this pass are only logged.
 *
 * Returns 0 unless pass 1 aborted as described above.
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  int32_t vgId = TD_VID(pTq->pVnode);
  void*   pIter = NULL;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pHandle = (STqHandle*)pIter;

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qUpdateTableListForStreamScanner(pHandle->execHandle.task, tbUidList, isAdd);
      if (code != 0) {
        tqError("update qualified table error for %s", pHandle->subKey);
      }
      continue;
    }

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (isAdd) {
        continue;  // nothing to do for additions on a db subscription
      }

      int32_t numOfUids = taosArrayGetSize(tbUidList);
      for (int32_t k = 0; k < numOfUids; ++k) {
        int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, k);
        if (pUid == NULL) {
          continue;
        }
        if (taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, pUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *pUid);
        }
      }
      continue;
    }

    if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
      continue;
    }

    if (!isAdd) {
      tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUidList);
      continue;
    }

    SArray* list = NULL;
    int     ret = qGetTableList(pHandle->execHandle.execTb.suid, pTq->pVnode, pHandle->execHandle.execTb.node, &list,
                                pHandle->execHandle.task);
    if (ret != TDB_CODE_SUCCESS) {
      tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pHandle->subKey,
              pHandle->consumerId);
      taosArrayDestroy(list);
      // leaving the loop early: end the iteration and drop the lock first
      taosHashCancelIterate(pTq->pHandle, pIter);
      taosWUnLockLatch(&pTq->lock);

      return ret;
    }

    tqReaderSetTbUidList(pHandle->execHandle.pTqReader, list, NULL);
    taosArrayDestroy(list);
  }
  taosWUnLockLatch(&pTq->lock);

  // update the table list handle for each stream scanner/wal reader
  streamMetaWLock(pTq->pStreamMeta);
  while ((pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter)) != NULL) {
    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    int32_t taskId = pTask->id.taskId;

    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
      int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
      if (code != 0) {
        tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
      }
    }

    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
    }
  }

  streamMetaWUnLock(pTq->pStreamMeta);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc