• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3796

31 Mar 2025 10:39AM UTC coverage: 30.372% (-7.1%) from 37.443%
#3796

push

travis-ci

happyguoxy
test:add test cases

69287 of 309062 branches covered (22.42%)

Branch coverage included in aggregate %.

118044 of 307720 relevant lines covered (38.36%)

278592.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/libs/executor/src/dataDeleter.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkInt.h"
17
#include "dataSinkMgt.h"
18
#include "executorInt.h"
19
#include "planner.h"
20
#include "tcompression.h"
21
#include "tdatablock.h"
22
#include "tglobal.h"
23
#include "tqueue.h"
24

25
extern SDataSinkStat gDataSinkStat;
26

27
/* One queued output unit of the data deleter: a heap buffer and its size bookkeeping. */
typedef struct SDataDeleterBuf {
  int32_t useSize;    // bytes of pData filled in so far (written by toDataCacheEntry)
  int32_t allocSize;  // bytes allocated for pData (written by allocBuf)
  char*   pData;      // SDataCacheEntry header immediately followed by its payload
} SDataDeleterBuf;
32

33
/* Header placed at the start of SDataDeleterBuf.pData; the payload lives in data[]. */
typedef struct SDataCacheEntry {
  int32_t dataLen;     // payload size in bytes; this file always stores sizeof(SDeleterRes)
  int32_t numOfRows;   // row count taken from the input block
  int32_t numOfCols;   // number of columns in the input block
  int8_t  compressed;  // this file always writes 0
  char    data[];      // payload bytes; this file stores one SDeleterRes here
} SDataCacheEntry;
40

41
typedef struct SDataDeleterHandle {
42
  SDataSinkHandle     sink;
43
  SDataSinkManager*   pManager;
44
  SDataSinkNode*      pSinkNode;
45
  SDataBlockDescNode* pSchema;
46
  SDataDeleterNode*   pDeleter;
47
  SDeleterParam*      pParam;
48
  STaosQueue*         pDataBlocks;
49
  SDataDeleterBuf     nextOutput;
50
  int32_t             status;
51
  bool                queryEnd;
52
  uint64_t            useconds;
53
  uint64_t            cachedSize;
54
  uint64_t            flags;
55
  TdThreadMutex       mutex;
56
} SDataDeleterHandle;
57

58
/*
 * Serialize the delete result carried by `pInput` into `pBuf`.
 *
 * `pBuf->pData` must already be allocated (see allocBuf) with room for an
 * SDataCacheEntry header followed by one SDeleterRes. The first three columns
 * of the input block are read as: affected-row count, start key, end key.
 *
 * On success the buffer's useSize is updated and the payload size is added to
 * both the handle's and the global cached-size counters.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when one
 * of the three columns is missing or the reported start key exceeds the end key.
 * The counters are not touched on failure.
 */
static int32_t toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) {
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
  pEntry->compressed = 0;
  pEntry->numOfRows = pInput->pData->info.rows;
  pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock);
  pEntry->dataLen = sizeof(SDeleterRes);

  pBuf->useSize = sizeof(SDataCacheEntry);

  SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0);
  if (NULL == pColRes) {
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pColSKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 1);
  if (NULL == pColSKey) {
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }
  SColumnInfoData* pColEKey = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 2);
  if (NULL == pColEKey) {
    QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  SDeleterRes* pRes = (SDeleterRes*)pEntry->data;
  pRes->suid = pHandle->pParam->suid;
  pRes->uidList = pHandle->pParam->pUidList;
  TAOS_STRCPY(pRes->tableName, pHandle->pDeleter->tableFName);
  TAOS_STRCPY(pRes->tsColName, pHandle->pDeleter->tsColName);
  pRes->affectedRows = *(int64_t*)pColRes->pData;

  if (pRes->affectedRows) {
    // Rows were deleted: take the key range from the input block.
    pRes->skey = *(int64_t*)pColSKey->pData;
    pRes->ekey = *(int64_t*)pColEKey->pData;
    if (pRes->skey > pRes->ekey) {
      qError("data delter skey:%" PRId64 " is bigger than ekey:%" PRId64, pRes->skey, pRes->ekey);
      QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    }
  } else {
    // Nothing was deleted: fall back to the time range stored on the deleter node.
    pRes->skey = pHandle->pDeleter->deleteTimeRange.skey;
    pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey;
  }

  qDebug("delete %" PRId64 " rows, from %" PRId64 " to %" PRId64, pRes->affectedRows, pRes->skey, pRes->ekey);

  pBuf->useSize += pEntry->dataLen;

  (void)atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen);
  (void)atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen);

  return TSDB_CODE_SUCCESS;
}
110

111
/*
 * Allocate the payload buffer for one queue item.
 *
 * Refuses to allocate when the pending queue already holds more items than
 * cfg.maxDataBlockNumPerQuery. Otherwise sizes the buffer for one
 * SDataCacheEntry header plus one SDeleterRes and stores it in pBuf->pData.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the
 * queue is over capacity, or terrno when the allocation fails.
 * `pInput` is not used.
 */
static int32_t allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) {
  const uint32_t maxBlocks = pDeleter->pManager->cfg.maxDataBlockNumPerQuery;

  if (taosQueueItemSize(pDeleter->pDataBlocks) > maxBlocks) {
    qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", maxBlocks,
           taosQueueItemSize(pDeleter->pDataBlocks));
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes);
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
  if (NULL != pBuf->pData) {
    return TSDB_CODE_SUCCESS;
  }

  qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(ERRNO));
  return terrno;
}
129

130
/*
 * Recompute the buffer status from the current queue length, store it on the
 * handle and return it. Runs under the handle mutex.
 *
 *   0 items                          -> DS_BUF_EMPTY
 *   fewer than maxDataBlockNumPerQuery -> DS_BUF_LOW
 *   otherwise                        -> DS_BUF_FULL
 */
static int32_t updateStatus(SDataDeleterHandle* pDeleter) {
  int32_t newStatus = DS_BUF_FULL;

  (void)taosThreadMutexLock(&pDeleter->mutex);
  int32_t queued = taosQueueItemSize(pDeleter->pDataBlocks);
  if (0 == queued) {
    newStatus = DS_BUF_EMPTY;
  } else if (queued < pDeleter->pManager->cfg.maxDataBlockNumPerQuery) {
    newStatus = DS_BUF_LOW;
  }
  pDeleter->status = newStatus;
  (void)taosThreadMutexUnlock(&pDeleter->mutex);

  return newStatus;
}
141

142
/* Return the status last stored by updateStatus, read under the handle mutex. */
static int32_t getStatus(SDataDeleterHandle* pDeleter) {
  int32_t current;

  (void)taosThreadMutexLock(&pDeleter->mutex);
  current = pDeleter->status;
  (void)taosThreadMutexUnlock(&pDeleter->mutex);

  return current;
}
149

150
/*
 * Sink callback: convert one input block into a queued delete result.
 *
 * Allocates a queue item and its payload buffer, fills the buffer from
 * `pInput`, and appends the item to the pending queue. `*pContinue` is set to
 * true only when the resulting buffer status is DS_BUF_LOW; it is left
 * untouched on failure.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the step that failed. On
 * failure nothing is queued and both the queue item and its payload buffer
 * are released.
 */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
  SDataDeleterBuf*    pBuf = NULL;

  int32_t code = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0, (void**)&pBuf);
  if (code) {
    return code;
  }

  code = allocBuf(pDeleter, pInput, pBuf);
  if (code) {
    // allocBuf did not hand over a payload buffer, so only the item is freed.
    taosFreeQitem(pBuf);
    return code;
  }

  QRY_ERR_JRET(toDataCacheEntry(pDeleter, pInput, pBuf));
  QRY_ERR_JRET(taosWriteQitem(pDeleter->pDataBlocks, pBuf));
  *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);

  return TSDB_CODE_SUCCESS;

_return:

  // The payload was allocated by allocBuf and is owned by the item; free it
  // first, otherwise it leaks once the item itself is gone.
  taosMemoryFreeClear(pBuf->pData);
  taosFreeQitem(pBuf);

  return code;
}
177

178
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
×
179
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
×
180
  (void)taosThreadMutexLock(&pDeleter->mutex);
×
181
  pDeleter->queryEnd = true;
×
182
  pDeleter->useconds = useconds;
×
183
  (void)taosThreadMutexUnlock(&pDeleter->mutex);
×
184
}
×
185

186
/*
 * Sink callback: stage the next queued result and report its size.
 *
 * When the queue is non-empty, one item is dequeued and copied into
 * pDeleter->nextOutput (where getDataBlock later consumes it); `*pLen` and
 * `*pRawLen` are both set to the payload length of the staged entry.
 *
 * When there is nothing to report (empty queue, or nothing dequeued and
 * nothing staged) both lengths are set to 0, so callers never read an
 * unwritten `*pRawLen` and no NULL entry is dereferenced.
 *
 * `*pQueryEnd` always receives the handle's current queryEnd flag.
 */
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
  SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
  if (taosQueueEmpty(pDeleter->pDataBlocks)) {
    *pQueryEnd = pDeleter->queryEnd;
    *pLen = 0;
    *pRawLen = 0;
    return;
  }

  SDataDeleterBuf* pBuf = NULL;
  taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
  if (pBuf != NULL) {
    // Copy the descriptor; ownership of pBuf->pData moves to nextOutput.
    TAOS_MEMCPY(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
    taosFreeQitem(pBuf);
  }

  SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData;
  if (NULL == pEntry) {
    // The read yielded nothing and no earlier entry is staged.
    *pQueryEnd = pDeleter->queryEnd;
    *pLen = 0;
    *pRawLen = 0;
    return;
  }

  *pLen = pEntry->dataLen;
  *pRawLen = pEntry->dataLen;

  *pQueryEnd = pDeleter->queryEnd;
  qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, pEntry->numOfRows);
}
209

210
/*
 * Sink callback: hand the staged result (see getDataLength) to the caller.
 *
 * With a staged entry: copies its payload into pOutput->pData, clears
 * pParam->pUidList on the handle, fills the row/column/compression fields,
 * subtracts the payload size from the cached-size counters, frees the staged
 * buffer and reports the refreshed buffer status.
 *
 * Without a staged entry: succeeds with DS_BUF_EMPTY only if the query has
 * ended; otherwise returns TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR.
 */
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
  SDataDeleterHandle* pSelf = (SDataDeleterHandle*)pHandle;

  if (NULL != pSelf->nextOutput.pData) {
    SDataCacheEntry* pStaged = (SDataCacheEntry*)(pSelf->nextOutput.pData);

    TAOS_MEMCPY(pOutput->pData, pStaged->data, pStaged->dataLen);
    pSelf->pParam->pUidList = NULL;

    pOutput->numOfRows = pStaged->numOfRows;
    pOutput->numOfCols = pStaged->numOfCols;
    pOutput->compressed = pStaged->compressed;

    // Accounting must read dataLen before the staged buffer is released.
    (void)atomic_sub_fetch_64(&pSelf->cachedSize, pStaged->dataLen);
    (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pStaged->dataLen);

    taosMemoryFreeClear(pSelf->nextOutput.pData);  // todo persistent

    pOutput->bufStatus = updateStatus(pSelf);

    (void)taosThreadMutexLock(&pSelf->mutex);
    pOutput->queryEnd = pSelf->queryEnd;
    pOutput->useconds = pSelf->useconds;
    pOutput->precision = pSelf->pSchema->precision;
    (void)taosThreadMutexUnlock(&pSelf->mutex);

    return TSDB_CODE_SUCCESS;
  }

  // Nothing staged: only legitimate once the query has finished.
  if (pSelf->queryEnd) {
    pOutput->useconds = pSelf->useconds;
    pOutput->precision = pSelf->pSchema->precision;
    pOutput->bufStatus = DS_BUF_EMPTY;
    pOutput->queryEnd = pSelf->queryEnd;
    return TSDB_CODE_SUCCESS;
  }

  qError("empty res while query not end in data deleter");
  return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
244

245
/*
 * Sink callback: release everything the handle owns except the handle itself.
 *
 * Removes the handle's cached size from the global counter, frees the staged
 * buffer, the uid list and the parameter block, drains and closes the queue
 * (freeing every pending payload and item), destroys the mutex and the sink
 * node, and finally frees the manager. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDeleterHandle* pSelf = (SDataDeleterHandle*)pHandle;

  (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pSelf->cachedSize);

  taosMemoryFreeClear(pSelf->nextOutput.pData);
  taosArrayDestroy(pSelf->pParam->pUidList);
  taosMemoryFree(pSelf->pParam);

  // Drain whatever is still queued.
  for (;;) {
    if (taosQueueEmpty(pSelf->pDataBlocks)) {
      break;
    }

    SDataDeleterBuf* pPending = NULL;
    taosReadQitem(pSelf->pDataBlocks, (void**)&pPending);
    if (NULL == pPending) {
      continue;
    }

    taosMemoryFreeClear(pPending->pData);
    taosFreeQitem(pPending);
  }

  taosCloseQueue(pSelf->pDataBlocks);
  (void)taosThreadMutexDestroy(&pSelf->mutex);

  nodesDestroyNode((SNode*)pSelf->pSinkNode);
  pSelf->pSinkNode = NULL;

  taosMemoryFree(pSelf->pManager);

  return TSDB_CODE_SUCCESS;
}
269

270
static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
×
271
  SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
×
272

273
  *size = atomic_load_64(&pDispatcher->cachedSize);
×
274
  return TSDB_CODE_SUCCESS;
×
275
}
276

277

278
static int32_t getSinkFlags(struct SDataSinkHandle* pHandle, uint64_t* pFlags) {
×
279
  SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle;
×
280

281
  *pFlags = atomic_load_64(&pDispatcher->flags);
×
282
  return TSDB_CODE_SUCCESS;
×
283
}
284

285

286
/*
 * Create a data-deleter sink handle.
 *
 * pManager   sink manager; stored on the handle and freed with it.
 * ppDataSink in: the deleter plan node. The node is taken over by the handle
 *            and `*ppDataSink` is set to NULL in every case.
 * pHandle    out: receives the new handle on success; untouched on failure.
 * pParam     SDeleterParam* for this delete; must not be NULL. Stored on the
 *            handle and freed with it.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_INVALID_INPUT when `pParam` is
 * NULL, terrno when the handle cannot be allocated, or the error code from
 * opening the queue / initializing the mutex.
 *
 * On any failure `pManager` and the sink node are released here; `pParam`
 * (and its uid list) is released as well whenever it was non-NULL.
 */
int32_t createDataDeleter(SDataSinkManager* pManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle,
                          void* pParam) {
  SDataSinkNode*      pDataSink = *ppDataSink;
  // Initialized up front: the `goto _end` below would otherwise jump over the
  // initializer and leave `_end` testing an indeterminate pointer.
  SDataDeleterHandle* deleter = NULL;
  int32_t             code = TSDB_CODE_SUCCESS;
  if (pParam == NULL) {
    code = TSDB_CODE_QRY_INVALID_INPUT;
    qError("invalid input param in creating data deleter, code%s", tstrerror(code));
    goto _end;
  }

  SDeleterParam* pDeleterParam = (SDeleterParam*)pParam;

  deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
  if (NULL == deleter) {
    code = terrno;
    // No handle exists to own the parameter, so release it here.
    taosArrayDestroy(pDeleterParam->pUidList);
    taosMemoryFree(pParam);
    goto _end;
  }

  SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
  deleter->sink.fPut = putDataBlock;
  deleter->sink.fEndPut = endPut;
  deleter->sink.fGetLen = getDataLength;
  deleter->sink.fGetData = getDataBlock;
  deleter->sink.fDestroy = destroyDataSinker;
  deleter->sink.fGetCacheSize = getCacheSize;
  deleter->sink.fGetFlags = getSinkFlags;
  deleter->pManager = pManager;
  deleter->pDeleter = pDeleterNode;
  deleter->pSchema = pDataSink->pInputDataBlockDesc;
  deleter->pSinkNode = pDataSink;
  // The handle owns the node from here on; clear the caller's pointer so the
  // cleanup at _end does not destroy it a second time.
  *ppDataSink = NULL;

  deleter->pParam = pParam;
  deleter->status = DS_BUF_EMPTY;
  deleter->queryEnd = false;
  code = taosOpenQueue(&deleter->pDataBlocks);
  if (code) {
    goto _end;
  }
  deleter->flags = DS_FLAG_USE_MEMPOOL;
  code = taosThreadMutexInit(&deleter->mutex, NULL);
  if (code) {
    goto _end;
  }

  *pHandle = deleter;
  return code;

_end:

  if (deleter != NULL) {
    // destroyDataSinker releases pParam, the queue, the sink node and pManager.
    (void)destroyDataSinker((SDataSinkHandle*)deleter);
    taosMemoryFree(deleter);
  } else {
    taosMemoryFree(pManager);
  }

  // Still non-NULL only when the handle never took the node over.
  nodesDestroyNode((SNode *)*ppDataSink);
  *ppDataSink = NULL;

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc