• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.4
/source/dnode/mgmt/mgmt_snode/src/smWorker.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http:www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "smInt.h"
18
#include "stream.h"
19

20
static void smProcessRunnerQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
12,391✔
21
  SSnodeMgmt     *pMgmt = pInfo->ahandle;
12,391✔
22
  const STraceId *trace = &pMsg->info.traceId;
12,391✔
23

24
  dDebug("msg:%p %d, get from snode-stream-runner queue", pMsg, pMsg->msgType);
12,391✔
25
  
26
  int32_t code = sndProcessStreamMsg(pMgmt->pSnode, pInfo->workerCb, pMsg);
12,391✔
27
  if (code < 0) {
12,390✔
28
    dGError("snd, msg:%p failed to process stream msg %s since %s", pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code));
63!
29
  }
30

31
  dTrace("msg:%p, is freed", pMsg);
12,390!
32
  rpcFreeCont(pMsg->pCont);
12,390✔
33
  taosFreeQitem(pMsg);
12,390✔
34
}
12,391✔
35

36
static void smSendErrorRrsp(SRpcMsg *pMsg, int32_t errCode) {
×
37
  SRpcMsg             rspMsg = {0};
×
38

39
  rspMsg.info = pMsg->info;
×
40
  rspMsg.pCont = NULL;
×
41
  rspMsg.contLen = 0;
×
42
  rspMsg.code = errCode;
×
43
  rspMsg.msgType = pMsg->msgType;
×
44

45
  tmsgSendRsp(&rspMsg);
×
46
}
×
47

48
static void smProcessStreamTriggerQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
66,155✔
49
  int32_t       code = TSDB_CODE_SUCCESS;
66,155✔
50
  int32_t       lino = 0;
66,155✔
51
  SBatchReq     batchReq = {0};
66,155✔
52
  int64_t       streamId = 0, taskId = 0;
66,155✔
53
  SStreamTask  *pTask = NULL;
66,155✔
54
  void         *taskAddr = NULL;
66,155✔
55
  SMsgSendInfo *ahandle = NULL;
66,155✔
56

57
  SSnodeMgmt *pMgmt = pInfo->ahandle;
66,155✔
58
  STraceId   *trace = &pMsg->info.traceId;
66,155✔
59
  dGDebug("msg:%p, get from snode-stream-trigger queue, type:%s %" PRIx64 ":%" PRIx64, pMsg, TMSG_INFO(pMsg->msgType), TRACE_GET_ROOTID(trace), TRACE_GET_MSGID(trace));
66,155!
60

61
  if (pMsg->msgType == TDMT_SND_BATCH_META) {
66,155!
62
    code = tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq);
×
63
    if (code != TSDB_CODE_SUCCESS) {
×
64
      dError("msg:%p, invalid batch meta request in snode-stream-trigger queue", pMsg);
×
65
      smSendErrorRrsp(pMsg, TSDB_CODE_INVALID_MSG);
×
66
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
67
    }
68
    SBatchMsg         *pReq = TARRAY_DATA(batchReq.pMsgs);
×
69
    SStreamProgressReq req = {0};
×
70
    code = tDeserializeStreamProgressReq(pReq->msg, pReq->msgLen, &req);
×
71
    if (code != TSDB_CODE_SUCCESS) {
×
72
      dError("msg:%p, invalid stream progress request in snode-stream-trigger queue", pMsg);
×
73
      smSendErrorRrsp(pMsg, TSDB_CODE_INVALID_MSG);
×
74
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
75
    }
76
    streamId = req.streamId;
×
77
    taskId = req.taskId;
×
78
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_CTRL) {
66,155✔
79
    SSTriggerCtrlRequest ctrlReq = {0};
15,542✔
80
    code = tDeserializeSTriggerCtrlRequest(pMsg->pCont, pMsg->contLen, &ctrlReq);
15,542✔
81
    if (code != TSDB_CODE_SUCCESS) {
15,572!
82
      dError("msg:%p, invalid trigger control request in snode-stream-trigger queue", pMsg);
×
83
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
84
    }
85
    streamId = ctrlReq.streamId;
15,546✔
86
    taskId = ctrlReq.taskId;
15,546✔
87
  } else {
88
    ahandle = pMsg->info.ahandle;
50,613✔
89
    if (ahandle == NULL) {
50,613!
90
      dError("empty ahandle for msg %s", TMSG_INFO(pMsg->msgType));
×
91
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
×
92
    }
93
    SSTriggerAHandle* pAhandle = ahandle->param;
50,613✔
94
    if (pAhandle == NULL) {
50,613!
95
      dError("empty trigger ahandle for msg %s", TMSG_INFO(pMsg->msgType));
×
96
      TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG);
14!
97
    }
98
    streamId = pAhandle->streamId;
50,627✔
99
    taskId = pAhandle->taskId;
50,627✔
100
  }
101

102
  TAOS_CHECK_EXIT(streamAcquireTask(streamId, taskId, &pTask, &taskAddr));
66,173!
103

104
  int64_t errTaskId = 0;
66,254✔
105
  code = stTriggerTaskProcessRsp(pTask, pMsg, &errTaskId);
66,254✔
106
  if (code != TSDB_CODE_SUCCESS) {
66,350✔
107
    streamHandleTaskError(pTask->streamId, errTaskId, code);
285✔
108
    TAOS_CHECK_EXIT(code);
285✔
109
  }
110

111
_exit:
66,071✔
112
  taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg);
66,350✔
113
  if (taskAddr != NULL) {
66,350!
114
    streamReleaseTask(taskAddr);
66,351✔
115
  }
116
  if (ahandle != NULL) {
66,356✔
117
    destroyAhandle(ahandle);
50,620✔
118
  }
119
  dTrace("msg:%p, is freed, code:%d", pMsg, code);
66,352!
120
  rpcFreeCont(pMsg->pCont);
66,352✔
121
  taosFreeQitem(pMsg);
66,370✔
122
  if (code) {
66,384✔
123
    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
285!
124
  }
125
}
66,384✔
126
static int32_t smDispatchStreamTriggerRsp(struct SDispatchWorkerPool *pPool, void *pParam, int32_t *pWorkerIdx) {
66,401✔
127
  int32_t       code = TSDB_CODE_SUCCESS;
66,401✔
128
  int32_t       lino = 0;
66,401✔
129
  SBatchReq     batchReq = {0};
66,401✔
130
  SRpcMsg      *pMsg = (SRpcMsg *)pParam;
66,401✔
131
  int64_t       streamId = 0, taskId = 0, sessionId = 0;
66,401✔
132
  void         *taskAddr = NULL;
66,401✔
133
  SMsgSendInfo *ahandle = NULL;
66,401✔
134

135
  dDebug("dispatch snode %s msg", TMSG_INFO(pMsg->msgType));
66,401!
136

137
  if (pMsg->msgType == TDMT_SND_BATCH_META) {
66,401!
138
    code = tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq);
×
139
    if (code != TSDB_CODE_SUCCESS) {
×
140
      dError("msg:%p, failed to deserialize batch meta request", pMsg);
×
141
      TAOS_CHECK_EXIT(TSDB_CODE_MSG_NOT_PROCESSED);
×
142
    }
143
    SBatchMsg         *pReq = TARRAY_DATA(batchReq.pMsgs);
×
144
    SStreamProgressReq req = {0};
×
145
    code = tDeserializeStreamProgressReq(pReq->msg, pReq->msgLen, &req);
×
146
    if (code != TSDB_CODE_SUCCESS) {
×
147
      dError("msg:%p, failed to deserialize stream progress request", pMsg);
×
148
      TAOS_CHECK_EXIT(TSDB_CODE_MSG_NOT_PROCESSED);
×
149
    }
150
    streamId = req.streamId;
×
151
    taskId = req.taskId;
×
152
    sessionId = 1;
×
153
  } else if (pMsg->msgType == TDMT_STREAM_TRIGGER_CTRL) {
66,401✔
154
    SSTriggerCtrlRequest ctrlReq = {0};
15,772✔
155
    code = tDeserializeSTriggerCtrlRequest(pMsg->pCont, pMsg->contLen, &ctrlReq);
15,772✔
156
    if (code != TSDB_CODE_SUCCESS) {
15,772!
157
      dError("msg:%p, failed to deserialize trigger control request", pMsg);
×
158
      TAOS_CHECK_EXIT(TSDB_CODE_MSG_NOT_PROCESSED);
×
159
    }
160
    streamId = ctrlReq.streamId;
15,772✔
161
    taskId = ctrlReq.taskId;
15,772✔
162
    sessionId = ctrlReq.sessionId;
15,772✔
163
  } else {
164
    ahandle = pMsg->info.ahandle;
50,629✔
165
    if (ahandle == NULL) {
50,629!
166
      dError("empty ahandle for msg %s", TMSG_INFO(pMsg->msgType));
×
167
      TAOS_CHECK_EXIT(TSDB_CODE_MSG_NOT_PROCESSED);
×
168
    }
169
    SSTriggerAHandle* pAhandle = ahandle->param;
50,629✔
170
    if (pAhandle == NULL) {
50,629!
171
      dError("empty trigger ahandle for msg %s", TMSG_INFO(pMsg->msgType));
×
172
      TAOS_CHECK_EXIT(TSDB_CODE_MSG_NOT_PROCESSED);
×
173
    }
174
    streamId = pAhandle->streamId;
50,629✔
175
    taskId = pAhandle->taskId;
50,629✔
176
    sessionId = pAhandle->sessionId;
50,629✔
177
  }
178

179
  int64_t  buf[] = {streamId, taskId, sessionId};
66,401✔
180
  uint32_t hashVal = MurmurHash3_32((const char *)buf, sizeof(buf));
66,401✔
181
  *pWorkerIdx = hashVal % tsNumOfStreamTriggerThreads;
66,401✔
182

183
_exit:
66,401✔
184
  taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg);
66,401✔
185
  if (taskAddr != NULL) {
66,401!
186
    streamReleaseTask(taskAddr);
×
187
  }
188
  if (code) {
66,401!
189
    destroyAhandle(ahandle);
×
190
    //rpcFreeCont(pMsg->pCont);
191
    //taosFreeQitem(pMsg);
192
    stError("%s failed at line %d, error:%s", __FUNCTION__, lino, tstrerror(code));
×
193
  }
194

195
  return code;
66,401✔
196
}
197

198
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
226✔
199
  int32_t code = 0;
226✔
200

201
  SSingleWorkerCfg cfg = {
226✔
202
      .min = tsNumOfStreamRunnerThreads,
203
      .max = tsNumOfStreamRunnerThreads,
204
      .name = "snode-stream-runner",
205
      .fp = (FItem)smProcessRunnerQueue,
206
      .param = pMgmt,
207
      .poolType = QUERY_AUTO_QWORKER_POOL,
208
      .stopNoWaitQueue = true,
209
  };
210

211
  if ((code = tSingleWorkerInit(&pMgmt->runnerWorker, &cfg)) != 0) {
226!
212
    dError("failed to start snode runner worker since %s", tstrerror(code));
×
213
    return code;
×
214
  }
215

216
  SDispatchWorkerPool* pTriggerPool = &pMgmt->triggerWorkerPool;
226✔
217
  pTriggerPool->max = tsNumOfStreamTriggerThreads;
226✔
218
  pTriggerPool->name = "snode-stream-trigger";
226✔
219
  code = tDispatchWorkerInit(pTriggerPool);
226✔
220
  if (code != 0) {
226!
221
    dError("failed to start snode stream-trigger worker since %s", tstrerror(code));
×
222
    return code;
×
223
  }
224
  code = tDispatchWorkerAllocQueue(pTriggerPool, pMgmt, (FItem)smProcessStreamTriggerQueue, smDispatchStreamTriggerRsp);
226✔
225
  if (code != 0) {
226!
226
    dError("failed to start snode stream-trigger worker since %s", tstrerror(code));
×
227
    return code;
×
228
  }
229

230
  dDebug("snode workers are initialized");
226✔
231
  return code;
226✔
232
}
233

234
void smStopWorker(SSnodeMgmt *pMgmt) {
226✔
235
  tSingleWorkerCleanup(&pMgmt->runnerWorker);
226✔
236
  tDispatchWorkerCleanup(&pMgmt->triggerWorkerPool);
226✔
237
  dDebug("snode workers are closed");
226✔
238
}
226✔
239

240
int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
15,772✔
241
  int32_t  code;
242
  SRpcMsg *pMsg;
243

244
  code = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen, (void **)&pMsg);
15,772✔
245
  if (code) {
15,772!
246
    rpcFreeCont(pRpc->pCont);
×
247
    pRpc->pCont = NULL;
×
248
    return code = terrno;
×
249
  }
250

251
  SSnode *pSnode = pMgmt->pSnode;
15,772✔
252
  if (pSnode == NULL) {
15,772!
253
    code = terrno;
×
254
    dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, tstrerror(code),
×
255
           TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen);
256
    taosFreeQitem(pMsg);
×
257
    rpcFreeCont(pRpc->pCont);
×
258
    pRpc->pCont = NULL;
×
259
    return code;
×
260
  }
261

262
  memcpy(pMsg, pRpc, sizeof(SRpcMsg));
15,772✔
263
  pRpc->pCont = NULL;
15,772✔
264

265
  switch (qtype) {
15,772!
266
    case STREAM_RUNNER_QUEUE:
×
267
      code = smPutMsgToRunnerQueue(pMgmt, pMsg);
×
268
      break;
×
269
    case STREAM_TRIGGER_QUEUE:
15,772✔
270
      code = smPutMsgToTriggerQueue(pMgmt, pMsg);
15,772✔
271
      break;
15,772✔
272
    default:
×
273
      code = TSDB_CODE_INVALID_PARA;
×
274
      rpcFreeCont(pMsg->pCont);
×
275
      taosFreeQitem(pMsg);
×
276
      return code;
×
277
  }
278
  return code;
15,772✔
279
}
280

281
int32_t smPutMsgToRunnerQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
12,389✔
282
  SSingleWorker *pWorker = &pMgmt->runnerWorker;
12,389✔
283

284
  dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
12,389!
285
  return taosWriteQitem(pWorker->queue, pMsg);
12,389✔
286
}
287

288
int32_t smPutMsgToTriggerQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
66,344✔
289
  int32_t code = tAddTaskIntoDispatchWorkerPool(&pMgmt->triggerWorkerPool, pMsg);
66,344✔
290
  stDebug("msg:%p, put into pool %s, code %d", pMsg, pMgmt->triggerWorkerPool.name, code);
66,400✔
291
  return code;
66,400✔
292
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc