• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4935

22 Jan 2026 06:38AM UTC coverage: 66.708% (+0.02%) from 66.691%
#4935

push

travis-ci

web-flow
merge: from main to 3.0 #34371

121 of 271 new or added lines in 17 files covered. (44.65%)

9066 existing lines in 149 files now uncovered.

203884 of 305637 relevant lines covered (66.71%)

125811266.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.18
/source/dnode/mgmt/mgmt_snode/src/smInt.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "libs/function/function.h"
18
#include "libs/function/tudf.h"
19
#include "smInt.h"
20
#include "stream.h"
21
#include "tencrypt.h"
22

23
extern SSnodeInfo gSnode;
24

25
static int32_t smRequire(const SMgmtInputOpt *pInput, bool *required) {
546,239✔
26
  char path[TSDB_FILENAME_LEN];
544,774✔
27
  snprintf(path, TSDB_FILENAME_LEN, "%s%ssnode%d", pInput->path, TD_DIRSEP, pInput->dnodeId);
546,239✔
28
  SJson    *pJson = NULL;
546,239✔
29

30
  int32_t code = dmReadFileJson(path, pInput->name, &pJson, required);
546,239✔
31
  if (code) {
546,239✔
32
    return code;
×
33
  }
34

35
  SDCreateSnodeReq req = {0};
546,239✔
36
  code = smBuildCreateReqFromJson(pJson, &req);
546,239✔
37
  if (code) {
546,239✔
38
    if (pJson != NULL) cJSON_Delete(pJson);
×
39
    return code;
×
40
  }
41

42
  smUpdateSnodeInfo(&req);
546,239✔
43

44
  if (pJson != NULL) cJSON_Delete(pJson);
546,239✔
45
  return code;
546,239✔
46
}
47

48
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { pOption->msgCb = pMgmt->msgCb; }
56,491✔
49

50
static void smClose(SSnodeMgmt *pMgmt) {
56,491✔
51
  if (pMgmt->pSnode != NULL) {
56,491✔
52
    sndClose(pMgmt->pSnode);
56,491✔
53
    smStopWorker(pMgmt);
56,491✔
54
    pMgmt->pSnode = NULL;
56,491✔
55
  }
56

57
  taosMemoryFree(pMgmt);
56,491✔
58
}
56,491✔
59
int32_t sndOpenWrapper(const char *path, SSnodeOpt *pOption, SSnode **pNode) {
56,491✔
60
  *pNode = sndOpen(path, pOption);
56,491✔
61
  if (*pNode == NULL) {
56,491✔
62
    return terrno;
×
63
  }
64
  return 0;
56,491✔
65
}
66

67
// Check and migrate checkpoint files to encrypted format if needed
68
int32_t smCheckAndMigrateCheckpoints(const char *path) {
56,491✔
69
  int32_t code = TSDB_CODE_SUCCESS;
56,491✔
70

71
  // Wait for encryption keys to be loaded
72
  code = taosWaitCfgKeyLoaded();
56,491✔
73
  if (TSDB_CODE_SUCCESS != code) {
56,491✔
NEW
74
    dError("failed to wait for encryption keys since %s", tstrerror(code));
×
NEW
75
    return code;
×
76
  }
77

78
  // Read encrypted flag from global snode info (set by smBuildCreateReqFromJson)
79
  taosRLockLatch(&gSnode.snodeLock);
56,491✔
80
  int32_t encrypted = gSnode.encrypted;
56,491✔
81
  taosRUnLockLatch(&gSnode.snodeLock);
56,491✔
82

83
  // If already encrypted or no metaKey, nothing to do
84
  if (encrypted || tsMetaKey[0] == '\0') {
56,491✔
85
    return 0;
56,491✔
86
  }
87

NEW
88
  dInfo("snode checkpoints not encrypted but tsMetaKey available, starting migration");
×
89

90
  // List all checkpoint files and re-encrypt them
NEW
91
  char checkpointDir[PATH_MAX] = {0};
×
NEW
92
  snprintf(checkpointDir, sizeof(checkpointDir), "%s%ssnode%scheckpoint", tsDataDir, TD_DIRSEP, TD_DIRSEP);
×
93

NEW
94
  TdDirPtr pDir = taosOpenDir(checkpointDir);
×
NEW
95
  if (pDir == NULL) {
×
96
    // Checkpoint directory doesn't exist yet, just mark as encrypted
NEW
97
    dInfo("checkpoint directory doesn't exist, marking as encrypted");
×
98
  } else {
99
    // Iterate through checkpoint files and re-encrypt each one
NEW
100
    TdDirEntryPtr de = NULL;
×
NEW
101
    while ((de = taosReadDir(pDir)) != NULL) {
×
NEW
102
      if (taosDirEntryIsDir(de)) {
×
NEW
103
        continue;
×
104
      }
105

NEW
106
      char *name = taosGetDirEntryName(de);
×
NEW
107
      if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
×
NEW
108
        continue;
×
109
      }
110

111
      // Only process .ck files
NEW
112
      if (strstr(name, ".ck") == NULL) {
×
NEW
113
        continue;
×
114
      }
115

116
      // Extract streamId from filename (format: <streamId>.ck)
NEW
117
      int64_t streamId = 0;
×
NEW
118
      if (sscanf(name, "%" PRIx64 ".ck", &streamId) != 1) {
×
NEW
119
        dWarn("skip invalid checkpoint file:%s", name);
×
NEW
120
        continue;
×
121
      }
122

NEW
123
      dInfo("migrating checkpoint file to encrypted format, streamId:%" PRIx64, streamId);
×
124

125
      // Read checkpoint
NEW
126
      void   *data = NULL;
×
NEW
127
      int64_t dataLen = 0;
×
NEW
128
      code = streamReadCheckPoint(streamId, &data, &dataLen);
×
NEW
129
      if (TSDB_CODE_SUCCESS != code) {
×
NEW
130
        dError("failed to read checkpoint for migration, streamId:%" PRIx64 " since %s", streamId, tstrerror(code));
×
NEW
131
        return code;
×
132
      }
133

NEW
134
      if (data == NULL || dataLen <= 0) {
×
NEW
135
        dInfo("no checkpoint data for streamId:%" PRIx64 ", skipping", streamId);
×
NEW
136
        taosMemoryFree(data);
×
NEW
137
        continue;
×
138
      }
139

140
      // Rewrite checkpoint (will be encrypted with tsMetaKey)
NEW
141
      code = streamWriteCheckPoint(streamId, data, dataLen);
×
NEW
142
      taosMemoryFree(data);
×
NEW
143
      if (code != TSDB_CODE_SUCCESS) {
×
NEW
144
        dError("failed to rewrite checkpoint for streamId:%" PRIx64 " since %s", streamId, tstrerror(code));
×
NEW
145
        taosMemoryFree(data);
×
NEW
146
        TAOS_UNUSED(taosCloseDir(&pDir));
×
NEW
147
        return code;
×
148
      }
149

NEW
150
      dInfo("successfully migrated checkpoint to encrypted format, streamId:%" PRIx64, streamId);
×
151
    }
152

NEW
153
    TAOS_UNUSED(taosCloseDir(&pDir));
×
154
  }
155

156
  // Update and persist encrypted flag to snode.json
157
  // First need to read deployed status
NEW
158
  bool   deployed = false;
×
NEW
159
  SJson *pJson = NULL;
×
NEW
160
  code = dmReadFileJson(path, "snode", &pJson, &deployed);
×
NEW
161
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
162
    dError("failed to read snode.json for persisting encrypted flag since %s", tstrerror(code));
×
NEW
163
    return code;
×
164
  }
NEW
165
  if (pJson != NULL) cJSON_Delete(pJson);
×
166

167
  // Now write with encrypted flag set to true
NEW
168
  code = dmWriteFileWithEncrypted(path, "snode", deployed, true);
×
NEW
169
  if (TSDB_CODE_SUCCESS != code) {
×
NEW
170
    dError("failed to persist encrypted flag to snode.json since %s", tstrerror(code));
×
NEW
171
    return code;
×
172
  }
173

174
  // Update global encrypted flag
NEW
175
  taosWLockLatch(&gSnode.snodeLock);
×
NEW
176
  gSnode.encrypted = 1;
×
NEW
177
  taosWUnLockLatch(&gSnode.snodeLock);
×
178

NEW
179
  dInfo("snode checkpoints migrated to encrypted format and persisted encrypted flag successfully");
×
NEW
180
  return TSDB_CODE_SUCCESS;
×
181
}
182

183
int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
56,491✔
184
  int32_t     code = 0;
56,491✔
185
  SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt));
56,491✔
186
  if (pMgmt == NULL) {
56,491✔
187
    code = terrno;
×
188
    return code;
×
189
  }
190

191
  pMgmt->pData = pInput->pData;
56,491✔
192
  pMgmt->path = pInput->path;
56,491✔
193
  pMgmt->name = pInput->name;
56,491✔
194
  pMgmt->msgCb = pInput->msgCb;
56,491✔
195
  pMgmt->msgCb.mgmt = pMgmt;
56,491✔
196
  pMgmt->msgCb.putToQueueFp = (PutToQueueFp)smPutMsgToQueue;
56,491✔
197

198
  SSnodeOpt option = {0};
56,491✔
199
  smInitOption(pMgmt, &option);
56,491✔
200

201
  code = sndOpenWrapper(pMgmt->path, &option, &pMgmt->pSnode);
56,491✔
202
  if (code != 0) {
56,491✔
203
    dError("failed to open snode since %s", tstrerror(code));
×
204
    smClose(pMgmt);
×
205
    return code;
×
206
  }
207

208
  tmsgReportStartup("snode-impl", "initialized");
56,491✔
209

210
  // Check and migrate checkpoint files to encrypted format if needed
211
  // The encrypted flag is read from snode.json in smRequire() via smBuildCreateReqFromJson()
212
  code = smCheckAndMigrateCheckpoints(pMgmt->path);
56,491✔
213
  if (TSDB_CODE_SUCCESS != code) {
56,491✔
NEW
214
    dError("failed to check and migrate snode checkpoints since %s", tstrerror(code));
×
215
    // Don't fail the startup, just log the error
NEW
216
    smClose(pMgmt);
×
NEW
217
    return code;
×
218
  }
219

220
  if ((code = smStartWorker(pMgmt)) != 0) {
56,491✔
221
    dError("failed to start snode worker since %s", tstrerror(code));
×
222
    smClose(pMgmt);
×
223
    return code;
×
224
  }
225
  tmsgReportStartup("snode-worker", "initialized");
56,491✔
226

227
  if ((code = udfcOpen()) != 0) {
56,491✔
228
    dError("failed to open udfc in snode since:%s", tstrerror(code));
×
229
    smClose(pMgmt);
×
230
    return code;
×
231
  }
232

233
  pOutput->pMgmt = pMgmt;
56,491✔
234
  return 0;
56,491✔
235
}
236

237
static int32_t smStartSnodes(SSnodeMgmt *pMgmt) { 
56,491✔
238
  return sndInit(pMgmt->pSnode); 
56,491✔
239
}
240

241
SMgmtFunc smGetMgmtFunc() {
546,239✔
242
  SMgmtFunc mgmtFunc = {0};
546,239✔
243
  mgmtFunc.openFp = smOpen;
546,239✔
244
  mgmtFunc.startFp = (NodeStartFp)smStartSnodes;
546,239✔
245
  mgmtFunc.closeFp = (NodeCloseFp)smClose;
546,239✔
246
  mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq;
546,239✔
247
  mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq;
546,239✔
248
  mgmtFunc.requiredFp = smRequire;
546,239✔
249
  mgmtFunc.getHandlesFp = smGetMsgHandles;
546,239✔
250

251
  return mgmtFunc;
546,239✔
252
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc