• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.47
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

19
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
×
20
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
×
21
    return true;
×
22
  }
23

24
  int16_t msgType = pHead->msgType;
×
25
  char*   body = pHead->body;
×
26
  int32_t bodyLen = pHead->bodyLen;
×
27

28
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
×
29
  int64_t  realTbSuid = 0;
×
30
  SDecoder dcoder = {0};
×
31
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
×
32
  int32_t  len = bodyLen - sizeof(SMsgHead);
×
33
  tDecoderInit(&dcoder, data, len);
×
34

35
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
×
36
    SVCreateStbReq req = {0};
×
37
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
×
38
      goto end;
×
39
    }
40
    realTbSuid = req.suid;
×
41
  } else if (msgType == TDMT_VND_DROP_STB) {
×
42
    SVDropStbReq req = {0};
×
43
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
44
      goto end;
×
45
    }
46
    realTbSuid = req.suid;
×
47
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
×
48
    SVCreateTbBatchReq req = {0};
×
49
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
×
50
      goto end;
×
51
    }
52

53
    int32_t        needRebuild = 0;
×
54
    SVCreateTbReq* pCreateReq = NULL;
×
55
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
56
      pCreateReq = req.pReqs + iReq;
×
57
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
58
        needRebuild++;
×
59
      }
60
    }
61
    if (needRebuild == 0) {
×
62
      // do nothing
63
    } else if (needRebuild == req.nReqs) {
×
64
      realTbSuid = tbSuid;
×
65
    } else {
66
      realTbSuid = tbSuid;
×
67
      SVCreateTbBatchReq reqNew = {0};
×
68
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
69
      if (reqNew.pArray == NULL) {
×
70
        tDeleteSVCreateTbBatchReq(&req);
×
71
        goto end;
×
72
      }
73
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
74
        pCreateReq = req.pReqs + iReq;
×
75
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
76
          reqNew.nReqs++;
×
77
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
78
            taosArrayDestroy(reqNew.pArray);
×
79
            tDeleteSVCreateTbBatchReq(&req);
×
80
            goto end;
×
81
          }
82
        }
83
      }
84

85
      int     tlen = 0;
×
86
      int32_t ret = 0;
×
87
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
88
      void* buf = taosMemoryMalloc(tlen);
×
89
      if (NULL == buf) {
×
90
        taosArrayDestroy(reqNew.pArray);
×
91
        tDeleteSVCreateTbBatchReq(&req);
×
92
        goto end;
×
93
      }
94
      SEncoder coderNew = {0};
×
95
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
96
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
97
      tEncoderClear(&coderNew);
×
98
      if (ret < 0) {
×
99
        taosMemoryFree(buf);
×
100
        taosArrayDestroy(reqNew.pArray);
×
101
        tDeleteSVCreateTbBatchReq(&req);
×
102
        goto end;
×
103
      }
104
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
105
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
106
      taosMemoryFree(buf);
×
107
      taosArrayDestroy(reqNew.pArray);
×
108
    }
109

110
    tDeleteSVCreateTbBatchReq(&req);
×
111
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
112
    SVAlterTbReq req = {0};
×
113

114
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
115
      goto end;
×
116
    }
117

118
    SMetaReader mr = {0};
×
119
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
120

121
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
122
      metaReaderClear(&mr);
×
123
      goto end;
×
124
    }
125
    realTbSuid = mr.me.ctbEntry.suid;
×
126
    metaReaderClear(&mr);
×
127
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
128
    SVDropTbBatchReq req = {0};
×
129

130
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
131
      goto end;
×
132
    }
133

134
    int32_t      needRebuild = 0;
×
135
    SVDropTbReq* pDropReq = NULL;
×
136
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
137
      pDropReq = req.pReqs + iReq;
×
138

139
      if (pDropReq->suid == tbSuid) {
×
140
        needRebuild++;
×
141
      }
142
    }
143
    if (needRebuild == 0) {
×
144
      // do nothing
145
    } else if (needRebuild == req.nReqs) {
×
146
      realTbSuid = tbSuid;
×
147
    } else {
148
      realTbSuid = tbSuid;
×
149
      SVDropTbBatchReq reqNew = {0};
×
150
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
151
      if (reqNew.pArray == NULL) {
×
152
        goto end;
×
153
      }
154
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
155
        pDropReq = req.pReqs + iReq;
×
156
        if (pDropReq->suid == tbSuid) {
×
157
          reqNew.nReqs++;
×
158
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
159
            taosArrayDestroy(reqNew.pArray);
×
160
            goto end;
×
161
          }
162
        }
163
      }
164

165
      int     tlen = 0;
×
166
      int32_t ret = 0;
×
167
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
168
      void* buf = taosMemoryMalloc(tlen);
×
169
      if (NULL == buf) {
×
170
        taosArrayDestroy(reqNew.pArray);
×
171
        goto end;
×
172
      }
173
      SEncoder coderNew = {0};
×
174
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
175
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
176
      tEncoderClear(&coderNew);
×
177
      if (ret != 0) {
×
178
        taosMemoryFree(buf);
×
179
        taosArrayDestroy(reqNew.pArray);
×
180
        goto end;
×
181
      }
182
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
183
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
184
      taosMemoryFree(buf);
×
185
      taosArrayDestroy(reqNew.pArray);
×
186
    }
187
  } else if (msgType == TDMT_VND_DELETE) {
×
188
    SDeleteRes req = {0};
×
189
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
190
      goto end;
×
191
    }
192
    realTbSuid = req.suid;
×
193
  }
194

195
end:
×
196
  tDecoderClear(&dcoder);
×
197
  return tbSuid == realTbSuid;
×
198
}
199

200
/*
 * Scan the WAL forward from *fetchOffset until a record that should be handed to the
 * consumer is found, or the applied version is passed.
 *
 * Submit records always stop the scan (their body is fetched). When the handle is not
 * WITH_DATA, meta records accepted by isValValidForTable stop the scan as well; every other
 * record has its body skipped and the scan moves on to the next version.
 *
 * @param fetchOffset  in: first version to look at; out: version the scan stopped at
 * @return 0 (or the result of walFetchBody for submit records) when a record was loaded,
 *         a negative value when nothing was found or a WAL call failed.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pHandle->pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

    // data record: load the body and hand it back, whatever the outcome
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader);
      break;
    }

    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
      bool      deleteInMetaOnly = (pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);

      if (IS_META_MSG(pHead->msgType) && !deleteInMetaOnly) {
        code = walFetchBody(pHandle->pWalReader);
        if (code < 0) {
          break;
        }

        // re-read the head pointer after the body has been fetched
        pHead = &(pHandle->pWalReader->pHead->head);
        if (isValValidForTable(pHandle, pHead)) {
          code = 0;
          break;
        }

        // meta record for some other table: step over it
        offset++;
        code = -1;
        continue;
      }
    }

    // record is of no interest to this consumer: skip its body and advance
    code = walSkipFetchBody(pHandle->pWalReader);
    if (code < 0) {
      break;
    }
    offset++;
    code = -1;
  }

  *fetchOffset = offset;
  return code;
}
261

262
/* Return the hasPrimaryKey flag last stored on the reader by tqSetTablePrimaryKey. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return pReader->hasPrimaryKey;
}
62✔
263

264
/*
 * Look up the schema of table `uid` and record on the reader whether its second column
 * carries the COL_IS_KEY flag. A missing schema, or one with fewer than two columns, is
 * recorded as "no primary key".
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);

  bool hasKey = (pWrapper != NULL) && (pWrapper->nCols >= 2) && ((pWrapper->pSchema[1].flags & COL_IS_KEY) != 0);

  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
11✔
273

274
/*
 * Allocate a tq reader for `pVnode`: opens a WAL reader on the vnode's WAL, binds the
 * vnode's meta store and creates the (initially empty) result block.
 *
 * @return the new reader, or NULL when the allocation, the WAL reader or the result block
 *         could not be created. When the result block creation fails, terrno carries the
 *         error code and everything acquired so far is released, so callers never receive
 *         a reader without a result block.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  pReader->pResBlock = NULL;

  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    // the rest of this file uses pResBlock unconditionally, so a reader without it is unusable
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    terrno = code;  // set last so the cleanup calls cannot overwrite it
    return NULL;
  }

  return pReader;
}
301

302
/*
 * Release a reader created by tqReaderOpen together with everything it owns: the WAL
 * reader, the cached schema wrapper, the column id list, the result block, the table id
 * hash and the decoded submit request. NULL is accepted and ignored.
 */
void tqReaderClose(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }

  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }

  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // the remaining members are released unconditionally
  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
324

325
/*
 * Position the reader's WAL reader at version `ver`.
 *
 * @param id  caller tag, only used in the debug log
 * @return 0 on success, -1 when the WAL reader cannot seek to `ver`
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (walReaderSeekVer(pReader->pWalReader, ver) >= 0) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }

  return -1;
}
332

333
/*
 * Pull the next usable message out of the WAL and turn it into a stream input item.
 *
 * Submit records are copied (without the SSubmitReq2Msg header) into a new data-submit
 * item; delete records are converted by tqExtractDelDataBlock. Delete records that yield
 * no item are skipped and the scan continues with the next WAL record.
 *
 * @param pItem   out: the created item; left untouched when maxVer is reached
 * @param maxVer  records with a version above this are not consumed
 * @param id      task identifier, only used in log messages
 * @return TSDB_CODE_SUCCESS when an item was produced or maxVer was reached; otherwise the
 *         error of the failing step (WAL read, allocation, item creation, unknown type).
 */
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
  int32_t code = 0;

  while (1) {
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));

    SWalCont* pCont = &pReader->pHead->head;
    int64_t   ver = pCont->version;
    if (ver > maxVer) {
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
      return TSDB_CODE_SUCCESS;
    }

    if (pCont->msgType == TDMT_VND_SUBMIT) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);

      void* data = taosMemoryMalloc(len);
      if (data == NULL) {
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
        // retry
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory",
                pReader->pWal->cfg.vgId);
        return terrno;
      }

      (void)memcpy(data, pBody, len);
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};

      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
      if (code != 0) {
        tqError("%s failed to create data submit for stream since out of memory", id);
        // No item was created, so nothing else references the copied payload.
        // NOTE(review): assumes streamDataSubmitNew does not free msgStr on failure - confirm.
        taosMemoryFree(data);
        return code;
      }
    } else if (pCont->msgType == TDMT_VND_DELETE) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);

      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0);
      if (code == TSDB_CODE_SUCCESS) {
        if (*pItem == NULL) {
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
          // we need to continue check next data in the wal files.
          continue;
        } else {
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
        }
      } else {
        terrno = code;
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
        return code;
      }

    } else {
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
    }

    return code;
  }
}
393

394
/*
 * Advance the reader to the next table block that should be delivered, reading further
 * WAL messages as needed, and materialise it into the reader's result block.
 *
 * Blocks whose flags intersect `sourceExcluded` are skipped, as are blocks of tables that
 * are not in tbIdHash (when such a hash is set). The search gives up once more than one
 * second has elapsed since the call started (or the clock went backwards).
 *
 * @param id  not used by this function
 * @return true when a block has been retrieved; false on timeout, when the WAL has no more
 *         valid message, or when a message/block cannot be accessed or decoded.
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  SWalReader* pWalReader = pReader->pWalReader;
  int64_t     startMs = taosGetTimestampMs();

  for (;;) {
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);

    // walk the remaining blocks of the currently decoded submit message
    while (pReader->nextBlk < numOfBlocks) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pSubmitTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      bool wanted = (pReader->tbIdHash == NULL) ||
                    (taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL);
      if (!wanted) {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
        continue;
      }

      tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
      SSDataBlock* pRes = NULL;
      int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
      if (code == TSDB_CODE_SUCCESS) {
        return true;
      }
      // on failure keep looping; tqRetrieveDataBlock has already moved nextBlk forward
    }

    // current message exhausted: drop it before looking at the next WAL record
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > 1000 || elapsed < 0) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWalReader) < 0) {
      return false;
    }

    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t ver = pWalReader->pHead->head.version;
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
449

450
/*
 * Attach a raw submit message to the reader and decode it into pReader->submit.
 *
 * @param msgStr  encoded submit request; the pointer is stored on the reader, not copied
 * @param msgLen  length of msgStr in bytes
 * @param ver     WAL version of the message, stored alongside it
 * @return 0 on success, otherwise the error returned by tDecodeSubmitReq
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;

  tqDebug("tq reader set msg %p %d", msgStr, msgLen);

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit);
  tDecoderClear(&decoder);  // the decoder is released on both outcomes

  if (code != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
    return code;
  }

  return 0;
}
469

470
/* Accessor: the WAL reader held by this tq reader. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return pReader->pWalReader;
}
16,208✔
471

472
/* Accessor: the result block that tqRetrieveDataBlock fills for this reader. */
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  return pReader->pResBlock;
}
15,587✔
473

474
/* Accessor: lastTs, i.e. the ctimeMs of the table block most recently retrieved. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return pReader->lastTs;
}
15,586✔
475

476
/*
 * Move pReader->nextBlk to the next table block of the current submit message whose uid
 * is present in tbIdHash (any block qualifies when no hash is set).
 *
 * @param idstr  caller tag, only used in debug logs
 * @return true when nextBlk now designates a qualifying block. false when no message is
 *         attached, a block cannot be accessed, or the message is exhausted; in the
 *         exhausted case the decoded request is destroyed and the reader is reset.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  if (pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < numOfBlocks; pReader->nextBlk++) {
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
            (pReader->nextBlk + 1), numOfBlocks, idstr);

    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pSubmitTbData == NULL) {
      return false;
    }

    // no table filter installed: every block qualifies
    if (pReader->tbIdHash == NULL) {
      return true;
    }

    if (taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
      return true;
    }

    tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
            taosHashGetSize(pReader->tbIdHash), idstr);
  }

  // nothing left in this message
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;

  return false;
}
512

513
/*
 * Move pReader->nextBlk to the next table block of the current submit message whose uid
 * is NOT contained in `filterOutUids` (any block qualifies when the hash is NULL).
 *
 * @return true when nextBlk designates a qualifying block. false when no message is
 *         attached, a block cannot be accessed, or the message is exhausted; in the
 *         exhausted case the decoded request is destroyed and the reader is reset.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  if (pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pSubmitTbData == NULL) {
      return false;
    }
    if (filterOutUids == NULL) {
      return true;
    }

    // keep the block unless its table is listed in the exclusion hash
    if (taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t)) == NULL) {
      return true;
    }
  }

  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;

  return false;
}
535

536
/*
 * Build a reduced schema and matching block layout from `pSrc`, keeping only the columns
 * whose entry in `mask` is non-zero.
 *
 * @param pDst    receives nCols and a newly allocated pSchema array with the kept columns
 * @param pBlock  gets one column info appended per kept column
 * @param mask    one entry per source column; the entries are summed to size pDst
 * @return 0 on success, the terrno-derived code when the schema array cannot be allocated,
 *         or the error returned by blockDataAppendColInfo.
 */
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
  int32_t numKept = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    numKept += mask[i];
  }

  pDst->nCols = numKept;
  pDst->pSchema = taosMemoryCalloc(numKept, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return TAOS_GET_TERRNO(terrno);
  }

  int32_t dstIdx = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (!mask[i]) {
      continue;
    }

    const SSchema* pColSchema = &pSrc->pSchema[i];
    pDst->pSchema[dstIdx++] = *pColSchema;

    SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
    int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code != 0) {
      return code;
    }
  }

  return 0;
}
564

565
/*
 * (Re)build the column layout of the reader's result block for `pSchema`.
 *
 * If the result block already has columns it is destroyed and recreated first. When
 * `pColIdList` is empty every schema column is added; otherwise only the schema columns
 * whose colId appears in the list are added, using a merge walk over both sequences
 * (which presumes both are ordered by column id - TODO confirm with callers).
 *
 * @return TSDB_CODE_SUCCESS, or the error code reported by createDataBlock /
 *         blockDataAppendColInfo.
 */
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  int32_t      code = TSDB_CODE_SUCCESS;
  SSDataBlock* pBlock = pReader->pResBlock;

  if (blockDataGetNumOfCols(pBlock) > 0) {
    blockDataDestroy(pBlock);
    code = createDataBlock(&pReader->pResBlock);
    if (code) {
      return code;
    }
    pBlock = pReader->pResBlock;

    pBlock->info.id.uid = pReader->cachedSchemaUid;
    pBlock->info.version = pReader->msg.ver;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      code = blockDataAppendColInfo(pBlock, &colInfo);
      if (code != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        return code;  // propagate the actual failure rather than whatever terrno holds now
      }
    }
  } else {
    if (numOfCols > pSchema->nCols) {
      numOfCols = pSchema->nCols;
    }

    int32_t i = 0;  // index into the schema columns
    int32_t j = 0;  // index into the requested column ids
    while (i < pSchema->nCols && j < numOfCols) {
      SSchema* pColSchema = &pSchema->pSchema[i];
      col_id_t colIdSchema = pColSchema->colId;

      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
      if (pColIdNeed == NULL) {
        break;
      }
      if (colIdSchema < *pColIdNeed) {
        i++;
      } else if (colIdSchema > *pColIdNeed) {
        j++;
      } else {
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
        code = blockDataAppendColInfo(pBlock, &colInfo);
        if (code != TSDB_CODE_SUCCESS) {
          return code;  // was a bare -1, which discarded the real error code
        }
        i++;
        j++;
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
625

626
/*
 * Store one column value into row `rowIndex` of `pColumnInfoData`.
 *
 * Variable-length values are first staged in a local buffer (length set with
 * varDataSetLen, payload copied behind it) and then handed to colDataSetVal; a
 * variable-length value that is not a real value is written as NULL. Fixed-length values
 * are passed to colDataSetVal directly, flagged as NULL when the value is not set.
 *
 * @return the result of colDataSetVal, TSDB_CODE_SUCCESS for the NULL case, or
 *         TSDB_CODE_TQ_INTERNAL_ERROR when a variable-length payload does not fit into the
 *         staging buffer.
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  // 65535 payload bytes plus 2 extra bytes, presumably for the length header - TODO confirm
  enum { TQ_VAR_PAYLOAD_MAX = 65535, TQ_VAR_BUF_EXTRA = 2 };

  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
    if (COL_VAL_IS_VALUE(pColVal)) {
      // refuse payloads that would overrun the staging buffer below
      if (pColVal->value.nData > TQ_VAR_PAYLOAD_MAX) {
        tqError("doSetVal failed, var data length:%u exceeds max:%d", (uint32_t)pColVal->value.nData,
                (int32_t)TQ_VAR_PAYLOAD_MAX);
        return TSDB_CODE_TQ_INTERNAL_ERROR;
      }

      // only set up (and zero) the staging buffer on the path that actually uses it
      char val[TQ_VAR_PAYLOAD_MAX + TQ_VAR_BUF_EXTRA] = {0};
      if (pColVal->value.pData != NULL) {
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      }
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}
646

647
/*
 * Convert the table block at pReader->nextBlk of the current submit message into the
 * reader's result block and advance nextBlk by one.
 *
 * The table schema is cached on the reader; when the block's suid/uid or schema version
 * differs from the cached one, the schema is reloaded from the meta store and the result
 * block layout is rebuilt. Both column-format and row-format submit data are supported.
 * Result columns that have no counterpart in the submitted data are filled with NULL.
 *
 * @param pRes  out: the reader's result block (set even if conversion fails later on)
 * @param id    not used by this function
 * @return 0 on success; TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be
 *         loaded, TSDB_CODE_TQ_INTERNAL_ERROR on a schema version mismatch, or the error
 *         code of the failing step.
 */
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  int32_t        code = 0;
  int32_t        line = 0;
  STSchema*      pTSchema = NULL;
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastTs = pSubmitTbData->ctimeMs;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  // reload the schema when the table (by suid, or by uid when there is no suid) or the
  // schema version differs from what is cached
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);

    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
    if (pReader->pSchemaWrapper == NULL) {
      // report the version that was actually requested, not the stale cached one
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             ", version %d, possibly dropped table",
             vgId, suid, uid, sversion);
      pReader->cachedSchemaSuid = 0;
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }

    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
      return TSDB_CODE_TQ_INTERNAL_ERROR;
    }
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
    TSDB_CHECK_CODE(code, line, END);
    // buildResSDataBlock may have replaced the result block
    pBlock = pReader->pResBlock;
    *pRes = pBlock;
  }

  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, line, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;  // position in the result block columns
    int32_t sourceIdx = 0;  // position in the submitted columns
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
      if (sourceIdx >= numOfCols) {
        // submitted columns exhausted: the remaining result columns become NULL
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      SColData* pCol = taosArrayGet(pCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
      if (pCol->cid < pColData->info.colId) {
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          tColDataGetValue(pCol, i, &colVal);
          code = doSetVal(pColData, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // the result column has no submitted counterpart
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow* pRow = taosArrayGetP(pRows, i);
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
      int32_t sourceIdx = 0;
      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);

        // advance through the row's columns until the one matching result column j
        while (1) {
          SColVal colVal = {0};
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
            code = doSetVal(pColData, i, &colVal);
            TSDB_CHECK_CODE(code, line, END);
            sourceIdx++;
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
          }
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code);
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}
789

790
/*
 * PROCESS_VAL - detect whether the "has a value" state of column j changed.
 *
 * Expects these names in the expanding scope: colVal, assigned, j, curRow,
 * buildNew. For the first row (curRow == 0) the state is always recorded and
 * buildNew is raised; for later rows that only happens when the NONE/not-NONE
 * state of colVal differs from what assigned[j] remembers.
 */
#define PROCESS_VAL                                                \
  if (curRow == 0 || assigned[j] != !COL_VAL_IS_NONE(&colVal)) {   \
    assigned[j] = !COL_VAL_IS_NONE(&colVal);                       \
    buildNew = true;                                               \
  }
801

802
/*
 * SET_DATA - one step of merging a source column value into the target block.
 *
 * Expects these names in the expanding scope: colVal (current source value),
 * pColData (current target column), sourceIdx, targetIdx, curRow, lastRow,
 * plus whatever TQ_ERR_GO_TO_END needs (it is used here to bail out when
 * doSetVal fails).
 *
 *  - source cid <  target colId: the source column is not wanted, skip it.
 *  - source cid == target colId: store the value at row (curRow - lastRow)
 *                                and advance both cursors.
 *  - source cid >  target colId: the target column has no matching source
 *                                value; write NULL and move to the next target.
 *                                Without this branch neither cursor advances
 *                                and the enclosing while loop never terminates.
 */
#define SET_DATA                                                     \
  if (colVal.cid < pColData->info.colId) {                           \
    sourceIdx++;                                                     \
  } else if (colVal.cid == pColData->info.colId) {                   \
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
    sourceIdx++;                                                     \
    targetIdx++;                                                     \
  } else {                                                           \
    colDataSetNULL(pColData, curRow - lastRow);                      \
    targetIdx++;                                                     \
  }
810

811
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
82✔
812
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
813
                               int32_t* lastRow) {
814
  int32_t         code = 0;
82✔
815
  SSchemaWrapper* pSW = NULL;
82✔
816
  SSDataBlock*    block = NULL;
82✔
817
  if (taosArrayGetSize(blocks) > 0) {
82!
818
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
819
    TQ_NULL_GO_TO_END(pLastBlock);
×
820
    pLastBlock->info.rows = curRow - *lastRow;
×
821
    *lastRow = curRow;
×
822
  }
823

824
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
82✔
825
  TQ_NULL_GO_TO_END(block);
82!
826

827
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
82✔
828
  TQ_NULL_GO_TO_END(pSW);
82!
829

830
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
82!
831
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
82!
832
          (int32_t)taosArrayGetSize(block->pDataBlock));
833

834
  block->info.id.uid = pSubmitTbData->uid;
82✔
835
  block->info.version = pReader->msg.ver;
82✔
836
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
82!
837
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
82!
838
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
82!
839
  pSW = NULL;
82✔
840
  taosMemoryFreeClear(block);
82!
841

842
END:
×
843
  tDeleteSchemaWrapper(pSW);
82!
844
  blockDataFreeRes(block);
82✔
845
  taosMemoryFree(block);
82✔
846
  return code;
82✔
847
}
848
/*
 * Convert column-format submit data (pSubmitTbData->aCol) into one or more
 * data blocks appended to `blocks`, with a matching entry in `schemas`.
 *
 * Rows are walked in order. Whenever the set of columns that carry a value
 * changes from one row to the next (tracked in `assigned` by PROCESS_VAL), a
 * new block is started through processBuildNew(). Each row's values are then
 * copied into the current last block by SET_DATA.
 *
 * Returns 0 on success, otherwise the code left behind by the failing step.
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  // The row count is taken from the first column.
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    // Pass 1: decide whether this row needs a new block.
    for (int32_t j = 0; j < numOfCols; j++) {
      pCol = taosArrayGet(pCols, j);
      TQ_NULL_GO_TO_END(pCol);
      SColVal colVal = {0};
      tColDataGetValue(pCol, i, &colVal);
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // Pass 2: copy this row's values into the current block.
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      tColDataGetValue(pCol, i, &colVal);
      SET_DATA
    }

    curRow++;
  }

  // Close the last block. With zero rows no block was ever created, so there
  // may be nothing to close; guard instead of dereferencing NULL.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

END:
  taosMemoryFree(assigned);
  return code;
}
906

907
/*
 * Convert row-format submit data (pSubmitTbData->aRowP) into one or more data
 * blocks appended to `blocks`, with a matching entry in `schemas`.
 *
 * A STSchema is built from the reader's schema wrapper and used to decode each
 * row. Whenever the set of columns that carry a value changes from one row to
 * the next (tracked in `assigned` by PROCESS_VAL), a new block is started
 * through processBuildNew(). Each row's values are then copied into the
 * current last block by SET_DATA.
 *
 * Returns 0 on success, otherwise the code left behind by the failing step.
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pRows = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRows);
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  // pTSchema is dereferenced in the row loop below, so a failed build must stop here.
  TQ_NULL_GO_TO_END(pTSchema);

  for (int32_t i = 0; i < numOfRows; i++) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(pRows, i);
    TQ_NULL_GO_TO_END(pRow);

    // Pass 1: decide whether this row needs a new block.
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // Pass 2: copy this row's values into the current block.
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);  // same check as the column-format path
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
    }

    curRow++;
  }

  // Close the last block. With zero rows no block was ever created, so there
  // may be nothing to close; guard instead of dereferencing NULL.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

END:
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
963

964
/*
 * Take the next SSubmitTbData from the reader's current submit message, load
 * the matching table schema and convert the data into blocks.
 *
 * pReader          - reader state; nextBlk is advanced, lastBlkUid and
 *                    pSchemaWrapper are replaced.
 * blocks, schemas  - output arrays filled by tqProcessColData/tqProcessRowData.
 * pSubmitTbDataRet - optional; receives the SSubmitTbData that was consumed.
 * createTime       - forwarded to metaGetTableSchema().
 *
 * Returns terrno when there is no entry at nextBlk,
 * TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema lookup fails, otherwise
 * the result of the column/row conversion.
 */
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
  tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  if (pSubmitTbData == NULL) {
    return terrno;
  }
  pReader->nextBlk++;

  if (pSubmitTbDataRet) {
    *pSubmitTbDataRet = pSubmitTbData;
  }

  int32_t sversion = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

  // Replace the previously loaded schema with the one for this table/version.
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
  if (pReader->pSchemaWrapper == NULL) {
    // Report the version that was actually looked up (sversion), not the
    // reader's cached schema version, which is unrelated to this request.
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
           pReader->pWalReader->pWal->cfg.vgId, uid, sversion);
    pReader->cachedSchemaSuid = 0;
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
  } else {
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
  }
}
995

996
// Store pColIdList in the reader; the pointer is assigned as-is, not copied.
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) {
  pReader->pColIdList = pColIdList;
}
2,915✔
997

998
/*
 * Replace the reader's set of table uids with the contents of tbUidList.
 *
 * An existing hash is cleared; otherwise a new one is created (on failure the
 * error is logged and the function returns without touching anything else).
 * Each uid is then inserted; a failed insert is logged and skipped.
 *
 * id - label used only in log messages; may be NULL.
 */
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  // tqUpdateTbUidList() passes NULL here, and handing NULL to "%s" is undefined
  // behaviour. "(null)" mirrors what glibc prints in that case.
  const char* idStr = (id != NULL) ? id : "(null)";

  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", idStr);
      return;
    }
  }

  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", idStr, *pKey);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", idStr, numOfTables);
}
1019

1020
/*
 * Add every uid in pTableUidList to the reader's table-uid set, creating the
 * hash first if it does not exist yet. A failed hash creation is logged and
 * aborts the call; a failed insert is logged and skipped.
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    // Same NULL guard as tqReaderSetTbUidList/tqReaderRemoveTbUidList: pKey is
    // dereferenced in the error message, so never use it unchecked.
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
    }
  }
}
1038

1039
// True when a lookup of uid in the reader's table-uid hash yields an entry.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
  return pEntry != NULL;
}
1042

1043
// True when the reader holds no message body (msg.msgStr is NULL).
bool tqCurrentBlockConsumed(const STqReader* pReader) {
  const bool hasMsg = (pReader->msg.msgStr != NULL);
  return !hasMsg;
}
122,373✔
1044

1045
/*
 * Remove every uid listed in tbUidList from the reader's table-uid hash.
 * A failed removal is logged; processing continues with the next uid.
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  int32_t idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    idx++;
    if (pUid == NULL) {
      continue;
    }
    if (taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t)) != 0) {
      tqError("failed to remove table uid:%" PRId64 " from hash", *pUid);
    }
  }
}
10✔
1053

1054
/*
 * Propagate a table-uid change (tbUidList added when isAdd is true, removed
 * otherwise) to every consumer handle and every stream task of this TQ.
 *
 * Phase 1 runs under pTq->lock (write) and walks pTq->pHandle:
 *   - COLUMN subscriptions: forward to qUpdateTableListForStreamScanner().
 *   - DB subscriptions:     on removal, record the uids in pFilterOutTbUid.
 *   - TABLE subscriptions:  on add, re-query the table list and install it on
 *                           the handle's reader; on removal, drop the uids
 *                           from the reader.
 * Phase 2 runs under the stream-meta write lock and forwards the change to the
 * executor of every source-level stream task.
 *
 * Returns the qGetTableList() error if that call fails (locks are released
 * first), otherwise 0. All other failures are only logged.
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  int32_t vgId = TD_VID(pTq->pVnode);
  void*   pIter = NULL;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pTqHandle = (STqHandle*)pIter;

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
      if (code != 0) {
        tqError("update qualified table error for %s", pTqHandle->subKey);
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      // Only removals are tracked for DB subscriptions.
      int32_t numOfUids = isAdd ? 0 : taosArrayGetSize(tbUidList);
      for (int32_t k = 0; k < numOfUids; k++) {
        int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, k);
        if (pUid == NULL) {
          continue;
        }
        if (taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, pUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *pUid);
        }
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      if (!isAdd) {
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
      } else {
        SArray* list = NULL;
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
                                    &list, pTqHandle->execHandle.task);
        if (ret != TDB_CODE_SUCCESS) {
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
                  pTqHandle->consumerId);
          // Leaving mid-iteration: cancel the iterator and release the lock first.
          taosArrayDestroy(list);
          taosHashCancelIterate(pTq->pHandle, pIter);
          taosWUnLockLatch(&pTq->lock);

          return ret;
        }
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
        taosArrayDestroy(list);
      }
    }
  }
  taosWUnLockLatch(&pTq->lock);

  // update the table list handle for each stream scanner/wal reader
  streamMetaWLock(pTq->pStreamMeta);
  while ((pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter)) != NULL) {
    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    int32_t taskId = pTask->id.taskId;
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
      int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
      if (code != 0) {
        tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
      }
    }

    // Every successful acquire is paired with a release, whatever happened above.
    if (taosReleaseRef(streamTaskRefPool, refId)) {
      tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
    }
  }

  streamMetaWUnLock(pTq->pStreamMeta);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc