• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5009

29 Mar 2026 04:32AM UTC coverage: 72.26% (+0.02%) from 72.241%
#5009

push

travis-ci

web-flow
refactor: do some internal refactor for TDgpt. (#34955)

253662 of 351039 relevant lines covered (72.26%)

131649114.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.44
/source/dnode/vnode/src/tq/tqRead.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "taoserror.h"
17
#include "tarray.h"
18
#include "tdef.h"
19
#include "thash.h"
20
#include "tmsg.h"
21
#include "tpriv.h"
22
#include "tq.h"
23

24
static int32_t tqRetrieveCols(STqReader *pReader, SSDataBlock *pRes, SHashObj* pCol2SlotId);
25

26
/*
 * Filter a create-table batch request against the subscription of pReader.
 *
 * A request "matches" when it creates a child table (normal or virtual) whose
 * super table is tbSuid and whose uid is present in pReader->tbIdHash.
 *
 *   - no request matches   : *realTbSuid is left untouched
 *   - every request matches: *realTbSuid = tbSuid, the message is kept as is
 *   - some requests match  : *realTbSuid = tbSuid and the payload that follows
 *                            the SMsgHead in pHead->body is overwritten with a
 *                            re-encoded batch holding only the matching
 *                            requests; pHead->bodyLen is updated to match
 *
 * The function returns nothing; every failure is reported through tqError.
 */
static void processCreateTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  int32_t code = 0;
  int32_t lino = 0;
  int32_t        needRebuild = 0;
  SVCreateTbReq* pCreateReq = NULL;
  SVCreateTbBatchReq reqNew = {0};
  void* buf = NULL;
  SVCreateTbBatchReq req = {0};
  code = tDecodeSVCreateTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // first pass: count how many requests fall inside the subscription
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pCreateReq = req.pReqs + iReq;
    if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) &&
         pCreateReq->ctb.suid == tbSuid &&
         taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }
  if (needRebuild == 0) {
    // do nothing
  } else if (needRebuild == req.nReqs) {
    *realTbSuid = tbSuid;
  } else {
    *realTbSuid = tbSuid;
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
    if (reqNew.pArray == NULL) {
      code = terrno;
      lino = __LINE__;
      goto end;
    }
    // second pass: collect the matching requests (the array stores copies of
    // the request structs taken from req.pReqs)
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pCreateReq = req.pReqs + iReq;
      if ((pCreateReq->type == TSDB_CHILD_TABLE || pCreateReq->type == TSDB_VIRTUAL_CHILD_TABLE) &&
          pCreateReq->ctb.suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &pCreateReq->uid, sizeof(int64_t)) != NULL) {
        reqNew.nReqs++;
        if (taosArrayPush(reqNew.pArray, pCreateReq) == NULL) {
          code = terrno;
          lino = __LINE__;
          goto end;
        }
      }
    }

    int     tlen = 0;
    tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, code);
    if (code < 0) {
      lino = __LINE__;
      goto end;
    }
    buf = taosMemoryMalloc(tlen);
    if (NULL == buf) {
      // record the allocation error so that it is reported at `end`
      code = terrno;
      lino = __LINE__;
      goto end;
    }
    SEncoder coderNew = {0};
    tEncoderInit(&coderNew, buf, tlen);
    code = tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
    tEncoderClear(&coderNew);
    if (code < 0) {
      lino = __LINE__;
      goto end;
    }
    // replace the payload in place, right after the message head
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
    pHead->bodyLen = tlen + sizeof(SMsgHead);
  }

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  tDeleteSVCreateTbBatchReq(&req);
  if (code < 0) {
    tqError("processCreateTbMsg failed, code:%d, line:%d", code, lino);
  }
}
8,362✔
101

102
/*
 * Decide whether an alter-table message concerns the subscription of pReader
 * and, when only part of it does, rewrite the message in place.
 *
 * Three request shapes are handled, selected by req.action:
 *
 *   - TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL ("Type 1"): req.tables lists
 *     the tables by name. A table is in scope when its meta entry has
 *     ctbEntry.suid == tbSuid and its uid is in pReader->tbIdHash. With no
 *     table in scope nothing is changed; with all tables in scope
 *     *realTbSuid = tbSuid; with a partial match *realTbSuid = tbSuid and the
 *     payload after the SMsgHead in pHead->body is replaced by a re-encoded
 *     request that lists only the tables in scope (pHead->bodyLen updated).
 *   - TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL ("Type 2"): req.tbName is
 *     looked up; when it is a super table *realTbSuid is set to its uid.
 *   - anything else (legacy single table): req.tbName is looked up and, when
 *     its uid is in pReader->tbIdHash, *realTbSuid = ctbEntry.suid.
 *
 * The function returns nothing; failures are reported through the tq log.
 */
static void processAlterTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  SVAlterTbReq req = {0};
  SVAlterTbReq reqNew = {0};
  SMetaReader mr = {0};
  void* buf = NULL;
  int32_t lino = 0;
  int32_t code = tDecodeSVAlterTbReq(dcoder, &req);
  if (code < 0) {
    tqError("vgId:%d, processAlterTbMsg failed to decode SVAlterTbReq, code:%s",
            TD_VID(pReader->pVnode), tstrerror(code));
    lino = __LINE__;
    goto end;
  }

  if (req.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TABLE_TAG_VAL) {
    int32_t needRebuild = 0;
    for (int32_t i = 0; i < taosArrayGetSize(req.tables); i++) {
      SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
      if (pTable == NULL || pTable->tbName == NULL) {
        tqWarn("vgId:%d, processAlterTbMsg Type 1 table[%d] has invalid name, skip",
               TD_VID(pReader->pVnode), i);
        continue;
      }

      metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
      code = metaGetTableEntryByName(&mr, pTable->tbName);
      if (code < 0) {
        // Table not found, skip
        tqDebug("vgId:%d, processAlterTbMsg Type 1 table %s not found, skip",
                TD_VID(pReader->pVnode), pTable->tbName);
        metaReaderClear(&mr);
        // a skipped table is not a failure: do not let the lookup error leak
        // out of the loop and trigger the error report at `end`
        code = 0;
        continue;
      }

      if (mr.me.ctbEntry.suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
        needRebuild++;
      }
      metaReaderClear(&mr);
    }

    if (needRebuild == 0) {
      // No tables in subscription scope, skip message
      tqDebug("vgId:%d, processAlterTbMsg Type 1: 0/%d tables in subscription, skip",
              TD_VID(pReader->pVnode), (int)taosArrayGetSize(req.tables));
    } else if (needRebuild == taosArrayGetSize(req.tables)) {
      // All tables in subscription scope, forward complete message
      *realTbSuid = tbSuid;
      tqDebug("vgId:%d, processAlterTbMsg Type 1: %d/%d tables in subscription, forward all",
              TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));
    } else {
      // Partial match: rebuild message with only subscribed tables
      *realTbSuid = tbSuid;
      tqDebug("vgId:%d, processAlterTbMsg Type 1: %d/%d tables in subscription, rebuild message",
              TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));

      // Build filtered message
      reqNew.action = req.action;
      reqNew.tbName = req.tbName;
      reqNew.tables = taosArrayInit(needRebuild, sizeof(SUpdateTableTagVal));
      if (reqNew.tables == NULL) {
        code = terrno;
        tqError("vgId:%d, processAlterTbMsg failed to allocate filtered tables array, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      // Collect only subscribed tables
      for (int32_t i = 0; i < taosArrayGetSize(req.tables); i++) {
        SUpdateTableTagVal* pTable = taosArrayGet(req.tables, i);
        if (pTable == NULL || pTable->tbName == NULL) {
          continue;
        }
        metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
        code = metaGetTableEntryByName(&mr, pTable->tbName);
        if (code < 0) {
          metaReaderClear(&mr);
          // same as in the counting pass: a missing table is skipped, not an error
          code = 0;
          continue;
        }

        if (mr.me.ctbEntry.suid == tbSuid &&
            taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
          if (taosArrayPush(reqNew.tables, pTable) == NULL) {
            code = terrno;
            tqError("vgId:%d, processAlterTbMsg failed to add table %s to filtered array, code:%s",
                    TD_VID(pReader->pVnode), pTable->tbName, tstrerror(code));
            lino = __LINE__;
            metaReaderClear(&mr);
            goto end;
          }
        }
        metaReaderClear(&mr);
      }

      // Encode filtered message
      int tlen = 0;
      tEncodeSize(tEncodeSVAlterTbReq, &reqNew, tlen, code);
      if (code < 0) {
        tqError("vgId:%d, processAlterTbMsg failed to calculate encode size, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      buf = taosMemoryMalloc(tlen);
      if (NULL == buf) {
        code = terrno;
        tqError("vgId:%d, processAlterTbMsg failed to allocate encode buffer size:%d, code:%s",
                TD_VID(pReader->pVnode), tlen, tstrerror(code));
        lino = __LINE__;
        goto end;
      }

      SEncoder coderNew = {0};
      tEncoderInit(&coderNew, buf, tlen);
      code = tEncodeSVAlterTbReq(&coderNew, &reqNew);
      tEncoderClear(&coderNew);
      if (code != 0) {
        tqError("vgId:%d, processAlterTbMsg failed to encode filtered message, code:%s",
                TD_VID(pReader->pVnode), tstrerror(code));
        lino = __LINE__;
        goto end;
      }
      // replace the payload in place, right after the message head
      (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
      pHead->bodyLen = tlen + sizeof(SMsgHead);
      tqInfo("vgId:%d, processAlterTbMsg Type 1 rebuilt message with %d/%d tables",
             TD_VID(pReader->pVnode), needRebuild, (int)taosArrayGetSize(req.tables));
    }

  } else if (req.action == TSDB_ALTER_TABLE_UPDATE_CHILD_TABLE_TAG_VAL) {
    // Type 2: WHERE condition on super table
    metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
    code = metaGetTableEntryByName(&mr, req.tbName);
    if (code < 0) {
      tqError("vgId:%d, processAlterTbMsg Type 2 failed to get super table %s, code:%s",
              TD_VID(pReader->pVnode), req.tbName, tstrerror(code));
      lino = __LINE__;
      metaReaderClear(&mr);
      goto end;
    }

    // For Type 2, the super table is specified in req.tbName
    // The actual filtering happens through tbIdHash which already contains
    // only the child tables matching the topic's WHERE condition
    if (mr.me.type == TSDB_SUPER_TABLE) {
      *realTbSuid = mr.me.uid;
      tqDebug("vgId:%d, processAlterTbMsg Type 2: super table %" PRId64 " (WHERE condition filtering via tbIdHash)",
              TD_VID(pReader->pVnode), mr.me.uid);
    }
    metaReaderClear(&mr);
  } else {
    // Legacy single-table tag modification
    metaReaderDoInit(&mr, pReader->pVnode->pMeta, META_READER_LOCK);
    code = metaGetTableEntryByName(&mr, req.tbName);
    if (code < 0) {
      tqDebug("vgId:%d, processAlterTbMsg legacy type failed to get table %s, code:%s",
              TD_VID(pReader->pVnode), req.tbName ? req.tbName : "(null)", tstrerror(code));
      lino = __LINE__;
      metaReaderClear(&mr);
      goto end;
    }
    if (taosHashGet(pReader->tbIdHash, &mr.me.uid, sizeof(int64_t)) != NULL) {
      *realTbSuid = mr.me.ctbEntry.suid;
    }
    metaReaderClear(&mr);
  }

end:
  // reqNew.tables holds copies of entries owned by req, so only the array
  // itself is destroyed here
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.tables);
  destroyAlterTbReq(&req);
  if (code < 0) {
    tqError("vgId:%d, processAlterTbMsg failed at line:%d, code:%s",
            TD_VID(pReader->pVnode), lino, tstrerror(code));
  }
}
4,704✔
279

280
/*
 * Filter a drop-table batch request against the subscription of pReader.
 *
 * A request "matches" when its suid equals tbSuid and its uid is present in
 * pReader->tbIdHash.
 *
 *   - no request matches   : *realTbSuid is left untouched
 *   - every request matches: *realTbSuid = tbSuid, the message is kept as is
 *   - some requests match  : *realTbSuid = tbSuid and the payload that follows
 *                            the SMsgHead in pHead->body is overwritten with a
 *                            re-encoded batch holding only the matching
 *                            requests; pHead->bodyLen is updated to match
 *
 * The function returns nothing; every failure is reported through tqError.
 */
static void processDropTbMsg(SDecoder* dcoder, SWalCont* pHead, STqReader* pReader, int64_t* realTbSuid, int64_t tbSuid) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchReq reqNew = {0};
  void* buf = NULL;
  int32_t lino = 0;
  int32_t code = tDecodeSVDropTbBatchReq(dcoder, &req);
  if (code < 0) {
    lino = __LINE__;
    goto end;
  }

  // first pass: count how many requests fall inside the subscription
  int32_t      needRebuild = 0;
  SVDropTbReq* pDropReq = NULL;
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    pDropReq = req.pReqs + iReq;

    if (pDropReq->suid == tbSuid &&
        taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
      needRebuild++;
    }
  }
  if (needRebuild == 0) {
    // do nothing
  } else if (needRebuild == req.nReqs) {
    *realTbSuid = tbSuid;
  } else {
    *realTbSuid = tbSuid;
    reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
    if (reqNew.pArray == NULL) {
      code = terrno;
      lino = __LINE__;
      goto end;
    }
    // second pass: collect the matching requests
    for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
      pDropReq = req.pReqs + iReq;
      if (pDropReq->suid == tbSuid &&
          taosHashGet(pReader->tbIdHash, &pDropReq->uid, sizeof(int64_t)) != NULL) {
        reqNew.nReqs++;
        if (taosArrayPush(reqNew.pArray, pDropReq) == NULL) {
          code = terrno;
          lino = __LINE__;
          goto end;
        }
      }
    }

    int     tlen = 0;
    tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, code);
    if (code < 0) {
      lino = __LINE__;
      goto end;
    }
    buf = taosMemoryMalloc(tlen);
    if (NULL == buf) {
      // record the allocation error so that it is reported at `end`
      code = terrno;
      lino = __LINE__;
      goto end;
    }
    SEncoder coderNew = {0};
    tEncoderInit(&coderNew, buf, tlen);
    code = tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
    tEncoderClear(&coderNew);
    if (code != 0) {
      lino = __LINE__;
      goto end;
    }
    // replace the payload in place, right after the message head
    (void)memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
    pHead->bodyLen = tlen + sizeof(SMsgHead);
  }

end:
  taosMemoryFree(buf);
  taosArrayDestroy(reqNew.pArray);
  if (code < 0) {
    tqError("processDropTbMsg failed, code:%d, line:%d", code, lino);
  }
}
×
352

353
/*
 * Tell whether the meta message in pHead should be delivered to the consumer
 * behind pHandle.
 *
 * Returns false when either argument is NULL, true for every subscription
 * that is not a table subscription, and otherwise true only when the super
 * table uid extracted from the message equals the subscribed one. A message
 * that cannot be decoded, or whose type is not handled below, yields a
 * extracted uid of 0. Create/alter/drop-table messages may be rewritten in
 * place by their process*TbMsg helper.
 */
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
  if (pHandle == NULL || pHead == NULL) {
    return false;
  }
  if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
    return true;
  }

  STqReader* pTqReader = pHandle->execHandle.pTqReader;
  int16_t    type = pHead->msgType;
  int64_t    tbSuid = pHandle->execHandle.execTb.suid;
  int64_t    realTbSuid = 0;

  // the payload starts right after the message head
  int32_t  payloadLen = pHead->bodyLen - sizeof(SMsgHead);
  SDecoder dcoder = {0};
  tDecoderInit(&dcoder, POINTER_SHIFT(pHead->body, sizeof(SMsgHead)), payloadLen);

  if (type == TDMT_VND_CREATE_STB || type == TDMT_VND_ALTER_STB) {
    SVCreateStbReq stbReq = {0};
    if (tDecodeSVCreateStbReq(&dcoder, &stbReq) < 0) {
      goto end;
    }
    realTbSuid = stbReq.suid;
  } else if (type == TDMT_VND_DROP_STB) {
    SVDropStbReq dropReq = {0};
    if (tDecodeSVDropStbReq(&dcoder, &dropReq) < 0) {
      goto end;
    }
    realTbSuid = dropReq.suid;
  } else if (type == TDMT_VND_CREATE_TABLE) {
    processCreateTbMsg(&dcoder, pHead, pTqReader, &realTbSuid, tbSuid);
  } else if (type == TDMT_VND_ALTER_TABLE) {
    processAlterTbMsg(&dcoder, pHead, pTqReader, &realTbSuid, tbSuid);
  } else if (type == TDMT_VND_DROP_TABLE) {
    processDropTbMsg(&dcoder, pHead, pTqReader, &realTbSuid, tbSuid);
  } else if (type == TDMT_VND_DELETE) {
    SDeleteRes delRes = {0};
    if (tDecodeDeleteRes(&dcoder, &delRes) < 0) {
      goto end;
    }
    realTbSuid = delRes.suid;
  }

end:
  tDecoderClear(&dcoder);
  bool matched = (tbSuid == realTbSuid);
  tqDebug("%s suid:%" PRId64 " realSuid:%" PRId64 " return:%d", __FUNCTION__, tbSuid, realTbSuid, matched);
  return matched;
}
409

410
/*
 * Advance through the WAL starting at *fetchOffset until a message that has
 * to be returned to the consumer is found, or the applied version is passed.
 *
 * - A submit message stops the scan; the result of walFetchBody is returned.
 * - When pHandle->fetchMeta is not WITH_DATA, a meta message (delete messages
 *   are excluded for ONLY_META) has its body fetched and is checked with
 *   isValValidForTable(): accepted -> return 0, rejected -> skipped.
 * - Every other message has its body skipped.
 *
 * On return *fetchOffset holds the version the scan stopped at. The function
 * returns -1 for NULL arguments and when nothing was found.
 */
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) {
  if (pTq == NULL || pHandle == NULL || fetchOffset == NULL) {
    return -1;
  }

  int32_t ret = -1;
  int32_t vgId = TD_VID(pTq->pVnode);
  int64_t readerId = pHandle->pWalReader->readerId;

  int64_t cursor = *fetchOffset;
  int64_t lastVer = walGetLastVer(pHandle->pWalReader->pWal);
  int64_t committedVer = walGetCommittedVer(pHandle->pWalReader->pWal);
  int64_t appliedVer = walGetAppliedVer(pHandle->pWalReader->pWal);

  tqDebug("vgId:%d, start to fetch wal, index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64
          ", 0x%" PRIx64,
          vgId, cursor, lastVer, committedVer, appliedVer, readerId);

  while (cursor <= appliedVer) {
    if (walFetchHead(pHandle->pWalReader, cursor) < 0) {
      tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
              ", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
              pHandle->consumerId, pHandle->epoch, vgId, cursor, reqId, readerId);
      break;
    }

    tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type:%s, QID:0x%" PRIx64 " 0x%" PRIx64,
            vgId, pHandle->consumerId, cursor, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, readerId);

    // data message: hand it over whatever the fetch result is
    if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
      ret = walFetchBody(pHandle->pWalReader);
      break;
    }

    if (pHandle->fetchMeta != WITH_DATA) {
      SWalCont* pHead = &(pHandle->pWalReader->pHead->head);
      if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
        ret = walFetchBody(pHandle->pWalReader);
        if (ret < 0) {
          break;
        }

        // take the head pointer again after the body has been fetched
        pHead = &(pHandle->pWalReader->pHead->head);
        if (isValValidForTable(pHandle, pHead)) {
          ret = 0;
          break;
        }

        // meta message outside of the subscription: move on
        cursor++;
        ret = -1;
        continue;
      }
    }

    ret = walSkipFetchBody(pHandle->pWalReader);
    if (ret < 0) {
      break;
    }
    cursor++;
    ret = -1;
  }

  *fetchOffset = cursor;
  tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64
          ", applied:%" PRId64 ", 0x%" PRIx64,
          vgId, ret, cursor, lastVer, committedVer, appliedVer, readerId);
  return ret;
}
477

478
/* Return the hasPrimaryKey flag stored in the reader; false for a NULL reader. */
bool tqGetTablePrimaryKey(STqReader* pReader) {
  return (pReader != NULL) ? pReader->hasPrimaryKey : false;
}
484

485
/*
 * Look up the schema of table `uid` and record in pReader->hasPrimaryKey
 * whether its second column carries the COL_IS_KEY flag. A missing schema or
 * a schema with fewer than two columns yields false. Does nothing besides
 * logging when pReader is NULL.
 */
void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) {
  tqDebug("%s:%p uid:%" PRId64, __FUNCTION__, pReader, uid);

  if (pReader == NULL) {
    return;
  }

  SSchemaWrapper* pWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, -1, 1, NULL, 0);
  bool            hasKey = (pWrapper != NULL) && (pWrapper->nCols >= 2) && ((pWrapper->pSchema[1].flags & COL_IS_KEY) != 0);
  tDeleteSchemaWrapper(pWrapper);

  pReader->hasPrimaryKey = hasKey;
}
499

500
static void freeTagCache(void* pData){
1,765,326✔
501
  if (pData == NULL) return;
1,765,326✔
502
  SArray* tagCache = *(SArray**)pData;
1,765,326✔
503
  taosArrayDestroyP(tagCache, taosMemFree);
1,765,616✔
504
}
505

506
/*
 * Allocate a tq reader bound to pVnode: opens a WAL reader on pVnode->pWal
 * and creates the tag cache hash (entries released through freeTagCache).
 * Returns NULL when pVnode is NULL or any of the allocations fails; in that
 * case everything acquired so far is released.
 */
STqReader* tqReaderOpen(SVnode* pVnode) {
  tqDebug("%s:%p", __FUNCTION__, pVnode);
  if (pVnode == NULL) {
    return NULL;
  }

  STqReader* pNew = taosMemoryCalloc(1, sizeof(STqReader));
  if (pNew == NULL) {
    return NULL;
  }

  pNew->pWalReader = walOpenReader(pVnode->pWal, 0);
  if (pNew->pWalReader == NULL) {
    goto _fail;
  }

  pNew->pVnode = pVnode;
  pNew->pSchemaWrapper = NULL;
  pNew->tbIdHash = NULL;
  pNew->pTableTagCacheForTmq = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (pNew->pTableTagCacheForTmq == NULL) {
    walCloseReader(pNew->pWalReader);
    goto _fail;
  }
  taosHashSetFreeFp(pNew->pTableTagCacheForTmq, freeTagCache);
  taosInitRWLatch(&pNew->tagCachelock);

  return pNew;

_fail:
  taosMemoryFree(pNew);
  return NULL;
}
536

537
/*
 * Release a reader obtained from tqReaderOpen together with everything it
 * owns: WAL reader, tag cache, schema wrapper, schemas, table id hash and the
 * decoded submit request. A NULL reader is only logged.
 */
void tqReaderClose(STqReader* pReader) {
  tqDebug("%s:%p", __FUNCTION__, pReader);
  if (pReader == NULL) {
    return;
  }

  // wal reader and tag cache
  walCloseReader(pReader->pWalReader);
  taosHashCleanup(pReader->pTableTagCacheForTmq);

  // schema information
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
  taosMemoryFree(pReader->pTSchema);
  taosMemoryFree(pReader->extSchema);

  // table id hash and decoded submit request
  taosHashCleanup(pReader->tbIdHash);
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);

  taosMemoryFree(pReader);
}
554

555
/*
 * Position the WAL reader of pReader at version `ver`. Returns
 * TSDB_CODE_INVALID_PARA for a NULL reader, terrno when the seek fails and 0
 * on success; `id` is only used in the debug log.
 */
int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  if (walReaderSeekVer(pReader->pWalReader, ver) >= 0) {
    tqDebug("wal reader seek to ver:%" PRId64 " %s", ver, id);
    return 0;
  }

  return terrno;
}
565

566
/*
 * Make sure the tag cache of pReader has an entry for table `uid`: when the
 * hash has none, cacheTag() is called (with column id 0) to create it.
 * Returns 0 when the entry already exists or was cached, otherwise the error
 * returned by cacheTag(), which is also logged.
 */
static int32_t getTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid) {
  int32_t code = 0;
  int32_t lino = 0;

  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) == NULL) {
    SStorageAPI storage = {0};
    initStorageAPI(&storage);
    code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storage, uid, 0, &pReader->tagCachelock);
    TSDB_CHECK_CODE(code, lino, END);
  }

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }

  return code;
}
585

586
/*
 * Refresh the cached tags of table `uid` for column `colId` by calling
 * cacheTag(). Tables that have no entry in the tag cache yet are left alone.
 * A failure of cacheTag() is logged; nothing is returned.
 */
void tqUpdateTableTagCache(STqReader* pReader, SExprInfo* pExprInfo, int32_t numOfExpr, int64_t uid, col_id_t colId) {
  int32_t code = 0;
  int32_t lino = 0;

  // nothing cached for this table: nothing to refresh
  if (taosHashGet(pReader->pTableTagCacheForTmq, &uid, LONG_BYTES) == NULL) {
    return;
  }

  SStorageAPI storage = {0};
  initStorageAPI(&storage);
  code = cacheTag(pReader->pVnode, pReader->pTableTagCacheForTmq, pExprInfo, numOfExpr, &storage, uid, colId, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("%s failed at %d, failed to update tag cache code:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid);
  }
}
605

606
/*
 * Fill the pseudo (tag) columns of pBlock for table `uid`. `numOfRows` is the
 * row count passed on to fillTag() together with
 * pBlock->info.rows - numOfRows. The tag cache entry of the table is created
 * first when missing.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader or block, otherwise the
 * first error met (also logged) or TSDB_CODE_SUCCESS.
 */
static int32_t tqRetrievePseudoCols(STqReader* pReader, SSDataBlock* pBlock, int32_t numOfRows, int64_t uid, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t lino = 0;
  int32_t code = getTableTagCache(pReader, pPseudoExpr, numOfPseudoExpr, uid);
  TSDB_CHECK_CODE(code, lino, END);

  code = fillTag(pReader->pTableTagCacheForTmq, pPseudoExpr, numOfPseudoExpr, uid, pBlock, numOfRows, pBlock->info.rows - numOfRows, 1, &pReader->tagCachelock);
  TSDB_CHECK_CODE(code, lino, END);

END:
  if (code != 0) {
    tqError("tqRetrievePseudoCols failed, line:%d, msg:%s", lino, tstrerror(code));
  }
  return code;
}
625

626
/*
 * Read submit messages from the WAL and append their rows to pRes until one
 * of the following holds:
 *   - walNextValidMsg() returns non-zero (that value is returned),
 *   - pRes holds at least minPollRows rows, or any row when enableReplay is
 *     set (0 is returned),
 *   - more than `timeout` ms elapsed since the call started, or the clock
 *     went backwards (TSDB_CODE_TMQ_FETCH_TIMEOUT is returned and stored in
 *     terrno).
 *
 * Blocks whose flags intersect sourceExcluded are skipped, and so are blocks
 * of tables that are not in pReader->tbIdHash (when that hash exists).
 * Decoding or retrieval errors are returned immediately.
 */
int32_t tqNextBlockInWal(STqReader* pReader, SSDataBlock* pRes, SHashObj* pCol2SlotId, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
                         int sourceExcluded, int32_t minPollRows, int64_t timeout, int8_t enableReplay) {
  int32_t code = 0;
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  SWalReader* pWal = pReader->pWalReader;

  int64_t startMs = taosGetTimestampMs();
  for (;;) {
    code = walNextValidMsg(pWal, false);
    if (code != 0) {
      break;
    }

    // decode the submit request that follows the SSubmitReq2Msg header
    void*    pBody = POINTER_SHIFT(pWal->pHead->head.body, sizeof(SSubmitReq2Msg));
    int32_t  bodyLen = pWal->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
    int64_t  ver = pWal->pHead->head.version;
    SDecoder decoder = {0};
    code = tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver, NULL, &decoder);
    tDecoderClear(&decoder);
    if (code != 0) {
      return code;
    }
    pReader->nextBlk = 0;

    int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
    for (; pReader->nextBlk < numOfBlocks; pReader->nextBlk += 1) {
      tqDebug("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
              pReader->msg.ver);

      SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
      if (pTbData == NULL) {
        tqError("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
                pReader->msg.ver);
        return terrno;
      }

      // block comes from an excluded source
      if ((pTbData->flags & sourceExcluded) != 0) {
        continue;
      }

      // block belongs to a table outside of the subscription
      if (pReader->tbIdHash != NULL && taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t)) == NULL) {
        continue;
      }

      tqDebug("tq reader return submit block, uid:%" PRId64, pTbData->uid);
      int32_t numOfRows = pRes->info.rows;
      code = tqRetrieveCols(pReader, pRes, pCol2SlotId);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
      code = tqRetrievePseudoCols(pReader, pRes, numOfRows, pTbData->uid, pPseudoExpr, numOfPseudoExpr);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
    pReader->msg.msgStr = NULL;

    if (pRes->info.rows >= minPollRows || (enableReplay && pRes->info.rows > 0)){
      break;
    }

    int64_t elapsed = taosGetTimestampMs() - startMs;
    if (elapsed > timeout || elapsed < 0) {
      code = TSDB_CODE_TMQ_FETCH_TIMEOUT;
      terrno = code;
      break;
    }
  }
  return code;
}
698

699
/*
 * Remember the raw submit message (pointer, length, version) in pReader and
 * decode it into pReader->submit using `decoder`, which is initialised here.
 * `rawList` is handed to tDecodeSubmitReq unchanged.
 * Returns TSDB_CODE_INVALID_PARA for a NULL reader, otherwise the result of
 * tDecodeSubmitReq (a non-zero result is logged).
 */
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver, SArray* rawList, SDecoder* decoder) {
  if (pReader == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  pReader->msg.msgStr = msgStr;
  pReader->msg.msgLen = msgLen;
  pReader->msg.ver = ver;
  tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);

  tDecoderInit(decoder, pReader->msg.msgStr, pReader->msg.msgLen);
  int32_t rc = tDecodeSubmitReq(decoder, &pReader->submit, rawList);
  if (rc != 0) {
    tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver);
  }

  return rc;
}
718

719
/*
 * Drop the submit request currently held by the reader: destroys the decoded
 * request, rewinds the block cursor and forgets the raw message pointer.
 * A NULL reader is ignored, like in the other public reader accessors.
 */
void tqReaderClearSubmitMsg(STqReader* pReader) {
  if (pReader == NULL) {
    return;
  }
  tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
  pReader->nextBlk = 0;
  pReader->msg.msgStr = NULL;
}
95,361,331✔
724

725
/* Return the WAL reader owned by pReader, or NULL for a NULL reader. */
SWalReader* tqGetWalReader(STqReader* pReader) {
  return (pReader != NULL) ? pReader->pWalReader : NULL;
}
731

732
/* Return the lastTs field of pReader, or 0 for a NULL reader. */
int64_t tqGetResultBlockTime(STqReader* pReader) {
  if (pReader != NULL) {
    return pReader->lastTs;
  }
  return 0;
}
738

739
/*
 * Move the block cursor of pReader over the decoded submit request. Blocks
 * are tested against pReader->tbIdHash one after the other; the cursor is
 * advanced past a block before the next one is tested. When the cursor
 * reaches the end, the submit message is cleared.
 *
 * The value returned is whatever the TSDB_CHECK_* macros stored in the result
 * variable before jumping to END, or false when the loop ran to completion.
 * `idstr` is not used.
 */
bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
  int32_t found = false;
  int32_t lino = 0;
  int64_t uid = 0;
  TSDB_CHECK_NULL(pReader, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, found, lino, END, false);
  TSDB_CHECK_NULL(pReader->tbIdHash, found, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, found, lino, END, false);
    uid = pTbData->uid;
    void* pEntry = taosHashGet(pReader->tbIdHash, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_CONDITION(pEntry == NULL, found, lino, END, true);

    tqTrace("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%" PRId64,
            pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
  }

  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d return:%s, uid:%" PRId64, __FUNCTION__, lino, found ? "true" : "false", uid);
  return found;
}
767

768
// Moves pReader->nextBlk forward past every submit block whose table uid is listed in filterOutUids.
//
// Returns true when a block that is NOT in filterOutUids is found (nextBlk is left pointing at it) or
// when filterOutUids is NULL. Returns false when pReader or its current message is NULL, when an
// array slot cannot be read, or when all remaining blocks were filtered out; in that last case the
// submit message is released through tqReaderClearSubmitMsg().
//
// NOTE(review): assumes TSDB_CHECK_NULL stores its last argument in `code`, records __LINE__ in `lino`
// and jumps to END when the pointer is NULL - confirm against the macro definition.
bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
  int32_t code = false;
  int32_t lino = 0;
  int64_t uid = 0;

  TSDB_CHECK_NULL(pReader, code, lino, END, false);
  TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
  TSDB_CHECK_NULL(filterOutUids, code, lino, END, true);

  int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
  for (; pReader->nextBlk < blockSz; pReader->nextBlk++) {
    SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
    TSDB_CHECK_NULL(pTbData, code, lino, END, false);
    uid = pTbData->uid;

    // A miss in the filter hash means the block must be delivered: stop here and report it.
    void* pFiltered = taosHashGet(filterOutUids, &pTbData->uid, sizeof(int64_t));
    TSDB_CHECK_NULL(pFiltered, code, lino, END, true);
    tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64, pReader->nextBlk, blockSz, uid);
  }
  tqReaderClearSubmitMsg(pReader);
  tqTrace("iterator data block end, total block num:%d, uid:%" PRId64, blockSz, uid);

END:
  tqTrace("%s:%d get data:%s, uid:%" PRId64, __FUNCTION__, lino, code ? "true" : "false", uid);
  return code;
}
794

795
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask,
46,228,962✔
796
                    SExtSchema* extSrc) {
797
  if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) {
46,228,962✔
798
    return TSDB_CODE_INVALID_PARA;
×
799
  }
800
  int32_t code = 0;
46,244,378✔
801

802
  int32_t cnt = 0;
46,244,378✔
803
  for (int32_t i = 0; i < pSrc->nCols; i++) {
259,588,038✔
804
    cnt += mask[i];
213,315,075✔
805
  }
806

807
  pDst->nCols = cnt;
46,262,304✔
808
  pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
46,274,602✔
809
  if (pDst->pSchema == NULL) {
46,235,814✔
810
    return TAOS_GET_TERRNO(terrno);
×
811
  }
812

813
  int32_t j = 0;
46,237,647✔
814
  for (int32_t i = 0; i < pSrc->nCols; i++) {
259,656,440✔
815
    if (mask[i]) {
213,351,799✔
816
      pDst->pSchema[j++] = pSrc->pSchema[i];
213,397,554✔
817
      SColumnInfoData colInfo =
213,390,104✔
818
          createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
213,395,564✔
819
      if (extSrc != NULL) {
213,357,632✔
820
        decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale);
×
821
      }
822
      code = blockDataAppendColInfo(pBlock, &colInfo);
213,357,632✔
823
      if (code != 0) {
213,404,684✔
824
        return code;
×
825
      }
826
    }
827
  }
828
  return 0;
46,279,260✔
829
}
830

831
// Writes one blob cell into pColumnInfoData at row idx.
//
// When pColVal holds a value, its payload is read as a 64-bit sequence number, the blob bytes are
// fetched from pBlobRow2 with tBlobSetGet(), and a length-prefixed copy is stored with
// colDataSetVal(). A value with a NULL payload is stored as a zero-length blob. Anything that is not
// a value is stored as NULL.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when an argument is NULL or the sequence number
// cannot be read, an allocation error code when the scratch buffer cannot be (re)allocated, or the
// error reported by tBlobSetGet() / colDataSetVal(). The scratch buffer is released on every path.
static int32_t doSetBlobVal(SColumnInfoData* pColumnInfoData, int32_t idx, SColVal* pColVal, SBlobSet* pBlobRow2) {
  int32_t code = 0;
  if (pColumnInfoData == NULL || pColVal == NULL || pBlobRow2 == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  // TODO(yhDeng)
  if (COL_VAL_IS_VALUE(pColVal)) {
    char* val = taosMemCalloc(1, pColVal->value.nData + sizeof(BlobDataLenT));
    if (val == NULL) {
      return terrno;
    }

    uint64_t seq = 0;
    int32_t  len = 0;
    if (pColVal->value.pData != NULL) {
      if (tGetU64(pColVal->value.pData, &seq) < 0) {
        // Release the scratch buffer before bailing out; it was leaked on this path before.
        taosMemoryFree(val);
        terrno = TSDB_CODE_INVALID_PARA;
        return TSDB_CODE_INVALID_PARA;
      }
      SBlobItem item = {0};
      code = tBlobSetGet(pBlobRow2, seq, &item);
      if (code != 0) {
        taosMemoryFree(val);
        terrno = code;
        uError("tq set blob val, idx:%d, get blob item failed, seq:%" PRIu64 ", code:%d", idx, seq, code);
        return code;
      }

      // Resize through a temporary so the original buffer can still be freed if the resize fails.
      char* resized = taosMemRealloc(val, item.len + sizeof(BlobDataLenT));
      if (resized == NULL) {
        code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
        taosMemoryFree(val);
        return code;
      }
      val = resized;
      (void)memcpy(blobDataVal(val), item.data, item.len);
      len = item.len;
    }

    blobDataSetLen(val, len);
    code = colDataSetVal(pColumnInfoData, idx, val, false);

    taosMemoryFree(val);
  } else {
    colDataSetNULL(pColumnInfoData, idx);
  }
  return code;
}
872
// Writes one non-blob cell into pColumnInfoData at rowIndex.
//
// Variable-length values are copied into a length-prefixed scratch buffer on the stack and stored
// with colDataSetVal(); a variable-length cell that is not a value is stored as NULL. Fixed-length
// values are passed to colDataSetVal() directly, with the NULL flag taken from the column value.
//
// Returns the result of colDataSetVal(), or TSDB_CODE_INVALID_PARA when a variable-length payload is
// larger than the scratch buffer can hold.
static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SColVal* pColVal) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_VAR_DATA_TYPE(pColVal->value.type)) {
    if (COL_VAL_IS_VALUE(pColVal)) {
      char val[65535 + 2] = {0};
      if (pColVal->value.pData != NULL) {
        // The scratch buffer holds at most 65535 payload bytes after the length header; reject
        // anything larger instead of writing past the end of the stack buffer.
        if (pColVal->value.nData > 65535) {
          return TSDB_CODE_INVALID_PARA;
        }
        (void)memcpy(varDataVal(val), pColVal->value.pData, pColVal->value.nData);
      }
      varDataSetLen(val, pColVal->value.nData);
      code = colDataSetVal(pColumnInfoData, rowIndex, val, false);
    } else {
      colDataSetNULL(pColumnInfoData, rowIndex);
    }
  } else {
    code = colDataSetVal(pColumnInfoData, rowIndex, VALUE_GET_DATUM(&pColVal->value, pColVal->value.type),
                         !COL_VAL_IS_VALUE(pColVal));
  }

  return code;
}
893

894
// Stores colVal into column slotId of pBlock at rowIndex, dispatching on the target column type:
// blob columns go through doSetBlobVal() (which needs pBlobSet), all others through doSetVal().
// Returns terrno when the slot cannot be fetched from pBlock->pDataBlock, otherwise the setter's result.
static int32_t setBlockData(SSDataBlock* pBlock, int32_t slotId, int32_t rowIndex, SColVal* colVal, SBlobSet* pBlobSet) {
  SColumnInfoData* pTargetCol = taosArrayGet(pBlock->pDataBlock, slotId);
  if (pTargetCol == NULL) {
    return terrno;
  }

  if (IS_STR_DATA_BLOB(pTargetCol->info.type)) {
    return doSetBlobVal(pTargetCol, rowIndex, colVal, pBlobSet);
  }
  return doSetVal(pTargetCol, rowIndex, colVal);
}
909

910
static int32_t processSubmitRow(SArray*         pRows,
102,714,375✔
911
                                SSDataBlock*    pBlock,
912
                                SHashObj*       pCol2SlotId,
913
                                STqReader*      pReader,
914
                                SBlobSet*       pBlobSet) {
915
  int32_t        code = 0;
102,714,375✔
916
  int32_t        line = 0;
102,714,375✔
917

918
  SArray* pColArray = taosArrayInit(4, INT_BYTES * 2);
102,714,375✔
919
  TSDB_CHECK_NULL(pColArray, code, line, END, terrno);
102,717,611✔
920

921
  int32_t sourceIdx = -1;
102,717,611✔
922
  int32_t rowIndex = 0;
102,717,611✔
923
  SRow* pRow = taosArrayGetP(pRows, rowIndex);
102,717,611✔
924
  TSDB_CHECK_NULL(pRow, code, line, END, terrno);
102,721,955✔
925
  while (++sourceIdx < pReader->pTSchema->numOfCols) {
833,186,316✔
926
    SColVal colVal = {0};
730,578,087✔
927
    code = tRowGet(pRow, pReader->pTSchema, sourceIdx, &colVal);
730,511,060✔
928
    TSDB_CHECK_CODE(code, line, END);
730,439,332✔
929
    void* pSlotId = taosHashGet(pCol2SlotId, &colVal.cid, sizeof(colVal.cid));
730,439,332✔
930
    if (pSlotId == NULL) {
731,019,516✔
931
      continue;
266,461,768✔
932
    }
933
    int32_t pData[2] = {sourceIdx, *(int16_t*)pSlotId};
464,557,748✔
934
    TSDB_CHECK_NULL(taosArrayPush(pColArray, pData), code, line, END, terrno);
464,509,923✔
935
    code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
464,509,923✔
936
    TSDB_CHECK_CODE(code, line, END);
463,953,592✔
937
  }
938
  
939
  for (rowIndex = 1; rowIndex < taosArrayGetSize(pRows); rowIndex++) {
2,147,483,647✔
940
    SRow* pRow = taosArrayGetP(pRows, rowIndex);
2,147,483,647✔
941
    TSDB_CHECK_NULL(pRow, code, line, END, terrno);
2,147,483,647✔
942
    for (int32_t j = 0; j < taosArrayGetSize(pColArray); j++) {
2,147,483,647✔
943
      int32_t* pData = taosArrayGet(pColArray, j);
2,147,483,647✔
944
      TSDB_CHECK_NULL(pData, code, line, END, terrno);
2,147,483,647✔
945

946
      SColVal colVal = {0};
2,147,483,647✔
947
      code = tRowGet(pRow, pReader->pTSchema, pData[0], &colVal);
2,147,483,647✔
948
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
949

950
      code = setBlockData(pBlock, pData[1], pBlock->info.rows + rowIndex, &colVal, pBlobSet);
2,147,483,647✔
951
      TSDB_CHECK_CODE(code, line, END);
2,147,483,647✔
952
    }
953
  }
954

955
  END:
102,661,099✔
956
  taosArrayDestroy(pColArray);
82,782,261✔
957
  return code;
102,684,072✔
958
}
959

960
static int32_t processSubmitCol(SArray*         pCols,
1,376✔
961
                                SSDataBlock*    pBlock,
962
                                SHashObj*       pCol2SlotId,
963
                                SBlobSet*       pBlobSet) {
964
  int32_t        code = 0;
1,376✔
965
  int32_t        line = 0;
1,376✔
966

967
  for (int32_t i = 0; i < taosArrayGetSize(pCols); i++) {
4,128✔
968
    SColData* pCol = taosArrayGet(pCols, i);
2,752✔
969
    TSDB_CHECK_NULL(pCol, code, line, END, terrno);
2,752✔
970
    void* pSlotId = taosHashGet(pCol2SlotId, &pCol->cid, sizeof(pCol->cid));
2,752✔
971
    if (pSlotId == NULL) {
2,752✔
972
      continue;
1,376✔
973
    }
974
    SColVal colVal = {0};
1,376✔
975
    for (int32_t row = 0; row < pCol->nVal; row++) {
4,128✔
976
      code = tColDataGetValue(pCol, row, &colVal);
2,752✔
977
      TSDB_CHECK_CODE(code, line, END);
2,752✔
978

979
      code = setBlockData(pBlock, *(int16_t*)pSlotId, pBlock->info.rows + row, &colVal, pBlobSet);
2,752✔
980
      TSDB_CHECK_CODE(code, line, END);
2,752✔
981
    }
982
  }
983
  
984
  END:
1,376✔
985
  return code;
1,376✔
986
}
987

988
// Makes sure pReader holds the schema matching pSubmitTbData (suid/uid and schema version).
//
// The cached schema is reused when the cached keys match: the super-table uid for child tables
// (suid != 0), the table uid otherwise, plus the schema version. A reader without a built pTSchema
// is always treated as a cache miss, so that a previous failed reload cannot leave a later call
// with matching keys running on a NULL schema.
//
// On a miss the old schema wrapper, ext schema and pTSchema are released and reloaded from meta.
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND when meta has no such schema, or
// terrno when the STSchema cannot be built.
static int32_t checkSchema(STqReader* pReader, SSubmitTbData* pSubmitTbData) {
  int32_t vgId = pReader->pWalReader->pWal->cfg.vgId;
  int32_t sversion = pSubmitTbData->sver;
  int64_t suid = pSubmitTbData->suid;
  int64_t uid = pSubmitTbData->uid;
  if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) ||
      (pReader->cachedSchemaVer != sversion) || (pReader->pTSchema == NULL)) {
    tDeleteSchemaWrapper(pReader->pSchemaWrapper);
    taosMemoryFreeClear(pReader->extSchema);
    taosMemoryFreeClear(pReader->pTSchema);
    pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
    if (pReader->pSchemaWrapper == NULL) {
      // Report the version that was actually requested, not the stale cached one.
      tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d, possibly dropped table",
              vgId, suid, uid, sversion);
      return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
    }
    pReader->pTSchema = tBuildTSchema(pReader->pSchemaWrapper->pSchema, pReader->pSchemaWrapper->nCols, pReader->pSchemaWrapper->version);
    if (pReader->pTSchema == NULL) {
      tqWarn("vgId:%d, cannot build schema for table: suid:%" PRId64 ", uid:%" PRId64 ",version %d",
              vgId, suid, uid, sversion);
      return terrno;
    }
    // Only record the new cache keys once the schema is fully usable.
    pReader->cachedSchemaUid = uid;
    pReader->cachedSchemaSuid = suid;
    pReader->cachedSchemaVer = sversion;
  }
  return TSDB_CODE_SUCCESS;
}
1016

1017
// Appends the submit block at pReader->nextBlk to pBlock, keeping only the columns mapped in
// pCol2SlotId.
//
// Records the block's ctimeMs in pReader->lastTs, grows pBlock for the incoming rows, refreshes the
// cached schema through checkSchema(), then converts either column-format or row-format data
// depending on the SUBMIT_REQ_COLUMN_DATA_FORMAT flag. pBlock->info.rows is advanced only on success.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when pReader or pBlock is NULL, otherwise the first
// error code met (also logged).
// NOTE(review): assumes TSDB_CHECK_NULL / TSDB_CHECK_CODE record __LINE__ in `line` and jump to END
// on failure - confirm against the macro definitions.
static int32_t tqRetrieveCols(STqReader* pReader, SSDataBlock* pBlock, SHashObj* pCol2SlotId) {
  if (pReader == NULL || pBlock == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);

  int32_t code = 0;
  int32_t line = 0;
  int32_t numOfRows = 0;

  SSubmitTbData* pTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
  TSDB_CHECK_NULL(pTbData, code, line, END, terrno);
  pReader->lastTs = pTbData->ctimeMs;

  const bool isColFormat = (pTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) != 0;
  if (isColFormat) {
    // In column format the row count is taken from the first column.
    SColData* pFirstCol = taosArrayGet(pTbData->aCol, 0);
    TSDB_CHECK_NULL(pFirstCol, code, line, END, terrno);
    numOfRows = pFirstCol->nVal;
  } else {
    numOfRows = taosArrayGetSize(pTbData->aRowP);
  }

  code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + numOfRows);
  TSDB_CHECK_CODE(code, line, END);

  code = checkSchema(pReader, pTbData);
  TSDB_CHECK_CODE(code, line, END);

  // convert and scan one block
  if (isColFormat) {
    code = processSubmitCol(pTbData->aCol, pBlock, pCol2SlotId, pTbData->pBlobSet);
  } else {
    code = processSubmitRow(pTbData->aRowP, pBlock, pCol2SlotId, pReader, pTbData->pBlobSet);
  }
  TSDB_CHECK_CODE(code, line, END);

  pBlock->info.rows += numOfRows;

END:
  if (code != 0) {
    tqError("tqRetrieveCols failed, line:%d, msg:%s", line, tstrerror(code));
  }
  return code;
}
1060

1061
// Tracks, per schema column j, whether the current row carries a value (anything but NONE).
// Expects `colVal`, `curRow`, `assigned`, `j` and `buildNew` in the enclosing scope. On the first
// row, or whenever the presence of column j differs from the previous row, assigned[j] is updated
// and buildNew is raised so that the caller starts a new block.
#define PROCESS_VAL                                            \
  {                                                            \
    bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal);       \
    if (curRow == 0 || currentRowAssigned != assigned[j]) {    \
      assigned[j] = currentRowAssigned;                        \
      buildNew = true;                                         \
    }                                                          \
  }
1072

1073
// One merge step between the source columns and the target block columns, both walked by column id.
// Expects `colVal`, `pColData`, `sourceIdx`, `targetIdx`, `curRow`, `lastRow` and `pSubmitTbData` in
// the enclosing scope, plus whatever TQ_ERR_GO_TO_END needs.
//   - ids match:          store the value (blob or regular setter) and advance both cursors
//   - source id smaller:  skip the source column
//   - source id larger:   store NULL in the target column and advance the target cursor
#define SET_DATA                                                                                    \
  if (colVal.cid == pColData->info.colId) {                                                         \
    if (IS_STR_DATA_BLOB(pColData->info.type)) {                                                    \
      TQ_ERR_GO_TO_END(doSetBlobVal(pColData, curRow - lastRow, &colVal, pSubmitTbData->pBlobSet)); \
    } else {                                                                                        \
      TQ_ERR_GO_TO_END(doSetVal(pColData, curRow - lastRow, &colVal));                              \
    }                                                                                               \
    sourceIdx++;                                                                                    \
    targetIdx++;                                                                                    \
  } else if (colVal.cid < pColData->info.colId) {                                                   \
    sourceIdx++;                                                                                    \
  } else {                                                                                          \
    colDataSetNULL(pColData, curRow - lastRow);                                                     \
    targetIdx++;                                                                                    \
  }
1088

1089
// Starts a new output block for the column set described by `assigned`.
//
// If `blocks` already holds a block, that block is closed first: its row count becomes
// curRow - *lastRow and *lastRow moves to curRow. A fresh SSDataBlock and SSchemaWrapper are then
// built with tqMaskBlock() from the reader's current schema, the block is sized for the remaining
// numOfRows - curRow rows, and both are appended to `blocks` / `schemas`.
//
// The block is pushed by value, so the heap shell is released right after the push; the schema
// wrapper is pushed by pointer, so the local pointer is cleared once `schemas` holds it.
// NOTE(review): if the push to `schemas` fails after the push to `blocks` succeeded, the cleanup
// below frees resources that the copy inside `blocks` still references - confirm how callers
// handle that error.
//
// Returns 0 on success or the first error code met.
static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas,
                               char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) {
  int32_t         code = 0;
  SSchemaWrapper* pMaskedSchema = NULL;
  SSDataBlock*    pNewBlock = NULL;

  if (taosArrayGetSize(blocks) > 0) {
    SSDataBlock* pPrevBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pPrevBlock);
    pPrevBlock->info.rows = curRow - *lastRow;
    *lastRow = curRow;
  }

  pNewBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TQ_NULL_GO_TO_END(pNewBlock);

  pMaskedSchema = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
  TQ_NULL_GO_TO_END(pMaskedSchema);

  TQ_ERR_GO_TO_END(tqMaskBlock(pMaskedSchema, pNewBlock, pReader->pSchemaWrapper, assigned, pReader->extSchema));
  tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId,
          (int32_t)taosArrayGetSize(pNewBlock->pDataBlock));

  pNewBlock->info.id.uid = pSubmitTbData->uid;
  pNewBlock->info.version = pReader->msg.ver;
  TQ_ERR_GO_TO_END(blockDataEnsureCapacity(pNewBlock, numOfRows - curRow));
  TQ_NULL_GO_TO_END(taosArrayPush(blocks, pNewBlock));
  TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pMaskedSchema));
  pMaskedSchema = NULL;

  taosMemoryFreeClear(pNewBlock);

END:
  if (code != 0) {
    tqError("processBuildNew failed, code:%d", code);
  }
  tDeleteSchemaWrapper(pMaskedSchema);
  blockDataFreeRes(pNewBlock);
  taosMemoryFree(pNewBlock);
  return code;
}
1129
// Converts one column-format submit table into output blocks, appended to `blocks` with their
// schemas appended to `schemas`.
//
// Rows are scanned in order. For every row the set of schema columns that carry a value is compared
// with the previous row (PROCESS_VAL); when it changes, or when a schema column is missing from the
// submitted columns, processBuildNew() closes the current block and opens a new one. The row's
// values are then merged into the last block by column id (SET_DATA). After the loop the row count
// of the last block is finalized, if there is one.
//
// Returns 0 on success or the first error code met (also logged).
static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t code = 0;
  int32_t curRow = 0;
  int32_t lastRow = 0;

  SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
  // One presence flag per schema column, carried from row to row.
  char*           assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  SArray*   pCols = pSubmitTbData->aCol;
  SColData* pCol = taosArrayGet(pCols, 0);
  TQ_NULL_GO_TO_END(pCol);
  // The row count is taken from the first submitted column.
  int32_t numOfRows = pCol->nVal;
  int32_t numOfCols = taosArrayGetSize(pCols);
  tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols,
          numOfRows);
  for (int32_t i = 0; i < numOfRows; i++) {
    bool buildNew = false;

    for (int32_t j = 0; j < pSchemaWrapper->nCols; j++) {
      int32_t k = 0;
      for (; k < numOfCols; k++) {
        pCol = taosArrayGet(pCols, k);
        TQ_NULL_GO_TO_END(pCol);
        if (pSchemaWrapper->pSchema[j].colId == pCol->cid) {
          SColVal colVal = {0};
          TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
          PROCESS_VAL
          tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], numOfCols);
          break;
        }
      }
      if (k >= numOfCols) {
        // this column is not in the current row, so we set it to NULL
        assigned[j] = 0;
        buildNew = true;
      }
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < numOfCols) {
      pCol = taosArrayGet(pCols, sourceIdx);
      TQ_NULL_GO_TO_END(pCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tColDataGetValue(pCol, i, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }

    curRow++;
  }
  SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
  // `blocks` is empty when no row was processed; guard the same way tqProcessRowData does.
  if (pLastBlock != NULL) {
    pLastBlock->info.rows = curRow - lastRow;
  }
  tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId,
          numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFree(assigned);
  return code;
}
1205

1206
// Converts one row-format submit table into output blocks, appended to `blocks` with their schemas
// appended to `schemas`.
//
// An STSchema is built from the reader's current schema wrapper for the duration of the call. Rows
// are scanned in order; whenever the set of columns carrying a value differs from the previous row
// (PROCESS_VAL), processBuildNew() closes the current block and opens a new one. The row's values
// are then merged into the last block by column id (SET_DATA). After the loop the row count of the
// last block is finalized, if there is one.
//
// Returns 0 on success or the first error code met (also logged).
int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) {
  int32_t   code = 0;
  STSchema* pTSchema = NULL;

  SSchemaWrapper* pWrapper = pReader->pSchemaWrapper;
  // One presence flag per schema column, carried from row to row.
  char*           assigned = taosMemoryCalloc(1, pWrapper->nCols);
  TQ_NULL_GO_TO_END(assigned);

  int32_t curRow = 0;
  int32_t lastRow = 0;
  SArray* pRowList = pSubmitTbData->aRowP;
  int32_t numOfRows = taosArrayGetSize(pRowList);
  pTSchema = tBuildTSchema(pWrapper->pSchema, pWrapper->nCols, pWrapper->version);
  TQ_NULL_GO_TO_END(pTSchema);
  tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);

  for (int32_t rowIdx = 0; rowIdx < numOfRows; rowIdx++, curRow++) {
    bool  buildNew = false;
    SRow* pRow = taosArrayGetP(pRowList, rowIdx);
    TQ_NULL_GO_TO_END(pRow);

    // Pass 1: detect whether this row's column presence differs from the previous row.
    for (int32_t j = 0; j < pTSchema->numOfCols; j++) {
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, j, &colVal));
      PROCESS_VAL
      tqTrace("assign[%d] = %d, nCols:%d", j, assigned[j], pTSchema->numOfCols);
    }

    if (buildNew) {
      TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow));
    }

    SSDataBlock* pBlock = taosArrayGetLast(blocks);
    TQ_NULL_GO_TO_END(pBlock);

    tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId,
            (int32_t)taosArrayGetSize(blocks));

    // Pass 2: merge the row into the current block, matching source and target columns by id.
    int32_t targetIdx = 0;
    int32_t sourceIdx = 0;
    int32_t colActual = blockDataGetNumOfCols(pBlock);
    while (targetIdx < colActual && sourceIdx < pTSchema->numOfCols) {
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
      TQ_NULL_GO_TO_END(pColData);
      SColVal colVal = {0};
      TQ_ERR_GO_TO_END(tRowGet(pRow, pTSchema, sourceIdx, &colVal));
      SET_DATA
      tqTrace("targetIdx:%d sourceIdx:%d colActual:%d", targetIdx, sourceIdx, colActual);
    }
  }

  SSDataBlock* pTailBlock = taosArrayGetLast(blocks);
  if (pTailBlock != NULL) {
    pTailBlock->info.rows = curRow - lastRow;
  }

  tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows,
          (int)taosArrayGetSize(blocks));
END:
  if (code != TSDB_CODE_SUCCESS) {
    tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
  }
  taosMemoryFreeClear(pTSchema);
  taosMemoryFree(assigned);
  return code;
}
1273

1274
// Serializes pCreateTbReq and appends the encoded buffer and its length to pRsp.
//
// When pRsp->createTableNum is 0 the two result arrays (createTableLen, createTableReq) are created
// first. The request is sized with tEncodeSize, encoded into a freshly allocated buffer, and the
// length / buffer pointer are pushed to the arrays before createTableNum is incremented.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when an argument is NULL, or the first error code
// met; on failure the encoded buffer is freed and the error is logged.
// NOTE(review): assumes TSDB_CHECK_NULL / TSDB_CHECK_CODE record __LINE__ in `lino` and jump to END
// on failure - confirm against the macro definitions.
static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq) {
  int32_t code = 0;
  int32_t lino = 0;
  void*   pEncoded = NULL;
  TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
  TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA);

  if (pRsp->createTableNum == 0) {
    pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
    TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno);
    pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
    TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno);
  }

  // First pass computes the encoded size, second pass writes into the buffer.
  uint32_t encodedLen = 0;
  tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, encodedLen, code);
  TSDB_CHECK_CODE(code, lino, END);
  pEncoded = taosMemoryCalloc(1, encodedLen);
  TSDB_CHECK_NULL(pEncoded, code, lino, END, terrno);

  SEncoder enc = {0};
  tEncoderInit(&enc, pEncoded, encodedLen);
  code = tEncodeSVCreateTbReq(&enc, pCreateTbReq);
  tEncoderClear(&enc);
  TSDB_CHECK_CODE(code, lino, END);

  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &encodedLen), code, lino, END, terrno);
  TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &pEncoded), code, lino, END, terrno);
  pRsp->createTableNum++;
  tqTrace("build create table info msg success");

END:
  if (code != 0) {
    tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code));
    taosMemoryFree(pEncoded);
  }
  return code;
}
1311

1312
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas,
46,278,729✔
1313
                             SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
1314
  tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
46,278,729✔
1315
  SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
46,278,729✔
1316
  if (pSubmitTbData == NULL) {
46,283,778✔
1317
    return terrno;
×
1318
  }
1319
  pReader->nextBlk++;
46,283,778✔
1320

1321
  if (pSubmitTbDataRet) {
46,277,785✔
1322
    *pSubmitTbDataRet = pSubmitTbData;
46,284,464✔
1323
  }
1324

1325
  if (fetchMeta == ONLY_META) {
46,277,785✔
1326
    if (pSubmitTbData->pCreateTbReq != NULL) {
951✔
1327
      if (pRsp->createTableReq == NULL) {
951✔
1328
        pRsp->createTableReq = taosArrayInit(0, POINTER_BYTES);
951✔
1329
        if (pRsp->createTableReq == NULL) {
951✔
1330
          return terrno;
×
1331
        }
1332
      }
1333
      if (taosArrayPush(pRsp->createTableReq, &pSubmitTbData->pCreateTbReq) == NULL) {
1,902✔
1334
        return terrno;
×
1335
      }
1336
      pSubmitTbData->pCreateTbReq = NULL;
951✔
1337
    }
1338
    return 0;
951✔
1339
  }
1340

1341
  int32_t sversion = pSubmitTbData->sver;
46,276,834✔
1342
  int64_t uid = pSubmitTbData->uid;
46,278,913✔
1343
  pReader->lastBlkUid = uid;
46,277,515✔
1344

1345
  tDeleteSchemaWrapper(pReader->pSchemaWrapper);
46,278,542✔
1346
  taosMemoryFreeClear(pReader->extSchema);
46,275,038✔
1347
  pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnode->pMeta, uid, sversion, 1, &pReader->extSchema, 0);
46,280,840✔
1348
  if (pReader->pSchemaWrapper == NULL) {
46,239,705✔
1349
    tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
8,125✔
1350
           pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer);
1351
    pReader->cachedSchemaSuid = 0;
8,125✔
1352
    return TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
8,125✔
1353
  }
1354

1355
  if (pSubmitTbData->pCreateTbReq != NULL) {
46,233,683✔
1356
    int32_t code = buildCreateTbInfo(pRsp, pSubmitTbData->pCreateTbReq);
1,356✔
1357
    if (code != 0) {
1,356✔
1358
      return code;
×
1359
    }
1360
  } else if (rawList != NULL) {
46,239,646✔
1361
    if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL) {
×
1362
      return terrno;
×
1363
    }
1364
    pReader->pSchemaWrapper = NULL;
×
1365
    return 0;
×
1366
  }
1367

1368
  if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
46,241,002✔
1369
    return tqProcessColData(pReader, pSubmitTbData, blocks, schemas);
724,905✔
1370
  } else {
1371
    return tqProcessRowData(pReader, pSubmitTbData, blocks, schemas);
45,509,880✔
1372
  }
1373
}
1374

1375
// Replaces the reader's set of queried table uids with the entries of tbUidList.
//
// An existing tbIdHash is cleared; otherwise a new hash is created. Each uid is then inserted;
// a failed insert is logged and skipped. `id` is only used in log messages.
//
// Returns TSDB_CODE_SUCCESS (also when pReader or tbUidList is NULL, in which case nothing is
// done), or terrno when the hash cannot be created.
int32_t tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList, const char* id) {
  if (pReader == NULL || tbUidList == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("s-task:%s failed to init hash table", id);
      return terrno;
    }
  } else {
    taosHashClear(pReader->tbIdHash);
  }

  for (int idx = 0; idx < taosArrayGetSize(tbUidList); idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    if (pUid == NULL) {
      continue;
    }
    if (taosHashPut(pReader->tbIdHash, pUid, sizeof(int64_t), NULL, 0) != 0) {
      tqError("s-task:%s failed to add table uid:%" PRId64 " to hash", id, *pUid);
    }
  }

  tqDebug("s-task:%s %d tables are set to be queried target table", id, (int32_t)taosArrayGetSize(tbUidList));
  return TSDB_CODE_SUCCESS;
}
1400

1401
// Adds the uids in pTableUidList to the reader's set of queried tables, creating the hash on first
// use. Does nothing when pReader or pTableUidList is NULL, or when the hash cannot be created.
// Each insert is logged: failures as errors, successes at debug level.
void tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) {
  if (pReader == NULL || pTableUidList == NULL) {
    return;
  }

  if (pReader->tbIdHash == NULL) {
    pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
    if (pReader->tbIdHash == NULL) {
      tqError("failed to init hash table");
      return;
    }
  }

  int32_t total = taosArrayGetSize(pTableUidList);
  for (int idx = 0; idx < total; idx++) {
    int64_t* pUid = (int64_t*)taosArrayGet(pTableUidList, idx);
    if (taosHashPut(pReader->tbIdHash, pUid, sizeof(int64_t), NULL, 0) == 0) {
      tqDebug("%s add table uid:%" PRId64 " to hash", __func__, *pUid);
    } else {
      tqError("failed to add table uid:%" PRId64 " to hash", *pUid);
    }
  }
}
1423

1424
// Returns true when uid is found as a key in the reader's tbIdHash.
// A NULL reader always yields false.
bool tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid) {
  return pReader != NULL && taosHashGet(pReader->tbIdHash, &uid, sizeof(uint64_t)) != NULL;
}
1430

1431
bool tqCurrentBlockConsumed(const STqReader* pReader) {
×
1432
  if (pReader == NULL) {
×
1433
    return false;
×
1434
  }
1435
  return pReader->msg.msgStr == NULL;
×
1436
}
1437

1438
/*
 * Remove every uid in tbUidList from the reader's tbIdHash.
 *
 * A failed removal only produces a warning; the loop keeps going and nothing is returned to
 * the caller. Either argument being NULL makes the call a no-op.
 */
void tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
  if (pReader == NULL || tbUidList == NULL) {
    return;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, idx);
    int32_t  rc = taosHashRemove(pReader->tbIdHash, pUid, sizeof(int64_t));
    if (rc != 0) {
      // pUid may be NULL here, so the logged uid falls back to 0
      tqWarn("%s failed to remove table uid:%" PRId64 " from hash, msg:%s", __func__, pUid != NULL ? *pUid : 0, tstrerror(rc));
    }
    idx++;
  }
}
1450

1451
/*
 * Propagate a list of deleted table uids to every consumer handle in pTq->pHandle.
 *
 * The whole walk runs under the write latch pTq->lock. Per subscription type:
 *   - TOPIC_SUB_TYPE__COLUMN: the uids are passed to qDeleteTableListForTmqScanner() for the task
 *   - TOPIC_SUB_TYPE__DB:     each uid is put into execDb.pFilterOutTbUid
 *   - TOPIC_SUB_TYPE__TABLE:  the uids are removed from the handle's tq reader
 * Failures inside the walk are logged only and do not change the return value.
 *
 * Returns 0 when pTq is NULL or after the walk, TSDB_CODE_INVALID_PARA when tbUidList is NULL.
 */
int32_t tqDeleteTbUidList(STQ* pTq, SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " delete table list", __func__, pHandle->subKey, pHandle->consumerId);

    switch (pHandle->execHandle.subType) {
      case TOPIC_SUB_TYPE__COLUMN: {
        int32_t rc = qDeleteTableListForTmqScanner(pHandle->execHandle.task, tbUidList);
        if (rc != 0) {
          tqError("update qualified table error for %s", pHandle->subKey);
        }
        break;
      }
      case TOPIC_SUB_TYPE__DB: {
        int32_t numOfUids = taosArrayGetSize(tbUidList);
        for (int32_t k = 0; k < numOfUids; k++) {
          int64_t* pUid = (int64_t*)taosArrayGet(tbUidList, k);
          if (pUid == NULL) {
            continue;
          }
          if (taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, pUid, sizeof(int64_t), NULL, 0) != 0) {
            tqError("failed to add table uid:%" PRId64 " to hash", *pUid);
          }
        }
        break;
      }
      case TOPIC_SUB_TYPE__TABLE:
        tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUidList);
        break;
      default:
        break;
    }
  }
  taosWUnLockLatch(&pTq->lock);
  return 0;
}
1494

1495
// Build a new array of int64_t elements and append everything from tbUidList to it.
// Returns the new array, or NULL when the array cannot be created or the append fails.
// The new array is what the function hands back, so freeing it is left to the caller.
static SArray* copyUidList(const SArray* tbUidList) {
  SArray* pDup = taosArrayInit(4, sizeof(int64_t));
  if (pDup != NULL && taosArrayAddAll(pDup, tbUidList) == NULL) {
    taosArrayDestroy(pDup);
    tqError("copy table uid list failed");
    pDup = NULL;
  }
  return pDup;
}
1508

1509
/*
 * Filter a private copy of tbUidList through qFilterTableList() for this handle, then add the
 * uids left in the copy to the handle's tq reader.
 *
 * The caller's list is never modified; the copy is destroyed before returning on every path.
 *
 * Returns 0 on success, terrno when the copy cannot be made, or the code from qFilterTableList().
 */
static int32_t addTableListForStableTmq(STqHandle* pTqHandle, STQ* pTq, const SArray* tbUidList) {
  int32_t code = 0;
  SArray* pFiltered = copyUidList(tbUidList);

  if (pFiltered == NULL) {
    code = terrno;
  } else {
    code = qFilterTableList(pTq->pVnode, pFiltered, pTqHandle->execHandle.execTb.node,
                            pTqHandle->execHandle.task, pTqHandle->execHandle.execTb.suid);
    if (code == TDB_CODE_SUCCESS) {
      tqDebug("%s handle %s consumer:0x%" PRIx64 " add %d tables to tqReader", __func__, pTqHandle->subKey,
              pTqHandle->consumerId, (int32_t)taosArrayGetSize(pFiltered));
      tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, pFiltered);
    } else {
      tqError("tqAddTbUidList error:%d handle %s consumer:0x%" PRIx64, code, pTqHandle->subKey,
              pTqHandle->consumerId);
    }
  }

  taosArrayDestroy(pFiltered);
  return code;
}
1531

1532
/*
 * Propagate a list of newly added table uids to every consumer handle in pTq->pHandle.
 *
 * The walk runs under the write latch pTq->lock. Per subscription type:
 *   - TOPIC_SUB_TYPE__COLUMN: the uids are passed to qAddTableListForTmqScanner() for the task
 *   - TOPIC_SUB_TYPE__TABLE:  the uids go through addTableListForStableTmq()
 * Other subscription types are left untouched. The first failure stops the walk.
 *
 * Returns 0 when pTq is NULL or every handle succeeded, TSDB_CODE_INVALID_PARA when tbUidList
 * is NULL, otherwise the code of the first failing step.
 */
int32_t tqAddTbUidList(STQ* pTq, const SArray* tbUidList) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " add table list", __func__, pHandle->subKey, pHandle->consumerId);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      code = qAddTableListForTmqScanner(pHandle->execHandle.task, tbUidList);
      if (code != 0) {
        tqError("add table list for query tmq error for %s, msg:%s", pHandle->subKey, tstrerror(code));
      }
    } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      code = addTableListForStableTmq(pHandle, pTq, tbUidList);
      if (code != 0) {
        tqError("add table list for stable tmq error for %s, msg:%s", pHandle->subKey, tstrerror(code));
      }
    }

    // code is still 0 unless the branch taken above just failed
    if (code != 0) {
      break;
    }
  }
  // pIter is NULL after a complete walk and points at the failing handle after an early exit
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1572

1573
/*
 * Refresh the table lists of every consumer handle in pTq->pHandle for the given table uids.
 *
 * The walk runs under the write latch pTq->lock. Per subscription type:
 *   - TOPIC_SUB_TYPE__COLUMN: when checkCidInTagCondition() accepts cidList against the task's
 *     tag condition, the scanner's table list is updated; afterwards the tag cache is updated
 *     via qUpdateTableTagCacheForTmq() regardless of that check
 *   - TOPIC_SUB_TYPE__TABLE:  when the same check passes against the stable's tag condition,
 *     the uids are removed from the tq reader and then added back via addTableListForStableTmq()
 * The first failure stops the walk; on a COLUMN failure the tag cache update is skipped.
 *
 * Returns 0 when pTq is NULL or every handle succeeded, TSDB_CODE_INVALID_PARA when tbUidList
 * is NULL, otherwise the code of the first failing step.
 */
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, SArray* cidList, SArray* cidListArray) {
  if (pTq == NULL) {
    return 0;  // mounted vnode may have no tq
  }
  if (tbUidList == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }
  void*   pIter = NULL;
  int32_t vgId = TD_VID(pTq->pVnode);
  int32_t code = 0;

  // update the table list for each consumer handle
  taosWLockLatch(&pTq->lock);
  while ((pIter = taosHashIterate(pTq->pHandle, pIter)) != NULL) {
    STqHandle* pHandle = (STqHandle*)pIter;
    tqDebug("%s subKey:%s, consumer:0x%" PRIx64 " update table list", __func__, pHandle->subKey, pHandle->consumerId);

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
      SNode* pCond = getTagCondNodeForQueryTmq(pHandle->execHandle.task);
      if (checkCidInTagCondition(pCond, cidList)) {
        code = qUpdateTableListForTmqScanner(pHandle->execHandle.task, tbUidList);
        if (code != 0) {
          tqError("update table list for query tmq error for %s, msg:%s", pHandle->subKey, tstrerror(code));
          break;
        }
      }
      qUpdateTableTagCacheForTmq(pHandle->execHandle.task, tbUidList, cidList, cidListArray);
      continue;
    }

    if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
      SNode* pCond = getTagCondNodeForStableTmq(pHandle->execHandle.execTb.node);
      if (checkCidInTagCondition(pCond, cidList)) {
        tqReaderRemoveTbUidList(pHandle->execHandle.pTqReader, tbUidList);
        code = addTableListForStableTmq(pHandle, pTq, tbUidList);
        if (code != 0) {
          tqError("update table list for stable tmq error for %s, msg:%s", pHandle->subKey, tstrerror(code));
          break;
        }
      }
    }
  }

  // pIter is NULL after a complete walk and points at the failing handle after an early exit
  taosHashCancelIterate(pTq->pHandle, pIter);
  taosWUnLockLatch(&pTq->lock);

  return code;
}
1623

1624
static void destroySourceScanTables(void* ptr) {
×
1625
  SArray** pTables = ptr;
×
1626
  if (pTables && *pTables) {
×
1627
    taosArrayDestroy(*pTables);
×
1628
    *pTables = NULL;
×
1629
  }
1630
}
×
1631

1632
static int32_t compareSVTColInfo(const void* p1, const void* p2) {
×
1633
  SVTColInfo* pCol1 = (SVTColInfo*)p1;
×
1634
  SVTColInfo* pCol2 = (SVTColInfo*)p2;
×
1635
  if (pCol1->vColId == pCol2->vColId) {
×
1636
    return 0;
×
1637
  } else if (pCol1->vColId < pCol2->vColId) {
×
1638
    return -1;
×
1639
  } else {
1640
    return 1;
×
1641
  }
1642
}
1643

1644
static void freeTableSchemaCache(const void* key, size_t keyLen, void* value, void* ud) {
×
1645
  if (value) {
×
1646
    SSchemaWrapper* pSchemaWrapper = value;
×
1647
    tDeleteSchemaWrapper(pSchemaWrapper);
1648
  }
1649
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc