• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

1.2
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tmsg.h"
17
#include "tq.h"
18

UNCOV
19
/**
 * Decide whether a WAL meta message concerns the super table a handle subscribes to.
 *
 * Handles whose subscription type is not TOPIC_SUB_TYPE__TABLE accept every message. For table
 * subscriptions the body (after the leading SMsgHead) is decoded according to pHead->msgType and
 * the super-table uid found there is compared with pHandle->execHandle.execTb.suid.
 *
 * Side effect: for TDMT_VND_CREATE_TABLE / TDMT_VND_DROP_TABLE batches in which only some entries
 * belong to the subscribed super table, the body is re-encoded in place so that it carries only the
 * matching entries, and pHead->bodyLen is updated accordingly.
 *
 * @param pHandle  subscription handle; its execHandle supplies the subscription type and suid.
 * @param pHead    WAL content header whose body has already been fetched.
 * @return true when the message should be delivered. NULL arguments yield false. When decoding
 *         fails, or the message type is not handled here, the matched suid stays 0 and the result
 *         is (subscribed suid == 0).
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  int16_t msgType = pHead->msgType;
  char*   body = pHead->body;
  int32_t bodyLen = pHead->bodyLen;

  int64_t  tbSuid = pHandle->execHandle.execTb.suid;
  int64_t  realTbSuid = 0;
  SDecoder dcoder = {0};
  void*    data = POINTER_SHIFT(body, sizeof(SMsgHead));
  int32_t  len = bodyLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, data, len);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq req = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq req = {0};
    if (tDecodeSVDropStbReq(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    SVCreateTbBatchReq req = {0};
    if (tDecodeSVCreateTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the child tables in the batch that belong to the subscribed super table
    int32_t        needRebuild = 0;
    SVCreateTbReq* pCreateReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // nothing in the batch matches: realTbSuid stays 0
    } else if (needRebuild == req.nReqs) {
      // the whole batch matches: deliver the body untouched
      realTbSuid = tbSuid;
    } else {
      // Partial match: the message is relevant, so mark it before attempting the rewrite. Any
      // failure below therefore still delivers the (unfiltered) original body.
      realTbSuid = tbSuid;
      SVCreateTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
      if (reqNew.pArray == NULL) {
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pCreateReq = req.pReqs + iReq;
        if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            tDeleteSVCreateTbBatchReq(&req);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
      if (ret < 0) {
        // size computation failed, tlen is not trustworthy
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      SEncoder coderNew = {0};
      // buf holds exactly tlen bytes and carries no SMsgHead, so the encoder gets the full capacity
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret < 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        tDeleteSVCreateTbBatchReq(&req);
        goto end;
      }
      // overwrite the payload behind the SMsgHead with the filtered batch
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }

    tDeleteSVCreateTbBatchReq(&req);
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    SVAlterTbReq req = {0};

    if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
      goto end;
    }

    // the alter request only names the table, so its suid is looked up in the vnode meta
    SMetaReader mr = {0};
    metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, META_READER_LOCK);

    if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
      metaReaderClear(&mr);
      goto end;
    }
    realTbSuid = mr.me.ctbEntry.suid;
    metaReaderClear(&mr);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    SVDropTbBatchReq req = {0};

    if (tDecodeSVDropTbBatchReq(&dcoder, &req) < 0) {
      goto end;
    }

    // count the drop entries that belong to the subscribed super table
    int32_t      needRebuild = 0;
    SVDropTbReq* pDropReq = NULL;
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;

      if (pDropReq->suid == tbSuid) {
        needRebuild++;
      }
    }
    if (needRebuild == 0) {
      // nothing in the batch matches: realTbSuid stays 0
    } else if (needRebuild == req.nReqs) {
      realTbSuid = tbSuid;
    } else {
      // partial match: same policy as the create-table branch above
      realTbSuid = tbSuid;
      SVDropTbBatchReq reqNew = {0};
      reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
      if (reqNew.pArray == NULL) {
        goto end;
      }
      for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
        pDropReq = req.pReqs + iReq;
        if (pDropReq->suid == tbSuid) {
          reqNew.nReqs++;
          if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
            taosArrayDestroy(reqNew.pArray);
            goto end;
          }
        }
      }

      int     tlen = 0;
      int32_t ret = 0;
      tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
      if (ret < 0) {
        // size computation failed, tlen is not trustworthy
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      void* buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      SEncoder coderNew = {0};
      // buf holds exactly tlen bytes and carries no SMsgHead, so the encoder gets the full capacity
      tEncoderInit(&coderNew, buf, tlen);
      ret = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (ret != 0) {
        taosMemoryFree(buf);
        taosArrayDestroy(reqNew.pArray);
        goto end;
      }
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      taosMemoryFree(buf);
      taosArrayDestroy(reqNew.pArray);
    }
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes req = {0};
    if (tDecodeDeleteRes(&dcoder, &req) < 0) {
      goto end;
    }
    realTbSuid = req.suid;
  }

end:
  tDecoderClear(&dcoder);
  return tbSuid == realTbSuid;
}
202

UNCOV
203
/**
 * Scan the WAL forward from *fetchOffset until an entry worth returning to the consumer is found.
 *
 * Submit messages are always returned (their body is fetched). Meta messages are returned only
 * when the handle fetches meta, the message is not a delete on an ONLY_META handle, and
 * isValValidForTable() accepts it. Everything else has its body skipped.
 *
 * @param fetchOffset  in: first version to examine; out: version the scan stopped at.
 * @return the result of fetching the body for a submit message, 0 for an accepted meta message,
 *         a negative value from the WAL calls on failure, or -1 when nothing was found up to the
 *         applied version (also -1 for NULL arguments).
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  SWalReader* pWalReader = pHandle->pWalReader;

  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pWalReader->pHead->head.msgType), reqId, id);

    // data: always handed back to the caller
    if (pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pWalReader);
      break;
    }

    // meta: only interesting when the handle asked for it
    bool wantMeta = false;
    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pHead = &(pWalReader->pHead->head);
      wantMeta = IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META);
    }

    if (wantMeta) {
      code = walFetchBody(pWalReader);
      if (code < 0) {
        break;
      }
      // the head pointer is read again after the body fetch, as the original code does
      if (isValValidForTable(pHandle, &(pWalReader->pHead->head))) {
        code = 0;
        break;
      }
    } else {
      code = walSkipFetchBody(pWalReader);
      if (code < 0) {
        break;
      }
    }

    // entry not relevant: move on to the next version
    offset++;
    code = -1;
  }

  *fetchOffset = offset;
  return code;
}
267

UNCOV
268
/* Return the reader's cached primary-key flag; a NULL reader reports false. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
274

UNCOV
275
/*
 * Look up the schema of table `uid` and cache in the reader whether its second column carries the
 * COL_IS_KEY flag. A missing schema, or one with fewer than two columns, records false.
 * Does nothing when pReader is NULL.
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    hasKey = (pWrapper->pSchema[1].flags & COL_IS_KEY) != 0;
  }

  // the wrapper returned by the lookup is released here, whether or not it is NULL
  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
287

UNCOV
288
/**
 * Allocate a TQ reader bound to a vnode: opens a WAL reader on pVnode->pWal, records the vnode's
 * meta handle and creates an empty result block.
 *
 * @return the new reader, or NULL when pVnode is NULL or any allocation step fails. When creating
 *         the result block fails, everything acquired so far is released and terrno carries the
 *         error code, so callers never receive a reader without a result block.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  if (pVnode == NULL) {
    return NULL;
  }
  STqReader* pReader = taosMemoryCalloc(1, sizeof(STqReader));
  if (pReader == NULL) {
    return NULL;
  }

  pReader->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
  if (pReader->pWalReader == NULL) {
    taosMemoryFree(pReader);
    return NULL;
  }

  pReader->pVnodeMeta = pVnode->pMeta;
  pReader->pColIdList = NULL;
  pReader->cachedSchemaVer = 0;
  pReader->cachedSchemaSuid = 0;
  pReader->pSchemaWrapper = NULL;
  pReader->tbIdHash = NULL;
  pReader->pResBlock = NULL;

  int32_t code = createDataBlock(&pReader->pResBlock);
  if (code) {
    // undo the partial construction; terrno is set last so the cleanup cannot overwrite it
    walCloseReader(pReader->pWalReader);
    taosMemoryFree(pReader);
    terrno = code;
    return NULL;
  }

  return pReader;
}
318

UNCOV
319
/*
 * Release a reader created by tqReaderOpen together with everything it owns: the WAL reader, the
 * cached schema wrapper, the column-id list, the result block, the table-id hash and the decoded
 * submit request. A NULL reader is ignored.
 */
void tqReaderClose(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }

  // optional members are released only when present
  if (pReader->pWalReader != NULL) {
    walCloseReader(pReader->pWalReader);
  }
  if (pReader->pSchemaWrapper != NULL) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  }
  if (pReader->pColIdList != NULL) {
    taosArrayDestroy(pReader->pColIdList);
  }

  // the remaining members are handed to their destructors unconditionally
  blockDataDestroy(pReader->pResBlock);
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
341

UNCOV
342
/*
 * Position the reader's WAL reader at version `ver`.
 * Returns 0 on success, -1 when the WAL seek fails, TSDB_CODE_INVALID_PARA for a NULL reader.
 * `id` is only used in the debug log.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (walReaderSeekVer(pReader->pWalReader, ver) >= 0) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }

  return -1;
}
352

UNCOV
353
/**
 * Pull the next stream input item out of the WAL.
 *
 * Reads valid WAL messages one at a time and converts the first usable one into *pItem:
 *  - TDMT_VND_SUBMIT: the payload behind SSubmitReq2Msg is copied and wrapped in a submit item;
 *  - TDMT_VND_DELETE: converted by tqExtractDelDataBlock();
 *  - TDMT_VND_DROP_TABLE (only when pReader->cond.scanDropCtb is set): converted by
 *    tqExtractDropCtbDataBlock().
 * Delete / drop messages that produce no item are skipped and the scan continues.
 *
 * @param maxVer  scanning stops (successfully, without touching *pItem) once a message version
 *                exceeds this value.
 * @param id      identifier used in log lines only.
 * @return TSDB_CODE_SUCCESS, the error from the WAL / conversion helpers, terrno on allocation
 *         failure, or TSDB_CODE_STREAM_INTERNAL_ERROR for an unexpected message type.
 */
int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) {
  int32_t code = 0;

  for (;;) {
    TAOS_CHECK_RETURN(walNextValidMsg(pReader));

    SWalCont* pCont = &pReader->pHead->head;
    int64_t   ver = pCont->version;
    if (ver > maxVer) {
      tqDebug("maxVer in WAL:%" PRId64 " reached, current:%" PRId64 ", do not scan wal anymore, %s", maxVer, ver, id);
      return TSDB_CODE_SUCCESS;
    }

    if (pCont->msgType == TDMT_VND_SUBMIT) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SSubmitReq2Msg));
      int32_t len = pCont->bodyLen - sizeof(SSubmitReq2Msg);

      void* data = taosMemoryMalloc(len);
      if (data == NULL) {
        // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then
        // retry
        tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0);
        return terrno;
      }
      (void)memcpy(data, pBody, len);

      SPackedData data1 = (SPackedData){.ver = ver, .msgLen = len, .msgStr = data};
      code = streamDataSubmitNew(&data1, STREAM_INPUT__DATA_SUBMIT, (SStreamDataSubmit**)pItem);
      if (code != 0) {
        tqError("%s failed to create data submit for stream since out of memory", id);
      }
      return code;
    }

    if (pCont->msgType == TDMT_VND_DELETE) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);

      code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, STREAM_DELETE_DATA);
      if (code != TSDB_CODE_SUCCESS) {
        terrno = code;
        tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
        return code;
      }
      if (*pItem == NULL) {
        tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);
        // nothing produced: keep checking the following entries in the wal files
        continue;
      }
      tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%" PRId64, id, len, ver);
      return code;
    }

    if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
      void*   pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
      int32_t len = pCont->bodyLen - sizeof(SMsgHead);

      code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0);
      if (TSDB_CODE_SUCCESS != code) {
        terrno = code;
        return code;
      }
      if (!*pItem) {
        continue;
      }
      tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
      return code;
    }

    tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }
}
427

UNCOV
428
/**
 * Advance the reader to the next submit block of interest, pulling further messages from the WAL
 * when the current submit request is exhausted.
 *
 * A block is skipped when its flags intersect `sourceExcluded`, or when the reader has a table-id
 * hash that does not contain the block's uid. For an accepted block tqRetrieveDataBlock() is
 * called; success returns true, failure moves on to the following block.
 *
 * @param id  unused by this function's body.
 * @return true when a block was retrieved into the reader's result block; false on NULL reader,
 *         array access failure, no further valid WAL message, decode failure, or once the scan
 *         has taken more than 1000 ms (or the clock went backwards).
 */
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
  if (pReader == NULL) {
    return false;
  }

  SWalReader* pWalReader = pReader->pWalReader;
  int64_t     st = taosGetTimestampMs();

  for (;;) {
    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
    while (pReader->nextBlk < numOfBlocks) {
      tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pSubmitTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return false;
      }

      if ((pSubmitTbData->flags & sourceExcluded) != 0) {
        pReader->nextBlk += 1;
        continue;
      }

      bool wanted =
          (pReader->tbIdHash == NULL) || (taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL);
      if (!wanted) {
        pReader->nextBlk += 1;
        tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
        continue;
      }

      tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
      SSDataBlock* pRes = NULL;
      // tqRetrieveDataBlock advances nextBlk itself, so a failure simply tries the next block
      if (tqRetrieveDataBlock(pReader, &pRes, NULL) == TSDB_CODE_SUCCESS) {
        return true;
      }
    }

    // current submit request fully consumed
    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    int64_t elapsed = taosGetTimestampMs() - st;
    if (elapsed > 1000 || elapsed < 0) {
      return false;
    }

    // try next message in wal file
    if (walNextValidMsg(pWalReader) < 0) {
      return false;
    }

    void*   pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t ver = pWalReader->pHead->head.version;
    if (tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver) != 0) {
      return false;
    }
    pReader->nextBlk = 0;
  }
}
486

UNCOV
487
/*
 * Attach a raw submit message to the reader and decode it into pReader->submit.
 * The message pointer, length and version are recorded in pReader->msg before decoding.
 * Returns 0 on success, the decoder's error code on failure, TSDB_CODE_INVALID_PARA for a NULL
 * reader.
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;

  tqDebug("tq reader set msg %p %d", msgStr, msgLen);

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit);
  // the decoder is no longer needed whatever the outcome
  tDecoderClear(&decoder);

  if (code != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
    return code;
  }
  return 0;
}
509

UNCOV
510
/* Accessor for the reader's WAL reader; a NULL reader yields NULL. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
516

UNCOV
517
/* Accessor for the reader's result block; a NULL reader yields NULL. */
SSDataBlock* tqGetResultBlock(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pResBlock : NULL;
}
523

UNCOV
524
/* Accessor for the reader's lastTs field; a NULL reader yields 0. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return (pReader != NULL) ? pReader->lastTs : 0;
}
530

UNCOV
531
/**
 * Move pReader->nextBlk to the next submit block whose table uid is in the reader's table-id hash.
 *
 * @param idstr  identifier used in debug logs only.
 * @return true when nextBlk points at a usable block (any block when the reader has no hash).
 *         false when the reader or its message is NULL, an array slot cannot be read, or no
 *         block is left. In the last case the decoded submit request is destroyed and the
 *         reader's nextBlk / msgStr are reset.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  if (pReader == NULL || pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < numOfBlocks; pReader->nextBlk++) {
    tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
            (pReader->nextBlk + 1), numOfBlocks, idstr);

    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pSubmitTbData == NULL) {
      return false;
    }
    if (pReader->tbIdHash == NULL) {
      return true;
    }

    if (taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)) != NULL) {
      tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr);
      return true;
    }

    tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid,
            taosHashGetSize(pReader->tbIdHash), idstr);
  }

  // every block examined: drop the decoded request and reset the cursor
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;

  return false;
}
567

UNCOV
568
/**
 * Move pReader->nextBlk to the next submit block whose table uid is NOT in `filterOutUids`.
 *
 * @return true when nextBlk points at a block to process (any block when filterOutUids is NULL).
 *         false when the reader or its message is NULL, an array slot cannot be read, or all
 *         remaining blocks are filtered out. In the last case the decoded submit request is
 *         destroyed and the reader's nextBlk / msgStr are reset.
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  if (pReader == NULL || pReader->msg.msgStr == NULL) {
    return false;
  }

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    if (pSubmitTbData == NULL) {
      return false;
    }
    if (filterOutUids == NULL) {
      return true;
    }

    // a uid missing from the filter set means the block is kept
    if (taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t)) == NULL) {
      return true;
    }
  }

  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;

  return false;
}
590

UNCOV
591
/**
 * Build a reduced schema and matching block layout from the columns selected by `mask`.
 *
 * mask[i] is read for every column i of pSrc; the sum of the mask entries becomes pDst->nCols and
 * the size of the freshly allocated pDst->pSchema. Each column with a non-zero mask entry is
 * copied into pDst->pSchema and appended to pBlock as a column descriptor.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, the terrno-derived code when
 *         the schema array cannot be allocated, or the error from blockDataAppendColInfo().
 */
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // first pass: how many columns survive the mask
  int32_t numSelected = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    numSelected += mask[i];
  }

  pDst->nCols = numSelected;
  pDst->pSchema = taosMemoryCalloc(numSelected, sizeof(SSchema));
  if (pDst->pSchema == NULL) {
    return TAOS_GET_TERRNO(terrno);
  }

  // second pass: copy the selected columns and describe them in the block
  int32_t dstIdx = 0;
  for (int32_t i = 0; i < pSrc->nCols; i++) {
    if (!mask[i]) {
      continue;
    }

    const SSchema* pCol = &pSrc->pSchema[i];
    pDst->pSchema[dstIdx] = *pCol;
    dstIdx++;

    SColumnInfoData colInfo = createColumnInfoData(pCol->type, pCol->bytes, pCol->colId);
    int32_t         code = blockDataAppendColInfo(pBlock, &colInfo);
    if (code != 0) {
      return code;
    }
  }

  return 0;
}
622

UNCOV
623
/**
 * (Re)create the column layout of the reader's result block for a given schema.
 *
 * If the current result block already has columns it is destroyed and replaced by a fresh one,
 * stamped with the reader's cached uid and message version. Columns are then appended:
 *  - empty pColIdList: every column of pSchema;
 *  - otherwise: the columns whose ids appear in both pSchema and pColIdList, found with a
 *    two-cursor walk that advances whichever side has the smaller id (this presumes both are
 *    sorted by column id - TODO confirm with callers).
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL arguments, the createDataBlock()
 *         error, terrno (all-columns path) or -1 (selected-columns path) when appending fails.
 */
static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, const SArray* pColIdList) {
  if (pReader == NULL || pSchema == NULL || pColIdList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSDataBlock* pBlock = pReader->pResBlock;
  if (blockDataGetNumOfCols(pBlock) > 0) {
    // the old layout cannot be reused: start over with an empty block
    blockDataDestroy(pBlock);
    int32_t code = createDataBlock(&pReader->pResBlock);
    if (code) {
      return code;
    }
    pBlock = pReader->pResBlock;

    pBlock->info.id.uid = pReader->cachedSchemaUid;
    pBlock->info.version = pReader->msg.ver;
  }

  int32_t numOfCols = taosArrayGetSize(pColIdList);

  if (numOfCols == 0) {  // all columns are required
    for (int32_t i = 0; i < pSchema->nCols; ++i) {
      SSchema*        pColSchema = &pSchema->pSchema[i];
      SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);

      if (blockDataAppendColInfo(pBlock, &colInfo) != TSDB_CODE_SUCCESS) {
        blockDataFreeRes(pBlock);
        return terrno;
      }
    }
    return TSDB_CODE_SUCCESS;
  }

  if (numOfCols > pSchema->nCols) {
    numOfCols = pSchema->nCols;
  }

  int32_t schemaIdx = 0;
  int32_t needIdx = 0;
  while (schemaIdx < pSchema->nCols && needIdx < numOfCols) {
    SSchema* pColSchema = &pSchema->pSchema[schemaIdx];
    col_id_t colIdSchema = pColSchema->colId;

    col_id_t* pColIdNeed = (col_id_t*)taosArrayGet(pColIdList, needIdx);
    if (pColIdNeed == NULL) {
      break;
    }

    if (colIdSchema < *pColIdNeed) {
      schemaIdx++;
      continue;
    }
    if (colIdSchema > *pColIdNeed) {
      needIdx++;
      continue;
    }

    // ids match: this column is part of the result
    SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
    if (blockDataAppendColInfo(pBlock, &colInfo) != TSDB_CODE_SUCCESS) {
      return -1;
    }
    schemaIdx++;
    needIdx++;
  }

  return TSDB_CODE_SUCCESS;
}
686

UNCOV
687
/**
 * Store one decoded column value into row `rowIndex` of a result column.
 *
 * Variable-length values are first wrapped in a var-data buffer (length header followed by the
 * payload) on the stack and then handed to colDataSetVal(); a non-value (NULL / NONE) is written
 * with colDataSetNULL(). Fixed-length values are passed through directly, flagged as NULL when
 * the column value is not a value.
 *
 * @return TSDB_CODE_SUCCESS, the error from colDataSetVal(), or TSDB_CODE_INVALID_PARA when a
 *         variable-length payload does not fit into the staging buffer.
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
    char val[65535 + 2] = {0};
    if (COL_VAL_IS_VALUE(pColVal)) {
      // payload room left in the staging buffer after the var-data length header
      size_t capacity = sizeof(val) - (size_t)(varDataVal(val) - val);
      if (pColVal->value.nData > capacity) {
        // refuse instead of overflowing the stack buffer with an oversized value
        return TSDB_CODE_INVALID_PARA;
      }
      if (pColVal->value.pData != NULL) {
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      }
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, (void*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}
707

UNCOV
708
/**
 * Convert the submit block at pReader->nextBlk into the reader's result block and advance nextBlk.
 *
 * Steps:
 *  1. Take the submit-table entry and clean the result block.
 *  2. If the cached schema does not match the entry (different suid, different uid for a table
 *     without suid, or different schema version), reload the schema from the vnode meta and
 *     rebuild the result block's column layout.
 *  3. Size the block for the entry's row count.
 *  4. Copy the values: column-format entries are merged column by column, row-format entries
 *     row by row; in both cases source and target are walked in column-id order and target
 *     columns without a source counterpart are filled with NULL.
 *
 * @param pRes  receives the reader's result block (also on most failure paths).
 * @param id    not used by this function's body.
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments;
 *         TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the schema cannot be loaded;
 *         TSDB_CODE_TQ_INTERNAL_ERROR on a schema version mismatch; otherwise the first error
 *         reported by a helper.
 */
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
  if (pReader == NULL || pRes == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
  int32_t        code = 0;
  int32_t        line = 0;
  STSchema*      pTSchema = NULL;
  // nextBlk is advanced here, so callers move on to the following block even when this one fails
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  SSDataBlock* pBlock = pReader->pResBlock;
  *pRes = pBlock;

  blockDataCleanup(pBlock);

  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastTs = pSubmitTbData->ctimeMs;

  pBlock->info.id.uid = uid;
  pBlock->info.version = pReader->msg.ver;

  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);

    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL);
    if (pReader->pSchemaWrapper == NULL) {
      // report the version that was actually requested
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64
             "version %d, possibly dropped table",
             vgId, suid, uid, sversion);
      // The wrapper is gone, so invalidate both cache keys: otherwise a later block of a table
      // without suid that matches the stale uid/version would skip the reload and run with a
      // NULL schema wrapper. (Assumes 0 is never a real table uid - TODO confirm.)
      pReader->cachedSchemaSuid = 0;
      pReader->cachedSchemaUid = 0;
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }

    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;

    if (pReader->cachedSchemaVer != pReader->pSchemaWrapper->version) {
      tqError("vgId:%d, schema version mismatch, suid:%" PRId64 ", uid:%" PRId64 ", version:%d, cached version:%d",
              vgId, suid, uid, sversion, pReader->pSchemaWrapper->version);
      return TSDB_CODE_TQ_INTERNAL_ERROR;
    }
    code = buildResSDataBlock(pReader, pReader->pSchemaWrapper, pReader->pColIdList);
    TSDB_CHECK_CODE(code, line, END);
    // buildResSDataBlock may have replaced the result block
    pBlock = pReader->pResBlock;
    *pRes = pBlock;
  }

  int32_t numOfRows = 0;
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    // column format: the row count is taken from the first column
    SColData* pCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
    numOfRows = pCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, numOfRows);
  TSDB_CHECK_CODE(code, line, END);
  pBlock->info.rows = numOfRows;
  int32_t colActual = blockDataGetNumOfCols(pBlock);

  // convert and scan one block
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    SArray* pCols = pSubmitTbData->aCol;
    int32_t numOfCols = taosArrayGetSize(pCols);
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TSDB_CHECK_NULL(pColData, code, line, END, terrno);
      if (sourceIdx >= numOfCols) {
        // source exhausted: the remaining target columns are all NULL
        tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
        continue;
      }

      SColData* pCol = taosArrayGet(pCols, sourceIdx);
      TSDB_CHECK_NULL(pCol, code, line, END, terrno);
      SColVal colVal = {0};
      tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual,
              sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
      if (pCol->cid < pColData->info.colId) {
        // source column not requested: skip it
        sourceIdx++;
      } else if (pCol->cid == pColData->info.colId) {
        for (int32_t i = 0; i < pCol->nVal; i++) {
          tColDataGetValue(pCol, i, &colVal);
          code = doSetVal(pColData, i, &colVal);
          TSDB_CHECK_CODE(code, line, END);
        }
        sourceIdx++;
        targetIdx++;
      } else {
        // requested column absent from the source: fill with NULL
        colDataSetNNULL(pColData, 0, numOfRows);
        targetIdx++;
      }
    }
  } else {
    SArray*         pRows = pSubmitTbData->aRowP;
    SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
    pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
    TSDB_CHECK_NULL(pTSchema, code, line, END, terrno);

    for (int32_t i = 0; i < numOfRows; i++) {
      SRow* pRow = taosArrayGetP(pRows, i);
      TSDB_CHECK_NULL(pRow, code, line, END, terrno);
      int32_t sourceIdx = 0;
      for (int32_t j = 0; j < colActual; j++) {
        SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
        TSDB_CHECK_NULL(pColData, code, line, END, terrno);

        // advance through the row's columns until the target column id is reached or passed
        while (1) {
          SColVal colVal = {0};
          code = tRowGet(pRow, pTSchema, sourceIdx, &colVal);
          TSDB_CHECK_CODE(code, line, END);

          if (colVal.cid < pColData->info.colId) {
            sourceIdx++;
            continue;
          } else if (colVal.cid == pColData->info.colId) {
            code = doSetVal(pColData, i, &colVal);
            TSDB_CHECK_CODE(code, line, END);
            sourceIdx++;
            break;
          } else {
            colDataSetNULL(pColData, i);
            break;
          }
        }
      }
    }
  }

END:
  if (code != 0) {
    tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code);
  }
  taosMemoryFreeClear(pTSchema);
  return code;
}
853

854
// Track, per source column, whether the current row carries a value.
// Expands in a scope that provides `colVal`, `curRow`, `assigned`, `j` and `buildNew`.
// `buildNew` is raised on the very first row and whenever the "has a value" state of
// column `j` differs from the one recorded in `assigned[j]`; `assigned[j]` is refreshed
// in both cases.
#define PROCESS_VAL                                    \
  {                                                    \
    bool hasValue = !COL_VAL_IS_NONE(&colVal);         \
    if (curRow == 0 || hasValue != assigned[j]) {      \
      assigned[j] = hasValue;                          \
      buildNew = true;                                 \
    }                                                  \
  }
865

866
// Copy one source value into the current target column of the block being filled.
// Expands in a scope that provides `colVal`, `pColData`, `sourceIdx`, `targetIdx`,
// `curRow`, `lastRow`, plus the `code` variable and `END` label used by TQ_ERR_GO_TO_END.
//   - source column id below the target id: skip the source column;
//   - ids match: store the value at row (curRow - lastRow) and advance both cursors;
//   - source column id above the target id: the row has no value for the target column,
//     so store NULL and advance the target cursor. Without this branch neither cursor
//     moves and the enclosing `while (targetIdx < colActual)` loop never terminates.
#define SET_DATA                                                     \
  if (colVal.cid < pColData->info.colId) {                           \
    sourceIdx++;                                                     \
  } else if (colVal.cid == pColData->info.colId) {                   \
    TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal)); \
    sourceIdx++;                                                     \
    targetIdx++;                                                     \
  } else {                                                           \
    colDataSetNULL(pColData, curRow - lastRow);                      \
    targetIdx++;                                                     \
  }
874

UNCOV
875
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
×
876
                               SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow,
877
                               int32_t* lastRow) {
UNCOV
878
  if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL || pSchemaWrapper == NULL ||
×
UNCOV
879
      assigned == NULL || lastRow == NULL) {
×
880
    return TSDB_CODE_INVALID_PARA;
×
881
  }
UNCOV
882
  int32_t         code = 0;
×
UNCOV
883
  SSchemaWrapper* pSW = NULL;
×
UNCOV
884
  SSDataBlock*    block = NULL;
×
UNCOV
885
  if (taosArrayGetSize(blocks) > 0) {
×
886
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
×
887
    TQ_NULL_GO_TO_END(pLastBlock);
×
888
    pLastBlock->info.rows = curRow - *lastRow;
×
889
    *lastRow = curRow;
×
890
  }
891

UNCOV
892
  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
×
UNCOV
893
  TQ_NULL_GO_TO_END(block);
×
894

UNCOV
895
  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
×
UNCOV
896
  TQ_NULL_GO_TO_END(pSW);
×
897

UNCOV
898
  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned));
×
UNCOV
899
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
×
900
          (int32_t)taosArrayGetSize(block->pDataBlock));
901

UNCOV
902
  block->info.id.uid = pSubmitTbData->uid;
×
UNCOV
903
  block->info.version = pReader->msg.ver;
×
UNCOV
904
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));
×
UNCOV
905
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
×
UNCOV
906
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
×
UNCOV
907
  pSW = NULL;
×
UNCOV
908
  taosMemoryFreeClear(block);
×
909

UNCOV
910
END:
×
UNCOV
911
  tDeleteSchemaWrapper(pSW);
×
UNCOV
912
  blockDataFreeRes(block);
×
UNCOV
913
  taosMemoryFree(block);
×
UNCOV
914
  return code;
×
915
}
UNCOV
916
/*
 * Convert a column-format submit (pSubmitTbData->aCol) into data blocks appended to
 * `blocks`, with the matching schema wrappers appended to `schemas`.
 *
 * Rows are scanned in order. PROCESS_VAL records in `assigned` which source columns
 * carry a value for the row; whenever that pattern changes (or on the first row)
 * processBuildNew() starts a new block. SET_DATA then copies the row's values into the
 * last block.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the code set by
 * the failing step.
 */
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  // one flag byte per schema column, zero-initialised
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  // the row count is taken from the first column
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    for (int32_t j = 0; j < numOfCols; j++) {
      pCol = taosArrayGet(pCols, j);
      TQ_NULL_GO_TO_END(pCol);
      SColVal colVal = {0};
      tColDataGetValue(pCol, i, &colVal);
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      tColDataGetValue(pCol, i, &colVal);
      SET_DATA
    }

    curRow++;
  }

  // finalize the row count of the block still being filled; with zero rows and an empty
  // `blocks` array there is no such block, so the lookup result must be checked
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

END:
  taosMemoryFree(assigned);
  return code;
}
977

UNCOV
978
/*
 * Convert a row-format submit (pSubmitTbData->aRowP) into data blocks appended to
 * `blocks`, with the matching schema wrappers appended to `schemas`.
 *
 * A STSchema is built from the reader's schema wrapper and used to decode every row.
 * PROCESS_VAL records in `assigned` which schema columns carry a value for the row;
 * whenever that pattern changes (or on the first row) processBuildNew() starts a new
 * block. SET_DATA then copies the row's values into the last block.
 *
 * Returns 0 on success, TSDB_CODE_INVALID_PARA for NULL arguments, or the code set by
 * the failing step.
 */
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  // one flag byte per schema column, zero-initialised
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pRows = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRows);
  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  // pTSchema is dereferenced in the loops below, so a failed build must stop here
  TQ_NULL_GO_TO_END(pTSchema);

  for (int32_t i = 0; i < numOfRows; i++) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(pRows, i);
    TQ_NULL_GO_TO_END(pRow);

    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows,
                                       curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal          colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
    }

    curRow++;
  }

  // finalize the row count of the block still being filled; with zero rows and an empty
  // `blocks` array there is no such block, so the lookup result must be checked
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

END:
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1037

UNCOV
1038
/*
 * Fetch the next table entry of the current submit message and convert it into data
 * blocks.
 *
 * Reads entry `pReader->nextBlk` from `pReader->submit.aSubmitTbData`, advances
 * `nextBlk`, optionally reports the entry through `pSubmitTbDataRet`, reloads
 * `pReader->pSchemaWrapper` for the entry's uid / schema version through
 * metaGetTableSchema() (which also receives `createTime`), then dispatches to the
 * column- or row-format converter depending on SUBMIT_REQ_COLUMN_DATA_FORMAT.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, `terrno` when the entry cannot be
 * read, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when no schema is available, otherwise the
 * converter's result.
 */
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet,
                             int64_t* createTime) {
  if (pReader == NULL || blocks == NULL || schemas == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  if (pSubmitTbData == NULL) {
    return terrno;
  }
  pReader->nextBlk++;

  if (pSubmitTbDataRet) {
    *pSubmitTbDataRet = pSubmitTbData;
  }

  int32_t sversion = pSubmitTbData->sver;
  int64_t uid = pSubmitTbData->uid;
  pReader->lastBlkUid = uid;

  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime);
  if (pReader->pSchemaWrapper == NULL) {
    // report the uid and schema version that were actually looked up
    tqWarn("vgId:%d, cannot found schema wrapper for table: uid:%" PRId64 ", version %d, possibly dropped table",
           pReader->pWalReader->pWal->cfg.vgId, uid, sversion);
    pReader->cachedSchemaSuid = 0;
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
  } else {
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
  }
}
1072

UNCOV
1073
// Store `pColIdList` in the reader's `pColIdList` field; a NULL reader is ignored.
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) {
  if (pReader != NULL) {
    pReader->pColIdList = pColIdList;
  }
}
1079

UNCOV
1080
/*
 * Replace the reader's table-uid set with the uids in `tbUidList`.
 *
 * An existing `tbIdHash` is cleared, otherwise a new one is created. Each uid is then
 * inserted as a key with no value; a failed insert is logged and skipped. Nothing
 * happens when `pReader` or `tbUidList` is NULL.
 *
 * `id` is only used as a log label. It may be NULL (tqUpdateTbUidList passes NULL), so
 * a placeholder is substituted instead of handing NULL to a `%s` conversion.
 */
void tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }
  const char* idStr = (id != NULL) ? id : "(null)";

  if (pReader->tbIdHash) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", idStr);
      return;
    }
  }

  int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", idStr, *pKey);
      continue;
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", idStr, numOfTables);
}
1104

UNCOV
1105
/*
 * Add the uids in `pTableUidList` to the reader's table-uid set, creating `tbIdHash`
 * first when it does not exist yet.
 *
 * Each uid is inserted as a key with no value. An entry that cannot be read from the
 * list is skipped; a failed insert is logged and skipped. Nothing happens when
 * `pReader` or `pTableUidList` is NULL.
 */
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    // same guard as tqReaderSetTbUidList: never insert or dereference a NULL key
    if (pKey && taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
      continue;
    }
  }
}
1126

UNCOV
1127
// Report whether `uid` is present as a key in the reader's `tbIdHash`.
// A NULL reader yields false.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  if (pReader != NULL) {
    void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
    return pEntry != NULL;
  }
  return false;
}
1133

UNCOV
1134
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
UNCOV
1135
  if (pReader == NULL) {
×
1136
    return false;
×
1137
  }
UNCOV
1138
  return pReader->msg.msgStr == NULL;
×
1139
}
1140

UNCOV
1141
// Remove every uid listed in `tbUidList` from the reader's `tbIdHash`.
// Entries that cannot be read from the list are skipped; a failed removal is logged.
// Nothing happens when `pReader` or `tbUidList` is NULL.
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    idx++;
    if (pUid == NULL) {
      continue;
    }
    if (taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t)) != 0) {
      tqError("failed to remove table uid:%" PRId64 " from hash", *pUid);
    }
  }
}
1152

1153
/*
 * Propagate a table-uid change (`isAdd` selects add or remove) to every consumer handle
 * and every source-level stream task of this tq.
 *
 * Phase 1 runs under the write latch `pTq->lock` and walks `pTq->pHandle`:
 *   - column subscriptions forward the list to qUpdateTableListForStreamScanner();
 *   - db subscriptions, on removal only, record the uids in `pFilterOutTbUid`;
 *   - table subscriptions, on add, rebuild the reader's uid set from qGetTableList();
 *     on removal they drop the uids from the reader.
 * Phase 2 runs under the stream-meta write lock and forwards the list to the executor
 * of each task whose level is TASK_LEVEL__SOURCE.
 *
 * Returns TSDB_CODE_INVALID_PARA for NULL arguments, the qGetTableList() error if that
 * call fails (after cancelling the iteration and releasing the latch), otherwise 0.
 * All other failures are only logged.
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
  if (pTq == NULL || tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t vgId = TD_VID(pTq->pVnode);
  void*   pIter = NULL;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pTqHandle = (STqHandle*)pIter;

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd);
      if (code != 0) {
        tqError("update qualified table error for %s", pTqHandle->subKey);
      }
      continue;
    }

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      if (isAdd) {
        continue;
      }
      int32_t numOfUids = taosArrayGetSize(tbUidList);
      for (int32_t k = 0; k < numOfUids; k++) {
        int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, k);
        if (tbUid == NULL) {
          continue;
        }
        if (taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
        }
      }
      continue;
    }

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      if (!isAdd) {
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
        continue;
      }

      SArray* list = NULL;
      int     ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node,
                                  &list, pTqHandle->execHandle.task);
      if (ret != TDB_CODE_SUCCESS) {
        tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey,
                pTqHandle->consumerId);
        // leaving mid-iteration: free the list, cancel the iterator, release the latch
        taosArrayDestroy(list);
        taosHashCancelIterate(pTq->pHandle, pIter);
        taosWUnLockLatch(&pTq->lock);

        return ret;
      }
      tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
      taosArrayDestroy(list);
    }
  }
  taosWUnLockLatch(&pTq->lock);

  // update the table list handle for each stream scanner/wal reader
  streamMetaWLock(pTq->pStreamMeta);
  while ((pIter = taosHashIterate(pTq->pStreamMeta->pTasksMap, pIter)) != NULL) {
    int64_t      refId = *(int64_t*)pIter;
    SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
    if (pTask == NULL) {
      continue;
    }

    int32_t taskId = pTask->id.taskId;
    if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
      if (qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd) != 0) {
        tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
      }
    }

    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
    }
  }

  streamMetaWUnLock(pTq->pStreamMeta);
  return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc