• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

trydofor / professional-wings / #125

31 Aug 2024 04:46AM UTC coverage: 63.579% (+0.7%) from 62.919%
#125

push

web-flow
Merge pull request #290 from trydofor/develop

3.2.130

1428 of 2191 new or added lines in 106 files covered. (65.18%)

41 existing lines in 24 files now uncovered.

12923 of 20326 relevant lines covered (63.58%)

0.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.39
/wings/faceless-flywave/src/main/kotlin/pro/fessional/wings/faceless/flywave/SchemaJournalManager.kt
1
package pro.fessional.wings.faceless.flywave
2

3
import org.slf4j.LoggerFactory
4
import org.slf4j.event.Level.ERROR
5
import org.slf4j.event.Level.INFO
6
import org.slf4j.event.Level.WARN
7
import pro.fessional.mirana.data.Null
8
import pro.fessional.wings.faceless.flywave.SqlSegmentProcessor.Companion.TYPE_PLAIN
9
import pro.fessional.wings.faceless.flywave.SqlSegmentProcessor.Companion.TYPE_SHARD
10
import pro.fessional.wings.faceless.flywave.SqlSegmentProcessor.Companion.hasType
11
import pro.fessional.wings.faceless.flywave.impl.DefaultInteractiveManager
12
import pro.fessional.wings.faceless.flywave.util.SimpleJdbcTemplate
13
import pro.fessional.wings.faceless.flywave.util.TemplateUtil
14
import java.util.LinkedList
15
import java.util.concurrent.atomic.AtomicReference
16
import java.util.function.BiConsumer
17
import java.util.function.Function
18
import javax.sql.DataSource
19

20
/**
21
 * Manage `log_insert`, `log_update` and `log_delete` of `$schemaJournalTable`
22
 * to auto create or delete Trigger.
23
 *
24
 * @author trydofor
25
 * @since 2019-06-13
26
 */
27
class SchemaJournalManager(
1✔
28
    private val plainDataSources: Map<String, DataSource>,
1✔
29
    private val sqlStatementParser: SqlStatementParser,
1✔
30
    private val schemaDefinitionLoader: SchemaDefinitionLoader,
1✔
31
    private val journalDdl: JournalDdl,
1✔
32
    private val schemaJournalTable: String = "sys_schema_journal"
1✔
33
) : InteractiveManager<SchemaJournalManager.AskType> {
34

35
    /**
     * Kinds of confirmation this manager can request from the interactive manager.
     */
    enum class AskType {
        /** Checked before an existing, empty trace table is dropped. */
        DropTable,

        /** Checked before an existing trigger with the new trigger's name is dropped. */
        DropTrigger,

        /** Checked when a detected difference should be reviewed manually. */
        ManualCheck
    }
38

39
    /**
     * Local DDL templates for the trace tables and triggers of the three journal kinds.
     * Each value is compared with, and stored into, the matching `ddl_*` column of the journal table.
     */
    data class JournalDdl(
        /** Template of the insert trace table (column `ddl_instbl`). */
        var insTbl: String = Null.Str,
        /** Template of the insert trigger (column `ddl_instrg`). */
        var insTrg: String = Null.Str,
        /** Template of the update trace table (column `ddl_updtbl`). */
        var updTbl: String = Null.Str,
        /** Template of the update trigger (column `ddl_updtrg`). */
        var updTrg: String = Null.Str,
        /** Template of the delete trace table (column `ddl_deltbl`). */
        var delTbl: String = Null.Str,
        /** Template of the delete trigger (column `ddl_deltrg`). */
        var delTrg: String = Null.Str
    )
×
47

48
    companion object {
        /** Template token replaced by the plain table name passed to the publish call. */
        const val PLAIN_NAME = "{{PLAIN_NAME}}"

        /** Template token replaced by the table the DDL is currently merged for. */
        const val TABLE_NAME = "{{TABLE_NAME}}"

        /** Template token replaced by the bone columns of the plain table, joined with ",\n". */
        const val TABLE_BONE = "{{TABLE_BONE}}"

        /** Template token replaced by the primary-key columns of the plain table, joined with ", ". */
        const val TABLE_PKEY = "{{TABLE_PKEY}}"
    }
54

55
    private val log = LoggerFactory.getLogger(SchemaJournalManager::class.java)
1✔
56
    private val interactive = DefaultInteractiveManager<AskType>(log, plainDataSources, "🐶")
1✔
57

58
    /**
     * Forwards to the `logWay` of the internal interactive manager.
     *
     * @param func handler handed to the interactive manager
     * @return the value returned by the interactive manager
     */
    override fun logWay(func: BiConsumer<String, String>): BiConsumer<String, String> =
        interactive.logWay(func)
61

62
    /**
     * Forwards to the `askWay` of the internal interactive manager.
     *
     * @param func handler handed to the interactive manager
     * @return the value returned by the interactive manager
     */
    override fun askWay(func: Function<String, Boolean>): Function<String, Boolean> =
        interactive.askWay(func)
65

66
    /**
     * Forwards to the `needAsk` of the internal interactive manager.
     *
     * @param ask the kind of confirmation
     * @param yes the flag handed to the interactive manager for [ask]
     * @return the value returned by the interactive manager
     */
    override fun needAsk(ask: AskType, yes: Boolean): Boolean? =
        interactive.needAsk(ask, yes)
69

70
    /**
     * Publish the update journal of a table: its trace table and Update Trigger, built from the DDL templates.
     *
     * - An existing trigger with the new trigger's name is dropped.
     * - An existing trace table that is empty and not used by another trigger is dropped.
     * - Any other existing trace table is kept and its structure is checked against the template;
     *   a difference is reported as an error and that data source is skipped.
     * - Trace tables and triggers are created only when [enable] is true.
     *
     * @param table plain table
     * @param enable enable or disable
     * @param commitId commit id of Journal
     */
    fun publishUpdate(table: String, enable: Boolean, commitId: Long) {
        publishJournal(table, enable, commitId, "update")
    }
1✔
83

84
    /**
     * Publish the delete journal of a table: its trace table and Delete Trigger, built from the DDL templates.
     *
     * - An existing trigger with the new trigger's name is dropped.
     * - An existing trace table that is empty and not used by another trigger is dropped.
     * - Any other existing trace table is kept and its structure is checked against the template;
     *   a difference is reported as an error and that data source is skipped.
     * - Trace tables and triggers are created only when [enable] is true.
     *
     * @param table plain table
     * @param enable enable or disable
     * @param commitId commit id of Journal
     */
    fun publishDelete(table: String, enable: Boolean, commitId: Long) {
        publishJournal(table, enable, commitId, "delete")
    }
1✔
97

98
    /**
     * Publish the insert journal of a table: its trace table and Insert Trigger, built from the DDL templates.
     *
     * - An existing trigger with the new trigger's name is dropped.
     * - An existing trace table that is empty and not used by another trigger is dropped.
     * - Any other existing trace table is kept and its structure is checked against the template;
     *   a difference is reported as an error and that data source is skipped.
     * - Trace tables and triggers are created only when [enable] is true.
     *
     * @param table plain table
     * @param enable enable or disable
     * @param commitId commit id of Journal
     */
    fun publishInsert(table: String, enable: Boolean, commitId: Long) {
        publishJournal(table, enable, commitId, "insert")
    }
1✔
111

112
    /**
     * List the triggers of a table on every plain data source and optionally drop them.
     * Every trigger is logged; when [drop] is true each one is dropped only after the
     * interactive prompt confirms it.
     *
     * @param table plain table
     * @param drop Whether to ask for drop, default false
     */
    fun manageTriggers(table: String, drop: Boolean = false) {
        val here = "manageTriggers"
        interactive.log(INFO, here, "start check triggers on table=$table")
        for ((plainName, plainDs) in plainDataSources) {
            interactive.log(INFO, here, "ready to check triggers, table=$table on db=$plainName")
            val triggers = schemaDefinitionLoader.showBoneTrg(plainDs, table)
            interactive.log(INFO, here, "find ${triggers.size} triggers, table=$table on db=$plainName")
            val jdbc = SimpleJdbcTemplate(plainDs, plainName)
            triggers.forEach { trigger ->
                val detail = "${trigger.name} ${trigger.timing} ${trigger.action}\n${trigger.event}"
                interactive.log(INFO, here, detail)
                // the prompt is only shown when dropping was requested
                val confirmed = drop && interactive.ask("drop trigger?\n$detail", false)
                if (confirmed) {
                    jdbc.execute(schemaDefinitionLoader.makeDdlTrg(trigger, true))
                }
            }
        }
    }
×
135

136
    /**
     * Compare the local journal DDL templates with the ones stored in `$schemaJournalTable`,
     * on every plain data source.
     *
     * - No stored row for [table]: insert the local templates.
     * - Stored templates differ: update the differing columns. If the journal kind they belong to
     *   counts as applied, the difference is logged with both versions and, when configured,
     *   a manual-check prompt is shown first.
     * - Everything equal: nothing is written.
     *
     * @param table plain table
     * @param commitId commit id of Journal
     * @throws IllegalArgumentException if [table] does not exist on a data source
     * @throws IllegalStateException if the insert or the update does not affect exactly one row
     */
    fun checkAndInitDdl(table: String, commitId: Long) {
        val here = "checkAndInitDdl"
        interactive.log(INFO, here, "start check journal table=$table")
        val selectSql = """
                SELECT ddl_instbl, ddl_instrg, ddl_updtbl, ddl_updtrg, ddl_deltbl, ddl_deltrg, log_insert, log_update, log_delete
                FROM $schemaJournalTable
                WHERE table_name = ?
                """.trimIndent()
        val insertSql = """
                INSERT INTO $schemaJournalTable
                (table_name, commit_id, ddl_instbl, ddl_instrg, ddl_updtbl, ddl_updtrg, ddl_deltbl, ddl_deltrg)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """.trimIndent()
        // columns of selectSql, read in this order
        val selectCols = listOf(
            "ddl_instbl", "ddl_instrg", "ddl_updtbl", "ddl_updtrg", "ddl_deltbl", "ddl_deltrg",
            "log_insert", "log_update", "log_delete"
        )

        for ((plainName, plainDs) in plainDataSources) {
            interactive.log(INFO, here, "ready to check journal, table=$table on db=$plainName")
            val tables = schemaDefinitionLoader.showTables(plainDs).associateBy {
                it.lowercase()
            }

            if (!tables.containsKey(table.lowercase())) {
                throw IllegalArgumentException("table not existed. table=$table")
            }

            val tmpl = SimpleJdbcTemplate(plainDs, plainName)
            val dbVal = HashMap<String, String>()
            tmpl.query(selectSql, table) { rs ->
                for (col in selectCols) {
                    dbVal[col] = rs.getString(col)
                }
            }

            if (dbVal.isEmpty()) {
                interactive.log(INFO, here, "insert journal ddl, table=$table, db=$plainName")
                val rst = tmpl.update(insertSql, table, commitId, journalDdl.insTbl, journalDdl.insTrg, journalDdl.updTbl, journalDdl.updTrg, journalDdl.delTbl, journalDdl.delTrg)
                if (rst != 1) {
                    throw IllegalStateException("failed to insert journal ddl, table=$table, db=$plainName")
                }
                continue
            }

            // check
            val updSql = StringBuilder()
            val updVal = LinkedList<Any>()
            val badDif = StringBuilder()

            // Compares one local template with its stored column. On a difference it logs,
            // asks for a manual check if the journal is applied, and queues the column for update.
            fun collectDiff(column: String, tag: String, title: String, localDdl: String, notApplied: Boolean) {
                val storedDdl = dbVal[column]
                if (localDdl == storedDdl) return

                if (notApplied) {
                    interactive.log(WARN, here, "diff $tag, update it. table=$table, db=$plainName")
                } else {
                    badDif.append("\n$title")
                    interactive.log(WARN, here, "diff applied $tag, should manually disable it first. table=$table, db=$plainName")
                    interactive.log(INFO, here, storedDdl ?: "")
                    interactive.log(INFO, here, localDdl)
                    if (interactive.needAsk(AskType.ManualCheck)) {
                        interactive.ask("continue?\nupdate diff applied $title. table=$table")
                    }
                }
                updSql.append("$column = ?, ")
                updVal.add(localDdl)
            }

            val insNot = notApply(dbVal["log_insert"])
            collectDiff("ddl_instbl", "ddl-ins-tbl", "insert-tracer", journalDdl.insTbl, insNot)
            collectDiff("ddl_instrg", "ddl-ins-trg", "insert-trigger", journalDdl.insTrg, insNot)

            val updNot = notApply(dbVal["log_update"])
            collectDiff("ddl_updtbl", "ddl-upd-tbl", "update-tracer", journalDdl.updTbl, updNot)
            collectDiff("ddl_updtrg", "ddl-upd-trg", "update-trigger", journalDdl.updTrg, updNot)

            val delNot = notApply(dbVal["log_delete"])
            collectDiff("ddl_deltbl", "ddl-del-tbl", "delete-tracer", journalDdl.delTbl, delNot)
            collectDiff("ddl_deltrg", "ddl-del-trg", "delete-trigger", journalDdl.delTrg, delNot)

            // update
            if (updSql.isNotEmpty()) {
                if (badDif.isNotEmpty() && interactive.needAsk(AskType.ManualCheck)) {
                    interactive.ask("continue?\ntable=$table $badDif")
                }

                // the last two placeholders are commit_id and table_name
                updVal.add(commitId)
                updVal.add(table)
                val sql = """
                        UPDATE $schemaJournalTable SET
                            $updSql
                            modify_dt = NOW(3),
                            commit_id = ?
                        WHERE table_name = ?
                        """.trimIndent()
                interactive.log(INFO, here, "update diff journal to database table=$table, db=$plainName")
                interactive.log(INFO, here, sql)
                val rst = tmpl.update(
                    sql, *updVal.toArray()
                )
                if (rst != 1) {
                    throw IllegalStateException("failed to update table=$table, db=$plainName")
                }
            } else {
                interactive.log(INFO, here, "skip all same journal, table=$table, db=$plainName")
            }
        }
        interactive.log(INFO, here, "done check journal table=$table")
    }
1✔
322

323
    /**
     * Enable or disable one journal kind (insert, update or delete) of [table] on every plain data source.
     *
     * Per data source: read the stored trace-table and trigger templates from `$schemaJournalTable`,
     * drop leftover temp tables, drop existing triggers that carry the new trigger's name,
     * queue empty and unreferenced trace tables for dropping, compare the kept trace tables with a
     * freshly created temp trace table, then (only when enabling) execute the trace-table and
     * trigger DDL, and finally store the apply date and the commit id.
     * A data source is skipped when no template row exists, when a template is blank,
     * or when a kept trace table differs from the template.
     *
     * @param table plain table
     * @param enable enable or disable
     * @param commitId commit id of Journal
     * @param event `insert`, `update` or `delete`, compared case-insensitively
     * @throws RuntimeException if [event] is none of the supported kinds
     * @throws IllegalStateException if the status update does not affect exactly one row
     */
    private fun publishJournal(table: String, enable: Boolean, commitId: Long, event: String) {
        val here = "publishJournal"
        interactive.log(INFO, here, "start publish $event table=$table, enable=$enable")

        // infix of the ddl_* template columns and the apply-date column of this event
        val (abbr, logCol) = when {
            "insert".equals(event, true) -> "ins" to "log_insert"
            "update".equals(event, true) -> "upd" to "log_update"
            "delete".equals(event, true) -> "del" to "log_delete"
            else -> throw RuntimeException("unsupported event $event")
        }

        // disabling resets the apply date to the sentinel date
        val logDate = if (enable) "NOW(3)" else "'1000-01-01 00:00:00.000'"

        val selectSql = """
            SELECT
                ddl_${abbr}tbl ddl_tbl,
                ddl_${abbr}trg ddl_trg,
                $logCol apply_dt
            FROM $schemaJournalTable
            WHERE table_name = ?
            """.trimIndent()
        val updateSql = """
            UPDATE $schemaJournalTable SET
                $logCol = $logDate,
                commit_id = ?
            WHERE table_name = ?
            """.trimIndent()

        // shared by all data sources, filled on first use by initModelOnce
        val model = HashMap<String, String>()
        for ((plainName, plainDs) in plainDataSources) {
            interactive.log(INFO, here, "ready to publish $event table=$table, enable=$enable, db=$plainName")
            val jdbc = SimpleJdbcTemplate(plainDs, plainName)

            val holder = AtomicReference<Triple<String, String, String>>()
            jdbc.query(selectSql, table) { rs ->
                holder.set(Triple(rs.getString("ddl_tbl"), rs.getString("ddl_trg"), rs.getString("apply_dt")))
            }

            val stored = holder.get()
            if (stored == null) {
                interactive.log(WARN, here, "skip template not found, checkAndInitDdl first, table=$table, db=$plainName")
                continue
            }

            val (tmplTbl, tmplTrg, applyDt) = stored
            if (tmplTbl.isBlank() || tmplTrg.isBlank()) {
                interactive.log(WARN, here, "skip blank template, table=$table, db=$plainName")
                continue
            }

            // lowercase name -> name as reported by the database
            val tables = schemaDefinitionLoader.showTables(plainDs).associateBy { it.lowercase() }
            // tables that hasType classifies as plain or shard relative to `table`
            val staffs = tables.filterValues { raw ->
                val tp = hasType(table, raw)
                tp == TYPE_SHARD || tp == TYPE_PLAIN
            }

            interactive.log(INFO, here, "init model, applyDt=$applyDt table=$table, enable=$enable, db=$plainName")
            initModelOnce(table, plainDs, model)

            val trcChk = HashMap<String, String>() // kept trace table -> staff table
            val trcDdl = HashMap<String, String>() // trace-table ddl -> staff table
            val trgDdl = HashMap<String, String>() // trigger ddl -> staff table
            val drpTbl = HashMap<String, String>() // drop ddl -> trace table
            val tmpTkn = "___temp_fw79"

            // clean temp table
            tables.values
                .filter { it.contains(tmpTkn, true) }
                .forEach { leftover ->
                    interactive.log(INFO, here, "remove temp table table=$leftover, db=$plainName")
                    jdbc.execute("DROP TABLE IF EXISTS ${sqlStatementParser.safeName(leftover)}")
                }

            // trace table, delete existed and non-empty
            for (tblRaw in staffs.values) {
                val ddlTbl = mergeDdl(tmplTbl, model, tblRaw)
                val ddlTrg = mergeDdl(tmplTrg, model, tblRaw)

                val curTac = parseTblName(ddlTbl)
                if (curTac.isBlank()) {
                    interactive.log(WARN, here, "skip bad table=$tblRaw, trace-table-ddl =$ddlTbl")
                    continue
                }

                // check trigger
                val furTrg = parseTrgName(ddlTrg) // new trigger name
                var refTrc = false // has ref
                for (trg in schemaDefinitionLoader.showBoneTrg(plainDs, tblRaw)) {
                    if (trg.name.equals(furTrg, true)) {
                        // delete same name
                        interactive.log(WARN, here, "drop trigger=${trg.name}, existed same name, table=$tblRaw, db=$plainName")
                        if (interactive.needAsk(AskType.DropTrigger)) {
                            interactive.ask("continue?\ndrop trigger=${trg.name}, existed same name")
                        }
                        jdbc.execute(schemaDefinitionLoader.makeDdlTrg(trg, true))
                    } else if (TemplateUtil.isBoundary(trg.event, curTac, false)) {
                        // keep trace table used by trigger
                        interactive.log(INFO, here, "trigger=${trg.name}, with same trace-table=$curTac, db=$plainName")
                        refTrc = true
                    }
                }

                // check trace table
                val newTrc = if (tables.containsKey(curTac.lowercase())) {
                    interactive.log(INFO, here, "existed trace-table=$curTac, table=$tblRaw, db=$plainName")
                    val safeCurTrc = sqlStatementParser.safeName(curTac)
                    val cnt = jdbc.count("SELECT COUNT(1) FROM $safeCurTrc")
                    if (cnt == 0 && !refTrc) {
                        drpTbl["DROP TABLE IF EXISTS $safeCurTrc"] = curTac
                        true
                    } else {
                        interactive.log(WARN, here, "lazy-check existed $cnt records trace-table=$curTac, table=$tblRaw, db=$plainName")
                        trcChk[curTac] = tblRaw
                        false
                    }
                } else {
                    true
                }
                if (newTrc) {
                    trcDdl[ddlTbl] = tblRaw
                }
                trgDdl[ddlTrg] = tblRaw
            }

            // check existed, all trace tables have same structure.
            if (trcChk.isNotEmpty()) {
                val tmpDdl = mergeDdl(tmplTbl, model, "$table$tmpTkn")
                val tmpTbl = parseTblName(tmpDdl)
                val safeTmp = sqlStatementParser.safeName(tmpTbl)

                jdbc.execute(tmpDdl)
                interactive.log(INFO, here, "create temp-trace-table=$tmpTbl, db=$plainName")
                try {
                    val diffTbl = HashSet<String>()
                    for ((trc, stf) in trcChk) {
                        val df = schemaDefinitionLoader.diffBoneSame(plainDs, tmpTbl, trc)
                        if (df.isNotEmpty()) {
                            diffTbl.add(trc)
                            interactive.log(ERROR, here, "different trace-table=$trc of staff=$stf, error=$df")
                        } else {
                            interactive.log(INFO, here, "same bone column trace-table=$trc, temp-trace=$tmpTbl")
                        }
                    }
                    if (diffTbl.isNotEmpty()) {
                        interactive.log(ERROR, here, "need manually check different tracers. table=$table, db=$plainName, tracers=${diffTbl.joinToString(",")}")
                        if (interactive.needAsk(AskType.ManualCheck)) {
                            interactive.ask("continue?\ndifferent tracers tracers:\n${diffTbl.joinToString("\n")}\ntable=$table")
                        }
                        // skip this data source; the finally block still removes the temp table
                        continue
                    }
                    interactive.log(INFO, here, "existed tracers all the same table=$table, db=$plainName")
                } finally {
                    // delete if created
                    jdbc.execute("DROP TABLE IF EXISTS $safeTmp")
                    interactive.log(INFO, here, "remove temp-trace-table=$tmpTbl, db=$plainName")
                }
            }

            for ((ddl, tbl) in drpTbl) {
                interactive.log(WARN, here, "drop trace-table=$tbl, empty existed, table=$table, db=$plainName")
                if (interactive.needAsk(AskType.DropTable)) {
                    interactive.ask("continue?\ndrop tracer=$tbl\nddl=$ddl")
                }
                jdbc.execute(ddl)
            }

            if (enable) {
                interactive.log(INFO, here, "execute enable journal, plain-table=table=$table, db=$plainName")
                for ((ddl, tbl) in trcDdl) {
                    interactive.log(INFO, here, "execute trace-table ddl on table=$tbl, db=$plainName")
                    jdbc.execute(ddl)
                }
                for ((ddl, tbl) in trgDdl) {
                    interactive.log(INFO, here, "execute trigger ddl on table=$tbl, db=$plainName")
                    jdbc.execute(ddl)
                }
            } else {
                interactive.log(INFO, here, "execute disable journal, plain-table=table=$table, db=$plainName")
            }

            // update status
            val rst = jdbc.update(updateSql, commitId, table)
            if (rst != 1) {
                throw IllegalStateException("update journal $rst records, table=$table, db=$plainName")
            }
        }

        interactive.log(INFO, here, "done publish $event table=$table, enable=$enable")
    }
1✔
564

565
    /**
     * Fill the template model for [table] unless it already holds entries.
     * Sets the plain-name, bone-column and primary-key tokens; the bone and key columns are read from [ds].
     *
     * @param table plain table
     * @param ds data source the column definitions are read from
     * @param map template model, modified in place
     */
    private fun initModelOnce(table: String, ds: DataSource, map: HashMap<String, String>) {
        if (map.isEmpty()) {
            map[PLAIN_NAME] = table
            map[TABLE_BONE] = schemaDefinitionLoader.showBoneCol(ds, table).joinToString(",\n")
            val pkeys = schemaDefinitionLoader.showPkeyCol(ds, table).joinToString()
            map[TABLE_PKEY] = pkeys

            interactive.log(INFO, "initModelOnce", "init model table=$table, keys=$pkeys")
        }
    }
1✔
578

579
    /**
     * Render a DDL template for one concrete table.
     * Note that the table-name entry of [map] is overwritten with [table] as a side effect.
     *
     * @param ddl DDL template containing the `{{...}}` tokens
     * @param map template model, modified in place
     * @param table value used for the table-name token
     * @return the result of merging [map] into [ddl]
     */
    private fun mergeDdl(ddl: String, map: HashMap<String, String>, table: String): String {
        // NOTE(review): "'" and false are passed through to TemplateUtil.parse; their meaning is defined there
        val parsed = TemplateUtil.parse(ddl, listOf(PLAIN_NAME, TABLE_NAME, TABLE_BONE, TABLE_PKEY), "'", false)

        map[TABLE_NAME] = table
        return TemplateUtil.merge(ddl, parsed, map)
    }
586

587
    /**
     * Tell whether a stored apply date means "journal not applied".
     *
     * @param str apply date as text, may be null
     * @return true if [str] is null, empty, or starts with `1000-01-01` or `999-01-01`
     */
    private fun notApply(str: String?): Boolean {
        val applyDt = str.orEmpty()
        // NOTE(review): "999-01-01" is presumably another rendering of the 1000-01-01 sentinel; confirm
        return applyDt.isEmpty() || applyDt.startsWith("1000-01-01") || applyDt.startsWith("999-01-01")
    }
591

592
    /**
     * Finds the trigger name after the `TRIGGER` keyword.
     * Group 1 is a name that starts with a quote character (backtick, single or double quote);
     * group 2 is an unquoted name, which ends at the first whitespace or quote.
     * An optional `IF EXISTS` / `IF NOT EXISTS` clause before the name is skipped.
     */
    private val trgNameRegex =
        """\s+TRIGGER\s+(?:IF\s+(?:NOT\s+)?EXISTS\s+)?(?:[`'"]+([^`'"]+)|([^\s`'"]+))"""
            .toRegex(setOf(RegexOption.IGNORE_CASE, RegexOption.MULTILINE))

    /**
     * Extract the trigger name from a trigger DDL statement.
     *
     * @param ddl trigger DDL
     * @return the trigger name without quotes, or `Null.Str` if no `TRIGGER` clause is found
     */
    private fun parseTrgName(ddl: String): String {
        val groups = trgNameRegex.find(ddl)?.groupValues ?: return Null.Str
        // a group that did not take part in the match is reported as an empty string
        return groups[1].ifEmpty { groups[2] }
    }
598

599
    /**
     * Extract the table name from a DDL statement via the SQL statement parser.
     *
     * @param ddl DDL statement
     * @return the table of a statement parsed as plain or shard type, otherwise `Null.Str`
     */
    private fun parseTblName(ddl: String) = sqlStatementParser.parseTypeAndTable(ddl).let { parsed ->
        when (parsed) {
            is SqlStatementParser.SqlType.Plain -> parsed.table
            is SqlStatementParser.SqlType.Shard -> parsed.table
            else -> Null.Str
        }
    }
1✔
604
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc