• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3531

19 Nov 2024 10:42AM UTC coverage: 60.213% (-0.006%) from 60.219%
#3531

push

travis-ci

web-flow
Merge pull request #28777 from taosdata/fix/3.0/TD-32366

fix:TD-32366/stmt add geometry datatype check

118529 of 252344 branches covered (46.97%)

Branch coverage included in aggregate %.

7 of 48 new or added lines in 3 files covered. (14.58%)

2282 existing lines in 115 files now uncovered.

199096 of 275161 relevant lines covered (72.36%)

6067577.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.37
/source/libs/stream/src/streamSched.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "ttime.h"
17
#include "streamInt.h"
18
#include "ttimer.h"
19

20
static void streamTaskResumeHelper(void* param, void* tmrId);
21
static void streamTaskSchedHelper(void* param, void* tmrId);
22

23
/**
 * Arm the scheduler timer ("sched-tmr") of a stream task.
 *
 * Nothing is set up when pTask->info.fillHistory == 1, or when the task is not a
 * FORCE_WINDOW_CLOSE source task and pTask->info.delaySchedParam is 0.
 *
 * For a source task with the STREAM_TRIGGER_FORCE_WINDOW_CLOSE trigger, the interval
 * information is fetched from the executor, cached in pTask->info / pTask->status, and
 * the first delay is computed from the end of the current aligned time window plus the
 * watermark.
 *
 * @param pTask  the stream task to set up; must not be NULL
 */
void streamSetupScheduleTrigger(SStreamTask* pTask) {
  int64_t     delay = 0;
  int32_t     code = 0;
  const char* id = pTask->id.idStr;
  int64_t*    pTaskRefId = NULL;

  if (pTask->info.fillHistory == 1) {
    return;
  }

  // dynamic set the trigger & triggerParam for STREAM_TRIGGER_FORCE_WINDOW_CLOSE
  if ((pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) && (pTask->info.taskLevel == TASK_LEVEL__SOURCE)) {
    int64_t     waterMark = 0;
    SInterval   interval = {0};
    STimeWindow lastTimeWindow = {0};

    code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval, &lastTimeWindow);
    if (code) {
      stError("s-task:%s failed to init scheduler info, code:%s", id, tstrerror(code));
      return;
    }

    pTask->status.latestForceWindow = lastTimeWindow;
    pTask->info.delaySchedParam = interval.sliding;
    pTask->info.watermark = waterMark;
    pTask->info.interval = interval;

    // calculate the first start timestamp
    // NOTE(review): `now` and the window keys are taken in interval.precision, while the
    // timer below appears to be driven in milliseconds -- confirm for non-ms precisions.
    int64_t     now = taosGetTimestamp(interval.precision);
    STimeWindow curWin = getAlignQueryTimeWindow(&pTask->info.interval, now);
    delay = (curWin.ekey + 1) - now + waterMark;

    stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64
           " unit:%c, initial start after:%" PRId64,
           id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit, delay);
  } else {
    delay = pTask->info.delaySchedParam;
    if (delay == 0) {
      return;
    }
  }

  code = streamTaskAllocRefId(pTask, &pTaskRefId);
  if (code != 0) {
    // without a ref id the timer callback cannot look up the task, so the trigger stays disabled;
    // report it instead of failing silently
    stError("s-task:%s failed to alloc refId for the scheduler trigger, code:%s", id, tstrerror(code));
    return;
  }

  stDebug("s-task:%s refId:%" PRId64 " enable the scheduler trigger, delay:%" PRId64, id, pTask->id.refId, delay);

  streamTmrStart(streamTaskSchedHelper, (int32_t)delay, pTaskRefId, streamTimer, &pTask->schedInfo.pDelayTimer,
                 pTask->pMeta->vgId, "sched-tmr");
  pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
74

75
/**
 * Try to schedule the task for execution.
 *
 * The run request is only dispatched when streamTaskSetSchedStatusWait() succeeds;
 * otherwise the call is a no-op that reports success.
 *
 * @param pTask  the stream task to schedule
 * @return 0 when nothing was launched, otherwise the result of streamTaskSchedTask()
 */
int32_t streamTrySchedExec(SStreamTask* pTask) {
  if (!streamTaskSetSchedStatusWait(pTask)) {
    stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
    return 0;
  }

  return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0);
}
84

85
/**
 * Encode a SStreamTaskRunReq and put it on the stream queue as a TDMT_STREAM_TASK_RUN message.
 *
 * @param pMsgCb    message callbacks handed to tmsgPutToQueue()
 * @param vgId      value stored in the vgId field of the message head
 * @param streamId  stream id carried in the request
 * @param taskId    task id carried in the request
 * @param execType  value stored in the request's reqType field
 * @return 0 on success; a negative encoder code, terrno when the message buffer cannot be
 *         allocated, or the code returned by tmsgPutToQueue()
 */
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
  int32_t code = 0;
  int32_t tlen = 0;

  SStreamTaskRunReq req = {.streamId = streamId, .taskId = taskId, .reqType = execType};

  // first pass: compute the encoded size of the request
  tEncodeSize(tEncodeStreamTaskRunReq, &req, tlen, code);
  if (code < 0) {
    stError("s-task:0x%" PRIx64 " vgId:%d encode stream task run req failed, code:%s", streamId, vgId, tstrerror(code));
    return code;
  }

  void* pCont = rpcMallocCont(tlen + sizeof(SMsgHead));
  if (NULL == pCont) {
    stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
            tstrerror(terrno));
    return terrno;
  }

  // the message starts with a SMsgHead, the encoded request follows right after it
  SMsgHead* pHead = (SMsgHead*)pCont;
  pHead->vgId = vgId;
  uint8_t* pBody = (uint8_t*)POINTER_SHIFT(pCont, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, pBody, tlen);
  code = tEncodeStreamTaskRunReq(&encoder, &req);
  if (code < 0) {
    rpcFreeCont(pCont);
    tEncoderClear(&encoder);
    stError("s-task:0x%x vgId:%d encode run task msg failed, code:%s", taskId, vgId, tstrerror(code));
    return code;
  }
  tEncoderClear(&encoder);

  if (streamId == 0) {
    stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType));
  } else {
    stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType,
            streamTaskGetExecType(execType));
  }

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pCont, .contLen = tlen + sizeof(SMsgHead)};
  code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
  if (code != 0) {
    stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
  }

  return code;
}
131

132
/** Reset the task's scheduled idle time (pTask->status.schedIdleTime) to 0. */
void streamTaskClearSchedIdleInfo(SStreamTask* pTask) {
  pTask->status.schedIdleTime = 0;
}
41,965✔
133

134
/**
 * Record how long the task should stay idle before it is resumed.
 *
 * @param pTask     the stream task to update
 * @param idleTime  value stored in pTask->status.schedIdleTime (logged elsewhere in this file as ms)
 */
void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) {
  pTask->status.schedIdleTime = idleTime;
}
8,660✔
135

136
/**
 * Start the idle timer ("resume-task-tmr") that resumes the task after
 * pTask->status.schedIdleTime has elapsed.
 *
 * A ref id is allocated for the task and handed to the timer callback
 * (streamTaskResumeHelper), which frees it.
 *
 * @param pTask  the stream task that should idle before being resumed
 */
void streamTaskResumeInFuture(SStreamTask* pTask) {
  stDebug("s-task:%s task should idle, add into timer to retry in %dms", pTask->id.idStr,
          pTask->status.schedIdleTime);

  // add one ref count for task
  int64_t* pTaskRefId = NULL;
  int32_t  code = streamTaskAllocRefId(pTask, &pTaskRefId);
  if (code != 0) {
    // the resume timer cannot be started without a ref id; report it instead of failing silently
    stError("s-task:%s failed to alloc refId for the resume timer, code:%s", pTask->id.idStr, tstrerror(code));
    return;
  }

  streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTaskRefId, streamTimer,
                 &pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr");
}
6,877✔
148

149
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
150
void streamTaskResumeHelper(void* param, void* tmrId) {
6,877✔
151
  int32_t      code = 0;
6,877✔
152
  int64_t      taskRefId = *(int64_t*)param;
6,877✔
153
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
6,877✔
154
  if (pTask == NULL) {
6,877✔
155
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
28!
156
    streamTaskFreeRefId(param);
28✔
157
    return;
28✔
158
  }
159

160
  SStreamTaskId*   pId = &pTask->id;
6,849✔
161
  SStreamTaskState p = streamTaskGetStatus(pTask);
6,849✔
162

163
  if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
6,849!
UNCOV
164
    int8_t status = streamTaskSetSchedStatusInactive(pTask);
×
165
    TAOS_UNUSED(status);
166

UNCOV
167
    stDebug("s-task:%s status:%s not resume task", pId->idStr, p.name);
×
UNCOV
168
    streamMetaReleaseTask(pTask->pMeta, pTask);
×
UNCOV
169
    streamTaskFreeRefId(param);
×
UNCOV
170
    return;
×
171
  }
172

173
  code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
6,849✔
174
  if (code) {
6,849!
UNCOV
175
    stError("s-task:%s sched task failed, code:%s", pId->idStr, tstrerror(code));
×
176
  } else {
177
    stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime);
6,849✔
178

179
    // release the task ref count
180
    streamTaskClearSchedIdleInfo(pTask);
6,849✔
181
  }
182

183
  streamMetaReleaseTask(pTask->pMeta, pTask);
6,849✔
184
  streamTaskFreeRefId(param);
6,849✔
185
}
186

187
void streamTaskSchedHelper(void* param, void* tmrId) {
4,903✔
188
  int64_t      taskRefId = *(int64_t*)param;
4,903✔
189
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
4,903✔
190
  if (pTask == NULL) {
4,903!
UNCOV
191
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
×
UNCOV
192
    streamTaskFreeRefId(param);
×
UNCOV
193
    return;
×
194
  }
195

196
  stDebug("s-task:%s acquire task, refId:%"PRId64, pTask->id.idStr, pTask->id.refId);
4,903!
197

198
  const char*  id = pTask->id.idStr;
4,903✔
199
  int32_t      nextTrigger = (int32_t)pTask->info.delaySchedParam;
4,903✔
200
  int32_t      vgId = pTask->pMeta->vgId;
4,903✔
201
  int32_t      code = 0;
4,903✔
202

203
  int8_t status = atomic_load_8(&pTask->schedInfo.status);
4,903✔
204
  stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
4,903✔
205

206
  if (streamTaskShouldStop(pTask)) {
4,903✔
207
    stDebug("s-task:%s should stop, jump out of schedTimer", id);
1!
208
    streamMetaReleaseTask(pTask->pMeta, pTask);
1✔
209
    streamTaskFreeRefId(param);
1✔
210
    return;
1✔
211
  }
212

213
  if (streamTaskShouldPause(pTask)) {
4,902!
UNCOV
214
    stDebug("s-task:%s is paused, recheck in %.2fs", id, nextTrigger/1000.0);
×
UNCOV
215
    streamTmrStart(streamTaskSchedHelper, nextTrigger, param, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
×
216
                   "sched-run-tmr");
UNCOV
217
    streamMetaReleaseTask(pTask->pMeta, pTask);
×
218
    return;
×
219
  }
220

221
  if (streamTaskShouldPause(pTask)) {
4,902!
UNCOV
222
    stDebug("s-task:%s is paused, check in nextTrigger:%ds", id, nextTrigger/1000);
×
UNCOV
223
    streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
×
224
                   "sched-run-tmr");
225
  }
226

227
  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
4,902✔
228
    stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
26!
229
  } else {
230
    if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
4,876!
231
      SStreamTrigger* pTrigger = NULL;
671✔
232

233
      while (1) {
12✔
234
        code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval,
683✔
235
                                              &pTask->status.latestForceWindow, id);
236
        if (code != 0) {
683!
UNCOV
237
          stError("s-task:%s failed to prepare force window close trigger, code:%s, try again in %dms", id,
×
238
                  tstrerror(code), nextTrigger);
UNCOV
239
          goto _end;
×
240
        }
241

242
        // in the force window close model, status trigger does not matter. So we do not set the trigger model
243
        code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
683✔
244
        if (code != TSDB_CODE_SUCCESS) {
683!
UNCOV
245
          stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
246
          goto _end;
×
247
        }
248

249
        // check whether the time window gaps exist or not
250
        int64_t now = taosGetTimestamp(pTask->info.interval.precision);
683!
251
        int64_t intervalEndTs = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval;
683✔
252

253
        // there are gaps, needs to be filled
254
        STimeWindow w = pTrigger->pBlock->info.window;
683✔
255
        w.ekey = w.skey + pTask->info.interval.interval;
683✔
256
        if (w.skey <= pTask->status.latestForceWindow.skey) {
683!
257
          stFatal("s-task:%s invalid new time window in force_window_close model, skey:%" PRId64
×
258
                  " should be greater than latestForceWindow skey:%" PRId64,
259
                  pTask->id.idStr, w.skey, pTask->status.latestForceWindow.skey);
260
        }
261

262
        pTask->status.latestForceWindow = w;
683✔
263
        if (intervalEndTs + pTask->info.watermark + pTask->info.interval.interval > now) {
683✔
264
          break;
671✔
265
        } else {
266
          stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey);
12!
267
        }
268
      }
269

270
    } else if (status == TASK_TRIGGER_STATUS__MAY_ACTIVE) {
4,205✔
271
      SStreamTrigger* pTrigger = NULL;
991✔
272
      code = streamCreateSinkResTrigger(&pTrigger);
991✔
273
      if (code) {
991!
UNCOV
274
        stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, tstrerror(code),
×
275
                nextTrigger);
UNCOV
276
        goto _end;
×
277
      }
278

279
      atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
991✔
280

281
      code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
991✔
282
      if (code != TSDB_CODE_SUCCESS) {
991!
UNCOV
283
        stError("s-task:%s failed to put retrieve aggRes block into q, code:%s", pTask->id.idStr, tstrerror(code));
×
UNCOV
284
        goto _end;
×
285
      }
286
    }
287

288
    code = streamTrySchedExec(pTask);
4,876✔
289
    if (code != TSDB_CODE_SUCCESS) {
4,876✔
290
      stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr);
17!
291
    }
292
  }
293

294
_end:
4,859✔
295
  streamTmrStart(streamTaskSchedHelper, nextTrigger, param, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
4,902✔
296
                 "sched-run-tmr");
297
  streamMetaReleaseTask(pTask->pMeta, pTask);
4,902✔
298
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc