• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/snode/src/snode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "rsync.h"
18
#include "sndInt.h"
19
#include "tqCommon.h"
20
#include "tuuid.h"
21

22
// clang-format off
23
#define sndError(...) do {  if (sndDebugFlag & DEBUG_ERROR) { taosPrintLog("SND ERROR ", DEBUG_ERROR, sndDebugFlag, __VA_ARGS__);}} while (0)
24
#define sndInfo(...)  do {  if (sndDebugFlag & DEBUG_INFO)  { taosPrintLog("SND INFO  ", DEBUG_INFO,  sndDebugFlag, __VA_ARGS__);}} while (0)
25
#define sndDebug(...) do {  if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND DEBUG ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
26
// clang-format on
27

UNCOV
28
int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
×
UNCOV
29
  if (!(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0)) {
×
30
    return TSDB_CODE_INVALID_PARA;
×
31
  }
UNCOV
32
  int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
×
UNCOV
33
  if (code != TSDB_CODE_SUCCESS) {
×
34
    return code;
×
35
  }
36

UNCOV
37
  pTask->pBackend = NULL;
×
UNCOV
38
  streamTaskOpenAllUpstreamInput(pTask);
×
39

UNCOV
40
  streamTaskResetUpstreamStageInfo(pTask);
×
41

UNCOV
42
  SCheckpointInfo *pChkInfo = &pTask->chkInfo;
×
UNCOV
43
  tqSetRestoreVersionInfo(pTask);
×
44

UNCOV
45
  char *p = streamTaskGetStatus(pTask).name;
×
UNCOV
46
  if (pTask->info.fillHistory) {
×
UNCOV
47
    sndInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
×
48
            " nextProcessVer:%" PRId64
49
            " child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
50
            SNODE_HANDLE, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
51
            pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
52
            (int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam);
53
  } else {
UNCOV
54
    sndInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
×
55
            " nextProcessVer:%" PRId64
56
            " child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
57
            SNODE_HANDLE, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
58
            pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
59
            (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam);
60
  }
UNCOV
61
  return 0;
×
62
}
63

UNCOV
64
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
×
UNCOV
65
  int32_t code = 0;
×
UNCOV
66
  SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
×
UNCOV
67
  if (pSnode == NULL) {
×
68
    return NULL;
×
69
  }
70

UNCOV
71
  stopRsync();
×
UNCOV
72
  code = startRsync();
×
UNCOV
73
  if (code != 0) {
×
74
    terrno = code;
×
75
    goto _ERR;
×
76
  }
77

UNCOV
78
  pSnode->msgCb = pOption->msgCb;
×
UNCOV
79
  code = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE,
×
80
                        taosGetTimestampMs(), tqStartTaskCompleteCallback, &pSnode->pMeta);
UNCOV
81
  if (code != TSDB_CODE_SUCCESS) {
×
82
    terrno = code;
×
83
    goto _ERR;
×
84
  }
85

UNCOV
86
  streamMetaLoadAllTasks(pSnode->pMeta);
×
UNCOV
87
  return pSnode;
×
88

89
_ERR:
×
90
  taosMemoryFree(pSnode);
×
91
  return NULL;
×
92
}
93

UNCOV
94
int32_t sndInit(SSnode *pSnode) {
×
UNCOV
95
  if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false) != 0) {
×
96
    sndError("failed to start all tasks");
×
97
  }
UNCOV
98
  return 0;
×
99
}
100

UNCOV
101
void sndClose(SSnode *pSnode) {
×
UNCOV
102
  stopRsync();
×
UNCOV
103
  streamMetaNotifyClose(pSnode->pMeta);
×
UNCOV
104
  if (streamMetaCommit(pSnode->pMeta) != 0) {
×
105
    sndError("failed to commit stream meta");
×
106
  }
UNCOV
107
  streamMetaClose(pSnode->pMeta);
×
UNCOV
108
  taosMemoryFree(pSnode);
×
UNCOV
109
}
×
110

UNCOV
111
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
×
UNCOV
112
  switch (pMsg->msgType) {
×
UNCOV
113
    case TDMT_STREAM_TASK_RUN:
×
UNCOV
114
      return tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true);
×
UNCOV
115
    case TDMT_STREAM_TASK_DISPATCH:
×
UNCOV
116
      return tqStreamTaskProcessDispatchReq(pSnode->pMeta, pMsg);
×
UNCOV
117
    case TDMT_STREAM_TASK_DISPATCH_RSP:
×
UNCOV
118
      return tqStreamTaskProcessDispatchRsp(pSnode->pMeta, pMsg);
×
119
    case TDMT_STREAM_RETRIEVE:
×
120
      return tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg);
×
UNCOV
121
    case TDMT_STREAM_RETRIEVE_RSP:  // 1036
×
UNCOV
122
      break;
×
UNCOV
123
    case TDMT_VND_STREAM_TASK_CHECK:
×
UNCOV
124
      return tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg);
×
UNCOV
125
    case TDMT_VND_STREAM_TASK_CHECK_RSP:
×
UNCOV
126
      return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true);
×
UNCOV
127
    case TDMT_STREAM_TASK_CHECKPOINT_READY:
×
UNCOV
128
      return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
×
UNCOV
129
    case TDMT_MND_STREAM_HEARTBEAT_RSP:
×
UNCOV
130
      return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
×
UNCOV
131
    case TDMT_MND_STREAM_REQ_CHKPT_RSP:
×
UNCOV
132
      return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg);
×
UNCOV
133
    case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
×
UNCOV
134
      return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg);
×
UNCOV
135
    case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
×
UNCOV
136
      return tqStreamProcessChkptReportRsp(pSnode->pMeta, pMsg);
×
137
    case TDMT_STREAM_RETRIEVE_TRIGGER:
×
138
      return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
×
139
    case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
×
140
      return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
×
UNCOV
141
    case TDMT_STREAM_CHKPT_EXEC:
×
UNCOV
142
      return tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true);
×
143
    default:
×
144
      sndError("invalid snode msg:%d", pMsg->msgType);
×
145
      return TSDB_CODE_INVALID_MSG;
×
146
  }
UNCOV
147
  return 0;
×
148
}
149

UNCOV
150
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
×
UNCOV
151
  switch (pMsg->msgType) {
×
UNCOV
152
    case TDMT_STREAM_TASK_DEPLOY: {
×
UNCOV
153
      void   *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
×
UNCOV
154
      int32_t len = pMsg->contLen - sizeof(SMsgHead);
×
UNCOV
155
      return tqStreamTaskProcessDeployReq(pSnode->pMeta, &pSnode->msgCb, pMsg->info.conn.applyIndex, pReq, len, true,
×
156
                                          true);
157
    }
158

UNCOV
159
    case TDMT_STREAM_TASK_DROP:
×
UNCOV
160
      return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
×
UNCOV
161
    case TDMT_VND_STREAM_TASK_UPDATE:
×
UNCOV
162
      return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true);
×
163
    case TDMT_VND_STREAM_TASK_RESET:
×
164
      return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont);
×
165
    case TDMT_VND_STREAM_ALL_STOP:
×
166
      return tqStreamTaskProcessAllTaskStopReq(pSnode->pMeta, &pSnode->msgCb, pMsg);
×
UNCOV
167
    case TDMT_STREAM_TASK_PAUSE:
×
UNCOV
168
      return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
×
UNCOV
169
    case TDMT_STREAM_TASK_RESUME:
×
UNCOV
170
      return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
×
UNCOV
171
    case TDMT_STREAM_TASK_UPDATE_CHKPT:
×
UNCOV
172
      return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont);
×
UNCOV
173
    case TDMT_STREAM_CONSEN_CHKPT:
×
UNCOV
174
      return tqStreamTaskProcessConsenChkptIdReq(pSnode->pMeta, pMsg);
×
175
    default:
×
176
      return TSDB_CODE_INVALID_MSG;
×
177
  }
178
  return 0;
179
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc