• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4829

30 Oct 2025 09:25AM UTC coverage: 49.734% (-11.3%) from 61.071%
#4829

push

travis-ci

web-flow
Merge pull request #33435 from taosdata/3.0

merge 3.0

123072 of 323930 branches covered (37.99%)

Branch coverage included in aggregate %.

7 of 25 new or added lines in 3 files covered. (28.0%)

35232 existing lines in 327 files now uncovered.

172062 of 269495 relevant lines covered (63.85%)

70709785.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.58
/source/libs/executor/src/dataDeleter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkInt.h"
17
#include "dataSinkMgt.h"
18
#include "executorInt.h"
19
#include "planner.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24

25
extern SDataSinkStat gDataSinkStat;
26

27
typedef struct SDataDeleterBuf {
28
  int32_t useSize;
29
  int32_t allocSize;
30
  char*   pData;
31
} SDataDeleterBuf;
32

33
typedef struct SDataCacheEntry {
34
  int32_t dataLen;
35
  int32_t numOfRows;
36
  int32_t numOfCols;
37
  int8_t  compressed;
38
  char    data[];
39
} SDataCacheEntry;
40

41
typedef struct SDataDeleterHandle {
42
  SDataSinkHandle     sink;
43
  SDataSinkManager*   pManager;
44
  SDataSinkNode*      pSinkNode;
45
  SDataBlockDescNode* pSchema;
46
  SDataDeleterNode*   pDeleter;
47
  SDeleterParam*      pParam;
48
  STaosQueue*         pDataBlocks;
49
  SDataDeleterBuf     nextOutput;
50
  int32_t             status;
51
  bool                queryEnd;
52
  uint64_t            useconds;
53
  uint64_t            cachedSize;
54
  uint64_t            flags;
55
  TdThreadMutex       mutex;
56
} SDataDeleterHandle;
57

58
static int32_t toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) {
235,762✔
59
  int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
235,762!
60

61
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
235,762✔
62
  pEntry->compressed = 0;
235,762✔
63
  pEntry->numOfRows = pInput->pData->info.rows;
235,762✔
64
  pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
235,762✔
65
  pEntry->dataLen = sizeof(SDeleterRes);
235,762✔
66

67
  pBuf->useSize = sizeof(SDataCacheEntry);
235,762✔
68

69
  SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
235,762✔
70
  if (NULL == pColRes) {
235,762!
71
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
72
  }
73
  SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1);
235,762✔
74
  if (NULL == pColSKey) {
235,762!
75
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
76
  }
77
  SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2);
235,762✔
78
  if (NULL == pColEKey) {
235,762!
79
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
80
  }
81

82
  SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
235,762✔
83
  pRes->suid = pHandle->pParam->suid;
235,762✔
84
  pRes->uidList = pHandle->pParam->pUidList;
235,762✔
85
  TAOS_STRCPY(pRes->tableName, pHandle->pDeleter->tableFName);
235,762!
86
  TAOS_STRCPY(pRes->tsColName, pHandle->pDeleter->tsColName);
235,762!
87
  pRes->affectedRows = *(int64_t*)pColRes->pData;
235,762✔
88

89
  if (pRes->affectedRows) {
235,762✔
90
    pRes->skey = *(int64_t*)pColSKey->pData;
234,681✔
91
    pRes->ekey = *(int64_t*)pColEKey->pData;
234,681✔
92
    if (pRes->skey > pRes->ekey) {
234,681!
93
      qError("data delter skey:%" PRId64 " is bigger than ekey:%" PRId64, pRes->skey, pRes->ekey);
×
94
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
×
95
    }
96
  } else {
97
    pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
1,081✔
98
    pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
1,081✔
99
  }
100

101
  qDebug("delete %" PRId64 " rows, from %" PRId64 " to %" PRId64, pRes->affectedRows, pRes->skey, pRes->ekey);
235,762✔
102

103
  pBuf->useSize += pEntry->dataLen;
235,762✔
104

105
  (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
235,762✔
106
  (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
235,762✔
107

108
  return TSDB_CODE_SUCCESS;
235,762✔
109
}
110

111
static int32_t allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
235,762✔
112
  uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;
235,762✔
113
  if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) {
235,762!
114
    qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity,
×
115
           taosQueueItemSize(pDeleter->pDataBlocks));
116
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
117
  }
118

119
  pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
235,762✔
120

121
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
235,762!
122
  if (pBuf->pData == NULL) {
235,762!
123
    qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(ERRNO));
×
124
    return terrno;
×
125
  }
126

127
  return TSDB_CODE_SUCCESS;
235,762✔
128
}
129

130
static int32_t updateStatus(SDataDeleterHandle* pDeleter) {
471,524✔
131
  (void)taosThreadMutexLock(&pDeleter->mutex);
471,524✔
132
  int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks);
471,524✔
133
  int32_t status =
471,524✔
134
      (0 == blockNums ? DS_BUF_EMPTY
135
                      : (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
471,524!
136
  pDeleter->status = status;
471,524✔
137
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
471,524✔
138
  
139
  return status;
471,524✔
140
}
141

142
static int32_t getStatus(SDataDeleterHandle* pDeleter) {
×
143
  (void)taosThreadMutexLock(&pDeleter->mutex);
×
144
  int32_t status = pDeleter->status;
×
145
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
×
146
  
147
  return status;
×
148
}
149

150
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
235,762✔
151
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
235,762✔
152
  SDataDeleterBuf*    pBuf = NULL;
235,762✔
153

154
  int32_t code = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0, (void**)&pBuf);
235,762✔
155
  if (code) {
235,762!
156
    return code;
×
157
  }
158

159
  code = allocBuf(pDeleter, pInput, pBuf);
235,762✔
160
  if (code) {
235,762!
161
    taosFreeQitem(pBuf);
×
162
    return code;
×
163
  }
164

165
  QRY_ERR_JRET(toDataCacheEntry(pDeleter, pInput, pBuf));
235,762!
166
  QRY_ERR_JRET(taosWriteQitem(pDeleter->pDataBlocks, pBuf));
235,762!
167
  *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);
235,762✔
168

169
  return TSDB_CODE_SUCCESS;
235,762✔
170

171
_return:
×
172

173
  taosFreeQitem(pBuf);
×
174
  
175
  return code;
×
176
}
177

178
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
235,762✔
179
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
235,762✔
180
  (void)taosThreadMutexLock(&pDeleter->mutex);
235,762✔
181
  pDeleter->queryEnd = true;
235,762✔
182
  pDeleter->useconds = useconds;
235,762✔
183
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
235,762✔
184
}
235,762✔
185

186
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
235,762✔
187
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
235,762✔
188
  if (taosQueueEmpty(pDeleter->pDataBlocks)) {
235,762!
UNCOV
189
    *pQueryEnd = pDeleter->queryEnd;
×
UNCOV
190
    *pLen = 0;
×
UNCOV
191
    return;
×
192
  }
193

194
  SDataDeleterBuf* pBuf = NULL;
235,762✔
195
  taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
235,762✔
196
  if (pBuf != NULL) {
235,762!
197
    TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
235,762✔
198
    taosFreeQitem(pBuf);
235,762✔
199
  }
200

201
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
235,762✔
202
  *pLen = pEntry->dataLen;
235,762✔
203
  *pRawLen = pEntry->dataLen;
235,762✔
204

205
  *pQueryEnd = pDeleter->queryEnd;
235,762!
206
  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen,
235,762✔
207
         ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
208
}
209

210
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
235,762✔
211
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
235,762✔
212
  if (NULL == pDeleter->nextOutput.pData) {
235,762!
213
    if (!pDeleter->queryEnd) {
×
214
      qError("empty res while query not end in data deleter");
×
215
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
216
    }
217
    pOutput->useconds = pDeleter->useconds;
×
218
    pOutput->precision = pDeleter->pSchema->precision;
×
219
    pOutput->bufStatus = DS_BUF_EMPTY;
×
220
    pOutput->queryEnd = pDeleter->queryEnd;
×
221
    return TSDB_CODE_SUCCESS;
×
222
  }
223

224
  SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData);
235,762✔
225
  TAOS_MEMCPY(pOutput->pData, pEntry->data, pEntry->dataLen);
235,762!
226
  pDeleter->pParam->pUidList = NULL;
235,762✔
227
  pOutput->numOfRows = pEntry->numOfRows;
235,762✔
228
  pOutput->numOfCols = pEntry->numOfCols;
235,762✔
229
  pOutput->compressed = pEntry->compressed;
235,762✔
230

231
  (void)atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen);
235,762✔
232
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);
235,762✔
233

234
  taosMemoryFreeClear(pDeleter->nextOutput.pData);  // todo persistent
235,762!
235
  pOutput->bufStatus = updateStatus(pDeleter);
235,762✔
236
  (void)taosThreadMutexLock(&pDeleter->mutex);
235,762✔
237
  pOutput->queryEnd = pDeleter->queryEnd;
235,762!
238
  pOutput->useconds = pDeleter->useconds;
235,762✔
239
  pOutput->precision = pDeleter->pSchema->precision;
235,762✔
240
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
235,762✔
241

242
  return TSDB_CODE_SUCCESS;
235,762✔
243
}
244

245
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
235,762✔
246
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
235,762✔
247
  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize);
235,762✔
248
  taosMemoryFreeClear(pDeleter->nextOutput.pData);
235,762!
249
  taosArrayDestroy(pDeleter->pParam->pUidList);
235,762✔
250
  taosMemoryFree(pDeleter->pParam);
235,762!
251
  while (!taosQueueEmpty(pDeleter->pDataBlocks)) {
235,762!
252
    SDataDeleterBuf* pBuf = NULL;
×
253
    taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
×
254

255
    if (pBuf != NULL) {
×
256
      taosMemoryFreeClear(pBuf->pData);
×
257
      taosFreeQitem(pBuf);
×
258
    }
259
  }
260
  taosCloseQueue(pDeleter->pDataBlocks);
235,762✔
261
  (void)taosThreadMutexDestroy(&pDeleter->mutex);
235,762✔
262
  nodesDestroyNode((SNode*)pDeleter->pSinkNode);
235,762✔
263
  pDeleter->pSinkNode = NULL;
235,762✔
264

265
  taosMemoryFree(pDeleter->pManager);
235,762!
266

267
  return TSDB_CODE_SUCCESS;
235,762✔
268
}
269

270
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
271
  SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
×
272

273
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
274
  return TSDB_CODE_SUCCESS;
×
275
}
276

277

278
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
235,762✔
279
  SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
235,762✔
280

281
  *pFlags = atomic_load_64(&pDispatcher->flags);
235,762✔
282
  return TSDB_CODE_SUCCESS;
235,762✔
283
}
284

285

286
int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
235,686✔
287
                          void* pParam) {
288
  SDataSinkNode* pDataSink = *ppDataSink;
235,686✔
289
  int32_t code = TSDB_CODE_SUCCESS;
235,762✔
290
  if (pParam == NULL) {
235,762!
291
    code = TSDB_CODE_QRY_INVALID_INPUT;
×
292
    qError("invalid input param in creating data deleter, code%s", tstrerror(code));
×
293
    goto _end;
×
294
  }
295

296
  SDeleterParam* pDeleterParam = (SDeleterParam*)pParam;
235,762✔
297

298
  SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
235,762!
299
  if (NULL == deleter) {
235,762!
300
    code = terrno;
×
301
    taosArrayDestroy(pDeleterParam->pUidList);
×
302
    taosMemoryFree(pParam);
×
303
    goto _end;
×
304
  }
305

306
  SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
235,762✔
307
  deleter->sink.fPut = putDataBlock;
235,762✔
308
  deleter->sink.fEndPut = endPut;
235,762✔
309
  deleter->sink.fGetLen = getDataLength;
235,762✔
310
  deleter->sink.fGetData = getDataBlock;
235,762✔
311
  deleter->sink.fDestroy = destroyDataSinker;
235,762✔
312
  deleter->sink.fGetCacheSize = getCacheSize;
235,762✔
313
  deleter->sink.fGetFlags = getSinkFlags;
235,762✔
314
  deleter->pManager = pManager;
235,762✔
315
  deleter->pDeleter = pDeleterNode;
235,762✔
316
  deleter->pSchema = pDataSink->pInputDataBlockDesc;
235,762✔
317
  deleter->pSinkNode = pDataSink;
235,762✔
318
  *ppDataSink = NULL;
235,762✔
319

320
  deleter->pParam = pParam;
235,762✔
321
  deleter->status = DS_BUF_EMPTY;
235,762✔
322
  deleter->queryEnd = false;
235,762✔
323
  code = taosOpenQueue(&deleter->pDataBlocks);
235,762✔
324
  if (code) {
235,762!
325
    goto _end;
×
326
  }
327
  deleter->flags = DS_FLAG_USE_MEMPOOL;
235,762✔
328
  code = taosThreadMutexInit(&deleter->mutex, NULL);
235,762✔
329
  if (code) {
235,762!
330
    goto _end;
×
331
  }
332

333
  *pHandle = deleter;
235,762✔
334
  return code;
235,762✔
335

336
_end:
×
337

338
  if (deleter != NULL) {
×
339
    (void)destroyDataSinker((SDataSinkHandle*)deleter);
×
340
    taosMemoryFree(deleter);
×
341
  } else {
342
    taosMemoryFree(pManager);
×
343
  }
344

345
  nodesDestroyNode((SNode *)*ppDataSink);
×
346
  *ppDataSink = NULL;
×
347
  
348
  return code;
×
349
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc