• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4513

17 Jul 2025 02:02AM UTC coverage: 31.359% (-31.1%) from 62.446%
#4513

push

travis-ci

web-flow
Merge pull request #31914 from taosdata/fix/3.0/compare-ans-failed

fix:Convert line endings from LF to CRLF for ans file

68541 of 301034 branches covered (22.77%)

Branch coverage included in aggregate %.

117356 of 291771 relevant lines covered (40.22%)

602262.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/tq/tqSink.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tcommon.h"
17
#include "tq.h"
18

19
// Pairs a table uid with a table name.
// NOTE(review): not referenced anywhere in the visible part of this file; presumably describes a sink
// destination table -- confirm against its users.
typedef struct STableSinkInfo {
20
  uint64_t uid;   // table uid
21
  tstr     name;  // table name, stored as a tstr (type declared elsewhere)
22
} STableSinkInfo;
23

24
/*
 * Translate every row of a delete result block into one SSingleDeleteReq and append it to
 * deleteReq->deleteReqs.
 *
 * For each row the destination table name is resolved in this order:
 *   1. the table-name column of the block, if the cell is not null (optionally extended with the
 *      group id when newSubTableRule is set),
 *   2. a name generated from stbFullName and the group id,
 *   3. a lookup of the group id as a table uid in the vnode meta (only when stbFullName is NULL).
 * Rows whose name cannot be resolved (NULL or empty) are logged and skipped.
 *
 * Returns 0 on success; otherwise terrno or the code of the failing helper. Requests that were
 * already appended stay in deleteReq->deleteReqs on failure.
 */
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr, bool newSubTableRule) {
  int32_t          totalRows = pDataBlock->info.rows;
  SColumnInfoData* pStartTsCol = taosArrayGet(pDataBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndTsCol = taosArrayGet(pDataBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  if (pStartTsCol == NULL || pEndTsCol == NULL || pGidCol == NULL || pTbNameCol == NULL) {
    return terrno;
  }

  tqDebug("s-task:%s build %d rows delete msg for table:%s", pIdStr, totalRows, stbFullName);

  for (int32_t row = 0; row < totalRows; row++) {
    int64_t skey = *(int64_t*)colDataGetData(pStartTsCol, row);
    int64_t ekey = *(int64_t*)colDataGetData(pEndTsCol, row);
    int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);

    char* name = NULL;        // resolved table name; may point into originName
    char* originName = NULL;  // varstr buffer that backs name in the meta-lookup case
    void* varTbName = NULL;
    if (!colDataIsNull(pTbNameCol, totalRows, row, NULL)) {
      varTbName = colDataGetVarData(pTbNameCol, row);
    }

    if (varTbName != NULL && varTbName != (void*)-1) {
      // the buffer must hold the raw name plus terminator, and be large enough for the
      // in-place group id extension below
      size_t cap = TMAX(TSDB_TABLE_NAME_LEN, varDataLen(varTbName) + 1);
      name = taosMemoryMalloc(cap);
      if (name == NULL) {
        return terrno;
      }

      memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
      name[varDataLen(varTbName)] = '\0';
      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name, groupId) && groupId != 0 &&
          stbFullName) {
        int32_t code = buildCtbNameAddGroupId(stbFullName, name, groupId, cap);
        if (code != TSDB_CODE_SUCCESS) {
          taosMemoryFree(name);  // do not leak the per-row buffer on failure
          return code;
        }
      }
    } else if (stbFullName) {
      // NOTE(review): ownership of *name after a failed call is decided by buildCtbNameByGroupId,
      // so it is deliberately not released here -- confirm against its implementation.
      int32_t code = buildCtbNameByGroupId(stbFullName, groupId, &name);
      if (code) {
        return code;
      }
    } else {
      originName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
      if (originName == NULL) {
        return terrno;
      }

      if (metaGetTableNameByUid(pTq->pVnode, groupId, originName) == 0) {
        name = varDataVal(originName);
      }
    }

    // Exactly one heap buffer is owned per row: originName when it was allocated (name then only
    // points into it), otherwise name itself.
    void* pOwned = (originName != NULL) ? (void*)originName : (void*)name;

    if (!name || *name == '\0') {
      tqWarn("s-task:%s failed to build delete msg groupId:%" PRId64 ", skey:%" PRId64 " ekey:%" PRId64
             " since invalid tbname:%s",
             pIdStr, groupId, skey, ekey, name ? name : "NULL");
    } else {
      tqDebug("s-task:%s build delete msg groupId:%" PRId64 ", name:%s, skey:%" PRId64 " ekey:%" PRId64, pIdStr,
              groupId, name, skey, ekey);

      SSingleDeleteReq req = {.startTs = skey, .endTs = ekey};
      tstrncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
      if (taosArrayPush(deleteReq->deleteReqs, &req) == NULL) {
        int32_t code = terrno;  // capture before the free below can touch terrno
        taosMemoryFree(pOwned);
        return code;
      }
    }

    taosMemoryFree(pOwned);
  }

  return 0;
}
107

108
/*
 * Fill the fixed part of a child-table create request: flags, table type, super table uid, tag
 * count and a heap copy of the super table's short name parsed out of stbFullName.
 *
 * Returns 0 on success, the tNameFromString() code when the full name cannot be parsed, or terrno
 * when the name copy cannot be allocated. The scalar fields are written in every case.
 */
int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, const char* stbFullName, int32_t numOfTags) {
  SName   parsedName = {0};
  int32_t ret;

  pCreateTableReq->flags = 0;
  pCreateTableReq->type = TSDB_CHILD_TABLE;
  pCreateTableReq->ctb.suid = suid;
  pCreateTableReq->ctb.tagNum = numOfTags;

  // extract the super table name from the "acct.db.table" style full name
  ret = tNameFromString(&parsedName, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
  if (ret != 0) {
    return ret;
  }

  pCreateTableReq->ctb.stbName = taosStrdup((char*)tNameGetTableName(&parsedName));
  if (pCreateTableReq->ctb.stbName != NULL) {
    return ret;
  }

  // duplication failed: report it and hand the error back to the caller
  tqError("failed to duplicate the stb name:%s, failed to init create-table msg and create req table", stbFullName);
  return terrno;
}
128

129
/*
 * Create the default tag column name list: a one-element array holding "group_id", stored in a
 * fixed TSDB_COL_NAME_LEN sized slot.
 *
 * On success *pColNameList receives the new array (the caller destroys it) and TSDB_CODE_SUCCESS
 * is returned. On failure *pColNameList is NULL and terrno is returned.
 */
int32_t createDefaultTagColName(SArray** pColNameList) {
  char    defaultTagName[TSDB_COL_NAME_LEN] = "group_id";
  SArray* pNames = NULL;

  *pColNameList = NULL;

  pNames = taosArrayInit(1, TSDB_COL_NAME_LEN);
  if (pNames == NULL) {
    return terrno;
  }

  if (taosArrayPush(pNames, defaultTagName) != NULL) {
    *pColNameList = pNames;
    return TSDB_CODE_SUCCESS;
  }

  taosArrayDestroy(pNames);
  return terrno;
}
147

148
/*
 * Set pCreateTableReq->name, the destination child table name, for a result block.
 *
 *  - When the block carries a partition table name (info.parTbName is not empty) that name is
 *    used; under newSubTableRule it is additionally extended with the group id unless it is an
 *    auto-generated name, already carries the group id, gid is 0, or stbFullName is NULL.
 *  - Otherwise the name is generated from stbFullName and gid.
 *
 * The name is heap allocated and owned by pCreateTableReq. Returns 0 on success, terrno on
 * allocation failure, or the code of the failing name builder. When extending the name fails the
 * buffer allocated here is released and pCreateTableReq->name is reset to NULL, so nothing is
 * left behind for the caller to leak or double free.
 */
int32_t setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
                                   int64_t gid, bool newSubTableRule, const char* id) {
  char* parTbName = pDataBlock->info.parTbName;

  if (parTbName[0] == '\0') {
    int32_t code = buildCtbNameByGroupId(stbFullName, gid, &pCreateTableReq->name);
    // the name may still be NULL when the builder failed; never pass NULL to %s
    tqDebug("s-task:%s no name in blockdata, auto-created table name:%s", id,
            pCreateTableReq->name ? pCreateTableReq->name : "NULL");
    return code;
  }

  if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
      stbFullName) {
    pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
    if (pCreateTableReq->name == NULL) {
      return terrno;
    }

    tstrncpy(pCreateTableReq->name, parTbName, TSDB_TABLE_NAME_LEN);
    int32_t code = buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid, TSDB_TABLE_NAME_LEN);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFreeClear(pCreateTableReq->name);
      return code;
    }
    tqDebug("s-task:%s gen name from:%s blockdata", id, parTbName);
  } else {
    pCreateTableReq->name = taosStrdup(parTbName);
    if (pCreateTableReq->name == NULL) {
      return terrno;
    }
    tqDebug("s-task:%s copy name:%s from blockdata", id, parTbName);
  }

  return 0;
}
179

180
// merge the new submit table block with the existed blocks
181
// if ts in the new data block overlap with existed one, replace it
182
int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, const char* id) {
×
183
  int32_t oldLen = taosArrayGetSize(pExisted->aRowP);
×
184
  int32_t newLen = taosArrayGetSize(pNew->aRowP);
×
185
  int32_t numOfPk = 0;
×
186

187
  int32_t j = 0, k = 0;
×
188
  SArray* pFinal = taosArrayInit(oldLen + newLen, POINTER_BYTES);
×
189
  if (pFinal == NULL) {
×
190
    tqError("s-task:%s failed to prepare merge result datablock, code:%s", id, tstrerror(terrno));
×
191
    return terrno;
×
192
  }
193

194
  while (j < newLen && k < oldLen) {
×
195
    SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j);
×
196
    SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k);
×
197

198
    if (pNewRow->ts < pOldRow->ts) {
×
199
      void* p = taosArrayPush(pFinal, &pNewRow);
×
200
      if (p == NULL) {
×
201
        return terrno;
×
202
      }
203
      j += 1;
×
204
    } else if (pNewRow->ts > pOldRow->ts) {
×
205
      void* p = taosArrayPush(pFinal, &pOldRow);
×
206
      if (p == NULL) {
×
207
        return terrno;
×
208
      }
209

210
      k += 1;
×
211
    } else {
212
      // check for the existance of primary key
213
      if (pNewRow->numOfPKs == 0) {
×
214
        void* p = taosArrayPush(pFinal, &pNewRow);
×
215
        if (p == NULL) {
×
216
          return terrno;
×
217
        }
218

219
        k += 1;
×
220
        j += 1;
×
221
        tRowDestroy(pOldRow);
×
222
      } else {
223
        numOfPk = pNewRow->numOfPKs;
×
224

225
        SRowKey kNew, kOld;
226
        tRowGetKey(pNewRow, &kNew);
×
227
        tRowGetKey(pOldRow, &kOld);
×
228

229
        int32_t ret = tRowKeyCompare(&kNew, &kOld);
×
230
        if (ret <= 0) {
×
231
          void* p = taosArrayPush(pFinal, &pNewRow);
×
232
          if (p == NULL) {
×
233
            return terrno;
×
234
          }
235

236
          j += 1;
×
237

238
          if (ret == 0) {
×
239
            k += 1;
×
240
            tRowDestroy(pOldRow);
×
241
          }
242
        } else {
243
          void* p = taosArrayPush(pFinal, &pOldRow);
×
244
          if (p == NULL) {
×
245
            return terrno;
×
246
          }
247

248
          k += 1;
×
249
        }
250
      }
251
    }
252
  }
253

254
  while (j < newLen) {
×
255
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++);
×
256
    void* p = taosArrayPush(pFinal, &pRow);
×
257
    if (p == NULL) {
×
258
      return terrno;
×
259
    }
260
  }
261

262
  while (k < oldLen) {
×
263
    SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++);
×
264
    void* p = taosArrayPush(pFinal, &pRow);
×
265
    if (p == NULL) {
×
266
      return terrno;
×
267
    }
268
  }
269

270
  taosArrayDestroy(pNew->aRowP);
×
271
  taosArrayDestroy(pExisted->aRowP);
×
272
  pExisted->aRowP = pFinal;
×
273

274
  tqTrace("s-task:%s rows merged, final rows:%d, pk:%d uid:%" PRId64 ", existed auto-create table:%d, new-block:%d", id,
×
275
          (int32_t)taosArrayGetSize(pFinal), numOfPk, pExisted->uid, (pExisted->pCreateTbReq != NULL),
276
          (pNew->pCreateTbReq != NULL));
277

278
  tdDestroySVCreateTbReq(pNew->pCreateTbReq);
×
279
  taosMemoryFree(pNew->pCreateTbReq);
×
280
  return TSDB_CODE_SUCCESS;
×
281
}
282

283
int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock,
×
284
                                SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq, const char* id) {
285
  *pReq = NULL;
×
286

287
  SVCreateTbReq* pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
×
288
  if (pCreateTbReq == NULL) {
×
289
    return terrno;
×
290
  }
291

292
  taosArrayClear(pTagArray);
×
293
  int32_t code = initCreateTableMsg(pCreateTbReq, suid, stbFullName, 1);
×
294
  if (code != 0) {
×
295
    return code;
×
296
  }
297

298
  STagVal tagVal = {.cid = numOfCols, .type = TSDB_DATA_TYPE_UBIGINT, .i64 = pDataBlock->info.id.groupId};
×
299
  void* p = taosArrayPush(pTagArray, &tagVal);
×
300
  if (p == NULL) {
×
301
    return terrno;
×
302
  }
303

304
  code = tTagNew(pTagArray, 1, false, (STag**)&pCreateTbReq->ctb.pTag);
×
305
  if (pCreateTbReq->ctb.pTag == NULL || (code != 0)) {
×
306
    tdDestroySVCreateTbReq(pCreateTbReq);
307
    taosMemoryFreeClear(pCreateTbReq);
×
308
    return code;
×
309
  }
310

311
  code = createDefaultTagColName(&pCreateTbReq->ctb.tagName);
×
312
  if (code) {
×
313
    return code;
×
314
  }
315

316
  // set table name
317
  code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, pDataBlock->info.id.groupId, newSubTableRule,
×
318
                                    id);
319
  if (code) {
×
320
    return code;
×
321
  }
322

323
  *pReq = pCreateTbReq;
×
324
  return code;
×
325
}
326

327
int32_t tsAscendingSortFn(const void* p1, const void* p2) {
×
328
  SRow* pRow1 = *(SRow**)p1;
×
329
  SRow* pRow2 = *(SRow**)p2;
×
330

331
  if (pRow1->ts == pRow2->ts) {
×
332
    return 0;
×
333
  } else {
334
    return pRow1->ts > pRow2->ts ? 1 : -1;
×
335
  }
336
}
337

338
/*
 * Convert every row of a result block into an SRow matching pTSchema and collect the rows in
 * pTableData->aRowP (created here).
 *
 * Schema columns flagged by IS_SET_NULL() are written as NULL and do not consume a block column;
 * every other schema column consumes the next block column (dataIndex). Variable length values are
 * referenced in place, not copied, while the row is built.
 *
 * If the timestamp of any row is older than earlyTs the whole block is discarded: the rows built
 * so far are destroyed, pTableData->aRowP is set to NULL and TSDB_CODE_SUCCESS is returned, so
 * callers must check aRowP.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or the tRowBuild() code. The temporary
 * value array is released on every path. After an array push failure pTableData->aRowP still
 * holds the rows built so far and remains owned by the caller.
 */
int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDataBlock* pDataBlock, int64_t earlyTs,
                      const char* id) {
  int32_t numOfRows = pDataBlock->info.rows;
  int32_t code = TSDB_CODE_SUCCESS;

  SArray* pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
  pTableData->aRowP = taosArrayInit(numOfRows, sizeof(SRow*));

  if (pTableData->aRowP == NULL || pVals == NULL) {
    code = terrno;  // capture before the cleanup calls below can touch terrno
    taosArrayDestroy(pTableData->aRowP);
    pTableData->aRowP = NULL;
    taosArrayDestroy(pVals);
    tqError("s-task:%s failed to prepare write stream res blocks, code:%s", id, tstrerror(code));
    return code;
  }

  for (int32_t j = 0; j < numOfRows; j++) {
    taosArrayClear(pVals);

    int32_t dataIndex = 0;  // index of the next block column to consume
    int64_t ts = 0;

    for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
      const STColumn* pCol = &pTSchema->columns[k];

      // primary timestamp column, for debug purpose
      if (k == 0) {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        ts = *(int64_t*)colDataGetData(pColData, j);
        tqTrace("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, ts);

        if (ts < earlyTs) {
          tqError("s-task:%s ts:%" PRId64 " of generated results out of valid time range %" PRId64 " , discarded", id,
                  ts, earlyTs);

          // the array only stores pointers: destroy the rows built so far before dropping it
          int32_t numOfBuilt = (int32_t)taosArrayGetSize(pTableData->aRowP);
          for (int32_t i = 0; i < numOfBuilt; i++) {
            SRow* pBuilt = *(SRow**)TARRAY_GET_ELEM(pTableData->aRowP, i);
            tRowDestroy(pBuilt);
          }

          taosArrayDestroy(pTableData->aRowP);
          pTableData->aRowP = NULL;
          code = TSDB_CODE_SUCCESS;
          goto _end;
        }
      }

      if (IS_SET_NULL(pCol)) {
        if (pCol->flags & COL_IS_KEY) {
          qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
                 pCol->colId, pCol->type);
          break;
        }
        SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
        if (taosArrayPush(pVals, &cv) == NULL) {
          code = terrno;
          goto _end;
        }
      } else {
        SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
        if (pColData == NULL) {
          continue;
        }

        if (colDataIsNull_s(pColData, j)) {
          if (pCol->flags & COL_IS_KEY) {
            qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
                   ts, pCol->colId, pCol->type);
            break;
          }

          SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
          if (taosArrayPush(pVals, &cv) == NULL) {
            code = terrno;
            goto _end;
          }

          dataIndex++;
        } else {
          void* colData = colDataGetData(pColData, j);
          if (IS_VAR_DATA_TYPE(pCol->type)) {  // address copy, no value
            SValue sv =
                (SValue){.type = pCol->type, .nData = varDataLen(colData), .pData = (uint8_t*)varDataVal(colData)};
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            if (taosArrayPush(pVals, &cv) == NULL) {
              code = terrno;
              goto _end;
            }
          } else {
            SValue sv = {.type = pCol->type};
            valueSetDatum(&sv, pCol->type, colData, tDataTypes[pCol->type].bytes);
            SColVal cv = COL_VAL_VALUE(pCol->colId, sv);
            if (taosArrayPush(pVals, &cv) == NULL) {
              code = terrno;
              goto _end;
            }
          }
          dataIndex++;
        }
      }
    }

    SRow* pRow = NULL;
    code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
    if (code != TSDB_CODE_SUCCESS) {
      tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
      taosArrayDestroy(pVals);
      tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
      return code;
    }

    if (taosArrayPush(pTableData->aRowP, &pRow) == NULL) {
      code = terrno;
      tRowDestroy(pRow);  // not stored anywhere yet, so it would be lost otherwise
      goto _end;
    }
  }

_end:
  taosArrayDestroy(pVals);
  return code;
}
457

458
int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock,
×
459
                                 SSubmitTbData* pTableData, int64_t earlyTs, const char* id) {
460
  int32_t numOfRows = pDataBlock->info.rows;
×
461
  char*   dstTableName = pDataBlock->info.parTbName;
×
462

463
  tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, id,
×
464
          blockIndex + 1, numOfRows, suid);
465

466
  // convert all rows
467
  int32_t code = doConvertRows(pTableData, pTSchema, pDataBlock, earlyTs, id);
×
468
  if (code != TSDB_CODE_SUCCESS) {
×
469
    tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno));
×
470
    return code;
×
471
  }
472

473
  if (pTableData->aRowP != NULL) {
×
474
    taosArraySort(pTableData->aRowP, tsAscendingSortFn);
×
475
    tqTrace("s-task:%s build submit msg for dstTable:%s, numOfRows:%d", id, dstTableName, numOfRows);
×
476
  }
477

478
  return code;
×
479
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc