• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.1
/tools/taos-tools/src/benchInsertMix.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the MIT license as published by the Free Software
6
 * Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 */
12

13
#include "bench.h"
14
#include "benchLog.h"
15
#include "wrapDb.h"
16
#include "benchData.h"
17
#include "benchDataMix.h"
18
#include "benchInsertMix.h"
19

20

21
//
22
// ------------------ mix ratio area -----------------------
23
//
24

25
// Indexes into the per-kind arrays of SMixRatio.
#define MDIS 0  // disorder rows (ratio comes from stb->disRatio)
#define MUPD 1  // update rows   (ratio comes from stb->updRatio)
#define MDEL 2  // delete rows   (ratio comes from stb->delRatio)

// Number of mix kinds, i.e. the length of every per-kind array.
#define MCNT 3

// Both macros below expect a variable named `mix` (SMixRatio*) in the calling scope.

// True when kind `type` still has rows to generate (done + buffered < target)
// and a random draw in [0,100) is <= twice the configured ratio.
#define  NEED_TAKEOUT_ROW_TOBUF(type)  (mix->genCnt[(type)] > 0 && mix->doneCnt[(type)] + mix->bufCnt[(type)] < mix->genCnt[(type)] && taosRandom()%100 <= mix->ratio[(type)]*2)
// True once more than 80% of the rows have been inserted. `type` is accepted
// for symmetry but not used. The insertRows > 0 guard avoids a division by zero.
#define  FORCE_TAKEOUT(type) (mix->insertRows > 0 && mix->insertedRows * 100 / mix->insertRows > 80)

// For use inside a loop body only: continue when the user asked to go on after
// failures, otherwise set the global failure flag and leave the loop.
// Deliberately not wrapped in do { } while (0): that would capture the
// continue/break instead of applying them to the caller's loop.
#define FAILED_BREAK()   \
    if (g_arguments->continueIfFail == YES_IF_FAILED) {  \
        continue;                        \
    } else {                             \
        g_fail = true;                   \
        break;                           \
    }
41

42
typedef struct {
43
  uint64_t insertRows;   // need insert
44
  uint64_t insertedRows; // already inserted
45

46
  int8_t ratio[MCNT];
47
  int64_t range[MCNT];
48

49
  // need generate count , calc from stb->insertRows * ratio
50
  uint64_t genCnt[MCNT];
51

52
  // status already done count
53
  uint64_t doneCnt[MCNT];
54

55
  // task out from batch to list buffer
56
  TSKEY* buf[MCNT];
57

58
  // buffer cnt
59
  uint64_t capacity[MCNT]; // capacity size for buf
60
  uint64_t bufCnt[MCNT];   // current valid cnt in buf
61

62
  // calc need value
63
  int32_t curBatchCnt;
64

65
} SMixRatio;
66

67
// Row counters, one per way a row can enter or leave a table.
typedef struct {
  uint64_t ordRows;  // newly added in-order rows
  uint64_t updRows;  // rows written again as updates
  uint64_t disRows;  // rows written late as disorder
  uint64_t delRows;  // presumably rows removed by delete statements; not set in the visible code
} STotal;
73

74

75
//
// Reset `mix` and derive the per-kind targets and buffers from the super table.
//   mix : state to initialise; any previous content is discarded (not freed)
//   stb : source of insertRows and of the dis/upd/del ratios and ranges
// Buffers allocated here are released by mixRatioExit().
//
void mixRatioInit(SMixRatio* mix, SSuperTable* stb) {
  memset(mix, 0, sizeof(SMixRatio));
  mix->insertRows = stb->insertRows;

  // a zero batch size is treated as one row per request;
  // kept in 64 bits so that batchSize * 10 below cannot wrap
  uint64_t batchSize = g_arguments->reqPerReq;
  if (batchSize == 0) batchSize = 1;

  // set ratio
  mix->ratio[MDIS] = stb->disRatio;
  mix->ratio[MUPD] = stb->updRatio;
  mix->ratio[MDEL] = stb->delRatio;

  // set range
  mix->range[MDIS] = stb->disRange;
  mix->range[MUPD] = stb->updRange;
  mix->range[MDEL] = stb->delRange;

  // calc count
  mix->genCnt[MDIS] = mix->insertRows * stb->disRatio / 100;
  mix->genCnt[MUPD] = mix->insertRows * stb->updRatio / 100;
  mix->genCnt[MDEL] = mix->insertRows * stb->delRatio / 100;

  // in full disorder mode no rows are scheduled as "disorder"
  if (FULL_DISORDER(stb)) mix->genCnt[MDIS] = 0;

  // only MDIS and MUPD own a take-out buffer; MDEL (the last index) has none
  for (int32_t i = 0; i < MCNT - 1; i++) {
    if (mix->genCnt[i] == 0) {
      // memset above already left capacity, buf and bufCnt at zero
      continue;
    }

    uint64_t capacity = batchSize * 10 + mix->genCnt[i] * 8 / 1000;
    mix->buf[i] = calloc(capacity, sizeof(TSKEY));
    if (mix->buf[i] == NULL) {
      // keep capacity consistent with the missing buffer; takeRowOutToBuf()
      // refuses to store rows when buf is NULL
      errorPrint("%s() failed to allocate mix buffer, type=%d\n", __func__, i);
      continue;
    }
    mix->capacity[i] = capacity;
  }
}
1,020✔
113

114
// Release every take-out buffer owned by `mix` and clear the pointers.
void mixRatioExit(SMixRatio* mix) {
  for (int32_t kind = 0; kind < MCNT; kind++) {
    // free(NULL) is a no-op, so kinds without a buffer need no special case
    free(mix->buf[kind]);
    mix->buf[kind] = NULL;
  }
}
1,020✔
123

124
//
125
//  --------------------- util ----------------
126
//
127

128
// return true can do execute delete sql
129
bool needExecDel(SMixRatio* mix) {
1,220✔
130
  if (mix->genCnt[MDEL] == 0 || mix->doneCnt[MDEL] >= mix->genCnt[MDEL]) {
1,220!
131
    return false;
×
132
  }
133

134
  return true;
1,220✔
135
}
136

137
//
138
// ------------------ gen area -----------------------
139
//
140

141
#define SWAP(cols, l, r)  mid= cols[l]; cols[l]=cols[r]; cols[r]=mid;
// Fill cols[0..max) with the identity permutation, then randomise the first
// `cnt` entries by swapping them with entries from cols[cnt..max).
//   cols : output array with room for at least `max` entries
//   max  : number of available column indexes
//   cnt  : number of entries the caller will use from the front of cols
// When cnt equals max or 1 the identity order is left untouched.
void randomFillCols(uint16_t* cols, uint16_t max, uint16_t cnt) {
  uint16_t n;

  // identity fill
  for (n = 0; n < max; n++) {
    cols[n] = n;
  }

  // nothing to shuffle
  if (cnt == max || cnt == 1) {
    return;
  }

  // cnt swaps between the selected head and the unselected tail
  for (n = 0; n < cnt; n++) {
    uint16_t head = RD(cnt);
    uint16_t tail = cnt + RD(max - cnt);

    uint16_t saved = cols[head];
    cols[head] = cols[tail];
    cols[tail] = saved;
  }
}
162

163
//
// Build the comma separated column list "<primaryKey>,<col>,<col>..." for the
// columns selected in info->batCols[0..nBatCols).
// Returns a heap buffer that the caller must free(), or NULL when the
// allocation fails. The text is truncated rather than overflowing the buffer.
//
char* genBatColsNames(threadInfo* info, SSuperTable* stb) {
  // one slot per batch column plus one for the primary key; the previous
  // sizing forgot the primary key and could overflow the buffer
  size_t size = ((size_t)info->nBatCols + 1) * (TSDB_COL_NAME_LEN + 2);
  char*  buf = calloc(1, size);
  if (buf == NULL) {
    return NULL;
  }

  // snprintf returns the length it wanted to write, so pos >= size means truncated
  int pos = snprintf(buf, size, "%s", stb->primaryKeyName);

  for (uint16_t i = 0; i < info->nBatCols; i++) {
    if (pos < 0 || (size_t)pos >= size) {
      break;
    }
    uint16_t idx = info->batCols[i];
    Field*   fd = benchArrayGet(stb->cols, idx);
    int      n = snprintf(buf + pos, size - (size_t)pos, ",%s", fd->name);
    if (n < 0) {
      break;
    }
    pos += n;
  }

  return buf;
}
177

178
//
179
// generate head
180
//
181
uint32_t genInsertPreSql(threadInfo* info, SDataBase* db, SSuperTable* stb, char* tableName, char* tagData, uint64_t tableSeq, char* pstr) {
1,220✔
182
  uint32_t len = 0;
1,220✔
183

184
  if (stb->genRowRule == RULE_OLD || stb->genRowRule == RULE_MIX_RANDOM) {
1,220!
185
    // ttl
186
    char ttl[20] = "";
220✔
187
    if (stb->ttl != 0) {
220!
188
      sprintf(ttl, "TTL %d", stb->ttl);
×
189
    }
190

191
    if (stb->partialColNum == stb->cols->size) {
220!
192
      if (stb->autoTblCreating) {
220✔
193
        len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s USING %s.%s TAGS (%s) %s VALUES ", STR_INSERT_INTO, db->dbName,
110✔
194
                       tableName, db->dbName, stb->stbName, tagData + stb->lenOfTags * tableSeq, ttl);
110✔
195
      } else {
196
        len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s VALUES ", STR_INSERT_INTO, db->dbName, tableName);
110✔
197
      }
198
    } else {
199
      if (stb->autoTblCreating) {
×
200
        len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s (%s) USING %s.%s TAGS (%s) %s VALUES ", STR_INSERT_INTO, db->dbName,
×
201
                       tableName, stb->partialColNameBuf, db->dbName, stb->stbName,
202
                       tagData + stb->lenOfTags * tableSeq, ttl);
×
203
      } else {
204
        len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s (%s) VALUES ", STR_INSERT_INTO, db->dbName, tableName,
×
205
                       stb->partialColNameBuf);
206
      }
207
    }
208

209
    // generate check sql
210
    if(info->csql) {
220!
211
      info->clen = snprintf(info->csql, TSDB_MAX_ALLOWED_SQL_LEN, "select count(*) from %s.%s where ts in(", db->dbName, tableName);
220✔
212
    }
213
    return len;
220✔
214
  }
215

216
  // generate check sql
217
  if (info->csql) {
1,000!
218
    info->clen = snprintf(info->csql, TSDB_MAX_ALLOWED_SQL_LEN, "select count(*) from %s.%s where ts in(", db->dbName, tableName);
×
219
  }
220

221
  // new mix rule
222
  int32_t max = stb->cols->size > MAX_BATCOLS ? MAX_BATCOLS : stb->cols->size;
1,000✔
223

224
  if(stb->partialColNum > 0 && stb->partialColNum < MAX_BATCOLS) {
2,000!
225
    info->nBatCols = stb->partialColNum;
1,000✔
226
    int j = 0;
1,000✔
227
    for (int i = stb->partialColFrom; i < stb->partialColFrom + stb->partialColNum; i++) {
14,000✔
228
      info->batCols[j++] = i;
13,000✔
229
    }
230
  } else {
231
    info->nBatCols = RD(max) + 1;
×
232
    // random select cnt elements from max
233
    randomFillCols(info->batCols, max, info->nBatCols);
×
234
  }
235
  
236
  char * colNames = genBatColsNames(info, stb);
1,000✔
237
  len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "%s %s.%s (%s) VALUES ", STR_INSERT_INTO, db->dbName, tableName, colNames);
1,000✔
238
  free(colNames);
1,000✔
239

240
  return len;
1,000✔
241
}
242

243
//
244
// generate delete pre sql like "delete from st"
245
//
246
uint32_t genDelPreSql(SDataBase* db, SSuperTable* stb, char* tableName, char* pstr) {
1,220✔
247
  uint32_t len = 0;
1,220✔
248
  // super table name or child table name random select
249
  len = snprintf(pstr, TSDB_MAX_ALLOWED_SQL_LEN, "delete from %s.%s where ", db->dbName, tableName);
1,220✔
250

251
  return len;
1,220✔
252
}
253

254
//
255
// append row to batch buffer
256
//
257
uint32_t appendRowRuleOld(SSuperTable* stb, char* pstr, uint32_t len, int64_t timestamp) {
×
258
  uint32_t size = 0;
×
259
  int32_t  pos = RD(g_arguments->prepared_rand);
×
260
  int      disorderRange = stb->disorderRange;
×
261

262
  if (stb->useSampleTs && !stb->random_data_source) {
×
263
    size = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%s)", stb->sampleDataBuf + pos * stb->lenOfCols);
×
264
  } else {
265
    int64_t disorderTs = 0;
×
266
    if (stb->disorderRatio > 0) {
×
267
      int rand_num = taosRandom() % 100;
×
268
      if (rand_num < stb->disorderRatio) {
×
269
        disorderRange--;
×
270
        if (0 == disorderRange) {
×
271
          disorderRange = stb->disorderRange;
×
272
        }
273
        disorderTs = stb->startTimestamp - disorderRange;
×
274
        debugPrint(
×
275
            "rand_num: %d, < disorderRatio:"
276
            " %d, disorderTs: %" PRId64 "\n",
277
            rand_num, stb->disorderRatio, disorderTs);
278
      }
279
    }
280
    // generate
281
    size = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%" PRId64 ",%s)", disorderTs ? disorderTs : timestamp,
×
282
                    stb->sampleDataBuf + pos * stb->lenOfCols);
×
283
  }
284

285
  return size;
×
286
}
287

288
#define GET_IDX(i) info->batCols[i]
289
uint32_t genRowMixAll(threadInfo* info, SSuperTable* stb, char* pstr, uint32_t len, int64_t ts, int64_t* k) {
2,846,017✔
290
  uint32_t size = 0;
2,846,017✔
291
  // first col is ts
292
  if (stb->useNow) {
2,846,017!
293
    char now[32] = "now";
×
294
    // write future 1% fixed fill
295
    if (stb->writeFuture && RD(100) == 0) {
×
296
      int32_t min = RD(stb->durMinute);
×
297
      if (min <= 0) min = 1;
×
298
      if (min > 120) min -= 60;  // delay 1 hour prevent date time out
×
299
      sprintf(now, "now+%dm", min);
×
300
    }
301

302
    size = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%s", now);
×
303
  } else {
304
    size = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%" PRId64, ts);
2,846,017✔
305
  }
306

307
  // other cols data
308
  for(uint16_t i = 0; i< info->nBatCols; i++) {
39,381,613✔
309
    Field* fd = benchArrayGet(stb->cols, GET_IDX(i));
36,540,572✔
310
    char* prefix = "";
36,369,921✔
311
    if(fd->type == TSDB_DATA_TYPE_BINARY) {
36,369,921✔
312
      if(stb->binaryPrefex) {
2,845,830!
313
        prefix = stb->binaryPrefex;
×
314
      }
315
    } else if(fd->type == TSDB_DATA_TYPE_NCHAR) {
33,524,091✔
316
      if(stb->ncharPrefex) {
2,846,328!
317
        prefix = stb->ncharPrefex;
×
318
      }
319
    }
320

321
    size += dataGenByField(fd, pstr, len + size, prefix, k, VAL_NULL);
36,369,921✔
322
  }
323

324
  // end
325
  size += snprintf(pstr + len + size, TSDB_MAX_ALLOWED_SQL_LEN - len - size, "%s", ")");
2,841,041✔
326

327
  return size;
2,841,041✔
328
}
329

330
// Append one row "(<ts>,<col>,...)" to pstr at offset len, where every batch
// column value comes from dataGenByCalcTs() for the given timestamp.
// Returns the sum of the lengths reported by snprintf and dataGenByCalcTs().
uint32_t genRowTsCalc(threadInfo* info, SSuperTable* stb, char* pstr, uint32_t len, int64_t ts) {
  // timestamp column
  uint32_t written = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%" PRId64, ts);

  // data columns
  for (uint16_t c = 0; c < info->nBatCols; c++) {
    Field* fd = benchArrayGet(stb->cols, info->batCols[c]);
    written += dataGenByCalcTs(fd, pstr, len + written, ts);
  }

  // close the row
  written += snprintf(pstr + len + written, TSDB_MAX_ALLOWED_SQL_LEN - len - written, "%s", ")");

  return written;
}
346

347

348
// create columns data
349
uint32_t createColsData(threadInfo* info, SSuperTable* stb, char* pstr, uint32_t len, int64_t ts, int64_t* k) {
3,474,266✔
350
  uint32_t size = 0;
3,474,266✔
351

352
  // gen row data
353
  if (stb->genRowRule == RULE_MIX_ALL) {
3,474,266✔
354
    size = genRowMixAll(info, stb, pstr, len, ts, k);
2,846,524✔
355
  } else if (stb->genRowRule == RULE_MIX_TS_CALC) {
627,742!
356
    size = genRowTsCalc(info, stb, pstr, len, ts);
×
357
  } else {  // random
358
    int32_t pos = RD(g_arguments->prepared_rand);
627,742!
359
    size = snprintf(pstr + len, TSDB_MAX_ALLOWED_SQL_LEN - len, "(%" PRId64 ",%s)", ts, stb->sampleDataBuf + pos * stb->lenOfCols);
629,071✔
360
  }
361

362
  // check sql
363
  if(info->csql) {
3,476,272✔
364
    info->clen += snprintf(info->csql + info->clen, TSDB_MAX_ALLOWED_SQL_LEN - info->clen, "%" PRId64 ",", ts);
628,105✔
365
  }
366

367
  return size;
3,476,272✔
368
}
369

370
// take out row
371
bool takeRowOutToBuf(SMixRatio* mix, uint8_t type, int64_t ts) {
1,271,517✔
372
    int64_t* buf = mix->buf[type];
1,271,517✔
373
    if(buf == NULL){
1,271,517!
374
        return false;
×
375
    }
376

377
    if(mix->bufCnt[type] >= mix->capacity[type]) {
1,271,517✔
378
        // no space to save
379
        return false;
85,676✔
380
    }
381

382
    uint64_t bufCnt = mix->bufCnt[type];
1,185,841✔
383

384
    // save
385
    buf[bufCnt] = ts;
1,185,841✔
386
    // move next
387
    mix->bufCnt[type] += 1;
1,185,841✔
388

389
    return true;
1,185,841✔
390
}
391

392
//
393
// row rule mix , global info put into mix
394
//
395
#define MIN_COMMIT_ROWS 10000
396
uint32_t appendRowRuleMix(threadInfo* info, SSuperTable* stb, SMixRatio* mix, char* pstr, 
4,314,503✔
397
                        uint32_t len, int64_t ts, uint32_t* pGenRows, int64_t *k) {
398
    uint32_t size = 0;
4,314,503✔
399
    // remain need generate rows
400
    bool forceDis = FORCE_TAKEOUT(MDIS);
4,314,503✔
401
    bool forceUpd = FORCE_TAKEOUT(MUPD);
4,314,503✔
402

403
    // disorder
404
    if ( forceDis || NEED_TAKEOUT_ROW_TOBUF(MDIS)) {
4,314,503!
405
        // need take out current row to buf
406
        if(takeRowOutToBuf(mix, MDIS, ts)){
1,014,808✔
407
            return 0;
929,956✔
408
        }
409
    }
410

411
    // gen col data
412
    size = createColsData(info, stb, pstr, len, ts, k);
3,381,751✔
413
    if(size > 0) {
3,385,686!
414
      //record counter
415
      *k += 1;
3,385,843✔
416
      *pGenRows += 1;
3,385,843✔
417
      debugPrint("    row ord ts=%" PRId64 " k=%"PRId64"\n", ts, *k);
3,385,843!
418
    }
419

420
    // update
421
    if (forceUpd || NEED_TAKEOUT_ROW_TOBUF(MUPD)) {
3,385,686!
422
        takeRowOutToBuf(mix, MUPD, ts);
262,791✔
423
    }
424

425
    return size;
3,385,111✔
426
}
427

428
//
429
// fill update rows from mix
430
//
431
uint32_t fillBatchWithBuf(threadInfo* info, SSuperTable* stb, SMixRatio* mix, int64_t startTime, char* pstr, 
247,590✔
432
                uint32_t len, uint32_t* pGenRows, uint8_t type, uint32_t maxFill, bool force, int64_t *k) {
433
    uint32_t size = 0;
247,590✔
434
    if (maxFill == 0) return 0;
247,590!
435

436
    uint32_t rdFill = (uint32_t)(RD(maxFill) * 0.75);
247,590!
437
    if(rdFill == 0) {
247,600✔
438
        rdFill = maxFill;
56,927✔
439
    }
440

441
    int64_t* buf = mix->buf[type];
247,600✔
442
    int32_t  bufCnt = mix->bufCnt[type];
247,600✔
443

444
    if (force) {
247,600✔
445
        rdFill = bufCnt > maxFill ? maxFill : bufCnt;
52✔
446
    } else {
447
        if (rdFill > bufCnt) {
247,548✔
448
            rdFill = bufCnt / 2;
2,623✔
449
        }
450
    }
451

452
    // fill from buf
453
    int32_t selCnt = 0;
247,600✔
454
    int32_t findCnt = 0;
247,600✔
455
    int32_t multiple = force ? 4 : 2;
247,600✔
456

457
    while(selCnt < rdFill && bufCnt > 0 && ++findCnt < rdFill * multiple) {
2,300,517!
458
        // get ts
459
        int32_t i = RD(bufCnt);
2,052,587!
460
        int64_t ts = buf[i];
2,052,950✔
461
        if( ts >= startTime) {
2,052,950✔
462
            // in current batch , ignore
463
            continue;
1,962,998✔
464
        }
465

466
        char sts[128];
467
        sprintf(sts, "%" PRId64, ts);
89,952✔
468
        if (info->csql && strstr(info->csql, sts)) {
89,952!
469
          infoPrint("   %s found duplicate ts=%" PRId64 "\n", type == MDIS ? "dis" : "upd", ts);
×
470
        }
471

472
        // generate row by ts
473
        size += createColsData(info, stb, pstr, len + size, ts, k);
89,952✔
474
        *pGenRows += 1;
89,926✔
475
        selCnt ++;
89,926✔
476
        debugPrint("    row %s ts=%" PRId64 " \n", type == MDIS ? "dis" : "upd", ts);
89,926!
477

478
        // remove current item
479
        mix->bufCnt[type] -= 1;
89,919✔
480
        buf[i] = buf[bufCnt - 1]; // last set to current
89,919✔
481
        bufCnt = mix->bufCnt[type];
89,919✔
482
    }
483

484
    return size;
247,930✔
485
}
486

487

488
//
489
// generate  insert batch body, return rows in batch
490
//
491
uint32_t genBatchSql(threadInfo* info, SSuperTable* stb, SMixRatio* mix, int64_t* pStartTime, char* pstr, 
1,220✔
492
                     uint32_t slen, STotal* pBatT, int32_t *pkCur, int32_t *pkCnt, int64_t *k) {
493
  int32_t genRows = 0;
1,220✔
494
  int64_t  ts = *pStartTime;
1,220✔
495
  int64_t  startTime = *pStartTime;
1,220✔
496
  uint32_t len = slen; // slen: start len
1,220✔
497

498
  bool forceDis = FORCE_TAKEOUT(MDIS);
1,220✔
499
  bool forceUpd = FORCE_TAKEOUT(MUPD);
1,220✔
500
  int32_t timestamp_step = stb->timestamp_step;
1,220✔
501
  // full disorder
502
  if(FULL_DISORDER(stb)) timestamp_step *= -1;
1,220!
503

504
  debugPrint("  batch gen StartTime=%" PRId64 " batchID=%d \n", *pStartTime, mix->curBatchCnt);
1,220!
505

506
  while ( genRows < g_arguments->reqPerReq) {
4,289,945!
507
    int32_t last = genRows;
4,301,485✔
508
    if(stb->genRowRule == RULE_OLD) {
4,301,485!
509
        len += appendRowRuleOld(stb, pstr, len, ts);
×
510
        genRows ++;
×
511
        pBatT->ordRows ++;
×
512
    } else {
513
        char sts[128];
514
        sprintf(sts, "%" PRId64, ts);
4,301,485✔
515

516
        // add new row (maybe del)
517
        if (mix->insertedRows + pBatT->disRows + pBatT->ordRows  < mix->insertRows) {
4,301,485!
518
          uint32_t ordRows = 0;
4,318,471✔
519
          if(info->csql && strstr(info->csql, sts)) {
4,318,471!
520
            infoPrint("   ord found duplicate ts=%" PRId64 " rows=%" PRId64 "\n", ts, pBatT->ordRows);
×
521
          }
522

523
          len += appendRowRuleMix(info, stb, mix, pstr, len, ts, &ordRows, k);
4,318,471✔
524
          if (ordRows > 0) {
4,303,745✔
525
            genRows += ordRows;
3,385,003✔
526
            pBatT->ordRows += ordRows;
3,385,003✔
527
            //infoPrint("   ord ts=%" PRId64 " rows=%" PRId64 "\n", ts, pBatT->ordRows);
528
          } else {
529
            // takeout to disorder list, so continue to gen
530
            last = -1;
918,742✔
531
          }
532
        }
533

534
        if(genRows >= g_arguments->reqPerReq) {
4,286,759✔
535
          // move to next batch start time
536
          ts += timestamp_step;
1,199✔
537
          break;
1,199✔
538
        }
539

540
        if( forceUpd || RD(stb->fillIntervalUpd) == 0) {
4,285,560!
541
            // fill update rows from buffer
542
            uint32_t maxFill = stb->fillIntervalUpd/3;
800,797✔
543
            if(maxFill > g_arguments->reqPerReq - genRows) {
800,797✔
544
              maxFill = g_arguments->reqPerReq - genRows;
494✔
545
            }
546
            // calc need count
547
            int32_t remain = mix->genCnt[MUPD] - mix->doneCnt[MUPD] - pBatT->updRows;
800,797✔
548
            if (remain > 0) {
800,797✔
549
              if (maxFill > remain) {
140,704✔
550
                maxFill = remain;
50✔
551
              }
552

553
              uint32_t updRows = 0;
140,704✔
554
              len += fillBatchWithBuf(info, stb, mix, startTime, pstr, len, &updRows, MUPD, maxFill, forceUpd, k);
140,704✔
555
              if (updRows > 0) {
140,702✔
556
                genRows += updRows;
8,870✔
557
                pBatT->updRows += updRows;
8,870✔
558
                debugPrint("   upd ts=%" PRId64 " rows=%" PRId64 "\n", ts, pBatT->updRows);
8,870!
559
                if (genRows >= g_arguments->reqPerReq) {
8,870!
560
                  // move to next batch start time
561
                  ts += timestamp_step;
×
562
                  break;
×
563
                }
564
              }
565
            }
566
        }
567

568
        if( forceDis || RD(stb->fillIntervalDis) == 0) {
4,288,315!
569
            // fill disorder rows from buffer
570
            uint32_t maxFill = stb->fillIntervalDis/3;
760,468✔
571
            if(maxFill > g_arguments->reqPerReq - genRows) {
760,468✔
572
              maxFill = g_arguments->reqPerReq - genRows;
2,349✔
573
            }
574
            // calc need count
575
            int32_t remain = mix->genCnt[MDIS] - mix->doneCnt[MDIS] - pBatT->disRows;
760,468✔
576
            if (remain > 0) {
760,468✔
577
              if (maxFill > remain) {
106,902✔
578
                maxFill = remain;
45✔
579
              }
580

581
              uint32_t disRows = 0;
106,902✔
582
              len += fillBatchWithBuf(info, stb, mix, startTime, pstr, len, &disRows, MDIS, maxFill, forceDis, k);
106,902✔
583
              if (disRows > 0) {
106,903✔
584
                genRows += disRows;
1,527✔
585
                pBatT->disRows += disRows;
1,527✔
586
                debugPrint("   dis ts=%" PRId64 " rows=%" PRId64 "\n", ts, pBatT->disRows);
1,527!
587
              }
588
            }
589
        }
590
    } // if RULE_
591

592
    // move next ts
593
    if (!stb->primary_key || needChangeTs(stb, pkCur, pkCnt)) {
4,288,745!
594
      ts += timestamp_step;
4,303,496✔
595
    }
596

597
    // check over TSDB_MAX_ALLOWED_SQL_LENGTH
598
    if (len > (TSDB_MAX_ALLOWED_SQL_LEN - stb->lenOfCols - 320)) {
4,288,745!
599
      break;
×
600
    }
601

602
    if(genRows == last) {
4,288,745✔
603
      // now new row fill
604
      break;
20✔
605
    }
606
  } // while
607

608
  *pStartTime = ts;
×
609
  debugPrint("  batch gen EndTime=  %" PRId64 " genRows=%d \n", *pStartTime, genRows);
×
610

611
  return genRows;
1,220✔
612
}
613

614
//
615
// generate delete batch body
616
//
617
uint32_t genBatchDelSql(SSuperTable* stb, SMixRatio* mix, int64_t batStartTime, TAOS* taos, char* tbName, char* pstr, uint32_t slen, char * sql) {
1,220✔
618
  if (stb->genRowRule != RULE_MIX_ALL) {
1,220✔
619
    return 0;
220✔
620
  }
621

622
  int64_t range = ABS_DIFF(batStartTime, stb->startTimestamp);
1,000!
623
  int64_t rangeCnt = range / (stb->timestamp_step == 0 ? 1 : stb->timestamp_step);
1,000!
624

625
  if (rangeCnt < 200) return 0;
1,000!
626

627
  int32_t batCnt  = mix->insertRows / g_arguments->reqPerReq;
1,000✔
628
  if(batCnt ==0) batCnt = 1;
1,000!
629
  int32_t eachCnt = mix->genCnt[MDEL] / batCnt;
1,000✔
630
  if(eachCnt == 0) eachCnt = 1;
1,000!
631

632
  // get count
633
  uint32_t count = RD(eachCnt * 2);
1,000!
634
  if (count > rangeCnt) {
1,000!
635
    count = rangeCnt;
×
636
  }
637
  if (count == 0) count = 1;
1,000✔
638

639
  int64_t ds = batStartTime - RD(range);
1,000!
640
  if(FULL_DISORDER(stb)) ds = batStartTime + RD(range);
1,000!
641

642
  int64_t de = ds + count * stb->timestamp_step;
1,000✔
643

644
  char where[128] = "";
1,000✔
645
  sprintf(where, " ts >= %" PRId64 " and ts < %" PRId64 ";", ds, de);
1,000✔
646
  sprintf(sql, "select count(*) from %s where %s", tbName, where);
1,000✔
647

648
  int64_t count64 = 0;
1,000✔
649
  queryCnt(taos, sql, &count64);
1,000✔
650
  if(count64 == 0) return 0;
1,000✔
651
  count = count64;
993✔
652

653
  snprintf(pstr + slen, TSDB_MAX_ALLOWED_SQL_LEN - slen, "%s", where);
993✔
654
  //infoPrint("  batch delete cnt=%d range=%s \n", count, where);
655

656
  return count;
993✔
657
}
658

659
// Close the "ts in(" list of the check query in info->csql: a trailing comma
// is replaced by ')', otherwise ')' is appended.
// Nothing is done when there is no check query, when it is shorter than five
// characters, or when appending would exceed TSDB_MAX_ALLOWED_SQL_LEN (the
// size the other writers of csql use as its capacity).
void appendEndCheckSql(threadInfo* info) {
  char* csql = info->csql;
  if (csql == NULL) {
    // the check query is optional
    return;
  }

  size_t len = strlen(csql);
  if (len < 5) return;

  if (csql[len - 1] == ',') {
    // same length, the terminator stays where it is
    csql[len - 1] = ')';
  } else if (len + 1 < (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    csql[len] = ')';
    csql[len + 1] = '\0';
  }
}
671

672
// Run the check query in info->csql on the check connection and compare the
// count it reports with rowsCnt.
// Returns true only when both agree. Returns false when there is no check
// connection or query, when the query reports zero rows, or on a mismatch
// (which is logged together with the insert and check statements).
bool checkSqlsResult(threadInfo* info, int32_t rowsCnt, char* tbName, int32_t loop) {
  // nothing to check against
  if (info->conn->ctaos == NULL || info->clen <= 5 || info->csql == NULL) {
    return false;
  }

  int64_t queried = 0;
  queryCnt(info->conn->ctaos, info->csql, &queried);
  if (queried == 0) {
    return false;
  }

  if (queried == rowsCnt) {
    infoPrint("  %s check write count ok. loop=%d query: %" PRId64 " inserted: %d\n", tbName, loop, queried, rowsCnt);
    return true;
  }

  errorPrint("  %s check write count error. loop=%d query: %" PRId64 " inserted: %d\n", tbName, loop, queried, rowsCnt);
  infoPrint("  insert sql:%s\n", info->buffer);
  infoPrint("  query  sql:%s\n", info->csql);
  return false;
}
699

700
int32_t errQuertCnt = 0;
701
int32_t errLastCnt = 0;
702

703
bool checkCorrect(threadInfo* info, SDataBase* db, SSuperTable* stb, char* tbName, int64_t lastTs) {
×
704
  char     sql[512];
705
  int64_t  count = 0, ts = 0;
×
706
  uint64_t calcCount = (lastTs - stb->startTimestamp) / stb->timestamp_step + 1;
×
707

708
  // check count correct
709
  sprintf(sql, "select count(*) from %s.%s ", db->dbName, tbName);
×
710
  int32_t loop = 0;
×
711
  int32_t code = 0;
×
712

713
  do {
714
    code = queryCnt(info->conn->taos, sql, &count);
×
715
    if(code == 0  && count == 0) {
×
716
      errQuertCnt++;
×
717
      errorPrint("  *** WARNING:  %s query count return zero. all error count=%d ***\n", tbName, errQuertCnt);
×
718
    }
719
    if(stb->trying_interval > 0 && (code != 0 || count == 0 )) {
×
720
      toolsMsleep(stb->trying_interval);
×
721
    }
722
  } while( loop++ < stb->keep_trying &&  (code != 0 || count == 0 ));
×
723
  if (code != 0) {
×
724
    errorPrint("checkCorrect sql exec error, error code =0x%x sql=%s", code, sql);
×
725
    return false;
×
726
  }
727
  if (count != calcCount) {
×
728
    errorPrint("checkCorrect query count unexpected, tbname=%s query=%" PRId64 " expect=%" PRId64, tbName, count,
×
729
               calcCount);
730

731
    return false;
×
732
  }
733

734
  // check last(ts) correct
735
  sprintf(sql, "select last(ts) from %s.%s ", db->dbName, tbName);
×
736
  loop = 0;
×
737
  do {
738
    code = queryTS(info->conn->taos, sql, &ts);
×
739
    if(code == 0  && ts == 0) {
×
740
      errLastCnt++;
×
741
      errorPrint("  *** WARNING:  %s query last ts return zero. all error count=%d ***\n", tbName, errLastCnt);
×
742
    }
743

744
    if(stb->trying_interval > 0 && (code != 0 || ts == 0 )) {
×
745
      toolsMsleep(stb->trying_interval);
×
746
    }
747
  } while( loop++ < stb->keep_trying &&  (code != 0 || ts == 0 ));
×
748
  if (code != 0) {
×
749
    errorPrint("checkCorrect sql exec error, error code =0x%x sql=%s", code, sql);
×
750
    return false;
×
751
  }
752

753
  // check count correct
754
  if (ts != lastTs) {
×
755
    errorPrint("checkCorrect query last unexpected, tbname=%s query last=%" PRId64 " expect=%" PRId64, tbName, ts,
×
756
               lastTs);
757
    return false;
×
758
  }
759

760
  infoPrint(" checkCorrect %s.%s count=%" PRId64 "  lastTs=%"PRId64 "  ......  passed.\n", db->dbName, tbName, count, ts);
×
761

762
  return true;
×
763
}
764

765
//
766
// insert data to db->stb with info
767
//
768
bool insertDataMix(threadInfo* info, SDataBase* db, SSuperTable* stb) {
1,141✔
769
  int64_t lastPrintTime = 0;
1,141✔
770
  // check interface
771
  if (stb->iface != TAOSC_IFACE) {
1,141✔
772
    return false;
298✔
773
  }
774

775
  // old rule return false
776
  if(stb->genRowRule == RULE_OLD)   {
843✔
777
    return false;
803✔
778
  }
779

780
  infoPrint("insert mode is mix. generate_row_rule=%d\n", stb->genRowRule);
40✔
781

782
  FILE* csvFile = NULL;
40✔
783
  char* tagData = NULL;
40✔
784
  bool  acreate = (stb->genRowRule == RULE_OLD || stb->genRowRule == RULE_MIX_RANDOM) && stb->autoTblCreating;
40!
785
  int   w       = 0;
40✔
786
  if (acreate) {
40✔
787
      csvFile = openTagCsv(stb, info->start_table_from);
10✔
788
      tagData = benchCalloc(TAG_BATCH_COUNT, stb->lenOfTags, false);
10✔
789
  }
790

791
  // debug
792
  //g_arguments->debug_print = true;
793

794
  STotal total;
795
  memset(&total, 0, sizeof(STotal));
40✔
796

797
  // passed variant set
798
  stb->durMinute = db->durMinute;
40✔
799

800
  // loop insert child tables
801
  int16_t index = info->start_table_from;
40✔
802
  for (uint64_t tbIdx = info->start_table_from; tbIdx <= info->end_table_to && !g_fail; ++tbIdx) {
1,060!
803
    // get child table
804
    SChildTable *childTbl;
805
    if (g_arguments->bind_vgroup) {
1,020!
806
        childTbl = info->vg->childTblArray[tbIdx];
×
807
    } else {
808
        childTbl = stb->childTblArray[tbIdx];
1,020✔
809
    }
810
    char* tbName = childTbl->name;
1,020✔
811

812
    SMixRatio mixRatio;
813
    mixRatioInit(&mixRatio, stb);
1,020✔
814
    int64_t batStartTime = stb->startTimestamp;
1,020✔
815
    int32_t pkCur = 0; // primary key repeat ts current count 
1,020✔
816
    int32_t pkCnt = 0; // primary key repeat ts count  
1,020✔
817
    STotal tbTotal;
818
    memset(&tbTotal, 0 , sizeof(STotal));
1,020✔
819
    int64_t k = 0; // position
1,020✔
820

821
    while (mixRatio.insertedRows < mixRatio.insertRows) {
2,240✔
822
      // check terminate
823
      if (g_arguments->terminate || g_fail) {
1,220!
824
        break;
825
      }
826

827
      if(acreate) {
1,220✔
828
          // generator
829
          if (w == 0) {
110✔
830
              if(!generateTagData(stb, tagData, TAG_BATCH_COUNT, csvFile, NULL, index)) {
10!
831
                 FAILED_BREAK()                
×
832
              }
833
          }
834
      }   
835

836
      // generate pre sql  like "insert into tbname ( part column names) values  "
837
      uint32_t len = genInsertPreSql(info, db, stb, tbName, tagData, tbIdx, info->buffer);
1,220✔
838

839
      if(acreate) {
1,220✔
840
          // move next
841
          if (++w >= TAG_BATCH_COUNT) {
110!
842
              // reset for gen again
843
              w = 0;
×
844
              index += TAG_BATCH_COUNT;
×
845
          } 
846
      }
847

848
      // batch create sql values
849
      STotal batTotal;
850
      memset(&batTotal, 0 , sizeof(STotal));
1,220✔
851
      uint32_t batchRows = genBatchSql(info, stb, &mixRatio, &batStartTime, info->buffer, len, &batTotal, &pkCur, &pkCnt, &k);
1,220✔
852

853
      // execute insert sql
854
      int64_t startTs = toolsGetTimestampUs();
1,220✔
855
      //g_arguments->debug_print = false;
856

857
      if(execInsert(info, batchRows, NULL) != 0) {
1,220!
858
        FAILED_BREAK()
×
859
      }
860
      //g_arguments->debug_print = true;
861
      int64_t endTs = toolsGetTimestampUs();
1,220✔
862

863
      // exec sql ok , update bat->total to table->total
864
      if (batTotal.ordRows > 0) {
1,220!
865
        tbTotal.ordRows += batTotal.ordRows;
1,220✔
866
      }
867

868
      if (batTotal.disRows > 0) {
1,220✔
869
        tbTotal.disRows += batTotal.disRows;
175✔
870
        mixRatio.doneCnt[MDIS] += batTotal.disRows;
175✔
871
      }
872

873
      if (batTotal.updRows > 0) {
1,220✔
874
        tbTotal.updRows += batTotal.updRows;
120✔
875
        mixRatio.doneCnt[MUPD] += batTotal.updRows;
120✔
876
      }
877

878
      // calc inserted rows = order rows + disorder rows
879
      mixRatio.insertedRows = tbTotal.ordRows + tbTotal.disRows;
1,220✔
880

881
      // need check sql
882
      if(g_arguments->check_sql) {
1,220✔
883
        appendEndCheckSql(info);
219✔
884
        int32_t loop = 0;
220✔
885
        bool ok = false;
220✔
886
        while(++loop < 10) {
222!
887
          if(!checkSqlsResult(info, batchRows, tbName, loop)) {
222✔
888
            toolsMsleep(500);
2✔
889
            continue;
2✔
890
          }
891
          ok = true;
220✔
892
          break;
220✔
893
        }
894

895
        if (!ok) {
220!
896
          FAILED_BREAK()
×
897
        }
898
      }
899

900
      // delete
901
      if (needExecDel(&mixRatio)) {
1,221!
902
        len = genDelPreSql(db, stb, tbName, info->buffer);
1,220✔
903
        char querySql[512] =  {0};
1,220✔
904
        batTotal.delRows = genBatchDelSql(stb, &mixRatio, batStartTime, info->conn->taos,  tbName, info->buffer, len, querySql);
1,220✔
905
        if (batTotal.delRows > 0) {
1,220✔
906
          // g_arguments->debug_print = false;
907
          if (execInsert(info, batTotal.delRows, NULL) != 0) {
993!
908
            FAILED_BREAK()
×
909
          }
910

911
          int64_t delCnt = 0;
993✔
912
          queryCnt(info->conn->taos, querySql, &delCnt);
993✔
913
          if (delCnt != 0) {
993!
914
            errorPrint(" del not clear zero. query count=%" PRId64 " \n  delete sql=%s\n  query sql=%s\n", delCnt, info->buffer, querySql);
×
915
            FAILED_BREAK();
×
916
          }
917

918
          // g_arguments->debug_print = true;
919
          tbTotal.delRows += batTotal.delRows;
993✔
920
          mixRatio.doneCnt[MDEL] += batTotal.delRows;
993✔
921
        }
922
      }
923

924
      // flush
925
      if (db->flush) {
1,220!
926
        char sql[260] = "";
1,220✔
927
        sprintf(sql, "flush database %s", db->dbName);
1,220✔
928
        int32_t code = executeSql(info->conn->taos,sql);
1,220✔
929
        if (code != 0) {
1,220!
930
          perfPrint(" %s failed. error code = 0x%x\n", sql, code);
×
931
        } else {
932
          perfPrint(" %s ok.\n", sql);
1,220!
933
        }
934
      }
935

936
      // sleep if need
937
      if (stb->insert_interval > 0) {
1,220!
938
        debugPrint("%s() LN%d, insert_interval: %" PRIu64 "\n", __func__, __LINE__, stb->insert_interval);
×
939
        perfPrint("sleep %" PRIu64 " ms\n", stb->insert_interval);
×
940
        toolsMsleep((int32_t)stb->insert_interval);
×
941
      }
942

943
      // show
944
      int64_t delay = endTs - startTs;
1,220✔
945
      if (delay <= 0) {
1,220!
946
        debugPrint("thread[%d]: startTS: %" PRId64 ", endTS: %" PRId64 "\n", info->threadID, startTs, endTs);
×
947
      } else {
948
        perfPrint("insert execution time is %10.2f ms\n", delay / 1E6);
1,220!
949

950
        int64_t* pdelay = benchCalloc(1, sizeof(int64_t), false);
1,220✔
951
        *pdelay = delay;
1,220✔
952
        benchArrayPush(info->delayList, pdelay);
1,220✔
953
        info->totalDelay += delay;
1,220✔
954
      }
955

956
      int64_t currentPrintTime = toolsGetTimestampMs();
1,220✔
957
      if (currentPrintTime - lastPrintTime > 30 * 1000) {
1,220✔
958
        infoPrint("thread[%d] has currently inserted rows: %" PRIu64 "\n", info->threadID,
80✔
959
                  info->totalInsertRows + tbTotal.ordRows + tbTotal.disRows);
960
        lastPrintTime = currentPrintTime;
80✔
961
      }
962

963
      // batch show
964
      debugPrint("  %s batch %d ord=%" PRId64 " dis=%" PRId64 " upd=%" PRId64 " del=%" PRId64 "\n", tbName,
1,220!
965
                 mixRatio.curBatchCnt, batTotal.ordRows, batTotal.disRows, batTotal.updRows, batTotal.delRows);
966

967
      // total
968
      mixRatio.curBatchCnt++;
1,220✔
969
      if(stb->insert_interval > 0){
1,220!
970
        toolsMsleep(stb->insert_interval);
×
971
      }
972

973
      if (stb->checkInterval > 0 && mixRatio.curBatchCnt % stb->checkInterval == 0) {
1,220!
974
        // need check
975
        int64_t lastTs = batStartTime - stb->timestamp_step;
×
976
        if (!checkCorrect(info, db, stb, tbName, lastTs)) {
×
977
          // at once exit
978
          errorPrint(" \n\n *************  check correct not passed %s.%s ! errQueryCnt=%d errLastCnt=%d *********** \n\n", db->dbName, tbName, errQuertCnt, errLastCnt);
×
979
          FAILED_BREAK()
×
980
        }
981
      }
982

983
    }  // row end
984

985
    // print
986
    if (mixRatio.insertedRows + tbTotal.ordRows + tbTotal.disRows + tbTotal.updRows + tbTotal.delRows > 0) {
1,020!
987
      infoPrint("table:%s inserted(%" PRId64 ") rows order(%" PRId64 ")  disorder(%" PRId64 ") update(%" PRId64 ") delete(%" PRId64 ") \n",
1,020!
988
                tbName, mixRatio.insertedRows, FULL_DISORDER(stb) ? 0 : tbTotal.ordRows, FULL_DISORDER(stb) ? tbTotal.ordRows : tbTotal.disRows,
989
                tbTotal.updRows, tbTotal.delRows);
990
    }
991

992
    // table total -> all total
993
    total.ordRows += tbTotal.ordRows;
1,020✔
994
    total.delRows += tbTotal.delRows;
1,020✔
995
    total.disRows += tbTotal.disRows;
1,020✔
996
    total.updRows += tbTotal.updRows;
1,020✔
997

998
    info->totalInsertRows +=mixRatio.insertedRows;
1,020✔
999

1000
    mixRatioExit(&mixRatio);
1,020✔
1001
  }  // child table end
1002

1003

1004
  // end
1005
  if (0 == info->totalDelay) info->totalDelay = 1;
40!
1006

1007

1008
  // total
1009
  info->totalInsertRows = total.ordRows + total.disRows;
40✔
1010
  succPrint("thread[%d] %s(), completed total inserted rows: %" PRIu64 ", %.2f records/second\n", info->threadID,
40!
1011
            __func__, info->totalInsertRows, (double)(info->totalInsertRows / ((double)info->totalDelay / 1E6)));
1012

1013
  // print
1014
  succPrint("inserted finished. \n    rows order: %" PRId64 " \n    disorder: %" PRId64 " \n    update: %" PRId64" \n    delete: %" PRId64 " \n",
40!
1015
            total.ordRows, total.disRows, total.updRows, total.delRows);
1016

1017
  //g_arguments->debug_print = false;
1018

1019
  // free
1020
  if(csvFile) {
40!
1021
      fclose(csvFile);
×
1022
  }
1023
  tmfree(tagData);
40✔
1024

1025
  return true;
40✔
1026
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc