• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / taos-tools / 12929336438

23 Jan 2025 12:38PM UTC coverage: 74.977% (-0.3%) from 75.249%
12929336438

Pull #839

github

web-flow
Merge 3ac3ee47d into 84c50c54f
Pull Request #839: FIX mixed query mode no need calc thread

396 of 526 new or added lines in 4 files covered. (75.29%)

296 existing lines in 7 files now uncovered.

12438 of 16589 relevant lines covered (74.98%)

329008.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.37
/src/benchQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include <bench.h>
14
#include "benchLog.h"
15

16
// query and get result  record is true to total request
17
int selectAndGetResult(qThreadInfo *pThreadInfo, char *command, bool record) {
11,317✔
18
    int ret = 0;
11,317✔
19

20
    // user cancel
21
    if (g_arguments->terminate) {
11,317✔
22
        return -1;
×
23
    }
24

25
    // execute sql
26
    uint32_t threadID = pThreadInfo->threadID;
11,317✔
27
    char dbName[TSDB_DB_NAME_LEN] = {0};
11,317✔
28
    tstrncpy(dbName, g_queryInfo.dbName, TSDB_DB_NAME_LEN);
11,317✔
29

30
    if (g_queryInfo.iface == REST_IFACE) {
11,317✔
31
        int retCode = postProceSql(command, g_queryInfo.dbName, 0, REST_IFACE,
2,236✔
32
                                   0, g_arguments->port, false,
2,236✔
33
                                   pThreadInfo->sockfd, pThreadInfo->filePath);
2,236✔
34
        if (0 != retCode) {
2,236✔
NEW
35
            errorPrint("====restful return fail, threadID[%u]\n", threadID);
×
UNCOV
36
            ret = -1;
×
37
        }
38
    } else {
39
        // query
40
        TAOS *taos = pThreadInfo->conn->taos;
9,081✔
41
        int64_t rows  = 0;
9,081✔
42
        TAOS_RES *res = taos_query(taos, command);
9,081✔
43
        int code = taos_errno(res);
9,084✔
44
        if (res == NULL || code) {
9,082✔
45
            // failed query
46
            errorPrint("failed to execute sql:%s, "
9✔
47
                        "code: 0x%08x, reason:%s\n",
48
                        command, code, taos_errstr(res));
49
            ret = -1;
9✔
50
        } else {
51
            // succ query
52
            if (record)
9,073✔
53
                rows = fetchResult(res, pThreadInfo->filePath);
9,067✔
54
        }
55

56
        // free result
57
        if (res) {
9,083✔
58
            taos_free_result(res);
9,084✔
59
        }
60
        debugPrint("query sql:%s rows:%"PRId64"\n", command, rows);
9,084✔
61
    }
62

63
    // record count
64
    if (ret ==0) {
11,320✔
65
        // succ
66
        if (record) 
11,311✔
67
            pThreadInfo->nSucc ++;
11,303✔
68
    } else {
69
        // fail
70
        if (record)
9✔
71
            pThreadInfo->nFail ++;
9✔
72

73
        // continue option
74
        if (YES_IF_FAILED == g_arguments->continueIfFail) {
9✔
75
            ret = 0; // force continue
3✔
76
        }
77
    }
78

79
    return ret;
11,320✔
80
}
81

82
// interlligent sleep
83
void autoSleep(uint64_t delay) {
2,920✔
84
    if (g_queryInfo.specifiedQueryInfo.queryInterval &&
2,920✔
85
        delay < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval * 1000) {
181✔
86
        toolsMsleep((int32_t)(
19✔
87
                g_queryInfo.specifiedQueryInfo.queryInterval*1000 - delay));  // ms
19✔
88
    }
89
}
2,920✔
90

91
// reset 
92
int32_t resetQueryCache(qThreadInfo* pThreadInfo) {    
7✔
93
    // execute sql 
94
    if (selectAndGetResult(pThreadInfo, "RESET QUERY CACHE", false)) {
7✔
NEW
95
        errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
96
        return -1;
×
97
    }
98
    // succ
99
    return 0;
7✔
100
}
101

102

103

104
//
105
//  ---------------------------------  second level functions for threads -----------------------------------
106
//
107

108
// show rela qps
109
int64_t showRealQPS(qThreadInfo* pThreadInfo, int64_t lastPrintTime, int64_t startTs) {
11,303✔
110
    int64_t now = toolsGetTimestampMs();
11,303✔
111
    if (now - lastPrintTime > 10 * 1000) {
11,304✔
112
        // real total
NEW
113
        uint64_t totalQueried = pThreadInfo->nSucc;
×
NEW
114
        if(g_arguments->continueIfFail == YES_IF_FAILED) {
×
NEW
115
            totalQueried += pThreadInfo->nFail;
×
116
        }
NEW
117
        infoPrint(
×
118
            "thread[%d] has currently completed queries: %" PRIu64 ", QPS: %10.3f\n",
119
            pThreadInfo->threadID, totalQueried,
120
            (double)(totalQueried / ((now - startTs) / 1000.0)));
NEW
121
        return now;
×
122
    } else {
123
        return lastPrintTime;
11,305✔
124
    }    
125
}
126

127
// spec query mixed thread
128
static void *specQueryMixThread(void *sarg) {
9✔
129
    qThreadInfo *pThreadInfo = (qThreadInfo*)sarg;
9✔
130
#ifdef LINUX
131
    prctl(PR_SET_NAME, "specQueryMixThread");
9✔
132
#endif
133
    // use db
134
    if (g_queryInfo.dbName) {
9✔
135
        if (pThreadInfo->conn &&
9✔
136
            pThreadInfo->conn->taos &&
12✔
137
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
6✔
UNCOV
138
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
UNCOV
139
                return NULL;
×
140
        }
141
    }
142

143
    int64_t st = 0;
9✔
144
    int64_t et = 0;
9✔
145
    int64_t startTs = toolsGetTimestampMs();
9✔
146
    int64_t lastPrintTime = startTs;
9✔
147
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
9✔
148
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
9✔
149
    for (int i = pThreadInfo->start_sql; i <= pThreadInfo->end_sql; ++i) {
18✔
150
        SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
9✔
151
        for (int j = 0; j < queryTimes; ++j) {
533✔
152
            // use cancel
153
            if(g_arguments->terminate) {
524✔
NEW
154
                infoPrint("%s\n", "user cancel , so exit testing.");
×
UNCOV
155
                break;
×
156
            }
157

158
            // reset cache
159
            if (g_queryInfo.reset_query_cache) {
524✔
NEW
160
                if (resetQueryCache(pThreadInfo)) {
×
UNCOV
161
                    errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
UNCOV
162
                    return NULL;
×
163
                }
164
            }
165

166
            // execute sql
167
            st = toolsGetTimestampUs();
524✔
168
            int ret = selectAndGetResult(pThreadInfo, sql->command, true);
524✔
169
            if (ret) {
524✔
NEW
170
                g_fail = true;
×
NEW
171
                errorPrint("failed call mix selectAndGetResult, i=%d j=%d", i, j);
×
UNCOV
172
                return NULL;
×
173
            }
174
            et = toolsGetTimestampUs();
524✔
175

176
            // sleep
177
            autoSleep(et - st);
524✔
178

179
            // delay
180
            if (ret == 0) {
524✔
181
                int64_t* delay = benchCalloc(1, sizeof(int64_t), false);
524✔
182
                *delay = et - st;
524✔
183
                debugPrint("%s() LN%d, delay: %"PRId64"\n", __func__, __LINE__, *delay);
524✔
184

185
                pThreadInfo->total_delay += *delay;
524✔
186
                if(benchArrayPush(pThreadInfo->query_delay_list, delay) == NULL){
524✔
NEW
187
                    tmfree(delay);
×
188
                }
189
            }
190

191
            // real show
192
            lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
524✔
193
        }
194
    }
195

196
    return NULL;
9✔
197
}
198

199
// spec query thread
200
static void *specQueryThread(void *sarg) {
126✔
201
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
126✔
202
#ifdef LINUX
203
    prctl(PR_SET_NAME, "specQueryThread");
126✔
204
#endif
205
    uint64_t st = 0;
126✔
206
    uint64_t et = 0;
126✔
207
    int32_t  index = 0;
126✔
208

209
    // use db
210
    if (g_queryInfo.dbName) {
126✔
211
        if (pThreadInfo->conn &&
126✔
212
            pThreadInfo->conn->taos &&
208✔
213
            taos_select_db(pThreadInfo->conn->taos, g_queryInfo.dbName)) {
104✔
UNCOV
214
                errorPrint("thread[%d]: failed to select database(%s)\n", pThreadInfo->threadID, g_queryInfo.dbName);
×
215
                return NULL;
×
216
        }
217
    }
218

219
    uint64_t  queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
126✔
220
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(int64_t));
126✔
221

222
    uint64_t  startTs       = toolsGetTimestampMs();
126✔
223
    uint64_t  lastPrintTime = startTs;
126✔
224

225
    SSQL * sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, pThreadInfo->querySeq);
126✔
226

227
    if (sql->result[0] != '\0') {
126✔
228
        snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
108✔
229
                sql->result, pThreadInfo->threadID);
108✔
230
    }
231

232
    while (index < queryTimes) {
1,845✔
233
        // use cancel
234
        if(g_arguments->terminate) {
1,723✔
NEW
235
            infoPrint("thread[%d] user cancel , so exit testing.\n", pThreadInfo->threadID);
×
236
            break;
6✔
237
        }
238

239
        // reset cache
240
        if (g_queryInfo.reset_query_cache) {
1,723✔
241
            if (resetQueryCache(pThreadInfo)) {
6✔
UNCOV
242
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
243
                return NULL;
×
244
            }
245
        }
246

247
        // execute sql
248
        st = toolsGetTimestampUs();
1,723✔
249
        int ret = selectAndGetResult(pThreadInfo, sql->command, true);
1,723✔
250
        if (ret) {
1,724✔
251
            g_fail = true;
6✔
252
            errorPrint("failed call spec selectAndGetResult, index=%d\n", index);
6✔
253
            break;
6✔
254
        }
255
        et = toolsGetTimestampUs();
1,718✔
256

257
        // sleep
258
        autoSleep(et - st);
1,719✔
259

260
        uint64_t delay = et - st;
1,717✔
261
        debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
1,717✔
262

263
        if (ret == 0) {
1,717✔
264
            // only succ add delay list
265
            benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
1,719✔
266
            pThreadInfo->total_delay += delay;
1,719✔
267
        }
268
        index++;
1,717✔
269

270
        // real show
271
        lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
1,717✔
272
    }
273

274
    return NULL;
128✔
275
}
276

277
// super table query thread
278
static void *stbQueryThread(void *sarg) {
40✔
279
    char *sqlstr = benchCalloc(1, TSDB_MAX_ALLOWED_SQL_LEN, false);
40✔
280
    qThreadInfo *pThreadInfo = (qThreadInfo *)sarg;
40✔
281
#ifdef LINUX
282
    prctl(PR_SET_NAME, "stbQueryThread");
40✔
283
#endif
284

285
    uint64_t st = 0;
40✔
286
    uint64_t et = 0;
40✔
287

288
    uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
40✔
289
    pThreadInfo->query_delay_list = benchArrayInit(queryTimes, sizeof(uint64_t));
40✔
290
    
291
    uint64_t startTs = toolsGetTimestampMs();
40✔
292
    uint64_t lastPrintTime = startTs;
40✔
293
    while (queryTimes--) {
719✔
294
        // use cancel
295
        if(g_arguments->terminate) {
679✔
NEW
296
            infoPrint("%s\n", "user cancel , so exit testing.");
×
UNCOV
297
            break;
×
298
        }
299

300
        // reset cache
301
        if (g_queryInfo.reset_query_cache) {
679✔
302
            if (resetQueryCache(pThreadInfo)) {
1✔
NEW
303
                errorPrint("%s() LN%d, reset query cache failed\n", __func__, __LINE__);
×
NEW
304
                return NULL;
×
305
            }
306
        }
307

308
        // execute
309
        st = toolsGetTimestampMs();
679✔
310
        // for each table
311
        for (int i = (int)pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) {
2,803✔
312
            // use cancel
313
            if(g_arguments->terminate) {
2,124✔
NEW
314
                infoPrint("%s\n", "user cancel , so exit testing.");
×
UNCOV
315
                break;
×
316
            }
317

318
            // for each sql
319
            for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) {
11,186✔
320
                memset(sqlstr, 0, TSDB_MAX_ALLOWED_SQL_LEN);
9,064✔
321
                // use cancel
322
                if(g_arguments->terminate) {
9,064✔
NEW
323
                    infoPrint("%s\n", "user cancel , so exit testing.");
×
324
                    break;
1✔
325
                }
326
                
327
                // get real child name sql
328
                if (replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i)) {
9,064✔
329
                    // fault
NEW
330
                    tmfree(sqlstr);
×
NEW
331
                    return NULL;
×
332
                }
333

334
                if (g_queryInfo.superQueryInfo.result[j][0] != '\0') {
9,063✔
335
                    snprintf(pThreadInfo->filePath, MAX_PATH_LEN, "%s-%d",
8,062✔
336
                            g_queryInfo.superQueryInfo.result[j],
8,062✔
337
                            pThreadInfo->threadID);
338
                }
339

340
                // execute sql
341
                uint64_t s = toolsGetTimestampUs();
9,063✔
342
                int ret = selectAndGetResult(pThreadInfo, sqlstr, true);
9,061✔
343
                if (ret) {
9,062✔
344
                    // found error
NEW
345
                    errorPrint("failed call stb selectAndGetResult, i=%d j=%d\n", i, j);
×
NEW
346
                    g_fail = true;
×
NEW
347
                    tmfree(sqlstr);
×
NEW
348
                    return NULL;
×
349
                }
350
                uint64_t delay = toolsGetTimestampUs() - s;
9,062✔
351
                debugPrint("%s() LN%d, delay: %"PRIu64"\n", __func__, __LINE__, delay);
9,064✔
352
                if (ret == 0) {
9,064✔
353
                    // only succ add delay list
354
                    benchArrayPushNoFree(pThreadInfo->query_delay_list, &delay);
9,063✔
355
                    pThreadInfo->total_delay += delay;
9,062✔
356
                }
357

358
                // show real QPS
359
                lastPrintTime = showRealQPS(pThreadInfo, lastPrintTime, startTs);
9,063✔
360
            }
361
        }
362
        et = toolsGetTimestampMs();
679✔
363

364
        // sleep
365
        autoSleep(et - st);
679✔
366
    }
367
    tmfree(sqlstr);
40✔
368

369
    return NULL;
40✔
370
}
371

372
//
373
// ---------------------------------  first level functions ------------------------------
374
//
375

376
/*
 * Merge the delay lists of all threads and print the overall statistics
 * (min / avg / p90 / p95 / p99 / max delay).
 *
 * infos      array of threadCnt thread contexts; each non-NULL
 *            query_delay_list is appended to the merged list, destroyed and
 *            reset to NULL
 * threadCnt  number of entries in infos
 * spend      total elapsed time; printed as spend/1E6 seconds
 *
 * Nothing is printed except an error line when no delay was recorded.
 */
void totalChildQuery(qThreadInfo* infos, int threadCnt, int64_t spend) {
    // valid check
    if (infos == NULL || threadCnt == 0) {
        return ;
    }

    // statistic
    BArray * delay_list = benchArrayInit(1, sizeof(int64_t));
    double total_delays = 0;

    // collect and release the per-thread lists
    for (int i = 0; i < threadCnt; ++i) {
        qThreadInfo * pThreadInfo = infos + i;
        if(pThreadInfo->query_delay_list == NULL) {
            continue;
        }

        // append delay
        benchArrayAddBatch(delay_list, pThreadInfo->query_delay_list->pData,
                pThreadInfo->query_delay_list->size, false);
        total_delays += pThreadInfo->total_delay;

        // free delay
        benchArrayDestroy(pThreadInfo->query_delay_list);
        pThreadInfo->query_delay_list = NULL;
    }

    // succ is zero
    if (delay_list->size == 0) {
        errorPrint("%s", "succ queries count is zero.\n");
        benchArrayDestroy(delay_list);
        return ;
    }

    // sort ascending so the percentiles can be read by index
    qsort(delay_list->pData, delay_list->size, delay_list->elemSize, compare);

    // show delay min max; size is non-zero here, so every index is valid
    infoPrint(
            "spend %.6fs using "
            "%d threads complete query %d times,  "
            "min delay: %.6fs, "
            "avg delay: %.6fs, "
            "p90: %.6fs, "
            "p95: %.6fs, "
            "p99: %.6fs, "
            "max: %.6fs\n",
            spend/1E6,
            threadCnt, (int)delay_list->size,
            *(int64_t *)(benchArrayGet(delay_list, 0))/1E6,
            (double)total_delays/delay_list->size/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.9)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.95)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size * 0.99)))/1E6,
            *(int64_t *)(benchArrayGet(delay_list,
                                (int32_t)(delay_list->size - 1)))/1E6);

    benchArrayDestroy(delay_list);
}
444

445
//
446
// super table query
447
//
448
static int stbQuery(uint16_t iface, char* dbName) {
11✔
449
    int ret = -1;
11✔
450
    pthread_t * pidsOfSub   = NULL;
11✔
451
    qThreadInfo *threadInfos = NULL;
11✔
452
    g_queryInfo.superQueryInfo.totalQueried = 0;
11✔
453
    g_queryInfo.superQueryInfo.totalFail    = 0;
11✔
454

455
    // check
456
    if ((g_queryInfo.superQueryInfo.sqlCount == 0)
11✔
457
        || (g_queryInfo.superQueryInfo.threadCnt == 0)) {
11✔
NEW
458
        return 0;
×
459
    }
460

461
    // malloc 
462
    pidsOfSub = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
11✔
463
                            *sizeof(pthread_t),
464
                            false);
465
    threadInfos = benchCalloc(1, g_queryInfo.superQueryInfo.threadCnt
11✔
466
                                *sizeof(qThreadInfo), false);
467

468
    int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
11✔
469
    int nConcurrent = g_queryInfo.superQueryInfo.threadCnt;
11✔
470

471
    int64_t a = ntables / nConcurrent;
11✔
472
    if (a < 1) {
11✔
473
        nConcurrent = (int)ntables;
2✔
474
        a = 1;
2✔
475
    }
476

477
    int64_t b = 0;
11✔
478
    if (nConcurrent != 0) {
11✔
479
        b = ntables % nConcurrent;
11✔
480
    }
481

482
    uint64_t tableFrom = 0;
11✔
483
    int threadCnt = 0;
11✔
484
    for (int i = 0; i < nConcurrent; i++) {
51✔
485
        qThreadInfo *pThreadInfo = threadInfos + i;
40✔
486
        pThreadInfo->threadID = i;
40✔
487
        pThreadInfo->start_table_from = tableFrom;
40✔
488
        pThreadInfo->ntables = i < b ? a + 1 : a;
40✔
489
        pThreadInfo->end_table_to =
40✔
490
                i < b ? tableFrom + a : tableFrom + a - 1;
40✔
491
        tableFrom = pThreadInfo->end_table_to + 1;
40✔
492
        // create conn
493
        if (initQueryConn(pThreadInfo, iface)){
40✔
NEW
494
            break;
×
495
        }
496
        int code = pthread_create(pidsOfSub + i, NULL, stbQueryThread, pThreadInfo);
40✔
497
        if (code != 0) {
40✔
NEW
498
            errorPrint("failed stbQueryThread create. error code =%d \n", code);
×
NEW
499
            break;
×
500
        }
501
        threadCnt ++;
40✔
502
    }
503

504
    bool needExit = false;
11✔
505
    // if failed, set termainte flag true like ctrl+c exit
506
    if (threadCnt != nConcurrent  ) {
11✔
NEW
507
        needExit = true;
×
NEW
508
        g_arguments->terminate = true;
×
NEW
509
        goto OVER;
×
510
    }
511

512
    // reset total
513
    g_queryInfo.superQueryInfo.totalQueried = 0;
11✔
514
    g_queryInfo.superQueryInfo.totalFail    = 0;
11✔
515

516
    // real thread count
517
    g_queryInfo.superQueryInfo.threadCnt = threadCnt;
11✔
518
    int64_t start = toolsGetTimestampUs();
11✔
519

520
    for (int i = 0; i < threadCnt; i++) {
51✔
521
        pthread_join(pidsOfSub[i], NULL);
40✔
522
        qThreadInfo *pThreadInfo = threadInfos + i;
40✔
523
        // add succ
524
        g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nSucc;
40✔
525
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
40✔
526
            // "yes" need add fail cnt
527
            g_queryInfo.superQueryInfo.totalQueried += pThreadInfo->nFail;
6✔
528
            g_queryInfo.superQueryInfo.totalFail    += pThreadInfo->nFail;
6✔
529
        }
530

531
        // close conn
532
        closeQueryConn(pThreadInfo, iface);
40✔
533
    }
534
    int64_t end = toolsGetTimestampUs();
11✔
535

536
    if (needExit) {
11✔
NEW
537
        goto OVER;
×
538
    }
539

540
    // total show
541
    totalChildQuery(threadInfos, threadCnt, end - start);
11✔
542

543
    ret = 0;
11✔
544

545
OVER:
11✔
546
    tmfree((char *)pidsOfSub);
11✔
547
    tmfree((char *)threadInfos);
11✔
548

549
    for (int64_t i = 0; i < g_queryInfo.superQueryInfo.childTblCount; ++i) {
91✔
550
        tmfree(g_queryInfo.superQueryInfo.childTblName[i]);
80✔
551
    }
552
    tmfree(g_queryInfo.superQueryInfo.childTblName);
11✔
553
    return ret;
11✔
554
}
555

556
//
557
// specQuery
558
//
559
static int specQuery(uint16_t iface, char* dbName) {
14✔
560
    int ret = -1;
14✔
561
    pthread_t    *pids = NULL;
14✔
562
    qThreadInfo *infos = NULL;
14✔
563
    int    nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
14✔
564
    uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqls->size;
14✔
565
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
14✔
566
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
14✔
567

568
    // check invaid
569
    if(nSqlCount == 0 || nConcurrent == 0 ) {
14✔
NEW
570
        if(nSqlCount == 0)
×
NEW
571
           warnPrint("specified table query sql count is %" PRIu64 ".\n", nSqlCount);
×
NEW
572
        if(nConcurrent == 0)
×
UNCOV
573
           warnPrint("nConcurrent is %d , specified_table_query->nConcurrent is zero. \n", nConcurrent);
×
NEW
574
        return 0;
×
575
    }
576

577
    // malloc threads memory
578
    pids  = benchCalloc(1, nConcurrent * sizeof(pthread_t),  false);
14✔
579
    infos = benchCalloc(1, nConcurrent * sizeof(qThreadInfo), false);
14✔
580

581
    for (uint64_t i = 0; i < nSqlCount; i++) {
58✔
582
        if( g_arguments->terminate ) {
46✔
UNCOV
583
            break;
×
584
        }
585

586
        // reset
587
        memset(pids,  0, nConcurrent * sizeof(pthread_t));
46✔
588
        memset(infos, 0, nConcurrent * sizeof(qThreadInfo));
46✔
589

590
        // get execute sql
591
        SSQL *sql = benchArrayGet(g_queryInfo.specifiedQueryInfo.sqls, i);
46✔
592

593
        // create threads
594
        int threadCnt = 0;
46✔
595
        for (int j = 0; j < nConcurrent; j++) {
172✔
596
           qThreadInfo *pThreadInfo = infos + j;
126✔
597
           pThreadInfo->threadID = i * nConcurrent + j;
126✔
598
           pThreadInfo->querySeq = i;
126✔
599

600
           // create conn
601
           if (initQueryConn(pThreadInfo, iface)) {
126✔
UNCOV
602
               break;
×
603
           }
604

605
           int code = pthread_create(pids + j, NULL, specQueryThread, pThreadInfo);
126✔
606
           if (code != 0) {
126✔
UNCOV
607
               errorPrint("failed specQueryThread create. error code =%d \n", code);
×
UNCOV
608
               break;
×
609
           }
610
           threadCnt++;
126✔
611
        }
612

613
        bool needExit = false;
46✔
614
        // if failed, set termainte flag true like ctrl+c exit
615
        if (threadCnt != nConcurrent  ) {
46✔
NEW
616
            needExit = true;
×
NEW
617
            g_arguments->terminate = true;
×
618
        }
619

620
        int64_t start = toolsGetTimestampUs();
46✔
621
        // wait threads execute finished one by one
622
        for (int j = 0; j < threadCnt ; j++) {
172✔
623
           pthread_join(pids[j], NULL);
126✔
624
           qThreadInfo *pThreadInfo = infos + j;
126✔
625
           closeQueryConn(pThreadInfo, iface);
126✔
626

627
           // need exit in loop
628
           if (needExit) {
126✔
629
                // free BArray
UNCOV
630
                benchArrayDestroy(pThreadInfo->query_delay_list);
×
UNCOV
631
                pThreadInfo->query_delay_list = NULL;
×
632
           }
633
        }
634
        int64_t spend = toolsGetTimestampUs() - start;
46✔
635
        if(spend == 0) {
46✔
636
            // avoid xx/spend expr throw error
NEW
637
            spend = 1;
×
638
        }
639

640
        // create 
641
        if (needExit) {
46✔
UNCOV
642
            errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
643
            goto OVER;
×
644
        }
645

646
        //
647
        // show QPS and P90 ...
648
        //
649
        uint64_t n = 0;
46✔
650
        double  total_delays = 0.0;
46✔
651
        uint64_t totalQueried = 0;
46✔
652
        uint64_t totalFail    = 0;
46✔
653
        for (int j = 0; j < threadCnt; j++) {
172✔
654
           qThreadInfo *pThreadInfo = infos + j;
126✔
655
           if(pThreadInfo->query_delay_list == NULL) {
126✔
NEW
656
                continue;;
×
657
           }
658
           
659
           // total one sql
660
           for (uint64_t k = 0; k < pThreadInfo->query_delay_list->size; k++) {
1,845✔
661
                int64_t * delay = benchArrayGet(pThreadInfo->query_delay_list, k);
1,719✔
662
                sql->delay_list[n++] = *delay;
1,719✔
663
                total_delays += *delay;
1,719✔
664
           }
665

666
           // total queries
667
           totalQueried += pThreadInfo->nSucc;
126✔
668
           if (g_arguments->continueIfFail == YES_IF_FAILED) {
126✔
669
                totalQueried += pThreadInfo->nFail;
7✔
670
                totalFail    += pThreadInfo->nFail;
7✔
671
           }
672

673
           // free BArray query_delay_list
674
           benchArrayDestroy(pThreadInfo->query_delay_list);
126✔
675
           pThreadInfo->query_delay_list = NULL;
126✔
676
        }
677

678
        // appand current sql
679
        g_queryInfo.specifiedQueryInfo.totalQueried += totalQueried;
46✔
680
        g_queryInfo.specifiedQueryInfo.totalFail    += totalFail;
46✔
681

682
        // succ is zero
683
        if(totalQueried == 0 || n == 0) {
46✔
684
            errorPrint("%s", "succ queries count is zero.\n");
2✔
685
            goto OVER;
2✔
686
        }
687

688
        qsort(sql->delay_list, n, sizeof(uint64_t), compare);
44✔
689
        int32_t bufLen = strlen(sql->command) + 512;
44✔
690
        char * buf = benchCalloc(bufLen, sizeof(char), false);
44✔
691
        snprintf(buf , bufLen, "complete query with %d threads and %" PRIu64 " "
44✔
692
                             "sql %"PRIu64" spend %.6fs QPS: %.3f "
693
                             "query delay "
694
                             "avg: %.6fs "
695
                             "min: %.6fs "
696
                             "max: %.6fs "
697
                             "p90: %.6fs "
698
                             "p95: %.6fs "
699
                             "p99: %.6fs "
700
                             "SQL command: %s \n",
701
                             threadCnt, totalQueried,
702
                             i + 1, spend/1E6, totalQueried / (spend/1E6),
44✔
703
                             total_delays/n/1E6,           /* avg */
44✔
704
                             sql->delay_list[0] / 1E6,     /* min */
44✔
705
                             sql->delay_list[n - 1] / 1E6, /* max */
44✔
706
                             /*  p90 */
707
                             sql->delay_list[(uint64_t)(n * 0.90)] / 1E6,
44✔
708
                             /*  p95 */
709
                             sql->delay_list[(uint64_t)(n * 0.95)] / 1E6,
44✔
710
                             /*  p99 */
711
                             sql->delay_list[(uint64_t)(n * 0.99)] / 1E6, 
44✔
712
                             sql->command);
713

714
        infoPrintNoTimestamp("%s", buf);
44✔
715
        infoPrintNoTimestampToFile("%s", buf);
44✔
716
        tmfree(buf);
44✔
717
    }
718
    ret = 0;
12✔
719

720
OVER:
14✔
721
    tmfree((char *)pids);
14✔
722
    tmfree((char *)infos);
14✔
723

724
    // free specialQueryInfo
725
    freeSpecialQueryInfo();
14✔
726
    return ret;
14✔
727
}
728

729
//
730
// specQueryMix
731
//
732
static int specQueryMix(uint16_t iface, char* dbName) {
4✔
733
    // init
734
    int ret            = -1;
4✔
735
    int nConcurrent    = g_queryInfo.specifiedQueryInfo.concurrent;
4✔
736
    pthread_t * pids   = benchCalloc(nConcurrent, sizeof(pthread_t), true);
4✔
737
    qThreadInfo *infos = benchCalloc(nConcurrent, sizeof(qThreadInfo), true);
4✔
738

739
    // concurent calc
740
    int total_sql_num = g_queryInfo.specifiedQueryInfo.sqls->size;
4✔
741
    int start_sql     = 0;
4✔
742
    int a             = total_sql_num / nConcurrent;
4✔
743
    if (a < 1) {
4✔
744
        warnPrint("sqls num:%d < concurent:%d, so set concurrent to %d\n", total_sql_num, nConcurrent, nConcurrent);
2✔
745
        nConcurrent = total_sql_num;
2✔
746
        a = 1;
2✔
747
    }
748
    int b = 0;
4✔
749
    if (nConcurrent != 0) {
4✔
750
        b = total_sql_num % nConcurrent;
4✔
751
    }
752

753
    //
754
    // running
755
    //
756
    int threadCnt = 0;
4✔
757
    for (int i = 0; i < nConcurrent; ++i) {
13✔
758
        qThreadInfo *pThreadInfo = infos + i;
9✔
759
        pThreadInfo->threadID    = i;
9✔
760
        pThreadInfo->start_sql   = start_sql;
9✔
761
        pThreadInfo->end_sql     = i < b ? start_sql + a : start_sql + a - 1;
9✔
762
        start_sql = pThreadInfo->end_sql + 1;
9✔
763
        pThreadInfo->total_delay = 0;
9✔
764

765
        // create conn
766
        if (initQueryConn(pThreadInfo, iface)){
9✔
NEW
767
            break;
×
768
        }
769
        // main run
770
        int code = pthread_create(pids + i, NULL, specQueryMixThread, pThreadInfo);
9✔
771
        if (code != 0) {
9✔
NEW
772
            errorPrint("failed specQueryMixThread create. error code =%d \n", code);
×
NEW
773
            break;
×
774
        }
775
        
776
        threadCnt ++;
9✔
777
    }
778

779
    bool needExit = false;
4✔
780
    // if failed, set termainte flag true like ctrl+c exit
781
    if (threadCnt != nConcurrent) {
4✔
NEW
782
        needExit = true;
×
NEW
783
        g_arguments->terminate = true;
×
784
    }
785

786
    // reset total
787
    g_queryInfo.specifiedQueryInfo.totalQueried = 0;
4✔
788
    g_queryInfo.specifiedQueryInfo.totalFail    = 0;
4✔
789

790
    int64_t start = toolsGetTimestampUs();
4✔
791
    for (int i = 0; i < threadCnt; ++i) {
13✔
792
        pthread_join(pids[i], NULL);
9✔
793
        qThreadInfo *pThreadInfo = infos + i;
9✔
794
        closeQueryConn(pThreadInfo, iface);
9✔
795

796
        // total queries
797
        g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nSucc;
9✔
798
        if (g_arguments->continueIfFail == YES_IF_FAILED) {
9✔
799
            // yes need add failed count
800
            g_queryInfo.specifiedQueryInfo.totalQueried += pThreadInfo->nFail;
2✔
801
            g_queryInfo.specifiedQueryInfo.totalFail    += pThreadInfo->nFail;
2✔
802
        }
803

804
        // destory
805
        if (needExit) {
9✔
NEW
806
            benchArrayDestroy(pThreadInfo->query_delay_list);
×
NEW
807
            pThreadInfo->query_delay_list = NULL;
×
808
        }
809
    }
810
    int64_t end = toolsGetTimestampUs();
4✔
811

812
    // create 
813
    if (needExit) {
4✔
NEW
814
        errorPrint("failed to create thread. expect nConcurrent=%d real threadCnt=%d,  exit testing.\n", nConcurrent, threadCnt);
×
NEW
815
        goto OVER;
×
816
    }
817

818
    // statistic
819
    totalChildQuery(infos, threadCnt, end - start);
4✔
820
    ret = 0;
4✔
821

822
OVER:
4✔
823
    tmfree(pids);
4✔
824
    tmfree(infos);
4✔
825

826
    // free sqls
827
    freeSpecialQueryInfo();
4✔
828

829
    return ret;
4✔
830
}
831

832
// total query for end 
833
void totalQuery(int64_t spends) {
27✔
834
    // total QPS
835
    uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried
27✔
836
        + g_queryInfo.superQueryInfo.totalQueried;
27✔
837

838
    // error rate
839
    char errRate[128] = "";
27✔
840
    if(g_arguments->continueIfFail == YES_IF_FAILED) {
27✔
841
        uint64_t totalFail = g_queryInfo.specifiedQueryInfo.totalFail + g_queryInfo.superQueryInfo.totalFail;
5✔
842
        if (totalQueried > 0) {
5✔
843
            snprintf(errRate, sizeof(errRate), " ,error %" PRIu64 " (rate:%.3f%%)", totalFail, ((float)totalFail * 100)/totalQueried);
5✔
844
        }
845
    }
846

847
    // show
848
    double  tInS = (double)spends / 1000;
27✔
849
    char buf[512] = "";
27✔
850
    snprintf(buf, sizeof(buf),
27✔
851
                "Spend %.4f second completed total queries: %" PRIu64
852
                ", the QPS of all threads: %10.3f%s\n\n",
853
                tInS, totalQueried, (double)totalQueried / tInS, errRate);
27✔
854
    infoPrint("%s", buf);
27✔
855
    infoPrintToFile("%s", buf);
27✔
856
}
27✔
857

858
int queryTestProcess() {
29✔
859
    prompt(0);
29✔
860

861
    if (REST_IFACE == g_queryInfo.iface) {
29✔
862
        encodeAuthBase64();
9✔
863
    }
864

865
    // kill sql for executing seconds over "kill_slow_query_threshold"
866
    if (g_queryInfo.iface == TAOSC_IFACE && g_queryInfo.killQueryThreshold) {
29✔
UNCOV
867
        int32_t ret = killSlowQuery();
×
UNCOV
868
        if (ret != 0) {
×
UNCOV
869
            return ret;
×
870
        }
871
    }
872

873
    // covert addr
874
    if (g_queryInfo.iface == REST_IFACE) {
29✔
875
        if (convertHostToServAddr(g_arguments->host,
9✔
876
                    g_arguments->port + TSDB_PORT_HTTP,
9✔
877
                    &(g_arguments->serv_addr)) != 0) {
9✔
NEW
878
            errorPrint("%s", "convert host to server address\n");
×
NEW
879
            return -1;
×
880
        }
881
    }
882

883
    // fetch child name if super table
884
    if ((g_queryInfo.superQueryInfo.sqlCount > 0) &&
29✔
885
            (g_queryInfo.superQueryInfo.threadCnt > 0)) {
11✔
886
        int32_t ret = fetchChildTableName(g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
11✔
887
        if (ret != 0) {
11✔
888
            errorPrint("fetchChildTableName dbName=%s stb=%s failed.", g_queryInfo.dbName, g_queryInfo.superQueryInfo.stbName);
×
UNCOV
889
            return -1;
×
890
        }
891
    }
892

893
    // 
894
    // start running
895
    //
896

897
    
898
    uint64_t startTs = toolsGetTimestampMs();
29✔
899
    if(g_queryInfo.specifiedQueryInfo.sqls && g_queryInfo.specifiedQueryInfo.sqls->size > 0) {
29✔
900
        // specified table
901
        if (g_queryInfo.specifiedQueryInfo.mixed_query) {
18✔
902
            // mixed
903
            if (specQueryMix(g_queryInfo.iface, g_queryInfo.dbName)) {
4✔
NEW
904
                return -1;
×
905
            }
906
        } else {
907
            // no mixied
908
            if (specQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
14✔
909
                return -1;
2✔
910
            }
911
        }
912
    } else if(g_queryInfo.superQueryInfo.sqlCount > 0) {
11✔
913
        // super table
914
        if (stbQuery(g_queryInfo.iface, g_queryInfo.dbName)) {
11✔
915
            return -1;
×
916
        }
917
    } else {
918
        // nothing
UNCOV
919
        errorPrint("%s\n", "Both 'specified_table_query' and 'super_table_query' sqls is empty.");
×
UNCOV
920
        return -1;
×
921
    }
922

923
    // total 
924
    totalQuery(toolsGetTimestampMs() - startTs); 
27✔
925
    return 0;
27✔
926
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc