• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4912

04 Jan 2026 09:05AM UTC coverage: 64.888% (-0.1%) from 65.028%
#4912

push

travis-ci

web-flow
merge: from main to 3.0 branch #34156

1206 of 4524 new or added lines in 22 files covered. (26.66%)

5351 existing lines in 123 files now uncovered.

194856 of 300296 relevant lines covered (64.89%)

118198896.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

26.95
/source/dnode/mnode/impl/src/mndXnode.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include <stdio.h>
18
#include "mndDef.h"
19
#include "tdatablock.h"
20
#include "types.h"
21
#ifndef WINDOWS
22
#include <curl/curl.h>
23
#endif
24
#include "audit.h"
25
#include "mndDnode.h"
26
#include "mndPrivilege.h"
27
#include "mndShow.h"
28
#include "mndTrans.h"
29
#include "mndUser.h"
30
#include "mndXnode.h"
31
#include "sdb.h"
32
#include "taoserror.h"
33
#include "tjson.h"
34
#include "xnode.h"
35

36
#define TSDB_XNODE_RESERVE_SIZE 64
37
#define XNODED_PIPE_SOCKET_URL "http://localhost"
38
typedef enum {
39
  HTTP_TYPE_GET = 0,
40
  HTTP_TYPE_POST,
41
  HTTP_TYPE_DELETE,
42
} EHttpType;
43
typedef struct {
44
  char   *data;
45
  int64_t dataLen;
46
} SCurlResp;
47

48
const int32_t defaultTimeout = 1000;
49

50
/** xnodes systable actions */
51
SSdbRaw *mndXnodeActionEncode(SXnodeObj *pObj);
52
SSdbRow *mndXnodeActionDecode(SSdbRaw *pRaw);
53
int32_t  mndXnodeActionInsert(SSdb *pSdb, SXnodeObj *pObj);
54
int32_t  mndXnodeActionUpdate(SSdb *pSdb, SXnodeObj *pOld, SXnodeObj *pNew);
55
int32_t  mndXnodeActionDelete(SSdb *pSdb, SXnodeObj *pObj);
56

57
/** @section xnodes request handlers */
58
static int32_t mndProcessCreateXnodeReq(SRpcMsg *pReq);
59
static int32_t mndProcessUpdateXnodeReq(SRpcMsg *pReq);
60
static int32_t mndProcessDropXnodeReq(SRpcMsg *pReq);
61
static int32_t mndProcessDrainXnodeReq(SRpcMsg *pReq);
62
static int32_t mndRetrieveXnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
63
static void    mndCancelGetNextXnode(SMnode *pMnode, void *pIter);
64

65
/** @section xnode task handlers */
66
SSdbRaw *mndXnodeTaskActionEncode(SXnodeTaskObj *pObj);
67
SSdbRow *mndXnodeTaskActionDecode(SSdbRaw *pRaw);
68
int32_t  mndXnodeTaskActionInsert(SSdb *pSdb, SXnodeTaskObj *pObj);
69
int32_t  mndXnodeTaskActionUpdate(SSdb *pSdb, SXnodeTaskObj *pOld, SXnodeTaskObj *pNew);
70
int32_t  mndXnodeTaskActionDelete(SSdb *pSdb, SXnodeTaskObj *pObj);
71

72
static int32_t mndProcessCreateXnodeTaskReq(SRpcMsg *pReq);
73
static int32_t mndProcessStartXnodeTaskReq(SRpcMsg *pReq);
74
static int32_t mndProcessStopXnodeTaskReq(SRpcMsg *pReq);
75
static int32_t mndProcessUpdateXnodeTaskReq(SRpcMsg *pReq);
76
static int32_t mndProcessDropXnodeTaskReq(SRpcMsg *pReq);
77
static int32_t mndRetrieveXnodeTasks(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
78
static void    mndCancelGetNextXnodeTask(SMnode *pMnode, void *pIter);
79

80
/** @section xnode task job handlers */
81
SSdbRaw *mndXnodeJobActionEncode(SXnodeJobObj *pObj);
82
SSdbRow *mndXnodeJobActionDecode(SSdbRaw *pRaw);
83
int32_t  mndXnodeJobActionInsert(SSdb *pSdb, SXnodeJobObj *pObj);
84
int32_t  mndXnodeJobActionUpdate(SSdb *pSdb, SXnodeJobObj *pOld, SXnodeJobObj *pNew);
85
int32_t  mndXnodeJobActionDelete(SSdb *pSdb, SXnodeJobObj *pObj);
86

87
static int32_t mndProcessCreateXnodeJobReq(SRpcMsg *pReq);
88
static int32_t mndProcessUpdateXnodeJobReq(SRpcMsg *pReq);
89
static int32_t mndProcessRebalanceXnodeJobReq(SRpcMsg *pReq);
90
static int32_t mndProcessRebalanceXnodeJobsWhereReq(SRpcMsg *pReq);
91
static int32_t mndProcessDropXnodeJobReq(SRpcMsg *pReq);
92
static int32_t mndRetrieveXnodeJobs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
93
static void    mndCancelGetNextXnodeJob(SMnode *pMnode, void *pIter);
94

95
/** @section xnode user pass handlers */
96
SSdbRaw *mndXnodeUserPassActionEncode(SXnodeUserPassObj *pObj);
97
SSdbRow *mndXnodeUserPassActionDecode(SSdbRaw *pRaw);
98
int32_t  mndXnodeUserPassActionInsert(SSdb *pSdb, SXnodeUserPassObj *pObj);
99
int32_t  mndXnodeUserPassActionUpdate(SSdb *pSdb, SXnodeUserPassObj *pOld, SXnodeUserPassObj *pNew);
100
int32_t  mndXnodeUserPassActionDelete(SSdb *pSdb, SXnodeUserPassObj *pObj);
101

102
/** @section xnode agent handlers */
103
static int32_t mndRetrieveXnodeAgents(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
104
static void    mndCancelGetNextXnodeAgent(SMnode *pMnode, void *pIter);
105

106
static int32_t mndGetXnodeStatus(SXnodeObj *pObj, char *status, int32_t statusLen);
107

108
/** @section xnoded mgmt */
109
void mndStartXnoded(SMnode *pMnode, int32_t userLen, char *user, int32_t passLen, char *pass);
110

111
SXnodeTaskObj *mndAcquireXnodeTask(SMnode *pMnode, int32_t tid);
112
SJson         *mndSendReqRetJson(const char *url, EHttpType type, int64_t timeout, const char *buf, int64_t bufLen);
113
static int32_t mndSetDropXnodeJobInfoToTrans(STrans *pTrans, SXnodeJobObj *pObj, bool force);
114
void           mndReleaseXnodeJob(SMnode *pMnode, SXnodeJobObj *pObj);
115

116
int32_t mndInitXnode(SMnode *pMnode) {
385,261✔
117
  SSdbTable table = {
385,261✔
118
      .sdbType = SDB_XNODE,
119
      .keyType = SDB_KEY_INT32,
120
      .encodeFp = (SdbEncodeFp)mndXnodeActionEncode,
121
      .decodeFp = (SdbDecodeFp)mndXnodeActionDecode,
122
      .insertFp = (SdbInsertFp)mndXnodeActionInsert,
123
      .updateFp = (SdbUpdateFp)mndXnodeActionUpdate,
124
      .deleteFp = (SdbDeleteFp)mndXnodeActionDelete,
125
  };
126

127
  int32_t code = sdbSetTable(pMnode->pSdb, table);
385,261✔
128
  if (code != 0) {
385,261✔
NEW
129
    return code;
×
130
  }
131

132
  SSdbTable tasks = {
385,261✔
133
      .sdbType = SDB_XNODE_TASK,
134
      .keyType = SDB_KEY_INT32,
135
      .encodeFp = (SdbEncodeFp)mndXnodeTaskActionEncode,
136
      .decodeFp = (SdbDecodeFp)mndXnodeTaskActionDecode,
137
      .insertFp = (SdbInsertFp)mndXnodeTaskActionInsert,
138
      .updateFp = (SdbUpdateFp)mndXnodeTaskActionUpdate,
139
      .deleteFp = (SdbDeleteFp)mndXnodeTaskActionDelete,
140
  };
141

142
  code = sdbSetTable(pMnode->pSdb, tasks);
385,261✔
143
  if (code != 0) {
385,261✔
NEW
144
    return code;
×
145
  }
146

147
  SSdbTable jobs = {
385,261✔
148
      .sdbType = SDB_XNODE_JOB,
149
      .keyType = SDB_KEY_INT32,
150
      .encodeFp = (SdbEncodeFp)mndXnodeJobActionEncode,
151
      .decodeFp = (SdbDecodeFp)mndXnodeJobActionDecode,
152
      .insertFp = (SdbInsertFp)mndXnodeJobActionInsert,
153
      .updateFp = (SdbUpdateFp)mndXnodeJobActionUpdate,
154
      .deleteFp = (SdbDeleteFp)mndXnodeJobActionDelete,
155
  };
156

157
  code = sdbSetTable(pMnode->pSdb, jobs);
385,261✔
158
  if (code != 0) {
385,261✔
NEW
159
    return code;
×
160
  }
161

162
  SSdbTable userPass = {
385,261✔
163
      .sdbType = SDB_XNODE_USER_PASS,
164
      .keyType = SDB_KEY_INT32,
165
      .encodeFp = (SdbEncodeFp)mndXnodeUserPassActionEncode,
166
      .decodeFp = (SdbDecodeFp)mndXnodeUserPassActionDecode,
167
      .insertFp = (SdbInsertFp)mndXnodeUserPassActionInsert,
168
      .updateFp = (SdbUpdateFp)mndXnodeUserPassActionUpdate,
169
      .deleteFp = (SdbDeleteFp)mndXnodeUserPassActionDelete,
170
  };
171

172
  code = sdbSetTable(pMnode->pSdb, userPass);
385,261✔
173
  if (code != 0) {
385,261✔
NEW
174
    return code;
×
175
  }
176

177
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_XNODE, mndProcessCreateXnodeReq);
385,261✔
178
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_XNODE, mndProcessUpdateXnodeReq);
385,261✔
179
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_XNODE, mndProcessDropXnodeReq);
385,261✔
180
  mndSetMsgHandle(pMnode, TDMT_MND_DRAIN_XNODE, mndProcessDrainXnodeReq);
385,261✔
181
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_XNODES, mndRetrieveXnodes);
385,261✔
182
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_XNODES, mndCancelGetNextXnode);
385,261✔
183

184
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_XNODE_TASK, mndProcessCreateXnodeTaskReq);
385,261✔
185
  mndSetMsgHandle(pMnode, TDMT_MND_START_XNODE_TASK, mndProcessStartXnodeTaskReq);
385,261✔
186
  mndSetMsgHandle(pMnode, TDMT_MND_STOP_XNODE_TASK, mndProcessStopXnodeTaskReq);
385,261✔
187
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_XNODE_TASK, mndProcessUpdateXnodeTaskReq);
385,261✔
188
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_XNODE_TASK, mndProcessDropXnodeTaskReq);
385,261✔
189
  // todo: stop
190
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_XNODE_TASKS, mndRetrieveXnodeTasks);
385,261✔
191
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_XNODE_TASKS, mndCancelGetNextXnodeTask);
385,261✔
192

193
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_XNODE_AGENT, mndProcessCreateXnodeReq);
385,261✔
194
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_XNODE_AGENT, mndProcessDropXnodeReq);
385,261✔
195
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_XNODE_AGENTS, mndRetrieveXnodeAgents);
385,261✔
196
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_XNODE_AGENTS, mndCancelGetNextXnodeAgent);
385,261✔
197

198
  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_XNODE_JOB, mndProcessCreateXnodeJobReq);
385,261✔
199
  mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_XNODE_JOB, mndProcessUpdateXnodeJobReq);
385,261✔
200
  mndSetMsgHandle(pMnode, TDMT_MND_REBALANCE_XNODE_JOB, mndProcessRebalanceXnodeJobReq);
385,261✔
201
  mndSetMsgHandle(pMnode, TDMT_MND_REBALANCE_XNODE_JOBS_WHERE, mndProcessRebalanceXnodeJobsWhereReq);
385,261✔
202
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_XNODE_JOB, mndProcessDropXnodeJobReq);
385,261✔
203
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_XNODE_JOBS, mndRetrieveXnodeJobs);
385,261✔
204
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_XNODE_JOBS, mndCancelGetNextXnodeJob);
385,261✔
205

206
  return 0;
385,261✔
207
}
208

209
/** tools section **/
210

211
int32_t xnodeCheckPasswordFmt(const char *pwd) {
118✔
212
  if (strcmp(pwd, "taosdata") == 0) {
118✔
NEW
213
    return 0;
×
214
  }
215

216
  if (tsEnableStrongPassword == 0) {
118✔
NEW
217
    for (char c = *pwd; c != 0; c = *(++pwd)) {
×
NEW
218
      if (c == ' ' || c == '\'' || c == '\"' || c == '`' || c == '\\') {
×
NEW
219
        return TSDB_CODE_MND_INVALID_PASS_FORMAT;
×
220
      }
221
    }
NEW
222
    return 0;
×
223
  }
224

225
  int32_t len = strlen(pwd);
118✔
226
  if (len < TSDB_PASSWORD_MIN_LEN) {
118✔
NEW
227
    return TSDB_CODE_PAR_PASSWD_TOO_SHORT_OR_EMPTY;
×
228
  }
229

230
  if (len > TSDB_PASSWORD_MAX_LEN) {
118✔
NEW
231
    return TSDB_CODE_PAR_NAME_OR_PASSWD_TOO_LONG;
×
232
  }
233

234
  if (taosIsComplexString(pwd)) {
118✔
235
    return 0;
118✔
236
  }
237

NEW
238
  return TSDB_CODE_MND_INVALID_PASS_FORMAT;
×
239
}
240

241
static void swapFields(int32_t *newLen, char **ppNewStr, int32_t *oldLen, char **ppOldStr) {
354✔
242
  if (*newLen > 0) {
354✔
NEW
243
    int32_t tempLen = *newLen;
×
NEW
244
    *newLen = *oldLen;
×
NEW
245
    *oldLen = tempLen;
×
246

NEW
247
    char *tempStr = *ppNewStr;
×
NEW
248
    *ppNewStr = *ppOldStr;
×
NEW
249
    *ppOldStr = tempStr;
×
250
  }
251
}
354✔
252

253
/** xnode section **/
254

255
void mndCleanupXnode(SMnode *pMnode) {}
385,200✔
256

257
SXnodeObj *mndAcquireXnode(SMnode *pMnode, int32_t xnodeId) {
3,540✔
258
  SXnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_XNODE, &xnodeId);
3,540✔
259
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
3,540✔
260
    terrno = TSDB_CODE_MND_XNODE_NOT_EXIST;
3,540✔
261
  }
262
  return pObj;
3,540✔
263
}
264

265
void mndReleaseXnode(SMnode *pMnode, SXnodeObj *pObj) {
1,534✔
266
  SSdb *pSdb = pMnode->pSdb;
1,534✔
267
  sdbRelease(pSdb, pObj);
1,534✔
268
}
1,534✔
269

270
SSdbRaw *mndXnodeActionEncode(SXnodeObj *pObj) {
1,062✔
271
  int32_t code = 0;
1,062✔
272
  int32_t lino = 0;
1,062✔
273
  terrno = TSDB_CODE_OUT_OF_MEMORY;
1,062✔
274

275
  if (NULL == pObj) {
1,062✔
NEW
276
    terrno = TSDB_CODE_INVALID_PARA;
×
NEW
277
    return NULL;
×
278
  }
279

280
  int32_t rawDataLen = sizeof(SXnodeObj) + TSDB_XNODE_RESERVE_SIZE + pObj->urlLen + pObj->statusLen;
1,062✔
281

282
  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE, TSDB_XNODE_VER_NUMBER, rawDataLen);
1,062✔
283
  if (pRaw == NULL) goto _OVER;
1,062✔
284

285
  int32_t dataPos = 0;
1,062✔
286
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
1,062✔
287
  SDB_SET_INT32(pRaw, dataPos, pObj->urlLen, _OVER)
1,062✔
288
  SDB_SET_BINARY(pRaw, dataPos, pObj->url, pObj->urlLen, _OVER)
1,062✔
289
  SDB_SET_INT32(pRaw, dataPos, pObj->statusLen, _OVER)
1,062✔
290
  SDB_SET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
1,062✔
291
  SDB_SET_INT64(pRaw, dataPos, pObj->createTime, _OVER)
1,062✔
292
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
1,062✔
293

294
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)
1,062✔
295

296
  terrno = 0;
1,062✔
297

298
_OVER:
1,062✔
299
  if (terrno != 0) {
1,062✔
NEW
300
    mError("xnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
×
NEW
301
    sdbFreeRaw(pRaw);
×
NEW
302
    return NULL;
×
303
  }
304

305
  mTrace("xnode:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
1,062✔
306
  return pRaw;
1,062✔
307
}
308

309
SSdbRow *mndXnodeActionDecode(SSdbRaw *pRaw) {
708✔
310
  int32_t code = 0;
708✔
311
  int32_t lino = 0;
708✔
312
  terrno = TSDB_CODE_OUT_OF_MEMORY;
708✔
313
  SSdbRow   *pRow = NULL;
708✔
314
  SXnodeObj *pObj = NULL;
708✔
315

316
  if (NULL == pRaw) {
708✔
NEW
317
    terrno = TSDB_CODE_INVALID_PARA;
×
NEW
318
    return NULL;
×
319
  }
320

321
  int8_t sver = 0;
708✔
322
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
708✔
323

324
  if (sver != TSDB_XNODE_VER_NUMBER) {
708✔
NEW
325
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
NEW
326
    goto _OVER;
×
327
  }
328

329
  pRow = sdbAllocRow(sizeof(SXnodeObj));
708✔
330
  if (pRow == NULL) goto _OVER;
708✔
331

332
  pObj = sdbGetRowObj(pRow);
708✔
333
  if (pObj == NULL) goto _OVER;
708✔
334

335
  int32_t dataPos = 0;
708✔
336
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
708✔
337
  SDB_GET_INT32(pRaw, dataPos, &pObj->urlLen, _OVER)
708✔
338
  if (pObj->urlLen > 0) {
708✔
339
    pObj->url = taosMemoryCalloc(pObj->urlLen, 1);
708✔
340
    if (pObj->url == NULL) goto _OVER;
708✔
341
    SDB_GET_BINARY(pRaw, dataPos, pObj->url, pObj->urlLen, _OVER)
708✔
342
  } else {
NEW
343
    pObj->url = NULL;
×
344
  }
345
  SDB_GET_INT32(pRaw, dataPos, &pObj->statusLen, _OVER)
708✔
346
  if (pObj->statusLen > 0) {
708✔
NEW
347
    pObj->status = taosMemoryCalloc(pObj->statusLen, 1);
×
NEW
348
    if (pObj->status == NULL) goto _OVER;
×
NEW
349
    SDB_GET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
×
350
  } else {
351
    pObj->status = NULL;
708✔
352
  }
353
  SDB_GET_INT64(pRaw, dataPos, &pObj->createTime, _OVER)
708✔
354
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
708✔
355

356
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)
708✔
357

358
  terrno = 0;
708✔
359

360
_OVER:
708✔
361
  if (terrno != 0) {
708✔
NEW
362
    mError("xnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
×
NEW
363
    if (pObj != NULL) {
×
NEW
364
      taosMemoryFreeClear(pObj->url);
×
365
    }
NEW
366
    taosMemoryFreeClear(pRow);
×
NEW
367
    return NULL;
×
368
  }
369

370
  mTrace("xnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
708✔
371
  return pRow;
708✔
372
}
373

374
static void mndFreeXnode(SXnodeObj *pObj) {
944✔
375
  if (pObj == NULL) return;
944✔
376
  if (pObj->url != NULL) {
944✔
377
    taosMemoryFreeClear(pObj->url);
944✔
378
  }
379
  if (pObj->status != NULL) {
944✔
NEW
380
    taosMemoryFreeClear(pObj->status);
×
381
  }
382
}
383

384
int32_t mndXnodeActionInsert(SSdb *pSdb, SXnodeObj *pObj) {
236✔
385
  mDebug("xnode:%d, perform insert action, row:%p", pObj->id, pObj);
236✔
386
  return 0;
236✔
387
}
388

389
int32_t mndXnodeActionDelete(SSdb *pSdb, SXnodeObj *pObj) {
708✔
390
  mDebug("xnode:%d, perform delete action, row:%p", pObj->id, pObj);
708✔
391
  mndFreeXnode(pObj);
708✔
392
  return 0;
708✔
393
}
394

395
int32_t mndXnodeActionUpdate(SSdb *pSdb, SXnodeObj *pOld, SXnodeObj *pNew) {
354✔
396
  mDebug("xnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
354✔
397

398
  taosWLockLatch(&pOld->lock);
354✔
399
  swapFields(&pNew->statusLen, &pNew->status, &pOld->statusLen, &pOld->status);
354✔
400
  if (pNew->updateTime > pOld->updateTime) {
354✔
NEW
401
    pOld->updateTime = pNew->updateTime;
×
402
  }
403
  taosWUnLockLatch(&pOld->lock);
354✔
404
  return 0;
354✔
405
}
406

407
SXnodeUserPassObj *mndAcquireFirstXnodeUserPass(SMnode *pMnode) {
356,999✔
408
  SSdb *pSdb = pMnode->pSdb;
356,999✔
409

410
  void *pIter = NULL;
356,999✔
NEW
411
  while (1) {
×
412
    SXnodeUserPassObj *pObj = NULL;
356,999✔
413
    pIter = sdbFetch(pSdb, SDB_XNODE_USER_PASS, pIter, (void **)&pObj);
356,999✔
414
    if (pIter == NULL) break;
356,999✔
415

NEW
416
    if (pObj != NULL) {
×
NEW
417
      sdbCancelFetch(pSdb, pIter);
×
NEW
418
      return pObj;
×
419
    }
420

NEW
421
    sdbRelease(pSdb, pObj);
×
422
  }
423
  terrno = TSDB_CODE_MND_XNODE_USER_PASS_NOT_EXIST;
356,999✔
424
  return NULL;
356,999✔
425
}
426

427
static int32_t mndSetCreateXnodeUserPassRedoLogs(STrans *pTrans, SXnodeUserPassObj *pObj) {
118✔
428
  int32_t  code = 0;
118✔
429
  SSdbRaw *pRedoRaw = mndXnodeUserPassActionEncode(pObj);
118✔
430
  if (pRedoRaw == NULL) {
118✔
NEW
431
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
432
    if (terrno != 0) code = terrno;
×
NEW
433
    TAOS_RETURN(code);
×
434
  }
435
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
118✔
436
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
118✔
437
  TAOS_RETURN(code);
118✔
438
}
439

440
static int32_t mndSetCreateXnodeUserPassCommitLogs(STrans *pTrans, SXnodeUserPassObj *pObj) {
118✔
441
  int32_t  code = 0;
118✔
442
  SSdbRaw *pCommitRaw = mndXnodeUserPassActionEncode(pObj);
118✔
443
  if (pCommitRaw == NULL) {
118✔
NEW
444
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
445
    if (terrno != 0) code = terrno;
×
NEW
446
    TAOS_RETURN(code);
×
447
  }
448
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
118✔
449
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
118✔
450
  TAOS_RETURN(code);
118✔
451
}
452

453
static int32_t mndSetCreateXnodeRedoLogs(STrans *pTrans, SXnodeObj *pObj) {
236✔
454
  int32_t  code = 0;
236✔
455
  SSdbRaw *pRedoRaw = mndXnodeActionEncode(pObj);
236✔
456
  if (pRedoRaw == NULL) {
236✔
NEW
457
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
458
    if (terrno != 0) code = terrno;
×
NEW
459
    TAOS_RETURN(code);
×
460
  }
461
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
236✔
462
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
236✔
463
  TAOS_RETURN(code);
236✔
464
}
465

466
static int32_t mndSetCreateXnodeUndoLogs(STrans *pTrans, SXnodeObj *pObj) {
236✔
467
  int32_t  code = 0;
236✔
468
  SSdbRaw *pUndoRaw = mndXnodeActionEncode(pObj);
236✔
469
  if (pUndoRaw == NULL) {
236✔
NEW
470
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
471
    if (terrno != 0) code = terrno;
×
NEW
472
    TAOS_RETURN(code);
×
473
  }
474
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
236✔
475
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
236✔
476
  TAOS_RETURN(code);
236✔
477
}
478

479
static int32_t mndSetCreateXnodeCommitLogs(STrans *pTrans, SXnodeObj *pObj) {
236✔
480
  int32_t  code = 0;
236✔
481
  SSdbRaw *pCommitRaw = mndXnodeActionEncode(pObj);
236✔
482
  if (pCommitRaw == NULL) {
236✔
NEW
483
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
484
    if (terrno != 0) code = terrno;
×
NEW
485
    TAOS_RETURN(code);
×
486
  }
487
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
236✔
488
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
236✔
489
  TAOS_RETURN(code);
236✔
490
}
491

492
static int32_t mndCreateXnode(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeReq *pCreate) {
236✔
493
  int32_t code = -1;
236✔
494
  STrans *pTrans = NULL;
236✔
495

496
  SXnodeObj xnodeObj = {0};
236✔
497
  xnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE);
236✔
498

499
  xnodeObj.urlLen = pCreate->urlLen;
236✔
500
  if (xnodeObj.urlLen > TSDB_XNODE_URL_LEN) {
236✔
NEW
501
    code = TSDB_CODE_MND_XNODE_TOO_LONG_URL;
×
NEW
502
    goto _OVER;
×
503
  }
504
  xnodeObj.url = taosMemoryCalloc(1, pCreate->urlLen);
236✔
505
  if (xnodeObj.url == NULL) goto _OVER;
236✔
506
  (void)memcpy(xnodeObj.url, pCreate->url, pCreate->urlLen);
236✔
507

508
  xnodeObj.createTime = taosGetTimestampMs();
236✔
509
  xnodeObj.updateTime = xnodeObj.createTime;
236✔
510
  mInfo("create xnode, xnode.id:%d, xnode.url: %s, xnode.time:%" PRId64, xnodeObj.id, xnodeObj.url,
236✔
511
        xnodeObj.createTime);
512

513
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-xnode");
236✔
514
  if (pTrans == NULL) {
236✔
NEW
515
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
516
    if (terrno != 0) code = terrno;
×
NEW
517
    mInfo("failed to create transaction for xnode:%s, code:0x%x:%s", pCreate->url, code, tstrerror(code));
×
NEW
518
    goto _OVER;
×
519
  }
520
  mndTransSetSerial(pTrans);
236✔
521

522
  mInfo("trans:%d, used to create xnode:%s as xnode:%d", pTrans->id, pCreate->url, xnodeObj.id);
236✔
523

524
  TAOS_CHECK_GOTO(mndSetCreateXnodeRedoLogs(pTrans, &xnodeObj), NULL, _OVER);
236✔
525
  TAOS_CHECK_GOTO(mndSetCreateXnodeUndoLogs(pTrans, &xnodeObj), NULL, _OVER);
236✔
526
  TAOS_CHECK_GOTO(mndSetCreateXnodeCommitLogs(pTrans, &xnodeObj), NULL, _OVER);
236✔
527
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
236✔
528

529
  code = 0;
236✔
530

531
_OVER:
236✔
532
  mndFreeXnode(&xnodeObj);
236✔
533
  mndTransDrop(pTrans);
236✔
534
  TAOS_RETURN(code);
236✔
535
}
536

537
static SXnodeObj *mndAcquireXnodeByURL(SMnode *pMnode, char *url) {
1,770✔
538
  SSdb *pSdb = pMnode->pSdb;
1,770✔
539

540
  void *pIter = NULL;
1,770✔
541
  while (1) {
354✔
542
    SXnodeObj *pXnode = NULL;
2,124✔
543
    pIter = sdbFetch(pSdb, SDB_XNODE, pIter, (void **)&pXnode);
2,124✔
544
    if (pIter == NULL) break;
2,124✔
545

546
    if (strcasecmp(url, pXnode->url) == 0) {
708✔
547
      sdbCancelFetch(pSdb, pIter);
354✔
548
      return pXnode;
354✔
549
    }
550

551
    sdbRelease(pSdb, pXnode);
354✔
552
  }
553

554
  mError("xnode:%s, not found", url);
1,416✔
555
  terrno = TSDB_CODE_MND_XNODE_NOT_EXIST;
1,416✔
556
  return NULL;
1,416✔
557
}
558

559
static int32_t mndStoreXnodeUserPass(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeReq *pCreate) {
118✔
560
  int32_t code = -1;
118✔
561
  STrans *pTrans = NULL;
118✔
562

563
  SXnodeUserPassObj upObj = {0};
118✔
564
  upObj.id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE_USER_PASS);
118✔
565

566
  upObj.userLen = pCreate->userLen;
118✔
567
  if (upObj.userLen > TSDB_USER_LEN) {
118✔
NEW
568
    code = TSDB_CODE_MND_USER_NOT_AVAILABLE;
×
NEW
569
    goto _OVER;
×
570
  }
571
  upObj.user = taosMemoryCalloc(1, pCreate->userLen);
118✔
572
  if (upObj.user == NULL) goto _OVER;
118✔
573
  (void)memcpy(upObj.user, pCreate->user, pCreate->userLen);
118✔
574

575
  upObj.passLen = pCreate->passLen;
118✔
576
  if (upObj.passLen > TSDB_USER_PASSWORD_LONGLEN) {
118✔
NEW
577
    code = TSDB_CODE_MND_INVALID_PASS_FORMAT;
×
NEW
578
    goto _OVER;
×
579
  }
580
  upObj.pass = taosMemoryCalloc(1, pCreate->passLen);
118✔
581
  if (upObj.pass == NULL) goto _OVER;
118✔
582
  (void)memcpy(upObj.pass, pCreate->pass, pCreate->passLen);
118✔
583

584
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-xnode");
118✔
585
  if (pTrans == NULL) {
118✔
NEW
586
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
587
    if (terrno != 0) code = terrno;
×
NEW
588
    mInfo("failed to create transaction for xnode:%s, code:0x%x:%s", pCreate->url, code, tstrerror(code));
×
NEW
589
    goto _OVER;
×
590
  }
591
  mndTransSetSerial(pTrans);
118✔
592

593
  mInfo("trans:%d, used to create xnode:%s as xnode:%d", pTrans->id, pCreate->url, upObj.id);
118✔
594

595
  TAOS_CHECK_GOTO(mndSetCreateXnodeUserPassRedoLogs(pTrans, &upObj), NULL, _OVER);
118✔
596
  TAOS_CHECK_GOTO(mndSetCreateXnodeUserPassCommitLogs(pTrans, &upObj), NULL, _OVER);
118✔
597
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
118✔
598

599
  code = 0;
118✔
600

601
_OVER:
118✔
602
  if (upObj.user != NULL) {
118✔
603
    taosMemoryFree(upObj.user);
118✔
604
  }
605
  if (upObj.pass != NULL) {
118✔
606
    taosMemoryFree(upObj.pass);
118✔
607
  }
608
  mndTransDrop(pTrans);
118✔
609
  TAOS_RETURN(code);
118✔
610
}
611

612
static int32_t httpCreateXnode(SXnodeObj *pObj) {
236✔
613
  int32_t code = 0;
236✔
614
  SJson  *pJson = NULL;
236✔
615
  SJson  *postContent = NULL;
236✔
616
  char   *pContStr = NULL;
236✔
617

618
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
236✔
619
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/xnode", XNODED_PIPE_SOCKET_URL);
236✔
620
  postContent = tjsonCreateObject();
236✔
621
  if (postContent == NULL) {
236✔
NEW
622
    code = terrno;
×
NEW
623
    goto _OVER;
×
624
  }
625
  TAOS_CHECK_GOTO(tjsonAddDoubleToObject(postContent, "id", (double)pObj->id), NULL, _OVER);
236✔
626
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "url", pObj->url), NULL, _OVER);
236✔
627
  pContStr = tjsonToUnformattedString(postContent);
236✔
628
  if (pContStr == NULL) {
236✔
NEW
629
    code = terrno;
×
NEW
630
    goto _OVER;
×
631
  }
632
  (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, defaultTimeout, pContStr, strlen(pContStr));
236✔
633

634
_OVER:
236✔
635
  if (postContent != NULL) {
236✔
636
    tjsonDelete(postContent);
236✔
637
  }
638
  if (pContStr != NULL) {
236✔
639
    taosMemFree(pContStr);
236✔
640
  }
641
  if (pJson != NULL) {
236✔
NEW
642
    tjsonDelete(pJson);
×
643
  }
644
  TAOS_RETURN(code);
236✔
645
}
646

647
static int32_t mndProcessCreateXnodeReq(SRpcMsg *pReq) {
236✔
648
  SMnode          *pMnode = pReq->info.node;
236✔
649
  int32_t          code = -1;
236✔
650
  SXnodeObj       *pObj = NULL;
236✔
651
  SMCreateXnodeReq createReq = {0};
236✔
652

653
  if ((code = grantCheck(TSDB_GRANT_XNODE)) != TSDB_CODE_SUCCESS) {
236✔
NEW
654
    mError("failed to create xnode, code:%s", tstrerror(code));
×
NEW
655
    goto _OVER;
×
656
  }
657

658
  TAOS_CHECK_GOTO(tDeserializeSMCreateXnodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
236✔
659

660
  mDebug("xnode:%s, start to create", createReq.url);
236✔
661
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_CREATE_XNODE), NULL, _OVER);
236✔
662

663
  pObj = mndAcquireXnodeByURL(pMnode, createReq.url);
236✔
664
  if (pObj != NULL) {
236✔
NEW
665
    code = TSDB_CODE_MND_XNODE_ALREADY_EXIST;
×
NEW
666
    goto _OVER;
×
667
  }
668

669
  int32_t numOfRows = sdbGetSize(pMnode->pSdb, SDB_XNODE_USER_PASS);
236✔
670
  if (numOfRows <= 0) {
236✔
671
    if (strlen(createReq.user) == 0 || strlen(createReq.pass) == 0) {
118✔
NEW
672
      code = TSDB_CODE_MND_XNODE_NEED_USER_PASS;
×
NEW
673
      goto _OVER;
×
674
    }
675
    TAOS_CHECK_GOTO(xnodeCheckPasswordFmt(createReq.pass), NULL, _OVER);
118✔
676
    // store user pass
677
    code = mndStoreXnodeUserPass(pMnode, pReq, &createReq);
118✔
678
    if (code != 0) goto _OVER;
118✔
679
    code = TSDB_CODE_ACTION_IN_PROGRESS;
118✔
680
    mndStartXnoded(pMnode, createReq.userLen, createReq.user, createReq.passLen, createReq.pass);
118✔
681
  }
682

683
  code = mndCreateXnode(pMnode, pReq, &createReq);
236✔
684
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
236✔
685

686
  taosMsleep(100);
236✔
687
  pObj = mndAcquireXnodeByURL(pMnode, createReq.url);
236✔
688
  if (pObj == NULL) {
236✔
NEW
689
    code = TSDB_CODE_MND_XNODE_NOT_EXIST;
×
NEW
690
    goto _OVER;
×
691
  }
692
  // send request
693
  (void)httpCreateXnode(pObj);
236✔
694

695
_OVER:
236✔
696
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
236✔
NEW
697
    mError("xnode:%s, failed to create since %s", createReq.url, tstrerror(code));
×
698
  }
699
  mndReleaseXnode(pMnode, pObj);
236✔
700
  tFreeSMCreateXnodeReq(&createReq);
236✔
701
  TAOS_RETURN(code);
236✔
702
}
703

NEW
704
static int32_t mndUpdateXnode(SMnode *pMnode, SXnodeObj *pXnode, SRpcMsg *pReq) {
×
NEW
705
  mInfo("xnode:%d, start to update", pXnode->id);
×
NEW
706
  int32_t   code = -1;
×
NEW
707
  STrans   *pTrans = NULL;
×
NEW
708
  SXnodeObj xnodeObj = {0};
×
NEW
709
  xnodeObj.id = pXnode->id;
×
NEW
710
  xnodeObj.updateTime = taosGetTimestampMs();
×
711

NEW
712
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-xnode");
×
NEW
713
  if (pTrans == NULL) {
×
NEW
714
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
715
    if (terrno != 0) code = terrno;
×
NEW
716
    goto _OVER;
×
717
  }
NEW
718
  mInfo("trans:%d, used to update xnode:%d", pTrans->id, xnodeObj.id);
×
719

NEW
720
  TAOS_CHECK_GOTO(mndSetCreateXnodeCommitLogs(pTrans, &xnodeObj), NULL, _OVER);
×
NEW
721
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
NEW
722
  code = 0;
×
723

NEW
724
_OVER:
×
NEW
725
  mndFreeXnode(&xnodeObj);
×
NEW
726
  mndTransDrop(pTrans);
×
NEW
727
  TAOS_RETURN(code);
×
728
}
729

NEW
730
static int32_t mndUpdateAllXnodes(SMnode *pMnode, SRpcMsg *pReq) {
×
NEW
731
  mInfo("update all xnodes");
×
NEW
732
  SSdb   *pSdb = pMnode->pSdb;
×
NEW
733
  int32_t code = 0;
×
NEW
734
  int32_t rows = 0;
×
NEW
735
  int32_t numOfRows = sdbGetSize(pSdb, SDB_XNODE);
×
736

NEW
737
  void *pIter = NULL;
×
NEW
738
  while (1) {
×
NEW
739
    SXnodeObj *pObj = NULL;
×
NEW
740
    ESdbStatus objStatus = 0;
×
NEW
741
    pIter = sdbFetchAll(pSdb, SDB_XNODE, pIter, (void **)&pObj, &objStatus, true);
×
NEW
742
    if (pIter == NULL) break;
×
743

NEW
744
    rows++;
×
NEW
745
    void *transReq = NULL;
×
NEW
746
    if (rows == numOfRows) transReq = pReq;
×
NEW
747
    code = mndUpdateXnode(pMnode, pObj, transReq);
×
NEW
748
    sdbRelease(pSdb, pObj);
×
749

NEW
750
    if (code != 0) break;
×
751
  }
752

NEW
753
  if (code == 0 && rows == numOfRows) {
×
NEW
754
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
755
  }
756

NEW
757
  TAOS_RETURN(code);
×
758
}
759

NEW
760
static int32_t mndProcessUpdateXnodeReq(SRpcMsg *pReq) {
×
NEW
761
  SMnode          *pMnode = pReq->info.node;
×
NEW
762
  int32_t          code = -1;
×
NEW
763
  SXnodeObj       *pObj = NULL;
×
NEW
764
  SMUpdateXnodeReq updateReq = {0};
×
765

NEW
766
  TAOS_CHECK_GOTO(tDeserializeSMUpdateXnodeReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER);
×
NEW
767
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_UPDATE_XNODE), NULL, _OVER);
×
768

NEW
769
  if (updateReq.xnodeId == -1) {
×
NEW
770
    code = mndUpdateAllXnodes(pMnode, pReq);
×
771
  } else {
NEW
772
    pObj = mndAcquireXnode(pMnode, updateReq.xnodeId);
×
NEW
773
    if (pObj == NULL) {
×
NEW
774
      code = TSDB_CODE_MND_XNODE_NOT_EXIST;
×
NEW
775
      goto _OVER;
×
776
    }
NEW
777
    code = mndUpdateXnode(pMnode, pObj, pReq);
×
NEW
778
    if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
779
  }
780

NEW
781
_OVER:
×
NEW
782
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
783
    if (updateReq.xnodeId != -1) {
×
NEW
784
      mError("xnode:%d, failed to update since %s", updateReq.xnodeId, tstrerror(code));
×
785
    }
786
  }
787

NEW
788
  mndReleaseXnode(pMnode, pObj);
×
NEW
789
  tFreeSMUpdateXnodeReq(&updateReq);
×
NEW
790
  TAOS_RETURN(code);
×
791
}
792

793
static int32_t mndSetDropXnodeRedoLogs(STrans *pTrans, SXnodeObj *pObj) {
118✔
794
  int32_t  code = 0;
118✔
795
  SSdbRaw *pRedoRaw = mndXnodeActionEncode(pObj);
118✔
796
  if (pRedoRaw == NULL) {
118✔
NEW
797
    code = terrno;
×
NEW
798
    TAOS_RETURN(code);
×
799
  }
800

801
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
118✔
802
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
118✔
803
  TAOS_RETURN(code);
118✔
804
}
805

806
static int32_t mndSetDropXnodeCommitLogs(STrans *pTrans, SXnodeObj *pObj) {
118✔
807
  int32_t  code = 0;
118✔
808
  SSdbRaw *pCommitRaw = mndXnodeActionEncode(pObj);
118✔
809
  if (pCommitRaw == NULL) {
118✔
NEW
810
    code = terrno;
×
NEW
811
    TAOS_RETURN(code);
×
812
  }
813

814
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
118✔
815
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
118✔
816
  TAOS_RETURN(code);
118✔
817
}
818

819
static int32_t mndSetDropXnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SXnodeObj *pObj, bool force) {
118✔
820
  if (pObj == NULL) return 0;
118✔
821
  TAOS_CHECK_RETURN(mndSetDropXnodeRedoLogs(pTrans, pObj));
118✔
822
  TAOS_CHECK_RETURN(mndSetDropXnodeCommitLogs(pTrans, pObj));
118✔
823
  return 0;
118✔
824
}
825

826
static int32_t mndDropXnode(SMnode *pMnode, SRpcMsg *pReq, SXnodeObj *pObj) {
118✔
827
  int32_t code = 0;
118✔
828
  int32_t lino = 0;
118✔
829

830
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-xnode");
118✔
831
  TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno);
118✔
832

833
  mndTransSetSerial(pTrans);
118✔
834
  mDebug("trans:%d, to drop xnode:%d", pTrans->id, pObj->id);
118✔
835

836
  code = mndSetDropXnodeInfoToTrans(pMnode, pTrans, pObj, false);
118✔
837
  TSDB_CHECK_CODE(code, lino, _OVER);
118✔
838

839
  code = mndTransPrepare(pMnode, pTrans);
118✔
840

841
_OVER:
118✔
842
  mndTransDrop(pTrans);
118✔
843
  TAOS_RETURN(code);
118✔
844
}
845

NEW
846
static int32_t mndDrainXnode(SMnode *pMnode, SRpcMsg *pReq, SXnodeObj *pObj) {
×
NEW
847
  int32_t code = 0;
×
NEW
848
  int32_t lino = 0;
×
849

NEW
850
  SXnodeObj xnodeObj = {0};
×
NEW
851
  xnodeObj.id = pObj->id;
×
NEW
852
  xnodeObj.status = "drain";
×
NEW
853
  xnodeObj.statusLen = strlen(xnodeObj.status) + 1;
×
NEW
854
  xnodeObj.updateTime = taosGetTimestampMs();
×
855

NEW
856
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drain-xnode");
×
NEW
857
  TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno);
×
858

NEW
859
  mndTransSetSerial(pTrans);
×
NEW
860
  mDebug("trans:%d, to drain xnode:%d", pTrans->id, xnodeObj.id);
×
861

NEW
862
  TAOS_CHECK_GOTO(mndSetCreateXnodeCommitLogs(pTrans, &xnodeObj), NULL, _OVER);
×
NEW
863
  code = mndTransPrepare(pMnode, pTrans);
×
864

NEW
865
_OVER:
×
NEW
866
  mndTransDrop(pTrans);
×
NEW
867
  TAOS_RETURN(code);
×
868
}
869

870
static int32_t mndProcessDropXnodeReq(SRpcMsg *pReq) {
3,658✔
871
  SMnode        *pMnode = pReq->info.node;
3,658✔
872
  int32_t        code = -1;
3,658✔
873
  SXnodeObj     *pObj = NULL;
3,658✔
874
  SMDropXnodeReq dropReq = {0};
3,658✔
875
  SJson         *pJson = NULL;
3,658✔
876

877
  TAOS_CHECK_GOTO(tDeserializeSMDropXnodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
3,658✔
878

879
  mDebug("xnode:%d, start to drop", dropReq.xnodeId);
3,658✔
880
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_DROP_XNODE), NULL, _OVER);
3,658✔
881

882
  if (dropReq.xnodeId <= 0 && (dropReq.url == NULL || strlen(dropReq.url) <= 0)) {
3,658✔
NEW
883
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
NEW
884
    goto _OVER;
×
885
  }
886

887
  if (dropReq.url != NULL && strlen(dropReq.url) > 0) {
3,658✔
888
    pObj = mndAcquireXnodeByURL(pMnode, dropReq.url);
1,298✔
889
    if (pObj == NULL) {
1,298✔
890
      code = terrno;
1,180✔
891
      goto _OVER;
1,180✔
892
    }
893
  } else {
894
    pObj = mndAcquireXnode(pMnode, dropReq.xnodeId);
2,360✔
895
    if (pObj == NULL) {
2,360✔
896
      code = terrno;
2,360✔
897
      goto _OVER;
2,360✔
898
    }
899
  }
900

901
  // send request
902
  char xnodeUrl[TSDB_XNODE_URL_LEN] = {0};
118✔
903
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/xnode/%d?force=%s", XNODED_PIPE_SOCKET_URL, pObj->id,
118✔
904
           dropReq.force ? "true" : "false");
118✔
905
  (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_DELETE, defaultTimeout, NULL, 0);
118✔
906

907
  code = mndDropXnode(pMnode, pReq, pObj);
118✔
908
  if (code == 0) {
118✔
909
    code = TSDB_CODE_ACTION_IN_PROGRESS;
118✔
910
  }
911

912
_OVER:
3,658✔
913
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
3,658✔
914
    mError("xnode:%d, failed to drop since %s", dropReq.xnodeId, tstrerror(code));
3,540✔
915
  }
916
  if (pJson != NULL) {
3,658✔
NEW
917
    tjsonDelete(pJson);
×
918
  }
919
  if (pObj != NULL) {
3,658✔
920
    mndReleaseXnode(pMnode, pObj);
118✔
921
  }
922
  tFreeSMDropXnodeReq(&dropReq);
3,658✔
923
  TAOS_RETURN(code);
3,658✔
924
}
925

926
static int32_t mndProcessDrainXnodeReq(SRpcMsg *pReq) {
1,180✔
927
  SMnode         *pMnode = pReq->info.node;
1,180✔
928
  int32_t         code = -1;
1,180✔
929
  SXnodeObj      *pObj = NULL;
1,180✔
930
  SMDrainXnodeReq drainReq = {0};
1,180✔
931
  SJson          *pJson = NULL;
1,180✔
932
  SJson          *postContent = NULL;
1,180✔
933
  char           *pContStr = NULL;
1,180✔
934

935
  TAOS_CHECK_GOTO(tDeserializeSMDrainXnodeReq(pReq->pCont, pReq->contLen, &drainReq), NULL, _OVER);
1,180✔
936

937
  mDebug("xnode:%d, start to drain", drainReq.xnodeId);
1,180✔
938
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_DRAIN_XNODE), NULL, _OVER);
1,180✔
939

940
  if (drainReq.xnodeId <= 0) {
1,180✔
NEW
941
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
NEW
942
    goto _OVER;
×
943
  }
944

945
  pObj = mndAcquireXnode(pMnode, drainReq.xnodeId);
1,180✔
946
  if (pObj == NULL) {
1,180✔
947
    code = TSDB_CODE_MND_XNODE_NOT_EXIST;
1,180✔
948
    goto _OVER;
1,180✔
949
  }
950

951
  // send request
NEW
952
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
×
NEW
953
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/xnode/drain/%d", XNODED_PIPE_SOCKET_URL, pObj->id);
×
NEW
954
  postContent = tjsonCreateObject();
×
NEW
955
  if (postContent == NULL) {
×
NEW
956
    code = terrno;
×
NEW
957
    goto _OVER;
×
958
  }
NEW
959
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "xnode", pObj->url), NULL, _OVER);
×
NEW
960
  pContStr = tjsonToString(postContent);
×
NEW
961
  if (pContStr == NULL) {
×
NEW
962
    code = terrno;
×
NEW
963
    goto _OVER;
×
964
  }
NEW
965
  (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, defaultTimeout, pContStr, strlen(pContStr));
×
966

NEW
967
  code = mndDrainXnode(pMnode, pReq, pObj);
×
NEW
968
  if (code == 0) {
×
NEW
969
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
NEW
970
    goto _OVER;
×
971
  }
972

973
_OVER:
1,180✔
974
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
1,180✔
975
    mError("xnode:%d, failed to drain since %s", drainReq.xnodeId, tstrerror(code));
1,180✔
976
  }
977

978
  if (postContent != NULL) {
1,180✔
NEW
979
    tjsonDelete(postContent);
×
980
  }
981
  if (pContStr != NULL) {
1,180✔
NEW
982
    taosMemoryFree(pContStr);
×
983
  }
984
  if (pJson != NULL) {
1,180✔
NEW
985
    tjsonDelete(pJson);
×
986
  }
987
  mndReleaseXnode(pMnode, pObj);
1,180✔
988
  tFreeSMDrainXnodeReq(&drainReq);
1,180✔
989
  TAOS_RETURN(code);
1,180✔
990
}
991

992
static int32_t mndRetrieveXnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
700✔
993
  SMnode    *pMnode = pReq->info.node;
700✔
994
  SSdb      *pSdb = pMnode->pSdb;
700✔
995
  int32_t    numOfRows = 0;
700✔
996
  int32_t    cols = 0;
700✔
997
  SXnodeObj *pObj = NULL;
700✔
998
  char       buf[TSDB_XNODE_URL_LEN + VARSTR_HEADER_SIZE] = {0};
700✔
999
  char       status[TSDB_XNODE_STATUS_LEN] = {0};
700✔
1000
  int32_t    code = 0;
700✔
1001
  mDebug("show.type:%d, %s:%d: retrieve xnodes with rows: %d", pShow->type, __FILE__, __LINE__, rows);
700✔
1002

1003
  while (numOfRows < rows) {
818✔
1004
    pShow->pIter = sdbFetch(pSdb, SDB_XNODE, pShow->pIter, (void **)&pObj);
818✔
1005
    if (pShow->pIter == NULL) break;
818✔
1006

1007
    cols = 0;
118✔
1008
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
118✔
1009
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
118✔
1010
    if (code != 0) goto _end;
118✔
1011

1012
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->url, pShow->pMeta->pSchemas[cols].bytes);
118✔
1013
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
118✔
1014
    code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
118✔
1015
    if (code != 0) goto _end;
118✔
1016

1017
    if (mndGetXnodeStatus(pObj, status, TSDB_XNODE_STATUS_LEN) == 0) {
118✔
NEW
1018
      STR_TO_VARSTR(buf, status);
×
1019
    } else {
1020
      mDebug("xnode:%d, status request err: %s", pObj->id, tstrerror(terrno));
118✔
1021
      STR_TO_VARSTR(buf, "offline");
118✔
1022
    }
1023
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
118✔
1024
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
118✔
1025
    if (code != 0) goto _end;
118✔
1026

1027
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
118✔
1028
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createTime, false);
118✔
1029
    if (code != 0) goto _end;
118✔
1030

1031
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
118✔
1032
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->updateTime, false);
118✔
1033
    if (code != 0) goto _end;
118✔
1034

1035
    numOfRows++;
118✔
1036
    sdbRelease(pSdb, pObj);
118✔
1037
  }
1038

1039
_end:
700✔
1040
  if (code != 0) sdbRelease(pSdb, pObj);
700✔
1041

1042
  pShow->numOfRows += numOfRows;
700✔
1043
  return numOfRows;
700✔
1044
}
1045

NEW
1046
static void mndCancelGetNextXnode(SMnode *pMnode, void *pIter) {
×
NEW
1047
  SSdb *pSdb = pMnode->pSdb;
×
NEW
1048
  sdbCancelFetchByType(pSdb, pIter, SDB_XNODE);
×
NEW
1049
}
×
1050

1051
/** xnode task section **/
1052

NEW
1053
static SXnodeTaskObj *mndAcquireXnodeTaskById(SMnode *pMnode, int32_t tid) {
×
NEW
1054
  SSdb *pSdb = pMnode->pSdb;
×
1055

NEW
1056
  void *pIter = NULL;
×
NEW
1057
  while (1) {
×
NEW
1058
    SXnodeTaskObj *pTask = NULL;
×
NEW
1059
    pIter = sdbFetch(pSdb, SDB_XNODE_TASK, pIter, (void **)&pTask);
×
NEW
1060
    if (pIter == NULL) break;
×
1061

NEW
1062
    if (pTask->id == tid) {
×
NEW
1063
      sdbCancelFetch(pSdb, pIter);
×
NEW
1064
      return pTask;
×
1065
    }
1066

NEW
1067
    sdbRelease(pSdb, pTask);
×
1068
  }
1069

NEW
1070
  mError("xnode task:%d, not found", tid);
×
NEW
1071
  terrno = TSDB_CODE_MND_XNODE_TASK_NOT_EXIST;
×
NEW
1072
  return NULL;
×
1073
}
NEW
1074
static SXnodeTaskObj *mndAcquireXnodeTaskByName(SMnode *pMnode, const char *name) {
×
NEW
1075
  SSdb *pSdb = pMnode->pSdb;
×
1076

NEW
1077
  void *pIter = NULL;
×
NEW
1078
  while (1) {
×
NEW
1079
    SXnodeTaskObj *pTask = NULL;
×
NEW
1080
    pIter = sdbFetch(pSdb, SDB_XNODE_TASK, pIter, (void **)&pTask);
×
NEW
1081
    if (pIter == NULL) break;
×
NEW
1082
    if (pTask->name == NULL) {
×
NEW
1083
      continue;
×
1084
    }
1085

NEW
1086
    if (strcasecmp(name, pTask->name) == 0) {
×
NEW
1087
      sdbCancelFetch(pSdb, pIter);
×
NEW
1088
      return pTask;
×
1089
    }
1090

NEW
1091
    sdbRelease(pSdb, pTask);
×
1092
  }
1093

NEW
1094
  mError("xnode task:%s, not found", name);
×
NEW
1095
  terrno = TSDB_CODE_MND_XNODE_TASK_NOT_EXIST;
×
NEW
1096
  return NULL;
×
1097
}
1098

NEW
1099
static void mndFreeXnodeTask(SXnodeTaskObj *pObj) {
×
NEW
1100
  taosMemoryFreeClear(pObj->name);
×
NEW
1101
  taosMemoryFreeClear(pObj->sourceDsn);
×
NEW
1102
  taosMemoryFreeClear(pObj->sinkDsn);
×
NEW
1103
  taosMemoryFreeClear(pObj->parser);
×
NEW
1104
  taosMemoryFreeClear(pObj->reason);
×
NEW
1105
  taosMemoryFreeClear(pObj->status);
×
NEW
1106
}
×
1107

NEW
1108
SSdbRaw *mndXnodeTaskActionEncode(SXnodeTaskObj *pObj) {
×
NEW
1109
  int32_t code = 0;
×
NEW
1110
  int32_t lino = 0;
×
NEW
1111
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
1112
  if (NULL == pObj) {
×
NEW
1113
    terrno = TSDB_CODE_INVALID_PARA;
×
NEW
1114
    return NULL;
×
1115
  }
1116

NEW
1117
  int32_t totalStrLen =
×
NEW
1118
      pObj->nameLen + pObj->sourceDsnLen + pObj->sinkDsnLen + pObj->parserLen + pObj->reasonLen + pObj->statusLen;
×
NEW
1119
  int32_t rawDataLen = sizeof(SXnodeTaskObj) + TSDB_XNODE_RESERVE_SIZE + totalStrLen;
×
1120

NEW
1121
  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE_TASK, TSDB_XNODE_VER_NUMBER, rawDataLen);
×
NEW
1122
  if (pRaw == NULL) goto _OVER;
×
1123

NEW
1124
  int32_t dataPos = 0;
×
NEW
1125
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
×
NEW
1126
  SDB_SET_INT64(pRaw, dataPos, pObj->createTime, _OVER)
×
NEW
1127
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
×
NEW
1128
  SDB_SET_INT32(pRaw, dataPos, pObj->statusLen, _OVER)
×
NEW
1129
  SDB_SET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
×
NEW
1130
  SDB_SET_INT32(pRaw, dataPos, pObj->via, _OVER)
×
NEW
1131
  SDB_SET_INT32(pRaw, dataPos, pObj->xnodeId, _OVER)
×
NEW
1132
  SDB_SET_INT32(pRaw, dataPos, pObj->nameLen, _OVER)
×
NEW
1133
  SDB_SET_BINARY(pRaw, dataPos, pObj->name, pObj->nameLen, _OVER)
×
NEW
1134
  SDB_SET_INT32(pRaw, dataPos, pObj->sourceType, _OVER)
×
NEW
1135
  SDB_SET_INT32(pRaw, dataPos, pObj->sourceDsnLen, _OVER)
×
NEW
1136
  SDB_SET_BINARY(pRaw, dataPos, pObj->sourceDsn, pObj->sourceDsnLen, _OVER)
×
NEW
1137
  SDB_SET_INT32(pRaw, dataPos, pObj->sinkType, _OVER)
×
NEW
1138
  SDB_SET_INT32(pRaw, dataPos, pObj->sinkDsnLen, _OVER)
×
NEW
1139
  SDB_SET_BINARY(pRaw, dataPos, pObj->sinkDsn, pObj->sinkDsnLen, _OVER)
×
NEW
1140
  SDB_SET_INT32(pRaw, dataPos, pObj->parserLen, _OVER)
×
NEW
1141
  SDB_SET_BINARY(pRaw, dataPos, pObj->parser, pObj->parserLen, _OVER)
×
NEW
1142
  SDB_SET_INT32(pRaw, dataPos, pObj->reasonLen, _OVER)
×
NEW
1143
  SDB_SET_BINARY(pRaw, dataPos, pObj->reason, pObj->reasonLen, _OVER)
×
1144

NEW
1145
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)
×
1146

NEW
1147
  terrno = 0;
×
1148

NEW
1149
_OVER:
×
NEW
1150
  if (terrno != 0) {
×
NEW
1151
    mError("xnode task:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
×
NEW
1152
    sdbFreeRaw(pRaw);
×
NEW
1153
    return NULL;
×
1154
  }
1155

NEW
1156
  mTrace("xnode task:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
×
NEW
1157
  return pRaw;
×
1158
}
1159

NEW
1160
SSdbRow *mndXnodeTaskActionDecode(SSdbRaw *pRaw) {
×
NEW
1161
  int32_t code = 0;
×
NEW
1162
  int32_t lino = 0;
×
NEW
1163
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
1164
  SSdbRow       *pRow = NULL;
×
NEW
1165
  SXnodeTaskObj *pObj = NULL;
×
1166

NEW
1167
  if (NULL == pRaw) {
×
NEW
1168
    terrno = TSDB_CODE_INVALID_PARA;
×
NEW
1169
    return NULL;
×
1170
  }
1171

NEW
1172
  int8_t sver = 0;
×
NEW
1173
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
×
1174

NEW
1175
  if (sver != TSDB_XNODE_VER_NUMBER) {
×
NEW
1176
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
NEW
1177
    goto _OVER;
×
1178
  }
1179

NEW
1180
  pRow = sdbAllocRow(sizeof(SXnodeTaskObj));
×
NEW
1181
  if (pRow == NULL) goto _OVER;
×
1182

NEW
1183
  pObj = sdbGetRowObj(pRow);
×
NEW
1184
  if (pObj == NULL) goto _OVER;
×
1185

NEW
1186
  int32_t dataPos = 0;
×
NEW
1187
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
×
NEW
1188
  SDB_GET_INT64(pRaw, dataPos, &pObj->createTime, _OVER)
×
NEW
1189
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
×
NEW
1190
  SDB_GET_INT32(pRaw, dataPos, &pObj->statusLen, _OVER)
×
NEW
1191
  if (pObj->statusLen > 0) {
×
NEW
1192
    pObj->status = taosMemoryCalloc(pObj->statusLen + 1, 1);
×
NEW
1193
    if (pObj->status == NULL) goto _OVER;
×
NEW
1194
    SDB_GET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
×
1195
  }
1196

NEW
1197
  SDB_GET_INT32(pRaw, dataPos, &pObj->via, _OVER)
×
NEW
1198
  SDB_GET_INT32(pRaw, dataPos, &pObj->xnodeId, _OVER)
×
1199

NEW
1200
  SDB_GET_INT32(pRaw, dataPos, &pObj->nameLen, _OVER)
×
NEW
1201
  if (pObj->nameLen > 0) {
×
NEW
1202
    pObj->name = taosMemoryCalloc(pObj->nameLen + 1, 1);
×
NEW
1203
    if (pObj->name == NULL) goto _OVER;
×
NEW
1204
    SDB_GET_BINARY(pRaw, dataPos, pObj->name, pObj->nameLen, _OVER)
×
1205
  }
1206

NEW
1207
  SDB_GET_INT32(pRaw, dataPos, &pObj->sourceType, _OVER)
×
NEW
1208
  SDB_GET_INT32(pRaw, dataPos, &pObj->sourceDsnLen, _OVER)
×
NEW
1209
  if (pObj->sourceDsnLen > 0) {
×
NEW
1210
    pObj->sourceDsn = taosMemoryCalloc(pObj->sourceDsnLen + 1, 1);
×
NEW
1211
    if (pObj->sourceDsn == NULL) goto _OVER;
×
NEW
1212
    SDB_GET_BINARY(pRaw, dataPos, pObj->sourceDsn, pObj->sourceDsnLen, _OVER)
×
1213
  }
1214

NEW
1215
  SDB_GET_INT32(pRaw, dataPos, &pObj->sinkType, _OVER)
×
NEW
1216
  SDB_GET_INT32(pRaw, dataPos, &pObj->sinkDsnLen, _OVER)
×
NEW
1217
  if (pObj->sinkDsnLen > 0) {
×
NEW
1218
    pObj->sinkDsn = taosMemoryCalloc(pObj->sinkDsnLen + 1, 1);
×
NEW
1219
    if (pObj->sinkDsn == NULL) goto _OVER;
×
NEW
1220
    SDB_GET_BINARY(pRaw, dataPos, pObj->sinkDsn, pObj->sinkDsnLen, _OVER)
×
1221
  }
1222

NEW
1223
  SDB_GET_INT32(pRaw, dataPos, &pObj->parserLen, _OVER)
×
NEW
1224
  if (pObj->parserLen > 0) {
×
NEW
1225
    pObj->parser = taosMemoryCalloc(pObj->parserLen + 1, 1);
×
NEW
1226
    if (pObj->parser == NULL) goto _OVER;
×
NEW
1227
    SDB_GET_BINARY(pRaw, dataPos, pObj->parser, pObj->parserLen, _OVER)
×
1228
  }
1229

NEW
1230
  SDB_GET_INT32(pRaw, dataPos, &pObj->reasonLen, _OVER)
×
NEW
1231
  if (pObj->reasonLen > 0) {
×
NEW
1232
    pObj->reason = taosMemoryCalloc(pObj->reasonLen + 1, 1);
×
NEW
1233
    if (pObj->reason == NULL) goto _OVER;
×
NEW
1234
    SDB_GET_BINARY(pRaw, dataPos, pObj->reason, pObj->reasonLen, _OVER)
×
1235
  }
1236

NEW
1237
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)
×
1238

NEW
1239
  terrno = 0;
×
1240

NEW
1241
_OVER:
×
NEW
1242
  if (terrno != 0) {
×
NEW
1243
    mError("xnode task:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
×
NEW
1244
    if (pObj != NULL) {
×
NEW
1245
      taosMemoryFreeClear(pObj->name);
×
NEW
1246
      taosMemoryFreeClear(pObj->sourceDsn);
×
NEW
1247
      taosMemoryFreeClear(pObj->sinkDsn);
×
NEW
1248
      taosMemoryFreeClear(pObj->parser);
×
NEW
1249
      taosMemoryFreeClear(pObj->reason);
×
NEW
1250
      taosMemoryFreeClear(pObj->status);
×
1251
    }
NEW
1252
    taosMemoryFreeClear(pRow);
×
NEW
1253
    return NULL;
×
1254
  }
1255

NEW
1256
  mTrace("xnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
×
NEW
1257
  return pRow;
×
1258
}
1259

NEW
1260
int32_t mndXnodeTaskActionInsert(SSdb *pSdb, SXnodeTaskObj *pObj) {
×
NEW
1261
  mDebug("xtask:%d, perform insert action, row:%p", pObj->id, pObj);
×
NEW
1262
  return 0;
×
1263
}
1264

NEW
1265
int32_t mndXnodeTaskActionDelete(SSdb *pSdb, SXnodeTaskObj *pObj) {
×
NEW
1266
  mDebug("xtask:%d, perform delete action, row:%p", pObj->id, pObj);
×
NEW
1267
  mndFreeXnodeTask(pObj);
×
NEW
1268
  return 0;
×
1269
}
1270

NEW
1271
int32_t mndXnodeTaskActionUpdate(SSdb *pSdb, SXnodeTaskObj *pOld, SXnodeTaskObj *pNew) {
×
NEW
1272
  mDebug("xtask:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
×
1273

NEW
1274
  taosWLockLatch(&pOld->lock);
×
NEW
1275
  pOld->via = pNew->via;
×
NEW
1276
  pOld->xnodeId = pNew->xnodeId;
×
NEW
1277
  swapFields(&pNew->statusLen, &pNew->status, &pOld->statusLen, &pOld->status);
×
NEW
1278
  swapFields(&pNew->nameLen, &pNew->name, &pOld->nameLen, &pOld->name);
×
NEW
1279
  swapFields(&pNew->sourceDsnLen, &pNew->sourceDsn, &pOld->sourceDsnLen, &pOld->sourceDsn);
×
NEW
1280
  swapFields(&pNew->sinkDsnLen, &pNew->sinkDsn, &pOld->sinkDsnLen, &pOld->sinkDsn);
×
NEW
1281
  swapFields(&pNew->parserLen, &pNew->parser, &pOld->parserLen, &pOld->parser);
×
NEW
1282
  swapFields(&pNew->reasonLen, &pNew->reason, &pOld->reasonLen, &pOld->reason);
×
NEW
1283
  if (pNew->updateTime > pOld->updateTime) {
×
NEW
1284
    pOld->updateTime = pNew->updateTime;
×
1285
  }
NEW
1286
  taosWUnLockLatch(&pOld->lock);
×
NEW
1287
  return 0;
×
1288
}
1289

NEW
1290
static int32_t mndSetCreateXnodeTaskRedoLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
×
NEW
1291
  int32_t  code = 0;
×
NEW
1292
  SSdbRaw *pRedoRaw = mndXnodeTaskActionEncode(pObj);
×
NEW
1293
  if (pRedoRaw == NULL) {
×
NEW
1294
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
1295
    if (terrno != 0) code = terrno;
×
NEW
1296
    TAOS_RETURN(code);
×
1297
  }
NEW
1298
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
NEW
1299
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
NEW
1300
  TAOS_RETURN(code);
×
1301
}
1302

NEW
1303
static int32_t mndSetCreateXnodeTaskUndoLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
×
NEW
1304
  int32_t  code = 0;
×
NEW
1305
  SSdbRaw *pUndoRaw = mndXnodeTaskActionEncode(pObj);
×
NEW
1306
  if (pUndoRaw == NULL) {
×
NEW
1307
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
1308
    if (terrno != 0) code = terrno;
×
NEW
1309
    TAOS_RETURN(code);
×
1310
  }
NEW
1311
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
×
NEW
1312
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
×
NEW
1313
  TAOS_RETURN(code);
×
1314
}
1315

NEW
1316
static int32_t mndSetCreateXnodeTaskCommitLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
×
NEW
1317
  int32_t  code = 0;
×
NEW
1318
  SSdbRaw *pCommitRaw = mndXnodeTaskActionEncode(pObj);
×
NEW
1319
  if (pCommitRaw == NULL) {
×
NEW
1320
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
1321
    if (terrno != 0) code = terrno;
×
NEW
1322
    TAOS_RETURN(code);
×
1323
  }
NEW
1324
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
NEW
1325
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
NEW
1326
  TAOS_RETURN(code);
×
1327
}
NEW
1328
void mndReleaseXnodeTask(SMnode *pMnode, SXnodeTaskObj *pObj) {
×
NEW
1329
  SSdb *pSdb = pMnode->pSdb;
×
NEW
1330
  sdbRelease(pSdb, pObj);
×
NEW
1331
}
×
1332

NEW
1333
static const char *getXTaskOptionByName(xTaskOptions *pOptions, const char *name) {
×
NEW
1334
  if (pOptions == NULL || name == NULL) return NULL;
×
NEW
1335
  for (int32_t i = 0; i < pOptions->optionsNum; ++i) {
×
NEW
1336
    CowStr option = pOptions->options[i];
×
NEW
1337
    if (option.ptr != NULL && strncasecmp(option.ptr, name, strlen(name)) == 0 && option.ptr[strlen(name)] == '=') {
×
NEW
1338
      return option.ptr + strlen(name) + 1;
×
1339
    }
1340
  }
NEW
1341
  return NULL;
×
1342
}
1343

NEW
1344
static int32_t mndCreateXnodeTask(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeTaskReq *pCreate) {
×
NEW
1345
  int32_t code = -1;
×
NEW
1346
  STrans *pTrans = NULL;
×
1347

NEW
1348
  SXnodeTaskObj xnodeObj = {0};
×
NEW
1349
  xnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE_TASK);
×
NEW
1350
  xnodeObj.createTime = taosGetTimestampMs();
×
NEW
1351
  xnodeObj.updateTime = xnodeObj.createTime;
×
NEW
1352
  xnodeObj.via = pCreate->options.via;
×
NEW
1353
  xnodeObj.xnodeId = pCreate->xnodeId;
×
1354

NEW
1355
  xnodeObj.nameLen = pCreate->name.len;
×
NEW
1356
  xnodeObj.name = taosMemoryCalloc(1, pCreate->name.len);
×
NEW
1357
  if (xnodeObj.name == NULL) goto _OVER;
×
NEW
1358
  (void)memcpy(xnodeObj.name, pCreate->name.ptr, pCreate->name.len);
×
1359

NEW
1360
  xnodeObj.sourceType = pCreate->source.type;
×
NEW
1361
  xnodeObj.sourceDsnLen = pCreate->source.cstr.len;
×
NEW
1362
  xnodeObj.sourceDsn = taosMemoryCalloc(1, pCreate->source.cstr.len);
×
NEW
1363
  if (xnodeObj.sourceDsn == NULL) goto _OVER;
×
NEW
1364
  (void)memcpy(xnodeObj.sourceDsn, pCreate->source.cstr.ptr, pCreate->source.cstr.len);
×
1365

NEW
1366
  xnodeObj.sinkType = pCreate->sink.type;
×
NEW
1367
  xnodeObj.sinkDsnLen = pCreate->sink.cstr.len;
×
NEW
1368
  xnodeObj.sinkDsn = taosMemoryCalloc(1, pCreate->sink.cstr.len);
×
NEW
1369
  if (xnodeObj.sinkDsn == NULL) goto _OVER;
×
NEW
1370
  (void)memcpy(xnodeObj.sinkDsn, pCreate->sink.cstr.ptr, pCreate->sink.cstr.len);
×
1371

NEW
1372
  xnodeObj.parserLen = pCreate->options.parser.len;
×
NEW
1373
  if (xnodeObj.parserLen > 0 && pCreate->options.parser.ptr != NULL) {
×
NEW
1374
    xnodeObj.parser = taosMemoryCalloc(1, xnodeObj.parserLen);
×
NEW
1375
    if (xnodeObj.parser == NULL) goto _OVER;
×
NEW
1376
    (void)memcpy(xnodeObj.parser, pCreate->options.parser.ptr, xnodeObj.parserLen);
×
1377
  }
1378

NEW
1379
  const char *status = getXTaskOptionByName(&pCreate->options, "status");
×
NEW
1380
  if (status != NULL) {
×
NEW
1381
    xnodeObj.statusLen = strlen(status) + 1;
×
NEW
1382
    xnodeObj.status = taosMemoryCalloc(1, xnodeObj.statusLen);
×
NEW
1383
    if (xnodeObj.status == NULL) goto _OVER;
×
NEW
1384
    (void)memcpy(xnodeObj.status, status, xnodeObj.statusLen);
×
1385
  }
1386

NEW
1387
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-xnode-task");
×
NEW
1388
  if (pTrans == NULL) {
×
NEW
1389
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
1390
    if (terrno != 0) {
×
NEW
1391
      code = terrno;
×
1392
    }
NEW
1393
    mError("failed to create transaction for xnode-task:%s, code:0x%x:%s", pCreate->name.ptr, code, tstrerror(code));
×
NEW
1394
    goto _OVER;
×
1395
  }
NEW
1396
  mndTransSetSerial(pTrans);
×
1397

NEW
1398
  mDebug("trans:%d, used to create xnode task:%s as task:%d", pTrans->id, pCreate->name.ptr, xnodeObj.id);
×
1399

NEW
1400
  TAOS_CHECK_GOTO(mndSetCreateXnodeTaskRedoLogs(pTrans, &xnodeObj), NULL, _OVER);
×
NEW
1401
  TAOS_CHECK_GOTO(mndSetCreateXnodeTaskUndoLogs(pTrans, &xnodeObj), NULL, _OVER);
×
NEW
1402
  TAOS_CHECK_GOTO(mndSetCreateXnodeTaskCommitLogs(pTrans, &xnodeObj), NULL, _OVER);
×
NEW
1403
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
1404

NEW
1405
  code = 0;
×
1406

NEW
1407
_OVER:
×
NEW
1408
  mndFreeXnodeTask(&xnodeObj);
×
NEW
1409
  mndTransDrop(pTrans);
×
NEW
1410
  TAOS_RETURN(code);
×
1411
}
1412

1413
// Helper function to validate grant and permissions
NEW
1414
static int32_t mndValidateXnodeTaskPermissions(SMnode *pMnode, SRpcMsg *pReq) {
×
NEW
1415
  int32_t code = grantCheck(TSDB_GRANT_XNODE);
×
NEW
1416
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1417
    mError("failed to create xnode, code:%s", tstrerror(code));
×
NEW
1418
    return code;
×
1419
  }
1420

NEW
1421
  return mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_CREATE_XNODE);
×
1422
}
1423

1424
// Helper function to parse and validate the request
NEW
1425
static int32_t mndValidateCreateXnodeTaskReq(SRpcMsg *pReq, SMCreateXnodeTaskReq *pCreateReq) {
×
NEW
1426
  int32_t code = 0;
×
NEW
1427
  SJson  *pJson = NULL;
×
NEW
1428
  SJson  *postContent = NULL;
×
NEW
1429
  char   *srcDsn = NULL;
×
NEW
1430
  char   *sinkDsn = NULL;
×
NEW
1431
  char   *parser = NULL;
×
NEW
1432
  char   *pContStr = NULL;
×
1433

1434
  // from, to, parser check
NEW
1435
  char xnodeUrl[TSDB_XNODE_URL_LEN] = {0};
×
NEW
1436
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/task/check", XNODED_PIPE_SOCKET_URL);
×
NEW
1437
  postContent = tjsonCreateObject();
×
NEW
1438
  if (postContent == NULL) {
×
NEW
1439
    code = terrno;
×
NEW
1440
    goto _OVER;
×
1441
  }
NEW
1442
  srcDsn = taosStrndupi(pCreateReq->source.cstr.ptr, (int64_t)pCreateReq->source.cstr.len);
×
NEW
1443
  if (srcDsn == NULL) {
×
NEW
1444
    code = terrno;
×
NEW
1445
    goto _OVER;
×
1446
  }
NEW
1447
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "from", srcDsn), NULL, _OVER);
×
1448

NEW
1449
  sinkDsn = taosStrndupi(pCreateReq->sink.cstr.ptr, (int64_t)pCreateReq->sink.cstr.len);
×
NEW
1450
  if (sinkDsn == NULL) {
×
NEW
1451
    code = terrno;
×
NEW
1452
    goto _OVER;
×
1453
  }
NEW
1454
  TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "to", sinkDsn), NULL, _OVER);
×
1455

NEW
1456
  if (pCreateReq->options.parser.len > 0 && pCreateReq->options.parser.ptr != NULL) {
×
NEW
1457
    parser = taosStrndupi(pCreateReq->options.parser.ptr, (int64_t)pCreateReq->options.parser.len);
×
NEW
1458
    if (parser == NULL) {
×
NEW
1459
      code = terrno;
×
NEW
1460
      goto _OVER;
×
1461
    }
NEW
1462
    TAOS_CHECK_GOTO(tjsonAddStringToObject(postContent, "parser", parser), NULL, _OVER);
×
1463
  }
1464

NEW
1465
  if (pCreateReq->xnodeId > 0) {
×
NEW
1466
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(postContent, "xnodeId", (double)pCreateReq->xnodeId), NULL, _OVER);
×
1467
  }
1468

NEW
1469
  pContStr = tjsonToUnformattedString(postContent);
×
NEW
1470
  if (pContStr == NULL) {
×
NEW
1471
    code = terrno;
×
NEW
1472
    goto _OVER;
×
1473
  }
1474

NEW
1475
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, 60000, pContStr, strlen(pContStr));
×
NEW
1476
  if (pJson == NULL) {
×
NEW
1477
    code = terrno;
×
NEW
1478
    goto _OVER;
×
1479
  }
1480

NEW
1481
_OVER:
×
NEW
1482
  if (srcDsn != NULL) taosMemoryFreeClear(srcDsn);
×
NEW
1483
  if (sinkDsn != NULL) taosMemoryFreeClear(sinkDsn);
×
NEW
1484
  if (parser != NULL) taosMemoryFreeClear(parser);
×
NEW
1485
  if (pContStr != NULL) taosMemoryFreeClear(pContStr);
×
NEW
1486
  if (postContent != NULL) tjsonDelete(postContent);
×
NEW
1487
  if (pJson != NULL) tjsonDelete(pJson);
×
1488

NEW
1489
  TAOS_RETURN(code);
×
1490
}
1491

1492
// Helper function to check if xnode task already exists
NEW
1493
static int32_t mndCheckXnodeTaskExists(SMnode *pMnode, const char *name) {
×
NEW
1494
  SXnodeTaskObj *pObj = mndAcquireXnodeTaskByName(pMnode, name);
×
NEW
1495
  if (pObj != NULL) {
×
NEW
1496
    mError("xnode task:%s already exists", name);
×
NEW
1497
    return TSDB_CODE_MND_XNODE_TASK_ALREADY_EXIST;
×
1498
  }
NEW
1499
  return TSDB_CODE_SUCCESS;
×
1500
}
1501

1502
// Helper function to handle the creation result
NEW
1503
static int32_t mndHandleCreateXnodeTaskResult(int32_t createCode) {
×
NEW
1504
  if (createCode == 0) {
×
NEW
1505
    return TSDB_CODE_ACTION_IN_PROGRESS;
×
1506
  }
NEW
1507
  return createCode;
×
1508
}
1509

NEW
1510
static int32_t mndProcessCreateXnodeTaskReq(SRpcMsg *pReq) {
×
NEW
1511
  mDebug("xnode create task request received, contLen:%d\n", pReq->contLen);
×
NEW
1512
  SMnode              *pMnode = pReq->info.node;
×
NEW
1513
  int32_t              code = -1;
×
NEW
1514
  SXnodeTaskObj       *pObj = NULL;
×
NEW
1515
  SMCreateXnodeTaskReq createReq = {0};
×
1516

1517
  // Step 1: Validate permissions
NEW
1518
  code = mndValidateXnodeTaskPermissions(pMnode, pReq);
×
NEW
1519
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
1520
    goto _OVER;
×
1521
  }
1522

NEW
1523
  code = tDeserializeSMCreateXnodeTaskReq(pReq->pCont, pReq->contLen, &createReq);
×
NEW
1524
  if (code != 0) {
×
NEW
1525
    mError("failed to deserialize create xnode task request, code:%s", tstrerror(code));
×
NEW
1526
    TAOS_RETURN(code);
×
1527
  }
1528

1529
  // Step 2: Check if task already exists
NEW
1530
  TAOS_CHECK_GOTO(mndCheckXnodeTaskExists(pMnode, createReq.name.ptr), NULL, _OVER);
×
1531

1532
  // Step 3: Parse and validate request
NEW
1533
  TAOS_CHECK_GOTO(mndValidateCreateXnodeTaskReq(pReq, &createReq), NULL, _OVER);
×
1534

1535
  // Step 4: Create the xnode task
NEW
1536
  TAOS_CHECK_GOTO(mndCreateXnodeTask(pMnode, pReq, &createReq), NULL, _OVER);
×
NEW
1537
  TAOS_CHECK_GOTO(mndHandleCreateXnodeTaskResult(code), NULL, _OVER);
×
1538

NEW
1539
_OVER:
×
NEW
1540
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
1541
    mError("xnode task:%s, failed to create since %s", createReq.name.ptr ? createReq.name.ptr : "unknown",
×
1542
           tstrerror(code));
1543
  }
1544

NEW
1545
  mndReleaseXnodeTask(pMnode, pObj);
×
NEW
1546
  tFreeSMCreateXnodeTaskReq(&createReq);
×
NEW
1547
  TAOS_RETURN(code);
×
1548
}
1549

NEW
1550
static int32_t httpStartXnodeTask(SXnodeTaskObj *pObj) {
×
NEW
1551
  int32_t code = 0;
×
1552
  struct {
1553
    char   xnodeUrl[TSDB_XNODE_URL_LEN + 1];
1554
    SJson *postContent;
1555
    SJson *pJson;
1556
    char  *pContStr;
1557
    char  *srcDsn;
1558
    char  *sinkDsn;
1559
    char  *parser;
NEW
1560
  } req = {0};
×
1561

NEW
1562
  snprintf(req.xnodeUrl, TSDB_XNODE_URL_LEN, "%s/task/%d", XNODED_PIPE_SOCKET_URL, pObj->id);
×
NEW
1563
  req.postContent = tjsonCreateObject();
×
NEW
1564
  if (req.postContent == NULL) {
×
NEW
1565
    code = terrno;
×
NEW
1566
    goto _OVER;
×
1567
  }
NEW
1568
  req.srcDsn = taosStrndupi(pObj->sourceDsn, (int64_t)pObj->sourceDsnLen);
×
NEW
1569
  if (req.srcDsn == NULL) {
×
NEW
1570
    code = terrno;
×
NEW
1571
    goto _OVER;
×
1572
  }
NEW
1573
  TAOS_CHECK_GOTO(tjsonAddStringToObject(req.postContent, "from", req.srcDsn), NULL, _OVER);
×
1574

NEW
1575
  req.sinkDsn = taosStrndupi(pObj->sinkDsn, (int64_t)pObj->sinkDsnLen);
×
NEW
1576
  if (req.sinkDsn == NULL) {
×
NEW
1577
    code = terrno;
×
NEW
1578
    goto _OVER;
×
1579
  }
NEW
1580
  TAOS_CHECK_GOTO(tjsonAddStringToObject(req.postContent, "to", req.sinkDsn), NULL, _OVER);
×
1581

NEW
1582
  if (pObj->parserLen > 0) {
×
NEW
1583
    req.parser = taosStrndupi(pObj->parser, (int64_t)pObj->parserLen);
×
NEW
1584
    if (req.parser == NULL) {
×
NEW
1585
      code = terrno;
×
NEW
1586
      goto _OVER;
×
1587
    }
NEW
1588
    TAOS_CHECK_GOTO(tjsonAddStringToObject(req.postContent, "parser", req.parser), NULL, _OVER);
×
1589
  }
1590

NEW
1591
  if (pObj->xnodeId > 0) {
×
NEW
1592
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(req.postContent, "xnodeId", (double)pObj->xnodeId), NULL, _OVER);
×
1593
  }
1594

NEW
1595
  req.pContStr = tjsonToUnformattedString(req.postContent);
×
NEW
1596
  if (req.pContStr == NULL) {
×
NEW
1597
    code = terrno;
×
NEW
1598
    goto _OVER;
×
1599
  }
NEW
1600
  mDebug("start xnode post content:%s", req.pContStr);
×
NEW
1601
  (void)mndSendReqRetJson(req.xnodeUrl, HTTP_TYPE_POST, defaultTimeout, req.pContStr, strlen(req.pContStr));
×
1602

NEW
1603
_OVER:
×
NEW
1604
  if (req.pContStr != NULL) taosMemoryFreeClear(req.pContStr);
×
NEW
1605
  if (req.postContent != NULL) tjsonDelete(req.postContent);
×
NEW
1606
  if (req.pJson != NULL) tjsonDelete(req.pJson);
×
NEW
1607
  if (req.srcDsn != NULL) taosMemoryFreeClear(req.srcDsn);
×
NEW
1608
  if (req.sinkDsn != NULL) taosMemoryFreeClear(req.sinkDsn);
×
NEW
1609
  if (req.parser != NULL) taosMemoryFreeClear(req.parser);
×
NEW
1610
  TAOS_RETURN(code);
×
1611
}
1612

NEW
1613
static int32_t mndProcessStartXnodeTaskReq(SRpcMsg *pReq) {
×
NEW
1614
  SMnode             *pMnode = pReq->info.node;
×
NEW
1615
  int32_t             code = -1;
×
NEW
1616
  SXnodeTaskObj      *pObj = NULL;
×
NEW
1617
  SMStartXnodeTaskReq startReq = {0};
×
NEW
1618
  SXnodeTaskObj      *pObjClone = NULL;
×
1619

NEW
1620
  TAOS_CHECK_GOTO(tDeserializeSMStartXnodeTaskReq(pReq->pCont, pReq->contLen, &startReq), NULL, _OVER);
×
1621

NEW
1622
  mDebug("xnode start xnode task with tid:%d", startReq.tid);
×
NEW
1623
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_START_XNODE_TASK), NULL, _OVER);
×
1624

NEW
1625
  if (startReq.tid <= 0 && (startReq.name.len <= 0 || startReq.name.ptr == NULL)) {
×
NEW
1626
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
NEW
1627
    goto _OVER;
×
1628
  }
1629

NEW
1630
  if (startReq.tid > 0) {
×
NEW
1631
    pObj = mndAcquireXnodeTask(pMnode, startReq.tid);
×
1632
  } else {
NEW
1633
    pObj = mndAcquireXnodeTaskByName(pMnode, startReq.name.ptr);
×
1634
  }
NEW
1635
  if (pObj == NULL) {
×
NEW
1636
    code = terrno;
×
NEW
1637
    goto _OVER;
×
1638
  }
NEW
1639
  (void)httpStartXnodeTask(pObj);
×
1640

NEW
1641
_OVER:
×
NEW
1642
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
1643
    mError("xnode task:%d, failed to start since %s", startReq.tid, tstrerror(code));
×
1644
  }
NEW
1645
  tFreeSMStartXnodeTaskReq(&startReq);
×
NEW
1646
  if (pObj != NULL) {
×
NEW
1647
    mndReleaseXnodeTask(pMnode, pObj);
×
1648
  }
NEW
1649
  if (pObjClone != NULL) {
×
NEW
1650
    mndFreeXnodeTask(pObjClone);
×
NEW
1651
    taosMemFree(pObjClone);
×
1652
  }
NEW
1653
  TAOS_RETURN(code);
×
1654
}
1655

NEW
1656
static int32_t mndProcessStopXnodeTaskReq(SRpcMsg *pReq) {
×
NEW
1657
  SMnode            *pMnode = pReq->info.node;
×
NEW
1658
  int32_t            code = -1;
×
NEW
1659
  SXnodeTaskObj     *pObj = NULL;
×
NEW
1660
  SMStopXnodeTaskReq stopReq = {0};
×
NEW
1661
  SJson             *pJson = NULL;
×
1662

NEW
1663
  TAOS_CHECK_GOTO(tDeserializeSMStopXnodeTaskReq(pReq->pCont, pReq->contLen, &stopReq), NULL, _OVER);
×
1664

NEW
1665
  mDebug("Stop xnode task with tid:%d", stopReq.tid);
×
NEW
1666
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_STOP_XNODE_TASK), NULL, _OVER);
×
NEW
1667
  if (stopReq.tid <= 0 && (stopReq.name.len <= 0 || stopReq.name.ptr == NULL)) {
×
NEW
1668
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
NEW
1669
    goto _OVER;
×
1670
  }
1671

NEW
1672
  if (stopReq.tid > 0) {
×
NEW
1673
    pObj = mndAcquireXnodeTask(pMnode, stopReq.tid);
×
1674
  } else {
NEW
1675
    pObj = mndAcquireXnodeTaskByName(pMnode, stopReq.name.ptr);
×
1676
  }
NEW
1677
  if (pObj == NULL) {
×
NEW
1678
    code = terrno;
×
NEW
1679
    goto _OVER;
×
1680
  }
1681

1682
  // send request
NEW
1683
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
×
NEW
1684
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/task/%d", XNODED_PIPE_SOCKET_URL, pObj->id);
×
NEW
1685
  (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_DELETE, defaultTimeout, NULL, 0);
×
1686

NEW
1687
_OVER:
×
NEW
1688
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
1689
    mError("xnode task:%d, failed to stop since %s", stopReq.tid, tstrerror(code));
×
1690
  }
NEW
1691
  if (pJson != NULL) {
×
NEW
1692
    tjsonDelete(pJson);
×
1693
  }
NEW
1694
  tFreeSMStopXnodeTaskReq(&stopReq);
×
NEW
1695
  TAOS_RETURN(code);
×
1696
}
1697

NEW
1698
static int32_t mndUpdateXnodeTask(SMnode *pMnode, SRpcMsg *pReq, const SXnodeTaskObj *pOld,
×
1699
                                  SMUpdateXnodeTaskReq *pUpdate) {
NEW
1700
  mDebug("xnode task:%d, start to update", pUpdate->tid);
×
NEW
1701
  int32_t      code = -1;
×
NEW
1702
  STrans       *pTrans = NULL;
×
NEW
1703
  SXnodeTaskObj taskObj = *pOld;
×
1704
  struct {
1705
    bool status;
1706
    bool name;
1707
    bool source;
1708
    bool sink;
1709
    bool parser;
1710
    bool reason;
NEW
1711
  } isChange = {0};
×
1712

NEW
1713
  if (pUpdate->via > 0) {
×
NEW
1714
    taskObj.via = pUpdate->via;
×
1715
  }
NEW
1716
  if (pUpdate->xnodeId > 0) {
×
NEW
1717
    taskObj.xnodeId = pUpdate->xnodeId;
×
1718
  }
NEW
1719
  if (pUpdate->status.len > 0) {
×
NEW
1720
    taskObj.statusLen = pUpdate->status.len;
×
NEW
1721
    taskObj.status = taosMemoryCalloc(1, taskObj.statusLen);
×
NEW
1722
    if (taskObj.status == NULL) {
×
NEW
1723
      code = terrno;
×
NEW
1724
      goto _OVER;
×
1725
    }
NEW
1726
    (void)memcpy(taskObj.status, pUpdate->status.ptr, taskObj.statusLen);
×
NEW
1727
    isChange.status = true;
×
1728
  }
NEW
1729
  if (pUpdate->updateName.len > 0) {
×
NEW
1730
    taskObj.nameLen = pUpdate->updateName.len;
×
NEW
1731
    taskObj.name = taosMemoryCalloc(1, pUpdate->updateName.len);
×
NEW
1732
    if (taskObj.name == NULL) {
×
NEW
1733
      code = terrno;
×
NEW
1734
      goto _OVER;
×
1735
    }
NEW
1736
    (void)memcpy(taskObj.name, pUpdate->updateName.ptr, pUpdate->updateName.len);
×
NEW
1737
    isChange.name = true;
×
1738
  }
NEW
1739
  if (pUpdate->source.cstr.len > 0) {
×
NEW
1740
    taskObj.sourceType = pUpdate->source.type;
×
NEW
1741
    taskObj.sourceDsnLen = pUpdate->source.cstr.len;
×
NEW
1742
    taskObj.sourceDsn = taosMemoryCalloc(1, pUpdate->source.cstr.len);
×
NEW
1743
    if (taskObj.sourceDsn == NULL) {
×
NEW
1744
      code = terrno;
×
NEW
1745
      goto _OVER;
×
1746
    }
NEW
1747
    (void)memcpy(taskObj.sourceDsn, pUpdate->source.cstr.ptr, pUpdate->source.cstr.len);
×
NEW
1748
    isChange.source = true;
×
1749
  }
NEW
1750
  if (pUpdate->sink.cstr.len > 0) {
×
NEW
1751
    taskObj.sinkType = pUpdate->sink.type;
×
NEW
1752
    taskObj.sinkDsnLen = pUpdate->sink.cstr.len;
×
NEW
1753
    taskObj.sinkDsn = taosMemoryCalloc(1, pUpdate->sink.cstr.len);
×
NEW
1754
    if (taskObj.sinkDsn == NULL) {
×
NEW
1755
      code = terrno;
×
NEW
1756
      goto _OVER;
×
1757
    }
NEW
1758
    (void)memcpy(taskObj.sinkDsn, pUpdate->sink.cstr.ptr, pUpdate->sink.cstr.len);
×
NEW
1759
    isChange.sink = true;
×
1760
  }
NEW
1761
  if (pUpdate->parser.len > 0) {
×
NEW
1762
    taskObj.parserLen = pUpdate->parser.len;
×
NEW
1763
    taskObj.parser = taosMemoryCalloc(1, pUpdate->parser.len);
×
NEW
1764
    if (taskObj.parser == NULL) {
×
NEW
1765
      code = terrno;
×
NEW
1766
      goto _OVER;
×
1767
    }
NEW
1768
    (void)memcpy(taskObj.parser, pUpdate->parser.ptr, pUpdate->parser.len);
×
NEW
1769
    isChange.parser = true;
×
1770
  }
NEW
1771
  if (pUpdate->reason.len > 0) {
×
NEW
1772
    taskObj.reasonLen = pUpdate->reason.len;
×
NEW
1773
    taskObj.reason = taosMemoryCalloc(1, pUpdate->reason.len);
×
NEW
1774
    if (taskObj.reason == NULL) {
×
NEW
1775
      code = terrno;
×
NEW
1776
      goto _OVER;
×
1777
    }
NEW
1778
    (void)memcpy(taskObj.reason, pUpdate->reason.ptr, pUpdate->reason.len);
×
NEW
1779
    isChange.reason = true;
×
1780
  }
NEW
1781
  taskObj.updateTime = taosGetTimestampMs();
×
1782

NEW
1783
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-xnode-task");
×
NEW
1784
  if (pTrans == NULL) {
×
NEW
1785
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
1786
    if (terrno != 0) code = terrno;
×
NEW
1787
    goto _OVER;
×
1788
  }
NEW
1789
  mInfo("trans:%d, used to update xnode task:%d", pTrans->id, taskObj.id);
×
1790

NEW
1791
  TAOS_CHECK_GOTO(mndSetCreateXnodeTaskCommitLogs(pTrans, &taskObj), NULL, _OVER);
×
NEW
1792
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
NEW
1793
  code = 0;
×
1794

NEW
1795
_OVER:
×
NEW
1796
  if (NULL != taskObj.name && isChange.name) {
×
NEW
1797
    taosMemoryFree(taskObj.name);
×
1798
  }
NEW
1799
  if (NULL != taskObj.status && isChange.status) {
×
NEW
1800
    taosMemoryFree(taskObj.status);
×
1801
  }
NEW
1802
  if (NULL != taskObj.sourceDsn && isChange.source) {
×
NEW
1803
    taosMemoryFree(taskObj.sourceDsn);
×
1804
  }
NEW
1805
  if (NULL != taskObj.sinkDsn && isChange.sink) {
×
NEW
1806
    taosMemoryFree(taskObj.sinkDsn);
×
1807
  }
NEW
1808
  if (NULL != taskObj.parser && isChange.parser) {
×
NEW
1809
    taosMemoryFree(taskObj.parser);
×
1810
  }
NEW
1811
  if (NULL != taskObj.reason && isChange.reason) {
×
NEW
1812
    taosMemoryFree(taskObj.reason);
×
1813
  }
NEW
1814
  mndTransDrop(pTrans);
×
NEW
1815
  TAOS_RETURN(code);
×
1816
}
1817

NEW
1818
static int32_t mndProcessUpdateXnodeTaskReq(SRpcMsg *pReq) {
×
NEW
1819
  SMnode             *pMnode = pReq->info.node;
×
NEW
1820
  int32_t             code = -1;
×
NEW
1821
  SXnodeTaskObj       *pObj = NULL;
×
NEW
1822
  SMUpdateXnodeTaskReq updateReq = {0};
×
1823

NEW
1824
  if ((code = grantCheck(TSDB_GRANT_TD_GPT)) != TSDB_CODE_SUCCESS) {
×
NEW
1825
    mError("failed to create xnode, code:%s", tstrerror(code));
×
NEW
1826
    goto _OVER;
×
1827
  }
1828

NEW
1829
  TAOS_CHECK_GOTO(tDeserializeSMUpdateXnodeTaskReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER);
×
1830

NEW
1831
  if (updateReq.tid > 0) {
×
NEW
1832
    pObj = mndAcquireXnodeTaskById(pMnode, updateReq.tid);
×
1833
  } else {
NEW
1834
    pObj = mndAcquireXnodeTaskByName(pMnode, updateReq.name.ptr);
×
1835
  }
NEW
1836
  if (pObj == NULL) {
×
NEW
1837
    code = terrno;
×
NEW
1838
    goto _OVER;
×
1839
  }
1840

NEW
1841
  code = mndUpdateXnodeTask(pMnode, pReq, pObj, &updateReq);
×
NEW
1842
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1843

NEW
1844
_OVER:
×
NEW
1845
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
1846
    mError("xnode task:%d, failed to update since %s", updateReq.tid, tstrerror(code));
×
1847
  }
1848

NEW
1849
  mndReleaseXnodeTask(pMnode, pObj);
×
NEW
1850
  tFreeSMUpdateXnodeTaskReq(&updateReq);
×
NEW
1851
  TAOS_RETURN(code);
×
1852
  return 0;
1853
}
1854

NEW
1855
SXnodeTaskObj *mndAcquireXnodeTask(SMnode *pMnode, int32_t tid) {
×
NEW
1856
  SXnodeTaskObj *pObj = sdbAcquire(pMnode->pSdb, SDB_XNODE_TASK, &tid);
×
NEW
1857
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
NEW
1858
    terrno = TSDB_CODE_MND_XNODE_TASK_NOT_EXIST;
×
1859
  }
NEW
1860
  return pObj;
×
1861
}
1862

NEW
1863
static int32_t mndSetDropXnodeTaskRedoLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
×
NEW
1864
  int32_t  code = 0;
×
NEW
1865
  SSdbRaw *pRedoRaw = mndXnodeTaskActionEncode(pObj);
×
NEW
1866
  if (pRedoRaw == NULL) {
×
NEW
1867
    code = terrno;
×
NEW
1868
    return code;
×
1869
  }
1870

NEW
1871
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
NEW
1872
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
×
1873

NEW
1874
  TAOS_RETURN(code);
×
1875
}
1876

NEW
1877
static int32_t mndSetDropXnodeTaskCommitLogs(STrans *pTrans, SXnodeTaskObj *pObj) {
×
NEW
1878
  int32_t  code = 0;
×
NEW
1879
  SSdbRaw *pCommitRaw = mndXnodeTaskActionEncode(pObj);
×
NEW
1880
  if (pCommitRaw == NULL) {
×
NEW
1881
    code = terrno;
×
NEW
1882
    return code;
×
1883
  }
1884

NEW
1885
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
NEW
1886
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
×
NEW
1887
  TAOS_RETURN(code);
×
1888
}
NEW
1889
static int32_t mndSetDropXnodeTaskInfoToTrans(SMnode *pMnode, STrans *pTrans, SXnodeTaskObj *pObj, bool force) {
×
NEW
1890
  if (pObj == NULL) {
×
NEW
1891
    return 0;
×
1892
  }
NEW
1893
  TAOS_CHECK_RETURN(mndSetDropXnodeTaskRedoLogs(pTrans, pObj));
×
NEW
1894
  TAOS_CHECK_RETURN(mndSetDropXnodeTaskCommitLogs(pTrans, pObj));
×
NEW
1895
  return 0;
×
1896
}
1897

NEW
1898
static int32_t mndDropXnodeTask(SMnode *pMnode, SRpcMsg *pReq, SXnodeTaskObj *pTask) {
×
NEW
1899
  int32_t code = 0;
×
NEW
1900
  int32_t lino = 0;
×
NEW
1901
  SArray *pArray = NULL;
×
1902

NEW
1903
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-xnode-task");
×
NEW
1904
  TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno);
×
1905

NEW
1906
  mndTransSetSerial(pTrans);
×
NEW
1907
  mDebug("trans:%d, to drop xnode:%d", pTrans->id, pTask->id);
×
1908

1909
  // delete relative jobs
1910
  // TAOS_CHECK_GOTO(mndAcquireXnodeJobsByTaskId(pMnode, pTask->id, &pArray), NULL, _OVER);
1911
  // for (int i = 0; i < pArray->size; i++) {
1912
  //   SXnodeJobObj *pJob = taosArrayGet(pArray, i);
1913
  //   if (pJob == NULL) continue;
1914
  //   mDebug("xnode drop xnode task %d trans:%d, to drop xnode job:%d", pTask->id, pTrans->id, pJob->id);
1915
  //   TAOS_CHECK_GOTO(mndSetDropXnodeJobInfoToTrans(pTrans, pJob, false), NULL, _OVER);
1916
  // }
1917

NEW
1918
  code = mndSetDropXnodeTaskInfoToTrans(pMnode, pTrans, pTask, false);
×
NEW
1919
  mndReleaseXnodeTask(pMnode, pTask);
×
1920

NEW
1921
  TSDB_CHECK_CODE(code, lino, _OVER);
×
1922

NEW
1923
  code = mndTransPrepare(pMnode, pTrans);
×
1924

NEW
1925
_OVER:
×
NEW
1926
  if (pArray != NULL) {
×
NEW
1927
    for (int i = 0; i < pArray->size; i++) {
×
NEW
1928
      SXnodeJobObj *pJob = taosArrayGet(pArray, i);
×
NEW
1929
      if (pJob == NULL) continue;
×
NEW
1930
      mndReleaseXnodeJob(pMnode, pJob);
×
1931
    }
1932
  }
NEW
1933
  mndTransDrop(pTrans);
×
NEW
1934
  TAOS_RETURN(code);
×
1935
}
1936

NEW
1937
static int32_t mndProcessDropXnodeTaskReq(SRpcMsg *pReq) {
×
NEW
1938
  SMnode            *pMnode = pReq->info.node;
×
NEW
1939
  int32_t            code = -1;
×
NEW
1940
  SXnodeTaskObj     *pObj = NULL;
×
NEW
1941
  SMDropXnodeTaskReq dropReq = {0};
×
NEW
1942
  SJson             *pJson = NULL;
×
1943

NEW
1944
  TAOS_CHECK_GOTO(tDeserializeSMDropXnodeTaskReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
×
1945

NEW
1946
  mDebug("DropXnodeTask with tid:%d, start to drop", dropReq.tid);
×
NEW
1947
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_DROP_XNODE_TASK), NULL, _OVER);
×
1948

NEW
1949
  if (dropReq.tid <= 0 && (dropReq.nameLen <= 0 || dropReq.name == NULL)) {
×
NEW
1950
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
NEW
1951
    goto _OVER;
×
1952
  }
1953

NEW
1954
  if (dropReq.nameLen > 0 && dropReq.name != NULL) {
×
NEW
1955
    pObj = mndAcquireXnodeTaskByName(pMnode, dropReq.name);
×
1956
  } else {
NEW
1957
    pObj = mndAcquireXnodeTask(pMnode, dropReq.tid);
×
1958
  }
NEW
1959
  if (pObj == NULL) {
×
NEW
1960
    code = terrno;
×
NEW
1961
    goto _OVER;
×
1962
  }
1963

1964
  // send request to drop xnode task
NEW
1965
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
×
NEW
1966
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/task/drop/%d", XNODED_PIPE_SOCKET_URL, pObj->id);
×
NEW
1967
  (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_DELETE, defaultTimeout, NULL, 0);
×
1968

NEW
1969
  code = mndDropXnodeTask(pMnode, pReq, pObj);
×
NEW
1970
  if (code == 0) {
×
NEW
1971
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
1972
  }
1973

NEW
1974
_OVER:
×
NEW
1975
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
1976
    mError("xnode task:%d, failed to drop since %s", dropReq.tid, tstrerror(code));
×
1977
  }
NEW
1978
  if (pJson != NULL) {
×
NEW
1979
    tjsonDelete(pJson);
×
1980
  }
NEW
1981
  tFreeSMDropXnodeTaskReq(&dropReq);
×
NEW
1982
  TAOS_RETURN(code);
×
1983
}
1984
static int32_t mndRetrieveXnodeTasks(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
816✔
1985
  SMnode        *pMnode = pReq->info.node;
816✔
1986
  SSdb          *pSdb = pMnode->pSdb;
816✔
1987
  int32_t        numOfRows = 0;
816✔
1988
  int32_t        cols = 0;
816✔
1989
  SXnodeTaskObj *pObj = NULL;
816✔
NEW
1990
  char           buf[VARSTR_HEADER_SIZE +
×
1991
           TMAX(TSDB_XNODE_TASK_NAME_LEN,
816✔
1992
                          TMAX(TSDB_XNODE_TASK_SOURCE_LEN, TMAX(TSDB_XNODE_TASK_SINK_LEN, TSDB_XNODE_TASK_PARSER_LEN)))];
1993
  int32_t        code = 0;
816✔
1994
  mDebug("show.type:%d, %s:%d: retrieve xnode tasks with rows: %d", pShow->type, __FILE__, __LINE__, rows);
816✔
1995

1996
  while (numOfRows < rows) {
816✔
1997
    pShow->pIter = sdbFetch(pSdb, SDB_XNODE_TASK, pShow->pIter, (void **)&pObj);
816✔
1998
    if (pShow->pIter == NULL) break;
816✔
1999

NEW
2000
    cols = 0;
×
2001
    // id
NEW
2002
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2003
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
×
NEW
2004
    if (code != 0) goto _end;
×
2005

2006
    // name
NEW
2007
    buf[0] = 0;
×
NEW
2008
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->name, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
2009
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2010
    code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
NEW
2011
    if (code != 0) goto _end;
×
2012

2013
    // from
NEW
2014
    buf[0] = 0;
×
NEW
2015
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->sourceDsn, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
2016
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2017
    code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
NEW
2018
    if (code != 0) goto _end;
×
2019

2020
    // to
NEW
2021
    buf[0] = 0;
×
NEW
2022
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->sinkDsn, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
2023
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2024
    code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
NEW
2025
    if (code != 0) goto _end;
×
2026

2027
    // parser
NEW
2028
    if (pObj->parserLen > 0 && pObj->parser != NULL) {
×
NEW
2029
      buf[0] = 0;
×
NEW
2030
      STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->parser, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
2031
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2032
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
NEW
2033
      if (code != 0) goto _end;
×
2034
    } else {
NEW
2035
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2036
      colDataSetNULL(pColInfo, numOfRows);
×
2037
    }
2038

2039
    // via
NEW
2040
    if (pObj->via != 0) {
×
NEW
2041
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2042
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->via, false);
×
NEW
2043
      if (code != 0) goto _end;
×
2044
    } else {
NEW
2045
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2046
      colDataSetNULL(pColInfo, numOfRows);
×
2047
    }
2048

2049
    // xnode_id
NEW
2050
    if (pObj->xnodeId != 0) {
×
NEW
2051
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2052
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->xnodeId, false);
×
NEW
2053
      if (code != 0) goto _end;
×
2054
    } else {
NEW
2055
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2056
      colDataSetNULL(pColInfo, numOfRows);
×
2057
    }
2058

2059
    // status
NEW
2060
    if (pObj->statusLen > 0) {
×
NEW
2061
      buf[0] = 0;
×
NEW
2062
      STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->status, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
2063
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2064
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
NEW
2065
      if (code != 0) goto _end;
×
2066
    } else {
NEW
2067
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2068
      colDataSetNULL(pColInfo, numOfRows);
×
2069
    }
2070

2071
    // reason
NEW
2072
    if (pObj->reasonLen > 0) {
×
NEW
2073
      buf[0] = 0;
×
NEW
2074
      STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->reason, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
2075
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2076
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
NEW
2077
      if (code != 0) goto _end;
×
2078
    } else {
NEW
2079
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2080
      colDataSetNULL(pColInfo, numOfRows);
×
2081
    }
2082

2083
    // create_time
NEW
2084
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2085
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createTime, false);
×
NEW
2086
    if (code != 0) goto _end;
×
2087

2088
    // update_time
NEW
2089
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
2090
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->updateTime, false);
×
NEW
2091
    if (code != 0) goto _end;
×
2092

NEW
2093
    numOfRows++;
×
NEW
2094
    sdbRelease(pSdb, pObj);
×
2095
  }
2096

2097
_end:
816✔
2098
  if (code != 0 && pObj != NULL) sdbRelease(pSdb, pObj);
816✔
2099

2100
  pShow->numOfRows += numOfRows;
816✔
2101
  return numOfRows;
816✔
2102
}
2103

NEW
2104
static void mndCancelGetNextXnodeTask(SMnode *pMnode, void *pIter) {
×
NEW
2105
  SSdb *pSdb = pMnode->pSdb;
×
NEW
2106
  sdbCancelFetchByType(pSdb, pIter, SDB_XNODE_TASK);
×
NEW
2107
}
×
2108

2109
/** xnode job section **/
2110

NEW
2111
static int32_t mndAcquireXnodeJobsByTaskId(SMnode *pMnode, int32_t tid, SArray **ppArray) {
×
NEW
2112
  int32_t code = 0;
×
NEW
2113
  SSdb   *pSdb = pMnode->pSdb;
×
2114

NEW
2115
  *ppArray = taosArrayInit(16, sizeof(SXnodeJobObj));
×
NEW
2116
  if (ppArray == NULL) {
×
NEW
2117
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
2118
    if (terrno != 0) code = terrno;
×
NEW
2119
    goto _exit;
×
2120
  }
2121

NEW
2122
  int32_t idx = 0;
×
NEW
2123
  void   *pIter = NULL;
×
NEW
2124
  while (1) {
×
NEW
2125
    SXnodeJobObj *pJob = NULL;
×
NEW
2126
    pIter = sdbFetch(pSdb, SDB_XNODE_JOB, pIter, (void **)&pJob);
×
NEW
2127
    if (pIter == NULL) break;
×
2128

NEW
2129
    if (pJob->taskId == tid) {
×
NEW
2130
      if (NULL == taosArrayInsert(*ppArray, idx++, pJob)) {
×
NEW
2131
        code = terrno;
×
NEW
2132
        sdbRelease(pSdb, pJob);
×
NEW
2133
        goto _exit;
×
2134
      }
2135
    }
2136

NEW
2137
    sdbRelease(pSdb, pJob);
×
2138
  }
NEW
2139
  sdbCancelFetch(pSdb, pIter);
×
2140

NEW
2141
_exit:
×
NEW
2142
  TAOS_RETURN(code);
×
2143
}
2144

NEW
2145
static int32_t mndAcquireXnodeJobsAll(SMnode *pMnode, SArray **ppArray) {
×
NEW
2146
  int32_t code = 0;
×
NEW
2147
  SSdb   *pSdb = pMnode->pSdb;
×
2148

NEW
2149
  *ppArray = taosArrayInit(64, sizeof(SXnodeJobObj));
×
NEW
2150
  if (ppArray == NULL) {
×
NEW
2151
    code = terrno;
×
NEW
2152
    goto _exit;
×
2153
  }
2154

NEW
2155
  int32_t idx = 0;
×
NEW
2156
  void   *pIter = NULL;
×
NEW
2157
  while (1) {
×
NEW
2158
    SXnodeJobObj *pJob = NULL;
×
NEW
2159
    pIter = sdbFetch(pSdb, SDB_XNODE_JOB, pIter, (void **)&pJob);
×
NEW
2160
    if (pIter == NULL) break;
×
NEW
2161
    if (NULL == taosArrayInsert(*ppArray, idx++, pJob)) {
×
NEW
2162
      code = terrno;
×
NEW
2163
      goto _exit;
×
2164
    }
2165
  }
NEW
2166
  sdbCancelFetch(pSdb, pIter);
×
2167

NEW
2168
_exit:
×
NEW
2169
  TAOS_RETURN(code);
×
2170
}
2171

NEW
2172
static void mndFreeXnodeJob(SXnodeJobObj *pObj) {
×
NEW
2173
  if (NULL != pObj->config) {
×
NEW
2174
    taosMemoryFreeClear(pObj->config);
×
2175
  }
NEW
2176
  if (NULL != pObj->reason) {
×
NEW
2177
    taosMemoryFreeClear(pObj->reason);
×
2178
  }
NEW
2179
  if (NULL != pObj->status) {
×
NEW
2180
    taosMemoryFreeClear(pObj->status);
×
2181
  }
NEW
2182
}
×
2183

NEW
2184
SSdbRaw *mndXnodeJobActionEncode(SXnodeJobObj *pObj) {
×
NEW
2185
  int32_t code = 0;
×
NEW
2186
  int32_t lino = 0;
×
NEW
2187
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2188

NEW
2189
  if (NULL == pObj) {
×
NEW
2190
    terrno = TSDB_CODE_INVALID_PARA;
×
NEW
2191
    return NULL;
×
2192
  }
2193

NEW
2194
  mDebug("xnode tid:%d, jid:%d, start to encode to raw, row:%p", pObj->taskId, pObj->id, pObj);
×
2195

NEW
2196
  int32_t rawDataLen = sizeof(SXnodeJobObj) + TSDB_XNODE_RESERVE_SIZE + pObj->configLen + pObj->reasonLen;
×
2197

NEW
2198
  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE_JOB, TSDB_XNODE_VER_NUMBER, rawDataLen);
×
NEW
2199
  if (pRaw == NULL) goto _OVER;
×
2200

NEW
2201
  int32_t dataPos = 0;
×
NEW
2202
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
×
NEW
2203
  SDB_SET_INT32(pRaw, dataPos, pObj->taskId, _OVER)
×
NEW
2204
  SDB_SET_INT32(pRaw, dataPos, pObj->configLen, _OVER)
×
NEW
2205
  SDB_SET_BINARY(pRaw, dataPos, pObj->config, pObj->configLen, _OVER)
×
NEW
2206
  SDB_SET_INT32(pRaw, dataPos, pObj->via, _OVER)
×
NEW
2207
  SDB_SET_INT32(pRaw, dataPos, pObj->xnodeId, _OVER)
×
NEW
2208
  SDB_SET_INT32(pRaw, dataPos, pObj->statusLen, _OVER)
×
NEW
2209
  SDB_SET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
×
NEW
2210
  SDB_SET_INT32(pRaw, dataPos, pObj->reasonLen, _OVER)
×
NEW
2211
  SDB_SET_BINARY(pRaw, dataPos, pObj->reason, pObj->reasonLen, _OVER)
×
NEW
2212
  SDB_SET_INT64(pRaw, dataPos, pObj->createTime, _OVER)
×
NEW
2213
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
×
2214

NEW
2215
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)
×
2216

NEW
2217
  terrno = 0;
×
2218

NEW
2219
_OVER:
×
NEW
2220
  if (terrno != 0) {
×
NEW
2221
    mError("xnode tid:%d, jid:%d, failed to encode to raw:%p since %s", pObj->taskId, pObj->id, pRaw, terrstr());
×
NEW
2222
    sdbFreeRaw(pRaw);
×
NEW
2223
    return NULL;
×
2224
  }
2225

NEW
2226
  mTrace("xnode tid:%d, jid:%d, encode to raw:%p, row:%p", pObj->taskId, pObj->id, pRaw, pObj);
×
NEW
2227
  return pRaw;
×
2228
}
2229

NEW
2230
SSdbRow *mndXnodeJobActionDecode(SSdbRaw *pRaw) {
×
NEW
2231
  mInfo("xnode, start to decode from raw:%p", pRaw);
×
NEW
2232
  int32_t code = 0;
×
NEW
2233
  int32_t lino = 0;
×
NEW
2234
  terrno = TSDB_CODE_OUT_OF_MEMORY;
×
NEW
2235
  SSdbRow      *pRow = NULL;
×
NEW
2236
  SXnodeJobObj *pObj = NULL;
×
2237

NEW
2238
  if (NULL == pRaw) {
×
NEW
2239
    terrno = TSDB_CODE_INVALID_PARA;
×
NEW
2240
    return NULL;
×
2241
  }
2242

NEW
2243
  int8_t sver = 0;
×
NEW
2244
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
×
2245

NEW
2246
  if (sver != TSDB_XNODE_VER_NUMBER) {
×
NEW
2247
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
NEW
2248
    goto _OVER;
×
2249
  }
2250

NEW
2251
  pRow = sdbAllocRow(sizeof(SXnodeJobObj));
×
NEW
2252
  if (pRow == NULL) goto _OVER;
×
2253

NEW
2254
  pObj = sdbGetRowObj(pRow);
×
NEW
2255
  if (pObj == NULL) goto _OVER;
×
2256

NEW
2257
  int32_t dataPos = 0;
×
NEW
2258
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
×
NEW
2259
  SDB_GET_INT32(pRaw, dataPos, &pObj->taskId, _OVER)
×
2260

NEW
2261
  SDB_GET_INT32(pRaw, dataPos, &pObj->configLen, _OVER)
×
NEW
2262
  if (pObj->configLen > 0) {
×
NEW
2263
    pObj->config = taosMemoryCalloc(pObj->configLen, 1);
×
NEW
2264
    if (pObj->config == NULL) goto _OVER;
×
NEW
2265
    SDB_GET_BINARY(pRaw, dataPos, pObj->config, pObj->configLen, _OVER)
×
2266
  }
2267

NEW
2268
  SDB_GET_INT32(pRaw, dataPos, &pObj->via, _OVER)
×
NEW
2269
  SDB_GET_INT32(pRaw, dataPos, &pObj->xnodeId, _OVER)
×
NEW
2270
  SDB_GET_INT32(pRaw, dataPos, &pObj->statusLen, _OVER)
×
NEW
2271
  if (pObj->statusLen > 0) {
×
NEW
2272
    pObj->status = taosMemoryCalloc(pObj->statusLen, 1);
×
NEW
2273
    if (pObj->status == NULL) goto _OVER;
×
NEW
2274
    SDB_GET_BINARY(pRaw, dataPos, pObj->status, pObj->statusLen, _OVER)
×
2275
  }
2276

NEW
2277
  SDB_GET_INT32(pRaw, dataPos, &pObj->reasonLen, _OVER)
×
NEW
2278
  if (pObj->reasonLen > 0) {
×
NEW
2279
    pObj->reason = taosMemoryCalloc(pObj->reasonLen, 1);
×
NEW
2280
    if (pObj->reason == NULL) goto _OVER;
×
NEW
2281
    SDB_GET_BINARY(pRaw, dataPos, pObj->reason, pObj->reasonLen, _OVER)
×
2282
  }
2283

NEW
2284
  SDB_GET_INT64(pRaw, dataPos, &pObj->createTime, _OVER)
×
NEW
2285
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
×
2286

NEW
2287
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)
×
2288

NEW
2289
  terrno = 0;
×
2290

NEW
2291
_OVER:
×
NEW
2292
  if (terrno != 0) {
×
NEW
2293
    mError("xnode tid:%d, jid:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->taskId,
×
2294
           pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
NEW
2295
    if (pObj != NULL) {
×
NEW
2296
      taosMemoryFreeClear(pObj->config);
×
NEW
2297
      taosMemoryFreeClear(pObj->reason);
×
2298
    }
NEW
2299
    taosMemoryFreeClear(pRow);
×
NEW
2300
    return NULL;
×
2301
  }
2302

NEW
2303
  mTrace("xnode:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
×
NEW
2304
  return pRow;
×
2305
}
2306

NEW
2307
int32_t mndXnodeJobActionInsert(SSdb *pSdb, SXnodeJobObj *pObj) {
×
NEW
2308
  mInfo("xnode tid:%d, jid:%d, perform insert action, row:%p", pObj->taskId, pObj->id, pObj);
×
NEW
2309
  return 0;
×
2310
}
2311

NEW
2312
int32_t mndXnodeJobActionDelete(SSdb *pSdb, SXnodeJobObj *pObj) {
×
NEW
2313
  mDebug("xnode tid:%d, jid:%d, perform delete action, row:%p", pObj->taskId, pObj->id, pObj);
×
NEW
2314
  mndFreeXnodeJob(pObj);
×
NEW
2315
  return 0;
×
2316
}
2317

NEW
2318
int32_t mndXnodeJobActionUpdate(SSdb *pSdb, SXnodeJobObj *pOld, SXnodeJobObj *pNew) {
×
NEW
2319
  mDebug("xnode tid:%d, jid:%d, perform update action, old row:%p new row:%p", pOld->taskId, pOld->id, pOld, pNew);
×
2320

NEW
2321
  taosWLockLatch(&pOld->lock);
×
NEW
2322
  pOld->via = pNew->via;
×
NEW
2323
  pOld->xnodeId = pNew->xnodeId;
×
NEW
2324
  swapFields(&pNew->statusLen, &pNew->status, &pOld->statusLen, &pOld->status);
×
NEW
2325
  swapFields(&pNew->configLen, &pNew->config, &pOld->configLen, &pOld->config);
×
NEW
2326
  swapFields(&pNew->reasonLen, &pNew->reason, &pOld->reasonLen, &pOld->reason);
×
NEW
2327
  if (pNew->updateTime > pOld->updateTime) {
×
NEW
2328
    pOld->updateTime = pNew->updateTime;
×
2329
  }
NEW
2330
  taosWUnLockLatch(&pOld->lock);
×
NEW
2331
  return 0;
×
2332
}
2333

2334
/* xnode user pass actions */
2335
SSdbRaw *mndXnodeUserPassActionEncode(SXnodeUserPassObj *pObj) {
354✔
2336
  int32_t code = 0;
354✔
2337
  int32_t lino = 0;
354✔
2338
  terrno = TSDB_CODE_OUT_OF_MEMORY;
354✔
2339

2340
  if (NULL == pObj) {
354✔
NEW
2341
    terrno = TSDB_CODE_INVALID_PARA;
×
NEW
2342
    return NULL;
×
2343
  }
2344

2345
  int32_t rawDataLen = sizeof(SXnodeUserPassObj) + TSDB_XNODE_RESERVE_SIZE + pObj->userLen + pObj->passLen;
354✔
2346

2347
  SSdbRaw *pRaw = sdbAllocRaw(SDB_XNODE_USER_PASS, TSDB_XNODE_VER_NUMBER, rawDataLen);
354✔
2348
  if (pRaw == NULL) goto _OVER;
354✔
2349

2350
  int32_t dataPos = 0;
354✔
2351
  SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
354✔
2352
  SDB_SET_INT32(pRaw, dataPos, pObj->userLen, _OVER)
354✔
2353
  SDB_SET_BINARY(pRaw, dataPos, pObj->user, pObj->userLen, _OVER)
354✔
2354
  SDB_SET_INT32(pRaw, dataPos, pObj->passLen, _OVER)
354✔
2355
  SDB_SET_BINARY(pRaw, dataPos, pObj->pass, pObj->passLen, _OVER)
354✔
2356
  SDB_SET_INT64(pRaw, dataPos, pObj->createTime, _OVER)
354✔
2357
  SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
354✔
2358

2359
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)
354✔
2360

2361
  terrno = 0;
354✔
2362

2363
_OVER:
354✔
2364
  if (terrno != 0) {
354✔
NEW
2365
    mError("xnode user pass:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
×
NEW
2366
    sdbFreeRaw(pRaw);
×
NEW
2367
    return NULL;
×
2368
  }
2369

2370
  mTrace("xnode user pass:%d, encode to raw:%p, row:%p", pObj->id, pRaw, pObj);
354✔
2371
  return pRaw;
354✔
2372
}
2373
SSdbRow *mndXnodeUserPassActionDecode(SSdbRaw *pRaw) {
236✔
2374
  int32_t code = 0;
236✔
2375
  int32_t lino = 0;
236✔
2376
  terrno = TSDB_CODE_OUT_OF_MEMORY;
236✔
2377
  SSdbRow           *pRow = NULL;
236✔
2378
  SXnodeUserPassObj *pObj = NULL;
236✔
2379

2380
  if (NULL == pRaw) {
236✔
NEW
2381
    terrno = TSDB_CODE_INVALID_PARA;
×
NEW
2382
    return NULL;
×
2383
  }
2384

2385
  int8_t sver = 0;
236✔
2386
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
236✔
2387

2388
  if (sver != TSDB_XNODE_VER_NUMBER) {
236✔
NEW
2389
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
×
NEW
2390
    goto _OVER;
×
2391
  }
2392

2393
  pRow = sdbAllocRow(sizeof(SXnodeUserPassObj));
236✔
2394
  if (pRow == NULL) goto _OVER;
236✔
2395

2396
  pObj = sdbGetRowObj(pRow);
236✔
2397
  if (pObj == NULL) goto _OVER;
236✔
2398

2399
  int32_t dataPos = 0;
236✔
2400
  SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
236✔
2401
  SDB_GET_INT32(pRaw, dataPos, &pObj->userLen, _OVER)
236✔
2402
  if (pObj->userLen > 0) {
236✔
2403
    pObj->user = taosMemoryCalloc(pObj->userLen, 1);
236✔
2404
    if (pObj->user == NULL) goto _OVER;
236✔
2405
    SDB_GET_BINARY(pRaw, dataPos, pObj->user, pObj->userLen, _OVER)
236✔
2406
  }
2407
  SDB_GET_INT32(pRaw, dataPos, &pObj->passLen, _OVER)
236✔
2408
  if (pObj->passLen > 0) {
236✔
2409
    pObj->pass = taosMemoryCalloc(pObj->passLen, 1);
236✔
2410
    if (pObj->pass == NULL) goto _OVER;
236✔
2411
    SDB_GET_BINARY(pRaw, dataPos, pObj->pass, pObj->passLen, _OVER)
236✔
2412
  }
2413
  SDB_GET_INT64(pRaw, dataPos, &pObj->createTime, _OVER)
236✔
2414
  SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
236✔
2415

2416
  SDB_GET_RESERVE(pRaw, dataPos, TSDB_XNODE_RESERVE_SIZE, _OVER)
236✔
2417

2418
  terrno = 0;
236✔
2419

2420
_OVER:
236✔
2421
  if (terrno != 0) {
236✔
NEW
2422
    mError("xnode user pass:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
×
NEW
2423
    if (pObj != NULL) {
×
NEW
2424
      if (pObj->user != NULL) {
×
NEW
2425
        taosMemoryFreeClear(pObj->user);
×
2426
      }
NEW
2427
      if (pObj->pass != NULL) {
×
NEW
2428
        taosMemoryFreeClear(pObj->pass);
×
2429
      }
2430
    }
NEW
2431
    taosMemoryFreeClear(pRow);
×
NEW
2432
    return NULL;
×
2433
  }
2434

2435
  mTrace("xnode user pass:%d, decode from raw:%p, row:%p", pObj->id, pRaw, pObj);
236✔
2436
  return pRow;
236✔
2437
}
2438
int32_t mndXnodeUserPassActionInsert(SSdb *pSdb, SXnodeUserPassObj *pObj) {
118✔
2439
  mDebug("xnode user pass:%d, perform insert action, row:%p", pObj->id, pObj);
118✔
2440
  return 0;
118✔
2441
}
2442
int32_t mndXnodeUserPassActionUpdate(SSdb *pSdb, SXnodeUserPassObj *pOld, SXnodeUserPassObj *pNew) {
118✔
2443
  mDebug("xnode user pass:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew);
118✔
2444
  taosWLockLatch(&pOld->lock);
118✔
2445
  if (pNew->updateTime > pOld->updateTime) {
118✔
NEW
2446
    pOld->updateTime = pNew->updateTime;
×
2447
  }
2448
  taosWUnLockLatch(&pOld->lock);
118✔
2449
  return 0;
118✔
2450
}
2451
int32_t mndXnodeUserPassActionDelete(SSdb *pSdb, SXnodeUserPassObj *pObj) {
236✔
2452
  mDebug("xnode:%d, perform delete action, row:%p", pObj->id, pObj);
236✔
2453
  if (pObj->user != NULL) {
236✔
2454
    taosMemoryFreeClear(pObj->user);
236✔
2455
  }
2456
  if (pObj->pass != NULL) {
236✔
2457
    taosMemoryFreeClear(pObj->pass);
236✔
2458
  }
2459
  return 0;
236✔
2460
}
2461

NEW
2462
static int32_t mndSetCreateXnodeJobRedoLogs(STrans *pTrans, SXnodeJobObj *pObj) {
×
NEW
2463
  int32_t  code = 0;
×
NEW
2464
  SSdbRaw *pRedoRaw = mndXnodeJobActionEncode(pObj);
×
NEW
2465
  if (pRedoRaw == NULL) {
×
NEW
2466
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
2467
    if (terrno != 0) code = terrno;
×
NEW
2468
    TAOS_RETURN(code);
×
2469
  }
NEW
2470
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
NEW
2471
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING));
×
NEW
2472
  TAOS_RETURN(code);
×
2473
}
2474

NEW
2475
static int32_t mndSetCreateXnodeJobUndoLogs(STrans *pTrans, SXnodeJobObj *pObj) {
×
NEW
2476
  int32_t  code = 0;
×
NEW
2477
  SSdbRaw *pUndoRaw = mndXnodeJobActionEncode(pObj);
×
NEW
2478
  if (pUndoRaw == NULL) {
×
NEW
2479
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
2480
    if (terrno != 0) code = terrno;
×
NEW
2481
    TAOS_RETURN(code);
×
2482
  }
NEW
2483
  TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw));
×
NEW
2484
  TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED));
×
NEW
2485
  TAOS_RETURN(code);
×
2486
}
2487

NEW
2488
static int32_t mndSetCreateXnodeJobCommitLogs(STrans *pTrans, SXnodeJobObj *pObj) {
×
NEW
2489
  int32_t  code = 0;
×
NEW
2490
  SSdbRaw *pCommitRaw = mndXnodeJobActionEncode(pObj);
×
NEW
2491
  if (pCommitRaw == NULL) {
×
NEW
2492
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
2493
    if (terrno != 0) code = terrno;
×
NEW
2494
    TAOS_RETURN(code);
×
2495
  }
NEW
2496
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
NEW
2497
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY));
×
NEW
2498
  TAOS_RETURN(code);
×
2499
}
2500

NEW
2501
static int32_t mndCreateXnodeJob(SMnode *pMnode, SRpcMsg *pReq, SMCreateXnodeJobReq *pCreate) {
×
NEW
2502
  int32_t code = -1;
×
NEW
2503
  STrans *pTrans = NULL;
×
2504

NEW
2505
  SXnodeJobObj jobObj = {0};
×
NEW
2506
  jobObj.id = sdbGetMaxId(pMnode->pSdb, SDB_XNODE_JOB);
×
NEW
2507
  jobObj.taskId = pCreate->tid;
×
2508

NEW
2509
  jobObj.configLen = pCreate->configLen;
×
NEW
2510
  if (jobObj.configLen > TSDB_XNODE_TASK_JOB_CONFIG_LEN) {
×
NEW
2511
    code = TSDB_CODE_MND_XNODE_TASK_JOB_CONFIG_TOO_LONG;
×
NEW
2512
    goto _OVER;
×
2513
  }
NEW
2514
  jobObj.config = taosMemoryCalloc(1, pCreate->configLen);
×
NEW
2515
  if (jobObj.config == NULL) goto _OVER;
×
NEW
2516
  (void)memcpy(jobObj.config, pCreate->config, pCreate->configLen);
×
2517

NEW
2518
  jobObj.via = pCreate->via;
×
NEW
2519
  jobObj.xnodeId = pCreate->xnodeId;
×
2520

NEW
2521
  jobObj.statusLen = pCreate->status.len;
×
NEW
2522
  if (pCreate->status.len > 0) {
×
NEW
2523
    jobObj.status = taosMemoryCalloc(1, jobObj.statusLen);
×
NEW
2524
    if (jobObj.status == NULL) goto _OVER;
×
NEW
2525
    (void)memmove(jobObj.status, pCreate->status.ptr, jobObj.statusLen);
×
2526
  }
2527

NEW
2528
  jobObj.reasonLen = pCreate->reasonLen;
×
NEW
2529
  if (jobObj.reasonLen > TSDB_XNODE_TASK_REASON_LEN) {
×
NEW
2530
    code = TSDB_CODE_MND_XNODE_TASK_REASON_TOO_LONG;
×
NEW
2531
    goto _OVER;
×
2532
  }
NEW
2533
  if (jobObj.reasonLen > 0) {
×
NEW
2534
    jobObj.reason = taosMemoryCalloc(1, pCreate->reasonLen);
×
NEW
2535
    if (jobObj.reason == NULL) goto _OVER;
×
NEW
2536
    (void)memcpy(jobObj.reason, pCreate->reason, pCreate->reasonLen);
×
2537
  }
2538

NEW
2539
  jobObj.createTime = taosGetTimestampMs();
×
NEW
2540
  jobObj.updateTime = jobObj.createTime;
×
2541

NEW
2542
  mDebug("create xnode job, id:%d, tid:%d, config:%s, time:%" PRId64, jobObj.id, jobObj.taskId, jobObj.config,
×
2543
         jobObj.createTime);
2544

NEW
2545
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-xnode-job");
×
NEW
2546
  if (pTrans == NULL) {
×
NEW
2547
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
2548
    if (terrno != 0) code = terrno;
×
NEW
2549
    mInfo("failed to create transaction for xnode-job:%d, code:0x%x:%s", pCreate->tid, code, tstrerror(code));
×
NEW
2550
    goto _OVER;
×
2551
  }
NEW
2552
  mndTransSetSerial(pTrans);
×
2553

NEW
2554
  mInfo("trans:%d, used to create xnode job on %d as jid:%d", pTrans->id, pCreate->tid, jobObj.id);
×
2555

NEW
2556
  TAOS_CHECK_GOTO(mndSetCreateXnodeJobRedoLogs(pTrans, &jobObj), NULL, _OVER);
×
NEW
2557
  TAOS_CHECK_GOTO(mndSetCreateXnodeJobUndoLogs(pTrans, &jobObj), NULL, _OVER);
×
NEW
2558
  TAOS_CHECK_GOTO(mndSetCreateXnodeJobCommitLogs(pTrans, &jobObj), NULL, _OVER);
×
NEW
2559
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
2560

NEW
2561
  code = 0;
×
2562

NEW
2563
_OVER:
×
NEW
2564
  mndFreeXnodeJob(&jobObj);
×
NEW
2565
  mndTransDrop(pTrans);
×
NEW
2566
  TAOS_RETURN(code);
×
2567
}
2568

NEW
2569
static int32_t mndUpdateXnodeJob(SMnode *pMnode, SRpcMsg *pReq, SXnodeJobObj *pOld, SMUpdateXnodeJobReq *pUpdate) {
×
NEW
2570
  mInfo("xnode job:%d, start to update", pUpdate->jid);
×
NEW
2571
  int32_t      code = -1;
×
NEW
2572
  STrans      *pTrans = NULL;
×
NEW
2573
  SXnodeJobObj jobObj = *pOld;
×
2574
  struct {
2575
    bool status;
2576
    bool config;
2577
    bool reason;
NEW
2578
  } isChange = {0};
×
2579

NEW
2580
  jobObj.id = pUpdate->jid;
×
NEW
2581
  if (pUpdate->via > 0) {
×
NEW
2582
    jobObj.via = pUpdate->via;
×
2583
  }
NEW
2584
  if (pUpdate->xnodeId > 0) {
×
NEW
2585
    jobObj.xnodeId = pUpdate->xnodeId;
×
2586
  }
NEW
2587
  if (pUpdate->status.len > 0) {
×
NEW
2588
    jobObj.statusLen = pUpdate->status.len;
×
NEW
2589
    jobObj.status = taosMemoryCalloc(1, jobObj.statusLen);
×
NEW
2590
    if (jobObj.status == NULL) goto _OVER;
×
NEW
2591
    (void)memcpy(jobObj.status, pUpdate->status.ptr, jobObj.statusLen);
×
NEW
2592
    isChange.status = true;
×
2593
  }
NEW
2594
  if (pUpdate->configLen > 0) {
×
NEW
2595
    jobObj.configLen = pUpdate->configLen;
×
NEW
2596
    jobObj.config = taosMemoryCalloc(1, pUpdate->configLen);
×
NEW
2597
    if (jobObj.config == NULL) goto _OVER;
×
NEW
2598
    (void)memcpy(jobObj.config, pUpdate->config, pUpdate->configLen);
×
NEW
2599
    isChange.config = true;
×
2600
  }
NEW
2601
  if (pUpdate->reasonLen > 0) {
×
NEW
2602
    jobObj.reasonLen = pUpdate->reasonLen;
×
NEW
2603
    jobObj.reason = taosMemoryCalloc(1, pUpdate->reasonLen);
×
NEW
2604
    if (jobObj.reason == NULL) goto _OVER;
×
NEW
2605
    (void)memcpy(jobObj.reason, pUpdate->reason, pUpdate->reasonLen);
×
NEW
2606
    isChange.reason = true;
×
2607
  }
NEW
2608
  jobObj.updateTime = taosGetTimestampMs();
×
2609

NEW
2610
  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-xnode");
×
NEW
2611
  if (pTrans == NULL) {
×
NEW
2612
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
×
NEW
2613
    if (terrno != 0) code = terrno;
×
NEW
2614
    goto _OVER;
×
2615
  }
NEW
2616
  mInfo("trans:%d, used to update xnode job:%d", pTrans->id, jobObj.id);
×
2617

NEW
2618
  TAOS_CHECK_GOTO(mndSetCreateXnodeJobCommitLogs(pTrans, &jobObj), NULL, _OVER);
×
NEW
2619
  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
×
NEW
2620
  code = 0;
×
2621

NEW
2622
_OVER:
×
NEW
2623
  if (NULL != jobObj.status && isChange.status) {
×
NEW
2624
    taosMemoryFree(jobObj.status);
×
2625
  }
NEW
2626
  if (NULL != jobObj.config && isChange.config) {
×
NEW
2627
    taosMemoryFree(jobObj.config);
×
2628
  }
NEW
2629
  if (NULL != jobObj.reason && isChange.reason) {
×
NEW
2630
    taosMemoryFree(jobObj.reason);
×
2631
  }
NEW
2632
  mndTransDrop(pTrans);
×
NEW
2633
  TAOS_RETURN(code);
×
2634
}
2635

NEW
2636
void mndReleaseXnodeTaskJob(SMnode *pMnode, SXnodeJobObj *pObj) {
×
NEW
2637
  SSdb *pSdb = pMnode->pSdb;
×
NEW
2638
  sdbRelease(pSdb, pObj);
×
NEW
2639
}
×
2640

NEW
2641
SXnodeJobObj *mndAcquireXnodeJob(SMnode *pMnode, int32_t jid) {
×
NEW
2642
  SXnodeJobObj *pObj = sdbAcquire(pMnode->pSdb, SDB_XNODE_JOB, &jid);
×
NEW
2643
  if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
×
NEW
2644
    terrno = TSDB_CODE_MND_XNODE_JOB_NOT_EXIST;
×
2645
  }
NEW
2646
  return pObj;
×
2647
}
NEW
2648
void mndReleaseXnodeJob(SMnode *pMnode, SXnodeJobObj *pObj) {
×
NEW
2649
  SSdb *pSdb = pMnode->pSdb;
×
NEW
2650
  sdbRelease(pSdb, pObj);
×
NEW
2651
}
×
2652

NEW
2653
static int32_t mndSetDropXnodeJobRedoLogs(STrans *pTrans, SXnodeJobObj *pObj) {
×
NEW
2654
  int32_t  code = 0;
×
NEW
2655
  SSdbRaw *pRedoRaw = mndXnodeJobActionEncode(pObj);
×
NEW
2656
  if (pRedoRaw == NULL) {
×
NEW
2657
    code = terrno;
×
NEW
2658
    return code;
×
2659
  }
2660

NEW
2661
  TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw));
×
NEW
2662
  TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING));
×
2663

NEW
2664
  return code;
×
2665
}
2666

NEW
2667
static int32_t mndSetDropXnodeJobCommitLogs(STrans *pTrans, SXnodeJobObj *pObj) {
×
NEW
2668
  int32_t  code = 0;
×
NEW
2669
  SSdbRaw *pCommitRaw = mndXnodeJobActionEncode(pObj);
×
NEW
2670
  if (pCommitRaw == NULL) {
×
NEW
2671
    code = terrno;
×
NEW
2672
    return code;
×
2673
  }
2674

NEW
2675
  TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw));
×
NEW
2676
  TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED));
×
NEW
2677
  TAOS_RETURN(code);
×
2678
}
NEW
2679
static int32_t mndSetDropXnodeJobInfoToTrans(STrans *pTrans, SXnodeJobObj *pObj, bool force) {
×
NEW
2680
  if (pObj == NULL) {
×
NEW
2681
    return 0;
×
2682
  }
NEW
2683
  TAOS_CHECK_RETURN(mndSetDropXnodeJobRedoLogs(pTrans, pObj));
×
NEW
2684
  TAOS_CHECK_RETURN(mndSetDropXnodeJobCommitLogs(pTrans, pObj));
×
NEW
2685
  return 0;
×
2686
}
2687

NEW
2688
static int32_t mndDropXnodeJob(SMnode *pMnode, SRpcMsg *pReq, SXnodeJobObj *pObj) {
×
NEW
2689
  int32_t code = 0;
×
NEW
2690
  int32_t lino = 0;
×
2691

NEW
2692
  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-xnode-job");
×
NEW
2693
  TSDB_CHECK_NULL(pTrans, code, lino, _OVER, terrno);
×
2694

NEW
2695
  mndTransSetSerial(pTrans);
×
NEW
2696
  mInfo("trans:%d, to drop xnode:%d", pTrans->id, pObj->id);
×
2697

NEW
2698
  code = mndSetDropXnodeJobInfoToTrans(pTrans, pObj, false);
×
2699

NEW
2700
  TSDB_CHECK_CODE(code, lino, _OVER);
×
2701

NEW
2702
  code = mndTransPrepare(pMnode, pTrans);
×
2703

NEW
2704
_OVER:
×
NEW
2705
  mndTransDrop(pTrans);
×
NEW
2706
  return code;
×
2707
}
NEW
2708
static int32_t mndProcessCreateXnodeJobReq(SRpcMsg *pReq) {
×
NEW
2709
  mDebug("create xnode job req, content len:%d", pReq->contLen);
×
NEW
2710
  SMnode             *pMnode = pReq->info.node;
×
NEW
2711
  int32_t             code = -1;
×
NEW
2712
  SMCreateXnodeJobReq createReq = {0};
×
2713

NEW
2714
  if ((code = grantCheck(TSDB_GRANT_XNODE)) != TSDB_CODE_SUCCESS) {
×
NEW
2715
    mError("failed to create xnode, code:%s", tstrerror(code));
×
NEW
2716
    goto _OVER;
×
2717
  }
2718

NEW
2719
  TAOS_CHECK_GOTO(tDeserializeSMCreateXnodeJobReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);
×
2720

NEW
2721
  mDebug("xnode create job on xnode:%d", createReq.xnodeId);
×
NEW
2722
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_CREATE_XNODE_JOB), NULL, _OVER);
×
2723

NEW
2724
  code = mndCreateXnodeJob(pMnode, pReq, &createReq);
×
NEW
2725
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
2726

NEW
2727
_OVER:
×
NEW
2728
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
2729
    mError("xnode task job on task id:%d, failed to create since %s", createReq.tid, tstrerror(code));
×
2730
  }
2731

NEW
2732
  tFreeSMCreateXnodeJobReq(&createReq);
×
NEW
2733
  TAOS_RETURN(code);
×
2734
}
2735

NEW
2736
static int32_t mndProcessUpdateXnodeJobReq(SRpcMsg *pReq) {
×
NEW
2737
  SMnode             *pMnode = pReq->info.node;
×
NEW
2738
  int32_t             code = -1;
×
NEW
2739
  SXnodeJobObj       *pObj = NULL;
×
NEW
2740
  SMUpdateXnodeJobReq updateReq = {0};
×
2741

NEW
2742
  if ((code = grantCheck(TSDB_GRANT_TD_GPT)) != TSDB_CODE_SUCCESS) {
×
NEW
2743
    mError("failed to create xnode, code:%s", tstrerror(code));
×
NEW
2744
    goto _OVER;
×
2745
  }
2746

NEW
2747
  TAOS_CHECK_GOTO(tDeserializeSMUpdateXnodeJobReq(pReq->pCont, pReq->contLen, &updateReq), NULL, _OVER);
×
2748

NEW
2749
  pObj = mndAcquireXnodeJob(pMnode, updateReq.jid);
×
NEW
2750
  if (pObj == NULL) {
×
NEW
2751
    code = terrno;
×
NEW
2752
    goto _OVER;
×
2753
  }
2754

NEW
2755
  code = mndUpdateXnodeJob(pMnode, pReq, pObj, &updateReq);
×
NEW
2756
  if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
×
2757

NEW
2758
_OVER:
×
NEW
2759
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
2760
    mError("xnode task job on jid:%d, failed to update since %s", updateReq.jid, tstrerror(code));
×
2761
  }
2762

NEW
2763
  mndReleaseXnodeJob(pMnode, pObj);
×
NEW
2764
  tFreeSMUpdateXnodeJobReq(&updateReq);
×
NEW
2765
  TAOS_RETURN(code);
×
2766

2767
  return 0;
2768
}
2769

NEW
2770
static int32_t mndProcessRebalanceXnodeJobReq(SRpcMsg *pReq) {
×
NEW
2771
  SMnode                *pMnode = pReq->info.node;
×
NEW
2772
  int32_t                code = -1;
×
NEW
2773
  SXnodeJobObj          *pObj = NULL;
×
NEW
2774
  SMRebalanceXnodeJobReq rebalanceReq = {0};
×
NEW
2775
  SJson                 *pJson = NULL;
×
2776

NEW
2777
  TAOS_CHECK_GOTO(tDeserializeSMRebalanceXnodeJobReq(pReq->pCont, pReq->contLen, &rebalanceReq), NULL, _OVER);
×
2778

NEW
2779
  mDebug("RebalanceXnodeJob with jid:%d, xnode_id:%d, start to rebalance", rebalanceReq.jid, rebalanceReq.xnodeId);
×
NEW
2780
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_REBALANCE_XNODE_JOB), NULL, _OVER);
×
2781

NEW
2782
  if (rebalanceReq.jid <= 0) {
×
NEW
2783
    code = TSDB_CODE_INVALID_MSG;
×
NEW
2784
    goto _OVER;
×
2785
  }
2786

NEW
2787
  pObj = mndAcquireXnodeJob(pMnode, rebalanceReq.jid);
×
NEW
2788
  if (pObj == NULL) {
×
NEW
2789
    code = terrno;
×
NEW
2790
    goto _OVER;
×
2791
  }
2792

2793
  // send request
NEW
2794
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
×
NEW
2795
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/rebalance/manual/%d/%d/%d", XNODED_PIPE_SOCKET_URL, pObj->taskId, pObj->id,
×
2796
           rebalanceReq.xnodeId);
NEW
2797
  (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, defaultTimeout, NULL, 0);
×
2798

NEW
2799
_OVER:
×
NEW
2800
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
2801
    mError("xnode:%d, failed to rebalance xnode job since %s", rebalanceReq.jid, tstrerror(code));
×
2802
  }
NEW
2803
  if (pJson != NULL) {
×
NEW
2804
    tjsonDelete(pJson);
×
2805
  }
NEW
2806
  mndReleaseXnodeJob(pMnode, pObj);
×
NEW
2807
  tFreeSMRebalanceXnodeJobReq(&rebalanceReq);
×
NEW
2808
  TAOS_RETURN(code);
×
2809
}
2810

2811
typedef struct {
2812
  SArray   *stack;
2813
  SHashObj *pMap;
2814
  int32_t   code;
2815
} SXndWhereContext;
2816

NEW
2817
void freeSXndWhereContext(SXndWhereContext *pCtx) {
×
NEW
2818
  if (pCtx == NULL) return;
×
2819

NEW
2820
  if (pCtx->pMap != NULL) {
×
NEW
2821
    taosHashCleanup(pCtx->pMap);
×
NEW
2822
    pCtx->pMap = NULL;
×
2823
  }
NEW
2824
  if (pCtx->stack != NULL) {
×
NEW
2825
    taosArrayDestroy(pCtx->stack);
×
NEW
2826
    pCtx->stack = NULL;
×
2827
  }
2828
}
2829

2830
typedef struct {
2831
  SValueNode nd;
2832
  bool       shouldFree;
2833
} SXndRefValueNode;
2834

NEW
2835
static void freeSXndRefValueNode(void *pNode) {
×
NEW
2836
  SXndRefValueNode *pRefNode = (SXndRefValueNode *)pNode;
×
NEW
2837
  if (pRefNode == NULL) return;
×
2838

NEW
2839
  if (pRefNode->shouldFree) {
×
NEW
2840
    if (NULL != pRefNode->nd.datum.p) {
×
NEW
2841
      taosMemoryFree(pRefNode->nd.datum.p);
×
NEW
2842
      pRefNode->nd.datum.p = NULL;
×
2843
    }
2844
  }
2845
}
2846

NEW
2847
static SHashObj *convertJob2Map(const SXnodeJobObj *pJob) {
×
NEW
2848
  SHashObj *pMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
×
NEW
2849
  if (pMap == NULL) {
×
NEW
2850
    return NULL;
×
2851
  }
NEW
2852
  taosHashSetFreeFp(pMap, freeSXndRefValueNode);
×
2853
  // id
NEW
2854
  SXndRefValueNode id = {0};
×
NEW
2855
  id.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2856
  id.nd.node.resType.type = TSDB_DATA_TYPE_UBIGINT;
×
NEW
2857
  id.nd.datum.u = pJob->id;
×
NEW
2858
  int32_t code = taosHashPut(pMap, "id", strlen("id") + 1, &id, sizeof(SXndRefValueNode));
×
NEW
2859
  if (code != 0) {
×
NEW
2860
    taosHashCleanup(pMap);
×
NEW
2861
    return NULL;
×
2862
  }
2863
  // task id
NEW
2864
  SXndRefValueNode taskId = {0};
×
NEW
2865
  taskId.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2866
  taskId.nd.node.resType.type = TSDB_DATA_TYPE_UBIGINT;
×
NEW
2867
  taskId.nd.datum.u = pJob->taskId;
×
NEW
2868
  if (pJob->taskId == 0) {
×
NEW
2869
    taskId.nd.isNull = true;
×
2870
  }
NEW
2871
  code = taosHashPut(pMap, "task_id", strlen("task_id") + 1, &taskId, sizeof(SXndRefValueNode));
×
NEW
2872
  if (code != 0) {
×
NEW
2873
    taosHashCleanup(pMap);
×
NEW
2874
    return NULL;
×
2875
  }
2876
  // via
NEW
2877
  SXndRefValueNode via = {0};
×
NEW
2878
  via.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2879
  via.nd.node.resType.type = TSDB_DATA_TYPE_UBIGINT;
×
NEW
2880
  via.nd.datum.u = pJob->via;
×
NEW
2881
  if (pJob->via == 0) {
×
NEW
2882
    via.nd.isNull = true;
×
2883
  }
NEW
2884
  code = taosHashPut(pMap, "via", strlen("via") + 1, &via, sizeof(SXndRefValueNode));
×
NEW
2885
  if (code != 0) {
×
NEW
2886
    taosHashCleanup(pMap);
×
NEW
2887
    return NULL;
×
2888
  }
2889
  // xnode id
NEW
2890
  SXndRefValueNode xnodeId = {0};
×
NEW
2891
  xnodeId.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2892
  xnodeId.nd.node.resType.type = TSDB_DATA_TYPE_UBIGINT;
×
NEW
2893
  xnodeId.nd.datum.u = pJob->xnodeId;
×
NEW
2894
  if (pJob->xnodeId == 0) {
×
NEW
2895
    xnodeId.nd.isNull = true;
×
2896
  }
NEW
2897
  code = taosHashPut(pMap, "xnode_id", strlen("xnode_id") + 1, &xnodeId, sizeof(SXndRefValueNode));
×
NEW
2898
  if (code != 0) {
×
NEW
2899
    taosHashCleanup(pMap);
×
NEW
2900
    return NULL;
×
2901
  }
2902
  // config
NEW
2903
  SXndRefValueNode config = {0};
×
NEW
2904
  config.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2905
  if (pJob->configLen > 0) {
×
NEW
2906
    config.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
NEW
2907
    config.nd.datum.p = taosStrndupi(pJob->config, strlen(pJob->config) + 1);
×
NEW
2908
    config.shouldFree = true;
×
NEW
2909
    code = taosHashPut(pMap, "config", strlen("config") + 1, &config, sizeof(SXndRefValueNode));
×
2910
  } else {
NEW
2911
    config.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
NEW
2912
    config.nd.datum.p = NULL;
×
NEW
2913
    config.nd.isNull = true;
×
NEW
2914
    code = taosHashPut(pMap, "config", strlen("config") + 1, &config, sizeof(SXndRefValueNode));
×
2915
  }
NEW
2916
  if (code != 0) {
×
NEW
2917
    taosHashCleanup(pMap);
×
NEW
2918
    return NULL;
×
2919
  }
2920
  // status
NEW
2921
  SXndRefValueNode status = {0};
×
NEW
2922
  status.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2923
  if (pJob->statusLen > 0) {
×
NEW
2924
    status.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
NEW
2925
    status.nd.datum.p = taosStrndupi(pJob->status, strlen(pJob->status) + 1);
×
NEW
2926
    status.shouldFree = true;
×
NEW
2927
    code = taosHashPut(pMap, "status", strlen("status") + 1, &status, sizeof(SXndRefValueNode));
×
2928
  } else {
NEW
2929
    status.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
NEW
2930
    status.nd.datum.p = NULL;
×
NEW
2931
    status.nd.isNull = true;
×
NEW
2932
    code = taosHashPut(pMap, "status", strlen("status") + 1, &status, sizeof(SXndRefValueNode));
×
2933
  }
NEW
2934
  if (code != 0) {
×
NEW
2935
    taosHashCleanup(pMap);
×
NEW
2936
    return NULL;
×
2937
  }
2938
  // reason
NEW
2939
  SXndRefValueNode reason = {0};
×
NEW
2940
  reason.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2941
  if (pJob->reasonLen > 0) {
×
NEW
2942
    reason.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
NEW
2943
    reason.nd.datum.p = taosStrndupi(pJob->reason, strlen(pJob->reason) + 1);
×
NEW
2944
    reason.shouldFree = true;
×
NEW
2945
    code = taosHashPut(pMap, "reason", strlen("reason") + 1, &reason, sizeof(SXndRefValueNode));
×
2946
  } else {
NEW
2947
    reason.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
NEW
2948
    reason.nd.datum.p = NULL;
×
NEW
2949
    reason.nd.isNull = true;
×
NEW
2950
    code = taosHashPut(pMap, "reason", strlen("reason") + 1, &reason, sizeof(SXndRefValueNode));
×
2951
  }
NEW
2952
  if (code != 0) {
×
NEW
2953
    taosHashCleanup(pMap);
×
NEW
2954
    return NULL;
×
2955
  }
2956
  // create time
NEW
2957
  SXndRefValueNode createTime = {0};
×
NEW
2958
  createTime.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2959
  createTime.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
NEW
2960
  createTime.nd.datum.p = taosMemoryCalloc(1, TD_TIME_STR_LEN);
×
NEW
2961
  createTime.nd.datum.p = formatTimestampLocal(createTime.nd.datum.p, pJob->createTime, TSDB_TIME_PRECISION_MILLI);
×
NEW
2962
  createTime.shouldFree = true;
×
NEW
2963
  code = taosHashPut(pMap, "create_time", strlen("create_time") + 1, &createTime, sizeof(SXndRefValueNode));
×
NEW
2964
  if (code != 0) {
×
NEW
2965
    taosHashCleanup(pMap);
×
NEW
2966
    return NULL;
×
2967
  }
2968
  // update time
NEW
2969
  SXndRefValueNode updateTime = {0};
×
NEW
2970
  updateTime.nd.node.type = QUERY_NODE_VALUE;
×
NEW
2971
  updateTime.nd.node.resType.type = TSDB_DATA_TYPE_BINARY;
×
NEW
2972
  updateTime.nd.datum.p = taosMemoryCalloc(1, TD_TIME_STR_LEN);
×
NEW
2973
  updateTime.nd.datum.p = formatTimestampLocal(updateTime.nd.datum.p, pJob->updateTime, TSDB_TIME_PRECISION_MILLI);
×
NEW
2974
  updateTime.shouldFree = true;
×
NEW
2975
  code = taosHashPut(pMap, "update_time", strlen("update_time") + 1, &updateTime, sizeof(SXndRefValueNode));
×
NEW
2976
  return pMap;
×
2977
}
2978

2979
typedef bool (*FOpCmp)(SValueNode *pval1, SValueNode *pval2);
2980

2981
#define XNODE_DEF_OP_FUNC(NAME, OP)                             \
2982
  static bool NAME(SValueNode *pval1, SValueNode *pval2) {      \
2983
    switch (pval1->node.resType.type) {                         \
2984
      case TSDB_DATA_TYPE_BOOL:                                 \
2985
        return pval1->datum.b OP pval2->datum.b;                \
2986
      case TSDB_DATA_TYPE_UBIGINT:                              \
2987
        return pval1->datum.u OP pval2->datum.u;                \
2988
      case TSDB_DATA_TYPE_BIGINT:                               \
2989
        return pval1->datum.i OP pval2->datum.i;                \
2990
      case TSDB_DATA_TYPE_FLOAT:                                \
2991
        return pval1->datum.d OP pval2->datum.d;                \
2992
      case TSDB_DATA_TYPE_BINARY: {                             \
2993
        if (pval1->datum.p == NULL || pval2->datum.p == NULL) { \
2994
          return pval1->datum.p OP pval2->datum.p;              \
2995
        }                                                       \
2996
        return strcmp(pval1->datum.p, pval2->datum.p) OP 0;     \
2997
      }                                                         \
2998
      default:                                                  \
2999
        return false;                                           \
3000
    }                                                           \
3001
  }
3002

NEW
3003
XNODE_DEF_OP_FUNC(op_greater_than, >)
×
NEW
3004
XNODE_DEF_OP_FUNC(op_greater_equal, >=)
×
NEW
3005
XNODE_DEF_OP_FUNC(op_lower_than, <)
×
NEW
3006
XNODE_DEF_OP_FUNC(op_lower_equal, <=)
×
NEW
3007
XNODE_DEF_OP_FUNC(op_equal, ==)
×
NEW
3008
XNODE_DEF_OP_FUNC(op_not_equal, !=)
×
3009

NEW
3010
static int32_t call_op_cmp(SXndWhereContext *pctx, FOpCmp opFn) {
×
NEW
3011
  int32_t     code = 0;
×
NEW
3012
  SValueNode *pval2 = (SValueNode *)taosArrayPop(pctx->stack);
×
NEW
3013
  SValueNode *pval1 = (SValueNode *)taosArrayPop(pctx->stack);
×
NEW
3014
  bool        ret = false;
×
3015

NEW
3016
  if (pval1->node.type != pval2->node.type) {
×
NEW
3017
    code = TSDB_CODE_MND_XNODE_WHERE_COL_TYPE_DIFF;
×
NEW
3018
    mError("xnode type not same, v1 type: %d, v2 type: %d", pval1->node.type, pval2->node.type);
×
NEW
3019
    goto _OVER;
×
3020
  } else {
NEW
3021
    mDebug("xnode type v1:%d, is null: %d, UB: %" PRIu64 ", v2 type: %d, is null: %d, UB: %" PRIu64, pval1->node.type,
×
3022
           pval1->isNull, pval1->datum.u, pval2->node.type, pval2->isNull, pval2->datum.u);
3023

NEW
3024
    ret = (*opFn)(pval1, pval2);
×
3025
  }
NEW
3026
  SXndRefValueNode pval = {0};
×
NEW
3027
  pval.nd.node.type = QUERY_NODE_VALUE;
×
NEW
3028
  pval.nd.node.resType.type = TSDB_DATA_TYPE_BOOL;
×
NEW
3029
  pval.nd.datum.b = ret;
×
NEW
3030
  if (NULL == taosArrayPush(pctx->stack, &pval)) {
×
NEW
3031
    mError("xnode evaluate walker array push error: %s", tstrerror(terrno));
×
NEW
3032
    code = terrno;
×
NEW
3033
    goto _OVER;
×
3034
  }
3035

NEW
3036
_OVER:
×
NEW
3037
  if (pval1 != NULL) freeSXndRefValueNode((SXndRefValueNode *)pval1);
×
NEW
3038
  if (pval2 != NULL) freeSXndRefValueNode((SXndRefValueNode *)pval2);
×
NEW
3039
  TAOS_RETURN(code);
×
3040
}
3041

3042
#define XND_WALKER_CHECK_GOTO(CMD, LABEL)    \
3043
  do {                                       \
3044
    pctx->code = (CMD);                      \
3045
    if ((pctx->code != TSDB_CODE_SUCCESS)) { \
3046
      goto LABEL;                            \
3047
    }                                        \
3048
  } while (0);
3049

NEW
3050
static EDealRes evaluateWaker(SNode *pNode, void *pWhereCtx) {
×
NEW
3051
  int32_t           code = 0;
×
NEW
3052
  SXndWhereContext *pctx = (SXndWhereContext *)pWhereCtx;
×
3053

NEW
3054
  if (nodeType(pNode) == QUERY_NODE_COLUMN) {
×
NEW
3055
    SColumnNode *colNode = (SColumnNode *)pNode;
×
3056
    SXndRefValueNode *pval =
NEW
3057
        (SXndRefValueNode *)taosHashGet(pctx->pMap, colNode->colName, strlen(colNode->colName) + 1);
×
NEW
3058
    if (pval == NULL) {
×
NEW
3059
      mError("xnode evaluateWhereCond hash get error: %s", tstrerror(terrno));
×
NEW
3060
      pctx->code = TSDB_CODE_MND_XNODE_WHERE_COL_NOT_EXIST;
×
NEW
3061
      return DEAL_RES_END;
×
3062
    }
NEW
3063
    if (NULL == taosArrayPush(pctx->stack, pval)) {
×
NEW
3064
      mError("xnode evaluate walker array push error: %s", tstrerror(terrno));
×
NEW
3065
      pctx->code = TSDB_CODE_FAILED;
×
NEW
3066
      return DEAL_RES_END;
×
3067
    }
NEW
3068
    return DEAL_RES_CONTINUE;
×
3069
  }
NEW
3070
  if (nodeType(pNode) == QUERY_NODE_VALUE) {
×
NEW
3071
    SValueNode *pval = (SValueNode *)pNode;
×
NEW
3072
    if (pval->node.resType.type == TSDB_DATA_TYPE_UBIGINT) {
×
NEW
3073
      pval->datum.u = taosStr2Int64(pval->literal, NULL, 10);
×
3074
    }
NEW
3075
    if (pval->node.resType.type == TSDB_DATA_TYPE_BINARY) {
×
NEW
3076
      pval->datum.p = taosStrndupi(pval->literal, strlen(pval->literal) + 1);
×
3077
    }
NEW
3078
    if (pval->node.resType.type == TSDB_DATA_TYPE_NULL) {
×
NEW
3079
      pval->isNull = true;
×
NEW
3080
      pval->datum.p = NULL;
×
3081
    }
NEW
3082
    if (pval->node.resType.type == TSDB_DATA_TYPE_BOOL) {
×
NEW
3083
      pval->datum.b = pval->literal[0] == 't' || pval->literal[0] == 'T';
×
3084
    }
NEW
3085
    SXndRefValueNode refVal = {0};
×
NEW
3086
    refVal.nd = *pval;
×
NEW
3087
    refVal.shouldFree = false;
×
NEW
3088
    if (NULL == taosArrayPush(pctx->stack, &refVal)) {
×
NEW
3089
      mError("xnode evaluate walker array push error: %s", tstrerror(terrno));
×
NEW
3090
      pctx->code = TSDB_CODE_FAILED;
×
NEW
3091
      return DEAL_RES_END;
×
3092
    }
NEW
3093
    return DEAL_RES_CONTINUE;
×
3094
  }
3095

NEW
3096
  if (nodeType(pNode) == QUERY_NODE_OPERATOR) {
×
NEW
3097
    SOperatorNode *opNode = (SOperatorNode *)pNode;
×
NEW
3098
    switch (opNode->opType) {
×
NEW
3099
      case OP_TYPE_GREATER_THAN: {
×
NEW
3100
        XND_WALKER_CHECK_GOTO(call_op_cmp(pctx, op_greater_than), _exit);
×
NEW
3101
        break;
×
3102
      }
NEW
3103
      case OP_TYPE_GREATER_EQUAL: {
×
NEW
3104
        XND_WALKER_CHECK_GOTO(call_op_cmp(pctx, op_greater_equal), _exit);
×
NEW
3105
        break;
×
3106
      }
NEW
3107
      case OP_TYPE_LOWER_THAN: {
×
NEW
3108
        XND_WALKER_CHECK_GOTO(call_op_cmp(pctx, op_lower_than), _exit);
×
NEW
3109
        break;
×
3110
      }
NEW
3111
      case OP_TYPE_LOWER_EQUAL: {
×
NEW
3112
        XND_WALKER_CHECK_GOTO(call_op_cmp(pctx, op_lower_equal), _exit);
×
NEW
3113
        break;
×
3114
      }
NEW
3115
      case OP_TYPE_EQUAL: {
×
NEW
3116
        XND_WALKER_CHECK_GOTO(call_op_cmp(pctx, op_equal), _exit);
×
NEW
3117
        break;
×
3118
      }
NEW
3119
      case OP_TYPE_NOT_EQUAL: {
×
NEW
3120
        XND_WALKER_CHECK_GOTO(call_op_cmp(pctx, op_not_equal), _exit);
×
NEW
3121
        break;
×
3122
      }
NEW
3123
      default:
×
NEW
3124
        pctx->code = TSDB_CODE_MND_XNODE_WHERE_OP_NOT_SUPPORT;
×
NEW
3125
        return DEAL_RES_CONTINUE;
×
3126
    }
NEW
3127
    return DEAL_RES_CONTINUE;
×
3128
  }
3129

NEW
3130
  if (nodeType(pNode) == QUERY_NODE_LOGIC_CONDITION) {
×
NEW
3131
    SLogicConditionNode *logicNode = (SLogicConditionNode *)pNode;
×
NEW
3132
    SXndRefValueNode     pval = {0};
×
NEW
3133
    pval.nd.node.type = QUERY_NODE_VALUE;
×
NEW
3134
    pval.nd.node.resType.type = TSDB_DATA_TYPE_BOOL;
×
3135

NEW
3136
    switch (logicNode->condType) {
×
NEW
3137
      case LOGIC_COND_TYPE_AND: {
×
NEW
3138
        SValueNode *pval2 = (SValueNode *)taosArrayPop(pctx->stack);
×
NEW
3139
        SValueNode *pval1 = (SValueNode *)taosArrayPop(pctx->stack);
×
3140

NEW
3141
        pval.nd.datum.b = pval1->datum.b && pval2->datum.b;
×
NEW
3142
        if (NULL == taosArrayPush(pctx->stack, &pval)) {
×
NEW
3143
          mError("xnode walker AND array push err: %s", tstrerror(terrno));
×
NEW
3144
          pctx->code = TSDB_CODE_FAILED;
×
NEW
3145
          return DEAL_RES_END;
×
3146
        }
3147

NEW
3148
        freeSXndRefValueNode((SXndRefValueNode *)pval1);
×
NEW
3149
        freeSXndRefValueNode((SXndRefValueNode *)pval2);
×
NEW
3150
        break;
×
3151
      }
NEW
3152
      case LOGIC_COND_TYPE_OR: {
×
NEW
3153
        SValueNode *pval2 = (SValueNode *)taosArrayPop(pctx->stack);
×
NEW
3154
        SValueNode *pval1 = (SValueNode *)taosArrayPop(pctx->stack);
×
3155

NEW
3156
        pval.nd.datum.b = pval1->datum.b || pval2->datum.b;
×
NEW
3157
        if (NULL == taosArrayPush(pctx->stack, &pval)) {
×
NEW
3158
          mError("xnode walker OR array push err: %s", tstrerror(terrno));
×
NEW
3159
          pctx->code = TSDB_CODE_FAILED;
×
NEW
3160
          return DEAL_RES_END;
×
3161
        }
3162

NEW
3163
        freeSXndRefValueNode((SXndRefValueNode *)pval1);
×
NEW
3164
        freeSXndRefValueNode((SXndRefValueNode *)pval2);
×
NEW
3165
        break;
×
3166
      }
NEW
3167
      case LOGIC_COND_TYPE_NOT: {
×
NEW
3168
        SValueNode *pval1 = (SValueNode *)taosArrayPop(pctx->stack);
×
3169

NEW
3170
        pval.nd.datum.b = !pval1->datum.b;
×
NEW
3171
        if (NULL == taosArrayPush(pctx->stack, &pval)) {
×
NEW
3172
          mError("xnode walker NOT array push err: %s", tstrerror(terrno));
×
NEW
3173
          pctx->code = TSDB_CODE_FAILED;
×
NEW
3174
          return DEAL_RES_END;
×
3175
        }
3176

NEW
3177
        freeSXndRefValueNode((SXndRefValueNode *)pval1);
×
NEW
3178
        break;
×
3179
      }
NEW
3180
      default:
×
NEW
3181
        break;
×
3182
    }
NEW
3183
    return DEAL_RES_CONTINUE;
×
3184
  }
3185

NEW
3186
  pctx->code = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
3187

NEW
3188
_exit:
×
NEW
3189
  return DEAL_RES_END;
×
3190
}
3191

NEW
3192
static bool evaluateWhereCond(SNode *pWhere, SHashObj *pDataMap, int32_t *code) {
×
NEW
3193
  bool             ret = false;
×
NEW
3194
  SXndWhereContext ctx = {0};
×
3195

NEW
3196
  ctx.stack = taosArrayInit(64, sizeof(SXndRefValueNode));
×
NEW
3197
  if (ctx.stack == NULL) {
×
NEW
3198
    mError("xnode evaluateWhereCond error: %s", tstrerror(terrno));
×
NEW
3199
    *code = terrno;
×
NEW
3200
    goto _exit;
×
3201
  }
NEW
3202
  ctx.pMap = pDataMap;
×
3203

3204
  // walkexpr pnode
NEW
3205
  nodesWalkExprPostOrder(pWhere, evaluateWaker, &ctx);
×
NEW
3206
  if (ctx.code != TSDB_CODE_SUCCESS) {
×
NEW
3207
    *code = ctx.code;
×
NEW
3208
    mError("xnode walkExpr error: %s", tstrerror(ctx.code));
×
NEW
3209
    goto _exit;
×
3210
  }
NEW
3211
  SValueNode *pval = taosArrayGetLast(ctx.stack);
×
NEW
3212
  if (pval == NULL) {
×
NEW
3213
    *code = terrno;
×
NEW
3214
    mError("xnode evaluateWhereCond error: %s", tstrerror(terrno));
×
NEW
3215
    goto _exit;
×
3216
  }
NEW
3217
  mDebug("xnode ctx stack size:%lu, last nd type:%d, bool:%d", ctx.stack->size, pval->node.type, pval->datum.b);
×
NEW
3218
  ret = pval->datum.b;
×
3219

NEW
3220
_exit:
×
NEW
3221
  freeSXndWhereContext(&ctx);
×
NEW
3222
  return ret;
×
3223
}
3224

NEW
3225
static int32_t filterJobsByWhereCond(SNode *pWhere, SArray *pArray, SArray **ppResult) {
×
NEW
3226
  int32_t code = 0;
×
3227

NEW
3228
  *ppResult = taosArrayInit(64, sizeof(SXnodeJobObj));
×
NEW
3229
  if (*ppResult == NULL) {
×
NEW
3230
    code = terrno;
×
NEW
3231
    goto _exit;
×
3232
  }
NEW
3233
  for (int32_t i = 0; i < pArray->size; i++) {
×
NEW
3234
    SXnodeJobObj *pJob = taosArrayGet(pArray, i);
×
3235

NEW
3236
    SHashObj *pDataMap = NULL;
×
NEW
3237
    if ((pDataMap = convertJob2Map(pJob)) == NULL) {
×
NEW
3238
      mError("xnode evaluate convertJow2Map error: %s", tstrerror(terrno));
×
NEW
3239
      goto _exit;
×
3240
    }
NEW
3241
    if (evaluateWhereCond(pWhere, pDataMap, &code)) {
×
NEW
3242
      if (NULL == taosArrayPush(*ppResult, pJob)) {
×
NEW
3243
        mError("xnode filterJobsByWhereCond array push err: %s", tstrerror(terrno));
×
NEW
3244
        code = TSDB_CODE_FAILED;
×
NEW
3245
        goto _exit;
×
3246
      }
3247
    }
NEW
3248
    if (code != TSDB_CODE_SUCCESS) {
×
NEW
3249
      goto _exit;
×
3250
    }
3251
  }
3252

NEW
3253
_exit:
×
NEW
3254
  TAOS_RETURN(code);
×
3255
}
3256

3257
#define XND_LOG_END(code, lino)                                                                 \
3258
  do {                                                                                          \
3259
    if (code != TSDB_CODE_SUCCESS) {                                                            \
3260
      mError("%s failed at line %d code: %d, since %s", __func__, lino, code, tstrerror(code)); \
3261
    }                                                                                           \
3262
  } while (0)
3263

NEW
3264
void httpRebalanceAuto(SArray *pResult) {
×
NEW
3265
  int32_t code = 0;
×
NEW
3266
  int32_t lino = 0;
×
NEW
3267
  SJson *pJsonArr = NULL;
×
NEW
3268
  char  *pContStr = NULL;
×
3269
  // convert pResult to [(tid, jid)*]
NEW
3270
  pJsonArr = tjsonCreateArray();
×
NEW
3271
  if (pJsonArr == NULL) {
×
NEW
3272
    code = terrno;
×
NEW
3273
    mError("xnode json array error: %s", tstrerror(code));
×
NEW
3274
    goto _OVER;
×
3275
  }
NEW
3276
  for (int32_t i = 0; i < pResult->size; i++) {
×
NEW
3277
    SXnodeJobObj *pJob = taosArrayGet(pResult, i);
×
NEW
3278
    SJson        *pJsonObj = tjsonCreateObject();
×
NEW
3279
    if (pJsonObj == NULL) {
×
NEW
3280
      code = terrno;
×
NEW
3281
      mError("xnode json object error: %s", tstrerror(code));
×
NEW
3282
      goto _OVER;
×
3283
    }
NEW
3284
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJsonObj, "tid", pJob->taskId), &lino, _OVER);
×
NEW
3285
    TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJsonObj, "jid", pJob->id), &lino, _OVER);
×
NEW
3286
    TAOS_CHECK_GOTO(tjsonAddItemToArray(pJsonArr, pJsonObj), &lino, _OVER);
×
3287
  }
3288

NEW
3289
  pContStr = tjsonToUnformattedString(pJsonArr);
×
NEW
3290
  if (pContStr == NULL) {
×
NEW
3291
    mError("xnode to json string error: %s", tstrerror(terrno));
×
NEW
3292
    goto _OVER;
×
3293
  }
NEW
3294
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
×
NEW
3295
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/rebalance/auto", XNODED_PIPE_SOCKET_URL);
×
NEW
3296
  (void)mndSendReqRetJson(xnodeUrl, HTTP_TYPE_POST, defaultTimeout, pContStr, strlen(pContStr));
×
3297

NEW
3298
_OVER:
×
NEW
3299
  if (pJsonArr != NULL) tjsonDelete(pJsonArr);
×
NEW
3300
  if (pContStr != NULL) taosMemoryFree(pContStr);
×
NEW
3301
  XND_LOG_END(code, lino);
×
NEW
3302
  return;
×
3303
}
3304

NEW
3305
static int32_t mndProcessRebalanceXnodeJobsWhereReq(SRpcMsg *pReq) {
×
NEW
3306
  mDebug("xnode reblance xnode jobs where req, content len:%d", pReq->contLen);
×
NEW
3307
  int32_t                      code = 0;
×
NEW
3308
  SMnode                      *pMnode = pReq->info.node;
×
NEW
3309
  SMRebalanceXnodeJobsWhereReq rebalanceReq = {0};
×
NEW
3310
  SNode                       *pWhere = NULL;
×
NEW
3311
  SArray                      *pArray = NULL;
×
NEW
3312
  SArray                      *pResult = NULL;
×
3313

NEW
3314
  TAOS_CHECK_GOTO(tDeserializeSMRebalanceXnodeJobsWhereReq(pReq->pCont, pReq->contLen, &rebalanceReq), NULL, _OVER);
×
NEW
3315
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_REBALANCE_XNODE_JOB), NULL, _OVER);
×
3316

NEW
3317
  TAOS_CHECK_GOTO(mndAcquireXnodeJobsAll(pMnode, &pArray), NULL, _OVER);
×
NEW
3318
  if (NULL != rebalanceReq.ast.ptr) {
×
NEW
3319
    TAOS_CHECK_GOTO(nodesStringToNode(rebalanceReq.ast.ptr, &pWhere), NULL, _OVER);
×
3320

NEW
3321
    TAOS_CHECK_GOTO(filterJobsByWhereCond(pWhere, pArray, &pResult), NULL, _OVER);
×
NEW
3322
    httpRebalanceAuto(pResult);
×
3323
  } else {
NEW
3324
    httpRebalanceAuto(pArray);
×
3325
  }
3326

NEW
3327
_OVER:
×
NEW
3328
  if (pWhere != NULL) {
×
NEW
3329
    nodesDestroyNode(pWhere);
×
3330
  }
NEW
3331
  if (pArray != NULL) {
×
NEW
3332
    taosArrayDestroy(pArray);
×
3333
  }
NEW
3334
  if (pResult != NULL) {
×
NEW
3335
    taosArrayDestroy(pResult);
×
3336
  }
NEW
3337
  tFreeSMRebalanceXnodeJobsWhereReq(&rebalanceReq);
×
NEW
3338
  TAOS_RETURN(code);
×
3339
}
3340

NEW
3341
static int32_t mndProcessDropXnodeJobReq(SRpcMsg *pReq) {
×
NEW
3342
  mDebug("drop xnode job req, content len:%d", pReq->contLen);
×
NEW
3343
  SMnode           *pMnode = pReq->info.node;
×
NEW
3344
  int32_t           code = -1;
×
NEW
3345
  SXnodeJobObj     *pObj = NULL;
×
NEW
3346
  SMDropXnodeJobReq dropReq = {0};
×
3347

NEW
3348
  TAOS_CHECK_GOTO(tDeserializeSMDropXnodeJobReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);
×
3349

NEW
3350
  mDebug("DropXnodeJob with jid:%d, tid:%d, start to drop", dropReq.jid, dropReq.tid);
×
NEW
3351
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, NULL, MND_OPER_DROP_XNODE_JOB), NULL, _OVER);
×
3352

NEW
3353
  if (dropReq.jid <= 0) {
×
NEW
3354
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
NEW
3355
    goto _OVER;
×
3356
  }
3357

NEW
3358
  pObj = mndAcquireXnodeJob(pMnode, dropReq.jid);
×
NEW
3359
  if (pObj == NULL) {
×
NEW
3360
    code = terrno;
×
NEW
3361
    goto _OVER;
×
3362
  }
3363

NEW
3364
  code = mndDropXnodeJob(pMnode, pReq, pObj);
×
NEW
3365
  if (code == 0) {
×
NEW
3366
    code = TSDB_CODE_ACTION_IN_PROGRESS;
×
3367
  }
3368

NEW
3369
_OVER:
×
NEW
3370
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
×
NEW
3371
    mError("xnode:%d, failed to drop since %s", dropReq.jid, tstrerror(code));
×
3372
  }
NEW
3373
  mndReleaseXnodeJob(pMnode, pObj);
×
NEW
3374
  tFreeSMDropXnodeJobReq(&dropReq);
×
NEW
3375
  TAOS_RETURN(code);
×
3376
}
3377

3378
/**
3379
 * @brief Mapping the columns of show xnode jobs
3380
 *
3381
 * See [xnodeTaskJobSchema] in systable.h.
3382
 *
3383
 *  {.name = "jid", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
3384
    {.name = "tid", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
3385
    {.name = "config", .bytes = TSDB_XNODE_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo =
3386
 false},
3387
    {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
3388
    // {.name = "reason", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
3389
    {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
3390
    {.name = "update_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
3391
 * @param pReq
3392
 * @param pShow
3393
 * @param pBlock
3394
 * @param rows
3395
 * @return int32_t
3396
 */
3397
static int32_t mndRetrieveXnodeJobs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
352✔
3398
  SMnode       *pMnode = pReq->info.node;
352✔
3399
  SSdb         *pSdb = pMnode->pSdb;
352✔
3400
  int32_t       numOfRows = 0;
352✔
3401
  int32_t       cols = 0;
352✔
3402
  SXnodeJobObj *pObj = NULL;
352✔
3403
  char          buf[VARSTR_HEADER_SIZE + TMAX(TSDB_XNODE_TASK_JOB_CONFIG_LEN, TSDB_XNODE_TASK_REASON_LEN)];
352✔
3404
  char          status[64] = {0};
352✔
3405
  int32_t       code = 0;
352✔
3406
  mDebug("show.type:%d, %s:%d: retrieve xnode jobs with rows: %d", pShow->type, __FILE__, __LINE__, rows);
352✔
3407

3408
  while (numOfRows < rows) {
352✔
3409
    pShow->pIter = sdbFetch(pSdb, SDB_XNODE_JOB, pShow->pIter, (void **)&pObj);
352✔
3410
    if (pShow->pIter == NULL) break;
352✔
3411

NEW
3412
    cols = 0;
×
NEW
3413
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
3414
    // id
NEW
3415
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->id, false);
×
NEW
3416
    if (code != 0) goto _end;
×
3417
    // tid
NEW
3418
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3419
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->taskId, false);
×
NEW
3420
    if (code != 0) goto _end;
×
3421

3422
    // config
NEW
3423
    buf[0] = 0;
×
NEW
3424
    STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->config, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
3425
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3426
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
×
NEW
3427
    if (code != 0) goto _end;
×
3428

3429
    // via
NEW
3430
    if (pObj->via != 0) {
×
NEW
3431
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3432
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->via, false);
×
NEW
3433
      if (code != 0) goto _end;
×
3434
    } else {
NEW
3435
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3436
      colDataSetNULL(pColInfo, numOfRows);
×
3437
    }
3438

3439
    // xnode_id
NEW
3440
    if (pObj->xnodeId != 0) {
×
NEW
3441
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3442
      code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->xnodeId, false);
×
NEW
3443
      if (code != 0) goto _end;
×
3444
    } else {
NEW
3445
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3446
      colDataSetNULL(pColInfo, numOfRows);
×
3447
    }
3448

3449
    // status
NEW
3450
    if (pObj->statusLen > 0) {
×
NEW
3451
      buf[0] = 0;
×
NEW
3452
      STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->status, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
3453
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3454
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
NEW
3455
      if (code != 0) goto _end;
×
3456
    } else {
NEW
3457
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3458
      colDataSetNULL(pColInfo, numOfRows);
×
3459
    }
3460

3461
    // reason
NEW
3462
    if (pObj->reasonLen > 0) {
×
NEW
3463
      buf[0] = 0;
×
NEW
3464
      STR_WITH_MAXSIZE_TO_VARSTR(buf, pObj->reason, pShow->pMeta->pSchemas[cols].bytes);
×
NEW
3465
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3466
      code = colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
×
NEW
3467
      if (code != 0) goto _end;
×
3468
    } else {
NEW
3469
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3470
      colDataSetNULL(pColInfo, numOfRows);
×
3471
    }
3472

3473
    // create_time
NEW
3474
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3475
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createTime, false);
×
NEW
3476
    if (code != 0) goto _end;
×
3477

3478
    // update_time
NEW
3479
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
×
NEW
3480
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->updateTime, false);
×
NEW
3481
    if (code != 0) goto _end;
×
3482

NEW
3483
    numOfRows++;
×
NEW
3484
    sdbRelease(pSdb, pObj);
×
3485
  }
3486

3487
_end:
352✔
3488
  if (code != 0) sdbRelease(pSdb, pObj);
352✔
3489

3490
  pShow->numOfRows += numOfRows;
352✔
3491
  return numOfRows;
352✔
3492
}
NEW
3493
static void mndCancelGetNextXnodeJob(SMnode *pMnode, void *pIter) {
×
NEW
3494
  SSdb *pSdb = pMnode->pSdb;
×
NEW
3495
  sdbCancelFetchByType(pSdb, pIter, SDB_XNODE_JOB);
×
NEW
3496
}
×
3497

3498
static int32_t mndRetrieveXnodeAgents(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
352✔
3499
  SMnode    *pMnode = pReq->info.node;
352✔
3500
  SSdb      *pSdb = pMnode->pSdb;
352✔
3501
  int32_t    numOfRows = 0;
352✔
3502
  int32_t    cols = 0;
352✔
3503
  SXnodeObj *pObj = NULL;
352✔
3504
  char       buf[TSDB_ANALYTIC_ALGO_NAME_LEN + VARSTR_HEADER_SIZE];
3505
  int32_t    code = 0;
352✔
3506

3507
  while (numOfRows < rows) {
352✔
3508
    pShow->pIter = sdbFetch(pSdb, SDB_XNODE_AGENT, pShow->pIter, (void **)&pObj);
352✔
3509
    if (pShow->pIter == NULL) break;
352✔
3510

3511
    // todo: add agent
3512

NEW
3513
    sdbRelease(pSdb, pObj);
×
3514
  }
3515

NEW
3516
_end:
×
3517
  if (code != 0) sdbRelease(pSdb, pObj);
352✔
3518

3519
  pShow->numOfRows += numOfRows;
352✔
3520
  return numOfRows;
352✔
3521
}
3522

NEW
3523
static void mndCancelGetNextXnodeAgent(SMnode *pMnode, void *pIter) {
×
NEW
3524
  SSdb *pSdb = pMnode->pSdb;
×
NEW
3525
  sdbCancelFetchByType(pSdb, pIter, SDB_XNODE_AGENT);
×
NEW
3526
}
×
3527

NEW
3528
static size_t taosCurlWriteData(char *pCont, size_t contLen, size_t nmemb, void *userdata) {
×
NEW
3529
  SCurlResp *pRsp = userdata;
×
NEW
3530
  if (contLen == 0 || nmemb == 0 || pCont == NULL) {
×
NEW
3531
    pRsp->dataLen = 0;
×
NEW
3532
    pRsp->data = NULL;
×
NEW
3533
    uError("curl response is received, len:%" PRId64, pRsp->dataLen);
×
NEW
3534
    return 0;
×
3535
  }
3536

NEW
3537
  int64_t newDataSize = (int64_t)contLen * nmemb;
×
NEW
3538
  int64_t size = pRsp->dataLen + newDataSize;
×
3539

NEW
3540
  if (pRsp->data == NULL) {
×
NEW
3541
    pRsp->data = taosMemoryMalloc(size + 1);
×
NEW
3542
    if (pRsp->data == NULL) {
×
NEW
3543
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
NEW
3544
      return 0;  // return the recv length, if failed, return 0
×
3545
    }
3546
  } else {
NEW
3547
    char *p = taosMemoryRealloc(pRsp->data, size + 1);
×
NEW
3548
    if (p == NULL) {
×
NEW
3549
      uError("failed to prepare recv buffer for post rsp, len:%d, code:%s", (int32_t)size + 1, tstrerror(terrno));
×
NEW
3550
      return 0;  // return the recv length, if failed, return 0
×
3551
    }
3552

NEW
3553
    pRsp->data = p;
×
3554
  }
3555

NEW
3556
  if (pRsp->data != NULL) {
×
NEW
3557
    (void)memcpy(pRsp->data + pRsp->dataLen, pCont, newDataSize);
×
3558

NEW
3559
    pRsp->dataLen = size;
×
NEW
3560
    pRsp->data[size] = 0;
×
3561

NEW
3562
    uDebugL("curl response is received, len:%" PRId64 ", content:%s", size, pRsp->data);
×
NEW
3563
    return newDataSize;
×
3564
  } else {
NEW
3565
    pRsp->dataLen = 0;
×
NEW
3566
    uError("failed to malloc curl response");
×
NEW
3567
    return 0;
×
3568
  }
3569
}
3570

3571
#ifndef WINDOWS
3572
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) {
118✔
3573
  CURL   *curl = NULL;
118✔
3574
  int32_t code = 0;
118✔
3575
  int32_t lino = 0;
118✔
3576

3577
  curl = curl_easy_init();
118✔
3578
  if (curl == NULL) {
118✔
NEW
3579
    uError("failed to create curl handle");
×
NEW
3580
    return -1;
×
3581
  }
3582

3583
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath), &lino, _OVER);
118✔
3584
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_URL, url), &lino, _OVER);
118✔
3585
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData), &lino, _OVER);
118✔
3586
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp), &lino, _OVER);
118✔
3587
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout), &lino, _OVER);
118✔
3588

3589
  uDebug("curl get request will sent, url:%s", url);
118✔
3590
  CURLcode curlCode = curl_easy_perform(curl);
118✔
3591
  if (curlCode != CURLE_OK) {
118✔
3592
    if (curlCode == CURLE_OPERATION_TIMEDOUT) {
118✔
NEW
3593
      mError("xnode failed to perform curl action, code:%d", curlCode);
×
NEW
3594
      code = TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT;
×
NEW
3595
      goto _OVER;
×
3596
    }
3597
    uError("failed to perform curl action, code:%d", curlCode);
118✔
3598
    code = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS;
118✔
3599
    goto _OVER;
118✔
3600
  }
3601

NEW
3602
  long http_code = 0;
×
NEW
3603
  TAOS_CHECK_GOTO(curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code), &lino, _OVER);
×
NEW
3604
  if (http_code != 200) {
×
NEW
3605
    code = TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR;
×
3606
  }
3607

3608
_OVER:
118✔
3609
  if (curl != NULL) curl_easy_cleanup(curl);
118✔
3610
  XND_LOG_END(code, lino);
118✔
3611
  return code;
118✔
3612
}
3613

3614
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout,
236✔
3615
                                   const char *socketPath) {
3616
  struct curl_slist *headers = NULL;
236✔
3617
  CURL              *curl = NULL;
236✔
3618
  int32_t            code = 0;
236✔
3619
  int32_t            lino = 0;
236✔
3620

3621
  curl = curl_easy_init();
236✔
3622
  if (curl == NULL) {
236✔
NEW
3623
    mError("xnode failed to create curl handle");
×
NEW
3624
    return -1;
×
3625
  }
3626

3627
  headers = curl_slist_append(headers, "Content-Type:application/json;charset=UTF-8");
236✔
3628
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath), &lino, _OVER);
236✔
3629
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers), &lino, _OVER);
236✔
3630
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_URL, url), &lino, _OVER);
236✔
3631
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData), &lino, _OVER);
236✔
3632
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp), &lino, _OVER);
236✔
3633
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout), &lino, _OVER);
236✔
3634
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POST, 1), &lino, _OVER);
236✔
3635
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, bufLen), &lino, _OVER);
236✔
3636
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf), &lino, _OVER);
236✔
3637
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L), &lino, _OVER);
236✔
3638
  TAOS_CHECK_GOTO(curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L), &lino, _OVER);
236✔
3639

3640
  mDebug("xnode curl post request will sent, url:%s len:%d content:%s", url, bufLen, buf);
236✔
3641
  CURLcode curlCode = curl_easy_perform(curl);
236✔
3642

3643
  if (curlCode != CURLE_OK) {
236✔
3644
    if (curlCode == CURLE_OPERATION_TIMEDOUT) {
236✔
NEW
3645
      mError("xnode failed to perform curl action, code:%d", curlCode);
×
NEW
3646
      code = TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT;
×
NEW
3647
      goto _OVER;
×
3648
    }
3649
    uError("xnode failed to perform curl action, code:%d", curlCode);
236✔
3650
    code = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS;
236✔
3651
    goto _OVER;
236✔
3652
  }
3653

NEW
3654
  long http_code = 0;
×
NEW
3655
  TAOS_CHECK_GOTO(curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code), &lino, _OVER);
×
NEW
3656
  if (http_code != 200) {
×
NEW
3657
    mError("xnode failed to perform curl action, http code:%ld", http_code);
×
NEW
3658
    code = TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR;
×
3659
  }
3660

3661
_OVER:
236✔
3662
  if (curl != NULL) {
236✔
3663
    curl_slist_free_all(headers);
236✔
3664
    curl_easy_cleanup(curl);
236✔
3665
  }
3666
  XND_LOG_END(code, lino);
236✔
3667
  return code;
236✔
3668
}
3669

3670
static int32_t taosCurlDeleteRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) {
118✔
3671
  CURL   *curl = NULL;
118✔
3672
  int32_t code = 0;
118✔
3673
  int32_t lino = 0;
118✔
3674

3675
  curl = curl_easy_init();
118✔
3676
  if (curl == NULL) {
118✔
NEW
3677
    uError("xnode failed to create curl handle");
×
NEW
3678
    return -1;
×
3679
  }
3680

3681
  if (curl_easy_setopt(curl, CURLOPT_UNIX_SOCKET_PATH, socketPath)) goto _OVER;
118✔
3682
  if (curl_easy_setopt(curl, CURLOPT_URL, url) != 0) goto _OVER;
118✔
3683
  if (curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE") != 0) goto _OVER;
118✔
3684
  if (curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, taosCurlWriteData) != 0) goto _OVER;
118✔
3685
  if (curl_easy_setopt(curl, CURLOPT_WRITEDATA, pRsp) != 0) goto _OVER;
118✔
3686
  if (curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS, timeout) != 0) goto _OVER;
118✔
3687

3688
  uDebug("xnode curl get request will sent, url:%s", url);
118✔
3689
  CURLcode curlCode = curl_easy_perform(curl);
118✔
3690
  if (curlCode != CURLE_OK) {
118✔
3691
    uError("xnode failed to perform curl action, curl code:%d", curlCode);
118✔
3692
    if (curlCode == CURLE_OPERATION_TIMEDOUT) {
118✔
NEW
3693
      code = TSDB_CODE_MND_XNODE_URL_RESP_TIMEOUT;
×
NEW
3694
      goto _OVER;
×
3695
    }
3696
    code = TSDB_CODE_MND_XNODE_URL_CANT_ACCESS;
118✔
3697
    goto _OVER;
118✔
3698
  }
3699

NEW
3700
  long http_code = 0;
×
NEW
3701
  TAOS_CHECK_GOTO(curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code), &lino, _OVER);
×
NEW
3702
  if (http_code != 200 && http_code != 204) {
×
NEW
3703
    uError("xnode curl request response http code:%ld", http_code);
×
NEW
3704
    code = TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR;
×
3705
  }
3706

3707
_OVER:
118✔
3708
  if (curl != NULL) curl_easy_cleanup(curl);
118✔
3709
  XND_LOG_END(code, lino);
118✔
3710
  return code;
118✔
3711
}
3712
#else
3713
static int32_t taosCurlGetRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) { return 0; }
3714
static int32_t taosCurlPostRequest(const char *url, SCurlResp *pRsp, const char *buf, int32_t bufLen, int32_t timeout,
3715
                                   const char *socketPath) {
3716
  return 0;
3717
}
3718
static int32_t taosCurlDeleteRequest(const char *url, SCurlResp *pRsp, int32_t timeout, const char *socketPath) { return 0; }
3719
#endif
3720
SJson *mndSendReqRetJson(const char *url, EHttpType type, int64_t timeout, const char *buf, int64_t bufLen) {
472✔
3721
  SJson    *pJson = NULL;
472✔
3722
  SCurlResp curlRsp = {0};
472✔
3723
  char      socketPath[PATH_MAX] = {0};
472✔
3724

3725
  getXnodedPipeName(socketPath, sizeof(socketPath));
472✔
3726
  if (type == HTTP_TYPE_GET) {
472✔
3727
    if ((terrno = taosCurlGetRequest(url, &curlRsp, timeout, socketPath)) != 0) {
118✔
3728
      goto _OVER;
118✔
3729
    }
3730
  } else if (type == HTTP_TYPE_POST) {
354✔
3731
    if ((terrno = taosCurlPostRequest(url, &curlRsp, buf, bufLen, timeout, socketPath)) != 0) {
236✔
3732
      goto _OVER;
236✔
3733
    }
3734
  } else if (type == HTTP_TYPE_DELETE) {
118✔
3735
    if ((terrno = taosCurlDeleteRequest(url, &curlRsp, timeout, socketPath)) != 0) {
118✔
3736
      goto _OVER;
118✔
3737
    }
3738
  } else {
NEW
3739
    uError("xnode invalid http type:%d", type);
×
NEW
3740
    terrno = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
NEW
3741
    goto _OVER;
×
3742
  }
3743

NEW
3744
  if (curlRsp.data == NULL || curlRsp.dataLen == 0) {
×
NEW
3745
    pJson = tjsonCreateObject();
×
NEW
3746
    goto _OVER;
×
3747
  }
3748

NEW
3749
  pJson = tjsonParse(curlRsp.data);
×
NEW
3750
  if (pJson == NULL) {
×
NEW
3751
    terrno = TSDB_CODE_INVALID_JSON_FORMAT;
×
NEW
3752
    goto _OVER;
×
3753
  }
3754

3755
_OVER:
472✔
3756
  if (curlRsp.data != NULL) taosMemoryFreeClear(curlRsp.data);
472✔
3757
  if (terrno != TSDB_CODE_SUCCESS) {
472✔
3758
    mError("xnode failed to send request, since:%s", tstrerror(terrno));
472✔
3759
  }
3760
  return pJson;
472✔
3761
}
3762

3763
static int32_t mndGetXnodeStatus(SXnodeObj *pObj, char *status, int32_t statusLen) {
118✔
3764
  int32_t code = 0;
118✔
3765
  SJson  *pJson = NULL;
118✔
3766

3767
  char xnodeUrl[TSDB_XNODE_URL_LEN + 1] = {0};
118✔
3768
  snprintf(xnodeUrl, TSDB_XNODE_URL_LEN, "%s/xnode/%d", XNODED_PIPE_SOCKET_URL, pObj->id);
118✔
3769
  pJson = mndSendReqRetJson(xnodeUrl, HTTP_TYPE_GET, defaultTimeout, NULL, 0);
118✔
3770
  if (pJson == NULL) {
118✔
3771
    code = terrno;
118✔
3772
    goto _OVER;
118✔
3773
  }
3774

NEW
3775
  code = tjsonGetStringValue2(pJson, "status", status, statusLen);
×
NEW
3776
  if (code < 0) {
×
NEW
3777
    code = TSDB_CODE_INVALID_JSON_FORMAT;
×
NEW
3778
    goto _OVER;
×
3779
  }
NEW
3780
  if (strlen(status) == 0) {
×
NEW
3781
    code = TSDB_CODE_MND_XNODE_INVALID_MSG;
×
NEW
3782
    goto _OVER;
×
3783
  }
3784

3785
_OVER:
118✔
3786
  if (pJson != NULL) tjsonDelete(pJson);
118✔
3787
  TAOS_RETURN(code);
118✔
3788
}
3789

3790
/** xnoded mgmt section **/
3791

3792
void mndStartXnoded(SMnode *pMnode, int32_t userLen, char *user, int32_t passLen, char *pass) {
118✔
3793
  int32_t   code = 0;
118✔
3794
  SXnodeOpt pOption = {0};
118✔
3795

3796
  pOption.dnodeId = pMnode->selfDnodeId;
118✔
3797
  pOption.clusterId = pMnode->clusterId;
118✔
3798

3799
  SEpSet epset = mndGetDnodeEpsetById(pMnode, pMnode->selfDnodeId);
118✔
3800
  if (epset.numOfEps == 0) {
118✔
NEW
3801
    mError("xnode failed to start xnoded, dnode:%d", pMnode->selfDnodeId);
×
NEW
3802
    return;
×
3803
  }
3804
  pOption.ep = epset.eps[0];
118✔
3805
  // add user password
3806
  pOption.upLen = userLen + passLen;
118✔
3807
  snprintf(pOption.userPass, XNODE_USER_PASS_LEN, "%s:%s", user, pass);
118✔
3808

3809
  if ((code = mndOpenXnd(&pOption)) != 0) {
118✔
3810
    mError("xnode failed to open xnd since %s, dnodeId:%d", tstrerror(code), pOption.dnodeId);
118✔
3811
    return;
118✔
3812
  }
3813
}
3814

3815
void mndXnodeHandleBecomeLeader(SMnode *pMnode) {
356,999✔
3816
  mInfo("mndxnode start to process mnode become leader");
356,999✔
3817
  SXnodeUserPassObj *pObj = mndAcquireFirstXnodeUserPass(pMnode);
356,999✔
3818
  if (pObj == NULL) {
356,999✔
3819
    mError("xnode failed to acquire xnoded user pass");
356,999✔
3820
    return;
356,999✔
3821
  }
3822

NEW
3823
  mndStartXnoded(pMnode, pObj->userLen, pObj->user, pObj->passLen, pObj->pass);
×
3824
}
3825

3826
void mndXnodeHandleBecomeNotLeader() {
49,616✔
3827
  mInfo("mndxnode handle mnode become not leader");
49,616✔
3828
  mndCloseXnd();
49,616✔
3829
}
49,616✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc