• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

14.89
/source/dnode/mgmt/mgmt_snode/src/smInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "libs/function/function.h"
18
#include "libs/function/tudf.h"
19
#include "smInt.h"
20
#include "stream.h"
21
#include "tencrypt.h"
22

23
extern SSnodeInfo gSnode;
24

25
static int32_t smRequire(const SMgmtInputOpt *pInput, bool *required) {
16✔
26
  char path[TSDB_FILENAME_LEN];
27
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, pInput->dnodeId);
16✔
28
  SJson    *pJson = NULL;
16✔
29

30
  int32_t code = dmReadFileJson(path, pInput->name, &pJson, required);
16✔
31
  if (code) {
16✔
32
    return code;
×
33
  }
34

35
  SDCreateSnodeReq req = {0};
16✔
36
  code = smBuildCreateReqFromJson(pJson, &req);
16✔
37
  if (code) {
16✔
38
    if (pJson != NULL) cJSON_Delete(pJson);
×
39
    return code;
×
40
  }
41

42
  smUpdateSnodeInfo(&req);
16✔
43

44
  if (pJson != NULL) cJSON_Delete(pJson);
16✔
45
  return code;
16✔
46
}
47

48
/* Copies the management object's message callbacks into the snode open options. */
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
  pOption->msgCb = pMgmt->msgCb;
}
×
49

50
/*
 * Tears down the snode management object.
 *
 * If an snode is attached it is closed first, then the workers are stopped and
 * the pointer is cleared. The management object itself is always freed, so the
 * caller must not use pMgmt afterwards.
 */
static void smClose(SSnodeMgmt *pMgmt) {
  SSnode *pSnode = pMgmt->pSnode;

  if (pSnode != NULL) {
    sndClose(pSnode);
    smStopWorker(pMgmt);
    pMgmt->pSnode = NULL;
  }

  taosMemoryFree(pMgmt);
}
×
59
/*
 * Opens an snode with sndOpen() and converts its pointer result into an error code.
 *
 * *pNode always receives the value returned by sndOpen(). Returns 0 when that
 * value is non-NULL, otherwise the current terrno.
 */
int32_t sndOpenWrapper(const char *path, SSnodeOpt *pOption, SSnode **pNode) {
  SSnode *pOpened = sndOpen(path, pOption);

  *pNode = pOpened;
  return (pOpened == NULL) ? terrno : 0;
}
66

67
// Check and migrate checkpoint files to encrypted format if needed
68
int32_t smCheckAndMigrateCheckpoints(const char *path) {
×
69
  int32_t code = TSDB_CODE_SUCCESS;
×
70

71
  // Wait for encryption keys to be loaded
72
  code = taosWaitCfgKeyLoaded();
×
73
  if (TSDB_CODE_SUCCESS != code) {
×
74
    dError("failed to wait for encryption keys since %s", tstrerror(code));
×
75
    return code;
×
76
  }
77

78
  // Read encrypted flag from global snode info (set by smBuildCreateReqFromJson)
79
  taosRLockLatch(&gSnode.snodeLock);
×
80
  int32_t encrypted = gSnode.encrypted;
×
81
  taosRUnLockLatch(&gSnode.snodeLock);
×
82

83
  // If already encrypted or no metaKey, nothing to do
84
  if (encrypted || tsMetaKey[0] == '\0') {
×
85
    return 0;
×
86
  }
87

88
  dInfo("snode checkpoints not encrypted but tsMetaKey available, starting migration");
×
89

90
  // List all checkpoint files and re-encrypt them
91
  char checkpointDir[PATH_MAX] = {0};
×
92
  snprintf(checkpointDir, sizeof(checkpointDir), "%s%ssnode%scheckpoint", tsDataDir, TD_DIRSEP, TD_DIRSEP);
×
93

94
  TdDirPtr pDir = taosOpenDir(checkpointDir);
×
95
  if (pDir == NULL) {
×
96
    // Checkpoint directory doesn't exist yet, just mark as encrypted
97
    dInfo("checkpoint directory doesn't exist, marking as encrypted");
×
98
  } else {
99
    // Iterate through checkpoint files and re-encrypt each one
100
    TdDirEntryPtr de = NULL;
×
101
    while ((de = taosReadDir(pDir)) != NULL) {
×
102
      if (taosDirEntryIsDir(de)) {
×
103
        continue;
×
104
      }
105

106
      char *name = taosGetDirEntryName(de);
×
107
      if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
×
108
        continue;
×
109
      }
110

111
      // Only process .ck files
112
      if (strstr(name, ".ck") == NULL) {
×
113
        continue;
×
114
      }
115

116
      // Extract streamId from filename (format: <streamId>.ck)
117
      int64_t streamId = 0;
×
118
      if (sscanf(name, "%" PRIx64 ".ck", &streamId) != 1) {
×
119
        dWarn("skip invalid checkpoint file:%s", name);
×
120
        continue;
×
121
      }
122

123
      dInfo("migrating checkpoint file to encrypted format, streamId:%" PRIx64, streamId);
×
124

125
      // Read checkpoint
126
      void   *data = NULL;
×
127
      int64_t dataLen = 0;
×
128
      code = streamReadCheckPoint(streamId, &data, &dataLen);
×
129
      if (TSDB_CODE_SUCCESS != code) {
×
130
        dError("failed to read checkpoint for migration, streamId:%" PRIx64 " since %s", streamId, tstrerror(code));
×
131
        return code;
×
132
      }
133

134
      if (data == NULL || dataLen <= 0) {
×
135
        dInfo("no checkpoint data for streamId:%" PRIx64 ", skipping", streamId);
×
136
        taosMemoryFree(data);
×
137
        continue;
×
138
      }
139

140
      // Rewrite checkpoint (will be encrypted with tsMetaKey)
141
      code = streamWriteCheckPoint(streamId, data, dataLen);
×
142
      taosMemoryFree(data);
×
143
      if (code != TSDB_CODE_SUCCESS) {
×
144
        dError("failed to rewrite checkpoint for streamId:%" PRIx64 " since %s", streamId, tstrerror(code));
×
145
        taosMemoryFree(data);
×
146
        TAOS_UNUSED(taosCloseDir(&pDir));
×
147
        return code;
×
148
      }
149

150
      dInfo("successfully migrated checkpoint to encrypted format, streamId:%" PRIx64, streamId);
×
151
    }
152

153
    TAOS_UNUSED(taosCloseDir(&pDir));
×
154
  }
155

156
  // Update and persist encrypted flag to snode.json
157
  // First need to read deployed status
158
  bool   deployed = false;
×
159
  SJson *pJson = NULL;
×
160
  code = dmReadFileJson(path, "snode", &pJson, &deployed);
×
161
  if (TSDB_CODE_SUCCESS != code) {
×
162
    dError("failed to read snode.json for persisting encrypted flag since %s", tstrerror(code));
×
163
    return code;
×
164
  }
165
  if (pJson != NULL) cJSON_Delete(pJson);
×
166

167
  // Now write with encrypted flag set to true
168
  code = dmWriteFileWithEncrypted(path, "snode", deployed, true);
×
169
  if (TSDB_CODE_SUCCESS != code) {
×
170
    dError("failed to persist encrypted flag to snode.json since %s", tstrerror(code));
×
171
    return code;
×
172
  }
173

174
  // Update global encrypted flag
175
  taosWLockLatch(&gSnode.snodeLock);
×
176
  gSnode.encrypted = 1;
×
177
  taosWUnLockLatch(&gSnode.snodeLock);
×
178

179
  dInfo("snode checkpoints migrated to encrypted format and persisted encrypted flag successfully");
×
180
  return TSDB_CODE_SUCCESS;
×
181
}
182

183
/*
 * Creates and opens the snode management object.
 *
 * Sequence: allocate the object, copy the input options, open the snode,
 * migrate checkpoints if needed, start the workers, then open udfc. On success
 * the object is published through pOutput->pMgmt and 0 is returned.
 *
 * Every failure after the allocation is logged, the object is released through
 * smClose(), and the failing step's code is returned; pOutput is left untouched.
 */
int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
  SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt));
  if (pMgmt == NULL) {
    return terrno;
  }

  pMgmt->pData = pInput->pData;
  pMgmt->path = pInput->path;
  pMgmt->name = pInput->name;
  pMgmt->msgCb = pInput->msgCb;
  // The copied callback set is re-targeted at this management object.
  pMgmt->msgCb.mgmt = pMgmt;
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)smPutMsgToQueue;

  SSnodeOpt option = {0};
  smInitOption(pMgmt, &option);

  int32_t code = sndOpenWrapper(pMgmt->path, &option, &pMgmt->pSnode);
  if (code != 0) {
    dError("failed to open snode since %s", tstrerror(code));
    goto _err;
  }

  tmsgReportStartup("snode-impl", "initialized");

  // Check and migrate checkpoint files to encrypted format if needed.
  // The encrypted flag is read from snode.json in smRequire() via smBuildCreateReqFromJson().
  // A migration failure aborts the open and is propagated to the caller.
  code = smCheckAndMigrateCheckpoints(pMgmt->path);
  if (TSDB_CODE_SUCCESS != code) {
    dError("failed to check and migrate snode checkpoints since %s", tstrerror(code));
    goto _err;
  }

  code = smStartWorker(pMgmt);
  if (code != 0) {
    dError("failed to start snode worker since %s", tstrerror(code));
    goto _err;
  }
  tmsgReportStartup("snode-worker", "initialized");

  code = udfcOpen();
  if (code != 0) {
    dError("failed to open udfc in snode since:%s", tstrerror(code));
    goto _err;
  }

  pOutput->pMgmt = pMgmt;
  return 0;

_err:
  smClose(pMgmt);
  return code;
}
236

237
/* Start hook: runs sndInit() on the attached snode and returns its result. */
static int32_t smStartSnodes(SSnodeMgmt *pMgmt) {
  int32_t code = sndInit(pMgmt->pSnode);
  return code;
}
240

241
SMgmtFunc smGetMgmtFunc() {
16✔
242
  SMgmtFunc mgmtFunc = {0};
16✔
243
  mgmtFunc.openFp = smOpen;
16✔
244
  mgmtFunc.startFp = (NodeStartFp)smStartSnodes;
16✔
245
  mgmtFunc.closeFp = (NodeCloseFp)smClose;
16✔
246
  mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq;
16✔
247
  mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq;
16✔
248
  mgmtFunc.requiredFp = smRequire;
16✔
249
  mgmtFunc.getHandlesFp = smGetMsgHandles;
16✔
250

251
  return mgmtFunc;
16✔
252
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc