• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3621

22 Feb 2025 11:44AM UTC coverage: 2.037% (-61.5%) from 63.573%
#3621

push

travis-ci

web-flow
Merge pull request #29874 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

4357 of 287032 branches covered (1.52%)

Branch coverage included in aggregate %.

0 of 174 new or added lines in 18 files covered. (0.0%)

213359 existing lines in 469 files now uncovered.

7260 of 283369 relevant lines covered (2.56%)

23737.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

UNCOV
19
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
×
UNCOV
20
  if (pHandle == NULL || pHead == NULL) {
×
21
    return false;
×
22
  }
UNCOV
23
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
×
UNCOV
24
    return true;
×
25
  }
26

UNCOV
27
  int16_t msgType = pHead->msgType;
×
UNCOV
28
  char*   body = pHead->body;
×
UNCOV
29
  int32_t bodyLen = pHead->bodyLen;
×
30

UNCOV
31
  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
×
UNCOV
32
  int64_t  realTbSuid = 0;
×
UNCOV
33
  SDecoder dcoder = {0};
×
UNCOV
34
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
×
UNCOV
35
  int32_t  len = bodyLen - sizeof(SMsgHead);
×
UNCOV
36
  tDecoderInit(&dcoder, data, len);
×
37

UNCOV
38
  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
×
UNCOV
39
    SVCreateStbReq req = {0};
×
UNCOV
40
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
×
41
      goto end;
×
42
    }
UNCOV
43
    realTbSuid = req.suid;
×
UNCOV
44
  } else if (msgType == TDMT_VND_DROP_STB) {
×
45
    SVDropStbReq req = {0};
×
46
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
×
47
      goto end;
×
48
    }
49
    realTbSuid = req.suid;
×
UNCOV
50
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
×
UNCOV
51
    SVCreateTbBatchReq req = {0};
×
UNCOV
52
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
×
53
      goto end;
×
54
    }
55

UNCOV
56
    int32_t        needRebuild = 0;
×
UNCOV
57
    SVCreateTbReq* pCreateReq = NULL;
×
UNCOV
58
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
UNCOV
59
      pCreateReq = req.pReqs + iReq;
×
UNCOV
60
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
61
        needRebuild++;
×
62
      }
63
    }
UNCOV
64
    if (needRebuild == 0) {
×
65
      // do nothing
66
    } else if (needRebuild == req.nReqs) {
×
67
      realTbSuid = tbSuid;
×
68
    } else {
69
      realTbSuid = tbSuid;
×
70
      SVCreateTbBatchReq reqNew = {0};
×
71
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
×
72
      if (reqNew.pArray == NULL) {
×
73
        tDeleteSVCreateTbBatchReq(&req);
×
74
        goto end;
×
75
      }
76
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
77
        pCreateReq = req.pReqs + iReq;
×
78
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
×
79
          reqNew.nReqs++;
×
80
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
×
81
            taosArrayDestroy(reqNew.pArray);
×
82
            tDeleteSVCreateTbBatchReq(&req);
×
83
            goto end;
×
84
          }
85
        }
86
      }
87

88
      int     tlen = 0;
×
89
      int32_t ret = 0;
×
90
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
×
91
      void* buf = taosMemoryMalloc(tlen);
×
92
      if (NULL == buf) {
×
93
        taosArrayDestroy(reqNew.pArray);
×
94
        tDeleteSVCreateTbBatchReq(&req);
×
95
        goto end;
×
96
      }
97
      SEncoder coderNew = {0};
×
98
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
99
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
×
100
      tEncoderClear(&coderNew);
×
101
      if (ret < 0) {
×
102
        taosMemoryFree(buf);
×
103
        taosArrayDestroy(reqNew.pArray);
×
104
        tDeleteSVCreateTbBatchReq(&req);
×
105
        goto end;
×
106
      }
107
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
108
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
109
      taosMemoryFree(buf);
×
110
      taosArrayDestroy(reqNew.pArray);
×
111
    }
112

UNCOV
113
    tDeleteSVCreateTbBatchReq(&req);
×
114
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
×
115
    SVAlterTbReq req = {0};
×
116

117
    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
×
118
      goto end;
×
119
    }
120

121
    SMetaReader mr = {0};
×
122
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);
×
123

124
    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
×
125
      metaReaderClear(&mr);
×
126
      goto end;
×
127
    }
128
    realTbSuid = mr.me.ctbEntry.suid;
×
129
    metaReaderClear(&mr);
×
130
  } else if (msgType == TDMT_VND_DROP_TABLE) {
×
131
    SVDropTbBatchReq req = {0};
×
132

133
    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
×
134
      goto end;
×
135
    }
136

137
    int32_t      needRebuild = 0;
×
138
    SVDropTbReq* pDropReq = NULL;
×
139
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
140
      pDropReq = req.pReqs + iReq;
×
141

142
      if (pDropReq->suid == tbSuid) {
×
143
        needRebuild++;
×
144
      }
145
    }
146
    if (needRebuild == 0) {
×
147
      // do nothing
148
    } else if (needRebuild == req.nReqs) {
×
149
      realTbSuid = tbSuid;
×
150
    } else {
151
      realTbSuid = tbSuid;
×
152
      SVDropTbBatchReq reqNew = {0};
×
153
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
154
      if (reqNew.pArray == NULL) {
×
155
        goto end;
×
156
      }
157
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
158
        pDropReq = req.pReqs + iReq;
×
159
        if (pDropReq->suid == tbSuid) {
×
160
          reqNew.nReqs++;
×
161
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
162
            taosArrayDestroy(reqNew.pArray);
×
163
            goto end;
×
164
          }
165
        }
166
      }
167

168
      int     tlen = 0;
×
169
      int32_t ret = 0;
×
170
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
×
171
      void* buf = taosMemoryMalloc(tlen);
×
172
      if (NULL == buf) {
×
173
        taosArrayDestroy(reqNew.pArray);
×
174
        goto end;
×
175
      }
176
      SEncoder coderNew = {0};
×
177
      tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
×
178
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
179
      tEncoderClear(&coderNew);
×
180
      if (ret != 0) {
×
181
        taosMemoryFree(buf);
×
182
        taosArrayDestroy(reqNew.pArray);
×
183
        goto end;
×
184
      }
185
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
186
      pHead->bodyLen = tlen + sizeof(SMsgHead);
×
187
      taosMemoryFree(buf);
×
188
      taosArrayDestroy(reqNew.pArray);
×
189
    }
190
  } else if (msgType == TDMT_VND_DELETE) {
×
191
    SDeleteRes req = {0};
×
192
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
×
193
      goto end;
×
194
    }
195
    realTbSuid = req.suid;
×
196
  }
197

198
end:
×
UNCOV
199
  tDecoderClear(&dcoder);
×
UNCOV
200
  bool tmp = tbSuid == realTbSuid;
×
UNCOV
201
  tqDebug("%s suid:%"PRId64" realSuid:%"PRId64" return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
×
UNCOV
202
  return tmp;
×
203
}
204

UNCOV
205
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
×
UNCOV
206
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
×
207
    return -1;
×
208
  }
UNCOV
209
  int32_t code = -1;
×
UNCOV
210
  int32_t vgId = TD_VID(pTq->pVnode);
×
UNCOV
211
  int64_t id = pHandle->pWalReader->readerId;
×
212

UNCOV
213
  int64_t offset = *fetchOffset;
×
UNCOV
214
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
×
UNCOV
215
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
×
UNCOV
216
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);
×
217

UNCOV
218
  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
×
219
          ", 0x%" PRIx64,
220
          vgId, offset, lastVer, committedVer, appliedVer, id);
221

UNCOV
222
  while (offset <= appliedVer) {
×
UNCOV
223
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
×
224
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
×
225
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
226
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
227
      goto END;
×
228
    }
229

UNCOV
230
    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
×
231
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
232

UNCOV
233
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
×
UNCOV
234
      code = walFetchBody(pHandle->pWalReader);
×
UNCOV
235
      goto END;
×
236
    } else {
UNCOV
237
      if (pHandle->fetchMeta != WITH_DATA) {
×
UNCOV
238
        SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
×
UNCOV
239
        if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
×
UNCOV
240
          code = walFetchBody(pHandle->pWalReader);
×
UNCOV
241
          if (code < 0) {
×
242
            goto END;
×
243
          }
244

UNCOV
245
          pHead = &(pHandle->pWalReader->pHead->head);
×
UNCOV
246
          if (isValValidForTable(pHandle, pHead)) {
×
UNCOV
247
            code = 0;
×
UNCOV
248
            goto END;
×
249
          } else {
UNCOV
250
            offset++;
×
UNCOV
251
            code = -1;
×
UNCOV
252
            continue;
×
253
          }
254
        }
255
      }
UNCOV
256
      code = walSkipFetchBody(pHandle->pWalReader);
×
UNCOV
257
      if (code < 0) {
×
258
        goto END;
×
259
      }
UNCOV
260
      offset++;
×
261
    }
UNCOV
262
    code = -1;
×
263
  }
264

UNCOV
265
END:
×
UNCOV
266
  *fetchOffset = offset;
×
UNCOV
267
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64 ", 0x%" PRIx64,
×
268
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
UNCOV
269
  return code;
×
270
}
271

UNCOV
272
bool tqGetTablePrimaryKey(STqReader* pReader) {
×
UNCOV
273
  if (pReader == NULL) {
×
274
    return false;
×
275
  }
UNCOV
276
  return pReader->hasPrimaryKey;
×
277
}
278

UNCOV
279
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
×
UNCOV
280
  tqDebug("%s:%p uid:%"PRId64, __FUNCTION__ , pReader, uid);
×
281

UNCOV
282
  if (pReader == NULL) {
×
283
    return;
×
284
  }
UNCOV
285
  bool            ret = false;
×
UNCOV
286
  SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1);
×
UNCOV
287
  if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) {
×
288
    ret = true;
×
289
  }
290
  tDeleteSchemaWrapper(schema);
UNCOV
291
  pReader->hasPrimaryKey = ret;
×
292
}
293

UNCOV
294
STqReader* tqReaderOpen(SVnode* pVnode) {
×
UNCOV
295
  tqDebug("%s:%p", __FUNCTION__ , pVnode);
×
UNCOV
296
  if (pVnode == NULL) {
×
297
    return NULL;
×
298
  }
UNCOV
299
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
×
UNCOV
300
  if (pReader == NULL) {
×
301
    return NULL;
×
302
  }
303

UNCOV
304
  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
×
UNCOV
305
  if (pReader->pWalReader == NULL) {
×
306
    taosMemoryFree(pReader);
×
307
    return NULL;
×
308
  }
309

UNCOV
310
  pReader->pVnodeMeta = pVnode->pMeta;
×
UNCOV
311
  pReader->pColIdList = NULL;
×
UNCOV
312
  pReader->cachedSchemaVer = 0;
×
UNCOV
313
  pReader->cachedSchemaSuid = 0;
×
UNCOV
314
  pReader->pSchemaWrapper = NULL;
×
UNCOV
315
  pReader->tbIdHash = NULL;
×
UNCOV
316
  pReader->pResBlock = NULL;
×
317

UNCOV
318
  int32_t code = createDataBlock(&pReader->pResBlock);
×
UNCOV
319
  if (code) {
×
320
    terrno = code;
×
321
  }
322

UNCOV
323
  return pReader;
×
324
}
325

UNCOV
326
void tqReaderClose(STqReader* pReader) {
×
UNCOV
327
  tqDebug("%s:%p", __FUNCTION__ , pReader);
×
UNCOV
328
  if (pReader == NULL) return;
×
329

330
  // close wal reader
UNCOV
331
  if (pReader->pWalReader) {
×
UNCOV
332
    walCloseReader(pReader->pWalReader);
×
333
  }
334

UNCOV
335
  if (pReader->pSchemaWrapper) {
×
UNCOV
336
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
×
337
  }
338

UNCOV
339
  if (pReader->pColIdList) {
×
UNCOV
340
    taosArrayDestroy(pReader->pColIdList);
×
341
  }
342

343
  // free hash
UNCOV
344
  blockDataDestroy(pReader->pResBlock);
×
UNCOV
345
  taosHashCleanup(pReader->tbIdHash);
×
UNCOV
346
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
×
UNCOV
347
  taosMemoryFree(pReader);
×
348
}
349

UNCOV
350
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
×
UNCOV
351
  if (pReader == NULL) {
×
352
    return TSDB_CODE_INVALID_PARA;
×
353
  }
UNCOV
354
  if (walReaderSeekVer(pReader->pWalReader, ver) < 0) {
×
UNCOV
355
    return terrno;
×
356
  }
UNCOV
357
  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
×
UNCOV
358
  return 0;
×
359
}
360

UNCOV
361
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
×
UNCOV
362
  int32_t code = 0;
×
363

UNCOV
364
  while (1) {
×
UNCOV
365
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));
×
366

UNCOV
367
    SWalCont* pCont = &pReader->pHead->head;
×
UNCOV
368
    int64_t   ver = pCont->version;
×
UNCOV
369
    if (ver > maxVer) {
×
UNCOV
370
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
×
UNCOV
371
      return TSDB_CODE_SUCCESS;
×
372
    }
373

UNCOV
374
    if (pCont->msgType == TDMT_VND_SUBMIT) {
×
UNCOV
375
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
×
UNCOV
376
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);
×
377

UNCOV
378
      void* data = taosMemoryMalloc(len);
×
UNCOV
379
      if (data == NULL) {
×
380
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
381
        // retry
382
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
×
383
        return terrno;
×
384
      }
385

UNCOV
386
      (void)memcpy(data, pBody, len);
×
UNCOV
387
      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
×
388

UNCOV
389
      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
×
UNCOV
390
      if (code != 0) {
×
391
        tqError("%s failed to create data submit for stream since out of memory", id);
×
392
        return code;
×
393
      }
UNCOV
394
    } else if (pCont->msgType == TDMT_VND_DELETE) {
×
UNCOV
395
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
×
UNCOV
396
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
×
UNCOV
397
      EStreamType blockType = STREAM_DELETE_DATA;
×
UNCOV
398
      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
×
UNCOV
399
      if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
400
        if (*pItem == NULL) {
×
UNCOV
401
          tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
×
402
          // we need to continue check next data in the wal files.
UNCOV
403
          continue;
×
404
        } else {
UNCOV
405
          tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
×
406
        }
407
      } else {
408
        terrno = code;
×
409
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
×
410
        return code;
×
411
      }
412

UNCOV
413
    } else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
×
UNCOV
414
      void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
×
UNCOV
415
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);
×
UNCOV
416
      code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
×
UNCOV
417
      if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
418
        if (!*pItem) {
×
419
          continue;
×
420
        } else {
UNCOV
421
          tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
×
422
        }
423
      } else {
424
        terrno = code;
×
425
        return code;
×
426
      }
427
    } else {
428
      tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
×
429
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
430
    }
431

UNCOV
432
    return code;
×
433
  }
434
}
435

UNCOV
436
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
×
UNCOV
437
  if (pReader == NULL) {
×
438
    return false;
×
439
  }
UNCOV
440
  SWalReader* pWalReader = pReader->pWalReader;
×
441

UNCOV
442
  int64_t st = taosGetTimestampMs();
×
UNCOV
443
  while (1) {
×
UNCOV
444
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
×
UNCOV
445
    while (pReader->nextBlk < numOfBlocks) {
×
UNCOV
446
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
447
              pReader->msg.ver);
448

UNCOV
449
      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
UNCOV
450
      if (pSubmitTbData == NULL) {
×
451
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
×
452
                pReader->msg.ver);
453
        return false;
×
454
      }
UNCOV
455
      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
×
UNCOV
456
        pReader->nextBlk += 1;
×
UNCOV
457
        continue;
×
458
      }
UNCOV
459
      if (pReader->tbIdHash == NULL || taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
×
UNCOV
460
        tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
×
UNCOV
461
        SSDataBlock* pRes = NULL;
×
UNCOV
462
        int32_t      code = tqRetrieveDataBlock(pReader, &pRes, NULL);
×
UNCOV
463
        if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
464
          return true;
×
465
        }
466
      } else {
UNCOV
467
        pReader->nextBlk += 1;
×
UNCOV
468
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
×
469
      }
470
    }
471

UNCOV
472
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
×
UNCOV
473
    pReader->msg.msgStr = NULL;
×
474

UNCOV
475
    int64_t elapsed = taosGetTimestampMs() - st;
×
UNCOV
476
    if (elapsed > 1000 || elapsed < 0) {
×
477
      return false;
×
478
    }
479

480
    // try next message in wal file
UNCOV
481
    if (walNextValidMsg(pWalReader) < 0) {
×
UNCOV
482
      return false;
×
483
    }
484

UNCOV
485
    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
×
UNCOV
486
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
×
UNCOV
487
    int64_t ver = pWalReader->pHead->head.version;
×
UNCOV
488
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL) != 0) {
×
489
      return false;
×
490
    }
UNCOV
491
    pReader->nextBlk = 0;
×
492
  }
493
}
494

UNCOV
495
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList) {
×
UNCOV
496
  if (pReader == NULL) {
×
497
    return TSDB_CODE_INVALID_PARA;
×
498
  }
UNCOV
499
  pReader->msg.msgStr = msgStr;
×
UNCOV
500
  pReader->msg.msgLen = msgLen;
×
UNCOV
501
  pReader->msg.ver = ver;
×
502

UNCOV
503
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
×
UNCOV
504
  SDecoder decoder = {0};
×
505

UNCOV
506
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
×
UNCOV
507
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit, rawList);
×
UNCOV
508
  tDecoderClear(&decoder);
×
509

UNCOV
510
  if (code != 0) {
×
511
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
×
512
  }
513

UNCOV
514
  return code;
×
515
}
516

UNCOV
517
void tqReaderClearSubmitMsg(STqReader *pReader) {
×
UNCOV
518
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
×
UNCOV
519
  pReader->nextBlk = 0;
×
UNCOV
520
  pReader->msg.msgStr = NULL;
×
UNCOV
521
}
×
522

523

UNCOV
524
SWalReader* tqGetWalReader(STqReader* pReader) {
×
UNCOV
525
  if (pReader == NULL) {
×
526
    return NULL;
×
527
  }
UNCOV
528
  return pReader->pWalReader;
×
529
}
530

UNCOV
531
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
×
UNCOV
532
  if (pReader == NULL) {
×
533
    return NULL;
×
534
  }
UNCOV
535
  return pReader->pResBlock;
×
536
}
537

UNCOV
538
int64_t tqGetResultBlockTime(STqReader* pReader) {
×
UNCOV
539
  if (pReader == NULL) {
×
540
    return 0;
×
541
  }
UNCOV
542
  return pReader->lastTs;
×
543
}
544

UNCOV
545
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
×
UNCOV
546
  int32_t code = false;
×
UNCOV
547
  int32_t lino = 0;
×
UNCOV
548
  int64_t uid = 0;
×
UNCOV
549
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
×
UNCOV
550
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
×
UNCOV
551
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);
×
552

UNCOV
553
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
×
UNCOV
554
  while (pReader->nextBlk < blockSz) {
×
UNCOV
555
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
UNCOV
556
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
×
UNCOV
557
    uid = pSubmitTbData->uid;
×
UNCOV
558
    void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
×
UNCOV
559
    TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
×
560

UNCOV
561
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
×
UNCOV
562
    pReader->nextBlk++;
×
563
  }
564

UNCOV
565
  tqReaderClearSubmitMsg(pReader);
×
UNCOV
566
  tqTrace("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
×
567

UNCOV
568
END:
×
UNCOV
569
  tqTrace("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
×
UNCOV
570
  return code;
×
571
}
572

UNCOV
573
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
×
UNCOV
574
  int32_t code = false;
×
UNCOV
575
  int32_t lino = 0;
×
UNCOV
576
  int64_t uid = 0;
×
577

UNCOV
578
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
×
UNCOV
579
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
×
UNCOV
580
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);
×
581

UNCOV
582
  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
×
UNCOV
583
  while (pReader->nextBlk < blockSz) {
×
UNCOV
584
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
UNCOV
585
    TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
×
UNCOV
586
    uid = pSubmitTbData->uid;
×
UNCOV
587
    void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
×
UNCOV
588
    TSDB_CHECK_NULL(ret, code, lino, END, true);
×
589
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
×
590
    pReader->nextBlk++;
×
591
  }
UNCOV
592
  tqReaderClearSubmitMsg(pReader);
×
UNCOV
593
  tqTrace("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
×
594

UNCOV
595
END:
×
UNCOV
596
  tqTrace("%s:%d get data:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
×
UNCOV
597
  return code;
×
598
}
599

UNCOV
600
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
×
UNCOV
601
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
×
602
    return TSDB_CODE_INVALID_PARA;
×
603
  }
UNCOV
604
  int32_t code = 0;
×
605

UNCOV
606
  int32_t cnt = 0;
×
UNCOV
607
  for (int32_t i = 0; i < pSrc->nCols; i++) {
×
UNCOV
608
    cnt += mask[i];
×
609
  }
610

UNCOV
611
  pDst->nCols = cnt;
×
UNCOV
612
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
×
UNCOV
613
  if (pDst->pSchema == NULL) {
×
614
    return TAOS_GET_TERRNO(terrno);
×
615
  }
616

UNCOV
617
  int32_t j = 0;
×
UNCOV
618
  for (int32_t i = 0; i < pSrc->nCols; i++) {
×
UNCOV
619
    if (mask[i]) {
×
UNCOV
620
      pDst->pSchema[j++] = pSrc->pSchema[i];
×
621
      SColumnInfoData colInfo =
UNCOV
622
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
×
UNCOV
623
      code = blockDataAppendColInfo(pBlock, &colInfo);
×
UNCOV
624
      if (code != 0) {
×
625
        return code;
×
626
      }
627
    }
628
  }
UNCOV
629
  return 0;
×
630
}
631

UNCOV
632
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
×
UNCOV
633
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
×
634
    return TSDB_CODE_INVALID_PARA;
×
635
  }
UNCOV
636
  SSDataBlock* pBlock = pReader->pResBlock;
×
UNCOV
637
  if (blockDataGetNumOfCols(pBlock) > 0) {
×
UNCOV
638
      blockDataDestroy(pBlock);
×
UNCOV
639
      int32_t code = createDataBlock(&pReader->pResBlock);
×
UNCOV
640
      if (code) {
×
641
        return code;
×
642
      }
UNCOV
643
      pBlock = pReader->pResBlock;
×
644

UNCOV
645
      pBlock->info.id.uid = pReader->cachedSchemaUid;
×
UNCOV
646
      pBlock->info.version = pReader->msg.ver;
×
647
  }
648

UNCOV
649
  int32_t numOfCols = taosArrayGetSize(pColIdList);
×
650

UNCOV
651
  if (numOfCols == 0) {  // all columns are required
×
652
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
×
653
      SSchema*        pColSchema = &pSchema->pSchema[i];
×
654
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
655

656
      int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
×
657
      if (code != TSDB_CODE_SUCCESS) {
×
658
        blockDataFreeRes(pBlock);
×
659
        return terrno;
×
660
      }
661
    }
662
  } else {
UNCOV
663
    if (numOfCols > pSchema->nCols) {
×
UNCOV
664
      numOfCols = pSchema->nCols;
×
665
    }
666

UNCOV
667
    int32_t i = 0;
×
UNCOV
668
    int32_t j = 0;
×
UNCOV
669
    while (i < pSchema->nCols && j < numOfCols) {
×
UNCOV
670
      SSchema* pColSchema = &pSchema->pSchema[i];
×
UNCOV
671
      col_id_t colIdSchema = pColSchema->colId;
×
672

UNCOV
673
      col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, j);
×
UNCOV
674
      if (pColIdNeed == NULL) {
×
675
        break;
×
676
      }
UNCOV
677
      if (colIdSchema < *pColIdNeed) {
×
UNCOV
678
        i++;
×
UNCOV
679
      } else if (colIdSchema > *pColIdNeed) {
×
680
        j++;
×
681
      } else {
UNCOV
682
        SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
×
UNCOV
683
        int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
×
UNCOV
684
        if (code != TSDB_CODE_SUCCESS) {
×
685
          return -1;
×
686
        }
UNCOV
687
        i++;
×
UNCOV
688
        j++;
×
689
      }
690
    }
691
  }
692

UNCOV
693
  return TSDB_CODE_SUCCESS;
×
694
}
695

UNCOV
696
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
×
UNCOV
697
  int32_t code = TSDB_CODE_SUCCESS;
×
698

UNCOV
699
  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
×
UNCOV
700
    char val[65535 + 2] = {0};
×
UNCOV
701
    if (COL_VAL_IS_VALUE(pColVal)) {
×
UNCOV
702
      if (pColVal->value.pData != NULL) {
×
UNCOV
703
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
×
704
      }
UNCOV
705
      varDataSetLen(val, pColVal->value.nData);
×
UNCOV
706
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
×
707
    } else {
708
      colDataSetNULL(pColumnInfoData, rowIndex);
×
709
    }
710
  } else {
UNCOV
711
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
×
712
  }
713

UNCOV
714
  return code;
×
715
}
716

UNCOV
717
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
×
UNCOV
718
  if (pReader == NULL || pRes == NULL) {
×
719
    return TSDB_CODE_INVALID_PARA;
×
720
  }
UNCOV
721
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
×
UNCOV
722
  int32_t        code = 0;
×
UNCOV
723
  int32_t        line = 0;
×
UNCOV
724
  STSchema*      pTSchema = NULL;
×
UNCOV
725
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
×
UNCOV
726
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
×
UNCOV
727
  SSDataBlock* pBlock = pReader->pResBlock;
×
UNCOV
728
  *pRes = pBlock;
×
729

UNCOV
730
  blockDataCleanup(pBlock);
×
731

UNCOV
732
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
×
UNCOV
733
  int32_t sversion = pSubmitTbData->sver;
×
UNCOV
734
  int64_t suid = pSubmitTbData->suid;
×
UNCOV
735
  int64_t uid = pSubmitTbData->uid;
×
UNCOV
736
  pReader->lastTs = pSubmitTbData->ctimeMs;
×
737

UNCOV
738
  pBlock->info.id.uid = uid;
×
UNCOV
739
  pBlock->info.version = pReader->msg.ver;
×
740

UNCOV
741
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
×
UNCOV
742
      (pReader->cachedSchemaVer != sversion)) {
×
UNCOV
743
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
×
744

UNCOV
745
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
×
UNCOV
746
    if (pReader->pSchemaWrapper == NULL) {
×
UNCOV
747
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
×
748
             "version %d, possibly dropped table",
749
             vgId, suid, uid, pReader->cachedSchemaVer);
UNCOV
750
      pReader->cachedSchemaSuid = 0;
×
UNCOV
751
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
752
    }
753

UNCOV
754
    pReader->cachedSchemaUid = uid;
×
UNCOV
755
    pReader->cachedSchemaSuid = suid;
×
UNCOV
756
    pReader->cachedSchemaVer = sversion;
×
757

UNCOV
758
    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
×
759
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
×
760
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
761
      return TSDB_CODE_TQ_INTERNAL_ERROR;
×
762
    }
UNCOV
763
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
×
UNCOV
764
    TSDB_CHECK_CODE(code, line, END);
×
UNCOV
765
    pBlock = pReader->pResBlock;
×
UNCOV
766
    *pRes = pBlock;
×
767
  }
768

UNCOV
769
  int32_t numOfRows = 0;
×
UNCOV
770
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
UNCOV
771
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
×
UNCOV
772
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
×
UNCOV
773
    numOfRows = pCol->nVal;
×
774
  } else {
UNCOV
775
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
×
776
  }
777

UNCOV
778
  code = blockDataEnsureCapacity(pBlock, numOfRows);
×
UNCOV
779
  TSDB_CHECK_CODE(code, line, END);
×
UNCOV
780
  pBlock->info.rows = numOfRows;
×
UNCOV
781
  int32_t colActual = blockDataGetNumOfCols(pBlock);
×
782

783
  // convert and scan one block
UNCOV
784
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
UNCOV
785
    SArray* pCols = pSubmitTbData->aCol;
×
UNCOV
786
    int32_t numOfCols = taosArrayGetSize(pCols);
×
UNCOV
787
    int32_t targetIdx = 0;
×
UNCOV
788
    int32_t sourceIdx = 0;
×
UNCOV
789
    while (targetIdx < colActual) {
×
UNCOV
790
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
×
UNCOV
791
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
×
UNCOV
792
      if (sourceIdx >= numOfCols) {
×
UNCOV
793
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
×
UNCOV
794
        colDataSetNNULL(pColData, 0, numOfRows);
×
UNCOV
795
        targetIdx++;
×
UNCOV
796
        continue;
×
797
      }
798

UNCOV
799
      SColData* pCol = taosArrayGet(pCols, sourceIdx);
×
UNCOV
800
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
×
UNCOV
801
      SColVal colVal = {0};
×
UNCOV
802
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
×
803
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
UNCOV
804
      if (pCol->cid < pColData->info.colId) {
×
UNCOV
805
        sourceIdx++;
×
UNCOV
806
      } else if (pCol->cid == pColData->info.colId) {
×
UNCOV
807
        for (int32_t i = 0; i < pCol->nVal; i++) {
×
UNCOV
808
          code = tColDataGetValue(pCol, i, &colVal);
×
UNCOV
809
          TSDB_CHECK_CODE(code, line, END);
×
UNCOV
810
          code = doSetVal(pColData, i, &colVal);
×
UNCOV
811
          TSDB_CHECK_CODE(code, line, END);
×
812
        }
UNCOV
813
        sourceIdx++;
×
UNCOV
814
        targetIdx++;
×
815
      } else {
UNCOV
816
        colDataSetNNULL(pColData, 0, numOfRows);
×
UNCOV
817
        targetIdx++;
×
818
      }
819
    }
820
  } else {
UNCOV
821
    SArray*         pRows = pSubmitTbData->aRowP;
×
UNCOV
822
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
×
UNCOV
823
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
×
UNCOV
824
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);
×
825

UNCOV
826
    for (int32_t i = 0; i < numOfRows; i++) {
×
UNCOV
827
      SRow* pRow = taosArrayGetP(pRows, i);
×
UNCOV
828
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
×
UNCOV
829
      int32_t sourceIdx = 0;
×
UNCOV
830
      for (int32_t j = 0; j < colActual; j++) {
×
UNCOV
831
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
×
UNCOV
832
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);
×
833

UNCOV
834
        while (1) {
×
UNCOV
835
          SColVal colVal = {0};
×
UNCOV
836
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
×
UNCOV
837
          TSDB_CHECK_CODE(code, line, END);
×
838

UNCOV
839
          if (colVal.cid < pColData->info.colId) {
×
UNCOV
840
            sourceIdx++;
×
UNCOV
841
            continue;
×
UNCOV
842
          } else if (colVal.cid == pColData->info.colId) {
×
UNCOV
843
            code = doSetVal(pColData, i, &colVal);
×
UNCOV
844
            TSDB_CHECK_CODE(code, line, END);
×
UNCOV
845
            sourceIdx++;
×
UNCOV
846
            break;
×
847
          } else {
848
            colDataSetNULL(pColData, i);
×
849
            break;
×
850
          }
851
        }
852
      }
853
    }
854
  }
855

UNCOV
856
END:
×
UNCOV
857
  if (code != 0) {
×
858
    tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
×
859
  }
UNCOV
860
  taosMemoryFreeClear(pTSchema);
×
UNCOV
861
  return code;
×
862
}
863

864
#define PROCESS_VAL                                      \
865
  if (curRow == 0) {                                     \
866
    assigned[j] = !COL_VAL_IS_NONE(&colVal);             \
867
    buildNew = true;                                     \
868
  } else {                                               \
869
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); \
870
    if (currentRowAssigned != assigned[j]) {             \
871
      assigned[j] = currentRowAssigned;                  \
872
      buildNew = true;                                   \
873
    }                                                    \
874
  }
875

876
#define SET_DATA                                                     \
877
  if (colVal.cid < pColData->info.colId) {                           \
878
    sourceIdx++;                                                     \
879
  } else if (colVal.cid == pColData->info.colId) {                   \
880
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
881
    sourceIdx++;                                                     \
882
    targetIdx++;                                                     \
883
  }
884

UNCOV
885
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
×
886
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
887
                               int32_t* lastRow) {
UNCOV
888
  int32_t         code = 0;
×
UNCOV
889
  SSchemaWrapper* pSW = NULL;
×
UNCOV
890
  SSDataBlock*    block = NULL;
×
UNCOV
891
  if (taosArrayGetSize(blocks) > 0) {
×
892
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
893
    TQ_NULL_GO_TO_END(pLastBlock);
×
894
    pLastBlock->info.rows = curRow - *lastRow;
×
895
    *lastRow = curRow;
×
896
  }
897

UNCOV
898
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
×
UNCOV
899
  TQ_NULL_GO_TO_END(block);
×
900

UNCOV
901
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
×
UNCOV
902
  TQ_NULL_GO_TO_END(pSW);
×
903

UNCOV
904
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
×
UNCOV
905
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
×
906
          (int32_t)taosArrayGetSize(block->pDataBlock));
907

UNCOV
908
  block->info.id.uid = pSubmitTbData->uid;
×
UNCOV
909
  block->info.version = pReader->msg.ver;
×
UNCOV
910
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
×
UNCOV
911
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
×
UNCOV
912
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
×
UNCOV
913
  pSW = NULL;
×
UNCOV
914
  taosMemoryFreeClear(block);
×
915

916
END:
×
UNCOV
917
  if (code != 0) {
×
918
    tqError("processBuildNew failed, code:%d", code);
×
919
  }
UNCOV
920
  tDeleteSchemaWrapper(pSW);
×
UNCOV
921
  blockDataFreeRes(block);
×
UNCOV
922
  taosMemoryFree(block);
×
UNCOV
923
  return code;
×
924
}
UNCOV
925
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
×
UNCOV
926
  int32_t code = 0;
×
UNCOV
927
  int32_t curRow = 0;
×
UNCOV
928
  int32_t lastRow = 0;
×
929

UNCOV
930
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
×
UNCOV
931
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
×
UNCOV
932
  TQ_NULL_GO_TO_END(assigned);
×
933

UNCOV
934
  SArray*   pCols = pSubmitTbData->aCol;
×
UNCOV
935
  SColData* pCol = taosArrayGet(pCols, 0);
×
UNCOV
936
  TQ_NULL_GO_TO_END(pCol);
×
UNCOV
937
  int32_t numOfRows = pCol->nVal;
×
UNCOV
938
  int32_t numOfCols = taosArrayGetSize(pCols);
×
UNCOV
939
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
×
UNCOV
940
  for (int32_t i = 0; i < numOfRows; i++) {
×
UNCOV
941
    bool buildNew = false;
×
942

UNCOV
943
    for (int32_t j = 0; j < numOfCols; j++) {
×
UNCOV
944
      pCol = taosArrayGet(pCols, j);
×
UNCOV
945
      TQ_NULL_GO_TO_END(pCol);
×
UNCOV
946
      SColVal colVal = {0};
×
UNCOV
947
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
×
UNCOV
948
      PROCESS_VAL
×
949
    }
950

UNCOV
951
    if (buildNew) {
×
UNCOV
952
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
×
953
                                       curRow, &lastRow));
954
    }
955

UNCOV
956
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
×
UNCOV
957
    TQ_NULL_GO_TO_END(pBlock);
×
958

UNCOV
959
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
×
960
            (int32_t)taosArrayGetSize(blocks));
961

UNCOV
962
    int32_t targetIdx = 0;
×
UNCOV
963
    int32_t sourceIdx = 0;
×
UNCOV
964
    int32_t colActual = blockDataGetNumOfCols(pBlock);
×
UNCOV
965
    while (targetIdx < colActual) {
×
UNCOV
966
      pCol = taosArrayGet(pCols, sourceIdx);
×
UNCOV
967
      TQ_NULL_GO_TO_END(pCol);
×
UNCOV
968
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
×
UNCOV
969
      TQ_NULL_GO_TO_END(pColData);
×
UNCOV
970
      SColVal colVal = {0};
×
UNCOV
971
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
×
UNCOV
972
      SET_DATA
×
973
    }
974

UNCOV
975
    curRow++;
×
976
  }
UNCOV
977
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
UNCOV
978
  pLastBlock->info.rows = curRow - lastRow;
×
UNCOV
979
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
×
UNCOV
980
END:
×
UNCOV
981
  if (code != TSDB_CODE_SUCCESS) {
×
982
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
983
  }
UNCOV
984
  taosMemoryFree(assigned);
×
UNCOV
985
  return code;
×
986
}
987

UNCOV
988
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
×
UNCOV
989
  int32_t   code = 0;
×
UNCOV
990
  STSchema* pTSchema = NULL;
×
991

UNCOV
992
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
×
UNCOV
993
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
×
UNCOV
994
  TQ_NULL_GO_TO_END(assigned);
×
995

UNCOV
996
  int32_t curRow = 0;
×
UNCOV
997
  int32_t lastRow = 0;
×
UNCOV
998
  SArray* pRows = pSubmitTbData->aRowP;
×
UNCOV
999
  int32_t numOfRows = taosArrayGetSize(pRows);
×
UNCOV
1000
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
×
UNCOV
1001
  TQ_NULL_GO_TO_END(pTSchema);
×
UNCOV
1002
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
×
1003

UNCOV
1004
  for (int32_t i = 0; i < numOfRows; i++) {
×
UNCOV
1005
    bool  buildNew = false;
×
UNCOV
1006
    SRow* pRow = taosArrayGetP(pRows, i);
×
UNCOV
1007
    TQ_NULL_GO_TO_END(pRow);
×
1008

UNCOV
1009
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
×
UNCOV
1010
      SColVal colVal = {0};
×
UNCOV
1011
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
×
UNCOV
1012
      PROCESS_VAL
×
1013
    }
1014

UNCOV
1015
    if (buildNew) {
×
UNCOV
1016
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
×
1017
                                       curRow, &lastRow));
1018
    }
1019

UNCOV
1020
    SSDataBlock* pBlock = taosArrayGetLast(blocks);
×
UNCOV
1021
    TQ_NULL_GO_TO_END(pBlock);
×
1022

UNCOV
1023
    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
×
1024
            (int32_t)taosArrayGetSize(blocks));
1025

UNCOV
1026
    int32_t targetIdx = 0;
×
UNCOV
1027
    int32_t sourceIdx = 0;
×
UNCOV
1028
    int32_t colActual = blockDataGetNumOfCols(pBlock);
×
UNCOV
1029
    while (targetIdx < colActual) {
×
UNCOV
1030
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
×
UNCOV
1031
      SColVal          colVal = {0};
×
UNCOV
1032
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
×
UNCOV
1033
      SET_DATA
×
1034
    }
1035

UNCOV
1036
    curRow++;
×
1037
  }
UNCOV
1038
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
UNCOV
1039
  pLastBlock->info.rows = curRow - lastRow;
×
1040

UNCOV
1041
  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks));
×
UNCOV
1042
END:
×
UNCOV
1043
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1044
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
×
1045
  }
UNCOV
1046
  taosMemoryFreeClear(pTSchema);
×
UNCOV
1047
  taosMemoryFree(assigned);
×
UNCOV
1048
  return code;
×
1049
}
1050

UNCOV
1051
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
×
UNCOV
1052
  int32_t code = 0;
×
UNCOV
1053
  int32_t lino = 0;
×
UNCOV
1054
  void*   createReq = NULL;
×
UNCOV
1055
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
×
UNCOV
1056
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);
×
1057

UNCOV
1058
  if (pRsp->createTableNum == 0) {
×
UNCOV
1059
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
×
UNCOV
1060
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
×
UNCOV
1061
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
×
UNCOV
1062
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
×
1063
  }
1064

UNCOV
1065
  uint32_t len = 0;
×
UNCOV
1066
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
×
UNCOV
1067
  TSDB_CHECK_CODE(code, lino, END);
×
UNCOV
1068
  createReq = taosMemoryCalloc(1, len);
×
UNCOV
1069
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);
×
1070

UNCOV
1071
  SEncoder encoder = {0};
×
UNCOV
1072
  tEncoderInit(&encoder, createReq, len);
×
UNCOV
1073
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
×
UNCOV
1074
  tEncoderClear(&encoder);
×
UNCOV
1075
  TSDB_CHECK_CODE(code, lino, END);
×
UNCOV
1076
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
×
UNCOV
1077
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
×
UNCOV
1078
  pRsp->createTableNum++;
×
UNCOV
1079
  tqTrace("build create table info msg success");
×
1080

UNCOV
1081
  END:
×
UNCOV
1082
  if (code != 0){
×
1083
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
×
1084
    taosMemoryFree(createReq);
×
1085
  }
UNCOV
1086
  return code;
×
1087
}
1088

UNCOV
1089
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
×
UNCOV
1090
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
×
UNCOV
1091
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
×
UNCOV
1092
  if (pSubmitTbData == NULL) {
×
1093
    return terrno;
×
1094
  }
UNCOV
1095
  pReader->nextBlk++;
×
1096

UNCOV
1097
  if (pSubmitTbDataRet) {
×
UNCOV
1098
    *pSubmitTbDataRet = pSubmitTbData;
×
1099
  }
1100

UNCOV
1101
  int32_t sversion = pSubmitTbData->sver;
×
UNCOV
1102
  int64_t uid = pSubmitTbData->uid;
×
UNCOV
1103
  pReader->lastBlkUid = uid;
×
1104

UNCOV
1105
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
×
UNCOV
1106
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1);
×
UNCOV
1107
  if (pReader->pSchemaWrapper == NULL) {
×
UNCOV
1108
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
×
1109
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
UNCOV
1110
    pReader->cachedSchemaSuid = 0;
×
UNCOV
1111
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
×
1112
  }
1113

UNCOV
1114
  if (pSubmitTbData->pCreateTbReq != NULL) {
×
UNCOV
1115
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
×
UNCOV
1116
    if (code != 0) {
×
1117
      return code;
×
1118
    }
UNCOV
1119
  } else if (rawList != NULL) {
×
UNCOV
1120
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){
×
1121
      return terrno;
×
1122
    }
UNCOV
1123
    pReader->pSchemaWrapper = NULL;
×
UNCOV
1124
    return 0;
×
1125
  }
1126

UNCOV
1127
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
×
UNCOV
1128
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
×
1129
  } else {
UNCOV
1130
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
×
1131
  }
1132
}
1133

UNCOV
1134
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) {
×
UNCOV
1135
  if (pReader == NULL){
×
1136
    return;
×
1137
  }
UNCOV
1138
  pReader->pColIdList = pColIdList;
×
1139
}
1140

UNCOV
1141
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
×
UNCOV
1142
  if (pReader == NULL || tbUidList == NULL) {
×
1143
    return;
×
1144
  }
UNCOV
1145
  if (pReader->tbIdHash) {
×
UNCOV
1146
    taosHashClear(pReader->tbIdHash);
×
1147
  } else {
UNCOV
1148
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
UNCOV
1149
    if (pReader->tbIdHash == NULL) {
×
1150
      tqError("s-task:%s failed to init hash table", id);
×
1151
      return;
×
1152
    }
1153
  }
1154

UNCOV
1155
  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
×
UNCOV
1156
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
×
UNCOV
1157
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
×
1158
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
×
1159
      continue;
×
1160
    }
1161
  }
1162

UNCOV
1163
  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
×
1164
}
1165

UNCOV
1166
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
×
UNCOV
1167
  if (pReader == NULL || pTableUidList == NULL) {
×
1168
    return;
×
1169
  }
UNCOV
1170
  if (pReader->tbIdHash == NULL) {
×
1171
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
×
1172
    if (pReader->tbIdHash == NULL) {
×
1173
      tqError("failed to init hash table");
×
1174
      return;
×
1175
    }
1176
  }
1177

UNCOV
1178
  int32_t numOfTables = taosArrayGetSize(pTableUidList);
×
UNCOV
1179
  for (int i = 0; i < numOfTables; i++) {
×
UNCOV
1180
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
×
UNCOV
1181
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
×
1182
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
×
1183
      continue;
×
1184
    }
1185
  }
1186
}
1187

UNCOV
1188
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
×
UNCOV
1189
  if (pReader == NULL) {
×
1190
    return false;
×
1191
  }
UNCOV
1192
  return taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
×
1193
}
1194

UNCOV
1195
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
UNCOV
1196
  if (pReader == NULL) {
×
1197
    return false;
×
1198
  }
UNCOV
1199
  return pReader->msg.msgStr == NULL;
×
1200
}
1201

UNCOV
1202
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
×
UNCOV
1203
  if (pReader == NULL || tbUidList == NULL) {
×
1204
    return;
×
1205
  }
UNCOV
1206
  for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
×
UNCOV
1207
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
×
UNCOV
1208
    if (pKey && taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t)) != 0) {
×
UNCOV
1209
      tqError("failed to remove table uid:%" PRId64 " from hash", *pKey);
×
1210
    }
1211
  }
1212
}
1213

UNCOV
1214
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
×
UNCOV
1215
  if (pTq == NULL || tbUidList == NULL) {
×
1216
    return TSDB_CODE_INVALID_PARA;
×
1217
  }
UNCOV
1218
  void*   pIter = NULL;
×
UNCOV
1219
  int32_t vgId = TD_VID(pTq->pVnode);
×
1220

1221
  // update the table list for each consumer handle
UNCOV
1222
  taosWLockLatch(&pTq->lock);
×
UNCOV
1223
  while (1) {
×
UNCOV
1224
    pIter = taosHashIterate(pTq->pHandle, pIter);
×
UNCOV
1225
    if (pIter == NULL) {
×
UNCOV
1226
      break;
×
1227
    }
1228

UNCOV
1229
    STqHandle* pTqHandle = (STqHandle*)pIter;
×
UNCOV
1230
    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
×
UNCOV
1231
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
×
UNCOV
1232
      if (code != 0) {
×
1233
        tqError("update qualified table error for %s", pTqHandle->subKey);
×
1234
        continue;
×
1235
      }
UNCOV
1236
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
×
UNCOV
1237
      if (!isAdd) {
×
UNCOV
1238
        int32_t sz = taosArrayGetSize(tbUidList);
×
UNCOV
1239
        for (int32_t i = 0; i < sz; i++) {
×
1240
          int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
×
1241
          if (tbUid &&
×
1242
              taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
×
1243
            tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
×
1244
            continue;
×
1245
          }
1246
        }
1247
      }
UNCOV
1248
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
×
UNCOV
1249
      if (isAdd) {
×
UNCOV
1250
        SArray* list = NULL;
×
UNCOV
1251
        int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
×
1252
                                    &list, pTqHandle->execHandle.task);
UNCOV
1253
        if (ret != TDB_CODE_SUCCESS) {
×
1254
          tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
×
1255
                  pTqHandle->consumerId);
1256
          taosArrayDestroy(list);
×
1257
          taosHashCancelIterate(pTq->pHandle, pIter);
×
1258
          taosWUnLockLatch(&pTq->lock);
×
1259

1260
          return ret;
×
1261
        }
UNCOV
1262
        tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
×
UNCOV
1263
        taosArrayDestroy(list);
×
1264
      } else {
1265
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
×
1266
      }
1267
    }
1268
  }
UNCOV
1269
  taosWUnLockLatch(&pTq->lock);
×
1270

1271
  // update the table list handle for each stream scanner/wal reader
UNCOV
1272
  streamMetaWLock(pTq->pStreamMeta);
×
UNCOV
1273
  while (1) {
×
UNCOV
1274
    pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter);
×
UNCOV
1275
    if (pIter == NULL) {
×
UNCOV
1276
      break;
×
1277
    }
1278

UNCOV
1279
    int64_t      refId = *(int64_t*)pIter;
×
UNCOV
1280
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
×
UNCOV
1281
    if (pTask != NULL) {
×
UNCOV
1282
      int32_t taskId = pTask->id.taskId;
×
1283

UNCOV
1284
      if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
×
UNCOV
1285
        int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
×
UNCOV
1286
        if (code != 0) {
×
UNCOV
1287
          tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
×
1288
        }
1289
      }
UNCOV
1290
      int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
×
UNCOV
1291
      if (ret) {
×
1292
        tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
×
1293
      }
1294
    }
1295
  }
1296

UNCOV
1297
  streamMetaWUnLock(pTq->pStreamMeta);
×
UNCOV
1298
  return 0;
×
1299
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc