• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3911

24 Apr 2025 11:36PM UTC coverage: 53.735% (-1.6%) from 55.295%
#3911

push

travis-ci

happyguoxy
Sync branches at 2025-04-25 07:35

170049 of 316459 relevant lines covered (53.73%)

1192430.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.45
/source/libs/executor/src/dataSinkMgt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "dataSinkMgt.h"
17
#include "dataSinkInt.h"
18
#include "planner.h"
19
#include "tarray.h"
20

21
// Global data sink statistics, zero-initialized. Its cachedSize field is read
// with an atomic 64-bit load by dsDataSinkGetCacheSize().
SDataSinkStat gDataSinkStat = {0};
22

23
/**
 * Allocate a data sink manager and populate it from the given configuration.
 *
 * @param cfg           configuration copied by value into the new manager;
 *                      dereferenced without a NULL check.
 * @param pAPI          storage API pointer stored (not copied) in the manager.
 * @param ppSinkManager out: receives the newly allocated manager on success;
 *                      left untouched on allocation failure.
 * @return TSDB_CODE_SUCCESS on success, or terrno if the allocation failed.
 */
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) {
  SDataSinkManager* pMgr = taosMemoryMalloc(sizeof(SDataSinkManager));

  if (pMgr != NULL) {
    pMgr->pAPI = pAPI;
    pMgr->cfg = *cfg;

    // Hand the manager back to the caller through the out-parameter.
    *ppSinkManager = pMgr;
    return TSDB_CODE_SUCCESS;
  }

  // Allocation failed: report the error code left in terrno.
  return terrno;
}
35

36
/**
 * Copy the global cached-size counter into the caller's stat structure.
 *
 * @param pStat out: its cachedSize field is overwritten with the value of
 *              gDataSinkStat.cachedSize; dereferenced without a NULL check.
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
  // Read the shared counter with an atomic 64-bit load.
  pStat->cachedSize = atomic_load_64(&gDataSinkStat.cachedSize);
  return TSDB_CODE_SUCCESS;
}
41

42
/**
 * Create a data sinker matching the physical plan node type of *ppDataSink.
 *
 * Dispatches to createDataDispatcher / createDataDeleter / createDataInserter
 * for DISPATCH / DELETE / QUERY_INSERT nodes respectively. For any other node
 * type the sink manager is freed here, an error is logged, and
 * TSDB_CODE_QRY_INVALID_INPUT is returned.
 *
 * @param pSinkManager    sink manager passed on to the selected creator.
 *                        NOTE(review): the creators presumably take ownership
 *                        of it, given that it is freed on the invalid-type
 *                        path - confirm against their implementations.
 * @param ppDataSink      sink plan node; its node type selects the creator.
 * @param pHandle         passed through to the creator.
 * @param pParam          passed to the deleter / inserter creators only.
 * @param id              identifier string used in the error log message.
 * @param processOneBlock passed to the dispatcher creator only.
 * @return the creator's return code, or TSDB_CODE_QRY_INVALID_INPUT.
 */
int32_t dsCreateDataSinker(void* pSinkManager, SDataSinkNode** ppDataSink, DataSinkHandle* pHandle, void* pParam, const char* id, bool processOneBlock) {
  SDataSinkManager* pMgr = pSinkManager;
  const int         sinkType = (int)nodeType(*ppDataSink);

  if (QUERY_NODE_PHYSICAL_PLAN_DISPATCH == sinkType) {
    return createDataDispatcher(pMgr, ppDataSink, pHandle, processOneBlock);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_DELETE == sinkType) {
    return createDataDeleter(pMgr, ppDataSink, pHandle, pParam);
  }

  if (QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT == sinkType) {
    return createDataInserter(pMgr, ppDataSink, pHandle, pParam);
  }

  // Unsupported node type: release the manager and report the bad input.
  taosMemoryFree(pSinkManager);
  qError("invalid input node type:%d, %s", nodeType(*ppDataSink), id);

  return TSDB_CODE_QRY_INVALID_INPUT;
}
62

63
/**
 * Forward an input block to the sink's fPut callback.
 *
 * @param handle    sink handle, cast to SDataSinkHandle*; dereferenced
 *                  without a NULL check.
 * @param pInput    input data passed through to the callback.
 * @param pContinue passed through to the callback.
 * @return whatever the fPut callback returns.
 */
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
  SDataSinkHandle* pSink = (SDataSinkHandle*)handle;
  int32_t          code = pSink->fPut(pSink, pInput, pContinue);
  return code;
}
67

68
/**
 * Invoke the sink's fEndPut callback.
 *
 * @param handle   sink handle, cast to SDataSinkHandle*; dereferenced
 *                 without a NULL check.
 * @param useconds passed through unchanged to the callback.
 */
void dsEndPut(DataSinkHandle handle, uint64_t useconds) {
  SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
  // Plain call instead of `return <expr>;`: ISO C does not allow a return
  // statement with an expression in a function whose return type is void.
  pHandleImpl->fEndPut(pHandleImpl, useconds);
}
72

73
/**
 * Invoke the sink's fReset callback if one is installed.
 *
 * A NULL fReset is treated as "nothing to do".
 *
 * @param handle sink handle, cast to SDataSinkHandle*; dereferenced without
 *               a NULL check.
 */
void dsReset(DataSinkHandle handle) {
  SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
  if (pHandleImpl->fReset) {
    // Plain call instead of `return <expr>;`: ISO C does not allow a return
    // statement with an expression in a function whose return type is void.
    pHandleImpl->fReset(pHandleImpl);
  }
}
79

80
/**
 * Query length information from the sink via its fGetLen callback.
 *
 * @param handle    sink handle, cast to SDataSinkHandle*; dereferenced
 *                  without a NULL check.
 * @param pLen      passed through to the callback.
 * @param pRawLen   passed through to the callback.
 * @param pQueryEnd passed through to the callback.
 */
void dsGetDataLength(DataSinkHandle handle, int64_t* pLen, int64_t* pRawLen, bool* pQueryEnd) {
  SDataSinkHandle* pSink = (SDataSinkHandle*)handle;

  // All out-parameters are filled in (or not) by the sink implementation.
  pSink->fGetLen(pSink, pLen, pRawLen, pQueryEnd);
}
6,398✔
84

85
/**
 * Fetch output data from the sink via its fGetData callback.
 *
 * @param handle  sink handle, cast to SDataSinkHandle*; dereferenced without
 *                a NULL check.
 * @param pOutput passed through to the callback.
 * @return whatever the fGetData callback returns.
 */
int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
  SDataSinkHandle* pSink = (SDataSinkHandle*)handle;
  int32_t          code = pSink->fGetData(pSink, pOutput);
  return code;
}
89

90
/**
 * Query the sink's cache size via its fGetCacheSize callback.
 *
 * @param handle sink handle, cast to SDataSinkHandle*; dereferenced without
 *               a NULL check.
 * @param pSize  passed through to the callback.
 * @return whatever the fGetCacheSize callback returns.
 */
int32_t dsGetCacheSize(DataSinkHandle handle, uint64_t* pSize) {
  SDataSinkHandle* pSink = (SDataSinkHandle*)handle;
  int32_t          code = pSink->fGetCacheSize(pSink, pSize);
  return code;
}
94

95
/**
 * Placeholder for scheduled sink processing; currently does nothing.
 *
 * @param ahandle unused.
 * @param pItem   unused.
 */
void dsScheduleProcess(void* ahandle, void* pItem) {
  // todo: not implemented yet; the arguments are intentionally ignored.
  (void)ahandle;
  (void)pItem;
}
×
98

99
/**
 * Destroy a data sinker: run its fDestroy callback, then free the handle.
 *
 * The callback's return value is deliberately discarded.
 *
 * @param handle sink handle, cast to SDataSinkHandle*; dereferenced without
 *               a NULL check. It must not be used after this call because
 *               the handle memory is released here.
 */
void dsDestroyDataSinker(DataSinkHandle handle) {
  SDataSinkHandle* pSink = (SDataSinkHandle*)handle;

  // Let the concrete sink tear itself down first ...
  (void)pSink->fDestroy(pSink);

  // ... then release the handle structure itself.
  taosMemoryFree(pSink);
}
2,155✔
104

105
/**
 * Query the sink's flags via its fGetFlags callback.
 *
 * @param handle sink handle, cast to SDataSinkHandle*; dereferenced without
 *               a NULL check.
 * @param pFlags passed through to the callback.
 * @return whatever the fGetFlags callback returns.
 */
int32_t dsGetSinkFlags(DataSinkHandle handle, uint64_t* pFlags) {
  SDataSinkHandle* pSink = (SDataSinkHandle*)handle;
  int32_t          code = pSink->fGetFlags(pSink, pFlags);
  return code;
}
109

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc