• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 13135713772

04 Feb 2025 12:42PM UTC coverage: 75.074% (-0.03%) from 75.104%
13135713772

Pull #843

github

web-flow
Merge 7e4333581 into 85e3589c2
Pull Request #843: taos-tools Merge 3.0 to Main Branch

433 of 593 new or added lines in 6 files covered. (73.02%)

41 existing lines in 6 files now uncovered.

12457 of 16593 relevant lines covered (75.07%)

327996.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.34
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
// query and get result  record is true to total request
17
// Execute one SQL command on behalf of a query thread and update its counters.
//
//   pThreadInfo  per-thread state: supplies the native connection or the REST
//                socket, the result file path and the nSucc/nFail counters.
//   command      SQL text to execute.
//   record       when true the outcome is counted in nSucc/nFail and, on the
//                native interface, the result set is fetched; when false the
//                statement is only executed.
//
// Returns 0 on success and -1 on failure or when the user cancelled. A failure
// is reported as 0 when continueIfFail is YES_IF_FAILED so callers keep going.
int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool record) {
    // user cancel
    if (g_arguments->terminate) {
        return -1;
    }

    int ret = 0;

    if (g_queryInfo.iface == REST_IFACE) {
        // REST: the helper posts the statement over the thread's socket
        uint32_t threadID = pThreadInfo->threadID;
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
                                   0, g_arguments->port, false,
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
        if (0 != retCode) {
            errorPrint("====restful return fail, threadID[%u]\n", threadID);
            ret = -1;
        }
    } else {
        // native connection
        TAOS     *taos = pThreadInfo->conn->taos;
        int64_t   rows = 0;
        TAOS_RES *res  = taos_query(taos, command);
        int       code = taos_errno(res);
        if (res == NULL || code) {
            // failed query
            errorPrint("failed to execute sql:%s, "
                        "code: 0x%08x, reason:%s\n",
                        command, code, taos_errstr(res));
            ret = -1;
        } else if (record) {
            // successful query: consume the rows only when recording
            rows = fetchResult(res, pThreadInfo->filePath);
        }

        // the result handle is released on both the success and failure path
        if (res) {
            taos_free_result(res);
        }
        debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
    }

    // record count
    if (ret == 0) {
        if (record) {
            pThreadInfo->nSucc++;
        }
    } else {
        if (record) {
            pThreadInfo->nFail++;
        }

        // continue option: hide the failure from the caller
        if (YES_IF_FAILED == g_arguments->continueIfFail) {
            ret = 0;
        }
    }

    return ret;
}
81

82
// interlligent sleep
83
// Sleep for whatever is left of the query interval once the time already
// spent (delay) is deducted; returns immediately when nothing is left.
// NOTE(review): the remainder of interval * 1000 - delay is handed to
// toolsMsleep() unchanged, while callers in this file pass delays taken from
// both microsecond and millisecond clocks -- confirm the intended units.
void autoSleep(uint64_t interval, uint64_t delay ) {
    uint64_t budget = interval * 1000;
    if (delay >= budget) {
        return;
    }
    toolsMsleep((int32_t)(budget - delay));  // ms
}
261✔
88

89
// reset 
90
// Issue "RESET QUERY CACHE" through the thread's connection without counting
// it as a query. Returns 0 on success, -1 (after logging) on failure.
int32_t resetQueryCache(qThreadInfo* pThreadInfo) {
    int rc = selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", false);
    if (rc == 0) {
        return 0;
    }

    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
    return -1;
}
99

100

101

102
//
103
//  ---------------------------------  second level functions for threads -----------------------------------
104
//
105

106
// show rela qps
107
// Print the thread's running query count and QPS at most once every 10 s.
//
//   lastPrintTime  timestamp (ms) of the previous report
//   startTs        timestamp (ms) at which the thread started querying
//
// Returns the timestamp to use as lastPrintTime on the next call: the current
// time when a report was printed, otherwise lastPrintTime unchanged.
int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) {
    int64_t now = toolsGetTimestampMs();

    // not due yet
    if (now - lastPrintTime <= 10 * 1000) {
        return lastPrintTime;
    }

    // failed queries count as completed only when failures are tolerated
    uint64_t totalQueried = pThreadInfo->nSucc;
    if (g_arguments->continueIfFail == YES_IF_FAILED) {
        totalQueried += pThreadInfo->nFail;
    }

    double elapsedSec = (now - startTs) / 1000.0;
    infoPrint(
        "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n",
        pThreadInfo->threadID, totalQueried,
        (double)(totalQueried / elapsedSec));
    return now;
}
124

125
// spec query mixed thread
126
static void *specQueryMixThread(void *sarg) {
9✔
127
    qThreadInfo *pThreadInfo = (qThreadInfo*)sarg;
9✔
128
#ifdef LINUX
129
    prctl(PR_SET_NAME, "specQueryMixThread");
9✔
130
#endif
131
    // use db
132
    if (g_queryInfo.dbName) {
9✔
133
        if (pThreadInfo->conn &&
9✔
134
            pThreadInfo->conn->taos &&
12✔
135
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
6✔
NEW
136
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
137
                return NULL;
×
138
        }
139
    }
140

141
    int64_t st = 0;
9✔
142
    int64_t et = 0;
9✔
143
    int64_t startTs = toolsGetTimestampMs();
9✔
144
    int64_t lastPrintTime = startTs;
9✔
145
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
9✔
146
    uint64_t interval = g_queryInfo.specifiedQueryInfo.queryInterval;
9✔
147
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
9✔
148
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
18✔
149
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
9✔
150
        for (int j = 0; j < queryTimes; ++j) {
533✔
151
            // use cancel
152
            if(g_arguments->terminate) {
524✔
NEW
153
                infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
154
                break;
×
155
            }
156

157
            // reset cache
158
            if (g_queryInfo.reset_query_cache) {
524✔
NEW
159
                if (resetQueryCache(pThreadInfo)) {
×
NEW
160
                    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
161
                    return NULL;
×
162
                }
163
            }
164

165
            // execute sql
166
            st = toolsGetTimestampUs();
524✔
167
            int ret = selectAndGetResult(pThreadInfo, sql->command, true);
524✔
168
            if (ret) {
524✔
NEW
169
                g_fail = true;
×
NEW
170
                errorPrint("failed call mix selectAndGetResult, i=%d j=%d", i, j);
×
NEW
171
                return NULL;
×
172
            }
173
            et = toolsGetTimestampUs();
524✔
174

175
            // sleep
176
            if (interval > 0) {
524✔
177
                autoSleep(interval, et - st);
4✔
178
            }
179

180
            // delay
181
            if (ret == 0) {
524✔
182
                int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
524✔
183
                *delay = et - st;
524✔
184
                debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay);
524✔
185

186
                pThreadInfo->total_delay += *delay;
524✔
187
                if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
524✔
NEW
188
                    tmfree(delay);
×
189
                }
190
            }
191

192
            // real show
193
            lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
524✔
194
        }
195
    }
196

197
    return NULL;
9✔
198
}
199

200
// spec query thread
201
static void *specQueryThread(void *sarg) {
126✔
202
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
126✔
203
#ifdef LINUX
204
    prctl(PR_SET_NAME, "specQueryThread");
126✔
205
#endif
206
    uint64_t st = 0;
126✔
207
    uint64_t et = 0;
126✔
208
    int32_t  index = 0;
126✔
209

210
    // use db
211
    if (g_queryInfo.dbName) {
126✔
212
        if (pThreadInfo->conn &&
126✔
213
            pThreadInfo->conn->taos &&
208✔
214
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
104✔
215
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
216
                return NULL;
×
217
        }
218
    }
219

220
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
126✔
221
    uint64_t  interval   = g_queryInfo.specifiedQueryInfo.queryInterval;
126✔
222
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
126✔
223

224
    uint64_t  startTs       = toolsGetTimestampMs();
126✔
225
    uint64_t  lastPrintTime = startTs;
126✔
226

227
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq);
126✔
228

229
    if (sql->result[0] != '\0') {
126✔
230
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
108✔
231
                sql->result, pThreadInfo->threadID);
108✔
232
    }
233

234
    while (index < queryTimes) {
1,845✔
235
        // use cancel
236
        if(g_arguments->terminate) {
1,724✔
NEW
237
            infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID);
×
238
            break;
6✔
239
        }
240

241
        // reset cache
242
        if (g_queryInfo.reset_query_cache) {
1,724✔
243
            if (resetQueryCache(pThreadInfo)) {
6✔
NEW
244
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
245
                return NULL;
×
246
            }
247
        }
248

249
        // execute sql
250
        st = toolsGetTimestampUs();
1,724✔
251
        int ret = selectAndGetResult(pThreadInfo, sql->command, true);
1,723✔
252
        if (ret) {
1,724✔
253
            g_fail = true;
6✔
254
            errorPrint("failed call spec selectAndGetResult, index=%d\n", index);
6✔
255
            break;
6✔
256
        }
257
        et = toolsGetTimestampUs();
1,718✔
258

259
        // sleep
260
        if (interval > 0) {
1,719✔
261
            autoSleep(interval, et - st);
178✔
262
        }
263

264

265
        uint64_t delay = et - st;
1,719✔
266
        debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
1,719✔
267

268
        if (ret == 0) {
1,719✔
269
            // only succ add delay list
270
            benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
1,719✔
271
            pThreadInfo->total_delay += delay;
1,719✔
272
        }
273
        index++;
1,719✔
274

275
        // real show
276
        lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
1,719✔
277
    }
278

279
    return NULL;
127✔
280
}
281

282
// super table query thread
283
static void *stbQueryThread(void *sarg) {
40✔
284
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
40✔
285
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
40✔
286
#ifdef LINUX
287
    prctl(PR_SET_NAME, "stbQueryThread");
40✔
288
#endif
289

290
    uint64_t st = 0;
40✔
291
    uint64_t et = 0;
40✔
292

293
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
40✔
294
    uint64_t interval   = g_queryInfo.superQueryInfo.queryInterval;
40✔
295
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t));
40✔
296
    
297
    uint64_t startTs = toolsGetTimestampMs();
40✔
298
    uint64_t lastPrintTime = startTs;
40✔
299
    while (queryTimes--) {
720✔
300
        // use cancel
301
        if(g_arguments->terminate) {
680✔
NEW
302
            infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
303
            break;
×
304
        }
305

306
        // reset cache
307
        if (g_queryInfo.reset_query_cache) {
680✔
308
            if (resetQueryCache(pThreadInfo)) {
1✔
NEW
309
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
310
                return NULL;
×
311
            }
312
        }
313

314
        // execute
315
        st = toolsGetTimestampMs();
680✔
316
        // for each table
317
        for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
2,804✔
318
            // use cancel
319
            if(g_arguments->terminate) {
2,123✔
NEW
320
                infoPrint("%s\n", "user cancel , so exit testing.");
×
NEW
321
                break;
×
322
            }
323

324
            // for each sql
325
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
11,187✔
326
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
9,064✔
327
                // use cancel
328
                if(g_arguments->terminate) {
9,064✔
NEW
329
                    infoPrint("%s\n", "user cancel , so exit testing.");
×
330
                    break;
1✔
331
                }
332
                
333
                // get real child name sql
334
                if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) {
9,064✔
335
                    // fault
NEW
336
                    tmfree(sqlstr);
×
NEW
337
                    return NULL;
×
338
                }
339

340
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
9,062✔
341
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
8,059✔
342
                            g_queryInfo.superQueryInfo.result[j],
8,059✔
343
                            pThreadInfo->threadID);
344
                }
345

346
                // execute sql
347
                uint64_t s = toolsGetTimestampUs();
9,062✔
348
                int ret = selectAndGetResult(pThreadInfo, sqlstr, true);
9,060✔
349
                if (ret) {
9,061✔
350
                    // found error
NEW
351
                    errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j);
×
UNCOV
352
                    g_fail = true;
×
NEW
353
                    tmfree(sqlstr);
×
NEW
354
                    return NULL;
×
355
                }
356
                uint64_t delay = toolsGetTimestampUs() - s;
9,061✔
357
                debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
9,063✔
358
                if (ret == 0) {
9,063✔
359
                    // only succ add delay list
360
                    benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
9,064✔
361
                    pThreadInfo->total_delay += delay;
9,058✔
362
                }
363

364
                // show real QPS
365
                lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
9,057✔
366
            }
367
        }
368
        et = toolsGetTimestampMs();
681✔
369

370
        // sleep
371
        if (interval > 0) {
680✔
372
            autoSleep(interval, et - st);
79✔
373
        }
374

375
    }
376
    tmfree(sqlstr);
40✔
377

378
    return NULL;
40✔
379
}
380

381
//
382
// ---------------------------------  first level functions ------------------------------
383
//
384

385
void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend) {
15✔
386
    // valid check
387
    if (infos == NULL || threadCnt == 0) {
15✔
NEW
388
        return ;
×
389
    }
390
    
391
    // statistic
392
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
15✔
393
    double total_delays = 0;
15✔
394

395
    // clear
396
    for (int i = 0; i < threadCnt; ++i) {
64✔
397
        qThreadInfo * pThreadInfo = infos + i;
49✔
398
        if(pThreadInfo->query_delay_list == NULL) {
49✔
NEW
399
            continue;;
×
400
        }
401
        
402
        // append delay
403
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
49✔
404
                pThreadInfo->query_delay_list->size, false);
49✔
405
        total_delays += pThreadInfo->total_delay;
49✔
406

407
        // free delay
408
        benchArrayDestroy(pThreadInfo->query_delay_list);
49✔
409
        pThreadInfo->query_delay_list = NULL;
49✔
410

411
    }
412

413
    // succ is zero
414
    if (delay_list->size == 0) {
15✔
NEW
415
        errorPrint("%s", "succ queries count is zero.\n");
×
NEW
416
        benchArrayDestroy(delay_list);
×
NEW
417
        return ;
×
418
    }
419

420

421
    // sort
422
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);
15✔
423

424
    // show delay min max
425
    if (delay_list->size) {
15✔
426
        infoPrint(
15✔
427
                "spend %.6fs using "
428
                "%d threads complete query %d times,  "
429
                "min delay: %.6fs, "
430
                "avg delay: %.6fs, "
431
                "p90: %.6fs, "
432
                "p95: %.6fs, "
433
                "p99: %.6fs, "
434
                "max: %.6fs\n",
435
                spend/1E6,
436
                threadCnt, (int)delay_list->size,
437
                *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
438
                (double)total_delays/delay_list->size/1E6,
439
                *(int64_t *)(benchArrayGet(delay_list,
440
                                    (int32_t)(delay_list->size * 0.9)))/1E6,
441
                *(int64_t *)(benchArrayGet(delay_list,
442
                                    (int32_t)(delay_list->size * 0.95)))/1E6,
443
                *(int64_t *)(benchArrayGet(delay_list,
444
                                    (int32_t)(delay_list->size * 0.99)))/1E6,
445
                *(int64_t *)(benchArrayGet(delay_list,
446
                                    (int32_t)(delay_list->size - 1)))/1E6);
447
    } else {
NEW
448
        errorPrint("%s() LN%d, delay_list size: %"PRId64"\n",
×
449
                   __func__, __LINE__, (int64_t)delay_list->size);
450
    }
451
    benchArrayDestroy(delay_list);
15✔
452
}
453

454
//
455
// super table query
456
//
457
static int stbQuery(uint16_t iface, char* dbName) {
11✔
458
    int ret = -1;
11✔
459
    pthread_t * pidsOfSub   = NULL;
11✔
460
    qThreadInfo *threadInfos = NULL;
11✔
461
    g_queryInfo.superQueryInfo.totalQueried = 0;
11✔
462
    g_queryInfo.superQueryInfo.totalFail    = 0;
11✔
463

464
    // check
465
    if ((g_queryInfo.superQueryInfo.sqlCount == 0)
11✔
466
        || (g_queryInfo.superQueryInfo.threadCnt == 0)) {
11✔
UNCOV
467
        return 0;
×
468
    }
469

470
    // malloc 
471
    pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
11✔
472
                            *sizeof(pthread_t),
473
                            false);
474
    threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
11✔
475
                                *sizeof(qThreadInfo), false);
476

477
    int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
11✔
478
    int nConcurrent = g_queryInfo.superQueryInfo.threadCnt;
11✔
479

480
    int64_t a = ntables / nConcurrent;
11✔
481
    if (a < 1) {
11✔
482
        nConcurrent = (int)ntables;
2✔
483
        a = 1;
2✔
484
    }
485

486
    int64_t b = 0;
11✔
487
    if (nConcurrent != 0) {
11✔
488
        b = ntables % nConcurrent;
11✔
489
    }
490

491
    uint64_t tableFrom = 0;
11✔
492
    int threadCnt = 0;
11✔
493
    for (int i = 0; i < nConcurrent; i++) {
51✔
494
        qThreadInfo *pThreadInfo = threadInfos + i;
40✔
495
        pThreadInfo->threadID = i;
40✔
496
        pThreadInfo->start_table_from = tableFrom;
40✔
497
        pThreadInfo->ntables = i < b ? a + 1 : a;
40✔
498
        pThreadInfo->end_table_to =
40✔
499
                i < b ? tableFrom + a : tableFrom + a - 1;
40✔
500
        tableFrom = pThreadInfo->end_table_to + 1;
40✔
501
        // create conn
502
        if (initQueryConn(pThreadInfo, iface)){
40✔
NEW
503
            break;
×
504
        }
505
        int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo);
40✔
506
        if (code != 0) {
40✔
NEW
507
            errorPrint("failed stbQueryThread create. error code =%d \n", code);
×
NEW
508
            break;
×
509
        }
510
        threadCnt ++;
40✔
511
    }
512

513
    bool needExit = false;
11✔
514
    // if failed, set termainte flag true like ctrl+c exit
515
    if (threadCnt != nConcurrent  ) {
11✔
NEW
516
        needExit = true;
×
NEW
517
        g_arguments->terminate = true;
×
NEW
518
        goto OVER;
×
519
    }
520

521
    // reset total
522
    g_queryInfo.superQueryInfo.totalQueried = 0;
11✔
523
    g_queryInfo.superQueryInfo.totalFail    = 0;
11✔
524

525
    // real thread count
526
    g_queryInfo.superQueryInfo.threadCnt = threadCnt;
11✔
527
    int64_t start = toolsGetTimestampUs();
11✔
528

529
    for (int i = 0; i < threadCnt; i++) {
51✔
530
        pthread_join(pidsOfSub[i], NULL);
40✔
531
        qThreadInfo *pThreadInfo = threadInfos + i;
40✔
532
        // add succ
533
        g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc;
40✔
534
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
40✔
535
            // "yes" need add fail cnt
536
            g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail;
6✔
537
            g_queryInfo.superQueryInfo.totalFail    += pThreadInfo->nFail;
6✔
538
        }
539

540
        // close conn
541
        closeQueryConn(pThreadInfo, iface);
40✔
542
    }
543
    int64_t end = toolsGetTimestampUs();
11✔
544

545
    if (needExit) {
11✔
NEW
546
        goto OVER;
×
547
    }
548

549
    // total show
550
    totalChildQuery(threadInfos, threadCnt, end - start);
11✔
551

552
    ret = 0;
11✔
553

554
OVER:
11✔
555
    tmfree((char *)pidsOfSub);
11✔
556
    tmfree((char *)threadInfos);
11✔
557

558
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
91✔
559
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
80✔
560
    }
561
    tmfree(g_queryInfo.superQueryInfo.childTblName);
11✔
562
    return ret;
11✔
563
}
564

565
//
566
// specQuery
567
//
568
static int specQuery(uint16_t iface, char* dbName) {
14✔
569
    int ret = -1;
14✔
570
    pthread_t    *pids = NULL;
14✔
571
    qThreadInfo *infos = NULL;
14✔
572
    int    nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
14✔
573
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
14✔
574
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
14✔
575
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
14✔
576

577
    // check invaid
578
    if(nSqlCount == 0 || nConcurrent == 0 ) {
14✔
UNCOV
579
        if(nSqlCount == 0)
×
UNCOV
580
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
×
UNCOV
581
        if(nConcurrent == 0)
×
NEW
582
           warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent);
×
UNCOV
583
        return 0;
×
584
    }
585

586
    // malloc threads memory
587
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
14✔
588
    infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false);
14✔
589

590
    for (uint64_t i = 0; i < nSqlCount; i++) {
58✔
591
        if( g_arguments->terminate ) {
46✔
NEW
592
            break;
×
593
        }
594

595
        // reset
596
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
46✔
597
        memset(infos, 0, nConcurrent * sizeof(qThreadInfo));
46✔
598

599
        // get execute sql
600
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
46✔
601

602
        // create threads
603
        int threadCnt = 0;
46✔
604
        for (int j = 0; j < nConcurrent; j++) {
172✔
605
           qThreadInfo *pThreadInfo = infos + j;
126✔
606
           pThreadInfo->threadID = i * nConcurrent + j;
126✔
607
           pThreadInfo->querySeq = i;
126✔
608

609
           // create conn
610
           if (initQueryConn(pThreadInfo, iface)) {
126✔
NEW
611
               break;
×
612
           }
613

614
           int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo);
126✔
615
           if (code != 0) {
126✔
NEW
616
               errorPrint("failed specQueryThread create. error code =%d \n", code);
×
NEW
617
               break;
×
618
           }
619
           threadCnt++;
126✔
620
        }
621

622
        bool needExit = false;
46✔
623
        // if failed, set termainte flag true like ctrl+c exit
624
        if (threadCnt != nConcurrent  ) {
46✔
NEW
625
            needExit = true;
×
UNCOV
626
            g_arguments->terminate = true;
×
627
        }
628

629
        int64_t start = toolsGetTimestampUs();
46✔
630
        // wait threads execute finished one by one
631
        for (int j = 0; j < threadCnt ; j++) {
172✔
632
           pthread_join(pids[j], NULL);
126✔
633
           qThreadInfo *pThreadInfo = infos + j;
126✔
634
           closeQueryConn(pThreadInfo, iface);
126✔
635

636
           // need exit in loop
637
           if (needExit) {
126✔
638
                // free BArray
NEW
639
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
UNCOV
640
                pThreadInfo->query_delay_list = NULL;
×
641
           }
642
        }
643
        int64_t spend = toolsGetTimestampUs() - start;
46✔
644
        if(spend == 0) {
46✔
645
            // avoid xx/spend expr throw error
NEW
646
            spend = 1;
×
647
        }
648

649
        // create 
650
        if (needExit) {
46✔
NEW
651
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
652
            goto OVER;
×
653
        }
654

655
        //
656
        // show QPS and P90 ...
657
        //
658
        uint64_t n = 0;
46✔
659
        double  total_delays = 0.0;
46✔
660
        uint64_t totalQueried = 0;
46✔
661
        uint64_t totalFail    = 0;
46✔
662
        for (int j = 0; j < threadCnt; j++) {
172✔
663
           qThreadInfo *pThreadInfo = infos + j;
126✔
664
           if(pThreadInfo->query_delay_list == NULL) {
126✔
NEW
665
                continue;;
×
666
           }
667
           
668
           // total one sql
669
           for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) {
1,845✔
670
                int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k);
1,719✔
671
                sql->delay_list[n++] = *delay;
1,719✔
672
                total_delays += *delay;
1,719✔
673
           }
674

675
           // total queries
676
           totalQueried += pThreadInfo->nSucc;
126✔
677
           if (g_arguments->continueIfFail == YES_IF_FAILED) {
126✔
678
                totalQueried += pThreadInfo->nFail;
7✔
679
                totalFail    += pThreadInfo->nFail;
7✔
680
           }
681

682
           // free BArray query_delay_list
683
           benchArrayDestroy(pThreadInfo->query_delay_list);
126✔
684
           pThreadInfo->query_delay_list = NULL;
126✔
685
        }
686

687
        // appand current sql
688
        g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried;
46✔
689
        g_queryInfo.specifiedQueryInfo.totalFail    += totalFail;
46✔
690

691
        // succ is zero
692
        if(totalQueried == 0 || n == 0) {
46✔
693
            errorPrint("%s", "succ queries count is zero.\n");
2✔
694
            goto OVER;
2✔
695
        }
696

697
        qsort(sql->delay_list, n, sizeof(uint64_t), compare);
44✔
698
        int32_t bufLen = strlen(sql->command) + 512;
44✔
699
        char * buf = benchCalloc(bufLen, sizeof(char), false);
44✔
700
        snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " "
44✔
701
                             "sql %"PRIu64" spend %.6fs QPS: %.3f "
702
                             "query delay "
703
                             "avg: %.6fs "
704
                             "min: %.6fs "
705
                             "max: %.6fs "
706
                             "p90: %.6fs "
707
                             "p95: %.6fs "
708
                             "p99: %.6fs "
709
                             "SQL command: %s \n",
710
                             threadCnt, totalQueried,
711
                             i + 1, spend/1E6, totalQueried / (spend/1E6),
44✔
712
                             total_delays/n/1E6,           /* avg */
44✔
713
                             sql->delay_list[0] / 1E6,     /* min */
44✔
714
                             sql->delay_list[n - 1] / 1E6, /* max */
44✔
715
                             /*  p90 */
716
                             sql->delay_list[(uint64_t)(n * 0.90)] / 1E6,
44✔
717
                             /*  p95 */
718
                             sql->delay_list[(uint64_t)(n * 0.95)] / 1E6,
44✔
719
                             /*  p99 */
720
                             sql->delay_list[(uint64_t)(n * 0.99)] / 1E6, 
44✔
721
                             sql->command);
722

723
        infoPrintNoTimestamp("%s", buf);
44✔
724
        infoPrintNoTimestampToFile("%s", buf);
44✔
725
        tmfree(buf);
44✔
726
    }
727
    ret = 0;
12✔
728

729
OVER:
14✔
730
    tmfree((char *)pids);
14✔
731
    tmfree((char *)infos);
14✔
732

733
    // free specialQueryInfo
734
    freeSpecialQueryInfo();
14✔
735
    return ret;
14✔
736
}
737

738
//
739
// specQueryMix
740
//
741
static int specQueryMix(uint16_t iface, char* dbName) {
4✔
742
    // init
743
    int ret            = -1;
4✔
744
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
4✔
745
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
4✔
746
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
4✔
747

748
    // concurent calc
749
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
4✔
750
    int start_sql     = 0;
4✔
751
    int a             = total_sql_num / nConcurrent;
4✔
752
    if (a < 1) {
4✔
753
        warnPrint("sqls num:%d < concurent:%d, so set concurrent to %d\n", total_sql_num, nConcurrent, nConcurrent);
2✔
754
        nConcurrent = total_sql_num;
2✔
755
        a = 1;
2✔
756
    }
757
    int b = 0;
4✔
758
    if (nConcurrent != 0) {
4✔
759
        b = total_sql_num % nConcurrent;
4✔
760
    }
761

762
    //
763
    // running
764
    //
765
    int threadCnt = 0;
4✔
766
    for (int i = 0; i < nConcurrent; ++i) {
13✔
767
        qThreadInfo *pThreadInfo = infos + i;
9✔
768
        pThreadInfo->threadID    = i;
9✔
769
        pThreadInfo->start_sql   = start_sql;
9✔
770
        pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
9✔
771
        start_sql = pThreadInfo->end_sql + 1;
9✔
772
        pThreadInfo->total_delay = 0;
9✔
773

774
        // create conn
775
        if (initQueryConn(pThreadInfo, iface)){
9✔
NEW
776
            break;
×
777
        }
778
        // main run
779
        int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
9✔
780
        if (code != 0) {
9✔
NEW
781
            errorPrint("failed specQueryMixThread create. error code =%d \n", code);
×
NEW
782
            break;
×
783
        }
784
        
785
        threadCnt ++;
9✔
786
    }
787

788
    bool needExit = false;
4✔
789
    // if failed, set termainte flag true like ctrl+c exit
790
    if (threadCnt != nConcurrent) {
4✔
NEW
791
        needExit = true;
×
NEW
792
        g_arguments->terminate = true;
×
793
    }
794

795
    // reset total
796
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
4✔
797
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
4✔
798

799
    int64_t start = toolsGetTimestampUs();
4✔
800
    for (int i = 0; i < threadCnt; ++i) {
13✔
801
        pthread_join(pids[i], NULL);
9✔
802
        qThreadInfo *pThreadInfo = infos + i;
9✔
803
        closeQueryConn(pThreadInfo, iface);
9✔
804

805
        // total queries
806
        g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
9✔
807
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
9✔
808
            // yes need add failed count
809
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
2✔
810
            g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
2✔
811
        }
812

813
        // destory
814
        if (needExit) {
9✔
NEW
815
            benchArrayDestroy(pThreadInfo->query_delay_list);
×
NEW
816
            pThreadInfo->query_delay_list = NULL;
×
817
        }
818
    }
819
    int64_t end = toolsGetTimestampUs();
4✔
820

821
    // create 
822
    if (needExit) {
4✔
NEW
823
        errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
824
        goto OVER;
×
825
    }
826

827
    // statistic
828
    totalChildQuery(infos, threadCnt, end - start);
4✔
829
    ret = 0;
4✔
830

831
OVER:
4✔
832
    tmfree(pids);
4✔
833
    tmfree(infos);
4✔
834

835
    // free sqls
836
    freeSpecialQueryInfo();
4✔
837

838
    return ret;
4✔
839
}
840

841
// total query for end 
842
void totalQuery(int64_t spends) {
27✔
843
    // total QPS
844
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
27✔
845
        + g_queryInfo.superQueryInfo.totalQueried;
27✔
846

847
    // error rate
848
    char errRate[128] = "";
27✔
849
    if(g_arguments->continueIfFail == YES_IF_FAILED) {
27✔
850
        uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail;
5✔
851
        if (totalQueried > 0) {
5✔
852
            snprintf(errRate, sizeof(errRate), " ,error %" PRIu64 " (rate:%.3f%%)", totalFail, ((float)totalFail * 100)/totalQueried);
5✔
853
        }
854
    }
855

856
    // show
857
    double  tInS = (double)spends / 1000;
27✔
858
    char buf[512] = "";
27✔
859
    snprintf(buf, sizeof(buf),
27✔
860
                "Spend %.4f second completed total queries: %" PRIu64
861
                ", the QPS of all threads: %10.3f%s\n\n",
862
                tInS, totalQueried, (double)totalQueried / tInS, errRate);
27✔
863
    infoPrint("%s", buf);
27✔
864
    infoPrintToFile("%s", buf);
27✔
865
}
27✔
866

867
int queryTestProcess() {
29✔
868
    prompt(0);
29✔
869

870
    if (REST_IFACE == g_queryInfo.iface) {
29✔
871
        encodeAuthBase64();
9✔
872
    }
873

874
    // kill sql for executing seconds over "kill_slow_query_threshold"
875
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
29✔
NEW
876
        int32_t ret = killSlowQuery();
×
NEW
877
        if (ret != 0) {
×
NEW
878
            return ret;
×
879
        }
880
    }
881

882
    // covert addr
883
    if (g_queryInfo.iface == REST_IFACE) {
29✔
884
        if (convertHostToServAddr(g_arguments->host,
9✔
885
                    g_arguments->port + TSDB_PORT_HTTP,
9✔
886
                    &(g_arguments->serv_addr)) != 0) {
9✔
887
            errorPrint("%s", "convert host to server address\n");
×
888
            return -1;
×
889
        }
890
    }
891

892
    // fetch child name if super table
893
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
29✔
894
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
11✔
895
        int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
11✔
896
        if (ret != 0) {
11✔
NEW
897
            errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
×
UNCOV
898
            return -1;
×
899
        }
900
    }
901

902
    // 
903
    // start running
904
    //
905

906
    
907
    uint64_t startTs = toolsGetTimestampMs();
29✔
908
    if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) {
29✔
909
        // specified table
910
        if (g_queryInfo.specifiedQueryInfo.mixed_query) {
18✔
911
            // mixed
912
            if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) {
4✔
NEW
913
                return -1;
×
914
            }
915
        } else {
916
            // no mixied
917
            if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
14✔
918
                return -1;
2✔
919
            }
920
        }
921
    } else if(g_queryInfo.superQueryInfo.sqlCount > 0) {
11✔
922
        // super table
923
        if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
11✔
UNCOV
924
            return -1;
×
925
        }
926
    } else {
927
        // nothing
NEW
928
        errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty.");
×
UNCOV
929
        return -1;
×
930
    }
931

932
    // total 
933
    totalQuery(toolsGetTimestampMs() - startTs); 
27✔
934
    return 0;
27✔
935
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc