• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.82
/tools/taos-tools/src/benchInsertMix.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include "bench.h"
14
#include "benchLog.h"
15
#include "wrapDb.h"
16
#include "benchData.h"
17
#include "benchDataMix.h"
18
#include "benchInsertMix.h"
19

20

21
//
22
// ------------------ mix ratio area -----------------------
23
//
24

25
// Slot indexes into the per-kind arrays of SMixRatio (ratio, range, genCnt, ...):
// MDIS = disorder rows, MUPD = update rows, MDEL = delete rows.
#define MDIS 0
26
#define MUPD 1
27
#define MDEL 2
28

29
// number of kinds, i.e. the length of the per-kind arrays
#define MCNT 3
30

31
// True when rows of kind `type` are planned (genCnt > 0), the rows already done
// plus the rows currently buffered are still below the planned count, and
// taosRandom()%100 is <= 2 * ratio[type]. Expects a variable named `mix`
// (SMixRatio*) in the expanding scope.
#define  NEED_TAKEOUT_ROW_TOBUF(type)  (mix->genCnt[type] > 0 && mix->doneCnt[type] + mix->bufCnt[type] < mix->genCnt[type] && taosRandom()%100 <= mix->ratio[type]*2)
32
// True once more than 80% of mix->insertRows have been inserted. The `type`
// argument is not used. Expects `mix` in scope and divides by mix->insertRows,
// so that field must be non-zero when the macro is evaluated.
#define  FORCE_TAKEOUT(type) (mix->insertedRows * 100 / mix->insertRows > 80)
33

34
// Must be expanded inside a loop body: continues with the next iteration when
// g_arguments->continueIfFail == YES_IF_FAILED, otherwise sets g_fail and
// leaves the loop. It cannot be wrapped in do { } while (0) because the
// continue/break have to act on the caller's loop.
#define FAILED_BREAK()   \
35
    if (g_arguments->continueIfFail == YES_IF_FAILED) {  \
36
        continue;                        \
37
    } else {                             \
38
        g_fail = true;                   \
39
        break;                           \
40
    }                                    \
41

42
// Bookkeeping for mixed (disorder / update / delete) row generation.
// Every array is indexed by MDIS, MUPD and MDEL.
typedef struct {
43
  uint64_t insertRows;   // rows that need to be inserted
44
  uint64_t insertedRows; // rows already inserted
45

46
  int8_t ratio[MCNT];    // ratio per kind, copied from the super table; genCnt is insertRows * ratio / 100
47
  int64_t range[MCNT];   // range per kind, copied from the super table
48

49
  // number of rows to generate per kind, calculated as stb->insertRows * ratio / 100
50
  uint64_t genCnt[MCNT];
51

52
  // number of rows per kind that are already done
53
  uint64_t doneCnt[MCNT];
54

55
  // timestamps taken out of a batch and parked here until they are filled back in
56
  TSKEY* buf[MCNT];
57

58
  // buffer counters
59
  uint64_t capacity[MCNT]; // number of timestamps buf can hold
60
  uint64_t bufCnt[MCNT];   // number of valid timestamps currently in buf
61

62
  // batch counter; genBatchSql prints it as batchID in its debug output
63
  int32_t curBatchCnt;
64

65
} SMixRatio;
66

67
// Row counters split by how the rows were produced.
typedef struct {
68
  uint64_t ordRows; // newly added in-order rows
69
  uint64_t updRows; // rows filled back from the MUPD buffer (see genBatchSql)
70
  uint64_t disRows; // rows filled back from the MDIS buffer (see genBatchSql)
71
  uint64_t delRows; // presumably the deleted rows; not updated in the part of the file shown here - TODO confirm
72
} STotal;
73

74

75
//
// Reset `mix` and load the disorder / update / delete settings of `stb`.
//
// For every kind the ratio and range are copied and genCnt is derived as
// insertRows * ratio / 100; the disorder count is forced to 0 when
// FULL_DISORDER(stb) holds. A timestamp buffer is then allocated for every
// kind before MDEL whose genCnt is non-zero. Release with mixRatioExit().
//
void mixRatioInit(SMixRatio* mix, SSuperTable* stb) {
  memset(mix, 0, sizeof(SMixRatio));
  mix->insertRows = stb->insertRows;

  // never size the buffers from a zero batch size
  uint32_t rowsPerBatch = g_arguments->reqPerReq;
  if (rowsPerBatch == 0) {
    rowsPerBatch = 1;
  }

  // disorder settings
  mix->ratio[MDIS]  = stb->disRatio;
  mix->range[MDIS]  = stb->disRange;
  mix->genCnt[MDIS] = mix->insertRows * stb->disRatio / 100;

  // update settings
  mix->ratio[MUPD]  = stb->updRatio;
  mix->range[MUPD]  = stb->updRange;
  mix->genCnt[MUPD] = mix->insertRows * stb->updRatio / 100;

  // delete settings
  mix->ratio[MDEL]  = stb->delRatio;
  mix->range[MDEL]  = stb->delRange;
  mix->genCnt[MDEL] = mix->insertRows * stb->delRatio / 100;

  if (FULL_DISORDER(stb)) {
    mix->genCnt[MDIS] = 0;
  }

  // allocate the take-out buffers; the last kind (MDEL) keeps none
  int32_t kind = 0;
  while (kind < MCNT - 1) {
    bool wanted = mix->genCnt[kind] > 0;

    // capacity: ten batches plus 0.8% of the planned rows
    mix->capacity[kind] = wanted ? rowsPerBatch * 10 + mix->genCnt[kind] * 8 / 1000 : 0;
    mix->buf[kind]      = wanted ? calloc(mix->capacity[kind], sizeof(TSKEY)) : NULL;
    mix->bufCnt[kind]   = 0;
    kind++;
  }
}
19,521✔
113

114
// Release every take-out buffer owned by `mix` and clear the pointers.
void mixRatioExit(SMixRatio* mix) {
  for (TSKEY** slot = mix->buf; slot < mix->buf + MCNT; slot++) {
    free(*slot);  // free(NULL) is a no-op, so empty slots need no guard
    *slot = NULL;
  }
}
19,527✔
123

124
//
125
//  --------------------- util ----------------
126
//
127

128
// return true can do execute delete sql
129
// Return true while delete rows are planned and not all of them are done yet.
bool needExecDel(SMixRatio* mix) {
  uint64_t planned = mix->genCnt[MDEL];
  return planned != 0 && mix->doneCnt[MDEL] < planned;
}
136

137
//
138
// ------------------ gen area -----------------------
139
//
140

141
#define SWAP(cols, l, r)  mid= cols[l]; cols[l]=cols[r]; cols[r]=mid;
//
// Fill cols[0..max) with the indexes 0..max-1 and then move random indexes
// from the tail [cnt, max) into the first `cnt` slots.
//
// Nothing is shuffled when cnt equals max or 1: the identity order is kept.
//
void randomFillCols(uint16_t* cols, uint16_t max, uint16_t cnt) {
  // identity mapping first
  uint16_t pos = 0;
  while (pos < max) {
    cols[pos] = pos;
    pos++;
  }

  bool shuffle = !(cnt == max || cnt == 1);
  if (shuffle) {
    // cnt rounds, each swapping a random head slot with a random tail slot
    for (uint16_t round = 0; round < cnt; round++) {
      uint16_t mid;
      uint16_t head = RD(cnt);
      uint16_t tail = cnt + RD(max - cnt);
      SWAP(cols, head, tail)
    }
  }
}
162

163
//
// Build the column list of the current batch: TS_COL_NAME followed by
// ",<name>" for every column index in info->batCols[0..nBatCols).
//
// Returns a heap string the caller must free(), or NULL when the allocation
// fails.
//
char* genBatColsNames(threadInfo* info, SSuperTable* stb) {
  // the ts column name, one "," + name per batch column, and the terminating NUL
  size_t size = strlen(TS_COL_NAME) + (size_t)info->nBatCols * (TSDB_COL_NAME_LEN + 2) + 1;
  char*  buf = calloc(1, size);
  if (buf == NULL) {
    return NULL;
  }

  int n = snprintf(buf, size, "%s", TS_COL_NAME);
  size_t used = n > 0 ? (size_t)n : 0;

  // append at the tracked offset instead of re-scanning the string with strcat
  for (uint16_t i = 0; i < info->nBatCols && used < size; i++) {
    uint16_t idx = info->batCols[i];
    Field*   fd = benchArrayGet(stb->cols, idx);

    n = snprintf(buf + used, size - used, ",%s", fd->name);
    if (n < 0) {
      break;
    }
    used += (size_t)n;  // used >= size means truncated: the loop condition stops
  }

  return buf;
}
177

178
//
179
// generate head
180
//
181
// Number of bytes an snprintf() call really stored in a buffer of `cap` bytes
// (NUL excluded): 0 on an output error, cap - 1 when the output was truncated.
static uint32_t preSqlStoredLen(int written, size_t cap) {
  if (written < 0 || cap == 0) {
    return 0;
  }
  if ((size_t)written >= cap) {
    return (uint32_t)(cap - 1);
  }
  return (uint32_t)written;
}

//
// Write the head of an insert statement ("INSERT INTO db.tb ... VALUES ") to
// pstr and, when info->csql is set, the head of the matching check query to
// info->csql (its length goes to info->clen).
//
//  - RULE_OLD / RULE_MIX_RANDOM: all columns or stb->partialColNameBuf, with an
//    optional "USING ... TAGS (...) TTL n" clause when stb->autoTblCreating is
//    set; the tags are read from tagData + stb->lenOfTags * tableSeq.
//  - other rules: the batch columns are chosen first (the partial column range
//    of stb, or a random selection) and stored in info->batCols / nBatCols.
//
// Returns the number of bytes stored in pstr. The value never exceeds the
// buffer: callers append at pstr + len, so a raw snprintf() result from a
// truncated write would send them past the end of the buffer.
//
uint32_t genInsertPreSql(threadInfo* info, SDataBase* db, SSuperTable* stb, char* tableName, char* tagData, uint64_t tableSeq, char* pstr) {
  int written = 0;

  // generate check sql, the prefix is the same for every rule
  if (info->csql) {
    written = snprintf(info->csql, TSDB_MAX_ALLOWED_SQL_LEN, "select count(*) from %s.%s where ts in(", db->dbName, tableName);
    info->clen = preSqlStoredLen(written, TSDB_MAX_ALLOWED_SQL_LEN);
  }

  if (stb->genRowRule == RULE_OLD || stb->genRowRule == RULE_MIX_RANDOM) {
    // ttl
    char ttl[20] = "";
    if (stb->ttl != 0) {
      snprintf(ttl, sizeof(ttl), "TTL %d", stb->ttl);
    }

    if (stb->partialColNum == stb->cols->size) {
      if (stb->autoTblCreating) {
        written = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s USING %s.%s TAGS (%s) %s VALUES ", STR_INSERT_INTO, db->dbName,
                           tableName, db->dbName, stb->stbName, tagData + stb->lenOfTags * tableSeq, ttl);
      } else {
        written = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s VALUES ", STR_INSERT_INTO, db->dbName, tableName);
      }
    } else {
      if (stb->autoTblCreating) {
        written = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s (%s) USING %s.%s TAGS (%s) %s VALUES ", STR_INSERT_INTO, db->dbName,
                           tableName, stb->partialColNameBuf, db->dbName, stb->stbName,
                           tagData + stb->lenOfTags * tableSeq, ttl);
      } else {
        written = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s (%s) VALUES ", STR_INSERT_INTO, db->dbName, tableName,
                           stb->partialColNameBuf);
      }
    }

    return preSqlStoredLen(written, TSDB_MAX_ALLOWED_SQL_LEN);
  }

  // new mix rule
  int32_t max = stb->cols->size > MAX_BATCOLS ? MAX_BATCOLS : stb->cols->size;

  if (stb->partialColNum > 0 && stb->partialColNum < MAX_BATCOLS) {
    // fixed column range [partialColFrom, partialColFrom + partialColNum)
    info->nBatCols = stb->partialColNum;
    int j = 0;
    for (int i = stb->partialColFrom; i < stb->partialColFrom + stb->partialColNum; i++) {
      info->batCols[j++] = i;
    }
  } else {
    info->nBatCols = RD(max) + 1;
    // random select cnt elements from max
    randomFillCols(info->batCols, max, info->nBatCols);
  }

  char* colNames = genBatColsNames(info, stb);
  if (colNames == NULL) {
    // passing NULL to "%s" is undefined; hand back an empty statement instead
    errorPrint("%s() failed to build the column name list\n", __func__);
    pstr[0] = '\0';
    return 0;
  }
  written = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s (%s) VALUES ", STR_INSERT_INTO, db->dbName, tableName, colNames);
  free(colNames);

  return preSqlStoredLen(written, TSDB_MAX_ALLOWED_SQL_LEN);
}
242

243
//
244
// generate delete pre sql like "delete from st"
245
//
246
// Write "delete from <db>.<table> where " to pstr and return what snprintf()
// reports as its length. `stb` is not used.
uint32_t genDelPreSql(SDataBase* db, SSuperTable* stb, char* tableName, char* pstr) {
  // super table name or child table name random select
  return snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "delete from %s.%s where ", db->dbName, tableName);
}
253

254
//
255
// append row to batch buffer
256
//
257
//
// Append one row built from a randomly chosen prepared sample to pstr + len.
//
// With stb->useSampleTs set and stb->random_data_source clear, the sample is
// written as is. Otherwise the row starts with a timestamp: `timestamp`, or,
// with a chance derived from stb->disorderRatio, a timestamp that lies before
// stb->startTimestamp.
//
// Returns the length reported by snprintf().
//
uint32_t appendRowRuleOld(SSuperTable* stb, char* pstr, uint32_t len, int64_t timestamp) {
  int32_t pos = RD(g_arguments->prepared_rand);

  if (stb->useSampleTs && !stb->random_data_source) {
    return snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%s)", stb->sampleDataBuf + pos * stb->lenOfCols);
  }

  int64_t rowTs = timestamp;
  if (stb->disorderRatio > 0) {
    int rand_num = taosRandom() % 100;
    if (rand_num < stb->disorderRatio) {
      // step back by disorderRange - 1, or by the full range when that would be 0
      int backStep = stb->disorderRange;
      backStep--;
      if (0 == backStep) {
        backStep = stb->disorderRange;
      }
      int64_t disorderTs = stb->startTimestamp - backStep;
      debugPrint(
          "rand_num: %d, < disorderRatio:"
          " %d, disorderTs: %" PRId64 "\n",
          rand_num, stb->disorderRatio, disorderTs);
      // a disorder timestamp of 0 falls back to the regular one
      if (disorderTs != 0) {
        rowTs = disorderTs;
      }
    }
  }

  // generate
  return snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%" PRId64 ",%s)", rowTs,
                  stb->sampleDataBuf + pos * stb->lenOfCols);
}
287

288
#define GET_IDX(i) info->batCols[i]
289
uint32_t genRowMixAll(threadInfo* info, SSuperTable* stb, char* pstr, uint32_t len, int64_t ts, int64_t* k) {
63,273,467✔
290
  uint32_t size = 0;
63,273,467✔
291
  // first col is ts
292
  if (stb->useNow) {
63,273,467!
293
    char now[32] = "now";
×
294
    // write future 1% fixed fill
295
    if (stb->writeFuture && RD(100) == 0) {
×
296
      int32_t min = RD(stb->durMinute);
×
297
      if (min <= 0) min = 1;
×
298
      if (min > 120) min -= 60;  // delay 1 hour prevent date time out
×
299
      sprintf(now, "now+%dm", min);
×
300
    }
301

302
    size = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%s", now);
×
303
  } else {
304
    size = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%" PRId64, ts);
63,273,467✔
305
  }
306

307
  // other cols data
308
  for(uint16_t i = 0; i< info->nBatCols; i++) {
290,923,434✔
309
    Field* fd = benchArrayGet(stb->cols, GET_IDX(i));
227,655,991✔
310
    char* prefix = "";
227,115,641✔
311
    if(fd->type == TSDB_DATA_TYPE_BINARY) {
227,115,641✔
312
      if(stb->binaryPrefex) {
3,867,041!
313
        prefix = stb->binaryPrefex;
×
314
      }
315
    } else if(fd->type == TSDB_DATA_TYPE_NCHAR) {
223,248,600✔
316
      if(stb->ncharPrefex) {
3,866,702!
317
        prefix = stb->ncharPrefex;
×
318
      }
319
    }
320

321
    size += dataGenByField(fd, pstr, len + size, prefix, k, VAL_NULL);
227,115,641✔
322
  }
323

324
  // end
325
  size += snprintf(pstr + len + size, TSDB_MAX_ALLOWED_SQL_LEN - len - size, "%s", ")");
63,267,443✔
326

327
  return size;
63,267,443✔
328
}
329

330
//
// Append one "(ts,col,...)" row to pstr + len whose column values come from
// dataGenByCalcTs() with the row timestamp as input. Returns the row length.
//
uint32_t genRowTsCalc(threadInfo* info, SSuperTable* stb, char* pstr, uint32_t len, int64_t ts) {
  // first col is ts
  uint32_t written = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%" PRId64, ts);

  // other cols data
  uint16_t col = 0;
  while (col < info->nBatCols) {
    Field* fd = benchArrayGet(stb->cols, GET_IDX(col));
    written += dataGenByCalcTs(fd, pstr, len + written, ts);
    col++;
  }

  // end
  written += snprintf(pstr + len + written, TSDB_MAX_ALLOWED_SQL_LEN - len - written, "%s", ")");

  return written;
}
346

347

348
// create columns data
349
//
// Append one row for timestamp `ts` to pstr + len using the generator that
// matches stb->genRowRule, and add "<ts>," to the check sql when info->csql is
// set. Returns the length of the appended row.
//
uint32_t createColsData(threadInfo* info, SSuperTable* stb, char* pstr, uint32_t len, int64_t ts, int64_t* k) {
  uint32_t size = 0;

  // gen row data
  switch (stb->genRowRule) {
    case RULE_MIX_ALL:
      size = genRowMixAll(info, stb, pstr, len, ts, k);
      break;
    case RULE_MIX_TS_CALC:
      size = genRowTsCalc(info, stb, pstr, len, ts);
      break;
    default: {  // random: take a prepared sample row
      int32_t pos = RD(g_arguments->prepared_rand);
      size = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%" PRId64 ",%s)", ts, stb->sampleDataBuf + pos * stb->lenOfCols);
      break;
    }
  }

  // check sql
  if (info->csql) {
    info->clen += snprintf(info->csql + info->clen, TSDB_MAX_ALLOWED_SQL_LEN - info->clen, "%" PRId64 ",", ts);
  }

  return size;
}
369

370
// take out row
371
// Park `ts` in the take-out buffer of `type`. Returns false when that kind has
// no buffer or the buffer is full, true when the timestamp was stored.
bool takeRowOutToBuf(SMixRatio* mix, uint8_t type, int64_t ts) {
    int64_t* slots = mix->buf[type];

    // no buffer, or no space to save
    if (slots == NULL || mix->bufCnt[type] >= mix->capacity[type]) {
        return false;
    }

    // save and move next
    slots[mix->bufCnt[type]++] = ts;
    return true;
}
391

392
//
393
// row rule mix , global info put into mix
394
//
395
#define MIN_COMMIT_ROWS 10000
396
uint32_t appendRowRuleMix(threadInfo* info, SSuperTable* stb, SMixRatio* mix, char* pstr, 
57,056,885✔
397
                        uint32_t len, int64_t ts, uint32_t* pGenRows, int64_t *k) {
398
    uint32_t size = 0;
57,056,885✔
399
    // remain need generate rows
400
    bool forceDis = FORCE_TAKEOUT(MDIS);
57,056,885✔
401
    bool forceUpd = FORCE_TAKEOUT(MUPD);
57,056,885✔
402

403
    // disorder
404
    if ( forceDis || NEED_TAKEOUT_ROW_TOBUF(MDIS)) {
57,056,885✔
405
        // need take out current row to buf
406
        if(takeRowOutToBuf(mix, MDIS, ts)){
12,888,682✔
407
            return 0;
12,780,430✔
408
        }
409
    }
410

411
    // gen col data
412
    size = createColsData(info, stb, pstr, len, ts, k);
44,317,778✔
413
    if(size > 0) {
44,335,079!
414
      //record counter
415
      *k += 1;
44,348,612✔
416
      *pGenRows += 1;
44,348,612✔
417
      debugPrint("    row ord ts=%" PRId64 " k=%"PRId64"\n", ts, *k);
44,348,612!
418
    }
419

420
    // update
421
    if (forceUpd || NEED_TAKEOUT_ROW_TOBUF(MUPD)) {
44,335,079✔
422
        takeRowOutToBuf(mix, MUPD, ts);
12,814,281✔
423
    }
424

425
    return size;
44,310,699✔
426
}
427

428
//
429
// fill update rows from mix
430
//
431
//
// Generate rows for timestamps that were parked in the take-out buffer of
// `type` (MDIS or MUPD) and append them at pstr + len.
//
// Random buffer slots are probed; a timestamp is used only when it is older
// than `startTime` (timestamps of the current batch are skipped). A used
// timestamp is removed from the buffer by moving the last entry into its slot.
// The number of rows is limited by maxFill; with `force` the limit is
// min(buffered, maxFill) and more probes are allowed.
//
// *pGenRows is incremented per generated row. Returns the appended length.
//
uint32_t fillBatchWithBuf(threadInfo* info, SSuperTable* stb, SMixRatio* mix, int64_t startTime, char* pstr,
                uint32_t len, uint32_t* pGenRows, uint8_t type, uint32_t maxFill, bool force, int64_t *k) {
    if (maxFill == 0) return 0;

    // random target, up to 75% of maxFill; a zero draw asks for maxFill
    uint32_t wanted = (uint32_t)(RD(maxFill) * 0.75);
    if (wanted == 0) {
        wanted = maxFill;
    }

    int64_t* pending    = mix->buf[type];
    int32_t  pendingCnt = mix->bufCnt[type];

    if (force) {
        wanted = pendingCnt > maxFill ? maxFill : pendingCnt;
    } else if (wanted > pendingCnt) {
        wanted = pendingCnt / 2;
    }

    // fill from buf
    const char*   kindName = type == MDIS ? "dis" : "upd";
    const int32_t multiple = force ? 4 : 2;
    uint32_t      size   = 0;
    int32_t       picked = 0;
    int32_t       probes = 0;

    while (picked < wanted && pendingCnt > 0 && ++probes < wanted * multiple) {
        // get ts
        int32_t slot = RD(pendingCnt);
        int64_t ts = pending[slot];

        // entries of the current batch are ignored
        if (ts < startTime) {
            char sts[128];
            sprintf(sts, "%" PRId64, ts);
            if (info->csql && strstr(info->csql, sts)) {
              infoPrint("   %s found duplicate ts=%" PRId64 "\n", kindName, ts);
            }

            // generate row by ts
            size += createColsData(info, stb, pstr, len + size, ts, k);
            *pGenRows += 1;
            picked++;
            debugPrint("    row %s ts=%" PRId64 " \n", kindName, ts);

            // remove current item: last set to current
            mix->bufCnt[type] -= 1;
            pending[slot] = pending[pendingCnt - 1];
            pendingCnt = mix->bufCnt[type];
        }
    }

    return size;
}
486

487

488
//
489
// generate  insert batch body, return rows in batch
490
//
491
//
// Generate the body of one insert batch and return the number of rows in it.
//
// Rows are appended to pstr starting at offset slen until
// g_arguments->reqPerReq rows exist, the sql buffer is nearly full, or an
// iteration adds no row. With RULE_OLD every row comes from
// appendRowRuleOld(). Otherwise each iteration
//   1. handles the next in-order row (it may be parked in the disorder buffer),
//   2. now and then fills rows back from the update buffer,
//   3. now and then fills rows back from the disorder buffer.
//
//   pStartTime  in: timestamp of the first row; out: timestamp to continue with
//   pBatT       per-batch counters, incremented for every generated row
//   pkCur/pkCnt passed to needChangeTs() when stb->primary_key is set
//   k           row position counter handed to the row generators
//
uint32_t genBatchSql(threadInfo* info, SSuperTable* stb, SMixRatio* mix, int64_t* pStartTime, char* pstr,
                     uint32_t slen, STotal* pBatT, int32_t *pkCur, int32_t *pkCnt, int64_t *k) {
  int32_t genRows = 0;
  int64_t  ts = *pStartTime;
  int64_t  startTime = *pStartTime;
  uint32_t len = slen; // slen: start len

  bool forceDis = FORCE_TAKEOUT(MDIS);
  bool forceUpd = FORCE_TAKEOUT(MUPD);
  int32_t timestamp_step = stb->timestamp_step;
  // full disorder: timestamps run backwards
  if(FULL_DISORDER(stb)) timestamp_step *= -1;

  debugPrint("  batch gen StartTime=%" PRId64 " batchID=%d \n", *pStartTime, mix->curBatchCnt);

  while ( genRows < g_arguments->reqPerReq) {
    int32_t last = genRows;
    if(stb->genRowRule == RULE_OLD) {
        len += appendRowRuleOld(stb, pstr, len, ts);
        genRows ++;
        pBatT->ordRows ++;
    } else {
        // add new row (maybe del)
        if (mix->insertedRows + pBatT->disRows + pBatT->ordRows  < mix->insertRows) {
          uint32_t ordRows = 0;
          // the text form of ts is only needed for the duplicate check, so it
          // is built only when a check sql exists
          if (info->csql) {
            char sts[32];
            snprintf(sts, sizeof(sts), "%" PRId64, ts);
            if (strstr(info->csql, sts)) {
              infoPrint("   ord found duplicate ts=%" PRId64 " rows=%" PRId64 "\n", ts, pBatT->ordRows);
            }
          }

          len += appendRowRuleMix(info, stb, mix, pstr, len, ts, &ordRows, k);
          if (ordRows > 0) {
            genRows += ordRows;
            pBatT->ordRows += ordRows;
            //infoPrint("   ord ts=%" PRId64 " rows=%" PRId64 "\n", ts, pBatT->ordRows);
          } else {
            // takeout to disorder list, so continue to gen:
            // -1 never equals genRows, which keeps the loop going
            last = -1;
          }
        }

        if(genRows >= g_arguments->reqPerReq) {
          // move to next batch start time
          ts += timestamp_step;
          break;
        }

        if( forceUpd || RD(stb->fillIntervalUpd) == 0) {
            // fill update rows from buffer
            uint32_t maxFill = stb->fillIntervalUpd/3;
            if(maxFill > g_arguments->reqPerReq - genRows) {
              maxFill = g_arguments->reqPerReq - genRows;
            }
            // calc need count; kept in 64 bits so that counts above INT32_MAX
            // are not truncated (a wrapped, "negative" difference stays <= 0)
            int64_t remain = mix->genCnt[MUPD] - mix->doneCnt[MUPD] - pBatT->updRows;
            if (remain > 0) {
              if (maxFill > remain) {
                maxFill = remain;
              }

              uint32_t updRows = 0;
              len += fillBatchWithBuf(info, stb, mix, startTime, pstr, len, &updRows, MUPD, maxFill, forceUpd, k);
              if (updRows > 0) {
                genRows += updRows;
                pBatT->updRows += updRows;
                debugPrint("   upd ts=%" PRId64 " rows=%" PRId64 "\n", ts, pBatT->updRows);
                if (genRows >= g_arguments->reqPerReq) {
                  // move to next batch start time
                  ts += timestamp_step;
                  break;
                }
              }
            }
        }

        if( forceDis || RD(stb->fillIntervalDis) == 0) {
            // fill disorder rows from buffer
            uint32_t maxFill = stb->fillIntervalDis/3;
            if(maxFill > g_arguments->reqPerReq - genRows) {
              maxFill = g_arguments->reqPerReq - genRows;
            }
            // calc need count, 64 bits for the same reason as above
            int64_t remain = mix->genCnt[MDIS] - mix->doneCnt[MDIS] - pBatT->disRows;
            if (remain > 0) {
              if (maxFill > remain) {
                maxFill = remain;
              }

              uint32_t disRows = 0;
              len += fillBatchWithBuf(info, stb, mix, startTime, pstr, len, &disRows, MDIS, maxFill, forceDis, k);
              if (disRows > 0) {
                genRows += disRows;
                pBatT->disRows += disRows;
                debugPrint("   dis ts=%" PRId64 " rows=%" PRId64 "\n", ts, pBatT->disRows);
              }
            }
        }
    } // if RULE_

    // move next ts
    if (!stb->primary_key || needChangeTs(stb, pkCur, pkCnt)) {
      ts += timestamp_step;
    }

    // check over TSDB_MAX_ALLOWED_SQL_LENGTH
    if (len > (TSDB_MAX_ALLOWED_SQL_LEN - stb->lenOfCols - 320)) {
      break;
    }

    if(genRows == last) {
      // no new row was filled in this iteration
      break;
    }
  } // while

  *pStartTime = ts;
  debugPrint("  batch gen EndTime=  %" PRId64 " genRows=%d \n", *pStartTime, genRows);

  return genRows;
}
613

614
//
615
// generate delete batch body
616
//
617
//
// Generate the body of a delete statement: a random timestamp range
// " ts >= <ds> and ts < <de>;" is appended to pstr + slen.
//
// Only RULE_MIX_ALL deletes, and only once at least 200 steps lie between
// stb->startTimestamp and batStartTime. The rows inside the chosen range are
// counted first with a "select count(*)" query that is built in `sql` and run
// on `taos`.
//
// `sql` is a caller-provided scratch buffer; its size is not known here, so
// the caller must make it large enough for the count query.
//
// Returns the number of rows the range currently holds, 0 when nothing is to
// be deleted (pstr is left untouched in that case).
//
uint32_t genBatchDelSql(SSuperTable* stb, SMixRatio* mix, int64_t batStartTime, TAOS* taos, char* tbName, char* pstr, uint32_t slen, char * sql) {
  if (stb->genRowRule != RULE_MIX_ALL) {
    return 0;
  }

  int64_t range = ABS_DIFF(batStartTime, stb->startTimestamp);
  int64_t rangeCnt = range / (stb->timestamp_step == 0 ? 1 : stb->timestamp_step);

  if (rangeCnt < 200) return 0;

  // reqPerReq may be 0 (mixRatioInit guards the same case) and integer
  // division by zero is undefined, so fall back to a batch size of 1
  int32_t batCnt  = mix->insertRows / (g_arguments->reqPerReq == 0 ? 1 : g_arguments->reqPerReq);
  if(batCnt ==0) batCnt = 1;
  int32_t eachCnt = mix->genCnt[MDEL] / batCnt;
  if(eachCnt == 0) eachCnt = 1;

  // get count
  uint32_t count = RD(eachCnt * 2);
  if (count > rangeCnt) {
    count = rangeCnt;
  }
  if (count == 0) count = 1;

  // random range start, on the already written side of batStartTime
  int64_t ds = batStartTime - RD(range);
  if(FULL_DISORDER(stb)) ds = batStartTime + RD(range);

  int64_t de = ds + count * stb->timestamp_step;

  char where[128] = "";
  snprintf(where, sizeof(where), " ts >= %" PRId64 " and ts < %" PRId64 ";", ds, de);
  sprintf(sql, "select count(*) from %s where %s", tbName, where);

  // ask the server how many rows the range really holds
  int64_t count64 = 0;
  queryCnt(taos, sql, &count64);
  if(count64 == 0) return 0;
  count = count64;

  snprintf(pstr + slen, TSDB_MAX_ALLOWED_SQL_LEN - slen, "%s", where);
  //infoPrint("  batch delete cnt=%d range=%s \n", count, where);

  return count;
}
658

659
// Close the "ts in(" list of the check sql: a trailing ',' becomes ')',
// otherwise ')' is appended. Strings shorter than 5 characters are left alone.
void appendEndCheckSql(threadInfo* info) {
  char*   sql = info->csql;
  int32_t n = strlen(sql);

  if (n >= 5) {
    char* tail = sql + n - 1;
    if (*tail == ',') {
      tail[0] = ')';
      tail[1] = 0;
    } else {
      strcat(sql, ")");
    }
  }
}
671

672
//
// Run the check sql of `info` on the check connection and compare the row
// count it reports with rowsCnt.
//
// Returns true only when both counts match. Returns false when there is no
// check connection, no usable check sql, the query reports 0 rows, or the
// counts differ (the insert and query statements are logged in that case).
//
bool checkSqlsResult(threadInfo* info, int32_t rowsCnt, char* tbName, int32_t loop) {
  // nothing to verify without a check connection and a built check sql
  if (info->conn->ctaos == NULL || info->clen <= 5 || info->csql == NULL) {
    return false;
  }

  int64_t found = 0;
  queryCnt(info->conn->ctaos, info->csql, &found);
  if (found == 0) {
    return false;
  }

  if (found == rowsCnt) {
    infoPrint("  %s check write count ok. loop=%d query: %" PRId64 " inserted: %d\n", tbName, loop, found, rowsCnt);
    return true;
  }

  errorPrint("  %s check write count error. loop=%d query: %" PRId64 " inserted: %d\n", tbName, loop, found, rowsCnt);
  infoPrint("  insert sql:%s\n", info->buffer);
  infoPrint("  query  sql:%s\n", info->csql);
  return false;
}
699

700
int32_t errQuertCnt = 0;
701
int32_t errLastCnt = 0;
702

703
bool checkCorrect(threadInfo* info, SDataBase* db, SSuperTable* stb, char* tbName, int64_t lastTs) {
40✔
704
  char     sql[512];
705
  int64_t  count = 0, ts = 0;
40✔
706
  uint64_t calcCount = (lastTs - stb->startTimestamp) / stb->timestamp_step + 1;
40✔
707

708
  // check count correct
709
  sprintf(sql, "select count(*) from %s.%s ", db->dbName, tbName);
40✔
710
  int32_t loop = 0;
40✔
711
  int32_t code = 0;
40✔
712

713
  do {
714
    code = queryCnt(info->conn->taos, sql, &count);
40✔
715
    if(code == 0  && count == 0) {
40!
716
      errQuertCnt++;
×
717
      errorPrint("  *** WARNING:  %s query count return zero. all error count=%d ***\n", tbName, errQuertCnt);
×
718
    }
719
    if(stb->trying_interval > 0 && (code != 0 || count == 0 )) {
40!
720
      toolsMsleep(stb->trying_interval);
×
721
    }
722
  } while( loop++ < stb->keep_trying &&  (code != 0 || count == 0 ));
40!
723
  if (code != 0) {
40!
724
    errorPrint("checkCorrect sql exec error, error code =0x%x sql=%s", code, sql);
×
725
    return false;
×
726
  }
727
  if (count != calcCount) {
40!
728
    errorPrint("checkCorrect query count unexpected, tbname=%s query=%" PRId64 " expect=%" PRId64, tbName, count,
×
729
               calcCount);
730

731
    return false;
×
732
  }
733

734
  // check last(ts) correct
735
  sprintf(sql, "select last(ts) from %s.%s ", db->dbName, tbName);
40✔
736
  loop = 0;
40✔
737
  do {
738
    code = queryTS(info->conn->taos, sql, &ts);
40✔
739
    if(code == 0  && ts == 0) {
40!
740
      errLastCnt++;
×
741
      errorPrint("  *** WARNING:  %s query last ts return zero. all error count=%d ***\n", tbName, errLastCnt);
×
742
    }
743

744
    if(stb->trying_interval > 0 && (code != 0 || ts == 0 )) {
40!
745
      toolsMsleep(stb->trying_interval);
×
746
    }
747
  } while( loop++ < stb->keep_trying &&  (code != 0 || ts == 0 ));
40!
748
  if (code != 0) {
40!
749
    errorPrint("checkCorrect sql exec error, error code =0x%x sql=%s", code, sql);
×
750
    return false;
×
751
  }
752

753
  // check count correct
754
  if (ts != lastTs) {
40!
755
    errorPrint("checkCorrect query last unexpected, tbname=%s query last=%" PRId64 " expect=%" PRId64, tbName, ts,
×
756
               lastTs);
757
    return false;
×
758
  }
759

760
  infoPrint(" checkCorrect %s.%s count=%" PRId64 "  lastTs=%"PRId64 "  ......  passed.\n", db->dbName, tbName, count, ts);
40✔
761

762
  return true;
40✔
763
}
764

765
//
766
// insert data to db->stb with info
767
//
768
bool insertDataMix(threadInfo* info, SDataBase* db, SSuperTable* stb) {
2,420✔
769
  int64_t lastPrintTime = 0;
2,420✔
770
  // check interface
771
  if (stb->iface != TAOSC_IFACE) {
2,420✔
772
    return false;
311✔
773
  }
774

775
  // old rule return false
776
  if(stb->genRowRule == RULE_OLD)   {
2,109✔
777
    return false;
1,434✔
778
  }
779

780
  infoPrint("insert mode is mix. generate_row_rule=%d\n", stb->genRowRule);
675✔
781

782
  FILE* csvFile = NULL;
704✔
783
  char* tagData = NULL;
704✔
784
  bool  acreate = (stb->genRowRule == RULE_OLD || stb->genRowRule == RULE_MIX_RANDOM) && stb->autoTblCreating;
704!
785
  int   w       = 0;
704✔
786
  if (acreate) {
704!
787
      csvFile = openTagCsv(stb, info->start_table_from);
×
788
      tagData = benchCalloc(TAG_BATCH_COUNT, stb->lenOfTags, false);
×
789
  }
790

791
  // debug
792
  //g_arguments->debug_print = true;
793

794
  STotal total;
795
  memset(&total, 0, sizeof(STotal));
704✔
796

797
  // passed variant set
798
  stb->durMinute = db->durMinute;
704✔
799

800
  // loop insert child tables
801
  int16_t index = info->start_table_from;
704✔
802
  for (uint64_t tbIdx = info->start_table_from; tbIdx <= info->end_table_to && !g_fail; ++tbIdx) {
20,231!
803
    // get child table
804
    SChildTable *childTbl;
805
    if (g_arguments->bind_vgroup) {
19,527!
806
        childTbl = info->vg->childTblArray[tbIdx];
×
807
    } else {
808
        childTbl = stb->childTblArray[tbIdx];
19,527✔
809
    }
810
    char* tbName = childTbl->name;
19,527✔
811

812
    SMixRatio mixRatio;
813
    mixRatioInit(&mixRatio, stb);
19,527✔
814
    int64_t batStartTime = stb->startTimestamp;
19,526✔
815
    int32_t pkCur = 0; // primary key repeat ts current count 
19,526✔
816
    int32_t pkCnt = 0; // primary key repeat ts count  
19,526✔
817
    STotal tbTotal;
818
    memset(&tbTotal, 0 , sizeof(STotal));
19,526✔
819
    int64_t k = 0; // position
19,526✔
820

821
    while (mixRatio.insertedRows < mixRatio.insertRows) {
71,706✔
822
      // check terminate
823
      if (g_arguments->terminate || g_fail) {
52,178!
824
        break;
825
      }
826

827
      if(acreate) {
52,178!
828
          // generator
829
          if (w == 0) {
×
830
              if(!generateTagData(stb, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
×
831
                 FAILED_BREAK()                
×
832
              }
833
          }
834
      }   
835

836
      // generate pre sql  like "insert into tbname ( part column names) values  "
837
      uint32_t len = genInsertPreSql(info, db, stb, tbName, tagData, tbIdx, info->buffer);
52,178✔
838

839
      if(acreate) {
52,176!
840
          // move next
841
          if (++w >= TAG_BATCH_COUNT) {
×
842
              // reset for gen again
843
              w = 0;
×
844
              index += TAG_BATCH_COUNT;
×
845
          } 
846
      }
847

848
      // batch create sql values
849
      STotal batTotal;
850
      memset(&batTotal, 0 , sizeof(STotal));
52,176✔
851
      uint32_t batchRows = genBatchSql(info, stb, &mixRatio, &batStartTime, info->buffer, len, &batTotal, &pkCur, &pkCnt, &k);
52,176✔
852

853
      // execute insert sql
854
      int64_t startTs = toolsGetTimestampUs();
52,180✔
855
      //g_arguments->debug_print = false;
856

857
      if(execInsert(info, batchRows, NULL) != 0) {
52,180!
858
        FAILED_BREAK()
×
859
      }
860
      //g_arguments->debug_print = true;
861
      int64_t endTs = toolsGetTimestampUs();
52,177✔
862

863
      // exec sql ok , update bat->total to table->total
864
      if (batTotal.ordRows > 0) {
52,178✔
865
        tbTotal.ordRows += batTotal.ordRows;
36,181✔
866
      }
867

868
      if (batTotal.disRows > 0) {
52,178✔
869
        tbTotal.disRows += batTotal.disRows;
31,319✔
870
        mixRatio.doneCnt[MDIS] += batTotal.disRows;
31,319✔
871
      }
872

873
      if (batTotal.updRows > 0) {
52,178✔
874
        tbTotal.updRows += batTotal.updRows;
32,227✔
875
        mixRatio.doneCnt[MUPD] += batTotal.updRows;
32,227✔
876
      }
877

878
      // calc inserted rows = order rows + disorder rows
879
      mixRatio.insertedRows = tbTotal.ordRows + tbTotal.disRows;
52,178✔
880

881
      // need check sql
882
      if(g_arguments->check_sql) {
52,178✔
883
        appendEndCheckSql(info);
420✔
884
        int32_t loop = 0;
420✔
885
        bool ok = false;
420✔
886
        while(++loop < 10) {
420!
887
          if(!checkSqlsResult(info, batchRows, tbName, loop)) {
420!
888
            toolsMsleep(500);
×
889
            continue;
×
890
          }
891
          ok = true;
420✔
892
          break;
420✔
893
        }
894

895
        if (!ok) {
420!
896
          FAILED_BREAK()
×
897
        }
898
      }
899

900
      // delete
901
      if (needExecDel(&mixRatio)) {
52,178✔
902
        len = genDelPreSql(db, stb, tbName, info->buffer);
48,572✔
903
        char querySql[512] =  {0};
48,573✔
904
        batTotal.delRows = genBatchDelSql(stb, &mixRatio, batStartTime, info->conn->taos,  tbName, info->buffer, len, querySql);
48,573✔
905
        if (batTotal.delRows > 0) {
48,573✔
906
          // g_arguments->debug_print = false;
907
          if (execInsert(info, batTotal.delRows, NULL) != 0) {
47,744!
908
            FAILED_BREAK()
×
909
          }
910

911
          int64_t delCnt = 0;
47,744✔
912
          queryCnt(info->conn->taos, querySql, &delCnt);
47,744✔
913
          if (delCnt != 0) {
47,743✔
914
            errorPrint(" del not clear zero. query count=%" PRId64 " \n  delete sql=%s\n  query sql=%s\n", delCnt, info->buffer, querySql);
1!
915
            FAILED_BREAK();
×
916
          }
917

918
          // g_arguments->debug_print = true;
919
          tbTotal.delRows += batTotal.delRows;
47,742✔
920
          mixRatio.doneCnt[MDEL] += batTotal.delRows;
47,742✔
921
        }
922
      }
923

924
      // flush
925
      if (db->flush) {
52,178✔
926
        char sql[260] = "";
1,040✔
927
        sprintf(sql, "flush database %s", db->dbName);
1,040✔
928
        int32_t code = executeSql(info->conn->taos,sql);
1,040✔
929
        if (code != 0) {
1,040!
930
          perfPrint(" %s failed. error code = 0x%x\n", sql, code);
×
931
        } else {
932
          perfPrint(" %s ok.\n", sql);
1,040!
933
        }
934
      }
935

936
      // sleep if need
937
      if (stb->insert_interval > 0) {
52,178✔
938
        debugPrint("%s() LN%d, insert_interval: %" PRIu64 "\n", __func__, __LINE__, stb->insert_interval);
10!
939
        perfPrint("sleep %" PRIu64 " ms\n", stb->insert_interval);
10!
940
        toolsMsleep((int32_t)stb->insert_interval);
10✔
941
      }
942

943
      // show
944
      int64_t delay = endTs - startTs;
52,176✔
945
      if (delay <= 0) {
52,176!
946
        debugPrint("thread[%d]: startTS: %" PRId64 ", endTS: %" PRId64 "\n", info->threadID, startTs, endTs);
×
947
      } else {
948
        perfPrint("insert execution time is %10.2f ms\n", delay / 1E6);
52,176!
949

950
        int64_t* pdelay = benchCalloc(1, sizeof(int64_t), false);
52,176✔
951
        *pdelay = delay;
52,180✔
952
        benchArrayPush(info->delayList, pdelay);
52,180✔
953
        info->totalDelay += delay;
52,179✔
954
      }
955

956
      int64_t currentPrintTime = toolsGetTimestampMs();
52,179✔
957
      if (currentPrintTime - lastPrintTime > 30 * 1000) {
52,180✔
958
        infoPrint("thread[%d] has currently inserted rows: %" PRIu64 "\n", info->threadID,
777✔
959
                  info->totalInsertRows + tbTotal.ordRows + tbTotal.disRows);
960
        lastPrintTime = currentPrintTime;
777✔
961
      }
962

963
      // batch show
964
      debugPrint("  %s batch %d ord=%" PRId64 " dis=%" PRId64 " upd=%" PRId64 " del=%" PRId64 "\n", tbName,
52,180!
965
                 mixRatio.curBatchCnt, batTotal.ordRows, batTotal.disRows, batTotal.updRows, batTotal.delRows);
966

967
      // total
968
      mixRatio.curBatchCnt++;
52,180✔
969
      if(stb->insert_interval > 0){
52,180✔
970
        toolsMsleep(stb->insert_interval);
10✔
971
      }
972

973
      if (stb->checkInterval > 0 && mixRatio.curBatchCnt % stb->checkInterval == 0) {
52,180!
974
        // need check
975
        int64_t lastTs = batStartTime - stb->timestamp_step;
40✔
976
        if (!checkCorrect(info, db, stb, tbName, lastTs)) {
40!
977
          // at once exit
978
          errorPrint(" \n\n *************  check correct not passed %s.%s ! errQueryCnt=%d errLastCnt=%d *********** \n\n", db->dbName, tbName, errQuertCnt, errLastCnt);
×
979
          FAILED_BREAK()
×
980
        }
981
      }
982

983
    }  // row end
984

985
    // print
986
    if (mixRatio.insertedRows + tbTotal.ordRows + tbTotal.disRows + tbTotal.updRows + tbTotal.delRows > 0) {
19,528✔
987
      infoPrint("table:%s inserted(%" PRId64 ") rows order(%" PRId64 ")  disorder(%" PRId64 ") update(%" PRId64 ") delete(%" PRId64 ") \n",
19,527!
988
                tbName, mixRatio.insertedRows, FULL_DISORDER(stb) ? 0 : tbTotal.ordRows, FULL_DISORDER(stb) ? tbTotal.ordRows : tbTotal.disRows,
989
                tbTotal.updRows, tbTotal.delRows);
990
    }
991

992
    // table total -> all total
993
    total.ordRows += tbTotal.ordRows;
19,528✔
994
    total.delRows += tbTotal.delRows;
19,528✔
995
    total.disRows += tbTotal.disRows;
19,528✔
996
    total.updRows += tbTotal.updRows;
19,528✔
997

998
    info->totalInsertRows +=mixRatio.insertedRows;
19,528✔
999

1000
    mixRatioExit(&mixRatio);
19,528✔
1001
  }  // child table end
1002

1003

1004
  // end
1005
  if (0 == info->totalDelay) info->totalDelay = 1;
704!
1006

1007

1008
  // total
1009
  info->totalInsertRows = total.ordRows + total.disRows;
704✔
1010
  succPrint("thread[%d] %s(), completed total inserted rows: %" PRIu64 ", %.2f records/second\n", info->threadID,
704!
1011
            __func__, info->totalInsertRows, (double)(info->totalInsertRows / ((double)info->totalDelay / 1E6)));
1012

1013
  // print
1014
  succPrint("inserted finished. \n    rows order: %" PRId64 " \n    disorder: %" PRId64 " \n    update: %" PRId64" \n    delete: %" PRId64 " \n",
704!
1015
            total.ordRows, total.disRows, total.updRows, total.delRows);
1016

1017
  //g_arguments->debug_print = false;
1018

1019
  // free
1020
  if(csvFile) {
704!
1021
      fclose(csvFile);
×
1022
  }
1023
  tmfree(tagData);
704✔
1024

1025
  return true;
704✔
1026
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc