• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4905

29 Dec 2025 02:08PM UTC coverage: 65.423% (-0.3%) from 65.734%
#4905

push

travis-ci

web-flow
enh: sign connect request (#34067)

23 of 29 new or added lines in 4 files covered. (79.31%)

11614 existing lines in 186 files now uncovered.

193476 of 295730 relevant lines covered (65.42%)

115752566.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

4.87
/source/client/src/clientSession.c
1

2
/*
3
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
4
 *
5
 * This program is free software: you can use, redistribute, and/or modify
6
 * it under the terms of the GNU Affero General Public License, version 3
7
 * or later ("AGPL"), as published by the Free Software Foundation.
8
 *
9
 * This program is distributed in the hope that it will be useful, but WITHOUT
10
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11
 * FITNESS FOR A PARTICULAR PURPOSE.
12
 *
13
 * You should have received a copy of the GNU Affero General Public License
14
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
15
 */
16

17
#include "clientSession.h"
18
#include "clientInt.h"
19
#include "clientLog.h"
20

21
static SSessionMgt sessMgt = {0};
22

23
#define HANDLE_SESSION_CONTROL() \
24
  do {                           \
25
    if (sessMgt.inited == 0) {   \
26
      return TSDB_CODE_SUCCESS;  \
27
    }                            \
28
  } while (0)
29

30
static int32_t sessPerUserCheckFn(int64_t value, int64_t limit) {
×
31
  int32_t code = 0;
×
32
  if (limit == -1) {
×
33
    return 0;
×
34
  }
35

36
  if (value > limit) {
×
37
    code = TSDB_CODE_TSC_SESS_PER_USER_LIMIT;
×
38
  }
39

40
  return code;
×
41
}
42

43
static int32_t sessPerUserUpdateFn(int64_t *value, int64_t limit) {
×
44
  int32_t code = 0;
×
45
  *value += limit;
×
46
  return code;
×
47
}
48

49
static int32_t sessConnTimeCheckFn(int64_t value, int64_t limit) {
×
50
  int32_t code = 0;
×
51
  if (limit == -1) {
×
52
    return code;
×
53
  }
54
  int64_t currentTime = taosGetTimestampMs();
×
55
  if ((value + limit * 1000) < currentTime) {
×
56
    code = TSDB_CODE_TSC_SESS_CONN_TIMEOUT;
×
57
  }
58

59
  return code;
×
60
}
61

62
static int32_t sessConnTimeUpdateFn(int64_t *value, int64_t limit) {
×
63
  int32_t code = 0;
×
64
  *value = taosGetTimestampMs();
×
65
  return code;
×
66
}
67

68
static int32_t sessConnIdleTimeCheckFn(int64_t value, int64_t limit) {
×
69
  int32_t code = 0;
×
70
  if (limit == -1) {
×
71
    return 0;
×
72
  }
73
  int64_t currentTime = taosGetTimestampMs();
×
74
  if ((value + limit * 1000) < currentTime) {
×
75
    code = TSDB_CODE_TSC_SESS_CONN_IDLE_TIMEOUT;
×
76
  }
77
  return code;
×
78
}
79

80
static int32_t sessConnIdleTimeUpdateFn(int64_t *value, int64_t limit) {
×
81
  int32_t code = 0;
×
82
  *value = taosGetTimestampMs();
×
83
  return code;
×
84
}
85

86
static int32_t sessMaxConnCurrencyCheckFn(int64_t value, int64_t limit) {
×
87
  int32_t code = 0;
×
88
  if (limit == -1) {
×
89
    return code;
×
90
  }
91
  return code;
×
92
}
93

94
static int32_t sessMaxConnCurrencyUpdateFn(int64_t *value, int64_t delta) {
×
95
  int32_t code = 0;
×
96
  if (delta == -1) {
×
97
    return code;
×
98
  }
99
  return code;
×
100
}
101

102
static int32_t sessVnodeCallCheckFn(int64_t value, int64_t limit) {
×
103
  int32_t code = 0;
×
104
  if (limit == -1) {
×
105
    return code;
×
106
  }
107

108
  if (value > limit) {
×
109
    code = TSDB_CODE_TSC_SESS_MAX_CALL_VNODE_LIMIT;
×
110
  }
111
  return code;
×
112
}
113

114
static int32_t sessVnodeCallNumUpdateFn(int64_t *value, int64_t delta) {
×
115
  int32_t code = 0;
×
116
  *value += delta;
×
117
  return code;
×
118
}
119
static int32_t sessSetValueLimitFn(int64_t *pLimit, int64_t src) {
×
120
  int32_t code = 0;
×
121
  *pLimit = src;
×
122
  return code;
×
123
}
124

125
static SSessionError sessFnSet[] = {
126
    {SESSION_PER_USER, sessPerUserCheckFn, sessPerUserUpdateFn, sessSetValueLimitFn},
127
    {SESSION_CONN_TIME, sessConnTimeCheckFn, sessConnTimeUpdateFn, sessSetValueLimitFn},
128
    {SESSION_CONN_IDLE_TIME, sessConnIdleTimeCheckFn, sessConnIdleTimeUpdateFn, sessSetValueLimitFn},
129
    {SESSION_MAX_CONCURRENCY, sessMaxConnCurrencyCheckFn, sessMaxConnCurrencyUpdateFn, sessSetValueLimitFn},
130
    {SESSION_MAX_CALL_VNODE_NUM, sessVnodeCallCheckFn, sessVnodeCallNumUpdateFn, sessSetValueLimitFn},
131
};
132

133
int32_t sessMetricCreate(SSessMetric **ppMetric) {
×
134
  int32_t      code = 0;
×
135
  SSessMetric *pMetric = (SSessMetric *)taosMemoryMalloc(sizeof(SSessMetric));
×
136
  if (pMetric == NULL) {
×
137
    code = terrno;
×
138
    return code;
×
139
  }
140

141
  memset(pMetric->value, 0, sizeof(pMetric->value));
×
142

143
  pMetric->limit[SESSION_PER_USER] = sessionPerUser;
×
144
  pMetric->limit[SESSION_CONN_TIME] = sessionConnTime;
×
145
  pMetric->limit[SESSION_CONN_IDLE_TIME] = sessionConnIdleTime;
×
146
  pMetric->limit[SESSION_MAX_CONCURRENCY] = sessionMaxConcurrency;
×
147
  pMetric->limit[SESSION_MAX_CALL_VNODE_NUM] = sessionMaxCallVnodeNum;
×
148

149
  code = taosThreadRwlockInit(&pMetric->lock, NULL);
×
150
  if (code != 0) {
×
151
    taosMemoryFree(pMetric);
×
152
    return code;
×
153
  }
154

155
  *ppMetric = pMetric;
×
156
  return code;
×
157
}
158

159
int32_t sessMetricUpdateLimit(SSessMetric *pMetric, ESessionType type, int32_t value) {
×
160
  int32_t code = 0;
×
161

162
  (void)taosThreadRwlockWrlock(&pMetric->lock);
×
163
  code = sessFnSet[type].limitFn(&pMetric->limit[type], value);
×
164
  (void)taosThreadRwlockUnlock(&pMetric->lock);
×
165
  return code;
×
166
}
167

168
int32_t sessMetricCheckImpl(SSessMetric *pMetric) {
×
169
  int32_t code = 0;
×
170

171
  for (int32_t i = 0; i < sizeof(pMetric->limit) / sizeof(pMetric->limit[0]); i++) {
×
172
    code = sessFnSet[i].checkFn(pMetric->value[i], pMetric->limit[i]);
×
173
    if (code != 0) {
×
174
      break;
×
175
    }
176
  }
177

178
  return code;
×
179
}
180
int32_t sessMetricCheckByTypeImpl(SSessMetric *pMetric, ESessionType type) {
×
181
  return sessFnSet[type].checkFn(pMetric->value[type], pMetric->limit[type]);
×
182
}
183

184
int32_t sessMetricCheck(SSessMetric *pMetric) {
×
185
  int32_t code = 0;
×
186

187
  (void)taosThreadRwlockRdlock(&pMetric->lock);
×
188
  code = sessMetricCheckImpl(pMetric);
×
189

190
  (void)taosThreadRwlockUnlock(&pMetric->lock);
×
191

192
  return code;
×
193
}
194

195
int32_t sessMetricCheckByType(SSessMetric *pMetric, ESessionType type) {
×
196
  int32_t code = 0;
×
197

198
  (void)taosThreadRwlockRdlock(&pMetric->lock);
×
199
  code = sessMetricCheckByTypeImpl(pMetric, type);
×
200
  (void)taosThreadRwlockUnlock(&pMetric->lock);
×
201

202
  return code;
×
203
}
204

205
int32_t sessMetricGet(SSessMetric *pMetric, ESessionType type, int32_t *pValue) {
×
206
  int32_t code = 0;
×
207

208
  (void)taosThreadRwlockRdlock(&pMetric->lock);
×
209
  *pValue = pMetric->limit[type];
×
210
  (void)taosThreadRwlockUnlock(&pMetric->lock);
×
211

212
  return code;
×
213
}
214

215
int32_t sessMetricUpdate(SSessMetric *pMetric, SSessParam *p) {
×
216
  int32_t code = 0;
×
217
  int32_t lino = 0;
×
218
  (void)taosThreadRwlockWrlock(&pMetric->lock);
×
219

220
  code = sessMetricCheckByTypeImpl(pMetric, p->type);
×
221
  TAOS_CHECK_GOTO(code, &lino, _error);
×
222

223
  code = sessFnSet[p->type].updateFn(&pMetric->value[p->type], p->value);
×
224
_error:
×
225

226
  TAOS_UNUSED(taosThreadRwlockUnlock(&pMetric->lock));
×
227
  return code;
×
228
}
229

230
int32_t sessMetricCheckValue(SSessMetric *pMetric, ESessionType type, int64_t value) {
×
231
  int32_t code = 0;
×
232

233
  (void)taosThreadRwlockRdlock(&pMetric->lock);
×
234
  code = sessFnSet[type].checkFn(value, pMetric->limit[type]);
×
235
  (void)taosThreadRwlockUnlock(&pMetric->lock);
×
236

237
  return code;
×
238
}
239
void sessMetricDestroy(SSessMetric *pMetric) {
×
240
  TAOS_UNUSED(taosThreadRwlockDestroy(&pMetric->lock));
×
241
  taosMemoryFree(pMetric);
×
242
}
×
243

244
int32_t sessMgtInit() {
×
245
  int32_t code = 0;
×
246
  int32_t lino = 0;
×
247

248
  sessMgt.pSessMetricMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
×
249
  if (sessMgt.pSessMetricMap == NULL) {
×
250
    code = terrno;
×
251
    TAOS_CHECK_GOTO(code, &lino, _error);
×
252
  }
253
  sessMgt.inited = 1;
×
254

255
_error:
×
256
  if (code != 0) {
×
257
    tscError("failed to init session mgt, line:%d, code:%d", lino, code);
×
258
  }
259
  return code;
×
260
}
261

262
int32_t sessMgtUpdataLimit(char *user, ESessionType type, int32_t value) {
5,177,695✔
263
  HANDLE_SESSION_CONTROL();
5,177,695✔
264
  int32_t      code = 0;
×
265
  int32_t      lino = 0;
×
266
  SSessionMgt *pMgt = &sessMgt;
×
267

268
  if (type >= SESSION_MAX_TYPE || type < SESSION_PER_USER) {
×
269
    return TSDB_CODE_INVALID_PARA;
×
270
  }
271

272

273
  SSessMetric *pMetric = NULL;
×
274
  (void)taosThreadRwlockWrlock(&pMgt->lock);
×
275

276
  SSessMetric **ppMetric = taosHashGet(pMgt->pSessMetricMap, user, strlen(user));
×
277
  if (ppMetric == NULL || *ppMetric == NULL) {
×
278
    code = sessMetricCreate(&pMetric);
×
279
    TAOS_CHECK_GOTO(code, &lino, _error);
×
280

281
    code = taosHashPut(pMgt->pSessMetricMap, user, strlen(user), &pMetric, sizeof(SSessMetric *));
×
282
    TAOS_CHECK_GOTO(code, &lino, _error);
×
283

284
  } else {
285
    pMetric = *ppMetric;
×
286
  }
287

288
  code = sessMetricUpdateLimit(pMetric, type, value);
×
289
  TAOS_CHECK_GOTO(code, &lino, _error);
×
290

291
_error:
×
292
  if (code != 0) {
×
293
    uError("failed to update session mgt type:%d, line:%d, code:%d", type, lino, code);
×
294
  }
295

296
  TAOS_UNUSED(taosThreadRwlockUnlock(&pMgt->lock));
×
297

298
  return code;
×
299
}
300

301
int32_t sessMgtUpdateUserMetric(char *user, SSessParam *pPara) {
2,063,113✔
302
  HANDLE_SESSION_CONTROL();
2,063,113✔
303

304
  int32_t code = 0;
×
305
  int32_t lino = 0;
×
306

307
  SSessionMgt *pMgt = &sessMgt;
×
308

309
  SSessMetric *pMetric = NULL;
×
310
  (void)taosThreadRwlockWrlock(&pMgt->lock);
×
311

312
  SSessMetric **ppMetric = taosHashGet(pMgt->pSessMetricMap, user, strlen(user));
×
313
  if (ppMetric == NULL || *ppMetric == NULL) {
×
314
    code = sessMetricCreate(&pMetric);
×
315
    TAOS_CHECK_GOTO(code, &lino, _error);
×
316

317
    code = taosHashPut(pMgt->pSessMetricMap, user, strlen(user), &pMetric, sizeof(SSessMetric *));
×
318
    TAOS_CHECK_GOTO(code, &lino, _error);
×
319
  } else {
320
    pMetric = *ppMetric;
×
321
  }
322

323
  code = sessMetricUpdate(pMetric, pPara);
×
324
  TAOS_CHECK_GOTO(code, &lino, _error);
×
325

326
_error:
×
327
  if (code != 0) {
×
328
    uError("failed to update user session metric, line:%d, code:%d", lino, code);
×
329
  }
330

331
  TAOS_UNUSED(taosThreadRwlockUnlock(&pMgt->lock));
×
332
  return code;
×
333
}
334

335
int32_t sessMgtGet(char *user, ESessionType type, int32_t *pValue) {
×
336
  int32_t      code = 0;
×
337
  int32_t      lino = 0;
×
338
  SSessionMgt *pMgt = &sessMgt;
×
339
  HANDLE_SESSION_CONTROL();
×
340

341
  if (type >= SESSION_MAX_TYPE) {
×
342
    return TSDB_CODE_INVALID_PARA;
×
343
  }
344

345
  code = taosThreadRwlockRdlock(&pMgt->lock);
×
346
  TAOS_CHECK_GOTO(code, &lino, _error);
×
347

348
  SSessMetric **ppMetric = taosHashGet(pMgt->pSessMetricMap, user, strlen(user));
×
349
  if (ppMetric == NULL || *ppMetric == NULL) {
×
350
    code = TSDB_CODE_INVALID_PARA;
×
351
    TAOS_CHECK_GOTO(code, &lino, _error);
×
352
  }
353

354
  code = sessMetricGet(*ppMetric, type, pValue);
×
355
  TAOS_CHECK_GOTO(code, &lino, _error);
×
356

357
_error:
×
358
  TAOS_UNUSED(taosThreadRwlockUnlock(&pMgt->lock));
×
359

360
  if (code != 0) {
×
361
    uError("failed to get session mgt type:%d, line:%d, code:%d", type, lino, code);
×
362
  }
363
  return code;
×
364
}
365
int32_t sessMgtCheckUser(char *user, ESessionType type) {
×
366
  HANDLE_SESSION_CONTROL();
×
367

368
  int32_t      code = 0;
×
369
  int32_t      lino = 0;
×
370
  SSessionMgt *pMgt = &sessMgt;
×
371

372
  code = taosThreadRwlockRdlock(&pMgt->lock);
×
373
  TAOS_CHECK_GOTO(code, &lino, _error);
×
374

375
  SSessMetric **ppMetric = taosHashGet(pMgt->pSessMetricMap, user, strlen(user));
×
376
  if (ppMetric == NULL || *ppMetric == NULL) {
×
377
    code = TSDB_CODE_INVALID_PARA;
×
378
    TAOS_CHECK_GOTO(code, &lino, _error);
×
379
  }
380

381
  code = sessMetricCheckByType(*ppMetric, type);
×
382

383
_error:
×
384
  if (code != 0) {
×
385
    uError("failed to check user session, line:%d, code:%d", lino, code);
×
386
  }
387
  TAOS_UNUSED(taosThreadRwlockUnlock(&pMgt->lock));
×
388
  return code;
×
389
}
390
int32_t sessMgtCheckValue(char *user, ESessionType type, int64_t value) {
×
391
  int32_t      code = 0;
×
392
  int32_t      lino = 0;
×
393
  SSessionMgt *pMgt = &sessMgt;
×
394
  code = taosThreadRwlockRdlock(&pMgt->lock);
×
395
  TAOS_CHECK_GOTO(code, &lino, _error);
×
396

397
  SSessMetric **ppMetric = taosHashGet(pMgt->pSessMetricMap, user, strlen(user));
×
398
  if (ppMetric == NULL || *ppMetric == NULL) {
×
399
    code = TSDB_CODE_INVALID_PARA;
×
400
    TAOS_CHECK_GOTO(code, &lino, _error);
×
401
  }
402

403
  code = sessMetricCheckValue(*ppMetric, type, value);
×
404

405
_error:
×
406
  TAOS_UNUSED(taosThreadRwlockUnlock(&pMgt->lock));
×
407
  if (code != 0) {
×
408
    uError("failed to check user session, line:%d, code:%d", lino, code);
×
409
  }
410
  return code;
×
411
}
412

413
int32_t sessMgtRemoveUser(char *user) {
1,652✔
414
  HANDLE_SESSION_CONTROL();
1,652✔
415

416
  int32_t      code = 0;
×
417
  int32_t      lino = 0;
×
418
  SSessionMgt *pMgt = &sessMgt;
×
419

420

421
  code = taosThreadRwlockWrlock(&pMgt->lock);
×
422
  TAOS_CHECK_GOTO(code, &lino, _error);
×
423

424
  SSessMetric **ppMetric = taosHashGet(pMgt->pSessMetricMap, user, strlen(user));
×
425
  if (ppMetric != NULL && *ppMetric != NULL) {
×
426
    sessMetricDestroy(*ppMetric);
×
427
    code = taosHashRemove(pMgt->pSessMetricMap, user, strlen(user));
×
428
    TAOS_CHECK_GOTO(code, &lino, _error);
×
429
  }
430
_error:
×
431
  TAOS_UNUSED(taosThreadRwlockUnlock(&pMgt->lock));
×
432
  return code;
×
433
}
434

435
void sessMgtDestroy() {
1,160,440✔
436
  SSessionMgt *pMgt = &sessMgt;
1,160,440✔
437
  int32_t      code = 0;
1,160,440✔
438

439
  if (pMgt->pSessMetricMap == NULL) {
1,160,440✔
440
    return;
1,160,440✔
441
  }
442

443
  void *p = taosHashIterate(pMgt->pSessMetricMap, NULL);
×
444
  while (p) {
×
445
    SSessMetric *pMetric = *(SSessMetric **)p;
×
446
    sessMetricDestroy(pMetric);
×
447
    p = taosHashIterate(pMgt->pSessMetricMap, p);
×
448
  }
449

450
  code = taosThreadRwlockDestroy(&pMgt->lock);
×
451
  if (code != 0) {
×
452
    uError("failed to destroy session mgt, code:%d", code);
×
453
  }
454
  taosHashCleanup(pMgt->pSessMetricMap);
×
455

456
  pMgt->pSessMetricMap = NULL;
×
457
}
458
int32_t sessMgtCheckConnStatus(char *user, SConnAccessInfo *pInfo) {
×
459
  HANDLE_SESSION_CONTROL();
×
460

461
  int32_t code = 0;
×
462
  int32_t lino = 0;
×
463

464

465
  code = sessMgtCheckValue(user, SESSION_CONN_TIME, pInfo->startTime);
×
466
  TAOS_CHECK_GOTO(code, &lino, _error);
×
467

468
  code = sessMgtCheckValue(user, SESSION_CONN_IDLE_TIME, pInfo->lastAccessTime);
×
469
  TAOS_CHECK_GOTO(code, &lino, _error);
×
470

471
_error:
×
472
  if (code != 0) {
×
473
    uError("failed to check connection status for user:%s, line:%d, code:%d", user, lino, code);
×
474
  }
475
  return code;
×
476
}
477

478
int32_t connCheckAndUpateMetric(int64_t connId) {
744,873,531✔
479
  HANDLE_SESSION_CONTROL();
744,873,531✔
480

481
  int32_t code = 0;
×
482
  int32_t lino = 0;
×
483

484
  STscObj *pTscObj = acquireTscObj(connId);
×
485
  if (pTscObj == NULL) {
×
486
    code = TSDB_CODE_INVALID_PARA;
×
487
    return code;
×
488
  }
489

490
  code = sessMgtCheckConnStatus(pTscObj->user, &pTscObj->sessInfo);
×
491
  TAOS_CHECK_GOTO(code, &lino, _error);
×
492

493
  updateConnAccessInfo(&pTscObj->sessInfo);
×
494

495
  code = sessMgtUpdateUserMetric(pTscObj->user, &(SSessParam){.type = SESSION_MAX_CONCURRENCY, .value = 1});
×
496
  TAOS_CHECK_GOTO(code, &lino, _error);
×
497

498
_error:
×
499
  if (code != 0) {
×
500
    tscError("conn:0x%" PRIx64 ", check and update metric failed at line:%d, code:%s", connId, lino, tstrerror(code));
×
501
  }
502

503
  releaseTscObj(connId);
×
504
  return code;
×
505
}
506

507
int32_t tscUpdateSessMgtMetric(STscObj *pTscObj, SSessParam *pParam) {
755,635,354✔
508
  HANDLE_SESSION_CONTROL();
755,635,354✔
509

UNCOV
510
  int32_t code = 0;
×
511

UNCOV
512
  if (pTscObj == NULL) {
×
513
    code = TSDB_CODE_INVALID_PARA;
×
514
    return code;
×
515
  }
UNCOV
516
  code = sessMgtUpdateUserMetric(pTscObj->user, pParam);
×
517
  return code;
×
518
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc