• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4984

13 Mar 2026 03:38AM UTC coverage: 68.643% (-0.01%) from 68.653%
#4984

push

travis-ci

web-flow
feat/6641435300-save-audit-in-self (#34738)

434 of 584 new or added lines in 10 files covered. (74.32%)

3048 existing lines in 150 files now uncovered.

212713 of 309883 relevant lines covered (68.64%)

135561814.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "taoserror.h"
17
#include "tarray.h"
18
#include "tdef.h"
19
#include "thash.h"
20
#include "tmsg.h"
21
#include "tpriv.h"
22
#include "tq.h"
23

24
static int32_t tqRetrieveCols(STqReader *pReader, SSDataBlock *pRes, SHashObj* pCol2SlotId);
25

26
static void processCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
4,495✔
27
  int32_t code = 0;
4,495✔
28
  int32_t lino = 0;
4,495✔
29
  int32_t        needRebuild = 0;
4,495✔
30
  SVCreateTbReq* pCreateReq = NULL;
4,495✔
31
  SVCreateTbBatchReq reqNew = {0};
4,495✔
32
  void* buf = NULL;
4,495✔
33
  SVCreateTbBatchReq req = {0};
4,495✔
34
  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
4,495✔
35
  if (code < 0) {
4,495✔
36
    lino = __LINE__;
×
37
    goto end;
×
38
  }
39

40
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
9,312✔
41
    pCreateReq = req.pReqs + iReq;
4,817✔
42
    if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) && 
4,817✔
43
         pCreateReq->ctb.suid == tbSuid &&
4,716✔
44
         taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {  
2,358✔
45
      needRebuild++;
1,713✔
46
    }
47
  }
48
  if (needRebuild == 0) {
4,495✔
49
    // do nothing
50
  } else if (needRebuild == req.nReqs) {
1,713✔
51
    *realTbSuid = tbSuid;
1,391✔
52
  } else {
53
    *realTbSuid = tbSuid;
322✔
54
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
322✔
55
    if (reqNew.pArray == NULL) {
322✔
56
      code = terrno;
×
57
      lino = __LINE__;
×
58
      goto end;
×
59
    }
60
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
966✔
61
      pCreateReq = req.pReqs + iReq;
644✔
62
      if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) &&
644✔
63
          pCreateReq->ctb.suid == tbSuid &&
1,288✔
64
          taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
644✔
65
        reqNew.nReqs++;
322✔
66
        if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
644✔
67
          code = terrno;
×
68
          lino = __LINE__;
×
69
          goto end;
×
70
        }
71
      }
72
    }
73

74
    int     tlen = 0;
322✔
75
    tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
322✔
76
    buf = taosMemoryMalloc(tlen);
322✔
77
    if (NULL == buf || code < 0) {
322✔
78
      lino = __LINE__;
×
79
      goto end;
×
80
    }
81
    SEncoder coderNew = {0};
322✔
82
    tEncoderInit(&coderNew, buf, tlen);
322✔
83
    code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
322✔
84
    tEncoderClear(&coderNew);
322✔
85
    if (code < 0) {
322✔
86
      lino = __LINE__;
×
87
      goto end;
×
88
    }
89
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
322✔
90
    pHead->bodyLen = tlen + sizeof(SMsgHead);
322✔
91
  }
92

93
end:
4,495✔
94
  taosMemoryFree(buf);
4,495✔
95
  taosArrayDestroy(reqNew.pArray);
4,495✔
96
  tDeleteSVCreateTbBatchReq(&req);
4,495✔
97
  if (code < 0) {
4,495✔
98
    tqError("processCreateTbMsg failed, code:%d, line:%d", code, lino);
×
99
  }
100
}
4,495✔
101

102
/*
 * Decodes an alter-table request, looks the table up by name in the vnode
 * meta and, when its uid is present in pReader->tbIdHash, stores the entry's
 * ctbEntry.suid in *realTbSuid. Failures are logged and otherwise ignored.
 */
static void processAlterTbMsg(SDecoder* dcoder, STqReader* pReader, int64_t* realTbSuid) {
  SMetaReader  metaReader = {0};
  SVAlterTbReq alterReq = {0};
  int32_t      failLine = 0;
  int32_t      rc = tDecodeSVAlterTbReq(dcoder, &alterReq);

  if (rc < 0) {
    failLine = __LINE__;
  } else {
    metaReaderDoInit(&metaReader, pReader->pVnode->pMeta, META_READER_LOCK);

    rc = metaGetTableEntryByName(&metaReader, alterReq.tbName);
    if (rc < 0) {
      failLine = __LINE__;
    } else if (taosHashGet(pReader->tbIdHash, &metaReader.me.uid, sizeof(int64_t)) != NULL) {
      *realTbSuid = metaReader.me.ctbEntry.suid;
    }
  }

  // Cleanup runs on every path, including the early decode failure.
  taosArrayDestroy(alterReq.pMultiTag);
  metaReaderClear(&metaReader);
  if (rc < 0) {
    tqError("processAlterTbMsg failed, code:%d, line:%d", rc, failLine);
  }
}
3,105✔
130

131
static void processDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
×
132
  SVDropTbBatchReq req = {0};
×
133
  SVDropTbBatchReq reqNew = {0};
×
134
  void* buf = NULL;
×
135
  int32_t lino = 0;
×
136
  int32_t code = tDecodeSVDropTbBatchReq(dcoder, &req);
×
137
  if (code < 0) {
×
138
    lino = __LINE__;
×
139
    goto end;
×
140
  }
141

142
  int32_t      needRebuild = 0;
×
143
  SVDropTbReq* pDropReq = NULL;
×
144
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
145
    pDropReq = req.pReqs + iReq;
×
146

147
    if (pDropReq->suid == tbSuid &&
×
148
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
×
149
      needRebuild++;
×
150
    }
151
  }
152
  if (needRebuild == 0) {
×
153
    // do nothing
154
  } else if (needRebuild == req.nReqs) {
×
155
    *realTbSuid = tbSuid;
×
156
  } else {
157
    *realTbSuid = tbSuid;
×
158
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
×
159
    if (reqNew.pArray == NULL) {
×
160
      code = terrno;
×
161
      lino = __LINE__;
×
162
      goto end;
×
163
    }
164
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
×
165
      pDropReq = req.pReqs + iReq;
×
166
      if (pDropReq->suid == tbSuid &&
×
167
          taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
×
168
        reqNew.nReqs++;
×
169
        if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
×
170
          code = terrno;
×
171
          lino = __LINE__;
×
172
          goto end;
×
173
        }
174
      }
175
    }
176

177
    int     tlen = 0;
×
178
    tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
×
179
    buf = taosMemoryMalloc(tlen);
×
180
    if (NULL == buf || code < 0) {
×
181
      lino = __LINE__;
×
182
      goto end;
×
183
    }
184
    SEncoder coderNew = {0};
×
185
    tEncoderInit(&coderNew, buf, tlen);
×
186
    code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
×
187
    tEncoderClear(&coderNew);
×
188
    if (code != 0) {
×
189
      lino = __LINE__;
×
190
      goto end;
×
191
    }
192
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
×
193
    pHead->bodyLen = tlen + sizeof(SMsgHead);
×
194
  }
195

196
end:
×
197
  taosMemoryFree(buf);
×
198
  taosArrayDestroy(reqNew.pArray);
×
199
  if (code < 0) {
×
200
    tqError("processDropTbMsg failed, code:%d, line:%d", code, lino);
×
201
  }
202
}
×
203

204
/*
 * Decides whether the WAL record `pHead` is relevant to the subscription
 * described by `pHandle`.
 *
 * Returns false when either argument is NULL, true for every subscription
 * whose subType is not TOPIC_SUB_TYPE__TABLE, and otherwise true only when
 * the super table uid derived from the message equals the subscribed suid.
 * A message that fails to decode, or whose type is not handled here, leaves
 * the derived suid at 0. For create/drop-table batches the message body may
 * be rewritten by the helper it is passed to.
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  STqReader* pReader = pHandle->execHandle.pTqReader;
  int16_t    msgType = pHead->msgType;
  int64_t    tbSuid = pHandle->execHandle.execTb.suid;
  int64_t    realTbSuid = 0;

  // The encoded request follows the SMsgHead at the start of the body.
  SDecoder dcoder = {0};
  void*    payload = POINTER_SHIFT(pHead->body, sizeof(SMsgHead));
  int32_t  payloadLen = pHead->bodyLen - sizeof(SMsgHead);
  tDecoderInit(&dcoder, payload, payloadLen);

  if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
    SVCreateStbReq stbReq = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &stbReq) >= 0) {
      realTbSuid = stbReq.suid;
    }
  } else if (msgType == TDMT_VND_DROP_STB) {
    SVDropStbReq dropStbReq = {0};
    if (tDecodeSVDropStbReq(&dcoder, &dropStbReq) >= 0) {
      realTbSuid = dropStbReq.suid;
    }
  } else if (msgType == TDMT_VND_CREATE_TABLE) {
    processCreateTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
  } else if (msgType == TDMT_VND_ALTER_TABLE) {
    processAlterTbMsg(&dcoder, pReader, &realTbSuid);
  } else if (msgType == TDMT_VND_DROP_TABLE) {
    processDropTbMsg(&dcoder, pHead, pReader, &realTbSuid, tbSuid);
  } else if (msgType == TDMT_VND_DELETE) {
    SDeleteRes delRes = {0};
    if (tDecodeDeleteRes(&dcoder, &delRes) >= 0) {
      realTbSuid = delRes.suid;
    }
  }

  tDecoderClear(&dcoder);
  bool tmp = tbSuid == realTbSuid;
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp);
  return tmp;
}
260

261
/*
 * Scans the WAL forward from *fetchOffset up to the applied version looking
 * for the next record to hand to the consumer.
 *
 * A TDMT_VND_SUBMIT record ends the scan with the result of walFetchBody.
 * When pHandle->fetchMeta is not WITH_DATA, a meta message (excluding
 * TDMT_VND_DELETE under ONLY_META) has its body fetched and ends the scan
 * with 0 if isValValidForTable accepts it; otherwise it is skipped. Every
 * other record has its body skipped.
 *
 * @param fetchOffset  in: first version to inspect; out: version at which the
 *                     scan stopped
 * @return 0 (or the walFetchBody result) when a record was selected, a
 *         negative value otherwise; -1 on NULL arguments or when nothing
 *         was found
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }
  int32_t code = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t id = pHandle->pWalReader->readerId;

  int64_t offset = *fetchOffset;
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, offset, lastVer, committedVer, appliedVer, id);

  while (offset <= appliedVer) {
    if (walFetchHead(pHandle->pWalReader, offset) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);

    // Data record: fetch it and stop, whatever the fetch result is.
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      code = walFetchBody(pHandle->pWalReader);
      break;
    }

    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pCont = &(pHandle->pWalReader->pHead->head);
      if (IS_META_MSG(pCont->msgType) && !(pCont->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
        code = walFetchBody(pHandle->pWalReader);
        if (code < 0) {
          break;
        }

        // Re-read the head pointer after the body has been fetched.
        pCont = &(pHandle->pWalReader->pHead->head);
        if (isValValidForTable(pHandle, pCont)) {
          code = 0;
          break;
        }
        offset++;
        code = -1;
        continue;
      }
    }

    // Not interesting for this consumer: skip the body and move on.
    code = walSkipFetchBody(pHandle->pWalReader);
    if (code < 0) {
      break;
    }
    offset++;
    code = -1;
  }

  *fetchOffset = offset;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, code, offset, lastVer, committedVer, appliedVer, id);
  return code;
}
328

329
/* Returns the reader's hasPrimaryKey flag; false when `pReader` is NULL. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
335

336
/*
 * Loads the schema of table `uid` from the vnode meta and records in
 * pReader->hasPrimaryKey whether its second column carries COL_IS_KEY.
 * The flag becomes false when the schema cannot be loaded or has fewer than
 * two columns. Does nothing (besides logging) when `pReader` is NULL.
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);

  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, -1, 1, NULL, 0);
  bool            hasKey = false;
  if (pWrapper != NULL && pWrapper->nCols >= 2) {
    if (pWrapper->pSchema[1].flags & COL_IS_KEY) {
      hasKey = true;
    }
  }
  tDeleteSchemaWrapper(pWrapper);
  pReader->hasPrimaryKey = hasKey;
}
350

351
static void freeTagCache(void* pData){
1,679,480✔
352
  if (pData == NULL) return;
1,679,480✔
353
  SArray* tagCache = *(SArray**)pData;
1,679,480✔
354
  taosArrayDestroyP(tagCache, taosMemFree);
1,679,480✔
355
}
356

357
/*
 * Allocates a STqReader bound to `pVnode`: opens a WAL reader on the vnode's
 * WAL, creates the per-table tag cache hash (with freeTagCache as its value
 * destructor) and initialises the tag cache latch.
 *
 * @return the new reader, or NULL when `pVnode` is NULL or any allocation
 *         step fails (partially created resources are released).
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }

  STqReader* pNew = taosMemoryCalloc(1, sizeof(STqReader));
  if (pNew == NULL) {
    return NULL;
  }

  pNew->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pNew->pWalReader == NULL) {
    taosMemoryFree(pNew);
    return NULL;
  }

  pNew->pVnode = pVnode;
  pNew->pSchemaWrapper = NULL;
  pNew->tbIdHash = NULL;

  pNew->pTableTagCacheForTmq =
      taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (pNew->pTableTagCacheForTmq == NULL) {
    walCloseReader(pNew->pWalReader);
    taosMemoryFree(pNew);
    return NULL;
  }
  taosHashSetFreeFp(pNew->pTableTagCacheForTmq, freeTagCache);
  taosInitRWLatch(&pNew->tagCachelock);

  return pNew;
}
387

388
/*
 * Releases everything owned by `pReader` (WAL reader, tag cache, schema
 * wrapper, schemas, table id hash, decoded submit request) and then the
 * reader itself. A NULL reader is only logged.
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader == NULL) {
    return;
  }

  // WAL reader and the caches/schemas hanging off the reader
  walCloseReader(pReader->pWalReader);
  taosHashCleanup(pReader->pTableTagCacheForTmq);
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  taosMemoryFree(pReader->pTSchema);
  taosMemoryFree(pReader->extSchema);

  // table id filter and any submit request still decoded
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
405

406
/*
 * Positions the reader's WAL reader at version `ver`.
 *
 * @param id  identifier string used only in the debug log
 * @return 0 on success, TSDB_CODE_INVALID_PARA for a NULL reader, or terrno
 *         when walReaderSeekVer reports a failure
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t seekRc = walReaderSeekVer(pReader->pWalReader, ver);
  if (seekRc < 0) {
    return terrno;
  }

  tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
  return 0;
}
416

417
/*
 * Makes sure the tag cache hash of `pReader` has an entry for table `uid`;
 * when it is missing, cacheTag is invoked to build it. A failure from
 * cacheTag is logged and returned.
 */
static int32_t getTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) == NULL) {
    // Cache miss: build the entry through the storage API.
    SStorageAPI storageApi = {0};
    initStorageAPI(&storageApi);
    code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storageApi, uid, 0,
                    &pReader->tagCachelock);
    if (code != TSDB_CODE_SUCCESS) {
      lino = __LINE__;
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }

  return code;
}
436

437
/*
 * Refreshes the cached tags of table `uid` by calling cacheTag with `colId`,
 * but only when the table already has an entry in the reader's tag cache;
 * tables that are not cached are left alone. Failures are logged.
 */
void tqUpdateTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid, col_id_t colId) {
  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) == NULL) {
    return;
  }

  SStorageAPI storageApi = {0};
  initStorageAPI(&storageApi);

  int32_t lino = 0;
  int32_t code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storageApi, uid,
                          colId, &pReader->tagCachelock);
  if (code != TSDB_CODE_SUCCESS) {
    lino = __LINE__;
    tqError("%s failed at %d, failed to update tag cache code:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }
}
456

457
/*
 * Fills the pseudo (tag) columns of `pBlock` for table `uid`: ensures the
 * table's tags are cached, then calls fillTag starting at row `numOfRows`
 * for `pBlock->info.rows - numOfRows` rows.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for NULL reader/block,
 *         or the first failing step's code (which is also logged)
 */
static int32_t tqRetrievePseudoCols(STqReader* pReader, SSDataBlock* pBlock, int32_t numOfRows, int64_t uid, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = getTableTagCache(pReader, pPseudoExpr, numOfPseudoExpr, uid);
  if (code != TSDB_CODE_SUCCESS) {
    lino = __LINE__;
  } else {
    code = fillTag(pReader->pTableTagCacheForTmq, pPseudoExpr, numOfPseudoExpr, uid, pBlock, numOfRows,
                   pBlock->info.rows - numOfRows, 1, &pReader->tagCachelock);
    if (code != TSDB_CODE_SUCCESS) {
      lino = __LINE__;
    }
  }

  if (code != 0) {
    tqError("tqRetrievePseudoCols failed, line:%d, msg:%s", lino, tstrerror(code));
  }
  return code;
}
476

477
/*
 * Pulls submit messages from the reader's WAL and appends their rows to
 * `pRes` until one of the following happens:
 *   - walNextValidMsg returns non-zero (its code is returned),
 *   - pRes holds at least `minPollRows` rows, or any row when `enableReplay`
 *     is set,
 *   - more than `timeout` ms elapsed since entry (or the clock went
 *     backwards): TSDB_CODE_TMQ_FETCH_TIMEOUT is returned and stored in
 *     terrno,
 *   - decoding or column retrieval fails (that code is returned at once).
 *
 * Table blocks whose flags intersect `sourceExcluded` are skipped, as are
 * tables missing from pReader->tbIdHash when that hash is set.
 */
int32_t tqNextBlockInWal(STqReader* pReader, SSDataBlock* pRes, SHashObj* pCol2SlotId, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                         int sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
  int32_t code = 0;
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SWalReader* pWalReader = pReader->pWalReader;

  int64_t startMs = taosGetTimestampMs();
  for (;;) {
    code = walNextValidMsg(pWalReader, false);
    if (code != 0) {
      break;
    }

    // The encoded submit request follows the SSubmitReq2Msg header.
    void*    pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t  bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t  ver = pWalReader->pHead->head.version;
    SDecoder decoder = {0};
    code = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
    tDecoderClear(&decoder);
    if (code != 0) {
      return code;
    }
    pReader->nextBlk = 0;

    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
    for (; pReader->nextBlk < numOfBlocks; pReader->nextBlk += 1) {
      tqDebug("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return terrno;
      }
      if ((pTbData->flags & sourceExcluded) != 0) {
        continue;
      }
      if (pReader->tbIdHash != NULL && taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) == NULL) {
        continue;  // table is not part of the subscription
      }

      tqDebug("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      int32_t rowsBefore = pRes->info.rows;
      code = tqRetrieveCols(pReader, pRes, pCol2SlotId);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      code = tqRetrievePseudoCols(pReader, pRes, rowsBefore, pTbData->uid, pPseudoExpr, numOfPseudoExpr);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    if (pRes->info.rows >= minPollRows || (enableReplay && pRes->info.rows > 0)) {
      break;
    }
    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > timeout || elapsed < 0) {
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
      terrno = code;
      break;
    }
  }
  return code;
}
549

550
/*
 * Records the raw submit message (pointer, length, version) in the reader
 * and decodes it into pReader->submit using the caller-provided `decoder`.
 * The decoder is initialised here; clearing it is left to the caller.
 *
 * @return 0 on success, TSDB_CODE_INVALID_PARA for a NULL reader, otherwise
 *         the tDecodeSubmitReq error (also logged)
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;

  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t decodeRc = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
  if (decodeRc != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }

  return decodeRc;
}
569

570
/*
 * Destroys the decoded submit request held by the reader and resets the
 * block cursor and raw message pointer.
 */
void tqReaderClearSubmitMsg(STqReader* pReader) {
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->msg.msgStr = NULL;
  pReader->nextBlk = 0;
}
91,342,544✔
575

576
/* Returns the WAL reader owned by `pReader`, or NULL when `pReader` is NULL. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
582

583
/* Returns pReader->lastTs, or 0 when `pReader` is NULL. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  return (pReader != NULL) ? pReader->lastTs : 0;
}
589

590
/*
 * Advances pReader->nextBlk to the next table block of the current submit
 * message whose uid is present in pReader->tbIdHash.
 *
 * @return true  when such a block is found (nextBlk points at it), or when
 *               tbIdHash is NULL;
 *         false when the reader or its current message is NULL, a block
 *               cannot be fetched, or the message is exhausted (in which
 *               case the submit message is cleared).
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int64_t uid = 0;
  int32_t lino = 0;
  int32_t code = false;
  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);
    uid = pTbData->uid;

    // A hit in the table id hash ends the search with `true`.
    void* pHit = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(pHit == NULL, code, lino, END, true);

    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
618

619
/*
 * Advances pReader->nextBlk to the next table block of the current submit
 * message whose uid is NOT present in `filterOutUids`.
 *
 * @return true  when such a block is found (nextBlk points at it), or when
 *               `filterOutUids` is NULL;
 *         false when the reader or its current message is NULL, a block
 *               cannot be fetched, or the message is exhausted (in which
 *               case the submit message is cleared).
 */
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int64_t uid = 0;
  int32_t lino = 0;
  int32_t code = false;

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);
    uid = pTbData->uid;

    // A uid missing from the filter-out set ends the search with `true`.
    void* pFiltered = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(pFiltered, code, lino, END, true);
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
  }
  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
645

646
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
44,266,151✔
647
                    SExtSchema* extSrc) {
648
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
44,266,151✔
649
    return TSDB_CODE_INVALID_PARA;
×
650
  }
651
  int32_t code = 0;
44,297,859✔
652

653
  int32_t cnt = 0;
44,297,859✔
654
  for (int32_t i = 0; i < pSrc->nCols; i++) {
247,553,041✔
655
    cnt += mask[i];
203,230,770✔
656
  }
657

658
  pDst->nCols = cnt;
44,296,759✔
659
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
44,339,557✔
660
  if (pDst->pSchema == NULL) {
44,288,711✔
661
    return TAOS_GET_TERRNO(terrno);
×
662
  }
663

664
  int32_t j = 0;
44,279,050✔
665
  for (int32_t i = 0; i < pSrc->nCols; i++) {
247,610,645✔
666
    if (mask[i]) {
203,232,710✔
667
      pDst->pSchema[j++] = pSrc->pSchema[i];
203,292,517✔
668
      SColumnInfoData colInfo =
203,284,684✔
669
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
203,305,870✔
670
      if (extSrc != NULL) {
203,269,574✔
671
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
58,807✔
672
      }
673
      code = blockDataAppendColInfo(pBlock, &colInfo);
203,269,574✔
674
      if (code != 0) {
203,305,974✔
675
        return code;
×
676
      }
677
    }
678
  }
679
  return 0;
44,342,837✔
680
}
681

682
/*
 * Stores a blob column value at row `idx` of `pColumnInfoData`.
 *
 * When `pColVal` holds a value, its data (if any) is read as a 64-bit
 * sequence number that is looked up in `pBlobRow2`; the resulting blob bytes
 * are copied into a temporary length-prefixed buffer and handed to
 * colDataSetVal. A value without data is stored as a zero-length blob.
 * When `pColVal` holds no value the cell is set to NULL.
 *
 * The temporary buffer is allocated once, after the blob length is known,
 * and is released on every path.
 *
 * @return 0 on success; TSDB_CODE_INVALID_PARA for NULL arguments or an
 *         unreadable sequence number (terrno is set in the latter case);
 *         the tBlobSetGet error (also stored in terrno); terrno on
 *         allocation failure; or the colDataSetVal result
 */
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
  int32_t code = 0;
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // TODO(yhDeng)
  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, idx);
    return code;
  }

  // Resolve the blob payload before allocating, so no buffer is held while
  // the lookup can still fail.
  SBlobItem item = {0};
  int32_t   len = 0;
  if (pColVal->value.pData != NULL) {
    uint64_t seq = 0;
    if (tGetU64(pColVal->value.pData, &seq) < 0) {
      terrno = TSDB_CODE_INVALID_PARA;
      return TSDB_CODE_INVALID_PARA;
    }
    code = tBlobSetGet(pBlobRow2, seq, &item);
    if (code != 0) {
      terrno = code;
      uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
      return code;
    }
    len = item.len;
  }

  char* val = taosMemCalloc(1, len + sizeof(BlobDataLenT));
  if (val == NULL) {
    return terrno;
  }
  if (item.len > 0) {
    (void)memcpy(blobDataVal(val), item.data, item.len);
  }

  blobDataSetLen(val, len);
  code = colDataSetVal(pColumnInfoData, idx, val, false);

  taosMemoryFree(val);
  return code;
}
723
/*
 * Stores a non-blob column value at `rowIndex` of `pColumnInfoData`.
 *
 * Fixed-size types are passed straight to colDataSetVal together with their
 * NULL flag. Variable-length values are first copied into a zeroed,
 * length-prefixed stack buffer; a variable-length entry without a value is
 * stored as NULL.
 *
 * @return the colDataSetVal result, TSDB_CODE_SUCCESS for a NULL var-length
 *         cell, or TSDB_CODE_INVALID_PARA when the variable-length payload
 *         does not fit the staging buffer
 */
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  // Largest payload the staging buffer can hold; the extra 2 bytes of the
  // buffer carry the length prefix.
  enum { TQ_VAR_PAYLOAD_MAX = 65535 };

  if (!(IS_VAR_DATA_TYPE(pColVal->value.type))) {
    return colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
                         !COL_VAL_IS_VALUE(pColVal));
  }

  if (!COL_VAL_IS_VALUE(pColVal)) {
    colDataSetNULL(pColumnInfoData, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  // Reject payloads that would overrun the stack buffer below.
  if ((uint64_t)pColVal->value.nData > (uint64_t)TQ_VAR_PAYLOAD_MAX) {
    return TSDB_CODE_INVALID_PARA;
  }

  char val[TQ_VAR_PAYLOAD_MAX + 2] = {0};
  if (pColVal->value.pData != NULL) {
    (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
  }
  varDataSetLen(val, pColVal->value.nData);
  return colDataSetVal(pColumnInfoData, rowIndex, val, false);
}
744

745
/*
 * Writes `colVal` into row `rowIndex` of the column at `slotId` in `pBlock`,
 * routing blob-typed columns to doSetBlobVal (which needs `pBlobSet`) and
 * every other column to doSetVal.
 *
 * @return terrno when the slot does not exist, otherwise the setter's result
 */
static int32_t setBlockData(SSDataBlock* pBlock, int32_t slotId, int32_t rowIndex, SColVal* colVal, SBlobSet* pBlobSet) {
  SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
  if (pCol == NULL) {
    return terrno;
  }

  if (IS_STR_DATA_BLOB(pCol->info.type)) {
    return doSetBlobVal(pCol, rowIndex, colVal, pBlobSet);
  }
  return doSetVal(pCol, rowIndex, colVal);
}
760

761
static int32_t processSubmitRow(SArray*         pRows,
109,617,801✔
762
                                SSDataBlock*    pBlock,
763
                                SHashObj*       pCol2SlotId,
764
                                STqReader*      pReader,
765
                                SBlobSet*       pBlobSet) {
766
  int32_t        code = 0;
109,617,801✔
767
  int32_t        line = 0;
109,617,801✔
768

769
  SArray* pColArray = taosArrayInit(4, INT_BYTES * 2);
109,617,801✔
770
  TSDB_CHECK_NULL(pColArray, code, line, END, terrno);
109,622,384✔
771

772
  int32_t sourceIdx = -1;
109,622,384✔
773
  int32_t rowIndex = 0;
109,622,384✔
774
  SRow* pRow = taosArrayGetP(pRows, rowIndex);
109,622,384✔
775
  TSDB_CHECK_NULL(pRow, code, line, END, terrno);
109,619,330✔
776
  while (++sourceIdx < pReader->pTSchema->numOfCols) {
930,440,430✔
777
    SColVal colVal = {0};
821,011,614✔
778
    code = tRowGet(pRow, pReader->pTSchema, sourceIdx, &colVal);
820,980,007✔
779
    TSDB_CHECK_CODE(code, line, END);
821,180,937✔
780
    void* pSlotId = taosHashGet(pCol2SlotId, &colVal.cid, sizeof(colVal.cid));
821,180,937✔
781
    if (pSlotId == NULL) {
821,621,071✔
782
      continue;
326,796,801✔
783
    }
784
    int32_t pData[2] = {sourceIdx, *(int16_t*)pSlotId};
494,824,270✔
785
    TSDB_CHECK_NULL(taosArrayPush(pColArray, pData), code, line, END, terrno);
494,814,126✔
786
    code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
494,814,126✔
787
    TSDB_CHECK_CODE(code, line, END);
494,183,128✔
788
  }
789
  
790
  for (rowIndex = 1; rowIndex < taosArrayGetSize(pRows); rowIndex++) {
2,147,483,647✔
791
    SRow* pRow = taosArrayGetP(pRows, rowIndex);
2,147,483,647✔
792
    TSDB_CHECK_NULL(pRow, code, line, END, terrno);
2,147,483,647✔
793
    for (int32_t j = 0; j < taosArrayGetSize(pColArray); j++) {
2,147,483,647✔
794
      int32_t* pData = taosArrayGet(pColArray, j);
2,147,483,647✔
795
      TSDB_CHECK_NULL(pData, code, line, END, terrno);
2,147,483,647✔
796

797
      SColVal colVal = {0};
2,147,483,647✔
798
      code = tRowGet(pRow, pReader->pTSchema, pData[0], &colVal);
2,147,483,647✔
799
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
800

801
      code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
2,147,483,647✔
802
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
803
    }
804
  }
805

806
  END:
110,566,052✔
807
  taosArrayDestroy(pColArray);
102,804,059✔
808
  return code;
109,591,537✔
809
}
810

811
static int32_t processSubmitCol(SArray*         pCols,
1,316✔
812
                                SSDataBlock*    pBlock,
813
                                SHashObj*       pCol2SlotId,
814
                                SBlobSet*       pBlobSet) {
815
  int32_t        code = 0;
1,316✔
816
  int32_t        line = 0;
1,316✔
817

818
  for (int32_t i = 0; i < taosArrayGetSize(pCols); i++) {
3,948✔
819
    SColData* pCol = taosArrayGet(pCols, i);
2,632✔
820
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
2,632✔
821
    void* pSlotId = taosHashGet(pCol2SlotId, &pCol->cid, sizeof(pCol->cid));
2,632✔
822
    if (pSlotId == NULL) {
2,632✔
823
      continue;
1,316✔
824
    }
825
    SColVal colVal = {0};
1,316✔
826
    for (int32_t row = 0; row < pCol->nVal; row++) {
3,948✔
827
      code = tColDataGetValue(pCol, row, &colVal);
2,632✔
828
      TSDB_CHECK_CODE(code, line, END);
2,632✔
829

830
      code = setBlockData(pBlock, *(int16_t*)pSlotId, pBlock->info.rows + row, &colVal, pBlobSet);
2,632✔
831
      TSDB_CHECK_CODE(code, line, END);
2,632✔
832
    }
833
  }
834
  
835
  END:
1,316✔
836
  return code;
1,316✔
837
}
838

839
// Make sure the schema cached on `pReader` matches the table and schema
// version of `pSubmitTbData`, reloading it from the vnode meta when it does
// not.
//
// The cache is keyed by suid when the submit data carries a non-zero suid and
// by uid otherwise, plus the schema version. A missing pTSchema also forces a
// reload: a previous failed reload frees the cached schema but leaves the cache
// keys untouched, so the keys alone cannot be trusted.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when the
// schema wrapper cannot be loaded, or terrno when the STSchema cannot be built.
static int32_t checkSchema(STqReader* pReader, SSubmitTbData* pSubmitTbData) {
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;

  bool sameTable = (suid != 0) ? (pReader->cachedSchemaSuid == suid) : (pReader->cachedSchemaUid == uid);
  if (sameTable && pReader->cachedSchemaVer == sversion && pReader->pTSchema != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  taosMemoryFreeClear(pReader->extSchema);
  taosMemoryFreeClear(pReader->pTSchema);

  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
  if (pReader->pSchemaWrapper == NULL) {
    // report the version that was requested, not the one left in the cache
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d, possibly dropped table",
            vgId, suid, uid, sversion);
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
  }

  pReader->pTSchema = tBuildTSchema(pReader->pSchemaWrapper->pSchema, pReader->pSchemaWrapper->nCols,
                                    pReader->pSchemaWrapper->version);
  if (pReader->pTSchema == NULL) {
    tqWarn("vgId:%d, cannot build schema for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d",
            vgId, suid, uid, sversion);
    return terrno;
  }

  // only record the new cache keys once both schema objects are in place
  pReader->cachedSchemaUid = uid;
  pReader->cachedSchemaSuid = suid;
  pReader->cachedSchemaVer = sversion;
  return TSDB_CODE_SUCCESS;
}
867

868
// Append the submit block at index pReader->nextBlk to `pBlock`, writing only
// the columns whose ids appear in `pCol2SlotId`.
//
// Steps: determine the row count of the submit block, grow `pBlock` to hold
// the extra rows, refresh the cached schema via checkSchema(), copy the data
// (column format or row format depending on the submit flags) and finally add
// the row count to pBlock->info.rows. pReader->lastTs is set to the submit
// block's ctimeMs.
//
// Returns TSDB_CODE_INVALID_PARA when `pReader` or `pBlock` is NULL, otherwise
// 0 or the first error code encountered (which is also logged).
static int32_t tqRetrieveCols(STqReader* pReader, SSDataBlock* pBlock, SHashObj* pCol2SlotId) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);

  int32_t code = 0;
  int32_t line = 0;
  int32_t numOfRows = 0;

  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  TSDB_CHECK_NULL(pSubmitTbData, code, line, END, terrno);
  pReader->lastTs = pSubmitTbData->ctimeMs;

  const bool isColFormat = (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) != 0;
  if (isColFormat) {
    // the row count is taken from the first column
    SColData* pFirstCol = taosArrayGet(pSubmitTbData->aCol, 0);
    TSDB_CHECK_NULL(pFirstCol, code, line, END, terrno);
    numOfRows = pFirstCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pSubmitTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfRows);
  TSDB_CHECK_CODE(code, line, END);

  code = checkSchema(pReader, pSubmitTbData);
  TSDB_CHECK_CODE(code, line, END);

  // convert and scan one block
  if (isColFormat) {
    code = processSubmitCol(pSubmitTbData->aCol, pBlock, pCol2SlotId, pSubmitTbData->pBlobSet);
  } else {
    code = processSubmitRow(pSubmitTbData->aRowP, pBlock, pCol2SlotId, pReader, pSubmitTbData->pBlobSet);
  }
  TSDB_CHECK_CODE(code, line, END);

  pBlock->info.rows += numOfRows;

END:
  if (code != 0) {
    tqError("tqRetrieveCols failed, line:%d, msg:%s", line, tstrerror(code));
  }
  return code;
}
911

912
// Track whether column `j` carries a value (is not NONE) in the current row.
// Expects `colVal`, `curRow`, `assigned`, `j` and `buildNew` in the expanding
// scope. On the first row the flag is always recorded and `buildNew` is set;
// on later rows `buildNew` is set only when the flag differs from the one
// recorded for the previous row. Used without a trailing semicolon.
#define PROCESS_VAL                                          \
  {                                                          \
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);     \
    if (curRow == 0 || currentRowAssigned != assigned[j]) {  \
      assigned[j] = currentRowAssigned;                      \
      buildNew = true;                                       \
    }                                                        \
  }
923

924
// One merge step between the source value `colVal` and the target column
// `pColData`, compared by column id. Expects `colVal`, `pColData`,
// `sourceIdx`, `targetIdx`, `curRow`, `lastRow`, `pSubmitTbData` and whatever
// TQ_ERR_GO_TO_END needs in the expanding scope.
//   source id > target id : the target row is set to NULL, target advances
//   source id == target id: the value is written, both advance
//   source id < target id : only the source advances
// The row written is `curRow - lastRow`. Used without a trailing semicolon.
#define SET_DATA                                                                                      \
  if (colVal.cid > pColData->info.colId) {                                                            \
    colDataSetNULL(pColData, curRow - lastRow);                                                       \
    targetIdx++;                                                                                      \
  } else {                                                                                            \
    if (colVal.cid == pColData->info.colId) {                                                         \
      if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
        TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
      } else {                                                                                        \
        TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
      }                                                                                               \
      targetIdx++;                                                                                    \
    }                                                                                                 \
    sourceIdx++;                                                                                      \
  }
939

940
// Start a new result block (and its matching schema wrapper) for the rows
// beginning at `curRow`.
//
// If `blocks` already holds a block, that block is closed first: its row count
// becomes `curRow - *lastRow` and `*lastRow` is moved to `curRow`. The new
// block is masked from pReader->pSchemaWrapper according to `assigned`, sized
// for the `numOfRows - curRow` rows that may still follow, and appended to
// `blocks`; the schema wrapper pointer is appended to `schemas`.
//
// `blocks` stores the SSDataBlock by value, so once the push succeeds the
// pushed copy owns the block's resources and only the local shell is released.
// This is done immediately, so that a later failure cannot free resources the
// stored copy still references.
// NOTE(review): if the push into `schemas` fails, `blocks` keeps one entry
// more than `schemas`; callers are expected to discard both on error — confirm.
//
// Returns 0 on success or the first error code encountered (also logged).
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pSW = NULL;
  SSDataBlock*    block = NULL;

  if (taosArrayGetSize(blocks) > 0) {
    SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pLastBlock);
    pLastBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  block = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(block);

  pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pSW);

  TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(block->pDataBlock));

  block->info.id.uid = pSubmitTbData->uid;
  block->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow));

  TQ_NULL_GO_TO_END(taosArrayPush(blocks, block));
  // ownership of the block's contents moved to the copy inside `blocks`
  taosMemoryFreeClear(block);

  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW));
  pSW = NULL;

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  // on success both pointers are NULL here
  tDeleteSchemaWrapper(pSW);
  blockDataFreeRes(block);
  taosMemoryFree(block);
  return code;
}
980
// Convert column-format submit data into one or more result blocks.
//
// For every row, each schema column is looked up among the submitted columns
// to decide whether it carries a value; PROCESS_VAL records that in `assigned`
// and requests a new block when the pattern changes (always on the first row).
// A schema column that is not submitted at all is marked unassigned and also
// requests a new block. The row is then merged into the last block of `blocks`
// by column id via SET_DATA.
//
// Returns 0 on success or the first error code encountered (also logged).
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  int32_t numOfRows = pCol->nVal;  // the row count is taken from the first column
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);

  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
      int32_t k = 0;
      for (; k < numOfCols; k++) {
        pCol = taosArrayGet(pCols, k);
        TQ_NULL_GO_TO_END(pCol);
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
          SColVal colVal = {0};
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
          PROCESS_VAL
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
          break;
        }
      }
      if (k >= numOfCols) {
        // this column is not in the current row, so we set it to NULL
        assigned[j] = 0;
        buildNew = true;
      }
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < numOfCols) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }

  // No block exists when there were no rows to process; guard the final row
  // count update the same way tqProcessRowData does.
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
1056

1057
// Convert row-format submit data into one or more result blocks.
//
// A temporary STSchema is built from pReader->pSchemaWrapper. For every row,
// PROCESS_VAL records per column whether it carries a value and requests a new
// block when that pattern changes (always on the first row). The row is then
// merged into the last block of `blocks` by column id via SET_DATA. After the
// last row, the final block's row count is set.
//
// Returns 0 on success or the first error code encountered (also logged).
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t         code = 0;
  STSchema*       pTSchema = NULL;
  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;

  // one flag per schema column, all initially "not assigned"
  char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pRows = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRows);

  pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);

  for (int32_t i = 0; i < numOfRows; i++, curRow++) {
    SRow* pRow = taosArrayGetP(pRows, i);
    TQ_NULL_GO_TO_END(pRow);

    // pass 1: find out whether the set of assigned columns changed
    bool buildNew = false;
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // pass 2: merge the row into the current block by column id
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < pTSchema->numOfCols) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }
  }

  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1124

1125
// Encode `pCreateTbReq` and append it to the create-table lists of `pRsp`.
//
// When pRsp->createTableNum is 0 the two lists (createTableLen holding int32_t
// sizes, createTableReq holding buffer pointers) are created first. The
// request is encoded into a freshly allocated buffer; the buffer's length and
// pointer are pushed and createTableNum is incremented.
//
// Returns 0 on success. On failure the error is logged, the encoded buffer is
// freed and the error code is returned; TSDB_CODE_INVALID_PARA is returned for
// a NULL argument.
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t  code = 0;
  int32_t  lino = 0;
  uint32_t len = 0;
  void*    createReq = NULL;

  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // first pass computes the encoded size, second pass fills the buffer
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code);
  TSDB_CHECK_CODE(code, lino, END);

  createReq = taosMemoryCalloc(1, len);
  TSDB_CHECK_NULL(createReq, code, lino, END, terrno);

  SEncoder encoder = {0};
  tEncoderInit(&encoder, createReq, len);
  code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq);
  tEncoderClear(&encoder);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
    taosMemoryFree(createReq);
  }
  return code;
}
1162

1163
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
44,534,089✔
1164
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1165
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
44,534,089✔
1166
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
44,534,089✔
1167
  if (pSubmitTbData == NULL) {
44,552,400✔
1168
    return terrno;
×
1169
  }
1170
  pReader->nextBlk++;
44,552,400✔
1171

1172
  if (pSubmitTbDataRet) {
44,543,170✔
1173
    *pSubmitTbDataRet = pSubmitTbData;
44,550,461✔
1174
  }
1175

1176
  if (fetchMeta == ONLY_META) {
44,545,955✔
1177
    if (pSubmitTbData->pCreateTbReq != NULL) {
6,874✔
1178
      if (pRsp->createTableReq == NULL) {
1,996✔
1179
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
1,183✔
1180
        if (pRsp->createTableReq == NULL) {
1,183✔
1181
          return terrno;
×
1182
        }
1183
      }
1184
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
3,992✔
1185
        return terrno;
×
1186
      }
1187
      pSubmitTbData->pCreateTbReq = NULL;
1,996✔
1188
    }
1189
    return 0;
6,874✔
1190
  }
1191

1192
  int32_t sversion = pSubmitTbData->sver;
44,539,081✔
1193
  int64_t uid = pSubmitTbData->uid;
44,536,269✔
1194
  pReader->lastBlkUid = uid;
44,533,578✔
1195

1196
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
44,542,256✔
1197
  taosMemoryFreeClear(pReader->extSchema);
44,525,240✔
1198
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
44,536,383✔
1199
  if (pReader->pSchemaWrapper == NULL) {
44,488,077✔
1200
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
204,503✔
1201
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1202
    pReader->cachedSchemaSuid = 0;
204,503✔
1203
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
204,503✔
1204
  }
1205

1206
  if (pSubmitTbData->pCreateTbReq != NULL) {
44,288,872✔
1207
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
8,674✔
1208
    if (code != 0) {
8,674✔
1209
      return code;
×
1210
    }
1211
  } else if (rawList != NULL) {
44,271,503✔
1212
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1213
      return terrno;
×
1214
    }
1215
    pReader->pSchemaWrapper = NULL;
×
1216
    return 0;
×
1217
  }
1218

1219
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
44,280,177✔
1220
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
881,176✔
1221
  } else {
1222
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
43,374,054✔
1223
  }
1224
}
1225

1226
// Replace the reader's table-uid filter with the uids in `tbUidList`.
//
// An existing hash is cleared, otherwise a new one is created. A uid that
// cannot be inserted is logged and skipped. `id` is only used in log messages.
//
// Returns TSDB_CODE_SUCCESS (also when `pReader` or `tbUidList` is NULL), or
// terrno when the hash cannot be created.
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->tbIdHash != NULL) {
    taosHashClear(pReader->tbIdHash);
  } else {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", id);
      return terrno;
    }
  }

  const int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pKey == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pKey);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
  return TSDB_CODE_SUCCESS;
}
1251

1252
// Add the uids in `pTableUidList` to the reader's table-uid filter, creating
// the hash first when the reader has none yet.
//
// Does nothing when either argument is NULL or the hash cannot be created.
// A uid that cannot be fetched from the list is skipped (matching
// tqReaderSetTbUidList); a uid that cannot be inserted is logged and skipped.
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }
  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t numOfTables = taosArrayGetSize(pTableUidList);
  for (int i = 0; i < numOfTables; i++) {
    int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i);
    if (pKey == NULL) {
      // never hand a NULL key to the hash or dereference it for logging
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0) != 0) {
      tqError("failed to add table uid:%" PRId64 " to hash", *pKey);
      continue;
    }
    tqDebug("%s add table uid:%" PRId64 " to hash", __func__, *pKey);
  }
}
1274

1275
// Report whether `uid` is present in the reader's table-uid hash.
// A NULL reader yields false.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  if (pReader != NULL) {
    void* pEntry = taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t));
    return pEntry != NULL;
  }
  return false;
}
1281

1282
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1283
  if (pReader == NULL) {
×
1284
    return false;
×
1285
  }
1286
  return pReader->msg.msgStr == NULL;
×
1287
}
1288

1289
// Remove every uid in `tbUidList` from the reader's table-uid hash.
// A failed removal is only logged as a warning. Does nothing when either
// argument is NULL.
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  const int32_t numOfTables = (int32_t)taosArrayGetSize(tbUidList);
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, idx);
    int32_t  code = taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
    if (code == 0) {
      continue;
    }
    tqWarn("%s failed to remove table uid:%" PRId64 " from hash, msg:%s", __func__, pKey != NULL ? *pKey : 0, tstrerror(code));
  }
}
1301

1302
// Propagate a list of deleted table uids to every consumer handle of `pTq`,
// while holding the write latch on pTq->lock.
//
// Per subscription type:
//   COLUMN: the uids are removed from the handle's tmq scanner table list
//   DB    : the uids are added to the handle's filter-out set
//   TABLE : the uids are removed from the handle's reader filter
// Failures on individual handles are logged and do not stop the iteration.
//
// Returns 0 (also when `pTq` is NULL), or TSDB_CODE_INVALID_PARA when
// `tbUidList` is NULL.
int32_t tqDeleteTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (void* pIter = taosHashIterate(pTq->pHandle, NULL); pIter != NULL;
       pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " delete table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      if (qDeleteTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList) != 0) {
        tqError("update qualified table error for %s", pTqHandle->subKey);
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
      int32_t sz = taosArrayGetSize(tbUidList);
      for (int32_t i = 0; i < sz; i++) {
        int64_t* tbUid = (int64_t*)taosArrayGet(tbUidList, i);
        if (tbUid == NULL) {
          continue;
        }
        if (taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, tbUid, sizeof(int64_t), NULL, 0) != 0) {
          tqError("failed to add table uid:%" PRId64 " to hash", *tbUid);
        }
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
    }
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
1345

1346
// Return a newly created array of int64_t holding all elements of
// `tbUidList`, or NULL when the array cannot be created or filled. The caller
// destroys the returned array.
static SArray* copyUidList(SArray* tbUidList) {
  SArray* pCopy = taosArrayInit(4, sizeof(int64_t));
  if (pCopy != NULL && taosArrayAddAll(pCopy, tbUidList) == NULL) {
    taosArrayDestroy(pCopy);
    tqError("copy table uid list failed");
    pCopy = NULL;
  }
  return pCopy;
}
1359

1360
// Add the tables from `tbUidList` that pass the handle's table filter to the
// handle's reader.
//
// The list is copied first so qFilterTableList() can work on the copy without
// touching the caller's list; the filtered copy is then added to the reader
// and destroyed.
//
// Returns 0 on success, terrno when the copy fails, or the error from
// qFilterTableList().
static int32_t addTableListForStableTmq(STqHandle* pTqHandle, STQ* pTq, SArray* tbUidList) {
  int32_t code = 0;
  SArray* pFiltered = copyUidList(tbUidList);

  if (pFiltered == NULL) {
    code = terrno;
  } else {
    code = qFilterTableList(pTq->pVnode, pFiltered, pTqHandle->execHandle.execTb.node,
                            pTqHandle->execHandle.task, pTqHandle->execHandle.execTb.suid);
    if (code != TDB_CODE_SUCCESS) {
      tqError("tqAddTbUidList error:%d handle %s consumer:0x%" PRIx64, code, pTqHandle->subKey,
              pTqHandle->consumerId);
    } else {
      tqDebug("%s handle %s consumer:0x%" PRIx64 " add %d tables to tqReader", __func__, pTqHandle->subKey,
              pTqHandle->consumerId, (int32_t)taosArrayGetSize(pFiltered));
      tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, pFiltered);
    }
  }

  taosArrayDestroy(pFiltered);
  return code;
}
1382

1383
// Propagate a list of newly added table uids to every consumer handle of
// `pTq`, while holding the write latch on pTq->lock.
//
// COLUMN subscriptions get the uids added to their tmq scanner table list,
// TABLE subscriptions go through addTableListForStableTmq(); other types are
// left alone. The first failure is logged and stops the iteration.
//
// Returns 0 (also when `pTq` is NULL), TSDB_CODE_INVALID_PARA when `tbUidList`
// is NULL, or the error that stopped the iteration.
int32_t tqAddTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " add table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      code = qAddTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
      if (code != 0) {
        tqError("add table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
      if (code != 0) {
        tqError("add table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
        break;
      }
    }
  }
  // pIter is non-NULL here only when the loop was left early
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1423

1424
// Propagate a tag update on the tables in `tbUidList` (changed column ids in
// `cidList`) to every consumer handle of `pTq`, while holding the write latch
// on pTq->lock.
//
// COLUMN subscriptions: when checkCidInTagCondition() reports a match for the
// handle's tag condition, the scanner table list is updated; the tag cache is
// refreshed in either case.
// TABLE subscriptions: on such a match the uids are removed from the reader
// and re-added through addTableListForStableTmq().
// The first failure is logged and stops the iteration.
//
// Returns 0 (also when `pTq` is NULL), TSDB_CODE_INVALID_PARA when `tbUidList`
// is NULL, or the error that stopped the iteration.
int32_t tqUpdateTbUidList(STQ* pTq, SArray* tbUidList, SArray* cidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  for (pIter = taosHashIterate(pTq->pHandle, pIter); pIter != NULL; pIter = taosHashIterate(pTq->pHandle, pIter)) {
    STqHandle* pTqHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pTqHandle->subKey, pTqHandle->consumerId);

    if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SNode* pTagCond = getTagCondNodeForQueryTmq(pTqHandle->execHandle.task);
      if (checkCidInTagCondition(pTagCond, cidList)) {
        code = qUpdateTableListForTmqScanner(pTqHandle->execHandle.task, tbUidList);
        if (code != 0) {
          tqError("update table list for query tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
          break;
        }
      }
      qUpdateTableTagCacheForTmq(pTqHandle->execHandle.task, tbUidList, cidList);
    } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      SNode* pTagCond = getTagCondNodeForStableTmq(pTqHandle->execHandle.execTb.node);
      if (checkCidInTagCondition(pTagCond, cidList)) {
        tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
        code = addTableListForStableTmq(pTqHandle, pTq, tbUidList);
        if (code != 0) {
          tqError("update table list for stable tmq error for %s, msg:%s", pTqHandle->subKey, tstrerror(code));
          break;
        }
      }
    }
  }

  // pIter is non-NULL here only when the loop was left early
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1474

1475
static void destroySourceScanTables(void* ptr) {
×
1476
  SArray** pTables = ptr;
×
1477
  if (pTables && *pTables) {
×
1478
    taosArrayDestroy(*pTables);
×
1479
    *pTables = NULL;
×
1480
  }
1481
}
×
1482

1483
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1484
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1485
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1486
  if (pCol1->vColId == pCol2->vColId) {
×
1487
    return 0;
×
1488
  } else if (pCol1->vColId < pCol2->vColId) {
×
1489
    return -1;
×
1490
  } else {
1491
    return 1;
×
1492
  }
1493
}
1494

1495
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1496
  if (value) {
×
1497
    SSchemaWrapper* pSchemaWrapper = value;
×
1498
    tDeleteSchemaWrapper(pSchemaWrapper);
1499
  }
1500
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc