• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3562

20 Dec 2024 09:57AM UTC coverage: 26.655% (-32.2%) from 58.812%
#3562

push

travis-ci

web-flow
Merge pull request #29229 from taosdata/enh/TS-5749-3.0

enh: seperate tsdb async tasks to different thread pools

21498 of 109421 branches covered (19.65%)

Branch coverage included in aggregate %.

66 of 96 new or added lines in 7 files covered. (68.75%)

39441 existing lines in 157 files now uncovered.

35007 of 102566 relevant lines covered (34.13%)

53922.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/inc/sma.h
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#ifndef _TD_VNODE_SMA_H_
17
#define _TD_VNODE_SMA_H_
18

19
#include "vnodeInt.h"
20

21
#ifdef __cplusplus
22
extern "C" {
23
#endif
24

25
// smaDebug ================
26
// clang-format off
27
#define smaFatal(...) do { if (smaDebugFlag & DEBUG_FATAL) { taosPrintLog("SMA FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}     while(0)
28
#define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}     while(0)
29
#define smaWarn(...)  do { if (smaDebugFlag & DEBUG_WARN)  { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}       while(0)
30
#define smaInfo(...)  do { if (smaDebugFlag & DEBUG_INFO)  { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }}            while(0)
31
#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
32
#define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
33
// clang-format on
34

35
#define RSMA_TASK_INFO_HASH_SLOT (8)
36

37
typedef struct SSmaEnv       SSmaEnv;
38
typedef struct SSmaStat      SSmaStat;
39
typedef struct STSmaStat     STSmaStat;
40
typedef struct SRSmaStat     SRSmaStat;
41
typedef struct SRSmaRef      SRSmaRef;
42
typedef struct SRSmaInfo     SRSmaInfo;
43
typedef struct SRSmaInfoItem SRSmaInfoItem;
44
typedef struct SRSmaFS       SRSmaFS;
45
typedef struct SQTaskFile    SQTaskFile;
46
typedef struct SQTaskFReader SQTaskFReader;
47

48
struct SSmaEnv {
49
  SRWLatch  lock;
50
  int8_t    type;
51
  int8_t    flag;  // 0x01 inClose
52
  SSmaStat *pStat;
53
};
54

55
#define SMA_ENV_FLG_CLOSE ((int8_t)0x1)
56

57
struct SRSmaRef {
58
  int64_t refId;  // for SRSmaStat
59
  int64_t suid;
60
};
61

62
typedef struct {
63
  int8_t  inited;
64
  int32_t rsetId;
65
  void   *tmrHandle;  // shared by all fetch tasks
66
  /**
67
   * @brief key: void* of SRSmaInfoItem, value: SRSmaRef
68
   *  N.B. Although there is a very small possibility that "void*" point to different objects while with the same
69
   * address after release/renew, the functionality is not affected as it just used to fetch the rsma results.
70
   */
71
  SHashObj *refHash;  // shared by all vgroups
72
} SSmaMgmt;
73

74
#define SMA_ENV_LOCK(env)  (&(env)->lock)
75
#define SMA_ENV_TYPE(env)  ((env)->type)
76
#define SMA_ENV_STAT(env)  ((env)->pStat)
77
#define SMA_RSMA_STAT(sma) ((SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)(sma)->pRSmaEnv))
78

79
struct STSmaStat {
80
  int8_t    state;  // ETsdbSmaStat
81
  STSma    *pTSma;  // cache schema
82
  STSchema *pTSchema;
83
};
84

85
struct SQTaskFile {
86
  volatile int32_t nRef;
87
  int8_t           level;
88
  int64_t          suid;
89
  int64_t          version;
90
  int64_t          size;
91
  int64_t          mtime;
92
};
93

94
struct SQTaskFReader {
95
  SSma     *pSma;
96
  int8_t    level;
97
  int64_t   suid;
98
  int64_t   version;
99
  TdFilePtr pReadH;
100
};
101

102
struct SRSmaFS {
103
  SArray *aQTaskInf;  // array of SQTaskFile
104
};
105

106
struct SRSmaStat {
107
  SSma            *pSma;
108
  int64_t          refId;        // shared by fetch tasks
109
  volatile int64_t nBufItems;    // number of items in queue buffer
110
  SRWLatch         lock;         // r/w lock for rsma fs(e.g. qtaskinfo)
111
  volatile int32_t execStat;     // 0 succeed, other failed
112
  volatile int32_t nFetchAll;    // active number of fetch all
113
  volatile int8_t  triggerStat;  // shared by fetch tasks
114
  volatile int8_t  commitStat;   // 0 not in committing, 1 in committing
115
  volatile int8_t  delFlag;      // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo
116
  SRSmaFS          fs;           // for recovery/snapshot r/w
117
  SHashObj        *infoHash;     // key: suid, value: SRSmaInfo
118
  tsem_t           notEmpty;     // has items in queue buffer
119
  SArray          *blocks;       // SArray<SSDataBlock>
120
};
121

122
struct SSmaStat {
123
  union {
124
    STSmaStat tsmaStat;  // time-range-wise sma
125
    SRSmaStat rsmaStat;  // rollup sma
126
  };
127
  T_REF_DECLARE()
128
  char data[];
129
};
130

131
#define SMA_STAT_TSMA(s)     (&(s)->tsmaStat)
132
#define SMA_STAT_RSMA(s)     (&(s)->rsmaStat)
133
#define RSMA_INFO_HASH(r)    ((r)->infoHash)
134
#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat)
135
#define RSMA_COMMIT_STAT(r)  (&(r)->commitStat)
136
#define RSMA_REF_ID(r)       ((r)->refId)
137
#define RSMA_FS(r)           (&(r)->fs)
138
#define RSMA_FS_LOCK(r)      (&(r)->lock)
139

140
struct SRSmaInfoItem {
141
  int8_t  level;
142
  int8_t  fetchLevel;
143
  int8_t  triggerStat;
144
  int32_t nScanned;
145
  int32_t streamFlushed : 1;
146
  int32_t maxDelay : 31;  // ms
147
  int64_t submitReqVer;
148
  int64_t fetchResultVer;
149
  tmr_h   tmrId;
150
  void   *pStreamState;
151
  void   *pStreamTask;  // SStreamTask
152
  SArray *pResList;
153
};
154

155
struct SRSmaInfo {
156
  SSma     *pSma;
157
  STSchema *pTSchema;
158
  int64_t   suid;
159
  int64_t   lastRecv;  // ms
160
  int8_t    assigned;  // 0 idle, 1 assgined for exec
161
  int8_t    delFlag;
162
  int16_t   padding;
163
  T_REF_DECLARE()
164
  SRSmaInfoItem items[TSDB_RETENTION_L2];
165
  void         *taskInfo[TSDB_RETENTION_L2];  // qTaskInfo_t
166
  STaosQueue   *queue;                        // buffer queue of SubmitReq
167
  STaosQall    *qall;                         // buffer qall of SubmitReq
168
};
169

170
#define RSMA_INFO_IS_DEL(r)   ((r)->delFlag == 1)
171
#define RSMA_INFO_SET_DEL(r)  ((r)->delFlag = 1)
172
#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i])
173
#define RSMA_INFO_ITEM(r, i)  (&(r)->items[i])
174

175
enum {
176
  TASK_TRIGGER_STAT_INIT = 0,
177
  TASK_TRIGGER_STAT_ACTIVE = 1,
178
  TASK_TRIGGER_STAT_INACTIVE = 2,
179
  TASK_TRIGGER_STAT_PAUSED = 3,
180
  TASK_TRIGGER_STAT_CANCELLED = 4,
181
  TASK_TRIGGER_STAT_DROPPED = 5,
182
};
183

184
enum {
185
  RSMA_RESTORE_REBOOT = 1,
186
  RSMA_RESTORE_SYNC = 2,
187
};
188

189
typedef enum {
190
  RSMA_EXEC_OVERFLOW = 1,  // triggered by queue buf overflow
191
  RSMA_EXEC_TIMEOUT = 2,   // triggered by timer
192
  RSMA_EXEC_COMMIT = 3,    // triggered by commit
193
} ERsmaExecType;
194

195
#define TD_SMA_LOOPS_CHECK(n, limit) \
196
  if (++(n) > limit) {               \
197
    (void)sched_yield();             \
198
    (n) = 0;                         \
199
  }
200

201
// sma
202
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
203
void    tdDestroySmaEnv(SSmaEnv *pSmaEnv);
204
void   *tdFreeSmaEnv(SSmaEnv *pSmaEnv);
205
int32_t tdLockSma(SSma *pSma);
206
int32_t tdUnLockSma(SSma *pSma);
207
void   *tdAcquireSmaRef(int32_t rsetId, int64_t refId);
208
int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId);
209

210
static FORCE_INLINE void tdRefSmaStat(SSma *pSma, SSmaStat *pStat) {
211
  int32_t ref = T_REF_INC(pStat);
212
  smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
213
}
214
static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
215
  int32_t ref = T_REF_DEC(pStat);
216
  smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
217
}
218

219
int32_t smaPreClose(SSma *pSma);
220

221
// rsma
222
void   *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo);
223
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
224
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
225
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
226
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
227
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
228
void    tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);
229

230
static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
UNCOV
231
  int32_t ref = T_REF_INC(pRSmaInfo);
×
UNCOV
232
  smaTrace("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
×
UNCOV
233
}
×
234
static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) {
UNCOV
235
  int32_t ref = T_REF_DEC(pRSmaInfo);
×
UNCOV
236
  smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref);
×
UNCOV
237
}
×
238

239
void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputName);
240

241
#ifdef __cplusplus
242
}
243
#endif
244

245
#endif /*_TD_VNODE_SMA_H_*/
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc