• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4986

15 Mar 2026 08:32AM UTC coverage: 37.305% (-31.3%) from 68.601%
#4986

push

travis-ci

tomchon
test: keep docs and unit test

125478 of 336361 relevant lines covered (37.3%)

1134847.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.97
/source/dnode/mnode/impl/src/mndXnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include <stdio.h>
18
#include "mndDef.h"
19
#include "tdatablock.h"
20
#include "types.h"
21
#ifndef WINDOWS
22
#include <curl/curl.h>
23
#include <openssl/bio.h>
24
#include <openssl/buffer.h>
25
#include <openssl/err.h>
26
#include <openssl/evp.h>
27
#include <openssl/hmac.h>
28
#endif
29
#include "audit.h"
30
#include "mndDnode.h"
31
#include "mndPrivilege.h"
32
#include "mndShow.h"
33
#include "mndTrans.h"
34
#include "mndUser.h"
35
#include "mndXnode.h"
36
#include "sdb.h"
37
#include "taoserror.h"
38
#include "tjson.h"
39
#include "xnode.h"
40

41
#define TSDB_XNODE_RESERVE_SIZE 64
42
#define XNODED_PIPE_SOCKET_URL "http://localhost"
43
/** HTTP method selector; mndSendReqRetJson() takes one of these as its `type` argument. */
typedef enum {
  HTTP_TYPE_GET = 0,
  HTTP_TYPE_POST = 1,
  HTTP_TYPE_DELETE = 2,
} EHttpType;

/** A byte buffer paired with its recorded length. */
typedef struct {
  char   *data;     // start of the buffer
  int64_t dataLen;  // length recorded for `data`
} SCurlResp;

// Used in this file both as the HTTP request timeout and as the taosMsleep() interval.
const int32_t defaultTimeout = 1000;
54

55
/** xnodes systable actions */
56
SSdbRaw *mndXnodeActionEncode(SXnodeObj *pObj);
57
SSdbRow *mndXnodeActionDecode(SSdbRaw *pRaw);
58
int32_t  mndXnodeActionInsert(SSdb *pSdb, SXnodeObj *pObj);
59
int32_t  mndXnodeActionUpdate(SSdb *pSdb, SXnodeObj *pOld, SXnodeObj *pNew);
60
int32_t  mndXnodeActionDelete(SSdb *pSdb, SXnodeObj *pObj);
61

62
/** @section xnodes request handlers */
63
static int32_t mndProcessCreateXnodeReq(SRpcMsg *pReq);
64
static int32_t mndProcessUpdateXnodeReq(SRpcMsg *pReq);
65
static int32_t mndProcessDropXnodeReq(SRpcMsg *pReq);
66
static int32_t mndProcessDrainXnodeReq(SRpcMsg *pReq);
67
static int32_t mndRetrieveXnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
68
static void    mndCancelGetNextXnode(SMnode *pMnode, void *pIter);
69

70
/** @section xnode task handlers */
71
SSdbRaw *mndXnodeTaskActionEncode(SXnodeTaskObj *pObj);
72
SSdbRow *mndXnodeTaskActionDecode(SSdbRaw *pRaw);
73
int32_t  mndXnodeTaskActionInsert(SSdb *pSdb, SXnodeTaskObj *pObj);
74
int32_t  mndXnodeTaskActionUpdate(SSdb *pSdb, SXnodeTaskObj *pOld, SXnodeTaskObj *pNew);
75
int32_t  mndXnodeTaskActionDelete(SSdb *pSdb, SXnodeTaskObj *pObj);
76

77
static int32_t mndProcessCreateXnodeTaskReq(SRpcMsg *pReq);
78
static int32_t mndProcessStartXnodeTaskReq(SRpcMsg *pReq);
79
static int32_t mndProcessStopXnodeTaskReq(SRpcMsg *pReq);
80
static int32_t mndProcessUpdateXnodeTaskReq(SRpcMsg *pReq);
81
static int32_t mndProcessDropXnodeTaskReq(SRpcMsg *pReq);
82
static int32_t mndRetrieveXnodeTasks(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
83
static void    mndCancelGetNextXnodeTask(SMnode *pMnode, void *pIter);
84

85
/** @section xnode task job handlers */
86
SSdbRaw *mndXnodeJobActionEncode(SXnodeJobObj *pObj);
87
SSdbRow *mndXnodeJobActionDecode(SSdbRaw *pRaw);
88
int32_t  mndXnodeJobActionInsert(SSdb *pSdb, SXnodeJobObj *pObj);
89
int32_t  mndXnodeJobActionUpdate(SSdb *pSdb, SXnodeJobObj *pOld, SXnodeJobObj *pNew);
90
int32_t  mndXnodeJobActionDelete(SSdb *pSdb, SXnodeJobObj *pObj);
91

92
static int32_t mndProcessCreateXnodeJobReq(SRpcMsg *pReq);
93
static int32_t mndProcessUpdateXnodeJobReq(SRpcMsg *pReq);
94
static int32_t mndProcessRebalanceXnodeJobReq(SRpcMsg *pReq);
95
static int32_t mndProcessRebalanceXnodeJobsWhereReq(SRpcMsg *pReq);
96
static int32_t mndProcessDropXnodeJobReq(SRpcMsg *pReq);
97
static int32_t mndRetrieveXnodeJobs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
98
static void    mndCancelGetNextXnodeJob(SMnode *pMnode, void *pIter);
99

100
/** @section xnode user pass handlers */
101
SSdbRaw *mndXnodeUserPassActionEncode(SXnodeUserPassObj *pObj);
102
SSdbRow *mndXnodeUserPassActionDecode(SSdbRaw *pRaw);
103
int32_t  mndXnodeUserPassActionInsert(SSdb *pSdb, SXnodeUserPassObj *pObj);
104
int32_t  mndXnodeUserPassActionUpdate(SSdb *pSdb, SXnodeUserPassObj *pOld, SXnodeUserPassObj *pNew);
105
int32_t  mndXnodeUserPassActionDelete(SSdb *pSdb, SXnodeUserPassObj *pObj);
106

107
/** @section xnode agent handlers */
108
SSdbRaw *mndXnodeAgentActionEncode(SXnodeAgentObj *pObj);
109
SSdbRow *mndXnodeAgentActionDecode(SSdbRaw *pRaw);
110
int32_t  mndXnodeAgentActionInsert(SSdb *pSdb, SXnodeAgentObj *pObj);
111
int32_t  mndXnodeAgentActionUpdate(SSdb *pSdb, SXnodeAgentObj *pOld, SXnodeAgentObj *pNew);
112
int32_t  mndXnodeAgentActionDelete(SSdb *pSdb, SXnodeAgentObj *pObj);
113

114
static int32_t mndProcessCreateXnodeAgentReq(SRpcMsg *pReq);
115
static int32_t mndProcessUpdateXnodeAgentReq(SRpcMsg *pReq);
116
static int32_t mndProcessDropXnodeAgentReq(SRpcMsg *pReq);
117
static int32_t mndRetrieveXnodeAgents(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
118
static void    mndCancelGetNextXnodeAgent(SMnode *pMnode, void *pIter);
119

120
/** @section xnoded mgmt */
121
void mndStartXnoded(SMnode *pMnode, const char *user, const char *pass, const char *token);
122
void mndRestartXnoded(SMnode *pMnode);
123

124
/** @section others */
125
static int32_t mndGetXnodeStatus(SXnodeObj *pObj, char *status, int32_t statusLen);
126
SXnodeTaskObj *mndAcquireXnodeTask(SMnode *pMnode, int32_t tid);
127
SJson         *mndSendReqRetJson(const char *url, EHttpType type, int64_t timeout, const char *buf, int64_t bufLen);
128
static int32_t mndSetDropXnodeJobInfoToTrans(STrans *pTrans, SXnodeJobObj *pObj, bool force);
129
void           mndReleaseXnodeJob(SMnode *pMnode, SXnodeJobObj *pObj);
130
static int32_t mndValidateXnodePermissions(SMnode *pMnode, SRpcMsg *pReq, EOperType oper);
131

132
/**
 * Register the xnode module with the mnode: first the five sdb tables
 * (xnode, task, job, agent, user-pass), then the RPC message handlers and
 * the show-table retrieve / free-iterator callbacks.
 *
 * @param pMnode  mnode instance whose sdb and handler tables are filled in.
 * @return 0 on success, otherwise the first non-zero code from sdbSetTable().
 */
int32_t mndInitXnode(SMnode *pMnode) {
  // Descriptors are registered in array order; the first failure aborts the init.
  SSdbTable sdbTables[] = {
      {
          .sdbType = SDB_XNODE,
          .keyType = SDB_KEY_INT32,
          .encodeFp = (SdbEncodeFp)mndXnodeActionEncode,
          .decodeFp = (SdbDecodeFp)mndXnodeActionDecode,
          .insertFp = (SdbInsertFp)mndXnodeActionInsert,
          .updateFp = (SdbUpdateFp)mndXnodeActionUpdate,
          .deleteFp = (SdbDeleteFp)mndXnodeActionDelete,
      },
      {
          .sdbType = SDB_XNODE_TASK,
          .keyType = SDB_KEY_INT32,
          .encodeFp = (SdbEncodeFp)mndXnodeTaskActionEncode,
          .decodeFp = (SdbDecodeFp)mndXnodeTaskActionDecode,
          .insertFp = (SdbInsertFp)mndXnodeTaskActionInsert,
          .updateFp = (SdbUpdateFp)mndXnodeTaskActionUpdate,
          .deleteFp = (SdbDeleteFp)mndXnodeTaskActionDelete,
      },
      {
          .sdbType = SDB_XNODE_JOB,
          .keyType = SDB_KEY_INT32,
          .encodeFp = (SdbEncodeFp)mndXnodeJobActionEncode,
          .decodeFp = (SdbDecodeFp)mndXnodeJobActionDecode,
          .insertFp = (SdbInsertFp)mndXnodeJobActionInsert,
          .updateFp = (SdbUpdateFp)mndXnodeJobActionUpdate,
          .deleteFp = (SdbDeleteFp)mndXnodeJobActionDelete,
      },
      {
          .sdbType = SDB_XNODE_AGENT,
          .keyType = SDB_KEY_INT32,
          .encodeFp = (SdbEncodeFp)mndXnodeAgentActionEncode,
          .decodeFp = (SdbDecodeFp)mndXnodeAgentActionDecode,
          .insertFp = (SdbInsertFp)mndXnodeAgentActionInsert,
          .updateFp = (SdbUpdateFp)mndXnodeAgentActionUpdate,
          .deleteFp = (SdbDeleteFp)mndXnodeAgentActionDelete,
      },
      {
          .sdbType = SDB_XNODE_USER_PASS,
          .keyType = SDB_KEY_INT32,
          .encodeFp = (SdbEncodeFp)mndXnodeUserPassActionEncode,
          .decodeFp = (SdbDecodeFp)mndXnodeUserPassActionDecode,
          .insertFp = (SdbInsertFp)mndXnodeUserPassActionInsert,
          .updateFp = (SdbUpdateFp)mndXnodeUserPassActionUpdate,
          .deleteFp = (SdbDeleteFp)mndXnodeUserPassActionDelete,
      },
  };
  const int32_t numOfTables = (int32_t)(sizeof(sdbTables) / sizeof(sdbTables[0]));

  for (int32_t i = 0; i < numOfTables; ++i) {
    int32_t code = sdbSetTable(pMnode->pSdb, sdbTables[i]);
    if (code != 0) {
      return code;
    }
  }

  // xnode handlers
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_XNODE, mndProcessCreateXnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_XNODE, mndProcessUpdateXnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_XNODE, mndProcessDropXnodeReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DRAIN_XNODE, mndProcessDrainXnodeReq);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_XNODES, mndRetrieveXnodes);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_XNODES, mndCancelGetNextXnode);

  // xnode task handlers
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_XNODE_TASK, mndProcessCreateXnodeTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_START_XNODE_TASK, mndProcessStartXnodeTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_STOP_XNODE_TASK, mndProcessStopXnodeTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_XNODE_TASK, mndProcessUpdateXnodeTaskReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_XNODE_TASK, mndProcessDropXnodeTaskReq);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_XNODE_TASKS, mndRetrieveXnodeTasks);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_XNODE_TASKS, mndCancelGetNextXnodeTask);

  // xnode job handlers
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_XNODE_JOB, mndProcessCreateXnodeJobReq);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_XNODE_JOB, mndProcessUpdateXnodeJobReq);
  mndSetMsgHandle(pMnode, TDMT_MND_REBALANCE_XNODE_JOB, mndProcessRebalanceXnodeJobReq);
  mndSetMsgHandle(pMnode, TDMT_MND_REBALANCE_XNODE_JOBS_WHERE, mndProcessRebalanceXnodeJobsWhereReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_XNODE_JOB, mndProcessDropXnodeJobReq);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_XNODE_JOBS, mndRetrieveXnodeJobs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_XNODE_JOBS, mndCancelGetNextXnodeJob);

  // xnode agent handlers
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_XNODE_AGENT, mndProcessCreateXnodeAgentReq);
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_XNODE_AGENT, mndProcessUpdateXnodeAgentReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_XNODE_AGENT, mndProcessDropXnodeAgentReq);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_XNODE_AGENTS, mndRetrieveXnodeAgents);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_XNODE_AGENTS, mndCancelGetNextXnodeAgent);

  return 0;
}
239

240
/** tools section **/
241

242
int32_t xnodeCheckPasswordFmt(const char *pwd) {
×
243
  if (tsEnableAdvancedSecurity == 0 && strcmp(pwd, "taosdata") == 0) {
×
244
    return 0;
×
245
  }
246

247
  if (tsEnableStrongPassword == 0) {
×
248
    for (char c = *pwd; c != 0; c = *(++pwd)) {
×
249
      if (c == ' ' || c == '\'' || c == '\"' || c == '`' || c == '\\') {
×
250
        return TSDB_CODE_MND_INVALID_PASS_FORMAT;
×
251
      }
252
    }
253
    return 0;
×
254
  }
255

256
  int32_t len = strlen(pwd);
×
257
  if (len < TSDB_PASSWORD_MIN_LEN) {
×
258
    return TSDB_CODE_PAR_PASSWD_TOO_SHORT_OR_EMPTY;
×
259
  }
260

261
  if (len > TSDB_PASSWORD_MAX_LEN) {
×
262
    return TSDB_CODE_PAR_NAME_OR_PASSWD_TOO_LONG;
×
263
  }
264

265
  if (taosIsComplexString(pwd)) {
×
266
    return 0;
×
267
  }
268

269
  return TSDB_CODE_MND_INVALID_PASS_FORMAT;
×
270
}
271

272
/**
 * Exchange a (length, string) pair between a "new" and an "old" holder.
 * Nothing happens unless the new length is strictly positive.
 *
 * @param newLen    length belonging to the new holder (in/out).
 * @param ppNewStr  string pointer belonging to the new holder (in/out).
 * @param oldLen    length belonging to the old holder (in/out).
 * @param ppOldStr  string pointer belonging to the old holder (in/out).
 */
static void swapFields(int32_t *newLen, char **ppNewStr, int32_t *oldLen, char **ppOldStr) {
  if (*newLen <= 0) {
    return;
  }

  const int32_t incomingLen = *newLen;
  char *const   incomingStr = *ppNewStr;

  *newLen = *oldLen;
  *ppNewStr = *ppOldStr;

  *oldLen = incomingLen;
  *ppOldStr = incomingStr;
}
×
283

284
/** xnode section **/
285

286
/** Module cleanup hook; the body is intentionally empty. */
void mndCleanupXnode(SMnode *pMnode) { (void)pMnode; }
16✔
287

288
/**
 * Look up an xnode by id via sdbAcquire().
 *
 * @param pMnode   mnode whose sdb is searched.
 * @param xnodeId  key of the xnode.
 * @return the object, or NULL; a TSDB_CODE_SDB_OBJ_NOT_THERE failure is
 *         reported through terrno as TSDB_CODE_MND_XNODE_NOT_EXIST.
 */
SXnodeObj *mndAcquireXnode(SMnode *pMnode, int32_t xnodeId) {
  SXnodeObj *pXnode = sdbAcquire(pMnode->pSdb, SDB_XNODE, &xnodeId);
  if (pXnode != NULL) {
    return pXnode;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_XNODE_NOT_EXIST;
  }
  return NULL;
}
295

296
/** Hand an xnode object back to the mnode's sdb via sdbRelease(). */
void mndReleaseXnode(SMnode *pMnode, SXnodeObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
×
300

301
/**
 * Serialize an xnode object into a freshly allocated sdb raw record.
 *
 * Field order: id, urlLen, url bytes, statusLen, status bytes, createTime,
 * updateTime, then TSDB_XNODE_RESERVE_SIZE reserved bytes.
 *
 * @param pObj  object to encode; NULL yields TSDB_CODE_INVALID_PARA in terrno.
 * @return the raw record, or NULL with terrno set on failure.
 */
SSdbRaw *mndXnodeActionEncode(SXnodeObj *pObj) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pObj == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  // Pessimistic default: stays in place if any step below jumps to _OVER.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t rawDataLen = sizeof(SXnodeObj) + TSDB_XNODE_RESERVE_SIZE + pObj->urlLen + pObj->statusLen;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE, TSDB_XNODE_VER_NUMBER, rawDataLen);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->urlLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->url, pObj->urlLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->statusLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("xnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("xnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
339

340
/**
 * Rebuild an xnode row from an sdb raw record written by
 * mndXnodeActionEncode().
 *
 * The url and status strings are heap-allocated with one extra zero byte so
 * they are NUL-terminated; a non-positive length leaves the pointer NULL.
 *
 * @param pRaw  record to decode; NULL yields TSDB_CODE_INVALID_PARA in terrno.
 * @return the decoded row, or NULL with terrno set. On failure every buffer
 *         allocated so far (url, status and the row itself) is released.
 */
SSdbRow *mndXnodeActionDecode(SSdbRaw *pRaw) {
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow   *pRow = NULL;
  SXnodeObj *pObj = NULL;

  if (NULL == pRaw) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != TSDB_XNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SXnodeObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  // Make the cleanup path independent of how the row memory was initialised.
  pObj->url = NULL;
  pObj->status = NULL;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->urlLen, _OVER)
  if (pObj->urlLen > 0) {
    pObj->url = taosMemoryCalloc(1, pObj->urlLen + 1);
    if (pObj->url == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->url, pObj->urlLen, _OVER)
  } else {
    pObj->url = NULL;
  }
  SDB_GET_INT32(pRaw, dataPos, &pObj->statusLen, _OVER)
  if (pObj->statusLen > 0) {
    pObj->status = taosMemoryCalloc(1, pObj->statusLen + 1);
    if (pObj->status == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
  } else {
    pObj->status = NULL;
  }
  SDB_GET_INT64(pRaw, dataPos, &pObj->createTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("xnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
    if (pObj != NULL) {
      taosMemoryFreeClear(pObj->url);
      // status may already be allocated when a later field fails to decode.
      taosMemoryFreeClear(pObj->status);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("xnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}
404

405
/**
 * Release the heap strings owned by an xnode object (url and status) and
 * reset both pointers. The object itself is not freed. NULL is ignored.
 */
static void mndFreeXnode(SXnodeObj *pObj) {
  if (pObj == NULL) {
    return;
  }
  // taosMemoryFreeClear() is already applied to possibly-NULL pointers elsewhere in this file.
  taosMemoryFreeClear(pObj->url);
  taosMemoryFreeClear(pObj->status);
}
414

415
/** sdb insert callback for xnode rows: logs the event and always returns 0. */
int32_t mndXnodeActionInsert(SSdb *pSdb, SXnodeObj *pObj) {
  (void)pSdb;
  mDebug("xnode:%d, perform insert action, row:%p", pObj->id, pObj);
  return 0;
}
419

420
/** sdb delete callback for xnode rows: logs the event, frees the row's strings, returns 0. */
int32_t mndXnodeActionDelete(SSdb *pSdb, SXnodeObj *pObj) {
  (void)pSdb;
  mDebug("xnode:%d, perform delete action, row:%p", pObj->id, pObj);
  mndFreeXnode(pObj);
  return 0;
}
425

426
/**
 * sdb update callback for xnode rows.
 *
 * Under the old row's write latch: the status string/length are exchanged
 * with the new row's when the new row carries a status, and updateTime is
 * moved forward if the new row's value is later. Other fields are untouched.
 *
 * @return always 0.
 */
int32_t mndXnodeActionUpdate(SSdb *pSdb, SXnodeObj *pOld, SXnodeObj *pNew) {
  (void)pSdb;
  mDebug("xnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  taosWLockLatch(&pOld->lock);

  // After the swap the new row holds the previous status buffer.
  swapFields(&pNew->statusLen, &pNew->status, &pOld->statusLen, &pOld->status);

  if (pOld->updateTime < pNew->updateTime) {
    pOld->updateTime = pNew->updateTime;
  }

  taosWUnLockLatch(&pOld->lock);
  return 0;
}
437

438
/** Hand an xnode user-pass object back to the mnode's sdb via sdbRelease(). */
void mndReleaseXnodeUserPass(SMnode *pMnode, SXnodeUserPassObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
×
442

443
/**
 * Return the first object found while iterating the SDB_XNODE_USER_PASS
 * table.
 *
 * @param pMnode  mnode whose sdb is scanned.
 * @return the first non-NULL object yielded by sdbFetch() (the iterator is
 *         cancelled before returning), or NULL with terrno set to
 *         TSDB_CODE_MND_XNODE_USER_PASS_NOT_EXIST when the scan ends empty.
 */
SXnodeUserPassObj *mndAcquireFirstXnodeUserPass(SMnode *pMnode) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SXnodeUserPassObj *pUserPass = NULL;
    pIter = sdbFetch(pSdb, SDB_XNODE_USER_PASS, pIter, (void **)&pUserPass);
    if (pIter == NULL) {
      break;
    }

    if (pUserPass == NULL) {
      // Iterator advanced without an object; release (of NULL) and keep scanning.
      sdbRelease(pSdb, pUserPass);
      continue;
    }

    sdbCancelFetch(pSdb, pIter);
    return pUserPass;
  }

  terrno = TSDB_CODE_MND_XNODE_USER_PASS_NOT_EXIST;
  return NULL;
}
462

463
/**
 * Encode the user-pass object and append it to the transaction's redo log,
 * then mark the raw record SDB_STATUS_CREATING.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateXnodeUserPassRedoLogs(STrans *pTrans, SXnodeUserPassObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeUserPassActionEncode(pObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(code);
}
475

476
/**
 * Encode the user-pass object and append it to the transaction's commit log,
 * then mark the raw record SDB_STATUS_READY.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateXnodeUserPassCommitLogs(STrans *pTrans, SXnodeUserPassObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeUserPassActionEncode(pObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
488

489
/**
 * Encode the xnode object and append it to the transaction's redo log, then
 * mark the raw record SDB_STATUS_CREATING.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateXnodeRedoLogs(STrans *pTrans, SXnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeActionEncode(pObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(code);
}
501

502
/**
 * Encode the xnode object and append it to the transaction's undo log, then
 * mark the raw record SDB_STATUS_DROPPED.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateXnodeUndoLogs(STrans *pTrans, SXnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeActionEncode(pObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(code);
}
514

515
/**
 * Encode the xnode object and append it to the transaction's commit log,
 * then mark the raw record SDB_STATUS_READY.
 *
 * @return 0 on success; terrno (or TSDB_CODE_MND_RETURN_VALUE_NULL when
 *         terrno is 0) if encoding fails; otherwise the failing call's code.
 */
static int32_t mndSetCreateXnodeCommitLogs(STrans *pTrans, SXnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeActionEncode(pObj);
  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
527

528
/**
 * Build a new xnode object from a create request and submit a serial
 * "create-xnode" transaction (redo, undo and commit logs) for it.
 *
 * @param pMnode   mnode that owns the sdb and the transaction.
 * @param pReq     originating RPC message handed to mndTransCreate(); callers
 *                 in this file also pass NULL.
 * @param pCreate  deserialized request; url/urlLen are copied.
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
 *         otherwise an error code: TSDB_CODE_MND_XNODE_TOO_LONG_URL,
 *         TSDB_CODE_INVALID_PARA for an unusable url/urlLen,
 *         TSDB_CODE_OUT_OF_MEMORY, or the failing call's code.
 */
static int32_t mndCreateXnode(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeReq *pCreate) {
  int32_t code = 0, lino = 0;
  STrans *pTrans = NULL;

  SXnodeObj xnodeObj = {0};
  xnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE);

  xnodeObj.urlLen = pCreate->urlLen;
  if (xnodeObj.urlLen > TSDB_XNODE_URL_LEN) {
    code = TSDB_CODE_MND_XNODE_TOO_LONG_URL;
    lino = __LINE__;
    goto _OVER;
  }
  if (pCreate->urlLen < 0 || (pCreate->url == NULL && pCreate->urlLen > 0)) {
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto _OVER;
  }

  // One spare zero byte keeps the copy NUL-terminated for the "%s" logging
  // below even if urlLen does not count a terminator; urlLen is unchanged.
  xnodeObj.url = taosMemoryCalloc(1, (size_t)pCreate->urlLen + 1);
  if (xnodeObj.url == NULL) {
    // Must not fall through with code == 0: that would report success.
    code = TSDB_CODE_OUT_OF_MEMORY;
    lino = __LINE__;
    goto _OVER;
  }
  if (pCreate->urlLen > 0) {
    (void)memcpy(xnodeObj.url, pCreate->url, pCreate->urlLen);
  }

  xnodeObj.createTime = taosGetTimestampMs();
  xnodeObj.updateTime = xnodeObj.createTime;
  mInfo("create xnode, xnode.id:%d, xnode.url: %s, xnode.time:%" PRId64, xnodeObj.id, xnodeObj.url,
        xnodeObj.createTime);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-xnode");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mInfo("failed to create transaction for xnode:%s, code:0x%x:%s", pCreate->url, code, tstrerror(code));
    lino = __LINE__;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);

  mInfo("trans:%d, used to create xnode:%s as xnode:%d", pTrans->id, pCreate->url, xnodeObj.id);

  // Pass &lino so the error log below names the step that failed.
  TAOS_CHECK_GOTO(mndSetCreateXnodeRedoLogs(pTrans, &xnodeObj), &lino, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateXnodeUndoLogs(pTrans, &xnodeObj), &lino, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateXnodeCommitLogs(pTrans, &xnodeObj), &lino, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:failed create xnode since %s, line:%d", tstrerror(code), lino);
  }
  mndFreeXnode(&xnodeObj);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
576

577
/**
 * Find the xnode whose stored URL equals `url`, compared case-insensitively.
 *
 * @param pMnode  mnode whose SDB_XNODE table is scanned.
 * @param url     NUL-terminated URL to look for.
 * @return the acquired object (the fetch iterator is cancelled before
 *         returning), or NULL. terrno is TSDB_CODE_INVALID_PARA when `url`
 *         is NULL and TSDB_CODE_MND_XNODE_NOT_EXIST when nothing matches.
 */
static SXnodeObj *mndAcquireXnodeByURL(SMnode *pMnode, char *url) {
  SSdb *pSdb = pMnode->pSdb;

  if (url == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  void *pIter = NULL;
  while (1) {
    SXnodeObj *pXnode = NULL;
    pIter = sdbFetch(pSdb, SDB_XNODE, pIter, (void **)&pXnode);
    if (pIter == NULL) break;

    // A decoded row keeps url == NULL when its urlLen is 0; such a row can
    // never match and must not reach strcasecmp().
    if (pXnode != NULL && pXnode->url != NULL && strcasecmp(url, pXnode->url) == 0) {
      sdbCancelFetch(pSdb, pIter);
      return pXnode;
    }

    sdbRelease(pSdb, pXnode);
  }

  mError("xnode:%s, not found", url);
  terrno = TSDB_CODE_MND_XNODE_NOT_EXIST;
  return NULL;
}
598

599
/**
 * Persist the credentials carried by a create-xnode request (user, password
 * and/or token) as a new SDB_XNODE_USER_PASS row through a serial
 * "create-userpass" transaction.
 *
 * @param pMnode   mnode that owns the sdb and the transaction.
 * @param pReq     originating RPC message handed to mndTransCreate(); callers
 *                 in this file also pass NULL.
 * @param pCreate  deserialized request; the present credential fields are
 *                 copied into temporary buffers that are freed before return.
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is prepared,
 *         otherwise an error code (over-long user or password,
 *         TSDB_CODE_OUT_OF_MEMORY, or the failing call's code).
 */
static int32_t mndStoreXnodeUserPassToken(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeReq *pCreate) {
  int32_t code = 0, lino = 0;
  STrans *pTrans = NULL;

  SXnodeUserPassObj upObj = {0};
  upObj.id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE_USER_PASS);

  if (pCreate->user) {
    upObj.userLen = pCreate->userLen;
    if (upObj.userLen > TSDB_USER_LEN) {
      code = TSDB_CODE_MND_USER_NOT_AVAILABLE;
      lino = __LINE__;
      goto _OVER;
    }
    upObj.user = taosMemoryCalloc(1, pCreate->userLen);
    if (upObj.user == NULL) {
      // Report the failure; leaving code == 0 here would look like success.
      code = TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _OVER;
    }
    (void)memcpy(upObj.user, pCreate->user, pCreate->userLen);
  }
  if (pCreate->pass) {
    upObj.passLen = pCreate->passLen;
    if (upObj.passLen > TSDB_USER_PASSWORD_LONGLEN) {
      code = TSDB_CODE_MND_INVALID_PASS_FORMAT;
      lino = __LINE__;
      goto _OVER;
    }
    upObj.pass = taosMemoryCalloc(1, pCreate->passLen);
    if (upObj.pass == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _OVER;
    }
    (void)memcpy(upObj.pass, pCreate->pass, pCreate->passLen);
  }

  if (pCreate->token.ptr) {
    // The stored token length counts one trailing zero byte.
    upObj.tokenLen = pCreate->token.len + 1;
    upObj.token = taosMemoryCalloc(1, upObj.tokenLen);
    if (upObj.token == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _OVER;
    }
    (void)memcpy(upObj.token, pCreate->token.ptr, pCreate->token.len);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-userpass");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mInfo("failed to create transaction for xnode:%s, code:0x%x:%s", pCreate->url, code, tstrerror(code));
    lino = __LINE__;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);

  mInfo("trans:%d, used to create xnode:%s as xnode:%d", pTrans->id, pCreate->url, upObj.id);

  // Pass &lino so the error log below names the step that failed.
  TAOS_CHECK_GOTO(mndSetCreateXnodeUserPassRedoLogs(pTrans, &upObj), &lino, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateXnodeUserPassCommitLogs(pTrans, &upObj), &lino, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode failed to store userpass since %s, line:%d", tstrerror(code), lino);
  }
  taosMemoryFreeClear(upObj.user);
  taosMemoryFreeClear(upObj.pass);
  taosMemoryFreeClear(upObj.token);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
663

664
#ifndef TD_ENTERPRISE
665

666
/**
 * Variant compiled when TD_ENTERPRISE is not defined: no token is produced,
 * *ppToken is left untouched and TSDB_CODE_OPS_NOT_SUPPORT is returned.
 */
int32_t mndXnodeCreateDefaultToken(SRpcMsg* pReq, char** ppToken) {
  (void)pReq;
  (void)ppToken;
  return TSDB_CODE_OPS_NOT_SUPPORT;
}
669

670
#endif
671

672
/**
 * POST {"id": <id>, "url": <url>} to "<XNODED_PIPE_SOCKET_URL>/xnode".
 *
 * @param pObj  xnode whose id and url form the request body.
 * @return 0, or the error from building the JSON body. The response JSON is
 *         discarded and a NULL response does not change the return value.
 */
static int32_t httpCreateXnode(SXnodeObj *pObj) {
  int32_t code = 0;
  SJson  *pResp = NULL;
  SJson  *pBody = NULL;
  char   *pBodyStr = NULL;

  char endpoint[TSDB_XNODE_URL_LEN + 1] = {0};
  snprintf(endpoint, sizeof(endpoint), "%s/xnode", XNODED_PIPE_SOCKET_URL);

  pBody = tjsonCreateObject();
  if (pBody == NULL) {
    code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pBody, "id", (double)pObj->id), NULL, _OVER);
  TAOS_CHECK_GOTO(tjsonAddStringToObject(pBody, "url", pObj->url), NULL, _OVER);

  pBodyStr = tjsonToUnformattedString(pBody);
  if (pBodyStr == NULL) {
    code = terrno;
    goto _OVER;
  }

  // NOTE(review): a NULL response is not treated as an error here — confirm that is intended.
  pResp = mndSendReqRetJson(endpoint, HTTP_TYPE_POST, defaultTimeout, pBodyStr, strlen(pBodyStr));

_OVER:
  if (pBody != NULL) {
    tjsonDelete(pBody);
  }
  if (pBodyStr != NULL) {
    taosMemFree(pBodyStr);
  }
  if (pResp != NULL) {
    tjsonDelete(pResp);
  }
  TAOS_RETURN(code);
}
706

707
/**
 * RPC handler for TDMT_MND_CREATE_XNODE.
 *
 * Flow:
 *  1. check the caller's permission and deserialize the request;
 *  2. reject the request if an xnode with the same URL already exists;
 *  3. if the user-pass table is empty, create the xnode, store the
 *     credentials (explicit token, explicit user/password, or a default token
 *     from mndXnodeCreateDefaultToken()) and start xnoded with them;
 *     otherwise only create the xnode;
 *  4. sleep for defaultTimeout, look the xnode up again and notify xnoded
 *     over HTTP (that call's result is ignored).
 *
 * @param pReq  incoming RPC message; pReq->info.node is the mnode.
 * @return 0 or TSDB_CODE_ACTION_IN_PROGRESS on success, otherwise an error code.
 */
static int32_t mndProcessCreateXnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = 0, lino = 0;
  SXnodeObj       *pObj = NULL;
  SMCreateXnodeReq createReq = {0};
  char            *pToken = NULL;

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_CREATE_XNODE);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for create xnode, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMCreateXnodeReq(pReq->pCont, pReq->contLen, &createReq), &lino, _OVER);
  if (createReq.url == NULL) {
    // The URL is used as a "%s" argument and as the lookup key below.
    code = TSDB_CODE_INVALID_PARA;
    lino = __LINE__;
    goto _OVER;
  }
  mDebug("xnode:%s, start to create", createReq.url);

  pObj = mndAcquireXnodeByURL(pMnode, createReq.url);
  if (pObj != NULL) {
    code = TSDB_CODE_MND_XNODE_ALREADY_EXIST;
    goto _OVER;
  }

  int32_t numOfRows = sdbGetSize(pMnode->pSdb, SDB_XNODE_USER_PASS);
  if (numOfRows <= 0) {
    if (createReq.token.ptr != NULL) {
      (void)mndCreateXnode(pMnode, NULL, &createReq);
      code = mndStoreXnodeUserPassToken(pMnode, pReq, &createReq);
      if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        lino = __LINE__;
        goto _OVER;
      }
      mndStartXnoded(pMnode, NULL, NULL, createReq.token.ptr);
    } else if (createReq.user != NULL) {
      if (createReq.pass == NULL) {
        // A user without a password cannot be format-checked or used to start xnoded.
        code = TSDB_CODE_MND_INVALID_PASS_FORMAT;
        lino = __LINE__;
        goto _OVER;
      }
      TAOS_CHECK_GOTO(xnodeCheckPasswordFmt(createReq.pass), &lino, _OVER);
      (void)mndCreateXnode(pMnode, NULL, &createReq);
      code = mndStoreXnodeUserPassToken(pMnode, pReq, &createReq);
      if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        lino = __LINE__;
        goto _OVER;
      }
      mndStartXnoded(pMnode, createReq.user, createReq.pass, NULL);
    } else {
      code = mndXnodeCreateDefaultToken(pReq, &pToken);
      if (code == TSDB_CODE_MND_TOKEN_ALREADY_EXIST) {
        code = 0;
      }
      if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
        lino = __LINE__;
        goto _OVER;
      }
      if (pToken == NULL) {
        // The helper reported success without handing back a token;
        // strlen(NULL) below would be undefined behaviour.
        code = TSDB_CODE_MND_RETURN_VALUE_NULL;
        lino = __LINE__;
        goto _OVER;
      }
      (void)mndCreateXnode(pMnode, NULL, &createReq);
      createReq.token = xCreateCowStr(strlen(pToken), pToken, false);
      (void)mndStoreXnodeUserPassToken(pMnode, NULL, &createReq);
      mndStartXnoded(pMnode, NULL, NULL, createReq.token.ptr);
    }
  } else {
    code = mndCreateXnode(pMnode, pReq, &createReq);
    if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      lino = __LINE__;
      goto _OVER;
    }
  }

  // Wait before looking the new row up again.
  taosMsleep(defaultTimeout);
  pObj = mndAcquireXnodeByURL(pMnode, createReq.url);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_XNODE_NOT_EXIST;
    goto _OVER;
  }
  // send request
  (void)httpCreateXnode(pObj);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:%s, failed to create since %s, line:%d", createReq.url != NULL ? createReq.url : "",
           tstrerror(code), lino);
  }
  taosMemoryFreeClear(pToken);
  mndReleaseXnode(pMnode, pObj);
  tFreeSMCreateXnodeReq(&createReq);
  TAOS_RETURN(code);
}
787

788
/**
 * Submit an "update-xnode" transaction for an existing xnode.
 *
 * The commit log is built from a scratch object that carries only the xnode id
 * and a fresh update timestamp; all other fields are left zeroed.
 *
 * @param pMnode  mnode owning the transaction
 * @param pXnode  xnode whose id is used for the update
 * @param pReq    originating RPC request, handed to the transaction
 * @return 0 once the transaction has been prepared, otherwise an error code
 */
static int32_t mndUpdateXnode(SMnode *pMnode, SXnodeObj *pXnode, SRpcMsg *pReq) {
  mInfo("xnode:%d, start to update", pXnode->id);

  int32_t   code = -1;
  SXnodeObj patch = {0};
  patch.id = pXnode->id;
  patch.updateTime = taosGetTimestampMs();

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-xnode");
  if (pTrans == NULL) {
    // Prefer the error reported through terrno; fall back to a generic code.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    goto _OVER;
  }
  mInfo("trans:%d, used to update xnode:%d", pTrans->id, patch.id);

  TAOS_CHECK_GOTO(mndSetCreateXnodeCommitLogs(pTrans, &patch), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  mndFreeXnode(&patch);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
813

814
/**
 * Persist new xnode credentials (user/password and/or token) through an
 * "update-xnode-userpass" transaction.
 *
 * Each credential present in the request is copied into a zero-terminated
 * buffer of len + 1 bytes before being written to the commit log.
 *
 * @param pMnode   mnode owning the transaction
 * @param pReq     originating RPC request, handed to the transaction
 * @param pUpdate  decoded update request; NULL ptr fields are skipped
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been prepared,
 *         otherwise an error code (allocation failures included)
 */
static int32_t mndUpdateXnodeUserPassToken(SMnode *pMnode, SRpcMsg *pReq, SMUpdateXnodeReq *pUpdate) {
  int32_t           code = 0, lino = 0;
  STrans           *pTrans = NULL;
  SXnodeUserPassObj upObj = {0};
  upObj.id = pUpdate->id;
  upObj.updateTime = taosGetTimestampMs();

  if (pUpdate->user.ptr != NULL) {
    upObj.userLen = pUpdate->user.len + 1;
    upObj.user = taosMemoryCalloc(1, upObj.userLen);
    if (upObj.user == NULL) {
      // Without an explicit code the function would report success on OOM.
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _OVER;
    }
    (void)memcpy(upObj.user, pUpdate->user.ptr, pUpdate->user.len);
  }
  if (pUpdate->pass.ptr != NULL) {
    upObj.passLen = pUpdate->pass.len + 1;
    upObj.pass = taosMemoryCalloc(1, upObj.passLen);
    if (upObj.pass == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _OVER;
    }
    (void)memcpy(upObj.pass, pUpdate->pass.ptr, pUpdate->pass.len);
  }
  if (pUpdate->token.ptr != NULL) {
    upObj.tokenLen = pUpdate->token.len + 1;
    upObj.token = taosMemoryCalloc(1, upObj.tokenLen);
    if (upObj.token == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      lino = __LINE__;
      goto _OVER;
    }
    (void)memcpy(upObj.token, pUpdate->token.ptr, pUpdate->token.len);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-xnode-userpass");
  if (pTrans == NULL) {
    code = terrno;
    lino = __LINE__;
    goto _OVER;
  }
  mDebug("trans:%d, used to update xnode userpass or token:%d", pTrans->id, upObj.id);

  // Record the failing line so the error log below is actionable.
  TAOS_CHECK_GOTO(mndSetCreateXnodeUserPassCommitLogs(pTrans, &upObj), &lino, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), &lino, _OVER);
  code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:failed to update xnode %d since %s, line:%d", upObj.id, tstrerror(code), lino);
  }
  taosMemoryFreeClear(upObj.user);
  taosMemoryFreeClear(upObj.pass);
  taosMemoryFreeClear(upObj.token);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
862

863
/**
 * RPC handler for the "update xnode" request.
 *
 * After the permission check and request decoding, credentials are stored and
 * xnoded is restarted when the request carries either a token or a complete
 * user/password pair. A request with neither is accepted without any change.
 *
 * @param pReq  incoming RPC message; pReq->info.node is the mnode
 * @return 0, TSDB_CODE_ACTION_IN_PROGRESS, or an error code
 */
static int32_t mndProcessUpdateXnodeReq(SRpcMsg *pReq) {
  SMnode          *pMnode = pReq->info.node;
  int32_t          code = 0;
  SMUpdateXnodeReq updateReq = {0};

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_UPDATE_XNODE);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for update xnode, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMUpdateXnodeReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER);

  if (updateReq.token.ptr != NULL || (updateReq.user.ptr != NULL && updateReq.pass.ptr != NULL)) {
    code = mndUpdateXnodeUserPassToken(pMnode, pReq, &updateReq);
    if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
      goto _OVER;
    }
    // The new credentials only take effect after xnoded is restarted.
    mndRestartXnoded(pMnode);
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:%d, failed to update since %s", updateReq.id, tstrerror(code));
  }
  // No xnode row is acquired in this handler, so there is nothing to release.
  tFreeSMUpdateXnodeReq(&updateReq);
  TAOS_RETURN(code);
}
892

893
/**
 * Append the redo log of an xnode drop: the encoded row, marked as DROPPING.
 *
 * @param pTrans  transaction receiving the redo log
 * @param pObj    xnode being dropped
 * @return 0 on success, terrno when encoding fails, or the failing call's code
 */
static int32_t mndSetDropXnodeRedoLogs(STrans *pTrans, SXnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeActionEncode(pObj);

  if (pRaw != NULL) {
    TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
    TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));
  } else {
    code = terrno;
  }
  TAOS_RETURN(code);
}
905

906
/**
 * Append the commit log of an xnode drop: the encoded row, marked as DROPPED.
 *
 * @param pTrans  transaction receiving the commit log
 * @param pObj    xnode being dropped
 * @return 0 on success, terrno when encoding fails, or the failing call's code
 */
static int32_t mndSetDropXnodeCommitLogs(STrans *pTrans, SXnodeObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeActionEncode(pObj);

  if (pRaw != NULL) {
    TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
    TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  } else {
    code = terrno;
  }
  TAOS_RETURN(code);
}
918

919
/**
 * Add both the redo and the commit log of an xnode drop to a transaction.
 *
 * pMnode and force are accepted but not used by this function.
 *
 * @return 0 on success or when pObj is NULL, otherwise the failing call's code
 */
static int32_t mndSetDropXnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SXnodeObj *pObj, bool force) {
  if (pObj != NULL) {
    TAOS_CHECK_RETURN(mndSetDropXnodeRedoLogs(pTrans, pObj));
    TAOS_CHECK_RETURN(mndSetDropXnodeCommitLogs(pTrans, pObj));
  }
  return 0;
}
925

926
/**
 * Create and prepare the serial "drop-xnode" transaction for one xnode.
 *
 * @param pMnode  mnode owning the transaction
 * @param pReq    originating RPC request, handed to the transaction
 * @param pObj    xnode to drop
 * @return result of mndTransPrepare, or the code of the first failing step
 */
static int32_t mndDropXnode(SMnode *pMnode, SRpcMsg *pReq, SXnodeObj *pObj) {
  int32_t lino = 0;
  int32_t code = 0;
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-xnode");

  TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno);

  mndTransSetSerial(pTrans);
  mDebug("trans:%d, to drop xnode:%d", pTrans->id, pObj->id);

  // Redo + commit logs first, then hand the transaction over for execution.
  code = mndSetDropXnodeInfoToTrans(pMnode, pTrans, pObj, false);
  TSDB_CHECK_CODE(code, lino, _OVER);
  code = mndTransPrepare(pMnode, pTrans);

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
945

946
/**
 * Create and prepare the serial "drain-xnode" transaction.
 *
 * The commit log is built from a scratch object holding the xnode id, the
 * status string "drain" and a fresh update timestamp.
 *
 * @param pMnode  mnode owning the transaction
 * @param pReq    originating RPC request, handed to the transaction
 * @param pObj    xnode to drain; only its id is read
 * @return result of mndTransPrepare, or the code of the first failing step
 */
static int32_t mndDrainXnode(SMnode *pMnode, SRpcMsg *pReq, SXnodeObj *pObj) {
  int32_t lino = 0;
  int32_t code = 0;

  // status points at a string literal, so the scratch object is never freed.
  SXnodeObj drained = {0};
  drained.id = pObj->id;
  drained.status = "drain";
  drained.statusLen = strlen(drained.status) + 1;
  drained.updateTime = taosGetTimestampMs();

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drain-xnode");
  TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno);

  mndTransSetSerial(pTrans);
  mDebug("trans:%d, to drain xnode:%d", pTrans->id, drained.id);

  TAOS_CHECK_GOTO(mndSetCreateXnodeCommitLogs(pTrans, &drained), NULL, _OVER);
  code = mndTransPrepare(pMnode, pTrans);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:%d, failed to drain since %s", drained.id, tstrerror(code));
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
972

973
/**
 * RPC handler for the "drop xnode" request.
 *
 * The target is looked up by URL when the request carries a non-empty URL,
 * otherwise by id. An HTTP DELETE is sent to xnoded (its reply is only freed,
 * not inspected) and then the drop transaction is submitted.
 *
 * @param pReq  incoming RPC message; pReq->info.node is the mnode
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is submitted,
 *         otherwise an error code
 */
static int32_t mndProcessDropXnodeReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  SMDropXnodeReq dropReq = {0};
  SXnodeObj     *pObj = NULL;
  SJson         *pJson = NULL;
  int32_t        code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_DROP_XNODE);

  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for drop xnode, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMDropXnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
  mDebug("xnode:%d, start to drop", dropReq.xnodeId);

  // A request must identify the xnode either by a non-empty URL or by a positive id.
  const bool byUrl = (dropReq.url != NULL) && (dropReq.url[0] != '\0');
  if (!byUrl && dropReq.xnodeId <= 0) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _OVER;
  }

  pObj = byUrl ? mndAcquireXnodeByURL(pMnode, dropReq.url) : mndAcquireXnode(pMnode, dropReq.xnodeId);
  if (pObj == NULL) {
    code = terrno;
    goto _OVER;
  }

  // send request
  char xnodeUrl[TSDB_XNODE_URL_LEN] = {0};
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/xnode/%d?force=%s", XNODED_PIPE_SOCKET_URL, pObj->id,
           dropReq.force ? "true" : "false");
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_DELETE, defaultTimeout, NULL, 0);

  code = mndDropXnode(pMnode, pReq, pObj);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:%d, failed to drop since %s", dropReq.xnodeId, tstrerror(code));
  }
  if (pJson != NULL) {
    tjsonDelete(pJson);
  }
  if (pObj != NULL) {
    mndReleaseXnode(pMnode, pObj);
  }
  tFreeSMDropXnodeReq(&dropReq);
  TAOS_RETURN(code);
}
1031

1032
/**
 * RPC handler for the "drain xnode" request.
 *
 * Looks the xnode up by id, POSTs {"xnode": <url>} to xnoded's drain endpoint
 * (the reply is only freed, not inspected) and then submits the drain
 * transaction.
 *
 * @param pReq  incoming RPC message; pReq->info.node is the mnode
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is submitted,
 *         otherwise an error code
 */
static int32_t mndProcessDrainXnodeReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SMDrainXnodeReq drainReq = {0};
  SXnodeObj      *pObj = NULL;
  SJson          *postContent = NULL;
  SJson          *pJson = NULL;
  char           *pContStr = NULL;
  int32_t         code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_DRAIN_XNODE);

  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for drain xnode, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMDrainXnodeReq(pReq->pCont, pReq->contLen, &drainReq), NULL, _OVER);
  mDebug("xnode:%d, start to drain", drainReq.xnodeId);

  if (drainReq.xnodeId <= 0) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _OVER;
  }

  pObj = mndAcquireXnode(pMnode, drainReq.xnodeId);
  if (pObj == NULL) {
    code = TSDB_CODE_MND_XNODE_NOT_EXIST;
    goto _OVER;
  }

  // send request
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN + 1, "%s/xnode/drain/%d", XNODED_PIPE_SOCKET_URL, pObj->id);

  postContent = tjsonCreateObject();
  if (postContent == NULL) {
    code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "xnode", pObj->url), NULL, _OVER);

  pContStr = tjsonToString(postContent);
  if (pContStr == NULL) {
    code = terrno;
    goto _OVER;
  }
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, defaultTimeout, pContStr, strlen(pContStr));

  code = mndDrainXnode(pMnode, pReq, pObj);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:%d, failed to drain since %s", drainReq.xnodeId, tstrerror(code));
  }

  if (postContent != NULL) {
    tjsonDelete(postContent);
  }
  if (pContStr != NULL) {
    taosMemoryFree(pContStr);
  }
  if (pJson != NULL) {
    tjsonDelete(pJson);
  }
  mndReleaseXnode(pMnode, pObj);
  tFreeSMDrainXnodeReq(&drainReq);
  TAOS_RETURN(code);
}
1100

1101
/**
 * Fill a result block with up to `rows` xnode rows for the show framework.
 *
 * Columns are written in this order: id, url, status, create time, update
 * time. The status comes from mndGetXnodeStatus(); when that call fails the
 * literal "offline" is reported instead.
 *
 * @param pReq    RPC request; pReq->info.node is the mnode
 * @param pShow   show cursor; pIter and numOfRows are advanced
 * @param pBlock  destination data block
 * @param rows    maximum number of rows to emit in this call
 * @return number of complete rows written (also on a column write error)
 */
static int32_t mndRetrieveXnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode    *pMnode = pReq->info.node;
  SSdb      *pSdb = pMnode->pSdb;
  SXnodeObj *pObj = NULL;
  int32_t    code = 0;
  int32_t    cols = 0;
  int32_t    numOfRows = 0;
  char       status[TSDB_XNODE_STATUS_LEN] = {0};
  char       buf[TSDB_XNODE_URL_LEN + VARSTR_HEADER_SIZE] = {0};
  mDebug("show.type:%d, %s:%d: retrieve xnodes with rows: %d", pShow->type, __FILE__, __LINE__, rows);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_XNODE, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) break;

    cols = 0;

    // id
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
    if (code != 0) goto _end;

    // url, truncated to the schema width of the column about to be written
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->url, pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
    if (code != 0) goto _end;

    // status
    if (mndGetXnodeStatus(pObj, status, TSDB_XNODE_STATUS_LEN) == 0) {
      STR_TO_VARSTR(buf, status);
    } else {
      mDebug("xnode:%d, status request err: %s", pObj->id, tstrerror(terrno));
      STR_TO_VARSTR(buf, "offline");
    }
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
    if (code != 0) goto _end;

    // create time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createTime, false);
    if (code != 0) goto _end;

    // update time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->updateTime, false);
    if (code != 0) goto _end;

    numOfRows++;
    sdbRelease(pSdb, pObj);
  }

_end:
  // A row that failed midway is still held; drop that reference here.
  if (code != 0) sdbRelease(pSdb, pObj);

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
1154

1155
/** Cancel an in-flight SDB_XNODE fetch identified by pIter. */
static void mndCancelGetNextXnode(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_XNODE);
}
×
1159

1160
/** xnode task section **/
1161

1162
/**
 * Find an xnode task by id by scanning the SDB_XNODE_TASK table.
 *
 * @param pMnode  mnode whose sdb is scanned
 * @param tid     task id to look for
 * @return the matching row, still held from the fetch, or NULL with terrno set
 *         to TSDB_CODE_MND_XNODE_TASK_NOT_EXIST
 */
static SXnodeTaskObj *mndAcquireXnodeTaskById(SMnode *pMnode, int32_t tid) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  for (;;) {
    SXnodeTaskObj *pCandidate = NULL;
    pIter = sdbFetch(pSdb, SDB_XNODE_TASK, pIter, (void **)&pCandidate);
    if (pIter == NULL) {
      break;
    }

    if (pCandidate->id != tid) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    // Match: stop the scan and hand the row to the caller.
    sdbCancelFetch(pSdb, pIter);
    return pCandidate;
  }

  mDebug("xnode task:%d, not found", tid);
  terrno = TSDB_CODE_MND_XNODE_TASK_NOT_EXIST;
  return NULL;
}
1183
/**
 * Find an xnode task by name (case-insensitive) by scanning SDB_XNODE_TASK.
 *
 * Rows without a name are skipped.
 *
 * @param pMnode  mnode whose sdb is scanned
 * @param name    task name to look for
 * @return the matching row, still held from the fetch, or NULL with terrno set
 *         to TSDB_CODE_MND_XNODE_TASK_NOT_EXIST
 */
static SXnodeTaskObj *mndAcquireXnodeTaskByName(SMnode *pMnode, const char *name) {
  SSdb *pSdb = pMnode->pSdb;

  void *pIter = NULL;
  while (1) {
    SXnodeTaskObj *pTask = NULL;
    pIter = sdbFetch(pSdb, SDB_XNODE_TASK, pIter, (void **)&pTask);
    if (pIter == NULL) break;

    if (pTask->name != NULL && strcasecmp(name, pTask->name) == 0) {
      sdbCancelFetch(pSdb, pIter);
      return pTask;
    }

    // Every non-matching row, nameless ones included, must be released;
    // skipping the release for nameless rows leaked one reference per scan.
    sdbRelease(pSdb, pTask);
  }

  mDebug("xnode task:%s, not found", name);
  terrno = TSDB_CODE_MND_XNODE_TASK_NOT_EXIST;
  return NULL;
}
1207

1208
/**
 * Release every heap string owned by an xnode task object.
 *
 * Only the string members are freed; the object itself is left to its owner.
 */
static void mndFreeXnodeTask(SXnodeTaskObj *pObj) {
  // identity
  taosMemoryFreeClear(pObj->name);
  taosMemoryFreeClear(pObj->createdBy);
  taosMemoryFreeClear(pObj->labels);
  // pipeline description
  taosMemoryFreeClear(pObj->sourceDsn);
  taosMemoryFreeClear(pObj->sinkDsn);
  taosMemoryFreeClear(pObj->parser);
  // runtime state
  taosMemoryFreeClear(pObj->status);
  taosMemoryFreeClear(pObj->reason);
}
×
1218

1219
/**
 * Serialize an xnode task object into an sdb raw record.
 *
 * Each string is written as an int32 length followed by that many bytes, and
 * the record ends with TSDB_XNODE_RESERVE_SIZE reserved bytes. The field order
 * here must match mndXnodeTaskActionDecode().
 *
 * @param pObj  task to encode
 * @return the new raw record, or NULL with terrno set (TSDB_CODE_INVALID_PARA
 *         for a NULL object)
 */
SSdbRaw *mndXnodeTaskActionEncode(SXnodeTaskObj *pObj) {
  int32_t code = 0;
  int32_t lino = 0;

  if (NULL == pObj) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }
  // Stays set until the record is complete, so any early exit reports failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t totalStrLen = pObj->nameLen + pObj->sourceDsnLen + pObj->sinkDsnLen + pObj->parserLen + pObj->reasonLen +
                        pObj->statusLen + pObj->createdByLen + pObj->labelsLen;
  int32_t rawDataLen = sizeof(SXnodeTaskObj) + TSDB_XNODE_RESERVE_SIZE + totalStrLen;
  int32_t dataPos = 0;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE_TASK, TSDB_XNODE_VER_NUMBER, rawDataLen);
  if (pRaw == NULL) goto _OVER;

  // header: id and timestamps
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->createTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)

  // status
  SDB_SET_INT32(pRaw, dataPos, pObj->statusLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)

  // placement
  SDB_SET_INT32(pRaw, dataPos, pObj->via, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->xnodeId, _OVER)

  // name
  SDB_SET_INT32(pRaw, dataPos, pObj->nameLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->name, pObj->nameLen, _OVER)

  // source
  SDB_SET_INT32(pRaw, dataPos, pObj->sourceType, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->sourceDsnLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->sourceDsn, pObj->sourceDsnLen, _OVER)

  // sink
  SDB_SET_INT32(pRaw, dataPos, pObj->sinkType, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->sinkDsnLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->sinkDsn, pObj->sinkDsnLen, _OVER)

  // parser, reason, creator, labels
  SDB_SET_INT32(pRaw, dataPos, pObj->parserLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->parser, pObj->parserLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->reasonLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->reason, pObj->reasonLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->createdByLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->createdBy, pObj->createdByLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->labelsLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->labels, pObj->labelsLen, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("xnode task:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("xnode task:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRaw;
}
1274

1275
/**
 * Rebuild an xnode task row from an sdb raw record.
 *
 * Fields are read in the order mndXnodeTaskActionEncode() writes them. Each
 * string with a positive stored length is copied into a zeroed buffer of
 * len + 1 bytes; a non-positive length leaves the pointer untouched.
 *
 * @param pRaw  raw record to decode
 * @return the new row, or NULL with terrno set (TSDB_CODE_INVALID_PARA for a
 *         NULL record, TSDB_CODE_SDB_INVALID_DATA_VER for a version mismatch)
 */
SSdbRow *mndXnodeTaskActionDecode(SSdbRaw *pRaw) {
  int32_t code = 0;
  int32_t lino = 0;
  // Stays set until decoding is complete, so any early exit reports failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  SSdbRow       *pRow = NULL;
  SXnodeTaskObj *pObj = NULL;

  if (NULL == pRaw) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != TSDB_XNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SXnodeTaskObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->createTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->statusLen, _OVER)
  if (pObj->statusLen > 0) {
    pObj->status = taosMemoryCalloc(pObj->statusLen + 1, 1);
    if (pObj->status == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
  }

  SDB_GET_INT32(pRaw, dataPos, &pObj->via, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->xnodeId, _OVER)

  SDB_GET_INT32(pRaw, dataPos, &pObj->nameLen, _OVER)
  if (pObj->nameLen > 0) {
    pObj->name = taosMemoryCalloc(pObj->nameLen + 1, 1);
    if (pObj->name == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->name, pObj->nameLen, _OVER)
  }

  SDB_GET_INT32(pRaw, dataPos, &pObj->sourceType, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->sourceDsnLen, _OVER)
  if (pObj->sourceDsnLen > 0) {
    pObj->sourceDsn = taosMemoryCalloc(pObj->sourceDsnLen + 1, 1);
    if (pObj->sourceDsn == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->sourceDsn, pObj->sourceDsnLen, _OVER)
  }

  SDB_GET_INT32(pRaw, dataPos, &pObj->sinkType, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->sinkDsnLen, _OVER)
  if (pObj->sinkDsnLen > 0) {
    pObj->sinkDsn = taosMemoryCalloc(pObj->sinkDsnLen + 1, 1);
    if (pObj->sinkDsn == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->sinkDsn, pObj->sinkDsnLen, _OVER)
  }

  SDB_GET_INT32(pRaw, dataPos, &pObj->parserLen, _OVER)
  if (pObj->parserLen > 0) {
    pObj->parser = taosMemoryCalloc(pObj->parserLen + 1, 1);
    if (pObj->parser == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->parser, pObj->parserLen, _OVER)
  }

  SDB_GET_INT32(pRaw, dataPos, &pObj->reasonLen, _OVER)
  if (pObj->reasonLen > 0) {
    pObj->reason = taosMemoryCalloc(pObj->reasonLen + 1, 1);
    if (pObj->reason == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->reason, pObj->reasonLen, _OVER)
  }

  SDB_GET_INT32(pRaw, dataPos, &pObj->createdByLen, _OVER)
  if (pObj->createdByLen > 0) {
    pObj->createdBy = taosMemoryCalloc(pObj->createdByLen + 1, 1);
    if (pObj->createdBy == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->createdBy, pObj->createdByLen, _OVER)
  }

  SDB_GET_INT32(pRaw, dataPos, &pObj->labelsLen, _OVER)
  if (pObj->labelsLen > 0) {
    pObj->labels = taosMemoryCalloc(pObj->labelsLen + 1, 1);
    if (pObj->labels == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->labels, pObj->labelsLen, _OVER)
  }

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("xnode task:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
    if (pObj != NULL) {
      // Frees all eight strings; the hand-written list missed createdBy and labels.
      mndFreeXnodeTask(pObj);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("xnode task:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}
1388

1389
/**
 * sdb insert hook for xnode task rows: only logs the event.
 *
 * @return always 0
 */
int32_t mndXnodeTaskActionInsert(SSdb *pSdb, SXnodeTaskObj *pObj) {
  // Nothing to set up for a freshly inserted task row.
  mDebug("xtask:%d, perform insert action, row:%p", pObj->id, pObj);
  return 0;
}
1393

1394
/**
 * sdb delete hook for xnode task rows: logs the event and frees the strings
 * owned by the row.
 *
 * @return always 0
 */
int32_t mndXnodeTaskActionDelete(SSdb *pSdb, SXnodeTaskObj *pObj) {
  mDebug("xtask:%d, perform delete action, row:%p", pObj->id, pObj);

  // Only the string members are released here, not the row itself.
  mndFreeXnodeTask(pObj);
  return 0;
}
1399

1400
/**
 * sdb update hook for xnode task rows.
 *
 * Under the old row's write latch, via and xnodeId are copied from the new
 * row, the status/name/sourceDsn/sinkDsn/parser/reason/labels strings are
 * exchanged through swapFields(), and updateTime only moves forward.
 *
 * @param pSdb  not used
 * @param pOld  row currently stored; receives the new values
 * @param pNew  incoming row carrying the new values
 * @return always 0
 */
int32_t mndXnodeTaskActionUpdate(SSdb *pSdb, SXnodeTaskObj *pOld, SXnodeTaskObj *pNew) {
  mDebug("xtask:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  taosWLockLatch(&pOld->lock);

  // scalar placement fields
  pOld->via = pNew->via;
  pOld->xnodeId = pNew->xnodeId;

  // length/pointer pairs
  swapFields(&pNew->statusLen, &pNew->status, &pOld->statusLen, &pOld->status);
  swapFields(&pNew->nameLen, &pNew->name, &pOld->nameLen, &pOld->name);
  swapFields(&pNew->sourceDsnLen, &pNew->sourceDsn, &pOld->sourceDsnLen, &pOld->sourceDsn);
  swapFields(&pNew->sinkDsnLen, &pNew->sinkDsn, &pOld->sinkDsnLen, &pOld->sinkDsn);
  swapFields(&pNew->parserLen, &pNew->parser, &pOld->parserLen, &pOld->parser);
  swapFields(&pNew->reasonLen, &pNew->reason, &pOld->reasonLen, &pOld->reason);
  swapFields(&pNew->labelsLen, &pNew->labels, &pOld->labelsLen, &pOld->labels);

  // never let the timestamp go backwards
  if (pOld->updateTime < pNew->updateTime) {
    pOld->updateTime = pNew->updateTime;
  }

  taosWUnLockLatch(&pOld->lock);
  return 0;
}
1419

1420
/**
 * Append the redo log of an xnode task creation: the encoded row, marked as
 * CREATING.
 *
 * @return 0 on success; when encoding fails, terrno if set, else
 *         TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the failing call's code
 */
static int32_t mndSetCreateXnodeTaskRedoLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeTaskActionEncode(pObj);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(code);
}
1432

1433
/**
 * Append the undo log of an xnode task creation: the encoded row, marked as
 * DROPPED.
 *
 * @return 0 on success; when encoding fails, terrno if set, else
 *         TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the failing call's code
 */
static int32_t mndSetCreateXnodeTaskUndoLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeTaskActionEncode(pObj);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(code);
}
1445

1446
/**
 * Append the commit log of an xnode task creation: the encoded row, marked as
 * READY.
 *
 * @return 0 on success; when encoding fails, terrno if set, else
 *         TSDB_CODE_MND_RETURN_VALUE_NULL; otherwise the failing call's code
 */
static int32_t mndSetCreateXnodeTaskCommitLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRaw = mndXnodeTaskActionEncode(pObj);

  if (pRaw == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
1458
/** Release an xnode task row back to the mnode's sdb. */
void mndReleaseXnodeTask(SMnode *pMnode, SXnodeTaskObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
×
1462

1463
/**
 * Look up the value of a "name=value" entry in a task option list.
 *
 * The name is matched case-insensitively and must be followed directly by '='.
 *
 * @param pOptions  option list to search; may be NULL
 * @param name      option name; may be NULL
 * @return pointer to the text after '=' inside the first matching entry, or
 *         NULL when there is no match or an argument is NULL
 */
static const char *getXTaskOptionByName(xTaskOptions *pOptions, const char *name) {
  if (pOptions == NULL || name == NULL) return NULL;

  const size_t keyLen = strlen(name);
  for (int32_t idx = 0; idx < pOptions->optionsNum; ++idx) {
    const char *entry = pOptions->options[idx].ptr;
    if (entry == NULL) continue;
    if (strncasecmp(entry, name, keyLen) != 0) continue;
    // Reject entries where `name` is only a prefix of a longer key.
    if (entry[keyLen] != '=') continue;
    return entry + keyLen + 1;
  }
  return NULL;
}
1473

1474
/**
 * Build a new xnode task object from a create request and submit the serial
 * "create-xnode-task" transaction (redo, undo and commit logs).
 *
 * Strings are copied into zeroed buffers of len + 1 bytes. The "status" and
 * "labels" values are taken from the request's option list when present, and
 * the creator is the connection user of the request.
 *
 * @param pMnode   mnode owning the transaction
 * @param pReq     originating RPC request; pReq->info.conn.user is read
 * @param pCreate  decoded create request
 * @return 0 once the transaction has been prepared, otherwise an error code
 *         (allocation failures report terrno or TSDB_CODE_OUT_OF_MEMORY)
 */
static int32_t mndCreateXnodeTask(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeTaskReq *pCreate) {
  int32_t code = -1;
  STrans *pTrans = NULL;

  SXnodeTaskObj xnodeObj = {0};
  xnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE_TASK);
  xnodeObj.createTime = taosGetTimestampMs();
  xnodeObj.updateTime = xnodeObj.createTime;
  xnodeObj.via = pCreate->options.via;
  xnodeObj.xnodeId = pCreate->xnodeId;

  xnodeObj.nameLen = pCreate->name.len + 1;
  xnodeObj.name = taosMemoryCalloc(1, xnodeObj.nameLen);
  if (xnodeObj.name == NULL) goto _OOM;
  (void)memcpy(xnodeObj.name, pCreate->name.ptr, pCreate->name.len);

  xnodeObj.sourceType = pCreate->source.type;
  xnodeObj.sourceDsnLen = pCreate->source.cstr.len + 1;
  xnodeObj.sourceDsn = taosMemoryCalloc(1, xnodeObj.sourceDsnLen);
  if (xnodeObj.sourceDsn == NULL) goto _OOM;
  (void)memcpy(xnodeObj.sourceDsn, pCreate->source.cstr.ptr, pCreate->source.cstr.len);

  xnodeObj.sinkType = pCreate->sink.type;
  xnodeObj.sinkDsnLen = pCreate->sink.cstr.len + 1;
  xnodeObj.sinkDsn = taosMemoryCalloc(1, xnodeObj.sinkDsnLen);
  if (xnodeObj.sinkDsn == NULL) goto _OOM;
  (void)memcpy(xnodeObj.sinkDsn, pCreate->sink.cstr.ptr, pCreate->sink.cstr.len);

  // NOTE(review): parserLen is assigned even when parser.ptr is NULL, so the
  // object can carry a non-zero parserLen with a NULL parser. Left unchanged
  // because it affects the encoded row -- confirm whether that is intended.
  xnodeObj.parserLen = pCreate->options.parser.len + 1;
  if (pCreate->options.parser.ptr != NULL) {
    xnodeObj.parser = taosMemoryCalloc(1, xnodeObj.parserLen);
    if (xnodeObj.parser == NULL) goto _OOM;
    (void)memcpy(xnodeObj.parser, pCreate->options.parser.ptr, pCreate->options.parser.len);
  }

  const char *status = getXTaskOptionByName(&pCreate->options, "status");
  if (status != NULL) {
    xnodeObj.statusLen = strlen(status) + 1;
    xnodeObj.status = taosMemoryCalloc(1, xnodeObj.statusLen);
    if (xnodeObj.status == NULL) goto _OOM;
    (void)memcpy(xnodeObj.status, status, xnodeObj.statusLen - 1);
  }

  xnodeObj.createdByLen = strlen(pReq->info.conn.user) + 1;
  xnodeObj.createdBy = taosMemoryCalloc(1, xnodeObj.createdByLen);
  if (xnodeObj.createdBy == NULL) goto _OOM;
  (void)memcpy(xnodeObj.createdBy, pReq->info.conn.user, xnodeObj.createdByLen - 1);

  const char *labels = getXTaskOptionByName(&pCreate->options, "labels");
  if (labels != NULL) {
    xnodeObj.labelsLen = strlen(labels) + 1;
    xnodeObj.labels = taosMemoryCalloc(1, xnodeObj.labelsLen);
    if (xnodeObj.labels == NULL) goto _OOM;
    (void)memcpy(xnodeObj.labels, labels, xnodeObj.labelsLen - 1);
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-xnode-task");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) {
      code = terrno;
    }
    mError("failed to create transaction for xnode-task:%s, code:0x%x:%s", pCreate->name.ptr, code, tstrerror(code));
    goto _OVER;
  }
  mndTransSetSerial(pTrans);

  mDebug("trans:%d, used to create xnode task:%s as task:%d", pTrans->id, pCreate->name.ptr, xnodeObj.id);

  TAOS_CHECK_GOTO(mndSetCreateXnodeTaskRedoLogs(pTrans, &xnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateXnodeTaskUndoLogs(pTrans, &xnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateXnodeTaskCommitLogs(pTrans, &xnodeObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;
  goto _OVER;

_OOM:
  // Allocation failures used to leave the placeholder -1 as the result.
  code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;

_OVER:
  mndFreeXnodeTask(&xnodeObj);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
1555

1556
// Helper function to parse and validate the request
1557
static int32_t mndValidateCreateXnodeTaskReq(SRpcMsg *pReq, SMCreateXnodeTaskReq *pCreateReq) {
×
1558
  int32_t code = 0;
×
1559
  SJson  *pJson = NULL;
×
1560
  SJson  *postContent = NULL;
×
1561
  char   *srcDsn = NULL;
×
1562
  char   *sinkDsn = NULL;
×
1563
  char   *parser = NULL;
×
1564
  char   *pContStr = NULL;
×
1565

1566
  // from, to, parser check
1567
  char xnodeUrl[TSDB_XNODE_URL_LEN] = {0};
×
1568
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/task/check", XNODED_PIPE_SOCKET_URL);
×
1569
  postContent = tjsonCreateObject();
×
1570
  if (postContent == NULL) {
×
1571
    code = terrno;
×
1572
    goto _OVER;
×
1573
  }
1574
  srcDsn = taosStrndupi(pCreateReq->source.cstr.ptr, (int64_t)pCreateReq->source.cstr.len);
×
1575
  if (srcDsn == NULL) {
×
1576
    code = terrno;
×
1577
    goto _OVER;
×
1578
  }
1579
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "from", srcDsn), NULL, _OVER);
×
1580

1581
  sinkDsn = taosStrndupi(pCreateReq->sink.cstr.ptr, (int64_t)pCreateReq->sink.cstr.len);
×
1582
  if (sinkDsn == NULL) {
×
1583
    code = terrno;
×
1584
    goto _OVER;
×
1585
  }
1586
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "to", sinkDsn), NULL, _OVER);
×
1587

1588
  if (pCreateReq->options.parser.len > 0 && pCreateReq->options.parser.ptr != NULL) {
×
1589
    parser = taosStrndupi(pCreateReq->options.parser.ptr, (int64_t)pCreateReq->options.parser.len);
×
1590
    if (parser == NULL) {
×
1591
      code = terrno;
×
1592
      goto _OVER;
×
1593
    }
1594
    TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "parser", parser), NULL, _OVER);
×
1595
  }
1596

1597
  if (pCreateReq->xnodeId > 0) {
×
1598
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(postContent, "xnode_id", (double)pCreateReq->xnodeId), NULL, _OVER);
×
1599
  }
1600

1601
  if (pCreateReq->options.via > 0) {
×
1602
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(postContent, "via", (double)pCreateReq->options.via), NULL, _OVER);
×
1603
  }
1604

1605
  const char *labels = getXTaskOptionByName(&pCreateReq->options, "labels");
×
1606
  if (labels != NULL) {
×
1607
    TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "labels", labels), NULL, _OVER);
×
1608
  }
1609

1610
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "created_by", pReq->info.conn.user), NULL, _OVER);
×
1611

1612
  pContStr = tjsonToUnformattedString(postContent);
×
1613
  if (pContStr == NULL) {
×
1614
    code = terrno;
×
1615
    goto _OVER;
×
1616
  }
1617

1618
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, 60000, pContStr, strlen(pContStr));
×
1619
  if (pJson == NULL) {
×
1620
    code = terrno;
×
1621
    goto _OVER;
×
1622
  }
1623
  SJson *errorJson = tjsonGetObjectItem(pJson, "__inner_error");
×
1624
  if (errorJson != NULL) {
×
1625
    code = TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR;
×
1626
    char* pValueString = ((cJSON*)errorJson)->valuestring;
×
1627
    if (NULL == pValueString) {
×
1628
      mError("should not failed to get __inner_error message, task name:%s", pCreateReq->name.ptr);
×
1629
      goto _OVER;
×
1630
    }
1631
    //handle response
1632
    int32_t contLen = strlen(pValueString) + strlen(tstrerror(code)) + 32;
×
1633
    void *pRsp = rpcMallocCont(contLen);
×
1634
    if (pRsp == NULL) {
×
1635
      TAOS_CHECK_GOTO(terrno, NULL, _OVER);
×
1636
    }
1637
    pReq->info.rspLen = contLen;
×
1638
    pReq->info.rsp = pRsp;
×
1639
    snprintf(pReq->info.rsp, contLen, "%s, since: %s", tstrerror(code), pValueString);
×
1640
    goto _OVER;
×
1641
  }
1642

1643
  // todo: only4test
1644
  // (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, 60000, pContStr, strlen(pContStr));
1645
  // code = TSDB_CODE_SUCCESS;
1646

1647
_OVER:
×
1648
  if (srcDsn != NULL) taosMemoryFreeClear(srcDsn);
×
1649
  if (sinkDsn != NULL) taosMemoryFreeClear(sinkDsn);
×
1650
  if (parser != NULL) taosMemoryFreeClear(parser);
×
1651
  if (pContStr != NULL) taosMemoryFreeClear(pContStr);
×
1652
  if (postContent != NULL) tjsonDelete(postContent);
×
1653
  if (pJson != NULL) tjsonDelete(pJson);
×
1654

1655
  TAOS_RETURN(code);
×
1656
}
1657

1658
// Helper function to check if xnode task already exists
1659
static int32_t mndCheckXnodeTaskExists(SMnode *pMnode, const char *name) {
×
1660
  SXnodeTaskObj *pObj = mndAcquireXnodeTaskByName(pMnode, name);
×
1661
  if (pObj != NULL) {
×
1662
    mError("xnode task:%s already exists", name);
×
1663
    mndReleaseXnodeTask(pMnode, pObj);
×
1664
    return TSDB_CODE_MND_XNODE_TASK_ALREADY_EXIST;
×
1665
  }
1666
  return TSDB_CODE_SUCCESS;
×
1667
}
1668

1669
// Helper function to handle the creation result
1670
static int32_t mndHandleCreateXnodeTaskResult(int32_t createCode) {
×
1671
  if (createCode == 0) {
×
1672
    return TSDB_CODE_ACTION_IN_PROGRESS;
×
1673
  }
1674
  return createCode;
×
1675
}
1676

1677
// Handles a "create xnode task" request.
//
// Steps: permission check, request deserialization, duplicate-name check, remote validation via
// mndValidateCreateXnodeTaskReq, then mndCreateXnodeTask. On success the function returns
// TSDB_CODE_ACTION_IN_PROGRESS (see mndHandleCreateXnodeTaskResult); otherwise an error code.
// Every exit path goes through _OVER so the deserialized request is always freed.
static int32_t mndProcessCreateXnodeTaskReq(SRpcMsg *pReq) {
  mDebug("xnode create task request received, contLen:%d\n", pReq->contLen);
  SMnode              *pMnode = pReq->info.node;
  int32_t              code = 0;
  SMCreateXnodeTaskReq createReq = {0};

  // Step 1: Validate permissions
  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_CREATE_XNODE_TASK);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for create xnode task, code:%s", tstrerror(code));
    goto _OVER;
  }

  code = tDeserializeSMCreateXnodeTaskReq(pReq->pCont, pReq->contLen, &createReq);
  if (code != 0) {
    mError("failed to deserialize create xnode task request, code:%s", tstrerror(code));
    // Go through the common cleanup so anything the deserializer already allocated is freed.
    goto _OVER;
  }

  // Step 2: Check if task already exists
  TAOS_CHECK_GOTO(mndCheckXnodeTaskExists(pMnode, createReq.name.ptr), NULL, _OVER);

  // Step 3: Parse and validate request
  TAOS_CHECK_GOTO(mndValidateCreateXnodeTaskReq(pReq, &createReq), NULL, _OVER);

  // Step 4: Create the xnode task
  TAOS_CHECK_GOTO(mndCreateXnodeTask(pMnode, pReq, &createReq), NULL, _OVER);
  code = mndHandleCreateXnodeTaskResult(code);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode task:%s, failed to create since %s", createReq.name.ptr ? createReq.name.ptr : "unknown",
           tstrerror(code));
  }
  tFreeSMCreateXnodeTaskReq(&createReq);
  TAOS_RETURN(code);
}
1714

1715
// Posts the stored definition of a task to "<XNODED_PIPE_SOCKET_URL>/task/<id>".
//
// The JSON body carries "from", "to" and, when present, "parser", "xnode_id", "via",
// "created_by" and "labels". The reply JSON is discarded without being inspected, so the return
// value only reflects failures while building the request body.
static int32_t httpStartXnodeTask(SXnodeTaskObj *pObj) {
  int32_t code = 0;
  char    url[TSDB_XNODE_URL_LEN + 1] = {0};
  SJson  *pBody = NULL;
  SJson  *pReply = NULL;
  char   *bodyStr = NULL;
  char   *fromDsn = NULL;
  char   *toDsn = NULL;
  char   *parserStr = NULL;

  snprintf(url, sizeof(url), "%s/task/%d", XNODED_PIPE_SOCKET_URL, pObj->id);

  pBody = tjsonCreateObject();
  if (pBody == NULL) {
    code = terrno;
    goto _OVER;
  }

  fromDsn = taosStrndupi(pObj->sourceDsn, (int64_t)pObj->sourceDsnLen);
  if (fromDsn == NULL) {
    code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tjsonAddStringToObject(pBody, "from", fromDsn), NULL, _OVER);

  toDsn = taosStrndupi(pObj->sinkDsn, (int64_t)pObj->sinkDsnLen);
  if (toDsn == NULL) {
    code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tjsonAddStringToObject(pBody, "to", toDsn), NULL, _OVER);

  if (pObj->parserLen > 0) {
    parserStr = taosStrndupi(pObj->parser, (int64_t)pObj->parserLen);
    if (parserStr == NULL) {
      code = terrno;
      goto _OVER;
    }
    TAOS_CHECK_GOTO(tjsonAddStringToObject(pBody, "parser", parserStr), NULL, _OVER);
  }

  // Optional numeric / string attributes are only sent when they are set on the task.
  if (pObj->xnodeId > 0) {
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pBody, "xnode_id", (double)pObj->xnodeId), NULL, _OVER);
  }
  if (pObj->via > 0) {
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pBody, "via", (double)pObj->via), NULL, _OVER);
  }
  if (pObj->createdBy != NULL) {
    TAOS_CHECK_GOTO(tjsonAddStringToObject(pBody, "created_by", pObj->createdBy), NULL, _OVER);
  }
  if (pObj->labels != NULL) {
    TAOS_CHECK_GOTO(tjsonAddStringToObject(pBody, "labels", pObj->labels), NULL, _OVER);
  }

  bodyStr = tjsonToUnformattedString(pBody);
  if (bodyStr == NULL) {
    code = terrno;
    goto _OVER;
  }
  mDebug("start xnode post content:%s", bodyStr);
  pReply = mndSendReqRetJson(url, HTTP_TYPE_POST, defaultTimeout, bodyStr, strlen(bodyStr));

_OVER:
  if (parserStr != NULL) taosMemoryFreeClear(parserStr);
  if (toDsn != NULL) taosMemoryFreeClear(toDsn);
  if (fromDsn != NULL) taosMemoryFreeClear(fromDsn);
  if (bodyStr != NULL) taosMemoryFreeClear(bodyStr);
  if (pReply != NULL) tjsonDelete(pReply);
  if (pBody != NULL) tjsonDelete(pBody);
  TAOS_RETURN(code);
}
1789

1790
// Handles a "start xnode task" request.
//
// The task is looked up by id when startReq.tid > 0, otherwise by name, and its definition is
// posted through httpStartXnodeTask. A failure of that post is logged but, as before, does not
// change the code returned to the caller.
static int32_t mndProcessStartXnodeTaskReq(SRpcMsg *pReq) {
  SMnode             *pMnode = pReq->info.node;
  int32_t             code = 0;
  SXnodeTaskObj      *pObj = NULL;
  SMStartXnodeTaskReq startReq = {0};

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_START_XNODE_TASK);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for start xnode task, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMStartXnodeTaskReq(pReq->pCont, pReq->contLen, &startReq), NULL, _OVER);
  mDebug("xnode start xnode task with tid:%d", startReq.tid);

  // Either a positive id or a non-empty name is required to identify the task.
  if (startReq.tid <= 0 && (startReq.name.len <= 0 || startReq.name.ptr == NULL)) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _OVER;
  }

  if (startReq.tid > 0) {
    pObj = mndAcquireXnodeTask(pMnode, startReq.tid);
  } else {
    pObj = mndAcquireXnodeTaskByName(pMnode, startReq.name.ptr);
  }
  if (pObj == NULL) {
    code = terrno;
    goto _OVER;
  }

  // Best effort: keep the request result unchanged, but do not lose the failure silently.
  int32_t httpCode = httpStartXnodeTask(pObj);
  if (httpCode != 0) {
    mError("xnode task:%d, failed to post start request since %s", pObj->id, tstrerror(httpCode));
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode task:%d, failed to start since %s", startReq.tid, tstrerror(code));
  }
  tFreeSMStartXnodeTaskReq(&startReq);
  if (pObj != NULL) {
    mndReleaseXnodeTask(pMnode, pObj);
  }
  TAOS_RETURN(code);
}
1835

1836
// Handles a "stop xnode task" request.
//
// The task is looked up by id when stopReq.tid > 0, otherwise by name, and an HTTP DELETE is sent
// to "<XNODED_PIPE_SOCKET_URL>/task/<id>". The reply JSON is freed without being inspected.
// The acquired task reference is released on every exit path.
static int32_t mndProcessStopXnodeTaskReq(SRpcMsg *pReq) {
  SMnode            *pMnode = pReq->info.node;
  int32_t            code = -1;
  SXnodeTaskObj     *pObj = NULL;
  SMStopXnodeTaskReq stopReq = {0};
  SJson             *pJson = NULL;

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_STOP_XNODE_TASK);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for stop xnode task, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMStopXnodeTaskReq(pReq->pCont, pReq->contLen, &stopReq), NULL, _OVER);
  mDebug("Stop xnode task with tid:%d", stopReq.tid);

  // Either a positive id or a non-empty name is required to identify the task.
  if (stopReq.tid <= 0 && (stopReq.name.len <= 0 || stopReq.name.ptr == NULL)) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _OVER;
  }

  if (stopReq.tid > 0) {
    pObj = mndAcquireXnodeTask(pMnode, stopReq.tid);
  } else {
    pObj = mndAcquireXnodeTaskByName(pMnode, stopReq.name.ptr);
  }
  if (pObj == NULL) {
    code = terrno;
    goto _OVER;
  }

  // send request
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN + 1, "%s/task/%d", XNODED_PIPE_SOCKET_URL, pObj->id);
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_DELETE, defaultTimeout, NULL, 0);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode task:%d, failed to stop since %s", stopReq.tid, tstrerror(code));
  }
  if (pJson != NULL) {
    tjsonDelete(pJson);
  }
  // Drop the reference taken by mndAcquireXnodeTask / mndAcquireXnodeTaskByName.
  if (pObj != NULL) {
    mndReleaseXnodeTask(pMnode, pObj);
  }
  tFreeSMStopXnodeTaskReq(&stopReq);
  TAOS_RETURN(code);
}
1881

1882
static int32_t mndUpdateXnodeTask(SMnode *pMnode, SRpcMsg *pReq, const SXnodeTaskObj *pOld,
×
1883
                                  SMUpdateXnodeTaskReq *pUpdate) {
1884
  mDebug("xnode task:%d, start to update", pUpdate->tid);
×
1885
  int32_t      code = -1;
×
1886
  STrans       *pTrans = NULL;
×
1887
  SXnodeTaskObj taskObj = *pOld;
×
1888
  struct {
1889
    bool status;
1890
    bool name;
1891
    bool source;
1892
    bool sink;
1893
    bool parser;
1894
    bool reason;
1895
    bool labels;
1896
  } isChange = {0};
×
1897

1898
  if (pUpdate->via > 0) {
×
1899
    taskObj.via = pUpdate->via;
×
1900
  }
1901
  if (pUpdate->xnodeId > 0) {
×
1902
    taskObj.xnodeId = pUpdate->xnodeId;
×
1903
  }
1904
  if (pUpdate->status.ptr != NULL) {
×
1905
    taskObj.statusLen = pUpdate->status.len + 1;
×
1906
    taskObj.status = taosMemoryCalloc(1, taskObj.statusLen);
×
1907
    if (taskObj.status == NULL) {
×
1908
      code = terrno;
×
1909
      goto _OVER;
×
1910
    }
1911
    (void)memcpy(taskObj.status, pUpdate->status.ptr, pUpdate->status.len);
×
1912
    isChange.status = true;
×
1913
  }
1914
  if (pUpdate->updateName.ptr != NULL) {
×
1915
    taskObj.nameLen = pUpdate->updateName.len + 1;
×
1916
    taskObj.name = taosMemoryCalloc(1, taskObj.nameLen);
×
1917
    if (taskObj.name == NULL) {
×
1918
      code = terrno;
×
1919
      goto _OVER;
×
1920
    }
1921
    (void)memcpy(taskObj.name, pUpdate->updateName.ptr, pUpdate->updateName.len);
×
1922
    isChange.name = true;
×
1923
  }
1924
  if (pUpdate->source.cstr.ptr != NULL) {
×
1925
    taskObj.sourceType = pUpdate->source.type;
×
1926
    taskObj.sourceDsnLen = pUpdate->source.cstr.len + 1;
×
1927
    taskObj.sourceDsn = taosMemoryCalloc(1, taskObj.sourceDsnLen);
×
1928
    if (taskObj.sourceDsn == NULL) {
×
1929
      code = terrno;
×
1930
      goto _OVER;
×
1931
    }
1932
    (void)memcpy(taskObj.sourceDsn, pUpdate->source.cstr.ptr, pUpdate->source.cstr.len);
×
1933
    isChange.source = true;
×
1934
  }
1935
  if (pUpdate->sink.cstr.ptr != NULL) {
×
1936
    taskObj.sinkType = pUpdate->sink.type;
×
1937
    taskObj.sinkDsnLen = pUpdate->sink.cstr.len + 1;
×
1938
    taskObj.sinkDsn = taosMemoryCalloc(1, taskObj.sinkDsnLen);
×
1939
    if (taskObj.sinkDsn == NULL) {
×
1940
      code = terrno;
×
1941
      goto _OVER;
×
1942
    }
1943
    (void)memcpy(taskObj.sinkDsn, pUpdate->sink.cstr.ptr, pUpdate->sink.cstr.len);
×
1944
    isChange.sink = true;
×
1945
  }
1946
  if (pUpdate->parser.ptr != NULL) {
×
1947
    taskObj.parserLen = pUpdate->parser.len + 1;
×
1948
    taskObj.parser = taosMemoryCalloc(1, taskObj.parserLen);
×
1949
    if (taskObj.parser == NULL) {
×
1950
      code = terrno;
×
1951
      goto _OVER;
×
1952
    }
1953
    (void)memcpy(taskObj.parser, pUpdate->parser.ptr, pUpdate->parser.len);
×
1954
    isChange.parser = true;
×
1955
  }
1956
  if (pUpdate->reason.ptr != NULL) {
×
1957
    taskObj.reasonLen = pUpdate->reason.len + 1;
×
1958
    taskObj.reason = taosMemoryCalloc(1, taskObj.reasonLen);
×
1959
    if (taskObj.reason == NULL) {
×
1960
      code = terrno;
×
1961
      goto _OVER;
×
1962
    }
1963
    (void)memcpy(taskObj.reason, pUpdate->reason.ptr, pUpdate->reason.len);
×
1964
    isChange.reason = true;
×
1965
  }
1966
  if (pUpdate->labels.ptr != NULL) {
×
1967
    taskObj.labelsLen = pUpdate->labels.len + 1;
×
1968
    taskObj.labels = taosMemoryCalloc(1, taskObj.labelsLen);
×
1969
    if (taskObj.labels == NULL) {
×
1970
      code = terrno;
×
1971
      goto _OVER;
×
1972
    }
1973
    (void)memcpy(taskObj.labels, pUpdate->labels.ptr, pUpdate->labels.len);
×
1974
    isChange.labels = true;
×
1975
  }
1976
  taskObj.updateTime = taosGetTimestampMs();
×
1977

1978
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-xnode-task");
×
1979
  if (pTrans == NULL) {
×
1980
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
1981
    if (terrno != 0) code = terrno;
×
1982
    goto _OVER;
×
1983
  }
1984
  mInfo("trans:%d, used to update xnode task:%d", pTrans->id, taskObj.id);
×
1985

1986
  TAOS_CHECK_GOTO(mndSetCreateXnodeTaskCommitLogs(pTrans, &taskObj), NULL, _OVER);
×
1987
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
1988
  code = 0;
×
1989

1990
_OVER:
×
1991
  if (NULL != taskObj.name && isChange.name) {
×
1992
    taosMemoryFree(taskObj.name);
×
1993
  }
1994
  if (NULL != taskObj.status && isChange.status) {
×
1995
    taosMemoryFree(taskObj.status);
×
1996
  }
1997
  if (NULL != taskObj.sourceDsn && isChange.source) {
×
1998
    taosMemoryFree(taskObj.sourceDsn);
×
1999
  }
2000
  if (NULL != taskObj.sinkDsn && isChange.sink) {
×
2001
    taosMemoryFree(taskObj.sinkDsn);
×
2002
  }
2003
  if (NULL != taskObj.parser && isChange.parser) {
×
2004
    taosMemoryFree(taskObj.parser);
×
2005
  }
2006
  if (NULL != taskObj.reason && isChange.reason) {
×
2007
    taosMemoryFree(taskObj.reason);
×
2008
  }
2009
  if (NULL != taskObj.labels && isChange.labels) {
×
2010
    taosMemoryFree(taskObj.labels);
×
2011
  }
2012
  mndTransDrop(pTrans);
×
2013
  TAOS_RETURN(code);
×
2014
}
2015

2016
// Handles an "update xnode task" request.
//
// The task is looked up by id when updateReq.tid > 0, otherwise by name. When a new name is
// requested and a task with that name can already be acquired, the request fails with
// TSDB_CODE_MND_XNODE_NAME_DUPLICATE. On success TSDB_CODE_ACTION_IN_PROGRESS is returned.
static int32_t mndProcessUpdateXnodeTaskReq(SRpcMsg *pReq) {
  SMnode              *pMnode = pReq->info.node;
  int32_t              code = -1;
  SXnodeTaskObj       *pObj = NULL;
  SMUpdateXnodeTaskReq updateReq = {0};

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_UPDATE_XNODE_TASK);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for update xnode task, code:%s", tstrerror(code));
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSMUpdateXnodeTaskReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER);

  // Same identification rule as the start/stop/drop handlers: a positive id or a non-empty name.
  // Without it a NULL name would be passed to the by-name lookup.
  if (updateReq.tid <= 0 && (updateReq.name.len <= 0 || updateReq.name.ptr == NULL)) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _OVER;
  }

  if (updateReq.tid > 0) {
    pObj = mndAcquireXnodeTaskById(pMnode, updateReq.tid);
  } else {
    pObj = mndAcquireXnodeTaskByName(pMnode, updateReq.name.ptr);
  }
  if (pObj == NULL) {
    code = terrno;
    goto _OVER;
  }

  // Refuse a rename onto a name that is already taken.
  if (updateReq.updateName.len > 0 && updateReq.updateName.ptr != NULL) {
    SXnodeTaskObj *tmpObj = mndAcquireXnodeTaskByName(pMnode, updateReq.updateName.ptr);
    if (tmpObj != NULL) {
      mndReleaseXnodeTask(pMnode, tmpObj);
      code = TSDB_CODE_MND_XNODE_NAME_DUPLICATE;
      goto _OVER;
    }
  }

  code = mndUpdateXnodeTask(pMnode, pReq, pObj, &updateReq);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode task:%d, failed to update since %s", updateReq.tid, tstrerror(code));
  }

  if (pObj != NULL) {
    mndReleaseXnodeTask(pMnode, pObj);
  }
  tFreeSMUpdateXnodeTaskReq(&updateReq);
  TAOS_RETURN(code);
}
2062

2063
// Acquires the xnode task with the given id from the sdb.
// Returns NULL when sdbAcquire fails; in that case TSDB_CODE_SDB_OBJ_NOT_THERE in terrno is
// translated to TSDB_CODE_MND_XNODE_TASK_NOT_EXIST, any other terrno value is left untouched.
SXnodeTaskObj *mndAcquireXnodeTask(SMnode *pMnode, int32_t tid) {
  SXnodeTaskObj *pTask = sdbAcquire(pMnode->pSdb, SDB_XNODE_TASK, &tid);
  if (pTask != NULL) {
    return pTask;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_XNODE_TASK_NOT_EXIST;
  }
  return NULL;
}
2070

2071
// Encodes the task and appends it to the transaction's redo log, then marks the raw record
// SDB_STATUS_DROPPING. Returns terrno when encoding fails, or the first failing step's code.
static int32_t mndSetDropXnodeTaskRedoLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
  SSdbRaw *pRaw = mndXnodeTaskActionEncode(pObj);
  if (pRaw == NULL) {
    return terrno;
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(0);
}
2084

2085
// Encodes the task and appends it to the transaction's commit log, then marks the raw record
// SDB_STATUS_DROPPED. Returns terrno when encoding fails, or the first failing step's code.
static int32_t mndSetDropXnodeTaskCommitLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
  SSdbRaw *pRaw = mndXnodeTaskActionEncode(pObj);
  if (pRaw == NULL) {
    return terrno;
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(0);
}
2097
// Stages the redo and commit logs needed to drop pObj into pTrans.
// A NULL pObj is a no-op that returns 0. pMnode and force are not used by this function.
static int32_t mndSetDropXnodeTaskInfoToTrans(SMnode *pMnode, STrans *pTrans, SXnodeTaskObj *pObj, bool force) {
  if (pObj != NULL) {
    TAOS_CHECK_RETURN(mndSetDropXnodeTaskRedoLogs(pTrans, pObj));
    TAOS_CHECK_RETURN(mndSetDropXnodeTaskCommitLogs(pTrans, pObj));
  }
  return 0;
}
2105

2106
// Creates and prepares a "drop-xnode-task" transaction for pTask.
//
// Ownership: this function releases the caller's reference on pTask on every path, including
// the one where the transaction cannot be created. pTask must therefore not be used or released
// by the caller afterwards.
// Returns 0 once the transaction has been prepared, otherwise an error code.
static int32_t mndDropXnodeTask(SMnode *pMnode, SRpcMsg *pReq, SXnodeTaskObj *pTask) {
  int32_t code = 0;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-xnode-task");
  if (pTrans == NULL) {
    // Never report success without a transaction, even if terrno was left at 0.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mndReleaseXnodeTask(pMnode, pTask);
    TAOS_RETURN(code);
  }

  mndTransSetSerial(pTrans);
  mDebug("trans:%d, to drop xnode:%d", pTrans->id, pTask->id);

  // The task is only needed while its raw records are being encoded into the transaction.
  code = mndSetDropXnodeTaskInfoToTrans(pMnode, pTrans, pTask, false);
  mndReleaseXnodeTask(pMnode, pTask);

  if (code == 0) {
    code = mndTransPrepare(pMnode, pTrans);
  }

  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2135

2136
// Handles a "drop xnode task" request.
//
// The task is looked up by name when a non-empty name is given, otherwise by id. An HTTP DELETE
// is sent to "<XNODED_PIPE_SOCKET_URL>/task/drop/<id>" (its reply is freed without inspection)
// and the drop transaction is then started through mndDropXnodeTask. The task reference is
// handed to mndDropXnodeTask and is not released in this function.
// Returns TSDB_CODE_ACTION_IN_PROGRESS on success, otherwise an error code.
static int32_t mndProcessDropXnodeTaskReq(SRpcMsg *pReq) {
  SMnode            *pMnode = pReq->info.node;
  SMDropXnodeTaskReq dropReq = {0};
  SXnodeTaskObj     *pObj = NULL;
  SJson             *pJson = NULL;
  char               xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
  int32_t            code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_DROP_XNODE_TASK);

  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for drop xnode task, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMDropXnodeTaskReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
  mDebug("DropXnodeTask with tid:%d, start to drop", dropReq.id);

  bool hasName = (dropReq.name.len > 0 && dropReq.name.ptr != NULL);
  if (dropReq.id <= 0 && !hasName) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _OVER;
  }

  // A usable name takes precedence over the id.
  pObj = hasName ? mndAcquireXnodeTaskByName(pMnode, dropReq.name.ptr) : mndAcquireXnodeTask(pMnode, dropReq.id);
  if (pObj == NULL) {
    code = terrno;
    goto _OVER;
  }

  // send request to drop xnode task
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN + 1, "%s/task/drop/%d", XNODED_PIPE_SOCKET_URL, pObj->id);
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_DELETE, defaultTimeout, NULL, 0);

  code = mndDropXnodeTask(pMnode, pReq, pObj);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode task:%d, failed to drop since %s", dropReq.id, tstrerror(code));
  }
  if (pJson != NULL) {
    tjsonDelete(pJson);
  }
  tFreeSMDropXnodeTaskReq(&dropReq);
  TAOS_RETURN(code);
}
2186
// Writes a fixed-width value (or NULL when hasValue is false) into cell (row, col) of pBlock.
// Returns the colDataSetVal result, or 0 when the cell was set to NULL.
static int32_t mndXnodeTaskColSetRaw(SSDataBlock *pBlock, int32_t col, int32_t row, bool hasValue, const void *pVal) {
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, col);
  if (!hasValue) {
    colDataSetNULL(pColInfo, row);
    return 0;
  }
  return colDataSetVal(pColInfo, row, (const char *)pVal, false);
}

// Writes a string (or NULL when hasValue is false) into cell (row, col) of pBlock.
// The string is first converted into `buf` with STR_WITH_MAXSIZE_TO_VARSTR, using the byte width
// that the show schema declares for this column as the size limit.
static int32_t mndXnodeTaskColSetStr(SShowObj *pShow, SSDataBlock *pBlock, char *buf, int32_t col, int32_t row,
                                     bool hasValue, const char *str) {
  if (!hasValue) {
    SColumnInfoData *pNullCol = taosArrayGet(pBlock->pDataBlock, col);
    colDataSetNULL(pNullCol, row);
    return 0;
  }

  buf[0] = 0;
  STR_WITH_MAXSIZE_TO_VARSTR(buf, str, pShow->pMeta->pSchemas[col].bytes);
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, col);
  return colDataSetVal(pColInfo, row, (const char *)buf, false);
}

// Fills up to `rows` rows of pBlock with xnode tasks, continuing from pShow->pIter.
// Column order: id, name, from, to, parser, via, xnode_id, status, reason, create_by, labels,
// create_time, update_time. Optional columns are written as NULL when unset.
// Iteration stops early on the first column write failure (the current object is released).
// Returns the number of rows written and adds it to pShow->numOfRows.
static int32_t mndRetrieveXnodeTasks(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode        *pMnode = pReq->info.node;
  SSdb          *pSdb = pMnode->pSdb;
  SXnodeTaskObj *pObj = NULL;
  int32_t        numOfRows = 0;
  int32_t        code = 0;
  char           buf[VARSTR_HEADER_SIZE +
           TMAX(TSDB_XNODE_TASK_NAME_LEN,
                          TMAX(TSDB_XNODE_TASK_SOURCE_LEN, TMAX(TSDB_XNODE_TASK_SINK_LEN, TSDB_XNODE_TASK_PARSER_LEN)))];
  mDebug("show.type:%d, %s:%d: retrieve xnode tasks with rows: %d", pShow->type, __FILE__, __LINE__, rows);

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_XNODE_TASK, pShow->pIter, (void **)&pObj);
    if (pShow->pIter == NULL) break;

    int32_t cols = 0;

    // id
    code = mndXnodeTaskColSetRaw(pBlock, cols++, numOfRows, true, &pObj->id);
    if (code != 0) goto _end;

    // name
    code = mndXnodeTaskColSetStr(pShow, pBlock, buf, cols++, numOfRows, true, pObj->name);
    if (code != 0) goto _end;

    // from
    code = mndXnodeTaskColSetStr(pShow, pBlock, buf, cols++, numOfRows, true, pObj->sourceDsn);
    if (code != 0) goto _end;

    // to
    code = mndXnodeTaskColSetStr(pShow, pBlock, buf, cols++, numOfRows, true, pObj->sinkDsn);
    if (code != 0) goto _end;

    // parser
    code = mndXnodeTaskColSetStr(pShow, pBlock, buf, cols++, numOfRows,
                                 pObj->parserLen > 0 && pObj->parser != NULL, pObj->parser);
    if (code != 0) goto _end;

    // via
    code = mndXnodeTaskColSetRaw(pBlock, cols++, numOfRows, pObj->via != 0, &pObj->via);
    if (code != 0) goto _end;

    // xnode_id
    code = mndXnodeTaskColSetRaw(pBlock, cols++, numOfRows, pObj->xnodeId != 0, &pObj->xnodeId);
    if (code != 0) goto _end;

    // status
    code = mndXnodeTaskColSetStr(pShow, pBlock, buf, cols++, numOfRows, pObj->statusLen > 0, pObj->status);
    if (code != 0) goto _end;

    // reason
    code = mndXnodeTaskColSetStr(pShow, pBlock, buf, cols++, numOfRows, pObj->reasonLen > 0, pObj->reason);
    if (code != 0) goto _end;

    // create_by
    code = mndXnodeTaskColSetStr(pShow, pBlock, buf, cols++, numOfRows, pObj->createdByLen > 0, pObj->createdBy);
    if (code != 0) goto _end;

    // labels
    code = mndXnodeTaskColSetStr(pShow, pBlock, buf, cols++, numOfRows, pObj->labelsLen > 0, pObj->labels);
    if (code != 0) goto _end;

    // create_time
    code = mndXnodeTaskColSetRaw(pBlock, cols++, numOfRows, true, &pObj->createTime);
    if (code != 0) goto _end;

    // update_time
    code = mndXnodeTaskColSetRaw(pBlock, cols++, numOfRows, true, &pObj->updateTime);
    if (code != 0) goto _end;

    numOfRows++;
    sdbRelease(pSdb, pObj);
  }

_end:
  // On a write failure the object fetched in the aborted iteration is still held.
  if (code != 0 && pObj != NULL) sdbRelease(pSdb, pObj);

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
2329

2330
// Cancels an in-progress SDB_XNODE_TASK iteration identified by pIter.
static void mndCancelGetNextXnodeTask(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_XNODE_TASK);
}
×
2334

2335
/** xnode job section **/
2336

2337
// Collects all xnode jobs whose taskId equals tid into a newly created array.
//
// *ppArray is always overwritten with the result of taosArrayInit; its elements are by-value
// copies of SXnodeJobObj (element size sizeof(SXnodeJobObj)), and the sdb reference of each job
// is released right after it has been examined.
// NOTE(review): the copies are shallow, so pointer members still refer to memory owned by the
// sdb row -- confirm callers do not outlive it.
// Returns 0 on success. On failure an error code is returned, the iteration is cancelled and
// *ppArray (if it was created) is left for the caller to dispose of.
static int32_t mndAcquireXnodeJobsByTaskId(SMnode *pMnode, int32_t tid, SArray **ppArray) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t idx = 0;
  void   *pIter = NULL;

  *ppArray = taosArrayInit(16, sizeof(SXnodeJobObj));
  if (*ppArray == NULL) {  // check the created array, not the out-parameter itself
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  while (1) {
    SXnodeJobObj *pJob = NULL;
    pIter = sdbFetch(pSdb, SDB_XNODE_JOB, pIter, (void **)&pJob);
    if (pIter == NULL) break;

    if (pJob->taskId == tid && NULL == taosArrayInsert(*ppArray, idx++, pJob)) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      sdbRelease(pSdb, pJob);
      break;  // pIter is still live here and is cancelled below
    }

    sdbRelease(pSdb, pJob);
  }
  // pIter is NULL after a complete scan and non-NULL after an aborted one.
  sdbCancelFetch(pSdb, pIter);

  TAOS_RETURN(code);
}
2370

2371
/**
 * Collect a by-value copy of every xnode job row stored in the sdb.
 *
 * NOTE(review): the copies are shallow struct copies, so their config/status/
 * reason pointers still alias the buffers owned by the sdb rows -- confirm
 * that callers never free them or use them after the rows change.
 *
 * @param pMnode  mnode whose sdb is scanned
 * @param ppArray out: newly created array of SXnodeJobObj; NULL if the array
 *                could not be allocated. It may be partially filled when an
 *                error is returned.
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndAcquireXnodeJobsAll(SMnode *pMnode, SArray **ppArray) {
  int32_t code = 0;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t idx = 0;
  void   *pIter = NULL;

  *ppArray = taosArrayInit(64, sizeof(SXnodeJobObj));
  // The allocation result lives in *ppArray; ppArray itself is the caller's out-slot.
  if (*ppArray == NULL) {
    code = terrno;
    goto _exit;
  }

  while (1) {
    SXnodeJobObj *pJob = NULL;
    pIter = sdbFetch(pSdb, SDB_XNODE_JOB, pIter, (void **)&pJob);
    if (pIter == NULL) break;

    if (NULL == taosArrayInsert(*ppArray, idx++, pJob)) {
      code = terrno;
      sdbRelease(pSdb, pJob);  // drop the reference taken by sdbFetch
      break;                   // pIter is still live; cancelled right after the loop
    }
    sdbRelease(pSdb, pJob);
  }
  // After a full scan pIter is NULL; after an early exit this ends the iteration.
  sdbCancelFetch(pSdb, pIter);

_exit:
  TAOS_RETURN(code);
}
2398

2399
/**
 * Release the heap buffers owned by an xnode job object and reset the
 * pointers. The object itself is not freed.
 *
 * @param pObj job object; NULL is accepted and ignored
 */
static void mndFreeXnodeJob(SXnodeJobObj *pObj) {
  if (pObj != NULL) {
    // taosMemoryFreeClear is applied to possibly-NULL fields elsewhere in this
    // file, so no per-field guard is needed here.
    taosMemoryFreeClear(pObj->config);
    taosMemoryFreeClear(pObj->reason);
    taosMemoryFreeClear(pObj->status);
  }
}
2413

2414
/**
 * Serialize an xnode job object into an sdb raw record.
 *
 * Field order: id, taskId, configLen, config, via, xnodeId, statusLen, status,
 * reasonLen, reason, createTime, updateTime, then TSDB_XNODE_RESERVE_SIZE
 * reserved bytes.
 *
 * @param pObj job to encode
 * @return the encoded raw with terrno == 0, or NULL with terrno set
 *         (TSDB_CODE_INVALID_PARA when pObj is NULL).
 */
SSdbRaw *mndXnodeJobActionEncode(SXnodeJobObj *pObj) {
  // code/lino are not read directly below; they are kept in scope because the
  // SDB_SET_* macros may reference them -- TODO confirm against sdb.h.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;  // pessimistic default, cleared on success

  if (pObj == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  mDebug("xnode tid:%d, jid:%d, start to encode to raw, row:%p", pObj->taskId, pObj->id, pObj);

  int32_t rawDataLen =
      sizeof(SXnodeJobObj) + TSDB_XNODE_RESERVE_SIZE + pObj->configLen + pObj->reasonLen + pObj->statusLen;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE_JOB, TSDB_XNODE_VER_NUMBER, rawDataLen);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  /* identity */
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->taskId, _OVER)
  /* config: length-prefixed blob */
  SDB_SET_INT32(pRaw, dataPos, pObj->configLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->config, pObj->configLen, _OVER)
  /* placement */
  SDB_SET_INT32(pRaw, dataPos, pObj->via, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pObj->xnodeId, _OVER)
  /* status: length-prefixed blob */
  SDB_SET_INT32(pRaw, dataPos, pObj->statusLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
  /* reason: length-prefixed blob */
  SDB_SET_INT32(pRaw, dataPos, pObj->reasonLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->reason, pObj->reasonLen, _OVER)
  /* timestamps */
  SDB_SET_INT64(pRaw, dataPos, pObj->createTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("xnode tid:%d, jid:%d, encode to raw:%p, row:%p", pObj->taskId, pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("xnode tid:%d, jid:%d, failed to encode to raw:%p since %s", pObj->taskId, pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
2460

2461
/**
 * Rebuild an xnode job row from an sdb raw record.
 *
 * Fields are read in the same order mndXnodeJobActionEncode writes them. Each
 * of config/status/reason is allocated only when its stored length is > 0.
 *
 * @param pRaw raw record to decode
 * @return the decoded row with terrno == 0, or NULL with terrno set
 *         (TSDB_CODE_INVALID_PARA for a NULL pRaw,
 *         TSDB_CODE_SDB_INVALID_DATA_VER for an unexpected version).
 */
SSdbRow *mndXnodeJobActionDecode(SSdbRaw *pRaw) {
  mInfo("xnode, start to decode from raw:%p", pRaw);
  // code/lino are not read directly below; they are kept in scope because the
  // SDB_GET_* macros may reference them -- TODO confirm against sdb.h.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;  // pessimistic default, cleared on success
  SSdbRow      *pRow = NULL;
  SXnodeJobObj *pObj = NULL;

  if (pRaw == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
  if (sver != TSDB_XNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SXnodeJobObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  int32_t dataPos = 0;
  /* identity */
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->taskId, _OVER)

  /* config: length-prefixed blob */
  SDB_GET_INT32(pRaw, dataPos, &pObj->configLen, _OVER)
  if (pObj->configLen > 0) {
    pObj->config = taosMemoryCalloc(pObj->configLen, 1);
    if (pObj->config == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->config, pObj->configLen, _OVER)
  }

  /* placement */
  SDB_GET_INT32(pRaw, dataPos, &pObj->via, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->xnodeId, _OVER)

  /* status: length-prefixed blob */
  SDB_GET_INT32(pRaw, dataPos, &pObj->statusLen, _OVER)
  if (pObj->statusLen > 0) {
    pObj->status = taosMemoryCalloc(pObj->statusLen, 1);
    if (pObj->status == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
  }

  /* reason: length-prefixed blob */
  SDB_GET_INT32(pRaw, dataPos, &pObj->reasonLen, _OVER)
  if (pObj->reasonLen > 0) {
    pObj->reason = taosMemoryCalloc(pObj->reasonLen, 1);
    if (pObj->reason == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->reason, pObj->reasonLen, _OVER)
  }

  /* timestamps */
  SDB_GET_INT64(pRaw, dataPos, &pObj->createTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("xnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRow;
  }

  mError("xnode tid:%d, jid:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->taskId,
         pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
  if (pObj != NULL) {
    // Drop whatever was allocated before the failure.
    taosMemoryFreeClear(pObj->config);
    taosMemoryFreeClear(pObj->reason);
    taosMemoryFreeClear(pObj->status);
  }
  taosMemoryFreeClear(pRow);
  return NULL;
}
2538

2539
/**
 * sdb insert hook for xnode job rows: logs the insertion and does nothing else.
 *
 * @param pSdb unused
 * @param pObj row being inserted
 * @return always 0
 */
int32_t mndXnodeJobActionInsert(SSdb *pSdb, SXnodeJobObj *pObj) {
  (void)pSdb;
  mInfo("xnode tid:%d, jid:%d, perform insert action, row:%p", pObj->taskId, pObj->id, pObj);
  return 0;
}
2543

2544
/**
 * sdb delete hook for xnode job rows: logs the deletion and releases the
 * buffers owned by the row via mndFreeXnodeJob.
 *
 * @param pSdb unused
 * @param pObj row being deleted
 * @return always 0
 */
int32_t mndXnodeJobActionDelete(SSdb *pSdb, SXnodeJobObj *pObj) {
  (void)pSdb;
  mDebug("xnode tid:%d, jid:%d, perform delete action, row:%p", pObj->taskId, pObj->id, pObj);
  mndFreeXnodeJob(pObj);
  return 0;
}
2549

2550
/**
 * sdb update hook for xnode job rows: merges `pNew` into the stored row `pOld`
 * under pOld's write latch.
 *
 * via and xnodeId are copied. The status/config/reason fields are exchanged
 * with pNew through swapFields. updateTime only ever moves forward.
 *
 * @param pSdb unused
 * @param pOld stored row, modified in place
 * @param pNew incoming row; its status/config/reason fields are passed to swapFields
 * @return always 0
 */
int32_t mndXnodeJobActionUpdate(SSdb *pSdb, SXnodeJobObj *pOld, SXnodeJobObj *pNew) {
  (void)pSdb;
  mDebug("xnode tid:%d, jid:%d, perform update action, old row:%p new row:%p", pOld->taskId, pOld->id, pOld, pNew);

  taosWLockLatch(&pOld->lock);

  /* scalar fields */
  pOld->via = pNew->via;
  pOld->xnodeId = pNew->xnodeId;

  /* variable-length fields */
  swapFields(&pNew->statusLen, &pNew->status, &pOld->statusLen, &pOld->status);
  swapFields(&pNew->configLen, &pNew->config, &pOld->configLen, &pOld->config);
  swapFields(&pNew->reasonLen, &pNew->reason, &pOld->reasonLen, &pOld->reason);

  /* never move the update timestamp backwards */
  if (pOld->updateTime < pNew->updateTime) {
    pOld->updateTime = pNew->updateTime;
  }

  taosWUnLockLatch(&pOld->lock);
  return 0;
}
2565

2566
/* xnode user pass actions */
2567
/**
 * Serialize an xnode user/pass object into an sdb raw record.
 *
 * Field order: id, userLen, user, passLen, pass, tokenLen, token, createTime,
 * updateTime, then TSDB_XNODE_RESERVE_SIZE reserved bytes.
 *
 * @param pObj object to encode
 * @return the encoded raw with terrno == 0, or NULL with terrno set
 *         (TSDB_CODE_INVALID_PARA when pObj is NULL).
 */
SSdbRaw *mndXnodeUserPassActionEncode(SXnodeUserPassObj *pObj) {
  // code/lino are not read directly below; they are kept in scope because the
  // SDB_SET_* macros may reference them -- TODO confirm against sdb.h.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;  // pessimistic default, cleared on success

  if (pObj == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int32_t rawDataLen =
      sizeof(SXnodeUserPassObj) + TSDB_XNODE_RESERVE_SIZE + pObj->userLen + pObj->passLen + pObj->tokenLen;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE_USER_PASS, TSDB_XNODE_VER_NUMBER, rawDataLen);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
  /* user: length-prefixed blob */
  SDB_SET_INT32(pRaw, dataPos, pObj->userLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->user, pObj->userLen, _OVER)
  /* pass: length-prefixed blob */
  SDB_SET_INT32(pRaw, dataPos, pObj->passLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->pass, pObj->passLen, _OVER)
  /* token: length-prefixed blob */
  SDB_SET_INT32(pRaw, dataPos, pObj->tokenLen, _OVER)
  SDB_SET_BINARY(pRaw, dataPos, pObj->token, pObj->tokenLen, _OVER)
  /* timestamps */
  SDB_SET_INT64(pRaw, dataPos, pObj->createTime, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)

  SDB_SET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("xnode user pass:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("xnode user pass:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
2608
/**
 * Rebuild an xnode user/pass row from an sdb raw record.
 *
 * Fields are read in the same order mndXnodeUserPassActionEncode writes them.
 * Each of user/pass/token is allocated with one extra zero byte, and only when
 * its stored length is > 0.
 *
 * @param pRaw raw record to decode
 * @return the decoded row with terrno == 0, or NULL with terrno set
 *         (TSDB_CODE_INVALID_PARA for a NULL pRaw,
 *         TSDB_CODE_SDB_INVALID_DATA_VER for an unexpected version).
 */
SSdbRow *mndXnodeUserPassActionDecode(SSdbRaw *pRaw) {
  // code/lino are not read directly below; they are kept in scope because the
  // SDB_GET_* macros may reference them -- TODO confirm against sdb.h.
  int32_t code = 0;
  int32_t lino = 0;
  terrno = TSDB_CODE_OUT_OF_MEMORY;  // pessimistic default, cleared on success
  SSdbRow           *pRow = NULL;
  SXnodeUserPassObj *pObj = NULL;

  if (NULL == pRaw) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != TSDB_XNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SXnodeUserPassObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pObj->userLen, _OVER)
  if (pObj->userLen > 0) {
    pObj->user = taosMemoryCalloc(1, pObj->userLen + 1);
    if (pObj->user == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->user, pObj->userLen, _OVER)
  }
  SDB_GET_INT32(pRaw, dataPos, &pObj->passLen, _OVER)
  if (pObj->passLen > 0) {
    pObj->pass = taosMemoryCalloc(1, pObj->passLen + 1);
    if (pObj->pass == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->pass, pObj->passLen, _OVER)
  }
  SDB_GET_INT32(pRaw, dataPos, &pObj->tokenLen, _OVER)
  if (pObj->tokenLen > 0) {
    pObj->token = taosMemoryCalloc(1, pObj->tokenLen + 1);
    if (pObj->token == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, dataPos, pObj->token, pObj->tokenLen, _OVER)
  }
  SDB_GET_INT64(pRaw, dataPos, &pObj->createTime, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)

  SDB_GET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("xnode user pass:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
    if (pObj != NULL) {
      // Release every buffer that may have been allocated before the failure,
      // including token, which can be set when a later read fails.
      taosMemoryFreeClear(pObj->user);
      taosMemoryFreeClear(pObj->pass);
      taosMemoryFreeClear(pObj->token);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("xnode user pass:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
  return pRow;
}
2679
/**
 * sdb insert hook for xnode user/pass rows: logs the insertion and does
 * nothing else.
 *
 * @param pSdb unused
 * @param pObj row being inserted
 * @return always 0
 */
int32_t mndXnodeUserPassActionInsert(SSdb *pSdb, SXnodeUserPassObj *pObj) {
  (void)pSdb;
  mDebug("xnode user pass:%d, perform insert action, row:%p", pObj->id, pObj);
  return 0;
}
2683
/**
 * sdb update hook for xnode user/pass rows: moves the credentials of `pNew`
 * into the stored row `pOld` under pOld's write latch.
 *
 * For user, pass and token the length is copied from pNew and the buffer
 * pointers are exchanged, so pNew ends up holding pOld's previous buffers.
 * pNew's length fields are left untouched. updateTime only ever moves forward.
 *
 * @param pSdb unused
 * @param pOld stored row, modified in place
 * @param pNew incoming row; receives pOld's previous buffers
 * @return always 0
 */
int32_t mndXnodeUserPassActionUpdate(SSdb *pSdb, SXnodeUserPassObj *pOld, SXnodeUserPassObj *pNew) {
  (void)pSdb;
  mDebug("xnode user pass:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  taosWLockLatch(&pOld->lock);

  if (pOld->updateTime < pNew->updateTime) {
    pOld->updateTime = pNew->updateTime;
  }

  /* user */
  char *prevUser = pOld->user;
  pOld->userLen = pNew->userLen;
  pOld->user = pNew->user;
  pNew->user = prevUser;

  /* pass */
  char *prevPass = pOld->pass;
  pOld->passLen = pNew->passLen;
  pOld->pass = pNew->pass;
  pNew->pass = prevPass;

  /* token */
  char *prevToken = pOld->token;
  pOld->tokenLen = pNew->tokenLen;
  pOld->token = pNew->token;
  pNew->token = prevToken;

  taosWUnLockLatch(&pOld->lock);
  return 0;
}
2715
/**
 * sdb delete hook for xnode user/pass rows: logs the deletion and frees the
 * user, pass and token buffers owned by the row.
 *
 * @param pSdb unused
 * @param pObj row being deleted
 * @return always 0
 */
int32_t mndXnodeUserPassActionDelete(SSdb *pSdb, SXnodeUserPassObj *pObj) {
  (void)pSdb;
  mDebug("xnode:%d, perform delete action, row:%p", pObj->id, pObj);

  taosMemoryFreeClear(pObj->user);
  taosMemoryFreeClear(pObj->pass);
  taosMemoryFreeClear(pObj->token);

  return 0;
}
2722

2723
/**
 * Encode `pObj` and append it to `pTrans` as a redo log with status
 * SDB_STATUS_CREATING.
 *
 * @param pTrans transaction receiving the redo log
 * @param pObj   job to encode
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndSetCreateXnodeJobRedoLogs(STrans *pTrans, SXnodeJobObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndXnodeJobActionEncode(pObj);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    // The raw was not handed over to the transaction, so it must be freed here.
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(code);
}
2735

2736
/**
 * Encode `pObj` and append it to `pTrans` as an undo log with status
 * SDB_STATUS_DROPPED.
 *
 * @param pTrans transaction receiving the undo log
 * @param pObj   job to encode
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndSetCreateXnodeJobUndoLogs(STrans *pTrans, SXnodeJobObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pUndoRaw = mndXnodeJobActionEncode(pObj);
  if (pUndoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendUndolog(pTrans, pUndoRaw);
  if (code != 0) {
    // The raw was not handed over to the transaction, so it must be freed here.
    sdbFreeRaw(pUndoRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(code);
}
2748

2749
/**
 * Encode `pObj` and append it to `pTrans` as a commit log with status
 * SDB_STATUS_READY.
 *
 * @param pTrans transaction receiving the commit log
 * @param pObj   job to encode
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndSetCreateXnodeJobCommitLogs(STrans *pTrans, SXnodeJobObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndXnodeJobActionEncode(pObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // The raw was not handed over to the transaction, so it must be freed here.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
2761

2762
/**
 * Build a new xnode job from a create request and submit it as a serial,
 * rollback-policy transaction (redo + undo + commit logs).
 *
 * The job id comes from sdbGetMaxId. config, status and reason are copied into
 * freshly allocated buffers with one extra zero byte each.
 *
 * @param pMnode  mnode handling the request
 * @param pReq    originating rpc message, attached to the transaction
 * @param pCreate parsed create request
 * @return 0 when the transaction was prepared, otherwise an error code
 *         (TSDB_CODE_MND_XNODE_TASK_JOB_CONFIG_TOO_LONG,
 *         TSDB_CODE_MND_XNODE_TASK_REASON_TOO_LONG, an allocation error, or
 *         the code of the failing transaction step).
 */
static int32_t mndCreateXnodeJob(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeJobReq *pCreate) {
  int32_t code = -1;
  STrans *pTrans = NULL;

  SXnodeJobObj jobObj = {0};
  jobObj.id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE_JOB);
  jobObj.taskId = pCreate->tid;

  jobObj.configLen = pCreate->config.len + 1;
  if (jobObj.configLen > TSDB_XNODE_TASK_JOB_CONFIG_LEN + 1) {
    code = TSDB_CODE_MND_XNODE_TASK_JOB_CONFIG_TOO_LONG;
    goto _OVER;
  }
  jobObj.config = taosMemoryCalloc(1, jobObj.configLen);
  if (jobObj.config == NULL) {
    code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  // An absent config leaves the zero-filled buffer as an empty string.
  if (pCreate->config.ptr != NULL && pCreate->config.len > 0) {
    (void)memcpy(jobObj.config, pCreate->config.ptr, pCreate->config.len);
  }

  jobObj.via = pCreate->via;
  jobObj.xnodeId = pCreate->xnodeId;

  if (pCreate->status.ptr != NULL) {
    jobObj.statusLen = pCreate->status.len + 1;
    jobObj.status = taosMemoryCalloc(1, jobObj.statusLen);
    if (jobObj.status == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    (void)memmove(jobObj.status, pCreate->status.ptr, pCreate->status.len);
  }

  // Test the request's reason, not jobObj.reason: the latter is always NULL on
  // the zero-initialized object, which made this branch unreachable.
  if (pCreate->reason.ptr != NULL) {
    jobObj.reasonLen = pCreate->reason.len + 1;
    if (jobObj.reasonLen > TSDB_XNODE_TASK_REASON_LEN + 1) {
      code = TSDB_CODE_MND_XNODE_TASK_REASON_TOO_LONG;
      goto _OVER;
    }
    jobObj.reason = taosMemoryCalloc(1, jobObj.reasonLen);
    if (jobObj.reason == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    (void)memcpy(jobObj.reason, pCreate->reason.ptr, pCreate->reason.len);
  }

  jobObj.createTime = taosGetTimestampMs();
  jobObj.updateTime = jobObj.createTime;

  mDebug("create xnode job, id:%d, tid:%d, config:%s, time:%" PRId64, jobObj.id, jobObj.taskId, jobObj.config,
         jobObj.createTime);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-xnode-job");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    mInfo("failed to create transaction for xnode-job:%d, code:0x%x:%s", pCreate->tid, code, tstrerror(code));
    goto _OVER;
  }
  mndTransSetSerial(pTrans);

  mInfo("trans:%d, used to create xnode job on %d as jid:%d", pTrans->id, pCreate->tid, jobObj.id);

  TAOS_CHECK_GOTO(mndSetCreateXnodeJobRedoLogs(pTrans, &jobObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateXnodeJobUndoLogs(pTrans, &jobObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetCreateXnodeJobCommitLogs(pTrans, &jobObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  // The local buffers were only needed for encoding, so they are released on
  // every path.
  mndFreeXnodeJob(&jobObj);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2829

2830
/**
 * Apply an update request to an existing xnode job by submitting a commit log
 * that carries the merged object.
 *
 * The working copy starts as a shallow copy of `pOld`. Only the fields present
 * in the request (via/xnodeId > 0, non-NULL status/config/reason) are
 * overridden. Overridden string fields get freshly allocated buffers with one
 * extra zero byte. The untouched ones keep aliasing pOld's buffers and must
 * not be freed here.
 *
 * @param pMnode  mnode handling the request
 * @param pReq    originating rpc message, attached to the transaction
 * @param pOld    currently stored job
 * @param pUpdate parsed update request
 * @return 0 when the transaction was prepared, otherwise an error code.
 */
static int32_t mndUpdateXnodeJob(SMnode *pMnode, SRpcMsg *pReq, SXnodeJobObj *pOld, SMUpdateXnodeJobReq *pUpdate) {
  mInfo("xnode job:%d, start to update", pUpdate->jid);
  int32_t      code = -1;
  STrans      *pTrans = NULL;
  SXnodeJobObj jobObj = *pOld;
  // Tracks which buffers of jobObj were allocated here (and are thus ours to free).
  struct {
    bool status;
    bool config;
    bool reason;
  } isChange = {0};

  jobObj.id = pUpdate->jid;
  if (pUpdate->via > 0) {
    jobObj.via = pUpdate->via;
  }
  if (pUpdate->xnodeId > 0) {
    jobObj.xnodeId = pUpdate->xnodeId;
  }
  if (pUpdate->status.ptr != NULL) {
    jobObj.statusLen = pUpdate->status.len + 1;
    jobObj.status = taosMemoryCalloc(1, jobObj.statusLen);
    if (jobObj.status == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    (void)memcpy(jobObj.status, pUpdate->status.ptr, pUpdate->status.len);
    isChange.status = true;
  }
  if (pUpdate->config != NULL) {
    jobObj.configLen = pUpdate->configLen + 1;
    jobObj.config = taosMemoryCalloc(1, jobObj.configLen);
    if (jobObj.config == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    (void)memcpy(jobObj.config, pUpdate->config, pUpdate->configLen);
    isChange.config = true;
  }
  if (pUpdate->reason != NULL) {
    jobObj.reasonLen = pUpdate->reasonLen + 1;
    jobObj.reason = taosMemoryCalloc(1, jobObj.reasonLen);
    if (jobObj.reason == NULL) {
      code = (terrno != 0) ? terrno : TSDB_CODE_OUT_OF_MEMORY;
      goto _OVER;
    }
    (void)memcpy(jobObj.reason, pUpdate->reason, pUpdate->reasonLen);
    isChange.reason = true;
  }
  jobObj.updateTime = taosGetTimestampMs();

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-xnode");
  if (pTrans == NULL) {
    // Never report success when no transaction exists, even if terrno is unset.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to update xnode job:%d", pTrans->id, jobObj.id);

  TAOS_CHECK_GOTO(mndSetCreateXnodeJobCommitLogs(pTrans, &jobObj), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
  code = 0;

_OVER:
  // Free only the buffers allocated above; the rest belong to pOld.
  if (NULL != jobObj.status && isChange.status) {
    taosMemoryFree(jobObj.status);
  }
  if (NULL != jobObj.config && isChange.config) {
    taosMemoryFree(jobObj.config);
  }
  if (NULL != jobObj.reason && isChange.reason) {
    taosMemoryFree(jobObj.reason);
  }
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2895

2896
/**
 * Give an xnode job row back to the sdb via sdbRelease.
 *
 * @param pMnode mnode whose sdb owns the row
 * @param pObj   row to release
 */
void mndReleaseXnodeTaskJob(SMnode *pMnode, SXnodeJobObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
×
2900

2901
/**
 * Look up an xnode job row by id via sdbAcquire.
 *
 * @param pMnode mnode whose sdb is queried
 * @param jid    job id
 * @return the acquired row, or NULL. When the lookup failed with
 *         TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 *         TSDB_CODE_MND_XNODE_JOB_NOT_EXIST.
 */
SXnodeJobObj *mndAcquireXnodeJob(SMnode *pMnode, int32_t jid) {
  SXnodeJobObj *pJob = sdbAcquire(pMnode->pSdb, SDB_XNODE_JOB, &jid);
  if (pJob != NULL) {
    return pJob;
  }

  // Translate the generic sdb "not there" error into the job-specific one.
  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_XNODE_JOB_NOT_EXIST;
  }
  return pJob;
}
2908
/**
 * Give an xnode job row back to the sdb via sdbRelease.
 *
 * @param pMnode mnode whose sdb owns the row
 * @param pObj   row to release
 */
void mndReleaseXnodeJob(SMnode *pMnode, SXnodeJobObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
×
2912

2913
/**
 * Encode `pObj` and append it to `pTrans` as a redo log with status
 * SDB_STATUS_DROPPING.
 *
 * @param pTrans transaction receiving the redo log
 * @param pObj   job being dropped
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndSetDropXnodeJobRedoLogs(STrans *pTrans, SXnodeJobObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndXnodeJobActionEncode(pObj);
  if (pRedoRaw == NULL) {
    code = terrno;
    return code;
  }

  code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != 0) {
    // The raw was not handed over to the transaction, so it must be freed here.
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));

  return code;
}
2926

2927
/**
 * Encode `pObj` and append it to `pTrans` as a commit log with status
 * SDB_STATUS_DROPPED.
 *
 * @param pTrans transaction receiving the commit log
 * @param pObj   job being dropped
 * @return 0 on success, otherwise an error code.
 */
static int32_t mndSetDropXnodeJobCommitLogs(STrans *pTrans, SXnodeJobObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndXnodeJobActionEncode(pObj);
  if (pCommitRaw == NULL) {
    code = terrno;
    return code;
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != 0) {
    // The raw was not handed over to the transaction, so it must be freed here.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(code);
}
2939
/**
 * Add the redo and commit logs needed to drop `pObj` to `pTrans`.
 *
 * @param pTrans transaction being assembled
 * @param pObj   job to drop; NULL means nothing to do
 * @param force  unused
 * @return 0 on success (or when pObj is NULL), otherwise the code of the
 *         failing step.
 */
static int32_t mndSetDropXnodeJobInfoToTrans(STrans *pTrans, SXnodeJobObj *pObj, bool force) {
  (void)force;

  if (NULL == pObj) return 0;

  TAOS_CHECK_RETURN(mndSetDropXnodeJobRedoLogs(pTrans, pObj));
  TAOS_CHECK_RETURN(mndSetDropXnodeJobCommitLogs(pTrans, pObj));

  return 0;
}
2947

2948
/**
 * Drop an xnode job through a serial, retry-policy transaction.
 *
 * @param pMnode mnode handling the request
 * @param pReq   originating rpc message, attached to the transaction
 * @param pObj   job to drop
 * @return 0 when the transaction was prepared, otherwise an error code.
 */
static int32_t mndDropXnodeJob(SMnode *pMnode, SRpcMsg *pReq, SXnodeJobObj *pObj) {
  // code/lino are written by the TSDB_CHECK_* macros below.
  int32_t code = 0;
  int32_t lino = 0;
  STrans *pTrans = NULL;

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-xnode-job");
  TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno);

  mndTransSetSerial(pTrans);
  mInfo("trans:%d, to drop xnode:%d", pTrans->id, pObj->id);

  /* redo + commit logs for the drop */
  code = mndSetDropXnodeJobInfoToTrans(pTrans, pObj, false);
  TSDB_CHECK_CODE(code, lino, _OVER);

  code = mndTransPrepare(pMnode, pTrans);

_OVER:
  mndTransDrop(pTrans);
  return code;
}
2968
/**
 * RPC handler for "create xnode job": checks permissions, deserializes the
 * request and starts the create transaction.
 *
 * @param pReq incoming rpc message
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
 *         otherwise the error code of the failing step.
 */
static int32_t mndProcessCreateXnodeJobReq(SRpcMsg *pReq) {
  mDebug("create xnode job req, content len:%d", pReq->contLen);
  SMnode             *pMnode = pReq->info.node;
  int32_t             code = -1;
  SMCreateXnodeJobReq createReq = {0};

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_CREATE_XNODE_JOB);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for create xnode job, code:%s", tstrerror(code));
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSMCreateXnodeJobReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
  mDebug("xnode create job on xnode:%d", createReq.xnodeId);

  code = mndCreateXnodeJob(pMnode, pReq, &createReq);
  // A started transaction is reported as "in progress" rather than plain success.
  code = (code == 0) ? TSDB_CODE_ACTION_IN_PROGRESS : code;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode task job on task id:%d, failed to create since %s", createReq.tid, tstrerror(code));
  }

  tFreeSMCreateXnodeJobReq(&createReq);
  TAOS_RETURN(code);
}
2993

2994
/**
 * RPC handler for "update xnode job": checks permissions, deserializes the
 * request, looks up the job and starts the update transaction.
 *
 * @param pReq incoming rpc message
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction has been started,
 *         otherwise the error code of the failing step.
 */
static int32_t mndProcessUpdateXnodeJobReq(SRpcMsg *pReq) {
  SMnode             *pMnode = pReq->info.node;
  int32_t             code = -1;
  SXnodeJobObj       *pObj = NULL;
  SMUpdateXnodeJobReq updateReq = {0};

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_UPDATE_XNODE_JOB);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for update xnode job, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMUpdateXnodeJobReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER);

  pObj = mndAcquireXnodeJob(pMnode, updateReq.jid);
  if (pObj == NULL) {
    code = terrno;
    goto _OVER;
  }

  code = mndUpdateXnodeJob(pMnode, pReq, pObj, &updateReq);
  // A started transaction is reported as "in progress" rather than plain success.
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode task job on jid:%d, failed to update since %s", updateReq.jid, tstrerror(code));
  }

  // pObj may still be NULL here when an earlier step failed.
  mndReleaseXnodeJob(pMnode, pObj);
  tFreeSMUpdateXnodeJobReq(&updateReq);
  TAOS_RETURN(code);
}
3027

3028
/**
 * RPC handler for "rebalance xnode job": checks permissions, deserializes the
 * request, looks up the job and POSTs a manual-rebalance request to
 * "<XNODED_PIPE_SOCKET_URL>/rebalance/manual/<taskId>/<jobId>/<xnodeId>".
 *
 * NOTE(review): the JSON returned by mndSendReqRetJson is only deleted, never
 * inspected, so a failed HTTP request does not change the returned code --
 * confirm this best-effort behavior is intended.
 *
 * @param pReq incoming rpc message
 * @return 0 when the request was dispatched, TSDB_CODE_INVALID_MSG for a
 *         non-positive job id, otherwise the error code of the failing step.
 */
static int32_t mndProcessRebalanceXnodeJobReq(SRpcMsg *pReq) {
  SMnode                *pMnode = pReq->info.node;
  int32_t                code = -1;
  SXnodeJobObj          *pObj = NULL;
  SMRebalanceXnodeJobReq rebalanceReq = {0};
  SJson                 *pJson = NULL;

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_REBALANCE_XNODE_JOB);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for rebalance xnode job, code:%s", tstrerror(code));
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSMRebalanceXnodeJobReq(pReq->pCont, pReq->contLen, &rebalanceReq), NULL, _OVER);
  mDebug("RebalanceXnodeJob with jid:%d, xnode_id:%d, start to rebalance", rebalanceReq.jid, rebalanceReq.xnodeId);

  if (rebalanceReq.jid <= 0) {
    code = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  pObj = mndAcquireXnodeJob(pMnode, rebalanceReq.jid);
  if (NULL == pObj) {
    code = terrno;
    goto _OVER;
  }

  /* build the target url and fire the request */
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN + 1, "%s/rebalance/manual/%d/%d/%d", XNODED_PIPE_SOCKET_URL, pObj->taskId,
           pObj->id, rebalanceReq.xnodeId);
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, defaultTimeout, NULL, 0);

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:%d, failed to rebalance xnode job since %s", rebalanceReq.jid, tstrerror(code));
  }
  if (NULL != pJson) {
    tjsonDelete(pJson);
  }
  // pObj may still be NULL here when an earlier step failed.
  mndReleaseXnodeJob(pMnode, pObj);
  tFreeSMRebalanceXnodeJobReq(&rebalanceReq);
  TAOS_RETURN(code);
}
3071

3072
typedef struct {
3073
  SValueNode nd;
3074
  bool       shouldFree;
3075
} SXndRefValueNode;
3076

3077
static void freeSXndRefValueNode(void *pNode) {
×
3078
  if (pNode == NULL) return;
×
3079

3080
  SXndRefValueNode *pRefNode = (SXndRefValueNode *)pNode;
×
3081
  if (pRefNode->shouldFree) {
×
3082
    taosMemoryFreeClear(pRefNode->nd.datum.p);
×
3083
  }
3084
}
3085

3086
typedef struct {
3087
  SArray   *stack;
3088
  SHashObj *pMap;
3089
  int32_t   code;
3090
} SXndWhereContext;
3091

3092
void freeSXndWhereContext(SXndWhereContext *pCtx) {
×
3093
  if (pCtx == NULL) return;
×
3094

3095
  if (pCtx->pMap != NULL) {
×
3096
    taosHashCleanup(pCtx->pMap);
×
3097
    pCtx->pMap = NULL;
×
3098
  }
3099
  if (pCtx->stack != NULL) {
×
3100
    for (int32_t i = 0; i < pCtx->stack->size; i++) {
×
3101
      SXndRefValueNode *pRefNode = (SXndRefValueNode *)taosArrayGet(pCtx->stack, i);
×
3102
      if (pRefNode != NULL) {
×
3103
        freeSXndRefValueNode(pRefNode);
×
3104
      }
3105
    }
3106
    taosArrayDestroy(pCtx->stack);
×
3107
    pCtx->stack = NULL;
×
3108
  }
3109
}
3110

3111
static SHashObj *convertJob2Map(const SXnodeJobObj *pJob) {
×
3112
  SHashObj *pMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
3113
  if (pMap == NULL) {
×
3114
    return NULL;
×
3115
  }
3116
  taosHashSetFreeFp(pMap, freeSXndRefValueNode);
×
3117
  // id
3118
  SXndRefValueNode id = {0};
×
3119
  id.nd.node.type = QUERY_NODE_VALUE;
×
3120
  id.nd.node.resType.type = TSDB_DATA_TYPE_UBIGINT;
×
3121
  id.nd.datum.u = pJob->id;
×
3122
  int32_t code = taosHashPut(pMap, "id", strlen("id") + 1, &id, sizeof(SXndRefValueNode));
×
3123
  if (code != 0) {
×
3124
    taosHashCleanup(pMap);
×
3125
    return NULL;
×
3126
  }
3127
  // task id
3128
  SXndRefValueNode taskId = {0};
×
3129
  taskId.nd.node.type = QUERY_NODE_VALUE;
×
3130
  taskId.nd.node.resType.type = TSDB_DATA_TYPE_UBIGINT;
×
3131
  taskId.nd.datum.u = pJob->taskId;
×
3132
  if (pJob->taskId == 0) {
×
3133
    taskId.nd.isNull = true;
×
3134
  }
3135
  code = taosHashPut(pMap, "task_id", strlen("task_id") + 1, &taskId, sizeof(SXndRefValueNode));
×
3136
  if (code != 0) {
×
3137
    taosHashCleanup(pMap);
×
3138
    return NULL;
×
3139
  }
3140
  // via
3141
  SXndRefValueNode via = {0};
×
3142
  via.nd.node.type = QUERY_NODE_VALUE;
×
3143
  via.nd.node.resType.type = TSDB_DATA_TYPE_UBIGINT;
×
3144
  via.nd.datum.u = pJob->via;
×
3145
  if (pJob->via == 0) {
×
3146
    via.nd.isNull = true;
×
3147
  }
3148
  code = taosHashPut(pMap, "via", strlen("via") + 1, &via, sizeof(SXndRefValueNode));
×
3149
  if (code != 0) {
×
3150
    taosHashCleanup(pMap);
×
3151
    return NULL;
×
3152
  }
3153
  // xnode id
3154
  SXndRefValueNode xnodeId = {0};
×
3155
  xnodeId.nd.node.type = QUERY_NODE_VALUE;
×
3156
  xnodeId.nd.node.resType.type = TSDB_DATA_TYPE_UBIGINT;
×
3157
  xnodeId.nd.datum.u = pJob->xnodeId;
×
3158
  if (pJob->xnodeId == 0) {
×
3159
    xnodeId.nd.isNull = true;
×
3160
  }
3161
  code = taosHashPut(pMap, "xnode_id", strlen("xnode_id") + 1, &xnodeId, sizeof(SXndRefValueNode));
×
3162
  if (code != 0) {
×
3163
    taosHashCleanup(pMap);
×
3164
    return NULL;
×
3165
  }
3166
  // config
3167
  SXndRefValueNode config = {0};
×
3168
  config.nd.node.type = QUERY_NODE_VALUE;
×
3169
  if (pJob->configLen > 0) {
×
3170
    config.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
3171
    config.nd.datum.p = taosStrndupi(pJob->config, strlen(pJob->config) + 1);
×
3172
    config.shouldFree = true;
×
3173
    code = taosHashPut(pMap, "config", strlen("config") + 1, &config, sizeof(SXndRefValueNode));
×
3174
  } else {
3175
    config.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
3176
    config.nd.datum.p = NULL;
×
3177
    config.nd.isNull = true;
×
3178
    code = taosHashPut(pMap, "config", strlen("config") + 1, &config, sizeof(SXndRefValueNode));
×
3179
  }
3180
  if (code != 0) {
×
3181
    taosHashCleanup(pMap);
×
3182
    return NULL;
×
3183
  }
3184
  // status
3185
  SXndRefValueNode status = {0};
×
3186
  status.nd.node.type = QUERY_NODE_VALUE;
×
3187
  if (pJob->statusLen > 0) {
×
3188
    status.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
3189
    status.nd.datum.p = taosStrndupi(pJob->status, strlen(pJob->status) + 1);
×
3190
    status.shouldFree = true;
×
3191
    code = taosHashPut(pMap, "status", strlen("status") + 1, &status, sizeof(SXndRefValueNode));
×
3192
  } else {
3193
    status.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
3194
    status.nd.datum.p = NULL;
×
3195
    status.nd.isNull = true;
×
3196
    code = taosHashPut(pMap, "status", strlen("status") + 1, &status, sizeof(SXndRefValueNode));
×
3197
  }
3198
  if (code != 0) {
×
3199
    taosHashCleanup(pMap);
×
3200
    return NULL;
×
3201
  }
3202
  // reason
3203
  SXndRefValueNode reason = {0};
×
3204
  reason.nd.node.type = QUERY_NODE_VALUE;
×
3205
  if (pJob->reasonLen > 0) {
×
3206
    reason.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
3207
    reason.nd.datum.p = taosStrndupi(pJob->reason, strlen(pJob->reason) + 1);
×
3208
    reason.shouldFree = true;
×
3209
    code = taosHashPut(pMap, "reason", strlen("reason") + 1, &reason, sizeof(SXndRefValueNode));
×
3210
  } else {
3211
    reason.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
3212
    reason.nd.datum.p = NULL;
×
3213
    reason.nd.isNull = true;
×
3214
    code = taosHashPut(pMap, "reason", strlen("reason") + 1, &reason, sizeof(SXndRefValueNode));
×
3215
  }
3216
  if (code != 0) {
×
3217
    taosHashCleanup(pMap);
×
3218
    return NULL;
×
3219
  }
3220
  // create time
3221
  SXndRefValueNode createTime = {0};
×
3222
  createTime.nd.node.type = QUERY_NODE_VALUE;
×
3223
  createTime.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
3224
  createTime.nd.datum.p = taosMemoryCalloc(1, TD_TIME_STR_LEN);
×
3225
  createTime.nd.datum.p = formatTimestampLocal(createTime.nd.datum.p, pJob->createTime, TSDB_TIME_PRECISION_MILLI);
×
3226
  createTime.shouldFree = true;
×
3227
  code = taosHashPut(pMap, "create_time", strlen("create_time") + 1, &createTime, sizeof(SXndRefValueNode));
×
3228
  if (code != 0) {
×
3229
    taosHashCleanup(pMap);
×
3230
    return NULL;
×
3231
  }
3232
  // update time
3233
  SXndRefValueNode updateTime = {0};
×
3234
  updateTime.nd.node.type = QUERY_NODE_VALUE;
×
3235
  updateTime.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
3236
  updateTime.nd.datum.p = taosMemoryCalloc(1, TD_TIME_STR_LEN);
×
3237
  updateTime.nd.datum.p = formatTimestampLocal(updateTime.nd.datum.p, pJob->updateTime, TSDB_TIME_PRECISION_MILLI);
×
3238
  updateTime.shouldFree = true;
×
3239
  code = taosHashPut(pMap, "update_time", strlen("update_time") + 1, &updateTime, sizeof(SXndRefValueNode));
×
3240
  return pMap;
×
3241
}
3242

3243
/** Signature shared by all binary comparison operators generated below. */
typedef bool (*FOpCmp)(SValueNode *pval1, SValueNode *pval2);

/**
 * Define a static comparison function NAME(pval1, pval2) applying OP.
 *
 * The operand representation is selected from pval1's result type only:
 * BOOL -> datum.b, UBIGINT -> datum.u, BIGINT -> datum.i, FLOAT -> datum.d.
 * BINARY compares via strcmp when both pointers are non-NULL, and otherwise
 * compares the raw pointers. Any other type yields false.
 */
#define XNODE_DEF_OP_FUNC(NAME, OP)                                 \
  static bool NAME(SValueNode *pval1, SValueNode *pval2) {          \
    bool matched = false;                                           \
    switch (pval1->node.resType.type) {                             \
      case TSDB_DATA_TYPE_BOOL:                                     \
        matched = pval1->datum.b OP pval2->datum.b;                 \
        break;                                                      \
      case TSDB_DATA_TYPE_UBIGINT:                                  \
        matched = pval1->datum.u OP pval2->datum.u;                 \
        break;                                                      \
      case TSDB_DATA_TYPE_BIGINT:                                   \
        matched = pval1->datum.i OP pval2->datum.i;                 \
        break;                                                      \
      case TSDB_DATA_TYPE_FLOAT:                                    \
        matched = pval1->datum.d OP pval2->datum.d;                 \
        break;                                                      \
      case TSDB_DATA_TYPE_BINARY:                                   \
        if (pval1->datum.p != NULL && pval2->datum.p != NULL) {     \
          matched = strcmp(pval1->datum.p, pval2->datum.p) OP 0;    \
        } else {                                                    \
          matched = pval1->datum.p OP pval2->datum.p;               \
        }                                                           \
        break;                                                      \
      default:                                                      \
        break;                                                      \
    }                                                               \
    return matched;                                                 \
  }

/* One comparison function per supported operator. */
XNODE_DEF_OP_FUNC(op_greater_than, >)
XNODE_DEF_OP_FUNC(op_greater_equal, >=)
XNODE_DEF_OP_FUNC(op_lower_than, <)
XNODE_DEF_OP_FUNC(op_lower_equal, <=)
XNODE_DEF_OP_FUNC(op_equal, ==)
XNODE_DEF_OP_FUNC(op_not_equal, !=)
×
3273

3274
/**
 * Pops the two topmost operands from the evaluation stack, compares them with opFn
 * and pushes the boolean outcome back as a BOOL value node.
 *
 * @param pctx  where-clause evaluation context that owns the operand stack
 * @param opFn  comparator to apply (one of the XNODE_DEF_OP_FUNC helpers)
 * @return 0 on success; TSDB_CODE_MND_XNODE_INVALID_MSG when fewer than two operands
 *         could be popped; TSDB_CODE_MND_XNODE_WHERE_COL_TYPE_DIFF when the operands'
 *         node types differ; terrno when pushing the result fails.
 */
static int32_t call_op_cmp(SXndWhereContext *pctx, FOpCmp opFn) {
  int32_t     code = 0;
  // The right-hand operand was pushed last, so it comes off the stack first.
  SValueNode *pval2 = (SValueNode *)taosArrayPop(pctx->stack);
  SValueNode *pval1 = (SValueNode *)taosArrayPop(pctx->stack);
  bool        ret = false;

  // A pop can come back empty (the cleanup below already allows for NULL), so never
  // dereference the operands before checking them.
  if (pval1 == NULL || pval2 == NULL) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    mError("xnode compare operator needs two operands on the stack");
    goto _OVER;
  }

  if (pval1->node.type != pval2->node.type) {
    code = TSDB_CODE_MND_XNODE_WHERE_COL_TYPE_DIFF;
    mError("xnode type not same, v1 type: %d, v2 type: %d", pval1->node.type, pval2->node.type);
    goto _OVER;
  } else {
    mDebug("xnode type v1:%d, is null: %d, UB: %" PRIu64 ", v2 type: %d, is null: %d, UB: %" PRIu64, pval1->node.type,
           pval1->isNull, pval1->datum.u, pval2->node.type, pval2->isNull, pval2->datum.u);

    ret = (*opFn)(pval1, pval2);
  }

  // Wrap the outcome in a BOOL value node; zero-init leaves shouldFree unset.
  SXndRefValueNode pval = {0};
  pval.nd.node.type = QUERY_NODE_VALUE;
  pval.nd.node.resType.type = TSDB_DATA_TYPE_BOOL;
  pval.nd.datum.b = ret;
  if (NULL == taosArrayPush(pctx->stack, &pval)) {
    mError("xnode evaluate walker array push error: %s", tstrerror(terrno));
    code = terrno;
    goto _OVER;
  }

_OVER:
  if (pval1 != NULL) freeSXndRefValueNode((SXndRefValueNode *)pval1);
  if (pval2 != NULL) freeSXndRefValueNode((SXndRefValueNode *)pval2);
  TAOS_RETURN(code);
}
3305

3306
/*
 * Evaluates CMD, stores the result in pctx->code and jumps to LABEL when it is not
 * TSDB_CODE_SUCCESS. Requires a variable named `pctx` with a `code` member in the
 * enclosing scope. The expansion deliberately carries no trailing semicolon: the
 * caller writes it, which keeps the macro usable as the body of an if/else.
 */
#define XND_WALKER_CHECK_GOTO(CMD, LABEL)  \
  do {                                     \
    pctx->code = (CMD);                    \
    if (pctx->code != TSDB_CODE_SUCCESS) { \
      goto LABEL;                          \
    }                                      \
  } while (0)
3313

3314
/**
 * Post-order walker callback that evaluates a where-expression on an operand stack.
 *
 * - COLUMN nodes push the value stored in pctx->pMap under the column name.
 * - VALUE nodes have their datum filled in from the literal and are pushed as a
 *   non-owning copy (shouldFree = false).
 * - OPERATOR nodes pop two operands, compare them and push a BOOL result.
 * - LOGIC_CONDITION nodes combine the BOOL results on top of the stack.
 *
 * Any failure is recorded in pctx->code and stops the walk with DEAL_RES_END.
 *
 * @param pNode      node currently visited
 * @param pWhereCtx  SXndWhereContext carrying the stack, the column map and the code
 * @return DEAL_RES_CONTINUE while evaluation may proceed, DEAL_RES_END on error or
 *         on a node type this evaluator does not handle.
 */
static EDealRes evaluateWaker(SNode *pNode, void *pWhereCtx) {
  SXndWhereContext *pctx = (SXndWhereContext *)pWhereCtx;

  if (nodeType(pNode) == QUERY_NODE_COLUMN) {
    SColumnNode      *colNode = (SColumnNode *)pNode;
    // Keys carry their terminating NUL, hence the +1 on the key length.
    SXndRefValueNode *pval =
        (SXndRefValueNode *)taosHashGet(pctx->pMap, colNode->colName, strlen(colNode->colName) + 1);
    if (pval == NULL) {
      mError("xnode evaluateWhereCond hash get error: %s", tstrerror(terrno));
      pctx->code = TSDB_CODE_MND_XNODE_WHERE_COL_NOT_EXIST;
      return DEAL_RES_END;
    }
    if (NULL == taosArrayPush(pctx->stack, pval)) {
      mError("xnode evaluate walker array push error: %s", tstrerror(terrno));
      pctx->code = TSDB_CODE_FAILED;
      return DEAL_RES_END;
    }
    return DEAL_RES_CONTINUE;
  }

  if (nodeType(pNode) == QUERY_NODE_VALUE) {
    SValueNode *pval = (SValueNode *)pNode;
    // Materialize the datum from the literal text according to the declared type.
    if (pval->node.resType.type == TSDB_DATA_TYPE_UBIGINT) {
      pval->datum.u = taosStr2Int64(pval->literal, NULL, 10);
    }
    if (pval->node.resType.type == TSDB_DATA_TYPE_BINARY) {
      if (pval->datum.p == NULL) {
        pval->datum.p = taosStrndupi(pval->literal, strlen(pval->literal) + 1);
      }
    }
    if (pval->node.resType.type == TSDB_DATA_TYPE_NULL) {
      pval->isNull = true;
      pval->datum.p = NULL;
    }
    if (pval->node.resType.type == TSDB_DATA_TYPE_BOOL) {
      pval->datum.b = pval->literal[0] == 't' || pval->literal[0] == 'T';
    }
    // The stack entry is a shallow copy; the AST node keeps ownership of its payload.
    SXndRefValueNode refVal = {0};
    refVal.nd = *pval;
    refVal.shouldFree = false;
    if (NULL == taosArrayPush(pctx->stack, &refVal)) {
      mError("xnode evaluate walker array push error: %s", tstrerror(terrno));
      pctx->code = TSDB_CODE_FAILED;
      return DEAL_RES_END;
    }
    return DEAL_RES_CONTINUE;
  }

  if (nodeType(pNode) == QUERY_NODE_OPERATOR) {
    SOperatorNode *opNode = (SOperatorNode *)pNode;
    switch (opNode->opType) {
      case OP_TYPE_GREATER_THAN:
        pctx->code = call_op_cmp(pctx, op_greater_than);
        break;
      case OP_TYPE_GREATER_EQUAL:
        pctx->code = call_op_cmp(pctx, op_greater_equal);
        break;
      case OP_TYPE_LOWER_THAN:
        pctx->code = call_op_cmp(pctx, op_lower_than);
        break;
      case OP_TYPE_LOWER_EQUAL:
        pctx->code = call_op_cmp(pctx, op_lower_equal);
        break;
      case OP_TYPE_EQUAL:
        pctx->code = call_op_cmp(pctx, op_equal);
        break;
      case OP_TYPE_NOT_EQUAL:
        pctx->code = call_op_cmp(pctx, op_not_equal);
        break;
      default:
        // Stop right here: continuing would leave the stack short of a result and
        // would let a later successful compare overwrite this error code.
        pctx->code = TSDB_CODE_MND_XNODE_WHERE_OP_NOT_SUPPORT;
        return DEAL_RES_END;
    }
    return (pctx->code == TSDB_CODE_SUCCESS) ? DEAL_RES_CONTINUE : DEAL_RES_END;
  }

  if (nodeType(pNode) == QUERY_NODE_LOGIC_CONDITION) {
    SLogicConditionNode *logicNode = (SLogicConditionNode *)pNode;
    SXndRefValueNode     pval = {0};
    pval.nd.node.type = QUERY_NODE_VALUE;
    pval.nd.node.resType.type = TSDB_DATA_TYPE_BOOL;

    switch (logicNode->condType) {
      case LOGIC_COND_TYPE_AND:
      case LOGIC_COND_TYPE_OR: {
        // NOTE(review): exactly two operands are consumed; confirm the parser never
        // hands over an AND/OR condition with more than two parameters.
        const bool  isAnd = (logicNode->condType == LOGIC_COND_TYPE_AND);
        SValueNode *pval2 = (SValueNode *)taosArrayPop(pctx->stack);
        SValueNode *pval1 = (SValueNode *)taosArrayPop(pctx->stack);
        if (pval1 == NULL || pval2 == NULL) {
          mError("xnode walker %s needs two operands on the stack", isAnd ? "AND" : "OR");
          pctx->code = TSDB_CODE_MND_XNODE_INVALID_MSG;
          return DEAL_RES_END;
        }

        pval.nd.datum.b = isAnd ? (pval1->datum.b && pval2->datum.b) : (pval1->datum.b || pval2->datum.b);
        if (NULL == taosArrayPush(pctx->stack, &pval)) {
          mError("xnode walker %s array push err: %s", isAnd ? "AND" : "OR", tstrerror(terrno));
          pctx->code = TSDB_CODE_FAILED;
          return DEAL_RES_END;
        }

        freeSXndRefValueNode((SXndRefValueNode *)pval1);
        freeSXndRefValueNode((SXndRefValueNode *)pval2);
        break;
      }
      case LOGIC_COND_TYPE_NOT: {
        SValueNode *pval1 = (SValueNode *)taosArrayPop(pctx->stack);
        if (pval1 == NULL) {
          mError("xnode walker NOT needs one operand on the stack");
          pctx->code = TSDB_CODE_MND_XNODE_INVALID_MSG;
          return DEAL_RES_END;
        }

        pval.nd.datum.b = !pval1->datum.b;
        if (NULL == taosArrayPush(pctx->stack, &pval)) {
          mError("xnode walker NOT array push err: %s", tstrerror(terrno));
          pctx->code = TSDB_CODE_FAILED;
          return DEAL_RES_END;
        }

        freeSXndRefValueNode((SXndRefValueNode *)pval1);
        break;
      }
      default:
        // Other condition types leave the stack untouched.
        break;
    }
    return DEAL_RES_CONTINUE;
  }

  // Any other node type cannot be evaluated.
  pctx->code = TSDB_CODE_MND_XNODE_INVALID_MSG;
  return DEAL_RES_END;
}
3457

3458
/**
 * Evaluates the where-expression pWhere against one row given as a column map.
 *
 * @param pWhere    root of the condition expression
 * @param pDataMap  column name -> value map for the row; stored in the walker context
 * @param code      receives the failure code; it is not written on success
 * @return the boolean on top of the evaluation stack, or false on any failure
 */
static bool evaluateWhereCond(SNode *pWhere, SHashObj *pDataMap, int32_t *code) {
  SXndWhereContext ctx = {0};
  bool             matched = false;

  ctx.stack = taosArrayInit(64, sizeof(SXndRefValueNode));
  if (NULL == ctx.stack) {
    mError("xnode evaluateWhereCond error: %s", tstrerror(terrno));
    *code = terrno;
  } else {
    // The map is attached only once the stack exists.
    ctx.pMap = pDataMap;

    nodesWalkExprPostOrder(pWhere, evaluateWaker, &ctx);
    if (TSDB_CODE_SUCCESS != ctx.code) {
      *code = ctx.code;
      mError("xnode walkExpr error: %s", tstrerror(ctx.code));
    } else {
      // After a successful walk the final verdict sits on top of the stack.
      SValueNode *pTop = taosArrayGetLast(ctx.stack);
      if (NULL == pTop) {
        *code = terrno;
        mError("xnode evaluateWhereCond error: %s", tstrerror(terrno));
      } else {
        mDebug("xnode ctx stack size:%lu, last nd type:%d, bool:%d", ctx.stack->size, pTop->node.type, pTop->datum.b);
        matched = pTop->datum.b;
      }
    }
  }

  freeSXndWhereContext(&ctx);
  return matched;
}
3490

3491
/**
 * Collects into *ppResult the jobs from pArray for which pWhere evaluates to true.
 *
 * @param pWhere    condition expression applied to every job
 * @param pArray    input array of SXnodeJobObj elements
 * @param ppResult  receives a newly created array of SXnodeJobObj copies; it may be
 *                  non-NULL even when an error is returned, so the caller has to
 *                  destroy it in either case
 * @return 0 on success, otherwise the first error encountered
 */
static int32_t filterJobsByWhereCond(SNode *pWhere, SArray *pArray, SArray **ppResult) {
  int32_t code = 0;

  *ppResult = taosArrayInit(64, sizeof(SXnodeJobObj));
  if (*ppResult == NULL) {
    code = terrno;
    goto _exit;
  }
  for (int32_t i = 0; i < pArray->size; i++) {
    SXnodeJobObj *pJob = taosArrayGet(pArray, i);

    // NOTE(review): the map is not released in this function; presumably
    // freeSXndWhereContext() inside evaluateWhereCond disposes of it - confirm.
    SHashObj *pDataMap = convertJob2Map(pJob);
    if (pDataMap == NULL) {
      // Report the failure instead of returning success with a partial result.
      code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_FAILED;
      mError("xnode evaluate convertJow2Map error: %s", tstrerror(code));
      goto _exit;
    }
    if (evaluateWhereCond(pWhere, pDataMap, &code)) {
      if (NULL == taosArrayPush(*ppResult, pJob)) {
        mError("xnode filterJobsByWhereCond array push err: %s", tstrerror(terrno));
        code = TSDB_CODE_FAILED;
        goto _exit;
      }
    }
    // evaluateWhereCond reports evaluation errors through code while returning false.
    if (code != TSDB_CODE_SUCCESS) {
      goto _exit;
    }
  }

_exit:
  TAOS_RETURN(code);
}
3522

3523
/*
 * Logs an error naming the current function when `code` is neither
 * TSDB_CODE_SUCCESS nor TSDB_CODE_ACTION_IN_PROGRESS. Both arguments are evaluated
 * exactly once and captured in locals, so expressions with side effects are safe.
 */
#define XND_LOG_END(code, lino)                                                                          \
  do {                                                                                                   \
    int32_t xndLogEndCode_ = (code);                                                                     \
    int32_t xndLogEndLino_ = (lino);                                                                     \
    if (xndLogEndCode_ != TSDB_CODE_SUCCESS && xndLogEndCode_ != TSDB_CODE_ACTION_IN_PROGRESS) {         \
      mError("xnode:%s failed at line %d code: %d, since %s", __func__, xndLogEndLino_, xndLogEndCode_,  \
             tstrerror(xndLogEndCode_));                                                                 \
    }                                                                                                    \
  } while (0)
3529

3530
/**
 * Posts the given jobs to the xnoded "/rebalance/auto" endpoint as a JSON array of
 * {"tid": <taskId>, "jid": <id>} objects. Failures are logged only; nothing is
 * returned to the caller.
 *
 * @param pResult  array of SXnodeJobObj elements to report
 */
void httpRebalanceAuto(SArray *pResult) {
  int32_t code = 0;
  int32_t lino = 0;
  SJson  *pJsonArr = NULL;
  SJson  *pJsonObj = NULL;  // entry being built; owned here until it is attached to pJsonArr
  char   *pContStr = NULL;

  // convert pResult to [(tid, jid)*]
  pJsonArr = tjsonCreateArray();
  if (pJsonArr == NULL) {
    code = terrno;
    mError("xnode json array error: %s", tstrerror(code));
    goto _OVER;
  }
  for (int32_t i = 0; i < pResult->size; i++) {
    SXnodeJobObj *pJob = taosArrayGet(pResult, i);
    pJsonObj = tjsonCreateObject();
    if (pJsonObj == NULL) {
      code = terrno;
      mError("xnode json object error: %s", tstrerror(code));
      goto _OVER;
    }
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJsonObj, "tid", pJob->taskId), &lino, _OVER);
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJsonObj, "jid", pJob->id), &lino, _OVER);
    TAOS_CHECK_GOTO(tjsonAddItemToArray(pJsonArr, pJsonObj), &lino, _OVER);
    pJsonObj = NULL;  // now owned by the array; must not be deleted separately
  }

  pContStr = tjsonToUnformattedString(pJsonArr);
  if (pContStr == NULL) {
    mError("xnode to json string error: %s", tstrerror(terrno));
    goto _OVER;
  }
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN + 1, "%s/rebalance/auto", XNODED_PIPE_SOCKET_URL);
  // The response body is not inspected; it is only released.
  SJson *pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, defaultTimeout, pContStr, strlen(pContStr));
  if (pJson) {
    tjsonDelete(pJson);
  }

_OVER:
  // An entry that never made it into the array would otherwise leak.
  if (pJsonObj != NULL) tjsonDelete(pJsonObj);
  if (pJsonArr != NULL) tjsonDelete(pJsonArr);
  if (pContStr != NULL) taosMemoryFree(pContStr);
  XND_LOG_END(code, lino);
  return;
}
3573

3574
/**
 * Handles a "rebalance xnode jobs where ..." request: checks permission, decodes the
 * request, loads all jobs and reports either the filtered subset (when the request
 * carries a condition) or every job to httpRebalanceAuto().
 *
 * @param pReq  incoming RPC message
 * @return 0 on success, otherwise the code of the first failing step
 */
static int32_t mndProcessRebalanceXnodeJobsWhereReq(SRpcMsg *pReq) {
  SMnode                      *pMnode = pReq->info.node;
  SMRebalanceXnodeJobsWhereReq rebalanceReq = {0};
  SArray                      *pAllJobs = NULL;
  SArray                      *pMatched = NULL;
  SNode                       *pCond = NULL;
  int32_t                      code = 0;

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_REBALANCE_XNODE_JOB);
  if (TSDB_CODE_SUCCESS != code) {
    mError("failed check permission for rebalance xnode jobs where, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMRebalanceXnodeJobsWhereReq(pReq->pCont, pReq->contLen, &rebalanceReq), NULL, _OVER);

  TAOS_CHECK_GOTO(mndAcquireXnodeJobsAll(pMnode, &pAllJobs), NULL, _OVER);
  if (rebalanceReq.ast.ptr == NULL) {
    // No condition supplied: every job takes part.
    httpRebalanceAuto(pAllJobs);
  } else {
    TAOS_CHECK_GOTO(nodesStringToNode(rebalanceReq.ast.ptr, &pCond), NULL, _OVER);

    TAOS_CHECK_GOTO(filterJobsByWhereCond(pCond, pAllJobs, &pMatched), NULL, _OVER);
    httpRebalanceAuto(pMatched);
  }

_OVER:
  if (NULL != pCond) {
    nodesDestroyNode(pCond);
  }
  if (NULL != pAllJobs) {
    taosArrayDestroy(pAllJobs);
  }
  if (NULL != pMatched) {
    taosArrayDestroy(pMatched);
  }
  tFreeSMRebalanceXnodeJobsWhereReq(&rebalanceReq);
  TAOS_RETURN(code);
}
3612

3613
/**
 * Drops the xnode job identified by jid.
 *
 * @param pMnode  mnode instance
 * @param pReq    originating request, forwarded to mndDropXnodeJob()
 * @param jid     job id to look up
 * @return terrno when the job cannot be acquired, otherwise the result of
 *         mndDropXnodeJob()
 */
static int32_t dropXnodeJobById(SMnode *pMnode, SRpcMsg *pReq, int32_t jid) {
  int32_t       lino = 0;
  int32_t       code = 0;
  SXnodeJobObj *pJob = mndAcquireXnodeJob(pMnode, jid);

  if (pJob != NULL) {
    code = mndDropXnodeJob(pMnode, pReq, pJob);
  } else {
    code = terrno;
    lino = __LINE__;
  }

  XND_LOG_END(code, lino);
  // Release is issued on both paths, including when nothing was acquired.
  mndReleaseXnodeJob(pMnode, pJob);
  return code;
}
3631

3632
/**
 * Drops every xnode job matching the condition carried in dropReq->ast. Without a
 * condition nothing is done and 0 is returned.
 *
 * @param pMnode   mnode instance
 * @param pReq     originating request; not forwarded - each drop is issued with a
 *                 NULL request
 * @param dropReq  decoded drop request holding the serialized condition
 * @return 0 on success, otherwise the code of the first failing step
 */
static int32_t dropXnodeJobByWhereCond(SMnode *pMnode, SRpcMsg *pReq, SMDropXnodeJobReq *dropReq) {
  int32_t lino = 0;
  int32_t code = 0;
  SArray *pAllJobs = NULL;
  SArray *pMatched = NULL;
  SNode  *pCond = NULL;

  if (dropReq->ast.ptr == NULL) {
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndAcquireXnodeJobsAll(pMnode, &pAllJobs), &lino, _OVER);
  TAOS_CHECK_GOTO(nodesStringToNode(dropReq->ast.ptr, &pCond), &lino, _OVER);
  TAOS_CHECK_GOTO(filterJobsByWhereCond(pCond, pAllJobs, &pMatched), &lino, _OVER);

  // Stop at the first job that fails to drop.
  for (int32_t idx = 0; idx < pMatched->size; idx++) {
    SXnodeJobObj *pJob = taosArrayGet(pMatched, idx);
    TAOS_CHECK_GOTO(mndDropXnodeJob(pMnode, NULL, pJob), &lino, _OVER);
  }

_OVER:
  XND_LOG_END(code, lino);
  if (NULL != pMatched) {
    taosArrayDestroy(pMatched);
  }
  if (NULL != pCond) {
    nodesDestroyNode(pCond);
  }
  if (NULL != pAllJobs) {
    taosArrayDestroy(pAllJobs);
  }
  return code;
}
3664

3665
/**
 * Handles a "drop xnode job" request. A positive jid drops that single job and, on
 * success, reports TSDB_CODE_ACTION_IN_PROGRESS; otherwise the jobs matching the
 * request's condition are dropped. A request with neither is rejected with
 * TSDB_CODE_MND_XNODE_INVALID_MSG.
 *
 * @param pReq  incoming RPC message
 * @return result code as described above, or the code of the first failing step
 */
static int32_t mndProcessDropXnodeJobReq(SRpcMsg *pReq) {
  mDebug("drop xnode job req, content len:%d", pReq->contLen);
  SMDropXnodeJobReq dropReq = {0};
  SMnode           *pMnode = pReq->info.node;
  int32_t           code = -1;

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_DROP_XNODE_JOB);
  if (TSDB_CODE_SUCCESS != code) {
    mError("failed check permission for drop xnode jobs, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMDropXnodeJobReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
  mDebug("Xnode drop job with jid:%d", dropReq.jid);

  if (dropReq.jid > 0) {
    code = dropXnodeJobById(pMnode, pReq, dropReq.jid);
    if (code == 0) {
      code = TSDB_CODE_ACTION_IN_PROGRESS;
    }
  } else if (dropReq.ast.ptr != NULL) {
    code = dropXnodeJobByWhereCond(pMnode, pReq, &dropReq);
  } else {
    // Neither a job id nor a condition: nothing to act on.
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode:%d, failed to drop since %s", dropReq.jid, tstrerror(code));
  }
  tFreeSMDropXnodeJobReq(&dropReq);
  TAOS_RETURN(code);
}
3701

3702
/**
3703
 * @brief Mapping the columns of show xnode jobs
3704
 *
3705
 * See [xnodeTaskJobSchema] in systable.h.
3706
 *
3707
 *  {.name = "jid", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
3708
    {.name = "tid", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
3709
    {.name = "config", .bytes = TSDB_XNODE_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo =
3710
 false},
3711
    {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
3712
    // {.name = "reason", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
3713
    {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
3714
    {.name = "update_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
3715
 * @param pReq
3716
 * @param pShow
3717
 * @param pBlock
3718
 * @param rows
3719
 * @return int32_t
3720
 */
3721
static int32_t mndRetrieveXnodeJobs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
×
3722
  SMnode       *pMnode = pReq->info.node;
×
3723
  SSdb         *pSdb = pMnode->pSdb;
×
3724
  int32_t       numOfRows = 0;
×
3725
  int32_t       cols = 0;
×
3726
  SXnodeJobObj *pObj = NULL;
×
3727
  char          buf[VARSTR_HEADER_SIZE + TMAX(TSDB_XNODE_TASK_JOB_CONFIG_LEN, TSDB_XNODE_TASK_REASON_LEN)];
×
3728
  char          status[64] = {0};
×
3729
  int32_t       code = 0;
×
3730
  mDebug("show.type:%d, %s:%d: retrieve xnode jobs with rows: %d", pShow->type, __FILE__, __LINE__, rows);
×
3731

3732
  while (numOfRows < rows) {
×
3733
    pShow->pIter = sdbFetch(pSdb, SDB_XNODE_JOB, pShow->pIter, (void **)&pObj);
×
3734
    if (pShow->pIter == NULL) break;
×
3735

3736
    cols = 0;
×
3737
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3738
    // id
3739
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
×
3740
    if (code != 0) goto _end;
×
3741
    // tid
3742
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3743
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->taskId, false);
×
3744
    if (code != 0) goto _end;
×
3745

3746
    // config
3747
    buf[0] = 0;
×
3748
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->config, pShow->pMeta->pSchemas[cols].bytes);
×
3749
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3750
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
×
3751
    if (code != 0) goto _end;
×
3752

3753
    // via
3754
    if (pObj->via != 0) {
×
3755
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3756
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->via, false);
×
3757
      if (code != 0) goto _end;
×
3758
    } else {
3759
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3760
      colDataSetNULL(pColInfo, numOfRows);
×
3761
    }
3762

3763
    // xnode_id
3764
    if (pObj->xnodeId != 0) {
×
3765
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3766
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->xnodeId, false);
×
3767
      if (code != 0) goto _end;
×
3768
    } else {
3769
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3770
      colDataSetNULL(pColInfo, numOfRows);
×
3771
    }
3772

3773
    // status
3774
    if (pObj->statusLen > 0) {
×
3775
      buf[0] = 0;
×
3776
      STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->status, pShow->pMeta->pSchemas[cols].bytes);
×
3777
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3778
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
3779
      if (code != 0) goto _end;
×
3780
    } else {
3781
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3782
      colDataSetNULL(pColInfo, numOfRows);
×
3783
    }
3784

3785
    // reason
3786
    if (pObj->reasonLen > 0) {
×
3787
      buf[0] = 0;
×
3788
      STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->reason, pShow->pMeta->pSchemas[cols].bytes);
×
3789
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3790
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
3791
      if (code != 0) goto _end;
×
3792
    } else {
3793
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3794
      colDataSetNULL(pColInfo, numOfRows);
×
3795
    }
3796

3797
    // create_time
3798
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3799
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createTime, false);
×
3800
    if (code != 0) goto _end;
×
3801

3802
    // update_time
3803
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3804
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->updateTime, false);
×
3805
    if (code != 0) goto _end;
×
3806

3807
    numOfRows++;
×
3808
    sdbRelease(pSdb, pObj);
×
3809
  }
3810

3811
_end:
×
3812
  if (code != 0) sdbRelease(pSdb, pObj);
×
3813

3814
  pShow->numOfRows += numOfRows;
×
3815
  return numOfRows;
×
3816
}
3817
/* Cancels an in-flight SDB_XNODE_JOB fetch identified by pIter on this mnode's sdb. */
static void mndCancelGetNextXnodeJob(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_XNODE_JOB);
}
×
3821

3822
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
×
3823
  SCurlResp *pRsp = userdata;
×
3824
  if (contLen == 0 || nmemb == 0 || pCont == NULL) {
×
3825
    pRsp->dataLen = 0;
×
3826
    pRsp->data = NULL;
×
3827
    uError("curl response is received, len:%" PRId64, pRsp->dataLen);
×
3828
    return 0;
×
3829
  }
3830

3831
  int64_t newDataSize = (int64_t)contLen * nmemb;
×
3832
  int64_t size = pRsp->dataLen + newDataSize;
×
3833

3834
  if (pRsp->data == NULL) {
×
3835
    pRsp->data = taosMemoryMalloc(size + 1);
×
3836
    if (pRsp->data == NULL) {
×
3837
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
3838
      return 0;  // return the recv length, if failed, return 0
×
3839
    }
3840
  } else {
3841
    char *p = taosMemoryRealloc(pRsp->data, size + 1);
×
3842
    if (p == NULL) {
×
3843
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
3844
      return 0;  // return the recv length, if failed, return 0
×
3845
    }
3846

3847
    pRsp->data = p;
×
3848
  }
3849

3850
  if (pRsp->data != NULL) {
×
3851
    (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize);
×
3852

3853
    pRsp->dataLen = size;
×
3854
    pRsp->data[size] = 0;
×
3855

3856
    uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data);
×
3857
    return newDataSize;
×
3858
  } else {
3859
    pRsp->dataLen = 0;
×
3860
    uError("failed to malloc curl response");
×
3861
    return 0;
×
3862
  }
3863
}
3864

3865
#ifndef WINDOWS
3866
/**
 * Performs an HTTP GET over a unix domain socket and collects the body in pRsp.
 *
 * @param url         request URL
 * @param pRsp        response accumulator filled by taosCurlWriteData
 * @param timeout     total timeout in milliseconds
 * @param socketPath  path of the unix socket to connect through
 * @return 0 on HTTP 200; -1 when no curl handle could be created;
 *         TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT on timeout;
 *         TSDB_CODE_MND_XNODE_URL_CANT_ACCESS on any other transfer failure;
 *         TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR on a non-200 status. A failing
 *         setopt/getinfo call yields whatever TAOS_CHECK_GOTO stored in code.
 */
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) {
  CURL   *curl = NULL;
  int32_t code = 0;
  int32_t lino = 0;

  curl = curl_easy_init();
  if (curl == NULL) {
    uError("failed to create curl handle");
    return -1;
  }

  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_URL, url), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp), &lino, _OVER);
  // curl_easy_setopt is variadic and reads this option as a long; pass one explicitly.
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, (long)timeout), &lino, _OVER);

  uDebug("curl get request will sent, url:%s", url);
  CURLcode curlCode = curl_easy_perform(curl);
  if (curlCode != CURLE_OK) {
    if (curlCode == CURLE_OPERATION_TIMEDOUT) {
      mError("xnode failed to perform curl action, code:%d", curlCode);
      code = TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT;
      goto _OVER;
    }
    uError("failed to perform curl action, code:%d", curlCode);
    code = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS;
    goto _OVER;
  }

  long http_code = 0;
  TAOS_CHECK_GOTO(curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code), &lino, _OVER);
  if (http_code != 200) {
    code = TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR;
  }

_OVER:
  if (curl != NULL) curl_easy_cleanup(curl);
  XND_LOG_END(code, lino);
  return code;
}
3907

3908
/**
 * Performs an HTTP POST with a JSON body over a unix domain socket and collects the
 * response body in pRsp.
 *
 * @param url         request URL
 * @param pRsp        response accumulator filled by taosCurlWriteData
 * @param buf         request body
 * @param bufLen      length of the request body in bytes
 * @param timeout     total timeout in milliseconds
 * @param socketPath  path of the unix socket to connect through
 * @return 0 on HTTP 200; -1 when no curl handle could be created;
 *         TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT on timeout;
 *         TSDB_CODE_MND_XNODE_URL_CANT_ACCESS on any other transfer failure;
 *         TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR on a non-200 status. A failing
 *         setopt/getinfo call yields whatever TAOS_CHECK_GOTO stored in code.
 */
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout,
                                   const char *socketPath) {
  struct curl_slist *headers = NULL;
  CURL              *curl = NULL;
  int32_t            code = 0;
  int32_t            lino = 0;

  curl = curl_easy_init();
  if (curl == NULL) {
    mError("xnode failed to create curl handle");
    return -1;
  }

  headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_URL, url), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp), &lino, _OVER);
  // curl_easy_setopt is variadic: numeric options are read as long, so every integer
  // argument below is passed as a long explicitly.
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, (long)timeout), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POST, 1L), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)bufLen), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_VERBOSE, 0L), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L), &lino, _OVER);

  mDebug("xnode curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf);
  CURLcode curlCode = curl_easy_perform(curl);

  if (curlCode != CURLE_OK) {
    if (curlCode == CURLE_OPERATION_TIMEDOUT) {
      mError("xnode failed to perform curl action, code:%d", curlCode);
      code = TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT;
      goto _OVER;
    }
    uError("xnode failed to perform curl action, code:%d", curlCode);
    code = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS;
    goto _OVER;
  }

  long http_code = 0;
  TAOS_CHECK_GOTO(curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code), &lino, _OVER);
  if (http_code != 200) {
    mError("xnode failed to perform curl action, http code:%ld", http_code);
    code = TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR;
  }

_OVER:
  if (curl != NULL) {
    // The header list has to outlive the transfer, so it is released only here.
    curl_slist_free_all(headers);
    curl_easy_cleanup(curl);
  }
  XND_LOG_END(code, lino);
  return code;
}
3963

3964
/**
 * Performs an HTTP DELETE over a unix domain socket and collects the body in pRsp.
 *
 * @param url         request URL
 * @param pRsp        response accumulator filled by taosCurlWriteData
 * @param timeout     total timeout in milliseconds
 * @param socketPath  path of the unix socket to connect through
 * @return 0 on HTTP 200 or 204; -1 when no curl handle could be created;
 *         TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT on timeout;
 *         TSDB_CODE_MND_XNODE_URL_CANT_ACCESS on any other transfer failure;
 *         TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR on any other status. A failing
 *         setopt/getinfo call yields whatever TAOS_CHECK_GOTO stored in code.
 */
static int32_t taosCurlDeleteRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) {
  CURL   *curl = NULL;
  int32_t code = 0;
  int32_t lino = 0;

  curl = curl_easy_init();
  if (curl == NULL) {
    uError("xnode failed to create curl handle");
    return -1;
  }

  // Option failures go through TAOS_CHECK_GOTO, as in the GET/POST helpers, so that a
  // request that was never sent cannot be reported as a success.
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_URL, url), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData), &lino, _OVER);
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp), &lino, _OVER);
  // curl_easy_setopt is variadic and reads this option as a long; pass one explicitly.
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, (long)timeout), &lino, _OVER);

  uDebug("xnode curl delete request will sent, url:%s", url);
  CURLcode curlCode = curl_easy_perform(curl);
  if (curlCode != CURLE_OK) {
    uError("xnode failed to perform curl action, curl code:%d", curlCode);
    if (curlCode == CURLE_OPERATION_TIMEDOUT) {
      code = TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT;
      goto _OVER;
    }
    code = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS;
    goto _OVER;
  }

  long http_code = 0;
  TAOS_CHECK_GOTO(curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code), &lino, _OVER);
  if (http_code != 200 && http_code != 204) {
    uError("xnode curl request response http code:%ld", http_code);
    code = TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR;
  }

_OVER:
  if (curl != NULL) curl_easy_cleanup(curl);
  XND_LOG_END(code, lino);
  return code;
}
4006
#else
4007
/*
 * WINDOWS build: the curl-backed helpers are compiled out, so each request function
 * is a stub that performs no transfer and returns 0.
 */
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) {
  return 0;
}

static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout,
                                   const char *socketPath) {
  return 0;
}

static int32_t taosCurlDeleteRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) {
  return 0;
}
4013
#endif
4014
/**
 * Send an HTTP request to xnoded over its unix socket and return the reply as JSON.
 *
 * Outcome is reported through the return value plus `terrno`:
 *  - success with a body: the parsed body;
 *  - success with an empty body: an empty JSON object;
 *  - TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR: an object whose "__inner_error"
 *    string holds the raw response body (terrno keeps the error code);
 *  - any other failure: NULL with terrno set.
 *
 * @param url     request URL
 * @param type    HTTP_TYPE_GET, HTTP_TYPE_POST or HTTP_TYPE_DELETE
 * @param timeout request timeout in milliseconds
 * @param buf     POST body (only used for HTTP_TYPE_POST)
 * @param bufLen  length of `buf`
 * @return JSON object owned by the caller (release with tjsonDelete), or NULL.
 */
SJson *mndSendReqRetJson(const char *url, EHttpType type, int64_t timeout, const char *buf, int64_t bufLen) {
  SJson    *pJson = NULL;
  SCurlResp curlRsp = {0};
  char      socketPath[PATH_MAX] = {0};

  getXnodedPipeName(socketPath, sizeof(socketPath));
  if (!taosCheckExistFile(socketPath)) {
    uError("xnode failed to send request, socket path:%s not exist", socketPath);
    terrno = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS;
    goto _EXIT;
  }
  if (type == HTTP_TYPE_GET) {
    if ((terrno = taosCurlGetRequest(url, &curlRsp, timeout, socketPath)) != 0) {
      goto _OVER;
    }
  } else if (type == HTTP_TYPE_POST) {
    if ((terrno = taosCurlPostRequest(url, &curlRsp, buf, bufLen, timeout, socketPath)) != 0) {
      goto _OVER;
    }
  } else if (type == HTTP_TYPE_DELETE) {
    if ((terrno = taosCurlDeleteRequest(url, &curlRsp, timeout, socketPath)) != 0) {
      goto _OVER;
    }
  } else {
    uError("xnode invalid http type:%d", type);
    terrno = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _EXIT;
  }

_OVER:
  if (terrno == TSDB_CODE_SUCCESS) {
    if (curlRsp.data == NULL || curlRsp.dataLen == 0) {
      pJson = tjsonCreateObject();
      goto _EXIT;
    }
    pJson = tjsonParse(curlRsp.data);
    if (pJson == NULL) {
      terrno = TSDB_CODE_INVALID_JSON_FORMAT;
      goto _EXIT;
    }
  } else if (terrno == TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR) {
    // Wrap the raw error body so the caller can surface it.
    pJson = tjsonCreateObject();
    if (pJson == NULL) goto _EXIT;

    // +1 keeps the copy NUL-terminated (buffer is zero-filled).
    char *errBody = taosMemCalloc(1, curlRsp.dataLen + 1);
    if (errBody == NULL) {
      tjsonDelete(pJson);
      pJson = NULL;
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _EXIT;
    }
    if (curlRsp.data != NULL && curlRsp.dataLen > 0) {
      (void)memcpy(errBody, curlRsp.data, curlRsp.dataLen);
    }
    if (tjsonAddStringToObject(pJson, "__inner_error", errBody) != TSDB_CODE_SUCCESS) {
      uError("xnode failed to attach http error body to response, url:%s", url);
    }
    taosMemoryFreeClear(errBody);
  }

_EXIT:
  if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data);
  if (terrno != TSDB_CODE_SUCCESS) {
    mError("xnode failed to send request, url: %s, since:%s", url, tstrerror(terrno));
  }
  return pJson;
}
4072

4073
/**
 * Query xnoded for the status string of one xnode.
 *
 * Performs a GET on "<XNODED_PIPE_SOCKET_URL>/xnode/<id>" and copies the
 * "status" member of the JSON reply into `status`.
 *
 * @param pObj      xnode whose id is queried
 * @param status    output buffer for the status string
 * @param statusLen capacity of `status`
 * @return 0 on success; terrno when no reply is obtained;
 *         TSDB_CODE_INVALID_JSON_FORMAT when "status" cannot be read;
 *         TSDB_CODE_MND_XNODE_INVALID_MSG when the status is empty.
 */
static int32_t mndGetXnodeStatus(SXnodeObj *pObj, char *status, int32_t statusLen) {
  char    url[TSDB_XNODE_URL_LEN + 1] = {0};
  SJson  *pReply = NULL;
  int32_t ret = 0;

  snprintf(url, TSDB_XNODE_URL_LEN + 1, "%s/xnode/%d", XNODED_PIPE_SOCKET_URL, pObj->id);

  pReply = mndSendReqRetJson(url, HTTP_TYPE_GET, defaultTimeout, NULL, 0);
  if (pReply == NULL) {
    ret = terrno;
    TAOS_RETURN(ret);
  }

  ret = tjsonGetStringValue2(pReply, "status", status, statusLen);
  if (ret < 0) {
    ret = TSDB_CODE_INVALID_JSON_FORMAT;
  } else if (status[0] == '\0') {
    ret = TSDB_CODE_MND_XNODE_INVALID_MSG;
  }

  tjsonDelete(pReply);
  TAOS_RETURN(ret);
}
4099

4100
/** xnode agent section **/
4101

4102
/**
 * Serialize an xnode agent object into an SDB raw record.
 *
 * Field order: id, nameLen + name, tokenLen + token, statusLen + status,
 * createTime, updateTime, then TSDB_XNODE_RESERVE_SIZE reserved bytes.
 *
 * @param pObj agent to encode
 * @return newly allocated raw record, or NULL with terrno set.
 */
SSdbRaw *mndXnodeAgentActionEncode(SXnodeAgentObj *pObj) {
  int32_t code = 0;
  int32_t lino = 0;

  if (pObj == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  // Stays set until every field has been written.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t rawDataLen =
      sizeof(SXnodeAgentObj) + TSDB_XNODE_RESERVE_SIZE + pObj->nameLen + pObj->tokenLen + pObj->statusLen;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE_AGENT, TSDB_XNODE_VER_NUMBER, rawDataLen);
  if (pRaw == NULL) goto _OVER;

  int32_t pos = 0;
  SDB_SET_INT32(pRaw, pos, pObj->id, _OVER)

  SDB_SET_INT32(pRaw, pos, pObj->nameLen, _OVER)
  SDB_SET_BINARY(pRaw, pos, pObj->name, pObj->nameLen, _OVER)

  SDB_SET_INT32(pRaw, pos, pObj->tokenLen, _OVER)
  SDB_SET_BINARY(pRaw, pos, pObj->token, pObj->tokenLen, _OVER)

  SDB_SET_INT32(pRaw, pos, pObj->statusLen, _OVER)
  SDB_SET_BINARY(pRaw, pos, pObj->status, pObj->statusLen, _OVER)

  SDB_SET_INT64(pRaw, pos, pObj->createTime, _OVER)
  SDB_SET_INT64(pRaw, pos, pObj->updateTime, _OVER)

  SDB_SET_RESERVE(pRaw, pos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("xnode agent:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRaw;
  }

  mError("xnode agent:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
4143

4144
/**
 * Rebuild an xnode agent row from an SDB raw record.
 *
 * Reads the fields in the order written by mndXnodeAgentActionEncode.
 * Variable-length members (name, token, status) are heap allocated only
 * when their stored length is positive; otherwise they are set to NULL.
 *
 * @param pRaw raw record to decode
 * @return newly allocated row, or NULL with terrno set.
 */
SSdbRow *mndXnodeAgentActionDecode(SSdbRaw *pRaw) {
  int32_t         code = 0;
  int32_t         lino = 0;
  SSdbRow        *pRow = NULL;
  SXnodeAgentObj *pObj = NULL;

  if (pRaw == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  // Stays set until the whole record has been read.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != TSDB_XNODE_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SXnodeAgentObj));
  if (pRow == NULL) goto _OVER;

  pObj = sdbGetRowObj(pRow);
  if (pObj == NULL) goto _OVER;

  int32_t pos = 0;
  SDB_GET_INT32(pRaw, pos, &pObj->id, _OVER)

  SDB_GET_INT32(pRaw, pos, &pObj->nameLen, _OVER)
  if (pObj->nameLen <= 0) {
    pObj->name = NULL;
  } else {
    pObj->name = taosMemoryCalloc(pObj->nameLen, 1);
    if (pObj->name == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, pos, pObj->name, pObj->nameLen, _OVER)
  }

  SDB_GET_INT32(pRaw, pos, &pObj->tokenLen, _OVER)
  if (pObj->tokenLen <= 0) {
    pObj->token = NULL;
  } else {
    pObj->token = taosMemoryCalloc(pObj->tokenLen, 1);
    if (pObj->token == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, pos, pObj->token, pObj->tokenLen, _OVER)
  }

  SDB_GET_INT32(pRaw, pos, &pObj->statusLen, _OVER)
  if (pObj->statusLen <= 0) {
    pObj->status = NULL;
  } else {
    pObj->status = taosMemoryCalloc(pObj->statusLen, 1);
    if (pObj->status == NULL) goto _OVER;
    SDB_GET_BINARY(pRaw, pos, pObj->status, pObj->statusLen, _OVER)
  }

  SDB_GET_INT64(pRaw, pos, &pObj->createTime, _OVER)
  SDB_GET_INT64(pRaw, pos, &pObj->updateTime, _OVER)

  SDB_GET_RESERVE(pRaw, pos, TSDB_XNODE_RESERVE_SIZE, _OVER)

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("xnode agent:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
    return pRow;
  }

  mError("xnode agent:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
  if (pObj != NULL) {
    taosMemoryFreeClear(pObj->name);
    taosMemoryFreeClear(pObj->token);
    taosMemoryFreeClear(pObj->status);
  }
  taosMemoryFreeClear(pRow);
  return NULL;
}
4218

4219
/**
 * SDB insert action for xnode agents: only logs the event.
 *
 * @param pSdb unused
 * @param pObj inserted row
 * @return always 0.
 */
int32_t mndXnodeAgentActionInsert(SSdb *pSdb, SXnodeAgentObj *pObj) {
  (void)pSdb;
  mDebug("xnode agent:%d, perform insert action, row:%p", pObj->id, pObj);
  return 0;
}
4223

4224
/**
 * SDB update action for xnode agents.
 *
 * Under the old row's write latch, exchanges the name, token and status
 * (length and pointer) between the two rows via swapFields, and advances
 * the old row's updateTime when the new row carries a later one.
 *
 * @param pSdb unused
 * @param pOld row stored in the SDB
 * @param pNew row carrying the new values
 * @return always 0.
 */
int32_t mndXnodeAgentActionUpdate(SSdb *pSdb, SXnodeAgentObj *pOld, SXnodeAgentObj *pNew) {
  (void)pSdb;
  mDebug("xnode agent:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);

  taosWLockLatch(&pOld->lock);

  swapFields(&pNew->nameLen, &pNew->name, &pOld->nameLen, &pOld->name);
  swapFields(&pNew->tokenLen, &pNew->token, &pOld->tokenLen, &pOld->token);
  swapFields(&pNew->statusLen, &pNew->status, &pOld->statusLen, &pOld->status);

  // updateTime never moves backwards.
  if (pOld->updateTime < pNew->updateTime) {
    pOld->updateTime = pNew->updateTime;
  }

  taosWUnLockLatch(&pOld->lock);
  return 0;
}
4237

4238
/**
 * Release the heap members (name, token, status) of an agent object and
 * reset them to NULL. The object itself is not freed.
 *
 * @param pObj agent to clean up; NULL is ignored.
 */
static void mndFreeXnodeAgent(SXnodeAgentObj *pObj) {
  if (pObj == NULL) {
    return;
  }

  // taosMemoryFreeClear is already applied to possibly-NULL members elsewhere
  // in this file, so no per-member NULL test is needed.
  taosMemoryFreeClear(pObj->name);
  taosMemoryFreeClear(pObj->token);
  taosMemoryFreeClear(pObj->status);
}
4250

4251
/**
 * SDB delete action for xnode agents: logs the event and frees the row's
 * heap members.
 *
 * @param pSdb unused
 * @param pObj row being deleted
 * @return always 0.
 */
int32_t mndXnodeAgentActionDelete(SSdb *pSdb, SXnodeAgentObj *pObj) {
  (void)pSdb;
  mDebug("xnode agent:%d, perform delete action, row:%p", pObj->id, pObj);

  mndFreeXnodeAgent(pObj);

  return 0;
}
4256

4257
/**
 * Encode the agent and append it to the transaction's redo log with status
 * SDB_STATUS_CREATING.
 *
 * @param pTrans transaction receiving the log entry
 * @param pObj   agent to encode
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndSetCreateXnodeAgentRedoLogs(STrans *pTrans, SXnodeAgentObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pRedoRaw = mndXnodeAgentActionEncode(pObj);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendRedolog(pTrans, pRedoRaw);
  if (code != TSDB_CODE_SUCCESS) {
    // The raw was not appended, so it is released here to avoid a leak.
    sdbFreeRaw(pRedoRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
  TAOS_RETURN(code);
}
4269

4270
/**
 * Encode the agent and append it to the transaction's undo log with status
 * SDB_STATUS_DROPPED.
 *
 * @param pTrans transaction receiving the log entry
 * @param pObj   agent to encode
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndSetCreateXnodeAgentUndoLogs(STrans *pTrans, SXnodeAgentObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pUndoRaw = mndXnodeAgentActionEncode(pObj);
  if (pUndoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendUndolog(pTrans, pUndoRaw);
  if (code != TSDB_CODE_SUCCESS) {
    // The raw was not appended, so it is released here to avoid a leak.
    sdbFreeRaw(pUndoRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
  TAOS_RETURN(code);
}
4282

4283
/**
 * Encode the agent and append it to the transaction's commit log with status
 * SDB_STATUS_READY.
 *
 * @param pTrans transaction receiving the log entry
 * @param pObj   agent to encode
 * @return 0 on success, otherwise an error code (also stored in terrno).
 */
static int32_t mndSetCreateXnodeAgentCommitLogs(STrans *pTrans, SXnodeAgentObj *pObj) {
  int32_t  code = 0;
  SSdbRaw *pCommitRaw = mndXnodeAgentActionEncode(pObj);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    TAOS_RETURN(code);
  }

  code = mndTransAppendCommitlog(pTrans, pCommitRaw);
  if (code != TSDB_CODE_SUCCESS) {
    // The raw was not appended, so it is released here to avoid a leak.
    sdbFreeRaw(pCommitRaw);
    TAOS_RETURN(code);
  }

  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
  TAOS_RETURN(code);
}
4295

4296
/**
 * Hand an agent object back to the mnode's SDB via sdbRelease.
 *
 * @param pMnode mnode owning the SDB
 * @param pObj   agent to release
 */
void mndReleaseXnodeAgent(SMnode *pMnode, SXnodeAgentObj *pObj) {
  sdbRelease(pMnode->pSdb, pObj);
}
×
4300

4301
/**
 * Check that the xnode feature is granted and that the requesting user may
 * perform `oper`.
 *
 * @param pMnode mnode handling the request
 * @param pReq   request whose connection user is checked
 * @param oper   operation being attempted
 * @return 0 when allowed; the grantCheck or mndCheckOperPrivilege error otherwise.
 */
static int32_t mndValidateXnodePermissions(SMnode *pMnode, SRpcMsg *pReq, EOperType oper) {
  int32_t grantCode = grantCheck(TSDB_GRANT_XNODE);
  if (grantCode == TSDB_CODE_SUCCESS) {
    return mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, oper);
  }

  mError("failed to validate xnode permissions, code:%s, oper:%d", tstrerror(grantCode), oper);
  return grantCode;
}
4310

4311
/**
 * Acquire an xnode agent from the SDB by id.
 *
 * A TSDB_CODE_SDB_OBJ_NOT_THERE miss is translated to
 * TSDB_CODE_MND_XNODE_AGENT_NOT_EXIST in terrno.
 *
 * @param pMnode mnode owning the SDB
 * @param id     agent id
 * @return the acquired agent, or NULL with terrno set.
 */
SXnodeAgentObj *mndAcquireXnodeAgentById(SMnode *pMnode, int32_t id) {
  SXnodeAgentObj *pAgent = sdbAcquire(pMnode->pSdb, SDB_XNODE_AGENT, &id);
  if (pAgent != NULL) {
    return pAgent;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_XNODE_AGENT_NOT_EXIST;
  }
  return NULL;
}
4318

4319
/**
 * Find an xnode agent by name (case-insensitive) by iterating the SDB.
 *
 * Every row returned by sdbFetch is released again unless it is the match,
 * including rows that have no name.
 *
 * @param pMnode mnode owning the SDB
 * @param name   agent name to look for; NULL is treated as "not found"
 * @return the matching agent (release with mndReleaseXnodeAgent), or NULL
 *         with terrno = TSDB_CODE_MND_XNODE_AGENT_NOT_EXIST.
 */
static SXnodeAgentObj *mndAcquireXnodeAgentByName(SMnode *pMnode, const char *name) {
  SSdb *pSdb = pMnode->pSdb;

  if (name == NULL) {
    terrno = TSDB_CODE_MND_XNODE_AGENT_NOT_EXIST;
    return NULL;
  }

  void *pIter = NULL;
  while (1) {
    SXnodeAgentObj *pAgent = NULL;
    pIter = sdbFetch(pSdb, SDB_XNODE_AGENT, pIter, (void **)&pAgent);
    if (pIter == NULL) break;

    if (pAgent->name != NULL && strcasecmp(name, pAgent->name) == 0) {
      sdbCancelFetch(pSdb, pIter);
      return pAgent;
    }

    // Not a match (or unnamed): release the fetched row before moving on.
    sdbRelease(pSdb, pAgent);
  }

  mDebug("xnode agent:%s, not found", name);
  terrno = TSDB_CODE_MND_XNODE_AGENT_NOT_EXIST;
  return NULL;
}
4343

4344
/**
 * Verify that no agent with the given name exists yet.
 *
 * @param pMnode mnode owning the SDB
 * @param name   agent name to check
 * @return TSDB_CODE_MND_XNODE_AGENT_ALREADY_EXIST if found; otherwise
 *         TSDB_CODE_SUCCESS (terrno is reset to success as well).
 */
static int32_t mndCheckXnodeAgentExists(SMnode *pMnode, const char *name) {
  SXnodeAgentObj *pExisting = mndAcquireXnodeAgentByName(pMnode, name);
  if (pExisting == NULL) {
    // The lookup leaves a "not exist" error behind; that is the good case here.
    terrno = TSDB_CODE_SUCCESS;
    return TSDB_CODE_SUCCESS;
  }

  mError("xnode agent:%s already exists", name);
  mndReleaseXnodeAgent(pMnode, pExisting);
  return TSDB_CODE_MND_XNODE_AGENT_ALREADY_EXIST;
}
4354

4355
#ifndef WINDOWS
4356
/* Claims placed in the payload of an agent token. */
typedef struct {
  int64_t sub; /* subject: agent ID */
  int64_t iat; /* issued-at time */
} agentTokenField;
4360

4361
/*
 * Built-in 32-byte key used to sign agent tokens when no other secret is given.
 * NOTE(review): this key is compiled into the binary; anyone with the binary
 * can forge tokens signed with it.
 */
const unsigned char MNDXNODE_DEFAULT_SECRET[] = {
    126, 222, 130, 137, 43,  122, 41,  173,
    144, 146, 116, 138, 153, 244, 251, 99,
    50,  55,  140, 238, 218, 232, 15,  161,
    226, 54,  130, 40,  211, 234, 111, 171,
};
4364

4365
agentTokenField mndXnodeCreateAgentTokenField(long agent_id, time_t issued_at) {
×
4366
  agentTokenField field = {0};
×
4367
  field.sub = agent_id;
×
4368
  field.iat = issued_at;
×
4369
  return field;
×
4370
}
4371

4372
static char *mndXnodeBase64UrlEncodeOpenssl(const unsigned char *input, size_t input_len) {
12✔
4373
  __uint64_t code = 0;
12✔
4374
  int32_t lino = 0;
12✔
4375
  BIO     *bio = NULL, *b64 = NULL;
12✔
4376
  BUF_MEM *bufferPtr = NULL;
12✔
4377
  char    *base64_str = NULL;
12✔
4378

4379
  b64 = BIO_new(BIO_f_base64());
12✔
4380
  if (!b64) {
12✔
4381
    lino = __LINE__;
×
4382
    code = ERR_get_error();
×
4383
    goto _err;
×
4384
  }
4385

4386
  bio = BIO_new(BIO_s_mem());
12✔
4387
  if (!bio) {
12✔
4388
    lino = __LINE__;
×
4389
    code = ERR_get_error();
×
4390
    goto _err;
×
4391
  }
4392

4393
  // BIO chain:b64 → bio
4394
  bio = BIO_push(b64, bio);
12✔
4395
  if (!bio) {
12✔
4396
    lino = __LINE__;
×
4397
    code = ERR_get_error();
×
4398
    goto _err;
×
4399
  }
4400
  BIO_set_flags(bio, BIO_FLAGS_BASE64_NO_NL);
12✔
4401

4402
  int32_t write_ret = BIO_write(bio, input, input_len);
12✔
4403
  if (write_ret <= 0 || (size_t)write_ret != input_len) {
12✔
4404
    lino = __LINE__;
×
4405
    code = ERR_get_error();
×
4406
    goto _err;
×
4407
  }
4408
  int32_t flush_ret = BIO_flush(bio);
12✔
4409
  if (flush_ret != 1) {
12✔
4410
    lino = __LINE__;
×
4411
    code = ERR_get_error();
×
4412
    goto _err;
×
4413
  }
4414
  int64_t ret = BIO_get_mem_ptr(bio, &bufferPtr);
12✔
4415
  if (ret <= 0) {
12✔
4416
    lino = __LINE__;
×
4417
    code = ERR_get_error();
×
4418
    goto _err;
×
4419
}
4420
  if (!bufferPtr || !bufferPtr->data || bufferPtr->length == 0) {
12✔
4421
    lino = __LINE__;
×
4422
    code = ERR_get_error();
×
4423
    goto _err;
×
4424
  }
4425
  base64_str = taosMemoryMalloc(bufferPtr->length + 1);
12✔
4426
  if (!base64_str) {
12✔
4427
    lino = __LINE__;
×
4428
    code = ERR_get_error();
×
4429
    goto _err;
×
4430
  }
4431
  memcpy(base64_str, bufferPtr->data, bufferPtr->length);
12✔
4432
  base64_str[bufferPtr->length] = '\0';
12✔
4433
  // url safe
4434
  for (size_t i = 0; i < bufferPtr->length; i++) {
484✔
4435
    if (base64_str[i] == '+') {
472✔
4436
      base64_str[i] = '-';
6✔
4437
    } else if (base64_str[i] == '/') {
466✔
4438
      base64_str[i] = '_';
×
4439
    }
4440
  }
4441
  // remove padding char '='
4442
  size_t len = strlen(base64_str);
12✔
4443
  while (len > 0 && base64_str[len - 1] == '=') {
18✔
4444
    base64_str[len - 1] = '\0';
6✔
4445
    len--;
6✔
4446
  }
4447
  goto _exit;
12✔
4448

4449
_err:
×
4450
  if (code != TSDB_CODE_SUCCESS) {
×
4451
    mError("xnode agent: line: %d failed to encode base64 since %s", lino, ERR_error_string(code, NULL));
×
4452
  }
4453
  if (base64_str) {
×
4454
    taosMemoryFree(base64_str);
×
4455
    base64_str = NULL;
×
4456
  }
4457

4458
_exit:
×
4459
  if (bio) {
12✔
4460
    BIO_free_all(bio);  // will release b64 and bio
12✔
4461
  } else if (b64) {
×
4462
    int32_t ret = BIO_free(b64);
×
4463
    if (ret != 1) {
×
4464
      lino = __LINE__;
×
4465
      code = ERR_get_error();
×
4466
      mError("xnode agent: line: %d BIO failed to free since %s", lino, ERR_error_string(code, NULL));
×
4467
    }
4468
  }
4469

4470
  return base64_str;
12✔
4471
}
4472

4473
static char *mndXnodeCreateTokenHeader() {
4✔
4474
  int32_t code = 0, lino = 0;
4✔
4475
  cJSON  *headerJson = NULL;
4✔
4476
  char   *headerJsonStr = NULL;
4✔
4477
  char   *encoded = NULL;
4✔
4478

4479
  headerJson = tjsonCreateObject();
4✔
4480
  if (!headerJson) {
4✔
4481
    code = terrno;
×
4482
    goto _exit;
×
4483
  }
4484

4485
  TAOS_CHECK_EXIT(tjsonAddStringToObject(headerJson, "alg", "HS256"));
4✔
4486
  TAOS_CHECK_EXIT(tjsonAddStringToObject(headerJson, "typ", "JWT"));
4✔
4487

4488
  headerJsonStr = tjsonToUnformattedString(headerJson);
4✔
4489
  if (!headerJsonStr) {
4✔
4490
    code = terrno;
×
4491
    goto _exit;
×
4492
  }
4493
  encoded = mndXnodeBase64UrlEncodeOpenssl((const unsigned char *)headerJsonStr, strlen(headerJsonStr));
4✔
4494
  if (!encoded) {
4✔
4495
    code = terrno;
×
4496
    goto _exit;
×
4497
  }
4498

4499
_exit:
4✔
4500
  if (code != TSDB_CODE_SUCCESS) {
4✔
4501
    mError("xnode agent: line: %d failed to create header since %s", lino, tstrerror(code));
×
4502
    taosMemoryFree(encoded);
×
4503
    encoded = NULL;
×
4504
  }
4505

4506
  if (headerJsonStr) {
4✔
4507
    taosMemoryFree(headerJsonStr);
4✔
4508
  }
4509
  if (headerJson) {
4✔
4510
    tjsonDelete(headerJson);
4✔
4511
  }
4512

4513
  return encoded;
4✔
4514
}
4515

4516
static char *mndXnodeCreateTokenPayload(const agentTokenField *claims) {
4✔
4517
  int32_t code = 0, lino = 0;
4✔
4518
  cJSON  *payloadJson = NULL;
4✔
4519
  char   *payloadStr = NULL;
4✔
4520
  char   *encoded = NULL;
4✔
4521

4522
  if (!claims) {
4✔
4523
    code = TSDB_CODE_INVALID_PARA;
×
4524
    terrno = code;
×
4525
    return NULL;
×
4526
  }
4527

4528
  payloadJson = tjsonCreateObject();
4✔
4529
  if (!payloadJson) {
4✔
4530
    code = terrno;
×
4531
    goto _exit;
×
4532
  }
4533

4534
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(payloadJson, "iat", claims->iat));
4✔
4535
  TAOS_CHECK_EXIT(tjsonAddDoubleToObject(payloadJson, "sub", claims->sub));
4✔
4536

4537
  payloadStr = tjsonToUnformattedString(payloadJson);
4✔
4538
  if (!payloadStr) {
4✔
4539
    code = terrno;
×
4540
    goto _exit;
×
4541
  }
4542
  encoded = mndXnodeBase64UrlEncodeOpenssl((const unsigned char *)payloadStr, strlen(payloadStr));
4✔
4543
  if (!encoded) {
4✔
4544
    code = terrno;
×
4545
    goto _exit;
×
4546
  }
4547

4548
_exit:
4✔
4549
  if (code != TSDB_CODE_SUCCESS) {
4✔
4550
    mError("xnode agent line: %d failed to create payload since %s", lino, tstrerror(code));
×
4551
    taosMemoryFree(encoded);
×
4552
    encoded = NULL;
×
4553
  }
4554
  if (payloadStr) {
4✔
4555
    taosMemoryFree(payloadStr);
4✔
4556
  }
4557
  if (payloadJson) {
4✔
4558
    tjsonDelete(payloadJson);
4✔
4559
  }
4560
  return encoded;
4✔
4561
}
4562

4563
static char *mndXnodeCreateTokenSignature(const char *header_payload, const unsigned char *secret, size_t secret_len) {
4✔
4564
  int32_t       code = 0, lino = 0;
4✔
4565
  unsigned char hash[EVP_MAX_MD_SIZE] = {0};
4✔
4566
  unsigned int  hash_len = 0;
4✔
4567
  char         *encoded = NULL;
4✔
4568

4569
  // HMAC-SHA256
4570
  if (!HMAC(EVP_sha256(), secret, secret_len, (const unsigned char *)header_payload, strlen(header_payload), hash,
4✔
4571
            &hash_len)) {
4572
    code = terrno;
×
4573
    goto _exit;
×
4574
  }
4575

4576
  encoded = mndXnodeBase64UrlEncodeOpenssl(hash, hash_len);
4✔
4577
  if (!encoded) {
4✔
4578
    code = terrno;
×
4579
    goto _exit;
×
4580
  }
4581

4582
_exit:
4✔
4583
  if (code != TSDB_CODE_SUCCESS) {
4✔
4584
    mError("xnode agent line: %d failed create signature since %s", lino, tstrerror(code));
×
4585
    taosMemoryFree(encoded);
×
4586
    encoded = NULL;
×
4587
  }
4588
  return encoded;
4✔
4589
}
4590

4591
char *mndXnodeCreateAgentToken(const agentTokenField *claims, const unsigned char *secret, size_t secret_len) {
4✔
4592
  int32_t code = 0, lino = 0;
4✔
4593
  char   *header = NULL, *payload = NULL;
4✔
4594
  char   *headerPayload = NULL;
4✔
4595
  char   *signature = NULL;
4✔
4596
  char   *token = NULL;
4✔
4597

4598
  if (!claims) {
4✔
4599
    code = TSDB_CODE_INVALID_PARA;
×
4600
    goto _exit;
×
4601
  }
4602

4603
  if (!secret || secret_len == 0) {
4✔
4604
    secret = MNDXNODE_DEFAULT_SECRET;
×
4605
    secret_len = sizeof(MNDXNODE_DEFAULT_SECRET);
×
4606
  }
4607

4608
  header = mndXnodeCreateTokenHeader();
4✔
4609
  if (!header) {
4✔
4610
    code = terrno;
×
4611
    goto _exit;
×
4612
  }
4613

4614
  payload = mndXnodeCreateTokenPayload(claims);
4✔
4615
  if (!payload) {
4✔
4616
    code = terrno;
×
4617
    goto _exit;
×
4618
  }
4619

4620
  size_t header_payload_len = strlen(header) + strlen(payload) + 2;
4✔
4621
  headerPayload = taosMemoryMalloc(header_payload_len);
4✔
4622
  if (!headerPayload) {
4✔
4623
    code = terrno;
×
4624
    goto _exit;
×
4625
  }
4626
  snprintf(headerPayload, header_payload_len, "%s.%s", header, payload);
4✔
4627

4628
  signature = mndXnodeCreateTokenSignature(headerPayload, secret, secret_len);
4✔
4629
  if (!signature) {
4✔
4630
    code = terrno;
×
4631
    goto _exit;
×
4632
  }
4633

4634
  size_t token_len = strlen(headerPayload) + strlen(signature) + 2;
4✔
4635
  token = taosMemoryCalloc(1, token_len);
4✔
4636
  if (!token) {
4✔
4637
    code = terrno;
×
4638
    goto _exit;
×
4639
  }
4640

4641
  snprintf(token, token_len, "%s.%s", headerPayload, signature);
4✔
4642

4643
_exit:
4✔
4644
  if (code != TSDB_CODE_SUCCESS) {
4✔
4645
    mError("xnode agent line: %d failed create token since %s", lino, tstrerror(code));
×
4646
    taosMemoryFree(token);
×
4647
    token = NULL;
×
4648
  }
4649
  taosMemoryFree(signature);
4✔
4650
  taosMemoryFree(headerPayload);
4✔
4651
  taosMemoryFree(payload);
4✔
4652
  taosMemoryFree(header);
4✔
4653

4654
  return token;
4✔
4655
}
4656
#endif
4657

4658
int32_t mndXnodeGenAgentToken(const SXnodeAgentObj *pAgent, char *pTokenBuf) {
×
4659
  int32_t code = 0, lino = 0;
×
4660
  #ifndef WINDOWS
4661
  // char *token =
4662
  //     "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE3Njc1OTc3NzIsInN1YiI6MTIzNDV9.i7HvYf_S-yWGEExDzQESPUwVX23Ok_"
4663
  //     "7Fxo93aqgKrtw";
4664
  agentTokenField claims = {
×
4665
      .iat = pAgent->createTime,
×
4666
      .sub = pAgent->id,
×
4667
  };
4668
  char *token = mndXnodeCreateAgentToken(&claims, MNDXNODE_DEFAULT_SECRET, sizeof(MNDXNODE_DEFAULT_SECRET));
×
4669
  if (!token) {
×
4670
    code = terrno;
×
4671
    lino = __LINE__;
×
4672
    goto _exit;
×
4673
  }
4674
  (void)memcpy(pTokenBuf, token, TMIN(strlen(token) + 1, TSDB_XNODE_AGENT_TOKEN_LEN));
×
4675

4676
_exit:
×
4677
  if (code != TSDB_CODE_SUCCESS) {
×
4678
    mError("xnode agent line: %d failed gen token since %s", lino, tstrerror(code));
×
4679
  }
4680
  taosMemoryFree(token);
×
4681
  #endif
4682
  TAOS_RETURN(code);
×
4683
}
4684

4685
static int32_t mndCreateXnodeAgent(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeAgentReq *pCreate,
×
4686
                                   SXnodeAgentObj **ppObj) {
4687
  int32_t code = -1;
×
4688
  STrans *pTrans = NULL;
×
4689

4690
  if ((*ppObj) == NULL) {
×
4691
    *ppObj = taosMemoryCalloc(1, sizeof(SXnodeAgentObj));
×
4692
    if (*ppObj == NULL) {
×
4693
      code = terrno;
×
4694
      goto _OVER;
×
4695
    }
4696
  }
4697
  SXnodeAgentObj *pAgentObj = *ppObj;
×
4698

4699
  pAgentObj->id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE_AGENT);
×
4700
  pAgentObj->createTime = taosGetTimestampMs();
×
4701
  pAgentObj->updateTime = pAgentObj->createTime;
×
4702

4703
  pAgentObj->nameLen = pCreate->name.len + 1;
×
4704
  pAgentObj->name = taosMemoryCalloc(1, pAgentObj->nameLen);
×
4705
  if (pAgentObj->name == NULL) goto _OVER;
×
4706
  (void)memcpy(pAgentObj->name, pCreate->name.ptr, pCreate->name.len);
×
4707

4708
  if (pCreate->status.ptr != NULL) {
×
4709
    pAgentObj->statusLen = pCreate->status.len + 1;
×
4710
    pAgentObj->status = taosMemoryCalloc(1, pAgentObj->statusLen);
×
4711
    if (pAgentObj->status == NULL) goto _OVER;
×
4712
    (void)memcpy(pAgentObj->status, pCreate->status.ptr, pCreate->status.len);
×
4713
  }
4714
  // gen token
4715
  char token[TSDB_XNODE_AGENT_TOKEN_LEN] = {0};
×
4716
  TAOS_CHECK_GOTO(mndXnodeGenAgentToken(pAgentObj, token), NULL, _OVER);
×
4717
  pAgentObj->tokenLen = strlen(token) + 1;
×
4718
  pAgentObj->token = taosMemoryCalloc(1, pAgentObj->tokenLen);
×
4719
  if (pAgentObj->token == NULL) goto _OVER;
×
4720
  (void)memcpy(pAgentObj->token, token, pAgentObj->tokenLen - 1);
×
4721

4722
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-xnode-agent");
×
4723
  if (pTrans == NULL) {
×
4724
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4725
    if (terrno != 0) {
×
4726
      code = terrno;
×
4727
    }
4728
    mError("failed to create transaction for xnode-agent:%s, code:0x%x:%s", pCreate->name.ptr, code, tstrerror(code));
×
4729
    goto _OVER;
×
4730
  }
4731
  mndTransSetSerial(pTrans);
×
4732

4733
  mDebug("trans:%d, used to create xnode agent:%s as agent:%d", pTrans->id, pCreate->name.ptr, pAgentObj->id);
×
4734
  TAOS_CHECK_GOTO(mndSetCreateXnodeAgentRedoLogs(pTrans, pAgentObj), NULL, _OVER);
×
4735
  TAOS_CHECK_GOTO(mndSetCreateXnodeAgentUndoLogs(pTrans, pAgentObj), NULL, _OVER);
×
4736
  TAOS_CHECK_GOTO(mndSetCreateXnodeAgentCommitLogs(pTrans, pAgentObj), NULL, _OVER);
×
4737
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
4738

4739
  code = 0;
×
4740

4741
_OVER:
×
4742
  mndTransDrop(pTrans);
×
4743
  TAOS_RETURN(code);
×
4744
}
4745

4746
/**
 * Announce a newly created agent to xnoded.
 *
 * POSTs {"token": "<agent token>"} to "<XNODED_PIPE_SOCKET_URL>/agent".
 *
 * @param pObj agent whose token is sent
 * @return 0 when a reply was obtained, otherwise an error code (also stored
 *         in terrno).
 */
static int32_t httpCreateAgent(SXnodeAgentObj *pObj) {
  int32_t code = 0;
  SJson  *pJson = NULL;
  SJson  *postContent = NULL;
  char   *pContStr = NULL;

  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN + 1, "%s/agent", XNODED_PIPE_SOCKET_URL);
  postContent = tjsonCreateObject();
  if (postContent == NULL) {
    code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "token", pObj->token), NULL, _OVER);
  pContStr = tjsonToUnformattedString(postContent);
  if (pContStr == NULL) {
    code = terrno;
    goto _OVER;
  }
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, defaultTimeout, pContStr, strlen(pContStr));
  if (pJson == NULL) {
    // No reply means the request failed; surface it instead of returning success.
    code = (terrno != 0) ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL;
  }

_OVER:
  if (postContent != NULL) {
    tjsonDelete(postContent);
  }
  if (pContStr != NULL) {
    taosMemFree(pContStr);
  }
  if (pJson != NULL) {
    tjsonDelete(pJson);
  }
  TAOS_RETURN(code);
}
4779

4780
/**
 * Handle a "create xnode agent" request.
 *
 * Checks permissions, parses the request, rejects duplicate names, submits
 * the create transaction and then notifies xnoded (best effort).
 *
 * @param pReq incoming RPC message
 * @return TSDB_CODE_ACTION_IN_PROGRESS once the transaction is submitted,
 *         otherwise an error code (also stored in terrno).
 */
static int32_t mndProcessCreateXnodeAgentReq(SRpcMsg *pReq) {
  SMnode               *pMnode = pReq->info.node;
  int32_t               code = 0;
  SXnodeAgentObj       *pObj = NULL;
  SMCreateXnodeAgentReq createReq = {0};

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_CREATE_XNODE_AGENT);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for create xnode agent, code:%s", tstrerror(code));
    goto _OVER;
  }

  code = tDeserializeSMCreateXnodeAgentReq(pReq->pCont, pReq->contLen, &createReq);
  if (code != 0) {
    mError("failed to deserialize create xnode agent request, code:%s", tstrerror(code));
    // Go through the common exit so a partially filled request is released.
    goto _OVER;
  }

  TAOS_CHECK_GOTO(mndCheckXnodeAgentExists(pMnode, createReq.name.ptr), NULL, _OVER);

  TAOS_CHECK_GOTO(mndCreateXnodeAgent(pMnode, pReq, &createReq, &pObj), NULL, _OVER);
  code = TSDB_CODE_ACTION_IN_PROGRESS;

  // Best effort: a failed notification does not fail the request, but is logged.
  int32_t httpCode = httpCreateAgent(pObj);
  if (httpCode != TSDB_CODE_SUCCESS) {
    mError("xnode agent:%d, failed to notify xnoded since %s", pObj->id, tstrerror(httpCode));
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode agent:%s, failed to create since %s", createReq.name.ptr ? createReq.name.ptr : "unknown",
           tstrerror(code));
  }
  if (pObj != NULL) {
    mndFreeXnodeAgent(pObj);
    taosMemoryFree(pObj);
  }
  tFreeSMCreateXnodeAgentReq(&createReq);
  TAOS_RETURN(code);
}
4817

4818
static int32_t mndUpdateXnodeAgent(SMnode *pMnode, SRpcMsg *pReq, const SXnodeAgentObj *pOld,
×
4819
                                   SMUpdateXnodeAgentReq *pUpdate) {
4820
  mDebug("xnode agent:%d, start to update", pUpdate->id);
×
4821
  int32_t        code = -1;
×
4822
  STrans        *pTrans = NULL;
×
4823
  struct {
4824
    bool status;
4825
    bool name;
4826
  } isChange = {0};
×
4827
  SXnodeAgentObj agentObjRef = *pOld;
×
4828

4829
  const char *status = getXTaskOptionByName(&pUpdate->options, "status");
×
4830
  if (status != NULL) {
×
4831
    isChange.status = true;
×
4832
    agentObjRef.statusLen = strlen(status) + 1;
×
4833
    agentObjRef.status = taosMemoryCalloc(1, agentObjRef.statusLen);
×
4834
    if (agentObjRef.status == NULL) goto _OVER;
×
4835
    (void)memcpy(agentObjRef.status, status, agentObjRef.statusLen);
×
4836
  }
4837
  const char *name = getXTaskOptionByName(&pUpdate->options, "name");
×
4838
  if (name != NULL) {
×
4839
    isChange.name = true;
×
4840
    agentObjRef.nameLen = strlen(name) + 1;
×
4841
    agentObjRef.name = taosMemoryCalloc(1, agentObjRef.nameLen);
×
4842
    if (agentObjRef.name == NULL) goto _OVER;
×
4843
    (void)memcpy(agentObjRef.name, name, agentObjRef.nameLen);
×
4844
  }
4845
  agentObjRef.updateTime = taosGetTimestampMs();
×
4846

4847
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-xnode-agent");
×
4848
  if (pTrans == NULL) {
×
4849
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
4850
    if (terrno != 0) code = terrno;
×
4851
    goto _OVER;
×
4852
  }
4853
  mInfo("trans:%d, used to update xnode agent:%d", pTrans->id, agentObjRef.id);
×
4854

4855
  TAOS_CHECK_GOTO(mndSetCreateXnodeAgentCommitLogs(pTrans, &agentObjRef), NULL, _OVER);
×
4856
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
4857
  code = 0;
×
4858

4859
_OVER:
×
4860
  if (isChange.status) {
×
4861
    taosMemoryFree(agentObjRef.status);
×
4862
  }
4863
  if (isChange.name) {
×
4864
    taosMemoryFree(agentObjRef.name);
×
4865
  }
4866
  mndTransDrop(pTrans);
×
4867
  TAOS_RETURN(code);
×
4868
}
4869

4870
/**
 * Handle an "update xnode agent" request.
 *
 * Steps: check the caller's permission, deserialize the request, locate the
 * target agent (by id when id > 0, otherwise by name), reject a new "name"
 * option that is already used by an existing agent, then delegate the actual
 * update to mndUpdateXnodeAgent().
 *
 * @param pReq  RPC message; pReq->info.node is the mnode and
 *              pReq->pCont / pReq->contLen hold the serialized request.
 * @return TSDB_CODE_ACTION_IN_PROGRESS when mndUpdateXnodeAgent() returned 0,
 *         otherwise the error code of the step that failed.
 */
static int32_t mndProcessUpdateXnodeAgentReq(SRpcMsg *pReq) {
  SMnode               *pMnode = pReq->info.node;
  int32_t               code = -1;
  SXnodeAgentObj       *pObj = NULL;
  SMUpdateXnodeAgentReq updateReq = {0};

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_UPDATE_XNODE_AGENT);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for update xnode agent, code:%s", tstrerror(code));
    goto _OVER;
  }

  TAOS_CHECK_GOTO(tDeserializeSMUpdateXnodeAgentReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER);
  mDebug("xnode update agent request id:%d, nameLen:%d", updateReq.id, updateReq.name.len);

  // The request must identify the agent either by a positive id or by a non-empty name.
  if (updateReq.id <= 0 && (updateReq.name.len <= 0 || updateReq.name.ptr == NULL)) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _OVER;
  }

  if (updateReq.id > 0) {
    pObj = mndAcquireXnodeAgentById(pMnode, updateReq.id);
  } else {
    pObj = mndAcquireXnodeAgentByName(pMnode, updateReq.name.ptr);
  }
  if (pObj == NULL) {
    // Never report success for a failed lookup, even if terrno was left unset.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  // A requested new name must not collide with an existing agent.
  // NOTE(review): this presumably also rejects a "name" option equal to the
  // agent's current name — confirm that is intended.
  const char *nameRef = getXTaskOptionByName(&updateReq.options, "name");
  if (nameRef != NULL) {
    SXnodeAgentObj *tmpObj = mndAcquireXnodeAgentByName(pMnode, nameRef);
    if (tmpObj != NULL) {
      mndReleaseXnodeAgent(pMnode, tmpObj);
      code = TSDB_CODE_MND_XNODE_NAME_DUPLICATE;
      goto _OVER;
    }
  }

  code = mndUpdateXnodeAgent(pMnode, pReq, pObj, &updateReq);
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode agent:%d, failed to update since %s", updateReq.id, tstrerror(code));
  }

  mndReleaseXnodeAgent(pMnode, pObj);
  tFreeSMUpdateXnodeAgentReq(&updateReq);
  TAOS_RETURN(code);
}
4921

4922
/**
 * Append the redo log for dropping an xnode agent to a transaction.
 *
 * Encodes pObj into a raw sdb record, appends it to pTrans as a redo log and
 * then marks the record SDB_STATUS_DROPPING.
 *
 * @param pTrans  transaction receiving the redo log.
 * @param pObj    agent object to encode.
 * @return 0 on success; terrno if encoding fails; otherwise the error returned
 *         by the append / set-status step.
 */
static int32_t mndSetDropXnodeAgentRedoLogs(STrans *pTrans, SXnodeAgentObj *pObj) {
  int32_t code = 0;

  SSdbRaw *pRaw = mndXnodeAgentActionEncode(pObj);
  if (NULL == pRaw) {
    code = terrno;
    return code;
  }

  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING));

  TAOS_RETURN(code);
}
4935

4936
/**
 * Append the commit log for dropping an xnode agent to a transaction.
 *
 * Encodes pObj into a raw sdb record, appends it to pTrans as a commit log and
 * then marks the record SDB_STATUS_DROPPED.
 *
 * @param pTrans  transaction receiving the commit log.
 * @param pObj    agent object to encode.
 * @return 0 on success; terrno if encoding fails; otherwise the error returned
 *         by the append / set-status step.
 */
static int32_t mndSetDropXnodeAgentCommitLogs(STrans *pTrans, SXnodeAgentObj *pObj) {
  int32_t code = 0;

  SSdbRaw *pRaw = mndXnodeAgentActionEncode(pObj);
  if (NULL == pRaw) {
    code = terrno;
    return code;
  }

  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pRaw));
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED));

  TAOS_RETURN(code);
}
4948

4949
/**
 * Build and prepare the transaction that drops an xnode agent.
 *
 * Creates a "drop-xnode-agent" transaction (TRN_POLICY_RETRY, serial), appends
 * the drop redo and commit logs for pAgent and prepares the transaction. The
 * transaction handle is always passed to mndTransDrop() before returning.
 *
 * @param pMnode  mnode that owns the transaction.
 * @param pReq    originating RPC request, passed to mndTransCreate().
 * @param pAgent  agent to drop; must not be NULL.
 * @return 0 on success; TSDB_CODE_INVALID_PARA when pAgent is NULL; otherwise
 *         the error code of the failing step.
 */
static int32_t mndDropXnodeAgent(SMnode *pMnode, SRpcMsg *pReq, SXnodeAgentObj *pAgent) {
  int32_t code = 0;
  STrans *pTrans = NULL;

  if (pAgent == NULL) {
    mError("xnode agent fail to drop since pAgent is NULL");
    code = TSDB_CODE_INVALID_PARA;
    return code;
  }

  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-xnode-agent");
  if (pTrans == NULL) {
    // Guarantee a non-zero code: returning 0 here would make the caller
    // report the request as in progress although no transaction exists.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mndTransSetSerial(pTrans);
  mDebug("trans:%d, to drop xnode agent:%d", pTrans->id, pAgent->id);

  TAOS_CHECK_GOTO(mndSetDropXnodeAgentRedoLogs(pTrans, pAgent), NULL, _OVER);
  TAOS_CHECK_GOTO(mndSetDropXnodeAgentCommitLogs(pTrans, pAgent), NULL, _OVER);
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

_OVER:
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
4971

4972
/**
 * Handle a "drop xnode agent" request.
 *
 * Steps: check the caller's permission, deserialize the request, locate the
 * agent (by name when a name is supplied, otherwise by id), issue an HTTP
 * DELETE to "<XNODED_PIPE_SOCKET_URL>/agent/<id>" whose response is discarded,
 * then start the drop transaction through mndDropXnodeAgent().
 *
 * @param pReq  RPC message; pReq->info.node is the mnode and
 *              pReq->pCont / pReq->contLen hold the serialized request.
 * @return TSDB_CODE_ACTION_IN_PROGRESS when mndDropXnodeAgent() returned 0,
 *         otherwise the error code of the step that failed.
 */
static int32_t mndProcessDropXnodeAgentReq(SRpcMsg *pReq) {
  SMnode             *pMnode = pReq->info.node;
  int32_t             code = -1;
  SXnodeAgentObj     *pObj = NULL;
  SMDropXnodeAgentReq dropReq = {0};

  code = mndValidateXnodePermissions(pMnode, pReq, MND_OPER_DROP_XNODE_AGENT);
  if (code != TSDB_CODE_SUCCESS) {
    mError("failed check permission for drop xnode agent, code:%s", tstrerror(code));
    goto _OVER;
  }
  TAOS_CHECK_GOTO(tDeserializeSMDropXnodeAgentReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
  mDebug("xnode drop agent with id:%d, start to drop", dropReq.id);

  // The request must identify the agent either by a positive id or by a non-empty name.
  if (dropReq.id <= 0 && (dropReq.name.len <= 0 || dropReq.name.ptr == NULL)) {
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
    goto _OVER;
  }

  if (dropReq.name.len > 0 && dropReq.name.ptr != NULL) {
    pObj = mndAcquireXnodeAgentByName(pMnode, dropReq.name.ptr);
  } else {
    pObj = mndAcquireXnodeAgentById(pMnode, dropReq.id);
  }
  if (pObj == NULL) {
    // Never report success for a failed lookup, even if terrno was left unset.
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  // send request to drop xnode agent; the reply is only freed, its content is not inspected
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN + 1, "%s/agent/%d", XNODED_PIPE_SOCKET_URL, pObj->id);
  SJson *pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_DELETE, defaultTimeout, NULL, 0);
  if (pJson) {
    tjsonDelete(pJson);
  }

  code = mndDropXnodeAgent(pMnode, pReq, pObj);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("xnode agent:%d, failed to drop since %s", dropReq.id, tstrerror(code));
  }
  mndReleaseXnodeAgent(pMnode, pObj);
  tFreeSMDropXnodeAgentReq(&dropReq);
  TAOS_RETURN(code);
}
5022

5023
/**
 * Fill a result block with rows describing xnode agents.
 *
 * Continues the sdb iteration stored in pShow->pIter over SDB_XNODE_AGENT and
 * writes, per agent, the columns in this order: id, name, token, status,
 * createTime, updateTime. token and status are written as NULL when their
 * stored length is not positive.
 *
 * @param pReq    RPC message; pReq->info.node is the mnode.
 * @param pShow   show context holding the iterator, schema and row counter.
 * @param pBlock  output data block.
 * @param rows    maximum number of rows to produce in this call.
 * @return number of rows written in this call (also added to pShow->numOfRows).
 *         A column-write failure stops the loop; the error code itself is not
 *         returned.
 */
static int32_t mndRetrieveXnodeAgents(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode         *pMnode = pReq->info.node;
  SSdb           *pSdb = pMnode->pSdb;
  SXnodeAgentObj *pAgent = NULL;
  int32_t         code = 0;
  int32_t         fetched = 0;
  char            varstr[VARSTR_HEADER_SIZE + TSDB_XNODE_AGENT_TOKEN_LEN];

  // `fetched` only advances after a row was written completely; the early
  // exits below (break / goto) skip the increment.
  for (; fetched < rows; fetched++) {
    pShow->pIter = sdbFetch(pSdb, SDB_XNODE_AGENT, pShow->pIter, (void **)&pAgent);
    if (pShow->pIter == NULL) break;

    int32_t          col = 0;
    SColumnInfoData *pCol = NULL;

    // id
    pCol = taosArrayGet(pBlock->pDataBlock, col);
    col++;
    code = colDataSetVal(pCol, fetched, (const char *)&pAgent->id, false);
    if (code != 0) goto _end;

    // name
    pCol = taosArrayGet(pBlock->pDataBlock, col);
    STR_WITH_MAXSIZE_TO_VARSTR(varstr, pAgent->name, pShow->pMeta->pSchemas[col].bytes);
    col++;
    code = colDataSetVal(pCol, fetched, (const char *)varstr, false);
    if (code != 0) goto _end;

    // token (NULL when absent)
    pCol = taosArrayGet(pBlock->pDataBlock, col);
    if (pAgent->tokenLen > 0) {
      varstr[0] = 0;
      STR_WITH_MAXSIZE_TO_VARSTR(varstr, pAgent->token, pShow->pMeta->pSchemas[col].bytes);
      code = colDataSetVal(pCol, fetched, (const char *)varstr, false);
      if (code != 0) goto _end;
    } else {
      colDataSetNULL(pCol, fetched);
    }
    col++;

    // status (NULL when absent)
    pCol = taosArrayGet(pBlock->pDataBlock, col);
    if (pAgent->statusLen > 0) {
      varstr[0] = 0;
      STR_WITH_MAXSIZE_TO_VARSTR(varstr, pAgent->status, pShow->pMeta->pSchemas[col].bytes);
      code = colDataSetVal(pCol, fetched, (const char *)varstr, false);
      if (code != 0) goto _end;
    } else {
      colDataSetNULL(pCol, fetched);
    }
    col++;

    // createTime
    pCol = taosArrayGet(pBlock->pDataBlock, col);
    col++;
    code = colDataSetVal(pCol, fetched, (const char *)&pAgent->createTime, false);
    if (code != 0) goto _end;

    // updateTime
    pCol = taosArrayGet(pBlock->pDataBlock, col);
    col++;
    code = colDataSetVal(pCol, fetched, (const char *)&pAgent->updateTime, false);
    if (code != 0) goto _end;

    sdbRelease(pSdb, pAgent);
  }

_end:
  // On a failed column write the current object has not been released yet.
  if (code != 0) sdbRelease(pSdb, pAgent);

  pShow->numOfRows += fetched;
  return fetched;
}
5086

5087
/**
 * Cancel an in-progress sdb iteration over xnode agents.
 *
 * @param pMnode  mnode whose sdb owns the iterator.
 * @param pIter   iterator to cancel; forwarded to sdbCancelFetchByType().
 */
static void mndCancelGetNextXnodeAgent(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_XNODE_AGENT);
}
×
5091

5092
/** xnoded mgmt section **/
5093

5094
/**
 * Start xnoded for this mnode's dnode.
 *
 * Builds an SXnodeOpt from the mnode's dnode id, its cluster id, the first
 * endpoint of the dnode's epset and the supplied credentials, then calls
 * mndOpenXnd(). Failures are logged; nothing is returned to the caller.
 *
 * @param pMnode  mnode providing selfDnodeId and clusterId.
 * @param user    user name, may be NULL when a token is given.
 * @param pass    password, may be NULL when a token is given.
 * @param token   token, may be NULL when both user and pass are given.
 */
void mndStartXnoded(SMnode *pMnode, const char *user, const char *pass, const char *token) {
  int32_t   code = 0;
  SXnodeOpt pOption = {0};

  // At least one complete credential (user + pass, or token) is required.
  if ((user == NULL || pass == NULL) && token == NULL) {
    mError("xnode failed to start xnoded, dnode:%d", pMnode->selfDnodeId);
    return;
  }

  pOption.dnodeId = pMnode->selfDnodeId;
  pOption.clusterId = pMnode->clusterId;

  SEpSet epset = mndGetDnodeEpsetById(pMnode, pMnode->selfDnodeId);
  if (epset.numOfEps == 0) {
    mError("xnode failed to start xnoded, dnode:%d", pMnode->selfDnodeId);
    return;
  }
  pOption.ep = epset.eps[0];
  // add user password
  if (user != NULL && pass != NULL) {
    // Derive upLen from what snprintf actually stored, so that it can never
    // exceed the content of userPass when "user:pass" had to be truncated.
    int32_t written = snprintf(pOption.userPass, XNODE_USER_PASS_LEN, "%s:%s", user, pass);
    if (written < 0) {
      written = 0;
    } else if (written >= XNODE_USER_PASS_LEN) {
      written = XNODE_USER_PASS_LEN - 1;
      mError("xnode user pass is truncated to %d bytes, dnode:%d", written, pMnode->selfDnodeId);
    }
    pOption.upLen = written;
  }
  // add token
  if (token != NULL) {
    snprintf(pOption.token, sizeof(pOption.token), "%s", token);
  }
  if ((code = mndOpenXnd(&pOption)) != 0) {
    mError("xnode failed to open xnd since %s, dnodeId:%d", tstrerror(code), pOption.dnodeId);
    return;
  }
}
5126

5127
/**
 * Leader-transition hook: start xnoded using the first stored xnode
 * user/pass record, if there is one.
 *
 * @param pMnode  mnode used to look up the credentials and start xnoded.
 */
void mndXnodeHandleBecomeLeader(SMnode *pMnode) {
  mInfo("mndxnode start to process mnode become leader");

  SXnodeUserPassObj *pCred = mndAcquireFirstXnodeUserPass(pMnode);
  if (pCred != NULL) {
    mndStartXnoded(pMnode, pCred->user, pCred->pass, pCred->token);
    mndReleaseXnodeUserPass(pMnode, pCred);
    return;
  }

  // Without a stored credential record there is nothing to start.
  mInfo("mndXnode become leader found no xnoded user pass");
}
5138

5139
/**
 * Not-leader transition hook: logs the event and calls mndCloseXnd().
 *
 * Declared with an explicit (void) parameter list so the definition is a
 * real prototype and calls with arguments are rejected at compile time.
 */
void mndXnodeHandleBecomeNotLeader(void) {
  mInfo("mndxnode handle mnode become not leader");
  mndCloseXnd();
}
×
5143

5144
/**
 * Restart xnoded: close it, pause via taosMsleep(200), then start it again
 * with the first stored xnode user/pass record.
 *
 * When no credential record is found, xnoded stays closed and the function
 * returns after logging.
 *
 * @param pMnode  mnode used to look up the credentials and start xnoded.
 */
void mndRestartXnoded(SMnode *pMnode) {
  mInfo("mndxnode restart xnoded");
  mndCloseXnd();
  taosMsleep(200);

  SXnodeUserPassObj *pCred = mndAcquireFirstXnodeUserPass(pMnode);
  if (NULL == pCred) {
    mInfo("mndXnode restart found no xnoded user pass");
    return;
  }

  mndStartXnoded(pMnode, pCred->user, pCred->pass, pCred->token);
  mndReleaseXnodeUserPass(pMnode, pCred);
  mInfo("mndxnode xnoded restarted");
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc